springcloud项目优雅重启五tomcat关闭流程

42次阅读

共计 17023 个字符,预计需要花费 43 分钟才能阅读完成。

本篇文章主要讲 tomcat” 粗暴 stop” 的流程,以及了解下解决方案里的几个关键的信息,详细解决步骤会在后面的文章里。

整体架构

我们先来看下 Tomcat 的整体架构,大致了解下 Tomcat 组件之间的关系:

图片来源:《四张图带你了解 Tomcat 架构》

图片来源:《Tomcat 系统架构 (上)》

图片来源:《Tomcat 系统架构(上):连接器是如何设计的?》

关闭

SpringBoot-Tomcat 在服务器关闭时,并不会等请求都处理完才关闭,而是立即关闭,我们通过代码来看下 SpringBoot-Tomcat 的关闭流程。
先从 SpringContext 开始:

public abstract class AbstractApplicationContext extends DefaultResourceLoader
  implements ConfigurableApplicationContext {public void registerShutdownHook() {if (this.shutdownHook == null) {
            // No shutdown hook registered yet.
            this.shutdownHook = new Thread() {
                @Override
                public void run() {synchronized (startupShutdownMonitor) {doClose();
                    }
                }
            };
            Runtime.getRuntime().addShutdownHook(this.shutdownHook);
        }
    }  
    protected void doClose() {
        // Check whether an actual close attempt is necessary...
        if (this.active.get() && this.closed.compareAndSet(false, true)) {if (logger.isDebugEnabled()) {logger.debug("Closing" + this);
            }

            LiveBeansView.unregisterApplicationContext(this);

            try {
                // Publish shutdown event.
                publishEvent(new ContextClosedEvent(this)); // 发布事件
            }
            catch (Throwable ex) {logger.warn("Exception thrown from ApplicationListener handling ContextClosedEvent", ex);
            }

            // Stop all Lifecycle beans, to avoid delays during individual destruction.
            if (this.lifecycleProcessor != null) {
                try {this.lifecycleProcessor.onClose();
                }
                catch (Throwable ex) {logger.warn("Exception thrown from LifecycleProcessor on context close", ex);
                }
            }

            // Destroy all cached singletons in the context's BeanFactory.
            destroyBeans();

            // Close the state of this context itself.
            closeBeanFactory();

            // Let subclasses do some final clean-up if they wish...
            onClose(); // 执行关闭

            // Reset local application listeners to pre-refresh state.
            if (this.earlyApplicationListeners != null) {this.applicationListeners.clear();
                this.applicationListeners.addAll(this.earlyApplicationListeners);
            }

            // Switch to inactive.
            this.active.set(false);
        }
    }        
}  

注意,发布关闭事件 publishEvent(new ContextClosedEvent(this)) 是在关闭资源 onClose 之前,并且 Spring 事件处理默认是同步处理的,这里很重要,后面讲到的方案里会用到这个。

public class ServletWebServerApplicationContext extends GenericWebApplicationContext
  implements ConfigurableWebServerApplicationContext {private void stopAndReleaseWebServer() {
        WebServer webServer = this.webServer;
        if (webServer != null) {
            try {webServer.stop();
                this.webServer = null;
            }
            catch (Exception ex) {throw new IllegalStateException(ex);
            }
        }
    }
            
    protected void onClose() {super.onClose();
        stopAndReleaseWebServer();}
    
}   

从这里开始,进入到 Web 容器的关闭了。

public class TomcatWebServer implements WebServer {public void stop() throws WebServerException {synchronized (this.monitor) {
            boolean wasStarted = this.started;
            try {
                this.started = false;
                try {stopTomcat();
                    this.tomcat.destroy();}
                catch (LifecycleException ex) {// swallow and continue}
            }
            catch (Exception ex) {throw new WebServerException("Unable to stop embedded Tomcat", ex);
            }
            finally {if (wasStarted) {containerCounter.decrementAndGet();
                }
            }
        }
    }
    
    private void stopTomcat() throws LifecycleException {if (Thread.currentThread()
                .getContextClassLoader() instanceof TomcatEmbeddedWebappClassLoader) {Thread.currentThread().setContextClassLoader(getClass().getClassLoader());
        }
        this.tomcat.stop();}

}

进入到 tomcat.stop() 看下。

public class Tomcat {public void stop() throws LifecycleException {getServer();
        server.stop();}
    
    public Server getServer() {if (server != null) {return server;}

        System.setProperty("catalina.useNaming", "false");

        server = new StandardServer();

        initBaseDir();

        // Set configuration source
        ConfigFileLoader.setSource(new CatalinaBaseConfigurationSource(new File(basedir), null));

        server.setPort(-1);

        Service service = new StandardService();
        service.setName("Tomcat");
        server.addService(service);
        return server;
    }    
}

再进入StandardService.stop

public abstract class LifecycleBase implements Lifecycle {public final synchronized void stop() throws LifecycleException {if (LifecycleState.STOPPING_PREP.equals(state) || LifecycleState.STOPPING.equals(state) ||
                LifecycleState.STOPPED.equals(state)) {if (log.isDebugEnabled()) {Exception e = new LifecycleException();
                log.debug(sm.getString("lifecycleBase.alreadyStopped", toString()), e);
            } else if (log.isInfoEnabled()) {log.info(sm.getString("lifecycleBase.alreadyStopped", toString()));
            }

            return;
        }

        if (state.equals(LifecycleState.NEW)) {
            state = LifecycleState.STOPPED;
            return;
        }

        if (!state.equals(LifecycleState.STARTED) && !state.equals(LifecycleState.FAILED)) {invalidTransition(Lifecycle.BEFORE_STOP_EVENT);
        }

        try {if (state.equals(LifecycleState.FAILED)) {
                // Don't transition to STOPPING_PREP as that would briefly mark the
                // component as available but do ensure the BEFORE_STOP_EVENT is
                // fired
                fireLifecycleEvent(BEFORE_STOP_EVENT, null);
            } else {setStateInternal(LifecycleState.STOPPING_PREP, null, false);
            }

            stopInternal();

            // Shouldn't be necessary but acts as a check that sub-classes are
            // doing what they are supposed to.
            if (!state.equals(LifecycleState.STOPPING) && !state.equals(LifecycleState.FAILED)) {invalidTransition(Lifecycle.AFTER_STOP_EVENT);
            }

            setStateInternal(LifecycleState.STOPPED, null, false);
        } catch (Throwable t) {handleSubClassException(t, "lifecycleBase.stopFail", toString());
        } finally {if (this instanceof Lifecycle.SingleUse) {
                // Complete stop process first
                setStateInternal(LifecycleState.STOPPED, null, false);
                destroy();}
        }
    }
}
public final class StandardServer extends LifecycleMBeanBase implements Server {protected void stopInternal() throws LifecycleException {setState(LifecycleState.STOPPING);

        if (monitorFuture != null) {monitorFuture.cancel(true);
            monitorFuture = null;
        }
        if (periodicLifecycleEventFuture != null) {periodicLifecycleEventFuture.cancel(false);
            periodicLifecycleEventFuture = null;
        }

        fireLifecycleEvent(CONFIGURE_STOP_EVENT, null);

        // Stop our defined Services
        for (int i = 0; i < services.length; i++) {services[i].stop();}

        globalNamingResources.stop();

        stopAwait();}
}    

stop方法在父类 LifecycleBase 里(Tomcat 的组件基本上都继承了 LifecycleBase),StandardServer 实现子方法 stopInternal,进入到StandardService.stop 里。这里要先说下几个对象的关系:

  • 一个 Tomcat 中只有一个Server
  • 一个 Server 可以包含多个 Service,比如一个 Tomcat(Server) 下部署多个项目,那就是多个Service
  • 一个 Service 只有一个 Container,但是可以有多个Connector,这是因为一个服务可以有多个连接,如同时提供HttpHttps链接,也可以提供向相同协议不同端口的连接。
  • 一个 Connector 下只有一个 ProtocolHandler。由于 I/O 模型和应用层协议可以自由组合,比如 NIO + HTTP 或者 NIO.2 + AJP。Tomcat 的设计者将网络通信和应用层协议解析放在一起考虑,设计了一个叫 ProtocolHandler 的接口来封装这两种变化点。各种协议和通信模型的组合有相应的具体实现类。比如:Http11NioProtocol 和 AjpNioProtocol。ConnectorProtocolHandler来处理网络连接和应用层协议,包含了 2 个重要部件:Endpoint 和 Processor。
  • 一个 ProtocolHandler 下只有一个EndPoint,Endpoint 是通信端点,即通信监听的接口,是具体的 Socket 接收和发送处理器,是对传输层的抽象,因此 Endpoint 是用来实现 TCP/IP 协议的。
public class StandardService extends LifecycleMBeanBase implements Service {protected void stopInternal() throws LifecycleException {

        // Pause connectors first
        synchronized (connectorsLock) {for (Connector connector: connectors) {connector.pause();
                // Close server socket if bound on start
                // Note: test is in AbstractEndpoint
                connector.getProtocolHandler().closeServerSocketGraceful();
            }
        }

        if(log.isInfoEnabled())
            log.info(sm.getString("standardService.stop.name", this.name));
        setState(LifecycleState.STOPPING);

        // Stop our defined Container second
        if (engine != null) {synchronized (engine) {engine.stop();
            }
        }

        // Now stop the connectors
        synchronized (connectorsLock) {for (Connector connector: connectors) {
                if (!LifecycleState.STARTED.equals(connector.getState())) {
                    // Connectors only need stopping if they are currently
                    // started. They may have failed to start or may have been
                    // stopped (e.g. via a JMX call)
                    continue;
                }
                connector.stop();}
        }

        // If the Server failed to start, the mapperListener won't have been
        // started
        if (mapperListener.getState() != LifecycleState.INITIALIZED) {mapperListener.stop();
        }

        synchronized (executors) {for (Executor executor: executors) {executor.stop();
            }
        }

    }
}    

这里跟我们上面的问题有关的有这两个:

  1. Connetor.pause停止接收新请求。
  2. Connector.stop这里会关闭线程池。
public class Connector extends LifecycleMBeanBase {
    @Override
    protected void stopInternal() throws LifecycleException {setState(LifecycleState.STOPPING);

        try {if (protocolHandler != null) {protocolHandler.stop();
            }
        } catch (Exception e) {
            throw new LifecycleException(sm.getString("coyoteConnector.protocolHandlerStopFailed"), e);
        }
    }

}

public abstract class AbstractProtocol<S> implements ProtocolHandler,
        MBeanRegistration {public void stop() throws Exception {if(getLog().isInfoEnabled()) {getLog().info(sm.getString("abstractProtocolHandler.stop", getName()));
            logPortOffset();}

        if (monitorFuture != null) {monitorFuture.cancel(true);
            monitorFuture = null;
        }
        stopAsyncTimeout();
        // Timeout any pending async request
        for (Processor processor : waitingProcessors) {processor.timeoutAsync(-1);
        }

        endpoint.stop();}        
}  

public abstract class AbstractEndpoint<S,U> {public final void stop() throws Exception {stopInternal();
        if (bindState == BindState.BOUND_ON_START || bindState == BindState.SOCKET_CLOSED_ON_STOP) {unbind();
            bindState = BindState.UNBOUND;
        }
    }
    
    public abstract void stopInternal() throws Exception; // 子类实现,子类 AprEndpoint 和 NioEndpoint(Springboot 默认是 NioEndpoint)实现方法里会调用 shutdownExecutor 方法

    public void shutdownExecutor() {
        Executor executor = this.executor;
        if (executor != null && internalExecutor) {
            this.executor = null;
            if (executor instanceof ThreadPoolExecutor) {
                //this is our internal one, so we need to shut it down
                ThreadPoolExecutor tpe = (ThreadPoolExecutor) executor;
                tpe.shutdownNow();
                long timeout = getExecutorTerminationTimeoutMillis();
                if (timeout > 0) {
                    try {tpe.awaitTermination(timeout, TimeUnit.MILLISECONDS);
                    } catch (InterruptedException e) {// Ignore}
                    if (tpe.isTerminating()) {getLog().warn(sm.getString("endpoint.warn.executorShutdown", getName()));
                    }
                }
                TaskQueue queue = (TaskQueue) tpe.getQueue();
                queue.setParent(null);
            }
        }
    }
    
    /**
     * Time to wait for the internal executor (if used) to terminate when the
     * endpoint is stopped in milliseconds. Defaults to 5000 (5 seconds).
     */
    private long executorTerminationTimeoutMillis = 5000;

    public long getExecutorTerminationTimeoutMillis() {return executorTerminationTimeoutMillis;}
}      

以上可以看到,在 connector.stop -> protocolHandler.stop -> endpoint.stop -> NioEndpoint.stopInternal -> endpoint.shutdownExecutor 里,线程池直接粗暴的 shutdownNow 来停止任务。java 线程池的 shutdownNow 会尝试 interrupt 线程池中正在执行的线程,等待执行的线程也会被取消,但是并不能保证一定能成功的 interrupt 线程池中的线程,所有 tomcat 通过 awaitTermination 等待 5 秒,尽量保证停止所有的任务。

从上面的分析可以看到,原因主要是线程池被 Tomcat 暴力 shutdown。在第一章里讲到我们处理这个问题,需要做两步:

  1. 暂停接收新请求。
  2. 等待所有任务都执行完成再关闭线程池。

要完成这两步的关键是:

  1. 获取到 ConnectorConnector.pause 方法可以暂停接收请求。
  2. 获取到 线程池

Connector

在 Tomcat 架构中,Connector 主要负责处理与客户端的通信。Connector 的实例用于监听端口,接受来自客户端的请求并将请求转交给 Engine 处理,同时将来自 Engine 的答复返回给客户端:

  1. 监听网络端口
  2. 接受网络连接请求
  3. 读取请求网络字节流
  4. 根据具体应用层协议 (HTTP/AJP) 解析字节流,生成统一的 Tomcat Request 对象
  5. 将 Tomcat Request 对象转成标准的 ServletRequest
  6. 调用 Servlet 容器,得到 ServletResponse
  7. 将 ServletResponse 转成 Tomcat Response 对象
  8. 将 Tomcat Response 对象转成网络字节流
  9. 将响应字节流写回服务器。

现在我们看下 SpringBoot 生成 Tomcat 的步骤。

@Configuration
class ServletWebServerFactoryConfiguration {

    @Configuration
    @ConditionalOnClass({Servlet.class, Tomcat.class, UpgradeProtocol.class})
    @ConditionalOnMissingBean(value = ServletWebServerFactory.class, search = SearchStrategy.CURRENT)
    public static class EmbeddedTomcat {

        @Bean
        public TomcatServletWebServerFactory tomcatServletWebServerFactory() {return new TomcatServletWebServerFactory();
        }

    }

    /**
     * Nested configuration if Jetty is being used.
     */
    @Configuration
    @ConditionalOnClass({ Servlet.class, Server.class, Loader.class,
            WebAppContext.class })
    @ConditionalOnMissingBean(value = ServletWebServerFactory.class, search = SearchStrategy.CURRENT)
    public static class EmbeddedJetty {

        @Bean
        public JettyServletWebServerFactory JettyServletWebServerFactory() {return new JettyServletWebServerFactory();
        }

    }

    /**
     * Nested configuration if Undertow is being used.
     */
    @Configuration
    @ConditionalOnClass({Servlet.class, Undertow.class, SslClientAuthMode.class})
    @ConditionalOnMissingBean(value = ServletWebServerFactory.class, search = SearchStrategy.CURRENT)
    public static class EmbeddedUndertow {

        @Bean
        public UndertowServletWebServerFactory undertowServletWebServerFactory() {return new UndertowServletWebServerFactory();
        }

    }

}

可以看到,生成的是 TomcatServletWebServerFactoryTomcatServletWebServerFactory 有个 createWebServer 方法来生成 Tomcat,那这个方法在什么时候被调用?

public class ServletWebServerApplicationContext extends GenericWebApplicationContext
  implements ConfigurableWebServerApplicationContext {
    @Override
    protected void onRefresh() {super.onRefresh();
        try {createWebServer();
        }
        catch (Throwable ex) {throw new ApplicationContextException("Unable to start web server", ex);
        }
    }
    
    private void createWebServer() {
        WebServer webServer = this.webServer;
        ServletContext servletContext = getServletContext();
        if (webServer == null && servletContext == null) {ServletWebServerFactory factory = getWebServerFactory();
            this.webServer = factory.getWebServer(getSelfInitializer());
        }
        else if (servletContext != null) {
            try {getSelfInitializer().onStartup(servletContext);
            }
            catch (ServletException ex) {
                throw new ApplicationContextException("Cannot initialize servlet context",
                        ex);
            }
        }
        initPropertySources();}
}    

可以看到 springboot 的容器类 ServletWebServerApplicationContext 在系统启动时 (onRefresh),createWebServer 方法会调用 - ServletWebServerFactory.getWebServer`。

public class TomcatServletWebServerFactory extends AbstractServletWebServerFactory
  implements ConfigurableTomcatWebServerFactory, ResourceLoaderAware {
    @Override
    public WebServer getWebServer(ServletContextInitializer... initializers) {Tomcat tomcat = new Tomcat();
        File baseDir = (this.baseDirectory != null) ? this.baseDirectory
                : createTempDir("tomcat");
        tomcat.setBaseDir(baseDir.getAbsolutePath());
        Connector connector = new Connector(this.protocol);
        tomcat.getService().addConnector(connector);
        customizeConnector(connector);
        tomcat.setConnector(connector);
        tomcat.getHost().setAutoDeploy(false);
        configureEngine(tomcat.getEngine());
        for (Connector additionalConnector : this.additionalTomcatConnectors) {tomcat.getService().addConnector(additionalConnector);
        }
        prepareContext(tomcat.getHost(), initializers);
        return getTomcatWebServer(tomcat);
    }
}        

进入 getWebServer 看到是在构建 Tomcat 对象,其中customizeConnector(connector) 这一步有对 Connector 的处理。

public class TomcatServletWebServerFactory extends AbstractServletWebServerFactory
    implements ConfigurableTomcatWebServerFactory, ResourceLoaderAware {protected void customizeConnector(Connector connector) {int port = (getPort() >= 0) ? getPort() : 0;
        connector.setPort(port);
        if (StringUtils.hasText(this.getServerHeader())) {connector.setAttribute("server", this.getServerHeader());
        }
        if (connector.getProtocolHandler() instanceof AbstractProtocol) {customizeProtocol((AbstractProtocol<?>) connector.getProtocolHandler());
        }
        if (getUriEncoding() != null) {connector.setURIEncoding(getUriEncoding().name());
        }
        // Don't bind to the socket prematurely if ApplicationContext is slow to start
        connector.setProperty("bindOnInit", "false");
        if (getSsl() != null && getSsl().isEnabled()) {customizeSsl(connector);
        }
        TomcatConnectorCustomizer compression = new CompressionConnectorCustomizer(getCompression());
        compression.customize(connector);
        for (TomcatConnectorCustomizer customizer : this.tomcatConnectorCustomizers) {customizer.customize(connector);
        }
    }
    
    public void addContextCustomizers(TomcatContextCustomizer... tomcatContextCustomizers) {
        Assert.notNull(tomcatContextCustomizers,
                "TomcatContextCustomizers must not be null");
        this.tomcatContextCustomizers.addAll(Arrays.asList(tomcatContextCustomizers));
    }
}

通过以上代码,我们了解到:可以自定义 TomcatContextCustomizer,通过addContextCustomizers 方法添加;而 customizeConnector 方法里,会遍历所有的 TomcatContextCustomizer 对象,通过 customizer.customize(connector) 方法来传入 Connector 对象。

线程池

在上一步已经可以获取到 Connector 的基础上,要获取到线程池已经很简单了:connector -> protocolHandler -> endpoint -> getExecutor()

附加:

Spring 事件处理

public abstract class AbstractApplicationContext extends DefaultResourceLoader
  implements ConfigurableApplicationContext {protected void publishEvent(Object event, @Nullable ResolvableType eventType) {Assert.notNull(event, "Event must not be null");

        // Decorate event as an ApplicationEvent if necessary
        ApplicationEvent applicationEvent;
        if (event instanceof ApplicationEvent) {applicationEvent = (ApplicationEvent) event;
        }
        else {applicationEvent = new PayloadApplicationEvent<>(this, event);
            if (eventType == null) {eventType = ((PayloadApplicationEvent) applicationEvent).getResolvableType();}
        }

        // Multicast right now if possible - or lazily once the multicaster is initialized
        if (this.earlyApplicationEvents != null) {this.earlyApplicationEvents.add(applicationEvent);
        }
        else {getApplicationEventMulticaster().multicastEvent(applicationEvent, eventType);
        }

        // Publish event via parent context as well...
        if (this.parent != null) {if (this.parent instanceof AbstractApplicationContext) {((AbstractApplicationContext) this.parent).publishEvent(event, eventType);
            }
            else {this.parent.publishEvent(event);
            }
        }
    }
    
}        

可以看到重点在 getApplicationEventMulticaster().multicastEvent(applicationEvent, eventType); 这一步,其中 ApplicationEventMulticaster 默认是SimpleApplicationEventMulticaster

public class SimpleApplicationEventMulticaster extends AbstractApplicationEventMulticaster {public void multicastEvent(final ApplicationEvent event, @Nullable ResolvableType eventType) {ResolvableType type = (eventType != null ? eventType : resolveDefaultEventType(event));
        for (final ApplicationListener<?> listener : getApplicationListeners(event, type)) {Executor executor = getTaskExecutor();
            if (executor != null) {executor.execute(() -> invokeListener(listener, event));
            }
            else {invokeListener(listener, event);
            }
        }
    }
}

首先获取所有监听对应事件的 Listener(这一步不继续深入,逻辑也比较简单),这里executor 默认没有值,所以进入 invokeListener 方法。

public class SimpleApplicationEventMulticaster extends AbstractApplicationEventMulticaster {protected void invokeListener(ApplicationListener<?> listener, ApplicationEvent event) {ErrorHandler errorHandler = getErrorHandler();
        if (errorHandler != null) {
            try {doInvokeListener(listener, event);
            }
            catch (Throwable err) {errorHandler.handleError(err);
            }
        }
        else {doInvokeListener(listener, event);
        }
    }
    
    private void doInvokeListener(ApplicationListener listener, ApplicationEvent event) {
        try {listener.onApplicationEvent(event);
        }
        catch (ClassCastException ex) {String msg = ex.getMessage();
            if (msg == null || matchesClassCastMessage(msg, event.getClass())) {
                // Possibly a lambda-defined listener which we could not resolve the generic event type for
                // -> let's suppress the exception and just log a debug message.
                Log logger = LogFactory.getLog(getClass());
                if (logger.isDebugEnabled()) {logger.debug("Non-matching event type for listener:" + listener, ex);
                }
            }
            else {throw ex;}
        }
    }
}

可以看到 invokeListener 就是直接调用 listener.onApplicationEvent 方法。
所以在默认情况下(executor为空),spring 的事情处理是同步的。

正文完
 0