共计 34756 个字符,预计需要花费 87 分钟才能阅读完成。
常识回顾
并发工具类咱们曾经讲了很多,这些工具类的「指标」是让咱们只关注工作自身,并且漠视线程间单干细节,简化了并发编程难度的同时,也减少了很多安全性。工具类的对使用者的「指标」尽管统一,但每一个工具类自身都有它独特的利用场景,比方:
- 我会手动创立线程,为什么要应用线程池? 介绍了应用线程池治理线程将一个大工作分解成多个子工作来简略执行,借助 不会用 Java Future,我狐疑你泡茶没我快, 又是超长图文!!的 Future 个性获取子工作执行后果——二者联合应用就能够解决简略的并行任务
- 搞定 CompletableFuture,并发异步编程和编写串行程序还有什么区别?借助 CompletableFuture 大大降低了异步编程的难度——应用串行的思维对工作进行编排执行(AND 或 OR 聚合)
- 既生 ExecutorService, 何生 CompletionService?因为工作实现工夫有先后,为防止期待阻塞——CompletionService 是批量并行任务的最佳抉择
将下面三种通用场景形象化展现一下:
联合上图置信你的脑海里曾经浮现出这几个工具类的具体实现形式,感觉这曾经涵盖了所有的并发场景。
TYTS,以上这些形式的子线程接到工作后不会再持续拆分成「子子」工作,也就是说,子线程即使接到很大或很简单的工作也得硬着头皮致力执行完,很显然这个大工作是问题要害
如果能把大工作拆分成更小的子问题,直到子问题简略到能够间接求解就好了,这就是分治的思维
分治思维
在计算机科学中,分治法是一种很重要的算法。字面上的解释是「分而治之」,就是把一个简单的问题 分成 两个或更多的雷同或类似的 子问题 ,再把子问题分成更小的子问题……直到最初子问题能够简略的间接求解,原问题的解就变成了子问题解的 合并。
这个技巧是很多高效算法的根底,如排序算法 (疾速排序,归并排序),傅立叶变换 (疾速傅立叶变换)……,如果你是搞大数据的,MapReduce 就是分支思维的典型,如果你想更具体的了解分治相干的算法,请参考这篇一文图解分治算法和思维
联合下面的形容,置信你脑海中曾经构建进去分治的模型了:
那所有的大工作都能用分治算法来解决吗?很显然不是的
分治法实用的状况
总体来说,分治法所能解决的问题个别具备以下几个特色:
- 该问题的规模放大到肯定的水平就能够容易地解决
- 该问题能够合成为若干个规模较小的雷同问题,即该问题具备最优子结构性质。
- 利用该问题合成出的子问题的解能够合并为该问题的解;
- 该问题所合成出的各个子问题是互相独立的,即子问题之间不蕴含公共的子子问题
理解了分治算法的核心思想,咱们就来看看 Java 是如何利用分治思维拆分与合并工作的吧
ForkJoin
有子工作,天然要用到多线程。咱们很早之前说过,执行子工作的线程不容许独自创立,要用线程池治理。秉承雷同设计理念,再联合分治算法,ForkJoin 框架中就呈现了 ForkJoinPool 和 ForkJoinTask。正所谓:
天对地,雨对风。大陆对长空。山花对海树,赤曰对天穹
套用已有常识,简略了解就是这样滴:
咱们之前说过无数次,JDK 不会反复造轮子,这里谈及类似是为了让大家有个简略的直观印象,内里必定有所差异,咱们先大抵看一下这两个类:
ForkJoinTask
又是这个男人,Doug Lea
,怎么就那么牛(破音)
/**
* Abstract base class for tasks that run within a {@link ForkJoinPool}.
* A {@code ForkJoinTask} is a thread-like entity that is much
* lighter weight than a normal thread. Huge numbers of tasks and
* subtasks may be hosted by a small number of actual threads in a
* ForkJoinPool, at the price of some usage limitations.
*
* @since 1.7
* @author Doug Lea
*/
public abstract class ForkJoinTask<V> implements Future<V>, Serializable
能够看到 ForkJoinTask
实现了 Future
接口(那就是具备 Future 接口的个性),同样如其名,fork()
和 join()
天然是它的两个外围办法
fork()
: 异步执行一个子工作(下面说的拆分)join()
: 阻塞以后线程期待子工作的执行后果(下面说的合并)
另外,从下面代码中能够看出,ForkJoinTask
是一个抽象类,在分治模型中,它还有两个形象子类 RecursiveAction
和 RecursiveTask
那这两个子抽象类有什么差异呢?如果你关上 IDE,你应该一眼就能看出差异,so easy
public abstract class RecursiveAction extends ForkJoinTask<Void>{
...
/**
* The main computation performed by this task.
*/
protected abstract void compute();
...
}
public abstract class RecursiveTask<V> extends ForkJoinTask<V>{
...
protected abstract void compute();
...
}
两个类外面都定义了一个 形象 办法 compute()
,须要子类重写实现具体逻辑
那子类要遵循什么逻辑重写这个办法呢?
遵循分治思维,重写的逻辑很简略,就是答复三个问题:
- 什么时候进一步拆分工作?
- 什么时候满足最小可执行工作,即不再进行拆分?
- 什么时候汇总子工作后果
用「伪代码」再翻译一下下面这段话,大略就是这样滴:
if(工作小到不必持续拆分){间接计算失去后果}else{
拆分子工作
调用子工作的 fork()进行计算
调用子工作的 join()合并计算结果}
(作为程序员,如果你写过递归运算,这个逻辑了解起来是非常简单的)
介绍到这里,就能够用 ForkJoin 干些事件了——经典 Fibonacci 计算就能够用分治思维(不信,你逐条依照下面 分治算法实用状况 自问自答一下?),间接借用官网 Docs(留神看 compute 办法),额定增加个 main 办法来看一下:
@Slf4j
public class ForkJoinDemo {public static void main(String[] args) {
int n = 20;
// 为了追踪子线程名称,须要重写 ForkJoinWorkerThreadFactory 的办法
final ForkJoinPool.ForkJoinWorkerThreadFactory factory = pool -> {final ForkJoinWorkerThread worker = ForkJoinPool.defaultForkJoinWorkerThreadFactory.newThread(pool);
worker.setName("my-thread" + worker.getPoolIndex());
return worker;
};
// 创立分治工作线程池,能够追踪到线程名称
ForkJoinPool forkJoinPool = new ForkJoinPool(4, factory, null, false);
// 疾速创立 ForkJoinPool 办法
// ForkJoinPool forkJoinPool = new ForkJoinPool(4);
// 创立分治工作
Fibonacci fibonacci = new Fibonacci(n);
// 调用 invoke 办法启动分治工作
Integer result = forkJoinPool.invoke(fibonacci);
log.info("Fibonacci {} 的后果是 {}", n, result);
}
}
@Slf4j
class Fibonacci extends RecursiveTask<Integer> {
final int n;
Fibonacci(int n) {this.n = n;}
@Override
public Integer compute() {
// 和递归相似,定义可计算的最小单元
if (n <= 1) {return n;}
// 想查看子线程名称输入的能够关上上面正文
//log.info(Thread.currentThread().getName());
Fibonacci f1 = new Fibonacci(n - 1);
// 拆分成子工作
f1.fork();
Fibonacci f2 = new Fibonacci(n - 2);
// f1.join 期待子工作执行后果
return f2.compute() + f1.join();
}
}
执行后果如下:
停顿到这里,置信根本的应用就曾经搞定了,下面代码中应用了 ForkJoinPool,那问题来了:
池化既然是一类思维,Java 曾经有了
ThreadPoolExecutor
,为什么又要搞出个ForkJoinPool
呢?
借助上面这张图,先来回顾一下 ThreadPoolExecutor 的实现原理(详情请看为什么要应用线程池):
一眼就能看进去这是典型的 生产者 / 消费者
模式,消费者线程都从一个共享的 Task Queue 中生产提交的工作 。ThreadPoolExecutor 简略的并行操作次要是为了 执行工夫不确定的工作(I/O 或定时工作等)
JDK 反复造轮子是不可能的,分治思维其实也能够了解成一种父子工作依赖的关系,当依赖层级十分深,用 ThreadPoolExecutor
来解决这种关系很显然是不太事实的,所以 ForkJoinPool
作为性能补充就呈现了
ForkJoinPool
工作拆分后有依赖关系,还得缩小线程之间的竞争,那就让线程执行属于本人的 task 就能够了呗,所以较 ThreadPoolExecutor
的单个 TaskQueue 的模式,ForkJoinPool
是多个 TaskQueue 的模式,简略用图来示意,就是这样滴:
有多个工作队列,所以在 ForkJoinPool 中就有一个数组模式的成员变量 WorkQueue[]
。那问题又来了
工作队列有多个,提交的工作放到哪个队列中呢?(上图中的
Router Rule
局部)
这就须要一套路由规定,从下面的代码 Demo 中能够了解,提交的工作次要有两种:
- 有内部间接提交的(submission task)
- 也有工作本人 fork 进去的(worker task)
为了进一步辨别这两种 task,Doug Lea 就设计一个简略的路由规定:
- 将
submission task
放到 WorkQueue 数组的「偶数」
下标中 - 将
worker task
放在 WorkQueue 的「奇数」
下标中,并且只有奇数下标才有线程 (worker) 与之绝对
应部分丰盛一下上图就是这样滴:
每个工作执行工夫都是不一样的(当然是在 CPU 眼里),执行快的线程的工作队列的工作就可能是空的,为了最大化利用 CPU 资源,就容许闲暇线程拿取其它工作队列中的内容,这个过程就叫做 work-stealing
(工作窃取)
以后线程要执行一个工作,其余线程还有可能过去窃取工作,这就会产生竞争,为了缩小竞争,WorkQueue 就设计成了一个双端队列:
- 反对 LIFO(last-in-first-out) 的 push(放)和 pop(拿)操作——操作 top 端
- 反对 FIFO (first-in-first-out) 的 poll(拿)操作——操作 base 端
线程(worker)操作本人的 WorkQueue 默认是 LIFO 操作(可选 FIFO),当线程(worker)尝试窃取其余 WorkQueue 里的工作时,这个时候执行的是 FIFO 操作,即从 base 端窃取,用图丰盛一下就是这样滴:
这样的益处非常明显了:
- LIFO 操作只有对应的 worker 能力执行,push 和 pop 不须要思考并发
- 拆分时,越大的工作越在 WorkQueue 的 base 端,尽早合成,可能尽快进入计算
从 WorkQueue 的成员变量的修饰符中也能看出一二了(base 有 volatile 润饰,而 top 却没有):
volatile int base; // index of next slot for poll
int top; // index of next slot for push
到这里,置信你曾经理解 ForkJoinPool 的根本实现原理了,但也会随同着很多疑难(这都是怎么实现的?),比方:
- 有竞争就须要锁,ForkJoinPool 是如何管制状态的呢?
- ForkJoinPool 的线程数是怎么管制的呢?
- 下面说的路由规定的具体逻辑是什么呢?
- ……
保留住这些问题,一点点看源码来理解一下吧:
源码剖析(JDK 1.8)
ForkJoinPool 的源码波及到大量的位运算,这里会把外围局部说分明,想要了解的更深刻,还须要大家本人一点点追踪查看
联合下面的铺垫,你应该晓得 ForkJoinPool 里有三个重要的角色:
- ForkJoinWorkerThread(继承 Thread):就是咱们下面说的线程(Worker)
- WorkQueue:双向的工作队列
- ForkJoinTask:Worker 执行的对象
源码剖析的整个流程也是围绕这几个类的办法来阐明,但在理解这三个角色之前,咱们须要先理解 ForkJoinPool 都为这三个角色铺垫了哪些内容
故事就得从 ForkJoinPool 的构造方法说起
ForkJoinPool 构造方法
public ForkJoinPool() {this(Math.min(MAX_CAP, Runtime.getRuntime().availableProcessors()),
defaultForkJoinWorkerThreadFactory, null, false);
}
public ForkJoinPool(int parallelism) {this(parallelism, defaultForkJoinWorkerThreadFactory, null, false);
}
public ForkJoinPool(int parallelism,
ForkJoinWorkerThreadFactory factory,
UncaughtExceptionHandler handler,
boolean asyncMode) {this(checkParallelism(parallelism),
checkFactory(factory),
handler,
asyncMode ? FIFO_QUEUE : LIFO_QUEUE,
"ForkJoinPool-" + nextPoolId() + "-worker-");
checkPermission();}
除了以上三个构造方法之外,在 JDK1.8 中还减少了另外一种初始化 ForkJoinPool 对象的形式(QQ:这是什么设计模式?):
static final ForkJoinPool common;
/**
* @return the common pool instance
* @since 1.8
*/
public static ForkJoinPool commonPool() {
// assert common != null : "static init error";
return common;
}
Common 是在动态块外面初始化的(只会被执行一次):
common = java.security.AccessController.doPrivileged
(new java.security.PrivilegedAction<ForkJoinPool>() {public ForkJoinPool run() {return makeCommonPool(); }});
private static ForkJoinPool makeCommonPool() {
int parallelism = -1;
... 其余默认初始化内容
if (parallelism < 0 && // default 1 less than #cores
(parallelism = Runtime.getRuntime().availableProcessors() - 1) <= 0)
parallelism = 1;
if (parallelism > MAX_CAP)
parallelism = MAX_CAP;
// 执行下面的构造方法
return new ForkJoinPool(parallelism, factory, handler, LIFO_QUEUE,
"ForkJoinPool.commonPool-worker-");
}
因为这是一个单例通用的 ForkJoinPool,所以切记:
如果应用通用 ForkJoinPool,最好只做 CPU 密集型的计算操作,不要有不确定性的 I/O 内容在工作外面,以防拖垮整体
下面所有的构造方法最初都会调用这个公有办法:
private ForkJoinPool(int parallelism,
ForkJoinWorkerThreadFactory factory,
UncaughtExceptionHandler handler,
int mode,
String workerNamePrefix) {
this.workerNamePrefix = workerNamePrefix;
this.factory = factory;
this.ueh = handler;
this.config = (parallelism & SMASK) | mode;
long np = (long)(-parallelism); // offset ctl counts
this.ctl = ((np << AC_SHIFT) & AC_MASK) | ((np << TC_SHIFT) & TC_MASK);
}
参数有点多,在这里解释一下每个参数的含意:
序号 | 参数名 | 形容 / 解释 |
---|---|---|
1 | parallelism | 并行度,这并不是定义的线程数,具体线程数,以及 WorkQueue 的长度等都是依据这个并行度来计算的,通过下面 makeCommonPool 办法能够晓得,parallelism 默认值是 CPU 外围线程数减 1 |
2 | factory | 很常见了,创立 ForkJoinWorkerThread 的工厂接口 |
3 | handler | 每个线程的异样处理器 |
4 | mode | 下面说的 WorkQueue 的模式,LIFO/FIFO; |
5 | workerNamePrefix | ForkJoinWorkerThread 的前缀名称 |
6 | ctl | 线程池的外围控制线程字段 |
在构造方法中就曾经有位运算了,太难了:
想晓得 ForkJoinPool 的成员变量 config 要表白的意思,就要认真拆开来看
static final int SMASK = 0xffff; // short bits == max index
this.config = (parallelism & SMASK) | mode;
parallelism & SMASK
其实就是要保障并行度的值不能大于 SMASK,下面所有的构造方法在传入 parallelism 的时候都会调用 checkParallelism
来查看合法性:
static final int MAX_CAP = 0x7fff; // max #workers - 1
private static int checkParallelism(int parallelism) {if (parallelism <= 0 || parallelism > MAX_CAP)
throw new IllegalArgumentException();
return parallelism;
}
能够看到 parallelism 的最大值就是 MAX_CAP
了,0x7fff
必定小于0xffff
。所以 config 的值其实就是:
this.config = parallelism | mode;
这里假如 parallelism 就是 MAX_CAP
, 而后与 mode 进行 或运算
,其中 mode 有三种:
- LIFO_QUEUE
- FIFO_QUEUE
- SHARED_QUEUE
上面以 LIFO_QUEUE 和 FIFO_QUEUE 举例说明:
// Mode bits for ForkJoinPool.config and WorkQueue.config
static final int MODE_MASK = 0xffff << 16; // top half of int
static final int LIFO_QUEUE = 0;
static final int FIFO_QUEUE = 1 << 16;
static final int SHARED_QUEUE = 1 << 31; // must be negative
所以 parallelism | mode
依据 mode 的不同会产生两种后果,然而会失去一个确认的信息:
config 的第 17 位示意模式,低 15 位示意并行度 parallelism
当咱们须要从 config 中获取模式 mode 时候,只须要用 mode 掩码(MODE_MASK)和 config 做 与运算
就能够了
所以一张图概括 config 就是:
long np = (long)(-parallelism); // offset ctl counts
下面这段代码就是将并行度 parallelism 补码转换为 long 型,以 MAX_CAP
作为并行度为例,np 的值就是
这个 np 的值,就会用作 ForkJoinPool 成员变量 ctl 的计算:
// Active counts 沉闷线程数
private static final int AC_SHIFT = 48;
private static final long AC_UNIT = 0x0001L << AC_SHIFT;
private static final long AC_MASK = 0xffffL << AC_SHIFT;
// Total counts 总线程数
private static final int TC_SHIFT = 32;
private static final long TC_UNIT = 0x0001L << TC_SHIFT;
private static final long TC_MASK = 0xffffL << TC_SHIFT;
private static final long ADD_WORKER = 0x0001L << (TC_SHIFT + 15); // sign
// 计算 ctl
this.ctl = ((np << AC_SHIFT) & AC_MASK) | ((np << TC_SHIFT) & TC_MASK);
np << AC_SHIFT
即 np 向左挪动 48 位,这样原来的低 16 位变成了高 16 位,再用 AC 掩码(AC_MASK)做与运算
,也就是说 ctl 的 49 ~ 64 位示意沉闷线程数np << TC_SHIFT
即 np 向左挪动 32 位,这样原来的低 16 位变成了 33 ~ 48 位,再用 TC 掩码做与运算
,也就是说 ctl 的 33 ~ 48 位示意总线程数
最初二者再进行或运算,如果并行度还是 MAX_CAP
,那 ctl 的最初后果就是:
到这里,咱们才浏览完一个构造函数的内容,从最终的论断能够看出,初始化后 AC = TC,并且 ctl 是一个小于零的数,ctl 是 64 位的 long 类型,低 32 位是如何结构的并没有在构造函数中体现进去,但正文给了明确的阐明:
/*
* Bits and masks for field ctl, packed with 4 16 bit subfields:
* AC: Number of active running workers minus target parallelism
* TC: Number of total workers minus target parallelism
* SS: version count and status of top waiting thread
* ID: poolIndex of top of Treiber stack of waiters
*
* When convenient, we can extract the lower 32 stack top bits
* (including version bits) as sp=(int)ctl. The offsets of counts
* by the target parallelism and the positionings of fields makes
* it possible to perform the most common checks via sign tests of
* fields: When ac is negative, there are not enough active
* workers, when tc is negative, there are not enough total
* workers. When sp is non-zero, there are waiting workers. To
* deal with possibly negative fields, we use casts in and out of
* "short" and/or signed shifts to maintain signedness.
*
* Because it occupies uppermost bits, we can add one active count
* using getAndAddLong of AC_UNIT, rather than CAS, when returning
* from a blocked join. Other updates entail multiple subfields
* and masking, requiring CAS.
*/
这段正文次要阐明了低 32 位的作用(前面会从源码中体现进去,这里先有个印象会对前面源码浏览有帮忙),按正文含意先欠缺一下 ctl 的值:
- SS:栈顶工作线程状态和版本数(每一个线程在挂起时都会持有前一个期待线程所在工作队列的索引,由此形成一个期待的工作线程栈,栈顶是最新期待的线程),第一位示意状态
1:不流动(inactive)
;0:流动(active)
,后 15 示意版本号,避免 ABA 问题 - ID: 栈顶工作线程所在工作队列的索引
正文中还说,另 sp=(int)ctl
,即获取 64 位 ctl 的低 32 位(SS | ID
),因为低 32 位都是创立出线程之后才会存在的值,所以推断出,如果 sp != 0,就存在期待的工作线程,唤醒应用就行,不必创立新的线程。这样就通过 ctl 能够获取到无关线程所须要的所有信息了
除了构造方法所构建的成员变量,ForkJoinPool 还有一个十分重要的成员变量 runState
,和你之前理解的常识一样,线程池也须要状态来进行治理
volatile int runState; // lockable status
// runState bits: SHUTDOWN must be negative, others arbitrary powers of two
private static final int RSLOCK = 1; // 线程池被锁定
private static final int RSIGNAL = 1 << 1; // 线程池有线程须要唤醒
private static final int STARTED = 1 << 2; // 线程池曾经初始化
private static final int STOP = 1 << 29; // 线程池进行
private static final int TERMINATED = 1 << 30; // 线程池终止
private static final int SHUTDOWN = 1 << 31; // 线程池敞开
runState
有下面 6 种状态切换,按正文所言,只有 SHUTDOWN
状态是正数,其余都是整数,在并发环境更改状态必然要用到锁,ForkJoinPool 对线程池加锁和解锁别离由 lockRunState
和 unlockRunState
来实现 (这两个办法能够暂且不必深刻了解,能够临时跳过,只须要了解它们是帮忙平安更改线程池状态的锁即可)
不深刻理解能够,然而我不能不写啊 …… 你待会不是得回看吗?
lockRunState
/**
* Acquires the runState lock; returns current (locked) runState.
*/
// 从办法正文中看到,该办法肯定会返回 locked 的 runState,也就是说肯定会加锁胜利
private int lockRunState() {
int rs;
return ((((rs = runState) & RSLOCK) != 0 ||
!U.compareAndSwapInt(this, RUNSTATE, rs, rs |= RSLOCK)) ?
awaitRunStateLock() : rs);
}
- 因为 RSLOCK = 1,如果 runState & RSLOCK == 0,则阐明目前没有加锁,进入
或运算
的下半段 CAS - 先通过 CAS 尝试加锁,尝试胜利间接返回,尝试失败则要调用
awaitRunStateLock
办法
/**
* Spins and/or blocks until runstate lock is available. See
* above for explanation.
*/
private int awaitRunStateLock() {
Object lock;
boolean wasInterrupted = false;
for (int spins = SPINS, r = 0, rs, ns;;) {
// 判断是否加锁(== 0 示意未加锁)if (((rs = runState) & RSLOCK) == 0) {
// 通过 CAS 加锁
if (U.compareAndSwapInt(this, RUNSTATE, rs, ns = rs | RSLOCK)) {if (wasInterrupted) {
try {
// 重置线程终端标记
Thread.currentThread().interrupt();
} catch (SecurityException ignore) {// 这里居然 catch 了个寂寞}
}
// 加锁胜利返回最新的 runState,for 循环的惟一失常进口
return ns;
}
}
else if (r == 0)
r = ThreadLocalRandom.nextSecondarySeed();
else if (spins > 0) {
r ^= r << 6; r ^= r >>> 21; r ^= r << 7; // xorshift
if (r >= 0)
--spins;
}
// Flag1 如果是其余线程正在初始化占用锁,则调用 yield 办法让出 CPU,让其疾速初始化
else if ((rs & STARTED) == 0 || (lock = stealCounter) == null)
Thread.yield(); // initialization race
// Flag2 如果其它线程持有锁,并且线程池曾经初始化,则将唤醒位标记为 1
else if (U.compareAndSwapInt(this, RUNSTATE, rs, rs | RSIGNAL)) {
// 进入互斥锁
synchronized (lock) {
// 再次判断,如果等于 0,阐明进入互斥锁前刚好有线程进行了唤醒,就不必期待,间接进行唤醒操作即可,否则就进入期待
if ((runState & RSIGNAL) != 0) {
try {lock.wait();
} catch (InterruptedException ie) {if (!(Thread.currentThread() instanceof
ForkJoinWorkerThread))
wasInterrupted = true;
}
}
else
lock.notifyAll();}
}
}
}
下面代码 33 ~ 34(Flag1)行以及 36 ~ 50(Flag2)行,如果你没看后续代码,当初来了解是有些艰难的,我这里先提前阐明一下:
Flag1: 当残缺的初始化 ForkJoinPool 时,间接利用了 stealCounter 这个原子变量,因为初始化时(调用 externalSubmit 时),才会对 StealCounter 赋值。所以,这里的逻辑是,当状态不是 STARTED 或者 stealCounter 为空,让出线程期待,也就是说,别的线程还没初始化齐全,让其持续占用锁初始化即可
Flag2: 咱们在讲期待 / 告诉模型时就说,不要让有限自旋尝试,如果资源不满足就期待,如果资源满足了就告诉,所以,如果 (runState & RSIGNAL) == 0
成立,阐明有线程须要唤醒,间接唤醒就好,否则也别浪费资源,被动期待一会
当浏览到这的代码时,马上就抛出来两个问题:
Q1: 既然是加锁,为什么不必已有的轮子 ReentrantLock 呢?
PS:如果你读过并发系列 Java AQS 队列同步器以及 ReentrantLock 的利用,你会晓得 ReentrantLock 是用一个残缺字段 state 来管制同步状态。但这里在竞争锁的时候还会判断线程池的状态,如果是初始化状态被动 yield 放弃 CPU 来缩小竞争;另外,用一个残缺的 runState 不同位来示意状态也体现出更细的粒度吧
Q2: synchronized 大法虽好,然而咱们都晓得这是比拟重量级的锁,为什么还在这里利用了呢?
PS: 首先 synchronized 通过一直优化,没有它刚诞生时那么重,另外依照 Flag 2 的代码含意,进入 synchronized 同步块的概率还是很低的,能够用最简略的形式稳稳兜底(奥卡姆剃刀了原理?)
有加锁天然要解锁,向下看 unlockRunState
unlockRunState
解锁的逻辑绝对简略多了,总体目标是革除锁标记位。如果顺利将状态批改为指标状态,天然解锁胜利;否则示意有别的线程进入了 wait,须要调用 notifyAll 唤醒,从新尝试竞争
/**
* Unlocks and sets runState to newRunState.
*
* @param oldRunState a value returned from lockRunState
* @param newRunState the next value (must have lock bit clear).
*/
private void unlockRunState(int oldRunState, int newRunState) {if (!U.compareAndSwapInt(this, RUNSTATE, oldRunState, newRunState)) {
Object lock = stealCounter;
runState = newRunState; // clears RSIGNAL bit
if (lock != null)
synchronized (lock) {lock.notifyAll(); }
}
}
这两个办法贯通着后续代码剖析的始终,多留神 unlockRunState
的入参即可,另外你也看到了告诉都是用的 notifyAll,而不是 notify,这个问题咱们之前重点阐明过,你还记得为什么吗?如果不记得,关上并发编程之期待告诉机制 回顾一下吧
第一层常识铺垫曾经差不多了,后退
invoke/submit/execute
回到本文最开始带有 main 函数的 demo,咱们向 ForkJoinPool 提交工作调用的是 invoke 办法, 其实 ForkJoinPool 还反对 submit 和 execute 两种形式来提交工作。并发的玩法十分相似,这三类办法的作业也很好辨别:
- invoke:提交工作,并期待返回执行后果
- submit:提交并立即返回工作,ForkJoinTask 实现了 Future,能够充分利用 Future 的个性
- execute:只提交工作
在这三大类根底上又重载了几个更细粒度的办法,这里不一一列举:
public <T> T invoke(ForkJoinTask<T> task) {if (task == null)
throw new NullPointerException();
externalPush(task);
return task.join();}
public <T> ForkJoinTask<T> submit(ForkJoinTask<T> task) {if (task == null)
throw new NullPointerException();
externalPush(task);
return task;
}
public void execute(ForkJoinTask<?> task) {if (task == null)
throw new NullPointerException();
externalPush(task);
}
置信你曾经发现了,提交工作的办法都会调用 externalPush(task) 这个用法,源码的配角终于要退场了
然而 ……
如果你看 externalPush 代码,第一行就是申明一个 WorkQueue 数组变量,为了后续流程更加丝滑,咱还得铺垫一点 WorkQueue 的常识(又要铺垫)
WorkQueue
一看这么多成员变量,还是很慌的,不过,咱们只须要把我几个次要的就足够了
// 初始队列容量
static final int INITIAL_QUEUE_CAPACITY = 1 << 13;
// 最大队列容量
static final int MAXIMUM_QUEUE_CAPACITY = 1 << 26; // 64M
// Instance fields
volatile int scanState; // versioned, <0: inactive; odd:scanning
int stackPred; // pool stack (ctl) predecessor 后任池(WorkQueue[])索引,由此形成一个栈
int nsteals; // number of steals 偷取的工作个数
int hint; // randomization and stealer index hint 记录偷取者的索引,不便前面顺藤摸瓜
int config; // pool index and mode
volatile int qlock; // 1: locked, < 0: terminate; else 0
volatile int base; // index of next slot for poll
int top; // index of next slot for push
ForkJoinTask<?>[] array; // the elements (initially unallocated) 工作数组
final ForkJoinPool pool; // the containing pool (may be null)
final ForkJoinWorkerThread owner; // owning thread or null if shared 当前工作队列的工作线程,共享模式下为 null
volatile Thread parker; // == owner during call to park; else null 调用 park 阻塞期间为 owner,其余状况为 null
volatile ForkJoinTask<?> currentJoin; // task being joined in awaitJoin 记录以后 join 来的工作
volatile ForkJoinTask<?> currentSteal; // mainly used by helpStealer 记录从其余工作队列偷取过去的工作
咱们下面说了,WorkQueue 是一个双端队列,线程池有 runState,WorkQueue 有 scanState
- 小于零:inactive (未激活状态)
- 奇数:scanning(扫描状态)
- 偶数:running(运行状态)
操作线程池须要锁,操作队列也是须要锁的,qlock 就派上用场了
- 1: 锁定
- 0:未锁定
- 小于零:终止状态
WorkQueue 中也有个 config,然而和 ForkJoinPool 中的是不一样的,WorkQueue 中的 config 记录了该 WorkQueue 在 WorkQueue[] 数组的下标以及 mode
其余字段的含意咱们就写在代码正文中吧,配角从新退场,这次是真的
externalPush
文章后面说过,task 会细分成 submission task
和 worker task
,worker task
是 fork
进去的,那从这个入口进入的,天然也就是 submission task
了,也就是说:
- 通过
invoke()
|submit()
|execute()
等办法提交的 task, 是submission task
,会放到 WorkQueue 数组的 偶数 索引地位- 调用
fork()
办法生成出的工作,叫 worker task,会放到 WorkQueue 数组的 奇数 索引地位
该办法上的正文也写的很分明,具体请参考代码正文
/**
* Tries to add the given task to a submission queue at
* submitter's current queue. Only the (vastly) most common path
* is directly handled in this method, while screening for need
* for externalSubmit.
*
* @param task the task. Caller must ensure non-null.
*/
final void externalPush(ForkJoinTask<?> task) {WorkQueue[] ws; WorkQueue q; int m;
//Flag1: 通过 ThreadLocalRandom 产生随机数,用于上面计算槽位索引
int r = ThreadLocalRandom.getProbe();
int rs = runState; // 初始状态为 0
//Flag2: 如果 ws,即 ForkJoinPool 中的 WorkQueue 数组曾经实现初始化,且依据随机数定位的 index 存在 workQueue, 且 cas 的形式加锁胜利
if ((ws = workQueues) != null && (m = (ws.length - 1)) >= 0 &&
(q = ws[m & r & SQMASK]) != null && r != 0 && rs > 0 &&
// 对 WorkQueue 操作加锁
U.compareAndSwapInt(q, QLOCK, 0, 1)) {ForkJoinTask<?>[] a; int am, n, s;
//WorkQueue 中的工作数组不为空
if ((a = q.array) != null &&
(am = a.length - 1) > (n = (s = q.top) - q.base)) { // 组长度大于工作个数,不须要扩容
int j = ((am & s) << ASHIFT) + ABASE; //WorkQueue 中的工作数组不为空
U.putOrderedObject(a, j, task); // 向 Queue 中放入工作
U.putOrderedInt(q, QTOP, s + 1);//top 值加一
U.putIntVolatile(q, QLOCK, 0); // 对 WorkQueue 操作解锁
// 工作个数小于等于 1,那么此槽位上的线程有可能期待,如果大家都没工作,可能都在期待,新工作来了,唤醒,起来干活了
if (n <= 1)
// 唤醒可能存在期待的线程
signalWork(ws, q);
return;
}
// 工作入队失败,后面加锁了,这里也要解锁
U.compareAndSwapInt(q, QLOCK, 1, 0);
}
//Flag3: 不满足上述条件,也就是说下面的这些 WorkQueue[]等都不存在,就要通过这个办法所有从头开始创立
externalSubmit(task);
}
下面加了三处 Flag,为了让大家更好的了解代码还是有必要做进一步阐明的:
Flag1: ThreadLocalRandom 是 ThreadLocal 的衍生物,每个线程默认的 probe 是 0,当线程调用 ThreadLocalRandom.current()时,会初始化 seed 和 probe,保护在线程外部,这里就晓得是生成一个随机数就好,具体细节还是值得大家自行看一下
Flag2: 这里蕴含的信息还是十分多的
// 二进制为:0000 0000 0000 0000 0000 0000 0111 1110
static final int SQMASK = 0x007e; // max 64 (even) slots
- m 的值代表 WorkQueue 数组的最大下表
- m & r 会保障随机数 r 大于 m 的局部不可用
- m & r & SQMASK 因为 SQMASK 最初一位是 0,最终的后果就会是偶数
- r != 0 阐明以后线程曾经初始化过一些内容
- rs > 0 阐明 ForkJoinPool 的 runState 也曾经被初始化过
Flag3: 看过 flag2 的形容,你也就很好了解 Flag 3 了,如果是第一次提交工作,必走 Flag 3 的 externalSubmit
办法
externalSubmit
这个办法很长,但没超过 80 行,具体请看办法正文
// 初始化所须要的所有
private void externalSubmit(ForkJoinTask<?> task) {
int r; // initialize caller's probe
// 生成随机数
if ((r = ThreadLocalRandom.getProbe()) == 0) {ThreadLocalRandom.localInit();
r = ThreadLocalRandom.getProbe();}
for (;;) {WorkQueue[] ws; WorkQueue q; int rs, m, k;
boolean move = false;
// 如果线程池的状态为终止状态,则帮忙终止
if ((rs = runState) < 0) {tryTerminate(false, false); // help terminate
throw new RejectedExecutionException();}
//Flag1: 再判断一次状态是否为初始化,因为在 lockRunState 过程中有可能状态被别的线程更改了
else if ((rs & STARTED) == 0 || // initialize
((ws = workQueues) == null || (m = ws.length - 1) < 0)) {
int ns = 0;
//Flag1.1: 加锁
rs = lockRunState();
try {if ((rs & STARTED) == 0) {
// 初始化 stealcounter 的值(工作窃取计数器,原子变量)U.compareAndSwapObject(this, STEALCOUNTER, null,
new AtomicLong());
// create workQueues array with size a power of two
// 取 config 的低 16 位(确切说是低 15 位),获取并行度
int p = config & SMASK; // ensure at least 2 slots
//Flag1.2: 如果你看过 HashMap 的源码,这个就很好了解了,获取 2 次幂大小
int n = (p > 1) ? p - 1 : 1;
n |= n >>> 1; n |= n >>> 2; n |= n >>> 4;
n |= n >>> 8; n |= n >>> 16; n = (n + 1) << 1;
// 初始化 WorkQueue 数组
workQueues = new WorkQueue[n];
// 标记初始化实现
ns = STARTED;
}
} finally {
// 解锁
unlockRunState(rs, (rs & ~RSLOCK) | ns);
}
}
//Flag2 下面剖析过,取偶数位槽位,将工作放进偶数槽位
else if ((q = ws[k = r & m & SQMASK]) != null) {
// 对 WorkQueue 加锁
if (q.qlock == 0 && U.compareAndSwapInt(q, QLOCK, 0, 1)) {ForkJoinTask<?>[] a = q.array;
int s = q.top;
// 初始化工作提交标识
boolean submitted = false; // initial submission or resizing
try { // locked version of push
// 计算内存偏移量,放工作,更新 top 值
if ((a != null && a.length > s + 1 - q.base) ||
(a = q.growArray()) != null) {int j = (((a.length - 1) & s) << ASHIFT) + ABASE;
U.putOrderedObject(a, j, task);
U.putOrderedInt(q, QTOP, s + 1);
// 提交工作胜利
submitted = true;
}
} finally {
//WorkQueue 解锁
U.compareAndSwapInt(q, QLOCK, 1, 0);
}
// 工作提交胜利了
if (submitted) {
// 天然要唤醒可能存在期待的线程来解决工作了
signalWork(ws, q);
return;
}
}
// 工作提交没胜利,能够从新计算随机数,再走一次流程
move = true; // move on failure
}
//Flag3: 接 Flag2,如果找到的槽位是空,则要初始化一个 WorkQueue
else if (((rs = runState) & RSLOCK) == 0) { // create new queue
q = new WorkQueue(this, null);
// 设置工作队列的窃取线索值
q.hint = r;
// 如下面 WorkQueue 中 config 的介绍,记录以后 WorkQueue 在 WorkQueue[]数组中的值,和队列模式
q.config = k | SHARED_QUEUE;
// 初始化为 inactive 状态
q.scanState = INACTIVE;
// 加锁
rs = lockRunState(); // publish index
if (rs > 0 && (ws = workQueues) != null &&
k < ws.length && ws[k] == null)
ws[k] = q; // else terminated
// 解锁
unlockRunState(rs, rs & ~RSLOCK);
}
else
move = true; // move if busy
if (move)
r = ThreadLocalRandom.advanceProbe(r);
}
}
Flag1.1 : 有个细节须要说一下,咱们在 Java AQS 队列同步器以及 ReentrantLock 的利用 时提到过应用锁的范式以及为什么要这样用,ForkJoinPool
这里同样遵循这种范式
Lock lock = new ReentrantLock();
lock.lock();
try{...}finally{lock.unlock();
}
Flag1.2: 简略形容这个过程,就是依据不同的并行度来初始化不同大小的 WorkQueue[]数组,数组大小要求是 2 的 n 次幂,所以给大家个表格直观了解一下并行度和队列容量的关系:
并行度 p | 容量 |
---|---|
1,2 | 4 |
3,4 | 8 |
5 ~ 8 | 16 |
9 ~ 16 | 32 |
Flag 1,2,3: 如果你了解了下面这个办法,很显然,第一次执行这个办法外部的逻辑程序应该是 Flag1
——> Flag3
——>Flag2
externalSubmit 如果工作胜利提交,就会调用 signalWork
办法了
signalWork
后面铺垫的常识要大规模派上用场(一大波僵尸来袭),are you ready?
如果 ForkJoinPool 的 ctl 成员变量的作用曾经忘了,连忙向上翻从新记忆一下
// 常量值
static final int SS_SEQ = 1 << 16; // version count
final void signalWork(WorkQueue[] ws, WorkQueue q) {
long c; int sp, i; WorkQueue v; Thread p;
// ctl 小于零,阐明流动的线程数 AC 不够
while ((c = ctl) < 0L) { // too few active
// 取 ctl 的低 32 位,如果为 0,阐明没有期待的线程
if ((sp = (int)c) == 0) { // no idle workers
// 取 TC 的高位,如果不等于 0,则阐明目前的工作着还没有达到并行度
if ((c & ADD_WORKER) != 0L) // too few workers
// 增加 Worker,也就是说要创立线程了
tryAddWorker(c);
break;
}
// 未开始或者已进行,间接跳出
if (ws == null) // unstarted/terminated
break;
//i= 闲暇线程栈顶端所属的工作队列索引
if (ws.length <= (i = sp & SMASK)) // terminated
break;
if ((v = ws[i]) == null) // terminating
break;
// 程序执行到这里,阐明有闲暇线程,计算下一个 scanState,减少了版本号,并且调整为 active 状态
int vs = (sp + SS_SEQ) & ~INACTIVE; // next scanState
int d = sp - v.scanState; // screen CAS
// 计算下一个 ctl 的值,流动线程数 AC + 1,通过 stackPred 获得前一个 WorkQueue 的索引,从新设置回 sp,行程最终的 ctl 值
long nc = (UC_MASK & (c + AC_UNIT)) | (SP_MASK & v.stackPred);
// 更新 ctl 的值
if (d == 0 && U.compareAndSwapLong(this, CTL, c, nc)) {
v.scanState = vs; // activate v
// 如果有线程阻塞,则调用 unpark 唤醒即可
if ((p = v.parker) != null)
U.unpark(p);
break;
}
// 没有工作,间接跳出
if (q != null && q.base == q.top) // no more work
break;
}
}
假如程序刚开始执行,那么流动线程数以及总线程数必定都没达到并行度要求,这时就会调用 tryAddWorker
办法了
tryAddWorker
tryAddWorker 的逻辑就非常简单了,因为是操作线程池,同样会用到 lockRunState
/unlockRunState
的锁管制
private void tryAddWorker(long c) {
// 初始化增加 worker 表识
boolean add = false;
do {
// 因为要增加 Worker,所以 AC 和 TC 都要加一
long nc = ((AC_MASK & (c + AC_UNIT)) |
(TC_MASK & (c + TC_UNIT)));
//ctl 还没被扭转
if (ctl == c) {
int rs, stop; // check if terminating
if ((stop = (rs = lockRunState()) & STOP) == 0)
// 更新 ctl 的值,add = U.compareAndSwapLong(this, CTL, c, nc);
unlockRunState(rs, rs & ~RSLOCK);
if (stop != 0)
break;
//ctl 值更新胜利,开始真正的创立 Worker
if (add) {createWorker();
break;
}
}
// 从新获取 ctl,并且没有达到最大线程数,并且没有闲暇的线程
} while (((c = ctl) & ADD_WORKER) != 0L && (int)c == 0);
}
一切顺利,就要调用 createWorker 办法来创立真正的 Worker 了,局势逐步清朗
createWorker
介绍过了 WorkerQueue 和 ForkJoinTask,上文说的三个重要角色中的最初一个 ForkJoinWorkerThread
终于退场了
private boolean createWorker() {
ForkJoinWorkerThreadFactory fac = factory;
Throwable ex = null;
ForkJoinWorkerThread wt = null;
try {
// 如果工厂曾经存在了,就用 factory 来创立线程,会去注册线程,这里的 this 就是 ForkJoinPool 对象
if (fac != null && (wt = fac.newThread(this)) != null) {
// 启动线程
wt.start();
return true;
}
} catch (Throwable rex) {ex = rex;}
// 如果创立线程失败,就要逆向登记线程,包含后面对 ctl 等的操作
deregisterWorker(wt, ex);
return false;
}
Worker 线程是如何与 WorkQueue 对应的,就藏在 fac.newThread(this)
这个办法外面,上面这点代码展现一下调用过程
public ForkJoinWorkerThread newThread(ForkJoinPool pool);
static final class DefaultForkJoinWorkerThreadFactory
implements ForkJoinWorkerThreadFactory {public final ForkJoinWorkerThread newThread(ForkJoinPool pool) {return new ForkJoinWorkerThread(pool);
}
}
protected ForkJoinWorkerThread(ForkJoinPool pool) {
// Use a placeholder until a useful name can be set in registerWorker
super("aForkJoinWorkerThread");
this.pool = pool;
this.workQueue = pool.registerWorker(this);
}
很显然核心内容在 registerWorker
办法外面了
registerWorker
WorkQueue(ForkJoinPool pool, ForkJoinWorkerThread owner) {
this.pool = pool;
this.owner = owner;
// Place indices in the center of array (that is not yet allocated)
base = top = INITIAL_QUEUE_CAPACITY >>> 1;
}
final WorkQueue registerWorker(ForkJoinWorkerThread wt) {
UncaughtExceptionHandler handler;
// 这里线程被设置为守护线程,因为,当只剩下守护线程时,JVM 就会推出
wt.setDaemon(true); // configure thread
// 填补解决异样的 handler
if ((handler = ueh) != null)
wt.setUncaughtExceptionHandler(handler);
// 创立一个 WorkQueue,并且设置以后 WorkQueue 的 owner 是以后线程
WorkQueue w = new WorkQueue(this, wt);
int i = 0; // assign a pool index
// 又用到了 config 的常识,提取出咱们冀望的 WorkQueue 模式
int mode = config & MODE_MASK;
// 加锁
int rs = lockRunState();
try {WorkQueue[] ws; int n; // skip if no array
// 判断 ForkJoinPool 的 WorkQueue[]都初始化齐全
if ((ws = workQueues) != null && (n = ws.length) > 0) {
// 一种魔数计算形式,用以缩小抵触
int s = indexSeed += SEED_INCREMENT; // unlikely to collide
// 假如 WorkQueue 的初始长度是 16,那这里的 m 就是 15,最终目标就是为了失去一个奇数
int m = n - 1;
// 和失去偶数的计算形式一样,失去一个小于 m 的奇数 i
i = ((s << 1) | 1) & m; // odd-numbered indices
// 如果这个槽位不为空,阐明曾经被其余线程初始化过了,也就是有抵触,选取别的槽位
if (ws[i] != null) { // collision
int probes = 0; // step by approx half n
// 步长加 2,也就保障 step 还是奇数
int step = (n <= 4) ? 2 : ((n >>> 1) & EVENMASK) + 2;
// 始终遍历,直到找到空槽位,如果都遍历了一遍,那就须要对 WorkQueue[]扩容了
while (ws[i = (i + step) & m] != null) {if (++probes >= n) {workQueues = ws = Arrays.copyOf(ws, n <<= 1);
m = n - 1;
probes = 0;
}
}
}
// 初始化一个随机数
w.hint = s; // use as random seed
// 如文章后面所说,config 记录索引值和模式
w.config = i | mode;
// 扫描状态也记录为索引值,如文章后面所说,奇数示意为 scanning 状态
w.scanState = i; // publication fence
// 把初始化好的 WorkQueue 放到 ForkJoinPool 的 WorkQueue[]数组中
ws[i] = w;
}
} finally {
// 解锁
unlockRunState(rs, rs & ~RSLOCK);
}
// 设置 worker 的前缀名,用于业务辨别
wt.setName(workerNamePrefix.concat(Integer.toString(i >>> 1)));
// 返回以后线程创立的 WorkQueue,回到上一层调用栈,也就将 WorkQueue 注册到 ForkJoinWorkerThread 外面了
return w;
}
到这里线程是顺利创立胜利了,可是如果线程没有创立胜利,就须要 deregisterWorker 来做善后工作了
deregisterWorker
deregisterWorker 办法接管刚刚创立的线程援用和异样作为参数,来做善后工作,将 registerWorker 相干工作撤销回来
final void deregisterWorker(ForkJoinWorkerThread wt, Throwable ex) {
WorkQueue w = null;
if (wt != null && (w = wt.workQueue) != null) {WorkQueue[] ws; // remove index from array
// 获取以后线程注册的索引值
int idx = w.config & SMASK;
// 加锁
int rs = lockRunState();
// 如果奇数槽位都不为空,则清空内容
if ((ws = workQueues) != null && ws.length > idx && ws[idx] == w)
ws[idx] = null;
// 解锁
unlockRunState(rs, rs & ~RSLOCK);
}
long c; // decrement counts
// 死循环式 CAS 更改 ctl 的值,将后面 AC 和 TC 加 1 的值再减 1,ctl 就在那里,不增不减
do {} while (!U.compareAndSwapLong
(this, CTL, c = ctl, ((AC_MASK & (c - AC_UNIT)) |
(TC_MASK & (c - TC_UNIT)) |
(SP_MASK & c))));
// 清空 WorkQueue,将其中的 task 勾销掉
if (w != null) {
w.qlock = -1; // ensure set
w.transferStealCount(this);
w.cancelAll(); // cancel remaining tasks}
// 可能的替换操作
for (;;) { // possibly replace
WorkQueue[] ws; int m, sp;
// 如果线程池终止了,那就跳出循环即可
if (tryTerminate(false, false) || w == null || w.array == null ||
(runState & STOP) != 0 || (ws = workQueues) == null ||
(m = ws.length - 1) < 0) // already terminating
break;
// 以后线程创立失败,通过 sp 判断,如果还存在闲暇线程,则调用 tryRelease 来唤醒这个线程,而后跳出
if ((sp = (int)(c = ctl)) != 0) { // wake up replacement
if (tryRelease(c, ws[sp & m], AC_UNIT))
break;
}
// 如果没闲暇线程,并且还没有达到满足并行度的条件,那就得再次尝试创立一个线程,补救刚刚的失败
else if (ex != null && (c & ADD_WORKER) != 0L) {tryAddWorker(c); // create replacement
break;
}
else // don't need replacement
break;
}
if (ex == null) // help clean on way out
// 解决异样
ForkJoinTask.helpExpungeStaleExceptions();
else // rethrow
ForkJoinTask.rethrow(ex);
}
总之 deregisterWorker 办法从线程池里登记线程,清空 WorkQueue,同时更新 ctl,最初做可能的替换,依据线程池的状态决定是否找一个本人的替代者:
- 有闲暇线程,则唤醒一个
- 没有闲暇线程,再次尝试创立一个新的工作线程
deregisterWorker 线程解释分明了是为了帮忙大家残缺了解流程,但 registerWorker 胜利后的流程还没走完,咱得持续,有了 Worker,那就调用 wt.start()
干活吧
run
ForkJoinWorkerThread 继承自 Thread,调用 start() 办法后,天然要调用本人重写的 run() 办法
public void run() {if (workQueue.array == null) { // only run once
Throwable exception = null;
try {onStart();
//Work 开始工作,解决 workQueue 中的工作
pool.runWorker(workQueue);
} catch (Throwable ex) {exception = ex;} finally {
try {onTermination(exception);
} catch (Throwable ex) {if (exception == null)
exception = ex;
} finally {pool.deregisterWorker(this, exception);
}
}
}
}
办法的重点天然是进入到 runWorker
runWorker
runWorker 是很惯例的三部曲操作:
- scan: 通过扫描获取工作
- runTask:执行扫描到的工作
- awaitWork:没工作进入期待
具体请看正文
final void runWorker(WorkQueue w) {
// 初始化队列,并依据须要是否扩容为原来的 2 倍
w.growArray(); // allocate queue
int seed = w.hint; // initially holds randomization hint
int r = (seed == 0) ? 1 : seed; // avoid 0 for xorShift
// 死循环更新偏移 r,为扫描工作作筹备
for (ForkJoinTask<?> t;;) {
// 扫描工作
if ((t = scan(w, r)) != null)
// 扫描到就执行工作
w.runTask(t);
// 没扫描到就期待,如果等也等不到工作,那就跳出循环别死等了
else if (!awaitWork(w, r))
break;
r ^= r << 13; r ^= r >>> 17; r ^= r << 5; // xorshift
}
}
先来看 scan 办法
scan
ForkJoinPool 的工作窃取机制要来了,如何 steal 的,就藏在 scan 办法中
private ForkJoinTask<?> scan(WorkQueue w, int r) {WorkQueue[] ws; int m;
// 再次验证 workQueue[]数组的初始化状况
if ((ws = workQueues) != null && (m = ws.length - 1) > 0 && w != null) {
// 获取以后扫描状态
int ss = w.scanState; // initially non-negative
// 又一个死循环,留神到进口地位就好
// 和后面逻辑相似,随机一个起始地位,并赋值给 k
for (int origin = r & m, k = origin, oldSum = 0, checkSum = 0;;) {WorkQueue q; ForkJoinTask<?>[] a; ForkJoinTask<?> t;
int b, n; long c;
// 如果 k 槽位不为空
if ((q = ws[k]) != null) {
//base-top 小于零,并且工作 q 不为空
if ((n = (b = q.base) - q.top) < 0 &&
(a = q.array) != null) { // non-empty
// 获取 base 的偏移量,赋值给 i
long i = (((a.length - 1) & b) << ASHIFT) + ABASE;
// 从 base 端获取工作,和前文的形容的 steal 搭配上了,是从 base 端 steal
if ((t = ((ForkJoinTask<?>)
U.getObjectVolatile(a, i))) != null &&
q.base == b) {
// 是 active 状态
if (ss >= 0) {
// 更新 WorkQueue 中数组 i 索引地位为空,并且更新 base 的值
if (U.compareAndSwapObject(a, i, t, null)) {
q.base = b + 1;
//n<-1, 阐明以后队列还有残余工作,持续唤醒可能存在的其余线程
if (n < -1) // signal others
signalWork(ws, q);
// 间接返回工作
return t;
}
}
else if (oldSum == 0 && // try to activate
w.scanState < 0)
tryRelease(c = ctl, ws[m & (int)c], AC_UNIT);
}
// 如果获取工作失败,则筹备换地位扫描
if (ss < 0) // refresh
ss = w.scanState;
r ^= r << 1; r ^= r >>> 3; r ^= r << 10;
origin = k = r & m; // move and rescan
oldSum = checkSum = 0;
continue;
}
checkSum += b;
}
// k 始终在变,扫描到最初,如果等于 origin,阐明曾经扫描了一圈还没扫描到工作
if ((k = (k + 1) & m) == origin) { // continue until stable
if ((ss >= 0 || (ss == (ss = w.scanState))) &&
oldSum == (oldSum = checkSum)) {if (ss < 0 || w.qlock < 0) // already inactive
break;
// 筹备 inactive 当前工作队列
int ns = ss | INACTIVE; // try to inactivate
// 流动线程数 AC 减 1
long nc = ((SP_MASK & ns) |
(UC_MASK & ((c = ctl) - AC_UNIT)));
w.stackPred = (int)c; // hold prev stack top
U.putInt(w, QSCANSTATE, ns);
if (U.compareAndSwapLong(this, CTL, c, nc))
ss = ns;
else
w.scanState = ss; // back out
}
checkSum = 0;
}
}
}
return null;
}
如果顺利扫描到工作,那就要调用 runTask 办法来真正的运行这个工作了
runTask
马上就靠近假相了,steal 到工作了,就干点闲事吧
final void runTask(ForkJoinTask<?> task) {if (task != null) {
scanState &= ~SCANNING; // mark as busy
//Flag1: 记录以后的工作是偷来的,至于如何执行 task,是咱们写在 compute 办法中的,咱们一会看 doExec() 办法
(currentSteal = task).doExec();
U.putOrderedObject(this, QCURRENTSTEAL, null); // release for GC
execLocalTasks();
ForkJoinWorkerThread thread = owner;
// 累加偷来的数量,亲兄弟明算帐啊,尽管算完也没啥实际意义
if (++nsteals < 0) // collect on overflow
transferStealCount(pool);
// 工作执行完后,就从新更新 scanState 为 SCANNING
scanState |= SCANNING;
if (thread != null)
thread.afterTopLevelExec();}
}
Flag1: doExec 办法才是真正执行工作的要害,它是链接咱们自定义 compute 办法的外围,来看 doExec 办法
doExec
局势一片大好,挺住,揭开 exec 的面纱,就看到实质了
//ForkJoinTask 中的形象办法,RecursiveTask 和 RecursiveAction 都重写了它
protected abstract boolean exec();
final int doExec() {
int s; boolean completed;
if ((s = status) >= 0) {
try {completed = exec();
} catch (Throwable rex) {return setExceptionalCompletion(rex);
}
if (completed)
s = setCompletion(NORMAL);
}
return s;
}
//RecursiveTask 重写的内容,终于看到咱们文章结尾 demo 中的 compute 了
protected final boolean exec() {result = compute();
return true;
}
到这里,咱们曾经看到实质了,绕了这么一大圈,终于和咱们本人重写的 compute 办法分割到了一起,真是不容易,然而 runWorker 三部曲还差最初一曲 awaitWork 没谱,咱们来看看
awaitWork
下面说的是 scan 到了工作,要是没有 scan 到工作,那就得将以后线程阻塞一下,具体标注在正文中,能够简略理解一下
private boolean awaitWork(WorkQueue w, int r) {if (w == null || w.qlock < 0) // w is terminating
return false;
for (int pred = w.stackPred, spins = SPINS, ss;;) {if ((ss = w.scanState) >= 0)
break;
else if (spins > 0) {
r ^= r << 6; r ^= r >>> 21; r ^= r << 7;
if (r >= 0 && --spins == 0) { // randomize spins
WorkQueue v; WorkQueue[] ws; int s, j; AtomicLong sc;
if (pred != 0 && (ws = workQueues) != null &&
(j = pred & SMASK) < ws.length &&
// 前驱工作队列还在
(v = ws[j]) != null && // see if pred parking
// 并且工作队列曾经激活,阐明工作来了了
(v.parker == null || v.scanState >= 0))
// 持续自旋等一会,别返回 false
spins = SPINS; // continue spinning
}
}
// 自旋之后,再次查看工作队列是否终止,若是,退出扫描
else if (w.qlock < 0) // recheck after spins
return false;
else if (!Thread.interrupted()) {
long c, prevctl, parkTime, deadline;
int ac = (int)((c = ctl) >> AC_SHIFT) + (config & SMASK);
if ((ac <= 0 && tryTerminate(false, false)) ||
(runState & STOP) != 0) // pool terminating
return false;
if (ac <= 0 && ss == (int)c) { // is last waiter
prevctl = (UC_MASK & (c + AC_UNIT)) | (SP_MASK & pred);
int t = (short)(c >>> TC_SHIFT); // shrink excess spares
if (t > 2 && U.compareAndSwapLong(this, CTL, c, prevctl))
return false; // else use timed wait
parkTime = IDLE_TIMEOUT * ((t >= 0) ? 1 : 1 - t);
deadline = System.nanoTime() + parkTime - TIMEOUT_SLOP;}
else
prevctl = parkTime = deadline = 0L;
Thread wt = Thread.currentThread();
U.putObject(wt, PARKBLOCKER, this); // emulate LockSupport
w.parker = wt;
if (w.scanState < 0 && ctl == c) // recheck before park
U.park(false, parkTime);
U.putOrderedObject(w, QPARKER, null);
U.putObject(wt, PARKBLOCKER, null);
if (w.scanState >= 0)
break;
if (parkTime != 0L && ctl == c &&
deadline - System.nanoTime() <= 0L &&
U.compareAndSwapLong(this, CTL, c, prevctl))
return false; // shrink pool
}
}
return true;
}
到这里,ForkJoinPool 的残缺流程算是有个根本理解了,然而咱们后面讲的这些内容都是从 submission task 作为切入点的。刚刚聊到的 compute 办法,咱们依照分治算法范式写了本人的逻辑,具体请回看文中结尾的 demo,很要害的一点是,咱们在 compute 中调用了 fork 办法,这就给咱们理解 worker task 的机会了,持续来看 fork 办法
fork
Fork 办法的逻辑很简略,如果以后线程是 ForkJoinWorkerThread 类型,也就是说曾经通过上文注册的 Worker,那么间接调用 push 办法将 task 放到以后线程领有的 WorkQueue 中,否则就再调用 externalPush 重走咱们已上说的所有逻辑(你敢再走一遍吗?)
public final ForkJoinTask<V> fork() {
Thread t;
if ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread)
((ForkJoinWorkerThread)t).workQueue.push(this);
else
ForkJoinPool.common.externalPush(this);
return this;
}
//push 办法很简略,这里就不再过多解释了
final void push(ForkJoinTask<?> task) {ForkJoinTask<?>[] a; ForkJoinPool p;
int b = base, s = top, n;
if ((a = array) != null) { // ignore if queue removed
int m = a.length - 1; // fenced write for task visibility
U.putOrderedObject(a, ((m & s) << ASHIFT) + ABASE, task);
U.putOrderedInt(this, QTOP, s + 1);
if ((n = s - b) <= 1) {if ((p = pool) != null)
p.signalWork(p.workQueues, this);
}
else if (n >= m)
growArray();}
}
有 fork 就有 join,持续看一下 join 办法()
join
join 的外围调用在 doJoin,然而看到这么多级联三元运算符,我慌了
public final V join() {
int s;
if ((s = doJoin() & DONE_MASK) != NORMAL)
reportException(s);
return getRawResult();}
private int doJoin() {
int s; Thread t; ForkJoinWorkerThread wt; ForkJoinPool.WorkQueue w;
//status,task 的运行状态
return (s = status) < 0 ? s :
((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) ?
(w = (wt = (ForkJoinWorkerThread)t).workQueue).
tryUnpush(this) && (s = doExec()) < 0 ? s :
wt.pool.awaitJoin(w, this, 0L) :
externalAwaitDone();}
咱们将 doJoin 办法用咱们最相熟的 if/else 做个改变,是不是就恍然大悟了
private int doJoin() {
int s;
Thread t;
ForkJoinWorkerThread wt;
ForkJoinPool.WorkQueue w;
if((s = status) < 0) { // 有后果,间接返回
return s;
}else {if((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) {
// 如果是 ForkJoinWorkerThread Worker
if((w = (wt = (ForkJoinWorkerThread) t).workQueue).tryUnpush(this) // 相似下面提到的 scan,然而是专项尝试从本工作队列里取出期待的工作
// 取出了工作,就去执行它,并返回后果
&& (s = doExec()) < 0) {return s;}else {
// 也有可能别的线程把这个工作偷走了,那就执行外部期待办法
return wt.pool.awaitJoin(w, this, 0L);
}
}else {
// 如果不是 ForkJoinWorkerThread,执行内部期待办法
return externalAwaitDone();}
}
}
其中 awaitJoin 和 externalAwaitDone 都用到了 Helper(帮忙)和 Compensating(弥补)两种策略,这两种策略大家齐全能够自行浏览了,尤其是 awaitJoin 办法,强烈推荐大家自行浏览,其中 pop 的过程在这里,这里不再开展
到这里,无关 ForkJoinPool 相干的内容就算是完结了,为了让大家有个更好的了解 fork/join 机制,咱们还是画几张图解释一下
Fork/Join 图解
假如咱们的大工作是 Task(8), 最终被分治成可执行的最小单元是 Task(1)
依照分治思维拆分工作的整体指标就是这样滴:
从内部先提交一个大的 Task(8),将其放在偶数槽位中(请留神色彩对应)
不满足并行度,会创立 Worker 1 来扫描,并从 base 端窃取到工作 task(8),执行到 compute, fork
出两个 task(4), 并 push 到 WorkQueue 中
在执行工作时始终会确认是否满足并行度要求,如果没有就会持续创立新的 Worker,与此同时,也会持续 fork 工作,直到最小单元。Worker1 会从 top 端 pop 进去 task(4) 来持续 compute 和 fork,并从新 push 到 WorkQueue 中
task(2) 还不是最小单元,所以会持续 pop 出 task(2),并最终 fork 出两个 task(1) push 到 WorkQueue 中
task(1) 曾经是最小粒度了,能够间接 pop 进去执行,获取最终后果;在 Worker1 进行这些 pop 操作的同时,为了满足并行度要求也会创立的其余 Worker,比方 Worker 2,这时 Worker2 会从 Worker 1 所在队列的 base 端窃取工作
Worker 2 仍旧是依照这个规定进行 pop->fork,到最终能够 exec 工作,假如 Worker 1 的工作先执行完,要 join 后果,当 join task(4) 时,通过 hint 定位到是谁偷走了 task(4),这时顺藤摸瓜找到 Worker2,如果 Worker2 还有工作没执行完,Worker1 再窃取回来帮着执行,这样互帮互助,最终疾速实现工作
灵魂诘问
- 为什么说 ForkjoinPool 效率要更高?同时倡议应用 commonPool?
- JDK1.8 Stream 底层就充沛用到了 ForkJoinPool,你晓得还有哪里用到了 ForkJoinPool 了吗?
- ForkJoinPool 最多会有多少个槽位?
- 上面代码有人说不能充分利用 ForkJoinPool,多个 task 的提交要用 invokeAll,你晓得为什么吗?如果不必 invokeAll,要怎么应用 fork/join 呢?
protected Long compute() {if (工作足够小) {return cal();
}
SumTask subtask1 = new SumTask(...);
SumTask subtask2 = new SumTask(...);
// 别离对子工作调用 fork():
subtask1.fork();
subtask2.fork();
// 别离获取合并后果:
Long subresult1 = subtask1.join();
Long subresult2 = subtask2.join();
return subresult1 + subresult2;
}
总结
这又是一篇长文,很多小伙伴私下都倡议我将长文拆开,一方面读者好消化,另一方面我本人也在数量的体现上变得高产。几次想拆开,但好多文章拆开就失去了连续性(大家都有遗忘曲线)。过年没回老家,就有工夫撸文章了。为了更好的了解源码,文章的根底铺垫内容很多,看到这,你应该很累了,想要将更零散的知识点串起来,那就多看代码正文回味一下,而后一起膜拜 Doug Lea 吧
参考
- Java 并发编程实战
- https://www.liaoxuefeng.com/a…
- https://www.cnblogs.com/aniao…
- https://cloud.tencent.com/dev…