关于rocketmq:RocketMQ源码NameServer架构设计及启动流程

32次阅读

共计 12936 个字符,预计需要花费 33 分钟才能阅读完成。

本文咱们来剖析 NameServer 相干代码,在正式剖析源码前,咱们先来回顾下 NameServer 的性能:

NameServer是一个非常简单的 Topic 路由注册核心,其角色相似 Dubbo 中的 zookeeper,反对Broker 的动静注册与发现。次要包含两个性能:

  • Broker治理,NameServer承受 Broker 集群的注册信息并且保留下来作为路由信息的根本数据。而后提供心跳检测机制,查看 Broker 是否还存活;
  • 路由信息管理,每个 NameServer 将保留对于 Broker 集群的整个路由信息和用于客户端查问的队列信息。而后 ProducerConumser通过 NameServer 就能够晓得整个 Broker 集群的路由信息,从而进行音讯的投递和生产。

1. 架构设计

Broker 启动的时候会向所有的 NameServer 注册,生产者在发送音讯时会先从 NameServer 中获取 Broker 音讯服务器的地址列表,依据负载平衡算法选取一台 Broker 音讯服务器发送音讯。NameServer 与每台 Broker 之间放弃着长连贯,并且每隔 10 秒会查看 Broker 是否存活,如果检测到 Broker 超过 120 秒未发送心跳,则从路由注册表中将该 Broker 移除。

然而路由的变动 不会马上告诉音讯生产 者,这是为了 升高 NameServe 的复杂性 ,所以在 RocketMQ 中须要音讯的 发送端提供容错机制来保障音讯发送的高可用性,这在后续对于 RocketMQ 音讯发送的章节会介绍。

2. 启动流程源码剖析

2.1 主办法:NamesrvStartup#main

NameServer位于 RocketMq 我的项目的 namesrv 模块下,主类是org.apache.rocketmq.namesrv.NamesrvStartup,代码如下:

public class NamesrvStartup {

    ...

    public static void main(String[] args) {main0(args);
    }

    public static NamesrvController main0(String[] args) {
        try {
            // 创立 controller
            NamesrvController controller = createNamesrvController(args);
            // 启动
            start(controller);
            String tip = "The Name Server boot success. serializeType=" 
                    + RemotingCommand.getSerializeTypeConfigInThisServer();
            log.info(tip);
            System.out.printf("%s%n", tip);
            return controller;
        } catch (Throwable e) {e.printStackTrace();
            System.exit(-1);
        }

        return null;
    }

    ...
}
复制代码

能够看到,main()办法里的代码还是相当简略的,次要蕴含了两个办法:

  • createNamesrvController(...):创立 controller
  • start(...):启动nameServer

接下来咱们就来剖析这两个办法了。

2.2 创立controllerNamesrvStartup#createNamesrvController

public static NamesrvController createNamesrvController(String[] args) 
        throws IOException, JoranException {
    // 省略解析命令行代码
    ...

    // nameServer 的相干配置
    final NamesrvConfig namesrvConfig = new NamesrvConfig();
    //  nettyServer 的相干配置
    final NettyServerConfig nettyServerConfig = new NettyServerConfig();
    // 端口写死了。。。nettyServerConfig.setListenPort(9876);
    if (commandLine.hasOption('c')) {
        // 解决配置文件
        String file = commandLine.getOptionValue('c');
        if (file != null) {
            // 读取配置文件,并将其加载到 properties 中
            InputStream in = new BufferedInputStream(new FileInputStream(file));
            properties = new Properties();
            properties.load(in);
            // 将 properties 里的属性赋值到 namesrvConfig 与 nettyServerConfig
            MixAll.properties2Object(properties, namesrvConfig);
            MixAll.properties2Object(properties, nettyServerConfig);

            namesrvConfig.setConfigStorePath(file);

            System.out.printf("load config properties file OK, %s%n", file);
            in.close();}
    }

    // 解决 -p 参数,该参数用于打印 nameServer、nettyServer 配置,省略
    ...

    // 将 commandLine 的所有配置设置到 namesrvConfig 中
    MixAll.properties2Object(ServerUtil.commandLine2Properties(commandLine), namesrvConfig);
    // 查看环境变量:ROCKETMQ_HOME
    if (null == namesrvConfig.getRocketmqHome()) {
        // 如果不设置 ROCKETMQ_HOME,就会在这里报错
        System.out.printf("Please set the %s variable in your environment to match 
                the location of the RocketMQ installation%n", MixAll.ROCKETMQ_HOME_ENV);
        System.exit(-2);
    }

    // 省略日志配置
    ...

    // 创立一个 controller
    final NamesrvController controller = 
            new NamesrvController(namesrvConfig, nettyServerConfig);

    // 将以后 properties 合并到我的项目的配置中,并且以后 properties 会笼罩我的项目中的配置
    controller.getConfiguration().registerConfig(properties);

    return controller;
}
复制代码

这个办法有点长,不过所做的事就两件:

  1. 解决配置
  2. 创立 NamesrvController 实例

2.2.1 解决配置

咱们先简略地看下配置的解决。在咱们启动我的项目中,能够应用 -c /xxx/xxx.conf 指定配置文件的地位,而后在 createNamesrvController(...) 办法中,通过如下代码

InputStream in = new BufferedInputStream(new FileInputStream(file));
properties = new Properties();
properties.load(in);
复制代码

将配置文件的内容加载到 properties 对象中,而后调用 MixAll.properties2Object(properties, namesrvConfig) 办法将 properties 的属性赋值给 namesrvConfig`MixAll.properties2Object(...) 代码如下:

public static void properties2Object(final Properties p, final Object object) {Method[] methods = object.getClass().getMethods();
    for (Method method : methods) {String mn = method.getName();
        if (mn.startsWith("set")) {
            try {String tmp = mn.substring(4);
                String first = mn.substring(3, 4);
                // 首字母小写
                String key = first.toLowerCase() + tmp;
                // 从 Properties 中获取对应的值
                String property = p.getProperty(key);
                if (property != null) {
                    // 获取值,并进行相应的类型转换
                    Class<?>[] pt = method.getParameterTypes();
                    if (pt != null && pt.length > 0) {String cn = pt[0].getSimpleName();
                        Object arg = null;
                        // 转换成 int
                        if (cn.equals("int") || cn.equals("Integer")) {arg = Integer.parseInt(property);
                        // 其余类型如 long,double,float,boolean 都是这样转换的,这里就省略了    
                        } else if (...) {...} else {continue;}
                        // 反射调用
                        method.invoke(object, arg);
                    }
                }
            } catch (Throwable ignored) {}}
    }
}
复制代码

这个办法非常简单:

  1. 先获取到 object 中的所有 setXxx(...) 办法
  2. 失去 setXxx(...) 中的Xxx
  3. 首字母小写失去xxx
  4. properties 获取 xxx 属性对应的值,并依据 setXxx(...) 办法的参数类型进行转换
  5. 反射调用 setXxx(...) 办法进行赋值

这里之后,namesrvConfignettyServerConfig 就赋值胜利了。

2.2.2 创立 NamesrvController 实例

咱们再来看看 createNamesrvController(...) 办法的第二个重要性能:创立 NamesrvController 实例.

创立 NamesrvController 实例的代码如下:

final NamesrvController controller = new NamesrvController(namesrvConfig, nettyServerConfig);

咱们间接进入 NamesrvController 的构造方法:

/**
 * 构造方法,一系列的赋值操作
 */
public NamesrvController(NamesrvConfig namesrvConfig, NettyServerConfig nettyServerConfig) {
    this.namesrvConfig = namesrvConfig;
    this.nettyServerConfig = nettyServerConfig;
    this.kvConfigManager = new KVConfigManager(this);
    this.routeInfoManager = new RouteInfoManager();
    this.brokerHousekeepingService = new BrokerHousekeepingService(this);
    this.configuration = new Configuration(log, this.namesrvConfig, this.nettyServerConfig);
    this.configuration.setStorePathFromConfig(this.namesrvConfig, "configStorePath");
}

构造方法里只是一系列的赋值操作,没做什么实质性的工作,就先不论了。

2.3 启动nameServerNamesrvStartup#start

让咱们回到一开始的 NamesrvStartup#main0 办法,

public static NamesrvController main0(String[] args) {

    try {NamesrvController controller = createNamesrvController(args);
        start(controller);
        ...
    } catch (Throwable e) {e.printStackTrace();
        System.exit(-1);
    }

    return null;
}

接下来咱们来看看 start(controller) 办法中做了什么,进入 NamesrvStartup#start 办法:

public static NamesrvController start(final NamesrvController controller) throws Exception {if (null == controller) {throw new IllegalArgumentException("NamesrvController is null");
    }
    // 初始化
    boolean initResult = controller.initialize();
    if (!initResult) {controller.shutdown();
        System.exit(-3);
    }
    // 敞开钩子,能够在敞开前进行一些操作
    Runtime.getRuntime().addShutdownHook(new ShutdownHookThread(log, new Callable<Void>() {
        @Override
        public Void call() throws Exception {controller.shutdown();
            return null;
        }
    }));
    // 启动
    controller.start();

    return controller;
}

start(...)办法的逻辑也非常简洁,次要蕴含 3 个操作:

  1. 初始化,想必是做一些启动前的操作
  2. 增加敞开钩子,所谓的敞开钩子,能够了解为一个线程,能够用来监听 jvm 的敞开事件,在 jvm 真正敞开前,能够进行一些解决操作,这里的敞开前的解决操作就是 controller.shutdown() 办法所做的事了,所做的事也很容易想到,无非就是敞开线程池、敞开曾经关上的资源等,这里咱们就不深究了
  3. 启动操作,这应该就是真正启动 nameServer 服务了

接下来咱们次要来摸索初始化与启动操作流程。

2.3.1 初始化:NamesrvController#initialize

初始化的解决办法是NamesrvController#initialize,代码如下:

public boolean initialize() {
    // 加载 kv 配置
    this.kvConfigManager.load();
    // 创立 netty 近程服务
    this.remotingServer = new NettyRemotingServer(this.nettyServerConfig, 
            this.brokerHousekeepingService);
    // netty 近程服务线程
    this.remotingExecutor = Executors.newFixedThreadPool(nettyServerConfig.getServerWorkerThreads(), 
            new ThreadFactoryImpl("RemotingExecutorThread_"));
    // 注册,就是把 remotingExecutor 注册到 remotingServer
    this.registerProcessor();

    // 开启定时工作,每隔 10s 扫描一次 broker,移除不沉闷的 broker
    this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
        @Override
        public void run() {NamesrvController.this.routeInfoManager.scanNotActiveBroker();
        }
    }, 5, 10, TimeUnit.SECONDS);

    // 省略打印 kv 配置的定时工作
    ...

    // Tls 平安传输,咱们不关注
    if (TlsSystemConfig.tlsMode != TlsMode.DISABLED) {...}

    return true;
}

这个办法所做的事很明了,代码中都曾经正文了,代码看着多,理论干的就两件事:

  1. 解决 netty 相干:创立近程服务与工作线程
  2. 开启定时工作:移除不沉闷的 broker

什么是 NettyRemotingServer 呢?在本文开篇介绍 NamerServer 的性能时,提到 NameServer 是一个简略的注册核心,这个 NettyRemotingServer 就是对外开放的入口,用来接管 broker 的注册音讯的,当然还会解决一些其余音讯,咱们前面会剖析到。

1. 创立NettyRemotingServer

咱们先来看看 NettyRemotingServer 的创立过程:

public NettyRemotingServer(final NettyServerConfig nettyServerConfig,
        final ChannelEventListener channelEventListener) {super(nettyServerConfig.getServerOnewaySemaphoreValue(), 
            nettyServerConfig.getServerAsyncSemaphoreValue());
    this.serverBootstrap = new ServerBootstrap();
    this.nettyServerConfig = nettyServerConfig;
    this.channelEventListener = channelEventListener;

    int publicThreadNums = nettyServerConfig.getServerCallbackExecutorThreads();
    if (publicThreadNums <= 0) {publicThreadNums = 4;}

    // 创立 publicExecutor
    this.publicExecutor = Executors.newFixedThreadPool(publicThreadNums, new ThreadFactory() {private AtomicInteger threadIndex = new AtomicInteger(0);
        @Override
        public Thread newThread(Runnable r) {
            return new Thread(r, "NettyServerPublicExecutor_" 
                    + this.threadIndex.incrementAndGet());
        }
    });

    // 判断是否应用 epoll
    if (useEpoll()) {
        // boss
        this.eventLoopGroupBoss = new EpollEventLoopGroup(1, new ThreadFactory() {private AtomicInteger threadIndex = new AtomicInteger(0);
            @Override
            public Thread newThread(Runnable r) {
                return new Thread(r, String.format("NettyEPOLLBoss_%d", 
                    this.threadIndex.incrementAndGet()));
            }
        });
        // worker
        this.eventLoopGroupSelector = new EpollEventLoopGroup(nettyServerConfig.getServerSelectorThreads(), new ThreadFactory() {private AtomicInteger threadIndex = new AtomicInteger(0);
            private int threadTotal = nettyServerConfig.getServerSelectorThreads();

            @Override
            public Thread newThread(Runnable r) {
                return new Thread(r, String.format("NettyServerEPOLLSelector_%d_%d", 
                    threadTotal, this.threadIndex.incrementAndGet()));
            }
        });
    } else {
        // 这里也是创立了两个线程
        ...
    }
    // 加载 ssl 上下文
    loadSslContext();}

整个办法下来,其实就是做了一些赋值操作,咱们挑重点讲:

  1. serverBootstrap:相熟 netty 的小伙伴应该对这个很相熟了,这个就是 netty 服务端的启动类
  2. publicExecutor:这里创立了一个名为 publicExecutor 线程池,临时并不知道这个线程有啥作用,先混个脸熟吧
  3. eventLoopGroupBosseventLoopGroupSelector 线程组:相熟 netty 的小伙伴应该对这两个线程很相熟了,这就是 netty 用来解决连贯事件与读写事件的线程了,eventLoopGroupBoss对应的是 netty 的 boss 线程组,eventLoopGroupSelector对应的是 worker 线程组

到这里,netty 服务的筹备工作本实现了。

2. 创立 netty 服务线程池

让咱们再回到 NamesrvController#initialize 办法,NettyRemotingServer创立实现后,接着就是 netty 近程服务线程池了:

this.remotingExecutor = Executors.newFixedThreadPool(nettyServerConfig.getServerWorkerThreads(), 
    new ThreadFactoryImpl("RemotingExecutorThread_"));

创立实现线程池后,接着就是注册了,也就是 registerProcessor 办法所做的工作:

this.registerProcessor();

registerProcessor() 中,会把以后的 NamesrvController 注册到 remotingServer中:

private void registerProcessor() {if (namesrvConfig.isClusterTest()) {
        this.remotingServer.registerDefaultProcessor(new ClusterTestRequestProcessor(this, namesrvConfig.getProductEnvName()),
            this.remotingExecutor);
    } else {
        // 注册操作
        this.remotingServer.registerDefaultProcessor(new DefaultRequestProcessor(this), this.remotingExecutor);
    }
}

最终注册到为 NettyRemotingServerdefaultRequestProcessor属性:

@Override
public void registerDefaultProcessor(NettyRequestProcessor processor, ExecutorService executor) {
    this.defaultRequestProcessor 
            = new Pair<NettyRequestProcessor, ExecutorService>(processor, executor);
}

好了,到这里 NettyRemotingServer 相干的配置就筹备实现了,这个过程中一共筹备了 4 个线程池:

  1. publicExecutor:临时不晓得做啥的,前面遇到了再剖析
  2. eventLoopGroupBoss:解决 netty 连贯事件的线程组
  3. eventLoopGroupSelector:解决 netty 读写事件的线程池
  4. remotingExecutor:临时不晓得做啥的,前面遇到了再剖析
3. 创立定时工作

筹备完 netty 相干配置后,接着代码中启动了一个定时工作:

this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
    @Override
    public void run() {NamesrvController.this.routeInfoManager.scanNotActiveBroker();
    }
}, 5, 10, TimeUnit.SECONDS);

这个定时工作位于 NamesrvController#initialize 办法中,每 10s 执行一次,工作内容由 RouteInfoManager#scanNotActiveBroker 提供,它所做的次要工作是监听 broker 的上报信息,及时移除不沉闷的broker,对于源码的具体分析,咱们前面再详细分析。

2.3.2 启动:NamesrvController#start

剖析完 NamesrvController 的初始化流程后,让咱们回到 NamesrvStartup#start 办法:

public static NamesrvController start(final NamesrvController controller) throws Exception {

    ...

    // 启动
    controller.start();

    return controller;
}

接下来,咱们来看看 NamesrvController 的启动流程:

public void start() throws Exception {
    // 启动 nettyServer
    this.remotingServer.start();
    // 监听 tls 配置文件的变动,不关注
    if (this.fileWatchService != null) {this.fileWatchService.start();
    }
}

这个办法次要调用了NettyRemotingServer#start,咱们跟进去:

public void start() {
    ...

    ServerBootstrap childHandler =
        // 在 NettyRemotingServer#init 中筹备的两个线程组
        this.serverBootstrap.group(this.eventLoopGroupBoss, this.eventLoopGroupSelector)
            .channel(useEpoll() ? EpollServerSocketChannel.class : NioServerSocketChannel.class)

            // 省略 option(...)与 childOption(...)办法的配置
            ...
            // 绑定 ip 与端口
            .localAddress(new InetSocketAddress(this.nettyServerConfig.getListenPort()))
            .childHandler(new ChannelInitializer<SocketChannel>() {
                @Override
                public void initChannel(SocketChannel ch) throws Exception {ch.pipeline()
                        .addLast(defaultEventExecutorGroup, 
                            HANDSHAKE_HANDLER_NAME, handshakeHandler)
                        .addLast(defaultEventExecutorGroup,
                            encoder,
                            new NettyDecoder(),
                            new IdleStateHandler(0, 0, 
                                nettyServerConfig.getServerChannelMaxIdleTimeSeconds()),
                            connectionManageHandler,
                            serverHandler
                        );
                }
            });

    if (nettyServerConfig.isServerPooledByteBufAllocatorEnable()) {childHandler.childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);
    }

    try {ChannelFuture sync = this.serverBootstrap.bind().sync();
        InetSocketAddress addr = (InetSocketAddress) sync.channel().localAddress();
        this.port = addr.getPort();} catch (InterruptedException e1) {throw new RuntimeException("this.serverBootstrap.bind().sync() InterruptedException", e1);
    }

    ...
}

这个办法中,次要解决了 NettyRemotingServer 的启动,对于其余一些操作并非咱们关注的重点,就先疏忽了。

能够看到,这个办法里就是解决了一个 netty 的启动流程,对于 netty 的相干操作,非本文重点,这里就不多作阐明了。这里须要指出的是,在 netty 中,如果 Channel 是呈现了 连贯 / 读 / 写 等事件,这些事件会通过 Pipeline 上的 ChannelHandler 上进行流转,NettyRemotingServer增加的 ChannelHandler 如下:

ch.pipeline()
    .addLast(defaultEventExecutorGroup, 
        HANDSHAKE_HANDLER_NAME, handshakeHandler)
    .addLast(defaultEventExecutorGroup,
        encoder,
        new NettyDecoder(),
        new IdleStateHandler(0, 0, 
            nettyServerConfig.getServerChannelMaxIdleTimeSeconds()),
        connectionManageHandler,
        serverHandler
    );

这些 ChannelHandler 只有分为几类:

  1. handshakeHandler:解决握手操作,用来判断 tls 的开启状态
  2. encoder/NettyDecoder:解决报文的编解码操作
  3. IdleStateHandler:解决心跳
  4. connectionManageHandler:解决连贯申请
  5. serverHandler:解决读写申请

这里咱们重点关注的是 serverHandler,这个ChannelHandler 就是用来解决 broker 注册音讯、producer/consumer获取 topic 音讯的,这也是咱们接下来要剖析的重点。

执行完 NamesrvController#startNameServer 就能够对外提供连贯服务了。

3. 总结

本文次要剖析了 NameServer 的启动流程,整个启动流程分为 3 步:

  1. 创立 controller:这一步次要是解析nameServer 的配置并实现赋值操作
  2. 初始化 controller:次要创立了NettyRemotingServer 对象、netty服务线程池、定时工作
  3. 启动controller:就是启动netty 服务

正文完
 0