This article takes a look at Elasticsearch's EsThreadPoolExecutor.

EsThreadPoolExecutor

elasticsearch-7.0.1/server/src/main/java/org/elasticsearch/common/util/concurrent/EsThreadPoolExecutor.java

public class EsThreadPoolExecutor extends ThreadPoolExecutor {

    private final ThreadContext contextHolder;
    private volatile ShutdownListener listener;

    private final Object monitor = new Object();
    /**
     * Name used in error reporting.
     */
    private final String name;

    final String getName() {
        return name;
    }

    EsThreadPoolExecutor(String name, int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit,
            BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, ThreadContext contextHolder) {
        this(name, corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, new EsAbortPolicy(), contextHolder);
    }

    @SuppressForbidden(reason = "properly rethrowing errors, see EsExecutors.rethrowErrors")
    EsThreadPoolExecutor(String name, int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit,
            BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, XRejectedExecutionHandler handler,
            ThreadContext contextHolder) {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, handler);
        this.name = name;
        this.contextHolder = contextHolder;
    }

    public void shutdown(ShutdownListener listener) {
        synchronized (monitor) {
            if (this.listener != null) {
                throw new IllegalStateException("Shutdown was already called on this thread pool");
            }
            if (isTerminated()) {
                listener.onTerminated();
            } else {
                this.listener = listener;
            }
        }
        shutdown();
    }

    @Override
    protected synchronized void terminated() {
        super.terminated();
        synchronized (monitor) {
            if (listener != null) {
                try {
                    listener.onTerminated();
                } finally {
                    listener = null;
                }
            }
        }
    }

    public interface ShutdownListener {
        void onTerminated();
    }

    @Override
    public void execute(Runnable command) {
        command = wrapRunnable(command);
        try {
            super.execute(command);
        } catch (EsRejectedExecutionException ex) {
            if (command instanceof AbstractRunnable) {
                // If we are an abstract runnable we can handle the rejection
                // directly and don't need to rethrow it.
                try {
                    ((AbstractRunnable) command).onRejection(ex);
                } finally {
                    ((AbstractRunnable) command).onAfter();
                }
            } else {
                throw ex;
            }
        }
    }

    @Override
    protected void afterExecute(Runnable r, Throwable t) {
        super.afterExecute(r, t);
        EsExecutors.rethrowErrors(unwrap(r));
        assert assertDefaultContext(r);
    }

    private boolean assertDefaultContext(Runnable r) {
        try {
            assert contextHolder.isDefaultContext() : "the thread context is not the default context and the thread [" +
                Thread.currentThread().getName() + "] is being returned to the pool after executing [" + r + "]";
        } catch (IllegalStateException ex) {
            // sometimes we execute on a closed context and isDefaultContext doen't bypass the ensureOpen checks
            // this must not trigger an exception here since we only assert if the default is restored and
            // we don't really care if we are closed
            if (contextHolder.isClosed() == false) {
                throw ex;
            }
        }
        return true;
    }

    /**
     * Returns a stream of all pending tasks. This is similar to {@link #getQueue()} but will expose the originally submitted
     * {@link Runnable} instances rather than potentially wrapped ones.
     */
    public Stream<Runnable> getTasks() {
        return this.getQueue().stream().map(this::unwrap);
    }

    @Override
    public final String toString() {
        StringBuilder b = new StringBuilder();
        b.append(getClass().getSimpleName()).append('[');
        b.append("name = ").append(name).append(", ");
        if (getQueue() instanceof SizeBlockingQueue) {
            @SuppressWarnings("rawtypes")
            SizeBlockingQueue queue = (SizeBlockingQueue) getQueue();
            b.append("queue capacity = ").append(queue.capacity()).append(", ");
        }
        appendThreadPoolExecutorDetails(b);
        /*
         * ThreadPoolExecutor has some nice information in its toString but we
         * can't get at it easily without just getting the toString.
         */
        b.append(super.toString()).append(']');
        return b.toString();
    }

    /**
     * Append details about this thread pool to the specified {@link StringBuilder}. All details should be appended as key/value pairs in
     * the form "%s = %s, "
     *
     * @param sb the {@link StringBuilder} to append to
     */
    protected void appendThreadPoolExecutorDetails(final StringBuilder sb) {

    }

    protected Runnable wrapRunnable(Runnable command) {
        return contextHolder.preserveContext(command);
    }

    protected Runnable unwrap(Runnable runnable) {
        return contextHolder.unwrap(runnable);
    }
}

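  • EsThreadPoolExecutor extends ThreadPoolExecutor and provides two constructors. Both require the RejectedExecutionHandler to be an XRejectedExecutionHandler (the shorter constructor defaults to EsAbortPolicy), and both require a ThreadContext.
  • It overrides terminated, execute, and afterExecute. terminated invokes listener.onTerminated() if a ShutdownListener was registered through shutdown(ShutdownListener). execute first wraps the command with wrapRunnable, then catches EsRejectedExecutionException: when the command is an AbstractRunnable it calls onRejection followed by onAfter, and for any other command it rethrows the exception. afterExecute calls EsExecutors.rethrowErrors(unwrap(r)) and asserts that the thread context has been restored to the default.
  • It also provides wrapRunnable and unwrap, which delegate to contextHolder.preserveContext and contextHolder.unwrap respectively.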
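
The rejection handling in execute can be tried without Elasticsearch on the classpath. The following is only a minimal JDK-only sketch of that pattern, not the Elasticsearch implementation: RejectionAware, CallbackExecutor, and demo are hypothetical names, the standard AbortPolicy and RejectedExecutionException stand in for EsAbortPolicy and EsRejectedExecutionException, and the ThreadContext wrapping is left out.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class RejectionCallbackSketch {

    /** Hypothetical stand-in for the rejection hooks of AbstractRunnable. */
    interface RejectionAware extends Runnable {
        void onRejection(Exception e);
        void onAfter();
    }

    /** JDK-only executor whose execute() has the same shape as EsThreadPoolExecutor.execute. */
    static class CallbackExecutor extends ThreadPoolExecutor {
        CallbackExecutor(int threads, int queueCapacity) {
            super(threads, threads, 0L, TimeUnit.MILLISECONDS,
                new ArrayBlockingQueue<Runnable>(queueCapacity), new ThreadPoolExecutor.AbortPolicy());
        }

        @Override
        public void execute(Runnable command) {
            try {
                super.execute(command);
            } catch (RejectedExecutionException ex) {
                if (command instanceof RejectionAware) {
                    // The task handles its own rejection, so the caller sees no exception.
                    RejectionAware aware = (RejectionAware) command;
                    try {
                        aware.onRejection(ex);
                    } finally {
                        aware.onAfter();
                    }
                } else {
                    throw ex;
                }
            }
        }
    }

    /** Saturates a 1-thread, 1-slot pool and records which hooks the rejected task receives. */
    static List<String> demo() {
        List<String> events = Collections.synchronizedList(new ArrayList<String>());
        CountDownLatch release = new CountDownLatch(1);
        Runnable blocker = () -> {
            try {
                release.await();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        };
        CallbackExecutor executor = new CallbackExecutor(1, 1);
        executor.execute(blocker); // handed to the single worker thread
        executor.execute(blocker); // fills the single queue slot
        executor.execute(new RejectionAware() { // no capacity left, so this one is rejected
            @Override public void run() { events.add("run"); }
            @Override public void onRejection(Exception e) { events.add("rejected"); }
            @Override public void onAfter() { events.add("after"); }
        });
        release.countDown();
        executor.shutdown();
        try {
            executor.awaitTermination(10, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return events;
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

In this sketch the third task never runs: it only receives onRejection and then onAfter, which is the same order the execute method above uses for an AbstractRunnable.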

XRejectedExecutionHandler

elasticsearch-7.0.1/server/src/main/java/org/elasticsearch/common/util/concurrent/XRejectedExecutionHandler.java

public interface XRejectedExecutionHandler extends RejectedExecutionHandler {

    /**
     * The number of rejected executions.
     */
    long rejected();
}

  • XRejectedExecutionHandler extends RejectedExecutionHandler and adds a rejected method that returns the number of rejected executions. It has two implementations: EsAbortPolicy and ForceQueuePolicy.

EsAbortPolicy

elasticsearch-7.0.1/server/src/main/java/org/elasticsearch/common/util/concurrent/EsAbortPolicy.java

public class EsAbortPolicy implements XRejectedExecutionHandler {
    private final CounterMetric rejected = new CounterMetric();

    @Override
    public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
        if (r instanceof AbstractRunnable) {
            if (((AbstractRunnable) r).isForceExecution()) {
                BlockingQueue<Runnable> queue = executor.getQueue();
                if (!(queue instanceof SizeBlockingQueue)) {
                    throw new IllegalStateException("forced execution, but expected a size queue");
                }
                try {
                    ((SizeBlockingQueue) queue).forcePut(r);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    throw new IllegalStateException("forced execution, but got interrupted", e);
                }
                return;
            }
        }
        rejected.inc();
        throw new EsRejectedExecutionException("rejected execution of " + r + " on " + executor, executor.isShutdown());
    }

    @Override
    public long rejected() {
        return rejected.count();
    }
}

  • EsAbortPolicy implements XRejectedExecutionHandler. It keeps the rejection count in a CounterMetric, and rejected() returns that value. In rejectedExecution, if the runnable is an AbstractRunnable whose isForceExecution() returns true, the queue must be a SizeBlockingQueue (otherwise an IllegalStateException is thrown); the runnable is added with forcePut and the method returns without counting a rejection. In every other case it increments the rejected counter and throws EsRejectedExecutionException.
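
The two branches of rejectedExecution can be sketched with JDK classes only. Everything named here (Forceable, SoftBoundedQueue, CountingAbortPolicy, demo) is hypothetical and merely imitates the roles of AbstractRunnable.isForceExecution, SizeBlockingQueue, and EsAbortPolicy; in particular, the soft-bounded queue below is an assumption about how a capacity limit that can be bypassed might look, not a copy of SizeBlockingQueue.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.LongAdder;

class CountingAbortPolicySketch {

    /** Hypothetical marker playing the role of AbstractRunnable.isForceExecution(). */
    interface Forceable extends Runnable {
        boolean isForceExecution();
    }

    /** Hypothetical soft-bounded queue: offer() respects the capacity, forcePut() ignores it. */
    static class SoftBoundedQueue extends LinkedBlockingQueue<Runnable> {
        private final int capacity;

        SoftBoundedQueue(int capacity) {
            this.capacity = capacity;
        }

        @Override
        public boolean offer(Runnable r) {
            // The size check is not atomic with the insert; acceptable for a sketch.
            return size() < capacity && super.offer(r);
        }

        void forcePut(Runnable r) {
            super.offer(r); // the backing queue is unbounded, so this always succeeds
        }
    }

    static class CountingAbortPolicy implements RejectedExecutionHandler {
        private final LongAdder rejected = new LongAdder();

        @Override
        public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
            if (r instanceof Forceable && ((Forceable) r).isForceExecution()) {
                if (!(executor.getQueue() instanceof SoftBoundedQueue)) {
                    throw new IllegalStateException("forced execution, but expected a soft-bounded queue");
                }
                ((SoftBoundedQueue) executor.getQueue()).forcePut(r);
                return; // forced tasks are queued and not counted as rejected
            }
            rejected.increment();
            throw new RejectedExecutionException("rejected execution of " + r);
        }

        long rejected() {
            return rejected.sum();
        }
    }

    /** Saturates the pool, then submits one ordinary task and one forced task. */
    static String demo() {
        CountDownLatch release = new CountDownLatch(1);
        AtomicBoolean forcedRan = new AtomicBoolean();
        Runnable blocker = () -> {
            try {
                release.await();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        };
        CountingAbortPolicy policy = new CountingAbortPolicy();
        ThreadPoolExecutor executor =
            new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new SoftBoundedQueue(1), policy);
        executor.execute(blocker); // handed to the single worker thread
        executor.execute(blocker); // fills the queue up to its soft capacity
        try {
            executor.execute(blocker); // ordinary task: counted and rejected
        } catch (RejectedExecutionException expected) {
            // the counter has already been incremented by the policy
        }
        executor.execute(new Forceable() { // forced task: queued beyond the capacity
            @Override public boolean isForceExecution() { return true; }
            @Override public void run() { forcedRan.set(true); }
        });
        release.countDown();
        executor.shutdown();
        try {
            executor.awaitTermination(10, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return "rejected=" + policy.rejected() + ", forcedRan=" + forcedRan.get();
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

Only the ordinary task increases the counter. The forced task is queued past the capacity and runs once the worker becomes free.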

ForceQueuePolicy

elasticsearch-7.0.1/server/src/main/java/org/elasticsearch/common/util/concurrent/EsExecutors.java

    static class ForceQueuePolicy implements XRejectedExecutionHandler {

        @Override
        public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
            try {
                // force queue policy should only be used with a scaling queue
                assert executor.getQueue() instanceof ExecutorScalingQueue;
                executor.getQueue().put(r);
            } catch (final InterruptedException e) {
                // a scaling queue never blocks so a put to it can never be interrupted
                throw new AssertionError(e);
            }
        }

        @Override
        public long rejected() {
            return 0;
        }
    }

  • ForceQueuePolicy implements XRejectedExecutionHandler. Its rejectedExecution simply puts the rejected runnable back into the queue, which it asserts to be an ExecutorScalingQueue; its rejected method always returns 0.
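
ExecutorScalingQueue itself is not shown in this article, so how a "force queue" handler cooperates with a scaling queue can only be illustrated under assumptions. In the JDK-only sketch below, ScalingQueue is a hypothetical, simplified queue whose offer succeeds only when an idle worker is already waiting. This makes ThreadPoolExecutor try to add threads up to the maximum pool size, and once that fails the handler queues the task. ForceQueue and demo are hypothetical names as well.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedTransferQueue;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

class ForceQueueSketch {

    /** Hypothetical, simplified scaling queue (not the Elasticsearch ExecutorScalingQueue). */
    static class ScalingQueue extends LinkedTransferQueue<Runnable> {
        @Override
        public boolean offer(Runnable r) {
            // Succeeds only if an idle worker is waiting; otherwise the pool tries to add a thread.
            return tryTransfer(r);
        }

        @Override
        public void put(Runnable r) {
            // Enqueue unconditionally, bypassing the overridden offer() above.
            super.offer(r);
        }
    }

    /** Same idea as ForceQueuePolicy: a rejected task is simply put into the queue. */
    static class ForceQueue implements RejectedExecutionHandler {
        @Override
        public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
            try {
                executor.getQueue().put(r);
            } catch (InterruptedException e) {
                // the queue is unbounded, so put never blocks
                throw new AssertionError(e);
            }
        }
    }

    /** Submits five blocking tasks to a pool with core size 1 and maximum size 2. */
    static String demo() {
        CountDownLatch release = new CountDownLatch(1);
        AtomicInteger completed = new AtomicInteger();
        Runnable task = () -> {
            try {
                release.await();
                completed.incrementAndGet();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        };
        ThreadPoolExecutor executor =
            new ThreadPoolExecutor(1, 2, 60L, TimeUnit.SECONDS, new ScalingQueue(), new ForceQueue());
        for (int i = 0; i < 5; i++) {
            executor.execute(task);
        }
        // Two tasks occupy the two threads; the other three were rejected and force-queued.
        int poolSize = executor.getPoolSize();
        int queued = executor.getQueue().size();
        release.countDown();
        executor.shutdown();
        try {
            executor.awaitTermination(10, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return "poolSize=" + poolSize + ", queued=" + queued + ", completed=" + completed.get();
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

The pool grows to its maximum before anything is queued, and no task is dropped. That would also explain why a handler of this kind can report 0 rejections.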

AbstractRunnable

elasticsearch-7.0.1/server/src/main/java/org/elasticsearch/common/util/concurrent/AbstractRunnable.java

public abstract class AbstractRunnable implements Runnable {

    /**
     * Should the runnable force its execution in case it gets rejected?
     */
    public boolean isForceExecution() {
        return false;
    }

    @Override
    public final void run() {
        try {
            doRun();
        } catch (Exception t) {
            onFailure(t);
        } finally {
            onAfter();
        }
    }

    /**
     * This method is called in a finally block after successful execution
     * or on a rejection.
     */
    public void onAfter() {
        // nothing by default
    }

    /**
     * This method is invoked for all exception thrown by {@link #doRun()}
     */
    public abstract void onFailure(Exception e);

    /**
     * This should be executed if the thread-pool executing this action rejected the execution.
     * The default implementation forwards to {@link #onFailure(Exception)}
     */
    public void onRejection(Exception e) {
        onFailure(e);
    }

    /**
     * This method has the same semantics as {@link Runnable#run()}
     * @throws InterruptedException if the run method throws an InterruptedException
     */
    protected abstract void doRun() throws Exception;
}

  • AbstractRunnable implements Runnable. Its final run method calls doRun, passes any Exception thrown by it to onFailure, and always calls onAfter in a finally block. onRejection forwards to onFailure by default, and isForceExecution (false by default) tells the rejection handler whether the task should be forced into the queue when it is rejected.
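
The callback order of run can be traced with a reduced copy of this template. LifecycleRunnable and trace are hypothetical names used only for this illustration; the class keeps just the doRun, onFailure, and onAfter hooks.

```java
import java.util.ArrayList;
import java.util.List;

class LifecycleRunnableSketch {

    /** Hypothetical reduced copy of the AbstractRunnable template. */
    abstract static class LifecycleRunnable implements Runnable {
        @Override
        public final void run() {
            try {
                doRun();
            } catch (Exception e) {
                onFailure(e);
            } finally {
                onAfter();
            }
        }

        protected abstract void doRun() throws Exception;

        public abstract void onFailure(Exception e);

        public void onAfter() {
            // nothing by default
        }
    }

    /** Runs one task on the calling thread and records the hooks in the order they fire. */
    static List<String> trace(boolean fail) {
        List<String> events = new ArrayList<String>();
        new LifecycleRunnable() {
            @Override
            protected void doRun() throws Exception {
                events.add("doRun");
                if (fail) {
                    throw new IllegalStateException("boom");
                }
            }

            @Override
            public void onFailure(Exception e) {
                events.add("onFailure:" + e.getMessage());
            }

            @Override
            public void onAfter() {
                events.add("onAfter");
            }
        }.run();
        return events;
    }

    public static void main(String[] args) {
        System.out.println(trace(false));
        System.out.println(trace(true));
    }
}
```

A successful task records doRun and onAfter; a failing one records doRun, onFailure, and onAfter, so cleanup placed in onAfter runs in both cases.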

Summary

  • EsThreadPoolExecutor extends ThreadPoolExecutor and provides two constructors. Both require the RejectedExecutionHandler to be an XRejectedExecutionHandler (the shorter constructor defaults to EsAbortPolicy), and both require a ThreadContext.
  • It overrides terminated, execute, and afterExecute. terminated invokes listener.onTerminated(); execute catches EsRejectedExecutionException and, when the command is an AbstractRunnable, calls its onRejection and onAfter methods instead of rethrowing; afterExecute calls EsExecutors.rethrowErrors(unwrap(r)).
  • XRejectedExecutionHandler extends RejectedExecutionHandler and adds a rejected method that returns the number of rejected executions. Its two implementations are EsAbortPolicy and ForceQueuePolicy.
  • EsAbortPolicy keeps the rejection count in a CounterMetric and returns it from rejected(). For an AbstractRunnable with isForceExecution() returning true, it requires a SizeBlockingQueue and adds the runnable with forcePut without counting a rejection; otherwise it increments the counter and throws EsRejectedExecutionException.
  • ForceQueuePolicy puts the rejected runnable back into the queue, which it asserts to be an ExecutorScalingQueue, and its rejected method always returns 0.
  • AbstractRunnable implements Runnable. Its run method calls doRun, passes exceptions to onFailure, and always calls onAfter; isForceExecution determines whether the task should be forced into the queue when it is rejected.

doc

  • EsThreadPoolExecutor