关于java:RocketMQ-消费者1概念和消费流程

6次阅读

共计 4837 个字符,预计需要花费 13 分钟才能阅读完成。

1. 背景

RocketMQ 的生产能够算是 RocketMQ 的业务逻辑中最简单的一块。这外面波及到许多生产模式和个性。本想一篇文章写完,写到前面发现生产波及到的内容太多,于是决定分多篇来写。本文作为生产系列的第一篇,次要讲述 RocketMQ 生产波及到的模式和个性,也会概括性地讲一下生产流程。

我将 RocketMQ 的生产流程大抵分成 4 个步骤

  1. 重均衡
  2. 消费者拉取音讯
  3. Broker 接管拉取申请后从存储中查问音讯并返回
  4. 消费者生产音讯

每个步骤都会用一篇文章来解说。

先理解一下 RocketMQ 生产波及到地概念

2. 概念简述

2.1 生产组概念与生产模式

和大多数音讯队列一样,RocketMQ 反对两种音讯模式:集群生产(Clustering)和播送生产(Broadcasting)。在理解它们之前,须要先引入生产组的概念。

2.1.1 生产组

一个消费者实例即是一个消费者过程,负责生产音讯。单个消费者速度无限,在理论应用中通常会采纳多个消费者独特生产同样的 Topic 以放慢生产速度。这多个生产同样 Topic 的消费者组成了消费者组。

生产组是一个逻辑概念,它蕴含了多个同一类的消费者实例,通常这些消费者都生产同一类音讯(都生产雷同的 Topic)且生产逻辑统一。

生产组的引入是用来在生产音讯时更好地进行负载平衡和容错。

2.1.2 播送生产模式(BROADCASTING)

播送生产模式即全副的音讯会播送散发到所有的消费者实例,每个消费者实例会收到全量的音讯(即使生产组中有多个消费者都订阅同一 Topic)。

如下图所示,生产者发送了 5 条音讯,每个生产组中的消费者都收到全副的 5 条音讯。

播送模式应用较少,适宜各个消费者都须要告诉的场景,如刷新利用中的缓存。

注意事项:

  1. 播送生产模式下不反对 程序音讯
  2. 播送生产模式下不反对 重置生产位点
  3. 每条音讯都须要 被雷同订阅逻辑的多台机器解决
  4. 生产进度在客户端保护,呈现反复生产的概率稍大于集群模式。如果生产进度文件失落,存在音讯失落的可能。
  5. 播送模式下,音讯队列 RocketMQ 版保障每条音讯至多被每台客户端生产一次,然而并 不会重投生产失败的音讯,因而业务方须要关注生产失败的状况。
  6. 播送模式下,客户端每一次重启都会从最新消息生产。客户端在被进行期间发送至服务端的音讯将会被主动跳过,请审慎抉择。
  7. 播送模式下,每条音讯都会被大量的客户端反复解决,因而举荐尽可能应用集群模式。
  8. 播送模式下服务端不保护生产进度,所以音讯队列 RocketMQ 版控制台不反对音讯沉积查问、音讯沉积报警和订阅关系查问性能。

2.1.3 集群生产模式(CLUSTERING)

集群生产模式下,同一 Topic 下的一条音讯只会被同一生产组中的一个消费者生产。也就是说,音讯被负载平衡到了同一个生产组的多个消费者实例上。

更具体一点,在同一生产组中的不同消费者会依据负载机制来均匀地订阅 Topic 中的每个 Queue。(默认 AVG 负载形式)

RocketMQ 默认应用集群生产模式,这也是大部分场景下会应用到的生产模式。

2.2 消费者拉取音讯模式

2.2.1 Pull

指消费者 被动拉取音讯 进行生产,被动从 Broker 拉取音讯,主动权由消费者利用管制。

2.2.2 Push

Broker 被动将音讯 Push 给消费者,Broker 收到音讯就会被动推送到消费者端。该模式的生产实时性较高,也是支流场景中广泛采纳的生产模式。

消费者组中的消费者实例会依据预设的负载平衡算法对 Topic 中的 Queue 进行平均的订阅,每个 Queue 最多只能被一个消费者订阅。

在 RocketMQ 中,Push 生产其实也是由 Pull 生产(拉取)实现。Push 生产只是通过客户端 API 层面的封装让用户感觉像是 Broker 在推送音讯给消费者。

2.2.3 POP

RocketMQ 5.0 引入的新生产模式,是 Pull 拉取的另一种实现。也能够在 Push 模式下应用 POP 拉取音讯,甚至能够和 Push 模式独特应用(别离生产重试 Topic 和一般 Topic)。

POP 与 Pull 能够通过一个开关实时进行切换。POP 模式下,Broker 来管制每个消费者生产的队列和拉取的音讯,把重均衡逻辑从客户端移到了服务端。

次要解决了原来 Push 模式生产的以下痛点:

  • 富客户端:客户端逻辑比拟重,多语言反对不敌对
  • 队列独占:Topic 中的一个 Queue 最多只能被 1 个 Push 消费者生产,消费者数量无奈有限扩大。且消费者 hang 住时该队列的音讯会沉积。
  • 生产后更新 offset:本地生产胜利才会提交 offset

RocketMQ 5.0 的轻量化 gRPC 客户端就是基于 POP 生产模式开发

2.3 队列负载机制与重均衡

在集群生产模式下,生产组中的消费者独特生产订阅的 Topic 中的所有音讯,这里就存在 Topic 中的队列如何调配给消费者的问题。

2.3.1 队列负载机制

RocketMQ Broker 中的队列负载机制将一个 Topic 的不同队列依照算法尽可能均匀地调配给消费者组中的所有消费者。RocketMQ 预设了多种负载算法供不同场景下的生产。

AVG:将队列按数量平均分配给多个消费者,按 Broker 程序先调配第一个 Broker 的所有队列给第一个消费者,而后给第二个。

AVG_BY_CIRCLE:将 Broker 上的队列轮流分给不同消费者,更实用于 Topic 在不同 Broker 之间散布不平均的状况。

默认采纳 AVG 负载形式。

2.3.2 重均衡(Rebalance)

为消费者调配队列生产的这一个负载过程并不是一劳永逸的,比方当消费者数量变动、Broker 掉线等状况产生后,原先的负载就变得不再平衡,此时就须要从新进行负载平衡,这一过程被称为重均衡机制。

每隔 20s,RocketMQ 会进行一次查看,查看队列数量、消费者数量是否发生变化,如果变动则触发生产队列重均衡,从新执行上述负载算法。

2.4 生产端高牢靠

2.4.1 重试 - 死信机制

在理论应用中,音讯的生产可能呈现失败。RocketMQ 领有重试机制和死信机制来保障音讯生产的可靠性。

  1. 失常生产:生产胜利则提交生产位点
  2. 重试机制:如果失常生产失败,音讯会被消费者发回 Broker,放入重试 Topic:%RETRY% 消费者组。最多重试生产 16 次,重试的工夫距离逐步变长。(消费者组会主动订阅重试 Topic)。

    这里地提早重试采纳了 RocketMQ 的提早音讯,重试的 16 次工夫距离为提早音讯配置的每个提早等级的工夫(从第三个等级开始)。如果批改提早等级工夫的配置,重试的工夫距离也会相应发生变化。但即使提早等级工夫距离配置有余 16 个,仍会重试 16 次,前面依照最大的工夫距离来重试。

  3. 死信机制:如果失常生产和重试 16 次均失败,音讯会保留到死信 Topic %DLQ% 消费者组 中,此时需人工染指解决

2.4.2 队列负载机制与重均衡

当产生 Broker 挂掉或者消费者挂掉时,会引发重均衡,能够主动感知有组件挂掉的状况并从新调整消费者的订阅关系。

2.5 并发生产与程序生产

在消费者客户端生产时,有两种订阅音讯的形式,别离是并发生产和程序生产。播送模式不反对程序生产,仅有集群模式能应用程序生产。

须要留神的是,这里所说的程序生产指的是队列维度的程序,即在生产一个队列时,生产音讯的程序和音讯发送的程序统一。如果一个 Topic 有多个队列,是不可能达成 Topic 级别的程序生产的,因为无法控制哪个队列的音讯被先生产。Topic 只有一个队列的状况下可能实现 Topic 级别的程序生产。

具体程序生产和生产代码见 官网文档。

程序生产的形式为串行生产,并在生产时指定队列。

并发生产的形式是调用消费者的指定 MessageListenerConcurrently 作为生产的回调类,程序生产则应用 MessageListenerOrderly 类进行回调。解决这两种生产形式的生产服务也不同,别离是 ConsumeMessageConcurrentlyServiceConsumeMessageOrderlyService

程序生产的大抵原理是依附两组锁,一组在 Broker 端(Broker 锁),锁定队列和消费者的关系,保障同一时间只有一个消费者在生产;在消费者端也有一组锁(生产队列锁)以保障生产的程序性。

2.6 生产进度保留和提交

消费者生产一批音讯实现之后,须要保留生产进度。如果是集群生产模式,还须要将生产进度让其余消费者晓得,所以须要提交生产进度。这样在消费者重启或队列重均衡时能够依据生产进度持续生产。

不同模式下生产进度保留形式的不同:

  1. 播送模式:保留在 消费者本地。因为每个消费者都须要生产全量音讯音讯。在 LocalfileOffsetStore 当中。
  2. 集群模式:保留在 Broker,同时消费者端缓存。因为一个 Topic 的音讯只有被消费者组中的一个消费者生产即可,所以音讯的生产进度须要对立保留。通过 RemoteBrokerOffsetStore 存储。

集群模式下,消费者端有定时工作,定时将内存中的生产进度提交到 Broker,Broker 也有定时工作将内存中的生产偏移量长久化到磁盘。此外,消费者向 Broker 拉取音讯时也会提交生产偏移量。留神,消费者线程池提交的偏移量是线程池生产的这一批音讯中偏移量最小的音讯的偏移量。

  1. 生产完一批音讯后将音讯生产进度存在本地内存
  2. 消费者中有一个定时线程,每 5s 将内存中所有队列的生产偏移量提交到 Broker
  3. Broker 收到生产进度先缓存到内存,有一个定时工作每隔 5s 将音讯偏移量长久化到磁盘
  4. 消费者向 Broker 拉取音讯时也会将队列的音讯偏移量提交到 Broker

3. 生产流程

这张图是阿里云的文章解说生产时用到的,可能清晰地示意客户端 Push 模式并发生产流程。

从左上角第一个方框开始看

  1. 消费者启动时唤醒重均衡服务 RebalanceService,重均衡服务是客户端开始生产的终点。
  2. 重均衡服务会周期性(每 20s)执行重均衡办法 doRebalance),查问所有注册的 Broker,依据注册的 Broker 数量为本身调配负载的队列 rebalanceByTopic()
  3. 调配完队列后,会为每个调配到的新队列创立一个音讯拉取申请 pullRequest,这个拉取申请中保留一个解决队列 processQueue,即图中的红黑树(TreeMap),用来保留拉取到的音讯。红黑树保留音讯的程序。
  4. 音讯拉取线程利用生产 - 生产模式,用一个线程从拉取申请队列 pullRequestQueue 中弹出拉取申请,执行拉取工作,将拉取到的音讯放入解决队列。
  5. 拉取申请在一次拉取音讯实现之后会复用,从新被放入拉取申请队列 pullRequestQueue
  6. 拉取实现后,在 NettyClientPublicExecutorThreadPool 线程池异步处理结果,将拉取到的音讯放入解决队列,而后调用 consumeMessageService.submitConsumeRequest,将解决队列和 多个生产工作提交到生产线程池。每个生产工作生产 1 批音讯(1 批默认为 1 条)
  7. 每个消费者都有一个生产线程池 consumeMessageThreadPool,默认有 20 个生产线程。
  8. 生产线程池的每个生产线程会尝试从生产工作队列中获取生产申请,执行生产业务逻辑 listener.consumeMessage
  9. 生产实现后,如果生产胜利,则更新偏移量 updateOffset(先更新到内存 offsetTable,定时上报到 Broker。Broker 端也先放到内存,定时刷盘)。

参考资料

  • 官网文档——设计
  • RocketMQ 实战与进阶——丁威
  • RocketMQ 生产音讯——白云鹏
  • 消息中间件—RocketMQ 音讯生产(一)——癫狂侠
  • RocketMQ 音讯承受流程——赵坤
  • RocketMQ 音讯生产——贝贝猫
  • RocketMQ 5.0 POP 生产模式探秘
  • RocketMQ 音讯生产源码剖析
  • Rocketmq 生产音讯原理——服务端技术栈
  • RocketMQ——4. Consumer 生产音讯——Kong

    本文由博客一文多发平台 OpenWrite 公布!

正文完
 0