共计 13342 个字符,预计需要花费 34 分钟才能阅读完成。
前言
在之前咱们曾经对局部 JDK
源码做了介绍:
啃碎 JDK 源码 (一):String
啃碎 JDK 源码 (二):Integer
啃碎 JDK 源码 (三):ArrayList
啃碎 JDK 源码 (四):HashMap
啃碎 JDK 源码 (五):ConcurrentHashMap
啃碎 JDK 源码(六):LinkedList
明天咱们正式开始介绍 juc
包上面的类,也就是和多线程打交道的中央,和锁打交道的类用的比拟的的无非就是 ReentrantLock
和 ReentrantReadWriteLock
等,然而咱们明天要介绍的是 AbstractQueuedSynchronizer
这个抽象类,也就是面试中常常被问到的 AQS
,因为不论是ReentrantLock
还是ReentrantReadWriteLock
以及其它的一些都是基于它实现的,所以很有必要先来理解一下。
注释
AQS 的全称为(AbstractQueuedSynchronizer),咱们能够把它看成一个帮忙咱们实现锁的同步器,它基于 FIFO(先进先出) 的队列实现的,并且外部保护了一个状态变量 state
,通过原子更新这个状态变量即能够实现加锁解锁操作。
来看下 AbstractQueuedSynchronizer
的继承构造
能看到 ReentrantLock
等并不是间接继承 AbstractQueuedSynchronizer
,而是其内部类 Sync
。
接着来看看一些重要的属性:
// 队列的头节点
private transient volatile Node head;
// 队列的尾节点
private transient volatile Node tail;
// 管制加锁解锁的状态变量
private volatile int state;
定义了一个状态变量和一个队列,状态变量用来管制加锁解锁,队列用来搁置期待的线程。
这个 state
变量很重要,用来做状态标识。比方说在 ReentrantLock
外面它 示意获取锁的线程数,如果等于 0 示意还没有线程获取锁,1 示意有线程获取了锁。大于 1 示意重入锁的数量。
留神,这几个变量都要应用 volatile
关键字来润饰,因为是在多线程环境下操作,要保障它们的值批改之后对其它线程立刻可见。
还有咱们的 Node
外部类
static final class Node {
// 标识一个节点是共享模式
static final Node SHARED = new Node();
// 标识一个节点是互斥模式
static final Node EXCLUSIVE = null;
// 标识线程已勾销
static final int CANCELLED = 1;
// 标识后继节点须要唤醒
static final int SIGNAL = -1;
// 标识线程期待在一个条件上
static final int CONDITION = -2;
// 标识前面的共享锁须要无条件的流传
static final int PROPAGATE = -3;
// 以后节点保留的线程对应的期待状态
volatile int waitStatus;
// 上一个结点
volatile Node prev;
// 下一个结点
volatile Node next;
// 以后结点保留的线程
volatile Thread thread;
// 下一个期待在条件上的节点(Condition 锁时应用)Node nextWaiter;
// 是否是共享模式
final boolean isShared() {return nextWaiter == SHARED;}
// 获取前一个节点
final Node predecessor() throws NullPointerException {
Node p = prev;
if (p == null)
throw new NullPointerException();
else
return p;
}
Node() {}
Node(Thread thread, Node mode) {
// 把共享模式还是互斥模式存储到 nextWaiter 这个字段外面了
this.nextWaiter = mode;
this.thread = thread;
}
Node(Thread thread, int waitStatus) {
// 期待的状态,在 Condition 中应用
this.waitStatus = waitStatus;
this.thread = thread;
}
}
下面是一个规范的双向链表构造,保留着以后线程、前一个节点、后一个节点以及线程的状态等信息。属性比拟多,看不懂没关系,前面用到会从新讲一下。
那么在源码外面是如何批改这些变量的呢?其实就是通过咱们之前说的 CAS
来批改,如果不理解的话请参考 一文看懂 CAS
比方说 state
状态变量的批改
// 获取 Unsafe 类的实例
private static final Unsafe unsafe = Unsafe.getUnsafe();
// 状态变量 state 的偏移量
private static final long stateOffset;
static {
try {
stateOffset = unsafe.objectFieldOffset
(AbstractQueuedSynchronizer.class.getDeclaredField("state"));
.......
} catch (Exception ex) {throw new Error(ex); }
}
外围是 compareAndSetState
办法:
protected final boolean compareAndSetState(int expect, int update) {
// 如果以后值等于 except,那么更新成 update
return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
}
既然 AbstractQueuedSynchronizer
是一个抽象类,那么子类要实现哪些接口呢?
比如说用来加锁的 tryAcquire
办法
// 互斥模式下尝试获取锁
protected boolean tryAcquire(int arg) {throw new UnsupportedOperationException();
}
能够看到只是抛出了异样,并且值得注意的是该办法并没有定义成形象办法,因为只有实现一部分办法就能够本人手动编写一个锁了,定义成 protect
也是不便子类去实现,除此之外还有
// 互斥模式下尝试开释锁
protected boolean tryRelease(int arg) {throw new UnsupportedOperationException();
}
// 共享模式下尝试获取锁
protected int tryAcquireShared(int arg) {throw new UnsupportedOperationException();
}
// 共享模式下尝试开释锁
protected int tryReleaseShared(int arg) {throw new UnsupportedOperationException();
}
// 以后线程是否持有锁
protected int tryAcquireShared(int arg) {throw new UnsupportedOperationException();
}
基于 AQS 手动实现一个锁
当初咱们尝试一下基于 AQS
手动实现一个锁:
/*
* 实现 Lock 接口
*/
public class MyLock implements Lock {
private final Sync sync;
public MyLock() {sync = new Sync();
}
// 定义一个外部类 Sync 继承 AbstractQueuedSynchronizer
private static class Sync extends AbstractQueuedSynchronizer {
// 尝试获取独占锁
@Override
protected boolean tryAcquire(int arg) {
// AQS 办法:CAS 更新 state 状态变量
if (compareAndSetState(0, 1)) {
// AQS 办法:设置以后线程为持有锁的线程
setExclusiveOwnerThread(Thread.currentThread());
return true;
}
return false;
}
// 尝试开释独占锁
@Override
protected boolean tryRelease(int arg) {if (getState() == 0)
throw new IllegalMonitorStateException();
setExclusiveOwnerThread(null);
// AQS 办法:以后线程已持有锁,能够间接批改 state 值,不须要通过 CAS 批改
setState(0);
return true;
}
// 锁是否已被开释
@Override
protected boolean isHeldExclusively() {return getState() == 1;
}
Condition newCondition() {return new ConditionObject();
}
}
@Override
public void lock() {sync.acquire(1);
}
@Override
public void lockInterruptibly() throws InterruptedException {sync.acquireInterruptibly(1);
}
@Override
public boolean tryLock() {return sync.tryAcquire(1);
}
@Override
public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {return sync.tryAcquireNanos(1, unit.toNanos(time));
}
@Override
public void unlock() {sync.release(1);
}
@Override
public Condition newCondition() {return sync.newCondition();
}
}
好了,咱们的锁曾经写完了,外围就是应用一个动态外部类 Sync
继承 AQS
, 实现加锁、开释锁等办法,其实咱们相熟的 ReentrantLock
也是这样的实现原理,当初咱们来测试一下:
public class TestLock {private final Lock lock = new MyLock();
private volatile int count = 1;
private static class WorkThread extends Thread {
private final TestLock myLock;
public WorkThread(TestLock myLock) {this.myLock = myLock;}
@Override
public void run() {myLock.execute();
}
}
public void execute() {lock.lock();
try {System.out.println(Thread.currentThread().getName() + "获取到的 count=" + count++);
} finally {lock.unlock();
}
}
public static void main(String[] args) {TestLock myLock = new TestLock();
// 启动 100 个线程
for (int i = 1; i <= 100; i++) {new WorkThread(myLock).start();}
}
}
咱们启动 100 个线程对 count
加一,看看打印后果:
能够看到最初确实是加到了 100,也就是说咱们的锁是可用的。
然而,咱们当初的锁是不可重入锁,学过 ReentrantLock
的同学应该晓得它是可重入锁,也就是在线程持有锁的状况下能够从新取得锁,如果咱们改一下 execute
办法:
public void execute() {lock.lock();
try {System.out.println(Thread.currentThread().getName() + "获取到的 count=" + count++);
if (count == 5) {execute();
}
} finally {lock.unlock();
}
}
当 count == 5 时执行调用本身,看下执行后果:
能够看到线程被阻塞了,因为以后持有锁的线程不能从新获取锁,所以咱们须要对 tryAcquire
和 tryRelease
办法进行革新:
// 尝试获取独占锁
@Override
protected boolean tryAcquire(int arg) {
// 获取以后线程
Thread currentThread = Thread.currentThread();
int state = getState();
if (state == 0) {
// AQS 办法:CAS 更新 state 状态变量
if (compareAndSetState(0, 1)) {
// AQS 办法:设置以后线程为持有锁的线程
setExclusiveOwnerThread(currentThread);
return true;
} else if (currentThread == getExclusiveOwnerThread()) {
// 因为是独占锁, 所以同一时刻只能有一个线程能获取到锁, 如果以后的锁是被以后线程获取过了, 则将状态变量 +1
int newState = state + arg;
// 设置新的状态变量
setState(newState);
return true;
}
}
return false;
}
// 尝试开释独占锁
@Override
protected boolean tryRelease(int arg) {
// 判断以后锁开释是以后线程锁独占的, 如果判断不成立则抛出异样
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
int newState = getState() - arg;
boolean free = false;
if (newState == 0) {
// 如果状态为 0 了则阐明以后线程能够开释对锁的持有了
setExclusiveOwnerThread(null);
free = true;
}
// AQS 办法:以后线程已持有锁,能够间接批改 state 值,不须要通过 CAS 批改
setState(newState);
return free;
}
在 tryAcquire
办法次要加了判断:如果 state
不为 0 的时候, 判断以后线程是否曾经和锁绑定, 曾经绑定的话则将 state+1
同时返回 true
在 tryRelease
办法中次要减少了开释锁的时候是对 state
变量逐次减一当减到 0 的时候才将锁与以后线程绑定的状态去除, 开释锁。
从新运行下曾经不会阻塞了,如果不懂的中央看下正文就明确了。
AQS 源码分析
那么当线程获取不到锁的时候是如何期待的呢?又是什么时候被唤醒的呢?接下来咱们一步步追随源码看看到底做了什么?
独占模式
AQS
有 独占
模式和 共享
模式,首先来看看独占模式,看下 lock
加锁办法:
@Override
public void lock() {sync.acquire(1);
}
这里能够看到并不是调用咱们重写的 tryAcquire
办法,而是调用父类 AbstractQueuedSynchronizer
的办法:
public final void acquire(int arg) {if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();}
此办法是独占模式下线程获取共享资源的顶层入口。如果获取锁胜利,线程间接返回,否则进入期待队列,直到获取锁为止,且整个过程疏忽中断的影响。
首先是调用 tryAcquire
办法来获取尝试获取锁,跟过来看一下 AQS
的实现:
protected boolean tryAcquire(int arg) {throw new UnsupportedOperationException();
}
能够看到该办法定义为 protect
,也就是由咱们子类去实现的,如果获取锁失败的话那么会进入期待队列,来看看addWaiter
办法:
/*
* 将以后线程增加到期待队列的队尾,并返回以后线程所在的节点
*/
private Node addWaiter(Node mode) {
// 以独占模式把以后线程封装成一个 Node 节点
Node node = new Node(Thread.currentThread(), mode);
// 尝试将结点放到队列尾部
Node pred = tail;
if (pred != null) {
node.prev = pred;
// 应用 CAS 把 node 作为尾节点
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
// 尾节点为空或者利用 CAS 把 node 设为尾节点失败时通过 enq 办法进行入队
enq(node);
return node;
}
/*
* 采纳 for 循环自旋的形式把 node 插入到队列中
*/
private Node enq(final Node node) {for (;;) {
Node t = tail;
// 队列为空,须要初始化
if (t == null) {if (compareAndSetHead(new Node()))
tail = head;
} else {
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}
- addWaiter 办法用于将以后线程增加到期待队列的队尾,并返回以后线程所在的节点。
enq
办法中采纳了十分经典的自旋
操作,只有通过CAS
把 node 设为尾节点后,以后线程能力退出该办法,否则的话,以后线程一直的尝试,直到能把节点增加到队列中为止。
持续看一下 acquireQueued
办法:
/*
* 通过自旋获取锁
*/
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
// 默认线程没有被中断过
boolean interrupted = false;
for (;;) {
// 获取该节点的前驱节点
final Node p = node.predecessor();
// 如果前驱节点是头节点并且以后线程获取到锁
if (p == head && tryAcquire(arg)) {
// 设置以后节点为头节点
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
// 挂起以后线程
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {if (failed)
// 移除以后节点
cancelAcquire(node);
}
}
该办法比较复杂,咱们来仔细分析一下:
- 首先获取前一个节点,如果前驱节点是头节点的话则尝试获取锁,如果获取锁胜利的话设置以后节点为头节点
- 否则调用
shouldParkAfterFailedAcquire
办法判断是否须要挂起以后线程
shouldParkAfterFailedAcquire
办法从名字也能看进去是当获取锁失败后用来判断是否须要挂起以后线程,实现性能简略的讲就是把以后 node 节点的无效前驱(无效是指 waitStatus
不是 CANCELLED 的)找到,并且将无效前驱的状态设置为SIGNAL
,之后便返回 true 代表马上能够阻塞了。来看看实现代码:
/*
* 判断是否须要挂起以后线程
*/
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
int ws = pred.waitStatus;
if (ws == Node.SIGNAL)
/*
* 前驱节点曾经设置了 SIGNAL,如果前驱变成了 head,并且 head 的代表线程
* exclusiveOwnerThread 开释了锁,就会来依据这个 SIGNAL 来唤醒本人
*/
return true;
if (ws > 0) {
/*
* 发现传入的前驱的状态大于 0,即 CANCELLED。阐明前驱节点曾经因为超时或响应了中断,而勾销了本人。* 所以须要向前遍历直到找到一个 <= 0 的节点
*/
do {node.prev = pred = pred.prev;} while (pred.waitStatus > 0);
pred.next = node;
} else {
/*
* 如果是其余状况,那么 CAS 尝试设置前驱节点为 SIGNAL
*/
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}
在 shouldParkAfterFailedAcquire
返回 true 的状况下, 持续看 parkAndCheckInterrupted
办法
private final boolean parkAndCheckInterrupt() {LockSupport.park(this);
return Thread.interrupted();}
调用 LockSupport 的 park
办法挂起以后线程,返回该线程是否被中断过,如果被中断过,间接设置 interrupted = true
。
(LockSupport 类是 Java6 引入的一个类,提供了根本的线程同步原语,有趣味的小伙伴能够去理解一下)
最初来看下 cancelAcquire
办法:
/**
* 勾销以后节点
*/
private void cancelAcquire(Node node) {
// Ignore if node doesn't exist
if (node == null)
return;
// 把以后节点的线程设为 null
node.thread = null;
// 和后面一样向前遍历找到无效的前驱节点
Node pred = node.prev;
while (pred.waitStatus > 0)
node.prev = pred = pred.prev;
Node predNext = pred.next;
// 把 node 节点的 ws 设为 CANCELLED
node.waitStatus = Node.CANCELLED;
// 如果 node 是尾节点, 利用 CAS 把前驱节点设为尾节点, 后继节点为 null 不便 GC
if (node == tail && compareAndSetTail(node, pred)) {compareAndSetNext(pred, predNext, null);
} else {
// 前驱节点不是头结点 && 线程不为空 && waitStatus 为 singal
// 利用 CAS 把 node 的 next 设为 pred 的 next 节点
int ws;
if (pred != head &&
((ws = pred.waitStatus) == Node.SIGNAL ||
(ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) &&
pred.thread != null) {
Node next = node.next;
if (next != null && next.waitStatus <= 0)
compareAndSetNext(pred, predNext, next);
} else {
// node 是头结点, 唤起它的后继节点
unparkSuccessor(node);
}
node.next = node; // help GC
}
}
最初如果 acquireQueued
办法返回 true,须要进行自我中断。因为 parkAndCheckInterrupted
办法不响应中断,并且外部调用了 Thread.interrupted
办法革除中断标记位。所以当该办法返回 true(被中断)时,须要手动弥补中断标记位。
static void selfInterrupt() {Thread.currentThread().interrupt();}
流程总结
tryAcquire()
尝试间接去获取锁,如果胜利则间接返回;addWaiter()
将该线程退出期待队列的尾部,并标记为独占模式;acquireQueued()
使线程在队列中期待直到获取锁。如果在整个期待过程中被中断过,则返回 true,否则返回 false。- 如果线程在期待过程中被中断过,它是不响应的。只是获取资源后才再进行自我中断
selfInterrupt()
,将中断补上。
独占式锁获取流程
调用同步器的 acquire(int arg)
办法能够获取同步状态,该办法对中断不敏感,即线程获取同步状态失败后进入同步队列,后续对线程进行中断操作时,线程不会从同步队列中移除。获取流程:
- 以后线程通过
tryAcquire()
办法尝试获取锁,胜利则间接返回,失败则进入队列排队期待,通过 CAS 获取同步状态。 - 如果尝试获取锁失败的话,结构同步节点(独占式的
Node.EXCLUSIVE
),通过addWaiter(Node node,int args)
办法, 将节点退出到同步队列的队列尾部。 - 最初调用
acquireQueued(final Node node, int args)
办法,使该节点以死循环的形式获取同步状态,如果获取不到,则阻塞节点中的线程。acquireQueued
办法以后线程在死循环中获取同步状态,而只有前驱节点是头节点的时候能力尝试获取锁(同步状态)(p == head && tryAcquire(arg)
)。
下面看完了加锁的流程,接下来看看是如何开释锁的?
public final boolean release(int arg) {
// tryRelease 由子类实现
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
// 唤醒后继节点
unparkSuccessor(h);
return true;
}
return false;
}
代码比较简单,tryRelease
和下面一样也是有事子类去实现,如果开释锁胜利的话那么咱们须要调用 unparkSuccessor
办法去唤醒后继节点,看下具体实现:
private void unparkSuccessor(Node node) {
int ws = node.waitStatus;
if (ws < 0)
// 将以后节点的状态批改为 0
compareAndSetWaitStatus(node, ws, 0);
// 如果间接后继为空或者它的 waitStatus 大于 0(曾经放弃获取锁了),咱们就遍历整个队列,获取第一个须要唤醒的节点
Node s = node.next;
if (s == null || s.waitStatus > 0) {
s = null;
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
if (s != null)
// 唤醒节点
LockSupport.unpark(s.thread);
}
代码比较简单,这里就不开展细说了。
共享模式
下面讲完了独占模式,当初来讲下共享模式,所谓 共享模式就是同一个时刻容许多个线程持有锁,比方说 ReentrantReadWriteLock
就是实现了共享模式的AQS
,间接上代码:
public final void acquireShared(int arg) {if (tryAcquireShared(arg) < 0)
doAcquireShared(arg);
}
/*
* 共享模式获取锁
*/
private void doAcquireShared(int arg) {
// 退出队列尾部
final Node node = addWaiter(Node.SHARED);
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {final Node p = node.predecessor();
// head 是曾经拿到锁的节点
if (p == head) {
// 尝试获取锁,返回的 r 是残余的资源数量,如果大于 0 那么须要唤醒后续节点
int r = tryAcquireShared(arg);
if (r >= 0) {
// 将 head 指向本人,还有残余资源能够再唤醒之后的线程
setHeadAndPropagate(node, r);
p.next = null; // help GC
if (interrupted)
selfInterrupt();
failed = false;
return;
}
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {if (failed)
cancelAcquire(node);
}
}
这个办法大抵上看起来和独占模式是很相像的。区别只在于独占模式下,在本办法中获取到资源后,只是将本节点设置为 head 节点。而共享模式下,设置完 head 节点后,还须要判断是否须要唤醒多个线程,看一下如何唤醒线程:
private void setHeadAndPropagate(Node node, int propagate) {
Node h = head; // Record old head for check below
setHead(node);
/*
* 什么状况下要唤醒后继结点?* 1. 资源残余数大于 0,有残余资源必定是要唤醒后继结点的
* 2. 头结点不存在。* 3. 头结点状态小于 0,意味着后继节点要求 node(也就是以后 head)唤醒后继结点
*/
if (propagate > 0 || h == null || h.waitStatus < 0 ||
(h = head) == null || h.waitStatus < 0) {
Node s = node.next;
if (s == null || s.isShared())
doReleaseShared();}
}
doReleaseShared
办法外面才是真正来唤醒线程:
private void doReleaseShared() {for (;;) {
Node h = head;
if (h != null && h != tail) {
int ws = h.waitStatus;
if (ws == Node.SIGNAL) {
// 因为该办法可能被并发调用,为了防止不必要的唤醒节约,因为通过 cas 来抢占唤醒权力。// 抢占成功者执行真正的后继结点唤醒工作。如果失败则进行重试
if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
continue; // loop to recheck cases
unparkSuccessor(h);
}
// 如果 ws==0,可能是 head 结点曾经没有后继结点,也有可能是因为 head 节点的后继结点唤醒权被其余线程刚抢占胜利。// 如果没有后继结点,显然不须要做任何事件
// 如果是唤醒权被其余线程抢占,则不须要做任何事件。因为唤醒是在队列上进行流传的。所以这里就 cas 将 head 节点的状态值批改为 PROPAGATE。用来表白该线程唤醒操作用意曾经传播。然而会由别的线程真正的执行后续的唤醒动作。同样,如果失败了,则重试。else if (ws == 0 &&
!compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
continue; // loop on failed CAS
}
if (h == head) // loop if head changed
break;
}
}
加锁流程
tryAcquireShared
办法也是由子类去实现,然而 AQS 曾经把其返回值的语义定义好了:负值代表获取失败;0 代表获取胜利,但没有残余资源;负数示意获取胜利,还有残余资源,其余线程还能够去获取。所以这里 acquireShared()
的流程就是:
tryAcquireShared()
尝试获取资源,胜利则间接返回;- 失败则通过
doAcquireShared()
进入期待队列park()
,直到被unpark()/interrupt()
并胜利获取到资源才返回。整个期待过程也是疏忽中断的。 doAcquireShared(int)
此办法用于将以后线程退出期待队列尾部劳动,直到其余线程开释锁唤醒本人,本人胜利拿到相应的资源后才返回。
看完加锁的办法,当初来看共享模式下的开释锁办法:
public final boolean releaseShared(int arg) {if (tryReleaseShared(arg)) {doReleaseShared();
return true;
}
return false;
}
能够看到解锁也是用的 doReleaseShared
办法,代码比较简单这里不再开展细说。
总结
明天无关 AQS
的知识点就介绍到这里,有什么不对的中央请多多指教!