Preface
This article takes a look at the Ewma class used by the rsocket load balancer.
Moving Average
SMA
SMA (Simple Moving Average) is defined as follows:
$SMA_t = (P_t + P_{t-1} + P_{t-2} + P_{t-3} + \cdots + P_{t-n+1}) / n$
Here $P_t$ through $P_{t-n+1}$ are the n most recent data points.
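The SMA formula can be sketched in Java as a sliding window over the last n samples. The class name SimpleMovingAverage and its add method are hypothetical names chosen for illustration, not part of rsocket. As a simplifying assumption, the sketch averages over fewer than n samples until the window has filled.

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Hypothetical helper for illustration only; not part of rsocket.
class SimpleMovingAverage {
    private final int n;                                   // window size
    private final Deque<Double> window = new ArrayDeque<>();
    private double sum;                                    // running sum of the window

    SimpleMovingAverage(int n) {
        if (n <= 0) {
            throw new IllegalArgumentException("n must be positive");
        }
        this.n = n;
    }

    // Adds a sample and returns the mean of the samples currently in the window.
    // Assumption: until n samples have arrived, the mean covers what is available.
    double add(double p) {
        window.addLast(p);
        sum += p;
        if (window.size() > n) {
            sum -= window.removeFirst();                   // evict the oldest sample
        }
        return sum / window.size();
    }
}
```

Keeping a running sum makes each update O(1) instead of re-summing the window.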
WMA
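WMA (Weighted Moving Average) is defined as follows:
$WMA_t = (P_t \cdot W_t) + (P_{t-1} \cdot W_{t-1}) + \cdots + (P_{t-n+1} \cdot W_{t-n+1})$
WMA assigns a weight to each of the n most recent data points. The weights sum to 1, and more recent data points usually get larger weights.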
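A minimal Java sketch of the WMA formula follows. The class WeightedMovingAverage and both of its methods are hypothetical. The linearly decreasing weights are just one common choice that satisfies the sum-to-1 condition, not something the formula prescribes.

```java
// Hypothetical helper for illustration only; not part of rsocket.
class WeightedMovingAverage {

    // prices[0] is the most recent sample P_t and weights[0] is its weight W_t.
    static double wma(double[] prices, double[] weights) {
        if (prices.length != weights.length) {
            throw new IllegalArgumentException("prices and weights differ in length");
        }
        double acc = 0.0;
        double weightSum = 0.0;
        for (int i = 0; i < prices.length; i++) {
            acc += prices[i] * weights[i];
            weightSum += weights[i];
        }
        if (Math.abs(weightSum - 1.0) > 1e-9) {
            throw new IllegalArgumentException("weights must sum to 1");
        }
        return acc;
    }

    // One possible weighting (an assumption): n, n-1, ..., 1 divided by n(n+1)/2,
    // so the most recent sample gets the largest weight.
    static double[] linearWeights(int n) {
        double total = n * (n + 1) / 2.0;
        double[] w = new double[n];
        for (int i = 0; i < n; i++) {
            w[i] = (n - i) / total;
        }
        return w;
    }
}
```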
EMA or EWMA
EMA (Exponential Moving Average), also called EWMA (Exponentially Weighted Moving Average), is defined as follows:
$EMA_t = (P_t \cdot S) + (1 - S) \cdot EMA_{t-1}$
S is the smoothing factor, commonly set to 2 / (N + 1), where N is the number of periods.
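The recurrence can be sketched in Java as below. The class ExponentialMovingAverage is a hypothetical name. Seeding the average with the first sample is an assumption, since the formula itself does not say how the initial value is chosen.

```java
// Hypothetical helper for illustration only; not part of rsocket.
class ExponentialMovingAverage {
    private final double s;      // smoothing factor S = 2 / (N + 1)
    private double ema;
    private boolean seeded;

    ExponentialMovingAverage(int n) {
        if (n <= 0) {
            throw new IllegalArgumentException("n must be positive");
        }
        this.s = 2.0 / (n + 1);
    }

    // Applies EMA_t = P_t * S + (1 - S) * EMA_{t-1} and returns the new value.
    double add(double p) {
        if (!seeded) {
            ema = p;             // assumption: the first sample seeds the average
            seeded = true;
        } else {
            ema = p * s + (1 - s) * ema;
        }
        return ema;
    }
}
```

Unlike SMA and WMA, only the previous average has to be stored, which is why the same idea suits a long-running load balancer.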
Ewma
rsocket-load-balancer-0.12.1-sources.jar!/io/rsocket/stat/Ewma.java
public class Ewma {
  private final long tau;
  private volatile long stamp;
  private volatile double ewma;

  public Ewma(long halfLife, TimeUnit unit, double initialValue) {
    this.tau = Clock.unit().convert((long) (halfLife / Math.log(2)), unit);
    stamp = 0L;
    ewma = initialValue;
  }

  public synchronized void insert(double x) {
    long now = Clock.now();
    double elapsed = Math.max(0, now - stamp);
    stamp = now;
    double w = Math.exp(-elapsed / tau);
    ewma = w * ewma + (1.0 - w) * x;
  }

  public synchronized void reset(double value) {
    stamp = 0L;
    ewma = value;
  }

  public double value() {
    return ewma;
  }

  @Override
  public String toString() {
    return "Ewma(value=" + ewma + ", age=" + (Clock.now() - stamp) + ")";
  }
}
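The Ewma constructor takes halfLife, unit, and initialValue (the initial ewma value). Each insert computes ewma = w * ewma + (1.0 - w) * x, where x is the new sample and w is the weight kept by the previous average.
The weight is w = Math.exp(-elapsed / tau), that is, e raised to the power -elapsed / tau. elapsed is the time since the previous insert, and tau (the Greek letter τ) is the time constant of the EWMA.
tau is halfLife / Math.log(2), truncated to a long and then converted from the given unit to Clock.unit(). The halfLife parameter controls the speed of convergence: when elapsed equals halfLife, w is approximately exp(-ln 2) = 0.5, so the previous average and the new sample each contribute about half.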
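To see the half-life behaviour concretely, here is a minimal sketch of the same update rule with timestamps passed in explicitly, so that the result is deterministic. TimeDecayEwma is a hypothetical class, not the rsocket one. As an assumption made for clarity, it keeps tau as a double instead of truncating to long as the quoted constructor does, and it does not use Clock.

```java
// Hypothetical re-implementation for illustration; not io.rsocket.stat.Ewma.
class TimeDecayEwma {
    private final double tau;   // time constant, in the same unit as the timestamps
    private double stamp;       // time of the previous insert, starts at 0
    private double ewma;

    TimeDecayEwma(double halfLife, double initialValue) {
        // tau = halfLife / ln 2, so that exp(-halfLife / tau) = 1/2
        this.tau = halfLife / Math.log(2);
        this.ewma = initialValue;
    }

    // Same update as the quoted insert(), but 'now' is supplied by the caller
    // (an assumption that makes the example reproducible).
    void insert(double x, double now) {
        double elapsed = Math.max(0, now - stamp);
        stamp = now;
        double w = Math.exp(-elapsed / tau);   // weight kept by the old average
        ewma = w * ewma + (1.0 - w) * x;
    }

    double value() {
        return ewma;
    }
}
```

With halfLife = 5 and an initial value of 1.0, inserting 0.0 at time 5 gives w = 0.5 and a value of 0.5; inserting 0.0 again at time 10 halves it to 0.25.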
RSocketSupplier
rsocket-load-balancer-0.12.1-sources.jar!/io/rsocket/client/filter/RSocketSupplier.java
public class RSocketSupplier implements Availability, Supplier<Mono<RSocket>>, Closeable {

  private static final double EPSILON = 1e-4;

  private Supplier<Mono<RSocket>> rSocketSupplier;

  private final MonoProcessor<Void> onClose;

  private final long tau;
  private long stamp;
  private final Ewma errorPercentage;

  public RSocketSupplier(Supplier<Mono<RSocket>> rSocketSupplier, long halfLife, TimeUnit unit) {
    this.rSocketSupplier = rSocketSupplier;
    this.tau = Clock.unit().convert((long) (halfLife / Math.log(2)), unit);
    this.stamp = Clock.now();
    this.errorPercentage = new Ewma(halfLife, unit, 1.0);
    this.onClose = MonoProcessor.create();
  }

  public RSocketSupplier(Supplier<Mono<RSocket>> rSocketSupplier) {
    this(rSocketSupplier, 5, TimeUnit.SECONDS);
  }

  @Override
  public double availability() {
    double e = errorPercentage.value();
    if (Clock.now() - stamp > tau) {
      // If the window is expired artificially increase the availability
      double a = Math.min(1.0, e + 0.5);
      errorPercentage.reset(a);
    }
    if (e < EPSILON) {
      e = 0.0;
    } else if (1.0 - EPSILON < e) {
      e = 1.0;
    }

    return e;
  }

  private synchronized void updateErrorPercentage(double value) {
    errorPercentage.insert(value);
    stamp = Clock.now();
  }

  @Override
  public Mono<RSocket> get() {
    return rSocketSupplier
        .get()
        .doOnNext(o -> updateErrorPercentage(1.0))
        .doOnError(t -> updateErrorPercentage(0.0))
        .map(AvailabilityAwareRSocketProxy::new);
  }

  @Override
  public void dispose() {
    onClose.onComplete();
  }

  @Override
  public boolean isDisposed() {
    return onClose.isDisposed();
  }

  @Override
  public Mono<Void> onClose() {
    return onClose;
  }

  private class AvailabilityAwareRSocketProxy extends RSocketProxy {
    public AvailabilityAwareRSocketProxy(RSocket source) {
      super(source);

      onClose.doFinally(signalType -> source.dispose()).subscribe();
    }

    @Override
    public Mono<Void> fireAndForget(Payload payload) {
      return source
          .fireAndForget(payload)
          .doOnError(t -> errorPercentage.insert(0.0))
          .doOnSuccess(v -> updateErrorPercentage(1.0));
    }

    @Override
    public Mono<Payload> requestResponse(Payload payload) {
      return source
          .requestResponse(payload)
          .doOnError(t -> errorPercentage.insert(0.0))
          .doOnSuccess(p -> updateErrorPercentage(1.0));
    }

    @Override
    public Flux<Payload> requestStream(Payload payload) {
      return source
          .requestStream(payload)
          .doOnError(th -> errorPercentage.insert(0.0))
          .doOnComplete(() -> updateErrorPercentage(1.0));
    }

    @Override
    public Flux<Payload> requestChannel(Publisher<Payload> payloads) {
      return source
          .requestChannel(payloads)
          .doOnError(th -> errorPercentage.insert(0.0))
          .doOnComplete(() -> updateErrorPercentage(1.0));
    }

    @Override
    public Mono<Void> metadataPush(Payload payload) {
      return source
          .metadataPush(payload)
          .doOnError(t -> errorPercentage.insert(0.0))
          .doOnSuccess(v -> updateErrorPercentage(1.0));
    }

    @Override
    public double availability() {
      // If the window is expired set success and failure to zero and return
      // the child availability
      if (Clock.now() - stamp > tau) {
        updateErrorPercentage(1.0);
      }
      return source.availability() * errorPercentage.value();
    }
  }
}
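RSocketSupplier implements the Availability, Supplier, and Closeable interfaces and defines an errorPercentage field of type Ewma. When no halfLife is given, it defaults to 5 seconds, and the initial ewma value is 1.0. Despite its name, errorPercentage receives 1.0 on success and 0.0 on error, so it behaves as a success rate.
RSocketSupplier defines the constant EPSILON = 1e-4. Its availability method first reads the current value e. If more than tau has passed since stamp, it resets errorPercentage to min(1.0, e + 0.5), which artificially raises the availability seen by later calls. It then returns 0.0 when e is less than EPSILON, 1.0 when e is greater than 1.0 - EPSILON, and e otherwise.
updateErrorPercentage inserts a new value into the ewma and updates stamp. In get, doOnNext calls updateErrorPercentage(1.0) and doOnError calls updateErrorPercentage(0.0); map then wraps the RSocket in an AvailabilityAwareRSocketProxy. The proxy delegates to the target RSocket and hooks errorPercentage into every interaction: doOnError calls errorPercentage.insert(0.0) directly, leaving stamp unchanged, while doOnSuccess (doOnComplete for the Flux-returning methods) calls updateErrorPercentage(1.0).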
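The bookkeeping described above (record 1.0 on success, record 0.0 on failure, then snap the score to exactly 0 or 1 near the ends) can be sketched without Reactor or rsocket. AvailabilityTracker and its methods are hypothetical names. As a simplifying assumption, the sketch uses a fixed smoothing factor rather than the time-based decay of Ewma, and it omits the window-expiry reset.

```java
import java.util.function.Supplier;

// Hypothetical sketch for illustration; not the rsocket API.
class AvailabilityTracker {
    private static final double EPSILON = 1e-4;

    private final double s;       // fixed smoothing factor (assumption; rsocket decays by time)
    private double score = 1.0;   // starts fully available, like the initial value 1.0

    AvailabilityTracker(double s) {
        this.s = s;
    }

    // Runs the action, recording 1.0 if it returns and 0.0 if it throws.
    <T> T call(Supplier<T> action) {
        try {
            T result = action.get();
            record(1.0);
            return result;
        } catch (RuntimeException e) {
            record(0.0);
            throw e;
        }
    }

    private void record(double outcome) {
        score = s * outcome + (1 - s) * score;
    }

    // Values within EPSILON of 0 or 1 are snapped to exactly 0.0 or 1.0.
    double availability() {
        if (score < EPSILON) {
            return 0.0;
        } else if (1.0 - EPSILON < score) {
            return 1.0;
        }
        return score;
    }
}
```

The snapping means a caller can compare the result against 0.0 to rule a target out entirely, instead of dealing with a score that only approaches zero asymptotically.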
Summary
There are several moving-average algorithms, including SMA (Simple Moving Average), WMA (Weighted Moving Average), and EMA (Exponential Moving Average), also known as EWMA (Exponentially Weighted Moving Average).
The Ewma constructor takes halfLife, unit, and initialValue (the initial ewma value). Each insert computes ewma = w * ewma + (1.0 - w) * x, where x is the new sample and w = Math.exp(-elapsed / tau) is the weight. elapsed is the time since the previous insert, and tau (the Greek letter τ) is the time constant of the EWMA, equal to halfLife / Math.log(2) converted to Clock.unit(). The halfLife parameter controls the speed of convergence.
The rsocket load balancer uses Ewma to track the availability of a service. RSocketSupplier implements the Availability, Supplier, and Closeable interfaces and defines an errorPercentage field of type Ewma. When no halfLife is given, it defaults to 5 seconds, and the initial ewma value is 1.0. Its get method wraps the RSocket in an AvailabilityAwareRSocketProxy, which delegates to the target RSocket and hooks errorPercentage into the doOnError and doOnSuccess callbacks of each interaction.