一,RocketMQ中Remoting通信机制

RocketMQ音讯队列集群次要包含NameServe、Broker(Master/Slave)、Producer、Consumer4个角色,根本通信流程如下:

  1. Broker启动后将本人注册至NameServer的操作;随后每隔30s工夫定期向NameServer上报Topic路由信息;
  2. 音讯生产者Producer在发送音讯时须要依据Msg的Topic从本地缓存的TopicPublishInfoTable获取路由信息(如果没有会从NameServer上从新拉取);
  3. Producer依据(2)中获取的路由信息抉择一个队列(MessageQueue)进行音讯发送;Broker作为音讯的接收者收音讯并落盘存储。
  4. 音讯消费者Consumer依据2)中获取的路由信息,并再实现客户端的负载平衡后,抉择其中的某一个或者某几个音讯队列来拉取音讯并进行生产。

二,RocketMQ中Remoting通信模块API

  1. RemotingService:为顶层接口。次要办法有:
    void start();    void shutdown();    void registerRPCHook(RPCHook rpcHook);
  1. RemotingServer/RemotingClient:近程服务器/客户端根底接口,两者中的办法根本相似:

     /**  * requestCode 命令编码  * processor RocketMQ 申请业务处理器,例如音讯发送的处理器为 SendMessageProcessor,PullMessageProcessor 为音讯拉取的业务处理器。  * executor 线程池,NettyRequestProcessor 具体业务逻辑在该线程池中执行  */ void registerProcessor(final int requestCode, final NettyRequestProcessor processor,     final ExecutorService executor); void registerDefaultProcessor(final NettyRequestProcessor processor, final ExecutorService executor); int localListenPort(); //依据申请编码获取对应的申请业务处理器与线程池 Pair<NettyRequestProcessor, ExecutorService> getProcessorPair(final int requestCode); RemotingCommand invokeSync(final Channel channel, final RemotingCommand request,     final long timeoutMillis) throws InterruptedException, RemotingSendRequestException,     RemotingTimeoutException; void invokeAsync(final Channel channel, final RemotingCommand request, final long timeoutMillis,     final InvokeCallback invokeCallback) throws InterruptedException,     RemotingTooMuchRequestException, RemotingTimeoutException, RemotingSendRequestException; void invokeOneway(final Channel channel, final RemotingCommand request, final long timeoutMillis)     throws InterruptedException, RemotingTooMuchRequestException, RemotingTimeoutException,     RemotingSendRequestException;

    RomotingClient次要是Producer发送音讯与Consumer拉取音讯时用到;RomotingServer次要是Broker进行回调,获取Consumer状态等的时候用到。
    这里重点须要关注下registerProcessor注册命令处理器这个办法。RocketMQ 会依照业务逻辑进行拆分,例如音讯发送、音讯拉取等每一个网络操作会定义一个申请编码(requestCode),而后每一个类型对应一个业务处理器 NettyRequestProcessor,并能够依照不同的 requestCode 定义不同的线程池,实现不同申请的线程池隔离。

  2. NettyRemotingAbstract: Netty 近程服务形象实现类,定义网络近程调用、申请,响应等解决逻辑。重要的属性有:
Semaphore semaphoreOneway:管制 oneway 发送形式的并发度的信号量,默认为 65535 个许可。Semaphore semaphoreAsync:管制异步发送形式的并发度的信号量,默认为 65535 个许可。ConcurrentMap<Integer /* opaque */, ResponseFuture> responseTable:以后正在等待对端返回的申请处理表,其中 opaque 示意申请的编号,全局惟一,通常采纳原子递增,客户端向对端发送网络申请时,通常会采取繁多长连贯,故发送申请后会向调用端立刻返回 ResponseFuture,同时会将申请放入到该映射表中,当响应解决实现后(响应会蕴含申请 code),而后从该映射表中获取对应的 ResponseFutre,而后告诉调用端的返回后果,这里是Future 模式在网络编程中的经典使用。HashMap<Integer/* request code */, Pair<NettyRequestProcessor, ExecutorService>> processorTable:注册的申请解决命令。RocketMQ 的设计中采纳了不同申请命令反对不同的线程池,即实现业务线程池的隔离。
  1. NettyRemotingClient:基于 Netty 网络编程客户端,实现 RemotingClient 接口并继承 NettyRemotingAbstract。
    重要属性如下:
NettyClientConfig nettyClientConfig:与网络相干的配置项。Bootstrap bootstrap:Netty 客户端启动帮忙类。EventLoopGroup eventLoopGroupWorker:Netty 客户端 Work 线程组,俗称 IO 线程。ConcurrentMap<String /* addr */, ChannelWrapper> channelTables:以后客户端已创立的连贯(网络通道、Netty Cannel),每一个地址一条长连贯。ExecutorService publicExecutor:默认工作线程池。ExecutorService callbackExecutor:回掉类申请执行线程池。DefaultEventExecutorGroup defaultEventExecutorGroup:Netty ChannelHandler 线程执行组,即 Netty ChannelHandler 在这些线程中执行。
  1. NettyRemotingServer:基于 Netty 网络编程服务端。

其外围属性如下所示:

ServerBootstrap serverBootstrap:Netty Server 端启动帮忙类。EventLoopGroup eventLoopGroupSelector:Netty Server Work 线程组,即主从多 Reactor 中的从 Reactor,次要负责读写事件的解决。EventLoopGroup eventLoopGroupBoss:Netty Boss 线程组,即主从 Reactor 线程模型中的主 Reactor,次要负责 OP_ACCEPT 事件(创立连贯)。NettyServerConfig nettyServerConfig:Netty 服务端配置。Timer timer = new Timer("ServerHouseKeepingService", true):定时扫描器,对 NettyRemotingAbstract 中的 responseTable 进行扫描,将超时的申请移除。DefaultEventExecutorGroup defaultEventExecutorGroup:Netty ChannelHandler 线程执行组。int port:服务端绑定端口。NettyEncoder encoder:RocketMQ 通信协议(编码器)。NettyDecoder decoder:RocketMQ 通信协议(解码器)。NettyConnectManageHandler connectionManageHandler:Netty 连贯管路器 Handler,次要实现对连贯的状态跟踪。NettyServerHandler serverHandler:NettyServer 端外围业务处理器。

这里再依据类之间的调用关系再论述一下音讯的发送与生产,再看一张图:

1) . NettyRemotingClient 会在须要连贯到指定地址先通过 Netty 相干 API 创立 Channel,并进行缓存,下一次申请如果还是发送到该地址时可反复利用。
2) . 而后调用 NettyRemotingClient 的 invokeAsync 等办法进行网络发送,在发送时在 Netty 中会进行一个十分重要的步骤:对申请编码,次要是将须要发送的申请,例如 RemotingCommand,将该对象依照特定的格局(协定)转换成二进制流。
3) . NettyRemotingServer 端接管到二进制后,网络读申请就绪,进行读申请事件处理流程。首先须要从二进制流中辨认一个残缺的申请包,这就是所谓的解码,行将二进制流转换为申请对象,解码成 RemotingCommand,而后读事件会流传到 NettyServerHandler,最终执行 NettyRemotingAbstract 的 processRequestCommand,次要是依据 requestCode 获取指定的命令执行线程池与 NettyRequestProcessor,并执行对应的逻辑,而后通过网络将执行后果返回给客户端。
4) . 客户端收到服务端的响应后,读事件触发,执行解码(NettyDecoder),而后读事件会流传到 NettyClientHandler,并解决响应后果。

三,RocketMQ中Remoting通信模块具体实现

  1. 客户端的创立。在 RocketMQ 中客户端的实现类:NettyRemotingClient。其创立外围代码被封装在 start 办法中。
    public void start() {        //创立默认事件执行线程组,后续事件处理器即(ChannelPipeline 中 addLast 中事件处理器)在该线程组中执行        this.defaultEventExecutorGroup = new DefaultEventExecutorGroup(            nettyClientConfig.getClientWorkerThreads(),            new ThreadFactory() {                private AtomicInteger threadIndex = new AtomicInteger(0);                @Override                public Thread newThread(Runnable r) {                    return new Thread(r, "NettyClientWorkerThread_" + this.threadIndex.incrementAndGet());                }            });        //创立Netty客户端,group指定Work线程组,读写事件都会在这个线程组里执行(也就是IO线程);channel指定通道类型,这里应用NIO通道        Bootstrap handler = this.bootstrap.group(this.eventLoopGroupWorker).channel(NioSocketChannel.class)            .option(ChannelOption.TCP_NODELAY, true)//是否禁用 Nagle,如果设置为 true 示意立刻发送,如果设置为 false,如果一个数据包比拟小,会尝试期待更多的包在一起发送            .option(ChannelOption.SO_KEEPALIVE, false)            .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, nettyClientConfig.getConnectTimeoutMillis())//连贯超时工夫,超时未连贯胜利抛出异样            .option(ChannelOption.SO_SNDBUF, nettyClientConfig.getClientSocketSndBufSize())//与上面一起,套接字发送缓存区与套接字接管缓存区大小,64kb            .option(ChannelOption.SO_RCVBUF, nettyClientConfig.getClientSocketRcvBufSize())            .handler(new ChannelInitializer<SocketChannel>() {//通过handle构建事件处理链条                @Override                public void initChannel(SocketChannel ch) throws Exception {                    ChannelPipeline pipeline = ch.pipeline();                    if (nettyClientConfig.isUseTLS()) {                        if (null != sslContext) {                            pipeline.addFirst(defaultEventExecutorGroup, "sslHandler", sslContext.newHandler(ch.alloc()));                            log.info("Prepend SSL handler");                        } else {                            log.warn("Connections are insecure as SSLContext is null!");                        }                    }                    /**addLast里要是没有传入EventExecutorGroup,那事件的执行默认在 Work 线程组*/                    pipeline.addLast(//Netty 的外围扩大点,应用程序的业务逻辑能够通过该事件处理器进行扩大                        defaultEventExecutorGroup,                        new NettyEncoder(),//RocketMQ 申请编码器,即协定编码器                        new NettyDecoder(),//RocketMQ 申请解码器,即协定解码器                        new IdleStateHandler(0, 0, nettyClientConfig.getClientChannelMaxIdleTimeSeconds()),//闲暇检测                        new NettyConnectManageHandler(),//连贯管理器                        new NettyClientHandler());//Netty 客户端业务处理器,进行业务逻辑的解决                }            });        //删除过期申请的定时工作        this.timer.scheduleAtFixedRate(new TimerTask() {            @Override            public void run() {                try {                    NettyRemotingClient.this.scanResponseTable();                } catch (Throwable e) {                    log.error("scanResponseTable exception", e);                }            }        }, 1000 * 3, 1000);        if (this.channelEventListener != null) {            this.nettyEventExecutor.start();        }    }

具体步骤:
1),创立 DefaultEventExecutorGroup,默认事件执行线程组
2),调用Bootstrap的group办法指定一个Work线程组,默认状况下读写事件在该线程组中执行,也就是IO线程;同时通过channel办法指定通道类型,这里采纳NIO
3),通过Bootstrap的option办法指定网络参考
4),最初,通过 Bootstrap 的 hanle 办法构建事件处理链条

  1. 建设连贯
    下面第1步只是创立了客户端,并没有建设连贯。在发送音讯的时候才会去建设连贯,相干代码如下:
    if (createNewConnection) {        ChannelFuture channelFuture = this.bootstrap.connect(RemotingHelper.string2SocketAddress(addr));        log.info("createChannel: begin to connect remote host[{}] asynchronously", addr);        cw = new ChannelWrapper(channelFuture);        this.channelTables.put(addr, cw);    }

连贯建设后会进行缓存不便后续复用。

  1. 音讯的发送 这里以同步发送为例
            final ResponseFuture responseFuture = new ResponseFuture(channel, opaque, timeoutMillis, null, null);            this.responseTable.put(opaque, responseFuture);            final SocketAddress addr = channel.remoteAddress();            //有响应后进行回调,这就是异步异步            channel.writeAndFlush(request).addListener(new ChannelFutureListener() {                @Override                public void operationComplete(ChannelFuture f) throws Exception {                    if (f.isSuccess()) {                        responseFuture.setSendRequestOK(true);                        return;                    } else {                        responseFuture.setSendRequestOK(false);                    }                    responseTable.remove(opaque);                    responseFuture.setCause(f.cause());                    responseFuture.putResponse(null);//计数器递加,进行唤醒操作                    log.warn("send a request command to channel <" + addr + "> failed.");                }            });            RemotingCommand responseCommand = responseFuture.waitResponse(timeoutMillis);//调用计数器await期待操作

1),首先封装一个ResponseFuture,而后与申请号一起存入CurrentHashMap里。当客户端收到服务端的响应后,须要依据 opaque 查找到对应的 ResponseFuture,从而唤醒客户端
2),接着通过Netty调用Channel的writeAndFlush办法进行申请的发送,外部会应用编码器 NettyEncoder 将 RemotingCommand request 编码
3),当响应返回时会回调客户端,并唤醒客户端

  1. 服务端的创立 这里的服务端指的是Broker

1),创立Boss与Worker两个线程组.Boss线程组是主从Reactor里的主Reactor,用来监听连贯,Worker线程组是主从Reactor里的从Reactor用来解决读写事件。

                this.eventLoopGroupBoss = new EpollEventLoopGroup(1, new ThreadFactory() {                private AtomicInteger threadIndex = new AtomicInteger(0);                @Override                public Thread newThread(Runnable r) {                    return new Thread(r, String.format("NettyEPOLLBoss_%d", this.threadIndex.incrementAndGet()));                }            });            this.eventLoopGroupSelector = new EpollEventLoopGroup(nettyServerConfig.getServerSelectorThreads(), new ThreadFactory() {                private AtomicInteger threadIndex = new AtomicInteger(0);                private int threadTotal = nettyServerConfig.getServerSelectorThreads();                @Override                public Thread newThread(Runnable r) {                    return new Thread(r, String.format("NettyServerEPOLLSelector_%d_%d", threadTotal, this.threadIndex.incrementAndGet()));                }            });

2),创立默认事件执行线程组

    this.defaultEventExecutorGroup = new DefaultEventExecutorGroup(            nettyServerConfig.getServerWorkerThreads(),            new ThreadFactory() {                private AtomicInteger threadIndex = new AtomicInteger(0);                @Override                public Thread newThread(Runnable r) {                    return new Thread(r, "NettyServerCodecThread_" + this.threadIndex.incrementAndGet());                }            });

3),应用 Netty ServerBootstrap 服务端启动类构建服务端

        ServerBootstrap childHandler =            this.serverBootstrap.group(this.eventLoopGroupBoss, this.eventLoopGroupSelector)//指定boss与worker两个线程组                .channel(useEpoll() ? EpollServerSocketChannel.class : NioServerSocketChannel.class)//指定通道类型                .option(ChannelOption.SO_BACKLOG, 1024)                .option(ChannelOption.SO_REUSEADDR, true)                .option(ChannelOption.SO_KEEPALIVE, false)                .childOption(ChannelOption.TCP_NODELAY, true)                .childOption(ChannelOption.SO_SNDBUF, nettyServerConfig.getServerSocketSndBufSize())                .childOption(ChannelOption.SO_RCVBUF, nettyServerConfig.getServerSocketRcvBufSize())                .localAddress(new InetSocketAddress(this.nettyServerConfig.getListenPort()))//绑定到服务端指定的IP/PORT                .childHandler(new ChannelInitializer<SocketChannel>() {                    @Override                    public void initChannel(SocketChannel ch) throws Exception {                        ch.pipeline()                            .addLast(defaultEventExecutorGroup, HANDSHAKE_HANDLER_NAME,                                new HandshakeHandler(TlsSystemConfig.tlsMode))                            .addLast(defaultEventExecutorGroup,                                new NettyEncoder(),                                new NettyDecoder(),                                new IdleStateHandler(0, 0, nettyServerConfig.getServerChannelMaxIdleTimeSeconds()),                                new NettyConnectManageHandler(),                                new NettyServerHandler()                            );                    }                });

4),调用 ServerBootstrap 的 bind 办法绑定到指定端口

ChannelFuture sync = this.serverBootstrap.bind().sync();

四,通信协议

在Client和Server之间实现一次音讯发送时,须要对发送的音讯进行一个协定约定。协定内容次要能够分为以下4局部:

(1) 音讯长度:总长度,四个字节存储,占用一个int类型;
(2) 序列化类型&音讯头长度:同样占用一个int类型,第一个字节示意序列化类型,前面三个字节示意音讯头长度;
(3) 音讯头数据:通过序列化后的音讯头数据;
(4) 音讯主体数据:音讯主体的二进制字节数据内容;

待补充内容:
netty的二次编码
线程隔离