共计 9491 个字符,预计需要花费 24 分钟才能阅读完成。
在上一篇文章《从 0 到 1 玩转线程池》中,我们了解了线程池的使用方法,以及向线程池中提交任务的完整流程和 ThreadPoolExecutor.execute 方法的源代码。在这篇文章中,我们将会从头阅读线程池 ThreadPoolExecutor 类的源代码,深入剖析线程池从提交任务到执行任务的完整流程,从而建立起完整的线程池运行模型。
查看 JDK 源码的方式
在 IDE 中,例如 IDEA 里,我们可以点击我们样例代码里的 ThreadPoolExecutor 类跳转到 JDK 中 ThreadPoolExecutor 类的源代码。在源代码中我们可以看到很多 java.util.concurrent 包的缔造者大牛“Doug Lea”所留下的各种注释,下面的图片就是该类源代码的一个截图。
这些注释的内容非常有参考价值,建议有能力的读者朋友可以自己阅读一遍。下面,我们就开始阅读 ThreadPoolExecutor 的源代码吧。
控制变量与线程池生命周期
在 ThreadPoolExecutor 类定义的开头,我们可以看到如下的几行代码:
// 控制变量,前 3 位表示状态,剩下的数据位表示有效的线程数
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
// Integer 的位数减去 3 位状态位就是线程数的位数
private static final int COUNT_BITS = Integer.SIZE – 3;
// CAPACITY 就是线程数的上限(含),即 2^COUNT_BITS – 1 个
private static final int CAPACITY = (1 << COUNT_BITS) – 1;
第一行是一个用来作为控制变量的整型值,即一个 Integer。之所以要用 AtomicInteger 类是因为要保证多线程安全,在本系列之后的文章中会对 AtomicInteger 进行具体介绍。一个整型一般是 32 位,但是这里的代码为了保险起见,还是使用了 Integer.SIZE 来表示整型的总位数。这里的“位”指的是数据位 (bit),在计算机中,8bit = 1 字节,1024 字节 = 1KB,1024KB = 1MB。每一位都是一个 0 或 1 的数字,我们如果把整型想象成一个二进制(0 或 1) 的数组,那么一个 Integer 就是 32 个数字的数组。其中,前三个被用来表示状态,那么我们就可以表示 2^3 = 8 个不同的状态了。剩下的 29 位二进制数字都会被用于表示当前线程池中有效线程的数量,上限就是 (2^29 – 1) 个,即常量 CAPACITY。
之后的部分列出了线程池的所有状态:
private static final int RUNNING = -1 << COUNT_BITS;
private static final int SHUTDOWN = 0 << COUNT_BITS;
private static final int STOP = 1 << COUNT_BITS;
private static final int TIDYING = 2 << COUNT_BITS;
private static final int TERMINATED = 3 << COUNT_BITS;
在这里可以忽略数字后面的 << COUNT_BITS,可以把状态简单地理解为前面的数字部分,这样的简化基本不影响结论。
各个状态的解释如下:
RUNNING,正常运行状态,可以接受新的任务和处理队列中的任务
SHUTDOWN,关闭中状态,不能接受新任务,但是可以处理队列中的任务
STOP,停止中状态,不能接受新任务,也不处理队列中的任务,会中断进行中的任务
TIDYING,待结束状态,所有任务已经结束,线程数归 0,进入 TIDYING 状态后将会运行 terminated()方法
TERMINATED,结束状态,terminated()方法调用完成后进入
这几个状态所对应的数字值是按照顺序排列的,也就是说线程池的状态只能从小到大变化,这也方便了通过数字比较来判断状态所在的阶段,这种通过数字大小来比较状态值的方法在 ThreadPoolExecutor 的源码中会有大量的使用。
下图是这五个状态之间的变化过程:
当线程池被创建时会处于 RUNNING 状态,正常接受和处理任务;
当 shutdown()方法被直接调用,或者在线程池对象被 GC 回收时通过 finalize()方法隐式调用了 shutdown()方法时,线程池会进入 SHUTDOWN 状态。该状态下线程池仍然会继续执行完阻塞队列中的任务,只是不再接受新的任务了。当队列中的任务被执行完后,线程池中的线程也会被回收。当队列和线程都被清空后,线程池将进入 TIDYING 状态;
在线程池处于 RUNNING 或者 SHUTDOWN 状态时,如果有代码调用了 shutdownNow()方法,则线程池会进入 STOP 状态。在 STOP 状态下,线程池会直接清空阻塞队列中待执行的任务,然后中断所有正在进行中的任务并回收线程。当线程都被清空以后,线程池就会进入 TIDYING 状态;
当线程池进入 TIDYING 状态时,将会运行 terminated()方法,该方法执行完后,线程池就会进入最终的 TERMINATED 状态,彻底结束。
到这里我们就已经清楚地了解了线程从刚被创建时的 RUNNING 状态一直到最终的 TERMINATED 状态的整个生命周期了。那么当我们要向一个 RUNNING 状态的线程池提交任务时会发生些什么呢?
execute 方法的实现
我们一般会使用 execute 方法提交我们的任务,那么线程池在这个过程中做了什么呢?在 ThreadPoolExecutor 类的 execute()方法的源代码中,我们主要做了四件事:
如果当前线程池中的线程数小于核心线程数 corePoolSize,则通过 threadFactory 创建一个新的线程,并把入参中的任务作为第一个任务传入该线程;
如果当前线程池中的线程数已经达到了核心线程数 corePoolSize,那么就会通过阻塞队列 workerQueue 的 offer 方法来将任务添加到队列中保存,并等待线程空闲后进行执行;
如果线程数已经达到了 corePoolSize 且阻塞队列中无法插入该任务(比如已满),那么线程池就会再增加一个线程来执行该任务,除非线程数已经达到了最大线程数 maximumPoolSize;
如果确实已经达到了最大线程数,那么就会通过拒绝策略对象 handler 拒绝这个任务。
总体上的执行流程如下,下方的黑色同心圆代表流程结束:
这里解释一下阻塞队列的定义,方便大家阅读:
线程池中的阻塞队列专门用于存放需要等待线程空闲的待执行任务,而阻塞队列是这样的一种数据结构,它是一个队列(类似于一个 List),可以存放 0 到 N 个元素。我们可以对这个队列进行插入和弹出元素的操作,弹出操作可以理解为是一个获取并从队列中删除一个元素的操作。当队列中没有元素时,对这个队列的获取操作将会被阻塞,直到有元素被插入时才会被唤醒;当队列已满时,对这个队列的插入操作将会被阻塞,直到有元素被弹出后才会被唤醒。这样的一种数据结构非常适合于线程池的场景,当一个工作线程没有任务可处理时就会进入阻塞状态,直到有新任务提交后才被唤醒。
线程池中常用的阻塞队列一般有三种类型:直连队列、无界队列、有界队列。不同的阻塞队列类型会被线程池的行为产生不同的影响,有兴趣的读者可以在上一篇文章《从 0 到 1 玩转线程池》中找到不同类型阻塞队列的具体解释。
下面是带有注释的源代码,大家可以和上面的流程对照起来参考一下:
public void execute(Runnable command) {
// 检查提交的任务是否为空
if (command == null)
throw new NullPointerException();
// 获取控制变量值
int c = ctl.get();
// 检查当前线程数是否达到了核心线程数
if (workerCountOf(c) < corePoolSize) {
// 未达到核心线程数,则创建新线程
// 并将传入的任务作为该线程的第一个任务
if (addWorker(command, true))
// 添加线程成功则直接返回,否则继续执行
return;
// 因为前面调用了耗时操作 addWorker 方法
// 所以线程池状态有可能发生了改变,重新获取状态值
c = ctl.get();
}
// 判断线程池当前状态是否是运行中
// 如果是则调用 workQueue.offer 方法将任务放入阻塞队列
if (isRunning(c) && workQueue.offer(command)) {
// 因为执行了耗时操作“放入阻塞队列”,所以重新获取状态值
int recheck = ctl.get();
// 如果当前状态不是运行中,则将刚才放入阻塞队列的任务拿出,如果拿出成功,则直接拒绝这个任务
if (! isRunning(recheck) && remove(command))
reject(command);
else if (workerCountOf(recheck) == 0)
// 如果线程池中没有线程了,那就创建一个
addWorker(null, false);
}
// 如果放入阻塞队列失败(如队列已满),则添加一个线程
else if (!addWorker(command, false))
// 如果添加线程失败(如已经达到了最大线程数),则拒绝任务
reject(command);
}
从上面的源码中我们可以知道,当一个任务被通过 ThreadPoolExecutor 的 execute 方法提交到线程池中执行时,这个任务有可能以两种方式被执行:
直接在创建一个新的 Worker 时被作为第一个任务传入,由这个新创建的线程来执行;
把任务放入一个阻塞队列,等待线程池中的工作线程 Worker 捞取任务进行执行。
这里的这个 Worker 指的就是 ThreadPoolExecutor.Worker 类,这是一个 ThreadPoolExecutor 的内部类,用于对基础线程类 Thread 进行包装和对线程进行管理。那么线程池到底是怎么利用 Worker 类来实现持续不断地接收提交的任务并执行的呢?接下来,我们通过 ThreadPoolExecutor 的源代码来一步一步抽丝剥茧,揭开线程池运行模型的神秘面纱。
addWorker 方法
在上文中的 execute 方法的代码中我们可以看到线程池是通过 addWorker 方法来向线程池中添加新线程的,那么新的线程又是如何运行起来的呢?
这里我们暂时跳过 addWorker 方法的详细源代码,因为虽然这个方法的代码行数较多,但是功能相对比较直接,只是通过 new Worker(firstTask)创建了一个代表线程的 Worker 对象,然后调用了这个对象所包含的 Thread 对象的 start()方法。
我们知道一旦调用了 Thread 类的 start()方法,则这个线程就会开始执行创建线程时传入的 Runnable 对象。从下面的 Worker 类构造器源代码可以看出,Worker 类正是把自己 (this 引用) 传入了线程的构造器当中,所以这个线程启动后就会执行 Worker 类的 run()方法了,而在 Worker 的 run()方法中只执行了一行很简单的代码 runWorker(this)。
Worker(Runnable firstTask) {
setState(-1); // inhibit interrupts until runWorker
this.firstTask = firstTask;
this.thread = getThreadFactory().newThread(this);
}
public void run() {
runWorker(this);
}
runWorker 方法的实现
我们看到线程池中的线程在启动时会调用对应的 Worker 类的 runWorker 方法,而这里就是整个线程池任务执行的核心所在了。runWorker 方法中包含有一个类似无限循环的 while 语句,让 worker 对象可以一直持续不断地执行提交到线程池中的新任务或者等待下一个新任务的提交。
大家可以配合代码上带有的注释来理解该方法的具体实现:
final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
Runnable task = w.firstTask;
w.firstTask = null;
// 将 worker 的状态重置为正常状态,因为 state 状态值在构造器中被初始化为 -1
w.unlock();
// 通过 completedAbruptly 变量的值判断任务是否正常执行完成
boolean completedAbruptly = true;
try {
// 如果 task 为 null 就通过 getTask 方法获取阻塞队列中的下一个任务
// getTask 方法一般不会返回 null,所以这个 while 类似于一个无限循环
// worker 对象就通过这个方法的持续运行来不断处理新的任务
while (task != null || (task = getTask()) != null) {
// 每一次任务的执行都必须获取锁来保证下方临界区代码的线程安全
w.lock();
// 如果状态值大于等于 STOP(状态值是有序的,即 STOP、TIDYING、TERMINATED)
// 且当前线程还没有被中断,则主动中断线程
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt();
// 开始
try {
// 执行任务前处理操作,默认是一个空实现
// 在子类中可以通过重写来改变任务执行前的处理行为
beforeExecute(wt, task);
// 通过 thrown 变量保存任务执行过程中抛出的异常
// 提供给下面 finally 块中的 afterExecute 方法使用
Throwable thrown = null;
try {
// *** 重要:实际执行任务的代码
task.run();
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
// 因为 Runnable 接口的 run 方法中不能抛出 Throwable 对象
// 所以要包装成 Error 对象抛出
thrown = x; throw new Error(x);
} finally {
// 执行任务后处理操作,默认是一个空实现
// 在子类中可以通过重写来改变任务执行后的处理行为
afterExecute(task, thrown);
}
} finally {
// 将循环变量 task 设置为 null,表示已处理完成
task = null;
// 累加当前 worker 已经完成的任务数
w.completedTasks++;
// 释放 while 体中第一行获取的锁
w.unlock();
}
}
// 将 completedAbruptly 变量设置为 false,表示任务正常处理完成
completedAbruptly = false;
} finally {
// 销毁当前的 worker 对象,并完成一些诸如完成任务数量统计之类的辅助性工作
// 在线程池当前状态小于 STOP 的情况下会创建一个新的 worker 来替换被销毁的 worker
processWorkerExit(w, completedAbruptly);
}
}
在 runWorker 方法的源代码中有两个比较重要的方法调用,一个是 while 条件中对 getTask 方法的调用,一个是在方法的最后对 processWorkerExit 方法的调用。下面是对这两个方法更详细的解释。
getTask 方法在阻塞队列中有待执行的任务时会从队列中弹出一个任务并返回,如果阻塞队列为空,那么就会阻塞等待新的任务提交到队列中直到超时(在一些配置下会一直等待而不超时),如果在超时之前获取到了新的任务,那么就会将这个任务作为返回值返回。所以一般 getTask 方法是不会返回 null 的,只会阻塞等待下一个任务并在之后将这个新任务作为返回值返回。
当 getTask 方法返回 null 时会导致当前 Worker 退出,当前线程被销毁。在以下情况下 getTask 方法才会返回 null:
当前线程池中的线程数超过了最大线程数。这是因为运行时通过调用 setMaximumPoolSize 修改了最大线程数而导致的结果;
线程池处于 STOP 状态。这种情况下所有线程都应该被立即回收销毁;
线程池处于 SHUTDOWN 状态,且阻塞队列为空。这种情况下已经不会有新的任务被提交到阻塞队列中了,所以线程应该被销毁;
线程可以被超时回收的情况下等待新任务超时。线程被超时回收一般有以下两种情况:
超出核心线程数部分的线程等待任务超时
允许核心线程超时(线程池配置)的情况下线程等待任务超时
processWorkerExit 方法会销毁当前线程对应的 Worker 对象,并执行一些累加总处理任务数等辅助操作,但在线程池当前状态小于 STOP 的情况下会创建一个新的 Worker 来替换被销毁的 Worker。
对 getTask 和 processWorkerExit 方法源代码感兴趣的读者可以阅读下一节来具体了解一下,不过跳过这一节也是完全可以的。
getTask 与 processWorkerExit 方法源代码
以下是 getTask 与 processWorkerExit 两个方法的带有中文解释的源代码:
private Runnable getTask() {
// 通过 timeOut 变量表示线程是否空闲时间超时了
boolean timedOut = false;
// 无限循环
for (;;) {
// 获取线程池状态
int c = ctl.get();
int rs = runStateOf(c);
// Check if queue empty only if necessary.
// 如果 线程池状态 >=STOP
// 或者 (线程池状态 ==SHUTDOWN && 阻塞队列为空)
// 则直接减少一个 worker 计数并返回 null(返回 null 会导致当前 worker 被销毁)
if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
decrementWorkerCount();
return null;
}
// 获取线程池中的 worker 计数
int wc = workerCountOf(c);
// 判断当前线程是否会被超时销毁
// 会被超时销毁的情况:线程池允许核心线程超时 或 当前线程数大于核心线程数
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
// 如果 (当前线程数大于最大线程数 或 (允许超时销毁 且 当前发生了空闲时间超时))
// 且 (当前线程数大于 1 或 阻塞队列为空) —— 该条件在阻塞队列不为空的情况下保证至少会保留一个线程继续处理任务
// 则 减少 worker 计数并返回 null(返回 null 会导致当前 worker 被销毁)
if ((wc > maximumPoolSize || (timed && timedOut))
&& (wc > 1 || workQueue.isEmpty())) {
if (compareAndDecrementWorkerCount(c))
return null;
continue;
}
try {
// 从阻塞队列中取出一个任务(如果队列为空会进入阻塞等待状态)
// 如果允许空闲超时销毁线程的话则带有一个等待的超时时间
Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
workQueue.take();
// 如果获取到了任务就直接返回该任务,返回后会开始执行该任务
if (r != null)
return r;
// 如果任务为 null,则说明发生了等待超时,将空闲时间超时标志设置为 true
timedOut = true;
} catch (InterruptedException retry) {
// 如果等待被中断了,那说明空闲时间(等待任务的时间)还没有超时
timedOut = false;
}
}
}
processWorkerExit 方法的源代码:
private void processWorkerExit(Worker w, boolean completedAbruptly) {
// 如果 completedAbruptly 为 true 则表示任务执行过程中抛出了未处理的异常
// 所以还没有正确地减少 worker 计数,这里需要减少一次 worker 计数
if (completedAbruptly)
decrementWorkerCount();
// 获取线程池的主锁
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// 把将被销毁的线程已完成的任务数累计到线程池的完成任务总数上
completedTaskCount += w.completedTasks;
// 从 worker 集合中去掉将会销毁的 worker
workers.remove(w);
} finally {
// 释放线程池主锁
mainLock.unlock();
}
// 尝试结束线程池
// 这里是为了在关闭线程池时等到所有 worker 都被回收后再结束线程池
tryTerminate();
int c = ctl.get();
// 如果线程池状态 < STOP,即 RUNNING 或 SHUTDOWN
// 则需要考虑创建新线程来代替被销毁的线程
if (runStateLessThan(c, STOP)) {
// 如果 worker 是正常执行完的,则要判断一下是否已经满足了最小线程数要求
// 否则直接创建替代线程
if (!completedAbruptly) {
// 如果允许核心线程超时则最小线程数是 0,否则最小线程数等于核心线程数
int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
// 如果阻塞队列非空,则至少要有一个线程继续执行剩下的任务
if (min == 0 && ! workQueue.isEmpty())
min = 1;
// 如果当前线程数已经满足最小线程数要求
// 那么就不创建替代线程了
if (workerCountOf(c) >= min)
return;
}
// 重新创建一个 worker 来代替被销毁的线程
addWorker(null, false);
}
}
总结
到这里我们的线程池源代码之旅就结束了,在这篇文章中我们首先了解了线程池中的控制变量与状态变换流程,之后我们通过线程池的源代码深入解析了从提交任务到执行任务的全过程,相信通过这些知识我们已经可以在脑海中建立起一套完整的线程池运行模型了。如果大家有一些细节感觉还不是特别清晰的话,建议不妨再返回到文章的开头多读几遍,相信第二遍的阅读能给大家带来不一样的体验,因为我自己也是在第三次读 ThreadPoolExecutor 类的源代码时才真正打通了其中的一些重要关节的。
引子
在浏览 ThreadPoolExexutor 源码的过程中,有几个点我们其实并没有完全说清楚,比如对锁的加锁操作、对控制变量的多次获取、控制变量的 AtomicInteger 类型。在下一篇文章中,我将会介绍这些以锁、volatile 变量、CAS 操作、AQS 抽象类为代表的一系列线程同步方法,欢迎感兴趣的读者继续关注我后续发布的文章~