乐趣区

关于java:深入剖析RocketMQ源码NameServer

一、RocketMQ 架构简介

1.1 逻辑部署图

(图片来自网络)

1.2 外围组件阐明

通过上图能够看到,RocketMQ 的外围组件次要包含 4 个,别离是 NameServer、Broker、Producer 和 Consumer,上面咱们先顺次简略阐明下这四个外围组件:

NameServer:NameServer 充当路由信息的提供者。生产者或消费者可能通过 NameServer 查找各 Topic 相应的 Broker IP 列表。多个 Namesrver 实例组成集群,但互相独立,没有信息替换。

Broker:音讯直达角色,负责存储音讯、转发音讯。Broker 服务器在 RocketMQ 零碎中负责接管从生产者发送来的音讯并存储、同时为消费者的拉取申请作筹备。Broker 服务器也存储音讯相干的元数据,包含消费者组、生产进度偏移和主题和队列音讯等。

Producer:负责生产音讯,个别由业务零碎负责生产音讯。一个音讯生产者会把业务利用零碎里产生的音讯发送到 Broker 服务器。RocketMQ 提供多种发送形式,同步发送、异步发送、程序发送、单向发送。同步和异步形式均须要 Broker 返回确认信息,单向发送不须要。

Consumer:负责生产音讯,个别是后盾零碎负责异步生产。一个音讯消费者会从 Broker 服务器拉取音讯、并将其提供给应用程序。从用户利用的角度而言提供了两种生产模式:拉取式生产、推动式生产。

除了下面说的三个外围组件外,还有 Topic 这个概念上面也会屡次提到:

Topic:示意一类音讯的汇合,每个 Topic 蕴含若干条音讯,每条音讯只能属于一个 Topic,是 RocketMQ 进行音讯订阅的根本单位。一个 Topic 能够分片在多个 Broker 集群上,每一个 Topic 分片蕴含多个 queue,具体构造能够参考下图:

1.3 设计理念

RocketMQ 是基于主题的公布与订阅模式,外围性能包含音讯发送、音讯存储、音讯生产,整体设计谋求简略与性能第一,演绎来说次要是上面三种:

  • NameServer 取代 ZK 充当注册核心,NameServer 集群间互不通信,容忍路由信息在集群内分钟级不统一,更加轻量级;
  • 应用内存映射机制实现高效的 IO 存储,达到高吞吐量;
  • 容忍设计缺点,通过 ACK 确保音讯至多生产一次,然而如果 ACK 失落,可能音讯反复生产,这种状况设计上容许,交给使用者本人保障。

本文重点介绍的就是 NameServer,咱们上面一起来看下 NameServer 是如何启动以及如何进行路由治理的。

二、NameServer 架构设计

在第一章曾经简略介绍了 NameServer 取代 zk 作为一种更轻量级的注册核心充当路由信息的提供者。那么具体是如何来实现路由信息管理的呢?咱们先看下图:

下面的图形容了 NameServer 进行路由注册、路由剔除和路由发现的外围原理。

路由注册 :Broker 服务器在启动的时候会想 NameServer 集群中所有的 NameServer 发送心跳信号进行注册,并会每隔 30 秒向 nameserver 发送心跳,通知 NameServer 本人活着。NameServer 接管到 Broker 发送的心跳包之后,会记录该 broker 信息,并保留最近一次收到心跳包的工夫。

路由剔除 :NameServer 和每个 Broker 放弃长连贯,每隔 30 秒接管 Broker 发送的心跳包,同时本身每个 10 秒扫描 BrokerLiveTable,比拟上次收到心跳工夫和以后工夫比拟是否大于 120 秒,如果超过,那么认为 Broker 不可用,剔除路由表中该 Broker 相干信息。

路由发现 :路由发现不是实时的,路由变动后,NameServer 不被动推给客户端,期待 producer 定期拉取最新路由信息。这样的设计形式升高了 NameServer 实现的复杂性,当路由发生变化时通过在音讯发送端的容错机制来保障音讯发送的高可用(这块内容会在后续介绍 producer 音讯发送时介绍,本文不开展解说)。

高可用 :NameServer 通过部署多台 NameServer 服务器来保障本身的高可用,同时多个 NameServer 服务器之间不进行通信,这样路由信息发生变化时,各个 NameServer 服务器之间数据可能不是完全相同的,然而通过发送端的容错机制保障音讯发送的高可用。这个也正是 NameServer 谋求简略高效的目标所在。

三、启动流程

在整顿理解了 NameServer 的架构设计之后,咱们先来看下 NameServer 到底是如何启动的呢?

既然是源码解读,那么咱们先来看下代码入口:org.apache.rocketmq.namesrv.NamesrvStartup#main(String[] args),理论调用的是 main0() 办法,

代码如下:

public static NamesrvController main0(String[] args) {
​
    try {
        // 创立 namesrvController
        NamesrvController controller = createNamesrvController(args);
        // 初始化并启动 NamesrvController
        start(controller);
        String tip = "The Name Server boot success. serializeType=" + RemotingCommand.getSerializeTypeConfigInThisServer();
        log.info(tip);
        System.out.printf("%s%n", tip);
        return controller;
    } catch (Throwable e) {e.printStackTrace();
        System.exit(-1);
    }
​
    return null;
}

通过 main 办法启动 NameServer,次要分为两大步,先创立 NamesrvController,而后再初始化并启动 NamesrvController。咱们别离开展来剖析。

3.1 时序图

具体开展浏览代码之前,咱们先通过一个序列图对整体流程有个理解,如下图:

3.2 创立 NamesrvController

先来看外围代码,如下:

public static NamesrvController createNamesrvController(String[] args) throws IOException, JoranException {
    // 设置版本号为以后版本号
    System.setProperty(RemotingCommand.REMOTING_VERSION_KEY, Integer.toString(MQVersion.CURRENT_VERSION));
    //PackageConflictDetect.detectFastjson();
  // 结构 org.apache.commons.cli.Options, 并增加 -h - n 参数,- h 参数是打印帮忙信息,- n 参数是指定 namesrvAddr
    Options options = ServerUtil.buildCommandlineOptions(new Options());
    // 初始化 commandLine,并在 options 中增加 -c - p 参数,- c 指定 nameserver 的配置文件门路,- p 标识打印配置信息
    commandLine = ServerUtil.parseCmdLine("mqnamesrv", args, buildCommandlineOptions(options), new PosixParser());
    if (null == commandLine) {System.exit(-1);
        return null;
    }
  //nameserver 配置类,业务参数
    final NamesrvConfig namesrvConfig = new NamesrvConfig();
    //netty 服务器配置类,网络参数
    final NettyServerConfig nettyServerConfig = new NettyServerConfig();
    // 设置 nameserver 的端口号
    nettyServerConfig.setListenPort(9876);
    // 命令带有 - c 参数,阐明指定配置文件,须要依据配置文件门路读取配置文件内容,并将文件中配置信息赋值给 NamesrvConfig 和 NettyServerConfig
    if (commandLine.hasOption('c')) {String file = commandLine.getOptionValue('c');
        if (file != null) {InputStream in = new BufferedInputStream(new FileInputStream(file));
            properties = new Properties();
            properties.load(in);
            // 反射的形式
            MixAll.properties2Object(properties, namesrvConfig);
            MixAll.properties2Object(properties, nettyServerConfig);
      // 设置配置文件门路
            namesrvConfig.setConfigStorePath(file);
​
            System.out.printf("load config properties file OK, %s%n", file);
            in.close();}
    }
  // 命令行带有 -p,阐明是打印参数的命令,那么就打印出 NamesrvConfig 和 NettyServerConfig 的属性。在启动 NameServer 时能够先应用./mqnameserver -c configFile - p 打印以后加载的配置属性 
    if (commandLine.hasOption('p')) {InternalLogger console = InternalLoggerFactory.getLogger(LoggerName.NAMESRV_CONSOLE_NAME);
        MixAll.printObjectProperties(console, namesrvConfig);
        MixAll.printObjectProperties(console, nettyServerConfig);
        // 打印参数命令不须要启动 nameserver 服务,只须要打印参数即可
        System.exit(0);
    }
  // 解析命令行参数,并加载到 namesrvConfig 中
    MixAll.properties2Object(ServerUtil.commandLine2Properties(commandLine), namesrvConfig);
  // 查看 ROCKETMQ_HOME,不能为空
    if (null == namesrvConfig.getRocketmqHome()) {System.out.printf("Please set the %s variable in your environment to match the location of the RocketMQ installation%n", MixAll.ROCKETMQ_HOME_ENV);
        System.exit(-2);
    }
  // 初始化 logback 日志工厂,rocketmq 默认应用 logback 作为日志输入
    LoggerContext lc = (LoggerContext) LoggerFactory.getILoggerFactory();
    JoranConfigurator configurator = new JoranConfigurator();
    configurator.setContext(lc);
    lc.reset();
    configurator.doConfigure(namesrvConfig.getRocketmqHome() + "/conf/logback_namesrv.xml");
​
    log = InternalLoggerFactory.getLogger(LoggerName.NAMESRV_LOGGER_NAME);
​
    MixAll.printObjectProperties(log, namesrvConfig);
    MixAll.printObjectProperties(log, nettyServerConfig);
  // 创立 NamesrvController
    final NamesrvController controller = new NamesrvController(namesrvConfig, nettyServerConfig);
​
    // 将全局 Properties 的内容复制到 NamesrvController.Configuration.allConfigs 中
    // remember all configs to prevent discard
    controller.getConfiguration().registerConfig(properties);
​
    return controller;
}

通过上面对每一行代码的正文,能够看进去,创立 NamesrvController 的过程次要分为两步:

Step1:通过命令行中获取配置。赋值给 NamesrvConfig 和 NettyServerConfig 类。

Step2:依据配置类 NamesrvConfig 和 NettyServerConfig 结构一个 NamesrvController 实例。

可见 NamesrvConfig 和 NettyServerConfig 是想当重要的,这两个类别离是 NameServer 的业务参数和网络参数,咱们别离看下这两个类外面有哪些属性:

NamesrvConfig

NettyServerConfig

注:Apache Commons CLI 是开源的命令行解析工具,它能够帮忙开发者疾速构建启动命令,并且帮忙你组织命令的参数、以及输入列表等。

3.3 初始化并启动

创立了 NamesrvController 实例之后,开始初始化并启动 NameServer。

首先进行初始化,代码入口是 NamesrvController#initialize。

public boolean initialize() {
  // 加载 kvConfigPath 下 kvConfig.json 配置文件里的 KV 配置,而后将这些配置放到 KVConfigManager#configTable 属性中
    this.kvConfigManager.load();
  // 依据 nettyServerConfig 初始化一个 netty 服务器。//brokerHousekeepingService 是在 NamesrvController 实例化时构造函数里实例化的,该类负责 Broker 连贯事件的解决,实现了 ChannelEventListener,次要用来治理 RouteInfoManager 的 brokerLiveTable
    this.remotingServer = new NettyRemotingServer(this.nettyServerConfig, this.brokerHousekeepingService);
  // 初始化负责解决 Netty 网络交互数据的线程池,默认线程数是 8 个
    this.remotingExecutor =
        Executors.newFixedThreadPool(nettyServerConfig.getServerWorkerThreads(), new ThreadFactoryImpl("RemotingExecutorThread_"));
  // 注册 Netty 服务端业务解决逻辑,如果开启了 clusterTest,那么注册的申请解决类是 ClusterTestRequestProcessor,否则申请解决类是 DefaultRequestProcessor
    this.registerProcessor();
  // 注册心跳机制线程池,提早 5 秒启动,每隔 10 秒遍历 RouteInfoManager#brokerLiveTable 这个属性,用来扫描不存活的 broker
    this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
​
        @Override
        public void run() {NamesrvController.this.routeInfoManager.scanNotActiveBroker();
        }
    }, 5, 10, TimeUnit.SECONDS);
  // 注册打印 KV 配置线程池,提早 1 分钟启动、每 10 分钟打印出 kvConfig 配置
    this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
​
        @Override
        public void run() {NamesrvController.this.kvConfigManager.printAllPeriodically();
        }
    }, 1, 10, TimeUnit.MINUTES);
  //rocketmq 能够通过开启 TLS 来进步数据传输的安全性,如果开启了,那么须要注册一个监听器来从新加载 SslContext
    if (TlsSystemConfig.tlsMode != TlsMode.DISABLED) {
        // Register a listener to reload SslContext
        try {
            fileWatchService = new FileWatchService(new String[] {
                    TlsSystemConfig.tlsServerCertPath,
                    TlsSystemConfig.tlsServerKeyPath,
                    TlsSystemConfig.tlsServerTrustCertPath
                },
                new FileWatchService.Listener() {
                    boolean certChanged, keyChanged = false;
                    @Override
                    public void onChanged(String path) {if (path.equals(TlsSystemConfig.tlsServerTrustCertPath)) {log.info("The trust certificate changed, reload the ssl context");
                            reloadServerSslContext();}
                        if (path.equals(TlsSystemConfig.tlsServerCertPath)) {certChanged = true;}
                        if (path.equals(TlsSystemConfig.tlsServerKeyPath)) {keyChanged = true;}
                        if (certChanged && keyChanged) {log.info("The certificate and private key changed, reload the ssl context");
                            certChanged = keyChanged = false;
                            reloadServerSslContext();}
                    }
                    private void reloadServerSslContext() {((NettyRemotingServer) remotingServer).loadSslContext();}
                });
        } catch (Exception e) {log.warn("FileWatchService created error, can't load the certificate dynamically");
        }
    }
​
    return true;
}

下面的代码是 NameServer 初始化流程,通过每行代码的正文,能够看进去,次要有 5 步骤操作:

  • Step1:加载 KV 配置,并写入到 KVConfigManager 的 configTable 属性中;
  • Step2:初始化 netty 服务器;
  • Step3:初始化解决 netty 网络交互数据的线程池;
  • Step4:注册心跳机制线程池,启动 5 秒后每隔 10 秒检测一次 Broker 的存活状况;
  • Step5:注册打印 KV 配置的线程池,启动 1 分钟后,每隔 10 分钟打印一次 KV 配置。

RocketMQ 的开发团队还应用了一个罕用的编程技巧,就是应用 JVM 钩子函数对 NameServer 进行优雅停机。这样在 JVM 过程敞开前,会先执行 shutdown 操作。

Runtime.getRuntime().addShutdownHook(new ShutdownHookThread(log, new Callable<Void>() {
    @Override
    public Void call() throws Exception {controller.shutdown();
        return null;
    }
}));

执行 start 函数,启动 NameServer。代码比较简单,就是将第一步中创立的 netty server 进行启动。其中 remotingServer.start() 办法不开展具体阐明了,须要对 netty 比拟相熟,不是本篇文章重点,有趣味的同学能够自行下载源码浏览。

public void start() throws Exception {
    // 启动 netty 服务
    this.remotingServer.start();
  // 如果开启了 TLS
    if (this.fileWatchService != null) {this.fileWatchService.start();
    }
}

四、路由治理

咱们在第二章开篇有理解到 NameServer 作为一个轻量级的注册核心,次要是为音讯生产者和消费者提供 Topic 的路由信息,并对这些路由信息和 Broker 节点进行治理,次要包含路由注册、路由剔除和路由发现。

本章将会通过源码的角度来具体分析 NameServer 是如果进行路由信息管理的。外围代码次要都在 org.apache.rocketmq.namesrv.routeinfo.RouteInfoManager 中实现。

4.1 路由元信息

在理解路由信息管理之前,咱们首先须要理解 NameServer 到底存储了哪些路由元信息,数据结构别离是什么样的。

查看代码咱们能够看到次要通过 5 个属性来保护路由元信息,如下:

private final HashMap<String/* topic */, List<QueueData>> topicQueueTable;
private final HashMap<String/* brokerName */, BrokerData> brokerAddrTable;
private final HashMap<String/* clusterName */, Set<String/* brokerName */>> clusterAddrTable;
private final HashMap<String/* brokerAddr */, BrokerLiveInfo> brokerLiveTable;
private final HashMap<String/* brokerAddr */, List<String>/* Filter Server */> filterServerTable;

咱们顺次对这 5 个属性进行开展阐明。

4.1.1 TopicQueueTable

阐明:Topic 音讯队列路由信息,音讯发送时依据路由表进行负载平衡。

数据结构:HashMap 构造,key 是 Topic 名字,value 是一个类型是 QueueData 的队列汇合。在第一章就讲过,一个 Topic 中有多个队列。QueueData 的数据结构如下:

数据结构示例:

topicQueueTable:{
    "topic1": [
        {
            "brokerName": "broker-a",
            "readQueueNums":4,
            "writeQueueNums":4,
            "perm":6,
            "topicSynFlag":0,
        },
        {
            "brokerName": "broker-b",
            "readQueueNums":4,
            "writeQueueNums":4,
            "perm":6,
            "topicSynFlag":0,
        }
    ]
}

4.1.2 BrokerAddrTable

阐明:Broker 根底信息,蕴含 BrokerName、所属集群名称、主备 Broker 地址。

数据结构 :HashMap 构造,key 是 BrokerName,value 是一个类型是 BrokerData 的对象。BrokerData 的数据结构如下 (能够联合上面 Broker 主从构造逻辑图来了解):

Broker 主从构造逻辑图:

数据结构示例:

brokerAddrTable:{
    "broker-a": {
        "cluster": "c1",
        "brokerName": "broker-a",
        "brokerAddrs": {
            0: "192.168.1.1:10000",
            1: "192.168.1.2:10000"
        }
    },
    "broker-b": {
        "cluster": "c1",
        "brokerName": "broker-b",
        "brokerAddrs": {
            0: "192.168.1.3:10000",
            1: "192.168.1.4:10000"
        }
    }
}

4.1.3 ClusterAddrTable

阐明:Broker 集群信息,存储集群中所有 Broker 名称。

数据结构:HashMap 构造,key 是 ClusterName,value 是存储 BrokerName 的 Set 构造。

数据结构示例:

clusterAddrTable:{"c1": ["broker-a","broker-b"]
}

4.1.4 BrokerLiveTable

阐明:Broker 状态信息。NameServer 每次收到心跳包时会替换该信息

数据结构 :HashMap 构造,key 是 Broker 的地址,value 是 BrokerLiveInfo 构造的该 Broker 信息对象。BrokerLiveInfo 的数据结构如下:

数据结构示例:

brokerLiveTable:{
    "192.168.1.1:10000": {
            "lastUpdateTimestamp": 1518270318980,
            "dataVersion":versionObj1,
            "channel":channelObj,
            "haServerAddr":""},"192.168.1.2:10000": {"lastUpdateTimestamp": 1518270318980,"dataVersion":versionObj1,"channel":channelObj,"haServerAddr":"192.168.1.1:10000"},"192.168.1.3:10000": {"lastUpdateTimestamp": 1518270318980,"dataVersion":versionObj1,"channel":channelObj,"haServerAddr":""},
    "192.168.1.4:10000": {
            "lastUpdateTimestamp": 1518270318980,
            "dataVersion":versionObj1,
            "channel":channelObj,
            "haServerAddr":"192.168.1.3:10000"
     }
}

4.1.5 filterServerTable

阐明:Broker 上的 FilterServer 列表,音讯过滤服务器列表,后续介绍 Consumer 时会介绍,consumer 拉取数据是通过 filterServer 拉取,consumer 向 Broker 注册。

数据结构 :HashMap 构造,key 是 Broker 地址,value 是记录了 filterServer 地址的 List 汇合。

4.2 路由注册

路由注册是通过 Broker 和 NameServer 之间的心跳性能来实现的。次要分为两步:

Step1:

Broker 启动时向集群中所有 NameServer 发送心跳语句,每隔 30 秒(默认 30s,工夫距离在 10 秒到 60 秒之间)再发一次。

Step2:

NameServer 收到心跳包更新 topicQueueTable,brokerAddrTable,brokerLiveTable,clusterAddrTable,filterServerTable。

咱们别离开展剖析这两步。

4.2.1 Broker 发送心跳包

发送心跳包的外围逻辑是在 Broker 启动逻辑里, 代码入口是 org.apache.rocketmq.broker.BrokerController#start,本篇文章重点关注的是发送心跳包的逻辑实现,只列出发送心跳包的外围代码,如下:

1)创立了一个线程池注册 Broker,程序启动 10 秒后执行,每隔 30 秒(默认 30s,工夫距离在 10 秒到 60 秒之间,BrokerConfig.getRegisterNameServerPeriod() 的默认值是 30 秒)执行一次。

this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
​
    @Override
    public void run() {
        try {BrokerController.this.registerBrokerAll(true, false, brokerConfig.isForceRegister());
        } catch (Throwable e) {log.error("registerBrokerAll Exception", e);
        }
    }
}, 1000 * 10, Math.max(10000, Math.min(brokerConfig.getRegisterNameServerPeriod(), 60000)), TimeUnit.MILLISECONDS);

2)封装 Topic 配置和版本号之后,进行理论的路由注册(注:封装 Topic 配置不是本篇重点,会在介绍 Broker 源码时重点解说)。理论路由注册是在 org.apache.rocketmq.broker.out.BrokerOuterAPI#registerBrokerAll 中实现,外围代码如下:

public List<RegisterBrokerResult> registerBrokerAll(
    final String clusterName,
    final String brokerAddr,
    final String brokerName,
    final long brokerId,
    final String haServerAddr,
    final TopicConfigSerializeWrapper topicConfigWrapper,
    final List<String> filterServerList,
    final boolean oneway,
    final int timeoutMills,
    final boolean compressed) {
​
    final List<RegisterBrokerResult> registerBrokerResultList = new CopyOnWriteArrayList<>();
    // 获取 nameserver 地址列表
    List<String> nameServerAddressList = this.remotingClient.getNameServerAddressList();
    if (nameServerAddressList != null && nameServerAddressList.size() > 0) {
    /**
      * 封装申请包头 start
      * 封装申请包头,次要封装 broker 相干信息
    **/
        final RegisterBrokerRequestHeader requestHeader = new RegisterBrokerRequestHeader();
        requestHeader.setBrokerAddr(brokerAddr);
        requestHeader.setBrokerId(brokerId);
        requestHeader.setBrokerName(brokerName);
        requestHeader.setClusterName(clusterName);
        requestHeader.setHaServerAddr(haServerAddr);
        requestHeader.setCompressed(compressed);
    // 封装 requestBody,包含 topic 和 filterServerList 相干信息
        RegisterBrokerBody requestBody = new RegisterBrokerBody();
        requestBody.setTopicConfigSerializeWrapper(topicConfigWrapper);
        requestBody.setFilterServerList(filterServerList);
        final byte[] body = requestBody.encode(compressed);
        final int bodyCrc32 = UtilAll.crc32(body);
        requestHeader.setBodyCrc32(bodyCrc32);
        /**
      * 封装申请包头 end
    **/
        // 开启多线程到每个 nameserver 进行注册
        final CountDownLatch countDownLatch = new CountDownLatch(nameServerAddressList.size());
        for (final String namesrvAddr : nameServerAddressList) {brokerOuterExecutor.execute(new Runnable() {
                @Override
                public void run() {
                    try {
                        // 理论进行注册办法
                        RegisterBrokerResult result = registerBroker(namesrvAddr,oneway, timeoutMills,requestHeader,body);
                        if (result != null) {
                            // 封装 nameserver 返回的信息
                            registerBrokerResultList.add(result);
                        }
​
                        log.info("register broker[{}]to name server {} OK", brokerId, namesrvAddr);
                    } catch (Exception e) {log.warn("registerBroker Exception, {}", namesrvAddr, e);
                    } finally {countDownLatch.countDown();
                    }
                }
            });
        }
​
        try {countDownLatch.await(timeoutMills, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {}}
​
    return registerBrokerResultList;
}

从下面代码来看,也比较简单,首先须要封装申请包头和 requestBody,而后开启多线程到每个 NameServer 服务器去注册。

申请包头类型为 RegisterBrokerRequestHeader,次要包含如下字段:

requestBody 类型是 RegisterBrokerBody,次要包含如下字段:

1)理论的路由注册是通过 registerBroker 办法实现,外围代码如下:

private RegisterBrokerResult registerBroker(
    final String namesrvAddr,
    final boolean oneway,
    final int timeoutMills,
    final RegisterBrokerRequestHeader requestHeader,
    final byte[] body) throws RemotingCommandException, MQBrokerException, RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException,
InterruptedException {
    // 创立申请指令,须要留神 RequestCode.REGISTER_BROKER,nameserver 端的网络处理器会依据 requestCode 进行相应的业务解决
    RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.REGISTER_BROKER, requestHeader);
    request.setBody(body);
  // 基于 netty 进行网络传输
    if (oneway) {
        // 如果是单向调用,没有返回值,不返回 nameserver 返回后果
        try {this.remotingClient.invokeOneway(namesrvAddr, request, timeoutMills);
        } catch (RemotingTooMuchRequestException e) {// Ignore}
        return null;
    }
  // 异步调用向 nameserver 发动注册,获取 nameserver 的返回信息
    RemotingCommand response = this.remotingClient.invokeSync(namesrvAddr, request, timeoutMills);
    assert response != null;
    switch (response.getCode()) {
        case ResponseCode.SUCCESS: {
            // 获取返回的 reponseHeader
            RegisterBrokerResponseHeader responseHeader =
                (RegisterBrokerResponseHeader) response.decodeCommandCustomHeader(RegisterBrokerResponseHeader.class);
            // 从新封装返回后果,更新 masterAddr 和 haServerAddr
            RegisterBrokerResult result = new RegisterBrokerResult();
            result.setMasterAddr(responseHeader.getMasterAddr());
            result.setHaServerAddr(responseHeader.getHaServerAddr());
            if (response.getBody() != null) {result.setKvTable(KVTable.decode(response.getBody(), KVTable.class));
            }
            return result;
        }
        default:
            break;
    }
​
    throw new MQBrokerException(response.getCode(), response.getRemark(), requestHeader == null ? null : requestHeader.getBrokerAddr());
}

borker 和 NameServer 之间通过 netty 进行网络传输,Broker 向 NameServer 发动注册时会在申请中增加注册码 RequestCode.REGISTER_BROKER。这是一种网络跟踪办法,RocketMQ 的每个申请都会定义一个 requestCode,服务端的网络处理器会依据不同的 requestCode 进行影响的业务解决。

4.2.2 NameServer 解决心跳包

Broker 收回路由注册的心跳包之后,NameServer 会依据心跳包中的 requestCode 进行解决。NameServer 的默认网络处理器是 DefaultRequestProcessor, 具体代码如下:

public RemotingCommand processRequest(ChannelHandlerContext ctx,
        RemotingCommand request) throws RemotingCommandException {if (ctx != null) {log.debug("receive request, {} {} {}",
                  request.getCode(),
                  RemotingHelper.parseChannelRemoteAddr(ctx.channel()),
                  request);
    }
    switch (request.getCode()) {
        ......
        //,如果是 RequestCode.REGISTER_BROKER,进行 broker 注册
        case RequestCode.REGISTER_BROKER:
            Version brokerVersion = MQVersion.value2Version(request.getVersion());
            if (brokerVersion.ordinal() >= MQVersion.Version.V3_0_11.ordinal()) {return this.registerBrokerWithFilterServer(ctx, request);
            } else {return this.registerBroker(ctx, request);
            }
        ......
        default:
            break;
    }
    return null;
}

判断 requestCode,如果是 RequestCode.REGISTER\_BROKER,那么确定业务解决逻辑是注册 Broker。依据 Broker 版本号抉择不同的办法,咱们已 V3\_0_11 以上为例,调用 registerBrokerWithFilterServer 办法进行注册次要步骤分为三步:

Step1

解析 requestHeader 并验签(基于 crc32),判断数据是否正确;

Step2

解析 Topic 信息;

Step3

调用 RouteInfoManager#registerBroker 来进行 Broker 注册;

外围注册逻辑是由 RouteInfoManager#registerBroker 来实现,外围代码如下:

public RegisterBrokerResult registerBroker(
    final String clusterName,
    final String brokerAddr,
    final String brokerName,
    final long brokerId,
    final String haServerAddr,
    final TopicConfigSerializeWrapper topicConfigWrapper,
    final List<String> filterServerList,
    final Channel channel) {RegisterBrokerResult result = new RegisterBrokerResult();
    try {
        try {
            // 加写锁,避免并发写 RoutInfoManager 中的路由表信息。this.lock.writeLock().lockInterruptibly();
      // 依据 clusterName 从 clusterAddrTable 中获取所有 broker 名字汇合
            Set<String> brokerNames = this.clusterAddrTable.get(clusterName);
            // 如果没有获取到,阐明 broker 所属集群还没记录,那么须要创立,并将 brokerName 退出到集群的 broker 汇合中
            if (null == brokerNames) {brokerNames = new HashSet<String>();
                this.clusterAddrTable.put(clusterName, brokerNames);
            }
            brokerNames.add(brokerName);
      
            boolean registerFirst = false;
      // 依据 brokerName 尝试从 brokerAddrTable 中获取 brokerData
            BrokerData brokerData = this.brokerAddrTable.get(brokerName);
            if (null == brokerData) {
                // 如果没获取到 brokerData,新建 BrokerData 并放入 brokerAddrTable,registerFirst 设为 true;registerFirst = true;
                brokerData = new BrokerData(clusterName, brokerName, new HashMap<Long, String>());
                this.brokerAddrTable.put(brokerName, brokerData);
            }
            // 更新 brokerData 中的 brokerAddrs
            Map<Long, String> brokerAddrsMap = brokerData.getBrokerAddrs();
            // 思考到可能呈现 master 挂了,slave 变成 master 的状况,这时候 brokerId 会变成 0,这时候须要把老的 brokerAddr 给删除
            //Switch slave to master: first remove <1, IP:PORT> in namesrv, then add <0, IP:PORT>
            //The same IP:PORT must only have one record in brokerAddrTable
            Iterator<Entry<Long, String>> it = brokerAddrsMap.entrySet().iterator();
            while (it.hasNext()) {Entry<Long, String> item = it.next();
                if (null != brokerAddr && brokerAddr.equals(item.getValue()) && brokerId != item.getKey()) {it.remove();
                }
            }
      // 更新 brokerAddrs,依据返回的 oldAddr 判断是否是第一次注册的 broker
            String oldAddr = brokerData.getBrokerAddrs().put(brokerId, brokerAddr);
            registerFirst = registerFirst || (null == oldAddr);
​
            // 如过 Broker 是 Master,并且 Broker 的 Topic 配置信息发生变化或者是首次注册,须要创立或更新 Topic 路由元数据,填充 topicQueueTable
            if (null != topicConfigWrapper
                && MixAll.MASTER_ID == brokerId) {if (this.isBrokerTopicConfigChanged(brokerAddr, topicConfigWrapper.getDataVersion())
                    || registerFirst) {
                    ConcurrentMap<String, TopicConfig> tcTable =
                        topicConfigWrapper.getTopicConfigTable();
                    if (tcTable != null) {for (Map.Entry<String, TopicConfig> entry : tcTable.entrySet()) {
                            // 创立或更新 Topic 路由元数据
                            this.createAndUpdateQueueData(brokerName, entry.getValue());
                        }
                    }
                }
            }
      // 更新 BrokerLivelnfo,BrokeLivelnfo 是执行路由删除的重要依据
            BrokerLiveInfo prevBrokerLiveInfo = this.brokerLiveTable.put(brokerAddr,
                                                                         new BrokerLiveInfo(System.currentTimeMillis(),
                                                                             topicConfigWrapper.getDataVersion(),
                                                                             channel,
                                                                             haServerAddr));
            if (null == prevBrokerLiveInfo) {log.info("new broker registered, {} HAServer: {}", brokerAddr, haServerAddr);
            }
      // 注册 Broker 的 filterServer 地址列表
            if (filterServerList != null) {if (filterServerList.isEmpty()) {this.filterServerTable.remove(brokerAddr);
                } else {this.filterServerTable.put(brokerAddr, filterServerList);
                }
            }
      // 如果此 Broker 为从节点,则须要查找 Broker Master 的节点信息,并更新对应 masterAddr 属性
            if (MixAll.MASTER_ID != brokerId) {String masterAddr = brokerData.getBrokerAddrs().get(MixAll.MASTER_ID);
                if (masterAddr != null) {BrokerLiveInfo brokerLiveInfo = this.brokerLiveTable.get(masterAddr);
                    if (brokerLiveInfo != null) {result.setHaServerAddr(brokerLiveInfo.getHaServerAddr());
                        result.setMasterAddr(masterAddr);
                    }
                }
            }
        } finally {this.lock.writeLock().unlock();}
    } catch (Exception e) {log.error("registerBroker Exception", e);
    }
​
    return result;
}

通过下面的源码剖析,能够合成出一个 Broker 的注册次要分 7 步:

  • Step1:加写锁,避免并发写 RoutInfoManager 中的路由表信息;
  • Step2:判断 Broker 所属集群是否存在,不存在须要创立,并将 Broker 名退出到集群 Broker 汇合中;
  • Step3:保护 BrokerData;
  • Step4:如过 Broker 是 Master,并且 Broker 的 Topic 配置信息发生变化或者是首次注册,须要创立或更新 Topic 路由元数据,填充 TopicQueueTable;
  • Step5:更新 BrokerLivelnfo;
  • Step6:注册 Broker 的 filterServer 地址列表;
  • Step7:如果此 Broker 为从节点,则须要查找 Broker Master 的节点信息,并更新对应 masterAddr 属性,并返回给 Broker 端。

4.3 路由剔除

4.3.1 触发条件

路由剔除的触发条件次要有两个:

NameServer 每隔 10s 扫描 BrokerLiveTable,间断 120s 没收到心跳包,则移除该 Broker 并敞开 socket 连贯;

Broker 失常敞开时触发路由删除。

4.3.2 源码解析

下面形容的触发点最终删除路由的逻辑是一样的,对立在 RouteInfoManager#onChannelDestroy

中实现,外围代码如下:

public void onChannelDestroy(String remoteAddr, Channel channel) {
    String brokerAddrFound = null;
    if (channel != null) {
        try {
            try {
                // 加读锁
                this.lock.readLock().lockInterruptibly();
                // 通过 channel 从 brokerLiveTable 中找出对应的 Broker 地址
                Iterator<Entry<String, BrokerLiveInfo>> itBrokerLiveTable =
                    this.brokerLiveTable.entrySet().iterator();
                while (itBrokerLiveTable.hasNext()) {Entry<String, BrokerLiveInfo> entry = itBrokerLiveTable.next();
                    if (entry.getValue().getChannel() == channel) {brokerAddrFound = entry.getKey();
                        break;
                    }
                }
            } finally {
                // 开释读锁
                this.lock.readLock().unlock();
            }
        } catch (Exception e) {log.error("onChannelDestroy Exception", e);
        }
    }
  // 若该 Broker 曾经从存活的 Broker 地址列表中被革除,则间接应用 remoteAddr
    if (null == brokerAddrFound) {brokerAddrFound = remoteAddr;} else {log.info("the broker's channel destroyed, {}, clean it's data structure at once", brokerAddrFound);
    }
​
    if (brokerAddrFound != null && brokerAddrFound.length() > 0) {
​
        try {
            try {
                // 申请写锁
                this.lock.writeLock().lockInterruptibly();
                // 依据 brokerAddress,将这个 brokerAddress 从 brokerLiveTable 和 filterServerTable 中移除
                this.brokerLiveTable.remove(brokerAddrFound);
                this.filterServerTable.remove(brokerAddrFound);
                String brokerNameFound = null;
                boolean removeBrokerName = false;
                Iterator<Entry<String, BrokerData>> itBrokerAddrTable =
                    this.brokerAddrTable.entrySet().iterator();
                // 遍历 brokerAddrTable
                while (itBrokerAddrTable.hasNext() && (null == brokerNameFound)) {BrokerData brokerData = itBrokerAddrTable.next().getValue();
​
                    Iterator<Entry<Long, String>> it = brokerData.getBrokerAddrs().entrySet().iterator();
                    while (it.hasNext()) {Entry<Long, String> entry = it.next();
                        Long brokerId = entry.getKey();
                        String brokerAddr = entry.getValue();
                        // 依据 brokerAddress 找到对应的 brokerData,并将 brokerData 中对应的 brokerAddress 移除
                        if (brokerAddr.equals(brokerAddrFound)) {brokerNameFound = brokerData.getBrokerName();
                            it.remove();
                            log.info("remove brokerAddr[{}, {}] from brokerAddrTable, because channel destroyed",
                                     brokerId, brokerAddr);
                            break;
                        }
                    }
          // 如果移除后,整个 brokerData 的 brokerAddress 空了,那么将整个 brokerData 移除
                    if (brokerData.getBrokerAddrs().isEmpty()) {
                        removeBrokerName = true;
                        itBrokerAddrTable.remove();
                        log.info("remove brokerName[{}] from brokerAddrTable, because channel destroyed",
                                 brokerData.getBrokerName());
                    }
                }
​
                if (brokerNameFound != null && removeBrokerName) {
                    // 遍历 clusterAddrTable
                    Iterator<Entry<String, Set<String>>> it = this.clusterAddrTable.entrySet().iterator();
                    while (it.hasNext()) {Entry<String, Set<String>> entry = it.next();
                        String clusterName = entry.getKey();
                        Set<String> brokerNames = entry.getValue();
                        // 依据第三步中获取的须要移除的 brokerName,将对应的 brokerName 移除了
                        boolean removed = brokerNames.remove(brokerNameFound);
                        if (removed) {log.info("remove brokerName[{}], clusterName[{}] from clusterAddrTable, because channel destroyed",
                                     brokerNameFound, clusterName);
              // 如果移除后,该汇合为空,那么将整个集群从 clusterAddrTable 中移除
                            if (brokerNames.isEmpty()) {log.info("remove the clusterName[{}] from clusterAddrTable, because channel destroyed and no broker in this cluster",
                                         clusterName);
                                it.remove();}
​
                            break;
                        }
                    }
                }
​
                if (removeBrokerName) {
                    Iterator<Entry<String, List<QueueData>>> itTopicQueueTable =
                        this.topicQueueTable.entrySet().iterator();
                    // 遍历 topicQueueTable
                    while (itTopicQueueTable.hasNext()) {Entry<String, List<QueueData>> entry = itTopicQueueTable.next();
                        String topic = entry.getKey();
                        List<QueueData> queueDataList = entry.getValue();
​
                        Iterator<QueueData> itQueueData = queueDataList.iterator();
                        while (itQueueData.hasNext()) {QueueData queueData = itQueueData.next();
                            // 依据 brokerName,将 topic 下对应的 broker 移除掉
                            if (queueData.getBrokerName().equals(brokerNameFound)) {itQueueData.remove();
                                log.info("remove topic[{} {}], from topicQueueTable, because channel destroyed",
                                         topic, queueData);
                            }
                        }
            // 如果该 topic 下只有一个待移除的 broker,那么该 topic 也从 table 中移除
                        if (queueDataList.isEmpty()) {itTopicQueueTable.remove();
                            log.info("remove topic[{}] all queue, from topicQueueTable, because channel destroyed",
                                     topic);
                        }
                    }
                }
            } finally {
                // 开释写锁
                this.lock.writeLock().unlock();
            }
        } catch (Exception e) {log.error("onChannelDestroy Exception", e);
        }
    }
}

路由删除整体逻辑次要分为 6 步:

  • Step1:加 readlock,通过 channel 从 BrokerLiveTable 中找出对应的 Broker 地址,开释 readlock,若该 Broker 曾经从存活的 Broker 地址列表中被革除,则间接应用 remoteAddr。
  • Step2:申请写锁,依据 BrokerAddress 从 BrokerLiveTable、filterServerTable 移除。
  • Step3:遍历 BrokerAddrTable,依据 BrokerAddress 找到对应的 brokerData,并将 brokerData 中对应的 brokerAddress 移除,如果移除后,整个 brokerData 的 brokerAddress 空了,那么将整个 brokerData 移除。
  • Step4:遍历 clusterAddrTable,依据第三步中获取的须要移除的 BrokerName,将对应的 brokerName 移除了。如果移除后,该汇合为空,那么将整个集群从 clusterAddrTable 中移除。
  • Step5:遍历 TopicQueueTable,依据 BrokerName,将 Topic 下对应的 Broker 移除掉,如果该 Topic 下只有一个待移除的 Broker,那么该 Topic 也从 table 中移除。
  • Step6:开释写锁。

从下面能够看出,路由剔除的整体逻辑比较简单,就是单纯地针对路由元信息的数据结构进行操作。为了大家可能更好地了解这块代码,倡议大家对照 4.1 中介绍的路由元信息的数据结构来进行代码走读。

4.4 路由发现

当路由信息发生变化之后,NameServer 不会被动推送给客户端,而是期待客户端定期到 nameserver 被动拉取最新路由信息。这种设计形式升高了 NameServer 实现的复杂性。

4.4.1 producer 被动拉取

producer 在启动后会开启一系列定时工作,其中有一个工作就是定期从 NameServer 获取 Topic 路由信息。代码入口是 MQClientInstance#start-ScheduledTask(),外围代码如下:

private void startScheduledTask() {
    ......
    this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
​
        @Override
        public void run() {
            try {
                // 从 nameserver 更新最新的 topic 路由信息
                MQClientInstance.this.updateTopicRouteInfoFromNameServer();} catch (Exception e) {log.error("ScheduledTask updateTopicRouteInfoFromNameServer exception", e);
            }
        }
    }, 10, this.clientConfig.getPollNameServerInterval(), TimeUnit.MILLISECONDS);
​
    ......
}
​
/**
    * 从 nameserver 获取 topic 路由信息
    */
public TopicRouteData getTopicRouteInfoFromNameServer(final String topic, final long timeoutMillis,
                                                      boolean allowTopicNotExist) throws MQClientException, InterruptedException, RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException {
    ......
    // 向 nameserver 发送申请包,requestCode 为 RequestCode.GET_ROUTEINFO_BY_TOPIC
    RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.GET_ROUTEINFO_BY_TOPIC, requestHeader);
  ......
}

producer 和 NameServer 之间通过 netty 进行网络传输,producer 向 NameServer 发动的申请中增加注册码

RequestCode.GET\_ROUTEINFO\_BY_TOPIC。

4.4.2 NameServer 返回路由信息

NameServer 收到 producer 发送的申请后,会依据申请中的 requestCode 进行解决。解决 requestCode 同样是在默认的网络处理器 DefaultRequestProcessor 中进行解决,最终通过 RouteInfoManager#pickupTopicRouteData 来实现。

TopicRouteData 构造

在正式解析源码前,咱们先看下 NameServer 返回给 producer 的数据结构。通过代码能够看到,返回的是一个 TopicRouteData 对象,具体构造如下:

其中 QueueData,BrokerData,filterServerTable 在 4.1 章节介绍路由元信息时有介绍。

源码剖析

在理解了返回给 producer 的 TopicRouteData 构造后,咱们进入 RouteInfoManager#pickupTopicRouteData 办法来看下具体如何实现。

public TopicRouteData pickupTopicRouteData(final String topic) {TopicRouteData topicRouteData = new TopicRouteData();
    boolean foundQueueData = false;
    boolean foundBrokerData = false;
    Set<String> brokerNameSet = new HashSet<String>();
    List<BrokerData> brokerDataList = new LinkedList<BrokerData>();
    topicRouteData.setBrokerDatas(brokerDataList);
​
    HashMap<String, List<String>> filterServerMap = new HashMap<String, List<String>>();
    topicRouteData.setFilterServerTable(filterServerMap);
​
    try {
        try {
            // 加读锁
            this.lock.readLock().lockInterruptibly();
            // 从元数据 topicQueueTable 中依据 topic 名字获取队列汇合
            List<QueueData> queueDataList = this.topicQueueTable.get(topic);
            if (queueDataList != null) {
                // 将获取到的队列汇合写入 topicRouteData 的 queueDatas 中
                topicRouteData.setQueueDatas(queueDataList);
                foundQueueData = true;
​
                Iterator<QueueData> it = queueDataList.iterator();
                while (it.hasNext()) {QueueData qd = it.next();
                    brokerNameSet.add(qd.getBrokerName());
                }
        // 遍历从 QueueData 汇合中提取的 brokerName
                for (String brokerName : brokerNameSet) {
                    // 依据 brokerName 从 brokerAddrTable 获取 brokerData
                    BrokerData brokerData = this.brokerAddrTable.get(brokerName);
                    if (null != brokerData) {
                        // 克隆 brokerData 对象,并写入到 topicRouteData 的 brokerDatas 中
                        BrokerData brokerDataClone = new BrokerData(brokerData.getCluster(), brokerData.getBrokerName(), (HashMap<Long, String>) brokerData.getBrokerAddrs().clone());
                        brokerDataList.add(brokerDataClone);
                        foundBrokerData = true;
                        // 遍历 brokerAddrs
                        for (final String brokerAddr : brokerDataClone.getBrokerAddrs().values()) {
                            // 依据 brokerAddr 获取 filterServerList,封装后写入到 topicRouteData 的 filterServerTable 中
                            List<String> filterServerList = this.filterServerTable.get(brokerAddr);
                            filterServerMap.put(brokerAddr, filterServerList);
                        }
                    }
                }
            }
        } finally {
            // 开释读锁
            this.lock.readLock().unlock();
        }
    } catch (Exception e) {log.error("pickupTopicRouteData Exception", e);
    }
​
    log.debug("pickupTopicRouteData {} {}", topic, topicRouteData);
​
    if (foundBrokerData && foundQueueData) {return topicRouteData;}
​
    return null;
}

下面代码封装了 TopicRouteData 的 queueDatas、BrokerDatas 和 filterServerTable,还有 orderTopicConf 字段没封装,咱们再看下这个字段是在什么时候封装的,咱们向上看 RouteInfoManager#pickupTopicRouteData 的调用办法 DefaultRequestProcessor#getRouteInfoByTopic 如下:

public RemotingCommand getRouteInfoByTopic(ChannelHandlerContext ctx,
                                           RemotingCommand request) throws RemotingCommandException {
    ......
  // 这块代码就是下面解析的代码,获取到 topicRouteData 对象
    TopicRouteData topicRouteData = this.namesrvController.getRouteInfoManager().pickupTopicRouteData(requestHeader.getTopic());
​
    if (topicRouteData != null) {
        // 判断 nameserver 的 orderMessageEnable 配置是否关上
        if (this.namesrvController.getNamesrvConfig().isOrderMessageEnable()) {
            // 如果配置关上了,依据 namespace 和 topic 名字获取 kvConfig 配置文件中程序音讯配置内容
            String orderTopicConf =
                this.namesrvController.getKvConfigManager().getKVConfig(NamesrvUtil.NAMESPACE_ORDER_TOPIC_CONFIG,
                                                                        requestHeader.getTopic());
            // 封装 orderTopicConf
            topicRouteData.setOrderTopicConf(orderTopicConf);
        }
​
        byte[] content = topicRouteData.encode();
        response.setBody(content);
        response.setCode(ResponseCode.SUCCESS);
        response.setRemark(null);
        return response;
    }
  // 如果没有获取到 topic 路由,那么 reponseCode 为 TOPIC_NOT_EXIST
    response.setCode(ResponseCode.TOPIC_NOT_EXIST);
    response.setRemark("No topic route info in name server for the topic:" + requestHeader.getTopic()
                       + FAQUrl.suggestTodo(FAQUrl.APPLY_TOPIC_URL));
    return response;
}

联合这两个办法,咱们能够总结出查找 Topic 路由次要分为 3 个步骤:

调用 RouteInfoManager#pickupTopicRouteData,从 topicQueueTable,brokerAddrTabl,filterServerTable 中获取信息,别离填充 queue-Datas、BrokerDatas、filterServerTable。

如果 topic 为程序音讯,那么从 KVconfig 中获取对于程序音讯先关的配置填充到 orderTopicConf 中。

如果找不到路由信息,那么返回 code 为 ResponseCode.TOPIC\_NOT\_EXIST。

五、小结

本篇文章次要是从源码的角度给大家介绍了 RocketMQ 的 NameServer,包含 NameServer 的启动流程、路由注册、路由剔除和路由发现。咱们在理解了 NameServer 的设计原理之后,也能够回过头思考下在设计过程中一些值得咱们学习的小技巧,在此我抛砖引玉提出两点:

  • 启动流程注册 JVM 钩子用于优雅停机。这是一个编程技巧,咱们在理论开发过程中,如果有应用线程池或者一些常驻线程工作时,能够思考通过注册 JVM 钩子的形式,在 JVM 敞开前开释资源或者实现一些事件来保障优雅停机。
  • 更新路由表时须要通过加锁避免并发操作,这里应用的是锁粒度较少的读写锁,容许多个音讯发送者并发读,保障音讯发送时的高并发,但同一时刻 NameServer 只解决一个 Broker 心跳包,多个心跳包申请串行执行,这也是读写锁经典应用场景。

六、参考资料

1、《RocketMQ 技术底细》

2、《RocketMQ 外围原理和实际》

3、Apache RocketMQ 开发者指南

作者:vivo 互联网服务器团队 -Ye Wenhao

退出移动版