深入分析AQS实现原理

51次阅读

共计 11807 个字符,预计需要花费 30 分钟才能阅读完成。

简单解释一下 J.U.C,是 JDK 中提供的并发工具包,java.util.concurrent。里面提供了很多并发编程中很常用的实用工具类,比如 atomic 原子操作、比如 lock 同步锁、fork/join 等。
从 Lock 作为切入点
我想以 lock 作为切入点来讲解 AQS,毕竟同步锁是解决线程安全问题的通用手段,也是我们工作中用得比较多的方式。
Lock API
Lock 是一个接口,方法定义如下
void lock() // 如果锁可用就获得锁,如果锁不可用就阻塞直到锁释放
void lockInterruptibly() // 和 lock()方法相似, 但阻塞的线程可中断,抛出 java.lang.InterruptedException 异常
boolean tryLock() // 非阻塞获取锁; 尝试获取锁,如果成功返回 true
boolean tryLock(long timeout, TimeUnit timeUnit) // 带有超时时间的获取锁方法
void unlock() // 释放锁
Lock 的实现
实现 Lock 接口的类有很多,以下为几个常见的锁实现

ReentrantLock:表示重入锁,它是唯一一个实现了 Lock 接口的类。重入锁指的是线程在获得锁之后,再次获取该锁不需要阻塞,而是直接关联一次计数器增加重入次数
ReentrantReadWriteLock:重入读写锁,它实现了 ReadWriteLock 接口,在这个类中维护了两个锁,一个是 ReadLock,一个是 WriteLock,他们都分别实现了 Lock 接口。读写锁是一种适合读多写少的场景下解决线程安全问题的工具,基本原则是:读和读不互斥、读和写互斥、写和写互斥。也就是说涉及到影响数据变化的操作都会存在互斥。
StampedLock:stampedLock 是 JDK8 引入的新的锁机制,可以简单认为是读写锁的一个改进版本,读写锁虽然通过分离读和写的功能使得读和读之间可以完全并发,但是读和写是有冲突的,如果大量的读线程存在,可能会引起写线程的饥饿。stampedLock 是一种乐观的读策略,使得乐观锁完全不会阻塞写线程

ReentrantLock 的简单实用
如何在实际应用中使用 ReentrantLock 呢?我们通过一个简单的 demo 来演示一下
public class Demo {
private static int count=0;
static Lock lock=new ReentrantLock();
public static void inc(){
lock.lock();
try {
Thread.sleep(1);
count++;
} catch (InterruptedException e) {
e.printStackTrace();
}finally{
lock.unlock();
}
}
这段代码主要做一件事,就是通过一个静态的 incr()方法对共享变量 count 做连续递增,在没有加同步锁的情况下多线程访问这个方法一定会存在线程安全问题。所以用到了 ReentrantLock 来实现同步锁,并且在 finally 语句块中释放锁。那么我来引出一个问题,大家思考一下
多个线程通过 lock 竞争锁时,当竞争失败的锁是如何实现等待以及被唤醒的呢?
什么是 AQS
aqs 全称为 AbstractQueuedSynchronizer,它提供了一个 FIFO 队列,可以看成是一个用来实现同步锁以及其他涉及到同步功能的核心组件,常见的有:ReentrantLock、CountDownLatch 等。AQS 是一个抽象类,主要是通过继承的方式来使用,它本身没有实现任何的同步接口,仅仅是定义了同步状态的获取以及释放的方法来提供自定义的同步组件。可以这么说,只要搞懂了 AQS,那么 J.U.C 中绝大部分的 api 都能轻松掌握。
AQS 的两种功能
从使用层面来说,AQS 的功能分为两种:独占和共享

独占锁,每次只能有一个线程持有锁,比如前面给大家演示的 ReentrantLock 就是以独占方式实现的互斥锁
共享锁,允许多个线程同时获取锁,并发访问共享资源,比如 ReentrantReadWriteLock

ReentrantLock 的类图
仍然以 ReentrantLock 为例,来分析 AQS 在重入锁中的使用。毕竟单纯分析 AQS 没有太多的含义。先理解这个类图,可以方便我们理解 AQS 的原理
AQS 的内部实现
AQS 的实现依赖内部的同步队列, 也就是 FIFO 的双向队列,如果当前线程竞争锁失败,那么 AQS 会把当前线程以及等待状态信息构造成一个 Node 加入到同步队列中,同时再阻塞该线程。当获取锁的线程释放锁以后,会从队列中唤醒一个阻塞的节点(线程)。
AQS 队列内部维护的是一个 FIFO 的双向链表,这种结构的特点是每个数据结构都有两个指针,分别指向直接的后继节点和直接前驱节点。所以双向链表可以从任意一个节点开始很方便的访问前驱和后继。每个 Node 其实是由线程封装,当线程争抢锁失败后会封装成 Node 加入到 ASQ 队列中去
Node 类的组成如下
static final class Node {
static final Node SHARED = new Node();
static final Node EXCLUSIVE = null;
static final int CANCELLED = 1;
static final int SIGNAL = -1;
static final int CONDITION = -2;
static final int PROPAGATE = -3;
volatile int waitStatus;
volatile Node prev; // 前驱节点
volatile Node next; // 后继节点
volatile Thread thread;// 当前线程
Node nextWaiter; // 存储在 condition 队列中的后继节点
// 是否为共享锁
final boolean isShared() {
return nextWaiter == SHARED;
}

final Node predecessor() throws NullPointerException {
Node p = prev;
if (p == null)
throw new NullPointerException();
else
return p;
}

Node() { // Used to establish initial head or SHARED marker
}
// 将线程构造成一个 Node,添加到等待队列
Node(Thread thread, Node mode) {// Used by addWaiter
this.nextWaiter = mode;
this.thread = thread;
}
// 这个方法会在 Condition 队列使用,后续单独写一篇文章分析 condition
Node(Thread thread, int waitStatus) {// Used by Condition
this.waitStatus = waitStatus;
this.thread = thread;
}
}
释放锁以及添加线程对于队列的变化
添加节点
当出现锁竞争以及释放锁的时候,AQS 同步队列中的节点会发生变化,首先看一下添加节点的场景。这里会涉及到两个变化

新的线程封装成 Node 节点追加到同步队列中,设置 prev 节点以及修改当前节点的前置节点的 next 节点指向自己
通过 CAS 讲 tail 重新指向新的尾部节点

释放锁移除节点
head 节点表示获取锁成功的节点,当头结点在释放同步状态时,会唤醒后继节点,如果后继节点获得锁成功,会把自己设置为头结点,节点的变化过程如下这个过程也是涉及到两个变化

修改 head 节点指向下一个获得锁的节点
新的获得锁的节点,将 prev 的指针指向 null

这里有一个小的变化,就是设置 head 节点不需要用 CAS,原因是设置 head 节点是由获得锁的线程来完成的,而同步锁只能由一个线程获得,所以不需要 CAS 保证,只需要把 head 节点设置为原首节点的后继节点,并且断开原 head 节点的 next 引用即可
AQS 的源码分析
清楚了 AQS 的基本架构以后,我们来分析一下 AQS 的源码,仍然以 ReentrantLock 为模型。
ReentrantLock 的时序图
调用 ReentrantLock 中的 lock()方法,源码的调用过程我使用了时序图来展现从图上可以看出来,当锁获取失败时,会调用 addWaiter()方法将当前线程封装成 Node 节点加入到 AQS 队列,基于这个思路,我们来分析 AQS 的源码实现
分析源码
ReentrantLock.lock()
public void lock() {
sync.lock();
}
这个是获取锁的入口,调用 sync 这个类里面的方法,sync 是什么呢?
abstract static class Sync extends AbstractQueuedSynchronizer
sync 是一个静态内部类,它继承了 AQS 这个抽象类,前面说过 AQS 是一个同步工具,主要用来实现同步控制。我们在利用这个工具的时候,会继承它来实现同步控制功能。通过进一步分析,发现 Sync 这个类有两个具体的实现,分别是 NofairSync(非公平锁),FailSync(公平锁).

公平锁 表示所有线程严格按照 FIFO 来获取锁
非公平锁 表示可以存在抢占锁的功能,也就是说不管当前队列上是否存在其他线程等待,新线程都有机会抢占锁

公平锁和非公平锁的实现上的差异,我会在文章后面做一个解释,接下来的分析仍然以非公平锁作为主要分析逻辑。
NonfairSync.lock
final void lock() {
if (compareAndSetState(0, 1)) // 通过 cas 操作来修改 state 状态,表示争抢锁的操作
setExclusiveOwnerThread(Thread.currentThread());// 设置当前获得锁状态的线程
else
acquire(1); // 尝试去获取锁
}
这段代码简单解释一下

由于这里是非公平锁,所以调用 lock 方法时,先去通过 cas 去抢占锁
如果抢占锁成功,保存获得锁成功的当前线程
抢占锁失败,调用 acquire 来走锁竞争逻辑

compareAndSetStatecompareAndSetState 的代码实现逻辑如下
// See below for intrinsics setup to support this
return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
}
这段代码其实逻辑很简单,就是通过 cas 乐观锁的方式来做比较并替换。上面这段代码的意思是,如果当前内存中的 state 的值和预期值 expect 相等,则替换为 update。更新成功返回 true,否则返回 false.
这个操作是原子的,不会出现线程安全问题,这里面涉及到 Unsafe 这个类的操作,一级涉及到 state 这个属性的意义。
**state**

当 state= 0 时,表示无锁状态
当 state>0 时,表示已经有线程获得了锁,也就是 state=1,但是因为 ReentrantLock 允许重入,所以同一个线程多次获得同步锁的时候,state 会递增,比如重入 5 次,那么 state=5。而在释放锁的时候,同样需要释放 5 次直到 state= 0 其他线程才有资格获得锁

private volatile int state;
需要注意的是:不同的 AQS 实现,state 所表达的含义是不一样的。UnsafeUnsafe 类是在 sun.misc 包下,不属于 Java 标准。但是很多 Java 的基础类库,包括一些被广泛使用的高性能开发库都是基于 Unsafe 类开发的,比如 Netty、Hadoop、Kafka 等;Unsafe 可认为是 Java 中留下的后门,提供了一些低层次操作,如直接内存访问、线程调度等
public final native boolean compareAndSwapInt(Object var1, long var2, int var4, int var5);
这个是一个 native 方法,第一个参数为需要改变的对象,第二个为偏移量(即之前求出来的 headOffset 的值),第三个参数为期待的值,第四个为更新后的值整个方法的作用是如果当前时刻的值等于预期值 var4 相等,则更新为新的期望值 var5,如果更新成功,则返回 true,否则返回 false;

acquire
acquire 是 AQS 中的方法,如果 CAS 操作未能成功,说明 state 已经不为 0,此时继续 acquire(1)操作, 这里大家思考一下,acquire 方法中的 1 的参数是用来做什么呢?如果没猜中,往前面回顾一下 state 这个概念
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
这个方法的主要逻辑是

通过 tryAcquire 尝试获取独占锁,如果成功返回 true,失败返回 false
如果 tryAcquire 失败,则会通过 addWaiter 方法将当前线程封装成 Node 添加到 AQS 队列尾部
acquireQueued,将 Node 作为参数,通过自旋去尝试获取锁。

如果大家看过我写的 Synchronized 源码分析的文章,就应该能够明白自旋存在的意义
NonfairSync.tryAcquire
这个方法的作用是尝试获取锁,如果成功返回 true,不成功返回 false 它是重写 AQS 类中的 tryAcquire 方法,并且大家仔细看一下 AQS 中 tryAcquire 方法的定义,并没有实现,而是抛出异常。按照一般的思维模式,既然是一个不实现的模版方法,那应该定义成 abstract,让子类来实现呀?大家想想为什么
protected final boolean tryAcquire(int acquires) {
return nonfairTryAcquire(acquires);
}
nonfairTryAcquire
tryAcquire(1)在 NonfairSync 中的实现代码如下
ffinal boolean nonfairTryAcquire(int acquires) {
// 获得当前执行的线程
final Thread current = Thread.currentThread();
int c = getState(); // 获得 state 的值
if (c == 0) {//state= 0 说明当前是无锁状态
// 通过 cas 操作来替换 state 的值改为 1,大家想想为什么要用 cas 呢?
// 理由是,在多线程环境中,直接修改 state= 1 会存在线程安全问题,你猜到了吗?
if (compareAndSetState(0, acquires)) {
// 保存当前获得锁的线程
setExclusiveOwnerThread(current);
return true;
}
}
// 这段逻辑就很简单了。如果是同一个线程来获得锁,则直接增加重入次数
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires; // 增加重入次数
if (nextc < 0) // overflow
throw new Error(“Maximum lock count exceeded”);
setState(nextc);
return true;
}
return false;
}

获取当前线程,判断当前的锁的状态
如果 state= 0 表示当前是无锁状态,通过 cas 更新 state 状态的值
如果当前线程是属于重入,则增加重入次数

addWaiter
当 tryAcquire 方法获取锁失败以后,则会先调用 addWaiter 将当前线程封装成 Node,然后添加到 AQS 队列
private Node addWaiter(Node mode) {//mode=Node.EXCLUSIVE
// 将当前线程封装成 Node,并且 mode 为独占锁
Node node = new Node(Thread.currentThread(), mode);
// Try the fast path of enq; backup to full enq on failure
// tail 是 AQS 的中表示同步队列队尾的属性,刚开始为 null,所以进行 enq(node)方法
Node pred = tail;
if (pred != null) {//tail 不为空的情况,说明队列中存在节点数据
node.prev = pred; // 讲当前线程的 Node 的 prev 节点指向 tail
if (compareAndSetTail(pred, node)) {// 通过 cas 讲 node 添加到 AQS 队列
pred.next = node;//cas 成功,把旧的 tail 的 next 指针指向新的 tail
return node;
}
}
enq(node); //tail=null,将 node 添加到同步队列中
return node;
}

将当前线程封装成 Node
判断当前链表中的 tail 节点是否为空,如果不为空,则通过 cas 操作把当前线程的 node 添加到 AQS 队列
如果为空或者 cas 失败,调用 enq 将节点添加到 AQS 队列

enq
enq 就是通过自旋操作把当前节点加入到队列中
private Node enq(final Node node) {
// 自旋,不做过多解释,不清楚的关注公众号[架构师修炼宝典]
for (;;) {
Node t = tail; // 如果是第一次添加到队列,那么 tail=null
if (t == null) {// Must initialize
//CAS 的方式创建一个空的 Node 作为头结点
if (compareAndSetHead(new Node()))
// 此时队列中只一个头结点,所以 tail 也指向它
tail = head;
} else {
// 进行第二次循环时,tail 不为 null,进入 else 区域。将当前线程的 Node 结点的 prev 指向 tail,然后使用 CAS 将 tail 指向 Node
node.prev = t;
if (compareAndSetTail(t, node)) {
// t 此时指向 tail, 所以可以 CAS 成功,将 tail 重新指向 Node。此时 t 为更新前的 tail 的值,即指向空的头结点,t.next=node,就将头结点的后续结点指向 Node,返回头结点
t.next = node;
return t;
}
}
}
}
假如有两个线程 t1,t2 同时进入 enq 方法,t==null 表示队列是首次使用,需要先初始化另外一个线程 cas 失败,则进入下次循环,通过 cas 操作将 node 添加到队尾
到目前为止,通过 addwaiter 方法构造了一个 AQS 队列,并且将线程添加到了队列的节点中
acquireQueued
将添加到队列中的 Node 作为参数传入 acquireQueued 方法,这里面会做抢占锁的操作
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();// 获取 prev 节点, 若为 null 即刻抛出 NullPointException
if (p == head && tryAcquire(arg)) {// 如果前驱为 head 才有资格进行锁的抢夺
setHead(node); // 获取锁成功后就不需要再进行同步操作了, 获取锁成功的线程作为新的 head 节点
// 凡是 head 节点,head.thread 与 head.prev 永远为 null, 但是 head.next 不为 null
p.next = null; // help GC
failed = false; // 获取锁成功
return interrupted;
}
// 如果获取锁失败,则根据节点的 waitStatus 决定是否需要挂起线程
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())// 若前面为 true, 则执行挂起, 待下次唤醒的时候检测中断的标志
interrupted = true;
}
} finally {
if (failed) // 如果抛出异常则取消锁的获取, 进行出队 (sync queue) 操作
cancelAcquire(node);
}
}

获取当前节点的 prev 节点
如果 prev 节点为 head 节点,那么它就有资格去争抢锁,调用 tryAcquire 抢占锁
抢占锁成功以后,把获得锁的节点设置为 head,并且移除原来的初始化 head 节点
如果获得锁失败,则根据 waitStatus 决定是否需要挂起线程
最后,通过 cancelAcquire 取消获得锁的操作

前面的逻辑都很好理解,主要看一下 shouldParkAfterFailedAcquire 这个方法和 parkAndCheckInterrupt 的作用
shouldParkAfterFailedAcquire
从上面的分析可以看出,只有队列的第二个节点可以有机会争用锁,如果成功获取锁,则此节点晋升为头节点。对于第三个及以后的节点,if (p == head)条件不成立,首先进行 shouldParkAfterFailedAcquire(p, node)操作 shouldParkAfterFailedAcquire 方法是判断一个争用锁的线程是否应该被阻塞。它首先判断一个节点的前置节点的状态是否为 Node.SIGNAL,如果是,是说明此节点已经将状态设置 - 如果锁释放,则应当通知它,所以它可以安全的阻塞了,返回 true。
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
int ws = pred.waitStatus; // 前继节点的状态
if (ws == Node.SIGNAL)// 如果是 SIGNAL 状态,意味着当前线程需要被 unpark 唤醒
return true;
如果前节点的状态大于 0,即为 CANCELLED 状态时,则会从前节点开始逐步循环找到一个没有被“CANCELLED”节点设置为当前节点的前节点,返回 false。在下次循环执行 shouldParkAfterFailedAcquire 时,返回 true。这个操作实际是把队列中 CANCELLED 的节点剔除掉。
if (ws > 0) {// 如果前继节点是“取消”状态,则设置“当前节点”的“当前前继节点”为“‘原前继节点 ’ 的前继节点”。

do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {// 如果前继节点为“0”或者“共享锁”状态,则设置前继节点为 SIGNAL 状态。
/*
* waitStatus must be 0 or PROPAGATE. Indicate that we
* need a signal, but don’t park yet. Caller will need to
* retry to make sure it cannot acquire before parking.
*/
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}
parkAndCheckInterrupt
如果 shouldParkAfterFailedAcquire 返回了 true,则会执行:parkAndCheckInterrupt()方法,它是通过 LockSupport.park(this)将当前线程挂起到 WATING 状态,它需要等待一个中断、unpark 方法来唤醒它,通过这样一种 FIFO 的机制的等待,来实现了 Lock 的操作。
private final boolean parkAndCheckInterrupt() {
LockSupport.park(this);
return Thread.interrupted();
}

LockSupportLockSupport 类是 Java6 引入的一个类,提供了基本的线程同步原语。LockSupport 实际上是调用了 Unsafe 类里的函数,归结到 Unsafe 里,只有两个函数:
public native void unpark(Thread jthread);
public native void park(boolean isAbsolute, long time);
unpark 函数为线程提供“许可(permit)”,线程调用 park 函数则等待“许可”。这个有点像信号量,但是这个“许可”是不能叠加的,“许可”是一次性的。permit 相当于 0 / 1 的开关,默认是 0,调用一次 unpark 就加 1 变成了 1. 调用一次 park 会消费 permit,又会变成 0。如果再调用一次 park 会阻塞,因为 permit 已经是 0 了。直到 permit 变成 1. 这时调用 unpark 会把 permit 设置为 1. 每个线程都有一个相关的 permit,permit 最多只有一个,重复调用 unpark 不会累积

锁的释放
ReentrantLock.unlock
加锁的过程分析完以后,再来分析一下释放锁的过程,调用 release 方法,这个方法里面做两件事,1,释放锁;2,唤醒 park 的线程
public final boolean release(int arg) {
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}
tryRelease
这个动作可以认为就是一个设置锁状态的操作,而且是将状态减掉传入的参数值(参数是 1),如果结果状态为 0,就将排它锁的 Owner 设置为 null,以使得其它的线程有机会进行执行。在排它锁中,加锁的时候状态会增加 1(当然可以自己修改这个值),在解锁的时候减掉 1,同一个锁,在可以重入后,可能会被叠加为 2、3、4 这些值,只有 unlock()的次数与 lock()的次数对应才会将 Owner 线程设置为空,而且也只有这种情况下才会返回 true。
protected final boolean tryRelease(int releases) {
int c = getState() – releases; // 这里是将锁的数量减 1
if (Thread.currentThread() != getExclusiveOwnerThread())// 如果释放的线程和获取锁的线程不是同一个,抛出非法监视器状态异常
throw new IllegalMonitorStateException();
boolean free = false;
if (c == 0) {
// 由于重入的关系,不是每次释放锁 c 都等于 0,
// 直到最后一次释放锁时,才会把当前线程释放
free = true;
setExclusiveOwnerThread(null);
}
setState(c);
return free;
}
unparkSuccessor
在方法 unparkSuccessor(Node)中,就意味着真正要释放锁了,它传入的是 head 节点(head 节点是占用锁的节点),当前线程被释放之后,需要唤醒下一个节点的线程
private void unparkSuccessor(Node node) {
int ws = node.waitStatus;
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);
Node s = node.next;
if (s == null || s.waitStatus > 0) {// 判断后继节点是否为空或者是否是取消状态,
s = null;
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0) // 然后从队列尾部向前遍历找到最前面的一个 waitStatus 小于 0 的节点, 至于为什么从尾部开始向前遍历,因为在 doAcquireInterruptibly.cancelAcquire 方法的处理过程中只设置了 next 的变化,没有设置 prev 的变化,在最后有这样一行代码:node.next = node,如果这时执行了 unparkSuccessor 方法,并且向后遍历的话,就成了死循环了,所以这时只有 prev 是稳定的
s = t;
}
// 内部首先会发生的动作是获取 head 节点的 next 节点,如果获取到的节点不为空,则直接通过:“LockSupport.unpark()”方法来释放对应的被挂起的线程,这样一来将会有一个节点唤醒后继续进入循环进一步尝试 tryAcquire()方法来获取锁
if (s != null)
LockSupport.unpark(s.thread); // 释放许可
}
总结
通过这篇文章基本将 AQS 队列的实现过程做了比较清晰的分析,主要是基于非公平锁的独占锁实现。在获得同步锁时,同步器维护一个同步队列,获取状态失败的线程都会被加入到队列中并在队列中进行自旋;移出队列(或停止自旋)的条件是前驱节点为头节点且成功获取了同步状态。在释放同步状态时,同步器调用 tryRelease(int arg)方法释放同步状态,然后唤醒头节点的后继节点。

正文完
 0