共计 15124 个字符,预计需要花费 38 分钟才能阅读完成。
问题
回到第一章节讲到的几个问题:
- 业务项目实例 shutdown 时,会停止当前未完成的 REQUEST 请求。
- 某个业务项目实例已经停止了,但是网关仍会转发请求过去,导致请求失败。
- 某个业务项目实例已经重新启动了,但是网关并不会马上向这个实例转发请求;假如项目只有两个实例,如果在第一个节点刚启动完就立刻重启另外一个实例,就会导致服务不可用。
第一个问题是因为:springboot-tomcat 在系统停止时会粗暴的 shutdownNow
线程池。
第二和第三个问题的原因是一样的,因为 ribbon 缓存和 eureka 缓存导致:
- 项目服务节点停止后,gateway 从缓存(ribbon、eureka-client、eureka-server 三个都有可能缓存过时的数据)里还能取到该节点信息,还会访问该节点。
- 项目启动后,同样由于 gateway 从缓存(ribbon、eureka-client、eureka-server 三个都有可能缓存过时的数据)里还没有该节点数据,所以不会向该节点请求数据。
整体方案
所以解决问题的核心在于,在服务停止之前(严格按照顺序):
- 注销注册中心信息。
- 清除 gateway eureka-client 注册信息。
- 让 gateway ribbon 重新获取服务列表(刷新缓存)。
- tomcat 停止接收请求。
- 等待 tomat 处理未完成的请求。
- 停止 tomcat。
在服务启动且 eureka 完成注册之后(严格按照顺序):
- tomcat 启动完成。
- 注册服务。
- 让 gateway eureka-client 立即获取服务。
- 让 gateway ribbon 重新获取服务列表(刷新缓存)。
流程图
eureka-client
注销
1. 注销注册中心信息
从《项目优雅重启(三):eureka-client》可以看到,DiscoveryClient
有个 shutdown
方法,并且这个方法是 public
,而shutdown
方法是在接口 EurekaClient
定义的,所以我们只要 EurekaClient.shutdown
即可。
springboot 的 EurekaClientAutoConfiguration
类里会生产 EurekaClient-Bean
,所以只需要注入EurekaClient
即可。
@Autowired
private EurekaClient client;
这里需要特别注意的是:eureka-client
会调用 eureka-server
接口来注销信息,假如网络出了问题,或者 eureka-client
出了问题了,可能会导致请求非常慢,所以我们需要加一层保障,保证在指定的时间内还没完成注销操作,就强行中断并继续下一步。
private void unregisterEurekaData() {Thread shutDownEurakaThread = new Thread(() -> {
try {getClient().shutdown();
logger.info("============================== shutdown local eureka data success!");
} catch (Exception e) {logger.error("============================== shutdown local eureka data error!", e);
}
});
try {shutDownEurakaThread.start();
shutDownEurakaThread.join(unregisterEurekaShutdownWaitSeconds * 1000); // 不能无限制等待
if (shutDownEurakaThread.isAlive()) {logger.error("============================== shutdown local eureka doesn't compelete in allow time!");
shutDownEurakaThread.interrupt();}
} catch (Exception e) {logger.error("============================== shutdown local eureka data error!", e);
}
}
2. 刷新 gateway 缓存
可以通过 gateway 提供接口,client 调用接口的方式来刷新缓存,gateway 接口内容后面会讲到,client 这边需要做的是:
- 获取所有网关服务器地址。
- 调用所有网关的接口来刷新服务。
为了加快速度,用了线程池来并行调用,同样的如果超时未完成任务,就强行停止任务。
protected void refreshGatewayEurekaData() {if (!ifRegisterWithEureka()) {logger.error("============================== not registerWithEureka to refreshGatewayEurekaData!");
return;
}
try {
// gatewayServiceName 是 gateway 在 eureka 注册的服务名
Application application = client.getApplication(gatewayServiceName);
if (null == application) {return;}
List<InstanceInfo> instanceInfos = application.getInstances();
if (CollectionUtils.isEmpty(instanceInfos)) {return;}
SimpleClientHttpRequestFactory clientHttpRequestFactory = new SimpleClientHttpRequestFactory();
clientHttpRequestFactory.setConnectTimeout(3000);
clientHttpRequestFactory.setReadTimeout(3000);
RestTemplate restTemplate = new RestTemplate(clientHttpRequestFactory);
int poolSize = instanceInfos.size();
if(poolSize > maxPoolSize) {poolSize = maxPoolSize;}
ExecutorService executorService = Executors.newFixedThreadPool(instanceInfos.size());
instanceInfos.forEach(i ->
executorService.submit(() -> {String url = getRefreshGatewayUrl(i);
try {ResponseEntity<String> response = restTemplate.getForEntity(url, String.class);
if (!HttpStatus.OK.equals(response.getStatusCode())) {logger.error("============================== refresh remote gateway[{}] eureka data error:[{}]!", url, response.getBody());
} else {logger.info("============================== refresh remote gateway[{}] eureka data success!", url);
}
} catch (Exception e) {logger.error("============================== refresh remote gateway[{}] eureka data error!", url, e);
}
})
);
executorService.shutdown();
if (!executorService.awaitTermination(refreshGatewauEurekaShutdownWaitSeconds, TimeUnit.SECONDS)) {logger.error("============================== refresh remote gateway eureka data thread pool did not shut down gracefully within {} seconds. Proceeding with forceful shutdown", refreshGatewauEurekaShutdownWaitSeconds);
if (!executorService.isShutdown()) {executorService.shutdownNow();
}
}
} catch (Exception e) {logger.error("============================== refresh remote gateway eureka data!", e);
}
}
protected String getRefreshGatewayUrl(InstanceInfo instanceInfo) {return instanceInfo.getHomePageUrl() + gatewayRefreshEurekaUrl + (gatewayRefreshEurekaUrl.indexOf('?') == -1 ? "?" : "&") +
"serviceId=" + applicationName.toUpperCase();}
3. 停止 tomcat
从《springcloud 项目优雅重启(五):tomcat 关闭流程》可以看到,tomcat 的关闭是个很复杂很长的流程,并且 tomcat暴力 shutdown
是因为对线程池用了shutdownNow
,我们只是想在项目停止时,** 在“上两步完成之后 -tomcat 停止之前”让 tomcat 不再接收请求。
- 上文我们有个特别标注的地方:发布关闭事件 publishEvent(new ContextClosedEvent(this))是在关闭资源 onClose 之前,并且 Spring 事件处理默认是同步处理的 。所以我们可以监听
ContextClosedEvent
事件,在onClose
之前做我们想要做的事。 - 通过
TomcatContextCustomizer
可以获取到Connector
。 - 通过
Connector
可以获取到线程池。
获取到线程池之后,由于前面的步骤正常情况下已经保证不会再接收到请求,这时候只需要等线程池内所有的请求都处理之后,再让流程继续往下走。
@Configuration
@ConditionalOnProperty(name = "server.container.custom-config", havingValue = "true", matchIfMissing = true)
@AutoConfigureAfter(EurekaClientAutoConfiguration.class)
public class ContainerConfiguration {
@Bean
public ServletWebServerFactory servletContainer(@Autowired GracefulShutdown gracefulShutdown) {
// springboot 是在 ServletWebServerFactoryConfiguration 类里生成 TomcatServletWebServerFactory,方式也是直接 new 一个对象。// 我们可以像这样自己 new(ServletWebServerFactoryConfiguration 里用了 ConditionalOnMissingBean),也可以注入 springboot 生成。TomcatServletWebServerFactory tomcat = new TomcatServletWebServerFactory();
tomcat.addConnectorCustomizers(gracefulShutdown);
return tomcat;
}
}
public class DefaultGracefulShutdown implements TomcatConnectorCustomizer, ApplicationListener<ContextClosedEvent> {
@Override
public void customize(Connector connector) {this.connector = connector;}
@Override
public void onApplicationEvent(ContextClosedEvent event) {shutdownTomcatConnector();
}
private void shutdownTomcatConnector() {
try {this.connector.pause();
logger.info("============================== after Tomcat connector pause!");
Executor executor = this.connector.getProtocolHandler().getExecutor();
if (executor instanceof ThreadPoolExecutor) {logger.info("============================== Tomcat thread pool begin to shut down gracefully!");
ThreadPoolExecutor threadPoolExecutor = (ThreadPoolExecutor) executor;
threadPoolExecutor.shutdown();
if (!threadPoolExecutor.awaitTermination(tomcatShutdownWaitSeconds, TimeUnit.SECONDS)) {logger.error("============================== Tomcat thread pool did not shut down gracefully within {} seconds. Proceeding with forceful shutdown", tomcatShutdownWaitSeconds);
if (!threadPoolExecutor.isShutdown()) {threadPoolExecutor.shutdownNow();
}
}
}
} catch (Exception e) {logger.error("============================== shutdown Tomcat connector error!", e);
}
}
}
4. 结合
以上三步结合起来需要考虑几个细节:
- 第一步和第二步之间:
eureka-server
接收到请求之后,是异步传播给集群其他节点。所以我们要等eureka-server
都刷新了数据之后,再去让gateway
刷新,否则gateway
可能从一个还未更新数据的server
节点取到未刷新的数据。但是我们并不知道server
什么时候会刷新完,所以我们只能给个我们能容忍的等待时间。 - 第二步和第三步之间:如果有个请求在网关刷新缓存之前获取到了服务节点,但是还没调用服务,当这个请求发送到当前服务节点时,服务已经
connector.pause()
,会导致调用失败。整个流程如下:
public void onApplicationEvent(ContextClosedEvent event) {logger.info("============================== begin gracefully shutdown service!");
// 本地调试时可能会关闭 eureka 或者不注册 eureka
if (ifRegister()) {unregisterEurekaData();
try {
// 等待全部 "eureka-server" 刷新数据
TimeUnit.SECONDS.sleep(3);
} catch (InterruptedException ignore) { }
refreshGatewayEurekaData();
// 睡眠 3 秒,再停止 tomcat 接收新服务,如果有个请求在 client.shutdown()之前获取到了服务节点,但是还没调用服务,这时候如果 connector.pause(),会导致调用失败
try {TimeUnit.SECONDS.sleep(3);
} catch (InterruptedException ignore) {}}
shutdownTomcatConnector();
logger.info("============================== finish gracefully shutdown service!");
}
启动
启动要做到事是:eureka-client 向 eureka-server 注册成功之后,调用网关接口刷新缓存。从 eureka-client 源码里面发现,client 在注册成功之后,并没有 发送事件(public event)
,也就是无法知道什么时候注册成功。那我们就换个思路,在项目启动完成之后,我们自己调用注册方法,这样的结果就是,eureka-client 会向 eureka-server 注册两次(自己注册一次,原先的启动流程注册一次),但是重复的注册并不会有什么影响。
public class DefaultStartRegister implements ApplicationListener<ApplicationReadyEvent> {
@Override
public void onApplicationEvent(ApplicationReadyEvent event) {if (ifRegister()) {register();
}
}
public void register() {executorService.submit(() -> {
try {EurekaClient client = getClient();
if (InstanceInfo.InstanceStatus.UP.equals(client.getApplicationInfoManager().getInfo().getStatus())) {Method method = ReflectionUtils.findMethod(DiscoveryClient.class, "register");
// 非 public 方法
method.setAccessible(true);
// springcloud 下的 EurekaClient 实际上是个代理对象
Object target = AopUtils.isAopProxy(client) ? ProxyUtils.getTargetObject(client) : client;
AtomicReference<Boolean> registerResult = new AtomicReference<>(false);
Thread registerEurakaThread = new Thread(() -> {boolean ret = (boolean) ReflectionUtils.invokeMethod(method, target);
registerResult.set(ret);
logger.info("============================== invoke EurekaClient.register method to registerWithEureka result: {}!", ret);
});
try {registerEurakaThread.start();
// 不能无限制等待
registerEurakaThread.join(registerEurekaShutdownWaitSeconds * 1000);
if (registerEurakaThread.isAlive()) {logger.error("============================== register local eureka doesn't compelete in allow time!");
registerEurakaThread.interrupt();}
} catch (Exception e) {logger.error("============================== register local eureka data error!", e);
}
try {
// 等待全部 "eureka-server" 刷新数据
TimeUnit.SECONDS.sleep(3);
} catch (InterruptedException ignore) { }
if (registerResult.get()) {refreshGatewayEurekaData();
}
}
} catch (Exception e) {logger.error("register eureka after start error!", e);
}
});
}
}
- 整个步骤采用线程池来异步化,尽量减少对项目启动速度的影响。
- eureka-client 的注册方法
DiscoveryClient.register
不是public
,所以使用了反射的方式:用反射的优点是简单明了;缺点是如果 eureka 版本升级,可能就会变得不兼容 / 不可用(一般第三方组件升级时会尽量“保留 / 兼容”public
的方法,非 public
的方法就无法保证了),但是由于现在eureka
已经停止维护了,所以也就没有这个问题了。 - springcloud 下的 EurekaClient 实际上是个代理对象,反射的时候需要注意这个。
- 跟启动时一样,注册完成之后,等待
eureka-server
广播完成之后再调用 gateway 接口。
eureka-server
eureka-server 的三级缓存中,registry
和 readWriteCacheMap
都是实时更新,而 readOnlyCacheMap
是通过定时任务定时从 readWriteCacheMap
取数据更新。
- 如果我们通过改造
readWriteCacheMap
,让它也实时更新,那就变得跟readWriteCacheMap
作用差不多了(从细节上讲,readWriteCacheMap
用的是guava-cache
,还是会做一些额外的事情,比如计数统计、清理)。 -
registry
、readWriteCacheMap
和readOnlyCacheMap
数据都是存储在 JVM 内存里,对于绝大部分的系统来说两级缓存性能已经足够了。
所以 eureka-server 端,我们只需要关闭 readOnlyCacheMap
即可:eureka.server.use-read-only-response-cache: false
。虽然性能上会有下降,但是这点儿性能差别,对于大部分系统来说是没有影响的,除非超高并发的系统。
gateway
gateway 需要提供一个接口给 client 在停机和启动时调用,接口做两件事:
- 刷新
eureka-client
缓存。 - 让
ribbon
重新从eureka-client
里获取缓存。
1. 刷新 eureka-client
缓存
通过源码可以看到,DiscoveryClient
里有个 refreshRegistry
方法会刷新缓存,但是这个方法不是public
,同样需要反射调用。
2. 刷新 ribbon
缓存
由于不同的服务分别有不同的 LoadBalancer
对象,需要先获取到服务对应的 LoadBalancer
,那就需要知道当前要刷新的是哪个服务,所以接口要有个参数,参数是服务在eureka-server
上注册的服务名,要特别注意的是,eureka 服务名默认是大写,所以传过来的服务名参数值也需要是大写。
同样的通过源码可以找到 RibbonLoadBalancerClient.DynamicServerListLoadBalancer.updateListOfServers
方法会刷新 ribbon
缓存,这里之所以选择从 RibbonLoadBalancerClient
作为入口,是因为 springboot 有将 RibbonLoadBalancerClient
暴露成Bean
。
3. 代码
public class GatewayApplication {
@Autowired
private EurekaClient eurekaClient;
@Autowired
private LoadBalancerClient loadBalancerClient;
@Bean
public RouterFunction<ServerResponse> routerFunction() {return RouterFunctions.route(RequestPredicates.path("/eureka/refresh"), request -> {
try {log.info("receive eureka refresh, eureka current service count {}!", eurekaClient.getApplications().size());
Method method = ReflectionUtils.findMethod(DiscoveryClient.class, "refreshRegistry");
// 非 public 方法
method.setAccessible(true);
// springcloud 下的 EurekaClient 实际上是个代理对象
Object target = AopUtils.isAopProxy(eurekaClient) ? ProxyUtils.getTargetObject(eurekaClient) : eurekaClient;
ReflectionUtils.invokeMethod(method, target);
log.info("finish eureka refresh, eureka current service count {}!", eurekaClient.getApplications().size());
request.queryParam("serviceId").filter(StringUtils::isNotBlank).ifPresent(serviceId -> {RibbonLoadBalancerClient ribbonLoadBalancerClient = (RibbonLoadBalancerClient) loadBalancerClient;
Method ribbonMethod = ReflectionUtils.findMethod(RibbonLoadBalancerClient.class, "getLoadBalancer", String.class);
ribbonMethod.setAccessible(true); // 非 public 方法
Object ribbonTarget = AopUtils.isAopProxy(ribbonLoadBalancerClient) ? ProxyUtils.getTargetObject(ribbonLoadBalancerClient) : ribbonLoadBalancerClient;
DynamicServerListLoadBalancer dynamicServerListLoadBalancer = (DynamicServerListLoadBalancer) ReflectionUtils.invokeMethod(ribbonMethod, ribbonTarget, serviceId);
log.info("begin refresh ribbon, ribbon current service[{}] count {}!", serviceId, dynamicServerListLoadBalancer.getAllServers().size());
// 也可以调用 Ribbon 的 BaseLoadbalancer.markServerDown 方法来清理 Ribbon 数据
dynamicServerListLoadBalancer.updateListOfServers();
log.info("finish refresh ribbon, ribbon current service[{}] count {}!", serviceId, dynamicServerListLoadBalancer.getAllServers().size());
});
return ok().body(Mono.just("success"), String.class);
} catch (Exception e) {log.info("refresh eureka error!", e);
return ServerResponse.status(HttpStatus.INTERNAL_SERVER_ERROR).body(Mono.just(e.getClass().getName() + ":" + e.getMessage()), String.class);
}
});
}
}
- 需要注意到是,
RibbonLoadBalancerClient.getLoadBalancer
返回的是接口ILoadBalancer
,由于springboot-ribbon
默认是DynamicServerListLoadBalancer
,所以直接类型强转,但是如果有哪个项目配置了自定义规则,返回的就不一定是DynamicServerListLoadBalancer
(我没试过,所以也不确定)。 - 刷新 ribbon 这一步,也可以通过
BaseLoadbalancer.markServerDown(Server server)
方法来让指定的服务节点失效,这个方法是public
方法。我在这里采用了比较简单粗暴的updateListOfServers
直接刷新整个服务。
发版
jenkins 流程中,除了 重启脚本
之外,还需要做两件事:
- 备份当前正在运行的文件。
- 如果发版失败,回滚版本。
服务器要分两批启动,确保第一批有启动成功(可以提供服务)的节点之后,才能启动第二批,所以要有检查启动成功的机制。
1. 备份文件
- 检查是否存在
backup
文件夹,如果没有则创建。 - 将当前正在运行的
jar
文件移动到backup
目录中,并在文件后面加上时间戳。
#!/bin/bash
service_name=$1
if [-z "$service_name"]
then
echo "usage: ./backup-service.sh xxxx"
exit 1
fi
base_dir="/data/app/tscm/$service_name"
backup_dir="$base_dir/backup"
if [! -d "$backup_dir"]; then
mkdir -m 775 -p $backup_dir
fi
cd $base_dir
time=$(date "+%Y%m%d%H%M%S")
for var in ${service_name}*.jar;
do
mv "$var" "backup/${var%.jar}_$time.jar";
done
exit 0
- 执行命令为
sh backup-service.sh 项目名
,其中打包的jar
文件名称前缀文件项目名,例如项目为my-test
,则打包的文件必须为my-test{这里可以有其他字符}.jar
。 - 脚本中的
/data/app/tscm/
目录改为自己服务器中实际的目录。 - 函数成功返回 0,失败返回 1。
2. 失败回滚
- 在执行时间内定时循环检查服务。
- 如果超时还未检查到服务已经启动成功,则将当前发版文件移动到
fail-backup
目录中,将backup
目录中最新的文件(即刚备份的文件)移动回来,重新启动。 - 项目需要提供一个检查服务的接口。
#!/bin/bash
service_name=$1
env=$2
check_url=$3
if [-z "$service_name"]
then
echo "service_name required, usage: ./backup-service.sh xxxx dev http://localhost:8080/momnitor"
exit 1
fi
if [-z "$env"]
then
echo "env required, usage: ./backup-service.sh xxxx dev http://localhost:8080/momnitor"
exit 1
fi
if [-z "$check_url"]
then
echo "check_url required, usage: ./backup-service.sh xxxx dev http://localhost:8080/momnitor"
exit 1
fi
base_dir="/data/app/tscm/$service_name"
fail_dir="$base_dir/fail-backup"
result=0
count=0
while [$result -ne 200 -a $count -lt 18] # 根据接口返回的 HTTP 状态码是否 200 来判断,循环 18 次
do
sleep 10 # 睡眠 10 秒,也就是给项目最大启动时间为 18 * 10 = 180 秒 = 3 分钟
let count+=1
result=$(curl -sIL -w "%{http_code}\n" -o /dev/null ${check_url})
echo $count"times curl result:"$result
done
if [$result -eq 200]; then
exit 0
else
if [! -d "$fail_dir"]; then
mkdir -m 775 -p $fail_dir
fi
cd $base_dir
time=$(date "+%Y%m%d%H%M%S")
for var in ${service_name}*.jar;
do
mv "$var" "fail-backup/${var%.jar}_$time.jar";
done
cd $base_dir/backup
filename=`ls -t |head -n1|awk '{print $0}'`
echo "失败回滚文件:$filename"
mv "$filename" `echo "../$filename" |awk -F '_' '{print $1".jar"}'`
cd ../..
sh run-service.sh $service_name $env # 重新启动
exit 1
fi
- 执行命令为
sh check-rollback-service.sh 项目名 spring 环境 项目节点检查地址
,例如:sh check-rollback-service.sh mytest dev http://localhost:6101/base/monitor
。 - 脚本中的
/data/app/tscm/
目录改为自己服务器中实际的目录。 -
run-service.sh
为项目重启
脚本,做两件事:- 根据项目名查找 jvm 进程,并
kill
。 - 根据项目名查找
jar
文件并启动。
- 根据项目名查找 jvm 进程,并
- 函数成功返回 0,失败返回 1。
Feign
前面的处理方案是刷新网关的的缓存,但是项目组内有部分项目用了feign
,导致方案对这些项目无效。
方案
feign
跟 gateway
一样,也是经过 robbin
和eureka-client
两层处理,但是项目重启时,不可能像通知网关那样 通过接口同步
通知所有节点刷新本地缓存。
可以用消息通知的方式通知其他项目,但是这样有两个问题:
- 其他项目还得引入
MQ
/Redis
等可以接收通知的中间件。 - 如果用消息通知的方式,相当于是异步通知,就无法知道其他项目是否已经完成刷新缓存,不过这个也可以通过增加
睡眠等待
,来尽量保证其他项目已完成刷新。
鉴于通知的方式比较复杂,那是不是可以让 feign
走网关请求,而不是直接点对点请求。
通过 API 可以看到 FeignClient
配置里有 url
和path
两个参数,当指定 url
参数时,feign
就不会再走 robbin
获取服务节点,而是直接发起请求,所以只需要做很小的调整。
调整前:
@FeignClient(value = "tscm-service-purchase-facader")
调整后:
@FeignClient(value = "tscm-service-purchase-facader", url = "${gateway.host}", path="/tscm-service-purchase-facader")
或
@FeignClient(value = "tscm-service-purchase-facader", url = "${gateway.host}/tscm-service-purchase-facader")