本文首发于微信公众号【WriteOnRead】,欢送关注。
1. 概述
前文「JDK源码剖析-AbstractQueuedSynchronizer(2)」剖析了 AQS 在独占模式下获取资源的流程,本文剖析共享模式下的相干操作。
其实二者的操作大部分是相似的,了解了后面对独占模式的剖析,再剖析共享模式就绝对容易了。
2. 共享模式
2.1 办法概述
与独占模式相似,共享模式下也有与之类似的相应操作,别离如下:
- acquireShared(int arg): 以共享模式获取资源,疏忽中断;
- acquireSharedInterruptibly(int arg): 以共享模式获取资源,响应中断;
- tryAcquireSharedNanos(int arg, long nanosTimeout): 以共享模式获取资源,响应中断,且有超时期待;
- releaseShared(int arg): 开释资源,唤醒后继节点,并确保流传。
它们的操作与独占模式也比拟相似,上面具体分析。
2.2 办法剖析
2.2.1 共享模式获取资源(疏忽中断)
acquireShared:
public final void acquireShared(int arg) { // 返回值小于 0,示意获取失败 if (tryAcquireShared(arg) < 0) doAcquireShared(arg);}// 尝试以共享模式获取资源(返回值为 int 类型)protected int tryAcquireShared(int arg) { throw new UnsupportedOperationException();}
与独占模式的 tryAcquire 办法相似,tryAcquireShared 办法在 AQS 中也抛出异样,由子类实现其逻辑。
不同的中央在于,tryAcquire 办法的返回后果是 boolean 类型,示意获取胜利与否;而 tryAcquireShared 的返回后果是 int 类型,别离为:
- 正数:示意获取失败;
- 零:示意获取胜利,但后续共享模式的获取会失败;
- 负数:示意获取胜利,后续共享模式的获取可能会胜利(须要进行检测)。
若 tryAcquireShared 获取胜利,则间接返回;否则执行 doAcquireShared 办法:
private void doAcquireShared(int arg) { // 把以后线程封装成共享模式的 Node 节点,插入主队列开端 final Node node = addWaiter(Node.SHARED); boolean failed = true; try { // 中断标记位 boolean interrupted = false; for (;;) { final Node p = node.predecessor(); // 若前驱节点为头节点,则尝试获取资源 if (p == head) { int r = tryAcquireShared(arg); // 这里示意以后线程胜利获取到了资源 if (r >= 0) { // 设置头节点,并流传状态(留神这里与独占模式不同) setHeadAndPropagate(node, r); p.next = null; // help GC if (interrupted) selfInterrupt(); failed = false; return; } } // 是否应该休眠(与独占模式雷同,不再赘述) if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) interrupted = true; } } finally { if (failed) // 勾销操作(与独占模式雷同) cancelAcquire(node); }}
doAcquireShared 办法会把以后线程封装成一个共享模式(SHARED)的节点,并插入主队列开端。addWaiter(Node mode) 办法前文曾经剖析过,不再赘述。
该办法与 acquireQueued 办法的区别在于 setHeadAndPropagate 办法,把以后节点设置为头节点之后,还会有流传(propagate)行为:
private void setHeadAndPropagate(Node node, int propagate) { // 记录旧的头节点 Node h = head; // Record old head for check below // 将 node 设置为头节点 setHead(node); /* * Try to signal next queued node if: * Propagation was indicated by caller, * or was recorded (as h.waitStatus either before * or after setHead) by a previous operation * (note: this uses sign-check of waitStatus because * PROPAGATE status may transition to SIGNAL.) * and * The next node is waiting in shared mode, * or we don't know, because it appears null * * The conservatism in both of these checks may cause * unnecessary wake-ups, but only when there are multiple * racing acquires/releases, so most need signals now or soon * anyway. */ if (propagate > 0 || h == null || h.waitStatus < 0 || (h = head) == null || h.waitStatus < 0) { Node s = node.next; // 后继节点为空或共享模式唤醒 if (s == null || s.isShared()) doReleaseShared(); }}
doReleaseShared:
private void doReleaseShared() { /* * Ensure that a release propagates, even if there are other * in-progress acquires/releases. This proceeds in the usual * way of trying to unparkSuccessor of head if it needs * signal. But if it does not, status is set to PROPAGATE to * ensure that upon release, propagation continues. * Additionally, we must loop in case a new node is added * while we are doing this. Also, unlike other uses of * unparkSuccessor, we need to know if CAS to reset status * fails, if so rechecking. */ for (;;) { // 这里的头节点曾经是下面设置后的头节点了 Node h = head; // 因为该办法有两个入口(setHeadAndPropagate 和 releaseShared),需思考并发管制 if (h != null && h != tail) { int ws = h.waitStatus; if (ws == Node.SIGNAL) { if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0)) continue; // loop to recheck cases // 唤醒后继节点 unparkSuccessor(h); } else if (ws == 0 && !compareAndSetWaitStatus(h, 0, Node.PROPAGATE)) continue; // loop on failed CAS } // 若头节点不变,则跳出循环;否则持续循环 if (h == head) // loop if head changed break; }}
该办法与独占模式下的获取办法 acquire 大体类似,不同在于该办法中,节点获取资源后会流传状态,即,有可能会持续唤醒后继节点。值得注意的是:该办法有两个入口 setHeadAndPropagate 和 releaseShared,可能有多个线程操作,需思考并发管制。
此外,自己对于将节点设置为 PROPAGATE 状态的了解还不是很清晰,网上说法也不止一种,待后续钻研明确再补充。
2.2.2 以共享模式获取资源(响应中断)
该办法与 acquireShared 相似:
public final void acquireSharedInterruptibly(int arg) throws InterruptedException { if (Thread.interrupted()) throw new InterruptedException(); if (tryAcquireShared(arg) < 0) doAcquireSharedInterruptibly(arg);}
tryAcquireShared 办法后面已剖析,若获取资源失败,则会执行 doAcquireSharedInterruptly 办法:
private void doAcquireSharedInterruptibly(int arg) throws InterruptedException { // 把以后线程封装成共享模式节点,并插入主队列 final Node node = addWaiter(Node.SHARED); boolean failed = true; try { for (;;) { final Node p = node.predecessor(); if (p == head) { int r = tryAcquireShared(arg); if (r >= 0) { setHeadAndPropagate(node, r); p.next = null; // help GC failed = false; return; } } // 与 doAcquireShared 相比,区别在于这里抛出了异样 if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) throw new InterruptedException(); } } finally { if (failed) cancelAcquire(node); }}
从代码能够看到,acquireSharedInterruptibly 办法与 acquireShared 办法简直齐全一样,不同之处仅在于前者会抛出 InterruptedException 异样响应中断;而后者仅记录标记位,获取完结后才响应。
2.2.3 以共享模式获取资源(响应中断,且有超时)
代码如下(该办法可与前文独占模式下的超时获取办法比拟剖析):
public final boolean tryAcquireSharedNanos(int arg, long nanosTimeout) throws InterruptedException { if (Thread.interrupted()) throw new InterruptedException(); return tryAcquireShared(arg) >= 0 || doAcquireSharedNanos(arg, nanosTimeout);}
doAcquireSharedNanos:
private boolean doAcquireSharedNanos(int arg, long nanosTimeout) throws InterruptedException { if (nanosTimeout <= 0L) return false; final long deadline = System.nanoTime() + nanosTimeout; final Node node = addWaiter(Node.SHARED); boolean failed = true; try { for (;;) { final Node p = node.predecessor(); if (p == head) { int r = tryAcquireShared(arg); if (r >= 0) { setHeadAndPropagate(node, r); p.next = null; // help GC failed = false; return true; } } nanosTimeout = deadline - System.nanoTime(); if (nanosTimeout <= 0L) return false; if (shouldParkAfterFailedAcquire(p, node) && nanosTimeout > spinForTimeoutThreshold) LockSupport.parkNanos(this, nanosTimeout); if (Thread.interrupted()) throw new InterruptedException(); } } finally { if (failed) cancelAcquire(node); }}
该办法可与独占模式下的超时期待办法 tryAcquireNanos(int arg, long nanosTimeout) 进行比照,二者操作基本一致,不再详细分析。
2.2.4 开释资源,唤醒节点,流传状态
如下:
public final boolean releaseShared(int arg) { if (tryReleaseShared(arg)) { doReleaseShared(); return true; } return false;}
tryReleaseShared:
protected boolean tryReleaseShared(int arg) { throw new UnsupportedOperationException();}
doReleaseShared() 办法后面曾经剖析过了。本办法与独占模式的 release 办法相似,不同的中央在于“流传”二字。
3. 场景剖析
为了便于了解独占模式和共享模式下队列和节点的状态,上面简要举例剖析。
场景如下:有 T0~T4 共 5 个线程按先后顺序获取资源,其中 T2 和 T3 为共享模式,其余均为独占模式。
就此场景剖析:T0 先获取到资源(假如占用工夫较长),而后 T1~T4 再获取则失败,会顺次进入主队列。此时主队列中各个节点的状态示意图如下:
之后,T0 操作结束并开释资源,会将 T1 唤醒。T1(独占模式) 会从 acquireQueued(final Node node, int arg) 办法的循环中持续获取资源,这时会获取胜利,并将 T1 设置为头节点(T 被移除)。此时主队列节点示意图如下:
此时,T1 获取到资源并进行相干操作。
而后,T1 操作完开释资源,并唤醒下一个节点 T2,T2(共享模式) 持续从 doAcquireShared(int) 办法的循环中执行。此时 T2 获取资源胜利,将本身设为头节点(T1 被移除),因为后继节点 T3 也是共享模式,因而 T1 会持续唤醒 T3;T3 唤醒后的操作与 T2 雷同,但后继节点 T4 不是共享模式,因而不再持续唤醒。此时队列节点状态示意图如下:
此时,T2 和 T3 同时获取到资源。
之后,当二者都开释资源后会唤醒 T4:
T4 获取资源的与 T1 相似。
PS: 该场景仅供参考,只为便于了解,若有不当之处敬请斧正。
4. 小结
本文剖析了以共享模式获取资源的三种形式,以及开释资源的操作。别离为:
- acquireShared: 共享模式获取资源,疏忽中断;
- acquireSharedInterruptibly: 共享模式获取资源,响应中断;
- tryAcquireSharedNanos: 共享模式获取资源,响应中断,有超时;
- releaseShared: 开释资源,唤醒后继节点,并确保流传。
并简要剖析一个场景下主队列中各个节点的状态。此外,AQS 中还有嵌套类 ConditionObject 及条件队列的相干操作,前面波及到的时候再进行剖析。
独自去剖析 AQS 的源码比拟干燥,后文会联合 ReentrantLock、CountdownLatch 等罕用并发工具类的源码进行剖析。
上述解析是参考其余材料及集体了解,若有不当之处欢送斧正。
相干浏览:
JDK源码剖析-AbstractQueuedSynchronizer(1)
JDK源码剖析-AbstractQueuedSynchronizer(2)