上次写了在 gateway 上应用 ribbon 的实现形式。因为思考到 ribbon 是阻塞版本,不适宜在 gateway 上应用,所以切换到了 spring cloud loadbanlancer 并且配合应用 CircuitBreaker 断路器
0. 参考
文章 1 Spring Tips: Spring Cloud Loadbalancer
文章 2 Spring Cloud LoadBalancer 官网文档
文章 3 Spring Cloud Commons 之 loadbalancer 源码笔记
版本:
spring-cloud.version Hoxton.SR7
spring-boot-starter-parent 2.3.2.RELEASE
1. 思路
仍旧是基于 Weight Route Predicate Factory 的革新。想法是通过在 config 上配置版本号和权重比例,达到指定版本分流的目标。
例如,配置文件像上面这样配置
- id: temp_old
uri: lb://TEMPLATE
predicates:
- Path=/temp/**
- VersionWeight=group1, 99, v1 #权重比例,与版本号
filters:
- StripPrefix=1
- id: temp_new
uri: lb://TEMPLATE
predicates:
- Path=/temp/**
- VersionWeight=group1, 1, v2
filters:
- StripPrefix=1
99% 的流量将进入老版本 v1,只有 1% 的流量进入到新版 v2。
gateway 是用的 webflux 的,而我的服务是一般 servlet 的,所以在服务端的负载平衡策略还是应用 ribbion 的做法
1.1 调用链路
申请链路
客户端
-> Weight Route Predicate(权重断言)
-> ReactiveLoadBalancerClientFilter(如果在 uri 上配置了“lb”则会进入响应式负载平衡)
-> chooes(ServerWebExchange exchange)(该办法从 LoadBalancerClientFactory 中获取 ReactorLoadBalancer<ServiceInstance> 就是负载平衡策略 bean)
->loadBalancer.choose(createRequest())(负载平衡算法 抉择出服务实例)
-> 申请到具体服务
实现服务实例抉择
1.2 自定义负载平衡策略
官网文档中提到自定义负载平衡策略能够应用自定义的负载平衡配置文件
public class CustomLoadBalancerConfiguration {
@Bean
ReactorLoadBalancer<ServiceInstance> randomLoadBalancer(Environment environment,
LoadBalancerClientFactory loadBalancerClientFactory) {String name = environment.getProperty(LoadBalancerClientFactory.PROPERTY_NAME);
return new RandomLoadBalancer(loadBalancerClientFactory
.getLazyProvider(name, ServiceInstanceListSupplier.class),
name);
}
}
定义好了之后通过 @LoadBalancerClient 来指定每一个 Service(代码里的 ’stores’ 为 serviceId)对应的自定义配置
@Configuration
@LoadBalancerClient(value = "stores", configuration = CustomLoadBalancerConfiguration.class)
public class MyConfiguration {}
2. 革新点
- 写一个 grayContext, 基于 threadlocal 解决 version 的上下文传递。
-
自定义一个 VersionLoadBalancer 继承 ReactorServiceInstanceLoadBalancer,实现外部的 choose 办法,(能够参考 RoundRobinLoadBalancer 的实现)。
public class VersionLoadBalancer implements ReactorServiceInstanceLoadBalancer{ ... // 省略掉其余的代码 @Override public Mono<Response<ServiceInstance>> choose(Request request) { ServiceInstanceListSupplier supplier = serviceInstanceListSupplierProvider .getIfAvailable(NoopServiceInstanceListSupplier::new); return supplier.get().next().map(this::getInstanceResponse); } private Response<ServiceInstance> getInstanceResponse(List<ServiceInstance> instances) {if (instances.isEmpty()) {log.warn("No servers available for service:" + this.serviceId); return new EmptyResponse();} // GrayUtil 自定义工具类从以后线程中取出传递的 version String version = GrayUtil.getCurrentConextStrValue("version"); if(StringUtil.isNotEmpty(version)){ // 如果没有配置元数据 version 的状况,就随机取一个 参考 RoundRobinLoadBalancer if(!instances.stream().allMatch((item->item.getMetadata() !=null && item.getMetadata().get("version") != null))){int pos = Math.abs(this.position.incrementAndGet()); ServiceInstance instance = instances.get(pos % instances.size()); return new DefaultResponse(instance); } // 凡是有配置 version 的状况,抉择 version 雷同的服务 Optional<ServiceInstance> optional = instances.stream().filter(item -> {if(item.getMetadata() != null && item.getMetadata().get("version") != null){return version.equals(item.getMetadata().get("version")); } return false; }).findFirst(); if(optional.isPresent()){return new DefaultResponse(optional.get()); } } log.warn(String.format("No servers available for service %s, version: %s", this.serviceId, version)); return new EmptyResponse();} }
-
自定义 LoadBalancerConfiguration
public class VersionLoadBalancerConfiguration { @Bean ReactorLoadBalancer<ServiceInstance> versionLoadBalancer(Environment environment, LoadBalancerClientFactory loadBalancerClientFactory) {String name = environment.getProperty(LoadBalancerClientFactory.PROPERTY_NAME); return new VersionLoadBalancer(loadBalancerClientFactory .getLazyProvider(name, ServiceInstanceListSupplier.class), name); } @Bean public ServiceInstanceListSupplier versionClientServiceInstanceListSupplier(ConfigurableApplicationContext context) {ServiceInstanceListSupplier serviceInstanceListSupplier = ServiceInstanceListSupplier.builder() .withDiscoveryClient() .withZonePreference() //.withCaching() // 测试环境先不缓存 .build(context); return serviceInstanceListSupplier; } }
-
自定义 @EnableGrayLoadBalancerClient 注解,参照 @LoadBalancerClient 实现主动给每个服务绑定本人的负载规定。
@Configuration(proxyBeanMethods = false) @Import(GrayLoadBalancerClientConfigurationRegistrar.class) @Target(ElementType.TYPE) @Retention(RetentionPolicy.RUNTIME) @Documented public @interface EnableGrayLoadBalancerClient { /** * 配置文件 * @return */ Class<?> configuration() ;}
public class GrayLoadBalancerClientConfigurationRegistrar implements ImportBeanDefinitionRegistrar , ResourceLoaderAware { private ApplicationContext context; private Binder binder; @Override public void registerBeanDefinitions(AnnotationMetadata metadata, BeanDefinitionRegistry registry, BeanNameGenerator importBeanNameGenerator) { Map<String, Object> attrs = metadata .getAnnotationAttributes(EnableGrayLoadBalancerClient.class.getName()); List<String> appNames = getServiceIdfromRoute(); appNames.forEach(name->{if (attrs != null && attrs.containsKey("configuration")) {registerClientConfiguration(registry,name, attrs.get("configuration")); } }); } private static void registerClientConfiguration(BeanDefinitionRegistry registry, Object name, Object configuration) { BeanDefinitionBuilder builder = BeanDefinitionBuilder .genericBeanDefinition(LoadBalancerClientSpecification.class); builder.addConstructorArgValue(name); builder.addConstructorArgValue(configuration); registry.registerBeanDefinition(name + ".VersionLoadBalancerClientSpecification", builder.getBeanDefinition()); } @Override public void setResourceLoader(ResourceLoader resourceLoader) {this.context = (ApplicationContext) resourceLoader; this.binder = Binder.get(this.context.getEnvironment()); } // 应用了 VersionWeight 断言的才被退出到负载中,并不是全副退出,这里能够依据具体情况断定。private List<String> getServiceIdfromRoute(){Map<String, Object> routes = binder.bind("spring.cloud.gateway.routes" , Bindable.mapOf(String.class, Object.class)).get(); return routes.values().stream().filter(item->{LinkedHashMap<String, List> map = (LinkedHashMap<String, List>) item; LinkedHashMap<String, Object> predicates =(LinkedHashMap<String, Object>) map.get("predicates"); return predicates.values().stream().anyMatch(lm ->{if(lm instanceof LinkedHashMap){return ((LinkedHashMap)lm).values().stream().anyMatch(s->{if(s instanceof String){return ((String)s).contains("VersionWeight"); } return false; }); }else if(lm instanceof String){return ((String)lm).contains("VersionWeight"); } return false; }); }).map(item->((LinkedHashMap)item).get("uri")).map(item->{String strItem = (String)item; return strItem.substring(5); }).distinct().collect(Collectors.toList()); }
@LoadBalancerClient 是通过 LoadBalancerClientSpecification 来实现子容器隔离,和 fegin 的实现形式统一。bean 会退出到 LoadBalancerClientFactory 中,而且容器实例化是在 clientFactory.getInstance(name)时才会进行实例化。getInstance 这个办法是在 ReactiveLoadBalancerClientFilter 中被调用的,言下之意就是当被第一次调用时才会实现容器实例化并加载 LoadBalancer。
-
监听路由刷新事件(RefreshScopeRefreshedEvent)思考到咱们的灰度公布是在路由中配置的,当路由扭转时,比如减少服务器路由,就须要重写新增服务的负载规定。
public class RefreshGrayRouteListener implements ApplicationListener<ApplicationEvent>, BeanFactoryAware, BeanDefinitionRegistryPostProcessor { private BeanFactory beanFactory; private BeanDefinitionRegistry beanDefinitionRegistry; // 注册 LoadBalancerClientSpecification private static String registerClientConfiguration(BeanDefinitionRegistry registry, Object name) { String beanName = name + ".VersionLoadBalancerClientSpecification"; BeanDefinitionBuilder builder = BeanDefinitionBuilder .genericBeanDefinition(LoadBalancerClientSpecification.class); builder.addConstructorArgValue(name); builder.addConstructorArgValue(new Class[]{VersionLoadBalancerConfiguration.class}); registry.registerBeanDefinition(beanName, builder.getBeanDefinition()); return beanName; } @Override public void onApplicationEvent(ApplicationEvent event) {if (event instanceof RefreshScopeRefreshedEvent) {refreshLoadBalancerBean(); } } @Override public void setBeanFactory(BeanFactory beanFactory) throws BeansException {this.beanFactory = beanFactory;} private void refreshLoadBalancerBean() {LoadBalancerClientFactory clientFactory = beanFactory.getBean(LoadBalancerClientFactory.class); //TODO: 理论是无奈删除的 clientFactory.destroy(); // 找出配置的负载平衡的服务 GatewayProperties properties = beanFactory.getBean(GatewayProperties.class); List<String> ServiceIds = properties.getRoutes().stream().filter(routeDefinition -> {return routeDefinition.getPredicates().stream().anyMatch(predicateDefinition -> "VersionWeight".equals(predicateDefinition.getName())); }).map(routeDefinition -> routeDefinition.getUri().getHost()).distinct().collect(Collectors.toList()); // 卸载存在的 loadBalancerClientSpecification Map<String, LoadBalancerClientSpecification> loadBalancerClientSpecificationMap = ((DefaultListableBeanFactory) beanFactory).getBeansOfType(LoadBalancerClientSpecification.class); loadBalancerClientSpecificationMap.forEach((k, v) -> {if (!k.startsWith("default")) {((DefaultListableBeanFactory) beanFactory).removeBeanDefinition(k); } }); // 从新注册 LoadBalancerClientSpecification ServiceIds.forEach(name -> registerClientConfiguration(beanDefinitionRegistry,name)); loadBalancerClientSpecificationMap = ((DefaultListableBeanFactory) beanFactory).getBeansOfType(LoadBalancerClientSpecification.class); List<LoadBalancerClientSpecification> list = new ArrayList<>(loadBalancerClientSpecificationMap.size()); loadBalancerClientSpecificationMap.forEach((k, v) -> {if (!k.startsWith("default")) {list.add(v); } }); clientFactory.setConfigurations(list); ServiceIds.forEach(name->clientFactory.getInstance(name)); } @Override public void postProcessBeanDefinitionRegistry(BeanDefinitionRegistry beanDefinitionRegistry) throws BeansException {this.beanDefinitionRegistry = beanDefinitionRegistry;} @Override public void postProcessBeanFactory(ConfigurableListableBeanFactory configurableListableBeanFactory) throws BeansException {}}
-
自定义 GrayFilter,继承 OncePerRequestFilter 在 head 中获取 version 放入到 threadlocal 中。
@Override protected void doFilterInternal(HttpServletRequest request, HttpServletResponse response, FilterChain filterChain) throws ServletException, IOException {String version = request.getHeader("version"); if(StringUtil.isNotEmpty(version)){GrayUtil.setCurrentConextStrValue("version", version); } filterChain.doFilter(request, response); return; }
7.
3. 待解决问题
- 调用 /refresh 的时候,须要刷新两次能力初期化新退出服务的负载规定,第一次尽管刷新胜利,然而没有初期化 bean。
- 当某个服务不须要做负载平衡,也就是去掉 VersionWeight 断言时,还是会进入 versionWeight 的负载办法(我也不晓得怎么办比拟好,索性不去了,将权重均分 - -)