RocketMQ消费者保障
- 作者: 博学谷狂野架构师
GitHub:GitHub地址 (有我精心筹备的130本电子书PDF)
只分享干货、不吹水,让咱们一起加油!
音讯确认机制
consumer的每个实例是靠队列调配来决定如何生产音讯的。那么生产进度具体是如何治理的,又是如何保障音讯胜利生产的?(RocketMQ有保障音讯必定生产胜利的个性,失败则重试)
什么是ACK
音讯确认机制
在理论应用RocketMQ的时候咱们并不能保障每次发送的音讯都刚好能被消费者一次性失常生产胜利,可能会存在须要屡次生产能力胜利或者始终生产失败的状况,那作为发送者该做如何解决呢?
为了保证数据不被失落,RocketMQ反对音讯确认机制,即ack。发送者为了保障音讯必定生产胜利,只有应用方明确示意生产胜利,RocketMQ才会认为音讯生产胜利。中途断电,抛出异样等都不会认为胜利——即都会从新投递。
保证数据能被正确处理而不仅仅是被Consumer收到,咱们就不能采纳no-ack或者auto-ack,咱们须要手动ack(manual-ack)。在数据处理实现后手动发送ack,这个时候Server才将Message删除。
RocketMQ ACK
因为以上工作所有的机制都实现在PushConsumer中,所以本文的原理均只实用于RocketMQ中的PushConsumer即Java客户端中的DefaultPushConsumer
。 若应用了PullConsumer模式,相似的工作如何ack,如何保障生产等均须要应用方本人实现。
注:播送生产和集群生产的解决有局部区别,以下均特指集群生产(CLSUTER),播送(BROADCASTING)下局部可能不实用。
保障生产胜利
PushConsumer为了保障音讯必定生产胜利,只有应用方明确示意生产胜利,RocketMQ才会认为音讯生产胜利。中途断电,抛出异样等都不会认为胜利——即都会从新投递。
代码示例
生产的时候,咱们须要注入一个生产回调,具体sample代码如下:
COPYconsumer.registerMessageListener(new MessageListenerConcurrently() { @Override public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) { System.out.println(Thread.currentThread().getName() + " Receive New Messages: " + msgs); execute();//执行真正生产 return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } });
业务实现生产回调的时候,当且仅当此回调函数返回ConsumeConcurrentlyStatus.CONSUME_SUCCESS
,RocketMQ才会认为这批音讯(默认是1条)是生产实现的。
如果这时候音讯生产失败,例如数据库异样,余额有余扣款失败等所有业务认为音讯须要重试的场景,只有返回ConsumeConcurrentlyStatus.RECONSUME_LATER
,RocketMQ就会认为这批音讯生产失败了。
为了保障音讯是必定被至多生产胜利一次,RocketMQ会把这批音讯重发回Broker(topic不是原topic而是这个生产租的RETRY topic),在提早的某个工夫点(默认是10秒,业务可设置)后,再次投递到这个ConsumerGroup。而如果始终这样反复生产都继续失败到肯定次数(默认16次),就会投递到DLQ死信队列。利用能够监控死信队列来做人工干预。
ACK进度保留
启动的时候从哪里生产
当新实例启动的时候,PushConsumer会拿到本生产组broker曾经记录好的生产进度(consumer offset),依照这个进度发动本人的第一次Pull申请。
如果这个生产进度在Broker并没有存储起来,证实这个是一个全新的生产组,这时候客户端有几个策略能够抉择:
COPYCONSUME_FROM_LAST_OFFSET //默认策略,从该队列最尾开始生产,即跳过历史音讯CONSUME_FROM_FIRST_OFFSET //从队列最开始开始生产,即历史音讯(还贮存在broker的)全副生产一遍CONSUME_FROM_TIMESTAMP//从某个工夫点开始生产,和setConsumeTimestamp()配合应用,默认是半个小时以前
所以,社区中常常有人问:“为什么我设了CONSUME_FROM_LAST_OFFSET
,历史的音讯还是被生产了”? 起因就在于只有全新的生产组才会应用到这些策略,老的生产组都是按曾经存储过的生产进度持续生产。
对于老生产组想跳过历史音讯须要本身做过滤,或者应用先批改生产进度
音讯ACK生产进度
RocketMQ是以consumer group+queue为单位是治理生产进度的,以一个consumer offset标记这个这个生产组在这条queue上的生产进度。
如果某已存在的生产组呈现了新生产实例的时候,依附这个组的生产进度,就能够判断第一次是从哪里开始拉取的,每次音讯胜利后,本地的生产进度会被更新,而后由定时器定时同步到broker,以此长久化生产进度。
然而每次记录生产进度的时候,只会把一批音讯中最小的offset值为生产进度值,如下图:
这钟形式和传统的一条message独自ack的形式有实质的区别。性能上晋升的同时,会带来一个潜在的反复问题——因为生产进度只是记录了一个下标,就可能呈现拉取了100条音讯如 2101-2200的音讯,前面99条都生产完结了,只有2101生产始终没有完结的状况。
在这种状况下,RocketMQ为了保障音讯必定被生产胜利,生产进度职能维持在2101,直到2101也生产完结了,本地的生产进度能力标记2200生产完结了(注:consumerOffset=2201)。
反复生产
在这种设计下,就有生产大量反复的危险。如2101在还没有生产实现的时候生产实例忽然退出(机器断电,或者被kill)。这条queue的生产进度还是维持在2101,当queue重新分配给新的实例的时候,新的实例从broker上拿到的生产进度还是维持在2101,这时候就会又从2101开始生产,2102-2200这批音讯实际上曾经被生产过还是会投递一次。
对于这个场景,RocketMQ临时无能为力,所以业务必须要保障音讯生产的幂等性,这也是RocketMQ官网屡次强调的态度。
实际上,从源码的角度上看,RocketMQ可能是思考过这个问题的,截止到3.2.6的版本的源码中,能够看到为了缓解这个问题的影响面,DefaultMQPushConsumer
中有个配置consumeConcurrentlyMaxSpan
COPY/** * Concurrently max span offset.it has no effect on sequential consumption */private int consumeConcurrentlyMaxSpan = 2000;
这个值默认是2000,当RocketMQ发现本地缓存的音讯的最大值-最小值差距大于这个值(2000)的时候,会触发流控——也就是说如果头尾都卡住了局部音讯,达到了这个阈值就不再拉取音讯。
但作用理论很无限,像刚刚这个例子,2101的生产是死循环,其余生产十分失常的话,是无能为力的。一旦退出,在不人工干预的状况下,2101后所有音讯全副反复!
Ack卡进度解决方案
实际上对于卡住进度的场景,能够抉择弃车保帅的计划:把音讯卡住那些音讯,先ack掉,让进度前移。但要保障这条音讯不会因而失落,ack之前要把音讯sendBack回去,这样这条卡住的音讯就会必然反复,但会解决潜在的大量反复的场景。 这也是咱们公司本人定制的解决方案。
局部源码如下:
COPYclass ConsumeRequestWithUnAck implements Runnable { final ConsumeRequest consumeRequest; final long resendAfterIfStillUnAck;//n毫秒没有生产完,就重发 ConsumeRequestWithUnAck(ConsumeRequest consumeRequest,long resendAfterIfStillUnAck) { this.consumeRequest = consumeRequest; this.resendAfterIfStillUnAck = resendAfterIfStillUnAck; } @Override public void run() { //每次生产前,打算延时工作,超时则ack并重发 final WeakReference<ConsumeRequest> crReff = new WeakReference<>(this.consumeRequest); ScheduledFuture scheduledFuture=null; if(!ConsumeDispatcher.this.ackAndResendScheduler.isShutdown()) { scheduledFuture= ConsumeDispatcher.this.ackAndResendScheduler.schedule(new ConsumeTooLongChecker(crReff),resendAfterIfStillUnAck,TimeUnit.MILLISECONDS); } try{ this.consumeRequest.run();//失常执行并更新offset } finally { if (scheduledFuture != null) scheduledFuture.cancel(false);//生产完结后,勾销工作 } } }
- 定义了一个装璜器,把原来的ConsumeRequest对象包了一层。
装璜器中,每条音讯生产前都会调度一个调度器,定时触发,触发的时候如果发现音讯还存在,就执行sendback并ack的操作。
起初RocketMQ显然也发现了这个问题,RocketMQ在3.5.8之后也是采纳这样的计划去解决这个问题。只是实现形式上有所不同(事实上我认为RocketMQ的计划还不够欠缺)
- 在pushConsumer中 有一个
consumeTimeout
字段(默认15分钟),用于设置最大的生产超时工夫。生产前会记录一个生产的开始工夫,前面用于比对。 - 消费者启动的时候,会定期扫描所有生产的音讯,达到这个timeout的那些音讯,就会触发sendBack并ack的操作。这里扫描的距离也是consumeTimeout(单位分钟)的距离。
外围源码如下:
COPY//ConsumeMessageConcurrentlyService.javapublic void start() { this.CleanExpireMsgExecutors.scheduleAtFixedRate(new Runnable() { @Override public void run() { cleanExpireMsg(); } }, this.defaultMQPushConsumer.getConsumeTimeout(), this.defaultMQPushConsumer.getConsumeTimeout(), TimeUnit.MINUTES);}//ConsumeMessageConcurrentlyService.javaprivate void cleanExpireMsg() { Iterator<Map.Entry<MessageQueue, ProcessQueue>> it = this.defaultMQPushConsumerImpl.getRebalanceImpl().getProcessQueueTable().entrySet().iterator(); while (it.hasNext()) { Map.Entry<MessageQueue, ProcessQueue> next = it.next(); ProcessQueue pq = next.getValue(); pq.cleanExpiredMsg(this.defaultMQPushConsumer); }} //ProcessQueue.javapublic void cleanExpiredMsg(DefaultMQPushConsumer pushConsumer) { if (pushConsumer.getDefaultMQPushConsumerImpl().isConsumeOrderly()) { return; } int loop = msgTreeMap.size() < 16 ? msgTreeMap.size() : 16; for (int i = 0; i < loop; i++) { MessageExt msg = null; try { this.lockTreeMap.readLock().lockInterruptibly(); try { if (!msgTreeMap.isEmpty() && System.currentTimeMillis() - Long.parseLong(MessageAccessor.getConsumeStartTimeStamp(msgTreeMap.firstEntry().getValue())) > pushConsumer.getConsumeTimeout() * 60 * 1000) { msg = msgTreeMap.firstEntry().getValue(); } else { break; } } finally { this.lockTreeMap.readLock().unlock(); } } catch (InterruptedException e) { log.error("getExpiredMsg exception", e); } try { pushConsumer.sendMessageBack(msg, 3); log.info("send expire msg back. topic={}, msgId={}, storeHost={}, queueId={}, queueOffset={}", msg.getTopic(), msg.getMsgId(), msg.getStoreHost(), msg.getQueueId(), msg.getQueueOffset()); try { this.lockTreeMap.writeLock().lockInterruptibly(); try { if (!msgTreeMap.isEmpty() && msg.getQueueOffset() == msgTreeMap.firstKey()) { try { msgTreeMap.remove(msgTreeMap.firstKey()); } catch (Exception e) { log.error("send expired msg exception", e); } } } finally { this.lockTreeMap.writeLock().unlock(); } } catch (InterruptedException e) { log.error("getExpiredMsg exception", e); } } catch (Exception e) { log.error("send expired msg exception", e); } }}
通过这个逻辑比照我定制的工夫,能够看出有几个不太欠缺的问题:
- 生产timeout的工夫十分不准确。因为扫描的距离是15分钟,所以实际上触发的时候,音讯是有可能卡住了靠近30分钟(15*2)才被清理。
- 因为定时器一启动就开始调度了,中途这个consumeTimeout再更新也不会失效。
音讯重试
程序音讯的重试
对于程序音讯,当消费者生产音讯失败后,音讯队列RocketMQ版会主动一直地进行音讯重试(每次间隔时间为1秒),这时,利用会呈现音讯生产被阻塞的状况。因而,建议您应用程序音讯时,务必保障利用可能及时监控并解决生产失败的状况,防止阻塞景象的产生。
无序音讯的重试
对于无序音讯(一般、定时、延时、事务音讯),当消费者生产音讯失败时,您能够通过设置返回状态达到音讯重试的后果。
无序音讯的重试只针对集群生产形式失效;播送形式不提供失败重试个性,即生产失败后,失败音讯不再重试,持续生产新的音讯。
留神 以下内容都只针对无序音讯失效。
重试次数
音讯队列RocketMQ版默认容许每条音讯最多重试16次,每次重试的间隔时间如下。
第几次重试 | 与上次重试的间隔时间 | 第几次重试 | 与上次重试的间隔时间 |
---|---|---|---|
1 | 10秒 | 9 | 7分钟 |
2 | 30秒 | 10 | 8分钟 |
3 | 1分钟 | 11 | 9分钟 |
4 | 2分钟 | 12 | 10分钟 |
5 | 3分钟 | 13 | 20分钟 |
6 | 4分钟 | 14 | 30分钟 |
7 | 5分钟 | 15 | 1小时 |
8 | 6分钟 | 16 | 2小时 |
如果音讯重试16次后依然失败,音讯将不再投递。如果严格依照上述重试工夫距离计算,某条音讯在始终生产失败的前提下,将会在接下来的4小时46分钟之内进行16次重试,超过这个工夫范畴音讯将不再重试投递。
留神 一条音讯无论重试多少次,这些重试音讯的Message ID不会扭转。
和生产端重试区别
消费者和生产者的重试还是有区别的,次要有两点
- 默认重试次数:Product默认是2次,而Consumer默认是16次。
- 重试工夫距离:Product是立即重试,而Consumer是有肯定工夫距离的。它照
1S,5S,10S,30S,1M,2M····2H
进行重试。 - Product在异步状况重试生效,而对于Consumer在播送状况下重试生效。
配置形式
重试配置形式
生产失败后,重试配置形式,集群生产形式下,音讯生产失败后冀望音讯重试,须要在音讯监听器接口的实现中明确进行配置(三种形式任选一种):
- 形式1:返回RECONSUME_LATER(举荐)
- 形式2:返回Null
- 形式3:抛出异样
示例代码
COPY//注册音讯监听器consumer.registerMessageListener(new MessageListenerConcurrently() { public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext context) { //音讯解决逻辑抛出异样,音讯将重试。 doConsumeMessage(list); //形式1:返回ConsumeConcurrentlyStatus.RECONSUME_LATER,音讯将重试。 return ConsumeConcurrentlyStatus.RECONSUME_LATER; //形式2:返回null,音讯将重试。 // return null; //形式3:间接抛出异样,音讯将重试。 // throw new RuntimeException("Consumer Message exception"); }});
无需重试的配置形式
集群生产形式下,音讯失败后冀望音讯不重试,须要捕捉生产逻辑中可能抛出的异样,最终返回Action.CommitMessage,尔后这条音讯将不会再重试。
示例代码
COPY//注册音讯监听器consumer.registerMessageListener(new MessageListenerConcurrently() { public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext context) { //音讯解决逻辑抛出异样,音讯将重试。 try { doConsumeMessage(list); }catch (Exception e){ //捕捉生产逻辑中的所有异样,并返回Action.CommitMessage; return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } //业务方失常生产 return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; }});
获取音讯重试次数
消费者收到音讯后,可依照以下形式获取音讯的重试次数:
COPY//注册音讯监听器consumer.registerMessageListener(new MessageListenerConcurrently() { public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext context) { doConsumeMessage(list); //获取音讯重试次数 int retryTimes = list.get(0).getReconsumeTimes(); //业务方失常生产 return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; }});
成果演示
生产端只发送一条音讯进行测试重试
生产端被动回绝
演示代码
COPYpublic class RetryConsumer { public static void main(String[] args) throws Exception { //创立一个音讯消费者,并设置一个音讯消费者组 DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("rocket_test_consumer_group"); //指定 NameServer 地址 consumer.setNamesrvAddr("127.0.0.1:9876"); //设置 Consumer 第一次启动时从队列头部开始生产还是队列尾部开始生产 consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET); //订阅指定 Topic 下的所有音讯 consumer.subscribe("topicTest", "*"); //注册音讯监听器 consumer.registerMessageListener(new MessageListenerConcurrently() { public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext context) { if (list != null) { for (MessageExt ext : list) { //获取音讯重试次数 int retryTimes = ext.getReconsumeTimes(); try { String message = new String(ext.getBody(), RemotingHelper.DEFAULT_CHARSET); System.out.println("Consumer-线程名称=[" + Thread.currentThread().getId() + "],音讯重试次数:[" + retryTimes + "],接管工夫:[" + new Date().getTime() + "],音讯=[" + message + "]"); } catch (UnsupportedEncodingException e) { e.printStackTrace(); } } } //业务被动回绝音讯 return ConsumeConcurrentlyStatus.RECONSUME_LATER; } }); // 消费者对象在应用之前必须要调用 start 初始化 consumer.start(); System.out.println("音讯消费者已启动"); }}
重试音讯
生产端返回 ConsumeConcurrentlyStatus.RECONSUME_LATER
,生产端就会一直的重试。
COPYConsumer-线程名称=[35],音讯重试次数:[0],接管工夫:[1608117780264],音讯=[Hello Java demo RocketMQ ]Consumer-线程名称=[36],音讯重试次数:[1],接管工夫:[1608117790840],音讯=[Hello Java demo RocketMQ ]Consumer-线程名称=[37],音讯重试次数:[2],接管工夫:[1608117820876],音讯=[Hello Java demo RocketMQ ]
异样重试
演示代码
这里的代码意思很显著: 被动抛出一个异样,而后如果超过3次,那么就不持续重试上来,而是将该条记录保留到数据库由人工来兜底。
COPYpublic class RetryConsumer { public static void main(String[] args) throws Exception { //创立一个音讯消费者,并设置一个音讯消费者组 DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("rocket_test_consumer_group"); //指定 NameServer 地址 consumer.setNamesrvAddr("127.0.0.1:9876"); //设置 Consumer 第一次启动时从队列头部开始生产还是队列尾部开始生产 consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET); //订阅指定 Topic 下的所有音讯 consumer.subscribe("topicTest", "*"); //注册音讯监听器 consumer.registerMessageListener(new MessageListenerConcurrently() { public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext context) { if (list != null) { for (MessageExt ext : list) { //获取音讯重试次数 int retryTimes = ext.getReconsumeTimes(); try { String message = new String(ext.getBody(), RemotingHelper.DEFAULT_CHARSET); System.out.println("Consumer-线程名称=[" + Thread.currentThread().getId() + "],音讯重试次数:[" + retryTimes + "],接管工夫:[" + new Date().getTime() + "],音讯=[" + message + "]"); //这里设置重试大于3次 那么通过保留数据库 人工来兜底 if (retryTimes >= 2) { System.out.println("该音讯曾经重试3次,保留数据库。topic=[" + ext.getTags() + "],keys=[" + ext.getKeys() + "],msg=[" + message + "]"); return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } } catch (UnsupportedEncodingException e) { e.printStackTrace(); } } } //被动抛出异样 throw new RuntimeException("=======这里出错了============"); //业务方失常生产 //return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } }); // 消费者对象在应用之前必须要调用 start 初始化 consumer.start(); System.out.println("音讯消费者已启动"); }}
重试音讯
COPYConsumer-线程名称=[36],音讯重试次数:[0],接管工夫:[1608118304600],音讯=[Hello Java demo RocketMQ ]Consumer-线程名称=[37],音讯重试次数:[1],接管工夫:[1608118315165],音讯=[Hello Java demo RocketMQ ]Consumer-线程名称=[38],音讯重试次数:[2],接管工夫:[1608118345191],音讯=[Hello Java demo RocketMQ ]该音讯曾经重试3次,保留数据库。topic=[TagA],keys=[null],msg=[Hello Java demo RocketMQ ]
超时重试
这里的超时异样并非真正意义上的超时,它指的是指获取音讯后,因为某种原因没有给RocketMQ返回生产的状态,即没有return ConsumeConcurrentlyStatus.CONSUME_SUCCESS
或return ConsumeConcurrentlyStatus.RECONSUME_LATER
那么 RocketMQ会认为该音讯没有发送,会始终发送。因为它会认为该音讯基本就没有发送给消费者,所以必定没生产。
演示代码
COPYpublic class RetryConsumer { public static void main(String[] args) throws Exception { //创立一个音讯消费者,并设置一个音讯消费者组 DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("rocket_test_consumer_group"); //指定 NameServer 地址 consumer.setNamesrvAddr("127.0.0.1:9876"); //设置 Consumer 第一次启动时从队列头部开始生产还是队列尾部开始生产 consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET); //订阅指定 Topic 下的所有音讯 consumer.subscribe("topicTest", "*"); //注册音讯监听器 consumer.registerMessageListener(new MessageListenerConcurrently() { public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext context) { if (list != null) { for (MessageExt ext : list) { //获取音讯重试次数 int retryTimes = ext.getReconsumeTimes(); try { String message = new String(ext.getBody(), RemotingHelper.DEFAULT_CHARSET); System.out.println("Consumer-线程名称=[" + Thread.currentThread().getId() + "],音讯重试次数:[" + retryTimes + "],接管工夫:[" + new Date().getTime() + "],音讯=[" + message + "]"); } catch (UnsupportedEncodingException e) { e.printStackTrace(); } } } //这里睡眠60秒 try { TimeUnit.SECONDS.sleep(60); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("休眠60秒 看还能不能走到这里..."); // 业务方失常生产 return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } }); // 消费者对象在应用之前必须要调用 start 初始化 consumer.start(); System.out.println("音讯消费者已启动"); }}
重试音讯
当取得 以后生产重试次数为 = 0 后 , 关掉该过程。再重新启动该过程,那么仍然可能获取该条音讯
COPYConsumer-线程名称=[33],音讯重试次数:[0],接管工夫:[1608118652598],音讯=[Hello Java demo RocketMQ ]
重启消费者
COPYConsumer-线程名称=[28],音讯重试次数:[0],接管工夫:[1608118683304],音讯=[Hello Java demo RocketMQ ]休眠60秒 看还能不能走到这里...
重试音讯的解决
个别状况下咱们在理论生产中是不须要重试16次,这样既浪费时间又节约性能,实践受骗尝试反复次数达到咱们想要的后果时如果还是生产失败,那么咱们须要将对应的音讯进行记录,并且完结反复尝试。
COPYpublic class RetryConsumer { public static void main(String[] args) throws Exception { //创立一个音讯消费者,并设置一个音讯消费者组 DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("rocket_test_consumer_group"); //指定 NameServer 地址 consumer.setNamesrvAddr("127.0.0.1:9876"); //设置 Consumer 第一次启动时从队列头部开始生产还是队列尾部开始生产 consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET); //订阅指定 Topic 下的所有音讯 consumer.subscribe("topicTest", "*"); //注册音讯监听器 consumer.registerMessageListener(new MessageListenerConcurrently() { public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext context) { if (list != null) { for (MessageExt ext : list) { //获取音讯重试次数 int retryTimes = ext.getReconsumeTimes(); try { String message = new String(ext.getBody(), RemotingHelper.DEFAULT_CHARSET); System.out.println("Consumer-线程名称=[" + Thread.currentThread().getId() + "],音讯重试次数:[" + retryTimes + "],接管工夫:[" + new Date().getTime() + "],音讯=[" + message + "]"); //这里设置重试大于3次 那么通过保留数据库 人工来兜底 if (retryTimes >= 2) { System.out.println("该音讯曾经重试3次,保留数据库。topic=[" + ext.getTags() + "],keys=[" + ext.getKeys() + "],msg=[" + message + "]"); return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } } catch (UnsupportedEncodingException e) { e.printStackTrace(); } } } //被动抛出异样 throw new RuntimeException("=======这里出错了============"); //业务方失常生产 //return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } }); // 消费者对象在应用之前必须要调用 start 初始化 consumer.start(); System.out.println("音讯消费者已启动"); }}
所以任何异样都要捕捉返回ConsumeConcurrentlyStatus.RECONSUME_LATER,rocketmq会放到重试队列,这个重试TOPIC的名字是%RETRY%+consumergroup的名字,如下图:
留神点
- 如果业务的回调没有解决好而抛出异样,会认为是生产失败当ConsumeConcurrentlyStatus.RECONSUME_LATER解决。
- 当应用程序生产的回调MessageListenerOrderly时,因为程序生产是要前者生产胜利能力持续生产,所以没有ConsumeConcurrentlyStatus.RECONSUME_LATER的这个状态,只有ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT来暂停队列的其余生产,直到原音讯一直重试胜利为止能力持续生产。
RocketMQ重试流程
重试队列与死信队列
在介绍RocketMQ的生产重试机制之前,须要先来说下“重试队列”和“死信队列”两个概念。
重试队列
如果Consumer端因为各种类型异样导致本次生产失败,为避免该音讯失落而须要将其从新回发给Broker端保留,保留这种因为异样无奈失常生产而回发给MQ的音讯队列称之为重试队列。RocketMQ会为每个生产组都设置一个Topic名称为“%RETRY%+consumerGroup”的重试队列(这里须要留神的是,这个Topic的重试队列是针对生产组,而不是针对每个Topic设置的),用于临时保留因为各种异样而导致Consumer端无奈生产的音讯。思考到异样复原起来须要一些工夫,会为重试队列设置多个重试级别,每个重试级别都有与之对应的从新投递延时,重试次数越多投递延时就越大。RocketMQ对于重试音讯的解决是先保留至Topic名称为“SCHEDULE_TOPIC_XXXX”的提早队列中,后盾定时工作依照对应的工夫进行Delay后从新保留至“%RETRY%+consumerGroup”的重试队列中。
死信队列
因为有些起因导致Consumer端长时间的无奈失常生产从Broker端Pull过去的业务音讯,为了确保音讯不会被无端的抛弃,那么超过配置的“最大重试生产次数”后就会移入到这个死信队列中。在RocketMQ中,SubscriptionGroupConfig配置常量默认地设置了两个参数,一个是retryQueueNums为1(重试队列数量为1个),另外一个是retryMaxTimes为16(最大重试生产的次数为16次)。Broker端通过校验判断,如果超过了最大重试生产次数则会将音讯移至这里所说的死信队列。这里,RocketMQ会为每个生产组都设置一个Topic命名为“%DLQ%+consumerGroup”的死信队列。个别在理论利用中,移入至死信队列的音讯,须要人工干预解决;
Consumer端回发消息至Broker端
在业务工程中的Consumer端(Push生产模式下),如果音讯可能失常生产须要在注册的音讯监听回调办法中返回CONSUME_SUCCESS的生产状态,否则因为各类异样生产失败则返回RECONSUME_LATER的生产状态。生产状态的枚举类型如下所示:
COPYpublic enum ConsumeConcurrentlyStatus { //业务方生产胜利 CONSUME_SUCCESS, //业务方生产失败,之后进行从新尝试生产 RECONSUME_LATER;}
如果业务工程对音讯生产失败了,那么则会抛出异样并且返回这里的RECONSUME_LATER状态。这里,在生产音讯的服务线程—consumeMessageService中,将封装好的音讯生产工作ConsumeRequest提交至线程池—consumeExecutor异步执行。从音讯生产工作ConsumeRequest的run()办法中会执行业务工程中注册的音讯监听回调办法,并在processConsumeResult办法中依据业务工程返回的状态(CONSUME_SUCCESS或者RECONSUME_LATER)进行判断和做对应的解决。
CONSUME_SUCCESS
业务方失常生产
失常状况下,设置ackIndex的值为consumeRequest.getMsgs().size() - 1,因而前面的遍历consumeRequest.getMsgs()音讯汇合条件不成立,不会调用回发生产失败音讯至Broker端的办法—sendMessageBack(msg, context)。最初,更新生产的偏移量;
RECONSUME_LATER
业务方生产失败
异常情况下,设置ackIndex的值为-1,这时就会进入到遍历consumeRequest.getMsgs()音讯汇合的for循环中,执行回发消息的办法—sendMessageBack(msg, context)。这里,首先会依据brokerName失去Broker端的地址信息,而后通过网络通信的Remoting模块发送RPC申请到指定的Broker上,如果上述过程失败,则创立一条新的音讯从新发送给Broker,此时新音讯的Topic为“%RETRY%+ConsumeGroupName”—重试队列的主题。其中,在MQClientAPIImpl实例的consumerSendMessageBack()办法中封装了ConsumerSendMsgBackRequestHeader的申请体,随后实现回发生产失败音讯的RPC通信申请(业务申请码为:CONSUMER_SEND_MSG_BACK)。假使下面的回发消息流程失败,则会提早5S后从新在Consumer端进行从新生产。与失常生产的状况一样,在最初更新生产的偏移量;
Broker端对于回发消息解决的次要流程
Broker端收到这条Consumer端回发过来的音讯后,通过业务申请码(CONSUMER_SEND_MSG_BACK)匹配业务处理器—SendMessageProcessor来解决。在实现一系列的前置校验(这里次要是“生产分组是否存在”、“查看Broker是否有写入权限”、“查看重试队列数是否大于0”等)后,尝试获取重试队列的TopicConfig对象(如果是第一次无奈获取到,则调用createTopicInSendMessageBackMethod()办法进行创立)。依据回发过来的音讯偏移量尝试从commitlog日志文件中查问音讯内容,若不存在则返回异样谬误。
而后,设置重试队列的Topic—“%RETRY%+consumerGroup”至MessageExt的扩大属性“RETRY_TOPIC”中,并对依据提早级别delayLevel和最大重试生产次数maxReconsumeTimes进行判断,如果超过最大重试生产次数(默认16次),则会创立死信队列的TopicConfig对象(用于前面将回发过来的音讯移入死信队列)。在构建实现须要落盘的MessageExtBrokerInner对象后,调用“commitLog.putMessage(msg)”办法做音讯长久化。这里,须要留神的是,在putMessage(msg)的办法里会应用“SCHEDULE_TOPIC_XXXX”和对应的提早级别队列Id别离替换MessageExtBrokerInner对象的Topic和QueueId属性值,并将原来设置的重试队列主题(“%RETRY%+consumerGroup”)的Topic和QueueId属性值做一个备份别离存入扩大属性properties的“REAL_TOPIC”和“REAL_QID”属性中。看到这里也就大抵明确了,回发给Broker端的生产失败的音讯并非间接保留至重试队列中,而是会先存至Topic为“SCHEDULE_TOPIC_XXXX”的定时提早队列中。
疑难:下面说了RocketMQ的重试队列的Topic是“%RETRY%+consumerGroup”,为啥这里要保留至Topic是“SCHEDULE_TOPIC_XXXX”的这个提早队列中呢?
在源码中搜寻下关键字—“SCHEDULE_TOPIC_XXXX”,会发现Broker端还存在着一个后盾服务线程—ScheduleMessageService(通过音讯存储服务—DefaultMessageStore启动),通过查看源码能够晓得其中有一个DeliverDelayedMessageTimerTask定时工作线程会依据Topic(“SCHEDULE_TOPIC_XXXX”)与QueueId,先查到逻辑生产队列ConsumeQueue,而后依据偏移量,找到ConsumeQueue中的内存映射对象,从commitlog日志中找到音讯对象MessageExt,并做一个音讯体的转换(messageTimeup()办法,由定时提早队列音讯转化为重试队列的音讯),再次做长久化落盘,这时候才会真正的保留至重试队列中。看到这里就能够解释下面的疑难了,定时提早队列只是为了用于暂存的,而后提早一段时间再将音讯移入至重试队列中。RocketMQ设定不同的延时级别delayLevel,并且与定时提早队列绝对应,具体源码如下:
COPY//省略private String messageDelayLevel = "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h";/** * 定时延时音讯主题的队列与提早等级对应关系 * @param delayLevel * @return */public static int delayLevel2QueueId(final int delayLevel) { return delayLevel - 1;}
Consumer端生产重试机制
每个Consumer实例在启动的时候就默认订阅了该生产组的重试队列主题,DefaultMQPushConsumerImpl的copySubscription()办法中的相干代码如下:
COPYprivate void copySubscription() throws MQClientException { //省略其余代码... switch (this.defaultMQPushConsumer.getMessageModel()) { case BROADCASTING: break; case CLUSTERING://如果音讯生产模式为集群模式,还须要为该生产组对应一个重试主题 final String retryTopic = MixAll.getRetryTopic(this.defaultMQPushConsumer.getConsumerGroup()); SubscriptionData subscriptionData = FilterAPI.buildSubscriptionData(this.defaultMQPushConsumer.getConsumerGroup(), retryTopic, SubscriptionData.SUB_ALL); this.rebalanceImpl.getSubscriptionInner().put(retryTopic, subscriptionData); break; default: break; } //省略其余代码... }
因而,这里也就分明了,Consumer端会始终订阅该重试队列主题的音讯,向Broker端发送如下的拉取音讯的PullRequest申请,以尝试从新再次生产重试队列中积压的音讯。
COPYPullRequest [consumerGroup=CID_JODIE_1, messageQueue=MessageQueue [topic=%RETRY%CID_JODIE_1, brokerName=HQSKCJJIDRRD6KC, queueId=0], nextOffset=51]
最初,给出一张RocketMQ音讯重试机制的框图(ps:这里只是形容了音讯生产失败后重试拉取的局部重要过程):
本文由
传智教育博学谷狂野架构师
教研团队公布。如果本文对您有帮忙,欢送
关注
和点赞
;如果您有任何倡议也可留言评论
或私信
,您的反对是我保持创作的能源。转载请注明出处!