本文咱们来剖析NameServer相干代码,在正式剖析源码前,咱们先来回顾下NameServer的性能:

NameServer是一个非常简单的Topic路由注册核心,其角色相似Dubbo中的zookeeper,反对Broker的动静注册与发现。次要包含两个性能:

  • Broker治理,NameServer承受Broker集群的注册信息并且保留下来作为路由信息的根本数据。而后提供心跳检测机制,查看Broker是否还存活;
  • 路由信息管理,每个NameServer将保留对于Broker集群的整个路由信息和用于客户端查问的队列信息。而后ProducerConumser通过NameServer就能够晓得整个Broker集群的路由信息,从而进行音讯的投递和生产。

1. 架构设计

Broker启动的时候会向所有的NameServer注册,生产者在发送音讯时会先从NameServer中获取Broker音讯服务器的地址列表,依据负载平衡算法选取一台Broker音讯服务器发送音讯。NameServer与每台Broker之间放弃着长连贯,并且每隔10秒会查看Broker是否存活,如果检测到Broker超过120秒未发送心跳,则从路由注册表中将该Broker移除。

然而路由的变动不会马上告诉音讯生产者,这是为了升高NameServe的复杂性,所以在RocketMQ中须要音讯的发送端提供容错机制来保障音讯发送的高可用性,这在后续对于RocketMQ音讯发送的章节会介绍。

2. 启动流程源码剖析

2.1 主办法:NamesrvStartup#main

NameServer位于RocketMq我的项目的namesrv模块下,主类是org.apache.rocketmq.namesrv.NamesrvStartup,代码如下:

public class NamesrvStartup {    ...    public static void main(String[] args) {        main0(args);    }    public static NamesrvController main0(String[] args) {        try {            // 创立 controller            NamesrvController controller = createNamesrvController(args);            // 启动            start(controller);            String tip = "The Name Server boot success. serializeType="                     + RemotingCommand.getSerializeTypeConfigInThisServer();            log.info(tip);            System.out.printf("%s%n", tip);            return controller;        } catch (Throwable e) {            e.printStackTrace();            System.exit(-1);        }        return null;    }    ...}复制代码

能够看到,main()办法里的代码还是相当简略的,次要蕴含了两个办法:

  • createNamesrvController(...):创立 controller
  • start(...):启动nameServer

接下来咱们就来剖析这两个办法了。

2.2 创立controllerNamesrvStartup#createNamesrvController

public static NamesrvController createNamesrvController(String[] args)         throws IOException, JoranException {    // 省略解析命令行代码    ...    // nameServer的相干配置    final NamesrvConfig namesrvConfig = new NamesrvConfig();    //  nettyServer的相干配置    final NettyServerConfig nettyServerConfig = new NettyServerConfig();    // 端口写死了。。。    nettyServerConfig.setListenPort(9876);    if (commandLine.hasOption('c')) {        // 解决配置文件        String file = commandLine.getOptionValue('c');        if (file != null) {            // 读取配置文件,并将其加载到 properties 中            InputStream in = new BufferedInputStream(new FileInputStream(file));            properties = new Properties();            properties.load(in);            // 将 properties 里的属性赋值到 namesrvConfig 与 nettyServerConfig            MixAll.properties2Object(properties, namesrvConfig);            MixAll.properties2Object(properties, nettyServerConfig);            namesrvConfig.setConfigStorePath(file);            System.out.printf("load config properties file OK, %s%n", file);            in.close();        }    }    // 解决 -p 参数,该参数用于打印nameServer、nettyServer配置,省略    ...    // 将 commandLine 的所有配置设置到 namesrvConfig 中    MixAll.properties2Object(ServerUtil.commandLine2Properties(commandLine), namesrvConfig);    // 查看环境变量:ROCKETMQ_HOME    if (null == namesrvConfig.getRocketmqHome()) {        // 如果不设置 ROCKETMQ_HOME,就会在这里报错        System.out.printf("Please set the %s variable in your environment to match                 the location of the RocketMQ installation%n", MixAll.ROCKETMQ_HOME_ENV);        System.exit(-2);    }    // 省略日志配置    ...    // 创立一个controller    final NamesrvController controller =             new NamesrvController(namesrvConfig, nettyServerConfig);    // 将以后 properties 合并到我的项目的配置中,并且以后 properties 会笼罩我的项目中的配置    controller.getConfiguration().registerConfig(properties);    return controller;}复制代码

这个办法有点长,不过所做的事就两件:

  1. 解决配置
  2. 创立NamesrvController实例

2.2.1 解决配置

咱们先简略地看下配置的解决。在咱们启动我的项目中,能够应用-c /xxx/xxx.conf指定配置文件的地位,而后在createNamesrvController(...)办法中,通过如下代码

InputStream in = new BufferedInputStream(new FileInputStream(file));properties = new Properties();properties.load(in);复制代码

将配置文件的内容加载到properties对象中,而后调用MixAll.properties2Object(properties, namesrvConfig)办法将properties的属性赋值给namesrvConfig`MixAll.properties2Object(...)代码如下:

public static void properties2Object(final Properties p, final Object object) {    Method[] methods = object.getClass().getMethods();    for (Method method : methods) {        String mn = method.getName();        if (mn.startsWith("set")) {            try {                String tmp = mn.substring(4);                String first = mn.substring(3, 4);                // 首字母小写                String key = first.toLowerCase() + tmp;                // 从Properties中获取对应的值                String property = p.getProperty(key);                if (property != null) {                    // 获取值,并进行相应的类型转换                    Class<?>[] pt = method.getParameterTypes();                    if (pt != null && pt.length > 0) {                        String cn = pt[0].getSimpleName();                        Object arg = null;                        // 转换成int                        if (cn.equals("int") || cn.equals("Integer")) {                            arg = Integer.parseInt(property);                        // 其余类型如long,double,float,boolean都是这样转换的,这里就省略了                            } else if (...) {                            ...                        } else {                            continue;                        }                        // 反射调用                        method.invoke(object, arg);                    }                }            } catch (Throwable ignored) {            }        }    }}复制代码

这个办法非常简单:

  1. 先获取到object中的所有setXxx(...)办法
  2. 失去setXxx(...)中的Xxx
  3. 首字母小写失去xxx
  4. properties获取xxx属性对应的值,并依据setXxx(...)办法的参数类型进行转换
  5. 反射调用setXxx(...)办法进行赋值

这里之后,namesrvConfignettyServerConfig就赋值胜利了。

2.2.2 创立NamesrvController实例

咱们再来看看createNamesrvController(...)办法的第二个重要性能:创立NamesrvController实例.

创立NamesrvController实例的代码如下:

final NamesrvController controller = new NamesrvController(namesrvConfig, nettyServerConfig);

咱们间接进入NamesrvController的构造方法:

/** * 构造方法,一系列的赋值操作 */public NamesrvController(NamesrvConfig namesrvConfig, NettyServerConfig nettyServerConfig) {    this.namesrvConfig = namesrvConfig;    this.nettyServerConfig = nettyServerConfig;    this.kvConfigManager = new KVConfigManager(this);    this.routeInfoManager = new RouteInfoManager();    this.brokerHousekeepingService = new BrokerHousekeepingService(this);    this.configuration = new Configuration(log, this.namesrvConfig, this.nettyServerConfig);    this.configuration.setStorePathFromConfig(this.namesrvConfig, "configStorePath");}

构造方法里只是一系列的赋值操作,没做什么实质性的工作,就先不论了。

2.3 启动nameServerNamesrvStartup#start

让咱们回到一开始的NamesrvStartup#main0办法,

public static NamesrvController main0(String[] args) {    try {        NamesrvController controller = createNamesrvController(args);        start(controller);        ...    } catch (Throwable e) {        e.printStackTrace();        System.exit(-1);    }    return null;}

接下来咱们来看看start(controller)办法中做了什么,进入NamesrvStartup#start办法:

public static NamesrvController start(final NamesrvController controller) throws Exception {    if (null == controller) {        throw new IllegalArgumentException("NamesrvController is null");    }    // 初始化    boolean initResult = controller.initialize();    if (!initResult) {        controller.shutdown();        System.exit(-3);    }    // 敞开钩子,能够在敞开前进行一些操作    Runtime.getRuntime().addShutdownHook(new ShutdownHookThread(log, new Callable<Void>() {        @Override        public Void call() throws Exception {            controller.shutdown();            return null;        }    }));    // 启动    controller.start();    return controller;}

start(...)办法的逻辑也非常简洁,次要蕴含3个操作:

  1. 初始化,想必是做一些启动前的操作
  2. 增加敞开钩子,所谓的敞开钩子,能够了解为一个线程,能够用来监听jvm的敞开事件,在jvm真正敞开前,能够进行一些解决操作,这里的敞开前的解决操作就是controller.shutdown()办法所做的事了,所做的事也很容易想到,无非就是敞开线程池、敞开曾经关上的资源等,这里咱们就不深究了
  3. 启动操作,这应该就是真正启动nameServer服务了

接下来咱们次要来摸索初始化与启动操作流程。

2.3.1 初始化:NamesrvController#initialize

初始化的解决办法是NamesrvController#initialize,代码如下:

public boolean initialize() {    // 加载 kv 配置    this.kvConfigManager.load();    // 创立 netty 近程服务    this.remotingServer = new NettyRemotingServer(this.nettyServerConfig,             this.brokerHousekeepingService);    // netty 近程服务线程    this.remotingExecutor = Executors.newFixedThreadPool(            nettyServerConfig.getServerWorkerThreads(),             new ThreadFactoryImpl("RemotingExecutorThread_"));    // 注册,就是把 remotingExecutor 注册到 remotingServer    this.registerProcessor();    // 开启定时工作,每隔10s扫描一次broker,移除不沉闷的broker    this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {        @Override        public void run() {            NamesrvController.this.routeInfoManager.scanNotActiveBroker();        }    }, 5, 10, TimeUnit.SECONDS);    // 省略打印kv配置的定时工作    ...    // Tls平安传输,咱们不关注    if (TlsSystemConfig.tlsMode != TlsMode.DISABLED) {        ...    }    return true;}

这个办法所做的事很明了,代码中都曾经正文了,代码看着多,理论干的就两件事:

  1. 解决netty相干:创立近程服务与工作线程
  2. 开启定时工作:移除不沉闷的broker

什么是NettyRemotingServer呢?在本文开篇介绍NamerServer的性能时,提到NameServer是一个简略的注册核心,这个NettyRemotingServer就是对外开放的入口,用来接管broker的注册音讯的,当然还会解决一些其余音讯,咱们前面会剖析到。

1. 创立NettyRemotingServer

咱们先来看看NettyRemotingServer的创立过程:

public NettyRemotingServer(final NettyServerConfig nettyServerConfig,        final ChannelEventListener channelEventListener) {    super(nettyServerConfig.getServerOnewaySemaphoreValue(),             nettyServerConfig.getServerAsyncSemaphoreValue());    this.serverBootstrap = new ServerBootstrap();    this.nettyServerConfig = nettyServerConfig;    this.channelEventListener = channelEventListener;    int publicThreadNums = nettyServerConfig.getServerCallbackExecutorThreads();    if (publicThreadNums <= 0) {        publicThreadNums = 4;    }    // 创立 publicExecutor    this.publicExecutor = Executors.newFixedThreadPool(publicThreadNums, new ThreadFactory() {        private AtomicInteger threadIndex = new AtomicInteger(0);        @Override        public Thread newThread(Runnable r) {            return new Thread(r, "NettyServerPublicExecutor_"                     + this.threadIndex.incrementAndGet());        }    });    // 判断是否应用 epoll    if (useEpoll()) {        // boss        this.eventLoopGroupBoss = new EpollEventLoopGroup(1, new ThreadFactory() {            private AtomicInteger threadIndex = new AtomicInteger(0);            @Override            public Thread newThread(Runnable r) {                return new Thread(r, String.format("NettyEPOLLBoss_%d",                     this.threadIndex.incrementAndGet()));            }        });        // worker        this.eventLoopGroupSelector = new EpollEventLoopGroup(                nettyServerConfig.getServerSelectorThreads(), new ThreadFactory() {            private AtomicInteger threadIndex = new AtomicInteger(0);            private int threadTotal = nettyServerConfig.getServerSelectorThreads();            @Override            public Thread newThread(Runnable r) {                return new Thread(r, String.format("NettyServerEPOLLSelector_%d_%d",                     threadTotal, this.threadIndex.incrementAndGet()));            }        });    } else {        // 这里也是创立了两个线程        ...    }    // 加载ssl上下文    loadSslContext();}

整个办法下来,其实就是做了一些赋值操作,咱们挑重点讲:

  1. serverBootstrap:相熟netty的小伙伴应该对这个很相熟了,这个就是netty服务端的启动类
  2. publicExecutor:这里创立了一个名为publicExecutor线程池,临时并不知道这个线程有啥作用,先混个脸熟吧
  3. eventLoopGroupBosseventLoopGroupSelector线程组:相熟netty的小伙伴应该对这两个线程很相熟了,这就是netty用来解决连贯事件与读写事件的线程了,eventLoopGroupBoss对应的是netty的boss线程组,eventLoopGroupSelector对应的是worker线程组

到这里,netty服务的筹备工作本实现了。

2. 创立netty服务线程池

让咱们再回到NamesrvController#initialize办法,NettyRemotingServer创立实现后,接着就是netty近程服务线程池了:

this.remotingExecutor = Executors.newFixedThreadPool(    nettyServerConfig.getServerWorkerThreads(),     new ThreadFactoryImpl("RemotingExecutorThread_"));

创立实现线程池后,接着就是注册了,也就是registerProcessor办法所做的工作:

this.registerProcessor();

registerProcessor()中 ,会把以后的 NamesrvController 注册到 remotingServer中:

private void registerProcessor() {    if (namesrvConfig.isClusterTest()) {        this.remotingServer.registerDefaultProcessor(            new ClusterTestRequestProcessor(this, namesrvConfig.getProductEnvName()),            this.remotingExecutor);    } else {        // 注册操作        this.remotingServer.registerDefaultProcessor(            new DefaultRequestProcessor(this), this.remotingExecutor);    }}

最终注册到为NettyRemotingServerdefaultRequestProcessor属性:

@Overridepublic void registerDefaultProcessor(NettyRequestProcessor processor, ExecutorService executor) {    this.defaultRequestProcessor             = new Pair<NettyRequestProcessor, ExecutorService>(processor, executor);}

好了,到这里NettyRemotingServer相干的配置就筹备实现了,这个过程中一共筹备了4个线程池:

  1. publicExecutor:临时不晓得做啥的,前面遇到了再剖析
  2. eventLoopGroupBoss:解决netty连贯事件的线程组
  3. eventLoopGroupSelector:解决netty读写事件的线程池
  4. remotingExecutor:临时不晓得做啥的,前面遇到了再剖析
3. 创立定时工作

筹备完netty相干配置后,接着代码中启动了一个定时工作:

this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {    @Override    public void run() {        NamesrvController.this.routeInfoManager.scanNotActiveBroker();    }}, 5, 10, TimeUnit.SECONDS);

这个定时工作位于NamesrvController#initialize办法中,每10s执行一次,工作内容由RouteInfoManager#scanNotActiveBroker提供,它所做的次要工作是监听broker的上报信息,及时移除不沉闷的broker,对于源码的具体分析,咱们前面再详细分析。

2.3.2 启动:NamesrvController#start

剖析完NamesrvController的初始化流程后,让咱们回到NamesrvStartup#start办法:

public static NamesrvController start(final NamesrvController controller) throws Exception {    ...    // 启动    controller.start();    return controller;}

接下来,咱们来看看NamesrvController的启动流程:

public void start() throws Exception {    // 启动nettyServer    this.remotingServer.start();    // 监听tls配置文件的变动,不关注    if (this.fileWatchService != null) {        this.fileWatchService.start();    }}

这个办法次要调用了NettyRemotingServer#start,咱们跟进去:

public void start() {    ...    ServerBootstrap childHandler =        // 在 NettyRemotingServer#init 中筹备的两个线程组        this.serverBootstrap.group(this.eventLoopGroupBoss, this.eventLoopGroupSelector)            .channel(useEpoll() ? EpollServerSocketChannel.class : NioServerSocketChannel.class)            // 省略 option(...)与childOption(...)办法的配置            ...            // 绑定ip与端口            .localAddress(new InetSocketAddress(this.nettyServerConfig.getListenPort()))            .childHandler(new ChannelInitializer<SocketChannel>() {                @Override                public void initChannel(SocketChannel ch) throws Exception {                    ch.pipeline()                        .addLast(defaultEventExecutorGroup,                             HANDSHAKE_HANDLER_NAME, handshakeHandler)                        .addLast(defaultEventExecutorGroup,                            encoder,                            new NettyDecoder(),                            new IdleStateHandler(0, 0,                                 nettyServerConfig.getServerChannelMaxIdleTimeSeconds()),                            connectionManageHandler,                            serverHandler                        );                }            });    if (nettyServerConfig.isServerPooledByteBufAllocatorEnable()) {        childHandler.childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);    }    try {        ChannelFuture sync = this.serverBootstrap.bind().sync();        InetSocketAddress addr = (InetSocketAddress) sync.channel().localAddress();        this.port = addr.getPort();    } catch (InterruptedException e1) {        throw new RuntimeException("this.serverBootstrap.bind().sync() InterruptedException", e1);    }    ...}

这个办法中,次要解决了NettyRemotingServer的启动,对于其余一些操作并非咱们关注的重点,就先疏忽了。

能够看到,这个办法里就是解决了一个netty的启动流程,对于netty的相干操作,非本文重点,这里就不多作阐明了。这里须要指出的是,在netty中,如果Channel是呈现了连贯/读/写等事件,这些事件会通过Pipeline上的ChannelHandler上进行流转,NettyRemotingServer增加的ChannelHandler如下:

ch.pipeline()    .addLast(defaultEventExecutorGroup,         HANDSHAKE_HANDLER_NAME, handshakeHandler)    .addLast(defaultEventExecutorGroup,        encoder,        new NettyDecoder(),        new IdleStateHandler(0, 0,             nettyServerConfig.getServerChannelMaxIdleTimeSeconds()),        connectionManageHandler,        serverHandler    );

这些ChannelHandler只有分为几类:

  1. handshakeHandler:解决握手操作,用来判断tls的开启状态
  2. encoder/NettyDecoder:解决报文的编解码操作
  3. IdleStateHandler:解决心跳
  4. connectionManageHandler:解决连贯申请
  5. serverHandler:解决读写申请

这里咱们重点关注的是serverHandler,这个ChannelHandler就是用来解决broker注册音讯、producer/consumer获取topic音讯的,这也是咱们接下来要剖析的重点。

执行完NamesrvController#startNameServer就能够对外提供连贯服务了。

3. 总结

本文次要剖析了NameServer的启动流程,整个启动流程分为3步:

  1. 创立controller:这一步次要是解析nameServer的配置并实现赋值操作
  2. 初始化controller:次要创立了NettyRemotingServer对象、netty服务线程池、定时工作
  3. 启动controller:就是启动netty 服务