简介:POP Consumer—使客户端无状态,更轻量!
作者:凯易 & 耘田
前言:随着 RocketMQ 5.0 preview 的公布,5.0 的重大个性逐渐与大家见面。POP Consumer 作为 5.0 的一大个性,POP 生产模式展示了一种全新的生产模式。其具备的轻量级,无状态,无队列独占等特点,对于音讯积压场景,Streaming 生产场景等都十分敌对。在介绍 POP Consumer 之前,咱们先回顾一下目前应用较多的 Push Consumer。
Push Consumer
相熟 RocketMQ 的同学对 Push Consumer 必定不会生疏,客户端生产个别都会应用这种生产模式,应用这种生产模式也比较简单。咱们只需简略设置,并在回调办法 ConsumeMessage 中写好业务逻辑即可,启动客户端利用就能够失常生产音讯了。
public class PushConsumer {public static void main(String[] args) throws InterruptedException, MQClientException {DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("CID_JODIE_1");
consumer.subscribe("test_topic", "*");
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
consumer.start();
System.out.printf("Consumer Started.%n");
}
}
那么 Push Consumer 是如何生产音讯的呢?
当然,Consumer 收到音讯的前提是 Producer 先发消息发到 Topic 当中。Producer 应用轮询的形式别离向每个 Queue 中发送音讯,个别生产端都不止一个,客户端启动的时候会在 Topic,Consumer group 维度产生负载平衡,为每个客户端调配须要解决的 Queue。负载平衡过程中每个客户端都获取到全副的的 ConsumerID 和所有 Queue 并进行排序,每个客户端应用雷同负责平衡算法,例如平均分配的算法,这样每个客户端都会计算出本人须要生产那些 Queue,每当 Consumer 减少或缩小就会触发负载平衡,所以咱们能够通过 RocketMQ 负载平衡机制实现动静扩容,晋升客户端收发音讯能力。
这里有个小问题:能够始终减少客户端的数量晋升生产能力吗?当然不能够,因为 Queue 数量无限,客户端数量一旦达到 Queue 数量,再扩容新节点无奈晋升生产能力,因为会有节点调配不到 Queue 而无奈生产。
客户端负责平衡为客户端调配好 Queue 后,客户端会一直向 Broker 拉取音讯,在客户端进行生产。不是 Push 客户端吗?怎么会是客户端向 Broker 拉音讯,不应该是 Broker 推音讯到客户端吗?这是一个很有意思的点,因为 RocketMQ 无论是 Push Consumer,还是 Pull Consumer,还是前面要介绍的 POP Consumer,都是客户端拉的形式生产音讯。Push Consumer 只是通过客户端 API 层面的封装让咱们感觉是 Broker 推送的。
通过客户端负载平衡以及拉音讯,客户端就能够失常生产音讯了。
残缺的的 Push Consumer 解决逻辑能够看下下面这张图,咱们能够看到 Push Consumer 残缺解决流程。
首先客户端 Rebalance 确定哪些 Consumer 客户端解决哪些 Queue,而后通过 PullMessageService 服务拉取音讯,拉取到音讯当前 ConsumeMessageConcurrentlyService 提交生产申请到音讯生产线程池,而后调用回调办法 ConsumeMessage,到这里就能够拿到音讯解决业务了,最初生产胜利更新本地 offset 并上报 offset 到 Broker。如果生产失败(抛异样,超时等),客户端会发送 sendBack 通知 Broker 哪些音讯生产失败了,Broker 会将生产失败的音讯发送到延时队列,延时后再放到 retry Topic,客户端生产 retry Topic 实现音讯重投。这样做的益处是不会因为局部生产失败的音讯而影响失常音讯的生产。想理解细节的同学能够到 github 下载源码对照这张图看一下理论的代码解决流程。
通过后面 Push Consumer 的介绍,咱们对 Push Consumer 原理有了肯定的意识。咱们能够发现,RocketMQ 的客户端做了很多事件,负载平衡,拉音讯,生产位点治理,生产失败后的 sendBack 等等。这对多语言反对无疑是不敌对的。参加过多语言开发的同学应该会感同身受,将这么多的逻辑移植到不同的语言,必定不是一件简略的事件。同时客户端的降级运维也会减少难度。
所以咱们思考可不可为客户端瘦身,把一部分逻辑从客户端移到 Broker?当然是能够的,后面介绍 Push Consumer 客户端负责平衡的时候,咱们能够发现,负载平衡须要的信息,所有 ConsumerId,本来就是客户端从 Broker 获取的,所有 Queue 信息,Broker 也能够通过 nameServer 拿到,负责平衡算法在客户端还是 Broker 端调用也没有什么大的差别,所以把 Rebalance 移植到 Broker 是一个不错抉择,Broker 负载平衡能够跟客户端负责平衡达到基本相同的成果,客户端逻辑会缩小,多语言实现更加简略,后续降级运维也会更加可控。除此以外因为 Broker 绝对客户端具备全局信息,还能够做一些更有意思的事件。例如在负责平衡的时候依据 Queue 的积压状况做负载平衡,将一些压力比拟大的客户端上的 Queue 调配给其它客户端解决等等。
POP Consumer
通过后面 Push Consumer 的介绍,咱们理解到 Push Consumer 的一些特点。
- 队列独占:Broker 上的每个队列只能调配到雷同 Consumer group 的一台 Push Consumer 机器上。
- 生产后更新 offset:每次 Pull 申请拉取批量音讯到本地队列缓存,本地生产胜利才会 commit offset。
以上特点可能会带来一些问题,比方客户端异样机器 hang,导致调配队列音讯沉积,无奈生产。
RocketMQ 的 Push Consumer 生产对于机器异样 hang 时并不非常敌对。如果遇到客户端机器 hang 住,处于半死不活的状态,与 Broker 的心跳没有断掉的时候,客户端 Rebalance 仍然会调配生产队列到 hang 机器上,并且 hang 机器生产速度很慢甚至无奈生产的时候,会导致生产沉积。另外相似还有服务端 Broker 公布时,也会因为客户端屡次 Rebalance 导致生产提早影响等无奈防止的问题。如下图所示:image.gif
当 Push Consumer 2 机器产生 hang 的时候,它所调配到的 Broker 上的 Q2 呈现重大的沉积。咱们目前解决这种问题,个别可能是找到这台机器重启,或者下线。保障业务不受异样机器影响,然而如果队列挤压到肯定水平可能机器复原了也没方法疾速追赶生产进度,这也是受 Push Consumer 的能力限度。
咱们总结下 Push Consumer 存在的一些痛点问题:
- 富客户端,客户端逻辑比拟重,多语言反对不敌对;
- 客户端或者 Broker 降级公布,重启等 Rebalance 可能导致生产挤压;
- 队列占位,单队列与单 Consumer 绑定,单个 Queue 生产能力无奈横向扩大;
- 机器 hang,会导致挤压。
基于上述问题,RocketMQ 5.0 实现了全新的生产模型 -POP Consumer。
POP Consumer 可能解决上述稳定性和解除队列占位的扩大能力。
咱们上面来简略看一下 POP Consumer 是如何生产音讯的:
POP Client 从 Broker 的队列中收回 POP 申请音讯,Broker 返回音讯 message。在音讯的零碎属性外面有一个比拟重要的属性叫做 POP_CK,POP_CK 为一条音讯的 handler,通过一个 handler 就能够定位到一条音讯。当音讯生产胜利之后,POP client 发送 ackMessage 并传递 handler 向 broker 确认音讯生产胜利。
对于音讯的重试,当 POP 出一条音讯之后,这条音讯就会进入一个不可见的工夫,在这段时间就不会再被 POP 进去。如果没有在这段不可见工夫通过 ackMessage 确认音讯生产胜利,那么过了不可见工夫之后,这条音讯就会再一次的可见。
另外,对于音讯的重试,咱们的重试策略是一个梯度的延迟时间,重试的间隔时间是一个逐渐递增的。所以,还有一个 changeInvisibleTime 能够批改音讯的不可见工夫。
从图上能够看见,原本音讯会在两头这个工夫点再一次的可见的,然而咱们在可见之前提前应用 changeInvisibleTime 缩短了不可见工夫,让这条音讯的可见工夫推延了。当用户业务代码返回 reconsumeLater 或者抛异样的时候,咱们就能够通过 changeInvisibleTime 依照重试次数来批改下一次的可见工夫了。另外如果生产 RT 超过了 30 秒(默认值,能够批改),则 Broker 也会把音讯放到重试队列。
除此以外,POP 生产的位点是由 Broker 保留和管制,而且 POP 生产是能够多个 Client 生产同一个队列,如下图所示:
三个客户端并不需要 Rebalance 去调配 Queue,取而代之的是,它们都会应用 POP 申请所有的 Broker 获取音讯进行生产。即便 POP Consumer 2 呈现 hang,其内部消息也会让 POP Consumer1 和 POP Consumer3 进行生产。这样就解决了 hang 机器可能造成的生产沉积问题。
从整体流程可见,POP 生产能够防止 Rebalance 带来的生产延时,同时客户端能够生产 Broker 的所有队列,这样就能够防止机器 hang 而导致沉积的问题。
同时扩大能力晋升,POP Consumer 能够生产同一 Topic 下所有 Queue,相比 Push Consumer 解除了每个 Queue 必须 Rebalance 到一台客户端生产的限度,Push Consuner 客户端数量最多只能等于 Queue 的数量。POP Consumer 能够冲破这个限度,多个 POP Consumer 能够生产同一个 Queue。
Broker 实现
POP Consumer 在 Broker 端是如何实现的呢?
POP Consumer 拉取音讯后,会在 Queue 维度上加锁,保障同一时刻只有一个客户端能够拉去到同一个 Queue 的音讯。获取到音讯后,会保留 checkPoint 信息在 Broker,checkPoint 信息次要包含音讯的 Topic,ConsumerGroup,QueueId,offset,POPTime,msgCout,reviveQueueId 等信息。checkPoint 信息会优先保留到 buffer 当中,期待 ack 音讯,在一段时间内收到客户端回复的 ack 音讯,对应的 checkPoint 信息从 buffer 中移除,并且更新生产进度,标识音讯生产胜利。
当 checkPoint 音讯在 buffer 中期待一段时间,始终未等到 ack 音讯时,checkPoint 信息会清理出 buffer 并发送 ck msg 到 store,ck msg 首先被发送到延时队列 SCHEDULE_Topic_XXXX 中,延时实现当前会进入 REVIVE_LOG Topic,REVIVE_LOG Topic 是保留在 store 当中待处理的 ck msg 和 ack msg 的 Topic,POPReceiveService 拉取 REVIVE_LOG Topic 的音讯放到一个 map 当中,如果 ck 有对应的 ack 则会更新 REVIVE_LOG 的生产位点,标识音讯生产实现,超时未被确认的 ck msg,会查问到 ck msg 对应的实在的音讯,并把这个音讯放到 retry Topic 当中,期待客户端生产,POP Consumer 失常生产的时候会概率性的生产到 retry Topic 中的音讯。咱们从这块设计中能够看到 RocketMQ 的罕用设计,通过一些外部的 Topic 实现业务逻辑,事务音讯,定时音讯都用了这种设计形式。
咱们简略终结一下 POP Consumer 的劣势:
- 无状态,offset 信息 Broker 保护,客户端与 Queue 无绑定。
- 轻量级,客户端只须要收发音讯,确认音讯。
- 无队列占位,Queue 不再与客户端绑定。
- 语言敌对,不便多语言移植。
- 降级更可控,逻辑都收敛到 Broker,降级更加不便可控。
POP&Push 交融
既然 POP 有这么多劣势,咱们是否应用 POP 解决 Push 的一些问题呢?后面咱们提到 Push Consumer 当一个队列因为 Consumer 问题曾经沉积很多的时候,受限于单个 Consumer 的生产能力,也无奈疾速的追赶生产进度,提早会很高。外围问题是单队列单 Consumer 的限度,导致生产能力无奈横向扩大。
咱们心愿通过 POPAPI 的模式,当一个队列沉积太多的状况下,能够切换到 POP 模式,有机会让多个 Consumer 来一起生产该队列,追赶进度,咱们在 5.0 的实现中也实现了这一点。
POP/Push 模式切换形式
能够通过两种形式进行切换。
1、命令行
mqadmin setConsumeMode -c cluster -t topic -g group -m POP -n 8
2、代码切换
public static final String CONSUMER_GROUP = "CID_JODIE_1";
public static final String TOPIC = "TopicTest";
// Or use AdminTools directly: mqadmin setConsumeMode -c cluster -t topic -g group -m POP -n 8
private static void switchPop() throws Exception {DefaultMQAdminExt mqAdminExt = new DefaultMQAdminExt();
mqAdminExt.start();
ClusterInfo clusterInfo = mqAdminExt.examineBrokerClusterInfo();
Set<String> brokerAddrs = clusterInfo.getBrokerAddrTable().values().stream().map(BrokerData::selectBrokerAddr).collect(Collectors.toSet());
for (String brokerAddr : brokerAddrs) {mqAdminExt.setMessageRequestMode(brokerAddr, TOPIC, CONSUMER_GROUP, MessageRequestMode.POP, 8, 3_000);
}
}
通过上面 POP Consumer Demo,咱们看到 POP Consumer 跟 Push API 根本是对立,应用也比较简单,相比 Push API 只是多了一步生产模式切换。
Push & POP Retry 队列差别
在应用 POP 生产模式时咱们只须要在 Push API 的根底上切换模式即可,对于 Broker 来说还是须要做一些解决的。次要须要解决的中央是 retry 队列。
Push 和 POP 模式对 retry 队列解决不一样
- Push 的 retry 解决
服务端有一个 %RETRY%ConsumerGroup 队列
客户端会有拉取工作拉取这个队列的音讯。
- POP 的 retry 解决
服务端针对每个 Topic,都有一个名为 %RETRY%ConsumerGroup_Topic 的 retry 队列
客户端没有专门针对 retry 队列的拉工作,每次一般 POP 申请都有肯定概率生产相应的 retry 队列
模式切换之后,老模式的 retry 里的音讯还须要持续解决,否则就丢音讯了。
Push & POP 切换
Push 切换到 POP
- 失常队列切换到 POP 模式
- 失常队列的 POP 申请会解决对应的 POP retry 队列
- 针对 Push retry 队列,咱们保留原来 Push retry 队列的拉取工作,并且是工作在 Push 模式。
POP 切换到 Push
- 失常队列切换到 Push 模式
- Push retry 队列天然有相应的拉取工作
- 之前 POP 的 retry 队列,咱们在客户端主动创立拉取工作,以 Push 模式去拉取。留神这里的拉取工作只拉取 POP 的 retry 队列。
总结下来就是,对于 retry 队列,咱们会非凡解决不参加模式切换。
总结
最初咱们总结下 POP Consumer。POP 作为一种全新的生产模式,解决了 Push 模式的一些痛点,使客户端无状态,更加轻量,生产逻辑也根本都收敛到了 Broker,对多语言的反对非常的敌对。在 API 层面也与 Push 实现了交融,继承了 Push API 的简略易用,同时实现了 Push,POP 之间的自在切换。
原文链接
本文为阿里云原创内容,未经容许不得转载。