共计 3536 个字符,预计需要花费 9 分钟才能阅读完成。
ForkJoinPool线程池最大的特点就是分叉 (fork) 合并 (join),将一个大工作拆分成多个小工作,并行执行,再联合工作窃取模式(worksteal
) 进步整体的执行效率,充分利用 CPU 资源。
一. 利用场景
ForkJoinPool应用分治算法,用绝对少的线程解决大量的工作,将一个大工作一拆为二,以此类推,每个子工作再拆分一半,直到达到最细颗粒度为止,即设置的阈值进行拆分,而后从最底层的工作开始计算,往上一层一层合并后果,简略的流程如下图:
从图中能够看出 ForkJoinPool 要先执行完子工作能力执行上一层工作,所以 ForkJoinPool 适宜在无限的线程数下实现有父子关系的工作场景,比方:疾速排序,二分查找,矩阵乘法,线性工夫抉择等场景,以及数组和汇合的运算。
上面是个简略的代码示例计算从 1 到 1 亿之间所有数字之和:
package com.javakk;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;
import java.util.stream.LongStream;
/**
* ForkJoinPool 求和
* @author 老 K
*/
public class ForkJoinPoolTest {
private static ForkJoinPool forkJoinPool;
/**
* 求和工作类继承 RecursiveTask
* ForkJoinTask 一共有 3 个实现:* RecursiveTask:有返回值
* RecursiveAction:无返回值
* CountedCompleter:无返回值工作,实现工作后能够触发回调
*/
private static class SumTask extends RecursiveTask<Long> {private long[] numbers;
private int from;
private int to;
public SumTask(long[] numbers, int from, int to) {
this.numbers = numbers;
this.from = from;
this.to = to;
}
/**
* ForkJoin 执行工作的外围办法
* @return
*/
@Override
protected Long compute() {if (to - from < 10) { // 设置拆分的最细粒度,即阈值,如果满足条件就不再拆分,执行计算工作
long total = 0;
for (int i = from; i <= to; i++) {total += numbers[i];
}
return total;
} else { // 否则持续拆分,递归调用
int middle = (from + to) / 2;
SumTask taskLeft = new SumTask(numbers, from, middle);
SumTask taskRight = new SumTask(numbers, middle + 1, to);
taskLeft.fork();
taskRight.fork();
return taskLeft.join() + taskRight.join();
}
}
}
public static void main(String[] args) {
// 也能够 jdk8 提供的通用线程池 ForkJoinPool.commonPool
// 能够在构造函数内指定线程数
forkJoinPool = new ForkJoinPool();
long[] numbers = LongStream.rangeClosed(1, 100000000).toArray();
// 这里能够调用 submit 办法返回的 future,通过 future.get 获取后果
Long result = forkJoinPool.invoke(new SumTask(numbers, 0, numbers.length - 1));
forkJoinPool.shutdown();
System.out.println("最终后果:"+result);
System.out.println("沉闷线程数:"+forkJoinPool.getActiveThreadCount());
System.out.println("窃取工作数:"+forkJoinPool.getStealCount());
}
}
输入后果(沉闷线程数和窃取工作会依据本地环境和工作执行状况变动):
最终后果:5000000050000000
沉闷线程数:4
窃取工作数:12
上例中在 compute 办法里拆分的最小粒度是 10 个元素,大家能够改成其余的值试下,会发现执行的效率差异很大,所以要留神拆分粒度对性能的影响。
ForkJoinPool 外部的队列可能保障执行工作的程序,至于为什么它可能在无限的线程数量下实现十分多的工作,前面会讲到。
二. 与 ThreadPoolExecutor 原生线程池的区别
ForkJoinPool 和 ThreadPoolExecutor 都实现了 Executor
和ExecutorService
接口,都能够通过构造函数设置线程数,threadFactory
,能够查看 ForkJoinPool.makeCommonPool()
办法的源码查看通用线程池的结构细节。
在内部结构上我感觉两个线程池最大的区别是在工作队列的设计上,如下图
ThreadPoolExecutor:
ForkJoinPool:
图上细节画的不谨严,但大抵能看出区别:
- ForkJoinPool 每个线程都有本人的队列
- ThreadPoolExecutor 共用一个队列
通过下面的代码示例能够看到应用 ForkJoinPool 能够在无限的线程数下来实现十分多的具备父子关系的工作,比方应用 4 个线程来实现超过 2000 万个工作。然而应用 ThreadPoolExecutor 是不可能的,因为 ThreadPoolExecutor 中的线程无奈抉择优先执行子工作,要实现 2000 万个具备父子关系的工作时,就须要 2000 万个线程,这样会导致 ThreadPoolExecutor 的工作队列撑满或创立的最大线程数把内存撑爆间接 gg。
ForkJoinPool 最适宜计算密集型工作,而且最好是非阻塞工作,之前的一篇文章:Java 踩坑记系列之线程池 也说了线程池的不同应用场景和注意事项。
所以ForkJoinPool 是 ThreadPoolExecutor 线程池的一种补充,是对计算密集型场景的增强。
三. 工作窃取的实现原理
第一节的代码示例输入结果显示沉闷线程是 4 个,但却实现了 2000 万个子工作,窃取工作是 12 个 (窃取数跟拆分层级和计算复杂度无关),这是work steal
工作窃取的作用。
ForkJoinPool 类中的 WorkQueue 正是实现工作窃取的队列,javadoc 中的正文如下:
粗心是大多数操作都产生在工作窃取队列中 (在嵌套类工作队列中)。这些是非凡模式的Deques,次要有push
,pop
,poll
操作。
Deque 是双端队列 (double ended queue 缩写),头部和尾部任何一端都能够进行插入,删除,获取的操作,即反对 FIFO(队列) 也反对 LIFO(栈)程序。
Deque 接口的实现最常见的是 LinkedList
,除此还有ArrayDeque
,ConcurrentLinkedDeque
等
工作窃取模式次要分以下几个步骤:
- 每个线程都有本人的双端队列
- 当调用 fork 办法时,将工作放进队列头部,线程以 LIFO 程序,应用 push/pop 形式解决队列中的工作
- 如果本人队列里的工作解决完后,会从其余线程保护的队列尾部应用 poll 的形式窃取工作,以达到充分利用 CPU 资源的目标
- 从尾部窃取能够缩小同原线程的竞争
- 当队列中剩最初一个工作时,通过 cas 解决原线程和窃取线程的竞争
流程大抵如下所示:
工作窃取便是 ForkJoinPool 线程池的劣势所在,在个别的线程池比方 ThreadPoolExecutor 中,如果一个线程正在执行的工作因为某种原因无奈持续运行,那么该线程会处于期待状态,包含 singleThreadPool
,fixedThreadPool
,cachedThreadPool
这几种线程池。
而在 ForkJoinPool 中,那么线程会被动寻找其余尚未被执行的工作而后窃取过去执行,缩小线程等待时间。
JDK8 中的并行流 (parallelStream) 性能是基于 ForkJoinPool 实现的,另外还有 java.util.concurrent.CompletableFuture
异步回调 future,外部应用的线程池也是 ForkJoinPool。
文章起源:http://javakk.com/215.html