关于java:Java-JUC-ReentrantReadWriteLock解析

42次阅读

共计 11017 个字符,预计需要花费 28 分钟才能阅读完成。

读写锁 ReentrantReadWriteLock 原理

介绍

ReentrantReadWriteLock 和 ReentrantLock 的区别是,ReentrantLock 是 独占锁 ,同一时间只能有一个线程获取锁,但在理论中更多的是读多写少的状况,显然 ReentrantLock 满足不了该状况, 而 ReentrantReadWriteLock 采纳了读写拆散的策略,能够容许多个线程同时进行读取

从该类图能够看到,在读写锁外部保护了一个 ReadLock 和一个 WriteLock,它们依赖Sync 实现具体性能,而 Sync 继承自 AQS,并且也提供了 偏心 非偏心 的实现。

上面咱们只介绍非偏心的实现

咱们晓得在 AQS 中保护了一个 state 状态,而在 ReentrantReadWriteLock 中则须要保护读状态和写状态,那么一个 state 如何标识写和读两种状态呢?

ReentrantReadWriteLock 奇妙地应用 state 的高 16 位示意读状态,也就是获取到读锁的次数;应用低 16 位示意获取到写锁的线程的可重入次数

static final int SHARED_SHIFT   = 16;
// 共享锁 (读锁) 状态单位值 65536
static final int SHARED_UNIT    = (1 << SHARED_SHIFT);
// 共享锁线程最大个数 65535
static final int MAX_COUNT      = (1 << SHARED_SHIFT) - 1;
// 排它锁 (写锁) 掩码,二进制,15 个 1
static final int EXCLUSIVE_MASK = (1 << SHARED_SHIFT) - 1;

// 返回读锁线程数量
static int sharedCount(int c)    {return c >>> SHARED_SHIFT;}
// 返回写锁可重入可数
static int exclusiveCount(int c) {return c & EXCLUSIVE_MASK;}

在 ReentrantReadWriteLock 类图中,firstReader用来记录 第一个获取到读锁 的线程,firstReaderHoldCount则是记录第一个获取到读锁的线程可重入次数,cachedHoldCounter用来记录最初一个获取读锁的线程可重入次数。

readHolds是 ThreadLocal 变量,用来寄存除第一个获取读线程外的其余线程获取读锁的可重入次数。

ThreadLocalHoldCounter 继承了 ThreadLocal,所以 initialValue 办法返回一个 HoldCounter 对象。

static final class ThreadLocalHoldCounter
    extends ThreadLocal<HoldCounter> {public HoldCounter initialValue() {return new HoldCounter();
    }
}
static final class HoldCounter {
    int count = 0;
    // Use id, not reference, to avoid garbage retention
    final long tid = getThreadId(Thread.currentThread());
}

写锁的获取与开释

📢:在 ReentrantReadWriteLock 中写锁是应用 WriteLock 类实现

void lock()

写锁为 独占锁 ,在某一时刻只能有一个线程获取该锁,如果以后没有线程获取到读锁和写锁,则以后线程能够获取到写锁而后返回。如果以后曾经有线程获取到读锁和写锁,则以后申请写锁的线程会被阻塞挂起, 写锁是可重入锁,如果以后线程曾经获取了该锁,则再次获取时仅仅将可重入次数加 1 即可。

public void lock() {sync.acquire(1);
}
public final void acquire(int arg) {if (!tryAcquire(arg) &&
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            selfInterrupt();}

能够看到在 lock 外部调用了 AQS 的 acquire 办法,其中 tryAcquire 则是在 ReentrantReadWriteLock 外部的 Sync 类重写。如下:

protected final boolean tryAcquire(int acquires) {Thread current = Thread.currentThread();
            int c = getState();
            int w = exclusiveCount(c);
                      //(1) c != 0 阐明读锁或写锁曾经被某线程获取
            if (c != 0) {//(2) w == 0 阐明曾经有线程获取到该锁,w != 0 并且以后线程并不是拥有者则返回 false
                if (w == 0 || current != getExclusiveOwnerThread())
                    return false;
                //(3) 阐明以后线程获取到了写锁,判断可重入次数
                if (w + exclusiveCount(acquires) > MAX_COUNT)
                    throw new Error("Maximum lock count exceeded");
                //(4)可重入次数加一
                setState(c + acquires);
                return true;
            }
            //(5)第一个写线程获取写锁
            if (writerShouldBlock() ||
                !compareAndSetState(c, c + acquires))
                return false;
            setExclusiveOwnerThread(current);
            return true;
}

在代码(1)中,如果以后 AQS 状态值不为 0 则阐明曾经有线程获取到了读锁或者写锁。

在代码(2)中,如果 w==0 则阐明状态值的低 16 位为 0,而 AQS 状态值不为 0,暗示曾经有线程获取到了读锁,所以返回 false。而如果 w!=0 则阐明以后曾经有线程获取到了写锁,那么看看是不是本人取得的,不是返回 false。

在代码(3)中,阐明以后线程曾经获取到了该锁,判断该线程可重入次数是否大于最大值,是则抛出异样,否则执行代码(4)进行减少可重入次数,而后返回 true。

如果 AQS 的状态值等于 0 则阐明目前没有线程获取到读锁和写锁,所以执行代码(5)。其中,对于 writerShouldBlock 办法,非偏心锁的实现为如下:

final boolean writerShouldBlock() {return false; // writers can always barge}

能够看到该办法返回 false,阐明须要进行 CAS 尝试获取写锁,获取胜利则设置以后锁的持有人为以后线程,而后返回 true,否则返回 false。

偏心锁实现为如下:

final boolean writerShouldBlock() {return hasQueuedPredecessors();
}
public final boolean hasQueuedPredecessors() {
        Node t = tail; // Read fields in reverse initialization order
        Node h = head;
        Node s;
        return h != t &&
            ((s = h.next) == null || s.thread != Thread.currentThread());
}

这里还是应用 hasQueuedPredecessors 来判断以后节点是否有前驱节点等,在《ReentrantLock 解析》文章曾经讲过了,这里就不再多讲了。

void lockInterruptibly()

相似于 lock 办法,它的不同之处在于,它会对中断进行响应,也就是当其余线程调用了该线程的 interrupt 办法中断了以后线程时,以后线程会抛出异样 InterruptedException 异样。

public void lockInterruptibly() throws InterruptedException {sync.acquireInterruptibly(1);
}

boolean tryLock()

尝试获取写锁,如果以后没有其余线程持有写锁或者读锁,则以后线程获取写锁而后返回 true;如果曾经有线程获取写锁或读锁则返回 false,并且以后线程不会阻塞;如果以后线程曾经持有了写锁则 state 值加 1 而后返回 true。实现与 tryAcquire 办法相似,不多讲。

public boolean tryLock( ) {return sync.tryWriteLock();
}
final boolean tryWriteLock() {Thread current = Thread.currentThread();
    int c = getState();
    if (c != 0) {int w = exclusiveCount(c);
        if (w == 0 || current != getExclusiveOwnerThread())
            return false;
        if (w == MAX_COUNT)
            throw new Error("Maximum lock count exceeded");
    }
    if (!compareAndSetState(c, c + 1))
        return false;
    setExclusiveOwnerThread(current);
    return true;
}

boolean tryLock(long timeout,TimeUnit unit)

与 tryAcquire 办法不同之处在于,多了超时工夫参数,如果尝试获取写锁失败后会把以后线程挂起指定工夫,等待时间到后激活该线程重试获取,如果还是没获取到写锁则返回 false。另外,该办法会对中断进行响应,也就是当其余线程调用了该线程的 interrupt 办法中断了以后线程时,以后线程会抛出 InterruptedException 异样。

public boolean tryLock(long timeout, TimeUnit unit)
        throws InterruptedException {return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout));
}

void unlock()

尝试开释锁,如果以后线程持有该锁,调用后则会 AQS state 状态值减 1,如果减 1 后为 0 则开释该锁,否则仅仅减 1。如果以后线程没有持有该锁而调用该办法则抛出异样。

public void unlock() {sync.release(1);
}
public final boolean release(int arg) {if (tryRelease(arg)) {
        Node h = head;
        if (h != null && h.waitStatus != 0)
            unparkSuccessor(h);
        return true;
    }
    return false;
}
protected final boolean tryRelease(int releases) {
    // 判断调用线程是否是写锁拥有者
    if (!isHeldExclusively())
        throw new IllegalMonitorStateException();
   // 获取可重入值,这里没有思考高 16 位,因为获取写锁时状态值必定为 0
    int nextc = getState() - releases;
    boolean free = exclusiveCount(nextc) == 0;
    // 如果为 0 则开释锁,否则只是更新状态值
    if (free)
        setExclusiveOwnerThread(null);
    setState(nextc);
    return free;
}

读锁的获取与开释

📢:在 ReentrantReadWriteLock 中读锁是应用 ReadLock 类实现

void lock()

获取读锁,如果以后线程没有持有该锁,则以后线程获取该锁,AQS 状态值 state 的高 16 位进行加 1,而后返回。否则如果其余线程持有写锁,则以后线程被阻塞。

public void lock() {sync.acquireShared(1);
}
public final void acquireShared(int arg) {if (tryAcquireShared(arg) < 0)
        doAcquireShared(arg);
}

在该段代码中,读锁的 lock 办法调用了 AQS 的 acquireShared 办法,随后外部调用了 ReentrantReadWriteLock 中的 Sync 类重写的 tryAcquireShared 办法。

protected final int tryAcquireShared(int unused) {Thread current = Thread.currentThread();
    //(1)获取以后状态值
    int c = getState();
    //(2)判断是否被写锁占用
    if (exclusiveCount(c) != 0 &&
        getExclusiveOwnerThread() != current)
        return -1;
    //(3)获取读锁计数
    int r = sharedCount(c);
    //(4)尝试获取锁,多个读线程同时只有一个会胜利,不胜利的进入 fullTryAcquireShared 办法重试
    if (!readerShouldBlock() &&
        r < MAX_COUNT &&
        compareAndSetState(c, c + SHARED_UNIT)) {
        //(5)第一个线程获取锁,设置 firstReader 等于以后线程
        if (r == 0) {
            firstReader = current;
            firstReaderHoldCount = 1;
        //(6)如果以后线程是第一个获取锁的线程则进行第一个线程可重入次数加 1
        } else if (firstReader == current) {firstReaderHoldCount++;} else {
            //(7)记录最初一个获取读锁的线程或其余线程读锁的可重入数
            HoldCounter rh = cachedHoldCounter;
            if (rh == null || rh.tid != getThreadId(current))
                cachedHoldCounter = rh = readHolds.get();
            else if (rh.count == 0)
                readHolds.set(rh);
            rh.count++;
        }
        return 1;
    }
      //(8)相似 tryAcquireShared,然而是自旋获取
    return fullTryAcquireShared(current);
}

该办法首先获以后 AQS 状态值,而后代码(2)查看是否有其余线程获取到了写锁,如果是则返回 -1。而后放入 AQS 阻塞队列。

如果以后要获取读锁的线程曾经持有了写锁,则也能够获取读锁。然而须要留神,当一个线程先获取了写锁,而后获取了读锁解决事件结束后,要记得把读锁和写锁都开释掉,不能只开释写锁。

如果执行到代码(3)首先获取读锁的个数,到这里阐明目前没有线程获取到写锁,然而可能曾经有线程获取到了读锁,而后执行代码(4),其中非偏心锁的 readerShouldBlock 实现如下:

final boolean readerShouldBlock() {return apparentlyFirstQueuedIsExclusive();
}
final boolean apparentlyFirstQueuedIsExclusive() {
    Node h, s;
    return // 头节点不容许为空,也就是阻塞队列存在
            (h = head) != null &&
            // 头节点下一个节点必须存在
            (s = h.next)  != null &&
            // 下一个节点不能共享,也就是写锁
            !s.isShared()         &&
            // 下一个节点的线程对象不容许为空
        s.thread != null;
}

该办法作用是,如果阻塞队列是空的,那么能够获取;如果阻塞队列不是空的,分两种状况。一:如果第一个节点是写节点,那么你不能获取读锁,阻塞排队。二,如果第一个节点是读节点,那么能够获取。

随后在代码(4)进行判断以后线程以后获取读锁的线程是否达到了最大值。最初执行 CAS 操作将 AQS 状态值的高 16 位值减少 1。

代码(5)(6)记录第一个获取读锁的线程并统计该线程获取读锁的可重入数。

代码(7)应用 cachedHoldCounter 记录最初一个获取到读锁的线程和该线程获取读锁的可重入数,readHolds 记录了以后线程获取读锁的可重入数。

如果 readerShouldBlock 办法返回 true,则阐明有线程正在获取写锁,所以执行代码(8),fullTryAcquireShared 的代码与 tryAcquireShared 相似,它们的不同之处在于,前者通过循环自旋获取。

final int fullTryAcquireShared(Thread current) {
            HoldCounter rh = null;
            for (;;) {
                // 获取状态值
                int c = getState();
                // 如果存在写锁
                if (exclusiveCount(c) != 0) {
                    // 持有者不是本人 返回 -1
                    if (getExclusiveOwnerThread() != current)
                        return -1;
                // 如果写锁闲暇,且能够获取读锁
                } else if (readerShouldBlock()) {
                    // 第一个读线程是以后线程
                    if (firstReader == current) {
                        // assert firstReaderHoldCount > 0;
                     // 如果不是以后线程
                    } else {if (rh == null) {
                            rh = cachedHoldCounter;
                            if (rh == null || rh.tid != getThreadId(current)) {
                                // 获取可重入次数
                                rh = readHolds.get();
                                if (rh.count == 0)
                                    readHolds.remove();}
                        }
                        if (rh.count == 0)
                            return -1;
                    }
                }
                // 如果读锁线程大于最大值 抛出异样
                if (sharedCount(c) == MAX_COUNT)
                    throw new Error("Maximum lock count exceeded");
                // CAS 设置读锁,高位加 1
                if (compareAndSetState(c, c + SHARED_UNIT)) {// sharedCount(c) == 0 阐明读锁闲暇 进行设置
                    if (sharedCount(c) == 0) {
                        firstReader = current;
                        firstReaderHoldCount = 1;
                     // 如果不闲暇,并且第一个线程为以后线程则进行更新相加
                    } else if (firstReader == current) {firstReaderHoldCount++;} else {
                        // 如果不是以后线程
                        if (rh == null)
                            rh = cachedHoldCounter;
                         // 如果最初一个读计数器所属线程不是以后线程
                        if (rh == null || rh.tid != getThreadId(current))
                            // 创立一个 cachedHoldCounter
                            rh = readHolds.get();
                        else if (rh.count == 0)
                            readHolds.set(rh);
                        // 计数器自增
                        rh.count++;
                        // 更新缓存计数器
                        cachedHoldCounter = rh; // cache for release
                    }
                    return 1;
                }
            }
}

void lockInterruptibly()

相似于 lock 办法,不同之处在于,该办法会对中断进行响应,也就是当其余线程调用了该线程的 interrupt 办法中断了以后线程时,以后线程会抛出 InterruptedException 异样。

boolean tryLock()

尝试获取读锁,如果以后没有其余线程持有写锁,则以后线程获取读锁会胜利,而后返回 true。如果以后曾经有其余线程持有写锁则该办法间接返回 false,但以后线程并不会被阻塞。如果以后线程曾经持有了该读锁则简略减少 AQS 的状态值高 16 位后间接返回 true。其代码相似 tryLock 的代码,这里不再讲述。

boolean tryLock(long timeout,TimeUnit unit)

与 tryLock 的不同之处在于,多了超时工夫参数,如果尝试获取读锁失败则会把以后线程挂起指定工夫,待超时工夫到后以后线程被激活,如果此时还没有获取到读锁则返回 false。另外,该办法对中断响应,也就是当其余线程调用了该线程的 interrupt 办法中断了以后线程时,以后线程会抛出 InterruptedException 异样。

void unlock()

读锁的开释是委托给 Sync 来做,releaseShared 办法如下。

public void unlock() {sync.releaseShared(1);
}
public final boolean releaseShared(int arg) {if (tryReleaseShared(arg)) {doReleaseShared();
        return true;
    }
    return false;
}
protected final boolean tryReleaseShared(int unused) {Thread current = Thread.currentThread();
    // 如果以后线程是第一个线程
    if (firstReader == current) {
        // 如果等于 1 阐明没有反复获取,则设置为 null
        if (firstReaderHoldCount == 1)
            firstReader = null;
        else
          // 否则减 1
            firstReaderHoldCount--;
    } else {// 如果不是以后线程
        HoldCounter rh = cachedHoldCounter;
        // 如果缓存是 null 或者缓存所属线程不是以后线程,则以后线程不是最初一个读锁
        if (rh == null || rh.tid != getThreadId(current))
          // 获取以后线程的计数器
            rh = readHolds.get();
        int count = rh.count;
        // 如果计数器小于等于一,就间接删除计数器
        if (count <= 1) {readHolds.remove();
            if (count <= 0)
                throw unmatchedUnlockException();}
        // 对计数器减一
        --rh.count;
    }
    // 死循环
    for (;;) {int c = getState();
        // 减去一个读锁。对高 16 位减 1
        int nextc = c - SHARED_UNIT;
        // 批改胜利,如果是 0,示意读锁和写锁都闲暇,则能够唤醒前面的期待线程
        if (compareAndSetState(c, nextc))
            return nextc == 0;
    }
}

如上代码所示,在死循环中,首先获取以后 AQS 状态值并保留到变量 c,而后 c 减去一个读计数单位后应用 CAS 操作更新 AQS 状态值,如果更新成绩则查看以后 AQS 状态值是否为 0,为 0 阐明没有读线程占用读锁,则返回 true。

随后调用 doReleaseShared 办法开释一个因为获取写锁而被阻塞的线程,如果以后 AQS 状态值不为 0,则阐明还有其余读线程,所以返回 false。

锁的升降级

升降级是指读锁降级为写锁,写锁降级为读锁。在 ReentrantReadWriteLock 读写锁中,只反对写锁降级为读锁,而不反对读锁降级为写锁。

代码示例:

public class LockTest {private static ReentrantReadWriteLock lock = new ReentrantReadWriteLock(false);
    private static ReentrantReadWriteLock.WriteLock writeLock = lock.writeLock();
    private static ReentrantReadWriteLock.ReadLock readLock = lock.readLock();

    public static void main(String[] args) {new Thread(() -> write()).start();
        new Thread(() -> read()).start();}

    private static void read() {readLock.lock();
        try {System.out.println(Thread.currentThread().getName() + "开始学习《Thinking in Java》");
            writeLock.lock();
            System.out.println(Thread.currentThread().getName() + "取得到了写锁");
        } finally {writeLock.unlock();
            System.out.println(Thread.currentThread().getName() + "太难了!我不学了!");
            readLock.unlock();}
    }

    private static void write() {writeLock.lock();
        try {System.out.println(Thread.currentThread().getName() + "开始印刷《Thinking in Java》");
            readLock.lock();
            System.out.println(Thread.currentThread().getName() + "在写锁中获取到了读锁");
        } finally {readLock.unlock();
            System.out.println(Thread.currentThread().getName() + "印刷实现");
            writeLock.unlock();}
    }
}

运行后果:

Thread- 0 开始印刷《Thinking in Java》Thread- 0 在写锁中获取到了读锁
Thread- 0 印刷实现
Thread- 1 开始学习《Thinking in Java》

咱们能够看到在写锁中胜利取得到了读锁,而在 读锁中被始终阻塞。阐明不反对锁降级!

为什么 ReentrantReadWriteLock 不反对锁降级

次要是防止死锁,例如两个线程 A 和 B 都在读,A 降级要求 B 开释读锁,B 降级要求 A 开释读锁,相互期待造成死循环。如果能严格保障每次都只有一个线程降级那也是能够的。

在 tryAcquireShared 办法和 fullTryAcquireShared 中都有体现,例如上面的判断:

if (exclusiveCount(c) != 0) {if (getExclusiveOwnerThread() != current)
        return -1;

该代码意思是:当写锁被持有时,如果持有该锁的线程不是以后线程,就返回获取锁失败,反之就会持续获取读锁。称之为锁降级。

总结

  1. 读写锁特点:读锁是共享锁,写锁是排他锁,读锁和写锁不能同时存在
  2. 插队策略:为了避免线程饥饿,读锁不能插队
  3. 降级策略:只能降级,不能降级
  4. ReentrantReadWriteLock 适宜于 读多写少 的场合,能够进步并发效率,而 ReentrantLock 适宜一般场合

总结

ReentrantReadWriteLock 奇妙地应用 AQS 的状态值的高 16 位示意获取到读锁的个数,低 16 位示意获取写锁的线程的可重入次数,并通过 CAS 对其进行操作实现了读写拆散,非常适合在读多写少的场景下应用。

正文完
 0