本文主要研究一下Elasticsearch的ReleasableLock

ReleasableLock

elasticsearch-7.0.1/server/src/main/java/org/elasticsearch/common/util/concurrent/ReleasableLock.java

public class ReleasableLock implements Releasable {    private final Lock lock;    // a per-thread count indicating how many times the thread has entered the lock; only works if assertions are enabled    private final ThreadLocal<Integer> holdingThreads;    public ReleasableLock(Lock lock) {        this.lock = lock;        if (Assertions.ENABLED) {            holdingThreads = new ThreadLocal<>();        } else {            holdingThreads = null;        }    }    @Override    public void close() {        lock.unlock();        assert removeCurrentThread();    }    public ReleasableLock acquire() throws EngineException {        lock.lock();        assert addCurrentThread();        return this;    }    private boolean addCurrentThread() {        final Integer current = holdingThreads.get();        holdingThreads.set(current == null ? 1 : current + 1);        return true;    }    private boolean removeCurrentThread() {        final Integer count = holdingThreads.get();        assert count != null && count > 0;        if (count == 1) {            holdingThreads.remove();        } else {            holdingThreads.set(count - 1);        }        return true;    }    public boolean isHeldByCurrentThread() {        if (holdingThreads == null) {            throw new UnsupportedOperationException("asserts must be enabled");        }        final Integer count = holdingThreads.get();        return count != null && count > 0;    }}
  • ReleasableLock实现了Releasable接口(close方法);它的构造器要求输入Lock参数,只有在开启了assertions的条件下才会初始化holdingThreads;isHeldByCurrentThread方法判断调用线程是否正在使用lock
  • acquire方法首先调用lock的lock方法,然后利用assert来断言addCurrentThread方法,该方法会增加调用线程正在使用lock的次数
  • close方法首先调用lock的unlock方法,然后利用assert来断言removeCurrentThread方法,该方法会减少调用线程正在使用lock的次数

ReleasableLockTests

elasticsearch-7.0.1/server/src/test/java/org/elasticsearch/common/util/concurrent/ReleasableLockTests.java

public class ReleasableLockTests extends ESTestCase {    /**     * Test that accounting on whether or not a thread holds a releasable lock is correct. Previously we had a bug where on a re-entrant     * lock that if a thread entered the lock twice we would declare that it does not hold the lock after it exits its first entrance but     * not its second entrance.     *     * @throws BrokenBarrierException if awaiting on the synchronization barrier breaks     * @throws InterruptedException   if awaiting on the synchronization barrier is interrupted     */    public void testIsHeldByCurrentThread() throws BrokenBarrierException, InterruptedException {        final ReentrantReadWriteLock readWriteLock = new ReentrantReadWriteLock();        final ReleasableLock readLock = new ReleasableLock(readWriteLock.readLock());        final ReleasableLock writeLock = new ReleasableLock(readWriteLock.writeLock());        final int numberOfThreads = scaledRandomIntBetween(1, 32);        final int iterations = scaledRandomIntBetween(1, 32);        final CyclicBarrier barrier = new CyclicBarrier(1 + numberOfThreads);        final List<Thread> threads = new ArrayList<>();        for (int i = 0; i < numberOfThreads; i++) {            final Thread thread = new Thread(() -> {                try {                    barrier.await();                } catch (final BrokenBarrierException | InterruptedException e) {                    throw new RuntimeException(e);                }                for (int j = 0; j < iterations; j++) {                    if (randomBoolean()) {                        acquire(readLock, writeLock);                    } else {                        acquire(writeLock, readLock);                    }                }                try {                    barrier.await();                } catch (final BrokenBarrierException | InterruptedException e) {                    throw new RuntimeException(e);                }            });            threads.add(thread);            thread.start();        }        barrier.await();        barrier.await();        for (final Thread thread : threads) {            thread.join();        }    }    private void acquire(final ReleasableLock lockToAcquire, final ReleasableLock otherLock) {        try (@SuppressWarnings("unused") Releasable outer = lockToAcquire.acquire()) {            assertTrue(lockToAcquire.isHeldByCurrentThread());            assertFalse(otherLock.isHeldByCurrentThread());            try (@SuppressWarnings("unused") Releasable inner = lockToAcquire.acquire()) {                assertTrue(lockToAcquire.isHeldByCurrentThread());                assertFalse(otherLock.isHeldByCurrentThread());            }            // previously there was a bug here and this would return false            assertTrue(lockToAcquire.isHeldByCurrentThread());            assertFalse(otherLock.isHeldByCurrentThread());        }        assertFalse(lockToAcquire.isHeldByCurrentThread());        assertFalse(otherLock.isHeldByCurrentThread());    }}
  • ReleasableLockTests使用多线程随机执行acquire,该方法断言lockToAcquire被当前线程持有,而otherLock不被当前线程持有

Cache.CacheSegment

elasticsearch-7.0.1/server/src/main/java/org/elasticsearch/common/cache/Cache.java

    private static class CacheSegment<K, V> {        // read/write lock protecting mutations to the segment        ReadWriteLock segmentLock = new ReentrantReadWriteLock();        ReleasableLock readLock = new ReleasableLock(segmentLock.readLock());        ReleasableLock writeLock = new ReleasableLock(segmentLock.writeLock());        Map<K, CompletableFuture<Entry<K, V>>> map = new HashMap<>();        SegmentStats segmentStats = new SegmentStats();        /**         * get an entry from the segment; expired entries will be returned as null but not removed from the cache until the LRU list is         * pruned or a manual {@link Cache#refresh()} is performed however a caller can take action using the provided callback         *         * @param key       the key of the entry to get from the cache         * @param now       the access time of this entry         * @param isExpired test if the entry is expired         * @param onExpiration a callback if the entry associated to the key is expired         * @return the entry if there was one, otherwise null         */        Entry<K, V> get(K key, long now, Predicate<Entry<K, V>> isExpired, Consumer<Entry<K, V>> onExpiration) {            CompletableFuture<Entry<K, V>> future;            try (ReleasableLock ignored = readLock.acquire()) {                future = map.get(key);            }            if (future != null) {                Entry<K, V> entry;                try {                    entry = future.get();                } catch (ExecutionException e) {                    assert future.isCompletedExceptionally();                    segmentStats.miss();                    return null;                } catch (InterruptedException e) {                    throw new IllegalStateException(e);                }                if (isExpired.test(entry)) {                    segmentStats.miss();                    onExpiration.accept(entry);                    return null;                } else {                    segmentStats.hit();                    entry.accessTime = now;                    return entry;                }            } else {                segmentStats.miss();                return null;            }        }        /**         * put an entry into the segment         *         * @param key   the key of the entry to add to the cache         * @param value the value of the entry to add to the cache         * @param now   the access time of this entry         * @return a tuple of the new entry and the existing entry, if there was one otherwise null         */        Tuple<Entry<K, V>, Entry<K, V>> put(K key, V value, long now) {            Entry<K, V> entry = new Entry<>(key, value, now);            Entry<K, V> existing = null;            try (ReleasableLock ignored = writeLock.acquire()) {                try {                    CompletableFuture<Entry<K, V>> future = map.put(key, CompletableFuture.completedFuture(entry));                    if (future != null) {                        existing = future.handle((ok, ex) -> {                            if (ok != null) {                                return ok;                            } else {                                return null;                            }                        }).get();                    }                } catch (ExecutionException | InterruptedException e) {                    throw new IllegalStateException(e);                }            }            return Tuple.tuple(entry, existing);        }        /**         * remove an entry from the segment         *         * @param key       the key of the entry to remove from the cache         * @param onRemoval a callback for the removed entry         */        void remove(K key, Consumer<CompletableFuture<Entry<K, V>>> onRemoval) {            CompletableFuture<Entry<K, V>> future;            try (ReleasableLock ignored = writeLock.acquire()) {                future = map.remove(key);            }            if (future != null) {                segmentStats.eviction();                onRemoval.accept(future);            }        }        /**         * remove an entry from the segment iff the future is done and the value is equal to the         * expected value         *         * @param key the key of the entry to remove from the cache         * @param value the value expected to be associated with the key         * @param onRemoval a callback for the removed entry         */        void remove(K key, V value, Consumer<CompletableFuture<Entry<K, V>>> onRemoval) {            CompletableFuture<Entry<K, V>> future;            boolean removed = false;            try (ReleasableLock ignored = writeLock.acquire()) {                future = map.get(key);                try {                    if (future != null) {                        if (future.isDone()) {                            Entry<K, V> entry = future.get();                            if (Objects.equals(value, entry.value)) {                                removed = map.remove(key, future);                            }                        }                    }                } catch (ExecutionException | InterruptedException e) {                    throw new IllegalStateException(e);                }            }            if (future != null && removed) {                segmentStats.eviction();                onRemoval.accept(future);            }        }        private static class SegmentStats {            private final LongAdder hits = new LongAdder();            private final LongAdder misses = new LongAdder();            private final LongAdder evictions = new LongAdder();            void hit() {                hits.increment();            }            void miss() {                misses.increment();            }            void eviction() {                evictions.increment();            }        }    }
  • CacheSegment使用ReentrantReadWriteLock的readLock及writeLock创建了两个ReleasableLock,一个为readLock,一个为writeLock;由于ReleasableLock实现了Releasable接口(close方法),而该接口继承了java.lang.AutoCloseable接口,因而可以直接利用try with resources语法来自动close,从而释放锁

小结

  • ReleasableLock实现了Releasable接口(close方法);它的构造器要求输入Lock参数,只有在开启了assertions的条件下才会初始化holdingThreads;isHeldByCurrentThread方法判断调用线程是否正在使用lock
  • acquire方法首先调用lock的lock方法,然后利用assert来断言addCurrentThread方法,该方法会增加调用线程正在使用lock的次数
  • close方法首先调用lock的unlock方法,然后利用assert来断言removeCurrentThread方法,该方法会减少调用线程正在使用lock的次数
ReleasableLock实现了Releasable接口(close方法),而该接口继承了java.lang.AutoCloseable接口,因而可以直接利用try with resources语法来自动close,从而释放锁

doc

  • ReleasableLock