共计 7779 个字符,预计需要花费 20 分钟才能阅读完成。
序
本文主要研究一下 rocketmq broker 的 CONSUMER_SEND_MSG_BACK
CONSUMER_SEND_MSG_BACK
rocketmq/common/src/main/java/org/apache/rocketmq/common/protocol/RequestCode.java
public class RequestCode {
//......
public static final int CONSUMER_SEND_MSG_BACK = 36;
//......
- RequestCode 定义了 CONSUMER_SEND_MSG_BACK 常量,值为 36
processRequest
rocketmq/broker/src/main/java/org/apache/rocketmq/broker/processor/SendMessageProcessor.java
public class SendMessageProcessor extends AbstractSendMessageProcessor implements NettyRequestProcessor {
private List<ConsumeMessageHook> consumeMessageHookList;
public SendMessageProcessor(final BrokerController brokerController) {super(brokerController);
}
@Override
public RemotingCommand processRequest(ChannelHandlerContext ctx,
RemotingCommand request) throws RemotingCommandException {
SendMessageContext mqtraceContext;
switch (request.getCode()) {
case RequestCode.CONSUMER_SEND_MSG_BACK:
return this.consumerSendMsgBack(ctx, request);
default:
SendMessageRequestHeader requestHeader = parseRequestHeader(request);
if (requestHeader == null) {return null;}
mqtraceContext = buildMsgContext(ctx, requestHeader);
this.executeSendMessageHookBefore(ctx, request, mqtraceContext);
RemotingCommand response;
if (requestHeader.isBatch()) {response = this.sendBatchMessage(ctx, request, mqtraceContext, requestHeader);
} else {response = this.sendMessage(ctx, request, mqtraceContext, requestHeader);
}
this.executeSendMessageHookAfter(response, mqtraceContext);
return response;
}
}
//......
}
- SendMessageProcessor 对于 request.getCode() 为 RequestCode.CONSUMER_SEND_MSG_BACK 会执行 consumerSendMsgBack 方法
consumerSendMsgBack
public class SendMessageProcessor extends AbstractSendMessageProcessor implements NettyRequestProcessor {
private List<ConsumeMessageHook> consumeMessageHookList;
public SendMessageProcessor(final BrokerController brokerController) {super(brokerController);
}
//......
private RemotingCommand consumerSendMsgBack(final ChannelHandlerContext ctx, final RemotingCommand request)
throws RemotingCommandException {final RemotingCommand response = RemotingCommand.createResponseCommand(null);
final ConsumerSendMsgBackRequestHeader requestHeader =
(ConsumerSendMsgBackRequestHeader)request.decodeCommandCustomHeader(ConsumerSendMsgBackRequestHeader.class);
String namespace = NamespaceUtil.getNamespaceFromResource(requestHeader.getGroup());
if (this.hasConsumeMessageHook() && !UtilAll.isBlank(requestHeader.getOriginMsgId())) {ConsumeMessageContext context = new ConsumeMessageContext();
context.setNamespace(namespace);
context.setConsumerGroup(requestHeader.getGroup());
context.setTopic(requestHeader.getOriginTopic());
context.setCommercialRcvStats(BrokerStatsManager.StatsType.SEND_BACK);
context.setCommercialRcvTimes(1);
context.setCommercialOwner(request.getExtFields().get(BrokerStatsManager.COMMERCIAL_OWNER));
this.executeConsumeMessageHookAfter(context);
}
SubscriptionGroupConfig subscriptionGroupConfig =
this.brokerController.getSubscriptionGroupManager().findSubscriptionGroupConfig(requestHeader.getGroup());
if (null == subscriptionGroupConfig) {response.setCode(ResponseCode.SUBSCRIPTION_GROUP_NOT_EXIST);
response.setRemark("subscription group not exist," + requestHeader.getGroup() + " "
+ FAQUrl.suggestTodo(FAQUrl.SUBSCRIPTION_GROUP_NOT_EXIST));
return response;
}
if (!PermName.isWriteable(this.brokerController.getBrokerConfig().getBrokerPermission())) {response.setCode(ResponseCode.NO_PERMISSION);
response.setRemark("the broker[" + this.brokerController.getBrokerConfig().getBrokerIP1() + "] sending message is forbidden");
return response;
}
if (subscriptionGroupConfig.getRetryQueueNums() <= 0) {response.setCode(ResponseCode.SUCCESS);
response.setRemark(null);
return response;
}
String newTopic = MixAll.getRetryTopic(requestHeader.getGroup());
int queueIdInt = Math.abs(this.random.nextInt() % 99999999) % subscriptionGroupConfig.getRetryQueueNums();
int topicSysFlag = 0;
if (requestHeader.isUnitMode()) {topicSysFlag = TopicSysFlag.buildSysFlag(false, true);
}
TopicConfig topicConfig = this.brokerController.getTopicConfigManager().createTopicInSendMessageBackMethod(
newTopic,
subscriptionGroupConfig.getRetryQueueNums(),
PermName.PERM_WRITE | PermName.PERM_READ, topicSysFlag);
if (null == topicConfig) {response.setCode(ResponseCode.SYSTEM_ERROR);
response.setRemark("topic[" + newTopic + "] not exist");
return response;
}
if (!PermName.isWriteable(topicConfig.getPerm())) {response.setCode(ResponseCode.NO_PERMISSION);
response.setRemark(String.format("the topic[%s] sending message is forbidden", newTopic));
return response;
}
MessageExt msgExt = this.brokerController.getMessageStore().lookMessageByOffset(requestHeader.getOffset());
if (null == msgExt) {response.setCode(ResponseCode.SYSTEM_ERROR);
response.setRemark("look message by offset failed," + requestHeader.getOffset());
return response;
}
final String retryTopic = msgExt.getProperty(MessageConst.PROPERTY_RETRY_TOPIC);
if (null == retryTopic) {MessageAccessor.putProperty(msgExt, MessageConst.PROPERTY_RETRY_TOPIC, msgExt.getTopic());
}
msgExt.setWaitStoreMsgOK(false);
int delayLevel = requestHeader.getDelayLevel();
int maxReconsumeTimes = subscriptionGroupConfig.getRetryMaxTimes();
if (request.getVersion() >= MQVersion.Version.V3_4_9.ordinal()) {maxReconsumeTimes = requestHeader.getMaxReconsumeTimes();
}
if (msgExt.getReconsumeTimes() >= maxReconsumeTimes
|| delayLevel < 0) {newTopic = MixAll.getDLQTopic(requestHeader.getGroup());
queueIdInt = Math.abs(this.random.nextInt() % 99999999) % DLQ_NUMS_PER_GROUP;
topicConfig = this.brokerController.getTopicConfigManager().createTopicInSendMessageBackMethod(newTopic,
DLQ_NUMS_PER_GROUP,
PermName.PERM_WRITE, 0
);
if (null == topicConfig) {response.setCode(ResponseCode.SYSTEM_ERROR);
response.setRemark("topic[" + newTopic + "] not exist");
return response;
}
} else {if (0 == delayLevel) {delayLevel = 3 + msgExt.getReconsumeTimes();
}
msgExt.setDelayTimeLevel(delayLevel);
}
MessageExtBrokerInner msgInner = new MessageExtBrokerInner();
msgInner.setTopic(newTopic);
msgInner.setBody(msgExt.getBody());
msgInner.setFlag(msgExt.getFlag());
MessageAccessor.setProperties(msgInner, msgExt.getProperties());
msgInner.setPropertiesString(MessageDecoder.messageProperties2String(msgExt.getProperties()));
msgInner.setTagsCode(MessageExtBrokerInner.tagsString2tagsCode(null, msgExt.getTags()));
msgInner.setQueueId(queueIdInt);
msgInner.setSysFlag(msgExt.getSysFlag());
msgInner.setBornTimestamp(msgExt.getBornTimestamp());
msgInner.setBornHost(msgExt.getBornHost());
msgInner.setStoreHost(this.getStoreHost());
msgInner.setReconsumeTimes(msgExt.getReconsumeTimes() + 1);
String originMsgId = MessageAccessor.getOriginMessageId(msgExt);
MessageAccessor.setOriginMessageId(msgInner, UtilAll.isBlank(originMsgId) ? msgExt.getMsgId() : originMsgId);
PutMessageResult putMessageResult = this.brokerController.getMessageStore().putMessage(msgInner);
if (putMessageResult != null) {switch (putMessageResult.getPutMessageStatus()) {
case PUT_OK:
String backTopic = msgExt.getTopic();
String correctTopic = msgExt.getProperty(MessageConst.PROPERTY_RETRY_TOPIC);
if (correctTopic != null) {backTopic = correctTopic;}
this.brokerController.getBrokerStatsManager().incSendBackNums(requestHeader.getGroup(), backTopic);
response.setCode(ResponseCode.SUCCESS);
response.setRemark(null);
return response;
default:
break;
}
response.setCode(ResponseCode.SYSTEM_ERROR);
response.setRemark(putMessageResult.getPutMessageStatus().name());
return response;
}
response.setCode(ResponseCode.SYSTEM_ERROR);
response.setRemark("putMessageResult is null");
return response;
}
//......
}
- consumerSendMsgBack 方法会通过 brokerController.getSubscriptionGroupManager().findSubscriptionGroupConfig 方法查询该请求的 consumerGroup 对应的 subscriptionGroupConfig,如果 subscriptionGroupConfig 为 null 则提前返回,如果 subscriptionGroupConfig 的 retryQueueNums 小于等于 0,也提前返回;然后通过 MixAll.getRetryTopic(requestHeader.getGroup()) 方法获取该 consumerGroup 对应的 retryTopic,并计算 queueIdInt;之后判断 subscriptionGroupConfig 设置的 maxReconsumeTimes,如果大于等于该值则将该消息发往 DLQ_GROUP_TOPIC;最后通过 brokerController.getMessageStore().putMessage(msgInner) 将该消息放入对应的 newTopic
小结
SendMessageProcessor 对于 request.getCode() 为 RequestCode.CONSUMER_SEND_MSG_BACK 会执行 consumerSendMsgBack 方法
doc
- RequestCode
正文完