共计 7442 个字符,预计需要花费 19 分钟才能阅读完成。
序
本文主要研究一下 dubbo 的 EagerThreadPool
EagerThreadPool
dubbo-2.7.2/dubbo-common/src/main/java/org/apache/dubbo/common/threadpool/support/eager/EagerThreadPool.java
public class EagerThreadPool implements ThreadPool {
@Override
public Executor getExecutor(URL url) {String name = url.getParameter(THREAD_NAME_KEY, DEFAULT_THREAD_NAME);
int cores = url.getParameter(CORE_THREADS_KEY, DEFAULT_CORE_THREADS);
int threads = url.getParameter(THREADS_KEY, Integer.MAX_VALUE);
int queues = url.getParameter(QUEUES_KEY, DEFAULT_QUEUES);
int alive = url.getParameter(ALIVE_KEY, DEFAULT_ALIVE);
// init queue and executor
TaskQueue<Runnable> taskQueue = new TaskQueue<Runnable>(queues <= 0 ? 1 : queues);
EagerThreadPoolExecutor executor = new EagerThreadPoolExecutor(cores,
threads,
alive,
TimeUnit.MILLISECONDS,
taskQueue,
new NamedInternalThreadFactory(name, true),
new AbortPolicyWithReport(name, url));
taskQueue.setExecutor(executor);
return executor;
}
}
- EagerThreadPool 实现了 ThreadPool 接口,其 getExecutor 创建的是 EagerThreadPoolExecutor,它使用的 queue 为 TaskQueue,使用的 threadFactory 为 NamedInternalThreadFactory,使用的 rejectedExecutionHandler 为 AbortPolicyWithReport
EagerThreadPoolExecutor
dubbo-2.7.2/dubbo-common/src/main/java/org/apache/dubbo/common/threadpool/support/eager/EagerThreadPoolExecutor.java
public class EagerThreadPoolExecutor extends ThreadPoolExecutor {
/**
* task count
*/
private final AtomicInteger submittedTaskCount = new AtomicInteger(0);
public EagerThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit, TaskQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, handler);
}
/**
* @return current tasks which are executed
*/
public int getSubmittedTaskCount() {return submittedTaskCount.get();
}
@Override
protected void afterExecute(Runnable r, Throwable t) {submittedTaskCount.decrementAndGet();
}
@Override
public void execute(Runnable command) {if (command == null) {throw new NullPointerException();
}
// do not increment in method beforeExecute!
submittedTaskCount.incrementAndGet();
try {super.execute(command);
} catch (RejectedExecutionException rx) {
// retry to offer the task into queue.
final TaskQueue queue = (TaskQueue) super.getQueue();
try {if (!queue.retryOffer(command, 0, TimeUnit.MILLISECONDS)) {submittedTaskCount.decrementAndGet();
throw new RejectedExecutionException("Queue capacity is full.", rx);
}
} catch (InterruptedException x) {submittedTaskCount.decrementAndGet();
throw new RejectedExecutionException(x);
}
} catch (Throwable t) {
// decrease any way
submittedTaskCount.decrementAndGet();
throw t;
}
}
}
- EagerThreadPoolExecutor 继承了 ThreadPoolExecutor,它维护了 submittedTaskCount,在执行任务之前递增,在 afterExecute 的时候胡递减;其 execute 方法会捕获 RejectedExecutionException,然后使用 TaskQueue 的 retryOffer 再重新入队,入队不成功才抛出 RejectedExecutionException
TaskQueue
dubbo-2.7.2/dubbo-common/src/main/java/org/apache/dubbo/common/threadpool/support/eager/TaskQueue.java
public class TaskQueue<R extends Runnable> extends LinkedBlockingQueue<Runnable> {
private static final long serialVersionUID = -2635853580887179627L;
private EagerThreadPoolExecutor executor;
public TaskQueue(int capacity) {super(capacity);
}
public void setExecutor(EagerThreadPoolExecutor exec) {executor = exec;}
@Override
public boolean offer(Runnable runnable) {if (executor == null) {throw new RejectedExecutionException("The task queue does not have executor!");
}
int currentPoolThreadSize = executor.getPoolSize();
// have free worker. put task into queue to let the worker deal with task.
if (executor.getSubmittedTaskCount() < currentPoolThreadSize) {return super.offer(runnable);
}
// return false to let executor create new worker.
if (currentPoolThreadSize < executor.getMaximumPoolSize()) {return false;}
// currentPoolThreadSize >= max
return super.offer(runnable);
}
/**
* retry offer task
*
* @param o task
* @return offer success or not
* @throws RejectedExecutionException if executor is terminated.
*/
public boolean retryOffer(Runnable o, long timeout, TimeUnit unit) throws InterruptedException {if (executor.isShutdown()) {throw new RejectedExecutionException("Executor is shutdown!");
}
return super.offer(o, timeout, unit);
}
}
- TaskQueue 继承了 LinkedBlockingQueue,它覆盖了 offer 方法,该方法在 submittedTaskCount 小于 poolSize 的时候会入队,如果大于等于 poolSize 则再判断 currentPoolThreadSize 是否小于 maximumPoolSize,如果小于则返回 false 让线程池创建新线程,最后在 currentPoolThreadSize 大于等于 maximumPoolSize 的时候入队
NamedInternalThreadFactory
dubbo-2.7.2/dubbo-common/src/main/java/org/apache/dubbo/common/threadlocal/NamedInternalThreadFactory.java
public class NamedInternalThreadFactory extends NamedThreadFactory {public NamedInternalThreadFactory() {super();
}
public NamedInternalThreadFactory(String prefix) {super(prefix, false);
}
public NamedInternalThreadFactory(String prefix, boolean daemon) {super(prefix, daemon);
}
@Override
public Thread newThread(Runnable runnable) {String name = mPrefix + mThreadNum.getAndIncrement();
InternalThread ret = new InternalThread(mGroup, runnable, name, 0);
ret.setDaemon(mDaemon);
return ret;
}
}
- NamedInternalThreadFactory 继承了 NamedThreadFactory,这里创建的是 InternalThread
AbortPolicyWithReport
dubbo-2.7.2/dubbo-common/src/main/java/org/apache/dubbo/common/threadpool/support/AbortPolicyWithReport.java
public class AbortPolicyWithReport extends ThreadPoolExecutor.AbortPolicy {protected static final Logger logger = LoggerFactory.getLogger(AbortPolicyWithReport.class);
private final String threadName;
private final URL url;
private static volatile long lastPrintTime = 0;
private static final long TEN_MINUTES_MILLS = 10 * 60 * 1000;
private static final String OS_WIN_PREFIX = "win";
private static final String OS_NAME_KEY = "os.name";
private static final String WIN_DATETIME_FORMAT = "yyyy-MM-dd_HH-mm-ss";
private static final String DEFAULT_DATETIME_FORMAT = "yyyy-MM-dd_HH:mm:ss";
private static Semaphore guard = new Semaphore(1);
public AbortPolicyWithReport(String threadName, URL url) {
this.threadName = threadName;
this.url = url;
}
@Override
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
String msg = String.format("Thread pool is EXHAUSTED!" +
"Thread Name: %s, Pool Size: %d (active: %d, core: %d, max: %d, largest: %d), Task: %d (completed:"
+ "%d)," +
"Executor status:(isShutdown:%s, isTerminated:%s, isTerminating:%s), in %s://%s:%d!",
threadName, e.getPoolSize(), e.getActiveCount(), e.getCorePoolSize(), e.getMaximumPoolSize(),
e.getLargestPoolSize(),
e.getTaskCount(), e.getCompletedTaskCount(), e.isShutdown(), e.isTerminated(), e.isTerminating(),
url.getProtocol(), url.getIp(), url.getPort());
logger.warn(msg);
dumpJStack();
throw new RejectedExecutionException(msg);
}
private void dumpJStack() {long now = System.currentTimeMillis();
//dump every 10 minutes
if (now - lastPrintTime < TEN_MINUTES_MILLS) {return;}
if (!guard.tryAcquire()) {return;}
ExecutorService pool = Executors.newSingleThreadExecutor();
pool.execute(() -> {String dumpPath = url.getParameter(DUMP_DIRECTORY, System.getProperty("user.home"));
SimpleDateFormat sdf;
String os = System.getProperty(OS_NAME_KEY).toLowerCase();
// window system don't support":" in file name
if (os.contains(OS_WIN_PREFIX)) {sdf = new SimpleDateFormat(WIN_DATETIME_FORMAT);
} else {sdf = new SimpleDateFormat(DEFAULT_DATETIME_FORMAT);
}
String dateStr = sdf.format(new Date());
//try-with-resources
try (FileOutputStream jStackStream = new FileOutputStream(new File(dumpPath, "Dubbo_JStack.log" + "." + dateStr))) {JVMUtil.jstack(jStackStream);
} catch (Throwable t) {logger.error("dump jStack error", t);
} finally {guard.release();
}
lastPrintTime = System.currentTimeMillis();});
//must shutdown thread pool ,if not will lead to OOM
pool.shutdown();}
}
- AbortPolicyWithReport 继承了 ThreadPoolExecutor.AbortPolicy,其 rejectedExecution 方法会输出包含 thread pool 相关信息的 msg,然后使用 warn 级别打印出来,然后进行 dumpJStack,最后再抛出 RejectedExecutionException
小结
EagerThreadPool 实现了 ThreadPool 接口,其 getExecutor 创建的是 EagerThreadPoolExecutor,它使用的 queue 为 TaskQueue,使用的 threadFactory 为 NamedInternalThreadFactory,使用的 rejectedExecutionHandler 为 AbortPolicyWithReport
doc
- EagerThreadPool