前言

在前面的文章"CompletableFuture和响应式编程"中提到了ForkJoinTask和ForkJoinPool,后者毫无疑问是一个线程池,前者则是一个类似FutureTask经典定义的概念.

官方有一个非常无语的解释:ForkJoinTask就是运行在ForkJoinPool的一个任务抽象,ForkJoinPool就是运行ForkJoinTask的线程池.

ForkJoin框架包含ForkJoinTask,ForkJoinWorkerThread,ForkJoinPool和若干ForkJoinTask的子类,它的核心在于分治和工作窍取,最大程度利用线程池中的工作线程,避免忙的忙死,饿的饿死.

ForkJoinTask可以理解为类线程但比线程轻量的实体,在ForkJoinPool中运行的少量ForkJoinWorkerThread可以持有大量的ForkJoinTask和它的子任务.ForkJoinTask同时也是一个轻量的Future,使用时应避免较长阻塞和io.

ForkJoinTask在JAVA8中应用广泛,但它是一个抽象类,它的子类派生了各种用途,如后续计划单独介绍的CountedCompleter,以及若干JAVA8中stream api定义的与并行流有关的各种操作(ops).

源码

首先看ForkJoinTask的签名.

public abstract class ForkJoinTask<V> implements Future<V>, Serializable 

从签名上看,ForkJoinTask实现了future,也可以序列化,但它不是一个Runnable或Callable.

ForkJoinTask虽然可以序列化,但它只对运行前和后敏感,对于执行过程中不敏感.

先来看task的运行字段:

//volatie修饰的任务状态值,由ForkJoinPool或工作线程修改.volatile int status; static final int DONE_MASK   = 0xf0000000;//用于屏蔽完成状态位. static final int NORMAL      = 0xf0000000;//表示正常完成,是负值.static final int CANCELLED   = 0xc0000000;//表示被取消,负值,且小于NORMALstatic final int EXCEPTIONAL = 0x80000000;//异常完成,负值,且小于CANCELLEDstatic final int SIGNAL      = 0x00010000;//用于signal,必须不小于1<<16,默认为1<<16.static final int SMASK       = 0x0000ffff;//后十六位的task标签

很显然,DONE_MASK能够过滤掉所有非NORMAL,非CANCELLED,非EXCEPTIONAL的状态,字段的含义也很直白,后面的SIGNAL和SMASK还不明确,后面再看.

//标记当前task的completion状态,同时根据情况唤醒等待该task的线程.private int setCompletion(int completion) {    for (int s;;) {        //开启一个循环,如果当前task的status已经是各种完成(小于0),则直接返回status,这个status可能是某一次循环前被其他线程完成.        if ((s = status) < 0)            return s;        //尝试将原来的status设置为它与completion按位或的结果.        if (U.compareAndSwapInt(this, STATUS, s, s | completion)) {            if ((s >>> 16) != 0)                //此处体现了SIGNAL的标记作用,很明显,只要task完成(包含取消或异常),或completion传入的值不小于1<<16,                //就可以起到唤醒其他线程的作用.                synchronized (this) { notifyAll(); }            //cas成功,返回参数中的completion.            return completion;        }    }}

前面用注释解释了这个方法的逻辑,显然该方法是阻塞的,如果传入的参数不能将status设置为负值会如何?

显然,可能会有至多一次的成功cas,并且若满足唤醒的条件,会尝试去唤醒线程,甚至可能因为为了唤醒其他线程而被阻塞在synchonized代码块外;也可能没有一次成功的cas,直到其他线程成功将status置为完成.

//final修饰,运行ForkJoinTask的核心方法.final int doExec() {    int s; boolean completed;    //仅未完成的任务会运行,其他情况会忽略.    if ((s = status) >= 0) {        try {            //调用exec            completed = exec();        } catch (Throwable rex) {            //发生异常,用setExceptionalCompletion设置结果            return setExceptionalCompletion(rex);        }        if (completed)            //正常完成,调用前面说过的setCompletion,参数为normal,并将返回值作为结果s.            s = setCompletion(NORMAL);    }    //返回s    return s;}//记录异常并且在符合条件时传播异常行为private int setExceptionalCompletion(Throwable ex) {    //首先记录异常信息到结果    int s = recordExceptionalCompletion(ex);    if ((s & DONE_MASK) == EXCEPTIONAL)        //status去除非完成态标志位(只保留前4位),等于EXCEPTIONAL.内部传播异常        internalPropagateException(ex);    return s;}//internalPropagateException方法是一个空方法,留给子类实现,可用于completer之间的异常传递void internalPropagateException(Throwable ex) {}//记录异常完成final int recordExceptionalCompletion(Throwable ex) {    int s;    if ((s = status) >= 0) {        //只能是异常态的status可以记录.        //hash值禁止重写,不使用子类的hashcode函数.        int h = System.identityHashCode(this);        final ReentrantLock lock = exceptionTableLock;        //异常锁,加锁        lock.lock();        try {            //抹除脏异常,后面叙述            expungeStaleExceptions();            //异常表数组.ExceptionNode后面叙述.            ExceptionNode[] t = exceptionTable;//exceptionTable是一个全局的静态常量,后面叙述            //用hash值和数组长度进行与运算求一个初始的索引            int i = h & (t.length - 1);            for (ExceptionNode e = t[i]; ; e = e.next) {                //找到空的索引位,就创建一个新的ExceptionNode,保存this,异常对象并退出循环                if (e == null) {                    t[i] = new ExceptionNode(this, ex, t[i]);//(1)                    break;                }                if (e.get() == this) //已设置在相同的索引位置的链表中,退出循环.//2                    break;            //否则e指向t[i]的next,进入下个循环,直到发现判断包装this这个ForkJoinTask的ExceptionNode已经出现在t[i]这个链表并break(2),            //或者直到e是null,意味着t[i]出发开始的链表并无包装this的ExceptionNode,则将构建一个新的ExceptionNode并置换t[i],            //将原t[i]置为它的next(1).整个遍历判断和置换过程处在锁中进行.            }        } finally {            lock.unlock();        }        //记录成功,将当前task设置为异常完成.        s = setCompletion(EXCEPTIONAL);    }    return s;}//exceptionTable声明private static final ExceptionNode[] exceptionTable;//全局异常node表private static final ReentrantLock exceptionTableLock;//上面用到的锁,就是一个普通的可重入锁.private static final ReferenceQueue<Object> exceptionTableRefQueue;//变量表引用队列,后面详述.private static final int EXCEPTION_MAP_CAPACITY = 32;//异常表的固定容量,不大,只有32而且是全局的.//初始化在一个静态代码块.static {    exceptionTableLock = new ReentrantLock();    exceptionTableRefQueue = new ReferenceQueue<Object>();    exceptionTable = new ExceptionNode[EXCEPTION_MAP_CAPACITY];//容量    try {        U = sun.misc.Unsafe.getUnsafe();        Class<?> k = ForkJoinTask.class;        STATUS = U.objectFieldOffset            (k.getDeclaredField("status"));    } catch (Exception e) {        throw new Error(e);    }}//先来看ExceptionNode内部类的实现//签名,实现了一个ForkJoinTask的弱引用.static final class ExceptionNode extends WeakReference<ForkJoinTask<?>> {    final Throwable ex;    ExceptionNode next;    final long thrower;  // use id not ref to avoid weak cycles    final int hashCode;  // store task hashCode before weak ref disappears    ExceptionNode(ForkJoinTask<?> task, Throwable ex, ExceptionNode next) {        super(task, exceptionTableRefQueue);//指向弱引用的构造函数,保存引用为task,队列为全局的exceptionTableRefQueue.        this.ex = ex;//抛出的异常的引用        this.next = next;//数组中的ExceptionNode以链表形式存在,前面分析过,先入者为后入者的next        this.thrower = Thread.currentThread().getId();//保存抛出异常的线程id(严格来说是创建了this的线程)        this.hashCode = System.identityHashCode(task);//哈希码保存关联task的哈希值.    }}//清除掉异常表中的脏数据,仅在持有全局锁时才可使用.前面看到在记录新的异常信息时要进行一次清除尝试private static void expungeStaleExceptions() {    //循环条件,全局exceptionTableRefQueue队列不为空,前面说过ExceptionNode是弱引用,当它被回收时会被放入此队列.    for (Object x; (x = exceptionTableRefQueue.poll()) != null;) {        //从队首依次取出元素.        if (x instanceof ExceptionNode) {            //计算在全局exceptionTable中的索引.            int hashCode = ((ExceptionNode)x).hashCode;            ExceptionNode[] t = exceptionTable;            int i = hashCode & (t.length - 1);            //取出node            ExceptionNode e = t[i];            ExceptionNode pred = null;            //不停遍历,直到e是null为止.            while (e != null) {                //e的next                ExceptionNode next = e.next;//2                //x是队首出队的元素.它与e相等说明找到                if (e == x) {                    //e是一个链表的元素,pred表示它是否有前置元素                    if (pred == null)                        //无前置元素,说明e在链表首部,直接将首部元素指向next即可.                        t[i] = next;                    else                        //有前置元素,说明循环过若干次,将当前e出链表                        pred.next = next;                    //在链表中发现x即break掉内循环,继续从exceptionTableRefQueue的队首弹出新的元素.                    break;                }                //只要发现当前e不是x,准备下一次循环,pred指向e.e指向next,进行下一个元素的比较.                pred = e;                e = next;            }        }    }}

到此doExec(也是每个ForkJoinTask的执行核心过程)就此结束.

很明显,ForkJoinTask的doExec负责了核心的执行,它留下了exec方法给子类实现,而重点负责了后面出现异常情况的处理.处理的逻辑前面已论述,在产生异常时尝试将异常存放在全局的execptionTable中,存放的结构为数组+链表,按哈希值指定索引,每次存放新的异常时,顺便清理上一次已被gc回收的ExceptionNode.所有ForkJoinTask共享了一个exceptionTable,因此必然在有关的几个环节要进行及时的清理.除了刚刚论述的过程,还有如下的几处:

前面论述了recordExceptionalCompletion,一共有四处使用了expungeStaleException,将已回收的ExceptionNode从引用队列中清除.

clearExceptionalCompletion在对一个ForkJoinTask重新初始化时使用,我们在前面提到序列化时说过,ForkJoinTask的序列化结果只保留了两种情况:运行前,运行结束.重新初始化一个ForkJoinTask,就要去除任何中间状态,包含自身产出的已被回收的异常node,而expungeStaleExceptions显然也顺便帮助其他task清除.

getThrowableException是查询task运行结果时调用,如一些get/join方法,很明显,记录这个异常的作用就在于返回给get/join,在这一块顺便清理已被回收的node,尤其是将自己运行时生成的node清除.

helpExpungeStaleExceptions是提供给ForkJoinPool在卸载worker时使用,顺便帮助清理全局异常表.

使用它们的方法稍后再论述,先来继续看ForkJoinTask的源码.

//内部等待任务完成,直到完成或超时.final void internalWait(long timeout) {    int s;    //status小于0代表已完成,直接忽略wait.    //未完成,则试着加上SIGNAL的标记,令完成任务的线程唤醒这个等待.    if ((s = status) >= 0 &&         U.compareAndSwapInt(this, STATUS, s, s | SIGNAL)) {        //加锁,只有一个线程可以进入.        synchronized (this) {            //再次判断未完成.等待timeout,且忽略扰动异常.            if (status >= 0)                try { wait(timeout); } catch (InterruptedException ie) { }            else                //已完成则响醒其他等待者.                notifyAll();        }    }}

internalWait方法逻辑很简单,首先判断是否未完成,满足未完成,则将标记位加上SIGNAL(可能已有别的线程做过),随后加锁double check status,还未完成则等待并释放锁,若发现已完成,或在后续被唤醒后发现已完成,则唤醒其他等待线程.通过notifyAll的方式避免了通知丢失.

同时,它的使用方法目前只有一个ForkJoinPool::awaitJoin,在该方法中使用循环的方式进行internalWait,满足了每次按截止时间或周期进行等待,同时也顺便解决了虚假唤醒.

继续看externalAwaitDone函数.它体现了ForkJoin框架的一个核心:外部帮助.

//外部线程等待一个common池中的任务完成.private int externalAwaitDone() {    int s = ((this instanceof CountedCompleter) ?     //当前task是一个CountedCompleter,尝试使用common ForkJoinPool去外部帮助完成,并将完成状态返回.             ForkJoinPool.common.externalHelpComplete(                 (CountedCompleter<?>)this, 0) :            //当前task不是CountedCompleter,则调用common pool尝试外部弹出该任务并进行执行,            //status赋值doExec函数的结果,若弹出失败(其他线程先行弹出)赋0.             ForkJoinPool.common.tryExternalUnpush(this) ? doExec() : 0);    if (s >= 0 && (s = status) >= 0) {        //检查上一步的结果,即外部使用common池弹出并执行的结果(不是CountedCompleter的情况),或外部尝试帮助CountedCompleter完成的结果        //status大于0表示尝试帮助完成失败.        //扰动标识,初值false        boolean interrupted = false;        do {            //循环尝试,先给status标记SIGNAL标识,便于后续唤醒操作.            if (U.compareAndSwapInt(this, STATUS, s, s | SIGNAL)) {                synchronized (this) {                    if (status >= 0) {                        try {                            //CAS成功,进同步块发现double check未完成,则等待.                            wait(0L);                        } catch (InterruptedException ie) {                            //若在等待过程中发生了扰动,不停止等待,标记扰动.                            interrupted = true;                        }                    }                    else                        //进同步块发现已完成,则唤醒所有等待线程.                        notifyAll();                }            }        } while ((s = status) >= 0);//循环条件,task未完成.        if (interrupted)            //循环结束,若循环中间曾有扰动,则中断当前线程.            Thread.currentThread().interrupt();    }    //返回status    return s;}

externalAwaitDone的逻辑不复杂,在当前task为ForkJoinPool.common的情况下可以在外部进行等待和尝试帮助完成.方法会首先根据ForkJoinTask的类型进行尝试帮助,并返回当前的status,若发现未完成,则进入下面的等待唤醒逻辑.该方法的调用者为非worker线程.

相似的方法:externalInterruptibleAwaitDone

private int externalInterruptibleAwaitDone() throws InterruptedException {    int s;    //不同于externalAwaitDone,入口处发现当前线程已中断,则立即抛出中断异常.    if (Thread.interrupted())        throw new InterruptedException();    if ((s = status) >= 0 &&        (s = ((this instanceof CountedCompleter) ?              ForkJoinPool.common.externalHelpComplete(                  (CountedCompleter<?>)this, 0) :              ForkJoinPool.common.tryExternalUnpush(this) ? doExec() :              0)) >= 0) {        while ((s = status) >= 0) {            if (U.compareAndSwapInt(this, STATUS, s, s | SIGNAL)) {                synchronized (this) {                    if (status >= 0)                        //wait时也不catch中断异常,发生即抛出.                        wait(0L);                    else                        notifyAll();                }            }        }    }    return s;}

externalInterruptibleAwaitDone的逻辑与externalAwaitDone相似,只是对中断异常的态度为抛,后者为catch.

它们的使用点,externalAwaitDone为doJoin或doInvoke方法调用,externalInterruptibleAwaitDone为get方法调用,很明显,join操作不可扰动,get则可以扰动.

下面来看看doJoin和doInvoke

//join的核心方法private int doJoin() {    int s; Thread t; ForkJoinWorkerThread wt; ForkJoinPool.WorkQueue w;    //已完成,返回status,未完成再尝试后续    return (s = status) < 0 ? s :        //未完成,当前线程是ForkJoinWorkerThread,从该线程中取出workQueue,并尝试将        //当前task出队然后执行,执行的结果是完成则返回状态,否则使用当线程池所在的ForkJoinPool的awaitJoin方法等待.        ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) ?        (w = (wt = (ForkJoinWorkerThread)t).workQueue).        tryUnpush(this) && (s = doExec()) < 0 ? s :        wt.pool.awaitJoin(w, this, 0L) :        //当前线程不是ForkJoinWorkerThread,调用前面说的externalAwaitDone方法.        externalAwaitDone();}//invoke的核心方法private int doInvoke() {    int s; Thread t; ForkJoinWorkerThread wt;    //先尝试本线程执行,不成功才走后续流程    return (s = doExec()) < 0 ? s :        ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) ?        //与上一个方法基本相同,但在当前线程是ForkJoinWorkerThread时不尝试将该task移除栈并执行,而是等        (wt = (ForkJoinWorkerThread)t).pool.        awaitJoin(wt.workQueue, this, 0L) :        externalAwaitDone();}

到此终于可以看一些公有对外方法了.有了前面的基础,再看get,join,invoke等方法非常简单.

//get方法还有get(long time)的变种.public final V get() throws InterruptedException, ExecutionException {    int s = (Thread.currentThread() instanceof ForkJoinWorkerThread) ?        //当前线程是ForkJoinWorkerThread则调用前面提过的doJoin方法.        //否则调用前述externalInterruptibleAwaitDone        doJoin() : externalInterruptibleAwaitDone();    Throwable ex;    if ((s &= DONE_MASK) == CANCELLED)        //异常处理,取消的任务,抛出CancellationException.        throw new CancellationException();    if (s == EXCEPTIONAL && (ex = getThrowableException()) != null)        //异常处理,调用getThrowableException获取异常,封进ExecutionException.        throw new ExecutionException(ex);    //无异常处理,返回原始结果.    return getRawResult();}//getRawResult默认为一个抽象实现,在ForkJoinTask中,并未保存该结果的字段. public abstract V getRawResult();//getThrowableException方法private Throwable getThrowableException() {    //不是异常标识,直接返回null,从方法名的字面意思看,要返回一个可抛出的异常.    if ((status & DONE_MASK) != EXCEPTIONAL)        return null;    //系统哈希码来定位ExceptionNode    int h = System.identityHashCode(this);    ExceptionNode e;    final ReentrantLock lock = exceptionTableLock;    //加异常表全局锁    lock.lock();    try {        //先清理已被回收的异常node,前面已述.        expungeStaleExceptions();        ExceptionNode[] t = exceptionTable;        e = t[h & (t.length - 1)];        //循环找出this匹配的异常node        while (e != null && e.get() != this)            e = e.next;    } finally {        lock.unlock();    }    Throwable ex;    //前面找不出异常node或异常node中存放的异常为null,则返回null    if (e == null || (ex = e.ex) == null)        return null;    if (e.thrower != Thread.currentThread().getId()) {        //不是当前线程抛出的异常.        Class<? extends Throwable> ec = ex.getClass();        try {            Constructor<?> noArgCtor = null;//该异常的无参构造器            Constructor<?>[] cs = ec.getConstructors();//该异常类公有构造器            for (int i = 0; i < cs.length; ++i) {                Constructor<?> c = cs[i];                Class<?>[] ps = c.getParameterTypes();                if (ps.length == 0)                    //构建器参数列表长度0说明存在无参构造器,存放.                    noArgCtor = c;                else if (ps.length == 1 && ps[0] == Throwable.class) {                    //发现有参构造器且参数长度1且第一个参数类型是Throwable,说明可以存放cause.                    //反射将前面取出的ex作为参数,反射调用该构造器创建一个要抛出的Throwable.                    Throwable wx = (Throwable)c.newInstance(ex);                    //反射失败,异常会被catch,返回ex,否则返回wx.                    return (wx == null) ? ex : wx;                }            }            if (noArgCtor != null) {                //在尝试了寻找有参无参构造器,并发现只存在无参构造器的情况,用无参构造器初始化异常.                Throwable wx = (Throwable)(noArgCtor.newInstance());                if (wx != null) {                    //将ex设置为它的cause并返回它的实例.                    wx.initCause(ex);                    return wx;                }            }        } catch (Exception ignore) {            //此方法不可抛出异常,一定要成功返回.        }    }    //有参无参均未成功,返回找到的异常.    return ex;}//join公有方法public final V join() {    int s;    if ((s = doJoin() & DONE_MASK) != NORMAL)        //调用doJoin方法阻塞等待的结果不是NORMAL,说明有异常或取消.报告异常.        reportException(s);    //等于NORMAL,正常执行完毕,返回原始结果.    return getRawResult();}//报告异常,可在前一步判断执行status是否为异常态,然后获取并重抛异常.private void reportException(int s) {    //参数s必须用DONE_MASK处理掉前4位以后的位.    if (s == CANCELLED)        //传入的状态码等于取消,抛出取消异常.        throw new CancellationException();    if (s == EXCEPTIONAL)        //使用前面的getThrowableException方法获取异常并重新抛出.        rethrow(getThrowableException());}//invoke公有方法.public final V invoke() {    int s;    //先尝试执行    if ((s = doInvoke() & DONE_MASK) != NORMAL)        //doInvoke方法的结果status只保留完成态位表示非NORMAL,则报告异常.        reportException(s);    //正常完成,返回原始结果.    return getRawResult();}

终于,读到此处的读者将关键的方法线串了起来,前述的所有内部方法,常量和变量与公有接口的关系已经明了.

很显然,ForkJoinTask是个抽象类,且它并未保存任务的完成结果,也不负责这个结果的处理,但声明并约束了返回结果的抽象方法getRawResult供子类实现.

因此,ForkJoinTask的自身关注任务的完成/异常/未完成,子类关注这个结果的处理.

每当获取到任务的执行状态时,ForkJoinTask可根据status来判断是否是异常/正常完成,并进入相应的处理逻辑,最终使用子类实现的方法完成一个闭环.

如果理解为将ForkJoinTask和子类的有关代码合并起来,在结果/完成状态/异常信息这一块,相当于同时有三个part在合作.

第一个part:status字段,它同时表示了未完成/正常完成/取消/异常完成等状态,也同时告诉有关等待线程是否要唤醒基本线程(每个线程等待前会设置SIGNAL),同时留出了后面16位对付其他情况.

第二个part:result,在ForkJoinTask见不到它,也没有相应的字段,子类也未必需要提供这个result字段,前面提到的CountedCompleter就没有提供这个result,它的getRawResult会固定返回null.但是CountedCompleter可以继承子类并实现这个result的保存与返回(道格大神在注释中举出了若干典型代码例子),在JAVA8中,stream api中的并行流也会保存每一步的计算结果,并对结果进行合并.

第三个part:异常.在ForkJoinTask中已经完成了所有异常处理流程和执行流程的定义,重点在于异常的存放,它是由ForkJoinTask的类变量进行存放的,结构为数组+链表,且元素利用了弱引用,借gc帮助清除掉已经被回收的ExceptionNode,显然在gc之前必须得到使用.而异常随时可以发生并进行record入列,但相应的能消费掉这个异常的只有相应的外部的get,join,invoke等方法或者内部扩展了exec()等方式,得到其他线程执行的task异常结果的情况.巧妙的是,只有外部调用者调用(get,invoke,join)时,这个异常信息才足够重要,需要rethrow出去并保存关键的堆栈信息;而内部线程在访问一些非自身执行的任务时,往往只需要status判断是否异常即可,在exec()中fork新任务的,也往往必须立即join这些新的子任务,这就保证了能够及时得到子任务中的异常堆栈(即使拿不到堆栈也知道它失败了).

经过前面的论述,ForkJoinTask的执行和异常处理已经基本论结,但是,一个ForkJoinTask在创建之后是如何运行的?显然,它不是一个Runnable,也不是Callable,不能直接submit或execute到普通的线程池.

临时切换到ForkJoinPool的代码,前面提到过,ForkJoinTask的官方定义就是可以运行在ForkJoinPool中的task.

//ForkJoinPool代码,submit一个ForkJoinTask到ForkJoinPool,并将该task自身返回.//拿到返回的task,我们就可以进行前述的get方法了.public <T> ForkJoinTask<T> submit(ForkJoinTask<T> task) {    if (task == null)        throw new NullPointerException();    externalPush(task);    return task;}//execute,不返回.类似普通线程池提交一个runnable的行为.public void execute(ForkJoinTask<?> task) {    if (task == null)        throw new NullPointerException();    externalPush(task);}

显然,若要使用一个自建的ForkJoinPool,可以使用execute或submit函数提交入池,然后用前述的get方法和变种方法进行.这是一种运行task的方式.

前面论述过的invoke方法会先去先去尝试本地执行,然后才去等待,故我们自己new一个ForkJoinTask,一样可以通过invoke直接执行,这是第二种运行task的方式.

前面论述的join方法在某种情况下也是一种task的运行方式,在当前线程是ForkJoinWorkerThread时,会去尝试将task出队并doExec,也就是会先用本线程执行一次,不成功才干等,非ForkJoinWorkerThread则直接干等了.显然我们可以自己构建一个ForkJoinWorkerThread并去join,这时会将任务出队并执行(但存在一个问题:什么时候入队).且出队后若未执行成功,则awaitJoin(参考ForkJoinPool::awaitJoin),此时因任务已出队,不会被窃取或帮助(在awaitJoin中会有helpStealer,但其实任务是当前线程自己"偷走"了),似乎完全要靠自己了.但并不表示ForkJoinTask子类无法获取这个已出队的任务,比如CountedCompleter使用时,可以在compute中新生成的Completer时,将源CountedCompleter(ForkJoinTask的子类)作为新生成的CountedCountedCompleter的completer(该子类中的一个字段),这样,若有一个ForkJoinWorkerThread窃取了这个新生成的CountedCompleter,可以通过completer链表找到先前被出队的CountedCompleter(ForkJoinTask).关于CountedCompleter单独文章详述.

除此之外呢?包含前面提到的使用join操作不是ForkJoinWorkerThread调用的情况,不使用ForkJoinPool的submit execute入池,如何能让一个ForkJoinTask在将来执行?我们来看后面的方法.

//fork方法,将当前任务入池.public final ForkJoinTask<V> fork() {    Thread t;    if ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread)        //如果当前线程是ForkJoinWorkerThread,将任务压入该线程的任务队列.        ((ForkJoinWorkerThread)t).workQueue.push(this);    else        //否则调用common池的externalPush方法入队.        ForkJoinPool.common.externalPush(this);    return this;}

显然,我们还可以通过对一个ForkJoinTask进行fork方法入池,入哪个池完全取决于当前线程的类型.这是第四种让任务能被运行的方式.

同样,我们也看到了第五种方式,ForkJoinPool.common其实就是一个常量保存的ForkJoinPool,它能够调用externalPush,我们自然也可以直接new一个ForkJoinPool,然后将当前task进行externalPush,字面意思外部压入.这种办法,非ForkJoinWorkerThread也能将任务提交到非common的ForkJoinPool.

从名字来看,ForkJoinTask似乎已经说明了一切,按照官方的注释也是如此.对一个task,先Fork压队,再Join等待执行结果,这是一个ForkJoinTask的执行周期闭环(但不要简单理解为生命周期,前面提到过,任务可以被重新初始化,而且重新初始化时还会清空ExceptionNode数组上的已回收成员).

到此为止,ForkJoinTask的核心函数和api已经基本了然,其它同类型的方法以及周边的方法均不难理解,如invokeAll的各种变种.下面来看一些"周边"类型的函数.有前述的基础,它们很好理解.

//取消一个任务的执行,直接将status设置成CANCELLED,设置后判断该status 是否为CANCELLED,是则true否则false.public boolean cancel(boolean mayInterruptIfRunning) {    return (setCompletion(CANCELLED) & DONE_MASK) == CANCELLED;}//判断是否完成,status小于0代表正常完成/异常完成/取消,很好理解.public final boolean isDone() {    return status < 0;}//判断当前任务是否取消.public final boolean isCancelled() {    //status前4位    return (status & DONE_MASK) == CANCELLED;}public final boolean isCompletedAbnormally() {    //是否为异常完成,前面说过,CANCELLED和EXCEPTIONAL均小于NORMAL    return status < NORMAL;}//是否正常完成.public final boolean isCompletedNormally() {    //完成态位等于NORMAL    return (status & DONE_MASK) == NORMAL;}//获取异常. public final Throwable getException() {    int s = status & DONE_MASK;    //当为正常完成或未完成时,返回null.    return ((s >= NORMAL)    ? null :            //是取消时,新建一个取消异常.            (s == CANCELLED) ? new CancellationException() :            //不是取消,参考前面提到的getThrowableException.            getThrowableException());}//使用异常完成任务. public void completeExceptionally(Throwable ex) {    //参考前述的setExceptionalCompletion,    //ex已经是运行时异常或者Error,直接使用ex完成,若是受检异常,包装成运行时异常.    setExceptionalCompletion((ex instanceof RuntimeException) ||                             (ex instanceof Error) ? ex :                            new RuntimeException(ex));   }//使用value完成任务.public void complete(V value) {    try {        //设置原始结果,它是一个空方法.前面说过ForkJoinTask没有维护result之类的结果字段,子类可自行发挥.        setRawResult(value);    } catch (Throwable rex) {        //前述步骤出现异常,就用异常方式完成.        setExceptionalCompletion(rex);        return;    }    //前面的结果执行完,标记当前为完成.    setCompletion(NORMAL);}//安静完成任务.直接用NORMAL setCompletion,没什么好说的.public final void quietlyComplete() {    setCompletion(NORMAL);} /** * Joins this task, without returning its result or throwing its * exception. This method may be useful when processing * collections of tasks when some have been cancelled or otherwise * known to have aborted. *///安静join,它不会返回result也不会抛出异常.处理集合任务时,如果需要所有任务都被执行而不是一个执行出错(取消)其他也跟着出错的情况下,//很明显适用,这不同于invokeAll,静态方法invokeAll或invoke(ForkJoinTask,ForkJoinTask)会在任何一个任务出现异常后取消执行并抛出.public final void quietlyJoin() {    doJoin();}//安静执行一次,不返回结果不抛出异常,没什么好说的.public final void quietlyInvoke() {    doInvoke();}//重新初台化当前taskpublic void reinitialize() {    if ((status & DONE_MASK) == EXCEPTIONAL)        //如果当前任务是异常完成的,清除异常.该方法参考前面的论述.        clearExceptionalCompletion();    else        //否则重置status为0.        status = 0;}//反fork.public boolean tryUnfork() {    Thread t;    //当前线程是ForkJoinWorkerThread,从它的队列尝试移除.    return (((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) ?            ((ForkJoinWorkerThread)t).workQueue.tryUnpush(this) :            //当前线程不是ForkJoinWorkerThread,用common池外部移除.            ForkJoinPool.common.tryExternalUnpush(this));}

上面是一些简单的周边方法,大多并不需要再论述了,unfork方法很明显在某些场景下不会成功,显然,当一个任务刚刚入队并未进行后续操作时,很可能成功.按前面所述,当对一个任务进行join时,可能会成功的弹出当前任务并执行,此时不可能再次弹出;当一个任务被其他线程窃取会被它本身执行的也不会弹出.

再来看一些老朋友,在前面的文章"CompletableFuture和响应式编程"一文中,作者曾着重强调过它将每个要执行的动作进行压栈(未能立即执行的情况),而栈中的元素Completion即是ForkJoinTask的子类,而标记该Completion是否被claim的方法和周边方法如下:

//获取ForkJoinTask的标记,返回结果为short型public final short getForkJoinTaskTag() {    //status的后16位    return (short)status;}//原子设置任务的标记位.public final short setForkJoinTaskTag(short tag) {    for (int s;;) {        //不停循环地尝试将status的后16位设置为tag.        if (U.compareAndSwapInt(this, STATUS, s = status,                                //替换的结果,前16位为原status的前16位,后16位为tag.                                (s & ~SMASK) | (tag & SMASK)))            //返回被换掉的status的后16位.            return (short)s;    }}//循环尝试原子设置标记位为tag,前提是原来的标记位等于e,成功true失败falsepublic final boolean compareAndSetForkJoinTaskTag(short e, short tag) {    for (int s;;) {        if ((short)(s = status) != e)            //如果某一次循环的原标记位不是e,则返回false.            return false;        //同上个方法        if (U.compareAndSwapInt(this, STATUS, s,                                (s & ~SMASK) | (tag & SMASK)))            return true;    }}

还记得CompletableFuture在异步执行Completion时要先claim吗?claim方法中,会尝试设置这个标记位.这是截止jdk8中CompletableFuture使用到ForkJoinTask的功能.

目前来看,在CompletableFuture的内部实现Completion还没有使用到ForkJoinTask的其他属性,比如放入一个ForkJoinPool执行(没有任何前面总结的调用,比如用ForkJoinPool的push,execute,submit等,也没有fork到common池).但是很明显,道格大神令它继承自ForkJoinTask不可能纯粹只为了使用区区一个标记位,试想一下,在如此友好支持响应式编程的CompletableFuture中传入的每一个action都可以生成若干新的action,那么CompletableFuture负责将这些action封装成Completion放入ForkJoinPool执行,将最大化利用到ForkJoin框架的工作窃取和外部帮助的功效,强力结合分治思想,这将是多么优雅的设计.或者在jdk9-12中已经出现了相应的Completion实现(尽管作者写过JAVA9-12,遗憾的是也没有去翻它们的源码).

另外,尽管Completion的众多子类也没有result之类的表示结果的字段,但它的一些子类通过封装,实际上间接地将这个Completion所引用的dep的result作为了自己的"result",当然,getRawResult依旧是null,但是理念却是相通的.

以上是ForkJoinTask的部分核心源码,除了上述的源码外,还有一些同属于ForkJoinTask的核心源码部分,比如其他的public方法(参考join fork invoke 即可),一些利用ForkJoinPool的实现,要深入了解ForkJoinPool才能了解的方法,一些不太难的静态方法等,这些没有必要论述了.

除了核心源码外,ForkJoinTask也提供了对Runnable,Callable的适配器实现,这块很好理解,简单看一看.

//对Runnable的实现,如果在ForkJoinPool中提交一个runnable,会用它封装成ForkJoinTaskstatic final class AdaptedRunnable<T> extends ForkJoinTask<T>    implements RunnableFuture<T> {    final Runnable runnable;    T result;    AdaptedRunnable(Runnable runnable, T result) {        //不能没有runnable        if (runnable == null) throw new NullPointerException();        this.runnable = runnable;        //对runnable做适配器时,可以提交将结果传入,并设置为当前ForkJoinTask子类的result.        //前面说过,ForkJoinTask不以result作为完成标记,判断一个任务是否完成或异常,使用status足以,        //返回的结果才使用result.        this.result = result;     }    public final T getRawResult() { return result; }    public final void setRawResult(T v) { result = v; }    //前面说过提交入池的ForkJoinTask最终会运行doExec,而它会调用exec,此处会调用run.    public final boolean exec() { runnable.run(); return true; }    public final void run() { invoke(); }    private static final long serialVersionUID = 5232453952276885070L;//序列化用}//无结果的runnable适配器static final class AdaptedRunnableAction extends ForkJoinTask<Void>    implements RunnableFuture<Void> {    final Runnable runnable;    AdaptedRunnableAction(Runnable runnable) {        if (runnable == null) throw new NullPointerException();        this.runnable = runnable;    }    //区别就是result固定为null,也不能set    public final Void getRawResult() { return null; }    public final void setRawResult(Void v) { }    public final boolean exec() { runnable.run(); return true; }    public final void run() { invoke(); }    private static final long serialVersionUID = 5232453952276885070L;}//对runnable的适配器,但强制池中的工作线程在执行任务发现异常时抛出static final class RunnableExecuteAction extends ForkJoinTask<Void> {    final Runnable runnable;    RunnableExecuteAction(Runnable runnable) {        if (runnable == null) throw new NullPointerException();        this.runnable = runnable;    }    //默认null结果,set也是空实现    public final Void getRawResult() { return null; }    public final void setRawResult(Void v) { }    public final boolean exec() { runnable.run(); return true; }    void internalPropagateException(Throwable ex) {        //前面说过doExec会被执行,它会调exec并catch,在catch块中设置当前任务为异常完成态,        //然后调用internalPropagateException方法,而在ForkJoinTask中默认为空实现.        //此处将异常重新抛出,将造成worker线程抛出异常.        rethrow(ex);    }    private static final long serialVersionUID = 5232453952276885070L;}//对callable的适配器,当将callable提交至ForkJoinPool时使用.static final class AdaptedCallable<T> extends ForkJoinTask<T>    implements RunnableFuture<T> {    final Callable<? extends T> callable;    T result;    AdaptedCallable(Callable<? extends T> callable) {        if (callable == null) throw new NullPointerException();        this.callable = callable;    }    //字段中有一个result,直接使用它返回.    public final T getRawResult() { return result; }    //result可外部直接设置.    public final void setRawResult(T v) { result = v; }    public final boolean exec() {        try {            //默认的result用call函数设置.            result = callable.call();            return true;                } catch (Error err) {            //catch住Error,抛出            throw err;        } catch (RuntimeException rex) {            //catch住运行时异常,抛出            throw rex;        } catch (Exception ex) {            //catch住受检异常,包装成运行时异常抛出.            throw new RuntimeException(ex);        }    }    //run方法一样只是调用invoke,进而调用doExec.    public final void run() { invoke(); }    private static final long serialVersionUID = 2838392045355241008L;}//runnable生成适配器的工具方法public static ForkJoinTask<?> adapt(Runnable runnable) {    return new AdaptedRunnableAction(runnable);}//指定结果设置runnable的适配器工具方法public static <T> ForkJoinTask<T> adapt(Runnable runnable, T result) {    return new AdaptedRunnable<T>(runnable, result);}//对callable生成适配器的方法.public static <T> ForkJoinTask<T> adapt(Callable<? extends T> callable) {    return new AdaptedCallable<T>(callable);}

以上的代码都不复杂,只要熟悉了ForkJoinTask的本身代码结构,对于这一块了解非常容易,这也间接说明了ForkJoinPool中是如何处理Runnable和Callable的(因为ForkJoinPool本身也是一种线程池,可以接受提交Callable和Runnable).

将runnable提交到pool时,可以指定result,也可以不指定,也可以用submit或execute方法区分异常处理行为,ForkJoinPool会自行选择相应的适配器.

将callable 提交到pool时,pool会选择对callable的适配器,它的结果将为task的结果,它的异常将为task的异常.

到此为止,ForkJoinTask的源码分析完成.

后语

本文详细分析了ForkJoinTask的源码,并解释了前文CompletableFuture中Completion与它的关联,以及分析了Completion继承自ForkJoinTask目前已带来的功能利用(tag)和将来可能增加的功用(一个Completion产生若干多个Completion并在ForkJoinPool中运行,还支持工作窃取).

同时本文也对ForkJoinPool和ForkJoinWorkerThread,以及CountedCompleter和Stream api中的并行流进行了略微的描述.

在文章的最后,或许有一些新手读者会好奇,我们究竟什么时候会使用ForkJoinTask?

首先,如果你在项目中大肆使用了流式计算,并使用了并行流,那么你已经在使用了.

前面提过,官方解释ForkJoinTask可以视作比线程轻量许多的实体,也是轻量的Future.结合在源码中时不时出来秀存在感的ForkJoinWorkerThread,显然它就是据说比普通线程轻量一些的线程,在前面的源码中可以看出,它维护了一组任务的队列,每个线程负责完成队列中的任务,也可以偷其他线程的任务,甚至池外的线程都可以时不时地来个join,顺便帮助出队执行任务.

显然,对于重计算,轻io,轻阻塞的任务,适合使用ForkJoinPool,也就使用了ForkJoinTask,你不会认为它可以提交runnable和callable,就可以不用ForkJoinTask了吧?前面的适配器ForkJoinPool在这种情况下必用的,可以去翻相应的源码.

本章没有去详述CountedCompleter,但前面论述时说过,你可以在exec()中将一个计算复杂的任务拆解为小的子任务,然后将子任务入池执行,父任务合并子任务的结果.这种分治的算法此前基本是在单线程模式下运行,使用ForkJoinTask,则可以将这种计算交给一个ForkJoinPool中的所有线程并行执行.