集体创作公约:自己申明创作的所有文章皆为本人原创,如果有参考任何文章的中央,会标注进去,如果有疏漏,欢送大家批评。如果大家发现网上有剽窃本文章的,欢送举报,并且踊跃向这个 github 仓库 提交 issue,谢谢反对~
咱们晓得从 Spring Boot 2.3.x 这个版本开始,引入了优雅敞开的机制。咱们也在线上部署了这个机制,来减少用户体验。尽管当初大家基本上都通过最终一致性,以及事务等机制,来保障了就算非优雅敞开,也能够放弃业务正确。然而,这样总会带来短时间的数据不统一,影响用户体验。所以,引入优雅敞开,保障以后申请解决完,再开始 Destroy 所有 ApplicationContext 中的 Bean。
优雅敞开存在的问题
ApplicationContext 的敞开过程简略来说分为以下几个步骤(对应源码 AbstractApplicationContext 的 doClose 办法):
- 勾销以后 ApplicationContext 在 LivBeanView 的注册(目前其实只蕴含从 JMX 上勾销注册)
- 公布 ContextClosedEvent 事件,同步解决所有这个事件的 Listener
- 解决所有实现 Lifecycle 接口的 Bean,解析他们的敞开程序,并调用他们的 stop 办法
- Destroy 所有 ApplicationContext 中的 Bean
- 敞开 BeanFactory
简略了解优雅敞开,其实就是在下面的第三步中退出优雅敞开的逻辑实现的 Lifecycle,包含如下两步:
- 切断内部流量入口:具体点说就是让 Spring Boot 的 Web 容器间接回绝所有新收到的申请,不再解决新申请,例如间接返回 503.
- 期待承载的 Dispatcher 的线程池解决完所有申请:对于同步的 Servlet 过程其实就是解决 Servlet 申请的线程池,对于异步响应式的 WebFlux 过程其实就是所有 Web 申请的 Reactor 线程池解决完以后所有 Publisher 公布的事件。
首先,切断内部流量入口保障不再有新的申请到来,线程池解决完所有申请之后,失常的业务逻辑也是失常走完的,在这之后就能够开始敞开其余各种元素了。
然而,咱们首先要保障,优雅敞开的逻辑,须要在所有的 Lifecycle 的第一个最保险。这样保障肯定所有申请解决完,才会开始 stop 其余的 Lifecycle。如果不这样会有啥问题呢?举个例子,例如某个 Lifecycle 是负载均衡器的,stop 办法会敞开负载均衡器,如果这个 Lifecycle 在优雅敞开的 Lifecycle 的 stop 之前进行 stop,那么可能会造成某些在 负载均衡器 stop 后还没解决完的申请,并且这些申请须要应用负载均衡器调用其余微服务,执行失败。
优雅敞开还有另一个问题就是,默认的优雅敞开性能不是那么全面,有时候咱们须要在此基础上,增加更多的敞开逻辑。例如,你的我的项目中不止 有 web 容器解决申请的线程池,你本人还应用了其余线程池,并且线程池可能还比较复杂,一个向另一个提交,相互提交,各种提交等等,咱们须要在 web 容器解决申请的线程池解决完所有申请后,再期待这些线程池的执行完所有申请后再敞开。还有一个例子就是针对 MQ 消费者的,当优雅敞开时,其实应该进行生产新的音讯,期待以后所有音讯解决完。这些问题能够看下图:
源码剖析接入点 - Spring Boot + Undertow & 同步 Servlet 环境
咱们从源码触发,剖析在 Spring Boot 中应用 Undertow 作为 Web 容器并且是同步 Servlet 环境下,如果接入自定义的机制。首先,在引入 spring boot 相干依赖并且配置好优雅敞开之后:
pom.xml
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> <exclusions> <!--不应用默认的 tomcat 容器--> <exclusion> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-tomcat</artifactId> </exclusion> </exclusions></dependency><!--应用 undertow 容器--><dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-undertow</artifactId></dependency>
application.yml
server: # 设置敞开形式为优雅敞开 shutdown: graceful management: endpoint: health: show-details: always # actuator 裸露 /actuator/shutdown 接口用于敞开(因为这里开启了优雅敞开所以其实是优雅敞开) shutdown: enabled: true endpoints: jmx: exposure: exclude: '*' web: exposure: include: '*'
在设置敞开形式为优雅敞开之后,Spring Boot 启动时,在创立基于 Undertow 实现的 WebServer 的时候,会增加优雅敞开的 Handler,参考源码:
UndertowWebServerFactoryDelegate
static List<HttpHandlerFactory> createHttpHandlerFactories(Compression compression, boolean useForwardHeaders, String serverHeader, Shutdown shutdown, HttpHandlerFactory... initialHttpHandlerFactories) { List<HttpHandlerFactory> factories = new ArrayList<>(Arrays.asList(initialHttpHandlerFactories)); if (compression != null && compression.getEnabled()) { factories.add(new CompressionHttpHandlerFactory(compression)); } if (useForwardHeaders) { factories.add(Handlers::proxyPeerAddress); } if (StringUtils.hasText(serverHeader)) { factories.add((next) -> Handlers.header(next, "Server", serverHeader)); } //如果指定了优雅敞开,则增加 gracefulShutdown if (shutdown == Shutdown.GRACEFUL) { factories.add(Handlers::gracefulShutdown); } return factories;}
增加的这个 Handler 就是 Undertow 的 GracefulShutdownHandler
,GracefulShutdownHandler
是一个 HttpHandler
,这个接口很简略:
public interface HttpHandler { void handleRequest(HttpServerExchange exchange) throws Exception;}
其实就是对于收到的每个 HTTP 申请,都会通过每个 HttpHandler 的 handleRequest 办法。GracefulShutdownHandler 的实现思路也很简略,既然每个申请都会通过这个类的 handleRequest 办法,那么我就在收到申请的时候将一个原子计数器原子 + 1,申请解决完后(留神是返回响应之后,不是办法返回,因为申请可能是异步的,所以这个做成了回调),将原子计数器原子 - 1,如果这个计数器为零,就证实没有任何正在解决的申请了。源码是:
GracefulShutdownHandler
:
@Overridepublic void handleRequest(HttpServerExchange exchange) throws Exception { //原子更新,申请计数器加一,返回的 snapshot 是蕴含是否敞开状态位的数字 long snapshot = stateUpdater.updateAndGet(this, incrementActive); //通过状态位判断是否正在敞开 if (isShutdown(snapshot)) { //如果正在敞开,间接申请数原子减一 decrementRequests(); //设置响应码为 503 exchange.setStatusCode(StatusCodes.SERVICE_UNAVAILABLE); //标记申请实现 exchange.endExchange(); //间接返回,不持续走其余的 HttpHandler return; } //增加申请实现时候的 listener,这个在申请实现返回响应时会被调用,将计数器原子减一 exchange.addExchangeCompleteListener(listener); //持续走下一个 HttpHandler next.handleRequest(exchange);}
那么,是什么时候调用的这个敞开呢?后面咱们说过 ApplicationContext 的敞开过程的第三步:解决所有实现 Lifecycle 接口的 Bean,解析他们的敞开程序,并调用他们的 stop 办法,其实优雅敞开就在这里被调用。当 Spring Boot + Undertow & 同步 Servlet 环境启动时,到了创立 WebServer 这一步,会创立一个优雅敞开的 Lifecycle,对应源码:
ServletWebServerApplicationContext
private void createWebServer() { WebServer webServer = this.webServer; ServletContext servletContext = getServletContext(); if (webServer == null && servletContext == null) { StartupStep createWebServer = this.getApplicationStartup().start("spring.boot.webserver.create"); ServletWebServerFactory factory = getWebServerFactory(); createWebServer.tag("factory", factory.getClass().toString()); this.webServer = factory.getWebServer(getSelfInitializer()); createWebServer.end(); //就是这里,创立一个 WebServerGracefulShutdownLifecycle 并注册到以后 ApplicationContext 的 BeanFactory 中 getBeanFactory().registerSingleton("webServerGracefulShutdown", new WebServerGracefulShutdownLifecycle(this.webServer)); getBeanFactory().registerSingleton("webServerStartStop", new WebServerStartStopLifecycle(this, this.webServer)); } else if (servletContext != null) { try { getSelfInitializer().onStartup(servletContext); } catch (ServletException ex) { throw new ApplicationContextException("Cannot initialize servlet context", ex); } } initPropertySources();}
后面说到, ApplicationContext 的敞开过程的第三步调用所有 Lifecycle 的 stop 办法,这里即 WebServerGracefulShutdownLifecycle 中的 stop 办法:
WebServerGracefulShutdownLifecycle
@Overridepublic void stop(Runnable callback) { this.running = false; this.webServer.shutDownGracefully((result) -> callback.run());}
这里的 webServer,因为咱们应用的是 Undertow,对应实现就是 UndertowWebServer,看一下他的 shutDownGracefully 实现:
UndertowWebServer
//这里的这个 GracefulShutdownHandler 就是后面说的在启动时加的 GracefulShutdownHandlerprivate volatile GracefulShutdownHandler gracefulShutdown;@Overridepublic void shutDownGracefully(GracefulShutdownCallback callback) { // 如果 GracefulShutdownHandler 不为 null,证实开启了优雅敞开(server.shutdown=graceful) if (this.gracefulShutdown == null) { //为 null,就证实没开启优雅敞开,什么都不等 callback.shutdownComplete(GracefulShutdownResult.IMMEDIATE); return; } //开启优雅敞开,须要期待申请解决完 logger.info("Commencing graceful shutdown. Waiting for active requests to complete"); this.gracefulShutdownCallback.set(callback); //调用 GracefulShutdownHandler 的 shutdown 进行优雅敞开 this.gracefulShutdown.shutdown(); //调用 GracefulShutdownHandler 的 addShutdownListener 增加敞开后调用的操作,这里是调用 notifyGracefulCallback //其实就是调用办法参数的 callback(就是内部的回调) this.gracefulShutdown.addShutdownListener((success) -> notifyGracefulCallback(success));}private void notifyGracefulCallback(boolean success) { GracefulShutdownCallback callback = this.gracefulShutdownCallback.getAndSet(null); if (callback != null) { if (success) { logger.info("Graceful shutdown complete"); callback.shutdownComplete(GracefulShutdownResult.IDLE); } else { logger.info("Graceful shutdown aborted with one or more requests still active"); callback.shutdownComplete(GracefulShutdownResult.REQUESTS_ACTIVE); } }}
再看下 GracefulShutdownHandler 的 shutdown 办法以及 addShutdownListener 办法:
GracefulShutdownHandler
:
public void shutdown() { //设置敞开状态位,并原子 + 1 stateUpdater.updateAndGet(this, incrementActiveAndShutdown); //间接申请数原子减一 decrementRequests();}private void decrementRequests() { long snapshot = stateUpdater.updateAndGet(this, decrementActive); // Shutdown has completed when the activeCount portion is zero, and shutdown is set. //如果与 敞开状态位 MASK 齐全相等,证实其余位都是 0,证实残余解决中的申请数量为 0 if (snapshot == SHUTDOWN_MASK) { //调用 shutdownComplete shutdownComplete(); }}private void shutdownComplete() { synchronized (lock) { lock.notifyAll(); //调用每个 ShutdownListener 的 shutdown 办法 for (ShutdownListener listener : shutdownListeners) { listener.shutdown(true); } shutdownListeners.clear(); }}/** * 这个办法并不只是字面意思,首先如果不是敞开中不能增加 ShutdownListener * 而后如果没有申请了,就间接调用传入的 shutdownListener 的 shutdown 办法 * 如果还有申请,则增加入 shutdownListeners,等其余调用 shutdownComplete 的时候遍历 shutdownListeners 调用 shutdown * lock 次要为了 addShutdownListener 与 shutdownComplete 对 shutdownListeners 的拜访平安 * lock 的 wait notify 次要为了实现 awaitShutdown 机制,咱们这里没有提 */public void addShutdownListener(final ShutdownListener shutdownListener) { synchronized (lock) { if (!isShutdown(stateUpdater.get(this))) { throw UndertowMessages.MESSAGES.handlerNotShutdown(); } long count = activeCount(stateUpdater.get(this)); if (count == 0) { shutdownListener.shutdown(true); } else { shutdownListeners.add(shutdownListener); } } }
这就是优雅敞开的底层原理,然而咱们还没有剖析分明 ApplicationContext 的敞开过程的第三步以及优雅敞开与其余 Lifecycle Bean 的 stop 先后顺序,咱们这里来理清一下,首先咱们看一下 Smart
开始敞开 Lifecycle Bean 的入口:
DefaultLifecycleProcessor
private void stopBeans() { //读取所有的 Lifecycle bean,返回的是一个 LinkedHashMap,遍历它的程序和放入的程序一样 //放入的程序就是从 BeanFactory 读取所有 Lifecycle 的 Bean 的返回程序,这个和 Bean 加载程序无关,不太可控,可能这个版本加载程序降级一个版本就变了 Map<String, Lifecycle> lifecycleBeans = getLifecycleBeans(); //依照每个 Lifecycle 的 Phase 值进行分组 //如果实现了 Phased 接口就通过其 phase 办法返回得出 phase 值 //如果没有实现 Phased 接口则认为 Phase 是 0 Map<Integer, LifecycleGroup> phases = new HashMap<>(); lifecycleBeans.forEach((beanName, bean) -> { int shutdownPhase = getPhase(bean); LifecycleGroup group = phases.get(shutdownPhase); if (group == null) { group = new LifecycleGroup(shutdownPhase, this.timeoutPerShutdownPhase, lifecycleBeans, false); phases.put(shutdownPhase, group); } group.add(beanName, bean); }); //如果不为空,证实有须要敞开的 Lifecycle,开始敞开 if (!phases.isEmpty()) { //依照 Phase 值倒序 List<Integer> keys = new ArrayList<>(phases.keySet()); keys.sort(Collections.reverseOrder()); //挨个敞开 for (Integer key : keys) { phases.get(key).stop(); } }}
总结起来,其实就是:
- 获取以后 ApplicationContext 的 Beanfactory 中的所有实现了 Lifecycle 接口的 Bean。
- 读取每个 Bean 的 Phase 值,如果这个 Bean 实现了 Phased 接口,就取接口办法返回的值,如果没有实现就是 0.
- 依照 Phase 值将 Bean 分组
- 依照 Phase 值从大到小的程序,顺次遍历每组进行敞开
- 具体敞开每组的逻辑咱们就不具体看代码了,晓得敞开的时候其实还看了以后这个 Lifecycle 的 Bean 是否还依赖了其余的 Lifecycle 的 Bean,如果依赖了,优先关掉被依赖的 Lifecycle Bean
咱们来看下后面提到的优雅敞开相干的 WebServerGracefulShutdownLifecycle
的 Phase 是:
class WebServerGracefulShutdownLifecycle implements SmartLifecycle { ....}
SmartLifecycle 蕴含了 Phased 接口以及默认实现:
public interface SmartLifecycle extends Lifecycle, Phased { int DEFAULT_PHASE = Integer.MAX_VALUE; @Override default int getPhase() { return DEFAULT_PHASE; }}
能够看出,只有实现了 SmartLifecycle,Phase 默认就是最大值。所以优雅敞开的 Lifecycle: WebServerGracefulShutdownLifecycle
的 Phase 就是最大值,也就是属于最先被敞开的那一组。
总结接入点 - Spring Boot + Undertow & 同步 Servlet 环境
1. 接入点一 - 通过增加实现 SmartLifecycle 接口的 Bean,指定 Phase 比 WebServerGracefulShutdownLifecycle 的 Phase 小
后面的剖析中,咱们曾经晓得了:WebServerGracefulShutdownLifecycle
的 Phase 就是最大值,也就是属于最先被敞开的那一组。咱们想要实现的是在这之后退出一些优雅敞开的逻辑,同时在 Destroy Bean (后面提到的 ApplicationContext 敞开的第四步)之前(即 Bean 销毁之前,某些 Bean 销毁中就不能用了,比方微服务调用中的一些 Bean,这时候如果还有工作没实现调用他们就会报异样)。那咱们首先想到的就是退出一个 Phase 在这时候的 Lifecycle,在外面实现咱们的优雅敞开接入,例如:
@Log4j2@Componentpublic class BizThreadPoolShutdownLifecycle implements SmartLifecycle { private volatile boolean running = false; @Override public int getPhase() { //在 WebServerGracefulShutdownLifecycle 那一组之后 return SmartLifecycle.DEFAULT_PHASE - 1; } @Override public void start() { this.running = true; } @Override public void stop() { //在这里编写的优雅敞开逻辑 this.running = false; } @Override public boolean isRunning() { return running; }}
这样实现兼容性比拟好,并且降级底层框架依赖版本基本上不必批改。然而问题就是,可能会引入某个框架外面带 Lifecycle bean,尽管他的 Phase 是正确的,小于 WebServerGracefulShutdownLifecycle 的,然而 SmartLifecycle.DEFAULT_PHASE - 1 即等于咱们自定义的 Lifecyce, 并且这个正好是须要期待咱们的优雅敞开完结再敞开的,并且因为 Bean 加载程序问题导致框架的 Lifecycle 又跑到了咱们自定义的 Lifecycle 前进行 stop。这样就会有问题,然而问题呈现的概率并不大。
2. 接入点二 - 通过反射向 Undertow 的 GracefulShutdownHandler 的 List<ShutdownListener> shutdownListeners
中增加 ShutdownListener 实现
这种实现形式,很显著,限定了容器必须是 undertow,并且可能降级的兼容性不好。然而能够在 Http 线程池优雅敞开后立即执行咱们的优雅敞开逻辑,不必放心引入某个依赖导致咱们自定义的优雅敞开程序有问题。与第一种孰优孰劣,请大家自行判断,简略实现是:
@Log4j2@Componenet//仅在蕴含 Undertow 这个类的时候加载@ConditionalOnClass(name = "io.undertow.Undertow")public class ThreadPoolFactoryGracefulShutDownHandler implements ApplicationListener<ApplicationEvent> { //获取操作 UndertowWebServer 的 gracefulShutdown 字段的句柄 private static VarHandle undertowGracefulShutdown; //获取操作 GracefulShutdownHandler 的 shutdownListeners 字段的句柄 private static VarHandle undertowShutdownListeners; static { try { undertowGracefulShutdown = MethodHandles .privateLookupIn(UndertowWebServer.class, MethodHandles.lookup()) .findVarHandle(UndertowWebServer.class, "gracefulShutdown", GracefulShutdownHandler.class); undertowShutdownListeners = MethodHandles .privateLookupIn(GracefulShutdownHandler.class, MethodHandles.lookup()) .findVarHandle(GracefulShutdownHandler.class, "shutdownListeners", List.class); } catch (Exception e) { log.warn("ThreadPoolFactoryGracefulShutDownHandler undertow not found, ignore fetch var handles"); } } @Override public void onApplicationEvent(ApplicationEvent event) { //仅解决 WebServerInitializedEvent 事件,这个是在 WebServer 创立并初始化实现后收回的事件 if (event instanceof WebServerInitializedEvent) { WebServer webServer = ((WebServerInitializedEvent) event).getWebServer(); //查看以后的 web 容器是否是 UnderTow 的 if (webServer instanceof UndertowWebServer) { GracefulShutdownHandler gracefulShutdownHandler = (GracefulShutdownHandler) undertowGracefulShutdown.getVolatile(webServer); //如果启用了优雅敞开,则 gracefulShutdownHandler 不为 null if (gracefulShutdownHandler != null) { var shutdownListeners = (List<GracefulShutdownHandler.ShutdownListener>) undertowShutdownListeners.getVolatile(gracefulShutdownHandler); shutdownListeners.add(shutdownSuccessful -> { if (shutdownSuccessful) { //增加你的优雅敞开逻辑 } else { log.info("ThreadPoolFactoryGracefulShutDownHandler-onApplicationEvent shutdown failed"); } }); } } } }}
如何实现额定线程池的优雅敞开
当初咱们晓得如何接入了,那么针对我的项目中的自定义线程池,如何把他们敞开呢?首先必定是要先拿到所有要查看的线程池,这个不同环境形式不同,实现也比较简单,这里不再赘述,咱们假如拿到了所有线程池,并且线程池只有以下两种实现(其实就是 JDK 中的两种线程池,疏忽定时工作线程池 ScheduledThreadPoolExecutor):
java.util.concurrent.ThreadPoolExecutor
:最罕用的线程池java.util.concurrent.ForkJoinPool
:ForkJoin 模式的线程池
针对这两种线程池如何判断他们是否曾经没有工作在执行了呢?参考代码:
public static boolean isCompleted(ExecutorService executorService) { if (executorService instanceof ThreadPoolExecutor) { ThreadPoolExecutor threadPoolExecutor = (ThreadPoolExecutor) executorService; //对于 ThreadPoolExecutor,就是判断没有任何 active 的线程了 return threadPoolExecutor.getActiveCount() == 0; } else if (executorService instanceof ForkJoinPool) { //对于 ForkJoinPool,简单一些,就是判断既没有沉闷线程,也没有运行的线程,队列外面也没有任何工作并且并没有任何期待提交的工作 ForkJoinPool forkJoinPool = (ForkJoinPool) executorService; return forkJoinPool.getActiveThreadCount() == 0 && forkJoinPool.getRunningThreadCount() == 0 && forkJoinPool.getQueuedTaskCount() == 0 && forkJoinPool.getQueuedSubmissionCount() == 0; } return true;}
如何判断所有线程池都没有工作了呢?因为理论利用可能很放飞自我,比方线程池 A 可能提交工作到线程池 B,线程池 B 有可能提交工作到线程池 C,线程池 C 又有可能提交工作给 A 和 B,所以如果咱们顺次遍历一轮所有线程池发现下面这个办法 isCompleted 都返回 true,也是不能保障所有线程池肯定运行完了的(比方我顺次查看 A,B,C,查看到 C 的时候,C 又提交工作到了 A 和 B 并完结,C 查看发现工作都实现了,然而之前查看过的 A,B 又有了工作未实现)。所以我的解决办法是:打乱所有线程池,遍历,查看每个线程池是否实现,如果查看发现都实现则计数器加 1,只有有未实现的就不加并清零计数器。一直循环,每次循环 sleep 1 秒,直到计数器为 3(也就是间断三次按随机程序查看所有线程池都没有任何工作):
List<ExecutorService> executorServices = 获取所有线程池for (int i = 0; i < 3; ) { //间断三次,以随机乱序查看所有的线程池都实现了,才认为是真正实现 Collections.shuffle(executorServices); if (executorServices.stream().allMatch(ThreadPoolFactory::isCompleted)) { i++; log.info("all threads pools are completed, i: {}", i); } else { //间断三次 i = 0; log.info("not all threads pools are completed, wait for 1s"); try { TimeUnit.SECONDS.sleep(1); } catch (InterruptedException ignored) { } }}
RocketMQ-spring-starter 中是如何解决的
rocketmq 的官网 spring boot starter:https://github.com/apache/roc...
其中是采纳咱们这里说的第一种接入点形式,将消费者容器做成 SmartLifcycle(Phase 为最大值,属于最优先的敞开组),在外面退出敞开逻辑:
DefaultRocketMQListenerContainer
@Overridepublic int getPhase() { // Returning Integer.MAX_VALUE only suggests that // we will be the first bean to shutdown and last bean to start return Integer.MAX_VALUE;}@Overridepublic void stop(Runnable callback) { stop(); callback.run();}@Overridepublic void stop() { if (this.isRunning()) { if (Objects.nonNull(consumer)) { //敞开消费者 consumer.shutdown(); } setRunning(false); }}
微信搜寻“我的编程喵”关注公众号,加作者微信,每日一刷,轻松晋升技术,斩获各种offer:
我会常常发一些很好的各种框架的官网社区的新闻视频材料并加上集体翻译字幕到如下地址(也包含下面的公众号),欢送关注:
- 知乎:https://www.zhihu.com/people/...
- B 站:https://space.bilibili.com/31...