聊聊rsocket load balancer的Ewma

序本文主要研究一下rsocket load balancer的EwmaMoving AverageSMASMA(Simple Moving Average),即简单移动平均,其公式如下:SMAt = (Pt + Pt-1 + Pt-2 + Pt-3 + … + Pt-n+1) / n这里的Pt到为Pt-n+1为最近的n个数据WMAWMA(Weighted Moving Average),即加权移动平均,其公式如下:WMAt = (Pt * Wt) + (Pt-1 * Wt-1) + … + (Pt-n+1 * Wt-n+1)WMA会给最近的n个数据加上权重,其中这些权重加起来和为1,一般是较近的数据权重比较大EMA或EWMAEMA(Exponentially Moving Average)指数移动平均或EWMA(Exponentially Weighted Moving Average)指数加权移动平均,其公式如下:EMAt = (Pt * S) + (1- S) * EMAt-1它有一个S参数为平滑指数,一般是取2/(N+1)Ewmarsocket-load-balancer-0.12.1-sources.jar!/io/rsocket/stat/Ewma.javapublic class Ewma { private final long tau; private volatile long stamp; private volatile double ewma; public Ewma(long halfLife, TimeUnit unit, double initialValue) { this.tau = Clock.unit().convert((long) (halfLife / Math.log(2)), unit); stamp = 0L; ewma = initialValue; } public synchronized void insert(double x) { long now = Clock.now(); double elapsed = Math.max(0, now - stamp); stamp = now; double w = Math.exp(-elapsed / tau); ewma = w * ewma + (1.0 - w) * x; } public synchronized void reset(double value) { stamp = 0L; ewma = value; } public double value() { return ewma; } @Override public String toString() { return “Ewma(value=” + ewma + “, age=” + (Clock.now() - stamp) + “)”; }}Ewma的构造器需要指定halfLife、timeunit、initialValue(ewma初始值)参数;ewma = w ewma + (1.0 - w) x,其中x为当前值,w为权重权重w = Math.exp(-elapsed / tau),即e的-elapsed / tau次方;elapsed为距离上次计算的时间长度;tau(希腊字母)为EWMA的时间常量这里的tau = halfLife / Math.log(2)根据timeunit转换后的值;其中halfLife参数,代表speed of convergence,即收敛的速度RSocketSupplierrsocket-load-balancer-0.12.1-sources.jar!/io/rsocket/client/filter/RSocketSupplier.javapublic class RSocketSupplier implements Availability, Supplier<Mono<RSocket>>, Closeable { private static final double EPSILON = 1e-4; private Supplier<Mono<RSocket>> rSocketSupplier; private final MonoProcessor<Void> onClose; private final long tau; private long stamp; private final Ewma errorPercentage; public RSocketSupplier(Supplier<Mono<RSocket>> rSocketSupplier, long halfLife, TimeUnit unit) { this.rSocketSupplier = rSocketSupplier; this.tau = Clock.unit().convert((long) (halfLife / Math.log(2)), unit); this.stamp = Clock.now(); this.errorPercentage = new Ewma(halfLife, unit, 1.0); this.onClose = MonoProcessor.create(); } public RSocketSupplier(Supplier<Mono<RSocket>> rSocketSupplier) { this(rSocketSupplier, 5, TimeUnit.SECONDS); } @Override public double availability() { double e = errorPercentage.value(); if (Clock.now() - stamp > tau) { // If the window is expired artificially increase the availability double a = Math.min(1.0, e + 0.5); errorPercentage.reset(a); } if (e < EPSILON) { e = 0.0; } else if (1.0 - EPSILON < e) { e = 1.0; } return e; } private synchronized void updateErrorPercentage(double value) { errorPercentage.insert(value); stamp = Clock.now(); } @Override public Mono<RSocket> get() { return rSocketSupplier .get() .doOnNext(o -> updateErrorPercentage(1.0)) .doOnError(t -> updateErrorPercentage(0.0)) .map(AvailabilityAwareRSocketProxy::new); } @Override public void dispose() { onClose.onComplete(); } @Override public boolean isDisposed() { return onClose.isDisposed(); } @Override public Mono<Void> onClose() { return onClose; } private class AvailabilityAwareRSocketProxy extends RSocketProxy { public AvailabilityAwareRSocketProxy(RSocket source) { super(source); onClose.doFinally(signalType -> source.dispose()).subscribe(); } @Override public Mono<Void> fireAndForget(Payload payload) { return source .fireAndForget(payload) .doOnError(t -> errorPercentage.insert(0.0)) .doOnSuccess(v -> updateErrorPercentage(1.0)); } @Override public Mono<Payload> requestResponse(Payload payload) { return source .requestResponse(payload) .doOnError(t -> errorPercentage.insert(0.0)) .doOnSuccess(p -> updateErrorPercentage(1.0)); } @Override public Flux<Payload> requestStream(Payload payload) { return source .requestStream(payload) .doOnError(th -> errorPercentage.insert(0.0)) .doOnComplete(() -> updateErrorPercentage(1.0)); } @Override public Flux<Payload> requestChannel(Publisher<Payload> payloads) { return source .requestChannel(payloads) .doOnError(th -> errorPercentage.insert(0.0)) .doOnComplete(() -> updateErrorPercentage(1.0)); } @Override public Mono<Void> metadataPush(Payload payload) { return source .metadataPush(payload) .doOnError(t -> errorPercentage.insert(0.0)) .doOnSuccess(v -> updateErrorPercentage(1.0)); } @Override public double availability() { // If the window is expired set success and failure to zero and return // the child availability if (Clock.now() - stamp > tau) { updateErrorPercentage(1.0); } return source.availability() * errorPercentage.value(); } }}RSocketSupplier实现了Availability、Supplier、Closeable接口,其中它定义了errorPercentage变量,其类型为Ewma;如果没有指定halfLife值,则RSocketSupplier默认halfLife为5秒,ewma的初始值为1.0RSocketSupplier定义了一个常量EPSILON = 1e-4,其availability方法会先计算availability,然后在距离上次计算时间stamp超过tau值时会重置errorPercentage;之后当availability小于EPSILON时返回0,当availability + EPSILON大于1时返回1.0updateErrorPercentage方法用于往ewma插入新值,同时更新stamp;get方法的doOnNext方法updateErrorPercentage值为1.0,doOnError方法updateErrorPercentage值为0.0;map会将RSocket转换为AvailabilityAwareRSocketProxy;AvailabilityAwareRSocketProxy对目标RSocket进行代理,对相关方法的doOnError及doOnSuccess都织入errorPercentage的统计小结Moving Average有好几种算法,包括SMA(Simple Moving Average)、WMA(Weighted Moving Average)、EMA(Exponentially Moving Average)或EWMA(Exponentially Weighted Moving Average)Ewma的构造器需要指定halfLife、timeunit、initialValue(ewma初始值)参数;ewma = w ewma + (1.0 - w) x,其中x为当前值,w为权重;权重w = Math.exp(-elapsed / tau),即e的-elapsed / tau次方;elapsed为距离上次计算的时间长度;tau(希腊字母)为EWMA的时间常量;这里的tau = halfLife / Math.log(2)根据timeunit转换后的值;其中halfLife参数,代表speed of convergence,即收敛的速度rsocket load balancer使用了Ewma了统计服务的availability;其中RSocketSupplier实现了Availability、Supplier、Closeable接口,其中它定义了errorPercentage变量,其类型为Ewma;如果没有指定halfLife值,则RSocketSupplier默认halfLife为5秒,ewma的初始值为1.0;RSocketSupplier的get方法会将RSocket转换为AvailabilityAwareRSocketProxy,而AvailabilityAwareRSocketProxy则会对目标RSocket进行代理,对相关方法的doOnError及doOnSuccess都织入errorPercentage的统计docSimple Moving Average - SMA DefinitionWeighted Moving Averages: The BasicsExponential Moving Average - EMA DefinitionHow Is the Exponential Moving Average (EMA) Formula Calculated?Moving Average, Weighted Moving Average, and Exponential Moving AverageExploring the Exponentially Weighted Moving AverageEWMA 移动平均模型rsocket EWMA ...

April 12, 2019 · 3 min · jiezi

rsocket-java小试牛刀

序本文主要研究一下rsocket-javaRSocketrsocket-core-0.12.1-sources.jar!/io/rsocket/RSocket.javapublic interface RSocket extends Availability, Closeable { /** * Fire and Forget interaction model of {@code RSocket}. * * @param payload Request payload. * @return {@code Publisher} that completes when the passed {@code payload} is successfully * handled, otherwise errors. / Mono<Void> fireAndForget(Payload payload); /* * Request-Response interaction model of {@code RSocket}. * * @param payload Request payload. * @return {@code Publisher} containing at most a single {@code Payload} representing the * response. / Mono<Payload> requestResponse(Payload payload); /* * Request-Stream interaction model of {@code RSocket}. * * @param payload Request payload. * @return {@code Publisher} containing the stream of {@code Payload}s representing the response. / Flux<Payload> requestStream(Payload payload); /* * Request-Channel interaction model of {@code RSocket}. * * @param payloads Stream of request payloads. * @return Stream of response payloads. / Flux<Payload> requestChannel(Publisher<Payload> payloads); /* * Metadata-Push interaction model of {@code RSocket}. * * @param payload Request payloads. * @return {@code Publisher} that completes when the passed {@code payload} is successfully * handled, otherwise errors. */ Mono<Void> metadataPush(Payload payload); @Override default double availability() { return isDisposed() ? 0.0 : 1.0; }}RSocket接口继承了Availability(定义double availability()方法)及Closeable(定义了Mono<Void> onClose()方法)接口RSocket定义了fireAndForget、requestResponse、requestStream、requestChannel方法分别对应4种Interaction ModelRSocket的Frame包含metadata及data payload,其中metadata可选,可以用于描述data payload,因而RSocket还定义了metadataPush方法用于push metadataInteraction ModelfireAndForget @Test public void testFireAndForget() throws InterruptedException { //SERVER RSocketFactory.receive() .acceptor( (setupPayload, reactiveSocket) -> Mono.just( new AbstractRSocket() { @Override public Mono<Void> fireAndForget(Payload payload) { System.out.printf(“fire-forget: %s%n”, payload.getDataUtf8()); return Mono.empty(); } })) .transport(TcpServerTransport.create(“localhost”, 8080)) .start() .subscribe(); //CLIENT RSocket socket = RSocketFactory.connect() .transport(TcpClientTransport.create(“localhost”, 8080)) .start() .block(); socket .fireAndForget(DefaultPayload.create(“Hello”)) .block(); socket.dispose(); TimeUnit.SECONDS.sleep(5); }类似udp,无需ack,比较适合metrics上报、访问日志上报等requestResponse @Test public void testRequestResponse(){ //SERVER RSocketFactory.receive() .acceptor( (setupPayload, reactiveSocket) -> Mono.just( new AbstractRSocket() { @Override public Mono<Payload> requestResponse(Payload p) { return Mono.just(p); } })) .transport(TcpServerTransport.create(“localhost”, 8080)) .start() .subscribe(); //CLIENT RSocket socket = RSocketFactory.connect() .transport(TcpClientTransport.create(“localhost”, 8080)) .start() .block(); socket .requestResponse(DefaultPayload.create(“Hello”)) .map(Payload::getDataUtf8) .onErrorReturn(“error”) .doOnNext(System.out::println) .block(); socket.dispose(); }类似http,但是优于http,因为它是异步的,而且是multiplexedrequestStream @Test public void testRequestStream(){ //SERVER RSocketFactory.receive() .acceptor(new SocketAcceptor() { @Override public Mono<RSocket> accept(ConnectionSetupPayload connectionSetupPayload, RSocket rSocket) { return Mono.just( new AbstractRSocket() { @Override public Flux<Payload> requestStream(Payload payload) { return Flux.interval(Duration.ofMillis(100)) .map(aLong -> DefaultPayload.create(“Interval: " + aLong)); } }); } }) .transport(TcpServerTransport.create(“localhost”, 7000)) .start() .subscribe(); //CLIENT RSocket socket = RSocketFactory.connect() .transport(TcpClientTransport.create(“localhost”, 7000)) .start() .block(); socket .requestStream(DefaultPayload.create(“Hello”)) .map(Payload::getDataUtf8) .doOnNext(System.out::println) .take(10) .then() .doFinally(signalType -> socket.dispose()) .then() .block(); }类似Request-Response(返回Mono),只不过返回的是FluxrequestChannel @Test public void testRequestChannel(){ //SERVER RSocketFactory.receive() .acceptor(new SocketAcceptor(){ @Override public Mono<RSocket> accept(ConnectionSetupPayload setup, RSocket sendingSocket) { return Mono.just( new AbstractRSocket() { @Override public Flux<Payload> requestChannel(Publisher<Payload> payloads) { return Flux.from(payloads) .map(Payload::getDataUtf8) .map(s -> “Echo: " + s) .map(DefaultPayload::create); } }); } }) .transport(TcpServerTransport.create(“localhost”, 8080)) .start() .subscribe(); //CLIENT RSocket socket = RSocketFactory.connect() .transport(TcpClientTransport.create(“localhost”, 8080)) .start() .block(); socket .requestChannel( Flux.interval(Duration.ofMillis(1000)).map(i -> DefaultPayload.create(“Hello”))) .map(Payload::getDataUtf8) .doOnNext(System.out::println) .take(10) .doFinally(signalType -> socket.dispose()) .then() .block(); }类似websocket,可以双向通信MetadataPush @Test public void testMetadataPush() throws InterruptedException { //SERVER RSocketFactory.receive() .acceptor( (setupPayload, reactiveSocket) -> Mono.just( new AbstractRSocket() { @Override public Mono<Void> metadataPush(Payload payload) { System.out.printf(“metadataPush: %s%n”, payload.getDataUtf8()); return Mono.empty(); } })) .transport(TcpServerTransport.create(“localhost”, 8080)) .start() .subscribe(); //CLIENT RSocket socket = RSocketFactory.connect() .transport(TcpClientTransport.create(“localhost”, 8080)) .start() .block(); socket .metadataPush(DefaultPayload.create(“hello”,“version=1.0.0+”)) .block(); socket.dispose(); TimeUnit.SECONDS.sleep(5); }RSocket还定义了metadataPush方法,与fireAndForget方法不同的是metadataPush方法会等待data pushed成功,然后在接收到对方发送的complete signal时complete小结RSocket是一种bi-directional、multiplexed、message-based的二进制协议RSocket有四种Interaction Model,分别是Request-Response、Fire-and-Forget、Request-Stream、ChannelRSocket的Frame包含metadata及data payload,其中metadata可选,可以用于描述data payload;除了可以在4种Interaction Model对应方法的Payload参数中设置metadata外,还可以使用RSocket定义的metadataPush方法来专门push metadatadocrsocket.io ...

April 12, 2019 · 3 min · jiezi