本文主要研究一下Elasticsearch的ExponentiallyWeightedMovingAverage

ExponentiallyWeightedMovingAverage

elasticsearch-7.0.1/server/src/main/java/org/elasticsearch/common/ExponentiallyWeightedMovingAverage.java

public class ExponentiallyWeightedMovingAverage {    private final double alpha;    private final AtomicLong averageBits;    /**     * Create a new EWMA with a given {@code alpha} and {@code initialAvg}. A smaller alpha means     * that new data points will have less weight, where a high alpha means older data points will     * have a lower influence.     */    public ExponentiallyWeightedMovingAverage(double alpha, double initialAvg) {        if (alpha < 0 || alpha > 1) {            throw new IllegalArgumentException("alpha must be greater or equal to 0 and less than or equal to 1");        }        this.alpha = alpha;        this.averageBits = new AtomicLong(Double.doubleToLongBits(initialAvg));    }    public double getAverage() {        return Double.longBitsToDouble(this.averageBits.get());    }    public void addValue(double newValue) {        boolean successful = false;        do {            final long currentBits = this.averageBits.get();            final double currentAvg = getAverage();            final double newAvg = (alpha * newValue) + ((1 - alpha) * currentAvg);            final long newBits = Double.doubleToLongBits(newAvg);            successful = averageBits.compareAndSet(currentBits, newBits);        } while (successful == false);    }}
  • ExponentiallyWeightedMovingAverage实现了EWMA,它是线程安全的;其构造器要求输入alpha及initialAvg;alpha越大表示新数据权重越大旧数据权重越小
  • getAverage返回的是averageBits的值,不过它存储的是double的bit形式,返回的时候使用Double.longBitsToDouble转换会double
  • addValue方法使用(alpha * newValue) + ((1 - alpha) * currentAvg)计算新值,然后使用averageBits.compareAndSet方法来实现原子更新

实例

elasticsearch-7.0.1/server/src/test/java/org/elasticsearch/common/ExponentiallyWeightedMovingAverageTests.java

public class ExponentiallyWeightedMovingAverageTests extends ESTestCase {    public void testEWMA() {        final ExponentiallyWeightedMovingAverage ewma = new ExponentiallyWeightedMovingAverage(0.5, 10);        ewma.addValue(12);        assertThat(ewma.getAverage(), equalTo(11.0));        ewma.addValue(10);        ewma.addValue(15);        ewma.addValue(13);        assertThat(ewma.getAverage(), equalTo(12.875));    }    public void testInvalidAlpha() {        IllegalArgumentException ex = expectThrows(IllegalArgumentException.class, () -> new ExponentiallyWeightedMovingAverage(-0.5, 10));        assertThat(ex.getMessage(), equalTo("alpha must be greater or equal to 0 and less than or equal to 1"));        ex = expectThrows(IllegalArgumentException.class, () -> new ExponentiallyWeightedMovingAverage(1.5, 10));        assertThat(ex.getMessage(), equalTo("alpha must be greater or equal to 0 and less than or equal to 1"));    }    public void testConvergingToValue() {        final ExponentiallyWeightedMovingAverage ewma = new ExponentiallyWeightedMovingAverage(0.5, 10000);        for (int i = 0; i < 100000; i++) {            ewma.addValue(1);        }        assertThat(ewma.getAverage(), lessThan(2.0));    }}
  • testEWMA方法测试算法的计算逻辑;testInvalidAlpha测试alpha参数的校验;testConvergingToValue则测试ewma值的收敛

小结

  • ExponentiallyWeightedMovingAverage实现了EWMA,它是线程安全的;其构造器要求输入alpha及initialAvg;alpha越大表示新数据权重越大旧数据权重越小
  • getAverage返回的是averageBits的值,不过它存储的是double的bit形式,返回的时候使用Double.longBitsToDouble转换会double
  • addValue方法使用(alpha * newValue) + ((1 - alpha) * currentAvg)计算新值,然后使用averageBits.compareAndSet方法来实现原子更新

doc

  • 聊聊rsocket load balancer的Ewma
  • ExponentiallyWeightedMovingAverage
  • ExponentiallyWeightedMovingAverageTests