聊聊rsocket load balancer的Ewma

44次阅读

共计 5772 个字符,预计需要花费 15 分钟才能阅读完成。


本文主要研究一下 rsocket load balancer 的 Ewma
Moving Average
SMA
SMA(Simple Moving Average),即简单移动平均,其公式如下:
SMAt = (Pt + Pt-1 + Pt-2 + Pt-3 + … + Pt-n+1) / n
这里的 Pt 到为 Pt-n+ 1 为最近的 n 个数据
WMA
WMA(Weighted Moving Average),即加权移动平均,其公式如下:
WMAt = (Pt * Wt) + (Pt-1 * Wt-1) + … + (Pt-n+1 * Wt-n+1)
WMA 会给最近的 n 个数据加上权重,其中这些权重加起来和为 1,一般是较近的数据权重比较大
EMA 或 EWMA
EMA(Exponentially Moving Average) 指数移动平均或 EWMA(Exponentially Weighted Moving Average) 指数加权移动平均,其公式如下:
EMAt = (Pt * S) + (1- S) * EMAt-1
它有一个 S 参数为平滑指数,一般是取 2 /(N+1)
Ewma
rsocket-load-balancer-0.12.1-sources.jar!/io/rsocket/stat/Ewma.java
public class Ewma {
private final long tau;
private volatile long stamp;
private volatile double ewma;

public Ewma(long halfLife, TimeUnit unit, double initialValue) {
this.tau = Clock.unit().convert((long) (halfLife / Math.log(2)), unit);
stamp = 0L;
ewma = initialValue;
}

public synchronized void insert(double x) {
long now = Clock.now();
double elapsed = Math.max(0, now – stamp);
stamp = now;

double w = Math.exp(-elapsed / tau);
ewma = w * ewma + (1.0 – w) * x;
}

public synchronized void reset(double value) {
stamp = 0L;
ewma = value;
}

public double value() {
return ewma;
}

@Override
public String toString() {
return “Ewma(value=” + ewma + “, age=” + (Clock.now() – stamp) + “)”;
}
}

Ewma 的构造器需要指定 halfLife、timeunit、initialValue(ewma 初始值) 参数;ewma = w ewma + (1.0 – w) x,其中 x 为当前值,w 为权重
权重 w = Math.exp(-elapsed / tau),即 e 的 -elapsed / tau 次方;elapsed 为距离上次计算的时间长度;tau(希腊字母) 为 EWMA 的时间常量
这里的 tau = halfLife / Math.log(2) 根据 timeunit 转换后的值;其中 halfLife 参数,代表 speed of convergence,即收敛的速度

RSocketSupplier
rsocket-load-balancer-0.12.1-sources.jar!/io/rsocket/client/filter/RSocketSupplier.java
public class RSocketSupplier implements Availability, Supplier<Mono<RSocket>>, Closeable {

private static final double EPSILON = 1e-4;

private Supplier<Mono<RSocket>> rSocketSupplier;

private final MonoProcessor<Void> onClose;

private final long tau;
private long stamp;
private final Ewma errorPercentage;

public RSocketSupplier(Supplier<Mono<RSocket>> rSocketSupplier, long halfLife, TimeUnit unit) {
this.rSocketSupplier = rSocketSupplier;
this.tau = Clock.unit().convert((long) (halfLife / Math.log(2)), unit);
this.stamp = Clock.now();
this.errorPercentage = new Ewma(halfLife, unit, 1.0);
this.onClose = MonoProcessor.create();
}

public RSocketSupplier(Supplier<Mono<RSocket>> rSocketSupplier) {
this(rSocketSupplier, 5, TimeUnit.SECONDS);
}

@Override
public double availability() {
double e = errorPercentage.value();
if (Clock.now() – stamp > tau) {
// If the window is expired artificially increase the availability
double a = Math.min(1.0, e + 0.5);
errorPercentage.reset(a);
}
if (e < EPSILON) {
e = 0.0;
} else if (1.0 – EPSILON < e) {
e = 1.0;
}

return e;
}

private synchronized void updateErrorPercentage(double value) {
errorPercentage.insert(value);
stamp = Clock.now();
}

@Override
public Mono<RSocket> get() {
return rSocketSupplier
.get()
.doOnNext(o -> updateErrorPercentage(1.0))
.doOnError(t -> updateErrorPercentage(0.0))
.map(AvailabilityAwareRSocketProxy::new);
}

@Override
public void dispose() {
onClose.onComplete();
}

@Override
public boolean isDisposed() {
return onClose.isDisposed();
}

@Override
public Mono<Void> onClose() {
return onClose;
}

private class AvailabilityAwareRSocketProxy extends RSocketProxy {
public AvailabilityAwareRSocketProxy(RSocket source) {
super(source);

onClose.doFinally(signalType -> source.dispose()).subscribe();
}

@Override
public Mono<Void> fireAndForget(Payload payload) {
return source
.fireAndForget(payload)
.doOnError(t -> errorPercentage.insert(0.0))
.doOnSuccess(v -> updateErrorPercentage(1.0));
}

@Override
public Mono<Payload> requestResponse(Payload payload) {
return source
.requestResponse(payload)
.doOnError(t -> errorPercentage.insert(0.0))
.doOnSuccess(p -> updateErrorPercentage(1.0));
}

@Override
public Flux<Payload> requestStream(Payload payload) {
return source
.requestStream(payload)
.doOnError(th -> errorPercentage.insert(0.0))
.doOnComplete(() -> updateErrorPercentage(1.0));
}

@Override
public Flux<Payload> requestChannel(Publisher<Payload> payloads) {
return source
.requestChannel(payloads)
.doOnError(th -> errorPercentage.insert(0.0))
.doOnComplete(() -> updateErrorPercentage(1.0));
}

@Override
public Mono<Void> metadataPush(Payload payload) {
return source
.metadataPush(payload)
.doOnError(t -> errorPercentage.insert(0.0))
.doOnSuccess(v -> updateErrorPercentage(1.0));
}

@Override
public double availability() {
// If the window is expired set success and failure to zero and return
// the child availability
if (Clock.now() – stamp > tau) {
updateErrorPercentage(1.0);
}
return source.availability() * errorPercentage.value();
}
}
}

RSocketSupplier 实现了 Availability、Supplier、Closeable 接口,其中它定义了 errorPercentage 变量,其类型为 Ewma;如果没有指定 halfLife 值,则 RSocketSupplier 默认 halfLife 为 5 秒,ewma 的初始值为 1.0
RSocketSupplier 定义了一个常量 EPSILON = 1e-4,其 availability 方法会先计算 availability,然后在距离上次计算时间 stamp 超过 tau 值时会重置 errorPercentage;之后当 availability 小于 EPSILON 时返回 0,当 availability + EPSILON 大于 1 时返回 1.0
updateErrorPercentage 方法用于往 ewma 插入新值,同时更新 stamp;get 方法的 doOnNext 方法 updateErrorPercentage 值为 1.0,doOnError 方法 updateErrorPercentage 值为 0.0;map 会将 RSocket 转换为 AvailabilityAwareRSocketProxy;AvailabilityAwareRSocketProxy 对目标 RSocket 进行代理,对相关方法的 doOnError 及 doOnSuccess 都织入 errorPercentage 的统计

小结

Moving Average 有好几种算法,包括 SMA(Simple Moving Average)、WMA(Weighted Moving Average)、EMA(Exponentially Moving Average) 或 EWMA(Exponentially Weighted Moving Average)
Ewma 的构造器需要指定 halfLife、timeunit、initialValue(ewma 初始值) 参数;ewma = w ewma + (1.0 – w) x,其中 x 为当前值,w 为权重;权重 w = Math.exp(-elapsed / tau),即 e 的 -elapsed / tau 次方;elapsed 为距离上次计算的时间长度;tau(希腊字母) 为 EWMA 的时间常量;这里的 tau = halfLife / Math.log(2) 根据 timeunit 转换后的值;其中 halfLife 参数,代表 speed of convergence,即收敛的速度
rsocket load balancer 使用了 Ewma 了统计服务的 availability;其中 RSocketSupplier 实现了 Availability、Supplier、Closeable 接口,其中它定义了 errorPercentage 变量,其类型为 Ewma;如果没有指定 halfLife 值,则 RSocketSupplier 默认 halfLife 为 5 秒,ewma 的初始值为 1.0;RSocketSupplier 的 get 方法会将 RSocket 转换为 AvailabilityAwareRSocketProxy,而 AvailabilityAwareRSocketProxy 则会对目标 RSocket 进行代理,对相关方法的 doOnError 及 doOnSuccess 都织入 errorPercentage 的统计

doc

Simple Moving Average – SMA Definition
Weighted Moving Averages: The Basics
Exponential Moving Average – EMA Definition
How Is the Exponential Moving Average (EMA) Formula Calculated?
Moving Average, Weighted Moving Average, and Exponential Moving Average
Exploring the Exponentially Weighted Moving Average
EWMA 移动平均模型
rsocket EWMA

正文完
 0