乐趣区

关于rocketmq:RocketMqRocketMqNameServ-源码分析Ver494

引言

RocketMq3.X 的版本和 Kafka 一样是基于 Zookeeper 进行路由治理的,然而这意味着运维须要多部署一套 Zookeeper 集群,起初 RocketMq 抉择去 ZK 最终呈现了 NameServ。NameServ 作为 RocketMq 源码浏览的切入点十分不错,本文将会介绍 Ver 4.9.4 版本的 NameServ 源码剖析。

NameServer 次要有两个性能,Broker 治理 路由信息管理

整个 NameServ 理论代码只有几百行,因为自身呈现基本目标就是代替 ZK,所以角色相似 ZK。在上面的截图中,NamesrvStartup为启动类,NamesrvController为外围控制器,RouteInfoManager为路由信息表,整个 NameServ 基本上就是围绕这三个类做文章。

NameServe 的类结构图如下:

源码剖析

NameServ 启动

NameServ 的启动步骤次要有上面几个点:

  1. 创立 NameServ 控制器,解析和创立重要配置,重要外围控制器创立并注入配置。
  2. NameServ 外围控制器初始化,NettyServ 服务等次重要相干组件创立和初始化。
  3. 启动定时工作,定期扫描过期 Broker 并且移除不沉闷 Broker,定期打印零碎全副的 KV 配置。
  4. 注册 JVM 钩子函数优雅敞开资源(Netty 和线程池),启动 Netty。
  5. Netty 服务启动

在理解代码细节之前,咱们先画一个时序图理解 NameServ 的启动过程:

显然 NameServ 的整个启动基本上是在为 Nettty 做了一系列周边服务,Netty 是网络通信的外围框架。

拜访入口

整个 NameServ 的入口为org.apache.rocketmq.namesrv.NamesrvStartup#main0,咱们间接定位到相干代码。

public static NamesrvController main0(String[] args) {  
  
    try {  
        // 1. 构建外围控制器
        NamesrvController controller = createNamesrvController(args);  
        // 2. 启动控制器
        start(controller);  
        String tip = "The Name Server boot success. serializeType=" + RemotingCommand.getSerializeTypeConfigInThisServer();  
        log.info(tip);  
        System.out.printf("%s%n", tip);  
        return controller;  
    } catch (Throwable e) {e.printStackTrace();  
        System.exit(-1);  
    }  
  
    return null;  
}

构建外围控制器

NameServer 一开始的工作是构建外围控制器,从整体上看次要做了上面几个操作:

  1. 调用 Apach Commons CLI 命令行解析工具进行命令解析。
  2. 依据 运行时参数 生成 commandLine 命令行对象。
  3. 创立 NamesrvConfig 和 NettyServerConfig 对象,读取 -c 指定的配置文件门路解析配置文件。
  4. namesrvConfignettyServerConfig 对象进行初始化。

Apach Commons CLI 工具能够帮忙开发者疾速构建服务器启动命令参数,并且反对输入到列表。这里咱们接着进入到 org.apache.rocketmq.namesrv.NamesrvStartup#createNamesrvController 一探到底。进入之后发现代码还不少,所以咱们拆成 多个局部 剖析。

上面是残缺的代码:

public static NamesrvController createNamesrvController(String[] args) throws IOException, JoranException {System.setProperty(RemotingCommand.REMOTING_VERSION_KEY, Integer.toString(MQVersion.CURRENT_VERSION));  
    //PackageConflictDetect.detectFastjson();  
  
    Options options = ServerUtil.buildCommandlineOptions(new Options());  
    commandLine = ServerUtil.parseCmdLine("mqnamesrv", args, buildCommandlineOptions(options), new PosixParser());  
    if (null == commandLine) {System.exit(-1);  
        return null;    
    }  
  
    final NamesrvConfig namesrvConfig = new NamesrvConfig();  
    final NettyServerConfig nettyServerConfig = new NettyServerConfig();  
    nettyServerConfig.setListenPort(9876);  
    if (commandLine.hasOption('c')) {String file = commandLine.getOptionValue('c');  
        if (file != null) {InputStream in = new BufferedInputStream(new FileInputStream(file));  
            properties = new Properties();  
            properties.load(in);  
            MixAll.properties2Object(properties, namesrvConfig);  
            MixAll.properties2Object(properties, nettyServerConfig);  
  
            namesrvConfig.setConfigStorePath(file);  
  
            System.out.printf("load config properties file OK, %s%n", file);  
            in.close();}  
    }  
  
    if (commandLine.hasOption('p')) {InternalLogger console = InternalLoggerFactory.getLogger(LoggerName.NAMESRV_CONSOLE_NAME);  
        MixAll.printObjectProperties(console, namesrvConfig);  
        MixAll.printObjectProperties(console, nettyServerConfig);  
        System.exit(0);  
    }  
  
    MixAll.properties2Object(ServerUtil.commandLine2Properties(commandLine), namesrvConfig);  
  
    if (null == namesrvConfig.getRocketmqHome()) {System.out.printf("Please set the %s variable in your environment to match the location of the RocketMQ installation%n", MixAll.ROCKETMQ_HOME_ENV);  
        System.exit(-2);  
    }  
  
    LoggerContext lc = (LoggerContext) LoggerFactory.getILoggerFactory();  
    JoranConfigurator configurator = new JoranConfigurator();  
    configurator.setContext(lc);  
    lc.reset();  
    configurator.doConfigure(namesrvConfig.getRocketmqHome() + "/conf/logback_namesrv.xml");  
  
    log = InternalLoggerFactory.getLogger(LoggerName.NAMESRV_LOGGER_NAME);  
  
    MixAll.printObjectProperties(log, namesrvConfig);  
    MixAll.printObjectProperties(log, nettyServerConfig);  
  
    final NamesrvController controller = new NamesrvController(namesrvConfig, nettyServerConfig);  
  
    // remember all configs to prevent discard  
    controller.getConfiguration().registerConfig(properties);  
  
    return controller;  
}

因为内容比拟多,这里这里分段进行介绍,,首先是注册相干启动后命令:

System.setProperty(RemotingCommand.REMOTING_VERSION_KEY, Integer.toString(MQVersion.CURRENT_VERSION));  
    //PackageConflictDetect.detectFastjson();  
      // 创立命令行参数对象,这里定义了 -h 和 - n 参数
    Options options = ServerUtil.buildCommandlineOptions(new Options());  
    
    // 依据 Options 和运行时参数 args 生成命令行对象,buildCommandlineOptions 定义了 - c 参数(Name server config properties file)和 - p 参数(Print all config item)commandLine = ServerUtil.parseCmdLine("mqnamesrv", args, buildCommandlineOptions(options), new PosixParser());  
    if (null == commandLine) {System.exit(-1);  
        return null;    
    }  

ServerUtil.buildCommandlineOptions(new Options())以及 org.apache.rocketmq.namesrv.NamesrvStartup#buildCommandlineOptions 办法外部的逻辑:

// org.apache.rocketmq.srvutil.ServerUtil#buildCommandlineOptions
public static Options buildCommandlineOptions(final Options options) {Option opt = new Option("h", "help", false, "Print help");  
    opt.setRequired(false);  
    options.addOption(opt);  
  
    opt =  
        new Option("n", "namesrvAddr", true,  
            "Name server address list, eg:'192.168.0.1:9876;192.168.0.2:9876'");  
    opt.setRequired(false);  
    options.addOption(opt);  
  
    return options;  
}

// org.apache.rocketmq.namesrv.NamesrvStartup#buildCommandlineOptions
public static Options buildCommandlineOptions(final Options options) {Option opt = new Option("c", "configFile", true, "Name server config properties file");  
    opt.setRequired(false);  
    options.addOption(opt);  
  
    opt = new Option("p", "printConfigItem", false, "Print all config items");  
    opt.setRequired(false);  
    options.addOption(opt);  
  
    return options;  
}

从集体来看这个办法并不直观,并且复用性比拟低,集体比拟偏向于改成上面的形式:

public static Option buildCommandlineOption(String opt, String longOpt, boolean hasArg, String description, boolean required){Option option = new Option(opt, longOpt, hasArg, description);
    option.setRequired(required);
    return option;
}

最初在本地集体把代码革新为上面的形式,尽管参数还须要优化,然而感觉直观了不少:

System.setProperty(RemotingCommand.REMOTING_VERSION_KEY, Integer.toString(MQVersion.CURRENT_VERSION));  
//PackageConflictDetect.detectFastjson();  
Options options = new Options();  
// Modified to a more intuitive way of adding commands  
options.addOption(ServerUtil.buildCommandlineOption("c", "configFile", true, "Name server config properties file", false));  
options.addOption(ServerUtil.buildCommandlineOption("p", "printConfigItem", false, "Print all config items", false));  
options.addOption(ServerUtil.buildCommandlineOption("h", "help", false, "Print help", false));  
options.addOption(ServerUtil.buildCommandlineOption("n", "namesrvAddr", true,  
        "Name server address list, eg:'192.168.0.1:9876;192.168.0.2:9876'", false));  
  
commandLine = ServerUtil.parseCmdLine("mqnamesrv", args, options, new PosixParser());  
if (null == commandLine) {System.exit(-1);  
    return null;}

如果感觉惹眼能够把这一段放到写好的办法外面,通过集体倒腾之后最终的代码如下:

System.setProperty(RemotingCommand.REMOTING_VERSION_KEY, Integer.toString(MQVersion.CURRENT_VERSION));  
//PackageConflictDetect.detectFastjson();   
Options options = buildCommandlineOptions(options);  
commandLine = ServerUtil.parseCmdLine("mqnamesrv", args, options, new PosixParser());  
if (null == commandLine) {System.exit(-1);  
    return null;
    
}

//......
public static Options buildCommandlineOptions() {Options options = new Options(); 
    // Modified to a more intuitive way of adding commands  
    options.addOption(ServerUtil.buildCommandlineOption("c", "configFile", true, "Name server config properties file", false));  
    options.addOption(ServerUtil.buildCommandlineOption("p", "printConfigItem", false, "Print all config items", false));  
    options.addOption(ServerUtil.buildCommandlineOption("h", "help", false, "Print help", false));  
    options.addOption(ServerUtil.buildCommandlineOption("n", "namesrvAddr", true,  
            "Name server address list, eg:'192.168.0.1:9876;192.168.0.2:9876'", false));  
    return options;  
}

    

置信读者对于 Apach Commons CLI 命令行解析工具进行命令解析有了大抵的理解。Apach 的命令行解析工具帮忙开发者依据运行时候的参数构建命令行对象,之后再通过 -c 的参数决定是否读取配置文件,解析配置文件之后填充到 namesrvConfignettyServerConfig对象中。

解析命令之后是填充配置到对应的对象,填充配置文件的配置代码如下:

final NamesrvConfig namesrvConfig = new NamesrvConfig();  
final NettyServerConfig nettyServerConfig = new NettyServerConfig();  
nettyServerConfig.setListenPort(9876);  
if (commandLine.hasOption('c')) {String file = commandLine.getOptionValue('c');  
    if (file != null) {InputStream in = new BufferedInputStream(new FileInputStream(file));  
        properties = new Properties();  
        properties.load(in);  
        MixAll.properties2Object(properties, namesrvConfig);  
        MixAll.properties2Object(properties, nettyServerConfig);  
  
        namesrvConfig.setConfigStorePath(file);  
  
        System.out.printf("load config properties file OK, %s%n", file);  
        in.close();}  
}

这一段算是 createNamesrvController(String[] args) 外围代码之一,作用是先创立 NettyServerConfig 以及 NamesrvConfig 对象,而后利用 commandLine 命令行工具读取 -c 指定的配置文件门路,这里用比拟经典的缓冲流文件 IO 读取,之后生成 Properties 对象,这些代码根本都是 JAVAEE 根底,就不一一扣细节了。

当生成 Properties 对象实现之后,将 namesrvConfig 和 nettyServerConfig 对象进行初始化。接下来有一些不重要的代码,比方发现没有参数配置 RocketMqHome 会给出提醒:

if (null == namesrvConfig.getRocketmqHome()) {System.out.printf("Please set the %s variable in your environment to match the location of the RocketMQ installation%n", MixAll.ROCKETMQ_HOME_ENV);  
    System.exit(-2);  
}

再比方会依据 RocketMqHome 的根门路下固定门路加载 logback_namesrv.xml 日志配置文件,如果把日志重定向到本人其余磁盘门路,须要留神conf 这个层级文件夹以及日志配置文件一并拷贝。

LoggerContext lc = (LoggerContext) LoggerFactory.getILoggerFactory();  
JoranConfigurator configurator = new JoranConfigurator();  
configurator.setContext(lc);  
lc.reset();  
configurator.doConfigure(namesrvConfig.getRocketmqHome() + "/conf/logback_namesrv.xml");

之后便是重点操作创立 NamesrvController 外围控制器了,这外面把 namesrvConfig 和 nettyServerConfig 载入到外围控制器待后续初始化应用,代码如下:

final NamesrvController controller = new NamesrvController(namesrvConfig, nettyServerConfig);

// remember all configs to prevent discard 
// 记住所有的配置以避免抛弃  
controller.getConfiguration().registerConfig(properties);

下面的代码瓜熟蒂落地利用 namesrvConfig 和 nettyServerConfig 对象 创立 NamesrvController 对象,而后在注册一遍 properties 避免失落。

留神这里应用了 JUC 的 java.util.concurrent.locks.ReadWriteLock读写锁进行操作

ReadWriteLock 是什么,能够参考廖老师的博客:应用 ReadWriteLock – 廖雪峰的官方网站 (liaoxuefeng.com)
应用 ReadWriteLock 能够进步读取效率:

  • ReadWriteLock只容许一个线程写入;
  • ReadWriteLock容许多个线程在没有写入时同时读取;
  • ReadWriteLock适宜读多写少的场景。

看完之后咱们发现createNamesrvController(String[] args) 是十分重要的办法,外部的要害操作如下:

  • 提供 namesrvConfignettyServerConfig配置对象
  • 创立NamesrvControlle r 外围控制器

创立完外围控制器之后紧接着便是启动控制器,这里有着次重要级别的初始化操作:

// 2. 启动控制器
start(controller);  

初始化

创立外围控制器之后,紧接着是外围控制器的相干初始化动作,初始化的重要工作是上面几个:

  • 初始化外围控制器,外部逻辑属于次重要级相干组件启动。
  • 注册 JVM 钩子函数优雅敞开 Netty 和开释资源
  • 外围控制器真正启动运行,实际上为触发 Netty 服务开启。

org.apache.rocketmq.namesrv.NamesrvStartup#start 初始化代码如下:

public static NamesrvController start(final NamesrvController controller) throws Exception {if (null == controller) {throw new IllegalArgumentException("NamesrvController is null");  
    }  
    // 对外围控制器进行初始化操作  
    boolean initResult = controller.initialize();  
    if (!initResult) {controller.shutdown();  
        System.exit(-3);  
    }  
    // 注册一个钩子函数,JVM 过程敞开时优雅地开释 netty 服务、线程池等资源  
    Runtime.getRuntime().addShutdownHook(new ShutdownHookThread(log, (Callable<Void>) () -> {controller.shutdown();  
        return null;    }));  
    // 外围控制器启动操作  
    controller.start();  
  
    return controller;  
}

start()的操作和创立外围控制器有点像,因为也是一个次重要级别的初始化操作。相干操作实现之后注册一个钩子函数优雅的开释 Netty 服务以及开释线程池的资源,最初对外围控制器进行启动操作。

咱们持续深刻外围控制器启动操作,org.apache.rocketmq.namesrv.NamesrvController#initialize代码如下:

public boolean initialize() {  
    // 加载 KV 配置  
    this.kvConfigManager.load();  
    // 创立 Netty 网络服务对象  
    this.remotingServer = new NettyRemotingServer(this.nettyServerConfig, this.brokerHousekeepingService);  
  
    this.remotingExecutor =  
        Executors.newFixedThreadPool(nettyServerConfig.getServerWorkerThreads(), new ThreadFactoryImpl("RemotingExecutorThread_"));  
  
    this.registerProcessor();  
    // 创立定时工作 -- 每个 10s 扫描一次 Broker,并定时剔除不沉闷的 Broker  
    this.scheduledExecutorService.scheduleAtFixedRate(NamesrvController.this.routeInfoManager::scanNotActiveBroker, 5, 10, TimeUnit.SECONDS);  
    // 创立定时工作 -- 每个 10 分钟打印一遍 KV 配置  
    this.scheduledExecutorService.scheduleAtFixedRate(NamesrvController.this.kvConfigManager::printAllPeriodically, 1, 10, TimeUnit.MINUTES);  
  
    // 省略 SSL 判断代码
  
    return true;  
}

这部分代码次要目标是对外围控制器进行启动前的一些初始化操作,包含上面一些内容:

  • 依据下面办法初始化的 NamesrvConfigkvConfigPath(存储 KV 配置属性的门路)加载 KV 配置
  • 创立两个定时工作:

    • 每隔 10s 扫描一次 Broker,并定时剔除不沉闷的 Broker
    • 每隔 10 分钟打印一遍 KV 配置

这里的定时工作每次距离 10s 扫描一次 Broker,并定时剔除不沉闷的 Broker。

路由删除的逻辑放到前面进行介绍,这里临时跳过

之后咱们持续看外围控制器是如何启动的,办法入口为org.apache.rocketmq.namesrv.NamesrvController#start

public void start() throws Exception {this.remotingServer.start();  
  
    if (this.fileWatchService != null) {this.fileWatchService.start();  
    }  
}

非常简单的,代码其实就是启动一下 Netty 服务罢了,因为 RocketMq 底层通信是依赖 Netty 的,不过 Netty 的细节不在本文的探讨范畴,这里就不过多介绍开掘细节了。

至此整个路由启动的代码实现。

NameServ 注册 Broker

路由注册的时序图如下:

路由注册简略来说就是 Broker 注册到 NameServ 的过程,次要是通过心跳包实现的,那么 Broker 在代码中是如何存储的呢?咱们依据下面的时序图最初一步就能够间接找到答案,就是在 RouteManager外面,外面保护了上面的信息:

private final Map<String/* topic */, Map<String, QueueData>> topicQueueTable;  
private final Map<String/* brokerName */, BrokerData> brokerAddrTable;  
private final Map<String/* clusterName */, Set<String/* brokerName */>> clusterAddrTable;  
private final Map<BrokerAddrInfo/* brokerAddr */, BrokerLiveInfo> brokerLiveTable;  
private final Map<BrokerAddrInfo/* brokerAddr */, List<String>/* Filter Server */> filterServerTable;  
private final Map<String/* topic */, Map<String/*brokerName*/, TopicQueueMappingInfo>> topicQueueMappingInfoTable;

和 Spring 治理 Bean 差不多的套路,用的是万能的 Map,下面定义的变量中比拟重要的如下(和文章结尾对应的统一):

  • topicQueueTable:Topic 音讯队列路由信息,包含 topic 所在的 broker 名称,读队列数量,写队列数量,同步标记等信息,rocketmq 依据 topicQueueTable 的信息进行负载平衡音讯发送。
  • brokerAddrTable:Broker 节点信息,包含 brokername,所在集群名称,还有主备节点信息。
  • clusterAddrTable:Broker 集群信息,存储了集群中所有的 Brokername。
  • brokerLiveTable:Broker 状态信息,Nameserver 每次收到 Broker 的心跳包就会更新该信息。

RocketMq 在音讯队列生产模式上应用的是公布订阅的模式设计,这在 [[【RocketMq】RocketMq 扫盲]] 中也有提到,这里不多赘述。

Broker 中会存在一个 Topic 中有很多个 Queue 的状况,在默认的参数配置中 RocketMq 为每个新创建的 Topic 默认调配 4 个读队列和 4 个写队列,多个 Broker 还会组成集群,Broker 还会定期向 NameServ 发送心跳包注册信息,NameServ 则通过 brokerLiveTable 实现 Broker 节点状态的治理。

上面咱们依据时序图一步步往下察看 NameServ 注册 Broker 的过程:

发送心跳包

下面咱们剖析了 NameServ 的启动代码,其实察看 Broker 的启动代码会发现有肯定的相似之处,都是第一步构建一个控制器,而后 start(),创立控制器这一部分内容不是重点这里跳过,咱们接着看 start()办法。

public static void main(String[] args) {start(createBrokerController(args));  
}  
  
public static BrokerController start(BrokerController controller) {  
    try {controller.start();  
  
        String tip = "The broker[" + controller.getBrokerConfig().getBrokerName() + ","  
            + controller.getBrokerAddr() + "] boot success. serializeType=" + RemotingCommand.getSerializeTypeConfigInThisServer();  
  
        if (null != controller.getBrokerConfig().getNamesrvAddr()) {tip += "and name server is" + controller.getBrokerConfig().getNamesrvAddr();}  
  
        log.info(tip);  
        System.out.printf("%s%n", tip);  
        return controller;  
    } catch (Throwable e) {e.printStackTrace();  
        System.exit(-1);  
    }  
  
    return null;  
}

controller.start();是时序图的开始,上面是 org.apache.rocketmq.broker.BrokerController#start: 的外部代码:

public void start() throws Exception {// 此处省略相干依赖组件的 start()过程
    //.....
    
    if (!messageStoreConfig.isEnableDLegerCommitLog()) {startProcessorByHa(messageStoreConfig.getBrokerRole());  
        // 主从同步节点配置解决
        handleSlaveSynchronize(messageStoreConfig.getBrokerRole());  
        // 首次启动强制发送心跳包
        this.registerBrokerAll(true, false, true);  
    }

    this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {  
  
        @Override  
        public void run() {  
            try {BrokerController.this.registerBrokerAll(true, false, brokerConfig.isForceRegister());  
            } catch (Throwable e) {log.error("registerBrokerAll Exception", e);  
            }  
        }  
    }, 1000 * 10, Math.max(10000, Math.min(brokerConfig.getRegisterNameServerPeriod(), 60000)), TimeUnit.MILLISECONDS);  
  
    if (this.brokerStatsManager != null) {this.brokerStatsManager.start();  
    }  
  
    if (this.brokerFastFailure != null) {this.brokerFastFailure.start();  
    }  
  
  
}

registerBrokerAll 这个办法的参数可读性不太好,所以这里列举一下三个参数的程序以及代码对应的参数数值:

  • boolean checkOrderConfig(true)
  • boolean oneway(false)
  • boolean forceRegister(true)

搭配上参数之后就比拟好懂了,也就是说加上配置校验以及强制执行一次注册动作,并且以非 oneWay 的形式发送一次心跳包。

上面咱们顺利进入到 registerBrokerAll() 办法,办法外部首先创立 topic 包装类,而后会有一段比拟有意思的代码,那就是如果没有读写权限会默认从新创立一个长期应用的 topicConfigTable 设置到 Topic 当中,之后是判断 Broker 此时是否须要执行发送心跳包。

然而咱们回到上一级调用 this.registerBrokerAll(true, false, true); 这里的参数传递就会发现,实际上forceRegister 总是为 true,也就是说基本上每个 Broker 第一次初始化必然须要传递心跳包的:

public synchronized void registerBrokerAll(final boolean checkOrderConfig, boolean oneway, boolean forceRegister) {  
    // 创立 TopicConfigSerializeWrapper,topic 包装类  
    TopicConfigSerializeWrapper topicConfigWrapper = this.getTopicConfigManager().buildTopicConfigSerializeWrapper();  
    // 如果没有读写权限,此时会默认从新创立一个长期应用的 topicConfigTable,作为 Topic 包装类的参数数值  
    // 集体认为这一步是避免空参数导致前面的办法出现异常, 同时如果后续具备读写权限之后不须要从新创立间接应用  
    if (!PermName.isWriteable(this.getBrokerConfig().getBrokerPermission())  
        || !PermName.isReadable(this.getBrokerConfig().getBrokerPermission())) {// 这里初始化的值能够应用默认的 Topic 配置数量,比方加上 topicConfigWrapper.getTopicConfigTable().values().size()  
        ConcurrentHashMap<String, TopicConfig> topicConfigTable = new ConcurrentHashMap<>(topicConfigWrapper.getTopicConfigTable().values().size());  
        for (TopicConfig topicConfig : topicConfigWrapper.getTopicConfigTable().values()) {  
            TopicConfig tmp =  
                new TopicConfig(topicConfig.getTopicName(), topicConfig.getReadQueueNums(), topicConfig.getWriteQueueNums(),  
                    this.brokerConfig.getBrokerPermission());  
            topicConfigTable.put(topicConfig.getTopicName(), tmp);  
        }  
        topicConfigWrapper.setTopicConfigTable(topicConfigTable);  
    }  
    // 判断 Broker 是否须要发送心跳包
    if (forceRegister || needRegister(this.brokerConfig.getBrokerClusterName(),  
        this.getBrokerAddr(),  
        this.brokerConfig.getBrokerName(),  
        this.brokerConfig.getBrokerId(),  
        this.brokerConfig.getRegisterBrokerTimeoutMills())) {  
        // 执行发送心跳包
        doRegisterBrokerAll(checkOrderConfig, oneway, topicConfigWrapper);  
    }  
}

上面咱们接着定位到 needRegister 办法进行解读,这里咱们次要定位到 org.apache.rocketmq.broker.out.BrokerOuterAPI#needRegister 办法,这里截取要害代码如下:

brokerOuterExecutor.execute(() -> {  
    try {QueryDataVersionRequestHeader requestHeader = new QueryDataVersionRequestHeader();  
        requestHeader.setBrokerAddr(brokerAddr);  
        requestHeader.setBrokerId(brokerId);  
        requestHeader.setBrokerName(brokerName);  
        requestHeader.setClusterName(clusterName);  
        RemotingCommand request = 
        RemotingCommand.createRequestCommand(RequestCode.QUERY_DATA_VERSION, requestHeader);  
        request.setBody(topicConfigWrapper.getDataVersion().encode());  
        // 同步近程调用到路由核心
        RemotingCommand response = remotingClient.invokeSync(namesrvAddr, request, timeoutMills);  
        DataVersion nameServerDataVersion = null;  
        Boolean changed = false;  
        // 省略代码:依据返回代码进行判断解决
        //..
        log.warn("Query data version from name server {} OK,changed {}, broker {},name server {}", namesrvAddr, changed, topicConfigWrapper.getDataVersion(), nameServerDataVersion == null ? "" : nameServerDataVersion);  
    } catch (Exception e) {changedList.add(Boolean.TRUE);  
        log.error("Query data version from name server {}  Exception, {}", namesrvAddr, e);  
    } finally {countDownLatch.countDown();  
    }  
});

这个代码不难理解,算是咱们平时写 HTTP 调用的一个变体,咱们能够通过RequestCode.QUERY_DATA_VERSION,查到 NameServer 的承受解决代码。

利用 IDEA 咱们很快发现 org.apache.rocketmq.namesrv.processor.DefaultRequestProcessor#processRequest 办法入口,之后进入到 org.apache.rocketmq.namesrv.processor.DefaultRequestProcessor#queryBrokerTopicConfig 办法,而后这里看到对应代码如下:

public RemotingCommand queryBrokerTopicConfig(ChannelHandlerContext ctx,  
    RemotingCommand request) throws RemotingCommandException {final RemotingCommand response = RemotingCommand.createResponseCommand(QueryDataVersionResponseHeader.class);  
    final QueryDataVersionResponseHeader responseHeader = (QueryDataVersionResponseHeader) response.readCustomHeader();  
    final QueryDataVersionRequestHeader requestHeader =  
        (QueryDataVersionRequestHeader) request.decodeCommandCustomHeader(QueryDataVersionRequestHeader.class);  
    DataVersion dataVersion = DataVersion.decode(request.getBody(), DataVersion.class);  

    // 外部解决:如果 dataVersion 为空或者以后 dataVersion 不等于 brokerLiveTable 存储的 brokerLiveTable,Broker 就须要发送心跳包
    Boolean changed = this.namesrvController.getRouteInfoManager().isBrokerTopicConfigChanged(requestHeader.getBrokerAddr(), dataVersion);  
    
    if (!changed) {  
        // 更新 Broker 信息
                this.namesrvController.getRouteInfoManager().updateBrokerInfoUpdateTimestamp(requestHeader.getBrokerAddr(), System.currentTimeMillis());  
    }  
  
    DataVersion nameSeverDataVersion = this.namesrvController.getRouteInfoManager().queryBrokerTopicConfig(requestHeader.getBrokerAddr());  
    response.setCode(ResponseCode.SUCCESS);  
    response.setRemark(null);  
  
    if (nameSeverDataVersion != null) {response.setBody(nameSeverDataVersion.encode());  
    }  
    responseHeader.setChanged(changed);  
    return response;  
}

咱们进入到要害判断代码org.apache.rocketmq.namesrv.routeinfo.RouteInfoManager#isBrokerTopicConfigChanged

public boolean isBrokerTopicConfigChanged(final String brokerAddr, final DataVersion dataVersion) {DataVersion prev = queryBrokerTopicConfig(brokerAddr);  
    // 如果 dataVersion 为空或者以后 dataVersion 不等于 brokerLiveTable 存储的 brokerLiveTable,Broker 就须要发送心跳包
    return null == prev || !prev.equals(dataVersion);  
}  
  
public DataVersion queryBrokerTopicConfig(final String brokerAddr) {BrokerLiveInfo prev = this.brokerLiveTable.get(brokerAddr);  
    if (prev != null) {return prev.getDataVersion();  
    }  
    return null;  
}

Broker 是否须要发送心跳包由该 Broker 在路由核心 org.apache.rocketmq.namesrv.routeinfo.BrokerLiveInfo#dataVersion 决定,如果 dataVersion 为空或者以后 dataVersion 不等于 brokerLiveTable 存储的brokerLiveTable,Broker 就须要发送心跳包。

Nameserver 解决心跳包

Nameserver 的 netty 服务监听收到心跳包之后,会调用到路由核心以下办法进行解决,具体的办法入口为:org.apache.rocketmq.namesrv.routeinfo.RouteInfoManager#registerBroker

public RegisterBrokerResult registerBroker(  
        final String clusterName,  
        final String brokerAddr,  
        final String brokerName,  
        final long brokerId,  
        final String haServerAddr,  
        final TopicConfigSerializeWrapper topicConfigWrapper,  
        final List<String> filterServerList,  
        final Channel channel) {RegisterBrokerResult result = new RegisterBrokerResult();  
    try {  
        try {this.lock.writeLock().lockInterruptibly();  
  
            // 获取集群下所有的 Broker,并将以后 Broker 退出 clusterAddrTable,因为 brokerNames 是 Set 构造,并不会反复  
            Set<String> brokerNames = this.clusterAddrTable.computeIfAbsent(clusterName, k -> new HashSet<>());  
            brokerNames.add(brokerName);  
  
            boolean registerFirst = false;  
  
            // 获取 Broker 信息,如果是首次注册,那么新建一个 BrokerData 并退出 brokerAddrTable  
            BrokerData brokerData = this.brokerAddrTable.get(brokerName);  
            if (null == brokerData) {  
                registerFirst = true;  
                brokerData = new BrokerData(clusterName, brokerName, new HashMap<>());  
                this.brokerAddrTable.put(brokerName, brokerData);  
            }  
            Map<Long, String> brokerAddrsMap = brokerData.getBrokerAddrs();  
            //Switch slave to master: first remove <1, IP:PORT> in namesrv, then add <0, IP:PORT>  
            //The same IP:PORT must only have one record in brokerAddrTable            Iterator<Entry<Long, String>> it = brokerAddrsMap.entrySet().iterator();  
            // 从库切换主库:首先删除 namesrv 中的 <1, IP:PORT>,而后增加 <0, IP:PORT>。// 同一个 IP: 端口在 brokerAddrTable 中只能有一条记录。while (it.hasNext()) {Entry<Long, String> item = it.next();  
                if (null != brokerAddr && brokerAddr.equals(item.getValue()) && brokerId != item.getKey()) {log.debug("remove entry {} from brokerData", item);  
                    it.remove();}  
            }  
            // 里判断 Broker 是否是曾经注册过  
            String oldAddr = brokerData.getBrokerAddrs().put(brokerId, brokerAddr);  
  
            registerFirst = registerFirst || (null == oldAddr);  
            // 如果是 Broker 是 Master 节点吗,并且 Topic 信息更新或者是首次注册,那么创立更新 topic 队列信息  
            if (null != topicConfigWrapper  
                    && MixAll.MASTER_ID == brokerId) {if (this.isBrokerTopicConfigChanged(brokerAddr, topicConfigWrapper.getDataVersion())  
                        || registerFirst) {  
                    ConcurrentMap<String, TopicConfig> tcTable =  
                            topicConfigWrapper.getTopicConfigTable();  
                    if (tcTable != null) {for (Map.Entry<String, TopicConfig> entry : tcTable.entrySet()) {this.createAndUpdateQueueData(brokerName, entry.getValue());  
                        }  
                    }  
                }  
            }  
            // 更新 BrokerLiveInfo 状态信息  
            BrokerLiveInfo prevBrokerLiveInfo = this.brokerLiveTable.put(brokerAddr,  
                    new BrokerLiveInfo(System.currentTimeMillis(),  
                            topicConfigWrapper.getDataVersion(),  
                            channel,  
                            haServerAddr));  
  
            if (filterServerList != null) {if (filterServerList.isEmpty()) {this.filterServerTable.remove(brokerAddr);  
                } else {this.filterServerTable.put(brokerAddr, filterServerList);  
                }  
            }  
            // 如果不是 MASTER_ID,则返回后果返回 masterAddr。if (MixAll.MASTER_ID != brokerId) {String masterAddr = brokerData.getBrokerAddrs().get(MixAll.MASTER_ID);  
                if (masterAddr != null) {BrokerLiveInfo brokerLiveInfo = this.brokerLiveTable.get(masterAddr);  
                    if (brokerLiveInfo != null) {result.setHaServerAddr(brokerLiveInfo.getHaServerAddr());  
                        result.setMasterAddr(masterAddr);  
                    }  
                }  
            }  
        } finally {this.lock.writeLock().unlock();}  
    } catch (Exception e) {log.error("registerBroker Exception", e);  
    }  
  
    return result;  
}

下面的代码是Broker 心跳包的最外围办法,它次要做了上面几件事:

  • RouteInfoManager 路由信息的更新操作

    • clusterAddrTable 更新;
    • brokerAddrTable 更新;
    • topicQueueTable 更新;
    • brokerLiveTable 更新;

定期排除 Broker

依据实践学习咱们晓得,NameServ 在启动的时候会创立一个定时工作,定时剔除不沉闷的 Broker。这一点的源码在 org.apache.rocketmq.namesrv.routeinfo.RouteInfoManager#scanNotActiveBroker 中能够找到答案。

此外在单元测试中就有对于这一项定期清理的测试,也是比拟快的找到入口的方法:org.apache.rocketmq.namesrv.routeinfo.RouteInfoManagerBrokerRegisterTest#testScanNotActiveBroker

这个测试非常简单直观咱们:

private static RouteInfoManager routeInfoManager;  
public static String clusterName = "cluster";  
public static String brokerPrefix = "broker";  
public static String topicPrefix = "topic";  
public static int brokerPerName = 3;  
public static int brokerNameNumber = 3;
@Test  
public void testScanNotActiveBroker() {for (int j = 0; j < brokerNameNumber; j++) {String brokerName = getBrokerName(brokerPrefix, j);  
  
        for (int i = 0; i < brokerPerName; i++) {String brokerAddr = getBrokerAddr(clusterName, brokerName, i);  
  
            // set not active  
            routeInfoManager.updateBrokerInfoUpdateTimestamp(brokerAddr, 0);  
  
            assertEquals(1, routeInfoManager.scanNotActiveBroker());  
        }  
    }  
  
}

在启动单元测试之前会先构建 10 个 Broker 节点注册进去,这里单元测试仔细的应用了多个集群模仿生产环境:

private static RouteInfoManager routeInfoManager;  
public static String clusterName = "cluster";  
public static String brokerPrefix = "broker";  
public static String topicPrefix = "topic";  
public static int brokerPerName = 3;  
public static int brokerNameNumber = 3;
@Before  
public void setup() {routeInfoManager = new RouteInfoManager();  
    cluster = registerCluster(routeInfoManager,  
            clusterName,  
            brokerPrefix,  
            brokerNameNumber,  
            brokerPerName,  
            topicPrefix,  
            10);  
}

之后咱们间接跑一边单元测试,在日志中单元测试为咱们展现了具体的测试流程:

  1. 首先是构建 broker 注册,外部会塞入一些测试数据的 Topic 进行填充。
06:54:23.353 [main] INFO RocketmqNamesrv - new broker registered, cluster-broker-0:0 HAServer: cluster-broker-0:0
06:54:23.353 [main] INFO RocketmqNamesrv - new broker registered, cluster-broker-0:1 HAServer: cluster-broker-0:1
06:54:23.353 [main] INFO RocketmqNamesrv - new broker registered, cluster-broker-0:2 HAServer: cluster-broker-0:2
06:54:23.353 [main] INFO RocketmqNamesrv - cluster [cluster] brokerName [broker-1] master address change from null to cluster-broker-1:0
06:54:23.353 [main] INFO RocketmqNamesrv - new broker registered, cluster-broker-1:0 HAServer: cluster-broker-1:0
06:54:23.355 [main] INFO RocketmqNamesrv - new broker registered, cluster-broker-1:1 HAServer: cluster-broker-1:1
06:54:23.355 [main] INFO RocketmqNamesrv - new broker registered, cluster-broker-1:2 HAServer: cluster-broker-1:2
06:54:23.355 [main] INFO RocketmqNamesrv - cluster [cluster] brokerName [broker-2] master address change from null to cluster-broker-2:0
06:54:23.355 [main] INFO RocketmqNamesrv - new broker registered, cluster-broker-2:0 HAServer: cluster-broker-2:0
06:54:23.355 [main] INFO RocketmqNamesrv - new broker registered, cluster-broker-2:1 HAServer: cluster-broker-2:1
06:54:23.355 [main] INFO RocketmqNamesrv - new broker registered, cluster-broker-2:2 HAServer: cluster-broker-2:2
  1. 接着便是依据单元测试的代码进行遍历排除 Broker 节点,在循环的最初调用扫描查看不沉闷 Broker。这里为了验证间接设置 lastUpdateTimestamp(最初更新工夫)让 Broker 存活验证周期提前结束验证扫描成果。
06:55:34.483 [main] INFO RocketmqRemoting - closeChannel: close the connection to remote address[embedded] result: true
06:55:34.483 [main] WARN RocketmqNamesrv - The broker channel expired, cluster-broker-1:0 120000ms
06:55:34.483 [main] INFO RocketmqNamesrv - remove brokerAddr[0, cluster-broker-1:0] from brokerAddrTable, because channel destroyed
06:55:34.483 [main] INFO RocketmqRemoting - closeChannel: close the connection to remote address[embedded] result: true
06:55:34.483 [main] WARN RocketmqNamesrv - The broker channel expired, cluster-broker-1:1 120000ms
06:55:34.483 [main] INFO RocketmqNamesrv - remove brokerAddr[1, cluster-broker-1:1] from brokerAddrTable, because channel destroyed
06:55:34.483 [main] INFO RocketmqRemoting - closeChannel: close the connection to remote address[embedded] result: true
06:55:34.483 [main] WARN RocketmqNamesrv - The broker channel expired, cluster-broker-1:2 120000ms
06:55:34.484 [main] INFO RocketmqNamesrv - remove brokerAddr[2, cluster-broker-1:2] from brokerAddrTable, because channel destroyed
06:55:34.484 [main] INFO RocketmqNamesrv - remove brokerName[broker-1] from brokerAddrTable, because channel destroyed
06:55:34.484 [main] INFO RocketmqNamesrv - remove brokerName[broker-1], clusterName[cluster] from clusterAddrTable, because channel destroyed

以上便是单元测试的大抵内容,咱们接着看看具体的代码即可,这里还是用了迭代器模式进行遍历删除,又是一个经典的设计模式:

public int scanNotActiveBroker() {  
    int removeCount = 0;  
    Iterator<Entry<String, BrokerLiveInfo>> it = this.brokerLiveTable.entrySet().iterator();  
    while (it.hasNext()) {Entry<String, BrokerLiveInfo> next = it.next();  
        long last = next.getValue().getLastUpdateTimestamp();  
        // BROKER_CHANNEL_EXPIRED_TIME = 1000 * 60 * 2 = 120 秒,在单元测试中这里的 last 被设置为 0 所以必然超时
        if ((last + BROKER_CHANNEL_EXPIRED_TIME) < System.currentTimeMillis()) {RemotingUtil.closeChannel(next.getValue().getChannel());  
            it.remove();  
            log.warn("The broker channel expired, {} {}ms", next.getKey(), BROKER_CHANNEL_EXPIRED_TIME);  
            this.onChannelDestroy(next.getKey(), next.getValue().getChannel());  
  
            removeCount++;  
        }  
    }  
  
    return removeCount;  
}

剔除 Broker 信息的逻辑比较简单,首先从 BrokerLiveInfo 获取状态信息,判断 Broker 的心跳工夫是否已超过限定值(默认 120 秒),若超过之后就执行剔除操作。

写在最初

剖析完了 rocketmq 自带的路由核心源码,其实咱们本人实现一个路由核心貌似也不难。NameServ 小而美的设计十分取巧,当然仅仅几百行代码的确还是存在比拟多的不完满之处,很多计划须要开发人员本人编写业务代码兜底,然而有因为设计简略负责的工作,应用并且业务代码扩展性很强,保护成本低并且性能不错。

NameServ 作为整个 RocketMq 的外围用法上简略的同时非常适合作为 Rocketmq 的切入点,集体在浏览代码中也会尝试批改代码查看成果,本人参加到源码编写和革新过程这会对代码编写者的思路更为清晰了解,也算是一个源码浏览的小技巧吧。

参考资料

  • RocketMQ 源码剖析之路由核心 (objcoding.com)
退出移动版