关于rocketmq:RocketMqRocketMqNameServ-源码分析Ver494

引言

RocketMq3.X的版本和Kafka一样是基于Zookeeper进行路由治理的,然而这意味着运维须要多部署一套Zookeeper集群,起初RocketMq抉择去ZK最终呈现了NameServ。NameServ作为RocketMq源码浏览的切入点十分不错,本文将会介绍Ver 4.9.4 版本的NameServ源码剖析。

NameServer次要有两个性能,Broker治理路由信息管理

整个NameServ理论代码只有几百行,因为自身呈现基本目标就是代替ZK,所以角色相似ZK。在上面的截图中,NamesrvStartup为启动类,NamesrvController为外围控制器,RouteInfoManager为路由信息表,整个NameServ基本上就是围绕这三个类做文章。

NameServe的类结构图如下:

源码剖析

NameServ 启动

NameServ的启动步骤次要有上面几个点:

  1. 创立NameServ控制器,解析和创立重要配置,重要外围控制器创立并注入配置。
  2. NameServ外围控制器初始化,NettyServ服务等次重要相干组件创立和初始化。
  3. 启动定时工作,定期扫描过期Broker并且移除不沉闷Broker,定期打印零碎全副的KV配置。
  4. 注册JVM钩子函数优雅敞开资源(Netty和线程池),启动Netty。
  5. Netty服务启动

在理解代码细节之前,咱们先画一个时序图理解NameServ的启动过程:

显然NameServ的整个启动基本上是在为Nettty做了一系列周边服务,Netty是网络通信的外围框架。

拜访入口

整个NameServ的入口为org.apache.rocketmq.namesrv.NamesrvStartup#main0,咱们间接定位到相干代码。

public static NamesrvController main0(String[] args) {  
  
    try {  
        // 1. 构建外围控制器
        NamesrvController controller = createNamesrvController(args);  
        // 2. 启动控制器
        start(controller);  
        String tip = "The Name Server boot success. serializeType=" + RemotingCommand.getSerializeTypeConfigInThisServer();  
        log.info(tip);  
        System.out.printf("%s%n", tip);  
        return controller;  
    } catch (Throwable e) {  
        e.printStackTrace();  
        System.exit(-1);  
    }  
  
    return null;  
}

构建外围控制器

NameServer一开始的工作是构建外围控制器,从整体上看次要做了上面几个操作:

  1. 调用Apach Commons CLI 命令行解析工具进行命令解析。
  2. 依据运行时参数生成commandLine命令行对象。
  3. 创立NamesrvConfig和NettyServerConfig对象,读取-c指定的配置文件门路解析配置文件。
  4. namesrvConfignettyServerConfig 对象进行初始化。

Apach Commons CLI 工具能够帮忙开发者疾速构建服务器启动命令参数,并且反对输入到列表。这里咱们接着进入到 org.apache.rocketmq.namesrv.NamesrvStartup#createNamesrvController 一探到底。进入之后发现代码还不少,所以咱们拆成多个局部剖析。

上面是残缺的代码:

public static NamesrvController createNamesrvController(String[] args) throws IOException, JoranException {  
    System.setProperty(RemotingCommand.REMOTING_VERSION_KEY, Integer.toString(MQVersion.CURRENT_VERSION));  
    //PackageConflictDetect.detectFastjson();  
  
    Options options = ServerUtil.buildCommandlineOptions(new Options());  
    commandLine = ServerUtil.parseCmdLine("mqnamesrv", args, buildCommandlineOptions(options), new PosixParser());  
    if (null == commandLine) {  
        System.exit(-1);  
        return null;    
    }  
  
    final NamesrvConfig namesrvConfig = new NamesrvConfig();  
    final NettyServerConfig nettyServerConfig = new NettyServerConfig();  
    nettyServerConfig.setListenPort(9876);  
    if (commandLine.hasOption('c')) {  
        String file = commandLine.getOptionValue('c');  
        if (file != null) {  
            InputStream in = new BufferedInputStream(new FileInputStream(file));  
            properties = new Properties();  
            properties.load(in);  
            MixAll.properties2Object(properties, namesrvConfig);  
            MixAll.properties2Object(properties, nettyServerConfig);  
  
            namesrvConfig.setConfigStorePath(file);  
  
            System.out.printf("load config properties file OK, %s%n", file);  
            in.close();  
        }  
    }  
  
    if (commandLine.hasOption('p')) {  
        InternalLogger console = InternalLoggerFactory.getLogger(LoggerName.NAMESRV_CONSOLE_NAME);  
        MixAll.printObjectProperties(console, namesrvConfig);  
        MixAll.printObjectProperties(console, nettyServerConfig);  
        System.exit(0);  
    }  
  
    MixAll.properties2Object(ServerUtil.commandLine2Properties(commandLine), namesrvConfig);  
  
    if (null == namesrvConfig.getRocketmqHome()) {  
        System.out.printf("Please set the %s variable in your environment to match the location of the RocketMQ installation%n", MixAll.ROCKETMQ_HOME_ENV);  
        System.exit(-2);  
    }  
  
    LoggerContext lc = (LoggerContext) LoggerFactory.getILoggerFactory();  
    JoranConfigurator configurator = new JoranConfigurator();  
    configurator.setContext(lc);  
    lc.reset();  
    configurator.doConfigure(namesrvConfig.getRocketmqHome() + "/conf/logback_namesrv.xml");  
  
    log = InternalLoggerFactory.getLogger(LoggerName.NAMESRV_LOGGER_NAME);  
  
    MixAll.printObjectProperties(log, namesrvConfig);  
    MixAll.printObjectProperties(log, nettyServerConfig);  
  
    final NamesrvController controller = new NamesrvController(namesrvConfig, nettyServerConfig);  
  
    // remember all configs to prevent discard  
    controller.getConfiguration().registerConfig(properties);  
  
    return controller;  
}

因为内容比拟多,这里这里分段进行介绍,,首先是注册相干启动后命令:

System.setProperty(RemotingCommand.REMOTING_VERSION_KEY, Integer.toString(MQVersion.CURRENT_VERSION));  
    //PackageConflictDetect.detectFastjson();  
      // 创立命令行参数对象,这里定义了 -h 和 -n参数
    Options options = ServerUtil.buildCommandlineOptions(new Options());  
    
    // 依据Options和运行时参数args生成命令行对象,buildCommandlineOptions定义了-c参数(Name server config properties file)和-p参数(Print all config item)

    commandLine = ServerUtil.parseCmdLine("mqnamesrv", args, buildCommandlineOptions(options), new PosixParser());  
    if (null == commandLine) {  
        System.exit(-1);  
        return null;    
    }  

ServerUtil.buildCommandlineOptions(new Options())以及org.apache.rocketmq.namesrv.NamesrvStartup#buildCommandlineOptions办法外部的逻辑:

// org.apache.rocketmq.srvutil.ServerUtil#buildCommandlineOptions
public static Options buildCommandlineOptions(final Options options) {  
    Option opt = new Option("h", "help", false, "Print help");  
    opt.setRequired(false);  
    options.addOption(opt);  
  
    opt =  
        new Option("n", "namesrvAddr", true,  
            "Name server address list, eg: '192.168.0.1:9876;192.168.0.2:9876'");  
    opt.setRequired(false);  
    options.addOption(opt);  
  
    return options;  
}

// org.apache.rocketmq.namesrv.NamesrvStartup#buildCommandlineOptions
public static Options buildCommandlineOptions(final Options options) {  
    Option opt = new Option("c", "configFile", true, "Name server config properties file");  
    opt.setRequired(false);  
    options.addOption(opt);  
  
    opt = new Option("p", "printConfigItem", false, "Print all config items");  
    opt.setRequired(false);  
    options.addOption(opt);  
  
    return options;  
}

从集体来看这个办法并不直观,并且复用性比拟低,集体比拟偏向于改成上面的形式:

public static Option buildCommandlineOption(String opt, String longOpt, boolean hasArg, String description, boolean required){
    Option option = new Option(opt, longOpt, hasArg, description);
    option.setRequired(required);
    return option;
}

最初在本地集体把代码革新为上面的形式,尽管参数还须要优化,然而感觉直观了不少:

System.setProperty(RemotingCommand.REMOTING_VERSION_KEY, Integer.toString(MQVersion.CURRENT_VERSION));  
//PackageConflictDetect.detectFastjson();  
Options options = new Options();  
// Modified to a more intuitive way of adding commands  
options.addOption(ServerUtil.buildCommandlineOption("c", "configFile", true, "Name server config properties file", false));  
options.addOption(ServerUtil.buildCommandlineOption("p", "printConfigItem", false, "Print all config items", false));  
options.addOption(ServerUtil.buildCommandlineOption("h", "help", false, "Print help", false));  
options.addOption(ServerUtil.buildCommandlineOption("n", "namesrvAddr", true,  
        "Name server address list, eg: '192.168.0.1:9876;192.168.0.2:9876'", false));  
  
commandLine = ServerUtil.parseCmdLine("mqnamesrv", args, options, new PosixParser());  
if (null == commandLine) {  
    System.exit(-1);  
    return null;}

如果感觉惹眼能够把这一段放到写好的办法外面,通过集体倒腾之后最终的代码如下:

System.setProperty(RemotingCommand.REMOTING_VERSION_KEY, Integer.toString(MQVersion.CURRENT_VERSION));  
//PackageConflictDetect.detectFastjson();   
Options options = buildCommandlineOptions(options);  
commandLine = ServerUtil.parseCmdLine("mqnamesrv", args, options, new PosixParser());  
if (null == commandLine) {  
    System.exit(-1);  
    return null;
    
}

//......
public static Options buildCommandlineOptions() {  
    Options options = new Options(); 
    // Modified to a more intuitive way of adding commands  
    options.addOption(ServerUtil.buildCommandlineOption("c", "configFile", true, "Name server config properties file", false));  
    options.addOption(ServerUtil.buildCommandlineOption("p", "printConfigItem", false, "Print all config items", false));  
    options.addOption(ServerUtil.buildCommandlineOption("h", "help", false, "Print help", false));  
    options.addOption(ServerUtil.buildCommandlineOption("n", "namesrvAddr", true,  
            "Name server address list, eg: '192.168.0.1:9876;192.168.0.2:9876'", false));  
    return options;  
}

    

置信读者对于Apach Commons CLI 命令行解析工具进行命令解析有了大抵的理解。Apach的命令行解析工具帮忙开发者依据运行时候的参数构建命令行对象,之后再通过 -c 的参数决定是否读取配置文件,解析配置文件之后填充到namesrvConfignettyServerConfig对象中。

解析命令之后是填充配置到对应的对象,填充配置文件的配置代码如下:

final NamesrvConfig namesrvConfig = new NamesrvConfig();  
final NettyServerConfig nettyServerConfig = new NettyServerConfig();  
nettyServerConfig.setListenPort(9876);  
if (commandLine.hasOption('c')) {  
    String file = commandLine.getOptionValue('c');  
    if (file != null) {  
        InputStream in = new BufferedInputStream(new FileInputStream(file));  
        properties = new Properties();  
        properties.load(in);  
        MixAll.properties2Object(properties, namesrvConfig);  
        MixAll.properties2Object(properties, nettyServerConfig);  
  
        namesrvConfig.setConfigStorePath(file);  
  
        System.out.printf("load config properties file OK, %s%n", file);  
        in.close();  
    }  
}

这一段算是createNamesrvController(String[] args)外围代码之一,作用是先创立NettyServerConfig以及NamesrvConfig对象,而后利用commandLine命令行工具读取-c指定的配置文件门路,这里用比拟经典的缓冲流文件IO读取,之后生成Properties对象,这些代码根本都是JAVAEE根底,就不一一扣细节了。

当生成Properties对象实现之后,将namesrvConfig和nettyServerConfig对象进行初始化。接下来有一些不重要的代码,比方发现没有参数配置RocketMqHome会给出提醒:

if (null == namesrvConfig.getRocketmqHome()) {  
    System.out.printf("Please set the %s variable in your environment to match the location of the RocketMQ installation%n", MixAll.ROCKETMQ_HOME_ENV);  
    System.exit(-2);  
}

再比方会依据RocketMqHome的根门路下固定门路加载logback_namesrv.xml日志配置文件,如果把日志重定向到本人其余磁盘门路,须要留神conf 这个层级文件夹以及日志配置文件一并拷贝。

LoggerContext lc = (LoggerContext) LoggerFactory.getILoggerFactory();  
JoranConfigurator configurator = new JoranConfigurator();  
configurator.setContext(lc);  
lc.reset();  
configurator.doConfigure(namesrvConfig.getRocketmqHome() + "/conf/logback_namesrv.xml");

之后便是重点操作创立NamesrvController外围控制器了,这外面把namesrvConfig和nettyServerConfig载入到外围控制器待后续初始化应用,代码如下:

final NamesrvController controller = new NamesrvController(namesrvConfig, nettyServerConfig);

// remember all configs to prevent discard 
// 记住所有的配置以避免抛弃  
controller.getConfiguration().registerConfig(properties);

下面的代码瓜熟蒂落地利用namesrvConfig和nettyServerConfig对象创立NamesrvController对象,而后在注册一遍properties避免失落。

留神这里应用了JUC的 java.util.concurrent.locks.ReadWriteLock读写锁进行操作

ReadWriteLock 是什么,能够参考廖老师的博客:应用ReadWriteLock – 廖雪峰的官方网站 (liaoxuefeng.com)
应用ReadWriteLock能够进步读取效率:

  • ReadWriteLock只容许一个线程写入;
  • ReadWriteLock容许多个线程在没有写入时同时读取;
  • ReadWriteLock适宜读多写少的场景。

看完之后咱们发现createNamesrvController(String[] args) 是十分重要的办法,外部的要害操作如下:

  • 提供namesrvConfignettyServerConfig配置对象
  • 创立NamesrvController外围控制器

创立完外围控制器之后紧接着便是启动控制器,这里有着次重要级别的初始化操作:

// 2. 启动控制器
start(controller);  

初始化

创立外围控制器之后,紧接着是外围控制器的相干初始化动作,初始化的重要工作是上面几个:

  • 初始化外围控制器,外部逻辑属于次重要级相干组件启动。
  • 注册JVM钩子函数优雅敞开Netty和开释资源
  • 外围控制器真正启动运行,实际上为触发Netty服务开启。

org.apache.rocketmq.namesrv.NamesrvStartup#start 初始化代码如下:

public static NamesrvController start(final NamesrvController controller) throws Exception {  
  
    if (null == controller) {  
        throw new IllegalArgumentException("NamesrvController is null");  
    }  
    // 对外围控制器进行初始化操作  
    boolean initResult = controller.initialize();  
    if (!initResult) {  
        controller.shutdown();  
        System.exit(-3);  
    }  
    // 注册一个钩子函数,JVM过程敞开时优雅地开释netty服务、线程池等资源  
    Runtime.getRuntime().addShutdownHook(new ShutdownHookThread(log, (Callable<Void>) () -> {  
        controller.shutdown();  
        return null;    }));  
    // 外围控制器启动操作  
    controller.start();  
  
    return controller;  
}

start()的操作和创立外围控制器有点像,因为也是一个次重要级别的初始化操作。相干操作实现之后注册一个钩子函数优雅的开释Netty服务以及开释线程池的资源,最初对外围控制器进行启动操作。

咱们持续深刻外围控制器启动操作,org.apache.rocketmq.namesrv.NamesrvController#initialize代码如下:

public boolean initialize() {  
    // 加载KV配置  
    this.kvConfigManager.load();  
    // 创立Netty网络服务对象  
    this.remotingServer = new NettyRemotingServer(this.nettyServerConfig, this.brokerHousekeepingService);  
  
    this.remotingExecutor =  
        Executors.newFixedThreadPool(nettyServerConfig.getServerWorkerThreads(), new ThreadFactoryImpl("RemotingExecutorThread_"));  
  
    this.registerProcessor();  
    // 创立定时工作--每个10s扫描一次Broker,并定时剔除不沉闷的Broker  
    this.scheduledExecutorService.scheduleAtFixedRate(NamesrvController.this.routeInfoManager::scanNotActiveBroker, 5, 10, TimeUnit.SECONDS);  
    // 创立定时工作--每个10分钟打印一遍KV配置  
    this.scheduledExecutorService.scheduleAtFixedRate(NamesrvController.this.kvConfigManager::printAllPeriodically, 1, 10, TimeUnit.MINUTES);  
  
    // 省略SSL判断代码
  
    return true;  
}

这部分代码次要目标是对外围控制器进行启动前的一些初始化操作,包含上面一些内容:

  • 依据下面办法初始化的NamesrvConfigkvConfigPath(存储KV配置属性的门路)加载KV配置
  • 创立两个定时工作:

    • 每隔10s扫描一次Broker,并定时剔除不沉闷的Broker
    • 每隔10分钟打印一遍KV配置

这里的定时工作每次距离10s扫描一次Broker,并定时剔除不沉闷的Broker。

路由删除的逻辑放到前面进行介绍,这里临时跳过

之后咱们持续看外围控制器是如何启动的,办法入口为org.apache.rocketmq.namesrv.NamesrvController#start

public void start() throws Exception {  
    this.remotingServer.start();  
  
    if (this.fileWatchService != null) {  
        this.fileWatchService.start();  
    }  
}

非常简单的,代码其实就是启动一下Netty服务罢了,因为RocketMq 底层通信是依赖Netty的,不过Netty的细节不在本文的探讨范畴,这里就不过多介绍开掘细节了。

至此整个路由启动的代码实现。

NameServ注册Broker

路由注册的时序图如下:

路由注册简略来说就是Broker注册到NameServ的过程,次要是通过心跳包实现的,那么Broker在代码中是如何存储的呢?咱们依据下面的时序图最初一步就能够间接找到答案,就是在 RouteManager外面,外面保护了上面的信息:

private final Map<String/* topic */, Map<String, QueueData>> topicQueueTable;  
private final Map<String/* brokerName */, BrokerData> brokerAddrTable;  
private final Map<String/* clusterName */, Set<String/* brokerName */>> clusterAddrTable;  
private final Map<BrokerAddrInfo/* brokerAddr */, BrokerLiveInfo> brokerLiveTable;  
private final Map<BrokerAddrInfo/* brokerAddr */, List<String>/* Filter Server */> filterServerTable;  
private final Map<String/* topic */, Map<String/*brokerName*/, TopicQueueMappingInfo>> topicQueueMappingInfoTable;

和Spring 治理Bean差不多的套路,用的是万能的Map,下面定义的变量中比拟重要的如下(和文章结尾对应的统一):

  • topicQueueTable:Topic音讯队列路由信息,包含topic所在的broker名称,读队列数量,写队列数量,同步标记等信息,rocketmq依据topicQueueTable的信息进行负载平衡音讯发送。
  • brokerAddrTable:Broker节点信息,包含brokername,所在集群名称,还有主备节点信息。
  • clusterAddrTable:Broker集群信息,存储了集群中所有的Brokername。
  • brokerLiveTable:Broker状态信息,Nameserver每次收到Broker的心跳包就会更新该信息。

RocketMq在音讯队列生产模式上应用的是公布订阅的模式设计,这在[[【RocketMq】RocketMq 扫盲]]中也有提到,这里不多赘述。

Broker中会存在一个Topic中有很多个Queue的状况,在默认的参数配置中RocketMq为每个新创建的Topic默认调配4个读队列和4个写队列,多个Broker还会组成集群,Broker还会定期向NameServ发送心跳包注册信息,NameServ则通过brokerLiveTable实现Broker节点状态的治理。

上面咱们依据时序图一步步往下察看NameServ注册Broker的过程:

发送心跳包

下面咱们剖析了NameServ的启动代码,其实察看Broker的启动代码会发现有肯定的相似之处,都是第一步构建一个控制器,而后start(),创立控制器这一部分内容不是重点这里跳过,咱们接着看start()办法。

public static void main(String[] args) {  
    start(createBrokerController(args));  
}  
  
public static BrokerController start(BrokerController controller) {  
    try {  
  
        controller.start();  
  
        String tip = "The broker[" + controller.getBrokerConfig().getBrokerName() + ", "  
            + controller.getBrokerAddr() + "] boot success. serializeType=" + RemotingCommand.getSerializeTypeConfigInThisServer();  
  
        if (null != controller.getBrokerConfig().getNamesrvAddr()) {  
            tip += " and name server is " + controller.getBrokerConfig().getNamesrvAddr();  
        }  
  
        log.info(tip);  
        System.out.printf("%s%n", tip);  
        return controller;  
    } catch (Throwable e) {  
        e.printStackTrace();  
        System.exit(-1);  
    }  
  
    return null;  
}

controller.start();是时序图的开始,上面是org.apache.rocketmq.broker.BrokerController#start:的外部代码:

public void start() throws Exception {  

    // 此处省略相干依赖组件的start()过程
    //.....
    
    if (!messageStoreConfig.isEnableDLegerCommitLog()) {  
        startProcessorByHa(messageStoreConfig.getBrokerRole());  
        // 主从同步节点配置解决
        handleSlaveSynchronize(messageStoreConfig.getBrokerRole());  
        // 首次启动强制发送心跳包
        this.registerBrokerAll(true, false, true);  
    }

    this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {  
  
        @Override  
        public void run() {  
            try {  
                BrokerController.this.registerBrokerAll(true, false, brokerConfig.isForceRegister());  
            } catch (Throwable e) {  
                log.error("registerBrokerAll Exception", e);  
            }  
        }  
    }, 1000 * 10, Math.max(10000, Math.min(brokerConfig.getRegisterNameServerPeriod(), 60000)), TimeUnit.MILLISECONDS);  
  
    if (this.brokerStatsManager != null) {  
        this.brokerStatsManager.start();  
    }  
  
    if (this.brokerFastFailure != null) {  
        this.brokerFastFailure.start();  
    }  
  
  
}

registerBrokerAll 这个办法的参数可读性不太好,所以这里列举一下三个参数的程序以及代码对应的参数数值:

  • boolean checkOrderConfig(true)
  • boolean oneway(false)
  • boolean forceRegister(true)

搭配上参数之后就比拟好懂了,也就是说加上配置校验以及强制执行一次注册动作,并且以非oneWay的形式发送一次心跳包。

上面咱们顺利进入到 registerBrokerAll() 办法,办法外部首先创立topic包装类 ,而后会有一段比拟有意思的代码,那就是如果没有读写权限会默认从新创立一个长期应用的topicConfigTable设置到Topic当中,之后是判断Broker此时是否须要执行发送心跳包。

然而咱们回到上一级调用this.registerBrokerAll(true, false, true);这里的参数传递就会发现,实际上forceRegister总是为true,也就是说基本上每个Broker第一次初始化必然须要传递心跳包的:

public synchronized void registerBrokerAll(final boolean checkOrderConfig, boolean oneway, boolean forceRegister) {  
    // 创立 TopicConfigSerializeWrapper,topic包装类  
    TopicConfigSerializeWrapper topicConfigWrapper = this.getTopicConfigManager().buildTopicConfigSerializeWrapper();  
    // 如果没有读写权限,此时会默认从新创立一个长期应用的topicConfigTable,作为Topic包装类的参数数值  
    // 集体认为这一步是避免空参数导致前面的办法出现异常,同时如果后续具备读写权限之后不须要从新创立间接应用  
    if (!PermName.isWriteable(this.getBrokerConfig().getBrokerPermission())  
        || !PermName.isReadable(this.getBrokerConfig().getBrokerPermission())) {  
        // 这里初始化的值能够应用默认的Topic配置数量,比方加上topicConfigWrapper.getTopicConfigTable().values().size()  
        ConcurrentHashMap<String, TopicConfig> topicConfigTable = new ConcurrentHashMap<>(topicConfigWrapper.getTopicConfigTable().values().size());  
        for (TopicConfig topicConfig : topicConfigWrapper.getTopicConfigTable().values()) {  
            TopicConfig tmp =  
                new TopicConfig(topicConfig.getTopicName(), topicConfig.getReadQueueNums(), topicConfig.getWriteQueueNums(),  
                    this.brokerConfig.getBrokerPermission());  
            topicConfigTable.put(topicConfig.getTopicName(), tmp);  
        }  
        topicConfigWrapper.setTopicConfigTable(topicConfigTable);  
    }  
    // 判断Broker是否须要发送心跳包
    if (forceRegister || needRegister(this.brokerConfig.getBrokerClusterName(),  
        this.getBrokerAddr(),  
        this.brokerConfig.getBrokerName(),  
        this.brokerConfig.getBrokerId(),  
        this.brokerConfig.getRegisterBrokerTimeoutMills())) {  
        // 执行发送心跳包
        doRegisterBrokerAll(checkOrderConfig, oneway, topicConfigWrapper);  
    }  
}

上面咱们接着定位到needRegister办法进行解读,这里咱们次要定位到org.apache.rocketmq.broker.out.BrokerOuterAPI#needRegister办法,这里截取要害代码如下:

brokerOuterExecutor.execute(() -> {  
    try {  
        QueryDataVersionRequestHeader requestHeader = new QueryDataVersionRequestHeader();  
        requestHeader.setBrokerAddr(brokerAddr);  
        requestHeader.setBrokerId(brokerId);  
        requestHeader.setBrokerName(brokerName);  
        requestHeader.setClusterName(clusterName);  
        RemotingCommand request = 
        RemotingCommand.createRequestCommand(RequestCode.QUERY_DATA_VERSION, requestHeader);  
        request.setBody(topicConfigWrapper.getDataVersion().encode());  
        // 同步近程调用到路由核心
        RemotingCommand response = remotingClient.invokeSync(namesrvAddr, request, timeoutMills);  
        DataVersion nameServerDataVersion = null;  
        Boolean changed = false;  
        // 省略代码:依据返回代码进行判断解决
        //..
        log.warn("Query data version from name server {} OK,changed {}, broker {},name server {}", namesrvAddr, changed, topicConfigWrapper.getDataVersion(), nameServerDataVersion == null ? "" : nameServerDataVersion);  
    } catch (Exception e) {  
        changedList.add(Boolean.TRUE);  
        log.error("Query data version from name server {}  Exception, {}", namesrvAddr, e);  
    } finally {  
        countDownLatch.countDown();  
    }  
});

这个代码不难理解,算是咱们平时写HTTP调用的一个变体,咱们能够通过RequestCode.QUERY_DATA_VERSION,查到NameServer的承受解决代码。

利用IDEA咱们很快发现org.apache.rocketmq.namesrv.processor.DefaultRequestProcessor#processRequest办法入口,之后进入到org.apache.rocketmq.namesrv.processor.DefaultRequestProcessor#queryBrokerTopicConfig办法,而后这里看到对应代码如下:

public RemotingCommand queryBrokerTopicConfig(ChannelHandlerContext ctx,  
    RemotingCommand request) throws RemotingCommandException {  
    final RemotingCommand response = RemotingCommand.createResponseCommand(QueryDataVersionResponseHeader.class);  
    final QueryDataVersionResponseHeader responseHeader = (QueryDataVersionResponseHeader) response.readCustomHeader();  
    final QueryDataVersionRequestHeader requestHeader =  
        (QueryDataVersionRequestHeader) request.decodeCommandCustomHeader(QueryDataVersionRequestHeader.class);  
    DataVersion dataVersion = DataVersion.decode(request.getBody(), DataVersion.class);  

    // 外部解决:如果dataVersion为空或者以后dataVersion不等于brokerLiveTable存储的brokerLiveTable,Broker就须要发送心跳包
    Boolean changed = this.namesrvController.getRouteInfoManager().isBrokerTopicConfigChanged(requestHeader.getBrokerAddr(), dataVersion);  
    
    if (!changed) {  
        // 更新Broker信息
                this.namesrvController.getRouteInfoManager().updateBrokerInfoUpdateTimestamp(requestHeader.getBrokerAddr(), System.currentTimeMillis());  
    }  
  
    DataVersion nameSeverDataVersion = this.namesrvController.getRouteInfoManager().queryBrokerTopicConfig(requestHeader.getBrokerAddr());  
    response.setCode(ResponseCode.SUCCESS);  
    response.setRemark(null);  
  
    if (nameSeverDataVersion != null) {  
        response.setBody(nameSeverDataVersion.encode());  
    }  
    responseHeader.setChanged(changed);  
    return response;  
}

咱们进入到要害判断代码org.apache.rocketmq.namesrv.routeinfo.RouteInfoManager#isBrokerTopicConfigChanged

public boolean isBrokerTopicConfigChanged(final String brokerAddr, final DataVersion dataVersion) {  
    DataVersion prev = queryBrokerTopicConfig(brokerAddr);  
    // 如果dataVersion为空或者以后dataVersion不等于brokerLiveTable存储的brokerLiveTable,Broker就须要发送心跳包
    return null == prev || !prev.equals(dataVersion);  
}  
  
public DataVersion queryBrokerTopicConfig(final String brokerAddr) {  
    BrokerLiveInfo prev = this.brokerLiveTable.get(brokerAddr);  
    if (prev != null) {  
        return prev.getDataVersion();  
    }  
    return null;  
}

Broker是否须要发送心跳包由该Broker在路由核心org.apache.rocketmq.namesrv.routeinfo.BrokerLiveInfo#dataVersion决定,如果dataVersion为空或者以后dataVersion不等于brokerLiveTable存储的brokerLiveTable,Broker就须要发送心跳包。

Nameserver解决心跳包

Nameserver的netty服务监听收到心跳包之后,会调用到路由核心以下办法进行解决,具体的办法入口为:org.apache.rocketmq.namesrv.routeinfo.RouteInfoManager#registerBroker

public RegisterBrokerResult registerBroker(  
        final String clusterName,  
        final String brokerAddr,  
        final String brokerName,  
        final long brokerId,  
        final String haServerAddr,  
        final TopicConfigSerializeWrapper topicConfigWrapper,  
        final List<String> filterServerList,  
        final Channel channel) {  
    RegisterBrokerResult result = new RegisterBrokerResult();  
    try {  
        try {  
            this.lock.writeLock().lockInterruptibly();  
  
            // 获取集群下所有的Broker,并将以后Broker退出clusterAddrTable,因为brokerNames是Set构造,并不会反复  
            Set<String> brokerNames = this.clusterAddrTable.computeIfAbsent(clusterName, k -> new HashSet<>());  
            brokerNames.add(brokerName);  
  
            boolean registerFirst = false;  
  
            // 获取Broker信息,如果是首次注册,那么新建一个BrokerData并退出brokerAddrTable  
            BrokerData brokerData = this.brokerAddrTable.get(brokerName);  
            if (null == brokerData) {  
                registerFirst = true;  
                brokerData = new BrokerData(clusterName, brokerName, new HashMap<>());  
                this.brokerAddrTable.put(brokerName, brokerData);  
            }  
            Map<Long, String> brokerAddrsMap = brokerData.getBrokerAddrs();  
            //Switch slave to master: first remove <1, IP:PORT> in namesrv, then add <0, IP:PORT>  
            //The same IP:PORT must only have one record in brokerAddrTable            Iterator<Entry<Long, String>> it = brokerAddrsMap.entrySet().iterator();  
            //从库切换主库:首先删除namesrv中的<1, IP:PORT>,而后增加<0, IP:PORT>。  
            //同一个IP:端口在brokerAddrTable中只能有一条记录。
            while (it.hasNext()) {  
                Entry<Long, String> item = it.next();  
                if (null != brokerAddr && brokerAddr.equals(item.getValue()) && brokerId != item.getKey()) {  
                    log.debug("remove entry {} from brokerData", item);  
                    it.remove();  
                }  
            }  
            // 里判断Broker是否是曾经注册过  
            String oldAddr = brokerData.getBrokerAddrs().put(brokerId, brokerAddr);  
  
            registerFirst = registerFirst || (null == oldAddr);  
            // 如果是Broker是Master节点吗,并且Topic信息更新或者是首次注册,那么创立更新topic队列信息  
            if (null != topicConfigWrapper  
                    && MixAll.MASTER_ID == brokerId) {  
                if (this.isBrokerTopicConfigChanged(brokerAddr, topicConfigWrapper.getDataVersion())  
                        || registerFirst) {  
                    ConcurrentMap<String, TopicConfig> tcTable =  
                            topicConfigWrapper.getTopicConfigTable();  
                    if (tcTable != null) {  
                        for (Map.Entry<String, TopicConfig> entry : tcTable.entrySet()) {  
                            this.createAndUpdateQueueData(brokerName, entry.getValue());  
                        }  
                    }  
                }  
            }  
            // 更新BrokerLiveInfo状态信息  
            BrokerLiveInfo prevBrokerLiveInfo = this.brokerLiveTable.put(brokerAddr,  
                    new BrokerLiveInfo(  
                            System.currentTimeMillis(),  
                            topicConfigWrapper.getDataVersion(),  
                            channel,  
                            haServerAddr));  
  
            if (filterServerList != null) {  
                if (filterServerList.isEmpty()) {  
                    this.filterServerTable.remove(brokerAddr);  
                } else {  
                    this.filterServerTable.put(brokerAddr, filterServerList);  
                }  
            }  
            // 如果不是MASTER_ID,则返回后果返回masterAddr。  
            if (MixAll.MASTER_ID != brokerId) {  
                String masterAddr = brokerData.getBrokerAddrs().get(MixAll.MASTER_ID);  
                if (masterAddr != null) {  
                    BrokerLiveInfo brokerLiveInfo = this.brokerLiveTable.get(masterAddr);  
                    if (brokerLiveInfo != null) {  
                        result.setHaServerAddr(brokerLiveInfo.getHaServerAddr());  
                        result.setMasterAddr(masterAddr);  
                    }  
                }  
            }  
        } finally {  
            this.lock.writeLock().unlock();  
        }  
    } catch (Exception e) {  
        log.error("registerBroker Exception", e);  
    }  
  
    return result;  
}

下面的代码是Broker心跳包的最外围办法,它次要做了上面几件事:

  • RouteInfoManager路由信息的更新操作

    • clusterAddrTable 更新;
    • brokerAddrTable 更新;
    • topicQueueTable 更新;
    • brokerLiveTable 更新;

定期排除Broker

依据实践学习咱们晓得,NameServ在启动的时候会创立一个定时工作,定时剔除不沉闷的Broker。这一点的源码在org.apache.rocketmq.namesrv.routeinfo.RouteInfoManager#scanNotActiveBroker中能够找到答案。

此外在单元测试中就有对于这一项定期清理的测试,也是比拟快的找到入口的方法:org.apache.rocketmq.namesrv.routeinfo.RouteInfoManagerBrokerRegisterTest#testScanNotActiveBroker

这个测试非常简单直观咱们:

private static RouteInfoManager routeInfoManager;  
public static String clusterName = "cluster";  
public static String brokerPrefix = "broker";  
public static String topicPrefix = "topic";  
public static int brokerPerName = 3;  
public static int brokerNameNumber = 3;
@Test  
public void testScanNotActiveBroker() {  
    for (int j = 0; j < brokerNameNumber; j++) {  
        String brokerName = getBrokerName(brokerPrefix, j);  
  
        for (int i = 0; i < brokerPerName; i++) {  
            String brokerAddr = getBrokerAddr(clusterName, brokerName, i);  
  
            // set not active  
            routeInfoManager.updateBrokerInfoUpdateTimestamp(brokerAddr, 0);  
  
            assertEquals(1, routeInfoManager.scanNotActiveBroker());  
        }  
    }  
  
}

在启动单元测试之前会先构建10个Broker节点注册进去,这里单元测试仔细的应用了多个集群模仿生产环境:

private static RouteInfoManager routeInfoManager;  
public static String clusterName = "cluster";  
public static String brokerPrefix = "broker";  
public static String topicPrefix = "topic";  
public static int brokerPerName = 3;  
public static int brokerNameNumber = 3;
@Before  
public void setup() {  
    routeInfoManager = new RouteInfoManager();  
    cluster = registerCluster(routeInfoManager,  
            clusterName,  
            brokerPrefix,  
            brokerNameNumber,  
            brokerPerName,  
            topicPrefix,  
            10);  
}

之后咱们间接跑一边单元测试,在日志中单元测试为咱们展现了具体的测试流程:

  1. 首先是构建broker注册,外部会塞入一些测试数据的Topic进行填充。
06:54:23.353 [main] INFO RocketmqNamesrv - new broker registered, cluster-broker-0:0 HAServer: cluster-broker-0:0
06:54:23.353 [main] INFO RocketmqNamesrv - new broker registered, cluster-broker-0:1 HAServer: cluster-broker-0:1
06:54:23.353 [main] INFO RocketmqNamesrv - new broker registered, cluster-broker-0:2 HAServer: cluster-broker-0:2
06:54:23.353 [main] INFO RocketmqNamesrv - cluster [cluster] brokerName [broker-1] master address change from null to cluster-broker-1:0
06:54:23.353 [main] INFO RocketmqNamesrv - new broker registered, cluster-broker-1:0 HAServer: cluster-broker-1:0
06:54:23.355 [main] INFO RocketmqNamesrv - new broker registered, cluster-broker-1:1 HAServer: cluster-broker-1:1
06:54:23.355 [main] INFO RocketmqNamesrv - new broker registered, cluster-broker-1:2 HAServer: cluster-broker-1:2
06:54:23.355 [main] INFO RocketmqNamesrv - cluster [cluster] brokerName [broker-2] master address change from null to cluster-broker-2:0
06:54:23.355 [main] INFO RocketmqNamesrv - new broker registered, cluster-broker-2:0 HAServer: cluster-broker-2:0
06:54:23.355 [main] INFO RocketmqNamesrv - new broker registered, cluster-broker-2:1 HAServer: cluster-broker-2:1
06:54:23.355 [main] INFO RocketmqNamesrv - new broker registered, cluster-broker-2:2 HAServer: cluster-broker-2:2
  1. 接着便是依据单元测试的代码进行遍历排除Broker节点,在循环的最初调用扫描查看不沉闷Broker。这里为了验证间接设置lastUpdateTimestamp(最初更新工夫)让Broker存活验证周期提前结束验证扫描成果。
06:55:34.483 [main] INFO RocketmqRemoting - closeChannel: close the connection to remote address[embedded] result: true
06:55:34.483 [main] WARN RocketmqNamesrv - The broker channel expired, cluster-broker-1:0 120000ms
06:55:34.483 [main] INFO RocketmqNamesrv - remove brokerAddr[0, cluster-broker-1:0] from brokerAddrTable, because channel destroyed
06:55:34.483 [main] INFO RocketmqRemoting - closeChannel: close the connection to remote address[embedded] result: true
06:55:34.483 [main] WARN RocketmqNamesrv - The broker channel expired, cluster-broker-1:1 120000ms
06:55:34.483 [main] INFO RocketmqNamesrv - remove brokerAddr[1, cluster-broker-1:1] from brokerAddrTable, because channel destroyed
06:55:34.483 [main] INFO RocketmqRemoting - closeChannel: close the connection to remote address[embedded] result: true
06:55:34.483 [main] WARN RocketmqNamesrv - The broker channel expired, cluster-broker-1:2 120000ms
06:55:34.484 [main] INFO RocketmqNamesrv - remove brokerAddr[2, cluster-broker-1:2] from brokerAddrTable, because channel destroyed
06:55:34.484 [main] INFO RocketmqNamesrv - remove brokerName[broker-1] from brokerAddrTable, because channel destroyed
06:55:34.484 [main] INFO RocketmqNamesrv - remove brokerName[broker-1], clusterName[cluster] from clusterAddrTable, because channel destroyed

以上便是单元测试的大抵内容,咱们接着看看具体的代码即可,这里还是用了迭代器模式进行遍历删除,又是一个经典的设计模式:

public int scanNotActiveBroker() {  
    int removeCount = 0;  
    Iterator<Entry<String, BrokerLiveInfo>> it = this.brokerLiveTable.entrySet().iterator();  
    while (it.hasNext()) {  
        Entry<String, BrokerLiveInfo> next = it.next();  
        long last = next.getValue().getLastUpdateTimestamp();  
        // BROKER_CHANNEL_EXPIRED_TIME = 1000 * 60 * 2 = 120秒,在单元测试中这里的last被设置为0所以必然超时
        if ((last + BROKER_CHANNEL_EXPIRED_TIME) < System.currentTimeMillis()) {  
            RemotingUtil.closeChannel(next.getValue().getChannel());  
            it.remove();  
            log.warn("The broker channel expired, {} {}ms", next.getKey(), BROKER_CHANNEL_EXPIRED_TIME);  
            this.onChannelDestroy(next.getKey(), next.getValue().getChannel());  
  
            removeCount++;  
        }  
    }  
  
    return removeCount;  
}

剔除Broker信息的逻辑比较简单,首先从BrokerLiveInfo获取状态信息,判断Broker的心跳工夫是否已超过限定值(默认120秒),若超过之后就执行剔除操作。

写在最初

剖析完了rocketmq自带的路由核心源码,其实咱们本人实现一个路由核心貌似也不难。NameServ小而美的设计十分取巧,当然仅仅几百行代码的确还是存在比拟多的不完满之处,很多计划须要开发人员本人编写业务代码兜底,然而有因为设计简略负责的工作,应用并且业务代码扩展性很强,保护成本低并且性能不错。

NameServ作为整个RocketMq的外围用法上简略的同时非常适合作为Rocketmq的切入点,集体在浏览代码中也会尝试批改代码查看成果,本人参加到源码编写和革新过程这会对代码编写者的思路更为清晰了解,也算是一个源码浏览的小技巧吧。

参考资料

  • RocketMQ源码剖析之路由核心 (objcoding.com)

评论

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注

这个站点使用 Akismet 来减少垃圾评论。了解你的评论数据如何被处理