乐趣区

关于程序员:Java并发工具ForkJoin原理

咱们始终讲,并发编程能够分为三个层面的问题,别离是分工、合作和互斥,当你关注于工作的时候,你会发现你的视角曾经从并发编程的细节中跳进去了,你利用的更多的是事实世界的思维模式,类比的往往是事实世界里的分工,所以我把线程池、Future、CompletableFuture 和 CompletionService 都列到了分工外面。

上面我用事实世界里的工作流程图形容了并发编程畛域的简略并行任务、聚合工作和批量并行任务,辅以这些流程图,置信你肯定能将你的思维模式转换到事实世界里来。

从上到下,顺次为简略并行任务、聚合工作和批量并行任务示意图

下面提到的简略并行、聚合、批量并行这三种任务模型,基本上可能笼罩日常工作中的并发场景了,但还是不够全面,因为还有一种“分治”的任务模型没有笼罩到。分治 ,顾名思义,即分而治之,是一种解决简单问题的思维办法和模式;具体来讲,指的是 把一个简单的问题分解成多个类似的子问题,而后再把子问题分解成更小的子问题,直到子问题简略到能够间接求解。实践上来讲,解决每一个问题都对应着一个工作,所以对于问题的分治,实际上就是对于工作的分治。

分治思维在很多畛域都有宽泛的利用,例如算法畛域有分治算法(归并排序、疾速排序都属于分治算法,二分法查找也是一种分治算法);大数据畛域出名的计算框架 MapReduce 背地的思维也是分治。既然分治这种任务模型如此广泛,那 Java 显然也须要反对,Java 并发包里提供了一种叫做 Fork/Join 的并行计算框架,就是用来反对分治这种任务模型的。

分治任务模型

这里你须要先深刻理解一下分治任务模型,分治任务模型可分为两个阶段:一个阶段是 工作合成 ,也就是将工作迭代地合成为子工作,直至子工作能够间接计算出后果;另一个阶段是 后果合并,即逐层合并子工作的执行后果,直至取得最终后果。下图是一个简化的分治任务模型图,你能够对照着了解。

简版分治任务模型图

在这个分治任务模型里,工作和合成后的子工作具备相似性,这种相似性往往体现在工作和子工作的算法是雷同的,然而计算的数据规模是不同的。具备这种相似性的问题,咱们往往都采纳递归算法。

Fork/Join 的应用

Fork/Join 是一个并行计算的框架,次要就是用来反对分治任务模型的,这个计算框架里的 Fork 对应的是分治任务模型里的工作合成,Join 对应的是后果合并 。Fork/Join 计算框架次要蕴含两局部,一部分是 分治工作的线程池 ForkJoinPool,另一部分是 分治工作 ForkJoinTask。这两局部的关系相似于 ThreadPoolExecutor 和 Runnable 的关系,都能够了解为提交工作到线程池,只不过分治工作有本人独特类型 ForkJoinTask。

ForkJoinTask 是一个抽象类,它的办法有很多,最外围的是 fork()办法和 join()办法,其中 fork()办法会异步地执行一个子工作,而 join()办法则会阻塞以后线程来期待子工作的执行后果。ForkJoinTask 有两个子类——RecursiveAction 和 RecursiveTask,通过名字你就应该能晓得,它们都是用递归的形式来解决分治工作的。这两个子类都定义了形象办法 compute(),不过区别是 RecursiveAction 定义的 compute()没有返回值,而 RecursiveTask 定义的 compute()办法是有返回值的。这两个子类也是抽象类,在应用的时候,须要你定义子类去扩大。

接下来咱们就来实现一下,看看如何用 Fork/Join 这个并行计算框架计算斐波那契数列(上面的代码源自 Java 官网示例)。首先咱们须要创立一个分治工作线程池以及计算斐波那契数列的分治工作,之后通过调用分治工作线程池的 invoke() 办法来启动分治工作。因为计算斐波那契数列须要有返回值,所以 Fibonacci 继承自 RecursiveTask。分治工作 Fibonacci 须要实现 compute()办法,这个办法外面的逻辑和一般计算斐波那契数列十分相似,区别之处在于计算 Fibonacci(n - 1) 应用了异步子工作,这是通过 f1.fork() 这条语句实现的。

static void main(String[] args){
  // 创立分治工作线程池
  ForkJoinPool fjp =
    new ForkJoinPool(4);
  // 创立分治工作
  Fibonacci fib =
    new Fibonacci(30);
  // 启动分治工作
  Integer result =
    fjp.invoke(fib);
  // 输入后果
  System.out.println(result);
}
// 递归工作
static class Fibonacci extends
    RecursiveTask<Integer>{
  final int n;
  Fibonacci(int n){this.n = n;}
  protected Integer compute(){if (n <= 1)
      return n;
    Fibonacci f1 =
      new Fibonacci(n - 1);
    // 创立子工作
    f1.fork();
    Fibonacci f2 =
      new Fibonacci(n - 2);
    // 期待子工作后果,并合并后果
    return f2.compute() + f1.join();
  }
}

ForkJoinPool 工作原理

Fork/Join 并行计算的外围组件是 ForkJoinPool,所以上面咱们就来简略介绍一下 ForkJoinPool 的工作原理。

ThreadPoolExecutor 实质上是一个生产者 - 消费者模式的实现,外部有一个工作队列,这个工作队列是生产者和消费者通信的媒介;ThreadPoolExecutor 能够有多个工作线程,然而这些工作线程都共享一个工作队列。

ForkJoinPool 实质上也是一个生产者 - 消费者的实现,然而更加智能,你能够参考上面的 ForkJoinPool 工作原理图来了解其原理。ThreadPoolExecutor 外部只有一个工作队列,而 ForkJoinPool 外部有多个工作队列,当咱们通过 ForkJoinPool 的 invoke()或者 submit()办法提交工作时,ForkJoinPool 依据肯定的路由规定把工作提交到一个工作队列中,如果工作在执行过程中会创立出子工作,那么子工作会提交到工作线程对应的工作队列中。

如果工作线程对应的工作队列空了,是不是就没活儿干了呢?不是的,ForkJoinPool 反对一种叫做“工作窃取”的机制,如果工作线程闲暇了,那它能够“窃取”其余工作工作队列里的工作,例如下图中,线程 T2 对应的工作队列曾经空了,它能够“窃取”线程 T1 对应的工作队列的工作。如此一来,所有的工作线程都不会闲下来了。

ForkJoinPool 中的工作队列采纳的是双端队列,工作线程失常获取工作和“窃取工作”别离是从工作队列不同的端生产,这样能防止很多不必要的数据竞争。咱们这里介绍的仅仅是简化后的原理,ForkJoinPool 的实现远比咱们这里介绍的简单,如果你感兴趣,倡议去看它的源码。

ForkJoinPool 工作原理图

模仿 MapReduce 统计单词数量

学习 MapReduce 有一个入门程序,统计一个文件外面每个单词的数量,上面咱们来看看如何用 Fork/Join 并行计算框架来实现。

咱们能够先用二分法递归地将一个文件拆分成更小的文件,直到文件里只有一行数据,而后统计这一行数据里单词的数量,最初再逐级汇总后果,你能够对照后面的简版分治任务模型图来了解这个过程。

思路有了,咱们马上来实现。上面的示例程序用一个字符串数组 String[] fc 来模仿文件内容,fc 外面的元素与文件外面的行数据一一对应。要害的代码在 compute() 这个办法外面,这是一个递归办法,前半部分数据 fork 一个递归工作去解决(要害代码 mr1.fork()),后半局部数据则在当前任务中递归解决(mr2.compute())。

static void main(String[] args){String[] fc = {"hello world",
          "hello me",
          "hello fork",
          "hello join",
          "fork join in world"};
  // 创立 ForkJoin 线程池
  ForkJoinPool fjp =
      new ForkJoinPool(3);
  // 创立工作
  MR mr = new MR(fc, 0, fc.length);
  // 启动工作
  Map<String, Long> result =
      fjp.invoke(mr);
  // 输入后果
  result.forEach((k, v)->
    System.out.println(k+":"+v));
}
//MR 模仿类
static class MR extends
  RecursiveTask<Map<String, Long>> {private String[] fc;
  private int start, end;
  // 构造函数
  MR(String[] fc, int fr, int to){
    this.fc = fc;
    this.start = fr;
    this.end = to;
  }
  @Override protected
  Map<String, Long> compute(){if (end - start == 1) {return calc(fc[start]);
    } else {int mid = (start+end)/2;
      MR mr1 = new MR(fc, start, mid);
      mr1.fork();
      MR mr2 = new MR(fc, mid, end);
      // 计算子工作,并返回合并的后果
      return merge(mr2.compute(),
          mr1.join());
    }
  }
  // 合并后果
  private Map<String, Long> merge(
      Map<String, Long> r1,
      Map<String, Long> r2) {
    Map<String, Long> result =
        new HashMap<>();
    result.putAll(r1);
    // 合并后果
    r2.forEach((k, v) -> {Long c = result.get(k);
      if (c != null)
        result.put(k, c+v);
      else
        result.put(k, v);
    });
    return result;
  }
  // 统计单词数量
  private Map<String, Long>
      calc(String line) {
    Map<String, Long> result =
        new HashMap<>();
    // 宰割单词
    String [] words =
        line.split("\\s+");
    // 统计单词数量
    for (String w : words) {Long v = result.get(w);
      if (v != null)
        result.put(w, v+1);
      else
        result.put(w, 1L);
    }
    return result;
  }
}

总结

Fork/Join 并行计算框架次要解决的是分治工作。分治的核心思想是“分而治之”:将一个大的工作拆分成小的子工作去解决,而后再把子工作的后果聚合起来从而失去最终后果。这个过程十分相似于大数据处理中的 MapReduce,所以你能够把 Fork/Join 看作单机版的 MapReduce。

Fork/Join 并行计算框架的外围组件是 ForkJoinPool。ForkJoinPool 反对工作窃取机制,可能让所有线程的工作量根本平衡,不会呈现有的线程很忙,而有的线程很闲的情况,所以性能很好。Java 1.8 提供的 Stream API 外面并行流也是以 ForkJoinPool 为根底的。不过须要你留神的是,默认状况下所有的并行流计算都共享一个 ForkJoinPool,这个共享的 ForkJoinPool 默认的线程数是 CPU 的核数;如果所有的并行流计算都是 CPU 密集型计算的话,齐全没有问题,然而如果存在 I / O 密集型的并行流计算,那么很可能会因为一个很慢的 I / O 计算而拖慢整个零碎的性能。所以 倡议用不同的 ForkJoinPool 执行不同类型的计算工作

本文由 mdnice 多平台公布

退出移动版