乐趣区

ForkJoinPool实现原理A-Java-ForkJoin-Framework

1. 概述

是一个可以并行执行任务的线程池。可以处理一个可递归划分的任务并获取结果(分而治之的思想,父任务等待子任务执行完成并组装结果)。因为是多线程去执行任务,可以充分利用多核,提高 cpu 的利用率。那么他如何做构建管理任务队列,多线程如何去处理任务, 以及他的应用场景和性能瓶颈是什么?通过下面原理以及源码我们来进一步了解。

2.Fork/Join 介绍

为分治算法的并行实现。

Result solve(Problem problem) {if (problem is small)
        directly solve problem
  else {
        split problem into independent parts
        fork new subtasks to solve each part
        join all subtasks
        compose result from subresults
        }
}

fork 操作会启动一个新的任务。而 join 则是等待该任务阻塞完成。

如果问题较大。我们只需要将问题分解。fork 出去,然后通过 join 等待结果,最后计算结果返回。这里有个问题就是大部分任务做的事情就是分解任务,然后等待子任务执行。

3. 数据结构

1. 会创建一个 Worker 线程池,每个线程用来处理队列中的任务。

2. 所有的任务都是一个轻量级可执行的类(ForkJoinTask)。任务的方法就在工作线程运行的时候被调用处理。

工作窃取

对于 CPU,重要的可能就是进程和线程的调度。但是对于上层,考虑的应该是如何充分利用线程资源。这就需要进行合理的任务(工作量)调度。
对于工作窃取调度策略实现如下:

  • 每个线程都有自己的任务队列。
  • 队列是一个双端队列,可以 前后取
  • Fork 的子任务只会被丢进所在线程的队列
  • 工作线程通过 LIFO 获取任务
  • 任务队列没有任务,会尝试随机窃取一个队列的任务执行。FIFO
  • 工作线程调用 join 的时候,线程并不会阻塞,而是会去处理其他队列的任务。直到当前任务执行完成,返回结果
  • 如果工作线程没有任务处理(自己队列无任务,并且没有窃取到任务),会让其让出资源。阻塞线程。等待后续激活。对于工作线程会有一个顶层线程去激活它。

工作线程获取自己队列任务和窃取别人任务的方式不同。这能减少竞争。
fork 采用 LIFO,这保证了队列头部任务都会是更大的任务,尾部是分解出来的子任务。窃取采用 FIFO,窃取更大的任务有助于本次窃取的性价比很高。

如上所述,ForkJoinPool 有一个 WorkQueue 数组。每个数组的元素是一个双端队列(Dqueue)。WorkQueue 主要用来存储需要执行的任务。

需要注意的一点是,线程对应的 WorkQueue 在数组的下标都是奇数。那么大家会问,偶数索引对应的 WorkQueue 是什么。其实是用来存储直接向 ForkJoinPool 提交的(外部提交)任务。

双端队列

需要考虑的是队列的并发控制。
双端队列采用数组的形式,维护了一个 top 和 base 的指针。push、pop、take 操作都会通过移动指针去维护。对于 push 和 pop 都是同一个线程去执行。所以不会有并发问题。take 窃取过程可能会有多线程操作。所以需要加锁。

那么核心问题就是解决队列元素不足时 pop、push 和 take 的冲突。
通过 volatile 的 top 和 base 指针。我们可以计算出队列元素数量。显然当数量大于 1 的时候。不冲突。

对于 pop 操作会先递减 top:if(–top) >= base) …

对于 take 操作先递增 base:if(++base < top) …

我们只需要比较这两个索引来检测是否会导致队列为空。对于 pop,如果可能导致为空,加锁再判断。然后根据结果进行对应逻辑。如果是 take。直接回退。不需要二次判断。

4. 跟踪源码

通过上面介绍,我们了解线程如何获取任务。要进一步了解我们就需要深入到源代码中。首先我们用一个测试程序来进行分析。

public class ForkJoinCalculator {
    private ForkJoinPool pool;
    public ForkJoinCalculator() {
        // 也可以使用公用的 ForkJoinPool:// pool = ForkJoinPool.commonPool()
        pool = new ForkJoinPool();}
    public long sumUp(long[] numbers) {return pool.invoke(new SumTask(numbers, 0, numbers.length - 1));
    }
    private static class SumTask extends RecursiveTask<Long> {private long[] numbers;
        private int from;
        private int to;
        public SumTask(long[] numbers, int from, int to) {
            this.numbers = numbers;
            this.from = from;
            this.to = to;
        }
        @Override
        protected Long compute() {
            // 当需要计算的数字小于 6 时,直接计算结果
            if (to - from < 6) {
                long total = 0;
                for (int i = from; i <= to; i++) {total += numbers[i];
                }
                return total;
                // 否则,把任务一分为二,递归计算
            } else {int middle = (from + to) / 2;
                SumTask taskLeft = new SumTask(numbers, from, middle);
                SumTask taskRight = new SumTask(numbers, middle + 1, to);
                taskLeft.fork();
                taskRight.fork();
                return taskLeft.join() + taskRight.join();
            }
        }
    }
    public static void main(String[] args) {long[] numbers = LongStream.rangeClosed(1, 1000).toArray();
        ForkJoinCalculator calculator = new ForkJoinCalculator();
        System.out.println(calculator.sumUp(numbers)); // 打印结果 500500
    }
}

SumTask 的继承关系


ForkJoinTask 就是队列的任务。扩展了 Future。有两个抽象的实现。
RecuriveTask 是带返回值的。RecuriveAction 是无返回值的。我们根据需求实现即可。

compute 方法就是任务执行时回调用的方法。一般在这里我们会根据逻辑对任务进行分解和计算。

ForkJoinPool#invoke

public <T> T invoke(ForkJoinTask<T> task) {if (task == null)
        throw new NullPointerException();
    externalPush(task);
    return task.join();}

首先会调用 externalPush 方法,将任务丢进队列。
然后调用 join 方法。获取返回结果。

externalPush 方法

final void externalPush(ForkJoinTask<?> task) {WorkQueue[] ws; WorkQueue q; int m;
    int r = ThreadLocalRandom.getProbe();
    int rs = runState;
    if ((ws = workQueues) != null && (m = (ws.length - 1)) >= 0 &&
        (q = ws[m & r & SQMASK]) != null && r != 0 && rs > 0 &&
        U.compareAndSwapInt(q, QLOCK, 0, 1)) {ForkJoinTask<?>[] a; int am, n, s;
        if ((a = q.array) != null &&
            (am = a.length - 1) > (n = (s = q.top) - q.base)) {int j = ((am & s) << ASHIFT) + ABASE;
            U.putOrderedObject(a, j, task);
            U.putOrderedInt(q, QTOP, s + 1);
            U.putIntVolatile(q, QLOCK, 0);
            if (n <= 1)
                signalWork(ws, q);
            return;
        }
        U.compareAndSwapInt(q, QLOCK, 1, 0);
    }
    externalSubmit(task);
}

这个方法主要逻辑就是将任务加入队列。如果队列数组为空或者获取到的队列当前不可用。会调用 externalSubmit 方法。否则获取到该队列。加锁。将任务丢到队列的头部(top),修改 top。

externalSubmit 则是 externalPush 的完整版。对于各种特殊情况。比如第一次初始化。

signalWork 方法

final void signalWork(WorkQueue[] ws, WorkQueue q) {
    long c; int sp, i; WorkQueue v; Thread p;
    while ((c = ctl) < 0L) {                       // too few active
        if ((sp = (int)c) == 0) {                  // no idle workers
            if ((c & ADD_WORKER) != 0L)            // too few workers
                tryAddWorker(c);
            break;
        }
        if (ws == null)                            // unstarted/terminated
            break;
        if (ws.length <= (i = sp & SMASK))         // terminated
            break;
        if ((v = ws[i]) == null)                   // terminating
            break;
        int vs = (sp + SS_SEQ) & ~INACTIVE;        // next scanState
        int d = sp - v.scanState;                  // screen CAS
        long nc = (UC_MASK & (c + AC_UNIT)) | (SP_MASK & v.stackPred);
        if (d == 0 && U.compareAndSwapLong(this, CTL, c, nc)) {
            v.scanState = vs;                      // activate v
            if ((p = v.parker) != null)
                U.unpark(p);
            break;
        }
        if (q != null && q.base == q.top)          // no more work
            break;
    }
}

这个方法主要用来创建和唤醒 worker

这里遇到了 ctl 我们说明一下,他将一个 64 位分为了四段。每段 16 位。由高到低分别为 AC,TC,SS,ID


图片引用自 https://blog.csdn.net/u010841…

AC 为负说明,活跃的 worker 没有到达预期数量,需要激活或者创建。

1. 没有空闲的 worker 并且 worker 太少的话会创建,具体是调用 tryAddWorker 方法。

2. 如果有空闲 worker,获取到空闲 worker,解除 worker 阻塞。并且更新 sp。

通过 SS 我们可以进行判断是否存在空闲的 worker。ID 用来定位该 worker 对应队列的下标。
创建和注册 worker 逻辑相对容易就不多做解释。

线程执行任务

final void runWorker(WorkQueue w) {w.growArray();                   // allocate queue
    int seed = w.hint;               // initially holds randomization hint
    int r = (seed == 0) ? 1 : seed;  // avoid 0 for xorShift
    for (ForkJoinTask<?> t;;) {if ((t = scan(w, r)) != null)
            w.runTask(t);
        else if (!awaitWork(w, r))
            break;
        r ^= r << 13; r ^= r >>> 17; r ^= r << 5; // xorshift
    }
}

首先调用 scan 去窃取一个任务。

Scan 逻辑如下:

  • 随机获取一个队列,尝试窃取
  • 如果窃取到任务,并且当前任务所在队列为 active,出队返回,如果是 inactive,尝试激活它
  • 如果没有继续循环遍历队列。
  • 直到遍历完所有的队列。还没有窃取到任务的话,修改状态为未激活,并将当前 workerqueue 挂在栈顶。等待去激活它。

什么时候回激活工作线程:

  • 提交任务的时候会有 singlework 去尝试激活。
  • 窃取任务的时候,发现该队列有任务,但是该队列未激活。则去激活它。

runTask 方法

窃取到任务之后会调用该方法。通过调用 doExec 去执行。

final void runTask(ForkJoinTask<?> task) {if (task != null) {
        scanState &= ~SCANNING; // mark as busy
        (currentSteal = task).doExec();
        U.putOrderedObject(this, QCURRENTSTEAL, null); // release for GC
        execLocalTasks();
        ForkJoinWorkerThread thread = owner;
        if (++nsteals < 0)      // collect on overflow
            transferStealCount(pool);
        scanState |= SCANNING;
        if (thread != null)
            thread.afterTopLevelExec();}
}

执行完窃取任务之后,会执行本地队列的任务。彻底执行完成后会继续上述操作。

如果没有窃取到任务,并且本地队列也没有待执行的任务。那么会调用 awaitWork。

awaitWork 方法

这个方法其实我没有细看,但是这个方法的逻辑对于性能影响比较大。
按道理来讲如果,没有任务。还让该线程自旋的话,对 cpu 会有一些浪费,甚至在任务不足的时候窃取会导致任务的竞争。

所以我们需要根据线程活跃已经数量状况。选择性的终止 worker 或者阻塞 worker。

如果当前线程数达到数量,并且活跃超过两个,并且当前在栈顶,释放当前线程。

fork 方法

public final ForkJoinTask<V> fork() {
    Thread t;
    if ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread)
        ((ForkJoinWorkerThread)t).workQueue.push(this);
    else
        ForkJoinPool.common.externalPush(this);
    return this;
}

如果当前线程为工作线程,直接将任务丢进队列
否则丢到公共线程池的队列。

join 方法

主要是调用 dojoin 方法。

如果当前线程为工作线程,并且该任务在当前线程 top 位置,则执行该任务。如果执行完成。则返回。

如果未完成,调用 awaitJoin 方法。通过任务的状态 status 判断任务的执行状态。
如果非工作线程。那么阻塞该线程等待任务完成。

awaitJoin 方法

这个方法逻辑相对复杂一点。

final int awaitJoin(WorkQueue w, ForkJoinTask<?> task, long deadline) {
    int s = 0;
    if (task != null && w != null) {
        ForkJoinTask<?> prevJoin = w.currentJoin;
        U.putOrderedObject(w, QCURRENTJOIN, task);
        CountedCompleter<?> cc = (task instanceof CountedCompleter) ?
            (CountedCompleter<?>)task : null;
        for (;;) {if ((s = task.status) < 0)
                break;
            if (cc != null)
                helpComplete(w, cc, 0);
            else if (w.base == w.top || w.tryRemoveAndExec(task))
                helpStealer(w, task);
            if ((s = task.status) < 0)
                break;
            long ms, ns;
            if (deadline == 0L)
                ms = 0L;
            else if ((ns = deadline - System.nanoTime()) <= 0L)
                break;
            else if ((ms = TimeUnit.NANOSECONDS.toMillis(ns)) <= 0L)
                ms = 1L;
            if (tryCompensate(w)) {task.internalWait(ms);
                U.getAndAddLong(this, CTL, AC_UNIT);
            }
        }
        U.putOrderedObject(w, QCURRENTJOIN, prevJoin);
    }
    return s;
}

记录当前任务为 currentJoin。

1. 如果为 CountedCompleter,则调用 helpComplete。

2. 否则通过 w.tryRemoveAndExec。这个方法的作用:如果当前任务在当前队列。执行并移除它。

3. 如果当前队列任务为空或者调用 join 的任务没有在当前队列。调用 helpStealer 方法。

4. 另一种策略为阻塞当前线程,让新的线程去执行该任务

helpStealer 方法

1. 找到偷窃当前任务的队列。其中 v 就是找到的队列。

if ((v = ws[i = (h + k) & m]) != null) {if (v.currentSteal == subtask) {
        j.hint = i;
        break;
    }
    checkSum += v.base;
}

2. 如果当偷窃当前任务的队列存在任务,那么循环从窃取队列 base 获取任务帮助其执行。

执行过程中,如果 join 的任务执行完,会返回。否则一直处理。如果当前队列有新任务产生,也会直接返回

if (v.base == b) {if (t == null)             // stale
        break descent;
    if (U.compareAndSwapObject(a, i, t, null)) {
        v.base = b + 1;
        ForkJoinTask<?> ps = w.currentSteal;
        int top = w.top;
        do {U.putOrderedObject(w, QCURRENTSTEAL, t);
            t.doExec();        // clear local tasks too} while (task.status >= 0 &&
                 w.top != top &&
                 (t = w.pop()) != null);
        U.putOrderedObject(w, QCURRENTSTEAL, ps);
        if (w.base != w.top)
            return;            // can't further help
    }
}

3. 如果窃取队列没有任务要执行。那么他会根据窃取队列的 currentJoin 重复执行上述逻辑。注意如果在执行的过程中发现两个队列的 currentSteal 和 currentJoin 不是目标任务,会重新寻找目标任务的队列。

这里其实有个核心的思想,就是针对目标任务。只要和目标任务有关就会帮助执行,否则跳出来重新寻找和目标任务有关的队列。直到目标任务执行完成。

5.ForkJoinPool 和 ThreadPoolExecutor

这两个都是 juc 里面的线程池。但是实现原理有很大的区别 FJ 使用工作窃取算法。通过多个队列可以减少并发。而 TP 则是单个队列。

通过上面代码。我们知道 FJ 使用递归的思想,可以在任务执行的过程中将任务分解。而使用 TP 的时候,如果分解出来大量的任务,会导致更多的队列竞争。

FJ 对于大量的任务,可以充分利用资源,减少竞争,并且通过窃取算法。实现任务的负载均衡。可以说 FJ 完全是通过工作窃取来驱动任务完成的。
但是对于较少的任务。多队列以及线程频繁的窃取会导致性能的急剧下降。

还有就是对于阻塞时间比较久的任务。我们可能更适合使用 TP, 毕竟阻塞会占用线程资源。我们需要更多的线程去处理任务。并且阻塞会降低队列的竞争。

参考文章

http://gee.cs.oswego.edu/dl/p…
https://blog.csdn.net/u010841…

退出移动版