Tomcat 版本
- Spring boot 内嵌 Tomcat-embed-core – 9.0.30
- 等价于独立部署的 Tomcat 9.0.30
StandardThreadExecutor
StandardThreadExecutor 是 Catalina 结构中的一部分,是 Tomcat 生命周期中的池化线程资源的封装。类总览:
public class StandardThreadExecutor extends LifecycleMBeanBase
implements Executor, ResizableExecutor {
// 统一的 String 的管理类,用来防止编码等一系列的问题
protected static final StringManager sm =
StringManager.getManager(Constants.Package);
// 创建出来的线程的优先级
protected int threadPriority = Thread.NORM_PRIORITY;
// 线程是否是守护线程,默认为是
protected boolean daemon = true;
// 线程名称
protected String namePrefix = "tomcat-exec-";
// 默认的最大线程数量
protected int maxThreads = 200;
// 默认的最小线程数量
protected int minSpareThreads = 25;
// 存在时间
protected int maxIdleTime = 60000;
// 真实工作的 ThreadPoolExecutor
// 本质是 StandardThreadExecutor 只是 ThreadPoolExecutor 的装饰器
// 此处的对象类型是 org.apache.tomcat.util.threads.ThreadPoolExecutor
protected ThreadPoolExecutor executor = null;
// 线程池名称
protected String name;
// 是否开启线程池最小线程数量,如果此处为 false 的话,minSpareThreads 就没有意义
protected boolean prestartminSpareThreads = false;
// 默认的任务队列长度
protected int maxQueueSize = Integer.MAX_VALUE;
// 重建线程的时间间隔
protected long threadRenewalDelay =
org.apache.tomcat.util.threads.Constants.DEFAULT_THREAD_RENEWAL_DELAY;
// 任务队列
private TaskQueue taskqueue = null;
// 其它方法暂时忽略
}
生命周期
/**
* 初始化线程池
*/
@Override
protected void initInternal() throws LifecycleException {super.initInternal();
}
/**
* 开始线程池
*/
@Override
protected void startInternal() throws LifecycleException {
// 任务队列
taskqueue = new TaskQueue(maxQueueSize);
// 线程工厂
TaskThreadFactory tf = new TaskThreadFactory(namePrefix,daemon,getThreadPriority());
// 初始化线程池
executor = new ThreadPoolExecutor(getMinSpareThreads(), getMaxThreads(), maxIdleTime, TimeUnit.MILLISECONDS,taskqueue, tf);
// 重建线程的时间间隔
executor.setThreadRenewalDelay(threadRenewalDelay);
// 设置线程池最小线程数量
if (prestartminSpareThreads) {executor.prestartAllCoreThreads();
}
// 线程池任务队列的 parent
taskqueue.setParent(executor);
// 设置组件的生命周期状态
setState(LifecycleState.STARTING);
}
/**
* 关闭线程池
*/
@Override
protected void stopInternal() throws LifecycleException {setState(LifecycleState.STOPPING);
if (executor != null) {executor.shutdownNow();
}
executor = null;
taskqueue = null;
}
/**
* 清除线程池
*/
@Override
protected void destroyInternal() throws LifecycleException {super.destroyInternal();
}
/**
* 关闭线程池
*/
public void contextStopping() {if (executor != null) {executor.contextStopping();
}
}
任务执行
/**
* 加入一个带超时的任务
**/
@Override
public void execute(Runnable command, long timeout, TimeUnit unit) {
// 调用 executor 对象去执行
// 如果 executor 对象是空的,则抛出异常
if (executor != null) {executor.execute(command,timeout,unit);
} else {throw new IllegalStateException(sm.getString("standardThreadExecutor.notStarted"));
}
}
/**
* 加入一个不带超时的任务
**/
@Override
public void execute(Runnable command) {
// 逻辑基本同上
if (executor != null) {
try {executor.execute(command);
} catch (RejectedExecutionException rx) {
// 此处会再尝试将任务加入一次等待队列中
// TaskQueue.force(...) 方法底层会调用 Queue.offer(...) 方法
// 如果仍然失败,会抛出异常
if (!((TaskQueue) executor.getQueue()).force(command)) {throw new RejectedExecutionException(sm.getString("standardThreadExecutor.queueFull"));
}
}
} else {throw new IllegalStateException(sm.getString("standardThreadExecutor.notStarted"));
}
}
其它方法基本都是 get / set 方法,可以忽略。
ThreadPoolExecutor
org.apache.tomcat.util.threads.ThreadPoolExecutor 是 Tomcat 中的线程池类。
类和变量总览:
public class ThreadPoolExecutor extends java.util.concurrent.ThreadPoolExecutor {
// 统一的 String 的管理类,用来防止编码等一系列的问题
protected static final StringManager sm = StringManager
.getManager("org.apache.tomcat.util.threads.res");
// 任务提交数量的计数
private final AtomicInteger submittedCount = new AtomicInteger(0);
private final AtomicLong lastContextStoppedTime = new AtomicLong(0L);
// 线程自杀计数
private final AtomicLong lastTimeThreadKilledItself = new AtomicLong(0L);
// 重建线程的时间间隔
private long threadRenewalDelay = Constants.DEFAULT_THREAD_RENEWAL_DELAY;
// 其它方法暂时忽略
// ...
构造器
/**
* 基本是沿用了 java.util.concurrent.ThreadPoolExecutor 的构造方法
**/
public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, RejectedExecutionHandler handler) {
// 调用 juc 中的 ThreadPoolExecutor 进行线程池的初始化
super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, handler);
// 用于初始化常驻线程
prestartAllCoreThreads();}
// 下列的构造方法都差不多
public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory,
RejectedExecutionHandler handler) {super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, handler);
prestartAllCoreThreads();}
public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory) {super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, new RejectHandler());
prestartAllCoreThreads();}
public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) {super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, new RejectHandler());
prestartAllCoreThreads();}
拒绝策略
Tomcat 中的拒绝策略为 ThreadPoolExecutor 的一个内部类 RejectHandler,直接抛出错误:
// ThreadPoolExecutor.class
private static class RejectHandler implements RejectedExecutionHandler {
@Override
public void rejectedExecution(Runnable r,
java.util.concurrent.ThreadPoolExecutor executor) {
// 直接抛出错误
throw new RejectedExecutionException();}
}
任务执行
Tomcat 的 ThreadPoolExecutor 中执行任务复写了父接口中的 execute(…) 方法:
@Override
public void execute(Runnable command) {execute(command,0,TimeUnit.MILLISECONDS);
}
public void execute(Runnable command, long timeout, TimeUnit unit) {
// 提交计数加一
submittedCount.incrementAndGet();
try {// 此处调用 java.util.concurrent.ThreadPoolExecutor 中的 execute(...) 方法
super.execute(command);
} catch (RejectedExecutionException rx) {
// 如果调用父类中的方法执行错误,会尝试将任务再一次放入到等待队列里
if (super.getQueue() instanceof TaskQueue) {final TaskQueue queue = (TaskQueue)super.getQueue();
try {
// 此处尝试放入等待队列
// 如果也失败了,就回滚提交计数,并抛出异常
if (!queue.force(command, timeout, unit)) {submittedCount.decrementAndGet();
throw new RejectedExecutionException(sm.getString("threadPoolExecutor.queueFull"));
}
} catch (InterruptedException x) {submittedCount.decrementAndGet();
throw new RejectedExecutionException(x);
}
} else {submittedCount.decrementAndGet();
throw rx;
}
}
}
@Override
protected void afterExecute(Runnable r, Throwable t) {
// 计数加一
submittedCount.decrementAndGet();
// 如果没有报错,那么此处尝试关闭多余的线程
// 抛出错误的方式停止线程
if (t == null) {stopCurrentThreadIfNeeded();
}
}
/**
* 判断是否需要关闭线程
**/
protected void stopCurrentThreadIfNeeded() {
// 如果线程存活时间超过了 delay 值,那么此处会抛出一个错误,使线程停止
if (currentThreadShouldBeStopped()) {long lastTime = lastTimeThreadKilledItself.longValue();
if (lastTime + threadRenewalDelay < System.currentTimeMillis()) {
if (lastTimeThreadKilledItself.compareAndSet(lastTime,
System.currentTimeMillis() + 1)) {
final String msg = sm.getString( "threadPoolExecutor.threadStoppedToAvoidPotentialLeak",
Thread.currentThread().getName());
throw new StopPooledThreadException(msg);
}
}
}
}
protected boolean currentThreadShouldBeStopped() {
// 如果当前线程并非工作线程,或者不存在线程存活 delay 值,那么此处返回 false
// 如果当前线程是工作线程,且设置了 delay 时间,且当前线程的存活时间已经超过了设置值,那么此处返回 true
if (threadRenewalDelay >= 0
&& Thread.currentThread() instanceof TaskThread) {TaskThread currentTaskThread = (TaskThread) Thread.currentThread();
if (currentTaskThread.getCreationTime() <
this.lastContextStoppedTime.longValue()) {return true;}
}
return false;
}
优雅关闭
public void contextStopping() {this.lastContextStoppedTime.set(System.currentTimeMillis());
// 保存 corePoolSize 的值
int savedCorePoolSize = this.getCorePoolSize();
// 获取队列
TaskQueue taskQueue =
getQueue() instanceof TaskQueue ? (TaskQueue) getQueue() : null;
// 将 taskQueue 中的 forcedRemainingCapacity 置为零
// 不太清楚 forcedRemainingCapacity 有什么作用
if (taskQueue != null) {taskQueue.setForcedRemainingCapacity(Integer.valueOf(0));
}
// corePoolSize 置为零
this.setCorePoolSize(0);
// 将 taskQueue 中的 forcedRemainingCapacity 置空
if (taskQueue != null) {taskQueue.setForcedRemainingCapacity(null);
}
// 恢复 corePoolSize
this.setCorePoolSize(savedCorePoolSize);
}
TaskQueue
TaskQueue 是 Tomcat 中对任务队列的增强和封装:
public class TaskQueue extends LinkedBlockingQueue<Runnable> {
// 序列编码
private static final long serialVersionUID = 1L;
// 字符串管理类
protected static final StringManager sm = StringManager
.getManager("org.apache.tomcat.util.threads.res");
// 任务队列关联的线程池
private transient volatile ThreadPoolExecutor parent = null;
// 不太清楚是做什么用的一个容量计数
private Integer forcedRemainingCapacity = null;
// 其它方法暂时忽略
// ...
加入、获取任务的相关方法
// 不带超时的添加任务方法
public boolean force(Runnable o) {
// 关联线程池不可为空
if (parent == null || parent.isShutdown())
throw new RejectedExecutionException(sm.getString("taskQueue.notRunning"));
// 调用 LinkedBlockingQueue 的 offer(...) 方法添加任务
return super.offer(o);
}
// 带超时的添加任务方法
public boolean force(Runnable o, long timeout, TimeUnit unit) throws InterruptedException {if (parent == null || parent.isShutdown())
throw new RejectedExecutionException(sm.getString("taskQueue.notRunning"));
return super.offer(o,timeout,unit);
}
// 本质上是调用父类的 offer(...) 方法
// 非阻塞添加任务的方法
@Override
public boolean offer(Runnable o) {if (parent == null)
return super.offer(o);
if (parent.getPoolSize() == parent.getMaximumPoolSize())
return super.offer(o);
if (parent.getSubmittedCount() <= (parent.getPoolSize()))
return super.offer(o);
// 这种情况下线程池可以直接消费任务,无需放入任务队列等待
if (parent.getPoolSize() < parent.getMaximumPoolSize())
return false;
return super.offer(o);
}
// 带超时的阻塞方式获取任务
@Override
public Runnable poll(long timeout, TimeUnit unit)
throws InterruptedException {
// 获取一个任务,如果获取到的为空,则停止当前线程
// 能获取到就返回任务
Runnable runnable = super.poll(timeout, unit);
if (runnable == null && parent != null) {parent.stopCurrentThreadIfNeeded();
}
return runnable;
}
// 阻塞方式获取任务
@Override
public Runnable take() throws InterruptedException {
// 线程池存在的情况下,会使用限时的方式去获取任务
if (parent != null
&& parent.currentThreadShouldBeStopped()) {return poll(parent.getKeepAliveTime(TimeUnit.MILLISECONDS),
TimeUnit.MILLISECONDS);
}
return super.take();}
TaskQueue 在设计的时候都考虑了关联线程池存不存在的情况,笔者认为这应该是 Tomcat 的作者考虑到开发者可能会需要复用 TaskQueue 到其它的场景中。
一点体会
Tomcat 的线程池其实封装不厚重,只是对 jdk 线程池做了简单优化:
- 任务执行失败时不会直接抛出错误,而是装回队列里再次尝试执行;
- 当线程池没有达到最大执行线程的时候,会优先开线程再使用任务队列;
- 扩展计数用于追踪任务的执行情况;
- 将线程池融入 Catalina 的生命周期组件中。
本文仅为个人的学习笔记,可能存在错误或者表述不清的地方,有缘补充