Preface
This article looks at how subscription relationships work in RocketMQ.
Error
org.apache.rocketmq.client.exception.MQClientException: The consumer group[demo-group] has been created before, specify another name please.
See http://rocketmq.apache.org/docs/faq/ for further details.
at org.apache.rocketmq.client.impl.consumer.DefaultMQPushConsumerImpl.start(DefaultMQPushConsumerImpl.java:629)
at org.apache.rocketmq.client.consumer.DefaultMQPushConsumer.start(DefaultMQPushConsumer.java:693)
Two consumers were started, one consuming topic1 and the other topic2, but both used the same group, and startup failed with the error above.
DefaultMQPushConsumerImpl#start
org/apache/rocketmq/client/impl/consumer/DefaultMQPushConsumerImpl.java
boolean registerOK = mQClientFactory.registerConsumer(this.defaultMQPushConsumer.getConsumerGroup(), this);
if (!registerOK) {
    this.serviceState = ServiceState.CREATE_JUST;
    this.consumeMessageService.shutdown();
    throw new MQClientException("The consumer group[" + this.defaultMQPushConsumer.getConsumerGroup()
        + "] has been created before, specify another name please." + FAQUrl.suggestTodo(FAQUrl.GROUP_NAME_DUPLICATE_URL),
        null);
}
The start method of DefaultMQPushConsumerImpl calls mQClientFactory.registerConsumer to register the consumer; if that returns false, it throws an MQClientException.
registerConsumer
org/apache/rocketmq/client/impl/factory/MQClientInstance.java
public boolean registerConsumer(final String group, final MQConsumerInner consumer) {
    if (null == group || null == consumer) {
        return false;
    }
    MQConsumerInner prev = this.consumerTable.putIfAbsent(group, consumer);
    if (prev != null) {
        log.warn("the consumer group[" + group + "] exist already.");
        return false;
    }
    return true;
}
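MQClientInstance.registerConsumer uses consumerTable to maintain the mapping from group to consumer. Within a single MQClientInstance, a consumer group can be associated with only one consumer; if a different consumer registers under the same group name, the method returns false.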
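The putIfAbsent check described above can be illustrated with a minimal sketch in plain Java. GroupRegistrySketch and its members are hypothetical names invented for this illustration, not RocketMQ classes, and the consumers are stood in for by plain objects.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

// Hypothetical stand-in for the client-side registry; not the RocketMQ class.
final class GroupRegistrySketch {
    // Key: consumer group name. Value: the consumer registered under it.
    private final ConcurrentMap<String, Object> consumerTable = new ConcurrentHashMap<>();

    // Same shape as the check above: only the first consumer per group is accepted.
    boolean registerConsumer(String group, Object consumer) {
        if (group == null || consumer == null) {
            return false;
        }
        // putIfAbsent returns the previous value, or null if the key was free.
        return consumerTable.putIfAbsent(group, consumer) == null;
    }

    public static void main(String[] args) {
        GroupRegistrySketch registry = new GroupRegistrySketch();
        Object consumerForTopic1 = new Object();
        Object consumerForTopic2 = new Object();
        // First registration under "demo-group" is accepted.
        System.out.println(registry.registerConsumer("demo-group", consumerForTopic1));
        // A second consumer under the same group name is rejected.
        System.out.println(registry.registerConsumer("demo-group", consumerForTopic2));
        // Under a different group name, the same consumer is accepted.
        System.out.println(registry.registerConsumer("demo-group-2", consumerForTopic2));
    }
}
```

In this model the rejected registration corresponds to the point where DefaultMQPushConsumerImpl.start throws the MQClientException shown at the top of the article.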
Subscription consistency problem
@Test
public void testConsume() throws MQClientException, InterruptedException {
    DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("string_consumer_test");
    consumer.setNamesrvAddr("192.168.64.3:9876");
    // consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
    consumer.subscribe("string-topic-new", "*");
    consumer.registerMessageListener((MessageListenerConcurrently) (msg, context) -> {
        System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msg);
        for (MessageExt ext : msg) {
            System.out.printf("consumer1: %s \n", new String(ext.getBody()));
        }
        return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
    });
    consumer.start();
    DefaultMQPushConsumer consumer2 = new DefaultMQPushConsumer("string_consumer_test");
    consumer2.setNamesrvAddr("192.168.64.3:9876");
    // consumer2.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
    consumer2.subscribe("string-topic2-new", "*");
    consumer2.registerMessageListener((MessageListenerConcurrently) (msg, context) -> {
        System.out.printf("consumer2 %s Receive New Messages: %s %n", Thread.currentThread().getName(), msg);
        for (MessageExt ext : msg) {
            System.out.printf("consumer2: %s \n", new String(ext.getBody()));
        }
        return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
    });
    consumer2.start();
    System.out.printf("Consumer Started.%n");
    TimeUnit.HOURS.sleep(1);
}
In the code above, the two consumers use the same group but subscribe to different topics. As a result, neither consumer1 nor consumer2 consumes messages as expected.
DefaultMQPushConsumer.start()
org/apache/rocketmq/client/consumer/DefaultMQPushConsumer.java
public void start() throws MQClientException {
    setConsumerGroup(NamespaceUtil.wrapNamespace(this.getNamespace(), this.consumerGroup));
    this.defaultMQPushConsumerImpl.start();
    if (null != traceDispatcher) {
        try {
            traceDispatcher.start(this.getNamesrvAddr(), this.getAccessChannel());
        } catch (MQClientException e) {
            log.warn("trace dispatcher start failed", e);
        }
    }
}
The start method of DefaultMQPushConsumer delegates to defaultMQPushConsumerImpl.start().
DefaultMQPushConsumerImpl.start()
org/apache/rocketmq/client/impl/consumer/DefaultMQPushConsumerImpl.java
boolean registerOK = mQClientFactory.registerConsumer(this.defaultMQPushConsumer.getConsumerGroup(), this);
//......
this.mQClientFactory.sendHeartbeatToAllBrokerWithLock();
DefaultMQPushConsumerImpl.start() calls mQClientFactory.registerConsumer and, at the end, mQClientFactory.sendHeartbeatToAllBrokerWithLock().
MQClientInstance.registerConsumer
org/apache/rocketmq/client/impl/factory/MQClientInstance.java
/**
 * The container of the consumer in the current client. The key is the name of consumerGroup.
 */
private final ConcurrentMap<String, MQConsumerInner> consumerTable = new ConcurrentHashMap<>();

public synchronized boolean registerConsumer(final String group, final MQConsumerInner consumer) {
    if (null == group || null == consumer) {
        return false;
    }
    MQConsumerInner prev = this.consumerTable.putIfAbsent(group, consumer);
    if (prev != null) {
        log.warn("the consumer group[" + group + "] exist already.");
        return false;
    }
    return true;
}
MQClientInstance.registerConsumer registers consumers keyed by group.
MQClientInstance.sendHeartbeatToAllBrokerWithLock()
org/apache/rocketmq/client/impl/factory/MQClientInstance.java
public void sendHeartbeatToAllBrokerWithLock() {
    if (this.lockHeartbeat.tryLock()) {
        try {
            this.sendHeartbeatToAllBroker();
            this.uploadFilterClassSource();
        } catch (final Exception e) {
            log.error("sendHeartbeatToAllBroker exception", e);
        } finally {
            this.lockHeartbeat.unlock();
        }
    } else {
        log.warn("lock heartBeat, but failed. [{}]", this.clientId);
    }
}

private HeartbeatData prepareHeartbeatData() {
    HeartbeatData heartbeatData = new HeartbeatData();
    // clientID
    heartbeatData.setClientID(this.clientId);
    // Consumer
    for (Map.Entry<String, MQConsumerInner> entry : this.consumerTable.entrySet()) {
        MQConsumerInner impl = entry.getValue();
        if (impl != null) {
            ConsumerData consumerData = new ConsumerData();
            consumerData.setGroupName(impl.groupName());
            consumerData.setConsumeType(impl.consumeType());
            consumerData.setMessageModel(impl.messageModel());
            consumerData.setConsumeFromWhere(impl.consumeFromWhere());
            consumerData.getSubscriptionDataSet().addAll(impl.subscriptions());
            consumerData.setUnitMode(impl.isUnitMode());
            heartbeatData.getConsumerDataSet().add(consumerData);
        }
    }
    // Producer
    for (Map.Entry<String/* group */, MQProducerInner> entry : this.producerTable.entrySet()) {
        MQProducerInner impl = entry.getValue();
        if (impl != null) {
            ProducerData producerData = new ProducerData();
            producerData.setGroupName(entry.getKey());
            heartbeatData.getProducerDataSet().add(producerData);
        }
    }
    return heartbeatData;
}
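The heartbeat carries one ConsumerData per group in consumerTable, containing the group name and that consumer's own subscriptions (impl.subscriptions()). Each time a heartbeat is sent, the broker processes this information.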
ClientManageProcessor.heartBeat
org/apache/rocketmq/broker/processor/ClientManageProcessor.java
public RemotingCommand heartBeat(ChannelHandlerContext ctx, RemotingCommand request) {
    RemotingCommand response = RemotingCommand.createResponseCommand(null);
    HeartbeatData heartbeatData = HeartbeatData.decode(request.getBody(), HeartbeatData.class);
    ClientChannelInfo clientChannelInfo = new ClientChannelInfo(ctx.channel(),
        heartbeatData.getClientID(),
        request.getLanguage(),
        request.getVersion());
    for (ConsumerData data : heartbeatData.getConsumerDataSet()) {
        SubscriptionGroupConfig subscriptionGroupConfig =
            this.brokerController.getSubscriptionGroupManager().findSubscriptionGroupConfig(data.getGroupName());
        boolean isNotifyConsumerIdsChangedEnable = true;
        if (null != subscriptionGroupConfig) {
            isNotifyConsumerIdsChangedEnable = subscriptionGroupConfig.isNotifyConsumerIdsChangedEnable();
            int topicSysFlag = 0;
            if (data.isUnitMode()) {
                topicSysFlag = TopicSysFlag.buildSysFlag(false, true);
            }
            String newTopic = MixAll.getRetryTopic(data.getGroupName());
            this.brokerController.getTopicConfigManager().createTopicInSendMessageBackMethod(
                newTopic,
                subscriptionGroupConfig.getRetryQueueNums(),
                PermName.PERM_WRITE | PermName.PERM_READ, topicSysFlag);
        }
        boolean changed = this.brokerController.getConsumerManager().registerConsumer(data.getGroupName(),
            clientChannelInfo,
            data.getConsumeType(),
            data.getMessageModel(),
            data.getConsumeFromWhere(),
            data.getSubscriptionDataSet(),
            isNotifyConsumerIdsChangedEnable
        );
        if (changed) {
            log.info("registerConsumer info changed {} {}",
                data.toString(),
                RemotingHelper.parseChannelRemoteAddr(ctx.channel())
            );
        }
    }
    for (ProducerData data : heartbeatData.getProducerDataSet()) {
        this.brokerController.getProducerManager().registerProducer(data.getGroupName(),
            clientChannelInfo);
    }
    response.setCode(ResponseCode.SUCCESS);
    response.setRemark(null);
    return response;
}
Here the broker registers the consumer by groupName through brokerController.getConsumerManager(); if anything changed, it logs a line like the following:
2023-05-08 20:17:59 INFO HeartbeatThread_1 - registerConsumer info changed ConsumerData [groupName=string_consumer_test, consumeType=CONSUME_PASSIVELY, messageModel=CLUSTERING, consumeFromWhere=CONSUME_FROM_LAST_OFFSET, unitMode=false, subscriptionDataSet=[SubscriptionData [classFilterMode=false, topic=string-topic-new, subString=*, tagsSet=[], codeSet=[], subVersion=1683548105549, expressionType=TAG], SubscriptionData [classFilterMode=false, topic=%RETRY%string_consumer_test, subString=*, tagsSet=[], codeSet=[], subVersion=1683548138803, expressionType=TAG]]] 192.168.64.1:51651
2023-05-08 20:19:01 INFO HeartbeatThread_2 - registerConsumer info changed ConsumerData [groupName=string_consumer_test, consumeType=CONSUME_PASSIVELY, messageModel=CLUSTERING, consumeFromWhere=CONSUME_FROM_LAST_OFFSET, unitMode=false, subscriptionDataSet=[SubscriptionData [classFilterMode=false, topic=string-topic2-new, subString=*, tagsSet=[], codeSet=[], subVersion=1683548243594, expressionType=TAG], SubscriptionData [classFilterMode=false, topic=%RETRY%string_consumer_test, subString=*, tagsSet=[], codeSet=[], subVersion=1683548247027, expressionType=TAG]]] 192.168.64.1:51675
ConsumerManager.registerConsumer
org/apache/rocketmq/broker/client/ConsumerManager.java
private final ConcurrentMap<String/* Group */, ConsumerGroupInfo> consumerTable =
    new ConcurrentHashMap<String, ConsumerGroupInfo>(1024);

public boolean registerConsumer(final String group, final ClientChannelInfo clientChannelInfo,
    ConsumeType consumeType, MessageModel messageModel, ConsumeFromWhere consumeFromWhere,
    final Set<SubscriptionData> subList, boolean isNotifyConsumerIdsChangedEnable) {
    ConsumerGroupInfo consumerGroupInfo = this.consumerTable.get(group);
    if (null == consumerGroupInfo) {
        ConsumerGroupInfo tmp = new ConsumerGroupInfo(group, consumeType, messageModel, consumeFromWhere);
        ConsumerGroupInfo prev = this.consumerTable.putIfAbsent(group, tmp);
        consumerGroupInfo = prev != null ? prev : tmp;
    }
    boolean r1 =
        consumerGroupInfo.updateChannel(clientChannelInfo, consumeType, messageModel,
            consumeFromWhere);
    boolean r2 = consumerGroupInfo.updateSubscription(subList);
    if (r1 || r2) {
        if (isNotifyConsumerIdsChangedEnable) {
            this.consumerIdsChangeListener.handle(ConsumerGroupEvent.CHANGE, group, consumerGroupInfo.getAllChannel());
        }
    }
    this.consumerIdsChangeListener.handle(ConsumerGroupEvent.REGISTER, group, subList);
    return r1 || r2;
}
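registerConsumer runs on every heartbeat, and its key is the consumer group. As a result, the subscription the broker records for that group is sometimes consumer1's and sometimes consumer2's, which ends up causing delayed messages or messages that are never consumed.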
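The flip-flopping described above can be modelled with a small sketch in plain Java. It is a simplified model, not broker code: GroupSubscriptionSketch, onHeartbeat, and topicsOf are hypothetical names, subscriptions are reduced to topic names, and the sketch assumes that a heartbeat replaces the group's recorded topics with the sender's (the body of ConsumerGroupInfo.updateSubscription is not shown in this article).

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical model of the broker-side state kept per consumer group.
final class GroupSubscriptionSketch {
    // Key: group name. Value: topics currently recorded for that group.
    private final Map<String, Set<String>> topicsByGroup = new HashMap<>();

    // Assumption: a heartbeat adds the sender's topics and drops every topic
    // the sender did not report. Returns true when the recorded set changed.
    boolean onHeartbeat(String group, Set<String> topicsFromClient) {
        Set<String> recorded = topicsByGroup.computeIfAbsent(group, g -> new HashSet<>());
        boolean added = recorded.addAll(topicsFromClient);
        boolean removed = recorded.retainAll(topicsFromClient);
        return added || removed;
    }

    // Returns a copy of the topics currently recorded for the group.
    Set<String> topicsOf(String group) {
        Set<String> copy = new HashSet<>();
        Set<String> recorded = topicsByGroup.get(group);
        if (recorded != null) {
            copy.addAll(recorded);
        }
        return copy;
    }

    public static void main(String[] args) {
        GroupSubscriptionSketch broker = new GroupSubscriptionSketch();
        String group = "string_consumer_test";
        Set<String> consumer1Topics = Collections.singleton("string-topic-new");
        Set<String> consumer2Topics = Collections.singleton("string-topic2-new");
        // Two rounds of alternating heartbeats from the two consumers.
        for (int round = 0; round < 2; round++) {
            boolean changed1 = broker.onHeartbeat(group, consumer1Topics);
            System.out.println("after consumer1: " + broker.topicsOf(group) + " changed=" + changed1);
            boolean changed2 = broker.onHeartbeat(group, consumer2Topics);
            System.out.println("after consumer2: " + broker.topicsOf(group) + " changed=" + changed2);
        }
    }
}
```

Under that assumption every heartbeat from one consumer overwrites what the other reported, which matches the repeated "registerConsumer info changed" log lines for the same group shown earlier.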
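Summary
RocketMQ requires that different consumers using the same consumer group keep their topic and tag subscriptions consistent; otherwise messages may not be consumed as expected. The root cause is that the broker maintains a ConsumerGroupInfo keyed by group, and every consumer heartbeat updates the ConsumerData of that same group on the broker, so the subscription relationship keeps being changed.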
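One way to guard against this in application code is to compare the intended subscriptions of all consumers that share a group before starting them. The sketch below is a hypothetical helper in plain Java, not part of the RocketMQ client: each consumer's subscriptions are represented as a map from topic to tag expression, and expressions are compared as raw strings, so "TagA || TagB" and "TagB || TagA" would count as different here.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical pre-start check; not a RocketMQ API.
final class SubscriptionConsistencySketch {
    // Each map is one consumer's subscriptions: topic -> tag expression.
    // Returns true only if every consumer declares the same topics and tags.
    static boolean isConsistent(List<Map<String, String>> subscriptionsPerConsumer) {
        for (Map<String, String> subscriptions : subscriptionsPerConsumer) {
            if (!subscriptions.equals(subscriptionsPerConsumer.get(0))) {
                return false;
            }
        }
        return true;
    }

    public static void main(String[] args) {
        // The setup from the test above: same group, different topics.
        Map<String, String> consumer1 = Collections.singletonMap("string-topic-new", "*");
        Map<String, String> consumer2 = Collections.singletonMap("string-topic2-new", "*");
        System.out.println(isConsistent(Arrays.asList(consumer1, consumer2)));

        // A consistent setup: both consumers subscribe to both topics.
        Map<String, String> both = new HashMap<>();
        both.put("string-topic-new", "*");
        both.put("string-topic2-new", "*");
        Map<String, String> sameAgain = new HashMap<>(both);
        System.out.println(isConsistent(Arrays.asList(both, sameAgain)));
    }
}
```

If the two consumers genuinely need different topics, the alternative is to give each its own consumer group.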
doc
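- Consumer group (ConsumerGroup)
- Subscription relationship (Subscription)
- Whoa, RocketMQ tags have this "pitfall" too!
- Analysis of consumers in the same consumer group subscribing to different topics in RocketMQ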