关于java:Java并发学习笔记ReentrantReadWriteLock良心之作

2次阅读

共计 13213 个字符,预计需要花费 34 分钟才能阅读完成。

前言

上一篇博客介绍了 ReentrantLock,可是 ReentratLock 是独占锁,遇到写少读多的状况,性能会不尽人意。JUC 包还提供了一个 ReentrantReadWriteLock 锁,采纳读写拆散的形式,多个线程能够同时获取读锁。本文将从源码角度对 ReentrantReadWriteLock(以下简称读写锁)的初始化,获取,开释等进行解释和记录。

锁的初始化

public class ReentrantReadWriteLock
        implements ReadWriteLock, java.io.Serializable {
    private static final long serialVersionUID = -6992448646407690164L;
    // 读锁对象
    private final ReentrantReadWriteLock.ReadLock readerLock;
    // 写锁对象
    private final ReentrantReadWriteLock.WriteLock writerLock;
    final Sync sync;
    public ReentrantReadWriteLock() {this(false);
    }

    public ReentrantReadWriteLock(boolean fair) {sync = fair ? new FairSync() : new NonfairSync();
        readerLock = new ReadLock(this);
        writerLock = new WriteLock(this);
    }

  能够看出,读写锁外部有 readerLock 和 writerLock 两个属性,别离用来示意读锁和写锁;和 ReentrantLock 一样,同样有一个 Sync 类型的对象 sync 用来进行锁的具体操作。在初始化时,默认是用非偏心形式实现(NonfairSync 和 FairSync 都是 Sync 的子类,这块和 ReentrantLock 相似)

  上面看一下 readerLock 和 writerLock 这两个属性:

// 读锁
public static class ReadLock implements Lock, java.io.Serializable {
    private static final long serialVersionUID = -5992448646407690164L;
    private final Sync sync;

    ...
    protected ReadLock(ReentrantReadWriteLock lock) {sync = lock.sync;}

// 写锁
public static class WriteLock implements Lock, java.io.Serializable {
    private static final long serialVersionUID = -4992448646407690164L;
    private final Sync sync;

    ...
    protected WriteLock(ReentrantReadWriteLock lock) {sync = lock.sync;}

  可见,它们都有一个 Sync 类型的属性 sync,初始化时,会将 ReentrantReadWriteLock 对象的 sync 赋给本人的 sync 属性(因为调用它们构造函数的语句为readerLock = new ReadLock(this); writerLock = new WriteLock(this);)。

public void lock() {sync.acquireShared(1);
}

public void lockInterruptibly() throws InterruptedException {sync.acquireSharedInterruptibly(1);
}

  下面是 ReadLock 里的局部办法,能够看出,也是通过 sync 来实现一些性能。WriteLock 同理。

读锁,写锁状态的获取和更新

  读写锁有读锁和写锁两个状态,然而读写锁并没有定义什么新变量,而是沿用 AQS 里的 state 变量,用变量的高 16 位示意读状态,低 16 位示意写状态,之所以能够这么做,是因为一个 int 变量是 4 字节,32 位。上面是进行读,写状态获取的代码:

// 定义了一些常量,用于读状态和写状态的操作
static final int SHARED_SHIFT   = 16;
static final int SHARED_UNIT    = (1 << SHARED_SHIFT);
static final int MAX_COUNT      = (1 << SHARED_SHIFT) - 1;
static final int EXCLUSIVE_MASK = (1 << SHARED_SHIFT) - 1;

/** Returns the number of shared holds represented in count. */
static int sharedCount(int c)    {return c >>> SHARED_SHIFT;}
/** Returns the number of exclusive holds represented in count. */
static int exclusiveCount(int c) {return c & EXCLUSIVE_MASK;}

  首先看读状态,代码为return c >>> SHARED_SHIFT;,这个 SHARED_SHIFT 是 16,也就是把 c 右移 16 位,把一个 32 位变量右移 16 位,也就是读到了此变量的高 16 位。

// 开释读锁的局部代码
int nextc = c - SHARED_UNIT;
if (compareAndSetState(c, nextc))  
...
// 获取读锁的局部代码
compareAndSetState(c, c + SHARED_UNIT))

  以上是对读锁进行获取和开释的局部代码,在这部分,对读锁状态的更新次要通过对 SHARED_UNIT 这个变量的加减来操作。这个变量的值为 (1 << SHARED_SHIFT) 也就是 10000000000000000(二进制,1 前面跟 16 个 0),能够看出每次加减这个 SHARED_UNIT,对低 16 位齐全没有影响。

  接着看写锁。获取写锁状态的代码为return c & EXCLUSIVE_MASK;,这个 EXCLUSIVE_MASK 的值为 (1 << SHARED_SHIFT) – 1; 也就是 1111111111111111(二进制,16 个 1),高 16 位全副补为 0,和一个 32 位的变量进行 & 操作,失去的数高 16 位天然全是 0,低 16 位不变,原来是 0 还是 0,原来是 1 还是 1。

// 开释写锁的局部代码
int nextc = getState() - releases;
setState(nextc);
... 
// 开释写锁的局部代码
setState(c + acquires);

  可见,写锁的状态间接进行加减操作,毕竟它操作的是低 16 位。
然而,这样子也有肯定危险,毕竟 16 位示意的数字无限,所以前面经常出现这种代码:

if (w + exclusiveCount(acquires) > MAX_COUNT)
    throw new Error("Maximum lock count exceeded");  
...
if (sharedCount(c) == MAX_COUNT)
    throw new Error("Maximum lock count exceeded");

  MAX_COUNT 值为 (1 << SHARED_SHIFT) – 1,也就是 1111111111111111(二进制,16 个 1),这是 16 位示意的极限了。所以超过时,会抛出异样。

  不得不说,这块操作还是很奇妙的。如果我本人实现,必定是从新定义两个属性,一个示意读,一个示意写。人家这还是用 state 变量,用这种奇妙的操作让一个变量示意两个状态,这样子还能持续用对于 state 的函数,的确很厉害。而且 MAX_COUNT 十进制是 65536,感觉应该不会有什么程序锁的计数能达到这么多吧。。。

写锁

写锁的获取

注:对于写锁的获取和开释不思考 lockInterruptibly()public boolean tryLock(long timeout, TimeUnit unit) 这些,只思考最根本状况。那些其它状况有空会独自开一篇总结,读锁同理

public void lock() {sync.acquireShared(1);
}  
// 位于 AQS 中
public final void acquireShared(int arg) {if (tryAcquireShared(arg) < 0)
        acquire(null, arg, true, false, false, 0L);
}  
// 位于 Sync 中
protected final boolean tryAcquire(int acquires) {Thread current = Thread.currentThread();
    int c = getState();
    int w = exclusiveCount(c);
    if (c != 0) {// (Note: if c != 0 and w == 0 then shared count != 0)
        if (w == 0 || current != getExclusiveOwnerThread())
            return false;
        if (w + exclusiveCount(acquires) > MAX_COUNT)
            throw new Error("Maximum lock count exceeded");
        // Reentrant acquire
        setState(c + acquires);
        return true;
    }
    if (writerShouldBlock() ||
        !compareAndSetState(c, c + acquires))
        return false;
    setExclusiveOwnerThread(current);
    return true;
}

  下面的调用关系很简略,和 ReentrantLock 相似,就不介绍了。次要看 tryAcquire()函数。首先是 if (w == 0 || current != getExclusiveOwnerThread()),此时返回 false。留神了,这是在if (c != 0) 的前提下的,c!=0,w 却等于 0,阐明此时有线程持有读锁,那么不能获取;或者尽管 w != 0,然而写锁的持有线程不是本线程,还是不能获取。接下来if (w + exclusiveCount(acquires) > MAX_COUNT),就是判断会不会超过示意范畴;如果以上分支都不满足,那么就阐明这个锁正被以后线程持有着写锁,并且不会溢出,那么间接设置 state 即可,因为是重入获取锁,也不须要设置 ExclusiveOwnerThread。而且此时调用的是 setState(),因为这种状况下,锁被本线程持有,不存在多线程竞争的状况,不须要 CAS 操作。

  如果 c 不是 0,阐明锁没被任何线程持有。那么首先调用 writerShouldBlock() 查看本线程是否须要阻塞,接着再 CAS 形式设置锁的状态,如果胜利,再调用 setExclusiveOwnerThread(current);来设置锁的持有者。这里之所以用 CAS 操作,是因为此时锁没被任何线程持有,故须要竞争。

  看一下writerShouldBlock()

// 非偏心锁
final boolean writerShouldBlock() {return false; // writers can always barge}  
// 偏心锁
final boolean writerShouldBlock() {return hasQueuedPredecessors();
}

  非偏心锁间接返回 false,偏心锁调用 AQS 里的 hasQueuedPredecessors(); 判断以后线程是否有前驱节点。这是一个逻辑表达式的短路操作。如果是非偏心锁,if (writerShouldBlock() || !compareAndSetState(c, c + acquires))前一个条件返回 false,那么要进行第二个条件的判断,尝试 CAS 设置锁;如果是偏心锁,调用 hasQueuedPredecessors(); 如果返回 true,因为是 || 操作,后一个条件不必判断了,这个逻辑表达式间接返回 true,否则,才会走下一个条件。如果把 || 改为 | 就不行了。这个性能如果让我写,我必定是 if if if,源代码的作者奇妙利用短路的操作,精简了代码,程度的确高啊。

public boolean tryLock() {return sync.tryWriteLock();
}  
...
final boolean tryWriteLock() {Thread current = Thread.currentThread();
    int c = getState();
    if (c != 0) {int w = exclusiveCount(c);
        if (w == 0 || current != getExclusiveOwnerThread())
            return false;
        if (w == MAX_COUNT)
            throw new Error("Maximum lock count exceeded");
    }
    if (!compareAndSetState(c, c + 1))
        return false;
    setExclusiveOwnerThread(current);
    return true;
}

  还有 tryLock()。和一般的 lock()相似。不同在于:

  • 当 c = 0 时,并没有调用 writerShouldBlock() 函数,间接进行了 CAS 设置锁的状态
  • 调用之后间接返回 true or false,不会进入阻塞队列

写锁的开释

public void unlock() {sync.release(1);
}  
// 位于 AQS 中
public final boolean release(int arg) {if (tryRelease(arg)) {signalNext(head);
        return true;
    }
    return false;
}
// 位于 Sync 中
protected final boolean tryRelease(int releases) {if (!isHeldExclusively())
        throw new IllegalMonitorStateException();
    int nextc = getState() - releases;
    boolean free = exclusiveCount(nextc) == 0;
    if (free)
        setExclusiveOwnerThread(null);
    setState(nextc);
    return free;
}

  这块和 ReentrantLock 相似。在 tryRelease 里先是查看以后线程是否真正持有锁,如果都没有持有,那还开释个啥。接着用 free = exclusiveCount(nextc) == 0; 来示意锁是不是开释洁净了,如果是,在 AQS 里会 signalNext(head); 唤起下一个线程。

  总的来说,写锁这部分和 ReentrantLock 相似,没什么太难的中央。

读锁

读锁的获取

尝试获取

public void lock() {sync.acquireShared(1);
}
// 位于 AQS 中
public final void acquireShared(int arg) {if (tryAcquireShared(arg) < 0)
        acquire(null, arg, true, false, false, 0L);
}
// 位于 Sync 中
protected final int tryAcquireShared(int unused) {Thread current = Thread.currentThread();
    int c = getState();
    if (exclusiveCount(c) != 0 &&
        getExclusiveOwnerThread() != current)
        return -1;
    int r = sharedCount(c);
    if (!readerShouldBlock() &&
        r < MAX_COUNT &&
        compareAndSetState(c, c + SHARED_UNIT)) {
        // 本线程是让读锁从 0 到 1 的线程
        if (r == 0) {
            firstReader = current;
            firstReaderHoldCount = 1;
        // 本线程重入形式获取,并且是第一个获取的线程
        } else if (firstReader == current) {
            firstReaderHoldCount++;
        // 本线程不是第一个获取读锁的线程
        } else {
            HoldCounter rh = cachedHoldCounter;
            if (rh == null ||
                rh.tid != LockSupport.getThreadId(current))
                cachedHoldCounter = rh = readHolds.get();
            else if (rh.count == 0)
                readHolds.set(rh);
            rh.count++;
        }
        return 1;
    }
    return fullTryAcquireShared(current);
}

  调用关系比较简单,不用说了。次要关注 tryAcquireShared 函数。首先是进行逻辑判断if (exclusiveCount(c) != 0 && getExclusiveOwnerThread() != current) 如果有其它线程持有写锁,那么返回 -1。本人持有写锁是没问题的,能够往下走。

  接下来又是一个短路操作:if (!readerShouldBlock() && r < MAX_COUNT && compareAndSetState(c, c + SHARED_UNIT)),当 !readerShouldBlock()r < MAX_COUNT均为 true,就会进行第三个判断,也就是 CAS 设置锁。当三个条件都为真,那么意味着锁设置胜利了,会执行代码块里那段看起来不明觉厉的代码。当然三个条件都满足还是挺难的,所以如果这个逻辑表达式不成立,会调用 fullTryAcquireShared(current) 进行进一步获取。可见,tryAcquireShared只是进行一次尝试。

  接下来看逻辑表达式为真的那一段代码。

private transient Thread firstReader;
private transient int firstReaderHoldCount;
static final class HoldCounter {
    int count;          // initially 0
    // Use id, not reference, to avoid garbage retention
    final long tid = LockSupport.getThreadId(Thread.currentThread());
}

static final class ThreadLocalHoldCounter
    extends ThreadLocal<HoldCounter> {public HoldCounter initialValue() {return new HoldCounter();
    }
}

private transient ThreadLocalHoldCounter readHolds;
private transient HoldCounter cachedHoldCounter;

  这是那段代码中呈现的变量的定义。fistReader 是用来记录第一个获取到读锁的线程,fitstReaderHoldCount 是记录此线程的持有数(ReentrantReadWriteLock 也是可重入的);readHolds 是 ThreadLocalHoldCounter 的对象,而 ThreadLocalHoldCounter 是 ThreadLocal 的子类。这个 ThreadLocal 里装的是 HoldCounter 类的对象,这个 HoldCounter 类里别离是持有数量和持有线程的 id。看起来真是挺晕的(((φ(◎ロ◎;)φ)))。

  接下来剖析这段代码,先是 r = 0 分支,此时,本线程是第一个让此读锁计数从 0 到 1 的线程,所以进行 firstReader 和 firstReaderHoldCount 的设置;否则,如果这个读锁的第一个持有的线程就是本线程,那么间接 ++firstReaderCount 即可,也很正当。这两个中央的代码也没有进行同步解决,因为 r 是之前的读锁值,在进入 r = 0 分支时,CAS 设置读锁状态曾经胜利,所以其它线程再进来读也必定到不了 r = 0 这个分支了;对于else if (firstReader == current) 分支,必定也只有本线程 =firstReader 时,能力触发,这两个分支不存在和其它线程的竞争。

  如果以上两个条件都不满足,那么这个线程就是第二个及当前取得读锁的线程。这个时候,这个线程的读锁计数就由它本人保护了。这个分支里的代码就是对此线程的读锁计数进行一番操作。首先是 HoldCounter rh = cachedHoldCounter;,有些书上说 cachedHoldCounter 是记录最初一个获取读锁的线程。我感觉也未必吧,毕竟这个变量也不是 volatile 的,无奈保障可见性,你读到的未必就是真正最初一个获取的。先进行if (rh == null || rh.tid != LockSupport.getThreadId(current)) 判断,如果不满足,也就是从 cachedHoldCounter 获取到的 rh 正好就是本线程的;如果 rh 不是本线程的,通过 cachedHoldCounter = rh = readHolds.get();设置之后,rh 也成为了本线程的 HoldCounter 变量。

  之后是 else if (rh.count == 0) 分支,如果能够进入这个分支,也就意味着 cachedHoldCounter 保留的 HoldCounter 对象的确是本线程的,然而对象里对应的 count 却为 0。那么为什么会呈现这种状况呢?因为读锁的开释过程并没有革除 cachedHoldCounter 的代码。所以是 cachedHoldCounter 对应的线程之前的读锁被开释过一次,这个线程又再次来获取读锁,所以把这个原本就属于它的 HoldCounter 变量再赋给它。

  总之不管怎样,当执行到 rh.count++; 这条语句时,rh 对应的肯定是本线程的 HoldCount 对象。把它的计数自增一个。

  这块代码我看的时候属实好受啊,看了很久才看明确。其实没有firstReader,firstReaderHoldCount,cachedHoldCounter 也不是不行。反正 HoldCounter 是 ThreadLocal 的,每个线程都有,从本人线程读也能够。然而可能那样读取效率有些低,所以这里设置了一点相当于缓存的变量,如果这些变量命中了,就不须要去本人线程读了。你看它命名也能看进去:cachedHoldCounter

齐全获取

final int fullTryAcquireShared(Thread current) {
    HoldCounter rh = null;
    for (;;) {int c = getState();
        // 查看写锁是否被持有
        if (exclusiveCount(c) != 0) {if (getExclusiveOwnerThread() != current)
                return -1;
            // else we hold the exclusive lock; blocking here
            // would cause deadlock.
        // 查看此线程是否已取得过读锁
        } else if (readerShouldBlock()) {
            // Make sure we're not acquiring read lock reentrantly
            if (firstReader == current) {// assert firstReaderHoldCount > 0;} else {if (rh == null) {
                // 尝试先以 cachedHoldCounter 形式获取线程的 HolderCounter 对象
                    rh = cachedHoldCounter;
                // 如果 cachedHoldCounter 没有获取到,再从 ThreadLocal 里获取
                    if (rh == null ||
                        rh.tid != LockSupport.getThreadId(current)) {rh = readHolds.get();
                        if (rh.count == 0)
                            readHolds.remove();}
                }
                if (rh.count == 0)
                    return -1;
            }
        }
        if (sharedCount(c) == MAX_COUNT)
            throw new Error("Maximum lock count exceeded");
        // 进行 CAS 设置读锁计数
        if (compareAndSetState(c, c + SHARED_UNIT)) {if (sharedCount(c) == 0) {
                firstReader = current;
                firstReaderHoldCount = 1;
            } else if (firstReader == current) {firstReaderHoldCount++;} else {if (rh == null)
                    rh = cachedHoldCounter;
                if (rh == null ||
                    rh.tid != LockSupport.getThreadId(current))
                    rh = readHolds.get();
                else if (rh.count == 0)
                    readHolds.set(rh);
                rh.count++;
                cachedHoldCounter = rh; // cache for release
            }
            return 1;
        }
    }
}

  代码首先查看此锁的写锁是否被持有。从代码能够看进去,当写锁被本线程持有时,是能够再获取读锁的;如果是其它线程持有写锁,则返回 -1。

  接下来进入到 else if (readerShouldBlock()) { 分支。进入此分支阐明写锁没有被其它线程持有,然而这个线程获取读锁须要被阻塞。

// 非偏心锁实现
final boolean readerShouldBlock() {return apparentlyFirstQueuedIsExclusive();
}  
// 偏心锁实现
final boolean readerShouldBlock() {return hasQueuedPredecessors();
}

  这都是 AQS 类里定义的函数,这里不细说了。那为啥这种状况还有一堆代码呢?为啥不间接返回 -1?

if (rh.count == 0)
    return -1;

  这块的要害代码其实在这。这部分是判断本线程是否曾经持有了读锁,从源代码来看,Java 的开发者认为如果是重入形式获取读锁,即便 readerShouldBlock() 为真,也能够去下一部分获取。如果 if (firstReader == current) 为真,那必定是重入获取的,能够进行下一步;否则又是用 cachedHoldCounter 来尝试命中缓存,没有命中,就从本人线程本地读取 HoldCounter 对象,这块之前曾经解释了。

  如果这一段代码都没有 return,那么阐明这个线程能够容许获取读锁,于是进行 CAS 操作来设置读锁的状态。如果能够进入到 if (compareAndSetState(c, c + SHARED_UNIT)) 分支,阐明曾经获取胜利了,和尝试获取相似,把线程对应 的 HoldCounter 的计数自增一个。否则,留神到整个代码包在一个for (;;) 里,线程会一直尝试 CAS 操作。

  读锁的获取还有一个tryReadLock(),就是一直循环获取,代码根本一样。

public boolean tryLock() {return sync.tryReadLock();
}  
// 位于 Sync 中
final boolean tryReadLock() {Thread current = Thread.currentThread();
    for (;;) {int c = getState();
        if (exclusiveCount(c) != 0 &&
            getExclusiveOwnerThread() != current)
            return false;
        int r = sharedCount(c);
        if (r == MAX_COUNT)
            throw new Error("Maximum lock count exceeded");
        if (compareAndSetState(c, c + SHARED_UNIT)) {if (r == 0) {
                firstReader = current;
                firstReaderHoldCount = 1;
            } else if (firstReader == current) {firstReaderHoldCount++;} else {
                HoldCounter rh = cachedHoldCounter;
                if (rh == null ||
                    rh.tid != LockSupport.getThreadId(current))
                    cachedHoldCounter = rh = readHolds.get();
                else if (rh.count == 0)
                    readHolds.set(rh);
                rh.count++;
            }
            return true;
        }
    }
}

读锁的开释

public void unlock() {sync.releaseShared(1);
}  
// 位于 AQS 中
public final boolean releaseShared(int arg) {if (tryReleaseShared(arg)) {signalNext(head);
        return true;
    }
    return false;
}
位于 Sync 中
protected final boolean tryReleaseShared(int unused) {Thread current = Thread.currentThread();
    if (firstReader == current) {
        // assert firstReaderHoldCount > 0;
        if (firstReaderHoldCount == 1)
            firstReader = null;
        else
            firstReaderHoldCount--;
    } else {
        HoldCounter rh = cachedHoldCounter;
        if (rh == null ||
            rh.tid != LockSupport.getThreadId(current))
            rh = readHolds.get();
        int count = rh.count;
        if (count <= 1) {readHolds.remove();
            if (count <= 0)
                throw unmatchedUnlockException();}
        --rh.count;
    }
    for (;;) {int c = getState();
        int nextc = c - SHARED_UNIT;
        if (compareAndSetState(c, nextc))
            // Releasing the read lock has no effect on readers,
            // but it may allow waiting writers to proceed if
            // both read and write locks are now free.
            return nextc == 0;
    }
}

  首先是if (firstReader == current) 分支,进入到这个分支阐明本线程是第一个取得读锁的线程,间接操作 firstReader 和 firstReaderHoldCounter 即可。

  if (firstReader == current) 对应的 else 分支又进行了一番操作,其实就是把线程对应的读锁计数 -1。在此过程中,如果发现 if (count <= 1),阐明,这个锁计数行将为 0,须要开释,所以进行了readHolds.remove(); 操作,把线程的 HoldCounter 对象革除。如果发现if (count <= 0),此时基本没有可开释的货色,抛出了异样。

  之后是循环 CAS 设置读锁状态。

其它办法

  到此为止,读写锁里比拟艰难的代码曾经解释完了,还有一些很简略的办法诸如以下:

public final boolean isFair() {return sync instanceof FairSync;}
...
protected Thread getOwner() {return sync.getOwner();
}
// other methods

  这些都很简略,没什么可说的。

总结

  本文对 ReentrantReadWriteLock 进行了源码级别的学习和记录,感觉 ReentrantReadWriteLock 次要的难点在于读锁的获取和开释。这是集体学习过程中的心得和领会,如果有了解的不全面的中央,欢送评论区交换。

正文完
 0