上一篇文章中笔者曾经介绍过AQS的基础知识,然而更加具体的源码实现还未波及,本篇文章笔者就联合具体的AQS实现类来聊一聊AQS的独占锁实现过程
前言
JDK版本号:1.8.0_171
之前的文章中曾经介绍过,AQS是一套多线程访问共享资源的同步器框架实现
,开发者只须要依照需要在实现类中实现对应的办法即可实现锁或者同步器的构建,在AQS的源码正文局部,如果你有认真看过的话应该会留神到,作者举了2个简略的例子,一个是独占锁实现Mutex,另一个是共享锁实现BooleanLatch
作为示例笔者就以这两个简略的锁实现联合AQS的具体方法来进行解说阐明,这里揭示下请先看下笔者的上篇文章JDK源码那些事儿之传说中的AQS-概览
,理解AQS的基本知识,同时浏览下之前对Object和LockSupport的解说,笔者默认读者应该曾经理解了这些相干知识点
其次,集体认为初学者学习AQS的源码须要多思考多debug,多看几遍本篇文章能力了解。因为篇幅过长,笔者这篇文章只解说简略的独占锁实现流程
示例
先来看下源码作者实现的独占锁Mutex,能够看到封装了外部类Sync继承AbstractQueuedSynchronizer实现了独占锁须要的几个办法,这里次要是tryAcquire,tryRelease,其余办法留给读者自行深刻,这里再温习下AQS定义这两个办法的含意:
- tryAcquire:独占模式尝试获取资源,胜利则返回true,失败则返回false
- tryRelease:独占模式尝试开释资源,胜利则返回true,失败则返回false
这里先理解下即可,可略过持续看上面的测试代码实现
class Mutex implements Lock, Serializable {
// Our internal helper class
private static class Sync extends AbstractQueuedSynchronizer {
// Reports whether in locked state
@Override
protected boolean isHeldExclusively() {
return getState() == 1;
}
// Acquires the lock if state is zero
@Override
public boolean tryAcquire(int acquires) {
assert acquires == 1; // Otherwise unused
if (compareAndSetState(0, 1)) {
setExclusiveOwnerThread(Thread.currentThread());
return true;
}
return false;
}
// Releases the lock by setting state to zero
@Override
protected boolean tryRelease(int releases) {
assert releases == 1; // Otherwise unused
if (getState() == 0)
throw new IllegalMonitorStateException();
setExclusiveOwnerThread(null);
setState(0);
return true;
}
// Provides a Condition
Condition newCondition() {
return new ConditionObject();
}
// Deserializes properly
private void readObject(ObjectInputStream s) throws IOException, ClassNotFoundException {
s.defaultReadObject();
setState(0); // reset to unlocked state
}
}
// The sync object does all the hard work. We just forward to it.
private final Sync sync = new Sync();
@Override
public void lock() {
sync.acquire(1);
}
@Override
public boolean tryLock() {
return sync.tryAcquire(1);
}
@Override
public void unlock() {
sync.release(1);
}
@Override
public Condition newCondition() {
return sync.newCondition();
}
public boolean isLocked() {
return sync.isHeldExclusively();
}
public boolean hasQueuedThreads() {
return sync.hasQueuedThreads();
}
@Override
public void lockInterruptibly() throws InterruptedException {
sync.acquireInterruptibly(1);
}
@Override
public boolean tryLock(long timeout, TimeUnit unit) throws InterruptedException {
return sync.tryAcquireNanos(1, unit.toNanos(timeout));
}
}
笔者这里写了一个简略的应用示例供大家参考,其中应用了CountDownLatch,仅仅是保障线程池中的所有线程同一时刻争抢Mutex独占锁,读者能够不必过多关注,把这个去掉也能够,不是重点,咱们重点关注下Mutex的应用即可,最终的后果想必读者也能猜到,其中的线程一个一个争抢到锁能力执行业务逻辑操作
int threadNum = 10;
CountDownLatch countDownLatch = new CountDownLatch(1);
Mutex mutex = new Mutex();
ExecutorService threadPool = Executors.newFixedThreadPool(10);
Runnable task = () -> {
try {
// 保障同时开始争抢锁
countDownLatch.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
// 获取锁,未获取则阻塞期待
mutex.lock();
try {
System.out.println(Thread.currentThread().getName() + "获取锁");
try {
// 模仿业务解决
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
} finally {
System.out.println(Thread.currentThread().getName() + "开释锁");
mutex.unlock();
}
};
// 开启10个线程争抢互斥锁
for (int i = 0; i < threadNum; i++) {
threadPool.submit(task);
}
// 同时开始执行
countDownLatch.countDown();
threadPool.shutdown();
外围代码
在下面的应用示例里最重要的局部在于上面这段:
// 获取锁,未获取则阻塞期待
mutex.lock();
try {
} finally {
// 开释锁
mutex.unlock();
}
笔者就联合这个示例来剖析AQS独占锁底层源码实现的流程,为了不便解说,先规定下场景:
线程池中蕴含3个线程,最终线程获取到独占锁执行业务逻辑的程序为:线程A->线程B->线程C(留神,这里程序是随机的,笔者只是为了不便解说这样设置)
那么AQS底层到底是如何流转的呢?如何在Java语言层面上保障独占呢?开释锁时如何唤醒期待的线程呢?能够先思考几分钟,想想如果让你来设计,你会怎么做?
lock
看下调用过程
mutex.lock() -> sync.acquire(1) -> !tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)
mutex.lock()在内部类Sync的lock办法上,而这个办法最终是在调用AbstractQueuedSynchronizer的acquire办法,这个办法含意如下:
以独占模式获取共享资源,疏忽中断,即线程在aquire过程中,中断此线程是有效的,同时至多调用一次tryAcquire办法,胜利获取到共享资源则返回,否则,将以后想要独占资源的线程封装成Node节点增加到同步队列(sync queue)中,阻塞期待获取共享资源
初学者可能会感到困惑,这里tryAcquire就是在尝试获取共享资源,胜利则不须要进行第二个条件了,否则就须要将以后线程封装成节点增加到队列中阻塞期待,联合下面示例,你也应该明确,为什么线程B和线程C在线程A未执行结束时始终阻塞期待了吧,那么这里具体是怎么实现的呢?笔者率领大家一步一步往下看
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
查看源码tryAcquire的实现,这个办法就是子类须要实现的了,可能Java老手会感觉有点绕,最好先去理解下模板办法设计模式的应用,会绝对轻松些,在下面的示例中外部类实现如下:
// Acquires the lock if state is zero
public boolean tryAcquire(int acquires) {
assert acquires == 1; // Otherwise unused
if (compareAndSetState(0, 1)) {
setExclusiveOwnerThread(Thread.currentThread());
return true;
}
return false;
}
十分简单明了,就是通过CAS尝试将state从0设置为1,你也能看进去,这里入参是为了通用性而传入的值(对于这个示例来说无意义),这里重点了解的局部在于如果CAS胜利了,行将state从0设置成了1,表明以后线程获取到了共享资源,对于独占锁来说,就是以后线程独占,通过setExclusiveOwnerThread记录独占线程,返回true表明胜利,最终在示例代码中也就是mutex.lock处不会进行阻塞,会持续向下执行程序,此时线程A获取到锁时AQS的状况如下:
继续执行,在线程A还未通过mutex.unlock()开释锁时,线程B执行mutex.lock()获取锁,线程B在这里会阻塞住期待锁的开释,获取到锁才会继续执行,那么底层到底是如何实现的呢?
再回过头看下tryAcquire,这里返回false,再看acquire,执行第二个条件acquireQueued(addWaiter(Node.EXCLUSIVE), arg),这里波及到了两个办法,一个一个来看底层是如何解决的
首先是addWaiter(Node.EXCLUSIVE)
办法,去AQS看下实现:
private Node addWaiter(Node mode) {
// 以给定的模式创立以后线程的节点
Node node = new Node(Thread.currentThread(), mode);
// 增加到同步队列尾部
Node pred = tail;
// 尾节点非空,即曾经被初始化了
if (pred != null) {
// node前驱指向pred
node.prev = pred;
// CAS更新尾节点为新增节点node
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
// 尾节点为空或CAS更新尾节点失败则调用enq来操作直到胜利
enq(node);
return node;
}
这里只针对示例中的独占锁实现剖析,你能够看到源码中Node.EXCLUSIVE = null,AQS的head,tail,state在Sync外部类中曾经初始化过了,初始值为null,null和0,持续看源码做了什么,首先创立了一个Node节点,调用如下,这里能够看到设置了节点的this.nextWaiter = mode = Node.EXCLUSIVE,thread保留以后线程
Node(Thread thread, Node mode) { // Used by addWaiter
this.nextWaiter = mode;
this.thread = thread;
}
线程B执行时,pred = null故这里间接进入enq(node)办法,看下源码实现:
private Node enq(final Node node) {
// 有限循环保障插入节点中
for (;;) {
// 获取尾节点
Node t = tail;
// 尾节点为空,先初始化,CAS设置头节点
// 胜利则设置尾节点
if (t == null) { // Must initialize
// 头节点为空,设置头节点为新创建的节点
if (compareAndSetHead(new Node()))
// 头尾节点指向同一个新节点
tail = head;
} else {
// 设置node节点前驱为尾节点
node.prev = t;
// CAS更新尾节点为node节点
// 胜利则更新旧的尾节点的next指向node
// 链表关系才算更新结束,返回旧的尾节点
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}
综上所述,线程B执行流程如下:
- 首次循环尾节点为空
- 创立一个空Node节点
- 头尾都指向这个节点
- 第二次循环尾节点非空
- 将参数的节点node增加到链表中并更新tail指向
- 返回node的前一个节点,这里也就是head指向的空Node节点
此时内部结构图示如下:
接下来再回头看下acquireQueued(addWaiter(Node.EXCLUSIVE), arg),看下源码中是如何解决的
- 参数一为线程B封装成的Node节点
- 参数二为传入的int值1
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
// 中断标识
boolean interrupted = false;
for (;;) {
// 获取node的前驱节点
final Node p = node.predecessor();
// 前驱节点为头节点,同时获取共享资源胜利
if (p == head && tryAcquire(arg)) {
// 设置头节点为node节点
setHead(node);
// 清理p节点
p.next = null; // help GC
failed = false;
return interrupted;
}
// 前驱非头节点或获取共享资源失败则查看并挂起线程
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
// 失败勾销节点持续获取资源
if (failed)
cancelAcquire(node);
}
}
线程B执行流程如下:
- 获取到线程B封装成的Node节点的前一个节点,这里也就是AQS的head节点
- 通过tryAcquire(arg)尝试获取锁,因为线程A还未开释,所以第一个if条件失败,
- 进入第二个if条件
先看下shouldParkAfterFailedAcquire(p, node),从这个办法名你应该略微能明确该办法的作用,获取锁失败是否应该阻塞线程,首先明确下传参:
- 参数一p是head节点
- 参数二node是线程B封装的Node节点
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
// 前驱节点状态
int ws = pred.waitStatus;
if (ws == Node.SIGNAL)
// pred节点曾经设置为SIGNAL间接返回,可进行park操作
return true;
if (ws > 0) {
// 前驱节点pred处于CANCELLED状态
// 找到其前驱节点中非CANCELLED状态的节点,
// node的前驱指向这个新的pred节点
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
// 更新pred节点next指针指向node
pred.next = node;
} else {
// 更新为SIGNAL
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
// 不能进行park操作
return false;
}
线程B执行流程如下:
- 第一次执行shouldParkAfterFailedAcquire(p, node),head节点的waitStatus默认为0,compareAndSetWaitStatus(pred, ws, Node.SIGNAL)更新head节点的waitStatus为SIGNAL(不分明Node状态的含意能够去看下上一篇文章,示意后继节点中的线程在期待以后节点唤醒,这里后继节点也就是node,也就是线程B封装的Node节点)
- 返回false,shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()这次执行就算完结了,在acquireQueued里持续循环
- 第二次执行shouldParkAfterFailedAcquire(p, node),此时head节点的waitStatus为SIGNAL,进入第一个if (ws == Node.SIGNAL)条件中,返回true
- 在acquireQueued条件里开始执行第二个办法parkAndCheckInterrupt(),进行阻塞线程操作(在唤醒时查看线程中断状态)
看下parkAndCheckInterrupt源码实现:
private final boolean parkAndCheckInterrupt() {
// 设置以后对象为blocker
LockSupport.park(this);
// 返回中断标记
return Thread.interrupted();
}
这里再揭示下,没理解LockSupport的读者麻烦先去看下笔者之前对LockSupport文章的解说,或者先去Google理解下,AQS通过LockSupport.park(this)来对以后线程进行阻塞,到这里线程B将始终阻塞直到被唤醒,前面开释锁的线程唤醒阻塞线程时笔者会持续从这里再持续进行阐明,此时AQS中状态如下:
线程B阻塞期待获取锁的整个办法执行流程,读者可参考下图进行了解:
同样的,此时线程C执行mutex.lock()获取锁时,读者能够类比下面线程B执行的过程,梳理下线程C的过程,线程C同样获取不到锁,执行流程如下:
- tryAcquire返回false
- addWaiter将以后线程封装成Node节点,更新tail指针,此时tail指向线程C对应的Node
- 执行acquireQueued办法,前一个节点非head节点,间接执行shouldParkAfterFailedAcquire
- 更新前驱节点也就是线程B对应的节点状态为SIGNAL,acquireQueued第一次循环执行完结
- 开始第二次循环操作,进入shouldParkAfterFailedAcquire,返回true,开始执行parkAndCheckInterrupt,通过LockSupport.park阻塞期待
此时同步队列如下图:
unlock
看下开释锁操作,在线程A获取锁之后执行完业务逻辑执行mutex.unlock(),看下调用过程:
mutex.unlock() -> sync.release(1) -> tryRelease(arg) -> unparkSuccessor(h)
代码实现如下:
public void unlock() {
sync.release(1);
}
而后就到AQS的办法了
public final boolean release(int arg) {
// 尝试开释,子类须要实现
if (tryRelease(arg)) {
Node h = head;
// 头节点非空且头节点状态非0
if (h != null && h.waitStatus != 0)
// 唤醒头节点后继节点
unparkSuccessor(h);
return true;
}
return false;
}
子类实现如下:
protected boolean tryRelease(int releases) {
assert releases == 1; // Otherwise unused
if (getState() == 0)
throw new IllegalMonitorStateException();
setExclusiveOwnerThread(null);
setState(0);
return true;
}
此时线程A执行流程如下:
- tryRelease(1)尝试开释,进入Sync类实现的tryRelease办法
- 通过setExclusiveOwnerThread将独占线程属性设置为null
- setState将state设置为0,返回true
- head节点非null同时waitStatus非0时通过unparkSuccessor唤醒线程B继续执行
留神,tryRelease只是更新了AQS的相干属性,还未进行阻塞线程的唤醒操作,看下代码逻辑,head节点非null同时waitStatus非0,这里也就是下面线程B尝试获取锁失败后更新了前一个节点也就是head节点的waitStatus的起因,通过head节点的waitStatus来判断是否须要唤醒后继节点
unparkSuccessor入参在线程A执行过程中为head节点
private void unparkSuccessor(Node node) {
int ws = node.waitStatus;
// 节点状态处于SIGNAL(-1)/CONDITION(-2)/PROPAGATE(-3)时CAS更新状态为0
// 唤醒后继节点,尝试将以后节点置为初始状态
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);
// 获取后继节点
Node s = node.next;
// 节点为null或者处于CANCELLED(1)时
// 从尾节点从后向前进行遍历
if (s == null || s.waitStatus > 0) {
s = null;
for (Node t = tail; t != null && t != node; t = t.prev)
// 找到无效节点,非CANCELLED(1)节点
if (t.waitStatus <= 0)
s = t;
}
// 非null唤醒节点s的线程
if (s != null)
LockSupport.unpark(s.thread);
}
来看下线程A开释锁的执行流程:
- head的节点waitStatus为SIGNAL,这里就是ws,执行compareAndSetWaitStatus将head节点的waitStatus置为0
- 获取head节点的后继节点,也就是线程B封装成的Node节点
- 判断,第一个条件后继节点为null或者节点处于CANCELLED状态(这个状态也就是勾销持续获取),阐明节点s非无效节点
- 从tail节点向前查找第一个无效节点t,s指向t,这里B线程对应节点是无效节点,不须要执行
- s非null,LockSupport.unpark唤醒s节点线程,这里也就是唤醒线程B
此时线程A曾经执行结束,将锁开释,唤醒线程B继续执行,线程B阻塞期待在parkAndCheckInterrupt
办法中,下面曾经进行了阐明,再次看下这个办法
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
// 中断标识
boolean interrupted = false;
for (;;) {
// 获取node的前驱节点
final Node p = node.predecessor();
// 前驱节点为头节点,同时获取共享资源胜利
if (p == head && tryAcquire(arg)) {
// 设置头节点为node节点
setHead(node);
// 清理p节点
p.next = null; // help GC
failed = false;
return interrupted;
}
// 前驱非头节点或获取共享资源失败则查看并挂起线程
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
// 失败勾销节点持续获取资源
if (failed)
cancelAcquire(node);
}
}
线程B被唤醒之后执行流程如下:
- 持续循环,线程B对应节点的前驱节点即head节点,tryAcquire获取到共享资源(锁)胜利
- 设置头节点为线程B对应的节点,这里新设置了head节点,留神下
- 清理原head节点
- 返回是否中断标记,若失败则勾销节点持续获取资源
此时AQS图示如下:
同样的,当线程B执行mutex.unlock()开释锁时,读者能够类比下面线程A执行的过程,梳理下线程B开释锁,线程C被唤醒的过程,执行流程如下:
- tryRelease尝试开释锁,进入Sync类实现的tryRelease办法,独占线程属性设置为null,更新state为0
- head节点非null同时waitStatus非0时执行unparkSuccessor(这里head节点在下面线程B被唤醒后曾经替换成线程B对应的节点了)
- head的节点waitStatus为SIGNAL,这里就是ws,执行compareAndSetWaitStatus将head节点(线程B对应的)的waitStatus置为0
- 获取head节点的后继节点,也就是线程C封装成的Node节点
- 判断这里线程C封装成的Node节点是无效节点
- 非null,LockSupport.unpark唤醒节点对应的线程,这里也就是唤醒线程C
好,到这里持续反复下面的流程,也就是C被唤醒之后获取到锁而后开释的过程,开释过程稍有不同,因为没有后继节点线程须要被唤醒了,这里线程C开释锁的流程也梳理下:
- tryRelease尝试开释锁,进入Sync类实现的tryRelease办法,独占线程属性设置为null,更新state为0
- head节点(线程C对应的节点)非null然而waitStatus=0,不须要执行unparkSuccessor操作,因为没有后继节点线程须要被唤醒,间接返回即可
总结
至此,通过源码作者的独占锁示例,联合简略应用示例,笔者通过图示和外围源码执行过程进行了具体解说阐明,独占锁实现流程层层递进,置信读者应该足够理解AQS了
当然,限于篇幅,笔者未对AQS的所有办法进行阐明,心愿有趣味的同学自行查阅理解,心愿对各位有所帮忙
以上内容如有问题欢送指出,笔者验证后将及时修改,谢谢
发表回复