背景
在之前《基于日志告警》的实现中,能够配置 每小时最多发送 X 条告警
,然而实现的比拟粗犷:
- 强制为
每小时
。 - 是以
天然小时
为时间段(从整点 0 分到 60 分),就容易呈现相似59 分发送 X 条,下一个小时的 1 分又发送 X 条
的状况,也就是在59- 1 分
这短短几分钟之内发送X * 2
条告警。
这里更好的计划应该是:能够自定义配置 在 X 时间段内最多发送 Y 条告警
, 时间段
以滑动窗口
来计数。
关键点:
数据结构
固定长度的数据为一个 窗口
,数组的每个对象就是一个 桶
,通过下标指针来标识以后窗口。
比方以下例子是时长为 60 秒的窗口,分为 10 个桶,即每个桶时长 6 秒。
public class SlidingWindow {
protected int bucketCount;
protected int intervalInMs;
protected WindowBucket[] windowBuckets;}
滑动形式
- 被动:启动定时工作,定时滑动下标指针。
- 被动:数据进来时,实时依据工夫计算以后下标。
以上 被动
形式的计划的毛病:
- 须要额定加个定时工作。
- 定时工作自身可能会影响到下标滑动的精准度,特地是如果窗口时长很短,比方:时长 1 秒,并且分成 100 个桶。
新 / 旧数据
以上 被动
的形式,在统计汇总时,怎么过滤掉 已过期的的数据
(数据是上一个窗口的数据)?
仍旧以时长为 60 秒的窗口,分为 10 个桶为例子:
- 第
5
秒的时候,插入数据,这时候插入的桶下标为1
。 - 第
61-66
秒都没有数据,也就是新的一轮窗口,下标为1
的桶并没有批改
。 - 第
70
秒的时候,插入数据,插入的桶下标为2
。这时候如果合并数组的数据,会把下标1
的数据都合并进来,但它的数据是上个窗口
的数据,不应该被算进来。
解决方案是给每个桶加一个 开始工夫
。
例如:刚开始第一个桶的起始工夫是 1s
,第二个桶的起始工夫是7s
。通过一轮(60s)后,第一个桶的起始工夫应该是61s
,第二个桶的起始工夫变为67s
。
在插入数据时,也通过 开始工夫
判断以后桶的数据是不是上个窗口的:
- 如果不是:以后桶数据上累加。
- 如果是:则须要将桶数据重置(计数改为 0,从新设置以后桶的起始工夫),通过锁来防止出现并发问题。
合并的时候,查看每个桶的 开始工夫
间隔以后工夫是否在 一个窗口内
。
比方以上例子,在第 70
秒的插入数据到第二个桶,且第二个桶的起始工夫被改为 67s
。这时候合并统计以后窗口数据时,发现下标1
的起始工夫是1s
,不在以后窗口工夫内(67-0 > 60
),就不算入合并值。
public class WindowBucketWrap<T extends WindowBucket> {
private volatile long beginTime;
private T windowBucket;
}
同时为了防止出现 时钟回拨
的问题,取工夫用的是 nanoTime
。
因为 JAVA 的 System.nanoTime()
存在性能上的问题,这里自定义了 工夫服务
。
/**
* @author
* @data 2020-12-21 11:07
* @description
*/
public class TimeServer {
/**
* 一毫秒有多少纳秒.
*/
public static final long NANOS_PER_MILLI = 1000_000L;
private static final long beginNanoTime;
private static volatile long beginMilliTime;
private static volatile long currentNanoTime;
static {currentNanoTime = System.nanoTime();
beginNanoTime = currentNanoTime;
beginMilliTime = System.currentTimeMillis();
Thread daemon = new Thread(new Runnable() {
@Override
public void run() {while (true) {currentNanoTime = System.nanoTime();
try {TimeUnit.NANOSECONDS.sleep(NANOS_PER_MILLI / 2);
} catch (Throwable ignore) {}}
}
});
daemon.setDaemon(true);
daemon.setName("slidingwindow-time-tick-thread");
daemon.start();}
数据字段
不同的业务场景,窗口保留的数据是不一样的。比方我的项目背景的 日志告警
能够只有 已告警次数
一种数据,限流
能够有 已通过
和被限流
两种数据;既然是通用的,那就要兼容各种场景,不同的业务能够定义不同的统计数据。
这里能够将桶的数据字段设为数组类型,长度和列表通过 数据枚举
来指定。
public class EventTypeWrap<E extends Enum> {private E[] eventTypes;
public EventTypeWrap(Class<E> clazz) {eventTypes = clazz.getEnumConstants();
Assert.notEmpty(eventTypes, "事件类型列表为空!");
}
public E[] getEventTypes() {return eventTypes;}
}
public class SlidingWindow {public WindowBucket buildWindowBucket() {int eventTypeSize = eventTypeWrap.getEventTypes().length;
return new WindowBucket(eventTypeSize);
}
}
public class WindowBucket {private long[] eventValues;
public WindowBucket(int eventSize) {eventValues = new long[eventSize];
}
}
数据计算时,传入指定的类型。
public void add(Enum eventType) {……}
性能和一致性
因为数据扩散到多个桶内,合并值的时候,须要将所有桶的数据加起来,而在合并的过程中,桶内的数据是会扭转的,这里的实现分两种:
- 强一致性:加锁。
- 最终一致性:不加锁。
public class AccurateWindowBucket extends WindowBucket {private long[] eventValues;
private ReentrantLock updateLock;
/**
* @param eventSize
* @author
* @date 2020-12-21 15:35
*/
public AccurateWindowBucket(ReentrantLock updateLock, int eventSize) {super(eventSize);
this.updateLock = updateLock;
eventValues = new long[eventSize];
}
@Override
public void add(Enum eventType, int value) {updateLock.lock();
try {int index = eventType.ordinal();
eventValues[index] = eventValues[index] + value;
} finally {updateLock.unlock();
}
}
}
public class EfficiencyWindowBucket extends WindowBucket {private LongAdder[] eventValues;
/**
* @param eventSize
* @author
* @date 2020-12-21 15:35
*/
public EfficiencyWindowBucket(int eventSize) {super(eventSize);
eventValues = new LongAdder[eventSize];
IntStream.range(0, eventSize).forEach(i -> {eventValues[i] = new LongAdder();});
}
@Override
public void add(Enum eventType, int value) {eventValues[eventType.ordinal()].add(value);
}
}
以上两种计划,对 性能
有极致
要求的,用 最终一致性
,对 精度
有要求的,用 强一致性
。
源码
窗口桶:
public abstract class WindowBucket {
protected int eventSize;
/**
* @param eventSize
* @author
* @date 2020-12-21 15:35
*/
public WindowBucket(int eventSize) {this.eventSize = eventSize;}
/**
* @param eventType
* @return long
* @author
* @date 2020-12-18 11:22
*/
public void add(Enum eventType) {add(eventType, 1);
}
/**
* @param eventType
* @param value
* @return long
* @author
* @date 2020-12-18 11:22
*/
public abstract void add(Enum eventType, int value);
/**
* @param eventType
* @return long
* @author
* @date 2020-12-24 17:13
*/
public abstract long addAndGet(Enum eventType);
/**
* @param eventType
* @return long
* @author
* @date 2020-12-21 15:36
*/
public abstract long getValue(Enum eventType);
/**
* @author
* @date 2020-12-21 17:28
*/
public abstract void reset();
/**
* @return long[]
* @author
* @date 2020-12-21 17:29
*/
public abstract long[] getValues();
@Override
public String toString() {return this.getClass().getSimpleName() + "{" +
"eventValues=" + Arrays.toString(getValues()) +
'}';
}
}
/**
* 性能优先,最终一致性
* @author minchin
* @date 2020-12-24 11:02
*/
public class EfficiencyWindowBucket extends WindowBucket {private LongAdder[] eventValues;
/**
* @param eventSize
* @author minchin
* @date 2020-12-21 15:35
*/
public EfficiencyWindowBucket(int eventSize) {super(eventSize);
eventValues = new LongAdder[eventSize];
IntStream.range(0, eventSize).forEach(i -> {eventValues[i] = new LongAdder();});
}
@Override
public void add(Enum eventType, int value) {eventValues[eventType.ordinal()].add(value);
}
@Override
public long addAndGet(Enum eventType) {add(eventType);
return getValue(eventType);
}
@Override
public long getValue(Enum eventType) {return eventValues[eventType.ordinal()].longValue();}
@Override
public void reset() {for (LongAdder value : eventValues) {value.reset();
}
}
@Override
public long[] getValues() {long[] values = new long[eventSize];
IntStream.range(0, eventSize).forEach(i -> {values[i] = eventValues[i].longValue();});
return values;
}
}
/**
* 强一致性
* @author
* @data 2020-12-24 10:35
* @description
*/
public class AccurateWindowBucket extends WindowBucket {private long[] eventValues;
private ReentrantLock updateLock;
/**
* @param eventSize
* @author minchin
* @date 2020-12-21 15:35
*/
public AccurateWindowBucket(ReentrantLock updateLock, int eventSize) {super(eventSize);
this.updateLock = updateLock;
eventValues = new long[eventSize];
}
@Override
public void add(Enum eventType, int value) {updateLock.lock();
try {int index = eventType.ordinal();
eventValues[index] = eventValues[index] + value;
} finally {updateLock.unlock();
}
}
@Override
public long addAndGet(Enum eventType) {updateLock.lock();
try {add(eventType);
return getValue(eventType);
} finally {updateLock.unlock();
}
}
@Override
public long getValue(Enum eventType) {updateLock.lock();
try {return eventValues[eventType.ordinal()];
} finally {updateLock.unlock();
}
}
@Override
public void reset() {updateLock.lock();
try {IntStream.range(0, eventSize).forEach(i -> eventValues[i] = 0L);
} finally {updateLock.unlock();
}
}
@Override
public long[] getValues() {updateLock.lock();
try {return Arrays.copyOf(eventValues, eventValues.length);
} finally {updateLock.unlock();
}
}
}
/**
* @author
* @data 2020-12-18 11:17
* @description
*/
public class WindowBucketWrap<T extends WindowBucket> {
private volatile long beginTime;
private T windowBucket;
public WindowBucketWrap(long beginTime, T windowBucket) {
this.beginTime = beginTime;
this.windowBucket = windowBucket;
}
/**
* @param windowStarTime
* @author minchin
* @date 2020-12-21 10:14
*/
public void reset(long windowStarTime) {windowBucket.reset();
beginTime = windowStarTime;
}
public long getBeginTime() {return beginTime;}
public WindowBucketWrap<T> setBeginTime(long beginTime) {
this.beginTime = beginTime;
return this;
}
public T getWindowBucket() {return windowBucket;}
@Override
public String toString() {
return "WindowBucketWrap{" +
"beginTime=" + beginTime +
", windowBucket=" + windowBucket +
'}';
}
}
窗口:
/**
* @author
* @data 2020-12-17 17:14
* @description
*/
public class EventTypeWrap<E extends Enum> {private E[] eventTypes;
public EventTypeWrap(Class<E> clazz) {eventTypes = clazz.getEnumConstants();
Assert.notEmpty(eventTypes, "事件类型列表为空!");
}
public E[] getEventTypes() {return eventTypes;}
public EventTypeWrap<E> setEventTypes(E[] eventTypes) {
this.eventTypes = eventTypes;
return this;
}
}
/**
* @author
* @data 2020-12-24 11:16
* @description
*/
public enum SlidingWindowType {
/**
* 精准
*/
ACCURATE,
/**
* 性能,最终统一
*/
EFFICIENCY
}
/**
* @author
* @data 2020-12-18 15:36
* @description
*/
public abstract class SlidingWindow {
protected ReentrantLock updateLock;
protected Class<? extends Enum> eventTypeClass;
protected SlidingWindowType windowType = SlidingWindowType.EFFICIENCY;
protected EventTypeWrap<? extends Enum> eventTypeWrap;
protected int bucketCount;
protected int intervalInMs;
protected int bucketLengthInMs;
protected WindowBucketWrap[] windowBucketWraps;
protected SlidingWindow() {}
/**
* @return com.xxxx.xxxx.common.slidingwindow.SlidingWindow
* @author
* @date 2020-12-18 16:58
*/
protected SlidingWindow init() {if (eventTypeClass == null || bucketCount == 0 || intervalInMs == 0) {throw new IllegalArgumentException("init SlidingWindow error!");
}
updateLock = new ReentrantLock();
eventTypeWrap = new EventTypeWrap(eventTypeClass);
long currentTime = TimeServer.getIntervalMilliTime();
windowBucketWraps = new WindowBucketWrap[bucketCount];
IntStream.range(0, bucketCount).forEach(i -> {windowBucketWraps[i] = new WindowBucketWrap(currentTime, buildWindowBucket());
});
bucketLengthInMs = intervalInMs / bucketCount;
return this;
}
/**
* @return com.xxxx.xxxx.common.slidingwindow.WindowBucket
* @author
* @date 2020-12-24 11:22
*/
protected abstract WindowBucket buildWindowBucket();
/**
* @param eventType
* @author
* @date 2020-12-21 17:33
*/
public void add(Enum eventType) {WindowBucket windowBucket = getCurrentBucket().getWindowBucket();
windowBucket.add(eventType);
}
/**
* @param eventType
* @param value
* @author
* @date 2020-12-24 17:16
*/
public void add(Enum eventType, int value) {WindowBucket windowBucket = getCurrentBucket().getWindowBucket();
windowBucket.add(eventType, value);
}
/**
* @param eventType
* @return long
* @author
* @date 2020-12-24 17:13
*/
public abstract long addAndSum(Enum eventType);
/**
* 相似 LongAdder,晋升性能,然而局部失落精度
*
* @param eventType
* @return long
* @author
* @date 2020-12-21 17:42
*/
public abstract long sum(Enum eventType);
/**
* @return com.xxxx.xxxx.common.slidingwindow.WindowBucketWrap
* @author
* @date 2020-12-18 16:56
*/
public WindowBucketWrap getCurrentBucket() {long currentTime = TimeServer.getIntervalMilliTime();
int bucketId = calculateCurrentBucketId(currentTime);
long windowStarTime = calculateWindowStartTime(currentTime);
while (true) {WindowBucketWrap windowBucketWrap = windowBucketWraps[bucketId];
long windowBucketBeginTime = windowBucketWrap.getBeginTime();
if (windowBucketBeginTime == windowStarTime) {return windowBucketWrap;}
// 新的一轮
if (windowStarTime > windowBucketBeginTime) {resetWindow(windowBucketWrap, windowStarTime);
} else {
// 因为用了 nanoTime,不应该呈现 "时钟回拨" 问题,所以间接抛异样
throw new DateTimeException(
String.format("windowBucketBeginTime: %d, windowStarTime: %d, 以后时钟异样,请查看是否呈现相似' 时钟回拨 '的问题!",
windowBucketBeginTime, windowStarTime));
}
}
}
/**
* @param currentTime
* @return int
* @author
* @date 2020-12-18 17:09
*/
protected int calculateCurrentBucketId(long currentTime) {
long timeId = currentTime / bucketLengthInMs;
return (int) (timeId % windowBucketWraps.length);
}
/**
* @param currentTime
* @return long
* @author
* @date 2020-12-18 17:19
*/
protected long calculateWindowStartTime(long currentTime) {return currentTime - currentTime % bucketLengthInMs;}
/**
* @param windowBucketWrap
* @param windowStarTime
* @author
* @date 2020-12-21 10:30
*/
protected void resetWindow(WindowBucketWrap windowBucketWrap, long windowStarTime) {if (updateLock.tryLock()) {
try {if (windowStarTime > windowBucketWrap.getBeginTime()) {windowBucketWrap.reset(windowStarTime);
}
} finally {updateLock.unlock();
}
} else {
// 如果持续循环,很可能其余线程还没操作完,先让出 CPU 给其余业务 / 线程
Thread.yield();}
}
/**
* @param currentTime
* @param windowBucketWrap
* @return boolean
* @author
* @date 2020-12-21 17:37
*/
protected boolean isWindowDeprecated(long currentTime, WindowBucketWrap<WindowBucket> windowBucketWrap) {return currentTime - windowBucketWrap.getBeginTime() > intervalInMs;
}
public Class<? extends Enum> getEventTypeClass() {return eventTypeClass;}
protected SlidingWindow setEventTypeClass(Class<? extends Enum> eventTypeClass) {
this.eventTypeClass = eventTypeClass;
return this;
}
/**
* @return com.xxxx.xxxx.common.slidingwindow.EventTypeWrap<? extends java.lang.Enum>
* @author
* @date 2020-12-18 16:48
*/
public EventTypeWrap<? extends Enum> getEventTypeWrap() {return eventTypeWrap;}
/**
* @return int
* @author
* @date 2020-12-18 16:56
*/
public int getBucketCount() {return bucketCount;}
/**
* @param bucketCount
* @return com.xxxx.xxxx.common.slidingwindow.SlidingWindow
* @author
* @date 2020-12-18 16:56
*/
protected SlidingWindow setBucketCount(int bucketCount) {
this.bucketCount = bucketCount;
return this;
}
/**
* @return int
* @author
* @date 2020-12-18 16:56
*/
public int getIntervalInMs() {return intervalInMs;}
/**
* @param intervalInMs
* @return com.xxxx.xxxx.common.slidingwindow.SlidingWindow
* @author
* @date 2020-12-18 16:56
*/
protected SlidingWindow setIntervalInMs(int intervalInMs) {
this.intervalInMs = intervalInMs;
return this;
}
/**
* @return com.xxxx.xxxx.common.slidingwindow.SlidingWindowType
* @author
* @date 2020-12-24 11:18
*/
public SlidingWindowType getWindowType() {return windowType;}
/**
* @param windowType
* @return com.xxxx.xxxx.common.slidingwindow.SlidingWindow
* @author
* @date 2020-12-24 11:18
*/
protected SlidingWindow setWindowType(SlidingWindowType windowType) {
this.windowType = windowType;
return this;
}
@Override public String toString() {
return "SlidingWindow{" +
"eventTypeClass=" + eventTypeClass.getName() +
", windowType=" + windowType +
", bucketCount=" + bucketCount +
", intervalInMs=" + intervalInMs +
", bucketLengthInMs=" + bucketLengthInMs +
", windowBucketWraps=\n" + StringUtils.join(windowBucketWraps, ",\n") +
'}';
}
/**
* @author
* @date 2020-12-18 16:48
*/
public static class SlidingWindowBuilder {
private SlidingWindow slidingWindow;
public SlidingWindowBuilder() {this(SlidingWindowType.EFFICIENCY);
}
public SlidingWindowBuilder(SlidingWindowType windowType) {if (SlidingWindowType.ACCURATE.equals(windowType)) {slidingWindow = new AccurateSlidingWindow();
} else {slidingWindow = new EfficiencySlidingWindow();
}
}
/**
* @param eventTypeClass
* @return com.xxxx.xxxx.common.slidingwindow.SlidingWindow.SlidingWindowBuilder
* @author
* @date 2020-12-18 16:48
*/
public SlidingWindowBuilder ofEventTypeClass(Class<? extends Enum> eventTypeClass) {slidingWindow.setEventTypeClass(eventTypeClass);
return this;
}
/**
* @param bucketCount
* @return com.xxxx.xxxx.common.slidingwindow.SlidingWindow.SlidingWindowBuilder
* @author
* @date 2020-12-18 16:55
*/
public SlidingWindowBuilder ofBucketCount(int bucketCount) {slidingWindow.setBucketCount(bucketCount);
return this;
}
/**
* @param windowIntervalInMs
* @return com.xxxx.xxxx.common.slidingwindow.SlidingWindow.SlidingWindowBuilder
* @author
* @date 2020-12-18 16:55
*/
public SlidingWindowBuilder ofWindowIntervalInMs(int windowIntervalInMs) {slidingWindow.setIntervalInMs(windowIntervalInMs);
return this;
}
/**
* @return com.xxxx.xxxx.common.slidingwindow.SlidingWindow
* @author
* @date 2020-12-18 16:48
*/
public SlidingWindow build() {return slidingWindow.init();
}
}
}
/**
* @author
* @data 2020-12-24 17:26
* @description
*/
public class AccurateSlidingWindow extends SlidingWindow {
@Override
public WindowBucket buildWindowBucket() {int eventTypeSize = eventTypeWrap.getEventTypes().length;
return new AccurateWindowBucket(updateLock, eventTypeSize);
}
@Override
public long addAndSum(Enum eventType) {updateLock.lock();
try {WindowBucket windowBucket = getCurrentBucket().getWindowBucket();
windowBucket.add(eventType);
return sum(eventType);
} finally {updateLock.unlock();
}
}
@Override
public long sum(Enum eventType) {updateLock.lock();
try {
long value = 0;
long currentTime = TimeServer.getIntervalMilliTime();
for (WindowBucketWrap windowBucketWrap : windowBucketWraps) {if (!isWindowDeprecated(currentTime, windowBucketWrap)) {value += windowBucketWrap.getWindowBucket().getValue(eventType);
}
}
return value;
} finally {updateLock.unlock();
}
}
}
/**
* @author
* @data 2020-12-24 17:26
* @description
*/
public class EfficiencySlidingWindow extends SlidingWindow {
@Override
public WindowBucket buildWindowBucket() {int eventTypeSize = eventTypeWrap.getEventTypes().length;
return new EfficiencyWindowBucket(eventTypeSize);
}
@Override
public long addAndSum(Enum eventType) {add(eventType);
return sum(eventType);
}
@Override
public long sum(Enum eventType) {
long value = 0;
long currentTime = TimeServer.getIntervalMilliTime();
for (WindowBucketWrap windowBucketWrap : windowBucketWraps) {if (!isWindowDeprecated(currentTime, windowBucketWrap)) {value += windowBucketWrap.getWindowBucket().getValue(eventType);
}
}
return value;
}
}
工夫服务器:
/**
* @author
* @data 2020-12-21 11:07
* @description
*/
public class TimeServer {
/**
* 一毫秒有多少纳秒.
*/
public static final long NANOS_PER_MILLI = 1000_000L;
private static final long beginNanoTime;
private static volatile long beginMilliTime;
private static volatile long currentNanoTime;
static {currentNanoTime = System.nanoTime();
beginNanoTime = currentNanoTime;
beginMilliTime = System.currentTimeMillis();
Thread daemon = new Thread(new Runnable() {
@Override
public void run() {while (true) {currentNanoTime = System.nanoTime();
try {
// 不须要十分高的精度,容许有点儿误差
TimeUnit.NANOSECONDS.sleep(NANOS_PER_MILLI / 2);
} catch (Throwable ignore) {}}
}
});
daemon.setDaemon(true);
daemon.setName("slidingwindow-time-tick-thread");
daemon.start();}
/**
* @return long
* @author
* @date 2020-12-21 11:14
*/
public static long getCurrentNanoTime() {return currentNanoTime;}
/**
* @return long
* @author
* @date 2020-12-21 12:59
*/
public static long getCurrentMilliTime() {return beginMilliTime + getIntervalMilliTime();
}
/**
* @return long
* @author
* @date 2020-12-21 11:16
*/
public static long getIntervalNanoTime() {return getCurrentNanoTime() - beginNanoTime;
}
/**
* @return long
* @author
* @date 2020-12-21 11:20
*/
public static long getIntervalMilliTime() {return getIntervalNanoTime() / NANOS_PER_MILLI;
}
}
应用
原先的日志次数限流类 TimesLimiter
引入滑动窗口:
/**
*
* @author
* @data 2020-06-02 10:51
* @description
*/
public class TimesLimiter implements Limiter {
private static final int SLIDINGWINDOW_BUCKETCOUNT = 10;
private int timesLimit;
private SlidingWindow slidingWindow;
public TimesLimiter(int timesLimitSeconds, int timesLimit) {
this.timesLimit = timesLimit;
slidingWindow = new SlidingWindow.SlidingWindowBuilder(SlidingWindowType.ACCURATE)
.ofEventTypeClass(StandardLevel.class)
.ofBucketCount(SLIDINGWINDOW_BUCKETCOUNT)
.ofWindowIntervalInMs(timesLimitSeconds * 1000)
.build();}
@Override
public boolean isLimited(String content) {if (slidingWindow.addAndSum(StandardLevel.ERROR) > timesLimit) {slidingWindow.add(StandardLevel.ERROR, -1);
return true;
}
return false;
}
}