共计 11222 个字符,预计需要花费 29 分钟才能阅读完成。
前言
Java 并发包(JUC:java.util.concurrent)中提供了很多并发工具,这其中,很多咱们耳熟能详的并发工具,ReentrantLock、Semaphore,它们的实现都用到了一个独特的基类 –AbstractQueuedSynchronizer, 简称 AQS。AQS 是一个用来构建锁和同步器的框架,应用 AQS 能简略且高效地结构出利用宽泛的大量的同步器,比方咱们提到的 ReentrantLock,Semaphore,其余的诸如 ReentrantReadWriteLock,SynchronousQueue,FutureTask 等等皆是基于 AQS 的。当然,咱们本人也能利用 AQS 十分轻松容易地结构出合乎咱们本人需要的同步器。
AQS 负责管理同步器类中的状态,他治理了一个整数状态信息,能够通过 getState、setState、compareAndSetState 等类型办法来进行操作。ReentrantLock 示意所有线程曾经反复获取所的次数,Semaphore 示意残余许可数量。
ReentrantLock
1.1 ReentrantLock 概览
ReentrantLock 是一个可重入的互斥锁,又被称为“独占锁”。ReentrantLock 类实现了 Lock,它领有与 synchronized 雷同的并发性和内存语义,然而增加了相似锁投票、定时锁等待和可中断锁等待的一些个性。此外,它还提供了在强烈争用状况下更佳的性能。(换句话说,当许多线程都想访问共享资源时,JVM 能够花更少的时候来调度线程,把更多工夫用在执行线程上。)
ReentrantLock 锁在同一个工夫点只能被一个线程锁持有。
可重入:ReentrantLock 锁,能够被单个线程屡次获取。
ReentrantLock 分为“偏心锁”和“非偏心锁”。它们的区别体现在获取锁的机制上是否偏心。ReentrantLock 在同一个工夫点只能被一个线程获取 (当某线程获取到“锁”时,其它线程就必须期待);ReentraantLock 是通过一个 FIFO(先进先出)的期待队列来治理获取该锁所有线程的。在“偏心锁”的机制下,线程顺次排队获取锁;而“非偏心锁”在锁是可获取状态时,不论本人是不是在队列的结尾都会尝试获取锁而不须要马上排队( 获取不到锁再排队)。
通常咱们会将 ReentrantLock 和 synchronized 做比拟,两者孰优孰劣,其实在 java 官网对 synchronized 进行了诸多优化之后(偏差锁、轻量锁、自适应自旋、锁打消、锁粗化 …),两者的性能差距并不大,只是在某些方面存在差别而已,在此用一张表格做一个简略的比照:
// **************************Synchronized 的应用形式 **************************
// 1. 用于代码块
synchronized (this) {}
// 2. 用于对象
synchronized (object) {}
// 3. 用于办法
public synchronized void test () {}
// 4. 可重入
for (int i = 0; i < 100; i++) {synchronized (this) {}}
// **************************ReentrantLock 的应用形式 **************************
public void test () throw Exception {
// 1. 初始化抉择偏心锁、非偏心锁
ReentrantLock lock = new ReentrantLock(true);
// 2. 可用于代码块
lock.lock();
try {
try {
// 3. 反对多种加锁形式,比拟灵便; 具备可重入个性
if(lock.tryLock(100, TimeUnit.MILLISECONDS)){}} finally {
// 4. 手动开释锁
lock.unlock()}
} finally {lock.unlock();
}
}
1.2 ReentrantLock 与 AQS
下面咱们对 ReentrantLock 有了一个大略的理解,接下来能够来看看 ReentrantLock 和 AQS 之间到底是什么关系呢,其实 ReentrantLock 的底层就是应用的 AQS 实现的,咱们一起来看看源码:
/**ReentrantLock 实现了 Lock 类 其中有一个字段,Sync **/
public class ReentrantLock implements Lock, java.io.Serializable {
private static final long serialVersionUID = 7373984872572414699L;
private final Sync sync;
/**
* Sync 就是继承了 AQS 的抽象类,此锁的同步控制根底。* 上面将 Sync 细分为偏心锁和非偏心锁。* 应用 AQS 的 state 来示意锁的重入次数。*/
abstract static class Sync extends AbstractQueuedSynchronizer {
private static final long serialVersionUID = -5179523762034025860L;
/**
* 形象的 lock 办法,别离给偏心锁和非偏心锁具体实现。*/
abstract void lock();
...
}
/**
* 非偏心锁
*/
static final class NonfairSync extends Sync {
private static final long serialVersionUID = 7316153563782823691L;
...
}
/**
* 偏心锁
*/
static final class FairSync extends Sync {
private static final long serialVersionUID = -3000897897090466540L;
final void lock() {acquire(1);
}
...
}
/**
* ReentrantLock 的默认构造方法,默认创立非偏心锁
*/
public ReentrantLock() {sync = new NonfairSync();
}
/**
* 通过传入 boolean 参数来决定创立偏心锁还是非偏心锁。*/
public ReentrantLock(boolean fair) {sync = fair ? new FairSync() : new NonfairSync();}
ReentrantLock 中的偏心锁和非偏心锁实现形式差异不大,差异在于偏心锁判断是否间接进入队列,先看看非偏心锁的加锁流程源码:
static final class NonfairSync extends Sync {
private static final long serialVersionUID = 7316153563782823691L;
/** 加锁办法 **/
final void lock() {
// 若通过 CAS 设置变量 State(同步状态)胜利,也就是获取锁胜利,则将以后线程设置为独占线程。if (compareAndSetState(0, 1))
setExclusiveOwnerThread(Thread.currentThread());
else
// 若通过 CAS 设置变量 State(同步状态)失败,也就是获取锁失败,则进入 Acquire 办法进行后续解决。acquire(1);
}
protected final boolean tryAcquire(int acquires) {return nonfairTryAcquire(acquires);
}
}
再看下偏心锁源码中获锁的形式:
static final class FairSync extends Sync {
private static final long serialVersionUID = -3000897897090466540L;
final void lock() {acquire(1);
}
...
}
通过查看偏心锁和非偏心锁获取锁的源码得悉,都会通过 lock()办法区上锁,然而 lock()办法到底是怎么上锁的呢?带着这样的疑难,咱们持续跟踪一下会发现,这个办法属于 FairSync 和 NonfairSync 的父类 AQS(AbstractQueuedSynchronizer) 的外围办法。
在理解 AQS 之前,咱们带着几个疑难:
- 下面提到的通过 CAS 尝试扭转 state 的状态来示意获取锁胜利或者失败,如果获取失败了,调用 acquire() 办法,是怎么实现的呢?
- 获取不到锁,这个线程是在干嘛呢?一直尝试锁?还是挂起期待唤醒?
上面咱们会对 AQS 以及 ReentrantLock 和 AQS 的关联做具体介绍。
AQS
理解 AQS 之前,先看看 AQS 的整体框架:
- 上图中有色彩的为 Method,无色彩的为 Attribution。
- AQS 框架共分为五层,自上而下由浅入深,从 AQS 对外裸露的 API 到底层根底数据。
- 当有自定义同步器接入时,只需重写第一层所须要的局部办法即可,不须要关注底层具体的实现流程。当自定义同步器进行加锁或者解锁操作时,先通过第一层的 API 进入 AQS 外部办法,而后通过第二层进行锁的获取,接着对于获取锁失败的流程,进入第三层和第四层的期待队列解决,而这些解决形式均依赖于第五层的根底数据提供层。
AQS 概述
AQS 保护一个用 Volatile 的 int 类型 state 的共享资源,通过内置的 FIFO 来实现获取资源线程的排队工作。( 这个内置的同步队列称为 ”CLH” 队列)。该队列由一个一个的 Node 结点组成,每个 Node 结点保护一个 prev 援用和 next 援用,别离指向本人的前驱和后继结点。AQS 保护两个指针,别离指向队列头部 head 和尾部 tail。
其实就是个变体 双端双向链表 。
当线程获取资源失败(比方 tryAcquire 时试图设置 state 状态失败),会被结构成一个结点退出 CLH 队列中,同时以后线程会被阻塞在队列中(通过 LockSupport.park 实现,其实是期待态)。当持有同步状态的线程开释同步状态时,会唤醒后继结点,而后此结点线程持续退出到对同步状态的抢夺中。
Node 节点
static final class Node {
/** waitStatus 值,示意线程已被勾销(期待超时或者被中断)*/
static final int CANCELLED = 1;
/** waitStatus 值,示意后继线程须要被唤醒(unpaking)*/
static final int SIGNAL = -1;
/**waitStatus 值,示意结点线程期待在 condition 上,当被 signal 后,会从期待队列转移到同步到队列中 */
static final int CONDITION = -2;
/** waitStatus 值,示意下一次共享式同步状态会被无条件地流传上来
static final int PROPAGATE = -3;
/** 期待状态,初始为 0 */
volatile int waitStatus;
/** 以后结点的前一个结点 */
volatile Node prev;
/** 以后结点的后一个结点 */
volatile Node next;
/** 与以后结点关联的排队中的线程 */
volatile Thread thread;
/** ...... */
}
同步状态 State
/**
* The synchronization state.
*/
private volatile int state;
protected final int getState() {return state;}
protected final void setState(int newState) {state = newState;}
protected final boolean compareAndSetState(int expect, int update) {
// See below for intrinsics setup to support this
return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
}
通过 AQS 中 state 的源码能够看出,对于 state 的办法都是 final 润饰的,阐明子类中无奈重写它们。咱们能够通过批改 State 字段示意的同步状态来实现多线程的 独占模式和共享模式(加锁过程)。
一般来说,自定义同步器要么是独占形式,要么是共享形式,它们也只需实现 tryAcquire-tryRelease、tryAcquireShared-tryReleaseShared 中的一种即可。AQS 也反对自定义同步器同时实现独占和共享两种形式,如 ReentrantReadWriteLock。ReentrantLock 是独占锁,所以实现了 tryAcquire-tryRelease。
独占模式(通过 ReentrantLock 了解 AQS)
获取同步状态 –acquire()
public final void acquire(int arg) {if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();}
- tryAcquire: 首先,调用 tryAcquire 办法,若返回 true,意味着获取同步状态胜利,前面的逻辑不再执行;若返回 false,也就是获取同步状态失败,进入 acquireQueued 步骤;
- acquireQueued: 此时,获取同步状态失败,结构独占式同步结点,通过 addWatiter 将此结点增加到同步队列的尾部(此时可能会有多个线程结点试图退出同步队列尾部,须要以线程平安的形式增加);
- selfInterrupt: 该结点以在队列中尝试获取同步状态,若获取不到,则阻塞结点线程,直到被前驱结点唤醒或者被中断。
以非偏心锁为例,能够看到获取锁的大体流程:
final boolean nonfairTryAcquire(int acquires) {
// 获取以后线程
final Thread current = Thread.currentThread();
// 获取 state 的值
int c = getState();
// 如果 state 为 0,阐明以后没有其余线程占用共享资源,能够尝试获取锁
if (c == 0) {
// 用 CAS 批改 state 的值
if (compareAndSetState(0, acquires)) {
// 批改胜利,获取锁胜利,解决以后线程
setExclusiveOwnerThread(current);
// 返回获取锁胜利
return true;
}
}
// 如果 state 不为 0,判断占用锁的线程是否是以后线程
else if (current == getExclusiveOwnerThread()) {
// 如果是以后线程,对 state 做增量递增
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
// 设置 state
setState(nextc);
// 返回获取锁胜利(可重入的原理)return true;
}
return false;
}
- 获取以后线程;
- 获取 state 的值;
- 如果 state 为 0,阐明以后没有其余线程占用共享资源,能够尝试获取锁;
- 用 CAS 批改 state 的值;批改胜利,获取锁胜利,解决以后线程;
- 返回获取锁胜利;
- 如果 state 不为 0,判断占用锁的线程是否是以后线程;
- 如果是以后线程,对 state 做增量递增;
- 返回获取锁胜利(可重入的原理)。
值的留神得中央是,偏心锁中多了一部判断,再来看看偏心锁获取锁的大略流程:
protected final boolean tryAcquire(int acquires) {
// 获取以后线程;final Thread current = Thread.currentThread();
// 获取 state 的值;int c = getState();
// 如果 state 为 0,阐明以后没有其余线程占用共享资源,能够尝试获取锁
if (c == 0) {
// 判断以后期待队列中是否存在无效节点的办法,// 如果返回 False,阐明以后线程能够争取共享资源;// 如果返回 True,阐明队列中存在无效节点,以后线程必须退出到期待队列中。// 用 CAS 批改 state 的值;批改胜利,获取锁胜利,解决以后线程;if (!hasQueuedPredecessors() &&
compareAndSetState(0, acquires)) {setExclusiveOwnerThread(current);
return true;
}
}
// 如果 state 不为 0,判断占用锁的线程是否是以后线程;else if (current == getExclusiveOwnerThread()) {
// 如果是以后线程,对 state 做增量递增;int nextc = c + acquires;
if (nextc < 0)
throw new Error("Maximum lock count exceeded");
// 返回获取锁胜利(可重入的原理)。setState(nextc);
return true;
}
return false;
}
/** 判断以后期待队列中是否存在无效节点的办法 **/
public final boolean hasQueuedPredecessors() {
// The correctness of this depends on head being initialized 其正确性取决于是否初始化头结点
// before tail and on head.next being accurate if the current 如果以后节点在头结点和尾结点之间
// thread is first in queue. 线程在队列中第一个
// 获取以后尾结点
Node t = tail; // Read fields in reverse initialization order
// 获取以后尾结点
Node h = head;
Node s;
return h != t &&
((s = h.next) == null || s.thread != Thread.currentThread());
}
双向链表中,第一个节点为虚节点,其实并不存储任何信息,只是占位。真正的第一个有数据的节点,是在第二个节点开始的。当 h != t 时:1. 如果(s = h.next)==null,期待队列正在有线程进行初始化,但只是进行到了 Tail 指向 Head,没有将 Head 指向 Tail,此时队列中有元素,须要返回 True(这块具体见下边 addWaiter()曾经 enq()代码剖析)。2. 如果(s = h.next) != null,阐明此时队列中至多有一个无效节点,3. 如果此时 s.thread ==Thread.currentThread(),阐明期待队列的第一个无效节点中的线程与以后线程雷同,那么以后线程是能够获取资源的;4. 如果 s.thread != Thread.currentThread(),阐明期待队列的第一个无效节点线程与以后线程不同,以后线程必须退出进期待队列。
进入队列 addWaiter()
当尝试获取锁胜利,则间接完结并返回;
当尝试获取锁失败,则进入下一步: 线程退出期待队列addWaiter()
private Node addWaiter(Node mode) {
// 用以后线程和传入的锁模式新建一个 Node
Node node = new Node(Thread.currentThread(), mode);
// 先获取到以后的尾结点,将尾结点指向到 Node pred
Node pred = tail;
// 如果获取到的尾结点不为空
if (pred != null) {// 将新建节点的前置节点设置为下面获取到的 tail(tail 节点的后间接点此时为 null)
node.prev = pred;
// 利用 CAS 批改尾结点的值(将新建的 node 节点设置为 tail 尾结点)if (compareAndSetTail(pred, node)) {
// 如果设置胜利了,将之前获取到的 pred(批改之前的尾结点)的后置节点批改为以后新建的 node 节点
pred.next = node;
// 返回批改胜利的新的节点(此时此新建的这个 node 节点为新的 tail 节点)return node;
}
}
// 如果获取到的 tail 节点为 null,(阐明期待队列中没有元素),// 或者以后 Pred 指针和 Tail 指向的地位不同(阐明被别的线程曾经批改),//enq()中解决了以上两种状况
enq(node);
return node;
}
如果没有被初始化,须要进行初始化一个头结点进去。但请留神,初始化的头结点并不是以后线程节点,而是调用了无参构造函数的节点。如果经验了初始化或者并发导致队列中有元素,则与之前的办法雷同。其实,addWaiter 就是一个在双端链表增加尾节点的操作,须要留神的是,双端链表的头结点是一个无参构造函数的头结点。
private Node enq(final Node node) {for (;;) {
// 获取以后的尾结点指向到 t
Node t = tail;
// 如果此时尾结点为空
if (t == null) { // Must initialize 必须要初始化一个头结点
// 利用 CAS 初始化一个头部(调用无参结构,头肩点外部都为 null)if (compareAndSetHead(new Node()))
// 将尾结点设置为头节点(此时就一个节拍板结点和尾结点是一个)tail = head;
} else {
// 如果尾结点不为 null, 将新建的节点 node 的前置节点设置为 t
node.prev = t;
// 通过 CAS 批改尾结点,将尾结点指向为新的 node,
// 这个办法次要是对 tailOffset 和 Expect 进行比拟,如果 tailOffset 的 Node 和 Expect 的 Node 地址是雷同的,// 那么设置 Tail 的值为 Update 的值。if (compareAndSetTail(t, node)) {// 批改胜利,将之前的尾结点(当初的倒数第二个节点)的后置节点设置为新的尾结点(node)
// 返回倒数第二个节点
t.next = node;
return t;
}
}
}
}
何时出列 acquireQueued()
通过下面的 addWaiter()办法咱们能够晓得,通过将一个新建的 Node 节点放入到队列里等待获取锁的资格,那么进入队列之后,怎么能力获取到锁呢?接下来咱们来看看线程怎么获取锁。
总的来说,一个线程获取锁失败了,被放入期待队列,acquireQueued 会把放入队列中的线程一直去获取锁,直到获取胜利或者不再须要获取(中断)。
final boolean acquireQueued(final Node node, int arg) {
// 标记是否胜利拿到资源
boolean failed = true;
try {
// 标记是否被中断过
boolean interrupted = false;
// 开始自旋
for (;;) {
// 获取以后节点的前置节点
final Node p = node.predecessor();
// 如果前置节点是头节点,阐明以后节点是第一个无效节点(头节点是虚节点),开始尝试获取锁
if (p == head && tryAcquire(arg)) {
// 获取到锁,头指针挪动到以后 node
//setHead 办法是把以后节点置为虚节点,但并没有批改 waitStatus,因为它是始终须要用的数据。setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
// 阐明是 p 不为头结点 或者 p 为头节点且以后没有获取到锁(可能是非偏心锁被抢占了),// 这个时候就要判断以后 node 是否要被阻塞(被阻塞条件:前驱节点的 waitStatus 为 -1),避免有限循环浪费资源。if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {if (failed)
cancelAcquire(node);
}
}
/** 判断以后 node 是否要被阻塞 **/
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
// 获取前置节点的阻塞状态
int ws = pred.waitStatus;
// 如果 Node.SIGNAL -1 状态,阐明前置节点的状态为唤醒状态
if (ws == Node.SIGNAL)
// 能够间接 park(间接阻塞)return true;
//waitStatus>0 是勾销状态 如果前置节点是勾销状态
if (ws > 0) {
// 循环向前查找勾销节点,把勾销节点从队列中剔除
do {node.prev = pred = pred.prev;} while (pred.waitStatus > 0);
pred.next = node;
} else {
// 设置前置节点期待状态为 SIGNAL
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}
CANCELLED 状态节点生成 cancelAcquire()
private void cancelAcquire(Node node) {
// 将有效节点过滤
if (node == null)
return;
// 设置该节点不关联任何线程,也就是虚节点
node.thread = null;
Node pred = node.prev;
// 通过前驱节点,跳过勾销状态的 node
while (pred.waitStatus > 0)
node.prev = pred = pred.prev;
// 获取过滤后的前驱节点的后继节点
Node predNext = pred.next;
// 把以后 node 的状态设置为 CANCELLED
node.waitStatus = Node.CANCELLED;
// 如果以后节点是尾节点,将从后往前的第一个非勾销状态的节点设置为尾节点
// 更新失败的话,则进入 else,如果更新胜利,将 tail 的后继节点设置为 null
if (node == tail && compareAndSetTail(node, pred)) {compareAndSetNext(pred, predNext, null);
} else {
int ws;
// 如果以后节点不是 head 的后继节点,1: 判断以后节点前驱节点的是否为 SIGNAL,2: 如果不是,则把前驱节点设置为 SINGAL 看是否胜利
// 如果 1 和 2 中有一个为 true,再判断以后节点的线程是否为 null
// 如果上述条件都满足,把以后节点的前驱节点的后继指针指向以后节点的后继节点
if (pred != head && ((ws = pred.waitStatus) == Node.SIGNAL || (ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) && pred.thread != null) {
Node next = node.next;
if (next != null && next.waitStatus <= 0)
compareAndSetNext(pred, predNext, next);
} else {
// 如果以后节点是 head 的后继节点,或者上述条件不满足,那就唤醒以后节点的后继节点
unparkSuccessor(node);
}
node.next = node; // help GC
}
}
以上从 ReentrantLock 的获取锁,上锁来对 AQS 的获取锁过程做了介绍,后续会持续将 AQS 开释锁的原理做介绍,未完待续 …
参考
- 从 ReentrantLock 的实现看 AQS 的原理及利用
- Java 并发包基石 -AQS 详解