关于java:ReentrantReadWriteLock源码解析

39次阅读

共计 8172 个字符,预计需要花费 21 分钟才能阅读完成。

在应用 synchronize 关键字润饰办法后,只容许一个线程进行拜访,这个尽管有利于保障数据安全,却理论场景南辕北辙的。理论中数据都是读取多,写入少,咱们须要更粗细粒的并发锁。JVM concurrent.locks 包给咱们提供 ReadWriteLock 读写锁,内置两把锁,读锁、写锁,满足多个线程并发读取数据,写入时互斥所有线程,既保证了数据安全,又晋升了响应量。

概念

读锁:能够了解成共享锁,容许多个线程同时读取
写锁:独占锁,有且只容许一个线程拜访
读写互斥:在获取写锁时,必须期待所有读锁全副开释,能力获取胜利,读锁会梗塞写锁,写锁会梗塞所有的线程。
锁降级:在应用读锁时,曾经获取读锁线程在没有开释读锁的状况下,去获取写锁这就是锁降级。这是不被容许的,锁降级会造成死锁。

        // 这个会造成死锁
        ReadWriteLock  lock = new ReentrantReadWriteLock();
        lock.readLock().lock();
        lock.writeLock().lock();

锁降级:曾经获取到写锁线程,被容许在没有开释锁的状况上来获取读锁的,值得注意读锁、写锁依然须要独自开释。

        // 并不会造成死锁
        ReadWriteLock lock = new ReentrantReadWriteLock();
        lock.writeLock().lock();
        lock.readLock().lock();

应用官网例子演示 ReentrantReadWriteLock 应用场景,每次获取缓存时,先判断缓存是否曾经生效了,如果生效了应用写锁更新缓存。

 class CachedData {
    Object data;
    volatile boolean cacheValid; // 缓存生效标记
    final ReentrantReadWriteLock rwl = new ReentrantReadWriteLock();
      void processCachedData() {rwl.readLock().lock(); 
     if (!cacheValid) {
        // 必须开释了读锁能力去获取写锁,这样不会造成死锁
        rwl.readLock().unlock(); 
       rwl.writeLock().lock(); 
       try {
          // 双重查看状态,因为在获取锁的可能被其余线程更新状态了
          // 获取到写锁,更新缓存和状态
          if (!cacheValid) {
            data = ...
            cacheValid = true;
          }
          // 通过在开释写锁之前获取读锁来降级
          rwl.readLock().lock(); 
       } finally {rwl.writeLock().unlock();
 // Unlock write, still hold read 
       } 
     } 
       try {use(data); 
     } finally {rwl.readLock().unlock();} 
   } 
 }

代码很少,然而十分有代表性,非常适合缓存这种读取多,更新少的场景。在每次读取缓存时,先开启读锁,查看缓存情况,须要更新缓存时。先开释读锁而后再去获取写锁,在更新前先判断缓存又没被其余线程更新过了,更新完数据后降级到读锁,再开释写锁,应用缓存开释读锁。

源码解析

这里源码剖析只有简略解说两个锁的获取、开释原理,看浏览源码之前,自备 AQS 的知识点。
ReentrantReadWriteLock 是实现 ReadWriteLock 接口的实现类,外部应用 AQS 的 int state 来示意读写锁的状态

如上图所示,两个锁的获取、开释都是同时应用 int state 来进行,应用低 16 位示意写锁获取次数、高 16 位示意读锁获取次数。应用外部类 Sync 独自编写共享锁、独占锁的获取开释具体实现,再应用 ReadLock、WriteLock 别离调用共享锁、独占锁的办法。源码浏览先从 Sync 外部类开始。

外部属性

    abstract static class Sync extends AbstractQueuedSynchronizer {
         // 读 写 锁分界点
        static final int SHARED_SHIFT   = 16;
        // 读锁最小单位,刚好示意以后领有一个读锁线程
        static final int SHARED_UNIT    = (1 << SHARED_SHIFT);
        // 反对最大读取次数
        static final int MAX_COUNT      = (1 << SHARED_SHIFT) - 1;
        // 写锁掩码
        static final int EXCLUSIVE_MASK = (1 << SHARED_SHIFT) - 1;

        /** 计算以后获取共享锁数量  */
        static int sharedCount(int c)    {return c >>> SHARED_SHIFT;}
        /** 计算以后获取独占锁数量  */
        static int exclusiveCount(int c) {return c & EXCLUSIVE_MASK;}

 // 次要用于保留每一个读锁线程的重入次数
 static final class HoldCounter { // 初始化对象,就将以后线程 id 赋值给 tid
            int count = 0; // 重入次数
            // Use id, not reference, to avoid garbage retention
            final long tid = getThreadId(Thread.currentThread());
        }

        /**
         *  保留 HoldCounter 到每一个线程公有栈祯
         */
        static final class ThreadLocalHoldCounter
            extends ThreadLocal<HoldCounter> {public HoldCounter initialValue() {// 实现初始化接口,每一次调用 get()时,没有值就会调用初始化办法
                return new HoldCounter();}
        }

        /**
         *  记录读锁重入次数
         */
        private transient ThreadLocalHoldCounter readHolds;

        /**
         *  这个是上一个读锁 HoldCounter 缓存
         */
        private transient HoldCounter cachedHoldCounter;

我这里认为读锁做一个共享锁在重入次数上,state 不能精确表白出每一个线程到底重入了多少次,所以须要用到 HoldCounter 来记录每一个线程获取锁次数,在开释锁的时候,会看下如何应用的。

共享锁的获取和开释

        protected final int tryAcquireShared(int unused) {Thread current = Thread.currentThread();
            int c = getState();
            // 当独占锁不等于 0,这时只有独占锁是本身的状况下能力获取到读锁
            // 两个条件都满足时,写锁获取到读锁   锁降级
            if (exclusiveCount(c) != 0 &&
                getExclusiveOwnerThread() != current)
                return -1;
            int r = sharedCount(c); // 持有共享锁数
            if (!readerShouldBlock() &&
                r < MAX_COUNT &&
                // 这里只对高位进行累加, 设置胜利就相当于获取锁胜利了
                compareAndSetState(c, c + SHARED_UNIT)) {if (r == 0) { // 首次加锁   firstReader 必须是读锁线程
                    firstReader = current;
                    firstReaderHoldCount = 1;
                } else if (firstReader == current) { // 重入
                    firstReaderHoldCount++;
                } else { // 以后线程不是首个读锁持有者,要应用 HoldCounter 记录重入
                    HoldCounter rh = cachedHoldCounter;  // 这是上一个线程缓存
                    if (rh == null || rh.tid != getThreadId(current))
                         // 这里会返回以后线程初始化值 也就是数量为空 0
                        // 将以后线程重入对象赋值给缓存
                        cachedHoldCounter = rh = readHolds.get();
                    else if (rh.count == 0) // 第一次进入
                        readHolds.set(rh);
                    rh.count++;
                }
                return 1;
            }
            // cas 竞争失败,残缺版本共享锁获取
            return fullTryAcquireShared(current);
        }

readerShouldBlock 是一个队列梗塞策略办法,用于辨别偏心锁和非偏心锁的实现,当返回 true 时,会梗塞所有获取读锁线程。

        final int fullTryAcquireShared(Thread current) {
            HoldCounter rh = null;
            for (;;) { // 自旋获取锁,直到胜利
                int c = getState();
                if (exclusiveCount(c) != 0) {  // 这时曾经是写锁状态
                    if (getExclusiveOwnerThread() != current)  // 不是锁降级就退出循环
                        return -1;
                    // else we hold the exclusive lock; blocking here
                    // would cause deadlock.
                } else if (readerShouldBlock()) { // 当返回 true,则阐明曾经存在梗塞线程,这是要么自旋,要么失败
                    // Make sure we're not acquiring read lock reentrantly
                    if (firstReader == current) { // 重入获取读锁,这也是不行的
                        // assert firstReaderHoldCount > 0;
                    } else { // 判断线程栈祯是否还有重入,如果呈现重入自旋期待
                        if (rh == null) { 
                            rh = cachedHoldCounter;
                            if (rh == null || rh.tid != getThreadId(current)) {rh = readHolds.get();
                                if (rh.count == 0)
                                    readHolds.remove();}
                        }
                        if (rh.count == 0) // 锁曾经开释了,不能算是重入了,间接失败了
                            return -1;
                    }
                }
                if (sharedCount(c) == MAX_COUNT) 
                    throw new Error("Maximum lock count exceeded");
                if (compareAndSetState(c, c + SHARED_UNIT)) { // 高 16 位运算,获取共享锁胜利
                    if (sharedCount(c) == 0) { // 上面代码跟下面基本一致,略过.....
                        firstReader = current;
                        firstReaderHoldCount = 1;
                    } else if (firstReader == current) {firstReaderHoldCount++;} else {if (rh == null)
                            rh = cachedHoldCounter;
                        if (rh == null || rh.tid != getThreadId(current))
                            rh = readHolds.get();
                        else if (rh.count == 0)
                            readHolds.set(rh);
                        rh.count++;
                        cachedHoldCounter = rh; // cache for release
                    }
                    return 1;
                }
            }
        }

总结下:当获取共享锁时,只有检测到独占锁时,获取锁办法会立刻返回失败。
看下共享锁开释

        protected final boolean tryReleaseShared(int unused) {Thread current = Thread.currentThread();
            if (firstReader == current) { 
                // assert firstReaderHoldCount > 0;
                if (firstReaderHoldCount == 1) // 锁曾经退出,偿还缓存
                    firstReader = null;
                else
                    firstReaderHoldCount--;
            } else {
                HoldCounter rh = cachedHoldCounter;
                if (rh == null || rh.tid != getThreadId(current))
                    rh = readHolds.get();
                int count = rh.count;
                if (count <= 1) { // 以后锁没有重入,间接删除
                    readHolds.remove();
                    if (count <= 0) // 屡次开释锁
                        throw unmatchedUnlockException();}
                --rh.count;
            }
            for (;;) { // 自旋 锁数量减一
                int c = getState();
                int nextc = c - SHARED_UNIT;
                if (compareAndSetState(c, nextc))
                    // Releasing the read lock has no effect on readers,
                    // but it may allow waiting writers to proceed if
                    // both read and write locks are now free.
                    return nextc == 0;  // 共享锁数据为 0
            }
        }

HoldCounter 用于保护每一个线程开释锁数量,保障开释不会超过本身持有的数量。

独占锁获取和开释

        protected final boolean tryRelease(int releases) {if (!isHeldExclusively())
                throw new IllegalMonitorStateException();
            int nextc = getState() - releases;
            boolean free = exclusiveCount(nextc) == 0;
            if (free)
                setExclusiveOwnerThread(null);
            setState(nextc);
            return free;
        }

        protected final boolean tryAcquire(int acquires) {Thread current = Thread.currentThread();
            int c = getState();
            int w = exclusiveCount(c);
            if (c != 0) { // c 不能等于 0,以后依然持有锁,有可能是独占锁或者是共享锁
                // 如果独占锁为空 0,则阐明以后依然有线程没有开释读锁,这个不满足写锁获取,间接失败
                //w > 0 , 这是阐明曾经有线程获取独占锁了,这时必须是重入才会获取胜利
                if (w == 0 || current != getExclusiveOwnerThread())
                    return false;
                if (w + exclusiveCount(acquires) > MAX_COUNT)
                    throw new Error("Maximum lock count exceeded");
                // 这里是重入了
                setState(c + acquires);
                return true;
            }
            // 竞争获取锁 
            if (writerShouldBlock() ||
                !compareAndSetState(c, c + acquires))
                return false;
            setExclusiveOwnerThread(current);
            return true;
        }

writerShouldBlock:当返回 true 会梗塞获取锁的线程,用于辨别偏心锁和非偏心锁实现。联合下面代码,当返回 true 时,不会去获取锁,间接失败了。
独占锁开释

        protected final boolean tryRelease(int releases) {if (!isHeldExclusively())  // 持有独占锁线程不是以后线程
                throw new IllegalMonitorStateException();
            int nextc = getState() - releases;  
            boolean free = exclusiveCount(nextc) == 0;
            if (free)  // 所有锁都被开释了,能够将独占锁线程致空
                setExclusiveOwnerThread(null);
            setState(nextc);
            return free;
        }

偏心锁和非偏心锁

ReentrantReadWriteLock 外部有两个锁能够抉择,偏心锁和非偏心锁。通过结构参数进行抉择, 默认应用非偏心锁。

    public ReentrantReadWriteLock() {this(false);
    }

    public ReentrantReadWriteLock(boolean fair) {sync = fair ? new FairSync() : new NonfairSync();
        readerLock = new ReadLock(this);
        writerLock = new WriteLock(this);
    }

    public ReentrantReadWriteLock.WriteLock writeLock() { return writerLock;}
    public ReentrantReadWriteLock.ReadLock  readLock()  { return readerLock;}

非偏心锁:在获取读锁或者写锁时,获取锁的线程并不是程序的,在梗塞队列中的线程可能长期期待,获取不到锁,而没有在梗塞队列中期待线程反而能疾速获取到锁,这个会造成线程饥饿,然而会比偏心锁有更高的吞吐量。
偏心锁:保障每一个期待最久线程最先获取到线程执行权,线程都会依照 AQS 梗塞程序获取锁,这样有利于防止线程饥饿的产生,然而在在获取锁须要判断队列有肯定性能损耗,所以吞吐量不如非偏心高。

偏心锁和非偏心锁区别在在于 writerShouldBlock、readerShouldBlock 办法实现不同而已。
偏心锁实现

    static final class FairSync extends Sync {
        private static final long serialVersionUID = -2274990926593161451L;
        final boolean writerShouldBlock() {return hasQueuedPredecessors();
        }
        final boolean readerShouldBlock() {return hasQueuedPredecessors();
        }
    }

hasQueuedPredecessors:返回 true 则阐明 AQS 中存在梗塞线程,只有在呈现写锁的时候,才会将获取锁线程放入队列中,所以 readerShouldBlock 在读锁获取时,会永远返回 false。
非偏心锁

    static final class NonfairSync extends Sync {
        private static final long serialVersionUID = -8159625535654395037L;
        final boolean writerShouldBlock() {return false; // writers can always barge}
        final boolean readerShouldBlock() {
            // 只有梗塞队列第一个线程为非共享锁时才会返回 true
            // 当队列后面曾经呈现写锁了,所有共享锁都不能和写锁竞争,放弃竞争
            return apparentlyFirstQueuedIsExclusive();}
    }

从下面代码晓得,只有这两个办法返回 true,都不能去竞争锁,偏心锁的策略非常明显,只有梗塞队列有线程,就会放弃锁竞争。而非偏心锁则是在写锁时,无论队列有无线程都会尝试竞争,写锁时只有队列最后面的线程为写锁时,才会放弃竞争,总的来说偏心锁和非偏心锁逻辑和 ReentrantLock 逻辑根本一样。

tryLock

在读锁、写锁的对象中,都存在 tryLock 办法,它跟 lock 办法有“亿点点”不同,尽管他们都是调用了外部 Sync 办法,然而在获取锁办法上,和下面剖析 tryAcquire、tryAcquireShared 基本一致,唯独短少了 readerShouldBlock、writerShouldBlock 应用。应用这个办法获取锁,无论偏心锁还非偏心锁,获取锁逻辑都一样。无论梗塞队列是否有线程,会间接竞争获取锁,在非偏心锁中读锁会退让队列中第一个写锁,写锁优先级会高于读锁。但 tryLock 不存在,所有锁的竞争的偏心的,疾速的,能够了解这个办法在获取锁上会有更高的优先级(相比 lock)。

正文完
 0