关于rocketmq:RocketMQ-消息拉取

36次阅读

共计 1076 个字符,预计需要花费 3 分钟才能阅读完成。

上一篇提到,因为音讯队列负载机制,会往 pullRequestQueue 队列放入一个个的 PullRequest。

这些 PullRequest 会有一个专门的线程,把它取出来并封装成服务端的一个 Request,发送给 broker。

在发送服务端之前,须要晓得 broker 的地址在哪,须要从什么偏移量开始拉取(偏移量集群模式存 broker,播送模式存本地),一次性拉多少数据。

broker 收到申请后,就会从 commitlog 中,依据偏移量把所须要的音讯给取出来,因为 broker 的主从同步,这边返回音讯的时候,也会告知下一次拉取是从主 broker 拉取还是 slave 的 broker 拉取数据。

从 commit 拉取音讯的时候,会有这几个状况(上面疏忽 offsetCheckInSlave 判断):

  1. 以后音讯队列并没有音讯,则下次拉取音讯的时候,如果这个 broker 是主节点,还是这个偏移量。如果这个 broker 是从节点,则下次间接从 0 开始。

  1. 以后音讯队列有音讯,然而咱们拉取的偏移量比队列里最小的偏移量还小,比方咱们须要拉取 100 的数据,然而音讯队列的最小偏移量是 500。则下次拉取音讯的时候,如果这个 broker 是主节点,还是这个偏移量。如果这个 broker 是从节点,则下次间接从 500 开始。

  1. 如果咱们咱们拉取的偏移量刚好等于音讯队列里最大的偏移量,所以咱们也没有数据能够生产了,那下次拉取的时候,还是 100 这个偏移量。

  1. 如果咱们咱们拉取的偏移量大于音讯队列里最大的偏移量,所以咱们也没有数据能够生产了,那下次拉取的时候,这里还要判断音讯队列里最小的偏移量是否等于 0。

  1. 失常状况下,咱们须要的音讯是在音讯队列里最小的偏移量和最大的偏移量之间,那就间接把数据从 commitlog 中取出来并返回。

生产端接管 broker 响应后,就会依据下面各个状况的偏移量进行更新,并且把音讯存入 PullRequest 的 processQueue 中。

并且再把 PullRequest 放入 pullRequestQueue 队列中,期待下次拉取。

processQueue 的音讯如果来不及生产,会始终的沉积,所以 PullRequest 在拉取音讯的时候,会先判断 processQueue 里的音讯数量是否曾经超过 1000,如果超过了,则以后不拉取,并放入 pullRequestQueue 队列中,50ms 后才能够持续拉取。

此外还会判断 processQueue 中音讯的大小、processQueue 中队列最大偏移量与最小偏离量的间距,如果超过了阈值,也会放入 pullRequestQueue 队列中,50ms 后才能够持续拉取。

正文完
 0