关于pulsar:Pulsar-也会重复消费

42次阅读

共计 1522 个字符,预计需要花费 4 分钟才能阅读完成。

背景

许久没有分享 Java 相干的问题排查了,最近帮共事一起排查了一个问题:

在应用 Pulsar 生产时,产生了同一条音讯重复生产的状况。

排查

当他通知我这个景象的时候我就持狐疑态度,依据之前应用的教训 Pulsar 在官网文档以及 API 中都解释过:



只有当设置了生产的 ackTimeout 并超时生产时才会反复投递音讯,默认状况下是敞开的,查看代码也的确没有开启。

那会不会是调用了 negativeAcknowledge() 办法呢(调用该办法也会触发从新投递),因为咱们使了一个第三方库 https://github.com/majusko/pulsar-java-spring-boot-starter 只有当抛出异样时才会调用该办法。

查阅代码之后也没有中央抛出异样,甚至整个过程中都没看到异样产生;这就有点诡异了。

复现

为了捋分明整个事件的前因后果,具体理解了他的应用流程;

其实也就是业务呈现了 bug,他在音讯生产时 debug 而后进行单步调试,当走完一次调试后,没多久马上又收到了同样的音讯。

但奇怪的是也不是每次 debug 后都能反复生产,咱们都说如果一个 bug 能 100% 齐全复现,那基本上就解决一大半了。

所以咱们排查的第一步就是齐全复现这个问题。


为了排除掉是 IDEA 的问题(尽管极大概率不太可能)既然是 debug 的时候产生的问题,那其实转换到代码也就是 sleep 嘛,所以咱们打算在生产逻辑里间接 sleep 一段时间看是否复现。

通过测试,sleep 几秒到几十秒都无奈复现,最初索性 sleep 一分钟,神奇的事件产生了,每次都胜利复现!

既然能胜利复现那就好说了,因为我本人的业务代码也有应用到 Pulsar 的中央,为了不便调试就筹备在本人的我的项目里再复现一次。

后果诡异的事件再次发生,我这里又不能复现了。

尽管这才是合乎预期的,但这就没法调了呀。

本着置信现代科学的前提,咱们俩惟一的区别就是我的项目不一样了,为此我比照了两边的代码。

    @PulsarConsumer(
            topic = xx,
            clazz = Xx.class,
            subscriptionType = SubscriptionType.Shared
    )
    public void consume(Data msg) {log.info("consume msg:{}", msg.getOrderId());
        Lock lock = redisLockRegistry.obtain(msg.getOrderId());
        if (lock.tryLock()) {
            try {orderService.do(msg.getOrderId());
            } catch (Exception e) {log.error("consumer msg:{} err:", msg.toString(), e);
            } finally {lock.unlock();
            }
        }

    }

后果不出所料,共事那边的代码加了锁;一个基于 Redis 的分布式锁,这时我一拍大腿不会是解锁的时候超时了导致抛了异样吧。

为了验证这个问题,在能复现的根底上我在框架的 Pulsar 生产处打了断点:


果然破案了,异样提醒曾经十分分明了:加锁曾经过了超时工夫。

进入异样后间接 negative 音讯,同时异样也被吃掉了,所以之前没有发现。


查阅了 RedisLockRegistry 的源码,默认超时工夫正好是一分钟,所以之前咱们 sleep 几十秒也无奈复现这个问题。

总结

预先我向共事理解了下为啥这里要加锁,因为我看下来齐全没有加锁的必要;后果他是因为从他人那里复制的代码才加上的,压根没想那么多。

所以这事也能得出一些教训:

  • ctrl C/V 尽管不便,但也得充分考虑本人的业务场景。
  • 应用一些第三方 API 时,须要充沛理解其作用、参数。

你的点赞与分享是对我最大的反对

正文完
 0