- RocketMQ Topic创立机制
RocketMQ Topic创立机制分为两种:一种主动创立,一种手动创立。能够通过设置broker的配置文件来禁用或者容许主动创立。默认是开启的容许主动创立
autoCreateTopicEnable=true/false
上面会联合源码来深度剖析一下主动创立和手动创立的过程。
主动Topic
默认状况下,topic不必手动创立,当producer进行音讯发送时,会从nameserver拉取topic的路由信息,如果topic的路由信息不存在,那么会默认拉取broker启动时默认创立好名为“TBW102”的Topic,这定义在org.apache.rocketmq.common.MixAll类中
// Will be created at broker when isAutoCreateTopicEnable
public static final String AUTO_CREATE_TOPIC_KEY_TOPIC = "TBW102";
复制代码
主动创立开关是下BrokerConfig类中有一个公有变量:
@ImportantField
private boolean autoCreateTopicEnable = true;
复制代码
这变量能够通过配置文件配置来进行批改,代码中的默认值为true,所以在默认的状况下Rocket MQ是会主动创立Topic的。
在Broker启动,会调用TopicConfigManager的构造方法,在构造方法中定义了一系列RocketMQ零碎内置的一些零碎Topic(这里只关注一下TBW102):
{
// MixAll.AUTO_CREATE_TOPIC_KEY_TOPIC
if (this.brokerController.getBrokerConfig().isAutoCreateTopicEnable()) {String topic = MixAll.AUTO_CREATE_TOPIC_KEY_TOPIC; TopicConfig topicConfig = new TopicConfig(topic); this.systemTopicList.add(topic); topicConfig.setReadQueueNums(this.brokerController.getBrokerConfig() .getDefaultTopicQueueNums()); //8 topicConfig.setWriteQueueNums(this.brokerController.getBrokerConfig() .getDefaultTopicQueueNums()); //8 int perm = PermName.PERM_INHERIT | PermName.PERM_READ | PermName.PERM_WRITE; topicConfig.setPerm(perm); this.topicConfigTable.put(topicConfig.getTopicName(), topicConfig);
}
}
复制代码
这里有 this.brokerController.getBrokerConfig().isAutoCreateTopicEnable() 这样一段代码,在开启容许主动创立的时候,会把以后Topic的信息存入topicConfigTable变量中。而后通过发送定期发送心跳包把Topic和Broker的信息发送到NameServer的RouteInfoManager中进行保留。在BrokerController中定义了这样的一个定时工作来执行这个心跳包的发送:
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {@Override public void run() { try { BrokerController.this.registerBrokerAll(true, false, brokerConfig.isForceRegister()); } catch (Throwable e) { log.error("registerBrokerAll Exception", e); } } }, 1000 * 10, Math.max(10000, Math.min(brokerConfig.getRegisterNameServerPeriod(), 60000)), TimeUnit.MILLISECONDS);
复制代码
这里就阐明了如何把每个Broker的零碎自定义的Topic注册到NameServer。接下来看在发送过程中如何从NameServer获取Topic的路由信息:
DefaultMQProducerImpl.sendDefaultImpl
private SendResult sendDefaultImpl(
Message msg, final CommunicationMode communicationMode, final SendCallback sendCallback, final long timeout) throws MQClientException, RemotingException, MQBrokerException, InterruptedException { //省略代码 //获取路由信息 TopicPublishInfo topicPublishInfo = this.tryToFindTopicPublishInfo(msg.getTopic()); }
复制代码
通过DefaultMQProducerImpl.tryToFindTopicPublishInfo办法获取Topic的路由信息。
private TopicPublishInfo tryToFindTopicPublishInfo(final String topic) { TopicPublishInfo topicPublishInfo = this.topicPublishInfoTable.get(topic); //第一次从缓存中获取--必定没有因为还没创立 if (null == topicPublishInfo || !topicPublishInfo.ok()) { this.topicPublishInfoTable.putIfAbsent(topic, new TopicPublishInfo()); //从NameServer获取--也是没有,因为没有创立 this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic); topicPublishInfo = this.topicPublishInfoTable.get(topic); } if (topicPublishInfo.isHaveTopicRouterInfo() || topicPublishInfo.ok()) { return topicPublishInfo; } else { //第二次从这里获取 this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic, true, this.defaultMQProducer); topicPublishInfo = this.topicPublishInfoTable.get(topic); return topicPublishInfo; }}
复制代码
上面来看一下 MQClientInstance.updateTopicRouteInfoFromNameServer 的办法:
public boolean updateTopicRouteInfoFromNameServer(final String topic, boolean isDefault,
DefaultMQProducer defaultMQProducer) { //省略代码if (isDefault && defaultMQProducer != null) { //应用默认的TBW102 Topic获取数据 topicRouteData = this.mQClientAPIImpl.getDefaultTopicRouteInfoFromNameServer(defaultMQProducer.getCreateTopicKey(), 1000 * 3); if (topicRouteData != null) { for (QueueData data : topicRouteData.getQueueDatas()) { int queueNums = Math.min(defaultMQProducer.getDefaultTopicQueueNums(), data.getReadQueueNums()); data.setReadQueueNums(queueNums); data.setWriteQueueNums(queueNums); } } } else { //这是失常的 topicRouteData = this.mQClientAPIImpl.getTopicRouteInfoFromNameServer(topic, 1000 * 3); } //省略代码 }
复制代码
如果isDefault=true并且defaultMQProducer不为空,从nameserver中获取默认路由信息,此时会获取所有已开启主动创立开关的broker的默认“TBW102”topic路由信息,并保留默认的topic音讯队列数量。
这里会比拟一下配在在 DefaultMQProducer.defaultTopicQueueNums中的默认值和TBW102中的值哪个更小。
if (topicRouteData != null) {
TopicRouteData old = this.topicRouteTable.get(topic); boolean changed = topicRouteDataIsChange(old, topicRouteData); if (!changed) { changed = this.isNeedUpdateTopicRouteInfo(topic); } else { log.info("the topic[{}] route info changed, old[{}] ,new[{}]", topic, old, topicRouteData); }
}
复制代码
判断获取默认的是否存在,如果存在把以后的Topic的信息更新。也就是把TBW102 Topic的数据更新为主动创立的数据。
if (changed) {
TopicRouteData cloneTopicRouteData = topicRouteData.cloneTopicRouteData();for (BrokerData bd : topicRouteData.getBrokerDatas()) { this.brokerAddrTable.put(bd.getBrokerName(), bd.getBrokerAddrs());}// Update Pub info{ TopicPublishInfo publishInfo = topicRouteData2TopicPublishInfo(topic, topicRouteData); publishInfo.setHaveTopicRouterInfo(true); Iterator<Entry<String, MQProducerInner>> it = this.producerTable.entrySet().iterator(); while (it.hasNext()) { Entry<String, MQProducerInner> entry = it.next(); MQProducerInner impl = entry.getValue(); if (impl != null) { impl.updateTopicPublishInfo(topic, publishInfo); } }} // Update sub info{ Set<MessageQueue> subscribeInfo = topicRouteData2TopicSubscribeInfo(topic, topicRouteData); Iterator<Entry<String, MQConsumerInner>> it = this.consumerTable.entrySet().iterator(); while (it.hasNext()) { Entry<String, MQConsumerInner> entry = it.next(); MQConsumerInner impl = entry.getValue(); if (impl != null) { impl.updateTopicSubscribeInfo(topic, subscribeInfo); } }}log.info("topicRouteTable.put. Topic = {}, TopicRouteData[{}]", topic, cloneTopicRouteData);this.topicRouteTable.put(topic, cloneTopicRouteData);return true;
}
复制代码
更新本地的缓存。这样TBW102 Topic的负载和一些默认的路由信息就会被本人创立的Topic应用。这里就是整个主动创立的过程.
总结一下就是:通过应用零碎外部的一个TBW102的Topic的配置来主动创立以后用户的要创立的自定义Topic。
- 手动创立--事后创立
手动创立也叫事后创立,就是在应用Topic之前就创立,能够通过命令行或者通过RocketMQ的治理界面创立Topic。
通过界面控制台创立
我的项目地址: github.com/apache/rock…
TopicController次要负责Topic的治理
@RequestMapping(value = "/createOrUpdate.do", method = { RequestMethod.POST})
@ResponseBody
public Object topicCreateOrUpdateRequest(@RequestBody TopicConfigInfo topicCreateOrUpdateRequest) {
Preconditions.checkArgument(CollectionUtils.isNotEmpty(topicCreateOrUpdateRequest.getBrokerNameList()) || CollectionUtils.isNotEmpty(topicCreateOrUpdateRequest.getClusterNameList()), "clusterName or brokerName can not be all blank");logger.info("op=look topicCreateOrUpdateRequest={}", JsonUtil.obj2String(topicCreateOrUpdateRequest));topicService.createOrUpdate(topicCreateOrUpdateRequest);return true;
}
复制代码
而后通过MQAdminExtImpl.createAndUpdateTopicConfig办法来创立:
@Overridepublic void createAndUpdateTopicConfig(String addr, TopicConfig config) throws RemotingException, MQBrokerException, InterruptedException, MQClientException { MQAdminInstance.threadLocalMQAdminExt().createAndUpdateTopicConfig(addr, config);}
复制代码
通过调用DefaultMQAdminExtImpl.createAndUpdateTopicConfig创立Topic
@Override
public void createAndUpdateTopicConfig(String addr, TopicConfig config) throws RemotingException, MQBrokerException,
InterruptedException, MQClientException {this.mqClientInstance.getMQClientAPIImpl().createTopic(addr, this.defaultMQAdminExt.getCreateTopicKey(), config, timeoutMillis);
}
复制代码
最初通过MQClientAPIImpl.createTopic创立Topic
public void createTopic(final String addr, final String defaultTopic, final TopicConfig topicConfig, final long timeoutMillis) throws RemotingException, MQBrokerException, InterruptedException, MQClientException { CreateTopicRequestHeader requestHeader = new CreateTopicRequestHeader(); requestHeader.setTopic(topicConfig.getTopicName()); requestHeader.setDefaultTopic(defaultTopic); requestHeader.setReadQueueNums(topicConfig.getReadQueueNums()); requestHeader.setWriteQueueNums(topicConfig.getWriteQueueNums()); requestHeader.setPerm(topicConfig.getPerm()); requestHeader.setTopicFilterType(topicConfig.getTopicFilterType().name()); requestHeader.setTopicSysFlag(topicConfig.getTopicSysFlag()); requestHeader.setOrder(topicConfig.isOrder()); RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.UPDATE_AND_CREATE_TOPIC, requestHeader); RemotingCommand response = this.remotingClient.invokeSync(MixAll.brokerVIPChannel(this.clientConfig.isVipChannelEnabled(), addr), request, timeoutMillis); assert response != null; switch (response.getCode()) { case ResponseCode.SUCCESS: { return; } default: break; } throw new MQClientException(response.getCode(), response.getRemark());}