乐趣区

关于kafka:使用arthas在线调试测试环境

有个开发需要是预警日志每天只发送一次,当初遇到个 bug,测试环境没有发送数据,我在本地启动后有新的预警日志产生,kafka 没看到谬误日志,而代码里没有其余更多的逻辑,所以我猜想就是每天只发送一次的这个逻辑产生了谬误

因为我后面日志都是用的流计算 kafka stream 统计 ip 频繁查问次数,key 为 ip,value 为次数,为此这里我定义了一个 static 的 map 来判断以后 key 是否有从新发送

private static Map<String, Integer> ipMap = new ConcurrentHashMap<>();
// 上面是流解决的伪代码
stream
.groupByKey(ip)
.windowedBy()
.count().
.toStream()
.foreach((key,value)->{if(value.intValue() >= 20 ) {// 这里是超过 20 次才预警
        if(ipMap.get(key.key()) == null || value.intValue() < ipMap.get(key.key())) { // 这里实现一天最多发一条
            ipMap.put(key.key(), value.intValue());
            //... 业务逻辑
        }
    }
})

代码的具体逻辑就是我首先判断以后次数是否超过 20 次了,才预警,而后上面的 ipMap 逻辑就是为了解决一天只发送一条的实现,我会把 value 次数存起来,因为我这个工夫窗口范畴是一天,所以其实每个 key 的 value 预计的状况是

每天同一个 key 的 value 都是从 0 开始一直增长的,然而到第二天的时候,value 又会从 0 开始累加,我按照这个特点实现了上述代码,当当初的 value 小于上一次的 value 时,阐明是新的一天了,这时候就能够执行业务逻辑发送预警

(为什么不必获取以后工夫来判断呢?思考到可能日志是昨天发的,然而因为执行或者其余导致提早,聚合事件的时候曾经是第二天了,所以是不牢靠的)

然而应该是有问题的,本地我新启动的话又没问题,因为我的 static 的 ipMap 是空的了,所以我借助 Arthas 来在线查看测试环境的内容

我间接下载好后关上 arthas,执行 java -jar arthas-boot.jar,这里我还遇到了一个问题,如果启动说 3658 被占用,是因为上一次抉择过程进行连贯没有失常退出,arthas 会保留上一次监听过程,导致本次抉择新过程进行连贯时,与监听中记录的过程 id 不同,后果呈现谬误,持续抉择上一个过程进行连贯,执行胜利后执行 stop 命令完结连贯

而后我通过 getstatic 命令 查看到这个 ipMap 字段的内容,发现这个 map 的以后 ip 的 key 的 value 内容为 20,所以下面代码有两种状况不会执行预警逻辑

一是流计算的 value.intValue() >= ipMap.get(key.key()),这种状况有没有可能产生呢,是有可能的,因为这个 value.intValue() 是 kafka 进行 key(ip)聚合后的总数,然而这个办法并不是每次进来一条数据就会触发一次,而是肯定的策略才产生触发(或者因为各种起因导致音讯沉积),有可能 kafka stream 一下子解决了超过 20 条的记录而后再聚合发送到上游时.foreach((key,value) -> {}) 这里的 value 的值就会呈现比上一次 ipMap 里的 value 值要大,就不会执行预警逻辑,

二是明天的次数刚好是 20,昨天的次数也刚好是 20,因为我写的 value.intValue() < ipMap.get(key.key()) 所以这种状况也不会执行预警

为了查看流返回的 value 值,咱们持续应用 arthas 进行查看,首先咱们须要找到该办法:因为 kafka stream 的逻辑都是用 Lambda 表达式来写的,实际上是一个匿名办法,所以我首先在本地通过 javap -p 类.class 来查看这个办法的匿名办法名是什么(我这里看了一下是 lambda$null$8,匿名办法的程序如同和其余办法的程序正好相同,是写在越后面的编译后的地位越在前面),而后我通过 arthas 的 tt -t class method 查看进入办法的入参值,后果也如我所料,这个 value 的值是 57,大于 20

所以我这个代码是有问题的,解决方案最初还是用工夫来进行判断的,不过不是取以后零碎工夫来判断,而是通过聚合后的.foreach((key,value)->{}) 中的 key,不仅蕴含了自定义的 ip,其中的 key.window() 会蕴含是属于哪个窗口的数据,这样的话我还是用一个 map 存起来,而后存的是这个窗口的工夫,应用这个窗口的工夫进行判断是不是同一个就晓得是不是明天了,是的话阐明曾经发送过了

这里的 map 不会进行回收,前面我会换成可过期 key 的缓存即可,过期工夫设置成 1 天就好了

退出移动版