ForkJoin框架之ForkJoinPool

7次阅读

共计 90999 个字符,预计需要花费 228 分钟才能阅读完成。

前言

在前面的三篇文章中先后介绍了 ForkJoin 框架的任务组件 (ForkJoinTask 体系,CountedCompleter 体系) 源码, 并简单介绍了目前的并行流应用场景.ForkJoin 框架本质上是对 Executor-Runnable/Callable-Future/FutureTask 的扩展, 它依旧支持经典的 Executor 使用方式, 即任务 + 池的配合, 向池中提交任务, 并异步地等待结果.

毫无疑问, 前面的文章已经解释了 ForkJoin 框架的新颖性, 初步了解了工作窃取依托的数据结构,ForkJoinTask/CountedCompleter 在执行期的行为, 也提到它们一定要在 ForkJoinPool 中进行运行和调度, 这也是本文力求解决的问题.

ForkJoinPool 源码

ForkJoinPool 源码是 ForkJoin 框架中最复杂, 最难理解的部分, 且因为交叉依赖 ForkJoinTask,CountedCompleter,ForkJoinWorkerThread, 作者在前面单独用两篇文章分析了它们, 以前两篇文章为基础, 重复部分本文不再详述.

首先看类签名.

// 禁止伪共享
@sun.misc.Contended
// 继承自 AbstractExecutorService
public class ForkJoinPool extends AbstractExecutorService

前面的几篇文章不止一次强调过 ForkJoin 框架的 ” 轻量线程, 轻量任务 ” 等概念, 也提到少量线程 - 多数计算, 资源空闲时窃取任务. 并介绍了基于 status 状态的调度 (ForkJoinTask 系列), 不基于 status 而由子任务触发完成的调度(CountedCompleter 系列), 显然它们的共性就是让线程在正常调度的前提下尽量少的空闲, 最大幅度利用 cpu 资源, 伪共享 / 缓存行的问题在 ForkJoin 框架中显然会是一个更大的性能大杀器. 在 1.8 之前, 一般通过补位的方式解决伪共享问题,1.8 之后, 官方使用 @Contended 注解, 令虚拟机尽量注解标注的字段(字段的情况) 或成员字段放置在不同的缓存行, 从而规避了伪共享问题.

建立 ForkJoinPool 可以直接 new, 也可以使用 Executors 的入口方法.

//Executors 方法, 显然 ForkJoinPool 被称作工作窃取线程池. 参数指定了并行度.
public static ExecutorService newWorkStealingPool(int parallelism) {
    return new ForkJoinPool
        (parallelism,
        // 默认线程工厂, 前文中已提过默认的 ForkJoinWorkerThread
         ForkJoinPool.defaultForkJoinWorkerThreadFactory,
         null, true);
}


// 不提供并行度.
public static ExecutorService newWorkStealingPool() {
    return new ForkJoinPool
        // 使用所有可用的处理器
        (Runtime.getRuntime().availableProcessors(),
         ForkJoinPool.defaultForkJoinWorkerThreadFactory,
         null, true);
}
// 对应的,ForkJoinPool 的构造器们.
// 不指定任何参数.
public ForkJoinPool() {
    // 并行度取 MAX_CAP 和可用处理器数的最小值.
    this(Math.min(MAX_CAP, Runtime.getRuntime().availableProcessors()),
        // 默认的线程工厂. 无异常处理器, 非异步模式.
         defaultForkJoinWorkerThreadFactory, null, false);
}


// 同上, 只是使用参数中的并行度.
public ForkJoinPool(int parallelism) {this(parallelism, defaultForkJoinWorkerThreadFactory, null, false);
}



public ForkJoinPool(int parallelism,
                    ForkJoinWorkerThreadFactory factory,
                    UncaughtExceptionHandler handler,
                    boolean asyncMode) {
    // 并行度需要校验
    this(checkParallelism(parallelism),
        // 校验线程工厂
         checkFactory(factory),
        // 参数指定的未捕获异常处理器.
         handler,
        // 前面的几处代码 asyncMode 都是 false, 会选用 LIFO 队列, 是 true 是会选用 FIFO 队列, 后面详述.
         asyncMode ? FIFO_QUEUE : LIFO_QUEUE,
        // 线程名前缀
         "ForkJoinPool-" + nextPoolId() + "-worker-");
    // 检查许可, 不关心.
    checkPermission();}

// 检查方法很简单.
// 并行度不能大于 MAX_CAP 不能不大于 0.
 private static int checkParallelism(int parallelism) {if (parallelism <= 0 || parallelism > MAX_CAP)
        throw new IllegalArgumentException();
    return parallelism;
}
// 线程工厂非空即可.
private static ForkJoinWorkerThreadFactory checkFactory
    (ForkJoinWorkerThreadFactory factory) {if (factory == null)
        throw new NullPointerException();
    return factory;
}
// 最终构造器, 私有. 待介绍完一些基础字段后再述.
private ForkJoinPool(int parallelism,
                     ForkJoinWorkerThreadFactory factory,
                     UncaughtExceptionHandler handler,
                     int mode,
                     String workerNamePrefix) {
    this.workerNamePrefix = workerNamePrefix;
    this.factory = factory;
    this.ueh = handler;
    //config 初始化值, 用并行度与 mode 取或, 显然 mode 是 FIFO 时, 将有一个第 17 位的 1.
    this.config = (parallelism & SMASK) | mode;
    //np 保存并行度 (正数) 的相反数(补码).
    long np = (long)(-parallelism); // offset ctl counts
    this.ctl = ((np << AC_SHIFT) & AC_MASK) | ((np << TC_SHIFT) & TC_MASK);
}

了解其他线程池源码的朋友可以去回忆其他线程池的构建, 不论是调度线程池还是普通的线程池或者缓存池, 他们其实都设置了核心线程数和最大线程数. 当然这要看定义 ” 线程池分类 ” 的视角, 以 Executors 入口的 api 分类, 或许可以分类成固定线程池, 缓冲池, 单线程池, 调度池, 工作窃取池; 但以真正的实现分类, 其实只有 ThreadPoolExecutor 系列 (固定线程池, 单线程池都直接是 ThreadPoolExecutor, 调度池是它的子类, 缓冲池也是 ThreadPoolExecutor, 只是阻塞队列限定为 SynchronizedQueue) 和 ForkJoinPool 系列(工作窃取池).

作者更倾向于用实现的方式区分, 也间接参照 Executors 的 api 使用用途的区分方式. 如果不使用 Executors 的入口 api, 不论哪种 ThreadPoolExecutor 系列, 我们都可以提供线程池的大小配置, 阻塞队列, 线程空闲存活时间及单位, 池满拒绝策略, 线程工厂等, 而所谓的缓存池和固定池的区别只是队列的区别.

调度池的构造参数与 ThreadPoolExecutor 无异, 只是内限了阻塞队列的类型, 它虽然是 ThreadPoolExecutor 的扩展, 却不仅没有拓充参数, 反而减少了两个参数: 阻塞队列和最大线程数. 阻塞队列被默认设置为内部类 DelayQueue, 它实现了 BlockingQueue, 最大线程数则为整数上限, 同时新增的对任务的延时或重试等属性则是依托于内部维护的一个 FutureTask 的扩展, 并未增加到构造参数.

而到了 ForkJoinPool, 我们看到的是截然不同于 ThreadPoolExecutor 系列的构建方式. 首先根本没有提供核心线程和最大线程数, 线程空闲存活时间的参数和阻塞队列以及池满拒绝策略; 线程工厂也仅能提供生产 ForkJoinWorkerThread 的工厂 bean; 还具备一些 ThreadPoolExecutor 没有的参数, 如未捕获异常处理器, 同步异步模式, 工作线程前缀 (其实别的类型的线程工厂也可以提供线程前缀, 默认就是常见的 pool- 前缀) 等.

显然从参数看便可猜测出若干不同于其他线程池的功能. 但我们更关心其中的一些参数设置.

一般的参数都能见名知义, 仅有 config 和 ctl 难以理解, 此处也不详细介绍, 只说他们的初值的初始化.

config 是并行度与 SMASK 取与运算再与 mode 取或, 这里并行度最大是 15 位整数(MAX_CAP=0x7FFF), 而 SMASK 作用于整数后 16 位,mode 在 FIFO 为 1 <<16,LIFO 是 0. 很好计算.

ctl 其实是一个控制信号, 我们后面会在具体源码就地解释, 它的计算先通过了一个局部变量 np.

np 的计算方法是将并行度的相反数 (补码) 转换为长整型. 前面简单分析, 并行度不会大于 MAX_CAP, 因此 np 至少前 49 位全部是 1.

计算 ctl 时, 将 np 左移 AC_SHIFT 即为取后 16 位, 将 np 左移 TC_SHIFT 即取它的后 32 位, 分别与 AC_MASK 和 TC_SHIFT, 表示取 np 的后 16 位分别放置于 ctl 的前 16 位和 33 至 48 位. 而 ctl 的后 32 位初值为 0.

因为生成的 ctl 前 16 位和后 16 位相等, 如果仔细用数学验证, 可以发现, 对前 16 位和后 16 位的末位同时加 1, 当添加了 parallel 次后,ctl 将归 0. 这也是添加 worker 限制的重要数理依据.

前面列举了获取 ForkJoinPool 实例的几种方法, 初步展示了构造一个 ForkJoinPool 的属性, 也暴露了一些实现细节, 而这些细节依赖于一些字段和成员函数, 我们先从它们开始.

//ForkJoinWorkerThread 的线程工厂.
public static interface ForkJoinWorkerThreadFactory {
    // 创建新线程要实现的方法.
    public ForkJoinWorkerThread newThread(ForkJoinPool pool);
}
// 前面看到的默认线程工厂.
static final class DefaultForkJoinWorkerThreadFactory
    implements ForkJoinWorkerThreadFactory {public final ForkJoinWorkerThread newThread(ForkJoinPool pool) {return new ForkJoinWorkerThread(pool);
    }
}
// 创建 InnocuousForkJoinWorkerThread 的线程工厂, 上一文已经介绍过.
static final class InnocuousForkJoinWorkerThreadFactory
    implements ForkJoinWorkerThreadFactory {
    
    private static final AccessControlContext innocuousAcc;
    static {Permissions innocuousPerms = new Permissions();
        innocuousPerms.add(modifyThreadPermission);
        innocuousPerms.add(new RuntimePermission("enableContextClassLoaderOverride"));
        innocuousPerms.add(new RuntimePermission("modifyThreadGroup"));
        innocuousAcc = new AccessControlContext(new ProtectionDomain[] {new ProtectionDomain(null, innocuousPerms)
            });
    }

    public final ForkJoinWorkerThread newThread(ForkJoinPool pool) {return (ForkJoinWorkerThread.InnocuousForkJoinWorkerThread)
            java.security.AccessController.doPrivileged(new java.security.PrivilegedAction<ForkJoinWorkerThread>() {public ForkJoinWorkerThread run() {
                    return new ForkJoinWorkerThread.
                        InnocuousForkJoinWorkerThread(pool);
                }}, innocuousAcc);
    }
}
// 空任务
static final class EmptyTask extends ForkJoinTask<Void> {
    private static final long serialVersionUID = -7721805057305804111L;
    EmptyTask() { status = ForkJoinTask.NORMAL;} // 状态直接是已正常完成.
    public final Void getRawResult() { return null;}
    public final void setRawResult(Void x) {}
    public final boolean exec() { return true;}
}

以上是线程工厂和一个默认的 EmptyTask. 接下来看一些跨池和工作队列的公用常量.

 // 与边界有关的常量
static final int SMASK        = 0xffff;        // 后 16 位.
static final int MAX_CAP      = 0x7fff;        // 前面在定并行度时参考的最大容量.
static final int EVENMASK     = 0xfffe;        // 后 16 位验偶数
static final int SQMASK       = 0x007e;        // 最大 64 个偶数槽, 从第 2 位至 7 位共 6 位,2 的 6 次方.

// 与 WorkQueue 有关
static final int SCANNING     = 1;             // 对 WorkQueue 正在运行任务的标记
static final int INACTIVE     = 1 << 31;       // 标记负数
static final int SS_SEQ       = 1 << 16;       // 版本号使用, 第 17 位 1

// ForkJoinPool 和 WorkQueue 的 config 有关常量.
static final int MODE_MASK    = 0xffff << 16;  // 能滤取前 16 位.
static final int LIFO_QUEUE   = 0;// 前面提到过的, 非 async 模式(false), 值取 0.
static final int FIFO_QUEUE   = 1 << 16;//async 模式(true), 值取 1.
static final int SHARED_QUEUE = 1 << 31;       // 共享队列标识, 符号位表示负.

以上的字段含义只是粗略的描述, 先有一个印象, 后面看到时自然理解其含义.

接下来看核心的 WorkQueue 内部类.

// 前面的文章说过, 它是一个支持工作窃取和外部提交任务的队列. 显然, 它的实例对内存部局十分敏感,
//WorkQueue 本身的实例, 或者内部数组元素均应避免共享同一缓存行.
@sun.misc.Contended
static final class WorkQueue {

    
    // 队列内部数组的初始容量, 默认是 2 的 12 次方, 它必须是 2 的几次方, 且不能小于 4.
    // 但它应该设置一个较大的值来减少队列间的缓存行共享.
    // 在前面的 java 运行时和 54 篇 java 官方文档术语中曾提到,jvm 通常会将
    // 数组放在能够共享 gc 标记 (如卡片标记) 的位置, 这样每一次写都会造成严重内存竞态.
    static final int INITIAL_QUEUE_CAPACITY = 1 << 13;

    // 最大内部数组容量, 默认 64M, 也必须是 2 的平方, 但不大于 1 <<(31- 数组元素项宽度),
    // 根据官方注释, 这可以确保无需计算索引概括, 但定义一个略小于此的值有助于用户在
    // 系统饱合前捕获失控的程序.
    static final int MAXIMUM_QUEUE_CAPACITY = 1 << 26; // 64M
    // unsafe 机制有关的字段.
    private static final sun.misc.Unsafe U;
    private static final int  ABASE;
    private static final int  ASHIFT;
    private static final long QTOP;
    private static final long QLOCK;
    private static final long QCURRENTSTEAL;
    static {
        try {U = sun.misc.Unsafe.getUnsafe();
            Class<?> wk = WorkQueue.class;
            Class<?> ak = ForkJoinTask[].class;
            //top 字段的句柄.
            QTOP = U.objectFieldOffset
                (wk.getDeclaredField("top"));
            //qlock 字段的句柄.
            QLOCK = U.objectFieldOffset
                (wk.getDeclaredField("qlock"));
            //currentSteal 的句柄
            QCURRENTSTEAL = U.objectFieldOffset
                (wk.getDeclaredField("currentSteal"));
            //ABASE 是 ForkJoinTask 数组的首地址.
            ABASE = U.arrayBaseOffset(ak);
            //scale 代表数组元素的索引大小. 它必须是 2 的平方.
            int scale = U.arrayIndexScale(ak);
            if ((scale & (scale - 1)) != 0)
                throw new Error("data type scale not a power of two");
            // 计算 ASHIFT, 它是 31 与 scale 的高位 0 位数量的差值. 因为上一步约定了 scale 一定是一个正的 2 的几次方,
            //ASHIFT 的结果一定会大于 1. 可以理解 ASHIFT 是数组索引大小的有效位数.
            ASHIFT = 31 - Integer.numberOfLeadingZeros(scale);
        } catch (Exception e) {throw new Error(e);
        }
    }
    // 插曲, 在 Integer 类的 numberOfLeadingZeros 方法, 果然一流的程序是数学.
        public static int numberOfLeadingZeros(int i) {
        // HD, Figure 5-6
        if (i == 0)
            // i 本身已是 0, 毫无疑问地返回 32. 本例中 i 是 2 起, 所以不会.
            return 32;
        // 先将 n 初始化 1. 最后会减掉首位 1.
        int n = 1;
        // i 的前 16 位不存在非零值, 则将 n 加上 16 并移除 i 的前 16 位. 将 i 转换为一个以原 i 后 16 位开头的新值.
        if (i >>> 16 == 0) {n += 16; i <<= 16;}
        // 不论前一步结果如何, 若此时 i 的前 8 位不存在非零值, 则 n 加上 8,i 移除前 8 位. 将 i 转换为原 i 的后 24 位开头的新值.
        if (i >>> 24 == 0) {n +=  8; i <<=  8;}
        // 不论前一步结果如何, 若此时 i 的前 4 位不存在非零值, 则 n 加上 4,i 移除前 4 位. 将 i 转换为原 i 的后 28 位开头的新值.
        if (i >>> 28 == 0) {n +=  4; i <<=  4;}
        // 不论前一步结果如何, 若此时 i 的前 2 位不存在非零值, 则 n 加上 2,i 移除前 2 位. 将 i 转换为原 i 的后 30 位开头的新值.
        if (i >>> 30 == 0) {n +=  2; i <<=  2;}
        // 经过前面的运算,i 的前 30 位的非零值数量已经记入 n,
        // 在前一步的基础上, 此时 i 的前 1 位若存在非零值, 则 n -1, 否则 n 保留原值.
        n -= i >>> 31;
        return n;
    }
    // 回到 WorkQueue
    // 实例字段
    volatile int scanState;    // 版本号, 小于 0 代表不活跃, 注释解释奇数代表正在扫描, 但从代码语义上看正好相反.
    int stackPred;             // 前一个池栈控制信号(ctl), 它保有前一个栈顶记录.
    int nsteals;               // 偷盗的任务数
    int hint;                  // 一个随机数, 用于决定偷取任务的索引.
    int config;                // 配置, 表示池的索引和模式
    volatile int qlock;        // 队列锁,1 表示锁了, 小于 0 表示终止, 其他情况是 0.
    volatile int base;         // 底, 表示下一个 poll 操作的插槽索引
    int top;                   // 顶, 表示下一个 push 操作的插槽索引
    ForkJoinTask<?>[] array;   // 存放任务元素的数组, 初始不分配, 首扩容会分配.
    final ForkJoinPool pool;   // 包含该队列的池, 可能在某些时刻是 null.
    final ForkJoinWorkerThread owner; // 持有该队列的线程, 如果队列是共享的,owner 是 null.
    volatile Thread parker;    // 在调用 park 阻塞的 owner, 非阻塞时为 null
    volatile ForkJoinTask<?> currentJoin;  // 被在 awaitJoin 中 join 的 task.
    volatile ForkJoinTask<?> currentSteal; // 字面意思当前偷的任务, 主要用来 helpStealer 方法使用.
    // 工作队列构造器, 只初始化线程池,owner 等字段.
    WorkQueue(ForkJoinPool pool, ForkJoinWorkerThread owner) {
        this.pool = pool;
        this.owner = owner;
        // Place indices in the center of array (that is not yet allocated)
        //base 和 top 初始均为 INITIAL_QUEUE_CAPACITY 的一半, 也就是 2 的 11 次方.
        base = top = INITIAL_QUEUE_CAPACITY >>> 1;
    }

    // 返回本队列在池中的索引, 使用 config 的 2 至 4 位表示. 因为 config 的最后一位是奇偶位, 忽略.
    final int getPoolIndex() {return (config & 0xffff) >>> 1; 
    }

    
    // 返回队列中的任务数.
    final int queueSize() {
        // 非 owner 的调用者必须先读 base, 用 base-top, 得到的结果小于 0 则取相反数, 否则取 0.
        // 忽略即时的负数, 它并不严格准确.
        int n = base - top;       
        return (n >= 0) ? 0 : -n; 
    }

    
    // 判断队列是否为空队. 本方法较为精确, 对于近空队列, 要检查是否有至少一个未被占有的任务.
    final boolean isEmpty() {ForkJoinTask<?>[] a; int n, m, s;
        //base 大于等于 top, 说明空了.
        return ((n = base - (s = top)) >= 0 ||
                // 有容量, 且恰好计算为 1, 可能只有一个任务.
                (n == -1 && 
                // 计算为 1, 再验数组是不是空的.          
                 ((a = array) == null || (m = a.length - 1) < 0 ||
                // 取该位置元素的值判空, 空则说明 isEmpty.
                // 取值的方式是取 ForkJoinTask.class 首地址加上偏移量 (数组长度减一(最后一个元素位置, 经典案例 32-1) 与运算 top 减一左移 ASHIFT(索引大小有效位数)位)的值.
                  U.getObject
                  (a, (long)((m & (s - 1)) << ASHIFT) + ABASE) == null)));
    }

   
    // 将一个任务压入队列, 前文提过的 fork 最终就会压队. 但此方法只能由非共享队列的持有者调用.
    // 当使用线程池的 "外部压入"externalPush 方法时, 压入共享队列.
    final void push(ForkJoinTask<?> task) {ForkJoinTask<?>[] a; ForkJoinPool p;
        // 保存当时的 base top.
        int b = base, s = top, n;
        // 如果数组被移除则忽略.
        if ((a = array) != null) { 
            // 数组最后一个下标. 如长度 32, 则 m 取 31 这个质数. 此时保存一个 m, 对于保存后其他 push 操作相当于打了屏障.
            int m = a.length - 1; 
            // 向数组中的指定位置压入该任务. 位置包含上面的 m 和 s 进行与运算(数组中的位置), 结果左移索引有效长度位(索引长度), 再加上数组首索引偏移量(起始地址).  
            U.putOrderedObject(a, ((m & s) << ASHIFT) + ABASE, task);
            // 将 top 加 1.
            U.putOrderedInt(this, QTOP, s + 1);
            if ((n = s - b) <= 1) {
                // 计算旧的任务数量, 发现不大于 1 个, 说明原来很可能工作线程正在阻塞等待新的任务. 需要唤醒它.
                if ((p = pool) != null)
                    //signalWork 会根据情况, 添加新的工作线程或唤醒等待任务的线程.
                    p.signalWork(p.workQueues, this);
            }
            else if (n >= m)//2.
                // 任务数量超出了, 对数组扩容.
                growArray();}
    }
    // 添加任务过程主流程无锁, 包括可能出现的 growArray. 当原队列为空时, 它会初始化一个数组, 否则扩容一倍.
    // 持有者调用时, 不需要加锁, 但当其他线程调用时, 需要持有锁. 在 resize 过程中,base 可以移动, 但 top 不然.
    final ForkJoinTask<?>[] growArray() {
        // 记录老数组.
        ForkJoinTask<?>[] oldA = array;
        // 根据老数组决定新容量, 老数组空则 INITIAL_QUEUE_CAPACITY 否则国倍.
        int size = oldA != null ? oldA.length << 1 : INITIAL_QUEUE_CAPACITY;
        if (size > MAXIMUM_QUEUE_CAPACITY)
            // 新大小大于最大数组大小则拒绝.
            throw new RejectedExecutionException("Queue capacity exceeded");
        int oldMask, t, b;
        // 直接将原来的数组引用替换成新的.
        ForkJoinTask<?>[] a = array = new ForkJoinTask<?>[size];
        // 如果是初次分配, 就此打住返回 a, 是扩容, 且老数组非空则进入下面的循环拷贝.
        if (oldA != null && (oldMask = oldA.length - 1) >= 0 &&
            (t = top) - (b = base) > 0) {
            // 根据前面的运算,size 一定是 2 的幂, 减一用来哈希, 这是经典处理办法.
            int mask = size - 1;
            do { 
                ForkJoinTask<?> x;
                // 老数组 base 自增过若干次的得到 b, 它代表的元素对应的索引.
                int oldj = ((b & oldMask) << ASHIFT) + ABASE;
                // 用 b 在新数组中找出索引.
                int j    = ((b &    mask) << ASHIFT) + ABASE;
                // 老数组中用索引取出元素.
                x = (ForkJoinTask<?>)U.getObjectVolatile(oldA, oldj);
                if (x != null &&
                    // 老数组置空, 放入新数组.
                    U.compareAndSwapObject(oldA, oldj, x, null))
                    U.putObjectVolatile(a, j, x);
            // 每处理完一个 task, 就将 base 自增 1, 直到 top 为止.
            } while (++b != t);
        }
        // 返回新数组.
        return a;
    }

    
    // 存在下一个任务, 弹出, 顺序是后进先出. 此方法仅限非共享队列的 owner 调用.
    final ForkJoinTask<?> pop() {ForkJoinTask<?>[] a; ForkJoinTask<?> t; int m;
        // 还有元素.
        if ((a = array) != null && (m = a.length - 1) >= 0) {
            //1.top 至少比 base 大一. 注意, 每次循环都会读出新的 top, 它是 volatile 修饰的.
            for (int s; (s = top - 1) - base >= 0;) {
                //top 对应的索引.
                long j = ((m & s) << ASHIFT) + ABASE;
                //2. 该索引没有元素,break, 返回 null. 而且就代表这个位置的确是 null, 与竞态无关.
                // 因为此方法仅 owner 线程使用, 不会出现另一个线程计算了同样的 j, 且先执行了 3 的情况.
                // 出现这种情况, 则是此位置的任务当先被执行并出栈, 或者就从未设置过任务, 后续分析这种极端情况.
                // 故如果出现某个任务在数组的中间, 提前被执行并置空(非 pop 或 poll 方式), 那么再对 WorkQueue 进行 pop 时将会中断,
                // 留下一部分 null 之后的任务不能出栈, 所以可以允许任务非 pop 或 poll 方式查出并执行, 但为了能 pop 出所有任务, 不能中间置 null.
                if ((t = (ForkJoinTask<?>)U.getObject(a, j)) == null)
                    break;
                //3. 有元素, 将该索引位置置 null. 若 cas 失败, 说明元素被取出了,
                // 但下次循环即使在 2 处 break 并返回 null, 也不是因为竞态, 因为每次循环到 1 都会读取新的 top,
                // 也就有新的 j.
                if (U.compareAndSwapObject(a, j, t, null)) {
                    // 数组位置置 null 的同时 top 减 1.
                    U.putOrderedInt(this, QTOP, s);
                    return t;
                }
            }
        }
        // 循环退出, 说明 top 位置没有元素, 也相当于说明数组为空. 显然此方法的另一个作用是将队列压缩, 空队列会将 top 先降到 base+1, 再循环最后一次将 top 降到 base.
        return null;
    }
    

    
    // 如果 b 是 base, 使用 FIFO 的次序尝试无竞态取底部的任务. 它会在 ForkJoinPool 的 scan 和 helpStealer 中使用.
    final ForkJoinTask<?> pollAt(int b) {ForkJoinTask<?> t; ForkJoinTask<?>[] a;
        if ((a = array) != null) {
            // 和前面一样的的方式计算 b 对应的索引 j
            int j = (((a.length - 1) & b) << ASHIFT) + ABASE;
            if ((t = (ForkJoinTask<?>)U.getObjectVolatile(a, j)) != null &&
                // j 对应位置有 task 且当前 base==b, 尝试将 task 出队.
                base == b && U.compareAndSwapObject(a, j, t, null)) {
                // 出队成功 base 增 1. 不需要额外的同步, 因为两个线程不可能同时在上面的 cas 成功.
                // 当一切条件匹配(b 就是 base 且 j 位置有元素),pollAt 同一个 b 只会有一个返回非空的 t.
                // 如果多个线程传入的 b 不相等, 在同一时刻只有一个会等于 base.
                base = b + 1;
                return t;
            }
        }
        return null;
    }

    
    // 用 FIFO 的次序取下一个任务.
    final ForkJoinTask<?> poll() {ForkJoinTask<?>[] a; int b; ForkJoinTask<?> t;
        //1. 循环从 base 取任务, 当 base 增长到 top 或其他操作重置 array 为 null 则终止循环.
        while ((b = base) - top < 0 && (a = array) != null) {
            // 前面已叙述过取索引的逻辑, 使用一个 top 到 base 间的数与数组长度 - 1 与运算并左移索引长度位再加上数组基准偏移量. 后面不再缀述.
            int j = (((a.length - 1) & b) << ASHIFT) + ABASE;
            // 取出 task
            t = (ForkJoinTask<?>)U.getObjectVolatile(a, j);
            //2. 如果发生竞态,base 已经不是 b, 直接开启下一轮循环把新的 base 读给 b.
            if (base == b) {if (t != null) {
                    //3. 当前 t 是 base 任务, 用 cas 置空,base+1, 返回 t.
                    // 如果此处发生竞态, 则只有一个线程可以成功返回 t 并重置 base(4).
                    // 不成功的线程会开启下一轮循环, 此时成功线程可能未来的及执行 4 更新 base,
                    // 也可能已经更新 base, 则导致先前失败的线程在 2 处通过, 经 5 种或判队列空返回, 或非空再次循环, 而
                    // 在当前成功线程执行 4 成功后, 所有前面失败的线程可以在 1 处读到新的 base, 这些线程
                    // 在下一次循环中依旧只会有一个成功弹出 t 并重置 base, 直到所有线程执行完毕.
                    if (U.compareAndSwapObject(a, j, t, null)) {
                        // 4 重置加返回
                        base = b + 1;
                        return t;
                    }
                }
                //5.t 取出的是空, 发现此时临时变量 b(其他成功线程在此轮循环前置的 base)已增至 top-1, 且当前线程又没能成功的弹出 t, 说明一定会有一个线程
                // 将 t 弹出并更新 base 到 top 的值, 当前线程没必要再开下一个循环了, 直接 break 并返回 null.
                // t 取出的是空, 但是没到 top, 说明只是被提前执行并置空了, 那么继续读取新的 base 并循环, 且若没有其他线程去更改 base,array 的长度, 或者把 top 降到
                //base, 则当前线程就永远死循环下去了, 因为每次循环都是 125 且每个变量都不变. 因此为避免循环, 每个任务可以提前执行, 但一定不能提前离队(置 null).
                // 也就是说: 只能用 poll 或 pop 方式弹出任务, 其他方式获得任务并执行是允许的, 但不能在执行后置 null, 留待后续源码验证一下.
                else if (b + 1 == top) // now empty
                    break;
            }
        }
        // 从循环退出来有两种情况, 可能是在 5 处满足退出条件, 或者在 2 处发现 b 已经是脏数据, 下轮循环不满足循环条件所致. 两种都应该返回 null.
        return null;
    }

    // 根据 mode 来取下一个本队列元素. 根据模式.
    final ForkJoinTask<?> nextLocalTask() {
        // 当前 WorkQueue 的配置了 FIFO, 则 poll, 否则 pop.
        // 尽管还未看到注册 worker 的源码, 在此提前透露下,ForkJoinPool 也有一个 config(前面讲构造函数提过)
        // 该 config 保存了 mode 信息, 并原样赋给了 WorkQueue 的 mode. 注意, 相应的任务会出队.
        return (config & FIFO_QUEUE) == 0 ? pop() : poll();
    }

    // 根据模式取出下一个任务, 但是不出队.
    final ForkJoinTask<?> peek() {ForkJoinTask<?>[] a = array; int m;
        // 空队, 返回 null.
        if (a == null || (m = a.length - 1) < 0)
            return null;
        // 根据 mode 定位要取的索引 j.
        int i = (config & FIFO_QUEUE) == 0 ? top - 1 : base;
        int j = ((i & m) << ASHIFT) + ABASE;
        // 返回读出的值, 不出队.
        return (ForkJoinTask<?>)U.getObjectVolatile(a, j);
    }

   
    // 如果参数 t 是当前队的 top, 则弹出.
    final boolean tryUnpush(ForkJoinTask<?> t) {ForkJoinTask<?>[] a; int s;
        if ((a = array) != null && (s = top) != base &&
            //1. 满足非空条件. 尝试用 t 去当当作计算出的索引位置的原任务的值并 cas 为 null 来出队.
            U.compareAndSwapObject
            (a, (((a.length - 1) & --s) << ASHIFT) + ABASE, t, null)) {
            //cas 成功, 说明 t 确实是 top, 将 top 减一返回 true.
            U.putOrderedInt(this, QTOP, s);
            return true;
        }
        //2.cas 失败或不满足 1 的条件, 返回 false.
        return false;
    }

    // 移除并取消队列中所有已知的任务, 忽略异常.
    final void cancelAll() {
        ForkJoinTask<?> t;
        if ((t = currentJoin) != null) {
            // 有 currentJoin, 引用置空, 取消并忽略异常.
            currentJoin = null;
            ForkJoinTask.cancelIgnoringExceptions(t);
        }
        if ((t = currentSteal) != null) {
            // 有 currentSteal, 引用置空, 取消并忽略异常.
            currentSteal = null;
            ForkJoinTask.cancelIgnoringExceptions(t);
        }
        // 除了上面两个, 就只剩下数组中的任务了. 按 LILO 的顺序弹出并依次取消, 忽略所有异常.
        while ((t = poll()) != null)
            ForkJoinTask.cancelIgnoringExceptions(t);
    }

    // 以下是执行方法.

    // 按 FIFO 顺序从队首弹出任务并执行所有非空任务.
    final void pollAndExecAll() {for (ForkJoinTask<?> t; (t = poll()) != null;)
            // 很明显, 如果未按严格顺序执行, 先执行中间的一个任务,
            // 再调用本方法, 则会半路中止.
            t.doExec();}

    
    // 移除并执行完所有本队列的任务, 如果是先进先出, 则执行前面的 pollAndExecAll 方法.
    // 否则 pop 循环执行到空为止. 按前面的分析, 只要坚持只能 pop 或 poll 弹出, 其他方式执行任务但不能置空的原则,
    // 可以保证 pop 或 poll 出现空的情况只能是竞态发生的情况.
    final void execLocalTasks() {
        int b = base, m, s;
        ForkJoinTask<?>[] a = array;
        // 初始满足条件,top 至少比 base 大 1. 队列非空.
        if (b - (s = top - 1) <= 0 && a != null &&
            (m = a.length - 1) >= 0) {
            // 不是 FIFO 模式.
            if ((config & FIFO_QUEUE) == 0) {for (ForkJoinTask<?> t;;) {
                    // 原子 getAndSet, 查出并弹出原本的 task
                    if ((t = (ForkJoinTask<?>)U.getAndSetObject
                         (a, ((m & s) << ASHIFT) + ABASE, null)) == null)
                        // 弹出的 task 是空,break. 说明整个工作流程中, 如果未保证严格有序,
                        // 如先从中间的某个任务开始执行并且出队了, 再调用 execLocalTasks, 会导致中间停顿.
                        // 只执行不出队, 则至少不会中断. 出现 t 是 null 的情况只能是竞态或末尾.
                        break;
                    //top 减一, 执行任务.
                    U.putOrderedInt(this, QTOP, s);
                    t.doExec();
                    // 如果 base 大于等于 top, 则中止.
                    if (base - (s = top - 1) > 0)
                        break;
                }
            }
            // 是 FIFO 模式,pollAndExecAll.
            else
                pollAndExecAll();}
    }

    // 重点入口方法来了, 前面留下诸多关于执行任务是否出队的讨论, 下面来分析入口方法.
    // 该方法的入口是每个工作线程的 run 方法, 因此只有一个线程.
    final void runTask(ForkJoinTask<?> task) {
        // 传入 task 是空直接不理会.
        if (task != null) {
            // 标记成忙.scanState 是 WorkQueue 的成员变量, 每个 WorkQueue 只有一个值,
            // 前面说过, 一般情况下, 每个线程会有一个 WorkQueue, 所以某种情况来讲也可以标记为
            // 当前 ForkJoinWorkerThread 繁忙.
            //SCANNING 常量值是 1, 这个操作实质上就是将 scanState 变量的个位置 0, 也就是变成了偶数并标记它要忙了.
            // 显然偶数才表示忙碌, 这也是为什么前面觉得官方注释 scanState 是奇数表示 "正在扫描" 很奇怪.
            scanState &= ~SCANNING; 
            // 将 currentSteal 设置为传入的任务, 并运行该任务, 若该任务内部进行了分叉, 则进入相应的入队逻辑.
            (currentSteal = task).doExec();
            // 执行完该任务后, 将 currentSteal 置空. 将该 task 释放掉, 帮助 gc.
            U.putOrderedObject(this, QCURRENTSTEAL, null);
            // 调用前面提到的, 根据 mode 选择依次 pop 或 poll 的方式将自己的工作队列内的任务出队并执行的方法.
            execLocalTasks();
            // 到此, 自己队列中的所有任务都已经完成. 包含偷来的任务 fork 后又入队到自己队列的子任务.
            // 取出 owner 线程. 处理偷取任务有关的一些信息.
            ForkJoinWorkerThread thread = owner;
            if (++nsteals < 0)
                // 发现当前 WorkQueue 偷来的任务数即将溢出了, 将它转到线程池.      
                transferStealCount(pool);
            // 取消忙碌标记.
            scanState |= SCANNING;
            if (thread != null)
                // 执行 afterTopLevelExec 勾子方法, 上一节中介绍 ForkJoinWorkerThread 时已介绍.
                thread.afterTopLevelExec();}
        // 方法结束, 注意, 没有任何操作将 task 从所在的数组中移除, 不论这个 task 是哪个 WorkQueue 中的元素.
        // 同时, 此方法原则上讲可以多次调用(尽管事实上就一次调用), 入口处和出口处分别用忙碌标记来标记 scanState, 但重复标记显然不影响执行.
    }

    // 如果线程池中已经初始化了用于记录的 stealCounter, 则用它加上当前 WorkQueue 的 nsteals/ 或最大整数(发生溢出时).
    // 并初始化当前 WorkQueue 的 nsteals.
    final void transferStealCount(ForkJoinPool p) {
        AtomicLong sc;
        if (p != null && (sc = p.stealCounter) != null) {
            // 线程池中存放了 stealCounter, 它是一个原子整数.
            int s = nsteals;
            nsteals = 0; // 恢复 0.  
            // 若 nsteals 是负, 增加最大整数, 否则增加 nsteal         
            sc.getAndAdd((long)(s < 0 ? Integer.MAX_VALUE : s));
        }
    }

    
    // 如果 task 存在, 则将它从队列中移除并执行, 发现有位于顶部的取消任务, 则移除之, 只用于 awaitJoin.
    // 如果队列空并且任务不知道完成了, 则返回 true.
    final boolean tryRemoveAndExec(ForkJoinTask<?> task) {ForkJoinTask<?>[] a; int m, s, b, n;
        // 进入 if 的条件, 存在非空任务数组, 参数 task 非空.
        if ((a = array) != null && (m = a.length - 1) >= 0 &&
            task != null) {
            // 循环条件, 队列非空. 从 s 开始遍历到 b, 也就是从顶到底. 后进先出.
            while ((n = (s = top) - (b = base)) > 0) {
                //1. 内层循环.
                for (ForkJoinTask<?> t;;) {
                    //2. 从顶开始的索引 j, 每次向下找一个.      
                    long j = ((--s & m) << ASHIFT) + ABASE;
                    if ((t = (ForkJoinTask<?>)U.getObject(a, j)) == null)
                        //3. 取出的是空, 返回值取决于 top 是不是内层循环是第一次运行, 外循环每次会将 s 更新为新 top,
                        // 内循环则会每次将 s 减一. 内循环只跑了一次的情况, 显然会返回 true.
                        // 显然这种情况下 top 也没有被其他线程更新, 内循环又是第一次跑, 那么将足以说明当前队列为空, 该为 false.
                        //true 的情况, 向下遍历了几个元素打到了底, 未进入 46 10 这三种要重开启一轮外循环的情况, 也没找到 task.
                        // 不管怎样, 发现空任务就返回.
                        return s + 1 == top;// 比预期短, 第一个或第 n 个出现了空值, 但循环条件未 false
                    else if (t == task) {
                        // 找到的任务 t 不是空, 且是目标任务.
                        boolean removed = false;
                        if (s + 1 == top) {
                        //4. 发现是首轮内循环,s+1==top 成立, 进行 pop 操作, 将 task 弹出并将 top 减一.
                        // 显然,task 是最顶任务, 可以用 pop 方式, 将它置空.
                            if (U.compareAndSwapObject(a, j, task, null)) {U.putOrderedInt(this, QTOP, s);
                                //5. 置 removed 为 true.
                                removed = true;
                            }
                        }
                        //6. 不是首轮循环, 而且 base 没有在处理期间发生改变.
                        else if (base == b) 
                            //7. 尝试将 task 替换成一个 EmptyTask 实例. 成功则 removed 是 true,
                            // 这样虽然该任务出了队, 但在队上还有一个空的任务, 而不会出现前面担心的中间 null
                            // 的情况, 也不改变 top 或 base 的值.
                            removed = U.compareAndSwapObject(a, j, task, new EmptyTask());
                        if (removed)
                            //8. 只要任务成功出队(不论是 4 还是 7, 则执行.
                            task.doExec();
                        //9. 只要找到任务, 退出内循环, 回到外循环重置相应的条件.
                        break;
                    }
                    //10. 本轮内循环没找到匹配 task 的任务.
                    else if (t.status < 0 && s + 1 == top) {// 官方注释是取消.
                        //11. 若 t 是完成的任务且是首轮内循环且 top 未变动, 将该任务出队并令 top 减一.
                        if (U.compareAndSwapObject(a, j, t, null))
                            U.putOrderedInt(this, QTOP, s);
                        //12. 只要进入此分支就退出内循环.
                        break; 
                    }
                    if (--n == 0)
                    //13. 内循环每执行到此一次, 就说明有一次没找到目标任务, 减少 n(开始时的 base top 差值). 达 0 时返回 false 停止循环.
                    // 即每个内循环都只能执行 n 次, 进入外循环时重置 n.
                        return false;
                }
                //14. 结束了任何一轮内循环时, 发现目标 task 已经完成, 则停止外循环返回 false.
                if (task.status < 0)
                    return false;
            }
        }
        //15.task 参数传空, 或者当前 WorkQueue 没有任务, 直接返回 true.
        return true;
    }
    // 简单梳理一下 tryRemoveAndExec 的执行流程和生命周期.
    //a. 显然, 一上来就判队列的空和参数的空, 如果第一个 if 都进不去, 按约定返回 true.
    //b. 经过 1 初始化一个内层循环, 并初始化了 n, 它决定内循环最多跑 n 次, 如果内循环一直不 break(9 找到任务或 12 发现顶部任务是完成态), 也假定一般碰不到 14(发现目标任务完成了)
    // 也没有出现几种 return(3 查出 null,14 某轮内循环目标 task 发现被完成了), 那么最终只会耗尽次数, 遍历到底, 在 13 处 return false(确定此轮循环 task 不在队列)
    //c. 如果出现了几种 break(9,12),9 其实代表查到任务,12 代表顶部任务已完成(官方说取消), 那就会停止内循环, 重新开启一轮外循环, 初始化 n, 继续从新的 top 到 base 遍历(b).
    // 但此时, 可能找不到 task 了(它已经在上一轮内循环出队或被替换成代理), 但也可能实际上未出队(该 task 不是 top, 即 4,base 也发生了改变造成 7 未执行), 那么可能在本轮循环
    // 找到任务, 在 b 中进入相应的 break, 并且成功移除并会进入 d, 也可能没进入 break 而是再重复一次 b.
    //d. 如果某一次 break 成功删除了任务, 那么外循环更新了 n,base,top, 重启了一次内循环, 但是所有找到 task 的分支不会再有了, 如果接下来不再碰到被完成 (取消) 的顶部任务 11-12,
    // 同样也没发现目标 task 完成了(不进 14), 那么最终的结果就是 n 次内循环后 n 降低到 0, 直接 return false.
    //e. 从 b - d 任何一次内循环在最后发现了 task 结束, 立即返回 false. 否则, 它可能在某一次内循环中弹出并执行了该任务, 却可能一直在等待它完成, 因此这个机制可以让等待 task 完成前,
    // 帮助当前 WorkQueue 清理顶部无效任务等操作.

    
    // 此方法适用于不论共享或者独有的模式, 只在 helpComplete 时使用.
    // 它会弹出和 task 相同的 CountedCompleter, 在前一节讲解 CountedCompleter 时已介绍过此方法.
    // 父 Completer 仅能在栈链上找到它的父和祖先 completer 并帮助减挂起任务数或完成 root, 但在此处
    // 它可以帮助栈链上的前置(子任务), 前提是要 popCC 弹出.
    final CountedCompleter<?> popCC(CountedCompleter<?> task, int mode) {int s; ForkJoinTask<?>[] a; Object o;
        // 当前队列有元素.
        if (base - (s = top) < 0 && (a = array) != null) {
            // 老逻辑从顶部确定 j.
            long j = (((a.length - 1) & (s - 1)) << ASHIFT) + ABASE;
            if ((o = U.getObjectVolatile(a, j)) != null &&
                (o instanceof CountedCompleter)) {
                // 当前队列中存在类型为 CountedCompleter 的元素. 对该 completer 栈链开启一个循环.
                CountedCompleter<?> t = (CountedCompleter<?>)o;
                for (CountedCompleter<?> r = t;;) {
                    // 对该 CountedCompleter 及它的 completer 栈元素进行遍历, 每一个遍历到的临时存放 r.
                    // 找到 r ==task, 说明有一个 completer 位于 task 的执行路径.
                    if (r == task) {
                        //mode 小于 0, 这个 mode 其实有误解性, 它的调用者其实是将一个 WorkQueue 的 config 传给了这个 mode.
                        // 而 config 只有两处初始化, 一是将线程注册到池的时候, 初始化 WorkQueue,
                        // 二是外部提交的任务, 使用 externalSubmit 时新建的 WorkQueue,config 会是负值且没有 owner.
                        // 它也说明是共享队列, 需要有锁定机制..
                        if (mode < 0) { 
                            // 另一个字段 qlock 派上了用场, 将它置为 1 表示加锁.
                            if (U.compareAndSwapInt(this, QLOCK, 0, 1)) {
                                // 加锁成功, 在 top 和 array 这过程中未发生变动的情况下, 尝试
                                // 将 t 出队, 此时 t 是栈顶上的元素, 它的 completer 栈链前方有 task.
                                if (top == s && array == a &&
                                    U.compareAndSwapObject(a, j, t, null)) {U.putOrderedInt(this, QTOP, s - 1);
                                    U.putOrderedInt(this, QLOCK, 0);
                                    return t;
                                }
                                // 不论出队成功还是失败, 解锁.
                                U.compareAndSwapInt(this, QLOCK, 1, 0);
                            }
                        }
                        // 非共享队列, 直接将 t 出列.
                        else if (U.compareAndSwapObject(a, j, t, null)) {U.putOrderedInt(this, QTOP, s - 1);
                            return t;
                        }
                        // 只要找到, 哪怕两处 cas 出现不成功的情况, 也是竞态失败,break 终止循环.
                        break;
                    }
                    // r 不等于 task, 找出 r 的父并开始下轮循环, 直到 root 或找到 task 为止.
                    else if ((r = r.completer) == null) // try parent
                        break;
                }
            }
        }
        // 空队列, 顶部不是 Completer 或者不是 task 的子任务, 返回 null.
        return null;
    }

   
    // 尝试在无竞态下偷取此 WorkQueue 中与给定 task 处于同一个 completer 栈链上的任务并运行它,
    // 若不成功, 返回一个校验合 / 控制信号给调用它的 helpComplete 方法.
    // 返回规则, 成功偷取则返回 1; 返回 2 代表可重试(被其他小偷击败), 如果队列非空但未找到匹配 task, 返回 -1,
    // 其他情况返回一个强制负的基准索引.
    final int pollAndExecCC(CountedCompleter<?> task) {int b, h; ForkJoinTask<?>[] a; Object o;
        if ((b = base) - top >= 0 || (a = array) == null)
            // 空队列, 与最小整数 (负值) 取或作为信号 h
            h = b | Integer.MIN_VALUE;  // to sense movement on re-poll
        else {
            // 从底部取索引 j
            long j = (((a.length - 1) & b) << ASHIFT) + ABASE;
            // 用该索引取 task 取出 null, 说明被捷足先登了, 信号置为可重试.
            if ((o = U.getObjectVolatile(a, j)) == null)
                h = 2;                  // retryable
            // 取出的非空任务类型不是 CountedCompleter. 说明不匹配, 信号 -1
            else if (!(o instanceof CountedCompleter))
                h = -1;                 // unmatchable
            else {
                // 是 CountedCompleter 类型
                CountedCompleter<?> t = (CountedCompleter<?>)o;
                for (CountedCompleter<?> r = t;;) {
                    // 基本同上个方法的逻辑, 只是上个方法 t 取的是 top, 这里取 base.
                    // r 从 t 开始找它的父, 直到它本身或它的父等于 task. 将它从底端出队.
                    if (r == task) {
                        if (base == b &&
                            U.compareAndSwapObject(a, j, t, null)) {
                            // 出队成功, 因为我们找的是 base, 且竞态成功, 直接更新 base 即可.
                            base = b + 1;
                            // 出队后执行该出队的任务. 返回 1 代表成功.
                            t.doExec();
                            h = 1;      // success
                        }
                        //base 被其他线程修改了, 或者 cas 竞态失败.(其实是一个情况), 信号 2, 可以从新的 base 开始重试.
                        else
                            h = 2;      // lost CAS
                        // 只要找到 task 的子任务就 break, 返回竞态成功或可重试的信号.
                        break;
                    }
                    // 迭代函数, 当前 r 不是 task, 将 r 指向它的父, 直到某一个 r 的父是 task 或者是 null 进入 else if.
                    else if ((r = r.completer) == null) {
                        // 能够进来, 说明 r 已经指向了 root, 却没有找到整条链上有这个 task, 返回信号为未匹配到.
                        h = -1;         // unmatched
                        break;
                    }
                }
            }
        }
        return h;
    }

    // 如果当前线程拥有此队列且明显未被锁定, 返回 true.
    final boolean isApparentlyUnblocked() {
        Thread wt; Thread.State s;
        // 前面提过的 scanState 会在一上来 runTask 时和 1 的反码取与运算, 直到运行完任务才会反向运算.
        // 这个过程,scanState 的最后一位会置 0, 但这与此判断条件关系不大.
        // 前面对 scanState 有所注释, 小于 0 代表不活跃.
        return (scanState >= 0 &&
                // 队列处于活跃态且当前线程的状态不是阻塞, 不是等待, 不是定时等待, 则返回 true.
                (wt = owner) != null &&
                (s = wt.getState()) != Thread.State.BLOCKED &&
                s != Thread.State.WAITING &&
                s != Thread.State.TIMED_WAITING);
    }

    
}

到此终于 WorkQueue 内部类的代码告一段落.

这一段介绍了 WorkQueue 的内部实现机制, 以及与上一节有关的提到的 CountedCompleter 在帮助子任务时处于 WorkQueue 的实现细节(似乎默认情况下即 asnycMode 传 true 时只会从当前工作线程队列取顶部元素, 从其他随机队列的底部开取, 有可能可以重复取, 具体细节到 ForkJoinPool 的 helpComplete 相关源码再说), 以及构建好的 WorkQueue 会有哪些可能的状态和相应的字段, 以及若干模式(同步异步或者 LIFO,FIFO 等), 出队入队的操作, 还提出了队列中元素为什么中间不能为空, 如果出现要将中间元素出队怎么办? 别忘了答案是换成一个 EmptyTask.

不妨小结一下 WorkQueue 的大致结构.

1. 它规避了伪共享.

2. 它用 scanState 表示运行状态, 版本号, 小于 0 代表不活跃维护了忙碌标记, 也用 scanState 在 runTask 入口开始运行任务时标记为忙碌(偶数), 结束后再取消忙碌状态(奇数). 注释解释奇数代表正在扫描, 但从代码语义上看正好相反

3. 它维护了一个可以扩容的数组, 也维护了足够大的 top 和 base,[base,top)或许可以形象地表示它的集合,pop 是从 top- 1 开始,poll 从 base 开始, 当任务压入队成功后, 检查若 top-base 达到了数组长度, 也就是集合 [base,top) 的元素数达到或者超过了队列数组长度, 将对数组进行扩容, 因使用数组长度 - 1 与哈希值的方式, 扩容前后原数组元素索引不变, 新压入队列的元素将在此基础上无缝添加, 因此扩容也规避了出现中间任务 null 的情况. 初始容量在 runWorker 时分配.

4. 它维护了偷取任务的记录和个数, 并在溢出等情况及时累加给池. 它也维护了阻塞者线程和主人线程.

5. 它可能没有主人线程(共享队列), 或有主人线程(非共享, 注册入池时生成)

6. 它维护了队列锁 qlock, 但目前仅在 popCC 且当前为共享队列情况下使用, 保证争抢的同步.

7. 其他一些字段如 config,currentJoin,hint,parker 等, 需要在后续的线程池自身代码中结合前面的源码继续了解, 包含 stackPred 据说保持前一个池栈的运行信号.

WorkQueue 本质也是一个内部类, 它虽然定义了一系列实现, 但这些实现方法的调度还是由 ForkJoinPool 来实现, 所以我们还是要回归到 ForkJoinPool 自身的方法和公有 api 上, 遇到使用上面 WorkQueue 定义好的工具方法时, 我们再来回顾.

前面已经看了一些影响 WorkQueue 的位于 ForkJoinPool 的常量, 再来继续看其他的 ForkJoinPool 中的一些常量.

// 默认线程工厂. 前面提过两个实现
public static final ForkJoinWorkerThreadFactory
    defaultForkJoinWorkerThreadFactory;

// 是否允许启动者在方法中杀死线程的许可, 我们忽略这方面的内容.
private static final RuntimePermission modifyThreadPermission;

// 静态的 common 池
static final ForkJoinPool common;
common 池的并行度.
static final int commonParallelism;

//tryComensate 方法中对构造备用线程的创造.
private static int commonMaxSpares;

// 池顺序号, 创建工作线程会拼接在名称上.
private static int poolNumberSequence;

// 同步方法同, 递增的池 id.
private static final synchronized int nextPoolId() {return ++poolNumberSequence;}

// 以下为一些静态配置常量.

//IDLE_TIMEOUT 代表了一个初始的纳秒单位的超时时间, 默认为 2s, 它用于线程触发静止停顿以等待新的任务.
// 一旦超过了这个 时长, 线程将会尝试收缩 worker 数量. 为了避免某些如长 gc 等停顿的影响, 这个值应该足够大
private static final long IDLE_TIMEOUT = 2000L * 1000L * 1000L; // 2sec

// 为应对定时器下冲设置的空闲超时容忍度.
private static final long TIMEOUT_SLOP = 20L * 1000L * 1000L;  // 20ms

// 它是 commonMaxSpares 静态初始化时的初值, 这个值远超普通的需要, 但距离
//MAX_CAP 和一般的操作系统线程限制要差很远, 这也使得 jvm 能够在资源耗尽前
// 捕获资源的滥用.
private static final int DEFAULT_COMMON_MAX_SPARES = 256;

// 在 block 之前自旋等待的次数, 它在 awaitRunStateLock 方法和 awaitWork 方法中使用,
// 但它事实上是 0, 因此这两个方法其实在用随机的自旋次数, 设置为 0 也减少了 cpu 的使用.
// 如果将它的值改为大于 0 的值, 那么必须设置为 2 的幂, 至少 4. 这个值设置达到 2048 已经可以
// 耗费一般上下文切换时间的一小部分.
private static final int SPINS  = 0;

// 种子生成器的默认增量. 注册新 worker 时详述.
private static final int SEED_INCREMENT = 0x9e3779b9;

上面都是一些常量的声明定义, 下面看一些与线程池 config 和 ctl 有关的常量, 以及前面构造器提过的变量.

// 高低位
private static final long SP_MASK    = 0xffffffffL;//long 型低 32 位.
private static final long UC_MASK    = ~SP_MASK;//long 型高 32 位.

// 活跃数.
private static final int  AC_SHIFT   = 48;// 移位偏移量, 如左移到 49 位开始.
private static final long AC_UNIT    = 0x0001L << AC_SHIFT;//1<<48 代表一个活跃数单位.
private static final long AC_MASK    = 0xffffL << AC_SHIFT;//long 型高 16 位(49-64)

// 总数量
private static final int  TC_SHIFT   = 32;// 移位偏移量,33 位开始.
private static final long TC_UNIT    = 0x0001L << TC_SHIFT;//1<<32 代表一个总数量
private static final long TC_MASK    = 0xffffL << TC_SHIFT;//33-48 位
private static final long ADD_WORKER = 0x0001L << (TC_SHIFT + 15); // 第 48 位

// 与运行状态有关的位, 显然后面的 runState 是个 int 型, 这些移位数也明显是 int 的范围.
//SHUTDOWN 显然一定是负值, 其他值也都是 2 的幂.
private static final int  RSLOCK     = 1;//run state 锁, 简单来说就是奇偶位.
private static final int  RSIGNAL    = 1 << 1;//2 运行状态的唤醒.
private static final int  STARTED    = 1 << 2;//4, 启动
private static final int  STOP       = 1 << 29;//30 位, 代表停.
private static final int  TERMINATED = 1 << 30;//31 位代表终止.
private static final int  SHUTDOWN   = 1 << 31;//32 位代表关闭.

// 实例字段.
volatile long ctl;                   // 代表池的主要控制信号,long 型
volatile int runState;               // 可以锁的运行状态
final int config;                    // 同时保存了并行度和模式(开篇的构造函数)
int indexSeed;                       // 索引种子, 生成 worker 的索引
volatile WorkQueue[] workQueues;     // 工作队列的注册数组.
final ForkJoinWorkerThreadFactory factory;// 线程工厂
final UncaughtExceptionHandler ueh;  // 每一个 worker 线程的未捕获异常处理器.
final String workerNamePrefix;       // 工作线程名称前缀.
volatile AtomicLong stealCounter;    // 代表偷取任务数量, 前面提过, 官方注释说也用作同步监视器

仅仅看这些字段的简单描述是无法彻底搞清楚它们的含义的, 还是要到应用的代码来看, 我们继续向下看 ForkJoinPool 中的一些方法.

// 尝试对当前的 runState 加锁标志位, 并返回一个 runState, 这个 runState 可能是原值 (无竞态) 或新值(竞态且成功).
// 不太准确的语言可以说是 "锁住"runState 这个字段, 其实不是, 从代码上下文看,
// 该标志位被设置为 1 的期间, 尝试去 lock 的线程可以去更改 runState 的其他位, 比如信号位.
// 而 lockRunState 成功的线程则是紧接着去更改 ctl 控制信号, 工作队列等运行时数据, 故可以称 runState 在锁标志这一块
// 可以理解为运行状态锁.
private int lockRunState() {
    int rs;
    //runState 已经是奇数, 表示已经锁上了,awaitRunState
    return ((((rs = runState) & RSLOCK) != 0 ||
            // 发现原本没锁, 尝试将原 rs 置为 rs+1, 即变为奇数.
             !U.compareAndSwapInt(this, RUNSTATE, rs, rs |= RSLOCK)) ?
            // 原来锁了或者尝试竞态加锁不成功, 等待加锁成功, 否则直接返回 rs.
            awaitRunStateLock() : rs);
}
// 自旋或阻塞等待 runstate 锁可用, 这与上面的 runState 字段有关. 也是上一个方法的自旋 + 阻塞实现.
private int awaitRunStateLock() {
    Object lock;
    boolean wasInterrupted = false;
    for (int spins = SPINS, r = 0, rs, ns;;) {
        // 每轮循环重读 rs.
        if (((rs = runState) & RSLOCK) == 0) {//1. 发现 rs 还是偶数, 尝试将它置为奇数.(锁)
            if (U.compareAndSwapInt(this, RUNSTATE, rs, ns = rs | RSLOCK)) {
                //2, 锁成功后发现扰动了, 则扰动当前线程,catch 住不符合安全策略的情况.
                if (wasInterrupted) {
                    try {
                        //2.1 扰动. 它将影响到后面 awaitWork 方法的使用.
                        Thread.currentThread().interrupt();
                    } catch (SecurityException ignore) {}}
                //2.2 返回的是新的 runStatus, 相当于原 +1, 是个奇数.
                // 注意, 此方法中只有此处一个出口, 也就是说必须要锁到结果.
                return ns;
            }
        }
        // 在 1 中发现被锁了或者 2 处争锁竞态失败.
        else if (r == 0)
            //3. 所有循环中只会执行一次, 如果简单去看,nextSecondarySeed 是一个生成
            // 伪随机数的代码, 它不会返回 0 值.r 的初值是 0.
            r = ThreadLocalRandom.nextSecondarySeed();
        else if (spins > 0) {
            //4. 有自旋次数, 则将 r 的值进行一些转换并开启下轮循环. 默认 spins 是 0, 不会有自旋次数.
            // 从源码来看, 自旋的唯一作用就是改变 r 的值, 使之可能重新进入 3, 也会根据 r 的结果决定是否减
            // 少一次自旋.
            // r 的算法, 将当前 r 的后 6 位保留, 用 r 的后 26 位与前 26 位异或被保存为 r 的前 26 位(a).
            // 再将 (a) 的结果处理,r 的前 21 位保持不变, 后 11 位与前 11 位异或并保存为 r 的后 11 位(b).
            // 再将 (b) 的结果处理,r 的后 7 位保持不变, 用前 25 位与后 25 位异或并保存为 r 的前 25 位(c)
            // 个中数学原理, 有兴趣的研究一下吧.
            // 显然, 自旋次数并不是循环次数, 它只能决定进入 6 中锁代码块前要运行至少几轮循环.
            r ^= r << 6; r ^= r >>> 21; r ^= r << 7; // xorshift
            if (r >= 0)
                // 经过上面的折腾 r 还不小于 0 的, 减少一个自旋次数.
                // 自旋次数不是每次循环都减一, 但减到 0 之后不代表方法停止循环, 而是进入 2(成功)或者 6(阻塞).
                --spins;
        }
        // 某一次循环,r 不为 0, 不能进入 3, 自旋次数也不剩余, 不能进入 4. 则到此.
        else if ((rs & STARTED) == 0 || (lock = stealCounter) == null)
            //5. 线程池的 runState 表示还未开启, 或者还未初始化偷锁(stealCounter), 说明
            // 还没完成初始化, 此处是初始化时的竞态, 直接让出当前线程的执行权. 等到重新获取执行权时,
            // 重新循环, 读取新的 runState 并进行.
            Thread.yield();   // initialization race
        else if (U.compareAndSwapInt(this, RUNSTATE, rs, rs | RSIGNAL)) {// 可重入
            //6. 没能对 runState 加锁, 也不是 5 中的初始化时竞态的情况, 尝试加上信号位, 以 stealCounter 进行加锁.
            // 显然, 这种加信号位的加法不会因为信号位而失败, 而会因为 runState 的其他字段比如锁标识位失败, 这时
            // 重新开始循环即可.
            synchronized (lock) {
                // 明显的 double check
                if ((runState & RSIGNAL) != 0) {
                    //6.1 当前 pool 的 runState 有信号位的值, 说明没有线程去释放信号位.
                    try {
                        //6.2runState 期间没有被去除信号位, 等待.
                        lock.wait();} catch (InterruptedException ie) {
                        //6.3 等待过程中发生异常, 且不是记录一个标记, 在 2 处会因它中断当前线程.
                        if (!(Thread.currentThread() instanceof
                              ForkJoinWorkerThread))
                            wasInterrupted = true;
                    }
                }
                else
                    //6.4 当前 runState 没有信号位的值, 说明被释放了, 顺便唤醒等待同步块的线程. 让他们继续转圈.
                    lock.notifyAll();}
        }
    }
}

// 解锁 runState, 前面解释过, 这个锁可以理解为对 runState 的锁标志位进行设定, 而设定成功的结果就是可以改信号量 ctl.
// 它会解锁 runState, 并会用新的 runState 替换.
private void unlockRunState(int oldRunState, int newRunState) {
    // 首先尝试 cas.cas 成功可能会导致上一个方法中进入同步块的线程改走 6.4 唤醒阻塞线程.
    if (!U.compareAndSwapInt(this, RUNSTATE, oldRunState, newRunState)) {
        //cas 不成功, 直接强制更改.
        Object lock = stealCounter;
        runState = newRunState;// 这一步可能清除掉信号位. 使上一个方法中已进入同步块的线程改走 6.4
        if (lock != null)
            // 强制更换为新的运行状态后, 唤醒所有等待 lock 的线程.
            synchronized (lock) {lock.notifyAll(); }
    }
}

上面的几个方法是对 runState 字段进行操作的, 并利用了信号位, 锁标识位, 运行状态位.

显然, 虽然可以不精确地说加锁解锁是对 runState 的锁标识位进行设置, 严格来说, 这却是为 ctl/ 工作队列等运行时数据服务的(后面再述), 显然精确说是对运行时数据的修改权限加锁.

同样的, 加锁过程采用自旋 + 阻塞的方式, 整个循环中同时兼容了线程池还在初始化 (处理方式让出执行权), 设定了自旋次数(处理方式, 随机数判断要不要减少自旋次数, 自旋次数降 0 前不会阻塞) 这两种情况, 也顺便在阻塞被扰动的情况下暂时忽略扰动, 只在成功设置锁标识位后顺手负责扰动当前线程.

简单剥离这三种情况, 加锁过程是一轮轮的循环, 会尝试设置锁标识位, 成功则返回新标识, 不成功则去设置信号位(可能已经有其他线程设置过了), 设置信号位也是使用原子更改, 即使其他线程设置过信号位, 原子更改也会成功, 唯一能造成失败的是 runState 的其他位发生变更, 而且很可能是因为锁标识位被释放的原因.

unlock 操作也并不复杂, 如果传入的 newRunState 参数依旧代表了锁, 那么不会有任何效果. 这里只考虑有效果的情况, 即取消了锁(注释说必须取消锁标识位,unlockRunState 的方法确实都这么做的), 若没有其他线程竞态修改 runState, 则解锁直接通过一个 cas 成功, 也不需要去唤醒其他线程. 否则在解锁操作尝试去用 cas 释放锁标识位的时候没有成功, 说明在 unlock 操作的线程读取到 runState 后又有其他线程对它进行了更改, 那么直接暴力重置为 newState 并唤醒阻塞线程.

这似乎存在一个 bug. 如果加锁时出现了竞态, 若干个加锁线程被阻塞, 此时信号位和锁位都有值, 过了一会, 有一个线程去解锁, 在解锁前, 它先读取了现在的 runState, 然后用 cas 去修改, 因为此时没有竞态, 解锁线程在读取到 runState 到 cas 期间, 没有任何线程去改过 runState, 那么解锁线程直接就会 cas 成功, 而不会去唤醒前面阻塞的线程.

不过还有一个解锁的地方, 去尝试加锁的线程, 在同步代码块内发现 runState 的信号位被释放, 就立即唤醒所有阻塞的线程, 但如果此后没有新的加锁线程进入呢? 或者没有人将 runState 的信号位取消呢?

于是作者仔细查看了 runState 的设置, 没有任何一个地方显式地将信号位释放, 因此一度判断在 awaitRunStateLock 方法中不会有任何线程去唤醒其他等待加锁的线程.

所以在上述方法内部这个底层的层面是不能解决线程锁死的. 但在加锁 lockRunState 和解锁 unlockRunState 两个方法的调用者处来看, 每一个线程都是先加锁后解锁, 并且在加锁后记录当时的 rs, 解锁时尝试用该 rs 去 cas, 若能成功, 说明自己加锁到解锁期间没有任何线程尝试加锁 (尝试加锁不成会修改 runState 导致 cas 失败). 因此直接 cas 释放就行了, 但如果发生了其他线程在它解锁前的加锁阻塞, 则前一个线程在解锁时会 cas 失败, 因此将强制转换为自己加锁时记录的 rs 去除锁标识位的结果(思考: 这个结果会不会包含信号位?) 并唤醒等待线程.

涉及到信号位的情况有明显的逻辑暗坑, 作者在注释中提出大量问题(比如是不是一旦设置了信号位就不会取消, 确实代码中没有明显的信号位取消代码), 如果不看接下来的这一段, 很可能会认为道格大神终于写出了 bug. 这一块要费大脑分析, 前面说过, 没有看到任何地方去明确释放信号位, 比如 runState=rs&~RSIGNAL 或者相应的 unsafe 机制. 而且这个机制的安全性不能自我保证, 它取决于调用者的实现, 此处还未讲到相关代码, 鉴于读者可能与作者有同样的急于寻得这个问题答案的心情, 我们先不去罗列更冗长的加解锁使用代码, 而先用语言来形容这个过程, 原来这也是一件实现很巧妙的事:

开始状态, 假定有多个线程去尝试加锁, 操作完再解锁, 我们来分析一波, 看看信号位是否会重置.

1. 首先, 第一个线程毫无疑问会读取到原来的 runState, 并把它赋给一个临时变量 rs(有关代码后面会贴), 然后在一系列操作后, 将 rs 去除锁标识位(哪怕它根本没置过锁标识位), 作为 newState 调用 unlock.

2. 如果在第一个线程加锁到解锁期间,rs 从未变过, 那么该线程会直接 cas 成功.

3. 如果第一个线程加锁到解锁期间, 有 n 个线程尝试加锁, 那么他们会阻塞在 lockRunState 方法的同步块(忽略前面提到的自旋等操作), 并且会更改 runState 到新的值(加信号位), 此时第一个线程记录的 rs 会成为脏数据. 所以当第一个线程去解锁时(因为作者看到所有加解锁都是成对出现的, 不会有后续的线程在第一个线程解锁前去尝试解锁),cas 会失败, 它会将自己原来脏数据记录的 rs 去除锁标识位后强制设置为 runState 的新值, 然后唤醒阻塞的线程. 因为它没有阻塞过, 设置原来的 rs 不会包含信号位, 相当于清除了阻塞线程设置的信号位.

4. 因为刚才 cas 失败, 阻塞的线程在同步块中被唤醒, 阻塞在 synchronized 外的线程进代码块 double check 发现 rs 中已经没有信号位了, 也帮助唤醒其他线程.

5. 阻塞的线程被唤醒后 (4 中已经进入 if 后的 wait 线程) 开启了新的循环, 有的竞态成功并返回了 ns(它与第一个线程返回值的差距是锁标识位是 1, 但它同样没有信号位), 其他线程再次竞态失败阻塞, 并更改了 runState(加上了标识位).

6. 此时可能有 (4) 被唤醒的某些线程 (4 中 lockRunState 进同步块判断) 发现 runState 还是有信号位的(因为早于它唤醒的至少两个线程, 一个成功, 一个失败再次设置信号位), 它进入 if 并 wait.

7. 不论后方阻塞了多少个线程,5 中成功的线程再次尝试使用自己在 lock 成功时返回的 ns(rs)去除锁标识位去 cas 掉当前 runState, 因为后面有竞态, 显然它也会失败, 那么强制使用自己前面保存的, 干净地未受信号位污染的 rs 在去除锁标识位后替换了已经被阻塞者设置的 runState. 然后再次进入唤醒操作.

显然, 加锁解锁必须成对出现, 仅有加了锁成功的线程才可以解锁, 自己加锁自己解, 只解自己的锁. 万古定律在此也要坚持.

道格大神逻辑强大, 作者却不善描述, 寥寥几行代码, 区区一个信号位问题, 却用了上面一整大段来论述. 感叹一下差距.

接下来终于可以看工作线程的创建, 注册入池和从池中卸载的过程.


// 创建工作线程的代码. 尝试工作线程的构建和开启, 它假定已经有别处维护了预留的增加总数,
// 创建和启动过程中出现任何异常, 就执行工作线程的卸载. 创建线程成功返回 true.
private boolean createWorker() {
    // 前面构造器传入的 factory.
    ForkJoinWorkerThreadFactory fac = factory;
    Throwable ex = null;
    ForkJoinWorkerThread wt = null;
    try {
        // 创建线程成功
        if (fac != null && (wt = fac.newThread(this)) != null) {
            // 启动该线程.
            wt.start();
            // 启动也成功, 返回 true
            return true;
        }
    } catch (Throwable rex) {
        // 出现异常, 保存
        ex = rex;
    }
    // 用前面的异常卸载.
    deregisterWorker(wt, ex);
    // 返回 false.
    return false;
}

//createWorker 的调用者 tryAddWorker
// 尝试添加一个 worker, 并在完成前增加 ctl 里面记录的数量(还记得前面的 AC 参数吗?), 但这个
// 增加过程是否进行决定于 createWorker 返回的 true 还是 false. 参数 c 是一个进入控制信号 ctl.
// 它的总计数为负且没有空闲 worker,cas(增加 ctl)失败时, 若 ctl 未改变, 则可以刷新重试.
// 否则说明被添加了一个 worker, 那么它也就不需要再继续了. 从方法的实现上看,c 似乎可以传任何值,
// 如果 c 传入的值不等于当前 ctl, 则会多一次循环重读 ctl 到 c.
private void tryAddWorker(long c) {
    boolean add = false;//1. 标记 add 成功与否.
    do {
        //2. 根据参数 c 生成一个新的 ctrl/nc.c 源自参数或者某一次循环读取的 ctl.
        //nc 的值计算结果:c 加上一个活跃单位 1 <<48, 并对结果保留前 16 位,
        // c 加上一个总数单位 1 <<32, 并对结果保留第二个 16 位(33 到 48 位),
        //nc 等于上两步的结果和. 显然,nc 的后 32 位全部是 0.
        long nc = ((AC_MASK & (c + AC_UNIT)) |
                   (TC_MASK & (c + TC_UNIT)));
        //3. 上述计算过程中 ctl 未改变.
        if (ctl == c) {
            //4. 阻塞加锁并判断是否已在终止.
            int rs, stop;                 // check if terminating
            if ((stop = (rs = lockRunState()) & STOP) == 0)
                //5. 加锁成功且未终止, 尝试 cas 掉 ctl.
                add = U.compareAndSwapLong(this, CTL, c, nc);
            //6. 加锁成功, 不论 cas ctl 是否成功, 解锁.
            unlockRunState(rs, rs & ~RSLOCK);
            //7. 如果已 stop,break 退出添加 worker 的步骤.
            if (stop != 0)
                break;
            if (add) {
                //8. 加锁成功,cas 也成功, 线程池未进入终止流程, 创建 worker.
                createWorker();
                // 创建成功立即 break.
                break;
            }
        }
    //9. 在 2 到 3 中间计算 nc 过程中 ctl 改变, 或 4 处未终止且 5 处未成功 cas, 则 c 读取新的 ctl, 并判断是否还要添加 worker(新 ctl 存在 ADD_WORKER 位)且新 ctl 的后 32 位不存在数值.
    // 这时为前 32 位发生了变化, 只能在新一轮循环处理. 注:ADD_WORKER 位是第 48 位, 前面已提到. 它是 TC_MASK 能负责的最高位.
    } while (((c = ctl) & ADD_WORKER) != 0L && (int)c == 0);
}

前面是尝试创建 worker 和决定创建 worker 的调度流程.

显然,createWorker 没有什么可冗余介绍的, 只需要了解上一文中的工作线程和线程工厂相关知识, 很明显, 它不维护线程数量等规范, 只负责在失败情况下解除创建完线程的注册.

核心的逻辑在 tryAddWorker 中.

显然,tryAddWorker 支持传入非 ctl 的参数 c, 代价是一定会多一次循环. 如果传入的 c 就是方法外读取的 ctl, 且未发生竞态, 效果一是少一轮循环, 效果二是不去判断 while 循环的条件, 即不需要 ctl 具备 ADD_WORKER 标记位且 ctl 的后 32 位无值.

总体流程:

1. 根据传入的 c, 对它加上 active 数单位, 总数单位, 并去除整数位, 得到的结果即 nc.

2. 判断一次 ctl 和 c 是否相等, 不相等可能一是竞态, 可能二是传参错误.

3.ctl 和 c 相等, 阻塞加锁(前面分析过,lockRunState 一定要阻塞到加锁成功为止, 因此本方法不会让加锁失败的线程去释放锁), 前面说过,runState 加锁这一块, 本人暂时描述其锁定的对象为 ” 运行时状态 ”, 可见, 工作线程也是运行时状态之一.

4. 加锁成功且加锁过程中没有开始终止, 尝试将 ctl 用 cas 设置成 nc, 这样相当于同时增加了一个活跃单位和一个总数单位, 并忽略掉整数位.

5. 如果加锁期间 (3) 发生了线程池终止则退出, 如果加锁并设置 nc 成功, 则创建线程.

6. 在 2 判断条件不成立时, 或 4 中 cas 失败, 重初始化 ctl 并验证是否符合循环开始条件, 即 ctl 满足后 32 位为 0 且存在 ADD_WORKER 信号位. 若不满足这个条件则直接退出循环, 终止添加尝试.

显然, 此方法允许传入一个非法的 c, 只要符合循环条件, 它会在白白计算一次 nc 后开始第二轮循环(2 条件失败),while 条件后置, 而 6 处的 while 条件比 2 处要更加严格, 显然, 当传入的 c 就是 ctl 且在计算 nc 期间未发生改变, 则有机会成功 cas 掉旧的 ctl 并创建工作线程, 即使不满足 ADD_WORKER 条件和整数位全 0 的条件. 这是为第一次循环单独开的绿灯, 也是 do while 循环的原因. 那么这个 ” 第一次 ” 的绿灯究意为何而开? 我们稍后分析.

还有 6 中的循环条件的 32 位整数问题, 我们也稍后分析, 循环条件的另一个特殊意义: 除第一次循环参数传入 c 的情况外, 其他若干次循环能够成功添加 worker(突破 2 的验证)的条件是前 32 位不变, 显然这取决于其他线程是否成功改了 ctl 到它计算的 nc(这个操作会同时改变前 16 位和前 17-32 位).

至于 ADD_WORKER 位, 显然涉及线程个数, 到此需要回忆一下最开头列出的构造器. 它是第 48 位, 默认是 1, 显然每添加一个 worker, 会在第 49 位和 33 位加一个 1, 添加到 parallel 个时,ctl 将归 0(重点是 tc 高位将由 1 变 0), 而 48 位也会由 1 变 0.(没有其他方法干扰的情况)

显然,ForkJoinPool 的几种公有构造器最终需要依托一个私有构造函数进行构造, 而它的构造器没有显式调用父类的构造方法, 构造器参数也没有 coreSize 之类的能决定核心线程和最大线程数的有关设置.

是不是可以理解为构造 ForkJoinPool 时传入的并行度值就是线程池的最大线程数量?

不严格说, 可以这样理解. 但我们已经讲完 tryAddWorker 方法, 显然, 只要在方法外读出 c =ctl, 并调用 tryAddWorker(c), 只要此时没有其他线程修改 ctl, 也没有人终止线程池, 显然一定会成功, 随后我再次读取新的 c =ctl, 往复这个过程, 每一次都会成功, 而实际上的线程数早已大于并发度.

到此终于可以简单理解 ADD_WORKER 和 TC_UNIT,AC_UNIT 了.ac 字面意思是活跃线程数,TC 则是总数, 因此用 tc 的最高位 (48) 表示, 当加入的超过总数, 它会溢出.

接下来看线程注册入池的方法.

// 将线程注册入池, 同时返回一个 WorkQueue, 前一篇文章中提到过此方法, 工作线程会在内部记录这个队列.
final WorkQueue registerWorker(ForkJoinWorkerThread wt) {
    UncaughtExceptionHandler handler;
    // 设置成守护线程, 这样保证用户线程都已释放的情况下关闭虚拟机.
    wt.setDaemon(true);
    if ((handler = ueh) != null)
        // 构造器提供了异常处理器
        wt.setUncaughtExceptionHandler(handler);
    // 构建工作队列.
    WorkQueue w = new WorkQueue(this, wt);
    int i = 0; 
    // 取出池 config 的 17-32 位保存为 mode,
    // 前面提过,config 在构造时由并行度 (后 15 位) 和模式 (第 17 位) 表示, 根据 17 位是否有值决定 FIFO 或 LIFO.
    // 这个与运算进行后, 相当于滤掉了并行度信息.                                 
    int mode = config & MODE_MASK;
    // 创建完队列之后要加锁, 尤其后面涉及到可能的数组扩容拷贝, 以及一些判断和重设随机数等.
    int rs = lockRunState();
    try {WorkQueue[] ws; int n;      
        // 前面构造器我们看过, 没有初始化 workQueues, 所以如果一个线程此时来注册是被忽略的.
        // 显然, 使用它们的方法一定做了相应的保证. 我们后续再看.              
        if ((ws = workQueues) != null && (n = ws.length) > 0) {
            // 队列非空, 递增种子.
            //indexSeed 值是 0,SEED_INCREMENT 是每次相加的增量, 它的值默认是 0x9e3779b9(2654435769)
            // 这是一个特殊的值, 它的使用不仅此一处, 后面稍微介绍. 这样减少碰撞可能性.
            int s = indexSeed += SEED_INCREMENT;  
            int m = n - 1;//ws 数组长度 -1, 数组长度一定是偶数(后面介绍).
            i = ((s << 1) | 1) & m;// 奇数位 i.
            if (ws[i] != null) {
                // 满足这个条件就是发生碰撞了,i 已被占用. 初始化 probes 为 0
                int probes = 0;
                // 定义步长, 数组长度不大于 4, 步长 2, 否则取 n 一半的第 2 至 16 位的结果 (偶数) 再加上 2 作为步长.
                int step = (n <= 4) ? 2 : ((n >>> 1) & EVENMASK) + 2;
                // 开启循环, 每次对 i 加上步长并与 m 求与运算, 直到无碰撞为止.
                while (ws[i = (i + step) & m] != null) {
                    // 每次循环增加 probes, 表示对同一个数组最多只循环 n 次, 达到次数要进行扩容重试.
                    if (++probes >= n) {
                        // 当前数组已经尝试 n 次, 还没有找到无碰撞点, 扩容数组一倍, 原位置拷贝.
                        // 此处没有任何加锁动作, 与循环之外创建好队列之后的代码共享一个锁, 也是 lockRunState
                        // 可见只有指派索引相关的动作才需要加锁.
                        workQueues = ws = Arrays.copyOf(ws, n <<= 1);
                        // 重置条件.
                        m = n - 1;
                        probes = 0;
                    }
                }
            }
            // s 值保存给队列的 hint 作为随机数种子. 可见, 此处至少可说明每个注册线程时创建的队列都会有不同的 hint, 它也算是一个标识.
            w.hint = s;    
            // 队列的配置,i 与 mode 取或,mode 只能是 0 或 1 <<16 
            // 这个结果是将 mode 可能存放在队列 config 的 17 位, 从而和池中的 config 在模式这一块保持一致.
            // i 一定是一个不大于 m(n-1)的奇数, 而 n 一定不超过后 16 位(后面叙述), 它和 mode 互不影响.  
            // 故队列的 config 相当于同时保存了在池的 workQueues 数组的索引和所属池的 FIFO 或 LIFO.                    
            w.config = i | mode;
            // 初始化 scanState, 以奇数 i(索引)当值. 相当于发布了初始屏障.
            //(不理解? 参考 runState 方法, 一上来就将它的末位置 0 成偶数)
            w.scanState = i; 
            // 新建的队列置于 i 处.
            ws[i] = w;
        }
    } finally {
        // 解锁.
        unlockRunState(rs, rs & ~RSLOCK);
    }
    // 线程名, 这里看到一个有趣的事, 无符号右移,i 一定是个奇数, 假定右移后的值是 j, 则 2 *j=i.
    wt.setName(workerNamePrefix.concat(Integer.toString(i >>> 1)));
    // 返回队列给线程.
    return w;
}

现在回过头来梳理一下 registerWorker 方法.

方法本身并不难理解, 代码也较检查, 难处在于要串起来前面所有有关内容. 包含 ForkJoinPool 的构造器, 包含 ForkJoinPool 中的 config 到底是如何存放的, 它进一步影响了新建队列的 config 初始化.

我们总结一下有关信息:

1. 把线程注册入 ForkJoinPool, 首先在整个方法的周期内, 有一些我们不需要重点费时间的杂项. 如守护线程, 异常处理器等设置, 紧接着它立即为线程初始化了一个工作队列.

2. 在 1 完成之前, 不会进行加锁操作, 但后续涉及到将 1 初始化的工作队列加入到 ForkJoinPool 内部维护的工作队列数组 (workQueues) 中, 需要为它计算一个索引, 并且应对可能的扩容操作, 而这些步骤均需要加锁进行.

3. 在加锁状态下, 也能过了池内队列数组的非空验证, 注册方法会尝试使用全局的 indexSeed 递增 SEED_INCREMENT 来确定一个变量 s, 并将 2s+ 1 与数组长度减 1 进行与运算确定索引值. 其中, 每个线程都只会为 indexSeed 加上一次的 SEED_INCREMENT, 这也是为了减少冲突, 而与 workQueues 数组长度 (一定偶数) 减 1 的与运算结果一定会是一个限于后 16 位的奇数, 这说明, 当前线程注册入队时的队列只会放入到 ws 数组的奇索引上, 注册方法最后的代码在对线程取名时, 也将线程的号进行了索引位的无符号右移, 这也侧面说明了这一点.

关于 SEED_INCREMENT, 简单说一下, 最简单的说法就是 just math, 纯粹的数学了, 用它是减少冲突的一种办法, 毫无疑问,s 一定是 n 倍的 SEED_INCREMENT, 而 2s+ 1 与一个奇数进行与运算一定很难冲突, 但是个中数学原理, 作者却是无从理解, 也希望有路过的大牛帮助解释. 另外 SEED_INCREMENT 并非使用一处, 至少在我们曾经数次一瞥的工具 ThreadLocalRandom 中有所使用, 我们曾多次见到使用 ThreadLocalRandom 生成随机数, 比如这段代码.

// 前面看过此方法.
static final int nextSecondarySeed() {
    int r;
    Thread t = Thread.currentThread();
    if ((r = UNSAFE.getInt(t, SECONDARY)) != 0) {
        r ^= r << 13;   // xorshift
        r ^= r >>> 17;
        r ^= r << 5;
    }
    else {
        // 原来 secondary 是 0,localInit 一个值.
        localInit();
        if ((r = (int)UNSAFE.getLong(t, SEED)) == 0)
            r = 1; // avoid zero
    }
    UNSAFE.putInt(t, SECONDARY, r);
    return r;
}
//localInit
static final void localInit() {
    // 生成器是一个 AtomicLong, 从 0 开始, 每次加入 PROBE_INCREMENT.
    int p = probeGenerator.addAndGet(PROBE_INCREMENT);
    int probe = (p == 0) ? 1 : p; // skip 0
    long seed = mix64(seeder.getAndAdd(SEEDER_INCREMENT));
    Thread t = Thread.currentThread();
    UNSAFE.putLong(t, SEED, seed);
    UNSAFE.putInt(t, PROBE, probe);
}
// 初值
private static final int PROBE_INCREMENT = 0x9e3779b9

显然, 它也出现了一个从 0 开始递加的初值, 用于后续规避冲突, 而这些数是数学中的魔幻,PROBE_INCREMENT 与 SEED_INCREMENT 是同一个值:0x9e3779b9.

作者对这个值十分陌生, 也无法从数学概念上对它做出过深理解, 它为什么能保证 n 倍的结果乘 2 + 1 后可用于在与运算中规避冲突? 只能强行理解了.

顺嘴一提, 高并发下的两大神器:LongAdder 与 DoubleAdder, 它们会用到一个 PROBE, 这也与它有关, 在作者所在公司的分布式主键生成器中, 其实底层也是使用了它. 继续回到注册线程.

4. 当发生碰撞时,i 的递增步 step 也有神奇的算法, 把 n 超过 16 位的那一半拿出来加上 2, 再去除到奇数属性, 就是步长了, 这意味着步长至少是 2(在 n 不超过 16 位的情况, 已经够大了, 步长只有可能是 2), 显然,n 在足够大 (大于 16 位) 前步长恒定为 2, 大到 16 位以上时, 步长会随着 n 变大而变大. 仿佛像中学时代的分段方程, 或许这样描述更好.

step={n<16 位 2;n>16 位, 前 16 位的值 + 2 去除奇偶位}

在计算好步长后, 紧接着 i =((i+step)&(n-1)), 显然,step 大到 16 位以上, 那么它的前 16 位将会在新的索引 i 上有所体现. 这个体现也许有什么美妙的数学价值.

5. 每当出现索引冲突, 依旧重复 34 且记录循环次数, 当循环次数达到 n 时, 因为加了运行状态锁, 线程可以放心地操作 ForkJoinPool 的状态, 故可以扩容. 当不冲突时, 将 i(一个奇数)赋给队列的 scanState, 意味着它初始没有人标记为正在扫描, 将队列的 config 保存 i 和 mode. 最后解锁并返回队列给线程. 建立相互引用.

注册入池的简短方法至此就简单分析完毕了. 它留给我们一些疑问:

1. 为什么注册入池的线程生成的 WorkQueue 只占用 ForkJoinPool 的 workQueues 的奇数位? 这与工作窃取是否有关?

2.scanState 的初始化终于明了, 是一个奇数, 且它也与前面的 SEED_INCREMENT 有关, 它是否在后面是否也发挥了作用?

3. 前面设置的异常处理器的作用?

4. 此处解决碰撞, 计算队列位于数组的索引等都依赖于 workQueues 的初始大小(每次扩一倍), 它的初始大小又是如何确定的? 创建时机? 显然在它未初始化的前提下, 队列入数组会静默入池失败, 但是队列却成功创建并返回给线程了.

这些问题需要暂时记在大脑, 当看到相应的方法时再予解释.

// 解除注册操作. 它是一个要终止的工作线程的最终回调, 或者创建失败时也会回调. 在介绍 ForkJoinWorkerThread 和前面 createWorker 时提过.
// 这会从数组中移除 worker 记录, 调整数量. 如果池已经处在关闭进行中, 尝试帮助完成池的关闭.
// 参数 wt 是工作线程, 构建失败会是 null,ex 是造成失败的异常. 它也可以是 null.
final void deregisterWorker(ForkJoinWorkerThread wt, Throwable ex) {
    //1. 处理队列从池的队列数组中的移除.
    WorkQueue w = null;
    if (wt != null && (w = wt.workQueue) != null) {
        // 存在工作线程且该工作线程有队列的逻辑.
        WorkQueue[] ws;            
        // 前面说过, 队列的 config 后 16 位表示索引, 第 17 位表示 mode.         
        int idx = w.config & SMASK;
        // 加运行时状态锁.
        int rs = lockRunState();
        if ((ws = workQueues) != null && ws.length > idx && ws[idx] == w)
            // 简单的置空操作.
            ws[idx] = null;
        // 解锁.
        unlockRunState(rs, rs & ~RSLOCK);
    }
    //2. 处理控制信号中保存的数量.
    long c; 
    // 循环直到减数成功. 哪怕有别的线程在竞态减少, 当前方法也要在新的 ctl 中减少数量.                                    
    do {} while (!U.compareAndSwapLong
                // 第 49 位减 1.
                 (this, CTL, c = ctl, ((AC_MASK & (c - AC_UNIT)) |
                                        // 第 33 位减 1.
                                       (TC_MASK & (c - TC_UNIT)) |
                                        // 保留后 32 位.
                                       (SP_MASK & c))));
    //3. 该工作线程有队列, 且已经在 1 出了数组.
    if (w != null) {
        // 把队列锁设定负数.
        w.qlock = -1;                            
        // 把队列中记录的偷取任务数加到池中. 前面已论述过此方法.
        w.transferStealCount(this);
        // 取消队列中所有存活的任务.
        w.cancelAll();}
    //4. 进入循环尝试帮助关闭池或释放阻塞线程, 补偿线程等.
    for (;;) {WorkQueue[] ws; int m, sp;
        //4.1 tryTerminate 后面介绍, 第一个参数 true 代表无条件立即结束, 第二个参数 true
            // 代表下次 tryTerminate 将可以结束.
        if (tryTerminate(false, false) || w == null || w.array == null ||
            (runState & STOP) != 0 || (ws = workQueues) == null ||
            (m = ws.length - 1) < 0)              
            // 进入 if, 说明正在结束或已结束, 没什么可做的了.
            break;
        //4.2 控制信号后 32 位有数值, 进尝试释放逻辑. 这不是第一次看到 ctl 的后 32 了. 对于 ctl 的前 32 位,
        // 我们已经通过构造函数和前面的代码说明, 它初始化时与并行度有关, 并在后面存放了添加 worker 数量
        // 的值(但不能说存放了并行度, 因为添加 worker 会改变相应的位), 后 32 位的真相也开始浮出水面,
        // 在前面的 tryAddWorker 中, 第二轮及以后的循环条件要求后 32 位不能存在值. 而且添加成功也会
        // 将后 32 位置 0, 故 tryAddWorker 的第一轮循环会清空后 32 位, 与此有所影响.
        if ((sp = (int)(c = ctl)) != 0) { 
            // 后 32 位有值, 尝试 release,tryRealease 方法会将 activeCount 数量添加第三个参数的值,
            // 如果第二个参数代表的队列是空闲 worker 的栈顶, 则释放其内的阻塞者.
            if (tryRelease(c, ws[sp & m], AC_UNIT))
                break;
            // 仅有此处释放失败的情况下, 开启下一轮循环, 其他分支均会退出循环.
        }
        else if (ex != null && (c & ADD_WORKER) != 0L) {
            // 5 此次解除注册是因为异常, 且当前添加 worker 信号依旧满足, 则添加一个 worker 代替原来并退出.
            tryAddWorker(c);                      
            break;
        }
        else 
            // 6. 不需要添加补偿 worker, 退                                    
            break;
    }
    if (ex == null) 
        // 前面记录的异常不存在, 帮助清理脏异常节点.                             
        ForkJoinTask.helpExpungeStaleExceptions();
    else   
        // 存在异常, 重抛.                                      
        ForkJoinTask.rethrow(ex);
}

deregisterWorker 逻辑并不复杂, 把队列移出池, 减少 count, 清理 queue 中的任务, 此处又见到了 WorkQueue 中的另一个属性的使用,qlock. 显然 qlock 值取 - 1 时, 代表队列已经失效了.

队列移除后, 方法还会尝试做一些非本职工作. 如尝试终结线程池, 满足条件则退出循环 (显然每个线程的卸载都尝试触发池的终结); 线程池未进入终结过程, 则尝试释放 parker 的逻辑(如果有), 尝试成功也会退出循环, 此两种情况(tryTerminate 或 tryRelease) 会造成忽略了异常等信息, 只有在两者均未成功的前提下, 前去考虑参数中的异常. 异常的处理逻辑很简单, 存在即重抛.

// 如果当前活跃线程数过少, 尝试去创建或活化一个 worker.
// 参数 ws 是想要找到被唤醒者的队列数组(也就是任何一个 ForkJoinPool 的成员变量),
// 参数 q 是个非空的队列, 则方法只尝试一次, 不会重试.
final void signalWork(WorkQueue[] ws, WorkQueue q) {
    long c; int sp, i; WorkQueue v; Thread p;
    while ((c = ctl) < 0L) { 
        //1. 添加 worker 步骤                      
        //ctl 小于 0, 表示 active 的太少. 但似乎也只能最多加上并行度的数量.
        if ((sp = (int)c) == 0) {
            // 取 ctl 的后 32 位, 终于, 终于看明白了, 这里有一个注释,sp== 0 代表无闲置 worker.
            // 但不代表后 32 位全部与闲置 worker 有关.
            if ((c & ADD_WORKER) != 0L) 
                //ADD_WORKER 位有值, 说明总 worker 数量未达到.
                // 经过三重关, 添加 worker.          
                tryAddWorker(c);
            // 满足添加 worker 的第一个条件, 无闲置 worker, 不论有没有成功创建新的 worker, 就都一定会退出循环.
            break;
        }
        //2. 不存在空闲 worker, 验证不满足唤醒流程的情况.
        if (ws == null)  
            //2.1 队列数组都还没初始化, 显然池不满足条件.                          
            break;
        if (ws.length <= (i = sp & SMASK))// 
            // 2.2 队列数组长度不大于 ctl 后 16 位. 说明已进入终止态. 退出(多像数组的 length 一定要大于索引)
            // 又一次大揭密,ctl 后 16 位似乎与队列数组的长度有关, 而且存放的是一个索引. 
            // 此处隐含条件,ctl 后 32 位不是 0, 将它的后 16 位取出来当索引 i, 要结合 1 处的条件.      
            break;
        if ((v = ws[i]) == null) 
            //2.3 队列数组长度正常, 使用索引 (ctl 的后 15 位) 从 ws 中取不出队列.
            // 说明正在终止, 退出. 
            break;
        //3. 满足了唤醒流程.
        //3.1 计算数据, 第一个为下一个 scanState, 在前面的 addWorker 流程, 我们看到
        //scanState 的第一个值是在队列数组中的索引. 显然索引不能乱变.
        // 新的 scanState 值计算, 老 ctl 的整数位在 17 位加 1(SS_SEQ)再取它的后 31 位. 显然每次被唤醒都会走一次这个逻辑.
        int vs = (sp + SS_SEQ) & ~INACTIVE; 
        int d = sp - v.scanState; // 屏蔽不必要的 cas.   
        //3.2 计算 nc, 它用老的 ctl 加一个活跃位(48 位), 然后只取出前 32 位, 对后 32 位取出队列 v 在上次扫描时存放的值(也是当时 ctl 的后 32 位).
        // 这里我们又见到一个熟人:stackPred, 接下来会有重要的方法使用它.          
        long nc = (UC_MASK & (c + AC_UNIT)) | (SP_MASK & v.stackPred);
        //3.3d 是 0 说明 ctl 的后 32 位相对于原来 v 中存放的 scanState 没有变化, 那么也就不需要 cas.
        // d 不是 0, 需要 cas, 用 nc 替换掉 c.
        if (d == 0 && U.compareAndSwapLong(this, CTL, c, nc)) {
            // 把 v 的 scanDate 置换成 vs, 激活了 v.
            v.scanState = vs;                      
            if ((p = v.parker) != null)
                // 有线程阻塞, 唤醒.
                U.unpark(p);
            // 激活成功, 退出循环.
            break;
        }
        //4. 上述过程没有成功, 看 q 是否提供, 如果提供了不循环第二次.
        if (q != null && q.base == q.top)        
            break;
    }
}

signal 这个方法本身逻辑并不复杂, 重点在于我们的几点意外发现.

1.ctl 的后 16 位原来可以表示非终止状态下线程池中的一个 WorkQueue 的索引.

2.scanState 每次 singal 满足唤醒流程都会被尝试置换 (新值取决于 sp, 会在它的 17 位加 1, 并只取后 31 位.), 前提是此时 ctl 的后 32 位(sp) 与 v 中的 scanState 一致 (差值 d 为 0) 或能够替换 ctl 为新值. 置换新的 scanState 前会根据 d 来决定是否更换 ctl, 若有改变则 cas 掉 ctl, 将它替换成新值(原值增加了一个活跃数, 并将后 32 位置为 v 中上次 scan 保存的 ctl 后 32 位),d 是 0 或 d 不是 0 且置换 ctl 成功, 方才去将 v 的 scanState 换为 vs. 这一步成功, 发现有阻塞线程则唤醒.

3. 参数 q 全程酱油, 唯一作用是信号, 上述过程出现失败, 如 cas 不成功等, 它非空则不循环第二次.

4. 此处也是首次发现只增加活跃数不增加线程总数的情况

此方法的执行只有三种结果: 第一是满足活跃数 / 总数未达到最大, 且无闲置数, 则创建 worker; 第二是前一个不满足, 且未在终止周期, 满足尝试唤醒的条件, 会尝试唤醒一个阻塞线程(它是 ctl 控制信号后 32 位索引去 ws 中取的 w); 第三种情况, 不满足前两个条件, 也不满足进入下个循环的条件, 相当于什么也没做.

下面先来看 tryRelease 方法和 runWorker 方法, 逐个分析未知字段的含义.

// 唤醒并释放 worker v(队列), 如果它处于空闲 worker 栈的顶部, 此方法是当至少有一个空闲 worker
// 时的一个快速唤醒方式.
// 它的参数,c 应当传入此前读取的 ctl,v 是一个工作队列, 如果不传空, 应当传一个 worker 过来,inc 代表活跃数的增加数.
// 如果成功释放, 则返回 true.
private boolean tryRelease(long c, WorkQueue v, long inc) {
    // 类似上面的 signalWork 方法的计算方式,sp 保存 ctl 的后 32 位,vs 为队列 v 的下一个 scanState.
    // 值依旧是 sp 在 17 位加 1 并只取结果的 31 位.
    int sp = (int)c, vs = (sp + SS_SEQ) & ~INACTIVE; Thread p;
    // 判断是否满足条件,v 存在且 v 的 scanState 是 sp
    //(言外之意 sp 保存的是一个 v 的 scanState, 别急, 我们离真相越来越近了, 注释说此条件代表 v 是栈顶)
    if (v != null && v.scanState == sp) {
        // 满足了前述的条件,v 是当前 "栈顶", 这个栈顶的含义有些奇怪, 没有栈, 何来栈顶? 别急.
        // 计算新的 ctl, 算法同上, 老 ctl 加上 inc 的结果的前 32 位给 nc 的前 32 位,v 保存的 stackPred 作为 nc 的后 32 位.
        // 在前面 deregisterWorker 中,tryRelease 方法传入的 inc 为一个 AC_UNIT. 相当于增加一个活跃数.
        long nc = (UC_MASK & (c + inc)) | (SP_MASK & v.stackPred);
        // 尝试用前面计算的结果更新为新值.
        if (U.compareAndSwapLong(this, CTL, c, nc)) {
            // 控制信号成功更新为 nc, 则将 v 的 scanState 保存为 vs.
            v.scanState = vs;
            if ((p = v.parker) != null)
                // 存在 parker, 唤醒.
                U.unpark(p);
            return true;
        }
    }
    return false;
}

在 deregisterWorker 方法中我们简单提过 tryRelease 方法 (有调用, 在完成), 回忆一下该方法在这一块的逻辑, 在尝试解除 worker 注册时, 会在完成解注册本身操作后尝试 tryTerminate, 如果执行后未进入 terminate 流程, 则进入 tryRelease, 如果抛去中间这些杂在其内的步骤, 可以粗放理解为释放一个 worker 会尝试一次释放, 而且条件是 sp(即 ctl 的后 32 位) 非 0, 代表有 ” 空闲 ” 线程需要释放, 而且会用 sp 作索引调用 tryRelease, 试图释放掉栈点 worker 并增加一个活跃数.

显然,tryRelease 方法有一个重大的不同, 它完全是一个 ”try”, 对于释放操作只尝试一次, 真正重要的操作是否进行完全取决于一个对 ctl 的 cas, 只有它成功, 才会进行 scanState 的更新和线程的唤醒. 因此该方法是一个不需要加锁更改 ctl 的方法.

结合前面总结过的若干方法, 我们再次理解 ctl,scanState 等字段.

1.ctl 的前 32 位与线程池中的线程数有关, 而在这前 32 位中, 前 16 位是活跃数, 后 16 位记录总数.

2.ctl 的后 32 位稍有些复杂, 第 32 位是代表不活跃的字段(INACTIVE& 它不是 0). 同时它的后 16 位又可以代表某个队列在队列数组中的索引(signalWork 方法中尝试取出它的后 16 位并从队列数组中取值), 奇特的是它的后 32 位似乎也可以代表某个队列在队列数组中的索引, 而且这个还应该是 ” 栈顶 ”, 在 deregisterWorker 方法中直接用了它的后 32 位作为索引.

可见,ctl 的真相越来越近了, 但还不够近.

3. 队列的 scanState 也越来越明显了, 它的初值绝对是在当前队列在池中队列数组的索引 (前面 register 方法中直接将 i 给了 v.scanState), 每次的释放的更新则是直接忽略掉 sp 的首位(不活跃位, 即 32 位置 0) 并在 17 位加 1. 那么此时它还能代表索引吗?

关键点在于,scanState 的后 16 位会不会增加, 其实最终的答案将在 externalSubmit 方法中得到一个重要的补充, 你会发现, 线程池中的 workQueues 大小初值绝对不会超过 16 位, 同时在上面的 registWorker 方法中每当出现冲突碰撞, 会尝试对 workQueues 扩容一倍, 而且它并未做出限制, 但是 registerWork 方法需要经 tryAddWorker 方法创建线程, 再由线程对象调用它注册入池, 然后你会发现 tryAddWorker 只能由前面已介绍的 deregistWorker 与 signalWorker 调用, 后者限定它能够成功的前提是均是 sp=(int)c==0, 也就是说, 只有后 ctl 的后 32 位无值才可以添加 worker(添加 worker 时后 32 位直接被无视, 结果直接置 0)前者限定为不满后者的条件时, 必须是异常的情况且满足 ADD_WORKER 位未被重置, 那么它或许有一次机会添加 worker, 一旦失败, 在 tryWorker 的循环第二次将不成功.

绕了这么半天, 还是在上一段的开头, 就是为了说明 scanState 既然初始值是一个只有 16 位的奇数 (add worker 成功时初始化, 非 worker 的队列还未介绍到), 尽管在每次 release 都在 17 位尝试加 1, 目前来看, 并不影响它保存自己的索引(后 16 位), 问题就在于这后 16 位是否足够, 每次添加 worker, 肯定要占用一个新的索引, 而添加 worker 完成并在注册时发生冲突一定次数后会扩容(tryAddworker→registWorker), 前面论述过,externalSubmit 方法将 workQueues 数组初始化一个坚决不大于 short 型长度的数组(后面论述 externalSubmit 会简单再提), 而从 signalWorker 调用 tryAddWorker 时,ctl 的后 32 位必须为 0, 从 deregisterWorker 中虽然允许第一次不判断 int(c)==0, 但它实际上已经卸载了一个 worker. 那么至少从目前来看, 在 tryRelease 和 signalWork 方法中使用 ctl 的后 32 位去计算新的 scanState, 隐含的意思似乎是 ctl 的后 32 位此刻包含了 ” 类 ” 原来的 scanState, 而且新的 scanState 只用 ctl 的后 32 位的首位和 17 位, 潜台词似乎就是此时的 ctl 的后 16 位绝对能表示 v 的索引, 否则我们无法用这个新的 scanState(vs) 去从数组中找到队列.

提示: 一定要结合上面 signalWork 源码注释中的 (2.2 隐藏条件), 综合 tryRelease 的注释要求参数 v 是 ” 栈顶 ”, 调用它的 deregisterWork 方法传入的 v 是 vs 中索引为 sp&(n-1) 所得, 因此大胆的推测已经成熟.

1.ctl 的后 16 位如果有值, 将会是 ” 栈顶 ” 队列在 ws 中的索引. 它被用于释放栈顶资源 (如 parker) 时找到 queue, 以及对 queue 中的 scanState 若干步骤的重新设置, 而重新设置时也不会影响后 16 位. 因此索引在注册依始就不会再变, 扩容也不会改变它的位置.

2.scanState 字面意思是扫描状态,ForkJoinPool 还有 scan 方法未介绍.

那么, 就用后面的代码来验证这个绕脑的推理吧.

//runWorker 方法是线程运行的最顶层方法, 它由 ForkJoinWorkerThread 在注册成功后调用, 也是全部生命周期.
final void runWorker(WorkQueue w) {
    //1. 一上来初始化数组, 前面说过,WorkQueue 内部的任务数组初始化是 null.
    w.growArray();  
    //2. 用 seed 保留构建时的 hint 随机数, 在 registerWorker 方法中曾介绍过,
    // 会有一个随机数 s 是保证每个队列不同的, 且其中有一个每次增加一个值的成份, 该值是个数学中很奇异的数字.
    // 而 hint 的初值即这个 s, 它同时也被用于确定队列在 ws 中的索引, 间接决定是否扩容.        
    int seed = w.hint; 
    //3. 初始化 r, 并避免异或时出现 0.
    int r = (seed == 0) ? 1 : seed;  
    //4. 循环.
    for (ForkJoinTask<?> t;;) {//5. 尝试 "scan"(终于出现了有没有)队列 w, 使用随机数 r
        if ((t = scan(w, r)) != null)
            //6.scan 到了一个任务 t, 则运行它. 这是进入一个任务处理的主流程, 前面已介绍过 WorkQueue 的 runTask 方法.
            // 回忆一下, 它会在过程中把 scanState 标记为忙碌.
            w.runTask(t);
        else if (!awaitWork(w, r))
            //7.scan 不到, 尝试等待任务, 如果等待过一段时间还未等待, 进入 8 重置 r, 继续下轮循环 scan. 若 awaitWork 返回 false 代表应 break 结束 worker.
            // 关于 awaitWork 的返回我们后面详解.
            break;
        //8. 或许只能说 just math.
        r ^= r << 13; r ^= r >>> 17; r ^= r << 5; 
    }
}

runWorker 是线程 run 方法中直接调用的, 进入业务主逻辑, 也结合了前面的 runTask 方法, 初始化数组等方法和将要介绍的与 scan 有关的逻辑, 还再一次用到了那个魔法数字.

传给 scan 方法的 r 每一次循环 (scan 成功并运行, 又 await 到了新的任务) 都会重新赋个新值, 作者看不懂新值的算法, 但在我们即将去了解的 scan 方法中使用了 r 来计算索引, 因此作者更关心它的奇偶性.

很明显,r 不论初始是奇是偶, 新计算的 r 值可以是奇数也可以是偶数. 也就是说, 使用 r &(n-1)取出的 ws 中的一个 WorkQueue, 可能是线程注册时生成的一半之一, 也可能不是.

接下来介绍 scan 方法.

// 尝试扫描队列, 并偷取一个 "顶级" 的任务. 扫描开始于一个随机位置(与 r 有关), 如果在扫描过程中发生了
// 竞态, 则移动到一个随机的位置继续, 否则线性地扫描, 这个过程持续到在所有相同校验和
//(校验和的计算会采样每个队列的 base 索引, 而 base 索引会在每次偷的时候移动)的队列上有两次
// 连续的空传递, 此时 worker 会尝试对队列进行灭活并重新扫描, 如果能找到一个 task, 则尝试重新
// 激活(重新激活可以由别的线程完成), 如果找不到 task, 则返回 null 用于等待任务. 扫描过程应当减少内
// 存使用, 以及与其他正在扫描的线程的冲突.
// 参数 w 为目标队列,r 是前面传递的种子, 返回 task 或 null.
private ForkJoinTask<?> scan(WorkQueue w, int r) {WorkQueue[] ws; int m;
    // 当前工作线程必须是已经完成注册的, 即存在工作队列, 且 r &m 能取得它的队列, 否则直接返回 null.
    if ((ws = workQueues) != null && (m = ws.length - 1) > 0 && w != null) {//1.scan 方法是在 runWorker 的循环中调用的, 初次调用时,scanState 的值是 i(前面说过), 是个非负值.
        int ss = w.scanState;  
        //scan 方法内部开始循环. 用 r &m, 即 w 的索引给 origin 和 k, 初始化 oldSum 和 checkSum 为 0.                  
        for (int origin = r & m, k = origin, oldSum = 0, checkSum = 0;;) {WorkQueue q; ForkJoinTask<?>[] a; ForkJoinTask<?> t;
            int b, n; long c;
            //2. 选择队列 q 存在的逻辑.
            if ((q = ws[k]) != null) {
                //2.1 目标队列 q 非空(本身 base 到 top 间至少存在 1 个, 任务数组非空.
                if ((n = (b = q.base) - q.top) < 0 &&
                    (a = q.array) != null) {// 计算任务数组的 base 索引(参考 WorkQueue 源码).
                    long i = (((a.length - 1) & b) << ASHIFT) + ABASE;
                    //2.2 数组中取出 base 对应 task 存在,base 未改变的逻辑.
                    if ((t = ((ForkJoinTask<?>)
                              U.getObjectVolatile(a, i))) != null &&
                        q.base == b) {
                        //2.3 初始记录的 scanState 不小于 0, 代表存活的逻辑.
                        if (ss >= 0) {
                            //2.4 尝试 cas 掉 base 处的任务, 注意, 一定只能从 base 开始, 不会将任务数组中间的元素置空.
                            if (U.compareAndSwapObject(a, i, t, null)) {
                                //cas 成功, 更新 base.
                                q.base = b + 1;
                                if (n < -1) 
                                    //2.5 发现队列 q 的 base 到 top 间不止一个任务元素, 则唤醒它可能存在的 parker.
                                    // 重温一下 signalWork 的简要逻辑,ctl 后 32 位 0 且满足加 worker 条件,tryAddWorker,
                                    // 条件不满足 (忽略终止等判断逻辑), 则计算新的 scanState(使用到原 ctl 的后 32 位) 和 ctl(使用原 ctl 的前 32 位和 q 的 stackPred),
                                    // 在 cas 为新的 ctl 成功的前提下, 换掉新的 scanState.
                                    signalWork(ws, q);
                                //2.6 只要 2.4 成功, 返回弹出的任务.
                                return t;
                            }
                        }
                        //2.7 从 scanState 看已经是 inactive 的情况. 尝试活化.
                        else if (oldSum == 0 &&   
                                 w.scanState < 0)
                            //tryRelease 前面已介绍过. 尝试释放掉栈顶, 显然 ws[m&(int)c]被视为栈顶, 即 ctl 的后 32 位 (严格来说似乎是后 16 位) 代表栈顶的索引.
                            // 释放时对 ctl 的增量是一个 AC_UNIT.
                            tryRelease(c = ctl, ws[m & (int)c], AC_UNIT);
                    }
                    //2.8 只要没有进入 2.4->2.6, 重置 origin,k,r, 校验和等参数并开启下轮, 但整个 2 工作线用不到, 进入 3 工作线才有用.
                    if (ss < 0)                   // refresh
                        // 可能会有其他抢到同一个队列的 worker 在 2.5/2.7 处重活化了 scanState, 因此当它是 inactive 的情况, 重刷新一次.
                        ss = w.scanState;
                    r ^= r << 1; r ^= r >>> 3; r ^= r << 10;
                    origin = k = r & m;           // move and rescan
                    oldSum = checkSum = 0;
                    continue;
                }
                //2.9 校验和增加 b
                checkSum += b;
            }
            //3. 持续迭代到稳定的逻辑.
            // 这个表达式大概可以理解, 线性的增加 k, 每次加 1, 直到发现已经从一个 origin 转满了一圈或 n 圈.
            if ((k = (k + 1) & m) == origin) {// 条件:scanState 表示活跃, 或者满足当前线程工作队列 w 的 ss 未改变,oldSum 依旧等于最新的 checkSum(校验和未改变)
                if ((ss >= 0 || (ss == (ss = w.scanState))) &&
                    oldSum == (oldSum = checkSum)) {if (ss < 0 || w.qlock < 0)    // already inactive
                    //3.1 满足前面注释的条件, 且 w 已经 inactive, 终止循环, 返回 null.
                        break;
                    //3.2 又是这一段计算和替换的逻辑, 只不过 ns(new scanState)要加上非 active 标记.
                    int ns = ss | INACTIVE;       // try to inactivate
                    //3.3 尝试计算用来替换的 ctl, 它的后 32 位为 ss 加上非活跃标记, 前 32 位减去一个活跃数单元.(终于到这了, 参考前面分析的 ctl 前 32 后 32 位, 验证了)
                    long nc = ((SP_MASK & ns) |
                               (UC_MASK & ((c = ctl) - AC_UNIT)));
                    // 原来 ctl 的后 32 位存给队列的 stackPred.
                    // 注意, 此时 w.stackPred 和新的 ctl 的后 32 位都有一个共性, 那就是它们的后 31 位都可以用来运算并计算得 w 在 ws 的索引.
                    w.stackPred = (int)c;         // hold prev stack top
                    //3.4 先把 w 的 scanState 换成 ns, 再用 cas 换 ctl 为 nc.
                    U.putInt(w, QSCANSTATE, ns);
                    if (U.compareAndSwapLong(this, CTL, c, nc))
                        // 替换 ctl 成功,ss 直接指向 ns, 省去一次 volatile 读.
                        ss = ns;
                    else
                        //3.5 替换失败, 再把 w 的 scanState 设置回 ss.
                        w.scanState = ss;         // back out
                }
                //3.6 每发现回了一轮, 校验和置 0.
                checkSum = 0;
            }
        }
    }
    return null;
}

scan 方法是个比较重要的方法, 结合前面提过的 runWorker, 再次提示 scan 方法是在 runWorker 中循环调用的, 当然每一次都伴随着完成一波任务和等待新任务的到来.

单轮 scan 中也是处在循环的, 没有竞态也找到了非空队列的情况下, 显然会很容易从 base 处出队一个合法的任务, 即从 1 ->2->2.1->2.2->2.3->2.4(->2.5 可选)→2.6, 会返回正常查找的任务.

 进入 2 代表某一轮循环开始时找到了队列, 这之后主要有三条线, 其中一条线是前面说的 1 ->2->2.1->2.2->2.3->2.4(→2.5 可选)→2.6 的正常线.

第二条线,1->2->2.1->2.3->2.4->2.7(在 2.4 的 cas 时发生竞态失败), 或 1 ->2→2.1→2.2→2.7(在 2.2 判断任务为空或者已经有其他线程捷足先登地取走了 base), 最终结果是执行 2.7 的归置操作(出现竞态, 随机改变索引).

第三条线,1->2->2.1->2.9(2.1 处发现目标 WorkQueue 已经是空队列), 则增加一次校验和.

这三条线均可以忽略起点(1), 因为可以在某次循环时从 2 开始.

任何一次循环到达 3 相应的有两处入线, 第一条 1 ->2->3(在 2 处发现 ws 数组的 k 索引处尚且没有队列元素, 这说明连工作线程注册都没有完成; 第二条线是基于前面的第三条线, 即 1 ->2->2.1->2.9→(即 2.1 失败, 前面 2 的步骤, 只要进入到 2.1, 就一定是 return 或者 continue 开启下一轮).

单次循环到达 3 的两条路线的区别是第二种情况是校验和会增加最初获取到该队列时读取的 base 值(2.1).

从到达 3 开始分析, 有两条大分支, 将取 ws 元素的下标 k 进行增 1 操作, 判断是否已经完成了一轮(等于上轮记录的 origin), 未完成一轮, 直接再开启一次循环(因为一开始就没读出 WorkQueue, 所以无竞态, 线性加 1), 从上次记录 origin 到现在已完成一轮的情况进入 3 内的分支.

进入 3, 最后都会将校验和归 0(3.6), 也就是说每查完一整轮就会让校验和复位 0. 可能会根据 scanState 决定是否进入 3.1, 此处也有分线.

线路 1, 若当前队列 q 已经是非存活态(scanState 是负数且这轮循环未更新, 且校验和在本轮循环中未改变), 或发现队列的 qlock 已经被锁定负(前面讲过要灭活), 直接 break, 执行路径 3→3.1.

线路 2, 当 q 是存活态, 因为已经找了一轮, 没有意义再去找了, 将 q 灭活, 相应的计算新的 ctl 和 scanState 的逻辑与前面 tryRelease/signalWork 的方式正好相反,scanState 直接加上灭活标记位并存为 ns, 且 ns 将交给新的 ctl 的后 32 位, 新 ctl 的前 32 位则减去一个活跃单元, 并把 ctl 原来后 32 位的状态存给 q 的 stackPred, 这也就是 ctl 后 32 位能表示当前 ” 栈顶 ” 的原因. 而且这一过程中, 涉及 q 在 ws 中的索引有关的值不受影响, 依旧可以用 ctl 的后 32 位来找到它. 前面说过, 当进行 release 等操作时, 可以将栈顶 (就是用 ctl 后 32 来取) 的 stackPred 取出复位, 正是因为这个原理(灭活时存放了此前的 ctl 后 32 位).

形象一点理解:

1. 灭活一个 q, 则将它的 scanState 加上非存活标记位(不影响后面的索引标识, 它是 32 最高位), 将 ctl 的后 32 位变化后存给它的 stackPred, 将 ctl 设置新值, 前 32 位进行减活跃数的逻辑, 后 32 位用新的 ns 来替换.

2. 再灭活一个新的 w, 重复上一个逻辑, 则新的 w 是 ” 灭活栈 ” 的栈顶, 新 w 的索引会保存在新的 ctl 里, 原 ctl 中存放的上一个 q 的索引被置为当前 w 的 stackPred.

3.release 或者 signal, 对栈顶元素有一个相应的操作, 将它重新激活, 会将它的 stackPred 经过反向算法交给 ctl, 而它自己的 scanState 又简单恢复成包含索引的合法结果(ctl 后 32 位加一个标识位并去除非活跃位的 1).release 之后的 ctl 依旧可能存在非零的后 32 位(这取决于刚出的栈顶是不是栈底), 同样 signal 中的 tryAddWorker, 只有在 ctl 后 32 位干净时才会调用, 也说明了这一点.

注意,3.4 对 ss 进行了加上 INACTIVE 标记位的操作, 即令 ss 变成负值, 但在 3 中并不会在此退出循环, 下一轮循环中可能再次进入 3.1 满足了 break 条件并退出循环返回 null, 也可能进入 2.9 增加校验和, 或者在下一次循环中进入 2.8 重新刷新 runState, 这时如果此前已经有别的线程在 2.7 进行了当前 worker 的释放或者 tryRelease/signal 等操作(共同的要求: 当前 worker 此时是栈顶), 会因此令下一次循环有机会从 2.4 返回.

终于串了起来, 终于搞懂了 WorkQueue 之间这个又是数组又是栈的结构了.

显然, 这种栈的实现方式真是够少见 … 它确实是个栈的结构, 不过栈元素自身维护的数据需要不停地和外界 (池) 中的 ctl 进行后面位的交换.

到此, 前面的难点与疑问终于清楚了.

同时也可以发现,scan 方法,runWorker 方法和前面 WorkQueue 的 runTask 等方法共同组成了 ” 工作窃取 ” 的调度机制, 明显一个线程在注册入池后启动, 每一轮大循环都会先从 scan 到一个 task 开始, 获取不到直接 awaitWork, 获取到, 则先执行 task, 再执行自己 localTask, 因为我们提前介绍了 ForkJoinTask 和它的若干子类以及 doug 在官方文档中给我们的用例, 因此大家很容易理解这一点, 一个任务在运行过程中, 很可能会有新的任务由它而生并入池, 现在没有看到入池的源码, 但在前面介绍过,ForkJoinWorkerThread 来 fork 出的任务入池时是入自己的队列, 外部线程提交入池的任务则是 externalSubmit 一类, 这两部分的源码都会在后面介绍.

显然一个线程刚刚启动时, 它的 workQueue 完全是空的, 相应的另一个线程在 scan 时若获取了它的队列必然会忽略, 当前线程也必须先从 scan 开始, 从随机的队列中按一定的规则 (冲突时重置一个随机的索引位置, 不冲突但发现未注册 worker 等情况时直接索引加 1) 去偷取一个原始的任务, 而且从 base(先入任务)开偷, 然后先运行这个偷来的 task, 再运行自己的本地任务, 在运行该 task 时, 可能就会 fork 出多个子任务入了自己的任务数组, 因此再运行自己的本地任务时才有活可干, 完成所有本地任务后,runWorker 进入下一轮循环, 继续 scan->waitScan→runTask 的流程.

接下来继续看 awaitWork 方法.

// 字面意思, 等待有工作可做.
// 它其实可能会阻塞一个 worker 偷取任务的过程, 如果 worker 应当关闭则直接返回 false.
// 如果 worker 已经处于非活睡在态, 且引起了线程池的静寂, 则检查线程池的终结态, 只要当前 worker
// 不是唯一一个 worker 就等待一段时间. 如果等待超时后 ctl 未改变(前 32 位的数量信息未变, 后 32 位的栈信息也未变),
// 则终止当前 worker, 它可能会唤醒另一个可能重复这个过程的 worker
// 参数 w, 调用者 worker,r 是一个自旋用的随机数种子, 如果 worker 应当关闭, 返回 false.
private boolean awaitWork(WorkQueue w, int r) {if (w == null || w.qlock < 0)                 // w is terminating
        // 一个线程从注册入池起就有队列, 如果它为空或者 qlock 被置为负(-1), 应当终结.
        // 前面提过, 在 deregisterWorker 或 tryTerminate 时会将 qlock 置 -1.
        return false;
    // 初始化相关值, 保留队列中保存的前一个栈, 取出队列的 ss, 赋值自旋数.SPINS 在前面分析
    // 运行状态加锁时介绍过, 它的值当前就是 0, 参考 awaitRunState 方法, 在等待 runState 锁的时候, 也可以根据它先自旋.
    for (int pred = w.stackPred, spins = SPINS, ss;;) {if ((ss = w.scanState) >= 0)
            //1. 队列的 scanState 大于 0, 回忆一下, 前面介绍 tryRelease 和 signal 中计算 vs 的方法, 其中一步是与~INACTIVE, 而 INACTIVE 是 1 <<31
            // 在前面的 scan 方法中已经遍历一轮且未找到 task 又未出现竞态未更改校验和的情况, 会将 scanState 加上 INACTIVE.
            // 因此此处 scanState 突然不小于 0, 说明是经历过类似 tryRelease 或 signal 的释放唤醒动作, 退出循环等待.
            break;
        else if (spins > 0) {
            //2. 当前未被活化, 依旧处于 INACTIVE 态, 则首先尝试自旋. 使用 r 这个随机数来决定是否对自旋次数减 1.
            r ^= r << 6; r ^= r >>> 21; r ^= r << 7;
            if (r >= 0 && --spins == 0) {         // randomize spins
                //2.1 自旋次数达到 0 时做了勾子操作.
                WorkQueue v; WorkQueue[] ws; int s, j; AtomicLong sc;
                if (pred != 0 && (ws = workQueues) != null &&
                    (j = pred & SMASK) < ws.length &&
                    (v = ws[j]) != null &&        // see if pred parking
                    (v.parker == null || v.scanState >= 0))
                    //2.2 自旋次数降到 0 时, 若满足几个条件:
                    // 当前队列保存的栈下一个队列的索引 (pred) 存在, 线程池队列非空,pred 未溢出队列数组,
                    // 取出 pred 对应的 ws 的队列(它其实是当前 w 在栈向栈底前进一个的元素, 它存在说明当前 w 不是栈底.
                    // 如果该元素存在, 且它没有阻塞者或它还保持 active, 则重置自旋次数, 继续自旋.
                    spins = SPINS;                // continue spinning
            }
        }
        else if (w.qlock < 0)                     // recheck after spins
            //3. 自旋结束后, 再次检查 w 的队列锁, 看它是不是已经被终止了.(deregisterWorker 或 tryTerminate).
            return false;
        else if (!Thread.interrupted()) {
            //4. 如果当前线程还未被扰动.
            // 目前我们只在一个地方看到过线程扰动的情况:awaitRunStateLock, 即当一个线程尝试去修改池的运行时状态, 它会去获取一个 runState 锁,
            // 获取失败, 发生竞态, 也经过自旋等辅助策略无效的阶段, 则会尝试使用 stealCounter 来当作锁加锁,unlock 时也会在确认竞态的情况下去用它唤醒.
            // 而在 awaitRunStateLock 中阻塞的线程如果正在进行 stealCounter.wait 时,wait 操作被中断, 则会扰动当前线程, 这将去除进入此分支的可能.
            // 此外,tryTerminate 本身也有扰动其他工作线程的步骤. 如果用户不在相应的实现代码(如 ForkJoinTask 的 exec 函数或 CountedCompleter 的 compute 函数)
            // 中手动去扰动当前工作线程, 可以理解 awaitRunStateLock 的扰动事件可能与 tryTerminate 有关.
            long c, prevctl, parkTime, deadline;
            // 计算新的活跃数, 它是原 ctl 的前 16 位 (负) 加上并行度.
            int ac = (int)((c = ctl) >> AC_SHIFT) + (config & SMASK);
            if ((ac <= 0 && tryTerminate(false, false)) ||
                (runState & STOP) != 0)           // pool terminating
                //5. 发现活跃数已降至 0, 尝试调用 tryTerminate, 方法返回 true 表明已终止或正在终止; 或发现 runState 已经进入终结程序.
                // 这两种情况直接返回 false, 线程执行完毕终止.
                return false;
            if (ac <= 0 && ss == (int)c) {        // is last waiter
                //6. 前面分析 scan 方法时讨论过, 栈顶元素的 scanState 体现在 ctl 的最新后 32 位, 它的 stackPred 则是 ctl 之前的后 32 位值.
                // 进入 6, 说明当前 worker 是栈顶, 即最后一个等待者.
                // 用 pred 计算出之前的 ctl.
                prevctl = (UC_MASK & (c + AC_UNIT)) | (SP_MASK & pred);
                // 取 ctl 的 17-32 位, 即 worker 总数.
                int t = (short)(c >>> TC_SHIFT);  // shrink excess spares
                if (t > 2 && U.compareAndSwapLong(this, CTL, c, prevctl))
                    //6.1 如果发现线程总数大于 2, 将 ctl 回滚, 返回 false 让线程终止.
                    return false;                 // else use timed wait
                //6.2 计算 deadLine 和 parkTime, 用于后续的定时等待, 暂不终结当前线程, 而是作为 parker.
                //IDLE_TIMEOUT 最开始说过, 它就是起这个作用的一个时间单位, 把 gc 时间也考虑在内, 默认为 2 秒.
                parkTime = IDLE_TIMEOUT * ((t >= 0) ? 1 : 1 - t);
                deadline = System.nanoTime() + parkTime - TIMEOUT_SLOP;}
            else
                //7. 存在 active 的 worker 或当前 w 不是栈顶.
                prevctl = parkTime = deadline = 0L;
            //8. 做线程停段的工作.
            Thread wt = Thread.currentThread();
            // 把当前线程池作为 parker 设置给线程, 当使用 LockSupport.park 时, 它将被当作一个参数传递(参考 Thread 类注释, 在 java 方法签名处看不出来).
            U.putObject(wt, PARKBLOCKER, this);   // emulate LockSupport
            // 设置 parker.
            w.parker = wt;
            if (w.scanState < 0 && ctl == c)      // recheck before park
                //8.1 重新检查非 active. 合格则停顿.
                U.park(false, parkTime);
            // 归置.
            U.putOrderedObject(w, QPARKER, null);
            U.putObject(wt, PARKBLOCKER, null);
            if (w.scanState >= 0)
                //8.2 停顿 (或未停顿) 重检查发现 w 被重新 active, 则退出循环返回 true(非 false 代表不能终结当前线程).
                break;
            if (parkTime != 0L && ctl == c &&
                deadline - System.nanoTime() <= 0L &&
                U.compareAndSwapLong(this, CTL, c, prevctl))
                //8.3 发现没有时间了,ctl 也未在等待的时间发生变化, 将 ctl 设置为 w 入栈前的结果, 返回 false 让终结此线程(类似出栈).
                return false;                     // shrink pool
        }
    }
    return true;
}

awaitWork 方法稍长一些, 但是内容大多是前面已经介绍过的字段, 如果前面的有关方法和字段较熟悉, 这一块不难理解.

同时, 它也验证了我前面对 runState,ctl 和 stackPred 的猜测正确.

它的返回值也值得注意, 从返回值来说, 它的作用也不止是简单的 ” 等待操作 ”, 它返回 false 会造成线程的终结, 而返回 true 时,runWorker 方法会重开一轮, 再一次尝试获取任务,

而返回 true 只能发生在两个 break(1 和 8.2)检查 scanState 时, 这说明 w 被活化.

接下来看一些与 join 有关的操作, 这些操作大多是由外部 (工作线程之外, 甚至线程池之外的线程) 调用, 也能由其他类 (非 ForkJoinPool, 如已经介绍过的 ForkJoinTask 和 CountedCompleter) 进行调用和调度.

// 帮助完成, 调用者可以是一个池中的工作线程, 也可以是池外的. 在 JDK8 版本中, 有三处调用:
//1.CountedCompleter::helpComplete, 该方法的调用由我们决定.
//2.ForkJoinPool::awaitJoin, 等待结果的同时可以尝试帮助完成, 只由池中线程调用, 传入的队列是该线程的队列. 该方法由 ForkJoinTask 的 join/invoke/get 调用.
//3.ForkJoinPool::externalHelpComplete, 用于外部线程操作, 前面在 CountedCompleter 的文章已粗略介绍, 传入的 w 为 ws 中用一个随机数与 n - 1 和 0x007e 取与运算
// 的结果, 很明显, 即使 w 不是 null, 也只能是一个偶数位的元素, 这意味着 w 不会是 registerWoker 时生成的带有工作线程的 WorkQueue. 也就是不能帮助池中线程完成自己的队列.
// 本方法会尝试从当前的计算目标之内偷取一个任务, 它使用顶层算法的变种, 限制偷出来的任务必须是给定任务的后代, 但是也有一些细节要注意.
// 首先, 它会尝试从自己的工作队列中找合格的任务(用前面讲过的 WorkQueue::popCC), 若不能找到则扫描其他队列, 当发生竞态时随机移动指针, 依照校验和机制决定是否放弃
// 帮助执行(这取决于前面介绍的 pollAndExecCC 的返回码). 参数 maxTasks 是对外部使用的支持参数, 内部调用它会传入 0, 允许无界的次数(外部调用时, 捕获非法的非正数).
// 参数 w, 队列, 在内部调用的情况下可以理解为当前线程的工作队列, 参数 maxTasks 如果非 0, 指代最大的可运行的其他任务. 退出时方法返回任务状态.
final int helpComplete(WorkQueue w, CountedCompleter<?> task,
                       int maxTasks) {WorkQueue[] ws; int s = 0, m;
    // 变量初始化和验证, 队列和参数 w 必须非空才能进入 if.
    if ((ws = workQueues) != null && (m = ws.length - 1) >= 0 &&
        task != null && w != null) {
        // 介绍 popCC 时曾专门强调过这个 mode 其实是 config.
        int mode = w.config;                 // for popCC
        int r = w.hint ^ w.top;              // arbitrary seed for origin
        int origin = r & m;                  // first queue to scan
        // 初始时赋 h 为 1, 在每一轮循环中, 它取 1 代表正在正常运行, 大于 1 代表发生了竞态, 小于 0 将增加到校验和, 代表 pollAndExecCC 达到了根元素.
        // 详细参考前面论述过的 pollAndExecCC.
        int h = 1;                           // 1:ran, >1:contended, <0:hash
        // 初始化条件循环条件, 记录 origin 的值, 初始化 oldSum 和 checkSum
        for (int k = origin, oldSum = 0, checkSum = 0;;) {
            CountedCompleter<?> p; WorkQueue q;
            //1. 传入的任务已经是完成的,break 返回 s(负).
            if ((s = task.status) < 0)
                break;
            //2.h 未经过更改或经历过若干次更改, 但在上一轮循环代表了 pollAndExecCC 成功执行 task(h 取 1), 则
            // 在当轮循环尝试对 w 进行 popCC, 并根据 mode 决定从 base 还是 top 出队.
            if (h == 1 && (p = w.popCC(task, mode)) != null) {
                //2.1 本队列有满足条件的任务, 执行之.
                p.doExec();                  // run local task
                if (maxTasks != 0 && --maxTasks == 0)
                    // 减少 maxTask 并在它降到 0 时 break.(前提是传入了正数的 maxTasks).
                    break;
                //2.2 没降到 0, 把 origin 和校验和参数重设为循环初始化的值.
                origin = k;                  // reset
                oldSum = checkSum = 0;
            }
            //3. 某轮循环 h 代表出现竞态等问题或不能使用 popCC 方式从本地队列出任务执行. 尝试从其他队列 poll 执行.
            else {                           // poll other queues
                //3.1 找不出任务,h 置 0, 这将使它在随后的循环中不会再进入 2
                if ((q = ws[k]) == null)
                    h = 0;
                //3.2 尝试从 q 的 base 处 poll 并执行 task. 返回 - 1 代表不匹配, 对校验和增加 h(负数).
                else if ((h = q.pollAndExecCC(task)) < 0)
                    checkSum += h;
                //3.3 h 大于 0, 可能是等于 1 但 popCC 未成功的情况. 也可能是 pollAndExecCC 成功了一次或 cas 失败.
                if (h > 0) {if (h == 1 && maxTasks != 0 && --maxTasks == 0)
                        // h 是 1 减 maxTask, 当它达到 0 终止循环.(前提是没传了正数的 maxTasks)
                        break;
                    // h 不等于 1, 一般是 poll 时 cas 失败, 重置 r,origin,checkSum 等, 开下一轮循环.
                    r ^= r << 13; r ^= r >>> 17; r ^= r << 5; // xorshift
                    origin = k = r & m;      // move and restart
                    oldSum = checkSum = 0;
                }
                //3.4 前面见过类似的代码, 发现已经转完了一轮, 校验和未改变过 (任何一个队列都未进 3.2/3.3, 也就是查找任何一个下标 ws[k] 都是 null),break.
                else if ((k = (k + 1) & m) == origin) {if (oldSum == (oldSum = checkSum))
                        break;
                    // 发现校验和有变更, 说明有一轮循环未进入 3.1, 再次循环.
                    checkSum = 0;
                }
            }
        }
    }
    // 返架退出循环是 task 的 status.
    return s;
}

显然,task 在整个 help 过程中不会被执行, 一旦某一轮循环发现 task 已经完成了, 那么立即结束循环. 此方法可以让进行 join/get 等操作的线程帮助完成一些有关 (子任务) 的任务.

只要存在非空队列,task 未完成, 当前线程未能帮助完成 maxTask 个任务(或初始就指定了 0), 当前线程就会一直循环去找任务, 直到发现 task 完成了为止.

简单分析一下一轮循环的工作流程.

显然首轮循环 (不考虑一上来 task 的 status 就小于 0 的情况) 必然能从 2 进入, 在若干轮后某一轮未能 pop 成功而进入了 3,3 中若干轮后 poll 成功, 则 h 重新被置为 1, 造成下一轮循环又可以进入 2 的流程.

直到: 某一轮 task 完成了; 某一轮 maxTask 完成了(指定的情况); 某一轮再次发现 h 为 0 且发现已经对 ws 所有队列转满了一圈.

还是再逻辑一下该方法的使用.

1. 外部使用 ForkJoinTask 的 get/join 等方法时, 引用到 ForkJoinPool::externalHelpComplete, 它调用 helpComplete 传入的队列一定是偶数索引的队列. 非工作线程维护的队列; 或引用到 ForkJoinPool::awaitJoin, 调用 helpComplete 传入 maxTasks 是 0, 意味着可能循环到直到 task 完成为止.

2. 内部线程可能在 CountedCompleter::helpComplete 中使用此方法, 这种情况下, 需要我们在 compute 方法中进行调用.

接下来看 helpStealer 方法.

// 字面意思: 尝试帮助一个 "小偷".
// 本方法会尝试定位到 task 的偷盗者, 并尝试执行偷盗者 (可能偷盗者的偷盗者) 的任务. 它会追踪 currentSteal(前面 runTask 时提过, 会将参数 task 置为 currentSteal)->
//currentJoin(当前队列等待的任务, 后面会介绍 awaitJoin 方法), 这样追寻一个线程在给定的 task 的后续工作, 它会使用非空队列偷回和执行任务. 方法的第一次从等待 join 调用
// 通常意味着 scan 搜索, 因为 joiner 没有什么更适合做的, 这种做法也是 ok 的. 本方法会在 worker 中留下 hint 标识来加速后续的调用.
// 参数 w 代表 caller 的队列,task 是要 join 的任务.
// 方法共分三层循环, 最外层是一个 do-while 循环, 其内是两个 for 循环.
private void helpStealer(WorkQueue w, ForkJoinTask<?> task) {
    // 初始化和进入 if 的条件, 没什么可说的.
    WorkQueue[] ws = workQueues;
    int oldSum = 0, checkSum, m;
    if (ws != null && (m = ws.length - 1) >= 0 && w != null &&
        task != null) {
        // 循环一: 方法最外层的 while 循环. 它的条件是 task 未完成且校验和未发生变化.
        do {                                       // restart point
            // 每次循环一的起点, 校验和重置为 0. 用 j 保存 w.
            checkSum = 0;                          // for stability check
            ForkJoinTask<?> subtask;
            WorkQueue j = w, v;                    // v is subtask stealer
            // 循环二: 外部 for 循环,subtask 初始指向参数 task, 循环条件是 subtask 未完成.
            // 在每次循环四中会校验当前小偷的队列是否空了, 如果空了则换它的小偷继续偷(交给 subtask 指向).
            descent: for (subtask = task; subtask.status >= 0;) {
                // 循环三: 内部第一个 for 循环, 初始化变量 h, 用 hint 加上奇数位, 保证从奇数索引取队列.k 初始为 0, 每次循环结束加 2.
                for (int h = j.hint | 1, k = 0, i; ; k += 2) {
                    //1. 循环三内的逻辑.
                    //1.1 发现 k 已经递增到大于最大索引 m 了, 直接终止循环二, 若发现 task 还未完成, 校验和也未更改, 则进行上面的重置操作并重新开始循环二.
                    if (k > m)                     // can't find stealer
                        break descent;
                    //1.2i 位置标记为 h + k 的结果与运算 m, 因为 k 每次增 2,h 又是奇数, 故保证只取有线程主的队列.
                    if ((v = ws[i = (h + k) & m]) != null) {if (v.currentSteal == subtask) {// 发现偷取 currentSteal 的 worker v, 将它的索引 i 交给 j(初始为 w, 在 2 内会更改为 "等待队列" 的元素, 在 while 循环中会重置为 w)的 hint,
                            // 方便下一次再进入循环三时的查找. 并终止循环三, 进入循环四的判断入口.
                            j.hint = i;
                            break;
                        }
                        //1.3 存在非空 v, 但是 v 未偷取 subtask, 将 v 的 base 加给校验和. 这将影响到循环一的判真条件. 显然从循环三退出循环二, 或后续循环四退出循环二
                        // 将导致循环一也一并因 while 条件不满而退出.
                        checkSum += v.base;
                    }
                }
                // 循环四: 迭代 subtask 的循环, 它必须经循环三中的 1.2 走出.
                //2. 到达循环四, 一定已经在循环三中找到了一个 v, 此处会尝试帮助 v 或者它的产生的 "后裔".
                for (;;) {                         // help v or descend
                    ForkJoinTask<?>[] a; int b;
                    //2.1 类似 1.3 的逻辑, 校验和增加 v.base, 初始化 next, 增加校验和意味着, 只要从循环四退出了循环二, 则最外的循环一的 while 条件将不满足.
                    checkSum += (b = v.base);
                    //next 取 v 的 currentJoin.
                    ForkJoinTask<?> next = v.currentJoin;
                    if (subtask.status < 0 || j.currentJoin != subtask ||
                        v.currentSteal != subtask) // stale
                        //2.2 如果 subtask 已是完成态, 或发现竞态等情况造成数据已脏, 如发现本轮循环中 j 的当前 join 已不是当前 subtask,
                        // 或 v 的当前 steal 不是 subtask, 说明出现了脏数据, 直接终止循环二, 重新进入 while 循环重初始化 jv.
                        break descent;
                    //2.3 发现队列 v 已空的逻辑.
                    if (b - v.top >= 0 || (a = v.array) == null) {if ((subtask = next) == null)
                            //2.3.1 v 已空, 且不存在 next, 即 "等待队列" 已空, 退出循环二, 重新 while 判定循环条件, 重初始化 jv.
                            break descent;
                        //2.3.2 还有 next, 将 subtask 指向 next 的同时, 用 v 替换掉 j. 这是明显的迭代语句.
                        // 在前面的代码中可以看出, 循环一就是为 subtask 找出小偷 v 的, 关系是 v.currentSteal=subtask. 同时 j.currentJoin=subtask.
                        // 因为 next=v.currentJoin, 将 v 赋给 j 后, 仍旧满足 j.currentJoin=next=subtask, 此时 break 掉循环四, 重新开启循环二的新一轮
                        // 正好对 v 进行重新初始化, 而找到 v 的条件又是 v.currentSteal=subtask, 也即等于 j.currentJoin.
                        // 此处 break 掉的循环四将导致循环二的下轮将在循环三处重新为新的 j 找到 v(v.currentSteal==subtask).
                        j = v;
                        break;
                    }
                    //2.4 未进入 2.3.1/2.3.2 的情况, 显然进入这两者会 break 到循环一或二.
                    // 取出 base 索引位置 i 和相应的任务元素.
                    int i = (((a.length - 1) & b) << ASHIFT) + ABASE;
                    ForkJoinTask<?> t = ((ForkJoinTask<?>)
                                         U.getObjectVolatile(a, i));
                    //2.5, 接 2.4, 判断竞态,v.base!= b 说明已经被别的线程将 base 元素出队. 这种情况下直接进入下一轮的循环二.
                    if (v.base == b) {
                        //2.5.1 取出任务 t, 发现空为脏数据, 从 while 循环重新初始化.
                        if (t == null)             // stale
                            break descent;
                        //2.5.2, 将 t 出队并进行后续流程.
                        if (U.compareAndSwapObject(a, i, t, null)) {
                            //2.5.3 首先将 v 的 base 增 1.
                            v.base = b + 1;
                            //2.5.4 取出 w(方法参数, 当前 worker)的 currentSteal 保存到 ps.
                            ForkJoinTask<?> ps = w.currentSteal;
                            int top = w.top;
                            do {
                                //2.5.5 此循环不和循环一至四一块罗列, 因为它本质上只是任务的出队与执行.
                                // 首先会尝试将 w 队列的 currentSteal 置为刚刚从 v 的任务数组中出队的 t
                                U.putOrderedObject(w, QCURRENTSTEAL, t);
                                // 执行 t. 执行后顺带循环处理自己刚压入队列 w 的任务. 执行后, 也跳出当前 while 循环的情况下会在下次重新判断 2.3,
                                // 非空继续找 base(i), 为空则迭代 v 为 next(2.3.2).
                                t.doExec();        // clear local tasks too
                            //2.5.6 循环条件, 只要参数 task 还未完成,w 新压入了任务, 则依次尝试从 w 中 pop 元素, 和前面的 t 一样按序执行(此处顺带执行自己的任务).
                            } while (task.status >= 0 &&
                                     w.top != top &&
                                     (t = w.pop()) != null);
                            //2.5.7 偷了小偷 v 的 base 任务并执行成功, 则恢复 w 的 currentSteal.
                            U.putOrderedObject(w, QCURRENTSTEAL, ps);
                            if (w.base != w.top)
                                //2.5.8 偷完并执行完当前 v 的 base 任务或者某一轮的等待队列上的元素 v 的 base 任务后, 发现自己的队列非空了, 就不再帮助对方, 方法 return.
                                // 可以参考 awaitJoin 方法, 因为 helpStealer 只在 awaitJoin 中调用, 调用的前提就是 w.base==w.top.
                                // 这显然与 2.5.6 有所纠结(尽管一个判断 top, 一个判断 top 和 base 的相等), 只要到了 2.5.8, 队列非空将返回.
                                return;            // can't further help
                        }
                        // 出队失败同 2.5.1 一样, 竞态失败重新循环二, 但在下一轮循环中会在 2.5.1break 回 while 循环.
                    }
                }
            }
        // 最外层的 while 循环条件,task 未完成, 校验和未发生更改.
        } while (task.status >= 0 && oldSum != (oldSum = checkSum));
    }
}

helpStealer 方法不短, 内容和信息也不少, 但鉴于前面已经不停地渗透与它有关的 ” 窃取 ”,” 外部帮助 ” 等概念, 此处只再啰嗦一点细节, 也解释一些注释中的疑惑.

1. 回顾一下 currentSteal, 在 scan 方法中会把参数 task 设置为自身队列的 currentSteal. 而且 runTask 在每轮循环会先运行这个 task, 再运行队列的本地任务, 每轮循环都会更新它.

2. 如果我建立了一个 ForkJoinTask, 并 fork 出若干子任务并 join 之, 或者在外部 ForkJoinTask::join 等方式, 相当于形成了一个 ” 等待队列 ”, 即任务之间彼此 join, 用 currentJoin 标识(这一块在 awaitJoin 方法详解).

3. 仅有 w.base==w.top 时才能执行此方法, 如果执行过程中发生条件变化, 则在执行完当前小偷 v 的某个任务后进行检查会发现,  就会回归到自己的任务队列.

4. 同 1, 其实 helpStealer 方法相当于沿着 currentJoin 队列进行帮助, 首先找到自己 w 的小偷, 帮他执行完剩下的任务, 然后顺着它们 join 的任务去执行. 对于等待队列的逐个迭代过程, 依靠 currentJoin 和 currentSteal 两者的配置, 通过 currentJoin 找到 next, 也即下一个 subtask, 再遍历 ForkJoinPool 中的 ws 找到 currentSteal 是 subtask 的 worker, 如此迭代并重复上面的所有过程.

5. 玩笑式的聊一下这个 ” 反窃取 ”, 甚至有点 ” 赔偿 ” 的概念了, 一个 worker 发现 / 或线程池外的线程去帮助偷了我任务的工作线程 (worker, 或 WorkQueue, 即 ws 的奇数索引位元素) 从 base 处执行, 直到执行干净, 再找它 currentJoin 的任务所属于的队列, 继续这个完成过程, 直到发现自己的任务被完成了为止. 可见, 我发现你是我这个任务的小偷, 我不但要偷你的全部身家, 还要偷走偷了你的任务的小偷的全部身家, 如果他也被偷过, 那我再找他的小偷去偷, 直到找回失主 (我丢的任务) 为止.

因为一个任务的完成要先从出队开始, 因此不会出现两次执行的情况, 可以放心大胆的窃取.

上面的 45 是比较简单的形象例子, 但也不妨再加上一小段伪代码.

比如我先创建了一个 ForkJoinTask 子类并 new 一个对象, 在它的 exec 方法中我 fork 了多个任务, 那么当我去 submit 到一个 ForkJoinPool 后, 我使用 get 方法去获取结果, 此时美妙的事情就发生了.

1. 我提交它入池, 它进入了队列, 并被一个工作线程 (小偷) 偷走.

2. 它被偷走后, 我才去 get 结果, 此时发现 task 还未结束, 我需要等. 但是我就这样干等吗? 不我要找小偷, 我要报复.

3. 开始报复, 但是我只能偷他的钱库(array), 小偷对自己偷来的任务非常重视, 它放在 currentSteal 里面了, 我偷不到, 只好把他的钱库偷光.

4. 偷完他的钱库, 我发现我的 task(失物)还没有完成, 我还是不能闲着, 一不做二不休, 我发现小偷也有 join 的任务, 这个被 join 的任务不在他的队列, 也被其他小偷偷走了, 那么我找到新的小偷, 再偷光它的财产.

5. 如果我的 task 还是未执行完毕, 我再找新的小偷; 否则返回即可. 或者我每偷走一个小偷的任务时, 突然发现我的仓库提交了新任务, 那我就不能再去偷了.

这是外部线程的执行结果. 但如果帮助者本身是一个工作线程, 那么流程也相似, 读者自行捊顺吧.

// 字面意思: 尝试补偿.
// 方法会尝试减少活跃数 (有时是隐式的) 并可能会因阻塞释放或创建一个补偿 worker.
// 在出现竞态, 发现脏数据, 不稳定, 终止的情况下返回 false, 并可重试. 参数 w 代表调用者.
// 方法实现比较简单, 为简单的 if else 模式, 只有一个分支可以执行.
private boolean tryCompensate(WorkQueue w) {
    //canBlock 为返回值.
    boolean canBlock;
    WorkQueue[] ws; long c; int m, pc, sp;
    //1. 发现调用者终止了, 线程池队列数组为空, 或者禁用了并行度, 则返回 false.
    if (w == null || w.qlock < 0 ||           // caller terminating
        (ws = workQueues) == null || (m = ws.length - 1) <= 0 ||
        (pc = config & SMASK) == 0)           // parallelism disabled
        canBlock = false;
    //2. 发现当前 ctl 表示有 worker 正等待任务(空闲, 位于 scan), 则尝试释放它, 让它回来工作.
    else if ((sp = (int)(c = ctl)) != 0)      // release idle worker
        canBlock = tryRelease(c, ws[sp & m], 0L);
    else {
        //3. 当前所有 worker 都在忙碌.
        //3.1 计算活跃数, 总数, 计算方法前面已经论述多次.
        int ac = (int)(c >> AC_SHIFT) + pc;
        int tc = (short)(c >> TC_SHIFT) + pc;
        // 记录 nbusy, 注释表示用于验证饱合度.
        int nbusy = 0;                        // validate saturation
        for (int i = 0; i <= m; ++i) {        // two passes of odd indices
            WorkQueue v;
            //3.2nbusy 的计算方法, 遍历线程池的队列数组(每次增 1), 验证则以 1 -3- 5 这个顺序开始, 发现有处于 SCANNING 态的, 就停掉循环, 否则加 1.
            if ((v = ws[((i << 1) | 1) & m]) != null) {if ((v.scanState & SCANNING) != 0)
                    break;
                ++nbusy;
            }
        }
        //3.3 如果非稳态(饱合度不是 tc 的 2 倍), 或者 ctl 脏读, 则返回 false.
        if (nbusy != (tc << 1) || ctl != c)
            canBlock = false;                 // unstable or stale
        else if (tc >= pc && ac > 1 && w.isEmpty()) {
            //3.4 处于稳态且 ctl 还有效, 总 worker 数大于并行度且活跃数大于 1 而且当前 w 又是空的. 尝试将 ctl 减去一个活跃位.
            long nc = ((AC_MASK & (c - AC_UNIT)) |
                       (~AC_MASK & c));       // uncompensated 反补偿, 初看莫名其妙, 调用者会在之后增加 ac.
            // 返回值为 cas 是否成功.
            canBlock = U.compareAndSwapLong(this, CTL, c, nc);
        }
        else if (tc >= MAX_CAP ||
                 (this == common && tc >= pc + commonMaxSpares))
            //3.5 普通 ForkJoinPool, 总 worker 数达到 MAX_CAP, 或 common 池, 总 worker 数量达到并行度 +commonMaxSpares(默认 256), 抛出拒绝异常.
            throw new RejectedExecutionException("Thread limit exceeded replacing blocked worker");
        else {                                // similar to tryAddWorker
            boolean add = false; int rs;      // CAS within lock
            //3.6. 计算新的 ctl, 增加一个总 worker 数.
            long nc = ((AC_MASK & c) |
                       (TC_MASK & (c + TC_UNIT)));
            // 加运行状态锁, 池未进入终止态的情况下, 进行 cas, 随后解锁.
            if (((rs = lockRunState()) & STOP) == 0)
                add = U.compareAndSwapLong(this, CTL, c, nc);
            unlockRunState(rs, rs & ~RSLOCK);
            //cas 成功, 则创建 worker
            canBlock = add && createWorker(); // throws on exception}
    }
    return canBlock;
}

此方法相对简单许多, 只是根据不同的当前线程池和参数队列的状态进行不同的操作.

1. 调用者终止或线程池空态, 返回 false 结束.

2. 发现当前有 worker 正在空闲(阻塞等待新任务), 释放等待栈顶(前面已经论述并欣赏过这种奇怪的 ” 栈 ”).

3. 线程池未进入稳态或者进入时读取的 ctl 失效, 返回 false.

4. 存在活跃 worker 且总 worker 数大于 tc, 调用者队列实际又是空的, 则减去一个活跃位.

5. 总线程数超限, 抛出异常.

6. 其他情况, 增加一个总 worker 数并创建 worker.

// 前面提过它很多次了,awaitJoin 方法会在指定任务完成或者超时前尝试帮助或阻塞自身.
// 参数 w 代表调用者,task 为目标任务, 参数 deadline 是超时目标(非 0). 它会返回退出时的任务状态.
final int awaitJoin(WorkQueue w, ForkJoinTask<?> task, long deadline) {
    // 返回值.
    int s = 0;
    if (task != null && w != null) {
        //1. 若未进入 if 必然返回 0, 进入条件是提供了 task 和 w.
        // 保存 currentJoin
        ForkJoinTask<?> prevJoin = w.currentJoin;
        // 将 w 的 currentJoin 暂时设置为 task.
        U.putOrderedObject(w, QCURRENTJOIN, task);
        // 如果 task 是 CountedCompleter 类型, 转化并存放到 cc.
        CountedCompleter<?> cc = (task instanceof CountedCompleter) ?
            (CountedCompleter<?>)task : null;
        //2. 循环.
        for (;;) {if ((s = task.status) < 0)
                //2.1 目标 task 已完成, 返回 task 的 status.
                break;
            if (cc != null)
                //2.2 目标 task 是 CountedCompleter, 调用前面介绍过的 helpComplete 方法,maxTasks 不限(0).
                helpComplete(w, cc, 0);
            //2.3 否则发现队列 w 已空, 或者非空, 则尝试从 w 中移除并执行 task, 若出现队列 w 是空且任务不知道是否完成的情况(t.doExec 只是执行, 不等结果),
            // 此处也会拿到一个 true, 则调用前面介绍过的 helpStealer 去帮助小偷.
            else if (w.base == w.top || w.tryRemoveAndExec(task))
                helpStealer(w, task);
            //2.4. 帮助需要时间,double check, 同 2.1.
            if ((s = task.status) < 0)
                break;
            //2.5 计算 deadline 有关的停顿时间 ms.
            long ms, ns;
            if (deadline == 0L)
                ms = 0L;// 未指定 deadline,ms 为 0
            else if ((ns = deadline - System.nanoTime()) <= 0L)
                break;// 要指定的 deadline 已经早于当前时间了,break 返回上面的 status
            else if ((ms = TimeUnit.NANOSECONDS.toMillis(ns)) <= 0L)
                ms = 1L;// 用上面的 ns 计算 ms 发现负数, 重置 ms 为 1
            //2.6, 调用上面提到过的 tryCompensate 方法, 传入当前 worker, 如果得到 true 的返回值, 等待超时,
            // 超时结束增加一个活跃位(前面提到 tryCompensate 方法最后加增加 tc 并创建 worker, 不增加 ac, 或者莫名其妙地减去了一个 ac).
            if (tryCompensate(w)) {task.internalWait(ms);
                U.getAndAddLong(this, CTL, AC_UNIT);
            }
        }
        //3. 最后恢复原来的 currentJoin.
        U.putOrderedObject(w, QCURRENTJOIN, prevJoin);
    }
    return s;
}

await 方法只会在 2.1,2.4,2.5 三处结束, 前两处为发现 task 结束, 后一处是超时. 返回的结果一定是返回时 task 的状态.

接下来看一些专门针对扫描的方法.

// 简单的方法, 尝试找一个非空的偷盗队列. 使用类似简化的 scan 的方式查取, 可能返回 null.
// 如果调用者想要尝试使用队列, 必须在得到空后多次尝试.
private WorkQueue findNonEmptyStealQueue() {WorkQueue[] ws; int m;  
    // 随机数 r
    int r = ThreadLocalRandom.nextSecondarySeed();
    // 线程池不具备队列, 直接返回 null.
    if ((ws = workQueues) != null && (m = ws.length - 1) >= 0) {
        // 循环开始.
        for (int origin = r & m, k = origin, oldSum = 0, checkSum = 0;;) {
            WorkQueue q; int b;
            if ((q = ws[k]) != null) {if ((b = q.base) - q.top < 0)
                    // 查到 q 这个 WorkQueue, 并且 q 非空, 则将 q 返回.
                    return q;
                // 查到了空队列 q, 则校验和加上 q 的 base.
                checkSum += b;
            }
            // 前面多次见过的判断进入第二轮的办法. 发现 ws 从头到尾都是 null 则 break 返回 null, 否则出现非 null 的空队列则将校验和置 0 继续循环.
            if ((k = (k + 1) & m) == origin) {if (oldSum == (oldSum = checkSum))
                    break;
                checkSum = 0;
            }
        }
    }
    return null;
}


// 运行任务, 直到 isQuiescent, 本方法顺带维护 ctl 中的活跃数, 但是全过程不会在任务不能找到的情况下
// 进行阻塞, 而是进行重新扫描, 直到所有其他 worker 的队列中都不能找出任务为止.
final void helpQuiescePool(WorkQueue w) {
    // 保存当前偷取的任务.
    ForkJoinTask<?> ps = w.currentSteal; // save context
    // 循环开始,active 置 true.
    for (boolean active = true;;) {
        long c; WorkQueue q; ForkJoinTask<?> t; int b;
        //1. 先把本地任务执行完毕(每次循环扫描).
        w.execLocalTasks();   
        //2. 查找到非空队列的情况, 除了 2 以外的 34 在 cas 成功的情况下都会终止循环.
        if ((q = findNonEmptyStealQueue()) != null) {
            //2.1 经过 3 中成功减少了活跃数的情况, 下一次循环又扫描到了新的非空队列, 需要重激活.
            if (!active) {      
                active = true;
                // 活跃数重新加 1.
                U.getAndAddLong(this, CTL, AC_UNIT);
            }
            //2.2 再次判断队列非空, 并从队列内部数组的 base 起取出 task
            if ((b = q.base) - q.top < 0 && (t = q.pollAt(b)) != null) {
                // 将 task 置于 currentSteal
                U.putOrderedObject(w, QCURRENTSTEAL, t);
                // 执行 task
                t.doExec();
                // 如果 w 的偷取任务数溢出, 转到池中.
                if (++w.nsteals < 0)
                    w.transferStealCount(this);
            }
        }
        //3. 仍旧保持 active 的情况.
        else if (active) { 
            //3.1 能进到这里, 肯定本轮循环未能进入 2, 说明未能发现非空队列, 计算新的 ctl 即 nc, 它是原 ctl 减去一个活跃单位.  
            long nc = (AC_MASK & ((c = ctl) - AC_UNIT)) | (~AC_MASK & c);
            if ((int)(nc >> AC_SHIFT) + (config & SMASK) <= 0)
                //3.2 新的活跃数加上并行度还不大于 0, 即不能溢出, 说明没有活跃数, 不进行 cas 了, 直接 break.
                // 很明显, 上面计算 nc 的方法, 首先 ctl 正常本身是负, 若上面表达式为正, 唯一的解释是线程池有活跃线程(前面讲过, 活跃一个加一个活跃单元, 直到并行度为止)
                // 因为两个表达式分别是前 16 位 (在前面再补上 16 个 1) 和后 16 位求和.
                break;  
            //3.3 未能从 3.2 退出, 说明 nc 表示当前有活跃数存在, 进行 cas, 成功后 active 置 false, 不退出循环.
            // 若下轮循环发现新的非空队列, 会在 2.1 处增加回来. 若未能发现, 会在 4 处加回来.       
            if (U.compareAndSwapLong(this, CTL, c, nc))
                active = false;
        }
        //4. 前一轮循环进了 3.3, 当前循环未能进入 2.1 的情况, 判断当前 ctl 活跃数加上并行度是非正, 说明再创建并行度个数的 worker 也不能溢出. 则再加回一个活跃数.
        else if ((int)((c = ctl) >> AC_SHIFT) + (config & SMASK) <= 0 &&
                 U.compareAndSwapLong(this, CTL, c, c + AC_UNIT))
            break;
    }
    //5. 重新恢复 currentSteal
    U.putOrderedObject(w, QCURRENTSTEAL, ps);
}


// 获取并移除一个本地或偷来的任务.
final ForkJoinTask<?> nextTaskFor(WorkQueue w) {for (ForkJoinTask<?> t;;) {
        WorkQueue q; int b;
        // 首先尝试 nextLocalTask 本地任务.
        if ((t = w.nextLocalTask()) != null)
            return t;
        // 获取不到本地任务, 尝试从其他队列获取非空队列, 获取不到非空队列, 返回 null.
        if ((q = findNonEmptyStealQueue()) == null)
            return null;
        // 获取到了非空队列, 从 base 处取任务, 非空则返回, 为空则重复循环.
        if ((b = q.base) - q.top < 0 && (t = q.pollAt(b)) != null)
            return t;
    }
}

上面是一些针对扫描的方法, 有前面的基础, 理解实现并不困难, 不再缀述.

接下来关注与终止有关的函数.

// 前面不止一次提到过 tryTerminate, 说过它会尝试终止或完成终止.
// 参数 now 如果设置为 true, 则表示在 runState 进入 SHUTDOWN 关闭态 (负) 时无条件终止, 否则需要在进入 SHUTDOWN 同时没有 work 也没有活跃 worker 的情况下终止.
// 如果设置 enable 为 true, 则下次调用时 runState 为负, 可直接进入关闭流程(如果有 now 为 true, 则立即关).
// 如果当前线程池进入终止流程或已终止, 返回 true.
private boolean tryTerminate(boolean now, boolean enable) {
    int rs;
    //common 池不可关.
    if (this == common)                       // cannot shut down
        return false;
    //1.runState 拦截和处理.
    if ((rs = runState) >= 0) {if (!enable)
            //1.1 对于当前 runState 非负的情况, 如果没有指定 enable, 返回 false.
            return false;
        //1.2 如果指定了 enable, 将加运行状态锁并更新 runState 的首位为 1, 即 runState 下次进入时为负. 不再进入 1 的拦截处理流程.
        rs = lockRunState();                 
        unlockRunState(rs, (rs & ~RSLOCK) | SHUTDOWN);
    }

    //2. 终止的处理流程.
    if ((rs & STOP) == 0) {
        //2.1. 没有指定即刻关闭, 检查是否线程池已进入静寂态.
        if (!now) { 
            // 循环重复直到稳态. 初始化校验和机制.                          
            for (long oldSum = 0L;;) {WorkQueue[] ws; WorkQueue w; int m, b; long c;
                long checkSum = ctl;// 校验和取 ctl
                if ((int)(checkSum >> AC_SHIFT) + (config & SMASK) > 0)
                    //2.1.1 当前线程池还有活跃的 worker(前面解释过). 此时应返回 false
                    return false;            
                if ((ws = workQueues) == null || (m = ws.length - 1) <= 0)
                    //2.1.2 线程池已经没有队列, 直接 break 进入后续流程.
                    break;
                //2.1.3 从 0 开始遍历到 ws 的最后一个队列. 
                for (int i = 0; i <= m; ++i) {if ((w = ws[i]) != null) {if ((b = w.base) != w.top || w.scanState >= 0 ||
                            w.currentSteal != null) {// 只要发现任何一个队列非空, 或队列未进入非活跃态 (负) 或队列仍有偷来的任务未完成.
                            // 尝试释放栈顶 worker 并增加一个活跃数. 并返回 false, 可以据此重新检查.
                            tryRelease(c = ctl, ws[m & (int)c], AC_UNIT);
                            return false;   
                        }
                        // 队列非 null 但是空队列, 给校验和增加 base.
                        checkSum += b;
                        if ((i & 1) == 0)
                            // 发现非 worker 的队列, 直接让外部禁用.
                            w.qlock = -1;    
                    }
                }
                //2.1.4 校验和一轮不变,break 掉进入后置流程. 即 2.1.3 中每一次取 ws[i]都是 null.
                if (oldSum == (oldSum = checkSum))
                    break;
            }
        }
        //2.2. 到这一步, 已经保证了所有的关闭条件. 若还没有给运行状态锁加上 stop 标记,
        // 则给它加上标记. 此时再有其他线程去尝试关闭, 会进不来 2 这个分支.
        if ((runState & STOP) == 0) {rs = lockRunState();              // enter STOP phase
            unlockRunState(rs, (rs & ~RSLOCK) | STOP);
        }
    }
    //3. 经过前面的阶段, 已完成预处理或 now 检查, 可进入后置流程.
    int pass = 0;                             // 3 passes to help terminate
    for (long oldSum = 0L;;) {                // or until done or stable
        WorkQueue[] ws; WorkQueue w; ForkJoinWorkerThread wt; int m;
        long checkSum = ctl;
        //3.1 前面解释过这个状态表示当前无活跃 worker.
        if ((short)(checkSum >>> TC_SHIFT) + (config & SMASK) <= 0 ||
            (ws = workQueues) == null || (m = ws.length - 1) <= 0) {if ((runState & TERMINATED) == 0) {
                // 在确保无 worker 活跃的情况, 直接将线程池置为 TERMINATED. 并唤醒所有等待终结的线程.
                rs = lockRunState();       
                unlockRunState(rs, (rs & ~RSLOCK) | TERMINATED);
                synchronized (this) {notifyAll(); } 
            }
            // 到此一定是终结态了, 退出循环, 结束方法返回 true.
            break;
        }
        //3.2 内循环处理存在活跃 worker 的情况. 从第一个队列开始遍历.
        for (int i = 0; i <= m; ++i) {if ((w = ws[i]) != null) {
                //3.2.1 对每个非 null 队列, 增加一次校验和并禁用队列.
                checkSum += w.base;
                w.qlock = -1;               
                if (pass > 0) {
                    //3.2.2 内循环初次 pass 为 0 不能进入.
                    //pass 大于 0, 取消队列上的所有任务, 清理队列.
                    w.cancelAll();  
                    if (pass > 1 && (wt = w.owner) != null) {
                        //3.2.3 pass 大于 1 并且队列当前存在 owner, 扰动它.
                        if (!wt.isInterrupted()) {
                            try {wt.interrupt();
                            } catch (Throwable ignore) {}}
                        if (w.scanState < 0)
                            //3.2.4 如果 w 代表的 worker 正在等待任务, 让它取消停顿, 进入结束流程.
                            U.unpark(wt); 
                    }
                }
            }
        }
        //3.3 如果校验和在几轮 (最大为 3 或 m 的最大值) 循环中改变过, 说明并未进入稳态. 将 oldSum 赋值为新的 checkSum 并重置 pass 为 0.
        if (checkSum != oldSum) {   
            oldSum = checkSum;
            pass = 0;
        }
        //3.4pass 从未被归置为 0, 稳态增加到大于 3 且大于 m 的情况, 不能再帮助了, 退出循环返回 true.
        else if (pass > 3 && pass > m) 
            break;
        //3.5pass 未到临界值, 加 1.
        else if (++pass > 1) { 
            long c; int j = 0, sp;
            // 每一次进入 3.5 都会执行一次循环. 如果 ctl 表示有 worker 正在 scan, 最多 m 次尝试 release 掉栈顶 worker.
            // 因为最多只有 m 个 worker 在栈中阻塞. 因此 3.4 是合理的.
            while (j++ <= m && (sp = (int)(c = ctl)) != 0)
                tryRelease(c, ws[sp & m], AC_UNIT);
        }
    }
    return true;
}

tryTerminate 方法的实现并不复杂, 不过这里有一点需要注意的地方: 从方法中返回 true, 至少可以理解为进入了终止流程, 但不一定代表已终止(即使是 now 的情况), 因为仅看方法的后半点, 返回 true 时, 线程池一定已经进入 stop(从 3.4break), 或完成了 terminated(从 3.1break).

显然, 线程池的关闭必然会先经历 STOP, 然后再 TERMINATED, 故前面所有的使用线程池的方法都是直接先判断 stop, 因为如果线程池 terminated 了, 那么一定先 stop. 同样, 还有一个 shutdown 标记位来标记 runState 是否已进入负值, 它小于 0 时(SHUTDOWN 是最高位), 则不能再接收新的任务.

其实从调用者可以看出来它的几种执行情况.


显然, 在对线程进行解除注册时, 等待任务时和提交任务时,now 和 enable 均会传入 false, 如果没有其他地方提交调用了 shutdown 将 runState 的首位置 1, 这三个方法无法通过注释 (1) 处的代码拦截.

shutdown 会用 enable 的方式, 将当前尚未将 runState 置负的状态置负, 使得下一次调用 deregisterWorker,awaitWork,externalSubmit,shutdown 四个方法均能走后置的逻辑.

shutdownNow 则两个参数均会置 true, 会走完上面的所有逻辑.

下面来看 externalSubmit 等外部操作的方法.

// 字面意思, 从外面提交一任务入池. 有前面的基础后, 此方法很容易理解.
// 此方法会处理一些不常见的 case, 比如辅助进行池的一些初始化过程(首次提交任务),
// 如果发现是首次外部线程提交任务, 在 ws 的目标索引位置为空或者出现竞态, 它会尝试创建新的共享队列.
// 参数 task 是目标任务, 调用者必须保证非空.
private void externalSubmit(ForkJoinTask<?> task) {
    int r; 
    // 初始化一个用于定位的随机数 r, 前面曾简单介绍过它和 localInit, 许多公司的分布式 id 也是有它的成份.     
    // 而这个随机数与线程相当于绑定在了一起, 因此, 可以使用它表示一个线程特有的东西.                             
    if ((r = ThreadLocalRandom.getProbe()) == 0) {ThreadLocalRandom.localInit();
        r = ThreadLocalRandom.getProbe();}
    // 循环尝试压入任务.
    for (;;) {WorkQueue[] ws; WorkQueue q; int rs, m, k;
        boolean move = false;
        if ((rs = runState) < 0) {
            //1. 发现此时线程池的运行状态已经进入 SHUTDOWN, 帮助终止线程池, 并抛出拒绝异常.
            tryTerminate(false, false);     // help terminate
            throw new RejectedExecutionException();}
        //2. 发现线程池还未初始化, 辅助初始化.STARTED 为第三位.
        else if ((rs & STARTED) == 0 ||     // initialize
                 ((ws = workQueues) == null || (m = ws.length - 1) < 0)) {
            int ns = 0;
            // 加锁.
            rs = lockRunState();
            try {//double check 并尝试初台化 stealCounter. 它在 awaitRunStateLock(尝试加锁)的时候会用来 wait,
                // 同时也处理了初始化阶段的竞态, 还记得在 awaitRunStateLock 方法中发现 stealCounter 为 null 时的注释 (初始化竞态) 吗?
                if ((rs & STARTED) == 0) {
                    U.compareAndSwapObject(this, STEALCOUNTER, null,
                                           new AtomicLong());
                    // 创建 workQueues 数组, 大小是 2 的幂. 并保证至少有两个插槽(语义理解, 如果是 4 个的话, 两个 share 两个独有).
                    int p = config & SMASK;
                    // 下面这个比较好理解, 总之, 最小的情况下,n 也会是(1+1)<<1=4, 这样保证有两个位置给 SHARE 两个位置给工作线程.
                    // n 的初始值取决于并行度.
                    int n = (p > 1) ? p - 1 : 1;
                    n |= n >>> 1; n |= n >>> 2;  n |= n >>> 4;
                    n |= n >>> 8; n |= n >>> 16; n = (n + 1) << 1;
                    workQueues = new WorkQueue[n];
                    // 新的运行状态.
                    ns = STARTED;
                }
            } finally {
                // 完成了辅助初始化, 则解锁, 并置 runState 加上 STARTED 标识.
                unlockRunState(rs, (rs & ~RSLOCK) | ns);
            }
        }
        //3. 某轮循环发现早已完成初始化, 使用本线程的随机数 r 计算索引, 发现 ws[k]存在. 说明已被别的线程在此初始化了一个队列.
        // 注意索引 k 的值的计算, 它与 m 进行与运算, 保证不大于 m, 同时与 SQMASK, 即 share-queue mask, 它的值是 0X007e, 前面说过,
        // 很明显, 它是整数的 2 至 7 位, 保证了共享队列只能放在 ws 的偶数位.
        else if ((q = ws[k = r & m & SQMASK]) != null) {
            //3.1 对队列加锁.
            if (q.qlock == 0 && U.compareAndSwapInt(q, QLOCK, 0, 1)) {
                // 取队列的数组和 top
                ForkJoinTask<?>[] a = q.array;
                int s = q.top;
                // 初始化提交或者扩容.
                boolean submitted = false;
                try {                      // locked version of push
                    //|| 左边的语句指符合添加元素的条件, 右边表示如果不符合添加条件, 则进行扩容.
                    if ((a != null && a.length > s + 1 - q.base) ||
                        (a = q.growArray()) != null) {
                        // 符合添加条件或扩容成功, 取 top 对应的索引 j.
                        int j = (((a.length - 1) & s) << ASHIFT) + ABASE;
                        // 向 top 放入 task.
                        U.putOrderedObject(a, j, task);
                        // 给 top 加 1.
                        U.putOrderedInt(q, QTOP, s + 1);
                        // 标记为已提交.
                        submitted = true;
                    }
                } finally {
                    // 释放 qlock.
                    U.compareAndSwapInt(q, QLOCK, 1, 0);
                }
                if (submitted) {// 如果提交成功, 则尝试唤醒 top 或创建一个 worker(如果太少). 并返回.
                    signalWork(ws, q);
                    return;
                }
            }
            // 竞态失败, 标记 move
            move = true;                   // move on failure
        }
        //3.2 计算出的位置没有 queue, 且 runState 未锁, 创建一个新的.
        else if (((rs = runState) & RSLOCK) == 0) { // create new queue
            // 共享队列没有 owner.
            q = new WorkQueue(this, null);
            // 随机数就用线程的随机数 r.
            q.hint = r;
            //config 的第 32 位置 1 表示共享队列
            q.config = k | SHARED_QUEUE;
            // 队列的 scanState 直接置为 INACTIVE, 很明显, 参考前面的描述,
            // 它没有工作线程, 也不会参与活化和 scan 阻塞的过程, 也不会将自己的 scanState 压入 ctl 后 32 位做栈元素.
            q.scanState = INACTIVE;
            // 加锁.
            rs = lockRunState();         
            if (rs > 0 &&  (ws = workQueues) != null &&
                k < ws.length && ws[k] == null)
                // 仍旧符合添加条件, 池未终结, 将 q 赋给 ws[k], 否则的话, 可能在下一轮循环进入 1 帮助终止,
                // 也可能进入 2 用现成的队列内的任务数组添加元素到 top. 也可能在 4 处发现竞态, 并最终导致 5 处重初始化 r 并重新循环找索引.
                ws[k] = q;                 // else terminated
            unlockRunState(rs, rs & ~RSLOCK);
        }
        //4. 标记繁忙.
        else
            move = true;                   // move if busy
        //5. 本轮循环经历 2 的竞态失败或 4 的繁忙, 重新初始化一个 r 供下轮循环使用.
        if (move)
            r = ThreadLocalRandom.advanceProbe(r);
    }
}

这个方法的逻辑相对简单, 用到的方法和字段基本都是前面说过的.

它的最终结果只有两个:

1. 任务提交入池, 并唤醒正在 scan 的栈顶 worker 或创建一个新的 worker(空闲太多).

2. 终止了线程池并抛出拒绝异常.

看一看有关的几个简短方法.

// 尝试将给定的 task 添加到一个提交者当前的队列中, 如果还需要额外的初始化操作等, 使用上面的 externalSubmit.
// 我们知道, 绝大多数的情况下, 不需要初始化线程池的任务数组(整个线程池就一次), 不需要初始化一个工作队列(每个 ws 一个位置只一次).
// 因此它相当于先尝试用最简单直接的办法将任务压入队列, 如果 ws 存在而队列需要初始化, 或者池本身就没有完成初始化, 再使用 externalSubmit.
// 参数 task 是要提交的任务, 调用者本身必须保证它非空.
final void externalPush(ForkJoinTask<?> task) {WorkQueue[] ws; WorkQueue q; int m;
    // 老办法, 初始的随机数. 运行状态.
    int r = ThreadLocalRandom.getProbe();
    int rs = runState;
    // 快速压入的代码分支, 条件是队列数组 ws 已分配, 非空, 且根据 r 计算出来索引位取出的队列
    // 存在且已完成初始化, 线程池未进入 SHUTDOWN, 并且能够对队列进行加锁.
    if ((ws = workQueues) != null && (m = (ws.length - 1)) >= 0 &&
        (q = ws[m & r & SQMASK]) != null && r != 0 && rs > 0 &&
        U.compareAndSwapInt(q, QLOCK, 0, 1)) {
        // 快速入队.
        ForkJoinTask<?>[] a; int am, n, s;
        if ((a = q.array) != null &&
            // 第二个条件是没有到扩容条件.
            (am = a.length - 1) > (n = (s = q.top) - q.base)) {
            // 计算出 top 的索引 j, 并将当前任务放入, 将 top 加 1
            int j = ((am & s) << ASHIFT) + ABASE;
            U.putOrderedObject(a, j, task);
            U.putOrderedInt(q, QTOP, s + 1);
            U.putIntVolatile(q, QLOCK, 0);
            if (n <= 1)
                // 发现原来的队列长度很短, 有可能有 worker 正在 scan, 尝试唤醒一个 worker 或添加一个 worker
                signalWork(ws, q);
            // 只要成功压入, 返回.
            return;
        }
        // 最后解锁.
        U.compareAndSwapInt(q, QLOCK, 1, 0);
    }
    // 未能成功压栈, 原因可能是线程池未初始化, 工作队列未初始化, 队列达到扩容阈值等. 使用 externalSubmit 进行.
    externalSubmit(task);
}



// 尝试弹出外部提交者的任务, 找到队列, 非空时加锁, 最后调整 top, 每次进行都会检查失败, 尽管很少失败.
// 在前面 ForkJoinTask 和 CountedCompleter 等文章中曾引用过相关方法, 此方法可以令等待任务的线程
// 自行将任务出队并执行, 而不是在池内线程还忙碌的情况下干等. 但是该队列可能被其他外部线程放置了新的栈顶
// 且看内部方法实现, 当且仅当 task 是栈顶才有用.
final boolean tryExternalUnpush(ForkJoinTask<?> task) {WorkQueue[] ws; WorkQueue w; ForkJoinTask<?>[] a; int m, s;
    // 当前线程生成随机数 r.
    int r = ThreadLocalRandom.getProbe();
    if ((ws = workQueues) != null && (m = ws.length - 1) >= 0 &&
        (w = ws[m & r & SQMASK]) != null &&
        (a = w.array) != null && (s = w.top) != w.base) {
        // 进入 if 的条件. 线程池已初始化,ws 存在,w 存在且 w 非空队列. 注意, 仍旧取的偶数索引.
        // 计算当前最顶部元素的索引 j
        long j = (((a.length - 1) & (s - 1)) << ASHIFT) + ABASE;
        // 尝试加锁 qlock, 加锁成功进入.
        if (U.compareAndSwapInt(w, QLOCK, 0, 1)) {
            // 进一步 check 队列 w 的 top 和 w 的 array 未变.
            if (w.top == s && w.array == a &&
                // 队列 w 的顶部元素就是参数 task
                U.getObject(a, j) == task &&
                // 成功将 task 出队
                U.compareAndSwapObject(a, j, task, null)) {
                // 将 top 减 1 并释放锁, 返回 true.
                U.putOrderedInt(w, QTOP, s - 1);
                U.putOrderedInt(w, QLOCK, 0);
                return true;
            }
            // 加锁前已有更改或者 task 本身就不是顶部任务, 直接解锁. 返回 false.
            U.compareAndSwapInt(w, QLOCK, 1, 0);
        }
    }
    // 默认返回 fasle
    return false;
}


// 外部提交者 helpComplete. 介绍 CountedCompleter 提过此方法.
// 当目标任务 task 是 CountedCompleter 类型时可以手动调用 CountedCompleter::helpComplete, 它会调用此处,ForkJoinTask::get 也有调用.
// 此方法可以令外部线程在等待 task 时帮助 completer 栈链上它的子孙任务完成, 从而加速 task 的完成.
final int externalHelpComplete(CountedCompleter<?> task, int maxTasks) {WorkQueue[] ws; int n;
    int r = ThreadLocalRandom.getProbe();
    return ((ws = workQueues) == null || (n = ws.length) == 0) ? 0 :
        //ws 未初始化, 返回 0, 否则返回 helpComplete 的结果, 取 w 的方式不变.
        helpComplete(ws[(n - 1) & r & SQMASK], task, maxTasks);
}

终于可以看对外公有的 api 了, 我们使用 ForkJoinPool 的公有方法:

//invoke 方法会尝试将 task 压入池, 但也会立即 join 等待, 压入池的方法即前面介绍过的 externalPush, 同样 join 方法也可能会
// 导致当前线程自身完成了任务(池中工作线程忙碌而当前线程立即从队列中获取了该任务).
// 执行结束后返回该任务的执行结果, 当出现异常时, 直接重新抛出. 但也可能抛出拒绝异常(拒绝入队).
public <T> T invoke(ForkJoinTask<T> task) {if (task == null)
        throw new NullPointerException();
    externalPush(task);
    return task.join();}


// 安排给定任务的执行, 异步进行.
public void execute(ForkJoinTask<?> task) {if (task == null)
        throw new NullPointerException();
    externalPush(task);
}



// 继承自 AbstractExecutorService 的方法列表
//execute 方法, 传入 runnable, 使用前面文章介绍的 ForkJoinTask.RunnableExecuteAction 适配器.
public void execute(Runnable task) {if (task == null)
        throw new NullPointerException();
    ForkJoinTask<?> job;
    if (task instanceof ForkJoinTask<?>) // avoid re-wrap
        job = (ForkJoinTask<?>) task;
    else
        job = new ForkJoinTask.RunnableExecuteAction(task);
    externalPush(job);
}

//submit 一个 task, 返回它本身.
public <T> ForkJoinTask<T> submit(ForkJoinTask<T> task) {if (task == null)
        throw new NullPointerException();
    externalPush(task);
    return task;
}

// 对 callable 的适配, 前面也提过.
public <T> ForkJoinTask<T> submit(Callable<T> task) {ForkJoinTask<T> job = new ForkJoinTask.AdaptedCallable<T>(task);
    externalPush(job);
    return job;
}

// 对 task 和 result 的适配.
public <T> ForkJoinTask<T> submit(Runnable task, T result) {ForkJoinTask<T> job = new ForkJoinTask.AdaptedRunnable<T>(task, result);
    externalPush(job);
    return job;
}

// 对 submit 一个 Runnable 的适配. 避免重复包装. 因为 ForkJoinTask 也可以实现 runnable.
// 典型的场景, 先 submit 一个 runnable, 得到返回的 job, 再将 job 给 submit 进去.
public ForkJoinTask<?> submit(Runnable task) {if (task == null)
        throw new NullPointerException();
    ForkJoinTask<?> job;
    if (task instanceof ForkJoinTask<?>) // avoid re-wrap
        job = (ForkJoinTask<?>) task;
    else
        job = new ForkJoinTask.AdaptedRunnableAction(task);
    externalPush(job);
    return job;
}

// 执行所有任务. 一样先入队再执行, 可能出现本外部线程又偷回来执行的情况.
public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks) {ArrayList<Future<T>> futures = new ArrayList<>(tasks.size());
    // 标记是否有异常.
    boolean done = false;
    try {for (Callable<T> t : tasks) {ForkJoinTask<T> f = new ForkJoinTask.AdaptedCallable<T>(t);
            // 把所有任务包成适配器并加入 futures 列表.
            futures.add(f);
            // 压入池.
            externalPush(f);
        }
        for (int i = 0, size = futures.size(); i < size; i++)
            // 对每一个任务进行静默等待.
            ((ForkJoinTask<?>)futures.get(i)).quietlyJoin();
        // 上面的循环成功退出, 置 true 返回.
        done = true;
        return futures;
    } finally {if (!done)
            // 发现是异常退出, 则依次取消任务.
            for (int i = 0, size = futures.size(); i < size; i++)
                futures.get(i).cancel(false);
    }
}

截止到此,ForkJoinPool 中的主体难点方法已全部介绍完毕, 下面选看一些周边的有助于理解的简单方法.

 
// 估计当前线程池中正在运行 (偷任务或运行任务) 的线程, 也就是未阻塞等待任务的线程. 它会过度估计正在运行的线程数.
public int getRunningThreadCount() {
    int rc = 0;
    WorkQueue[] ws; WorkQueue w;
    if ((ws = workQueues) != null) {for (int i = 1; i < ws.length; i += 2) {if ((w = ws[i]) != null && w.isApparentlyUnblocked())
                // 只取 ws 奇数索引的 worker, 只要它 isApparentlyUnblocked, 即未进入 waiting,blocking,wating_timed.
                ++rc;
        }
    }
    return rc;
}

// 估计当前正在进行偷取或执行任务的线程(未阻塞等待任务), 此方法也会过度估计.
public int getActiveThreadCount() {
    // 很明显, 根据前面我们研究了好久的逻辑, 每 release/signal 的 worker 都会增加一个活跃数单元,
    // 初始添加的 worker 也会增加一个活跃数单元和总数, 显然只要有 active 的, 那么 r 必然是一个溢出的正数.
    int r = (config & SMASK) + (int)(ctl >> AC_SHIFT);
    return (r <= 0) ? 0 : r; // 忽略负值.
}


// 判断线程池此刻是否已经进入静寂态, 所谓的静寂态是指当前线程池中所有 worker 都已经阻塞在等待任务了,
// 因为没有任何任务可供他们偷取或执行, 也没有任何挂起的提交入池的任务. 此方法相对保守, 并不是所有线程都空闲的情况下
// 立即会返回 true, 只有在他们减少了活跃数之后.(也就是保持空闲一段时间)
public boolean isQuiescent() {
    // 前面分析过, 显然这个表达式不大于 0 即为不溢出的情况, 回忆前面关于 scan 时, 终止时等的降低活跃数.
    return (config & SMASK) + (int)(ctl >> AC_SHIFT) <= 0;
}

与关闭有关的方法.

// 此方法的执行参数, 注意,now 传 false,enable 传 true.
// 它的执行结果很简单(可以参考前面的 tryTerminate).
//1. 此前已经调用过 tryTerminate 并 enable, 或者调过 shutdown, 那会导致一次终结.
//2. 初次调用, 前面提交过的任务继续执行, 但不会接受新的任务(因为 runState 首位置 1 了).
//3.commonPool 不许关.
//4. 已关的, 调用了也没什么效果. 但第二次调用时, 已经在过程中的任务可能受此影响取消.
public void shutdown() {checkPermission();
    tryTerminate(false, true);
}


// 它会尝试立即取消和停止所有的任务, 拒绝后续提交的任务. 如果是 common 池则无效果.
// 如果已经关闭, 再调用无影响. 正在被提交入池或正在执行的任务 (在调用此方法执行时) 可能会被取消, 也可能不会(取决于时机, 可能早于取消过程而完成执行).
// 它会取消掉已存在的任务或未执行的任务. 方法总会返回一个空的 list.(与其他 executor 不同)
public List<Runnable> shutdownNow() {checkPermission();
    tryTerminate(true, true);
    return Collections.emptyList();}

// 很好理解,runState 的 31 位是 1, 而仅有在 shutdown 方法中所有 worker 都已闲置或 ws 为空才会加上此位. 显然此时所有任务都已完成.
public boolean isTerminated() {return (runState & TERMINATED) != 0;
}

// 正在关闭中, 一定有 STOP 标记位, 没有 TERMINATED 位.

public boolean isTerminating() {
    int rs = runState;
    return (rs & STOP) != 0 && (rs & TERMINATED) == 0;
}

// 已 SHUTDOWN, 首位标记, 显然只要 shutdown 方法调用并传 enable 为 true 一定会有此结果.
public boolean isShutdown() {return (runState & SHUTDOWN) != 0;
}


// 等待一个 shutdown 请求后所有的任务完成或者发生超时, 或者当前线程被扰动(第一优先级).
// 因为 common 池永远不会随程序调用 shutdown 而终止, 因此使用 commonPool 调用此方法时,
// 会直接等效于 awaitQuiescence, 而且永远会返回 false.
// 返回 true, 代表当前线程池终止了,false 代表超时了.
public boolean awaitTermination(long timeout, TimeUnit unit)
    throws InterruptedException {if (Thread.interrupted())
        //1. 当前线程中断, 抛出异常.
        throw new InterruptedException();
    if (this == common) {
        //2.common 池等效于 awaitQuiescence 并返回 false.
        awaitQuiescence(timeout, unit);
        return false;
    }
    long nanos = unit.toNanos(timeout);
    if (isTerminated())
        //3. 发现所有任务已完成返回 true.
        return true;
    if (nanos <= 0L)
        //4. 已超时返回 false.
        return false;
    //5. 计算 deadline 并进入循环等待逻辑.
    long deadline = System.nanoTime() + nanos;
    synchronized (this) {for (;;) {if (isTerminated())
                //5.1 循环中发现已达到完成态, 返回 true.
                return true;
            if (nanos <= 0L)
                //5.2 循环时发现超时,false.
                return false;
            //5.3 循环时减少时间并等待.
            long millis = TimeUnit.NANOSECONDS.toMillis(nanos);
            wait(millis > 0L ? millis : 1L);
            nanos = deadline - System.nanoTime();}
    }
}


// 等待静寂. 如果当前线程是池内线程, 等效于 ForkJoinTask::helpQuiesce 方法, 否则只是等待.
public boolean awaitQuiescence(long timeout, TimeUnit unit) {long nanos = unit.toNanos(timeout);
    ForkJoinWorkerThread wt;
    Thread thread = Thread.currentThread();
    if ((thread instanceof ForkJoinWorkerThread) &&
        (wt = (ForkJoinWorkerThread)thread).pool == this) {
        //1. 线程是 ForkJoinWorkerThread, 帮助静寂. 返回 true.
        helpQuiescePool(wt.workQueue);
        return true;
    }
    //2. 不是池内线程, 准备计时, 初始化若干变量.
    long startTime = System.nanoTime();
    WorkQueue[] ws;
    int r = 0, m;
    boolean found = true;// 代表发现任务.
    //3. 循环等待静寂或超时.
    while (!isQuiescent() && (ws = workQueues) != null &&
           (m = ws.length - 1) >= 0) {
        //3.1 有趣的地方, 只有本轮没找到任务才会进行超时判断.
        if (!found) {
            //3.1.1 判断超时了, 返回 false.
            if ((System.nanoTime() - startTime) > nanos)
                return false;
            //3.1.2 没超时, 放弃执行权一段时间, 不能阻塞在此.
            Thread.yield(); // cannot block}
        // 改为 false.
        found = false;
        //4. 内循环从数组中间开始, 一直递减到 0.
        for (int j = (m + 1) << 2; j >= 0; --j) {
            ForkJoinTask<?> t; WorkQueue q; int b, k;
            //4.1 取队列从 0 开始, 只要取出了 ws 的非空队列成员, 进入逻辑.
            if ((k = r++ & m) <= m && k >= 0 && (q = ws[k]) != null &&
                (b = q.base) - q.top < 0) {
                //4.2 found 标记 true
                found = true;
                if ((t = q.pollAt(b)) != null)
                    //4.3 尝试从底部取出任务并执行. 相当于帮助静寂
                    t.doExec();
                // 进入 4.1, 即 break 掉内循环, 可能凑巧, 执行完一个任务就静寂了.
                break;
            }
        }
    }
    // 能从 while 循环 break 出来或者循环条件为假退出, 说明达到静寂.
    return true;
}

上面的代码本身没有什么问题, 但是已经涉及到了外部 api,ForkJoinTask::helpQuiesce, 此 api 是由我们决定调用时机的, 显然, 我们可以在任何一个入池的 ForkJoinTask 中执行此方法来帮助 ForkJoinPool 进入静寂态, 帮助执行所有待执行的任务, 参考 helpQuiescePool 方法(会先执行本地任务, 再偷其他队的任务执行).

到此, 源码只剩下一个 blocker 了.

//MangedBlocker 接口. 它是一个为运行在 ForkJoinPool 中的任务维护并行度的接口
// 我们可以通过拓展它来实现在 ForkJoinPool 中运行的任务的并行度管理. 它只有两个方法.
//isReleasable 方法会在没有必要阻塞时一定返回 true,block 方法会在必要时阻塞当前线程,
// 它内部可以调用 isReleasable. 而这个调度需要使用 ForkJoinPool#managedBlock(ManagedBlocker)
// 它会尝试去调度, 避免长期的阻塞, 它允许更灵活的内部处理.
public static interface ManagedBlocker {
    // 可能会阻塞一个线程, 比如等待监视器, 当返回 true 时表示认为当前没有必要继续 block.
    boolean block() throws InterruptedException;

    // 返回 true 表示认为没有必要 block.
    boolean isReleasable();}


// 运行给定的阻塞任务, 当在 ForkJoinPool 运行 ForkJoinTask 时, 此方法在当前线程阻塞的情况下(调用 blocker.block),
// 认为需要保持必要并行度时安排一个备用线程, 方法内重复调用 blocker.isReleasable 和 blocker.block. 且前者必在后者前,
// 它返回 false 时才会有后者. 如果没运行在 ForkJoinPool 内, 那么方法的行为等效于下面这段代码:
//while(!blocker.isReleasable()){if(blocker.block() break;}
// 参数 blocker 是上面的接口的实现类, 在前面的文章 CompletableFuture 和响应式编程中曾见到一个实现类.
public static void managedBlock(ManagedBlocker blocker)
    throws InterruptedException {
    ForkJoinPool p;
    ForkJoinWorkerThread wt;
    Thread t = Thread.currentThread();
    //1. 当前是 ForkJoinPool 池内线程时的逻辑.
    if ((t instanceof ForkJoinWorkerThread) &&
        (p = (wt = (ForkJoinWorkerThread)t).pool) != null) {
        WorkQueue w = wt.workQueue;
        //1.1 取出工作队列, 进行循环,blocker.isReleasable 判断当前并非没有必要加锁时进入.
        while (!blocker.isReleasable()) {
            //1.2 要加锁, 尝试补偿, 它会在此时唤醒一个空闲的线程或创建一个新的线程来补偿当前线程的阻塞.
            if (p.tryCompensate(w)) {
                try {
                    //1.3 当前线程阻塞等待.
                    do {} while (!blocker.isReleasable() &&
                                 !blocker.block());
                } finally {U.getAndAddLong(p, CTL, AC_UNIT);
                }
                break;
            }
        }
    }
    //2. 非池内线程的逻辑. 同上面的阻塞逻辑.
    else {do {} while (!blocker.isReleasable() &&
                     !blocker.block());
    }
}

关于 blocker 我们并不陌生, 在 CompletableFuture 和响应式编程一文中, 我们提到了 CompletableFuture 中内部实现了一个 blocker, 并使用 ForkJoinPool 的 managedBlock 方法管理. 还记得这方面的实现吗?

CompletableFuture 内部维护了一个类似栈的结构, 用内部类 Completion 和它的子类们实现, 而 Completion 本身是 ForkJoinTask 的子类.

同样, 使用 CompletableFuture 时, 我们可以在入口方法入包含 runAsyc 之类的方法, 该方法默认会提供一个线程池, 而此线程池会由可用核数来决定, 会选定一个 ForkJoinPool 或一个 low 逼的一任务一线程的线程池.

如果选择了 ForkJoinPool, 显然能及时补偿一个工作线程的阻塞是非常有必要的, 这也是提升性能之举.

到此为止,ForkJoinPool 的源码分析完毕.

后语

这篇文章是 ForkJoin 框架系列的最后一篇,前面分析了 ForkJoinPool 的代码,它是 ForkJoin 框架的核心,代码较为复杂,作者个人觉得它也是所有线程池中最复杂的一个,下面我们来总结一下 ForkJoinPool 和整个 ForkJoin 框架。

ForkJoinTask 是运行在 ForkJoinPool 的 task,它定义了任务自身的入口 api,维护了任务的 status 字段和 result,结合 ForkJoinPool 来实现调度。ForkJoinTask 一定会运行在一个 ForkJoinPool 中,如果没有显式地交它提交到 ForkJoinPool,会使用一个 common 池(全进程共享)来执行任务。

ForkJoinTask 支持 fork 和 join,fork 就是将当前 task 入池,join 就是等待此 task 的结束并获取结果。

CountedCompleter 是一个另类的 ForkJoinTask,它在 ForkJoinTask 基础上维护了一个栈链,其实在某些视角上即像栈,又像一个不保存子节点的树。同时它也不保存运行结果,使用它去 getRawResult 只能得到 null,但是任务的 status 会进行维护(委托给父类 ForkJoinTask)。并行流是基于它来实现的,调度交由 CountedCompleter 完成,而原集的分割,结果的合并则由并行流的逻辑实现。

ForkJoinWorkerThread 是运行在 ForkJoinPool 中的线程,它内部会维护一个存放 ForkJoinTask 的 WorkQueue 队列,而 WorkQueue 是 ForkJoinPool 的内部类。

ForkJoinPool 是框架的核心,不同于其他线程池,它的构建不需要提供核心线程数,最大线程数,阻塞队列等,还增加了未捕获异常处理器,而该处理器会交给工作线程,由该线程处理,这样的好处在于当一个线程的工作队列上的某个任务出现异常时,不至于结束掉线程,而是让它继续运行队列上的其他任务。它会依托于并行度(或默认根据核数计算)来决定最大线程数,它内部维护了 WorkQueue 数组 ws 取代了阻塞队列,ws 中下标为奇数的为工作线程的所属队列,偶数的为共享队列,虽然名称有所区分,但重要的区别只有一点:共享队列不存在工作线程。

关于工作窃取,线程池外的提交者在 join 一个任务或 get 结果时,如果发现没有完成,它不会干等着工作线程,而是尝试自行执行,当执行方法结束,任务还没有完成的情况,它可以帮助工作线程做一些其他工作,比如当任务是 CountedCompleter 类型时,帮助完成位于栈链前方的子任务,而这个子任务先从当前 worker 队列的 top 找,后从其他队列的 base 找;线程池中的工作线程会在任务入队时被尝试唤醒,会循环执行,每轮循环都会先尝试随机 scan 到一个任务(该任务可能属于其他线程),执行它,再执行本地任务,如此往复,scan 的过程可以理解为一种窃取,当不能窃取时则会 inactive;工作窃取时从队列的 base 开始,工作压入时从 pop 进入,执行自己队列的任务时,依托于 FIFO 还是 LIFO 的模式。此外,反向帮助小偷(helpStealer)也是一个“反弹式”的工作窃取,它与 helpComplete 一并属于工作窃取的一部分。

ForkJoinPool 维护了一个 ctl 控制信号,前 16 位表示活跃 worker 数,33 至 48 位表示 worker 总数,后 32 位可以粗略理解用于表示 worker 等待队列的栈顶。ForkJoinPool 利用这个 ctl,WorkQueue 的 scanState 和 stackPred 以及 ws 的索引算法维护了一个类似队列(或者叫栈更贴切一些)的数据结构。每当有一个线程偷不到任务,就会存放此前的 ctl 后置标记位到 pred,并将自己的索引交给 ctl 作为栈顶。相应的唤醒操作则由栈顶起。相应的方法在进行尝试添加 worker 时,会综合当前是否有阻塞等待任务的线程。

当所有线程都不能窃取到新的任务,进入等待队列时,称之为“静寂态”。

ForkJoinPool 对全局全状的修改需要加锁进行,这些操作如修改 ctl(改变栈顶,增删活跃数或总数等),处理 ws 中的元素,扩容 ws,关闭线程池,初始化(包含 ws 的初始化),注册线程入池等。而这个锁就是 runState,它除了当锁,也间接表示了运行状态,相应的线程池的 SHUTDOWN,STOP,TERMINATED 等状态均与其相应的位有关。

线程池的并行度保存在 config 字段的后 16 位,config 的第 17 位决定了是 FIFO 还是 LIFO。而这个并行度也通过间接地取反并计入到 ctl 的前 32 位,线程池中判断是否当前有活跃的线程,或者是否已进入寂静态,都是用保存在 config 的并行度和保存在 ctl 前 32 位的活跃数与并行度的运算结果进行相加,判断是否会溢出(正数)来决定的。

ForkJoinPool 还提供了补偿机制,用于在线程将要阻塞在执行过程中前释放掉一个正在空闲的工作线程或创建一个新的工作线程,从而保证了并行度。第一篇文章中提到的 CompletableFuture 就是将 Completion 栈(ForkJoinTask)交给 ForkJoinPool(取决于并行度)去执行并用它来进行调度。

ForkJoinPool 的关闭则可能有多种场景:当一个 worker 被解除注册时,尝试一次,并不强关,也不指定 enable,只有在线程池已经收到关闭信号并处在过程中时,它才会帮助关闭;工作线程因为 scan 不到 work 而不得不进行 await,当它发现当前线程池已处于静寂态,也尝试同上的关闭线程池,同样不强行关闭也不指定 enable,只有线程池已经收到关闭信号并处在过程中时,它才会帮助关闭;线程池外提交任务时,发现线程池已收到关闭信号,尝试帮助关闭;手动传入关闭信号,即调用 shutdown 时,会指定非 now,enable,则线程池将收到关闭信号,记录该信号,并进行关闭流程,当下一次再有前述三种情况调用时,必然可以进入到关闭流程;即刻关闭,shutdownNow,它会要求即刻进入关闭,不会进入非 now 的情况下的 release 以及等待静寂等操作。

完。

正文完
 0