共计 12548 个字符,预计需要花费 32 分钟才能阅读完成。
一、读写锁简介
事实中有这样一种场景:对共享资源有读和写的操作,且写操作没有读操作那么频繁。在没有写操作的时候,多个线程同时读一个资源没有任何问题,所以应该容许多个线程同时读取共享资源;然而如果一个线程想去写这些共享资源,就不应该容许其余线程对该资源进行读和写的操作了。
针对这种场景,JAVA 的并发包提供了读写锁 ReentrantReadWriteLock,它示意两个锁,一个是读操作相干的锁,称为共享锁;一个是写相干的锁,称为排他锁,形容如下:
线程进入读锁的前提条件:
没有其余线程的写锁,
没有写申请或者 有写申请,但调用线程和持有锁的线程是同一个。
线程进入写锁的前提条件:
没有其余线程的读锁
没有其余线程的写锁
而读写锁有以下三个重要的个性:
(1)偏心选择性:反对非偏心(默认)和偏心的锁获取形式,吞吐量还是非偏心优于偏心。
(2)重进入:读锁和写锁都反对线程重进入。
(3)锁降级:遵循获取写锁、获取读锁再开释写锁的秩序,写锁可能降级成为读锁。
二、源码解读
咱们先来看下 ReentrantReadWriteLock 类的整体构造:
public class ReentrantReadWriteLock implements ReadWriteLock, java.io.Serializable {
/** 读锁 */
private final ReentrantReadWriteLock.ReadLock readerLock;
/** 写锁 */
private final ReentrantReadWriteLock.WriteLock writerLock;
final Sync sync;
/** 应用默认(非偏心)的排序属性创立一个新的 ReentrantReadWriteLock */
public ReentrantReadWriteLock() {this(false);
}
/** 应用给定的偏心策略创立一个新的 ReentrantReadWriteLock */
public ReentrantReadWriteLock(boolean fair) {sync = fair ? new FairSync() : new NonfairSync();
readerLock = new ReadLock(this);
writerLock = new WriteLock(this);
}
/** 返回用于写入操作的锁 */
public ReentrantReadWriteLock.WriteLock writeLock() { return writerLock;}
/** 返回用于读取操作的锁 */
public ReentrantReadWriteLock.ReadLock readLock() { return readerLock;}
abstract static class Sync extends AbstractQueuedSynchronizer {}
static final class NonfairSync extends Sync {}
static final class FairSync extends Sync {}
public static class ReadLock implements Lock, java.io.Serializable {}
public static class WriteLock implements Lock, java.io.Serializable {}}
1、类的继承关系
public class ReentrantReadWriteLock
implements ReadWriteLock, java.io.Serializable {}
阐明:能够看到,ReentrantReadWriteLock 实现了 ReadWriteLock 接口,ReadWriteLock 接口定义了获取读锁和写锁的标准,具体须要实现类去实现;同时其还实现了 Serializable 接口,示意能够进行序列化,在源代码中能够看到 ReentrantReadWriteLock 实现了本人的序列化逻辑。
2、类的外部类
ReentrantReadWriteLock 有五个外部类,五个外部类之间也是互相关联的。外部类的关系如下图所示。
阐明:如上图所示,Sync 继承自 AQS、NonfairSync 继承自 Sync 类、FairSync 继承自 Sync 类(通过构造函数传入的布尔值决定要结构哪一种 Sync 实例);ReadLock 实现了 Lock 接口、WriteLock 也实现了 Lock 接口。
Sync 类:
(1)类的继承关系
abstract static class Sync extends AbstractQueuedSynchronizer {}
阐明:Sync 抽象类继承自 AQS 抽象类,Sync 类提供了对 ReentrantReadWriteLock 的反对。
(2)类的外部类
Sync 类外部存在两个外部类,别离为 HoldCounter 和 ThreadLocalHoldCounter,其中 HoldCounter 次要与读锁配套应用,其中,HoldCounter 源码如下。
// 计数器
static final class HoldCounter {
// 计数
int count = 0;
// Use id, not reference, to avoid garbage retention
// 获取以后线程的 TID 属性的值
final long tid = getThreadId(Thread.currentThread());
}
阐明:HoldCounter 次要有两个属性,count 和 tid,其中 count 示意某个读线程重入的次数,tid 示意该线程的 tid 字段的值,该字段能够用来惟一标识一个线程。ThreadLocalHoldCounter 的源码如下
// 本地线程计数器
static final class ThreadLocalHoldCounter
extends ThreadLocal<HoldCounter> {
// 重写初始化办法,在没有进行 set 的状况下,获取的都是该 HoldCounter 值
public HoldCounter initialValue() {return new HoldCounter();
}
}
阐明:ThreadLocalHoldCounter 重写了 ThreadLocal 的 initialValue 办法,ThreadLocal 类能够将线程与对象相关联。在没有进行 set 的状况下,get 到的均是 initialValue 办法外面生成的那个 HolderCounter 对象。
(3)类的属性
abstract static class Sync extends AbstractQueuedSynchronizer {
// 版本序列号
private static final long serialVersionUID = 6317671515068378041L;
// 高 16 位为读锁,低 16 位为写锁
static final int SHARED_SHIFT = 16;
// 读锁单位
static final int SHARED_UNIT = (1 << SHARED_SHIFT);
// 读锁最大数量
static final int MAX_COUNT = (1 << SHARED_SHIFT) - 1;
// 写锁最大数量
static final int EXCLUSIVE_MASK = (1 << SHARED_SHIFT) - 1;
// 本地线程计数器
private transient ThreadLocalHoldCounter readHolds;
// 缓存的计数器
private transient HoldCounter cachedHoldCounter;
// 第一个读线程
private transient Thread firstReader = null;
// 第一个读线程的计数
private transient int firstReaderHoldCount;
}
阐明:该属性中包含了读锁、写锁线程的最大量。本地线程计数器等。
(4)类的构造函数
// 构造函数
Sync() {
// 本地线程计数器
readHolds = new ThreadLocalHoldCounter();
// 设置 AQS 的状态
setState(getState()); // ensures visibility of readHolds
}
阐明:在 Sync 的构造函数中设置了本地线程计数器和 AQS 的状态 state。
3、读写状态的设计
同步状态在重入锁的实现中是示意被同一个线程反复获取的次数,即一个整形变量来保护,然而之前的那个示意仅仅示意是否锁定,而不必辨别是读锁还是写锁。而读写锁须要在同步状态(一个整形变量)上保护多个读线程和一个写线程的状态。
读写锁对于同步状态的实现是在一个整形变量上通过“按位切割应用”:将变量切割成两局部,高 16 位示意读,低 16 位示意写。
假如以后同步状态值为 S,get 和 set 的操作如下:
(1)获取写状态:
S&0x0000FFFF: 将高 16 位全副抹去
(2)获取读状态:
S>>>16: 无符号补 0,右移 16 位
(3)写状态加 1:
S+1
(4)读状态加 1:
S+(1<<16)即 S + 0x00010000
在代码层的判断中,如果 S 不等于 0,当写状态(S&0x0000FFFF),而读状态(S>>>16)大于 0,则示意该读写锁的读锁已被获取。
4、写锁的获取与开释
看下 WriteLock 类中的 lock 和 unlock 办法:
public void lock() {sync.acquire(1);
}
public void unlock() {sync.release(1);
}
能够看到就是调用的独占式同步状态的获取与开释,因而实在的实现就是 Sync 的 tryAcquire 和 tryRelease。
写锁的获取,看下 tryAcquire:
1 protected final boolean tryAcquire(int acquires) {
2 // 以后线程
3 Thread current = Thread.currentThread();
4 // 获取状态
5 int c = getState();
6 // 写线程数量(即获取独占锁的重入数)7 int w = exclusiveCount(c);
8
9 // 以后同步状态 state != 0,阐明曾经有其余线程获取了读锁或写锁
10 if (c != 0) {
11 // 以后 state 不为 0,此时:如果写锁状态为 0 阐明读锁此时被占用返回 false;12 // 如果写锁状态不为 0 且写锁没有被以后线程持有返回 false
13 if (w == 0 || current != getExclusiveOwnerThread())
14 return false;
15
16 // 判断同一线程获取写锁是否超过最大次数(65535),反对可重入
17 if (w + exclusiveCount(acquires) > MAX_COUNT)
18 throw new Error("Maximum lock count exceeded");
19 // 更新状态
20 // 此时以后线程已持有写锁,当初是重入,所以只须要批改锁的数量即可。21 setState(c + acquires);
22 return true;
23 }
24
25 // 到这里阐明此时 c =0, 读锁和写锁都没有被获取
26 //writerShouldBlock 示意是否阻塞
27 if (writerShouldBlock() ||
28 !compareAndSetState(c, c + acquires))
29 return false;
30
31 // 设置锁为以后线程所有
32 setExclusiveOwnerThread(current);
33 return true;
34 }
其中 exclusiveCount 办法示意占有写锁的线程数量,源码如下:
static int exclusiveCount(int c) {return c & EXCLUSIVE_MASK;}
阐明:间接将状态 state 和(2^16 – 1)做与运算,其等效于将 state 模上 2^16。写锁数量由 state 的低十六位示意。
从源代码能够看出,获取写锁的步骤如下:
(1)首先获取 c、w。c 示意以后锁状态;w 示意写线程数量。而后判断同步状态 state 是否为 0。如果 state!=0,阐明曾经有其余线程获取了读锁或写锁,执行(2);否则执行(5)。
(2)如果锁状态不为零(c != 0),而写锁的状态为 0(w = 0),阐明读锁此时被其余线程占用,所以以后线程不能获取写锁,天然返回 false。或者锁状态不为零,而写锁的状态也不为 0,然而获取写锁的线程不是以后线程,则以后线程也不能获取写锁。
(3)判断以后线程获取写锁是否超过最大次数,若超过,抛异样,反之更新同步状态(此时以后线程已获取写锁,更新是线程平安的),返回 true。
(4)如果 state 为 0,此时读锁或写锁都没有被获取,判断是否须要阻塞(偏心和非偏心形式实现不同),在非偏心策略下总是不会被阻塞,在偏心策略下会进行判断(判断同步队列中是否有等待时间更长的线程,若存在,则须要被阻塞,否则,无需阻塞),如果不须要阻塞,则 CAS 更新同步状态,若 CAS 胜利则返回 true,失败则阐明锁被别的线程抢去了,返回 false。如果须要阻塞则也返回 false。
(5)胜利获取写锁后,将以后线程设置为占有写锁的线程,返回 true。
办法流程图如下:
写锁的开释,tryRelease 办法:
1 protected final boolean tryRelease(int releases) {
2 // 若锁的持有者不是以后线程,抛出异样
3 if (!isHeldExclusively())
4 throw new IllegalMonitorStateException();
5 // 写锁的新线程数
6 int nextc = getState() - releases;
7 // 如果独占模式重入数为 0 了,阐明独占模式被开释
8 boolean free = exclusiveCount(nextc) == 0;
9 if (free)
10 // 若写锁的新线程数为 0,则将锁的持有者设置为 null
11 setExclusiveOwnerThread(null);
12 // 设置写锁的新线程数
13 // 不论独占模式是否被开释,更新独占重入数
14 setState(nextc);
15 return free;
16 }
写锁的开释过程还是相对而言比较简单的:首先查看以后线程是否为写锁的持有者,如果不是抛出异样。而后查看开释后写锁的线程数是否为 0,如果为 0 则示意写锁闲暇了,开释锁资源将锁的持有线程设置为 null,否则开释仅仅只是一次重入锁而已,并不能将写锁的线程清空。
阐明:此办法用于开释写锁资源,首先会判断该线程是否为独占线程,若不为独占线程,则抛出异样,否则,计算开释资源后的写锁的数量,若为 0,示意胜利开释,资源不将被占用,否则,示意资源还被占用。其办法流程图如下。
5、读锁的获取与开释
相似于写锁,读锁的 lock 和 unlock 的理论实现对应 Sync 的 tryAcquireShared 和 tryReleaseShared 办法。
读锁的获取,看下 tryAcquireShared 办法
1 protected final int tryAcquireShared(int unused) {
2 // 获取以后线程
3 Thread current = Thread.currentThread();
4 // 获取状态
5 int c = getState();
6
7 // 如果写锁线程数 != 0,且独占锁不是以后线程则返回失败,因为存在锁降级
8 if (exclusiveCount(c) != 0 &&
9 getExclusiveOwnerThread() != current)
10 return -1;
11 // 读锁数量
12 int r = sharedCount(c);
13 /*
14 * readerShouldBlock(): 读锁是否须要期待(偏心锁准则)15 * r < MAX_COUNT:持有线程小于最大数(65535)16 * compareAndSetState(c, c + SHARED_UNIT):设置读取锁状态
17 */
18 // 读线程是否应该被阻塞、并且小于最大值、并且比拟设置胜利
19 if (!readerShouldBlock() &&
20 r < MAX_COUNT &&
21 compareAndSetState(c, c + SHARED_UNIT)) {
22 //r == 0,示意第一个读锁线程,第一个读锁 firstRead 是不会退出到 readHolds 中
23 if (r == 0) { // 读锁数量为 0
24 // 设置第一个读线程
25 firstReader = current;
26 // 读线程占用的资源数为 1
27 firstReaderHoldCount = 1;
28 } else if (firstReader == current) { // 以后线程为第一个读线程,示意第一个读锁线程重入
29 // 占用资源数加 1
30 firstReaderHoldCount++;
31 } else { // 读锁数量不为 0 并且不为以后线程
32 // 获取计数器
33 HoldCounter rh = cachedHoldCounter;
34 // 计数器为空或者计数器的 tid 不为以后正在运行的线程的 tid
35 if (rh == null || rh.tid != getThreadId(current))
36 // 获取以后线程对应的计数器
37 cachedHoldCounter = rh = readHolds.get();
38 else if (rh.count == 0) // 计数为 0
39 // 退出到 readHolds 中
40 readHolds.set(rh);
41 // 计数 +1
42 rh.count++;
43 }
44 return 1;
45 }
46 return fullTryAcquireShared(current);
47 }
其中 sharedCount 办法示意占有读锁的线程数量,源码如下:
static int sharedCount(int c) {return c >>> SHARED_SHIFT;}
阐明:间接将 state 右移 16 位,就能够失去读锁的线程数量,因为 state 的高 16 位示意读锁,对应的第十六位示意写锁数量。
读锁获取锁的过程比写锁略微简单些,首先判断写锁是否为 0 并且以后线程不占有独占锁,间接返回;否则,判断读线程是否须要被阻塞并且读锁数量是否小于最大值并且比拟设置状态胜利,若以后没有读锁,则设置第一个读线程 firstReader 和 firstReaderHoldCount;若以后线程线程为第一个读线程,则减少 firstReaderHoldCount;否则,将设置以后线程对应的 HoldCounter 对象的值。流程图如下。
留神:更新胜利后会在 firstReaderHoldCount 中或 readHolds(ThreadLocal 类型的)的本线程正本中记录以后线程重入数(23 行至 43 行代码),这是为了实现 jdk1.6 中退出的 getReadHoldCount()办法的,这个办法能获取以后线程重入共享锁的次数(state 中记录的是多个线程的总重入次数),退出了这个办法让代码简单了不少,然而其原理还是很简略的:如果以后只有一个线程的话,还不须要动用 ThreadLocal,间接往 firstReaderHoldCount 这个成员变量里存重入数,当有第二个线程来的时候,就要动用 ThreadLocal 变量 readHolds 了,每个线程领有本人的正本,用来保留本人的重入数。
fullTryAcquireShared 办法:
final int fullTryAcquireShared(Thread current) {
HoldCounter rh = null;
for (;;) { // 有限循环
// 获取状态
int c = getState();
if (exclusiveCount(c) != 0) { // 写线程数量不为 0
if (getExclusiveOwnerThread() != current) // 不为以后线程
return -1;
} else if (readerShouldBlock()) { // 写线程数量为 0 并且读线程被阻塞
// Make sure we're not acquiring read lock reentrantly
if (firstReader == current) { // 以后线程为第一个读线程
// assert firstReaderHoldCount > 0;
} else { // 以后线程不为第一个读线程
if (rh == null) { // 计数器不为空
//
rh = cachedHoldCounter;
if (rh == null || rh.tid != getThreadId(current)) { // 计数器为空或者计数器的 tid 不为以后正在运行的线程的 tid
rh = readHolds.get();
if (rh.count == 0)
readHolds.remove();}
}
if (rh.count == 0)
return -1;
}
}
if (sharedCount(c) == MAX_COUNT) // 读锁数量为最大值,抛出异样
throw new Error("Maximum lock count exceeded");
if (compareAndSetState(c, c + SHARED_UNIT)) { // 比拟并且设置胜利
if (sharedCount(c) == 0) { // 读线程数量为 0
// 设置第一个读线程
firstReader = current;
//
firstReaderHoldCount = 1;
} else if (firstReader == current) {firstReaderHoldCount++;} else {if (rh == null)
rh = cachedHoldCounter;
if (rh == null || rh.tid != getThreadId(current))
rh = readHolds.get();
else if (rh.count == 0)
readHolds.set(rh);
rh.count++;
cachedHoldCounter = rh; // cache for release
}
return 1;
}
}
}
阐明:在 tryAcquireShared 函数中,如果下列三个条件不满足(读线程是否应该被阻塞、小于最大值、比拟设置胜利)则会进行 fullTryAcquireShared 函数中,它用来保障相干操作能够胜利。其逻辑与 tryAcquireShared 逻辑相似,不再累赘。
读锁的开释,tryReleaseShared 办法
1 protected final boolean tryReleaseShared(int unused) {
2 // 获取以后线程
3 Thread current = Thread.currentThread();
4 if (firstReader == current) { // 以后线程为第一个读线程
5 // assert firstReaderHoldCount > 0;
6 if (firstReaderHoldCount == 1) // 读线程占用的资源数为 1
7 firstReader = null;
8 else // 缩小占用的资源
9 firstReaderHoldCount--;
10 } else { // 以后线程不为第一个读线程
11 // 获取缓存的计数器
12 HoldCounter rh = cachedHoldCounter;
13 if (rh == null || rh.tid != getThreadId(current)) // 计数器为空或者计数器的 tid 不为以后正在运行的线程的 tid
14 // 获取以后线程对应的计数器
15 rh = readHolds.get();
16 // 获取计数
17 int count = rh.count;
18 if (count <= 1) { // 计数小于等于 1
19 // 移除
20 readHolds.remove();
21 if (count <= 0) // 计数小于等于 0,抛出异样
22 throw unmatchedUnlockException();
23 }
24 // 缩小计数
25 --rh.count;
26 }
27 for (;;) { // 有限循环
28 // 获取状态
29 int c = getState();
30 // 获取状态
31 int nextc = c - SHARED_UNIT;
32 if (compareAndSetState(c, nextc)) // 比拟并进行设置
33 // Releasing the read lock has no effect on readers,
34 // but it may allow waiting writers to proceed if
35 // both read and write locks are now free.
36 return nextc == 0;
37 }
38 }
阐明:此办法示意读锁线程开释锁。首先判断以后线程是否为第一个读线程 firstReader,若是,则判断第一个读线程占有的资源数 firstReaderHoldCount 是否为 1,若是,则设置第一个读线程 firstReader 为空,否则,将第一个读线程占有的资源数 firstReaderHoldCount 减 1;若以后线程不是第一个读线程,那么首先会获取缓存计数器(上一个读锁线程对应的计数器),若计数器为空或者 tid 不等于以后线程的 tid 值,则获取以后线程的计数器,如果计数器的计数 count 小于等于 1,则移除以后线程对应的计数器,如果计数器的计数 count 小于等于 0,则抛出异样,之后再缩小计数即可。无论何种状况,都会进入有限循环,该循环能够确保胜利设置状态 state。其流程图如下。
在读锁的获取、开释过程中,总是会有一个对象存在着,同时该对象在获取线程获取读锁是 +1,开释读锁时 -1,该对象就是 HoldCounter。
要明确 HoldCounter 就要先明确读锁。后面提过读锁的外在实现机制就是共享锁,对于共享锁其实咱们能够略微的认为它不是一个锁的概念,它更加像一个计数器的概念。一次共享锁操作就相当于一次计数器的操作,获取共享锁计数器 +1,开释共享锁计数器 -1。只有当线程获取共享锁后能力对共享锁进行开释、重入操作。所以 HoldCounter 的作用就是以后线程持有共享锁的数量,这个数量必须要与线程绑定在一起,否则操作其余线程锁就会抛出异样。
先看读锁获取锁的局部:
if (r == 0) {//r == 0,示意第一个读锁线程,第一个读锁 firstRead 是不会退出到 readHolds 中
firstReader = current;
firstReaderHoldCount = 1;
} else if (firstReader == current) {// 第一个读锁线程重入
firstReaderHoldCount++;
} else { // 非 firstReader 计数
HoldCounter rh = cachedHoldCounter;//readHoldCounter 缓存
//rh == null 或者 rh.tid != current.getId(),须要获取 rh
if (rh == null || rh.tid != current.getId())
cachedHoldCounter = rh = readHolds.get();
else if (rh.count == 0)
readHolds.set(rh); // 退出到 readHolds 中
rh.count++; // 计数 +1
}
这里为什么要搞一个 firstRead、firstReaderHoldCount 呢?而不是间接应用 else 那段代码?这是为了一个效率问题,firstReader 是不会放入到 readHolds 中的,如果读锁仅有一个的状况下就会防止查找 readHolds。可能就看这个代码还不是很了解 HoldCounter。咱们先看 firstReader、firstReaderHoldCount 的定义:
private transient Thread firstReader = null;
private transient int firstReaderHoldCount;
这两个变量比较简单,一个示意线程,当然该线程是一个非凡的线程,一个是 firstReader 的重入计数。
HoldCounter 的定义:
static final class HoldCounter {
int count = 0;
final long tid = Thread.currentThread().getId();
}
在 HoldCounter 中仅有 count 和 tid 两个变量,其中 count 代表着计数器,tid 是线程的 id。然而如果要将一个对象和线程绑定起来仅记录 tid 必定不够的,而且 HoldCounter 基本不能起到绑定对象的作用,只是记录线程 tid 而已。
诚然,在 java 中,咱们晓得如果要将一个线程和对象绑定在一起只有 ThreadLocal 能力实现。所以如下:
static final class ThreadLocalHoldCounter
extends ThreadLocal<HoldCounter> {public HoldCounter initialValue() {return new HoldCounter();
}
}
ThreadLocalHoldCounter 继承 ThreadLocal,并且重写了 initialValue 办法。
故而,HoldCounter 应该就是绑定线程上的一个计数器,而 ThradLocalHoldCounter 则是线程绑定的 ThreadLocal。从下面咱们能够看到 ThreadLocal 将 HoldCounter 绑定到以后线程上,同时 HoldCounter 也持有线程 Id,这样在开释锁的时候能力晓得 ReadWriteLock 外面缓存的上一个读取线程(cachedHoldCounter)是否是以后线程。这样做的益处是能够缩小 ThreadLocal.get()的次数,因为这也是一个耗时操作。须要阐明的是这样 HoldCounter 绑定线程 id 而不绑定线程对象的起因是防止 HoldCounter 和 ThreadLocal 相互绑定而 GC 难以开释它们(只管 GC 可能智能的发现这种援用而回收它们,然而这须要肯定的代价),所以其实这样做只是为了帮忙 GC 疾速回收对象而已。
三、总结
通过下面的源码剖析,咱们能够发现一个景象:
在线程持有读锁的状况下,该线程不能获得写锁(因为获取写锁的时候,如果发现以后的读锁被占用,就马上获取失败,不论读锁是不是被以后线程持有)。
在线程持有写锁的状况下,该线程能够持续获取读锁(获取读锁时如果发现写锁被占用,只有写锁没有被以后线程占用的状况才会获取失败)。
认真想想,这个设计是正当的:因为当线程获取读锁的时候,可能有其余线程同时也在持有读锁,因而不能把获取读锁的线程“降级”为写锁;而对于取得写锁的线程,它肯定独占了读写锁,因而能够持续让它获取读锁,当它同时获取了写锁和读锁后,还能够先开释写锁持续持有读锁,这样一个写锁就“降级”为了读锁。
综上:
一个线程要想同时持有写锁和读锁,必须先获取写锁再获取读锁;写锁能够“降级”为读锁;读锁不能“降级”为写锁。
欢送关注公众号【码农开花】一起学习成长
我会始终分享 Java 干货,也会分享收费的学习材料课程和面试宝典
回复:【计算机】【设计模式】【面试】有惊喜哦