本文次要钻研一下rocketmq的订阅关系

报错

org.apache.rocketmq.client.exception.MQClientException: The consumer group[demo-group] has been created before, specify another name please.See http://rocketmq.apache.org/docs/faq/ for further details.    at org.apache.rocketmq.client.impl.consumer.DefaultMQPushConsumerImpl.start(DefaultMQPushConsumerImpl.java:629)    at org.apache.rocketmq.client.consumer.DefaultMQPushConsumer.start(DefaultMQPushConsumer.java:693)
启动了两个consumer,别离生产topic1和topic2,然而都应用了同一个group,后果启动报错

DefaultMQPushConsumerImpl#start

org/apache/rocketmq/client/impl/consumer/DefaultMQPushConsumerImpl.java

                boolean registerOK = mQClientFactory.registerConsumer(this.defaultMQPushConsumer.getConsumerGroup(), this);                if (!registerOK) {                    this.serviceState = ServiceState.CREATE_JUST;                    this.consumeMessageService.shutdown();                    throw new MQClientException("The consumer group[" + this.defaultMQPushConsumer.getConsumerGroup()                        + "] has been created before, specify another name please." + FAQUrl.suggestTodo(FAQUrl.GROUP_NAME_DUPLICATE_URL),                        null);                }
DefaultMQPushConsumerImpl的start办法会应用mQClientFactory.registerConsumer去注册consumer,如果返回false则抛出MQClientException异样

registerConsumer

org/apache/rocketmq/client/impl/factory/MQClientInstance.java

public boolean registerConsumer(final String group, final MQConsumerInner consumer) {        if (null == group || null == consumer) {            return false;        }        MQConsumerInner prev = this.consumerTable.putIfAbsent(group, consumer);        if (prev != null) {            log.warn("the consumer group[" + group + "] exist already.");            return false;        }        return true;    }
MQClientInstance的registerConsumer应用consumerTable保护了group与consumer的关系,这里要求一个consumer group只能与一个consumer关联,如果不同consumer用了同一个group名称则会返回false

订阅一致性问题

@Test    public void testConsume() throws MQClientException, InterruptedException {        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("string_consumer_test");        consumer.setNamesrvAddr("192.168.64.3:9876");//        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);        consumer.subscribe("string-topic-new", "*");        consumer.registerMessageListener((MessageListenerConcurrently) (msg, context) -> {            System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msg);            for (MessageExt ext : msg) {                System.out.printf("consumer1: %s \n",new String(ext.getBody()));            }            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;        });        consumer.start();        DefaultMQPushConsumer consumer2 = new DefaultMQPushConsumer("string_consumer_test");        consumer2.setNamesrvAddr("192.168.64.3:9876");//        consumer2.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);        consumer2.subscribe("string-topic2-new", "*");        consumer2.registerMessageListener((MessageListenerConcurrently) (msg, context) -> {            System.out.printf("consumer2 %s Receive New Messages: %s %n", Thread.currentThread().getName(), msg);            for (MessageExt ext : msg) {                System.out.printf("consumer2: %s \n",new String(ext.getBody()));            }            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;        });        consumer2.start();        System.out.printf("Consumer Started.%n");        TimeUnit.HOURS.sleep(1);    }
像如上代码,两个consumer应用了同一个group,然而他们订阅了不同的topic,这种最初会造成consumer1及consumer2不能如预期那样失常生产音讯

DefaultMQPushConsumer.start()

org/apache/rocketmq/client/consumer/DefaultMQPushConsumer.java

public void start() throws MQClientException {        setConsumerGroup(NamespaceUtil.wrapNamespace(this.getNamespace(), this.consumerGroup));        this.defaultMQPushConsumerImpl.start();        if (null != traceDispatcher) {            try {                traceDispatcher.start(this.getNamesrvAddr(), this.getAccessChannel());            } catch (MQClientException e) {                log.warn("trace dispatcher start failed ", e);            }        }    }
DefaultMQPushConsumer的start办法执行的是defaultMQPushConsumerImpl.start()

DefaultMQPushConsumerImpl.start()

org/apache/rocketmq/client/impl/consumer/DefaultMQPushConsumerImpl.java

boolean registerOK = mQClientFactory.registerConsumer(this.defaultMQPushConsumer.getConsumerGroup(), this);//......this.mQClientFactory.sendHeartbeatToAllBrokerWithLock();
DefaultMQPushConsumerImpl.start()办法会执行mQClientFactory.registerConsumer,最初执行mQClientFactory.sendHeartbeatToAllBrokerWithLock()

MQClientInstance.registerConsumer

org/apache/rocketmq/client/impl/factory/MQClientInstance.java

    /**     * The container of the consumer in the current client. The key is the name of consumerGroup.     */    private final ConcurrentMap<String, MQConsumerInner> consumerTable = new ConcurrentHashMap<>();     public synchronized boolean registerConsumer(final String group, final MQConsumerInner consumer) {        if (null == group || null == consumer) {            return false;        }        MQConsumerInner prev = this.consumerTable.putIfAbsent(group, consumer);        if (prev != null) {            log.warn("the consumer group[" + group + "] exist already.");            return false;        }        return true;    }
MQClientInstance.registerConsumer办法以group为维度去注册consumer

MQClientInstance.sendHeartbeatToAllBrokerWithLock()

org/apache/rocketmq/client/impl/factory/MQClientInstance.java

    public void sendHeartbeatToAllBrokerWithLock() {        if (this.lockHeartbeat.tryLock()) {            try {                this.sendHeartbeatToAllBroker();                this.uploadFilterClassSource();            } catch (final Exception e) {                log.error("sendHeartbeatToAllBroker exception", e);            } finally {                this.lockHeartbeat.unlock();            }        } else {            log.warn("lock heartBeat, but failed. [{}]", this.clientId);        }    }    private HeartbeatData prepareHeartbeatData() {        HeartbeatData heartbeatData = new HeartbeatData();        // clientID        heartbeatData.setClientID(this.clientId);        // Consumer        for (Map.Entry<String, MQConsumerInner> entry : this.consumerTable.entrySet()) {            MQConsumerInner impl = entry.getValue();            if (impl != null) {                ConsumerData consumerData = new ConsumerData();                consumerData.setGroupName(impl.groupName());                consumerData.setConsumeType(impl.consumeType());                consumerData.setMessageModel(impl.messageModel());                consumerData.setConsumeFromWhere(impl.consumeFromWhere());                consumerData.getSubscriptionDataSet().addAll(impl.subscriptions());                consumerData.setUnitMode(impl.isUnitMode());                heartbeatData.getConsumerDataSet().add(consumerData);            }        }        // Producer        for (Map.Entry<String/* group */, MQProducerInner> entry : this.producerTable.entrySet()) {            MQProducerInner impl = entry.getValue();            if (impl != null) {                ProducerData producerData = new ProducerData();                producerData.setGroupName(entry.getKey());                heartbeatData.getProducerDataSet().add(producerData);            }        }        return heartbeatData;    }    
每次send heartbeat,broker都会解决这些信息

ClientManageProcessor.heartBeat

org/apache/rocketmq/broker/processor/ClientManageProcessor.java

    public RemotingCommand heartBeat(ChannelHandlerContext ctx, RemotingCommand request) {        RemotingCommand response = RemotingCommand.createResponseCommand(null);        HeartbeatData heartbeatData = HeartbeatData.decode(request.getBody(), HeartbeatData.class);        ClientChannelInfo clientChannelInfo = new ClientChannelInfo(            ctx.channel(),            heartbeatData.getClientID(),            request.getLanguage(),            request.getVersion()        );        for (ConsumerData data : heartbeatData.getConsumerDataSet()) {            SubscriptionGroupConfig subscriptionGroupConfig =                this.brokerController.getSubscriptionGroupManager().findSubscriptionGroupConfig(                    data.getGroupName());            boolean isNotifyConsumerIdsChangedEnable = true;            if (null != subscriptionGroupConfig) {                isNotifyConsumerIdsChangedEnable = subscriptionGroupConfig.isNotifyConsumerIdsChangedEnable();                int topicSysFlag = 0;                if (data.isUnitMode()) {                    topicSysFlag = TopicSysFlag.buildSysFlag(false, true);                }                String newTopic = MixAll.getRetryTopic(data.getGroupName());                this.brokerController.getTopicConfigManager().createTopicInSendMessageBackMethod(                    newTopic,                    subscriptionGroupConfig.getRetryQueueNums(),                    PermName.PERM_WRITE | PermName.PERM_READ, topicSysFlag);            }            boolean changed = this.brokerController.getConsumerManager().registerConsumer(                data.getGroupName(),                clientChannelInfo,                data.getConsumeType(),                data.getMessageModel(),                data.getConsumeFromWhere(),                data.getSubscriptionDataSet(),                isNotifyConsumerIdsChangedEnable            );            if (changed) {                log.info("registerConsumer info changed {} {}",                    data.toString(),                    RemotingHelper.parseChannelRemoteAddr(ctx.channel())                );            }        }        for (ProducerData data : heartbeatData.getProducerDataSet()) {            this.brokerController.getProducerManager().registerProducer(data.getGroupName(),                clientChannelInfo);        }        response.setCode(ResponseCode.SUCCESS);        response.setRemark(null);        return response;    }
这里brokerController是依据groupName类注册consumer的,如果有changed,则会打印日志
2023-05-08 20:17:59 INFO HeartbeatThread_1 - registerConsumer info changed ConsumerData [groupName=string_consumer_test, consumeType=CONSUME_PASSIVELY, messageModel=CLUSTERING, consumeFromWhere=CONSUME_FROM_LAST_OFFSET, unitMode=false, subscriptionDataSet=[SubscriptionData [classFilterMode=false, topic=string-topic-new, subString=*, tagsSet=[], codeSet=[], subVersion=1683548105549, expressionType=TAG], SubscriptionData [classFilterMode=false, topic=%RETRY%string_consumer_test, subString=*, tagsSet=[], codeSet=[], subVersion=1683548138803, expressionType=TAG]]] 192.168.64.1:516512023-05-08 20:19:01 INFO HeartbeatThread_2 - registerConsumer info changed ConsumerData [groupName=string_consumer_test, consumeType=CONSUME_PASSIVELY, messageModel=CLUSTERING, consumeFromWhere=CONSUME_FROM_LAST_OFFSET, unitMode=false, subscriptionDataSet=[SubscriptionData [classFilterMode=false, topic=string-topic2-new, subString=*, tagsSet=[], codeSet=[], subVersion=1683548243594, expressionType=TAG], SubscriptionData [classFilterMode=false, topic=%RETRY%string_consumer_test, subString=*, tagsSet=[], codeSet=[], subVersion=1683548247027, expressionType=TAG]]] 192.168.64.1:51675

ConsumerManager.registerConsumer

org/apache/rocketmq/broker/client/ConsumerManager.java

private final ConcurrentMap<String/* Group */, ConsumerGroupInfo> consumerTable =        new ConcurrentHashMap<String, ConsumerGroupInfo>(1024);public boolean registerConsumer(final String group, final ClientChannelInfo clientChannelInfo,        ConsumeType consumeType, MessageModel messageModel, ConsumeFromWhere consumeFromWhere,        final Set<SubscriptionData> subList, boolean isNotifyConsumerIdsChangedEnable) {        ConsumerGroupInfo consumerGroupInfo = this.consumerTable.get(group);        if (null == consumerGroupInfo) {            ConsumerGroupInfo tmp = new ConsumerGroupInfo(group, consumeType, messageModel, consumeFromWhere);            ConsumerGroupInfo prev = this.consumerTable.putIfAbsent(group, tmp);            consumerGroupInfo = prev != null ? prev : tmp;        }        boolean r1 =            consumerGroupInfo.updateChannel(clientChannelInfo, consumeType, messageModel,                consumeFromWhere);        boolean r2 = consumerGroupInfo.updateSubscription(subList);        if (r1 || r2) {            if (isNotifyConsumerIdsChangedEnable) {                this.consumerIdsChangeListener.handle(ConsumerGroupEvent.CHANGE, group, consumerGroupInfo.getAllChannel());            }        }        this.consumerIdsChangeListener.handle(ConsumerGroupEvent.REGISTER, group, subList);        return r1 || r2;    }
每次heartbeat都会执行registerConsumer,而key是consumerGroup,这样子会造成broker端的订阅关系时而是consumer1的,时而是consumer2的,最终造成音讯延时或者音讯生产不到的问题

小结

rocketmq的订阅关系要求应用同一个consumer group的不同consumer它们对topic及tag的订阅关系要统一,不然会造成音讯未能如期生产等异样,其本质是broker端保护了key为group的ConsumerGroupInfo,而每次consumer的heartbeat则会在broker端变更同一个group的ConsumerData信息,造成订阅关系一直被变更。

doc

  • 消费者分组(ConsumerGroup)
  • 订阅关系(Subscription)
  • 我擦,RocketMQ的tag还有这个“坑”!
  • RocketMQ同一个消费者内消费者订阅不同Topic问题剖析