前几篇文章介绍了 AQS(AbstractQueuedSynchronizer)中的独占模式和对 Condition 的实现,这一篇文章来聊聊基于 AQS 框架实现的锁工具:ReentrantLock。
ReentrantLock 是一个可重入的互斥锁,也被称为独占锁,具备与 synchronized 雷同的一些根本行为和语义,但性能更弱小。
本文基于 jdk1.8.0_91
1. 继承体系
public class ReentrantLock implements Lock, java.io.Serializable
ReentrantLock 是 Lock 接口的实现,办法对照如下:
Lock 接口 ReentrantLock 实现
lock() sync.lock()
lockInterruptibly() sync.acquireInterruptibly(1)
tryLock() sync.nonfairTryAcquire(1)
tryLock(long time, TimeUnit unit) sync.tryAcquireNanos(1, unit.toNanos(timeout))
unlock() sync.release(1)
newCondition() sync.newCondition()
可知 ReentrantLock 对 Lock 的实现都是调用外部类 Sync 来做的。
1.1 AQS
Sync 继承了 AQS,也就是说 ReentrantLock 的大部分实现都曾经由 AQS 实现了。
AQS 提供了一系列模板办法,对于互斥锁来说,须要实现其中的 tryAcquire、tryRelease、isHeldExclusively 办法。
java.util.concurrent.locks.AbstractQueuedSynchronizer
// 独占获取(资源数)protected boolean tryAcquire(int arg) {throw new UnsupportedOperationException();
}
// 独占开释(资源数)protected boolean tryRelease(int arg) {throw new UnsupportedOperationException();
}
// 共享获取(资源数)protected int tryAcquireShared(int arg) {throw new UnsupportedOperationException();
}
// 共享获取(资源数)protected boolean tryReleaseShared(int arg) {throw new UnsupportedOperationException();
}
// 是否排它状态
protected boolean isHeldExclusively() {throw new UnsupportedOperationException();
}
1.2 Sync
Sync 重写了 AQS 的 tryRelease 和 isHeldExclusively 办法,而 AQS 的 tryAcquire 办法交由 Sync 的子类来实现。
Sync 中还具备了一个形象的 lock 办法,强制子类实现。
java.util.concurrent.locks.ReentrantLock.Sync
abstract static class Sync extends AbstractQueuedSynchronizer {
/**
* Performs {@link Lock#lock}. The main reason for subclassing
* is to allow fast path for nonfair version.
*/
abstract void lock();
protected final boolean tryRelease(int releases) {...}
protected final boolean isHeldExclusively() {...}
}
1.3 FailSync/NonfairSync
Sync 具备两个子类 FailSync 和 NonfairSync,对应的是 ReentrantLock 的两种实现:偏心锁(fair lock)、非偏心锁(non-fair lock)。
Sync 中蕴含了一个形象办法 lock 须要子类来实现,设计该形象办法的目标是,给非偏心模式加锁提供入口。
因为偏心锁和非偏心锁的区别,次要体现在获取锁的机制不同:
- 偏心模式,先发动申请的线程先获取锁,后续线程严格排队顺次期待获取锁。
- 非偏心模式,线程首次发动锁申请时,只有锁是可用状态,线程就能够尝试获取锁。如果锁不可用,以后线程只能进入同步队列,以偏心模式排队期待获取锁。
也就是说,非偏心锁只有在以后线程未进入同步队列之前,才能够去抢夺锁,一旦进入同步队列,只能按排队程序期待锁。
2. 构造方法
/** Synchronizer providing all implementation mechanics */
// 提供所有实现机制的同步器
private final Sync sync;
/**
* Creates an instance of {@code ReentrantLock}.
* This is equivalent to using {@code ReentrantLock(false)}.
*/
public ReentrantLock() {sync = new NonfairSync();
}
/**
* Creates an instance of {@code ReentrantLock} with the
* given fairness policy.
*
* @param fair {@code true} if this lock should use a fair ordering policy
*/
public ReentrantLock(boolean fair) {sync = fair ? new FairSync() : new NonfairSync();}
为什么默认创立的是非偏心锁?
因为偏心模式下,队列头部的线程从阻塞中被唤醒到真正运行,波及到线程调度和 CPU 上下文的切换,比拟耗时,这个过程中锁处于闲暇状态,浪费资源。
而非偏心模式下,多个线程之间采纳 CAS 来抢夺锁,这两头没有时间延迟,可能进步吞吐量。
3. Lock 接口实现
// 获取锁。void lock()
// 如果以后线程未被中断,则获取锁。void lockInterruptibly()
// 仅在调用时锁未被另一个线程放弃的状况下,才获取该锁。boolean tryLock()
// 如果锁在给定等待时间内没有被另一个线程放弃,且以后线程未被中断,则获取该锁。boolean tryLock(long timeout, TimeUnit unit)
// 试图开释此锁。void unlock()
// 返回用来与此 Lock 实例一起应用的 Condition 实例。Condition newCondition()
3.1 Lock#lock
获取锁:
- 能够取得锁:如果没有其余线程取得锁,则立刻返回,设置持有锁的数量为 1(tryAcquire)。
- 曾经取得锁:如果以后线程曾经持有锁,则立刻返回,设置持有锁的数量加 1(tryAcquire)。
- 期待取得锁:如果其余线程持有锁,则以后线程会进入阻塞直到取得锁胜利(acquireQueued)。
java.util.concurrent.locks.ReentrantLock#lock
/**
* Acquires the lock.
*
* <p>Acquires the lock if it is not held by another thread and returns
* immediately, setting the lock hold count to one.
*
* <p>If the current thread already holds the lock then the hold
* count is incremented by one and the method returns immediately.
*
* <p>If the lock is held by another thread then the
* current thread becomes disabled for thread scheduling
* purposes and lies dormant until the lock has been acquired,
* at which time the lock hold count is set to one.
*/
public void lock() {sync.lock();
}
3.1.1 Sync#lock
在 ReentrantLock 中,Sync 的两个子类 FairSync、NonfairSync 各自实现了 Sync#lock 办法。
偏心锁
java.util.concurrent.locks.ReentrantLock.FairSync#lock
final void lock() {acquire(1);
}
非偏心锁
java.util.concurrent.locks.ReentrantLock.NonfairSync#lock
/**
* Performs lock. Try immediate barge, backing up to normal
* acquire on failure.
*/
final void lock() {if (compareAndSetState(0, 1))
setExclusiveOwnerThread(Thread.currentThread());
else
acquire(1);
}
偏心锁的 lock 办法间接调用 AQS#acquire(1);
非偏心锁首先通过 CAS 批改 state 值来获取锁,当获取失败时才会调用 AQS#acquire(1) 来获取锁。
3.1.2 AQS#acquire
AQS 中的 acquire 办法中,只有 tryAcquire 办法须要子类来实现。
java.util.concurrent.locks.AbstractQueuedSynchronizer#acquire
public final void acquire(int arg) {if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();}
3.1.3 tryAcquire
在 ReentrantLock 中,Sync 的两个子类 FairSync、NonfairSync 各自实现了 AQS#tryAcquire 办法。
偏心锁
java.util.concurrent.locks.ReentrantLock.FairSync#tryAcquire
/**
* Fair version of tryAcquire. Don't grant access unless
* recursive call or no waiters or is first.
*/
protected final boolean tryAcquire(int acquires) {final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {if (!hasQueuedPredecessors() && // 判断同步队列中是否有等待时间更长的节点
compareAndSetState(0, acquires)) { // 进入这里,阐明以后线程期待锁工夫最长,则 CAS 批改 state
setExclusiveOwnerThread(current); // 将以后线程设置为持有锁的线程
return true;
}
}
else if (current == getExclusiveOwnerThread()) { // 以后线程已持有锁
int nextc = c + acquires; // 重入次数
if (nextc < 0)
throw new Error("Maximum lock count exceeded"); // 可重入的最大次数 Integer.MAX_VALUE
setState(nextc);
return true;
}
return false;
}
代码流程:
- 如果锁是可用状态,且同步队列中没有比以后线程等待时间还长的节点,则尝试获取锁,胜利则设为锁的持有者。
- 如果以后线程已持有锁,则进行重入,设置已获取锁的次数,最大次数为 Integer.MAX_VALUE。
- 以上条件都不合乎,则无奈获取锁。
非偏心锁
java.util.concurrent.locks.ReentrantLock.NonfairSync#tryAcquire
protected final boolean tryAcquire(int acquires) {return nonfairTryAcquire(acquires);
}
java.util.concurrent.locks.ReentrantLock.Sync#nonfairTryAcquire
final boolean nonfairTryAcquire(int acquires) {final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {if (compareAndSetState(0, acquires)) { // 不论以后线程在同步队列中是否期待最久,都来 CAS 抢夺锁
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) { // 如果已取得锁,则重入。逻辑与偏心锁统一
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
代码流程:
- 如果锁是可用状态,(不论同步队列中有没有节点在阻塞期待中)间接尝试获取锁,胜利则设为锁的持有者。
- 如果以后线程已持有锁,则进行重入,设置已获取锁的次数,最大次数为 Integer.MAX_VALUE。
- 以上条件都不合乎,则无奈获取锁。
能够看到,偏心锁比非偏心锁多执行了 hasQueuedPredecessors 办法,用于判断同步队列中是否具备比以后线程期待锁工夫还长的节点。
在 AQS 中的同步队列中,头节点是一个 dummy node,因而等待时间最长的节点是头节点的下一个节点,若该节点存在且不是以后线程,则 hasQueuedPredecessors 返回 true。阐明以后节点不是同步队列中等待时间最长的节点,无奈获取锁。
java.util.concurrent.locks.AbstractQueuedSynchronizer#hasQueuedPredecessors
public final boolean hasQueuedPredecessors() {
// The correctness of this depends on head being initialized
// before tail and on head.next being accurate if the current
// thread is first in queue.
Node t = tail; // Read fields in reverse initialization order
Node h = head;
Node s;
return h != t &&
// 头节点的下一个节点,不是以后线程的节点,阐明以后线程期待锁工夫不是最长的
((s = h.next) == null || s.thread != Thread.currentThread());
}
3.2 Lock#lockInterruptibly
如果以后线程未被中断,则获取锁。
java.util.concurrent.locks.ReentrantLock#lockInterruptibly
public void lockInterruptibly() throws InterruptedException {sync.acquireInterruptibly(1);
}
关注获取锁过程中,对异样的解决。
- 获取锁之前,以后线程曾经中断,则间接抛出中断异样。
- 在同步队列中期待锁的过程中,如果被中断唤醒,则放弃期待锁,间接抛出异样。
java.util.concurrent.locks.AbstractQueuedSynchronizer#acquireInterruptibly
public final void acquireInterruptibly(int arg)
throws InterruptedException {if (Thread.interrupted())
// 获取锁之前,以后线程曾经中断,则间接抛出中断异样
throw new InterruptedException();
if (!tryAcquire(arg))
doAcquireInterruptibly(arg);
}
java.util.concurrent.locks.AbstractQueuedSynchronizer#doAcquireInterruptibly
/**
* Acquires in exclusive interruptible mode.
* @param arg the acquire argument
*/
private void doAcquireInterruptibly(int arg)
throws InterruptedException {final Node node = addWaiter(Node.EXCLUSIVE);
boolean failed = true;
try {for (;;) {final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {setHead(node);
p.next = null; // help GC
failed = false;
return;
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
// 线程在阻塞期待锁的过程中,被中断唤醒,则放弃期待锁,间接抛出异样
throw new InterruptedException();}
} finally {if (failed)
cancelAcquire(node);
}
}
3.3 Lock#tryLock
仅在调用时锁未被另一个线程放弃的状况下,才获取该锁。
与非偏心锁的 NonfairSync#tryAcquire 办法一样,都是调用了 Sync#nonfairTryAcquire 办法。
- ReentrantLock#lock -> NonfairSync#lock -> AQS#acquire -> NonfairSync#tryAcquire -> Sync#nonfairTryAcquire
- ReentrantLock#tryLock -> Sync#nonfairTryAcquire
留神,Sync#nonfairTryAcquire 办法不存在任何和队列相干的操作。
在锁没有被其余线程持有的状况下,不论应用的是偏心锁还是非偏心锁,多个线程都能够调用该办法来 CAS 抢夺锁,抢夺失败了不必进入同步队列,间接返回。
应用偏心锁的状况下,在上一个持有锁的线程开释锁(锁状态 state == 0)之后,同步队列中的线程被唤醒但尚未获取锁之前,此时以后线程执行 tryLock 可能会胜利取得锁,这实际上是对偏心锁的公平性的一种毁坏。
在某些状况下,突破公平性的行为可能是有用的。如果心愿恪守此锁的偏心设置,则应用 tryLock(0, TimeUnit.SECONDS)
,它简直是等效的(也检测中断)。
java.util.concurrent.locks.ReentrantLock#tryLock()
public boolean tryLock() {return sync.nonfairTryAcquire(1);
}
java.util.concurrent.locks.ReentrantLock.Sync#nonfairTryAcquire
final boolean nonfairTryAcquire(int acquires) {final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {if (compareAndSetState(0, acquires)) { // 不论同步队列中是否具备期待很久的节点,以后线程间接 CAS 抢夺锁
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) { // 如果已取得锁,则重入。逻辑与偏心锁统一
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
3.4 Lock#tryLock(time, unit)
如果锁在给定等待时间内没有被另一个线程放弃,且以后线程未被中断,则获取该锁。
java.util.concurrent.locks.ReentrantLock#tryLock(long, java.util.concurrent.TimeUnit)
public boolean tryLock(long timeout, TimeUnit unit)
throws InterruptedException {return sync.tryAcquireNanos(1, unit.toNanos(timeout));
}
在 JDK 中,AQS 中的 tryAcquireNanos 办法只会被 ReentrantLock#tryLock(time, unit) 和 ReentrantReadWriteLock.WriteLock#tryLock(time, unit) 这两个办法调用到。
阐明只有独占模式能力调用。
代码流程:
- 获取锁之前,如果以后线程已中断,则抛出中断异样。
- tryAcquire:以后线程尝试获取锁,获取胜利间接返回,否则进入下一个步。
- doAcquireNanos:以后线程进入同步队列中进行期待,直到取得锁、产生中断、期待超时。
java.util.concurrent.locks.AbstractQueuedSynchronizer#tryAcquireNanos
public final boolean tryAcquireNanos(int arg, long nanosTimeout)
throws InterruptedException {if (Thread.interrupted())
throw new InterruptedException();
return tryAcquire(arg) ||
doAcquireNanos(arg, nanosTimeout);
}
java.util.concurrent.locks.AbstractQueuedSynchronizer#doAcquireNanos
/**
* Acquires in exclusive timed mode.
*
* @param arg the acquire argument
* @param nanosTimeout max wait time
* @return {@code true} if acquired
*/
private boolean doAcquireNanos(int arg, long nanosTimeout)
throws InterruptedException {if (nanosTimeout <= 0L)
return false;
final long deadline = System.nanoTime() + nanosTimeout;
final Node node = addWaiter(Node.EXCLUSIVE);
boolean failed = true;
try {for (;;) {final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {setHead(node);
p.next = null; // help GC
failed = false;
return true;
}
nanosTimeout = deadline - System.nanoTime();
if (nanosTimeout <= 0L) // 直到超时都没有取得锁,则返回 false
return false;
if (shouldParkAfterFailedAcquire(p, node) &&
nanosTimeout > spinForTimeoutThreshold)
LockSupport.parkNanos(this, nanosTimeout); // 能够进入阻塞的状况下,剩余时间大于阈值,则阻塞,否则自旋
if (Thread.interrupted())
throw new InterruptedException(); // 线程在阻塞期待锁的过程中,被中断唤醒,则放弃期待锁,间接抛出异样}
} finally {if (failed)
cancelAcquire(node);
}
}
3.5 Lock#unlock
尝试开释锁。
java.util.concurrent.locks.ReentrantLock#unlock
/**
* Attempts to release this lock.
*
* <p>If the current thread is the holder of this lock then the hold
* count is decremented. If the hold count is now zero then the lock
* is released. If the current thread is not the holder of this
* lock then {@link IllegalMonitorStateException} is thrown.
*
* @throws IllegalMonitorStateException if the current thread does not
* hold this lock
*/
public void unlock() {sync.release(1);
}
3.5.1 AQS#release
尝试开释锁,若开释胜利,且同步队列中具备期待中的节点,则尝试唤醒后继节点。
java.util.concurrent.locks.AbstractQueuedSynchronizer#release
public final boolean release(int arg) {if (tryRelease(arg)) { // 开释锁资源
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h); // 唤醒 head 的后继节点
return true;
}
return false;
}
3.5.2 Sync#tryRelease
在 ReentrantLock 中,不论是偏心锁还是非偏心锁,均应用雷同的 Sync#tryRelease 办法。
- 如果以后线程不是持有锁的线程,抛出 IllegalMonitorStateException。
- 因为锁是可重入的,必须把持有的锁全副开释(计数归零)才表明以后线程不再持有锁。
java.util.concurrent.locks.ReentrantLock.Sync#tryRelease
protected final boolean tryRelease(int releases) {int c = getState() - releases; // 计算开释之后的 state
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
boolean free = false;
if (c == 0) {
free = true; // 锁已全副开释(获取锁的次数必须等于开释次数),返回 true,表明能够唤醒下一个期待线程
setExclusiveOwnerThread(null); // 设置独占锁持有线程为 null
}
setState(c);
return free;
}
3.4 Lock#newCondition
创立 AQS 中的 ConditionObject 对象。是能够与 Lock 一起应用的 Condition 接口实例。
Condition 实例反对与 Object 的监视器办法(wait、notify 和 notifyAll)雷同的用法(await、signal 和 signalAll)。
- 在调用 Condition#await 或 Condition#signal 办法时,如果没有持有锁,则将抛出 IllegalMonitorStateException。
- 在调用 Condition#await 办法时,将开释锁并进入阻塞,当被唤醒时从新获取锁,并复原调用 Condition#await 时锁的持有计数。
- 如果线程在期待时被中断,则期待将终止,待从新获取锁胜利之后,再响应中断(抛异样或从新中断)。
- 期待线程按 FIFO 程序收到信号。
- 期待办法返回的线程从新获取锁的程序,与线程最后获取锁的程序雷同(对于非偏心锁,是按获取锁的程序;对于偏心锁,等同于按期待锁的工夫排序)。
java.util.concurrent.locks.ReentrantLock.Sync#newCondition
final ConditionObject newCondition() {return new ConditionObject();
}
java.util.concurrent.locks.AbstractQueuedSynchronizer.ConditionObject#ConditionObject
public ConditionObject() {}
相干浏览:
浏览 JDK 源码:AQS 中的独占模式
浏览 JDK 源码:AQS 中的共享模式
浏览 JDK 源码:AQS 对 Condition 的实现
作者:Sumkor
链接:https://segmentfault.com/a/11…