共计 8879 个字符,预计需要花费 23 分钟才能阅读完成。
本系列代码地址:https://github.com/JoJoTec/sp…
在后面一节,咱们梳理了实现 Feign 断路器以及线程隔离的思路,这一节,咱们先不看如何源码实现(因为源码中会蕴含负载平衡算法的改良局部),先来探讨下如何优化目前的负载平衡算法。
之前的负载平衡算法
- 获取服务实例列表,将实例列表依照 ip 端口排序,如果不排序即便 position 是下一个可能也代表的是之前曾经调用过的实例
- 依据申请中的 traceId,从 本地缓存中以 traceId 为 key 获取一个初始值为随机数的原子变量 position,这样避免所有申请都从第一个实例开始调用,之后第二个、第三个这样。
- position 原子加一,之后对实例个数取余,返回对应下标的实例进行调用
其中申请蕴含 traceId 是来自于咱们应用了 spring-cloud-sleuth 链路追踪,基于这种机制咱们能保障申请不会重试到之前曾经调用过的实例。源码是:
// 肯定必须是实现 ReactorServiceInstanceLoadBalancer
// 而不是 ReactorLoadBalancer<ServiceInstance>
// 因为注册的时候是 ReactorServiceInstanceLoadBalancer
@Log4j2
public class RoundRobinWithRequestSeparatedPositionLoadBalancer implements ReactorServiceInstanceLoadBalancer {
private final ServiceInstanceListSupplier serviceInstanceListSupplier;
// 每次申请算上重试不会超过 1 分钟
// 对于超过 1 分钟的,这种申请必定比拟重,不应该重试
private final LoadingCache<Long, AtomicInteger> positionCache = Caffeine.newBuilder().expireAfterWrite(1, TimeUnit.MINUTES)
// 随机初始值,避免每次都是从第一个开始调用
.build(k -> new AtomicInteger(ThreadLocalRandom.current().nextInt(0, 1000)));
private final String serviceId;
private final Tracer tracer;
public RoundRobinWithRequestSeparatedPositionLoadBalancer(ServiceInstanceListSupplier serviceInstanceListSupplier, String serviceId, Tracer tracer) {
this.serviceInstanceListSupplier = serviceInstanceListSupplier;
this.serviceId = serviceId;
this.tracer = tracer;
}
// 每次重试,其实都会调用这个 choose 办法从新获取一个实例
@Override
public Mono<Response<ServiceInstance>> choose(Request request) {return serviceInstanceListSupplier.get().next().map(serviceInstances -> getInstanceResponse(serviceInstances));
}
private Response<ServiceInstance> getInstanceResponse(List<ServiceInstance> serviceInstances) {if (serviceInstances.isEmpty()) {log.warn("No servers available for service:" + this.serviceId);
return new EmptyResponse();}
return getInstanceResponseByRoundRobin(serviceInstances);
}
private Response<ServiceInstance> getInstanceResponseByRoundRobin(List<ServiceInstance> serviceInstances) {if (serviceInstances.isEmpty()) {log.warn("No servers available for service:" + this.serviceId);
return new EmptyResponse();}
// 为了解决原始算法不同调用并发可能导致一个申请重试雷同的实例
// 从 sleuth 的 Tracer 中获取以后申请的上下文
Span currentSpan = tracer.currentSpan();
// 如果上下文不存在,则可能不是前端用户申请,而是其余某些机制触发,咱们就创立一个新的上下文
if (currentSpan == null) {currentSpan = tracer.newTrace();
}
// 从申请上下文中获取申请的 traceId,用来惟一标识一个申请
long l = currentSpan.context().traceId();
AtomicInteger seed = positionCache.get(l);
int s = seed.getAndIncrement();
int pos = s % serviceInstances.size();
log.info("position {}, seed: {}, instances count: {}", pos, s, serviceInstances.size());
return new DefaultResponse(serviceInstances.stream()
// 实例返回列表程序可能不同,为了保持一致,先排序再取
.sorted(Comparator.comparing(ServiceInstance::getInstanceId))
.collect(Collectors.toList()).get(pos));
}
}
然而在这次申请突增很多的时候,这种负载平衡算法还是给咱们带来了问题。
首先,本次突增,咱们并没有采取扩容,导致 本次的性能压力对于压力的平衡散布十分敏感 。举个例子是,假如微服务 A 有 9 个实例,在业务顶峰点来的时候, 最现实的状况是保障无论何时这 9 个负载压力都齐全平衡 ,然而因为咱们应用了初始值为随机数的原子变量 position,尽管从一天的总量上来看,负责平衡压力必定是平衡,然而在某一小段时间内,很可能 压力全都跑到了某几个实例上,导致这几个实例被压垮,熔断,而后又都跑到了另外的几个实例上,又被压垮,熔断,如此恶性循环。
而后,咱们部署采纳的是 k8s 部署,同一个虚拟机下面可能会跑很多微服务的 pod。在某些状况下,同一个微服务的多个 pod 可能会跑到同一个虚拟机 Node 上,这个能够从 pod 的 ip 网段上看进去:例如某个微服务有如下 7 个实例:10.238.13.12:8181,10.238.13.24:8181,10.238.15.12:8181,10.238.17.12:8181,10.238.20.220:8181,10.238.21.31:8181,10.238.21.121:8181,那么 10.238.13.12:8181 与 10.238.13.24:8181 很可能在同一个 Node 上,10.238.21.31:8181 和 10.238.21.121:8181 很可能在同一个 Node 上。咱们重试, 须要优先重试与之前重试过的实例尽量不在同一个 Node 上的实例,因为同一个 Node 上的实例只有有一个有问题或者压力过大,其余的基本上也有问题或者压力过大。
最初,如果调用某个实例始终失败,那么这个实例的调用优先级须要排在其余失常的实例前面。这个对于缩小疾速刷新公布(一下子启动很多实例之后停掉多个老实例,实例个数大于重试次数配置)对于用户的影响,以及某个可用区忽然产生异样导致多个实例下线对用户的影响,以及业务压力曾经过来,压力变小后,须要关掉不再须要的实例,导致大量实例产生迁徙的时候对用户的影响,有很大的作用。
针对以上问题的优化计划
咱们针对下面三个问题,提出了一种优化后的解决方案:
-
针对每次申请,记录:
- 本次申请曾经调用过哪些实例 -> 申请调用过的实例缓存
- 调用的实例,以后有多少申请在解决中 -> 实例运行申请数
- 调用的实例,最近申请错误率 -> 实例申请错误率
- 随机将实例列表打乱,避免在以上三个指标都雷同时,总是将申请发给同一个实例。
- 依照 以后申请没有调用过靠前 -> 错误率越小越靠前 的程序排序 -> 实例运行申请数越小越靠前
- 取排好序之后的列表第一个实例作为本次负载平衡的实例
具体实现是:以下的代码来自于:https://github.com/JoJoTec/sp…
咱们应用了依赖:
<dependency>
<groupId>io.dropwizard.metrics</groupId>
<artifactId>metrics-core</artifactId>
</dependency>
记录实例数据的缓存类:
@Log4j2
public class ServiceInstanceMetrics {
private static final String CALLING = "-Calling";
private static final String FAILED = "-Failed";
private MetricRegistry metricRegistry;
ServiceInstanceMetrics() {}
public ServiceInstanceMetrics(MetricRegistry metricRegistry) {this.metricRegistry = metricRegistry;}
/**
* 记录调用实例
* @param serviceInstance
*/
public void recordServiceInstanceCall(ServiceInstance serviceInstance) {String key = serviceInstance.getHost() + ":" + serviceInstance.getPort();
metricRegistry.counter(key + CALLING).inc();}
/**
* 记录调用实例完结
* @param serviceInstance
* @param isSuccess 是否胜利
*/
public void recordServiceInstanceCalled(ServiceInstance serviceInstance, boolean isSuccess) {String key = serviceInstance.getHost() + ":" + serviceInstance.getPort();
metricRegistry.counter(key + CALLING).dec();
if (!isSuccess) {
// 不胜利则记录失败
metricRegistry.meter(key + FAILED).mark();}
}
/**
* 获取正在运行的调用次数
* @param serviceInstance
* @return
*/
public long getCalling(ServiceInstance serviceInstance) {String key = serviceInstance.getHost() + ":" + serviceInstance.getPort();
long count = metricRegistry.counter(key + CALLING).getCount();
log.debug("ServiceInstanceMetrics-getCalling: {} -> {}", key, count);
return count;
}
/**
* 获取最近一分钟调用失败次数分钟速率,其实是滑动平均数
* @param serviceInstance
* @return
*/
public double getFailedInRecentOneMin(ServiceInstance serviceInstance) {String key = serviceInstance.getHost() + ":" + serviceInstance.getPort();
double rate = metricRegistry.meter(key + FAILED).getOneMinuteRate();
log.debug("ServiceInstanceMetrics-getFailedInRecentOneMin: {} -> {}", key, rate);
return rate;
}
}
负载平衡外围代码:
private final LoadingCache<Long, Set<String>> calledIpPrefixes = Caffeine.newBuilder()
.expireAfterAccess(3, TimeUnit.MINUTES)
.build(k -> Sets.newConcurrentHashSet());
private final String serviceId;
private final Tracer tracer;
private final ServiceInstanceMetrics serviceInstanceMetrics;
// 每次重试,其实都会调用这个 choose 办法从新获取一个实例
@Override
public Mono<Response<ServiceInstance>> choose(Request request) {Span span = tracer.currentSpan();
return serviceInstanceListSupplier.get().next()
.map(serviceInstances -> {
// 放弃 span 和调用 choose 的 span 一样
try (Tracer.SpanInScope cleared = tracer.withSpanInScope(span)) {return getInstanceResponse(serviceInstances);
}
});
}
private Response<ServiceInstance> getInstanceResponse(List<ServiceInstance> serviceInstances) {if (serviceInstances.isEmpty()) {log.warn("No servers available for service:" + this.serviceId);
return new EmptyResponse();}
// 读取 spring-cloud-sleuth 的对于以后申请的链路追踪上下文,获取对应的 traceId
Span currentSpan = tracer.currentSpan();
if (currentSpan == null) {currentSpan = tracer.newTrace();
}
long l = currentSpan.context().traceId();
return getInstanceResponseByRoundRobin(l, serviceInstances);
}
@VisibleForTesting
public Response<ServiceInstance> getInstanceResponseByRoundRobin(long traceId, List<ServiceInstance> serviceInstances) {
// 首先随机打乱列表中实例的程序
Collections.shuffle(serviceInstances);
// 须要先将所有参数缓存起来,否则 comparator 会调用屡次,并且可能在排序过程中参数产生扭转(针对实例的申请统计数据始终在并发扭转)Map<ServiceInstance, Integer> used = Maps.newHashMap();
Map<ServiceInstance, Long> callings = Maps.newHashMap();
Map<ServiceInstance, Double> failedInRecentOneMin = Maps.newHashMap();
serviceInstances = serviceInstances.stream().sorted(
Comparator
// 之前曾经调用过的网段,这里排前面
.<ServiceInstance>comparingInt(serviceInstance -> {
return used.computeIfAbsent(serviceInstance, k -> {return calledIpPrefixes.get(traceId).stream().anyMatch(prefix -> {return serviceInstance.getHost().contains(prefix);
}) ? 1 : 0;
});
})
// 以后错误率起码的
.thenComparingDouble(serviceInstance -> {
return failedInRecentOneMin.computeIfAbsent(serviceInstance, k -> {double value = serviceInstanceMetrics.getFailedInRecentOneMin(serviceInstance);
// 因为应用的是挪动平均值(EMA),须要疏忽过小的差别(保留两位小数,不是四舍五入,而是间接舍弃)return ((int) (value * 100)) / 100.0;
});
})
// 以后负载申请起码的
.thenComparingLong(serviceInstance -> {
return callings.computeIfAbsent(serviceInstance, k ->
serviceInstanceMetrics.getCalling(serviceInstance)
);
})
).collect(Collectors.toList());
if (serviceInstances.isEmpty()) {log.warn("No servers available for service:" + this.serviceId);
return new EmptyResponse();}
ServiceInstance serviceInstance = serviceInstances.get(0);
// 记录本次返回的网段
calledIpPrefixes.get(traceId).add(serviceInstance.getHost().substring(0, serviceInstance.getHost().lastIndexOf(".")));
// 目前记录这个只为了兼容之前的单元测试(调用次数测试)positionCache.get(traceId).getAndIncrement();
return new DefaultResponse(serviceInstance);
}
对于记录实例数据的缓存何时更新,是在 FeignClient 粘合重试,断路以及线程隔离的代码中的,这个咱们下一节就会看到。
一些组内对于方案设计的取舍 Q&A
1. 为何没有应用所有微服务共享的缓存来保留调用数据,来让这些数据更加精确?
共享缓存的可选计划包含将这些数据记录放入 Redis,或者是 Apache Ignite 这样的内存网格中。然而有两个问题:
- 如果数据记录放入 Redis 这样的额定存储,如果 Redis 不可用会导致所有的负载平衡都无奈执行。如果放入 Apache Ignite,如果对应的节点下线,那么对应的负载平衡也无奈执行。这些都是不能承受的。
- 假如微服务 A 须要调用微服务 B,可能 A 的某个实例调用 B 的某个实例有问题,然而 A 的其余实例调用 B 的这个实例却没有问题,例如当某个可用区与另一个可用区网络拥塞的时候。如果用同一个缓存 Key 记录 A 所有的实例调用 B 这个实例的数据,显然是不精确的。
每个微服务应用本地缓存,记录本人调用其余实例的数据,在咱们这里看来,不仅是更容易实现,也是更精确的做法。
2. 采纳 EMA 的形式而不是申请窗口的形式统计最近错误率
采纳申请窗口的形式统计,必定是最精确的,例如咱们统计最近一分钟的错误率,就将最近一分钟的申请缓存起来,读取的时候,将缓存起来的申请数据加在一起取平均数即可。然而这种形式在申请突增的时候,可能会占用很多很多内存来缓存这些申请。同时计算错误率的时候,随着缓存申请数的增多也会耗费更大量的 CPU 进行计算。这样做很不值得。
EMA 这种滑动平均值的计算形式,常见于各种性能监控统计场景,例如 JVM 中 TLAB 大小的动静计算,G1 GC Region 大小的伸缩以及其余很多 JVM 须要动静得出适合值的中央,都用这种计算形式。他不必将申请缓存起来,而是间接用最新值乘以一个比例之后加上老值乘以(1 – 这个比例),这个比例个别高于 0.5,示意 EMA 和以后最新值更加相干。
然而 EMA 也带来另一个问题,咱们会发现随着程序运行小数点位数会十分多,会看到相似于如下的值:0.00000000123, 0.120000001, 0.120000003, 为了疏忽过于粗疏差别的影响(其实这些影响也来自于很久之前的谬误申请),咱们 只保留两位小数进行排序。
微信搜寻“我的编程喵”关注公众号,每日一刷,轻松晋升技术,斩获各种 offer: