关于rocketmq:RocketMQ学习七Netty的应用

11次阅读

共计 10473 个字符,预计需要花费 27 分钟才能阅读完成。

一,RocketMQ 中 Remoting 通信机制

RocketMQ 音讯队列集群次要包含 NameServe、Broker(Master/Slave)、Producer、Consumer4 个角色,根本通信流程如下:

  1. Broker 启动后将本人注册至 NameServer 的操作;随后每隔 30s 工夫定期向 NameServer 上报 Topic 路由信息;
  2. 音讯生产者 Producer 在发送音讯时须要依据 Msg 的 Topic 从本地缓存的 TopicPublishInfoTable 获取路由信息(如果没有会从 NameServer 上从新拉取);
  3. Producer 依据(2)中获取的路由信息抉择一个队列(MessageQueue)进行音讯发送;Broker 作为音讯的接收者收音讯并落盘存储。
  4. 音讯消费者 Consumer 依据 2)中获取的路由信息,并再实现客户端的负载平衡后,抉择其中的某一个或者某几个音讯队列来拉取音讯并进行生产。

二,RocketMQ 中 Remoting 通信模块 API

  1. RemotingService:为顶层接口。次要办法有:
    void start();

    void shutdown();

    void registerRPCHook(RPCHook rpcHook);
  1. RemotingServer/RemotingClient: 近程服务器 / 客户端根底接口,两者中的办法根本相似:

     /**
      * requestCode 命令编码
      * processor RocketMQ 申请业务处理器,例如音讯发送的处理器为 SendMessageProcessor,PullMessageProcessor 为音讯拉取的业务处理器。* executor 线程池,NettyRequestProcessor 具体业务逻辑在该线程池中执行
      */
     void registerProcessor(final int requestCode, final NettyRequestProcessor processor,
         final ExecutorService executor);
    
     void registerDefaultProcessor(final NettyRequestProcessor processor, final ExecutorService executor);
    
     int localListenPort();
    
     // 依据申请编码获取对应的申请业务处理器与线程池
     Pair<NettyRequestProcessor, ExecutorService> getProcessorPair(final int requestCode);
    
     RemotingCommand invokeSync(final Channel channel, final RemotingCommand request,
         final long timeoutMillis) throws InterruptedException, RemotingSendRequestException,
         RemotingTimeoutException;
    
     void invokeAsync(final Channel channel, final RemotingCommand request, final long timeoutMillis,
         final InvokeCallback invokeCallback) throws InterruptedException,
         RemotingTooMuchRequestException, RemotingTimeoutException, RemotingSendRequestException;
    
     void invokeOneway(final Channel channel, final RemotingCommand request, final long timeoutMillis)
         throws InterruptedException, RemotingTooMuchRequestException, RemotingTimeoutException,
         RemotingSendRequestException;

    RomotingClient 次要是 Producer 发送音讯与 Consumer 拉取音讯时用到;RomotingServer 次要是 Broker 进行回调,获取 Consumer 状态等的时候用到。
    这里重点须要关注下 registerProcessor 注册命令处理器这个办法。RocketMQ 会依照业务逻辑进行拆分,例如音讯发送、音讯拉取等每一个网络操作会定义一个申请编码(requestCode),而后每一个类型对应一个业务处理器 NettyRequestProcessor,并能够依照不同的 requestCode 定义不同的线程池,实现不同申请的线程池隔离。

  2. NettyRemotingAbstract:Netty 近程服务形象实现类,定义网络近程调用、申请,响应等解决逻辑。重要的属性有:
Semaphore semaphoreOneway:管制 oneway 发送形式的并发度的信号量,默认为 65535 个许可。Semaphore semaphoreAsync:管制异步发送形式的并发度的信号量,默认为 65535 个许可。ConcurrentMap<Integer /* opaque */, ResponseFuture> responseTable:以后正在等
待对端返回的申请处理表,其中 opaque 示意申请的编号,全局惟一,通常采纳原子递增,客户端向
对端发送网络申请时,通常会采取繁多长连贯,故发送申请后会向调用端立刻返回 
ResponseFuture,同时会将申请放入到该映射表中,当响应解决实现后(响应会蕴含申请 
code),而后从该映射表中获取对应的 ResponseFutre,而后告诉调用端的返回后果,这里是
Future 模式在网络编程中的经典使用。HashMap<Integer/* request code */, Pair<NettyRequestProcessor, 
ExecutorService>> processorTable:注册的申请解决命令。RocketMQ 的设计中采纳了不同
申请命令反对不同的线程池,即实现业务线程池的隔离。
  1. NettyRemotingClient:基于 Netty 网络编程客户端,实现 RemotingClient 接口并继承 NettyRemotingAbstract。
    重要属性如下:
NettyClientConfig nettyClientConfig:与网络相干的配置项。Bootstrap bootstrap:Netty 客户端启动帮忙类。EventLoopGroup eventLoopGroupWorker:Netty 客户端 Work 线程组,俗称 IO 线程。ConcurrentMap<String /* addr */, ChannelWrapper> channelTables:以后客户端已创立的连贯(网络通道、Netty Cannel),每一个地址一条长连贯。ExecutorService publicExecutor:默认工作线程池。ExecutorService callbackExecutor:回掉类申请执行线程池。DefaultEventExecutorGroup defaultEventExecutorGroup:Netty ChannelHandler 线程执行组,即 Netty ChannelHandler 在这些线程中执行。
  1. NettyRemotingServer: 基于 Netty 网络编程服务端。

其外围属性如下所示:

ServerBootstrap serverBootstrap:Netty Server 端启动帮忙类。EventLoopGroup eventLoopGroupSelector:Netty Server Work 线程组,即主从多 Reactor 中的从 Reactor,次要负责读写事件的解决。EventLoopGroup eventLoopGroupBoss:Netty Boss 线程组, 即主从 Reactor 线程模型中的主 Reactor,次要负责 OP_ACCEPT 事件(创立连贯)。NettyServerConfig nettyServerConfig:Netty 服务端配置。Timer timer = new Timer("ServerHouseKeepingService", true):定时扫描器,对 NettyRemotingAbstract 中的 responseTable 进行扫描,将超时的申请移除。DefaultEventExecutorGroup defaultEventExecutorGroup:Netty ChannelHandler 线程执行组。int port:服务端绑定端口。NettyEncoder encoder:RocketMQ 通信协议(编码器)。NettyDecoder decoder:RocketMQ 通信协议(解码器)。NettyConnectManageHandler connectionManageHandler:Netty 连贯管路器 Handler,次要实现对连贯的状态跟踪。NettyServerHandler serverHandler:NettyServer 端外围业务处理器。

这里再依据类之间的调用关系再论述一下音讯的发送与生产,再看一张图:

1) . NettyRemotingClient 会在须要连贯到指定地址先通过 Netty 相干 API 创立 Channel,并进行缓存,下一次申请如果还是发送到该地址时可反复利用。
2) . 而后调用 NettyRemotingClient 的 invokeAsync 等办法进行网络发送,在发送时在 Netty 中会进行一个十分重要的步骤:对申请编码,次要是将须要发送的申请,例如 RemotingCommand,将该对象依照 特定的格局(协定)转换成二进制流。
3) . NettyRemotingServer 端接管到二进制后,网络读申请就绪,进行读申请事件处理流程。首先须要从二进制流中辨认一个残缺的申请包,这就是所谓的解码,行将二进制流转换为申请对象,解码成 RemotingCommand,而后读事件会流传到 NettyServerHandler,最终执行 NettyRemotingAbstract 的 processRequestCommand,次要是依据 requestCode 获取指定的命令执行线程池与 NettyRequestProcessor,并执行对应的逻辑,而后通过网络将执行后果返回给客户端。
4) . 客户端收到服务端的响应后,读事件触发,执行解码(NettyDecoder),而后读事件会流传到 NettyClientHandler,并解决响应后果。

三,RocketMQ 中 Remoting 通信模块具体实现

  1. 客户端的创立。在 RocketMQ 中客户端的实现类:NettyRemotingClient。其创立外围代码被封装在 start 办法中。
    public void start() {
        // 创立默认事件执行线程组,后续事件处理器即(ChannelPipeline 中 addLast 中事件处理器)在该线程组中执行
        this.defaultEventExecutorGroup = new DefaultEventExecutorGroup(nettyClientConfig.getClientWorkerThreads(),
            new ThreadFactory() {private AtomicInteger threadIndex = new AtomicInteger(0);

                @Override
                public Thread newThread(Runnable r) {return new Thread(r, "NettyClientWorkerThread_" + this.threadIndex.incrementAndGet());
                }
            });
        // 创立 Netty 客户端,group 指定 Work 线程组,读写事件都会在这个线程组里执行(也就是 IO 线程);channel 指定通道类型,这里应用 NIO 通道
        Bootstrap handler = this.bootstrap.group(this.eventLoopGroupWorker).channel(NioSocketChannel.class)
            .option(ChannelOption.TCP_NODELAY, true)// 是否禁用 Nagle,如果设置为 true 示意立刻发送,如果设置为 false,如果一个数据包比拟小,会尝试期待更多的包在一起发送
            .option(ChannelOption.SO_KEEPALIVE, false)
            .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, nettyClientConfig.getConnectTimeoutMillis())// 连贯超时工夫,超时未连贯胜利抛出异样
            .option(ChannelOption.SO_SNDBUF, nettyClientConfig.getClientSocketSndBufSize())// 与上面一起,套接字发送缓存区与套接字接管缓存区大小,64kb
            .option(ChannelOption.SO_RCVBUF, nettyClientConfig.getClientSocketRcvBufSize())
            .handler(new ChannelInitializer<SocketChannel>() {// 通过 handle 构建事件处理链条
                @Override
                public void initChannel(SocketChannel ch) throws Exception {ChannelPipeline pipeline = ch.pipeline();
                    if (nettyClientConfig.isUseTLS()) {if (null != sslContext) {pipeline.addFirst(defaultEventExecutorGroup, "sslHandler", sslContext.newHandler(ch.alloc()));
                            log.info("Prepend SSL handler");
                        } else {log.warn("Connections are insecure as SSLContext is null!");
                        }
                    }
                    /**addLast 里要是没有传入 EventExecutorGroup,那事件的执行默认在 Work 线程组 */
                    pipeline.addLast(//Netty 的外围扩大点,应用程序的业务逻辑能够通过该事件处理器进行扩大
                        defaultEventExecutorGroup,
                        new NettyEncoder(),//RocketMQ 申请编码器,即协定编码器
                        new NettyDecoder(),//RocketMQ 申请解码器,即协定解码器
                        new IdleStateHandler(0, 0, nettyClientConfig.getClientChannelMaxIdleTimeSeconds()),// 闲暇检测
                        new NettyConnectManageHandler(),// 连贯管理器
                        new NettyClientHandler());//Netty 客户端业务处理器,进行业务逻辑的解决
                }
            });
        // 删除过期申请的定时工作
        this.timer.scheduleAtFixedRate(new TimerTask() {
            @Override
            public void run() {
                try {NettyRemotingClient.this.scanResponseTable();
                } catch (Throwable e) {log.error("scanResponseTable exception", e);
                }
            }
        }, 1000 * 3, 1000);

        if (this.channelEventListener != null) {this.nettyEventExecutor.start();
        }
    }

具体步骤:
1),创立 DefaultEventExecutorGroup,默认事件执行线程组
2),调用 Bootstrap 的 group 办法指定一个 Work 线程组,默认状况下读写事件在该线程组中执行,也就是 IO 线程;同时通过 channel 办法指定通道类型,这里采纳 NIO
3),通过 Bootstrap 的 option 办法指定网络参考
4),最初,通过 Bootstrap 的 hanle 办法构建事件处理链条

  1. 建设连贯
    下面第 1 步只是创立了客户端,并没有建设连贯。在发送音讯的时候才会去建设连贯,相干代码如下:
    if (createNewConnection) {ChannelFuture channelFuture = this.bootstrap.connect(RemotingHelper.string2SocketAddress(addr));
        log.info("createChannel: begin to connect remote host[{}] asynchronously", addr);
        cw = new ChannelWrapper(channelFuture);
        this.channelTables.put(addr, cw);
    }

连贯建设后会进行缓存不便后续复用。

  1. 音讯的发送 这里以同步发送为例
            final ResponseFuture responseFuture = new ResponseFuture(channel, opaque, timeoutMillis, null, null);
            this.responseTable.put(opaque, responseFuture);
            final SocketAddress addr = channel.remoteAddress();
            // 有响应后进行回调,这就是异步异步
            channel.writeAndFlush(request).addListener(new ChannelFutureListener() {
                @Override
                public void operationComplete(ChannelFuture f) throws Exception {if (f.isSuccess()) {responseFuture.setSendRequestOK(true);
                        return;
                    } else {responseFuture.setSendRequestOK(false);
                    }

                    responseTable.remove(opaque);
                    responseFuture.setCause(f.cause());
                    responseFuture.putResponse(null);// 计数器递加,进行唤醒操作
                    log.warn("send a request command to channel <" + addr + "> failed.");
                }
            });

            RemotingCommand responseCommand = responseFuture.waitResponse(timeoutMillis);// 调用计数器 await 期待操作

1),首先封装一个 ResponseFuture,而后与申请号一起存入 CurrentHashMap 里。当客户端收到服务端的响应后,须要依据 opaque 查找到对应的 ResponseFuture,从而唤醒客户端
2),接着通过 Netty 调用 Channel 的 writeAndFlush 办法进行申请的发送,外部会应用编码器 NettyEncoder 将 RemotingCommand request 编码
3),当响应返回时会回调客户端,并唤醒客户端

  1. 服务端的创立 这里的服务端指的是 Broker

1),创立 Boss 与 Worker 两个线程组.Boss 线程组是主从 Reactor 里的主 Reactor,用来监听连贯,Worker 线程组是主从 Reactor 里的从 Reactor 用来解决读写事件。

                this.eventLoopGroupBoss = new EpollEventLoopGroup(1, new ThreadFactory() {private AtomicInteger threadIndex = new AtomicInteger(0);

                @Override
                public Thread newThread(Runnable r) {return new Thread(r, String.format("NettyEPOLLBoss_%d", this.threadIndex.incrementAndGet()));
                }
            });

            this.eventLoopGroupSelector = new EpollEventLoopGroup(nettyServerConfig.getServerSelectorThreads(), new ThreadFactory() {private AtomicInteger threadIndex = new AtomicInteger(0);
                private int threadTotal = nettyServerConfig.getServerSelectorThreads();

                @Override
                public Thread newThread(Runnable r) {return new Thread(r, String.format("NettyServerEPOLLSelector_%d_%d", threadTotal, this.threadIndex.incrementAndGet()));
                }
            });

2),创立默认事件执行线程组

    this.defaultEventExecutorGroup = new DefaultEventExecutorGroup(nettyServerConfig.getServerWorkerThreads(),
            new ThreadFactory() {private AtomicInteger threadIndex = new AtomicInteger(0);

                @Override
                public Thread newThread(Runnable r) {return new Thread(r, "NettyServerCodecThread_" + this.threadIndex.incrementAndGet());
                }
            });

3),应用 Netty ServerBootstrap 服务端启动类构建服务端

        ServerBootstrap childHandler =
            this.serverBootstrap.group(this.eventLoopGroupBoss, this.eventLoopGroupSelector)// 指定 boss 与 worker 两个线程组
                .channel(useEpoll() ? EpollServerSocketChannel.class : NioServerSocketChannel.class)// 指定通道类型
                .option(ChannelOption.SO_BACKLOG, 1024)
                .option(ChannelOption.SO_REUSEADDR, true)
                .option(ChannelOption.SO_KEEPALIVE, false)
                .childOption(ChannelOption.TCP_NODELAY, true)
                .childOption(ChannelOption.SO_SNDBUF, nettyServerConfig.getServerSocketSndBufSize())
                .childOption(ChannelOption.SO_RCVBUF, nettyServerConfig.getServerSocketRcvBufSize())
                .localAddress(new InetSocketAddress(this.nettyServerConfig.getListenPort()))// 绑定到服务端指定的 IP/PORT
                .childHandler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    public void initChannel(SocketChannel ch) throws Exception {ch.pipeline()
                            .addLast(defaultEventExecutorGroup, HANDSHAKE_HANDLER_NAME,
                                new HandshakeHandler(TlsSystemConfig.tlsMode))
                            .addLast(defaultEventExecutorGroup,
                                new NettyEncoder(),
                                new NettyDecoder(),
                                new IdleStateHandler(0, 0, nettyServerConfig.getServerChannelMaxIdleTimeSeconds()),
                                new NettyConnectManageHandler(),
                                new NettyServerHandler());
                    }
                });

4),调用 ServerBootstrap 的 bind 办法绑定到指定端口

ChannelFuture sync = this.serverBootstrap.bind().sync();

四,通信协议

在 Client 和 Server 之间实现一次音讯发送时,须要对发送的音讯进行一个协定约定。协定内容次要能够分为以下 4 局部:

(1) 音讯长度:总长度,四个字节存储,占用一个 int 类型;
(2) 序列化类型 & 音讯头长度:同样占用一个 int 类型,第一个字节示意序列化类型,前面三个字节示意音讯头长度;
(3) 音讯头数据:通过序列化后的音讯头数据;
(4) 音讯主体数据:音讯主体的二进制字节数据内容;

待补充内容:
netty 的二次编码
线程隔离

正文完
 0