如何对待 AQS?
看了 Synchronized 的实现形式之后,再来看 JDK 的 AQS, 感觉就比较简单了,它的行为有点像银行排队,银行有很多窗口解决很多业务,不同的窗口解决不同的业务,比方有集体业务,也有金融业务,也有公司业务等,每个窗口都有很多人排队。一般来讲当你在窗口解决业务的时候是不会有人来打搅你的,不个别的时候是什么时候呢?那就是材料散发窗口,谁来了都给你一张材料,一边看去吧。
而且当你去解决业务的时候,你可能有些材料遗记带了,而后你又要从新去取材料,取完之后回来持续排队。
所以说,代码源于生存,今人诚不欺我。
带着问题看源码
- AQS 中是如何实现线程阻塞的?
- AQS 为什么反对多个 Condition?
- 线程唤醒的时候程序是怎么的?
带着问题去看源码比一猛子扎进源码的陆地好,所以我倡议你带着这几个问题去看 AQS 源码。
CLH 队列
AQS 是一个抽象类,很多类都是基于它来实现本人的特有的性能的,比方 Lock,Semaphore,CountDownLatch 等,咱们都晓得对于一个共享变量,为了线程平安,同一时刻必定只能有一个线程线程能够写这个变量,也就是只有一个线程能拿到锁。那么那些没有拿到锁的线程是怎么被阻塞的呢?
AQS 中保护了一个基于链表实现的 FIFO 队列(官网叫它 CLH 锁), 那些没有获取到锁的线程会被封装后放入都在这个队列外面, 咱们来看下这个队列的定义
static final class Node {
// 共享模式
static final Node SHARED = new Node();
// 排它模式,默认
static final Node EXCLUSIVE = null;
// 以后线程的一个期待状态, 默认值为 0
volatile int waitStatus;
// 前一个节点
volatile Node prev;
// 后一个节点
volatile Node next;
// 入队到这个节点的线程
volatile Thread thread;
// 下一个期待 condition 的节点或者是非凡的节点值: SHARED
Node nextWaiter;
}
这里共享模式和排它模式是什么意思呢?排它模式是示意同时只能有一个线程获取到锁执行,而共享模式示意能够同时有多个线程执行,比方 Semaphore/CountDownLatch。
waitStatus 除了默认值外还有三个值,别离代表不同的含意
- CANCELLED = 1 : 示意以后节点曾经不能在获取锁了,以后节点因为超时或者中断而进入该状态,进入该状态的节点状态不会再发生变化,同时后续还会从队列中移除。
- SIGNAL = -1 : 示意以后节点须要去唤醒后继节点。后继节点入队时,会将以后节点的状态更新为 SIGNAL
- CONDITION = -2 : 以后节点处于条件队列中,当其余线程调用了 condition.signal()后,节点会从条件队列本义到同步队列中
- PROPAGATE = -3:当共享锁开释的时候,这个状态会被传递到其余后继节点
其实本来的 CLH 队列是没有 prev 这个指针的,它存在的意义是那些勾销锁获取的线程须要被移除掉。
Synchronized 的 wait 只能有一个条件队列(WaitSet),而 AQS 则是反对多个条件队列, 其中条件队列的要害数据结构如下:
public class ConditionObject implements Condition, java.io.Serializable {
// condition queue 中的头节点
private transient Node firstWaiter;
// condition queue 中的尾节点
private transient Node lastWaiter;
/**
* 创立一个新的 condition
*/
public ConditionObject() {}
// 相似于 Object 的 notify
public final void signal() {...}
// 相似于 Object 的 wait
public final void await() throws InterruptedException {...}
}
AQS 反对多个 Condition 的起因就是每个 ConditionObject 都对应一个队列, 所以它能够反对多个 Condition。
如果多个线程申请同一把锁,有的线程阻塞在申请锁下面,有的锁阻塞到期待某个条件成立。那么当持有锁的线程让出锁的时候,哪个线程应该获取到锁呢?
当获取了锁的线程调用了 signal()时,它又会不会从条件队列 (condition queue) 中出队一个 node, 退出到同步队列中呢?答案是会的。
如果我说 AQS 剖析结束,你们会不会打我?
哈哈哈,各位放下键盘,扶我起来,我还能在水一会儿。
AQS 中除了这两个类之外,还有 AQS 自身还有几个重要的属性,它们独特形成了 AQS 这个框架类的根底构造
// wait queue(或者叫做同步队列)的头节点
private transient volatile Node head;
// wait queue(或者叫同步队列)的尾节点
private transient volatile Node tail;
// 资源
private volatile int state;
看到了 head 和 tail,是不是一个双向链表的含意跃然纸上了。
AQS 中的 state 变量,对不同的子类有不同的含意,该变量对 ReentrantLock 来说示意加锁状态,对 Semaphore 来说则是信号量的数量。总之它们都是基于 AQS 中的 state 来实现各自独有性能的。
这里的 state 我感觉更应该叫做资源,就相似于去银行排队时取的票。对于 ReentrantLock 来说,这个资源只有 1 个,对于 Semaphore 或者 CountDownLatch 来说资源能够有很多个。
Unsafe 解说
unsafe 是一个十分弱小的类,通过它提供的 API,咱们能够操作内存,批改对象属性值,甚至内存屏障,加锁解锁都能通过它实现
图片来自于: https://tech.meituan.com/2019…
AQS 中就应用了 unsafe 来实现线程的阻塞以及批改属性的值,这里把 AQS 中波及到 unsafe 的办法列出来
// 阻塞以后线程
public native void park(boolean isAbsolute, long time);
// 解锁指定的线程
public native void unpark(Object thread);
// 获取字段的偏移地址
public native long objectFieldOffset(Field f);
// CAS 更新对象的值为 x, 当当初的值是 expected
public final native boolean compareAndSwapObject(Object o, long offset,
Object expected,
Object x);
// CAS 更新
public final native boolean compareAndSwapInt(Object o, long offset,
int expected,
int x);
park 和 unpark 的底层实现其实还是之前咱们解说过的 pthread, 这里不再解说,感兴趣的能够去看看之前对于 synchronized 的文章。
UNSAFE_ENTRY(void, Unsafe_Park(JNIEnv *env, jobject unsafe, jboolean isAbsolute, jlong time))
...
thread->parker()->park(isAbsolute != 0, time);
...
UNSAFE_END
批改对象的属性值,咱们用一段代码来演示
public class UnSafeDemo {public static void main(String[] args) throws Exception {Field theUnsafe = Unsafe.class.getDeclaredField("theUnsafe");
theUnsafe.setAccessible(true);
Unsafe unsafe = (Unsafe) theUnsafe.get(null);
Student student = new Student();
student.setAge(18);
long ageOffset = unsafe.objectFieldOffset(Student.class.getDeclaredField("age"));
// cas 批改 student 的 age 为 20
unsafe.compareAndSwapInt(student, ageOffset, 18, 20);
System.out.println(student.getAge());
}
}
class Student {
int age;
public int getAge() {return age;}
public void setAge(int age) {this.age = age;}
}
下面代码的输入后果为 20
, 咱们通过首先通过 objectFieldOffset 获取到 age 属性的内存偏移地址,而后通过 compareAndSwapInt 将 age 属性的值批改为 20。
比方对于一维数组 a[10], 基地址是 0x000000, 那么 a[1]的内存地址就是 0x000001 = 0x000000(基地址) + 1(偏移量)
源码剖析
其余类如果要想将 AQS 作为基类以实现本人的性能,只须要依据需要实现以下办法就行了
// 尝试获取锁
protected boolean tryAcquire(int arg) {throw new UnsupportedOperationException();
}
// 尝试开释锁
protected boolean tryRelease(int arg) {throw new UnsupportedOperationException();
}
// 尝试获取共享锁
protected int tryAcquireShared(int arg) {throw new UnsupportedOperationException();
}
// 尝试开释共享锁
protected boolean tryReleaseShared(int arg) {throw new UnsupportedOperationException();
}
// 锁是否被独占
protected boolean isHeldExclusively() {throw new UnsupportedOperationException();
}
AQS 中获取锁的办法是在 acquire()
中,开释锁的办法是release()
,如果咱们想要实现自定义的锁的时候,只须要依据本人的需要实现对应的 tryXXX 办法就行了,为了简化剖析,咱们只剖析独占锁的源码。
获取锁
public final void acquire(int arg) {if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();}
咱们假如 tryAcquire 返回 false(不关怀子类实现), 也就是当初锁曾经被其余线程占有了。当初又有线程来获取锁必定是会获取失败的,所以失败的线程会被封装成 Node 插入到队列中
private Node addWaiter(Node mode) {
// 将 nextWaiter 设置为 EXCLUSIVE,示意节点正以独占模式期待
Node node = new Node(Thread.currentThread(), mode);
// Try the fast path of enq; backup to full enq on failure
Node pred = tail;
if (pred != null) {
node.prev = pred;
// CAS 将 tail 节点更新为 node
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
// 如果执行到这里就存在两种状况
// 1. pred == null, 示意是第一次插入,以后同步队列中没有其余线程
// 2. 表明有其余线程批改了 tail 的值, 导致 CAS 批改 tail 失败
enq(node);
return node;
}
// CAS 更新 tail 节点
private Node enq(final Node node) {for (;;) {
Node t = tail;
// 解决第一次插入的状况,这里 head = new Node()
if (t == null) {if (compareAndSetHead(new Node()))
tail = head;
} else {
// 解决竞争强烈的状况或者第一次插入后未建设链路关系的状况
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}
下面逻辑很简略,就是结构一个排它锁模式的 node,插入队列中(尾插入)。
然而你尤其须要留神到 enq 中,当节点第一次插入的时候,head 的值是 new Node(), 它是一个虚构节点,这个节点自身没有可运行的线程。
在链表中,应用这种虚构节点很失常,有时候更加有利于操作。
enq 执行实现后就造成了这样的链路关系:
acquireQueued 的源码如下
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {final Node p = node.predecessor();
// 1. 如果前一个节点是头节点,则尝试再次获取锁(机会更大)
if (p == head && tryAcquire(arg)) {setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
// 2. 获取锁失败,须要查看并更新节点状态
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
// 3. 如果线程被中断,须要将节点从队列中移除
if (failed)
cancelAcquire(node);
}
}
逻辑也比拟简单明了,在线程被阻塞之前,如果前一个节点是头节点(head), 那么在尝试去获取一次锁,如果胜利了就间接返回。
如果还是失败的话,就更新下节点状态,而后将线程阻塞
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
int ws = pred.waitStatus;
// 前置节点曾经是 SIGNAL 了,能够释怀的 park 了
if (ws == Node.SIGNAL)
return true;
if (ws > 0) {
// 如果前置节点状态是 CANCELLED, 须要将节点移除
do {node.prev = pred = pred.prev;} while (pred.waitStatus > 0);
pred.next = node;
} else {
// 这就是咱们之前说的在入队的时候,会通过 CAS 将前置节点的状态设置为 SIGNAL
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}
线程阻塞的代码更简略
private final boolean parkAndCheckInterrupt() {
// 调用了 unsafe 的 park 阻塞以后线程
LockSupport.park(this);
return Thread.interrupted();}
至此,获取锁的源码就解析实现了。
从下面的代码,咱们能够晓得 head --> next --> next --> tail
这样的数据结构组成了同步队列,期待获取锁的线程会被封装成 node 插入到队列中去。
开释锁
release()中次要调用了 unparkSuccessor()办法来唤醒后继节点,源码如下
// 传入的 node 是 head 节点
private void unparkSuccessor(Node node) {
// 1. 获取结点状态
int ws = node.waitStatus;
// 2. 应用 CAS 更新 waitStatus 为默认值 0
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);
// 3. 通过循环找到后继节点中状态不为 CANCELLED 的节点
Node s = node.next;
if (s == null || s.waitStatus > 0) {
s = null;
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
// 4. 调用 unsafe 解锁线程
if (s != null)
LockSupport.unpark(s.thread);
}
之前提到过 head 节点是虚构节点,所以会调用 unsafe.park
去解锁 head.next 节点
条件变量(Condition)
Java 语言内置的管程 (synchronized) 里只有一个条件变量,而咱们的 AQS 是反对多个条件变量的,Java 中定义了一个接口,叫做 Condition,AQS 的外部类 ConditionObject 实现了这个接口。
public interface Condition {// 使得以后线程被阻塞,直到收到信号 (signal) 或者线程被中断
void await() throws InterruptedException;
// 唤醒一个期待线程
void signal();
// 唤醒所有的期待线程
void signalAll();
// 省略其余办法
}
Condition 中线程期待和告诉须要调用await()
、signal()
、signalAll()
,它们的语义和 wait()
、notify()
、notifyAll()
是雷同的。然而不一样的是,Condition 里只能应用后面的 await()
、signal()
、signalAll()
,而前面的 wait()
、notify()
、notifyAll()
只有在 synchronized 实现的管程里能力应用。
await
public final void await() throws InterruptedException {if (Thread.interrupted())
throw new InterruptedException();
// 将以后线程结构成一个 waiter 节点,waitStatue 的值为 -2(CONDITION), 将它插入到条件队列中
Node node = addConditionWaiter();
// 开释持有的锁
int savedState = fullyRelease(node);
int interruptMode = 0;
// 如果不在同步队列中,则阻塞以后线程
while (!isOnSyncQueue(node)) {LockSupport.park(this);
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
}
// 在同步队列中,有可能被唤醒了,须要去从新获取锁。并解决中断逻辑
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
interruptMode = REINTERRUPT;
if (node.nextWaiter != null) // clean up if cancelled
unlinkCancelledWaiters();
if (interruptMode != 0)
reportInterruptAfterWait(interruptMode);
}
private Node addConditionWaiter() {
Node t = lastWaiter;
// 如果 lastWaiter 不是出于 condition 状态,则须要移除掉
if (t != null && t.waitStatus != Node.CONDITION) {unlinkCancelledWaiters();
t = lastWaiter;
}
// 将以后线程构建为一个 condition node
Node node = new Node(Thread.currentThread(), Node.CONDITION);
if (t == null)
firstWaiter = node;
else
t.nextWaiter = node;
// 批改 lastWaiter 的指向
lastWaiter = node;
return node;
}
await()办法的逻辑也比较简单,其执行的次要流程如下
- 如果以后线程曾经被中断则抛出异样,未中断则执行步骤 2
- 将以后线程结构为 condition node,并插入到条件队列中
- 如果 condition node 不在同步队列中(有可能被唤醒后,移出条件队列了),则调用 unsafe.park 阻塞以后线程
通过以上代码剖析,await 之后条件队列会造成上面这样的数据结构
firstWaiter --> nextWaiter --> nextWaiter --> lastWaiter
这里的 firsetWaiter 并不是虚构节点,当只有一个线程在条件队列中时,firstWaiter == node == lastWaiter
signal
public final void signal() {if (!isHeldExclusively())
throw new IllegalMonitorStateException();
// 获取第一个期待节点,进行唤醒
Node first = firstWaiter;
if (first != null)
doSignal(first);
}
private void doSignal(Node first) {
do {
// 将节点从条件队列中移除
if ((firstWaiter = first.nextWaiter) == null)
lastWaiter = null;
first.nextWaiter = null;
} while (!transferForSignal(first) &&
(first = firstWaiter) != null);
}
// 将节点从条件队列挪动到同步队列
final boolean transferForSignal(Node node) {
// CAS 批改 waitStatus 失败,就证实 node 处于 CANCELLED 状态
if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
return false;
// enq 的源码下面下面获取锁有提到过,就是将节点插入同步队列并返回前置节点
Node p = enq(node);
int ws = p.waitStatus;
// 如果前置节点状态是 CANCELLED 或者批改 waitStatus 状态为 SIGNAL 失败,那么须要唤醒刚插入的节点
if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
LockSupport.unpark(node.thread);
return true;
}
其实 signal 的逻辑也是很简略的,不晓得为什么写到 signal 的时候,我就想起了我过后写 synchronized 的时候,一样的索然无味,无欲无求。我想可能是没人给我点赞。
说回 signal,它的次要逻辑如下
- 将第一个期待节点从期待队列中移除
- 批改第一个期待节点的 waitStatus 为 0
- 将节点入队到同步队列, 并返回前一个节点
- 判断前一个节点是否处于 CANCELLED 或者 waitStatus 是否失常批改,如果不能则解锁刚入队的节点
你须要留神的是,signal 并没有开释本人取得的锁。开释锁的操作依然是通过 release 的。
总结
通过源码的剖析,我置信你肯定能够答复上之前的三个问题。也让咱们晓得了 AQS 和 Synchronized 其实外在实现形式是很相似的。
AQS 中有同步队列和条件队列,signal 的时候是将节点从条件队列转移到同步队列。
Synchronized 中有 EntryList 和 WaitSet。notify 的时候将线程从 WaitSet 挪动到 EntryList 中。
同样的,看源码我集体更喜爱沿着一条线去看,而不是大而全的看,这样更容易把控整个脉络。