前景回顾
大家好呀,我是小张,咱们在上一篇中 面试官: 给我讲讲线程池 (中),通过线程池的常见 API 作为切入点,剖析了execute 办法的源码,其中学到了 Doug Lea 老爷子把一个变量拆成两个变量的骚操作、无锁开发(CAS)、如何并发控制线程池状态等,以及最初通过源码浏览给大家带来了我认为高效浏览源码的方法论。
如果还没有读过上一篇的小伙伴请先浏览上一篇,在本篇中将会围绕以下几个 API 的源码浏览中开展。
- processWorkerExit
- shutdown
- shutdownNow
- awaitTermination
工作线程退出
咱们持续上一篇中 worker 的 runWorker()的 worker 线程完结开始讲,上面我将代码中其余流程简化只展现了外围代码。
final void runWorker(Worker w) {
boolean completedAbruptly = true;
try {while (task != null || (task = getTask()) != null) {
try {beforeExecute(wt, task);
Throwable thrown = null;
try {task.run();
} catch (RuntimeException x) {thrown = x; throw x;} catch (Error x) {thrown = x; throw x;} catch (Throwable x) {thrown = x; throw new Error(x);
} finally {afterExecute(task, thrown);
}
} finally {
task = null;
w.completedTasks++;
w.unlock();}
}
completedAbruptly = false;
} finally {processWorkerExit(w, completedAbruptly);
}
}
在上一篇中咱们通过剖析只有当 getTask()办法返回 null 的时候,while 循环才会完结,才会进入 processWorkerExit()办法中。
其中 getTask()会在线程池状态扭转时(变为非 Running),或者当获取工作超时且当前工作线程数大于外围线程数返回 null。
那么理所应当的 processWorkerExit 办法就是执行工作线程的清理工作,上面将通过源码带大家看看是如何进行工作线程的清理的。
private void processWorkerExit(Worker w, boolean completedAbruptly) {
// 如果非正常完结, 将工作线程数 -1
if (completedAbruptly)
decrementWorkerCount();
// 获取锁, 该锁次要为了操作工作线程汇合
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// 汇总该 worker 线程实现的工作数
completedTaskCount += w.completedTasks;
// 从 worker 汇合中删除
workers.remove(w);
} finally {mainLock.unlock();
}
// 因为线程池的状态不晓得在何时会被批改,所以须要尝试去完结线程池
tryTerminate();
int c = ctl.get();
// 当 Running 或 SHUTDOWN 状态
if (runStateLessThan(c, STOP)) {
// 如果非异常中断, 计算最小应存活工作线程数
if (!completedAbruptly) {
// 如容许外围线程超时, 最小值 0 否则为外围线程数
int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
// 当最小值为 0 时, 工作队列还有工作时, 至多还须要一个线程去解决
if (min == 0 && ! workQueue.isEmpty())
min = 1;
// 判断当前工作线程数是否小于最小值, 如是间接返回
if (workerCountOf(c) >= min)
return; // replacement not needed
}
// 如果异常中断或当前工作线程数小于最小值, 须要从新增加工作线程
addWorker(null, false);
}
}
通过上方剖析,咱们能够推断出工作线程清理过程是先清理 worker 汇合,再会执行尝试完结线程池,而后如果线程池状态处于 Running 或 SHUTDOWN 时,将会计算最小工作线程数 min,保障当前工作线程数总是有 min 个存活。有小伙伴可能会好奇,为什么会去尝试去完结线程池,其实情理很简略,线程池何时被完结对于程序是未知的,所以须要在每个线程通过的中央来去判断状态是否变动。
敞开线程池
在理论场景中,敞开线程池有两种形式,一种调用 shutdown 一种调用shutdownNow,两者区别在于前者线程池还能够持续解决线程池中工作,后者将会中断所有工作并将未执行的工作返回。上面我将通过比照的形式,让大家更分明的浏览其中的不同。
通过上图红框中,首先两者要扭转的状态不同,一个要扭转为 SHUTDOWN 一个要扭转为 STOP 状态。其次一个要中断闲暇线程,一个要中断所有线程。如果有小伙伴不理解这里的线程池状态,请到上一篇文章浏览。
那么线程池是如何被扭转状态,工作线程又是如何被中断的呢,浏览上面的源码将恍然大悟。
advanceRunState
// 推动线程池状态
private void advanceRunState(int targetState) {for (;;) {int c = ctl.get();
if (
// 如果以后状态大于等于指标状态则 break
runStateAtLeast(c, targetState) ||
// 如果以后状态小于指标状态则应用 CAS 尝试去批改, 批改胜利则 break
ctl.compareAndSet(c, ctlOf(targetState, workerCountOf(c))))
break;
}
}
上方的代码非常简单,在多线程中并发中,线程池的状态随时可能发生变化。所以这里须要死循环通过 CAS 的形式去批改线程池状态,保障原子性。
interruptIdleWorkers
private void interruptIdleWorkers() {interruptIdleWorkers(false);
}
private void interruptIdleWorkers(boolean onlyOne) {
// 咱们须要操作 worker 汇合, 所以上锁
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// 遍历 worker 汇合
for (Worker w : workers) {
Thread t = w.thread;
if (
// 线程非中断
!t.isInterrupted()
// 尝试获取 worker 的锁, 判断 worker 是否闲暇
&& w.tryLock()) {
try {
// 闲暇 worker, 间接应用中断
t.interrupt();} catch (SecurityException ignore) { } finally {
// 开释 worker 的锁
w.unlock();}
}
// 如只中断一个, 间接 break
if (onlyOne)
break;
}
} finally {
// 开释操作 worker 汇合的锁
mainLock.unlock();}
}
浏览下来,该办法浏览并不艰难,然而在 worker 是否闲暇处,应用了 tryLock 办法去判断 worker 是否闲暇,小伙伴们可能会疑难为什么 tryLock 胜利就是闲暇的 worker 线程呢?这里咱们就须要联合 runWorker()办法去看。
如上图,咱们会发现工作线程池要么处于期待①处返回 task 工作,要么处于③执行工作。咱们会发现在获取 task 返回时,就会进入②,这里不相熟 AQS 的小伙伴能够把②处认为是标记 worker 线程被占用。工作执行结束将会调用④办法,标记开释 worker 线程。所以咱们能够得悉,只有尝试占用胜利的都是没有获取到 task 工作的 worker 线程,换言之这些线程就是闲暇线程。
interruptWorkers
依据字面意思,该办法作用是中断所有工作线程池,不论是否正在执行工作。
private void interruptWorkers() {
// 获取占用 worker 汇合的锁
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// 遍历 worker 汇合
for (Worker w : workers)
// 间接中断
w.interruptIfStarted();} finally {
// 开释锁
mainLock.unlock();}
}
void interruptIfStarted() {
Thread t;
if (
// 0 为未被占用、1 为被占用
getState() >= 0
&&
// 线程不会空且未被标记为中断, 则调用线程的中断
(t = thread) != null && !t.isInterrupted()) {
try {
// 线程中断
t.interrupt();} catch (SecurityException ignore) {}}
}
咱们会发现中断所有线程和中断闲暇线程的办法中,惟一不同的是这个办法应用了 getState()大于等于 0 来判断线程状态。
那么为什么是大于等于 0 呢,咱们回到 worker 中的 lock 和 unlock 办法中。
public void lock() { acquire(1); }
public boolean tryLock() { return tryAcquire(1); }
public void unlock() { release(1); }
Worker(Runnable firstTask) {setState(-1);
this.firstTask = firstTask;
this.thread = getThreadFactory().newThread(this);
}
相熟 AQS 的小伙伴必定会明确,调用 lock 办法就会将状态改为 1,unlock 办法将会把状态改为 0。咱们回到 worker 类的构造函数中,会发现第一条就是先把状态设置为 -1,到这里是不是恍然大悟了,这就解释了为什么应用大于等 0 来判断线程状态,为了就是刚创立实现的 worker 线程是不须要被中断的。
tryTerminate
咱们会发现在 shutdown 和 shutdownNow 办法的最初都调用了 tryTerminate 办法,那么其中都做了什么操作?
final void tryTerminate() {for (;;) {
// 获取线程池状态
int c = ctl.get();
// 查看是否能够 Terminate
if (
// 状态处于 Running
isRunning(c) ||
// 大于等于 TIDYING=TIDYING、TERMINATED
runStateAtLeast(c, TIDYING) ||
// 处于 SHUTDOWN 状态且工作队列非空
(runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty()))
return;
// ①执行到此阐明状态处于 SHUTDOWN 且工作队列为空, 阐明能够开始终结线程池了
if (workerCountOf(c) != 0) {
// 如果当前工作线程不为 0, 则中断 1 个工作线程, 具体为什么是一个, 下文会有解释
interruptIdleWorkers(ONLY_ONE);
return;
}
// 执行至此, 阐明工作线程数已为 0, 且状态处于 SHUTDOWN, 工作队列为空
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// 尝试 CAS 批改状态为 TIDYING
if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) {
try {
// 钩子办法, 子类实现
terminated();} finally {
// 钩子办法执行胜利, 设置状态为 TERMINATED
ctl.set(ctlOf(TERMINATED, 0));
// 唤醒所有期待 TERMINATED 的线程, 下文有会有解释
termination.signalAll();}
return;
}
} finally {mainLock.unlock();
}
// else retry on failed CAS
}
}
浏览完该办法的小伙伴必定不了解,①处代码为什么判断工作线程不等于 0,只中断一个工作线程,而不是中断所有工作线程。咱们先上论断,不中断所有线程的起因是为了,到时候还须要至多一个线程执行①前面的代码,也就是须要线程去批改线程状态,不然就没有线程就批改状态了。
听到这个论断,小伙伴必定是一头雾水,当初我看到这段源码也是一头雾水,不晓得老爷子为什么这么设计。直到我在 IDE 里点击该办法查看它的援用时,所有就恍然大悟了。
如上图咱们会发现,tryTerminate()办法,在 processWorkerExit()办法中被援用到了,咱们在 上一篇剖析 runWorker 办法中会发现该办法处于,工作线程 run 的 finally 块中,也就是所有工作线程执行实现就会调用该办法。
到这里咱们晓得了工作线程的最初都将执行 tryTerminate 办法,然而还不能解释为什么工作线程数为 0,而不是 1 呢,不是说保障至多一个工作线程存活执行就能够了吗?
咱们的条件是工作线程数等于 0,那么必定有什么中央进行数量缩小,而且这个办法要处于 runWorker 办法外面。顺藤摸瓜,跟着这个思路咱们找到了 getTask()办法,咱们会发现①处,当处于 SHUTDOWN 时工作队列为空或状态为 STOP、TIDYING、TERMINATED 其中之一,就将工作线程数减一。其次咱们会发现当工作线程处于②处期待获取 task 时被中断的中断异样将会被③处抓住,而后从新回到①处缩小工作线程数,也就是说最初必定会存在最初一个线程将此处工作线程数减为 0,而后就去批改线程池状态到终态。
一图胜千言,上面我通过流程图向大家展现外部是如何运行的。
咱们将所有流程串联起来,左右两边是同时在运行的,所以这也就解释了为什么每次中断一个且工作线程等于 0 才会持续往下执行了。
期待线程池完结
通过下面简明扼要的阐述,咱们终于来到了最初一个办法中,最初一个办法也是一个非常简单的办法,我置信小伙伴们有了下面的根底,再来看这一段代码,几乎是小巫见大巫。
public boolean awaitTermination(long timeout, TimeUnit unit)
throws InterruptedException {long nanos = unit.toNanos(timeout);
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {for (;;) {
// 如果以后状态为 TERMINATED, 就返回 true
if (runStateAtLeast(ctl.get(), TERMINATED))
return true;
// 已超时, 返回 false
if (nanos <= 0)
return false;
// 未超时, 持续期待
nanos = termination.awaitNanos(nanos);
}
} finally {mainLock.unlock();
}
}
状态为 TERMINATED 间接返回、超时间接返回这两种状况都很好了解,那如果我在期待期间线程状态变为 TERMINATED 了呢。是不是须要某个人来将我从期待中唤醒。
咱们发现在 tryTerminate 办法设置 TERMINATED 胜利当前,马上就告诉了所有在期待完结的线程了,那么这就串上了。
写在最初
看到这里的小伙伴,小张非常感谢你,谢谢你可能保持看到这里。其次你也得感激本人,感激本人的保持看到这里,将一个线程池外围源码盘然巨物给浏览完了。置信在这个过程,你也学习到了不少常识。在理论业务中,你可能不会写出那么简单的代码,各种状态充斥在代码中,然而老爷子其中的不少思维都是咱们能够借鉴学习。比方咱们在第一篇中写到形象模板办法、第二篇中 CAS 无锁编程、第三篇如何中断执行中的线程等等。