共计 4571 个字符,预计需要花费 12 分钟才能阅读完成。
AbstractQueuedSynchronizer,简称 AQS,是一个用于构建锁和同步器的框架。
上一篇文章 介绍了 AQS 的数据结构和独占模式的实现原理,本篇介绍 AQS 共享模式的实现原理。
1. 共享模式
独占模式下,只有有一个线程占有锁,其余线程试图获取该锁将无奈取得成功。
共享模式下,多个线程获取某个锁可能(但不是肯定)会获得成功。
1.1 获取锁 -acquireShared
共享模式下获取锁 / 资源,忽视中断
java.util.concurrent.locks.AbstractQueuedSynchronizer#acquireShared
public final void acquireShared(int arg) {if (tryAcquireShared(arg) < 0)
doAcquireShared(arg);
}
- tryAcquireShared:获取共享锁 / 资源,获取失败则进入下一步。
- doAcquireShared:进入同步队列中期待获取锁 / 资源。
1.1.1 tryAcquireShared
尝试获取资源,具体资源获取形式交由自定义同步器实现。
java.util.concurrent.locks.AbstractQueuedSynchronizer#tryAcquireShared
protected int tryAcquireShared(int arg) {throw new UnsupportedOperationException();
}
对于返回值:
- 正数:获取资源失败,筹备进入同步队列;
- 0:获取资源胜利,但没有残余可用资源;
- 负数:获取资源胜利,能够唤醒下一个期待线程;
1.1.2 doAcquireShared
进入同步队列,自旋判断是否能获取锁,否则进入阻塞。
/**
* Acquires in shared uninterruptible mode.
* @param arg the acquire argument
*/
private void doAcquireShared(int arg) {final Node node = addWaiter(Node.SHARED); // 在队列中退出共享模式的节点
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {final Node p = node.predecessor();
if (p == head) {int r = tryAcquireShared(arg); // 如果上一个节点是头结点,则尝试获取共享资源,返回残余的资源数量
if (r >= 0) {setHeadAndPropagate(node, r); // 设置以后节点为新的头节点(dummy node),并唤醒后继共享节点
p.next = null; // help GC // 旧的头节点出队
if (interrupted)
selfInterrupt();
failed = false;
return;
}
}
if (shouldParkAfterFailedAcquire(p, node) && // 上一个节点不是头节点,须要判断是否进入阻塞:1. 不能进入阻塞,则重试获取锁。2. 进入阻塞
parkAndCheckInterrupt()) // 阻塞以后线程。当从阻塞中被唤醒时,检测以后线程是否已中断,并革除中断状态。接着持续重试获取锁。interrupted = true; // 标记以后线程已中断
}
} finally {if (failed)
cancelAcquire(node);
}
}
共享模式的 doAcquireShared 办法,与独占模式的 acquireQueued 相似,节点退出同步队列之后进行自旋,执行两个判断:
- 是否获取锁
- 是否进入阻塞
不同的中央是:
- acquireQueued 中应用 setHead 设置头节点;
- doAcquireShared 应用 setHeadAndPropagate 设置头节点之后,须要判断是否唤醒后继节点。
也就是说:
- 共享模式,获取锁胜利,或者开释锁胜利,都须要告诉后继节点。
- 独占模式,开释锁胜利,才须要告诉后继节点。
setHeadAndPropagate
将以后节点设置为新的头节点。
如果共享资源有亏损,唤醒后续期待中的共享节点。
/**
* Sets head of queue, and checks if successor may be waiting
* in shared mode, if so propagating if either propagate > 0 or
* PROPAGATE status was set.
*
* @param node the node
* @param propagate the return value from a tryAcquireShared // 共享资源的残余数量
*/
private void setHeadAndPropagate(Node node, int propagate) {
Node h = head; // Record old head for check below
setHead(node);
// 判断是否唤醒后继节点
if (propagate > 0 || h == null || h.waitStatus < 0 || // 若无残余资源,则校验旧的头节点 h 的状态(PROPAGATE 或 SIGNAL,均 <0)(h = head) == null || h.waitStatus < 0) { // 若其余线程批改了 head,取新 head 作为前继节点来校验
Node s = node.next;
if (s == null || s.isShared()) // node.next != null 时,这里限度了只会唤醒共享节点!doReleaseShared(); // 唤醒后继节点}
}
如果满足下列条件能够尝试唤醒下一个节点:
- 有残余资源(propagate > 0),或者头节点的状态是 PROPAGATE(waitStatus < 0)
- 后继节点是期待中的共享节点,或者后继节点为空
可能会造成不必要的唤醒,然而个别产生在大量地抢夺 acquires/releases 之时,而这种状况下,线程早晚都会被唤醒。
doReleaseShared
唤醒后继节点(共享模式下,以后线程获取锁胜利、开释锁之后,都可能会调用该办法)。
留神:头节点是共享节点,然而这个办法不会辨别后继节点是否是共享节点。
java.util.concurrent.locks.AbstractQueuedSynchronizer#doReleaseShared
/**
* Release action for shared mode -- signals successor and ensures
* propagation. (Note: For exclusive mode, release just amounts
* to calling unparkSuccessor of head if it needs signal.)
*/
// 共享模式下的 release 操作:在足够的资源下,唤醒后继节点,流传信息(资源亏损,可共享)// 互斥模式下的 release 操作:只会唤醒队列头部须要唤醒的一个后继节点(见 AbstractQueuedSynchronizer#unparkSuccessor)private void doReleaseShared() {for (;;) {
Node h = head;
if (h != null && h != tail) { // 头节点不为空,且具备后继节点
int ws = h.waitStatus;
if (ws == Node.SIGNAL) { // 如果头节点状态是 SIGNAL,尝试改为 0
if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
continue; // loop to recheck cases // CAS 失败,从新自旋
unparkSuccessor(h); // 唤醒 head 的后继节点
}
else if (ws == 0 && // 如果头节点状态是 0,尝试改为 PROPAGATE。!compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
continue; // loop on failed CAS // CAS 失败,从新自旋
}
if (h == head) // loop if head changed // 校验头节点是否发生变化,若变动了则从新校验最新头节点的状态
break;
}
}
代码流程:
- 每次自旋都获取最新的头节点 head,如果 head 不为空,且具备后继节点,则进入下一步判断。
- SIGNAL → 0:如果头节点状态是 SIGNAL(阐明后继节点阻塞中,期待唤醒):
① 改为 0 失败,从新自旋;
② 改为 0 胜利,唤醒后继节点,进入第 4 步。 - 0 → PROPAGATE:如果头节点状态是 0(后继节点自旋中未阻塞,或者后继节点已勾销):
① 改为 PROPAGATE 失败,从新自旋;
② 改为 PROPAGATE 胜利,进入第 4 步。 - 批改头节点状态胜利(SIGNAL → 0 或 0 → PROPAGATE),如果该过程中头节点没有发生变化,完结自旋。
对于 PROPAGATE 状态
为什么 以后节点状态由 0 改为 PROPAGATE 失败,须要持续自旋?
- 后继节点在同步队列中自旋时,执行 shouldParkAfterFailedAcquire 看到前继节点状态是 0 或 PROPAGATE,都会改为 SIGNAL。
- 所以这里 CAS 失败,可能是后继节点的批改造成的,须要从新校验以后节点状态。
- 若下一次查看到以后节点状态为 SIGNAL,即可唤醒后继节点。
为什么 以后节点状态由 0 改为 PROPAGATE 胜利,就不再唤醒后继节点了呢?
- 只有 SIGNAL 才须要被动唤醒后继节点。
- 以后节点的状态从 0 设为 PROPAGATE,此时后继节点可能是在同步队列中自旋中,并未阻塞,无需唤醒;也有可能后继节点已勾销,也无需唤醒。
- 以后节点设置状态为 PROPAGATE 之后,若处于自旋之中的后继节点获取锁胜利(见 doAcquireShared)之后,因为头节点状态为 PROPAGATE < 0(见 setHeadAndPropagate),会持续唤醒向下一个节点。
1.2 开释锁 -releaseShared
共享模式下开释锁 / 资源
java.util.concurrent.locks.AbstractQueuedSynchronizer#releaseShared
public final boolean releaseShared(int arg) {if (tryReleaseShared(arg)) { // 开释共享锁 / 资源
doReleaseShared(); // 开释锁 / 资源胜利,唤醒队列中的期待节点
return true;
}
return false;
}
- tryReleaseShared:尝试开释共享锁 / 资源,开释胜利则进入下一步。
- doReleaseShared:开释锁 / 资源胜利,唤醒队列中的期待节点。
java.util.concurrent.locks.AbstractQueuedSynchronizer#tryReleaseShared
protected boolean tryReleaseShared(int arg) {throw new UnsupportedOperationException();
}
相干浏览:
浏览 JDK 8 源码:AQS 中的独占模式
浏览 JDK 8 源码:AQS 中的共享模式
浏览 JDK 8 源码:AQS 对 Condition 的实现
作者:Sumkor