共计 8816 个字符,预计需要花费 23 分钟才能阅读完成。
这篇文章对 ThreadPoolExecutor 创建的线程池如何操作线程的生命周期通过源码的方式进行详细解析。通过对 execute 方法、addWorker 方法、Worker 类、runWorker 方法、getTask 方法、processWorkerExit 从源码角度详细阐述,文末有彩蛋。
exexcte 方法
public void execute(Runnable command) {if (command == null)
throw new NullPointerException();
int c = ctl.get();
/**
* workerCountOf 方法取出低 29 位的值,表示当前活动的线程数;* 如果当前活动的线程数小于 corePoolSize,则新建一个线程放入线程池中,并把该任务放到线程中
*/
if (workerCountOf(c) < corePoolSize) {
/**
* addWorker 中的第二个参数表示限制添加线程的数量 是根据据 corePoolSize 来判断还是 maximumPoolSize 来判断;* 如果是 ture,根据 corePoolSize 判断
* 如果是 false,根据 maximumPoolSize 判断
*/
if (addWorker(command, true))
return;
/**
* 如果添加失败,则重新获取 ctl 值
*/
c = ctl.get();}
/**
* 如果线程池是 Running 状态,并且任务添加到队列中
*/
if (isRunning(c) && workQueue.offer(command)) {
//double-check,重新获取 ctl 的值
int recheck = ctl.get();
/**
* 再次判断线程池的状态,如果不是运行状态,由于之前已经把 command 添加到阻塞队列中,这时候需要从队列中移除 command;* 通过 handler 使用拒绝策略对该任务进行处理,整个方法返回
*/
if (!isRunning(recheck) && remove(command))
reject(command);
/**
* 获取线程池中的有效线程数,如果数量是 0,则执行 addWorker 方法;* 第一个参数为 null,表示在线程池中创建一个线程,但不去启动
* 第二个参数为 false,将线程池的线程数量的上限设置为 maximumPoolSize,添加线程时根据 maximumPoolSize 来判断
*/
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
/**
* 执行到这里,有两种情况:* 1、线程池的状态不是 RUNNING;* 2、线程池状态是 RUNNING,但是 workerCount >= corePoolSize,workerQueue 已满
* 这个时候,再次调用 addWorker 方法,第二个参数传 false,将线程池的有限线程数量的上限设置为 maximumPoolSize;* 如果失败则执行拒绝策略;*/
} else if (!addWorker(command, false))
reject(command);
}
简单来说,在执行 execute()方法时如果状态一直是 RUNNING 时,的执行过程如下:
- 如果 workerCount < corePoolSize,则创建并启动一个线程来执行新提交的任
务;
- 如果 workerCount >= corePoolSize,且线程池内的阻塞队列未满,则将任务添
加到该阻塞队列中;
- 如 果 workerCount >= corePoolSize && workerCount <
maximumPoolSize,且线程池内的阻塞队列已满,则创建并启动一个线程来执行新
提交的任务;
- 如果 workerCount >= maximumPoolSize,并且线程池内的阻塞队列已满, 则根
据拒绝策略来处理该任务, 默认的处理方式是直接抛异常。
这里要注意一下 addWorker(null, false);,也就是创建一个线程,但并没有传入任务,因为
任务已经被添加到 workQueue 中了,所以 worker 在执行的时候,会直接从 workQueue 中
获取任务。所以,在 workerCountOf(recheck) == 0 时执行 addWorker(null, false); 也是
为了保证线程池在 RUNNING 状态下必须要有一个线程来执行任务。
addWorker 方法
addWorker 方法的主要作用是在线程池中创建一个新的线程并执行,firstTask 参数用于指定新增的线程执行的第一个任务,core 参数为 true 表示在新增线程时会判断当前活动线程数是否少于 corePoolSize,false 表示新增线程前需要判断当前活动的线程数是否少于 maximumPoolSize
private boolean addWorker(Runnable firstTask, boolean core) {
retry:
/**
* 由于线程执行过程中,各种情况都有可能处于,通过自旋的方式来保证 worker 的增加;*/
for (; ;) {int c = ctl.get();
// 获取线程池运行状态
int rs = runStateOf(c);
/**
*
* 如果 rs >= SHUTDOWN, 则表示此时不再接收新任务;* 接下来是三个条件 通过 && 连接,只要有一个任务不满足,就返回 false;* 1.rs == SHUTDOWN,表示关闭状态,不再接收提交的任务,但却可以继续处理阻塞队列中已经保存的任务;* 2.fisrtTask 为空
* 3.Check if queue empty only if necessary.
*/
if (rs >= SHUTDOWN &&
!(rs == SHUTDOWN &&
firstTask == null &&
!workQueue.isEmpty()))
return false;
for (; ;) {
// 获取线程池的线程数
int wc = workerCountOf(c);
/**
* 如果线程数 >= CAPACITY,也就是 ctl 的低 29 位的最大值,则返回 false;* 这里的 core 用来判断 限制线程数量的上限是 corePoolSize 还是 maximumPoolSize;* 如果 core 是 ture 表示根据 corePoolSize 来比较;* 如果 core 是 false 表示根据 maximumPoolSize 来比较;*/
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize))
return false;
/**
* 通过 CAS 原子的方式来增加线程数量;* 如果成功,则跳出第一个 for 循环;*/
if (compareAndIncrementWorkerCount(c))
break retry;
c = ctl.get(); // Re-read ctl
// 如果当前运行的状态不等于 rs,说明线程池的状态已经改变了,则返回第一个 for 循环继续执行
if (runStateOf(c) != rs)
continue retry;
// else CAS failed due to workerCount change; retry inner loop
}
}
boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try {
// 根据 firstTask 来创建 Worker 对象
w = new Worker(firstTask);
// 每一个 Worker 对象都会创建一个线程
final Thread t = w.thread;
if (t != null) {
// 创建可重入锁
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// 获取线程池的状态
int rs = runStateOf(ctl.get());
/**
* 线程池的状态小于 SHUTDOWN,表示线程池处于 RUNNING 状态;* 如果 rs 是 RUNNING 状态或 rs 是 SHUTDOWN 状态并且 firstTask 为 null,向线程池中添加线程;* 因为在 SHUTDOWN 状态时不会再添加新的任务,但还是处理 workQueue 中的任务;*/
if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {if (t.isAlive()) // precheck that t is startable
throw new IllegalThreadStateException();
//workers 是一个 hashSet
workers.add(w);
int s = workers.size();
//largestPoolSize 记录线程池中出现的最大的线程数量
if (s > largestPoolSize)
largestPoolSize = s;
workerAdded = true;
}
} finally {mainLock.unlock();
}
if (workerAdded) {
// 启动线程,Worker 实现了 Running 方法,此时会调用 Worker 的 run 方法
t.start();
workerStarted = true;
}
}
} finally {if (!workerStarted)
addWorkerFailed(w);
}
return workerStarted;
}
Worker 类
线程池中的每一个对象被封装成一个 Worker 对象,ThreadPool 维护的就是一组 Worker 对象。
Worker 类继承了 AQS,并实现了 Runnable 接口,其中包含了两个重要属性:firstTask 用来保存传入的任务,thread 是在调用构造方法是通过 ThreadFactory 来创建的线程,是用来处理任务的线程。
private final class Worker
extends AbstractQueuedSynchronizer
implements Runnable {
final Thread thread;
Runnable firstTask;
volatile long completedTasks;
Worker(Runnable firstTask) {
/**
* 把 state 设置为 -1,, 阻止中断直到调用 runWorker 方法;* 因为 AQS 默认 state 是 0,如果刚创建一个 Worker 对象,还没有执行任务时,这时候不应该被中断
*/
setState(-1);
this.firstTask = firstTask;
/**
* 创建一个线程,newThread 方法传入的参数是 this,因为 Worker 本身继承了 Runnable 接口,也就是一个线程;* 所以一个 Worker 对象在启动的时候会调用 Worker 类中 run 方法
*/
this.thread = getThreadFactory().newThread(this);
}
}
Worker 类继承了 AQS,使用 AQS 来实现独占锁的功能。为什么不使用 ReentrantLock 来实现?
可以看到 tryAcquire 方法,他是不允许重入的,而 ReentrantLock 是允许可重入的:
- lock 方法一旦获取独占锁,表示当前线程正在执行任务中;
- 如果正在执行任务,则不应该中断线程;
- 如果该线程现在不是独占锁的状态,也就是空闲状态,说明它没有处理任务,这时可以对该线程进行中断;
- 线程池中执行 shutdown 方法或 tryTerminate 方法时会调用 interruptIdleWorkers 方法来中断空闲线程,interruptIdleWorkers 方法会使用 tryLock 方法来判断线程池中的线程是否是空闲状态;
- 之所以设置为不可重入的,是因为在任务调用 setCorePoolSize 这类线程池控制的方法时,不会中断正在运行的线程
所以,Worker 继承自 AQS,用于判断线程是否空闲以及是否处于被中断。
protected boolean tryAcquire(int unused) {
/**
* cas 修改 state,不可重入;* state 根据 0 来判断,所以 Worker 构造方法中讲 state 置为 - 1 是为了禁止在执行任务前对线程进行中断;* 因此,在 runWorker 方法中会先调用 Worker 对象的 unlock 方法将 state 设置为 0
*/
if (compareAndSetState(0, 1)) {setExclusiveOwnerThread(Thread.currentThread());
return true;
}
return false;
}
runWorker 方法
在 Worker 类中的 run 方法调用了 runWorker 方法来执行任务
final void runWorker(Worker w) {Thread wt = Thread.currentThread();
// 获取第一个任务
Runnable task = w.firstTask;
w.firstTask = null;
// 允许中断
w.unlock();
// 是否因异常退出循环
boolean completedAbruptly = true;
try {
// 如果 task 为空,则通过 getTask 来获取任务
while (task != null || (task = getTask()) != null) {w.lock();
/**
* 如果线程池正在停止,那么要保证当前线程时中断状态;* 如果不是的话,则要保证当前线程不是中断状态
*/
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt();
try {
//beforeExecute 和 afterExecute 是留给子类来实现的
beforeExecute(wt, task);
Throwable thrown = null;
try {
// 通过任务方式执行,不是线程方式
task.run();} catch (RuntimeException x) {
thrown = x;
throw x;
} catch (Error x) {
thrown = x;
throw x;
} catch (Throwable x) {
thrown = x;
throw new Error(x);
} finally {afterExecute(task, thrown);
}
} finally {
task = null;
w.completedTasks++;
w.unlock();}
}
completedAbruptly = false;
} finally {
//processWorkerExit 会对 completedAbruptly 进行判断,表示在执行过程中是否出现异常
processWorkerExit(w, completedAbruptly);
}
}
总结一下 runWorker 方法的执行过程:
- while 循环不断地通过 getTask 方法来获取任务;
- getTask 方法从阻塞队列中获取任务;
- 如果线程池正在停止,那么要保证当前线程处于中断状态,否则要保证当前线程不是中断状态;
- 调用 task.run()执行任务;
- 如果 task 为 null 则会跳出循环,执行 processWorkerExit 方法;
- runWorker 方法执行完毕,也代表着 Worker 中的 run 方法执行完毕,销毁线程。
getTask 方法
getTask 方法用于从阻塞队列中获取任务
private Runnable getTask() {
//timeout 变量的值表示上次从阻塞队列中获取任务是否超时
boolean timedOut = false;
for (; ;) {int c = ctl.get();
int rs = runStateOf(c);
/**
* 如果 rs >= SHUTDOWN,表示线程池非 RUNNING 状态,需要再次判断:* 1、rs >= STOP,线程池是否正在 STOP
* 2、阻塞队列是否为空
* 满足上述条件之一,则将 workCount 减一,并返回 null;* 因为如果当前线程池的状态处于 STOP 及以上或队列为空,不能从阻塞队列中获取任务;*/
if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {decrementWorkerCount();
return null;
}
int wc = workerCountOf(c);
/**
* timed 变量用于判断是否需要进行超时控制;* allowCoreThreadTimeOut 默认是 false,也就是核心线程不允许进行超时;* wc > corePoolSize,表示当前线程数大于核心线程数量;* 对于超过核心线程数量的这些线程,需要进行超时控制;*/
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
/**
* wc > maximumPoolSize 的情况是因为可能在此方法执行阶段同时执行了 setMaximumPoolSize 方法;* timed && timedOut 如果为 true,表示当前操作需要进行超时控制,并且上次从阻塞队列中获取任务发生了超时;* 接下来判断,如果有效咸亨数量大于 1,或者 workQueue 为空,那么将尝试 workCount 减 1;* 如果减 1 失败,则返回重试;* 如果 wc== 1 时,也就说明当前线程是线程池中的唯一线程了;*/
if ((wc > maximumPoolSize || (timed && timedOut))
&& (wc > 1 || workQueue.isEmpty())) {if (compareAndDecrementWorkerCount(c))
return null;
continue;
}
/**
* timed 为 trure,则通过 workQueue 的 poll 方法进行超时控制,如果在 keepAliveTime 时间内没有获取任务,则返回 null;* 否则通过 take 方法,如果队列为空,则 take 方法会阻塞直到队列中不为空;*/
try {
Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
workQueue.take();
if (r != null)
return r;
// 如果 r ==null,说明已经超时了,timedOut = true;
timedOut = true;
} catch (InterruptedException retry) {
// 如果获取任务时当前线程发生了中断,则将 timedOut = false;
timedOut = false;
}
}
}
注意:第二个 if 判断,目的是为了控制线程池的有效线程数量。
有上文分析得到,在 execute 方法时,如果当前线程池的线程数量超过 coolPoolSize 且小于 maxmumPoolSize,并且阻塞队列已满时,则可以通过增加工作线程。但是如果工作线程在超时时间内没有获取到任务,timeOut=true,说明 workQueue 为空,也就说当前线程池不需要那么多线程来执行任务了,可以把多于的 corePoolSize 数量的线程销毁掉,保证线程数量在 corePoolSize 即可。
什么时候会销毁线程?
当然是 runWorker 方法执行完后,也就是 Worker 中的 run 方法执行完后,由 JVM 自动回收。
processWorkerExit 方法
private void processWorkerExit(Worker w, boolean completedAbruptly) {
/**
* 如果 completedAbruptly 为 true,则说明线程执行时出现异常,需要将 workerCount 数量减一
* 如果 completedAbruptly 为 false,说明在 getTask 方法中已经对 workerCount 进行减一,这里不用再减
*/
if (completedAbruptly)
decrementWorkerCount();
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// 统计完成的任务数
completedTaskCount += w.completedTasks;
// 从 workers 中移除,也就表示从线程池中移除一个工作线程
workers.remove(w);
} finally {mainLock.unlock();
}
// 钩子函数,根据线程池的状态来判断是否结束线程池
tryTerminate();
int c = ctl.get();
/**
* 当前线程是 RUNNING 或 SHUTDOWN 时,如果 worker 是异常结束,那么会直接 addWorker;* 如果 allowCoreThreadTimeOut=true,那么等待队列有任务,至少保留一个 worker;* 如果 allowCoreThreadTimeOut=false,workerCount 少于 coolPoolSize
*/
if (runStateLessThan(c, STOP)) {if (!completedAbruptly) {
int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
if (min == 0 && !workQueue.isEmpty())
min = 1;
if (workerCountOf(c) >= min)
return; // replacement not needed
}
addWorker(null, false);
}
}
至此,processWorkerExit 执行完之后,工作线程被销毁。
工作执行流程
工作线程的生命周期,从 execute 方法开始,Worker 使用 ThreadFactory 创建新的工作线程,runWorker 通过 getTask 获取任务,然后执行任务,如果 getTask 返回 null,进入 processWorkerExit,整个线程结束。