This article takes a look at Dubbo's EagerThreadPool.

EagerThreadPool

dubbo-2.7.2/dubbo-common/src/main/java/org/apache/dubbo/common/threadpool/support/eager/EagerThreadPool.java

public class EagerThreadPool implements ThreadPool {

    @Override
    public Executor getExecutor(URL url) {
        String name = url.getParameter(THREAD_NAME_KEY, DEFAULT_THREAD_NAME);
        int cores = url.getParameter(CORE_THREADS_KEY, DEFAULT_CORE_THREADS);
        int threads = url.getParameter(THREADS_KEY, Integer.MAX_VALUE);
        int queues = url.getParameter(QUEUES_KEY, DEFAULT_QUEUES);
        int alive = url.getParameter(ALIVE_KEY, DEFAULT_ALIVE);

        // init queue and executor
        TaskQueue<Runnable> taskQueue = new TaskQueue<Runnable>(queues <= 0 ? 1 : queues);
        EagerThreadPoolExecutor executor = new EagerThreadPoolExecutor(cores,
                threads,
                alive,
                TimeUnit.MILLISECONDS,
                taskQueue,
                new NamedInternalThreadFactory(name, true),
                new AbortPolicyWithReport(name, url));
        taskQueue.setExecutor(executor);
        return executor;
    }
}
  • EagerThreadPool implements the ThreadPool interface. Its getExecutor method creates an EagerThreadPoolExecutor that uses a TaskQueue as its work queue, a NamedInternalThreadFactory as its threadFactory, and an AbortPolicyWithReport as its rejectedExecutionHandler
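
For contrast, it helps to recall how a plain java.util.concurrent.ThreadPoolExecutor behaves: once corePoolSize threads exist, new tasks are queued, and threads beyond the core size are only created when the queue refuses an offer. The following is a minimal JDK-only sketch of that default behaviour. The class QueueFirstDemo and its method are hypothetical names used for illustration; they are not part of Dubbo.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class, JDK only: shows the queue-first policy of a plain ThreadPoolExecutor.
class QueueFirstDemo {

    /** Submits `tasks` blocking tasks and returns the pool size while all of them are still pending. */
    static int poolSizeAfterSubmitting(int core, int max, int queueCapacity, int tasks) {
        CountDownLatch release = new CountDownLatch(1);
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                core, max, 60, TimeUnit.SECONDS, new LinkedBlockingQueue<>(queueCapacity));
        try {
            for (int i = 0; i < tasks; i++) {
                pool.execute(() -> {
                    try {
                        release.await(); // keep the worker busy until the measurement is taken
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                });
            }
            return pool.getPoolSize();
        } finally {
            release.countDown(); // let every task finish
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        // Queue has room: the pool stays at the core size even though max is 4.
        System.out.println(poolSizeAfterSubmitting(1, 4, 10, 4)); // prints 1
        // Queue of capacity 1 fills up: the third task forces one extra thread.
        System.out.println(poolSizeAfterSubmitting(1, 4, 1, 3));  // prints 2
    }
}
```

With a large queue the extra threads are effectively never used, which is the behaviour the eager variant is designed to change.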

EagerThreadPoolExecutor

dubbo-2.7.2/dubbo-common/src/main/java/org/apache/dubbo/common/threadpool/support/eager/EagerThreadPoolExecutor.java

public class EagerThreadPoolExecutor extends ThreadPoolExecutor {

    /**
     * task count
     */
    private final AtomicInteger submittedTaskCount = new AtomicInteger(0);

    public EagerThreadPoolExecutor(int corePoolSize,
                                   int maximumPoolSize,
                                   long keepAliveTime,
                                   TimeUnit unit, TaskQueue<Runnable> workQueue,
                                   ThreadFactory threadFactory,
                                   RejectedExecutionHandler handler) {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, handler);
    }

    /**
     * @return current tasks which are executed
     */
    public int getSubmittedTaskCount() {
        return submittedTaskCount.get();
    }

    @Override
    protected void afterExecute(Runnable r, Throwable t) {
        submittedTaskCount.decrementAndGet();
    }

    @Override
    public void execute(Runnable command) {
        if (command == null) {
            throw new NullPointerException();
        }
        // do not increment in method beforeExecute!
        submittedTaskCount.incrementAndGet();
        try {
            super.execute(command);
        } catch (RejectedExecutionException rx) {
            // retry to offer the task into queue.
            final TaskQueue queue = (TaskQueue) super.getQueue();
            try {
                if (!queue.retryOffer(command, 0, TimeUnit.MILLISECONDS)) {
                    submittedTaskCount.decrementAndGet();
                    throw new RejectedExecutionException("Queue capacity is full.", rx);
                }
            } catch (InterruptedException x) {
                submittedTaskCount.decrementAndGet();
                throw new RejectedExecutionException(x);
            }
        } catch (Throwable t) {
            // decrease any way
            submittedTaskCount.decrementAndGet();
            throw t;
        }
    }
}
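  • EagerThreadPoolExecutor extends ThreadPoolExecutor and maintains submittedTaskCount, which is incremented in execute before the task is handed to super.execute and decremented in afterExecute. Its execute method catches RejectedExecutionException and uses TaskQueue's retryOffer to try to enqueue the task again; only if that offer also fails does it throw RejectedExecutionException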
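
The counter is incremented in execute rather than in beforeExecute so that tasks still waiting in the queue are counted too. A minimal JDK-only sketch of that bookkeeping follows, assuming an unbounded queue and leaving out the retry logic; CountingExecutor and demo are hypothetical names, not Dubbo classes.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical sketch: counts tasks that were submitted but have not finished yet.
class CountingExecutor extends ThreadPoolExecutor {

    private final AtomicInteger submitted = new AtomicInteger();

    CountingExecutor(int core, int max) {
        super(core, max, 60, TimeUnit.SECONDS, new LinkedBlockingQueue<>());
    }

    int getSubmittedTaskCount() {
        return submitted.get();
    }

    @Override
    public void execute(Runnable command) {
        submitted.incrementAndGet(); // counted as soon as it is submitted, queued or not
        try {
            super.execute(command);
        } catch (RuntimeException | Error e) {
            submitted.decrementAndGet(); // the task was never accepted
            throw e;
        }
    }

    @Override
    protected void afterExecute(Runnable r, Throwable t) {
        submitted.decrementAndGet(); // the task has finished, normally or with an exception
    }

    /** Returns {count while 3 tasks are pending on 1 thread, count after all of them finished}. */
    static int[] demo() {
        CountDownLatch release = new CountDownLatch(1);
        CountingExecutor pool = new CountingExecutor(1, 1);
        for (int i = 0; i < 3; i++) {
            pool.execute(() -> {
                try {
                    release.await();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
        }
        int whilePending = pool.getSubmittedTaskCount(); // 1 running + 2 queued
        release.countDown();
        pool.shutdown();
        try {
            pool.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return new int[] {whilePending, pool.getSubmittedTaskCount()};
    }
}
```

In this sketch demo() yields 3 while the tasks are pending, although only one thread is active, and 0 once the pool has terminated.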

TaskQueue

dubbo-2.7.2/dubbo-common/src/main/java/org/apache/dubbo/common/threadpool/support/eager/TaskQueue.java

public class TaskQueue<R extends Runnable> extends LinkedBlockingQueue<Runnable> {

    private static final long serialVersionUID = -2635853580887179627L;

    private EagerThreadPoolExecutor executor;

    public TaskQueue(int capacity) {
        super(capacity);
    }

    public void setExecutor(EagerThreadPoolExecutor exec) {
        executor = exec;
    }

    @Override
    public boolean offer(Runnable runnable) {
        if (executor == null) {
            throw new RejectedExecutionException("The task queue does not have executor!");
        }

        int currentPoolThreadSize = executor.getPoolSize();
        // have free worker. put task into queue to let the worker deal with task.
        if (executor.getSubmittedTaskCount() < currentPoolThreadSize) {
            return super.offer(runnable);
        }

        // return false to let executor create new worker.
        if (currentPoolThreadSize < executor.getMaximumPoolSize()) {
            return false;
        }

        // currentPoolThreadSize >= max
        return super.offer(runnable);
    }

    /**
     * retry offer task
     *
     * @param o task
     * @return offer success or not
     * @throws RejectedExecutionException if executor is terminated.
     */
    public boolean retryOffer(Runnable o, long timeout, TimeUnit unit) throws InterruptedException {
        if (executor.isShutdown()) {
            throw new RejectedExecutionException("Executor is shutdown!");
        }
        return super.offer(o, timeout, unit);
    }
}
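  • TaskQueue extends LinkedBlockingQueue and overrides offer. When submittedTaskCount is less than the current pool size (some worker is idle), the task is enqueued. Otherwise, if currentPoolThreadSize is less than maximumPoolSize, offer returns false so that the thread pool creates a new thread. Only when currentPoolThreadSize has reached maximumPoolSize is the task enqueued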
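
The effect of this offer override can be reproduced with JDK classes alone. The following is a simplified sketch of the same idea, not the Dubbo implementation: it omits the null check, retryOffer and the rejection handler, and the names EagerQueueSketch, SketchQueue, SketchExecutor and submitBlocking are hypothetical.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical JDK-only sketch of a "threads first, queue second" pool.
class EagerQueueSketch {

    /** Queue that refuses offers while the pool can still grow, so the executor adds a worker. */
    static class SketchQueue extends LinkedBlockingQueue<Runnable> {
        private static final long serialVersionUID = 1L;
        private transient SketchExecutor executor;

        SketchQueue(int capacity) {
            super(capacity);
        }

        void setExecutor(SketchExecutor executor) {
            this.executor = executor;
        }

        @Override
        public boolean offer(Runnable task) {
            int poolSize = executor.getPoolSize();
            if (executor.submittedCount() < poolSize) {
                return super.offer(task);   // an idle worker exists, let it pick the task up
            }
            if (poolSize < executor.getMaximumPoolSize()) {
                return false;               // pretend to be full so a new worker is created
            }
            return super.offer(task);       // pool is at its maximum, queue for real
        }
    }

    /** Executor that tracks submitted-but-unfinished tasks for the queue's decision. */
    static class SketchExecutor extends ThreadPoolExecutor {
        private final AtomicInteger submitted = new AtomicInteger();

        SketchExecutor(int core, int max, SketchQueue queue) {
            super(core, max, 60, TimeUnit.SECONDS, queue);
            queue.setExecutor(this);
        }

        int submittedCount() {
            return submitted.get();
        }

        @Override
        public void execute(Runnable command) {
            submitted.incrementAndGet();
            try {
                super.execute(command);
            } catch (RuntimeException | Error e) {
                submitted.decrementAndGet();
                throw e;
            }
        }

        @Override
        protected void afterExecute(Runnable r, Throwable t) {
            submitted.decrementAndGet();
        }
    }

    /** Submits `tasks` blocking tasks and returns {pool size, queue length} while they are pending. */
    static int[] submitBlocking(int core, int max, int capacity, int tasks) {
        CountDownLatch release = new CountDownLatch(1);
        SketchExecutor pool = new SketchExecutor(core, max, new SketchQueue(capacity));
        try {
            for (int i = 0; i < tasks; i++) {
                pool.execute(() -> {
                    try {
                        release.await();
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                });
            }
            return new int[] {pool.getPoolSize(), pool.getQueue().size()};
        } finally {
            release.countDown();
            pool.shutdown();
        }
    }
}
```

Calling submitBlocking(1, 3, 10, 5) in this sketch gives a pool size of 3 and a queue length of 2: the pool grows to its maximum first and only then starts queueing, whereas a plain ThreadPoolExecutor with the same settings would stay at one thread with four queued tasks.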

NamedInternalThreadFactory

dubbo-2.7.2/dubbo-common/src/main/java/org/apache/dubbo/common/threadlocal/NamedInternalThreadFactory.java

public class NamedInternalThreadFactory extends NamedThreadFactory {

    public NamedInternalThreadFactory() {
        super();
    }

    public NamedInternalThreadFactory(String prefix) {
        super(prefix, false);
    }

    public NamedInternalThreadFactory(String prefix, boolean daemon) {
        super(prefix, daemon);
    }

    @Override
    public Thread newThread(Runnable runnable) {
        String name = mPrefix + mThreadNum.getAndIncrement();
        InternalThread ret = new InternalThread(mGroup, runnable, name, 0);
        ret.setDaemon(mDaemon);
        return ret;
    }
}
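  • NamedInternalThreadFactory extends NamedThreadFactory; its newThread method creates an InternalThread, named with the prefix plus an incrementing thread number, and applies the daemon flag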
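
The general pattern of a naming thread factory can be sketched with the JDK alone. In the example below the class PrefixedThreadFactory and the "-thread-" naming scheme are assumptions made for illustration, and it creates a plain Thread where the Dubbo factory creates an InternalThread.

```java
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical sketch of a thread factory that names and daemonizes its threads.
class PrefixedThreadFactory implements ThreadFactory {

    private final String prefix;
    private final boolean daemon;
    private final AtomicInteger threadNum = new AtomicInteger(1);

    PrefixedThreadFactory(String prefix, boolean daemon) {
        this.prefix = prefix + "-thread-"; // naming scheme chosen for this sketch
        this.daemon = daemon;
    }

    @Override
    public Thread newThread(Runnable runnable) {
        // Each new thread gets the next number, so names stay unique per factory.
        Thread thread = new Thread(runnable, prefix + threadNum.getAndIncrement());
        thread.setDaemon(daemon);
        return thread;
    }
}
```

Readable thread names make it much easier to attribute threads to a pool when reading a stack dump such as the one written by the rejection handler.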

AbortPolicyWithReport

dubbo-2.7.2/dubbo-common/src/main/java/org/apache/dubbo/common/threadpool/support/AbortPolicyWithReport.java

public class AbortPolicyWithReport extends ThreadPoolExecutor.AbortPolicy {

    protected static final Logger logger = LoggerFactory.getLogger(AbortPolicyWithReport.class);

    private final String threadName;

    private final URL url;

    private static volatile long lastPrintTime = 0;

    private static final long TEN_MINUTES_MILLS = 10 * 60 * 1000;

    private static final String OS_WIN_PREFIX = "win";

    private static final String OS_NAME_KEY = "os.name";

    private static final String WIN_DATETIME_FORMAT = "yyyy-MM-dd_HH-mm-ss";

    private static final String DEFAULT_DATETIME_FORMAT = "yyyy-MM-dd_HH:mm:ss";

    private static Semaphore guard = new Semaphore(1);

    public AbortPolicyWithReport(String threadName, URL url) {
        this.threadName = threadName;
        this.url = url;
    }

    @Override
    public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
        String msg = String.format("Thread pool is EXHAUSTED!" +
                " Thread Name: %s, Pool Size: %d (active: %d, core: %d, max: %d, largest: %d), Task: %d (completed: "
                + "%d)," +
                " Executor status:(isShutdown:%s, isTerminated:%s, isTerminating:%s), in %s://%s:%d!",
            threadName, e.getPoolSize(), e.getActiveCount(), e.getCorePoolSize(), e.getMaximumPoolSize(),
            e.getLargestPoolSize(),
            e.getTaskCount(), e.getCompletedTaskCount(), e.isShutdown(), e.isTerminated(), e.isTerminating(),
            url.getProtocol(), url.getIp(), url.getPort());
        logger.warn(msg);
        dumpJStack();
        throw new RejectedExecutionException(msg);
    }

    private void dumpJStack() {
        long now = System.currentTimeMillis();

        //dump every 10 minutes
        if (now - lastPrintTime < TEN_MINUTES_MILLS) {
            return;
        }

        if (!guard.tryAcquire()) {
            return;
        }

        ExecutorService pool = Executors.newSingleThreadExecutor();
        pool.execute(() -> {
            String dumpPath = url.getParameter(DUMP_DIRECTORY, System.getProperty("user.home"));

            SimpleDateFormat sdf;

            String os = System.getProperty(OS_NAME_KEY).toLowerCase();

            // window system don't support ":" in file name
            if (os.contains(OS_WIN_PREFIX)) {
                sdf = new SimpleDateFormat(WIN_DATETIME_FORMAT);
            } else {
                sdf = new SimpleDateFormat(DEFAULT_DATETIME_FORMAT);
            }

            String dateStr = sdf.format(new Date());
            //try-with-resources
            try (FileOutputStream jStackStream = new FileOutputStream(
                new File(dumpPath, "Dubbo_JStack.log" + "." + dateStr))) {
                JVMUtil.jstack(jStackStream);
            } catch (Throwable t) {
                logger.error("dump jStack error", t);
            } finally {
                guard.release();
            }

            lastPrintTime = System.currentTimeMillis();
        });
        //must shutdown thread pool ,if not will lead to OOM
        pool.shutdown();
    }
}
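  • AbortPolicyWithReport extends ThreadPoolExecutor.AbortPolicy. Its rejectedExecution method builds a msg containing the thread pool's state, logs it at warn level, calls dumpJStack (which writes a stack dump to a file at most once every 10 minutes), and finally throws RejectedExecutionException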
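
The "report at most once per interval, then reject" pattern can be sketched with the JDK alone. This is a simplified illustration under stated assumptions, not the Dubbo class: ThrottledAbortPolicy and demo are hypothetical names, the report is an arbitrary Runnable standing in for the stack dump, and it runs synchronously here, whereas dumpJStack above hands the dump to a separate single-thread executor.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.Semaphore;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical sketch: abort policy that runs a throttled report before rejecting.
class ThrottledAbortPolicy extends ThreadPoolExecutor.AbortPolicy {

    private final String poolName;
    private final long minIntervalMillis;
    private final Runnable report;               // stand-in for the stack dump
    private final Semaphore guard = new Semaphore(1);
    private volatile long lastReportTime = 0;

    ThrottledAbortPolicy(String poolName, long minIntervalMillis, Runnable report) {
        this.poolName = poolName;
        this.minIntervalMillis = minIntervalMillis;
        this.report = report;
    }

    @Override
    public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
        String msg = String.format("Thread pool %s is exhausted (size: %d, active: %d, core: %d, max: %d)",
                poolName, e.getPoolSize(), e.getActiveCount(), e.getCorePoolSize(), e.getMaximumPoolSize());
        reportThrottled();
        throw new RejectedExecutionException(msg);
    }

    private void reportThrottled() {
        if (System.currentTimeMillis() - lastReportTime < minIntervalMillis) {
            return; // reported recently enough
        }
        if (!guard.tryAcquire()) {
            return; // another thread is reporting right now
        }
        try {
            report.run();
            lastReportTime = System.currentTimeMillis();
        } finally {
            guard.release();
        }
    }

    /** Returns {number of rejected tasks, number of reports} for a saturated single-thread pool. */
    static int[] demo() {
        AtomicInteger reports = new AtomicInteger();
        ThrottledAbortPolicy policy = new ThrottledAbortPolicy("demo", 600_000L, reports::incrementAndGet);
        CountDownLatch release = new CountDownLatch(1);
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 1, 60, TimeUnit.SECONDS, new SynchronousQueue<>(), policy);
        int rejected = 0;
        try {
            pool.execute(() -> { // occupies the only worker
                try {
                    release.await();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
            for (int i = 0; i < 2; i++) {
                try {
                    pool.execute(() -> { });
                } catch (RejectedExecutionException e) {
                    rejected++;
                }
            }
        } finally {
            release.countDown();
            pool.shutdown();
        }
        return new int[] {rejected, reports.get()};
    }
}
```

In this sketch both extra tasks are rejected but the report runs only once, because the second rejection falls inside the ten-minute interval passed to the policy.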

Summary

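EagerThreadPool implements the ThreadPool interface. Its getExecutor method creates an EagerThreadPoolExecutor that uses a TaskQueue as its work queue, a NamedInternalThreadFactory as its threadFactory, and an AbortPolicyWithReport as its rejectedExecutionHandler. Because TaskQueue's offer returns false while no worker is idle and the pool is still below maximumPoolSize, the pool creates new threads first and only starts queueing tasks once maximumPoolSize has been reached.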

doc

  • EagerThreadPool