java并发编程学习之再谈ReentrantReadWriteLock

33次阅读

共计 5219 个字符,预计需要花费 14 分钟才能阅读完成。

之前 ReentrantReadWriteLock 讲了读写锁的场景,这边来讲他的源码,以非公平锁为例,其实和公平锁主要代码是一致的。

Sync 类

static final int SHARED_SHIFT   = 16;// 高 16 位是共享,用于读,低 16 位是独占,用于写,用一个字段保证原子性
static final int SHARED_UNIT    = (1 << SHARED_SHIFT);// 左移 16 位,也就是高位的最后一个是 0000 0000 0000 0001 0000 0000 0000 0000
static final int MAX_COUNT      = (1 << SHARED_SHIFT) - 1;// 最大读的数量,正常不会这么多
static final int EXCLUSIVE_MASK = (1 << SHARED_SHIFT) - 1;// 左移 16 位,再减 1,也就是 0000 0000 0000 0000 1111 1111 1111 1111
static int sharedCount(int c)    {return c >>> SHARED_SHIFT;}// 无符号右移 16 位
static int exclusiveCount(int c) {return c & EXCLUSIVE_MASK;}// 返回的不为 1,说明有写锁,因为低 16 位都是 1,1 与 1 为 1,如果有些,肯定有个为 1
static final class HoldCounter {// 每个线程持有的锁的数量
    int count = 0;
    // Use id, not reference, to avoid garbage retention
    final long tid = getThreadId(Thread.currentThread());
}
static final class ThreadLocalHoldCounter
    extends ThreadLocal<HoldCounter> {// 本地线程
    public HoldCounter initialValue() {return new HoldCounter();
    }
}
private transient ThreadLocalHoldCounter readHolds;// 本地线程,记录持有的锁的数量信息
private transient HoldCounter cachedHoldCounter;// 缓存 HoldCounter 的数据
private transient Thread firstReader = null;// 第一个获取读锁的线程
private transient int firstReaderHoldCount;// 第一个获取读锁的线程持有的数量 

读锁的 lock 方法

public void lock() {sync.acquireShared(1);
}
public final void acquireShared(int arg) {if (tryAcquireShared(arg) < 0)// 小于 0 没获取到锁
        doAcquireShared(arg);
}
protected final int tryAcquireShared(int unused) {Thread current = Thread.currentThread();// 获取本地线程
    int c = getState();// 获取 state 的值
    if (exclusiveCount(c) != 0 &&// 不为 0 说明有写锁,原因上面分析了
        getExclusiveOwnerThread() != current)// 不是当前线程,说明不是重入
        return -1;
    int r = sharedCount(c);// 获取读锁的个数
    if (!readerShouldBlock() &&// 读锁无堵塞
        r < MAX_COUNT &&// 读锁没到最大值
        compareAndSetState(c, c + SHARED_UNIT)) {//cas 操作,高位加 1 成功说明获取到了读锁
        if (r == 0) {// 等于 0 说明第一个获取读锁
            firstReader = current;// 当前线程就是第一个
            firstReaderHoldCount = 1;// 数量为 1
        } else if (firstReader == current) {// 如果不是第一个,但是是当前线程
            firstReaderHoldCount++;// 数量加 1
        } else {// 既不是第一个,也不是当前线程
            HoldCounter rh = cachedHoldCounter;// 获取缓存 HoldCounter 
            if (rh == null || rh.tid != getThreadId(current))// 如果不为空,或者通过线程 id 对比不是当前线程
                cachedHoldCounter = rh = readHolds.get();// 缓存设置为当前线程
            else if (rh.count == 0)// 缓存的是当前线程,而且锁的数量为 0,加入到本地缓存,如果数量不为 0,说明已经在本地缓存了
                readHolds.set(rh);
            rh.count++;// 锁的数量加 1
        }
        return 1;
    }
    return fullTryAcquireShared(current);
}
// 如果阻塞或者 cas 失败的情况,再重试获取锁
final int fullTryAcquireShared(Thread current) {
    HoldCounter rh = null;
    for (;;) {//
        int c = getState();
        if (exclusiveCount(c) != 0) {// 上面分析了,如果是写锁,并且不是当前线程,放弃
            if (getExclusiveOwnerThread() != current)
                return -1;
            // else we hold the exclusive lock; blocking here
            // would cause deadlock.
        } else if (readerShouldBlock()) {// 阻塞的情况
            // Make sure we're not acquiring read lock reentrantly
            if (firstReader == current) {// 当前线程是第一个不处理
                // assert firstReaderHoldCount > 0;
            } else {if (rh == null) {
                    rh = cachedHoldCounter;
                    if (rh == null || rh.tid != getThreadId(current)) {rh = readHolds.get();
                        if (rh.count == 0)
                            readHolds.remove();// 如果是 0,移除本地缓存}
                }
                if (rh.count == 0)
                    return -1;//
            }
        }
        if (sharedCount(c) == MAX_COUNT)// 读锁数量太大,抛异常
            throw new Error("Maximum lock count exceeded");
        if (compareAndSetState(c, c + SHARED_UNIT)) {if (sharedCount(c) == 0) {// 没有读锁
                firstReader = current;// 当前线程就是第一个
                firstReaderHoldCount = 1;// 数量为 1
            } else if (firstReader == current) {// 如果不是第一个,但是是当前线程
                firstReaderHoldCount++;// 数量加 1
            } else {// 既不是第一个,也不是当前线程
                if (rh == null)
                    rh = cachedHoldCounter;// 获取缓存 HoldCounter 
                if (rh == null || rh.tid != getThreadId(current))// 如果不为空,或者通过线程 id 对比不是当前线程
                    rh = readHolds.get();// 缓存设置为当前线程
                else if (rh.count == 0)// 缓存的是当前线程,而且锁的数量为 0,加入到本地缓存,如果数量不为 0,说明已经在本地缓存了
                    readHolds.set(rh);
                rh.count++;// 锁的数量加 1
                cachedHoldCounter = rh; // cache for release
            }
            return 1;
        }
    }
}

读锁的 unlock 方法

public void unlock() {sync.releaseShared(1);
}
public final boolean releaseShared(int arg) {if (tryReleaseShared(arg)) {doReleaseShared();
        return true;
    }
    return false;
}
protected final boolean tryReleaseShared(int unused) {Thread current = Thread.currentThread();
    if (firstReader == current) {// 如果第一个是当前线程
        // assert firstReaderHoldCount > 0;
        if (firstReaderHoldCount == 1)// 如果数量为 1,就是直接设为空
            firstReader = null;
        else
            firstReaderHoldCount--;
    } else {
        HoldCounter rh = cachedHoldCounter;
        if (rh == null || rh.tid != getThreadId(current))
            rh = readHolds.get();
        int count = rh.count;
        if (count <= 1) {// 数量小于等于 1,移除
            readHolds.remove();
            if (count <= 0)
                throw unmatchedUnlockException();}
        --rh.count;
    }
    for (;;) {int c = getState();
        int nextc = c - SHARED_UNIT;
        if (compareAndSetState(c, nextc))
            // Releasing the read lock has no effect on readers,
            // but it may allow waiting writers to proceed if
            // both read and write locks are now free.
            return nextc == 0;// 写锁和读锁为 0,无锁
    }
}

写锁的 lock 方法

public void lock() {sync.acquire(1);
}
public final void acquire(int arg) {if (!tryAcquire(arg) &&
        acquireQueued(addWaiter(Node.EXCLUSIVE), arg))// 失败了就进入阻塞队列
        selfInterrupt();}
protected final boolean tryAcquire(int acquires) {Thread current = Thread.currentThread();// 获取当前线程
    int c = getState();// 获取 state
    int w = exclusiveCount(c);// 不为 0 说明有写锁
    if (c != 0) {// 有读或者写锁
        // 无写锁或者读锁被占
        if (w == 0 || current != getExclusiveOwnerThread())
            return false;
        if (w + exclusiveCount(acquires) > MAX_COUNT)
            throw new Error("Maximum lock count exceeded");
        // Reentrant acquire
        setState(c + acquires);
        return true;
    }
    if (writerShouldBlock() ||// 无阻塞
        !compareAndSetState(c, c + acquires))// 设置成功
        return false;
    setExclusiveOwnerThread(current);
    return true;
}

写锁的 unlock 方法

public void unlock() {sync.release(1);
}
public final boolean release(int arg) {if (tryRelease(arg)) {
        Node h = head;
        if (h != null && h.waitStatus != 0)
            unparkSuccessor(h);// 唤醒
        return true;
    }
    return false;
}
protected final boolean tryRelease(int releases) {if (!isHeldExclusively())// 不是独占锁,抛异常
        throw new IllegalMonitorStateException();
    int nextc = getState() - releases;
    boolean free = exclusiveCount(nextc) == 0;// 写锁都释放了
    if (free)
        setExclusiveOwnerThread(null);
    setState(nextc);
    return free;
}

正文完
 0