乐趣区

关于java:Java-并发之-ForkJoin-框架

什么是 Fork/Join 框架

Fork/Join 框架是一种在 JDK 7 引入的线程池,用于并行执行 把一个大工作拆成多个小工作并行执行,最终汇总每个小工作后果失去大工作后果 的特殊任务。通过其命名也很容易看出框架次要分为 ForkJoin 两个阶段,第一阶段 Fork 是把一个大工作拆分为多个子工作并行的执行,第二阶段 Join 是合并这些子工作的所有执行后果,最初失去大工作的后果。

这里不难发现其执行次要流程:首先判断一个工作是否足够小,如果工作足够小,则间接计算,否则,就拆分成几个更小的小工作别离计算,这个过程能够重复的拆分成一系列小工作。Fork/Join 框架是一种基于 分治 的算法,通过拆分大工作成多个独立的小工作,而后并行执行这些小工作,最初合并小工作的后果失去大工作的最终后果,通过并行计算以提高效率。

Fork/Join 框架应用示例

上面通过一个 计算列表中所有元素的总和 的示例来看看 Fork/Join 框架是如何应用的,总的思路是:将这个列表分成许多子列表,而后对每个子列表的元素进行求和,而后,咱们再计算所有这些值的总和就失去原始列表的和了。Fork/Join 框架中定义了 ForkJoinTask 来示意一个 Fork/Join 工作,其提供了 fork()join() 等操作,通常状况下,咱们并不需要间接继承这个 ForkJoinTask 类,而是应用框架提供的两个 ForkJoinTask 的子类:

  • RecursiveAction 用于示意 没有返回后果 Fork/Join 工作。
  • RecursiveTask 用于示意 有返回后果 Fork/Join 工作。

很显然,在这个示例中是须要返回后果的,能够定义 SumAction 类继承自 RecursiveTask,代码如下:

/**
 * @author mghio
 * @since 2021-07-25
 */
public class SumTask extends RecursiveTask<Long> {

  private static final int SEQUENTIAL_THRESHOLD = 50;

  private final List<Long> data;

  public SumTask(List<Long> data) {this.data = data;}

  @Override
  protected Long compute() {if (data.size() <= SEQUENTIAL_THRESHOLD) {long sum = computeSumDirectly();
      System.out.format("Sum of %s: %d\n", data.toString(), sum);
      return sum;
    } else {int mid = data.size() / 2;
      SumTask firstSubtask = new SumTask(data.subList(0, mid));
      SumTask secondSubtask = new SumTask(data.subList(mid, data.size()));
      // 执行子工作
      firstSubtask.fork();
      secondSubtask.fork();
      // 期待子工作执行实现,并获取后果
      long firstSubTaskResult = firstSubtask.join();
      long secondSubTaskResult = secondSubtask.join();
      return firstSubTaskResult + secondSubTaskResult;
    }
  }

  private long computeSumDirectly() {
    long sum = 0;
    for (Long l : data) {sum += l;}
    return sum;
  }

  public static void main(String[] args) {Random random = new Random();

    List<Long> data = random
        .longs(1_000, 1, 100)
        .boxed()
        .collect(Collectors.toList());

    ForkJoinPool pool = new ForkJoinPool();
    SumTask task = new SumTask(data);
    pool.invoke(task);

    System.out.println("Sum:" + pool.invoke(task));
  }
}

这里当列表大小小于 SEQUENTIAL_THRESHOLD 变量的值(阈值)时视为小工作,间接计算求和列表元素后果,否则再次拆分为小工作,运行后果如下:

通过这个示例代码能够发现,Fork/Join 框架 中 ForkJoinTask 工作与平时的个别工作的次要不同点在于:ForkJoinTask 须要实现形象办法 compute() 来定义计算逻辑,在这个办法里个别通用的实现模板是,首先先判断当前任务是否是小工作,如果是,就执行执行工作,如果不是小工作,则再次拆分为两个子工作,而后当每个子工作调用 fork() 办法时,会再次进入到 compute() 办法中,查看当前任务是否须要再拆分为子工作,如果曾经是小工作,则执行当前任务并返回后果,否则持续宰割,最初调用 join() 办法期待所有子工作执行实现并取得执行后果。伪代码如下:

if (problem is small) {directly solve problem.} else {
  Step 1. split problem into independent parts.
  Step 2. fork new subtasks to solve each part.
  Step 3. join all subtasks.
  Step 4. compose result from subresults.
}

Fork/Join 框架设计

Fork/Join 框架核心思想是把一个大工作拆分成若干个小工作,而后汇总每个小工作的后果最终失去大工作的后果,如果让你设计一个这样的框架,你会如何实现呢?(倡议思考一下),Fork/Join 框架的整个流程正如其名所示,分为两个步骤:

  1. 大工作宰割 须要有这么一个的类,用来将大工作拆分为子工作,可能一次拆分后的子工作还是比拟大,须要屡次拆分,直到拆分进去的子工作合乎咱们定义的小工作才完结。
  2. 执行工作并合并工作后果 第一步拆分进去的子工作别离寄存在一个个 双端队列 外面(P.S. 这里为什么要应用双端队列请看下文),而后每个队列启动一个线程从队列中获取工作执行。这些子工作的执行后果都会放到一个对立的队列中,而后再启动一个线程从这个队列中拿数据,最初合并这些数据返回。

Fork/Join 框架应用了如下两个类来实现以上两个步骤:

  • ForkJoinTask 类 在上文的实例中也有提到,示意 ForkJoin 工作,在应用框架时首先必须先定义工作,通常只须要继承自 ForkJoinTask 类的子类 RecursiveAction(无返回后果) 或者 RecursiveTask(有返回后果) 即可。
  • ForkJoinPool 从名字也能够猜到一二了,就是用来执行 ForkJoinTask 的线程池。大工作拆分出的子工作会增加到以后线程的 双端队列 的头部。

喜爱思考的你,心中想必会想到这么一种场景,当咱们须要实现一个大工作时,会先把这个大工作拆分为多个独立的子工作,这些子工作会放到独立的队列中,并为每个队列都创立一个独自的线程去执行队列里的工作,即这里线程和队列时一对一的关系,那么当有的线程可能会先把本人队列的工作执行实现了,而有的线程则没有执行实现,这就导致一些先执行完工作的线程干等了,这是个好问题。

既然是做并发的,必定要最大水平压迫计算机的性能,对于这种场景并发巨匠 Doug Lea 应用了工作窃取算法解决,应用 工作窃取算法 后,先实现本人队列中工作的线程会去其它线程的队列中”窃取“一个工作来执行,哈哈,一方有难,八方支援。然而此时这个线程和队列的持有线程会同时拜访同一个队列,所以为了 缩小窃取工作的线程和被窃取工作的线程之间的竞争 ForkJoin 抉择了 双端队列 这种数据结构,这样就能够依照这种规定执行工作了:被窃取工作的线程始终从队列头部获取工作并执行,窃取工作的线程应用从队列尾部获取工作执行。这个算法在绝大部分状况下都能够充分利用多线程进行并行计算,然而在双端队列里只有一个工作等极其状况下还是会存在肯定水平的竞争。

Fork/Join 框架实现原理

Fork/Join 框架的实现外围是 ForkJoinPool 类,该类的重要组成部分为 ForkJoinTask 数组和 ForkJoinWorkerThread 数组,其中 ForkJoinTask 数组用来寄存框架使用者给提交给 ForkJoinPool 的工作,ForkJoinWorkerThread 数组则负责执行这些工作。工作有如下四种状态:

  • NORMAL 已实现
  • CANCELLED 被勾销
  • SIGNAL 信号
  • EXCEPTIONAL 产生异样

上面来看看这两个类的外围办法实现原理,首先来看 ForkJoinTaskfork() 办法,源码如下:

办法对于 ForkJoinWorkerThread 类型的线程,首先会调用 ForkJoinWorkerThreadworkQueuepush() 办法 异步的去执行这个工作,而后马上返回后果。持续跟进 ForkJoinPoolpush() 办法,源码如下:

办法将当前任务增加到 ForkJoinTask 工作队列数组中,而后再调用 ForkJoinPoolsignalWork 办法创立或者唤醒一个工作线程来执行该工作。而后再来看看 ForkJoinTaskjoin() 办法,办法源码如下:

办法首先调用了 doJoin() 办法,该办法返回当前任务的状态,依据返回的工作状态做不同的解决:

  1. 已实现状态则间接返回后果
  2. 被勾销状态则间接抛出异样(CancellationException
  3. 产生异样状态则间接抛出对应的异样

持续跟进 doJoin() 办法,办法源码如下:

办法首先判断当前任务状态是否曾经执行实现,如果执行实现则间接返回工作状态。如果没有执行实现,则从工作数组中(workQueue)取出工作并执行,工作执行实现则设置工作状态为 NORMAL,如果出现异常则记录异样并设置工作状态为 EXCEPTIONAL(在 doExec() 办法中)。

总结

本文次要介绍了 Java 并发框架中的 Fork/Join 框架的基本原理和其应用的 工作窃取算法(work-stealing)、设计形式和局部实现源码。Fork/Join 框架在 JDK 的官网规范库中也有利用。比方 JDK 1.8+ 规范库提供的 Arrays.parallelSort(array) 能够进行并行排序,它的原理就是外部通过 Fork/Join 框架对大数组分拆进行并行排序,能够进步排序的速度,还有汇合中的 Collection.parallelStream() 办法底层也是基于 Fork/Join 框架实现的,最初就是定义小工作的阈值往往是须要通过测试验证能力正当给出,并且保障程序能够达到最好的性能。

退出移动版