所谓AQS,指的是AbstractQueuedSynchronizer,它提供了一种实现阻塞锁和一系列依赖FIFO期待队列的同步器的框架,ReentrantLock、Semaphore、CountDownLatch、CyclicBarrier等并发类均是基于AQS来实现的,具体用法是通过继承AQS实现其模板办法,而后将子类作为同步组件的外部类。
理解一个框架最好的形式是读源码,说干就干。
AQS是JDK1.5之后才呈现的,由赫赫有名的Doug Lea李大爷来操刀设计并开发实现,全副源代码(加正文)2315行,整体难度中等。
* @since 1.5* @author Doug Lea
根本框架
在浏览源码前,首先论述AQS的根本思维及其相干概念。
AQS根本框架如下图所示:
AQS保护了一个volatile语义(反对多线程下的可见性)的共享资源变量state和一个FIFO线程期待队列(多线程竞争state被阻塞时会进入此队列)。
State
首先说一下共享资源变量state,它是int数据类型的,其拜访形式有3种:
- getState()
- setState(int newState)
- compareAndSetState(int expect, int update)
上述3种形式均是原子操作,其中compareAndSetState()的实现依赖于Unsafe的compareAndSwapInt()办法。
private volatile int state;// 具备内存读可见性语义protected final int getState() { return state;}// 具备内存写可见性语义protected final void setState(int newState) { state = newState;}// 具备内存读/写可见性语义protected final boolean compareAndSetState(int expect, int update) { // See below for intrinsics setup to support this return unsafe.compareAndSwapInt(this, stateOffset, expect, update);}
资源的共享形式分为2种:
- 独占式(Exclusive)
只有单个线程可能胜利获取资源并执行,如ReentrantLock。
- 共享式(Shared)
多个线程可胜利获取资源并执行,如Semaphore/CountDownLatch等。
AQS将大部分的同步逻辑均曾经实现好,继承的自定义同步器只须要实现state的获取(acquire)和开释(release)的逻辑代码就能够,次要包含上面办法:
- tryAcquire(int):独占形式。尝试获取资源,胜利则返回true,失败则返回false。
- tryRelease(int):独占形式。尝试开释资源,胜利则返回true,失败则返回false。
- tryAcquireShared(int):共享形式。尝试获取资源。正数示意失败;0示意胜利,但没有残余可用资源;负数示意胜利,且有残余资源。
- tryReleaseShared(int):共享形式。尝试开释资源,如果开释后容许唤醒后续期待结点返回true,否则返回false。
- isHeldExclusively():该线程是否正在独占资源。只有用到condition才须要去实现它。
AQS须要子类复写的办法均没有申明为abstract,目标是防止子类须要强制性覆写多个办法,因为个别自定义同步器要么是独占办法,要么是共享方法,只需实现tryAcquire-tryRelease、tryAcquireShared-tryReleaseShared中的一种即可。
当然,AQS也反对子类同时实现独占和共享两种模式,如ReentrantReadWriteLock。
CLH队列(FIFO)
AQS是通过外部类Node来实现FIFO队列的,源代码解析如下:
static final class Node { // 表明节点在共享模式下期待的标记 static final Node SHARED = new Node(); // 表明节点在独占模式下期待的标记 static final Node EXCLUSIVE = null; // 表征期待线程已勾销的 static final int CANCELLED = 1; // 表征须要唤醒后续线程 static final int SIGNAL = -1; // 表征线程正在期待触发条件(condition) static final int CONDITION = -2; // 表征下一个acquireShared应无条件流传 static final int PROPAGATE = -3; /** * SIGNAL: 以后节点开释state或者勾销后,将告诉后续节点竞争state。 * CANCELLED: 线程因timeout和interrupt而放弃竞争state,以后节点将与state彻底拜拜 * CONDITION: 表征以后节点处于条件队列中,它将不能用作同步队列节点,直到其waitStatus被重置为0 * PROPAGATE: 表征下一个acquireShared应无条件流传 * 0: None of the above */ volatile int waitStatus; // 前继节点 volatile Node prev; // 后继节点 volatile Node next; // 持有的线程 volatile Thread thread; // 链接下一个期待条件触发的节点 Node nextWaiter; // 返回节点是否处于Shared状态下 final boolean isShared() { return nextWaiter == SHARED; } // 返回前继节点 final Node predecessor() throws NullPointerException { Node p = prev; if (p == null) throw new NullPointerException(); else return p; } // Shared模式下的Node构造函数 Node() { } // 用于addWaiter Node(Thread thread, Node mode) { this.nextWaiter = mode; this.thread = thread; } // 用于Condition Node(Thread thread, int waitStatus) { this.waitStatus = waitStatus; this.thread = thread; }}
能够看到,waitStatus非负的时候,表征不可用,负数代表处于期待状态,所以waitStatus只须要查看其正负符号即可,不必太多关注特定值。
获取资源(独占模式)
acquire(int)
首先解说独占模式(Exclusive)下的获取/开释资源过程,其入口办法为:
public final void acquire(int arg) { if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt();}
tryAcquire(arg)为线程获取资源的办法函数,在AQS中定义如下:
protected boolean tryAcquire(int arg) { throw new UnsupportedOperationException();}
很显著,该办法是空办法,且由protected润饰,阐明该办法须要由子类即自定义同步器来实现。
acquire()办法至多执行一次tryAcquire(arg),若返回true,则acquire间接返回,否则进入acquireQueued(addWaiter(Node.EXCLUSIVE), arg)办法。
acquireQueued办法分为3个步骤:
- addWriter()将以后线程退出到期待队列的尾部,并标记为独占模式;
- acquireQueued()使线程在期待队列中获取资源,直到获取到资源返回,若整个期待过程被中断过,则返回True,否则返回False。
- 如果线程在期待过程中被中断过,则先标记上,待获取到资源后再进行自我中断selfInterrupt(),将中断响应掉。
上面具体看看过程中波及到的各函数:
tryAcquire(int)
tryAcquire尝试以独占的模式获取资源,如果获取胜利则返回True,否则间接返回False,默认实现是抛出UnsupportedOperationException,具体实现由自定义扩大了AQS的同步器来实现。
addWaiter(Node)
addWaiter为以后线程以指定模式创立节点,并将其增加到期待队列的尾部,其源码为:
private Node addWaiter(Node mode) { Node node = new Node(Thread.currentThread(), mode); // 尝试将节点疾速插入期待队列,若失败则执行惯例插入(enq办法) Node pred = tail; if (pred != null) { node.prev = pred; if (compareAndSetTail(pred, node)) { pred.next = node; return node; } } // 惯例插入 enq(node); return node;}
再看enq(node)办法:
private Node enq(final Node node) { for (;;) { Node t = tail; if (t == null) { // Must initialize if (compareAndSetHead(new Node())) tail = head; } else { node.prev = t; if (compareAndSetTail(t, node)) { t.next = node; return t; } } }}
能够看到,惯例插入与疾速插入相比,有2点不同:
- 惯例插入是自旋过程(for(;;)),可能保障节点插入胜利;
- 比疾速插入多蕴含了1种状况,即以后期待队列为空时,须要初始化队列,行将待插入节点设置为头结点,同时为尾节点(因为只有一个嘛)。
惯例插入与疾速插入均依赖于CAS,其实现依赖于unsafe类,具体代码如下:
private final boolean compareAndSetHead(Node update) { return unsafe.compareAndSwapObject(this, headOffset, null, update);}private final boolean compareAndSetTail(Node expect, Node update) { return unsafe.compareAndSwapObject(this, tailOffset, expect, update);}
unsafe中的cas操作均是native办法,由计算机CPU的cmpxchg指令来保障其原子性。
接着看acquireQueued()办法:
acquireQueued(Node, int)
相干阐明已在代码中正文:
final boolean acquireQueued(final Node node, int arg) { // 标识是否获取资源失败 boolean failed = true; try { // 标识以后线程是否被中断过 boolean interrupted = false; // 自旋操作 for (;;) { // 获取以后节点的前继节点 final Node p = node.predecessor(); // 如果前继节点为头结点,阐明排队马上排到本人了,能够尝试获取资源,若获取资源胜利,则执行下述操作 if (p == head && tryAcquire(arg)) { // 将以后节点设置为头结点 setHead(node); // 阐明前继节点曾经开释掉资源了,将其next置空,以不便虚拟机回收掉该前继节点 p.next = null; // help GC // 标识获取资源胜利 failed = false; // 返回中断标记 return interrupted; } // 若前继节点不是头结点,或者获取资源失败, // 则须要通过shouldParkAfterFailedAcquire函数 // 判断是否须要阻塞该节点持有的线程 // 若shouldParkAfterFailedAcquire函数返回true, // 则继续执行parkAndCheckInterrupt()函数, // 将该线程阻塞并查看是否能够被中断,若返回true,则将interrupted标记置于true if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) interrupted = true; } } finally { // 最终获取资源失败,则以后节点放弃获取资源 if (failed) cancelAcquire(node); }}
具体看一下shouldParkAfterFailedAcquire函数:
// shouldParkAfterFailedAcquire是通过前继节点的waitStatus值来判断是否阻塞以后节点的线程的private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) { // 获取前继节点的waitStatus值ws int ws = pred.waitStatus; // 如果ws的值为Node.SIGNAL(-1),则间接返回true // 阐明前继节点实现资源的开释或者中断后,会告诉以后节点的,回家等告诉就好了,不必自旋频繁地来打听音讯 if (ws == Node.SIGNAL) return true; // 如果前继节点的ws值大于0,即为1,阐明前继节点处于放弃状态(Cancelled) // 那就持续往前遍历,直到以后节点的前继节点的ws值为0或正数 // 此处代码很要害,节点往前挪动就是通过这里来实现的,直到节点的前继节点满足 // if (p == head && tryAcquire(arg))条件,acquireQueued办法才可能跳出自旋过程 if (ws > 0) { do { node.prev = pred = pred.prev; } while (pred.waitStatus > 0); pred.next = node; } else { // 将前继节点的ws值设置为Node.SIGNAL,以保障下次自旋时,shouldParkAfterFailedAcquire间接返回true compareAndSetWaitStatus(pred, ws, Node.SIGNAL); } return false;}
parkAndCheckInterrupt()函数则简略很多,次要调用LockSupport类的park()办法阻塞以后线程,并返回线程是否被中断过。
private final boolean parkAndCheckInterrupt() { LockSupport.park(this); return Thread.interrupted();}
至此,独占模式下,线程获取资源acquire的代码就跟完了,总结一下过程:
- 首先线程通过tryAcquire(arg)尝试获取共享资源,若获取胜利则间接返回,若不胜利,则将该线程以独占模式增加到期待队列尾部,tryAcquire(arg)由继承AQS的自定义同步器来具体实现;
- 以后线程退出期待队列后,会通过acquireQueued办法基于CAS自旋一直尝试获取资源,直至获取到资源;
- 若在自旋过程中,线程被中断过,acquireQueued办法会标记此次中断,并返回true。
- 若acquireQueued办法获取到资源后,返回true,则执行线程自我中断操作selfInterrupt()。
static void selfInterrupt() { Thread.currentThread().interrupt();}
开释资源(独占模式)
讲完获取资源,对应的讲一下AQS的开释资源过程,其入口函数为:
public final boolean release(int arg) { if (tryRelease(arg)) { // 获取到期待队列的头结点h Node h = head; // 若头结点不为空且其ws值非0,则唤醒h的后继节点 if (h != null && h.waitStatus != 0) unparkSuccessor(h); return true; } return false;}
逻辑并不简单,通过tryRelease(arg)来开释资源,和tryAcquire相似,tryRelease也是有继承AQS的自定义同步器来具体实现。
tryRelease(int)
该办法尝试开释指定量的资源。
protected boolean tryRelease(int arg) { throw new UnsupportedOperationException();}
unparkSuccessor(Node)
该办法次要用于唤醒期待队列中的下一个阻塞线程。
private void unparkSuccessor(Node node) { // 获取以后节点的ws值 int ws = node.waitStatus; // 将以后节点的ws值置0 if (ws < 0) compareAndSetWaitStatus(node, ws, 0); Node s = node.next; // 若后继节点为null或者其ws值大于0(放弃状态),则从期待队列的尾节点从后往前搜寻, // 搜寻到期待队列中最靠前的ws值非正且非null的节点 if (s == null || s.waitStatus > 0) { s = null; for (Node t = tail; t != null && t != node; t = t.prev) if (t.waitStatus <= 0) s = t; } // 如果后继节点非null,则唤醒该后继节点持有的线程 if (s != null) LockSupport.unpark(s.thread);}
后继节点的阻塞线程被唤醒后,就进入到acquireQueued()的if (p == head && tryAcquire(arg))的判断中,此时被唤醒的线程将尝试获取资源。
当然,如果被唤醒的线程所在节点的前继节点不是头结点,通过shouldParkAfterFailedAcquire的调整,也会挪动到期待队列的后面,直到其前继节点为头结点。
解说完独占模式下资源的acquire/release过程,上面开始解说共享模式下,线程如何实现资源的获取和共享。
获取资源(共享模式)
了解了独占模式下,资源的获取和开释过程,则共享模式下也就so easy了,首先看一下办法入口:
public final void acquireShared(int arg) { if (tryAcquireShared(arg) < 0) doAcquireShared(arg);}
执行tryAcquireShared办法获取资源,若获取胜利则间接返回,若失败,则进入期待队列,执行自旋获取资源,具体由doAcquireShared办法来实现。
tryAcquireShared(int)
同样的,tryAcquireShared(int)由继承AQS的自定义同步器来具体实现。
protected int tryAcquireShared(int arg) { throw new UnsupportedOperationException();}
其返回值为负值代表失败;0代表获取胜利,但无残余资源;正值代表获取胜利且有残余资源,其余线程可去获取。
doAcquireShared(int)
private void doAcquireShared(int arg) { // 将线程以共享模式增加到期待队列的尾部 final Node node = addWaiter(Node.SHARED); // 初始化失败标记 boolean failed = true; try { // 初始化线程中断标记 boolean interrupted = false; for (;;) { // 获取以后节点的前继节点 final Node p = node.predecessor(); // 若前继节点为头结点,则执行tryAcquireShared获取资源 if (p == head) { int r = tryAcquireShared(arg); // 若获取资源胜利,且有残余资源,将本人设为头结点并唤醒后续的阻塞线程 if (r >= 0) { setHeadAndPropagate(node, r); p.next = null; // help GC // 如果中断标记位为真,则线程执行自我了断 if (interrupted) selfInterrupt(); // 表征获取资源胜利 failed = false; return; } } // houldParkAfterFailedAcquire(p, node)依据前继节点判断是否阻塞以后节点的线程 // parkAndCheckInterrupt()阻塞以后线程并查看线程是否被中断过,若被中断过,将interrupted置为true if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) interrupted = true; } } finally { if (failed) // 放弃获取资源 cancelAcquire(node); }}
能够发现,doAcquireShared与独占模式下的acquireQueued大同小异,次要有2点不同:
- doAcquireShared将线程的自我中断操作放在了办法体外部;
- 当线程获取到资源后,doAcquireShared会将以后线程所在的节点设为头结点,若资源有残余则唤醒后续节点,比acquireQueued多了个唤醒后续节点的操作。
上述办法体现了共享的实质,即以后线程吃饱了后,若资源有残余,会招呼前面排队的来一起吃,好货色要大家一起分享嘛,哈哈。
上面具体看一下setHeadAndPropagate(Node, int)函数:
private void setHeadAndPropagate(Node node, int propagate) { // 记录原来的头结点,上面过程会用到 Node h = head; // 设置新的头结点 setHead(node); // 如果资源还有残余,则唤醒后继节点 if (propagate > 0 || h == null || h.waitStatus < 0 || (h = head) == null || h.waitStatus < 0) { Node s = node.next; if (s == null || s.isShared()) doReleaseShared(); }}
能够看到,理论执行唤醒后继节点的办法是doReleaseShared(),持续追踪:
private void doReleaseShared() { // 自旋操作 for (;;) { // 获取期待队列的头结点 Node h = head; if (h != null && h != tail) { int ws = h.waitStatus; if (ws == Node.SIGNAL) { if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0)) continue; // 唤醒后继节点的线程 unparkSuccessor(h); } else if (ws == 0 && !compareAndSetWaitStatus(h, 0, Node.PROPAGATE)) continue; // loop on failed CAS } if (h == head) // loop if head changed break; }}
开释资源(共享模式)
首先进入到办法入口:
public final boolean releaseShared(int arg) { // 尝试开释资源 if (tryReleaseShared(arg)) { // 唤醒后继节点的线程 doReleaseShared(); return true; } return false;}
同样的,tryReleaseShared(int)由继承AQS的自定义同步器来具体实现。
doReleaseShared()上节解说setHeadAndPropagate已阐明过,不再赘述。
至此,共享模式下的资源获取/开释就解说完了,上面以一个具体场景来概括一下:
整个获取/开释资源的过程是通过流传实现的,如最开始有10个资源,线程A、B、C别离须要5、4、3个资源。
- A线程获取到5个资源,其发现资源还残余5个,则唤醒B线程;
- B线程获取到4个资源,其发现资源还残余1个,唤醒C线程;
- C线程尝试取3个资源,但发现只有1个资源,持续阻塞;
- A线程开释1个资源,其发现资源还残余2个,故唤醒C线程;
- C线程尝试取3个资源,但发现只有2个资源,持续阻塞;
- B线程开释2个资源,其发现资源还残余4个,唤醒C线程;
- C线程获取3个资源,其发现资源还剩1个,持续唤醒后续期待的D线程;
- ......
总结
本文次要介绍了AQS在独占和共享两种模式下,如何进行资源的获取和开释(tryAcquire-tryRelease、tryAcquireShared-tryReleaseShared),须要留神的是,在acquire()和acquireShared()办法中,线程在阻塞过程中均是疏忽中断的。
AQS也能够通过acquireInterruptibly()/acquireSharedInterruptibly()来反对线程在期待过程中响应中断。
篇幅无限,本文就解说到这里。对于AQS其余高级个性,感兴趣的读者可跟一下源码。
参考
https://www.cnblogs.com/iou123lg/p/9464385.html
https://www.cnblogs.com/waterystone/p/4920797.html
https://www.jianshu.com/p/0da2939391cf
作者:安中古天乐
链接:https://www.jianshu.com/p/0f8...
起源:简书
著作权归作者所有。商业转载请分割作者取得受权,非商业转载请注明出处。