共计 8701 个字符,预计需要花费 22 分钟才能阅读完成。
前言
在 java 中重入锁有两个,一个是 synchronized 关键字,是 C ++ 实现的,而另外一个就是本篇文章中的配角儿 ReentrantLock。ReentrantLock 是在 jdk 1.5 版本后引入的 juc 包中一个比拟重要的工具类,由驰名的并发巨匠 Doug Lea 编写;接下来就让咱们一起来学习一下它吧。
AQS
在学习 ReentrantLock 之前,咱们必须先学下 AQS,它是一个实现锁和一些相干的同步器如(semaphores,events)的框架。它外部是一个独占线程,同步状态 state,FIFO 的期待队列和提供原子 CAS 办法的 UnSafe 对象组成。它的类图关系如下
外部组成图
FIFO 期待队列
AQS 外部的先进先出期待队列是由双向链表实现,这个链表是存在头结点的,即蓝色中的那个节点外部是没有线程的。而 T1 和 T2 节点中的线程会处于自旋获取锁的状态中,当然在自旋肯定的工夫后还获取不到锁,则会进入阻塞状态。
Node 节点
Node 节点有两种模式,一种共享模式,另外一种是独占模式。它外部比拟值得的咱们关注的是 waitStatus 这个成员变量,它一共有 5 中值,别离是 -3,-2,-1,0,1。在本文中咱们不探讨 -3,咱们先探讨 -1,0,1 这三个状态。
static final class Node {
/** 共享模式的节点 */
static final Node SHARED = new Node();
/** 独占模式的节点 */
static final Node EXCLUSIVE = null;
/** waitStatus value to indicate thread has cancelled */
/** waitStatus 状态值为 1 时,阐明这个线程将被勾销不执行了 */
static final int CANCELLED = 1;
/** waitStatus value to indicate successor's thread needs unparking */
/** waitStatus 状态值为 - 1 时,阐明这个节点有任务去唤醒它后继节点 */
static final int SIGNAL = -1;
/** waitStatus value to indicate thread is waiting on condition */
/** waitStatus 状态值为 - 2 时,阐明这个线程在 condition 队列中阻塞期待 */
static final int CONDITION = -2;
static final int PROPAGATE = -3;
volatile int waitStatus;
volatile Node prev;
volatile Node next;
volatile Thread thread;
Node nextWaiter;
final boolean isShared() {return nextWaiter == SHARED;}
final Node predecessor() throws NullPointerException {
Node p = prev;
if (p == null)
throw new NullPointerException();
else
return p;
}
Node() { // Used to establish initial head or SHARED marker}
Node(Thread thread, Node mode) { // Used by addWaiter
this.nextWaiter = mode;
this.thread = thread;
}
Node(Thread thread, int waitStatus) { // Used by Condition
this.waitStatus = waitStatus;
this.thread = thread;
}
}
同步器
在介绍完上述的 AQS 后,咱们看下 ReentrantLock 外部的同步器,它继承自 AQS。而 Sync 这个抽象类有两种具体实现,别离是非偏心同步器和偏心同步器。当咱们须要应用非偏心锁的时候,则在 ReentrantLock 的构造函数中将 sync 赋值为非偏心同步器,须要应用偏心锁时,则 sync 赋值为偏心同步器。而 ReenrantLock 默认是应用非偏心锁的。
/** 同步器 */
private final Sync sync;
/** 构造函数 */
public ReentrantLock() {sync = new NonfairSync();
}
public ReentrantLock(boolean fair) {sync = fair ? new FairSync() : new NonfairSync();}
那非偏心锁和偏心锁有何区别呢,非偏心锁模式下即意味着新的线程不须要到期待队列中阻塞也能够和期待队列中刚被唤醒的线程中竞争,即新线程能够插队。而偏心锁模式下即意味着新线程必须去排队。
非偏心锁
在介绍完同步器后,咱们以非偏心同步器为例子,去介绍一个线程从开始到阻塞过程。假如第一次有 T1 线程去尝试去获取锁,而它第一次就取得了锁。它就将 exclusiveOwnerThread 设置为本身,即此时exclusiveOwnerThread = T1,state = 1
。
static final class NonfairSync extends Sync {
/* 获取锁办法 */
final void lock() {
// 第一次取得锁胜利
if (compareAndSetState(0, 1))
// 设置独占线程
setExclusiveOwnerThread(Thread.currentThread());
else
// 获取锁失败
acquire(1);
}
protected final boolean tryAcquire(int acquires) {return nonfairTryAcquire(acquires);
}
}
紧接着有 T2 线程来获取锁,但此时 state 为 1,获取锁失败。则会进入 acquire 办法尝试取得锁。首先它会进行第一次取得锁,而 tryAcquire 办法是由 AQS 提供的模板办法。而这里调用的是非偏心同步器的实现。
public final void acquire(int arg) {if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();}
在非偏心获取锁办法中,会进行 CAS 替换,替换胜利,则设置独占线程。
而当以后线程是本人的时候,也即是重入锁。会将 state 这状态进行叠加。
final boolean nonfairTryAcquire(int acquires) {final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {if (compareAndSetState(0, acquires)) {setExclusiveOwnerThread(current);
return true;
}
}
/* 重入锁的状况 */
else if (current == getExclusiveOwnerThread()) {
/** state 状态进行叠加 */
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
/** 设置状态 */
setState(nextc);
return true;
}
return false;
}
如果 T2 线程第一次尝试获取锁失败,则要进入期待队列中,即进入 addWaiter 办法,而在队列初始化的时候,首先调用的是 enq 办法。
/*
这是第初始化的队列的办法,也即是增加第一个线程
*/
private Node enq(final Node node) {
// 死循环:执行 2 次
for (;;) {
// 第一次 t ->NULL 和 tail->NULL
// 此时 head->NULL
// 第二次 t->[__],t 也指向空节点
Node t = tail;
// 第一次判断成立
// 第二次不成立了
if (t == null) { // Must initialize
// 创立一个空节点,让 head 指针指向它
//head->[___]
if (compareAndSetHead(new Node()))
//head—>[__],tail->[__]
//tail 和 head 都指向空节点
tail = head;
} else {/* 第二次就到这, head->[__]<-node.prev
^
|
tail,t
编程下面这样 */
node.prev = t;
// 而后 node 编程尾结点
if (compareAndSetTail(t, node)) {
// 再将空节点的 next 指向 node
t.next = node;
// 初始化实现
return t;
}
}
}
}
enq 办法中,node 会进入一个 for 的死循环中,直到变成尾结点。咱们这里假如它的 CAS 操作第一次就胜利,则总共须要两次 CAS 操作。
在第一次循环时,因为 tail 尾指针没有初始化,指向为 NULL。则会进入第一个 if 语句块,此时 CAS 操作是将 head 指针指向一个不蕴含线程的虚结点,即头结点。而后 tail 指针也指向这个头结点。
而第二次循环,因为 tail 不为 NULL 了,则进入 else 语句块,node 结点将 prev 指针指向虚节点。随后 CAS 更新 tail 指针,将 tail 指向 node。
最初就是将 t 的下一个结点指向 node,至此队列初始化实现。
咱们假如在此同时,有线程 T3 也去竞争锁,但竞争失败,调用了 addWaiter 办法,而此时队列曾经初始实现,它不会再调用 enq 办法入队,而是在 addWaiter 办法中入队。变成如下图的样子。具体过程看代码正文辣,不再具体画了。
// 增加期待线程后, 返回这个 node
private Node addWaiter(Node mode) {
// 创立一个节点,设置外面的线程为以后线程
Node node = new Node(Thread.currentThread(), mode);
// Try the fast path of enq; backup to full enq on failure
// 游标 pred 设置尾结点
Node pred = tail;
// 即这个双端队列曾经有元素了
if (pred != null) {
//
node.prev = pred;
//cas 将 node 设置为尾结点
if (compareAndSetTail(pred, node)) {
//pred 还停留在尾结点的上一个节点,所以将它的 next 指向 node
pred.next = node;
return node;
}
}
// 就加第一个元素时,会执行这个办法
enq(node);
return node;}
此时,T2 和 T3 线程尽管曾经入队,但它们并没有进行阻塞,会在 acquireQueued 中进行自旋,去第三次尝试取得锁。咱们假如第一种状况,即 T2 并没有获取锁胜利。则他们都会进入语句中。整段代码看上面。
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
此时线程如下图。
咱们来对 shouldParkAfterFailedAcquire(p, node)
进行剖析。在这个办法中有三种状况。
- SIGNAL:节点状态为 -1
- CANCELLED:节点状态为 1
- 默认为 0:节点状态为 0
而此时 T2,T3 节点状态都为 0,则会进入 else 语句块,最初只剩 T3 状态变为 0
此时 T2 和 T3 都会调用 parkAndCheckInterrupt()
办法进行阻塞,不再自旋。
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
/** 获取前驱节点的状态 */
int ws = pred.waitStatus;
/** 如果前驱节点状态为 -1,则会进入阻塞 */
if (ws == Node.SIGNAL)
return true;
if (ws > 0) {
/*
* Predecessor was cancelled. Skip over predecessors and
* indicate retry.
*/
do {node.prev = pred = pred.prev;} while (pred.waitStatus > 0);
pred.next = node;
} else {
// 将前驱节点的期待状态更新为 -1
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}
而在 shouldParkAfterFailedAcquire
办法中还有一种状况值得咱们留神的是前驱期待状态为 CANCELLED,此时它做了什么呢?
do {
/*
将这个 node 的前驱和第一个不为勾销状态的节点进行相连
pred = pred.prev;
node.prev = pred;
*/
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
// 第一个不为勾销状态的节点和这个 node 节点相连
pred.next = node;
此时它做的事件是,找到第一个不为勾销状态的节点,而后将这个为勾销状态的前驱节点断开分割。
咱们假如此时状况如下,T5,T6 刚加进队列,T3 和 T4 因为自旋获取锁中出现异常而进入 finally 块中调用勾销办法。这里咱们还要分明一点,就是线程退出期待队列中,并不会立马进入阻塞,它们还是会在自旋获取锁的。所以 T5 线程的前驱节点 T4 线程的状态是 1,即会找到第一个期待状态不为 1 的节点。也就是 T2。
node.prev = pred = pred.prev;
而后将前驱连贯至 T2
跳出 while 循环后,会将 T2 节点的 next 指针指向 T5。
在咱们了解完 shouldParkAfterFailedAcquire
这个办法后。来看下 acquireQueued 办法中做了啥事。其实也很简略,就是如果如果队列的节点的前驱节点是 head 节点,如果获取锁胜利,则调用 setHead
办法开释这个线程。
// 而这个办法是,让刚阻塞的线程满足它前驱节点是头结点的状况下,还有一次开释的机会。final boolean acquireQueued(final Node node, int arg) {
// 默认是 failde 是 true,这个标记是判断是否须要因为在自旋中出现异常而进行线程去程操作
boolean failed = true;
try {
// 阻塞是 false 失败的
boolean interrupted = false;
// 死循环,这个死循环是始终自旋,取得锁用的
for (;;) {
// 拿到刚阻塞的线程的前驱节点
final Node p = node.predecessor();
// 如果它的前驱节点是那个头节点和获取锁胜利的状况下
if (p == head && tryAcquire(arg)) {setHead(node);
// 头节点和 node 勾销关联
p.next = null; // help GC
// 获取锁胜利,就将 failde 设置为 false
failed = false;
// 返回 Interrupted 判断,即阻塞失败了
return interrupted;
}
// 这个是获取锁失败了,他就会返回 true 进行阻塞
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
//
interrupted = true;
}
} finally {if (failed)
cancelAcquire(node);
}
}
而 setHead 办法是怎么开释线程的呢?咱们能够简略的看一下。
// 开释这个线程,这个办法是
/** 让 head 指针指向 node,将 node 里的线程开释后,这个 node 就成了
不蕴含线程的头结点,再将 prev 设置为 null。和后面的头节点勾销关联
*/
private void setHead(Node node) {
head = node;
node.thread = null;
node.prev = null;
}
在后面咱们也提到了线程会因为在自旋中因为一场等起因获取锁失败,转而进入勾销办法中cancelAcquire
,咱们来看下这个办法,具体解释在正文里,首先咱们先总体理解下这个办法
private void cancelAcquire(Node node) {
// 如果节点为 null,则返回
if (node == null)
return;
// 将 thread 革除
node.thread = null;
// 上面这步示意将 node 的 pre 指向之前第一个非勾销状态的结点(即跳过所有勾销状态的结点),waitStatus > 0 示意以后结点状态为勾销状态
Node pred = node.prev;
while (pred.waitStatus > 0)
node.prev = pred = pred.prev;
Node predNext = pred.next;
// 将以后节点设置为勾销状态
node.waitStatus = Node.CANCELLED;
// 如果以后要勾销的节点是尾节点,则将第一个为勾销状态的节点设置为尾结点
if (node == tail && compareAndSetTail(node, pred)) {
// 并将第一个为费勾销状态的节点设置为 null
compareAndSetNext(pred, predNext, null);
} else {
// 这种状况是咱们要考虑一下的,因为要勾销的节点在两头
//
int ws;
/**
如果该节点往前找的第一个非勾销状态的节点不是头结点
且期待状态是 SIGNAL
或者
*/
if (pred != head &&
((ws = pred.waitStatus) == Node.SIGNAL ||
(ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) &&
pred.thread != null) {
// 取得以后要勾销节点的下一个节点
Node next = node.next;
if (next != null && next.waitStatus <= 0)
// 将第一个期待状态为非勾销状态的节点的后驱节点换成 next
compareAndSetNext(pred, predNext, next);
} else {unparkSuccessor(node);
}
// 最初将尾结点连贯本身,帮忙 Gc
node.next = node; // help GC
}
}
在咱们整体看完这个办法后,咱们能够分步看下这办法怎么做的。
首先是始终往前找,找到第一个不为勾销状态的节点。将本人的前驱连贯到它,而后将本人的状态设置为勾销状态。
// 始终往前找,找到第一个不为勾销状态的节点
Node pred = node.prev;
while (pred.waitStatus > 0)
node.prev = pred = pred.prev;
Node predNext = pred.next;
找到当前,这个要勾销的节点不是尾节点,即是两头节点,如果是两头节点,它就要将第一个不为勾销状态的节点设为 singal,因为只有这样状态的前驱节点能力阻塞前面队列中的节点线程。如果设置不胜利,它就会唤醒前面的节点。
//
int ws;
/**
如果该节点往前找的第一个非勾销状态的节点不是头结点
且期待状态是 SIGNAL, 或者能把它 cas 换成 signal 且这个节点没有被唤醒
为什么要这样做呢?因为后面也晓得,一个前面的节点要被阻塞,须要后面的节点状态是 signal
*/
if (pred != head &&
((ws = pred.waitStatus) == Node.SIGNAL ||
(ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) &&
pred.thread != null) {
// 取得以后要勾销节点的下一个节点
Node next = node.next;
if (next != null && next.waitStatus <= 0)
// 将第一个期待状态为非勾销状态的节点的后驱节点换成 next
compareAndSetNext(pred, predNext, next);
} else {
// 如果做不到,就将后继节点给唤醒
unparkSuccessor(node);
}
// 最初将尾结点连贯本身,帮忙 Gc
node.next = node; // help GC
在讲完 acquireQueued 办法后,咱们能够晓得它其实在这办法里曾经实现了阻塞,但在 acquire 办法里为啥还要阻塞呢?是因为要避免线程中途唤醒而补的一次阻断,但这是怎么的状况?我也不是很分明
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
在叙述完上述一大篇,咱们终于了解残缺个上锁过程了,接下来能够看下锁的开释过程。开释的过程比较简单,将状态减至 0 后,将独占线程设置为 0 即实现了开释。
protected final boolean tryRelease(int releases) {int c = getState() - releases;
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
boolean free = false;
if (c == 0) {
free = true;
setExclusiveOwnerThread(null);
}
setState(c);
return free;
}