共计 10062 个字符,预计需要花费 26 分钟才能阅读完成。
一、ReentrantLock 与 AQS 简介
在 Java5.0 之前,在协调对共享对象的拜访时能够应用的机制只有 synchronized 和 volatile。Java5.0 减少了一种新的机制:ReentrantLock。ReentrantLock 并不是一种代替内置加锁的办法,而是作为一种可抉择的高级性能。ReentrantLock 实现了 Lock 接口,提供了一种无条件的、可轮询的、定时的以及可中断的锁获取操作,所有加锁和解锁办法都是显式的。
咱们根本不会间接应用 AQS,AQS 是一个构建锁和同步器的框架,许多同步器都能够通过 AQS 很容易高效的结构进去,根本可能满足绝大多数状况的需要。不仅 ReentrantLock,Semaphore、CountDownLatch、ReentrantReadWriteLock、FutureTask 也是基于 AQS 构建的。AQS 解决了实现同步器的大量细节,期待线程采纳 FIFO 队列操作程序;还负责管理同步器类中的状态,能够通过 getState,setState 以及 compareAndSetState 办法来操作。
二、ReentrantLock 应用示例
public class ReentrantLockTest1 {public static ReentrantLock lock = new ReentrantLock();
int count = 0;
public void run (){lock.lock();// 加锁
try {for(int i = 0; i < 1000; i++){count++;}
} finally {lock.unlock();// 开释锁
}
}
}
以上代码通过 lock 来实现 count++ 的原子操作,lock.lock() 用来获取锁,lock.unlock() 用来开释锁。那么多线程下,如何保障同步操作?如何开释锁?如何判断没有竞争到锁的线程处于期待状态?什么时候唤醒期待线程?
三、ReentrantLock 同步类实现(以下为外围代码摘录)
ReentrantLock 是独占锁,有偏心锁和非偏心锁的策略实现。贴上 ReentrantLock 的源码来看下,具体看正文吧。
package java.util.concurrent.locks;
import java.util.concurrent.TimeUnit;
import java.util.Collection;
public class ReentrantLock implements Lock, java.io.Serializable {
private static final long serialVersionUID = 7373984872572414699L;
private final Sync sync; // 同步器提供所有的实现机制, 属于外部类,供外部调用
/**
* 自定义一个同步器类,从它派生一个偏心和非偏心版本。其中 state 为持有锁的次数。*/
abstract static class Sync extends AbstractQueuedSynchronizer {// 这就是传说的 AQS,上面会具体提到
private static final long serialVersionUID = -5179523762034025860L;
abstract void lock();// 供外部类实现
/**
* 获取非偏心锁,供 tryAcquire 调用
* 如果锁没有被其余线程持有,则获取锁并立刻返回将锁同步状态 state 设置为 1。* 如果以后线程持有锁,则同步状态 state + 1 且立刻返回。* 如果锁由另一个线程持有,则以后线程成为禁用线程调度目标和处于休眠状态,直到取得锁,此时锁持有计数设置为 1
*/
final boolean nonfairTryAcquire(int acquires) {final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {// 没有线程持有锁时
if (compareAndSetState(0, acquires)) {setExclusiveOwnerThread(current);// 设置为独占资源
return true;
}
}
else if (current == getExclusiveOwnerThread()) {// 以后线程持有锁时
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(nextc);// 设置同步状态值 state
return true;
}
return false;
}
/**
* Sync 重写 AQS tryRelease 办法
* 如果以后线程是锁的持有者,则同步状态 state -1, 如果 state=0,锁开释;
* 如果以后线程不是锁的持有者,抛出异样 IllegalMonitorStateException
*/
protected final boolean tryRelease(int releases) {int c = getState() - releases;
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
boolean free = false;
if (c == 0) {
free = true;
setExclusiveOwnerThread(null);
}
setState(c);
return free;
}
// 以后线程是否独占资源
protected final boolean isHeldExclusively() {return getExclusiveOwnerThread() == Thread.currentThread();}
......
}
//----- 以下办法的实现都依赖继承于同步器 Sync 的实现!-----------
/**
* 非偏心锁对象
*/
static final class NonfairSync extends Sync {
private static final long serialVersionUID = 7316153563782823691L;
// 如果锁没有被其余线程持有,则获取锁并立刻返回将锁同步状态 state 设置为 1。final void lock() {if (compareAndSetState(0, 1)) //state 为 0 才设置为 1,不可重入!setExclusiveOwnerThread(Thread.currentThread());// 设置为以后线程独占资源
else
acquire(1);
}
// 非偏心锁版本下重写 AQS 的 tryAcquire
protected final boolean tryAcquire(int acquires) {return nonfairTryAcquire(acquires);
}
}
/**
* 偏心锁对象
*/
static final class FairSync extends Sync {
private static final long serialVersionUID = -3000897897090466540L;
final void lock() {acquire(1);
}
/**
* 偏心锁版本下重写 AQS 的 tryAcquire
*/
protected final boolean tryAcquire(int acquires) {final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {if (!hasQueuedPredecessors() &&
compareAndSetState(0, acquires)) {setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0)
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
}
public ReentrantLock() {sync = new NonfairSync();
}
/**
* 创立 ReentrantLock 非偏心或偏心锁实例
*/
public ReentrantLock(boolean fair) {sync = fair ? new FairSync() : new NonfairSync();}
/**
* 获取锁办法, 供内部服务调用
*/
public void lock() {sync.lock();
}
/**
* 尝试开释锁,供内部服务调用
*/
public void unlock() {sync.release(1);
}
/**
* 是否有线程持有该锁,有返回 true,无则返回 false,供内部服务调用
*/
public boolean isLocked() {return sync.isLocked();
}
......
}
ReentrantLock 的 lock、unlock、isLocked 办法在实现时要依赖自定义的 Sync 外部类(Sync 依赖于 AQS),Sync 类提供了根本的同步器机制的实现,比方加锁办法 nonfairTryAcquire 和解锁办法 tryRelease,还派生出两个偏心策略的类如 FairSync 和 NonfairSync。在 sync 通过实现 state 的值来判断是否能够容许获取和开释,根本通过 getState,setState 以及 compareAndSetState 来操作的同步器的状态。至于线程的排队、期待、唤醒等,AQS 都曾经实现好了。
通过这大节咱们晓得同步器是基于 AQS 构建的,实现了 AQS 最根本的操作包含各种模式的获取操作和开释操作。依据同步器不同,AQS 获取操作能够是一种独占操作(ReentrantLock), 也能够是非独占操作(Semaphore、CountDownLatch)。如果同步器反对独占获取操作,那么须要实现一些爱护办法:tryAcquire、tryRelease 和 isHeldExclusively 等,而对于共享获取的同步器,则应该实现 tryAcquireShared 和 tryReleaseShared 等办法。那么 AQS 如何做进行线程排队、期待、唤醒呢?
四、AQS 源码解析
AQS 解决了实现同步器的大量细节,期待线程采纳 FIFO 双向队列操作程序。本节摘录 AQS 独占模式下的两个入口办法来开展:acquire(获取锁)和 release(开释锁)
1. 节点 node
AQS 外部保护了一个 FIFO 双向期待队列构造,head 是队列头节点,tail 是队列尾节点。双向链表构造里有两个指针,pre 指针指向后任节点,next 指针指向后继节点。当线程竞争失败时会创立一个包含此线程的 Node 节点退出期待队列尾部。期待队列如下图:
2. 加锁
4.2.1 acquire 办法
acquire 是独占模式下获取锁的入口办法,acquire 源码如下:
public final void acquire(int arg) {if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();}
acquire 办法流程:
- 1、tryAcquire 在第 3 节提到,如果没有线程持有锁或者是以后线程持有锁时,会获取资源间接返回;
- 2、如果锁被其余线程持有,会调用 addWaiter 办法结构 Node 节点退出期待队列尾部;
- 3、acquireQueued 办法来以独占的不可中断的形式获取曾经在队列中的线程,如果自旋过程被中断过,返回 true,否则返回 false;
- 4、如果自旋过程中被中断过,acquireQueued 办法不会停止,会在最终获取线程后,再调用 selfInterrput(),中断以后线程;
- 5、如果在获取队列中的线程的过程中出现异常且获取锁失败时,会勾销线程获取操作(把该线程节点的 waitStatus 标记为 CANCELLED,如果是尾节点将其移出队列;如果不是将会跳过此节点指向下一个节点)。
上面具体挖下 acquire 办法里的几个办法:
4.2.1.1 addWaiter
// 为以后线程以给定模式创立排队节点。Node.EXCLUSIVE 为独占模式, Node.SHARED 共享模式
private Node addWaiter(Node mode) {Node node = new Node(Thread.currentThread(), mode);
// 间接入队尾,这种状况表明队列中已有期待的节点
Node pred = tail;
if (pred != null) {
node.prev = pred;// 把以后节点后任节点指向队尾节点
if (compareAndSetTail(pred, node)) {//cas 增加到同步队列
pred.next = node;// 把旧队尾的后置节点指向以后节点, 此时以后节点变成新的队尾节点
return node;
}
}
enq(node);//tail 为 null 时,走 enq 办法初始化队列(刚开始)或者增加到队尾
return node;
}
4.2.1.2 enq
private Node enq(final Node node) {for (;;) {// 保持一直的自旋,直到退出队列
Node t = tail;
if (t == null) { // 刚开始队列为空,设置新节点为 head 节点,并把 tail 节点也指向它
if (compareAndSetHead(new Node()))
tail = head;
} else {// 失常退出队尾
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}
4.2.1.3 acquireQueued
通过 addWaiter 设置好队列后,下一步就看线程如何获取锁了。acquireQueued 办法就是以独占的不可中断的形式获取曾经在队列中的线程。
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;// 标记是否胜利获取锁
try {
boolean interrupted = false;// 标记是否中断过
for (;;) {// 自旋取得锁
final Node p = node.predecessor();// 获取后任节点
if (p == head && tryAcquire(arg)) {// 节点为 head 时才有资格去获取锁
setHead(node);// 旧 head 节点曾经拿到锁,把以后节点高为新 head
p.next = null; // 不便原 head 节点 GC(setHead 中曾经把 node.prev=null, 这标记着没有指针指向 head 了,而 head 也没有后继指针了,能够回收了)failed = false;// 胜利取得锁
return interrupted;
}
// 如果没有拿到锁,依据后任节点 waitStatus 状态判断是否须要挂起
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())// 执行挂起,判断以后线程是否有中断标记
interrupted = true;
}
} finally {// 如果在获取队列中的线程的过程中出现异常且锁获取失败,会勾销线程获取操作(把该线程节点的 waitStatus 标记为 CANCELLED,如果是尾节点将其移出队列;如果不是将会跳过此节点指向下一个节点)if (failed)
cancelAcquire(node);
}
}
以上代码显示出,只有 head 节点有资格去获取锁,获取锁胜利后会把以后节点设置为新 head,同时让 GC 回收旧的 head 节点对象。如果在获取队列中的线程的过程中出现异常且锁获取失败,会勾销线程获取操作(把该线程节点的 waitStatus 标记为 CANCELLED,如果是尾节点将其移出队列;如果不是将会跳过此节点指向下一个节点)。那如果不是头节点或者别的线程依然没有开释锁怎么办呢?还需看 shouldParkAfterFailedAcquire 办法和 parkAndCheckIntrrupt 办法。
4.2.1.4 shouldParkAfterFailedAcquire
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
int ws = pred.waitStatus;// 拿到后任节点状态
if (ws == Node.SIGNAL)// 如果是 SIGNAL 状态,能够失常阻塞了,因为以后节点晓得会被失常告诉 / 唤醒
return true;
if (ws > 0) {// 后任状态为 CANCELLED 时(只有 CANCELLED 状态 >0)do {// 有限次循环, 直到找到失常的后任节点状态后跳出循环
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
// 如果后任节点为 0 or PROPAGATE 状态,设置后任为 SIGNAL 状态
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}
整个流程中,如果后任结点的状态不是 SIGNAL,那么必须保障后任为 SIGNAL,以便本人被失常唤醒。
4.2.1.5 parkAndCheckInterrupt
LockSupport.part(this) 将线程挂起到 waittng 状态,真正进入阻塞状态,它须要 uppark()、interrupt() 办法来中断它,以此实例 FIFO 队列阻塞操作
private final boolean parkAndCheckInterrupt() {LockSupport.park(this);// 进入阻塞状态
return Thread.interrupted();// 以后线程是否被中断,Thread.interrupted() 会革除以后线程的中断标记位(如果以后线程已中断,第一次调用这个办法的返回值是 true,第二次调用这个办法的返回值为 false)}
看完 shouldParkAfterFailedAcquire() 和 parkAndCheckInterrupt(),再答复下面 acquireQueued 办法起初提出的问题(如果不是头节点或者别的线程依然没有开释锁怎么办呢?):没有取得锁的线程会去查看下本人的后任节点是否是 SIGNAL 状态,在保障后任节点为 SIGNAL 状态下,本人真正进入到 waiting 状态,筹备被唤醒。唤醒的办法能够是 uppark()、interrupt(),唤醒后再次尝试获取锁,以此类推,直到获取锁为止。获取锁胜利后会把以后节点设置为新 head,同时让 GC 回收旧的 head 节点对象。在此过程中还会判断线程有没有被中断过,如果被中断过一次,会在 acquire 办法中自我中断(锁开释时会中断线程,中断的操作比较复杂,请另行参考线程中断的文章)。
3. 开释锁
4.3.1 release 办法
下面摸索了如何加锁及队列阻塞的操作,上面再看看如何开释锁。如上第二节中锁开释调用 unlock(),这个办法会进入 AQS 的 release 办法。AQS release 是独占模式下开释锁的入口办法,release 源码如下:
public final boolean release(int arg) {if (tryRelease(arg)) {// 如果 state= 0 时,能够开释锁
Node h = head;
if (h != null && h.waitStatus != 0)//waitStatus 为 0 状况,只有在新建节点时才会初始化为 0,而新建的节点没有后继节点,不须要执行唤醒
unparkSuccessor(h);// 唤醒期待队列里的下个线程
return true;
}
return false;
}
release 办法流程:
- 1、tryRelease 在第 3 节提到,如果以后线程是锁的持有者,则同步状态 state -1, 如果 state=0,锁才会开释;如果以后线程不是锁的持有者,抛出异样 IllegalMonitorStateException
- 2、unparkSuccessor 办法用于唤醒期待队列中下一个线程。
- 3、为什么 h.waitStatus != 0 时才会执行唤醒 unparkSuccessor 办法呢?head 节点的 waitStatus= 0 会怎么样?waitStatus 为 0,只有在 addWaiter 办法中新建节点时才会初始化为 0,其余节点曾经在 shouldParkAfterFailedAcquire 办法里调整过了(参考 4.2.1.4,那时 head 的 waitStatus=SIGNAL)。而新建的节点没有后继节点,也就不须要唤醒了。
4.3.1.1 unparkSuccessor 办法
private void unparkSuccessor(Node node) {
int ws = node.waitStatus;
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);//ws 小于 0,将设置头节点 watistatus 为 0
Node s = node.next;// 唤醒对象为后继节点
if (s == null || s.waitStatus > 0) {// 后继节点为空或勾销场景
s = null;
for (Node t = tail; t != null && t != node; t = t.prev)// 从队尾向前找 waitstatus <= 0 的节点
if (t.waitStatus <= 0)// 留神此处循环没有 return,会始终往前找,找到离 head 最近的节点
s = t;
}
if (s != null)
LockSupport.unpark(s.thread);// 唤醒后继节点
}
unparkSuccessor 首先会唤醒 head 的下一个节点,如果将被唤醒节点不为空,则间接通过 unpart() 办法来开释挂起的线程;如果 head 后继节点为空,则从后向前找,最初找到离 head 最近的节点。再联合 acquireQueued() 办法进入 if (p == head && tryAcquire(arg)) 的判断来获取锁。因为拿不到锁的线程都会自旋,所以不必放心进不了这个判断 if (p == head && tryAcquire(arg)),直到拿到锁完结。
到这里可能会有疑难,为什么后继节点要从队尾开始向前找呢,从前向后找不是更快吗?也就是说为什么循环中是 t =tail & t=t.prev(反向),而不是 t =head & t=t.next(正向)?返回到新建期待节点 addWaiter 办法看到,compareAndSetTail 办法把新建的 node 增加到同步队列前曾经设置了节点的 prev,而后任节点的 next 指向新节点是在增加到队列之后。这就阐明在 unparkSuccessor 办法来唤醒时,可能队尾节点还没来得及执行 pred.next=node; 这句话,t.next 正向找后继节点就会漏掉这个新加的节点,用 t.prev 反向找更牢靠。
if (pred != null) {
node.prev = pred;// 把以后节点后任节点指向队尾节点
if (compareAndSetTail(pred, node)) {//cas 增加到同步队列
pred.next = node;// 把旧队尾的后置节点指向以后节点, 此时以后节点变成新的队尾节点
return node;
}
}
五、ReentrantLock 独占模式整体流程图
以上时整个独占模式下锁获取和开释的流程图,供本人参考。
总结
1、本文通过源码追踪的形式解析了本人对 ReentrantLock 和 AQS 的了解。ReentrantLock 是 AQS 的实现。AQS 设计节点 waitStatus、队列状态 status 等状态属性,保护 FIFO 队列对线程节点进行阻塞以及独占锁策略的设计等诸多细节。
2、ReetrantLock 通过自定义同步器并通过 state 的值来判断是否能够容许获取锁、开释锁或进入队列。
3、AQS 队列中又通过 waitStatus 状态判断是否线程进入阻塞状态和唤醒线程。
4、AQS 保护了一个同步队列,没有获取锁的线程会进行同步队列且会不停的自旋直到该线程的节点成为头部节点且取得了锁才进行自旋。
5、开释锁时会唤醒头节点的后继节点。
最初
欢送关注公众号:前程有光,支付一线大厂 Java 面试题总结 + 各知识点学习思维导 + 一份 300 页 pdf 文档的 Java 外围知识点总结!