共计 19535 个字符,预计需要花费 49 分钟才能阅读完成。
前提
很早之前就打算看一次 JUC 线程池 ThreadPoolExecutor
的源码实现,因为近段时间比较忙,始终没有工夫整顿出源码剖析的文章。之前在剖析扩大线程池实现可回调的 Future
时候已经提到并发巨匠 Doug Lea
在设计线程池 ThreadPoolExecutor
的提交工作的顶层接口 Executor
只有一个无状态的执行办法:
public interface Executor {void execute(Runnable command);
}
而 ExecutorService
提供了很多扩大办法底层基本上是基于 Executor#execute()
办法进行扩大。本文着重剖析 ThreadPoolExecutor#execute()
的实现,笔者会从实现原理、源码实现等角度联合简化例子进行具体的剖析。ThreadPoolExecutor
的源码从 JDK8 到 JDK11 根本没有变动,本文编写的时候应用的是 JDK11。
ThreadPoolExecutor 的原理
ThreadPoolExecutor
外面应用到 JUC 同步器框架 AbstractQueuedSynchronizer
(俗称AQS
)、大量的位操作、CAS
操作。ThreadPoolExecutor
提供了固定沉闷线程(外围线程)、额定的线程(线程池容量 – 外围线程数这部分额定创立的线程,上面称为非核心线程)、工作队列以及回绝策略这几个重要的性能。
JUC 同步器框架
ThreadPoolExecutor
外面应用到 JUC 同步器框架,次要用于四个方面:
- 全局锁
mainLock
成员属性,是可重入锁ReentrantLock
类型,次要是用于拜访工作线程Worker
汇合和进行数据统计记录时候的加锁操作。 - 条件变量
termination
,Condition
类型,次要用于线程进行期待终结awaitTermination()
办法时的带期限阻塞。 - 工作队列
workQueue
,BlockingQueue
类型,工作队列,用于寄存待执行的工作。 - 工作线程,外部类
Worker
类型,是线程池中真正的工作线程对象。
对于 AQS
笔者之前写过一篇相干源码剖析的文章:JUC 同步器框架 AbstractQueuedSynchronizer 源码图文剖析。
外围线程
这里先参考 ThreadPoolExecutor
的实现并且进行简化,实现一个只有外围线程的线程池,要求如下:
- 临时不思考工作执行异常情况下的解决。
- 工作队列为无界队列。
- 线程池容量固定为外围线程数量。
- 临时不思考回绝策略。
public class CoreThreadPool implements Executor {
private BlockingQueue<Runnable> workQueue;
private static final AtomicInteger COUNTER = new AtomicInteger();
private int coreSize;
private int threadCount = 0;
public CoreThreadPool(int coreSize) {
this.coreSize = coreSize;
this.workQueue = new LinkedBlockingQueue<>();}
@Override
public void execute(Runnable command) {if (++threadCount <= coreSize) {new Worker(command).start();} else {
try {workQueue.put(command);
} catch (InterruptedException e) {throw new IllegalStateException(e);
}
}
}
private class Worker extends Thread {
private Runnable firstTask;
public Worker(Runnable runnable) {super(String.format("Worker-%d", COUNTER.getAndIncrement()));
this.firstTask = runnable;
}
@Override
public void run() {
Runnable task = this.firstTask;
while (null != task || null != (task = getTask())) {
try {task.run();
} finally {task = null;}
}
}
}
private Runnable getTask() {
try {return workQueue.take();
} catch (InterruptedException e) {throw new IllegalStateException(e);
}
}
public static void main(String[] args) throws Exception {CoreThreadPool pool = new CoreThreadPool(5);
IntStream.range(0, 10)
.forEach(i -> pool.execute(() ->
System.out.println(String.format("Thread:%s,value:%d", Thread.currentThread().getName(), i))));
Thread.sleep(Integer.MAX_VALUE);
}
}
某次运行后果如下:
Thread:Worker-0,value:0
Thread:Worker-3,value:3
Thread:Worker-2,value:2
Thread:Worker-1,value:1
Thread:Worker-4,value:4
Thread:Worker-1,value:5
Thread:Worker-2,value:8
Thread:Worker-4,value:7
Thread:Worker-0,value:6
Thread:Worker-3,value:9
设计此线程池的时候,外围线程是懒创立的,如果线程闲暇的时候则阻塞在工作队列的 take()
办法,其实对于 ThreadPoolExecutor
也是相似这样实现,只是如果应用了 keepAliveTime
并且容许外围线程超时(allowCoreThreadTimeOut
设置为 true
)则会应用BlockingQueue#poll(keepAliveTime)
进行轮询代替永恒阻塞。
其余附加性能
构建 ThreadPoolExecutor
实例的时候,须要定义 maximumPoolSize
(线程池最大线程数)和corePoolSize
(外围线程数)。当工作队列是有界的阻塞队列,外围线程满负载,工作队列曾经满的状况下,会尝试创立额定的maximumPoolSize - corePoolSize
个线程去执行新提交的工作。当 ThreadPoolExecutor
这里实现的两个次要附加性能是:
- 肯定条件下会创立非核心线程去执行工作,非核心线程的回收周期(线程生命周期终结时刻)是
keepAliveTime
,线程生命周期终结的条件是:下一次通过工作队列获取工作的时候并且存活工夫超过keepAliveTime
。 - 提供回绝策略,也就是在外围线程满负载、工作队列已满、非核心线程满负载的条件下会触发回绝策略。
源码剖析
先剖析线程池的要害属性,接着剖析其状态管制,最初重点剖析 ThreadPoolExecutor#execute()
办法。
要害属性
public class ThreadPoolExecutor extends AbstractExecutorService {
// 控制变量 - 寄存状态和线程数
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
// 工作队列,必须是阻塞队列
private final BlockingQueue<Runnable> workQueue;
// 工作线程汇合,寄存线程池中所有的(沉闷的)工作线程,只有在持有全局锁 mainLock 的前提下能力拜访此汇合
private final HashSet<Worker> workers = new HashSet<>();
// 全局锁
private final ReentrantLock mainLock = new ReentrantLock();
// awaitTermination 办法应用的期待条件变量
private final Condition termination = mainLock.newCondition();
// 记录峰值线程数
private int largestPoolSize;
// 记录曾经胜利执行结束的工作数
private long completedTaskCount;
// 线程工厂,用于创立新的线程实例
private volatile ThreadFactory threadFactory;
// 拒绝执行处理器,对应不同的回绝策略
private volatile RejectedExecutionHandler handler;
// 闲暇线程期待工作的工夫周期,单位是纳秒
private volatile long keepAliveTime;
// 是否容许外围线程超时,如果为 true 则 keepAliveTime 对外围线程也失效
private volatile boolean allowCoreThreadTimeOut;
// 外围线程数
private volatile int corePoolSize;
// 线程池容量
private volatile int maximumPoolSize;
// 省略其余代码
}
上面看参数列表最长的构造函数:
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
if (corePoolSize < 0 ||
maximumPoolSize <= 0 ||
maximumPoolSize < corePoolSize ||
keepAliveTime < 0)
throw new IllegalArgumentException();
if (workQueue == null || threadFactory == null || handler == null)
throw new NullPointerException();
this.corePoolSize = corePoolSize;
this.maximumPoolSize = maximumPoolSize;
this.workQueue = workQueue;
this.keepAliveTime = unit.toNanos(keepAliveTime);
this.threadFactory = threadFactory;
this.handler = handler;
}
能够自定义外围线程数、线程池容量(最大线程数)、闲暇线程期待工作周期、工作队列、线程工厂、回绝策略。上面简略剖析一下每个参数的含意和作用:
corePoolSize
:int 类型,外围线程数量。maximumPoolSize
:int 类型,最大线程数量,也就是线程池的容量。keepAliveTime
:long 类型,线程闲暇等待时间,也和工作线程的生命周期无关,下文会剖析。unit
:TimeUnit
类型,keepAliveTime
参数的工夫单位,实际上keepAliveTime
最终会转化为纳秒。workQueue
:BlockingQueue
类型,期待队列或者叫工作队列。threadFactory
:ThreadFactory
类型,线程工厂,用于创立工作线程(包含外围线程和非核心线程),默认应用Executors.defaultThreadFactory()
作为内建线程工厂实例,个别自定义线程工厂能力更好地跟踪工作线程。handler
:-
RejectedExecutionHandler
类型,线程池的拒绝执行处理器,更多时候称为回绝策略,回绝策略执行的机会是当阻塞队列已满、没有闲暇的线程(包含外围线程和非核心线程)并且持续提交工作。提供了 4 种内建的回绝策略实现:
AbortPolicy
:间接回绝策略,也就是不会执行工作,间接抛出RejectedExecutionException
,这是 默认的回绝策略。DiscardPolicy
:摈弃策略,也就是间接疏忽提交的工作(艰深来说就是空实现)。DiscardOldestPolicy
:摈弃最老工作策略,也就是通过poll()
办法取出工作队列队头的工作摈弃,而后执行以后提交的工作。-
CallerRunsPolicy
:调用者执行策略,也就是以后调用Executor#execute()
的线程间接调用工作Runnable#run()
, 个别不心愿工作失落会选用这种策略,但从理论角度来看,原来的异步调用用意会进化为同步调用。状态管制
状态管制次要围绕原子整型成员变量
ctl
:private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0)); private static final int COUNT_BITS = Integer.SIZE - 3; private static final int COUNT_MASK = (1 << COUNT_BITS) - 1;
private static final int RUNNING = -1 << COUNT_BITS;
private static final int SHUTDOWN = 0 << COUNT_BITS;
private static final int STOP = 1 << COUNT_BITS;
private static final int TIDYING = 2 << COUNT_BITS;
private static final int TERMINATED = 3 << COUNT_BITS;
// 通过 ctl 值获取运行状态
private static int runStateOf(int c) {return c & ~COUNT_MASK;}
// 通过 ctl 值获取工作线程数
private static int workerCountOf(int c) {return c & COUNT_MASK;}
// 通过运行状态和工作线程数计算 ctl 的值,或运算
private static int ctlOf(int rs, int wc) {return rs | wc;}
private static boolean runStateLessThan(int c, int s) {
return c < s;
}
private static boolean runStateAtLeast(int c, int s) {
return c >= s;
}
private static boolean isRunning(int c) {
return c < SHUTDOWN;
}
// CAS 操作线程数减少 1
private boolean compareAndIncrementWorkerCount(int expect) {
return ctl.compareAndSet(expect, expect + 1);
}
// CAS 操作线程数缩小 1
private boolean compareAndDecrementWorkerCount(int expect) {
return ctl.compareAndSet(expect, expect - 1);
}
// 线程数间接缩小 1
private void decrementWorkerCount() {
ctl.addAndGet(-1);
}
接下来剖析一下线程池的状态变量,工作线程下限数量位的长度是 `COUNT_BITS`,它的值是 `Integer.SIZE - 3`,也就是正整数 29:> 咱们晓得,整型包装类型 Integer 实例的大小是 4 byte,一共 32 bit,也就是一共有 32 个位用于寄存 0 或者 1。在 ThreadPoolExecutor 实现中,应用 32 位的整型包装类型寄存工作线程数和线程池状态。其中,低 29 位用于寄存工作线程数,而高 3 位用于寄存线程池状态,所以线程池的状态最多只能有 2^3 种。工作线程下限数量为 2^29 - 1,超过 5 亿,这个数量在短时间内不必思考会超限。接着看工作线程下限数量掩码 `COUNT_MASK`,它的值是 `(1 < COUNT_BITS) - l`,也就是 1 左移 29 位,再减去 1,如果补全 32 位,它的位视图如下:![](https://upload-images.jianshu.io/upload_images/15462057-7779f5276218b4db.png?imageMogr2/auto-orient/strip%7CimageView2/2/w/1240)
而后就是线程池的状态常量,这里只详细分析其中一个,其余类同,这里看 `RUNNING` 状态:
// - 1 的补码为:111-11111111111111111111111111111
// 左移 29 位后:111-00000000000000000000000000000
// 10 进制值为:-536870912
// 高 3 位 111 的值就是示意线程池正在处于运行状态
private static final int RUNNING = -1 << COUNT_BITS;
控制变量 `ctl` 的组成就是通过线程池运行状态 `rs` 和工作线程数 `wc` 通过 ** 或运算 ** 失去的:
// rs=RUNNING 值为:111-00000000000000000000000000000
// wc 的值为 0:000-00000000000000000000000000000
// rs | wc 的后果为:111-00000000000000000000000000000
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
private static int ctlOf(int rs, int wc) {
return rs | wc;
}
那么咱们怎么从 `ctl` 中取出高 3 位的线程池状态?下面源码中提供的 `runStateOf()` 办法就是提取运行状态:
// 先把 COUNT_MASK 取反 (~COUNT_MASK),
失去:111-00000000000000000000000000000
// ctl 位图特点是:xxx-yyyyyyyyyyyyyyyyyyyyyyyyyyyyyy
// 两者做一次与运算即可失去高 3 位 xxx
private static int runStateOf(int c){
return c & ~COUNT_MASK;
}
同理,取出低 29 位的工作线程数量只须要把 `ctl` 和 `COUNT_MASK`(`000-11111111111111111111111111111`)做一次 ** 与运算 ** 即可。工作线程数为 0 的前提下,小结一下线程池的运行状态常量:![](https://upload-images.jianshu.io/upload_images/15462057-b513d5f0cac42860.png?imageMogr2/auto-orient/strip%7CimageView2/2/w/1240)
这里有一个比拟非凡的技巧,因为运行状态值寄存在高 3 位,所以能够间接通过十进制值(** 甚至能够疏忽低 29 位,间接用 `ctl` 进行比拟,或者应用 `ctl` 和线程池状态常量进行比拟 **)来比拟和判断线程池的状态:> 工作线程数为 0 的前提下:RUNNING(-536870912) < SHUTDOWN(0) < STOP(536870912) < TIDYING(1073741824) < TERMINATED(1610612736)
上面这三个办法就是应用这种技巧:
// ctl 和状态常量比拟,判断是否小于
private static boolean runStateLessThan(int c, int s) {
return c < s;
}
// ctl 和状态常量比拟,判断是否小于或等于
private static boolean runStateAtLeast(int c, int s) {
return c >= s;
}
// ctl 和状态常量 SHUTDOWN 比拟,判断是否处于 RUNNING 状态
private static boolean isRunning(int c) {
return c < SHUTDOWN;
}
最初是线程池状态的跃迁图:![](https://upload-images.jianshu.io/upload_images/15462057-b53e2c5417c0015d.png?imageMogr2/auto-orient/strip%7CimageView2/2/w/1240)
> PS:线程池源码中有很多两头变量用了简略的单字母示意,例如 c 就是示意 ctl、wc 就是示意 worker count、rs 就是示意 running status。**execute 办法源码剖析 **
线程池异步执行工作的办法实现是 `ThreadPoolExecutor#execute()`,源码如下:
// 执行命令,其中命令(上面称工作)对象是 Runnable 的实例
public void execute(Runnable command) {
// 判断命令(工作)对象非空
if (command == null)
throw new NullPointerException();
// 获取 ctl 的值
int c = ctl.get();
// 判断如果当前工作线程数小于外围线程数,则创立新的外围线程并且执行传入的工作
if (workerCountOf(c) < corePoolSize) {if (addWorker(command, true))
// 如果创立新的外围线程胜利则间接返回
return;
// 这里阐明创立外围线程失败,须要更新 ctl 的长期变量 c
c = ctl.get();}
// 走到这里阐明创立新的外围线程失败,也就是当前工作线程数大于等于 corePoolSize
// 判断线程池是否处于运行中状态,同时尝试用非阻塞办法向工作队列放入工作(放入工作失败返回 false)if (isRunning(c) && workQueue.offer(command)) {int recheck = ctl.get();
// 这里是向工作队列投放工作胜利,对线程池的运行中状态做二次查看
// 如果线程池二次查看状态是非运行中状态,则从工作队列移除以后的工作调用回绝策略解决之(也就是移除后面胜利入队的工作实例)if (! isRunning(recheck) && remove(command))
// 调用回绝策略解决工作 - 返回
reject(command);
// 走到上面的 else if 分支,阐明有以下的前提:// 0、待执行的工作曾经胜利退出工作队列
// 1、线程池可能是 RUNNING 状态
// 2、传入的工作可能从工作队列中移除失败(移除失败的惟一可能就是工作曾经被执行了)// 如果当前工作线程数量为 0,则创立一个非核心线程并且传入的工作对象为 null - 返回
// 也就是创立的非核心线程不会马上运行,而是期待获取工作队列的工作去执行
// 如果前工作线程数量不为 0,原来应该是最初的 else 分支,然而能够什么也不做,因为工作曾经胜利入队列,总会有适合的机会调配其余闲暇线程去执行它
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
// 走到这里阐明有以下的前提:// 0、线程池中的工作线程总数曾经大于等于 corePoolSize(简略来说就是外围线程曾经全副懒创立结束)// 1、线程池可能不是 RUNNING 状态
// 2、线程池可能是 RUNNING 状态同时工作队列曾经满了
// 如果向工作队列投放工作失败,则会尝试创立非核心线程传入工作执行
// 创立非核心线程失败,此时须要拒绝执行工作
else if (!addWorker(command, false))
// 调用回绝策略解决工作 - 返回
reject(command);
}
这里简略剖析一下整个流程:1. 如果当前工作线程总数小于 `corePoolSize`,则间接创立外围线程执行工作(工作实例会传入间接用于结构工作线程实例)。2. 如果当前工作线程总数大于等于 `corePoolSize`,判断线程池是否处于运行中状态,同时尝试用非阻塞办法向工作队列放入工作,这里会二次查看线程池运行状态,如果当前工作线程数量为 0,则创立一个非核心线程并且传入的工作对象为 null。3. 如果向工作队列投放工作失败(工作队列曾经满了),则会尝试创立非核心线程传入工作实例执行。4. 如果创立非核心线程失败,此时须要拒绝执行工作,调用回绝策略解决工作。** 这里是一个纳闷点 **:为什么须要二次查看线程池的运行状态,当前工作线程数量为 0,尝试创立一个非核心线程并且传入的工作对象为 null?这个能够看 API 正文:> 如果一个工作胜利退出工作队列,咱们仍然须要二次查看是否须要增加一个工作线程(因为所有存活的工作线程有可能在最初一次查看之后曾经终结)或者执行以后办法的时候线程池是否曾经 shutdown 了。所以咱们须要二次查看线程池的状态,必须时把工作从工作队列中移除或者在没有可用的工作线程的前提下新建一个工作线程。工作提交流程从调用者的角度来看如下:![](https://upload-images.jianshu.io/upload_images/15462057-3ed4dbff7795890b.png?imageMogr2/auto-orient/strip%7CimageView2/2/w/1240)
**addWorker 办法源码剖析 **
`boolean addWorker(Runnable firstTask, boolean core)` 办法的第一的参数能够用于间接传入工作实例,第二个参数用于标识将要创立的工作线程是否外围线程。办法源码如下:
// 增加工作线程,如果返回 false 阐明没有新创建工作线程,如果返回 true 阐明创立和启动工作线程胜利
private boolean addWorker(Runnable firstTask, boolean core) {
retry:
// 留神这是一个死循环 - 最外层循环
for (int c = ctl.get();;) {
// 这个是十分复杂的条件,这里先拆分多个与(&&)条件:// 1. 线程池状态至多为 SHUTDOWN 状态,也就是 rs >= SHUTDOWN(0)
// 2. 线程池状态至多为 STOP 状态,也就是 rs >= STOP(1),或者传入的工作实例 firstTask 不为 null,或者工作队列为空
// 其实这个判断的边界是线程池状态为 shutdown 状态下,不会再承受新的工作,在此前提下如果状态曾经到了 STOP、或者传入工作不为空、或者工作队列为空(曾经没有积压工作)都不须要增加新的线程
if (runStateAtLeast(c, SHUTDOWN)
&& (runStateAtLeast(c, STOP)
|| firstTask != null
|| workQueue.isEmpty()))
return false;
// 留神这也是一个死循环 - 二层循环
for (;;) {
// 这里每一轮循环都会从新获取工作线程数 wc
// 1. 如果传入的 core 为 true,示意将要创立外围线程,通过 wc 和 corePoolSize 判断,如果 wc >= corePoolSize,则返回 false 示意创立外围线程失败
// 1. 如果传入的 core 为 false,示意将要创非建外围线程,通过 wc 和 maximumPoolSize 判断,如果 wc >= maximumPoolSize,则返回 false 示意创立非核心线程失败
if (workerCountOf(c)
>= ((core ? corePoolSize : maximumPoolSize) & COUNT_MASK))
return false;
// 胜利通过 CAS 更新工作线程数 wc,则 break 到最外层的循环
if (compareAndIncrementWorkerCount(c))
break retry;
// 走到这里阐明了通过 CAS 更新工作线程数 wc 失败,这个时候须要从新判断线程池的状态是否由 RUNNING 曾经变为 SHUTDOWN
c = ctl.get(); // Re-read ctl
// 如果线程池状态曾经由 RUNNING 曾经变为 SHUTDOWN,则从新跳出到外层循环继续执行
if (runStateAtLeast(c, SHUTDOWN))
continue retry;
// 如果线程池状态仍然是 RUNNING,CAS 更新工作线程数 wc 失败阐明有可能是并发更新导致的失败,则在内层循环重试即可
// else CAS failed due to workerCount change; retry inner loop
}
}
// 标记工作线程是否启动胜利
boolean workerStarted = false;
// 标记工作线程是否创立胜利
boolean workerAdded = false;
Worker w = null;
try {
// 传入工作实例 firstTask 创立 Worker 实例,Worker 结构外面会通过线程工厂创立新的 Thread 对象,所以上面能够间接操作 Thread t = w.thread
// 这一步 Worker 实例曾经创立,然而没有退出工作线程汇合或者启动它持有的线程 Thread 实例
w = new Worker(firstTask);
final Thread t = w.thread;
if (t != null) {
// 这里须要全局加锁,因为会扭转一些指标值和非线程平安的汇合
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// Recheck while holding lock.
// Back out on ThreadFactory failure or if
// shut down before lock acquired.
int c = ctl.get();
// 这里次要在加锁的前提下判断 ThreadFactory 创立的线程是否存活或者判断获取锁胜利之后线程池状态是否曾经更变为 SHUTDOWN
// 1. 如果线程池状态仍然为 RUNNING,则只须要判断线程实例是否存活,须要增加到工作线程汇合和启动新的 Worker
// 2. 如果线程池状态小于 STOP,也就是 RUNNING 或者 SHUTDOWN 状态下,同时传入的工作实例 firstTask 为 null,则须要增加到工作线程汇合和启动新的 Worker
// 对于 2,换言之,如果线程池处于 SHUTDOWN 状态下,同时传入的工作实例 firstTask 不为 null,则不会增加到工作线程汇合和启动新的 Worker
// 这一步其实有可能创立了新的 Worker 实例然而并不启动(长期对象,没有任何强援用),这种 Worker 有可能胜利下一轮 GC 被收集的垃圾对象
if (isRunning(c) ||
(runStateLessThan(c, STOP) && firstTask == null)) {if (t.isAlive()) // precheck that t is startable
throw new IllegalThreadStateException();
// 把创立的工作线程实例增加到工作线程汇合
workers.add(w);
int s = workers.size();
// 尝试更新历史峰值工作线程数,也就是线程池峰值容量
if (s > largestPoolSize)
largestPoolSize = s;
// 这里更新工作线程是否启动胜利标识为 true,前面才会调用 Thread#start()办法启动实在的线程实例
workerAdded = true;
}
} finally {mainLock.unlock();
}
// 如果胜利增加工作线程,则调用 Worker 外部的线程实例 t 的 Thread#start()办法启动实在的线程实例
if (workerAdded) {t.start();
// 标记线程启动胜利
workerStarted = true;
}
}
} finally {
// 线程启动失败,须要从工作线程汇合移除对应的 Worker
if (! workerStarted)
addWorkerFailed(w);
}
return workerStarted;
}
// 增加 Worker 失败
private void addWorkerFailed(Worker w) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// 从工作线程汇合移除之
if (w != null)
workers.remove(w);
// wc 数量减 1
decrementWorkerCount();
// 基于状态判断尝试终结线程池
tryTerminate();} finally {mainLock.unlock();
}
}
笔者发现了 `Doug Lea` 大神非常喜爱简单的条件判断,而且单行简单判断不喜爱加花括号,像上面这种代码在他编写的很多类库中都比拟常见:
if (runStateAtLeast(c, SHUTDOWN)
&& (runStateAtLeast(c, STOP)
|| firstTask != null
|| workQueue.isEmpty()))
return false;
// ….
// 代码拆分一下如下
boolean atLeastShutdown = runStateAtLeast(c, SHUTDOWN); # rs >= SHUTDOWN(0)
boolean atLeastStop = runStateAtLeast(c, STOP) || firstTask != null || workQueue.isEmpty();
if (atLeastShutdown && atLeastStop){
return false;
}
下面的剖析逻辑中须要留神一点,`Worker` 实例创立的同时,在其构造函数中会通过 `ThreadFactory` 创立一个 Java 线程 `Thread` 实例,前面会加锁后二次查看是否须要把 `Worker` 实例增加到工作线程汇合 `workers` 中和是否须要启动 `Worker` 中持有的 `Thread` 实例,只有启动了 `Thread` 实例实例,`Worker` 才真正开始运作,否则只是一个无用的长期对象。`Worker` 自身也实现了 `Runnable` 接口,它能够看成是一个 `Runnable` 的适配器。** 工作线程外部类 Worker 源码剖析 **
线程池中的每一个具体的工作线程被包装为外部类 `Worker` 实例,`Worker` 继承于 `AbstractQueuedSynchronizer(AQS)`,实现了 `Runnable` 接口:
private final class Worker extends AbstractQueuedSynchronizer implements Runnable{
/**
* This class will never be serialized, but we provide a
* serialVersionUID to suppress a javac warning.
*/
private static final long serialVersionUID = 6138294804551838833L;
// 保留 ThreadFactory 创立的线程实例,如果 ThreadFactory 创立线程失败则为 null
final Thread thread;
// 保留传入的 Runnable 工作实例
Runnable firstTask;
// 记录每个线程实现的工作总数
volatile long completedTasks;
// 惟一的构造函数,传入工作实例 firstTask,留神能够为 null
Worker(Runnable firstTask) {// 禁止线程中断,直到 runWorker()办法执行
setState(-1); // inhibit interrupts until runWorker
this.firstTask = firstTask;
// 通过 ThreadFactory 创立线程实例,留神一下 Worker 实例本身作为 Runnable 用于创立新的线程实例
this.thread = getThreadFactory().newThread(this);
}
// 委托到内部的 runWorker()办法,留神 runWorker()办法是线程池的办法,而不是 Worker 的办法
public void run() {runWorker(this);
}
// Lock methods
//
// The value 0 represents the unlocked state.
// The value 1 represents the locked state.
// 是否持有独占锁,state 值为 1 的时候示意持有锁,state 值为 0 的时候示意曾经开释锁
protected boolean isHeldExclusively() {return getState() != 0;
}
// 独占模式下尝试获取资源,这里没有判断传入的变量,间接 CAS 判断 0 更新为 1 是否胜利,胜利则设置独占线程为以后线程
protected boolean tryAcquire(int unused) {if (compareAndSetState(0, 1)) {setExclusiveOwnerThread(Thread.currentThread());
return true;
}
return false;
}
// 独占模式下尝试是否资源,这里没有判断传入的变量,间接把 state 设置为 0
protected boolean tryRelease(int unused) {setExclusiveOwnerThread(null);
setState(0);
return true;
}
// 加锁
public void lock() { acquire(1); }
// 尝试加锁
public boolean tryLock() { return tryAcquire(1); }
// 解锁
public void unlock() { release(1); }
// 是否锁定
public boolean isLocked() { return isHeldExclusively(); }
// 启动后进行线程中断,留神这里会判断线程实例的中断标记位是否为 false,只有中断标记位为 false 才会中断
void interruptIfStarted() {
Thread t;
if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
try {t.interrupt();
} catch (SecurityException ignore) {}}
}
}
`Worker` 的构造函数外面的逻辑非常重要,通过 `ThreadFactory` 创立的 `Thread` 实例同时传入 `Worker` 实例,因为 `Worker` 自身实现了 `Runnable`,所以能够作为工作提交到线程中执行。只有 `Worker` 持有的线程实例 `w` 调用 `Thread#start()` 办法就能在适合机会执行 `Worker#run()`。简化一下逻辑如下:
// addWorker()办法中结构
Worker worker = createWorker();
// 通过线程池结构时候传入
ThreadFactory threadFactory = getThreadFactory();
// Worker 构造函数中
Thread thread = threadFactory.newThread(worker);
// addWorker() 办法中启动
thread.start();
`Worker` 继承自 `AQS`,这里应用了 `AQS` 的独占模式,有个技巧是结构 `Worker` 的时候,把 `AQS` 的资源(状态)通过 `setState(-1)` 设置为 -1,这是因为 `Worker` 实例刚创立时 `AQS` 中 `state` 的默认值为 0,此时线程尚未启动,不能在这个时候进行线程中断,见 `Worker#interruptIfStarted()` 办法。`Worker` 中两个笼罩 `AQS` 的办法 `tryAcquire()` 和 `tryRelease()` 都没有判断内部传入的变量,前者间接 `CAS(0,1)`,后者间接 `setState(0)`。接着看外围办法 `ThreadPoolExecutor#runWorker()`:
final void runWorker(Worker w) {
// 获取以后线程,实际上和 Worker 持有的线程实例是雷同的
Thread wt = Thread.currentThread();
// 获取 Worker 中持有的初始化时传入的工作对象,这里留神寄存在长期变量 task 中
Runnable task = w.firstTask;
// 设置 Worker 中持有的初始化时传入的工作对象为 null
w.firstTask = null;
// 因为 Worker 初始化时 AQS 中 state 设置为 -1,这里要先做一次解锁把 state 更新为 0,容许线程中断
w.unlock(); // allow interrupts
// 记录线程是否因为用户异样终结,默认是 true
boolean completedAbruptly = true;
try {
// 初始化工作对象不为 null,或者从工作队列获取工作不为空(从工作队列获取到的工作会更新到长期变量 task 中)// getTask()因为应用了阻塞队列,这个 while 循环如果命中后半段会处于阻塞或者超时阻塞状态,getTask()返回为 null 会导致线程跳出死循环使线程终结
while (task != null || (task = getTask()) != null) {
// Worker 加锁,实质是 AQS 获取资源并且尝试 CAS 更新 state 由 0 更变为 1
w.lock();
// If pool is stopping, ensure thread is interrupted;
// if not, ensure thread is not interrupted. This
// requires a recheck in second case to deal with
// shutdownNow race while clearing interrupt
// 如果线程池正在进行(也就是由 RUNNING 或者 SHUTDOWN 状态向 STOP 状态变更),那么要确保当前工作线程是中断状态
// 否则,要保障以后线程不是中断状态
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt();
try {
// 钩子办法,工作执行前
beforeExecute(wt, task);
try {task.run();
// 钩子办法,工作执行后 - 失常状况
afterExecute(task, null);
} catch (Throwable ex) {
// 钩子办法,工作执行后 - 异常情况
afterExecute(task, ex);
throw ex;
}
} finally {
// 清空 task 长期变量,这个很重要,否则 while 会死循环执行同一个 task
task = null;
// 累加 Worker 实现的工作数
w.completedTasks++;
// Worker 解锁,实质是 AQS 开释资源,设置 state 为 0
w.unlock();}
}
// 走到这里阐明某一次 getTask()返回为 null,线程失常退出
completedAbruptly = false;
} finally {
// 解决线程退出,completedAbruptly 为 true 阐明因为用户异样导致线程非正常退出
processWorkerExit(w, completedAbruptly);
}
}
这里重点拆解剖析一下判断当前工作线程中断状态的代码:
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt();
// 先简化一下判断逻辑,如下
// 判断线程池状态是否至多为 STOP,rs >= STOP(1)
boolean atLeastStop = runStateAtLeast(ctl.get(), STOP);
// 判断线程池状态是否至多为 STOP,同时判断以后线程的中断状态并且清空以后线程的中断状态
boolean interruptedAndAtLeastStop = Thread.interrupted() && runStateAtLeast(ctl.get(), STOP);
if (atLeastStop || interruptedAndAtLeastStop && !wt.isInterrupted()){
wt.interrupt();
}
`Thread.interrupted()` 办法获取线程的中断状态同时会清空该中断状态,这里之所以会调用这个办法是因为在执行下面这个 `if` 逻辑同时内部有可能调用 `shutdownNow()` 办法,`shutdownNow()` 办法中也存在中断所有 `Worker` 线程的逻辑,然而因为 `shutdownNow()` 办法中会遍历所有 `Worker` 做线程中断,有可能无奈及时在工作提交到 `Worker` 执行之前进行中断,所以这个中断逻辑会在 `Worker` 外部执行,就是 `if` 代码块的逻辑。这里还要留神的是:`STOP` 状态下会回绝所有新提交的工作,不会再执行工作队列中的工作,同时会中断所有 `Worker` 线程。也就是,** 即便工作 Runnable 曾经 `runWorker()` 中前半段逻辑取出,只有还没走到调用其 Runnable#run(),都有可能被中断 **。假如刚好产生了进入 `if` 代码块的逻辑同时内部调用了 `shutdownNow()` 办法,那么 `if` 逻辑内会判断线程中断状态并且重置,那么 `shutdownNow()` 办法中调用的 `interruptWorkers()` 就不会因为中断状态判断呈现问题导致二次中断线程(会导致异样)。小结一下下面 `runWorker()` 办法的外围流程:1. `Worker` 先执行一次解锁操作,用于解除不可中断状态。2. 通过 `while` 循环调用 `getTask()` 办法从工作队列中获取工作(当然,首轮循环也有可能是内部传入的 firstTask 工作实例)。3. 如果线程池更变为 `STOP` 状态,则须要确保工作线程是中断状态并且进行中断解决,否则要保障工作线程必须不是中断状态。4. 执行工作实例 `Runnale#run()` 办法,工作实例执行之前和之后(包含失常执行结束和异样执行状况)别离会调用钩子办法 `beforeExecute()` 和 `afterExecute()`。5. `while` 循环跳出意味着 `runWorker()` 办法完结和工作线程生命周期完结(`Worker#run()` 生命周期完结),会调用 `processWorkerExit()` 解决工作线程退出的后续工作。![](https://upload-images.jianshu.io/upload_images/15462057-7a5241c2da3c3c6b.png?imageMogr2/auto-orient/strip%7CimageView2/2/w/1240)
### ** 写在最初 **
欢送大家关注我的公众号【** 惊涛骇浪如码 **】,海量 Java 相干文章,学习材料都会在外面更新,整顿的材料也会放在外面。