乐趣区

关于java:阅读-JDK-8-源码AQS-中的共享模式

AbstractQueuedSynchronizer,简称 AQS,是一个用于构建锁和同步器的框架。
上一篇文章 介绍了 AQS 的数据结构和独占模式的实现原理,本篇介绍 AQS 共享模式的实现原理。

1. 共享模式

独占模式下,只有有一个线程占有锁,其余线程试图获取该锁将无奈取得成功。
共享模式下,多个线程获取某个锁可能(但不是肯定)会获得成功。

1.1 获取锁 -acquireShared

共享模式下获取锁 / 资源,忽视中断

java.util.concurrent.locks.AbstractQueuedSynchronizer#acquireShared

public final void acquireShared(int arg) {if (tryAcquireShared(arg) < 0)
        doAcquireShared(arg);
}
  1. tryAcquireShared:获取共享锁 / 资源,获取失败则进入下一步。
  2. doAcquireShared:进入同步队列中期待获取锁 / 资源。

1.1.1 tryAcquireShared

尝试获取资源,具体资源获取形式交由自定义同步器实现。

java.util.concurrent.locks.AbstractQueuedSynchronizer#tryAcquireShared

protected int tryAcquireShared(int arg) {throw new UnsupportedOperationException();
}

对于返回值:

  • 正数:获取资源失败,筹备进入同步队列;
  • 0:获取资源胜利,但没有残余可用资源;
  • 负数:获取资源胜利,能够唤醒下一个期待线程;

1.1.2 doAcquireShared

进入同步队列,自旋判断是否能获取锁,否则进入阻塞。

/**
 * Acquires in shared uninterruptible mode.
 * @param arg the acquire argument
 */
private void doAcquireShared(int arg) {final Node node = addWaiter(Node.SHARED); // 在队列中退出共享模式的节点
    boolean failed = true;
    try {
        boolean interrupted = false;
        for (;;) {final Node p = node.predecessor();
            if (p == head) {int r = tryAcquireShared(arg);    // 如果上一个节点是头结点,则尝试获取共享资源,返回残余的资源数量
                if (r >= 0) {setHeadAndPropagate(node, r); // 设置以后节点为新的头节点(dummy node),并唤醒后继共享节点
                    p.next = null; // help GC     // 旧的头节点出队
                    if (interrupted)
                        selfInterrupt();
                    failed = false;
                    return;
                }
            }
            if (shouldParkAfterFailedAcquire(p, node) && // 上一个节点不是头节点,须要判断是否进入阻塞:1. 不能进入阻塞,则重试获取锁。2. 进入阻塞
                parkAndCheckInterrupt())                 // 阻塞以后线程。当从阻塞中被唤醒时,检测以后线程是否已中断,并革除中断状态。接着持续重试获取锁。interrupted = true;                      // 标记以后线程已中断
        }
    } finally {if (failed)
            cancelAcquire(node);
    }
}

共享模式的 doAcquireShared 办法,与独占模式的 acquireQueued 相似,节点退出同步队列之后进行自旋,执行两个判断:

  1. 是否获取锁
  2. 是否进入阻塞

不同的中央是:

  • acquireQueued 中应用 setHead 设置头节点;
  • doAcquireShared 应用 setHeadAndPropagate 设置头节点之后,须要判断是否唤醒后继节点。

也就是说:

  • 共享模式,获取锁胜利,或者开释锁胜利,都须要告诉后继节点。
  • 独占模式,开释锁胜利,才须要告诉后继节点。

setHeadAndPropagate

将以后节点设置为新的头节点。
如果共享资源有亏损,唤醒后续期待中的共享节点。

/**
 * Sets head of queue, and checks if successor may be waiting
 * in shared mode, if so propagating if either propagate > 0 or
 * PROPAGATE status was set.
 *
 * @param node the node
 * @param propagate the return value from a tryAcquireShared    // 共享资源的残余数量
 */
private void setHeadAndPropagate(Node node, int propagate) {
    Node h = head; // Record old head for check below
    setHead(node);
    // 判断是否唤醒后继节点
    if (propagate > 0 || h == null || h.waitStatus < 0 || // 若无残余资源,则校验旧的头节点 h 的状态(PROPAGATE 或 SIGNAL,均 <0)(h = head) == null || h.waitStatus < 0) {         // 若其余线程批改了 head,取新 head 作为前继节点来校验
        Node s = node.next;
        if (s == null || s.isShared()) // node.next != null 时,这里限度了只会唤醒共享节点!doReleaseShared(); // 唤醒后继节点}
}

如果满足下列条件能够尝试唤醒下一个节点:

  1. 有残余资源(propagate > 0),或者头节点的状态是 PROPAGATE(waitStatus < 0)
  2. 后继节点是期待中的共享节点,或者后继节点为空

可能会造成不必要的唤醒,然而个别产生在大量地抢夺 acquires/releases 之时,而这种状况下,线程早晚都会被唤醒。

doReleaseShared

唤醒后继节点(共享模式下,以后线程获取锁胜利、开释锁之后,都可能会调用该办法)。

留神:头节点是共享节点,然而这个办法不会辨别后继节点是否是共享节点。

java.util.concurrent.locks.AbstractQueuedSynchronizer#doReleaseShared

/**
 * Release action for shared mode -- signals successor and ensures
 * propagation. (Note: For exclusive mode, release just amounts
 * to calling unparkSuccessor of head if it needs signal.)
 */
// 共享模式下的 release 操作:在足够的资源下,唤醒后继节点,流传信息(资源亏损,可共享)// 互斥模式下的 release 操作:只会唤醒队列头部须要唤醒的一个后继节点(见 AbstractQueuedSynchronizer#unparkSuccessor)private void doReleaseShared() {for (;;) {
        Node h = head;
        if (h != null && h != tail) {    // 头节点不为空,且具备后继节点
            int ws = h.waitStatus;
            if (ws == Node.SIGNAL) {     // 如果头节点状态是 SIGNAL,尝试改为 0
                if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
                    continue;            // loop to recheck cases // CAS 失败,从新自旋
                unparkSuccessor(h);      // 唤醒 head 的后继节点
            }
            else if (ws == 0 &&          // 如果头节点状态是 0,尝试改为 PROPAGATE。!compareAndSetWaitStatus(h, 0, Node.PROPAGATE)) 
                continue;                // loop on failed CAS // CAS 失败,从新自旋
        }
        if (h == head)                   // loop if head changed // 校验头节点是否发生变化,若变动了则从新校验最新头节点的状态
            break;
    }
}

代码流程:

  1. 每次自旋都获取最新的头节点 head,如果 head 不为空,且具备后继节点,则进入下一步判断。
  2. SIGNAL → 0:如果头节点状态是 SIGNAL(阐明后继节点阻塞中,期待唤醒):
    ① 改为 0 失败,从新自旋;
    ② 改为 0 胜利,唤醒后继节点,进入第 4 步。
  3. 0 → PROPAGATE:如果头节点状态是 0(后继节点自旋中未阻塞,或者后继节点已勾销):
    ① 改为 PROPAGATE 失败,从新自旋;
    ② 改为 PROPAGATE 胜利,进入第 4 步。
  4. 批改头节点状态胜利(SIGNAL → 0 或 0 → PROPAGATE),如果该过程中头节点没有发生变化,完结自旋。

对于 PROPAGATE 状态

为什么 以后节点状态由 0 改为 PROPAGATE 失败,须要持续自旋?

  • 后继节点在同步队列中自旋时,执行 shouldParkAfterFailedAcquire 看到前继节点状态是 0 或 PROPAGATE,都会改为 SIGNAL。
  • 所以这里 CAS 失败,可能是后继节点的批改造成的,须要从新校验以后节点状态。
  • 若下一次查看到以后节点状态为 SIGNAL,即可唤醒后继节点。

为什么 以后节点状态由 0 改为 PROPAGATE 胜利,就不再唤醒后继节点了呢?

  1. 只有 SIGNAL 才须要被动唤醒后继节点。
  2. 以后节点的状态从 0 设为 PROPAGATE,此时后继节点可能是在同步队列中自旋中,并未阻塞,无需唤醒;也有可能后继节点已勾销,也无需唤醒。
  3. 以后节点设置状态为 PROPAGATE 之后,若处于自旋之中的后继节点获取锁胜利(见 doAcquireShared)之后,因为头节点状态为 PROPAGATE < 0(见 setHeadAndPropagate),会持续唤醒向下一个节点。

1.2 开释锁 -releaseShared

共享模式下开释锁 / 资源

java.util.concurrent.locks.AbstractQueuedSynchronizer#releaseShared

public final boolean releaseShared(int arg) {if (tryReleaseShared(arg)) { // 开释共享锁 / 资源
        doReleaseShared(); // 开释锁 / 资源胜利,唤醒队列中的期待节点
        return true;
    }
    return false;
}
  1. tryReleaseShared:尝试开释共享锁 / 资源,开释胜利则进入下一步。
  2. doReleaseShared:开释锁 / 资源胜利,唤醒队列中的期待节点。

java.util.concurrent.locks.AbstractQueuedSynchronizer#tryReleaseShared

protected boolean tryReleaseShared(int arg) {throw new UnsupportedOperationException();
}

相干浏览:
浏览 JDK 8 源码:AQS 中的独占模式
浏览 JDK 8 源码:AQS 中的共享模式
浏览 JDK 8 源码:AQS 对 Condition 的实现

作者:Sumkor

退出移动版