关于rocketmq:RocketMQ学习十二Consumer学习

1次阅读

共计 2623 个字符,预计需要花费 7 分钟才能阅读完成。

生产端的次要性能有启动消费者,敞开消费者,同步或异步推或者拉取音讯,获取指定音讯生产队列的生产进度,获取以后正在解决的音讯生产队列,订阅主题,勾销订阅,注册并发生产模式监听器等性能。
在 RocketMQ 的外部实现原理中,其实现机制为 PULL 模式,而 PUSH 模式是一种伪推送,是对 PULL 模式的封装。每拉去一批音讯后,提交到生产端的线程池(异步),而后马上向 Broker 拉取音讯,即实现相似“推”的成果。为什么要对拉取的根底上再封装出推送呢?次要是拉取模式对使用者不敌对,对使用者要求比拟高,封装成推送后的 API 简略。

本文会介绍 DefaultMQPushConsumer 类的一些特色,大略会从:

  • DefaultMQPushConsumer 的外围参数
  • PUSH 模型音讯拉取机制
  • 音讯生产进度提交

这些方面动手。

DefaultMQPushConsumer 外围参数

String consumerGroup:生产组的名称,在 RocketMQ 中,对于生产组来说,一个生产组就是一个独立的隔离单位。

MessageModel messageModel:音讯组音讯生产模式,在 RocketMQ 中反对集群模式、播送模式。集群模式值得是一个生产组内多个消费者独特生产一个 Topic 中的音讯,即一条音讯只会被集群内的某一个消费者解决;而播送模式是指一个生产组内的每一个消费者负责 Topic 中的所有音讯。

ConsumeFromWhere consumeFromWhere:一个消费者首次启动时(即生产进度管理器中无奈查问到该生产组的进度)时从哪个地位开始生产的策略,可选值如下所示:

CONSUME_FROM_LAST_OFFSET:从最新的音讯开始生产。
CONSUME_FROM_FIRST_OFFSET:从最新的位点开始生产。
CONSUME_FROM_TIMESTAMP:从指定的工夫戳开始生产,这里的实现思路是从 Broker 服务器寻找音讯的存储工夫小于或等于指定工夫戳中最大的音讯偏移量的音讯,从这条音讯开始生产。

AllocateMessageQueueStrategy allocateMessageQueueStrategy:音讯队列负载算法。次要解决的问题是音讯生产队列在各个消费者之间的负载平衡策略,之前的文章 RocketMQ 学习一 -RocketMQ 初探已有介绍。

OffsetStore offsetStore:音讯进度存储管理器,该属性为公有属性,不能通过 API 进行批改,该参数次要是依据生产模式在外部主动创立,RocketMQ 在播送音讯、集群生产两种模式下音讯生产进度的存储策略会有所不同。

  • 集群模式:RocketMQ 会将音讯生产进度存储在 Broker 服务器,存储门路为 ${ROCKET_HOME}/store/config/ consumerOffset.json 文件中。(还记得如何定位 CommitLog 里的音讯吗?RocketMQ 学习十 - 消息日志文件及音讯检索)
  • 播送模式:RocketMQ 会将音讯生产进存在在生产端所在的机器上,存储门路为 ${user.home}/.rocketmq_offsets 中。


音讯生产进度,首先应用 topic@consumerGroup 为键,其值是一个 Map,键为 Topic 的队列序列,值为以后的音讯生产位点。

PUSH 模型音讯拉取机制

下图是 RocketMQ 音讯拉取执行模型

其外围关键点如下:

  1. 通过队列负载机制后,会调配给以后消费者一些队列,留神一个生产组能够订阅多个主题,正如下面 pullRequestQueue 中所示,topic_test、topic_test2 这两个主题都调配了一个队列。
  2. 轮流从 pullRequestQueue 中取出一个 PullRequest 对象,依据该对象中的拉取偏移量向 Broker 发动拉取申请,默认拉取 32 条,可通过 pullBatchSize 参数进行扭转,该办法不仅会返回音讯列表,还会返更改 PullRequest 对象中的下一次拉取的偏移量。
  3. 接管到 Broker 返回的音讯后,会首先放入 ProccessQueue(解决队列),该队列的内部结构为 TreeMap,key 寄存的是音讯在音讯生产队列(consumequeue)中的偏移量,而 value 为具体的音讯对象。
  4. 而后将拉取到的音讯提交到生产组外部的线程池,并立刻返回,并将 PullRequest 对象放入到 pullRequestQueue 中,而后取出下一个 PullRequest 对象持续反复音讯拉取的流程,从这里能够看出,音讯拉取与音讯生产是不同的线程。
  5. 音讯生产组线程池解决完一条音讯后,会将音讯从 ProccessQueue 中删除,而后会向 Broker 汇报音讯生产进度,以便下次重启时能从上一次生产的地位开始生产。

音讯生产进度提交

再说下 RocketMQ PUSH 模式的音讯生产进度提交机制。
通过上文的音讯生产拉取模型能够看出,音讯生产组线程池在解决完一条音讯后,会将音讯从 ProccessQueue 中移除,并向 Broker 汇报音讯生产进度,那请大家思考一下上面这个问题:

例如当初解决队列中有 5 条音讯,并且是线程池并发生产,那如果音讯偏移量为 3 的音讯(3:msg3)先于偏移量为 0、1、2 的音讯解决完,那向 Broker 如何汇报音讯生产进度呢?
如果提交 msg3 的偏移量为音讯生产进度,那汇报结束后如果消费者产生内存溢出等问题导致 JVM 异样退出,msg1 的音讯还未解决,而后重启消费者,因为音讯生产进度文件中存储的是 msg3 的音讯偏移量,会持续从 msg3 开始生产,会造成音讯失落。
RocketMQ 采取的形式是解决完 msg3 之后,会将 msg3 从音讯解决队列中移除,但在向 Broker 汇报音讯生产进度时是取 ProceeQueue 中最小的偏移量为音讯生产进度,即汇报的音讯生产进度是 0。


但如果呈现上图这种状况,也就是 0,1,3 都已生产完且从 ProcessQueue 移除,那如果咱们汇报的生产进度为 2,如果产生内存溢出等异常情况,消费者重新启动,会持续从音讯偏移量为 2 的音讯开始生产,msg3 就会被生产屡次,故 RocketMQ 不保障音讯反复生产,所以生产的幂等是须要业务方进行保障的。

再看下提交生产进度的流程图:

为了缩小消费者与 Broker 的网络交互,进步性能,提交音讯生产进度时会首 先存入到本地缓存表 中,而后定时上报到 Broker,同样 Broker 也会首先存储本地缓存表,而后定时刷写到磁盘。

正文完
 0