一,RocketMQ中Remoting通信机制
RocketMQ音讯队列集群次要包含NameServe、Broker(Master/Slave)、Producer、Consumer4个角色,根本通信流程如下:
- Broker启动后将本人注册至NameServer的操作;随后每隔30s工夫定期向NameServer上报Topic路由信息;
- 音讯生产者Producer在发送音讯时须要依据Msg的Topic从本地缓存的TopicPublishInfoTable获取路由信息(如果没有会从NameServer上从新拉取);
- Producer依据(2)中获取的路由信息抉择一个队列(MessageQueue)进行音讯发送;Broker作为音讯的接收者收音讯并落盘存储。
- 音讯消费者Consumer依据2)中获取的路由信息,并再实现客户端的负载平衡后,抉择其中的某一个或者某几个音讯队列来拉取音讯并进行生产。
二,RocketMQ中Remoting通信模块API
- RemotingService:为顶层接口。次要办法有:
void start(); void shutdown(); void registerRPCHook(RPCHook rpcHook);
RemotingServer/RemotingClient:近程服务器/客户端根底接口,两者中的办法根本相似:
/** * requestCode 命令编码 * processor RocketMQ 申请业务处理器,例如音讯发送的处理器为 SendMessageProcessor,PullMessageProcessor 为音讯拉取的业务处理器。 * executor 线程池,NettyRequestProcessor 具体业务逻辑在该线程池中执行 */ void registerProcessor(final int requestCode, final NettyRequestProcessor processor, final ExecutorService executor); void registerDefaultProcessor(final NettyRequestProcessor processor, final ExecutorService executor); int localListenPort(); //依据申请编码获取对应的申请业务处理器与线程池 Pair<NettyRequestProcessor, ExecutorService> getProcessorPair(final int requestCode); RemotingCommand invokeSync(final Channel channel, final RemotingCommand request, final long timeoutMillis) throws InterruptedException, RemotingSendRequestException, RemotingTimeoutException; void invokeAsync(final Channel channel, final RemotingCommand request, final long timeoutMillis, final InvokeCallback invokeCallback) throws InterruptedException, RemotingTooMuchRequestException, RemotingTimeoutException, RemotingSendRequestException; void invokeOneway(final Channel channel, final RemotingCommand request, final long timeoutMillis) throws InterruptedException, RemotingTooMuchRequestException, RemotingTimeoutException, RemotingSendRequestException;
RomotingClient次要是Producer发送音讯与Consumer拉取音讯时用到;RomotingServer次要是Broker进行回调,获取Consumer状态等的时候用到。
这里重点须要关注下registerProcessor
注册命令处理器这个办法。RocketMQ 会依照业务逻辑进行拆分,例如音讯发送、音讯拉取等每一个网络操作会定义一个申请编码(requestCode),而后每一个类型对应一个业务处理器 NettyRequestProcessor,并能够依照不同的 requestCode 定义不同的线程池,实现不同申请的线程池隔离。- NettyRemotingAbstract: Netty 近程服务形象实现类,定义网络近程调用、申请,响应等解决逻辑。重要的属性有:
Semaphore semaphoreOneway:管制 oneway 发送形式的并发度的信号量,默认为 65535 个许可。Semaphore semaphoreAsync:管制异步发送形式的并发度的信号量,默认为 65535 个许可。ConcurrentMap<Integer /* opaque */, ResponseFuture> responseTable:以后正在等待对端返回的申请处理表,其中 opaque 示意申请的编号,全局惟一,通常采纳原子递增,客户端向对端发送网络申请时,通常会采取繁多长连贯,故发送申请后会向调用端立刻返回 ResponseFuture,同时会将申请放入到该映射表中,当响应解决实现后(响应会蕴含申请 code),而后从该映射表中获取对应的 ResponseFutre,而后告诉调用端的返回后果,这里是Future 模式在网络编程中的经典使用。HashMap<Integer/* request code */, Pair<NettyRequestProcessor, ExecutorService>> processorTable:注册的申请解决命令。RocketMQ 的设计中采纳了不同申请命令反对不同的线程池,即实现业务线程池的隔离。
- NettyRemotingClient:基于 Netty 网络编程客户端,实现 RemotingClient 接口并继承 NettyRemotingAbstract。
重要属性如下:
NettyClientConfig nettyClientConfig:与网络相干的配置项。Bootstrap bootstrap:Netty 客户端启动帮忙类。EventLoopGroup eventLoopGroupWorker:Netty 客户端 Work 线程组,俗称 IO 线程。ConcurrentMap<String /* addr */, ChannelWrapper> channelTables:以后客户端已创立的连贯(网络通道、Netty Cannel),每一个地址一条长连贯。ExecutorService publicExecutor:默认工作线程池。ExecutorService callbackExecutor:回掉类申请执行线程池。DefaultEventExecutorGroup defaultEventExecutorGroup:Netty ChannelHandler 线程执行组,即 Netty ChannelHandler 在这些线程中执行。
- NettyRemotingServer:基于 Netty 网络编程服务端。
其外围属性如下所示:
ServerBootstrap serverBootstrap:Netty Server 端启动帮忙类。EventLoopGroup eventLoopGroupSelector:Netty Server Work 线程组,即主从多 Reactor 中的从 Reactor,次要负责读写事件的解决。EventLoopGroup eventLoopGroupBoss:Netty Boss 线程组,即主从 Reactor 线程模型中的主 Reactor,次要负责 OP_ACCEPT 事件(创立连贯)。NettyServerConfig nettyServerConfig:Netty 服务端配置。Timer timer = new Timer("ServerHouseKeepingService", true):定时扫描器,对 NettyRemotingAbstract 中的 responseTable 进行扫描,将超时的申请移除。DefaultEventExecutorGroup defaultEventExecutorGroup:Netty ChannelHandler 线程执行组。int port:服务端绑定端口。NettyEncoder encoder:RocketMQ 通信协议(编码器)。NettyDecoder decoder:RocketMQ 通信协议(解码器)。NettyConnectManageHandler connectionManageHandler:Netty 连贯管路器 Handler,次要实现对连贯的状态跟踪。NettyServerHandler serverHandler:NettyServer 端外围业务处理器。
这里再依据类之间的调用关系再论述一下音讯的发送与生产,再看一张图:
1) . NettyRemotingClient 会在须要连贯到指定地址先通过 Netty 相干 API 创立 Channel,并进行缓存,下一次申请如果还是发送到该地址时可反复利用。
2) . 而后调用 NettyRemotingClient 的 invokeAsync 等办法进行网络发送,在发送时在 Netty 中会进行一个十分重要的步骤:对申请编码,次要是将须要发送的申请,例如 RemotingCommand,将该对象依照特定的格局(协定)转换成二进制流。
3) . NettyRemotingServer 端接管到二进制后,网络读申请就绪,进行读申请事件处理流程。首先须要从二进制流中辨认一个残缺的申请包,这就是所谓的解码,行将二进制流转换为申请对象,解码成 RemotingCommand,而后读事件会流传到 NettyServerHandler,最终执行 NettyRemotingAbstract 的 processRequestCommand,次要是依据 requestCode 获取指定的命令执行线程池与 NettyRequestProcessor,并执行对应的逻辑,而后通过网络将执行后果返回给客户端。
4) . 客户端收到服务端的响应后,读事件触发,执行解码(NettyDecoder),而后读事件会流传到 NettyClientHandler,并解决响应后果。
三,RocketMQ中Remoting通信模块具体实现
- 客户端的创立。在 RocketMQ 中客户端的实现类:NettyRemotingClient。其创立外围代码被封装在 start 办法中。
public void start() { //创立默认事件执行线程组,后续事件处理器即(ChannelPipeline 中 addLast 中事件处理器)在该线程组中执行 this.defaultEventExecutorGroup = new DefaultEventExecutorGroup( nettyClientConfig.getClientWorkerThreads(), new ThreadFactory() { private AtomicInteger threadIndex = new AtomicInteger(0); @Override public Thread newThread(Runnable r) { return new Thread(r, "NettyClientWorkerThread_" + this.threadIndex.incrementAndGet()); } }); //创立Netty客户端,group指定Work线程组,读写事件都会在这个线程组里执行(也就是IO线程);channel指定通道类型,这里应用NIO通道 Bootstrap handler = this.bootstrap.group(this.eventLoopGroupWorker).channel(NioSocketChannel.class) .option(ChannelOption.TCP_NODELAY, true)//是否禁用 Nagle,如果设置为 true 示意立刻发送,如果设置为 false,如果一个数据包比拟小,会尝试期待更多的包在一起发送 .option(ChannelOption.SO_KEEPALIVE, false) .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, nettyClientConfig.getConnectTimeoutMillis())//连贯超时工夫,超时未连贯胜利抛出异样 .option(ChannelOption.SO_SNDBUF, nettyClientConfig.getClientSocketSndBufSize())//与上面一起,套接字发送缓存区与套接字接管缓存区大小,64kb .option(ChannelOption.SO_RCVBUF, nettyClientConfig.getClientSocketRcvBufSize()) .handler(new ChannelInitializer<SocketChannel>() {//通过handle构建事件处理链条 @Override public void initChannel(SocketChannel ch) throws Exception { ChannelPipeline pipeline = ch.pipeline(); if (nettyClientConfig.isUseTLS()) { if (null != sslContext) { pipeline.addFirst(defaultEventExecutorGroup, "sslHandler", sslContext.newHandler(ch.alloc())); log.info("Prepend SSL handler"); } else { log.warn("Connections are insecure as SSLContext is null!"); } } /**addLast里要是没有传入EventExecutorGroup,那事件的执行默认在 Work 线程组*/ pipeline.addLast(//Netty 的外围扩大点,应用程序的业务逻辑能够通过该事件处理器进行扩大 defaultEventExecutorGroup, new NettyEncoder(),//RocketMQ 申请编码器,即协定编码器 new NettyDecoder(),//RocketMQ 申请解码器,即协定解码器 new IdleStateHandler(0, 0, nettyClientConfig.getClientChannelMaxIdleTimeSeconds()),//闲暇检测 new NettyConnectManageHandler(),//连贯管理器 new NettyClientHandler());//Netty 客户端业务处理器,进行业务逻辑的解决 } }); //删除过期申请的定时工作 this.timer.scheduleAtFixedRate(new TimerTask() { @Override public void run() { try { NettyRemotingClient.this.scanResponseTable(); } catch (Throwable e) { log.error("scanResponseTable exception", e); } } }, 1000 * 3, 1000); if (this.channelEventListener != null) { this.nettyEventExecutor.start(); } }
具体步骤:
1),创立 DefaultEventExecutorGroup,默认事件执行线程组
2),调用Bootstrap的group办法指定一个Work线程组,默认状况下读写事件在该线程组中执行,也就是IO线程;同时通过channel办法指定通道类型,这里采纳NIO
3),通过Bootstrap的option办法指定网络参考
4),最初,通过 Bootstrap 的 hanle 办法构建事件处理链条
- 建设连贯
下面第1步只是创立了客户端,并没有建设连贯。在发送音讯的时候才会去建设连贯,相干代码如下:
if (createNewConnection) { ChannelFuture channelFuture = this.bootstrap.connect(RemotingHelper.string2SocketAddress(addr)); log.info("createChannel: begin to connect remote host[{}] asynchronously", addr); cw = new ChannelWrapper(channelFuture); this.channelTables.put(addr, cw); }
连贯建设后会进行缓存不便后续复用。
- 音讯的发送 这里以同步发送为例
final ResponseFuture responseFuture = new ResponseFuture(channel, opaque, timeoutMillis, null, null); this.responseTable.put(opaque, responseFuture); final SocketAddress addr = channel.remoteAddress(); //有响应后进行回调,这就是异步异步 channel.writeAndFlush(request).addListener(new ChannelFutureListener() { @Override public void operationComplete(ChannelFuture f) throws Exception { if (f.isSuccess()) { responseFuture.setSendRequestOK(true); return; } else { responseFuture.setSendRequestOK(false); } responseTable.remove(opaque); responseFuture.setCause(f.cause()); responseFuture.putResponse(null);//计数器递加,进行唤醒操作 log.warn("send a request command to channel <" + addr + "> failed."); } }); RemotingCommand responseCommand = responseFuture.waitResponse(timeoutMillis);//调用计数器await期待操作
1),首先封装一个ResponseFuture,而后与申请号一起存入CurrentHashMap里。当客户端收到服务端的响应后,须要依据 opaque 查找到对应的 ResponseFuture,从而唤醒客户端
2),接着通过Netty调用Channel的writeAndFlush办法进行申请的发送,外部会应用编码器 NettyEncoder 将 RemotingCommand request 编码
3),当响应返回时会回调客户端,并唤醒客户端
- 服务端的创立 这里的服务端指的是Broker
1),创立Boss与Worker两个线程组.Boss线程组是主从Reactor里的主Reactor,用来监听连贯,Worker线程组是主从Reactor里的从Reactor用来解决读写事件。
this.eventLoopGroupBoss = new EpollEventLoopGroup(1, new ThreadFactory() { private AtomicInteger threadIndex = new AtomicInteger(0); @Override public Thread newThread(Runnable r) { return new Thread(r, String.format("NettyEPOLLBoss_%d", this.threadIndex.incrementAndGet())); } }); this.eventLoopGroupSelector = new EpollEventLoopGroup(nettyServerConfig.getServerSelectorThreads(), new ThreadFactory() { private AtomicInteger threadIndex = new AtomicInteger(0); private int threadTotal = nettyServerConfig.getServerSelectorThreads(); @Override public Thread newThread(Runnable r) { return new Thread(r, String.format("NettyServerEPOLLSelector_%d_%d", threadTotal, this.threadIndex.incrementAndGet())); } });
2),创立默认事件执行线程组
this.defaultEventExecutorGroup = new DefaultEventExecutorGroup( nettyServerConfig.getServerWorkerThreads(), new ThreadFactory() { private AtomicInteger threadIndex = new AtomicInteger(0); @Override public Thread newThread(Runnable r) { return new Thread(r, "NettyServerCodecThread_" + this.threadIndex.incrementAndGet()); } });
3),应用 Netty ServerBootstrap 服务端启动类构建服务端
ServerBootstrap childHandler = this.serverBootstrap.group(this.eventLoopGroupBoss, this.eventLoopGroupSelector)//指定boss与worker两个线程组 .channel(useEpoll() ? EpollServerSocketChannel.class : NioServerSocketChannel.class)//指定通道类型 .option(ChannelOption.SO_BACKLOG, 1024) .option(ChannelOption.SO_REUSEADDR, true) .option(ChannelOption.SO_KEEPALIVE, false) .childOption(ChannelOption.TCP_NODELAY, true) .childOption(ChannelOption.SO_SNDBUF, nettyServerConfig.getServerSocketSndBufSize()) .childOption(ChannelOption.SO_RCVBUF, nettyServerConfig.getServerSocketRcvBufSize()) .localAddress(new InetSocketAddress(this.nettyServerConfig.getListenPort()))//绑定到服务端指定的IP/PORT .childHandler(new ChannelInitializer<SocketChannel>() { @Override public void initChannel(SocketChannel ch) throws Exception { ch.pipeline() .addLast(defaultEventExecutorGroup, HANDSHAKE_HANDLER_NAME, new HandshakeHandler(TlsSystemConfig.tlsMode)) .addLast(defaultEventExecutorGroup, new NettyEncoder(), new NettyDecoder(), new IdleStateHandler(0, 0, nettyServerConfig.getServerChannelMaxIdleTimeSeconds()), new NettyConnectManageHandler(), new NettyServerHandler() ); } });
4),调用 ServerBootstrap 的 bind 办法绑定到指定端口
ChannelFuture sync = this.serverBootstrap.bind().sync();
四,通信协议
在Client和Server之间实现一次音讯发送时,须要对发送的音讯进行一个协定约定。协定内容次要能够分为以下4局部:
(1) 音讯长度:总长度,四个字节存储,占用一个int类型;
(2) 序列化类型&音讯头长度:同样占用一个int类型,第一个字节示意序列化类型,前面三个字节示意音讯头长度;
(3) 音讯头数据:通过序列化后的音讯头数据;
(4) 音讯主体数据:音讯主体的二进制字节数据内容;
待补充内容:
netty的二次编码
线程隔离