一,RocketMQ 中 Remoting 通信机制
RocketMQ 音讯队列集群次要包含 NameServe、Broker(Master/Slave)、Producer、Consumer4 个角色,根本通信流程如下:
- Broker 启动后将本人注册至 NameServer 的操作;随后每隔 30s 工夫定期向 NameServer 上报 Topic 路由信息;
- 音讯生产者 Producer 在发送音讯时须要依据 Msg 的 Topic 从本地缓存的 TopicPublishInfoTable 获取路由信息(如果没有会从 NameServer 上从新拉取);
- Producer 依据(2)中获取的路由信息抉择一个队列(MessageQueue)进行音讯发送;Broker 作为音讯的接收者收音讯并落盘存储。
- 音讯消费者 Consumer 依据 2)中获取的路由信息,并再实现客户端的负载平衡后,抉择其中的某一个或者某几个音讯队列来拉取音讯并进行生产。
二,RocketMQ 中 Remoting 通信模块 API
- RemotingService:为顶层接口。次要办法有:
void start();
void shutdown();
void registerRPCHook(RPCHook rpcHook);
-
RemotingServer/RemotingClient: 近程服务器 / 客户端根底接口,两者中的办法根本相似:
/** * requestCode 命令编码 * processor RocketMQ 申请业务处理器,例如音讯发送的处理器为 SendMessageProcessor,PullMessageProcessor 为音讯拉取的业务处理器。* executor 线程池,NettyRequestProcessor 具体业务逻辑在该线程池中执行 */ void registerProcessor(final int requestCode, final NettyRequestProcessor processor, final ExecutorService executor); void registerDefaultProcessor(final NettyRequestProcessor processor, final ExecutorService executor); int localListenPort(); // 依据申请编码获取对应的申请业务处理器与线程池 Pair<NettyRequestProcessor, ExecutorService> getProcessorPair(final int requestCode); RemotingCommand invokeSync(final Channel channel, final RemotingCommand request, final long timeoutMillis) throws InterruptedException, RemotingSendRequestException, RemotingTimeoutException; void invokeAsync(final Channel channel, final RemotingCommand request, final long timeoutMillis, final InvokeCallback invokeCallback) throws InterruptedException, RemotingTooMuchRequestException, RemotingTimeoutException, RemotingSendRequestException; void invokeOneway(final Channel channel, final RemotingCommand request, final long timeoutMillis) throws InterruptedException, RemotingTooMuchRequestException, RemotingTimeoutException, RemotingSendRequestException;
RomotingClient 次要是 Producer 发送音讯与 Consumer 拉取音讯时用到;RomotingServer 次要是 Broker 进行回调,获取 Consumer 状态等的时候用到。
这里重点须要关注下registerProcessor
注册命令处理器这个办法。RocketMQ 会依照业务逻辑进行拆分,例如音讯发送、音讯拉取等每一个网络操作会定义一个申请编码(requestCode),而后每一个类型对应一个业务处理器 NettyRequestProcessor,并能够依照不同的 requestCode 定义不同的线程池,实现不同申请的线程池隔离。 - NettyRemotingAbstract:Netty 近程服务形象实现类,定义网络近程调用、申请,响应等解决逻辑。重要的属性有:
Semaphore semaphoreOneway:管制 oneway 发送形式的并发度的信号量,默认为 65535 个许可。Semaphore semaphoreAsync:管制异步发送形式的并发度的信号量,默认为 65535 个许可。ConcurrentMap<Integer /* opaque */, ResponseFuture> responseTable:以后正在等
待对端返回的申请处理表,其中 opaque 示意申请的编号,全局惟一,通常采纳原子递增,客户端向
对端发送网络申请时,通常会采取繁多长连贯,故发送申请后会向调用端立刻返回
ResponseFuture,同时会将申请放入到该映射表中,当响应解决实现后(响应会蕴含申请
code),而后从该映射表中获取对应的 ResponseFutre,而后告诉调用端的返回后果,这里是
Future 模式在网络编程中的经典使用。HashMap<Integer/* request code */, Pair<NettyRequestProcessor,
ExecutorService>> processorTable:注册的申请解决命令。RocketMQ 的设计中采纳了不同
申请命令反对不同的线程池,即实现业务线程池的隔离。
- NettyRemotingClient:基于 Netty 网络编程客户端,实现 RemotingClient 接口并继承 NettyRemotingAbstract。
重要属性如下:
NettyClientConfig nettyClientConfig:与网络相干的配置项。Bootstrap bootstrap:Netty 客户端启动帮忙类。EventLoopGroup eventLoopGroupWorker:Netty 客户端 Work 线程组,俗称 IO 线程。ConcurrentMap<String /* addr */, ChannelWrapper> channelTables:以后客户端已创立的连贯(网络通道、Netty Cannel),每一个地址一条长连贯。ExecutorService publicExecutor:默认工作线程池。ExecutorService callbackExecutor:回掉类申请执行线程池。DefaultEventExecutorGroup defaultEventExecutorGroup:Netty ChannelHandler 线程执行组,即 Netty ChannelHandler 在这些线程中执行。
- NettyRemotingServer: 基于 Netty 网络编程服务端。
其外围属性如下所示:
ServerBootstrap serverBootstrap:Netty Server 端启动帮忙类。EventLoopGroup eventLoopGroupSelector:Netty Server Work 线程组,即主从多 Reactor 中的从 Reactor,次要负责读写事件的解决。EventLoopGroup eventLoopGroupBoss:Netty Boss 线程组, 即主从 Reactor 线程模型中的主 Reactor,次要负责 OP_ACCEPT 事件(创立连贯)。NettyServerConfig nettyServerConfig:Netty 服务端配置。Timer timer = new Timer("ServerHouseKeepingService", true):定时扫描器,对 NettyRemotingAbstract 中的 responseTable 进行扫描,将超时的申请移除。DefaultEventExecutorGroup defaultEventExecutorGroup:Netty ChannelHandler 线程执行组。int port:服务端绑定端口。NettyEncoder encoder:RocketMQ 通信协议(编码器)。NettyDecoder decoder:RocketMQ 通信协议(解码器)。NettyConnectManageHandler connectionManageHandler:Netty 连贯管路器 Handler,次要实现对连贯的状态跟踪。NettyServerHandler serverHandler:NettyServer 端外围业务处理器。
这里再依据类之间的调用关系再论述一下音讯的发送与生产,再看一张图:
1) . NettyRemotingClient 会在须要连贯到指定地址先通过 Netty 相干 API 创立 Channel,并进行缓存,下一次申请如果还是发送到该地址时可反复利用。
2) . 而后调用 NettyRemotingClient 的 invokeAsync 等办法进行网络发送,在发送时在 Netty 中会进行一个十分重要的步骤:对申请编码,次要是将须要发送的申请,例如 RemotingCommand,将该对象依照 特定的格局(协定)转换成二进制流。
3) . NettyRemotingServer 端接管到二进制后,网络读申请就绪,进行读申请事件处理流程。首先须要从二进制流中辨认一个残缺的申请包,这就是所谓的解码,行将二进制流转换为申请对象,解码成 RemotingCommand,而后读事件会流传到 NettyServerHandler,最终执行 NettyRemotingAbstract 的 processRequestCommand,次要是依据 requestCode 获取指定的命令执行线程池与 NettyRequestProcessor,并执行对应的逻辑,而后通过网络将执行后果返回给客户端。
4) . 客户端收到服务端的响应后,读事件触发,执行解码(NettyDecoder),而后读事件会流传到 NettyClientHandler,并解决响应后果。
三,RocketMQ 中 Remoting 通信模块具体实现
- 客户端的创立。在 RocketMQ 中客户端的实现类:NettyRemotingClient。其创立外围代码被封装在 start 办法中。
public void start() {
// 创立默认事件执行线程组,后续事件处理器即(ChannelPipeline 中 addLast 中事件处理器)在该线程组中执行
this.defaultEventExecutorGroup = new DefaultEventExecutorGroup(nettyClientConfig.getClientWorkerThreads(),
new ThreadFactory() {private AtomicInteger threadIndex = new AtomicInteger(0);
@Override
public Thread newThread(Runnable r) {return new Thread(r, "NettyClientWorkerThread_" + this.threadIndex.incrementAndGet());
}
});
// 创立 Netty 客户端,group 指定 Work 线程组,读写事件都会在这个线程组里执行(也就是 IO 线程);channel 指定通道类型,这里应用 NIO 通道
Bootstrap handler = this.bootstrap.group(this.eventLoopGroupWorker).channel(NioSocketChannel.class)
.option(ChannelOption.TCP_NODELAY, true)// 是否禁用 Nagle,如果设置为 true 示意立刻发送,如果设置为 false,如果一个数据包比拟小,会尝试期待更多的包在一起发送
.option(ChannelOption.SO_KEEPALIVE, false)
.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, nettyClientConfig.getConnectTimeoutMillis())// 连贯超时工夫,超时未连贯胜利抛出异样
.option(ChannelOption.SO_SNDBUF, nettyClientConfig.getClientSocketSndBufSize())// 与上面一起,套接字发送缓存区与套接字接管缓存区大小,64kb
.option(ChannelOption.SO_RCVBUF, nettyClientConfig.getClientSocketRcvBufSize())
.handler(new ChannelInitializer<SocketChannel>() {// 通过 handle 构建事件处理链条
@Override
public void initChannel(SocketChannel ch) throws Exception {ChannelPipeline pipeline = ch.pipeline();
if (nettyClientConfig.isUseTLS()) {if (null != sslContext) {pipeline.addFirst(defaultEventExecutorGroup, "sslHandler", sslContext.newHandler(ch.alloc()));
log.info("Prepend SSL handler");
} else {log.warn("Connections are insecure as SSLContext is null!");
}
}
/**addLast 里要是没有传入 EventExecutorGroup,那事件的执行默认在 Work 线程组 */
pipeline.addLast(//Netty 的外围扩大点,应用程序的业务逻辑能够通过该事件处理器进行扩大
defaultEventExecutorGroup,
new NettyEncoder(),//RocketMQ 申请编码器,即协定编码器
new NettyDecoder(),//RocketMQ 申请解码器,即协定解码器
new IdleStateHandler(0, 0, nettyClientConfig.getClientChannelMaxIdleTimeSeconds()),// 闲暇检测
new NettyConnectManageHandler(),// 连贯管理器
new NettyClientHandler());//Netty 客户端业务处理器,进行业务逻辑的解决
}
});
// 删除过期申请的定时工作
this.timer.scheduleAtFixedRate(new TimerTask() {
@Override
public void run() {
try {NettyRemotingClient.this.scanResponseTable();
} catch (Throwable e) {log.error("scanResponseTable exception", e);
}
}
}, 1000 * 3, 1000);
if (this.channelEventListener != null) {this.nettyEventExecutor.start();
}
}
具体步骤:
1),创立 DefaultEventExecutorGroup,默认事件执行线程组
2),调用 Bootstrap 的 group 办法指定一个 Work 线程组,默认状况下读写事件在该线程组中执行,也就是 IO 线程;同时通过 channel 办法指定通道类型,这里采纳 NIO
3),通过 Bootstrap 的 option 办法指定网络参考
4),最初,通过 Bootstrap 的 hanle 办法构建事件处理链条
- 建设连贯
下面第 1 步只是创立了客户端,并没有建设连贯。在发送音讯的时候才会去建设连贯,相干代码如下:
if (createNewConnection) {ChannelFuture channelFuture = this.bootstrap.connect(RemotingHelper.string2SocketAddress(addr));
log.info("createChannel: begin to connect remote host[{}] asynchronously", addr);
cw = new ChannelWrapper(channelFuture);
this.channelTables.put(addr, cw);
}
连贯建设后会进行缓存不便后续复用。
- 音讯的发送 这里以同步发送为例
final ResponseFuture responseFuture = new ResponseFuture(channel, opaque, timeoutMillis, null, null);
this.responseTable.put(opaque, responseFuture);
final SocketAddress addr = channel.remoteAddress();
// 有响应后进行回调,这就是异步异步
channel.writeAndFlush(request).addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture f) throws Exception {if (f.isSuccess()) {responseFuture.setSendRequestOK(true);
return;
} else {responseFuture.setSendRequestOK(false);
}
responseTable.remove(opaque);
responseFuture.setCause(f.cause());
responseFuture.putResponse(null);// 计数器递加,进行唤醒操作
log.warn("send a request command to channel <" + addr + "> failed.");
}
});
RemotingCommand responseCommand = responseFuture.waitResponse(timeoutMillis);// 调用计数器 await 期待操作
1),首先封装一个 ResponseFuture,而后与申请号一起存入 CurrentHashMap 里。当客户端收到服务端的响应后,须要依据 opaque 查找到对应的 ResponseFuture,从而唤醒客户端
2),接着通过 Netty 调用 Channel 的 writeAndFlush 办法进行申请的发送,外部会应用编码器 NettyEncoder 将 RemotingCommand request 编码
3),当响应返回时会回调客户端,并唤醒客户端
- 服务端的创立 这里的服务端指的是 Broker
1),创立 Boss 与 Worker 两个线程组.Boss 线程组是主从 Reactor 里的主 Reactor,用来监听连贯,Worker 线程组是主从 Reactor 里的从 Reactor 用来解决读写事件。
this.eventLoopGroupBoss = new EpollEventLoopGroup(1, new ThreadFactory() {private AtomicInteger threadIndex = new AtomicInteger(0);
@Override
public Thread newThread(Runnable r) {return new Thread(r, String.format("NettyEPOLLBoss_%d", this.threadIndex.incrementAndGet()));
}
});
this.eventLoopGroupSelector = new EpollEventLoopGroup(nettyServerConfig.getServerSelectorThreads(), new ThreadFactory() {private AtomicInteger threadIndex = new AtomicInteger(0);
private int threadTotal = nettyServerConfig.getServerSelectorThreads();
@Override
public Thread newThread(Runnable r) {return new Thread(r, String.format("NettyServerEPOLLSelector_%d_%d", threadTotal, this.threadIndex.incrementAndGet()));
}
});
2),创立默认事件执行线程组
this.defaultEventExecutorGroup = new DefaultEventExecutorGroup(nettyServerConfig.getServerWorkerThreads(),
new ThreadFactory() {private AtomicInteger threadIndex = new AtomicInteger(0);
@Override
public Thread newThread(Runnable r) {return new Thread(r, "NettyServerCodecThread_" + this.threadIndex.incrementAndGet());
}
});
3),应用 Netty ServerBootstrap 服务端启动类构建服务端
ServerBootstrap childHandler =
this.serverBootstrap.group(this.eventLoopGroupBoss, this.eventLoopGroupSelector)// 指定 boss 与 worker 两个线程组
.channel(useEpoll() ? EpollServerSocketChannel.class : NioServerSocketChannel.class)// 指定通道类型
.option(ChannelOption.SO_BACKLOG, 1024)
.option(ChannelOption.SO_REUSEADDR, true)
.option(ChannelOption.SO_KEEPALIVE, false)
.childOption(ChannelOption.TCP_NODELAY, true)
.childOption(ChannelOption.SO_SNDBUF, nettyServerConfig.getServerSocketSndBufSize())
.childOption(ChannelOption.SO_RCVBUF, nettyServerConfig.getServerSocketRcvBufSize())
.localAddress(new InetSocketAddress(this.nettyServerConfig.getListenPort()))// 绑定到服务端指定的 IP/PORT
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {ch.pipeline()
.addLast(defaultEventExecutorGroup, HANDSHAKE_HANDLER_NAME,
new HandshakeHandler(TlsSystemConfig.tlsMode))
.addLast(defaultEventExecutorGroup,
new NettyEncoder(),
new NettyDecoder(),
new IdleStateHandler(0, 0, nettyServerConfig.getServerChannelMaxIdleTimeSeconds()),
new NettyConnectManageHandler(),
new NettyServerHandler());
}
});
4),调用 ServerBootstrap 的 bind 办法绑定到指定端口
ChannelFuture sync = this.serverBootstrap.bind().sync();
四,通信协议
在 Client 和 Server 之间实现一次音讯发送时,须要对发送的音讯进行一个协定约定。协定内容次要能够分为以下 4 局部:
(1) 音讯长度:总长度,四个字节存储,占用一个 int 类型;
(2) 序列化类型 & 音讯头长度:同样占用一个 int 类型,第一个字节示意序列化类型,前面三个字节示意音讯头长度;
(3) 音讯头数据:通过序列化后的音讯头数据;
(4) 音讯主体数据:音讯主体的二进制字节数据内容;
待补充内容:
netty 的二次编码
线程隔离