关于spring-cloud:Spring-Cloud-升级之路20200x-7使用-Spring-Cloud-LoadBalancer-2

99次阅读

共计 10816 个字符,预计需要花费 28 分钟才能阅读完成。

本我的项目代码地址:https://github.com/HashZhang/…

咱们应用 Spring Cloud 官网举荐的 Spring Cloud LoadBalancer 作为咱们的客户端负载均衡器。上一节咱们理解了 Spring Cloud LoadBalancer 的构造,接下来咱们来说一下咱们在应用 Spring Cloud LoadBalancer 要实现的性能:

  1. 咱们要实现 不同集群之间不相互调用,通过实例的 metamap 中的 zone 配置 ,来辨别不同集群的实例。只有实例的metamap 中的 zone 配置一样的实例能力相互调用。这个通过实现自定义的 ServiceInstanceListSupplier 即可实现
  2. 负载平衡的轮询算法,须要申请与申请之间隔离,不能共用同一个 position 导致某个申请失败之后的重试还是原来失败的实例。上一节看到的默认的 RoundRobinLoadBalancer 是所有线程共用同一个原子变量 position 每次申请原子加 1。在这种状况下会有问题:假如有微服务 A 有两个实例:实例 1 和实例 2。申请 A 达到时,RoundRobinLoadBalancer 返回实例 1,这时有申请 B 达到,RoundRobinLoadBalancer 返回实例 2。而后如果申请 A 失败重试,RoundRobinLoadBalancer 又返回了实例 1。这不是咱们冀望看到的。

针对这两个性能,咱们别离编写本人的实现。

实现不同集群不相互调用

Spring Cloud LoadBalancer 中的 zone 配置

Spring Cloud LoadBalancer 定义了 LoadBalancerZoneConfig

public class LoadBalancerZoneConfig {
    // 标识以后负载均衡器处于哪一个 zone
    private String zone;
    public LoadBalancerZoneConfig(String zone) {this.zone = zone;}
    public String getZone() {return zone;}
    public void setZone(String zone) {this.zone = zone;}
}

如果没有引入 Eureka 相干依赖,则这个 zone 通过 spring.cloud.loadbalancer.zone 配置:
LoadBalancerAutoConfiguration

@Bean
@ConditionalOnMissingBean
public LoadBalancerZoneConfig zoneConfig(Environment environment) {return new LoadBalancerZoneConfig(environment.getProperty("spring.cloud.loadbalancer.zone"));
}

如果引入了 Eureka 相干依赖,则如果在 Eureka 元数据配置了 zone,则这个 zone 会笼罩 Spring Cloud LoadBalancer 中的 LoadBalancerZoneConfig

EurekaLoadBalancerClientConfiguration

@PostConstruct
public void postprocess() {if (!StringUtils.isEmpty(zoneConfig.getZone())) {return;}
    String zone = getZoneFromEureka();
    if (!StringUtils.isEmpty(zone)) {if (LOG.isDebugEnabled()) {LOG.debug("Setting the value of'" + LOADBALANCER_ZONE + "'to" + zone);
        }
        // 设置 `LoadBalancerZoneConfig`
        zoneConfig.setZone(zone);
    }
}

private String getZoneFromEureka() {
    String zone;
    // 是否配置了 spring.cloud.loadbalancer.eureka.approximateZoneFromHostname 为 true
    boolean approximateZoneFromHostname = eurekaLoadBalancerProperties.isApproximateZoneFromHostname();
    // 如果配置了,则尝试从 Eureka 配置的 host 名称中提取
    // 理论就是以 . 宰割 host,而后第二个就是 zone
    // 例如 www.zone1.com 就是 zone1
    if (approximateZoneFromHostname && eurekaConfig != null) {return ZoneUtils.extractApproximateZone(this.eurekaConfig.getHostName(false));
    }
    else {
        // 否则,从 metadata map 中取 zone 这个 key
        zone = eurekaConfig == null ? null : eurekaConfig.getMetadataMap().get("zone");
        // 如果这个 key 不存在,则从配置中以 region 从 zone 列表取第一个 zone 作为以后 zone
        if (StringUtils.isEmpty(zone) && clientConfig != null) {String[] zones = clientConfig.getAvailabilityZones(clientConfig.getRegion());
            // Pick the first one from the regions we want to connect to
            zone = zones != null && zones.length > 0 ? zones[0] : null;
        }
        return zone;
    }
}

实现 SameZoneOnlyServiceInstanceListSupplier

为了实现通过 zone 来过滤同一 zone 下的实例,并且相对不会返回非同一 zone 下的实例,咱们来编写代码:

SameZoneOnlyServiceInstanceListSupplier

/**
 * 只返回与以后实例同一个 Zone 的服务实例,不同 zone 之间的服务不相互调用
 */
public class SameZoneOnlyServiceInstanceListSupplier extends DelegatingServiceInstanceListSupplier {
    /**
     * 实例元数据 map 中示意 zone 配置的 key
     */
    private final String ZONE = "zone";
    /**
     * 以后 spring cloud loadbalancer 的 zone 配置
     */
    private final LoadBalancerZoneConfig zoneConfig;
    private String zone;

    public SameZoneOnlyServiceInstanceListSupplier(ServiceInstanceListSupplier delegate, LoadBalancerZoneConfig zoneConfig) {super(delegate);
        this.zoneConfig = zoneConfig;
    }

    @Override
    public Flux<List<ServiceInstance>> get() {return getDelegate().get().map(this::filteredByZone);
    }

    // 通过 zoneConfig 过滤
    private List<ServiceInstance> filteredByZone(List<ServiceInstance> serviceInstances) {if (zone == null) {zone = zoneConfig.getZone();
        }
        if (zone != null) {List<ServiceInstance> filteredInstances = new ArrayList<>();
            for (ServiceInstance serviceInstance : serviceInstances) {String instanceZone = getZone(serviceInstance);
                if (zone.equalsIgnoreCase(instanceZone)) {filteredInstances.add(serviceInstance);
                }
            }
            if (filteredInstances.size() > 0) {return filteredInstances;}
        }
        /**
         * @see ZonePreferenceServiceInstanceListSupplier 在没有雷同 zone 实例的时候返回的是所有实例
         * 咱们这里为了实现不同 zone 之间不相互调用须要返回空列表
         */
        return List.of();}

    // 读取实例的 zone,没有配置则为 null
    private String getZone(ServiceInstance serviceInstance) {Map<String, String> metadata = serviceInstance.getMetadata();
        if (metadata != null) {return metadata.get(ZONE);
        }
        return null;
    }
}

实现申请与申请之间隔离的负载平衡算法

在之前章节的讲述中,咱们提到了咱们应用 spring-cloud-sleuth 作为链路追踪库。咱们想能够通过其中的 traceId,来辨别到底是否是同一个申请。

RoundRobinWithRequestSeparatedPositionLoadBalancer

// 肯定必须是实现 ReactorServiceInstanceLoadBalancer
// 而不是 ReactorLoadBalancer<ServiceInstance>
// 因为注册的时候是 ReactorServiceInstanceLoadBalancer
@Log4j2
public class RoundRobinWithRequestSeparatedPositionLoadBalancer implements ReactorServiceInstanceLoadBalancer {
    private final ServiceInstanceListSupplier serviceInstanceListSupplier;
    // 每次申请算上重试不会超过 1 分钟
    // 对于超过 1 分钟的,这种申请必定比拟重,不应该重试
    private final LoadingCache<Long, AtomicInteger> positionCache = Caffeine.newBuilder().expireAfterWrite(1, TimeUnit.MINUTES)
            // 随机初始值,避免每次都是从第一个开始调用
            .build(k -> new AtomicInteger(ThreadLocalRandom.current().nextInt(0, 1000)));
    private final String serviceId;
    private final Tracer tracer;


    public RoundRobinWithRequestSeparatedPositionLoadBalancer(ServiceInstanceListSupplier serviceInstanceListSupplier, String serviceId, Tracer tracer) {
        this.serviceInstanceListSupplier = serviceInstanceListSupplier;
        this.serviceId = serviceId;
        this.tracer = tracer;
    }

    @Override
    public Mono<Response<ServiceInstance>> choose(Request request) {return serviceInstanceListSupplier.get().next().map(serviceInstances -> getInstanceResponse(serviceInstances));
    }

    private Response<ServiceInstance> getInstanceResponse(List<ServiceInstance> serviceInstances) {if (serviceInstances.isEmpty()) {log.warn("No servers available for service:" + this.serviceId);
            return new EmptyResponse();}
        return getInstanceResponseByRoundRobin(serviceInstances);
    }

    private Response<ServiceInstance> getInstanceResponseByRoundRobin(List<ServiceInstance> serviceInstances) {if (serviceInstances.isEmpty()) {log.warn("No servers available for service:" + this.serviceId);
            return new EmptyResponse();}
        // 为了解决原始算法不同调用并发可能导致一个申请重试雷同的实例
        Span currentSpan = tracer.currentSpan();
        if (currentSpan == null) {currentSpan = tracer.newTrace();
        }
        long l = currentSpan.context().traceId();
        AtomicInteger seed = positionCache.get(l);
        int s = seed.getAndIncrement();
        int pos = s % serviceInstances.size();
        log.info("position {}, seed: {}, instances count: {}", pos, s, serviceInstances.size());
        return new DefaultResponse(serviceInstances.stream()
                // 实例返回列表程序可能不同,为了保持一致,先排序再取
                .sorted(Comparator.comparing(ServiceInstance::getInstanceId))
                .collect(Collectors.toList()).get(pos));
    }
}

将上述两个元素退出咱们自定义的 LoadBalancerClient 并启用

在上一节,咱们提到了能够通过 @LoadBalancerClients 注解配置默认的负载均衡器配置,咱们这里就是通过这种形式进行配置。首先在 spring.factories 中增加主动配置类:

spring.factories

org.springframework.boot.autoconfigure.EnableAutoConfiguration=\
com.github.hashjang.spring.cloud.iiford.service.common.auto.LoadBalancerAutoConfiguration

而后编写这个主动配置类,其实很简略,就是增加一个 @LoadBalancerClients 注解,设置默认配置类:

LoadBalancerAutoConfiguration

@Configuration(proxyBeanMethods = false)
@LoadBalancerClients(defaultConfiguration = DefaultLoadBalancerConfiguration.class)
public class LoadBalancerAutoConfiguration {}

编写这个默认配置类,将下面咱们实现的两个类,组装进去:

DefaultLoadBalancerConfiguration

@Configuration(proxyBeanMethods = false)
public class DefaultLoadBalancerConfiguration {

    @Bean
    public ServiceInstanceListSupplier serviceInstanceListSupplier(
            DiscoveryClient discoveryClient,
            Environment env,
            ConfigurableApplicationContext context,
            LoadBalancerZoneConfig zoneConfig
    ) {
        ObjectProvider<LoadBalancerCacheManager> cacheManagerProvider = context
                .getBeanProvider(LoadBalancerCacheManager.class);
        return  // 开启服务实例缓存
                new CachingServiceInstanceListSupplier(
                        // 只能返回同一个 zone 的服务实例
                        new SameZoneOnlyServiceInstanceListSupplier(
                                // 启用通过 discoveryClient 的服务发现
                                new DiscoveryClientServiceInstanceListSupplier(discoveryClient, env),
                                zoneConfig
                        )
                        , cacheManagerProvider.getIfAvailable());
    }

    @Bean
    public ReactorLoadBalancer<ServiceInstance> reactorServiceInstanceLoadBalancer(
            Environment environment,
            ServiceInstanceListSupplier serviceInstanceListSupplier,
            Tracer tracer
    ) {String name = environment.getProperty(LoadBalancerClientFactory.PROPERTY_NAME);
        return new RoundRobinWithRequestSeparatedPositionLoadBalancer(
                serviceInstanceListSupplier,
                name,
                tracer
        );
    }
}

这样,咱们就实现了自定义的负载均衡器。也了解了 Spring Cloud LoadBalancer 的应用。接下来,咱们来单元测试下这些性能。集成测试前面会有独自的章节,不必焦急。

单元测试上述性能

通过这届单元测试,咱们也能够理解下个别咱们实现 spring cloud 自定义的根底组件,怎么去单元测试。

这里的单元测试次要测试三个场景:

  1. 只返回同一个 zone 下的实例,其余 zone 的不会返回
  2. 对于多个申请,每个申请返回的与上次的实例不同。
  3. 对于多线程的每个申请,如果重试,返回的都是不同的实例

编写代码:
LoadBalancerTest

//SpringRunner 也蕴含了 MockitoJUnitRunner,所以 @Mock 等注解也失效了
@RunWith(SpringRunner.class)
@SpringBootTest(properties = {LoadBalancerEurekaAutoConfiguration.LOADBALANCER_ZONE + "=zone1"})
public class LoadBalancerTest {@EnableAutoConfiguration(exclude = EurekaDiscoveryClientConfiguration.class)
    @Configuration
    public static class App {
        @Bean
        public DiscoveryClient discoveryClient() {ServiceInstance zone1Instance1 = Mockito.mock(ServiceInstance.class);
            ServiceInstance zone1Instance2 = Mockito.mock(ServiceInstance.class);
            ServiceInstance zone2Instance3 = Mockito.mock(ServiceInstance.class);
            Map<String, String> zone1 = Map.ofEntries(Map.entry("zone", "zone1")
            );
            Map<String, String> zone2 = Map.ofEntries(Map.entry("zone", "zone2")
            );
            when(zone1Instance1.getMetadata()).thenReturn(zone1);
            when(zone1Instance1.getInstanceId()).thenReturn("instance1");
            when(zone1Instance2.getMetadata()).thenReturn(zone1);
            when(zone1Instance2.getInstanceId()).thenReturn("instance2");
            when(zone2Instance3.getMetadata()).thenReturn(zone2);
            when(zone2Instance3.getInstanceId()).thenReturn("instance3");
            DiscoveryClient mock = Mockito.mock(DiscoveryClient.class);
            Mockito.when(mock.getInstances("testService"))
                    .thenReturn(List.of(zone1Instance1, zone1Instance2, zone2Instance3));
            return mock;
        }
    }

    @Autowired
    private LoadBalancerClientFactory loadBalancerClientFactory;
    @Autowired
    private Tracer tracer;

    /**
     * 只返回同一个 zone 下的实例
     */
    @Test
    public void testFilteredByZone() {
        ReactiveLoadBalancer<ServiceInstance> testService =
                loadBalancerClientFactory.getInstance("testService");
        for (int i = 0; i < 100; i++) {ServiceInstance server = Mono.from(testService.choose()).block().getServer();
            // 必须处于和以后实例同一个 zone 下
            Assert.assertEquals(server.getMetadata().get("zone"), "zone1");
        }
    }

    /**
     * 返回不同的实例
     */
    @Test
    public void testReturnNext() {
        ReactiveLoadBalancer<ServiceInstance> testService =
                loadBalancerClientFactory.getInstance("testService");
        // 获取服务实例
        ServiceInstance server1 = Mono.from(testService.choose()).block().getServer();
        ServiceInstance server2 = Mono.from(testService.choose()).block().getServer();
        // 每次抉择的是不同实例
        Assert.assertNotEquals(server1.getInstanceId(), server2.getInstanceId());
    }

    /**
     * 跨线程,默认状况下是可能返回同一实例的,在咱们的实现下,放弃
     * span 则会返回下一个实例,这样保障多线程环境同一个 request 重试会返回下一实例
     * @throws Exception
     */
    @Test
    public void testSameSpanReturnNext() throws Exception {Span span = tracer.nextSpan();
        // 测试 100 次
        for (int i = 0; i < 100; i++) {try (Tracer.SpanInScope cleared = tracer.withSpanInScope(span)) {
                ReactiveLoadBalancer<ServiceInstance> testService =
                        loadBalancerClientFactory.getInstance("testService");
                // 获取实例
                ServiceInstance server1 = Mono.from(testService.choose()).block().getServer();
                AtomicReference<ServiceInstance> server2 = new AtomicReference<>();
                Thread thread = new Thread(() -> {
                    // 放弃 trace,这样就会认为依然是同一个申请上下文,这样模仿重试
                    try (Tracer.SpanInScope cleared2 = tracer.withSpanInScope(span)) {server2.set(Mono.from(testService.choose()).block().getServer());
                    }
                });
                thread.start();
                thread.join();
                System.out.println(i);
                Assert.assertNotEquals(server1.getInstanceId(), server2.get().getInstanceId());
            }
        }
    }
}

运行测试,测试通过。

微信搜寻“我的编程喵”关注公众号,加作者微信,每日一刷,轻松晋升技术,斩获各种 offer

正文完
 0