本文咱们来剖析NameServer
相干代码,在正式剖析源码前,咱们先来回顾下NameServer
的性能:
NameServer
是一个非常简单的Topic
路由注册核心,其角色相似Dubbo
中的zookeeper
,反对Broker
的动静注册与发现。次要包含两个性能:
Broker
治理,NameServer
承受Broker
集群的注册信息并且保留下来作为路由信息的根本数据。而后提供心跳检测机制,查看Broker
是否还存活;- 路由信息管理,每个
NameServer
将保留对于Broker
集群的整个路由信息和用于客户端查问的队列信息。而后Producer
和Conumser
通过NameServer
就能够晓得整个Broker
集群的路由信息,从而进行音讯的投递和生产。
1. 架构设计
Broker启动的时候会向所有的NameServer
注册,生产者在发送音讯时会先从NameServer中获取Broker音讯服务器的地址列表,依据负载平衡算法选取一台Broker音讯服务器发送音讯。NameServer与每台Broker之间放弃着长连贯,并且每隔10秒会查看Broker是否存活,如果检测到Broker超过120秒未发送心跳,则从路由注册表中将该Broker移除。
然而路由的变动不会马上告诉音讯生产
者,这是为了升高NameServe的复杂性
,所以在RocketMQ中须要音讯的发送端提供容错机制来保障音讯发送的高可用性
,这在后续对于RocketMQ音讯发送的章节会介绍。
2. 启动流程源码剖析
2.1 主办法:NamesrvStartup#main
NameServer
位于RocketMq
我的项目的namesrv
模块下,主类是org.apache.rocketmq.namesrv.NamesrvStartup
,代码如下:
public class NamesrvStartup {
...
public static void main(String[] args) {
main0(args);
}
public static NamesrvController main0(String[] args) {
try {
// 创立 controller
NamesrvController controller = createNamesrvController(args);
// 启动
start(controller);
String tip = "The Name Server boot success. serializeType="
+ RemotingCommand.getSerializeTypeConfigInThisServer();
log.info(tip);
System.out.printf("%s%n", tip);
return controller;
} catch (Throwable e) {
e.printStackTrace();
System.exit(-1);
}
return null;
}
...
}
复制代码
能够看到,main()
办法里的代码还是相当简略的,次要蕴含了两个办法:
createNamesrvController(...)
:创立controller
start(...)
:启动nameServer
接下来咱们就来剖析这两个办法了。
2.2 创立controller
:NamesrvStartup#createNamesrvController
public static NamesrvController createNamesrvController(String[] args)
throws IOException, JoranException {
// 省略解析命令行代码
...
// nameServer的相干配置
final NamesrvConfig namesrvConfig = new NamesrvConfig();
// nettyServer的相干配置
final NettyServerConfig nettyServerConfig = new NettyServerConfig();
// 端口写死了。。。
nettyServerConfig.setListenPort(9876);
if (commandLine.hasOption('c')) {
// 解决配置文件
String file = commandLine.getOptionValue('c');
if (file != null) {
// 读取配置文件,并将其加载到 properties 中
InputStream in = new BufferedInputStream(new FileInputStream(file));
properties = new Properties();
properties.load(in);
// 将 properties 里的属性赋值到 namesrvConfig 与 nettyServerConfig
MixAll.properties2Object(properties, namesrvConfig);
MixAll.properties2Object(properties, nettyServerConfig);
namesrvConfig.setConfigStorePath(file);
System.out.printf("load config properties file OK, %s%n", file);
in.close();
}
}
// 解决 -p 参数,该参数用于打印nameServer、nettyServer配置,省略
...
// 将 commandLine 的所有配置设置到 namesrvConfig 中
MixAll.properties2Object(ServerUtil.commandLine2Properties(commandLine), namesrvConfig);
// 查看环境变量:ROCKETMQ_HOME
if (null == namesrvConfig.getRocketmqHome()) {
// 如果不设置 ROCKETMQ_HOME,就会在这里报错
System.out.printf("Please set the %s variable in your environment to match
the location of the RocketMQ installation%n", MixAll.ROCKETMQ_HOME_ENV);
System.exit(-2);
}
// 省略日志配置
...
// 创立一个controller
final NamesrvController controller =
new NamesrvController(namesrvConfig, nettyServerConfig);
// 将以后 properties 合并到我的项目的配置中,并且以后 properties 会笼罩我的项目中的配置
controller.getConfiguration().registerConfig(properties);
return controller;
}
复制代码
这个办法有点长,不过所做的事就两件:
- 解决配置
- 创立
NamesrvController
实例
2.2.1 解决配置
咱们先简略地看下配置的解决。在咱们启动我的项目中,能够应用-c /xxx/xxx.conf
指定配置文件的地位,而后在createNamesrvController(...)
办法中,通过如下代码
InputStream in = new BufferedInputStream(new FileInputStream(file));
properties = new Properties();
properties.load(in);
复制代码
将配置文件的内容加载到properties
对象中,而后调用MixAll.properties2Object(properties, namesrvConfig)
办法将properties
的属性赋值给namesrvConfig
,`MixAll.properties2Object(...)
代码如下:
public static void properties2Object(final Properties p, final Object object) {
Method[] methods = object.getClass().getMethods();
for (Method method : methods) {
String mn = method.getName();
if (mn.startsWith("set")) {
try {
String tmp = mn.substring(4);
String first = mn.substring(3, 4);
// 首字母小写
String key = first.toLowerCase() + tmp;
// 从Properties中获取对应的值
String property = p.getProperty(key);
if (property != null) {
// 获取值,并进行相应的类型转换
Class<?>[] pt = method.getParameterTypes();
if (pt != null && pt.length > 0) {
String cn = pt[0].getSimpleName();
Object arg = null;
// 转换成int
if (cn.equals("int") || cn.equals("Integer")) {
arg = Integer.parseInt(property);
// 其余类型如long,double,float,boolean都是这样转换的,这里就省略了
} else if (...) {
...
} else {
continue;
}
// 反射调用
method.invoke(object, arg);
}
}
} catch (Throwable ignored) {
}
}
}
}
复制代码
这个办法非常简单:
- 先获取到
object
中的所有setXxx(...)
办法 - 失去
setXxx(...)
中的Xxx
- 首字母小写失去
xxx
- 从
properties
获取xxx
属性对应的值,并依据setXxx(...)
办法的参数类型进行转换 - 反射调用
setXxx(...)
办法进行赋值
这里之后,namesrvConfig
与nettyServerConfig
就赋值胜利了。
2.2.2 创立NamesrvController
实例
咱们再来看看createNamesrvController(...)
办法的第二个重要性能:创立NamesrvController
实例.
创立NamesrvController
实例的代码如下:
final NamesrvController controller = new NamesrvController(namesrvConfig, nettyServerConfig);
咱们间接进入NamesrvController
的构造方法:
/**
* 构造方法,一系列的赋值操作
*/
public NamesrvController(NamesrvConfig namesrvConfig, NettyServerConfig nettyServerConfig) {
this.namesrvConfig = namesrvConfig;
this.nettyServerConfig = nettyServerConfig;
this.kvConfigManager = new KVConfigManager(this);
this.routeInfoManager = new RouteInfoManager();
this.brokerHousekeepingService = new BrokerHousekeepingService(this);
this.configuration = new Configuration(log, this.namesrvConfig, this.nettyServerConfig);
this.configuration.setStorePathFromConfig(this.namesrvConfig, "configStorePath");
}
构造方法里只是一系列的赋值操作,没做什么实质性的工作,就先不论了。
2.3 启动nameServer
:NamesrvStartup#start
让咱们回到一开始的NamesrvStartup#main0
办法,
public static NamesrvController main0(String[] args) {
try {
NamesrvController controller = createNamesrvController(args);
start(controller);
...
} catch (Throwable e) {
e.printStackTrace();
System.exit(-1);
}
return null;
}
接下来咱们来看看start(controller)
办法中做了什么,进入NamesrvStartup#start
办法:
public static NamesrvController start(final NamesrvController controller) throws Exception {
if (null == controller) {
throw new IllegalArgumentException("NamesrvController is null");
}
// 初始化
boolean initResult = controller.initialize();
if (!initResult) {
controller.shutdown();
System.exit(-3);
}
// 敞开钩子,能够在敞开前进行一些操作
Runtime.getRuntime().addShutdownHook(new ShutdownHookThread(log, new Callable<Void>() {
@Override
public Void call() throws Exception {
controller.shutdown();
return null;
}
}));
// 启动
controller.start();
return controller;
}
start(...)
办法的逻辑也非常简洁,次要蕴含3个操作:
- 初始化,想必是做一些启动前的操作
- 增加敞开钩子,所谓的敞开钩子,能够了解为一个线程,能够用来监听jvm的敞开事件,在jvm真正敞开前,能够进行一些解决操作,这里的敞开前的解决操作就是
controller.shutdown()
办法所做的事了,所做的事也很容易想到,无非就是敞开线程池、敞开曾经关上的资源等,这里咱们就不深究了 - 启动操作,这应该就是真正启动
nameServer
服务了
接下来咱们次要来摸索初始化与启动操作流程。
2.3.1 初始化:NamesrvController#initialize
初始化的解决办法是NamesrvController#initialize
,代码如下:
public boolean initialize() {
// 加载 kv 配置
this.kvConfigManager.load();
// 创立 netty 近程服务
this.remotingServer = new NettyRemotingServer(this.nettyServerConfig,
this.brokerHousekeepingService);
// netty 近程服务线程
this.remotingExecutor = Executors.newFixedThreadPool(
nettyServerConfig.getServerWorkerThreads(),
new ThreadFactoryImpl("RemotingExecutorThread_"));
// 注册,就是把 remotingExecutor 注册到 remotingServer
this.registerProcessor();
// 开启定时工作,每隔10s扫描一次broker,移除不沉闷的broker
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
NamesrvController.this.routeInfoManager.scanNotActiveBroker();
}
}, 5, 10, TimeUnit.SECONDS);
// 省略打印kv配置的定时工作
...
// Tls平安传输,咱们不关注
if (TlsSystemConfig.tlsMode != TlsMode.DISABLED) {
...
}
return true;
}
这个办法所做的事很明了,代码中都曾经正文了,代码看着多,理论干的就两件事:
- 解决netty相干:创立近程服务与工作线程
- 开启定时工作:移除不沉闷的broker
什么是NettyRemotingServer
呢?在本文开篇介绍NamerServer
的性能时,提到NameServer
是一个简略的注册核心,这个NettyRemotingServer
就是对外开放的入口,用来接管broker
的注册音讯的,当然还会解决一些其余音讯,咱们前面会剖析到。
1. 创立NettyRemotingServer
咱们先来看看NettyRemotingServer
的创立过程:
public NettyRemotingServer(final NettyServerConfig nettyServerConfig,
final ChannelEventListener channelEventListener) {
super(nettyServerConfig.getServerOnewaySemaphoreValue(),
nettyServerConfig.getServerAsyncSemaphoreValue());
this.serverBootstrap = new ServerBootstrap();
this.nettyServerConfig = nettyServerConfig;
this.channelEventListener = channelEventListener;
int publicThreadNums = nettyServerConfig.getServerCallbackExecutorThreads();
if (publicThreadNums <= 0) {
publicThreadNums = 4;
}
// 创立 publicExecutor
this.publicExecutor = Executors.newFixedThreadPool(publicThreadNums, new ThreadFactory() {
private AtomicInteger threadIndex = new AtomicInteger(0);
@Override
public Thread newThread(Runnable r) {
return new Thread(r, "NettyServerPublicExecutor_"
+ this.threadIndex.incrementAndGet());
}
});
// 判断是否应用 epoll
if (useEpoll()) {
// boss
this.eventLoopGroupBoss = new EpollEventLoopGroup(1, new ThreadFactory() {
private AtomicInteger threadIndex = new AtomicInteger(0);
@Override
public Thread newThread(Runnable r) {
return new Thread(r, String.format("NettyEPOLLBoss_%d",
this.threadIndex.incrementAndGet()));
}
});
// worker
this.eventLoopGroupSelector = new EpollEventLoopGroup(
nettyServerConfig.getServerSelectorThreads(), new ThreadFactory() {
private AtomicInteger threadIndex = new AtomicInteger(0);
private int threadTotal = nettyServerConfig.getServerSelectorThreads();
@Override
public Thread newThread(Runnable r) {
return new Thread(r, String.format("NettyServerEPOLLSelector_%d_%d",
threadTotal, this.threadIndex.incrementAndGet()));
}
});
} else {
// 这里也是创立了两个线程
...
}
// 加载ssl上下文
loadSslContext();
}
整个办法下来,其实就是做了一些赋值操作,咱们挑重点讲:
serverBootstrap
:相熟netty的小伙伴应该对这个很相熟了,这个就是netty服务端的启动类publicExecutor
:这里创立了一个名为publicExecutor
线程池,临时并不知道这个线程有啥作用,先混个脸熟吧eventLoopGroupBoss
与eventLoopGroupSelector
线程组:相熟netty的小伙伴应该对这两个线程很相熟了,这就是netty用来解决连贯事件与读写事件的线程了,eventLoopGroupBoss
对应的是netty的boss
线程组,eventLoopGroupSelector
对应的是worker
线程组
到这里,netty服务的筹备工作本实现了。
2. 创立netty服务线程池
让咱们再回到NamesrvController#initialize
办法,NettyRemotingServer
创立实现后,接着就是netty近程服务线程池了:
this.remotingExecutor = Executors.newFixedThreadPool(
nettyServerConfig.getServerWorkerThreads(),
new ThreadFactoryImpl("RemotingExecutorThread_"));
创立实现线程池后,接着就是注册了,也就是registerProcessor
办法所做的工作:
this.registerProcessor();
在registerProcessor()
中 ,会把以后的 NamesrvController
注册到 remotingServer
中:
private void registerProcessor() {
if (namesrvConfig.isClusterTest()) {
this.remotingServer.registerDefaultProcessor(
new ClusterTestRequestProcessor(this, namesrvConfig.getProductEnvName()),
this.remotingExecutor);
} else {
// 注册操作
this.remotingServer.registerDefaultProcessor(
new DefaultRequestProcessor(this), this.remotingExecutor);
}
}
最终注册到为NettyRemotingServer
的defaultRequestProcessor
属性:
@Override
public void registerDefaultProcessor(NettyRequestProcessor processor, ExecutorService executor) {
this.defaultRequestProcessor
= new Pair<NettyRequestProcessor, ExecutorService>(processor, executor);
}
好了,到这里NettyRemotingServer
相干的配置就筹备实现了,这个过程中一共筹备了4个线程池:
publicExecutor
:临时不晓得做啥的,前面遇到了再剖析eventLoopGroupBoss
:解决netty连贯事件的线程组eventLoopGroupSelector
:解决netty读写事件的线程池remotingExecutor
:临时不晓得做啥的,前面遇到了再剖析
3. 创立定时工作
筹备完netty相干配置后,接着代码中启动了一个定时工作:
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
NamesrvController.this.routeInfoManager.scanNotActiveBroker();
}
}, 5, 10, TimeUnit.SECONDS);
这个定时工作位于NamesrvController#initialize
办法中,每10s执行一次,工作内容由RouteInfoManager#scanNotActiveBroker
提供,它所做的次要工作是监听broker
的上报信息,及时移除不沉闷的broker
,对于源码的具体分析,咱们前面再详细分析。
2.3.2 启动:NamesrvController#start
剖析完NamesrvController
的初始化流程后,让咱们回到NamesrvStartup#start
办法:
public static NamesrvController start(final NamesrvController controller) throws Exception {
...
// 启动
controller.start();
return controller;
}
接下来,咱们来看看NamesrvController
的启动流程:
public void start() throws Exception {
// 启动nettyServer
this.remotingServer.start();
// 监听tls配置文件的变动,不关注
if (this.fileWatchService != null) {
this.fileWatchService.start();
}
}
这个办法次要调用了NettyRemotingServer#start
,咱们跟进去:
public void start() {
...
ServerBootstrap childHandler =
// 在 NettyRemotingServer#init 中筹备的两个线程组
this.serverBootstrap.group(this.eventLoopGroupBoss, this.eventLoopGroupSelector)
.channel(useEpoll() ? EpollServerSocketChannel.class : NioServerSocketChannel.class)
// 省略 option(...)与childOption(...)办法的配置
...
// 绑定ip与端口
.localAddress(new InetSocketAddress(this.nettyServerConfig.getListenPort()))
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
ch.pipeline()
.addLast(defaultEventExecutorGroup,
HANDSHAKE_HANDLER_NAME, handshakeHandler)
.addLast(defaultEventExecutorGroup,
encoder,
new NettyDecoder(),
new IdleStateHandler(0, 0,
nettyServerConfig.getServerChannelMaxIdleTimeSeconds()),
connectionManageHandler,
serverHandler
);
}
});
if (nettyServerConfig.isServerPooledByteBufAllocatorEnable()) {
childHandler.childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);
}
try {
ChannelFuture sync = this.serverBootstrap.bind().sync();
InetSocketAddress addr = (InetSocketAddress) sync.channel().localAddress();
this.port = addr.getPort();
} catch (InterruptedException e1) {
throw new RuntimeException("this.serverBootstrap.bind().sync() InterruptedException", e1);
}
...
}
这个办法中,次要解决了NettyRemotingServer
的启动,对于其余一些操作并非咱们关注的重点,就先疏忽了。
能够看到,这个办法里就是解决了一个netty
的启动流程,对于netty
的相干操作,非本文重点,这里就不多作阐明了。这里须要指出的是,在netty中,如果Channel
是呈现了连贯/读/写
等事件,这些事件会通过Pipeline
上的ChannelHandler
上进行流转,NettyRemotingServer
增加的ChannelHandler
如下:
ch.pipeline()
.addLast(defaultEventExecutorGroup,
HANDSHAKE_HANDLER_NAME, handshakeHandler)
.addLast(defaultEventExecutorGroup,
encoder,
new NettyDecoder(),
new IdleStateHandler(0, 0,
nettyServerConfig.getServerChannelMaxIdleTimeSeconds()),
connectionManageHandler,
serverHandler
);
这些ChannelHandler
只有分为几类:
handshakeHandler
:解决握手操作,用来判断tls的开启状态encoder
/NettyDecoder
:解决报文的编解码操作IdleStateHandler
:解决心跳connectionManageHandler
:解决连贯申请serverHandler
:解决读写申请
这里咱们重点关注的是serverHandler
,这个ChannelHandler
就是用来解决broker
注册音讯、producer
/consumer
获取topic音讯的,这也是咱们接下来要剖析的重点。
执行完NamesrvController#start
,NameServer
就能够对外提供连贯服务了。
3. 总结
本文次要剖析了NameServer
的启动流程,整个启动流程分为3步:
- 创立
controller
:这一步次要是解析nameServer
的配置并实现赋值操作 - 初始化
controller
:次要创立了NettyRemotingServer
对象、netty
服务线程池、定时工作 - 启动
controller
:就是启动netty
服务
发表回复