This article takes a look at Elasticsearch's RunOnce.

RunOnce

elasticsearch-7.0.1/server/src/main/java/org/elasticsearch/common/util/concurrent/RunOnce.java

public class RunOnce implements Runnable {

    private final Runnable delegate;
    private final AtomicBoolean hasRun;

    public RunOnce(final Runnable delegate) {
        this.delegate = Objects.requireNonNull(delegate);
        this.hasRun = new AtomicBoolean(false);
    }

    @Override
    public void run() {
        if (hasRun.compareAndSet(false, true)) {
            delegate.run();
        }
    }

    /**
     * {@code true} if the {@link RunOnce} has been executed once.
     */
    public boolean hasRun() {
        return hasRun.get();
    }
}
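  • RunOnce implements the Runnable interface. Its constructor requires a non-null Runnable (enforced by Objects.requireNonNull) and initializes the hasRun flag to false. The run method uses compareAndSet to flip hasRun from false to true and calls the delegate's run method only if that swap succeeds, so the delegate executes at most once, even when run is called from several threads. The hasRun method returns the current value of the flag.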
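To make the behaviour described above concrete, here is a minimal, self-contained sketch. The nested RunOnce class is a local stand-in that mirrors the excerpt above so the example compiles without the Elasticsearch jar; the names RunOnceDemo and releaseTwice, and the "release hook" scenario, are assumptions made purely for illustration.

```java
import java.util.Objects;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

public class RunOnceDemo {

    // Local stand-in mirroring the RunOnce class quoted above, so that the
    // example does not depend on the Elasticsearch server jar.
    static final class RunOnce implements Runnable {
        private final Runnable delegate;
        private final AtomicBoolean hasRun = new AtomicBoolean(false);

        RunOnce(Runnable delegate) {
            this.delegate = Objects.requireNonNull(delegate);
        }

        @Override
        public void run() {
            // Only the first caller wins the compare-and-set and runs the delegate.
            if (hasRun.compareAndSet(false, true)) {
                delegate.run();
            }
        }

        boolean hasRun() {
            return hasRun.get();
        }
    }

    // Hypothetical scenario: a release hook that can be reached from two code
    // paths but must take effect only once.
    static int releaseTwice() {
        AtomicInteger released = new AtomicInteger();
        RunOnce release = new RunOnce(released::incrementAndGet);
        release.run(); // first call: the delegate runs
        release.run(); // second call: no-op, the flag is already true
        return released.get();
    }

    public static void main(String[] args) {
        System.out.println("release count = " + releaseTwice()); // prints "release count = 1"
    }
}
```

Wrapping the callback this way means callers do not need to coordinate among themselves about who performs the action; whichever call arrives first wins.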

Example

elasticsearch-7.0.1/server/src/test/java/org/elasticsearch/common/util/concurrent/RunOnceTests.java

public class RunOnceTests extends ESTestCase {

    public void testRunOnce() {
        final AtomicInteger counter = new AtomicInteger(0);
        final RunOnce runOnce = new RunOnce(counter::incrementAndGet);
        assertFalse(runOnce.hasRun());

        runOnce.run();
        assertTrue(runOnce.hasRun());
        assertEquals(1, counter.get());

        runOnce.run();
        assertTrue(runOnce.hasRun());
        assertEquals(1, counter.get());
    }

    public void testRunOnceConcurrently() throws InterruptedException {
        final AtomicInteger counter = new AtomicInteger(0);
        final RunOnce runOnce = new RunOnce(counter::incrementAndGet);

        final Thread[] threads = new Thread[between(3, 10)];
        final CountDownLatch latch = new CountDownLatch(1);
        for (int i = 0; i < threads.length; i++) {
            threads[i] = new Thread(() -> {
                try {
                    latch.await();
                } catch (InterruptedException e) {
                    throw new RuntimeException(e);
                }
                runOnce.run();
            });
            threads[i].start();
        }

        latch.countDown();
        for (Thread thread : threads) {
            thread.join();
        }
        assertTrue(runOnce.hasRun());
        assertEquals(1, counter.get());
    }

    public void testRunOnceWithAbstractRunnable() {
        final AtomicInteger onRun = new AtomicInteger(0);
        final AtomicInteger onFailure = new AtomicInteger(0);
        final AtomicInteger onAfter = new AtomicInteger(0);

        final RunOnce runOnce = new RunOnce(new AbstractRunnable() {
            @Override
            protected void doRun() throws Exception {
                onRun.incrementAndGet();
                throw new RuntimeException("failure");
            }

            @Override
            public void onFailure(Exception e) {
                onFailure.incrementAndGet();
            }

            @Override
            public void onAfter() {
                onAfter.incrementAndGet();
            }
        });

        final int iterations = randomIntBetween(1, 10);
        for (int i = 0; i < iterations; i++) {
            runOnce.run();
            assertEquals(1, onRun.get());
            assertEquals(1, onFailure.get());
            assertEquals(1, onAfter.get());
            assertTrue(runOnce.hasRun());
        }
    }
}
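  • testRunOnce verifies that calling run several times in sequence executes the delegate only once. testRunOnceConcurrently verifies the same guarantee when 3 to 10 threads, all released together by a CountDownLatch, call run concurrently. testRunOnceWithAbstractRunnable uses an AbstractRunnable whose doRun throws as the delegate and checks that the onRun, onFailure and onAfter counters each stay at 1 no matter how many times run is called.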
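One consequence that follows from the code is that hasRun is flipped before the delegate is invoked, so a delegate that throws is still counted as having run and is not retried. The sketch below illustrates this with a plain Runnable rather than AbstractRunnable; the nested RunOnce is again a local stand-in for the excerpt above, and RunOnceFailureDemo and runFailingDelegate are hypothetical names introduced only for this example.

```java
import java.util.Objects;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

public class RunOnceFailureDemo {

    // Local stand-in mirroring the RunOnce class quoted earlier.
    static final class RunOnce implements Runnable {
        private final Runnable delegate;
        private final AtomicBoolean hasRun = new AtomicBoolean(false);

        RunOnce(Runnable delegate) {
            this.delegate = Objects.requireNonNull(delegate);
        }

        @Override
        public void run() {
            // The flag is set before the delegate is invoked.
            if (hasRun.compareAndSet(false, true)) {
                delegate.run();
            }
        }

        boolean hasRun() {
            return hasRun.get();
        }
    }

    // Calls run() the given number of times on a delegate that always throws.
    // Returns {delegate invocations, exceptions seen by the caller, hasRun as 0/1}.
    static int[] runFailingDelegate(int calls) {
        AtomicInteger attempts = new AtomicInteger();
        RunOnce runOnce = new RunOnce(() -> {
            attempts.incrementAndGet();
            throw new IllegalStateException("failure");
        });

        int exceptionsSeen = 0;
        for (int i = 0; i < calls; i++) {
            try {
                runOnce.run();
            } catch (IllegalStateException e) {
                exceptionsSeen++; // only the first call reaches the delegate
            }
        }
        return new int[] { attempts.get(), exceptionsSeen, runOnce.hasRun() ? 1 : 0 };
    }

    public static void main(String[] args) {
        int[] r = runFailingDelegate(5);
        System.out.println("attempts=" + r[0] + ", exceptions=" + r[1] + ", hasRun=" + (r[2] == 1));
    }
}
```

With a plain Runnable the exception propagates to the first caller, whereas in the test above it is AbstractRunnable that routes the exception to onFailure. In both cases later calls to run are no-ops.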

Summary

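RunOnce implements the Runnable interface. Its constructor takes the Runnable to delegate to and creates the hasRun flag; the run method first uses compareAndSet to change hasRun from false to true and runs the delegate only if that succeeds; the hasRun method returns the value of the flag.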
