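This article walks through Tomcat's "brute-force stop" process and covers a few key pieces of information the solution depends on. The detailed fix will be covered in a later article.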
Overall architecture
First, let's look at Tomcat's overall architecture to get a rough picture of how its components relate to one another:
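Image source: 《四张图带你了解Tomcat架构》 ("Understanding Tomcat's Architecture in Four Diagrams")
Image source: 《Tomcat系统架构(上)》 ("Tomcat System Architecture, Part 1")
Image source: 《Tomcat系统架构(上): 连接器是如何设计的?》 ("Tomcat System Architecture, Part 1: How Is the Connector Designed?")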
Shutdown
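When the server shuts down, SpringBoot's embedded Tomcat does not wait for in-flight requests to finish; it stops right away. Let's trace SpringBoot-Tomcat's shutdown flow through the source code.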
We start with the Spring context:
```java
public abstract class AbstractApplicationContext extends DefaultResourceLoader
        implements ConfigurableApplicationContext {

    public void registerShutdownHook() {
        if (this.shutdownHook == null) {
            // No shutdown hook registered yet.
            this.shutdownHook = new Thread() {
                @Override
                public void run() {
                    synchronized (startupShutdownMonitor) {
                        doClose();
                    }
                }
            };
            Runtime.getRuntime().addShutdownHook(this.shutdownHook);
        }
    }

    protected void doClose() {
        // Check whether an actual close attempt is necessary...
        if (this.active.get() && this.closed.compareAndSet(false, true)) {
            if (logger.isDebugEnabled()) {
                logger.debug("Closing " + this);
            }
            LiveBeansView.unregisterApplicationContext(this);
            try {
                // Publish shutdown event.
                publishEvent(new ContextClosedEvent(this)); // publish the close event
            }
            catch (Throwable ex) {
                logger.warn("Exception thrown from ApplicationListener handling ContextClosedEvent", ex);
            }
            // Stop all Lifecycle beans, to avoid delays during individual destruction.
            if (this.lifecycleProcessor != null) {
                try {
                    this.lifecycleProcessor.onClose();
                }
                catch (Throwable ex) {
                    logger.warn("Exception thrown from LifecycleProcessor on context close", ex);
                }
            }
            // Destroy all cached singletons in the context's BeanFactory.
            destroyBeans();
            // Close the state of this context itself.
            closeBeanFactory();
            // Let subclasses do some final clean-up if they wish...
            onClose(); // run the subclass close hook
            // Reset local application listeners to pre-refresh state.
            if (this.earlyApplicationListeners != null) {
                this.applicationListeners.clear();
                this.applicationListeners.addAll(this.earlyApplicationListeners);
            }
            // Switch to inactive.
            this.active.set(false);
        }
    }
}
```
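Note that the close event is published (`publishEvent(new ContextClosedEvent(this))`) before resources are released in `onClose`, and that Spring handles events synchronously by default. This matters: the solution discussed later relies on both facts.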
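To make this ordering concrete, here is a minimal JDK-only model of what `doClose` does. It is not Spring code: `MiniContext`, `addListener` and the log strings are hypothetical. The model publishes the close event to every listener synchronously on the closing thread and only afterwards runs `onClose`, which in Spring Boot is where the web server is stopped. The `compareAndSet` guard mirrors the one in `doClose`, so a second close is a no-op.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;

// Hypothetical, simplified model of AbstractApplicationContext.doClose(); not Spring code.
class MiniContext {
    private final AtomicBoolean closed = new AtomicBoolean(false);
    private final List<Consumer<String>> listeners = new CopyOnWriteArrayList<>();
    // Records the order in which the shutdown steps happen.
    final List<String> log = new CopyOnWriteArrayList<>();

    void addListener(Consumer<String> listener) {
        listeners.add(listener);
    }

    void doClose() {
        // Only the first caller proceeds, like closed.compareAndSet(false, true) in Spring.
        if (!closed.compareAndSet(false, true)) {
            return;
        }
        // 1. Publish the close event synchronously: each listener runs to completion here.
        for (Consumer<String> listener : listeners) {
            listener.accept("ContextClosedEvent");
        }
        // 2. Only then release resources (in Spring Boot, onClose() stops the web server).
        onClose();
    }

    protected void onClose() {
        log.add("onClose: web server stopped");
    }
}
```

Because step 2 cannot start until every listener has returned, a listener that blocks (for example, one that waits for in-flight work) effectively delays the web server shutdown.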
```java
public class ServletWebServerApplicationContext extends GenericWebApplicationContext
        implements ConfigurableWebServerApplicationContext {

    private void stopAndReleaseWebServer() {
        WebServer webServer = this.webServer;
        if (webServer != null) {
            try {
                webServer.stop();
                this.webServer = null;
            }
            catch (Exception ex) {
                throw new IllegalStateException(ex);
            }
        }
    }

    protected void onClose() {
        super.onClose();
        stopAndReleaseWebServer();
    }
}
```
From here on, we are in the shutdown of the web container itself.
```java
public class TomcatWebServer implements WebServer {

    public void stop() throws WebServerException {
        synchronized (this.monitor) {
            boolean wasStarted = this.started;
            try {
                this.started = false;
                try {
                    stopTomcat();
                    this.tomcat.destroy();
                }
                catch (LifecycleException ex) {
                    // swallow and continue
                }
            }
            catch (Exception ex) {
                throw new WebServerException("Unable to stop embedded Tomcat", ex);
            }
            finally {
                if (wasStarted) {
                    containerCounter.decrementAndGet();
                }
            }
        }
    }

    private void stopTomcat() throws LifecycleException {
        if (Thread.currentThread().getContextClassLoader() instanceof TomcatEmbeddedWebappClassLoader) {
            Thread.currentThread().setContextClassLoader(getClass().getClassLoader());
        }
        this.tomcat.stop();
    }
}
```
Next, let's step into `tomcat.stop()`:
```java
public class Tomcat {

    public void stop() throws LifecycleException {
        getServer();
        server.stop();
    }

    public Server getServer() {
        if (server != null) {
            return server;
        }
        System.setProperty("catalina.useNaming", "false");
        server = new StandardServer();
        initBaseDir();
        // Set configuration source
        ConfigFileLoader.setSource(new CatalinaBaseConfigurationSource(new File(basedir), null));
        server.setPort(-1);
        Service service = new StandardService();
        service.setName("Tomcat");
        server.addService(service);
        return server;
    }
}
```
Then into `StandardServer.stop`:
```java
public abstract class LifecycleBase implements Lifecycle {

    public final synchronized void stop() throws LifecycleException {
        if (LifecycleState.STOPPING_PREP.equals(state) || LifecycleState.STOPPING.equals(state)
                || LifecycleState.STOPPED.equals(state)) {
            if (log.isDebugEnabled()) {
                Exception e = new LifecycleException();
                log.debug(sm.getString("lifecycleBase.alreadyStopped", toString()), e);
            } else if (log.isInfoEnabled()) {
                log.info(sm.getString("lifecycleBase.alreadyStopped", toString()));
            }
            return;
        }

        if (state.equals(LifecycleState.NEW)) {
            state = LifecycleState.STOPPED;
            return;
        }

        if (!state.equals(LifecycleState.STARTED) && !state.equals(LifecycleState.FAILED)) {
            invalidTransition(Lifecycle.BEFORE_STOP_EVENT);
        }

        try {
            if (state.equals(LifecycleState.FAILED)) {
                // Don't transition to STOPPING_PREP as that would briefly mark the
                // component as available but do ensure the BEFORE_STOP_EVENT is
                // fired
                fireLifecycleEvent(BEFORE_STOP_EVENT, null);
            } else {
                setStateInternal(LifecycleState.STOPPING_PREP, null, false);
            }

            stopInternal();

            // Shouldn't be necessary but acts as a check that sub-classes are
            // doing what they are supposed to.
            if (!state.equals(LifecycleState.STOPPING) && !state.equals(LifecycleState.FAILED)) {
                invalidTransition(Lifecycle.AFTER_STOP_EVENT);
            }

            setStateInternal(LifecycleState.STOPPED, null, false);
        } catch (Throwable t) {
            handleSubClassException(t, "lifecycleBase.stopFail", toString());
        } finally {
            if (this instanceof Lifecycle.SingleUse) {
                // Complete stop process first
                setStateInternal(LifecycleState.STOPPED, null, false);
                destroy();
            }
        }
    }
}

public final class StandardServer extends LifecycleMBeanBase implements Server {

    protected void stopInternal() throws LifecycleException {
        setState(LifecycleState.STOPPING);

        if (monitorFuture != null) {
            monitorFuture.cancel(true);
            monitorFuture = null;
        }
        if (periodicLifecycleEventFuture != null) {
            periodicLifecycleEventFuture.cancel(false);
            periodicLifecycleEventFuture = null;
        }

        fireLifecycleEvent(CONFIGURE_STOP_EVENT, null);

        // Stop our defined Services
        for (int i = 0; i < services.length; i++) {
            services[i].stop();
        }

        globalNamingResources.stop();

        stopAwait();
    }
}
```
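The `stop` method is defined in the parent class `LifecycleBase` (almost every Tomcat component extends `LifecycleBase`), while `StandardServer` implements the hook method `stopInternal`, which calls `stop` on each `StandardService`. Before going further, here is how these objects relate: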
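- A `Tomcat` instance has exactly one `Server`.
- A `Server` can contain multiple `Service`s; for example, when several projects are deployed under one Tomcat (`Server`), they can be organized as multiple `Service`s.
- A `Service` has only one `Container`, but it can have multiple `Connector`s, because one service may accept several kinds of connections, such as both `Http` and `Https`, or the same protocol on different ports.
- A `Connector` has exactly one `ProtocolHandler`. I/O models and application-layer protocols can be combined freely, for example NIO + HTTP or NIO.2 + AJP, so Tomcat's designers treat network communication and application-protocol parsing together and encapsulate these two points of variation in an interface called ProtocolHandler. Each protocol/I-O combination has its own concrete implementation class, such as Http11NioProtocol and AjpNioProtocol. The `Connector` uses the `ProtocolHandler` to handle network connections and the application-layer protocol; it contains two important parts: the Endpoint and the Processor.
- A `ProtocolHandler` has exactly one `EndPoint`. The Endpoint is the communication endpoint, i.e. the interface that listens for connections. It is the concrete handler that receives and sends on sockets and is the abstraction of the transport layer, so the Endpoint is what implements the TCP/IP side.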
```java
public class StandardService extends LifecycleMBeanBase implements Service {

    protected void stopInternal() throws LifecycleException {
        // Pause connectors first
        synchronized (connectorsLock) {
            for (Connector connector: connectors) {
                connector.pause();
                // Close server socket if bound on start
                // Note: test is in AbstractEndpoint
                connector.getProtocolHandler().closeServerSocketGraceful();
            }
        }

        if (log.isInfoEnabled())
            log.info(sm.getString("standardService.stop.name", this.name));
        setState(LifecycleState.STOPPING);

        // Stop our defined Container second
        if (engine != null) {
            synchronized (engine) {
                engine.stop();
            }
        }

        // Now stop the connectors
        synchronized (connectorsLock) {
            for (Connector connector: connectors) {
                if (!LifecycleState.STARTED.equals(connector.getState())) {
                    // Connectors only need stopping if they are currently
                    // started. They may have failed to start or may have been
                    // stopped (e.g. via a JMX call)
                    continue;
                }
                connector.stop();
            }
        }

        // If the Server failed to start, the mapperListener won't have been
        // started
        if (mapperListener.getState() != LifecycleState.INITIALIZED) {
            mapperListener.stop();
        }

        synchronized (executors) {
            for (Executor executor: executors) {
                executor.stop();
            }
        }
    }
}
```
Two of these steps are relevant to our problem:
- `Connector.pause` stops accepting new requests.
- `Connector.stop` shuts down the thread pool.
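The containment rules and the `LifecycleBase.stop()` / `stopInternal()` split can be sketched as a small self-contained model. This is not Tomcat's API: `MiniComponent`, `MiniServer`, `MiniService`, `MiniConnector` and `MiniState` are hypothetical names, and only three lifecycle states are modeled. The final `stop()` plays the role of `LifecycleBase.stop()` (ignore repeated calls, then delegate to the subclass), and the service stops its children in the same order as `StandardService.stopInternal` shown above: pause connectors, stop the engine, then stop connectors.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified model of Tomcat's Server -> Service -> Connector tree (not Tomcat code).
enum MiniState { STARTED, STOPPING, STOPPED }

abstract class MiniComponent {
    protected MiniState state = MiniState.STARTED;
    final String name;
    final List<String> log; // shared log recording the order of shutdown steps

    MiniComponent(String name, List<String> log) {
        this.name = name;
        this.log = log;
    }

    // Template method, like LifecycleBase.stop(): guard against repeated calls, then delegate.
    final synchronized void stop() {
        if (state != MiniState.STARTED) {
            return; // already stopping or stopped
        }
        state = MiniState.STOPPING;
        stopInternal(); // subclass-specific work, like stopInternal() in Tomcat
        state = MiniState.STOPPED;
        log.add("stopped " + name);
    }

    protected abstract void stopInternal();
}

class MiniConnector extends MiniComponent {
    MiniConnector(String name, List<String> log) { super(name, log); }

    void pause() { log.add("paused " + name); } // stop accepting new requests

    @Override
    protected void stopInternal() { log.add("executor shut down for " + name); }
}

class MiniService extends MiniComponent {
    final List<MiniConnector> connectors = new ArrayList<>(); // one Service, many Connectors

    MiniService(String name, List<String> log) { super(name, log); }

    @Override
    protected void stopInternal() {
        connectors.forEach(MiniConnector::pause); // 1. pause connectors
        log.add("engine stopped for " + name);    // 2. stop the single Container (Engine)
        connectors.forEach(MiniConnector::stop);  // 3. stop connectors, shutting down their pools
    }
}

class MiniServer extends MiniComponent {
    final List<MiniService> services = new ArrayList<>(); // one Server, many Services

    MiniServer(List<String> log) { super("server", log); }

    @Override
    protected void stopInternal() { services.forEach(MiniService::stop); }
}
```

The model makes the problem visible: step 3 runs right after the engine stops, with nothing waiting for requests that are still executing.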
```java
public class Connector extends LifecycleMBeanBase {

    @Override
    protected void stopInternal() throws LifecycleException {
        setState(LifecycleState.STOPPING);
        try {
            if (protocolHandler != null) {
                protocolHandler.stop();
            }
        } catch (Exception e) {
            throw new LifecycleException(
                    sm.getString("coyoteConnector.protocolHandlerStopFailed"), e);
        }
    }
}

public abstract class AbstractProtocol<S> implements ProtocolHandler, MBeanRegistration {

    public void stop() throws Exception {
        if (getLog().isInfoEnabled()) {
            getLog().info(sm.getString("abstractProtocolHandler.stop", getName()));
            logPortOffset();
        }
        if (monitorFuture != null) {
            monitorFuture.cancel(true);
            monitorFuture = null;
        }
        stopAsyncTimeout();
        // Timeout any pending async request
        for (Processor processor : waitingProcessors) {
            processor.timeoutAsync(-1);
        }
        endpoint.stop();
    }
}

public abstract class AbstractEndpoint<S,U> {

    public final void stop() throws Exception {
        stopInternal();
        if (bindState == BindState.BOUND_ON_START || bindState == BindState.SOCKET_CLOSED_ON_STOP) {
            unbind();
            bindState = BindState.UNBOUND;
        }
    }

    // Implemented by subclasses. The implementations in AprEndpoint and NioEndpoint
    // (SpringBoot uses NioEndpoint by default) call shutdownExecutor().
    public abstract void stopInternal() throws Exception;

    public void shutdownExecutor() {
        Executor executor = this.executor;
        if (executor != null && internalExecutor) {
            this.executor = null;
            if (executor instanceof ThreadPoolExecutor) {
                //this is our internal one, so we need to shut it down
                ThreadPoolExecutor tpe = (ThreadPoolExecutor) executor;
                tpe.shutdownNow();
                long timeout = getExecutorTerminationTimeoutMillis();
                if (timeout > 0) {
                    try {
                        tpe.awaitTermination(timeout, TimeUnit.MILLISECONDS);
                    } catch (InterruptedException e) {
                        // Ignore
                    }
                    if (tpe.isTerminating()) {
                        getLog().warn(sm.getString("endpoint.warn.executorShutdown", getName()));
                    }
                }
                TaskQueue queue = (TaskQueue) tpe.getQueue();
                queue.setParent(null);
            }
        }
    }

    /**
     * Time to wait for the internal executor (if used) to terminate when the
     * endpoint is stopped in milliseconds. Defaults to 5000 (5 seconds).
     */
    private long executorTerminationTimeoutMillis = 5000;

    public long getExecutorTerminationTimeoutMillis() {
        return executorTerminationTimeoutMillis;
    }
}
```
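As shown above, along the chain `connector.stop -> protocolHandler.stop -> endpoint.stop -> NioEndpoint.stopInternal -> endpoint.shutdownExecutor`, the thread pool is stopped bluntly with `shutdownNow`. A Java thread pool's `shutdownNow` tries to interrupt the threads that are currently running tasks and removes tasks still waiting in the queue, so they never run. It cannot guarantee that running tasks actually stop, though: a task that ignores interruption keeps going. That is why Tomcat then calls `awaitTermination` and waits up to 5 seconds (`executorTerminationTimeoutMillis`, 5000 ms by default) to give the tasks a chance to finish stopping.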
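The effect of `shutdownNow` is easy to observe with a plain `java.util.concurrent` pool. In the sketch below (the class `ShutdownNowDemo` and its `Result` type are hypothetical; only the JDK executor APIs are real), a one-thread pool is busy with a sleeping task while two more tasks wait in its queue. Calling `shutdownNow` interrupts the running task and hands back the queued ones without running them, which is roughly what happens to in-flight and pending requests when Tomcat's endpoint shuts its executor down.

```java
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical demo class; only the JDK executor APIs are real.
class ShutdownNowDemo {
    static final class Result {
        final boolean runningTaskInterrupted; // was the in-flight task interrupted?
        final int queuedTasksReturned;        // tasks shutdownNow() removed from the queue
        final int queuedTasksExecuted;        // queued tasks that actually ran

        Result(boolean interrupted, int returned, int executed) {
            this.runningTaskInterrupted = interrupted;
            this.queuedTasksReturned = returned;
            this.queuedTasksExecuted = executed;
        }
    }

    static Result run() {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        CountDownLatch started = new CountDownLatch(1);
        AtomicBoolean interrupted = new AtomicBoolean(false);
        AtomicInteger executed = new AtomicInteger();

        // An in-flight "request"; Thread.sleep reacts to interruption.
        pool.execute(() -> {
            started.countDown();
            try {
                Thread.sleep(10_000);
            } catch (InterruptedException e) {
                interrupted.set(true);
            }
        });
        // Two more "requests" waiting in the queue behind it.
        pool.execute(executed::incrementAndGet);
        pool.execute(executed::incrementAndGet);

        try {
            started.await();                              // the first task is now running
            List<Runnable> neverRun = pool.shutdownNow(); // interrupt workers, drain the queue
            pool.awaitTermination(5, TimeUnit.SECONDS);   // Tomcat's default wait is also 5 s
            return new Result(interrupted.get(), neverRun.size(), executed.get());
        } catch (InterruptedException e) {
            pool.shutdownNow();
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }
}
```

Tomcat ignores the list returned by `shutdownNow`, so the queued work is simply dropped. A task that swallowed the interrupt and kept looping would still be running when `awaitTermination` gives up.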
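The analysis above shows that the root cause is Tomcat forcibly shutting down its thread pool. As discussed in the first article of this series, fixing this takes two steps:
- Stop accepting new requests.
- Wait until all tasks have finished, and only then shut down the thread pool.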
The key to both steps is:
- Getting hold of the `Connector`, because `Connector.pause` stops it from accepting requests.
- Getting hold of the thread pool.
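The two steps above can be sketched with JDK types alone. Here a hypothetical `GracefulGate` stands in for the connector and its pool: an `AtomicBoolean` plays the role of `Connector.pause()`, and the pool is drained with `shutdown()` plus a bounded `awaitTermination`, falling back to `shutdownNow()` only when the grace period runs out. The class, method names and timeout are assumptions for illustration, not Tomcat or Spring APIs.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical sketch: a "gate" in front of a pool, standing in for Connector.pause() + the endpoint executor.
class GracefulGate {
    private final AtomicBoolean accepting = new AtomicBoolean(true);
    private final ExecutorService pool;

    GracefulGate(ExecutorService pool) {
        this.pool = pool;
    }

    // Returns false if the request was refused because the gate is paused or the pool is shut down.
    boolean submit(Runnable request) {
        if (!accepting.get()) {
            return false;
        }
        try {
            pool.execute(request);
            return true;
        } catch (RejectedExecutionException e) {
            return false; // lost a race with shutdownGracefully()
        }
    }

    // Returns true if every accepted request finished before the timeout.
    boolean shutdownGracefully(long timeout, TimeUnit unit) {
        accepting.set(false); // step 1: stop accepting new requests ("pause")
        pool.shutdown();      // step 2: no new tasks, but running and queued ones still complete
        try {
            if (pool.awaitTermination(timeout, unit)) {
                return true;
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        pool.shutdownNow();   // fallback once the grace period is over, like Tomcat's default path
        return false;
    }
}
```

The timeout is a deliberate design choice: without it, a single stuck request could block shutdown forever.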
Connector
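In Tomcat's architecture, the Connector is responsible for communicating with clients. A Connector instance listens on a port, accepts client requests and passes them to the Engine for processing, then returns the Engine's responses to the client. Concretely, it:
- Listens on a network port
- Accepts network connections
- Reads the request byte stream from the network
- Parses the bytes according to the application-layer protocol (HTTP/AJP) into a unified Tomcat Request object
- Converts the Tomcat Request into a standard ServletRequest
- Calls the Servlet container and gets a ServletResponse
- Converts the ServletResponse into a Tomcat Response object
- Converts the Tomcat Response into a network byte stream
- Writes the response bytes back to the client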
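As a rough illustration of the middle of that pipeline, here is a toy that goes from request bytes to a request object, through a "servlet", and back to response bytes. It is not Tomcat code: `ToyConnector`, `handle` and the plain `Function` used as the servlet are hypothetical, socket listening and accepting are left out, and only the HTTP request line is parsed.

```java
import java.nio.charset.StandardCharsets;
import java.util.function.Function;

// Hypothetical toy; Tomcat's real Connector/Processor classes are far more involved.
class ToyConnector {
    // Parsed request line; stands in for Tomcat's Request / ServletRequest objects.
    static final class Request {
        final String method;
        final String path;
        Request(String method, String path) { this.method = method; this.path = path; }
    }

    // bytes -> Request -> "servlet" -> response body -> bytes
    static byte[] handle(byte[] raw, Function<Request, String> servlet) {
        String text = new String(raw, StandardCharsets.US_ASCII);
        // Parse the application-layer protocol: only the request line, e.g. "GET /hi HTTP/1.1".
        String requestLine = text.split("\r\n", 2)[0];
        String[] parts = requestLine.split(" ");
        if (parts.length != 3) {
            return "HTTP/1.1 400 Bad Request\r\nContent-Length: 0\r\n\r\n"
                    .getBytes(StandardCharsets.US_ASCII);
        }
        Request request = new Request(parts[0], parts[1]);
        // Call the "servlet container" and get the response body.
        byte[] body = servlet.apply(request).getBytes(StandardCharsets.UTF_8);
        // Serialize the response back into bytes for the client.
        byte[] head = ("HTTP/1.1 200 OK\r\nContent-Length: " + body.length + "\r\n\r\n")
                .getBytes(StandardCharsets.US_ASCII);
        byte[] out = new byte[head.length + body.length];
        System.arraycopy(head, 0, out, 0, head.length);
        System.arraycopy(body, 0, out, head.length, body.length);
        return out;
    }
}
```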
Now let's look at how SpringBoot creates Tomcat.
```java
@Configuration
class ServletWebServerFactoryConfiguration {

    @Configuration
    @ConditionalOnClass({ Servlet.class, Tomcat.class, UpgradeProtocol.class })
    @ConditionalOnMissingBean(value = ServletWebServerFactory.class, search = SearchStrategy.CURRENT)
    public static class EmbeddedTomcat {

        @Bean
        public TomcatServletWebServerFactory tomcatServletWebServerFactory() {
            return new TomcatServletWebServerFactory();
        }
    }

    /**
     * Nested configuration if Jetty is being used.
     */
    @Configuration
    @ConditionalOnClass({ Servlet.class, Server.class, Loader.class, WebAppContext.class })
    @ConditionalOnMissingBean(value = ServletWebServerFactory.class, search = SearchStrategy.CURRENT)
    public static class EmbeddedJetty {

        @Bean
        public JettyServletWebServerFactory JettyServletWebServerFactory() {
            return new JettyServletWebServerFactory();
        }
    }

    /**
     * Nested configuration if Undertow is being used.
     */
    @Configuration
    @ConditionalOnClass({ Servlet.class, Undertow.class, SslClientAuthMode.class })
    @ConditionalOnMissingBean(value = ServletWebServerFactory.class, search = SearchStrategy.CURRENT)
    public static class EmbeddedUndertow {

        @Bean
        public UndertowServletWebServerFactory undertowServletWebServerFactory() {
            return new UndertowServletWebServerFactory();
        }
    }
}
```
As we can see, the bean created here is a `TomcatServletWebServerFactory`, whose `getWebServer` method builds the Tomcat instance. When is that method called?
```java
public class ServletWebServerApplicationContext extends GenericWebApplicationContext
        implements ConfigurableWebServerApplicationContext {

    @Override
    protected void onRefresh() {
        super.onRefresh();
        try {
            createWebServer();
        }
        catch (Throwable ex) {
            throw new ApplicationContextException("Unable to start web server", ex);
        }
    }

    private void createWebServer() {
        WebServer webServer = this.webServer;
        ServletContext servletContext = getServletContext();
        if (webServer == null && servletContext == null) {
            ServletWebServerFactory factory = getWebServerFactory();
            this.webServer = factory.getWebServer(getSelfInitializer());
        }
        else if (servletContext != null) {
            try {
                getSelfInitializer().onStartup(servletContext);
            }
            catch (ServletException ex) {
                throw new ApplicationContextException("Cannot initialize servlet context", ex);
            }
        }
        initPropertySources();
    }
}
```
So when the application starts (`onRefresh`), SpringBoot's context class `ServletWebServerApplicationContext` calls `createWebServer`, which in turn calls `ServletWebServerFactory.getWebServer`.
```java
public class TomcatServletWebServerFactory extends AbstractServletWebServerFactory
        implements ConfigurableTomcatWebServerFactory, ResourceLoaderAware {

    @Override
    public WebServer getWebServer(ServletContextInitializer... initializers) {
        Tomcat tomcat = new Tomcat();
        File baseDir = (this.baseDirectory != null) ? this.baseDirectory : createTempDir("tomcat");
        tomcat.setBaseDir(baseDir.getAbsolutePath());
        Connector connector = new Connector(this.protocol);
        tomcat.getService().addConnector(connector);
        customizeConnector(connector);
        tomcat.setConnector(connector);
        tomcat.getHost().setAutoDeploy(false);
        configureEngine(tomcat.getEngine());
        for (Connector additionalConnector : this.additionalTomcatConnectors) {
            tomcat.getService().addConnector(additionalConnector);
        }
        prepareContext(tomcat.getHost(), initializers);
        return getTomcatWebServer(tomcat);
    }
}
```
`getWebServer` builds the `Tomcat` object, and its `customizeConnector(connector)` step is where the `Connector` gets configured.
```java
public class TomcatServletWebServerFactory extends AbstractServletWebServerFactory
        implements ConfigurableTomcatWebServerFactory, ResourceLoaderAware {

    protected void customizeConnector(Connector connector) {
        int port = (getPort() >= 0) ? getPort() : 0;
        connector.setPort(port);
        if (StringUtils.hasText(this.getServerHeader())) {
            connector.setAttribute("server", this.getServerHeader());
        }
        if (connector.getProtocolHandler() instanceof AbstractProtocol) {
            customizeProtocol((AbstractProtocol<?>) connector.getProtocolHandler());
        }
        if (getUriEncoding() != null) {
            connector.setURIEncoding(getUriEncoding().name());
        }
        // Don't bind to the socket prematurely if ApplicationContext is slow to start
        connector.setProperty("bindOnInit", "false");
        if (getSsl() != null && getSsl().isEnabled()) {
            customizeSsl(connector);
        }
        TomcatConnectorCustomizer compression = new CompressionConnectorCustomizer(getCompression());
        compression.customize(connector);
        for (TomcatConnectorCustomizer customizer : this.tomcatConnectorCustomizers) {
            customizer.customize(connector);
        }
    }

    public void addContextCustomizers(TomcatContextCustomizer... tomcatContextCustomizers) {
        Assert.notNull(tomcatContextCustomizers, "TomcatContextCustomizers must not be null");
        this.tomcatContextCustomizers.addAll(Arrays.asList(tomcatContextCustomizers));
    }
}
```
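From this code we can see that `customizeConnector` ends by iterating over every registered `TomcatConnectorCustomizer` and calling `customizer.customize(connector)`, passing in the `Connector` object. So by implementing our own `TomcatConnectorCustomizer` and registering it with the factory through `addConnectorCustomizers`, we can get hold of the `Connector`. Note that the `addContextCustomizers` method shown above registers `TomcatContextCustomizer`s, which customize the Tomcat `Context` rather than the `Connector`.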
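The customizer hook is what lets application code capture the `Connector`. Below is a JDK-only model of that callback pattern; `FakeConnector`, `FakeServerFactory` and `ConnectorCapturer` are hypothetical stand-ins, and in real code the roles are played by Tomcat's `Connector`, `TomcatServletWebServerFactory` and a class implementing `TomcatConnectorCustomizer`. The factory keeps a list of customizers and applies each one to every connector it creates, so a customizer that simply stores the reference ends up holding the live connector.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical stand-in for org.apache.catalina.connector.Connector.
class FakeConnector {
    volatile boolean paused;
    void pause() { paused = true; }
}

// Hypothetical stand-in for TomcatServletWebServerFactory: applies customizers to each new connector.
class FakeServerFactory {
    private final List<Consumer<FakeConnector>> connectorCustomizers = new ArrayList<>();

    void addConnectorCustomizers(Consumer<FakeConnector> customizer) {
        connectorCustomizers.add(customizer);
    }

    FakeConnector createConnector() {
        FakeConnector connector = new FakeConnector();
        // Mirrors the loop at the end of customizeConnector(connector).
        for (Consumer<FakeConnector> customizer : connectorCustomizers) {
            customizer.accept(connector);
        }
        return connector;
    }
}

// A customizer whose only job is to remember the connector so it can be paused at shutdown.
class ConnectorCapturer implements Consumer<FakeConnector> {
    private volatile FakeConnector connector;

    @Override
    public void accept(FakeConnector connector) { this.connector = connector; }

    FakeConnector connector() { return connector; }
}
```

In a real application the capturing class would implement `TomcatConnectorCustomizer.customize(Connector)` instead of `Consumer.accept`, and a `ContextClosedEvent` listener could then call `pause()` on the stored reference.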
Thread pool
Once the previous step gives us the `Connector`, reaching the thread pool is straightforward: `connector -> protocolHandler -> endpoint -> getExecutor()`.
Appendix
Spring event handling
```java
public abstract class AbstractApplicationContext extends DefaultResourceLoader
        implements ConfigurableApplicationContext {

    protected void publishEvent(Object event, @Nullable ResolvableType eventType) {
        Assert.notNull(event, "Event must not be null");

        // Decorate event as an ApplicationEvent if necessary
        ApplicationEvent applicationEvent;
        if (event instanceof ApplicationEvent) {
            applicationEvent = (ApplicationEvent) event;
        }
        else {
            applicationEvent = new PayloadApplicationEvent<>(this, event);
            if (eventType == null) {
                eventType = ((PayloadApplicationEvent) applicationEvent).getResolvableType();
            }
        }

        // Multicast right now if possible - or lazily once the multicaster is initialized
        if (this.earlyApplicationEvents != null) {
            this.earlyApplicationEvents.add(applicationEvent);
        }
        else {
            getApplicationEventMulticaster().multicastEvent(applicationEvent, eventType);
        }

        // Publish event via parent context as well...
        if (this.parent != null) {
            if (this.parent instanceof AbstractApplicationContext) {
                ((AbstractApplicationContext) this.parent).publishEvent(event, eventType);
            }
            else {
                this.parent.publishEvent(event);
            }
        }
    }
}
```
The key step is `getApplicationEventMulticaster().multicastEvent(applicationEvent, eventType);`. By default, the `ApplicationEventMulticaster` is a `SimpleApplicationEventMulticaster`.
```java
public class SimpleApplicationEventMulticaster extends AbstractApplicationEventMulticaster {

    public void multicastEvent(final ApplicationEvent event, @Nullable ResolvableType eventType) {
        ResolvableType type = (eventType != null ? eventType : resolveDefaultEventType(event));
        for (final ApplicationListener<?> listener : getApplicationListeners(event, type)) {
            Executor executor = getTaskExecutor();
            if (executor != null) {
                executor.execute(() -> invokeListener(listener, event));
            }
            else {
                invokeListener(listener, event);
            }
        }
    }
}
```
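It first looks up every listener registered for the event (we won't go into this part; the logic is fairly simple). Since `executor` is null by default, each listener is handled directly by `invokeListener`.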
```java
public class SimpleApplicationEventMulticaster extends AbstractApplicationEventMulticaster {

    protected void invokeListener(ApplicationListener<?> listener, ApplicationEvent event) {
        ErrorHandler errorHandler = getErrorHandler();
        if (errorHandler != null) {
            try {
                doInvokeListener(listener, event);
            }
            catch (Throwable err) {
                errorHandler.handleError(err);
            }
        }
        else {
            doInvokeListener(listener, event);
        }
    }

    private void doInvokeListener(ApplicationListener listener, ApplicationEvent event) {
        try {
            listener.onApplicationEvent(event);
        }
        catch (ClassCastException ex) {
            String msg = ex.getMessage();
            if (msg == null || matchesClassCastMessage(msg, event.getClass())) {
                // Possibly a lambda-defined listener which we could not resolve the generic event type for
                // -> let's suppress the exception and just log a debug message.
                Log logger = LogFactory.getLog(getClass());
                if (logger.isDebugEnabled()) {
                    logger.debug("Non-matching event type for listener: " + listener, ex);
                }
            }
            else {
                throw ex;
            }
        }
    }
}
```
As shown, `invokeListener` (through `doInvokeListener`) simply calls `listener.onApplicationEvent` directly.
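So by default (with `executor` null), Spring event handling is synchronous: listeners run on the publishing thread, and `publishEvent` returns only after all of them have finished.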
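The dispatch decision in `multicastEvent` can be reproduced in a few lines. The model below is hypothetical (`MiniMulticaster`; listeners are plain `Consumer`s rather than Spring's `ApplicationListener`), but it takes the same branch as `SimpleApplicationEventMulticaster`: with no task executor configured, each listener runs inline on the publishing thread; otherwise it is handed to the executor.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.Executor;
import java.util.function.Consumer;

// Hypothetical model of SimpleApplicationEventMulticaster's dispatch logic; not Spring code.
class MiniMulticaster<E> {
    private final List<Consumer<E>> listeners = new CopyOnWriteArrayList<>();
    private volatile Executor taskExecutor; // null by default, as in SimpleApplicationEventMulticaster

    void addListener(Consumer<E> listener) { listeners.add(listener); }

    void setTaskExecutor(Executor executor) { this.taskExecutor = executor; }

    void multicast(E event) {
        for (Consumer<E> listener : listeners) {
            Executor executor = taskExecutor;
            if (executor != null) {
                executor.execute(() -> listener.accept(event)); // asynchronous dispatch
            } else {
                listener.accept(event); // synchronous: returns only after the listener finishes
            }
        }
    }
}
```

If a task executor were set on the multicaster, listeners would run asynchronously, and `doClose` would move on to `onClose` without waiting for them; the default synchronous behavior is what makes a blocking `ContextClosedEvent` listener useful.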