Tomcat 版本
- Spring boot 内嵌 Tomcat-embed-core - 9.0.30
- 等价于独立部署的 Tomcat 9.0.30
StandardThreadExecutor
StandardThreadExecutor 是 Catalina 结构中的一部分,是 Tomcat 生命周期中的池化线程资源的封装。类总览:
public class StandardThreadExecutor extends LifecycleMBeanBase implements Executor, ResizableExecutor { // 统一的 String 的管理类,用来防止编码等一系列的问题 protected static final StringManager sm = StringManager.getManager(Constants.Package); // 创建出来的线程的优先级 protected int threadPriority = Thread.NORM_PRIORITY; // 线程是否是守护线程,默认为是 protected boolean daemon = true; // 线程名称 protected String namePrefix = "tomcat-exec-"; // 默认的最大线程数量 protected int maxThreads = 200; // 默认的最小线程数量 protected int minSpareThreads = 25; // 存在时间 protected int maxIdleTime = 60000; // 真实工作的 ThreadPoolExecutor // 本质是 StandardThreadExecutor 只是 ThreadPoolExecutor 的装饰器 // 此处的对象类型是 org.apache.tomcat.util.threads.ThreadPoolExecutor protected ThreadPoolExecutor executor = null; // 线程池名称 protected String name; // 是否开启线程池最小线程数量,如果此处为 false 的话,minSpareThreads 就没有意义 protected boolean prestartminSpareThreads = false; // 默认的任务队列长度 protected int maxQueueSize = Integer.MAX_VALUE; // 重建线程的时间间隔 protected long threadRenewalDelay = org.apache.tomcat.util.threads.Constants.DEFAULT_THREAD_RENEWAL_DELAY; // 任务队列 private TaskQueue taskqueue = null; // 其它方法暂时忽略}
生命周期
/** * 初始化线程池 */@Overrideprotected void initInternal() throws LifecycleException { super.initInternal();}/** * 开始线程池 */@Overrideprotected void startInternal() throws LifecycleException { // 任务队列 taskqueue = new TaskQueue(maxQueueSize); // 线程工厂 TaskThreadFactory tf = new TaskThreadFactory(namePrefix,daemon,getThreadPriority()); // 初始化线程池 executor = new ThreadPoolExecutor(getMinSpareThreads(), getMaxThreads(), maxIdleTime, TimeUnit.MILLISECONDS,taskqueue, tf); // 重建线程的时间间隔 executor.setThreadRenewalDelay(threadRenewalDelay); // 设置线程池最小线程数量 if (prestartminSpareThreads) { executor.prestartAllCoreThreads(); } // 线程池任务队列的 parent taskqueue.setParent(executor); // 设置组件的生命周期状态 setState(LifecycleState.STARTING);}/** * 关闭线程池 */@Overrideprotected void stopInternal() throws LifecycleException { setState(LifecycleState.STOPPING); if (executor != null) { executor.shutdownNow(); } executor = null; taskqueue = null;}/** * 清除线程池 */@Overrideprotected void destroyInternal() throws LifecycleException { super.destroyInternal();}/** * 关闭线程池 */public void contextStopping() { if (executor != null) { executor.contextStopping(); }}
任务执行
/** * 加入一个带超时的任务 **/@Overridepublic void execute(Runnable command, long timeout, TimeUnit unit) { // 调用 executor 对象去执行 // 如果 executor 对象是空的,则抛出异常 if (executor != null) { executor.execute(command,timeout,unit); } else { throw new IllegalStateException(sm.getString("standardThreadExecutor.notStarted")); }}/** * 加入一个不带超时的任务 **/@Overridepublic void execute(Runnable command) { // 逻辑基本同上 if (executor != null) { try { executor.execute(command); } catch (RejectedExecutionException rx) { // 此处会再尝试将任务加入一次等待队列中 // TaskQueue.force(...) 方法底层会调用 Queue.offer(...) 方法 // 如果仍然失败,会抛出异常 if (!((TaskQueue) executor.getQueue()).force(command)) { throw new RejectedExecutionException(sm.getString("standardThreadExecutor.queueFull")); } } } else { throw new IllegalStateException(sm.getString("standardThreadExecutor.notStarted")); }}
其它方法基本都是 get / set 方法,可以忽略。
ThreadPoolExecutor
org.apache.tomcat.util.threads.ThreadPoolExecutor 是 Tomcat 中的线程池类。
类和变量总览:
public class ThreadPoolExecutor extends java.util.concurrent.ThreadPoolExecutor { // 统一的 String 的管理类,用来防止编码等一系列的问题 protected static final StringManager sm = StringManager .getManager("org.apache.tomcat.util.threads.res"); // 任务提交数量的计数 private final AtomicInteger submittedCount = new AtomicInteger(0); private final AtomicLong lastContextStoppedTime = new AtomicLong(0L); // 线程自杀计数 private final AtomicLong lastTimeThreadKilledItself = new AtomicLong(0L); // 重建线程的时间间隔 private long threadRenewalDelay = Constants.DEFAULT_THREAD_RENEWAL_DELAY; // 其它方法暂时忽略 // ...
构造器
/** * 基本是沿用了 java.util.concurrent.ThreadPoolExecutor 的构造方法 **/public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, RejectedExecutionHandler handler) { // 调用 juc 中的 ThreadPoolExecutor 进行线程池的初始化 super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, handler); // 用于初始化常驻线程 prestartAllCoreThreads();}// 下列的构造方法都差不多public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) { super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, handler); prestartAllCoreThreads();}public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory) { super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, new RejectHandler()); prestartAllCoreThreads();}public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) { super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, new RejectHandler()); prestartAllCoreThreads();}
拒绝策略
Tomcat 中的拒绝策略为 ThreadPoolExecutor 的一个内部类 RejectHandler,直接抛出错误:
// ThreadPoolExecutor.classprivate static class RejectHandler implements RejectedExecutionHandler { @Override public void rejectedExecution(Runnable r, java.util.concurrent.ThreadPoolExecutor executor) { // 直接抛出错误 throw new RejectedExecutionException(); }}
任务执行
Tomcat 的 ThreadPoolExecutor 中执行任务复写了父接口中的 execute(...) 方法:
@Overridepublic void execute(Runnable command) { execute(command,0,TimeUnit.MILLISECONDS);}public void execute(Runnable command, long timeout, TimeUnit unit) { // 提交计数加一 submittedCount.incrementAndGet(); try { // 此处调用 java.util.concurrent.ThreadPoolExecutor 中的 execute(...) 方法 super.execute(command); } catch (RejectedExecutionException rx) { // 如果调用父类中的方法执行错误,会尝试将任务再一次放入到等待队列里 if (super.getQueue() instanceof TaskQueue) { final TaskQueue queue = (TaskQueue)super.getQueue(); try { // 此处尝试放入等待队列 // 如果也失败了,就回滚提交计数,并抛出异常 if (!queue.force(command, timeout, unit)) { submittedCount.decrementAndGet(); throw new RejectedExecutionException(sm.getString("threadPoolExecutor.queueFull")); } } catch (InterruptedException x) { submittedCount.decrementAndGet(); throw new RejectedExecutionException(x); } } else { submittedCount.decrementAndGet(); throw rx; } }}@Overrideprotected void afterExecute(Runnable r, Throwable t) { // 计数加一 submittedCount.decrementAndGet(); // 如果没有报错,那么此处尝试关闭多余的线程 // 抛出错误的方式停止线程 if (t == null) { stopCurrentThreadIfNeeded(); }}/** * 判断是否需要关闭线程 **/protected void stopCurrentThreadIfNeeded() { // 如果线程存活时间超过了 delay 值,那么此处会抛出一个错误,使线程停止 if (currentThreadShouldBeStopped()) { long lastTime = lastTimeThreadKilledItself.longValue(); if (lastTime + threadRenewalDelay < System.currentTimeMillis()) { if (lastTimeThreadKilledItself.compareAndSet(lastTime, System.currentTimeMillis() + 1)) { final String msg = sm.getString( "threadPoolExecutor.threadStoppedToAvoidPotentialLeak", Thread.currentThread().getName()); throw new StopPooledThreadException(msg); } } }}protected boolean currentThreadShouldBeStopped() { // 如果当前线程并非工作线程,或者不存在线程存活 delay 值,那么此处返回 false // 如果当前线程是工作线程,且设置了 delay 时间,且当前线程的存活时间已经超过了设置值,那么此处返回 true if (threadRenewalDelay >= 0 && Thread.currentThread() instanceof TaskThread) { TaskThread currentTaskThread = (TaskThread) Thread.currentThread(); if (currentTaskThread.getCreationTime() < this.lastContextStoppedTime.longValue()) { return true; } } return false;}
优雅关闭
public void contextStopping() { this.lastContextStoppedTime.set(System.currentTimeMillis()); // 保存 corePoolSize 的值 int savedCorePoolSize = this.getCorePoolSize(); // 获取队列 TaskQueue taskQueue = getQueue() instanceof TaskQueue ? (TaskQueue) getQueue() : null; // 将 taskQueue 中的 forcedRemainingCapacity 置为零 // 不太清楚 forcedRemainingCapacity 有什么作用 if (taskQueue != null) { taskQueue.setForcedRemainingCapacity(Integer.valueOf(0)); } // corePoolSize 置为零 this.setCorePoolSize(0); // 将 taskQueue 中的 forcedRemainingCapacity 置空 if (taskQueue != null) { taskQueue.setForcedRemainingCapacity(null); } // 恢复 corePoolSize this.setCorePoolSize(savedCorePoolSize); }
TaskQueue
TaskQueue 是 Tomcat 中对任务队列的增强和封装:
public class TaskQueue extends LinkedBlockingQueue<Runnable> { // 序列编码 private static final long serialVersionUID = 1L; // 字符串管理类 protected static final StringManager sm = StringManager .getManager("org.apache.tomcat.util.threads.res"); // 任务队列关联的线程池 private transient volatile ThreadPoolExecutor parent = null; // 不太清楚是做什么用的一个容量计数 private Integer forcedRemainingCapacity = null; // 其它方法暂时忽略 // ...
加入、获取任务的相关方法
// 不带超时的添加任务方法public boolean force(Runnable o) { // 关联线程池不可为空 if (parent == null || parent.isShutdown()) throw new RejectedExecutionException(sm.getString("taskQueue.notRunning")); // 调用 LinkedBlockingQueue 的 offer(...) 方法添加任务 return super.offer(o);}// 带超时的添加任务方法public boolean force(Runnable o, long timeout, TimeUnit unit) throws InterruptedException { if (parent == null || parent.isShutdown()) throw new RejectedExecutionException(sm.getString("taskQueue.notRunning")); return super.offer(o,timeout,unit);}// 本质上是调用父类的 offer(...) 方法// 非阻塞添加任务的方法@Overridepublic boolean offer(Runnable o) { if (parent == null) return super.offer(o); if (parent.getPoolSize() == parent.getMaximumPoolSize()) return super.offer(o); if (parent.getSubmittedCount() <= (parent.getPoolSize())) return super.offer(o); // 这种情况下线程池可以直接消费任务,无需放入任务队列等待 if (parent.getPoolSize() < parent.getMaximumPoolSize()) return false; return super.offer(o);}// 带超时的阻塞方式获取任务@Overridepublic Runnable poll(long timeout, TimeUnit unit) throws InterruptedException { // 获取一个任务,如果获取到的为空,则停止当前线程 // 能获取到就返回任务 Runnable runnable = super.poll(timeout, unit); if (runnable == null && parent != null) { parent.stopCurrentThreadIfNeeded(); } return runnable;}// 阻塞方式获取任务@Overridepublic Runnable take() throws InterruptedException { // 线程池存在的情况下,会使用限时的方式去获取任务 if (parent != null && parent.currentThreadShouldBeStopped()) { return poll(parent.getKeepAliveTime(TimeUnit.MILLISECONDS), TimeUnit.MILLISECONDS); } return super.take();}
TaskQueue 在设计的时候都考虑了关联线程池存不存在的情况,笔者认为这应该是 Tomcat 的作者考虑到开发者可能会需要复用 TaskQueue 到其它的场景中。
一点体会
Tomcat 的线程池其实封装不厚重,只是对 jdk 线程池做了简单优化:
- 任务执行失败时不会直接抛出错误,而是装回队列里再次尝试执行;
- 当线程池没有达到最大执行线程的时候,会优先开线程再使用任务队列;
- 扩展计数用于追踪任务的执行情况;
- 将线程池融入 Catalina 的生命周期组件中。
本文仅为个人的学习笔记,可能存在错误或者表述不清的地方,有缘补充