追踪解析 ThreadPoolExecutor 源码

31次阅读

共计 8737 个字符,预计需要花费 22 分钟才能阅读完成。

零 前期准备
0 FBI WARNING
文章异常啰嗦且绕弯。
1 版本
JDK 版本 : OpenJDK 11.0.1
IDE : idea 2018.3
2 ThreadPoolExecutor 简介
ThreadPoolExecutor 是 jdk4 中加入的工具,被封装在 jdk 自带的 Executors 框架中,是 java 中最经典的线程池技术。
ThreadPoolExecutor 类在 concurrent 包下,和其它线程工具类一样都由 Doug Lea 大神操刀完成。
[在看完 Spring ioc 和 Gson 之后有点乏了,换换口味看一些 jdk 的源码]
3 Demo
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class ThreadPoolDemo {

public static void main(String[] args){
// 创建线程池
// 这里使用固定线程数的线程池,线程数为 5
ExecutorService executorService = Executors.newFixedThreadPool(5);

for(int i = 0 ; i < 100 ; i ++){
final int ii = i;
// 创建 Runnable 作为线程池的任务
Runnable r = () -> System.out.println(ii);
// 执行
executorService.execute(r);
}
}
}
一 线程池的初始化
线程池的初始化调用的 Executors 框架的静态方法:
//Executors.class
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
继续追踪这个构造方法:
//ThreadPoolExecutor.class
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue) {
this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
Executors.defaultThreadFactory(), defaultHandler);
}
继续追踪:
//ThreadPoolExecutor.class
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {

// 验证参数的有效性
if (corePoolSize < 0 || maximumPoolSize <= 0 || maximumPoolSize < corePoolSize || keepAliveTime < 0)
throw new IllegalArgumentException();
if (workQueue == null || threadFactory == null || handler == null)
throw new NullPointerException();

// 本例中不涉及权限
this.acc = System.getSecurityManager() == null ?
null :
AccessController.getContext();

// 线程数
this.corePoolSize = corePoolSize;
// 最大线程数
// 本例中使用固定线程数的线程池,所以线程数和最大线程数相等
this.maximumPoolSize = maximumPoolSize;
// 用于存储任务的队列
// 此处使用 LinkedBlockingQueue 来储存任务,其线程安全
this.workQueue = workQueue;
//keepAliveTime 参数用于表示:
// 对于超出线程和队列缓存总和的任务,是否要临时增加线程来处理
// 超出的线程的存在时间是多少
// 这里使用的是定长线程池,所以 keepAliveTime = 0,即不增加线程
this.keepAliveTime = unit.toNanos(keepAliveTime);
// 用于创建线程的工厂类
this.threadFactory = threadFactory;
//handler 用来处理 task 太多时候的拒绝策略
// 此例中使用的是默认的,即定义在 ThreadPoolExecutor 中的 defaultHandler 对象
this.handler = handler;
}
二 Worker
Worker 是 ThreadPoolExecutor 的内部类,可以看做是 Runnable 的代理类:
//ThreadPoolExecutor.class
private final class Worker extends AbstractQueuedSynchronizer implements Runnable{

private static final long serialVersionUID = 6138294804551838833L;
final Thread thread;
Runnable firstTask;
// 完成 task 数量的计数器
volatile long completedTasks;

Worker(Runnable firstTask) {
// 这个方法是 AbstractQueuedSynchronizer 中的方法,功能相当于加锁
//-1 的意思是后续的任务会处于阻塞状态,即为已经加锁
setState(-1);
// 在创建的时候存入一个要处理的 task
// 需要注意的是每个 worker 对象被创建出来之后是可以重复利用来处理多个 task 的
this.firstTask = firstTask;
//worker 会用自身作为 Runnable 对象去创建一个线程
// 这里调用线程工厂进行线程创建
this.thread = getThreadFactory().newThread(this);
}

// 对于线程变量来说,其启动的就是 worker 的 run() 方法
public void run() {
//runWorker(…) 方法在 ThreadPoolExecutor 里
runWorker(this);
}

// 获取锁的状态
protected boolean isHeldExclusively() {
return getState() != 0;
}
// 重写了 AbstractQueuedSynchronizer 中的 tryAcquire(…) 方法
// 尝试加锁
protected boolean tryAcquire(int unused) {
if (compareAndSetState(0, 1)) {
setExclusiveOwnerThread(Thread.currentThread());
return true;
}
return false;
}
// 重写了 AbstractQueuedSynchronizer 中的 tryRelease(…) 方法
// 尝试释放锁
protected boolean tryRelease(int unused) {
setExclusiveOwnerThread(null);
setState(0);
return true;
}
// 真正的加锁方法
public void lock() {
acquire(1);
}
// 尝试加锁
public boolean tryLock() {
return tryAcquire(1);
}
// 真正的释放锁方法
public void unlock() {
release(1);
}
// 判断是否在锁中
public boolean isLocked() {
return isHeldExclusively();
}
// 中断线程
void interruptIfStarted() {
Thread t;
if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
try {
t.interrupt();
} catch (SecurityException ignore) {
}
}
}
}
追踪一下 runWorker(…) 方法:
//ThreadPoolExecutor.class
final void runWorker(Worker w) {
// 获取当前所在的线程的实例对象
Thread wt = Thread.currentThread();
// 获取 task
Runnable task = w.firstTask;
// 取出来之后把 task 置空
w.firstTask = null;
// 此处释放锁
w.unlock();
// 指示器,此变量为 true 的时候确认该方法已经执行完毕
boolean completedAbruptly = true;
try {
// 此处为一个 while 循环,用于不断的执行 task
//getTask() 方法会从队列里不断抓取 task 并进行执行
// 当 task 为 null,且队列里已经没有更多 task 的时候,就会终止循环
while (task != null || (task = getTask()) != null) {
// 加锁,独占线程
w.lock();
// 在这里会判断线程的状态,如果存在符合中断的情况,就会直接中断掉
if ((runStateAtLeast(ctl.get(), STOP)
|| (Thread.interrupted() && runStateAtLeast(ctl.get(), STOP))) && !wt.isInterrupted())
wt.interrupt();

try {
//beforeExecute(…) 和 afterExecute(…) 方法在 ThreadPoolExecutor 中并没有实现
// 是预留出来给使用者重写,以达到业务需求的方法
beforeExecute(wt, task);
try {
// 此处执行 task
task.run();
afterExecute(task, null);
} catch (Throwable ex) {
afterExecute(task, ex);
throw ex;
}
} finally {
// 将执行的 task 置空
task = null;
// 每完成一个 task 就会加 1
w.completedTasks++;
// 释放锁
w.unlock();
}
}
completedAbruptly = false;
} finally {
// 这个方法会销毁掉 worker
// 同时如果检测到有新的 task 又会重新创建 Worker
processWorkerExit(w, completedAbruptly);
}
}
Worker 是线程池中真正起完成业务逻辑的组件,是任务和线程的封装。
三 线程池的状态控制
线程池的状态主要由 ctl 变量来进行控制:
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
ctl 是一个 AtomicInteger 类型的变量,其实可以简单理解为一个 int 值,AtomicInteger 只是能够适应高并发的原子化操作的需要。
ctl 的前 29 位数用来表示线程 (Worker) 的数量,后面三位用来表示线程池的状态。
线程池的状态有五种,分别是 Running、Shutdown、Stop、Tidying、Terminate,根据单词就能猜出大概。
注意的是,这五种状态在线程池中都以 int 变量的形式存在,从前到后依次变大,对状态的比较有一系列方法:
//ThreadPoolExecutor.class
private static boolean runStateLessThan(int c, int s) {
//c 的状态值要小于 s
return c < s;
}
//ThreadPoolExecutor.class
private static boolean runStateAtLeast(int c, int s) {
//c 的状态值要大于或等于 s
return c >= s;
}
//ThreadPoolExecutor.class
private static boolean isRunning(int c) {
// 状态里只有 RUNNING 是小于 SHUTDOWN 的
return c < SHUTDOWN;
}
在这些方法里,传入的参数 c 一般指的是当前线程池状态,s 是用来对比的参照状态。
四 线程池的执行
该 part 的起点:
executorService.execute(r);
来追踪 execute(…) 方法:
public void execute(Runnable command) {
// 有效性验证
if (command == null)
throw new NullPointerException();

//ctl 是一个 AtomicInteger 类型的变量,用来记录线程池的状态
int c = ctl.get();

//workerCountOf(…) 方法会返回当前运行的 Worker 的数量
if (workerCountOf(c) < corePoolSize) {
//Worker 的数量小于线程池容量的情况下
// 直接增加 Worker 并取出 task 去运行
if (addWorker(command, true))
return;
// 如果 Worker 已经顺利执行了 task,应该会直接返回掉
// 如果执行中出现了其它情况,则会继续往下走
// 此处刷新状态
c = ctl.get();
}
// 当 Worker 数量已经达到线程池的指定数量,或者添加 Worker 的时候出问题的时候,会进入此判断语句
// 先判断线程池是否处于活跃状态,且 task 是否已经被成功添加到队列中
// 如果不满足,会进入 else 语句中,先最后尝试一次 addWorker(…) 方法,如果不成功就拒绝 task
//reject(…) 方法会调用 handler 的拒绝策略
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
if (! isRunning(recheck) && remove(command))
reject(command);
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}else if (!addWorker(command, false))
reject(command);
}
1 reject
这里先提及一下 reject(…) 方法:
//ThreadPoolExecutor.class
final void reject(Runnable command) {
handler.rejectedExecution(command, this);
}
本质是调用了 handler 对象的相关方法。在本例中,handler 对象指向了 defaultHandler:
//ThreadPoolExecutor.class
private static final RejectedExecutionHandler defaultHandler = new AbortPolicy();
defaultHandler 是一个 AbortPolicy 类型的对象,而 AbortPolicy 是 ThreadPoolExecutor 的静态内部类。
AbortPolicy 起作用的方法为 rejectedExecution(…) 方法:
//AbortPolicy.class
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
throw new RejectedExecutionException(“Task ” + r.toString() +
” rejected from ” + e.toString());
}
也就是说,在 task 过多的情况下,AbortPolicy 的应对策略是抛出异常。
2 addWorker
来看一下核心方法 addWorker(…):
//ThreadPoolExecutor.class
private boolean addWorker(Runnable firstTask, boolean core) {
// 先标记这个 for 循环,方便退出循环
retry:
// 在每一次循环开始之前会刷新一次状态标识
for (int c = ctl.get();;) {
// 这里先进行判断,如果线程池已经关闭了,或者没有 task 了,就会返回 false
if (runStateAtLeast(c, SHUTDOWN)
&& (runStateAtLeast(c, STOP)
|| firstTask != null
|| workQueue.isEmpty()))
return false;

for (;;) {
// 如果 Worker 数量已经超出了最大值就会直接返回 false
if (workerCountOf(c)
>= ((core ? corePoolSize : maximumPoolSize) & COUNT_MASK))
return false;
// 将 ctl 变量的值加 1,如果成功了就会跳出循环
if (compareAndIncrementWorkerCount(c))
break retry;
c = ctl.get();
// 在状态值比 SHUTDOWN 大的时候会直接跳到最外头的循环里
// 需要注意的是最外面的 for 循环会判断状态值是否大于 SHUTDOWN
// 如果大于 SHUTDOWN 的话就返回 false 了
if (runStateAtLeast(c, SHUTDOWN))
continue retry;
}
}

boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try {
// 创建一个 Worker
w = new Worker(firstTask);
// 获取线程对象
final Thread t = w.thread;
if (t != null) {
// 加锁,此处加的是一把全局的锁
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
int c = ctl.get();
// 如果状态值 c 是 RUNNING,或者 就会进入这个判断语句
//
if (isRunning(c) || (runStateLessThan(c, STOP) && firstTask == null)) {
// 如果这个线程已经处于运作状态,会抛出异常
if (t.isAlive())
throw new IllegalThreadStateException();
//workers 是一个列表,用于存储 Worker 对象
workers.add(w);
// 获取 Worker 的数量
int s = workers.size();
//largestPoolSize 用来记录线程池达到过的最大线程数
if (s > largestPoolSize)
largestPoolSize = s;
// 标记 Worker 已经被添加
workerAdded = true;
}
} finally {
// 释放锁
mainLock.unlock();
}
// 先判断 Worker 是否已经被添加到 workers 内了
if (workerAdded) {
// 这是该方法核心的启动线程方法
t.start();
// 标记 Worker 已经开始运行了
workerStarted = true;
}
}
} finally {
// 如果没有标记 Worker 已经开始工作,会在这里销毁掉 Worker
if (!workerStarted)
addWorkerFailed(w);
}
return workerStarted;
}
五 一点唠叨
先总结一下线程池的业务逻辑:
1 接收到 task (即实现了 Runnable 接口的实例对象) [execute(…) 方法]

2 用 task 去尝试创建一个 Worker 实例 [execute(…) 方法]
2.1 如果 Worker 数量没有达到线程池的指定最大值 -> 新建
2.2 如果 Worker 数量达到了线程池的指定最大值 -> 不会再创建,而是把 task 储存起来等待空闲的 Worker 去提取
2.3 如果 task 队列也已经满了,无法再添加 -> 触发拒绝机制(handler)

3 Worker 在执行的时候调用其内部的 Thread 实例对象的 start() 方法 [addWorker(…) 方法]

4 该 start() 方法会调用到 Worker 的 run() 方法 [Worker.class 内的 run() 方法]

5 Worker 的 run() 方法本质上是封装了 task 的 run() 方法 [runWorker(…) 方法]
主线业务逻辑不算复杂,比较艰难的是为了保证数据的一致性,线程池代码中充斥着大量的状态判断和锁机制。
并且为了考虑性能问题,线程池的设计没有使用悲观锁(synchronized 关键字),而是大量使用了 ASQ 和 ReetrentLock 机制。

本文仅为个人的学习笔记,可能存在错误或者表述不清的地方,有缘补充

正文完
 0