1. AQS是什么?
AQS定义了一套多线程访问共享资源的同步器框架,许多同步类实现都依赖于它,如罕用的ReentrantLock。
简略来说,AQS定义了一套框架,来实现同步类。
2. AQS核心思想
2.1 根本框架
AQS的核心思想是对于共享资源,保护一个双端队列来治理线程,队列中的线程顺次获取资源,获取不到的线程进入队列期待,直到资源开释,队列中的线程顺次获取资源。
AQS的根本框架如图所示:
2.1.1 资源state
state变量示意共享资源,通常是int
类型。
- 拜访办法
state类型用户无奈间接进行批改,而须要借助于AQS提供的办法进行批改,即getState()
、setState()
、compareAndSetState()
等。 拜访类型
AQS定义了两种资源拜访类型:- 独占(Exclusive):一个工夫点资源只能由一个线程占用;
- 共享(Share):一个工夫点资源能够被多个线程共用。
2.1.2 CLH双向队列
CLH队列是一种基于逻辑队列非线程饥饿的自旋偏心锁,具体介绍可参考此篇博客。CLH中每个节点都示意一个线程,处于头部的节点获取资源,而其余资源则期待。
节点构造
Node
类源码如下所示:static final class Node { // 模式,分为共享与独占 // 共享模式 static final Node SHARED = new Node(); // 独占模式 static final Node EXCLUSIVE = null; // 结点状态 // CANCELLED,值为1,示意以后的线程被勾销 // SIGNAL,值为-1,示意以后节点的后继节点蕴含的线程须要运行,也就是unpark // CONDITION,值为-2,示意以后节点在期待condition,也就是在condition队列中 // PROPAGATE,值为-3,示意以后场景下后续的acquireShared可能得以执行 // 值为0,示意以后节点在sync队列中,期待着获取锁 static final int CANCELLED = 1; static final int SIGNAL = -1; static final int CONDITION = -2; static final int PROPAGATE = -3; // 结点状态 volatile int waitStatus; // 前驱结点 volatile Node prev; // 后继结点 volatile Node next; // 结点所对应的线程 volatile Thread thread; // 下一个期待者 Node nextWaiter; // 结点是否在共享模式下期待 final boolean isShared() { return nextWaiter == SHARED; } // 获取前驱结点,若前驱结点为空,抛出异样 final Node predecessor() throws NullPointerException { // 保留前驱结点 Node p = prev; if (p == null) // 前驱结点为空,抛出异样 throw new NullPointerException(); else // 前驱结点不为空,返回 return p; } // 无参构造方法 Node() { // Used to establish initial head or SHARED marker } // 构造方法 Node(Thread thread, Node mode) { // Used by addWaiter this.nextWaiter = mode; this.thread = thread; } // 构造方法 Node(Thread thread, int waitStatus) { // Used by Condition this.waitStatus = waitStatus; this.thread = thread; }}
Node
的办法和属性值如图所示:
其中,
waitStatus
示意以后节点在队列中的状态;thread
示意以后节点示意的线程;prev
和next
别离示意以后节点的前驱节点和后继节点;nextWaiter
d当存在CONDTION队列时,示意一个condition状态的后继节点。- waitStatus
结点的期待状态是一个整数值,具体的参数值和含意如下所示: 1
-CANCELLED
,示意节点获取锁的申请被勾销,此时节点不再申请资源;0
,是节点初始化的默认值;-1
-SIGNAL
,示意线程做好筹备,期待资源开释;-2
-CONDITION
,示意节点在condition期待队列中,期待被唤醒而进入同步队列;-3
-PROPAGATE
,以后线程处于共享模式下的时候会应用该字段。
2.2 AQS模板
AQS提供一系列构造,作为一个残缺的模板,自定义的同步器只须要实现资源的获取和开释就能够,而不须要思考底层的队列批改、状态扭转等逻辑。
应用AQS实现一个自定义同步器,须要实现的办法:
isHeldExclusively()
:该线程是否独占资源,在应用到condition的时候会实现这一办法;tryAcquire(int)
:独占模式获取资源的形式,胜利获取返回true
,否则返回false
;tryRelease(int)
:独占模式开释资源的形式,胜利获取返回true
,否则返回false
;tryAcquireShared(int)
:共享模式获取资源的形式,胜利获取返回true
,否则返回false
;tryReleaseShared(int)
:共享模式开释资源的形式,胜利获取返回true
,否则返回false
;
一般来说,一个同步器是资源独占模式或者资源共享模式的其中之一,因而tryAcquire(int)
和tryAcquireShared(int)
只须要实现一个即可,tryRelease(int)
和tryReleaseShared(int)
同理。
然而同步器也能够实现两种模式的资源获取和开释,从而实现独占和共享两种模式。
3. 源码剖析
3.1 acquire(int)
acquire(int)
是获取源码局部的顶层入口,源码如下所示:
public final void acquire(int arg) { if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt();}
这段代码展示的资源获取流程如下:
tryAcquire()
尝试间接去获取资源;获取胜利则间接返回- 如果获取失败,则
addWaiter()
将该线程退出期待队列的尾部,并标记为独占模式; acquireQueued()
使线程阻塞在期待队列中获取资源,始终获取到资源后才返回。
简略总结就是:
- 获取资源;
- 失败就排队;
- 排队要期待。
从上文的形容可见重要的办法有三个:tryAquire()
、addWaiter()
、acquireQueued()
。上面将一一剖析其源码:
3.1.1 tryAcquire(int)
tryAcquire(int)
是获取资源的办法,源码如下所示:
protected boolean tryAcquire(int arg) { throw new UnsupportedOperationException();}
该办法是一个空办法,须要自定义同步器实现,因而在应用AQS实现同步器时,须要重写该办法。这也是“自定义的同步器只须要实现资源的获取和开释就能够”的体现。
3.1.2 addWaiter(Node.EXCLUSIVE)
addWaiter(Node.EXCLUSIVE)
是将线程退出期待队列的尾部,源码如下所示:
private Node addWaiter(Node mode) { //以给定模式结构结点。mode有两种:EXCLUSIVE(独占)和SHARED(共享) //aquire()办法是独占模式,因而间接应用Exclusive参数。 Node node = new Node(Thread.currentThread(), mode); //尝试疾速形式间接放到队尾。 Node pred = tail; if (pred != null) { node.prev = pred; if (compareAndSetTail(pred, node)) { pred.next = node; return node; } } //上一步失败则通过enq入队。 enq(node); return node;}
首先,应用模式将以后线程结构为一个节点,而后尝试将该节点放入队尾,如果胜利则返回,否则调用enq(node)
将节点放入队尾,最终返回以后节点的地位指针。
其中,enq(node)
办法是将节点退出队列的办法,源码如下所示:
private Node enq(final Node node) { for (;;) { // 有限循环,确保结点可能胜利入队列 // 保留尾结点 Node t = tail; if (t == null) { // 尾结点为空,即还没被初始化 if (compareAndSetHead(new Node())) // 头节点为空,并设置头节点为新生成的结点 tail = head; // 头节点与尾结点都指向同一个新生结点 } else { // 尾结点不为空,即曾经被初始化过 // 将node结点的prev域连贯到尾结点 node.prev = t; if (compareAndSetTail(t, node)) { // 比拟结点t是否为尾结点,若是则将尾结点设置为node // 设置尾结点的next域为node t.next = node; return t; // 返回尾结点 } } }}
3.1.3 acquireQueued(Node node, int arg)
这部分源码是将线程阻塞在期待队列中,线程处于期待状态,直到获取到资源后才返回,源码如下所示:
// sync队列中的结点在独占且疏忽中断的模式下获取(资源)final boolean acquireQueued(final Node node, int arg) { // 标记 boolean failed = true; try { // 中断标记 boolean interrupted = false; for (;;) { // 有限循环 // 获取node节点的前驱结点 final Node p = node.predecessor(); if (p == head && tryAcquire(arg)) { // 前驱为头节点并且胜利取得锁 setHead(node); // 设置头节点 p.next = null; // help GC failed = false; // 设置标记 return interrupted; } if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt())// //shouldParkAfterFailedAcquire只有当该节点的前驱结点的状态为SIGNAL时,才能够对该结点所封装的线程进行park操作。否则,将不能进行park操作。 //parkAndCheckInterrupt首先执行park操作,即禁用以后线程,而后返回该线程是否曾经被中断 interrupted = true; } } finally { if (failed) cancelAcquire(node); }}
acquireQueued(Node node, int arg)
办法的次要逻辑如下:
- 获取
node
节点的前驱结点,判断前驱节点是不是头部节点head,有没有胜利获取资源。 - 如果前驱结点是头部节点head并且获取了资源,阐明本人应该被唤醒,设置该节点为head节点期待下一个取得资源;
如果前驱节点不是头部节点或者没有获取资源,则判断是否须要park以后线程,
- 判断前驱节点状态是不是
SIGNAL
,是的话则park以后节点,否则不执行park操作;
- 判断前驱节点状态是不是
park
以后节点之后,以后节点进入期待状态,期待被其余节点unpark
操作唤醒。而后反复此逻辑步骤。
3.2 release(int)
release(int)
是开释资源的顶层入口办法,源码如下所示:
public final boolean release(int arg) { if (tryRelease(arg)) { // 开释胜利 // 保留头节点 Node h = head; if (h != null && h.waitStatus != 0) // 头节点不为空并且头节点状态不为0 unparkSuccessor(h); //开释头节点的后继结点 return true; } return false;}
release(int)
办法的次要逻辑如下:
- 尝试开释资源,如果开释胜利则返回
true
,否则返回false
; - 开释胜利之后,须要调用
unparkSuccessor(h)
唤醒后继节点。
上面介绍两个重要的源码函数:tryRelease(int)
和unparkSuccessor(h)
。
3.2.1 tryRelease(int)
tryRelease(int)
是开释资源的办法,源码如下所示:
protected boolean tryRelease(int arg) { throw new UnsupportedOperationException();}
这部分是须要自定义同步器本人实现的,要留神的是返回值须要为boolean
类型,示意开释资源是否胜利。
3.2.2 unparkSuccessor(h)
unparkSuccessor(h)
是唤醒后继节点的办法,源码如下所示:
private void unparkSuccessor(Node node) { //这里,node个别为以后线程所在的结点。 int ws = node.waitStatus; if (ws < 0)//置零以后线程所在的结点状态,容许失败。 compareAndSetWaitStatus(node, ws, 0); Node s = node.next;//找到下一个须要唤醒的结点s if (s == null || s.waitStatus > 0) {//如果为空或已勾销 s = null; for (Node t = tail; t != null && t != node; t = t.prev) // 从后向前找。 if (t.waitStatus <= 0)//从这里能够看出,<=0的结点,都是还无效的结点。 s = t; } if (s != null) LockSupport.unpark(s.thread);//唤醒}
这部分次要是查找第一个还处于期待状态的节点,将其唤醒;
查找程序是从后往前找,这是因为CLH队列中的prev
链是强统一的,从后往前找更加平安,而next
链因为addWaiter()
办法和cancelAcquire()
办法的存在,不是强统一的,因而从前往后找可能会呈现问题。这部分的具体解释能够参考参考文献-1
3.3 acquireShared(int)和releaseShared(int)
3.3.1 acquireShared(int)
是应用共享模式获取共享资源的顶层入口办法,源码如下所示:
public final void acquireShared(int arg) { if (tryAcquireShared(arg) < 0) doAcquireShared(arg);}
流程如下:
- 通过
tryAcquireShared(arg)
尝试获取资源,如果获取胜利则间接返回; - 如果获取资源失败,则调用
doAcquireShared(arg)
将线程阻塞在期待队列中,直到被unpark()
/interrupt()
并胜利获取到资源才返回。
其中,tryAcquireShared(arg)
是获取共享资源的办法,也是须要用户本人实现。
而doAcquireShared(arg)
是将线程阻塞在期待队列中,直到获取到资源后才返回,具体流程和acquireQueued()
办法相似,
源码如下所示:
private void doAcquireShared(int arg) { final Node node = addWaiter(Node.SHARED);//退出队列尾部 boolean failed = true;//是否胜利标记 try { boolean interrupted = false;//期待过程中是否被中断过的标记 for (;;) { final Node p = node.predecessor();//前驱 if (p == head) {//如果到head的下一个,因为head是拿到资源的线程,此时node被唤醒,很可能是head用完资源来唤醒本人的 int r = tryAcquireShared(arg);//尝试获取资源 if (r >= 0) {//胜利 setHeadAndPropagate(node, r);//将head指向本人,还有残余资源能够再唤醒之后的线程 p.next = null; // help GC if (interrupted)//如果期待过程中被打断过,此时将中断补上。 selfInterrupt(); failed = false; return; } } //判断状态,寻找平安点,进入waiting状态,等着被unpark()或interrupt() if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) interrupted = true; } } finally { if (failed) cancelAcquire(node); }}
3.3.2 releaseShared(int)
releaseShared(int)
是开释共享资源的顶层入口办法,源码如下所示:
public final boolean releaseShared(int arg) { if (tryReleaseShared(arg)) {//尝试开释资源 doReleaseShared();//唤醒后继结点 return true; } return false;}
流程如下:
- 应用
tryReleaseShared(arg)
尝试开释资源,如果开释胜利则返回true,否则返回false; - 如果开释胜利,则调用
doReleaseShared()
唤醒后继节点。
上面介绍一下doReleaseShared()
办法,源码如下所示:
private void doReleaseShared() { for (;;) { Node h = head; if (h != null && h != tail) { int ws = h.waitStatus; if (ws == Node.SIGNAL) { if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0)) continue; unparkSuccessor(h);//唤醒后继 } else if (ws == 0 && !compareAndSetWaitStatus(h, 0, Node.PROPAGATE)) continue; } if (h == head)// head发生变化 break; }}
4. 面试问题模仿
Q:AQS是接口吗?有哪些没有实现的办法?看过相干源码吗?
AQS定义了一个实现同步类的框架,实现办法次要有tryAquire
和tryRelease
,示意独占模式的资源获取和开释,tryAquireShared
和tryReleaseShared
示意共享模式的资源获取和开释。
源码剖析如上文所述。
参考资料
- Java并发之AQS详解
- JUC锁: 锁外围类AQS详解
- 从ReentrantLock的实现看AQS的原理及利用