有个开发需要是预警日志每天只发送一次,当初遇到个bug,测试环境没有发送数据,我在本地启动后有新的预警日志产生,kafka没看到谬误日志,而代码里没有其余更多的逻辑,所以我猜想就是每天只发送一次的这个逻辑产生了谬误

因为我后面日志都是用的流计算kafka stream统计ip频繁查问次数,key为ip,value为次数,为此这里我定义了一个static的map来判断以后key是否有从新发送

private static Map<String, Integer> ipMap = new ConcurrentHashMap<>();//上面是流解决的伪代码stream.groupByKey(ip).windowedBy().count()..toStream().foreach((key,value)->{    if(value.intValue() >= 20 ) {//这里是超过20次才预警        if(ipMap.get(key.key()) == null || value.intValue() < ipMap.get(key.key())) { //这里实现一天最多发一条            ipMap.put(key.key(), value.intValue());            //...业务逻辑        }    }})

代码的具体逻辑就是我首先判断以后次数是否超过20次了,才预警,而后上面的ipMap逻辑就是为了解决一天只发送一条的实现,我会把value次数存起来,因为我这个工夫窗口范畴是一天,所以其实每个key的value预计的状况是

每天同一个key的value都是从0开始一直增长的,然而到第二天的时候,value又会从0开始累加,我按照这个特点实现了上述代码,当当初的value小于上一次的value时,阐明是新的一天了,这时候就能够执行业务逻辑发送预警

(为什么不必获取以后工夫来判断呢?思考到可能日志是昨天发的,然而因为执行或者其余导致提早,聚合事件的时候曾经是第二天了,所以是不牢靠的)

然而应该是有问题的,本地我新启动的话又没问题,因为我的static的ipMap是空的了,所以我借助Arthas来在线查看测试环境的内容

我间接下载好后关上arthas,执行java -jar arthas-boot.jar,这里我还遇到了一个问题,如果启动说3658被占用,是因为上一次抉择过程进行连贯没有失常退出,arthas会保留上一次监听过程,导致本次抉择新过程进行连贯时,与监听中记录的过程id不同,后果呈现谬误,持续抉择上一个过程进行连贯,执行胜利后执行 stop 命令完结连贯

而后我通过getstatic命令 查看到这个ipMap字段的内容,发现这个map的以后ip的key的value内容为20,所以下面代码有两种状况不会执行预警逻辑

一是流计算的value.intValue() >= ipMap.get(key.key()),这种状况有没有可能产生呢,是有可能的,因为这个value.intValue()是kafka进行key(ip)聚合后的总数,然而这个办法并不是每次进来一条数据就会触发一次,而是肯定的策略才产生触发(或者因为各种起因导致音讯沉积),有可能kafka stream一下子解决了超过20条的记录而后再聚合发送到上游时.foreach((key,value) -> {})这里的value的值就会呈现比上一次ipMap里的value值要大,就不会执行预警逻辑,

二是明天的次数刚好是20,昨天的次数也刚好是20,因为我写的value.intValue() < ipMap.get(key.key())所以这种状况也不会执行预警

为了查看流返回的value值,咱们持续应用arthas进行查看,首先咱们须要找到该办法:因为kafka stream的逻辑都是用Lambda表达式来写的,实际上是一个匿名办法,所以我首先在本地通过javap -p 类.class来查看这个办法的匿名办法名是什么(我这里看了一下是lambda$null$8,匿名办法的程序如同和其余办法的程序正好相同,是写在越后面的编译后的地位越在前面),而后我通过arthas的tt -t class method查看进入办法的入参值,后果也如我所料,这个value的值是57,大于20

所以我这个代码是有问题的,解决方案最初还是用工夫来进行判断的,不过不是取以后零碎工夫来判断,而是通过聚合后的.foreach((key,value)->{})中的key,不仅蕴含了自定义的ip,其中的key.window()会蕴含是属于哪个窗口的数据,这样的话我还是用一个map存起来,而后存的是这个窗口的工夫,应用这个窗口的工夫进行判断是不是同一个就晓得是不是明天了,是的话阐明曾经发送过了

这里的map不会进行回收,前面我会换成可过期key的缓存即可,过期工夫设置成1天就好了