共计 17966 个字符,预计需要花费 45 分钟才能阅读完成。
是它,是它,就是它,并发包的基石;
一、概述
闲来不卷,轻易聊一点。
个别状况下,大家零碎中至多也是 JDK8 了,那想必对于 JDK5 退出的一系列性能并不生疏吧。那时候重点退出了 java.util.concurrent
并发包,咱们简称为 JUC。JUC 下提供了很多并发编程实用的工具类,比方并发锁 lock、原子操作 atomic、线程池操作 Executor 等等。上面,我对 JUC 做了整顿,大抵分为上面几点:
基于 JDK8,明天重点来聊下 JUC 并发包下的一个类,AbstractQueuedSynchronizer
。
首先,通俗的从名字上看,形象的队列同步器 ;实际上,这名字也跟它的作用一模一样。 形象 ,即须要被继承; 队列同步器,其外部保护了一个队列,供线程入队期待;最终实现多个线程访问共享资源的性能。
二、源码解析
进入 AbstractQueuedSynchronizer
外部,须要把握三个重要的属性:
private transient volatile Node head;
private transient volatile Node tail;
private volatile int state;
- head:标记期待队列头部节点。
- tail:标记期待队列尾部节点。
- state:线程的锁定状态;state=0,示意资源未被上锁;state>0,示意资源被上锁
咱们调试 AQS 的源码,必须寻找一个源码调试的切入点,我这里用咱们并发编程罕用的 Lock 锁作为调试 AQS 的切入点,因为这是解决线程平安问题罕用的伎俩之一。
2.1、源码的切入点
AQS 的源码调试,从 Lock
接口登程,JDK 源码定义如下:
public interface Lock {void lock();
void lockInterruptibly() throws InterruptedException;
boolean tryLock();
boolean tryLock(long time, TimeUnit unit) throws InterruptedException;
void unlock();
Condition newCondition();}
从源码中看到,Lock
是一个接口,所以该接口会有一些实现类,其中有一个实现类ReentrantLock
,可重入锁,想必大家都不会生疏。
2.2、ReentrantLock 的 lock 办法
通过跟踪源码能够看到,ReentrantLock#lock 外部实现貌似比较简单,只有简短的一行代码
public void lock() {sync.lock();
}
其实外部是保护了一个 Sync
的抽象类,调用的是 Sync
的 lock()办法。
abstract static class Sync extends AbstractQueuedSynchronizer {
private static final long serialVersionUID = -5179523762034025860L;
abstract void lock();
final boolean nonfairTryAcquire(int acquires) {final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {if (compareAndSetState(0, acquires)) {setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
protected final boolean tryRelease(int releases) {int c = getState() - releases;
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
boolean free = false;
if (c == 0) {
free = true;
setExclusiveOwnerThread(null);
}
setState(c);
return free;
}
protected final boolean isHeldExclusively() {return getExclusiveOwnerThread() == Thread.currentThread();}
// ...
}
能够看到,Sync
也是个抽象类,它有两个实现类:NonfairSync
和 FairSync
,这里其实就引出了咱们明天的配角,AbstractQueuedSynchronizer
,Sync
继承了它。
static final class NonfairSync extends Sync {
private static final long serialVersionUID = 7316153563782823691L;
final void lock() {if (compareAndSetState(0, 1))
setExclusiveOwnerThread(Thread.currentThread());
else
acquire(1);
}
protected final boolean tryAcquire(int acquires) {return nonfairTryAcquire(acquires);
}
}
static final class FairSync extends Sync {
private static final long serialVersionUID = -3000897897090466540L;
final void lock() {acquire(1);
}
protected final boolean tryAcquire(int acquires) {final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {if (!hasQueuedPredecessors() &&
compareAndSetState(0, acquires)) {setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0)
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
}
上面我整顿了这一系列类的 UML 图
通过类图可知,lock()办法最终调用的是 ReentrantLock
类下,外部类 NonfairSync
或FairSync
的 lock 办法;对于这两个类,前者叫非偏心锁,后者叫偏心锁。通过 ReentrantLock
的结构器可知,默认应用 NonfairSync
类。
public ReentrantLock() {sync = new NonfairSync();
}
从 NonfairSync
类的 lock 办法登程,引出第一个 AQS 下的办法 compareAndSetState。
final void lock() {if (compareAndSetState(0, 1))
setExclusiveOwnerThread(Thread.currentThread());
else
acquire(1);
}
从 compareAndSetState 办法的命名能够发现,就是比拟并替换的意思,典型的 CAS 无锁 机制。
protected final boolean compareAndSetState(int expect, int update) {
// See below for intrinsics setup to support this
return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
}
咱们能够察看到,这里其实调用的是 Unsafe 类的 compareAndSwapInt 办法,传入的 expect 为 0,update 为 1;意思是如果以后值为 0,那我就把值最终更新为 1。
Unsafe
这个类上面,发现好多办法都是用 native
这个关键词进行润饰的(也包含 compareAndSwapInt 办法),用 native
关键词润饰的办法,示意原生的办法;原生办法的实现并不是 Java 语言,最终实现是 C /C++;这并不是本文的探讨范畴。
回到 AQS 的 compareAndSetState 办法,返回值是 boolean 类型,true 示意值更新为 1 胜利,false 示意不胜利。这里呈现两个分支,胜利,走 setExclusiveOwnerThread 办法;不胜利,走 acquire 办法。咱优先探讨 acquire 办法。
2.3、AQS 的 acquire 办法
先来看一下该办法的源码;
public final void acquire(int arg) {if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();}
这里的外围是两个办法,tryAcquire 办法和 acquireQueued 办法。首先会调用 tryAcquire()办法,看办法命名是尝试获取;实际上这个办法的确在就在做一件事“尝试获取资源”。
protected boolean tryAcquire(int arg) {throw new UnsupportedOperationException();
}
不过 AQS 中的这个办法是 protected
润饰,并没有去实现,仅仅只是预留了办法入口,前期须要由其子类去实现;这里的子类就是上文中的 NonfairSync
类,该类的源码在上文中曾经贴出。这段源码其实使用了咱们常见的一个设计模式,“模板办法模式”。
2.4、NonfairSync 的 tryAcquire 办法
NonfairSync 的 tryAcquire 办法源码如下
protected final boolean tryAcquire(int acquires) {return nonfairTryAcquire(acquires);
}
这里并没有间接去实现 tryAcquire 办法,而是调用了 Sync
类下的 nonfairTryAcquire 办法。
final boolean nonfairTryAcquire(int acquires) {final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {if (compareAndSetState(0, acquires)) {setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
这里有个 getState 办法,最终返回的是 AQS 中的 state 字段,这个字段就是多个线程抢占的共享资源,所以 这个字段很重要 ;volatile
关键字润饰,保障内存的可见性,int
类型,对于 ReentrantLock 锁而言,当 state= 0 时,示意无锁,当 state>0 时,示意资源已被线程锁定。
上面剖析下这段代码:
- 如果 state= 0 示意无锁,通过 cas 去更新 state 的值,这里更新为 1。
- 将持有锁的线程更新为以后线程。
- 如果上述 cas 未更新胜利,或者 state!=0,示意已上锁。
- 持续判断下持有锁的线程如果是以后线程,state 字段做叠加,这里示意 ReentrantLock 的含意,示意可重入锁。
- 最初,state!=0,持有锁的线程也不是以后线程,示意不能对资源加锁,返回 false。
tryAcquire 办法的判断至此完结,不过最终的走向须要看它的返回值;返回 true,示意以后线程抢占到锁,或者以后线程就是抢占锁的线程,间接重入,加锁流程完结;返回 false,示意没有抢占到锁,流程持续,这里就引出下个话题,CLH线程期待队列。
2.5、AQS 的 addWaiter 办法
2.5.1、CLH 队列
首先咱来看一段源码中的正文
The wait queue is a variant of a “CLH” (Craig, Landin, and Hagersten) lock queue. CLH locks are normally used for spinlocks
大抵意思是:CLH 队列是由 Craig、Landin、Hagersten 这三位老哥名字的首字母叠加在一起命名的,它是一个期待队列,它是一个变种队列,用到了自旋。
这里的信息要抓住三点:期待队列、变种队列、自旋。
2.5.2、Node 类
在解析 addWaiter 办法实现之前,就不得不提到一个外部类Node
;addWaiter 办法的入参是这个类型,所以先来看看这个类。源码如下:
static final class Node {static final Node SHARED = new Node();
static final Node EXCLUSIVE = null;
static final int CANCELLED = 1;
static final int SIGNAL = -1;
static final int CONDITION = -2;
static final int PROPAGATE = -3;
volatile int waitStatus;
volatile Node prev;
volatile Node next;
volatile Thread thread;
Node nextWaiter;
final boolean isShared() {return nextWaiter == SHARED;}
final Node predecessor() throws NullPointerException {
Node p = prev;
if (p == null)
throw new NullPointerException();
else
return p;
}
Node() {}
Node(Thread thread, Node mode) { // Used by addWaiter
this.nextWaiter = mode;
this.thread = thread;
}
Node(Thread thread, int waitStatus) { // Used by Condition
this.waitStatus = waitStatus;
this.thread = thread;
}
}
这里先大抵介绍下,每个属性的意思:
- SHARED:类型就是 Node,示意共享模式。
- EXCLUSIVE:类型也是 Node,示意独占模式,这里的 ReentrantLock 就是独占模式。
- waitStatus:int 类型,以后 Node 节点下,存储的线程状态。
- CANCELLED:int 类型,等于 1,waitStatus 属性的值之一,示意节点被勾销状态。
- SIGNAL:int 类型,等于 -1,waitStatus 属性的值之一,示意以后节点须要去唤醒下一个节点。
- CONDITION:int 类型,等于 -2,waitStatus 属性的值之一,示意节点处于期待状态。
- PROPAGATE:int 类型,等于 -2,waitStatus 属性的值之一,示意下一个被获取的对象应该要无条件流传,该值仅在共享模式下应用。
- prev:Node 类型,指向队列中以后节点的前一个节点。
- next:Node 类型,指向队列中以后节点的下一个节点。
- thread:存储以后线程信息。
- nextWaiter:用来存储节点的指针,不过会呈现两种状况;期待队列中,会将该属性的值设置成 SHARED 或者 EXCLUSIVE,用来辨别以后节点处于共享模式还是独享模式;条件队列中,用于寄存下一个节点的指针,所以当是条件队列的状况下,这个队列是单向队列。
- isShared():返回是否属于共享模式,true 示意共享模式,false 示意独享模式。
- predecessor():获取以后节点的前一个节点。
另外,Node 类还有两个有参结构器:
从作者的正文就能看进去,第一个结构器是在期待队列的时,创立节点应用,第二个结构器是在条件队列时,创立节点应用。
2.5.3、办法解析
private Node addWaiter(Node mode) {Node node = new Node(Thread.currentThread(), mode);
Node pred = tail;
if (pred != null) {
node.prev = pred;
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
enq(node);
return node;
}
private Node enq(final Node node) {for (;;) {
Node t = tail;
if (t == null) { // Must initialize
if (compareAndSetHead(new Node()))
tail = head;
} else {
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}
其实这段办法是在创立 Node 对象,Node 对象就是组成 CLH 队列的根底元素。
- 创立一个 Node 对象,mode 参数由上述的 acquire()办法传递而来,能够看到传入
Node.EXCLUSIVE
,示意独占模式。 - 判断队尾有指向节点,刚创立的节点放入队列的队尾,并且通过 cas 将队尾指针改成以后创立节点,最初返回以后创立节点。
- 如果队尾没有指向节点,调用 enq 办法,做队列的初始化操作。
- 这里呈现了第一个自旋,enq 办法是有限循环的,就像作者正文的一样,Must initialize,必须初始化。
- 这里先是从新 new 了一个新的 node(也能够叫空节点),标记它为队列头。
- 随后再将 addWaiter 办法中创立的 node,退出到队列尾。
总结下 addWaiter 办法干的事件:
- 创立一个节点,存储以后线程,并标记独占模式。
- 判断队列是否为空,不为空,通过 cas 将存储以后线程的 node 节点退出到对尾,并且对该节点做对尾标记。
- 队列为空,通过自旋,做初始化操作。
-
初始化过后的队列,队列头是一个空节点,队列尾是存储以后线程的节点。
2.6、AQS 的 acquireQueued 办法
还是先来看下这个办法的源码;
final boolean acquireQueued(final Node node, int arg) { boolean failed = true; try { boolean interrupted = false; for (;;) {final Node p = node.predecessor(); if (p == head && tryAcquire(arg)) {setHead(node); p.next = null; // help GC failed = false; return interrupted; } if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) interrupted = true; } } finally {if (failed) cancelAcquire(node); } } private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) { int ws = pred.waitStatus; if (ws == Node.SIGNAL) return true; if (ws > 0) { do {node.prev = pred = pred.prev;} while (pred.waitStatus > 0); pred.next = node; } else {compareAndSetWaitStatus(pred, ws, Node.SIGNAL); } return false; } private void cancelAcquire(Node node) {if (node == null) return; node.thread = null; Node pred = node.prev; while (pred.waitStatus > 0) node.prev = pred = pred.prev; Node predNext = pred.next; node.waitStatus = Node.CANCELLED; if (node == tail && compareAndSetTail(node, pred)) {compareAndSetNext(pred, predNext, null); } else { int ws; if (pred != head && ((ws = pred.waitStatus) == Node.SIGNAL || (ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) && pred.thread != null) { Node next = node.next; if (next != null && next.waitStatus <= 0) compareAndSetNext(pred, predNext, next); } else {unparkSuccessor(node); } node.next = node; // help GC } }
从这个办法看到,又是使用了有限循环,须要分两个步骤去察看:1. 以后办法中的判断,本人的上一个节点是否是头部节点(头部节点就是占用资源的节点);2. 以后节点正式入队列,并且被挂起。
2.6.1、acquireQueued 办法中的判断
以后节点的前一个节点是队列头部,意味着以后节点的前一个节点,就是持有资源的节点;当资源被开释,以后节点会去尝试抢夺锁资源;如果拿到锁资源,以后节点会被标记为队列头部节点,它的上个节点(老的头部节点)会被置为 null,须要被 GC 及时革除,所以作者在这里增加了一个正文:help GC;下图就是形容了这个流程:
2.6.2、shouldParkAfterFailedAcquire 办法实现
如果以后节点的上一个节点,并不是头部节点;这里就须要用到上述 Node 类中介绍的各种状态字段了;先来重点介绍下 Node
类中的两个状态属性:
- CANCELLED:int 类型,等于 1,waitStatus 属性的值之一,示意节点被勾销。
- SIGNAL:int 类型,等于 -1,waitStatus 属性的值之一,示意以后节点须要去唤醒下一个节点。
进入的 shouldParkAfterFailedAcquire 这个办法外部,该办法承受两个参数:以后节点前一个节点和以后节点。首先,获取上一个节点的 waitStatus 属性,而后通过这个属性做如下判断:
- 如果状态是 SIGNAL(即等于 -1),间接返回 true,后续就会交给 parkAndCheckInterrupt 办法去将以后线程挂起。
-
如果不是 SIGNAL,对于以后 ReentrantLock 而言,ws>0 的操作是满足的,所以上面的步骤就是以后节点始终往前寻找,跳过已被标记状态为 CANCELLED 的节点,直到找到状态是 SIGNAL 的节点,将该节点作为以后节点的上一个节点。也印证了 SIGNAL 状态的解释:以后节点的上一个节点是 SIGNAL,那么以后节点须要挂起,期待被唤醒。最初进入下个循环,直到上个节点状态是 SIGNAL,执行下面的第一步,返回 true。
这里能够设想成一个排队去食堂打饭的场景,你在抬头玩手机前,跟你后面的同学说,我玩会手机,快到了叫我一下;后果你后面的同学嫌队伍长走了(CANCELLED 状态),所以你只能持续找他的上一个同学;直到有同学答复你,好的(该同学被标记 SIGNAL 状态);而后你就抬头玩手机,期待答复你“好的”的那个同学叫你。
- 最初 compareAndSetWaitStatus 办法其实不必看也晓得,通过 cas 机制,将以后节点的上一个节点的 waitStatus 批改成 SIGNAL 状态,这样的话,以后节点能力被挂起,期待唤醒。
再来看下 parkAndCheckInterrupt 这个办法
private final boolean parkAndCheckInterrupt() {LockSupport.park(this);
return Thread.interrupted();}
// LockSupport#park
public static void park(Object blocker) {Thread t = Thread.currentThread();
setBlocker(t, blocker);
UNSAFE.park(false, 0L);
setBlocker(t, null);
}
其中最终又是这个 Unsafe
类,通过它的原生办法 park,去挂起以后线程,这里就不开展赘述了。
2.7、资源上锁总结
上面整顿下从 lock 办法作为切入点,一系列的调用:
2.8、ReentrantLock 的 unlock 办法
之前始终在讲资源“上锁”,那么这个办法就是给资源解锁。这里给出重要的局部源码
// AQS 中
public final boolean release(int arg) {if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}
// AQS 中
private void unparkSuccessor(Node node) {
int ws = node.waitStatus;
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);
Node s = node.next;
if (s == null || s.waitStatus > 0) {
s = null;
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
if (s != null)
LockSupport.unpark(s.thread);
}
// ReentrantLock 中
protected final boolean tryRelease(int releases) {int c = getState() - releases;
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
boolean free = false;
if (c == 0) {
free = true;
setExclusiveOwnerThread(null);
}
setState(c);
return free;
}
2.9、ReentrantLock 的 tryRelease 办法
在调用 unlock 办法去解锁后,最终是调用 AQS 中的 release 办法去实现这个解锁性能的;在该办法中,首先会调用 ReentrantLock 中的 tryRelease 办法,去做 state 状态值的递加操作。
- 首先,获取 state 值(在 AQS 中有这个公共属性,上文提到过),这里是对以后 state 值减去 1。
- 再判断以后解锁的线程与持有锁的线程是不是同一个,不是的话,间接抛异样。所以 t 线程占用锁,只有 t 线程能力解锁,解铃还须系铃人。
- 最初判断做完递加的值是不是等于 0,如果为 0,将持有锁的线程清空,更新 state 字段为递加值(这里是 0),最初返回 true,代表锁曾经被开释了。
-
如果不是 0,更新 state 字段为递加值(不是 0),也不会清空持有锁的线程,意味着资源还是被线程加锁中,最初返回 false。
2.10、AQS 的 release 办法
在 tryRelease 办法返回 false 的时候,release 办法并不会做任何操作,间接就完结了,意味着解锁并没有实现;
然而在返回 true 的时候,具体分以下几部操作: - 拿到 CLH 队列被标记头部的节点。
- 判断不是空(队列不能是空的),并且头部节点的期待状态不是 0,在这种状况下,它只能是 -1(SIGNAL),所以是须要去唤醒下个节点的。
- 最初,调用 AQS 中的 unparkSuccessor 办法,去唤醒线程。
2.11、AQS 的 unparkSuccessor 办法
下面说到了,这个办法次要是用来唤醒线程的,上面还是做一下具体的解析:
- 该办法传参是一个 Node 节点,这里传入的是被标记队列头的节点(头部节点是持有锁资源的节点)。
- 拿到头部节点的 waitStatus 状态属性,并且判断小于 0 的状况下(该状况是 waitStatus=-1),通过 cas 机制将头部节点的状态改为 0,初始化状态。
- 拿到头部节点的下个节点,也就是真正意义上处于期待中的第一个节点。
- 它还是先判断了这个拿到的节点是否为 null,或者状态大于 0(亦或说判断状态等于 1);如果条件成立,阐明头节点的下个节点是空,或者下个节点被勾销了。
- 如果第四个判断条件满足,从队尾始终从后往前找,找到离头节点最近的那个节点。
- 通过
Unsafe
类的 unpark 原生办法去唤醒上述找到的,间隔头部节点最近的未处于勾销状态下的节点。
2.12、资源解锁总结
通过下面的形容能够发现,资源解锁是绝对简略的;它只能被上锁的线程去解锁;通过递加 AQS 外部保护的 state 属性值,直到 state 减为 0,示意资源已被解锁;当资源被解锁后,须要通过 Unsafe
的 unpark 办法,去唤醒 CLH 队列中,被挂起的第一个节点上的线程。
2.13、偏心锁与非偏心锁的差别
在 2.2 中说过,当咱们应用无参结构器创立一把“锁”的时候,默认是应用 NonfairSync 这个外部类,也就是非偏心锁;然而在源码中发现 ReentrantLock
还存在一个有参结构器,参数是一个 boolean
类型;
public ReentrantLock(boolean fair) {sync = fair ? new FairSync() : new NonfairSync();}
很显著,这种形式就是将选择权交给开发人员,当咱们传入 true 时,就会创立一把“偏心锁”。还是一样,先来看下偏心锁的外部;
static final class FairSync extends Sync {
private static final long serialVersionUID = -3000897897090466540L;
final void lock() {acquire(1);
}
protected final boolean tryAcquire(int acquires) {final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {if (!hasQueuedPredecessors() &&
compareAndSetState(0, acquires)) {setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0)
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
}
从源码的角度,咱来看下,为什么一个叫“非偏心锁”,另一个叫“偏心锁”?
其实不难发现,NonfairSync
外部的 lock 办法,它是一上来就通过 cas 机制去抢占 state 公共资源,抢不到才去执行 acquire 办法实现后续入队列等一系列的操作;而这里 FairSync
的 lock 办法,它是间接执行 acquire 办法,执行后续的操作。等于非偏心锁,会去多争取一次资源,对于在 CLH 队列中期待的线程,是“不偏心”的。
除了 lock 办法存在差别之外,在 tryAcquire 办法中,也存在着不同。FairSync
类中,会多执行 hasQueuedPredecessors 办法,它是 AQS 下的一个专用办法,上面具体看下这个办法;
public final boolean hasQueuedPredecessors() {
Node t = tail;
Node h = head;
Node s;
return h != t &&
((s = h.next) == null || s.thread != Thread.currentThread());
}
只有简短的几行,却有很多种可能性,然而整个办法次要性能就是判断以后线程是否须要入队列:返回 false,队列为空,不对期待;返回 true,队列不是空,去排队期待。上面须要重点讲下这一行代码:return h != t && ((s = h.next) == null || s.thread != Thread.currentThread());
2.13.1、hasQueuedPredecessors 返回 false
返回 false,状况也有两种:1、h != t 是 false,2、h != t 是 true,并且 (s = h.next) == null 是 false,s.thread != Thread.currentThread()是 false。
第一种状况比较简单,意思是头结点和尾节点是同一个,阐明队列是空的,不须要排队期待,所以间接返回 false。
第二种状况,头尾不是同一个节点,头部节点的下个节点也不是空,并且头部节点的下一个节点就是以后线程。
其实就能够了解为,后面的资源刚开释,正好轮到以后线程来抢占资源,这种状况绝对较少。
2.13.2、hasQueuedPredecessors 返回 true
返回 true,有两种状况:1、h != t 是 true,并且 (s = h.next) == null 是 true。2、h != t 是 true,并且 (s = h.next) == null 是 false,s.thread != Thread.currentThread()是 true。
1、这里的头尾不是同一个节点是必要满足的条件,保障了队列起码不是空的。而后(s = h.next) == null 满足是 true,这里解释起来就必须回顾下 enq 初始化队列这个办法。
private Node enq(final Node node) {for (;;) {
Node t = tail;
if (t == null) { // Must initialize
if (compareAndSetHead(new Node()))
tail = head;
} else {
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}
从这个办法可知,先是将节点的 prev 指向前一个节点,而后再通过 cas 批改尾部标识,最初再将前一个节点的 next 指向以后节点;因而 AQS,入队操作是非原子性的。
持续回到判断自身,头部节点拿到锁在执行;两头节点没拿到锁在入队;此时头部节点执行完后开释锁,以后节点尝试不入队拿锁,然而两头线程曾经在排队了,然而还没来得及执行 t.next = node 的操作,导致(s = h.next) == null 满足,所以以后节点必须入队,最终返回 true。
2、满足 s.thread != Thread.currentThread()的状况,执行到这里,能够明确队列首先不是空,并且 h.next != null,也就是头节点之后还有其余节点,最初再判断了下,s.thread != Thread.currentThread 为 true,也就是头节点的下个节点并不是以后节点,既然如此,那只能乖乖去队列中排队了,所以最终返回 true。
三、业务使用
想必大家对于并发锁并不生疏了,上文我也是通过 ReentrantLock
这个并发锁为入口,一步步来解析 AQS 中的实现。所以这里就不必 ReentrantLock 举例,这里换一个同步工具:CountDownLatch
,它也是基于 AQS 来实现的。
CountDownLatch
是通过一个计数器来实现的,初始值为线程的数量。每当一个线程实现了本人的工作,计数器的值就相应得减 1。当计数器达到 0 时,示意所有的线程都已执行结束, 而后在期待的线程就能够复原执行工作。
这个其实跟 ReentrantLock
思路差不多,一个是 state 初始值就是 0,通过“上锁”一步步叠加这个值;一个是 state 让使用者本人设定初始值,通过线程调用,一步步递加这个值。
CountDownLatch
具体的使用状况如下:1、一个主线程中,须要开启多个子线程,并且要在多个子线程执行结束后,主线程能力持续往下执行。2、通过多个线程一起执行,进步执行的效率。
上面,通过一个实在的业务场景,来进一步理解下 CountDownLatch
这个同步工具,具体是怎么应用的。
当初有这么一个接口,查问用户的详情信息;用户信息由五局部组成:1、用户根本信息;2、用户影像信息;3、用户工商信息;4、用户账户信息;5、用户组织架构信息;依照本来的逻辑是依照程序 1 - 5 这样一步步查问,最初组装用户 VO 对象,接口返回。然而这里能够用上 CountDownLatch
这个工具类,申请五个线程,别离去查问这五种信息,进步接口效率。
/**
* @author 往事如风
* @version 1.0
* @date 2023/4/11 18:10
* @description:导出报表
*/
@RestController
public class QueryController {@GetMapping("/query")
public Result download() throws InterruptedException {ExecutorService executorService = Executors.newFixedThreadPool(5);
// 模仿查问数据
List<String> row1 = CollUtil.newArrayList("aa", "bb", "cc", "dd");
List<String> row2 = CollUtil.newArrayList("aa1", "bb1", "cc1", "dd1");
List<String> row3 = CollUtil.newArrayList("aa2", "bb2", "cc2", "dd2");
List<String> row4 = CollUtil.newArrayList("aa3", "bb3", "cc3", "dd3");
List<String> row5 = CollUtil.newArrayList("aa4", "bb4", "cc4", "dd4");
CountDownLatch count = new CountDownLatch(5);
DataQuery d = new DataQuery();
// 开始工夫
long start = System.currentTimeMillis();
System.out.println("开始查问数据。。。。");
executorService.execute(() -> {System.out.println("查问用户根本信息。。。。。。");
try {Thread.sleep(1000);
} catch (InterruptedException e) {throw new RuntimeException(e);
}
d.setBaseInfo(row1);
count.countDown();});
executorService.execute(() -> {System.out.println("查问用户影像信息。。。。。。");
try {Thread.sleep(1000);
} catch (InterruptedException e) {throw new RuntimeException(e);
}
d.setImgInfo(row2);
count.countDown();});
executorService.execute(() -> {System.out.println("查问用户工商信息。。。。。。");
try {Thread.sleep(1000);
} catch (InterruptedException e) {throw new RuntimeException(e);
}
d.setBusinessInfo(row3);
count.countDown();});
executorService.execute(() -> {System.out.println("查问用户账户信息。。。。。。");
try {Thread.sleep(1000);
} catch (InterruptedException e) {throw new RuntimeException(e);
}
d.setAccountInfo(row4);
count.countDown();});
executorService.execute(() -> {System.out.println("查问用户组织架构信息。。。。。。");
try {Thread.sleep(1000);
} catch (InterruptedException e) {throw new RuntimeException(e);
}
d.setOrgInfo(row5);
count.countDown();});
// 阻塞:直到 count 的值减为 0
count.await();
executorService.shutdown();
// 完结工夫
long end = System.currentTimeMillis();
System.out.println("查问完结。。。。。");
System.out.println("用时工夫:" + (end - start));
return Result.success(d);
}
@Data
class DataQuery {
private List<String> baseInfo;
private List<String> imgInfo;
private List<String> businessInfo;
private List<String> accountInfo;
private List<String> orgInfo;
}
}
/*
控制台输入:开始查问数据。。。。查问用户根本信息。。。。。。查问用户影像信息。。。。。。查问用户工商信息。。。。。。查问用户账户信息。。。。。。查问用户组织架构信息。。。。。。查问完结。。。。。用时工夫:1017
*/
这段代码做了模仿查问各种用户信息的操作,其中每个线程都暂停 1 秒,代表在查问这五种数据;最终打印的用时工夫是 1017ms,阐明这五个线程是同时进行的,大大提高了接口的效率。
四、写在最初
AQS 提供了一个 FIFO 队列,这里称为 CLH 队列,能够看成是一个用来实现同步锁以及其余波及到同步性能的外围组件,常见的有:ReentrantLock
、CountDownLatch
、Semaphore
等。
AQS 是一个抽象类,次要是通过继承的形式来应用,它自身没有实现任何的同步接口,仅仅是定义了同步状态的获取以及开释的办法来提供自定义的同步组件。
能够这么说,只有搞懂了 AQS,那么 J.U.C 中绝大部分的 api 都能轻松把握。
本文次要提供了从 ReentrantLock
登程,解析了 AQS 中的各种专用的办法,如果须要晓得其余类中怎么去应用 AQS 中的办法,其实也只须要找到切入点,一步步调试上来即可,不过,我想很多中央都是和 ReentrantLock
中统一的。
五、参考源码
编程文档:https://gitee.com/cicadasmile/butte-java-note
利用仓库:https://gitee.com/cicadasmile/butte-flyer-parent