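Background
In the earlier implementation described in "Log-Based Alerting", alerts could be capped at X per hour. That implementation was fairly crude, however:
- The period was hard-coded to one hour.
- The period was a natural (wall-clock) hour, from minute 0 to minute 60. It is therefore easy to send X alerts at minute 59 and another X alerts at minute 1 of the next hour, that is, X * 2 alerts within the few minutes between :59 and :01.
A better design lets the limit be configured as "at most Y alerts within a period of X", and counts over that period with a sliding window.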
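The boundary effect described above can be reproduced with a toy fixed-window counter. This is only an illustrative sketch: `FixedWindowCounter`, its methods, and the explicit timestamp parameter are assumptions made for the demo and are not part of the original alerting code.

```java
import java.util.concurrent.TimeUnit;

/** Hypothetical fixed-window limiter: the counter resets at every natural-hour boundary. */
class FixedWindowCounter {
    private static final long HOUR_MS = TimeUnit.HOURS.toMillis(1);
    private final int limit;
    private long currentHour = -1;
    private int count;

    FixedWindowCounter(int limit) {
        this.limit = limit;
    }

    /** Returns true if an alert at the given time (in ms) may still be sent. */
    boolean tryAcquire(long nowMs) {
        long hour = nowMs / HOUR_MS;
        if (hour != currentHour) {
            // Crossed the top of the hour: the count starts over.
            currentHour = hour;
            count = 0;
        }
        if (count < limit) {
            count++;
            return true;
        }
        return false;
    }

    /** Tries to send 2 * limit alerts at minute 59 and again at minute 61; returns how many got through. */
    static int burstAcrossBoundary(int limit) {
        FixedWindowCounter counter = new FixedWindowCounter(limit);
        long minute59 = TimeUnit.MINUTES.toMillis(59);
        long minute61 = TimeUnit.MINUTES.toMillis(61);
        int sent = 0;
        for (int i = 0; i < limit * 2; i++) {
            if (counter.tryAcquire(minute59)) {
                sent++;
            }
        }
        for (int i = 0; i < limit * 2; i++) {
            if (counter.tryAcquire(minute61)) {
                sent++;
            }
        }
        return sent;
    }
}
```

With a limit of 5, `burstAcrossBoundary(5)` lets 10 alerts through within two minutes, which is the X * 2 burst that a sliding window is meant to prevent.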
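Key points:
Data structure
A fixed-length array forms one window, each element of the array is a bucket, and an index pointer marks the current position in the window.
For example, the following is a 60-second window divided into 10 buckets, so each bucket spans 6 seconds.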
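    public class SlidingWindow {
        // Number of buckets in the window, e.g. 10.
        protected int bucketCount;
        // Total length of the window in milliseconds, e.g. 60000.
        protected int intervalInMs;
        // The buckets themselves, used as a ring.
        protected WindowBucket[] windowBuckets;
    }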
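Sliding approach
- Active: start a scheduled task that advances the index pointer at a fixed rate.
- Passive: when data arrives, compute the current index from the current time.
Drawbacks of the active approach:
- It requires an extra scheduled task.
- The scheduled task itself can affect how accurately the index advances, especially when the window is very short, for example a 1-second window split into 100 buckets.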
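The passive approach boils down to two small calculations on the current time. The sketch below shows them in isolation; the class name `BucketMath` and the static-method form are assumptions for illustration, and both time and bucket index are counted from 0 here.

```java
/** Hypothetical helper showing how the passive approach derives the bucket from a timestamp. */
class BucketMath {
    /** Zero-based index of the bucket that the timestamp falls into. */
    static int bucketIndex(long timeMs, int intervalMs, int bucketCount) {
        int bucketLengthMs = intervalMs / bucketCount;
        // Number of whole buckets elapsed so far, wrapped around the ring.
        return (int) ((timeMs / bucketLengthMs) % bucketCount);
    }

    /** Start time (in ms) of the bucket that the timestamp falls into. */
    static long bucketStart(long timeMs, int intervalMs, int bucketCount) {
        int bucketLengthMs = intervalMs / bucketCount;
        return timeMs - timeMs % bucketLengthMs;
    }
}
```

For a 60-second window with 10 buckets, a timestamp of 70000 ms maps to index 1 with a bucket start of 66000 ms. No timer is involved, so the precision depends only on the clock that supplies the timestamp.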
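New vs. stale data
With the passive approach, how do we filter out expired data (data left over from the previous window) when aggregating?
Take the same 60-second window with 10 buckets, counting seconds and buckets from 1:
- At second 5, data is inserted; it goes into bucket 1.
- During seconds 61-66 no data arrives. This is a new round of the window, but bucket 1 has not been modified.
- At second 70, data is inserted; it goes into bucket 2. If the buckets are merged at this point, the data in bucket 1 is included, even though it belongs to the previous window and should not be counted.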
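The solution is to give every bucket a start time.
For example, the first bucket initially starts at 1s and the second bucket at 7s. After one full round (60s), the first bucket's start time should become 61s and the second bucket's 67s.
When inserting data, the start time is also used to decide whether the current bucket still holds data from the previous window:
- If it does not: accumulate onto the bucket's existing data.
- If it does: reset the bucket (set the counts to 0 and set the bucket's start time again), using a lock to avoid concurrency problems.
When merging, check for each bucket whether its start time lies within one window length of the current time.
In the example above, data is inserted into the second bucket at second 70, and that bucket's start time is changed to 67s. When the current window is then aggregated, bucket 1 still has a start time of 1s, which is outside the current window (67 - 1 > 60), so it is left out of the merged value.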
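    public class WindowBucketWrap<T extends WindowBucket> {
        // Start time of the period that this bucket currently represents.
        private volatile long beginTime;
        // The counters stored for that period.
        private T windowBucket;
    }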
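The reset-on-insert and filter-on-merge rules can be sketched together in a few lines. This is a simplified, single-counter illustration under stated assumptions: the class `StampedWindow` is hypothetical, timestamps are passed in explicitly (zero-based milliseconds) so the behavior is deterministic, and `synchronized` stands in for the lock used in the real implementation.

```java
/** Hypothetical single-counter sliding window where every bucket remembers its start time. */
class StampedWindow {
    private final long intervalMs;
    private final long bucketLengthMs;
    private final long[] beginTimes; // start time of the period each bucket currently holds
    private final long[] counts;

    StampedWindow(int intervalMs, int bucketCount) {
        this.intervalMs = intervalMs;
        this.bucketLengthMs = intervalMs / bucketCount;
        this.beginTimes = new long[bucketCount];
        this.counts = new long[bucketCount];
    }

    /** Records one event at the given time, resetting the bucket first if it is from an earlier round. */
    synchronized void add(long nowMs) {
        int index = (int) ((nowMs / bucketLengthMs) % counts.length);
        long bucketStart = nowMs - nowMs % bucketLengthMs;
        if (bucketStart > beginTimes[index]) {
            // The bucket still holds data from a previous round: start it over.
            counts[index] = 0;
            beginTimes[index] = bucketStart;
        }
        counts[index]++;
    }

    /** Sums only the buckets whose start time is within one window length of nowMs. */
    synchronized long sum(long nowMs) {
        long total = 0;
        for (int i = 0; i < counts.length; i++) {
            if (nowMs - beginTimes[i] <= intervalMs) {
                total += counts[i];
            }
        }
        return total;
    }
}
```

With a 60-second window and 10 buckets, an event at 5000 ms followed by one at 70000 ms yields `sum(70000) == 1`: the first event's bucket has a start time more than 60 seconds old and is skipped, even though it was never reset.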
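To guard against clock rollback (the wall clock being set backwards), time is read via nanoTime.
Because Java's System.nanoTime() has performance problems, a custom time service is used here.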
    import java.util.concurrent.TimeUnit;

    /**
     * @date 2020-12-21 11:07
     */
    public class TimeServer {
        /** Number of nanoseconds in one millisecond. */
        public static final long NANOS_PER_MILLI = 1000_000L;
        private static final long beginNanoTime;
        private static volatile long beginMilliTime;
        private static volatile long currentNanoTime;

        static {
            currentNanoTime = System.nanoTime();
            beginNanoTime = currentNanoTime;
            beginMilliTime = System.currentTimeMillis();
            // Daemon thread that refreshes the cached nanoTime roughly every half millisecond.
            Thread daemon = new Thread(new Runnable() {
                @Override
                public void run() {
                    while (true) {
                        currentNanoTime = System.nanoTime();
                        try {
                            TimeUnit.NANOSECONDS.sleep(NANOS_PER_MILLI / 2);
                        } catch (Throwable ignore) {
                        }
                    }
                }
            });
            daemon.setDaemon(true);
            daemon.setName("slidingwindow-time-tick-thread");
            daemon.start();
        }
        // Accessor methods are omitted in this excerpt.
    }
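The reason for building on nanoTime can be seen in a much smaller form. The following is a minimal sketch, not the TimeServer above: the class `ElapsedClock` is made up for illustration, and it calls System.nanoTime() directly on every read instead of caching the value in a background thread.

```java
/** Hypothetical helper: elapsed time since construction, measured with System.nanoTime(). */
class ElapsedClock {
    private static final long NANOS_PER_MILLI = 1_000_000L;
    private final long beginNanoTime = System.nanoTime();

    /** Milliseconds elapsed since this clock was created; not tied to the wall clock. */
    long elapsedMillis() {
        return (System.nanoTime() - beginNanoTime) / NANOS_PER_MILLI;
    }
}
```

Only differences between two nanoTime readings are meaningful, which is why the value is always taken relative to a starting point. Adjusting the system date does not move this elapsed value the way it would move System.currentTimeMillis().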
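Data fields
Different business scenarios store different data in the window. For the log alerting that motivated this project, a single value (the number of alerts already sent) is enough, while rate limiting may need two (passed and blocked). Since the component is meant to be generic, it has to accommodate all of these scenarios and let each business define its own statistics.
To do this, the bucket's data field is an array whose length and slots are specified by an enum of event types.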
    public class EventTypeWrap<E extends Enum> {
        private E[] eventTypes;

        public EventTypeWrap(Class<E> clazz) {
            // One entry per constant of the enum.
            eventTypes = clazz.getEnumConstants();
            Assert.notEmpty(eventTypes, "The event type list is empty!");
        }

        public E[] getEventTypes() {
            return eventTypes;
        }
    }

    public class SlidingWindow {
        public WindowBucket buildWindowBucket() {
            // The bucket gets one counter slot per event type.
            int eventTypeSize = eventTypeWrap.getEventTypes().length;
            return new WindowBucket(eventTypeSize);
        }
    }

    public class WindowBucket {
        private long[] eventValues;

        public WindowBucket(int eventSize) {
            eventValues = new long[eventSize];
        }
    }
When recording data, pass in the event type.
public void add(Enum eventType) { ……}
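How an enum drives the counter array can be shown with a small standalone example. The names `LimitEvent` and `EventCounters` are hypothetical and exist only for this sketch; the idea it illustrates is that the number of enum constants sets the array length and `ordinal()` selects the slot.

```java
/** Hypothetical event types for a rate limiter: one counter slot per constant. */
enum LimitEvent {
    PASSED,
    BLOCKED
}

/** Hypothetical counter array whose length and slots are defined by an enum type. */
class EventCounters<E extends Enum<E>> {
    private final long[] values;

    EventCounters(Class<E> eventTypeClass) {
        // The number of enum constants determines the array length.
        values = new long[eventTypeClass.getEnumConstants().length];
    }

    /** Adds delta to the slot that belongs to the given event type. */
    void add(E eventType, long delta) {
        values[eventType.ordinal()] += delta;
    }

    /** Reads the slot that belongs to the given event type. */
    long get(E eventType) {
        return values[eventType.ordinal()];
    }
}
```

A caller would create `new EventCounters<>(LimitEvent.class)` and then call `add(LimitEvent.PASSED, 1)`. Supporting a new scenario only requires defining another enum, with no change to the counter class.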
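Performance and consistency
Because the data is spread across several buckets, merging means adding up the data of all buckets, and the data inside the buckets can change while the merge is in progress. There are two implementations:
- Strong consistency: take a lock.
- Eventual consistency: no lock.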
    public class AccurateWindowBucket extends WindowBucket {
        private long[] eventValues;
        private ReentrantLock updateLock;

        /**
         * @param eventSize
         * @date 2020-12-21 15:35
         */
        public AccurateWindowBucket(ReentrantLock updateLock, int eventSize) {
            super(eventSize);
            this.updateLock = updateLock;
            eventValues = new long[eventSize];
        }

        @Override
        public void add(Enum eventType, int value) {
            // Every update happens under the shared lock.
            updateLock.lock();
            try {
                int index = eventType.ordinal();
                eventValues[index] = eventValues[index] + value;
            } finally {
                updateLock.unlock();
            }
        }
    }

    public class EfficiencyWindowBucket extends WindowBucket {
        private LongAdder[] eventValues;

        /**
         * @param eventSize
         * @date 2020-12-21 15:35
         */
        public EfficiencyWindowBucket(int eventSize) {
            super(eventSize);
            eventValues = new LongAdder[eventSize];
            IntStream.range(0, eventSize).forEach(i -> {
                eventValues[i] = new LongAdder();
            });
        }

        @Override
        public void add(Enum eventType, int value) {
            // Lock-free update; concurrent reads may briefly lag behind.
            eventValues[eventType.ordinal()].add(value);
        }
    }
Of these two options, use eventual consistency when performance is paramount, and strong consistency when accuracy matters.
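The two styles can be compared side by side in a small concurrent test. The sketch below is an illustration only: `CounterComparison` and its method names are assumptions, and it reduces each style to a single counter instead of a full bucket.

```java
import java.util.concurrent.atomic.LongAdder;
import java.util.concurrent.locks.ReentrantLock;

/** Hypothetical side-by-side of a lock-guarded counter and a LongAdder counter. */
class CounterComparison {
    private final ReentrantLock lock = new ReentrantLock();
    private long lockedValue;
    private final LongAdder adder = new LongAdder();

    /** Strong consistency: every update and read goes through the lock. */
    void addLocked(long delta) {
        lock.lock();
        try {
            lockedValue += delta;
        } finally {
            lock.unlock();
        }
    }

    long readLocked() {
        lock.lock();
        try {
            return lockedValue;
        } finally {
            lock.unlock();
        }
    }

    /** Eventual consistency: lock-free updates; a read during updates may miss in-flight adds. */
    void addLockFree(long delta) {
        adder.add(delta);
    }

    long readLockFree() {
        return adder.sum();
    }

    /** Runs both counters under contention and returns {locked, lockFree} after all threads finish. */
    static long[] run(int threads, int perThread) {
        CounterComparison counters = new CounterComparison();
        Thread[] workers = new Thread[threads];
        for (int i = 0; i < threads; i++) {
            workers[i] = new Thread(() -> {
                for (int j = 0; j < perThread; j++) {
                    counters.addLocked(1);
                    counters.addLockFree(1);
                }
            });
            workers[i].start();
        }
        for (Thread worker : workers) {
            try {
                worker.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException("interrupted while waiting for workers", e);
            }
        }
        return new long[] {counters.readLocked(), counters.readLockFree()};
    }
}
```

Once all writers have finished, both counters hold the same total. The difference only shows while updates are still in flight: the locked read sees an exact snapshot, whereas `LongAdder.sum()` may not include updates that happen during the read.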
Source code
Window buckets:
    // JDK imports used by the classes in this listing.
    import java.util.Arrays;
    import java.util.concurrent.atomic.LongAdder;
    import java.util.concurrent.locks.ReentrantLock;
    import java.util.stream.IntStream;

    /** Base class of a bucket: one counter per event type. */
    public abstract class WindowBucket {
        protected int eventSize;

        public WindowBucket(int eventSize) {
            this.eventSize = eventSize;
        }

        /** Adds 1 to the counter of the given event type. */
        public void add(Enum eventType) {
            add(eventType, 1);
        }

        /** Adds value to the counter of the given event type. */
        public abstract void add(Enum eventType, int value);

        /** Adds 1 to the counter and returns its new value. */
        public abstract long addAndGet(Enum eventType);

        /** Returns the counter of the given event type. */
        public abstract long getValue(Enum eventType);

        /** Sets all counters back to 0. */
        public abstract void reset();

        /** Returns a snapshot of all counters. */
        public abstract long[] getValues();

        @Override
        public String toString() {
            return this.getClass().getSimpleName() + "{" +
                    "eventValues=" + Arrays.toString(getValues()) +
                    '}';
        }
    }

    /**
     * Performance first, eventual consistency.
     * @author minchin
     * @date 2020-12-24 11:02
     */
    public class EfficiencyWindowBucket extends WindowBucket {
        private LongAdder[] eventValues;

        public EfficiencyWindowBucket(int eventSize) {
            super(eventSize);
            eventValues = new LongAdder[eventSize];
            IntStream.range(0, eventSize).forEach(i -> {
                eventValues[i] = new LongAdder();
            });
        }

        @Override
        public void add(Enum eventType, int value) {
            eventValues[eventType.ordinal()].add(value);
        }

        @Override
        public long addAndGet(Enum eventType) {
            // Not atomic as a whole: other threads may add between the two calls.
            add(eventType);
            return getValue(eventType);
        }

        @Override
        public long getValue(Enum eventType) {
            return eventValues[eventType.ordinal()].longValue();
        }

        @Override
        public void reset() {
            for (LongAdder value : eventValues) {
                value.reset();
            }
        }

        @Override
        public long[] getValues() {
            long[] values = new long[eventSize];
            IntStream.range(0, eventSize).forEach(i -> {
                values[i] = eventValues[i].longValue();
            });
            return values;
        }
    }

    /**
     * Strong consistency.
     * @author minchin
     * @date 2020-12-24 10:35
     */
    public class AccurateWindowBucket extends WindowBucket {
        private long[] eventValues;
        private ReentrantLock updateLock;

        public AccurateWindowBucket(ReentrantLock updateLock, int eventSize) {
            super(eventSize);
            this.updateLock = updateLock;
            eventValues = new long[eventSize];
        }

        @Override
        public void add(Enum eventType, int value) {
            updateLock.lock();
            try {
                int index = eventType.ordinal();
                eventValues[index] = eventValues[index] + value;
            } finally {
                updateLock.unlock();
            }
        }

        @Override
        public long addAndGet(Enum eventType) {
            // The lock is reentrant, so the nested add/getValue calls can acquire it again.
            updateLock.lock();
            try {
                add(eventType);
                return getValue(eventType);
            } finally {
                updateLock.unlock();
            }
        }

        @Override
        public long getValue(Enum eventType) {
            updateLock.lock();
            try {
                return eventValues[eventType.ordinal()];
            } finally {
                updateLock.unlock();
            }
        }

        @Override
        public void reset() {
            updateLock.lock();
            try {
                IntStream.range(0, eventSize).forEach(i -> eventValues[i] = 0L);
            } finally {
                updateLock.unlock();
            }
        }

        @Override
        public long[] getValues() {
            updateLock.lock();
            try {
                return Arrays.copyOf(eventValues, eventValues.length);
            } finally {
                updateLock.unlock();
            }
        }
    }

    /**
     * A bucket together with the start time of the period it currently holds.
     * @author minchin
     * @date 2020-12-18 11:17
     */
    public class WindowBucketWrap<T extends WindowBucket> {
        private volatile long beginTime;
        private T windowBucket;

        public WindowBucketWrap(long beginTime, T windowBucket) {
            this.beginTime = beginTime;
            this.windowBucket = windowBucket;
        }

        /** Clears the bucket and moves it to a new period. */
        public void reset(long windowStartTime) {
            windowBucket.reset();
            beginTime = windowStartTime;
        }

        public long getBeginTime() {
            return beginTime;
        }

        public WindowBucketWrap<T> setBeginTime(long beginTime) {
            this.beginTime = beginTime;
            return this;
        }

        public T getWindowBucket() {
            return windowBucket;
        }

        @Override
        public String toString() {
            return "WindowBucketWrap{" +
                    "beginTime=" + beginTime +
                    ", windowBucket=" + windowBucket +
                    '}';
        }
    }
Window:
    // JDK imports used by the classes in this listing.
    // Assert and StringUtils are third-party utility classes; the article does not show their imports.
    import java.time.DateTimeException;
    import java.util.concurrent.locks.ReentrantLock;
    import java.util.stream.IntStream;

    /**
     * @date 2020-12-17 17:14
     */
    public class EventTypeWrap<E extends Enum> {
        private E[] eventTypes;

        public EventTypeWrap(Class<E> clazz) {
            eventTypes = clazz.getEnumConstants();
            Assert.notEmpty(eventTypes, "The event type list is empty!");
        }

        public E[] getEventTypes() {
            return eventTypes;
        }

        public EventTypeWrap<E> setEventTypes(E[] eventTypes) {
            this.eventTypes = eventTypes;
            return this;
        }
    }

    /**
     * @date 2020-12-24 11:16
     */
    public enum SlidingWindowType {
        /** Accurate. */
        ACCURATE,
        /** Performance first, eventual consistency. */
        EFFICIENCY
    }

    /**
     * @date 2020-12-18 15:36
     */
    public abstract class SlidingWindow {
        protected ReentrantLock updateLock;
        protected Class<? extends Enum> eventTypeClass;
        protected SlidingWindowType windowType = SlidingWindowType.EFFICIENCY;
        protected EventTypeWrap<? extends Enum> eventTypeWrap;
        protected int bucketCount;
        protected int intervalInMs;
        protected int bucketLengthInMs;
        protected WindowBucketWrap[] windowBucketWraps;

        protected SlidingWindow() {
        }

        /** Validates the configuration and allocates the buckets. */
        protected SlidingWindow init() {
            if (eventTypeClass == null || bucketCount == 0 || intervalInMs == 0) {
                throw new IllegalArgumentException("init SlidingWindow error!");
            }
            updateLock = new ReentrantLock();
            eventTypeWrap = new EventTypeWrap(eventTypeClass);
            bucketLengthInMs = intervalInMs / bucketCount;
            // Align the initial start time to a bucket boundary. An unaligned value can be later
            // than the first computed bucket start, which getCurrentBucket() would report as a
            // clock rollback.
            long beginTime = calculateWindowStartTime(TimeServer.getIntervalMilliTime());
            windowBucketWraps = new WindowBucketWrap[bucketCount];
            IntStream.range(0, bucketCount).forEach(i -> {
                windowBucketWraps[i] = new WindowBucketWrap(beginTime, buildWindowBucket());
            });
            return this;
        }

        /** Creates one bucket of the concrete type. */
        protected abstract WindowBucket buildWindowBucket();

        /** Adds 1 for the event type in the current bucket. */
        public void add(Enum eventType) {
            WindowBucket windowBucket = getCurrentBucket().getWindowBucket();
            windowBucket.add(eventType);
        }

        /** Adds value for the event type in the current bucket. */
        public void add(Enum eventType, int value) {
            WindowBucket windowBucket = getCurrentBucket().getWindowBucket();
            windowBucket.add(eventType, value);
        }

        /** Adds 1 for the event type and returns the sum over the current window. */
        public abstract long addAndSum(Enum eventType);

        /**
         * Sums the event type over the current window.
         * Similar to LongAdder: better performance at the cost of some precision.
         */
        public abstract long sum(Enum eventType);

        /** Returns the bucket for the current time, resetting it first if it is from an earlier round. */
        public WindowBucketWrap getCurrentBucket() {
            long currentTime = TimeServer.getIntervalMilliTime();
            int bucketId = calculateCurrentBucketId(currentTime);
            long windowStartTime = calculateWindowStartTime(currentTime);
            while (true) {
                WindowBucketWrap windowBucketWrap = windowBucketWraps[bucketId];
                long windowBucketBeginTime = windowBucketWrap.getBeginTime();
                if (windowBucketBeginTime == windowStartTime) {
                    return windowBucketWrap;
                }
                // A new round has started.
                if (windowStartTime > windowBucketBeginTime) {
                    resetWindow(windowBucketWrap, windowStartTime);
                } else {
                    // nanoTime is used, so "clock rollback" should not happen; throw directly.
                    throw new DateTimeException(
                            String.format("windowBucketBeginTime: %d, windowStartTime: %d, clock anomaly detected; check whether something like 'clock rollback' has occurred!",
                                    windowBucketBeginTime, windowStartTime));
                }
            }
        }

        /** Index of the bucket that currentTime falls into. */
        protected int calculateCurrentBucketId(long currentTime) {
            long timeId = currentTime / bucketLengthInMs;
            return (int) (timeId % windowBucketWraps.length);
        }

        /** Start time of the bucket that currentTime falls into. */
        protected long calculateWindowStartTime(long currentTime) {
            return currentTime - currentTime % bucketLengthInMs;
        }

        /** Resets a stale bucket under the lock, or yields if another thread is already doing so. */
        protected void resetWindow(WindowBucketWrap windowBucketWrap, long windowStartTime) {
            if (updateLock.tryLock()) {
                try {
                    // Re-check under the lock: another thread may have reset the bucket already.
                    if (windowStartTime > windowBucketWrap.getBeginTime()) {
                        windowBucketWrap.reset(windowStartTime);
                    }
                } finally {
                    updateLock.unlock();
                }
            } else {
                // Another thread has probably not finished yet; give up the CPU instead of spinning.
                Thread.yield();
            }
        }

        /** A bucket is stale when it started more than one window length ago. */
        protected boolean isWindowDeprecated(long currentTime, WindowBucketWrap<WindowBucket> windowBucketWrap) {
            return currentTime - windowBucketWrap.getBeginTime() > intervalInMs;
        }

        public Class<? extends Enum> getEventTypeClass() {
            return eventTypeClass;
        }

        protected SlidingWindow setEventTypeClass(Class<? extends Enum> eventTypeClass) {
            this.eventTypeClass = eventTypeClass;
            return this;
        }

        public EventTypeWrap<? extends Enum> getEventTypeWrap() {
            return eventTypeWrap;
        }

        public int getBucketCount() {
            return bucketCount;
        }

        protected SlidingWindow setBucketCount(int bucketCount) {
            this.bucketCount = bucketCount;
            return this;
        }

        public int getIntervalInMs() {
            return intervalInMs;
        }

        protected SlidingWindow setIntervalInMs(int intervalInMs) {
            this.intervalInMs = intervalInMs;
            return this;
        }

        public SlidingWindowType getWindowType() {
            return windowType;
        }

        protected SlidingWindow setWindowType(SlidingWindowType windowType) {
            this.windowType = windowType;
            return this;
        }

        @Override
        public String toString() {
            return "SlidingWindow{" +
                    "eventTypeClass=" + eventTypeClass.getName() +
                    ", windowType=" + windowType +
                    ", bucketCount=" + bucketCount +
                    ", intervalInMs=" + intervalInMs +
                    ", bucketLengthInMs=" + bucketLengthInMs +
                    ", windowBucketWraps=\n" + StringUtils.join(windowBucketWraps, ",\n") +
                    '}';
        }

        /** Builder that selects the window implementation and configures it. */
        public static class SlidingWindowBuilder {
            private SlidingWindow slidingWindow;

            public SlidingWindowBuilder() {
                this(SlidingWindowType.EFFICIENCY);
            }

            public SlidingWindowBuilder(SlidingWindowType windowType) {
                if (SlidingWindowType.ACCURATE.equals(windowType)) {
                    slidingWindow = new AccurateSlidingWindow();
                } else {
                    slidingWindow = new EfficiencySlidingWindow();
                }
            }

            public SlidingWindowBuilder ofEventTypeClass(Class<? extends Enum> eventTypeClass) {
                slidingWindow.setEventTypeClass(eventTypeClass);
                return this;
            }

            public SlidingWindowBuilder ofBucketCount(int bucketCount) {
                slidingWindow.setBucketCount(bucketCount);
                return this;
            }

            public SlidingWindowBuilder ofWindowIntervalInMs(int windowIntervalInMs) {
                slidingWindow.setIntervalInMs(windowIntervalInMs);
                return this;
            }

            public SlidingWindow build() {
                return slidingWindow.init();
            }
        }
    }

    /**
     * @date 2020-12-24 17:26
     */
    public class AccurateSlidingWindow extends SlidingWindow {
        @Override
        public WindowBucket buildWindowBucket() {
            int eventTypeSize = eventTypeWrap.getEventTypes().length;
            return new AccurateWindowBucket(updateLock, eventTypeSize);
        }

        @Override
        public long addAndSum(Enum eventType) {
            // Add and sum under one lock so no other update can slip in between.
            updateLock.lock();
            try {
                WindowBucket windowBucket = getCurrentBucket().getWindowBucket();
                windowBucket.add(eventType);
                return sum(eventType);
            } finally {
                updateLock.unlock();
            }
        }

        @Override
        public long sum(Enum eventType) {
            updateLock.lock();
            try {
                long value = 0;
                long currentTime = TimeServer.getIntervalMilliTime();
                for (WindowBucketWrap windowBucketWrap : windowBucketWraps) {
                    // Skip buckets that still hold data from an earlier window.
                    if (!isWindowDeprecated(currentTime, windowBucketWrap)) {
                        value += windowBucketWrap.getWindowBucket().getValue(eventType);
                    }
                }
                return value;
            } finally {
                updateLock.unlock();
            }
        }
    }

    /**
     * @date 2020-12-24 17:26
     */
    public class EfficiencySlidingWindow extends SlidingWindow {
        @Override
        public WindowBucket buildWindowBucket() {
            int eventTypeSize = eventTypeWrap.getEventTypes().length;
            return new EfficiencyWindowBucket(eventTypeSize);
        }

        @Override
        public long addAndSum(Enum eventType) {
            add(eventType);
            return sum(eventType);
        }

        @Override
        public long sum(Enum eventType) {
            long value = 0;
            long currentTime = TimeServer.getIntervalMilliTime();
            for (WindowBucketWrap windowBucketWrap : windowBucketWraps) {
                // Skip buckets that still hold data from an earlier window.
                if (!isWindowDeprecated(currentTime, windowBucketWrap)) {
                    value += windowBucketWrap.getWindowBucket().getValue(eventType);
                }
            }
            return value;
        }
    }
Time server:
    import java.util.concurrent.TimeUnit;

    /**
     * @date 2020-12-21 11:07
     */
    public class TimeServer {
        /** Number of nanoseconds in one millisecond. */
        public static final long NANOS_PER_MILLI = 1000_000L;
        private static final long beginNanoTime;
        private static volatile long beginMilliTime;
        private static volatile long currentNanoTime;

        static {
            currentNanoTime = System.nanoTime();
            beginNanoTime = currentNanoTime;
            beginMilliTime = System.currentTimeMillis();
            Thread daemon = new Thread(new Runnable() {
                @Override
                public void run() {
                    while (true) {
                        currentNanoTime = System.nanoTime();
                        try {
                            // Very high precision is not needed; a small error is acceptable.
                            TimeUnit.NANOSECONDS.sleep(NANOS_PER_MILLI / 2);
                        } catch (Throwable ignore) {
                        }
                    }
                }
            });
            daemon.setDaemon(true);
            daemon.setName("slidingwindow-time-tick-thread");
            daemon.start();
        }

        /** Cached System.nanoTime() value, refreshed by the tick thread. */
        public static long getCurrentNanoTime() {
            return currentNanoTime;
        }

        /** Wall-clock milliseconds derived from the start time plus the elapsed time. */
        public static long getCurrentMilliTime() {
            return beginMilliTime + getIntervalMilliTime();
        }

        /** Nanoseconds elapsed since this class was initialized. */
        public static long getIntervalNanoTime() {
            return getCurrentNanoTime() - beginNanoTime;
        }

        /** Milliseconds elapsed since this class was initialized. */
        public static long getIntervalMilliTime() {
            return getIntervalNanoTime() / NANOS_PER_MILLI;
        }
    }
Usage
The existing log rate limiter class TimesLimiter now uses the sliding window:
    /**
     * @date 2020-06-02 10:51
     */
    public class TimesLimiter implements Limiter {
        private static final int SLIDINGWINDOW_BUCKETCOUNT = 10;
        private int timesLimit;
        private SlidingWindow slidingWindow;

        public TimesLimiter(int timesLimitSeconds, int timesLimit) {
            this.timesLimit = timesLimit;
            slidingWindow = new SlidingWindow.SlidingWindowBuilder(SlidingWindowType.ACCURATE)
                    .ofEventTypeClass(StandardLevel.class)
                    .ofBucketCount(SLIDINGWINDOW_BUCKETCOUNT)
                    .ofWindowIntervalInMs(timesLimitSeconds * 1000)
                    .build();
        }

        @Override
        public boolean isLimited(String content) {
            // Count this alert first; if the window total exceeds the limit, undo the count.
            if (slidingWindow.addAndSum(StandardLevel.ERROR) > timesLimit) {
                slidingWindow.add(StandardLevel.ERROR, -1);
                return true;
            }
            return false;
        }
    }
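The count-then-roll-back decision in isLimited can be tried out without the rest of the library. The sketch below is a simplified stand-in under stated assumptions: `SimpleTimesLimiter` is a hypothetical class, it keeps a single counter per bucket, it takes the timestamp as a parameter (zero-based milliseconds) instead of reading a clock, and `synchronized` replaces the ReentrantLock.

```java
/** Hypothetical self-contained limiter: at most timesLimit events per sliding window. */
class SimpleTimesLimiter {
    private final int timesLimit;
    private final long intervalMs;
    private final long bucketLengthMs;
    private final long[] beginTimes; // start time of the period each bucket currently holds
    private final long[] counts;

    SimpleTimesLimiter(int windowSeconds, int timesLimit, int bucketCount) {
        this.timesLimit = timesLimit;
        this.intervalMs = windowSeconds * 1000L;
        this.bucketLengthMs = intervalMs / bucketCount;
        this.beginTimes = new long[bucketCount];
        this.counts = new long[bucketCount];
    }

    /** Returns true when the event at nowMs exceeds the limit and must be suppressed. */
    synchronized boolean isLimited(long nowMs) {
        int index = (int) ((nowMs / bucketLengthMs) % counts.length);
        long bucketStart = nowMs - nowMs % bucketLengthMs;
        if (bucketStart > beginTimes[index]) {
            // Bucket is left over from an earlier round: reset it.
            counts[index] = 0;
            beginTimes[index] = bucketStart;
        }
        // Count the event first ...
        counts[index]++;
        long total = 0;
        for (int i = 0; i < counts.length; i++) {
            if (nowMs - beginTimes[i] <= intervalMs) {
                total += counts[i];
            }
        }
        if (total > timesLimit) {
            // ... and roll the count back when the window is already full.
            counts[index]--;
            return true;
        }
        return false;
    }
}
```

With `new SimpleTimesLimiter(60, 3, 10)`, the first three events in the window pass and the fourth is limited; an event at 66000 ms passes again because the bucket holding the earlier events has aged out of the window. Rolling back the rejected event keeps suppressed alerts from consuming the quota.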