乐趣区

聊聊Elasticsearch的EsThreadPoolExecutor

本文主要研究一下 Elasticsearch 的 EsThreadPoolExecutor

EsThreadPoolExecutor

elasticsearch-7.0.1/server/src/main/java/org/elasticsearch/common/util/concurrent/EsThreadPoolExecutor.java

public class EsThreadPoolExecutor extends ThreadPoolExecutor {

    private final ThreadContext contextHolder;
    private volatile ShutdownListener listener;

    private final Object monitor = new Object();
    /**
     * Name used in error reporting.
     */
    private final String name;

    final String getName() {return name;}

    EsThreadPoolExecutor(String name, int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit,
            BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, ThreadContext contextHolder) {this(name, corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, new EsAbortPolicy(), contextHolder);
    }

    @SuppressForbidden(reason = "properly rethrowing errors, see EsExecutors.rethrowErrors")
    EsThreadPoolExecutor(String name, int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit,
            BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, XRejectedExecutionHandler handler,
            ThreadContext contextHolder) {super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, handler);
        this.name = name;
        this.contextHolder = contextHolder;
    }

    public void shutdown(ShutdownListener listener) {synchronized (monitor) {if (this.listener != null) {throw new IllegalStateException("Shutdown was already called on this thread pool");
            }
            if (isTerminated()) {listener.onTerminated();
            } else {this.listener = listener;}
        }
        shutdown();}

    @Override
    protected synchronized void terminated() {super.terminated();
        synchronized (monitor) {if (listener != null) {
                try {listener.onTerminated();
                } finally {listener = null;}
            }
        }
    }

    public interface ShutdownListener {void onTerminated();
    }

    @Override
    public void execute(Runnable command) {command = wrapRunnable(command);
        try {super.execute(command);
        } catch (EsRejectedExecutionException ex) {if (command instanceof AbstractRunnable) {
                // If we are an abstract runnable we can handle the rejection
                // directly and don't need to rethrow it.
                try {((AbstractRunnable) command).onRejection(ex);
                } finally {((AbstractRunnable) command).onAfter();}
            } else {throw ex;}
        }
    }

    @Override
    protected void afterExecute(Runnable r, Throwable t) {super.afterExecute(r, t);
        EsExecutors.rethrowErrors(unwrap(r));
        assert assertDefaultContext(r);
    }

    private boolean assertDefaultContext(Runnable r) {
        try {assert contextHolder.isDefaultContext() : "the thread context is not the default context and the thread [" +
                Thread.currentThread().getName() + "] is being returned to the pool after executing [" + r + "]";
        } catch (IllegalStateException ex) {
            // sometimes we execute on a closed context and isDefaultContext doen't bypass the ensureOpen checks
            // this must not trigger an exception here since we only assert if the default is restored and
            // we don't really care if we are closed
            if (contextHolder.isClosed() == false) {throw ex;}
        }
        return true;
    }

    /**
     * Returns a stream of all pending tasks. This is similar to {@link #getQueue()} but will expose the originally submitted
     * {@link Runnable} instances rather than potentially wrapped ones.
     */
    public Stream<Runnable> getTasks() {return this.getQueue().stream().map(this::unwrap);
    }

    @Override
    public final String toString() {StringBuilder b = new StringBuilder();
        b.append(getClass().getSimpleName()).append('[');
        b.append("name =").append(name).append(",");
        if (getQueue() instanceof SizeBlockingQueue) {@SuppressWarnings("rawtypes")
            SizeBlockingQueue queue = (SizeBlockingQueue) getQueue();
            b.append("queue capacity =").append(queue.capacity()).append(",");
        }
        appendThreadPoolExecutorDetails(b);
        /*
         * ThreadPoolExecutor has some nice information in its toString but we
         * can't get at it easily without just getting the toString.
         */
        b.append(super.toString()).append(']');
        return b.toString();}

    /**
     * Append details about this thread pool to the specified {@link StringBuilder}. All details should be appended as key/value pairs in
     * the form "%s = %s,"
     *
     * @param sb the {@link StringBuilder} to append to
     */
    protected void appendThreadPoolExecutorDetails(final StringBuilder sb) { }

    protected Runnable wrapRunnable(Runnable command) {return contextHolder.preserveContext(command);
    }

    protected Runnable unwrap(Runnable runnable) {return contextHolder.unwrap(runnable);
    }
}
  • EsThreadPoolExecutor 继承了 ThreadPoolExecutor,它提供了两个构造器,它们要求 RejectedExecutionHandler 为 XRejectedExecutionHandler 类型,其中一个构造器默认为 EsAbortPolicy,它们还要求传入 ThreadContext
  • 它覆盖了 terminated、execute、afterExecute 方法,其中 terminated 方法会回调 listener.onTerminated();execute 方法会捕获 EsRejectedExecutionException 异常,在 command 为 AbstractRunnable 类型时回调其 onRejection 及 onAfter 方法;afterExecute 方法会执行 EsExecutors.rethrowErrors(unwrap(r)) 方法
  • 它提供了 wrapRunnable 及 unwrap 方法,分别会调用 contextHolder.preserveContext 及 contextHolder.unwrap 方法

XRejectedExecutionHandler

elasticsearch-7.0.1/server/src/main/java/org/elasticsearch/common/util/concurrent/XRejectedExecutionHandler.java

public interface XRejectedExecutionHandler extends RejectedExecutionHandler {

    /**
     * The number of rejected executions.
     */
    long rejected();}
  • XRejectedExecutionHandler 接口继承了 RejectedExecutionHandler 接口,它定义了 rejected 方法返回 rejected 的数量;它有两个实现类分别为 EsAbortPolicy 及 ForceQueuePolicy

EsAbortPolicy

elasticsearch-7.0.1/server/src/main/java/org/elasticsearch/common/util/concurrent/EsAbortPolicy.java

public class EsAbortPolicy implements XRejectedExecutionHandler {private final CounterMetric rejected = new CounterMetric();

    @Override
    public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {if (r instanceof AbstractRunnable) {if (((AbstractRunnable) r).isForceExecution()) {BlockingQueue<Runnable> queue = executor.getQueue();
                if (!(queue instanceof SizeBlockingQueue)) {throw new IllegalStateException("forced execution, but expected a size queue");
                }
                try {((SizeBlockingQueue) queue).forcePut(r);
                } catch (InterruptedException e) {Thread.currentThread().interrupt();
                    throw new IllegalStateException("forced execution, but got interrupted", e);
                }
                return;
            }
        }
        rejected.inc();
        throw new EsRejectedExecutionException("rejected execution of" + r + "on" + executor, executor.isShutdown());
    }

    @Override
    public long rejected() {return rejected.count();
    }
}
  • EsAbortPolicy 实现了 XRejectedExecutionHandler 接口,其内部使用 CounterMetric 类维护 rejected 数量,而 rejected 方法直接返回该值;rejectedExecution 方法对 AbstractRunnable 类型的 runnable 会判断是否 isForceExecution,且是 SizeBlockingQueue,则调用 SizeBlockingQueue 的 forcePut 方法重新 force 执行该 runnable,之后就是递增 rejected 计数

ForceQueuePolicy

elasticsearch-7.0.1/server/src/main/java/org/elasticsearch/common/util/concurrent/EsExecutors.java

    static class ForceQueuePolicy implements XRejectedExecutionHandler {

        @Override
        public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
            try {
                // force queue policy should only be used with a scaling queue
                assert executor.getQueue() instanceof ExecutorScalingQueue;
                executor.getQueue().put(r);
            } catch (final InterruptedException e) {
                // a scaling queue never blocks so a put to it can never be interrupted
                throw new AssertionError(e);
            }
        }

        @Override
        public long rejected() {return 0;}

    }
  • ForceQueuePolicy 实现了 XRejectedExecutionHandler 接口,它的 rejectedExecution 方法仅仅对 ExecutorScalingQueue 进行重新入队操作,而 rejected 方法返回 0

AbstractRunnable

elasticsearch-7.0.1/server/src/main/java/org/elasticsearch/common/util/concurrent/AbstractRunnable.java

public abstract class AbstractRunnable implements Runnable {

    /**
     * Should the runnable force its execution in case it gets rejected?
     */
    public boolean isForceExecution() {return false;}

    @Override
    public final void run() {
        try {doRun();
        } catch (Exception t) {onFailure(t);
        } finally {onAfter();
        }
    }

    /**
     * This method is called in a finally block after successful execution
     * or on a rejection.
     */
    public void onAfter() {// nothing by default}

    /**
     * This method is invoked for all exception thrown by {@link #doRun()}
     */
    public abstract void onFailure(Exception e);

    /**
     * This should be executed if the thread-pool executing this action rejected the execution.
     * The default implementation forwards to {@link #onFailure(Exception)}
     */
    public void onRejection(Exception e) {onFailure(e);
    }

    /**
     * This method has the same semantics as {@link Runnable#run()}
     * @throws InterruptedException if the run method throws an InterruptedException
     */
    protected abstract void doRun() throws Exception;}
  • AbstractRunnable 声明实现 Runnable 接口,它的 run 方法分别会回调 doRun、onFailure、onAfter 方法;另外它还定义了 isForceExecution 方法用于确定当 rejected 的时候是否 force execution

小结

  • EsThreadPoolExecutor 继承了 ThreadPoolExecutor,它提供了两个构造器,它们要求 RejectedExecutionHandler 为 XRejectedExecutionHandler 类型,其中一个构造器默认为 EsAbortPolicy,它们还要求传入 ThreadContext
  • 它覆盖了 terminated、execute、afterExecute 方法,其中 terminated 方法会回调 listener.onTerminated();execute 方法会捕获 EsRejectedExecutionException 异常,在 command 为 AbstractRunnable 类型时回调其 onRejection 及 onAfter 方法;afterExecute 方法会执行 EsExecutors.rethrowErrors(unwrap(r)) 方法
  • XRejectedExecutionHandler 接口继承了 RejectedExecutionHandler 接口,它定义了 rejected 方法返回 rejected 的数量;它有两个实现类分别为 EsAbortPolicy 及 ForceQueuePolicy
  • EsAbortPolicy 实现了 XRejectedExecutionHandler 接口,其内部使用 CounterMetric 类维护 rejected 数量,而 rejected 方法直接返回该值;rejectedExecution 方法对 AbstractRunnable 类型的 runnable 会判断是否 isForceExecution,且是 SizeBlockingQueue,则调用 SizeBlockingQueue 的 forcePut 方法重新 force 执行该 runnable,之后就是递增 rejected 计数
  • ForceQueuePolicy 实现了 XRejectedExecutionHandler 接口,它的 rejectedExecution 方法仅仅对 ExecutorScalingQueue 进行重新入队操作,而 rejected 方法返回 0
  • AbstractRunnable 声明实现 Runnable 接口,它的 run 方法分别会回调 doRun、onFailure、onAfter 方法;另外它还定义了 isForceExecution 方法用于确定当 rejected 的时候是否 force execution

doc

  • EsThreadPoolExecutor
退出移动版