ForkJoinPool线程池最大的特点就是分叉(fork)合并(join),将一个大工作拆分成多个小工作,并行执行,再联合工作窃取模式(worksteal)进步整体的执行效率,充分利用CPU资源。

一. 利用场景

ForkJoinPool应用分治算法,用绝对少的线程解决大量的工作,将一个大工作一拆为二,以此类推,每个子工作再拆分一半,直到达到最细颗粒度为止,即设置的阈值进行拆分,而后从最底层的工作开始计算,往上一层一层合并后果,简略的流程如下图:

从图中能够看出ForkJoinPool要先执行完子工作能力执行上一层工作,所以ForkJoinPool适宜在无限的线程数下实现有父子关系的工作场景,比方:疾速排序,二分查找,矩阵乘法,线性工夫抉择等场景,以及数组和汇合的运算。

上面是个简略的代码示例计算从1到1亿之间所有数字之和:

package com.javakk;import java.util.concurrent.ForkJoinPool;import java.util.concurrent.RecursiveTask;import java.util.stream.LongStream;/** * ForkJoinPool求和 * @author 老K */public class ForkJoinPoolTest {    private static ForkJoinPool forkJoinPool;    /**     * 求和工作类继承RecursiveTask     * ForkJoinTask一共有3个实现:     * RecursiveTask:有返回值     * RecursiveAction:无返回值     * CountedCompleter:无返回值工作,实现工作后能够触发回调     */    private static class SumTask extends RecursiveTask<Long> {        private long[] numbers;        private int from;        private int to;        public SumTask(long[] numbers, int from, int to) {            this.numbers = numbers;            this.from = from;            this.to = to;        }        /**         * ForkJoin执行工作的外围办法         * @return         */        @Override        protected Long compute() {            if (to - from < 10) { // 设置拆分的最细粒度,即阈值,如果满足条件就不再拆分,执行计算工作                long total = 0;                for (int i = from; i <= to; i++) {                    total += numbers[i];                }                return total;            } else { // 否则持续拆分,递归调用                int middle = (from + to) / 2;                SumTask taskLeft = new SumTask(numbers, from, middle);                SumTask taskRight = new SumTask(numbers, middle + 1, to);                taskLeft.fork();                taskRight.fork();                return taskLeft.join() + taskRight.join();            }        }    }    public static void main(String[] args) {        // 也能够jdk8提供的通用线程池ForkJoinPool.commonPool        // 能够在构造函数内指定线程数        forkJoinPool = new ForkJoinPool();        long[] numbers = LongStream.rangeClosed(1, 100000000).toArray();        // 这里能够调用submit办法返回的future,通过future.get获取后果        Long result = forkJoinPool.invoke(new SumTask(numbers, 0, numbers.length - 1));        forkJoinPool.shutdown();        System.out.println("最终后果:"+result);        System.out.println("沉闷线程数:"+forkJoinPool.getActiveThreadCount());        System.out.println("窃取工作数:"+forkJoinPool.getStealCount());    }}

输入后果(沉闷线程数和窃取工作会依据本地环境和工作执行状况变动):

最终后果:5000000050000000沉闷线程数:4窃取工作数:12

上例中在compute办法里拆分的最小粒度是10个元素,大家能够改成其余的值试下,会发现执行的效率差异很大,所以要留神拆分粒度对性能的影响。

ForkJoinPool外部的队列可能保障执行工作的程序,至于为什么它可能在无限的线程数量下实现十分多的工作,前面会讲到。

二. 与ThreadPoolExecutor原生线程池的区别

ForkJoinPool和ThreadPoolExecutor都实现了ExecutorExecutorService接口,都能够通过构造函数设置线程数,threadFactory,能够查看ForkJoinPool.makeCommonPool()办法的源码查看通用线程池的结构细节。

在内部结构上我感觉两个线程池最大的区别是在工作队列的设计上,如下图

ThreadPoolExecutor:

ForkJoinPool:

图上细节画的不谨严,但大抵能看出区别:

  • ForkJoinPool每个线程都有本人的队列
  • ThreadPoolExecutor共用一个队列

通过下面的代码示例能够看到应用ForkJoinPool能够在无限的线程数下来实现十分多的具备父子关系的工作,比方应用4个线程来实现超过2000万个工作。然而应用ThreadPoolExecutor是不可能的,因为ThreadPoolExecutor中的线程无奈抉择优先执行子工作,要实现2000万个具备父子关系的工作时,就须要2000万个线程,这样会导致ThreadPoolExecutor的工作队列撑满或创立的最大线程数把内存撑爆间接gg。

ForkJoinPool最适宜计算密集型工作,而且最好是非阻塞工作,之前的一篇文章:Java踩坑记系列之线程池 也说了线程池的不同应用场景和注意事项。

所以ForkJoinPool是ThreadPoolExecutor线程池的一种补充,是对计算密集型场景的增强。

三. 工作窃取的实现原理

第一节的代码示例输入结果显示沉闷线程是4个,但却实现了2000万个子工作,窃取工作是12个(窃取数跟拆分层级和计算复杂度无关),这是work steal工作窃取的作用。

ForkJoinPool类中的WorkQueue正是实现工作窃取的队列,javadoc中的正文如下:

粗心是大多数操作都产生在工作窃取队列中(在嵌套类工作队列中)。这些是非凡模式的Deques,次要有pushpoppoll操作。

Deque是双端队列(double ended queue缩写),头部和尾部任何一端都能够进行插入,删除,获取的操作,即反对FIFO(队列)也反对LIFO(栈)程序。

Deque接口的实现最常见的是LinkedList,除此还有ArrayDequeConcurrentLinkedDeque

工作窃取模式次要分以下几个步骤:

  1. 每个线程都有本人的双端队列
  2. 当调用fork办法时,将工作放进队列头部,线程以LIFO程序,应用push/pop形式解决队列中的工作
  3. 如果本人队列里的工作解决完后,会从其余线程保护的队列尾部应用poll的形式窃取工作,以达到充分利用CPU资源的目标
  4. 从尾部窃取能够缩小同原线程的竞争
  5. 当队列中剩最初一个工作时,通过cas解决原线程和窃取线程的竞争

流程大抵如下所示:

工作窃取便是ForkJoinPool线程池的劣势所在,在个别的线程池比方ThreadPoolExecutor中,如果一个线程正在执行的工作因为某种原因无奈持续运行,那么该线程会处于期待状态,包含singleThreadPoolfixedThreadPoolcachedThreadPool这几种线程池。

而在ForkJoinPool中,那么线程会被动寻找其余尚未被执行的工作而后窃取过去执行,缩小线程等待时间。

JDK8中的并行流(parallelStream)性能是基于ForkJoinPool实现的,另外还有java.util.concurrent.CompletableFuture异步回调future,外部应用的线程池也是ForkJoinPool。

文章起源:http://javakk.com/215.html