ForkJoin框架之ForkJoinTask

5次阅读

共计 23791 个字符,预计需要花费 60 分钟才能阅读完成。

前言

在前面的文章 ”CompletableFuture 和响应式编程 ” 中提到了 ForkJoinTask 和 ForkJoinPool, 后者毫无疑问是一个线程池, 前者则是一个类似 FutureTask 经典定义的概念.

官方有一个非常无语的解释:ForkJoinTask 就是运行在 ForkJoinPool 的一个任务抽象,ForkJoinPool 就是运行 ForkJoinTask 的线程池.

ForkJoin 框架包含 ForkJoinTask,ForkJoinWorkerThread,ForkJoinPool 和若干 ForkJoinTask 的子类, 它的核心在于分治和工作窍取, 最大程度利用线程池中的工作线程, 避免忙的忙死, 饿的饿死.

ForkJoinTask 可以理解为类线程但比线程轻量的实体, 在 ForkJoinPool 中运行的少量 ForkJoinWorkerThread 可以持有大量的 ForkJoinTask 和它的子任务.ForkJoinTask 同时也是一个轻量的 Future, 使用时应避免较长阻塞和 io.

ForkJoinTask 在 JAVA8 中应用广泛, 但它是一个抽象类, 它的子类派生了各种用途, 如后续计划单独介绍的 CountedCompleter, 以及若干 JAVA8 中 stream api 定义的与并行流有关的各种操作(ops).

源码

首先看 ForkJoinTask 的签名.

public abstract class ForkJoinTask<V> implements Future<V>, Serializable 

从签名上看,ForkJoinTask 实现了 future, 也可以序列化, 但它不是一个 Runnable 或 Callable.

ForkJoinTask 虽然可以序列化, 但它只对运行前和后敏感, 对于执行过程中不敏感.

先来看 task 的运行字段:

//volatie 修饰的任务状态值, 由 ForkJoinPool 或工作线程修改.
volatile int status; 
static final int DONE_MASK   = 0xf0000000;// 用于屏蔽完成状态位. 
static final int NORMAL      = 0xf0000000;// 表示正常完成, 是负值.
static final int CANCELLED   = 0xc0000000;// 表示被取消, 负值, 且小于 NORMAL
static final int EXCEPTIONAL = 0x80000000;// 异常完成, 负值, 且小于 CANCELLED
static final int SIGNAL      = 0x00010000;// 用于 signal, 必须不小于 1 <<16, 默认为 1 <<16.
static final int SMASK       = 0x0000ffff;// 后十六位的 task 标签

很显然,DONE_MASK 能够过滤掉所有非 NORMAL, 非 CANCELLED, 非 EXCEPTIONAL 的状态, 字段的含义也很直白, 后面的 SIGNAL 和 SMASK 还不明确, 后面再看.

// 标记当前 task 的 completion 状态, 同时根据情况唤醒等待该 task 的线程.
private int setCompletion(int completion) {for (int s;;) {// 开启一个循环, 如果当前 task 的 status 已经是各种完成(小于 0), 则直接返回 status, 这个 status 可能是某一次循环前被其他线程完成.
        if ((s = status) < 0)
            return s;
        // 尝试将原来的 status 设置为它与 completion 按位或的结果.
        if (U.compareAndSwapInt(this, STATUS, s, s | completion)) {if ((s >>> 16) != 0)
                // 此处体现了 SIGNAL 的标记作用, 很明显, 只要 task 完成(包含取消或异常), 或 completion 传入的值不小于 1 <<16,
                // 就可以起到唤醒其他线程的作用.
                synchronized (this) {notifyAll(); }
            //cas 成功, 返回参数中的 completion.
            return completion;
        }
    }
}

前面用注释解释了这个方法的逻辑, 显然该方法是阻塞的, 如果传入的参数不能将 status 设置为负值会如何?

显然, 可能会有至多一次的成功 cas, 并且若满足唤醒的条件, 会尝试去唤醒线程, 甚至可能因为为了唤醒其他线程而被阻塞在 synchonized 代码块外; 也可能没有一次成功的 cas, 直到其他线程成功将 status 置为完成.

//final 修饰, 运行 ForkJoinTask 的核心方法.
final int doExec() {
    int s; boolean completed;
    // 仅未完成的任务会运行, 其他情况会忽略.
    if ((s = status) >= 0) {
        try {
            // 调用 exec
            completed = exec();} catch (Throwable rex) {
            // 发生异常, 用 setExceptionalCompletion 设置结果
            return setExceptionalCompletion(rex);
        }
        if (completed)
            // 正常完成, 调用前面说过的 setCompletion, 参数为 normal, 并将返回值作为结果 s.
            s = setCompletion(NORMAL);
    }
    // 返回 s
    return s;
}

// 记录异常并且在符合条件时传播异常行为
private int setExceptionalCompletion(Throwable ex) {
    // 首先记录异常信息到结果
    int s = recordExceptionalCompletion(ex);
    if ((s & DONE_MASK) == EXCEPTIONAL)
        //status 去除非完成态标志位(只保留前 4 位), 等于 EXCEPTIONAL. 内部传播异常
        internalPropagateException(ex);
    return s;
}
//internalPropagateException 方法是一个空方法, 留给子类实现, 可用于 completer 之间的异常传递
void internalPropagateException(Throwable ex) {}
// 记录异常完成
final int recordExceptionalCompletion(Throwable ex) {
    int s;
    if ((s = status) >= 0) {
        // 只能是异常态的 status 可以记录.
        //hash 值禁止重写, 不使用子类的 hashcode 函数.
        int h = System.identityHashCode(this);
        final ReentrantLock lock = exceptionTableLock;
        // 异常锁, 加锁
        lock.lock();
        try {
            // 抹除脏异常, 后面叙述
            expungeStaleExceptions();
            // 异常表数组.ExceptionNode 后面叙述.
            ExceptionNode[] t = exceptionTable;//exceptionTable 是一个全局的静态常量, 后面叙述
            // 用 hash 值和数组长度进行与运算求一个初始的索引
            int i = h & (t.length - 1);
            for (ExceptionNode e = t[i]; ; e = e.next) {
                // 找到空的索引位, 就创建一个新的 ExceptionNode, 保存 this, 异常对象并退出循环
                if (e == null) {t[i] = new ExceptionNode(this, ex, t[i]);//(1)
                    break;
                }
                if (e.get() == this) // 已设置在相同的索引位置的链表中, 退出循环.//2
                    break;
            // 否则 e 指向 t[i]的 next, 进入下个循环, 直到发现判断包装 this 这个 ForkJoinTask 的 ExceptionNode 已经出现在 t[i]这个链表并 break(2),
            // 或者直到 e 是 null, 意味着 t[i]出发开始的链表并无包装 this 的 ExceptionNode, 则将构建一个新的 ExceptionNode 并置换 t[i],
            // 将原 t[i]置为它的 next(1). 整个遍历判断和置换过程处在锁中进行.
            }
        } finally {lock.unlock();
        }
        // 记录成功, 将当前 task 设置为异常完成.
        s = setCompletion(EXCEPTIONAL);
    }
    return s;
}

//exceptionTable 声明
private static final ExceptionNode[] exceptionTable;// 全局异常 node 表
private static final ReentrantLock exceptionTableLock;// 上面用到的锁, 就是一个普通的可重入锁.
private static final ReferenceQueue<Object> exceptionTableRefQueue;// 变量表引用队列, 后面详述.
private static final int EXCEPTION_MAP_CAPACITY = 32;// 异常表的固定容量, 不大, 只有 32 而且是全局的.

// 初始化在一个静态代码块.
static {exceptionTableLock = new ReentrantLock();
    exceptionTableRefQueue = new ReferenceQueue<Object>();
    exceptionTable = new ExceptionNode[EXCEPTION_MAP_CAPACITY];// 容量
    try {U = sun.misc.Unsafe.getUnsafe();
        Class<?> k = ForkJoinTask.class;
        STATUS = U.objectFieldOffset
            (k.getDeclaredField("status"));
    } catch (Exception e) {throw new Error(e);
    }
}

// 先来看 ExceptionNode 内部类的实现
// 签名, 实现了一个 ForkJoinTask 的弱引用.
static final class ExceptionNode extends WeakReference<ForkJoinTask<?>> {
    final Throwable ex;
    ExceptionNode next;
    final long thrower;  // use id not ref to avoid weak cycles
    final int hashCode;  // store task hashCode before weak ref disappears
    ExceptionNode(ForkJoinTask<?> task, Throwable ex, ExceptionNode next) {super(task, exceptionTableRefQueue);// 指向弱引用的构造函数, 保存引用为 task, 队列为全局的 exceptionTableRefQueue.
        this.ex = ex;// 抛出的异常的引用
        this.next = next;// 数组中的 ExceptionNode 以链表形式存在, 前面分析过, 先入者为后入者的 next
        this.thrower = Thread.currentThread().getId();// 保存抛出异常的线程 id(严格来说是创建了 this 的线程)
        this.hashCode = System.identityHashCode(task);// 哈希码保存关联 task 的哈希值.
    }
}
// 清除掉异常表中的脏数据, 仅在持有全局锁时才可使用. 前面看到在记录新的异常信息时要进行一次清除尝试
private static void expungeStaleExceptions() {
    // 循环条件, 全局 exceptionTableRefQueue 队列不为空, 前面说过 ExceptionNode 是弱引用, 当它被回收时会被放入此队列.
    for (Object x; (x = exceptionTableRefQueue.poll()) != null;) {
        // 从队首依次取出元素.
        if (x instanceof ExceptionNode) {
            // 计算在全局 exceptionTable 中的索引.
            int hashCode = ((ExceptionNode)x).hashCode;
            ExceptionNode[] t = exceptionTable;
            int i = hashCode & (t.length - 1);
            // 取出 node
            ExceptionNode e = t[i];
            ExceptionNode pred = null;
            // 不停遍历, 直到 e 是 null 为止.
            while (e != null) {
                // e 的 next
                ExceptionNode next = e.next;//2
                // x 是队首出队的元素. 它与 e 相等说明找到
                if (e == x) {
                    // e 是一个链表的元素,pred 表示它是否有前置元素
                    if (pred == null)
                        // 无前置元素, 说明 e 在链表首部, 直接将首部元素指向 next 即可.
                        t[i] = next;
                    else
                        // 有前置元素, 说明循环过若干次, 将当前 e 出链表
                        pred.next = next;
                    // 在链表中发现 x 即 break 掉内循环, 继续从 exceptionTableRefQueue 的队首弹出新的元素.
                    break;
                }
                // 只要发现当前 e 不是 x, 准备下一次循环,pred 指向 e.e 指向 next, 进行下一个元素的比较.
                pred = e;
                e = next;
            }
        }
    }
}

到此 doExec(也是每个 ForkJoinTask 的执行核心过程)就此结束.

很明显,ForkJoinTask 的 doExec 负责了核心的执行, 它留下了 exec 方法给子类实现, 而重点负责了后面出现异常情况的处理. 处理的逻辑前面已论述, 在产生异常时尝试将异常存放在全局的 execptionTable 中, 存放的结构为数组 + 链表, 按哈希值指定索引, 每次存放新的异常时, 顺便清理上一次已被 gc 回收的 ExceptionNode. 所有 ForkJoinTask 共享了一个 exceptionTable, 因此必然在有关的几个环节要进行及时的清理. 除了刚刚论述的过程, 还有如下的几处:

前面论述了 recordExceptionalCompletion, 一共有四处使用了 expungeStaleException, 将已回收的 ExceptionNode 从引用队列中清除.

clearExceptionalCompletion 在对一个 ForkJoinTask 重新初始化时使用, 我们在前面提到序列化时说过,ForkJoinTask 的序列化结果只保留了两种情况: 运行前, 运行结束. 重新初始化一个 ForkJoinTask, 就要去除任何中间状态, 包含自身产出的已被回收的异常 node, 而 expungeStaleExceptions 显然也顺便帮助其他 task 清除.

getThrowableException 是查询 task 运行结果时调用, 如一些 get/join 方法, 很明显, 记录这个异常的作用就在于返回给 get/join, 在这一块顺便清理已被回收的 node, 尤其是将自己运行时生成的 node 清除.

helpExpungeStaleExceptions 是提供给 ForkJoinPool 在卸载 worker 时使用, 顺便帮助清理全局异常表.

使用它们的方法稍后再论述, 先来继续看 ForkJoinTask 的源码.

// 内部等待任务完成, 直到完成或超时.
final void internalWait(long timeout) {
    int s;
    //status 小于 0 代表已完成, 直接忽略 wait.
    // 未完成, 则试着加上 SIGNAL 的标记, 令完成任务的线程唤醒这个等待.
    if ((s = status) >= 0 && 
        U.compareAndSwapInt(this, STATUS, s, s | SIGNAL)) {
        // 加锁, 只有一个线程可以进入.
        synchronized (this) {
            // 再次判断未完成. 等待 timeout, 且忽略扰动异常.
            if (status >= 0)
                try {wait(timeout); } catch (InterruptedException ie) { }
            else
                // 已完成则响醒其他等待者.
                notifyAll();}
    }
}

internalWait 方法逻辑很简单, 首先判断是否未完成, 满足未完成, 则将标记位加上 SIGNAL(可能已有别的线程做过), 随后加锁 double check status, 还未完成则等待并释放锁, 若发现已完成, 或在后续被唤醒后发现已完成, 则唤醒其他等待线程. 通过 notifyAll 的方式避免了通知丢失.

同时, 它的使用方法目前只有一个 ForkJoinPool::awaitJoin, 在该方法中使用循环的方式进行 internalWait, 满足了每次按截止时间或周期进行等待, 同时也顺便解决了虚假唤醒.

继续看 externalAwaitDone 函数. 它体现了 ForkJoin 框架的一个核心: 外部帮助.

// 外部线程等待一个 common 池中的任务完成.
private int externalAwaitDone() {int s = ((this instanceof CountedCompleter) ? 
    // 当前 task 是一个 CountedCompleter, 尝试使用 common ForkJoinPool 去外部帮助完成, 并将完成状态返回.
             ForkJoinPool.common.externalHelpComplete((CountedCompleter<?>)this, 0) :
            // 当前 task 不是 CountedCompleter, 则调用 common pool 尝试外部弹出该任务并进行执行,
            //status 赋值 doExec 函数的结果, 若弹出失败 (其他线程先行弹出) 赋 0.
             ForkJoinPool.common.tryExternalUnpush(this) ? doExec() : 0);
    if (s >= 0 && (s = status) >= 0) {// 检查上一步的结果, 即外部使用 common 池弹出并执行的结果(不是 CountedCompleter 的情况), 或外部尝试帮助 CountedCompleter 完成的结果
        //status 大于 0 表示尝试帮助完成失败.
        // 扰动标识, 初值 false
        boolean interrupted = false;
        do {
            // 循环尝试, 先给 status 标记 SIGNAL 标识, 便于后续唤醒操作.
            if (U.compareAndSwapInt(this, STATUS, s, s | SIGNAL)) {synchronized (this) {if (status >= 0) {
                        try {
                            //CAS 成功, 进同步块发现 double check 未完成, 则等待.
                            wait(0L);
                        } catch (InterruptedException ie) {
                            // 若在等待过程中发生了扰动, 不停止等待, 标记扰动.
                            interrupted = true;
                        }
                    }
                    else
                        // 进同步块发现已完成, 则唤醒所有等待线程.
                        notifyAll();}
            }
        } while ((s = status) >= 0);// 循环条件,task 未完成.
        if (interrupted)
            // 循环结束, 若循环中间曾有扰动, 则中断当前线程.
            Thread.currentThread().interrupt();
    }
    // 返回 status
    return s;
}

externalAwaitDone 的逻辑不复杂, 在当前 task 为 ForkJoinPool.common 的情况下可以在外部进行等待和尝试帮助完成. 方法会首先根据 ForkJoinTask 的类型进行尝试帮助, 并返回当前的 status, 若发现未完成, 则进入下面的等待唤醒逻辑. 该方法的调用者为非 worker 线程.

相似的方法:externalInterruptibleAwaitDone

private int externalInterruptibleAwaitDone() throws InterruptedException {
    int s;
    // 不同于 externalAwaitDone, 入口处发现当前线程已中断, 则立即抛出中断异常.
    if (Thread.interrupted())
        throw new InterruptedException();
    if ((s = status) >= 0 &&
        (s = ((this instanceof CountedCompleter) ?
              ForkJoinPool.common.externalHelpComplete((CountedCompleter<?>)this, 0) :
              ForkJoinPool.common.tryExternalUnpush(this) ? doExec() :
              0)) >= 0) {while ((s = status) >= 0) {if (U.compareAndSwapInt(this, STATUS, s, s | SIGNAL)) {synchronized (this) {if (status >= 0)
                        //wait 时也不 catch 中断异常, 发生即抛出.
                        wait(0L);
                    else
                        notifyAll();}
            }
        }
    }
    return s;
}

externalInterruptibleAwaitDone 的逻辑与 externalAwaitDone 相似, 只是对中断异常的态度为抛, 后者为 catch.

它们的使用点,externalAwaitDone 为 doJoin 或 doInvoke 方法调用,externalInterruptibleAwaitDone 为 get 方法调用, 很明显,join 操作不可扰动,get 则可以扰动.

下面来看看 doJoin 和 doInvoke

//join 的核心方法
private int doJoin() {
    int s; Thread t; ForkJoinWorkerThread wt; ForkJoinPool.WorkQueue w;
    // 已完成, 返回 status, 未完成再尝试后续
    return (s = status) < 0 ? s :
        // 未完成, 当前线程是 ForkJoinWorkerThread, 从该线程中取出 workQueue, 并尝试将
        // 当前 task 出队然后执行, 执行的结果是完成则返回状态, 否则使用当线程池所在的 ForkJoinPool 的 awaitJoin 方法等待.
        ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) ?
        (w = (wt = (ForkJoinWorkerThread)t).workQueue).
        tryUnpush(this) && (s = doExec()) < 0 ? s :
        wt.pool.awaitJoin(w, this, 0L) :
        // 当前线程不是 ForkJoinWorkerThread, 调用前面说的 externalAwaitDone 方法.
        externalAwaitDone();}

//invoke 的核心方法
private int doInvoke() {
    int s; Thread t; ForkJoinWorkerThread wt;
    // 先尝试本线程执行, 不成功才走后续流程
    return (s = doExec()) < 0 ? s :
        ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) ?
        // 与上一个方法基本相同, 但在当前线程是 ForkJoinWorkerThread 时不尝试将该 task 移除栈并执行, 而是等
        (wt = (ForkJoinWorkerThread)t).pool.
        awaitJoin(wt.workQueue, this, 0L) :
        externalAwaitDone();}

到此终于可以看一些公有对外方法了. 有了前面的基础, 再看 get,join,invoke 等方法非常简单.

//get 方法还有 get(long time)的变种.
public final V get() throws InterruptedException, ExecutionException {int s = (Thread.currentThread() instanceof ForkJoinWorkerThread) ?
        // 当前线程是 ForkJoinWorkerThread 则调用前面提过的 doJoin 方法.
        // 否则调用前述 externalInterruptibleAwaitDone
        doJoin() : externalInterruptibleAwaitDone();
    Throwable ex;
    if ((s &= DONE_MASK) == CANCELLED)
        // 异常处理, 取消的任务, 抛出 CancellationException.
        throw new CancellationException();
    if (s == EXCEPTIONAL && (ex = getThrowableException()) != null)
        // 异常处理, 调用 getThrowableException 获取异常, 封进 ExecutionException.
        throw new ExecutionException(ex);
    // 无异常处理, 返回原始结果.
    return getRawResult();}
//getRawResult 默认为一个抽象实现, 在 ForkJoinTask 中, 并未保存该结果的字段.
 public abstract V getRawResult();

//getThrowableException 方法
private Throwable getThrowableException() {
    // 不是异常标识, 直接返回 null, 从方法名的字面意思看, 要返回一个可抛出的异常.
    if ((status & DONE_MASK) != EXCEPTIONAL)
        return null;
    // 系统哈希码来定位 ExceptionNode
    int h = System.identityHashCode(this);
    ExceptionNode e;
    final ReentrantLock lock = exceptionTableLock;
    // 加异常表全局锁
    lock.lock();
    try {
        // 先清理已被回收的异常 node, 前面已述.
        expungeStaleExceptions();
        ExceptionNode[] t = exceptionTable;
        e = t[h & (t.length - 1)];
        // 循环找出 this 匹配的异常 node
        while (e != null && e.get() != this)
            e = e.next;
    } finally {lock.unlock();
    }
    Throwable ex;
    // 前面找不出异常 node 或异常 node 中存放的异常为 null, 则返回 null
    if (e == null || (ex = e.ex) == null)
        return null;
    if (e.thrower != Thread.currentThread().getId()) {
        // 不是当前线程抛出的异常.
        Class<? extends Throwable> ec = ex.getClass();
        try {
            Constructor<?> noArgCtor = null;// 该异常的无参构造器
            Constructor<?>[] cs = ec.getConstructors();// 该异常类公有构造器
            for (int i = 0; i < cs.length; ++i) {Constructor<?> c = cs[i];
                Class<?>[] ps = c.getParameterTypes();
                if (ps.length == 0)
                    // 构建器参数列表长度 0 说明存在无参构造器, 存放.
                    noArgCtor = c;
                else if (ps.length == 1 && ps[0] == Throwable.class) {
                    // 发现有参构造器且参数长度 1 且第一个参数类型是 Throwable, 说明可以存放 cause.
                    // 反射将前面取出的 ex 作为参数, 反射调用该构造器创建一个要抛出的 Throwable.
                    Throwable wx = (Throwable)c.newInstance(ex);
                    // 反射失败, 异常会被 catch, 返回 ex, 否则返回 wx.
                    return (wx == null) ? ex : wx;
                }
            }
            if (noArgCtor != null) {
                // 在尝试了寻找有参无参构造器, 并发现只存在无参构造器的情况, 用无参构造器初始化异常.
                Throwable wx = (Throwable)(noArgCtor.newInstance());
                if (wx != null) {
                    // 将 ex 设置为它的 cause 并返回它的实例.
                    wx.initCause(ex);
                    return wx;
                }
            }
        } catch (Exception ignore) {// 此方法不可抛出异常, 一定要成功返回.}
    }
    // 有参无参均未成功, 返回找到的异常.
    return ex;
}

//join 公有方法
public final V join() {
    int s;
    if ((s = doJoin() & DONE_MASK) != NORMAL)
        // 调用 doJoin 方法阻塞等待的结果不是 NORMAL, 说明有异常或取消. 报告异常.
        reportException(s);
    // 等于 NORMAL, 正常执行完毕, 返回原始结果.
    return getRawResult();}
// 报告异常, 可在前一步判断执行 status 是否为异常态, 然后获取并重抛异常.
private void reportException(int s) {
    // 参数 s 必须用 DONE_MASK 处理掉前 4 位以后的位.
    if (s == CANCELLED)
        // 传入的状态码等于取消, 抛出取消异常.
        throw new CancellationException();
    if (s == EXCEPTIONAL)
        // 使用前面的 getThrowableException 方法获取异常并重新抛出.
        rethrow(getThrowableException());
}

//invoke 公有方法.
public final V invoke() {
    int s;
    // 先尝试执行
    if ((s = doInvoke() & DONE_MASK) != NORMAL)
        //doInvoke 方法的结果 status 只保留完成态位表示非 NORMAL, 则报告异常.
        reportException(s);
    // 正常完成, 返回原始结果.
    return getRawResult();}

终于, 读到此处的读者将关键的方法线串了起来, 前述的所有内部方法, 常量和变量与公有接口的关系已经明了.

很显然,ForkJoinTask 是个抽象类, 且它并未保存任务的完成结果, 也不负责这个结果的处理, 但声明并约束了返回结果的抽象方法 getRawResult 供子类实现.

因此,ForkJoinTask 的自身关注任务的完成 / 异常 / 未完成, 子类关注这个结果的处理.

每当获取到任务的执行状态时,ForkJoinTask 可根据 status 来判断是否是异常 / 正常完成, 并进入相应的处理逻辑, 最终使用子类实现的方法完成一个闭环.

如果理解为将 ForkJoinTask 和子类的有关代码合并起来, 在结果 / 完成状态 / 异常信息这一块, 相当于同时有三个 part 在合作.

第一个 part:status 字段, 它同时表示了未完成 / 正常完成 / 取消 / 异常完成等状态, 也同时告诉有关等待线程是否要唤醒基本线程(每个线程等待前会设置 SIGNAL), 同时留出了后面 16 位对付其他情况.

第二个 part:result, 在 ForkJoinTask 见不到它, 也没有相应的字段, 子类也未必需要提供这个 result 字段, 前面提到的 CountedCompleter 就没有提供这个 result, 它的 getRawResult 会固定返回 null. 但是 CountedCompleter 可以继承子类并实现这个 result 的保存与返回(道格大神在注释中举出了若干典型代码例子), 在 JAVA8 中,stream api 中的并行流也会保存每一步的计算结果, 并对结果进行合并.

第三个 part: 异常. 在 ForkJoinTask 中已经完成了所有异常处理流程和执行流程的定义, 重点在于异常的存放, 它是由 ForkJoinTask 的类变量进行存放的, 结构为数组 + 链表, 且元素利用了弱引用, 借 gc 帮助清除掉已经被回收的 ExceptionNode, 显然在 gc 之前必须得到使用. 而异常随时可以发生并进行 record 入列, 但相应的能消费掉这个异常的只有相应的外部的 get,join,invoke 等方法或者内部扩展了 exec()等方式, 得到其他线程执行的 task 异常结果的情况. 巧妙的是, 只有外部调用者调用 (get,invoke,join) 时, 这个异常信息才足够重要, 需要 rethrow 出去并保存关键的堆栈信息; 而内部线程在访问一些非自身执行的任务时, 往往只需要 status 判断是否异常即可, 在 exec()中 fork 新任务的, 也往往必须立即 join 这些新的子任务, 这就保证了能够及时得到子任务中的异常堆栈(即使拿不到堆栈也知道它失败了).

经过前面的论述,ForkJoinTask 的执行和异常处理已经基本论结, 但是, 一个 ForkJoinTask 在创建之后是如何运行的? 显然, 它不是一个 Runnable, 也不是 Callable, 不能直接 submit 或 execute 到普通的线程池.

临时切换到 ForkJoinPool 的代码, 前面提到过,ForkJoinTask 的官方定义就是可以运行在 ForkJoinPool 中的 task.

//ForkJoinPool 代码,submit 一个 ForkJoinTask 到 ForkJoinPool, 并将该 task 自身返回.
// 拿到返回的 task, 我们就可以进行前述的 get 方法了.
public <T> ForkJoinTask<T> submit(ForkJoinTask<T> task) {if (task == null)
        throw new NullPointerException();
    externalPush(task);
    return task;
}
//execute, 不返回. 类似普通线程池提交一个 runnable 的行为.
public void execute(ForkJoinTask<?> task) {if (task == null)
        throw new NullPointerException();
    externalPush(task);
}

显然, 若要使用一个自建的 ForkJoinPool, 可以使用 execute 或 submit 函数提交入池, 然后用前述的 get 方法和变种方法进行. 这是一种运行 task 的方式.

前面论述过的 invoke 方法会先去先去尝试本地执行, 然后才去等待, 故我们自己 new 一个 ForkJoinTask, 一样可以通过 invoke 直接执行, 这是第二种运行 task 的方式.

前面论述的 join 方法在某种情况下也是一种 task 的运行方式, 在当前线程是 ForkJoinWorkerThread 时, 会去尝试将 task 出队并 doExec, 也就是会先用本线程执行一次, 不成功才干等, 非 ForkJoinWorkerThread 则直接干等了. 显然我们可以自己构建一个 ForkJoinWorkerThread 并去 join, 这时会将任务出队并执行 (但存在一个问题: 什么时候入队). 且出队后若未执行成功, 则 awaitJoin(参考 ForkJoinPool::awaitJoin), 此时因任务已出队, 不会被窃取或帮助(在 awaitJoin 中会有 helpStealer, 但其实任务是当前线程自己 ” 偷走 ” 了), 似乎完全要靠自己了. 但并不表示 ForkJoinTask 子类无法获取这个已出队的任务, 比如 CountedCompleter 使用时, 可以在 compute 中新生成的 Completer 时, 将源 CountedCompleter(ForkJoinTask 的子类) 作为新生成的 CountedCountedCompleter 的 completer(该子类中的一个字段), 这样, 若有一个 ForkJoinWorkerThread 窃取了这个新生成的 CountedCompleter, 可以通过 completer 链表找到先前被出队的 CountedCompleter(ForkJoinTask). 关于 CountedCompleter 单独文章详述.

除此之外呢? 包含前面提到的使用 join 操作不是 ForkJoinWorkerThread 调用的情况, 不使用 ForkJoinPool 的 submit execute 入池, 如何能让一个 ForkJoinTask 在将来执行? 我们来看后面的方法.

//fork 方法, 将当前任务入池.
public final ForkJoinTask<V> fork() {
    Thread t;
    if ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread)
        // 如果当前线程是 ForkJoinWorkerThread, 将任务压入该线程的任务队列.
        ((ForkJoinWorkerThread)t).workQueue.push(this);
    else
        // 否则调用 common 池的 externalPush 方法入队.
        ForkJoinPool.common.externalPush(this);
    return this;
}

显然, 我们还可以通过对一个 ForkJoinTask 进行 fork 方法入池, 入哪个池完全取决于当前线程的类型. 这是第四种让任务能被运行的方式.

同样, 我们也看到了第五种方式,ForkJoinPool.common 其实就是一个常量保存的 ForkJoinPool, 它能够调用 externalPush, 我们自然也可以直接 new 一个 ForkJoinPool, 然后将当前 task 进行 externalPush, 字面意思外部压入. 这种办法, 非 ForkJoinWorkerThread 也能将任务提交到非 common 的 ForkJoinPool.

从名字来看,ForkJoinTask 似乎已经说明了一切, 按照官方的注释也是如此. 对一个 task, 先 Fork 压队, 再 Join 等待执行结果, 这是一个 ForkJoinTask 的执行周期闭环(但不要简单理解为生命周期, 前面提到过, 任务可以被重新初始化, 而且重新初始化时还会清空 ExceptionNode 数组上的已回收成员).

到此为止,ForkJoinTask 的核心函数和 api 已经基本了然, 其它同类型的方法以及周边的方法均不难理解, 如 invokeAll 的各种变种. 下面来看一些 ” 周边 ” 类型的函数. 有前述的基础, 它们很好理解.

// 取消一个任务的执行, 直接将 status 设置成 CANCELLED, 设置后判断该 status 是否为 CANCELLED, 是则 true 否则 false.
public boolean cancel(boolean mayInterruptIfRunning) {return (setCompletion(CANCELLED) & DONE_MASK) == CANCELLED;
}

// 判断是否完成,status 小于 0 代表正常完成 / 异常完成 / 取消, 很好理解.
public final boolean isDone() {return status < 0;}

// 判断当前任务是否取消.
public final boolean isCancelled() {
    //status 前 4 位
    return (status & DONE_MASK) == CANCELLED;
}
public final boolean isCompletedAbnormally() {
    // 是否为异常完成, 前面说过,CANCELLED 和 EXCEPTIONAL 均小于 NORMAL
    return status < NORMAL;
}
// 是否正常完成.
public final boolean isCompletedNormally() {
    // 完成态位等于 NORMAL
    return (status & DONE_MASK) == NORMAL;
}
// 获取异常.
 public final Throwable getException() {
    int s = status & DONE_MASK;
    // 当为正常完成或未完成时, 返回 null.
    return ((s >= NORMAL)    ? null :
            // 是取消时, 新建一个取消异常.
            (s == CANCELLED) ? new CancellationException() :
            // 不是取消, 参考前面提到的 getThrowableException.
            getThrowableException());
}
// 使用异常完成任务.
 public void completeExceptionally(Throwable ex) {
    // 参考前述的 setExceptionalCompletion,
    //ex 已经是运行时异常或者 Error, 直接使用 ex 完成, 若是受检异常, 包装成运行时异常.
    setExceptionalCompletion((ex instanceof RuntimeException) ||
                             (ex instanceof Error) ? ex :
                            new RuntimeException(ex));
   }
// 使用 value 完成任务.
public void complete(V value) {
    try {
        // 设置原始结果, 它是一个空方法. 前面说过 ForkJoinTask 没有维护 result 之类的结果字段, 子类可自行发挥.
        setRawResult(value);
    } catch (Throwable rex) {
        // 前述步骤出现异常, 就用异常方式完成.
        setExceptionalCompletion(rex);
        return;
    }
    // 前面的结果执行完, 标记当前为完成.
    setCompletion(NORMAL);
}
// 安静完成任务. 直接用 NORMAL setCompletion, 没什么好说的.
public final void quietlyComplete() {setCompletion(NORMAL);
}
 /**
 * Joins this task, without returning its result or throwing its
 * exception. This method may be useful when processing
 * collections of tasks when some have been cancelled or otherwise
 * known to have aborted.
 */
// 安静 join, 它不会返回 result 也不会抛出异常. 处理集合任务时, 如果需要所有任务都被执行而不是一个执行出错 (取消) 其他也跟着出错的情况下,
// 很明显适用, 这不同于 invokeAll, 静态方法 invokeAll 或 invoke(ForkJoinTask,ForkJoinTask)会在任何一个任务出现异常后取消执行并抛出.
public final void quietlyJoin() {doJoin();
}

// 安静执行一次, 不返回结果不抛出异常, 没什么好说的.
public final void quietlyInvoke() {doInvoke();
}
// 重新初台化当前 task
public void reinitialize() {if ((status & DONE_MASK) == EXCEPTIONAL)
        // 如果当前任务是异常完成的, 清除异常. 该方法参考前面的论述.
        clearExceptionalCompletion();
    else
        // 否则重置 status 为 0.
        status = 0;
}
// 反 fork.
public boolean tryUnfork() {
    Thread t;
    // 当前线程是 ForkJoinWorkerThread, 从它的队列尝试移除.
    return (((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) ?
            ((ForkJoinWorkerThread)t).workQueue.tryUnpush(this) :
            // 当前线程不是 ForkJoinWorkerThread, 用 common 池外部移除.
            ForkJoinPool.common.tryExternalUnpush(this));
}

上面是一些简单的周边方法, 大多并不需要再论述了,unfork 方法很明显在某些场景下不会成功, 显然, 当一个任务刚刚入队并未进行后续操作时, 很可能成功. 按前面所述, 当对一个任务进行 join 时, 可能会成功的弹出当前任务并执行, 此时不可能再次弹出; 当一个任务被其他线程窃取会被它本身执行的也不会弹出.

再来看一些老朋友, 在前面的文章 ”CompletableFuture 和响应式编程 ” 一文中, 作者曾着重强调过它将每个要执行的动作进行压栈(未能立即执行的情况), 而栈中的元素 Completion 即是 ForkJoinTask 的子类, 而标记该 Completion 是否被 claim 的方法和周边方法如下:

// 获取 ForkJoinTask 的标记, 返回结果为 short 型
public final short getForkJoinTaskTag() {
    //status 的后 16 位
    return (short)status;
}


// 原子设置任务的标记位.
public final short setForkJoinTaskTag(short tag) {for (int s;;) {
        // 不停循环地尝试将 status 的后 16 位设置为 tag.
        if (U.compareAndSwapInt(this, STATUS, s = status,
                                // 替换的结果, 前 16 位为原 status 的前 16 位, 后 16 位为 tag.
                                (s & ~SMASK) | (tag & SMASK)))
            // 返回被换掉的 status 的后 16 位.
            return (short)s;
    }
}


// 循环尝试原子设置标记位为 tag, 前提是原来的标记位等于 e, 成功 true 失败 false
public final boolean compareAndSetForkJoinTaskTag(short e, short tag) {for (int s;;) {if ((short)(s = status) != e)
            // 如果某一次循环的原标记位不是 e, 则返回 false.
            return false;
        // 同上个方法
        if (U.compareAndSwapInt(this, STATUS, s,
                                (s & ~SMASK) | (tag & SMASK)))
            return true;
    }
}

还记得 CompletableFuture 在异步执行 Completion 时要先 claim 吗?claim 方法中, 会尝试设置这个标记位. 这是截止 jdk8 中 CompletableFuture 使用到 ForkJoinTask 的功能.

目前来看, 在 CompletableFuture 的内部实现 Completion 还没有使用到 ForkJoinTask 的其他属性, 比如放入一个 ForkJoinPool 执行(没有任何前面总结的调用, 比如用 ForkJoinPool 的 push,execute,submit 等, 也没有 fork 到 common 池). 但是很明显, 道格大神令它继承自 ForkJoinTask 不可能纯粹只为了使用区区一个标记位, 试想一下, 在如此友好支持响应式编程的 CompletableFuture 中传入的每一个 action 都可以生成若干新的 action, 那么 CompletableFuture 负责将这些 action 封装成 Completion 放入 ForkJoinPool 执行, 将最大化利用到 ForkJoin 框架的工作窃取和外部帮助的功效, 强力结合分治思想, 这将是多么优雅的设计. 或者在 jdk9-12 中已经出现了相应的 Completion 实现(尽管作者写过 JAVA9-12, 遗憾的是也没有去翻它们的源码).

另外, 尽管 Completion 的众多子类也没有 result 之类的表示结果的字段, 但它的一些子类通过封装, 实际上间接地将这个 Completion 所引用的 dep 的 result 作为了自己的 ”result”, 当然,getRawResult 依旧是 null, 但是理念却是相通的.

以上是 ForkJoinTask 的部分核心源码, 除了上述的源码外, 还有一些同属于 ForkJoinTask 的核心源码部分, 比如其他的 public 方法(参考 join fork invoke 即可), 一些利用 ForkJoinPool 的实现, 要深入了解 ForkJoinPool 才能了解的方法, 一些不太难的静态方法等, 这些没有必要论述了.

除了核心源码外,ForkJoinTask 也提供了对 Runnable,Callable 的适配器实现, 这块很好理解, 简单看一看.

// 对 Runnable 的实现, 如果在 ForkJoinPool 中提交一个 runnable, 会用它封装成 ForkJoinTask
static final class AdaptedRunnable<T> extends ForkJoinTask<T>
    implements RunnableFuture<T> {
    final Runnable runnable;
    T result;
    AdaptedRunnable(Runnable runnable, T result) {
        // 不能没有 runnable
        if (runnable == null) throw new NullPointerException();
        this.runnable = runnable;
        // 对 runnable 做适配器时, 可以提交将结果传入, 并设置为当前 ForkJoinTask 子类的 result.
        // 前面说过,ForkJoinTask 不以 result 作为完成标记, 判断一个任务是否完成或异常, 使用 status 足以,
        // 返回的结果才使用 result.
        this.result = result; 
    }
    public final T getRawResult() { return result;}
    public final void setRawResult(T v) {result = v;}
    // 前面说过提交入池的 ForkJoinTask 最终会运行 doExec, 而它会调用 exec, 此处会调用 run.
    public final boolean exec() { runnable.run(); return true; }
    public final void run() { invoke(); }
    private static final long serialVersionUID = 5232453952276885070L;// 序列化用
}

// 无结果的 runnable 适配器
static final class AdaptedRunnableAction extends ForkJoinTask<Void>
    implements RunnableFuture<Void> {
    final Runnable runnable;
    AdaptedRunnableAction(Runnable runnable) {if (runnable == null) throw new NullPointerException();
        this.runnable = runnable;
    }
    // 区别就是 result 固定为 null, 也不能 set
    public final Void getRawResult() { return null;}
    public final void setRawResult(Void v) { }
    public final boolean exec() { runnable.run(); return true; }
    public final void run() { invoke(); }
    private static final long serialVersionUID = 5232453952276885070L;
}


// 对 runnable 的适配器, 但强制池中的工作线程在执行任务发现异常时抛出
static final class RunnableExecuteAction extends ForkJoinTask<Void> {
    final Runnable runnable;
    RunnableExecuteAction(Runnable runnable) {if (runnable == null) throw new NullPointerException();
        this.runnable = runnable;
    }
    // 默认 null 结果,set 也是空实现
    public final Void getRawResult() { return null;}
    public final void setRawResult(Void v) { }
    public final boolean exec() { runnable.run(); return true; }
    void internalPropagateException(Throwable ex) {
        // 前面说过 doExec 会被执行, 它会调 exec 并 catch, 在 catch 块中设置当前任务为异常完成态,
        // 然后调用 internalPropagateException 方法, 而在 ForkJoinTask 中默认为空实现.
        // 此处将异常重新抛出, 将造成 worker 线程抛出异常.
        rethrow(ex);
    }
    private static final long serialVersionUID = 5232453952276885070L;
}


// 对 callable 的适配器, 当将 callable 提交至 ForkJoinPool 时使用.
static final class AdaptedCallable<T> extends ForkJoinTask<T>
    implements RunnableFuture<T> {
    final Callable<? extends T> callable;
    T result;
    AdaptedCallable(Callable<? extends T> callable) {if (callable == null) throw new NullPointerException();
        this.callable = callable;
    }
    // 字段中有一个 result, 直接使用它返回.
    public final T getRawResult() { return result;}
    //result 可外部直接设置.
    public final void setRawResult(T v) {result = v;}
    public final boolean exec() {
        try {
            // 默认的 result 用 call 函数设置.
            result = callable.call();
            return true;
        
        } catch (Error err) {
            //catch 住 Error, 抛出
            throw err;
        } catch (RuntimeException rex) {
            //catch 住运行时异常, 抛出
            throw rex;
        } catch (Exception ex) {
            //catch 住受检异常, 包装成运行时异常抛出.
            throw new RuntimeException(ex);
        }
    }
    //run 方法一样只是调用 invoke, 进而调用 doExec.
    public final void run() { invoke(); }
    private static final long serialVersionUID = 2838392045355241008L;
}

//runnable 生成适配器的工具方法
public static ForkJoinTask<?> adapt(Runnable runnable) {return new AdaptedRunnableAction(runnable);
}

// 指定结果设置 runnable 的适配器工具方法
public static <T> ForkJoinTask<T> adapt(Runnable runnable, T result) {return new AdaptedRunnable<T>(runnable, result);
}

// 对 callable 生成适配器的方法.
public static <T> ForkJoinTask<T> adapt(Callable<? extends T> callable) {return new AdaptedCallable<T>(callable);
}

以上的代码都不复杂, 只要熟悉了 ForkJoinTask 的本身代码结构, 对于这一块了解非常容易, 这也间接说明了 ForkJoinPool 中是如何处理 Runnable 和 Callable 的(因为 ForkJoinPool 本身也是一种线程池, 可以接受提交 Callable 和 Runnable).

将 runnable 提交到 pool 时, 可以指定 result, 也可以不指定, 也可以用 submit 或 execute 方法区分异常处理行为,ForkJoinPool 会自行选择相应的适配器.

将 callable 提交到 pool 时,pool 会选择对 callable 的适配器, 它的结果将为 task 的结果, 它的异常将为 task 的异常.

到此为止,ForkJoinTask 的源码分析完成.

后语

本文详细分析了 ForkJoinTask 的源码, 并解释了前文 CompletableFuture 中 Completion 与它的关联, 以及分析了 Completion 继承自 ForkJoinTask 目前已带来的功能利用 (tag) 和将来可能增加的功用(一个 Completion 产生若干多个 Completion 并在 ForkJoinPool 中运行, 还支持工作窃取).

同时本文也对 ForkJoinPool 和 ForkJoinWorkerThread, 以及 CountedCompleter 和 Stream api 中的并行流进行了略微的描述.

在文章的最后, 或许有一些新手读者会好奇, 我们究竟什么时候会使用 ForkJoinTask?

首先, 如果你在项目中大肆使用了流式计算, 并使用了并行流, 那么你已经在使用了.

前面提过, 官方解释 ForkJoinTask 可以视作比线程轻量许多的实体, 也是轻量的 Future. 结合在源码中时不时出来秀存在感的 ForkJoinWorkerThread, 显然它就是据说比普通线程轻量一些的线程, 在前面的源码中可以看出, 它维护了一组任务的队列, 每个线程负责完成队列中的任务, 也可以偷其他线程的任务, 甚至池外的线程都可以时不时地来个 join, 顺便帮助出队执行任务.

显然, 对于重计算, 轻 io, 轻阻塞的任务, 适合使用 ForkJoinPool, 也就使用了 ForkJoinTask, 你不会认为它可以提交 runnable 和 callable, 就可以不用 ForkJoinTask 了吧? 前面的适配器 ForkJoinPool 在这种情况下必用的, 可以去翻相应的源码.

本章没有去详述 CountedCompleter, 但前面论述时说过, 你可以在 exec()中将一个计算复杂的任务拆解为小的子任务, 然后将子任务入池执行, 父任务合并子任务的结果. 这种分治的算法此前基本是在单线程模式下运行, 使用 ForkJoinTask, 则可以将这种计算交给一个 ForkJoinPool 中的所有线程并行执行.

正文完
 0