关于java:JAVA并发编程ThreadPoolExecutor的源码解读

31次阅读

共计 7276 个字符,预计需要花费 19 分钟才能阅读完成。

一、简介

ThreadPoolExecutor 是 Java 并发编程中应用最宽泛的类之一,因为线程的创立和销毁须要耗费系统资源,所以通过应用线程池来无效治理线程。

二、工作流程

当向线程池提交一个工作后,线程池是如何来解决的?

如上图:

  1. 判断外围线程池是否已满。如果不是,则创立一个新的工作线程来执行工作。如果已满,进入下个步骤
  2. 判断工作队列是否已满。如果工作队列没有满,则将新提交的工作存储在工作队列里。如果队列满了,进入下个步骤
  3. 判断线程池是否已满。如果没有,则创立一个新的工作线程来执行工作
  4. 如果线程池已满,则交给饱和策略来解决这个工作

三、源码解读

3.1 线程池参数

ThreadPoolExecutor 的结构函数参数含意:

  1. corePoolSize:外围线程数大小,当线程数 <corePoolSize,会创立线程(即便有闲暇的外围线程也会创立)来执行工作
  2. maximumPoolSize:最大线程数,当线程数 >= corePoolSize 的时候,会把工作放入工作队列中,当工作队列也满了之后,会再创立新的线程,直到达到最大线程数
  3. keepAliveTime:放弃存活工夫,当线程数大于 corePoolSize 的闲暇线程能放弃存活的最大工夫。
  4. unit:工夫单位
  5. workQueue:保留工作的阻塞队列
  6. threadFactory:创立线程的工厂
  7. handler:回绝策略(包含 4 种:间接抛出异样 – 默认、应用调用者所在线程来运行工作、抛弃队列里最近一个工作来执行当前任务、不解决间接抛弃

3.2 线程池运行状态、工作线程数量

ThreadPoolExecutor 很奇妙的应用了一个 AtomicInteger 类型的 ctl 变量,来示意 线程池运行状态 工作线程数量 高 3 位用来示意线程池运行状态,低 29 位用于示意工作线程数量 。线程池运行状态有 5 种,状态和其对应的高 3 位别离是 RUNNING(111)、SHUTDOWN(000)、STOP(001)、TIDYING(010) 和 TERMINATED(011)。runStateOf()和 workerCountOf()办法别离能够计算出线程池运行状态和工作线程数量。源码如下:

private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
private static final int COUNT_BITS = Integer.SIZE - 3;
private static final int CAPACITY   = (1 << COUNT_BITS) - 1;

// runState is stored in the high-order bits
private static final int RUNNING    = -1 << COUNT_BITS;
private static final int SHUTDOWN   =  0 << COUNT_BITS;
private static final int STOP       =  1 << COUNT_BITS;
private static final int TIDYING    =  2 << COUNT_BITS;
private static final int TERMINATED =  3 << COUNT_BITS;

// Packing and unpacking ctl
private static int runStateOf(int c)     {return c & ~CAPACITY;}
private static int workerCountOf(int c)  {return c & CAPACITY;}
private static int ctlOf(int rs, int wc) {return rs | wc;}

3.3 工作线程 Worker

ThreadPoolExecutor 外部有一个工作线程 Worker 类,它并不是继承 Thread 类,而是外部蕴含了一个 Thread 对象,而且在初始化该 Thread 对象时,还将以后 Worker 对象作为结构函数参数传入,造成了一种互相援用的关系。
工作线程 Worker 类也是设计的很奇妙,这个类继承了 AbstractQueuedSynchronizer 且实现了 Runnable 接口,实现 Runnable 接口是比拟容易了解的,它的 run()办法就是该工作线程的须要做的工作。而为什么要继承 AbstractQueuedSynchronizer 呢?这是用于后续判断该工作线程是否曾经在执行中,如果正在执行中,那么该工作线程就不能被中断。代码如下

private final class Worker
    extends AbstractQueuedSynchronizer
    implements Runnable
{
    /** Thread this worker is running in.  Null if factory fails. */
    final Thread thread;
    /** Initial task to run.  Possibly null. */
    Runnable firstTask;
    /** Per-thread task counter */
    volatile long completedTasks;
    
    Worker(Runnable firstTask) {setState(-1); // inhibit interrupts until runWorker
        this.firstTask = firstTask;
        this.thread = getThreadFactory().newThread(this);
    }

    /** Delegates main run loop to outer runWorker  */
    public void run() {runWorker(this);
    }
    // 上面是一些实现 AQS 须要实现的办法,不一一开展了
}

3.4 提交工作

咱们通常应用 execute()或 submit()来提交工作,那么 submit()外部也是应用 execute()办法来执行的,所以这里咱们看下 execute()办法的源码:

public void execute(Runnable command) {if (command == null)
        throw new NullPointerException();
    /*
     * 分为 3 个步骤
     *  1. 如果以后线程数小于外围线程池大小,会调用 addWorker 办法,addWorker 办法会依据运行状态和线程数量来判断是否须要新增工作线程
     *  2. 如果以后线程池运行状态是 RUNNING,尝试增加工作到工作队列 workQueue 中
     *  3. 如果无奈增加到工作队列中,持续调用 addWorker 办法,判断是否新增工作线程
     *  4. 最初如果还是无奈新增工作线程,则采取回绝策略
     */
    int c = ctl.get();
    if (workerCountOf(c) < corePoolSize) {if (addWorker(command, true))
            return;
        c = ctl.get();}
    if (isRunning(c) && workQueue.offer(command)) {int recheck = ctl.get();
        if (! isRunning(recheck) && remove(command))
            reject(command);
        else if (workerCountOf(recheck) == 0)
            addWorker(null, false);
    }
    else if (!addWorker(command, false))
        reject(command);
}

3.5 新增工作线程

通过下面的代码,很显著咱们接下来就须要看下 addWorker()是如何依据 运行状态和线程数量 来判断是否须要新增工作线程,并且这个办法也是 ThreadPoolExecutor 中最重要的办法。首先依据运行状态和线程数量来判断是否新增工作线程,如果判断不新增,间接返回 false,否则新增工作线程。源码如下:

private boolean addWorker(Runnable firstTask, boolean core) {
    retry:
    for (;;) {int c = ctl.get();
        int rs = runStateOf(c);

        /*
         * 1. 如果运行状态是 STOP、TIDYING 和 TERMINATED,那么间接返回 false,新增工作线程失败
         * 2. 如果运行状态是 RUNNING,间接进行下一步
        * 3. 如果运行状态是 SHUTDOWN,当 firstTask 不为空(代表新来的工作),也间接返回 false,新增工作线程失败。*    firstTask 为空的话,如果工作队列为空,也不新增工作线程,反之新增工作线程。*/
        if (rs >= SHUTDOWN &&
            ! (rs == SHUTDOWN &&
               firstTask == null &&
               ! workQueue.isEmpty()))
            return false;
            
        /*
         * 这里次要是判断工作线程数量是否大于外围线程数(core 为 true)或大于最大线程数(core 为 false),如果大于,则增加工作线程失败
         * 否则将 ctl 的值加 1,也就是将工作线程数量加 1,并且跳出两层循环
         */    
        for (;;) {int wc = workerCountOf(c);
            if (wc >= CAPACITY ||
                wc >= (core ? corePoolSize : maximumPoolSize))
                return false;
            if (compareAndIncrementWorkerCount(c))
                break retry;
            c = ctl.get();  // Re-read ctl
            if (runStateOf(c) != rs)
                continue retry;
            // else CAS failed due to workerCount change; retry inner loop
        }
    }

    boolean workerStarted = false;
    boolean workerAdded = false;
    Worker w = null;
    try {
        
        // new 一个工作线程
        w = new Worker(firstTask);
        final Thread t = w.thread;
        if (t != null) {
            // 获取线程池的锁
            final ReentrantLock mainLock = this.mainLock;
            mainLock.lock();
            try {int rs = runStateOf(ctl.get());
                // 再次查看线程池运行状态
                if (rs < SHUTDOWN ||
                    (rs == SHUTDOWN && firstTask == null)) {if (t.isAlive()) // precheck that t is startable
                        throw new IllegalThreadStateException();
                    workers.add(w);
                    int s = workers.size();
                    // 如果当前工作线程数大于工作线程数的历史最大值,则更新历史最大值
                    if (s > largestPoolSize)
                        largestPoolSize = s;
                    workerAdded = true;
                }
            } finally {mainLock.unlock();
            }
            if (workerAdded) {
                // 启动工作线程
                t.start();
                workerStarted = true;
            }
        }
    } finally {
        // 如果新增工作线程失败,将 ctl 的值减 1,也就是将工作线程数量减 1
        if (! workerStarted)
            addWorkerFailed(w);
    }
    return workerStarted;
}

3.6 工作线程的工作流程

工作线程 Worker 的 run()办法中的 runWroker(this)就是其工作流程,如果 firstTask 不为空,间接执行 firstTask,否则从工作队列中获取工作(如果须要回收工作线程,则应用超时获取工作机制,一旦获取工作超时,则回收工作线程,如果不须要回收工作线程,工作线程将会始终阻塞,直到获取工作),代码如下

final void runWorker(Worker w) {Thread wt = Thread.currentThread();
    Runnable task = w.firstTask;
    w.firstTask = null;
    w.unlock(); // allow interrupts
    boolean completedAbruptly = true;
    try {
        // 如果 firstTask 不为空,间接执行,否则从工作队列中获取工作(超时获取或阻塞获取)while (task != null || (task = getTask()) != null) {
            // 获取工作线程的锁,代表工作线程不闲暇
            w.lock();
            
            // 如果运行状态大于等于 STOP,中断当前工作线程
            if ((runStateAtLeast(ctl.get(), STOP) ||
                 (Thread.interrupted() &&
                  runStateAtLeast(ctl.get(), STOP))) &&
                !wt.isInterrupted())
                wt.interrupt();
            try {
                // 钩子函数,能够继承 ThreadPoolExecutor,并重写该办法,通常用来监控工作执行
                beforeExecute(wt, task);
                Throwable thrown = null;
                try {
                    // 具体任务的执行
                    task.run();} catch (RuntimeException x) {thrown = x; throw x;} catch (Error x) {thrown = x; throw x;} catch (Throwable x) {thrown = x; throw new Error(x);
                } finally {// 钩子函数,同 beforeExecute()
                    afterExecute(task, thrown);
                }
            } finally {
                task = null;
                w.completedTasks++;
                w.unlock();}
        }
        // 工作是否意外终止
        completedAbruptly = false;
    } finally {
        // 退出工作线程
        processWorkerExit(w, completedAbruptly);
    }
}

退出工作线程可分为 失常退出或工作异样 导致,当 completedAbruptly 为 true 时,代表是工作异样导致退出工作线程,completedAbruptly 为 false,代表工作队列为空,且失常回收线程(当 allowCoreThreadTimeOut 为 false 时,线程数量大于外围线程数,或当 allowCoreThreadTimeOut 为 true 时,线程数量大于 0),代码如下:

private void processWorkerExit(Worker w, boolean completedAbruptly) {// 如果工作异样导致工作线程退出,工作线程数减 1,如果工作线程失常退出时,在 getTask()办法中曾经将工作线程数减 1
    if (completedAbruptly)
        decrementWorkerCount();

    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        completedTaskCount += w.completedTasks;
        // 移除工作线程
        workers.remove(w);
    } finally {mainLock.unlock();
    }

    tryTerminate();

    int c = ctl.get();
    // 如果运行状态小于 STOP,判断是否须要新增工作线程来解决
    if (runStateLessThan(c, STOP)) {
        // 如果工作线程失常退出
        if (!completedAbruptly) {
            int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
            if (min == 0 && ! workQueue.isEmpty())
                min = 1;
            // 如果工作线程曾经大于等于 min 值,那么不须要执行上面的 addWork(),间接返回
            if (workerCountOf(c) >= min)
                return; // replacement not needed
        }
        // 执行 addWorker()来新增工作线程
        addWorker(null, false);
    }
}

3.7 敞开线程池

咱们通常会应用 shutdown()或 shutdownNow()办法来敞开线程池。

  • shutdown 是将线程池的状态设置为 SHUTDOWN 状态,不承受新的工作,正在执行的工作和队列中的工作会继续执行完,如果工作队列为空,则中断并退出闲暇的工作线程
  • shutdownNow 将线程池的状态设置成 STOP,不承受新的工作,且尝试中断所有正在执行的工作,不执行并返回工作队列中的工作列表
    shutdown() 和 shutdownNow()办法相似,这里只看 shutdown()办法,代码如下:
public void shutdown() {
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {checkShutdownAccess();
        // 更新运行状态为 SHUTDOWN
        advanceRunState(SHUTDOWN);
        // 中断闲暇的工作线程,因为工作队列为空,所以该工作线程会退出
        interruptIdleWorkers();
        // 钩子函数
        onShutdown(); // hook for ScheduledThreadPoolExecutor} finally {mainLock.unlock();
    }
    /**
     * 尝试终止线程池
     * 当运行状态为 SHUTDOWN 且工作队列为空且工作线程数为 0,或者运行状态为 STOP 且工作线程数为 0
     * 代表线程池敞开,执行 terminated 函数,这也是个钩子函数
     */
    tryTerminate();}

四、总结

以上解读的就是 ThreadPoolExecutor 中最外围的代码,包含提交工作、新增工作线程、工作线程的工作流程以及线程池的敞开等,了解了这些代码,咱们就能够晓得 ThreadPoolExecutor 线程池具体是如何工作的,也能在工作中更加得心应手地去应用它。文中若有谬误,欢送斧正。

正文完
 0