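This article walks through Tomcat's "abrupt stop" flow and picks out a few key pieces of information that the fix depends on. The detailed steps of the fix will follow in a later article.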

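Overall architecture

Let's first look at Tomcat's overall architecture to get a rough idea of how its components relate to each other:

Image source: 《四张图带你了解Tomcat架构》

Image source: 《Tomcat系统架构(上)》

Image source: 《Tomcat系统架构(上): 连接器是如何设计的?》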

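Shutdown

When the server shuts down, Spring Boot's embedded Tomcat does not wait for in-flight requests to finish; it shuts down immediately. Let's follow the code to see how the shutdown proceeds.
We start from the Spring context: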

public abstract class AbstractApplicationContext extends DefaultResourceLoader
        implements ConfigurableApplicationContext {

    public void registerShutdownHook() {
        if (this.shutdownHook == null) {
            // No shutdown hook registered yet.
            this.shutdownHook = new Thread() {
                @Override
                public void run() {
                    synchronized (startupShutdownMonitor) {
                        doClose();
                    }
                }
            };
            Runtime.getRuntime().addShutdownHook(this.shutdownHook);
        }
    }

    protected void doClose() {
        // Check whether an actual close attempt is necessary...
        if (this.active.get() && this.closed.compareAndSet(false, true)) {
            if (logger.isDebugEnabled()) {
                logger.debug("Closing " + this);
            }
            LiveBeansView.unregisterApplicationContext(this);
            try {
                // Publish shutdown event.
                publishEvent(new ContextClosedEvent(this)); // the event is published here
            }
            catch (Throwable ex) {
                logger.warn("Exception thrown from ApplicationListener handling ContextClosedEvent", ex);
            }
            // Stop all Lifecycle beans, to avoid delays during individual destruction.
            if (this.lifecycleProcessor != null) {
                try {
                    this.lifecycleProcessor.onClose();
                }
                catch (Throwable ex) {
                    logger.warn("Exception thrown from LifecycleProcessor on context close", ex);
                }
            }
            // Destroy all cached singletons in the context's BeanFactory.
            destroyBeans();
            // Close the state of this context itself.
            closeBeanFactory();
            // Let subclasses do some final clean-up if they wish...
            onClose(); // the actual shutdown happens here
            // Reset local application listeners to pre-refresh state.
            if (this.earlyApplicationListeners != null) {
                this.applicationListeners.clear();
                this.applicationListeners.addAll(this.earlyApplicationListeners);
            }
            // Switch to inactive.
            this.active.set(false);
        }
    }
}

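Note that the close event is published (publishEvent(new ContextClosedEvent(this))) before resources are released in onClose, and that Spring handles events synchronously by default. This matters: the solution described later relies on it.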
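The ordering matters because a synchronous listener gets to finish its work before the web server is stopped. Here is a minimal sketch of that ordering using only the JDK; CloseOrderSketch and its methods are hypothetical stand-ins, not Spring classes:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

/** Toy model of the close sequence: listeners run synchronously before resources are released. */
class CloseOrderSketch {
    private final List<Consumer<String>> listeners = new ArrayList<>();
    private final List<String> log = new ArrayList<>();

    void addListener(Consumer<String> listener) {
        listeners.add(listener);
    }

    /** Mirrors the order in doClose(): publish the event first, then release resources. */
    List<String> close() {
        for (Consumer<String> listener : listeners) {
            // Synchronous call: control returns only after the listener has finished.
            listener.accept("ContextClosedEvent");
        }
        log.add("onClose: web server stopped");
        return log;
    }

    /** Registers one listener and closes the context, returning the recorded order. */
    static List<String> demo() {
        CloseOrderSketch context = new CloseOrderSketch();
        context.addListener(event -> context.log.add("listener: handled " + event));
        return context.close();
    }

    public static void main(String[] args) {
        demo().forEach(System.out::println);
    }
}
```

In this model the listener entry is always recorded before the onClose entry, which is the window a graceful-shutdown listener would use.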

public class ServletWebServerApplicationContext extends GenericWebApplicationContext
        implements ConfigurableWebServerApplicationContext {

    private void stopAndReleaseWebServer() {
        WebServer webServer = this.webServer;
        if (webServer != null) {
            try {
                webServer.stop();
                this.webServer = null;
            }
            catch (Exception ex) {
                throw new IllegalStateException(ex);
            }
        }
    }

    protected void onClose() {
        super.onClose();
        stopAndReleaseWebServer();
    }
}

From here on we enter the shutdown of the web container.

public class TomcatWebServer implements WebServer {

    public void stop() throws WebServerException {
        synchronized (this.monitor) {
            boolean wasStarted = this.started;
            try {
                this.started = false;
                try {
                    stopTomcat();
                    this.tomcat.destroy();
                }
                catch (LifecycleException ex) {
                    // swallow and continue
                }
            }
            catch (Exception ex) {
                throw new WebServerException("Unable to stop embedded Tomcat", ex);
            }
            finally {
                if (wasStarted) {
                    containerCounter.decrementAndGet();
                }
            }
        }
    }

    private void stopTomcat() throws LifecycleException {
        if (Thread.currentThread()
                .getContextClassLoader() instanceof TomcatEmbeddedWebappClassLoader) {
            Thread.currentThread().setContextClassLoader(getClass().getClassLoader());
        }
        this.tomcat.stop();
    }
}

Next, look inside tomcat.stop().

public class Tomcat {

    public void stop() throws LifecycleException {
        getServer();
        server.stop();
    }

    public Server getServer() {
        if (server != null) {
            return server;
        }
        System.setProperty("catalina.useNaming", "false");
        server = new StandardServer();
        initBaseDir();
        // Set configuration source
        ConfigFileLoader.setSource(new CatalinaBaseConfigurationSource(new File(basedir), null));
        server.setPort( -1 );
        Service service = new StandardService();
        service.setName("Tomcat");
        server.addService(service);
        return server;
    }
}

Next, step into StandardServer.stop:

public abstract class LifecycleBase implements Lifecycle {

    public final synchronized void stop() throws LifecycleException {
        if (LifecycleState.STOPPING_PREP.equals(state) || LifecycleState.STOPPING.equals(state) ||
                LifecycleState.STOPPED.equals(state)) {
            if (log.isDebugEnabled()) {
                Exception e = new LifecycleException();
                log.debug(sm.getString("lifecycleBase.alreadyStopped", toString()), e);
            } else if (log.isInfoEnabled()) {
                log.info(sm.getString("lifecycleBase.alreadyStopped", toString()));
            }
            return;
        }
        if (state.equals(LifecycleState.NEW)) {
            state = LifecycleState.STOPPED;
            return;
        }
        if (!state.equals(LifecycleState.STARTED) && !state.equals(LifecycleState.FAILED)) {
            invalidTransition(Lifecycle.BEFORE_STOP_EVENT);
        }
        try {
            if (state.equals(LifecycleState.FAILED)) {
                // Don't transition to STOPPING_PREP as that would briefly mark the
                // component as available but do ensure the BEFORE_STOP_EVENT is
                // fired
                fireLifecycleEvent(BEFORE_STOP_EVENT, null);
            } else {
                setStateInternal(LifecycleState.STOPPING_PREP, null, false);
            }
            stopInternal();
            // Shouldn't be necessary but acts as a check that sub-classes are
            // doing what they are supposed to.
            if (!state.equals(LifecycleState.STOPPING) && !state.equals(LifecycleState.FAILED)) {
                invalidTransition(Lifecycle.AFTER_STOP_EVENT);
            }
            setStateInternal(LifecycleState.STOPPED, null, false);
        } catch (Throwable t) {
            handleSubClassException(t, "lifecycleBase.stopFail", toString());
        } finally {
            if (this instanceof Lifecycle.SingleUse) {
                // Complete stop process first
                setStateInternal(LifecycleState.STOPPED, null, false);
                destroy();
            }
        }
    }
}

public final class StandardServer extends LifecycleMBeanBase implements Server {

    protected void stopInternal() throws LifecycleException {
        setState(LifecycleState.STOPPING);
        if (monitorFuture != null) {
            monitorFuture.cancel(true);
            monitorFuture = null;
        }
        if (periodicLifecycleEventFuture != null) {
            periodicLifecycleEventFuture.cancel(false);
            periodicLifecycleEventFuture = null;
        }
        fireLifecycleEvent(CONFIGURE_STOP_EVENT, null);
        // Stop our defined Services
        for (int i = 0; i < services.length; i++) {
            services[i].stop();
        }
        globalNamingResources.stop();
        stopAwait();
    }
}
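The split between a final stop() in the base class and a stopInternal() hook in each component is a template method. The following is a simplified sketch of that shape, assuming only four states and no events or error handling; LifecycleSketch, ServerSketch and ServiceSketch are hypothetical names, not Tomcat classes:

```java
import java.util.ArrayList;
import java.util.List;

/** Simplified stand-in for a lifecycle base class: it owns the state transitions. */
abstract class LifecycleSketch {
    enum State { STARTED, STOPPING_PREP, STOPPING, STOPPED }

    private State state = State.STARTED;
    private final List<State> history = new ArrayList<>();

    /** Final template method: subclasses cannot reorder the steps. */
    public final synchronized void stop() {
        if (state != State.STARTED) {
            return; // already stopping or stopped
        }
        setState(State.STOPPING_PREP);
        stopInternal(); // component-specific work
        if (state != State.STOPPING) {
            throw new IllegalStateException("stopInternal() must move the state to STOPPING");
        }
        setState(State.STOPPED);
    }

    protected void setState(State next) {
        state = next;
        history.add(next);
    }

    State getState() { return state; }

    List<State> getHistory() { return history; }

    protected abstract void stopInternal();
}

/** Stand-in for a server: its hook stops every child, like the loop over services. */
class ServerSketch extends LifecycleSketch {
    private final List<LifecycleSketch> services = new ArrayList<>();

    void addService(LifecycleSketch service) { services.add(service); }

    @Override
    protected void stopInternal() {
        setState(State.STOPPING);
        for (LifecycleSketch service : services) {
            service.stop();
        }
    }
}

/** Stand-in for a leaf component with nothing else to stop. */
class ServiceSketch extends LifecycleSketch {
    @Override
    protected void stopInternal() {
        setState(State.STOPPING);
    }
}

class LifecycleDemo {
    public static void main(String[] args) {
        ServerSketch server = new ServerSketch();
        server.addService(new ServiceSketch());
        server.stop();
        System.out.println(server.getHistory());
    }
}
```

Calling stop() on the parent cascades to the children through the hook, which is how a single server.stop() ends up reaching every connector.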

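The stop method lives in the parent class LifecycleBase (almost all Tomcat components extend LifecycleBase), and StandardServer implements the stopInternal hook, which in turn calls StandardService.stop. Before going further, here is how the objects relate to each other: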

  • A Tomcat instance has exactly one Server.
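  • A Server can contain multiple Services, each tying its own set of Connectors to one Container. Note that several web applications deployed in the same Tomcat are normally separate Contexts inside a single Service rather than separate Services.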
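  • A Service has exactly one Container but can have multiple Connectors, because one service can expose several endpoints, for example HTTP and HTTPS at the same time, or the same protocol on different ports.
  • A Connector has exactly one ProtocolHandler. Because I/O models and application-layer protocols can be combined freely (for example NIO + HTTP or NIO.2 + AJP), Tomcat's designers treated network communication and application-layer protocol parsing together and introduced the ProtocolHandler interface to encapsulate both points of variation. Each combination of protocol and I/O model has its own implementation class, such as Http11NioProtocol and AjpNioProtocol. The Connector uses its ProtocolHandler to handle network connections and the application-layer protocol; the ProtocolHandler contains two important parts: Endpoint and Processor.
  • A ProtocolHandler has exactly one Endpoint. The Endpoint is the communication endpoint, that is, the listening interface: it is the component that actually accepts sockets and sends and receives data on them. It abstracts the transport layer, so the Endpoint is where TCP/IP communication is implemented.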
public class StandardService extends LifecycleMBeanBase implements Service {

    protected void stopInternal() throws LifecycleException {
        // Pause connectors first
        synchronized (connectorsLock) {
            for (Connector connector: connectors) {
                connector.pause();
                // Close server socket if bound on start
                // Note: test is in AbstractEndpoint
                connector.getProtocolHandler().closeServerSocketGraceful();
            }
        }
        if(log.isInfoEnabled())
            log.info(sm.getString("standardService.stop.name", this.name));
        setState(LifecycleState.STOPPING);
        // Stop our defined Container second
        if (engine != null) {
            synchronized (engine) {
                engine.stop();
            }
        }
        // Now stop the connectors
        synchronized (connectorsLock) {
            for (Connector connector: connectors) {
                if (!LifecycleState.STARTED.equals(
                        connector.getState())) {
                    // Connectors only need stopping if they are currently
                    // started. They may have failed to start or may have been
                    // stopped (e.g. via a JMX call)
                    continue;
                }
                connector.stop();
            }
        }
        // If the Server failed to start, the mapperListener won't have been
        // started
        if (mapperListener.getState() != LifecycleState.INITIALIZED) {
            mapperListener.stop();
        }
        synchronized (executors) {
            for (Executor executor: executors) {
                executor.stop();
            }
        }
    }
}

Two steps here are relevant to our problem:

  1. Connector.pause stops accepting new requests.
  2. Connector.stop shuts down the thread pool.
public class Connector extends LifecycleMBeanBase {

    @Override
    protected void stopInternal() throws LifecycleException {
        setState(LifecycleState.STOPPING);
        try {
            if (protocolHandler != null) {
                protocolHandler.stop();
            }
        } catch (Exception e) {
            throw new LifecycleException(
                    sm.getString("coyoteConnector.protocolHandlerStopFailed"), e);
        }
    }
}

public abstract class AbstractProtocol<S> implements ProtocolHandler,
        MBeanRegistration {

    public void stop() throws Exception {
        if(getLog().isInfoEnabled()) {
            getLog().info(sm.getString("abstractProtocolHandler.stop", getName()));
            logPortOffset();
        }
        if (monitorFuture != null) {
            monitorFuture.cancel(true);
            monitorFuture = null;
        }
        stopAsyncTimeout();
        // Timeout any pending async request
        for (Processor processor : waitingProcessors) {
            processor.timeoutAsync(-1);
        }
        endpoint.stop();
    }
}

public abstract class AbstractEndpoint<S,U> {

    public final void stop() throws Exception {
        stopInternal();
        if (bindState == BindState.BOUND_ON_START || bindState == BindState.SOCKET_CLOSED_ON_STOP) {
            unbind();
            bindState = BindState.UNBOUND;
        }
    }

    // Implemented by subclasses; the AprEndpoint and NioEndpoint implementations
    // (Spring Boot uses NioEndpoint by default) call shutdownExecutor()
    public abstract void stopInternal() throws Exception;

    public void shutdownExecutor() {
        Executor executor = this.executor;
        if (executor != null && internalExecutor) {
            this.executor = null;
            if (executor instanceof ThreadPoolExecutor) {
                //this is our internal one, so we need to shut it down
                ThreadPoolExecutor tpe = (ThreadPoolExecutor) executor;
                tpe.shutdownNow();
                long timeout = getExecutorTerminationTimeoutMillis();
                if (timeout > 0) {
                    try {
                        tpe.awaitTermination(timeout, TimeUnit.MILLISECONDS);
                    } catch (InterruptedException e) {
                        // Ignore
                    }
                    if (tpe.isTerminating()) {
                        getLog().warn(sm.getString("endpoint.warn.executorShutdown", getName()));
                    }
                }
                TaskQueue queue = (TaskQueue) tpe.getQueue();
                queue.setParent(null);
            }
        }
    }

    /**
     * Time to wait for the internal executor (if used) to terminate when the
     * endpoint is stopped in milliseconds. Defaults to 5000 (5 seconds).
     */
    private long executorTerminationTimeoutMillis = 5000;

    public long getExecutorTerminationTimeoutMillis() {
        return executorTerminationTimeoutMillis;
    }
}

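As shown above, along the path connector.stop -> protocolHandler.stop -> endpoint.stop -> NioEndpoint.stopInternal -> endpoint.shutdownExecutor, the thread pool is stopped abruptly with shutdownNow. On a Java thread pool, shutdownNow tries to interrupt the threads that are currently executing tasks and cancels the tasks still waiting in the queue, but there is no guarantee that the running threads actually respond to the interrupt. Tomcat therefore calls awaitTermination and waits up to 5 seconds, doing its best to get every task stopped.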
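The effect of shutdownNow can be reproduced with a plain JDK thread pool. The sketch below assumes a single worker thread and a sleeping task that stands in for a slow request; ShutdownNowSketch is a hypothetical name and nothing here is Tomcat code:

```java
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

class ShutdownNowSketch {

    /** Runs one slow task plus two queued tasks, then calls shutdownNow() and reports what happened. */
    static String run() {
        ExecutorService pool = Executors.newFixedThreadPool(1);
        CountDownLatch started = new CountDownLatch(1);
        AtomicBoolean interrupted = new AtomicBoolean(false);

        pool.execute(() -> {
            started.countDown();
            try {
                Thread.sleep(60_000); // stands in for a request that is still being processed
            } catch (InterruptedException e) {
                interrupted.set(true); // the in-flight "request" is cut short
            }
        });
        pool.execute(() -> { }); // queued behind the slow task
        pool.execute(() -> { }); // queued behind the slow task

        try {
            started.await(); // make sure the slow task is running before shutting down
            List<Runnable> neverRan = pool.shutdownNow(); // interrupts workers, drains the queue
            pool.awaitTermination(5, TimeUnit.SECONDS);
            return "neverRan=" + neverRan.size() + ", interrupted=" + interrupted.get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

The running task is interrupted and the two queued tasks are handed back without ever running, which is what a request sees when the pool is shut down this way.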

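The analysis above shows that the root cause is Tomcat forcibly shutting down the thread pool. As mentioned in the first part of this series, fixing this takes two steps:

  1. Stop accepting new requests.
  2. Wait until all tasks have finished before shutting down the thread pool.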
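With a plain JDK thread pool, the same two steps map onto shutdown() followed by awaitTermination(). The sketch below only illustrates the idea and is not the fix for Tomcat itself; GracefulShutdownSketch, the pool size and the 10-second timeout are assumptions chosen for the example:

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

class GracefulShutdownSketch {

    /** Submits five short tasks, shuts down gracefully and reports what happened. */
    static String run() {
        ExecutorService pool = Executors.newFixedThreadPool(2);
        AtomicInteger completed = new AtomicInteger();
        for (int i = 0; i < 5; i++) {
            pool.execute(() -> {
                try {
                    Thread.sleep(100); // stands in for request processing
                    completed.incrementAndGet();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
        }

        // Step 1: stop taking new work. Running and queued tasks are left alone.
        pool.shutdown();
        boolean rejected = false;
        try {
            pool.execute(() -> { });
        } catch (RejectedExecutionException e) {
            rejected = true;
        }

        // Step 2: wait for the outstanding tasks, and force the issue only on timeout.
        try {
            if (!pool.awaitTermination(10, TimeUnit.SECONDS)) {
                pool.shutdownNow();
            }
        } catch (InterruptedException e) {
            pool.shutdownNow();
            Thread.currentThread().interrupt();
        }
        return "completed=" + completed.get() + ", rejectedAfterShutdown=" + rejected;
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

Keeping shutdownNow() as a fallback after the timeout bounds the total shutdown time even if a task hangs.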

The keys to these two steps are:

  1. Getting hold of the Connector: its pause method stops new requests from being accepted.
  2. Getting hold of the thread pool.

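Connector

In the Tomcat architecture, the Connector is mainly responsible for communicating with clients. A Connector instance listens on a port, accepts requests from clients and hands them to the Engine, and returns the Engine's replies to the clients:

  1. Listen on a network port.
  2. Accept network connection requests.
  3. Read the request bytes from the network.
  4. Parse the byte stream according to the application-layer protocol (HTTP/AJP) and produce a unified Tomcat Request object.
  5. Convert the Tomcat Request object into a standard ServletRequest.
  6. Call the Servlet container and obtain a ServletResponse.
  7. Convert the ServletResponse into a Tomcat Response object.
  8. Convert the Tomcat Response object into a network byte stream.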
  9. Write the response byte stream back to the client.

Now let's look at how Spring Boot creates Tomcat.

@Configuration
class ServletWebServerFactoryConfiguration {

    @Configuration
    @ConditionalOnClass({ Servlet.class, Tomcat.class, UpgradeProtocol.class })
    @ConditionalOnMissingBean(value = ServletWebServerFactory.class, search = SearchStrategy.CURRENT)
    public static class EmbeddedTomcat {

        @Bean
        public TomcatServletWebServerFactory tomcatServletWebServerFactory() {
            return new TomcatServletWebServerFactory();
        }
    }

    /**
     * Nested configuration if Jetty is being used.
     */
    @Configuration
    @ConditionalOnClass({ Servlet.class, Server.class, Loader.class,
            WebAppContext.class })
    @ConditionalOnMissingBean(value = ServletWebServerFactory.class, search = SearchStrategy.CURRENT)
    public static class EmbeddedJetty {

        @Bean
        public JettyServletWebServerFactory JettyServletWebServerFactory() {
            return new JettyServletWebServerFactory();
        }
    }

    /**
     * Nested configuration if Undertow is being used.
     */
    @Configuration
    @ConditionalOnClass({ Servlet.class, Undertow.class, SslClientAuthMode.class })
    @ConditionalOnMissingBean(value = ServletWebServerFactory.class, search = SearchStrategy.CURRENT)
    public static class EmbeddedUndertow {

        @Bean
        public UndertowServletWebServerFactory undertowServletWebServerFactory() {
            return new UndertowServletWebServerFactory();
        }
    }
}

As we can see, what gets created is a TomcatServletWebServerFactory. This factory has a getWebServer method that builds the Tomcat instance, so when is that method called?

public class ServletWebServerApplicationContext extends GenericWebApplicationContext
        implements ConfigurableWebServerApplicationContext {

    @Override
    protected void onRefresh() {
        super.onRefresh();
        try {
            createWebServer();
        }
        catch (Throwable ex) {
            throw new ApplicationContextException("Unable to start web server", ex);
        }
    }

    private void createWebServer() {
        WebServer webServer = this.webServer;
        ServletContext servletContext = getServletContext();
        if (webServer == null && servletContext == null) {
            ServletWebServerFactory factory = getWebServerFactory();
            this.webServer = factory.getWebServer(getSelfInitializer());
        }
        else if (servletContext != null) {
            try {
                getSelfInitializer().onStartup(servletContext);
            }
            catch (ServletException ex) {
                throw new ApplicationContextException("Cannot initialize servlet context",
                        ex);
            }
        }
        initPropertySources();
    }
}

We can see that when the application starts (onRefresh), the createWebServer method of Spring Boot's context class ServletWebServerApplicationContext calls ServletWebServerFactory.getWebServer.

public class TomcatServletWebServerFactory extends AbstractServletWebServerFactory
        implements ConfigurableTomcatWebServerFactory, ResourceLoaderAware {

    @Override
    public WebServer getWebServer(ServletContextInitializer... initializers) {
        Tomcat tomcat = new Tomcat();
        File baseDir = (this.baseDirectory != null) ? this.baseDirectory
                : createTempDir("tomcat");
        tomcat.setBaseDir(baseDir.getAbsolutePath());
        Connector connector = new Connector(this.protocol);
        tomcat.getService().addConnector(connector);
        customizeConnector(connector);
        tomcat.setConnector(connector);
        tomcat.getHost().setAutoDeploy(false);
        configureEngine(tomcat.getEngine());
        for (Connector additionalConnector : this.additionalTomcatConnectors) {
            tomcat.getService().addConnector(additionalConnector);
        }
        prepareContext(tomcat.getHost(), initializers);
        return getTomcatWebServer(tomcat);
    }
}

Inside getWebServer the Tomcat object is being built, and the customizeConnector(connector) step is where the Connector gets configured.

public class TomcatServletWebServerFactory extends AbstractServletWebServerFactory
        implements ConfigurableTomcatWebServerFactory, ResourceLoaderAware {

    protected void customizeConnector(Connector connector) {
        int port = (getPort() >= 0) ? getPort() : 0;
        connector.setPort(port);
        if (StringUtils.hasText(this.getServerHeader())) {
            connector.setAttribute("server", this.getServerHeader());
        }
        if (connector.getProtocolHandler() instanceof AbstractProtocol) {
            customizeProtocol((AbstractProtocol<?>) connector.getProtocolHandler());
        }
        if (getUriEncoding() != null) {
            connector.setURIEncoding(getUriEncoding().name());
        }
        // Don't bind to the socket prematurely if ApplicationContext is slow to start
        connector.setProperty("bindOnInit", "false");
        if (getSsl() != null && getSsl().isEnabled()) {
            customizeSsl(connector);
        }
        TomcatConnectorCustomizer compression = new CompressionConnectorCustomizer(
                getCompression());
        compression.customize(connector);
        for (TomcatConnectorCustomizer customizer : this.tomcatConnectorCustomizers) {
            customizer.customize(connector);
        }
    }

    public void addContextCustomizers(
            TomcatContextCustomizer... tomcatContextCustomizers) {
        Assert.notNull(tomcatContextCustomizers,
                "TomcatContextCustomizers must not be null");
        this.tomcatContextCustomizers.addAll(Arrays.asList(tomcatContextCustomizers));
    }
}

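From the code above we learn that we can define our own TomcatConnectorCustomizer and register it with addConnectorCustomizers (the connector counterpart of the addContextCustomizers method shown above). Inside customizeConnector, every registered TomcatConnectorCustomizer is visited and receives the Connector object through customizer.customize(connector).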
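The callback can therefore be used simply to keep a reference to the Connector for later. Below is a dependency-free sketch of that idea; ConnectorSketch, ConnectorCustomizerSketch, FactorySketch and CapturingCustomizer are hypothetical stand-ins that only mimic the shape of the Spring Boot and Tomcat types:

```java
import java.util.ArrayList;
import java.util.List;

/** Stand-in for a connector that can be paused. */
class ConnectorSketch {
    private boolean paused;

    void pause() { paused = true; }

    boolean isPaused() { return paused; }
}

/** Stand-in for a connector customizer callback. */
interface ConnectorCustomizerSketch {
    void customize(ConnectorSketch connector);
}

/** Stand-in for the factory: it creates the connector and hands it to every customizer. */
class FactorySketch {
    private final List<ConnectorCustomizerSketch> customizers = new ArrayList<>();

    void addConnectorCustomizers(ConnectorCustomizerSketch customizer) {
        customizers.add(customizer);
    }

    ConnectorSketch createConnector() {
        ConnectorSketch connector = new ConnectorSketch();
        for (ConnectorCustomizerSketch customizer : customizers) {
            customizer.customize(connector);
        }
        return connector;
    }
}

/** A customizer that changes nothing and only remembers the connector it was given. */
class CapturingCustomizer implements ConnectorCustomizerSketch {
    private volatile ConnectorSketch connector;

    @Override
    public void customize(ConnectorSketch connector) {
        this.connector = connector;
    }

    /** Called later, for example at shutdown, to stop accepting new requests. */
    void pauseConnector() {
        ConnectorSketch captured = this.connector;
        if (captured != null) {
            captured.pause();
        }
    }
}

class CustomizerDemo {
    public static void main(String[] args) {
        FactorySketch factory = new FactorySketch();
        CapturingCustomizer capturing = new CapturingCustomizer();
        factory.addConnectorCustomizers(capturing);
        ConnectorSketch connector = factory.createConnector();
        capturing.pauseConnector();
        System.out.println("paused = " + connector.isPaused());
    }
}
```

The field is volatile because the connector is captured on the startup thread while the pause would typically be triggered from a different thread at shutdown.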

Thread pool

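Once the Connector is available, getting the thread pool is straightforward: connector -> protocolHandler -> endpoint -> getExecutor(). In code this is reachable as connector.getProtocolHandler().getExecutor().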

Appendix:

Spring event handling

public abstract class AbstractApplicationContext extends DefaultResourceLoader
        implements ConfigurableApplicationContext {

    protected void publishEvent(Object event, @Nullable ResolvableType eventType) {
        Assert.notNull(event, "Event must not be null");
        // Decorate event as an ApplicationEvent if necessary
        ApplicationEvent applicationEvent;
        if (event instanceof ApplicationEvent) {
            applicationEvent = (ApplicationEvent) event;
        }
        else {
            applicationEvent = new PayloadApplicationEvent<>(this, event);
            if (eventType == null) {
                eventType = ((PayloadApplicationEvent) applicationEvent).getResolvableType();
            }
        }
        // Multicast right now if possible - or lazily once the multicaster is initialized
        if (this.earlyApplicationEvents != null) {
            this.earlyApplicationEvents.add(applicationEvent);
        }
        else {
            getApplicationEventMulticaster().multicastEvent(applicationEvent, eventType);
        }
        // Publish event via parent context as well...
        if (this.parent != null) {
            if (this.parent instanceof AbstractApplicationContext) {
                ((AbstractApplicationContext) this.parent).publishEvent(event, eventType);
            }
            else {
                this.parent.publishEvent(event);
            }
        }
    }
}

The key step is getApplicationEventMulticaster().multicastEvent(applicationEvent, eventType); by default the ApplicationEventMulticaster is a SimpleApplicationEventMulticaster.

public class SimpleApplicationEventMulticaster extends AbstractApplicationEventMulticaster {

    public void multicastEvent(final ApplicationEvent event, @Nullable ResolvableType eventType) {
        ResolvableType type = (eventType != null ? eventType : resolveDefaultEventType(event));
        for (final ApplicationListener<?> listener : getApplicationListeners(event, type)) {
            Executor executor = getTaskExecutor();
            if (executor != null) {
                executor.execute(() -> invokeListener(listener, event));
            }
            else {
                invokeListener(listener, event);
            }
        }
    }
}

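First, all listeners registered for the event are looked up (we won't go deeper into that step; the logic is fairly simple). The executor is not set by default, so execution goes straight to the invokeListener method.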

public class SimpleApplicationEventMulticaster extends AbstractApplicationEventMulticaster {

    protected void invokeListener(ApplicationListener<?> listener, ApplicationEvent event) {
        ErrorHandler errorHandler = getErrorHandler();
        if (errorHandler != null) {
            try {
                doInvokeListener(listener, event);
            }
            catch (Throwable err) {
                errorHandler.handleError(err);
            }
        }
        else {
            doInvokeListener(listener, event);
        }
    }

    private void doInvokeListener(ApplicationListener listener, ApplicationEvent event) {
        try {
            listener.onApplicationEvent(event);
        }
        catch (ClassCastException ex) {
            String msg = ex.getMessage();
            if (msg == null || matchesClassCastMessage(msg, event.getClass())) {
                // Possibly a lambda-defined listener which we could not resolve the generic event type for
                // -> let's suppress the exception and just log a debug message.
                Log logger = LogFactory.getLog(getClass());
                if (logger.isDebugEnabled()) {
                    logger.debug("Non-matching event type for listener: " + listener, ex);
                }
            }
            else {
                throw ex;
            }
        }
    }
}

We can see that invokeListener simply calls listener.onApplicationEvent directly.
So by default (executor is null), Spring's event handling is synchronous.
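The same branch can be modelled in a few lines to check on which thread a listener runs. This sketch assumes String events and Consumer listeners purely for brevity; MulticasterSketch is a hypothetical class, not Spring's multicaster:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;

class MulticasterSketch {
    private final List<Consumer<String>> listeners = new ArrayList<>();
    private Executor taskExecutor; // null unless explicitly configured

    void addListener(Consumer<String> listener) {
        listeners.add(listener);
    }

    void setTaskExecutor(Executor taskExecutor) {
        this.taskExecutor = taskExecutor;
    }

    /** With no executor the listener is invoked inline; otherwise it is handed to the executor. */
    void multicast(String event) {
        for (Consumer<String> listener : listeners) {
            Executor executor = this.taskExecutor;
            if (executor != null) {
                executor.execute(() -> listener.accept(event));
            } else {
                listener.accept(event);
            }
        }
    }

    /** Returns true when the listener ran on the thread that published the event. */
    static boolean listenerRunsOnCallerThread() {
        MulticasterSketch multicaster = new MulticasterSketch();
        AtomicReference<Thread> seen = new AtomicReference<>();
        multicaster.addListener(event -> seen.set(Thread.currentThread()));
        multicaster.multicast("ContextClosedEvent");
        // In the default configuration the listener has already run by the time multicast() returns.
        return seen.get() == Thread.currentThread();
    }

    public static void main(String[] args) {
        System.out.println("listener ran on caller thread: " + listenerRunsOnCallerThread());
    }
}
```

If an executor were configured, the publisher would no longer wait for the listener, so a shutdown listener that relies on blocking the close sequence would stop working as intended.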