在 JUC 包中,共享锁包含 CountDownLatch、CyclicBarrier、Semaphore、ReentrantReadWriteLock、JDK1.8 新增的 StampedLock 等。
ReentrantReadWriteLock 中定义了两个锁:共享锁 readLock 和独占锁 writeLock。
共享锁 readLock 用于读操作,能同时被多个线程获取;独占锁 writeLock 用于写入操作,只能被一个线程持有。
读锁、写锁均具备偏心模式、非偏心模式两种获取锁的形式。
因为 ReentrantReadWriteLock 是基于 AQS(AbstractQueuedSynchronizer)框架实现的锁工具,在浏览 ReentrantReadWriteLock 源码之前,须要先理解 AQS 中的独占模式、共享模式、Condition 机制的实现原理,相干内容可翻阅我的前几篇文章,文末已给出链接。
本文基于 jdk1.8.0_91
@[toc]
1. 继承体系
ReentrantReadWriteLock 具备三个重要的外部类:ReadLock、WriteLock、Sync
- ReentrantReadWriteLock 自身实现了 ReadWriteLock 接口。
- ReadLock、WriteLock 实现了 Lock 接口。
- Sync 继承了 AQS 抽象类,具备 FairSync 和 NonfairSync 两个子类。
1.1 ReadWriteLock
读写锁是一种非凡的锁,它把对共享资源的拜访分为读拜访和写访问,多个线程能够同时对共享资源进行读拜访,然而同一时间只能有一个线程对共享资源进行写访问,应用读写锁能够极大地提高吞吐量。
读与写之间是否互斥:
读 | 写 | |
---|---|---|
读 | 否 | 是 |
写 | 是 | 是 |
能够看到,读写锁除了读读不互斥,读写、写读、写写都是互斥的。
罕用业务场景:读多写少,比方服务缓存等。
public interface ReadWriteLock {
/**
* Returns the lock used for reading.
*
* @return the lock used for reading
*/
Lock readLock();
/**
* Returns the lock used for writing.
*
* @return the lock used for writing
*/
Lock writeLock();}
ReentrantReadWriteLock 是接口 ReadWriteLock 的实现。
public class ReentrantReadWriteLock implements ReadWriteLock, java.io.Serializable {
private static final long serialVersionUID = -6992448646407690164L;
/** Inner class providing readlock */
private final ReentrantReadWriteLock.ReadLock readerLock;
/** Inner class providing writelock */
private final ReentrantReadWriteLock.WriteLock writerLock;
public ReentrantReadWriteLock.WriteLock writeLock() { return writerLock;}
public ReentrantReadWriteLock.ReadLock readLock() { return readerLock;}
}
1.2 Lock
ReadLock、WriteLock 均实现了接口 Lock。
/**
* The lock returned by method {@link ReentrantReadWriteLock#readLock}.
*/
public static class ReadLock implements Lock, java.io.Serializable {...}
/**
* The lock returned by method {@link ReentrantReadWriteLock#writeLock}.
*/
public static class WriteLock implements Lock, java.io.Serializable {...}
1.3 AQS
ReentrantReadWriteLock 具备外部类 Sync。跟 ReentrantLock 一样,Sync 继承自 AQS。
ReentrantReadWriteLock 中的 ReadLock、WriteLock 对 Lock 的实现都是通过 Sync 来做到的。
Lock 接口 ReadLock 实现 WriteLock 实现
lock() sync.acquireShared(1) sync.acquire(1)
lockInterruptibly() sync.acquireSharedInterruptibly(1) sync.acquireInterruptibly(1)
tryLock() sync.tryReadLock() sync.tryWriteLock()
tryLock(long time, TimeUnit unit) sync.tryAcquireSharedNanos(1, unit.toNanos(timeout)) sync.tryAcquireNanos(1, unit.toNanos(timeout))
unlock() sync.releaseShared(1) sync.release(1)
newCondition() UnsupportedOperationException sync.newCondition()
能够看到,ReadLock 是共享锁,WriteLock 是独占锁。ReadLock 不反对应用 Condition。
AQS 中提供了一系列的模板办法,在 Sync 中均失去实现。
java.util.concurrent.locks.AbstractQueuedSynchronizer
// 独占获取(资源数)protected boolean tryAcquire(int arg) {throw new UnsupportedOperationException();
}
// 独占开释(资源数)protected boolean tryRelease(int arg) {throw new UnsupportedOperationException();
}
// 共享获取(资源数)protected int tryAcquireShared(int arg) {throw new UnsupportedOperationException();
}
// 共享获取(资源数)protected boolean tryReleaseShared(int arg) {throw new UnsupportedOperationException();
}
// 是否排它状态
protected boolean isHeldExclusively() {throw new UnsupportedOperationException();
}
1.4 Sync
Sync 实现了 AQS 中全副的模板办法,因而能够同时反对独占锁、共享锁两种实现。
此外,还提供了两个形象办法,用于制订公平性策略,强制子类实现。
- readerShouldBlock:以后线程申请【读锁】是否须要阻塞,返回 true 阐明须要期待,否则能够立刻尝试获取。
- writerShouldBlock:以后线程申请【写锁】是否须要阻塞,返回 true 阐明须要期待,否则能够立刻尝试获取。
abstract static class Sync extends AbstractQueuedSynchronizer {
/**
* Returns true if the current thread, when trying to acquire
* the read lock, and otherwise eligible to do so, should block
* because of policy for overtaking other waiting threads.
*/
abstract boolean readerShouldBlock();
/**
* Returns true if the current thread, when trying to acquire
* the write lock, and otherwise eligible to do so, should block
* because of policy for overtaking other waiting threads.
*/
abstract boolean writerShouldBlock();}
1.4 FailSync/NonfairSync
Sync 具备两个子类 FailSync 和 NonfairSync。对应的是 ReentrantReadWriteLock 的两种实现:偏心锁(fair lock)、非偏心锁(non-fair lock)。
这两个子类只须要实现 Sync 中的 readerShouldBlock、writerShouldBlock 办法。
偏心锁和非偏心锁的区别,次要体现在获取读写锁的机制不同:
- 偏心模式:同步队列中存在比以后线程期待【读、写】锁的工夫还长的节点,则以后线程获取【读、写】锁时,均须要阻塞。
- 非偏心模式:以后线程能够立刻获取【写】锁;以后线程获取【读】锁时,如果同步队列中存在期待【写】锁工夫很长的节点,则以后线程须要阻塞(重入读除外)。
如果以后线程无奈获取锁,会抛出异样,或进入同步队列进行排队期待。
1.4.1 偏心锁
偏心模式:不论获取读锁还是写锁,都要严格依照先后顺序排队获取。
java.util.concurrent.locks.ReentrantReadWriteLock.FairSync
/**
* Fair version of Sync
*/
static final class FairSync extends Sync {
private static final long serialVersionUID = -2274990926593161451L;
final boolean writerShouldBlock() {return hasQueuedPredecessors(); // 如果同步队列中等待时间最长的节点不是以后线程,返回 true;否则返回 false
}
final boolean readerShouldBlock() {return hasQueuedPredecessors();
}
}
java.util.concurrent.locks.AbstractQueuedSynchronizer#hasQueuedPredecessors
public final boolean hasQueuedPredecessors() {
// The correctness of this depends on head being initialized
// before tail and on head.next being accurate if the current
// thread is first in queue.
Node t = tail; // Read fields in reverse initialization order
Node h = head;
Node s;
return h != t &&
// 头节点的下一个节点,不是以后线程的节点,阐明以后线程期待锁工夫不是最长的
((s = h.next) == null || s.thread != Thread.currentThread());
}
1.4.2 非偏心锁
非偏心模式:获取写锁能够立刻获取,无需排队;获取读锁之前,若判断等待时间最长的是写线程,则读线程需退让(重入读除外),进入阻塞。
java.util.concurrent.locks.ReentrantReadWriteLock.NonfairSync
/**
* Nonfair version of Sync
*/
static final class NonfairSync extends Sync {
private static final long serialVersionUID = -8159625535654395037L;
final boolean writerShouldBlock() {return false; // writers can always barge // 获取写锁无需阻塞}
final boolean readerShouldBlock() {
/* As a heuristic to avoid indefinite writer starvation,
* block if the thread that momentarily appears to be head
* of queue, if one exists, is a waiting writer. This is
* only a probabilistic effect since a new reader will not
* block if there is a waiting writer behind other enabled
* readers that have not yet drained from the queue.
*/
// 为了防止写饥饿,如果同步队列中等待时间最长的节点是互斥节点,则获取读锁须要阻塞,返回 true。return apparentlyFirstQueuedIsExclusive();}
}
java.util.concurrent.locks.AbstractQueuedSynchronizer#apparentlyFirstQueuedIsExclusive
final boolean apparentlyFirstQueuedIsExclusive() {
Node h, s;
return (h = head) != null && // 头节点 h 不为空
(s = h.next) != null && // 存在期待中的节点 s
!s.isShared() && // 节点 s 不是共享模式,即互斥
s.thread != null; // 节点 s 不是有效节点
}
2. 构造方法
结构 ReentrantReadWriteLock 的时候,会根据偏心或非偏心模式实例化 Sync,再应用 Sync 来结构 ReadLock、WriteLock 实例。
java.util.concurrent.locks.ReentrantReadWriteLock
/** Performs all synchronization mechanics */
final Sync sync;
/**
* Creates a new {@code ReentrantReadWriteLock} with
* default (nonfair) ordering properties.
*/
public ReentrantReadWriteLock() {this(false);
}
/**
* Creates a new {@code ReentrantReadWriteLock} with
* the given fairness policy.
*
* @param fair {@code true} if this lock should use a fair ordering policy
*/
public ReentrantReadWriteLock(boolean fair) {sync = fair ? new FairSync() : new NonfairSync();
readerLock = new ReadLock(this);
writerLock = new WriteLock(this);
}
默认应用非偏心锁。
将 Sync 实例传递给 ReadLock、WriteLock 实例。
java.util.concurrent.locks.ReentrantReadWriteLock.ReadLock
public static class ReadLock implements Lock, java.io.Serializable {
private final Sync sync;
protected ReadLock(ReentrantReadWriteLock lock) {sync = lock.sync;}
}
java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock
public static class WriteLock implements Lock, java.io.Serializable {
private final Sync sync;
protected WriteLock(ReentrantReadWriteLock lock) {sync = lock.sync;}
}
3. 资源 state
3.1 资源定义
应用 AQS 的属性 state 来示意资源 / 锁。
/**
* The synchronization state.
*/
private volatile int state;
ReentrantLock 应用 state 示意以后共享资源是否被其余线程锁占用。如果为 0 则示意未被占用,其余值示意该锁被重入的次数。
ReentrantReadWriteLock 应用 state 的高 16 位示意读状态,也就是获取到读锁的次数;应用低 16 位示意写状态,也就是获取到写锁的次数。
/*
* Read vs write count extraction constants and functions.
* Lock state is logically divided into two unsigned shorts:
* The lower one representing the exclusive (writer) lock hold count,
* and the upper the shared (reader) hold count.
*/
static final int SHARED_SHIFT = 16;
static final int SHARED_UNIT = (1 << SHARED_SHIFT);
static final int MAX_COUNT = (1 << SHARED_SHIFT) - 1; // 十进制为:65535
static final int EXCLUSIVE_MASK = (1 << SHARED_SHIFT) - 1; // 二进制为:1111 1111 1111 1111
/** Returns the number of shared holds represented in count */
static int sharedCount(int c) {return c >>> SHARED_SHIFT;} // 共享锁(读)的数量,取高 16 位
/** Returns the number of exclusive holds represented in count */
static int exclusiveCount(int c) {return c & EXCLUSIVE_MASK;} // 互斥锁(写)的数量,取低 16 位
对于指定的 state 值,应用 sharedCount()
计算失去共享锁(读锁)的数量,应用 exclusiveCount()
计算失去互斥锁(写锁)的数量。
可知,ReentrantReadWriteLock 最多反对 65535 个递归写入锁和 65535 个读取锁。
3.2 资源计数
Sync 中定义了两个外部类:
- HoldCounter 用于记录单个线程 id、该线程持有锁的数量。
- ThreadLocalHoldCounter 用于记录所有线程 id、各自持有锁的数量。
/**
* A counter for per-thread read hold counts.
* Maintained as a ThreadLocal; cached in cachedHoldCounter
*/
static final class HoldCounter { // 用来保留线程 id、该线程持有共享锁的数量
int count = 0;
// Use id, not reference, to avoid garbage retention
// 记录线程 id,不记录线程援用,避免无奈 GC
final long tid = getThreadId(Thread.currentThread());
}
/**
* ThreadLocal subclass. Easiest to explicitly define for sake
* of deserialization mechanics.
*/
static final class ThreadLocalHoldCounter
extends ThreadLocal<HoldCounter> {public HoldCounter initialValue() {return new HoldCounter();
}
}
/**
* The number of reentrant read locks held by current thread.
* Initialized only in constructor and readObject.
* Removed whenever a thread's read hold count drops to 0.
*/
// 存储所有线程的 id,以及各自持有锁的数量
private transient ThreadLocalHoldCounter readHolds;
// 缓存,存储最初一个获取锁的 HoldCounter:线程 id,该线程持有锁的数量
private transient HoldCounter cachedHoldCounter;
// 缓存,存储第一个获取锁的线程
private transient Thread firstReader = null;
// 缓存,记录第一个获取锁的线程持有锁的数量
private transient int firstReaderHoldCount;
Sync() {readHolds = new ThreadLocalHoldCounter(); // 创立 ThreadLocalHoldCounter 实例
setState(getState()); // ensures visibility of readHolds
}
4. 读锁
办法摘要:
// 获取读取锁。void lock()
// 获取读取锁,除非以后线程被中断。void lockInterruptibly()
// 因为 ReadLocks 不反对条件,所以将抛出 UnsupportedOperationException。Condition newCondition()
// 返回标识此锁及其锁状态的字符串。String toString()
// 仅当写入锁在调用期间未被另一个线程放弃时获取读取锁。boolean tryLock()
// 如果另一个线程在给定的等待时间内没有放弃写入锁,并且以后线程未被中断,则获取读取锁。boolean tryLock(long timeout, TimeUnit unit)
// 试图开释此锁。void unlock()
4.1 ReadLock#lock
获取读锁。
- 如果没有其余线程持有写锁,则以后线程获取读取锁并立刻返回。
- 如果其余线程持有写锁,则以后线程进入阻塞,直到能够获取读锁。
即,跟写锁互斥。
java.util.concurrent.locks.ReentrantReadWriteLock.ReadLock#lock
/**
* Acquires the read lock.
*
* <p>Acquires the read lock if the write lock is not held by
* another thread and returns immediately.
*
* <p>If the write lock is held by another thread then
* the current thread becomes disabled for thread scheduling
* purposes and lies dormant until the read lock has been acquired.
*/
public void lock() {sync.acquireShared(1);
}
4.1.1 AQS#acquireShared
共享模式下获取锁 / 资源,忽视中断。
代码流程:
- 获取共享锁 / 资源,获取失败则进入下一步
- 进入同步队列中期待获取锁 / 资源
java.util.concurrent.locks.AbstractQueuedSynchronizer#acquireShared
public final void acquireShared(int arg) {if (tryAcquireShared(arg) < 0)
doAcquireShared(arg);
}
其中,AQS#tryAcquireShared 返回值:
- 正数:获取资源失败,筹备进入同步队列;
- 0:获取资源胜利,但没有残余可用资源;
- 负数:获取资源胜利,能够唤醒下一个期待线程;
在 ReentrantReadWriteLock.ReadLock 中,tryAcquireShared 只会返回 1 和 -1.
当读锁获取失败时(其余线程正在写,或者不合乎公平性策略)返回 -1,此时才会执行 doAcquireShared 进入排队期待。
留神,当读锁的计数达到最大值,后续再获取读锁会抛出异样 throw new Error("Maximum lock count exceeded")
,而不是排队期待。
4.1.2 Sync#tryAcquireShared
代码流程:
- 如果其余线程持有写锁,则获取读锁失败。
- 否则(没有其余线程获取写锁),如果依照公平性策略能够获取读锁,则 CAS 尝试获取,并记录线程 id 和获取锁的次数。
- 上一步获取锁失败,进入 fullTryAcquireShared 在自旋中重试 CAS 获取锁。
偏心 / 非偏心规定:
- 偏心的 readerShouldBlock:同步队列中等待时间最长的节点不是以后线程,则以后线程获取读锁须要阻塞。
- 非偏心的 readerShouldBlock:同步队列中等待时间最长的节点是互斥节点,则以后线程获取读锁须要阻塞。
获取锁胜利后,须要记录以后线程持有数的数量:
- 以后是第一个持有锁的线程,记为 firstReader,且 firstReaderHoldCount 记为 1。
- 以后线程是 firstReader,则 firstReaderHoldCount 加 1。
- 以后线程不是 firstReader,应用 cachedHoldCounter 或 ThreadLocalHoldCounter 来记录线程 id 和获取锁的次数
java.util.concurrent.locks.ReentrantReadWriteLock.Sync#tryAcquireShared
protected final int tryAcquireShared(int unused) {
/*
* Walkthrough:
* 1. If write lock held by another thread, fail.
* 2. Otherwise, this thread is eligible for
* lock wrt state, so ask if it should block
* because of queue policy. If not, try
* to grant by CASing state and updating count.
* Note that step does not check for reentrant
* acquires, which is postponed to full version
* to avoid having to check hold count in
* the more typical non-reentrant case.
* 3. If step 2 fails either because thread
* apparently not eligible or CAS fails or count
* saturated, chain to version with full retry loop.
*/
Thread current = Thread.currentThread();
int c = getState();
if (exclusiveCount(c) != 0 &&
getExclusiveOwnerThread() != current)
// 如果其余线程持有写锁,则以后线程进入阻塞(写读互斥)// 如果以后线程持有写锁,则容许再次获取读锁(反对锁降级)return -1;
int r = sharedCount(c); // 读锁被获取的次数
if (!readerShouldBlock() && // 公平性策略校验,若不须要阻塞则进入下一步
r < MAX_COUNT && // 读锁数量没有超限度
compareAndSetState(c, c + SHARED_UNIT)) { // CAS 获取读锁(高 16 位加 1)if (r == 0) { // 校验以后线程是否是 firstReader,并累计 firstReaderHoldCount
firstReader = current;
firstReaderHoldCount = 1;
} else if (firstReader == current) {firstReaderHoldCount++;} else { // 进入这里,阐明以后不是首个获取锁的线程,应用 HoldCounter 记录以后线程 id 和持有锁的数量
HoldCounter rh = cachedHoldCounter; // 先查缓存 cachedHoldCounter
if (rh == null || rh.tid != getThreadId(current))
cachedHoldCounter = rh = readHolds.get(); // 缓存没有命中,从 ThreadLocal 中获取,并更新缓存
else if (rh.count == 0)
readHolds.set(rh);
rh.count++;
}
return 1; // 获取锁胜利,持有读锁数量 1
}
return fullTryAcquireShared(current); // 一次获取读锁失败后,尝试循环获取
}
4.1.3 Sync#fullTryAcquireShared
在 tryAcquireShared 中经行了一次疾速获取读锁,然而 CAS 只能容许一个线程获取锁胜利,而读锁是共享的,能够同时容许多个线程获取。因而须要调用 fullTryAcquireShared 执行完整版的获取锁的逻辑。
关注不同的中央:
- tryAcquireShared 只 CAS 获取锁一次,失败了则调用 fullTryAcquireShared,后者会在自旋中一直重试 CAS 获取锁。
- tryAcquireShared 得悉获取读锁须要阻塞,不会尝试 CAS;而 fullTryAcquireShared 得悉获取读锁须要阻塞,会进一步判断以后是否是重入的,非重入时才会让出读锁。
也就是说,重入读的状况下,如果锁是可获取状态,不会让出锁给写线程。
java.util.concurrent.locks.ReentrantReadWriteLock.Sync#fullTryAcquireShared
/**
* Full version of acquire for reads, that handles CAS misses
* and reentrant reads not dealt with in tryAcquireShared.
*/
final int fullTryAcquireShared(Thread current) {
/*
* This code is in part redundant with that in
* tryAcquireShared but is simpler overall by not
* complicating tryAcquireShared with interactions between
* retries and lazily reading hold counts.
*/
HoldCounter rh = null;
for (;;) {int c = getState();
if (exclusiveCount(c) != 0) {
/**
* 如果是其余线程获取了写锁,那么把以后线程阻塞;* 如果是以后线程获取了写锁,不阻塞,否则会造成死锁。* 从这里能够看到 ReentrantReadWriteLock 容许锁降级。*/
if (getExclusiveOwnerThread() != current)
return -1;
// else we hold the exclusive lock; blocking here
// would cause deadlock.
} else if (readerShouldBlock()) {
/**
* 非偏心模式下,进入这里阐明,同步队列的头结点的后继有一个竞争写锁的线程。* 所以这里有一个锁退让的操作,即让写锁先获取。* 1. 如果满足 firstReader == current 或者 rh.count > 0 阐明是重入的读。* 不须要退让给写线程,否则会导致死锁。* 2. 如果 rh.count == 0 就阐明,这个线程是第一次获取读锁。* 为了避免写饥饿,间接将以后线程放入同步队列排队期待。*/
// Make sure we're not acquiring read lock reentrantly
if (firstReader == current) { // 阐明不是第一次读(而是重入的读),不须要退让给写线程
// assert firstReaderHoldCount > 0;
} else {if (rh == null) {
rh = cachedHoldCounter;
if (rh == null || rh.tid != getThreadId(current)) {rh = readHolds.get();
if (rh.count == 0)
readHolds.remove(); // 以后线程不持有读锁,移除计数}
}
if (rh.count == 0) // 阐明是第一次读,须要退让给写线程
return -1; // 返回 -1,后续进入同步队列中排队期待锁
}
}
if (sharedCount(c) == MAX_COUNT)
throw new Error("Maximum lock count exceeded");
if (compareAndSetState(c, c + SHARED_UNIT)) { // CAS 获取读锁(高 16 位加 1)if (sharedCount(c) == 0) {
firstReader = current;
firstReaderHoldCount = 1;
} else if (firstReader == current) {firstReaderHoldCount++;} else {if (rh == null)
rh = cachedHoldCounter;
if (rh == null || rh.tid != getThreadId(current))
rh = readHolds.get();
else if (rh.count == 0)
readHolds.set(rh);
rh.count++;
cachedHoldCounter = rh; // cache for release
}
return 1;
}
}
}
以后线程如果持有写锁,容许获取读锁,即 反对锁降级。
为什么 要反对锁降级呢?次要是为了防止出现死锁。
假如不反对锁降级,当持有写锁的线程须要获取读锁时,只能进入阻塞期待锁变为可读。
然而只有当没有线程持有写锁时,读锁才可能被获取(写读互斥),导致读锁始终无奈获取,此时呈现死锁。
为什么 重入读锁时,不须要让出锁呢?同样是为了防止出现死锁。
假如重入读锁须要让给期待写锁的线程,则重入读的线程进入阻塞。
然而只有当没有线程持有读锁时,写锁才可能被获取(读写互斥),导致写锁始终无奈被获取,此时呈现死锁。
4.2 ReadLock#tryLock
仅当写锁在调用期间未被另一个线程放弃时,获取读锁。
java.util.concurrent.locks.ReentrantReadWriteLock.ReadLock#tryLock()
/**
* Acquires the read lock only if the write lock is not held by
* another thread at the time of invocation.
*/
public boolean tryLock() {return sync.tryReadLock();
}
代码流程:
- 如果其余线程持有写锁,则获取读锁失败。
- 如果读锁的获取数量超限度,则返回失败。
- 采纳 CAS 获取锁,获取胜利则计数;获取失败则返回,不会进入同步队列。
跟 tryAcquireShared 最大的区别没有调用 readerShouldBlock,因而可能会毁坏偏心规定,或造成写饥饿。
留神:
即便已将此锁设置为应用偏心排序策略,然而调用 tryLock() 仍将立刻获取读锁(如果有可用的),不论其余线程以后是否正在期待该读锁。在某些状况下,此“闯入”行为可能很有用,即便它会突破公平性也如此。如果心愿恪守此锁的偏心设置,则应用 tryLock(0, TimeUnit.SECONDS)
,它简直是等效的(它也检测中断)。
java.util.concurrent.locks.ReentrantReadWriteLock.Sync#tryReadLock
/**
* Performs tryLock for read, enabling barging in both modes.
* This is identical in effect to tryAcquireShared except for
* lack of calls to readerShouldBlock.
*/
final boolean tryReadLock() {Thread current = Thread.currentThread();
for (;;) {int c = getState();
if (exclusiveCount(c) != 0 &&
getExclusiveOwnerThread() != current) // 其余线程持有写锁
return false;
int r = sharedCount(c);
if (r == MAX_COUNT)
throw new Error("Maximum lock count exceeded");
if (compareAndSetState(c, c + SHARED_UNIT)) { // 自旋中 CAS 获取读锁(高 16 位加 1)if (r == 0) {
firstReader = current;
firstReaderHoldCount = 1;
} else if (firstReader == current) {firstReaderHoldCount++;} else {
HoldCounter rh = cachedHoldCounter;
if (rh == null || rh.tid != getThreadId(current))
cachedHoldCounter = rh = readHolds.get();
else if (rh.count == 0)
readHolds.set(rh);
rh.count++;
}
return true;
}
}
}
4.3 ReadLock#unLock
开释读锁。
java.util.concurrent.locks.ReentrantReadWriteLock.ReadLock#unlock
/**
* Attempts to release this lock.
*
* <p>If the number of readers is now zero then the lock
* is made available for write lock attempts.
*/
public void unlock() {sync.releaseShared(1);
}
4.3.1 AQS#releaseShared
共享模式下开释锁 / 资源。
- 开释共享锁 / 资源,若开释胜利,则进入下一步。
- 唤醒队列中的期待节点
public final boolean releaseShared(int arg) {if (tryReleaseShared(arg)) {doReleaseShared();
return true;
}
return false;
}
在 ReentrantReadWriteLock 中,只有当全副读锁都开释之后,state == 0 时才会执行 doReleaseShared,唤醒同步队列中期待着的写线程。
4.3.2 Sync#tryReleaseShared
代码流程:
- 对 HoldCounter 中以后线程持有锁的计数进行自减。
- 自旋 CAS 更新 state 进行开释锁。
- 若 state == 0,阐明以后没有线程持有锁,对于期待写入的线程来说,能够发动锁申请。
能够看到,开释读锁不会与其余行为互斥。
java.util.concurrent.locks.ReentrantReadWriteLock.Sync#tryReleaseShared
protected final boolean tryReleaseShared(int unused) {Thread current = Thread.currentThread();
if (firstReader == current) { // 对以后线程持有锁的数量进行自减
// assert firstReaderHoldCount > 0;
if (firstReaderHoldCount == 1)
firstReader = null;
else
firstReaderHoldCount--;
} else {
HoldCounter rh = cachedHoldCounter;
if (rh == null || rh.tid != getThreadId(current))
rh = readHolds.get();
int count = rh.count;
if (count <= 1) {readHolds.remove();
if (count <= 0)
throw unmatchedUnlockException(); // 以后线程没有持有锁,报错}
--rh.count;
}
for (;;) { // 自旋 CAS 更新 state(确保多个线程可能并发开释锁)int c = getState();
int nextc = c - SHARED_UNIT;
if (compareAndSetState(c, nextc))
// Releasing the read lock has no effect on readers,
// but it may allow waiting writers to proceed if
// both read and write locks are now free.
return nextc == 0; // 返回 true,阐明以后没有线程持有锁,对于期待写入的线程来说,能够发动锁申请
}
}
4.4 ReadLock#newCondition
因为 AQS 中的 Condition 实现不反对共享锁,因而 ReentrantReadWriteLock 中的读锁不反对应用 Condition。
为什么 要设计成共享锁不反对应用 Condition 呢?
- 读锁和写锁尽管都是应用 AQS 的 state 属性,然而持有读锁和持有写锁是独立的。
- 假如读锁反对 Condition,以后线程获取读锁后,在 Condition 条件上期待时,因为读写互斥,可能解除该阻塞的其余线程无奈获取写锁,导致以后线程无奈被唤醒。
Java 官网的解释:
Read locks are held independently of write locks, so are not checked or affected. However it is essentially always an error to invoke a condition waiting method when the current thread has also acquired read locks, since other threads that could unblock it will not be able to acquire the write lock.
java.util.concurrent.locks.ReentrantReadWriteLock.ReadLock#newCondition
/**
* Throws {@code UnsupportedOperationException} because
* {@code ReadLocks} do not support conditions.
*
* @throws UnsupportedOperationException always
*/
public Condition newCondition() {throw new UnsupportedOperationException();
}
5. 写锁
办法摘要:
// 查问以后线程放弃写入锁的数量。int getHoldCount()
// 查问此写入锁是否由以后线程放弃。boolean isHeldByCurrentThread()
// 获取写入锁。void lock()
// 获取写入锁,除非以后线程被中断。void lockInterruptibly()
// 返回一个用来与此 Lock 实例一起应用的 Condition 实例。Condition newCondition()
// 返回标识此锁及其锁状态的字符串。String toString()
// 仅当写入锁在调用期间未被另一个线程放弃时获取该锁。boolean tryLock()
// 如果另一个线程在给定的等待时间内没有放弃写入锁,并且以后线程未被中断,则获取写入锁。boolean tryLock(long timeout, TimeUnit unit)
// 试图开释此锁。void unlock()
5.1 WriteLock#lock
获取写锁。
- 如果其余线程既没有放弃读锁也没有放弃写锁,则获取写锁并立刻返回,并将写锁放弃计数设置为 1。
- 如果以后线程曾经放弃写锁,则放弃计数减少 1,该办法立刻返回。
- 如果写锁被其余线程放弃,则以后线程进入阻塞直到能够获取写锁。
即,跟读锁、写锁都互斥。
java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock#lock
/**
* Acquires the write lock.
*
* <p>Acquires the write lock if neither the read nor write lock
* are held by another thread
* and returns immediately, setting the write lock hold count to
* one.
*
* <p>If the current thread already holds the write lock then the
* hold count is incremented by one and the method returns
* immediately.
*
* <p>If the lock is held by another thread then the current
* thread becomes disabled for thread scheduling purposes and
* lies dormant until the write lock has been acquired, at which
* time the write lock hold count is set to one.
*/
public void lock() {sync.acquire(1);
}
5.1.1 AQS#acquire
AQS 中的 acquire 办法中,只有 tryAcquire 办法须要子类来实现。
代码流程:
- 尝试获取锁,失败则进入下一步。
- 进入同步队列中期待获取锁。
java.util.concurrent.locks.AbstractQueuedSynchronizer#acquire
public final void acquire(int arg) {if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();}
留神,当写锁的计数达到最大值,后续再获取写锁会抛出异样 throw new Error("Maximum lock count exceeded")
,而不是排队期待。
5.1.2 Sync#tryAcquire
代码流程:
- 存在线程持有读锁,或者其余线程持有写锁,则以后线程获取写锁失败。
- 如果写锁曾经饱和,无奈再取得锁,返回失败。
- 否则,如果是重入获取写锁,或者公平性策略校验通过,则获取写锁,并更新写锁计数。
留神,以后线程持有读锁后,无奈再获取写锁,即 不反对锁降级。
为什么 不反对锁降级?
- 假如反对锁降级,持有读锁的线程均降级为写锁,会违反写锁是互斥锁的定义。
- 假如只反对其中一个读锁进行降级,则会违反读写互斥的规定。
java.util.concurrent.locks.ReentrantReadWriteLock.Sync#tryAcquire
protected final boolean tryAcquire(int acquires) {
/*
* Walkthrough:
* 1. If read count nonzero or write count nonzero
* and owner is a different thread, fail.
* 2. If count would saturate, fail. (This can only
* happen if count is already nonzero.)
* 3. Otherwise, this thread is eligible for lock if
* it is either a reentrant acquire or
* queue policy allows it. If so, update state
* and set owner.
*/
Thread current = Thread.currentThread();
int c = getState();
int w = exclusiveCount(c);
if (c != 0) { // 锁曾经被线程持有,须要进一步辨别读、写锁,是否以后线程持有
// (Note: if c != 0 and w == 0 then shared count != 0)
// 只有读锁(读写互斥。留神以后线程持有读锁之后,也无奈获取写锁,不反对锁降级)// 或者持有写锁的不是以后线程,则无奈获取写锁
if (w == 0 || current != getExclusiveOwnerThread())
return false;
if (w + exclusiveCount(acquires) > MAX_COUNT)
throw new Error("Maximum lock count exceeded");
// Reentrant acquire // 进入这里,阐明以后线程已持有写锁,是重入获取
setState(c + acquires); // 更新 state,无需应用 CAS
return true;
}
// 进入这里,阐明锁未被持有(然而同步队列中可能有线程在期待)if (writerShouldBlock() || // 校验公平性策略看是否获取写锁
!compareAndSetState(c, c + acquires)) // 可能获取写锁,则 CAS 获取(低 16 位加 1)return false;
setExclusiveOwnerThread(current); // 获取写锁胜利,记录持有写锁的是以后线程
return true;
}
5.2 WriteLock#tryLock
仅当写锁在调用期间未被另一个线程放弃时,获取写锁。
java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock#tryLock()
public boolean tryLock( ) {return sync.tryWriteLock();
}
代码流程:
- 存在线程持有读锁,或者其余线程持有写锁,则以后线程获取写锁失败。
- 如果写锁曾经饱和,无奈再取得锁,返回失败。
- 否则,如果是重入获取写锁,或者没以后没有线程持有锁(可能在同步队列中期待),则 CAS 获取锁,获取胜利则计数;获取失败则返回,不会进入同步队列。
跟 Sync#tryAcquire 最大的区别没有调用 writerShouldBlock,因而可能会毁坏偏心规定。
如果心愿恪守此锁的偏心设置,则应用 tryLock(0, TimeUnit.SECONDS)
,它简直是等效的(它也检测中断)。
因为读锁是共享的,tryReadLock 中须要自旋进行 CAS 获取锁,而 tryWriteLock 中不必进行自旋,只 CAS 获取一次。
java.util.concurrent.locks.ReentrantReadWriteLock.Sync#tryWriteLock
/**
* Performs tryLock for write, enabling barging in both modes.
* This is identical in effect to tryAcquire except for lack
* of calls to writerShouldBlock.
*/
final boolean tryWriteLock() { // 仅当写入锁在调用期间未被另一个线程放弃时获取该锁
Thread current = Thread.currentThread();
int c = getState();
if (c != 0) {int w = exclusiveCount(c);
if (w == 0 || current != getExclusiveOwnerThread()) // 只有读锁(读写互斥);或者持有写锁的不是以后线程
return false;
if (w == MAX_COUNT)
throw new Error("Maximum lock count exceeded");
}
if (!compareAndSetState(c, c + 1))
return false;
setExclusiveOwnerThread(current);
return true;
}
5.3 WriteLock#unlock
试图开释此锁。
java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock#unlock
public void unlock() {sync.release(1);
}
5.3.1 AQS#release
独占模式下开释锁。
- 尝试开释锁,若开释胜利则进入下一步。
- 唤醒同步队列中的后继节点。
java.util.concurrent.locks.AbstractQueuedSynchronizer#release
public final boolean release(int arg) {if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}
在 ReentrantReadWriteLock 中,只有当全副写锁都开释之后,exclusiveCount == 0 时才会执行 unparkSuccessor,唤醒同步队列中期待着的读、写线程。
5.3.2 Sync#tryRelease
代码流程:
- 如果以后线程不是此锁的持有者,则抛出 IllegalMonitorStateException。
- 如果以后线程放弃此锁,则将放弃计数减 1。如果放弃计数当初为 0,则开释该锁。
java.util.concurrent.locks.ReentrantReadWriteLock.Sync#tryRelease
protected final boolean tryRelease(int releases) { // 开释独占锁 - 写锁
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
int nextc = getState() - releases; // 更新 state,无需 CAS
boolean free = exclusiveCount(nextc) == 0; // 查看锁是否已全副开释
if (free)
setExclusiveOwnerThread(null);
setState(nextc);
return free;
}
5.4 WriteLock#newCondition
返回一个用来与此 Lock 实例一起应用的 Condition 实例。
java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock#newCondition
public Condition newCondition() {return sync.newCondition();
}
在应用内置监视器锁时,返回的 Condition 实例反对与 Object 的监视器办法(wait、notify 和 notifyAll)雷同的用法。
- 如果当调用任何 Condition 办法时没有放弃此写入锁,则抛出 IllegalMonitorStateException。(因为放弃读锁是独立于写锁的,所以读锁将不被查看或受影响)
- 调用期待条件办法时,会开释写锁,在办法返回之前,会从新获取写锁,并将锁放弃计数复原到调用该办法时的值。
- 如果线程在期待时被中断,则期待将终止,并将抛出 InterruptedException,革除线程的已中断状态。
- 期待线程按 FIFO 程序收到信号。
- 期待办法返回的线程从新获取锁的程序,与线程最后获取锁的程序雷同,在默认状况下,未指定此程序,但对于偏心锁,它们更偏向于那些等待时间最长的线程。
6. 示例
JDK 官网示例,展现了如何利用重入来执行降级缓存后的锁降级(为简略起见,省略了异样解决):
class CachedData {
Object data;
volatile boolean cacheValid;
ReentrantReadWriteLock rwl = new ReentrantReadWriteLock();
void processCachedData() {rwl.readLock().lock();
if (!cacheValid) {
// Must release read lock before acquiring write lock
rwl.readLock().unlock();
rwl.writeLock().lock();
// Recheck state because another thread might have acquired
// write lock and changed state before we did.
if (!cacheValid) {
data = ...
cacheValid = true;
}
// Downgrade by acquiring read lock before releasing write lock
rwl.readLock().lock();
// Unlock write, still hold read
rwl.writeLock().unlock();
}
use(data);
rwl.readLock().unlock();
}
}
在应用某些品种的 Collection 时,能够应用 ReentrantReadWriteLock 来进步并发性。
通常,在预期 collection 很大,读取者线程拜访它的次数多于写入者线程,并且 entail 操作的开销高于同步开销时,这很值得一试。
例如,以下是一个应用 TreeMap 的类,预期它很大,并且能被同时拜访。
class RWDictionary {private final Map<String, Data> m = new TreeMap<String, Data>();
private final ReentrantReadWriteLock rwl = new ReentrantReadWriteLock();
private final Lock r = rwl.readLock();
private final Lock w = rwl.writeLock();
public Data get(String key) {r.lock();
try {return m.get(key); }
finally {r.unlock(); }
}
public String[] allKeys() {r.lock();
try {return m.keySet().toArray();}
finally {r.unlock(); }
}
public Data put(String key, Data value) {w.lock();
try {return m.put(key, value); }
finally {w.unlock(); }
}
public void clear() {w.lock();
try {m.clear(); }
finally {w.unlock(); }
}
}
7. 总结
7.1 互斥规定
读读不互斥,读写、写读、写写都是互斥的。
7.2 获取程序
ReentrantReadWriteLock 不会按读取者优先或写入者优先的程序,来对锁的拜访进行排序。然而,它的确反对可选的公平性策略。
非偏心模式(默认):
获取写锁能够立刻获取,无需排队;获取读锁之前,若判断等待时间最长的是写线程,则非重入的读线程需进入阻塞。
间断竞争的非偏心锁可能无限期地推延一个或多个 reader 或 writer 线程,但吞吐量通常要高于偏心锁。
偏心模式:
不论获取读锁还是写锁,都要严格依照先后顺序排队获取。
留神,读锁是共享的,只有同步队列中没有期待的线程,读锁能够被同时获取,一旦同步队列中具备线程在期待,后续的非重入的读线程只能入队期待。
此外,ReentrantReadWriteLock.ReadLock.tryLock() 和 ReentrantReadWriteLock.WriteLock.tryLock() 办法不会恪守此偏心设置。
7.3 重入
重入规定能够看作是对公平性策略的一种修改,即便同步队列中存在期待线程时,已持有锁的线程能够重入获取锁,无需退让。
读锁的最大可重入次数是 65535,写锁的最大可重入次数同样是 65535。
超过最大可重入次数,间接抛异样。
7.4 锁降级
重入还容许从写锁降级为读锁,其实现形式是:先获取写锁,而后获取读锁,最初开释写锁、读锁。
然而,从读锁降级到写锁是不可能的。
7.5 Condition 反对
写锁提供了一个 Condition 实现,读锁不反对 Condition。
7.6 告诉机制
读锁开释:当锁被多个读线程持有时,只有全副读锁都开释了,才会唤醒同步队列中期待着的节点,此时的期待节点是写线程。
写锁开释:当锁被单个写线程持有时,只有全副写锁都开释了,才会唤醒同步队列中期待着的节点,该节点可能是写线程或读线程。
读锁获取:当同步队列中的读线程胜利获取锁后,会唤醒队列中的下一个共享节点(读线程),再由下一个共享节点获取锁后唤醒下下个共享节点(见 AQS#setHeadAndPropagate)。
相干浏览:
浏览 JDK 源码:AQS 中的独占模式
浏览 JDK 源码:AQS 中的共享模式
浏览 JDK 源码:AQS 对 Condition 的实现
浏览 JDK 源码:可重入锁 ReentrantLock
作者:Sumkor
链接:https://segmentfault.com/a/11…