对于线程池

线程池就是首先创立一些线程,它们的汇合称为线程池。应用线程池能够很好地进步性能,线程池在系统启动时即创立大量闲暇的线程,程序将一个工作传给线程池,线程池就会启动一条线程来执行这个工作,执行完结当前,该线程并不会死亡,而是再次返回线程池中成为闲暇状态,期待执行下一个工作

为什么要应用线程池

多线程运行工夫,零碎一直的启动和敞开新线程,老本十分高,会过渡耗费系统资源,以及过渡切换线程的危险,从而可能导致系统资源的解体。这时,线程池就是最好的抉择了

ThreadPoolExecutor类

Java外面线程池的顶级接口是Executor,然而严格意义上讲Executor并不是一个线程池,而只是一个执行线程的工具。真正的线程池接口是ExecutorService

ExecutorService真正的线程池接口。
ScheduledExecutorService能和Timer/TimerTask相似,解决那些须要工作反复执行的问题。
ThreadPoolExecutorExecutorService的默认实现。
ScheduledThreadPoolExecutor继承ThreadPoolExecutor的ScheduledExecutorService接口实现,周期性任务调度的类实现。

ThreadPoolExecutor

咱们来看一下ThreadPoolExecutor的具体实现:

在ThreadPoolExecutor中有四个构造方法

public class ThreadPoolExecutor extends AbstractExecutorService {        /**     * Core pool size is the minimum number of workers to keep alive    外围线程数量是维持线程池存活的最小数量,而且不容许超时,除非设置allowCoreThreadTimeOut,在这种状况下最小值为0     * (and not allow to time out etc) unless allowCoreThreadTimeOut     * is set, in which case the minimum is zero.     */    private volatile int corePoolSize;        /**     * Maximum pool size. Note that the actual maximum is internally    最大线程数量,留神,理论最大数量受容量限度     * bounded by CAPACITY.     */    private volatile int maximumPoolSize;        /**     * The queue used for holding tasks and handing off to worker        用于保留工作和移交给工作线程         * threads.  We do not require that workQueue.poll() returning             * null necessarily means that workQueue.isEmpty(), so rely     * solely on isEmpty to see if the queue is empty (which we must     * do for example when deciding whether to transition from     * SHUTDOWN to TIDYING).  This accommodates special-purpose     * queues such as DelayQueues for which poll() is allowed to     * return null even if it may later return non-null when delays     * expire.     */    private final BlockingQueue<Runnable> workQueue;        /**     * Timeout in nanoseconds for idle threads waiting for work.        闲暇线程的期待超时工夫,当线程数量超过corePoolSize或者allowCoreThreadTimeOut时应用,否则永远期待新的工作     * Threads use this timeout when there are more than corePoolSize     * present or if allowCoreThreadTimeOut. Otherwise they wait     * forever for new work.     */    private volatile long keepAliveTime;        /**     * Factory for new threads. All threads are created using this        创立线程的工厂     * factory (via method addWorker).  All callers must be prepared     * for addWorker to fail, which may reflect a system or user's     * policy limiting the number of threads.  Even though it is not     * treated as an error, failure to create threads may result in     * new tasks being rejected or existing ones remaining stuck in     * the queue.     *     * We go further and preserve pool invariants even in the face of     * errors such as OutOfMemoryError, that might be thrown while     * trying to create threads.  Such errors are rather common due to     * the need to allocate a native stack in Thread.start, and users     * will want to perform clean pool shutdown to clean up.  There     * will likely be enough memory available for the cleanup code to     * complete without encountering yet another OutOfMemoryError.     */    private volatile ThreadFactory threadFactory;        /**     * Handler called when saturated or shutdown in execute.        回绝策略,线程饱和或敞开时调用的处理程序     */    private volatile RejectedExecutionHandler handler;            //应用指定参数创立线程池,应用默认线程工厂和回绝策略    public ThreadPoolExecutor(int corePoolSize,                              int maximumPoolSize,                              long keepAliveTime,                              TimeUnit unit,                              BlockingQueue<Runnable> workQueue) {        this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,             Executors.defaultThreadFactory(), defaultHandler);    }    //应用指定参数创立线程池,应用默认回绝策略    public ThreadPoolExecutor(int corePoolSize,                              int maximumPoolSize,                              long keepAliveTime,                              TimeUnit unit,                              BlockingQueue<Runnable> workQueue,                              ThreadFactory threadFactory) {        this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,             threadFactory, defaultHandler);    }    //应用指定参数创立线程池,应用默认线程工厂    public ThreadPoolExecutor(int corePoolSize,                              int maximumPoolSize,                              long keepAliveTime,                              TimeUnit unit,                              BlockingQueue<Runnable> workQueue,                              RejectedExecutionHandler handler) {        this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,             Executors.defaultThreadFactory(), handler);    }        //应用指定参数创立线程池    public ThreadPoolExecutor(int corePoolSize,                              int maximumPoolSize,                              long keepAliveTime,                              TimeUnit unit,                              BlockingQueue<Runnable> workQueue,                              ThreadFactory threadFactory,                              RejectedExecutionHandler handler) {        if (corePoolSize < 0 ||            maximumPoolSize <= 0 ||            maximumPoolSize < corePoolSize ||            keepAliveTime < 0)            throw new IllegalArgumentException();        if (workQueue == null || threadFactory == null || handler == null)            throw new NullPointerException();        this.corePoolSize = corePoolSize;        this.maximumPoolSize = maximumPoolSize;        this.workQueue = workQueue;        this.keepAliveTime = unit.toNanos(keepAliveTime);        this.threadFactory = threadFactory;        this.handler = handler;    }}
  • corePoolSize:外围池的大小,这个参数跟前面讲述的线程池的实现原理有十分大的关系。在创立了线程池后,默认状况下,线程池中并没有任何线程,而是期待有工作到来才创立线程去执行工作,除非调用了prestartAllCoreThreads()或者prestartCoreThread()办法,从这2个办法的名字就能够看出,是预创立线程的意思,即在没有工作到来之前就创立corePoolSize个线程或者一个线程。默认状况下,在创立了线程池后,线程池中的线程数为0,当有工作来之后,就会创立一个线程去执行工作,当线程池中的线程数目达到corePoolSize后,就会把达到的工作放到缓存队列当中;
  • maximumPoolSize:线程池最大线程数,这个参数也是一个十分重要的参数,它示意在线程池中最多能创立多少个线程;
  • keepAliveTime:示意线程没有工作执行时最多放弃多久工夫会终止。默认状况下,只有当线程池中的线程数大于corePoolSize时,keepAliveTime才会起作用,直到线程池中的线程数不大于corePoolSize,即当线程池中的线程数大于corePoolSize时,如果一个线程闲暇的工夫达到keepAliveTime,则会终止,直到线程池中的线程数不超过corePoolSize。然而如果调用了allowCoreThreadTimeOut(boolean)办法,在线程池中的线程数不大于corePoolSize时,keepAliveTime参数也会起作用,直到线程池中的线程数为0;
  • unit:参数keepAliveTime的工夫单位
  • workQueue:一个阻塞队列,用来存储期待执行的工作,这个参数的抉择也很重要,会对线程池的运行过程产生重大影响,一般来说,这里的阻塞队列有以下几种抉择

    ArrayBlockingQueue;LinkedBlockingQueue;SynchronousQueue;
  • threadFactory:线程工厂,次要用来创立线程;
  • handler:示意当回绝解决工作时的策略,有以下四种取值:

      ThreadPoolExecutor.AbortPolicy:抛弃工作并抛出RejectedExecutionException异样。   ThreadPoolExecutor.DiscardPolicy:也是抛弃工作,然而不抛出异样。   ThreadPoolExecutor.DiscardOldestPolicy:抛弃队列最后面的工作,而后从新尝试执行工作(反复此过程)  ThreadPoolExecutor.CallerRunsPolicy:由调用线程解决该工作 

ExecutorService

ThreadPoolExecutor继承自AbstractExecutorService,AbstractExecutorService而实现了ExecutorService,ExecutorService实现了Executor

public interface Executor {    /**     * Executes the given command at some time in the future.  The command     * may execute in a new thread, in a pooled thread, or in the calling     * thread, at the discretion of the {@code Executor} implementation.     *     * @param command the runnable task     * @throws RejectedExecutionException if this task cannot be     * accepted for execution     * @throws NullPointerException if command is null     */    void execute(Runnable command);}

Executor是顶层接口,外面定义了execute(Runnable)办法,返回类型为void,用来执行传入的工作

ExecutorService实现了Executor并申明了一些办法:submit、invokeAll、invokeAny以及shutDown等

AbstractExecutorService实现了ExecutorService并对其中定义的办法做了实现

ThreadPoolExecutor继承自AbstractExecutorService

线程池的实现

线程状态

    // runState is stored in the high-order bits    private static final int RUNNING    = -1 << COUNT_BITS;    private static final int SHUTDOWN   =  0 << COUNT_BITS;    private static final int STOP       =  1 << COUNT_BITS;    private static final int TIDYING    =  2 << COUNT_BITS;    private static final int TERMINATED =  3 << COUNT_BITS;

当创立线程池后,初始时,线程池处于RUNNING状态;

  如果调用了shutdown()办法,则线程池处于SHUTDOWN状态,此时线程池不可能承受新的工作,它会期待所有工作执行结束;

  如果调用了shutdownNow()办法,则线程池处于STOP状态,此时线程池不能承受新的工作,并且会去尝试终止正在执行的工作;

  当线程池处于SHUTDOWN或STOP状态,并且所有工作线程曾经销毁,工作缓存队列曾经清空或执行完结后,线程池被设置为TERMINATED状态。

private final BlockingQueue<Runnable> workQueue;              //工作缓存队列,用来寄存期待执行的工作private final ReentrantLock mainLock = new ReentrantLock();   //线程池的次要状态锁,对线程池状态(比方线程池大小                                                              //、runState等)的扭转都要应用这个锁private final HashSet<Worker> workers = new HashSet<Worker>();  //用来寄存工作集 private volatile long  keepAliveTime;    //线程存货工夫   private volatile boolean allowCoreThreadTimeOut;   //是否容许为外围线程设置存活工夫private volatile int   corePoolSize;     //外围池的大小(即线程池中的线程数目大于这个参数时,提交的工作会被放进工作缓存队列)private volatile int   maximumPoolSize;   //线程池最大能容忍的线程数 private volatile int   poolSize;       //线程池中以后的线程数 private volatile RejectedExecutionHandler handler; //工作回绝策略 private volatile ThreadFactory threadFactory;   //线程工厂,用来创立线程 private int largestPoolSize;   //用来记录线程池中已经呈现过的最大线程数 private long completedTaskCount;   //用来记录曾经执行结束的工作个数

举个例子:一个房间能够装15集体,目前有10集体,每个人同时只能做一件事件,那么只有10集体中有闲暇的就能够承受新的工作,如果没有闲暇的那新的工作就要排队期待,如果新增的工作越来越多,那就要思考减少人数到15个,如果还是不够就要思考是否要回绝新的工作或者放弃之前的工作了,当15集体有闲暇的,那就又须要思考缩小人数,因为要发工资的

那么在这个例子中10集体就是corePoolSize,15就是maximumPoolSize,workQueue是没有闲暇人时的期待队列,回绝或者放弃之前工作就是handler,人的闲暇工夫就是keepAliveTime,如果人数超过corePoolSize或者设置了allowCoreThreadTimeOut,那么工夫超过keepAliveTime后就要缩小人数

execute

ThreadPoolExecutor中最外围的办法就是execute

    public void execute(Runnable command) {        if (command == null)            throw new NullPointerException();        /*         * Proceed in 3 steps:         *         * 1. If fewer than corePoolSize threads are running, try to        如果正在运行的线程少于corePoolSize线程,尝试应用给定命令作为其第一个工作来启动新线程。 对addWorker的调用从原子性上查看runState和workerCount,         * start a new thread with the given command as its first            通过返回false来避免在不应该增加线程的状况下收回虚伪警报。         * task.  The call to addWorker atomically checks runState and         * workerCount, and so prevents false alarms that would add         * threads when it shouldn't, by returning false.         *         * 2. If a task can be successfully queued, then we still need    如果工作能够胜利排队,那么咱们依然须要仔细检查是否应该增加线程(因为现有线程自上次查看后就死掉了)或自从进入此办法以来该池已敞开。 因而,咱们从新查看状态,         * to double-check whether we should have added a thread        并在必要时回滚队列(如果已进行),或者在没有线程的状况下启动新线程。         * (because existing ones died since last checking) or that         * the pool shut down since entry into this method. So we         * recheck state and if necessary roll back the enqueuing if         * stopped, or start a new thread if there are none.         *         * 3. If we cannot queue task, then we try to add a new        如果咱们无奈将工作排队,则尝试增加一个新线程。 如果失败,咱们晓得咱们已敞开或已饱和,因而回绝该工作。         * thread.  If it fails, we know we are shut down or saturated         * and so reject the task.         */        int c = ctl.get();//ctl是一个AtomicInteger参数,用于判断线程状态        //判断工作线程数量是否小于外围线程数量,如果小于就减少工作线程数量        if (workerCountOf(c) < corePoolSize) {            if (addWorker(command, true))                return;            c = ctl.get();        }        //如果工作能够胜利排队        //如果线程正在运行,就尝试将工作增加进缓存队列中        //此时获取到的是一个闲暇的线程,线程运行中并且工作增加缓存队列胜利        if (isRunning(c) && workQueue.offer(command)) {            //此时获取到的是一个闲暇的线程,须要再次获取线程状态            int recheck = ctl.get();            //判断闲暇线程状态,如果线程不是running状态且工作曾经remove就执行回绝策略            if (! isRunning(recheck) && remove(command))                reject(command);            else if (workerCountOf(recheck) == 0)                addWorker(null, false);        }        //如果无奈从新排队        //增加失败        else if (!addWorker(command, false))            //执行回绝策略            reject(command);    }        
    //增加工作    firstTask 工作    core    是否退出外围线程    private boolean addWorker(Runnable firstTask, boolean core) {        retry:        //自旋期待        for (;;) {            int c = ctl.get();            //获取线执行状态            int rs = runStateOf(c);            // Check if queue empty only if necessary.            if (rs >= SHUTDOWN &&                ! (rs == SHUTDOWN &&                   firstTask == null &&                   ! workQueue.isEmpty()))                return false;            for (;;) {                int wc = workerCountOf(c);                if (wc >= CAPACITY ||                    wc >= (core ? corePoolSize : maximumPoolSize))                    return false;                if (compareAndIncrementWorkerCount(c))                    break retry;                c = ctl.get();  // Re-read ctl                if (runStateOf(c) != rs)                    continue retry;                // else CAS failed due to workerCount change; retry inner loop            }        }        boolean workerStarted = false;        boolean workerAdded = false;        Worker w = null;        try {            w = new Worker(firstTask);            final Thread t = w.thread;            if (t != null) {                //锁定                final ReentrantLock mainLock = this.mainLock;                mainLock.lock();                try {                    // Recheck while holding lock.                    // Back out on ThreadFactory failure or if                    // shut down before lock acquired.                    int rs = runStateOf(ctl.get());                    if (rs < SHUTDOWN ||                        (rs == SHUTDOWN && firstTask == null)) {                        if (t.isAlive()) // precheck that t is startable                            throw new IllegalThreadStateException();                        workers.add(w);                        int s = workers.size();                        if (s > largestPoolSize)                            largestPoolSize = s;                        workerAdded = true;                    }                } finally {                    mainLock.unlock();                }                if (workerAdded) {                    t.start();                    workerStarted = true;                }            }        } finally {            if (! workerStarted)                addWorkerFailed(w);        }        return workerStarted;    }

Worker

private final class Worker        extends AbstractQueuedSynchronizer        implements Runnable    {        /**         * This class will never be serialized, but we provide a         * serialVersionUID to suppress a javac warning.         */        private static final long serialVersionUID = 6138294804551838833L;        /** Thread this worker is running in.  Null if factory fails. */        final Thread thread;        /** Initial task to run.  Possibly null. */        Runnable firstTask;        /** Per-thread task counter */        volatile long completedTasks;        /**         * Creates with given first task and thread from ThreadFactory.         * @param firstTask the first task (null if none)         */        Worker(Runnable firstTask) {            setState(-1); // inhibit interrupts until runWorker            this.firstTask = firstTask;            this.thread = getThreadFactory().newThread(this);        }        /** Delegates main run loop to outer runWorker  */        public void run() {            runWorker(this);        }        // Lock methods        //        // The value 0 represents the unlocked state.        // The value 1 represents the locked state.        protected boolean isHeldExclusively() {            return getState() != 0;        }        protected boolean tryAcquire(int unused) {            if (compareAndSetState(0, 1)) {                setExclusiveOwnerThread(Thread.currentThread());                return true;            }            return false;        }        protected boolean tryRelease(int unused) {            setExclusiveOwnerThread(null);            setState(0);            return true;        }        public void lock()        { acquire(1); }        public boolean tryLock()  { return tryAcquire(1); }        public void unlock()      { release(1); }        public boolean isLocked() { return isHeldExclusively(); }        void interruptIfStarted() {            Thread t;            if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {                try {                    t.interrupt();                } catch (SecurityException ignore) {                }            }        }    }
  • 如果以后线程池中的线程数目小于corePoolSize,则每来一个工作,就会创立一个线程去执行这个工作;
  • 如果以后线程池中的线程数目>=corePoolSize,则每来一个工作,会尝试将其增加到工作缓存队列当中,若增加胜利,则该工作会期待闲暇线程将其取出去执行;若增加失败(一般来说是工作缓存队列已满),则会尝试创立新的线程去执行这个工作;
  • 如果以后线程池中的线程数目达到maximumPoolSize,则会采取工作回绝策略进行解决;
  • 如果线程池中的线程数量大于 corePoolSize时,如果某线程闲暇工夫超过keepAliveTime,线程将被终止,直至线程池中的线程数目不大于corePoolSize;如果容许为外围池中的线程设置存活工夫,那么外围池中的线程闲暇工夫超过keepAliveTime,线程也会被终止。

罕用的线程池

newSingleThreadExecutor

单个线程的线程池,即线程池中每次只有一个线程工作,单线程串行执行工作

外围池大小1,最大大小1,存活工夫0,即永不过期,应用LinkedBlockingQueue

public class SingleThreadExecutorTest {    public static void main(String[] args) {        // TODO Auto-generated method stub        //创立一个可重用固定线程数的线程池        ExecutorService pool = Executors.newSingleThreadExecutor();        //创立实现了Runnable接口对象,Thread对象当然也实现了Runnable接口;        Thread t1 = new MyThread();        Thread t2 = new MyThread();        Thread t3 = new MyThread();        Thread t4 = new MyThread();        Thread t5 = new MyThread();        //将线程放到池中执行;        pool.execute(t1);        pool.execute(t2);        pool.execute(t3);        pool.execute(t4);        pool.execute(t5);        //敞开线程池        pool.shutdown();    }}

newFixedThreadExecutor

固定数量的线程池,没提交一个工作就是一个线程,直到达到线程池的最大数量,而后前面进入期待队列直到后面的工作实现才继续执行

外围池大小nThreads,最大大小nThreads,存活工夫0,永不过期,应用LinkedBlockingQueue

public class FixedThreadExecutorTest {    public static void main(String[] args) {        // TODO Auto-generated method stub        //创立一个可重用固定线程数的线程池  外围池大小为nThreads,keepAliveTime为0,线程永不过期        ExecutorService pool = Executors.newFixedThreadPool(2);        //创立实现了Runnable接口对象,Thread对象当然也实现了Runnable接口;        Thread t1 = new MyThread();        Thread t2 = new MyThread();        Thread t3 = new MyThread();        Thread t4 = new MyThread();        Thread t5 = new MyThread();        //将线程放到池中执行;        pool.execute(t1);        pool.execute(t2);        pool.execute(t3);        pool.execute(t4);        pool.execute(t5);        //敞开线程池        pool.shutdown();    }}

newCacheThreadExecutor

可缓存线程池, 当线程池大小超过了解决工作所需的线程,那么就会回收局部闲暇(个别是60秒无执行)的线程,当有工作来时,又智能的增加新线程来执行。

外围池大小0,最大大小Integer.MAX_VALUE,存活工夫默认60s,应用SynchronousQueue

public class CachedThreadExecutorTest {    public static void main(String[] args) {        // TODO Auto-generated method stub        //创立一个可重用固定线程数的线程池  外围池大小为0,keepAliveTime为60L,默认60s过期        ExecutorService pool = Executors.newCachedThreadPool();        //创立实现了Runnable接口对象,Thread对象当然也实现了Runnable接口;        Thread t1 = new MyThread();        Thread t2 = new MyThread();        Thread t3 = new MyThread();        Thread t4 = new MyThread();        Thread t5 = new MyThread();        //将线程放到池中执行;        pool.execute(t1);        pool.execute(t2);        pool.execute(t3);        pool.execute(t4);        pool.execute(t5);        //敞开线程池        pool.shutdown();    }}

newScheduleThreadExecutor

大小无限度的线程池,反对定时和周期性的执行线程

外围池大小corePoolSize,最大大小Integer.MAX_VALUE,存活工夫0,永不过期,应用DelayedWorkQueue

public class NewScheduledThreadPool {    public static void main(String[] args) {        // TODO Auto-generated method stub        ScheduledThreadPoolExecutor exec = new ScheduledThreadPoolExecutor(1);        exec.scheduleAtFixedRate(new Runnable() {//每隔一段时间就触发异样            @Override            public void run() {                // TODO Auto-generated method stub                //throw new RuntimeException();                System.out.println("===================");            }        }, 1000, 5000, TimeUnit.MILLISECONDS);        exec.scheduleAtFixedRate(new Runnable() {//每隔一段时间打印零碎工夫,证实两者是互不影响的            @Override            public void run() {                // TODO Auto-generated method stub                System.out.println(System.nanoTime());            }        }, 1000, 2000, TimeUnit.MILLISECONDS);    }}

MyThread

public class MyThread extends Thread {    @Override    public void run() {        // TODO Auto-generated method stub        // super.run();        System.out.println(Thread.currentThread().getName() + "正在执行....");    }}

缓存队列

 在后面咱们屡次提到了工作缓存队列,即workQueue,它用来寄存期待执行的工作。

  workQueue的类型为BlockingQueue<Runnable>,通常能够取上面三种类型:

  1)ArrayBlockingQueue:基于数组实现的有界阻塞队列,依照先进先出对数组进行排序,此队列创立时必须指定大小;

  2)LinkedBlockingQueue:基于链表的先进先出阻塞队列,依照先进先出对数组进行排序,如果创立时没有指定此队列大小,则默认为Integer.MAX_VALUE;

  3)synchronousQueue:这个队列比拟非凡,它不会保留提交的工作,而是将间接新建一个线程来执行新来的工作,不存储元素。

  4)DelayedWorkQueue:实现PriorityBlockingQueue实现提早获取的无界队列,创立元素时,能够指定多久能力从队列中获取以后元素,只有延时期满能力从队列中获取元素。

工作回绝策略

  当线程池的工作缓存队列已满并且线程池中的线程数目达到maximumPoolSize,如果还有工作到来就会采取工作回绝策略,通常有以下四种策略:

ThreadPoolExecutor.AbortPolicy:抛弃工作并抛出RejectedExecutionException异样。ThreadPoolExecutor.DiscardPolicy:也是抛弃工作,然而不抛出异样。ThreadPoolExecutor.DiscardOldestPolicy:抛弃队列最后面的工作,而后从新尝试执行工作(反复此过程)ThreadPoolExecutor.CallerRunsPolicy:由调用线程解决该工作