RocketMQ 消费者保障
- 作者: 博学谷狂野架构师
-
GitHub:GitHub 地址(有我精心筹备的 130 本电子书 PDF)
只分享干货、不吹水,让咱们一起加油!😄
音讯确认机制
consumer 的每个实例是靠队列调配来决定如何生产音讯的。那么生产进度具体是如何治理的,又是如何保障音讯胜利生产的?(RocketMQ 有保障音讯必定生产胜利的个性, 失败则重试)
什么是 ACK
音讯确认机制
在理论应用 RocketMQ 的时候咱们并不能保障每次发送的音讯都刚好能被消费者一次性失常生产胜利,可能会存在须要屡次生产能力胜利或者始终生产失败的状况,那作为发送者该做如何解决呢?
为了保证数据不被失落,RocketMQ 反对音讯确认机制,即 ack。发送者为了保障音讯必定生产胜利,只有应用方明确示意生产胜利,RocketMQ 才会认为音讯生产胜利。中途断电,抛出异样等都不会认为胜利——即都会从新投递。
保证数据能被正确处理而不仅仅是被 Consumer 收到,咱们就不能采纳 no-ack 或者 auto-ack,咱们须要手动 ack(manual-ack)。在数据处理实现后手动发送 ack,这个时候 Server 才将 Message 删除。
RocketMQ ACK
因为以上工作所有的机制都实现在 PushConsumer 中,所以本文的原理均只实用于 RocketMQ 中的 PushConsumer 即 Java 客户端中的DefaultPushConsumer
。若应用了 PullConsumer 模式,相似的工作如何 ack,如何保障生产等均须要应用方本人实现。
注:播送生产和集群生产的解决有局部区别,以下均特指集群生产(CLSUTER),播送(BROADCASTING)下局部可能不实用。
保障生产胜利
PushConsumer 为了保障音讯必定生产胜利,只有应用方明确示意生产胜利,RocketMQ 才会认为音讯生产胜利。中途断电,抛出异样等都不会认为胜利——即都会从新投递。
代码示例
生产的时候,咱们须要注入一个生产回调,具体 sample 代码如下:
COPYconsumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {System.out.println(Thread.currentThread().getName() + "Receive New Messages:" + msgs);
execute();// 执行真正生产
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
业务实现生产回调的时候,当且仅当此回调函数返回ConsumeConcurrentlyStatus.CONSUME_SUCCESS
,RocketMQ 才会认为这批音讯(默认是 1 条)是生产实现的。
如果这时候音讯生产失败,例如数据库异样,余额有余扣款失败等所有业务认为音讯须要重试的场景,只有返回ConsumeConcurrentlyStatus.RECONSUME_LATER
,RocketMQ 就会认为这批音讯生产失败了。
为了保障音讯是必定被至多生产胜利一次,RocketMQ 会把这批音讯重发回 Broker(topic 不是原 topic 而是这个生产租的 RETRY topic),在提早的某个工夫点(默认是 10 秒,业务可设置)后,再次投递到这个 ConsumerGroup。而如果始终这样反复生产都继续失败到肯定次数(默认 16 次),就会投递到 DLQ 死信队列。利用能够监控死信队列来做人工干预。
ACK 进度保留
启动的时候从哪里生产
当新实例启动的时候,PushConsumer 会拿到本生产组 broker 曾经记录好的生产进度(consumer offset),依照这个进度发动本人的第一次 Pull 申请。
如果这个生产进度在 Broker 并没有存储起来,证实这个是一个全新的生产组,这时候客户端有几个策略能够抉择:
COPYCONSUME_FROM_LAST_OFFSET // 默认策略,从该队列最尾开始生产,即跳过历史音讯
CONSUME_FROM_FIRST_OFFSET // 从队列最开始开始生产,即历史音讯(还贮存在 broker 的)全副生产一遍
CONSUME_FROM_TIMESTAMP// 从某个工夫点开始生产,和 setConsumeTimestamp()配合应用,默认是半个小时以前
所以,社区中常常有人问:“为什么我设了CONSUME_FROM_LAST_OFFSET
,历史的音讯还是被生产了”?起因就在于只有全新的生产组才会应用到这些策略,老的生产组都是按曾经存储过的生产进度持续生产。
对于老生产组想跳过历史音讯须要本身做过滤,或者应用先批改生产进度
音讯 ACK 生产进度
RocketMQ 是以 consumer group+queue 为单位是治理生产进度的,以一个 consumer offset 标记这个这个生产组在这条 queue 上的生产进度。
如果某已存在的生产组呈现了新生产实例的时候,依附这个组的生产进度,就能够判断第一次是从哪里开始拉取的,每次音讯胜利后,本地的生产进度会被更新,而后由定时器定时同步到 broker,以此长久化生产进度。
然而每次记录生产进度的时候,只会把一批音讯中最小的 offset 值为生产进度值,如下图:
这钟形式和传统的一条 message 独自 ack 的形式有实质的区别。性能上晋升的同时,会带来一个潜在的反复问题——因为生产进度只是记录了一个下标,就可能呈现拉取了 100 条音讯如 2101-2200 的音讯,前面 99 条都生产完结了,只有 2101 生产始终没有完结的状况。
在这种状况下,RocketMQ 为了保障音讯必定被生产胜利,生产进度职能维持在 2101,直到 2101 也生产完结了,本地的生产进度能力标记 2200 生产完结了(注:consumerOffset=2201)。
反复生产
在这种设计下,就有生产大量反复的危险。如 2101 在还没有生产实现的时候生产实例忽然退出(机器断电,或者被 kill)。这条 queue 的生产进度还是维持在 2101,当 queue 重新分配给新的实例的时候,新的实例从 broker 上拿到的生产进度还是维持在 2101,这时候就会又从 2101 开始生产,2102-2200 这批音讯实际上曾经被生产过还是会投递一次。
对于这个场景,RocketMQ 临时无能为力,所以业务必须要保障音讯生产的幂等性,这也是 RocketMQ 官网屡次强调的态度。
实际上,从源码的角度上看,RocketMQ 可能是思考过这个问题的,截止到 3.2.6 的版本的源码中,能够看到为了缓解这个问题的影响面,DefaultMQPushConsumer
中有个配置consumeConcurrentlyMaxSpan
COPY/**
* Concurrently max span offset.it has no effect on sequential consumption
*/
private int consumeConcurrentlyMaxSpan = 2000;
这个值默认是 2000,当 RocketMQ 发现本地缓存的音讯的最大值 - 最小值差距大于这个值(2000)的时候,会触发流控——也就是说如果头尾都卡住了局部音讯,达到了这个阈值就不再拉取音讯。
但作用理论很无限,像刚刚这个例子,2101 的生产是死循环,其余生产十分失常的话,是无能为力的。一旦退出,在不人工干预的状况下,2101 后所有音讯全副反复!
Ack 卡进度解决方案
实际上对于卡住进度的场景,能够抉择弃车保帅的计划:把音讯卡住那些音讯,先 ack 掉,让进度前移。但要保障这条音讯不会因而失落,ack 之前要把音讯 sendBack 回去,这样这条卡住的音讯就会必然反复,但会解决潜在的大量反复的场景。这也是咱们公司本人定制的解决方案。
局部源码如下:
COPYclass ConsumeRequestWithUnAck implements Runnable {
final ConsumeRequest consumeRequest;
final long resendAfterIfStillUnAck;// n 毫秒没有生产完,就重发
ConsumeRequestWithUnAck(ConsumeRequest consumeRequest,long resendAfterIfStillUnAck) {
this.consumeRequest = consumeRequest;
this.resendAfterIfStillUnAck = resendAfterIfStillUnAck;
}
@Override
public void run() {
// 每次生产前,打算延时工作,超时则 ack 并重发
final WeakReference<ConsumeRequest> crReff = new WeakReference<>(this.consumeRequest);
ScheduledFuture scheduledFuture=null;
if(!ConsumeDispatcher.this.ackAndResendScheduler.isShutdown()) {scheduledFuture= ConsumeDispatcher.this.ackAndResendScheduler.schedule(new ConsumeTooLongChecker(crReff),resendAfterIfStillUnAck,TimeUnit.MILLISECONDS);
}
try{this.consumeRequest.run();// 失常执行并更新 offset
}
finally {if (scheduledFuture != null) scheduledFuture.cancel(false);// 生产完结后, 勾销工作
}
}
}
- 定义了一个装璜器,把原来的 ConsumeRequest 对象包了一层。
-
装璜器中,每条音讯生产前都会调度一个调度器,定时触发,触发的时候如果发现音讯还存在,就执行 sendback 并 ack 的操作。
起初 RocketMQ 显然也发现了这个问题,RocketMQ 在 3.5.8 之后也是采纳这样的计划去解决这个问题。只是实现形式上有所不同(事实上我认为 RocketMQ 的计划还不够欠缺)
- 在 pushConsumer 中 有一个
consumeTimeout
字段(默认 15 分钟),用于设置最大的生产超时工夫。生产前会记录一个生产的开始工夫,前面用于比对。 - 消费者启动的时候,会定期扫描所有生产的音讯,达到这个 timeout 的那些音讯,就会触发 sendBack 并 ack 的操作。这里扫描的距离也是 consumeTimeout(单位分钟)的距离。
外围源码如下:
COPY//ConsumeMessageConcurrentlyService.java
public void start() {this.CleanExpireMsgExecutors.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {cleanExpireMsg();
}
}, this.defaultMQPushConsumer.getConsumeTimeout(), this.defaultMQPushConsumer.getConsumeTimeout(), TimeUnit.MINUTES);
}
//ConsumeMessageConcurrentlyService.java
private void cleanExpireMsg() {
Iterator<Map.Entry<MessageQueue, ProcessQueue>> it =
this.defaultMQPushConsumerImpl.getRebalanceImpl().getProcessQueueTable().entrySet().iterator();
while (it.hasNext()) {Map.Entry<MessageQueue, ProcessQueue> next = it.next();
ProcessQueue pq = next.getValue();
pq.cleanExpiredMsg(this.defaultMQPushConsumer);
}
}
//ProcessQueue.java
public void cleanExpiredMsg(DefaultMQPushConsumer pushConsumer) {if (pushConsumer.getDefaultMQPushConsumerImpl().isConsumeOrderly()) {return;}
int loop = msgTreeMap.size() < 16 ? msgTreeMap.size() : 16;
for (int i = 0; i < loop; i++) {
MessageExt msg = null;
try {this.lockTreeMap.readLock().lockInterruptibly();
try {if (!msgTreeMap.isEmpty() && System.currentTimeMillis() - Long.parseLong(MessageAccessor.getConsumeStartTimeStamp(msgTreeMap.firstEntry().getValue())) > pushConsumer.getConsumeTimeout() * 60 * 1000) {msg = msgTreeMap.firstEntry().getValue();} else {break;}
} finally {this.lockTreeMap.readLock().unlock();}
} catch (InterruptedException e) {log.error("getExpiredMsg exception", e);
}
try {pushConsumer.sendMessageBack(msg, 3);
log.info("send expire msg back. topic={}, msgId={}, storeHost={}, queueId={}, queueOffset={}", msg.getTopic(), msg.getMsgId(), msg.getStoreHost(), msg.getQueueId(), msg.getQueueOffset());
try {this.lockTreeMap.writeLock().lockInterruptibly();
try {if (!msgTreeMap.isEmpty() && msg.getQueueOffset() == msgTreeMap.firstKey()) {
try {msgTreeMap.remove(msgTreeMap.firstKey());
} catch (Exception e) {log.error("send expired msg exception", e);
}
}
} finally {this.lockTreeMap.writeLock().unlock();}
} catch (InterruptedException e) {log.error("getExpiredMsg exception", e);
}
} catch (Exception e) {log.error("send expired msg exception", e);
}
}
}
通过这个逻辑比照我定制的工夫,能够看出有几个不太欠缺的问题:
- 生产 timeout 的工夫十分不准确。因为扫描的距离是 15 分钟,所以实际上触发的时候,音讯是有可能卡住了靠近 30 分钟(15*2)才被清理。
- 因为定时器一启动就开始调度了,中途这个 consumeTimeout 再更新也不会失效。
音讯重试
程序音讯的重试
对于程序音讯,当消费者生产音讯失败后,音讯队列 RocketMQ 版会主动一直地进行音讯重试(每次间隔时间为 1 秒),这时,利用会呈现音讯生产被阻塞的状况。因而,建议您应用程序音讯时,务必保障利用可能及时监控并解决生产失败的状况,防止阻塞景象的产生。
无序音讯的重试
对于无序音讯(一般、定时、延时、事务音讯),当消费者生产音讯失败时,您能够通过设置返回状态达到音讯重试的后果。
无序音讯的重试只针对集群生产形式失效;播送形式不提供失败重试个性,即生产失败后,失败音讯不再重试,持续生产新的音讯。
留神 以下内容都只针对无序音讯失效。
重试次数
音讯队列 RocketMQ 版默认容许每条音讯最多重试 16 次,每次重试的间隔时间如下。
第几次重试 | 与上次重试的间隔时间 | 第几次重试 | 与上次重试的间隔时间 |
---|---|---|---|
1 | 10 秒 | 9 | 7 分钟 |
2 | 30 秒 | 10 | 8 分钟 |
3 | 1 分钟 | 11 | 9 分钟 |
4 | 2 分钟 | 12 | 10 分钟 |
5 | 3 分钟 | 13 | 20 分钟 |
6 | 4 分钟 | 14 | 30 分钟 |
7 | 5 分钟 | 15 | 1 小时 |
8 | 6 分钟 | 16 | 2 小时 |
如果音讯重试 16 次后依然失败,音讯将不再投递。如果严格依照上述重试工夫距离计算,某条音讯在始终生产失败的前提下,将会在接下来的 4 小时 46 分钟之内进行 16 次重试,超过这个工夫范畴音讯将不再重试投递。
留神 一条音讯无论重试多少次,这些重试音讯的 Message ID 不会扭转。
和生产端重试区别
消费者和生产者的重试还是有区别的,次要有两点
- 默认重试次数:Product 默认是 2 次,而 Consumer 默认是 16 次。
- 重试工夫距离:Product 是立即重试,而 Consumer 是有肯定工夫距离的 。它照
1S,5S,10S,30S,1M,2M····2H
进行重试。 - Product 在 异步 状况重试生效,而对于 Consumer 在 播送 状况下重试生效。
配置形式
重试配置形式
生产失败后,重试配置形式,集群生产形式下,音讯生产失败后冀望音讯重试,须要在音讯监听器接口的实现中明确进行配置(三种形式任选一种):
- 形式 1:返回 RECONSUME_LATER(举荐)
- 形式 2:返回 Null
- 形式 3:抛出异样
示例代码
COPY// 注册音讯监听器
consumer.registerMessageListener(new MessageListenerConcurrently() {public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext context) {
// 音讯解决逻辑抛出异样,音讯将重试。doConsumeMessage(list);
// 形式 1:返回 ConsumeConcurrentlyStatus.RECONSUME_LATER,音讯将重试。return ConsumeConcurrentlyStatus.RECONSUME_LATER;
// 形式 2:返回 null,音讯将重试。// return null;
// 形式 3:间接抛出异样,音讯将重试。// throw new RuntimeException("Consumer Message exception");
}
});
无需重试的配置形式
集群生产形式下,音讯失败后冀望音讯不重试,须要捕捉生产逻辑中可能抛出的异样,最终返回 Action.CommitMessage,尔后这条音讯将不会再重试。
示例代码
COPY// 注册音讯监听器
consumer.registerMessageListener(new MessageListenerConcurrently() {public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext context) {
// 音讯解决逻辑抛出异样,音讯将重试。try {doConsumeMessage(list);
}catch (Exception e){
// 捕捉生产逻辑中的所有异样,并返回 Action.CommitMessage;
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
// 业务方失常生产
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
获取音讯重试次数
消费者收到音讯后,可依照以下形式获取音讯的重试次数:
COPY// 注册音讯监听器
consumer.registerMessageListener(new MessageListenerConcurrently() {public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext context) {doConsumeMessage(list);
// 获取音讯重试次数
int retryTimes = list.get(0).getReconsumeTimes();
// 业务方失常生产
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
成果演示
生产端只发送一条音讯进行测试重试
生产端被动回绝
演示代码
COPYpublic class RetryConsumer {public static void main(String[] args) throws Exception {
// 创立一个音讯消费者,并设置一个音讯消费者组
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("rocket_test_consumer_group");
// 指定 NameServer 地址
consumer.setNamesrvAddr("127.0.0.1:9876");
// 设置 Consumer 第一次启动时从队列头部开始生产还是队列尾部开始生产
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
// 订阅指定 Topic 下的所有音讯
consumer.subscribe("topicTest", "*");
// 注册音讯监听器
consumer.registerMessageListener(new MessageListenerConcurrently() {public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext context) {if (list != null) {for (MessageExt ext : list) {
// 获取音讯重试次数
int retryTimes = ext.getReconsumeTimes();
try {String message = new String(ext.getBody(), RemotingHelper.DEFAULT_CHARSET);
System.out.println("Consumer- 线程名称 =[" + Thread.currentThread().getId() + "], 音讯重试次数:[" + retryTimes + "], 接管工夫:[" + new Date().getTime() + "], 音讯 =[" + message + "]");
} catch (UnsupportedEncodingException e) {e.printStackTrace();
}
}
}
// 业务被动回绝音讯
return ConsumeConcurrentlyStatus.RECONSUME_LATER;
}
});
// 消费者对象在应用之前必须要调用 start 初始化
consumer.start();
System.out.println("音讯消费者已启动");
}
}
重试音讯
生产端返回
ConsumeConcurrentlyStatus.RECONSUME_LATER
,生产端就会一直的重试。
COPYConsumer- 线程名称 =[35], 音讯重试次数:[0], 接管工夫:[1608117780264], 音讯 =[Hello Java demo RocketMQ]
Consumer- 线程名称 =[36], 音讯重试次数:[1], 接管工夫:[1608117790840], 音讯 =[Hello Java demo RocketMQ]
Consumer- 线程名称 =[37], 音讯重试次数:[2], 接管工夫:[1608117820876], 音讯 =[Hello Java demo RocketMQ]
异样重试
演示代码
这里的代码意思很显著: 被动抛出一个异样,而后如果超过 3 次,那么就不持续重试上来,而是将该条记录保留到数据库由人工来兜底。
COPYpublic class RetryConsumer {public static void main(String[] args) throws Exception {
// 创立一个音讯消费者,并设置一个音讯消费者组
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("rocket_test_consumer_group");
// 指定 NameServer 地址
consumer.setNamesrvAddr("127.0.0.1:9876");
// 设置 Consumer 第一次启动时从队列头部开始生产还是队列尾部开始生产
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
// 订阅指定 Topic 下的所有音讯
consumer.subscribe("topicTest", "*");
// 注册音讯监听器
consumer.registerMessageListener(new MessageListenerConcurrently() {public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext context) {if (list != null) {for (MessageExt ext : list) {
// 获取音讯重试次数
int retryTimes = ext.getReconsumeTimes();
try {String message = new String(ext.getBody(), RemotingHelper.DEFAULT_CHARSET);
System.out.println("Consumer- 线程名称 =[" + Thread.currentThread().getId() + "], 音讯重试次数:[" + retryTimes + "], 接管工夫:[" + new Date().getTime() + "], 音讯 =[" + message + "]");
// 这里设置重试大于 3 次 那么通过保留数据库 人工来兜底
if (retryTimes >= 2) {System.out.println("该音讯曾经重试 3 次, 保留数据库。topic=[" + ext.getTags() + "],keys=[" + ext.getKeys() + "],msg=[" + message + "]");
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
} catch (UnsupportedEncodingException e) {e.printStackTrace();
}
}
}
// 被动抛出异样
throw new RuntimeException("======= 这里出错了 ============");
// 业务方失常生产
//return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
// 消费者对象在应用之前必须要调用 start 初始化
consumer.start();
System.out.println("音讯消费者已启动");
}
}
重试音讯
COPYConsumer- 线程名称 =[36], 音讯重试次数:[0], 接管工夫:[1608118304600], 音讯 =[Hello Java demo RocketMQ]
Consumer- 线程名称 =[37], 音讯重试次数:[1], 接管工夫:[1608118315165], 音讯 =[Hello Java demo RocketMQ]
Consumer- 线程名称 =[38], 音讯重试次数:[2], 接管工夫:[1608118345191], 音讯 =[Hello Java demo RocketMQ]
该音讯曾经重试 3 次, 保留数据库。topic=[TagA],keys=[null],msg=[Hello Java demo RocketMQ]
超时重试
这里的超时异样并非真正意义上的超时,它指的是指获取音讯后,因为某种原因没有给 RocketMQ 返回生产的状态,即没有
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS
或return ConsumeConcurrentlyStatus.RECONSUME_LATER
那么 RocketMQ 会认为该音讯没有发送,会始终发送。因为它会认为该音讯基本就没有发送给消费者, 所以必定没生产。
演示代码
COPYpublic class RetryConsumer {public static void main(String[] args) throws Exception {
// 创立一个音讯消费者,并设置一个音讯消费者组
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("rocket_test_consumer_group");
// 指定 NameServer 地址
consumer.setNamesrvAddr("127.0.0.1:9876");
// 设置 Consumer 第一次启动时从队列头部开始生产还是队列尾部开始生产
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
// 订阅指定 Topic 下的所有音讯
consumer.subscribe("topicTest", "*");
// 注册音讯监听器
consumer.registerMessageListener(new MessageListenerConcurrently() {public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext context) {if (list != null) {for (MessageExt ext : list) {
// 获取音讯重试次数
int retryTimes = ext.getReconsumeTimes();
try {String message = new String(ext.getBody(), RemotingHelper.DEFAULT_CHARSET);
System.out.println("Consumer- 线程名称 =[" + Thread.currentThread().getId() + "], 音讯重试次数:[" + retryTimes + "], 接管工夫:[" + new Date().getTime() + "], 音讯 =[" + message + "]");
} catch (UnsupportedEncodingException e) {e.printStackTrace();
}
}
}
// 这里睡眠 60 秒
try {TimeUnit.SECONDS.sleep(60);
} catch (InterruptedException e) {e.printStackTrace();
}
System.out.println("休眠 60 秒 看还能不能走到这里...");
// 业务方失常生产
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
// 消费者对象在应用之前必须要调用 start 初始化
consumer.start();
System.out.println("音讯消费者已启动");
}
}
重试音讯
当取得 以后生产重试次数为 = 0 后 , 关掉该过程。再重新启动该过程,那么仍然可能获取该条音讯
COPYConsumer- 线程名称 =[33], 音讯重试次数:[0], 接管工夫:[1608118652598], 音讯 =[Hello Java demo RocketMQ]
重启消费者
COPYConsumer- 线程名称 =[28], 音讯重试次数:[0], 接管工夫:[1608118683304], 音讯 =[Hello Java demo RocketMQ]
休眠 60 秒 看还能不能走到这里...
重试音讯的解决
个别状况下咱们在理论生产中是不须要重试 16 次,这样既浪费时间又节约性能,实践受骗尝试反复次数达到咱们想要的后果时如果还是生产失败,那么咱们须要将对应的音讯进行记录,并且完结反复尝试。
COPYpublic class RetryConsumer {public static void main(String[] args) throws Exception {
// 创立一个音讯消费者,并设置一个音讯消费者组
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("rocket_test_consumer_group");
// 指定 NameServer 地址
consumer.setNamesrvAddr("127.0.0.1:9876");
// 设置 Consumer 第一次启动时从队列头部开始生产还是队列尾部开始生产
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
// 订阅指定 Topic 下的所有音讯
consumer.subscribe("topicTest", "*");
// 注册音讯监听器
consumer.registerMessageListener(new MessageListenerConcurrently() {public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext context) {if (list != null) {for (MessageExt ext : list) {
// 获取音讯重试次数
int retryTimes = ext.getReconsumeTimes();
try {String message = new String(ext.getBody(), RemotingHelper.DEFAULT_CHARSET);
System.out.println("Consumer- 线程名称 =[" + Thread.currentThread().getId() + "], 音讯重试次数:[" + retryTimes + "], 接管工夫:[" + new Date().getTime() + "], 音讯 =[" + message + "]");
// 这里设置重试大于 3 次 那么通过保留数据库 人工来兜底
if (retryTimes >= 2) {System.out.println("该音讯曾经重试 3 次, 保留数据库。topic=[" + ext.getTags() + "],keys=[" + ext.getKeys() + "],msg=[" + message + "]");
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
} catch (UnsupportedEncodingException e) {e.printStackTrace();
}
}
}
// 被动抛出异样
throw new RuntimeException("======= 这里出错了 ============");
// 业务方失常生产
//return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
// 消费者对象在应用之前必须要调用 start 初始化
consumer.start();
System.out.println("音讯消费者已启动");
}
}
所以任何异样都要捕捉返回 ConsumeConcurrentlyStatus.RECONSUME_LATER,rocketmq 会放到重试队列, 这个重试 TOPIC 的名字是 %RETRY%+consumergroup 的名字, 如下图:
留神点
- 如果业务的回调没有解决好而抛出异样,会认为是生产失败当 ConsumeConcurrentlyStatus.RECONSUME_LATER 解决。
- 当应用程序生产的回调 MessageListenerOrderly 时,因为程序生产是要前者生产胜利能力持续生产,所以没有 ConsumeConcurrentlyStatus.RECONSUME_LATER 的这个状态,只有 ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT 来暂停队列的其余生产,直到原音讯一直重试胜利为止能力持续生产。
RocketMQ 重试流程
重试队列与死信队列
在介绍 RocketMQ 的生产重试机制之前,须要先来说下“重试队列”和“死信队列”两个概念。
重试队列
如果 Consumer 端因为各种类型异样导致本次生产失败,为避免该音讯失落而须要将其从新回发给 Broker 端保留,保留这种因为异样无奈失常生产而回发给 MQ 的音讯队列称之为重试队列。RocketMQ 会为每个生产组都设置一个 Topic 名称为 “%RETRY%+consumerGroup”的重试队列(这里须要留神的是, 这个 Topic 的重试队列是针对生产组,而不是针对每个 Topic 设置的 ),用于临时保留因为各种异样而导致 Consumer 端无奈生产的音讯。思考到异样复原起来须要一些工夫,会为重试队列设置多个重试级别,每个重试级别都有与之对应的从新投递延时,重试次数越多投递延时就越大。RocketMQ 对于重试音讯的解决是先保留至 Topic 名称为“SCHEDULE_TOPIC_XXXX” 的提早队列中,后盾定时工作依照对应的工夫进行 Delay 后从新保留至 “%RETRY%+consumerGroup” 的重试队列中。
死信队列
因为有些起因导致 Consumer 端长时间的无奈失常生产从 Broker 端 Pull 过去的业务音讯,为了确保音讯不会被无端的抛弃,那么超过配置的“最大重试生产次数”后就会移入到这个死信队列中。在 RocketMQ 中,SubscriptionGroupConfig 配置常量默认地设置了两个参数,一个是 retryQueueNums 为 1(重试队列数量为 1 个),另外一个是 retryMaxTimes 为 16(最大重试生产的次数为 16 次)。Broker 端通过校验判断,如果超过了最大重试生产次数则会将音讯移至这里所说的死信队列。这里,RocketMQ 会为每个生产组都设置一个 Topic 命名为“%DLQ%+consumerGroup”的死信队列。个别在理论利用中,移入至死信队列的音讯,须要人工干预解决;
Consumer 端回发消息至 Broker 端
在业务工程中的 Consumer 端(Push 生产模式下),如果音讯可能失常生产须要在注册的音讯监听回调办法中返回 CONSUME_SUCCESS 的生产状态,否则因为各类异样生产失败则返回 RECONSUME_LATER 的生产状态。生产状态的枚举类型如下所示:
COPYpublic enum ConsumeConcurrentlyStatus {
// 业务方生产胜利
CONSUME_SUCCESS,
// 业务方生产失败,之后进行从新尝试生产
RECONSUME_LATER;
}
如果业务工程对音讯生产失败了,那么则会抛出异样并且返回这里的 RECONSUME_LATER 状态。这里,在生产音讯的服务线程—consumeMessageService 中,将封装好的音讯生产工作 ConsumeRequest 提交至线程池—consumeExecutor 异步执行。从音讯生产工作 ConsumeRequest 的 run()办法中会执行业务工程中注册的音讯监听回调办法,并在 processConsumeResult 办法中依据业务工程返回的状态(CONSUME_SUCCESS 或者 RECONSUME_LATER)进行判断和做对应的解决。
CONSUME_SUCCESS
业务方失常生产
失常状况下,设置 ackIndex 的值为 consumeRequest.getMsgs().size() – 1,因而前面的遍历 consumeRequest.getMsgs()音讯汇合条件不成立,不会调用回发生产失败音讯至 Broker 端的办法—sendMessageBack(msg, context)。最初,更新生产的偏移量;
RECONSUME_LATER
业务方生产失败
异常情况下,设置 ackIndex 的值为 -1,这时就会进入到遍历 consumeRequest.getMsgs()音讯汇合的 for 循环中,执行回发消息的办法—sendMessageBack(msg, context)。这里,首先会依据 brokerName 失去 Broker 端的地址信息,而后通过网络通信的 Remoting 模块发送 RPC 申请到指定的 Broker 上,如果上述过程失败,则创立一条新的音讯从新发送给 Broker,此时新音讯的 Topic 为 “%RETRY%+ConsumeGroupName”—重试队列的主题。其中,在 MQClientAPIImpl 实例的 consumerSendMessageBack() 办法中封装了 ConsumerSendMsgBackRequestHeader 的申请体,随后实现回发生产失败音讯的 RPC 通信申请(业务申请码为:CONSUMER_SEND_MSG_BACK)。假使下面的回发消息流程失败,则会提早 5S 后从新在 Consumer 端进行从新生产。与失常生产的状况一样,在最初更新生产的偏移量;
Broker 端对于回发消息解决的次要流程
Broker 端收到这条 Consumer 端回发过来的音讯后,通过业务申请码(CONSUMER_SEND_MSG_BACK)匹配业务处理器—SendMessageProcessor 来解决。在实现一系列的前置校验(这里次要是“生产分组是否存在”、“查看 Broker 是否有写入权限”、“查看重试队列数是否大于 0”等)后,尝试获取重试队列的 TopicConfig 对象(如果是第一次无奈获取到,则调用 createTopicInSendMessageBackMethod()办法进行创立)。依据回发过来的音讯偏移量尝试从 commitlog 日志文件中查问音讯内容,若不存在则返回异样谬误。
而后,设置重试队列的 Topic—“%RETRY%+consumerGroup”至 MessageExt 的扩大属性“RETRY_TOPIC”中,并对依据提早级别 delayLevel 和最大重试生产次数 maxReconsumeTimes 进行判断,如果超过最大重试生产次数(默认 16 次),则会创立死信队列的 TopicConfig 对象(用于前面将回发过来的音讯移入死信队列)。在构建实现须要落盘的 MessageExtBrokerInner 对象后,调用“commitLog.putMessage(msg)”办法做音讯长久化。这里,须要留神的是,在 putMessage(msg)的办法里会应用 “SCHEDULE_TOPIC_XXXX” 和对应的提早级别队列 Id 别离替换 MessageExtBrokerInner 对象的 Topic 和 QueueId 属性值,并将原来设置的重试队列主题(“%RETRY%+consumerGroup”)的 Topic 和 QueueId 属性值做一个备份别离存入扩大属性 properties 的“REAL_TOPIC”和“REAL_QID”属性中。看到这里也就大抵明确了,回发给 Broker 端的生产失败的音讯并非间接保留至重试队列中,而是会先存至 Topic 为 “SCHEDULE_TOPIC_XXXX” 的定时提早队列中。
疑难:下面说了 RocketMQ 的重试队列的 Topic 是“%RETRY%+consumerGroup”,为啥这里要保留至 Topic 是“SCHEDULE_TOPIC_XXXX”的这个提早队列中呢?
在源码中搜寻下关键字—“SCHEDULE_TOPIC_XXXX”,会发现 Broker 端还存在着一个后盾服务线程—ScheduleMessageService(通过音讯存储服务—DefaultMessageStore 启动),通过查看源码能够晓得其中有一个 DeliverDelayedMessageTimerTask 定时工作线程会依据 Topic(“SCHEDULE_TOPIC_XXXX”)与 QueueId,先查到逻辑生产队列 ConsumeQueue,而后依据偏移量,找到 ConsumeQueue 中的内存映射对象,从 commitlog 日志中找到音讯对象 MessageExt,并做一个音讯体的转换(messageTimeup()办法,由定时提早队列音讯转化为重试队列的音讯),再次做长久化落盘,这时候才会真正的保留至重试队列中。看到这里就能够解释下面的疑难了,定时提早队列只是为了用于暂存的,而后提早一段时间再将音讯移入至重试队列中。RocketMQ 设定不同的延时级别 delayLevel,并且与定时提早队列绝对应,具体源码如下:
COPY// 省略
private String messageDelayLevel = "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h";
/**
* 定时延时音讯主题的队列与提早等级对应关系
* @param delayLevel
* @return
*/
public static int delayLevel2QueueId(final int delayLevel) {return delayLevel - 1;}
Consumer 端生产重试机制
每个 Consumer 实例在启动的时候就默认订阅了该生产组的重试队列主题,DefaultMQPushConsumerImpl 的 copySubscription()办法中的相干代码如下:
COPYprivate void copySubscription() throws MQClientException {
// 省略其余代码...
switch (this.defaultMQPushConsumer.getMessageModel()) {
case BROADCASTING:
break;
case CLUSTERING:// 如果音讯生产模式为集群模式,还须要为该生产组对应一个重试主题
final String retryTopic = MixAll.getRetryTopic(this.defaultMQPushConsumer.getConsumerGroup());
SubscriptionData subscriptionData = FilterAPI.buildSubscriptionData(this.defaultMQPushConsumer.getConsumerGroup(),
retryTopic, SubscriptionData.SUB_ALL);
this.rebalanceImpl.getSubscriptionInner().put(retryTopic, subscriptionData);
break;
default:
break;
}
// 省略其余代码...
}
因而,这里也就分明了,Consumer 端会始终订阅该重试队列主题的音讯,向 Broker 端发送如下的拉取音讯的 PullRequest 申请,以尝试从新再次生产重试队列中积压的音讯。
COPYPullRequest [consumerGroup=CID_JODIE_1, messageQueue=MessageQueue [topic=%RETRY%CID_JODIE_1, brokerName=HQSKCJJIDRRD6KC, queueId=0], nextOffset=51]
最初,给出一张 RocketMQ 音讯重试机制的框图(ps:这里只是形容了音讯生产失败后重试拉取的局部重要过程):
本文由
传智教育博学谷狂野架构师
教研团队公布。如果本文对您有帮忙,欢送
关注
和点赞
;如果您有任何倡议也可留言评论
或私信
,您的反对是我保持创作的能源。转载请注明出处!