关于后端:一文看懂什么是forkjoin

36次阅读

共计 4809 个字符,预计需要花费 13 分钟才能阅读完成。

什么是 Fork/Join

Fork/Join 是 JUC 并发包下的一个并行处理框架,实现了 ExecutorService 接口的多线程处理器,它专为那些能够通过递归分解成更细小的工作而设计,最大化的利用多核处理器来进步应用程序的性能。

Fork/Join 的运行流程大抵如下所示:

须要留神的是,图里的次级子工作能够始终分上来,始终分到子工作足够小为止, 这里体现了分而治之(divide and conquer) 的算法思维。

工作窃取算法

工作窃取算法指的是在多线程执行不同工作队列的过程中,某个线程执行完本人队列的工作后从其余线程的工作队列里窃取工作来执行。

工作窃取流程如下图所示:

值得注意的是,当一个线程窃取另一个线程的时候,为了缩小两个工作线程之间的竞争,咱们通常应用双端队列来存储工作。被窃取的工作线程都从双端队列的头部拿工作执行,而窃取其余工作的线程从双端队列的尾部执行工作。

另外,当一个线程在窃取工作时要是没有其余可用的工作了,这个线程会进入阻塞状态以期待再次“工作”。

Fork/Join 实际

后面说 Fork/Join 框架简略来讲就是对工作的宰割与子工作的合并,所以要实现这个框架,先得有工作。在 Fork/Join 框架里提供了抽象类 ForkJoinTask 来实现工作。

ForkJoinTask

ForkJoinTask 是一个相似一般线程的实体,然而比一般线程轻量得多。

fork()办法: 应用线程池中的闲暇线程异步提交工作

public final ForkJoinTask<V> fork() {
    Thread t;
    // ForkJoinWorkerThread 是执行 ForkJoinTask 的专有线程,由 ForkJoinPool 治理
    // 先判断以后线程是否是 ForkJoin 专有线程,如果是,则将工作 push 到以后线程所负责的队列里去
    if ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread)
        ((ForkJoinWorkerThread)t).workQueue.push(this);
    else
        // 如果不是则将线程退出队列
        // 没有显式创立 ForkJoinPool 的时候走这里,提交工作到默认的 common 线程池中
        ForkJoinPool.common.externalPush(this);
    return this;
}

其实 fork()只做了一件事,那就是把工作推入当前工作线程的工作队列里。

join()办法:期待解决工作的线程处理完毕,取得返回值。

咱们看下 join()的源码:

public final V join() {
    int s;
    // doJoin()办法来获取当前任务的执行状态
    if ((s = doJoin() & DONE_MASK) != NORMAL)
        // 工作异样,抛出异样
        reportException(s);
    // 工作失常实现,获取返回值
    return getRawResult();}

/**
 * doJoin()办法用来返回当前任务的执行状态
 **/
private int doJoin() {
    int s; Thread t; ForkJoinWorkerThread wt; ForkJoinPool.WorkQueue w;
    // 先判断工作是否执行结束,执行结束间接返回后果(执行状态)return (s = status) < 0 ? s :
    // 如果没有执行结束,先判断是否是 ForkJoinWorkThread 线程
    ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) ?
        // 如果是,先判断工作是否处于工作队列顶端(意味着下一个就执行它)// tryUnpush()办法判断工作是否处于当前工作队列顶端,是返回 true
        // doExec()办法执行工作
        (w = (wt = (ForkJoinWorkerThread)t).workQueue).
        // 如果是处于顶端并且工作执行结束,返回后果
        tryUnpush(this) && (s = doExec()) < 0 ? s :
        // 如果不在顶端或者在顶端却没未执行结束,那就调用 awitJoin()执行工作
        // awaitJoin():应用自旋使工作执行实现,返回后果
        wt.pool.awaitJoin(w, this, 0L) :
    // 如果不是 ForkJoinWorkThread 线程,执行 externalAwaitDone()返回工作后果
    externalAwaitDone();}

咱们在之前介绍过说 Thread.join()会使线程阻塞,而 ForkJoinPool.join()会使线程免于阻塞,上面是 ForkJoinPool.join()的流程图:

RecursiveAction 和 RecursiveTask

通常状况下,在创立工作的时候咱们个别不间接继承 ForkJoinTask,而是继承它的子类 RecursiveAction 和 RecursiveTask。

两个都是 ForkJoinTask 的子类,RecursiveAction 能够看做是无返回值的 ForkJoinTask,RecursiveTask 是有返回值的 ForkJoinTask。

此外,两个子类都有执行次要计算的办法 compute(),当然,RecursiveAction 的 compute()返回 void,RecursiveTask 的 compute()有具体的返回值。

ForkJoinPool

ForkJoinPool 是用于执行 ForkJoinTask 工作的执行(线程)池。

ForkJoinPool 治理着执行池中的线程和工作队列,此外,执行池是否还接受任务,显示线程的运行状态也是在这里解决。

咱们来大抵看下 ForkJoinPool 的源码:

@sun.misc.Contended
public class ForkJoinPool extends AbstractExecutorService {
    // 工作队列
    volatile WorkQueue[] workQueues;   
    
    // 线程的运行状态
    volatile int runState;  
    
    // 创立 ForkJoinWorkerThread 的默认工厂,能够通过构造函数重写
    public static final ForkJoinWorkerThreadFactory defaultForkJoinWorkerThreadFactory;
    
    // 专用的线程池,其运行状态不受 shutdown()和 shutdownNow()的影响
    static final ForkJoinPool common;
    
    // 公有构造方法,没有任何安全检查和参数校验,由 makeCommonPool 间接调用
    // 其余构造方法都是源自于此办法
    // parallelism: 并行度,// 默认调用 java.lang.Runtime.availableProcessors() 办法返回可用处理器的数量
    private ForkJoinPool(int parallelism,
                         ForkJoinWorkerThreadFactory factory, // 工作线程工厂
                         UncaughtExceptionHandler handler, // 回绝工作的 handler
                         int mode, // 同步模式
                         String workerNamePrefix) { // 线程名 prefix
        this.workerNamePrefix = workerNamePrefix;
        this.factory = factory;
        this.ueh = handler;
        this.config = (parallelism & SMASK) | mode;
        long np = (long)(-parallelism); // offset ctl counts
        this.ctl = ((np << AC_SHIFT) & AC_MASK) | ((np << TC_SHIFT) & TC_MASK);
    }

}

WorkQueue
双端队列,ForkJoinTask 寄存在这里。

runState
ForkJoinPool 的运行状态。SHUTDOWN 状态用正数示意,其余用 2 的幂次示意。

当工作线程在解决本人的工作队列时,会从队列首取工作来执行(FIFO);如果是窃取其余队列的工作时,窃取的工作位于所属工作队列的队尾(LIFO)。

ForkJoinPool 与传统线程池最显著的区别就是它保护了一个工作队列数组(volatile WorkQueue[] workQueues,ForkJoinPool 中的每个工作线程都保护着一个工作队列)。

Fork/Join 的应用

下面咱们说 ForkJoinPool 负责管理线程和工作,ForkJoinTask 实现 fork 和 join 操作,所以要应用 Fork/Join 框架就离不开这两个类了,只是在理论开发中咱们罕用 ForkJoinTask 的子类 RecursiveTask 和 RecursiveAction 来代替 ForkJoinTask。

上面咱们用一个计算斐波那契数列第 n 项的例子来看一下 Fork/Join 的应用:

斐波那契数列数列是一个线性递推数列,从第三项开始,每一项的值都等于前两项之和:

1, 1, 2, 3, 5, 8, 13, 21, 34, 55, 89······

如果设 f(n)为该数列的第 n 项(n∈N*),那么有:f(n) = f(n-1) + f(n-2)。

public class FibonacciTest {

    class Fibonacci extends RecursiveTask<Integer> {

        int n;

        public Fibonacci(int n) {this.n = n;}

        // 次要的实现逻辑都在 compute()里
        @Override
        protected Integer compute() {
            // 这里先假如 n >= 0
            if (n <= 1) {return n;} else {// f(n-1)
                Fibonacci f1 = new Fibonacci(n - 1);
                f1.fork();
                // f(n-2)
                Fibonacci f2 = new Fibonacci(n - 2);
                f2.fork();
                // f(n) = f(n-1) + f(n-2)
                return f1.join() + f2.join();
            }
        }
    }

    @Test
    public void testFib() throws ExecutionException, InterruptedException {ForkJoinPool forkJoinPool = new ForkJoinPool();
        System.out.println("CPU 核数:" + Runtime.getRuntime().availableProcessors());
        long start = System.currentTimeMillis();
        Fibonacci fibonacci = new Fibonacci(40);
        Future<Integer> future = forkJoinPool.submit(fibonacci);
        System.out.println(future.get());
        long end = System.currentTimeMillis();
        System.out.println(String.format("耗时:%d millis", end - start));
    }
}

下面例子的输入:

  • CPU 核数:4
  • 计算结果:102334155
  • 耗时:9490 ms
  • 须要留神的是,上述计算工夫复杂度为 O(2^n),随着 n 的增长计算效率会越来越低,这也是下面的例子中 n 不敢取太大的起因。

总结

并不是所有的工作都适宜 Fork/Join 框架,比方下面的例子工作划分过于细小反而体现不出效率。因为 Fork/Join 是应用多个线程合作来计算的,所以会有线程通信和线程切换的开销。

如果要计算的工作比较简单,间接应用单线程会更快一些。但如果要计算的货色比较复杂,计算机又是多核的状况下,就能够充分利用多核 CPU 来进步计算速度。

本文由 mdnice 多平台公布

正文完
 0