深度解析RocketMQ Topic的创建机制

20次阅读

共计 7091 个字符,预计需要花费 18 分钟才能阅读完成。

我还记得第一次使用 rocketmq 的时候,需要去控制台预先创建 topic,我当时就想为什么要这么设计,于是我决定撸一波源码,带大家从根源上吃透 rocketmq topic 的创建机制。
topic 在 rocketmq 的设计思想里,是作为同一个业务逻辑消息的组织形式,它仅仅是一个逻辑上的概念,而在一个 topic 下又包含若干个逻辑队列,即消息队列,消息内容实际是存放在队列中,而队列又存储在 broker 中,下面我用一张图来说明 topic 的存储模型:

其实 rocketmq 中存在两种不同的 topic 创建方式,一种是我刚刚说的预先创建,另一种是自动创建,下面我开车带大家从源码的角度来详细地解读这两种创建机制。
自动创建
默认情况下,topic 不用手动创建,当 producer 进行消息发送时,会从 nameserver 拉取 topic 的路由信息,如果 topic 的路由信息不存在,那么会默认拉取 broker 启动时默认创建好名为“TBW102”的 Topic:
org.apache.rocketmq.common.MixAll:
// Will be created at broker when isAutoCreateTopicEnable
public static final String AUTO_CREATE_TOPIC_KEY_TOPIC = “TBW102”;
自动创建的开关配置在 BrokerConfig 中,通过 autoCreateTopicEnable 字段进行控制,
org.apache.rocketmq.common.BrokerConfig:
@ImportantField
private boolean autoCreateTopicEnable = true;
在 broker 启动时,会调用 TopicConfigManager 的构造方法,autoCreateTopicEnable 打开后,会将“TBW102”保存到 topicConfigTable 中:
org.apache.rocketmq.broker.topic.TopicConfigManager#TopicConfigManager:
// MixAll.AUTO_CREATE_TOPIC_KEY_TOPIC
if (this.brokerController.getBrokerConfig().isAutoCreateTopicEnable()) {
String topic = MixAll.AUTO_CREATE_TOPIC_KEY_TOPIC;
TopicConfig topicConfig = new TopicConfig(topic);
this.systemTopicList.add(topic);
topicConfig.setReadQueueNums(this.brokerController.getBrokerConfig()
.getDefaultTopicQueueNums());
topicConfig.setWriteQueueNums(this.brokerController.getBrokerConfig()
.getDefaultTopicQueueNums());
int perm = PermName.PERM_INHERIT | PermName.PERM_READ | PermName.PERM_WRITE;
topicConfig.setPerm(perm);
this.topicConfigTable.put(topicConfig.getTopicName(), topicConfig);
}
broker 会通过发送心跳包将 topicConfigTable 的 topic 信息发送给 nameserver,nameserver 将 topic 信息注册到 RouteInfoManager 中。
继续看消息发送时是如何从 nameserver 获取 topic 的路由信息:
org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl#tryToFindTopicPublishInfo:
private TopicPublishInfo tryToFindTopicPublishInfo(final String topic) {
TopicPublishInfo topicPublishInfo = this.topicPublishInfoTable.get(topic);
if (null == topicPublishInfo || !topicPublishInfo.ok()) {
this.topicPublishInfoTable.putIfAbsent(topic, new TopicPublishInfo());
// 生产者第一次发送消息,topic 在 nameserver 中并不存在
this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic);
topicPublishInfo = this.topicPublishInfoTable.get(topic);
}

if (topicPublishInfo.isHaveTopicRouterInfo() || topicPublishInfo.ok()) {
return topicPublishInfo;
} else {
// 第二次请求会将 isDefault=true,开启默认“TBW102”从 namerserver 获取路由信息
this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic, true, this.defaultMQProducer);
topicPublishInfo = this.topicPublishInfoTable.get(topic);
return topicPublishInfo;
}
}
如上方法,topic 首次发送消息,此时并不能从 namserver 获取 topic 的路由信息,那么接下来会进行第二次请求 namserver,这时会将 isDefault=true,开启默认“TBW102”从 namerserver 获取路由信息,此时的“TBW102”topic 已经被 broker 默认注册到 nameserver 了:
org.apache.rocketmq.client.impl.factory.MQClientInstance#updateTopicRouteInfoFromNameServer:
if (isDefault && defaultMQProducer != null) {
// 使用默认的“TBW102”topic 获取路由信息
topicRouteData = this.mQClientAPIImpl.getDefaultTopicRouteInfoFromNameServer(defaultMQProducer.getCreateTopicKey(),1000 * 3);
if (topicRouteData != null) {
for (QueueData data : topicRouteData.getQueueDatas()) {
int queueNums = Math.min(defaultMQProducer.getDefaultTopicQueueNums(), data.getReadQueueNums());
data.setReadQueueNums(queueNums);
data.setWriteQueueNums(queueNums);
}
}
}
如果 isDefault=true 并且 defaultMQProducer 不为空,从 nameserver 中获取默认路由信息,此时会获取所有已开启自动创建开关的 broker 的默认“TBW102”topic 路由信息,并保存默认的 topic 消息队列数量。
org.apache.rocketmq.client.impl.factory.MQClientInstance#updateTopicRouteInfoFromNameServer:
TopicRouteData old = this.topicRouteTable.get(topic);
boolean changed = topicRouteDataIsChange(old, topicRouteData);
if (!changed) {
changed = this.isNeedUpdateTopicRouteInfo(topic);
} else {
log.info(“the topic[{}] route info changed, old[{}] ,new[{}]”, topic, old, topicRouteData);
}
从本地缓存中取出 topic 的路由信息,由于 topic 是第一次发送消息,这时本地并没有该 topic 的路由信息,所以对比该 topic 路由信息对比“TBW102”时 changed 为 true,即有变化,进入以下逻辑:
org.apache.rocketmq.client.impl.factory.MQClientInstance#updateTopicRouteInfoFromNameServer:
// Update sub info
{
Set<MessageQueue> subscribeInfo = topicRouteData2TopicSubscribeInfo(topic, topicRouteData);
Iterator<Entry<String, MQConsumerInner>> it = this.consumerTable.entrySet().iterator();
while (it.hasNext()) {
Entry<String, MQConsumerInner> entry = it.next();
MQConsumerInner impl = entry.getValue();
if (impl != null) {
impl.updateTopicSubscribeInfo(topic, subscribeInfo);
}
}
}
将“TBW102”topic 路由信息构建 TopicPublishInfo,并将用 topic 为 key,TopicPublishInfo 为 value 更新本地缓存,到这里就明白了,原来 broker 们千辛万苦创建“TBW102”topic 并将其路由信息注册到 nameserver,被新来的 topic 获取后立即用“TBW102”topic 的路由信息构建出一个 TopicPublishInfo 并且据为己有,由于 TopicPublishInfo 的路由信息时默认“TBW102”topic,因此真正要发送消息的 topic 也会被负载发送到“TBW102”topic 所在的 broker 中,这里我们可以将其称之为偷梁换柱的做法。
当 broker 接收到消息后,会在 msgCheck 方法中调用 createTopicInSendMessageMethod 方法,将 topic 的信息塞进 topicConfigTable 缓存中,并且 broker 会定时发送心跳将 topicConfigTable 发送给 nameserver 进行注册。
自动创建与消息发送时获取 topic 信息的时序图:

预先创建
其实这个叫预先创建似乎更加适合,即预先在 broker 中创建好 topic 的相关信息并注册到 nameserver 中,然后 client 端发送消息时直接从 nameserver 中获取 topic 的路由信息,但是手动创建从动作上来将更加形象通俗易懂,直接告诉你,你的 topic 信息需要在控制台上自己手动创建。
预先创建需要通过 mqadmin 提供的 topic 相关命令进行创建,执行:
./mqadmin updateTopic
官方给出的各项参数如下:
usage: mqadmin updateTopic [-b <arg>] [-c <arg>] [-h] [-n <arg>] [-o <arg>] [-p <arg>] [-r <arg>] [-s <arg>]
-t <arg> [-u <arg>] [-w <arg>]
-b,–brokerAddr <arg> create topic to which broker
-c,–clusterName <arg> create topic to which cluster
-h,–help Print help
-n,–namesrvAddr <arg> Name server address list, eg: 192.168.0.1:9876;192.168.0.2:9876
-o,–order <arg> set topic’s order(true|false
-p,–perm <arg> set topic’s permission(2|4|6), intro[2:W 4:R; 6:RW]
-r,–readQueueNums <arg> set read queue nums
-s,–hasUnitSub <arg> has unit sub (true|false
-t,–topic <arg> topic name
-u,–unit <arg> is unit topic (true|false
-w,–writeQueueNums <arg> set write queue nums
我们直接定位到其实现类执行命令的方法:
通过 broker 模式创建:
org.apache.rocketmq.tools.command.topic.UpdateTopicSubCommand#execute:
// -b,–brokerAddr <arg> create topic to which broker
if (commandLine.hasOption(‘b’)) {
String addr = commandLine.getOptionValue(‘b’).trim();
defaultMQAdminExt.start();
defaultMQAdminExt.createAndUpdateTopicConfig(addr, topicConfig);
return;
}
从 commandLine 命令行工具获取运行时 - b 参数重的 broker 的地址,defaultMQAdminExt 是默认的 rocketmq 控制台执行的 API,此时调用 start 方法,该方法创建了一个 mqClientInstance,它封装了 netty 通信的细节,接着就是最重要的一步,调用 createAndUpdateTopicConfig 将 topic 配置信息发送到指定的 broker 上,完成 topic 的创建。
通过集群模式创建:
org.apache.rocketmq.tools.command.topic.UpdateTopicSubCommand#execute:
// -c,–clusterName <arg> create topic to which cluster
else if (commandLine.hasOption(‘c’)) {
String clusterName = commandLine.getOptionValue(‘c’).trim();
defaultMQAdminExt.start();
Set<String> masterSet =
CommandUtil.fetchMasterAddrByClusterName(defaultMQAdminExt, clusterName);
for (String addr : masterSet) {
defaultMQAdminExt.createAndUpdateTopicConfig(addr, topicConfig);
System.out.printf(“create topic to %s success.%n”, addr);
}
return;
}
通过集群模式创建与通过 broker 模式创建的逻辑大致相同,多了根据集群从 nameserver 获取集群下所有 broker 的 master 地址这个步骤,然后在循环发送 topic 信息到集群中的每个 broker 中,这个逻辑跟指定单个 broker 是一致的。
这也说明了当用集群模式去创建 topic 时,集群里面每个 broker 的 queue 的数量相同,当用单个 broker 模式去创建 topic 时,每个 broker 的 queue 数量可以不一致。
预先创建时序图:

何时需要预先创建 Topic?
建议线下开启,线上关闭,不是我说的,是官方给出的建议:

rocketmq 为什么要这么设计呢?经过一波源码深度解析后,我得到了我想要的答案:
根据上面的源码分析,我们得出,rocketmq 在发送消息时,会先去获取 topic 的路由信息,如果 topic 是第一次发送消息,由于 nameserver 没有 topic 的路由信息,所以会再次以“TBW102”这个默认 topic 获取路由信息,假设 broker 都开启了自动创建开关,那么此时会获取所有 broker 的路由信息,消息的发送会根据负载算法选择其中一台 Broker 发送消息,消息到达 broker 后,发现本地没有该 topic,会在创建该 topic 的信息塞进本地缓存中,同时会将 topic 路由信息注册到 nameserver 中,那么这样就会造成一个后果:以后所有该 topic 的消息,都将发送到这台 broker 上,如果该 topic 消息量非常大,会造成某个 broker 上负载过大,这样消息的存储就达不到负载均衡的目的了。
扫面下方二维码,关注我的公众号,开车带你临摹各种源码,来不及解释了快上车!

正文完
 0