springcloud项目优雅重启六解决方案

62次阅读

共计 15124 个字符,预计需要花费 38 分钟才能阅读完成。

问题

回到第一章节讲到的几个问题:

  1. 业务项目实例 shutdown 时,会停止当前未完成的 REQUEST 请求。
  2. 某个业务项目实例已经停止了,但是网关仍会转发请求过去,导致请求失败。
  3. 某个业务项目实例已经重新启动了,但是网关并不会马上向这个实例转发请求;假如项目只有两个实例,如果在第一个节点刚启动完就立刻重启另外一个实例,就会导致服务不可用。

第一个问题是因为:springboot-tomcat 在系统停止时会粗暴的 shutdownNow 线程池。
第二和第三个问题的原因是一样的,因为 ribbon 缓存和 eureka 缓存导致:

  1. 项目服务节点停止后,gateway 从缓存(ribbon、eureka-client、eureka-server 三个都有可能缓存过时的数据)里还能取到该节点信息,还会访问该节点。
  2. 项目启动后,同样由于 gateway 从缓存(ribbon、eureka-client、eureka-server 三个都有可能缓存过时的数据)里还没有该节点数据,所以不会向该节点请求数据。

整体方案

所以解决问题的核心在于,在服务停止之前(严格按照顺序):

  1. 注销注册中心信息。
  2. 清除 gateway eureka-client 注册信息。
  3. 让 gateway ribbon 重新获取服务列表(刷新缓存)。
  4. tomcat 停止接收请求。
  5. 等待 tomat 处理未完成的请求。
  6. 停止 tomcat。

在服务启动且 eureka 完成注册之后(严格按照顺序):

  1. tomcat 启动完成。
  2. 注册服务。
  3. 让 gateway eureka-client 立即获取服务。
  4. 让 gateway ribbon 重新获取服务列表(刷新缓存)。

流程图

eureka-client

注销

1. 注销注册中心信息

从《项目优雅重启(三):eureka-client》可以看到,DiscoveryClient有个 shutdown 方法,并且这个方法是 public,而shutdown 方法是在接口 EurekaClient 定义的,所以我们只要 EurekaClient.shutdown 即可。
springboot 的 EurekaClientAutoConfiguration 类里会生产 EurekaClient-Bean,所以只需要注入EurekaClient 即可。

@Autowired
private EurekaClient client;

这里需要特别注意的是:eureka-client会调用 eureka-server 接口来注销信息,假如网络出了问题,或者 eureka-client 出了问题了,可能会导致请求非常慢,所以我们需要加一层保障,保证在指定的时间内还没完成注销操作,就强行中断并继续下一步。

private void unregisterEurekaData() {Thread shutDownEurakaThread = new Thread(() -> {
        try {getClient().shutdown();
            logger.info("============================== shutdown local eureka data success!");
        } catch (Exception e) {logger.error("============================== shutdown local eureka data error!", e);
        }
    });
    try {shutDownEurakaThread.start();
        shutDownEurakaThread.join(unregisterEurekaShutdownWaitSeconds * 1000); // 不能无限制等待
        if (shutDownEurakaThread.isAlive()) {logger.error("============================== shutdown local eureka doesn't compelete in allow time!");
            shutDownEurakaThread.interrupt();}
    } catch (Exception e) {logger.error("============================== shutdown local eureka data error!", e);
    }
}    

2. 刷新 gateway 缓存

可以通过 gateway 提供接口,client 调用接口的方式来刷新缓存,gateway 接口内容后面会讲到,client 这边需要做的是:

  1. 获取所有网关服务器地址。
  2. 调用所有网关的接口来刷新服务。

为了加快速度,用了线程池来并行调用,同样的如果超时未完成任务,就强行停止任务。

protected void refreshGatewayEurekaData() {if (!ifRegisterWithEureka()) {logger.error("============================== not registerWithEureka to refreshGatewayEurekaData!");
        return;
    }
    try {
        // gatewayServiceName 是 gateway 在 eureka 注册的服务名
        Application application = client.getApplication(gatewayServiceName);  
        if (null == application) {return;}

        List<InstanceInfo> instanceInfos = application.getInstances();
        if (CollectionUtils.isEmpty(instanceInfos)) {return;}
        SimpleClientHttpRequestFactory clientHttpRequestFactory = new SimpleClientHttpRequestFactory();
        clientHttpRequestFactory.setConnectTimeout(3000);
        clientHttpRequestFactory.setReadTimeout(3000);
        RestTemplate restTemplate = new RestTemplate(clientHttpRequestFactory);
        int poolSize = instanceInfos.size();
        if(poolSize > maxPoolSize) {poolSize = maxPoolSize;}
        ExecutorService executorService = Executors.newFixedThreadPool(instanceInfos.size());
        instanceInfos.forEach(i ->
                executorService.submit(() -> {String url = getRefreshGatewayUrl(i);
                    try {ResponseEntity<String> response = restTemplate.getForEntity(url, String.class);
                        if (!HttpStatus.OK.equals(response.getStatusCode())) {logger.error("============================== refresh remote gateway[{}] eureka data error:[{}]!", url, response.getBody());
                        } else {logger.info("============================== refresh remote gateway[{}] eureka data success!", url);
                        }
                    } catch (Exception e) {logger.error("============================== refresh remote gateway[{}] eureka data error!", url, e);
                    }
                })
        );
        executorService.shutdown();
        if (!executorService.awaitTermination(refreshGatewauEurekaShutdownWaitSeconds, TimeUnit.SECONDS)) {logger.error("============================== refresh remote gateway eureka data thread pool did not shut down gracefully within {} seconds. Proceeding with forceful shutdown", refreshGatewauEurekaShutdownWaitSeconds);
            if (!executorService.isShutdown()) {executorService.shutdownNow();
            }
        }
    } catch (Exception e) {logger.error("============================== refresh remote gateway eureka data!", e);
    }
}

protected String getRefreshGatewayUrl(InstanceInfo instanceInfo) {return instanceInfo.getHomePageUrl() + gatewayRefreshEurekaUrl + (gatewayRefreshEurekaUrl.indexOf('?') == -1 ? "?" : "&") + 
        "serviceId=" + applicationName.toUpperCase();}

3. 停止 tomcat

从《springcloud 项目优雅重启(五):tomcat 关闭流程》可以看到,tomcat 的关闭是个很复杂很长的流程,并且 tomcat暴力 shutdown是因为对线程池用了shutdownNow,我们只是想在项目停止时,** 在“上两步完成之后 -tomcat 停止之前”让 tomcat 不再接收请求。

  • 上文我们有个特别标注的地方:发布关闭事件 publishEvent(new ContextClosedEvent(this))是在关闭资源 onClose 之前,并且 Spring 事件处理默认是同步处理的 。所以我们可以监听ContextClosedEvent 事件,在 onClose 之前做我们想要做的事。
  • 通过 TomcatContextCustomizer 可以获取到Connector
  • 通过 Connector 可以获取到线程池。

获取到线程池之后,由于前面的步骤正常情况下已经保证不会再接收到请求,这时候只需要等线程池内所有的请求都处理之后,再让流程继续往下走。

@Configuration
@ConditionalOnProperty(name = "server.container.custom-config", havingValue = "true", matchIfMissing = true)
@AutoConfigureAfter(EurekaClientAutoConfiguration.class)
public class ContainerConfiguration {
    @Bean
    public ServletWebServerFactory servletContainer(@Autowired GracefulShutdown gracefulShutdown) {
        // springboot 是在 ServletWebServerFactoryConfiguration 类里生成 TomcatServletWebServerFactory,方式也是直接 new 一个对象。// 我们可以像这样自己 new(ServletWebServerFactoryConfiguration 里用了 ConditionalOnMissingBean),也可以注入 springboot 生成。TomcatServletWebServerFactory tomcat = new TomcatServletWebServerFactory();
        tomcat.addConnectorCustomizers(gracefulShutdown);
        return tomcat;
    }
}    
public class DefaultGracefulShutdown implements TomcatConnectorCustomizer, ApplicationListener<ContextClosedEvent> {

    @Override
    public void customize(Connector connector) {this.connector = connector;}
    
    @Override
    public void onApplicationEvent(ContextClosedEvent event) {shutdownTomcatConnector();
    }


    private void shutdownTomcatConnector() {
        try {this.connector.pause();
            logger.info("============================== after Tomcat connector pause!");
            Executor executor = this.connector.getProtocolHandler().getExecutor();
            if (executor instanceof ThreadPoolExecutor) {logger.info("============================== Tomcat thread pool begin to shut down gracefully!");
                ThreadPoolExecutor threadPoolExecutor = (ThreadPoolExecutor) executor;
                threadPoolExecutor.shutdown();
                if (!threadPoolExecutor.awaitTermination(tomcatShutdownWaitSeconds, TimeUnit.SECONDS)) {logger.error("============================== Tomcat thread pool did not shut down gracefully within {} seconds. Proceeding with forceful shutdown", tomcatShutdownWaitSeconds);
                    if (!threadPoolExecutor.isShutdown()) {threadPoolExecutor.shutdownNow();
                    }
                }
            }
        } catch (Exception e) {logger.error("============================== shutdown Tomcat connector error!", e);
        }
    }    
}  

4. 结合
以上三步结合起来需要考虑几个细节:

  • 第一步和第二步之间:eureka-server接收到请求之后,是异步传播给集群其他节点。所以我们要等 eureka-server 都刷新了数据之后,再去让 gateway 刷新,否则 gateway 可能从一个还未更新数据的 server 节点取到未刷新的数据。但是我们并不知道 server 什么时候会刷新完,所以我们只能给个我们能容忍的等待时间。
  • 第二步和第三步之间:如果有个请求在网关刷新缓存之前获取到了服务节点,但是还没调用服务,当这个请求发送到当前服务节点时,服务已经connector.pause(),会导致调用失败。整个流程如下:
public void onApplicationEvent(ContextClosedEvent event) {logger.info("============================== begin gracefully shutdown service!");
    // 本地调试时可能会关闭 eureka 或者不注册 eureka
    if (ifRegister()) {unregisterEurekaData();
        try {
            // 等待全部 "eureka-server" 刷新数据
            TimeUnit.SECONDS.sleep(3); 
        } catch (InterruptedException ignore) { }
       refreshGatewayEurekaData();
        // 睡眠 3 秒,再停止 tomcat 接收新服务,如果有个请求在 client.shutdown()之前获取到了服务节点,但是还没调用服务,这时候如果 connector.pause(),会导致调用失败
        try {TimeUnit.SECONDS.sleep(3);
        } catch (InterruptedException ignore) {}}
    shutdownTomcatConnector();
    logger.info("============================== finish gracefully shutdown service!");
}   

启动

启动要做到事是:eureka-client 向 eureka-server 注册成功之后,调用网关接口刷新缓存。从 eureka-client 源码里面发现,client 在注册成功之后,并没有 发送事件(public event),也就是无法知道什么时候注册成功。那我们就换个思路,在项目启动完成之后,我们自己调用注册方法,这样的结果就是,eureka-client 会向 eureka-server 注册两次(自己注册一次,原先的启动流程注册一次),但是重复的注册并不会有什么影响。

public class DefaultStartRegister implements ApplicationListener<ApplicationReadyEvent> {
    @Override
    public void onApplicationEvent(ApplicationReadyEvent event) {if (ifRegister()) {register();
        }
    }
    
    public void register() {executorService.submit(() -> {
            try {EurekaClient client = getClient();
                if (InstanceInfo.InstanceStatus.UP.equals(client.getApplicationInfoManager().getInfo().getStatus())) {Method method = ReflectionUtils.findMethod(DiscoveryClient.class, "register");
                    // 非 public 方法
                    method.setAccessible(true);
                    // springcloud 下的 EurekaClient 实际上是个代理对象
                    Object target = AopUtils.isAopProxy(client) ? ProxyUtils.getTargetObject(client) : client;
                    AtomicReference<Boolean> registerResult = new AtomicReference<>(false);
                    Thread registerEurakaThread = new Thread(() -> {boolean ret = (boolean) ReflectionUtils.invokeMethod(method, target);
                        registerResult.set(ret);
                        logger.info("============================== invoke EurekaClient.register method to registerWithEureka result: {}!", ret);
                    });
                    try {registerEurakaThread.start();
                        // 不能无限制等待
                        registerEurakaThread.join(registerEurekaShutdownWaitSeconds * 1000);
                        if (registerEurakaThread.isAlive()) {logger.error("============================== register local eureka doesn't compelete in allow time!");
                            registerEurakaThread.interrupt();}
                    } catch (Exception e) {logger.error("============================== register local eureka data error!", e);
                    }
                    try {
                        // 等待全部 "eureka-server" 刷新数据
                        TimeUnit.SECONDS.sleep(3);
                    } catch (InterruptedException ignore) { }
                    if (registerResult.get()) {refreshGatewayEurekaData();
                    }
                }
            } catch (Exception e) {logger.error("register eureka after start error!", e);
            }
        });
    }
}
  • 整个步骤采用线程池来异步化,尽量减少对项目启动速度的影响。
  • eureka-client 的注册方法 DiscoveryClient.register 不是 public,所以使用了反射的方式:用反射的优点是简单明了;缺点是如果 eureka 版本升级,可能就会变得不兼容 / 不可用(一般第三方组件升级时会尽量“保留 / 兼容”public 的方法,非 public的方法就无法保证了),但是由于现在 eureka 已经停止维护了,所以也就没有这个问题了。
  • springcloud 下的 EurekaClient 实际上是个代理对象,反射的时候需要注意这个。
  • 跟启动时一样,注册完成之后,等待 eureka-server 广播完成之后再调用 gateway 接口。

eureka-server

eureka-server 的三级缓存中,registryreadWriteCacheMap 都是实时更新,而 readOnlyCacheMap 是通过定时任务定时从 readWriteCacheMap 取数据更新。

  • 如果我们通过改造 readWriteCacheMap,让它也实时更新,那就变得跟readWriteCacheMap 作用差不多了(从细节上讲,readWriteCacheMap用的是guava-cache,还是会做一些额外的事情,比如计数统计、清理)。
  • registryreadWriteCacheMapreadOnlyCacheMap 数据都是存储在 JVM 内存里,对于绝大部分的系统来说两级缓存性能已经足够了。

所以 eureka-server 端,我们只需要关闭 readOnlyCacheMap 即可:eureka.server.use-read-only-response-cache: false。虽然性能上会有下降,但是这点儿性能差别,对于大部分系统来说是没有影响的,除非超高并发的系统。

gateway

gateway 需要提供一个接口给 client 在停机和启动时调用,接口做两件事:

  1. 刷新 eureka-client 缓存。
  2. ribbon 重新从 eureka-client 里获取缓存。

1. 刷新 eureka-client 缓存

通过源码可以看到,DiscoveryClient里有个 refreshRegistry 方法会刷新缓存,但是这个方法不是public,同样需要反射调用。

2. 刷新 ribbon 缓存

由于不同的服务分别有不同的 LoadBalancer 对象,需要先获取到服务对应的 LoadBalancer,那就需要知道当前要刷新的是哪个服务,所以接口要有个参数,参数是服务在eureka-server 上注册的服务名,要特别注意的是,eureka 服务名默认是大写,所以传过来的服务名参数值也需要是大写。
同样的通过源码可以找到 RibbonLoadBalancerClient.DynamicServerListLoadBalancer.updateListOfServers 方法会刷新 ribbon 缓存,这里之所以选择从 RibbonLoadBalancerClient 作为入口,是因为 springboot 有将 RibbonLoadBalancerClient 暴露成Bean

3. 代码

public class GatewayApplication {
    @Autowired
    private EurekaClient eurekaClient;
    @Autowired
    private LoadBalancerClient loadBalancerClient;

    @Bean
    public RouterFunction<ServerResponse> routerFunction() {return RouterFunctions.route(RequestPredicates.path("/eureka/refresh"), request -> {
            try {log.info("receive eureka refresh, eureka current service count {}!", eurekaClient.getApplications().size());
                Method method = ReflectionUtils.findMethod(DiscoveryClient.class, "refreshRegistry");
                // 非 public 方法
                method.setAccessible(true); 
                // springcloud 下的 EurekaClient 实际上是个代理对象
                Object target = AopUtils.isAopProxy(eurekaClient) ? ProxyUtils.getTargetObject(eurekaClient) : eurekaClient; 
                ReflectionUtils.invokeMethod(method, target);
                log.info("finish eureka refresh, eureka current service count {}!", eurekaClient.getApplications().size());
                request.queryParam("serviceId").filter(StringUtils::isNotBlank).ifPresent(serviceId -> {RibbonLoadBalancerClient ribbonLoadBalancerClient = (RibbonLoadBalancerClient) loadBalancerClient;
                    Method ribbonMethod = ReflectionUtils.findMethod(RibbonLoadBalancerClient.class, "getLoadBalancer", String.class);
                    ribbonMethod.setAccessible(true); // 非 public 方法
                    Object ribbonTarget = AopUtils.isAopProxy(ribbonLoadBalancerClient) ? ProxyUtils.getTargetObject(ribbonLoadBalancerClient) : ribbonLoadBalancerClient;
                    DynamicServerListLoadBalancer dynamicServerListLoadBalancer = (DynamicServerListLoadBalancer) ReflectionUtils.invokeMethod(ribbonMethod, ribbonTarget, serviceId);
                    log.info("begin refresh ribbon, ribbon current service[{}] count {}!", serviceId, dynamicServerListLoadBalancer.getAllServers().size());
                    // 也可以调用 Ribbon 的 BaseLoadbalancer.markServerDown 方法来清理 Ribbon 数据
                    dynamicServerListLoadBalancer.updateListOfServers();
                    log.info("finish refresh ribbon, ribbon current service[{}] count {}!", serviceId, dynamicServerListLoadBalancer.getAllServers().size());
                });
                return ok().body(Mono.just("success"), String.class);
            } catch (Exception e) {log.info("refresh eureka error!", e);
                return ServerResponse.status(HttpStatus.INTERNAL_SERVER_ERROR).body(Mono.just(e.getClass().getName() + ":" + e.getMessage()), String.class);
            }
        });
    }
}
  • 需要注意到是,RibbonLoadBalancerClient.getLoadBalancer返回的是接口 ILoadBalancer,由于springboot-ribbon 默认是 DynamicServerListLoadBalancer,所以直接类型强转,但是如果有哪个项目配置了自定义规则,返回的就不一定是 DynamicServerListLoadBalancer(我没试过,所以也不确定)。
  • 刷新 ribbon 这一步,也可以通过 BaseLoadbalancer.markServerDown(Server server) 方法来让指定的服务节点失效,这个方法是 public 方法。我在这里采用了比较简单粗暴的 updateListOfServers 直接刷新整个服务。

发版

jenkins 流程中,除了 重启脚本 之外,还需要做两件事:

  1. 备份当前正在运行的文件。
  2. 如果发版失败,回滚版本。

服务器要分两批启动,确保第一批有启动成功(可以提供服务)的节点之后,才能启动第二批,所以要有检查启动成功的机制。

1. 备份文件

  • 检查是否存在 backup 文件夹,如果没有则创建。
  • 将当前正在运行的 jar 文件移动到 backup 目录中,并在文件后面加上时间戳。
#!/bin/bash
service_name=$1

if [-z "$service_name"]
then
  echo "usage: ./backup-service.sh xxxx"
  exit 1
fi

base_dir="/data/app/tscm/$service_name"
backup_dir="$base_dir/backup"
if [! -d "$backup_dir"]; then
  mkdir -m 775 -p $backup_dir
fi

cd $base_dir
time=$(date "+%Y%m%d%H%M%S")
for var in ${service_name}*.jar;
do
  mv "$var" "backup/${var%.jar}_$time.jar";
done

exit 0
  • 执行命令为 sh backup-service.sh 项目名,其中打包的jar 文件名称前缀文件项目名,例如项目为my-test,则打包的文件必须为my-test{这里可以有其他字符}.jar
  • 脚本中的 /data/app/tscm/ 目录改为自己服务器中实际的目录。
  • 函数成功返回 0,失败返回 1。

2. 失败回滚

  • 在执行时间内定时循环检查服务。
  • 如果超时还未检查到服务已经启动成功,则将当前发版文件移动到 fail-backup 目录中,将 backup 目录中最新的文件(即刚备份的文件)移动回来,重新启动。
  • 项目需要提供一个检查服务的接口。
#!/bin/bash
service_name=$1
env=$2
check_url=$3

if [-z "$service_name"]
then
  echo "service_name required, usage: ./backup-service.sh xxxx dev http://localhost:8080/momnitor"
  exit 1
fi
if [-z "$env"]
then
  echo "env required, usage: ./backup-service.sh xxxx dev http://localhost:8080/momnitor"
  exit 1
fi
if [-z "$check_url"]
then
  echo "check_url required, usage: ./backup-service.sh xxxx dev http://localhost:8080/momnitor"
  exit 1
fi

base_dir="/data/app/tscm/$service_name"
fail_dir="$base_dir/fail-backup"
result=0
count=0
while [$result -ne 200 -a $count -lt 18] # 根据接口返回的 HTTP 状态码是否 200 来判断,循环 18 次
do
  sleep 10 # 睡眠 10 秒,也就是给项目最大启动时间为 18 * 10 = 180 秒 = 3 分钟
  let count+=1
  result=$(curl -sIL -w "%{http_code}\n" -o /dev/null ${check_url})
  echo $count"times curl result:"$result
done
if [$result -eq 200]; then
  exit 0
else
  if [! -d "$fail_dir"]; then
    mkdir -m 775 -p $fail_dir
  fi
  cd $base_dir
  time=$(date "+%Y%m%d%H%M%S")
  for var in ${service_name}*.jar;
  do
    mv "$var" "fail-backup/${var%.jar}_$time.jar";
  done
  cd $base_dir/backup
  filename=`ls -t |head -n1|awk '{print $0}'`
  echo "失败回滚文件:$filename"
  mv "$filename" `echo "../$filename" |awk -F '_' '{print $1".jar"}'`
  cd ../..
  sh run-service.sh $service_name $env # 重新启动
  exit 1
fi  
  • 执行命令为sh check-rollback-service.sh 项目名 spring 环境 项目节点检查地址,例如:sh check-rollback-service.sh mytest dev http://localhost:6101/base/monitor
  • 脚本中的 /data/app/tscm/ 目录改为自己服务器中实际的目录。
  • run-service.sh 项目重启 脚本,做两件事:

    1. 根据项目名查找 jvm 进程,并kill
    2. 根据项目名查找 jar 文件并启动。
  • 函数成功返回 0,失败返回 1。

Feign

前面的处理方案是刷新网关的的缓存,但是项目组内有部分项目用了feign,导致方案对这些项目无效。

方案

feigngateway 一样,也是经过 robbineureka-client两层处理,但是项目重启时,不可能像通知网关那样 通过接口同步 通知所有节点刷新本地缓存。
可以用消息通知的方式通知其他项目,但是这样有两个问题:

  1. 其他项目还得引入 MQ/Redis 等可以接收通知的中间件。
  2. 如果用消息通知的方式,相当于是异步通知,就无法知道其他项目是否已经完成刷新缓存,不过这个也可以通过增加 睡眠等待,来尽量保证其他项目已完成刷新。

鉴于通知的方式比较复杂,那是不是可以让 feign 走网关请求,而不是直接点对点请求。
通过 API 可以看到 FeignClient 配置里有 urlpath两个参数,当指定 url 参数时,feign就不会再走 robbin 获取服务节点,而是直接发起请求,所以只需要做很小的调整。
调整前:

@FeignClient(value = "tscm-service-purchase-facader")

调整后:

@FeignClient(value = "tscm-service-purchase-facader", url = "${gateway.host}", path="/tscm-service-purchase-facader")

@FeignClient(value = "tscm-service-purchase-facader", url = "${gateway.host}/tscm-service-purchase-facader")

正文完
 0