一、简介

ThreadPoolExecutor是Java并发编程中应用最宽泛的类之一,因为线程的创立和销毁须要耗费系统资源,所以通过应用线程池来无效治理线程。

二、工作流程

当向线程池提交一个工作后,线程池是如何来解决的?

如上图:

  1. 判断外围线程池是否已满。如果不是,则创立一个新的工作线程来执行工作。如果已满,进入下个步骤
  2. 判断工作队列是否已满。如果工作队列没有满,则将新提交的工作存储在工作队列里。如果队列满了,进入下个步骤
  3. 判断线程池是否已满。如果没有,则创立一个新的工作线程来执行工作
  4. 如果线程池已满,则交给饱和策略来解决这个工作

三、源码解读

3.1 线程池参数

ThreadPoolExecutor的结构函数参数含意:

  1. corePoolSize:外围线程数大小,当线程数<corePoolSize ,会创立线程(即便有闲暇的外围线程也会创立)来执行工作
  2. maximumPoolSize:最大线程数, 当线程数 >= corePoolSize的时候,会把工作放入工作队列中,当工作队列也满了之后,会再创立新的线程,直到达到最大线程数
  3. keepAliveTime :放弃存活工夫,当线程数大于corePoolSize的闲暇线程能放弃存活的最大工夫。
  4. unit:工夫单位
  5. workQueue:保留工作的阻塞队列
  6. threadFactory:创立线程的工厂
  7. handler:回绝策略 (包含4种:间接抛出异样 - 默认、应用调用者所在线程来运行工作、抛弃队列里最近一个工作来执行当前任务、不解决间接抛弃

3.2 线程池运行状态、工作线程数量

ThreadPoolExecutor很奇妙的应用了一个AtomicInteger类型的ctl变量,来示意线程池运行状态工作线程数量高3位用来示意线程池运行状态,低29位用于示意工作线程数量。线程池运行状态有5种,状态和其对应的高3位别离是RUNNING(111)、SHUTDOWN(000)、STOP(001)、TIDYING(010)和TERMINATED(011)。runStateOf()和workerCountOf()办法别离能够计算出线程池运行状态和工作线程数量。源码如下:

private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));private static final int COUNT_BITS = Integer.SIZE - 3;private static final int CAPACITY   = (1 << COUNT_BITS) - 1;// runState is stored in the high-order bitsprivate static final int RUNNING    = -1 << COUNT_BITS;private static final int SHUTDOWN   =  0 << COUNT_BITS;private static final int STOP       =  1 << COUNT_BITS;private static final int TIDYING    =  2 << COUNT_BITS;private static final int TERMINATED =  3 << COUNT_BITS;// Packing and unpacking ctlprivate static int runStateOf(int c)     { return c & ~CAPACITY; }private static int workerCountOf(int c)  { return c & CAPACITY; }private static int ctlOf(int rs, int wc) { return rs | wc; }

3.3 工作线程Worker

ThreadPoolExecutor外部有一个工作线程Worker类,它并不是继承Thread类,而是外部蕴含了一个Thread对象,而且在初始化该Thread对象时,还将以后Worker对象作为结构函数参数传入,造成了一种互相援用的关系。
工作线程Worker类也是设计的很奇妙,这个类继承了AbstractQueuedSynchronizer且实现了Runnable接口,实现Runnable接口是比拟容易了解的,它的run()办法就是该工作线程的须要做的工作。而为什么要继承AbstractQueuedSynchronizer呢?这是用于后续判断该工作线程是否曾经在执行中,如果正在执行中,那么该工作线程就不能被中断。代码如下

private final class Worker    extends AbstractQueuedSynchronizer    implements Runnable{    /** Thread this worker is running in.  Null if factory fails. */    final Thread thread;    /** Initial task to run.  Possibly null. */    Runnable firstTask;    /** Per-thread task counter */    volatile long completedTasks;        Worker(Runnable firstTask) {        setState(-1); // inhibit interrupts until runWorker        this.firstTask = firstTask;        this.thread = getThreadFactory().newThread(this);    }    /** Delegates main run loop to outer runWorker  */    public void run() {        runWorker(this);    }    // 上面是一些实现AQS须要实现的办法,不一一开展了}

3.4 提交工作

咱们通常应用execute()或submit()来提交工作,那么submit()外部也是应用execute()办法来执行的,所以这里咱们看下execute()办法的源码:

public void execute(Runnable command) {    if (command == null)        throw new NullPointerException();    /*     * 分为3个步骤     *  1. 如果以后线程数小于外围线程池大小,会调用addWorker办法,addWorker办法会依据运行状态和线程数量来判断是否须要新增工作线程     *  2. 如果以后线程池运行状态是RUNNING,尝试增加工作到工作队列workQueue中     *  3. 如果无奈增加到工作队列中,持续调用addWorker办法,判断是否新增工作线程     *  4. 最初如果还是无奈新增工作线程,则采取回绝策略     */    int c = ctl.get();    if (workerCountOf(c) < corePoolSize) {        if (addWorker(command, true))            return;        c = ctl.get();    }    if (isRunning(c) && workQueue.offer(command)) {        int recheck = ctl.get();        if (! isRunning(recheck) && remove(command))            reject(command);        else if (workerCountOf(recheck) == 0)            addWorker(null, false);    }    else if (!addWorker(command, false))        reject(command);}

3.5 新增工作线程

通过下面的代码,很显著咱们接下来就须要看下addWorker()是如何依据运行状态和线程数量来判断是否须要新增工作线程,并且这个办法也是ThreadPoolExecutor中最重要的办法。首先依据运行状态和线程数量来判断是否新增工作线程,如果判断不新增,间接返回false,否则新增工作线程。源码如下:

private boolean addWorker(Runnable firstTask, boolean core) {    retry:    for (;;) {        int c = ctl.get();        int rs = runStateOf(c);        /*         * 1. 如果运行状态是STOP、TIDYING和TERMINATED,那么间接返回false,新增工作线程失败         * 2. 如果运行状态是RUNNING,间接进行下一步        * 3. 如果运行状态是SHUTDOWN,当firstTask不为空(代表新来的工作),也间接返回false,新增工作线程失败。        *    firstTask为空的话,如果工作队列为空,也不新增工作线程,反之新增工作线程。        */        if (rs >= SHUTDOWN &&            ! (rs == SHUTDOWN &&               firstTask == null &&               ! workQueue.isEmpty()))            return false;                    /*         * 这里次要是判断工作线程数量是否大于外围线程数(core为true)或大于最大线程数(core为false),如果大于,则增加工作线程失败         * 否则将ctl的值加1,也就是将工作线程数量加1,并且跳出两层循环         */            for (;;) {            int wc = workerCountOf(c);            if (wc >= CAPACITY ||                wc >= (core ? corePoolSize : maximumPoolSize))                return false;            if (compareAndIncrementWorkerCount(c))                break retry;            c = ctl.get();  // Re-read ctl            if (runStateOf(c) != rs)                continue retry;            // else CAS failed due to workerCount change; retry inner loop        }    }    boolean workerStarted = false;    boolean workerAdded = false;    Worker w = null;    try {                // new一个工作线程        w = new Worker(firstTask);        final Thread t = w.thread;        if (t != null) {            // 获取线程池的锁            final ReentrantLock mainLock = this.mainLock;            mainLock.lock();            try {                int rs = runStateOf(ctl.get());                // 再次查看线程池运行状态                if (rs < SHUTDOWN ||                    (rs == SHUTDOWN && firstTask == null)) {                    if (t.isAlive()) // precheck that t is startable                        throw new IllegalThreadStateException();                    workers.add(w);                    int s = workers.size();                    // 如果当前工作线程数大于工作线程数的历史最大值,则更新历史最大值                    if (s > largestPoolSize)                        largestPoolSize = s;                    workerAdded = true;                }            } finally {                mainLock.unlock();            }            if (workerAdded) {                // 启动工作线程                t.start();                workerStarted = true;            }        }    } finally {        // 如果新增工作线程失败,将ctl的值减1,也就是将工作线程数量减1        if (! workerStarted)            addWorkerFailed(w);    }    return workerStarted;}

3.6 工作线程的工作流程

工作线程Worker的run()办法中的runWroker(this)就是其工作流程,如果firstTask不为空,间接执行firstTask,否则从工作队列中获取工作(如果须要回收工作线程,则应用超时获取工作机制,一旦获取工作超时,则回收工作线程,如果不须要回收工作线程,工作线程将会始终阻塞,直到获取工作),代码如下

final void runWorker(Worker w) {    Thread wt = Thread.currentThread();    Runnable task = w.firstTask;    w.firstTask = null;    w.unlock(); // allow interrupts    boolean completedAbruptly = true;    try {        // 如果firstTask不为空,间接执行,否则从工作队列中获取工作(超时获取或阻塞获取)        while (task != null || (task = getTask()) != null) {            // 获取工作线程的锁,代表工作线程不闲暇            w.lock();                        // 如果运行状态大于等于STOP,中断当前工作线程            if ((runStateAtLeast(ctl.get(), STOP) ||                 (Thread.interrupted() &&                  runStateAtLeast(ctl.get(), STOP))) &&                !wt.isInterrupted())                wt.interrupt();            try {                // 钩子函数,能够继承ThreadPoolExecutor,并重写该办法,通常用来监控工作执行                beforeExecute(wt, task);                Throwable thrown = null;                try {                    // 具体任务的执行                    task.run();                } catch (RuntimeException x) {                    thrown = x; throw x;                } catch (Error x) {                    thrown = x; throw x;                } catch (Throwable x) {                    thrown = x; throw new Error(x);                } finally {                    // 钩子函数,同beforeExecute()                    afterExecute(task, thrown);                }            } finally {                task = null;                w.completedTasks++;                w.unlock();            }        }        // 工作是否意外终止        completedAbruptly = false;    } finally {        // 退出工作线程        processWorkerExit(w, completedAbruptly);    }}

退出工作线程可分为失常退出或工作异样导致,当completedAbruptly为true时,代表是工作异样导致退出工作线程,completedAbruptly为false,代表工作队列为空,且失常回收线程(当allowCoreThreadTimeOut为false时,线程数量大于外围线程数,或当allowCoreThreadTimeOut为true时,线程数量大于0),代码如下:

private void processWorkerExit(Worker w, boolean completedAbruptly) {    // 如果工作异样导致工作线程退出,工作线程数减1,如果工作线程失常退出时,在getTask()办法中曾经将工作线程数减1    if (completedAbruptly)        decrementWorkerCount();    final ReentrantLock mainLock = this.mainLock;    mainLock.lock();    try {        completedTaskCount += w.completedTasks;        // 移除工作线程        workers.remove(w);    } finally {        mainLock.unlock();    }    tryTerminate();    int c = ctl.get();    // 如果运行状态小于STOP,判断是否须要新增工作线程来解决    if (runStateLessThan(c, STOP)) {        // 如果工作线程失常退出        if (!completedAbruptly) {            int min = allowCoreThreadTimeOut ? 0 : corePoolSize;            if (min == 0 && ! workQueue.isEmpty())                min = 1;            // 如果工作线程曾经大于等于min值,那么不须要执行上面的addWork(),间接返回            if (workerCountOf(c) >= min)                return; // replacement not needed        }        // 执行addWorker()来新增工作线程        addWorker(null, false);    }}

3.7 敞开线程池

咱们通常会应用shutdown()或shutdownNow()办法来敞开线程池。

  • shutdown是将线程池的状态设置为SHUTDOWN状态,不承受新的工作,正在执行的工作和队列中的工作会继续执行完,如果工作队列为空,则中断并退出闲暇的工作线程
  • shutdownNow将线程池的状态设置成STOP,不承受新的工作,且尝试中断所有正在执行的工作,不执行并返回工作队列中的工作列表
    shutdown()和shutdownNow()办法相似,这里只看shutdown()办法,代码如下:
public void shutdown() {    final ReentrantLock mainLock = this.mainLock;    mainLock.lock();    try {        checkShutdownAccess();        // 更新运行状态为SHUTDOWN        advanceRunState(SHUTDOWN);        // 中断闲暇的工作线程,因为工作队列为空,所以该工作线程会退出        interruptIdleWorkers();        // 钩子函数        onShutdown(); // hook for ScheduledThreadPoolExecutor    } finally {        mainLock.unlock();    }    /**     * 尝试终止线程池     * 当运行状态为SHUTDOWN且工作队列为空且工作线程数为0,或者运行状态为STOP且工作线程数为0     * 代表线程池敞开,执行terminated函数,这也是个钩子函数     */    tryTerminate();}

四、总结

以上解读的就是ThreadPoolExecutor中最外围的代码,包含提交工作、新增工作线程、工作线程的工作流程以及线程池的敞开等,了解了这些代码,咱们就能够晓得ThreadPoolExecutor线程池具体是如何工作的,也能在工作中更加得心应手地去应用它。文中若有谬误,欢送斧正。