最近零碎须要做一个平安审计的日志平台,对所有接入的零碎进行日志的统计分析,把频繁查问、操作类型的日志进行监控预警,因为之前用的是kafka来实现各业务系统日志接入对立日志平台的,所以想到了间接应用kafka官网自身提供的一个实时计算框架kafka stream。
kafka stream的工夫窗口有两个重要的属性:窗口大小和步长(挪动距离),滚动窗口Tumbling Time Window:步长等于窗口大小,滚动窗口是没有记录的重叠;跳跃窗口Hopping Time Window:步长不等于窗口大小。
咱们的需要是要求预警每天从0点到24点时间段内产生操作或查问次数过多的记录,之前我用的是滚动窗口,窗口大小为一天,不过我看了kafka的默认实现,窗口设置在.windowedBy(TimeWindows.of(Duration.ofDays(1))),TimeWindows对象外面次要的办法就是public Map<Long, TimeWindow> windowsFor(final long timestamp) {},依据记录的工夫戳来判断是属于哪个窗口,默认代码为
@Overridepublic Map<Long, TimeWindow> windowsFor(final long timestamp) { long windowStart = (Math.max(0, timestamp - sizeMs + advanceMs) / advanceMs) * advanceMs; final Map<Long, TimeWindow> windows = new LinkedHashMap<>(); while (windowStart <= timestamp) { final TimeWindow window = new TimeWindow(windowStart, windowStart + sizeMs); windows.put(windowStart, window); windowStart += advanceMs; } return windows;}
该实现的窗口时间段是从8点到第二天的8点为一天,而不是需要要求的0点到24点,于是我从新实现了一个类OffsetTimeWindows
@Overridepublic Map<Long, TimeWindow> windowsFor(final long timestamp) { long windowStart = timestamp - (timestamp + offset) % sizeMs; //获得以后工夫戳那天0点的工夫戳 final Map<Long, TimeWindow> windows = new LinkedHashMap<>(); while (windowStart <= timestamp) { final TimeWindow window = new TimeWindow(windowStart, windowStart + sizeMs); windows.put(windowStart, window); windowStart += advanceMs; } return windows;}
其中减少了一个offset的参数,能够在初始化这个类的时候进行赋值,以达到自定义任意时间段的成果,我是须要0点,所以该offset我设置为28800000,通过测试,可能完满实现该成果。
在实现该需要的过程中,我发现flink的客户端有间接提供设置偏移量的窗口类TumblingEventTimeWindows,而kafka自身是没有实现的,目前看起来flink是性能更齐备一些的。