关于java:深度解析Spring-Cloud-Ribbon的实现源码及原理

Ribbon的核心作用就是进行申请的负载平衡,它的基本原理如下图所示。就是客户端集成Ribbon这个组件,Ribbon中会针对曾经配置的服务提供者地址列表进行负载平衡的计算,失去一个指标地址之后,再发动申请。

那么接下来,咱们从两个层面去剖析Ribbon的原理

  1. @LoadBalanced 注解如何让一般的RestTemplate具备负载平衡的能力
  2. OpenFeign集成Ribbon的实现原理

@LoadBalancer注解解析过程剖析

在应用RestTemplate的时候,咱们加了一个@LoadBalance注解,就能让这个RestTemplate在申请时,就领有客户端负载平衡的能力。

@Bean
@LoadBalanced
RestTemplate restTemplate() {
    return new RestTemplate();
}

而后,咱们关上@LoadBalanced这个注解,能够发现该注解仅仅是申明了一个@qualifier注解。

@Target({ ElementType.FIELD, ElementType.PARAMETER, ElementType.METHOD })
@Retention(RetentionPolicy.RUNTIME)
@Documented
@Inherited
@Qualifier
public @interface LoadBalanced {

}

@qualifier注解的作用

咱们平时在应用注解去注入一个Bean时,都是采纳@Autowired。并且大家应该晓得@Autowired是能够注入一个List或者Map的。给大家举个例子(在一个springboot利用中)

定义一个TestClass

@AllArgsConstructor
@Data
public class TestClass {
    private String name;
}

申明一个配置类,并注入TestClass

@Configuration
public class TestConfig {

    @Bean("testClass1")
    TestClass testClass(){
        return new TestClass("testClass1");
    }

    @Bean("testClass2")
    TestClass testClass2(){
        return new TestClass("testClass2");
    }
}

定义一个Controller,用于测试, 留神,此时咱们应用的是@Autowired来注入一个List汇合

@RestController
public class TestController {

    @Autowired(required = false)
    List<TestClass> testClasses= Collections.emptyList();

    @GetMapping("/test")
    public Object test(){
        return testClasses;
    }
}

此时拜访:http://localhost:8080/test , 失去的后果是

[
    {
        name: "testClass1"
    },
    {
        name: "testClass2"
    }
]

批改TestConfigTestController

@Configuration
public class TestConfig {

    @Bean("testClass1")
    @Qualifier
    TestClass testClass(){
        return new TestClass("testClass1");
    }

    @Bean("testClass2")
    TestClass testClass2(){
        return new TestClass("testClass2");
    }
}
@RestController
public class TestController {

    @Autowired(required = false)
    @Qualifier
    List<TestClass> testClasses= Collections.emptyList();

    @GetMapping("/test")
    public Object test(){
        return testClasses;
    }
}

再次拜访:http://localhost:8080/test , 失去的后果是

[
    {
        name: "testClass1"
    }
]

@LoadBalancer注解筛选及拦挡

理解了@qualifier注解的作用后,再回到@LoadBalancer注解上,就不难理解了。

因为咱们须要扫描到减少了@LoadBalancer注解的RestTemplate实例,所以,@LoadBalancer能够实现这个动作,它的具体的实现代码如下:

@Configuration(proxyBeanMethods = false)
@ConditionalOnClass(RestTemplate.class)
@ConditionalOnBean(LoadBalancerClient.class)
@EnableConfigurationProperties(LoadBalancerProperties.class)
public class LoadBalancerAutoConfiguration {

   @LoadBalanced
   @Autowired(required = false)
   private List<RestTemplate> restTemplates = Collections.emptyList();
}

从这个代码中能够看出,在LoadBalancerAutoConfiguration这个配置类中,会应用同样的形式,把配置了@LoadBalanced注解的RestTemplate注入到restTemplates汇合中。

拿到了RestTemplate之后,在LoadBalancerInterceptorConfig配置类中,会针对这些RestTemplate进行拦挡,实现代码如下:

@Configuration(proxyBeanMethods = false)
@ConditionalOnClass(RestTemplate.class)
@ConditionalOnBean(LoadBalancerClient.class)
@EnableConfigurationProperties(LoadBalancerProperties.class)
public class LoadBalancerAutoConfiguration {

    @LoadBalanced
    @Autowired(required = false)
    private List<RestTemplate> restTemplates = Collections.emptyList();

    //省略....

    @Bean
    @ConditionalOnMissingBean
    public LoadBalancerRequestFactory loadBalancerRequestFactory(LoadBalancerClient loadBalancerClient) {
        return new LoadBalancerRequestFactory(loadBalancerClient, this.transformers);
    }

    @Configuration(proxyBeanMethods = false)
    @Conditional(RetryMissingOrDisabledCondition.class)
    static class LoadBalancerInterceptorConfig {
        
        //装载一个LoadBalancerInterceptor的实例到IOC容器。
        @Bean
        public LoadBalancerInterceptor loadBalancerInterceptor(LoadBalancerClient loadBalancerClient,
                LoadBalancerRequestFactory requestFactory) {
            return new LoadBalancerInterceptor(loadBalancerClient, requestFactory);
        }
        
        //会遍历所有加了@LoadBalanced注解的RestTemplate,在原有的拦截器之上,再减少了一个LoadBalancerInterceptor
        @Bean
        @ConditionalOnMissingBean
        public RestTemplateCustomizer restTemplateCustomizer(final LoadBalancerInterceptor loadBalancerInterceptor) {
            return restTemplate -> {
                List<ClientHttpRequestInterceptor> list = new ArrayList<>(restTemplate.getInterceptors());
                list.add(loadBalancerInterceptor);
                restTemplate.setInterceptors(list);
            };
        }

    }
    //省略....
}

LoadBalancerInterceptor

@Override
public ClientHttpResponse intercept(final HttpRequest request, final byte[] body,
      final ClientHttpRequestExecution execution) throws IOException {
   final URI originalUri = request.getURI();
   String serviceName = originalUri.getHost();
   Assert.state(serviceName != null, "Request URI does not contain a valid hostname: " + originalUri);
   return this.loadBalancer.execute(serviceName, this.requestFactory.createRequest(request, body, execution));
}

RestTemplate调用过程

咱们在程序中,应用上面的代码发动近程申请时

restTemplate.getForObject(url,String.class);

它的整个调用过程如下。

RestTemplate.getForObject

​ —–> AbstractClientHttpRequest.execute()

​ —–>AbstractBufferingClientHttpRequest.executeInternal()

​ —–> InterceptingClientHttpRequest.executeInternal()

​ —–> InterceptingClientHttpRequest.execute()

InterceptingClientHttpRequest.execute()办法的代码如下。

@Override
public ClientHttpResponse execute(HttpRequest request, byte[] body) throws IOException {
    if (this.iterator.hasNext()) { //遍历所有的拦截器,通过拦截器进行一一解决。
        ClientHttpRequestInterceptor nextInterceptor = this.iterator.next();
        return nextInterceptor.intercept(request, body, this);
    }
    else {
        HttpMethod method = request.getMethod();
        Assert.state(method != null, "No standard HTTP method");
        ClientHttpRequest delegate = requestFactory.createRequest(request.getURI(), method);
        request.getHeaders().forEach((key, value) -> delegate.getHeaders().addAll(key, value));
        if (body.length > 0) {
            if (delegate instanceof StreamingHttpOutputMessage) {
                StreamingHttpOutputMessage streamingOutputMessage = (StreamingHttpOutputMessage) delegate;
                streamingOutputMessage.setBody(outputStream -> StreamUtils.copy(body, outputStream));
            }
            else {
                StreamUtils.copy(body, delegate.getBody());
            }
        }
        return delegate.execute();
    }
}

LoadBalancerInterceptor

LoadBalancerInterceptor是一个拦截器,当一个被@Loadbalanced注解润饰的RestTemplate对象发动HTTP申请时,会被LoadBalancerInterceptorintercept办法拦挡,

在这个办法中间接通过getHost办法就能够获取到服务名(因为咱们在应用RestTemplate调用服务的时候,应用的是服务名而不是域名,所以这里能够通过getHost间接拿到服务名而后去调用execute办法发动申请)

@Override
public ClientHttpResponse intercept(final HttpRequest request, final byte[] body,
      final ClientHttpRequestExecution execution) throws IOException {
   final URI originalUri = request.getURI();
   String serviceName = originalUri.getHost();
   Assert.state(serviceName != null, "Request URI does not contain a valid hostname: " + originalUri);
   return this.loadBalancer.execute(serviceName, this.requestFactory.createRequest(request, body, execution));
}

LoadBalancerClient其实是一个接口,咱们看一下它的类图,它有一个惟一的实现类:RibbonLoadBalancerClient

RibbonLoadBalancerClient.execute

RibbonLoadBalancerClient这个类的代码比拟长,咱们次要看一下他的外围办法execute

public <T> T execute(String serviceId, LoadBalancerRequest<T> request, Object hint)
    throws IOException {
    ILoadBalancer loadBalancer = getLoadBalancer(serviceId);
    Server server = getServer(loadBalancer, hint);
    if (server == null) {
        throw new IllegalStateException("No instances available for " + serviceId);
    }
    RibbonServer ribbonServer = new RibbonServer(serviceId, server,
                                                 isSecure(server, serviceId),
                                                 serverIntrospector(serviceId).getMetadata(server));

    return execute(serviceId, ribbonServer, request);
}

上述代码的实现逻辑如下:

  • 依据serviceId取得一个ILoadBalancer,实例为:ZoneAwareLoadBalancer
  • 调用getServer办法去获取一个服务实例
  • 判断Server的值是否为空。这里的Server实际上就是传统的一个服务节点,这个对象存储了服务节点的一些元数据,比方host、port等

getServer

getServer是用来取得一个具体的服务节点,它的实现如下

protected Server getServer(ILoadBalancer loadBalancer, Object hint) {
    if (loadBalancer == null) {
        return null;
    }
    // Use 'default' on a null hint, or just pass it on?
    return loadBalancer.chooseServer(hint != null ? hint : "default");
}

通过代码能够看到,getServer理论调用了IloadBalancer.chooseServer这个办法,ILoadBalancer这个是一个负载均衡器接口。

public interface ILoadBalancer {
    //addServers示意向负载均衡器中保护的实例列表减少服务实例
    public void addServers(List<Server> newServers);
    //chooseServer示意通过某种策略,从负载平衡服务器中挑选出一个具体的服务实例
    public Server chooseServer(Object key);
    //markServerDown示意用来告诉和标识负载均衡器中某个具体实例曾经进行服务,否则负载均衡器在下一次获取服务实例清单前都会认为这个服务实例是失常工作的
    public void markServerDown(Server server);
    //getReachableServers示意获取以后失常工作的服务实例列表
    public List<Server> getReachableServers();
    //getAllServers示意获取所有的服务实例列表,包含失常的服务和进行工作的服务
    public List<Server> getAllServers();
}

ILoadBalancer的类关系图如下:

从整个类的关系图来看,BaseLoadBalancer类实现了根底的负载平衡,而DynamicServerListLoadBalancer和ZoneAwareLoadBalancer则是在负载平衡策略的根底上做了一些性能扩大。

  • AbstractLoadBalancer实现了ILoadBalancer接口,它定义了服务分组的枚举类/chooseServer(用来选取一个服务实例)/getServerList(获取某一个分组中的所有服务实例)/getLoadBalancerStats用来取得一个LoadBalancerStats对象,这个对象保留了每一个服务的状态信息。
  • BaseLoadBalancer,它实现了作为负载均衡器的基本功能,比方服务列表保护、服务存活状态监测、负载平衡算法抉择Server等。然而它只是实现基本功能,在有些简单场景中还无奈实现,比方动静服务列表、Server过滤、Zone区域意识(服务之间的调用心愿尽可能是在同一个区域内进行,缩小提早)。
  • DynamicServerListLoadBalancer是BaseLoadbalancer的一个子类,它对根底负载平衡提供了扩大,从名字上能够看出,它提供了动静服务列表的个性
  • ZoneAwareLoadBalancer 它是在DynamicServerListLoadBalancer的根底上,减少了以Zone的模式来配置多个LoadBalancer的性能。

那在getServer办法中,loadBalancer.chooseServer具体的实现类是哪一个呢?咱们找到RibbonClientConfiguration这个类

@Bean
@ConditionalOnMissingBean
public ILoadBalancer ribbonLoadBalancer(IClientConfig config,
                                        ServerList<Server> serverList, ServerListFilter<Server> serverListFilter,
                                        IRule rule, IPing ping, ServerListUpdater serverListUpdater) {
    if (this.propertiesFactory.isSet(ILoadBalancer.class, name)) {
        return this.propertiesFactory.get(ILoadBalancer.class, config, name);
    }
    return new ZoneAwareLoadBalancer<>(config, rule, ping, serverList,
                                       serverListFilter, serverListUpdater);
}

从上述申明中,发现如果没有自定义ILoadBalancer,则间接返回一个ZoneAwareLoadBalancer

ZoneAwareLoadBalancer

Zone示意区域的意思,区域指的就是天文区域的概念,个别较大规模的互联网公司,都会做跨区域部署,这样做有几个益处,第一个是为不同地区的用户提供最近的拜访节点缩小拜访提早,其次是为了保障高可用,做容灾解决。

而ZoneAwareLoadBalancer就是提供了具备区域意识的负载均衡器,它的次要作用是对Zone进行了感知,保障每个Zone外面的负载平衡策略都是隔离的,它并不保障A区域过去的申请肯定会动员到A区域对应的Server内。真正实现这个需要的是ZonePreferenceServerListFilter/ZoneAffinityServerListFilter

ZoneAwareLoadBalancer的外围性能是

  • 若开启了区域意识,且zone的个数 > 1,就持续区域抉择逻辑
  • 依据ZoneAvoidanceRule.getAvailableZones()办法拿到可用区们(会T除掉齐全不可用的区域们,以及可用然而负载最高的一个区域)
  • 从可用区zone们中,通过ZoneAvoidanceRule.randomChooseZone随机选一个zone进去 (该随机听从权重规定:谁的zone外面Server数量最多,被选中的概率越大)
  • 在选中的zone外面的所有Server中,采纳该zone对对应的Rule,进行choose
@Override
public Server chooseServer(Object key) {
    //ENABLED,示意是否用区域意识的choose抉择Server,默认是true,
    //如果禁用了区域、或者只有一个zone,就间接依照父类的逻辑来进行解决,父类默认采纳轮询算法
    if (!ENABLED.get() || getLoadBalancerStats().getAvailableZones().size() <= 1) {
        logger.debug("Zone aware logic disabled or there is only one zone");
        return super.chooseServer(key);
    }
    Server server = null;
    try {
        LoadBalancerStats lbStats = getLoadBalancerStats();
        Map<String, ZoneSnapshot> zoneSnapshot = ZoneAvoidanceRule.createSnapshot(lbStats);
        logger.debug("Zone snapshots: {}", zoneSnapshot);
        if (triggeringLoad == null) {
            triggeringLoad = DynamicPropertyFactory.getInstance().getDoubleProperty(
                "ZoneAwareNIWSDiscoveryLoadBalancer." + this.getName() + ".triggeringLoadPerServerThreshold", 0.2d);
        }

        if (triggeringBlackoutPercentage == null) {
            triggeringBlackoutPercentage = DynamicPropertyFactory.getInstance().getDoubleProperty(
                "ZoneAwareNIWSDiscoveryLoadBalancer." + this.getName() + ".avoidZoneWithBlackoutPercetage", 0.99999d);
        }
        //依据相干阈值计算可用区域
        Set<String> availableZones = ZoneAvoidanceRule.getAvailableZones(zoneSnapshot, triggeringLoad.get(), triggeringBlackoutPercentage.get());
        logger.debug("Available zones: {}", availableZones);
        if (availableZones != null &&  availableZones.size() < zoneSnapshot.keySet().size()) {
            //从可用区域中随机抉择一个区域,zone外面的服务器节点越多,被选中的概率越大
            String zone = ZoneAvoidanceRule.randomChooseZone(zoneSnapshot, availableZones);
            logger.debug("Zone chosen: {}", zone);
            if (zone != null) {
                //依据zone取得该zone中的LB,而后依据该Zone的负载平衡算法抉择一个server
                BaseLoadBalancer zoneLoadBalancer = getLoadBalancer(zone);
                server = zoneLoadBalancer.chooseServer(key);
            }
        }
    } catch (Exception e) {
        logger.error("Error choosing server using zone aware logic for load balancer={}", name, e);
    }
    if (server != null) {
        return server;
    } else {
        logger.debug("Zone avoidance logic is not invoked.");
        return super.chooseServer(key);
    }
}

BaseLoadBalancer.chooseServer

假如咱们当初没有应用多区域部署,那么负载策略会执行到BaseLoadBalancer.chooseServer

public Server chooseServer(Object key) {
    if (counter == null) {
        counter = createCounter();
    }
    counter.increment();
    if (rule == null) {
        return null;
    } else {
        try {
            return rule.choose(key);
        } catch (Exception e) {
            logger.warn("LoadBalancer [{}]:  Error choosing server for key {}", name, key, e);
            return null;
        }
    }
}

依据默认的负载平衡算法来取得指定的服务节点。默认的算法是RoundBin。

rule.choose

rule代表负载平衡算法规定,它有很多实现,IRule的实现类关系图如下。

默认状况下,rule的实现为ZoneAvoidanceRule,它是在RibbonClientConfiguration这个配置类中定义的,代码如下:

@Configuration(proxyBeanMethods = false)
@EnableConfigurationProperties
// Order is important here, last should be the default, first should be optional
// see
// https://github.com/spring-cloud/spring-cloud-netflix/issues/2086#issuecomment-316281653
@Import({ HttpClientConfiguration.class, OkHttpRibbonConfiguration.class,
        RestClientRibbonConfiguration.class, HttpClientRibbonConfiguration.class })
public class RibbonClientConfiguration {
    @Bean
    @ConditionalOnMissingBean
    public IRule ribbonRule(IClientConfig config) {
        if (this.propertiesFactory.isSet(IRule.class, name)) {
            return this.propertiesFactory.get(IRule.class, config, name);
        }
        ZoneAvoidanceRule rule = new ZoneAvoidanceRule();
        rule.initWithNiwsConfig(config);
        return rule;
    }
}

所以,在BaseLoadBalancer.chooseServer中调用rule.choose(key);,理论会进入到ZoneAvoidanceRulechoose办法

@Override
public Server choose(Object key) {
    ILoadBalancer lb = getLoadBalancer(); //获取负载均衡器
    Optional<Server> server = getPredicate().chooseRoundRobinAfterFiltering(lb.getAllServers(), key); //通过该办法获取指标服务
    if (server.isPresent()) {
        return server.get();
    } else {
        return null;
    }       
}

复合判断server所在区域的性能和server的可用性抉择server

次要剖析chooseRoundRobinAfterFiltering办法。

chooseRoundRobinAfterFiltering

从办法名称能够看进去,它是通过对指标服务集群通过过滤算法过滤一遍后,再应用轮询实现负载平衡。

public Optional<Server> chooseRoundRobinAfterFiltering(List<Server> servers, Object loadBalancerKey) {
    List<Server> eligible = getEligibleServers(servers, loadBalancerKey);
    if (eligible.size() == 0) {
        return Optional.absent();
    }
    return Optional.of(eligible.get(incrementAndGetModulo(eligible.size())));
}

CompositePredicate.getEligibleServers

应用主过滤条件对所有实例过滤并返回过滤后的清单,

@Override
public List<Server> getEligibleServers(List<Server> servers, Object loadBalancerKey) {
    //
    List<Server> result = super.getEligibleServers(servers, loadBalancerKey);
    
    //依照fallbacks中存储的过滤器程序进行过滤(此处就行先ZoneAvoidancePredicate而后AvailabilityPredicate)
    Iterator<AbstractServerPredicate> i = fallbacks.iterator();
    while (!(result.size() >= minimalFilteredServers && result.size() > (int) (servers.size() * minimalFilteredPercentage))
           && i.hasNext()) {
        AbstractServerPredicate predicate = i.next();
        result = predicate.getEligibleServers(servers, loadBalancerKey);
    }
    return result;
}

顺次应用次过滤条件对主过滤条件的后果进行过滤*

  • //不论是主过滤条件还是次过滤条件,都须要判断上面两个条件
  • //只有有一个条件合乎,就不再过滤,将以后后果返回供线性轮询

    • 第1个条件:过滤后的实例总数>=最小过滤实例数(默认为1)
    • 第2个条件:过滤互的实例比例>最小过滤百分比(默认为0)

getEligibleServers

这里的实现逻辑是,遍历所有服务器列表,调用this.apply办法进行验证,验证通过的节点,会退出到results这个列表中返回。

public List<Server> getEligibleServers(List<Server> servers, Object loadBalancerKey) {
    if (loadBalancerKey == null) {
        return ImmutableList.copyOf(Iterables.filter(servers, this.getServerOnlyPredicate()));            
    } else {
        List<Server> results = Lists.newArrayList();
        for (Server server: servers) {
            if (this.apply(new PredicateKey(loadBalancerKey, server))) {
                results.add(server);
            }
        }
        return results;            
    }
}

this.apply,会进入到CompositePredicate.apply办法中,代码如下。

//CompositePredicate.apply

@Override
public boolean apply(@Nullable PredicateKey input) {
    return delegate.apply(input);
}

delegate的实例是AbstractServerPredicate, 代码如下!

public static AbstractServerPredicate ofKeyPredicate(final Predicate<PredicateKey> p) {
    return new AbstractServerPredicate() {
        @Override
        @edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "NP")
            public boolean apply(PredicateKey input) {
            return p.apply(input);
        }            
    };        
}

也就是说,会通过AbstractServerPredicate.apply办法进行过滤,其中,input示意指标服务器集群的某一个具体节点。

其中p,示意AndPredicate实例,这里用到了组合predicate进行判断,而这里的组合判断是and的关系,用到了AndPredicate实现。

 private static class AndPredicate<T> implements Predicate<T>, Serializable {
        private final List<? extends Predicate<? super T>> components;
        private static final long serialVersionUID = 0L;

        private AndPredicate(List<? extends Predicate<? super T>> components) {
            this.components = components;
        }

        public boolean apply(@Nullable T t) {
            for(int i = 0; i < this.components.size(); ++i) { //遍历多个predicate,逐个进行判断。
                if (!((Predicate)this.components.get(i)).apply(t)) {
                    return false;
                }
            }

            return true;
        }
 }

上述代码中,components是由两个predicate组合而成

  1. AvailabilityPredicate,过滤熔断状态下的服务以及并发连贯过多的服务。
  2. ZoneAvoidancePredicate,过滤掉无可用区域的节点。

所以在AndPredicateapply办法中,须要遍历这两个predicate逐个进行判断。

AvailablilityPredicate

过滤熔断状态下的服务以及并发连贯过多的服务,代码如下:

@Override
public boolean apply(@Nullable PredicateKey input) {
    LoadBalancerStats stats = getLBStats();
    if (stats == null) {
        return true;
    }
    return !shouldSkipServer(stats.getSingleServerStat(input.getServer()));
}

判断是否要跳过这个指标节点,实现逻辑如下。

private boolean shouldSkipServer(ServerStats stats) {  
        //niws.loadbalancer.availabilityFilteringRule.filterCircuitTripped是否为true
    if ((CIRCUIT_BREAKER_FILTERING.get() && stats.isCircuitBreakerTripped()) //该Server是否为断路状态
        || stats.getActiveRequestsCount() >= activeConnectionsLimit.get()) {//本机发往这个Server未解决完的申请个数是否大于Server实例最大的沉闷连接数
        return true;
    }
    return false;
}

Server是否为断路状态是如何判断的呢?

ServerStats源码,这里具体源码咱们不贴了,说一下机制:

断路是通过工夫判断实现的,每次失败记录上次失败工夫。如果失败了,则触发判断,是否大于断路的最小失败次数,则判断:

计算断路持续时间: (2^失败次数)* 断路工夫因子,如果大于最大断路工夫,则取最大断路工夫。
判断以后工夫是否大于上次失败工夫+短路持续时间,如果小于,则是断路状态。
这里又波及三个配置(这里须要将default替换成你调用的微服务名称):

  • niws.loadbalancer.default.connectionFailureCountThreshold,默认为3, 触发判断是否断路的最小失败次数,也就是,默认如果失败三次,就会判断是否要断路了。
  • niws.loadbalancer.default.circuitTripTimeoutFactorSeconds, 默认为10, 断路工夫因子,
  • niws.loadbalancer.default.circuitTripMaxTimeoutSeconds,默认为30,最大断路工夫

ZoneAvoidancePredicate

ZoneAvoidancePredicate,过滤掉不可用区域的节点,代码如下!

@Override
public boolean apply(@Nullable PredicateKey input) {
    if (!ENABLED.get()) {//查看niws.loadbalancer.zoneAvoidanceRule.enabled配置的相熟是否为true(默认为true)如果为false没有开启分片过滤 则不进行过滤
        return true;
    }
    ////获取配置的分区字符串 默认为UNKNOWN
    String serverZone = input.getServer().getZone();
    if (serverZone == null) { //如果没有分区,则不须要进行过滤,间接返回即可
        // there is no zone information from the server, we do not want to filter
        // out this server
        return true;
    }
    //获取负载平衡的状态信息
    LoadBalancerStats lbStats = getLBStats();
    if (lbStats == null) {
        // no stats available, do not filter
        return true;
    }
    //如果可用区域小于等于1,也不须要进行过滤间接返回
    if (lbStats.getAvailableZones().size() <= 1) {
        // only one zone is available, do not filter
        return true;
    }
    //针对以后负载信息,创立一个区域快照,后续会用快照数据进行计算(防止后续因为数据变更导致判断计算不精确问题)
    Map<String, ZoneSnapshot> zoneSnapshot = ZoneAvoidanceRule.createSnapshot(lbStats);
    if (!zoneSnapshot.keySet().contains(serverZone)) { //如果快照信息中没有蕴含以后服务器所在区域,则也不须要进行判断。
        // The server zone is unknown to the load balancer, do not filter it out 
        return true;
    }
    logger.debug("Zone snapshots: {}", zoneSnapshot);
    //获取无效区域
    Set<String> availableZones = ZoneAvoidanceRule.getAvailableZones(zoneSnapshot, triggeringLoad.get(), triggeringBlackoutPercentage.get());
    logger.debug("Available zones: {}", availableZones);
    if (availableZones != null) { //无效区域如果蕴含以后节点,则返回true,否则返回false, 返回false示意这个区域不可用,不须要进行指标节点散发。
        return availableZones.contains(input.getServer().getZone());
    } else {
        return false;
    }
} 

LoadBalancerStats,在每次发动通信的时候,状态信息会在控制台打印如下!

DynamicServerListLoadBalancer for client goods-service initialized: DynamicServerListLoadBalancer:{NFLoadBalancer:name=goods-service,current list of Servers=[localhost:9091, localhost:9081],Load balancer stats=Zone stats: {unknown=[Zone:unknown;    Instance count:2;    Active connections count: 0;    Circuit breaker tripped count: 0;    Active connections per server: 0.0;]
},Server stats: [[Server:localhost:9091;    Zone:UNKNOWN;    Total Requests:0;    Successive connection failure:0;    Total blackout seconds:0;    Last connection made:Thu Jan 01 08:00:00 CST 1970;    First connection made: Thu Jan 01 08:00:00 CST 1970;    Active Connections:0;    total failure count in last (1000) msecs:0;    average resp time:0.0;    90 percentile resp time:0.0;    95 percentile resp time:0.0;    min resp time:0.0;    max resp time:0.0;    stddev resp time:0.0]
, [Server:localhost:9081;    Zone:UNKNOWN;    Total Requests:0;    Successive connection failure:0;    Total blackout seconds:0;    Last connection made:Thu Jan 01 08:00:00 CST 1970;    First connection made: Thu Jan 01 08:00:00 CST 1970;    Active Connections:0;    total failure count in last (1000) msecs:0;    average resp time:0.0;    90 percentile resp time:0.0;    95 percentile resp time:0.0;    min resp time:0.0;    max resp time:0.0;    stddev resp time:0.0]
]}ServerList:com.netflix.loadbalancer.ConfigurationBasedServerList@74ddb59a

getAvailableZones办法的代码如下,用来计算无效可用区域。

public static Set<String> getAvailableZones(
    Map<String, ZoneSnapshot> snapshot, double triggeringLoad,
    double triggeringBlackoutPercentage) {
    if (snapshot.isEmpty()) { //如果快照信息为空,返回空
        return null;
    }
    //定义一个汇合存储无效区域节点
    Set<String> availableZones = new HashSet<String>(snapshot.keySet());
    if (availableZones.size() == 1) { //如果无效区域的汇合只有1个,间接返回
        return availableZones;
    }
    //记录有问题的区域汇合
    Set<String> worstZones = new HashSet<String>();
    double maxLoadPerServer = 0; //定义一个变量,保留所有zone中,均匀负载最高值
    // true:zone无限可用
    // false:zone全副可用
    boolean limitedZoneAvailability = false; //
    
    //遍历所有的区域信息. 对每个zone进行逐个剖析
    for (Map.Entry<String, ZoneSnapshot> zoneEntry : snapshot.entrySet()) {
        String zone = zoneEntry.getKey();  //失去zone字符串
        ZoneSnapshot zoneSnapshot = zoneEntry.getValue(); //失去该zone的快照信息
        int instanceCount = zoneSnapshot.getInstanceCount();
        if (instanceCount == 0) { //若该zone内一个实例都木有了,那就是齐全不可用,那就移除该zone,而后标记zone是无限可用的(并非全副可用)
            availableZones.remove(zone);
            limitedZoneAvailability = true;
        } else {
            double loadPerServer = zoneSnapshot.getLoadPerServer(); //获取该区域的均匀负载
            // 机器的熔断总数 / 总实例数曾经超过了阈值(默认为1,也就是全副熔断才会认为该zone齐全不可用)
            if (((double) zoneSnapshot.getCircuitTrippedCount())
                / instanceCount >= triggeringBlackoutPercentage
                || loadPerServer < 0) { //loadPerServer示意以后区域所有节点都熔断了。
                availableZones.remove(zone); 
                limitedZoneAvailability = true;
            } else { // 进入到这个逻辑,阐明并不是齐全不可用,就看看区域的状态
                // 如果以后负载和最大负载相当,那认为以后区域状态很不好,退出到worstZones中
                if (Math.abs(loadPerServer - maxLoadPerServer) < 0.000001d) {
                    // they are the same considering double calculation
                    // round error
                    worstZones.add(zone);
                   
                } else if (loadPerServer > maxLoadPerServer) {// 或者若以后负载大于最大负载了。
                    maxLoadPerServer = loadPerServer;
                    worstZones.clear();
                    worstZones.add(zone);
                }
            }
        }
    }
    // 如果最大负载小于设定的负载阈值 并且limitedZoneAvailability=false
    // 阐明全副zone都可用,并且最大负载都还没有达到阈值,那就把全副zone返回   
    if (maxLoadPerServer < triggeringLoad && !limitedZoneAvailability) {
        // zone override is not needed here
        return availableZones;
    }
    //若最大负载超过阈值, 就不能全副返回,则间接从负载最高的区域中随机返回一个,这么解决的目标是把负载最高的那个哥们T除掉,再返回后果。
    String zoneToAvoid = randomChooseZone(snapshot, worstZones);
    if (zoneToAvoid != null) {
        availableZones.remove(zoneToAvoid);
    }
    return availableZones;

}

上述逻辑还是比较复杂的,咱们通过一个简略的文字进行阐明:

  1. 如果zone为null,那么也就是没有可用区域,间接返回null
  2. 如果zone的可用区域为1,也没有什么能够抉择的,间接返回这一个
  3. 应用Set<String> worstZones记录所有zone中比拟状态不好的的zone列表,用maxLoadPerServer示意所有zone中负载最高的区域;用limitedZoneAvailability示意是否是局部zone可用(true:局部可用,false:全副可用),接着咱们须要遍历所有的zone信息,逐个进行判断从而对无效zone的后果进行解决。

    1. 如果以后zoneinstanceCount为0,那就间接把这个区域移除就行,并且标记limitedZoneAvailability为局部可用,没什么好说的。
    2. 获取以后总的均匀负载loadPerServer,如果zone内的熔断实例数 / 总实例数 >= triggeringBlackoutPercentage 或者 loadPerServer < 0的话,阐明以后区域有问题,间接执行remove移除以后zone,并且limitedZoneAvailability=true .

      1. (熔断实例数 / 总实例数 >= 阈值,标记为以后zone就不可用了(移除掉),这个很好了解。这个阈值为0.99999d也就说所有的Server实例被熔断了,该zone才算不可用了).
      2. loadPerServer = -1,也就说当所有实例都熔断了。这两个条件判断都差不多,都是判断这个区域的可用性。
    3. 如果以后zone没有达到阈值,则判断区域的负载状况,从所有zone中找到负载最高的区域(负载差值在0.000001d),则把这些区域退出到worstZones列表,也就是这个汇合保留的是负载较高的区域。
  4. 通过上述遍历对区域数据进行计算后,最初要设置返回的无效区域数据。

    1. 最高负载maxLoadPerServer仍旧小于提供的triggeringLoad阈值,并且并且limitedZoneAvailability=false(就是说所有zone都可用的状况下),那就返回所有的zone:availableZones。 (也就是所有区域的负载都在阈值范畴内并且每个区域内的节点都还存活状态,就全副返回)
    2. 否则,最大负载超过阈值或者某些区域存在局部不可用的节点时,就从这些负载较高的节点worstZones中随机移除一个

AbstractServerPredicate

在答复上面的代码,通过getEligibleServers判断可用服务节点后,如果可用节点不为0 ,则执行incrementAndGetModulo办法进行轮询。

public Optional<Server> chooseRoundRobinAfterFiltering(List<Server> servers, Object loadBalancerKey) {
    List<Server> eligible = getEligibleServers(servers, loadBalancerKey);
    if (eligible.size() == 0) {
        return Optional.absent();
    }
    return Optional.of(eligible.get(incrementAndGetModulo(eligible.size())));
}

该办法是通过轮询来实现,代码如下!

private int incrementAndGetModulo(int modulo) {
    for (;;) {
        int current = nextIndex.get();
        int next = (current + 1) % modulo;
        if (nextIndex.compareAndSet(current, next) && current < modulo)
            return current;
    }
}

服务列表的加载过程

在本实例中,咱们将服务列表配置在application.properties文件中,意味着在某个时候会加载这个列表,保留在某个地位,那它是在什么时候加载的呢?

RibbonClientConfiguration这个配置类中,有上面这个Bean的申明,(该Bean是条件触发)它用来定义默认的负载平衡实现。

@Bean
@ConditionalOnMissingBean
public ILoadBalancer ribbonLoadBalancer(IClientConfig config,
                                        ServerList<Server> serverList, ServerListFilter<Server> serverListFilter,
                                        IRule rule, IPing ping, ServerListUpdater serverListUpdater) {
    if (this.propertiesFactory.isSet(ILoadBalancer.class, name)) {
        return this.propertiesFactory.get(ILoadBalancer.class, config, name);
    }
    return new ZoneAwareLoadBalancer<>(config, rule, ping, serverList,
                                       serverListFilter, serverListUpdater);
}

后面剖析过,它的类关系图如下!

ZoneAwareLoadBalancer在初始化时,会调用父类DynamicServerListLoadBalancer的构造方法,代码如下。

public DynamicServerListLoadBalancer(IClientConfig clientConfig, IRule rule, IPing ping,
                                         ServerList<T> serverList, ServerListFilter<T> filter,
                                         ServerListUpdater serverListUpdater) {
        super(clientConfig, rule, ping);
        this.serverListImpl = serverList;
        this.filter = filter;
        this.serverListUpdater = serverListUpdater;
        if (filter instanceof AbstractServerListFilter) {
            ((AbstractServerListFilter) filter).setLoadBalancerStats(getLoadBalancerStats());
        }
        restOfInit(clientConfig);
    }

restOfInit

restOfInit办法次要做两件事件。

  1. 开启动静更新Server的性能
  2. 更新Server列表
void restOfInit(IClientConfig clientConfig) {
    boolean primeConnection = this.isEnablePrimingConnections();
    // turn this off to avoid duplicated asynchronous priming done in BaseLoadBalancer.setServerList()
    this.setEnablePrimingConnections(false);
    enableAndInitLearnNewServersFeature(); //开启动静更新Server

    updateListOfServers(); //更新Server列表
    
    
    if (primeConnection && this.getPrimeConnections() != null) {
        this.getPrimeConnections()
            .primeConnections(getReachableServers());
    }
    this.setEnablePrimingConnections(primeConnection);
    LOGGER.info("DynamicServerListLoadBalancer for client {} initialized: {}", clientConfig.getClientName(), this.toString());
}

updateListOfServers

全量更新一次服务列表。

public void updateListOfServers() {
    List<T> servers = new ArrayList<T>();
    if (serverListImpl != null) {
        servers = serverListImpl.getUpdatedListOfServers();
        LOGGER.debug("List of Servers for {} obtained from Discovery client: {}",
                     getIdentifier(), servers);

        if (filter != null) {
            servers = filter.getFilteredListOfServers(servers);
            LOGGER.debug("Filtered List of Servers for {} obtained from Discovery client: {}",
                         getIdentifier(), servers);
        }
    }
    updateAllServerList(servers);
}

上述代码解释如下

  1. 因为咱们是通过application.properties文件配置的动态服务地址列表,所以此时serverListImpl的实例为:ConfigurationBasedServerList,调用getUpdatedListOfServers办法时,返回的是在application.properties文件中定义的服务列表。
  2. 判断是否须要filter,如果有,则通过filter进行服务列表过滤。

最初调用updateAllServerList,更新所有Server到本地缓存中。

protected void updateAllServerList(List<T> ls) {
    // other threads might be doing this - in which case, we pass
    if (serverListUpdateInProgress.compareAndSet(false, true)) {
        try {
            for (T s : ls) {
                s.setAlive(true); // set so that clients can start using these
                // servers right away instead
                // of having to wait out the ping cycle.
            }
            setServersList(ls);
            super.forceQuickPing();
        } finally {
            serverListUpdateInProgress.set(false);
        }
    }
}

动静Ping机制

在Ribbon中,基于Ping机制,指标服务地址也会产生动静变更,具体的实现形式在DynamicServerListLoadBalancer.restOfInit办法中

void restOfInit(IClientConfig clientConfig) {
    boolean primeConnection = this.isEnablePrimingConnections();
    // turn this off to avoid duplicated asynchronous priming done in BaseLoadBalancer.setServerList()
    this.setEnablePrimingConnections(false);
    enableAndInitLearnNewServersFeature();  //开启定时工作动静更新

    updateListOfServers();
    if (primeConnection && this.getPrimeConnections() != null) {
        this.getPrimeConnections()
            .primeConnections(getReachableServers());
    }
    this.setEnablePrimingConnections(primeConnection);
    LOGGER.info("DynamicServerListLoadBalancer for client {} initialized: {}", clientConfig.getClientName(), this.toString());
}
public void enableAndInitLearnNewServersFeature() {
    LOGGER.info("Using serverListUpdater {}", serverListUpdater.getClass().getSimpleName());
    serverListUpdater.start(updateAction);
}

留神,这里会启动一个定时工作,而定时工作所执行的程序是updateAction,它是一个匿名外部类,定义如下。

protected final ServerListUpdater.UpdateAction updateAction = new ServerListUpdater.UpdateAction() {
    @Override
    public void doUpdate() {
        updateListOfServers();
    }
};

定时工作的启动办法如下,这个工作每隔30s执行一次。

public synchronized void start(final UpdateAction updateAction) {
    if (isActive.compareAndSet(false, true)) {
        final Runnable wrapperRunnable = new Runnable() {
            @Override
            public void run() {
                if (!isActive.get()) {
                    if (scheduledFuture != null) {
                        scheduledFuture.cancel(true);
                    }
                    return;
                }
                try {
                    updateAction.doUpdate();  //执行具体的工作。
                    lastUpdated = System.currentTimeMillis();
                } catch (Exception e) {
                    logger.warn("Failed one update cycle", e);
                }
            }
        };

        scheduledFuture = getRefreshExecutor().scheduleWithFixedDelay(
            wrapperRunnable,
            initialDelayMs,  //1000
            refreshIntervalMs,  //30000 
            TimeUnit.MILLISECONDS 
        );
    } else {
        logger.info("Already active, no-op");
    }
}

当30s之后触发了doUpdate办法后,最终进入到updateAllServerList办法

protected void updateAllServerList(List<T> ls) {
    // other threads might be doing this - in which case, we pass
    if (serverListUpdateInProgress.compareAndSet(false, true)) {
        try {
            for (T s : ls) {
                s.setAlive(true); // set so that clients can start using these
                // servers right away instead
                // of having to wait out the ping cycle.
            }
            setServersList(ls);
            super.forceQuickPing();
        } finally {
            serverListUpdateInProgress.set(false);
        }
    }
}

其中,会调用 super.forceQuickPing();进行心跳衰弱检测。

public void forceQuickPing() {
    if (canSkipPing()) {
        return;
    }
    logger.debug("LoadBalancer [{}]:  forceQuickPing invoking", name);

    try {
        new Pinger(pingStrategy).runPinger();
    } catch (Exception e) {
        logger.error("LoadBalancer [{}]: Error running forceQuickPing()", name, e);
    }
}

RibbonLoadBalancerClient.execute

通过上述剖析,再回到RibbonLoadBalancerClient.execute办法!

public <T> T execute(String serviceId, LoadBalancerRequest<T> request, Object hint)
    throws IOException {
    ILoadBalancer loadBalancer = getLoadBalancer(serviceId);
    Server server = getServer(loadBalancer, hint);
    if (server == null) {
        throw new IllegalStateException("No instances available for " + serviceId);
    }
    RibbonServer ribbonServer = new RibbonServer(serviceId, server,
                                                 isSecure(server, serviceId),
                                                 serverIntrospector(serviceId).getMetadata(server));

    return execute(serviceId, ribbonServer, request);
}

此时, Server server = getServer(loadBalancer, hint);这行代码,会返回一个具体的指标服务器。

其中,在调用execute办法之前,会包装一个RibbonServer对象传递上来,它的次要作用是用来记录申请的负载信息。

@Override
public <T> T execute(String serviceId, ServiceInstance serviceInstance,
                     LoadBalancerRequest<T> request) throws IOException {
    Server server = null;
    if (serviceInstance instanceof RibbonServer) {
        server = ((RibbonServer) serviceInstance).getServer();
    }
    if (server == null) {
        throw new IllegalStateException("No instances available for " + serviceId);
    }

    RibbonLoadBalancerContext context = this.clientFactory
        .getLoadBalancerContext(serviceId);
    RibbonStatsRecorder statsRecorder = new RibbonStatsRecorder(context, server);

    try {
        T returnVal = request.apply(serviceInstance);
        statsRecorder.recordStats(returnVal);  //记录申请状态
        return returnVal;
    }
    // catch IOException and rethrow so RestTemplate behaves correctly
    catch (IOException ex) {
        statsRecorder.recordStats(ex); //记录申请状态
        throw ex;
    }
    catch (Exception ex) {
        statsRecorder.recordStats(ex);
        ReflectionUtils.rethrowRuntimeException(ex);
    }
    return null;
}

request.apply

request是LoadBalancerRequest接口,它外面提供了一个apply办法,然而从代码中咱们发现这个办法并没有实现类,那么它是在哪里实现的呢?

持续又往前剖析发现,这个request对象是从LoadBalancerInterceptor的intercept办法中传递过去的.

public ClientHttpResponse intercept(final HttpRequest request, final byte[] body, final ClientHttpRequestExecution execution) throws IOException {
    URI originalUri = request.getURI();
    String serviceName = originalUri.getHost();
    Assert.state(serviceName != null, "Request URI does not contain a valid hostname: " + originalUri);
    return (ClientHttpResponse)this.loadBalancer.execute(serviceName, this.requestFactory.createRequest(request, body, execution));
}

而request的传递,是通过this.requestFactory.createRequest(request, body, execution)创立而来,于是咱们找到这个办法。

public LoadBalancerRequest<ClientHttpResponse> createRequest(final HttpRequest request, final byte[] body, final ClientHttpRequestExecution execution) {
    return (instance) -> {
        HttpRequest serviceRequest = new ServiceRequestWrapper(request, instance, this.loadBalancer);
        LoadBalancerRequestTransformer transformer;
        if (this.transformers != null) {
            for(Iterator var6 = this.transformers.iterator(); var6.hasNext(); serviceRequest = transformer.transformRequest((HttpRequest)serviceRequest, instance)) {
                transformer = (LoadBalancerRequestTransformer)var6.next();
            }
        }

        return execution.execute((HttpRequest)serviceRequest, body);
    };
}

从代码中发现,它是一个用lambda表达式实现的匿名外部类。在该外部类中,创立了一个ServiceRequestWrapper,这个ServiceRequestWrapper实际上就是HttpRequestWrapper的一个子类,ServiceRequestWrapper重写了HttpRequestWrapper的getURI()办法,重写的URI实际上就是通过调用LoadBalancerClient接口的reconstructURI函数来从新构建一个URI进行拜访。

InterceptingClientHttpRequest.execute

上述代码执行的execution.execute,又会进入到InterceptingClientHttpRequest.execute办法中,代码如下。

public ClientHttpResponse execute(HttpRequest request, byte[] body) throws IOException {
    if (this.iterator.hasNext()) {
        ClientHttpRequestInterceptor nextInterceptor = this.iterator.next();
        return nextInterceptor.intercept(request, body, this);
    }
    else {
        HttpMethod method = request.getMethod();
        Assert.state(method != null, "No standard HTTP method");
        ClientHttpRequest delegate = requestFactory.createRequest(request.getURI(), method); //留神这里
        request.getHeaders().forEach((key, value) -> delegate.getHeaders().addAll(key, value));
        if (body.length > 0) {
            if (delegate instanceof StreamingHttpOutputMessage) {
                StreamingHttpOutputMessage streamingOutputMessage = (StreamingHttpOutputMessage) delegate;
                streamingOutputMessage.setBody(outputStream -> StreamUtils.copy(body, outputStream));
            }
            else {
                StreamUtils.copy(body, delegate.getBody());
            }
        }
        return delegate.execute();
    }
}

此时须要留神,request对象的实例是HttpRequestWrapper

request.getURI()

当调用request.getURI()获取指标地址创立http申请时,会调用ServiceRequestWrapper中的.getURI()办法。

@Override
public URI getURI() {
    URI uri = this.loadBalancer.reconstructURI(this.instance, getRequest().getURI());
    return uri;
}

在这个办法中,调用RibbonLoadBalancerClient实例中的reconstructURI办法,依据service-id生成指标服务地址。

RibbonLoadBalancerClient.reconstructURI

public URI reconstructURI(ServiceInstance instance, URI original) {
        Assert.notNull(instance, "instance can not be null");
        String serviceId = instance.getServiceId(); //获取实例id,也就是服务名称
        RibbonLoadBalancerContext context = this.clientFactory
                .getLoadBalancerContext(serviceId); //获取RibbonLoadBalancerContext上下文,这个是从spring容器中获取的对象实例。

        URI uri;
        Server server;
        if (instance instanceof RibbonServer) { //如果instance为RibbonServer
            RibbonServer ribbonServer = (RibbonServer) instance;
            server = ribbonServer.getServer();  //获取指标服务器的Server信息
            uri = updateToSecureConnectionIfNeeded(original, ribbonServer); //判断是否须要更新成一个平安连贯。
        }
        else { //如果是一个一般的http地址
            server = new Server(instance.getScheme(), instance.getHost(),
                    instance.getPort());
            IClientConfig clientConfig = clientFactory.getClientConfig(serviceId);
            ServerIntrospector serverIntrospector = serverIntrospector(serviceId);
            uri = updateToSecureConnectionIfNeeded(original, clientConfig,
                    serverIntrospector, server);
        }
        return context.reconstructURIWithServer(server, uri);  //调用这个办法拼接成一个实在的指标服务器地址。
}

版权申明:本博客所有文章除特地申明外,均采纳 CC BY-NC-SA 4.0 许可协定。转载请注明来自 Mic带你学架构
如果本篇文章对您有帮忙,还请帮忙点个关注和赞,您的保持是我一直创作的能源。欢送关注同名微信公众号获取更多技术干货!

评论

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注

这个站点使用 Akismet 来减少垃圾评论。了解你的评论数据如何被处理