乐趣区

关于SegmentFault:Java中JUC扩展组件之Forkjoin

Fork/join介绍

Fork/join框架是 java7 提供的并行执行工作的框架, 是把大工作宰割成若干小工作, 最初汇总若干小工作的执行后果失去最终的后果。它的思维与 MapReduce 相似。Fork把一个大工作宰割成若干小工作,Join用于合并小工作的后果, 最初失去大框架的后果。次要采取工作窃取算法。

工作窃取 (work-stealing) 算法是指某个线程从其它队列窃取工作执行。

如果咱们须要做一个比拟大的工作,咱们能够把这个工作宰割为若干互不依赖的子工作,为了缩小线程间的竞争,于是把这些子工作别离放到不同的队列里,并为每个队列创立一个独自的线程来执行队列里的工作,线程和队列一一对应,比方 A 线程负责解决 A 队列里的工作。然而有的线程会先把本人队列里的工作干完,而其余线程对应的队列里还有工作期待解决。干完活的线程与其等着,不如去帮其余线程干活,于是它就去其余线程的队列里窃取一个工作来执行。而在这时它们会拜访同一个队列,所以为了缩小窃取工作线程和被窃取工作线程之间的竞争,通常会应用双端队列,被窃取工作线程永远从双端队列的头部拿工作执行,而窃取工作的线程永远从双端队列的尾部拿工作执行。

工作窃取算法的长处是充分利用线程进行并行计算,并缩小了线程间的竞争,其毛病是在某些状况下还是存在竞争,比方双端队列里只有一个工作时。并且耗费了更多的系统资源,比方创立多个线程和多个双端队列。

对于 Fork/Join 框架而言,当一个工作正在期待它应用 Join 操作创立的子工作完结时,执行这个工作的工作线程,寻找其余并未被执行的工作,并开始执行,通过这种形式,线程充分利用它们的运行工夫,来进步应用程序的性能。为了实现这个指标,Fork/Join 框架执行的工作有一些局限性:

  • 工作只能应用 Fork、Join 操作来作为同步机制,如果应用了其余同步机制,那他们在同步操作时,工作线程则不能执行其余工作。如:在框架的操作中,使工作进入睡眠,那么在这个睡眠期间内,正在执行这个工作的工作线程,将不会执行其余工作
  • 所执行的工作,不应该执行 IO 操作,如读和写数据文件
  • 工作不能抛出查看型异样,必须通过必要的代码解决它们

外围是两个类:ForkJoinTaskForkJoinPool。Pool 次要负责实现,包含下面所介绍的工作窃取算法,管理工作线程和提供对于工作的状态以及它们的执行信息;Task 次要提供在工作中,执行 Fork 与 Join 操作的机制。

Fork/join代码演示

package com.rumenz.task;

import java.util.concurrent.ExecutionException;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinTask;
import java.util.concurrent.RecursiveTask;


public class ForkJoinExample extends RecursiveTask<Integer> {

    public final static int threshold=2;
    private int start;
    private int end;

    public ForkJoinExample(int start, int end) {
        this.start = start;
        this.end = end;
    }

    @Override
    protected Integer compute() {
        int sum=0;
        boolean b = (end - start) <= threshold;
        if(b){
            // 工作足够小的时候,间接计算,不进行决裂计算
            for (int i = start; i <=end ; i++) {sum+=i;}
        }else{int mid=(start+end)/2;
            // 持续决裂工作
            ForkJoinExample task1=new ForkJoinExample(start,mid);
            ForkJoinExample task2=new ForkJoinExample(mid+1,end);

            // 执行子工作
            task1.fork();
            task2.fork();

            // 期待工作执行完结合并其后果
            Integer m = task1.join();
            Integer n = task2.join();
            sum=m+n;

        }
        return sum;
    }

    public static void main(String[] args) throws ExecutionException, InterruptedException {
        // 生成一个池
        ForkJoinPool forkJoinPool=new ForkJoinPool();
        
        ForkJoinTask task=new ForkJoinExample(1, 100000);
        ForkJoinTask<Integer> submit = forkJoinPool.submit(task);
        Integer sum = submit.get();
        System.out.println("最初的后果是:"+sum);

    }
}

通过这个例子让咱们再来进一步理解 ForkJoinTask,工作类继承 RecursiveTask,ForkJoinTask 与个别的工作的次要区别在于它须要实现 compute()办法,在这个办法里,首先须要判断工作是否足够小,如果足够小就间接执行工作。如果不足够小,就必须宰割成两个子工作,每个子工作在调用 fork()办法时,又会进入 compute()办法,看看以后子工作是否须要持续宰割成孙工作,如果不须要持续宰割,则执行以后子工作并返回后果。应用 join()办法会期待子工作执行完并失去其后果。

关注微信公众号:【入门小站】, 解锁更多知识点

退出移动版