A Look at Elasticsearch's ReleasableLock

This article takes a look at Elasticsearch's ReleasableLock.

ReleasableLock

elasticsearch-7.0.1/server/src/main/java/org/elasticsearch/common/util/concurrent/ReleasableLock.java

public class ReleasableLock implements Releasable {
    private final Lock lock;

    // a per-thread count indicating how many times the thread has entered the lock; only works if assertions are enabled
    private final ThreadLocal<Integer> holdingThreads;

    public ReleasableLock(Lock lock) {
        this.lock = lock;
        if (Assertions.ENABLED) {
            holdingThreads = new ThreadLocal<>();
        } else {
            holdingThreads = null;
        }
    }

    @Override
    public void close() {
        lock.unlock();
        assert removeCurrentThread();
    }

    public ReleasableLock acquire() throws EngineException {
        lock.lock();
        assert addCurrentThread();
        return this;
    }

    private boolean addCurrentThread() {
        final Integer current = holdingThreads.get();
        holdingThreads.set(current == null ? 1 : current + 1);
        return true;
    }

    private boolean removeCurrentThread() {
        final Integer count = holdingThreads.get();
        assert count != null && count > 0;
        if (count == 1) {
            holdingThreads.remove();
        } else {
            holdingThreads.set(count - 1);
        }
        return true;
    }

    public boolean isHeldByCurrentThread() {
        if (holdingThreads == null) {
            throw new UnsupportedOperationException("asserts must be enabled");
        }
        final Integer count = holdingThreads.get();
        return count != null && count > 0;
    }
}
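  • ReleasableLock implements the Releasable interface (the close method). Its constructor takes a Lock; holdingThreads is initialized only when assertions are enabled and is null otherwise. isHeldByCurrentThread reports whether the calling thread currently holds the lock, and throws UnsupportedOperationException when assertions are disabled.
  • acquire first calls lock.lock() and then invokes addCurrentThread inside an assert statement, which increments the calling thread's hold count. Because the call sits inside an assert, this bookkeeping runs only when assertions are enabled.
  • close first calls lock.unlock() and then invokes removeCurrentThread inside an assert statement, which decrements the calling thread's hold count and removes the thread-local entry once the count would drop to zero.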
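
The pattern described above can be sketched without any Elasticsearch dependencies. The following is a minimal illustration, not the Elasticsearch class: SketchReleasableLock, enter, exit and holdCount are hypothetical names, and java.lang.AutoCloseable stands in for Releasable. It shows how putting the bookkeeping call inside an assert makes it cost nothing when the JVM runs without -ea.

```java
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical stand-in for ReleasableLock; AutoCloseable replaces Releasable here.
final class SketchReleasableLock implements AutoCloseable {
    private final Lock lock;
    // Only touched from inside assert statements, so it stays empty when assertions are off.
    private final ThreadLocal<Integer> holds = new ThreadLocal<>();

    SketchReleasableLock(Lock lock) {
        this.lock = lock;
    }

    SketchReleasableLock acquire() {
        lock.lock();
        assert enter(); // evaluated only when assertions are enabled (-ea)
        return this;
    }

    @Override
    public void close() {
        lock.unlock();
        assert exit(); // evaluated only when assertions are enabled (-ea)
    }

    // Increments the calling thread's count; returns true so the assert never fails.
    private boolean enter() {
        Integer current = holds.get();
        holds.set(current == null ? 1 : current + 1);
        return true;
    }

    // Decrements the calling thread's count, dropping the entry when it reaches zero.
    private boolean exit() {
        Integer count = holds.get();
        if (count == null || count == 1) {
            holds.remove();
        } else {
            holds.set(count - 1);
        }
        return true;
    }

    // Times the calling thread has entered; always 0 when assertions are disabled.
    int holdCount() {
        Integer count = holds.get();
        return count == null ? 0 : count;
    }
}
```

A caller would wrap a ReentrantLock and write try (SketchReleasableLock ignored = lock.acquire()) { ... }; the lock is released when the block exits, whether normally or through an exception.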

ReleasableLockTests

elasticsearch-7.0.1/server/src/test/java/org/elasticsearch/common/util/concurrent/ReleasableLockTests.java

public class ReleasableLockTests extends ESTestCase {

    /**
     * Test that accounting on whether or not a thread holds a releasable lock is correct. Previously we had a bug where on a re-entrant
     * lock that if a thread entered the lock twice we would declare that it does not hold the lock after it exits its first entrance but
     * not its second entrance.
     *
     * @throws BrokenBarrierException if awaiting on the synchronization barrier breaks
     * @throws InterruptedException   if awaiting on the synchronization barrier is interrupted
     */
    public void testIsHeldByCurrentThread() throws BrokenBarrierException, InterruptedException {
        final ReentrantReadWriteLock readWriteLock = new ReentrantReadWriteLock();
        final ReleasableLock readLock = new ReleasableLock(readWriteLock.readLock());
        final ReleasableLock writeLock = new ReleasableLock(readWriteLock.writeLock());

        final int numberOfThreads = scaledRandomIntBetween(1, 32);
        final int iterations = scaledRandomIntBetween(1, 32);
        final CyclicBarrier barrier = new CyclicBarrier(1 + numberOfThreads);
        final List<Thread> threads = new ArrayList<>();
        for (int i = 0; i < numberOfThreads; i++) {
            final Thread thread = new Thread(() -> {
                try {
                    barrier.await();
                } catch (final BrokenBarrierException | InterruptedException e) {
                    throw new RuntimeException(e);
                }
                for (int j = 0; j < iterations; j++) {
                    if (randomBoolean()) {
                        acquire(readLock, writeLock);
                    } else {
                        acquire(writeLock, readLock);
                    }
                }
                try {
                    barrier.await();
                } catch (final BrokenBarrierException | InterruptedException e) {
                    throw new RuntimeException(e);
                }
            });
            threads.add(thread);
            thread.start();
        }

        barrier.await();
        barrier.await();
        for (final Thread thread : threads) {
            thread.join();
        }
    }

    private void acquire(final ReleasableLock lockToAcquire, final ReleasableLock otherLock) {
        try (@SuppressWarnings("unused") Releasable outer = lockToAcquire.acquire()) {
            assertTrue(lockToAcquire.isHeldByCurrentThread());
            assertFalse(otherLock.isHeldByCurrentThread());
            try (@SuppressWarnings("unused") Releasable inner = lockToAcquire.acquire()) {
                assertTrue(lockToAcquire.isHeldByCurrentThread());
                assertFalse(otherLock.isHeldByCurrentThread());
            }
            // previously there was a bug here and this would return false
            assertTrue(lockToAcquire.isHeldByCurrentThread());
            assertFalse(otherLock.isHeldByCurrentThread());
        }
        assertFalse(lockToAcquire.isHeldByCurrentThread());
        assertFalse(otherLock.isHeldByCurrentThread());
    }

}
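  • ReleasableLockTests starts a random number of threads, each of which repeatedly calls acquire with the read lock and the write lock in random order. acquire asserts that lockToAcquire is held by the current thread and that otherLock is not, including after a nested (re-entrant) acquire of the same lock has been released.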
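
To see why the test re-enters the lock, it may help to compare two ways of tracking ownership. The sketch below is an illustration of the class of bug the test's Javadoc describes, not the earlier Elasticsearch code: FlagTracker, CountTracker and heldAfterInnerExit are hypothetical names. A per-thread flag forgets the outer entrance as soon as the inner one exits, while a per-thread count (the approach holdingThreads takes) does not.

```java
import java.util.concurrent.locks.ReentrantLock;

final class HoldTracking {
    // Hypothetical flag-based tracker: remembers only whether the thread is inside.
    static final class FlagTracker {
        private final ThreadLocal<Boolean> inside = ThreadLocal.withInitial(() -> false);
        void enter() { inside.set(true); }
        void exit() { inside.set(false); }
        boolean isHeld() { return inside.get(); }
    }

    // Count-based tracker: remembers how many times the thread has entered.
    static final class CountTracker {
        private final ThreadLocal<Integer> count = ThreadLocal.withInitial(() -> 0);
        void enter() { count.set(count.get() + 1); }
        void exit() { count.set(count.get() - 1); }
        boolean isHeld() { return count.get() > 0; }
    }

    // Enters a re-entrant lock twice, exits once, and reports
    // { flag tracker says held, count tracker says held, lock is really held }.
    static boolean[] heldAfterInnerExit() {
        ReentrantLock lock = new ReentrantLock();
        FlagTracker flag = new FlagTracker();
        CountTracker count = new CountTracker();

        lock.lock(); flag.enter(); count.enter();   // outer entrance
        lock.lock(); flag.enter(); count.enter();   // inner (re-entrant) entrance
        lock.unlock(); flag.exit(); count.exit();   // inner exit; outer still active

        boolean[] result = { flag.isHeld(), count.isHeld(), lock.isHeldByCurrentThread() };

        lock.unlock(); flag.exit(); count.exit();   // outer exit
        return result;
    }
}
```

After the inner exit the lock is still held by the thread, and only the count-based tracker agrees with that; this is the situation the assertion after the inner try block in the test checks.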

Cache.CacheSegment

elasticsearch-7.0.1/server/src/main/java/org/elasticsearch/common/cache/Cache.java

    private static class CacheSegment<K, V> {
        // read/write lock protecting mutations to the segment
        ReadWriteLock segmentLock = new ReentrantReadWriteLock();

        ReleasableLock readLock = new ReleasableLock(segmentLock.readLock());
        ReleasableLock writeLock = new ReleasableLock(segmentLock.writeLock());

        Map<K, CompletableFuture<Entry<K, V>>> map = new HashMap<>();

        SegmentStats segmentStats = new SegmentStats();

        /**
         * get an entry from the segment; expired entries will be returned as null but not removed from the cache until the LRU list is
         * pruned or a manual {@link Cache#refresh()} is performed however a caller can take action using the provided callback
         *
         * @param key       the key of the entry to get from the cache
         * @param now       the access time of this entry
         * @param isExpired test if the entry is expired
         * @param onExpiration a callback if the entry associated to the key is expired
         * @return the entry if there was one, otherwise null
         */
        Entry<K, V> get(K key, long now, Predicate<Entry<K, V>> isExpired, Consumer<Entry<K, V>> onExpiration) {
            CompletableFuture<Entry<K, V>> future;
            try (ReleasableLock ignored = readLock.acquire()) {
                future = map.get(key);
            }
            if (future != null) {
                Entry<K, V> entry;
                try {
                    entry = future.get();
                } catch (ExecutionException e) {
                    assert future.isCompletedExceptionally();
                    segmentStats.miss();
                    return null;
                } catch (InterruptedException e) {
                    throw new IllegalStateException(e);
                }
                if (isExpired.test(entry)) {
                    segmentStats.miss();
                    onExpiration.accept(entry);
                    return null;
                } else {
                    segmentStats.hit();
                    entry.accessTime = now;
                    return entry;
                }
            } else {
                segmentStats.miss();
                return null;
            }
        }

        /**
         * put an entry into the segment
         *
         * @param key   the key of the entry to add to the cache
         * @param value the value of the entry to add to the cache
         * @param now   the access time of this entry
         * @return a tuple of the new entry and the existing entry, if there was one otherwise null
         */
        Tuple<Entry<K, V>, Entry<K, V>> put(K key, V value, long now) {
            Entry<K, V> entry = new Entry<>(key, value, now);
            Entry<K, V> existing = null;
            try (ReleasableLock ignored = writeLock.acquire()) {
                try {
                    CompletableFuture<Entry<K, V>> future = map.put(key, CompletableFuture.completedFuture(entry));
                    if (future != null) {
                        existing = future.handle((ok, ex) -> {
                            if (ok != null) {
                                return ok;
                            } else {
                                return null;
                            }
                        }).get();
                    }
                } catch (ExecutionException | InterruptedException e) {
                    throw new IllegalStateException(e);
                }
            }
            return Tuple.tuple(entry, existing);
        }

        /**
         * remove an entry from the segment
         *
         * @param key       the key of the entry to remove from the cache
         * @param onRemoval a callback for the removed entry
         */
        void remove(K key, Consumer<CompletableFuture<Entry<K, V>>> onRemoval) {
            CompletableFuture<Entry<K, V>> future;
            try (ReleasableLock ignored = writeLock.acquire()) {
                future = map.remove(key);
            }
            if (future != null) {
                segmentStats.eviction();
                onRemoval.accept(future);
            }
        }

        /**
         * remove an entry from the segment iff the future is done and the value is equal to the
         * expected value
         *
         * @param key the key of the entry to remove from the cache
         * @param value the value expected to be associated with the key
         * @param onRemoval a callback for the removed entry
         */
        void remove(K key, V value, Consumer<CompletableFuture<Entry<K, V>>> onRemoval) {
            CompletableFuture<Entry<K, V>> future;
            boolean removed = false;
            try (ReleasableLock ignored = writeLock.acquire()) {
                future = map.get(key);
                try {
                    if (future != null) {
                        if (future.isDone()) {
                            Entry<K, V> entry = future.get();
                            if (Objects.equals(value, entry.value)) {
                                removed = map.remove(key, future);
                            }
                        }
                    }
                } catch (ExecutionException | InterruptedException e) {
                    throw new IllegalStateException(e);
                }
            }

            if (future != null && removed) {
                segmentStats.eviction();
                onRemoval.accept(future);
            }
        }

        private static class SegmentStats {
            private final LongAdder hits = new LongAdder();
            private final LongAdder misses = new LongAdder();
            private final LongAdder evictions = new LongAdder();

            void hit() {
                hits.increment();
            }

            void miss() {
                misses.increment();
            }

            void eviction() {
                evictions.increment();
            }
        }
    }
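  • CacheSegment creates two ReleasableLock instances, readLock and writeLock, from the read lock and write lock of a ReentrantReadWriteLock. Because ReleasableLock implements Releasable (the close method), and that interface in turn extends java.lang.AutoCloseable, the lock can be used in a try-with-resources statement, which calls close automatically and thereby releases the lock.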
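
The same locking layout can be tried out in isolation. The following is a simplified sketch under stated assumptions, not the Elasticsearch CacheSegment: GuardedSegment and CloseableLock are hypothetical names, the map holds plain values instead of CompletableFuture entries, and there is no expiry or statistics handling. It only shows reads going through the read lock and mutations going through the write lock, each released by try-with-resources.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantReadWriteLock;

// Hypothetical, simplified analogue of a cache segment guarded by a read/write lock.
final class GuardedSegment<K, V> {
    // Minimal AutoCloseable lock wrapper so the example needs no Elasticsearch classes.
    static final class CloseableLock implements AutoCloseable {
        private final Lock lock;

        CloseableLock(Lock lock) {
            this.lock = lock;
        }

        CloseableLock acquire() {
            lock.lock();
            return this;
        }

        @Override
        public void close() {
            lock.unlock();
        }
    }

    private final ReentrantReadWriteLock segmentLock = new ReentrantReadWriteLock();
    private final CloseableLock readLock = new CloseableLock(segmentLock.readLock());
    private final CloseableLock writeLock = new CloseableLock(segmentLock.writeLock());
    private final Map<K, V> map = new HashMap<>();

    // Lookups share the read lock, so several readers can proceed at once.
    V get(K key) {
        try (CloseableLock ignored = readLock.acquire()) {
            return map.get(key);
        }
    }

    // Mutations take the exclusive write lock; returns the previous value, if any.
    V put(K key, V value) {
        try (CloseableLock ignored = writeLock.acquire()) {
            return map.put(key, value);
        }
    }

    // Removal is also a mutation; returns the removed value, if any.
    V remove(K key) {
        try (CloseableLock ignored = writeLock.acquire()) {
            return map.remove(key);
        }
    }

    // Exposed only so callers can check that no write lock is left held.
    boolean isWriteLocked() {
        return segmentLock.isWriteLocked();
    }
}
```

As in CacheSegment, the locked region is kept as small as possible: each method holds the lock only for the single map operation and returns as soon as the try block ends.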

Summary

  • ReleasableLock implements the Releasable interface (the close method). Its constructor takes a Lock, and holdingThreads is initialized only when assertions are enabled; isHeldByCurrentThread reports whether the calling thread currently holds the lock.
  • acquire calls lock.lock() and then, inside an assert, addCurrentThread, which increments the calling thread's hold count.
  • close calls lock.unlock() and then, inside an assert, removeCurrentThread, which decrements the calling thread's hold count.

Because Releasable extends java.lang.AutoCloseable, a ReleasableLock can be used directly in a try-with-resources statement, which closes it automatically and releases the lock.
