ReentrantLock, 咱们称之为可重入锁。其中依赖了 AbstractQueuedSynchronizer 类来实现线程的同步。
ReentrantLock 中定义了一个 Sync 的同步类,源码如下:
abstract static class Sync extends AbstractQueuedSynchronizer {
private static final long serialVersionUID = -5179523762034025860L;
// 形象办法
abstract void lock();
// 非偏心,尝试获取资源
final boolean nonfairTryAcquire(int acquires) {final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {if (compareAndSetState(0, acquires)) {setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
// 独占形式,尝试开释资源,胜利返回 true,失败返回 false
protected final boolean tryRelease(int releases) {int c = getState() - releases;
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
boolean free = false;
if (c == 0) {
free = true;
setExclusiveOwnerThread(null);
}
setState(c);
return free;
}
protected final boolean isHeldExclusively() {
// While we must in general read state before owner,
// we don't need to do so to check if current thread is owner
return getExclusiveOwnerThread() == Thread.currentThread();
}
final ConditionObject newCondition() {return new ConditionObject();
}
// Methods relayed from outer class
final Thread getOwner() {return getState() == 0 ? null : getExclusiveOwnerThread();}
final int getHoldCount() {return isHeldExclusively() ? getState() : 0;}
final boolean isLocked() {return getState() != 0;
}
/**
* Reconstitutes the instance from a stream (that is, deserializes it).
*/
private void readObject(java.io.ObjectInputStream s)
throws java.io.IOException, ClassNotFoundException {s.defaultReadObject();
setState(0); // reset to unlocked state
}
}
Sync 类继承了 AbstractQueuedSynchronizer,简称 AQS。
AQS 中提供了两种锁:
-
独占锁,同一时刻只容许一个线程取得锁
/** * 独占形式,尝试获取资源,胜利返回 true,失败返回 false * @param arg * @return */ @Override protected boolean tryAcquire(int arg) {return super.tryAcquire(arg); } /** * 独占形式,尝试开释资源,胜利返回 true,失败返回 false * @param arg * @return */ @Override protected boolean tryRelease(int arg) {return super.tryRelease(arg); }
- 共享锁,同一时刻容许多个线程同时取得锁
/**
* 共享形式,尝试开释资源,如果开释后容许唤醒后续期待节点则返回 true,否则返回 false
* @param arg
* @return
*/
@Override
protected boolean tryReleaseShared(int arg) {return super.tryReleaseShared(arg);
}
/**
* 共享形式。尝试获取资源。正数示意失败;0 示意胜利,但没有残余可用资源;负数示意胜利,且有残余可用资源。* @param arg
* @return
*/
@Override
protected int tryAcquireShared(int arg) {return super.tryAcquireShared(arg);
}
外围变量
/**
* 同步状态变量
*/
private volatile int state;
- state > 0:示意有线程曾经抢占到资源,然而并未开释,在重入的状况下 state 的值可能大于 1
- state = 0:示意以后锁资源处于闲暇状态
// 保障多线程竞争下 state 的原子性
protected final boolean compareAndSetState(int expect, int update) {
// See below for intrinsics setup to support this
return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
}
ReentrantLock 源码
- 当调用 ReentrantLock.lock() 办法实际上是调用形象动态外部类 sync.lock() 办法。
public void lock() {sync.lock();
}
syanc 有两个具体的实现:
偏心锁,必须依照 FIFO 的规定来拜访锁资源
-
static final class FairSync extends Sync { private static final long serialVersionUID = -3000897897090466540L; final void lock() {acquire(1); } protected final boolean tryAcquire(int acquires) {final Thread current = Thread.currentThread(); int c = getState(); if (c == 0) {if (!hasQueuedPredecessors() && compareAndSetState(0, acquires)) {setExclusiveOwnerThread(current); return true; } } else if (current == getExclusiveOwnerThread()) { int nextc = c + acquires; if (nextc < 0) throw new Error("Maximum lock count exceeded"); setState(nextc); return true; } return false; } }
非偏心锁,能够不依照 FIFO 的规定,间接尝试获取锁资源,默认应用非偏心锁
-
static final class NonfairSync extends Sync { private static final long serialVersionUID = 7316153563782823691L; final void lock() { // 不论以后线程是否排队,间接通过 CAS 抢占锁资源,如果胜利则示意获取锁,// 否则这调用 acquire(1) 执行锁竞争的逻辑; if (compareAndSetState(0, 1)) setExclusiveOwnerThread(Thread.currentThread()); else acquire(1); } protected final boolean tryAcquire(int acquires) {return nonfairTryAcquire(acquires); } }
acquire(int i) 办法源码
/**
通过 tryAcquire() 办法尝试获取独占锁,如果胜利则返回 true,否则返回 false。如果 tryAcquire() 办法返回 false,则阐明以后锁被占用,只能通过 addWaiter() 办法将以后线程封装成 Node 并增加到 AQS 的同步队列中
acquireQueued() 办法将 Node 作为参数,通过自旋去尝试获取锁
*/
public final void acquire(int arg) {if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();}
CAS 实现原理
protected final boolean compareAndSetState(int expect, int update) {
// 通过 CAS 乐观锁的形式来做比拟并替换,如果以后内存中 state 的值和预期值 expect 相等,则更新为 update。如果更新胜利则返回 true,否则返回 false。// 这个操作是原子性的,不波及 state 属性,也不会呈现线程平安问题
return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
}
state 属性
state 是 AQS 中的一个属性,它在不同的实现中所表白的含意是不一样的。对重入锁的实现来说,state 示意同步状态,它有如下两个含意。
- 当 state= 0 时,示意无锁状态。
- 当 state>0 时,示意曾经有线程取得了锁,也就是说 state=1,然而因为 ReentrantLock 容许重入,所以当同一个线程屡次取得同步锁的时候,state 会递增,比方重入 5 次,那么 state=5。而在开释锁的时候,同样须要开释 5 次,直到 state= 0 其余线程才有资格取得锁。
nonfairTryAcquire() 办法源码
final boolean nonfairTryAcquire(int acquires) {final Thread current = Thread.currentThread();// 获取以后线程
int c = getState();// 获取 state 值
if (c == 0) {// 等于 0 示意无锁
if (compareAndSetState(0, acquires)) {//CAS 比拟并替换 state 的值,胜利则示意获取锁
setExclusiveOwnerThread(current);// 保留以后获取到线程的锁,下次访问此资源不须要再次竞争锁
return true;
}
}
else if (current == getExclusiveOwnerThread()) {// 如果是同一个线程
// 则间接减少重入的次数
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
nonfairTryAcquire() 办法的实现逻辑如下。
判断以后锁的状态,c== 0 示意无锁,在无锁状态下通过 compareAndSetState() 办法批改 state 抢占锁资源。
○ 如果抢占胜利,则返回 true。
○ 如果抢占失败,则返回 false。
current == getExclusiveOwnerThread(),该判断阐明抢占到锁的线程和以后线程是同一个线程,示意线程重入,因而间接减少重入次数并保留到 state 字段中
AbstractQueuedSynchronizer.addWaiter(Node mode)
当 tryAcquire() 办法获取锁失败当前,会先调用 addWaiter() 办法把以后线程封装成 Node 退出同步队列中;源码如下
private Node addWaiter(Node mode) {// 入参 mode 示意以后节点的状态,传递的参数是 Node.EXCLUSIVE,示意独占状态。Node node = new Node(Thread.currentThread(), mode);// 把获取锁失败的线程封装成 Node
Node pred = tail;//tail 在 AQS 中示意队列对尾的,默认为 null
if (pred != null) {// 在 tail 不为 null 的状况下,队列中示意有节点
node.prev = pred;// 把以后线程的 Node 的 prev 指向 tail
if (compareAndSetTail(pred, node)) {// 通过 CAS 把 node 退出 AQS 队列中,也就是设置为 tail
pred.next = node;// 把原来 tail 节点的 next 指向以后 node
return node;
}
}
enq(node);// 当 tail=null 时,把 node 增加到同步队列
return node;
}
将以后线程封装成 Node 并进行存储,后续能够间接从节点中失去线程,再通过 unpark(thread) 办法来唤醒。
通过 pred!=null 判断以后链表是否曾经实现初始化,如果曾经实现初始化,则通过 compareAndSetTail 操作把以后线程的 Node 设置为 tail 节点,并建设双向关联。
如果链表还没初始化或者 CAS 增加失败(存在线程竞争),则调用 enq() 办法来实现增加操作。
enq() 办法
private Node enq(final Node node) {for (;;) {
Node t = tail;
if (t == null) { // 如果为 null 则调用 CAS 初始化。直到胜利初始化
if (compareAndSetHead(new Node()))
tail = head;
} else {
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}
该办法采纳了自旋锁来实现同步队列的初始化,并把以后节点增加到了同步队列中。AQS 的整体构造如图:
ReentrantLock 开释锁源码剖析
public void unlock() {sync.release(1);
}
public final boolean release(int arg) {if (tryRelease(arg)) {// 开释胜利
Node h = head;// 获取到 AQS 中的 head 节点
if (h != null && h.waitStatus != 0)
// 如果 head 不为 null 且状态不等于 0,则调用 unparkSuccessor(h) 办法唤醒后续节点
unparkSuccessor(h);
return true;
}
return false;
}
tryRelease(int releases) 通过批改 state 值来开释锁
protected final boolean tryRelease(int releases) {int c = getState() - releases;// 减去开释的次数
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
boolean free = false;
if (c == 0) {
free = true;
setExclusiveOwnerThread(null);//
}
setState(c);
return free;
}
独占锁在加锁时状态会加 1,在开释锁时状态回减 1,同一个锁可重入后,可能会递增,呈现 2,3,4,5 这些值, 只有调用 unlock() 办法的次数与调用 lock() 办法的次数相等 ,才会将 ExclusiveOwnerThread 线程设置为空,示意锁开释结束
unparkSuccessor(Node node) 唤醒同步队列中的线程
private void unparkSuccessor(Node node) {
int ws = node.waitStatus;// 获取 head 节点的状态
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);// 设置节点状态为 0
Node s = node.next;// 失去 head 节点的下一个节点
if (s == null || s.waitStatus > 0) {
// 如果下一个节点为 null 或者 status>0,则 cancelled 状态
// 通过从尾部节点开始扫描,找到间隔 head 最近的一个 waitStatus<= 0 的节点
s = null;
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
if (s != null)// 如果 next 节点不为空,则间接唤醒这个线程
LockSupport.unpark(s.thread);
}
unparkSuccessor() 办法次要有两个逻辑。
- 判断以后节点的状态,如果节点状态已生效,则从 tail 节点开始扫描,找到离 head 最近且状态为 SIGNAL 的节点。
-
通过 LockSupport.unpark() 办法唤醒该节点。
为什么要从 tail 开始往前扫描?
这和 enq() 办法有关系,在 enq() 办法的逻辑中,把一个新节点增加到链表中的逻辑如下。
将新节点的 prev 指向 tail。
通过 CAS 将 tail 设置为新节点,因为 CAS 是原子操作,所以可能保障线程的安全性。
t.next=node,目标是设置原 tail 的 next 节点指向新节点。
如果在 CAS 操作之后、t.next=node 操作之前,存在其余线程调用 unlock() 办法从 head 开始往后遍历,因为 t.next=node 还没执行,所以链表的关系还没有建设残缺,就会导致遍历到 t 节点的时候被中断。而如果从 tail 往前遍历,就肯定不会呈现这个问题。
开释锁的线程继续执行
回到 AQS 中的 acquireQueued() 办法,本来未抢占到锁的线程被阻塞在该办法中,当被阻塞的线程被唤醒后,持续从阻塞的地位开始执行,代码如下。
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {final Node p = node.predecessor();// 返回上一个节点
if (p == head && tryAcquire(arg)) {// 再次抢占锁资源
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())// 唤醒,进入下一次循环
interrupted = true;
}
} finally {if (failed)
cancelAcquire(node);
}
}