关于后端:一文看懂什么是forkjoin

什么是Fork/Join

Fork/Join 是JUC并发包下的一个并行处理框架,实现了ExecutorService接口的多线程处理器,它专为那些能够通过递归分解成更细小的工作而设计,最大化的利用多核处理器来进步应用程序的性能。

Fork/Join的运行流程大抵如下所示:

须要留神的是,图里的次级子工作能够始终分上来,始终分到子工作足够小为止,这里体现了分而治之(divide and conquer) 的算法思维。

工作窃取算法

工作窃取算法指的是在多线程执行不同工作队列的过程中,某个线程执行完本人队列的工作后从其余线程的工作队列里窃取工作来执行。

工作窃取流程如下图所示:

值得注意的是,当一个线程窃取另一个线程的时候,为了缩小两个工作线程之间的竞争,咱们通常应用双端队列来存储工作。被窃取的工作线程都从双端队列的头部拿工作执行,而窃取其余工作的线程从双端队列的尾部执行工作。

另外,当一个线程在窃取工作时要是没有其余可用的工作了,这个线程会进入阻塞状态以期待再次“工作”。

Fork/Join 实际

后面说Fork/Join框架简略来讲就是对工作的宰割与子工作的合并,所以要实现这个框架,先得有工作。在Fork/Join框架里提供了抽象类ForkJoinTask来实现工作。

ForkJoinTask

ForkJoinTask是一个相似一般线程的实体,然而比一般线程轻量得多。

fork()办法:应用线程池中的闲暇线程异步提交工作

public final ForkJoinTask<V> fork() {
    Thread t;
    // ForkJoinWorkerThread是执行ForkJoinTask的专有线程,由ForkJoinPool治理
    // 先判断以后线程是否是ForkJoin专有线程,如果是,则将工作push到以后线程所负责的队列里去
    if ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread)
        ((ForkJoinWorkerThread)t).workQueue.push(this);
    else
        // 如果不是则将线程退出队列
        // 没有显式创立ForkJoinPool的时候走这里,提交工作到默认的common线程池中
        ForkJoinPool.common.externalPush(this);
    return this;
}

其实fork()只做了一件事,那就是把工作推入当前工作线程的工作队列里。

join()办法:期待解决工作的线程处理完毕,取得返回值。

咱们看下join()的源码:

public final V join() {
    int s;
    // doJoin()办法来获取当前任务的执行状态
    if ((s = doJoin() & DONE_MASK) != NORMAL)
        // 工作异样,抛出异样
        reportException(s);
    // 工作失常实现,获取返回值
    return getRawResult();
}

/**
 * doJoin()办法用来返回当前任务的执行状态
 **/
private int doJoin() {
    int s; Thread t; ForkJoinWorkerThread wt; ForkJoinPool.WorkQueue w;
    // 先判断工作是否执行结束,执行结束间接返回后果(执行状态)
    return (s = status) < 0 ? s :
    // 如果没有执行结束,先判断是否是ForkJoinWorkThread线程
    ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) ?
        // 如果是,先判断工作是否处于工作队列顶端(意味着下一个就执行它)
        // tryUnpush()办法判断工作是否处于当前工作队列顶端,是返回true
        // doExec()办法执行工作
        (w = (wt = (ForkJoinWorkerThread)t).workQueue).
        // 如果是处于顶端并且工作执行结束,返回后果
        tryUnpush(this) && (s = doExec()) < 0 ? s :
        // 如果不在顶端或者在顶端却没未执行结束,那就调用awitJoin()执行工作
        // awaitJoin():应用自旋使工作执行实现,返回后果
        wt.pool.awaitJoin(w, this, 0L) :
    // 如果不是ForkJoinWorkThread线程,执行externalAwaitDone()返回工作后果
    externalAwaitDone();
}

咱们在之前介绍过说Thread.join()会使线程阻塞,而ForkJoinPool.join()会使线程免于阻塞,上面是ForkJoinPool.join()的流程图:

RecursiveAction和RecursiveTask

通常状况下,在创立工作的时候咱们个别不间接继承ForkJoinTask,而是继承它的子类RecursiveAction和RecursiveTask。

两个都是ForkJoinTask的子类,RecursiveAction能够看做是无返回值的ForkJoinTask,RecursiveTask是有返回值的ForkJoinTask。

此外,两个子类都有执行次要计算的办法compute(),当然,RecursiveAction的compute()返回void,RecursiveTask的compute()有具体的返回值。

ForkJoinPool

ForkJoinPool是用于执行ForkJoinTask工作的执行(线程)池。

ForkJoinPool治理着执行池中的线程和工作队列,此外,执行池是否还接受任务,显示线程的运行状态也是在这里解决。

咱们来大抵看下ForkJoinPool的源码:

@sun.misc.Contended
public class ForkJoinPool extends AbstractExecutorService {
    // 工作队列
    volatile WorkQueue[] workQueues;   
    
    // 线程的运行状态
    volatile int runState;  
    
    // 创立ForkJoinWorkerThread的默认工厂,能够通过构造函数重写
    public static final ForkJoinWorkerThreadFactory defaultForkJoinWorkerThreadFactory;
    
    // 专用的线程池,其运行状态不受shutdown()和shutdownNow()的影响
    static final ForkJoinPool common;
    
    // 公有构造方法,没有任何安全检查和参数校验,由makeCommonPool间接调用
    // 其余构造方法都是源自于此办法
    // parallelism: 并行度,
    // 默认调用java.lang.Runtime.availableProcessors() 办法返回可用处理器的数量
    private ForkJoinPool(int parallelism,
                         ForkJoinWorkerThreadFactory factory, // 工作线程工厂
                         UncaughtExceptionHandler handler, // 回绝工作的handler
                         int mode, // 同步模式
                         String workerNamePrefix) { // 线程名prefix
        this.workerNamePrefix = workerNamePrefix;
        this.factory = factory;
        this.ueh = handler;
        this.config = (parallelism & SMASK) | mode;
        long np = (long)(-parallelism); // offset ctl counts
        this.ctl = ((np << AC_SHIFT) & AC_MASK) | ((np << TC_SHIFT) & TC_MASK);
    }

}

WorkQueue
双端队列,ForkJoinTask寄存在这里。

runState
ForkJoinPool的运行状态。SHUTDOWN状态用正数示意,其余用2的幂次示意。

当工作线程在解决本人的工作队列时,会从队列首取工作来执行(FIFO);如果是窃取其余队列的工作时,窃取的工作位于所属工作队列的队尾(LIFO)。

ForkJoinPool与传统线程池最显著的区别就是它保护了一个工作队列数组(volatile WorkQueue[] workQueues,ForkJoinPool中的每个工作线程都保护着一个工作队列)。

Fork/Join的应用

下面咱们说ForkJoinPool负责管理线程和工作,ForkJoinTask实现fork和join操作,所以要应用Fork/Join框架就离不开这两个类了,只是在理论开发中咱们罕用ForkJoinTask的子类RecursiveTask 和RecursiveAction来代替ForkJoinTask。

上面咱们用一个计算斐波那契数列第n项的例子来看一下Fork/Join的应用:

斐波那契数列数列是一个线性递推数列,从第三项开始,每一项的值都等于前两项之和:

1, 1, 2, 3, 5, 8, 13, 21, 34, 55, 89······

如果设f(n)为该数列的第n项(n∈N*),那么有:f(n) = f(n-1) + f(n-2)。

public class FibonacciTest {

    class Fibonacci extends RecursiveTask<Integer> {

        int n;

        public Fibonacci(int n) {
            this.n = n;
        }

        // 次要的实现逻辑都在compute()里
        @Override
        protected Integer compute() {
            // 这里先假如 n >= 0
            if (n <= 1) {
                return n;
            } else {
                // f(n-1)
                Fibonacci f1 = new Fibonacci(n - 1);
                f1.fork();
                // f(n-2)
                Fibonacci f2 = new Fibonacci(n - 2);
                f2.fork();
                // f(n) = f(n-1) + f(n-2)
                return f1.join() + f2.join();
            }
        }
    }

    @Test
    public void testFib() throws ExecutionException, InterruptedException {
        ForkJoinPool forkJoinPool = new ForkJoinPool();
        System.out.println("CPU核数:" + Runtime.getRuntime().availableProcessors());
        long start = System.currentTimeMillis();
        Fibonacci fibonacci = new Fibonacci(40);
        Future<Integer> future = forkJoinPool.submit(fibonacci);
        System.out.println(future.get());
        long end = System.currentTimeMillis();
        System.out.println(String.format("耗时:%d millis", end - start));
    }
}

下面例子的输入:

  • CPU核数:4
  • 计算结果:102334155
  • 耗时:9490 ms
  • 须要留神的是,上述计算工夫复杂度为O(2^n),随着n的增长计算效率会越来越低,这也是下面的例子中n不敢取太大的起因。

总结

并不是所有的工作都适宜Fork/Join框架,比方下面的例子工作划分过于细小反而体现不出效率。因为Fork/Join是应用多个线程合作来计算的,所以会有线程通信和线程切换的开销。

如果要计算的工作比较简单,间接应用单线程会更快一些。但如果要计算的货色比较复杂,计算机又是多核的状况下,就能够充分利用多核CPU来进步计算速度。

本文由mdnice多平台公布

评论

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注

这个站点使用 Akismet 来减少垃圾评论。了解你的评论数据如何被处理