乐趣区

关于java:深度解析Spring-Cloud-Ribbon的实现源码及原理

Ribbon 的核心作用就是进行申请的负载平衡,它的基本原理如下图所示。就是客户端集成 Ribbon 这个组件,Ribbon 中会针对曾经配置的服务提供者地址列表进行负载平衡的计算,失去一个指标地址之后,再发动申请。

那么接下来,咱们从两个层面去剖析 Ribbon 的原理

  1. @LoadBalanced 注解如何让一般的 RestTemplate 具备负载平衡的能力
  2. OpenFeign 集成 Ribbon 的实现原理

@LoadBalancer 注解解析过程剖析

在应用 RestTemplate 的时候,咱们加了一个 @LoadBalance 注解,就能让这个 RestTemplate 在申请时,就领有客户端负载平衡的能力。

@Bean
@LoadBalanced
RestTemplate restTemplate() {return new RestTemplate();
}

而后,咱们关上 @LoadBalanced 这个注解,能够发现该注解仅仅是申明了一个 @qualifier 注解。

@Target({ElementType.FIELD, ElementType.PARAMETER, ElementType.METHOD})
@Retention(RetentionPolicy.RUNTIME)
@Documented
@Inherited
@Qualifier
public @interface LoadBalanced {}

@qualifier 注解的作用

咱们平时在应用注解去注入一个 Bean 时,都是采纳 @Autowired。并且大家应该晓得 @Autowired 是能够注入一个 List 或者 Map 的。给大家举个例子(在一个 springboot 利用中)

定义一个 TestClass

@AllArgsConstructor
@Data
public class TestClass {private String name;}

申明一个配置类,并注入 TestClass

@Configuration
public class TestConfig {@Bean("testClass1")
    TestClass testClass(){return new TestClass("testClass1");
    }

    @Bean("testClass2")
    TestClass testClass2(){return new TestClass("testClass2");
    }
}

定义一个 Controller,用于测试,留神,此时咱们应用的是 @Autowired 来注入一个 List 汇合

@RestController
public class TestController {@Autowired(required = false)
    List<TestClass> testClasses= Collections.emptyList();

    @GetMapping("/test")
    public Object test(){return testClasses;}
}

此时拜访:http://localhost:8080/test , 失去的后果是

[
    {name: "testClass1"},
    {name: "testClass2"}
]

批改 TestConfigTestController

@Configuration
public class TestConfig {@Bean("testClass1")
    @Qualifier
    TestClass testClass(){return new TestClass("testClass1");
    }

    @Bean("testClass2")
    TestClass testClass2(){return new TestClass("testClass2");
    }
}
@RestController
public class TestController {@Autowired(required = false)
    @Qualifier
    List<TestClass> testClasses= Collections.emptyList();

    @GetMapping("/test")
    public Object test(){return testClasses;}
}

再次拜访:http://localhost:8080/test , 失去的后果是

[
    {name: "testClass1"}
]

@LoadBalancer 注解筛选及拦挡

理解了 @qualifier 注解的作用后,再回到 @LoadBalancer 注解上,就不难理解了。

因为咱们须要扫描到减少了 @LoadBalancer 注解的 RestTemplate 实例,所以,@LoadBalancer能够实现这个动作,它的具体的实现代码如下:

@Configuration(proxyBeanMethods = false)
@ConditionalOnClass(RestTemplate.class)
@ConditionalOnBean(LoadBalancerClient.class)
@EnableConfigurationProperties(LoadBalancerProperties.class)
public class LoadBalancerAutoConfiguration {

   @LoadBalanced
   @Autowired(required = false)
   private List<RestTemplate> restTemplates = Collections.emptyList();}

从这个代码中能够看出,在 LoadBalancerAutoConfiguration 这个配置类中,会应用同样的形式,把配置了 @LoadBalanced 注解的 RestTemplate 注入到 restTemplates 汇合中。

拿到了 RestTemplate 之后,在 LoadBalancerInterceptorConfig 配置类中,会针对这些 RestTemplate 进行拦挡,实现代码如下:

@Configuration(proxyBeanMethods = false)
@ConditionalOnClass(RestTemplate.class)
@ConditionalOnBean(LoadBalancerClient.class)
@EnableConfigurationProperties(LoadBalancerProperties.class)
public class LoadBalancerAutoConfiguration {

    @LoadBalanced
    @Autowired(required = false)
    private List<RestTemplate> restTemplates = Collections.emptyList();

    // 省略....

    @Bean
    @ConditionalOnMissingBean
    public LoadBalancerRequestFactory loadBalancerRequestFactory(LoadBalancerClient loadBalancerClient) {return new LoadBalancerRequestFactory(loadBalancerClient, this.transformers);
    }

    @Configuration(proxyBeanMethods = false)
    @Conditional(RetryMissingOrDisabledCondition.class)
    static class LoadBalancerInterceptorConfig {
        
        // 装载一个 LoadBalancerInterceptor 的实例到 IOC 容器。@Bean
        public LoadBalancerInterceptor loadBalancerInterceptor(LoadBalancerClient loadBalancerClient,
                LoadBalancerRequestFactory requestFactory) {return new LoadBalancerInterceptor(loadBalancerClient, requestFactory);
        }
        
        // 会遍历所有加了 @LoadBalanced 注解的 RestTemplate,在原有的拦截器之上,再减少了一个 LoadBalancerInterceptor
        @Bean
        @ConditionalOnMissingBean
        public RestTemplateCustomizer restTemplateCustomizer(final LoadBalancerInterceptor loadBalancerInterceptor) {
            return restTemplate -> {List<ClientHttpRequestInterceptor> list = new ArrayList<>(restTemplate.getInterceptors());
                list.add(loadBalancerInterceptor);
                restTemplate.setInterceptors(list);
            };
        }

    }
    // 省略....
}

LoadBalancerInterceptor

@Override
public ClientHttpResponse intercept(final HttpRequest request, final byte[] body,
      final ClientHttpRequestExecution execution) throws IOException {final URI originalUri = request.getURI();
   String serviceName = originalUri.getHost();
   Assert.state(serviceName != null, "Request URI does not contain a valid hostname:" + originalUri);
   return this.loadBalancer.execute(serviceName, this.requestFactory.createRequest(request, body, execution));
}

RestTemplate 调用过程

咱们在程序中,应用上面的代码发动近程申请时

restTemplate.getForObject(url,String.class);

它的整个调用过程如下。

RestTemplate.getForObject

​ —–> AbstractClientHttpRequest.execute()

​ —–>AbstractBufferingClientHttpRequest.executeInternal()

​ —–> InterceptingClientHttpRequest.executeInternal()

​ —–> InterceptingClientHttpRequest.execute()

InterceptingClientHttpRequest.execute()办法的代码如下。

@Override
public ClientHttpResponse execute(HttpRequest request, byte[] body) throws IOException {if (this.iterator.hasNext()) { // 遍历所有的拦截器,通过拦截器进行一一解决。ClientHttpRequestInterceptor nextInterceptor = this.iterator.next();
        return nextInterceptor.intercept(request, body, this);
    }
    else {HttpMethod method = request.getMethod();
        Assert.state(method != null, "No standard HTTP method");
        ClientHttpRequest delegate = requestFactory.createRequest(request.getURI(), method);
        request.getHeaders().forEach((key, value) -> delegate.getHeaders().addAll(key, value));
        if (body.length > 0) {if (delegate instanceof StreamingHttpOutputMessage) {StreamingHttpOutputMessage streamingOutputMessage = (StreamingHttpOutputMessage) delegate;
                streamingOutputMessage.setBody(outputStream -> StreamUtils.copy(body, outputStream));
            }
            else {StreamUtils.copy(body, delegate.getBody());
            }
        }
        return delegate.execute();}
}

LoadBalancerInterceptor

LoadBalancerInterceptor 是一个拦截器,当一个被 @Loadbalanced 注解润饰的 RestTemplate 对象发动 HTTP 申请时,会被 LoadBalancerInterceptorintercept办法拦挡,

在这个办法中间接通过 getHost 办法就能够获取到服务名(因为咱们在应用 RestTemplate 调用服务的时候,应用的是服务名而不是域名,所以这里能够通过 getHost 间接拿到服务名而后去调用 execute 办法发动申请)

@Override
public ClientHttpResponse intercept(final HttpRequest request, final byte[] body,
      final ClientHttpRequestExecution execution) throws IOException {final URI originalUri = request.getURI();
   String serviceName = originalUri.getHost();
   Assert.state(serviceName != null, "Request URI does not contain a valid hostname:" + originalUri);
   return this.loadBalancer.execute(serviceName, this.requestFactory.createRequest(request, body, execution));
}

LoadBalancerClient 其实是一个接口,咱们看一下它的类图,它有一个惟一的实现类:RibbonLoadBalancerClient

RibbonLoadBalancerClient.execute

RibbonLoadBalancerClient 这个类的代码比拟长,咱们次要看一下他的外围办法 execute

public <T> T execute(String serviceId, LoadBalancerRequest<T> request, Object hint)
    throws IOException {ILoadBalancer loadBalancer = getLoadBalancer(serviceId);
    Server server = getServer(loadBalancer, hint);
    if (server == null) {throw new IllegalStateException("No instances available for" + serviceId);
    }
    RibbonServer ribbonServer = new RibbonServer(serviceId, server,
                                                 isSecure(server, serviceId),
                                                 serverIntrospector(serviceId).getMetadata(server));

    return execute(serviceId, ribbonServer, request);
}

上述代码的实现逻辑如下:

  • 依据 serviceId 取得一个 ILoadBalancer,实例为:ZoneAwareLoadBalancer
  • 调用 getServer 办法去获取一个服务实例
  • 判断 Server 的值是否为空。这里的 Server 实际上就是传统的一个服务节点,这个对象存储了服务节点的一些元数据,比方 host、port 等

getServer

getServer 是用来取得一个具体的服务节点,它的实现如下

protected Server getServer(ILoadBalancer loadBalancer, Object hint) {if (loadBalancer == null) {return null;}
    // Use 'default' on a null hint, or just pass it on?
    return loadBalancer.chooseServer(hint != null ? hint : "default");
}

通过代码能够看到,getServer 理论调用了 IloadBalancer.chooseServer 这个办法,ILoadBalancer 这个是一个负载均衡器接口。

public interface ILoadBalancer {
    //addServers 示意向负载均衡器中保护的实例列表减少服务实例
    public void addServers(List<Server> newServers);
    //chooseServer 示意通过某种策略,从负载平衡服务器中挑选出一个具体的服务实例
    public Server chooseServer(Object key);
    //markServerDown 示意用来告诉和标识负载均衡器中某个具体实例曾经进行服务,否则负载均衡器在下一次获取服务实例清单前都会认为这个服务实例是失常工作的
    public void markServerDown(Server server);
    //getReachableServers 示意获取以后失常工作的服务实例列表
    public List<Server> getReachableServers();
    //getAllServers 示意获取所有的服务实例列表,包含失常的服务和进行工作的服务
    public List<Server> getAllServers();}

ILoadBalancer 的类关系图如下:

从整个类的关系图来看,BaseLoadBalancer 类实现了根底的负载平衡,而 DynamicServerListLoadBalancer 和 ZoneAwareLoadBalancer 则是在负载平衡策略的根底上做了一些性能扩大。

  • AbstractLoadBalancer 实现了 ILoadBalancer 接口,它定义了服务分组的枚举类 /chooseServer(用来选取一个服务实例)/getServerList(获取某一个分组中的所有服务实例)/getLoadBalancerStats 用来取得一个 LoadBalancerStats 对象,这个对象保留了每一个服务的状态信息。
  • BaseLoadBalancer,它实现了作为负载均衡器的基本功能,比方服务列表保护、服务存活状态监测、负载平衡算法抉择 Server 等。然而它只是实现基本功能,在有些简单场景中还无奈实现,比方动静服务列表、Server 过滤、Zone 区域意识(服务之间的调用心愿尽可能是在同一个区域内进行,缩小提早)。
  • DynamicServerListLoadBalancer 是 BaseLoadbalancer 的一个子类,它对根底负载平衡提供了扩大,从名字上能够看出,它提供了动静服务列表的个性
  • ZoneAwareLoadBalancer 它是在 DynamicServerListLoadBalancer 的根底上,减少了以 Zone 的模式来配置多个 LoadBalancer 的性能。

那在 getServer 办法中,loadBalancer.chooseServer具体的实现类是哪一个呢?咱们找到 RibbonClientConfiguration 这个类

@Bean
@ConditionalOnMissingBean
public ILoadBalancer ribbonLoadBalancer(IClientConfig config,
                                        ServerList<Server> serverList, ServerListFilter<Server> serverListFilter,
                                        IRule rule, IPing ping, ServerListUpdater serverListUpdater) {if (this.propertiesFactory.isSet(ILoadBalancer.class, name)) {return this.propertiesFactory.get(ILoadBalancer.class, config, name);
    }
    return new ZoneAwareLoadBalancer<>(config, rule, ping, serverList,
                                       serverListFilter, serverListUpdater);
}

从上述申明中,发现如果没有自定义 ILoadBalancer,则间接返回一个ZoneAwareLoadBalancer

ZoneAwareLoadBalancer

Zone 示意区域的意思,区域指的就是天文区域的概念,个别较大规模的互联网公司,都会做跨区域部署,这样做有几个益处,第一个是为不同地区的用户提供最近的拜访节点缩小拜访提早,其次是为了保障高可用,做容灾解决。

而 ZoneAwareLoadBalancer 就是提供了具备区域意识的负载均衡器,它的次要作用是对 Zone 进行了感知,保障每个 Zone 外面的负载平衡策略都是隔离的,它并不保障 A 区域过去的申请肯定会动员到 A 区域对应的 Server 内。真正实现这个需要的是ZonePreferenceServerListFilter/ZoneAffinityServerListFilter

ZoneAwareLoadBalancer 的外围性能是

  • 若开启了区域意识,且 zone 的个数 > 1,就持续区域抉择逻辑
  • 依据 ZoneAvoidanceRule.getAvailableZones()办法拿到可用区们(会 T 除掉齐全不可用的区域们,以及可用然而负载最高的一个区域)
  • 从可用区 zone 们中,通过 ZoneAvoidanceRule.randomChooseZone 随机选一个 zone 进去(该随机听从权重规定:谁的 zone 外面 Server 数量最多,被选中的概率越大)
  • 在选中的 zone 外面的所有 Server 中,采纳该 zone 对对应的 Rule,进行 choose
@Override
public Server chooseServer(Object key) {
    //ENABLED,示意是否用区域意识的 choose 抉择 Server,默认是 true,// 如果禁用了区域、或者只有一个 zone,就间接依照父类的逻辑来进行解决,父类默认采纳轮询算法
    if (!ENABLED.get() || getLoadBalancerStats().getAvailableZones().size() <= 1) {logger.debug("Zone aware logic disabled or there is only one zone");
        return super.chooseServer(key);
    }
    Server server = null;
    try {LoadBalancerStats lbStats = getLoadBalancerStats();
        Map<String, ZoneSnapshot> zoneSnapshot = ZoneAvoidanceRule.createSnapshot(lbStats);
        logger.debug("Zone snapshots: {}", zoneSnapshot);
        if (triggeringLoad == null) {triggeringLoad = DynamicPropertyFactory.getInstance().getDoubleProperty("ZoneAwareNIWSDiscoveryLoadBalancer." + this.getName() + ".triggeringLoadPerServerThreshold", 0.2d);
        }

        if (triggeringBlackoutPercentage == null) {triggeringBlackoutPercentage = DynamicPropertyFactory.getInstance().getDoubleProperty("ZoneAwareNIWSDiscoveryLoadBalancer." + this.getName() + ".avoidZoneWithBlackoutPercetage", 0.99999d);
        }
        // 依据相干阈值计算可用区域
        Set<String> availableZones = ZoneAvoidanceRule.getAvailableZones(zoneSnapshot, triggeringLoad.get(), triggeringBlackoutPercentage.get());
        logger.debug("Available zones: {}", availableZones);
        if (availableZones != null &&  availableZones.size() < zoneSnapshot.keySet().size()) {
            // 从可用区域中随机抉择一个区域,zone 外面的服务器节点越多,被选中的概率越大
            String zone = ZoneAvoidanceRule.randomChooseZone(zoneSnapshot, availableZones);
            logger.debug("Zone chosen: {}", zone);
            if (zone != null) {
                // 依据 zone 取得该 zone 中的 LB,而后依据该 Zone 的负载平衡算法抉择一个 server
                BaseLoadBalancer zoneLoadBalancer = getLoadBalancer(zone);
                server = zoneLoadBalancer.chooseServer(key);
            }
        }
    } catch (Exception e) {logger.error("Error choosing server using zone aware logic for load balancer={}", name, e);
    }
    if (server != null) {return server;} else {logger.debug("Zone avoidance logic is not invoked.");
        return super.chooseServer(key);
    }
}

BaseLoadBalancer.chooseServer

假如咱们当初没有应用多区域部署,那么负载策略会执行到BaseLoadBalancer.chooseServer

public Server chooseServer(Object key) {if (counter == null) {counter = createCounter();
    }
    counter.increment();
    if (rule == null) {return null;} else {
        try {return rule.choose(key);
        } catch (Exception e) {logger.warn("LoadBalancer [{}]:  Error choosing server for key {}", name, key, e);
            return null;
        }
    }
}

依据默认的负载平衡算法来取得指定的服务节点。默认的算法是 RoundBin。

rule.choose

rule 代表负载平衡算法规定,它有很多实现,IRule 的实现类关系图如下。

默认状况下,rule的实现为 ZoneAvoidanceRule,它是在RibbonClientConfiguration 这个配置类中定义的,代码如下:

@Configuration(proxyBeanMethods = false)
@EnableConfigurationProperties
// Order is important here, last should be the default, first should be optional
// see
// https://github.com/spring-cloud/spring-cloud-netflix/issues/2086#issuecomment-316281653
@Import({ HttpClientConfiguration.class, OkHttpRibbonConfiguration.class,
        RestClientRibbonConfiguration.class, HttpClientRibbonConfiguration.class })
public class RibbonClientConfiguration {
    @Bean
    @ConditionalOnMissingBean
    public IRule ribbonRule(IClientConfig config) {if (this.propertiesFactory.isSet(IRule.class, name)) {return this.propertiesFactory.get(IRule.class, config, name);
        }
        ZoneAvoidanceRule rule = new ZoneAvoidanceRule();
        rule.initWithNiwsConfig(config);
        return rule;
    }
}

所以,在 BaseLoadBalancer.chooseServer 中调用 rule.choose(key);,理论会进入到ZoneAvoidanceRulechoose办法

@Override
public Server choose(Object key) {ILoadBalancer lb = getLoadBalancer(); // 获取负载均衡器
    Optional<Server> server = getPredicate().chooseRoundRobinAfterFiltering(lb.getAllServers(), key); // 通过该办法获取指标服务
    if (server.isPresent()) {return server.get();
    } else {return null;}       
}

复合判断 server 所在区域的性能和 server 的可用性抉择 server

次要剖析 chooseRoundRobinAfterFiltering 办法。

chooseRoundRobinAfterFiltering

从办法名称能够看进去,它是通过对指标服务集群通过过滤算法过滤一遍后,再应用轮询实现负载平衡。

public Optional<Server> chooseRoundRobinAfterFiltering(List<Server> servers, Object loadBalancerKey) {List<Server> eligible = getEligibleServers(servers, loadBalancerKey);
    if (eligible.size() == 0) {return Optional.absent();
    }
    return Optional.of(eligible.get(incrementAndGetModulo(eligible.size())));
}

CompositePredicate.getEligibleServers

应用主过滤条件对所有实例过滤并返回过滤后的清单,

@Override
public List<Server> getEligibleServers(List<Server> servers, Object loadBalancerKey) {
    //
    List<Server> result = super.getEligibleServers(servers, loadBalancerKey);
    
    // 依照 fallbacks 中存储的过滤器程序进行过滤(此处就行先 ZoneAvoidancePredicate 而后 AvailabilityPredicate)Iterator<AbstractServerPredicate> i = fallbacks.iterator();
    while (!(result.size() >= minimalFilteredServers && result.size() > (int) (servers.size() * minimalFilteredPercentage))
           && i.hasNext()) {AbstractServerPredicate predicate = i.next();
        result = predicate.getEligibleServers(servers, loadBalancerKey);
    }
    return result;
}

顺次应用次过滤条件对主过滤条件的后果进行过滤 *

  • // 不论是主过滤条件还是次过滤条件,都须要判断上面两个条件
  • // 只有有一个条件合乎,就不再过滤,将以后后果返回供线性轮询

    • 第 1 个条件:过滤后的实例总数 >= 最小过滤实例数(默认为 1)
    • 第 2 个条件:过滤互的实例比例 > 最小过滤百分比(默认为 0)

getEligibleServers

这里的实现逻辑是,遍历所有服务器列表,调用 this.apply 办法进行验证,验证通过的节点,会退出到 results 这个列表中返回。

public List<Server> getEligibleServers(List<Server> servers, Object loadBalancerKey) {if (loadBalancerKey == null) {return ImmutableList.copyOf(Iterables.filter(servers, this.getServerOnlyPredicate()));            
    } else {List<Server> results = Lists.newArrayList();
        for (Server server: servers) {if (this.apply(new PredicateKey(loadBalancerKey, server))) {results.add(server);
            }
        }
        return results;            
    }
}

this.apply,会进入到 CompositePredicate.apply 办法中,代码如下。

//CompositePredicate.apply

@Override
public boolean apply(@Nullable PredicateKey input) {return delegate.apply(input);
}

delegate的实例是AbstractServerPredicate, 代码如下!

public static AbstractServerPredicate ofKeyPredicate(final Predicate<PredicateKey> p) {return new AbstractServerPredicate() {
        @Override
        @edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "NP")
            public boolean apply(PredicateKey input) {return p.apply(input);
        }            
    };        
}

也就是说,会通过 AbstractServerPredicate.apply 办法进行过滤,其中,input示意指标服务器集群的某一个具体节点。

其中 p,示意AndPredicate 实例,这里用到了组合 predicate 进行判断,而这里的组合判断是 and 的关系,用到了 AndPredicate 实现。

 private static class AndPredicate<T> implements Predicate<T>, Serializable {
        private final List<? extends Predicate<? super T>> components;
        private static final long serialVersionUID = 0L;

        private AndPredicate(List<? extends Predicate<? super T>> components) {this.components = components;}

        public boolean apply(@Nullable T t) {for(int i = 0; i < this.components.size(); ++i) { // 遍历多个 predicate, 逐个进行判断。if (!((Predicate)this.components.get(i)).apply(t)) {return false;}
            }

            return true;
        }
 }

上述代码中,components 是由两个 predicate 组合而成

  1. AvailabilityPredicate,过滤熔断状态下的服务以及并发连贯过多的服务。
  2. ZoneAvoidancePredicate,过滤掉无可用区域的节点。

所以在 AndPredicateapply办法中,须要遍历这两个 predicate 逐个进行判断。

AvailablilityPredicate

过滤熔断状态下的服务以及并发连贯过多的服务,代码如下:

@Override
public boolean apply(@Nullable PredicateKey input) {LoadBalancerStats stats = getLBStats();
    if (stats == null) {return true;}
    return !shouldSkipServer(stats.getSingleServerStat(input.getServer()));
}

判断是否要跳过这个指标节点,实现逻辑如下。

private boolean shouldSkipServer(ServerStats stats) {  
        //niws.loadbalancer.availabilityFilteringRule.filterCircuitTripped 是否为 true
    if ((CIRCUIT_BREAKER_FILTERING.get() && stats.isCircuitBreakerTripped()) // 该 Server 是否为断路状态
        || stats.getActiveRequestsCount() >= activeConnectionsLimit.get()) {// 本机发往这个 Server 未解决完的申请个数是否大于 Server 实例最大的沉闷连接数
        return true;
    }
    return false;
}

Server 是否为断路状态是如何判断的呢?

ServerStats 源码,这里具体源码咱们不贴了,说一下机制:

断路是通过工夫判断实现的,每次失败记录上次失败工夫。如果失败了,则触发判断,是否大于断路的最小失败次数,则判断:

计算断路持续时间:(2^ 失败次数)* 断路工夫因子 ,如果大于最大断路工夫,则取最大断路工夫。
判断以后工夫是否大于上次失败工夫 + 短路持续时间,如果小于,则是断路状态。
这里又波及三个配置(这里须要将 default 替换成你调用的微服务名称):

  • niws.loadbalancer.default.connectionFailureCountThreshold,默认为 3,触发判断是否断路的最小失败次数,也就是,默认如果失败三次,就会判断是否要断路了。
  • niws.loadbalancer.default.circuitTripTimeoutFactorSeconds,默认为 10,断路工夫因子,
  • niws.loadbalancer.default.circuitTripMaxTimeoutSeconds,默认为 30,最大断路工夫

ZoneAvoidancePredicate

ZoneAvoidancePredicate,过滤掉不可用区域的节点,代码如下!

@Override
public boolean apply(@Nullable PredicateKey input) {if (!ENABLED.get()) {// 查看 niws.loadbalancer.zoneAvoidanceRule.enabled 配置的相熟是否为 true(默认为 true)如果为 false 没有开启分片过滤 则不进行过滤
        return true;
    }
    //// 获取配置的分区字符串 默认为 UNKNOWN
    String serverZone = input.getServer().getZone();
    if (serverZone == null) { // 如果没有分区,则不须要进行过滤,间接返回即可
        // there is no zone information from the server, we do not want to filter
        // out this server
        return true;
    }
    // 获取负载平衡的状态信息
    LoadBalancerStats lbStats = getLBStats();
    if (lbStats == null) {
        // no stats available, do not filter
        return true;
    }
    // 如果可用区域小于等于 1,也不须要进行过滤间接返回
    if (lbStats.getAvailableZones().size() <= 1) {
        // only one zone is available, do not filter
        return true;
    }
    // 针对以后负载信息,创立一个区域快照,后续会用快照数据进行计算(防止后续因为数据变更导致判断计算不精确问题)Map<String, ZoneSnapshot> zoneSnapshot = ZoneAvoidanceRule.createSnapshot(lbStats);
    if (!zoneSnapshot.keySet().contains(serverZone)) { // 如果快照信息中没有蕴含以后服务器所在区域,则也不须要进行判断。// The server zone is unknown to the load balancer, do not filter it out 
        return true;
    }
    logger.debug("Zone snapshots: {}", zoneSnapshot);
    // 获取无效区域
    Set<String> availableZones = ZoneAvoidanceRule.getAvailableZones(zoneSnapshot, triggeringLoad.get(), triggeringBlackoutPercentage.get());
    logger.debug("Available zones: {}", availableZones);
    if (availableZones != null) { // 无效区域如果蕴含以后节点,则返回 true,否则返回 false,返回 false 示意这个区域不可用,不须要进行指标节点散发。return availableZones.contains(input.getServer().getZone());
    } else {return false;}
} 

LoadBalancerStats,在每次发动通信的时候,状态信息会在控制台打印如下!

DynamicServerListLoadBalancer for client goods-service initialized: DynamicServerListLoadBalancer:{NFLoadBalancer:name=goods-service,current list of Servers=[localhost:9091, localhost:9081],Load balancer stats=Zone stats: {unknown=[Zone:unknown;    Instance count:2;    Active connections count: 0;    Circuit breaker tripped count: 0;    Active connections per server: 0.0;]
},Server stats: [[Server:localhost:9091;    Zone:UNKNOWN;    Total Requests:0;    Successive connection failure:0;    Total blackout seconds:0;    Last connection made:Thu Jan 01 08:00:00 CST 1970;    First connection made: Thu Jan 01 08:00:00 CST 1970;    Active Connections:0;    total failure count in last (1000) msecs:0;    average resp time:0.0;    90 percentile resp time:0.0;    95 percentile resp time:0.0;    min resp time:0.0;    max resp time:0.0;    stddev resp time:0.0]
, [Server:localhost:9081;    Zone:UNKNOWN;    Total Requests:0;    Successive connection failure:0;    Total blackout seconds:0;    Last connection made:Thu Jan 01 08:00:00 CST 1970;    First connection made: Thu Jan 01 08:00:00 CST 1970;    Active Connections:0;    total failure count in last (1000) msecs:0;    average resp time:0.0;    90 percentile resp time:0.0;    95 percentile resp time:0.0;    min resp time:0.0;    max resp time:0.0;    stddev resp time:0.0]
]}ServerList:com.netflix.loadbalancer.ConfigurationBasedServerList@74ddb59a

getAvailableZones 办法的代码如下,用来计算无效可用区域。

public static Set<String> getAvailableZones(
    Map<String, ZoneSnapshot> snapshot, double triggeringLoad,
    double triggeringBlackoutPercentage) {if (snapshot.isEmpty()) { // 如果快照信息为空,返回空
        return null;
    }
    // 定义一个汇合存储无效区域节点
    Set<String> availableZones = new HashSet<String>(snapshot.keySet());
    if (availableZones.size() == 1) { // 如果无效区域的汇合只有 1 个,间接返回
        return availableZones;
    }
    // 记录有问题的区域汇合
    Set<String> worstZones = new HashSet<String>();
    double maxLoadPerServer = 0; // 定义一个变量,保留所有 zone 中,均匀负载最高值
    // true:zone 无限可用
    // false:zone 全副可用
    boolean limitedZoneAvailability = false; //
    
    // 遍历所有的区域信息. 对每个 zone 进行逐个剖析
    for (Map.Entry<String, ZoneSnapshot> zoneEntry : snapshot.entrySet()) {String zone = zoneEntry.getKey();  // 失去 zone 字符串
        ZoneSnapshot zoneSnapshot = zoneEntry.getValue(); // 失去该 zone 的快照信息
        int instanceCount = zoneSnapshot.getInstanceCount();
        if (instanceCount == 0) { // 若该 zone 内一个实例都木有了,那就是齐全不可用,那就移除该 zone,而后标记 zone 是无限可用的(并非全副可用)availableZones.remove(zone);
            limitedZoneAvailability = true;
        } else {double loadPerServer = zoneSnapshot.getLoadPerServer(); // 获取该区域的均匀负载
            // 机器的熔断总数 / 总实例数曾经超过了阈值(默认为 1,也就是全副熔断才会认为该 zone 齐全不可用)if (((double) zoneSnapshot.getCircuitTrippedCount())
                / instanceCount >= triggeringBlackoutPercentage
                || loadPerServer < 0) { //loadPerServer 示意以后区域所有节点都熔断了。availableZones.remove(zone); 
                limitedZoneAvailability = true;
            } else { // 进入到这个逻辑,阐明并不是齐全不可用,就看看区域的状态
                // 如果以后负载和最大负载相当,那认为以后区域状态很不好,退出到 worstZones 中
                if (Math.abs(loadPerServer - maxLoadPerServer) < 0.000001d) {
                    // they are the same considering double calculation
                    // round error
                    worstZones.add(zone);
                   
                } else if (loadPerServer > maxLoadPerServer) {// 或者若以后负载大于最大负载了。maxLoadPerServer = loadPerServer;
                    worstZones.clear();
                    worstZones.add(zone);
                }
            }
        }
    }
    // 如果最大负载小于设定的负载阈值 并且 limitedZoneAvailability=false
    // 阐明全副 zone 都可用,并且最大负载都还没有达到阈值,那就把全副 zone 返回   
    if (maxLoadPerServer < triggeringLoad && !limitedZoneAvailability) {
        // zone override is not needed here
        return availableZones;
    }
    // 若最大负载超过阈值,就不能全副返回,则间接从负载最高的区域中随机返回一个,这么解决的目标是把负载最高的那个哥们 T 除掉,再返回后果。String zoneToAvoid = randomChooseZone(snapshot, worstZones);
    if (zoneToAvoid != null) {availableZones.remove(zoneToAvoid);
    }
    return availableZones;

}

上述逻辑还是比较复杂的,咱们通过一个简略的文字进行阐明:

  1. 如果 zone 为 null,那么也就是没有可用区域,间接返回 null
  2. 如果 zone 的可用区域为 1,也没有什么能够抉择的,间接返回这一个
  3. 应用 Set<String> worstZones 记录所有 zone 中比拟状态不好的的 zone 列表,用 maxLoadPerServer 示意所有 zone 中负载最高的区域;用 limitedZoneAvailability 示意是否是局部 zone 可用(true:局部可用,false:全副可用),接着咱们须要遍历所有的 zone 信息,逐个进行判断从而对无效 zone 的后果进行解决。

    1. 如果以后 zoneinstanceCount为 0,那就间接把这个区域移除就行,并且标记 limitedZoneAvailability 为局部可用,没什么好说的。
    2. 获取以后总的均匀负载 loadPerServer,如果 zone 内的 熔断实例数 / 总实例数 >= triggeringBlackoutPercentage 或者 loadPerServer < 0的话,阐明以后区域有问题,间接执行 remove 移除以后 zone,并且limitedZoneAvailability=true .

      1. (熔断实例数 / 总实例数 >= 阈值 , 标记为以后 zone 就不可用了(移除掉),这个很好了解。这个阈值为0.99999d 也就说所有的 Server 实例被熔断了,该 zone 才算不可用了).
      2. loadPerServer = -1,也就说当所有实例都熔断了。这两个条件判断都差不多,都是判断这个区域的可用性。
    3. 如果以后 zone 没有达到阈值,则判断区域的负载状况,从所有 zone 中找到负载最高的区域(负载差值在 0.000001d),则把这些区域退出到worstZones 列表,也就是这个汇合保留的是负载较高的区域。
  4. 通过上述遍历对区域数据进行计算后,最初要设置返回的无效区域数据。

    1. 最高负载 maxLoadPerServer 仍旧小于提供的triggeringLoad 阈值,并且并且limitedZoneAvailability=false(就是说所有 zone 都可用的状况下),那就返回所有的 zone:availableZones。(也就是所有区域的负载都在阈值范畴内并且每个区域内的节点都还存活状态,就全副返回)
    2. 否则,最大负载超过阈值或者某些区域存在局部不可用的节点时,就从这些负载较高的节点 worstZones 中随机移除一个

AbstractServerPredicate

在答复上面的代码,通过 getEligibleServers 判断可用服务节点后,如果可用节点不为 0,则执行 incrementAndGetModulo 办法进行轮询。

public Optional<Server> chooseRoundRobinAfterFiltering(List<Server> servers, Object loadBalancerKey) {List<Server> eligible = getEligibleServers(servers, loadBalancerKey);
    if (eligible.size() == 0) {return Optional.absent();
    }
    return Optional.of(eligible.get(incrementAndGetModulo(eligible.size())));
}

该办法是通过轮询来实现,代码如下!

private int incrementAndGetModulo(int modulo) {for (;;) {int current = nextIndex.get();
        int next = (current + 1) % modulo;
        if (nextIndex.compareAndSet(current, next) && current < modulo)
            return current;
    }
}

服务列表的加载过程

在本实例中,咱们将服务列表配置在 application.properties 文件中,意味着在某个时候会加载这个列表,保留在某个地位,那它是在什么时候加载的呢?

RibbonClientConfiguration 这个配置类中,有上面这个 Bean 的申明,(该 Bean 是条件触发)它用来定义默认的负载平衡实现。

@Bean
@ConditionalOnMissingBean
public ILoadBalancer ribbonLoadBalancer(IClientConfig config,
                                        ServerList<Server> serverList, ServerListFilter<Server> serverListFilter,
                                        IRule rule, IPing ping, ServerListUpdater serverListUpdater) {if (this.propertiesFactory.isSet(ILoadBalancer.class, name)) {return this.propertiesFactory.get(ILoadBalancer.class, config, name);
    }
    return new ZoneAwareLoadBalancer<>(config, rule, ping, serverList,
                                       serverListFilter, serverListUpdater);
}

后面剖析过,它的类关系图如下!

ZoneAwareLoadBalancer 在初始化时,会调用父类 DynamicServerListLoadBalancer 的构造方法,代码如下。

public DynamicServerListLoadBalancer(IClientConfig clientConfig, IRule rule, IPing ping,
                                         ServerList<T> serverList, ServerListFilter<T> filter,
                                         ServerListUpdater serverListUpdater) {super(clientConfig, rule, ping);
        this.serverListImpl = serverList;
        this.filter = filter;
        this.serverListUpdater = serverListUpdater;
        if (filter instanceof AbstractServerListFilter) {((AbstractServerListFilter) filter).setLoadBalancerStats(getLoadBalancerStats());
        }
        restOfInit(clientConfig);
    }

restOfInit

restOfInit办法次要做两件事件。

  1. 开启动静更新 Server 的性能
  2. 更新 Server 列表
void restOfInit(IClientConfig clientConfig) {boolean primeConnection = this.isEnablePrimingConnections();
    // turn this off to avoid duplicated asynchronous priming done in BaseLoadBalancer.setServerList()
    this.setEnablePrimingConnections(false);
    enableAndInitLearnNewServersFeature(); // 开启动静更新 Server

    updateListOfServers(); // 更新 Server 列表
    
    
    if (primeConnection && this.getPrimeConnections() != null) {this.getPrimeConnections()
            .primeConnections(getReachableServers());
    }
    this.setEnablePrimingConnections(primeConnection);
    LOGGER.info("DynamicServerListLoadBalancer for client {} initialized: {}", clientConfig.getClientName(), this.toString());
}

updateListOfServers

全量更新一次服务列表。

public void updateListOfServers() {List<T> servers = new ArrayList<T>();
    if (serverListImpl != null) {servers = serverListImpl.getUpdatedListOfServers();
        LOGGER.debug("List of Servers for {} obtained from Discovery client: {}",
                     getIdentifier(), servers);

        if (filter != null) {servers = filter.getFilteredListOfServers(servers);
            LOGGER.debug("Filtered List of Servers for {} obtained from Discovery client: {}",
                         getIdentifier(), servers);
        }
    }
    updateAllServerList(servers);
}

上述代码解释如下

  1. 因为咱们是通过 application.properties 文件配置的动态服务地址列表,所以此时 serverListImpl 的实例为:ConfigurationBasedServerList,调用 getUpdatedListOfServers 办法时,返回的是在 application.properties 文件中定义的服务列表。
  2. 判断是否须要 filter,如果有,则通过filter 进行服务列表过滤。

最初调用updateAllServerList,更新所有 Server 到本地缓存中。

protected void updateAllServerList(List<T> ls) {
    // other threads might be doing this - in which case, we pass
    if (serverListUpdateInProgress.compareAndSet(false, true)) {
        try {for (T s : ls) {s.setAlive(true); // set so that clients can start using these
                // servers right away instead
                // of having to wait out the ping cycle.
            }
            setServersList(ls);
            super.forceQuickPing();} finally {serverListUpdateInProgress.set(false);
        }
    }
}

动静 Ping 机制

在 Ribbon 中,基于 Ping 机制,指标服务地址也会产生动静变更,具体的实现形式在 DynamicServerListLoadBalancer.restOfInit 办法中

void restOfInit(IClientConfig clientConfig) {boolean primeConnection = this.isEnablePrimingConnections();
    // turn this off to avoid duplicated asynchronous priming done in BaseLoadBalancer.setServerList()
    this.setEnablePrimingConnections(false);
    enableAndInitLearnNewServersFeature();  // 开启定时工作动静更新

    updateListOfServers();
    if (primeConnection && this.getPrimeConnections() != null) {this.getPrimeConnections()
            .primeConnections(getReachableServers());
    }
    this.setEnablePrimingConnections(primeConnection);
    LOGGER.info("DynamicServerListLoadBalancer for client {} initialized: {}", clientConfig.getClientName(), this.toString());
}
public void enableAndInitLearnNewServersFeature() {LOGGER.info("Using serverListUpdater {}", serverListUpdater.getClass().getSimpleName());
    serverListUpdater.start(updateAction);
}

留神,这里会启动一个定时工作,而定时工作所执行的程序是updateAction,它是一个匿名外部类,定义如下。

protected final ServerListUpdater.UpdateAction updateAction = new ServerListUpdater.UpdateAction() {
    @Override
    public void doUpdate() {updateListOfServers();
    }
};

定时工作的启动办法如下,这个工作每隔 30s 执行一次。

public synchronized void start(final UpdateAction updateAction) {if (isActive.compareAndSet(false, true)) {final Runnable wrapperRunnable = new Runnable() {
            @Override
            public void run() {if (!isActive.get()) {if (scheduledFuture != null) {scheduledFuture.cancel(true);
                    }
                    return;
                }
                try {updateAction.doUpdate();  // 执行具体的工作。lastUpdated = System.currentTimeMillis();} catch (Exception e) {logger.warn("Failed one update cycle", e);
                }
            }
        };

        scheduledFuture = getRefreshExecutor().scheduleWithFixedDelay(
            wrapperRunnable,
            initialDelayMs,  //1000
            refreshIntervalMs,  //30000 
            TimeUnit.MILLISECONDS 
        );
    } else {logger.info("Already active, no-op");
    }
}

当 30s 之后触发了 doUpdate 办法后,最终进入到 updateAllServerList 办法

protected void updateAllServerList(List<T> ls) {
    // other threads might be doing this - in which case, we pass
    if (serverListUpdateInProgress.compareAndSet(false, true)) {
        try {for (T s : ls) {s.setAlive(true); // set so that clients can start using these
                // servers right away instead
                // of having to wait out the ping cycle.
            }
            setServersList(ls);
            super.forceQuickPing();} finally {serverListUpdateInProgress.set(false);
        }
    }
}

其中,会调用 super.forceQuickPing(); 进行心跳衰弱检测。

public void forceQuickPing() {if (canSkipPing()) {return;}
    logger.debug("LoadBalancer [{}]:  forceQuickPing invoking", name);

    try {new Pinger(pingStrategy).runPinger();} catch (Exception e) {logger.error("LoadBalancer [{}]: Error running forceQuickPing()", name, e);
    }
}

RibbonLoadBalancerClient.execute

通过上述剖析,再回到 RibbonLoadBalancerClient.execute 办法!

public <T> T execute(String serviceId, LoadBalancerRequest<T> request, Object hint)
    throws IOException {ILoadBalancer loadBalancer = getLoadBalancer(serviceId);
    Server server = getServer(loadBalancer, hint);
    if (server == null) {throw new IllegalStateException("No instances available for" + serviceId);
    }
    RibbonServer ribbonServer = new RibbonServer(serviceId, server,
                                                 isSecure(server, serviceId),
                                                 serverIntrospector(serviceId).getMetadata(server));

    return execute(serviceId, ribbonServer, request);
}

此时, Server server = getServer(loadBalancer, hint);这行代码,会返回一个具体的指标服务器。

其中,在调用 execute 办法之前,会包装一个 RibbonServer 对象传递上来,它的次要作用是用来记录申请的负载信息。

@Override
public <T> T execute(String serviceId, ServiceInstance serviceInstance,
                     LoadBalancerRequest<T> request) throws IOException {
    Server server = null;
    if (serviceInstance instanceof RibbonServer) {server = ((RibbonServer) serviceInstance).getServer();}
    if (server == null) {throw new IllegalStateException("No instances available for" + serviceId);
    }

    RibbonLoadBalancerContext context = this.clientFactory
        .getLoadBalancerContext(serviceId);
    RibbonStatsRecorder statsRecorder = new RibbonStatsRecorder(context, server);

    try {T returnVal = request.apply(serviceInstance);
        statsRecorder.recordStats(returnVal);  // 记录申请状态
        return returnVal;
    }
    // catch IOException and rethrow so RestTemplate behaves correctly
    catch (IOException ex) {statsRecorder.recordStats(ex); // 记录申请状态
        throw ex;
    }
    catch (Exception ex) {statsRecorder.recordStats(ex);
        ReflectionUtils.rethrowRuntimeException(ex);
    }
    return null;
}

request.apply

request 是 LoadBalancerRequest 接口,它外面提供了一个 apply 办法,然而从代码中咱们发现这个办法并没有实现类,那么它是在哪里实现的呢?

持续又往前剖析发现,这个 request 对象是从 LoadBalancerInterceptor 的 intercept 办法中传递过去的.

public ClientHttpResponse intercept(final HttpRequest request, final byte[] body, final ClientHttpRequestExecution execution) throws IOException {URI originalUri = request.getURI();
    String serviceName = originalUri.getHost();
    Assert.state(serviceName != null, "Request URI does not contain a valid hostname:" + originalUri);
    return (ClientHttpResponse)this.loadBalancer.execute(serviceName, this.requestFactory.createRequest(request, body, execution));
}

而 request 的传递,是通过 this.requestFactory.createRequest(request, body, execution) 创立而来,于是咱们找到这个办法。

public LoadBalancerRequest<ClientHttpResponse> createRequest(final HttpRequest request, final byte[] body, final ClientHttpRequestExecution execution) {return (instance) -> {HttpRequest serviceRequest = new ServiceRequestWrapper(request, instance, this.loadBalancer);
        LoadBalancerRequestTransformer transformer;
        if (this.transformers != null) {for(Iterator var6 = this.transformers.iterator(); var6.hasNext(); serviceRequest = transformer.transformRequest((HttpRequest)serviceRequest, instance)) {transformer = (LoadBalancerRequestTransformer)var6.next();}
        }

        return execution.execute((HttpRequest)serviceRequest, body);
    };
}

从代码中发现,它是一个用 lambda 表达式实现的匿名外部类。在该外部类中,创立了一个 ServiceRequestWrapper,这个 ServiceRequestWrapper 实际上就是 HttpRequestWrapper 的一个子类,ServiceRequestWrapper 重写了 HttpRequestWrapper 的 getURI()办法,重写的 URI 实际上就是通过调用 LoadBalancerClient 接口的 reconstructURI 函数来从新构建一个 URI 进行拜访。

InterceptingClientHttpRequest.execute

上述代码执行的 execution.execute,又会进入到InterceptingClientHttpRequest.execute 办法中,代码如下。

public ClientHttpResponse execute(HttpRequest request, byte[] body) throws IOException {if (this.iterator.hasNext()) {ClientHttpRequestInterceptor nextInterceptor = this.iterator.next();
        return nextInterceptor.intercept(request, body, this);
    }
    else {HttpMethod method = request.getMethod();
        Assert.state(method != null, "No standard HTTP method");
        ClientHttpRequest delegate = requestFactory.createRequest(request.getURI(), method); // 留神这里
        request.getHeaders().forEach((key, value) -> delegate.getHeaders().addAll(key, value));
        if (body.length > 0) {if (delegate instanceof StreamingHttpOutputMessage) {StreamingHttpOutputMessage streamingOutputMessage = (StreamingHttpOutputMessage) delegate;
                streamingOutputMessage.setBody(outputStream -> StreamUtils.copy(body, outputStream));
            }
            else {StreamUtils.copy(body, delegate.getBody());
            }
        }
        return delegate.execute();}
}

此时须要留神,request对象的实例是HttpRequestWrapper

request.getURI()

当调用 request.getURI() 获取指标地址创立 http 申请时,会调用 ServiceRequestWrapper 中的 .getURI() 办法。

@Override
public URI getURI() {URI uri = this.loadBalancer.reconstructURI(this.instance, getRequest().getURI());
    return uri;
}

在这个办法中,调用 RibbonLoadBalancerClient 实例中的 reconstructURI 办法,依据 service-id 生成指标服务地址。

RibbonLoadBalancerClient.reconstructURI

public URI reconstructURI(ServiceInstance instance, URI original) {Assert.notNull(instance, "instance can not be null");
        String serviceId = instance.getServiceId(); // 获取实例 id,也就是服务名称
        RibbonLoadBalancerContext context = this.clientFactory
                .getLoadBalancerContext(serviceId); // 获取 RibbonLoadBalancerContext 上下文,这个是从 spring 容器中获取的对象实例。URI uri;
        Server server;
        if (instance instanceof RibbonServer) { // 如果 instance 为 RibbonServer
            RibbonServer ribbonServer = (RibbonServer) instance;
            server = ribbonServer.getServer();  // 获取指标服务器的 Server 信息
            uri = updateToSecureConnectionIfNeeded(original, ribbonServer); // 判断是否须要更新成一个平安连贯。}
        else { // 如果是一个一般的 http 地址
            server = new Server(instance.getScheme(), instance.getHost(),
                    instance.getPort());
            IClientConfig clientConfig = clientFactory.getClientConfig(serviceId);
            ServerIntrospector serverIntrospector = serverIntrospector(serviceId);
            uri = updateToSecureConnectionIfNeeded(original, clientConfig,
                    serverIntrospector, server);
        }
        return context.reconstructURIWithServer(server, uri);  // 调用这个办法拼接成一个实在的指标服务器地址。}

版权申明:本博客所有文章除特地申明外,均采纳 CC BY-NC-SA 4.0 许可协定。转载请注明来自 Mic 带你学架构
如果本篇文章对您有帮忙,还请帮忙点个关注和赞,您的保持是我一直创作的能源。欢送关注同名微信公众号获取更多技术干货!

退出移动版