关于java:快进来花几分钟看一下-ReentrantReadWriteLock-的原理

31次阅读

共计 8973 个字符,预计需要花费 23 分钟才能阅读完成。

前言

在看完 ReentrantLock 之后,在高并发场景下 ReentrantLock 曾经足够应用,然而因为 ReentrantLock 是独占锁,同时只有一个线程能够获取该锁,而很多利用场景都是读多写少,这时候应用 ReentrantLock 就不太适合了。读多写少的场景该如何应用?在 JUC 包下同样提供了读写锁 ReentrantReadWriteLock 来应答读多写少的场景。

公众号:『刘志航』,记录工作学习中的技术、开发及源码笔记;时不时分享一些生存中的见闻感悟。欢送大佬来领导!

介绍

反对相似 ReentrantLock 语义的 ReadWriteLock 的实现。

具备以下属性:

  • 获取程序

此类不会将读取优先或写入优先强加给锁拜访的排序。然而,它的确反对可选的 偏心 策略。

反对 偏心模式 非偏心模式 ,默认为 非偏心模式

  • 重入

容许 reader 和 writer 依照 ReentrantLock 的款式从新获取读锁或写锁。在写线程开释持有的所有写锁后,reader 才容许重入应用它们。此外,writer 能够获取读锁,但反过来则不成立。

  • 锁降级

重入还容许从写锁降级为读锁,通过先获取写锁,而后获取读锁,最初开释写锁的形式降级。然而,从读锁降级到写锁是 不可能的

  • 锁获取的中断

读锁和写锁都反对锁获取期间的中断。

  • Condition 反对

写锁提供了一个 Condition 实现,对于写锁来说,该实现的形式与 ReentrantLock.newCondition() 提供的 Condition 实现对 ReentrantLock 所做的行为雷同。当然,此 Condition 只能用于写锁。读锁不反对 Condition

  • 监测

此类反对一些确定是放弃锁还是争用锁的办法。这些办法设计用于监视系统状态,而不是同步控制。

锁最多反对 65535 个递归写锁和 65535 个读锁

以上为 Java Api 官网文档[1] 的解释,总结一下内容如下:

  1. 反对非偏心和偏心模式,默认为非偏心模式。
  2. 反对重入,读锁能够重入获取读锁,写锁能够重入获取写锁,写锁能够获取读锁,读锁不能够获取写锁。
  3. 锁能够降级,从写锁降级为读锁,然而不可能从读锁降级到写锁。

根本应用

class CachedData {
    Object data;
    volatile boolean cacheValid;
    final ReentrantReadWriteLock rwl = new ReentrantReadWriteLock();

    void processCachedData() {
        // 读锁加锁
        rwl.readLock().lock();
        if (!cacheValid) {
            // 获取写锁之前必须开释读锁
            rwl.readLock().unlock();
            // 写锁加锁
            rwl.writeLock().lock();
            try {
                // 从新查看状态,因为另一个线程可能
                // 在执行操作之前获取了写锁定并更改了状态
                if (!cacheValid) {
                    data = ...
                    cacheValid = true;
                }
                // 通过在开释写锁之前获取读锁来降级
                rwl.readLock().lock();
            } finally {rwl.writeLock().unlock(); // Unlock write, still hold read}
        }

        try {use(data);
        } finally {rwl.readLock().unlock();}
    }
}

下面只是官网文档提供的一个 demo。

问题疑难

  1. 在 ReentrantReadWriteLock 中 state 代表什么?
  2. 线程获取锁的流程是怎么样的?
  3. 读锁和写锁的可重入性是如何实现的?
  4. 以后线程获取锁失败,被阻塞的后续操作是什么?
  5. 锁降级是怎么降级的?

源码剖析

代码构造

public class ReentrantReadWriteLock implements ReadWriteLock, java.io.Serializable {
    private static final long serialVersionUID = -6992448646407690164L;
    /** 提供读锁的外部类 */
    private final ReentrantReadWriteLock.ReadLock readerLock;
    /** 提供写锁的外部类 */
    private final ReentrantReadWriteLock.WriteLock writerLock;
    /** 执行所有同步机制 */
    final Sync sync;

}

state

之前在浏览 ReentrantLock 源码的时候 state 代表了锁的状态,0 示意没有线程持有锁,大于 1 示意曾经有线程持有锁及其重入的次数。而在 ReentrantReadWriteLock 是读写锁,那就须要保留 读锁 写锁 两种状态的,那是怎么样示意的呢?

在 ReentrantReadWriteLock 中同样存在一个 Sync 继承了 AbstractQueuedSynchronizer,也是 FairSync、NonfairSync 的父类。外部定义了 state 的一些操作。

abstract static class Sync extends AbstractQueuedSynchronizer {
    private static final long serialVersionUID = 6317671515068378041L;
    // 移位数
    static final int SHARED_SHIFT   = 16;
    // 单位
    static final int SHARED_UNIT    = (1 << SHARED_SHIFT);
    // 最大数量 1 << 16 -> 65536
    static final int MAX_COUNT      = (1 << SHARED_SHIFT) - 1;
    // 计算独占数应用 1 << 16 -> 65536
    static final int EXCLUSIVE_MASK = (1 << SHARED_SHIFT) - 1;

    // 返回共享保留数
    static int sharedCount(int c)    {return c >>> SHARED_SHIFT;}
    // 返回独占保留数 
    static int exclusiveCount(int c) {return c & EXCLUSIVE_MASK;}

}

在 AQS 中定义 state 为 int 类型,而在 ReentrantReadWriteLock 中,将 state 的 高 16 位和低 16 位拆开示意读写锁。其中高 16 位示意读锁,低 16 位示意写锁。别离应用 sharedCount 和 exclusiveCount 办法获取读锁和写锁的以后状态。

上面别离从读锁和写锁的角度来看如何进行加锁和开释锁的?

ReadLock.lock


public static class ReadLock 
    implements Lock, java.io.Serializable {
    /**
     * 获取读取锁。* 如果写锁没有被另一个线程持有,则获取读锁并立刻返回。* 如果写锁由另一个线程持有,则出于线程调度目标,* 以后线程将被禁用,并处于休眠状态,直到获取读锁为止。*/
    public void lock() {
        // 调用 AQS 获取共享资源
        sync.acquireShared(1);
    }
}

获取共享资源,这块应用的 AQS 的逻辑,其中 tryAcquireShared(arg) 是在 ReentrantReadWriteLock.Sync 中实现的。并且 AQS 中有规定,tryAcquireShared 分为三种返回值:

  1. 小于 0: 示意失败;
  2. 等于 0: 示意共享模式获取资源胜利,但后续的节点不能以共享模式获取胜利;
  3. 大于 0: 示意共享模式获取资源胜利,后续节点在共享模式获取也可能会胜利,在这种状况下,后续期待线程必须查看可用性。
abstract static class Sync extends AbstractQueuedSynchronizer {protected final int tryAcquireShared(int unused) {Thread current = Thread.currentThread();
        // 获取 state 值
        int c = getState();
        // 独占计数不为 0 且 不是以后线程,阐明曾经有写锁
        if (exclusiveCount(c) != 0 && getExclusiveOwnerThread() != current)
            return -1;
        // 获取共享计数(读锁计数)int r = sharedCount(c);
        // 不须要阻塞读锁 && 共享计数小于最大值 && state 更新胜利
        if (!readerShouldBlock() && r < MAX_COUNT &&
            compareAndSetState(c, c + SHARED_UNIT)) {if (r == 0) {
                // 以后读锁计数为 0
                // firstReader 是取得读锁的第一个线程
                // firstReaderHoldCount 是 firstReader 的放弃计数
                firstReader = current;
                firstReaderHoldCount = 1;
            } else if (firstReader == current) {
                // 读锁重入
                firstReaderHoldCount++;
            } else {
                // 以后缓存计数
                HoldCounter rh = cachedHoldCounter;
                // 以后线程没有计数 或者 没有创立计数器
                if (rh == null || rh.tid != getThreadId(current))
                    // 创立计数,基于 ThreadLocal
                    cachedHoldCounter = rh = readHolds.get();
                else if (rh.count == 0) 
                    readHolds.set(rh);
                // 计数累加
                rh.count++;
            }
            return 1;
        }
        // 残缺地获取共享锁办法,作为 tryAcquireShared 办法因 CAS 获取锁失败后的解决。// 因为后面可能失败 CAS 失败,队列策略失败等起因。return fullTryAcquireShared(current);
    }
}
  1. 先获取 state,通过 exclusiveCount 办法获取到写锁的计数值,不为 0 且 不是以后线程,阐明曾经有写锁。返回 -1 失败。
  2. 通过 sharedCount 获取读锁计数,判断是否须要阻塞以及是否超过下限后,应用 CAS 更新 读锁计数。
  3. 设置或更新 firstReader、firstReaderHoldCount、cachedHoldCounter。
  4. 最初会进行残缺的获取共享锁办法,作为之前获取失败的后续解决办法。

firstReader:firstReader 是取得读锁的第一个线程;
firstReaderHoldCount:firstReaderHoldCount 是 firstReader 的放弃计数。即取得读锁的第一个线程的重入次数。
cachedHoldCounter:最初一个取得读锁的线程取得读锁的重入次数。

final int fullTryAcquireShared(Thread current) {
 
    HoldCounter rh = null;
    // 有限循环
    for (;;) {int c = getState();
        // 是否有写锁
        if (exclusiveCount(c) != 0) {
            // 有写锁,然而不是以后线程,间接返回失败
            if (getExclusiveOwnerThread() != current)
                return -1;
        } else if (readerShouldBlock()) {
            // 须要阻塞
            // 没有写锁,确保没有从新获取读锁
            if (firstReader == current) {// assert firstReaderHoldCount > 0;} else {
                // 以后线程的读锁计数 ThreadLocal 中
                if (rh == null) {
                    rh = cachedHoldCounter;
                    if (rh == null || rh.tid != getThreadId(current)) {rh = readHolds.get();
                        // 计数完结,remove 掉
                        if (rh.count == 0)
                            readHolds.remove();}
                }
                // 为 0 间接失败
                if (rh.count == 0)
                    return -1;
            }
        }
        // 达到下限 抛出异样
        if (sharedCount(c) == MAX_COUNT)
            throw new Error("Maximum lock count exceeded");
        // CAS 设置读锁
        if (compareAndSetState(c, c + SHARED_UNIT)) {if (sharedCount(c) == 0) {
                firstReader = current;
                firstReaderHoldCount = 1;
            } else if (firstReader == current) {firstReaderHoldCount++;} else {if (rh == null)
                    rh = cachedHoldCounter;
                if (rh == null || rh.tid != getThreadId(current))
                    rh = readHolds.get();
                else if (rh.count == 0)
                    readHolds.set(rh);
                rh.count++;
                cachedHoldCounter = rh; // cache for release
            }
            return 1;
        }
    }
}
  1. 首先会始终循环
  2. 有写锁,然而不是以后线程,间接返回失败。然而,有写锁,如果是以后线程,是会继续执行的。
  3. 设置或更新 firstReader、firstReaderHoldCount、cachedHoldCounter。

当存在写锁(独占锁)时,办法会返回 -1 失败,后续会调用 AQS 的 doAcquireShared 办法,循环获取资源。doAcquireShared 办法会一直循环,尝试获取读锁,一旦获取到读锁,以后节点会立刻唤醒后续节点,后续节点开始尝试获取读锁,顺次流传。

ReadLock.unlock

public static class ReadLock 
    implements Lock, java.io.Serializable {public void unlock() {sync.releaseShared(1);
    }
}

调用 AQS 的 releaseShared 开释共享资源办法。

其中 tryReleaseShared 有 ReadLock 实现。

protected final boolean tryReleaseShared(int unused) {Thread current = Thread.currentThread();
    if (firstReader == current) {
        // 第一个线程是以后线程
        if (firstReaderHoldCount == 1)
            firstReader = null;
        else
            firstReaderHoldCount--;
    } else {
        // 第一个线程不是以后线程,更新本人的 ThreadLocal 外面的计数
        HoldCounter rh = cachedHoldCounter;
        if (rh == null || rh.tid != getThreadId(current))
            rh = readHolds.get();
        int count = rh.count;
        if (count <= 1) {readHolds.remove();
            if (count <= 0)
                throw unmatchedUnlockException();}
        --rh.count;
    }
    // 循环
    for (;;) {int c = getState();
        int nextc = c - SHARED_UNIT;
        // 应用 CAS 更新 state
        if (compareAndSetState(c, nextc))
            // 然而如果当初读和写锁都已开释,// 它可能容许期待的写程序持续进行。return nextc == 0;
    }
}
  1. 如果是第一个线程,间接更新技术,不是则更新本人 ThreadLocal 外面保留的计数。
  2. 循环,应用 CAS 更新 state 的值。
  3. 如果 state 更新后的值为 0,阐明没有线程持有读锁或者写锁了。
  4. 当 state 为 0,此时会调用 AQS 的 doReleaseShared 办法。此时队列如果有写锁,那就会被写锁获取的锁。

WriteLock.lock

public static class WriteLock 
    implements Lock, java.io.Serializable {
    /**
     * 获取写入锁。* 如果没有其余线程持有读锁或写锁,会间接返回,并将写锁计数设置为 1。* 如果以后线程持有写锁,则将写锁计数 +1,而后返回。* 如果锁正在被其余线程持有,则以后线程用于线程调度目标,* 以后线程将被禁用,并处于休眠状态,直到获取读锁并将写锁计数设置为 1。*/
    public void lock() {sync.acquire(1);
    }
}

tryAcquire 办法由 Write 本人实现,形式和 ReentrantLock 相似。

protected final boolean tryAcquire(int acquires) {
    
    // 如果读锁计数为非零或写锁计数为非零,并且所有者是另一个线程,则失败。// 如果计数饱和,则失败。只有在 count 不为零时,才可能产生这种状况。// 否则,如果该线程是可重入获取或队列策略容许的话,则有资格进行锁定。// 如果是这样,请更新状态并设置所有者。Thread current = Thread.currentThread();
    int c = getState();
    // 写锁计数
    int w = exclusiveCount(c);
    // c!= 0 阐明有有线程获取锁了
    if (c != 0) {// (Note: if c != 0 and w == 0 then shared count != 0)
        // 判断是不是本人,不是本人 返回 false
        if (w == 0 || current != getExclusiveOwnerThread())
            return false;
        // 判断有没有超过下限
        if (w + exclusiveCount(acquires) > MAX_COUNT)
            throw new Error("Maximum lock count exceeded");
        // 重入
        setState(c + acquires);
        return true;
    }
    // 不须要阻塞,或者 CAS 更新 state 失败
    if (writerShouldBlock() || !compareAndSetState(c, c + acquires))
        return false;
    setExclusiveOwnerThread(current);
    return true;
}
  1. 获取 state,如果 state 不为 0 则判断是否为以后线程重入获取。
  2. state 为 0,则以后线程 CAS 更新 state,获取锁。
  3. 更新胜利之后绑定以后线程。
  4. 如果失败会持续调用 AQS 的 acquireQueued,将以后阻塞放在 AQS 队列中。AQS 会一直循环,期待上一个锁开释后,尝试取得锁。

WriteLock.unlock

public static class WriteLock 
    implements Lock, java.io.Serializable {
    // 如果以后线程是此锁的持有者,则放弃计数递加。// 如果放弃当初的计数为零,则解除锁定。// 如果以后线程不是此锁的持有者则 IllegalMonitorStateException 异样。public void unlock() {sync.release(1);
    }
}

同样这块代码是应用 AQS 的逻辑,tryRelease 局部由 WriteLock 本人实现。

protected final boolean tryRelease(int releases) {if (!isHeldExclusively())
        throw new IllegalMonitorStateException();
    int nextc = getState() - releases;
    boolean free = exclusiveCount(nextc) == 0;
    if (free)
        setExclusiveOwnerThread(null);
    setState(nextc);
    return free;
}
  1. 如果是以后线程重入,扣减重入次数。
  2. 扣减后如果为 0,则设置锁持有线程为 null,更新 state 值。AQS 会唤醒后续节点获取锁。

总结

问题

Q:在 ReentrantReadWriteLock 中 state 代表什么?

A:state 代表锁的状态。state 为 0,没有线程持有锁,state 的高 16 为代表读锁状态,低 16 为代表写锁状态。通过位运算能够获取读写锁的理论值。

Q:线程获取锁的流程是怎么样的?

A:能够参考下面的源码笔记,以及前面的流程图。

Q:读锁和写锁的可重入性是如何实现的?

A:在加锁的时候,判断是否为以后线程,如果是以后线程,则间接累加计数。值得注意的是:读锁重入计数应用的 ThreadLocal 在线程中缓存计数,而写锁则间接用的 state 进行累加(其实和 state 低 16 位进行累加一样)。

Q:以后线程获取锁失败,被阻塞的后续操作是什么?

A:获取失败,会放到 AQS 期待队列中,在队列中一直循环,监督前一个节点是否为 head,是的话,会从新尝试获取锁。

Q:锁降级是怎么降级的?

A:
如图,在圈出局部 fullTryAcquireShared 代码中,能够看进去,在获取读锁的时候,如果以后线程持有写锁,是能够获取读锁的。这块就是指锁降级,比方线程 A 获取到了写锁,当线程 A 执行结束时,它须要获取以后数据,假如不反对锁降级,就会导致 A 开释写锁,而后再次申请读锁。而在这两头是有可能被其余阻塞的线程获取到写锁的。从而导致线程 A 在一次执行过程中数据不统一。

小结

  1. ReentrantReadWriteLock 读写锁,外部实现是 ReadLock 读锁 和 WriteLock 写锁。读锁,容许共享;写锁,是独占锁。
  2. 读写锁都反对重入,读锁的重入次数记录在线程保护的 ThreadLocal 中,写锁保护在 state 上(低 16 位)。
  3. 反对锁降级,从写锁降级为读锁,避免脏读。
  4. ReadLock 和 WriteLock 都是通过 AQS 来实现的。获取锁失败后会放到 AQS 期待队列中,后续一直尝试获取锁。区别在读锁只有存在写锁的时候才放到期待队列,而写锁是只有存在非以后线程锁(无论写锁还是读锁)都会放到期待队列。!
  5. 通过源码剖析,能够得出读写锁适宜在 读多写少 的场景中应用。

相干材料

[1] Java Api:https://docs.oracle.com/javas…

正文完
 0