音讯拉取形式
- 通过队列负载机制后,会调配给以后消费者一些队列,留神一个生产组能够订阅多个主题,正如下面 pullRequestQueue 中所示,topic_test、topic_test2 这两个主题都调配了一个队列。
- 轮流从 pullRequestQueue 中取出一个 PullRequest 对象,依据该对象中的拉取偏移量向 Broker 发动拉取申请,默认拉取 32 条,可通过 pullBatchSize 参数进行扭转,该办法不仅会返回音讯列表,还会返更改 PullRequest 对象中的下一次拉取的偏移量。
- 接管到 Broker 返回的音讯后,会首先放入 ProccessQueue(解决队列),该队列的内部结构为 TreeMap,key 寄存的是音讯在音讯生产队列(consumequeue)中的偏移量,而 value 为具体的音讯对象。
- 而后将拉取到的音讯提交到生产组外部的线程池,并立刻返回,并将 PullRequest 对象放入到 pullRequestQueue 中,而后取出下一个 PullRequest 对象持续反复音讯拉取的流程,从这里能够看出,音讯拉取与音讯生产是不同的线程。
- 音讯生产组线程池解决完一条音讯后,会将音讯从 ProccessQueue 中删除,而后会向 Broker 汇报音讯生产进度,以便下次重启时能从上一次生产的地位开始生产。
音讯生产进度提交
通过下面拉取音讯流程咱们晓得,音讯生产组线程池在解决完一条音讯后,会将音讯从 ProccessQueue 中移除,并向 Broker 汇报音讯生产进度。
那请大家思考一下上面这个问题:
例如当初解决队列中有 5 条音讯,并且是线程池并发生产,那如果音讯偏移量为 3 的音讯(3:msg3)先于偏移量为 0、1、2 的音讯解决完,那向 Broker 如何汇报音讯生产进度呢?
如果提交 msg3 的偏移量为音讯生产进度,那汇报结束后如果消费者产生内存溢出等问题导致 JVM 异样退出,msg1 的音讯还未解决,而后重启消费者,因为音讯生产进度文件中存储的是 msg3 的音讯偏移量,会持续从 msg3 开始生产,会造成音讯失落。
RocketMQ 采取的形式是解决完 msg3 之后,会将 msg3 从音讯解决队列中移除,但在向 Broker 汇报音讯生产进度时是取 ProceeQueue 中最小的偏移量为音讯生产进度,即汇报的音讯生产进度是 0。
但如果呈现上图这种状况,也就是0,1,3都已生产完且从ProcessQueue移除,那如果咱们汇报的生产进度为2,如果产生内存溢出等异常情况,消费者重新启动,会持续从音讯偏移量为 2 的音讯开始生产,msg3 就会被生产屡次,故RocketMQ 不保障音讯反复生产,所以生产的幂等是须要业务方进行保障的。
再看下提交生产进度的流程图:
为了缩小消费者与 Broker 的网络交互,进步性能,提交音讯生产进度时会首先存入到本地缓存表中,而后定时上报到 Broker,同样 Broker 也会首先存储本地缓存表,而后定时刷写到磁盘。