一、RocketMQ架构简介
1.1 逻辑部署图
(图片来自网络)
1.2 外围组件阐明
通过上图能够看到,RocketMQ的外围组件次要包含4个,别离是NameServer、Broker、Producer和Consumer,上面咱们先顺次简略阐明下这四个外围组件:
NameServer:NameServer充当路由信息的提供者。生产者或消费者可能通过NameServer查找各Topic相应的Broker IP列表。多个Namesrver实例组成集群,但互相独立,没有信息替换。
Broker:音讯直达角色,负责存储音讯、转发音讯。Broker服务器在RocketMQ零碎中负责接管从生产者发送来的音讯并存储、同时为消费者的拉取申请作筹备。Broker服务器也存储音讯相干的元数据,包含消费者组、生产进度偏移和主题和队列音讯等。
Producer:负责生产音讯,个别由业务零碎负责生产音讯。一个音讯生产者会把业务利用零碎里产生的音讯发送到Broker服务器。RocketMQ提供多种发送形式,同步发送、异步发送、程序发送、单向发送。同步和异步形式均须要Broker返回确认信息,单向发送不须要。
Consumer:负责生产音讯,个别是后盾零碎负责异步生产。一个音讯消费者会从Broker服务器拉取音讯、并将其提供给应用程序。从用户利用的角度而言提供了两种生产模式:拉取式生产、推动式生产。
除了下面说的三个外围组件外,还有Topic这个概念上面也会屡次提到:
Topic:示意一类音讯的汇合,每个Topic蕴含若干条音讯,每条音讯只能属于一个Topic,是RocketMQ进行音讯订阅的根本单位。一个Topic能够分片在多个Broker集群上,每一个Topic分片蕴含多个queue,具体构造能够参考下图:
1.3 设计理念
RocketMQ是基于主题的公布与订阅模式,外围性能包含音讯发送、音讯存储、音讯生产,整体设计谋求简略与性能第一,演绎来说次要是上面三种:
- NameServer取代ZK充当注册核心,NameServer集群间互不通信,容忍路由信息在集群内分钟级不统一,更加轻量级;
- 应用内存映射机制实现高效的IO存储,达到高吞吐量;
- 容忍设计缺点,通过ACK确保音讯至多生产一次,然而如果ACK失落,可能音讯反复生产,这种状况设计上容许,交给使用者本人保障。
本文重点介绍的就是NameServer,咱们上面一起来看下NameServer是如何启动以及如何进行路由治理的。
二、NameServer架构设计
在第一章曾经简略介绍了NameServer取代zk作为一种更轻量级的注册核心充当路由信息的提供者。那么具体是如何来实现路由信息管理的呢?咱们先看下图:
下面的图形容了NameServer进行路由注册、路由剔除和路由发现的外围原理。
路由注册:Broker服务器在启动的时候会想NameServer集群中所有的NameServer发送心跳信号进行注册,并会每隔30秒向nameserver发送心跳,通知NameServer本人活着。NameServer接管到Broker发送的心跳包之后,会记录该broker信息,并保留最近一次收到心跳包的工夫。
路由剔除:NameServer和每个Broker放弃长连贯,每隔30秒接管Broker发送的心跳包,同时本身每个10秒扫描BrokerLiveTable,比拟上次收到心跳工夫和以后工夫比拟是否大于120秒,如果超过,那么认为Broker不可用,剔除路由表中该Broker相干信息。
路由发现:路由发现不是实时的,路由变动后,NameServer不被动推给客户端,期待producer定期拉取最新路由信息。这样的设计形式升高了NameServer实现的复杂性,当路由发生变化时通过在音讯发送端的容错机制来保障音讯发送的高可用(这块内容会在后续介绍producer音讯发送时介绍,本文不开展解说)。
高可用:NameServer通过部署多台NameServer服务器来保障本身的高可用,同时多个NameServer服务器之间不进行通信,这样路由信息发生变化时,各个NameServer服务器之间数据可能不是完全相同的,然而通过发送端的容错机制保障音讯发送的高可用。这个也正是NameServer谋求简略高效的目标所在。
三、 启动流程
在整顿理解了NameServer的架构设计之后,咱们先来看下NameServer到底是如何启动的呢?
既然是源码解读,那么咱们先来看下代码入口:org.apache.rocketmq.namesrv.NamesrvStartup#main(String[] args),理论调用的是main0()办法,
代码如下:
public static NamesrvController main0(String[] args) {
try {
//创立namesrvController
NamesrvController controller = createNamesrvController(args);
//初始化并启动NamesrvController
start(controller);
String tip = "The Name Server boot success. serializeType=" + RemotingCommand.getSerializeTypeConfigInThisServer();
log.info(tip);
System.out.printf("%s%n", tip);
return controller;
} catch (Throwable e) {
e.printStackTrace();
System.exit(-1);
}
return null;
}
通过main办法启动NameServer,次要分为两大步,先创立NamesrvController,而后再初始化并启动NamesrvController。咱们别离开展来剖析。
3.1 时序图
具体开展浏览代码之前,咱们先通过一个序列图对整体流程有个理解,如下图:
3.2 创立NamesrvController
先来看外围代码,如下:
public static NamesrvController createNamesrvController(String[] args) throws IOException, JoranException {
// 设置版本号为以后版本号
System.setProperty(RemotingCommand.REMOTING_VERSION_KEY, Integer.toString(MQVersion.CURRENT_VERSION));
//PackageConflictDetect.detectFastjson();
//结构org.apache.commons.cli.Options,并增加-h -n参数,-h参数是打印帮忙信息,-n参数是指定namesrvAddr
Options options = ServerUtil.buildCommandlineOptions(new Options());
//初始化commandLine,并在options中增加-c -p参数,-c指定nameserver的配置文件门路,-p标识打印配置信息
commandLine = ServerUtil.parseCmdLine("mqnamesrv", args, buildCommandlineOptions(options), new PosixParser());
if (null == commandLine) {
System.exit(-1);
return null;
}
//nameserver配置类,业务参数
final NamesrvConfig namesrvConfig = new NamesrvConfig();
//netty服务器配置类,网络参数
final NettyServerConfig nettyServerConfig = new NettyServerConfig();
//设置nameserver的端口号
nettyServerConfig.setListenPort(9876);
//命令带有-c参数,阐明指定配置文件,须要依据配置文件门路读取配置文件内容,并将文件中配置信息赋值给NamesrvConfig和NettyServerConfig
if (commandLine.hasOption('c')) {
String file = commandLine.getOptionValue('c');
if (file != null) {
InputStream in = new BufferedInputStream(new FileInputStream(file));
properties = new Properties();
properties.load(in);
//反射的形式
MixAll.properties2Object(properties, namesrvConfig);
MixAll.properties2Object(properties, nettyServerConfig);
//设置配置文件门路
namesrvConfig.setConfigStorePath(file);
System.out.printf("load config properties file OK, %s%n", file);
in.close();
}
}
//命令行带有-p,阐明是打印参数的命令,那么就打印出NamesrvConfig和NettyServerConfig的属性。在启动NameServer时能够先应用./mqnameserver -c configFile -p打印以后加载的配置属性
if (commandLine.hasOption('p')) {
InternalLogger console = InternalLoggerFactory.getLogger(LoggerName.NAMESRV_CONSOLE_NAME);
MixAll.printObjectProperties(console, namesrvConfig);
MixAll.printObjectProperties(console, nettyServerConfig);
//打印参数命令不须要启动nameserver服务,只须要打印参数即可
System.exit(0);
}
//解析命令行参数,并加载到namesrvConfig中
MixAll.properties2Object(ServerUtil.commandLine2Properties(commandLine), namesrvConfig);
//查看ROCKETMQ_HOME,不能为空
if (null == namesrvConfig.getRocketmqHome()) {
System.out.printf("Please set the %s variable in your environment to match the location of the RocketMQ installation%n", MixAll.ROCKETMQ_HOME_ENV);
System.exit(-2);
}
//初始化logback日志工厂,rocketmq默认应用logback作为日志输入
LoggerContext lc = (LoggerContext) LoggerFactory.getILoggerFactory();
JoranConfigurator configurator = new JoranConfigurator();
configurator.setContext(lc);
lc.reset();
configurator.doConfigure(namesrvConfig.getRocketmqHome() + "/conf/logback_namesrv.xml");
log = InternalLoggerFactory.getLogger(LoggerName.NAMESRV_LOGGER_NAME);
MixAll.printObjectProperties(log, namesrvConfig);
MixAll.printObjectProperties(log, nettyServerConfig);
//创立NamesrvController
final NamesrvController controller = new NamesrvController(namesrvConfig, nettyServerConfig);
//将全局Properties的内容复制到NamesrvController.Configuration.allConfigs中
// remember all configs to prevent discard
controller.getConfiguration().registerConfig(properties);
return controller;
}
通过上面对每一行代码的正文,能够看进去,创立NamesrvController的过程次要分为两步:
Step1:通过命令行中获取配置。赋值给NamesrvConfig和NettyServerConfig类。
Step2:依据配置类NamesrvConfig和NettyServerConfig结构一个NamesrvController实例。
可见NamesrvConfig和NettyServerConfig是想当重要的,这两个类别离是NameServer的业务参数和网络参数,咱们别离看下这两个类外面有哪些属性:
NamesrvConfig
NettyServerConfig
注:Apache Commons CLI是开源的命令行解析工具,它能够帮忙开发者疾速构建启动命令,并且帮忙你组织命令的参数、以及输入列表等。
3.3 初始化并启动
创立了NamesrvController实例之后,开始初始化并启动NameServer。
首先进行初始化,代码入口是NamesrvController#initialize。
public boolean initialize() {
//加载kvConfigPath下kvConfig.json配置文件里的KV配置,而后将这些配置放到KVConfigManager#configTable属性中
this.kvConfigManager.load();
//依据nettyServerConfig初始化一个netty服务器。
//brokerHousekeepingService是在NamesrvController实例化时构造函数里实例化的,该类负责Broker连贯事件的解决,实现了ChannelEventListener,次要用来治理RouteInfoManager的brokerLiveTable
this.remotingServer = new NettyRemotingServer(this.nettyServerConfig, this.brokerHousekeepingService);
//初始化负责解决Netty网络交互数据的线程池,默认线程数是8个
this.remotingExecutor =
Executors.newFixedThreadPool(nettyServerConfig.getServerWorkerThreads(), new ThreadFactoryImpl("RemotingExecutorThread_"));
//注册Netty服务端业务解决逻辑,如果开启了clusterTest,那么注册的申请解决类是ClusterTestRequestProcessor,否则申请解决类是DefaultRequestProcessor
this.registerProcessor();
//注册心跳机制线程池,提早5秒启动,每隔10秒遍历RouteInfoManager#brokerLiveTable这个属性,用来扫描不存活的broker
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
NamesrvController.this.routeInfoManager.scanNotActiveBroker();
}
}, 5, 10, TimeUnit.SECONDS);
//注册打印KV配置线程池,提早1分钟启动、每10分钟打印出kvConfig配置
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
NamesrvController.this.kvConfigManager.printAllPeriodically();
}
}, 1, 10, TimeUnit.MINUTES);
//rocketmq能够通过开启TLS来进步数据传输的安全性,如果开启了,那么须要注册一个监听器来从新加载SslContext
if (TlsSystemConfig.tlsMode != TlsMode.DISABLED) {
// Register a listener to reload SslContext
try {
fileWatchService = new FileWatchService(
new String[] {
TlsSystemConfig.tlsServerCertPath,
TlsSystemConfig.tlsServerKeyPath,
TlsSystemConfig.tlsServerTrustCertPath
},
new FileWatchService.Listener() {
boolean certChanged, keyChanged = false;
@Override
public void onChanged(String path) {
if (path.equals(TlsSystemConfig.tlsServerTrustCertPath)) {
log.info("The trust certificate changed, reload the ssl context");
reloadServerSslContext();
}
if (path.equals(TlsSystemConfig.tlsServerCertPath)) {
certChanged = true;
}
if (path.equals(TlsSystemConfig.tlsServerKeyPath)) {
keyChanged = true;
}
if (certChanged && keyChanged) {
log.info("The certificate and private key changed, reload the ssl context");
certChanged = keyChanged = false;
reloadServerSslContext();
}
}
private void reloadServerSslContext() {
((NettyRemotingServer) remotingServer).loadSslContext();
}
});
} catch (Exception e) {
log.warn("FileWatchService created error, can't load the certificate dynamically");
}
}
return true;
}
下面的代码是NameServer初始化流程,通过每行代码的正文,能够看进去,次要有5步骤操作:
- Step1:加载KV配置,并写入到KVConfigManager的configTable属性中;
- Step2:初始化netty服务器;
- Step3:初始化解决netty网络交互数据的线程池;
- Step4:注册心跳机制线程池,启动5秒后每隔10秒检测一次Broker的存活状况;
- Step5:注册打印KV配置的线程池,启动1分钟后,每隔10分钟打印一次KV配置。
RocketMQ的开发团队还应用了一个罕用的编程技巧,就是应用JVM钩子函数对NameServer进行优雅停机。这样在JVM过程敞开前,会先执行shutdown操作。
Runtime.getRuntime().addShutdownHook(new ShutdownHookThread(log, new Callable<Void>() {
@Override
public Void call() throws Exception {
controller.shutdown();
return null;
}
}));
执行start函数,启动NameServer。代码比较简单,就是将第一步中创立的netty server进行启动。其中remotingServer.start()办法不开展具体阐明了,须要对netty比拟相熟,不是本篇文章重点,有趣味的同学能够自行下载源码浏览。
public void start() throws Exception {
//启动netty服务
this.remotingServer.start();
//如果开启了TLS
if (this.fileWatchService != null) {
this.fileWatchService.start();
}
}
四、路由治理
咱们在第二章开篇有理解到NameServer作为一个轻量级的注册核心,次要是为音讯生产者和消费者提供Topic的路由信息,并对这些路由信息和Broker节点进行治理,次要包含路由注册、路由剔除和路由发现。
本章将会通过源码的角度来具体分析NameServer是如果进行路由信息管理的。外围代码次要都在org.apache.rocketmq.namesrv.routeinfo.RouteInfoManager中实现。
4.1 路由元信息
在理解路由信息管理之前,咱们首先须要理解NameServer到底存储了哪些路由元信息,数据结构别离是什么样的。
查看代码咱们能够看到次要通过5个属性来保护路由元信息,如下:
private final HashMap<String/* topic */, List<QueueData>> topicQueueTable;
private final HashMap<String/* brokerName */, BrokerData> brokerAddrTable;
private final HashMap<String/* clusterName */, Set<String/* brokerName */>> clusterAddrTable;
private final HashMap<String/* brokerAddr */, BrokerLiveInfo> brokerLiveTable;
private final HashMap<String/* brokerAddr */, List<String>/* Filter Server */> filterServerTable;
咱们顺次对这5个属性进行开展阐明。
4.1.1 TopicQueueTable
阐明:Topic音讯队列路由信息,音讯发送时依据路由表进行负载平衡。
数据结构:HashMap构造,key是Topic名字,value是一个类型是QueueData的队列汇合。在第一章就讲过,一个Topic中有多个队列。QueueData的数据结构如下:
数据结构示例:
topicQueueTable:{
"topic1": [
{
"brokerName": "broker-a",
"readQueueNums":4,
"writeQueueNums":4,
"perm":6,
"topicSynFlag":0,
},
{
"brokerName": "broker-b",
"readQueueNums":4,
"writeQueueNums":4,
"perm":6,
"topicSynFlag":0,
}
]
}
4.1.2 BrokerAddrTable
阐明:Broker根底信息,蕴含BrokerName、所属集群名称、主备Broker地址。
数据结构:HashMap构造,key是BrokerName,value是一个类型是BrokerData的对象。BrokerData的数据结构如下(能够联合上面Broker主从构造逻辑图来了解):
Broker主从构造逻辑图:
数据结构示例:
brokerAddrTable:{
"broker-a": {
"cluster": "c1",
"brokerName": "broker-a",
"brokerAddrs": {
0: "192.168.1.1:10000",
1: "192.168.1.2:10000"
}
},
"broker-b": {
"cluster": "c1",
"brokerName": "broker-b",
"brokerAddrs": {
0: "192.168.1.3:10000",
1: "192.168.1.4:10000"
}
}
}
4.1.3 ClusterAddrTable
阐明:Broker集群信息,存储集群中所有Broker名称。
数据结构:HashMap构造,key是ClusterName,value是存储BrokerName的Set构造。
数据结构示例:
clusterAddrTable:{
"c1": ["broker-a","broker-b"]
}
4.1.4 BrokerLiveTable
阐明:Broker状态信息。NameServer每次收到心跳包时会替换该信息
数据结构:HashMap构造,key是Broker的地址,value是BrokerLiveInfo构造的该Broker信息对象。BrokerLiveInfo的数据结构如下:
数据结构示例:
brokerLiveTable:{
"192.168.1.1:10000": {
"lastUpdateTimestamp": 1518270318980,
"dataVersion":versionObj1,
"channel":channelObj,
"haServerAddr":""
},
"192.168.1.2:10000": {
"lastUpdateTimestamp": 1518270318980,
"dataVersion":versionObj1,
"channel":channelObj,
"haServerAddr":"192.168.1.1:10000"
},
"192.168.1.3:10000": {
"lastUpdateTimestamp": 1518270318980,
"dataVersion":versionObj1,
"channel":channelObj,
"haServerAddr":""
},
"192.168.1.4:10000": {
"lastUpdateTimestamp": 1518270318980,
"dataVersion":versionObj1,
"channel":channelObj,
"haServerAddr":"192.168.1.3:10000"
}
}
4.1.5 filterServerTable
阐明:Broker上的FilterServer列表,音讯过滤服务器列表,后续介绍Consumer时会介绍,consumer拉取数据是通过filterServer拉取,consumer向Broker注册。
数据结构:HashMap构造,key是Broker地址,value是记录了filterServer地址的List汇合。
4.2 路由注册
路由注册是通过Broker和NameServer之间的心跳性能来实现的。次要分为两步:
Step1:
Broker启动时向集群中所有NameServer发送心跳语句,每隔30秒(默认30s,工夫距离在10秒到60秒之间)再发一次。
Step2:
NameServer收到心跳包更新topicQueueTable,brokerAddrTable,brokerLiveTable,clusterAddrTable,filterServerTable。
咱们别离开展剖析这两步。
4.2.1 Broker发送心跳包
发送心跳包的外围逻辑是在Broker启动逻辑里,代码入口是org.apache.rocketmq.broker.BrokerController#start,本篇文章重点关注的是发送心跳包的逻辑实现,只列出发送心跳包的外围代码,如下:
1)创立了一个线程池注册Broker,程序启动10秒后执行,每隔30秒(默认30s,工夫距离在10秒到60秒之间,BrokerConfig.getRegisterNameServerPeriod()的默认值是30秒)执行一次。
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
try {
BrokerController.this.registerBrokerAll(true, false, brokerConfig.isForceRegister());
} catch (Throwable e) {
log.error("registerBrokerAll Exception", e);
}
}
}, 1000 * 10, Math.max(10000, Math.min(brokerConfig.getRegisterNameServerPeriod(), 60000)), TimeUnit.MILLISECONDS);
2)封装Topic配置和版本号之后,进行理论的路由注册(注:封装Topic配置不是本篇重点,会在介绍Broker源码时重点解说)。理论路由注册是在org.apache.rocketmq.broker.out.BrokerOuterAPI#registerBrokerAll中实现,外围代码如下:
public List<RegisterBrokerResult> registerBrokerAll(
final String clusterName,
final String brokerAddr,
final String brokerName,
final long brokerId,
final String haServerAddr,
final TopicConfigSerializeWrapper topicConfigWrapper,
final List<String> filterServerList,
final boolean oneway,
final int timeoutMills,
final boolean compressed) {
final List<RegisterBrokerResult> registerBrokerResultList = new CopyOnWriteArrayList<>();
//获取nameserver地址列表
List<String> nameServerAddressList = this.remotingClient.getNameServerAddressList();
if (nameServerAddressList != null && nameServerAddressList.size() > 0) {
/**
*封装申请包头start
*封装申请包头,次要封装broker相干信息
**/
final RegisterBrokerRequestHeader requestHeader = new RegisterBrokerRequestHeader();
requestHeader.setBrokerAddr(brokerAddr);
requestHeader.setBrokerId(brokerId);
requestHeader.setBrokerName(brokerName);
requestHeader.setClusterName(clusterName);
requestHeader.setHaServerAddr(haServerAddr);
requestHeader.setCompressed(compressed);
//封装requestBody,包含topic和filterServerList相干信息
RegisterBrokerBody requestBody = new RegisterBrokerBody();
requestBody.setTopicConfigSerializeWrapper(topicConfigWrapper);
requestBody.setFilterServerList(filterServerList);
final byte[] body = requestBody.encode(compressed);
final int bodyCrc32 = UtilAll.crc32(body);
requestHeader.setBodyCrc32(bodyCrc32);
/**
*封装申请包头end
**/
//开启多线程到每个nameserver进行注册
final CountDownLatch countDownLatch = new CountDownLatch(nameServerAddressList.size());
for (final String namesrvAddr : nameServerAddressList) {
brokerOuterExecutor.execute(new Runnable() {
@Override
public void run() {
try {
//理论进行注册办法
RegisterBrokerResult result = registerBroker(namesrvAddr,oneway, timeoutMills,requestHeader,body);
if (result != null) {
//封装nameserver返回的信息
registerBrokerResultList.add(result);
}
log.info("register broker[{}]to name server {} OK", brokerId, namesrvAddr);
} catch (Exception e) {
log.warn("registerBroker Exception, {}", namesrvAddr, e);
} finally {
countDownLatch.countDown();
}
}
});
}
try {
countDownLatch.await(timeoutMills, TimeUnit.MILLISECONDS);
} catch (InterruptedException e) {
}
}
return registerBrokerResultList;
}
从下面代码来看,也比较简单,首先须要封装申请包头和requestBody,而后开启多线程到每个NameServer服务器去注册。
申请包头类型为RegisterBrokerRequestHeader,次要包含如下字段:
requestBody类型是RegisterBrokerBody,次要包含如下字段:
1)理论的路由注册是通过registerBroker办法实现,外围代码如下:
private RegisterBrokerResult registerBroker(
final String namesrvAddr,
final boolean oneway,
final int timeoutMills,
final RegisterBrokerRequestHeader requestHeader,
final byte[] body
) throws RemotingCommandException, MQBrokerException, RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException,
InterruptedException {
//创立申请指令,须要留神RequestCode.REGISTER_BROKER,nameserver端的网络处理器会依据requestCode进行相应的业务解决
RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.REGISTER_BROKER, requestHeader);
request.setBody(body);
//基于netty进行网络传输
if (oneway) {
//如果是单向调用,没有返回值,不返回nameserver返回后果
try {
this.remotingClient.invokeOneway(namesrvAddr, request, timeoutMills);
} catch (RemotingTooMuchRequestException e) {
// Ignore
}
return null;
}
//异步调用向nameserver发动注册,获取nameserver的返回信息
RemotingCommand response = this.remotingClient.invokeSync(namesrvAddr, request, timeoutMills);
assert response != null;
switch (response.getCode()) {
case ResponseCode.SUCCESS: {
//获取返回的reponseHeader
RegisterBrokerResponseHeader responseHeader =
(RegisterBrokerResponseHeader) response.decodeCommandCustomHeader(RegisterBrokerResponseHeader.class);
//从新封装返回后果,更新masterAddr和haServerAddr
RegisterBrokerResult result = new RegisterBrokerResult();
result.setMasterAddr(responseHeader.getMasterAddr());
result.setHaServerAddr(responseHeader.getHaServerAddr());
if (response.getBody() != null) {
result.setKvTable(KVTable.decode(response.getBody(), KVTable.class));
}
return result;
}
default:
break;
}
throw new MQBrokerException(response.getCode(), response.getRemark(), requestHeader == null ? null : requestHeader.getBrokerAddr());
}
borker和NameServer之间通过netty进行网络传输,Broker向NameServer发动注册时会在申请中增加注册码RequestCode.REGISTER_BROKER。这是一种网络跟踪办法,RocketMQ的每个申请都会定义一个requestCode,服务端的网络处理器会依据不同的requestCode进行影响的业务解决。
4.2.2 NameServer解决心跳包
Broker收回路由注册的心跳包之后,NameServer会依据心跳包中的requestCode进行解决。NameServer的默认网络处理器是DefaultRequestProcessor,具体代码如下:
public RemotingCommand processRequest(ChannelHandlerContext ctx,
RemotingCommand request) throws RemotingCommandException {
if (ctx != null) {
log.debug("receive request, {} {} {}",
request.getCode(),
RemotingHelper.parseChannelRemoteAddr(ctx.channel()),
request);
}
switch (request.getCode()) {
......
//,如果是RequestCode.REGISTER_BROKER,进行broker注册
case RequestCode.REGISTER_BROKER:
Version brokerVersion = MQVersion.value2Version(request.getVersion());
if (brokerVersion.ordinal() >= MQVersion.Version.V3_0_11.ordinal()) {
return this.registerBrokerWithFilterServer(ctx, request);
} else {
return this.registerBroker(ctx, request);
}
......
default:
break;
}
return null;
}
判断requestCode,如果是RequestCode.REGISTER\_BROKER,那么确定业务解决逻辑是注册Broker。依据Broker版本号抉择不同的办法,咱们已V3\_0_11以上为例,调用registerBrokerWithFilterServer办法进行注册次要步骤分为三步:
Step1:
解析requestHeader并验签(基于crc32),判断数据是否正确;
Step2:
解析Topic信息;
Step3:
调用RouteInfoManager#registerBroker来进行Broker注册;
外围注册逻辑是由RouteInfoManager#registerBroker来实现,外围代码如下:
public RegisterBrokerResult registerBroker(
final String clusterName,
final String brokerAddr,
final String brokerName,
final long brokerId,
final String haServerAddr,
final TopicConfigSerializeWrapper topicConfigWrapper,
final List<String> filterServerList,
final Channel channel) {
RegisterBrokerResult result = new RegisterBrokerResult();
try {
try {
//加写锁,避免并发写RoutInfoManager中的路由表信息。
this.lock.writeLock().lockInterruptibly();
//依据clusterName从clusterAddrTable中获取所有broker名字汇合
Set<String> brokerNames = this.clusterAddrTable.get(clusterName);
//如果没有获取到,阐明broker所属集群还没记录,那么须要创立,并将brokerName退出到集群的broker汇合中
if (null == brokerNames) {
brokerNames = new HashSet<String>();
this.clusterAddrTable.put(clusterName, brokerNames);
}
brokerNames.add(brokerName);
boolean registerFirst = false;
//依据brokerName尝试从brokerAddrTable中获取brokerData
BrokerData brokerData = this.brokerAddrTable.get(brokerName);
if (null == brokerData) {
//如果没获取到brokerData,新建BrokerData并放入brokerAddrTable,registerFirst设为true;
registerFirst = true;
brokerData = new BrokerData(clusterName, brokerName, new HashMap<Long, String>());
this.brokerAddrTable.put(brokerName, brokerData);
}
//更新brokerData中的brokerAddrs
Map<Long, String> brokerAddrsMap = brokerData.getBrokerAddrs();
//思考到可能呈现master挂了,slave变成master的状况,这时候brokerId会变成0,这时候须要把老的brokerAddr给删除
//Switch slave to master: first remove <1, IP:PORT> in namesrv, then add <0, IP:PORT>
//The same IP:PORT must only have one record in brokerAddrTable
Iterator<Entry<Long, String>> it = brokerAddrsMap.entrySet().iterator();
while (it.hasNext()) {
Entry<Long, String> item = it.next();
if (null != brokerAddr && brokerAddr.equals(item.getValue()) && brokerId != item.getKey()) {
it.remove();
}
}
//更新brokerAddrs,依据返回的oldAddr判断是否是第一次注册的broker
String oldAddr = brokerData.getBrokerAddrs().put(brokerId, brokerAddr);
registerFirst = registerFirst || (null == oldAddr);
//如过Broker是Master,并且Broker的Topic配置信息发生变化或者是首次注册,须要创立或更新Topic路由元数据,填充topicQueueTable
if (null != topicConfigWrapper
&& MixAll.MASTER_ID == brokerId) {
if (this.isBrokerTopicConfigChanged(brokerAddr, topicConfigWrapper.getDataVersion())
|| registerFirst) {
ConcurrentMap<String, TopicConfig> tcTable =
topicConfigWrapper.getTopicConfigTable();
if (tcTable != null) {
for (Map.Entry<String, TopicConfig> entry : tcTable.entrySet()) {
//创立或更新Topic路由元数据
this.createAndUpdateQueueData(brokerName, entry.getValue());
}
}
}
}
//更新BrokerLivelnfo,BrokeLivelnfo是执行路由删除的重要依据
BrokerLiveInfo prevBrokerLiveInfo = this.brokerLiveTable.put(brokerAddr,
new BrokerLiveInfo(
System.currentTimeMillis(),
topicConfigWrapper.getDataVersion(),
channel,
haServerAddr));
if (null == prevBrokerLiveInfo) {
log.info("new broker registered, {} HAServer: {}", brokerAddr, haServerAddr);
}
//注册Broker的filterServer地址列表
if (filterServerList != null) {
if (filterServerList.isEmpty()) {
this.filterServerTable.remove(brokerAddr);
} else {
this.filterServerTable.put(brokerAddr, filterServerList);
}
}
//如果此Broker为从节点,则须要查找Broker Master的节点信息,并更新对应masterAddr属性
if (MixAll.MASTER_ID != brokerId) {
String masterAddr = brokerData.getBrokerAddrs().get(MixAll.MASTER_ID);
if (masterAddr != null) {
BrokerLiveInfo brokerLiveInfo = this.brokerLiveTable.get(masterAddr);
if (brokerLiveInfo != null) {
result.setHaServerAddr(brokerLiveInfo.getHaServerAddr());
result.setMasterAddr(masterAddr);
}
}
}
} finally {
this.lock.writeLock().unlock();
}
} catch (Exception e) {
log.error("registerBroker Exception", e);
}
return result;
}
通过下面的源码剖析,能够合成出一个Broker的注册次要分7步:
- Step1:加写锁,避免并发写RoutInfoManager中的路由表信息;
- Step2:判断Broker所属集群是否存在,不存在须要创立,并将Broker名退出到集群Broker汇合中;
- Step3:保护BrokerData;
- Step4:如过Broker是Master,并且Broker的Topic配置信息发生变化或者是首次注册,须要创立或更新Topic路由元数据,填充TopicQueueTable;
- Step5:更新BrokerLivelnfo;
- Step6:注册Broker的filterServer地址列表;
- Step7:如果此Broker为从节点,则须要查找Broker Master的节点信息,并更新对应masterAddr属性,并返回给Broker端。
4.3 路由剔除
4.3.1 触发条件
路由剔除的触发条件次要有两个:
NameServer每隔10s扫描BrokerLiveTable,间断120s没收到心跳包,则移除该Broker并敞开socket连贯;
Broker失常敞开时触发路由删除。
4.3.2 源码解析
下面形容的触发点最终删除路由的逻辑是一样的,对立在RouteInfoManager#onChannelDestroy
中实现,外围代码如下:
public void onChannelDestroy(String remoteAddr, Channel channel) {
String brokerAddrFound = null;
if (channel != null) {
try {
try {
//加读锁
this.lock.readLock().lockInterruptibly();
//通过channel从brokerLiveTable中找出对应的Broker地址
Iterator<Entry<String, BrokerLiveInfo>> itBrokerLiveTable =
this.brokerLiveTable.entrySet().iterator();
while (itBrokerLiveTable.hasNext()) {
Entry<String, BrokerLiveInfo> entry = itBrokerLiveTable.next();
if (entry.getValue().getChannel() == channel) {
brokerAddrFound = entry.getKey();
break;
}
}
} finally {
//开释读锁
this.lock.readLock().unlock();
}
} catch (Exception e) {
log.error("onChannelDestroy Exception", e);
}
}
//若该Broker曾经从存活的Broker地址列表中被革除,则间接应用remoteAddr
if (null == brokerAddrFound) {
brokerAddrFound = remoteAddr;
} else {
log.info("the broker's channel destroyed, {}, clean it's data structure at once", brokerAddrFound);
}
if (brokerAddrFound != null && brokerAddrFound.length() > 0) {
try {
try {
//申请写锁
this.lock.writeLock().lockInterruptibly();
//依据brokerAddress,将这个brokerAddress从brokerLiveTable和filterServerTable中移除
this.brokerLiveTable.remove(brokerAddrFound);
this.filterServerTable.remove(brokerAddrFound);
String brokerNameFound = null;
boolean removeBrokerName = false;
Iterator<Entry<String, BrokerData>> itBrokerAddrTable =
this.brokerAddrTable.entrySet().iterator();
//遍历brokerAddrTable
while (itBrokerAddrTable.hasNext() && (null == brokerNameFound)) {
BrokerData brokerData = itBrokerAddrTable.next().getValue();
Iterator<Entry<Long, String>> it = brokerData.getBrokerAddrs().entrySet().iterator();
while (it.hasNext()) {
Entry<Long, String> entry = it.next();
Long brokerId = entry.getKey();
String brokerAddr = entry.getValue();
//依据brokerAddress找到对应的brokerData,并将brokerData中对应的brokerAddress移除
if (brokerAddr.equals(brokerAddrFound)) {
brokerNameFound = brokerData.getBrokerName();
it.remove();
log.info("remove brokerAddr[{}, {}] from brokerAddrTable, because channel destroyed",
brokerId, brokerAddr);
break;
}
}
//如果移除后,整个brokerData的brokerAddress空了,那么将整个brokerData移除
if (brokerData.getBrokerAddrs().isEmpty()) {
removeBrokerName = true;
itBrokerAddrTable.remove();
log.info("remove brokerName[{}] from brokerAddrTable, because channel destroyed",
brokerData.getBrokerName());
}
}
if (brokerNameFound != null && removeBrokerName) {
//遍历clusterAddrTable
Iterator<Entry<String, Set<String>>> it = this.clusterAddrTable.entrySet().iterator();
while (it.hasNext()) {
Entry<String, Set<String>> entry = it.next();
String clusterName = entry.getKey();
Set<String> brokerNames = entry.getValue();
//依据第三步中获取的须要移除的brokerName,将对应的brokerName移除了
boolean removed = brokerNames.remove(brokerNameFound);
if (removed) {
log.info("remove brokerName[{}], clusterName[{}] from clusterAddrTable, because channel destroyed",
brokerNameFound, clusterName);
//如果移除后,该汇合为空,那么将整个集群从clusterAddrTable中移除
if (brokerNames.isEmpty()) {
log.info("remove the clusterName[{}] from clusterAddrTable, because channel destroyed and no broker in this cluster",
clusterName);
it.remove();
}
break;
}
}
}
if (removeBrokerName) {
Iterator<Entry<String, List<QueueData>>> itTopicQueueTable =
this.topicQueueTable.entrySet().iterator();
//遍历topicQueueTable
while (itTopicQueueTable.hasNext()) {
Entry<String, List<QueueData>> entry = itTopicQueueTable.next();
String topic = entry.getKey();
List<QueueData> queueDataList = entry.getValue();
Iterator<QueueData> itQueueData = queueDataList.iterator();
while (itQueueData.hasNext()) {
QueueData queueData = itQueueData.next();
//依据brokerName,将topic下对应的broker移除掉
if (queueData.getBrokerName().equals(brokerNameFound)) {
itQueueData.remove();
log.info("remove topic[{} {}], from topicQueueTable, because channel destroyed",
topic, queueData);
}
}
//如果该topic下只有一个待移除的broker,那么该topic也从table中移除
if (queueDataList.isEmpty()) {
itTopicQueueTable.remove();
log.info("remove topic[{}] all queue, from topicQueueTable, because channel destroyed",
topic);
}
}
}
} finally {
//开释写锁
this.lock.writeLock().unlock();
}
} catch (Exception e) {
log.error("onChannelDestroy Exception", e);
}
}
}
路由删除整体逻辑次要分为6步:
- Step1:加readlock,通过channel从BrokerLiveTable中找出对应的Broker地址,开释readlock,若该Broker曾经从存活的Broker地址列表中被革除,则间接应用remoteAddr。
- Step2:申请写锁,依据BrokerAddress从BrokerLiveTable、filterServerTable移除。
- Step3:遍历BrokerAddrTable,依据BrokerAddress找到对应的brokerData,并将brokerData中对应的brokerAddress移除,如果移除后,整个brokerData的brokerAddress空了,那么将整个brokerData移除。
- Step4:遍历clusterAddrTable,依据第三步中获取的须要移除的BrokerName,将对应的brokerName移除了。如果移除后,该汇合为空,那么将整个集群从clusterAddrTable中移除。
- Step5:遍历TopicQueueTable,依据BrokerName,将Topic下对应的Broker移除掉,如果该Topic下只有一个待移除的Broker,那么该Topic也从table中移除。
- Step6:开释写锁。
从下面能够看出,路由剔除的整体逻辑比较简单,就是单纯地针对路由元信息的数据结构进行操作。为了大家可能更好地了解这块代码,倡议大家对照4.1中介绍的路由元信息的数据结构来进行代码走读。
4.4 路由发现
当路由信息发生变化之后,NameServer不会被动推送给客户端,而是期待客户端定期到nameserver被动拉取最新路由信息。这种设计形式升高了NameServer实现的复杂性。
4.4.1 producer被动拉取
producer在启动后会开启一系列定时工作,其中有一个工作就是定期从NameServer获取Topic路由信息。代码入口是MQClientInstance#start-ScheduledTask(),外围代码如下:
private void startScheduledTask() {
......
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
try {
//从nameserver更新最新的topic路由信息
MQClientInstance.this.updateTopicRouteInfoFromNameServer();
} catch (Exception e) {
log.error("ScheduledTask updateTopicRouteInfoFromNameServer exception", e);
}
}
}, 10, this.clientConfig.getPollNameServerInterval(), TimeUnit.MILLISECONDS);
......
}
/**
* 从nameserver获取topic路由信息
*/
public TopicRouteData getTopicRouteInfoFromNameServer(final String topic, final long timeoutMillis,
boolean allowTopicNotExist) throws MQClientException, InterruptedException, RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException {
......
//向nameserver发送申请包,requestCode为RequestCode.GET_ROUTEINFO_BY_TOPIC
RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.GET_ROUTEINFO_BY_TOPIC, requestHeader);
......
}
producer和NameServer之间通过netty进行网络传输,producer向NameServer发动的申请中增加注册码
RequestCode.GET\_ROUTEINFO\_BY_TOPIC。
4.4.2 NameServer返回路由信息
NameServer收到producer发送的申请后,会依据申请中的requestCode进行解决。解决requestCode同样是在默认的网络处理器DefaultRequestProcessor中进行解决,最终通过RouteInfoManager#pickupTopicRouteData来实现。
TopicRouteData构造
在正式解析源码前,咱们先看下NameServer返回给producer的数据结构。通过代码能够看到,返回的是一个TopicRouteData对象,具体构造如下:
其中QueueData,BrokerData,filterServerTable在4.1章节介绍路由元信息时有介绍。
源码剖析
在理解了返回给producer的TopicRouteData构造后,咱们进入RouteInfoManager#pickupTopicRouteData办法来看下具体如何实现。
public TopicRouteData pickupTopicRouteData(final String topic) {
TopicRouteData topicRouteData = new TopicRouteData();
boolean foundQueueData = false;
boolean foundBrokerData = false;
Set<String> brokerNameSet = new HashSet<String>();
List<BrokerData> brokerDataList = new LinkedList<BrokerData>();
topicRouteData.setBrokerDatas(brokerDataList);
HashMap<String, List<String>> filterServerMap = new HashMap<String, List<String>>();
topicRouteData.setFilterServerTable(filterServerMap);
try {
try {
//加读锁
this.lock.readLock().lockInterruptibly();
//从元数据topicQueueTable中依据topic名字获取队列汇合
List<QueueData> queueDataList = this.topicQueueTable.get(topic);
if (queueDataList != null) {
//将获取到的队列汇合写入topicRouteData的queueDatas中
topicRouteData.setQueueDatas(queueDataList);
foundQueueData = true;
Iterator<QueueData> it = queueDataList.iterator();
while (it.hasNext()) {
QueueData qd = it.next();
brokerNameSet.add(qd.getBrokerName());
}
//遍历从QueueData汇合中提取的brokerName
for (String brokerName : brokerNameSet) {
//依据brokerName从brokerAddrTable获取brokerData
BrokerData brokerData = this.brokerAddrTable.get(brokerName);
if (null != brokerData) {
//克隆brokerData对象,并写入到topicRouteData的brokerDatas中
BrokerData brokerDataClone = new BrokerData(brokerData.getCluster(), brokerData.getBrokerName(), (HashMap<Long, String>) brokerData.getBrokerAddrs().clone());
brokerDataList.add(brokerDataClone);
foundBrokerData = true;
//遍历brokerAddrs
for (final String brokerAddr : brokerDataClone.getBrokerAddrs().values()) {
//依据brokerAddr获取filterServerList,封装后写入到topicRouteData的filterServerTable中
List<String> filterServerList = this.filterServerTable.get(brokerAddr);
filterServerMap.put(brokerAddr, filterServerList);
}
}
}
}
} finally {
//开释读锁
this.lock.readLock().unlock();
}
} catch (Exception e) {
log.error("pickupTopicRouteData Exception", e);
}
log.debug("pickupTopicRouteData {} {}", topic, topicRouteData);
if (foundBrokerData && foundQueueData) {
return topicRouteData;
}
return null;
}
下面代码封装了TopicRouteData的queueDatas、BrokerDatas和filterServerTable,还有orderTopicConf字段没封装,咱们再看下这个字段是在什么时候封装的,咱们向上看RouteInfoManager#pickupTopicRouteData的调用办法DefaultRequestProcessor#getRouteInfoByTopic如下:
public RemotingCommand getRouteInfoByTopic(ChannelHandlerContext ctx,
RemotingCommand request) throws RemotingCommandException {
......
//这块代码就是下面解析的代码,获取到topicRouteData对象
TopicRouteData topicRouteData = this.namesrvController.getRouteInfoManager().pickupTopicRouteData(requestHeader.getTopic());
if (topicRouteData != null) {
//判断nameserver的orderMessageEnable配置是否关上
if (this.namesrvController.getNamesrvConfig().isOrderMessageEnable()) {
//如果配置关上了,依据namespace和topic名字获取kvConfig配置文件中程序音讯配置内容
String orderTopicConf =
this.namesrvController.getKvConfigManager().getKVConfig(NamesrvUtil.NAMESPACE_ORDER_TOPIC_CONFIG,
requestHeader.getTopic());
//封装orderTopicConf
topicRouteData.setOrderTopicConf(orderTopicConf);
}
byte[] content = topicRouteData.encode();
response.setBody(content);
response.setCode(ResponseCode.SUCCESS);
response.setRemark(null);
return response;
}
//如果没有获取到topic路由,那么reponseCode为TOPIC_NOT_EXIST
response.setCode(ResponseCode.TOPIC_NOT_EXIST);
response.setRemark("No topic route info in name server for the topic: " + requestHeader.getTopic()
+ FAQUrl.suggestTodo(FAQUrl.APPLY_TOPIC_URL));
return response;
}
联合这两个办法,咱们能够总结出查找Topic路由次要分为3个步骤:
调用RouteInfoManager#pickupTopicRouteData,从topicQueueTable,brokerAddrTabl,filterServerTable中获取信息,别离填充queue-Datas、BrokerDatas、filterServerTable。
如果topic为程序音讯,那么从KVconfig中获取对于程序音讯先关的配置填充到orderTopicConf中。
如果找不到路由信息,那么返回code为ResponseCode.TOPIC\_NOT\_EXIST。
五、小结
本篇文章次要是从源码的角度给大家介绍了RocketMQ的NameServer,包含NameServer的启动流程、路由注册、路由剔除和路由发现。咱们在理解了NameServer的设计原理之后,也能够回过头思考下在设计过程中一些值得咱们学习的小技巧,在此我抛砖引玉提出两点:
- 启动流程注册JVM钩子用于优雅停机。这是一个编程技巧,咱们在理论开发过程中,如果有应用线程池或者一些常驻线程工作时,能够思考通过注册JVM钩子的形式,在JVM敞开前开释资源或者实现一些事件来保障优雅停机。
- 更新路由表时须要通过加锁避免并发操作,这里应用的是锁粒度较少的读写锁,容许多个音讯发送者并发读,保障音讯发送时的高并发,但同一时刻NameServer只解决一个Broker心跳包,多个心跳包申请串行执行,这也是读写锁经典应用场景。
六、参考资料
1、《RocketMQ技术底细》
2、《RocketMQ外围原理和实际》
3、Apache RocketMQ开发者指南
作者:vivo互联网服务器团队-Ye Wenhao
发表回复