ForkJoin框架之CountedCompleter工作线程及并行流

14次阅读

共计 23593 个字符,预计需要花费 59 分钟才能阅读完成。

前言

在前面的文章 ”ForkJoin 框架之 ForkJoinTask” 中梳理了 ForkJoin 框架的简要运行格架和异常处理流程, 显然要理解 ForkJoin 框架的调度, 包含工作窃取等思想, 需要去 ForkJoinPool 中了解, 而对于 ForkJoinTask 的拓展和使用则需要了解它的一些子类, 前文中偶尔会提到 ForkJoinTask 的一个子类:CountedCompleter, 直译为计数的完成器.

前文也说过,JAVA8 的并行流其实就是基于了 ForkJoin 框架实现, 因此并行流其实就在使用我们前面提到的工作窃取和分治思想. 为了方便对于 ForkJoinTask 的理解, 本文将详述 CountedCompleter(同时在 ForkJoinPool 中也需要了解它), 以及前文提到的工作线程 ForkJoinWorkerThread, 并简单看一看并行流.

CountedCompleter 源码

根据 doug 的注释,CoutedCompleter 是一个特殊的 ForkJoinTask, 它会在触发完成动作时, 检查有没有挂起 action, 若没有则执行一个完成动作. 这个概念有些抽象, 必须结合源码和源码作者给出的示例加以理解, 同样的, 理解了它, 也就理解了 CountedCompleter 的扩展类的实现方式, 从而能阅读懂有关的源码(如并行流中涉及到运行集拆分, 结果合并, 运算调度等源码).

它也是一个抽象类, 基于 ForkJoinTask 的 exec 函数进行了若干扩展.

public abstract class CountedCompleter<T> extends ForkJoinTask<T> 

// 任务的完成者, 很明显这是一个全局的栈结构(暂时这么理解吧, 其实也不太严格).
final CountedCompleter<?> completer;
// 重要字段, 代表完成前挂起的任务数量, 用 volatile 修饰.
volatile int pending;
// 带有 completer 的构造器.
protected CountedCompleter(CountedCompleter<?> completer) {this.completer = completer;}
// 不带 completer 的构造器
protected CountedCompleter() {this.completer = null;}
// 抽象的 compute 方法, 它是类似 ForkJoinTask 的扩展方式.
public abstract void compute();
// 重写的 exec 方法
protected final boolean exec() {
    // 直接调用 compute 方法并返回 false. 回到 ForkJoinTask 类中的 doExec 方法, 可以看到
    // 调用了 exec 后若得到 true 值, 将会执行 setCompletion(NORMAL)动作. 且该动作将在首次唤醒等待结果的线程.
    // 此处 return 了 false, 将不去执行上述操作. 详情参考上篇文章.
    compute();
    return false;
}

以上是 CountedCompleter 的签名, 字段, 构造器和核心的抽象方法 compute, 其实整个 CountedCompleter 就是在围着这点东西转, 首先看一看与 ForkJoinTask 的结合.

显然,CountedCompleter 简单重写了 ForkJoinTask 的 exec 方法简单调用抽象的 compute 方法并返回 false, 当出现异常时, 流程不变, 但当 compute 方式正常完成的情况, 将不可能进行父类后续的设置完成和唤醒操作. 因此它必须由 CountedCompleter 自定义的完成.

而 CountedCompleter 也确实暴露了一些公有函数, 但是调用的时机却要用户继承它之后决定. 我们先来继续一些辅助源码并理解 Completer 的设计理念, 稍后再来看它的完成方法.

//onCompletion 勾子方法, 默认空实现.
//CountedCompleter 在 tryComplete 方法中会在符合完成的第一个条件 (无挂起任务) 的情况下执行它.
//complete 方法也会对它有无条件地调用.
// 关于这两个方法稍后详述.
// 它的实现取决于要实现的操作, 并行流中的一些 ops 会在此处进行一些中间结果处理, 比如结果集的合并(reduce 操作).
public void onCompletion(CountedCompleter<?> caller) {
}

// 重写 ForkJoinTask 中的方法. 上篇源码分享文章中提过, 在 ForkJoinTask 的 setExceptionalCompletion 会调用 internalPropagateException
// 传递异常, 而且是个空实现, 而在 CountedCompleter 中实现了该方法, 并在内部调用 onExceptionalCompletion
void internalPropagateException(Throwable ex) {
    CountedCompleter<?> a = this, s = a;
    // 循环判断每一个 task 是否要传递异常给它的 completer
    // 无方法体的 while 循环. 道格大神的代码神迹.
    while (a.onExceptionalCompletion(ex, s) &&
            // 要传递给 completer 且具备 completer 且 completer 还不是完成态(正常或非正常)
           (a = (s = a).completer) != null && a.status >= 0 &&
            // 则令 completer 去记录异常完成, 若记录成功则进入下一轮循环.
           a.recordExceptionalCompletion(ex) == EXCEPTIONAL)
        ;
    // 因为 onExceptionalCompletion 固定返回 true, 若没有中间完成的任务, 直到最后一个 completer, 也就是 root,
    //root 不具备 completer, 将中断循环.
}

// 异常完成勾子方法.
// 按上一节的概念, 当 ForkJoinTask 执行出错, 即 exec->compute 出错时, 最终会调到此勾子. 或当手动 completeExceptionally 或 cancel 时.
public boolean onExceptionalCompletion(Throwable ex, CountedCompleter<?> caller) {
    // 直接返回 true, 显然也是一个供扩展的方法. 返回 true 代表异常应该传递给 this 的 completer.
    return true;
}

// 返回 completer
public final CountedCompleter<?> getCompleter() {return completer;}

// 返回挂起任务数量.
public final int getPendingCount() {return pending;}

// 设置挂起任务数量
public final void setPendingCount(int count) {pending = count;}

// 原子地为挂起任务数量添加 delta
public final void addToPendingCount(int delta) {U.getAndAddInt(this, PENDING, delta);
}

   // 原子地将当前挂起任务数量从 expected 更改到 count
public final boolean compareAndSetPendingCount(int expected, int count) {return U.compareAndSwapInt(this, PENDING, expected, count);
}

// 将当前任务的挂起数量原子减至 0.
public final int decrementPendingCountUnlessZero() {
    int c;
    do {} while ((c = pending) != 0 &&
                 !U.compareAndSwapInt(this, PENDING, c, c - 1));
    return c;
}

// 返回 root completer. 逻辑很简单.
public final CountedCompleter<?> getRoot() {
    CountedCompleter<?> a = this, p;
    while ((p = a.completer) != null)
        a = p;
    return a;
}

以上是几个工具函数, 逻辑也很简单, 仅有一处可能留有疑问: 完成态 / 异常态是如何传递的.

现在大家应该理解为什么 ForkJoinTask 要将 internalPropagateException 置为空实现了, 显然, 对于不同方式的实现, 确实需要不同的传递行为.CountedCompleter 保存了一个类似 ” 栈结构 ” 的任务链, 虽然提前讲到栈底即为 root 任务(当然 root 在底部还是顶部本身不重要), 显然任何一个子任务出现了问题, 与它关联的父任务的行为显然要有一个明确的由子类定义的规则.

我们看到在重写的 internalPropagateException 方法中, 不停地判断当前任务是否要将异常信号传递给链上的下一个任务(on 方法始终返回 true, 没关系我们可以在子类中重写), 然后让未完成的 completer 去记录同一个异常 ex.

那么问题来了, 只要 completer 已完成过(正常完成过异常完成或取消), 显然 while 循环中断,completer 和它的后续 completer 将不会被处理(1). 同样, 若传递异常的任务本身就是另一个或几个任务的 completer, 它的异常信息显然不会反向传递(2).

对于问题(1), 显然如果后续的 completer 已出现过异常, 必然也会走一遍同样的逻辑, 传递给后面的 completer, 如果它正常完成, 也必然要有相应向后传递的行为, 否则无法解决(1), 我们接下来即论述相关方法.

对于问题 (2), 显然问题(1) 中描述的情况与此有所交集, 如果我们建立了一个 CountedCompleter 任务, 并在 compute 方法中大肆 fork 子任务入队,fork 之后不等子任务完成, 也不获取子任务的执行结果, 直接将父任务 setCompletion 或者 setExceptionalCompletion, 子任务还是会继续执行的.

为了便于理解, 我们继续来看与任务的完成有关的方法.

// 尝试完成根任务或减少栈链下游的某一个 completer 的挂起数(包含它自身).
public final void tryComplete() {
    //1. 初始用 a 保存 this, 后续为当前操作任务, 用 s 保存 a.
    CountedCompleter<?> a = this, s = a;
    for (int c;;) {//2. 第一次进入或在 6 造成竞态的某一次循环中,a(this 或 this 的 completer 链中的某一个)的的挂起任务数为 0, 代表它挂起的任务都完成了.
        if ((c = a.pending) == 0) {
            //3.a 的勾子方法, 若已经运行过 4, 且判断条件为假未能到 5 并在下一次循环重新回到 3 的情况,a!= s 且 a 是 s 的 completer,
            // 在对 onCompletion 重写时, 可以根据 this 与参数是否相等进行判断, 如并行流聚合时可以根据这个条件进行结果集的合并.
            a.onCompletion(s);
            //4. 将 a 指向自己的 completer,s 指向原来的 a.
            if ((a = (s = a).completer) == null) {
                //5. 原来 a 的 completer 不存在, 即 a 不是 root, 不需要再传递了, 让 root 进行 quietlyComplete 并返回.
                // 此时说明整条链上的 competer 挂起任务全部是 0.
                s.quietlyComplete();
                return;
            }
            // 隐藏的 7. 当原 a 的 completer 存在 (a 不是 root) 的情况, 继续对该 complter 判断挂起任务数或尝试减 1, 对下一个元素开启下一轮循环.
        }
        //6. 对 this 的 completer 栈的某一次循环时发现了挂起任务数不为 0 的, 则对该 completer 的挂起数减 1,
        // 表示它挂起的任务完成了一个, 并返回. 若在此时恰好出现了竞态, 另一条链上的任务抢先减一, 则当前
        // 的 a 要进入下一循环, 它可能会在 2 处判断通过, 进入到链上的下一个 completer 的传播逻辑.
        else if (U.compareAndSwapInt(a, PENDING, c, c - 1))
            return;
    }
}

// 基本等效于 tryComplete, 只是不执行 onCompletion,tryComplete 会在判断链上某个 completer 的挂起任务数是 0 立即执行 onCompletion.
public final void propagateCompletion() {
    CountedCompleter<?> a = this, s = a;
    for (int c;;) {if ((c = a.pending) == 0) {if ((a = (s = a).completer) == null) {s.quietlyComplete();
                return;
            }
        }
        else if (U.compareAndSwapInt(a, PENDING, c, c - 1))
            return;
    }
}


//complete 方法, 逻辑简单, 丝毫不考虑挂起数, 直接执行当前 task 的几个完成方法, 并尝试对 completer 进行 tryComplete.
// 它不改变自己的挂起任务数, 但会让 completer 对栈上的其他 completer 或自身尝试减少挂起数或完成 root.
public void complete(T rawResult) {
    CountedCompleter<?> p;
    setRawResult(rawResult);// 使用参数设置为当前任务的结果, 尽管它为空方法.
    onCompletion(this);// 直接调用 onCompletion 勾子.
    quietlyComplete();// 安静地将 status 置为 NORMAL.
    if ((p = completer) != null)
        // 自己不改变自身挂起数, 也不尝试完成 root, 但让 completer 尝试去向下执行这些操作.
        p.tryComplete();}

// 没办法单独理解这个方法名. 官方注释是和 nextComplete 放置在循环中使用.
public final CountedCompleter<?> firstComplete() {for (int c;;) {if ((c = pending) == 0)
            //1. 当前 task 没有挂起任务数, 则返回它.
            return this;
        else if (U.compareAndSwapInt(this, PENDING, c, c - 1))
            //2. 否则尝试减少一个挂起任务数并返回 null. 但当出现竞态时, 可能导致未能进入 2 而在下一次循环进入 1.
            return null;
    }
}

// 结合前面的 firstComplete 互相理解, 它会对当前任务判断是否有 completer, 有则对该 completer 进行 firstComplete,
// 否则将当前任务安静完成并返回 null.
// 故结果只能返回 null 或 completer
public final CountedCompleter<?> nextComplete() {
    CountedCompleter<?> p;
    if ((p = completer) != null)
        // 有 completer 且 completer 已无挂起任务数, 则返回 completer,
        // 有 completer 且 completer 有挂起任务数, 则尝试对该任务数减一并返回 null. 出现竞态则可能返回该 completer.
        return p.firstComplete();
    else {
        // 无 completer, 安静完成当前任务并返回 null.
        quietlyComplete();
        return null;
    }
}

// 等同于 getRoot().quietlyComplete()
public final void quietlyCompleteRoot() {for (CountedCompleter<?> a = this, p;;) {if ((p = a.completer) == null) {a.quietlyComplete();
            return;
        }
        a = p;
    }
}


// 如果当前任务未完成, 尝试去出栈执行, 并处理至多给定数量的其他未处理任务, 且对这些未处理任务
// 来说, 当前任务处于它们的完成路径上(即这些任务是 completer 栈链的前置任务), 实现特殊的工作窃取.
public final void helpComplete(int maxTasks) {
    Thread t; ForkJoinWorkerThread wt;
    if (maxTasks > 0 && status >= 0) {if ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread)
            // 当前线程是 ForkJoinWorkerThread, 尝试执行当前任务并尝试从线程的工作队列中尝试帮助前置任务执行.
            (wt = (ForkJoinWorkerThread)t).pool.
                helpComplete(wt.workQueue, this, maxTasks);
        else
            // 使用 common 池的 externalHelpComplete 方法.
            ForkJoinPool.common.externalHelpComplete(this, maxTasks);
    }
}

上一段代码总体逻辑不难, 有以下几点总结:

1. 显然 tryComplete 方法在调用后的最终结果只有两个: 自己或 completer 链前方的某一个 completer 的挂起任务数减 1(1), 自己或 completer 链前方某一个 completer(root)的 quietlyComplete 被执行 (2). 简单来说, 就是让 root 进行 quietlyComplete(链上每一个挂起任务数都是 0) 或让链上的某一个 completer 减少一个挂起任务.

2.tryComplete 方法只会对 root 进行 quietlyComplete, 进而 setComplete(NORMAL), 对于链上的其他任务, 最多会帮助挂起数减一, 而不会把它们置为完成态, 但是线程池在执行任务时, 或者直接对一个链上的 completer 进行 invoke,doExec 甚至 get 等操作时, 这些方法会将该中间 completer 进行 setComplete.

3. 每一个 CountedCompleter 都可能有自己的 completer 栈链, 每一个 CountedCompleter 也可以位于其他 CountedCompleter 的栈链上且上游不唯一而下游唯一一(倒树形), 任何一条栈链只能有一个 root,root 的 completer 为 null.

4. 从 tryComplete 方法来看正常运行情况下的规则, 每一个 CountedCompleter 的 tryComplete 只能向前影响到链上的另一个 completer, 因为实现数量的增加方法有好几处, 用户在实现时, 随时可能将一些 completer 的数量设置成任意的数, 故可以出现前面 tryComplete 注释中隐藏的 7 的情况, 即存在一个 completer, 它的下一个 completer 的挂起数是 0, 它却能将下下个 completer 安静完成或将其挂起数减一, 即跨无挂起数节点传递.

5. 前面列出的 helpComplete 方法是 CountedCompleter 的特殊工作窃取方法(或者也不能叫作窃取, 因为非 common 池情况窃取的是自己线程的任务,common 池则依赖于一个探测值), 具体的窃取细节在 ForkJoinPool 中, 将在后面的文章中论述, 但简单的逻辑已经在注释中描述清楚, 把它归到这一块, 也是因为它与前面描述的逻辑有所纠葛.124 提到了 tryComplete 的向前影响结果, 而在实际的应用中, 我们可能会有各种各样的情景,ForkJoin 框架无法阻止我们对 ForkJoinTask 的 exec 函数进行任意式的扩展, 也无法阻止我们对 CountedCompleter 的 compute 任意扩展, 那么如何在我们任意拓展的情景下保持效率和健壮? 比如下面这个使用场景:

a. 建立一种 ForkJoinTask, 直接继承 CountedCompleter 并重写 compute 方法, 则它可以运行在 ForkJoinPool 中.

b. 我们接下来在 compute 方法中多次根据计算结果集的大小进行拆分并递归 fork 子任务入池, 父任务成为子任务的 completer, 同时 compute 方法自身也负责不可拆分的计算逻辑, 并在自身这一块计算结束后, 可能等待所有 fork 入池的子任务结束, 也可能不等待子任务, 直接结束父任务, 让线程空出来做其他的事.

c. 所有子任务结束后, 使用一个合并函数合并子任务的结果集和自身的结果, 并作为最终的结果. 然后 tryComplete(如果 b 中使用了 join, 或者判断当前任务是 root).

显然,b 中 fork 出的子任务, 也同样要执行 bc 的逻辑. 那么可能出现这样的情况:

不同的父任务子任务在 ForkJoinPool 最初始压入当前工作线程的队列中, 但随时可能被其他工作线程甚至外部线程偷去执行.

父任务抢先抢得运行资源, 运行完自己计算的部分, 而入池的子任务及子孙任务有大量未完成.

难道父任务的执行线程就这样干等? 在前一篇文章中说过,ForkJoin 框架适宜多计算, 轻 io, 轻阻塞的情况, 且本身就是为了避免线程忙的忙死饿的饿死, 因此每个任务等待子任务执行结束是不可取的, 这或许也是为什么有了 ForkJoinTask, 却还要有 CountedCompleter 的原因之一吧.

若我们在任何每一个任务中只是单纯地将该分出去的子任务 fork 入池并执行自己那一部分, 并不让当前线程 join 子任务呢?(事实上不 join 子任务恰好可以将当前线程的资源腾出来做其他的事)

所以, 除了前面 5 中提到的若干种 (124) 向前影响 completer 栈链的挂起数或 root 的完成态, 还需要一个能向栈链后方有所影响的操作, 比如帮助子任务的完成, 毕竟子任务也是 b 中 fork 出来且由自己入队的.

helpComplete 方法就可以做到这一点, 它在 ForkJoinPool 中, 它仅应在当前任务未完成时使用, 首先它会尝试将当前任务从出队列并执行 (ForkJoinPool::popCC 及成功后续 doExec,LIFO), 出队失败则表示正在被执行甚至被偷去执行. 出队这一步之后, 再尝试自己的线程工作队列中找出自己的子孙任务(FIFO) 并进行执行(ForkJoinPool::pollAndExecCC).

而若执行完某个父任务的工作线程必然会调用 tryComplete 等有关方法, 将自身或栈链后方的某一个 completer 的挂起数减一, 甚至因为一些不合理的 api 使用 (如直接更改了后方某个任务的挂起数量) 而直接终止了 root, 将 root 任务标记成完成态.(注意前面强调的 ” 运行完自己计算的部分 ”, 这就是否定本句话的关键了, 前面也说明 ”helpComplete 仅在当前任务未完成时使用 ”, 显然, 完成了自己负责的计算内容并不代表当前任务完成了, 因为它的子任务还没有完成, 因此它不会调用 tryComplete, 并且可以去帮助子任务)

同时, 执行完父任务负责的计算内容的任务线程也会去找它栈链后方的其他任务, 按照 b 的逻辑, 这将是它的子任务, 帮助它们完成, 每完成一个子任务(子任务无子任务, 不再 help 的情况), 会进行 tryComplete 传递一次.

余下的方法很简单.

// 重写自 ForkJoinTask 的结果, 前文也说过 CountedCompleter 也不维护 result, 返回 null.
// 但并行流或者一些其他并行操作可以实现此结果, 比如 ConcurrentHashMap 中支持的 map reduce 操作.
public T getRawResult() { return null;}

// 同上, 默认空, 一些子类会有特别的实现.
protected void setRawResult(T t) {}

显然,completer 栈链上的所有任务是可以并行执行的, 且每一个完成都可以向后 tryComplete 一次, 并在其后可以帮助前面的任务完成, 而我们若实现上述两个方法, 完全可以将自身运算的结果设置进去, 在 root 被安静完成后,ForkJoinTask 将可以 get 到结果(或 join 也将返回结果), 可在此时合并计算结果, 有些结果显然是可以并行的.

一些操作, 比如 find 类型, 任何一个子任务完成了 find, 就可以直接让 root 结束, 然后直接让整条栈链上的任务 cancelIgnoringExceptions.

一些需要聚合每一个任务结果的操作, 比如 reduce 类型, 需要每个父任务根据子任务的结果去 reduce, 它的父任务再根据他和兄弟任务的结果 reduce, 最终合并到 root. 显然,mapper 由子任务实现,reducer 由父任务实现.

一些接近 find 或 reduce 类型(或者说 find 的变种), 比如 filter, 每一个任务都会有结果, 这个结果可能是自己负责的原集中的一部分子集, 也可能就是个空集, 父任务合并每个子任务的结果集, 直到 root.

排序类型的操作, 如使用归并排序, 显然每个父任务即是 divider 也是 merger, 分解出的每个子集交给子任务去计算, 父任务再去负责 merge.

……

以上是 ForkJoinTask 的抽象子类 CountedCompleter 的源码分析, 接下来我们继续分析工作线程.

ForkJoinWorkerThread 源码

只要对 java 的线程结构稍有了解,ForkJoinWorkerThread 的源码十分简单, 且前面提过,ForkJoinTask 被声称是一个轻量于普通线程和 Future 的实体, 而它在 ForkJoinPool 中的运行载体便是 ForkJoinWorkerThread, 这个轻量究竟体现在何处?

// 类签名, 直接继承自 Thread
public class ForkJoinWorkerThread extends Thread {
// 每个 ForkJoinWorkerThread 都只能属于一个线程池, 且保存该池的引用.
final ForkJoinPool pool; 
// 每个 ForkJoinWorkerThread 都有一个工作队列, 显然队列中的任务就是该线程干活的最小单位了. 它也是工作窃取机制的核心.             
final ForkJoinPool.WorkQueue workQueue; 

// 构造函数, 创建时指定线程池.
protected ForkJoinWorkerThread(ForkJoinPool pool) {
    // 线程名称
    super("aForkJoinWorkerThread");
    this.pool = pool;
    // 将工作线程注册到 ForkJoinPool 后会返回一个工作队列, 供当前线程使用和供其他线程偷取.
    this.workQueue = pool.registerWorker(this);
}

// 带线程组的构造器
ForkJoinWorkerThread(ForkJoinPool pool, ThreadGroup threadGroup,
                     AccessControlContext acc) {super(threadGroup, null, "aForkJoinWorkerThread");
    //inheritedAccessControlContext 是从 Thread 继承下来的, 字面意思是继承的访问控制上下文, 设置为 acc.
    U.putOrderedObject(this, INHERITEDACCESSCONTROLCONTEXT, acc);
    // 注册入池之前, 清除掉本地化信息
    eraseThreadLocals(); 
    this.pool = pool;
    this.workQueue = pool.registerWorker(this);
}

// 返回注册的池.

public ForkJoinPool getPool() {return pool;}

// 返回当前线程工作队列在池中的索引, 每个队列都会维护一个在池中的索引.
public int getPoolIndex() {return workQueue.getPoolIndex();
}

/**
 * Initializes internal state after construction but before
 * processing any tasks. If you override this method, you must
 * invoke {@code super.onStart()} at the beginning of the method.
 * Initialization requires care: Most fields must have legal
 * default values, to ensure that attempted accesses from other
 * threads work correctly even before this thread starts
 * processing tasks.
 */
// 空函数, 可交给子类实现, 按照官方注释, 它的作用是在构造之后(这个构造不是指 new 出线程对象,
// 而是在 run 方法已进入的时候, 说明 "构造" 是指线程已经完成了创建能够正常运行), 处理任务之前.
protected void onStart() {}


// 工作线程终止时的勾子方法, 负责执行一些有关的清理操作. 但是若要重写它, 必须在方法的
// 最后调用 super.onTermination. 参数 exception 是造成该线程终止的异常. 若是正常结束,
// 则它是 null.
protected void onTermination(Throwable exception) {
}

// 核心方法.
public void run() {
    //doug 在这一块标注 "只运行一次", 查看 ForkJoinPool 的源码,
    //ForkJoinPool 中会有一个 WorkQueue 的数组, 在取消线程的注册后,
    // 本线程关联的 WorkQueue 会从该数组移除, 但 WorkQueue 中的 array 不会置空.
    if (workQueue.array == null) {
        Throwable exception = null;
        try {
            // 前面说过的预先操作
            onStart();
            // 用线程池的 runWorker 方法执行, 传入队列.
            pool.runWorker(workQueue);
        } catch (Throwable ex) {
            // 发生异常, 中断前记录下来
            exception = ex;
        } finally {
            try {
                // 将记录下来的异常调用勾子方法.
                onTermination(exception);
            } catch (Throwable ex) {if (exception == null)
                    // 执行勾子方法本身出现了异常, 记录下来
                    exception = ex;
            } finally {
                // 调用线程池的解除注册方法, 会将本线程的 WorkQueue 从数组中移除, 同时使用上述异常.
                pool.deregisterWorker(this, exception);
            }
        }
    }
}

// 擦除本地变量. 把当前线程的两个 ThreadLocalMap 全部置空
final void eraseThreadLocals() {U.putObject(this, THREADLOCALS, null);
    U.putObject(this, INHERITABLETHREADLOCALS, null);
}

// 每正常运行完一次顶级 task, 就调用一次它. 这个顶级任务自带易误解天性, 其实可以理解为每一次从队列取出的任务.
void afterTopLevelExec() {}




// 自带子类. 它不具备任何特殊权限, 也不是用户定义的任何线程组的成员, 每次运行完一个顶级任务,
// 则擦除本地化变量.
static final class InnocuousForkJoinWorkerThread extends ForkJoinWorkerThread {
   // 自已创建默认线程组.
    private static final ThreadGroup innocuousThreadGroup =
        createThreadGroup();
    // 访问控制上下文支持权限.
    private static final AccessControlContext INNOCUOUS_ACC =
        new AccessControlContext(new ProtectionDomain[] {new ProtectionDomain(null, null)
            });
    // 构造函数.
    InnocuousForkJoinWorkerThread(ForkJoinPool pool) {super(pool, innocuousThreadGroup, INNOCUOUS_ACC);
    }

    @Override 
    void afterTopLevelExec() {
        // 在每一次从队列取出的 "顶级" 任务运行后即擦除本地化变量.
        eraseThreadLocals();}

    @Override 
    public ClassLoader getContextClassLoader() {
        // 如果获取线程上下文类加载器, 永远直接返回系统类加载器.
        return ClassLoader.getSystemClassLoader();}

    // 尝试对未捕获异常处理器的设置, 忽略.
    @Override 
    public void setUncaughtExceptionHandler(UncaughtExceptionHandler x) { }

    // 禁止直接设置线程的上下文类加载器.
    @Override 
    public void setContextClassLoader(ClassLoader cl) {throw new SecurityException("setContextClassLoader");
    }

    
    // 创建一个以顶级线程组为父的线程组.
    private static ThreadGroup createThreadGroup() {
        try {sun.misc.Unsafe u = sun.misc.Unsafe.getUnsafe();
            Class<?> tk = Thread.class;
            Class<?> gk = ThreadGroup.class;
            long tg = u.objectFieldOffset(tk.getDeclaredField("group"));
            long gp = u.objectFieldOffset(gk.getDeclaredField("parent"));
            // 当前线程的所属组.
            ThreadGroup group = (ThreadGroup)
                u.getObject(Thread.currentThread(), tg);
            // 循环条件, 当前线程的所属组不是 null
            while (group != null) {
                // 不停地循环向上取 parent
                ThreadGroup parent = (ThreadGroup)u.getObject(group, gp);
                if (parent == null)
                    // 发现无 parent 的线程组, 说明是系统顶级线程组, 用它当 parent 创建一个 "无害" 线程组返回.
                    return new ThreadGroup(group,
                                           "InnocuousForkJoinWorkerThreadGroup");
                // 有 parent, 把它赋给 group 开启下一轮循环.
                group = parent;
            }
        } catch (Exception e) {
            // 有异常用 Error 包装抛出.
            throw new Error(e);
        }
        // 不能 return 就抛出 Error.
        throw new Error("Cannot create ThreadGroup");
    }
}

以上是工作线程的代码, 粗略总结一下它和普通线程的区别.

首先, 它内部会维护一个工作队列, 用它来实现任务调度和窃取.

其次, 它提供了一些扩展, 如每次顶层任务运行结束, 清理 ThreadLocal, 这也是一种保护机制, 避免同线程的本地化数据随之污染. 但粗略去看 ForkJoinPool 的代码, 发现它只是在每次从队列取出并运行完一个任务后清除, 并称这个为 ” 顶级循环 ”, 这倒也没错, 但这个任务并不能称之为顶级任务, 因为这里的任务类型是 ForkJoinTask, 不一定是 CountedCompleter 等明显标识了依赖关系的子类, 所以父任务和子任务被塞进一个队列, 即使未被窃取, 只由当前线程执行, 两次的本地化数据也是不同的.

不过如果我们在 ForkJoinTask 的 exec 方法中加入本地化, 或在 CountedCompleter 中加入本地化, 显然每一个在此生成的子任务都会在相应的线程执行 doExec 时设置这些属性, 并在执行结束后清除.

最后官方提供的默认子类, 以及一些线程组, 优先级, 权限等作者也未深入研究, 但是我们构建线程池的时候有一个参数就是 ” 线程工厂 ”, 了解下它或许能对后续的 ForkJoinPool 源码阅读有所帮助.

接下来简述一个官方提供的案例, 并以此聊一聊并行流.

官方案例

第一节论述了 CountedCompleter, 显然它作为一个抽象类, 只是定义了某一些环节, 以及一些环节的子环节的组合过程, 而具体的实现与使用它定义的 api 则由用户实现, 它的源码中并无使用 (当然也可以看一些子类, 但比较复杂), 在 CountedCompleter 的源码注释中, 道格大神提供了若干案例, 这里举出两个来简要说明一下前面论述过的使用方式, 也可以为下一节论述官方提供的子类(并行流 api 中) 提供阅读基础.

第一个是并行的可窃取的分治查找算法.

@Test
public void testDivideSearch(){Integer[] array = new Integer[10000000];
    for(int i = 0; i < array.length; i++){array[i] = i+1;
    }
    AtomicReference<Integer> result = new AtomicReference<>();
    Integer find = new Searcher<>(null, array, result, 0,
            array.length - 1,this::match).invoke();
    LOGGER.info("查找结束, 任务返回:{},result:{}",find,result.get());

}

static class Searcher<E> extends CountedCompleter<E> {final E[] array; final AtomicReference<E> result; final int lo, hi;
    final Function<E,Boolean> matcher;

    Searcher(CountedCompleter<?> p, E[] array, AtomicReference<E> result,
             int lo, int hi,Function<E,Boolean> matcher){super(p);
        this.array = array;
        this.result = result;
        this.lo = lo;
        this.hi = hi;
        this.matcher = matcher;
    }
    @Override
    public void compute() {
        int l = this.lo;int h = this.hi;
        while(result.get() == null && h >= l){if(h - l >=2){int mid = (l + h)>>>1;
                // 添加挂起任务数量, 这样当出现 tryComplete 时可以触发 root 的结束(未查到)
                addToPendingCount(1);
                new Searcher<E>(this,array,result,mid,h,matcher).fork();
                h = mid;
            }else{E x = array[l];
                if(matcher.apply(x) &&  result.compareAndSet(null,x)){super.quietlyCompleteRoot();
                }
                break;
            }
        }
        // 当前未有任何一个线程查到结果, 当前任务也完成了子集查找, 减少一个挂起数量, 若挂起数已减至 0 则终止.
        if(null == result.get())
            tryComplete();}

}

private boolean match(Integer x) {return x > 2000000 &&  x%2 ==0 && x%3 == 0 && x%5 ==0 && x %7 ==0;}

该案例的逻辑很简单, 给定一个非常大的数组, 充分利用本机的资源去查找满足一个条件的元素. 为了方便, 在具体的查找数据上选定了整型, 查找的条件也非常简单.

在该案例中, 会对结果进行分治, 首先分治出足够多的子任务, 剩下的不需再分的父任务由当前线程完成, 子任务则压入工作队列, 其他空闲的线程就会来偷取子任务并执行. 当有任务一个子任务查找到相应的数字后, 即将它存放到 result, 并安静地完成根任务.

此时整个任务链处在一个非常尴尬的情况: 查找到结果的子任务将 root 设置为完成, 而整条链上的非 root 任务均未完成. 但因循环条件不满足, 退出了循环. 此时查到 result 已有值, 并不执行最后的 tryComplete, 执行结束, 任务的 status 依旧为未完成, 是否有重复执行的问题?

答案是没有问题, 因为 ForkJoinTask 绝对会在 ForkJoinPool 中调度(哪怕是 common 池), 在 common 池中, 任务执行前必须出队, 尽管 compute 方法在本例中没有将这些任务设置为完成, 但任务不会被二次执行. 可见, 上一章中费大力介绍的 status 字段也有无用的时候.

但是除了 root 任务需要使用到获取结果的功能, 需要保证 status 是负数, 它产生的子孙任务还有什么用呢? 所有 compute 方法会因为循环中止而结束, 此后的这些任务不存在任何外部引用, 会被 gc 清理, 即使存在外部引用, 用它去获取子孙任务的执行情况或 result 也没有任何意义.

显然这个案例解决了至少两个疑问, 一是怎么实现一个保存 result 的 ForkJoinTask, 二是 ForkJoin 框架如何在查找方面大幅提升性能, 很明显, 相比单线程遍历的办法, 此例多线程查询, 且任何一个子任务在并行条件下完成了查询, 整个大任务均可以终止.

第二个是传说中的 map reduce. 大数据中常使用此概念(跨节点).

在并行流中,map 可以代表非阻断操作,reduce 可以代表阻断操作, 但是 reduce 同样可以并行地执行.

道格在注释上给出了两个 map reduce 案例, 我们只看第一个, 它也是后续并行流一节我们要看的例子比较相近的解法. 方法二有些绕, 较难理解, 但也优雅.

@Test
public void testMapReduce() {Integer[] array = {1, 2, 3};
    // 方法一.
    Integer result = new MapRed<>(null, array, (a)->a+2, (a,b)->a+b,  0,array.length).invoke();
    LOGGER.info("方法一 result:{}",result);
    // 方法二我就不抄了, 就在官方注释上.
    result = new MapReducer<>(null, array, (a) -> a + 1
            , (a, b) -> a + b, 0, array.length, null).invoke();
    LOGGER.info("方法二 result:{}", result);

}


/**
 * 第一种 map reduce 方式, 很好理解.
 * @param <E>
 */
private class MapRed<E> extends CountedCompleter<E> {final E[] array;
    final MyMapper<E> mapper;
    final MyReducer<E> reducer;
    final int lo, hi;
    MapRed<E> sibling;// 兄弟节点的引用
    E result;

    MapRed(CountedCompleter<?> p, E[] array, MyMapper<E> mapper,
               MyReducer<E> reducer, int lo, int hi) {super(p);
        this.array = array;
        this.mapper = mapper;
        this.reducer = reducer;
        this.lo = lo;
        this.hi = hi;
    }

    public void compute() {if (hi - lo >= 2) {int mid = (lo + hi) >>> 1;
            MapRed<E> left = new MapRed(this, array, mapper, reducer, lo, mid);
            MapRed<E> right = new MapRed(this, array, mapper, reducer, mid, hi);
            left.sibling = right;
            right.sibling = left;
            // 只挂起右任务
            setPendingCount(1);
            right.fork();
            // 直接运算左任务.
            left.compute();} else {if (hi > lo)
                result = mapper.apply(array[lo]);
            // 它会依次调用 onCompletion. 并且是自己调自己或 completer 调子,
            // 且只有左右两个子后完成的能调成功(父任务的挂起数达到 0).
            tryComplete();}
    }

    public void onCompletion(CountedCompleter<?> caller) {
        // 忽略自己调自己.
        if (caller != this) {
            // 参数是子任务.
            MapRed<E> child = (MapRed<E>) caller;
            MapRed<E> sib = child.sibling;
            // 设置父的 result.
            if (sib == null || sib.result == null)
                result = child.result;
            else
                result = reducer.apply(child.result, sib.result);
        }
    }

    public E getRawResult() {return result;}
}
//mapper 和 reducer 简单的不能再简单.
@FunctionalInterface
private static interface MyMapper<E> {E apply(E e);
}
@FunctionalInterface
private static interface MyReducer<E> {E apply(E a, E b);
}

上面的逻辑也很简单, 首先就是对任务的分解, 简单的将任务分为左和右, 左直接由父任务执行 (可能再分), 右则入池, 所有子任务直到不能再分(叶子任务) 以 map 为 result, 每个叶子任务完成后会调用 tryComplete.

这个动作会触发一系列的 completer 栈元素的挂起数下降或完成, 显然, 如果把 completer 理解为一个普通树 (这是作者很少见到的非二叉树的情况, 尽管这个例子写成了二叉树, 我们完全可以在 compute 中将父任务一分为多, 而不是限 2 个), 从叶子节点开始, 每个叶子节点完成(result 是 mapper 的结果) 会尝试 onCompletion 并减少父节点的挂起任务数, 但只有同父节点的最后一个兄弟节点可以进入 onCompletion 设置父节点的结果, 并且由于这个设置过程的前提是父节点符合挂起任务数为 0, 因此符合循环继续的条件, 叶子节点的动作会继续向上判断父节点的父节点, 直到 root 为止. 假设线程数量足够, 保证每个子任务都有一个线程处理, 那么深度每上一层, 就会有一半 (非二叉树的情况每个父节点只能有一个通过) 的执行叶子节点任务的线程因不符合某个任务的挂起数量为 0 的条件而退出, 这样逐级传导, 最后到 root 调用它最后一个子节点的 onCompletion, 使用 reducer 进行合并.

本例中进行结果合并的写法 (onCompletion) 只适合二叉树, 有兴趣的读者可以看看道格在注释中给出的第二种写法, 几叉都可以. 而且该实现很优雅, 并未写 onCompletion 函数, 但是写法真心够绕的.

并行流简述

在 JAVA8 中支持了 lamda 表达式的同时, 也支持了函数式编程, 由此出现了一种新型的计算方式: 流式计算, 也出现了一种让包括作者在内很多人兴奋不已的编程方式: 响应式编程.

流式计算的核心在于 Stream api, 流有很多分类, 比如并行流和串行流, 这点可以顾名思义, 同样的, 流中的每一个操作都可以划分类型, 比如阻断操作和非阻断操作.

java 中实现并行流就是基于这些操作,CountedCompleter 的一些子类就是这些操作的类型, 显然, 如在前一篇文章所说, 使用了并行流, 就是使用了 ForkJoin 框架.

当我们使用下面的代码, 会发生什么操作?

Stream.of(1,2,3,4,5).parallel().map(x -> x + 1).reduce((a, b) -> a + b).get();

//map 只是将动作简单地记了下来, 包装起来, 等到阻断操作时才会真正执行. 位于 ReferencePipeline
public final <R> Stream<R> map(Function<? super P_OUT, ? extends R> mapper) {Objects.requireNonNull(mapper);// 非空检查
    // 返回一个无状态操作.
    return new StatelessOp<P_OUT, R>(this, StreamShape.REFERENCE,
                                 StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT) {
        @Override
        Sink<P_OUT> opWrapSink(int flags, Sink<R> sink) {
            // 典型的适配器模式. 将 action 一律封装为 Sink.
            return new Sink.ChainedReference<P_OUT, R>(sink) {
                @Override
                public void accept(P_OUT u) {downstream.accept(mapper.apply(u));
                }
            };
        }
    };
}
// 阻断操作 reduce 位于 ReferencePipeline
public final Optional<P_OUT> reduce(BinaryOperator<P_OUT> accumulator) {return evaluate(ReduceOps.makeRef(accumulator));
}
//AbstractPipeline
final <R> R evaluate(TerminalOp<E_OUT, R> terminalOp) {assert getOutputShape() == terminalOp.inputShape();
    if (linkedOrConsumed)
        throw new IllegalStateException(MSG_STREAM_LINKED);
    linkedOrConsumed = true;

    return isParallel()
           ? terminalOp.evaluateParallel(this, sourceSpliterator(terminalOp.getOpFlags()))
           : terminalOp.evaluateSequential(this, sourceSpliterator(terminalOp.getOpFlags()));
}
//TerminalOp 阻断操作接口的默认方法
default <P_IN> R evaluateParallel(PipelineHelper<E_IN> helper,
                                  Spliterator<P_IN> spliterator) {if (Tripwire.ENABLED)
        Tripwire.trip(getClass(), "{0} triggering TerminalOp.evaluateParallel serial default");
    return evaluateSequential(helper, spliterator);
}
// 看 ReduceOps 它返回了一内部类 ReduceTask
public <P_IN> R evaluateParallel(PipelineHelper<T> helper,
                                     Spliterator<P_IN> spliterator) {return new ReduceTask<>(this, helper, spliterator).invoke().get();
    }
// 内部类 ReduceTask 间接继承自 CountedCompleter
private static final class ReduceTask<P_IN, P_OUT, R,
                                      S extends AccumulatingSink<P_OUT, R, S>>
        extends AbstractTask<P_IN, P_OUT, S, ReduceTask<P_IN, P_OUT, R, S>> {
    private final ReduceOp<P_OUT, R, S> op;

    ReduceTask(ReduceOp<P_OUT, R, S> op,
               PipelineHelper<P_OUT> helper,
               Spliterator<P_IN> spliterator) {super(helper, spliterator);
        this.op = op;
    }

    ReduceTask(ReduceTask<P_IN, P_OUT, R, S> parent,
               Spliterator<P_IN> spliterator) {super(parent, spliterator);
        this.op = parent.op;
    }
    // 老外起的名子, 造小孩. 
    @Override
    protected ReduceTask<P_IN, P_OUT, R, S> makeChild(Spliterator<P_IN> spliterator) {
        // 和上面的例子非常相似的代码, 只是封装更好.
        return new ReduceTask<>(this, spliterator);
    }

    @Override
    protected S doLeaf() {
        // 叶子节点做这个.
        return helper.wrapAndCopyInto(op.makeSink(), spliterator);
    }

    // 重写了前面提过的 onCompletion 函数
    @Override
    public void onCompletion(CountedCompleter<?> caller) {if (!isLeaf()) {
            // 不是叶子节点. 这条件, 和前面咱们分析的多么匹配.
            // 计算左结果
            S leftResult = leftChild.getLocalResult();
            // 联合右结果.
            leftResult.combine(rightChild.getLocalResult());
            // 联合完的结果就是当前 completer 的结果.
            setLocalResult(leftResult);
        }
        // 直接父类是 AbstractTask, 它会对父, 左右子帮助 gc.
        super.onCompletion(caller);
    }
}
//AbstractTask 帮助 gc
public void onCompletion(CountedCompleter<?> caller) {
    spliterator = null;
    leftChild = rightChild = null;
}
// 更多实现细节自阅...

显然, 并行流 (至少我举的这个例子) 是基于 ForkJoin 框架的. 分治的思想与前面道格的例子相似, 只是更加优雅和封装更好. 有了前面的基础, 若要详细熟悉并行流原理, 需要进一步了解的只有他们的继承树, 分割聚合组件等边角料, 核心的调度思想已经不再是困难.

回到问题, 当我们使用并行流时发生了什么? 首先是非阻断操作时, 与串行流情况同样, 也是先将 action 封装成适配器, 仅在阻断操作发生时的调度不同, 并行流在阻断操作下使用 ForkJoin 框架进行调度, 任务的分割则使用它的 Splitor, 结果的合并也有它的 Combiner. 其他的流程与上面的案例无异.

后语

1.CountedCompleter 使用普通树的结构存放动作, 但是它又是另类的树, 因为子节点能找到父节点, 父节点却找不到子节点, 而只知道子节点代表的动作未执行的数量, 因此或许从访问方式的角度来看还是用栈来理解更好. 在这里树既是数据结构, 也是一个另类的操作栈. 只从一个 completer 往下看, 它是个栈, 但从父节点的角度来讲, 它是一个访问不到子节点的普通树 (或许我们不应该强行为它套上一个数据结构, 不然总觉得不伦不类, 但是用树这个形状便于理解). 每个节点会存放挂起任务数量, 每个节点的任务完成未必会设置它自己的完成态, 但会尝试将 completer 父元素栈(或者树的一条线) 上的每个任务挂起数量减一或将根节点安静置为完成态. 关于具体的理解和代码实现, 以及如何保存一个任务的运行结果, 可以参考前面案例的章节, 也可以以此为基础去看并行流的源码, 但也要相应的理解并行流为了便捷实现而提供的各种分割合并组件.

2.ForkJoinWorkerThread 是运行在 ForkJoinPool 中的主要线程, 它内部维护了一个工作任务队列, 并存放了该队列在线程池中的间接索引. 借此实现任务的窃取, 避免过于空闲等待, 任务 fork 会直接 push 到该队列, 第一次扩容时, 才给该队列初始化任务数组, 当线程从池中卸载时, 不会清除掉该数组, 这样线程无法再次启动. 线程的启动有一些勾子, 官方提供的线程工厂有两个, 一个直接创建 ForkJoinWorkerThread, 另一个创建它的子类

InnocuousForkJoinWorkerThread, 它除了一些安全策略外, 最大的区别在于 ForkJoinWorkerThread 在注册入池前进行本地化数据的清理, 而它则每次完成一个主任务处理就清理一次.

3. 并行流是 ForkJoin 框架的一个典型应用,JAVA8 Stream api 中的并行流定义了大量的以 CountedCompleter 为基础的操作. 利用分割 / 合并和周边组件实现了基于 ForkJoin 框架的并行计算调度.

正文完
 0