本篇文章主要讲 Tomcat “粗暴 stop” 的流程,以及了解下解决方案里的几个关键的信息,详细解决步骤会在后面的文章里。
整体架构
我们先来看下 Tomcat 的整体架构,大致了解下 Tomcat 组件之间的关系:
图片来源:《四张图带你了解 Tomcat 架构》
图片来源:《Tomcat 系统架构 (上)》
图片来源:《Tomcat 系统架构(上):连接器是如何设计的?》
关闭
SpringBoot-Tomcat 在服务器关闭时,并不会等请求都处理完才关闭,而是立即关闭,我们通过代码来看下 SpringBoot-Tomcat 的关闭流程。
先从 SpringContext 开始:
/**
 * Excerpt of Spring's application context base class, reduced to the JVM shutdown-hook
 * registration and the close sequence that the hook runs.
 *
 * registerShutdownHook(): if no hook has been stored yet, creates a Thread whose run()
 * calls doClose() while holding startupShutdownMonitor, and registers that thread with
 * the Runtime.
 *
 * doClose(): does its work only when the context is active and the closed flag is
 * atomically flipped from false to true, which guards against running the sequence
 * twice for the same close. Note the order of the steps: ContextClosedEvent is
 * published before onClose() is invoked.
 */
public abstract class AbstractApplicationContext extends DefaultResourceLoader
implements ConfigurableApplicationContext {public void registerShutdownHook() {if (this.shutdownHook == null) {
// No shutdown hook registered yet.
this.shutdownHook = new Thread() {
@Override
public void run() {synchronized (startupShutdownMonitor) {doClose();
}
}
};
Runtime.getRuntime().addShutdownHook(this.shutdownHook);
}
}
protected void doClose() {
// Check whether an actual close attempt is necessary...
if (this.active.get() && this.closed.compareAndSet(false, true)) {if (logger.isDebugEnabled()) {logger.debug("Closing" + this);
}
LiveBeansView.unregisterApplicationContext(this);
try {
// Publish shutdown event.
publishEvent(new ContextClosedEvent(this)); // publish the event
}
// A failing listener is only logged, so the rest of the close sequence still runs.
catch (Throwable ex) {logger.warn("Exception thrown from ApplicationListener handling ContextClosedEvent", ex);
}
// Stop all Lifecycle beans, to avoid delays during individual destruction.
if (this.lifecycleProcessor != null) {
try {this.lifecycleProcessor.onClose();
}
catch (Throwable ex) {logger.warn("Exception thrown from LifecycleProcessor on context close", ex);
}
}
// Destroy all cached singletons in the context's BeanFactory.
destroyBeans();
// Close the state of this context itself.
closeBeanFactory();
// Let subclasses do some final clean-up if they wish...
onClose(); // perform the close
// Reset local application listeners to pre-refresh state.
if (this.earlyApplicationListeners != null) {this.applicationListeners.clear();
this.applicationListeners.addAll(this.earlyApplicationListeners);
}
// Switch to inactive.
this.active.set(false);
}
}
}
注意,发布关闭事件 publishEvent(new ContextClosedEvent(this))
是在关闭资源 onClose
之前,并且 Spring 事件处理默认是同步处理的,这里很重要,后面讲到的方案里会用到这个。
/**
 * Excerpt of Spring Boot's servlet web application context showing how closing the
 * context shuts down the embedded web server.
 */
public class ServletWebServerApplicationContext extends GenericWebApplicationContext
        implements ConfigurableWebServerApplicationContext {

    /**
     * Stops the embedded web server, if one is present, and then drops the reference to it.
     *
     * @throws IllegalStateException wrapping any exception raised while stopping the server
     */
    private void stopAndReleaseWebServer() {
        WebServer current = this.webServer;
        if (current == null) {
            // Nothing to stop.
            return;
        }
        try {
            current.stop();
            this.webServer = null;
        }
        catch (Exception ex) {
            throw new IllegalStateException(ex);
        }
    }

    /**
     * Close callback: runs the parent's callback first, then stops the web server.
     */
    protected void onClose() {
        super.onClose();
        stopAndReleaseWebServer();
    }
}
从这里开始,进入到 Web 容器的关闭了。
public class TomcatWebServer implements WebServer {public void stop() throws WebServerException {synchronized (this.monitor) {
boolean wasStarted = this.started;
try {
this.started = false;
try {stopTomcat();
this.tomcat.destroy();}
catch (LifecycleException ex) {// swallow and continue}
}
catch (Exception ex) {throw new WebServerException("Unable to stop embedded Tomcat", ex);
}
finally {if (wasStarted) {containerCounter.decrementAndGet();
}
}
}
}
private void stopTomcat() throws LifecycleException {if (Thread.currentThread()
.getContextClassLoader() instanceof TomcatEmbeddedWebappClassLoader) {Thread.currentThread().setContextClassLoader(getClass().getClassLoader());
}
this.tomcat.stop();}
}
进入到 tomcat.stop()
看下。
/**
 * Excerpt of the embedded Tomcat starter class, reduced to stopping the server and the
 * lazy creation of the server object.
 */
public class Tomcat {

    /**
     * Makes sure the server object exists, then stops it.
     *
     * @throws LifecycleException if the server fails to stop
     */
    public void stop() throws LifecycleException {
        getServer();
        server.stop();
    }

    /**
     * Returns the server, creating it on first use together with a single service
     * named "Tomcat".
     *
     * @return the existing or newly created server
     */
    public Server getServer() {
        if (server == null) {
            System.setProperty("catalina.useNaming", "false");
            server = new StandardServer();
            initBaseDir();
            // Set configuration source
            ConfigFileLoader.setSource(new CatalinaBaseConfigurationSource(new File(basedir), null));
            server.setPort(-1);

            Service defaultService = new StandardService();
            defaultService.setName("Tomcat");
            server.addService(defaultService);
        }
        return server;
    }
}
再进入 StandardServer.stop
。
/**
 * Excerpt of Tomcat's lifecycle base class showing the template method stop().
 *
 * stop() handles the state transitions and delegates the component-specific work to
 * stopInternal():
 *  - already STOPPING_PREP / STOPPING / STOPPED: log and return;
 *  - NEW: jump straight to STOPPED and return;
 *  - neither STARTED nor FAILED: report an invalid transition;
 *  - otherwise: announce the stop, call stopInternal(), verify the resulting state and
 *    move to STOPPED.
 * Any Throwable from that sequence is passed to handleSubClassException(). Components
 * marked Lifecycle.SingleUse are always set to STOPPED and destroyed in the finally block.
 */
public abstract class LifecycleBase implements Lifecycle {public final synchronized void stop() throws LifecycleException {if (LifecycleState.STOPPING_PREP.equals(state) || LifecycleState.STOPPING.equals(state) ||
LifecycleState.STOPPED.equals(state)) {if (log.isDebugEnabled()) {Exception e = new LifecycleException();
log.debug(sm.getString("lifecycleBase.alreadyStopped", toString()), e);
} else if (log.isInfoEnabled()) {log.info(sm.getString("lifecycleBase.alreadyStopped", toString()));
}
return;
}
// A component that was never started has nothing to stop.
if (state.equals(LifecycleState.NEW)) {
state = LifecycleState.STOPPED;
return;
}
if (!state.equals(LifecycleState.STARTED) && !state.equals(LifecycleState.FAILED)) {invalidTransition(Lifecycle.BEFORE_STOP_EVENT);
}
try {if (state.equals(LifecycleState.FAILED)) {
// Don't transition to STOPPING_PREP as that would briefly mark the
// component as available but do ensure the BEFORE_STOP_EVENT is
// fired
fireLifecycleEvent(BEFORE_STOP_EVENT, null);
} else {setStateInternal(LifecycleState.STOPPING_PREP, null, false);
}
// Component-specific stop logic supplied by the subclass.
stopInternal();
// Shouldn't be necessary but acts as a check that sub-classes are
// doing what they are supposed to.
if (!state.equals(LifecycleState.STOPPING) && !state.equals(LifecycleState.FAILED)) {invalidTransition(Lifecycle.AFTER_STOP_EVENT);
}
setStateInternal(LifecycleState.STOPPED, null, false);
} catch (Throwable t) {handleSubClassException(t, "lifecycleBase.stopFail", toString());
} finally {if (this instanceof Lifecycle.SingleUse) {
// Complete stop process first
setStateInternal(LifecycleState.STOPPED, null, false);
destroy();}
}
}
}
/**
 * Excerpt of Tomcat's standard Server implementation showing stopInternal().
 *
 * Sequence: switch to STOPPING, cancel the two scheduled futures if present, fire
 * CONFIGURE_STOP_EVENT, stop every service in the services array, stop the global
 * naming resources, and finally call stopAwait().
 */
public final class StandardServer extends LifecycleMBeanBase implements Server {protected void stopInternal() throws LifecycleException {setState(LifecycleState.STOPPING);
// cancel(true): the monitor task may be interrupted if it is currently running.
if (monitorFuture != null) {monitorFuture.cancel(true);
monitorFuture = null;
}
// cancel(false): a running periodic-event task is not interrupted.
if (periodicLifecycleEventFuture != null) {periodicLifecycleEventFuture.cancel(false);
periodicLifecycleEventFuture = null;
}
fireLifecycleEvent(CONFIGURE_STOP_EVENT, null);
// Stop our defined Services
for (int i = 0; i < services.length; i++) {services[i].stop();}
globalNamingResources.stop();
stopAwait();}
}
stop
方法在父类 LifecycleBase
里(Tomcat 的组件基本上都继承了 LifecycleBase
),StandardServer
实现子方法 stopInternal
,进入到StandardService.stop
里。这里要先说下几个对象的关系:
- 一个
Tomcat
中只有一个Server
。 - 一个
Server
可以包含多个Service
,比如一个 Tomcat(Server) 下部署多个项目,那就是多个Service
。 - 一个
Service
只有一个Container
,但是可以有多个Connector
,这是因为一个服务可以有多个连接,如同时提供Http
和Https
连接,也可以提供相同协议、不同端口的连接。 - 一个
Connector
下只有一个ProtocolHandler
。由于 I/O 模型和应用层协议可以自由组合,比如 NIO + HTTP 或者 NIO.2 + AJP。Tomcat 的设计者将网络通信和应用层协议解析放在一起考虑,设计了一个叫 ProtocolHandler 的接口来封装这两种变化点。各种协议和通信模型的组合有相应的具体实现类。比如:Http11NioProtocol 和 AjpNioProtocol。Connector
用ProtocolHandler
来处理网络连接和应用层协议,包含了 2 个重要部件:Endpoint 和 Processor。 - 一个
ProtocolHandler
下只有一个EndPoint
,Endpoint 是通信端点,即通信监听的接口,是具体的 Socket 接收和发送处理器,是对传输层的抽象,因此 Endpoint 是用来实现 TCP/IP 协议的。
/**
 * Excerpt of Tomcat's standard Service implementation showing stopInternal().
 *
 * Order of operations:
 *  1. pause every connector and ask its protocol handler to close the server socket;
 *  2. switch to STOPPING and stop the engine, if there is one;
 *  3. stop every connector that is still in the STARTED state;
 *  4. stop the mapper listener unless it is still in the INITIALIZED state;
 *  5. stop every executor registered on the service.
 */
public class StandardService extends LifecycleMBeanBase implements Service {protected void stopInternal() throws LifecycleException {
// Pause connectors first
synchronized (connectorsLock) {for (Connector connector: connectors) {connector.pause();
// Close server socket if bound on start
// Note: test is in AbstractEndpoint
connector.getProtocolHandler().closeServerSocketGraceful();
}
}
if(log.isInfoEnabled())
log.info(sm.getString("standardService.stop.name", this.name));
setState(LifecycleState.STOPPING);
// Stop our defined Container second
if (engine != null) {synchronized (engine) {engine.stop();
}
}
// Now stop the connectors
synchronized (connectorsLock) {for (Connector connector: connectors) {
if (!LifecycleState.STARTED.equals(connector.getState())) {
// Connectors only need stopping if they are currently
// started. They may have failed to start or may have been
// stopped (e.g. via a JMX call)
continue;
}
connector.stop();}
}
// If the Server failed to start, the mapperListener won't have been
// started
if (mapperListener.getState() != LifecycleState.INITIALIZED) {mapperListener.stop();
}
// Executors are stopped last, after the connectors.
synchronized (executors) {for (Executor executor: executors) {executor.stop();
}
}
}
}
这里跟我们上面的问题有关的有这两个:
-
Connector.pause
停止接收新请求。 -
Connector.stop
这里会关闭线程池。
/**
 * Excerpt of Tomcat's Connector showing how stopping the connector is delegated to its
 * protocol handler.
 */
public class Connector extends LifecycleMBeanBase {

    /**
     * Switches to STOPPING and stops the protocol handler when one is configured.
     *
     * @throws LifecycleException wrapping any exception thrown by the protocol handler
     */
    @Override
    protected void stopInternal() throws LifecycleException {
        setState(LifecycleState.STOPPING);

        if (protocolHandler == null) {
            // No handler, nothing further to stop.
            return;
        }
        try {
            protocolHandler.stop();
        } catch (Exception cause) {
            throw new LifecycleException(sm.getString("coyoteConnector.protocolHandlerStopFailed"), cause);
        }
    }
}
/**
 * Excerpt of Tomcat's protocol handler base class showing stop().
 */
public abstract class AbstractProtocol<S> implements ProtocolHandler, MBeanRegistration {

    /**
     * Stops this protocol handler: logs the stop, cancels the monitor task, stops the
     * async-timeout handling, times out the processors still waiting, and finally
     * stops the endpoint.
     *
     * @throws Exception if stopping fails
     */
    public void stop() throws Exception {
        if (getLog().isInfoEnabled()) {
            String stopMessage = sm.getString("abstractProtocolHandler.stop", getName());
            getLog().info(stopMessage);
            logPortOffset();
        }

        if (monitorFuture != null) {
            monitorFuture.cancel(true);
            monitorFuture = null;
        }

        stopAsyncTimeout();

        // Timeout any pending async request
        for (Processor pending : waitingProcessors) {
            pending.timeoutAsync(-1);
        }

        endpoint.stop();
    }
}
public abstract class AbstractEndpoint<S,U> {public final void stop() throws Exception {stopInternal();
if (bindState == BindState.BOUND_ON_START || bindState == BindState.SOCKET_CLOSED_ON_STOP) {unbind();
bindState = BindState.UNBOUND;
}
}
public abstract void stopInternal() throws Exception; // 子类实现,子类 AprEndpoint 和 NioEndpoint(Springboot 默认是 NioEndpoint)实现方法里会调用 shutdownExecutor 方法
public void shutdownExecutor() {
Executor executor = this.executor;
if (executor != null && internalExecutor) {
this.executor = null;
if (executor instanceof ThreadPoolExecutor) {
//this is our internal one, so we need to shut it down
ThreadPoolExecutor tpe = (ThreadPoolExecutor) executor;
tpe.shutdownNow();
long timeout = getExecutorTerminationTimeoutMillis();
if (timeout > 0) {
try {tpe.awaitTermination(timeout, TimeUnit.MILLISECONDS);
} catch (InterruptedException e) {// Ignore}
if (tpe.isTerminating()) {getLog().warn(sm.getString("endpoint.warn.executorShutdown", getName()));
}
}
TaskQueue queue = (TaskQueue) tpe.getQueue();
queue.setParent(null);
}
}
}
/**
* Time to wait for the internal executor (if used) to terminate when the
* endpoint is stopped in milliseconds. Defaults to 5000 (5 seconds).
*/
private long executorTerminationTimeoutMillis = 5000;
public long getExecutorTerminationTimeoutMillis() {return executorTerminationTimeoutMillis;}
}
以上可以看到,在 connector.stop -> protocolHandler.stop -> endpoint.stop -> NioEndpoint.stopInternal -> endpoint.shutdownExecutor
里,线程池直接粗暴的 shutdownNow
来停止任务。java 线程池的 shutdownNow
会尝试 interrupt 线程池中正在执行的线程,等待执行的任务也会被取消,但是并不能保证一定能成功地 interrupt 线程池中的线程,所以 Tomcat 通过 awaitTermination
等待 5
秒,尽量保证停止所有的任务。
从上面的分析可以看到,原因主要是线程池被 Tomcat 暴力 shutdown。在第一章里讲到我们处理这个问题,需要做两步:
- 暂停接收新请求。
- 等待所有任务都执行完成再关闭线程池。
要完成这两步的关键是:
- 获取到
Connector
,Connector.pause
方法可以暂停接收请求。 - 获取到
线程池
。
Connector
在 Tomcat 架构中,Connector 主要负责处理与客户端的通信。Connector 的实例用于监听端口,接受来自客户端的请求并将请求转交给 Engine 处理,同时将来自 Engine 的答复返回给客户端:
- 监听网络端口
- 接受网络连接请求
- 读取请求网络字节流
- 根据具体应用层协议 (HTTP/AJP) 解析字节流,生成统一的 Tomcat Request 对象
- 将 Tomcat Request 对象转成标准的 ServletRequest
- 调用 Servlet 容器,得到 ServletResponse
- 将 ServletResponse 转成 Tomcat Response 对象
- 将 Tomcat Response 对象转成网络字节流
- 将响应字节流写回客户端。
现在我们看下 SpringBoot 生成 Tomcat 的步骤。
/**
 * Excerpt of Spring Boot's auto-configuration that declares one ServletWebServerFactory
 * bean per supported embedded server. Each nested configuration is conditional on that
 * server's classes being present and on no ServletWebServerFactory bean already being
 * defined in the current context.
 */
@Configuration
class ServletWebServerFactoryConfiguration {
/**
 * Nested configuration if Tomcat is being used.
 */
@Configuration
@ConditionalOnClass({Servlet.class, Tomcat.class, UpgradeProtocol.class})
@ConditionalOnMissingBean(value = ServletWebServerFactory.class, search = SearchStrategy.CURRENT)
public static class EmbeddedTomcat {
@Bean
public TomcatServletWebServerFactory tomcatServletWebServerFactory() {return new TomcatServletWebServerFactory();
}
}
/**
* Nested configuration if Jetty is being used.
*/
@Configuration
@ConditionalOnClass({ Servlet.class, Server.class, Loader.class,
WebAppContext.class })
@ConditionalOnMissingBean(value = ServletWebServerFactory.class, search = SearchStrategy.CURRENT)
public static class EmbeddedJetty {
// NOTE(review): unlike its siblings, this method name starts with an upper-case letter;
// the bean name presumably derives from it, so confirm before renaming.
@Bean
public JettyServletWebServerFactory JettyServletWebServerFactory() {return new JettyServletWebServerFactory();
}
}
/**
* Nested configuration if Undertow is being used.
*/
@Configuration
@ConditionalOnClass({Servlet.class, Undertow.class, SslClientAuthMode.class})
@ConditionalOnMissingBean(value = ServletWebServerFactory.class, search = SearchStrategy.CURRENT)
public static class EmbeddedUndertow {
@Bean
public UndertowServletWebServerFactory undertowServletWebServerFactory() {return new UndertowServletWebServerFactory();
}
}
}
可以看到,生成的是 TomcatServletWebServerFactory
,TomcatServletWebServerFactory
有个 createWebServer
方法来生成 Tomcat,那这个方法在什么时候被调用?
/**
 * Excerpt of Spring Boot's servlet web application context showing how the embedded
 * web server is created while the context is being refreshed.
 */
public class ServletWebServerApplicationContext extends GenericWebApplicationContext
        implements ConfigurableWebServerApplicationContext {

    /**
     * Refresh callback: runs the parent's callback, then creates the web server.
     *
     * @throws ApplicationContextException wrapping anything thrown while creating the server
     */
    @Override
    protected void onRefresh() {
        super.onRefresh();
        try {
            createWebServer();
        }
        catch (Throwable ex) {
            throw new ApplicationContextException("Unable to start web server", ex);
        }
    }

    /**
     * If a servlet context already exists, only runs the self initializer against it.
     * Otherwise, when no web server has been created yet, obtains one from the
     * ServletWebServerFactory. Property sources are initialized in every case.
     */
    private void createWebServer() {
        WebServer existingServer = this.webServer;
        ServletContext existingContext = getServletContext();

        if (existingContext != null) {
            try {
                getSelfInitializer().onStartup(existingContext);
            }
            catch (ServletException ex) {
                throw new ApplicationContextException("Cannot initialize servlet context",
                        ex);
            }
        }
        else if (existingServer == null) {
            ServletWebServerFactory factory = getWebServerFactory();
            this.webServer = factory.getWebServer(getSelfInitializer());
        }

        initPropertySources();
    }
}
可以看到 springboot 的容器类 ServletWebServerApplicationContext
在系统启动时 (onRefresh
),createWebServer
方法会调用 ServletWebServerFactory.getWebServer。
/**
 * Excerpt of Spring Boot's Tomcat web server factory showing how the Tomcat instance
 * and its connectors are assembled.
 */
public class TomcatServletWebServerFactory extends AbstractServletWebServerFactory
        implements ConfigurableTomcatWebServerFactory, ResourceLoaderAware {

    /**
     * Builds a Tomcat instance with one primary connector plus any additional
     * connectors, prepares the context and wraps the result in a web server.
     *
     * @param initializers servlet context initializers passed on to the prepared context
     * @return the web server wrapping the configured Tomcat
     */
    @Override
    public WebServer getWebServer(ServletContextInitializer... initializers) {
        Tomcat tomcat = new Tomcat();

        // Fall back to a temporary directory when no base directory is configured.
        File baseDir = this.baseDirectory;
        if (baseDir == null) {
            baseDir = createTempDir("tomcat");
        }
        tomcat.setBaseDir(baseDir.getAbsolutePath());

        Connector primaryConnector = new Connector(this.protocol);
        tomcat.getService().addConnector(primaryConnector);
        customizeConnector(primaryConnector);
        tomcat.setConnector(primaryConnector);

        tomcat.getHost().setAutoDeploy(false);
        configureEngine(tomcat.getEngine());

        for (Connector extra : this.additionalTomcatConnectors) {
            tomcat.getService().addConnector(extra);
        }

        prepareContext(tomcat.getHost(), initializers);
        return getTomcatWebServer(tomcat);
    }
}
进入 getWebServer
看到是在构建 Tomcat 对象
,其中customizeConnector(connector)
这一步有对 Connector
的处理。
/**
 * Excerpt of Spring Boot's Tomcat web server factory showing how a connector is
 * customized and how context customizers are registered.
 */
public class TomcatServletWebServerFactory extends AbstractServletWebServerFactory
        implements ConfigurableTomcatWebServerFactory, ResourceLoaderAware {

    /**
     * Applies the factory's settings to the given connector: port, optional server
     * header, protocol settings, URI encoding, deferred socket binding, SSL and
     * compression. Every registered connector customizer is then applied to it.
     *
     * @param connector the connector to configure
     */
    protected void customizeConnector(Connector connector) {
        // A negative configured port is replaced by 0.
        connector.setPort(Math.max(getPort(), 0));

        String serverHeader = this.getServerHeader();
        if (StringUtils.hasText(serverHeader)) {
            connector.setAttribute("server", serverHeader);
        }

        if (connector.getProtocolHandler() instanceof AbstractProtocol) {
            customizeProtocol((AbstractProtocol<?>) connector.getProtocolHandler());
        }

        if (getUriEncoding() != null) {
            connector.setURIEncoding(getUriEncoding().name());
        }

        // Don't bind to the socket prematurely if ApplicationContext is slow to start
        connector.setProperty("bindOnInit", "false");

        if (getSsl() != null && getSsl().isEnabled()) {
            customizeSsl(connector);
        }

        new CompressionConnectorCustomizer(getCompression()).customize(connector);

        for (TomcatConnectorCustomizer each : this.tomcatConnectorCustomizers) {
            each.customize(connector);
        }
    }

    /**
     * Registers additional context customizers.
     *
     * @param tomcatContextCustomizers the customizers to add; must not be null
     */
    public void addContextCustomizers(TomcatContextCustomizer... tomcatContextCustomizers) {
        Assert.notNull(tomcatContextCustomizers,
                "TomcatContextCustomizers must not be null");
        this.tomcatContextCustomizers.addAll(Arrays.asList(tomcatContextCustomizers));
    }
}
通过以上代码,我们了解到:可以自定义 TomcatConnectorCustomizer
,通过 addConnectorCustomizers
方法添加(注意不是 addContextCustomizers,后者添加的是作用于 Context 的 TomcatContextCustomizer);而 customizeConnector
方法里,会遍历所有的 TomcatConnectorCustomizer
对象,通过 customizer.customize(connector)
方法来传入 Connector
对象。
线程池
在上一步已经可以获取到 Connector
的基础上,要获取到线程池已经很简单了:connector -> protocolHandler -> endpoint -> getExecutor()
。
附加:
Spring 事件处理
/**
 * Excerpt of Spring's application context base class showing publishEvent().
 *
 * The event is wrapped in a PayloadApplicationEvent unless it already is an
 * ApplicationEvent. While earlyApplicationEvents is non-null the event is only queued
 * there; otherwise it is handed to the ApplicationEventMulticaster straight away.
 * Afterwards the event is also published through the parent context, if there is one.
 */
public abstract class AbstractApplicationContext extends DefaultResourceLoader
implements ConfigurableApplicationContext {protected void publishEvent(Object event, @Nullable ResolvableType eventType) {Assert.notNull(event, "Event must not be null");
// Decorate event as an ApplicationEvent if necessary
ApplicationEvent applicationEvent;
if (event instanceof ApplicationEvent) {applicationEvent = (ApplicationEvent) event;
}
else {applicationEvent = new PayloadApplicationEvent<>(this, event);
// No explicit type given: take it from the payload wrapper.
if (eventType == null) {eventType = ((PayloadApplicationEvent) applicationEvent).getResolvableType();}
}
// Multicast right now if possible - or lazily once the multicaster is initialized
if (this.earlyApplicationEvents != null) {this.earlyApplicationEvents.add(applicationEvent);
}
else {getApplicationEventMulticaster().multicastEvent(applicationEvent, eventType);
}
// Publish event via parent context as well...
if (this.parent != null) {if (this.parent instanceof AbstractApplicationContext) {((AbstractApplicationContext) this.parent).publishEvent(event, eventType);
}
else {this.parent.publishEvent(event);
}
}
}
}
可以看到重点在 getApplicationEventMulticaster().multicastEvent(applicationEvent, eventType);
这一步,其中 ApplicationEventMulticaster
默认是SimpleApplicationEventMulticaster
。
/**
 * Excerpt of Spring's default event multicaster showing how an event is dispatched to
 * its listeners.
 */
public class SimpleApplicationEventMulticaster extends AbstractApplicationEventMulticaster {

    /**
     * Delivers the event to every matching listener. With a task executor configured,
     * each invocation is submitted to that executor; without one, the listener is
     * invoked directly on the calling thread.
     *
     * @param event the event to deliver
     * @param eventType the event type, or null to resolve it from the event itself
     */
    public void multicastEvent(final ApplicationEvent event, @Nullable ResolvableType eventType) {
        ResolvableType type = eventType;
        if (type == null) {
            type = resolveDefaultEventType(event);
        }

        for (final ApplicationListener<?> listener : getApplicationListeners(event, type)) {
            Executor executor = getTaskExecutor();
            if (executor == null) {
                // No executor: invoke on the caller's thread.
                invokeListener(listener, event);
            }
            else {
                executor.execute(() -> invokeListener(listener, event));
            }
        }
    }
}
首先获取所有监听对应事件的 Listener
(这一步不继续深入,逻辑也比较简单),这里executor
默认没有值,所以进入 invokeListener
方法。
/**
 * Excerpt of Spring's default event multicaster showing how a single listener is
 * invoked.
 */
public class SimpleApplicationEventMulticaster extends AbstractApplicationEventMulticaster {

    /**
     * Invokes the listener with the event. When an error handler is configured, anything
     * thrown by the listener is passed to it; otherwise the throwable propagates.
     *
     * @param listener the listener to invoke
     * @param event the event to hand over
     */
    protected void invokeListener(ApplicationListener<?> listener, ApplicationEvent event) {
        ErrorHandler errorHandler = getErrorHandler();
        if (errorHandler == null) {
            doInvokeListener(listener, event);
            return;
        }
        try {
            doInvokeListener(listener, event);
        }
        catch (Throwable err) {
            errorHandler.handleError(err);
        }
    }

    /**
     * Calls the listener's onApplicationEvent directly. A ClassCastException whose
     * message is missing or matches the event class is suppressed and logged at debug
     * level; any other ClassCastException is rethrown.
     */
    private void doInvokeListener(ApplicationListener listener, ApplicationEvent event) {
        try {
            listener.onApplicationEvent(event);
        }
        catch (ClassCastException ex) {
            String msg = ex.getMessage();
            boolean unrelatedFailure = msg != null && !matchesClassCastMessage(msg, event.getClass());
            if (unrelatedFailure) {
                throw ex;
            }
            // Possibly a lambda-defined listener which we could not resolve the generic event type for
            // -> let's suppress the exception and just log a debug message.
            Log logger = LogFactory.getLog(getClass());
            if (logger.isDebugEnabled()) {
                logger.debug("Non-matching event type for listener:" + listener, ex);
            }
        }
    }
}
可以看到 invokeListener
就是直接调用 listener.onApplicationEvent
方法。
所以在默认情况下(executor
为空),Spring 的事件处理是同步的。