图解 AQS 原理之 ReentrantLock 详解 - 非公平模式
概述
并发编程中,ReentrantLock
的使用是比较多的,包括之前讲的 LinkedBlockingQueue
和ArrayBlockQueue
的内部都是使用的 ReentrantLock
,谈到它又不能的不说 AQS,AQS 的全称是AbstractQueuedSynchronizer
,这个类也是在java.util.concurrent.locks
下面,提供了一个 FIFO 的队列,可以用于构建锁的基础框架,内部通过原子变量 state
来表示锁的状态,当 state
大于 0 的时候表示锁被占用,如果 state 等于 0 时表示没有占用锁,ReentrantLock
是一个重入锁,表现在 state
上,如果持有锁的线程重复获取锁时,它会将 state
状态进行递增,也就是获得一个信号量,当释放锁时,同时也是释放了信号量,信号量跟随减少,如果上一个线程还没有完成任务,则会进行入队等待操作。
本文分析内容主要是针对 jdk1.8 版本
约束:文中图片的 ref-xxx 代表引用地址
图片中的内容 prve 更正为 prev,由于文章不是一天写的所以有些图片更正了有些没有。
AQS 主要字段
/**
* 头节点指针,通过 setHead 进行修改
*/
private transient volatile Node head;
/**
* 队列的尾指针
*/
private transient volatile Node tail;
/**
* 同步器状态
*/
private volatile int state;
AQS 需要子类实现的方法
AQS 是提供了并发的框架,它内部提供一种机制,它是基于模板方法的实现,整个类中没有任何一个 abstract 的抽象方法,取而代之的是,需要子类去实现的那些方法通过一个方法体抛出 UnsupportedOperationException 异常来让子类知道,告知如果没有实现模板的方法,则直接抛出异常。
方法名 | 方法描述 |
---|---|
tryAcquire | 以独占模式尝试获取锁,独占模式下调用 acquire,尝试去设置 state 的值,如果设置成功则返回,如果设置失败则将当前线程加入到等待队列,直到其他线程唤醒 |
tryRelease | 尝试独占模式下释放状态 |
tryAcquireShared | 尝试在共享模式获得锁,共享模式下调用 acquire,尝试去设置 state 的值,如果设置成功则返回,如果设置失败则将当前线程加入到等待队列,直到其他线程唤醒 |
tryReleaseShared | 尝试共享模式下释放状态 |
isHeldExclusively | 是否是独占模式,表示是否被当前线程占用 |
AQS 是基于 FIFO 队列实现的,那么队列的 Node 节点又是存放的什么呢?
Node 字段信息
字段名 | 类型 | 默认值 | 描述 |
---|---|---|---|
SHARED | Node | new Node() | 一个标识,指示节点使用共享模式等待 |
EXCLUSIVE | Nodel | Null | 一个标识,指示节点使用独占模式等待 |
CANCELLED |
int | 1 | 节点因超时或被中断而取消时设置状态为取消状态 |
SIGNAL |
int | -1 | 当前节点的后节点被 park,当前节点释放时,必须调用 unpark 通知后面节点,当后面节点竞争时,会将前面节点更新为SIGNAL
|
CONDITION |
int | -2 | 标识当前节点已经处于等待中,通过条件进行等待的状态 |
PROPAGATE |
int | -3 | 共享模式下释放节点时设置的状态,被标记为当前状态是表示无限传播下去 |
0 |
int | 不属于上面的任何一种状态 | |
waitStatus | int | 0 | 等待状态,默认初始化为 0,表示正常同步等待, |
pre | Node | Null | 队列中上一个节点 |
next | Node | Null | 队列中下一个节点 |
thread | Thread | Null | 当前 Node 操作的线程 |
nextWaiter | Node | Null | 指向下一个处于阻塞的节点 |
通过上面的内容我们可以看到 waitStatus 其实是有 5 个状态的,虽然这里面 0 并不是什么字段,但是他是 waitStatus 状态的一种,表示不是任何一种类型的字段,上面也讲解了关于 AQS 中子类实现的方法,AQS 提供了独占模式和共享模式两种,但是 ReentrantLock
实现的是独占模式的方式,下面来通过源码的方式解析ReentrantLock
。
ReentrantLock 源码分析
首先在源码分析之前我们先来看一下 ReentrantLock 的类的继承关系,如下图所示:
可以看到 ReentrantLock
继承自 Lock
接口,它提供了一些获取锁和释放锁的方法,以及条件判断的获取的方法,通过实现它来进行锁的控制,它是显示锁,需要显示指定起始位置和终止位置,Lock
接口的方法介绍:
方法名称 | 方法描述 |
---|---|
lock | 用来获取锁,如果锁已被其他线程获取,则进行等待。 |
tryLock | 表示用来尝试获取锁,如果获取成功,则返回 true,如果获取失败(即锁已被其他线程获取),则返回 false,也就说这个方法无论如何都会立即返回。在拿不到锁时不会一直在那等待 |
tryLock(long time, TimeUnit unit) | 和 tryLock()类似,区别在于它在拿不到锁时会等待一定的时间,在时间期限之内如果还拿不到锁,就返回 false。如果如果一开始拿到锁或者在等待期间内拿到了锁,则返回 true |
lockInterruptibly | 获取锁,如果获取锁失败则进行等到,如果等待的线程被中断会相应中断信息。 |
unlock | 释放锁的操作 |
newCondition | 获取 Condition 对象,该组件和当前的锁绑定,当前线程只有获得了锁,才能调用该组件 wait()方法,而调用后,当前线程释放锁。 |
ReentrantLock 也实现了上面接口的内容,前面讲解了很多理论行的内容,接下来我们以一个简单的例子来进行探讨
public class ReentrantLockDemo {public static void main(String[] args) throws Exception {AddDemo runnalbeDemo = new AddDemo();
Thread thread = new Thread(runnalbeDemo::add);
thread.start();
Thread thread1 = new Thread(runnalbeDemo::add);
thread1.start();
Thread.sleep(1000);
System.out.println(runnalbeDemo.getCount());
}
private static class AddDemo {private final AtomicInteger count = new AtomicInteger();
private final ReentrantLock reentrantLock = new ReentrantLock();
private void add() {
try {reentrantLock.lock();
count.getAndIncrement();} finally {// reentrantLock.unlock();
}
}
int getCount() {return count.get();
}
}
}
- 首先声明内部类 AddDemo,AddDemo 的主要作用是将原子变量 count 进行递增的操作
- AddDemo 内部声明了 ReentrantLock 对象进行同步操作
- AddDemo 的 add 方法,进行递增操作,细心地同学发现,使用了 lock 方法获取锁,但是没有释放锁,这里面没有释放锁可以更让我们清晰的分析内部结构的变化。
- 主线程开启了两个线程进行同步进行递增的操作,最后让线程休眠一会输出累加的最后结果。
ReentrantLock
内部提供了两种 AQS 的实现,一种公平模式,一种是非公平模式,如果没有特别指定在构造器中,默认是非公平的模式,我们可以看一下无参的构造函数。
public ReentrantLock() {sync = new NonfairSync();
}
当调用有参构造函数时,指定使用哪种模式来进行操作,参数为布尔类型,如果指定为 false 的话代表非公平模式,如果指定为 true 的话代表的是公平模式,如下所示:
public ReentrantLock(boolean fair) {sync = fair ? new FairSync() : new NonfairSync();}
我们使用的是非公平模式,后面再来进行分析公平模式,上面也讲到了分为两种模式,这两种模式为 FairSync
和NonfairSync
两个内部静态类不可变类,不能被继承和实例化,这两个类是我们今天分析的重点,为什么说是重点呢,这里讲的内容是有关于 AQS 的,而 FairSync
和NonfairSync
实现了抽象内部类 Sync
,Sync
实现了 AbstractQueuedSynchronizer
这个类,这个类就是我们说的 AQS 也是主要同步操作的类,下面我们来看一下公平模式和非公平模式下类的继承关系,如下图所示:
非公平模式:
公平模式:
通过上面两个继承关系 UML 来看其实无差别,差别在于内部实现的原理不一样,回到上面例子中使用的是非公平模式,那先以非公平模式来进行分析,
假设第一个线程启动调用 AddDemo 的 add 方法时,首先执行的事 reentrantLock.lock()
方法,这个 lock 方法调用了sync.lock()
,sync 就是我们上面提到的两种模式的对象,来看一下源码内容:
public void lock() {sync.lock();
}
内部调用了 sync.lock()
, 其实是调用了NonfairSync
对象的 lock
方法,也就是下面的方法内容。
/**
* 非公平模式锁
*/
static final class NonfairSync extends Sync {
private static final long serialVersionUID = 7316153563782823691L;
/**
* 执行锁动作,先进行修改状态,如果锁被占用则进行请求申请锁,申请锁失败则将线程放到队列中
*/
final void lock() {if (compareAndSetState(0, 1))
setExclusiveOwnerThread(Thread.currentThread());
else
acquire(1);
}
// 继承自 AQS 的 tryAcquire 方法,尝试获取锁操作,这个方法会被 AQS 的 acquire 调用
protected final boolean tryAcquire(int acquires) {return nonfairTryAcquire(acquires);
}
}
我们看到 lock
方法首先先对 state
状态进行修改操作,如果锁没有被占用则获取锁,并设置当前线程独占锁资源,如果尝试获取锁失败了,则进行 acqurie
方法的调用,例子中第一个线程当尝试获取锁是内部 state
状态为0
,进行修改操作的时候,发现锁并没有被占用,则获得锁,此时我们来看一下内部变化的情况,如下图所示:
此时只是将 state
的状态更新为 1
,表示锁已经被占用了,独占锁资源的线程是Thread0
,也就是exclusiveOwnerThread
的内容,头节点和尾节点都没有被初始化,当第二个线程尝试去获取锁的时候,发现锁已经被占用了,因为上一个线程并没有释放锁,所以第二线程直接获取锁时获取失败则进入到 acquire
方法中,这个方法是 AbstractQueuedSynchronizer
中的方法acquire
,先来看一下具体的实现源码如下所示:
public final void acquire(int arg) {if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();}
我个人理解 acquire
方法不间断的尝试获取锁,如果锁没有获取到则现将节点加入到队列中,并将当前线程设置为独占锁资源,也就是独占了锁的意思,别的线程不能拥有锁,然后如果当前节点的前节点是头节点话,再去尝试争抢锁,则设置当前节点为头节点,并将原头节点的下一个节点设置为 null,帮助 GC 回收它,如果不是头节点或争抢锁不成功,则会现将前面节点的状态设置直到设置为 SIGNAL
为止,代表下面有节点被等待了等待上一个线程发来的信号,然后就挂起当前线程。
我们接下来慢慢一步一步的分析,我们先来看一下 NonfairSync
中的tryAcquire
,如下所示:
protected final boolean tryAcquire(int acquires) {return nonfairTryAcquire(acquires);
}
它调用的是他的父类方法,也就是 ReentrantLock
下Sync
中的 nonfairTryAcquire
方法,这个方法主要就是去申请锁的操作,来看一下具体源码:
final boolean nonfairTryAcquire(int acquires) { // 首先是一个被 final 修饰的方法
final Thread current = Thread.currentThread(); // 获取当前线程
int c = getState(); // 获取 state 的状态值
if (c == 0) { // 如果状态等于 0 代表线程没有被占用
if (compareAndSetState(0, acquires)) { //cas 修改 state 值
setExclusiveOwnerThread(current); // 设置当前线程为独占模式
return true;
}
}
else if (current == getExclusiveOwnerThread()) {// 如果 state 状态不等于 0 则先判断是否是当前线程占用锁,如果是则进行下面的流程。int nextc = c + acquires; // 这个地方就说明重入锁的原理,如果拥有锁的是当前线程,则每次获取锁 state 值都会跟随递增
if (nextc < 0) // overflow // 溢出了
throw new Error("Maximum lock count exceeded");
setState(nextc); // 直接设置 state 值就可以不需要 CAS
return true;
}
return false; // 都不是就返回 false
}
通过源码我们可以看到其实他是有三种操作逻辑:
- 如果
state
为 0,则代表锁没有被占用,尝试去修改 state 状态,并且将当前线程设置为独占锁资源,表示获得锁成功 - 如果
state
大于 0 并且拥有锁的线程和当前申请锁的线程一致,则代表重入了锁,state
值会进行递增,表示获得锁成功 - 如果
state
大于 0 并且拥有锁的线程和当前申请锁的线程不一致则直接返回 false,代表申请锁失败
当第二个线程去争抢锁的时候,state 值已经设置为 1 了也就是已经被第一个线程占用了锁,所以这里它会返回 false,而通过 acquire
方法内容可以看到 if 语句中是 !tryAcquire(arg)
,也就是!false=ture
,它会进行acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
方法,这个方法里面又有一个 addWaiter
方法,从方法语义上能看到是添加等待队列的操作,方法的参数代表的是模式,Node.EXCLUSIVE
表示的是在独占模式下等待,我们先来看一下 addWaiter
里面是如何进行操作,如下所示:
private Node addWaiter(Node mode) {
// 首先生成当前线程拥有的节点
Node node = new Node(Thread.currentThread(), mode);
// 下面的内容是尝试快速进行插入末尾的操作,在没有其他线程同时操作的情况
Node pred = tail; // 获取尾节点
if (pred != null) { // 尾节点不为空,代表队列不为空
node.prev = pred; // 尾节点设置为当前节点的前节点
if (compareAndSetTail(pred, node)) { // 修改尾节点为当前节点
pred.next = node; // 原尾节点的下一个节点设置为当前节点
return node; // 返回 node 节点
}
}
enq(node); // 如果前面入队失败,这里进行循环入队操作,直到入队成功
return node;
}
前面代码中可以看到,它有一个快速入队的操作,如果快速入队失败则进行死循环进行入队操作,当然我们上面例子中发现队列其实是为空的,也就是 pred==null,不能进行快速入队操作,则进入到 enq
进行入队操作,下面看一下 enq
方法实现,如下所示:
private Node enq(final Node node) {for (;;) { // 死循环进行入队操作,直到入队成功
Node t = tail; // 获取尾节点
if (t == null) { // Must initialize // 判断尾节点为空,则必须先进行初始化
if (compareAndSetHead(new Node())) // 生成一个 Node,并将当前 Node 作为头节点
tail = head; //head 和 tail 同时指向上面 Node 节点
} else {
node.prev = t; // 设置入队的当前节点的前节点设置为尾节点
if (compareAndSetTail(t, node)) { // 将当前节点设置为尾节点
t.next = node; // 修改原有尾节点的下一个节点为当前节点
return t; // 返回最新的节点
}
}
}
}
通过上面入队操作,可以清晰的了解入队操作其实就是 Node 节点的 prev 节点和 next 节点之前的引用,运行到这里我们应该能看到入队的状态了,如下图所示:
如上图可以清晰的看到,此时拥有锁的线程是 Thread0,而当前线程是 Threa1,头节点为初始化的节点,Ref-707
引用地址所在的 Node 节点操作当前操作的节点信息,入队操作后并没有完成,而是继续往下进行,此时则进行 acquireQueued
这个方法,这个方法是不间断的去获取已经入队队列中的前节点的状态,如果前节点的状态为大于 0,则代表当前节点被取消了,会一直往前面的节点进行查找,如果节点状态小于 0 并且不等于 SIGNAL
则将其设置为 SIGNAL
状态,设置成功后将当前线程挂起,挂起线程后也有可能会反复唤醒挂起操作,原因后面会讲到。
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true; // 取消节点标志位
try {
boolean interrupted = false; // 中断标志位
for (;;) {final Node p = node.predecessor(); // 获取前节点
if (p == head && tryAcquire(arg)) { // 这里的逻辑是如果前节点为头结点并且获取到锁则进行头结点变换
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
if (shouldParkAfterFailedAcquire(p, node) && // 设置 waitStatus 状态
parkAndCheckInterrupt()) // 挂起线程
interrupted = true;
}
} finally {if (failed)
cancelAcquire(node); // 取消操作
}
}
前面的源码可以看到它在 acquireQueued
中对已经入队的节点进行尝试锁的获取,如果锁获得就修改头节点的指针,如果不是头节点或者争抢锁失败时,此时会进入到 shouldParkAfterFailedAcquire
方法,这个方法是获取不到锁时需要停止继续无限期等待锁,其实就是内部的操作逻辑也很简单,就是如果前节点状态为 0
时,需要将前节点修改为 SIGNAL
,如果前节点大于0
则代表前节点已经被取消了,应该移除队列,并将前前节点作为当前节点的前节点,一直循环直到前节点状态修改为 SIGNAL
或者前节点被释放锁,当前节点获取到锁停止循环。
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
int ws = pred.waitStatus;
if (ws == Node.SIGNAL)
/*
* 此节点已经设置了状态,要求对当前节点进行挂起操作
*/
return true;
if (ws > 0) {
/*
* 如果前节点被取消,则将取消节点移除队列操作
*/
do {node.prev = pred = pred.prev;} while (pred.waitStatus > 0);
pred.next = node;
} else {
/*
* waitStatus= 0 或者 PROPAGATE 时,表示当前节点还没有被挂起停止,需要等待信号来通知节点停止操作。*/
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}
上面的方法其实很容易理解就是等待挂起信号,如果前节点的状态为 0 或 PROPAGATE 则将前节点修改为 SIGNAL
,则代表后面前节点释放锁后会通知下一个节点,也就是说唤醒下一个可以唤醒的节点继续争抢所资源,如果前节点被取消了那就继续往前寻找不是被取消的节点,这里不会找到前节点为 null 的情况,因为它默认会有一个空的头结点,也就是上图内容,此时的队列状态是如何的我们看一下,这里它会进来两次,以为我们上图可以看到当前节点前节点是Ref-724
此时 waitStatus=0
,他需要先将状态更改为SIGNAL
也就是运行最有一个 else 语句,此时又会回到外面的 for 循环中,由于方法返回的是 false 则不会运行 parkAndCheckInterrupt
方法,而是又循环了一次,此时发现当前节点争抢锁又失败了,然后此时队列的状态如下图所示:
再次进入到方法之后发现前驱节点的 waitStatus=-1,表示当前节点需要进行挂起等到,此时返回的结果是 true,则会运行 parkAndCheckInterrupt
方法,这个方法很简单就是将当前线程进行挂起操作,如下所示:
private final boolean parkAndCheckInterrupt() {LockSupport.park(this); // 挂起线程
return Thread.interrupted(); // 判断是否被中断,获取中断标识}
park
挂起线程并且响应中断信息,其实我们从这里就能发现一个问题,Thread.interrupted 方法是用来获取是否被中断的标志,如果被中断则返回 true,如果没有被中断则返回 false,当当前节点被中断后,其实就会返回 true,返回 true 这里并没有结束,而是跳到调用地方,也就是 acquireQueued
方法内部:
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
以一个案例来进行分析:
public class ReentrantLockDemo {public static void main(String[] args) throws Exception {AddDemo runnalbeDemo = new AddDemo();
Thread thread = new Thread(runnalbeDemo::add);
thread.start();
Thread thread1 = new Thread(runnalbeDemo::add);
thread1.start();
Thread thread2 = new Thread(runnalbeDemo::add);
thread2.start();
Thread.sleep(10000);
thread1.interrupt();
System.out.println(runnalbeDemo.getCount());
}
private static class AddDemo {private final AtomicInteger count = new AtomicInteger();
private final ReentrantLock reentrantLock = new ReentrantLock();
private final Condition condition = reentrantLock.newCondition();
private void add() {
try {reentrantLock.lock();
count.getAndIncrement();} finally {// reentrantLock.unlock();
}
}
int getCount() {return count.get();
}
}
}
通过上面的例子可以发现,thread1 调用中断方法 interrupt(),当调用第一次方法的时候,它会进入到 parkAndCheckInterrupt
方法,然后线程响应中断,最后返回 true,最后返回到 acquireQueued
方法内部,整个 if 语句为 true,则开始设置 interrupted=true,仅仅是设置了等于 true,但是这离还会进入下一轮的循环,假如说上次的线程没有完成任务,则没有获取到锁,还是会进入到 shouldParkAfterFailedAcquire
由于已经修改了上一个节点的 waitStatus=-1,直接返回 true,然后再进入到 parkAndCheckInterrupt
又被挂起线程,但是如果上步骤操作他正抢到锁,则会返回 ture,外面也会清除中断标志位,从这里可以清楚地看到 acquire
方法是一个不间断获得锁的操作,可能重复阻塞和解除阻塞操作。
上面阻塞队列的内容已经讲完了,接下来我们看一下 unlock 都为我们做了什么工作:
public void unlock() {sync.release(1);
}
我们可以看到他直接调用了独占模式的 release
方法,看一下具体源码:
public final boolean release(int arg) {if (tryRelease(arg)) { // 调用 ReentrantLock 中的 Sync 里面的 tryRelease 方法
Node h = head; // 获取头节点
if (h != null && h.waitStatus != 0) // 头节点不为空且状态不为 0 时进行 unpark 方法
unparkSuccessor(h); // 唤醒下一个未被取消的节点
return true;
}
return false;
}
release 方法,首先先进行尝试去释放锁,如果释放锁仍然被占用则直接返回 false,如果尝试释放锁时,发现锁已经释放,当前线程不在占用锁资源时,则会进入的下面进行一些列操作后返回 true,接下来我们先来看一下 ReentrantLock
的Sync
下的 tryRelease
方法,如下所示:
protected final boolean tryRelease(int releases) {int c = getState() - releases; // 获取 state 状态,标志信息减少 1
if (Thread.currentThread() != getExclusiveOwnerThread()) // 线程不一致抛出异常
throw new IllegalMonitorStateException();
boolean free = false;
if (c == 0) { // 是否已经释放锁,start= 0 代表已经释放锁
free = true; // 将标志 free 设置为 true
setExclusiveOwnerThread(null); // 取消独占锁信息
}
setState(c); // 设置锁标志信息
return free;
}
看上面的源码,表示首先先获取 state
状态,如果 state
状态减少 1 之后和 0 不相等则代表有重入锁,则表示当前线程还在占用所资源,直到线程释放锁返回 ture 标识,还是以上例子为主(此时 AddDemo
中的 unlock
不在被注释),分析其现在的队列中的状态
释放锁后,进入到 if 语句中,判断当前头节点不为空且 waitStatus!=0
,通过上图也可以发现头节点为 -1,则进入到unparkSuccessor
方法内:
private void unparkSuccessor(Node node) {
/*
* 获取节点的 waitStatus 状态
*/
int ws = node.waitStatus;
// 如果小于 0 则设置为 0
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);
/*
* 唤醒下一个节点,唤醒下一个节点之前需要判断节点是否存在或已经被取消了节点,如果没有节点则不需唤醒操作,如果下一个节点被取消了则一直一个没有被取消的节点。*/
Node s = node.next;
if (s == null || s.waitStatus > 0) {
s = null;
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
if (s != null)
LockSupport.unpark(s.thread);
}
可以看到它是现将头节点的状态更新为 0,然后再唤醒下一个节点,如果下一个节点为空则直接返回不唤醒任何节点,如果下一个节点被取消了,那么它会从尾节点往前进行遍历,遍历与头节点最近的没有被取消的节点进行唤醒操作,在唤醒前看一下队列状态:
然后唤醒节点后他会进入到 parkAndCheckInterrupt
方法里面,再次去执行下面的方法:
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true; // 取消节点标志位
try {
boolean interrupted = false; // 中断标志位
for (;;) {final Node p = node.predecessor(); // 获取前节点
if (p == head && tryAcquire(arg)) { // 这里的逻辑是如果前节点为头结点并且获取到锁则进行头结点变换
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
if (shouldParkAfterFailedAcquire(p, node) && // 设置 waitStatus 状态
parkAndCheckInterrupt()) // 挂起线程
interrupted = true;
}
} finally {if (failed)
cancelAcquire(node); // 取消操作
}
}
此时获取 p ==head 成立,并且可以正抢到所资源,所以它会进入到循环体内,进行设置头结点为当前节点,前节点的下一个节点设置为 null,返回中断标志,看一下此时队列情况,如下图所示:
AbstractQueuedSynchronizer
的独占模式其实提供了三种不同的形式进行获取锁操作,看一下下表所示:
方法名称 | 方法描述 | 对应调用的内部方法 |
---|---|---|
acquire | 以独占模式进行不间断的获取锁 | tryAcquire,acquireQueued |
acquireInterruptibly | 以独占模式相应中断的方式获取锁,发生中断抛出异常 | tryAcquire,doAcquireInterruptibly |
tryAcquireNanos | 以独占模式相应中断的方式并且在指定时间内获取锁,会阻塞一段时间,如果还未获得锁直接返回,发生中断抛出异常 | tryAcquire,doAcquireNanos |
通过上面图可以发现,他都会调用图表一中需要用户实现的方法,ReentrantLock
实现了独占模式则内部实现的是 tryAcquire
和tryRelease
方法,用来尝试获取锁和尝试释放锁的操作,其实上面内容我们用的是 ReentrantLock
中的 lock
方法作为同步器,细心的朋友会发现,这个 lock
,方法是 ReentrantLock 实现的,它内部调用了acquire
方法,实现了不间断的获取锁机制,ReentrantLock
中还有一个 lockInterruptibly
方法,它内部直接调用的是 AbstractQueuedSynchronizer
的acquireInterruptibly
方法,两个之间的区别在于,两者都会相应中断信息,前者不会做任何处理还会进入等待状态,而后者则抛出异常终止操作,
这里为了详细看清楚它内部关系我这里用张图来进行阐述,如下所示:
- 左侧代表的事 ReentrantLock,右侧代表的 AQS
- 左侧内部黄色区域代表
NonfairSync
- 图中 1 和 2 代表 AQS 调用其他方法的过程
接下来我们来看一下源码信息:
public void lockInterruptibly() throws InterruptedException {sync.acquireInterruptibly(1);
}
发现他调用的 Sync
类中的 acquireInterruptibly
方法,但其实这个方法是 AQS 中的方法,源码如下所示:
public final void acquireInterruptibly(int arg)
throws InterruptedException {if (Thread.interrupted()) // 判断线程是否被中断
throw new InterruptedException(); // 中断则抛出异常
if (!tryAcquire(arg)) // 尝试获取锁
doAcquireInterruptibly(arg); // 进行添加队列,并且修改前置节点状态,且响应中断抛出异常
}
通过上面的源码,它也调用了子类实现的 tryAcquire
方法,这个方法和我们上文提到的 tryAcquire
是一样,ReentrantLock
下的 NonfairSync
下的 tryAcquire
方法,这里这个方法就不多说了详细请看上文内容,这里主要讲一下 doAcquireInterruptibly
这个方法:
private void doAcquireInterruptibly(int arg)
throws InterruptedException {final Node node = addWaiter(Node.EXCLUSIVE); // 将节点添加到队列尾部
boolean failed = true; // 失败匹配机制
try {for (;;) {final Node p = node.predecessor(); // 获取前节点
if (p == head && tryAcquire(arg)) { // 如果前节点为头节点并且获得了锁
setHead(node); // 设置当前节点为头节点
p.next = null; // help GC // 头节点的下一个节点设置为 null
failed = false; // 匹配失败变为 false
return;
}
if (shouldParkAfterFailedAcquire(p, node) && // 将前节点设置为 -1,如果前节点为取消节点则往前一直寻找直到修改为 - 1 为止。parkAndCheckInterrupt()) // 挂起线程返回是否中断
throw new InterruptedException();}
} finally {if (failed)
cancelAcquire(node);
}
}
其实这个方法和 acquireQueued
区别在于以下几点:
-
acquireQueued
是在方法内部添加节点到队列尾部,而doAcquireInterruptibly
是在方法内部进行添加节点到尾部,这个区别点并不是很重要 - 重点是
acquireQueued
响应中断,但是他不会抛出异常,而后者会抛出异常 throw new InterruptedException()
分析到这里我们来用前面的例子来进行模拟一下中中断的操作,详细代码如下所示:
public class ReentrantLockDemo {public static void main(String[] args) throws Exception {AddDemo runnalbeDemo = new AddDemo();
Thread thread = new Thread(runnalbeDemo::add);
thread.start();
Thread.sleep(500);
Thread thread1 = new Thread(runnalbeDemo::add);
thread1.start();
Thread.sleep(500);
Thread thread2 = new Thread(runnalbeDemo::add);
thread2.start();
Thread.sleep(500);
Thread thread3 = new Thread(runnalbeDemo::add);
thread3.start();
Thread.sleep(10000);
thread1.interrupt();
System.out.println(runnalbeDemo.getCount());
}
private static class AddDemo {private final AtomicInteger count = new AtomicInteger();
private final ReentrantLock reentrantLock = new ReentrantLock();
private final Condition condition = reentrantLock.newCondition();
private void add() {
try {reentrantLock.lockInterruptibly();
count.getAndIncrement();} catch (Exception ex) {System.out.println("线程被中断了");
} finally {// reentrantLock.unlock();
}
}
int getCount() {return count.get();
}
}
}
上面的例子其实和前面提到的例子没有什么太大的差别主要的差别是将 lock
替换为lockInterruptibly
,其次就是在三个线程后面讲线程 1 进行中断操作,这里入队的操作不在多说,因为操作内容和上面大致相同,下面是四个个线程操作完成的状态信息:
如果线程等待的过程中抛出异常,则当前线程进入到 finally 中的时候 failed 为 true,因为修改该字段只有获取到锁的时候才会修改为 false,进来之后它会运行 cancelAcquire
来进行取消当前节点,下面我们先来分析下源码内容:
private void cancelAcquire(Node node) {
// 如果节点为空直接返回,节点不存在直接返回
if (node == null)
return;
// 设置节点所在的线程为空,清除线程操作
node.thread = null;
// 获取当前节点的前节点
Node pred = node.prev;
// 如果前节点是取消节点则跳过前节点,一直寻找一个不是取消节点为止
while (pred.waitStatus > 0)
node.prev = pred = pred.prev;
// 获取头节点下一个节点
Node predNext = pred.next;
// 这里直接设置为取消节点状态,没有使用 CAS 原因是因为直接设置只有其他线程可以跳过取消的节点
node.waitStatus = Node.CANCELLED;
// 如果当前节点为尾节点,并且设置尾节点为找到的合适的前节点时,修改前节点的下一个节点为 null
if (node == tail && compareAndSetTail(node, pred)) {compareAndSetNext(pred, predNext, null);
} else {
// 如果不是尾节点,则说明是中间节点,则需要通知后续节点,嘿,伙计你被唤醒了。int ws;
if (pred != head && // 前节点不是头结点
((ws = pred.waitStatus) == Node.SIGNAL || // 前节点的状态为 SIGNAL
(ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) // 或者前节点状态小于 0 而且修改前节点状态为 SIGNAL 成功
&& pred.thread != null) { // 前节点线程不为空
Node next = node.next;
if (next != null && next.waitStatus <= 0)
compareAndSetNext(pred, predNext, next);
} else {
// 唤醒下一个不是取消的节点
unparkSuccessor(node);
}
node.next = node; // help GC
}
}
- 首先找到当前节点的前节点,如果前节点为取消节点则一直往前寻找一个节点。
- 取消的是尾节点,则直接将前节点的下一个节点设置为 null
- 如果取消的是头节点的下一个节点,且不是尾节点的情况时,它是唤醒下一个节点,唤醒之前并没有将其移除队列,而是在唤醒下一个节点的时候,
shouldParkAfterFailedAcquire
里面将取消的节点移除队列,唤醒之后,当前节点的下一个节点也设置成自己,帮助 GC 回收它。 - 如果取消节点是中间的节点,则直接将其前节点的下一个节点设置为取消节点的下下个节点即可。
第一种情况如果我们取消的节点是前节点是头节点,此时线程 1 的节点应该是被中断操作,此时进入到 cancelAcquire
之后会进入 else 语句中,然后进去到 unparkSuccessor
方法,当进入到这个方法之前我们看一下状态变化:
我们发现线程 1 的 Node 节点的 waitStatus 变为 1 也就是 Node.CANCELLED
节点,然后运行 unparkSuccessor
方法,该方法上面就已经讲述了其中的源码,这里就不在贴源码了,就是要唤醒下一个没有被取消的节点,这里是 Ref-695
这个线程,当 Ref-695
被唤醒之后它会继续运行下面的内容:
private void doAcquireInterruptibly(int arg)
throws InterruptedException {final Node node = addWaiter(Node.EXCLUSIVE);
boolean failed = true;
try {for (;;) {final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) { // 再一次循环发现还是没有争抢到锁
setHead(node);
p.next = null; // help GC
failed = false;
return;
}
if (shouldParkAfterFailedAcquire(p, node) && // 再一次循环之后有运行到这里了
parkAndCheckInterrupt()) // 这里被唤醒了,又要进行循环操作了
throw new InterruptedException();}
} finally {if (failed)
cancelAcquire(node);
}
}
发现再一次循环操作后,还是没有正抢到锁,这时候还是会运行 shouldParkAfterFailedAcquire
方法,这个方法内部发现前节点的状态是 Node.CANCELLED
这时候它会在内部先将节点给干掉,也就是这个代码:
if (ws > 0) {
/*
* Predecessor was cancelled. Skip over predecessors and
* indicate retry.
*/
do {node.prev = pred = pred.prev;} while (pred.waitStatus > 0);
pred.next = node;
}
最后还是会被挂起状态,因为没有释放锁操作,最后移除的节点如下所示:
如果取消的事尾节点,也就是线程 3 被中断操作,这个是比较简单的直接将尾节点删除即可,其中会走如下代码:
if (node == tail && compareAndSetTail(node, pred)) {compareAndSetNext(pred, predNext, null);
}
如果取消的节点是中间的节点,通过上例子中则是取消线程 2,其实它内部只是将线程取消线程的前节点的下一个节点指向了取消节点的下节点,如下图所示:
结束语
这章节分析的主要是 ReentrantLock
的内部原理,本来公平模式和非公平模式想放在一起来写,无奈发现篇幅有点长了,所以就分开进行写,这样读取来不会那么费劲,内部还有条件内容等待下章节分析,如果有分析不到位的请大家指正。