很多场景下读是多于写的,咱们通过将读写锁拆散能够很大水平进步性能。
ReadWriteLock
public interface ReadWriteLock {Lock readLock();
Lock writeLock();}
ReadWriteLock 接口只定义了读锁和写锁两个办法,ReentrantReadWriteLock 对其进行了实现,咱们重点看下 ReentrantReadWriteLock 这个类。
ReentrantReadWriteLock 里重要属性
/** Inner class providing readlock */
private final ReentrantReadWriteLock.ReadLock readerLock;
/** Inner class providing writelock */
private final ReentrantReadWriteLock.WriteLock writerLock;
/** Performs all synchronization mechanics */
final Sync sync;
ReentrantReadWriteLock 里重要外部类
// 同步器
abstract static class Sync extends AbstractQueuedSynchronizer {...}
// 非偏心同步器
static final class NonfairSync extends Sync{...}
// 偏心同步器
static final class FairSync extends Sync{...}
// 读锁,持有同步器 Sync
public static class ReadLock implements Lock, java.io.Serializable{
private final Sync sync;
...
}
// 写锁,持有同步器 Sync
public static class WriteLock implements Lock, java.io.Serializable{
private final Sync sync;
...
}
读写状态的设计
与读写状态相关联的有
- Sync 外部类里与的相干属性与办法:
static final int SHARED_SHIFT = 16;
static final int SHARED_UNIT = (1 << SHARED_SHIFT);//65536
static final int MAX_COUNT = (1 << SHARED_SHIFT) - 1;//65535, 对应十六进制里的 0000FFFF
static final int EXCLUSIVE_MASK = (1 << SHARED_SHIFT) - 1;//65535
/** 读锁获取的次数 */
static int sharedCount(int c) {return c >>> SHARED_SHIFT;}
/** 写锁获取的次数 */
static int exclusiveCount(int c) {return c & EXCLUSIVE_MASK;}
- AbstractQueuedSynchronizer 类里的
private volatile int state;
AQS 就是在一个整型变量上保护了读写两种状态,高 16 位示意读,低 16 位示意写。通过位运算能够晓得一个线程获取读写锁的次数,读锁获取次数就是下面 sharedCount 办法计算的 c >>> SHARED_SHIFT,而写锁获取次数就是 exclusiveCount 办法计算的 c & EXCLUSIVE_MASK(将十六位抹去)。写锁状态减少 1 次为 state+1,读状态减少 1 次为 state+(1<<16)。当 state 不等于 0,当写状态 c & EXCLUSIVE_MASK 等于 0,读状态 c >>> SHARED_SHIFT 大于 0 时,示意读锁被获取。
构造方法
能够结构偏心与非偏心两种锁,默认为非偏心。
public ReentrantReadWriteLock() {this(false);
}
public ReentrantReadWriteLock(boolean fair) {sync = fair ? new FairSync() : new NonfairSync();
readerLock = new ReadLock(this);
writerLock = new WriteLock(this);
}
读锁获取的实现
读锁是一个反对重入的共享锁,可能同时被多个线程获取,获取读锁次要是依照共享模式来获取锁的,大体流程差不多,tryAcquireShared 略有差别。
/**
* private ReadWriteLock readWriteLock = new ReentrantReadWriteLock();
* private Lock readLock = readWriteLock.readLock();
* readLock.lock();
*/
public void lock() {sync.acquireShared(1);
}
public final void acquireShared(int arg) {
// 胜利为 1,失败为 -1
if (tryAcquireShared(arg) < 0)
doAcquireShared(arg);
}
protected final int tryAcquireShared(int unused) {Thread current = Thread.currentThread();
int c = getState();
// 其它线程获取了写锁,须要退出期待队列
if (exclusiveCount(c) != 0 &&
getExclusiveOwnerThread() != current)
return -1;
int r = sharedCount(c);// 读锁获取次数
// 以后读状态获取不需阻塞且获取次数小于 65535,则 CAS 批改状态获取读锁
if (!readerShouldBlock() &&
r < MAX_COUNT &&
compareAndSetState(c, c + SHARED_UNIT)) {if (r == 0) {
firstReader = current;
firstReaderHoldCount = 1;
} else if (firstReader == current) {
// 第一个获取读锁线程与以后线程雷同,则读锁线程持有器累加
firstReaderHoldCount++;
} else {
// 其它线程获取的读锁,应用其它线程的 threadlocal 累加获取次数
HoldCounter rh = cachedHoldCounter;
if (rh == null || rh.tid != getThreadId(current))
cachedHoldCounter = rh = readHolds.get();
else if (rh.count == 0)
readHolds.set(rh);
rh.count++;
}
return 1;
}
// 以后线程获取读锁须要被阻塞;获取次数超次;CAS 失败 这三种状况会
// 进入到 fullTryAcquireShared 办法从新获取一次读锁
return fullTryAcquireShared(current);
}
对于 readerShouldBlock()办法在偏心锁与非偏心锁有不同的实现,返回 true(示意须要阻塞)。
- 偏心锁下须要阻塞:头节点的下一节点对应的线程不是以后线程,阐明曾经早有其它线程在排队了,依照 FIFO 的程序,以后线程须要排队以示公平。这个能够防止饥饿。
- 非偏心锁下须要阻塞:写状态已被获取,且头节点下一节点对应的排队线程是要获取写锁,那以后线程也得排队。
读锁开释的实现
/**
* private ReadWriteLock readWriteLock = new ReentrantReadWriteLock();
* private Lock readLock = readWriteLock.readLock();
* readLock.unlock();
*/
public void unlock() {sync.releaseShared(1);
}
public final boolean releaseShared(int arg) {if (tryReleaseShared(arg)) {doReleaseShared();
return true;
}
return false;
}
protected final boolean tryReleaseShared(int unused) {Thread current = Thread.currentThread();
// 上面的判断是已获取读锁线程是否以后线程,后果都将获取次数递加
if (firstReader == current) {
// assert firstReaderHoldCount > 0;
if (firstReaderHoldCount == 1)
firstReader = null;
else
firstReaderHoldCount--;
} else {
HoldCounter rh = cachedHoldCounter;
if (rh == null || rh.tid != getThreadId(current))
rh = readHolds.get();
int count = rh.count;
if (count <= 1) {readHolds.remove();
if (count <= 0)
// 线程没锁却去开释,须要抛异样
throw unmatchedUnlockException();}
--rh.count;
}
for (;;) {int c = getState();
int nextc = c - SHARED_UNIT;
//CAS 批改读数状态,只有所有读锁都开释了 (为 0) 才返回 true
// 返回 true 才会唤醒后继节点
if (compareAndSetState(c, nextc))
// Releasing the read lock has no effect on readers,
// but it may allow waiting writers to proceed if
// both read and write locks are now free.
return nextc == 0;
}
}
private void doReleaseShared() {for (;;) {
Node h = head;
// 同步队列不为空
if (h != null && h != tail) {
int ws = h.waitStatus;
// 判断后续节点是否须要唤醒
if (ws == Node.SIGNAL) {if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
continue;
unparkSuccessor(h);// 唤醒
}
else if (ws == 0 &&
!compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
continue;
}
if (h == head)
break;
}
}
好了,下面写了读锁的获取与开释,这里进行下总结:
读锁的获取:
尝试应用 tryAcquireShared 进行读锁获取
- 胜利则执行业务逻辑;
-
失败:
- 如果其它线程获取了写锁,返回失败。
- 如果是偏心锁且排在后面的线程是其它线程,因为不能插队,返回失败;如果是非偏心锁且排在后面的线程是想获取写锁,因为写锁的排它性,也返回失败。
- 获取次数超过 65535,返回失败。
- CAS 设置状态没胜利,返回失败。
- 在 fullTryAcquireShared()再次获取锁,如果不胜利,返回失败。
- 下面几步都没胜利的话,在 doAcquireShared 还会进行获取,如果没胜利,退出到期待队列里。
读锁的开释:
- 计数器减。
- CAS 设置状态。
- 唤醒后继节点。
写锁的获取
写锁是一个反对重进入的排它锁。如果以后线程曾经获取了写锁,则减少写状态。如果以后线程在获取写锁时,读锁曾经获取或者是其它线程获取的写锁,则以后线程进入期待状态。
/**
* private ReadWriteLock readWriteLock = new ReentrantReadWriteLock();
* private Lock writeLock = readWriteLock.writeLock();
* writeLock.lock();
*/
public void lock() {sync.acquire(1);
}
public final void acquire(int arg) {if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();}
protected final boolean tryAcquire(int acquires) {
/*
* Walkthrough:
* 1. If read count nonzero or write count nonzero
* and owner is a different thread, fail.
* 2. If count would saturate, fail. (This can only
* happen if count is already nonzero.)
* 3. Otherwise, this thread is eligible for lock if
* it is either a reentrant acquire or
* queue policy allows it. If so, update state
* and set owner.
*/
Thread current = Thread.currentThread();
int c = getState();// 锁状态
int w = exclusiveCount(c);// 写锁获取次数
if (c != 0) {// 有线程获取了锁,可能是读锁也可能是写锁
// (Note: if c != 0 and w == 0 then shared count != 0)
// 有线程获取了读锁,或以后线程不是获取写锁的线程,返回失败
if (w == 0 || current != getExclusiveOwnerThread())
return false;
if (w + exclusiveCount(acquires) > MAX_COUNT)// 超次
throw new Error("Maximum lock count exceeded");
// Reentrant acquire 其它情景则视为获取写锁胜利
setState(c + acquires);
return true;
}
// 尝试获取写锁的线程是否须要阻塞,也分偏心与非偏心两种实现:// 偏心:判断是否有前驱节点(没判断是获取读锁还是写锁),有的话本人就须要阻塞排队,返回 false
// 非偏心:总是返回 false
if (writerShouldBlock() ||
!compareAndSetState(c, c + acquires))
return false;
setExclusiveOwnerThread(current);
return true;
}
写锁的开释
/**
* private ReadWriteLock readWriteLock = new ReentrantReadWriteLock();
* private Lock writeLock = readWriteLock.writeLock();
* writeLock.unlock();
*/
public void unlock() {sync.release(1);
}
public final boolean release(int arg) {if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);// 唤醒后继节点
return true;
}
return false;
}
protected final boolean tryRelease(int releases) {if (!isHeldExclusively())// 获取写锁的线程是否是以后线程,不是抛异样
throw new IllegalMonitorStateException();
int nextc = getState() - releases;
boolean free = exclusiveCount(nextc) == 0;
if (free)
setExclusiveOwnerThread(null);// 将获取写锁的线程设置为 null
setState(nextc);
return free;
}
StampedLock
https://www.cnblogs.com/admol…
https://segmentfault.com/a/11…