关于java:线程池ForkJoinPool简介

46次阅读

共计 3536 个字符,预计需要花费 9 分钟才能阅读完成。

ForkJoinPool线程池最大的特点就是分叉 (fork) 合并 (join),将一个大工作拆分成多个小工作,并行执行,再联合工作窃取模式(worksteal) 进步整体的执行效率,充分利用 CPU 资源。

一. 利用场景

ForkJoinPool应用分治算法,用绝对少的线程解决大量的工作,将一个大工作一拆为二,以此类推,每个子工作再拆分一半,直到达到最细颗粒度为止,即设置的阈值进行拆分,而后从最底层的工作开始计算,往上一层一层合并后果,简略的流程如下图:

从图中能够看出 ForkJoinPool 要先执行完子工作能力执行上一层工作,所以 ForkJoinPool 适宜在无限的线程数下实现有父子关系的工作场景,比方:疾速排序,二分查找,矩阵乘法,线性工夫抉择等场景,以及数组和汇合的运算。

上面是个简略的代码示例计算从 1 到 1 亿之间所有数字之和:

package com.javakk;

import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;
import java.util.stream.LongStream;

/**
 * ForkJoinPool 求和
 * @author 老 K
 */
public class ForkJoinPoolTest {

    private static ForkJoinPool forkJoinPool;

    /**
     * 求和工作类继承 RecursiveTask
     * ForkJoinTask 一共有 3 个实现:* RecursiveTask:有返回值
     * RecursiveAction:无返回值
     * CountedCompleter:无返回值工作,实现工作后能够触发回调
     */
    private static class SumTask extends RecursiveTask<Long> {private long[] numbers;
        private int from;
        private int to;

        public SumTask(long[] numbers, int from, int to) {
            this.numbers = numbers;
            this.from = from;
            this.to = to;
        }

        /**
         * ForkJoin 执行工作的外围办法
         * @return
         */
        @Override
        protected Long compute() {if (to - from < 10) { // 设置拆分的最细粒度,即阈值,如果满足条件就不再拆分,执行计算工作
                long total = 0;
                for (int i = from; i <= to; i++) {total += numbers[i];
                }
                return total;
            } else { // 否则持续拆分,递归调用
                int middle = (from + to) / 2;
                SumTask taskLeft = new SumTask(numbers, from, middle);
                SumTask taskRight = new SumTask(numbers, middle + 1, to);
                taskLeft.fork();
                taskRight.fork();
                return taskLeft.join() + taskRight.join();
            }
        }
    }

    public static void main(String[] args) {
        // 也能够 jdk8 提供的通用线程池 ForkJoinPool.commonPool
        // 能够在构造函数内指定线程数
        forkJoinPool = new ForkJoinPool();
        long[] numbers = LongStream.rangeClosed(1, 100000000).toArray();
        // 这里能够调用 submit 办法返回的 future,通过 future.get 获取后果
        Long result = forkJoinPool.invoke(new SumTask(numbers, 0, numbers.length - 1));
        forkJoinPool.shutdown();
        System.out.println("最终后果:"+result);
        System.out.println("沉闷线程数:"+forkJoinPool.getActiveThreadCount());
        System.out.println("窃取工作数:"+forkJoinPool.getStealCount());
    }
}

输入后果(沉闷线程数和窃取工作会依据本地环境和工作执行状况变动):

最终后果:5000000050000000
沉闷线程数:4
窃取工作数:12

上例中在 compute 办法里拆分的最小粒度是 10 个元素,大家能够改成其余的值试下,会发现执行的效率差异很大,所以要留神拆分粒度对性能的影响。

ForkJoinPool 外部的队列可能保障执行工作的程序,至于为什么它可能在无限的线程数量下实现十分多的工作,前面会讲到。

二. 与 ThreadPoolExecutor 原生线程池的区别

ForkJoinPool 和 ThreadPoolExecutor 都实现了 ExecutorExecutorService接口,都能够通过构造函数设置线程数,threadFactory,能够查看 ForkJoinPool.makeCommonPool() 办法的源码查看通用线程池的结构细节。

在内部结构上我感觉两个线程池最大的区别是在工作队列的设计上,如下图

ThreadPoolExecutor:

ForkJoinPool:

图上细节画的不谨严,但大抵能看出区别:

  • ForkJoinPool 每个线程都有本人的队列
  • ThreadPoolExecutor 共用一个队列

通过下面的代码示例能够看到应用 ForkJoinPool 能够在无限的线程数下来实现十分多的具备父子关系的工作,比方应用 4 个线程来实现超过 2000 万个工作。然而应用 ThreadPoolExecutor 是不可能的,因为 ThreadPoolExecutor 中的线程无奈抉择优先执行子工作,要实现 2000 万个具备父子关系的工作时,就须要 2000 万个线程,这样会导致 ThreadPoolExecutor 的工作队列撑满或创立的最大线程数把内存撑爆间接 gg。

ForkJoinPool 最适宜计算密集型工作,而且最好是非阻塞工作,之前的一篇文章:Java 踩坑记系列之线程池 也说了线程池的不同应用场景和注意事项。

所以ForkJoinPool 是 ThreadPoolExecutor 线程池的一种补充,是对计算密集型场景的增强。

三. 工作窃取的实现原理

第一节的代码示例输入结果显示沉闷线程是 4 个,但却实现了 2000 万个子工作,窃取工作是 12 个 (窃取数跟拆分层级和计算复杂度无关),这是work steal 工作窃取的作用。

ForkJoinPool 类中的 WorkQueue 正是实现工作窃取的队列,javadoc 中的正文如下:

粗心是大多数操作都产生在工作窃取队列中 (在嵌套类工作队列中)。这些是非凡模式的Deques,次要有pushpoppoll 操作。

Deque 是双端队列 (double ended queue 缩写),头部和尾部任何一端都能够进行插入,删除,获取的操作,即反对 FIFO(队列) 也反对 LIFO(栈)程序。

Deque 接口的实现最常见的是 LinkedList,除此还有ArrayDequeConcurrentLinkedDeque

工作窃取模式次要分以下几个步骤:

  1. 每个线程都有本人的双端队列
  2. 当调用 fork 办法时,将工作放进队列头部,线程以 LIFO 程序,应用 push/pop 形式解决队列中的工作
  3. 如果本人队列里的工作解决完后,会从其余线程保护的队列尾部应用 poll 的形式窃取工作,以达到充分利用 CPU 资源的目标
  4. 从尾部窃取能够缩小同原线程的竞争
  5. 当队列中剩最初一个工作时,通过 cas 解决原线程和窃取线程的竞争

流程大抵如下所示:

工作窃取便是 ForkJoinPool 线程池的劣势所在,在个别的线程池比方 ThreadPoolExecutor 中,如果一个线程正在执行的工作因为某种原因无奈持续运行,那么该线程会处于期待状态,包含 singleThreadPoolfixedThreadPoolcachedThreadPool 这几种线程池。

而在 ForkJoinPool 中,那么线程会被动寻找其余尚未被执行的工作而后窃取过去执行,缩小线程等待时间。

JDK8 中的并行流 (parallelStream) 性能是基于 ForkJoinPool 实现的,另外还有 java.util.concurrent.CompletableFuture 异步回调 future,外部应用的线程池也是 ForkJoinPool。

文章起源:http://javakk.com/215.html

正文完
 0