关于rocketmq:RocketMQ源码解析topic创建机制

43次阅读

共计 8030 个字符,预计需要花费 21 分钟才能阅读完成。

  1. RocketMQ Topic 创立机制
    RocketMQ Topic 创立机制分为两种:一种主动创立,一种手动创立。能够通过设置 broker 的配置文件来禁用或者容许主动创立。默认是开启的容许主动创立

autoCreateTopicEnable=true/false

上面会联合源码来深度剖析一下主动创立和手动创立的过程。

  1. 主动 Topic
    默认状况下,topic 不必手动创立,当 producer 进行音讯发送时,会从 nameserver 拉取 topic 的路由信息,如果 topic 的路由信息不存在,那么会默认拉取 broker 启动时默认创立好名为“TBW102”的 Topic, 这定义在 org.apache.rocketmq.common.MixAll 类中
    // Will be created at broker when isAutoCreateTopicEnable
    public static final String AUTO_CREATE_TOPIC_KEY_TOPIC = “TBW102”;
    复制代码
    主动创立开关是下 BrokerConfig 类中有一个公有变量:
    @ImportantField
    private boolean autoCreateTopicEnable = true;
    复制代码
    这变量能够通过配置文件配置来进行批改,代码中的默认值为 true,所以在默认的状况下 Rocket MQ 是会主动创立 Topic 的。
    在 Broker 启动,会调用 TopicConfigManager 的构造方法,在构造方法中定义了一系列 RocketMQ 零碎内置的一些零碎 Topic(这里只关注一下 TBW102):
    {
    // MixAll.AUTO_CREATE_TOPIC_KEY_TOPIC
    if (this.brokerController.getBrokerConfig().isAutoCreateTopicEnable()) {

     String topic = MixAll.AUTO_CREATE_TOPIC_KEY_TOPIC;
     TopicConfig topicConfig = new TopicConfig(topic);
     this.systemTopicList.add(topic);
     topicConfig.setReadQueueNums(this.brokerController.getBrokerConfig()
         .getDefaultTopicQueueNums()); //8
     topicConfig.setWriteQueueNums(this.brokerController.getBrokerConfig()
         .getDefaultTopicQueueNums()); //8
     int perm = PermName.PERM_INHERIT | PermName.PERM_READ | PermName.PERM_WRITE;
     topicConfig.setPerm(perm);
     this.topicConfigTable.put(topicConfig.getTopicName(), topicConfig);

    }
    }
    复制代码
    这里有 this.brokerController.getBrokerConfig().isAutoCreateTopicEnable() 这样一段代码,在开启容许主动创立的时候,会把以后 Topic 的信息存入 topicConfigTable 变量中。而后通过发送定期发送心跳包把 Topic 和 Broker 的信息发送到 NameServer 的 RouteInfoManager 中进行保留。在 BrokerController 中定义了这样的一个定时工作来执行这个心跳包的发送:
    this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {

         @Override
         public void run() {
             try {BrokerController.this.registerBrokerAll(true, false, brokerConfig.isForceRegister());
             } catch (Throwable e) {log.error("registerBrokerAll Exception", e);
             }
         }
     }, 1000 * 10, Math.max(10000, Math.min(brokerConfig.getRegisterNameServerPeriod(), 60000)), TimeUnit.MILLISECONDS);
    

复制代码
这里就阐明了如何把每个 Broker 的零碎自定义的 Topic 注册到 NameServer。接下来看在发送过程中如何从 NameServer 获取 Topic 的路由信息:
DefaultMQProducerImpl.sendDefaultImpl
private SendResult sendDefaultImpl(

    Message msg,
    final CommunicationMode communicationMode,
    final SendCallback sendCallback,
    final long timeout
) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
    
    // 省略代码
    
    // 获取路由信息
    TopicPublishInfo topicPublishInfo = this.tryToFindTopicPublishInfo(msg.getTopic());
    
}

复制代码
通过 DefaultMQProducerImpl.tryToFindTopicPublishInfo 办法获取 Topic 的路由信息。

private TopicPublishInfo tryToFindTopicPublishInfo(final String topic) {TopicPublishInfo topicPublishInfo = this.topicPublishInfoTable.get(topic);
    // 第一次从缓存中获取 -- 必定没有因为还没创立
    if (null == topicPublishInfo || !topicPublishInfo.ok()) {this.topicPublishInfoTable.putIfAbsent(topic, new TopicPublishInfo());
        // 从 NameServer 获取 -- 也是没有,因为没有创立
        this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic);
        topicPublishInfo = this.topicPublishInfoTable.get(topic);
    }

    if (topicPublishInfo.isHaveTopicRouterInfo() || topicPublishInfo.ok()) {return topicPublishInfo;} else {
        // 第二次从这里获取
        this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic, true, this.defaultMQProducer);
        topicPublishInfo = this.topicPublishInfoTable.get(topic);
        return topicPublishInfo;
    }
}

复制代码
上面来看一下 MQClientInstance.updateTopicRouteInfoFromNameServer 的办法:
public boolean updateTopicRouteInfoFromNameServer(final String topic, boolean isDefault,

    DefaultMQProducer defaultMQProducer) {
        
// 省略代码


if (isDefault && defaultMQProducer != null) {
        // 应用默认的 TBW102 Topic 获取数据
        topicRouteData = this.mQClientAPIImpl.getDefaultTopicRouteInfoFromNameServer(defaultMQProducer.getCreateTopicKey(),
                        1000 * 3);
            if (topicRouteData != null) {for (QueueData data : topicRouteData.getQueueDatas()) {int queueNums = Math.min(defaultMQProducer.getDefaultTopicQueueNums(), data.getReadQueueNums());
                            data.setReadQueueNums(queueNums);
                            data.setWriteQueueNums(queueNums);
                        }
                    }
                } else {
                    // 这是失常的
                    topicRouteData = this.mQClientAPIImpl.getTopicRouteInfoFromNameServer(topic, 1000 * 3);
                }
  // 省略代码      
}

复制代码
如果 isDefault=true 并且 defaultMQProducer 不为空,从 nameserver 中获取默认路由信息,此时会获取所有已开启主动创立开关的 broker 的默认“TBW102”topic 路由信息,并保留默认的 topic 音讯队列数量。

这里会比拟一下配在在 DefaultMQProducer.defaultTopicQueueNums 中的默认值和 TBW102 中的值哪个更小。

if (topicRouteData != null) {

    TopicRouteData old = this.topicRouteTable.get(topic);
    boolean changed = topicRouteDataIsChange(old, topicRouteData);
    if (!changed) {changed = this.isNeedUpdateTopicRouteInfo(topic);
    } else {log.info("the topic[{}] route info changed, old[{}] ,new[{}]", topic, old, topicRouteData);
    }

}
复制代码
判断获取默认的是否存在,如果存在把以后的 Topic 的信息更新。也就是把 TBW102 Topic 的数据更新为主动创立的数据。
if (changed) {

TopicRouteData cloneTopicRouteData = topicRouteData.cloneTopicRouteData();

for (BrokerData bd : topicRouteData.getBrokerDatas()) {this.brokerAddrTable.put(bd.getBrokerName(), bd.getBrokerAddrs());
}

// Update Pub info
{TopicPublishInfo publishInfo = topicRouteData2TopicPublishInfo(topic, topicRouteData);
    publishInfo.setHaveTopicRouterInfo(true);
    Iterator<Entry<String, MQProducerInner>> it = this.producerTable.entrySet().iterator();
    while (it.hasNext()) {Entry<String, MQProducerInner> entry = it.next();
        MQProducerInner impl = entry.getValue();
        if (impl != null) {impl.updateTopicPublishInfo(topic, publishInfo);
        }
    }
}
    // Update sub info
{Set<MessageQueue> subscribeInfo = topicRouteData2TopicSubscribeInfo(topic, topicRouteData);
    Iterator<Entry<String, MQConsumerInner>> it = this.consumerTable.entrySet().iterator();
    while (it.hasNext()) {Entry<String, MQConsumerInner> entry = it.next();
        MQConsumerInner impl = entry.getValue();
        if (impl != null) {impl.updateTopicSubscribeInfo(topic, subscribeInfo);
        }
    }
}
log.info("topicRouteTable.put. Topic = {}, TopicRouteData[{}]", topic, cloneTopicRouteData);
this.topicRouteTable.put(topic, cloneTopicRouteData);
return true;

}
复制代码
更新本地的缓存。这样 TBW102 Topic 的负载和一些默认的路由信息就会被本人创立的 Topic 应用。这里就是整个主动创立的过程.
总结一下就是:通过应用零碎外部的一个 TBW102 的 Topic 的配置来主动创立以后用户的要创立的自定义 Topic。

  1. 手动创立 – 事后创立
    手动创立也叫事后创立,就是在应用 Topic 之前就创立,能够通过命令行或者通过 RocketMQ 的治理界面创立 Topic。
    通过界面控制台创立

我的项目地址:github.com/apache/rock…

TopicController 次要负责 Topic 的治理
@RequestMapping(value = “/createOrUpdate.do”, method = { RequestMethod.POST})
@ResponseBody
public Object topicCreateOrUpdateRequest(@RequestBody TopicConfigInfo topicCreateOrUpdateRequest) {

Preconditions.checkArgument(CollectionUtils.isNotEmpty(topicCreateOrUpdateRequest.getBrokerNameList()) || CollectionUtils.isNotEmpty(topicCreateOrUpdateRequest.getClusterNameList()),
        "clusterName or brokerName can not be all blank");
logger.info("op=look topicCreateOrUpdateRequest={}", JsonUtil.obj2String(topicCreateOrUpdateRequest));
topicService.createOrUpdate(topicCreateOrUpdateRequest);
return true;

}
复制代码
而后通过 MQAdminExtImpl.createAndUpdateTopicConfig 办法来创立:

@Override
public void createAndUpdateTopicConfig(String addr, TopicConfig config)
    throws RemotingException, MQBrokerException, InterruptedException, MQClientException {MQAdminInstance.threadLocalMQAdminExt().createAndUpdateTopicConfig(addr, config);
}

复制代码
通过调用 DefaultMQAdminExtImpl.createAndUpdateTopicConfig 创立 Topic
@Override
public void createAndUpdateTopicConfig(String addr, TopicConfig config) throws RemotingException, MQBrokerException,

    InterruptedException, MQClientException {this.mqClientInstance.getMQClientAPIImpl().createTopic(addr, this.defaultMQAdminExt.getCreateTopicKey(), config, timeoutMillis);

}
复制代码
最初通过 MQClientAPIImpl.createTopic 创立 Topic

public void createTopic(final String addr, final String defaultTopic, final TopicConfig topicConfig,
    final long timeoutMillis)
    throws RemotingException, MQBrokerException, InterruptedException, MQClientException {CreateTopicRequestHeader requestHeader = new CreateTopicRequestHeader();
    requestHeader.setTopic(topicConfig.getTopicName());
    requestHeader.setDefaultTopic(defaultTopic);
    requestHeader.setReadQueueNums(topicConfig.getReadQueueNums());
    requestHeader.setWriteQueueNums(topicConfig.getWriteQueueNums());
    requestHeader.setPerm(topicConfig.getPerm());
    requestHeader.setTopicFilterType(topicConfig.getTopicFilterType().name());
    requestHeader.setTopicSysFlag(topicConfig.getTopicSysFlag());
    requestHeader.setOrder(topicConfig.isOrder());

    RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.UPDATE_AND_CREATE_TOPIC, requestHeader);

    RemotingCommand response = this.remotingClient.invokeSync(MixAll.brokerVIPChannel(this.clientConfig.isVipChannelEnabled(), addr),
        request, timeoutMillis);
    assert response != null;
    switch (response.getCode()) {
        case ResponseCode.SUCCESS: {return;}
        default:
            break;
    }

    throw new MQClientException(response.getCode(), response.getRemark());
}

正文完
 0