线程池调用execute提交工作—>创立Worker(设置属性thead、firstTask)—>worker.thread.start()—>实际上调用的是worker.run()—>线程池的runWorker(worker)—>worker.firstTask.run();
runWork办法 外围(线程复用)
final void runWorker(Worker w) {
//获取以后线程
Thread wt = Thread.currentThread();
//获取w的firstTask
Runnable task = w.firstTask;
//设置w的firstTask为null
w.firstTask = null;
// 开释锁,设置AQS的state为0,容许中断
w.unlock();
//用于标识线程是否异样终止,finally中processWorkerExit()办法会有不同逻辑
boolean completedAbruptly = true;
try {
//循环调用getTask()获取工作,一直从工作缓存队列获取工作并执行
while (task != null || (task = getTask()) != null) {
//进入循环外部,代表曾经获取到可执行的工作,则对worker对象加锁,保障线程在执行工作过程中不会被中断
w.lock();
if ((runStateAtLeast(ctl.get(), STOP) || //若线程池状态大于等于STOP,那么意味着该线程要中断
(Thread.interrupted() && //线程被中断
runStateAtLeast(ctl.get(), STOP))) && //且是因为线程池外部状态变动而被中断
!wt.isInterrupted()) //确保该线程未被中断
//收回中断请求
wt.interrupt();
try {
//开始执行工作前的Hook办法
beforeExecute(wt, task);
Throwable thrown = null;
try {
//到这里正式开始执行工作
task.run();
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
//执行工作后的Hook办法
afterExecute(task, thrown);
}
} finally {
//置空task,筹备通过getTask()获取下一个工作
task = null;
//completedTasks递增
w.completedTasks++;
//开释掉worker持有的独占锁
w.unlock();
}
}
completedAbruptly = false;
} finally {
//到这里,线程执行完结,须要执行完结线程的一些清理工作
//线程执行完结可能有两种状况:
//1.getTask()返回null,也就是说,这个worker的使命完结了,线程执行完结
//2.工作执行过程中产生了异样
//第一种状况,getTask()返回null,那么getTask()中会将workerCount递加
//第二种状况,workerCount没有进行解决,这个递加操作会在processWorkerExit()中解决
processWorkerExit(w, completedAbruptly);
}
}
循环取工作 geTask()
private Runnable getTask() {
//标识以后线程是否超时未能获取到task对象
boolean timedOut = false;
for (;;) {
//获取线程池的管制状态
int c = ctl.get();
//获取线程池的运行状态
int rs = runStateOf(c);
//如果线程池状态大于等于STOP,或者处于SHUTDOWN状态,并且阻塞队列为空,线程池工作线程数量递加,办法返回null,回收线程
if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
decrementWorkerCount();
return null;
}
//获取worker数量
int wc = workerCountOf(c);
//标识以后线程在闲暇时,是否应该超时回收
// 如果allowCoreThreadTimeOut为ture,或以后线程数大于外围池大小,则须要超时回收
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
//如果worker数量大于maximumPoolSize(有可能调用了 setMaximumPoolSize(),导致worker数量大于maximumPoolSize)
if ((wc > maximumPoolSize || (timed && timedOut)) //或者获取工作超时
&& (wc > 1 || workQueue.isEmpty())) { //workerCount大于1或者阻塞队列为空(在阻塞队列不为空时,须要保障至多有一个工作线程)
if (compareAndDecrementWorkerCount(c))
//线程池工作线程数量递加,办法返回null,回收线程
return null;
//线程池工作线程数量递加失败,跳过残余局部,持续循环
continue;
}
try {
//如果容许超时回收,则调用阻塞队列的poll(),只在keepAliveTime工夫内期待获取工作,一旦超过则返回null
//否则调用take(),如果队列为空,线程进入阻塞状态,有限时期待工作,直到队列中有可取工作或者响应中断信号退出
Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
workQueue.take();
//若task不为null,则返回胜利获取的task对象
if (r != null)
return r;
// 若返回task为null,示意线程闲暇工夫超时,则设置timeOut为true
timedOut = true;
} catch (InterruptedException retry) {
//如果此worker产生了中断,采取的计划是重试,没有超时
//在哪些状况下会产生中断?调用setMaximumPoolSize(),shutDown(),shutDownNow()
timedOut = false;
}
}
}
ps:线程池工作线程执行工作产生异样如何解决?
processWorkerExit办法
private void processWorkerExit(Worker w, boolean completedAbruptly) {
// 失常的话再runWorker的getTask办法workerCount曾经被减一了
if (completedAbruptly)
decrementWorkerCount();
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// 累加线程的completedTasks
completedTaskCount += w.completedTasks;
// 从线程池中移除超时或者出现异常的线程
workers.remove(w);
} finally {
mainLock.unlock();
}
// 尝试进行线程池
tryTerminate();
int c = ctl.get();
// runState为RUNNING或SHUTDOWN
if (runStateLessThan(c, STOP)) {
// 线程不是异样完结
if (!completedAbruptly) {
// 线程池最小闲暇数,容许core thread超时就是0,否则就是corePoolSize
int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
// 如果min == 0然而队列不为空要保障有1个线程来执行队列中的工作
if (min == 0 && !workQueue.isEmpty())
min = 1;
// 线程池还不为空那就不必放心了
if (workerCountOf(c) >= min)
return; // replacement not needed
}
// 1.线程异样退出
// 2.线程池为空,然而队列中还有工作没执行,看addWoker办法对这种状况的解决
addWorker(null, false);
}
}
发表回复