上一篇文章中笔者曾经介绍过 AQS 的基础知识,然而更加具体的源码实现还未波及,本篇文章笔者就联合具体的 AQS 实现类来聊一聊 AQS 的 独占锁 实现过程
前言
JDK 版本号:1.8.0_171
之前的文章中曾经介绍过,AQS 是一套多线程访问共享资源的同步器框架实现
,开发者只须要依照需要在实现类中实现对应的办法即可实现锁或者同步器的构建,在 AQS 的源码正文局部,如果你有认真看过的话应该会留神到,作者举了 2 个简略的例子,一个是独占锁实现 Mutex,另一个是共享锁实现 BooleanLatch
作为示例笔者就以这两个简略的锁实现联合 AQS 的具体方法来进行解说阐明,这里揭示下请先看下笔者的上篇文章 JDK 源码那些事儿之传说中的 AQS- 概览
,理解 AQS 的基本知识,同时浏览下之前对Object 和LockSupport的解说,笔者默认读者应该曾经理解了这些相干知识点
其次,集体认为初学者学习 AQS 的源码须要多思考多 debug,多看几遍本篇文章能力了解。因为篇幅过长,笔者这篇文章只解说简略的独占锁实现流程
示例
先来看下源码作者实现的独占锁 Mutex,能够看到封装了 外部类 Sync 继承 AbstractQueuedSynchronizer实现了独占锁须要的几个办法,这里次要是tryAcquire,tryRelease,其余办法留给读者自行深刻,这里再温习下 AQS 定义这两个办法的含意:
- tryAcquire:独占模式尝试获取资源,胜利则返回 true,失败则返回 false
- tryRelease:独占模式尝试开释资源,胜利则返回 true,失败则返回 false
这里先理解下即可,可略过持续看上面的测试代码实现
class Mutex implements Lock, Serializable {
// Our internal helper class
private static class Sync extends AbstractQueuedSynchronizer {
// Reports whether in locked state
@Override
protected boolean isHeldExclusively() {return getState() == 1;
}
// Acquires the lock if state is zero
@Override
public boolean tryAcquire(int acquires) {
assert acquires == 1; // Otherwise unused
if (compareAndSetState(0, 1)) {setExclusiveOwnerThread(Thread.currentThread());
return true;
}
return false;
}
// Releases the lock by setting state to zero
@Override
protected boolean tryRelease(int releases) {
assert releases == 1; // Otherwise unused
if (getState() == 0)
throw new IllegalMonitorStateException();
setExclusiveOwnerThread(null);
setState(0);
return true;
}
// Provides a Condition
Condition newCondition() {return new ConditionObject();
}
// Deserializes properly
private void readObject(ObjectInputStream s) throws IOException, ClassNotFoundException {s.defaultReadObject();
setState(0); // reset to unlocked state
}
}
// The sync object does all the hard work. We just forward to it.
private final Sync sync = new Sync();
@Override
public void lock() {sync.acquire(1);
}
@Override
public boolean tryLock() {return sync.tryAcquire(1);
}
@Override
public void unlock() {sync.release(1);
}
@Override
public Condition newCondition() {return sync.newCondition();
}
public boolean isLocked() {return sync.isHeldExclusively();
}
public boolean hasQueuedThreads() {return sync.hasQueuedThreads();
}
@Override
public void lockInterruptibly() throws InterruptedException {sync.acquireInterruptibly(1);
}
@Override
public boolean tryLock(long timeout, TimeUnit unit) throws InterruptedException {return sync.tryAcquireNanos(1, unit.toNanos(timeout));
}
}
笔者这里写了一个简略的应用示例供大家参考,其中应用了CountDownLatch,仅仅是保障线程池中的所有线程同一时刻争抢 Mutex 独占锁,读者能够不必过多关注,把这个去掉也能够,不是重点,咱们重点关注下 Mutex 的应用即可,最终的后果想必读者也能猜到,其中的线程一个一个争抢到锁能力执行业务逻辑操作
int threadNum = 10;
CountDownLatch countDownLatch = new CountDownLatch(1);
Mutex mutex = new Mutex();
ExecutorService threadPool = Executors.newFixedThreadPool(10);
Runnable task = () -> {
try {
// 保障同时开始争抢锁
countDownLatch.await();} catch (InterruptedException e) {e.printStackTrace();
}
// 获取锁,未获取则阻塞期待
mutex.lock();
try {System.out.println(Thread.currentThread().getName() + "获取锁");
try {
// 模仿业务解决
Thread.sleep(2000);
} catch (InterruptedException e) {e.printStackTrace();
}
} finally {System.out.println(Thread.currentThread().getName() + "开释锁");
mutex.unlock();}
};
// 开启 10 个线程争抢互斥锁
for (int i = 0; i < threadNum; i++) {threadPool.submit(task);
}
// 同时开始执行
countDownLatch.countDown();
threadPool.shutdown();
外围代码
在下面的应用示例里最重要的局部在于上面这段:
// 获取锁,未获取则阻塞期待
mutex.lock();
try { } finally {
// 开释锁
mutex.unlock();}
笔者就联合这个示例来剖析 AQS 独占锁底层源码实现的流程,为了不便解说,先规定下场景:
线程池中蕴含 3 个线程,最终线程获取到独占锁执行业务逻辑的程序为:线程 A -> 线程 B -> 线程 C(留神,这里程序是随机的,笔者只是为了不便解说这样设置)
那么 AQS 底层到底是如何流转的呢? 如何在 Java 语言层面上保障独占呢?开释锁时如何唤醒期待的线程呢?能够先思考几分钟,想想如果让你来设计,你会怎么做?
lock
看下调用过程
mutex.lock() -> sync.acquire(1) -> !tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)
mutex.lock()在内部类 Sync 的 lock 办法上,而这个办法最终是在调用 AbstractQueuedSynchronizer 的acquire办法,这个办法含意如下:
以独占模式获取共享资源,疏忽中断,即线程在 aquire 过程中,中断此线程是有效的,同时至多调用一次 tryAcquire 办法,胜利获取到共享资源则返回,否则,将以后想要独占资源的线程封装成 Node 节点增加到同步队列(sync queue)中,阻塞期待获取共享资源
初学者可能会感到困惑,这里 tryAcquire 就是在尝试获取共享资源,胜利则不须要进行第二个条件了,否则就须要将以后线程封装成节点增加到队列中阻塞期待,联合下面示例,你也应该明确,为什么线程 B 和线程 C 在线程 A 未执行结束时始终阻塞期待了吧,那么这里具体是怎么实现的呢?笔者率领大家一步一步往下看
public final void acquire(int arg) {if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();}
查看源码 tryAcquire 的实现,这个办法就是子类须要实现的了,可能 Java 老手会感觉有点绕,最好先去理解下 模板办法设计模式 的应用,会绝对轻松些,在下面的示例中外部类实现如下:
// Acquires the lock if state is zero
public boolean tryAcquire(int acquires) {
assert acquires == 1; // Otherwise unused
if (compareAndSetState(0, 1)) {setExclusiveOwnerThread(Thread.currentThread());
return true;
}
return false;
}
十分简单明了,就是通过 CAS 尝试将 state 从 0 设置为 1,你也能看进去,这里入参是为了通用性而传入的值(对于这个示例来说无意义),这里重点了解的局部在于如果 CAS 胜利了,行将 state 从 0 设置成了 1,表明以后线程获取到了共享资源,对于独占锁来说,就是以后线程独占,通过 setExclusiveOwnerThread 记录独占线程,返回 true 表明胜利,最终在示例代码中也就是 mutex.lock 处不会进行阻塞,会持续向下执行程序,此时线程 A 获取到锁时 AQS 的状况如下:
继续执行,在线程 A 还未通过 mutex.unlock()开释锁时,线程 B 执行 mutex.lock()获取锁,线程 B 在这里会阻塞住期待锁的开释,获取到锁才会继续执行,那么底层到底是如何实现的呢?
再回过头看下tryAcquire,这里返回 false,再看acquire,执行第二个条件acquireQueued(addWaiter(Node.EXCLUSIVE), arg),这里波及到了两个办法,一个一个来看底层是如何解决的
首先是 addWaiter(Node.EXCLUSIVE)
办法,去 AQS 看下实现:
private Node addWaiter(Node mode) {
// 以给定的模式创立以后线程的节点
Node node = new Node(Thread.currentThread(), mode);
// 增加到同步队列尾部
Node pred = tail;
// 尾节点非空,即曾经被初始化了
if (pred != null) {
// node 前驱指向 pred
node.prev = pred;
// CAS 更新尾节点为新增节点 node
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
// 尾节点为空或 CAS 更新尾节点失败则调用 enq 来操作直到胜利
enq(node);
return node;
}
这里只针对示例中的独占锁实现剖析,你能够看到源码中Node.EXCLUSIVE = null,AQS 的 head,tail,state 在 Sync 外部类中曾经初始化过了,初始值为 null,null 和 0,持续看源码做了什么,首先创立了一个 Node 节点,调用如下,这里能够看到设置了节点的this.nextWaiter = mode = Node.EXCLUSIVE,thread 保留以后线程
Node(Thread thread, Node mode) { // Used by addWaiter
this.nextWaiter = mode;
this.thread = thread;
}
线程 B 执行时,pred = null故这里间接进入 enq(node)办法,看下源码实现:
private Node enq(final Node node) {
// 有限循环保障插入节点中
for (;;) {
// 获取尾节点
Node t = tail;
// 尾节点为空,先初始化,CAS 设置头节点
// 胜利则设置尾节点
if (t == null) { // Must initialize
// 头节点为空,设置头节点为新创建的节点
if (compareAndSetHead(new Node()))
// 头尾节点指向同一个新节点
tail = head;
} else {
// 设置 node 节点前驱为尾节点
node.prev = t;
// CAS 更新尾节点为 node 节点
// 胜利则更新旧的尾节点的 next 指向 node
// 链表关系才算更新结束,返回旧的尾节点
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}
综上所述,线程 B 执行流程如下:
- 首次循环尾节点为空
- 创立一个空 Node 节点
- 头尾都指向这个节点
- 第二次循环尾节点非空
- 将参数的节点 node 增加到链表中并更新 tail 指向
- 返回 node 的前一个节点,这里也就是 head 指向的空 Node 节点
此时内部结构图示如下:
接下来再回头看下acquireQueued(addWaiter(Node.EXCLUSIVE), arg),看下源码中是如何解决的
- 参数一为线程 B 封装成的 Node 节点
- 参数二为传入的 int 值 1
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
// 中断标识
boolean interrupted = false;
for (;;) {
// 获取 node 的前驱节点
final Node p = node.predecessor();
// 前驱节点为头节点,同时获取共享资源胜利
if (p == head && tryAcquire(arg)) {
// 设置头节点为 node 节点
setHead(node);
// 清理 p 节点
p.next = null; // help GC
failed = false;
return interrupted;
}
// 前驱非头节点或获取共享资源失败则查看并挂起线程
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
// 失败勾销节点持续获取资源
if (failed)
cancelAcquire(node);
}
}
线程 B 执行流程如下:
- 获取到线程 B 封装成的 Node 节点的前一个节点,这里也就是 AQS 的 head 节点
- 通过 tryAcquire(arg)尝试获取锁,因为线程 A 还未开释,所以第一个 if 条件失败,
- 进入第二个 if 条件
先看下shouldParkAfterFailedAcquire(p, node),从这个办法名你应该略微能明确该办法的作用,获取锁失败是否应该阻塞线程,首先明确下传参:
- 参数一 p 是 head 节点
- 参数二 node 是线程 B 封装的 Node 节点
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
// 前驱节点状态
int ws = pred.waitStatus;
if (ws == Node.SIGNAL)
// pred 节点曾经设置为 SIGNAL 间接返回,可进行 park 操作
return true;
if (ws > 0) {
// 前驱节点 pred 处于 CANCELLED 状态
// 找到其前驱节点中非 CANCELLED 状态的节点,// node 的前驱指向这个新的 pred 节点
do {node.prev = pred = pred.prev;} while (pred.waitStatus > 0);
// 更新 pred 节点 next 指针指向 node
pred.next = node;
} else {
// 更新为 SIGNAL
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
// 不能进行 park 操作
return false;
}
线程 B 执行流程如下:
- 第一次执行 shouldParkAfterFailedAcquire(p, node),head 节点的 waitStatus 默认为 0,compareAndSetWaitStatus(pred, ws, Node.SIGNAL)更新 head 节点的 waitStatus 为 SIGNAL(不分明 Node 状态的含意能够去看下上一篇文章,示意后继节点中的线程在期待以后节点唤醒,这里后继节点也就是 node,也就是线程 B 封装的 Node 节点)
- 返回 false,shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()这次执行就算完结了,在 acquireQueued 里持续循环
- 第二次执行 shouldParkAfterFailedAcquire(p, node),此时 head 节点的 waitStatus 为 SIGNAL,进入第一个 if (ws == Node.SIGNAL)条件中,返回 true
- 在 acquireQueued 条件里开始执行第二个办法 parkAndCheckInterrupt(),进行阻塞线程操作(在唤醒时查看线程中断状态)
看下 parkAndCheckInterrupt 源码实现:
private final boolean parkAndCheckInterrupt() {
// 设置以后对象为 blocker
LockSupport.park(this);
// 返回中断标记
return Thread.interrupted();}
这里再揭示下,没理解 LockSupport 的读者麻烦先去看下笔者之前对 LockSupport 文章的解说,或者先去 Google 理解下,AQS 通过 LockSupport.park(this)来对以后线程进行阻塞,到这里线程 B 将始终阻塞直到被唤醒,前面开释锁的线程唤醒阻塞线程时笔者会持续从这里再持续进行阐明,此时 AQS 中状态如下:
线程 B 阻塞期待获取锁的整个办法执行流程,读者可参考下图进行了解:
同样的,此时线程 C 执行 mutex.lock()获取锁时,读者能够类比下面线程 B 执行的过程,梳理下线程 C 的过程,线程 C 同样获取不到锁,执行流程如下:
- tryAcquire 返回 false
- addWaiter 将以后线程封装成 Node 节点,更新 tail 指针,此时 tail 指向线程 C 对应的 Node
- 执行 acquireQueued 办法,前一个节点非 head 节点,间接执行 shouldParkAfterFailedAcquire
- 更新前驱节点也就是线程 B 对应的节点状态为 SIGNAL,acquireQueued 第一次循环执行完结
- 开始第二次循环操作,进入 shouldParkAfterFailedAcquire,返回 true,开始执行 parkAndCheckInterrupt,通过 LockSupport.park 阻塞期待
此时同步队列如下图:
unlock
看下开释锁操作,在线程 A 获取锁之后执行完业务逻辑执行mutex.unlock(),看下调用过程:
mutex.unlock() -> sync.release(1) -> tryRelease(arg) -> unparkSuccessor(h)
代码实现如下:
public void unlock() {sync.release(1);
}
而后就到 AQS 的办法了
public final boolean release(int arg) {
// 尝试开释,子类须要实现
if (tryRelease(arg)) {
Node h = head;
// 头节点非空且头节点状态非 0
if (h != null && h.waitStatus != 0)
// 唤醒头节点后继节点
unparkSuccessor(h);
return true;
}
return false;
}
子类实现如下:
protected boolean tryRelease(int releases) {
assert releases == 1; // Otherwise unused
if (getState() == 0)
throw new IllegalMonitorStateException();
setExclusiveOwnerThread(null);
setState(0);
return true;
}
此时线程 A 执行流程如下:
- tryRelease(1)尝试开释,进入 Sync 类实现的 tryRelease 办法
- 通过 setExclusiveOwnerThread 将独占线程属性设置为 null
- setState 将 state 设置为 0,返回 true
- head 节点非 null 同时 waitStatus 非 0 时通过 unparkSuccessor 唤醒线程 B 继续执行
留神,tryRelease 只是更新了 AQS 的相干属性,还未进行阻塞线程的唤醒操作,看下代码逻辑,head 节点非 null 同时 waitStatus 非 0,这里也就是下面线程 B 尝试获取锁失败后更新了前一个节点也就是 head 节点的 waitStatus 的起因,通过 head 节点的 waitStatus 来判断是否须要唤醒后继节点
unparkSuccessor 入参在线程 A 执行过程中为 head 节点
private void unparkSuccessor(Node node) {
int ws = node.waitStatus;
// 节点状态处于 SIGNAL(-1)/CONDITION(-2)/PROPAGATE(-3)时 CAS 更新状态为 0
// 唤醒后继节点,尝试将以后节点置为初始状态
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);
// 获取后继节点
Node s = node.next;
// 节点为 null 或者处于 CANCELLED(1)时
// 从尾节点从后向前进行遍历
if (s == null || s.waitStatus > 0) {
s = null;
for (Node t = tail; t != null && t != node; t = t.prev)
// 找到无效节点,非 CANCELLED(1)节点
if (t.waitStatus <= 0)
s = t;
}
// 非 null 唤醒节点 s 的线程
if (s != null)
LockSupport.unpark(s.thread);
}
来看下线程 A 开释锁的执行流程:
- head 的节点 waitStatus 为 SIGNAL,这里就是 ws,执行 compareAndSetWaitStatus 将 head 节点的 waitStatus 置为 0
- 获取 head 节点的后继节点,也就是线程 B 封装成的 Node 节点
- 判断,第一个条件后继节点为 null 或者节点处于 CANCELLED 状态(这个状态也就是勾销持续获取),阐明节点 s 非无效节点
- 从 tail 节点向前查找第一个无效节点 t,s 指向 t,这里 B 线程对应节点是无效节点,不须要执行
- s 非 null,LockSupport.unpark 唤醒 s 节点线程,这里也就是唤醒线程 B
此时线程 A 曾经执行结束,将锁开释,唤醒线程 B 继续执行,线程 B 阻塞期待在 parkAndCheckInterrupt
办法中,下面曾经进行了阐明,再次看下这个办法
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
// 中断标识
boolean interrupted = false;
for (;;) {
// 获取 node 的前驱节点
final Node p = node.predecessor();
// 前驱节点为头节点,同时获取共享资源胜利
if (p == head && tryAcquire(arg)) {
// 设置头节点为 node 节点
setHead(node);
// 清理 p 节点
p.next = null; // help GC
failed = false;
return interrupted;
}
// 前驱非头节点或获取共享资源失败则查看并挂起线程
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
// 失败勾销节点持续获取资源
if (failed)
cancelAcquire(node);
}
}
线程 B 被唤醒之后执行流程如下:
- 持续循环,线程 B 对应节点的前驱节点即 head 节点,tryAcquire 获取到共享资源(锁)胜利
- 设置头节点为线程 B 对应的节点,这里新设置了 head 节点,留神下
- 清理原 head 节点
- 返回是否中断标记,若失败则勾销节点持续获取资源
此时 AQS 图示如下:
同样的,当线程 B 执行 mutex.unlock()开释锁时,读者能够类比下面线程 A 执行的过程,梳理下线程 B 开释锁,线程 C 被唤醒的过程,执行流程如下:
- tryRelease 尝试开释锁,进入 Sync 类实现的 tryRelease 办法,独占线程属性设置为 null,更新 state 为 0
- head 节点非 null 同时 waitStatus 非 0 时执行 unparkSuccessor(这里 head 节点在下面线程 B 被唤醒后曾经替换成线程 B 对应的节点了)
- head 的节点 waitStatus 为 SIGNAL,这里就是 ws,执行 compareAndSetWaitStatus 将 head 节点(线程 B 对应的)的 waitStatus 置为 0
- 获取 head 节点的后继节点,也就是线程 C 封装成的 Node 节点
- 判断这里线程 C 封装成的 Node 节点是无效节点
- 非 null,LockSupport.unpark 唤醒节点对应的线程,这里也就是唤醒线程 C
好,到这里持续反复下面的流程,也就是 C 被唤醒之后获取到锁而后开释的过程,开释过程稍有不同,因为没有后继节点线程须要被唤醒了,这里线程 C 开释锁的流程也梳理下:
- tryRelease 尝试开释锁,进入 Sync 类实现的 tryRelease 办法,独占线程属性设置为 null,更新 state 为 0
- head 节点 (线程 C 对应的节点) 非 null 然而 waitStatus=0,不须要执行 unparkSuccessor 操作,因为没有后继节点线程须要被唤醒,间接返回即可
总结
至此,通过源码作者的独占锁示例,联合简略应用示例,笔者通过图示和外围源码执行过程进行了具体解说阐明,独占锁实现流程层层递进,置信读者应该足够理解 AQS 了
当然,限于篇幅,笔者未对 AQS 的所有办法进行阐明,心愿有趣味的同学自行查阅理解,心愿对各位有所帮忙
以上内容如有问题欢送指出,笔者验证后将及时修改,谢谢