本系列代码地址:https://github.com/HashZhang/...

咱们应用 Spring Cloud 官网举荐的 Spring Cloud LoadBalancer 作为咱们的客户端负载均衡器。上一节咱们理解了 Spring Cloud LoadBalancer 的构造,接下来咱们来说一下咱们在应用 Spring Cloud LoadBalancer 要实现的性能:

  1. 咱们要实现不同集群之间不相互调用,通过实例的metamap中的zone配置,来辨别不同集群的实例。只有实例的metamap中的zone配置一样的实例能力相互调用。这个通过实现自定义的 ServiceInstanceListSupplier 即可实现
  2. 负载平衡的轮询算法,须要申请与申请之间隔离,不能共用同一个 position 导致某个申请失败之后的重试还是原来失败的实例。上一节看到的默认的 RoundRobinLoadBalancer 是所有线程共用同一个原子变量 position 每次申请原子加 1。在这种状况下会有问题:假如有微服务 A 有两个实例:实例 1 和实例 2。申请 A 达到时,RoundRobinLoadBalancer 返回实例 1,这时有申请 B 达到,RoundRobinLoadBalancer 返回实例 2。而后如果申请 A 失败重试,RoundRobinLoadBalancer 又返回了实例 1。这不是咱们冀望看到的。

针对这两个性能,咱们别离编写本人的实现。

Spring Cloud LoadBalancer 中的 zone 配置

Spring Cloud LoadBalancer 定义了 LoadBalancerZoneConfig

public class LoadBalancerZoneConfig {    //标识以后负载均衡器处于哪一个 zone    private String zone;    public LoadBalancerZoneConfig(String zone) {        this.zone = zone;    }    public String getZone() {        return zone;    }    public void setZone(String zone) {        this.zone = zone;    }}

如果没有引入 Eureka 相干依赖,则这个 zone 通过 spring.cloud.loadbalancer.zone 配置:
LoadBalancerAutoConfiguration

@Bean@ConditionalOnMissingBeanpublic LoadBalancerZoneConfig zoneConfig(Environment environment) {    return new LoadBalancerZoneConfig(environment.getProperty("spring.cloud.loadbalancer.zone"));}

如果引入了 Eureka 相干依赖,则如果在 Eureka 元数据配置了 zone,则这个 zone 会笼罩 Spring Cloud LoadBalancer 中的 LoadBalancerZoneConfig

EurekaLoadBalancerClientConfiguration

@PostConstructpublic void postprocess() {    if (!StringUtils.isEmpty(zoneConfig.getZone())) {        return;    }    String zone = getZoneFromEureka();    if (!StringUtils.isEmpty(zone)) {        if (LOG.isDebugEnabled()) {            LOG.debug("Setting the value of '" + LOADBALANCER_ZONE + "' to " + zone);        }        //设置 `LoadBalancerZoneConfig`        zoneConfig.setZone(zone);    }}private String getZoneFromEureka() {    String zone;    //是否配置了 spring.cloud.loadbalancer.eureka.approximateZoneFromHostname 为 true    boolean approximateZoneFromHostname = eurekaLoadBalancerProperties.isApproximateZoneFromHostname();    //如果配置了,则尝试从 Eureka 配置的 host 名称中提取    //理论就是以 . 宰割 host,而后第二个就是 zone    //例如 www.zone1.com 就是 zone1    if (approximateZoneFromHostname && eurekaConfig != null) {        return ZoneUtils.extractApproximateZone(this.eurekaConfig.getHostName(false));    }    else {        //否则,从 metadata map 中取 zone 这个 key        zone = eurekaConfig == null ? null : eurekaConfig.getMetadataMap().get("zone");        //如果这个 key 不存在,则从配置中以 region 从 zone 列表取第一个 zone 作为以后 zone        if (StringUtils.isEmpty(zone) && clientConfig != null) {            String[] zones = clientConfig.getAvailabilityZones(clientConfig.getRegion());            // Pick the first one from the regions we want to connect to            zone = zones != null && zones.length > 0 ? zones[0] : null;        }        return zone;    }}

实现 SameZoneOnlyServiceInstanceListSupplier

为了实现通过 zone 来过滤同一 zone 下的实例,并且相对不会返回非同一 zone 下的实例,咱们来编写代码:

SameZoneOnlyServiceInstanceListSupplier

/** * 只返回与以后实例同一个 Zone 的服务实例,不同 zone 之间的服务不相互调用 */public class SameZoneOnlyServiceInstanceListSupplier extends DelegatingServiceInstanceListSupplier {    /**     * 实例元数据 map 中示意 zone 配置的 key     */    private final String ZONE = "zone";    /**     * 以后 spring cloud loadbalancer 的 zone 配置     */    private final LoadBalancerZoneConfig zoneConfig;    private String zone;    public SameZoneOnlyServiceInstanceListSupplier(ServiceInstanceListSupplier delegate, LoadBalancerZoneConfig zoneConfig) {        super(delegate);        this.zoneConfig = zoneConfig;    }    @Override    public Flux<List<ServiceInstance>> get() {        return getDelegate().get().map(this::filteredByZone);    }    //通过 zoneConfig 过滤    private List<ServiceInstance> filteredByZone(List<ServiceInstance> serviceInstances) {        if (zone == null) {            zone = zoneConfig.getZone();        }        if (zone != null) {            List<ServiceInstance> filteredInstances = new ArrayList<>();            for (ServiceInstance serviceInstance : serviceInstances) {                String instanceZone = getZone(serviceInstance);                if (zone.equalsIgnoreCase(instanceZone)) {                    filteredInstances.add(serviceInstance);                }            }            if (filteredInstances.size() > 0) {                return filteredInstances;            }        }        /**         * @see ZonePreferenceServiceInstanceListSupplier 在没有雷同zone实例的时候返回的是所有实例         * 咱们这里为了实现不同 zone 之间不相互调用须要返回空列表         */        return List.of();    }    //读取实例的 zone,没有配置则为 null    private String getZone(ServiceInstance serviceInstance) {        Map<String, String> metadata = serviceInstance.getMetadata();        if (metadata != null) {            return metadata.get(ZONE);        }        return null;    }}

在之前章节的讲述中,咱们提到了咱们应用 spring-cloud-sleuth 作为链路追踪库。咱们想能够通过其中的 traceId,来辨别到底是否是同一个申请。

RoundRobinWithRequestSeparatedPositionLoadBalancer

//肯定必须是实现ReactorServiceInstanceLoadBalancer//而不是ReactorLoadBalancer<ServiceInstance>//因为注册的时候是ReactorServiceInstanceLoadBalancer@Log4j2public class RoundRobinWithRequestSeparatedPositionLoadBalancer implements ReactorServiceInstanceLoadBalancer {    private final ServiceInstanceListSupplier serviceInstanceListSupplier;    //每次申请算上重试不会超过1分钟    //对于超过1分钟的,这种申请必定比拟重,不应该重试    private final LoadingCache<Long, AtomicInteger> positionCache = Caffeine.newBuilder().expireAfterWrite(1, TimeUnit.MINUTES)            //随机初始值,避免每次都是从第一个开始调用            .build(k -> new AtomicInteger(ThreadLocalRandom.current().nextInt(0, 1000)));    private final String serviceId;    private final Tracer tracer;    public RoundRobinWithRequestSeparatedPositionLoadBalancer(ServiceInstanceListSupplier serviceInstanceListSupplier, String serviceId, Tracer tracer) {        this.serviceInstanceListSupplier = serviceInstanceListSupplier;        this.serviceId = serviceId;        this.tracer = tracer;    }    @Override    public Mono<Response<ServiceInstance>> choose(Request request) {        return serviceInstanceListSupplier.get().next().map(serviceInstances -> getInstanceResponse(serviceInstances));    }    private Response<ServiceInstance> getInstanceResponse(List<ServiceInstance> serviceInstances) {        if (serviceInstances.isEmpty()) {            log.warn("No servers available for service: " + this.serviceId);            return new EmptyResponse();        }        return getInstanceResponseByRoundRobin(serviceInstances);    }    private Response<ServiceInstance> getInstanceResponseByRoundRobin(List<ServiceInstance> serviceInstances) {        if (serviceInstances.isEmpty()) {            log.warn("No servers available for service: " + this.serviceId);            return new EmptyResponse();        }        //为了解决原始算法不同调用并发可能导致一个申请重试雷同的实例        Span currentSpan = tracer.currentSpan();        if (currentSpan == null) {            currentSpan = tracer.newTrace();        }        long l = currentSpan.context().traceId();        AtomicInteger seed = positionCache.get(l);        int s = seed.getAndIncrement();        int pos = s % serviceInstances.size();        log.info("position {}, seed: {}, instances count: {}", pos, s, serviceInstances.size());        return new DefaultResponse(serviceInstances.stream()                //实例返回列表程序可能不同,为了保持一致,先排序再取                .sorted(Comparator.comparing(ServiceInstance::getInstanceId))                .collect(Collectors.toList()).get(pos));    }}

在上一节,咱们提到了能够通过 @LoadBalancerClients 注解配置默认的负载均衡器配置,咱们这里就是通过这种形式进行配置。首先在 spring.factories 中增加主动配置类:

spring.factories

org.springframework.boot.autoconfigure.EnableAutoConfiguration=\com.github.hashjang.spring.cloud.iiford.service.common.auto.LoadBalancerAutoConfiguration

而后编写这个主动配置类,其实很简略,就是增加一个 @LoadBalancerClients 注解,设置默认配置类:

LoadBalancerAutoConfiguration

@Configuration(proxyBeanMethods = false)@LoadBalancerClients(defaultConfiguration = DefaultLoadBalancerConfiguration.class)public class LoadBalancerAutoConfiguration {}

编写这个默认配置类,将下面咱们实现的两个类,组装进去:

DefaultLoadBalancerConfiguration

@Configuration(proxyBeanMethods = false)public class DefaultLoadBalancerConfiguration {    @Bean    public ServiceInstanceListSupplier serviceInstanceListSupplier(            DiscoveryClient discoveryClient,            Environment env,            ConfigurableApplicationContext context,            LoadBalancerZoneConfig zoneConfig    ) {        ObjectProvider<LoadBalancerCacheManager> cacheManagerProvider = context                .getBeanProvider(LoadBalancerCacheManager.class);        return  //开启服务实例缓存                new CachingServiceInstanceListSupplier(                        //只能返回同一个 zone 的服务实例                        new SameZoneOnlyServiceInstanceListSupplier(                                //启用通过 discoveryClient 的服务发现                                new DiscoveryClientServiceInstanceListSupplier(                                        discoveryClient, env                                ),                                zoneConfig                        )                        , cacheManagerProvider.getIfAvailable()                );    }    @Bean    public ReactorLoadBalancer<ServiceInstance> reactorServiceInstanceLoadBalancer(            Environment environment,            ServiceInstanceListSupplier serviceInstanceListSupplier,            Tracer tracer    ) {        String name = environment.getProperty(LoadBalancerClientFactory.PROPERTY_NAME);        return new RoundRobinWithRequestSeparatedPositionLoadBalancer(                serviceInstanceListSupplier,                name,                tracer        );    }}

这样,咱们就实现了自定义的负载均衡器。也了解了 Spring Cloud LoadBalancer 的应用。

咱们这一节详细分析在咱们我的项目中应用 Spring Cloud LoadBalancer 要实现的性能,实现了自定义的负载均衡器,也了解了 Spring Cloud LoadBalancer 的应用。下一节咱们应用单元测试验证咱们要实现的这些性能是否无效。

微信搜寻“我的编程喵”关注公众号,每日一刷,轻松晋升技术,斩获各种offer