关于spring-cloud:spring-cloud-gatewayspring-cloud-loadbalancer-组合指定版本权重分流

上次写了在gateway上应用ribbon的实现形式。因为思考到ribbon是阻塞版本,不适宜在gateway上应用,所以切换到了spring cloud loadbanlancer 并且配合应用 CircuitBreaker断路器

0. 参考

文章 1 Spring Tips: Spring Cloud Loadbalancer
文章 2 Spring Cloud LoadBalancer 官网文档
文章 3 Spring Cloud Commons 之 loadbalancer 源码笔记
版本:
spring-cloud.version Hoxton.SR7
spring-boot-starter-parent 2.3.2.RELEASE

1. 思路

仍旧是基于 Weight Route Predicate Factory 的革新。想法是通过在config上配置版本号和权重比例,达到指定版本分流的目标。
例如,配置文件像上面这样配置

    - id: temp_old
      uri: lb://TEMPLATE
      predicates:
        - Path=/temp/**
        - VersionWeight=group1, 99, v1  #权重比例,与版本号
      filters:
        - StripPrefix=1
    - id: temp_new
      uri: lb://TEMPLATE
      predicates:
        - Path=/temp/**
        - VersionWeight=group1, 1, v2
      filters:
        - StripPrefix=1

99%的流量将进入老版本v1,只有1%的流量进入到新版v2。
gateway是用的webflux的,而我的服务是一般servlet的,所以在服务端的负载平衡策略还是应用ribbion 的做法

1.1 调用链路

申请链路
客户端
-> Weight Route Predicate(权重断言)
-> ReactiveLoadBalancerClientFilter (如果在uri上配置了“lb”则会进入响应式负载平衡)
-> chooes(ServerWebExchange exchange) (该办法从LoadBalancerClientFactory 中获取ReactorLoadBalancer<ServiceInstance> 就是负载平衡策略bean)
->loadBalancer.choose(createRequest()) (负载平衡算法 抉择出服务实例)
->申请到具体服务

实现服务实例抉择

1.2 自定义负载平衡策略

官网文档中提到自定义负载平衡策略能够应用自定义的负载平衡配置文件

public class CustomLoadBalancerConfiguration {

    @Bean
    ReactorLoadBalancer<ServiceInstance> randomLoadBalancer(Environment environment,
            LoadBalancerClientFactory loadBalancerClientFactory) {
        String name = environment.getProperty(LoadBalancerClientFactory.PROPERTY_NAME);
        return new RandomLoadBalancer(loadBalancerClientFactory
                .getLazyProvider(name, ServiceInstanceListSupplier.class),
                name);
    }
}

定义好了之后通过@LoadBalancerClient来指定每一个Service(代码里的’stores’ 为serviceId)对应的自定义配置

@Configuration
@LoadBalancerClient(value = "stores", configuration = CustomLoadBalancerConfiguration.class)
public class MyConfiguration {
}

2. 革新点

  1. 写一个grayContext, 基于threadlocal解决version的上下文传递。
  2. 自定义一个 VersionLoadBalancer 继承 ReactorServiceInstanceLoadBalancer,实现外部的choose 办法,(能够参考 RoundRobinLoadBalancer 的实现)。

    public class VersionLoadBalancer implements ReactorServiceInstanceLoadBalancer{
      ... //省略掉其余的代码
      @Override
     public Mono<Response<ServiceInstance>> choose(Request request) {
         ServiceInstanceListSupplier supplier = serviceInstanceListSupplierProvider
                 .getIfAvailable(NoopServiceInstanceListSupplier::new);
         return supplier.get().next().map(this::getInstanceResponse);
     }
     private Response<ServiceInstance> getInstanceResponse(
             List<ServiceInstance> instances) {
         if (instances.isEmpty()) {
             log.warn("No servers available for service: " + this.serviceId);
             return new EmptyResponse();
         }
         // GrayUtil 自定义工具类从以后线程中取出传递的version
         String version = GrayUtil.getCurrentConextStrValue("version");
         if(StringUtil.isNotEmpty(version)){
             //如果没有配置元数据version的状况,就随机取一个 参考RoundRobinLoadBalancer
             if(!instances.stream().allMatch((item->item.getMetadata() !=null && item.getMetadata().get("version") != null))){
                 int pos = Math.abs(this.position.incrementAndGet());
                 ServiceInstance instance = instances.get(pos % instances.size());
                 return new DefaultResponse(instance);
             }
    
             //凡是有配置version的状况,抉择version雷同的服务
             Optional<ServiceInstance> optional = instances.stream().filter(item -> {
                 if(item.getMetadata() != null && item.getMetadata().get("version") != null){
                     return version.equals(item.getMetadata().get("version"));
                 }
                return false;
             }).findFirst();
             if(optional.isPresent()){
                 return new DefaultResponse(optional.get());
             }
         }
         log.warn(String.format("No servers available for service %s, version: %s", this.serviceId, version));
         return new EmptyResponse();
     }
    }
  3. 自定义LoadBalancerConfiguration

    public class VersionLoadBalancerConfiguration {
    
    
     @Bean
     ReactorLoadBalancer<ServiceInstance> versionLoadBalancer(Environment environment,
                                                             LoadBalancerClientFactory loadBalancerClientFactory) {
         String name = environment.getProperty(LoadBalancerClientFactory.PROPERTY_NAME);
         return new VersionLoadBalancer(loadBalancerClientFactory
                 .getLazyProvider(name, ServiceInstanceListSupplier.class),
                 name);
     }
    
     @Bean
     public ServiceInstanceListSupplier versionClientServiceInstanceListSupplier(
             ConfigurableApplicationContext context) {
         ServiceInstanceListSupplier serviceInstanceListSupplier =  ServiceInstanceListSupplier.builder()
                 .withDiscoveryClient()
                 .withZonePreference()
                 //.withCaching() //测试环境先不缓存
                 .build(context);
         return serviceInstanceListSupplier;
     }
    }
  4. 自定义@EnableGrayLoadBalancerClient注解,参照@LoadBalancerClient 实现主动给每个服务绑定本人的负载规定。

    @Configuration(proxyBeanMethods = false)
    @Import(GrayLoadBalancerClientConfigurationRegistrar.class)
    @Target(ElementType.TYPE)
    @Retention(RetentionPolicy.RUNTIME)
    @Documented
    public @interface EnableGrayLoadBalancerClient {
    
     /**
      * 配置文件
      * @return
      */
     Class<?> configuration() ;
    }
    public class GrayLoadBalancerClientConfigurationRegistrar implements ImportBeanDefinitionRegistrar , ResourceLoaderAware {
    
    
     private ApplicationContext context;
    
     private Binder binder;
    
    
     @Override
     public void registerBeanDefinitions(AnnotationMetadata metadata, BeanDefinitionRegistry registry,
                                         BeanNameGenerator importBeanNameGenerator) {
         Map<String, Object> attrs = metadata
                 .getAnnotationAttributes(EnableGrayLoadBalancerClient.class.getName());
         List<String> appNames = getServiceIdfromRoute();
         appNames.forEach(name->{
             if (attrs != null && attrs.containsKey("configuration")) {
                 registerClientConfiguration(registry,name, attrs.get("configuration"));
             }
         });
     }
    
     private static void registerClientConfiguration(BeanDefinitionRegistry registry,
                                                     Object name, Object configuration) {
         BeanDefinitionBuilder builder = BeanDefinitionBuilder
                 .genericBeanDefinition(LoadBalancerClientSpecification.class);
         builder.addConstructorArgValue(name);
         builder.addConstructorArgValue(configuration);
         registry.registerBeanDefinition(name + ".VersionLoadBalancerClientSpecification",
                 builder.getBeanDefinition());
     }
    
    
     @Override
     public void setResourceLoader(ResourceLoader resourceLoader) {
         this.context = (ApplicationContext) resourceLoader;
         this.binder = Binder.get(this.context.getEnvironment());
     }
     //应用了 VersionWeight 断言的才被退出到负载中,并不是全副退出,这里能够依据具体情况断定。
     private List<String> getServiceIdfromRoute(){
         Map<String, Object> routes = binder.bind("spring.cloud.gateway.routes" , Bindable.mapOf(String.class, Object.class)).get();
         return routes.values().stream().filter(item->{
             LinkedHashMap<String, List> map = (LinkedHashMap<String, List>) item;
             LinkedHashMap<String, Object> predicates =(LinkedHashMap<String, Object>) map.get("predicates");
             return predicates.values().stream().anyMatch(lm ->{
                 if(lm instanceof LinkedHashMap){
                     return ((LinkedHashMap)lm).values().stream().anyMatch(s->{
                         if(s instanceof String){
                             return ((String)s).contains("VersionWeight");
                         }
                         return false;
                     });
                 }else if(lm instanceof String){
                   return ((String)lm).contains("VersionWeight");
                 }
                 return false;
             });
         }).map(item->((LinkedHashMap)item).get("uri")).map(item->{
             String strItem = (String)item;
             return strItem.substring(5);
         }).distinct().collect(Collectors.toList());
     }

    @LoadBalancerClient 是通过 LoadBalancerClientSpecification 来实现子容器隔离,和fegin 的实现形式统一。bean会退出到LoadBalancerClientFactory中,而且容器实例化是在clientFactory.getInstance(name)时才会进行实例化。getInstance这个办法是在ReactiveLoadBalancerClientFilter 中被调用的,言下之意就是当被第一次调用时才会实现容器实例化并加载LoadBalancer。

  5. 监听路由刷新事件(RefreshScopeRefreshedEvent)思考到咱们的灰度公布是在路由中配置的,当路由扭转时,比如减少服务器路由,就须要重写新增服务的负载规定。

    public class RefreshGrayRouteListener implements ApplicationListener<ApplicationEvent>, BeanFactoryAware,
         BeanDefinitionRegistryPostProcessor {
    
     private BeanFactory beanFactory;
    
     private BeanDefinitionRegistry beanDefinitionRegistry;
    
     //注册 LoadBalancerClientSpecification
     private static String registerClientConfiguration(BeanDefinitionRegistry registry,
                                                       Object name) {
         String beanName = name + ".VersionLoadBalancerClientSpecification";
         BeanDefinitionBuilder builder = BeanDefinitionBuilder
                 .genericBeanDefinition(LoadBalancerClientSpecification.class);
         builder.addConstructorArgValue(name);
         builder.addConstructorArgValue(new Class[]{VersionLoadBalancerConfiguration.class});
         registry.registerBeanDefinition(beanName,
                 builder.getBeanDefinition());
         return beanName;
     }
    
     @Override
     public void onApplicationEvent(ApplicationEvent event) {
         if (event instanceof RefreshScopeRefreshedEvent) {
             refreshLoadBalancerBean();
         }
     }
    
     @Override
     public void setBeanFactory(BeanFactory beanFactory) throws BeansException {
         this.beanFactory = beanFactory;
     }
    
     private void refreshLoadBalancerBean() {
    
    
         LoadBalancerClientFactory clientFactory = beanFactory.getBean(LoadBalancerClientFactory.class);
         //TODO: 理论是无奈删除的
         clientFactory.destroy();
    
         //找出配置的负载平衡的服务
         GatewayProperties properties = beanFactory.getBean(GatewayProperties.class);
         List<String> ServiceIds = properties.getRoutes().stream().filter(routeDefinition -> {
             return routeDefinition.getPredicates().stream().anyMatch(predicateDefinition -> "VersionWeight".equals(predicateDefinition.getName()));
         }).map(routeDefinition -> routeDefinition.getUri().getHost()).distinct().collect(Collectors.toList());
    
         //卸载存在的loadBalancerClientSpecification
         Map<String, LoadBalancerClientSpecification> loadBalancerClientSpecificationMap =
                 ((DefaultListableBeanFactory) beanFactory).getBeansOfType(LoadBalancerClientSpecification.class);
         loadBalancerClientSpecificationMap.forEach((k, v) -> {
             if (!k.startsWith("default")) {
                 ((DefaultListableBeanFactory) beanFactory).removeBeanDefinition(k);
             }
         });
         //从新注册 LoadBalancerClientSpecification
         ServiceIds.forEach(name -> registerClientConfiguration(beanDefinitionRegistry,name));
         loadBalancerClientSpecificationMap =
                 ((DefaultListableBeanFactory) beanFactory).getBeansOfType(LoadBalancerClientSpecification.class);
         List<LoadBalancerClientSpecification> list = new ArrayList<>(loadBalancerClientSpecificationMap.size());
         loadBalancerClientSpecificationMap.forEach((k, v) -> {
             if (!k.startsWith("default")) {
                 list.add(v);
             }
         });
         clientFactory.setConfigurations(list);
         ServiceIds.forEach(name->clientFactory.getInstance(name));
     }
    
     @Override
     public void postProcessBeanDefinitionRegistry(BeanDefinitionRegistry beanDefinitionRegistry) throws BeansException {
         this.beanDefinitionRegistry = beanDefinitionRegistry;
     }
    
     @Override
     public void postProcessBeanFactory(ConfigurableListableBeanFactory configurableListableBeanFactory) throws BeansException {
    
     }
    
    }
  6. 自定义GrayFilter,继承OncePerRequestFilter 在head中获取version 放入到threadlocal中。

     @Override
     protected void doFilterInternal(HttpServletRequest request, HttpServletResponse response, FilterChain filterChain) throws ServletException, IOException {
         String version = request.getHeader("version");
         if(StringUtil.isNotEmpty(version)){
             GrayUtil.setCurrentConextStrValue("version", version);
         }
         filterChain.doFilter(request, response);
         return;
     }

    7.

3.待解决问题

  • 调用/refresh的时候,须要刷新两次能力初期化新退出服务的负载规定,第一次尽管刷新胜利,然而没有初期化bean。
  • 当某个服务不须要做负载平衡,也就是去掉VersionWeight 断言时,还是会进入versionWeight的负载办法(我也不晓得怎么办比拟好,索性不去了,将权重均分- -)

评论

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注

这个站点使用 Akismet 来减少垃圾评论。了解你的评论数据如何被处理