Preface
This article takes a look at Elasticsearch's ReleasableLock.
ReleasableLock
elasticsearch-7.0.1/server/src/main/java/org/elasticsearch/common/util/concurrent/ReleasableLock.java
public class ReleasableLock implements Releasable {
    private final Lock lock;

    // a per-thread count indicating how many times the thread has entered the lock; only works if assertions are enabled
    private final ThreadLocal<Integer> holdingThreads;

    public ReleasableLock(Lock lock) {
        this.lock = lock;
        if (Assertions.ENABLED) {
            holdingThreads = new ThreadLocal<>();
        } else {
            holdingThreads = null;
        }
    }

    @Override
    public void close() {
        lock.unlock();
        assert removeCurrentThread();
    }

    public ReleasableLock acquire() throws EngineException {
        lock.lock();
        assert addCurrentThread();
        return this;
    }

    private boolean addCurrentThread() {
        final Integer current = holdingThreads.get();
        holdingThreads.set(current == null ? 1 : current + 1);
        return true;
    }

    private boolean removeCurrentThread() {
        final Integer count = holdingThreads.get();
        assert count != null && count > 0;
        if (count == 1) {
            holdingThreads.remove();
        } else {
            holdingThreads.set(count - 1);
        }
        return true;
    }

    public boolean isHeldByCurrentThread() {
        if (holdingThreads == null) {
            throw new UnsupportedOperationException("asserts must be enabled");
        }
        final Integer count = holdingThreads.get();
        return count != null && count > 0;
    }
}
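- ReleasableLock implements the Releasable interface (the close method). Its constructor takes a Lock, and holdingThreads is initialized only when assertions are enabled. isHeldByCurrentThread reports whether the calling thread currently holds the lock, and throws UnsupportedOperationException when assertions are disabled.
- acquire first calls lock.lock(), then invokes addCurrentThread inside an assert; that method increments the number of times the calling thread has entered the lock.
- close first calls lock.unlock(), then invokes removeCurrentThread inside an assert; that method decrements the number of times the calling thread has entered the lock. Because both bookkeeping calls sit inside assert statements, they are skipped entirely when assertions are off.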
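The counting described above can be tried out without Elasticsearch on the classpath. The class below is a hypothetical, simplified stand-in: `CountingLock`, `holdCount` and `CountingLockDemo` are names invented for this sketch, and unlike the original it always tracks the count instead of doing so only when assertions are enabled.

```java
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical, simplified stand-in for ReleasableLock; not the Elasticsearch class.
// Assumption: the count is always tracked, whereas the original tracks it only under -ea.
final class CountingLock implements AutoCloseable {
    private final Lock lock;
    // per-thread number of times the current thread has entered the lock
    private final ThreadLocal<Integer> holds = ThreadLocal.withInitial(() -> 0);

    CountingLock(Lock lock) {
        this.lock = lock;
    }

    CountingLock acquire() {
        lock.lock();
        holds.set(holds.get() + 1);
        return this;
    }

    @Override
    public void close() {
        lock.unlock();
        int remaining = holds.get() - 1;
        if (remaining == 0) {
            holds.remove(); // drop the thread-local entry once the thread has fully left
        } else {
            holds.set(remaining);
        }
    }

    int holdCount() {
        return holds.get();
    }

    boolean isHeldByCurrentThread() {
        return holdCount() > 0;
    }
}

class CountingLockDemo {
    public static void main(String[] args) {
        CountingLock lock = new CountingLock(new ReentrantLock());
        try (CountingLock outer = lock.acquire()) {
            try (CountingLock inner = lock.acquire()) {
                System.out.println("inner: " + lock.holdCount());            // inner: 2
            }
            System.out.println("outer: " + lock.holdCount());                // outer: 1
        }
        System.out.println("released: " + lock.isHeldByCurrentThread());     // released: false
    }
}
```

Keeping a count rather than a boolean flag is what lets a re-entrant acquire be released without the outer hold being forgotten.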
ReleasableLockTests
elasticsearch-7.0.1/server/src/test/java/org/elasticsearch/common/util/concurrent/ReleasableLockTests.java
public class ReleasableLockTests extends ESTestCase {

    /**
     * Test that accounting on whether or not a thread holds a releasable lock is correct. Previously we had a bug where on a re-entrant
     * lock that if a thread entered the lock twice we would declare that it does not hold the lock after it exits its first entrance but
     * not its second entrance.
     *
     * @throws BrokenBarrierException if awaiting on the synchronization barrier breaks
     * @throws InterruptedException if awaiting on the synchronization barrier is interrupted
     */
    public void testIsHeldByCurrentThread() throws BrokenBarrierException, InterruptedException {
        final ReentrantReadWriteLock readWriteLock = new ReentrantReadWriteLock();
        final ReleasableLock readLock = new ReleasableLock(readWriteLock.readLock());
        final ReleasableLock writeLock = new ReleasableLock(readWriteLock.writeLock());
        final int numberOfThreads = scaledRandomIntBetween(1, 32);
        final int iterations = scaledRandomIntBetween(1, 32);
        final CyclicBarrier barrier = new CyclicBarrier(1 + numberOfThreads);
        final List<Thread> threads = new ArrayList<>();
        for (int i = 0; i < numberOfThreads; i++) {
            final Thread thread = new Thread(() -> {
                try {
                    barrier.await();
                } catch (final BrokenBarrierException | InterruptedException e) {
                    throw new RuntimeException(e);
                }
                for (int j = 0; j < iterations; j++) {
                    if (randomBoolean()) {
                        acquire(readLock, writeLock);
                    } else {
                        acquire(writeLock, readLock);
                    }
                }
                try {
                    barrier.await();
                } catch (final BrokenBarrierException | InterruptedException e) {
                    throw new RuntimeException(e);
                }
            });
            threads.add(thread);
            thread.start();
        }
        barrier.await();
        barrier.await();
        for (final Thread thread : threads) {
            thread.join();
        }
    }

    private void acquire(final ReleasableLock lockToAcquire, final ReleasableLock otherLock) {
        try (@SuppressWarnings("unused") Releasable outer = lockToAcquire.acquire()) {
            assertTrue(lockToAcquire.isHeldByCurrentThread());
            assertFalse(otherLock.isHeldByCurrentThread());
            try (@SuppressWarnings("unused") Releasable inner = lockToAcquire.acquire()) {
                assertTrue(lockToAcquire.isHeldByCurrentThread());
                assertFalse(otherLock.isHeldByCurrentThread());
            }
            // previously there was a bug here and this would return false
            assertTrue(lockToAcquire.isHeldByCurrentThread());
            assertFalse(otherLock.isHeldByCurrentThread());
        }
        assertFalse(lockToAcquire.isHeldByCurrentThread());
        assertFalse(otherLock.isHeldByCurrentThread());
    }
}
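- ReleasableLockTests runs the acquire helper at random from multiple threads. The helper asserts that lockToAcquire is held by the current thread while otherLock is not, both inside a nested (re-entrant) acquire and after that inner acquire has been released, and that neither lock is held once the outer scope exits.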
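The property this test guards (a lock entered twice is still held after only the inner entrance exits, and the other lock of the pair is never reported as held) can also be observed with the JDK's own hold counters. A minimal sketch using only `java.util.concurrent`; the class name `HoldCountDemo` and its helper method are made up for illustration and are not part of the Elasticsearch test.

```java
import java.util.concurrent.locks.ReentrantReadWriteLock;

// Illustrative only: uses the JDK's counters rather than ReleasableLock's ThreadLocal.
class HoldCountDemo {

    // Enters the read lock twice, leaves once, and reports how many holds remain.
    static int holdsAfterInnerExit(ReentrantReadWriteLock rw) {
        rw.readLock().lock();   // first entrance
        rw.readLock().lock();   // re-entrant second entrance
        rw.readLock().unlock(); // leave the inner entrance only
        return rw.getReadHoldCount();
    }

    public static void main(String[] args) {
        ReentrantReadWriteLock rw = new ReentrantReadWriteLock();

        System.out.println(holdsAfterInnerExit(rw));           // 1: still held once
        System.out.println(rw.isWriteLockedByCurrentThread()); // false: write lock never taken

        rw.readLock().unlock();                                 // leave the outer entrance
        System.out.println(rw.getReadHoldCount());             // 0
    }
}
```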
Cache.CacheSegment
elasticsearch-7.0.1/server/src/main/java/org/elasticsearch/common/cache/Cache.java
private static class CacheSegment<K, V> {
    // read/write lock protecting mutations to the segment
    ReadWriteLock segmentLock = new ReentrantReadWriteLock();

    ReleasableLock readLock = new ReleasableLock(segmentLock.readLock());
    ReleasableLock writeLock = new ReleasableLock(segmentLock.writeLock());

    Map<K, CompletableFuture<Entry<K, V>>> map = new HashMap<>();

    SegmentStats segmentStats = new SegmentStats();

    /**
     * get an entry from the segment; expired entries will be returned as null but not removed from the cache until the LRU list is
     * pruned or a manual {@link Cache#refresh()} is performed however a caller can take action using the provided callback
     *
     * @param key the key of the entry to get from the cache
     * @param now the access time of this entry
     * @param isExpired test if the entry is expired
     * @param onExpiration a callback if the entry associated to the key is expired
     * @return the entry if there was one, otherwise null
     */
    Entry<K, V> get(K key, long now, Predicate<Entry<K, V>> isExpired, Consumer<Entry<K, V>> onExpiration) {
        CompletableFuture<Entry<K, V>> future;
        try (ReleasableLock ignored = readLock.acquire()) {
            future = map.get(key);
        }
        if (future != null) {
            Entry<K, V> entry;
            try {
                entry = future.get();
            } catch (ExecutionException e) {
                assert future.isCompletedExceptionally();
                segmentStats.miss();
                return null;
            } catch (InterruptedException e) {
                throw new IllegalStateException(e);
            }
            if (isExpired.test(entry)) {
                segmentStats.miss();
                onExpiration.accept(entry);
                return null;
            } else {
                segmentStats.hit();
                entry.accessTime = now;
                return entry;
            }
        } else {
            segmentStats.miss();
            return null;
        }
    }

    /**
     * put an entry into the segment
     *
     * @param key the key of the entry to add to the cache
     * @param value the value of the entry to add to the cache
     * @param now the access time of this entry
     * @return a tuple of the new entry and the existing entry, if there was one otherwise null
     */
    Tuple<Entry<K, V>, Entry<K, V>> put(K key, V value, long now) {
        Entry<K, V> entry = new Entry<>(key, value, now);
        Entry<K, V> existing = null;
        try (ReleasableLock ignored = writeLock.acquire()) {
            try {
                CompletableFuture<Entry<K, V>> future = map.put(key, CompletableFuture.completedFuture(entry));
                if (future != null) {
                    existing = future.handle((ok, ex) -> {
                        if (ok != null) {
                            return ok;
                        } else {
                            return null;
                        }
                    }).get();
                }
            } catch (ExecutionException | InterruptedException e) {
                throw new IllegalStateException(e);
            }
        }
        return Tuple.tuple(entry, existing);
    }

    /**
     * remove an entry from the segment
     *
     * @param key the key of the entry to remove from the cache
     * @param onRemoval a callback for the removed entry
     */
    void remove(K key, Consumer<CompletableFuture<Entry<K, V>>> onRemoval) {
        CompletableFuture<Entry<K, V>> future;
        try (ReleasableLock ignored = writeLock.acquire()) {
            future = map.remove(key);
        }
        if (future != null) {
            segmentStats.eviction();
            onRemoval.accept(future);
        }
    }

    /**
     * remove an entry from the segment iff the future is done and the value is equal to the
     * expected value
     *
     * @param key the key of the entry to remove from the cache
     * @param value the value expected to be associated with the key
     * @param onRemoval a callback for the removed entry
     */
    void remove(K key, V value, Consumer<CompletableFuture<Entry<K, V>>> onRemoval) {
        CompletableFuture<Entry<K, V>> future;
        boolean removed = false;
        try (ReleasableLock ignored = writeLock.acquire()) {
            future = map.get(key);
            try {
                if (future != null) {
                    if (future.isDone()) {
                        Entry<K, V> entry = future.get();
                        if (Objects.equals(value, entry.value)) {
                            removed = map.remove(key, future);
                        }
                    }
                }
            } catch (ExecutionException | InterruptedException e) {
                throw new IllegalStateException(e);
            }
        }
        if (future != null && removed) {
            segmentStats.eviction();
            onRemoval.accept(future);
        }
    }

    private static class SegmentStats {
        private final LongAdder hits = new LongAdder();
        private final LongAdder misses = new LongAdder();
        private final LongAdder evictions = new LongAdder();

        void hit() {
            hits.increment();
        }

        void miss() {
            misses.increment();
        }

        void eviction() {
            evictions.increment();
        }
    }
}
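- CacheSegment wraps the readLock and writeLock of a ReentrantReadWriteLock in two ReleasableLock instances, named readLock and writeLock. Because ReleasableLock implements the Releasable interface (the close method), and that interface in turn inherits from java.lang.AutoCloseable, each lock can be used directly in a try-with-resources statement, which calls close automatically and thereby releases the lock.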
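To make the try-with-resources pattern concrete, here is a minimal sketch of a map guarded the same way, using only JDK types. `GuardedMap`, `Guard` and `GuardedMapDemo` are hypothetical names introduced for this example; the code is not taken from Elasticsearch and leaves out everything CacheSegment does with futures and statistics.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;

// Hypothetical miniature of the CacheSegment locking pattern; not Elasticsearch code.
class GuardedMap<K, V> {
    // close() is narrowed to throw nothing, so callers need no catch block
    interface Guard extends AutoCloseable {
        @Override
        void close();
    }

    private final ReadWriteLock segmentLock = new ReentrantReadWriteLock();
    private final Map<K, V> map = new HashMap<>();

    // lock now, hand back a Guard whose close() unlocks
    private static Guard acquire(Lock lock) {
        lock.lock();
        return lock::unlock;
    }

    V get(K key) {
        try (Guard ignored = acquire(segmentLock.readLock())) {
            return map.get(key);
        }
    }

    V put(K key, V value) {
        try (Guard ignored = acquire(segmentLock.writeLock())) {
            return map.put(key, value);
        }
    }

    // same behavior as get(), written with an explicit try/finally for comparison
    V getWithFinally(K key) {
        Lock lock = segmentLock.readLock();
        lock.lock();
        try {
            return map.get(key);
        } finally {
            lock.unlock();
        }
    }
}

class GuardedMapDemo {
    public static void main(String[] args) {
        GuardedMap<String, Integer> cache = new GuardedMap<>();
        cache.put("a", 1);
        System.out.println(cache.get("a"));            // 1
        System.out.println(cache.getWithFinally("b")); // null
    }
}
```

The try-with-resources form keeps the acquire and the release in one statement, so the unlock cannot be forgotten on an early return or an exception.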
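Summary
- ReleasableLock implements the Releasable interface (the close method). Its constructor takes a Lock, and holdingThreads is initialized only when assertions are enabled. isHeldByCurrentThread reports whether the calling thread currently holds the lock.
- acquire first calls lock.lock(), then invokes addCurrentThread inside an assert; that method increments the number of times the calling thread has entered the lock.
- close first calls lock.unlock(), then invokes removeCurrentThread inside an assert; that method decrements the number of times the calling thread has entered the lock.
- Because Releasable inherits from java.lang.AutoCloseable, a ReleasableLock can be used directly in a try-with-resources statement, which calls close automatically and thereby releases the lock.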
doc
- ReleasableLock
2019-06-13