乐趣区

关于netty:抓到-Netty-一个-Bug-顺带来透彻地聊一下-Netty-是如何高效接收网络连接的

欢送关注公众号:bin 的技术小屋,浏览公众号原文

本系列 Netty 源码解析文章基于 4.1.56.Final版本

对于一个高性能网络通讯框架来说,最最重要也是最外围的工作就是如何高效的接管客户端连贯,这就好比咱们开了一个饭店,那么迎接客人就是饭店最重要的工作,咱们要先把客人迎接进来,不能让客人一看人多就走掉,只有客人进来了,哪怕菜做的慢一点也没关系。

本文笔者就来为大家介绍下 netty 这块最外围的内容,看看 netty 是如何高效的接管客户端连贯的。

下图为笔者在一个月黑风高天空显得那么高深边远的夜晚,闲来无事,于是捧起 Netty 对于如何接管连贯这部分源码细细品读的时候,意外的发现了一个影响 Netty 接管连贯吞吐的一个 Bug。

于是笔者就在 Github 提了一个 Issue#11708,论述了下这个 Bug 产生的起因以及导致的后果并和 Netty 的作者一起探讨了下修复措施。如上图所示。

Issue#11708:https://github.com/netty/nett…

这里先不具体解释这个 Issue,也不倡议大家当初就关上这个 Issue 查看,笔者会在本文的介绍中随着源码深刻的解读缓缓的为大家一层一层地拨开迷雾。

之所以在文章的结尾把这个拎进去,笔者是想让大家带着狐疑,扫视,观赏,崇拜,敬畏的态度来一起品读世界顶级程序员编写的代码。由衷的感激他们在这一畛域做出的奉献。

好了,问题抛出来后,咱们就带着这个疑难来开始本文的内容吧~~~

前文回顾

依照老规矩,再开始本文的内容之前,咱们先来回顾下前边几篇文章的概要内容帮忙大家梳理一个框架全貌进去。

笔者这里再次想和读者敌人们强调的是本文能够独立观看,并不依赖前边系列文章的内容,只是大家如果对相干细节局部感兴趣的话,能够在浏览完本文之后在去回看相干文章。

在前边的系列文章中,笔者为大家介绍了驱动 Netty 整个框架运行的外围引擎 Reactor 的创立,启动,运行的全流程。从当初开始 Netty 的整个外围框架就开始运转起来开始工作了,本文要介绍的次要内容就是 Netty 在启动之后要做的第一件事件:监听端口地址,高效接管客户端连贯。

在《聊聊 Netty 那些事儿之从内核角度看 IO 模型》一文中,咱们是从整个网络框架的基石 IO 模型的角度整体论述了下 Netty 的 IO 线程模型。

而 Netty 中的 Reactor 正是 IO 线程在 Netty 中的模型定义。Reactor 在 Netty 中是以 Group 的模式呈现的,分为:

  • 主 Reactor 线程组也就是咱们在启动代码中配置的EventLoopGroup bossGroup,main reactor group 中的 reactor 次要负责监听客户端连贯事件,高效的解决客户端连贯。也是本文咱们要介绍的重点。
  • 从 Reactor 线程组也就是咱们在启动代码中配置的EventLoopGroup workerGroup,sub reactor group 中的 reactor 次要负责解决客户端连贯上的 IO 事件,以及异步工作的执行。

最初咱们得出 Netty 的整个 IO 模型如下:

本文咱们探讨的重点就是 MainReactorGroup 的外围工作上图中所示的步骤 1,步骤 2,步骤 3。

在从整体上介绍完 Netty 的 IO 模型之后,咱们又在《Reactor 在 Netty 中的实现(创立篇)》中残缺的介绍了 Netty 框架的骨架主从 Reactor 组的搭建过程,论述了 Reactor 是如何被创立进去的,并介绍了它的外围组件如下图所示:

  • thread即为 Reactor 中的 IO 线程,次要负责监听 IO 事件,解决 IO 工作,执行异步工作。
  • selector则是 JDK NIO 对操作系统底层 IO 多路复用技术实现的封装。用于监听 IO 就绪事件。
  • taskQueue用于保留 Reactor 须要执行的异步工作,这些异步工作能够由用户在业务线程中向 Reactor 提交,也能够是 Netty 框架提交的一些本身外围的工作。
  • scheduledTaskQueue则是保留 Reactor 中执行的定时工作。代替了原有的工夫轮来执行延时工作。
  • tailQueue保留了在 Reactor 须要执行的一些尾部收尾工作,在一般工作执行完后 Reactor 线程会执行尾部工作,比方对 Netty 的运行状态做一些统计数据,例如工作循环的耗时、占用物理内存的大小等等

在骨架搭建结束之后,咱们随后又在在《具体图解 Netty Reactor 启动全流程》》一文中介绍了 本文的配角服务端 NioServerSocketChannel 的创立,初始化,绑定端口地址,向 main reactor 注册监听 OP_ACCEPT 事件 的残缺过程

main reactor 如何解决 OP_ACCEPT 事件将会是本文的次要内容。

自此 Netty 框架的 main reactor group 曾经启动结束,开始筹备监听 OP_accept 事件,当客户端连贯上来之后,OP_ACCEPT 事件沉闷,main reactor 开始解决 OP_ACCEPT 事件接管客户端连贯了。

而 netty 中的 IO 事件分为:OP_ACCEPT 事件,OP_READ 事件,OP_WRITE 事件和 OP_CONNECT 事件,netty 对于 IO 事件的监听和解决对立封装在 Reactor 模型中,这四个 IO 事件的处理过程也是咱们后续文章中要独自拿进去介绍的,本文咱们聚焦 OP_ACCEPT 事件的解决。

而为了让大家可能对 IO 事件的解决有一个完整性的意识,笔者写了《一文聊透 Netty 外围引擎 Reactor 的运行架构》这篇文章,在文章中具体介绍了 Reactor 线程的整体运行框架。

Reactor 线程会在一个死循环中 996 不停的运行,在循环中会一直的轮询监听 Selector 上的 IO 事件,当 IO 事件沉闷后,Reactor 从 Selector 上被唤醒转去执行 IO 就绪事件的解决,在这个过程中咱们引出了上述四种 IO 事件的解决入口函数。

    private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {
        // 获取 Channel 的底层操作类 Unsafe
        final AbstractNioChannel.NioUnsafe unsafe = ch.unsafe();
        if (!k.isValid()) {...... 如果 SelectionKey 曾经生效则敞开对应的 Channel......}

        try {
            // 获取 IO 就绪事件
            int readyOps = k.readyOps();
            // 解决 Connect 事件
            if ((readyOps & SelectionKey.OP_CONNECT) != 0) {int ops = k.interestOps();
                // 移除对 Connect 事件的监听,否则 Selector 会始终告诉
                ops &= ~SelectionKey.OP_CONNECT;
                k.interestOps(ops);
                // 触发 channelActive 事件处理 Connect 事件
                unsafe.finishConnect();}

            // 解决 Write 事件
            if ((readyOps & SelectionKey.OP_WRITE) != 0) {ch.unsafe().forceFlush();}

             // 解决 Read 事件或者 Accept 事件
            if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {unsafe.read();
            }
        } catch (CancelledKeyException ignored) {unsafe.close(unsafe.voidPromise());
        }
    }

本文笔者将会为大家重点介绍 OP_ACCEPT 事件 的解决入口函数 unsafe.read() 的整个源码实现。

当客户端连贯实现三次握手之后,main reactor 中的 selector 产生 OP_ACCEPT 事件 沉闷,main reactor 随即被唤醒,来到了 OP_ACCEPT 事件 的解决入口函数开始接管客户端连贯。

1. Main Reactor 解决 OP_ACCEPT 事件

Main Reactor 轮询到 NioServerSocketChannel 上的 OP_ACCEPT 事件 就绪时,Main Reactor 线程就会从 JDK Selector 上的阻塞轮询 APIselector.select(timeoutMillis)调用中返回。转而去解决 NioServerSocketChannel 上的OP_ACCEPT 事件

public final class NioEventLoop extends SingleThreadEventLoop {private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {final AbstractNioChannel.NioUnsafe unsafe = ch.unsafe();
        .............. 省略.................

        try {int readyOps = k.readyOps();

            if ((readyOps & SelectionKey.OP_CONNECT) != 0) {.............. 解决 OP_CONNECT 事件.................}


            if ((readyOps & SelectionKey.OP_WRITE) != 0) {.............. 解决 OP_WRITE 事件.................}


            if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
                // 本文重点解决 OP_ACCEPT 事件
                unsafe.read();}
        } catch (CancelledKeyException ignored) {unsafe.close(unsafe.voidPromise());
        }
    }

}
  • 解决 IO 就绪事件的入口函数 processSelectedKey 中的参数 AbstractNioChannel ch 正是 Netty 服务端NioServerSocketChannel。因为此时的执行线程为 main reactor 线程,而 main reactor 上注册的正是 netty 服务端 NioServerSocketChannel 负责监听端口地址,接管客户端连贯。
  • 通过 ch.unsafe() 获取到的 NioUnsafe 操作类正是 NioServerSocketChannel 中对底层 JDK NIO ServerSocketChannel 的 Unsafe 底层操作类。

Unsafe 接口 是 Netty 对 Channel 底层操作行为的封装,比方 NioServerSocketChannel 的底层 Unsafe 操作类干的事件就是 绑定端口地址 解决 OP_ACCEPT 事件

这里咱们看到,Netty 将 OP_ACCEPT 事件 解决的入口函数封装在 NioServerSocketChannel 里的底层操作类 Unsafe 的 read 办法中。

而 NioServerSocketChannel 中的 Unsafe 操作类实现类型为 NioMessageUnsafe 定义在上图继承构造中的AbstractNioMessageChannel 父类中

上面咱们到 NioMessageUnsafe#read 办法中来看下 Netty 对 OP_ACCPET 事件 的具体处理过程:

2. 接管客户端连贯外围流程框架总览

咱们还是依照老规矩,先从整体上把整个 OP_ACCEPT 事件的逻辑解决框架提取进去,让大家先总体仰视下流程全貌,而后在针对每个外围点位进行各个击破。

main reactor 线程是在一个 do...while{...} 循环 read loop 中一直的调用 JDK NIO serverSocketChannel.accept()办法来接管实现三次握手的客户端连贯 NioSocketChannel 的,并将接管到的客户端连贯 NioSocketChannel 长期保留在 List<Object> readBuf 汇合中,后续会服务端 NioServerSocketChannel 的 pipeline 中通过 ChannelRead 事件来传递,最终会在 ServerBootstrapAcceptor 这个 ChannelHandler 中被解决初始化,并将其注册到 Sub Reator Group 中。

这里的 read loop 循环会被限定只能读取 16 次,当 main reactor 从 NioServerSocketChannel 中读取客户端连贯 NioSocketChannel 的次数达到16 次 之后,无论此时是否还有客户端连贯都不能在持续读取了。

因为咱们在《一文聊透 Netty 外围引擎 Reactor 的运行架构》一文中提到,netty 对 reactor 线程压迫的比拟狠,要干的事件很多,除了要监听轮询 IO 就绪事件,解决 IO 就绪事件,还须要执行用户和 netty 框架本省提交的异步工作和定时工作。

所以这里的 main reactor 线程不能在 read loop 中无限度的执行上来,因为还须要调配工夫去执行异步工作,不能因为无限度的接管客户端连贯而耽搁了异步工作的执行。所以这里将 read loop 的循环次数限定为 16 次。

如果 main reactor 线程在 read loop 中读取客户端连贯 NioSocketChannel 的次数曾经满了 16 次,即便此时还有客户端连贯未接管,那么 main reactor 线程也不会再去接管了,而是转去执行异步工作,当异步工作执行结束后,还会在回来执行残余接管连贯的工作。

main reactor 线程退出 read loop 循环的条件有两个:

  1. 在限定的 16 次读取中,曾经没有新的客户端连贯要接管了。退出循环。
  2. 从 NioServerSocketChannel 中读取客户端连贯的次数达到了 16 次,无论此时是否还有客户端连贯都须要退出循环。

以上就是 Netty 在接管客户端连贯时的整体外围逻辑,上面笔者将这部分逻辑的外围源码实现框架提取进去,不便大家根据上述外围逻辑与源码中的解决模块对应起来,还是那句话,这里只须要总体把握外围解决流程,不须要读懂每一行代码,笔者会在文章的后边分模块来各个击破它们。

public abstract class AbstractNioMessageChannel extends AbstractNioChannel {

  private final class NioMessageUnsafe extends AbstractNioUnsafe {

        // 寄存连贯建设后,创立的客户端 SocketChannel
        private final List<Object> readBuf = new ArrayList<Object>();

        @Override
        public void read() {
            // 必须在 Main Reactor 线程中执行
            assert eventLoop().inEventLoop();
            // 留神上面的 config 和 pipeline 都是服务端 ServerSocketChannel 中的
            final ChannelConfig config = config();
            final ChannelPipeline pipeline = pipeline();
            // 创立接收数据 Buffer 分配器(用于调配容量大小适合的 byteBuffer 用来包容接收数据)// 在接管连贯的场景中,这里的 allocHandle 只是用于管制 read loop 的循环读取创立连贯的次数。final RecvByteBufAllocator.Handle allocHandle = unsafe().recvBufAllocHandle();
            allocHandle.reset(config);

            boolean closed = false;
            Throwable exception = null;
            try {
                try {
                    do {
                        // 底层调用 NioServerSocketChannel->doReadMessages 创立客户端 SocketChannel
                        int localRead = doReadMessages(readBuf);

                        // 已无新的连贯可接管则退出 read loop
                        if (localRead == 0) {break;}
                        if (localRead < 0) {
                            closed = true;
                            break;
                        }
                        // 统计在以后事件循环中曾经读取到得 Message 数量(创立连贯的个数)allocHandle.incMessagesRead(localRead);
                    } while (allocHandle.continueReading());// 判断是否曾经读满 16 次
                } catch (Throwable t) {exception = t;}

                int size = readBuf.size();
                for (int i = 0; i < size; i ++) {
                    readPending = false;
                    // 在 NioServerSocketChannel 对应的 pipeline 中流传 ChannelRead 事件
                    // 初始化客户端 SocketChannel,并将其绑定到 Sub Reactor 线程组中的一个 Reactor 上
                    pipeline.fireChannelRead(readBuf.get(i));
                }
                // 革除本次 accept 创立的客户端 SocketChannel 汇合
                readBuf.clear();
                allocHandle.readComplete();
                // 触发 readComplete 事件流传
                pipeline.fireChannelReadComplete();
                .................... 省略............
            } finally {.................... 省略............}
        }
    }
  }
}

这里首先要通过断言 assert eventLoop().inEventLoop() 确保解决接管客户端连贯的线程必须为 Main Reactor 线程。

而 main reactor 中次要注册的是服务端 NioServerSocketChannel,次要负责解决OP_ACCEPT 事件,所以以后 main reactor 线程是在 NioServerSocketChannel 中执行接管连贯的工作。

所以这里咱们通过 config() 获取到的是 NioServerSocketChannel 的属性配置类NioServerSocketChannelConfig, 它是在 Reactor 的启动阶段被创立进去的。

    public NioServerSocketChannel(ServerSocketChannel channel) {
        // 父类 AbstractNioChannel 中保留 JDK NIO 原生 ServerSocketChannel 以及要监听的事件 OP_ACCEPT
        super(null, channel, SelectionKey.OP_ACCEPT);
        //DefaultChannelConfig 中设置用于 Channel 接收数据用的 buffer->AdaptiveRecvByteBufAllocator
        config = new NioServerSocketChannelConfig(this, javaChannel().socket());
    }

同理这里通过 pipeline() 获取到的也是 NioServerSocketChannel 中的pipeline。它会在 NioServerSocketChannel 向 main reactor 注册胜利之后被初始化。

前边提到 main reactor 线程会被限定只能在 read loop 中向 NioServerSocketChannel 读取 16 次客户端连贯,所以在开始 read loop 之前,咱们须要创立一个可能保留记录读取次数的对象,在每次 read loop 循环之后,能够依据这个对象来判断是否完结 read loop。

这个对象就是这里的 RecvByteBufAllocator.Handle allocHandle 专门用于统计 read loop 中接管客户端连贯的次数,以及判断是否该完结 read loop 转去执行异步工作。

当这所有准备就绪之后,main reactor 线程就开始在 do{....}while(...) 循环中接管客户端连贯了。

在 read loop 中通过调用 doReadMessages 函数 接管实现三次握手的客户端连贯,底层会调用到 JDK NIO ServerSocketChannel 的 accept 办法,从内核全连贯队列中取出客户端连贯。

返回值 localRead 示意接管到了多少客户端连贯,客户端连贯通过 accept 办法只会一个一个的接管,所以这里的 localRead 失常状况下都会返回 1,当localRead <= 0 时意味着曾经没有新的客户端连贯能够接管了,本次 main reactor 接管客户端的工作到这里就完结了,跳出 read loop。开始新的一轮 IO 事件的监听解决。

    public static SocketChannel accept(final ServerSocketChannel serverSocketChannel) throws IOException {
        try {return AccessController.doPrivileged(new PrivilegedExceptionAction<SocketChannel>() {
                @Override
                public SocketChannel run() throws IOException {return serverSocketChannel.accept();
                }
            });
        } catch (PrivilegedActionException e) {throw (IOException) e.getCause();}
    }

随后会将接管到的客户端连贯占时寄存到 List<Object> readBuf 汇合中。

  private final class NioMessageUnsafe extends AbstractNioUnsafe {

        // 寄存连贯建设后,创立的客户端 SocketChannel
        private final List<Object> readBuf = new ArrayList<Object>();}

调用 allocHandle.incMessagesRead 统计本次事件循环中接管到的客户端连贯个数,最初在 read loop 开端通过 allocHandle.continueReading 判断是否达到了限定的 16 次。从而决定 main reactor 线程是持续接管客户端连贯还是转去执行异步工作。

main reactor 线程退出 read loop 的两个条件:

  1. 在限定的 16 次读取中,曾经没有新的客户端连贯要接管了。退出循环。
  2. 从 NioServerSocketChannel 中读取客户端连贯的次数达到了 16 次,无论此时是否还有客户端连贯都须要退出循环。

当满足以上两个退出条件时,main reactor 线程就会退出 read loop,因为在 read loop 中接管到的客户端连贯全副暂存在 List<Object> readBuf 汇合中, 随后开始遍历 readBuf,在 NioServerSocketChannel 的 pipeline 中流传 ChannelRead 事件。

                int size = readBuf.size();
                for (int i = 0; i < size; i ++) {
                    readPending = false;
                    //NioServerSocketChannel 对应的 pipeline 中流传 read 事件
                    //io.netty.bootstrap.ServerBootstrap.ServerBootstrapAcceptor.channelRead
                    // 初始化客户端 SocketChannel,并将其绑定到 Sub Reactor 线程组中的一个 Reactor 上
                    pipeline.fireChannelRead(readBuf.get(i));
                }

最终 pipeline 中的 ChannelHandler(ServerBootstrapAcceptor)会响应 ChannelRead 事件,并在相应回调函数中初始化客户端 NioSocketChannel,并将其注册到 Sub Reactor Group 中。尔后客户端 NioSocketChannel 绑定到的 sub reactor 就开始监听解决客户端连贯上的读写事件了。

Netty 整个接管客户端的逻辑过程如下图步骤 1,2,3 所示。

以上内容就是笔者提取进去的整体流程框架,上面咱们来将其中波及到的重要外围模块拆开,一个一个具体解读下。

3. RecvByteBufAllocator 简介

Reactor 在解决对应 Channel 上的 IO 数据时,都会采纳一个 ByteBuffer 来接管 Channel 上的 IO 数据。而本大节要介绍的 RecvByteBufAllocator 正是用来调配 ByteBuffer 的一个分配器。

还记得这个 RecvByteBufAllocator 在哪里被创立的吗??

在《聊聊 Netty 那些事儿之 Reactor 在 Netty 中的实现 (创立篇)》一文中,在介绍NioServerSocketChannel 的创立过程中提到,对应 Channel 的配置类 NioServerSocketChannelConfig 也会随着 NioServerSocketChannel 的创立而创立。

    public NioServerSocketChannel(ServerSocketChannel channel) {super(null, channel, SelectionKey.OP_ACCEPT);
        config = new NioServerSocketChannelConfig(this, javaChannel().socket());
    }

在创立 NioServerSocketChannelConfig 的过程中会创立RecvByteBufAllocator

   public DefaultChannelConfig(Channel channel) {this(channel, new AdaptiveRecvByteBufAllocator());
    }

这里咱们看到 NioServerSocketChannel 中的 RecvByteBufAllocator 理论类型为AdaptiveRecvByteBufAllocator,顾名思义,这个类型的 RecvByteBufAllocator 能够依据 Channel 上每次到来的 IO 数据大小来自适应动静调整 ByteBuffer 的容量。

对于服务端 NioServerSocketChannel 来说,它上边的 IO 数据就是客户端的连贯,它的长度和类型都是固定的,所以在接管客户端连贯的时候并不需要这样的一个 ByteBuffer 来接管,咱们会将接管到的客户端连贯寄存在 List<Object> readBuf 汇合中

对于客户端 NioSocketChannel 来说,它上边的 IO 数据时客户端发送来的网络数据,长度是不定的,所以才会须要这样一个能够依据每次 IO 数据的大小来自适应动静调整容量的 ByteBuffer 来接管。

那么看起来这个 RecvByteBufAllocator 和本文的主题不是很关联,因为在接管连贯的过程中并不会怎么用到它,这个类笔者还会在前面的文章中具体介绍,之所以这里把它拎进去独自介绍是因为它和本文结尾提到的 Bug 有关系,这个 Bug 就是由这个类引起的。

3.1 RecvByteBufAllocator.Handle 的获取

在本文中,咱们是通过 NioServerSocketChannel 中的 unsafe 底层操作类来获取 RecvByteBufAllocator.Handle 的

final RecvByteBufAllocator.Handle allocHandle = unsafe().recvBufAllocHandle();
protected abstract class AbstractUnsafe implements Unsafe {
        @Override
        public RecvByteBufAllocator.Handle recvBufAllocHandle() {if (recvHandle == null) {recvHandle = config().getRecvByteBufAllocator().newHandle();
            }
            return recvHandle;
        }
}

咱们看到最终会在 NioServerSocketChannel 的配置类 NioServerSocketChannelConfig 中获取到AdaptiveRecvByteBufAllocator

public class DefaultChannelConfig implements ChannelConfig {
    // 用于 Channel 接收数据用的 buffer 分配器  类型为 AdaptiveRecvByteBufAllocator
    private volatile RecvByteBufAllocator rcvBufAllocator;
}

AdaptiveRecvByteBufAllocator 中会创立自适应动静调整容量的 ByteBuffer 分配器。

public class AdaptiveRecvByteBufAllocator extends DefaultMaxMessagesRecvByteBufAllocator {

    @Override
    public Handle newHandle() {return new HandleImpl(minIndex, maxIndex, initial);
    }
    
    private final class HandleImpl extends MaxMessageHandle {................. 省略................}
}

这里的 newHandle 办法返回的具体类型为 MaxMessageHandle ,这个MaxMessageHandle 里边保留了每次从 Channel 中读取 IO 数据 的容量指标,不便下次读取时调配适合大小的buffer

每次在应用 allocHandle 前须要调用 allocHandle.reset(config); 重置里边的统计指标。

    public abstract class MaxMessageHandle implements ExtendedHandle {
        private ChannelConfig config;
        // 每次事件轮询时,最多读取 16 次
        private int maxMessagePerRead;
        // 本次事件轮询总共读取的 message 数, 这里指的是接管连贯的数量
        private int totalMessages;
        // 本次事件轮询总共读取的字节数
        private int totalBytesRead;

       @Override
        public void reset(ChannelConfig config) {
            this.config = config;
            // 默认每次最多读取 16 次
            maxMessagePerRead = maxMessagesPerRead();
            totalMessages = totalBytesRead = 0;
        }
    }
  • maxMessagePerRead:用于管制每次 read loop 里最大能够循环读取的次数,默认为 16 次,可在启动配置类 ServerBootstrap 中通过 ChannelOption.MAX_MESSAGES_PER_READ 选项设置。
ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workerGroup)
  .channel(NioServerSocketChannel.class)
  .option(ChannelOption.MAX_MESSAGES_PER_READ, 自定义次数)
  • totalMessages:用于统计 read loop 中总共接管的连贯个数,每次 read loop 循环后会调用 allocHandle.incMessagesRead 减少记录接管到的连贯个数。
        @Override
        public final void incMessagesRead(int amt) {totalMessages += amt;}
  • totalBytesRead:用于统计在 read loop 中总共接管到客户端连贯上的数据大小,这个字段次要用于 sub reactor 在接管客户端 NioSocketChannel 上的网络数据用的,本文咱们介绍的是 main reactor 接管客户端连贯,所以这里并不会用到这个字段。这个字段会在 sub reactor 每次读取完 NioSocketChannel 上的网络数据时减少记录。
        @Override
        public void lastBytesRead(int bytes) {
            lastBytesRead = bytes;
            if (bytes > 0) {totalBytesRead += bytes;}
        }

MaxMessageHandler 中还有一个十分重要的办法就是在每次 read loop 开端会调用 allocHandle.continueReading() 办法来判断读取连贯次数是否已满 16 次,来决定 main reactor 线程是否退出循环。

                  do {
                        // 底层调用 NioServerSocketChannel->doReadMessages 创立客户端 SocketChannel
                        int localRead = doReadMessages(readBuf);
                        if (localRead == 0) {break;}
                        if (localRead < 0) {
                            closed = true;
                            break;
                        }
                        // 统计在以后事件循环中曾经读取到得 Message 数量(创立连贯的个数)allocHandle.incMessagesRead(localRead);
                    } while (allocHandle.continueReading());

红框中圈进去的两个判断条件和本文主题无关,咱们这里不须要关注,笔者会在前面的文章具体介绍。

  • totalMessages < maxMessagePerRead:在本文的接管客户端连贯场景中,这个条件用于判断 main reactor 线程在 read loop 中的读取次数是否超过了 16 次。如果超过 16 次就会返回 false,main reactor 线程退出循环。
  • totalBytesRead > 0:用于判断当客户端 NioSocketChannel 上的 OP_READ 事件沉闷时,sub reactor 线程在 read loop 中是否读取到了网络数据。

以上内容就是 RecvByteBufAllocator.Handle 在接管客户端连贯场景下的作用,大家这里认真看下这个 allocHandle.continueReading() 办法退出循环的判断条件,再联合整个 do{....}while(...) 接管连贯循环体,感触下是否哪里有些不对劲?Bug 行将呈现~~~

4. 啊哈!!Bug ! !

netty 不论是在本文中解决接管客户端连贯的场景还是在解决接管客户端连贯上的网络数据场景都会在一个 do{....}while(...) 循环 read loop 中一直的解决。

同时也都会利用在上一大节中介绍的 RecvByteBufAllocator.Handle 来记录每次 read loop 接管到的连贯个数和从连贯上读取到的网络数据大小。

从而在 read loop 的开端都会通过 allocHandle.continueReading() 办法判断是否应该退出 read loop 循环完结连贯的接管流程或者是完结连贯上数据的读取流程。

无论是用于接管客户端连贯的 main reactor 也好还是用于接管客户端连贯上的网络数据的 sub reactor 也好,它们的运行框架都是一样的,只不过是具体分工不同。

所以 netty 这里想用对立的 RecvByteBufAllocator.Handle 来解决以上两种场景。

RecvByteBufAllocator.Handle 中的 totalBytesRead 字段次要记录 sub reactor 线程在解决客户端 NioSocketChannel 中 OP_READ 事件沉闷时,总共在 read loop 中读取到的网络数据,而这里是 main reactor 线程在接管客户端连贯所以这个字段并不会被设置。totalBytesRead 字段的值在本文中永远会是0

所以无论同时有多少个客户端并发连贯到服务端上,在接管连贯的这个 read loop 中永远只会承受一个连贯就会退出循环,因为 allocHandle.continueReading() 办法 中的判断条件 totalBytesRead > 0 永远会返回false

                  do {
                        // 底层调用 NioServerSocketChannel->doReadMessages 创立客户端 SocketChannel
                        int localRead = doReadMessages(readBuf);
                        if (localRead == 0) {break;}
                        if (localRead < 0) {
                            closed = true;
                            break;
                        }
                        // 统计在以后事件循环中曾经读取到得 Message 数量(创立连贯的个数)allocHandle.incMessagesRead(localRead);
                    } while (allocHandle.continueReading());

而 netty 的本意是在这个 read loop 循环中尽可能多的去接管客户端的并发连贯,同时又不影响 main reactor 线程执行异步工作。然而因为这个 Bug,main reactor 在这个循环中只执行一次就完结了。这也肯定水平上就影响了 netty 的吞吐

让咱们设想下这样的一个场景,当有 16 个客户端同时并发连贯到了服务端,这时 NioServerSocketChannel 上的 OP_ACCEPT 事件 沉闷,main reactor 从 Selector 上被唤醒,随后执行 OP_ACCEPT 事件 的解决。

public final class NioEventLoop extends SingleThreadEventLoop {
    @Override
    protected void run() {
        int selectCnt = 0;
        for (;;) {
            try { 
                int strategy;
                try {strategy = selectStrategy.calculateStrategy(selectNowSupplier, hasTasks());
                    switch (strategy) {
                    case SelectStrategy.CONTINUE:                  
                          ............ 省略.........
                    case SelectStrategy.BUSY_WAIT:

                          ............ 省略.........
                    case SelectStrategy.SELECT:
                            ............ 监听轮询 IO 事件.........
                    default:
                    }
                } catch (IOException e) {............ 省略.........}

                ............ 解决 IO 就绪事件.........
                ............ 执行异步工作.........
    }
}

然而因为这个 Bug 的存在,main reactor 在接管客户端连贯的这个 read loop 中只接管了一个客户端连贯就匆匆返回了。

      private final class NioMessageUnsafe extends AbstractNioUnsafe {
                    do {int localRead = doReadMessages(readBuf);
                        ......... 省略...........
                    } while (allocHandle.continueReading());
     }

而后依据下图中这个 Reactor 的运行构造去执行异步工作,随后绕一大圈又会回到 NioEventLoop#run 办法中从新发动一轮 OP_ACCEPT 事件轮询。

因为当初还有 15 个客户端并发连贯没有被接管,所以此时 Main Reactor 线程并不会在 selector.select() 上阻塞,最终绕一圈又会回到 NioMessageUnsafe#read 办法的 do{.....}while() 循环。在接管一个连贯之后又退出循环。

原本咱们能够在一次 read loop 中把这 16 个并发的客户端连贯全副接管结束的,因为这个 Bug,main reactor 须要一直的发动 OP_ACCEPT 事件的轮询,绕了很大一个圈子。同时也减少了许多不必要的 selector.select()零碎调用开销

这时大家在看这个 Issue#11708 中的探讨是不是就清晰很多了~~

Issue#11708:https://github.com/netty/nett…

4.1 Bug 的修复

笔者在写这篇文章的时候,Netty 最新版本是 4.1.68.final,这个 Bug 在 4.1.69.final 中被修复。

因为该 Bug 产生的起因正是因为服务端 NioServerSocketChannel(用于监听端口地址和接管客户端连贯)和 客户端 NioSocketChannel(用于通信)中的 Config 配置类混用了同一个 ByteBuffer 分配器 AdaptiveRecvByteBufAllocator 而导致的。

所以在新版本修复中专门为服务端 ServerSocketChannel 中的 Config 配置类引入了一个新的 ByteBuffer 分配器ServerChannelRecvByteBufAllocator,专门用于服务端 ServerSocketChannel 接管客户端连贯的场景。

ServerChannelRecvByteBufAllocator 的父类 DefaultMaxMessagesRecvByteBufAllocator 中引入了一个新的字段ignoreBytesRead,用于示意是否疏忽网络字节的读取,在创立服务端 Channel 配置类 NioServerSocketChannelConfig 的时候,这个字段会被赋值为true

当 main reactor 线程在 read loop 循环中接管客户端连贯的时候。

      private final class NioMessageUnsafe extends AbstractNioUnsafe {

                    do {int localRead = doReadMessages(readBuf);
                        ......... 省略...........
                    } while (allocHandle.continueReading());
     }

在 read loop 循环的开端就会采纳从 ServerChannelRecvByteBufAllocator 中创立的 MaxMessageHandle#continueReading 办法来判断读取连贯次数是否超过了 16 次。因为这里的 ignoreBytesRead == true 这回咱们就会疏忽 totalBytesRead == 0 的状况,从而使得接管连贯的 read loop 得以持续地执行上来。在一个 read loop 中一次性把 16 个连贯全副接管结束。

以上就是对这个 Bug 产生的起因,以及发现的过程,最初修复的计划一个全面的介绍,因而笔者也呈现在了 netty 4.1.69.final 版本发布公告里的 thank-list 中。哈哈,真是令人开心的一件事件~~~

通过以上对 netty 接管客户端连贯的全流程剖析和对这个 Bug 前因后果以及修复计划的介绍,大家当初肯定曾经了解了整个接管连贯的流程框架。

接下来笔者就把这个流程中波及到的一些外围模块在独自拎进去从细节动手,为大家各个击破~~~

5. doReadMessages 接管客户端连贯

public class NioServerSocketChannel extends AbstractNioMessageChannel
                             implements io.netty.channel.socket.ServerSocketChannel {

    @Override
    protected int doReadMessages(List<Object> buf) throws Exception {SocketChannel ch = SocketUtils.accept(javaChannel());

        try {if (ch != null) {buf.add(new NioSocketChannel(this, ch));
                return 1;
            }
        } catch (Throwable t) {logger.warn("Failed to create a new channel from an accepted socket.", t);

            try {ch.close();
            } catch (Throwable t2) {logger.warn("Failed to close a socket.", t2);
            }
        }

        return 0;
    }

}
  • 通过 javaChannel() 获取封装在 Netty 服务端 NioServerSocketChannel 中的JDK 原生 ServerSocketChannel
    @Override
    protected ServerSocketChannel javaChannel() {return (ServerSocketChannel) super.javaChannel();}
  • 通过 JDK NIO 原生ServerSocketChannelaccept 办法 获取 JDK NIO 原生 客户端连贯SocketChannel
    public static SocketChannel accept(final ServerSocketChannel serverSocketChannel) throws IOException {
        try {return AccessController.doPrivileged(new PrivilegedExceptionAction<SocketChannel>() {
                @Override
                public SocketChannel run() throws IOException {return serverSocketChannel.accept();
                }
            });
        } catch (PrivilegedActionException e) {throw (IOException) e.getCause();}
    }

这一步就是咱们在《聊聊 Netty 那些事儿之从内核角度看 IO 模型》介绍到的调用 监听 Socketaccept 办法,内核会基于 监听 Socket创立进去一个新的 Socket 专门用于与客户端之间的网络通信这个咱们称之为 客户端连贯 Socket。这里的 ServerSocketChannel 就相似于 监听 SocketSocketChannel就相似于 客户端连贯 Socket

因为咱们在创立 NioServerSocketChannel 的时候,会将 JDK NIO 原生ServerSocketChannel设置为 非阻塞 ,所以这里当ServerSocketChannel 上有客户端连贯时就会间接创立 SocketChannel,如果此时并没有客户端连贯时accept 调用 就会立即返回 null 并不会阻塞。

    protected AbstractNioChannel(Channel parent, SelectableChannel ch, int readInterestOp) {super(parent);
        this.ch = ch;
        this.readInterestOp = readInterestOp;
        try {
            // 设置 Channel 为非阻塞 配合 IO 多路复用模型
            ch.configureBlocking(false);
        } catch (IOException e) {.......... 省略.............}
    }

5.1 创立客户端 NioSocketChannel

public class NioServerSocketChannel extends AbstractNioMessageChannel
                             implements io.netty.channel.socket.ServerSocketChannel {

    @Override
    protected int doReadMessages(List<Object> buf) throws Exception {SocketChannel ch = SocketUtils.accept(javaChannel());

        try {if (ch != null) {buf.add(new NioSocketChannel(this, ch));
                return 1;
            }
        } catch (Throwable t) {......... 省略.......}

        return 0;
    }

}

这里会依据 ServerSocketChannelaccept办法获取到 JDK NIO 原生SocketChannel(用于底层真正与客户端通信的 Channel),来创立 Netty 中的NioSocketChannel

public class NioSocketChannel extends AbstractNioByteChannel implements io.netty.channel.socket.SocketChannel {public NioSocketChannel(Channel parent, SocketChannel socket) {super(parent, socket);
        config = new NioSocketChannelConfig(this, socket.socket());
    }

}

创立客户端 NioSocketChannel 的过程其实和之前讲的创立服务端 NioServerSocketChannel 大体流程是一样的,咱们这里只对客户端 NioSocketChannel 和服务端 NioServerSocketChannel 在创立过程中的不同之处做一个比照。

具体细节局部大家能够在回看下《具体图解 Netty Reactor 启动全流程》一文中对于 NioServerSocketChannel 的创立的具体细节。

5.3 比照 NioSocketChannel 与 NioServerSocketChannel 的不同

1:Channel 的档次不同

在咱们介绍 Reactor 的创立文章中,咱们提到 Netty 中的 Channel 是具备档次的。因为客户端 NioSocketChannel 是在 main reactor 接管连贯时在服务端 NioServerSocketChannel 中被创立的,所以在创立客户端 NioSocketChannel 的时候会通过构造函数指定了 parent 属性为 NioServerSocketChanel。并将JDK NIO 原生SocketChannel封装进 Netty 的客户端 NioSocketChannel 中。

而在 Reactor 启动过程中创立 NioServerSocketChannel 的时候 parent 属性 指定是null。因为它就是顶层的Channel,负责创立客户端NioSocketChannel

    public NioServerSocketChannel(ServerSocketChannel channel) {super(null, channel, SelectionKey.OP_ACCEPT);
        config = new NioServerSocketChannelConfig(this, javaChannel().socket());
    }

2:向 Reactor 注册的 IO 事件不同

客户端 NioSocketChannel 向 Sub Reactor 注册的是SelectionKey.OP_READ 事件,而服务端 NioServerSocketChannel 向 Main Reactor 注册的是SelectionKey.OP_ACCEPT 事件

public abstract class AbstractNioByteChannel extends AbstractNioChannel {protected AbstractNioByteChannel(Channel parent, SelectableChannel ch) {super(parent, ch, SelectionKey.OP_READ);
    }

}

public class NioServerSocketChannel extends AbstractNioMessageChannel
                             implements io.netty.channel.socket.ServerSocketChannel {public NioServerSocketChannel(ServerSocketChannel channel) {
        // 父类 AbstractNioChannel 中保留 JDK NIO 原生 ServerSocketChannel 以及要监听的事件 OP_ACCEPT
        super(null, channel, SelectionKey.OP_ACCEPT);
        //DefaultChannelConfig 中设置用于 Channel 接收数据用的 buffer->AdaptiveRecvByteBufAllocator
        config = new NioServerSocketChannelConfig(this, javaChannel().socket());
    }
}

3: 功能属性不同造成继承构造的不同

客户端 NioSocketChannel 继承的是 AbstractNioByteChannel,而服务端NioServerSocketChannel 继承的是 AbstractNioMessageChannel
它们继承的这两个抽象类一个前缀是 Byte,一个前缀是Message 有什么区别吗??

客户端 NioSocketChannel 次要解决的是服务端与客户端的通信,这里波及到接管客户端发送来的数据,而 Sub Reactor 线程NioSocketChannel中读取的正是网络通信数据单位为Byte

服务端 NioServerSocketChannel 次要负责解决 OP_ACCEPT 事件,创立用于通信的客户端NioSocketChannel。这时候客户端与服务端还没开始通信,所以Main Reactor 线程NioServerSocketChannel的读取对象为 Message。这里的Message 指的就是底层的 SocketChannel 客户端连贯。


以上就是 NioSocketChannelNioServerSocketChannel创立过程中的不同之处,前面的过程就一样了。

  • 在 AbstractNioChannel 类中封装 JDK NIO 原生的 SocketChannel,并将其底层的 IO 模型设置为 非阻塞,保留须要监听的 IO 事件OP_READ
    protected AbstractNioChannel(Channel parent, SelectableChannel ch, int readInterestOp) {super(parent);
        this.ch = ch;
        this.readInterestOp = readInterestOp;
        try {
            // 设置 Channel 为非阻塞 配合 IO 多路复用模型
            ch.configureBlocking(false);
        } catch (IOException e) {}}
  • 为客户端 NioSocketChannel 创立全局惟一的channelId,创立客户端 NioSocketChannel 的底层操作类NioByteUnsafe,创立 pipeline。
    protected AbstractChannel(Channel parent) {
        this.parent = parent;
        //channel 全局惟一 ID machineId+processId+sequence+timestamp+random
        id = newId();
        //unsafe 用于底层 socket 的读写操作
        unsafe = newUnsafe();
        // 为 channel 调配独立的 pipeline 用于 IO 事件编排
        pipeline = newChannelPipeline();}
  • 在 NioSocketChannelConfig 的创立过程中,将 NioSocketChannel 的 RecvByteBufAllocator 类型设置为AdaptiveRecvByteBufAllocator
    public DefaultChannelConfig(Channel channel) {this(channel, new AdaptiveRecvByteBufAllocator());
    }

在 Bug 修复后的版本中服务端 NioServerSocketChannel 的 RecvByteBufAllocator 类型设置为ServerChannelRecvByteBufAllocator

最终咱们失去的客户端 NioSocketChannel 构造如下:

6. ChannelRead 事件的响应

在前边介绍接管连贯的整体外围流程框架的时候,咱们提到 main reactor 线程是在一个 do{.....}while(...) 循环 read loop 中一直的调用 ServerSocketChannel#accept 办法来接管客户端的连贯。

当满足退出 read loop 循环的条件有两个:

  1. 在限定的 16 次读取中,曾经没有新的客户端连贯要接管了。退出循环。
  2. 从 NioServerSocketChannel 中读取客户端连贯的次数达到了 16 次,无论此时是否还有客户端连贯都须要退出循环。

main reactor 就会退出 read loop 循环,此时接管到的客户端连贯 NioSocketChannel 暂存与 List<Object> readBuf 汇合中。


    private final class NioMessageUnsafe extends AbstractNioUnsafe {private final List<Object> readBuf = new ArrayList<Object>();

        @Override
        public void read() {
            try {
                try {
                    do {
                        ........ 省略.........
                        // 底层调用 NioServerSocketChannel->doReadMessages 创立客户端 SocketChannel
                        int localRead = doReadMessages(readBuf);
                        ........ 省略.........
                        allocHandle.incMessagesRead(localRead);
                    } while (allocHandle.continueReading());

                } catch (Throwable t) {exception = t;}

                int size = readBuf.size();
                for (int i = 0; i < size; i ++) {
                    readPending = false;
                    pipeline.fireChannelRead(readBuf.get(i));
                }
                
                  ........ 省略.........
            } finally {........ 省略.........}
        }
    }

随后 main reactor 线程会遍历 List<Object> readBuf 汇合中的 NioSocketChannel,并在 NioServerSocketChannel 的 pipeline 中流传 ChannelRead 事件。

最终 ChannelRead 事件 会流传到 ServerBootstrapAcceptor 中,这里正是 Netty 解决客户端连贯的外围逻辑所在。

ServerBootstrapAcceptor 次要的作用就是初始化客户端 NioSocketChannel,并将客户端 NioSocketChannel 注册到Sub Reactor Group 中,并监听OP_READ 事件

在 ServerBootstrapAcceptor 中会初始化客户端 NioSocketChannel 的这些属性。

比方:从 Reactor 组 EventLoopGroup childGroup,用于初始化NioSocketChannel 中的 pipeline 用到的 ChannelHandler childHandler,以及NioSocketChannel 中的一些 childOptions childAttrs

private static class ServerBootstrapAcceptor extends ChannelInboundHandlerAdapter {

        private final EventLoopGroup childGroup;
        private final ChannelHandler childHandler;
        private final Entry<ChannelOption<?>, Object>[] childOptions;
        private final Entry<AttributeKey<?>, Object>[] childAttrs;

        @Override
        @SuppressWarnings("unchecked")
        public void channelRead(ChannelHandlerContext ctx, Object msg) {final Channel child = (Channel) msg;

            // 向客户端 NioSocketChannel 的 pipeline 中
            // 增加在启动配置类 ServerBootstrap 中配置的 ChannelHandler
            child.pipeline().addLast(childHandler);

            // 利用配置的属性初始化客户端 NioSocketChannel
            setChannelOptions(child, childOptions, logger);
            setAttributes(child, childAttrs);

            try {
                /**
                 * 1:在 Sub Reactor 线程组中抉择一个 Reactor 绑定
                 * 2:将客户端 SocketChannel 注册到绑定的 Reactor 上
                 * 3:SocketChannel 注册到 sub reactor 中的 selector 上,并监听 OP_READ 事件
                 * */
                childGroup.register(child).addListener(new ChannelFutureListener() {
                    @Override
                    public void operationComplete(ChannelFuture future) throws Exception {if (!future.isSuccess()) {forceClose(child, future.cause());
                        }
                    }
                });
            } catch (Throwable t) {forceClose(child, t);
            }
        }
}

正是在这里,netty 会将咱们在《具体图解 Netty Reactor 启动全流程》的启动示例程序中在 ServerBootstrap 中配置的客户端 NioSocketChannel 的所有属性(child 前缀配置)初始化到 NioSocketChannel 中。

public final class EchoServer {static final int PORT = Integer.parseInt(System.getProperty("port", "8007"));

    public static void main(String[] args) throws Exception {
        // Configure the server.
        // 创立主从 Reactor 线程组
        EventLoopGroup bossGroup = new NioEventLoopGroup(1);
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        final EchoServerHandler serverHandler = new EchoServerHandler();
        try {ServerBootstrap b = new ServerBootstrap();
            b.group(bossGroup, workerGroup)// 配置主从 Reactor
             .channel(NioServerSocketChannel.class)// 配置主 Reactor 中的 channel 类型
             .option(ChannelOption.SO_BACKLOG, 100)// 设置主 Reactor 中 channel 的 option 选项
             .handler(new LoggingHandler(LogLevel.INFO))// 设置主 Reactor 中 Channel->pipline->handler
             .childHandler(new ChannelInitializer<SocketChannel>() {// 设置从 Reactor 中注册 channel 的 pipeline
                 @Override
                 public void initChannel(SocketChannel ch) throws Exception {ChannelPipeline p = ch.pipeline();
                     //p.addLast(new LoggingHandler(LogLevel.INFO));
                     p.addLast(serverHandler);
                 }
             });

            // Start the server. 绑定端口启动服务,开始监听 accept 事件
            ChannelFuture f = b.bind(PORT).sync();
            // Wait until the server socket is closed.
            f.channel().closeFuture().sync();} finally {
            // Shut down all event loops to terminate all threads.
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();}
    }
}

以上示例代码中通过 ServerBootstrap 配置的 NioSocketChannel 相干属性,会在 Netty 启动并开始初始化 NioServerSocketChannel 的时候将 ServerBootstrapAcceptor 的创立初始化工作封装成 异步工作 ,而后在NioServerSocketChannel 注册到 Main Reactor 中胜利后执行。

public class ServerBootstrap extends AbstractBootstrap<ServerBootstrap, ServerChannel> {

    @Override
    void init(Channel channel) {
        ................ 省略................

        p.addLast(new ChannelInitializer<Channel>() {
            @Override
            public void initChannel(final Channel ch) {final ChannelPipeline pipeline = ch.pipeline();
                ................ 省略................
                ch.eventLoop().execute(new Runnable() {
                    @Override
                    public void run() {
                        pipeline.addLast(new ServerBootstrapAcceptor(ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));
                    }
                });
            }
        });
    }
}

在通过 ServerBootstrapAccptor#chanelRead 回调 的解决之后,此时客户端 NioSocketChannel 中 pipeline 的构造为:

随后会将初始化好的客户端 NioSocketChannel 向 Sub Reactor Group 中注册,并监听OP_READ 事件

如下图中的步骤 3 所示:

7. 向 SubReactorGroup 中注册 NioSocketChannel

                childGroup.register(child).addListener(new ChannelFutureListener() {
                    @Override
                    public void operationComplete(ChannelFuture future) throws Exception {if (!future.isSuccess()) {forceClose(child, future.cause());
                        }
                    }
                });

客户端 NioSocketChannel 向 Sub Reactor Group 注册的流程齐全和服务端 NioServerSocketChannel 向 Main Reactor Group 注册流程一样。

对于服务端 NioServerSocketChannel 的注册流程,笔者曾经在《具体图解 Netty Reactor 启动全流程》一文中做出了具体的介绍,对相干细节感兴趣的同学能够在回看下。

这里笔者在带大家简要回顾下整个注册过程并着重区别比照客户端 NioSocetChannel 与服务端 NioServerSocketChannel 注册过程中不同的中央。

7.1 从 Sub Reactor Group 中选取一个 Sub Reactor 进行绑定

public abstract class MultithreadEventLoopGroup extends MultithreadEventExecutorGroup implements EventLoopGroup {

   @Override
    public ChannelFuture register(Channel channel) {return next().register(channel);
    }

    @Override
    public EventExecutor next() {return chooser.next();
    }

}

7.2 向绑定的 Sub Reactor 上注册 NioSocketChannel

public abstract class SingleThreadEventLoop extends SingleThreadEventExecutor implements EventLoop {

    @Override
    public ChannelFuture register(Channel channel) {
        // 注册 channel 到绑定的 Reactor 上
        return register(new DefaultChannelPromise(channel, this));
    }

    @Override
    public ChannelFuture register(final ChannelPromise promise) {ObjectUtil.checkNotNull(promise, "promise");
        //unsafe 负责 channel 底层的各种操作
        promise.channel().unsafe().register(this, promise);
        return promise;
    }

}
  • 过后咱们在介绍 NioServerSocketChannel 的注册过程时,这里的 promise.channel()NioServerSocketChannel。底层的 unsafe 操作类为NioMessageUnsafe
  • 此时这里的 promise.channel()NioSocketChannel。底层的 unsafe 操作类为NioByteUnsafe
        @Override
        public final void register(EventLoop eventLoop, final ChannelPromise promise) {
            .............. 省略....................
            // 此时这里的 eventLoop 为 Sub Reactor
            AbstractChannel.this.eventLoop = eventLoop;

            /**
             * 执行 channel 注册的操作必须是 Reactor 线程来实现
             *
             * 1: 如果以后执行线程是 Reactor 线程,则间接执行 register0 进行注册
             * 2:如果以后执行线程是内部线程,则须要将 register0 注册操作 封装程异步 Task 由 Reactor 线程执行
             * */
            if (eventLoop.inEventLoop()) {register0(promise);
            } else {
                try {eventLoop.execute(new Runnable() {
                        @Override
                        public void run() {register0(promise);
                        }
                    });
                } catch (Throwable t) {.............. 省略....................}
            }
        }

留神此时传递进来的 EventLoop eventLoop 为 Sub Reactor

但此时的执行线程为Main Reactor 线程,并不是 Sub Reactor 线程(此时还未启动)

所以这里的 eventLoop.inEventLoop() 返回的是false

else 分支 中向绑定的 Sub Reactor 提交注册 NioSocketChannel 的工作。

当注册工作提交后,此时绑定的 Sub Reactor 线程 启动。

7.3 register0

咱们又来到了 Channel 注册的老中央 register0 办法。在《具体图解 Netty Reactor 启动全流程》中咱们花了大量的篇幅介绍了这个办法。这里咱们只比照NioSocketChannelNioServerSocketChannel不同的中央。

 private void register0(ChannelPromise promise) {
            try {
                ................ 省略..................
                boolean firstRegistration = neverRegistered;
                // 执行真正的注册操作
                doRegister();
                // 批改注册状态
                neverRegistered = false;
                registered = true;

                pipeline.invokeHandlerAddedIfNeeded();

                if (isActive()) {if (firstRegistration) {
                        // 触发 channelActive 事件
                        pipeline.fireChannelActive();} else if (config().isAutoRead()) {beginRead();
                    }
                }
            } catch (Throwable t) {................ 省略..................}
        }

这里 doRegister()办法 将 NioSocketChannel 注册到 Sub Reactor 中的 Selector 上。

public abstract class AbstractNioChannel extends AbstractChannel {

    @Override
    protected void doRegister() throws Exception {
        boolean selected = false;
        for (;;) {
            try {selectionKey = javaChannel().register(eventLoop().unwrappedSelector(), 0, this);
                return;
            } catch (CancelledKeyException e) {............... 省略...............}
        }
    }

}

这里是 Netty 客户端 NioSocketChannel 与 JDK NIO 原生 SocketChannel 关联的中央。此时注册的 IO 事件 仍然是0。目标也是只是为了获取 NioSocketChannel 在 Selector 中的SelectionKey

同时通过 SelectableChannel#register 办法将 Netty 自定义的 NioSocketChannel(这里的 this 指针)附着在 SelectionKey 的 attechment 属性上,实现 Netty 自定义 Channel 与 JDK NIO Channel 的关系绑定。这样在每次对 Selector 进行 IO 就绪事件轮询时,Netty 都能够从 JDK NIO Selector 返回的 SelectionKey 中获取到自定义的 Channel 对象(这里指的就是 NioSocketChannel)。

随后调用 pipeline.invokeHandlerAddedIfNeeded() 回调客户端 NioSocketChannel 上 pipeline 中的所有 ChannelHandler 的 handlerAdded 办法,此时pipeline 的构造中只有一个 ChannelInitializer。最终会在ChannelInitializer#handlerAdded 回调办法中初始化客户端 NioSocketChannelpipeline

public abstract class ChannelInitializer<C extends Channel> extends ChannelInboundHandlerAdapter {

    @Override
    public void handlerAdded(ChannelHandlerContext ctx) throws Exception {if (ctx.channel().isRegistered()) {if (initChannel(ctx)) {
                // 初始化工作实现后,须要将本身从 pipeline 中移除
                removeState(ctx);
            }
        }
    }

    protected abstract void initChannel(C ch) throws Exception;
}

对于对 Channel 中 pipeline 的具体初始化过程,对细节局部感兴趣的同学能够回看下《具体图解 Netty Reactor 启动全流程》

此时客户端 NioSocketChannel 中的 pipeline 中的构造就变为了咱们自定义的样子,在示例代码中咱们自定义的 ChannelHandlerEchoServerHandler

@Sharable
public class EchoServerHandler extends ChannelInboundHandlerAdapter {

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) {ctx.write(msg);
    }

    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) {ctx.flush();
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        // Close the connection when an exception is raised.
        cause.printStackTrace();
        ctx.close();}
}

当客户端 NioSocketChannel 中的 pipeline 初始化结束后,netty 就开始调用 safeSetSuccess(promise) 办法 回调 regFuture 中注册的ChannelFutureListener,告诉客户端 NioSocketChannel 曾经胜利注册到 Sub Reactor 上了。

               childGroup.register(child).addListener(new ChannelFutureListener() {
                    @Override
                    public void operationComplete(ChannelFuture future) throws Exception {if (!future.isSuccess()) {forceClose(child, future.cause());
                        }
                    }
                });

在服务端 NioServerSocketChannel 注册的时候咱们会在 listener 中向 Main Reactor 提交 bind 绑定端口地址工作。然而在NioSocketChannel 注册的时候,只会在 listener 中解决一下注册失败的状况。

当 Sub Reactor 线程告诉 ChannelFutureListener 注册胜利之后,随后就会调用 pipeline.fireChannelRegistered() 在客户端 NioSocketChannel 的 pipeline 中流传ChannelRegistered 事件

这里笔者重点要强调下 ,在之前介绍 NioServerSocketChannel 注册的时候,咱们提到因为此时 NioServerSocketChannel 并未绑定端口地址,所以这时的 NioServerSocketChannel 并未激活,这里的isActive() 返回 falseregister0 办法 间接返回。

服务端 NioServerSocketChannel 判断是否激活的规范为端口是否绑定胜利。

public class NioServerSocketChannel extends AbstractNioMessageChannel
                             implements io.netty.channel.socket.ServerSocketChannel {
    @Override
    public boolean isActive() {return isOpen() && javaChannel().socket().isBound();}
}

客户端 NioSocketChannel 判断是否激活的规范为是否处于 Connected 状态。那么显然这里必定是处于connected 状态 的。

    @Override
    public boolean isActive() {SocketChannel ch = javaChannel();
        return ch.isOpen() && ch.isConnected();
    }

NioSocketChannel曾经处于 connected 状态,这里并不需要绑定端口,所以这里的isActive() 返回true

           if (isActive()) {
                    /**
                     * 客户端 SocketChannel 注册胜利后会走这里,在 channelActive 事件回调中注册 OP_READ 事件
                     * */
                    if (firstRegistration) {
                        // 触发 channelActive 事件
                        pipeline.fireChannelActive();} else if (config().isAutoRead()) {....... 省略..........}
                }
            }

最初调用 pipeline.fireChannelActive() 在 NioSocketChannel 中的 pipeline 流传 ChannelActive 事件,最终在pipeline 的头结点 HeadContext 中响应并注册 OP_READ 事件Sub Reactor中的 Selector 上。

public abstract class AbstractNioChannel extends AbstractChannel { {

    @Override
    protected void doBeginRead() throws Exception {
        .............. 省略................

        final int interestOps = selectionKey.interestOps();
        /**
         * 1:ServerSocketChannel 初始化时 readInterestOp 设置的是 OP_ACCEPT 事件
         * 2:SocketChannel 初始化时 readInterestOp 设置的是 OP_READ 事件
         * */
        if ((interestOps & readInterestOp) == 0) {
            // 注册监听 OP_ACCEPT 或者 OP_READ 事件
            selectionKey.interestOps(interestOps | readInterestOp);
        }
    }

}

留神这里的 readInterestOp 为客户端 NioSocketChannel 在初始化时设置的OP_READ 事件


到这里,Netty 中的 Main Reactor 接管连贯的整个流程,咱们就介绍完了,此时 Netty 中主从 Reactor 组的构造就变为:

总结

本文咱们介绍了 NioServerSocketChannel 解决客户端连贯事件的整个过程。

  • 接管连贯的整个解决框架。
  • 影响 Netty 接管连贯吞吐的 Bug 产生的起因,以及修复的计划。
  • 创立并初始化客户端NioSocketChannel
  • 初始化 NioSocketChannel 中的pipeline
  • 客户端 NioSocketChannelSub Reactor注册的过程

其中咱们也比照了 NioServerSocketChannelNioSocketChannel在创立初始化以及前面向 Reactor 注册过程中的差别之处。

当客户端 NioSocketChannel 接管结束并向 Sub Reactor 注册胜利后,那么接下来 Sub Reactor 就开始监听注册其上的所有客户端 NioSocketChannelOP_READ 事件,并期待客户端向服务端发送网络数据。

前面 Reactor 的配角就该变为 Sub Reactor 以及注册在其上的客户端 NioSocketChannel 了。

下篇文章,咱们将会探讨 Netty 是如何接管网络数据的~~ 咱们下篇文章见

退出移动版