关于rocketmq:RocketMQ-消息消费过程

42次阅读

共计 1318 个字符,预计需要花费 4 分钟才能阅读完成。

上篇讲到音讯从 broker 拉取后,会把音讯存入 PullRequest 的 processQueue 中。此时就会有其余线程,对这些音讯进行生产。

其余线程包含程序生产和并发生产。

并发生产

并发生产中,用于生产音讯的线程叫做 ConsumeRequest,每一个 ConsumeRequest 默认生产 32 条音讯,如果须要生产的音讯超过 32 条,就会创立多个 ConsumeRequest。这些 ConsumeRequest 就会放入线程池中期待运行。

ConsumeRequest 线程运行的时候,会调用监听器,拿到音讯生产的后果,并发生产的后果有两种,RECONSUME_LATER 和 CONSUME_SUCCESS。

RECONSUME_LATER 阐明须要从新生产,但播送模式下只打印正告级别的日志,集群模式下,从新封装音讯成 ConsumeRequest,再放入线程池,5s 后再进行生产。

同时,也须要跟 broker 说,这个音讯生产失败了。broker 创立重试主题,并设置音讯重试次数,当音讯拉取超过重试次数后,就会进入 DLQ 队列中。

有哪些音讯须要从新生产?比方这次音讯有 10 个,ackIndex 是 2,那下标为 3 到 9 的音讯,都要从新生产。如果这个后果是 CONSUME_LATER,ackIndex 就会被设置为 -1,那就是所有的音讯,都要从新生产。

NSUME_SUCCESS 阐明生产胜利,依据 ackIndex 计算一下胜利和失败的信息。

对于 RECONSUME_LATER 或 NSUME_SUCCESS 的音讯解决完后,就要把音讯从 ProcessQueue 中移除,并更新 OffsetStore 里的音讯生产进度。

OffsetStore 里的音讯生产进度会由一个定时工作,每 5s 会提交给 broker,而后 broker 再更新偏移量。所以即使音讯生产胜利了,然而还没提交给 broker 的时候,宕机了,那下次还是从之前的偏移量开始读取数据,导致音讯反复生产。

程序生产

程序生产中,也是封装一个 ConsumeRequest,这个 ConsumeRequest 和并发生产的 ConsumeRequest 不同,他并没有音讯列表,所以音讯间接从 ProcessQueue 拿。

线程生产的时候,首先会获取一个锁。这个锁的粒度是音讯队列,也就是说,一个音讯生产队列,只能有一个线程进行生产。

获取锁后,就从 ProcessQueue 把音讯拿进去。

如果 ProcessQueue 有音讯,那还须要拿到 ProcessQueue 的 ConsumeLock。

而后进行音讯生产,并通过监听器拿到生产后果的状态,并开释锁。

如果后果是 SUCCESS,则更新 ProcessQueue 中的音讯生产信息。

如果后果是 SUSPEND_CURRENT_QUEUE_A_MOMENT,则会查看音讯的重试次数。

如果这个重试次数超过容许的最大重试次数,就会把这个音讯发送给 broker,且寄存在死信队列中。

如果没有超过,则会持续从新生产,并累加生产次数。

最初,再更新 OffsetStore 里的音讯生产进度。

OffsetStore 的解决,和并发生产一样。

线程拿到 messageQueueLock 锁后,并不会始终持有这个锁,默认生产 60s,就会开释这个锁。

正文完
 0