JUC-之Phaser

20次阅读

共计 29320 个字符,预计需要花费 74 分钟才能阅读完成。

前言

在前面的几篇文章中详述了 ForkJoin 框架的若干组分, 在相应的官方文档中总会不时地提起 ”Phaser”, 同样的, 也提到 Phaser 可以用于帮助运行在 ForkJoinPool 中的 ForkJoinTask 运行时保持有效的执行并行度(其实特指其他 task 都在等待一个 phase 的前进时).

熟悉 JUC 的朋友都知道它的大概组成部分包含:Containers(支持并发的容器),Synchronizers(同步器),Executors(线程池),BlockingQueue(阻塞队列),Atomic(原子类),Lock and Condition(锁). 而 Phaser 和 CyclicBarrier,Semaphore 等一样是一个同步器.

本文主要介绍 Phaser 的内部实现, 粗略介绍使用, 它的源码相比于线程池较为简单, 但最好能对比其他同步器来了解, 读者最好拥有 juc 其他同步器, 原子类, 部分 ForkJoin 框架的基础.

同时, 本文也会再次提到 ForkJoinPool::managedBlock(blocker), 之前在 ForkJoinPool 一文提到了实现和接口, 而在 CompletableFuture 中见到了一个 blocker 的实现.

Phaser 源码

首先来看一些与 Phaser 状态有关的简单的常量.

//64 位整数表示 Phaser 的状态.
private volatile long state;

private static final int  MAX_PARTIES     = 0xffff;// 最大 parties, 后 16 位表示.
private static final int  MAX_PHASE       = Integer.MAX_VALUE;// 最大 phase, 最大整数值.
private static final int  PARTIES_SHIFT   = 16;// 取 parties 使用的移位数,16
private static final int  PHASE_SHIFT     = 32;// 取 phase 的移位数,32
private static final int  UNARRIVED_MASK  = 0xffff;      // 未到的, 取后 16 位.
private static final long PARTIES_MASK    = 0xffff0000L; // 参加者,17-32 位.
private static final long COUNTS_MASK     = 0xffffffffL; // 数量, 后 32 位.
private static final long TERMINATION_BIT = 1L << 63;// 终止态, 首位.

// 特殊值.
private static final int  ONE_ARRIVAL     = 1;
private static final int  ONE_PARTY       = 1 << PARTIES_SHIFT;
private static final int  ONE_DEREGISTER  = ONE_ARRIVAL|ONE_PARTY;// 第 1 位和 17 位. 显然, 它表示了一个 ONE_ARRIVAL 信息和 PARTY 信息.
private static final int  EMPTY           = 1;

// 对一个 state s 计算 unarrived 的 count,
private static int unarrivedOf(long s) {// 直接取整数位, 如果等于 EMPTY(1)则返回 0, 否则取后 16 位.
    int counts = (int)s;
    return (counts == EMPTY) ? 0 : (counts & UNARRIVED_MASK);
}

// 对一个 state, 取出 parties 信息, 直接取 state 的 17 至 32 位.
private static int partiesOf(long s) {return (int)s >>> PARTIES_SHIFT;
}
// 对于一个 state, 取出 phase 信息, 直接取前 32 位.
private static int phaseOf(long s) {return (int)(s >>> PHASE_SHIFT);
}
// 对于一个 state, 取出 arrived 信息
private static int arrivedOf(long s) {int counts = (int)s;
    //state 的后 32 位等于 1(EMPTY)返回 0, 否则返回 parties(state 的 17 至 32 位, 参考上面的 partiesOf 方法)和 UNARRIVED(state 的后 16 位)的差.
    return (counts == EMPTY) ? 0 :
        (counts >>> PARTIES_SHIFT) - (counts & UNARRIVED_MASK);
}

上面都是一些常量, 没什么可分析的, 简单来个总结.

Phaser 用一个 long 型的 state 保存状态信息.

state 的前 32 位表示 phase, 后 16 位表示 unarrivied,17 至 32 位表示 parties,parties 减去 unarrived 即 arrived.

下面我们看一些成员变量和有关函数.

//this 的父, 可以是 null 表示 none
private final Phaser parent;

//phaser 显然是个树的结果,root 代表根, 如果当前 phaser 不在树内, 则 root==this
private final Phaser root;


// 偶数队列和奇数队列. 它们存放等待线程栈的头, 为了减少当添加线程与释放线程的竞态,
// 这里使用了两个队列并互相切换, 子 phaser 共享 root 的队列以加快释放.
private final AtomicReference<QNode> evenQ;
private final AtomicReference<QNode> oddQ;

// 决定某个 phase 的等待线程队列.
private AtomicReference<QNode> queueFor(int phase) {
    // 选择队列的方法, 如果参数 phase 是偶数, 使用 evenQ, 否则 oddQ.
    return ((phase & 1) == 0) ? evenQ : oddQ;
}

// 出现 arrive 事件时的边界异常信息.
private String badArrive(long s) {
    return "Attempted arrival of unregistered party for" +
        stateToString(s);
}

// 注册时的边界异常信息.
private String badRegister(long s) {
    return "Attempt to register more than" +
        MAX_PARTIES + "parties for" + stateToString(s);
}
// 他们都用到的 stateToString(s), 计算参数 s 对应的 phase,parties,arrived.
private String stateToString(long s) {return super.toString() +
        "[phase =" + phaseOf(s) +
        "parties =" + partiesOf(s) +
        "arrived =" + arrivedOf(s) + "]";
}

为了便于理解, 先来看队列的实现.

// 表示等待队列的 QNode, 实现了 ManagedBlocker
static final class QNode implements ForkJoinPool.ManagedBlocker {
    // 存放所属 phaser
    final Phaser phaser;
    // 所属 phase
    final int phase;
    // 是否可扰动
    final boolean interruptible;
    // 是否定时
    final boolean timed;
    // 是否已扰动
    boolean wasInterrupted;
    // 计时相关
    long nanos;
    final long deadline;
    // 关联线程, 当是 null 时, 取消等待.
    volatile Thread thread; 
    // 下一个 QNode
    QNode next;

    QNode(Phaser phaser, int phase, boolean interruptible,
          boolean timed, long nanos) {
        this.phaser = phaser;
        this.phase = phase;
        this.interruptible = interruptible;
        this.nanos = nanos;
        this.timed = timed;
        this.deadline = timed ? System.nanoTime() + nanos : 0L;
        // 取当前线程.
        thread = Thread.currentThread();}
    //isReleasable 方法
    public boolean isReleasable() {if (thread == null)
            //1. 线程已置空(如 2), 返回 true 释放.
            return true;
        if (phaser.getPhase() != phase) {
            //2. 发现 phaser 所处的 phase 不是构建 QNode 时的 phase 了, 就置线程为空, 返回 true.
            thread = null;
            return true;
        }
        if (Thread.interrupted())
            //3. 如果当前线程扰动了.
            wasInterrupted = true;
        if (wasInterrupted && interruptible) {
            //4. 发现扰动标记, 并且 QNode 配置为可扰动, 则置线程 null 并返回 true
            thread = null;
            return true;
        }
        if (timed) {
            //5. 定时逻辑, 还有 nanos, 计算新的时长.
            if (nanos > 0L) {nanos = deadline - System.nanoTime();
            }
            if (nanos <= 0L) {
                // 已经到时间, 返回 true, 线程置空.
                thread = null;
                return true;
            }
        }
        return false;
    }
    //block 逻辑
    public boolean block() {if (isReleasable())
            return true;
        else if (!timed)
            // 不定时的 park
            LockSupport.park(this);
        else if (nanos > 0L)
            // 定时的情况.
            LockSupport.parkNanos(this, nanos);
        // 老规矩
        return isReleasable();}
}

前面介绍过 CompletableFuture 的 Singnaller, 以及 ForkJoinPool 中的 managedBlock, 这一块的逻辑显然驾轻就熟.

很明显, 如果我们在 ForkJoinPool 中使用它作为 blocker, 并在相应的 ForkJoinTask 的 exec 或 CountedCompleter 的 compute 方法中使用 ForkJoinPool::managedBlock(blocker), 将每个 ForkJoinWorkerThread 在阻塞前构建一个 QNode 进入 Phaser 的等待队列(虽然还没有讲到相关内容, 但是 Phaser 显然不用我们直接操作内部类 QNode), 那么它将依照上述逻辑进行补偿, 保障有效的并行度.

前面完成了承前启后, 预热到此结束, 开始看 Phaser 的核心方法.

//doArrive 方法
// 它是 arrive 和 arriveAndDeregister 方法的主要实现. 手动调用这些方法可以加速通过和最小化竞态窗口期.
// 参数代表要从当前 state 中减去的调整数值, 它的单位依托于业务, 当为 arrive 时减去的单位为 ONE_ARRIVAL,
// 当为 arriveAndDeregister 时减去的单位为 ONE_DEREGISTER.
private int doArrive(int adjust) {
    final Phaser root = this.root;
    for (;;) {
        //1. 变量 s 初始化, 取决于是否当前 Phaser 是 root. 不是 root 将试图从 root 同步滞后的 state.
        long s = (root == this) ? state : reconcileState();
        // 计算 phase, 前 32 位.
        int phase = (int)(s >>> PHASE_SHIFT);
        if (phase < 0)
            //2. 负数直接返回. 说明原来的 state 首位就是 1, 前面的 TERMINATE_BIT 就是 64 位置 1.
            return phase;
        // 取 count, 后 32 位.
        int counts = (int)s;
        // 计算 unarrived, 和前面一样的逻辑.
        int unarrived = (counts == EMPTY) ? 0 : (counts & UNARRIVED_MASK);
        if (unarrived <= 0)//2.1
            // 没有 unarrived 了, 说明不应该调用此方法, 抛出异常, 信息就是前面介绍过的 badArrive
            throw new IllegalStateException(badArrive(s));
        //3. 尝试将 state 减去 adjust 数.
        if (UNSAFE.compareAndSwapLong(this, stateOffset, s, s-=adjust)) {
            //3.1cas 成功后,unarrived 余 1, 则前进一个 phase
            if (unarrived == 1) {
                //3.1.1 取出 parties 作为下一个 state 的基础.
                long n = s & PARTIES_MASK;
                //3.1.2 下一个 unarrived, 数值上等于 parties.
                int nextUnarrived = (int)n >>> PARTIES_SHIFT;
                if (root == this) {
                    //3.1.3 当前 Phaser 是 root,onAdvance 返回 true, 则加上终止信号.
                    if (onAdvance(phase, nextUnarrived))
                        n |= TERMINATION_BIT;
                    else if (nextUnarrived == 0)
                        //3.1.4 onAdvance 返回 false, 而计算得出的 nextUnarrived 是 0, 即没有 parties,n 加上一个 empty(1)
                        n |= EMPTY;
                    else
                        //3.1.5nextUnArrived 不是 0, 加到 n 上.
                        n |= nextUnarrived;
                    //3.1.6 前面的流程完成了 state 的后 32 位(parties 和 unarrived), 接下来处理前 32 位.
                    // 限定在 MAX_PHASE 之内, 对当前 phase 加 1.
                    int nextPhase = (phase + 1) & MAX_PHASE;
                    // 将 nextPhase 的值加到 n 的前 32 位. 并用 n 去 cas 掉原来的 state, 因为有 3 处入口的 cas, 此处一定能成功
                    n |= (long)nextPhase << PHASE_SHIFT;
                    UNSAFE.compareAndSwapLong(this, stateOffset, s, n);
                    // 更新到新的 phase, 唤醒等待的 waiter.
                    releaseWaiters(phase);
                }
                //3.1.7 当前 Phaser 不是 root, 当 nextUnarrived 计算得 0 时, 像父传递解除注册, 参数 ONE_DEREGISTER
                // 会同时减去一个 unarrived 和一个 parties. 下轮循环正常应进入 3.1.8
                else if (nextUnarrived == 0) {phase = parent.doArrive(ONE_DEREGISTER);
                    // 完成传递后, 将自己的 state 置 empty.
                    UNSAFE.compareAndSwapLong(this, stateOffset,
                                              s, s | EMPTY);
                }
                else
                    //3.1.8, 当前 Phaser 不是 root, 计算的 nextUnarrived 非 0, 像父传递一个 arrive 事件, 减去一个 unarrived.
                    phase = parent.doArrive(ONE_ARRIVAL);
            }
            //3.2 返回当前 phase, 可能是已进入 3.1 递增的. 仅有此处可退出循环.
            return phase;
        }
    }
}

关于该方法的执行流程, 我们结合几个周边方法一并分析, 先来看注册方法和 onAdvance 勾子.

// 注册和批量注册. 参数代表 parties 和 unarrived 字段的增加数, 它必须大于 0.
private int doRegister(int registrations) {
    // 1. 用参数计算一个 adjust, 同时包含 parties 和 arrive.
    long adjust = ((long)registrations << PARTIES_SHIFT) | registrations;
    final Phaser parent = this.parent;
    int phase;
    // 循环尝试更改.
    for (;;) {
        //2. 存在 parent, 则用 root 的 phase 调整 this 的 state.
        long s = (parent == null) ? state : reconcileState();
        // 取出当前 state 中保存的 counts,parties,unarrived 信息.
        int counts = (int)s;
        int parties = counts >>> PARTIES_SHIFT;
        int unarrived = counts & UNARRIVED_MASK;
        if (registrations > MAX_PARTIES - parties)
            // 要注册的数量大于了余量, 抛出异常.
            throw new IllegalStateException(badRegister(s));
        //3. 计算出 phase
        phase = (int)(s >>> PHASE_SHIFT);
        if (phase < 0)
            //phase 为负说明 state 为负, 即终止态, 终止.
            break;
        //4. 当前 state 表示的参与数非空的逻辑, 当前注册非首次注册.
        if (counts != EMPTY) {if (parent == null || reconcileState() == s) {
                //this 是 root 或者从 root 同步的 state 不变, 继续执行, 否则重新循环.
                if (unarrived == 0)  
                    //4.1 本轮循环通过原 state 计算的 unarrived 为 0, 说明应等待下一 phase, 使用 root 等待      
                    root.internalAwaitAdvance(phase, null);
                else if (UNSAFE.compareAndSwapLong(this, stateOffset,
                                                   s, s + adjust))
                    //4.2 本轮循环未发现应等待下一 phase, 尝试原子更新, 增加 adjust 到 state 上.
                    break;
            }
        }
        //5. 当前不存在 counts, 且自身就是 root, 代表 root 的首次注册.
        else if (parent == null) { 
            //5.1 计算下一个 state, 因为没有参与数, 使用 phase 初始化前 32 位, 并使用 adjust 做后 32 位.        
            long next = ((long)phase << PHASE_SHIFT) | adjust;
            if (UNSAFE.compareAndSwapLong(this, stateOffset, s, next))
                //5.2 cas 成功, 退出, 不成功, 下轮循环.
                break;
        }
        //6. 是首次注册, 但也不是 root 的逻辑. 代表非 root 的 Phaser 的首次注册.
        else {
            //6.1 对当前 Phaser 加锁并 double check, 避免同时调用. 加锁失败的线程将在后续进入 2 的逻辑.
            synchronized (this) {  
                //double check state 未发生改变.   
                if (state == s) {
                    //6.2 首先向父 Phaser 注册 1.
                    phase = parent.doRegister(1);
                    if (phase < 0)
                        // 发现进入终止态, 直接停止.
                        break;
                    
                    //6.3 向父 Phaser 注册成功, 循环尝试 cas 掉老的 state, 新 state 的算法同上,phase 加 adjust.
                    // 在整个 while 循环中, 不再考虑 phase 进入终止态的情况, 因为这些操作处于同一个 "事务" 中,
                    // 且因竞态等原因, 若某次 cas 时计入了负数的 phase, 方法返回后也可以及时发现.
                    while (!UNSAFE.compareAndSwapLong
                           (this, stateOffset, s,
                            ((long)phase << PHASE_SHIFT) | adjust)) {
                        // 如果 cas 不成功, 则读取 s 为新的 state, 计算新的 phase 并重新循环.
                        s = state;
                        phase = (int)(root.state >>> PHASE_SHIFT);
                        // assert (int)s == EMPTY;
                    }
                    //6.4cas 成功后退出循环.
                    break;
                }
                // 如果 if(state==s)判断失败, 说明有别的线程有当前线程进入 synchronized 块前已经加锁并执行了内部的逻辑且稍后释放了锁,
                // 这样当前线程加锁成功, 但 if 判断失败, 它会立即释放锁并返回到 2.
            }
        }

    }
    return phase;
}


// 使用 root 的 phase 调整 this 的 state, 更新滞后的结果. 这一般发生在 root 前进了 phase 但是
// 子 phaser 还没有做到这一步, 这种情况下, 子 phaser 必须完成这个前进的步骤, 这一过程中,phase 将
// 被置为 root 的 phase,unarrived 则会重置为 parties, 若 parties 为 0, 则置为 EMPTY. 返回结果 state.
private long reconcileState() {
    final Phaser root = this.root;
    long s = state;
    // 不是 root 才进行下一步.
    if (root != this) {
        int phase, p;
        //cas,phase 采用 root,parties 不变,unarrived 重置为 parties 或 EMPTY.
        while ((phase = (int)(root.state >>> PHASE_SHIFT)) !=
               (int)(s >>> PHASE_SHIFT) &&
                //phase 滞后于 root
                // 尝试 cas.
               !UNSAFE.compareAndSwapLong
               (this, stateOffset, s,
                // 确定新 state 的前 32 位, 使用 root 的 phase.
                s = (((long)phase << PHASE_SHIFT) |
                    // 新 phase<0, 后 32 位直接取 this 的 state 表示的 counts.
                     ((phase < 0) ? (s & COUNTS_MASK) :
                    //phase 有效,this 的 state 表示的 parties 为 0, 则后 32 位使用 empty
                      (((p = (int)s >>> PARTIES_SHIFT) == 0) ? EMPTY :
                        // 否则, 后 32 位使用 parties.
                       ((s & PARTIES_MASK) | p))))))
            s = state;
    }
    return s;
}


//onAdvance 勾子方法, 参数为当前 phase 和注册的 parties 数.
// 默认实现为 parties 数为 0, 方法返回 true 时, 调用者会尝试终止 Phaser.(参考前面的 doArrive). 随后调用 isTerminated 方法将返回 true.
// 执行此方法时抛出的运行时异常或 Error 将直接上抛给尝试 advance 相应的 phase 的线程, 这种情况下不会发生 phase 的 advance.
// 方法的入参表示的是 Phaser 当前的 state(未 advance 前), 因此若在 onAdvance 方法中执行 arrive,regist,waiting 这三种操作的行为是不确定的也不可靠的.
// 如果当前 Phaser 是一个级联的成员, 那么 onAdvance 只会由 root 在每次 advance 时调用.
// 方法的默认实现返回 true 的场景目前只能是经过数次 arriveAndDeregister 调用造成 parties 归零的结果. 我们继承 Phaser 可以轻易地重写此行为,
// 比如简单粗暴地返回 false, 那么将永远允许新的注册.
protected boolean onAdvance(int phase, int registeredParties) {return registeredParties == 0;}

经过前面的代码分析, 已经对 Phaser 的核心函数 doRegister,doArrive 有了全面的了解.

两者都会在一开始同步 root 的 phase, 且如果出现落后 root 的情况, 同步了新的 phase 的同时, 也会重新初始化 unarrived, 并且使用 parties 的值.

doArrive 方法会每次调整 unarrived 数量(也可包含 parties 数量, 如果使用了解除注册), 当 Phaser 调用自身的 arrive/arriveAndDeregister 时, 会做出相应的减少, 并根据是否为 root 而决定向上递归.

Phaser 减少自身 unarrived 信号 (也可能同时有 parties 信号) 后, 若发现这已经是最后一个 unarrived 信号, 则进行接下来的判断:

1. 当前 Phaser 是 root,advance 并唤醒 waiter.(重要的唤醒操作执行点,root 一轮完成)

2. 当前 Phaser 不是 root, 且它已经不具备继续下一轮的条件(计算 nextUnarrived 为 0, 即 parties 已经被 arriveAndDeregister 置 0), 则从父 Phaser 减少一个 unarrived 和 parties.

3. 当前 Phaser 不是 root, 但它仍具有 parties, 满足进行下一轮的条件(计算 nextUnarrived 不是 0), 则从父 Phaser 减少一个 unarrived, 但不减少 parties.

显然, 子 Phaser 的最后一个 unarrived 的消失一定会造成父的 unarrived 减少, 子 Phaser 不能继续下一 phase 的 register 和 arrive 时, 从父 Phaser 中卸载.

若不是本 Phaser 的最后一个 unarrived 信号, 则直接结束, 相当于只进行了上面的减少信号操作.

doRegister 方法的逻辑大致相反, 不同于 doArrive, 它的参数 registrations 同时作用于 parties 和 unarrived, 即两个位上同时加上 registrations 参数. 它的大致逻辑:

1. 当前注册并非首次注册, 且出现 unarrived==0, 即本轮已经完成了 arrive, 那么本轮将不能注册, 需要等待 root 更新到下轮.(这也是我们碰到的第一个阻塞)

2. 当前注册并非首次注册,unarrived 也不是 0, 则在本 phase 进行注册, 增加相应的 parties 和 unarrived.

3. 当前注册是 root 的首次注册, 给 root 的 state 加上相应的 parties 和 unarrived.

4. 当前注册是非 root 的首次注册, 加锁(this), 对自己的 state 加上相应的 parties 和 unarrived(同上, 以 registrations 为单位), 而对 parent 加上一个 parties 和 unarrived 单位.

很明显, 对于单 Phaser 的情况非常好理解, 每次减少 unarrived 数量(先不考虑减少 parties), 则最终导致 Phaser 自身进入下一个 phase, 然后重新初始化 unarrived 到下一轮,unarrived 的新值是前一轮剩下的 parties 数量.

当我们同时也尝试减少 parties 数量, 即解除 parties 的注册, 最终导致没有 parties, 那么 Phaser 将进入终止态.

整个过程中, 只要 Phaser 没进入终止态, 随时可以进行新的注册, 并增加 parties 和 unarrived 的数量. 每个 arrive 可以减少 unarrived 的数量为任何正整数, 不一定是 1.

对于多 Phaser 的情况, 有两个特殊点:

1. 对任意 Phaser 树中的某一个 Phaser 调用注册操作, 会令自身加上相应参数个 parties 和 unarrived 单位, 仅会在该 Phaser 第一次注册时增加父 Phaser(极端可能, 仅从一个叶子节点第一个注册的情况下可一直递归到 root)的 parties 数和 unarrived 数各 1 单位(不论参数是多少).

2. 对任意 Phaser 树中的某一个 Phaser 调用 arrive 操作, 会令自身减去相应的参数个 parties 和 unarrived 单位, 同时仅当本 Phaser 此时是最后一个 unarrived 时, 会减去父 Phaser 的一个 unarrived 单位(当前子 Phaser 仍旧有 parties 可以构建下一 phase), 或减去父 Phaser 一个 Parties 和 unarrived 单位.(极端情况下, 每一级都是最后一个 unarrived 时, 减少叶子节点的最后一个 unarrived 会递归到 root).

每新增一个子 Phaser, 父 Phaser 就会增加一个要完成触发 phase 的 advance 前必须要等到 arrive 的单位; 每一个子 Phaser 中所有的 arrive 完成, 父 Phaser 都将减少一个要等待 advance 所必需触发的 arrive.

目前没有看到 await 方法, 但可以提前说明, 等待操作完全依赖于 root 是否完成本轮. 也就是所有子 Phaser 都完成了同一轮 (arrive 打满), 才能让父 Phaser 本身减去一个所有 arrive 单位, 再触发父 Phaser 本轮的完成, 此时对任何已完成的 Phaser 进入注册, 都会进入上述的 root.internalAwaitAdvance(phase, null) 方法等待 root 进入下一 phase. 如果对已经完成所有 arrive 的 Phaser 继续进行 arrive 操作, 因为 unarrived 已经是 0, 则会抛出异常.

所以对于使用子 Phaser 的场景, 如果发生很巧妙的情况,Phaser 树上当前子 Phaser 的 arrive 结束条件满足了, 使得新来的注册只能等待下一轮次, 而其他分支的子 Phaser 又偏偏不能完成本轮次, 那么新的 phaser.doRegister 方法将阻塞在此.

好在我们使用 Phaser 可能会类似 CyclicBarrier 的使用方式, 可对每一轮 (phase) 进行注册并等待(也许只等一轮, 那么 arrive 就要带上 deregister), 每一轮最后一个线程 arrive 了, 就会停止所有线程的等待, 让所有线程继续执行, 同时开启了下一轮次, 这些线程此时又可以不经注册直接在新的轮次中进行等待, 直到最后一个 arrive 了, 再次唤醒所有线程并继续执行, 同时 Phaser 再前进一轮, 如此往复. 中间使用 arrive 并 deregister 的线程会从本轮起减少一个 unarrive 数量(因为 parties 也减少了, 所以再下一轮初始化 unarrive 数量时也会减少一次). 我们可以让这些线程参与任意的轮次, 但要注意的是, 如果有线程中途不参加了, 一定要解除注册, 否则因为每轮初始化时, 要等待 arrive 的数量都是上一轮剩下的 parties 数量, 有线程停止了执行, 却不减少 parties 数, 那么下轮所有等待的线程将永远等不到 phaser 满足唤醒的条件.

上述的过程中可以明显的看出, 目前已介绍的两个重要核心函数: 注册和 arrive 并没有直接记录和操作线程的操作, 相应的操作在等待方法和唤醒方法中(前面提到过 release), 我们稍后介绍.

现在假设一个特殊的使用场景, 也可以区别于 CyclicBarrie 和 CountDownLatch 的使用. 还是上面的例子, 但是我们准备的线程数与 Phaser 的 parties 数 /unarrived 数不同(一般前者要多些), 会发生什么事?

首先创建了 Phaser, 不指定最初 parties 数, 并用每个线程去注册(我甚至可以用一个线程去重复注册, 每次的参数 registrations 还可以不同, 注册的作用并不是将当前线程压入队列, 而是为本 phase 设置一个 unarrive 数量, 以控制到达下个 phase 前必须有多少次 arrive 的发生), 则 parties 数和 unarrived 的初值完全与此有关, 是一个依托于我们随意注册而产生的随意值. 那么假定我们的线程数量大于这个 parties 数(假定调用注册方法的线程和 arrive 及等待的线程无关), 并令有的线程执行 arrive(完全可以一次 arrive 减去多个信号量, 甚至一个线程多次 arrive), 有的线程执行 await 等待信号 advance 到下一个 phase(一个线程在一个周期只能调用一次), 有的线程执行了 arrive 也等待 phase 前进(这种情况一个线程一周期也只能一次. 其实这些分别对应了还未介绍的 arrive,waitAdvance,arriveAndWaitAdvance 等方法), 单独进行 await 操作的线程可以是任意数量, 执行 arrive 方法的线程加上执行 arrive 并 wait 的操作的线程和必须超过 unarrived, 这才能唤醒等待线程.

目前这些还比较抽象, 等到我们看过相应的几个方法便了然了.

onAdvance 的方法默认实现就是判断本阶段注册的 parties 数量, 如果已经是 0 则说明没有 parties 了,Phaser 应该结束. 但是我们其实可以重新实现, 比如参数中同时传入了当前的 phase, 我可以规定上面的例子中 phase 最多只有 3 轮次, 那么不论什么时候 arrive, 发现了当前 phase 已进入 3 轮,Phaser 就被终止. 当然, 这一过程是由 root 执行的, 但是子 Phaser 的 phase 会在每次注册和 arrive 发生时同步 root, 因此本例中对于 phase 数的判断可以粗放到所有 Phaser, 对于 parties 数则只能作用于 root(事实上调用 onAdvance 的一定是 root).

接下来看全量构造方法和若干和上面有关的公有函数.

// 初始化一个 Phaser, 指定 parent, 指定未到来的参与者数(unarrived parties), 但这只是一个初值,
// 当我们在任何时候调用注册方法时, 还会相应的增加.
public Phaser(Phaser parent, int parties) {if (parties >>> PARTIES_SHIFT != 0)
        // 太大了, 超过了后 16 位能表示的整数.
        throw new IllegalArgumentException("Illegal number of parties");
    // 初始 phase 为 0.
    int phase = 0;
    this.parent = parent;
    if (parent != null) {
        //1. 有 parent 的情况, 共享 parent 的 root, 队列, 并向 parent 中注册 1 个 parties 和 unarrived,
        // 同时同步一次 phase(表面上是同步了 parent 的, 实际上前面已经看过, 会同步 root).
        final Phaser root = parent.root;
        this.root = root;
        this.evenQ = root.evenQ;
        this.oddQ = root.oddQ;
        if (parties != 0)
            phase = parent.doRegister(1);
    }
    else {
        //2. 无 parent 的情况,root 就是 this, 并初始化奇偶等待队列. 它使用原子引用的形式存放一个 QNode, 而 QNode 我们前面已介绍.
        this.root = this;
        this.evenQ = new AtomicReference<QNode>();
        this.oddQ = new AtomicReference<QNode>();}
    // 统一初始化 state, 后 32 位的决定依托于 parties, 如果 parties 是 0 则给予 EMPTY, 直接不管高 32 位.
    // 不为 0 则给予 phase 设置为前 32 位,parties 设置 parties 位和 unarrived 位.
    this.state = (parties == 0) ? (long)EMPTY :
        ((long)phase << PHASE_SHIFT) |
        ((long)parties << PARTIES_SHIFT) |
        ((long)parties);
}


// 注册方法, 就是调用 doRegister, 参数 1.
// 它会向 this 添加一个 unarrived 的 party, 如果正巧 root 正在进行 advance, 它需要等待下个 phase.
// 如果 this 有 parent, 且它之前没有过注册的 parties, 则首次注册会触发自身向 parent 的注册.
// 如果 this 已经终止了, 那么尝试注册将会无效并返回负值. 如果注册的数量大于了最大支持 parties(后 16 位整数),
// 会抛出 IllegalStateException
public int register() {return doRegister(1);
}


// 批量注册指定的信号量, 并返回最新的 phase. 规则基本同上.
public int bulkRegister(int parties) {if (parties < 0)
        throw new IllegalArgumentException();
    if (parties == 0)
        // 参数 0 直接查询最新的 phase 返回
        return getPhase();
    return doRegister(parties);
}


//arrive 一个信号, 不等待其他 arrive 事件, 返回最新 phase(终止态为负).
// 当前 Phaser 的 arrive 事件已满, 则对 parent 来说也会触发一个 arrive.(如果有 parent)
public int arrive() {return doArrive(ONE_ARRIVAL);
}


//arrive 并解除一个注册 parties, 也不阻塞等待其他 arrive. 如果当前 Phaser 的解除注册操作
// 将 parties 减至 0, 且 this 有 parent, 这将导致 parent 也减少一个 parties(本 phaser 解除在 parent 的注册).
public int arriveAndDeregister() {return doArrive(ONE_DEREGISTER);
}

接下来要看上面已经做足了铺垫的等待方法了, 并结合前面的队列一块看.

// 令当前线程 "到达" 此 phaser 并等待其他 parties, 它等效于 awaitAdvance(arrive()).
// 注意, 按照道格的注释, 如果你在一个未进行注册 (调用 register) 的线程里调用此方法其实是一个使用错误,
// 但是从本方法和前面以及后面有关的方法来看, 所有记录线程的方法均只与 arrive 和等待有关, 与注册无关.
// 因此 Phaser 本身无法规避这种使用错误, 我们完全可以使用另一个线程去注册, 而当前线程去 arrive, 将两个动作分开.
// 方法会返回 arrive 时最新的 phase 号. 终止时会是负值.
public int arriveAndAwaitAdvance() {
    // 记录 root, 开始循环.
    final Phaser root = this.root;
    for (;;) {
        //1. 预计算, 首先同步 state
        long s = (root == this) ? state : reconcileState();
        // 计算 phase
        int phase = (int)(s >>> PHASE_SHIFT);
        if (phase < 0)
            // 已终结直接返回最终 phase.
            return phase;
        // 计算 counts,unarrived
        int counts = (int)s;
        int unarrived = (counts == EMPTY) ? 0 : (counts & UNARRIVED_MASK);
        if (unarrived <= 0)
            // 已经没有空余的 unarrived 信号了, 不能再调用 arrive, 抛出异常.
            throw new IllegalStateException(badArrive(s));
        //2. 减余 arrive 的有关逻辑. 尝试 cas 减去一个 arrive
        if (UNSAFE.compareAndSwapLong(this, stateOffset, s,
                                      s -= ONE_ARRIVAL)) {if (unarrived > 1)
                //2.1 当前要减的信号不是本 Phaser 的最后一个信号量, 调用 root 的等待方法. 参数 2 是 node, 传空.
                return root.internalAwaitAdvance(phase, null);
            if (root != this)
                //2.2 当前要减的信号量是非 root 的 Phaser 的最后一个, 递归给 parent(虽然用了 return, 但是 parent 也可能在进入 2.1 后阻塞).
                return parent.arriveAndAwaitAdvance();
            //2.3 当前要减的信号量是 root 的最后一个.
            //2.3.1 准备计算下一个状态, 先取出 state 的 parties 信息.
            long n = s & PARTIES_MASK; 
            // 计算 nextUnarrived, 它是现在的 parties.
            int nextUnarrived = (int)n >>> PARTIES_SHIFT;
            //2.3.2 前进 phase 逻辑.
            if (onAdvance(phase, nextUnarrived))
                // 需要终止, 给新 state 的计算基石 n 加上终止标记.
                n |= TERMINATION_BIT;
            else if (nextUnarrived == 0)
                // 计算的 nextUnarrived 是 0, 即没有 parties, 加上空标记位.
                n |= EMPTY;
            else
                // 下一轮能正常进行, 加上 nextUnarrived 位.
                n |= nextUnarrived;
            //2.3.3 给 n 加上下一个 phase.
            int nextPhase = (phase + 1) & MAX_PHASE;
            n |= (long)nextPhase << PHASE_SHIFT;
            if (!UNSAFE.compareAndSwapLong(this, stateOffset, s, n))
                // 用 n 进行 cas 不成功, 将新的 phase 返回.
                // 说明一下, 因为方法执行到此前已经执行过 2 的入口 cas, 减去了最后一个 unarrived, 因此在 2 到此的过程中若有新的注册,
                // 它内部会读到 0 个 unarrived, 就会等待下一个 phase(参考前面介绍过的注册方法), 因此 cas 失败不会是因为 2 之后有新的注册.
                // 在整个 arrive 系列的方法中, 最后一次 arrive 发生后, 本 Phaser 不可能有其他线程再去执行类似 2 处的减余的情况.
                // 故出现这种情况的原因目前来看有二, 一是还未介绍的强制关闭 Phaser 的方法, 此时也会突兀地改掉 state 造成 cas 恰巧失败, 二是
                // 出现一些用户做出的奇葩行为, 比如重写了其他公有方法. 我们自然忽略第二种情况,doug 大神也是简单注释了一个 "terminated".
                return (int)(state >>> PHASE_SHIFT); // terminated
            //cas 成功, 释放等待队列中的线程, 返回下一个 phase(因为在此过程中的 register 会等到 advance, 此时的 phase 已经是 nextPhase 了).
            releaseWaiters(phase);
            return nextPhase;
        }
        //3. 减余失败说明出现竞态, 直接开启下轮循环重新减余.
    }
}


// 等待当前 Phaser 从给定的 phase 前进结束, 如果当前 phase 不等于给定的 phase, 或者 Phaser 已终止立即返回.
//1. 传入 phase 为负, 返回它本身.
//2. 传入的 phase 不是最新的 phase, 返回最新的.
//3. 传入了最新的 phase, 等待到 advance 并返回 advance 后的 phase.
public int awaitAdvance(int phase) {
    final Phaser root = this.root;
    long s = (root == this) ? state : reconcileState();
    int p = (int)(s >>> PHASE_SHIFT);
    if (phase < 0)
        return phase;
    if (p == phase)
        // 匹配成功, 等 root 前进. 参数 node 为 null
        return root.internalAwaitAdvance(phase, null);
    return p;
}


// 参考前面的几个方法, 区别是可扰动.
public int awaitAdvanceInterruptibly(int phase)
    throws InterruptedException {
    final Phaser root = this.root;
    long s = (root == this) ? state : reconcileState();
    int p = (int)(s >>> PHASE_SHIFT);
    if (phase < 0)
        //1. 参数 phase 小于 0 直接返回它本身.
        return phase;
    if (p == phase) {
        //2. 参数 phase 匹配, 回忆一个前面介绍的 QNode, 匹配当前 Phaser 和 phase, 配置为可扰动且不计时.
        QNode node = new QNode(this, phase, true, false, 0L);
        //3. 放入 root 的等待队列阻塞.
        p = root.internalAwaitAdvance(phase, node);
        if (node.wasInterrupted)
            //4. 等待结束, 判断是否是扰动造成的结束, 前面介绍过 QNode 的相关逻辑,
            // 它实现了 ForkJoinPool.ManagedBlocker, 因此在 managedBlock 方法进行时,
            // 会循环调用问询是否能 release, 当我们配置了可扰动且扰动了, 就会标记这个 wasInterrupted, 释放线程引用并返回.
            // 发现此种情况抛出异常.
            // 同时, 当发现等待成功, 也会结束, 释放线程引用并返回, 但不带有扰动标记.
            throw new InterruptedException();}
    //5. 返回 1 处之前读取的 phase 或 3 处得到的最新 phase 值.
    return p;
}

// 同上方法, 但带有计时.
public int awaitAdvanceInterruptibly(int phase,
                                     long timeout, TimeUnit unit)
    throws InterruptedException, TimeoutException {long nanos = unit.toNanos(timeout);
    final Phaser root = this.root;
    long s = (root == this) ? state : reconcileState();
    int p = (int)(s >>> PHASE_SHIFT);
    if (phase < 0)
        return phase;
    if (p == phase) {
        // 不同于上面方法的地方, 建立的 QNode 带有计时和等待时长.
        QNode node = new QNode(this, phase, true, true, nanos);
        p = root.internalAwaitAdvance(phase, node);
        if (node.wasInterrupted)
            // 被扰动的情况.
            throw new InterruptedException();
        else if (p == phase)
            // 时间到了 phase 没有前进, 超时.
            throw new TimeoutException();}
    return p;
}

前面的几个核心方法粗略过完, 补充一些重要内容.

首先在前面曾分析过有线程阻塞等待下一个 phase 的情况, 并没有加上定时等待的考虑. 在超时的情况下, 阻塞的线程可能会收到异常并退出.

建立 QNode 可以限定是否定时和可扰动, 这取决于我们使用哪个方法去 await.

除最后一个线程 arrive 外, 所有线程调用这些方法都会减少一个 arrive 并加入等待队列, 直到 (1) 配置了定时且超时,(2)当前是可扰动等待且使用了 Thread.interrupt(),(3)最后一个线程使用上述方法或 arrive 方法, 使得 Phaser 前进了一个轮次,internalWaitAdvance 结束. 其中 (1)(2) 均会迁成 arrive 线程抛出异常, 只有 (3) 才是正常的情况.

QNode 前面已介绍, 它是一个 blocker, 需要调用 ForkJoinPool::managedBlock 才会起作用(显然 root 的 internalAwaitAdvance 必然与此方法有关联). 当然这个作用与任务是否运行在 ForkJoinPool 无关, 如果等待 phaser 前进的线程是运行在 ForkJoinPool 中的 ForkJoinWorkerThread, 显然会在 internalAwaitAdvance 期间进行补偿. 这一块可参考前面的 ”CompletableFuture 与响应式编程 ” 和 ”ForkJoin 框架之 ForkJoinPool” 两篇文章.

另外, 这些代码也再次说明了 root 的作用: (1)对一切非 root 的 Phaser 进行等待都会用 root 的 internalAwaitAdvance;(2)每次注册或 arrive 一定会同步 root 的最新 phase.

其中 (1) 也间接说明了为什么构建 Phaser 时只有 root 创建等待队列, 所有子 Phaser 共享.

上面还保留了一个疑问, 提到了 ” 强制关闭 Phaser” 造成 arriveAndAwaitAdvance 出现 cas 失败的问题,doug 大神直接注释了一个 terminated, 我们马上来看这一块, 以及一些周边的公共函数, 加深理解, 然后再来解决关于等待队列最后的一些问题.

// 强制关闭 Phaser, 让 Phaser 进入终止态, 但是这个过程不影响它已注册的 parties, 如果此 Phaser 是
// 一个 Phaser 树中的成员, 那么所有 phaser 集中的 Phaser 都会关闭, 如果它已经关闭, 此方法无效. 此方法可以
// 用于若干任务出现意料之外异常的情况下的协调恢复.
public void forceTermination() {
    // Only need to change root state
    final Phaser root = this.root;
    long s;
    // 已是终止态直接忽略.
    while ((s = root.state) >= 0) {
        // 直接尝试给 root 的 state 加上终止位. 显然加上了它, 子 Phaser 在注册和 arrive 等方法同步回新的 phase 就是个负数,
        // 因此更改 root 的 phase 为负相当于判了所有 Phaser 的死刑. 唯一需要解决的是已经阻塞在 root.internalAwaitAdvandce 的线程.
        if (UNSAFE.compareAndSwapLong(root, stateOffset,
                                      s, s | TERMINATION_BIT)) {
            // 加上终止位成功, 先后唤醒偶数等待队列和奇数等待队列.
            releaseWaiters(0); // Waiters on evenQ
            releaseWaiters(1); // Waiters on oddQ
            // 返回
            return;
        }
    }
}


// 返回当前 phase, 直接用 root 的 state 去取.
public final int getPhase() {return (int)(root.state >>> PHASE_SHIFT);
}

// 查询注册的 parties 数量. 调用前面介绍过的 partiesOf
public int getRegisteredParties() {return partiesOf(state);
}

// 查询已经 arrived 的 parties 数量. 调用介绍过的 arriveOf

public int getArrivedParties() {return arrivedOf(reconcileState());
}

// 查询未 arrive 的 parties 数量, 调用前面介绍过的 unarrivedOf
public int getUnarrivedParties() {return unarrivedOf(reconcileState());
}

// 返回 parent
public Phaser getParent() {return parent;}

// 返回 root
public Phaser getRoot() {return root;}

// 判断当前 Phaser 是否终止, 直接取 root 的 state 是否为负, 可见, 终止态完全取决于 root.
public boolean isTerminated() {return root.state < 0L;}

这些方法都比较简单, 只有 forceTermination 需要再强调一翻, 前面介绍 arrayAndAwaitAdvance 时曾提过在减去最后一个 unarrived 信号后去 cas 到下一个 phase 失败的情况,doug 大神简单注释了一句 terminated, 直接返回了当前的 phase(显然只能是负), 在周边方法重重加锁的前提下, 那一次 cas 的失败唯一一处就是强制关闭, 因为它只改关闭标记位, 相当于动了 phase, 而没有动 unarrived 标记位和 parties 标记位. 所以重写 Phaser 的方法要谨慎, 很可能不小心打破了这个封装.

从上面的有关方法可以看出, 子 Phaser 的终止态严重依赖于 root, 目前可以确定的是 root 的 phase 一旦表现出终止态, 所有新来的注册,arrive,arrive 并 await 将会立即返回, 唯一需要关注的就是 root 被设置了终止标记后, 正陷入等待的线程怎么办的问题.

我们下面就来看 Phaser 的等待机制, 这里面又能见到道格大神非常有趣的玩法.

// 工具方法, 移除某个 phase 的等待者.
private void releaseWaiters(int phase) {
    QNode q;  // 保存队列中的队首
    Thread t;  // 保存线程引用.
    // 取队列, 用 phase 的奇偶决定,phase 是偶数就取偶数队列, 否则取奇数队列. 而这个 phase 其实只用来取队列了, 后续的操作与它无关.
    AtomicReference<QNode> head = (phase & 1) == 0 ? evenQ : oddQ;
    // 循环, 找出所有 phase 不等于 root 的 phase 的(其实 root 是最大的, 所以就是找出非最新 phase 加入进来的 waiter QNode)
    while ((q = head.get()) != null &&
           q.phase != (int)(root.state >>> PHASE_SHIFT)) {
        // 找出了, 利用原子引用将 head 指向 next.
        if (head.compareAndSet(q, q.next) &&
            (t = q.thread) != null) {// 发现阻塞者, 唤醒线程. 回忆下前面实现 blocker 方法中的 isReleaseble 和 block 方法都有将线程置空的操作.(三种情况, 唤醒扰动超时都会置空)
            // 但是那些方法并没有将代表该阻塞线程的 QNode 移除队列, 因此可能会发现 thread 已经是 null(代表无阻塞者)的情况, 只需要移除队列即可.
            q.thread = null;
            LockSupport.unpark(t);
        }
    }
}


// 上面 releaseWaiters 方法的一个变种, 但它只会处理遍历过程中位于头部的元素, 出现正常的等待节点就会立即返回.
// 此方法在这一块可以有效的减少内存的占用. 退出时返回当前的 phase.
private int abortWait(int phase) {
    // 同样, 参数 phase 只是用来选择要处理的队列.
    AtomicReference<QNode> head = (phase & 1) == 0 ? evenQ : oddQ;
    for (;;) {
        Thread t;
        QNode q = head.get();
        // 计算最新 phase 的值 p
        int p = (int)(root.state >>> PHASE_SHIFT);
        if (q == null || ((t = q.thread) != null && q.phase == p))
            //1. 出现 q 为 null 代表整队列元素已出队, 直接返回 p;
            // 或者在出队过程中 head(q)记录的线程引用还在, 说明未超时或扰动, 且是本 phase 的等待节点, 终止循环并返回最新 phase.
            return p;
        if (head.compareAndSet(q, q.next) && t != null) {
            // 进入条件, 参考 1 的条件, 因为 1 会直接返回. 故进入 2 的条件其实是 q 非空且处于旧的 phase. 只有这种情况才可以出队.
            //2. 将 q 出队, 置空线程引用, 释放线程.
            q.thread = null;
            LockSupport.unpark(t);
        }
    }
}

// 计算有效 cpu, 控制自旋.
private static final int NCPU = Runtime.getRuntime().availableProcessors();

// 常量, 每轮 arrive 等待的字旋数, 取决于 NCPU, 小于 2 则取 1, 不小于 2 取 2 的 8 次幂.
static final int SPINS_PER_ARRIVAL = (NCPU < 2) ? 1 : 1 << 8;


// 珊珊来迟的内部等待方法. 它可能会一直阻塞到 phase 的 advance 发生(除非取消了等待).
// 此方法仅限 root 调用. 参数 phase 表示当前的 phase, 参数 node 表示等待节点, 用于追踪节点的扰动或超时.
// 如果是 null, 表示是一次不可扰动的等待. 返回值为当前最新的 phase.
private int internalAwaitAdvance(int phase, QNode node) {
    // 1. 调用 releaseWaiters, 传入参数 phase 的前一个 phase, 显然这只是决定释放哪一个队列. 参数绝对实时准确的情况下会先将老的队列释放掉.
    releaseWaiters(phase-1); 
    // 节点入队标记, 入队了就会变为 true
    boolean queued = false;   
    // 记录每一轮循环的 unarrived 数量, 用于决定是否扩增自旋等待次数.
    int lastUnarrived = 0; 
    // 自旋数, 参考上面的计算逻辑.
    int spins = SPINS_PER_ARRIVAL;
    long s;
    int p;
    // 开启循环, 直到 phase 前进为止或内部判断已取消等待.
    while ((p = (int)((s = state) >>> PHASE_SHIFT)) == phase) {
        //2. 传入 node 是 null, 即非可扰动的模式逻辑. 只有非可扰动模式才有自旋.
        if (node == null) {//2.1 每轮自读进入都会尝试计算新的 unarrived, 如果发现出现了变动(变大或者变小),
            // 会将它保存到前面的 lastUnarrived.        
            int unarrived = (int)s & UNARRIVED_MASK;
            if (unarrived != lastUnarrived &&
                (lastUnarrived = unarrived) < NCPU)
                // 发现新变化的 unarrived<NCPU, 扩增自旋次数, 继续自旋.
                //unarrived 的变化, 若没有大量新的 parties 注册, 会在自旋过程中变小, 反之大量加入注册, 大于了 NCPU 则放弃增加自旋次数.
                spins += SPINS_PER_ARRIVAL;
            //2.2, 未发现本轮循环 unarrived 发生变化, 或者增加了大量注册, 造成大于 NCPU 的逻辑, 首先记录此时的线程扰动状态.
            boolean interrupted = Thread.interrupted();
            //2.3 接 2.2, 如果发现了线程被扰动了, 或者经若干次自旋减少次数, 自旋次数并未能在 2.1 进行增加, 直至减为 0, 进入 if.
            if (interrupted || --spins < 0) { // need node to record intr
                //2.4, 满足 2.3 进入 if 的条件, 不再继续自旋了, 因为参数没有提供 node, 此处初始化一个 node, 不定时, 不可扰动, 并保存扰动状态.
                // 下轮循环将无法进入 2.
                node = new QNode(this, phase, false, false, 0L);
                node.wasInterrupted = interrupted;
            }
        }
        //3. 参数传入了 node, 或者在 2.4 进入了 node 的初始化, 每一轮循环到此都先判断是否可释放(若可以, 内部会置 thread 为 null).
        else if (node.isReleasable()) 
            // 发现 node 所处的 phase 已经达到或者取消了, 则 break 掉循环.
            break;
        //4. 未能在非扰动模式下自旋解决 (2) 或提前发现 node 的扰动且未将 node 入队的情况下, 将 node 入队.
        else if (!queued) { 
            // 选择当前 phase 代表的队列.
            AtomicReference<QNode> head = (phase & 1) == 0 ? evenQ : oddQ;
            QNode q = node.next = head.get();
            // 这一行不起眼的 if 条件代码真的是一个悄无声息解决了一个大暗坑的地方, 后面说.
            if ((q == null || q.phase == phase) &&
                (int)(state >>> PHASE_SHIFT) == phase) 
                //double check 避免脏入队, 入队条件是 (1) 无头,(2)或者头元素的 phase 等于参数 phase(因为相邻的两个 phase 绝对不会入同一个队).
                // 满足 (1)(2) 的同时, 还要满足(3), 参数 phase 就是当前的 state 表示的 phase(因为此方法只能 root 使用, 故为 root 表示的最新 phase).
                // 条件满足, 入队, 取代原来的 head, 原来 head 代表的 node 成为 node 的 next. 而条件不满足进入下一循环, 很可能 while 条件就不满足了退出循环.
                queued = head.compareAndSet(q, node);
        }
        //5. 已经在某一轮循环入队了, 使用 ForkJoinPool 的 managedBlock 管理 block, 其间可能会释放线程引用.
        else {
            try {//5.1 它内部也有循环, 且会调用前面看到过的 isReleasable 和 block 实现, 显然它一旦结束(包含扰动), 一定会造成下轮外循环终止于 3 处.
                ForkJoinPool.managedBlock(node);
            } catch (InterruptedException ie) {
                //5.2 出现扰动异常 catch 住, 并保存. 下轮循环也会终止在 3 处.
                node.wasInterrupted = true;
            }
        }
    }
    //6. 走出上面的 while 循环, 可能是 root 已经 advance 到下一个 phase(2 前的循环), 也可能是传入 node 的情况下出现了扰动或超时 (5) 造成 (3) 满足
    if (node != null) {//6.1node 存在代表可能已经压入队列, 结果要么是已出现扰动或超时(方法结束后会抛出异常), 要么是已正常完成.
        // 显然, 代码执行到此处就要返回了, 阻塞的线程会抛出异常结束 (超时或扰动) 或继续执行(正常 advance),
        // 没有必要去尝试唤醒能执行出前面 while 循环到达 6 马上要返回的线程.
        if (node.thread != null)
            //6.2 取消 node 中的线程引用, 避免外面的线程尝试唤醒.
            node.thread = null;       // avoid need for unpark()
        if (node.wasInterrupted && !node.interruptible)
            //6.3 如果 node 本身设置了不可被扰动, 但 5.2 处判断线程本身抛出了扰动异常, 却被 catch 住了, 此处扰动本线程.
            Thread.currentThread().interrupt();
        if (p == phase && (p = (int)(state >>> PHASE_SHIFT)) == phase)
            //6.4 发现 phase 并未前进. 还是参数传入的 pahse, 说明一定是扰动或超时的结果,abortWait 对本 phase 使用的队列进行清理,
            // 而清理的目标前面已论述过, 是本队列头部开始的早于本 phase 的元素.(发现一个不满足条件的就停止了清理).
            return abortWait(phase); // possibly clean up on abort
    }
    //7. 退出上面的 while 循环一定会到此帮助释放早于最新阶段的 waiter. 注意, 是早于最新 phase 的, 参数 phase 只是决定了选哪个队列(奇偶).
    // 如果是 6.4 代表的那种扰动超时情况, 此处其实释放的是旧的结果. 被唤醒的线程其实一般是执行在 5.1 处阻塞的. 当前线程能运行到此绝对不需要唤醒.
    releaseWaiters(phase);
    return p;
}

到此 Phaser 的代码解析已完毕, 我们来分析关于队列, 等待和唤醒的问题.

1.Phaser 维护了两个 ” 队列 ”, 不论加入等待队列还是弹出等待队列, 都是从头部进行, 新加入的成员会成功队列的新头, 原来的头会成为它的 next, 弹出时 next 成为新头. 所以相当于一个对头部的 ” 后进先出 ”, 考虑官方起名和注释, 我们依旧保持队列这个称呼.

2. 唤醒时, 会从队列的头部依次弹出 node 的 phase 早于 root 的最新 phase 的 node,.

3. 等待时, 入队的 node 成为新的头.

4. 当轮次增加时, 会使用和本轮不同的队列增加元素, 同时也会唤醒本轮中等待的 node.

因为唤醒和等待同时进行, 且各自操作各自的队列(不同的 phase), 因此彼此之间没有竞态(尽管一个是头入一个是头出), 可以说设计巧妙, 下面我们来脑洞大开, 思考一个极端情况.

我们假设一种极端的 phase 切换场景, 奇数 phase 大量等待入队, 偶数 phase 则迅速完成. 假设当前 phase 对应的队列是奇数对列, 轮次提升完成后, 它去释放当前的队列元素, 结果未等这个释放操作执行完毕, 偶数队列的轮次很快执行完, 奇数队列中积压了成千上万个 node 未能释放, 轮次却又切回到了奇数队列, 会出现什么事?

显然奇数队列如果一直保持这种极端场景, 它会越来越庞大, 逼近撑爆内存的同时, 大量线程也会得不到释放, 甚至于老一轮的线程需要等待新一轮的线程去释放. 为什么老一轮的线程会去等待新一轮的线程释放呢?

releaseWaiter 的方法我们已经看出, 它只会释放 phase 早于最新的 node, 此时最新压入的元素属于当前最新的 phase, 显然不满足条件, 那么会造成奇数队列中两轮前压入的元素不能得到清除, 两轮前就在释放当时积压 node 的线程 (那一轮最后一个 arrive) 发现不符合清理条件, 就直接 return 并终止了, 只能等待本轮最后一个 arrive 出现后继续进行释放. 如果本轮最后一个 arrive 出现很晚, 在下一轮依旧保持如此极端, 往返数轮, 确实会导致奇数队列中积压大量 node, 且第一轮就在等待该轮次结束的线程早就满足了释放条件(升到了 2 轮), 事实上可能是第 n 轮才得到释放, 这还符合 Phaser 的定义吗? 我们使用它, 就是要保证每一轮单独使用, 每一轮次达到条件, 线程释放并执行, 下一轮次是下一轮次.

然而 doug 的代码就是这个样子, 想遍各种极端, 觉得可能找到了 bug, 那么就需要仔细思考了. 作者来简述一下这个趟坑的分析过程.

这个问题确实已经得到了极大的规避了, 毕竟是个极端情况.

1. 线程的唤醒真的很快, 尽管此处除了唤醒还包含了原子引用的更新(每次出队都要 cas).

2. 如果没有注册, 显然就没有 arrive 相关的情况, 尽管可以单独调用, 但必须保证在 arrive 时传入的数量此时已经注册了, 因此每一轮次 (phase) 中可能积压等待唤醒的线程的操作一定是在注册之后, 但是我们回忆一下, 注册方法的第一步就是要等待完成 advance, 而且传给 internalAwaitAdvance 的 node 会是 null, 即不能扰动和超时, 所以当本轮次阻塞了一定数量的线程后, 如果不去 arrive, 也不考虑超时和扰动的情况, 那么线程将一直阻塞. 我们不可能在轮次 advance 前进行注册, 也就不可能在 advance 之前进行新一 phase 的 arrive.

3. 当本轮次的最后一个 arrive 线程触发了轮次的更新后, 才可以开启注册以及新轮次的 arrive, 但是此时使用了另一个等待队列, 而触发了轮次更新的上一轮的 arrive 线程将会立即进行前一个队列中积压的线程的唤醒操作. 只有该唤醒操作足够慢, 且新的轮次极快就完成了的情况, 才可能造成在原 arrive 线程未能及时释放奇数队列的情况下, 新一轮次再次向其中添加元素.

4. 最重要的还在上面的 internalAwaitAdvance 方法, 那一段被作者标上了入队条件的注释处, 要想入队, 必须 if ((q == null || q.phase == phase) && 加上后面的条件, 而这两个条件的限定已经很明显, 要想入队, 必须满足该等待队列没有元素或者队首是本轮的元素, 而该方法又是下一轮首次注册时必须等待完成的, 下一轮的 arrive 又必须发生在下一轮的首次注册之后, 因此根本不会出现本轮 wait 的线程还要等下一轮甚至下 N 轮的线程去释放的极端情况, 哪怕真的去做一个极端测试: 让奇数轮大量积压线程, 让偶数轮快速切换, 然后测试第一轮压入的线程到底是不是本轮释放的.(作者差点就要写单元测试去做这个极端测试了!)

这一段不经意的 if, 一个小小的条件, 如果不注意真的忽略了, 小代码大功效, 谁能想到, 这么深的暗坑就这样被规避了.

总结

前面已经详述了 Phaser 的源码以及若干趟坑辛路. 其实已经没什么好总结的了, 就在此顺便对比常见同步器 CyclicBarrier,CountDownLatch,Semaphore 的特征和实现.

从使用特征上看:

1.CountDownLatch 是一次性的, 只能初始化决定 parties 数量, 等待者可以是多个, 每次释放都会减少一个信号量, 直到归 0 时为止, 最后一个释放者将唤醒其他等待的线程. 它也不能继续使用.

2.CyclicBarrier 是可重用的, 分代的, 每一代之间彼此独立, 但是每一代的初始 parties 是相同的, 不可在运行期内动态调整, 每一代最后一个线程会去开启一下代, 并可以在此时运行一个用户指定的 action, 与此同时唤醒其他线程继续执行. 它可以在运行完一代后继续被使用. 并且它还支持重置.

3.Semaphore 是一个资源量的典型, 如果说 CountDownLatch 和 CyclicBarrier 或者 Phaser 都是等到 ” 人够了 ” 再放行,Semaphore 却是起到限流的作用, 它控制了有限的令牌数, 这个数量不可以动态地更改, 在不能 acquire 到足够的令牌数时, 线程将阻塞, 直到其他线程释放了足量的令牌数并唤醒它为止. 每一个持有了令牌的线程都可以唤醒阻塞等待获取的线程.

4.Phaser 的功能上不同很明显, 首先它的参与者数量几乎时刻可变(除了正在进入下一 phase 期间), 随时可以增加减少 parties 数量, 每一 phase 等待者可以是多个, 每一 phase 中, 每个能从 internalAwaitAdvance 方法中走出循环的线程都可以帮助唤醒, 当然最终能进入唤醒操作还是要归功于最后一个 arrive 的线程(尽管它 arrive 后其他线程醒来后也会帮助唤醒).Phaser 的唤醒者不一定是参与者.

从实现来看:

1.CountDownLatch 借助了 aqs 来实现 parties 的释放, 它使用 cas+park 的方式, 不使用 Lock.

2.CyclicBarrier 需要借助重入锁和 condition, 每一个 await 的线程都要全局加锁, 阻塞时 await 在 condition 上.

3.Semaphore 在实现上类似 CountDownLatch, 也是基于 aqs, 只不过它允许获取和释放, 对 state 有增有减, 总量不变. 也是 cas+park 的方式阻塞, 也不使用 Lock

4.Phaser 因为功能的要求, 不基于 AQS(它不能有构建时就固定的 state, 尽管可以初始化一个 state, 但它必须支持改变), 它依托于原子引用实现了一个内部的队列, 相应的等待 / 入队 / 唤醒等操作通过 cas 自旋 +park 的方式, 同样不使用 Lock. 并利用双队列的方式规避了前一轮的释放和后一轮的响醒的阻塞.

此外还有两点结合前面的推理和自测验证的结论:

1.Phaser 中的每一个 phase 是保证了可见性的, 经作者自测, 在任何使用 Phaser 的代码中 await 前后, 不会出现串 phase 读出的乱序情况(侧面说明每个 phase 不会依赖后一个或几个 phase 的释放).

2.Phaser 需要对 await 的线程进行阻塞时, 是将它打包成一个 node(blocker), 利用 ForkJoinPool 来 block 的. 如果使用 Phaser 同步的任务是运行在 ForkJoinPool 中的, 它将会利用到相应的补偿机制, 经作者自测, 这将保证 Phaser 中 block 的每一个任务必然得到执行, 每一个阻塞的线程必然得到释放.

正文完
 0