关于pulsar:通过-Pulsar-源码彻底解决重复消费问题

6次阅读

共计 2030 个字符,预计需要花费 6 分钟才能阅读完成。

背景

最近真是和 Pulsar 杠上了,业务团队反馈说是线上有个利用音讯反复生产。

而且在测试环境是能够稳固复现的,依据教训来看个别能稳固复现的都比拟好解决。

定位问题

接着便是定位问题了,依据之前的教训让业务依照这几种状况先排查一下:

通过排查:1,2 能够排除了。

  1. 没有相干日志
  2. 存在异样,但最外层也捕捉了,所以不论有无异样都会 ACK。

第三个也在生产的入口和提交音讯出计算了工夫,最终发现都是在 2s 左右 ACK 的。

伪代码如下:

        Consumer consumer = client.newConsumer()
                .subscriptionType(SubscriptionType.Shared)
                .enableRetry(true)
                .topic(topic)
                .ackTimeout(30, TimeUnit.SECONDS)
                .subscriptionName("my-sub")
                .messageListener(new MessageListener<byte[]>() {
                    @SneakyThrows
                    @Override
                    public void received(Consumer<byte[]> consumer, Message<byte[]> msg) {log.info("msg_id{}",msg.getMessageId().toString());
                        TimeUnit.SECONDS.sleep(2);
                        consumer.acknowledge(msg);
                    }
                })
                .subscribe();

那这就很奇怪了,因为代码里配置的 ackTimeout 是 30s,实践上来说是不会存在超时导致音讯重发的。

为了排除是否是超时引起的,间接将业务代码正文掉了,等于是音讯收到后立刻就 ACK,通过测试发现这样的确就没有反复生产了。

为了再次确认是不是和 ackTimeout 无关,间接将 .ackTimeout(30, TimeUnit.SECONDS) 正文掉后测试,发现也没有反复生产了。

确认起因

既然如此那肯定是和这个配置无关了,但看代码的确没有超时,为了定位具体起因只有去看 client 的源码了。

这里简略梳理下音讯的生产的流程:

  1. 依据 .receiverQueueSize(1000) 的配置,默认状况下 broker 会间接给客户端推送 1000 条音讯。
  2. 客户端将这 1000 条音讯保留到外部队列中。
  3. 如果应用同步生产 receive() 时,实质上就是去 take 这个外部队列。
  4. 如果是应用的是 messageListener 异步生产并配置 ackTimeout,每当从队列里取得一条音讯后便会把这条音讯退出 UnAckedMessageTracker 外部的一个工夫轮中,定时检测顶部是否存在音讯,如果存在则会触发从新投递。
    4.1 退出工夫轮后,异步 调用咱们自定义的事件,这个异步操作是提交到一个无界队列中由单个线程顺次排队执行(这点是这次问题的要害)
  5. 业务 ACK 的时候会从工夫轮中删除音讯,所以如果音讯 ACK 的足够快,在第四步就不会获取到音讯进行从新投递。

整体流程如上图,代码细节如下图:

所以问题的根本原因就是写入工夫轮(UnAckedMessageTracker)开始倒计时的线程和回调业务逻辑的不是同一个线程。

如果业务执行耗时,等到音讯从那个单线程的无界队列中取出来的时候很有可能曾经过了 ackTimeou 的工夫,从而导致了超时重发。

也就是用户所了解的 ackTimeout 周期(应该进入回调时候开始计时)和 SDK 实现的不统一造成的。

之后我再次确认同样的代码换为同步生产是没有问题的,不会导致反复生产:

while (true) {Message msg = consumer.receive();
            log.info("consumer Message received:" + new String(msg.getData()) + msg.getMessageId().toString());
            TimeUnit.SECONDS.sleep(2);
            consumer.acknowledge(msg);    
}

查看代码后发现同步代码的获取音讯和退出 UnAckedMessageTracker 工夫轮是同步的,也就不会呈现超时的问题。

总结

所以其实 是messageListener 异步生产的 ackTimeout 的语义是有问题的,须要将退出 UnAckedMessageTracker 处挪动到回调函数中同步调用。

我查看了最新的 2.11.x 版本的代码仍然没有修复,正筹备提个 PR 切换到 master 时才发现曾经有相干的 PR 了,只是还没有发版。

修复的背景和思路也是相似的,具体参考:

https://github.com/apache/pul…

其实业务中并不举荐应用 ackTimeout 这个配置了,不好预估工夫从而导致超时,而且我置信大部分业务配置好 ackTImeout 后直到后续出问题的时候才想起来要改。
所以罗唆一开始就不要应用。

在 go 版本的 SDK 中间接废除掉了这个参数,举荐应用 nack API 替换。

正文完
 0