共计 9485 个字符,预计需要花费 24 分钟才能阅读完成。
前景回顾
在上一篇中咱们通过线程池的继承关系,具体分析了线程池的形象父类 AbstractExecutorService 中的 submit、invokeAll、invokeAny 办法。在本篇中,咱们将会把眼帘放在 ThreadPoolExecutor 具体实现当中,通过源码剖析咱们将会明确 7 个参数是如何在源码中运行的。
应用场景
咱们先回顾一下在理论场景下的业务代码,上面模仿了 10 个线程并行处理工作,而后进行线程池承受,最初期待线程池敞开。
public static void main(String[] args) throws InterruptedException {
// 开启线程池
ThreadPoolExecutor executor = new ThreadPoolExecutor(10, 20,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<>());
// 开启 10 个工作并行处理
for (int i = 0; i < 10; i++) {executor.execute(() -> {
// 模仿业务代码
try {Thread.sleep(1000);
System.out.println("工作完结");
} catch (InterruptedException e) {e.printStackTrace();
}
});
}
// 暂停线程池工作接管
executor.shutdown();
// 期待线程池完结
executor.awaitTermination(1,TimeUnit.MINUTES);
}
构造函数
总共重载了 4 个构造函数,设置了默认的参数,这种设计思路大家能够借鉴,上面只展现了其中两个重要的构造函数。
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue) {
this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
// 设置默认工厂, 工厂中返回线程优先级为一般并且为非守护的线程
Executors.defaultThreadFactory(),
// 默认回绝策略为回绝产生时间接抛出异样
defaultHandler);
}
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
// 判断参数边界
if (corePoolSize < 0 ||
maximumPoolSize <= 0 ||
maximumPoolSize < corePoolSize ||
keepAliveTime < 0)
throw new IllegalArgumentException();
if (workQueue == null || threadFactory == null || handler == null)
throw new NullPointerException();
// 设置平安管理器, 不在本篇思考范畴内
this.acc = System.getSecurityManager() == null ?
null :
AccessController.getContext();
// 设置外围线程数
this.corePoolSize = corePoolSize;
// 设置最大线程数
this.maximumPoolSize = maximumPoolSize;
// 设置工作队列
this.workQueue = workQueue;
// 设置线程闲暇工夫
this.keepAliveTime = unit.toNanos(keepAliveTime);
// 设置线程工厂
this.threadFactory = threadFactory;
// 设置回绝策略
this.handler = handler;
}
execute 办法
public void execute(Runnable command) {
// 边界判断
if (command == null)
throw new NullPointerException();
/**
判断当前工作线程数是否小于外围线程数
这里的 ctl 能够先认为它保留了线程池的工作线程数量和线程池状态
为什么一个变量能够示意两种状态前面会解释到
**/
int c = ctl.get();
if (workerCountOf(c) < corePoolSize) {
// 增加工作线程, 返回是否胜利创立, 胜利创立则返回
if (addWorker(command, true))
return;
/**
如果创立没胜利, 则从新获取线程池状态, 对于线程池具体状态会在下文形容
从新获取的起因在于 execute 是线程平安的办法
那么就会存在多线程调用, 在此期间线程池状态可能会发生变化, 敞开或有新工作增加
所以从新获取线程池状态放弃最新的状态
**/
c = ctl.get();}
/**
运行到这, 阐明当前工作线程大于外围线程数或者创立工作线程不胜利(线程池非 Running)
判断以后线程是否运行并且工作队列是否胜利增加工作
**/
if (isRunning(c) && workQueue.offer(command)) {
// 从新查看线程池状态
int recheck = ctl.get();
// 如非运行状态且可能删除删除, 则回绝工作
if (! isRunning(recheck) && remove(command))
reject(command);
// 如工作线程数为 0, 则增加工作线程, 此种状况产生在工作线程在闲暇工夫销毁时
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
/**
运行到此阐明, 线程池状态非 running 或增加工作队列不胜利
则尝试增加工作线程, 如果增加不胜利, 则回绝工作
**/
else if (!addWorker(command, false))
reject(command);
}
总结:浏览完 execute 办法后,咱们能够总结线程池会先在 小于外围线程数 的时增加外围工作线程,在工作队列无奈增加工作时增加非核心工作线程,在线程池 非 running 状态 或工作队列满且工作线程满 时回绝工作。
回顾咱们上篇提出的问题:当咱们创立外围线程数 10 个,最大线程数 20 个,工作队列为无界队列的线程池,并同时来了 30 个工作。
问题一:请问线程池中的线程数为多少?
问题二:那如果我把工作队列改为大小为 20 的队列,那么当初最多能够接管多少申请?
通过源码的浏览咱们当初能够很简略的答复这两个问题。
- 问题一:通过源码可知前 10 个工作间接去创立外围工作线程,因为工作队列是无界的因而后 20 个工作间接退出了工作队列期待外围工作线程生产。
- 问题二:如把工作队列改为容量为 20 的队列,那么现可承受最大(最大线程数 + 队列容量)=40 个申请。
在浏览 execute 办法时,咱们把 ctl 属性、addWorker 当做了黑盒,只是通过作者正文和办法命名去判断办法大抵做了什么操作,并且咱们都晓得 execute 是一个线程平安的办法,它能够由不同的线程去调用,然而在源码中咱们也没有发现加锁的局部,小伙伴们必定十分好奇这些底层办法是如何做到这些的。
CTL
// 类型为原子整数类, 增删改查都是原子操作
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
// 代表共有 32-3=29 位
private static final int COUNT_BITS = Integer.SIZE - 3;
// 代表最大容量为 2^29-1
private static final int CAPACITY = (1 << COUNT_BITS) - 1;
// 将线程池状态贮存在整数字节的高位中, 代表高 3 位代表线程池状态
private static final int RUNNING = -1 << COUNT_BITS;
private static final int SHUTDOWN = 0 << COUNT_BITS;
private static final int STOP = 1 << COUNT_BITS;
private static final int TIDYING = 2 << COUNT_BITS;
private static final int TERMINATED = 3 << COUNT_BITS;
// 因高 3 位代表线程池状态, 此办法将低 29 位变为 0 就能够失去高 3 位状态
private static int runStateOf(int c) {return c & ~CAPACITY;}
// 与上雷同, 将高 3 位变为 0, 失去工作线程数量
private static int workerCountOf(int c) {return c & CAPACITY;}
// 将 rs 和 wc 的进行或运算
private static int ctlOf(int rs, int wc) {return rs | wc;}
// 判断 c 是否小于 s
private static boolean runStateLessThan(int c, int s) {return c < s;}
// 判断 c 是否大于等于 s
private static boolean runStateAtLeast(int c, int s) {return c >= s;}
// 因为只有 Running 小于 shutdown 通过此办法来判断
private static boolean isRunning(int c) {return c < SHUTDOWN;}
// 尝试应用 CAS 的形式给 ctl+1
private boolean compareAndIncrementWorkerCount(int expect) {return ctl.compareAndSet(expect, expect + 1);
}
// 尝试应用 CAS 的形式给 ctl-1
private boolean compareAndDecrementWorkerCount(int expect) {return ctl.compareAndSet(expect, expect - 1);
}
通过上方对于 ctl 的源码,咱们能够看出作者将一个整数变量分为了两个局部,一部分用来示意线程池状态,另一部分来示意当前工作线程数,将高 3 位来示意线程池以后状态,后 29 位示意线程池大小。通过这里骚的面试官又能够出题了,问最大线程数最大能够设置为多少,又要杀倒一片。
或者会有小伙伴不懂位运算看不懂该段逻辑,又是左移又是右移的各种位运算,但其实先把办法大抵的性能理解了并不影响前面源码的浏览。
因为该篇篇幅无限,举荐想要刨根问底的小伙伴查问一下问运算的材料。
线程池状态
在 ctl 属性的局部,咱们会发现有如下几个枚举状态,那么都代表什么意思呢?
- RUNNING:容许接管新工作并解决在工作队列中的工作。
- SHUTDOWN:不接管新工作但解决在工作队列中的工作。
- STOP:不接管新工作、不解决工作队列中工作、中断在解决中的工作
- TIDYING:所有工作已完结、工作线程数为 0、并会调用 terminated()钩子办法
- TERMINATED:terminated()钩子办法胜利执行
在线程池中状态是这样子流转的:
- RUNNING -> SHUTDOWN:调用线程池的 shutdown()办法。
- (RUNNING/SHUTDOWN) -> STOP:调用线程池的 shutdownNow()办法。
- SHUTDOWN -> TIDYING:当工作队列和工作线程都为空时。
- STOP -> TIDYING:当工作线程为空时。
- TIDYING -> TERMINATED:当 terminated()钩子办法胜利执行。
addWorker
该办法将会创立工作线程,并将创立数量管制在外围线程数或最大线程数,其中的 firstTask 为工作线程创立胜利后执行的第一个工作,第二个参数代表是否为外围工作线程,最终返回线程是否创立胜利。
private final HashSet<Worker> workers = new HashSet<Worker>();
private final ReentrantLock mainLock = new ReentrantLock();
private boolean addWorker(Runnable firstTask, boolean core) {
// 给最外层循环设置标记, 且该循环为死循环
retry:
for (;;) {
// 获取 ctl 值
int c = ctl.get();
// 获取线程池状态
int rs = runStateOf(c);
if (
// 判断是否为 SHUTDOWN、STOP、TIDYING、TERMINATED 其中之一
rs >= SHUTDOWN
&&
/**
只有在 SHUTDOWN 且无工作须要执行且工作队列非空的时候该段逻辑返回 true
代表须要持续增加工作队列执行工作队列中工作
**/
! (
// 状态为 SHUTDOWN
rs == SHUTDOWN &&
// 无第一个工作须要执行
firstTask == null &&
// 工作队列非空
! workQueue.isEmpty())
)
return false;
// 死循环
for (;;) {
// 获取当前工作线程数
int wc = workerCountOf(c);
if (
// 如以后数量超过最大容量间接返回
wc >= CAPACITY
||
// 如创立为外围工作线程则与最大外围线程大小比拟, 否则与最大线程数大小比拟
wc >= (core ? corePoolSize : maximumPoolSize))
return false;
// CAS 减少工作线程数, 增加超过完结最外层循环
if (compareAndIncrementWorkerCount(c))
break retry;
// CAS 执行没胜利, 值产生扭转, 须要从新读取 CTL 的值
c = ctl.get();
// 如线程池状态产生扭转, 从新执行最外层循环
if (runStateOf(c) != rs)
continue retry;
}
}
boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try {
// 创立工作线程
w = new Worker(firstTask);
final Thread t = w.thread;
if (t != null) {
// 因该段代码将会对 HashSet 进行操作,所以应用重入锁加锁
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// 从新获取状态
int rs = runStateOf(ctl.get());
if (
// rs 为 RUNNING 状态
rs < SHUTDOWN
||
// 这种状况为池中工作线程达到空时工夫被销毁但工作队列还有工作时
(rs == SHUTDOWN && firstTask == null)) {
// 预查看线程是否能够启动
if (t.isAlive())
throw new IllegalThreadStateException();
// 将工作线程增加进 workers 汇合中
workers.add(w);
// 记录工作线程数量最大达到的数量
int s = workers.size();
if (s > largestPoolSize)
largestPoolSize = s;
// 工作线程增加示意设置为胜利
workerAdded = true;
}
} finally {
// 可重入锁解锁
mainLock.unlock();}
// 如工作线程增加胜利, 将工作线程启动
if (workerAdded) {t.start();
workerStarted = true;
}
}
} finally {
// 如工作线程启动失败, 将工作线程从汇合中去除
if (! workerStarted)
addWorkerFailed(w);
}
// 返回工作线程是否增加胜利
return workerStarted;
}
通过该段源码,它在适当的机会把咱们的 task 工作传递给了工作线程并创立,并将创立胜利的工作线程退出汇合中。其中 CAS 死循环的模式,是咱们开发中能够借鉴学习的模式。
worker
通过源码,将 task 传递到 worker 中,并调用了 start()办法,那么阐明 worker 中必定是一个线程并且有它本人的 run 办法,那么咱们就很有必要探寻其中是如何进行编码的。
上图是 Worker 类的继承关系图,能够看出 Worker 继承了 AQS、实现了Runnable 办法,那么咱们就能够大胆的猜想他实现了某种锁的机制、并且能够被线程执行。
worker 结构器
final Thread thread;
Runnable firstTask;
volatile long completedTasks;
Worker(Runnable firstTask) {
// 这里先看做设置标记
setState(-1);
// 设置第一个将会执行的工作
this.firstTask = firstTask;
/**
通过最开始通过线程池结构器传入的线程池工厂创立线程
因为 worker 实现 Runnable 接口, 那么它就能够通过传入新线程中
能够推断出调用了 thread.start()就会执行 worker 的 run()办法
**/
this.thread = getThreadFactory().newThread(this);
}
run 办法
public void run() {runWorker(this);
}
protected void beforeExecute(Thread t, Runnable r) { }
protected void afterExecute(Runnable r, Throwable t) { }
final void runWorker(Worker w) {Thread wt = Thread.currentThread();
Runnable task = w.firstTask;
w.firstTask = null;
// 设置标记可被中断
w.unlock();
boolean completedAbruptly = true;
try {
while (
// firstTask 不为空或能够从工作队列中获取到工作
task != null || (task = getTask()) != null) {w.lock();
// 中断判断, 在下篇介绍
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt();
try {
// 钩子办法, 子类实现
beforeExecute(wt, task);
Throwable thrown = null;
// 调用 task 的 run 办法, 并抓住所有异样, 由钩子办法解决
try {task.run();
} catch (RuntimeException x) {thrown = x; throw x;} catch (Error x) {thrown = x; throw x;} catch (Throwable x) {thrown = x; throw new Error(x);
} finally {
// 钩子办法, 子类实现
afterExecute(task, thrown);
}
} finally {
// 将 task 置空
task = null;
// 实现的工作数 +1
w.completedTasks++;
// 标记 worker 可用
w.unlock();}
}
completedAbruptly = false;
} finally {
// 执行工作线程退出
processWorkerExit(w, completedAbruptly);
}
}
通过该段代码,可用剖析出该段代码通过 while 循环始终从 getTask()中获取工作,那么上面剖析 getTask 办法。
getTask
private Runnable getTask() {
boolean timedOut = false;
// 死循环
for (;;) {
// 获取以后状态
int c = ctl.get();
int rs = runStateOf(c);
// Check if queue empty only if necessary.
if (
// SHUTDOWN、STOP、TIDYING、TERMINATED
rs >= SHUTDOWN &&
// STOP、TIDYING、TERMINATED 或工作队列为空
(rs >= STOP || workQueue.isEmpty())) {
// 工作线程数量 -1
decrementWorkerCount();
// return null 之后下层 runWorker 将会退出 while 循环执行工作线程退出
return null;
}
// 获取当前工作线程数量
int wc = workerCountOf(c);
// 判断是否容许超时: 当容许外围线程超时为 true 或以后数量超过外围线程数
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
if (
// 当前工作线程数量超过最大数量或容许超时并且曾经超时
(wc > maximumPoolSize || (timed && timedOut))
&&
// 工作线程大于 1 或者工作队列为空
(wc > 1 || workQueue.isEmpty())) {
// 尝试 CAS 工作线程数量 -1
if (compareAndDecrementWorkerCount(c))
return null;
// CAS 不超过, 持续下一次循环
continue;
}
try {
// 如果容许超时则调用 poll 办法期待设置定的超时工夫, 否则调用 take 办法始终阻塞期待工作获取
Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
workQueue.take();
// 获取到工作间接返回
if (r != null)
return r;
// 执行到这阐明获取工作超时, 设置超时标记位
timedOut = true;
} catch (InterruptedException retry) {
// 线程被中断, 设置超时标记为 false, 从新下一次循环
timedOut = false;
}
}
}
通过 getTask 办法咱们能够看出,在设置容许外围线程超时或以后线程数大于外围线程数则示意超时开启,由此开关来判断调用阻塞队列中的阻塞办法还是非阻塞办法,一旦超时则返回 null 那么 worker 的 run 办法就会退出循环进入 worker 销毁过程,由此实现线程池线程数量的动静批改。
总结
本文通过通过 execute 办法作为切入点,带大家意识了 CAS 模式、锁模式以及是如何解决线程池状态。
在浏览源码的过程中,很多人喜爱刨根问底,但其实浏览源码就是一个不求甚解的过程,在理论浏览源码过程中调用栈可能会达到 5 - 6 层甚至可能更多层,这样子浏览源码其实是十分低效的,在始终往下深挖的过程中你会发现你的工夫和精力在一直的被耗费,最初只明确了源码中的一部分的逻辑分支,和咱们浏览源码的初衷齐全不同。
所以我举荐浏览源码先浏览调用栈的 1 - 2 层,再往深了就不要去深究了,等到整体逻辑都看明确了能够再回过头来去学习哪些具体细节。