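Background

In the earlier implementation described in "Log-Based Alerting", the number of alerts could be capped at X per hour, but the implementation was fairly crude:

  1. The period was hard-coded to one hour.
  2. Periods followed natural clock hours (minute 0 to minute 60), so X alerts could be sent at minute 59 and another X at minute 1 of the next hour. In other words, X * 2 alerts could go out within the few minutes between :59 and :01.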
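The boundary problem in item 2 can be reproduced with a small sketch. `FixedHourLimiter` and its methods are hypothetical names used only for illustration (they are not part of the original alerting code), and time is passed in as a minute counter to keep the example deterministic.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical fixed-window limiter keyed by natural hour; for illustration only. */
class FixedHourLimiter {
    private final int limitPerHour;
    private final Map<Long, Integer> sentPerHour = new HashMap<>();

    FixedHourLimiter(int limitPerHour) {
        this.limitPerHour = limitPerHour;
    }

    /** Returns true if an alert at the given minute (counted from 0) may be sent. */
    boolean tryAcquire(long minute) {
        long hour = minute / 60; // natural-hour period
        int sent = sentPerHour.getOrDefault(hour, 0);
        if (sent >= limitPerHour) {
            return false;
        }
        sentPerHour.put(hour, sent + 1);
        return true;
    }

    /** Counts how many of `attempts` alerts fired at `minute` get through. */
    int burst(long minute, int attempts) {
        int passed = 0;
        for (int i = 0; i < attempts; i++) {
            if (tryAcquire(minute)) {
                passed++;
            }
        }
        return passed;
    }
}
```

With a limit of 5, a burst at minute 59 and another at minute 61 each let 5 alerts through, so 10 alerts are sent within about two minutes.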

A better design lets the user configure "at most Y alerts within a period of X" and counts with a sliding window over that period.

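Key points:

Data structure

A fixed-length array forms one window. Each element of the array is a bucket, and an index pointer identifies the current bucket.
The following example is a 60-second window divided into 10 buckets, so each bucket covers 6 seconds.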

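public class SlidingWindow {
    // number of buckets in the window, e.g. 10
    protected int bucketCount;
    // total window length in milliseconds, e.g. 60000
    protected int intervalInMs;
    // one element per bucket
    protected WindowBucket[] windowBuckets;
}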
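As a rough illustration of the arithmetic behind this layout, the sketch below derives the bucket length and maps a timestamp to a bucket. `BucketMath` is a hypothetical helper; its two methods follow the same formulas as `calculateCurrentBucketId` and `calculateWindowStartTime` in the full source further down. Indexes here are zero-based, whereas the prose examples in this article number buckets from 1.

```java
/** Hypothetical helper showing the bucket arithmetic; names are illustrative. */
class BucketMath {
    final int bucketCount;
    final int intervalInMs;
    final int bucketLengthInMs;

    BucketMath(int bucketCount, int intervalInMs) {
        this.bucketCount = bucketCount;
        this.intervalInMs = intervalInMs;
        this.bucketLengthInMs = intervalInMs / bucketCount;
    }

    /** Zero-based index of the bucket that covers the given time. */
    int bucketIndex(long timeMs) {
        return (int) ((timeMs / bucketLengthInMs) % bucketCount);
    }

    /** Start time of the bucket period that covers the given time. */
    long bucketStart(long timeMs) {
        return timeMs - timeMs % bucketLengthInMs;
    }
}
```

For a 60-second window with 10 buckets, second 5 maps to index 0 and second 70 maps to index 1, because the index wraps around after one full window.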

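Sliding mode

  1. Active: start a scheduled task that advances the index pointer at fixed intervals.
  2. Passive: when data arrives, compute the current index from the current time.

Drawbacks of the active approach:

  1. It needs an extra scheduled task.
  2. The scheduled task itself can affect how precisely the index advances, especially when the window is very short, for example a 1-second window split into 100 buckets.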

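New vs. stale data

With the passive approach, how are expired values (data left over from the previous window) filtered out when totals are computed?
Take the same 60-second window with 10 buckets, numbered from 1 here:

  1. At second 5, data is inserted into bucket 1.
  2. During seconds 61-66 no data arrives. A new round of the window has started, but bucket 1 has not been modified.
  3. At second 70, data is inserted into bucket 2. If the array is summed at this point, bucket 1 is included, even though its data belongs to the previous window and should not be counted.

The solution is to give each bucket a begin time.
For example, the first bucket initially begins at 1s and the second at 7s. After one round (60s), the first bucket should begin at 61s and the second at 67s.
On insert, the begin time is also used to decide whether the current bucket still holds data from the previous window:

  1. If it does not: add to the bucket's existing data.
  2. If it does: reset the bucket (set the counts to 0 and assign the new begin time), using a lock to avoid concurrency problems.

When summing, check for each bucket whether its begin time is within one window length of the current time.
In the example above, the insert at second 70 goes to the second bucket, whose begin time is updated to 67s. When the current window is then summed, bucket 1 still has a begin time of 1s, which is outside the current window (70 - 1 > 60), so it is left out of the total.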

public class WindowBucketWrap<T extends WindowBucket> {
    private volatile long beginTime;
    private T windowBucket;
}
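The walk-through above can be replayed with a single-threaded sketch. `StampedWindow` is a hypothetical, simplified class (no locking, one counter per bucket, time passed in explicitly), and it treats a bucket that is exactly one window length old as expired; it is meant to illustrate the begin-time check rather than mirror the actual implementation.

```java
import java.util.Arrays;

/** Single-threaded sketch of buckets that carry a begin time; for illustration only. */
class StampedWindow {
    private final int bucketCount;
    private final long intervalMs;
    private final long bucketMs;
    private final long[] counts;
    private final long[] beginTimes;

    StampedWindow(int bucketCount, long intervalMs) {
        this.bucketCount = bucketCount;
        this.intervalMs = intervalMs;
        this.bucketMs = intervalMs / bucketCount;
        this.counts = new long[bucketCount];
        this.beginTimes = new long[bucketCount];
        Arrays.fill(beginTimes, -1L); // -1 marks a bucket that was never written
    }

    void add(long nowMs) {
        int i = (int) ((nowMs / bucketMs) % bucketCount);
        long start = nowMs - nowMs % bucketMs;
        if (beginTimes[i] != start) {
            // The bucket belongs to an earlier round: reset it before counting.
            counts[i] = 0;
            beginTimes[i] = start;
        }
        counts[i]++;
    }

    long sum(long nowMs) {
        long total = 0;
        for (int i = 0; i < bucketCount; i++) {
            boolean written = beginTimes[i] >= 0;
            boolean inWindow = nowMs - beginTimes[i] < intervalMs;
            if (written && inWindow) {
                total += counts[i];
            }
        }
        return total;
    }
}
```

After an insert at second 5 and another at second 70, summing at second 70 counts only the second insert: the first bucket's begin time is more than 60 seconds old, so its value is skipped even though it was never reset.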

To avoid clock-rollback problems, time is read from nanoTime.
Because calling Java's System.nanoTime() directly has a performance cost, a custom time service is used here.

/**
 * @date 2020-12-21 11:07
 */
public class TimeServer {
    /**
     * Number of nanoseconds in one millisecond.
     */
    public static final long NANOS_PER_MILLI = 1000_000L;

    private static final long beginNanoTime;
    private static volatile long beginMilliTime;
    private static volatile long currentNanoTime;

    static {
        currentNanoTime = System.nanoTime();
        beginNanoTime = currentNanoTime;
        beginMilliTime = System.currentTimeMillis();
        // Daemon thread that refreshes the cached nanoTime about every half millisecond.
        Thread daemon = new Thread(new Runnable() {
            @Override
            public void run() {
                while (true) {
                    currentNanoTime = System.nanoTime();
                    try {
                        TimeUnit.NANOSECONDS.sleep(NANOS_PER_MILLI / 2);
                    } catch (Throwable ignore) {
                    }
                }
            }
        });
        daemon.setDaemon(true);
        daemon.setName("slidingwindow-time-tick-thread");
        daemon.start();
    }

    // Accessor methods omitted here; the full class is listed in the source code section.
}
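For comparison, the sketch below shows the underlying idea without the caching thread: elapsed time is measured as a difference of `System.nanoTime()` readings, which does not depend on the wall clock and so is unaffected when the system time is adjusted. `MonotonicClock` is a hypothetical name, and unlike `TimeServer` it calls `System.nanoTime()` on every read.

```java
import java.util.concurrent.TimeUnit;

/** Hypothetical minimal clock; reads System.nanoTime() directly instead of caching it. */
class MonotonicClock {
    private final long beginNanoTime = System.nanoTime();

    /** Milliseconds elapsed since this clock was created. */
    long elapsedMillis() {
        return TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - beginNanoTime);
    }
}
```

The sliding window only needs elapsed time relative to some origin, not the calendar time, which is why an interval-based clock like this is sufficient.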

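Data fields

Different business scenarios store different data in the window. The log alerting described in the background may need only one value, the number of alerts already sent, while rate limiting may need two, passed and blocked. Since the window is meant to be generic, it has to accommodate all of these and let each business case define its own statistics.
To do this, the bucket's data field is an array whose length and slots are defined by an enum.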

public class EventTypeWrap<E extends Enum> {
    private E[] eventTypes;

    public EventTypeWrap(Class<E> clazz) {
        // One entry per constant declared in the enum.
        eventTypes = clazz.getEnumConstants();
        Assert.notEmpty(eventTypes, "event type list is empty!");
    }

    public E[] getEventTypes() {
        return eventTypes;
    }
}

public class SlidingWindow {
    // Other members omitted.

    public WindowBucket buildWindowBucket() {
        int eventTypeSize = eventTypeWrap.getEventTypes().length;
        return new WindowBucket(eventTypeSize);
    }
}

public class WindowBucket {
    // One counter per event type, indexed by the enum ordinal.
    private long[] eventValues;

    public WindowBucket(int eventSize) {
        eventValues = new long[eventSize];
    }
}

When recording data, pass in the event type.

public void add(Enum eventType) {
    // ...
}
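A minimal sketch of the enum-indexed counters, assuming a rate-limiting scenario with two statistics. The `FlowEvent` enum and the `EnumBucket` class are hypothetical and only illustrate how the enum sizes the array and how `ordinal()` selects the slot.

```java
/** Hypothetical event types for a rate limiter; not defined in the original code. */
enum FlowEvent { PASSED, BLOCKED }

/** Sketch of a bucket whose slots are sized and indexed by an enum. */
class EnumBucket<E extends Enum<E>> {
    private final long[] eventValues;

    EnumBucket(Class<E> eventTypeClass) {
        // One slot per enum constant.
        this.eventValues = new long[eventTypeClass.getEnumConstants().length];
    }

    void add(E eventType, long value) {
        eventValues[eventType.ordinal()] += value;
    }

    long get(E eventType) {
        return eventValues[eventType.ordinal()];
    }
}
```

Adding a new statistic then only requires a new enum constant; the bucket code itself does not change.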

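Performance and consistency

Because the data is spread across several buckets, computing a total means adding up every bucket, and bucket contents can change while the sum is in progress. Two implementations are provided:

  1. Strong consistency: with locking.
  2. Eventual consistency: without locking.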

public class AccurateWindowBucket extends WindowBucket {
    private long[] eventValues;
    private ReentrantLock updateLock;

    public AccurateWindowBucket(ReentrantLock updateLock, int eventSize) {
        super(eventSize);
        this.updateLock = updateLock;
        eventValues = new long[eventSize];
    }

    @Override
    public void add(Enum eventType, int value) {
        // Every update takes the lock, so a reader holding the lock sees an exact value.
        updateLock.lock();
        try {
            int index = eventType.ordinal();
            eventValues[index] = eventValues[index] + value;
        } finally {
            updateLock.unlock();
        }
    }

    // Remaining methods omitted here; see the source code section.
}

public class EfficiencyWindowBucket extends WindowBucket {
    private LongAdder[] eventValues;

    public EfficiencyWindowBucket(int eventSize) {
        super(eventSize);
        eventValues = new LongAdder[eventSize];
        IntStream.range(0, eventSize).forEach(i -> {
            eventValues[i] = new LongAdder();
        });
    }

    @Override
    public void add(Enum eventType, int value) {
        // Lock-free update; a concurrent read may miss updates that are still in flight.
        eventValues[eventType.ordinal()].add(value);
    }

    // Remaining methods omitted here; see the source code section.
}

Of these two options, use eventual consistency when performance matters most, and strong consistency when exact counts are required.
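The difference between the two strategies can be sketched side by side. `CounterComparison` is a hypothetical class: one counter uses `LongAdder` (no lock; `sum()` is not an atomic snapshot while updates are running), the other guards a plain `long` with a `ReentrantLock`. Once all writers have finished, both report the same total.

```java
import java.util.concurrent.atomic.LongAdder;
import java.util.concurrent.locks.ReentrantLock;

/** Sketch contrasting the two counting strategies; names are illustrative. */
class CounterComparison {
    private final LongAdder relaxed = new LongAdder();
    private final ReentrantLock lock = new ReentrantLock();
    private long strict;

    void addRelaxed() {
        relaxed.increment();
    }

    void addStrict() {
        lock.lock();
        try {
            strict++;
        } finally {
            lock.unlock();
        }
    }

    long sumRelaxed() {
        return relaxed.sum();
    }

    long sumStrict() {
        lock.lock();
        try {
            return strict;
        } finally {
            lock.unlock();
        }
    }

    /** Runs `threads` workers, each incrementing both counters `perThread` times. */
    static CounterComparison run(int threads, int perThread) {
        CounterComparison c = new CounterComparison();
        Thread[] workers = new Thread[threads];
        for (int t = 0; t < threads; t++) {
            workers[t] = new Thread(() -> {
                for (int i = 0; i < perThread; i++) {
                    c.addRelaxed();
                    c.addStrict();
                }
            });
            workers[t].start();
        }
        for (Thread w : workers) {
            try {
                w.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
        }
        return c;
    }
}
```

The trade-off shows up only during concurrent updates: the locked counter serializes writers and readers, while the `LongAdder` counter lets writers proceed independently and accepts that a read taken mid-update may lag slightly.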

Source code

Window bucket:

// Imports used by the classes in this listing:
import java.util.Arrays;
import java.util.concurrent.atomic.LongAdder;
import java.util.concurrent.locks.ReentrantLock;
import java.util.stream.IntStream;

/**
 * Base class for a bucket that holds one counter per event type.
 */
public abstract class WindowBucket {
    protected int eventSize;

    public WindowBucket(int eventSize) {
        this.eventSize = eventSize;
    }

    /** Adds 1 to the counter of the given event type. */
    public void add(Enum eventType) {
        add(eventType, 1);
    }

    /** Adds value to the counter of the given event type. */
    public abstract void add(Enum eventType, int value);

    /** Adds 1 to the counter of the given event type and returns the new value. */
    public abstract long addAndGet(Enum eventType);

    /** Returns the counter of the given event type. */
    public abstract long getValue(Enum eventType);

    /** Sets every counter back to 0. */
    public abstract void reset();

    /** Returns a copy of all counters, indexed by enum ordinal. */
    public abstract long[] getValues();

    @Override
    public String toString() {
        return this.getClass().getSimpleName() + "{" +
                "eventValues=" + Arrays.toString(getValues()) +
                '}';
    }
}

/**
 * Performance first, eventual consistency.
 * @author minchin
 * @date 2020-12-24 11:02
 */
public class EfficiencyWindowBucket extends WindowBucket {
    private LongAdder[] eventValues;

    public EfficiencyWindowBucket(int eventSize) {
        super(eventSize);
        eventValues = new LongAdder[eventSize];
        IntStream.range(0, eventSize).forEach(i -> {
            eventValues[i] = new LongAdder();
        });
    }

    @Override
    public void add(Enum eventType, int value) {
        eventValues[eventType.ordinal()].add(value);
    }

    @Override
    public long addAndGet(Enum eventType) {
        add(eventType);
        return getValue(eventType);
    }

    @Override
    public long getValue(Enum eventType) {
        return eventValues[eventType.ordinal()].longValue();
    }

    @Override
    public void reset() {
        for (LongAdder value : eventValues) {
            value.reset();
        }
    }

    @Override
    public long[] getValues() {
        long[] values = new long[eventSize];
        IntStream.range(0, eventSize).forEach(i -> {
            values[i] = eventValues[i].longValue();
        });
        return values;
    }
}

/**
 * Strong consistency.
 * @date 2020-12-24 10:35
 */
public class AccurateWindowBucket extends WindowBucket {
    private long[] eventValues;
    // Lock shared with the owning SlidingWindow; ReentrantLock allows nested acquisition.
    private ReentrantLock updateLock;

    public AccurateWindowBucket(ReentrantLock updateLock, int eventSize) {
        super(eventSize);
        this.updateLock = updateLock;
        eventValues = new long[eventSize];
    }

    @Override
    public void add(Enum eventType, int value) {
        updateLock.lock();
        try {
            int index = eventType.ordinal();
            eventValues[index] = eventValues[index] + value;
        } finally {
            updateLock.unlock();
        }
    }

    @Override
    public long addAndGet(Enum eventType) {
        updateLock.lock();
        try {
            add(eventType);
            return getValue(eventType);
        } finally {
            updateLock.unlock();
        }
    }

    @Override
    public long getValue(Enum eventType) {
        updateLock.lock();
        try {
            return eventValues[eventType.ordinal()];
        } finally {
            updateLock.unlock();
        }
    }

    @Override
    public void reset() {
        updateLock.lock();
        try {
            IntStream.range(0, eventSize).forEach(i -> eventValues[i] = 0L);
        } finally {
            updateLock.unlock();
        }
    }

    @Override
    public long[] getValues() {
        updateLock.lock();
        try {
            return Arrays.copyOf(eventValues, eventValues.length);
        } finally {
            updateLock.unlock();
        }
    }
}

/**
 * Pairs a bucket with the begin time of the period it currently represents.
 * @date 2020-12-18 11:17
 */
public class WindowBucketWrap<T extends WindowBucket> {
    private volatile long beginTime;
    private T windowBucket;

    public WindowBucketWrap(long beginTime, T windowBucket) {
        this.beginTime = beginTime;
        this.windowBucket = windowBucket;
    }

    /** Clears the bucket and assigns the begin time of the new period. */
    public void reset(long windowStarTime) {
        windowBucket.reset();
        beginTime = windowStarTime;
    }

    public long getBeginTime() {
        return beginTime;
    }

    public WindowBucketWrap<T> setBeginTime(long beginTime) {
        this.beginTime = beginTime;
        return this;
    }

    public T getWindowBucket() {
        return windowBucket;
    }

    @Override
    public String toString() {
        return "WindowBucketWrap{" +
                "beginTime=" + beginTime +
                ", windowBucket=" + windowBucket +
                '}';
    }
}

Window:

/**
 * Holds the enum constants that define the event types of a window.
 * @date 2020-12-17 17:14
 */
public class EventTypeWrap<E extends Enum> {
    private E[] eventTypes;

    public EventTypeWrap(Class<E> clazz) {
        eventTypes = clazz.getEnumConstants();
        Assert.notEmpty(eventTypes, "event type list is empty!");
    }

    public E[] getEventTypes() {
        return eventTypes;
    }

    public EventTypeWrap<E> setEventTypes(E[] eventTypes) {
        this.eventTypes = eventTypes;
        return this;
    }
}

/**
 * @date 2020-12-24 11:16
 */
public enum SlidingWindowType {
    /**
     * Accurate (strong consistency).
     */
    ACCURATE,
    /**
     * Performance first, eventual consistency.
     */
    EFFICIENCY
}

/**
 * @date 2020-12-18 15:36
 */
public abstract class SlidingWindow {
    protected ReentrantLock updateLock;
    protected Class<? extends Enum> eventTypeClass;
    protected SlidingWindowType windowType = SlidingWindowType.EFFICIENCY;
    protected EventTypeWrap<? extends Enum> eventTypeWrap;
    protected int bucketCount;
    protected int intervalInMs;
    protected int bucketLengthInMs;
    protected WindowBucketWrap[] windowBucketWraps;

    protected SlidingWindow() {
    }

    /** Validates the configuration and allocates the buckets. */
    protected SlidingWindow init() {
        if (eventTypeClass == null || bucketCount == 0 || intervalInMs == 0) {
            throw new IllegalArgumentException("init SlidingWindow error!");
        }
        updateLock = new ReentrantLock();
        eventTypeWrap = new EventTypeWrap(eventTypeClass);
        bucketLengthInMs = intervalInMs / bucketCount;
        // Align the initial begin time to a bucket boundary. With an unaligned value,
        // an add() in the same bucket period would compute an earlier bucket start
        // and getCurrentBucket() would report a clock rollback.
        long beginTime = calculateWindowStartTime(TimeServer.getIntervalMilliTime());
        windowBucketWraps = new WindowBucketWrap[bucketCount];
        IntStream.range(0, bucketCount).forEach(i -> {
            windowBucketWraps[i] = new WindowBucketWrap(beginTime, buildWindowBucket());
        });
        return this;
    }

    /** Creates one bucket of the concrete type used by this window. */
    protected abstract WindowBucket buildWindowBucket();

    /** Adds 1 for the event type in the current bucket. */
    public void add(Enum eventType) {
        WindowBucket windowBucket = getCurrentBucket().getWindowBucket();
        windowBucket.add(eventType);
    }

    /** Adds value for the event type in the current bucket. */
    public void add(Enum eventType, int value) {
        WindowBucket windowBucket = getCurrentBucket().getWindowBucket();
        windowBucket.add(eventType, value);
    }

    /** Adds 1 for the event type and returns the total over the current window. */
    public abstract long addAndSum(Enum eventType);

    /**
     * Returns the total over the current window.
     * Like LongAdder, an implementation may trade some precision for performance.
     */
    public abstract long sum(Enum eventType);

    /** Returns the bucket for the current time, resetting it first if it is stale. */
    public WindowBucketWrap getCurrentBucket() {
        long currentTime = TimeServer.getIntervalMilliTime();
        int bucketId = calculateCurrentBucketId(currentTime);
        long windowStarTime = calculateWindowStartTime(currentTime);
        while (true) {
            WindowBucketWrap windowBucketWrap = windowBucketWraps[bucketId];
            long windowBucketBeginTime = windowBucketWrap.getBeginTime();
            if (windowBucketBeginTime == windowStarTime) {
                return windowBucketWrap;
            }
            // A new round has started: the bucket still holds data from an earlier round.
            if (windowStarTime > windowBucketBeginTime) {
                resetWindow(windowBucketWrap, windowStarTime);
            } else {
                // nanoTime is used, so a "clock rollback" should not occur; throw directly.
                throw new DateTimeException(
                        String.format("windowBucketBeginTime: %d, windowStarTime: %d, clock anomaly detected, please check for a 'clock rollback'!",
                                windowBucketBeginTime, windowStarTime));
            }
        }
    }

    /** Maps a time to a bucket index; the index wraps around after one full window. */
    protected int calculateCurrentBucketId(long currentTime) {
        long timeId = currentTime / bucketLengthInMs;
        return (int) (timeId % windowBucketWraps.length);
    }

    /** Returns the start time of the bucket period that contains currentTime. */
    protected long calculateWindowStartTime(long currentTime) {
        return currentTime - currentTime % bucketLengthInMs;
    }

    /** Resets a stale bucket under the lock; yields if another thread holds the lock. */
    protected void resetWindow(WindowBucketWrap windowBucketWrap, long windowStarTime) {
        if (updateLock.tryLock()) {
            try {
                // Re-check under the lock: another thread may already have reset the bucket.
                if (windowStarTime > windowBucketWrap.getBeginTime()) {
                    windowBucketWrap.reset(windowStarTime);
                }
            } finally {
                updateLock.unlock();
            }
        } else {
            // Looping again right away would likely find the other thread still working,
            // so give up the CPU to other tasks/threads first.
            Thread.yield();
        }
    }

    /** Returns true if the bucket's begin time is more than one window length old. */
    protected boolean isWindowDeprecated(long currentTime, WindowBucketWrap<WindowBucket> windowBucketWrap) {
        return currentTime - windowBucketWrap.getBeginTime() > intervalInMs;
    }

    public Class<? extends Enum> getEventTypeClass() {
        return eventTypeClass;
    }

    protected SlidingWindow setEventTypeClass(Class<? extends Enum> eventTypeClass) {
        this.eventTypeClass = eventTypeClass;
        return this;
    }

    public EventTypeWrap<? extends Enum> getEventTypeWrap() {
        return eventTypeWrap;
    }

    public int getBucketCount() {
        return bucketCount;
    }

    protected SlidingWindow setBucketCount(int bucketCount) {
        this.bucketCount = bucketCount;
        return this;
    }

    public int getIntervalInMs() {
        return intervalInMs;
    }

    protected SlidingWindow setIntervalInMs(int intervalInMs) {
        this.intervalInMs = intervalInMs;
        return this;
    }

    public SlidingWindowType getWindowType() {
        return windowType;
    }

    protected SlidingWindow setWindowType(SlidingWindowType windowType) {
        this.windowType = windowType;
        return this;
    }

    @Override
    public String toString() {
        return "SlidingWindow{" +
                "eventTypeClass=" + eventTypeClass.getName() +
                ", windowType=" + windowType +
                ", bucketCount=" + bucketCount +
                ", intervalInMs=" + intervalInMs +
                ", bucketLengthInMs=" + bucketLengthInMs +
                ", windowBucketWraps=\n" + StringUtils.join(windowBucketWraps, ",\n") +
                '}';
    }

    /**
     * Builder that selects the window implementation and collects its settings.
     * @date 2020-12-18 16:48
     */
    public static class SlidingWindowBuilder {
        private SlidingWindow slidingWindow;

        public SlidingWindowBuilder() {
            this(SlidingWindowType.EFFICIENCY);
        }

        public SlidingWindowBuilder(SlidingWindowType windowType) {
            if (SlidingWindowType.ACCURATE.equals(windowType)) {
                slidingWindow = new AccurateSlidingWindow();
            } else {
                slidingWindow = new EfficiencySlidingWindow();
            }
        }

        public SlidingWindowBuilder ofEventTypeClass(Class<? extends Enum> eventTypeClass) {
            slidingWindow.setEventTypeClass(eventTypeClass);
            return this;
        }

        public SlidingWindowBuilder ofBucketCount(int bucketCount) {
            slidingWindow.setBucketCount(bucketCount);
            return this;
        }

        public SlidingWindowBuilder ofWindowIntervalInMs(int windowIntervalInMs) {
            slidingWindow.setIntervalInMs(windowIntervalInMs);
            return this;
        }

        public SlidingWindow build() {
            return slidingWindow.init();
        }
    }
}

/**
 * Window with strong consistency: updates and sums run under the same lock.
 * @date 2020-12-24 17:26
 */
public class AccurateSlidingWindow extends SlidingWindow {
    @Override
    public WindowBucket buildWindowBucket() {
        int eventTypeSize = eventTypeWrap.getEventTypes().length;
        return new AccurateWindowBucket(updateLock, eventTypeSize);
    }

    @Override
    public long addAndSum(Enum eventType) {
        updateLock.lock();
        try {
            WindowBucket windowBucket = getCurrentBucket().getWindowBucket();
            windowBucket.add(eventType);
            return sum(eventType);
        } finally {
            updateLock.unlock();
        }
    }

    @Override
    public long sum(Enum eventType) {
        updateLock.lock();
        try {
            long value = 0;
            long currentTime = TimeServer.getIntervalMilliTime();
            for (WindowBucketWrap windowBucketWrap : windowBucketWraps) {
                // Skip buckets whose data belongs to an earlier window.
                if (!isWindowDeprecated(currentTime, windowBucketWrap)) {
                    value += windowBucketWrap.getWindowBucket().getValue(eventType);
                }
            }
            return value;
        } finally {
            updateLock.unlock();
        }
    }
}

/**
 * Window with eventual consistency: sums are computed without locking.
 * @date 2020-12-24 17:26
 */
public class EfficiencySlidingWindow extends SlidingWindow {
    @Override
    public WindowBucket buildWindowBucket() {
        int eventTypeSize = eventTypeWrap.getEventTypes().length;
        return new EfficiencyWindowBucket(eventTypeSize);
    }

    @Override
    public long addAndSum(Enum eventType) {
        add(eventType);
        return sum(eventType);
    }

    @Override
    public long sum(Enum eventType) {
        long value = 0;
        long currentTime = TimeServer.getIntervalMilliTime();
        for (WindowBucketWrap windowBucketWrap : windowBucketWraps) {
            // Skip buckets whose data belongs to an earlier window.
            if (!isWindowDeprecated(currentTime, windowBucketWrap)) {
                value += windowBucketWrap.getWindowBucket().getValue(eventType);
            }
        }
        return value;
    }
}

Time server:

import java.util.concurrent.TimeUnit;

/**
 * Caches System.nanoTime() in a volatile field that a daemon thread refreshes.
 * @date 2020-12-21 11:07
 */
public class TimeServer {
    /**
     * Number of nanoseconds in one millisecond.
     */
    public static final long NANOS_PER_MILLI = 1000_000L;

    private static final long beginNanoTime;
    private static volatile long beginMilliTime;
    private static volatile long currentNanoTime;

    static {
        currentNanoTime = System.nanoTime();
        beginNanoTime = currentNanoTime;
        beginMilliTime = System.currentTimeMillis();
        Thread daemon = new Thread(new Runnable() {
            @Override
            public void run() {
                while (true) {
                    currentNanoTime = System.nanoTime();
                    try {
                        // Very high precision is not needed; a small error is acceptable.
                        TimeUnit.NANOSECONDS.sleep(NANOS_PER_MILLI / 2);
                    } catch (Throwable ignore) {
                    }
                }
            }
        });
        daemon.setDaemon(true);
        daemon.setName("slidingwindow-time-tick-thread");
        daemon.start();
    }

    /** Returns the cached nanoTime value. */
    public static long getCurrentNanoTime() {
        return currentNanoTime;
    }

    /** Returns the wall-clock time at startup plus the elapsed milliseconds. */
    public static long getCurrentMilliTime() {
        return beginMilliTime + getIntervalMilliTime();
    }

    /** Returns the nanoseconds elapsed since the class was initialized. */
    public static long getIntervalNanoTime() {
        return getCurrentNanoTime() - beginNanoTime;
    }

    /** Returns the milliseconds elapsed since the class was initialized. */
    public static long getIntervalMilliTime() {
        return getIntervalNanoTime() / NANOS_PER_MILLI;
    }
}

Usage

The existing log count limiter class, TimesLimiter, now uses the sliding window:

/**
 * @date 2020-06-02 10:51
 */
public class TimesLimiter implements Limiter {
    private static final int SLIDINGWINDOW_BUCKETCOUNT = 10;

    private int timesLimit;
    private SlidingWindow slidingWindow;

    public TimesLimiter(int timesLimitSeconds, int timesLimit) {
        this.timesLimit = timesLimit;
        slidingWindow = new SlidingWindow.SlidingWindowBuilder(SlidingWindowType.ACCURATE)
                .ofEventTypeClass(StandardLevel.class)
                .ofBucketCount(SLIDINGWINDOW_BUCKETCOUNT)
                .ofWindowIntervalInMs(timesLimitSeconds * 1000)
                .build();
    }

    @Override
    public boolean isLimited(String content) {
        // Count this attempt first; if the window total exceeds the limit, undo the count.
        if (slidingWindow.addAndSum(StandardLevel.ERROR) > timesLimit) {
            slidingWindow.add(StandardLevel.ERROR, -1);
            return true;
        }
        return false;
    }
}