共计 4774 个字符,预计需要花费 12 分钟才能阅读完成。
针对读多写少的场景,Java 提供了另外一个实现 Lock 接口的读写锁 ReentrantReadWriteLock(RRW), 之前剖析过 ReentrantLock 是一个独占锁,同一时间只容许一个线程拜访。
而 RRW 容许多个读线程同时拜访,但不容许写线程和读线程、写线程和写线程同时拜访。
读写锁外部保护了两个锁,一个是用于读操作的 ReadLock,一个是用于写操作的 WriteLock。
读写锁恪守以下三条根本准则
- 容许多个线程同时读共享变量;
- 只容许一个线程写共享变量;
- 如果一个写线程正在执行写操作,此时禁止读线程读共享变量。
读写锁如何实现
RRW 也是基于 AQS 实现的,它的自定义同步器 (继承自 AQS) 须要在同步状态 state 上保护多个读线程和一个写线程的状态。RRW 的做法是应用高下位来实现一个整形管制两种状态,一个 int 占 4 个字节,一个字节 8 位。所以高 16 位示意读,低 16 位示意写。
abstract static class Sync extends AbstractQueuedSynchronizer { | |
static final int SHARED_SHIFT = 16; | |
// 10000000000000000(65536) | |
static final int SHARED_UNIT = (1 << SHARED_SHIFT); | |
// 65535 | |
static final int MAX_COUNT = (1 << SHARED_SHIFT) - 1; | |
//1111111111111111 | |
static final int EXCLUSIVE_MASK = (1 << SHARED_SHIFT) - 1; | |
// 读锁 (共享锁) 的数量, 只计算高 16 位的值 | |
static int sharedCount(int c) {return c >>> SHARED_SHIFT;} | |
// 写锁 (独占锁) 的数量 | |
static int exclusiveCount(int c) {return c & EXCLUSIVE_MASK;} | |
} |
获取读锁
当线程获取读锁时,首先判断同步状态低 16 位,如果存在写锁,则获取锁失败,进入 CLH 队列阻塞,反之,判断以后线程是否应该被阻塞,如果不应该阻塞则尝试 CAS 同步状态,获取胜利更新同步锁为读状态。
protected final int tryAcquireShared(int unused) {Thread current = Thread.currentThread(); | |
int c = getState(); | |
// 如果以后曾经有写锁了,则获取失败 | |
if (exclusiveCount(c) != 0 && | |
getExclusiveOwnerThread() != current) | |
return -1; | |
// 获取读锁数量 | |
int r = sharedCount(c); | |
// 非偏心锁实现中 readerShouldBlock()返回 true 示意 CLH 队列中有正在排队的写锁 | |
// CAS 设置读锁的状态值 | |
if (!readerShouldBlock() && | |
r < MAX_COUNT && | |
compareAndSetState(c, c + SHARED_UNIT)) { | |
// 省略记录获取 readLock 次数的代码 | |
return 1; | |
} | |
// 针对下面失败的条件进行再次解决 | |
return fullTryAcquireShared(current); | |
} | |
final int fullTryAcquireShared(Thread current) { | |
// 无线循环 | |
for (;;) {int c = getState(); | |
if (exclusiveCount(c) != 0) { | |
// 如果不是以后线程持有写锁,则进入 CLH 队列阻塞 | |
if (getExclusiveOwnerThread() != current) | |
return -1; | |
} | |
// 如果 reader 应该被阻塞 | |
else if (readerShouldBlock()) { | |
// Make sure we're not acquiring read lock reentrantly | |
if (firstReader == current) {// assert firstReaderHoldCount > 0;} else {if (rh == null) { | |
rh = cachedHoldCounter; | |
if (rh == null || rh.tid != getThreadId(current)) {rh = readHolds.get(); | |
if (rh.count == 0) | |
readHolds.remove();} | |
} | |
// 以后线程没有持有读锁,即不存在锁重入状况。则进入 CLH 队列阻塞 | |
if (rh.count == 0) | |
return -1; | |
} | |
} | |
// 共享锁的如果超出了限度 | |
if (sharedCount(c) == MAX_COUNT) | |
throw new Error("Maximum lock count exceeded"); | |
// CAS 设置状态值 | |
if (compareAndSetState(c, c + SHARED_UNIT)) { | |
// 省略记录 readLock 次数的代码 | |
return 1; | |
} | |
} | |
} |
SHARED_UNIT
的值是 65536,也就是说,当第一次获取读锁的后,state 的值就变成了 65536。
在偏心锁的实现中当 CLH 队列中有排队的线程,readerShouldBlock()
办法就会返回为 true。非偏心锁的实现中则是当 CLH 队列中存在期待获取写锁的线程就返回 true
还须要留神的是获取读锁的时候,如果以后线程曾经持有写锁,是依然能获取读锁胜利的。前面会提到锁的降级,如果你对那里的代码有疑难,能够在回过头来看看这里申请锁的代码
开释读锁
protected final boolean tryReleaseShared(int unused) {for (;;) {int c = getState(); | |
// 减去 65536 | |
int nextc = c - SHARED_UNIT; | |
// 只有当 state 的值变成 0 才会真正的开释锁 | |
if (compareAndSetState(c, nextc)) | |
return nextc == 0; | |
} | |
} |
开释锁时,state 的值须要减去 65536,因为当第一次获取读锁后,state 值变成了 65536。
任何一个线程开释读锁的时候只有在 state==0
的时候才真正开释了锁,比方有 100 个线程获取了读锁,只有最初一个线程执行 tryReleaseShared
办法时才真正开释了锁,此时会唤醒 CLH 队列中的排队线程。
获取写锁
一个线程尝试获取写锁时,会先判断同步状态 state 是否为 0。如果 state 等于 0,阐明临时没有其它线程获取锁;如果 state 不等于 0,则阐明有其它线程获取了锁。
此时再判断 state 的低 16 位 (w) 是否为 0,如果 w 为 0,示意其余线程获取了读锁,此时进入 CLH 队列进行阻塞期待。
如果 w 不为 0,则阐明其余线程获取了写锁,此时须要判断获取了写锁的是不是以后线程,如果不是则进入 CLH 队列进行阻塞期待,如果获取了写锁的是以后线程,则判断以后线程获取写锁是否超过了最大次数,若超过,抛出异样。反之则更新同步状态。
// 获取写锁 | |
protected final boolean tryAcquire(int acquires) {Thread current = Thread.currentThread(); | |
int c = getState(); | |
int w = exclusiveCount(c); | |
// 判断 state 是否为 0 | |
if (c != 0) { | |
// 获取锁失败 | |
if (w == 0 || current != getExclusiveOwnerThread()) | |
return false; | |
// 判断以后线程获取写锁是否超出了最大次数 65535 | |
if (w + exclusiveCount(acquires) > MAX_COUNT) | |
throw new Error("Maximum lock count exceeded"); | |
// 锁重入 | |
setState(c + acquires); | |
return true; | |
} | |
// 非偏心锁实现中 writerShouldBlock()永远返回为 false | |
// CAS 批改 state 的值 | |
if (writerShouldBlock() || | |
!compareAndSetState(c, c + acquires)) | |
return false; | |
// CAS 胜利后,设置以后线程为领有独占锁的线程 | |
setExclusiveOwnerThread(current); | |
return true; | |
} |
在偏心锁的实现中当 CLH 队列中存在排队的线程, 那么 writerShouldBlock()
办法就会返回为 true,此时获取写锁的线程就会被阻塞。
开释写锁
开释写锁的逻辑比较简单
protected final boolean tryRelease(int releases) { | |
// 写锁是否被以后线程持有 | |
if (!isHeldExclusively()) | |
throw new IllegalMonitorStateException(); | |
int nextc = getState() - releases; | |
boolean free = exclusiveCount(nextc) == 0; | |
// 没有其余线程持有写锁 | |
if (free) | |
setExclusiveOwnerThread(null); | |
setState(nextc); | |
return free; | |
} |
锁的降级?
// 筹备读缓存 | |
readLock.lock(); | |
try {v = map.get(key); | |
if(v == null) {writeLock.lock(); | |
try {if(map.get(key) != null) {return map.get(key); | |
} | |
// 更新缓存代码,省略 | |
} finally {writeLock.unlock(); | |
} | |
} | |
} finally {readLock.unlock(); | |
} |
对于下面获取缓存数据 (这也是 RRW 的利用场景) 的代码,先是获取读锁,而后再降级为写锁,这样的行为叫做锁的降级。惋惜 RRW 不反对,这样会导致写锁永恒期待,最终导致线程被永恒阻塞。所以 锁的降级是不容许的。
锁的降级
尽管锁的降级不容许,然而锁的降级却是能够的。
ReentrantReadWriteLock lock = new ReentrantReadWriteLock(); | |
ReadLock readLock = lock.readLock(); | |
WriteLock writeLock = lock.writeLock(); | |
Map<String, String> dataMap = new HashMap(); | |
public void processCacheData() {readLock.lock(); | |
if(!cacheValid()) { | |
// 开释读锁,因为不容许 | |
readLock.unlock(); | |
writeLock.lock(); | |
try {if(!cacheValid()) {dataMap.put("key", "think123"); | |
} | |
// 降级为读锁 | |
readLock.lock();} finally {writeLock.unlock(); | |
} | |
} | |
try { | |
// 依然持有读锁 | |
System.out.println(dataMap); | |
} finally {readLock.unlock(); | |
} | |
} | |
public boolean cacheValid() {return !dataMap.isEmpty(); | |
} |
RRW 须要留神的问题
- 在读取很多、写入很少的状况下,RRW 会使写入线程遭逢饥饿(Starvation)问题,也就是说写入线程会因迟迟无奈竞争到锁而始终处于期待状态。
- 写锁反对条件变量,读锁不反对。读锁调用 newCondition() 会抛出 UnsupportedOperationException 异样
举荐浏览
之前有写过 AQS 的实现,ReentrantLock 的实现,能够参考我上面的文章
- AQS 源码剖析
- ReentrantLock 剖析
关注我,不迷路
老哥,能给我个点赞,和关注吗?