Netty服务端和客户端

36次阅读

共计 5269 个字符,预计需要花费 14 分钟才能阅读完成。

欢迎关注公众号:【爱编程
如果有需要后台回复 2019 赠送 1T 的学习资料 哦!!

本文是基于 Netty4.1.36 进行分析

服务端

Netty 服务端的启动代码基本都是如下:

private void start() throws Exception {final EchoServerHandler serverHandler = new EchoServerHandler();
        /**
         * NioEventLoop 并不是一个纯粹的 I / O 线程,它除了负责 I / O 的读写之外
         * 创建了两个 NioEventLoopGroup,* 它们实际是两个独立的 Reactor 线程池。* 一个用于接收客户端的 TCP 连接,* 另一个用于处理 I / O 相关的读写操作,或者执行系统 Task、定时任务 Task 等。*/
        EventLoopGroup bossGroup = new NioEventLoopGroup();
        EventLoopGroup childGroup = new NioEventLoopGroup();

        try {
            //ServerBootstrap 负责初始化 netty 服务器,并且开始监听端口的 socket 请求
            ServerBootstrap b = new ServerBootstrap();
            b.group(bossGroup, childGroup)
                    .channel(NioServerSocketChannel.class)
                    .localAddress(new InetSocketAddress(port))
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel socketChannel) throws Exception {
//                            为监听客户端 read/write 事件的 Channel 添加用户自定义的 ChannelHandler
                            socketChannel.pipeline().addLast(serverHandler);
                        }
                    });

            ChannelFuture f = b.bind().sync();

            f.channel().closeFuture().sync();} catch (InterruptedException e) {e.printStackTrace();
        } finally {bossGroup.shutdownGracefully().sync();
            childGroup.shutdownGracefully().sync();
        }
    }

从上图的代码可以总结为以下几个步骤:

1、创建 ServerBootStrap 实例
2、设置并绑定 Reactor 线程池:EventLoopGroup,EventLoop 就是处理所有注册到本线程的 Selector 上面的 Channel
3、设置并绑定服务端的 channel
4、5、创建处理网络事件的 ChannelPipeline 和 handler,网络时间以流的形式在其中流转,handler 完成多数的功能定制:比如编解码 SSl 安全认证
6、绑定并启动监听端口
7、当轮训到准备就绪的 channel 后,由 Reactor 线程:NioEventLoop 执行 pipline 中的方法,最终调度并执行 channelHandler

服务端创建时序图

ServerBootStrap 引导启动服务端

它就是主要引导启动服务端,工作包括以下:

  • 1. 创建服务端 Channel
  • 2. 初始化服务端 Channel
  • 3. 将 Channel 注册到 selector
  • 4. 端口绑定

1. 创建服务端 Channel

流程:
首先从用户代码的 bind()其实就是 AbstractBootstrap.bind(), 然后通过 反射 工厂将用户通过 b.channel(NioServerSocketChannel.class)传入的 NioServerSocketChannel 通过调用底层的 jdk 的 SelectorProvider 创建 channel,同时也接着创建好对应的ChannelPipeline
详情可以参考下图,自己去查看一下源码:

2. 初始化服务端 Channel

主要工作如下:

1)设置的 option 缓存到 NioServerSocketChannelConfig 里
2)设置的 attr 设置到 channel 里
3)保存配置的 childOptions,配置的 childAttrs 到 ServerBootstrapAcceptor 里
4)往 NioSocketChannel 的 pipeline 中添加一个ServerBootstrapAcceptor

主要的核心源码如下:

 @Override
    void init(Channel channel) throws Exception {final Map<ChannelOption<?>, Object> options = options0();
        synchronized (options) {setChannelOptions(channel, options, logger);
        }

        final Map<AttributeKey<?>, Object> attrs = attrs0();
        synchronized (attrs) {for (Entry<AttributeKey<?>, Object> e: attrs.entrySet()) {@SuppressWarnings("unchecked")
                AttributeKey<Object> key = (AttributeKey<Object>) e.getKey();
                channel.attr(key).set(e.getValue());
            }
        }

        ChannelPipeline p = channel.pipeline();

        final EventLoopGroup currentChildGroup = childGroup;
        final ChannelHandler currentChildHandler = childHandler;
        final Entry<ChannelOption<?>, Object>[] currentChildOptions;
        final Entry<AttributeKey<?>, Object>[] currentChildAttrs;
        synchronized (childOptions) {currentChildOptions = childOptions.entrySet().toArray(newOptionArray(0));
        }
        synchronized (childAttrs) {currentChildAttrs = childAttrs.entrySet().toArray(newAttrArray(0));
        }

        p.addLast(new ChannelInitializer<Channel>() {
            @Override
            public void initChannel(final Channel ch) throws Exception {final ChannelPipeline pipeline = ch.pipeline();
                ChannelHandler handler = config.handler();
                if (handler != null) {pipeline.addLast(handler);
                }

                ch.eventLoop().execute(new Runnable() {
                    @Override
                    public void run() {
                        pipeline.addLast(new ServerBootstrapAcceptor(ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));
                    }
                });
            }
        });
    }

小结:
总体如上面工作流程所述。
特别地建议:查看 ServerBootstrapAcceptor 源码,你可以发现 ServerBootstrapAcceptor 在 channelRead 事件触发的时候(也就有客户端连接的时候),把 childHandler 加到 childChannel Pipeline 的末尾,设置 childHandler 的 options 和 attrs,最后把 childHandler 注册进 childGroup

3. 将 Channel 注册到 selector

注册过程如下图

小结:
Channel 注册过程所做的工作就是将 Channel 与对应的 EventLoop 关联。

1). 每个 Channel 都会关联一个特定的 EventLoop, 并且这个 Channel 中的所有 IO 操作都是在这个 EventLoop 中执行的;

2). 当关联好 Channel 和 EventLoop 后, 会继续调用底层的 Java NIO SocketChannel 的 register 方法, 将底层的 Java NIO SocketChannel 注册到指定的 selector 中.

通过这两步, 就完成了 Netty Channel 的注册过程.

4. 端口绑定

端口绑定的源码流程基本如下图,详情可以还是你自己读一下源码比较好点。

小结:
其实 netty 端口绑定是调用 jdk 的 javaChannel().bind(localAddress, config.getBacklog()); 进行绑定,然后 TCP 链路建立成功,Channel 激活事件,通过 channelPipeline 进行传播。

客户端

客户端启动的常规代码如下:

  private void start() throws Exception {

        /**
         * Netty 用于接收客户端请求的线程池职责如下。*(1)接收客户端 TCP 连接,初始化 Channel 参数;*(2)将链路状态变更事件通知给 ChannelPipeline
         */
        EventLoopGroup group = new NioEventLoopGroup();
        try {Bootstrap b = new Bootstrap();
            b.group(group)
                    .channel(NioSocketChannel.class)
                    .remoteAddress(new InetSocketAddress(host,port))
                    .handler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel socketChannel) throws Exception {socketChannel.pipeline().addLast(new EchoClientHandler());
                        }
                    });
            // 绑定端口
            ChannelFuture f = b.connect().sync();

            f.channel().closeFuture().sync();} catch (Exception e) {group.shutdownGracefully().sync();}


    }

流程:

1. 用户线程创建 Bootstrap 实例,通过 API 设置创建客户端相关的参数,异步发起客户端连接。
2. 创建处理客户端连接、I/ O 读写的 Reactor 线程组 NioEventLoopGroup,默认为 CPU 内核数的 2 倍。
3. 通过 Bootstrap 的 ChannelFactory 和用户指定的 Channel 类型创建用于客户端 NioSocketChannel,它的功能类似于 JDK NIO 类库提供的 SocketChannel
4. 创建默认的 Channel Handler Pipeline,用于调度和执行网路事件。
5. 异步发起 TCP 连接,判断连接是否成功。如果成功,则直接将 NioSocketChannel 注册到多路复用器上,监听读操作位,用于数据包读取和消息发送,如果没有立即连接成功,则注册连接监听为到多路复用器,等待连接结果。
6. 注册对应的网络监听状态为到多路复用器。
7. 由多路复用器在 I / O 现场中轮询个 Channel,处理连接结果。
8. 如果连接成功,设置 Future 结果,发送连接成功事件,触发 ChannelPipeline 执行。
9. 由 ChannelPipeline 调度执行系统和用户的 ChannelHandler,执行逻辑。

源码调用流程如下图:

小结:
客户端是如何发起 TCP 连接的?

如下图:

特别提醒:
在 AbstractChannelHandlerContext.connect()#findContextOutbound 这步操作是返回的结果 next 其实是 头节点 ,也就是说在下一步 next.invokeConnect() 这里的 next 就是 头节点,所以最终是调用HeadContext .connect()

总结

本文主要讲述 netty 服务端和客户端的简单工作流程。
具体服务端与客户端如何通信,以及内存管理等方面的知识下一次再写。

最后

如果对 Java、大数据感兴趣请长按二维码关注一波,我会努力带给你们价值。觉得对你哪怕有一丁点帮助的请帮忙点个赞或者转发哦。
关注公众号 【爱编码】,回复2019 有相关资料哦。

正文完
 0