大家好,明天来聊一聊 RocketMQ 客户端音讯生产失败,怎么办?
上面是 RocketMQ 推出模式的一段代码:
public static void main(String[] args) throws InterruptedException, MQClientException {Tracer tracer = initTracer();
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("CID_JODIE_1");
consumer.getDefaultMQPushConsumerImpl().registerConsumeMessageHook(new ConsumeMessageOpenTracingHookImpl(tracer));
consumer.subscribe("TopicTest", "*");
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
consumer.setConsumeTimestamp("20181109221800");
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
try{System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
}catch (Exception e){return ConsumeConcurrentlyStatus.RECONSUME_LATER;}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
consumer.start();}
从这段代码能够看出,消费者生产完音讯后会返回一个生产状态,那生产状态有哪些呢?参见类 ConsumeConcurrentlyStatus 中定义:
- 生产胜利,返回 CONSUME_SUCCESS;
- 生产失败,返回 RECONSUME_LATER。
上面代码就是返回下面两个状态的逻辑,对于生产状态,如果返回 null,会给它赋值 RECONSUME_LATER,解决逻辑如下:
//ConsumeRequest 类
public void run() {
MessageListenerConcurrently listener = ConsumeMessageConcurrentlyService.this.messageListener;
// 省略局部逻辑
long beginTimestamp = System.currentTimeMillis();
ConsumeReturnType returnType = ConsumeReturnType.SUCCESS;
try {
// 省略局部逻辑
status = listener.consumeMessage(Collections.unmodifiableList(msgs), context);
} catch (Throwable e) {}
// 省略局部逻辑
if (null == status) {
// 省略日志
status = ConsumeConcurrentlyStatus.RECONSUME_LATER;
}
// 省略局部逻辑
if (!processQueue.isDropped()) {ConsumeMessageConcurrentlyService.this.processConsumeResult(status, context, this);
} else {}}
这部分代码的 UML 类图如下:
下面代码中的 processConsumeResult 办法就是生产失败后客户端的解决逻辑:
public void processConsumeResult(
final ConsumeConcurrentlyStatus status,
final ConsumeConcurrentlyContext context,
final ConsumeRequest consumeRequest
) {
//ackIndex 初始值是 Integer.MAX_VALUE;
int ackIndex = context.getAckIndex();
switch (status) {
case CONSUME_SUCCESS:
if (ackIndex >= consumeRequest.getMsgs().size()) {ackIndex = consumeRequest.getMsgs().size() - 1;}
// 省略局部逻辑
break;
case RECONSUME_LATER:
ackIndex = -1;
// 省略局部逻辑
break;
default:
break;
}
switch (this.defaultMQPushConsumer.getMessageModel()) {
case BROADCASTING:
// 播送模式下这里只打印日志
break;
case CLUSTERING:
List<MessageExt> msgBackFailed = new ArrayList<MessageExt>(consumeRequest.getMsgs().size());
for (int i = ackIndex + 1; i < consumeRequest.getMsgs().size(); i++) {MessageExt msg = consumeRequest.getMsgs().get(i);
boolean result = this.sendMessageBack(msg, context);
if (!result) {msg.setReconsumeTimes(msg.getReconsumeTimes() + 1);
msgBackFailed.add(msg);
}
}
if (!msgBackFailed.isEmpty()) {consumeRequest.getMsgs().removeAll(msgBackFailed);
// 发送回 Broker 失败的音讯,5s 后再次生产
this.submitConsumeRequestLater(msgBackFailed, consumeRequest.getProcessQueue(), consumeRequest.getMessageQueue());
}
break;
default:
break;
}
// 更新本地保留的偏移量
long offset = consumeRequest.getProcessQueue().removeMessage(consumeRequest.getMsgs());
if (offset >= 0 && !consumeRequest.getProcessQueue().isDropped()) {this.defaultMQPushConsumerImpl.getOffsetStore().updateOffset(consumeRequest.getMessageQueue(), offset, true);
}
}
1 生产胜利
下面的代码逻辑中,如果生产胜利,ackIndex 变量的值就是音讯数量缩小少 1,所以下面的 switch 逻辑是不会执行的,因为 播送模式下,只是打印一段日志(没有其余逻辑),而集群模式下,for 循环的起始 i 变量曾经等于音讯数量,循环外面的代码不会执行。
因而,如果音讯生产胜利,只会走最上面的逻辑,更新本地保留的音讯偏移量。
2 生产失败
ackIndex 变量值等于 -1。
2.1 播送模式
在生产失败的状况下,播送模式的代码只是打印了一段日志,之后更新了本地保留的音讯偏移量,因而咱们晓得 播送模式音讯生产失败后就不会从新生产了,相当于抛弃了音讯。
2.2 集群模式
从下面代码的 for 循环中,会把所有的音讯都发送回去去 Broker,这样这批音讯还能再次被拉取到进行生产。
对于发送给 Broker 失败的音讯,会提早 5s 后再次生产。代码如下:
private void submitConsumeRequestLater(
final List<MessageExt> msgs,
final ProcessQueue processQueue,
final MessageQueue messageQueue
) {this.scheduledExecutorService.schedule(new Runnable() {
@Override
public void run() {ConsumeMessageConcurrentlyService.this.submitConsumeRequest(msgs, processQueue, messageQueue, true);
}
}, 5000, TimeUnit.MILLISECONDS);
}
更新本地保留的音讯偏移量时,会从音讯列表中发送回 Broker 失败的音讯先删除掉。
留神:从下面逻辑能够看到,在拉取到一批音讯进行生产时,只有有一条音讯生产失败,这批音讯都会进行重试,因而生产端做好幂等是必要的。
上面再看一下发送失败的音讯给 Broker 的代码是,发送音讯时,申请的 code 码是 CONSUMER_SEND_MSG_BACK。依据这个申请码就能找 Broker 端的解决逻辑。
如果发送回 Broker 时抛出异样,须要从新发送一个新的音讯,这里有四点须要留神:
- 新音讯的 Topic 变成【%RETRY% + consumerGroup】;
- 新音讯的 RETRY_TOPIC 这个属性赋值为之前的 Topic;
- 新音讯的重试次数属性加 1;
- 新音讯的 DELAY 属性等于重试次数 + 3.
public void sendMessageBack(MessageExt msg, int delayLevel, final String brokerName)
throws RemotingException, MQBrokerException, InterruptedException, MQClientException {
try {this.mQClientFactory.getMQClientAPIImpl().consumerSendMessageBack(brokerAddr, msg,
this.defaultMQPushConsumer.getConsumerGroup(), delayLevel, 5000, getMaxReconsumeTimes());
} catch (Exception e) {
//Topic 变成 %RETRY% + consumerGroup
Message newMsg = new Message(MixAll.getRetryTopic(this.defaultMQPushConsumer.getConsumerGroup()), msg.getBody());
String originMsgId = MessageAccessor.getOriginMessageId(msg);
MessageAccessor.setOriginMessageId(newMsg, UtilAll.isBlank(originMsgId) ? msg.getMsgId() : originMsgId);
//RETRY_TOPIC 赋值
MessageAccessor.putProperty(newMsg, MessageConst.PROPERTY_RETRY_TOPIC, msg.getTopic());
// 重试次数 +1
MessageAccessor.setReconsumeTime(newMsg, String.valueOf(msg.getReconsumeTimes() + 1));
// 最大重试次数
MessageAccessor.setMaxReconsumeTimes(newMsg, String.valueOf(getMaxReconsumeTimes()));
//DELAY = 重试次数 + 3
newMsg.setDelayTimeLevel(3 + msg.getReconsumeTimes());
this.mQClientFactory.getDefaultMQProducer().send(newMsg);
} finally {msg.setTopic(NamespaceUtil.withoutNamespace(msg.getTopic(), this.defaultMQPushConsumer.getNamespace()));
}
}
2.3 Broker 解决
下面曾经讲过,对于解决失败的音讯,生产端会发送回 Broker,不过这里有一点须要留神,发送回 Broker 时,音讯的 Topic 变成【”%RETRY%” + namespace + “%” + 原始 topic】,封装逻辑在源码 ClientConfig.withNamespace。
依据申请码 CONSUMER_SEND_MSG_BACK 能够定位到 Broker 的解决逻辑在类 SendMessageProcessor,办法 asyncConsumerSendMsgBack。
2.3.1 进死信队列
如果重试次数超过了最大重试次数(默认 16 次),或者 delayLevel 值小于 0,则音讯进死信队列,死信队列的 Topic 为【”%DLQ%” + 生产组】,代码如下:
//asyncConsumerSendMsgBack 办法
if (msgExt.getReconsumeTimes() >= maxReconsumeTimes
|| delayLevel < 0) {newTopic = MixAll.getDLQTopic(requestHeader.getGroup());
queueIdInt = ThreadLocalRandom.current().nextInt(99999999) % DLQ_NUMS_PER_GROUP;
topicConfig = this.brokerController.getTopicConfigManager().createTopicInSendMessageBackMethod(newTopic,
DLQ_NUMS_PER_GROUP,
PermName.PERM_WRITE | PermName.PERM_READ, 0);
msgExt.setDelayTimeLevel(0);
}
2.3.2 发送 CommitLog
如果提早级别(DELAY)等于 0,则提早级别就等于重试次数加 3。
有个中央须要留神,发送到提早队列的音讯从新进行了封装,封装这个音讯用的并不是客户端发来的那个音讯,而是从 CommitLog 依据偏移量查找的,代码如下:
MessageExt msgExt = this.brokerController.getMessageStore().lookMessageByOffset(requestHeader.getOffset());
if (null == msgExt) {response.setCode(ResponseCode.SYSTEM_ERROR);
response.setRemark("look message by offset failed," + requestHeader.getOffset());
return CompletableFuture.completedFuture(response);
}
如果查问失败,就会给客户端返回零碎谬误。
这里有个重要的细节,这个音讯写入 CommitLog 时,会判断 DELAY 是否大于 0,如果大于 0,就会批改 Topic。代码如下:
//CommitLog 类 asyncPutMessage 办法
if (tranType == MessageSysFlag.TRANSACTION_NOT_TYPE
|| tranType == MessageSysFlag.TRANSACTION_COMMIT_TYPE) {
// Delay Delivery
if (msg.getDelayTimeLevel() > 0) {if (msg.getDelayTimeLevel() > this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel()) {
// 从源码看,这里最大值是 18
msg.setDelayTimeLevel(this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel());
}
topic = TopicValidator.RMQ_SYS_SCHEDULE_TOPIC;
//queueId = delayLevel - 1
int queueId = ScheduleMessageService.delayLevel2QueueId(msg.getDelayTimeLevel());
// Backup real topic, queueId
MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_TOPIC, msg.getTopic());
MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_QUEUE_ID, String.valueOf(msg.getQueueId()));
msg.setPropertiesString(MessageDecoder.messageProperties2String(msg.getProperties()));
msg.setTopic(topic);
msg.setQueueId(queueId);
}
}
这里把 Topic 批改为 SCHEDULE_TOPIC_XXXX,供延时队列来调度。进入延时队列后,延时队列会依照上面的工夫进行调度:
private String messageDelayLevel = "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h";
下面代码能够看到,延时音讯的调度有 18 个等级,最小的 1s,最大的 2h。而从上面的代码咱们能够看到,调度应用第三个等级开始的:
if (0 == delayLevel) {delayLevel = 3 + msgExt.getReconsumeTimes();
}
msgExt.setDelayTimeLevel(delayLevel);
2.3.3 延时队列
延时队列的代码逻辑在类 ScheduleMessageService,这里的 start 办法触发延时队列的调度,而 start 办法的业务入口在 BrokerStartup 的初始化。
首先,会计算出每个延时等级对应的延时工夫(解决到 ms 级别),放到 delayLevelTable,它是一个 ConcurrentHashMap,而后创立一个外围线程数等于 18 的定时线程池,顺次对每个级别的延时进行调度。这个工作启动后,会每 100ms 执行一次。代码如下:
public void start() {if (started.compareAndSet(false, true)) {this.load();
this.deliverExecutorService = new ScheduledThreadPoolExecutor(this.maxDelayLevel, new ThreadFactoryImpl("ScheduleMessageTimerThread_"));
// 省略异步
for (Map.Entry<Integer, Long> entry : this.delayLevelTable.entrySet()) {Integer level = entry.getKey();
Long timeDelay = entry.getValue();
Long offset = this.offsetTable.get(level);
if (null == offset) {offset = 0L;}
if (timeDelay != null) {
// 省略异步
this.deliverExecutorService.schedule(new DeliverDelayedMessageTimerTask(level, offset), FIRST_DELAY_TIME, TimeUnit.MILLISECONDS);
}
}
// 省略其余逻辑
}
}
调度逻辑中,首先依据 Topic 和 queueId 找到对应的生产队列,而后从外面间断读取音讯:
public void executeOnTimeup() {
ConsumeQueue cq =
ScheduleMessageService.this.defaultMessageStore.findConsumeQueue(TopicValidator.RMQ_SYS_SCHEDULE_TOPIC,
delayLevel2QueueId(delayLevel));
// 省略空解决
SelectMappedBufferResult bufferCQ = cq.getIndexBuffer(this.offset);
// 省略空解决
long nextOffset = this.offset;
try {
int i = 0;
ConsumeQueueExt.CqExtUnit cqExtUnit = new ConsumeQueueExt.CqExtUnit();
//CQ_STORE_UNIT_SIZE = 20, 因为 ConsumeQueue 中一个元素占 20 字节
for (; i < bufferCQ.getSize() && isStarted(); i += ConsumeQueue.CQ_STORE_UNIT_SIZE) {
//offset 占 8 个字节
long offsetPy = bufferCQ.getByteBuffer().getLong();
// 音讯大小占 4 个字节
int sizePy = bufferCQ.getByteBuffer().getInt();
//ConsumeQueue 中 tagsCode 是一个投递工夫点
long tagsCode = bufferCQ.getByteBuffer().getLong();
if (cq.isExtAddr(tagsCode)) {if (cq.getExt(tagsCode, cqExtUnit)) {tagsCode = cqExtUnit.getTagsCode();
} else {
//can't find ext content.So re compute tags code.
long msgStoreTime = defaultMessageStore.getCommitLog().pickupStoreTimestamp(offsetPy, sizePy);
tagsCode = computeDeliverTimestamp(delayLevel, msgStoreTime);
}
}
long now = System.currentTimeMillis();
long deliverTimestamp = this.correctDeliverTimestamp(now, tagsCode);
nextOffset = offset + (i / ConsumeQueue.CQ_STORE_UNIT_SIZE);
long countdown = deliverTimestamp - now;
if (countdown > 0) {
// 工夫未到,期待下次调度
this.scheduleNextTimerTask(nextOffset, DELAY_FOR_A_WHILE);
return;
}
MessageExt msgExt = ScheduleMessageService.this.defaultMessageStore.lookMessageByOffset(offsetPy, sizePy);
MessageExtBrokerInner msgInner = ScheduleMessageService.this.messageTimeup(msgExt);
// 省略事务音讯
boolean deliverSuc;
// 同步异步都有,只保留同步代码
deliverSuc = this.syncDeliver(msgInner, msgExt.getMsgId(), nextOffset, offsetPy, sizePy);
}
nextOffset = this.offset + (i / ConsumeQueue.CQ_STORE_UNIT_SIZE);
} catch (Exception e) { } finally {bufferCQ.release();
}
//DELAY_FOR_A_WHILE 是 100ms
this.scheduleNextTimerTask(nextOffset, DELAY_FOR_A_WHILE);
}
因为 messageTimeup 办法应用了原始的 Topic 和 QueueId 新建了音讯,所以下面的 syncDeliver 形式是将音讯从新投递到原始的队列中,这样消费者能够再次拉取到这条音讯进行生产。留神:下面 ConsumeQueue 的 tagsCode 是一个工夫点,很容易误会为是 tag 的 hashCode,MessageQueue 的存储元素中最初 8 字节的确是 tag 的 hashCode。
3 总结
消费者生产失败后,会把生产发回给 Broker 进行解决。下图是客户端解决流程:
Broker 收到音讯后,会把音讯从新发送到 CommitLog,发送到 CommitLog 之前,首先会批改 Topic 为 SCHEDULE_TOPIC_XXXX,这样就发送到了延时队列,延时队列再依据延时级别把音讯投递到原始的队列,这样消费者就能再次拉取到。流程如下图:
从流程来看,消费者批量拉取音讯,如果局部音讯生产失败,那就会整批全副重试。所以做好幂等是必要的。