关于java:RocketMQ-NameServer-详解-源码剖析

4次阅读

共计 25712 个字符,预计需要花费 65 分钟才能阅读完成。

[TOC]

1. 概述

1.1 NameServer 是什么

NameServer 是组成 RocketMQ 的重要组件之一,是除了 Broker 之外另一个须要部署的服务。构想这样一个问题:RocketMQ 的 Topic 散布在不同的 Broker 上,作为音讯的生产者和消费者,如何晓得要从哪个 Broker 地址生产或生产音讯?如果连贯的 Broker 宕机了,如何在不重启的状况下感知?NameServer 就是为了解决这些问题设计的。

NameServer 是一个简略的 Topic 路由注册核心,相似 Kafka、Dubbo 中的 Zookeeper,反对 Broker 的动静注册与发现。次要蕴含两个性能

  1. Broker 治理:NameServer 承受 Broker 集群的注册信息并且保留下来作为路由信息的根本数据。而后提供心跳检测机制,查看 Broker 是否还存活。
  2. 路由信息管理:每个 NameServer 将保留对于 Broker 集群的整个路由信息和用于客户端查问的队列信息。而后 Producer 和 Conumser 通过 NameServer 就能够晓得整个 Broker 集群的路由信息,从而进行音讯的投递和生产。

NameServer 通常以集群的形式部署,各实例间互相不进行信息通信,只是互为备份,达到高可用的成果。RocketMQ 典型的双主双从部署形式如下图所示:

Broker 定期向 NameServer 发送心跳,上报路由信息。客户端(生产者、消费者)定期申请 NameServer 获取最新的路由信息。

1.2 NameServer 与 Zookeeper

Kafka 在老版本中应用 Zookeeper 作为路由核心,在 3.0 之后的版本也将 Zookeeper 的依赖移除。在晚期版本的 RocketMQ 中据说也是应用 Zookeeper 作为路由核心,为什么支流音讯队列都摈弃了 Zookeeper 抉择自研路由核心呢?

次要起因是 Zookeeper 运行机制简单、对于 RocketMQ 来说依赖太重,保护和定位问题较艰难;而 NameServer 的实现十分轻量级,且具备很高的牢靠水平,用于路由发现的场景十分适合。此外还有以下一些起因:

  1. 依据 CAP 实践,同时最多只能满足两个点,而 zookeeper 满足的是 CP,也就是说 zookeeper 并不能保障服务的可用性,zookeeper 在进行选举的时候,整个选举的工夫太长,期间整个集群都处于不可用的状态,而这对于一个注册核心来说必定是不能承受的,作为服务发现来说就应该是为可用性而设计。
  2. 基于性能的思考,NameServer 自身的实现十分轻量,而且能够通过减少机器的形式程度扩大,减少集群的抗压能力,而 zookeeper 的写是不可扩大的,而 zookeeper 要解决这个问题只能通过划分畛域,划分多个 zookeeper 集群来解决,首先操作起来太简单,其次这样还是又违反了 CAP 中的 A 的设计,导致服务之间是不连通的。
  3. 长久化的机制来带的问题,ZooKeeper 的 ZAB 协定对每一个写申请,会在每个 ZooKeeper 节点上放弃写一个事务日志,同时再加上定期的将内存数据镜像(Snapshot)到磁盘来保证数据的一致性和持久性,而对于一个简略的服务发现的场景来说,这其实没有太大的必要,这个实现计划太重了。而且自身存储的数据应该是高度定制化的。
  4. 音讯发送应该弱依赖注册核心,而 RocketMQ 的设计理念也正是基于此,生产者在第一次发送音讯的时候从 NameServer 获取到 Broker 地址后缓存到本地,如果 NameServer 整个集群不可用,短时间内对于生产者和消费者并不会产生太大影响。

2. 概要设计

NameServer 仅仅解决其余模块的申请,而不会被动向其余模块发动申请。正如其名字 Server,它其实实质上就是一个 NettyServer。

2.1 模块

NameServer 的代码并不多,如下所示

它次要有 3 个模块:Topic 路由治理模块(RouteInfoManager)、通信模块(DefaultRequestProcessorClusterTestRequestProcessor)、KV 数据存储模块(KVConfigManager)。

RouteInfoManager 中存储 5 个 HashMap,这就是 NameServer 中次要存储的数据。它们仅存在于内存中,并不会长久化。其中数据内容如下:

  • topicQueueTable:保留 Topic 的队列信息,也是真正的路由信息。队列信息中蕴含了其所在的 Broker 名称和读写队列数量。
  • brokerAddrTable:保留 Broker 信息,蕴含其名称、集群名称、主备 Broker 地址。
  • clusterAddrTable:保留 Cluster 信息,蕴含每个集群中所有的 Broker 名称列表。
  • brokerLiveTable:Broker 状态信息,蕴含以后所有存活的 Broker,和它们最初一次上报心跳的工夫。
  • filterServerTable:Broker 上的 FilterServer 列表,用于类模式音讯过滤,该机制在 4.4 版本后被废除。

RequestProcessor 继承了 AsyncNettyRequestProcessor。作为 NameServer 的申请处理器,依据不同品种的申请做不同类型的解决。
其中 KV_CONFIG 类型的申请用于 KVConfig 模块,以后不会用到。其余申请类型由 Broker 和 Producer、Consumer 发动。

KVConfigManager 外部保留了一个二级 HashMapconfigTable,并且会将该对象进行长久化。

2.2 交互

上图为 NameServer 与其余组件交互的示意图。能够看到 Producer、Consumer、Broker 均每 30s 向 NameServer 发动一次申请,NameServer 中也有定时器,定期扫描和更新外部数据。

  • Broker

    • 每隔 30s 向 NameServer 集群的每台机器都发送心跳包,蕴含本身 Topic 队列的路由信息。
    • 当有 Topic 改变(创立 / 更新),Broker 会立刻发送 Topic 增量信息到 NameServer,同时触发 NameServer 的数据版本号产生变更(+1)。
  • NameServer

    • 将路由信息保留在内存中。它只被其余模块调用(被 Broker 上传,被客户端拉取),不会被动调用其余模块。
    • 启动一个定时工作线程,每隔 10s 扫描 brokerAddrTable 中所有的 Broker 上次发送心跳工夫,如果超过 120s 没有收到心跳,则从存活 Broker 表中移除该 Broker。
  • Client

    • 生产者第一次发送音讯时,向 NameServer 拉取该 Topic 的路由信息。
    • 消费者启动过程中会向 NameServer 申请 Topic 路由信息。
    • 每隔 30s 向 NameServer 发送申请,获取它们要生产 / 生产的 Topic 的路由信息。

3. 具体设计

3.1 NameServer 启动

上图为 NameServer 启动流程的示意图。

  1. 由启动脚本调用 NamesrvStartup#main 函数触发启动流程
  2. NamesrvStartup#createNamesrvController 函数中先解析命令行参数,而后初始化 NameServer 和 Netty remote server 配置,最初创立 NamesrvController 的实例。
  3. NamesrvStartup#start 初始化 NamesrvController;调用 NamesrvController#start() 办法,启动 Netty remoting server;最初注册敞开钩子函数,在 JVM 线程敞开之前,敞开 Netty remoting server 和解决线程池,敞开定时工作线程。
  4. NamesrvController 实例是 NameServer 的外围控制器,它的初始化办法 initialize() 先加载 KVConfig manager,而后初始化 Netty remoting server。最初增加 2 个定时工作:一个每 10s 打印一次 KV 配置,一个每 10s 扫描 Broker 列表,移除掉线的 Broker。

3.2 路由信息

3.2.1 NameServer 端保留的路由信息

NameServer 中的路由信息次要指的是后面说到的 RouteInfoManager 中的 5 个 HashMap。它们只会保留在内存中,不会被长久化。上面看一下它们的具体构造。

// Topic 中 Queue 的路由表,音讯发送时依据路由表进行 Topic 内的负载平衡
HashMap<String/* topic */, List<QueueData>> topicQueueTable;
// Broker 根底信息表,蕴含 brokerName、所属集群名称、主备 Broker 地址
HashMap<String/* brokerName */, BrokerData> brokerAddrTable;
// Broker 集群信息,存储集群中所有 Broker 的名称
HashMap<String/* clusterName */, Set<String/* brokerName */>> clusterAddrTable;
// Broker 状态信息,NameServer 每次收到心跳包时会替换该信息
HashMap<String/* brokerAddr */, BrokerLiveInfo> brokerLiveTable;
// Broker 上的 FilterServer 列表,用于类模式的音讯过滤。(在 4.4 之后的版本被废除)HashMap<String/* brokerAddr */, List<String>/* Filter Server */> filterServerTable;

3.2.2 客户端保留的路由信息

客户端中的路由信息保留在 MQClientInstance 中,也仅保留在内存,不会长久化。

MQClientInstance 是用来与 NameServer、Broker 交互的客户端实例,同时缓存了路由信息。

/**
 * Topic 路由信息
 * 从 NameServer 更新
 */
ConcurrentMap<String/* Topic */, TopicRouteData> topicRouteTable = new ConcurrentHashMap<String, TopicRouteData>();

其中蕴含该 Topic 的队列列表、Broker 信息列表等数据。

/**
 * Topic 路由信息,NameServer 返回给客户端
 */
public class TopicRouteData extends RemotingSerializable {
    // 程序音讯的配置,来自 KvConfig
    private String orderTopicConf;
    // Topic 队列元数据
    private List<QueueData> queueDatas;
    // Topic 散布的 Broker 元数据
    private List<BrokerData> brokerDatas;
    // Topic 上 FilterServer 的地址列表
    private HashMap<String/* brokerAddr */, List<String>/* Filter Server */> filterServerTable;
    // ...
}

3.3 路由注册

路由注册蕴含两个方面:Broker 上报路由信息,和 NameServer 解决 Broker 的申请,将 Broker 上报的路由信息存起来。

3.3.1 Broker 上报心跳和路由信息

Broker 发送心跳包的定时工作在 BrokerController#start() 办法中启动,每隔 30s 调用 registerBrokerAll 办法发送一次心跳包(REGISTER_BROKER 申请),并将本身的 Topic 队列路由信息发送给 NameServer。主节点和从节点都会发送心跳和路由信息。
Broker 会遍历 NameServer 列表,向每个 NameServer 发送心跳包。

另外一个触发 Broker 上报 Topic 配置的操作是批改 Broker 的 Topic 配置(创立 / 更新),由 TopicConfigManager 触发上报。


心跳包的申请头中蕴含

  • Broker 地址
  • BrokerId,0 示意主节点,大于 0 示意从节点
  • Broker 名称
  • 集群名称
  • 主节点地址

申请体中蕴含

  • topicConfigTable:蕴含了每个 Topic 的所有队列信息。
  • dataVersion:Broker 中 Topic 配置的版本号,每当配置更新一次,版本号 +1

上报的心跳包申请类型是:RequestCode.REGISTER_BROKER

3.3.2 NameServer 保留上报的路由信息

NameServer 的 DefaultRequestProcessor 接管到 REGISTER_BROKER 类型的申请后,将上报的路由信息调用 RouteInfoManager#registerBroker() 写入内存中的路由表。

写入过程首先会获取写锁,而后顺次写入 RouteInfoManager 中的几个路由信息表。

  1. RouteInfoManager 加写锁
  2. 更新 clusterAddrTable,更新集群信息
  3. 更新 brokerAddrTable,更新 Broker 信息
  4. 更新 topicQueueTable,更新 Topic 队列信息
  5. 更新 brokerLiveTable,更新 Broker 存活状态
  6. 更新 filterServerTable,注册 Broker 的过滤器 Server 地址列表
  7. 开释写锁

3.4 路由删除

如果 Broker 宕机,则无奈向 NameServer 发送心跳包。NameServer 中有一个定时工作线程,每隔 10s 查看 Broker 存活状态,如果 Broker 曾经 120s 没有上报心跳,则敞开与 Broker 的连贯,同时更新路由信息表,将该 Broker 相干信息移除。

每次扫描,都会遍历 brokerLiveTable,取每个 Broker 的 lastUpdateTimestamp 与以后工夫比照,如果相差大于 120s,则执行路由删除逻辑 RouteInfoManager#onChannelDestroy()
另一个触发路由删除逻辑的是 Broker 失常敞开,会调用 unregisterBroker 办法,删除 NameServer 上的 Broker 信息。

路由删除逻辑如下

  1. RouteInfoManager 加写锁
  2. brokerAddrTable 找到对应的 Broker,移除
  3. clusterAddrTable 找到对应 Broker,移除
  4. 依据 BrokerName,从 topicQueueTable 中移除该 Broker 的队列
  5. 开释写锁

3.5 路由发现(客户端拉取路由信息)

NameServer 不会被动将路由信息推送给客户端,客户端须要本人定时从 NameServer 拉取路由信息。客户端中会启动一个定时工作,每 30s 向 NameServer 发送申请获取最新的路由信息。

3.5.1 客户端申请路由信息

客户端中注册定时工作的办法是 MQClientInstance#startScheduledTask(),每隔 30s 调用 updateTopicRouteInfoFromNameServer() 办法,更新路由信息。

客户端只会获取它生产或者生产的 Topic 路由信息,更新之后保留到 MQClientInstance.topicRouteTable 中,它也仅保留在内存中。

3.5.2 NameServer 返回路由信息

NameServer 收到客户端获取路由信息申请后,调用 DefaultRequestProcessor#getRouteInfoByTopic() 办法,返回 Topic 路由信息。该办法逻辑如下

  1. 调用 RouteInfoManager#pickupTopicRouteData() 办法,从路由表 topicQueueTablebrokerAddrTablefilterServerTable 中获取信息,填充 TopicRouteData 对象。
  2. 如果该主题为程序音讯,从 KVConfig 中获取程序音讯相干的配置,填充进 TopicRouteData 对象。
  3. TopicRouteData 对象编码,并返回给客户端。

4. 源码分析

4.1 NameServer 启动

4.1.1 NemesrvStartup

NamesrvStartup 类是 NameServer 的启动类,它会调用 NamesrvController 类的初始化和启动办法,执行 NameServer 具体模块的初始化和启动。

NamesrvStartup#createNamesrvController 函数中先解析命令行参数,而后初始化 NameServer 和 Netty remote server 配置,最初启动 NamesrvController 的初始化

public static NamesrvController createNamesrvController(String[] args) throws IOException, JoranException {System.setProperty(RemotingCommand.REMOTING_VERSION_KEY, Integer.toString(MQVersion.CURRENT_VERSION));
    //PackageConflictDetect.detectFastjson();

    // 解析命令行参数
    Options options = ServerUtil.buildCommandlineOptions(new Options());
    commandLine = ServerUtil.parseCmdLine("mqnamesrv", args, buildCommandlineOptions(options), new PosixParser());
    if (null == commandLine) {System.exit(-1);
        return null;
    }

    // 初始化 Name server 配置参数
    final NamesrvConfig namesrvConfig = new NamesrvConfig();
    // 初始化 Name server 网络配置(Netty 服务端配置)final NettyServerConfig nettyServerConfig = new NettyServerConfig();
    nettyServerConfig.setListenPort(9876);
    // 应用 -c 指定配置文件门路
    if (commandLine.hasOption('c')) {String file = commandLine.getOptionValue('c');
        if (file != null) {InputStream in = new BufferedInputStream(new FileInputStream(file));
            properties = new Properties();
            properties.load(in);
            MixAll.properties2Object(properties, namesrvConfig);
            MixAll.properties2Object(properties, nettyServerConfig);

            namesrvConfig.setConfigStorePath(file);

            System.out.printf("load config properties file OK, %s%n", file);
            in.close();}
    }

    // 应用 -p 打印以后加载配置属性
    if (commandLine.hasOption('p')) {InternalLogger console = InternalLoggerFactory.getLogger(LoggerName.NAMESRV_CONSOLE_NAME);
        MixAll.printObjectProperties(console, namesrvConfig);
        MixAll.printObjectProperties(console, nettyServerConfig);
        System.exit(0);
    }

    // 加载命令行中指定的属性,形如 --listenPort 9876
    MixAll.properties2Object(ServerUtil.commandLine2Properties(commandLine), namesrvConfig);

    if (null == namesrvConfig.getRocketmqHome()) {System.out.printf("Please set the %s variable in your environment to match the location of the RocketMQ installation%n", MixAll.ROCKETMQ_HOME_ENV);
        System.exit(-2);
    }

    // 初始化 Logback
    LoggerContext lc = (LoggerContext) LoggerFactory.getILoggerFactory();
    JoranConfigurator configurator = new JoranConfigurator();
    configurator.setContext(lc);
    lc.reset();
    configurator.doConfigure(namesrvConfig.getRocketmqHome() + "/conf/logback_namesrv.xml");

    log = InternalLoggerFactory.getLogger(LoggerName.NAMESRV_LOGGER_NAME);

    // 打印 Name server 配置参数
    MixAll.printObjectProperties(log, namesrvConfig);
    MixAll.printObjectProperties(log, nettyServerConfig);

    // 初始化 Name server 控制器
    final NamesrvController controller = new NamesrvController(namesrvConfig, nettyServerConfig);

    // remember all configs to prevent discard
    controller.getConfiguration().registerConfig(properties);

    return controller;
}

NamesrvStartup#start 初始化 NamesrvController;调用 NamesrvController#start() 办法,启动 Netty remoting server;最初注册敞开钩子函数,在 JVM 线程敞开之前,敞开 Netty remoting server 和解决线程池,敞开定时工作线程。

public static NamesrvController start(final NamesrvController controller) throws Exception {if (null == controller) {throw new IllegalArgumentException("NamesrvController is null");
    }

    // 初始化 NamesrvController:加载 KVConfig,初始化 Netty remoting server,增加定时工作
    boolean initResult = controller.initialize();
    if (!initResult) {controller.shutdown();
        System.exit(-3);
    }

    // 注册 JVM 钩子函数,在 JVM 齐全敞开之前,执行该办法,敞开 Name server
    Runtime.getRuntime().addShutdownHook(new ShutdownHookThread(log, new Callable<Void>() {
        @Override
        public Void call() throws Exception {controller.shutdown();
            return null;
        }
    }));

    // 启动 NamesrvController,次要是启动 Netty remoting server
    controller.start();

    return controller;
}

4.1.2 NamesrvController 启动

初始化办法 initialize() 先加载 KVConfig manager,而后初始化 Netty remoting server。最初增加 2 个定时工作:一个每 10s 打印一次 KV 配置,一个每 10s 扫描 Broker 列表,移除掉线的 Broker。

public boolean initialize() {
    // 加载 KV 配置
    this.kvConfigManager.load();

    // 初始化通信层
    this.remotingServer = new NettyRemotingServer(this.nettyServerConfig, this.brokerHousekeepingService);

    // 初始化线程池
    this.remotingExecutor =
        Executors.newFixedThreadPool(nettyServerConfig.getServerWorkerThreads(), new ThreadFactoryImpl("RemotingExecutorThread_"));

    this.registerProcessor();

    // 减少定时工作,每 10s 扫描一次 Broker,移除未激活状态的 Broker
    this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {

        @Override
        public void run() {NamesrvController.this.routeInfoManager.scanNotActiveBroker();
        }
    }, 5, 10, TimeUnit.SECONDS);

    // 减少定时工作,每 10min 打印一次 KV 配置
    this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {

        @Override
        public void run() {NamesrvController.this.kvConfigManager.printAllPeriodically();
        }
    }, 1, 10, TimeUnit.MINUTES);

    if (TlsSystemConfig.tlsMode != TlsMode.DISABLED) {
        // Register a listener to reload SslContext
        try {
            fileWatchService = new FileWatchService(new String[] {
                    TlsSystemConfig.tlsServerCertPath,
                    TlsSystemConfig.tlsServerKeyPath,
                    TlsSystemConfig.tlsServerTrustCertPath
                },
                new FileWatchService.Listener() {
                    boolean certChanged, keyChanged = false;
                    @Override
                    public void onChanged(String path) {if (path.equals(TlsSystemConfig.tlsServerTrustCertPath)) {log.info("The trust certificate changed, reload the ssl context");
                            reloadServerSslContext();}
                        if (path.equals(TlsSystemConfig.tlsServerCertPath)) {certChanged = true;}
                        if (path.equals(TlsSystemConfig.tlsServerKeyPath)) {keyChanged = true;}
                        if (certChanged && keyChanged) {log.info("The certificate and private key changed, reload the ssl context");
                            certChanged = keyChanged = false;
                            reloadServerSslContext();}
                    }
                    private void reloadServerSslContext() {((NettyRemotingServer) remotingServer).loadSslContext();}
                });
        } catch (Exception e) {log.warn("FileWatchService created error, can't load the certificate dynamically");
        }
    }

    return true;
}

start()shutdown() 办法,别离是启动和敞开 Netty remoting server、fileWatchService

其中 fileWatchService 是用来监听文件变动执行回调函数的,这里的作用是:当文件变动时,从新加载 SslContext。

public void start() throws Exception {this.remotingServer.start();

    if (this.fileWatchService != null) {this.fileWatchService.start();
    }
}

public void shutdown() {this.remotingServer.shutdown();
    this.remotingExecutor.shutdown();
    this.scheduledExecutorService.shutdown();

    if (this.fileWatchService != null) {this.fileWatchService.shutdown();
    }
}

4.2 路由信息

4.2.1 NameServer 路由信息

上面以单个 Broker 上报的路由信息为例展现 NameServer 中路由信息的构造。

topicQueueTable
HashMap<String/* topic */, Map<String /* brokerName */ , QueueData>> topicQueueTable;
  • QueueData

    • brokerName:所属 Broker 名
    • readQueueNums:读队列数量
    • writeQueueNums:写队列数量
    • perm:读写权限
    • topicSysFlag:Topic 同步标记

以后没有注册自定义 Topic,只注册了默认 Topic

{
    "RMQ_SYS_TRANS_HALF_TOPIC":{
        "broker-local":{
            "brokerName":"broker-local",
            "perm":6,
            "readQueueNums":1,
            "topicSysFlag":0,
            "writeQueueNums":1
        }
    },
    "SCHEDULE_TOPIC_XXXX":{
        "broker-local":{
            "brokerName":"broker-local",
            "perm":6,
            "readQueueNums":18,
            "topicSysFlag":0,
            "writeQueueNums":18
        }
    },
    "SELF_TEST_TOPIC":{
        "broker-local":{
            "brokerName":"broker-local",
            "perm":6,
            "readQueueNums":1,
            "topicSysFlag":0,
            "writeQueueNums":1
        }
    },
    "broker-local":{
        "broker-local":{
            "brokerName":"broker-local",
            "perm":7,
            "readQueueNums":1,
            "topicSysFlag":0,
            "writeQueueNums":1
        }
    },
    "TBW102":{
        "broker-local":{
            "brokerName":"broker-local",
            "perm":7,
            "readQueueNums":8,
            "topicSysFlag":0,
            "writeQueueNums":8
        }
    },
    "BenchmarkTest":{
        "broker-local":{
            "brokerName":"broker-local",
            "perm":6,
            "readQueueNums":1024,
            "topicSysFlag":0,
            "writeQueueNums":1024
        }
    },
    "DefaultCluster":{
        "broker-local":{
            "brokerName":"broker-local",
            "perm":7,
            "readQueueNums":16,
            "topicSysFlag":0,
            "writeQueueNums":16
        }
    },
    "DefaultCluster_REPLY_TOPIC":{
        "broker-local":{
            "brokerName":"broker-local",
            "perm":6,
            "readQueueNums":1,
            "topicSysFlag":0,
            "writeQueueNums":1
        }
    },
    "OFFSET_MOVED_EVENT":{
        "broker-local":{
            "brokerName":"broker-local",
            "perm":6,
            "readQueueNums":1,
            "topicSysFlag":0,
            "writeQueueNums":1
        }
    }
}
brokerAddrTable
HashMap<String/* brokerName */, BrokerData> brokerAddrTable;
  • brokerAddrs

    • key:brokerId,0 示意 MASTER,大于 0 示意 SLAVE
    • value:broker 地址
{
    "broker-local":{
        "brokerAddrs":{"0":"127.0.0.1:10911"},
        "brokerName":"broker-local",
        "cluster":"DefaultCluster"
    }
}
clusterAddrTable
HashMap<String/* clusterName */, Set<String/* brokerName */>> clusterAddrTable;
{
    "DefaultCluster":["broker-local"]
}
brokerLiveTable
HashMap<String/* brokerAddr */, BrokerLiveInfo> brokerLiveTable;
  • BrokerLiveInfo:Broker 状态信息,由 Broker 心跳上报

    • lastUpdateTimestamp:上次更新工夫戳
    • dataVersion:元数据被更新的次数,在 Broker 中统计,每次更新 +1
    • channel:Netty Channel
    • haServerAddr:HA 服务器地址
{
    "127.0.0.1:10911":{
        "channel":{
            "active":true,
            "inputShutdown":false,
            "open":true,
            "outputShutdown":false,
            "registered":true,
            "shutdown":false,
            "writable":true
        },
        "dataVersion":{
            "counter":1,
            "timestamp":1651564857610
        },
        "haServerAddr":"10.0.0.2:10912",
        "lastUpdateTimestamp":1651564899813
    }
}

4.2.2 客户端路由信息

ConcurrentMap<String/* Topic */, TopicRouteData> topicRouteTable = new ConcurrentHashMap<String, TopicRouteData>();
public class TopicRouteData extends RemotingSerializable {
    // 程序音讯的配置,来自 KvConfig
    private String orderTopicConf;
    // Topic 队列元数据
    private List<QueueData> queueDatas;
    // Topic 散布的 Broker 元数据
    private List<BrokerData> brokerDatas;
    // Topic 上 FilterServer 的地址列表
    private HashMap<String/* brokerAddr */, List<String>/* Filter Server */> filterServerTable;
    // ...
}
{
    "%RETRY%benchmark_consumer":{
        "brokerDatas":[
            {
                "brokerAddrs":{"0":"127.0.0.1:10911"},
                "brokerName":"broker-local",
                "cluster":"DefaultCluster"
            }
        ],
        "filterServerTable":{ },
        "queueDatas":[
            {
                "brokerName":"broker-local",
                "perm":6,
                "readQueueNums":1,
                "topicSysFlag":0,
                "writeQueueNums":1
            }
        ]
    },
    "TBW102":{
        "brokerDatas":[
            {
                "brokerAddrs":{"0":"127.0.0.1:10911"},
                "brokerName":"broker-local",
                "cluster":"DefaultCluster"
            }
        ],
        "filterServerTable":{ },
        "queueDatas":[
            {
                "brokerName":"broker-local",
                "perm":7,
                "readQueueNums":8,
                "topicSysFlag":0,
                "writeQueueNums":8
            }
        ]
    }
}

4.3 路由注册

4.3.1 Broker 上报心跳和路由信息

BrokerController 最终会调用 BrokerOuterAPI#registerBrokerAll 上报心跳和路由信息。

// BrokerOuterAPI.java
/**
 * 向所有 Name server 发送心跳包
 * @return 心跳包发送的响应列表
 */
public List<RegisterBrokerResult> registerBrokerAll(
    final String clusterName,
    final String brokerAddr,
    final String brokerName,
    final long brokerId,
    final String haServerAddr,
    final TopicConfigSerializeWrapper topicConfigWrapper,
    final List<String> filterServerList,
    final boolean oneway,
    final int timeoutMills,
    final boolean compressed) {final List<RegisterBrokerResult> registerBrokerResultList = new CopyOnWriteArrayList<>();
    List<String> nameServerAddressList = this.remotingClient.getNameServerAddressList();
    if (nameServerAddressList != null && nameServerAddressList.size() > 0) {

        // 为所有心跳申请结构对立的申请头
        final RegisterBrokerRequestHeader requestHeader = new RegisterBrokerRequestHeader();
        requestHeader.setBrokerAddr(brokerAddr);
        requestHeader.setBrokerId(brokerId);
        requestHeader.setBrokerName(brokerName);
        requestHeader.setClusterName(clusterName);
        // 主节点地址,首次申请时为空,从节点向 Name server 注册后更新
        requestHeader.setHaServerAddr(haServerAddr);
        requestHeader.setCompressed(compressed);

        // 结构对立的申请体
        RegisterBrokerBody requestBody = new RegisterBrokerBody();
        // Topic 配置,存储 Broker 启动时的一些默认 Topic
        requestBody.setTopicConfigSerializeWrapper(topicConfigWrapper);
        // 音讯过滤服务器列表
        requestBody.setFilterServerList(filterServerList);
        final byte[] body = requestBody.encode(compressed);
        final int bodyCrc32 = UtilAll.crc32(body);
        requestHeader.setBodyCrc32(bodyCrc32);
        final CountDownLatch countDownLatch = new CountDownLatch(nameServerAddressList.size());
        // 遍历所有 Name server 地址,发送心跳申请
        for (final String namesrvAddr : nameServerAddressList) {brokerOuterExecutor.execute(new Runnable() {
                @Override
                public void run() {
                    try {
                        RegisterBrokerResult result = registerBroker(namesrvAddr, oneway, timeoutMills, requestHeader, body);
                        if (result != null) {registerBrokerResultList.add(result);
                        }

                        log.info("register broker[{}]to name server {} OK", brokerId, namesrvAddr);
                    } catch (Exception e) {log.warn("registerBroker Exception, {}", namesrvAddr, e);
                    } finally {countDownLatch.countDown();
                    }
                }
            });
        }

        try {countDownLatch.await(timeoutMills, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {}}

    return registerBrokerResultList;
}

4.3.2 NameServer 保留上报的路由信息

RouteInfoManager#registerBroker 将 Broker 上报的路由信息保留到 NameServer 上。

  1. RouteInfoManager 加写锁
  2. 更新 clusterAddrTable,更新集群信息
  3. 更新 brokerAddrTable,更新 Broker 信息
  4. 更新 topicQueueTable,更新 Topic 队列信息
  5. 更新 brokerLiveTable,更新 Broker 存活状态
  6. 更新 filterServerTable,注册 Broker 的过滤器 Server 地址列表
  7. 开释写锁
/**
 * 解决 Broker 心跳信息,存到本地路由表
 * 如果是 SLAVE,则返回 MASTER 的 HA 地址
 */
public RegisterBrokerResult registerBroker(
    final String clusterName,
    final String brokerAddr,
    final String brokerName,
    final long brokerId,
    final String haServerAddr,
    final TopicConfigSerializeWrapper topicConfigWrapper,
    final List<String> filterServerList,
    final Channel channel) {RegisterBrokerResult result = new RegisterBrokerResult();
    try {
        try {
            // 路由注册须要加写锁,避免并发批改 RouteInfoManager 中的路由表
            this.lock.writeLock().lockInterruptibly();

            // 更新集群信息表。判断 Broker 所属集群是否存在,不存在则创立集群,而后将 Broker 名退出集群信息表
            Set<String> brokerNames = this.clusterAddrTable.get(clusterName);
            if (null == brokerNames) {brokerNames = new HashSet<String>();
                this.clusterAddrTable.put(clusterName, brokerNames);
            }
            brokerNames.add(brokerName);

            boolean registerFirst = false;

            // 更新 Broker 地址表,更新主备信息
            BrokerData brokerData = this.brokerAddrTable.get(brokerName);
            if (null == brokerData) {
                // 该 Broker 首次注册
                registerFirst = true;
                brokerData = new BrokerData(clusterName, brokerName, new HashMap<Long, String>());
                this.brokerAddrTable.put(brokerName, brokerData);
            }
            Map<Long, String> brokerAddrsMap = brokerData.getBrokerAddrs();
            //Switch slave to master: first remove <1, IP:PORT> in namesrv, then add <0, IP:PORT>
            //The same IP:PORT must only have one record in brokerAddrTable
            Iterator<Entry<Long, String>> it = brokerAddrsMap.entrySet().iterator();
            while (it.hasNext()) {Entry<Long, String> item = it.next();
                if (null != brokerAddr && brokerAddr.equals(item.getValue()) && brokerId != item.getKey()) {it.remove();
                }
            }

            String oldAddr = brokerData.getBrokerAddrs().put(brokerId, brokerAddr);
            registerFirst = registerFirst || (null == oldAddr);

            // 更新 Topic 信息表。只有主节点 Topic 配置信息发生变化或第一次注册才会更新
            if (null != topicConfigWrapper
                && MixAll.MASTER_ID == brokerId) {if (this.isBrokerTopicConfigChanged(brokerAddr, topicConfigWrapper.getDataVersion())
                    || registerFirst) {
                    ConcurrentMap<String, TopicConfig> tcTable =
                        topicConfigWrapper.getTopicConfigTable();
                    if (tcTable != null) {for (Map.Entry<String, TopicConfig> entry : tcTable.entrySet()) {this.createAndUpdateQueueData(brokerName, entry.getValue());
                        }
                    }
                }
            }

            // 更新 Broker 存活状态信息,蕴含最初更新工夫
            BrokerLiveInfo prevBrokerLiveInfo = this.brokerLiveTable.put(brokerAddr,
                new BrokerLiveInfo(System.currentTimeMillis(),
                    topicConfigWrapper.getDataVersion(),
                    channel,
                    haServerAddr));
            if (null == prevBrokerLiveInfo) {log.info("new broker registered, {} HAServer: {}", brokerAddr, haServerAddr);
            }

            // 更新 Broker 的 FilterServer 列表,一个 Broker 可能有多个 Filter Server
            if (filterServerList != null) {if (filterServerList.isEmpty()) {this.filterServerTable.remove(brokerAddr);
                } else {this.filterServerTable.put(brokerAddr, filterServerList);
                }
            }

            if (MixAll.MASTER_ID != brokerId) {String masterAddr = brokerData.getBrokerAddrs().get(MixAll.MASTER_ID);
                if (masterAddr != null) {BrokerLiveInfo brokerLiveInfo = this.brokerLiveTable.get(masterAddr);
                    if (brokerLiveInfo != null) {result.setHaServerAddr(brokerLiveInfo.getHaServerAddr());
                        result.setMasterAddr(masterAddr);
                    }
                }
            }
        } finally {this.lock.writeLock().unlock();}
    } catch (Exception e) {log.error("registerBroker Exception", e);
    }

    return result;
}

4.4. 路由删除

路由删除逻辑如下

  1. RouteInfoManager 加写锁
  2. brokerAddrTable 找到对应的 Broker,移除
  3. clusterAddrTable 找到对应 Broker,移除
  4. 依据 BrokerName,从 topicQueueTable 中移除该 Broker 的队列
  5. 开释写锁
/**
 * Channel 被敞开,或者 Channel Idle 工夫超限
 * 敞开与 Broker 的连贯,删除它的路由信息
 */
public void onChannelDestroy(String remoteAddr, Channel channel) {
    String brokerAddrFound = null;
    if (channel != null) {
        try {
            try {this.lock.readLock().lockInterruptibly();
                Iterator<Entry<String, BrokerLiveInfo>> itBrokerLiveTable =
                    this.brokerLiveTable.entrySet().iterator();
                while (itBrokerLiveTable.hasNext()) {Entry<String, BrokerLiveInfo> entry = itBrokerLiveTable.next();
                    if (entry.getValue().getChannel() == channel) {brokerAddrFound = entry.getKey();
                        break;
                    }
                }
            } finally {this.lock.readLock().unlock();}
        } catch (Exception e) {log.error("onChannelDestroy Exception", e);
        }
    }

    if (null == brokerAddrFound) {brokerAddrFound = remoteAddr;} else {log.info("the broker's channel destroyed, {}, clean it's data structure at once", brokerAddrFound);
    }

    if (brokerAddrFound != null && brokerAddrFound.length() > 0) {

        try {
            try {
                // 加写锁,删除该 Broker 的路由信息
                this.lock.writeLock().lockInterruptibly();
                this.brokerLiveTable.remove(brokerAddrFound);
                this.filterServerTable.remove(brokerAddrFound);

                // 移除 Broker 根底信息表中的该 Broker 信息
                String brokerNameFound = null;
                boolean removeBrokerName = false;
                Iterator<Entry<String, BrokerData>> itBrokerAddrTable =
                    this.brokerAddrTable.entrySet().iterator();
                while (itBrokerAddrTable.hasNext() && (null == brokerNameFound)) {BrokerData brokerData = itBrokerAddrTable.next().getValue();

                    Iterator<Entry<Long, String>> it = brokerData.getBrokerAddrs().entrySet().iterator();
                    while (it.hasNext()) {Entry<Long, String> entry = it.next();
                        Long brokerId = entry.getKey();
                        String brokerAddr = entry.getValue();
                        if (brokerAddr.equals(brokerAddrFound)) {brokerNameFound = brokerData.getBrokerName();
                            it.remove();
                            log.info("remove brokerAddr[{}, {}] from brokerAddrTable, because channel destroyed",
                                brokerId, brokerAddr);
                            break;
                        }
                    }

                    if (brokerData.getBrokerAddrs().isEmpty()) {
                        removeBrokerName = true;
                        itBrokerAddrTable.remove();
                        log.info("remove brokerName[{}] from brokerAddrTable, because channel destroyed",
                            brokerData.getBrokerName());
                    }
                }

                // 从集群信息表中移除该 Broker
                if (brokerNameFound != null && removeBrokerName) {Iterator<Entry<String, Set<String>>> it = this.clusterAddrTable.entrySet().iterator();
                    while (it.hasNext()) {Entry<String, Set<String>> entry = it.next();
                        String clusterName = entry.getKey();
                        Set<String> brokerNames = entry.getValue();
                        boolean removed = brokerNames.remove(brokerNameFound);
                        if (removed) {log.info("remove brokerName[{}], clusterName[{}] from clusterAddrTable, because channel destroyed",
                                brokerNameFound, clusterName);

                            if (brokerNames.isEmpty()) {log.info("remove the clusterName[{}] from clusterAddrTable, because channel destroyed and no broker in this cluster",
                                    clusterName);
                                it.remove();}

                            break;
                        }
                    }
                }

                // 移除 TopicQueue 表中该 Broker 的队列
                if (removeBrokerName) {
                    Iterator<Entry<String, List<QueueData>>> itTopicQueueTable =
                        this.topicQueueTable.entrySet().iterator();
                    while (itTopicQueueTable.hasNext()) {Entry<String, List<QueueData>> entry = itTopicQueueTable.next();
                        String topic = entry.getKey();
                        List<QueueData> queueDataList = entry.getValue();

                        Iterator<QueueData> itQueueData = queueDataList.iterator();
                        while (itQueueData.hasNext()) {QueueData queueData = itQueueData.next();
                            if (queueData.getBrokerName().equals(brokerNameFound)) {itQueueData.remove();
                                log.info("remove topic[{} {}], from topicQueueTable, because channel destroyed",
                                    topic, queueData);
                            }
                        }

                        if (queueDataList.isEmpty()) {itTopicQueueTable.remove();
                            log.info("remove topic[{}] all queue, from topicQueueTable, because channel destroyed",
                                topic);
                        }
                    }
                }
            } finally {this.lock.writeLock().unlock();}
        } catch (Exception e) {log.error("onChannelDestroy Exception", e);
        }
    }
}

4.5 路由发现

NameServer 收到客户端获取路由信息申请后,调用 DefaultRequestProcessor#getRouteInfoByTopic() 办法,返回 Topic 路由信息。该办法逻辑如下

  1. 调用 RouteInfoManager#pickupTopicRouteData() 办法,从路由表 topicQueueTablebrokerAddrTablefilterServerTable 中获取信息,填充 TopicRouteData 对象。
  2. 如果该主题为程序音讯,从 KVConfig 中获取程序音讯相干的配置,填充进 TopicRouteData 对象。
  3. TopicRouteData 对象编码,并返回给客户端。
/**
 * 解决客户端拉取路由信息申请,返回蕴含 TopicRouteData 的返回体
 */
public RemotingCommand getRouteInfoByTopic(ChannelHandlerContext ctx,
    RemotingCommand request) throws RemotingCommandException {final RemotingCommand response = RemotingCommand.createResponseCommand(null);
    final GetRouteInfoRequestHeader requestHeader =
        (GetRouteInfoRequestHeader) request.decodeCommandCustomHeader(GetRouteInfoRequestHeader.class);

    // 依据申请的主题获取该主题的路由信息
    TopicRouteData topicRouteData = this.namesrvController.getRouteInfoManager().pickupTopicRouteData(requestHeader.getTopic());

    // 如果该主题为程序音讯,则从 NameServer KvConfig 中获取程序音讯相干配置
    if (topicRouteData != null) {if (this.namesrvController.getNamesrvConfig().isOrderMessageEnable()) {
            String orderTopicConf =
                this.namesrvController.getKvConfigManager().getKVConfig(NamesrvUtil.NAMESPACE_ORDER_TOPIC_CONFIG,
                    requestHeader.getTopic());
            topicRouteData.setOrderTopicConf(orderTopicConf);
        }

        byte[] content = topicRouteData.encode();
        response.setBody(content);
        response.setCode(ResponseCode.SUCCESS);
        response.setRemark(null);
        return response;
    }

    response.setCode(ResponseCode.TOPIC_NOT_EXIST);
    response.setRemark("No topic route info in name server for the topic:" + requestHeader.getTopic()
        + FAQUrl.suggestTodo(FAQUrl.APPLY_TOPIC_URL));
    return response;
}

参考资料

  • 官网文档——架构设计
  • 深刻分析 RocketMQ 源码 -NameServer
  • Namesrv nearby route
  • 《RocketMQ 技术底细 第 2 版》

欢送关注公众号【消息中间件】,更新消息中间件的源码解析和最新动静!

本文由博客一文多发平台 OpenWrite 公布!

正文完
 0