共计 3484 个字符,预计需要花费 9 分钟才能阅读完成。
线程池调用 execute 提交工作—> 创立 Worker(设置属性 thead、firstTask)—>worker.thread.start()—> 实际上调用的是 worker.run()—> 线程池的 runWorker(worker)—>worker.firstTask.run();
runWork 办法 外围 (线程复用)
final void runWorker(Worker w) {
// 获取以后线程
Thread wt = Thread.currentThread();
// 获取 w 的 firstTask
Runnable task = w.firstTask;
// 设置 w 的 firstTask 为 null
w.firstTask = null;
// 开释锁,设置 AQS 的 state 为 0,容许中断
w.unlock();
// 用于标识线程是否异样终止,finally 中 processWorkerExit() 办法会有不同逻辑
boolean completedAbruptly = true;
try {// 循环调用 getTask() 获取工作, 一直从工作缓存队列获取工作并执行
while (task != null || (task = getTask()) != null) {
// 进入循环外部,代表曾经获取到可执行的工作,则对 worker 对象加锁,保障线程在执行工作过程中不会被中断
w.lock();
if ((runStateAtLeast(ctl.get(), STOP) || // 若线程池状态大于等于 STOP,那么意味着该线程要中断
(Thread.interrupted() && // 线程被中断
runStateAtLeast(ctl.get(), STOP))) && // 且是因为线程池外部状态变动而被中断
!wt.isInterrupted()) // 确保该线程未被中断
// 收回中断请求
wt.interrupt();
try {
// 开始执行工作前的 Hook 办法
beforeExecute(wt, task);
Throwable thrown = null;
try {
// 到这里正式开始执行工作
task.run();} catch (RuntimeException x) {thrown = x; throw x;} catch (Error x) {thrown = x; throw x;} catch (Throwable x) {thrown = x; throw new Error(x);
} finally {
// 执行工作后的 Hook 办法
afterExecute(task, thrown);
}
} finally {// 置空 task,筹备通过 getTask() 获取下一个工作
task = null;
//completedTasks 递增
w.completedTasks++;
// 开释掉 worker 持有的独占锁
w.unlock();}
}
completedAbruptly = false;
} finally {
// 到这里,线程执行完结,须要执行完结线程的一些清理工作
// 线程执行完结可能有两种状况://1.getTask() 返回 null,也就是说,这个 worker 的使命完结了,线程执行完结
//2. 工作执行过程中产生了异样
// 第一种状况,getTask() 返回 null,那么 getTask() 中会将 workerCount 递加
// 第二种状况,workerCount 没有进行解决,这个递加操作会在 processWorkerExit() 中解决
processWorkerExit(w, completedAbruptly);
}
}
循环取工作 geTask()
private Runnable getTask() {
// 标识以后线程是否超时未能获取到 task 对象
boolean timedOut = false;
for (;;) {
// 获取线程池的管制状态
int c = ctl.get();
// 获取线程池的运行状态
int rs = runStateOf(c);
// 如果线程池状态大于等于 STOP,或者处于 SHUTDOWN 状态,并且阻塞队列为空,线程池工作线程数量递加,办法返回 null,回收线程
if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {decrementWorkerCount();
return null;
}
// 获取 worker 数量
int wc = workerCountOf(c);
// 标识以后线程在闲暇时,是否应该超时回收
// 如果 allowCoreThreadTimeOut 为 ture,或以后线程数大于外围池大小,则须要超时回收
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
// 如果 worker 数量大于 maximumPoolSize(有可能调用了 setMaximumPoolSize(), 导致 worker 数量大于 maximumPoolSize)
if ((wc > maximumPoolSize || (timed && timedOut)) // 或者获取工作超时
&& (wc > 1 || workQueue.isEmpty())) { //workerCount 大于 1 或者阻塞队列为空(在阻塞队列不为空时,须要保障至多有一个工作线程)if (compareAndDecrementWorkerCount(c))
// 线程池工作线程数量递加,办法返回 null,回收线程
return null;
// 线程池工作线程数量递加失败,跳过残余局部,持续循环
continue;
}
try {// 如果容许超时回收,则调用阻塞队列的 poll(),只在 keepAliveTime 工夫内期待获取工作,一旦超过则返回 null
// 否则调用 take(),如果队列为空,线程进入阻塞状态,有限时期待工作,直到队列中有可取工作或者响应中断信号退出
Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
workQueue.take();
// 若 task 不为 null,则返回胜利获取的 task 对象
if (r != null)
return r;
// 若返回 task 为 null,示意线程闲暇工夫超时,则设置 timeOut 为 true
timedOut = true;
} catch (InterruptedException retry) {
// 如果此 worker 产生了中断,采取的计划是重试,没有超时
// 在哪些状况下会产生中断?调用 setMaximumPoolSize(),shutDown(),shutDownNow()
timedOut = false;
}
}
}
ps: 线程池工作线程执行工作产生异样如何解决?
processWorkerExit 办法
private void processWorkerExit(Worker w, boolean completedAbruptly) {
// 失常的话再 runWorker 的 getTask 办法 workerCount 曾经被减一了
if (completedAbruptly)
decrementWorkerCount();
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// 累加线程的 completedTasks
completedTaskCount += w.completedTasks;
// 从线程池中移除超时或者出现异常的线程
workers.remove(w);
} finally {mainLock.unlock();
}
// 尝试进行线程池
tryTerminate();
int c = ctl.get();
// runState 为 RUNNING 或 SHUTDOWN
if (runStateLessThan(c, STOP)) {
// 线程不是异样完结
if (!completedAbruptly) {
// 线程池最小闲暇数,容许 core thread 超时就是 0,否则就是 corePoolSize
int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
// 如果 min == 0 然而队列不为空要保障有 1 个线程来执行队列中的工作
if (min == 0 && !workQueue.isEmpty())
min = 1;
// 线程池还不为空那就不必放心了
if (workerCountOf(c) >= min)
return; // replacement not needed
}
// 1. 线程异样退出
// 2. 线程池为空,然而队列中还有工作没执行,看 addWoker 办法对这种状况的解决
addWorker(null, false);
}
}
正文完