针对读多写少的场景,Java提供了另外一个实现Lock接口的读写锁ReentrantReadWriteLock(RRW),之前剖析过ReentrantLock是一个独占锁,同一时间只容许一个线程拜访。

而 RRW 容许多个读线程同时拜访,但不容许写线程和读线程、写线程和写线程同时拜访。
读写锁外部保护了两个锁,一个是用于读操作的ReadLock,一个是用于写操作的 WriteLock。

读写锁恪守以下三条根本准则

  1. 容许多个线程同时读共享变量;
  2. 只容许一个线程写共享变量;
  3. 如果一个写线程正在执行写操作,此时禁止读线程读共享变量。

读写锁如何实现

RRW也是基于AQS实现的,它的自定义同步器(继承自AQS)须要在同步状态state上保护多个读线程和一个写线程的状态。RRW的做法是应用高下位来实现一个整形管制两种状态,一个int占4个字节,一个字节8位。所以高16位示意读,低16位示意写。

abstract static class Sync extends AbstractQueuedSynchronizer {  static final int SHARED_SHIFT   = 16;  // 10000000000000000(65536)  static final int SHARED_UNIT    = (1 << SHARED_SHIFT);  // 65535  static final int MAX_COUNT      = (1 << SHARED_SHIFT) - 1;  //1111111111111111  static final int EXCLUSIVE_MASK = (1 << SHARED_SHIFT) - 1;  // 读锁(共享锁)的数量,只计算高16位的值  static int sharedCount(int c)    { return c >>> SHARED_SHIFT; }  // 写锁(独占锁)的数量  static int exclusiveCount(int c) { return c & EXCLUSIVE_MASK; } }

获取读锁

当线程获取读锁时,首先判断同步状态低16位,如果存在写锁,则获取锁失败,进入CLH队列阻塞,反之,判断以后线程是否应该被阻塞,如果不应该阻塞则尝试 CAS 同步状态,获取胜利更新同步锁为读状态。

 protected final int tryAcquireShared(int unused) {             Thread current = Thread.currentThread();  int c = getState();  // 如果以后曾经有写锁了,则获取失败  if (exclusiveCount(c) != 0 &&      getExclusiveOwnerThread() != current)      return -1;  // 获取读锁数量  int r = sharedCount(c);  // 非偏心锁实现中readerShouldBlock()返回true示意CLH队列中有正在排队的写锁  // CAS设置读锁的状态值  if (!readerShouldBlock() &&      r < MAX_COUNT &&      compareAndSetState(c, c + SHARED_UNIT)) {      // 省略记录获取readLock次数的代码      return 1;  }  // 针对下面失败的条件进行再次解决  return fullTryAcquireShared(current);}final int fullTryAcquireShared(Thread current) {    // 无线循环  for (;;) {    int c = getState();    if (exclusiveCount(c) != 0) {      // 如果不是以后线程持有写锁,则进入CLH队列阻塞      if (getExclusiveOwnerThread() != current)        return -1;    }     // 如果reader应该被阻塞    else if (readerShouldBlock()) {        // Make sure we're not acquiring read lock reentrantly        if (firstReader == current) {            // assert firstReaderHoldCount > 0;        } else {            if (rh == null) {                rh = cachedHoldCounter;                if (rh == null || rh.tid != getThreadId(current)) {                    rh = readHolds.get();                    if (rh.count == 0)                        readHolds.remove();                }            }            // 以后线程没有持有读锁,即不存在锁重入状况。则进入CLH队列阻塞            if (rh.count == 0)                return -1;        }    }    // 共享锁的如果超出了限度    if (sharedCount(c) == MAX_COUNT)        throw new Error("Maximum lock count exceeded");    // CAS设置状态值    if (compareAndSetState(c, c + SHARED_UNIT)) {            // 省略记录readLock次数的代码      return 1;    }  }}

SHARED_UNIT的值是65536,也就是说,当第一次获取读锁的后,state的值就变成了65536。
在偏心锁的实现中当CLH队列中有排队的线程,readerShouldBlock()办法就会返回为true。非偏心锁的实现中则是当CLH队列中存在期待获取写锁的线程就返回true

还须要留神的是获取读锁的时候,如果以后线程曾经持有写锁,是依然能获取读锁胜利的。前面会提到锁的降级,如果你对那里的代码有疑难,能够在回过头来看看这里申请锁的代码

开释读锁

protected final boolean tryReleaseShared(int unused) {             for (;;) {    int c = getState();    // 减去65536    int nextc = c - SHARED_UNIT;    // 只有当state的值变成0才会真正的开释锁    if (compareAndSetState(c, nextc))        return nextc == 0;}}

开释锁时,state的值须要减去65536,因为当第一次获取读锁后,state值变成了65536。

任何一个线程开释读锁的时候只有在state==0的时候才真正开释了锁,比方有100个线程获取了读锁,只有最初一个线程执行tryReleaseShared办法时才真正开释了锁,此时会唤醒CLH队列中的排队线程。

获取写锁

一个线程尝试获取写锁时,会先判断同步状态 state 是否为0。如果 state 等于 0,阐明临时没有其它线程获取锁;如果 state 不等于 0,则阐明有其它线程获取了锁。

此时再判断state的低16位(w)是否为0,如果w为0,示意其余线程获取了读锁,此时进入CLH队列进行阻塞期待。

如果w不为0,则阐明其余线程获取了写锁,此时须要判断获取了写锁的是不是以后线程,如果不是则进入CLH队列进行阻塞期待,如果获取了写锁的是以后线程,则判断以后线程获取写锁是否超过了最大次数,若超过,抛出异样。反之则更新同步状态。

// 获取写锁protected final boolean tryAcquire(int acquires) {             Thread current = Thread.currentThread();  int c = getState();  int w = exclusiveCount(c);  // 判断state是否为0  if (c != 0) {      // 获取锁失败      if (w == 0 || current != getExclusiveOwnerThread())          return false;      // 判断以后线程获取写锁是否超出了最大次数65535      if (w + exclusiveCount(acquires) > MAX_COUNT)          throw new Error("Maximum lock count exceeded");            // 锁重入      setState(c + acquires);      return true;  }  // 非偏心锁实现中writerShouldBlock()永远返回为false  // CAS批改state的值  if (writerShouldBlock() ||      !compareAndSetState(c, c + acquires))      return false;  // CAS胜利后,设置以后线程为领有独占锁的线程  setExclusiveOwnerThread(current);  return true;}

在偏心锁的实现中当CLH队列中存在排队的线程,那么writerShouldBlock()办法就会返回为true,此时获取写锁的线程就会被阻塞。

开释写锁

开释写锁的逻辑比较简单

 protected final boolean tryRelease(int releases) {  // 写锁是否被以后线程持有  if (!isHeldExclusively())      throw new IllegalMonitorStateException();    int nextc = getState() - releases;  boolean free = exclusiveCount(nextc) == 0;  // 没有其余线程持有写锁  if (free)      setExclusiveOwnerThread(null);  setState(nextc);  return free;}

锁的降级?

// 筹备读缓存readLock.lock();try {  v = map.get(key);  if(v == null) {    writeLock.lock();    try {      if(map.get(key) != null) {        return map.get(key);      }      // 更新缓存代码,省略    } finally {      writeLock.unlock();    }  }} finally {  readLock.unlock();}

对于下面获取缓存数据(这也是RRW的利用场景)的代码,先是获取读锁,而后再降级为写锁,这样的行为叫做锁的降级。惋惜RRW不反对,这样会导致写锁永恒期待,最终导致线程被永恒阻塞。所以锁的降级是不容许的

锁的降级

尽管锁的降级不容许,然而锁的降级却是能够的。

ReentrantReadWriteLock lock = new ReentrantReadWriteLock();ReadLock readLock = lock.readLock();WriteLock writeLock = lock.writeLock();Map<String, String> dataMap = new HashMap();public void processCacheData() {  readLock.lock();  if(!cacheValid()) {    // 开释读锁,因为不容许    readLock.unlock();    writeLock.lock();    try {      if(!cacheValid()) {          dataMap.put("key", "think123");      }      // 降级为读锁      readLock.lock();    } finally {        writeLock.unlock();    }  }  try {    // 依然持有读锁    System.out.println(dataMap);  } finally {      readLock.unlock();  }}public boolean cacheValid() {    return !dataMap.isEmpty();}

RRW须要留神的问题

  1. 在读取很多、写入很少的状况下,RRW 会使写入线程遭逢饥饿(Starvation)问题,也就是说写入线程会因迟迟无奈竞争到锁而始终处于期待状态。
  2. 写锁反对条件变量,读锁不反对。读锁调用newCondition() 会抛出UnsupportedOperationException 异样

举荐浏览

之前有写过AQS的实现,ReentrantLock的实现,能够参考我上面的文章

  1. AQS源码剖析
  2. ReentrantLock剖析

关注我,不迷路

老哥,能给我个点赞,和关注吗?