ForkJoinPool线程池最大的特点就是分叉(fork)合并(join),将一个大工作拆分成多个小工作,并行执行,再联合工作窃取模式(worksteal
)进步整体的执行效率,充分利用CPU资源。
一. 利用场景
ForkJoinPool应用分治算法,用绝对少的线程解决大量的工作,将一个大工作一拆为二,以此类推,每个子工作再拆分一半,直到达到最细颗粒度为止,即设置的阈值进行拆分,而后从最底层的工作开始计算,往上一层一层合并后果,简略的流程如下图:
从图中能够看出ForkJoinPool要先执行完子工作能力执行上一层工作,所以ForkJoinPool适宜在无限的线程数下实现有父子关系的工作场景,比方:疾速排序,二分查找,矩阵乘法,线性工夫抉择等场景,以及数组和汇合的运算。
上面是个简略的代码示例计算从1到1亿之间所有数字之和:
package com.javakk;import java.util.concurrent.ForkJoinPool;import java.util.concurrent.RecursiveTask;import java.util.stream.LongStream;/** * ForkJoinPool求和 * @author 老K */public class ForkJoinPoolTest { private static ForkJoinPool forkJoinPool; /** * 求和工作类继承RecursiveTask * ForkJoinTask一共有3个实现: * RecursiveTask:有返回值 * RecursiveAction:无返回值 * CountedCompleter:无返回值工作,实现工作后能够触发回调 */ private static class SumTask extends RecursiveTask<Long> { private long[] numbers; private int from; private int to; public SumTask(long[] numbers, int from, int to) { this.numbers = numbers; this.from = from; this.to = to; } /** * ForkJoin执行工作的外围办法 * @return */ @Override protected Long compute() { if (to - from < 10) { // 设置拆分的最细粒度,即阈值,如果满足条件就不再拆分,执行计算工作 long total = 0; for (int i = from; i <= to; i++) { total += numbers[i]; } return total; } else { // 否则持续拆分,递归调用 int middle = (from + to) / 2; SumTask taskLeft = new SumTask(numbers, from, middle); SumTask taskRight = new SumTask(numbers, middle + 1, to); taskLeft.fork(); taskRight.fork(); return taskLeft.join() + taskRight.join(); } } } public static void main(String[] args) { // 也能够jdk8提供的通用线程池ForkJoinPool.commonPool // 能够在构造函数内指定线程数 forkJoinPool = new ForkJoinPool(); long[] numbers = LongStream.rangeClosed(1, 100000000).toArray(); // 这里能够调用submit办法返回的future,通过future.get获取后果 Long result = forkJoinPool.invoke(new SumTask(numbers, 0, numbers.length - 1)); forkJoinPool.shutdown(); System.out.println("最终后果:"+result); System.out.println("沉闷线程数:"+forkJoinPool.getActiveThreadCount()); System.out.println("窃取工作数:"+forkJoinPool.getStealCount()); }}
输入后果(沉闷线程数和窃取工作会依据本地环境和工作执行状况变动):
最终后果:5000000050000000沉闷线程数:4窃取工作数:12
上例中在compute办法里拆分的最小粒度是10个元素,大家能够改成其余的值试下,会发现执行的效率差异很大,所以要留神拆分粒度对性能的影响。
ForkJoinPool外部的队列可能保障执行工作的程序,至于为什么它可能在无限的线程数量下实现十分多的工作,前面会讲到。
二. 与ThreadPoolExecutor原生线程池的区别
ForkJoinPool和ThreadPoolExecutor都实现了Executor
和ExecutorService
接口,都能够通过构造函数设置线程数,threadFactory
,能够查看ForkJoinPool.makeCommonPool()
办法的源码查看通用线程池的结构细节。
在内部结构上我感觉两个线程池最大的区别是在工作队列的设计上,如下图
ThreadPoolExecutor:
ForkJoinPool:
图上细节画的不谨严,但大抵能看出区别:
- ForkJoinPool每个线程都有本人的队列
- ThreadPoolExecutor共用一个队列
通过下面的代码示例能够看到应用ForkJoinPool能够在无限的线程数下来实现十分多的具备父子关系的工作,比方应用4个线程来实现超过2000万个工作。然而应用ThreadPoolExecutor是不可能的,因为ThreadPoolExecutor中的线程无奈抉择优先执行子工作,要实现2000万个具备父子关系的工作时,就须要2000万个线程,这样会导致ThreadPoolExecutor的工作队列撑满或创立的最大线程数把内存撑爆间接gg。
ForkJoinPool最适宜计算密集型工作,而且最好是非阻塞工作,之前的一篇文章:Java踩坑记系列之线程池 也说了线程池的不同应用场景和注意事项。
所以ForkJoinPool是ThreadPoolExecutor线程池的一种补充,是对计算密集型场景的增强。
三. 工作窃取的实现原理
第一节的代码示例输入结果显示沉闷线程是4个,但却实现了2000万个子工作,窃取工作是12个(窃取数跟拆分层级和计算复杂度无关),这是work steal
工作窃取的作用。
ForkJoinPool类中的WorkQueue正是实现工作窃取的队列,javadoc中的正文如下:
粗心是大多数操作都产生在工作窃取队列中(在嵌套类工作队列中)。这些是非凡模式的Deques,次要有push
,pop
,poll
操作。
Deque是双端队列(double ended queue缩写),头部和尾部任何一端都能够进行插入,删除,获取的操作,即反对FIFO(队列)也反对LIFO(栈)程序。
Deque接口的实现最常见的是LinkedList
,除此还有ArrayDeque
,ConcurrentLinkedDeque
等
工作窃取模式次要分以下几个步骤:
- 每个线程都有本人的双端队列
- 当调用fork办法时,将工作放进队列头部,线程以LIFO程序,应用push/pop形式解决队列中的工作
- 如果本人队列里的工作解决完后,会从其余线程保护的队列尾部应用poll的形式窃取工作,以达到充分利用CPU资源的目标
- 从尾部窃取能够缩小同原线程的竞争
- 当队列中剩最初一个工作时,通过cas解决原线程和窃取线程的竞争
流程大抵如下所示:
工作窃取便是ForkJoinPool线程池的劣势所在,在个别的线程池比方ThreadPoolExecutor中,如果一个线程正在执行的工作因为某种原因无奈持续运行,那么该线程会处于期待状态,包含singleThreadPool
,fixedThreadPool
,cachedThreadPool
这几种线程池。
而在ForkJoinPool中,那么线程会被动寻找其余尚未被执行的工作而后窃取过去执行,缩小线程等待时间。
JDK8中的并行流(parallelStream)性能是基于ForkJoinPool实现的,另外还有java.util.concurrent.CompletableFuture
异步回调future,外部应用的线程池也是ForkJoinPool。
文章起源:http://javakk.com/215.html