乐趣区

关于spring-cloud:SpringCloud升级之路20200x版32-改进负载均衡算法

本系列代码地址:https://github.com/JoJoTec/sp…

在后面一节,咱们梳理了实现 Feign 断路器以及线程隔离的思路,这一节,咱们先不看如何源码实现(因为源码中会蕴含负载平衡算法的改良局部),先来探讨下如何优化目前的负载平衡算法。

之前的负载平衡算法

  1. 获取服务实例列表,将实例列表依照 ip 端口排序,如果不排序即便 position 是下一个可能也代表的是之前曾经调用过的实例
  2. 依据申请中的 traceId,从 本地缓存中以 traceId 为 key 获取一个初始值为随机数的原子变量 position,这样避免所有申请都从第一个实例开始调用,之后第二个、第三个这样。
  3. position 原子加一,之后对实例个数取余,返回对应下标的实例进行调用

其中申请蕴含 traceId 是来自于咱们应用了 spring-cloud-sleuth 链路追踪,基于这种机制咱们能保障申请不会重试到之前曾经调用过的实例。源码是:

// 肯定必须是实现 ReactorServiceInstanceLoadBalancer
// 而不是 ReactorLoadBalancer<ServiceInstance>
// 因为注册的时候是 ReactorServiceInstanceLoadBalancer
@Log4j2
public class RoundRobinWithRequestSeparatedPositionLoadBalancer implements ReactorServiceInstanceLoadBalancer {
    private final ServiceInstanceListSupplier serviceInstanceListSupplier;
    // 每次申请算上重试不会超过 1 分钟
    // 对于超过 1 分钟的,这种申请必定比拟重,不应该重试
    private final LoadingCache<Long, AtomicInteger> positionCache = Caffeine.newBuilder().expireAfterWrite(1, TimeUnit.MINUTES)
            // 随机初始值,避免每次都是从第一个开始调用
            .build(k -> new AtomicInteger(ThreadLocalRandom.current().nextInt(0, 1000)));
    private final String serviceId;
    private final Tracer tracer;


    public RoundRobinWithRequestSeparatedPositionLoadBalancer(ServiceInstanceListSupplier serviceInstanceListSupplier, String serviceId, Tracer tracer) {
        this.serviceInstanceListSupplier = serviceInstanceListSupplier;
        this.serviceId = serviceId;
        this.tracer = tracer;
    }
    
    // 每次重试,其实都会调用这个 choose 办法从新获取一个实例
    @Override
    public Mono<Response<ServiceInstance>> choose(Request request) {return serviceInstanceListSupplier.get().next().map(serviceInstances -> getInstanceResponse(serviceInstances));
    }

    private Response<ServiceInstance> getInstanceResponse(List<ServiceInstance> serviceInstances) {if (serviceInstances.isEmpty()) {log.warn("No servers available for service:" + this.serviceId);
            return new EmptyResponse();}
        return getInstanceResponseByRoundRobin(serviceInstances);
    }

    private Response<ServiceInstance> getInstanceResponseByRoundRobin(List<ServiceInstance> serviceInstances) {if (serviceInstances.isEmpty()) {log.warn("No servers available for service:" + this.serviceId);
            return new EmptyResponse();}
        // 为了解决原始算法不同调用并发可能导致一个申请重试雷同的实例
        // 从 sleuth 的 Tracer 中获取以后申请的上下文
        Span currentSpan = tracer.currentSpan();
        // 如果上下文不存在,则可能不是前端用户申请,而是其余某些机制触发,咱们就创立一个新的上下文
        if (currentSpan == null) {currentSpan = tracer.newTrace();
        }
        // 从申请上下文中获取申请的 traceId,用来惟一标识一个申请
        long l = currentSpan.context().traceId();
        AtomicInteger seed = positionCache.get(l);
        int s = seed.getAndIncrement();
        int pos = s % serviceInstances.size();
        log.info("position {}, seed: {}, instances count: {}", pos, s, serviceInstances.size());
        return new DefaultResponse(serviceInstances.stream()
                // 实例返回列表程序可能不同,为了保持一致,先排序再取
                .sorted(Comparator.comparing(ServiceInstance::getInstanceId))
                .collect(Collectors.toList()).get(pos));
    }
}

然而在这次申请突增很多的时候,这种负载平衡算法还是给咱们带来了问题。

首先,本次突增,咱们并没有采取扩容,导致 本次的性能压力对于压力的平衡散布十分敏感 。举个例子是,假如微服务 A 有 9 个实例,在业务顶峰点来的时候, 最现实的状况是保障无论何时这 9 个负载压力都齐全平衡 ,然而因为咱们应用了初始值为随机数的原子变量 position,尽管从一天的总量上来看,负责平衡压力必定是平衡,然而在某一小段时间内,很可能 压力全都跑到了某几个实例上,导致这几个实例被压垮,熔断,而后又都跑到了另外的几个实例上,又被压垮,熔断,如此恶性循环。

而后,咱们部署采纳的是 k8s 部署,同一个虚拟机下面可能会跑很多微服务的 pod。在某些状况下,同一个微服务的多个 pod 可能会跑到同一个虚拟机 Node 上,这个能够从 pod 的 ip 网段上看进去:例如某个微服务有如下 7 个实例:10.238.13.12:8181,10.238.13.24:8181,10.238.15.12:8181,10.238.17.12:8181,10.238.20.220:8181,10.238.21.31:8181,10.238.21.121:8181,那么 10.238.13.12:8181 与 10.238.13.24:8181 很可能在同一个 Node 上,10.238.21.31:8181 和 10.238.21.121:8181 很可能在同一个 Node 上。咱们重试, 须要优先重试与之前重试过的实例尽量不在同一个 Node 上的实例,因为同一个 Node 上的实例只有有一个有问题或者压力过大,其余的基本上也有问题或者压力过大。

最初,如果调用某个实例始终失败,那么这个实例的调用优先级须要排在其余失常的实例前面。这个对于缩小疾速刷新公布(一下子启动很多实例之后停掉多个老实例,实例个数大于重试次数配置)对于用户的影响,以及某个可用区忽然产生异样导致多个实例下线对用户的影响,以及业务压力曾经过来,压力变小后,须要关掉不再须要的实例,导致大量实例产生迁徙的时候对用户的影响,有很大的作用。

针对以上问题的优化计划

咱们针对下面三个问题,提出了一种优化后的解决方案:

  1. 针对每次申请,记录:

    1. 本次申请曾经调用过哪些实例 -> 申请调用过的实例缓存
    2. 调用的实例,以后有多少申请在解决中 -> 实例运行申请数
    3. 调用的实例,最近申请错误率 -> 实例申请错误率
  2. 随机将实例列表打乱,避免在以上三个指标都雷同时,总是将申请发给同一个实例。
  3. 依照 以后申请没有调用过靠前 -> 错误率越小越靠前 的程序排序 -> 实例运行申请数越小越靠前
  4. 取排好序之后的列表第一个实例作为本次负载平衡的实例

具体实现是:以下的代码来自于:https://github.com/JoJoTec/sp…

咱们应用了依赖:

<dependency>
    <groupId>io.dropwizard.metrics</groupId>
    <artifactId>metrics-core</artifactId>
</dependency>

记录实例数据的缓存类:

@Log4j2
public class ServiceInstanceMetrics {
    private static final String CALLING = "-Calling";
    private static final String FAILED = "-Failed";

    private MetricRegistry metricRegistry;

    ServiceInstanceMetrics() {}

    public ServiceInstanceMetrics(MetricRegistry metricRegistry) {this.metricRegistry = metricRegistry;}

    /**
     * 记录调用实例
     * @param serviceInstance
     */
    public void recordServiceInstanceCall(ServiceInstance serviceInstance) {String key = serviceInstance.getHost() + ":" + serviceInstance.getPort();
        metricRegistry.counter(key + CALLING).inc();}
    /**
     * 记录调用实例完结
     * @param serviceInstance
     * @param isSuccess 是否胜利
     */
    public void recordServiceInstanceCalled(ServiceInstance serviceInstance, boolean isSuccess) {String key = serviceInstance.getHost() + ":" + serviceInstance.getPort();
        metricRegistry.counter(key + CALLING).dec();
        if (!isSuccess) {
            // 不胜利则记录失败
            metricRegistry.meter(key + FAILED).mark();}
    }

    /**
     * 获取正在运行的调用次数
     * @param serviceInstance
     * @return
     */
    public long getCalling(ServiceInstance serviceInstance) {String key = serviceInstance.getHost() + ":" + serviceInstance.getPort();
        long count = metricRegistry.counter(key + CALLING).getCount();
        log.debug("ServiceInstanceMetrics-getCalling: {} -> {}", key, count);
        return count;
    }

    /**
     * 获取最近一分钟调用失败次数分钟速率,其实是滑动平均数
     * @param serviceInstance
     * @return
     */
    public double getFailedInRecentOneMin(ServiceInstance serviceInstance) {String key = serviceInstance.getHost() + ":" + serviceInstance.getPort();
        double rate = metricRegistry.meter(key + FAILED).getOneMinuteRate();
        log.debug("ServiceInstanceMetrics-getFailedInRecentOneMin: {} -> {}", key, rate);
        return rate;
    }
}

负载平衡外围代码:

private final LoadingCache<Long, Set<String>> calledIpPrefixes = Caffeine.newBuilder()
        .expireAfterAccess(3, TimeUnit.MINUTES)
        .build(k -> Sets.newConcurrentHashSet());
private final String serviceId;
private final Tracer tracer;
private final ServiceInstanceMetrics serviceInstanceMetrics;

// 每次重试,其实都会调用这个 choose 办法从新获取一个实例
@Override
public Mono<Response<ServiceInstance>> choose(Request request) {Span span = tracer.currentSpan();
    return serviceInstanceListSupplier.get().next()
            .map(serviceInstances -> {
                // 放弃 span 和调用 choose 的 span 一样
                try (Tracer.SpanInScope cleared = tracer.withSpanInScope(span)) {return getInstanceResponse(serviceInstances);
                }
            });
}


private Response<ServiceInstance> getInstanceResponse(List<ServiceInstance> serviceInstances) {if (serviceInstances.isEmpty()) {log.warn("No servers available for service:" + this.serviceId);
        return new EmptyResponse();}
    // 读取 spring-cloud-sleuth 的对于以后申请的链路追踪上下文,获取对应的 traceId
    Span currentSpan = tracer.currentSpan();
    if (currentSpan == null) {currentSpan = tracer.newTrace();
    }
    long l = currentSpan.context().traceId();
    return getInstanceResponseByRoundRobin(l, serviceInstances);
}

@VisibleForTesting
public Response<ServiceInstance> getInstanceResponseByRoundRobin(long traceId, List<ServiceInstance> serviceInstances) {
    // 首先随机打乱列表中实例的程序
    Collections.shuffle(serviceInstances);
    // 须要先将所有参数缓存起来,否则 comparator 会调用屡次,并且可能在排序过程中参数产生扭转(针对实例的申请统计数据始终在并发扭转)Map<ServiceInstance, Integer> used = Maps.newHashMap();
    Map<ServiceInstance, Long> callings = Maps.newHashMap();
    Map<ServiceInstance, Double> failedInRecentOneMin = Maps.newHashMap();
    serviceInstances = serviceInstances.stream().sorted(
            Comparator
                    // 之前曾经调用过的网段,这里排前面
                    .<ServiceInstance>comparingInt(serviceInstance -> {
                        return used.computeIfAbsent(serviceInstance, k -> {return calledIpPrefixes.get(traceId).stream().anyMatch(prefix -> {return serviceInstance.getHost().contains(prefix);
                            }) ? 1 : 0;
                        });
                    })
                    // 以后错误率起码的
                    .thenComparingDouble(serviceInstance -> {
                        return failedInRecentOneMin.computeIfAbsent(serviceInstance, k -> {double value = serviceInstanceMetrics.getFailedInRecentOneMin(serviceInstance);
                            // 因为应用的是挪动平均值(EMA),须要疏忽过小的差别(保留两位小数,不是四舍五入,而是间接舍弃)return ((int) (value * 100)) / 100.0;
                        });
                    })
                    // 以后负载申请起码的
                    .thenComparingLong(serviceInstance -> {
                        return callings.computeIfAbsent(serviceInstance, k ->
                                serviceInstanceMetrics.getCalling(serviceInstance)
                        );
                    })
    ).collect(Collectors.toList());
    if (serviceInstances.isEmpty()) {log.warn("No servers available for service:" + this.serviceId);
        return new EmptyResponse();}
    ServiceInstance serviceInstance = serviceInstances.get(0);
    // 记录本次返回的网段
    calledIpPrefixes.get(traceId).add(serviceInstance.getHost().substring(0, serviceInstance.getHost().lastIndexOf(".")));
    // 目前记录这个只为了兼容之前的单元测试(调用次数测试)positionCache.get(traceId).getAndIncrement();
    return new DefaultResponse(serviceInstance);
}

对于记录实例数据的缓存何时更新,是在 FeignClient 粘合重试,断路以及线程隔离的代码中的,这个咱们下一节就会看到。

一些组内对于方案设计的取舍 Q&A

1. 为何没有应用所有微服务共享的缓存来保留调用数据,来让这些数据更加精确

共享缓存的可选计划包含将这些数据记录放入 Redis,或者是 Apache Ignite 这样的内存网格中。然而有两个问题:

  1. 如果数据记录放入 Redis 这样的额定存储,如果 Redis 不可用会导致所有的负载平衡都无奈执行。如果放入 Apache Ignite,如果对应的节点下线,那么对应的负载平衡也无奈执行。这些都是不能承受的。
  2. 假如微服务 A 须要调用微服务 B,可能 A 的某个实例调用 B 的某个实例有问题,然而 A 的其余实例调用 B 的这个实例却没有问题,例如当某个可用区与另一个可用区网络拥塞的时候。如果用同一个缓存 Key 记录 A 所有的实例调用 B 这个实例的数据,显然是不精确的。

每个微服务应用本地缓存,记录本人调用其余实例的数据,在咱们这里看来,不仅是更容易实现,也是更精确的做法。

2. 采纳 EMA 的形式而不是申请窗口的形式统计最近错误率

采纳申请窗口的形式统计,必定是最精确的,例如咱们统计最近一分钟的错误率,就将最近一分钟的申请缓存起来,读取的时候,将缓存起来的申请数据加在一起取平均数即可。然而这种形式在申请突增的时候,可能会占用很多很多内存来缓存这些申请。同时计算错误率的时候,随着缓存申请数的增多也会耗费更大量的 CPU 进行计算。这样做很不值得。

EMA 这种滑动平均值的计算形式,常见于各种性能监控统计场景,例如 JVM 中 TLAB 大小的动静计算,G1 GC Region 大小的伸缩以及其余很多 JVM 须要动静得出适合值的中央,都用这种计算形式。他不必将申请缓存起来,而是间接用最新值乘以一个比例之后加上老值乘以(1 – 这个比例),这个比例个别高于 0.5,示意 EMA 和以后最新值更加相干。

然而 EMA 也带来另一个问题,咱们会发现随着程序运行小数点位数会十分多,会看到相似于如下的值:0.00000000123, 0.120000001, 0.120000003, 为了疏忽过于粗疏差别的影响(其实这些影响也来自于很久之前的谬误申请),咱们 只保留两位小数进行排序

微信搜寻“我的编程喵”关注公众号,每日一刷,轻松晋升技术,斩获各种 offer

退出移动版