关于netty:抓到-Netty-一个-Bug-顺带来透彻地聊一下-Netty-是如何高效接收网络连接的

欢送关注公众号:bin的技术小屋,浏览公众号原文

本系列Netty源码解析文章基于 4.1.56.Final版本

对于一个高性能网络通讯框架来说,最最重要也是最外围的工作就是如何高效的接管客户端连贯,这就好比咱们开了一个饭店,那么迎接客人就是饭店最重要的工作,咱们要先把客人迎接进来,不能让客人一看人多就走掉,只有客人进来了,哪怕菜做的慢一点也没关系。

本文笔者就来为大家介绍下netty这块最外围的内容,看看netty是如何高效的接管客户端连贯的。

下图为笔者在一个月黑风高天空显得那么高深边远的夜晚,闲来无事,于是捧起Netty对于如何接管连贯这部分源码细细品读的时候,意外的发现了一个影响Netty接管连贯吞吐的一个Bug。

于是笔者就在Github提了一个Issue#11708,论述了下这个Bug产生的起因以及导致的后果并和Netty的作者一起探讨了下修复措施。如上图所示。

Issue#11708:https://github.com/netty/nett…

这里先不具体解释这个Issue,也不倡议大家当初就关上这个Issue查看,笔者会在本文的介绍中随着源码深刻的解读缓缓的为大家一层一层地拨开迷雾。

之所以在文章的结尾把这个拎进去,笔者是想让大家带着狐疑,扫视,观赏,崇拜,敬畏的态度来一起品读世界顶级程序员编写的代码。由衷的感激他们在这一畛域做出的奉献。

好了,问题抛出来后,咱们就带着这个疑难来开始本文的内容吧~~~

前文回顾

依照老规矩,再开始本文的内容之前,咱们先来回顾下前边几篇文章的概要内容帮忙大家梳理一个框架全貌进去。

笔者这里再次想和读者敌人们强调的是本文能够独立观看,并不依赖前边系列文章的内容,只是大家如果对相干细节局部感兴趣的话,能够在浏览完本文之后在去回看相干文章。

在前边的系列文章中,笔者为大家介绍了驱动Netty整个框架运行的外围引擎Reactor的创立,启动,运行的全流程。从当初开始Netty的整个外围框架就开始运转起来开始工作了,本文要介绍的次要内容就是Netty在启动之后要做的第一件事件:监听端口地址,高效接管客户端连贯。

在《聊聊Netty那些事儿之从内核角度看IO模型》一文中,咱们是从整个网络框架的基石IO模型的角度整体论述了下Netty的IO线程模型。

而Netty中的Reactor正是IO线程在Netty中的模型定义。Reactor在Netty中是以Group的模式呈现的,分为:

  • 主Reactor线程组也就是咱们在启动代码中配置的EventLoopGroup bossGroup,main reactor group中的reactor次要负责监听客户端连贯事件,高效的解决客户端连贯。也是本文咱们要介绍的重点。
  • 从Reactor线程组也就是咱们在启动代码中配置的EventLoopGroup workerGroup,sub reactor group中的reactor次要负责解决客户端连贯上的IO事件,以及异步工作的执行。

最初咱们得出Netty的整个IO模型如下:

本文咱们探讨的重点就是MainReactorGroup的外围工作上图中所示的步骤1,步骤2,步骤3。

在从整体上介绍完Netty的IO模型之后,咱们又在《Reactor在Netty中的实现(创立篇)》中残缺的介绍了Netty框架的骨架主从Reactor组的搭建过程,论述了Reactor是如何被创立进去的,并介绍了它的外围组件如下图所示:

  • thread即为Reactor中的IO线程,次要负责监听IO事件,解决IO工作,执行异步工作。
  • selector则是JDK NIO对操作系统底层IO多路复用技术实现的封装。用于监听IO就绪事件。
  • taskQueue用于保留Reactor须要执行的异步工作,这些异步工作能够由用户在业务线程中向Reactor提交,也能够是Netty框架提交的一些本身外围的工作。
  • scheduledTaskQueue则是保留Reactor中执行的定时工作。代替了原有的工夫轮来执行延时工作。
  • tailQueue保留了在Reactor须要执行的一些尾部收尾工作,在一般工作执行完后 Reactor线程会执行尾部工作,比方对Netty 的运行状态做一些统计数据,例如工作循环的耗时、占用物理内存的大小等等

在骨架搭建结束之后,咱们随后又在在《具体图解Netty Reactor启动全流程》》一文中介绍了本文的配角服务端NioServerSocketChannel的创立,初始化,绑定端口地址,向main reactor注册监听OP_ACCEPT事件的残缺过程

main reactor如何解决OP_ACCEPT事件将会是本文的次要内容。

自此Netty框架的main reactor group曾经启动结束,开始筹备监听OP_accept事件,当客户端连贯上来之后,OP_ACCEPT事件沉闷,main reactor开始解决OP_ACCEPT事件接管客户端连贯了。

而netty中的IO事件分为:OP_ACCEPT事件,OP_READ事件,OP_WRITE事件和OP_CONNECT事件,netty对于IO事件的监听和解决对立封装在Reactor模型中,这四个IO事件的处理过程也是咱们后续文章中要独自拿进去介绍的,本文咱们聚焦OP_ACCEPT事件的解决。

而为了让大家可能对IO事件的解决有一个完整性的意识,笔者写了《一文聊透Netty外围引擎Reactor的运行架构》这篇文章,在文章中具体介绍了Reactor线程的整体运行框架。

Reactor线程会在一个死循环中996不停的运行,在循环中会一直的轮询监听Selector上的IO事件,当IO事件沉闷后,Reactor从Selector上被唤醒转去执行IO就绪事件的解决,在这个过程中咱们引出了上述四种IO事件的解决入口函数。

    private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {
        //获取Channel的底层操作类Unsafe
        final AbstractNioChannel.NioUnsafe unsafe = ch.unsafe();
        if (!k.isValid()) {
            ......如果SelectionKey曾经生效则敞开对应的Channel......
        }

        try {
            //获取IO就绪事件
            int readyOps = k.readyOps();
            //解决Connect事件
            if ((readyOps & SelectionKey.OP_CONNECT) != 0) {
                int ops = k.interestOps();
                //移除对Connect事件的监听,否则Selector会始终告诉
                ops &= ~SelectionKey.OP_CONNECT;
                k.interestOps(ops);
                //触发channelActive事件处理Connect事件
                unsafe.finishConnect();
            }

            //解决Write事件
            if ((readyOps & SelectionKey.OP_WRITE) != 0) {
                ch.unsafe().forceFlush();
            }

             //解决Read事件或者Accept事件
            if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
                unsafe.read();
            }
        } catch (CancelledKeyException ignored) {
            unsafe.close(unsafe.voidPromise());
        }
    }

本文笔者将会为大家重点介绍OP_ACCEPT事件的解决入口函数unsafe.read()的整个源码实现。

当客户端连贯实现三次握手之后,main reactor中的selector产生OP_ACCEPT事件沉闷,main reactor随即被唤醒,来到了OP_ACCEPT事件的解决入口函数开始接管客户端连贯。

1. Main Reactor解决OP_ACCEPT事件

Main Reactor轮询到NioServerSocketChannel上的OP_ACCEPT事件就绪时,Main Reactor线程就会从JDK Selector上的阻塞轮询APIselector.select(timeoutMillis)调用中返回。转而去解决NioServerSocketChannel上的OP_ACCEPT事件

public final class NioEventLoop extends SingleThreadEventLoop {

    private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {
        final AbstractNioChannel.NioUnsafe unsafe = ch.unsafe();
        ..............省略.................

        try {
            int readyOps = k.readyOps();

            if ((readyOps & SelectionKey.OP_CONNECT) != 0) {
               ..............解决OP_CONNECT事件.................
            }


            if ((readyOps & SelectionKey.OP_WRITE) != 0) {
              ..............解决OP_WRITE事件.................
            }


            if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
                //本文重点解决OP_ACCEPT事件
                unsafe.read();
            }
        } catch (CancelledKeyException ignored) {
            unsafe.close(unsafe.voidPromise());
        }
    }

}
  • 解决IO就绪事件的入口函数processSelectedKey 中的参数AbstractNioChannel ch正是Netty服务端NioServerSocketChannel。因为此时的执行线程为main reactor线程,而main reactor上注册的正是netty服务端NioServerSocketChannel负责监听端口地址,接管客户端连贯。
  • 通过ch.unsafe()获取到的NioUnsafe操作类正是NioServerSocketChannel中对底层JDK NIO ServerSocketChannel的Unsafe底层操作类。

Unsafe接口是Netty对Channel底层操作行为的封装,比方NioServerSocketChannel的底层Unsafe操作类干的事件就是绑定端口地址解决OP_ACCEPT事件

这里咱们看到,Netty将OP_ACCEPT事件解决的入口函数封装在NioServerSocketChannel里的底层操作类Unsafe的read办法中。

而NioServerSocketChannel中的Unsafe操作类实现类型为NioMessageUnsafe定义在上图继承构造中的AbstractNioMessageChannel父类中

上面咱们到NioMessageUnsafe#read办法中来看下Netty对OP_ACCPET事件的具体处理过程:

2. 接管客户端连贯外围流程框架总览

咱们还是依照老规矩,先从整体上把整个OP_ACCEPT事件的逻辑解决框架提取进去,让大家先总体仰视下流程全貌,而后在针对每个外围点位进行各个击破。

main reactor线程是在一个do...while{...}循环read loop中一直的调用JDK NIO serverSocketChannel.accept()办法来接管实现三次握手的客户端连贯NioSocketChannel的,并将接管到的客户端连贯NioSocketChannel长期保留在List<Object> readBuf汇合中,后续会服务端NioServerSocketChannel的pipeline中通过ChannelRead事件来传递,最终会在ServerBootstrapAcceptor这个ChannelHandler中被解决初始化,并将其注册到Sub Reator Group中。

这里的read loop循环会被限定只能读取16次,当main reactor从NioServerSocketChannel中读取客户端连贯NioSocketChannel的次数达到16次之后,无论此时是否还有客户端连贯都不能在持续读取了。

因为咱们在《一文聊透Netty外围引擎Reactor的运行架构》一文中提到,netty对reactor线程压迫的比拟狠,要干的事件很多,除了要监听轮询IO就绪事件,解决IO就绪事件,还须要执行用户和netty框架本省提交的异步工作和定时工作。

所以这里的main reactor线程不能在read loop中无限度的执行上来,因为还须要调配工夫去执行异步工作,不能因为无限度的接管客户端连贯而耽搁了异步工作的执行。所以这里将read loop的循环次数限定为16次。

如果main reactor线程在read loop中读取客户端连贯NioSocketChannel的次数曾经满了16次,即便此时还有客户端连贯未接管,那么main reactor线程也不会再去接管了,而是转去执行异步工作,当异步工作执行结束后,还会在回来执行残余接管连贯的工作。

main reactor线程退出read loop循环的条件有两个:

  1. 在限定的16次读取中,曾经没有新的客户端连贯要接管了。退出循环。
  2. 从NioServerSocketChannel中读取客户端连贯的次数达到了16次,无论此时是否还有客户端连贯都须要退出循环。

以上就是Netty在接管客户端连贯时的整体外围逻辑,上面笔者将这部分逻辑的外围源码实现框架提取进去,不便大家根据上述外围逻辑与源码中的解决模块对应起来,还是那句话,这里只须要总体把握外围解决流程,不须要读懂每一行代码,笔者会在文章的后边分模块来各个击破它们。

public abstract class AbstractNioMessageChannel extends AbstractNioChannel {

  private final class NioMessageUnsafe extends AbstractNioUnsafe {

        //寄存连贯建设后,创立的客户端SocketChannel
        private final List<Object> readBuf = new ArrayList<Object>();

        @Override
        public void read() {
            //必须在Main Reactor线程中执行
            assert eventLoop().inEventLoop();
            //留神上面的config和pipeline都是服务端ServerSocketChannel中的
            final ChannelConfig config = config();
            final ChannelPipeline pipeline = pipeline();
            //创立接收数据Buffer分配器(用于调配容量大小适合的byteBuffer用来包容接收数据)
            //在接管连贯的场景中,这里的allocHandle只是用于管制read loop的循环读取创立连贯的次数。
            final RecvByteBufAllocator.Handle allocHandle = unsafe().recvBufAllocHandle();
            allocHandle.reset(config);

            boolean closed = false;
            Throwable exception = null;
            try {
                try {
                    do {
                        //底层调用NioServerSocketChannel->doReadMessages 创立客户端SocketChannel
                        int localRead = doReadMessages(readBuf);

                        //已无新的连贯可接管则退出read loop
                        if (localRead == 0) {
                            break;
                        }
                        if (localRead < 0) {
                            closed = true;
                            break;
                        }
                        //统计在以后事件循环中曾经读取到得Message数量(创立连贯的个数)
                        allocHandle.incMessagesRead(localRead);
                    } while (allocHandle.continueReading());//判断是否曾经读满16次
                } catch (Throwable t) {
                    exception = t;
                }

                int size = readBuf.size();
                for (int i = 0; i < size; i ++) {
                    readPending = false;
                    //在NioServerSocketChannel对应的pipeline中流传ChannelRead事件
                    //初始化客户端SocketChannel,并将其绑定到Sub Reactor线程组中的一个Reactor上
                    pipeline.fireChannelRead(readBuf.get(i));
                }
                //革除本次accept 创立的客户端SocketChannel汇合
                readBuf.clear();
                allocHandle.readComplete();
                //触发readComplete事件流传
                pipeline.fireChannelReadComplete();
                ....................省略............
            } finally {
                ....................省略............
            }
        }
    }
  }
}

这里首先要通过断言 assert eventLoop().inEventLoop()确保解决接管客户端连贯的线程必须为Main Reactor 线程。

而main reactor中次要注册的是服务端NioServerSocketChannel,次要负责解决OP_ACCEPT事件,所以以后main reactor线程是在NioServerSocketChannel中执行接管连贯的工作。

所以这里咱们通过config()获取到的是NioServerSocketChannel的属性配置类NioServerSocketChannelConfig,它是在Reactor的启动阶段被创立进去的。

    public NioServerSocketChannel(ServerSocketChannel channel) {
        //父类AbstractNioChannel中保留JDK NIO原生ServerSocketChannel以及要监听的事件OP_ACCEPT
        super(null, channel, SelectionKey.OP_ACCEPT);
        //DefaultChannelConfig中设置用于Channel接收数据用的buffer->AdaptiveRecvByteBufAllocator
        config = new NioServerSocketChannelConfig(this, javaChannel().socket());
    }

同理这里通过pipeline()获取到的也是NioServerSocketChannel中的pipeline。它会在NioServerSocketChannel向main reactor注册胜利之后被初始化。

前边提到main reactor线程会被限定只能在read loop中向NioServerSocketChannel读取16次客户端连贯,所以在开始read loop之前,咱们须要创立一个可能保留记录读取次数的对象,在每次read loop循环之后,能够依据这个对象来判断是否完结read loop。

这个对象就是这里的 RecvByteBufAllocator.Handle allocHandle 专门用于统计read loop中接管客户端连贯的次数,以及判断是否该完结read loop转去执行异步工作。

当这所有准备就绪之后,main reactor线程就开始在do{....}while(...)循环中接管客户端连贯了。

在 read loop中通过调用doReadMessages函数接管实现三次握手的客户端连贯,底层会调用到JDK NIO ServerSocketChannel的accept办法,从内核全连贯队列中取出客户端连贯。

返回值localRead 示意接管到了多少客户端连贯,客户端连贯通过accept办法只会一个一个的接管,所以这里的localRead 失常状况下都会返回1,当localRead <= 0时意味着曾经没有新的客户端连贯能够接管了,本次main reactor接管客户端的工作到这里就完结了,跳出read loop。开始新的一轮IO事件的监听解决。

    public static SocketChannel accept(final ServerSocketChannel serverSocketChannel) throws IOException {
        try {
            return AccessController.doPrivileged(new PrivilegedExceptionAction<SocketChannel>() {
                @Override
                public SocketChannel run() throws IOException {
                    return serverSocketChannel.accept();
                }
            });
        } catch (PrivilegedActionException e) {
            throw (IOException) e.getCause();
        }
    }

随后会将接管到的客户端连贯占时寄存到 List<Object> readBuf 汇合中。

  private final class NioMessageUnsafe extends AbstractNioUnsafe {

        //寄存连贯建设后,创立的客户端SocketChannel
        private final List<Object> readBuf = new ArrayList<Object>();
}

调用allocHandle.incMessagesRead统计本次事件循环中接管到的客户端连贯个数,最初在read loop开端通过allocHandle.continueReading判断是否达到了限定的16次。从而决定main reactor线程是持续接管客户端连贯还是转去执行异步工作。

main reactor线程退出read loop的两个条件:

  1. 在限定的16次读取中,曾经没有新的客户端连贯要接管了。退出循环。
  2. 从NioServerSocketChannel中读取客户端连贯的次数达到了16次,无论此时是否还有客户端连贯都须要退出循环。

当满足以上两个退出条件时,main reactor线程就会退出read loop,因为在read loop中接管到的客户端连贯全副暂存在List<Object> readBuf 汇合中,随后开始遍历readBuf,在NioServerSocketChannel的pipeline中流传ChannelRead事件。

                int size = readBuf.size();
                for (int i = 0; i < size; i ++) {
                    readPending = false;
                    //NioServerSocketChannel对应的pipeline中流传read事件
                    //io.netty.bootstrap.ServerBootstrap.ServerBootstrapAcceptor.channelRead
                    //初始化客户端SocketChannel,并将其绑定到Sub Reactor线程组中的一个Reactor上
                    pipeline.fireChannelRead(readBuf.get(i));
                }

最终pipeline中的ChannelHandler(ServerBootstrapAcceptor)会响应ChannelRead事件,并在相应回调函数中初始化客户端NioSocketChannel,并将其注册到Sub Reactor Group中。尔后客户端NioSocketChannel绑定到的sub reactor就开始监听解决客户端连贯上的读写事件了。

Netty整个接管客户端的逻辑过程如下图步骤1,2,3所示。

以上内容就是笔者提取进去的整体流程框架,上面咱们来将其中波及到的重要外围模块拆开,一个一个具体解读下。

3. RecvByteBufAllocator简介

Reactor在解决对应Channel上的IO数据时,都会采纳一个ByteBuffer来接管Channel上的IO数据。而本大节要介绍的RecvByteBufAllocator正是用来调配ByteBuffer的一个分配器。

还记得这个RecvByteBufAllocator 在哪里被创立的吗??

在《聊聊Netty那些事儿之Reactor在Netty中的实现(创立篇)》一文中,在介绍NioServerSocketChannel的创立过程中提到,对应Channel的配置类NioServerSocketChannelConfig也会随着NioServerSocketChannel的创立而创立。

    public NioServerSocketChannel(ServerSocketChannel channel) {
        super(null, channel, SelectionKey.OP_ACCEPT);
        config = new NioServerSocketChannelConfig(this, javaChannel().socket());
    }

在创立NioServerSocketChannelConfig的过程中会创立RecvByteBufAllocator

   public DefaultChannelConfig(Channel channel) {
            this(channel, new AdaptiveRecvByteBufAllocator());
    }

这里咱们看到NioServerSocketChannel中的RecvByteBufAllocator理论类型为AdaptiveRecvByteBufAllocator,顾名思义,这个类型的RecvByteBufAllocator能够依据Channel上每次到来的IO数据大小来自适应动静调整ByteBuffer的容量。

对于服务端NioServerSocketChannel来说,它上边的IO数据就是客户端的连贯,它的长度和类型都是固定的,所以在接管客户端连贯的时候并不需要这样的一个ByteBuffer来接管,咱们会将接管到的客户端连贯寄存在List<Object> readBuf汇合中

对于客户端NioSocketChannel来说,它上边的IO数据时客户端发送来的网络数据,长度是不定的,所以才会须要这样一个能够依据每次IO数据的大小来自适应动静调整容量的ByteBuffer来接管。

那么看起来这个RecvByteBufAllocator和本文的主题不是很关联,因为在接管连贯的过程中并不会怎么用到它,这个类笔者还会在前面的文章中具体介绍,之所以这里把它拎进去独自介绍是因为它和本文结尾提到的Bug有关系,这个Bug就是由这个类引起的。

3.1 RecvByteBufAllocator.Handle的获取

在本文中,咱们是通过NioServerSocketChannel中的unsafe底层操作类来获取RecvByteBufAllocator.Handle的

final RecvByteBufAllocator.Handle allocHandle = unsafe().recvBufAllocHandle();
protected abstract class AbstractUnsafe implements Unsafe {
        @Override
        public RecvByteBufAllocator.Handle recvBufAllocHandle() {
            if (recvHandle == null) {
                recvHandle = config().getRecvByteBufAllocator().newHandle();
            }
            return recvHandle;
        }
}

咱们看到最终会在NioServerSocketChannel的配置类NioServerSocketChannelConfig中获取到AdaptiveRecvByteBufAllocator

public class DefaultChannelConfig implements ChannelConfig {
    //用于Channel接收数据用的buffer分配器  类型为AdaptiveRecvByteBufAllocator
    private volatile RecvByteBufAllocator rcvBufAllocator;
}

AdaptiveRecvByteBufAllocator 中会创立自适应动静调整容量的ByteBuffer分配器。

public class AdaptiveRecvByteBufAllocator extends DefaultMaxMessagesRecvByteBufAllocator {

    @Override
    public Handle newHandle() {
        return new HandleImpl(minIndex, maxIndex, initial);
    }
    
    private final class HandleImpl extends MaxMessageHandle {
                  .................省略................
    }
}

这里的newHandle办法返回的具体类型为MaxMessageHandle ,这个MaxMessageHandle里边保留了每次从Channel中读取IO数据的容量指标,不便下次读取时调配适合大小的buffer

每次在应用allocHandle 前须要调用allocHandle.reset(config);重置里边的统计指标。

    public abstract class MaxMessageHandle implements ExtendedHandle {
        private ChannelConfig config;
        //每次事件轮询时,最多读取16次
        private int maxMessagePerRead;
        //本次事件轮询总共读取的message数,这里指的是接管连贯的数量
        private int totalMessages;
        //本次事件轮询总共读取的字节数
        private int totalBytesRead;

       @Override
        public void reset(ChannelConfig config) {
            this.config = config;
            //默认每次最多读取16次
            maxMessagePerRead = maxMessagesPerRead();
            totalMessages = totalBytesRead = 0;
        }
    }
  • maxMessagePerRead:用于管制每次read loop里最大能够循环读取的次数,默认为16次,可在启动配置类ServerBootstrap中通过ChannelOption.MAX_MESSAGES_PER_READ选项设置。
ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workerGroup)
  .channel(NioServerSocketChannel.class)
  .option(ChannelOption.MAX_MESSAGES_PER_READ, 自定义次数)
  • totalMessages:用于统计read loop中总共接管的连贯个数,每次read loop循环后会调用allocHandle.incMessagesRead减少记录接管到的连贯个数。
        @Override
        public final void incMessagesRead(int amt) {
            totalMessages += amt;
        }
  • totalBytesRead:用于统计在read loop中总共接管到客户端连贯上的数据大小,这个字段次要用于sub reactor在接管客户端NioSocketChannel上的网络数据用的,本文咱们介绍的是main reactor接管客户端连贯,所以这里并不会用到这个字段。这个字段会在sub reactor每次读取完NioSocketChannel上的网络数据时减少记录。
        @Override
        public void lastBytesRead(int bytes) {
            lastBytesRead = bytes;
            if (bytes > 0) {
                totalBytesRead += bytes;
            }
        }

MaxMessageHandler中还有一个十分重要的办法就是在每次read loop开端会调用allocHandle.continueReading()办法来判断读取连贯次数是否已满16次,来决定main reactor线程是否退出循环。

                  do {
                        //底层调用NioServerSocketChannel->doReadMessages 创立客户端SocketChannel
                        int localRead = doReadMessages(readBuf);
                        if (localRead == 0) {
                            break;
                        }
                        if (localRead < 0) {
                            closed = true;
                            break;
                        }
                        //统计在以后事件循环中曾经读取到得Message数量(创立连贯的个数)
                        allocHandle.incMessagesRead(localRead);
                    } while (allocHandle.continueReading());

红框中圈进去的两个判断条件和本文主题无关,咱们这里不须要关注,笔者会在前面的文章具体介绍。

  • totalMessages < maxMessagePerRead:在本文的接管客户端连贯场景中,这个条件用于判断main reactor线程在read loop中的读取次数是否超过了16次。如果超过16次就会返回false,main reactor线程退出循环。
  • totalBytesRead > 0:用于判断当客户端NioSocketChannel上的OP_READ事件沉闷时,sub reactor线程在read loop中是否读取到了网络数据。

以上内容就是RecvByteBufAllocator.Handle在接管客户端连贯场景下的作用,大家这里认真看下这个allocHandle.continueReading()办法退出循环的判断条件,再联合整个do{....}while(...)接管连贯循环体,感触下是否哪里有些不对劲?Bug行将呈现~~~

4. 啊哈!!Bug ! !

netty不论是在本文中解决接管客户端连贯的场景还是在解决接管客户端连贯上的网络数据场景都会在一个do{....}while(...)循环read loop中一直的解决。

同时也都会利用在上一大节中介绍的RecvByteBufAllocator.Handle来记录每次read loop接管到的连贯个数和从连贯上读取到的网络数据大小。

从而在read loop的开端都会通过allocHandle.continueReading()办法判断是否应该退出read loop循环完结连贯的接管流程或者是完结连贯上数据的读取流程。

无论是用于接管客户端连贯的main reactor也好还是用于接管客户端连贯上的网络数据的sub reactor也好,它们的运行框架都是一样的,只不过是具体分工不同。

所以netty这里想用对立的RecvByteBufAllocator.Handle来解决以上两种场景。

RecvByteBufAllocator.Handle中的totalBytesRead字段次要记录sub reactor线程在解决客户端NioSocketChannel中OP_READ事件沉闷时,总共在read loop中读取到的网络数据,而这里是main reactor线程在接管客户端连贯所以这个字段并不会被设置。totalBytesRead字段的值在本文中永远会是0

所以无论同时有多少个客户端并发连贯到服务端上,在接管连贯的这个read loop中永远只会承受一个连贯就会退出循环,因为allocHandle.continueReading()办法中的判断条件totalBytesRead > 0永远会返回false

                  do {
                        //底层调用NioServerSocketChannel->doReadMessages 创立客户端SocketChannel
                        int localRead = doReadMessages(readBuf);
                        if (localRead == 0) {
                            break;
                        }
                        if (localRead < 0) {
                            closed = true;
                            break;
                        }
                        //统计在以后事件循环中曾经读取到得Message数量(创立连贯的个数)
                        allocHandle.incMessagesRead(localRead);
                    } while (allocHandle.continueReading());

而netty的本意是在这个read loop循环中尽可能多的去接管客户端的并发连贯,同时又不影响main reactor线程执行异步工作。然而因为这个Bug,main reactor在这个循环中只执行一次就完结了。这也肯定水平上就影响了netty的吞吐

让咱们设想下这样的一个场景,当有16个客户端同时并发连贯到了服务端,这时NioServerSocketChannel上的OP_ACCEPT事件沉闷,main reactor从Selector上被唤醒,随后执行OP_ACCEPT事件的解决。

public final class NioEventLoop extends SingleThreadEventLoop {
    @Override
    protected void run() {
        int selectCnt = 0;
        for (;;) {
            try { 
                int strategy;
                try {
                    strategy = selectStrategy.calculateStrategy(selectNowSupplier, hasTasks());
                    switch (strategy) {
                    case SelectStrategy.CONTINUE:                  
                          ............省略.........
                    case SelectStrategy.BUSY_WAIT:

                          ............省略.........
                    case SelectStrategy.SELECT:
                            ............监听轮询IO事件.........
                    default:
                    }
                } catch (IOException e) {
                    ............省略.........
                }

                ............解决IO就绪事件.........
                ............执行异步工作.........
    }
}

然而因为这个Bug的存在,main reactor在接管客户端连贯的这个read loop中只接管了一个客户端连贯就匆匆返回了。

      private final class NioMessageUnsafe extends AbstractNioUnsafe {
                    do {
                        int localRead = doReadMessages(readBuf);
                        .........省略...........
                    } while (allocHandle.continueReading());
     }

而后依据下图中这个Reactor的运行构造去执行异步工作,随后绕一大圈又会回到NioEventLoop#run办法中从新发动一轮OP_ACCEPT事件轮询。

因为当初还有15个客户端并发连贯没有被接管,所以此时Main Reactor线程并不会在selector.select()上阻塞,最终绕一圈又会回到NioMessageUnsafe#read办法的do{.....}while()循环。在接管一个连贯之后又退出循环。

原本咱们能够在一次read loop中把这16个并发的客户端连贯全副接管结束的,因为这个Bug,main reactor须要一直的发动OP_ACCEPT事件的轮询,绕了很大一个圈子。同时也减少了许多不必要的selector.select()零碎调用开销

这时大家在看这个Issue#11708中的探讨是不是就清晰很多了~~

Issue#11708:https://github.com/netty/nett…

4.1 Bug的修复

笔者在写这篇文章的时候,Netty最新版本是4.1.68.final,这个Bug在4.1.69.final中被修复。

因为该Bug产生的起因正是因为服务端NioServerSocketChannel(用于监听端口地址和接管客户端连贯)和 客户端NioSocketChannel(用于通信)中的Config配置类混用了同一个ByteBuffer分配器AdaptiveRecvByteBufAllocator而导致的。

所以在新版本修复中专门为服务端ServerSocketChannel中的Config配置类引入了一个新的ByteBuffer分配器ServerChannelRecvByteBufAllocator,专门用于服务端ServerSocketChannel接管客户端连贯的场景。

ServerChannelRecvByteBufAllocator的父类DefaultMaxMessagesRecvByteBufAllocator中引入了一个新的字段ignoreBytesRead,用于示意是否疏忽网络字节的读取,在创立服务端Channel配置类NioServerSocketChannelConfig的时候,这个字段会被赋值为true

当main reactor线程在read loop循环中接管客户端连贯的时候。

      private final class NioMessageUnsafe extends AbstractNioUnsafe {

                    do {
                        int localRead = doReadMessages(readBuf);
                        .........省略...........
                    } while (allocHandle.continueReading());
     }

在read loop循环的开端就会采纳从ServerChannelRecvByteBufAllocator 中创立的MaxMessageHandle#continueReading办法来判断读取连贯次数是否超过了16次。因为这里的ignoreBytesRead == true这回咱们就会疏忽totalBytesRead == 0的状况,从而使得接管连贯的read loop得以持续地执行上来。在一个read loop中一次性把16个连贯全副接管结束。

以上就是对这个Bug产生的起因,以及发现的过程,最初修复的计划一个全面的介绍,因而笔者也呈现在了netty 4.1.69.final版本发布公告里的thank-list中。哈哈,真是令人开心的一件事件~~~

通过以上对netty接管客户端连贯的全流程剖析和对这个Bug前因后果以及修复计划的介绍,大家当初肯定曾经了解了整个接管连贯的流程框架。

接下来笔者就把这个流程中波及到的一些外围模块在独自拎进去从细节动手,为大家各个击破~~~

5. doReadMessages接管客户端连贯

public class NioServerSocketChannel extends AbstractNioMessageChannel
                             implements io.netty.channel.socket.ServerSocketChannel {

    @Override
    protected int doReadMessages(List<Object> buf) throws Exception {
        SocketChannel ch = SocketUtils.accept(javaChannel());

        try {
            if (ch != null) {
                buf.add(new NioSocketChannel(this, ch));
                return 1;
            }
        } catch (Throwable t) {
            logger.warn("Failed to create a new channel from an accepted socket.", t);

            try {
                ch.close();
            } catch (Throwable t2) {
                logger.warn("Failed to close a socket.", t2);
            }
        }

        return 0;
    }

}
  • 通过javaChannel()获取封装在Netty服务端NioServerSocketChannel中的JDK 原生 ServerSocketChannel
    @Override
    protected ServerSocketChannel javaChannel() {
        return (ServerSocketChannel) super.javaChannel();
    }
  • 通过JDK NIO 原生ServerSocketChannelaccept办法获取JDK NIO 原生客户端连贯SocketChannel
    public static SocketChannel accept(final ServerSocketChannel serverSocketChannel) throws IOException {
        try {
            return AccessController.doPrivileged(new PrivilegedExceptionAction<SocketChannel>() {
                @Override
                public SocketChannel run() throws IOException {
                    return serverSocketChannel.accept();
                }
            });
        } catch (PrivilegedActionException e) {
            throw (IOException) e.getCause();
        }
    }

这一步就是咱们在《聊聊Netty那些事儿之从内核角度看IO模型》介绍到的调用监听Socketaccept办法,内核会基于监听Socket创立进去一个新的Socket专门用于与客户端之间的网络通信这个咱们称之为客户端连贯Socket。这里的ServerSocketChannel就相似于监听SocketSocketChannel就相似于客户端连贯Socket

因为咱们在创立NioServerSocketChannel的时候,会将JDK NIO 原生ServerSocketChannel设置为非阻塞,所以这里当ServerSocketChannel上有客户端连贯时就会间接创立SocketChannel,如果此时并没有客户端连贯时accept调用就会立即返回null并不会阻塞。

    protected AbstractNioChannel(Channel parent, SelectableChannel ch, int readInterestOp) {
        super(parent);
        this.ch = ch;
        this.readInterestOp = readInterestOp;
        try {
            //设置Channel为非阻塞 配合IO多路复用模型
            ch.configureBlocking(false);
        } catch (IOException e) {
          ..........省略.............
        }
    }

5.1 创立客户端NioSocketChannel

public class NioServerSocketChannel extends AbstractNioMessageChannel
                             implements io.netty.channel.socket.ServerSocketChannel {

    @Override
    protected int doReadMessages(List<Object> buf) throws Exception {
        SocketChannel ch = SocketUtils.accept(javaChannel());

        try {
            if (ch != null) {
                buf.add(new NioSocketChannel(this, ch));
                return 1;
            }
        } catch (Throwable t) {
          .........省略.......
        }

        return 0;
    }

}

这里会依据ServerSocketChannelaccept办法获取到JDK NIO 原生SocketChannel(用于底层真正与客户端通信的Channel),来创立Netty中的NioSocketChannel

public class NioSocketChannel extends AbstractNioByteChannel implements io.netty.channel.socket.SocketChannel {

    public NioSocketChannel(Channel parent, SocketChannel socket) {
        super(parent, socket);
        config = new NioSocketChannelConfig(this, socket.socket());
    }

}

创立客户端NioSocketChannel的过程其实和之前讲的创立服务端NioServerSocketChannel大体流程是一样的,咱们这里只对客户端NioSocketChannel和服务端NioServerSocketChannel在创立过程中的不同之处做一个比照。

具体细节局部大家能够在回看下《具体图解Netty Reactor启动全流程》一文中对于NioServerSocketChannel的创立的具体细节。

5.3 比照NioSocketChannel与NioServerSocketChannel的不同

1:Channel的档次不同

在咱们介绍Reactor的创立文章中,咱们提到Netty中的Channel是具备档次的。因为客户端NioSocketChannel是在main reactor接管连贯时在服务端NioServerSocketChannel中被创立的,所以在创立客户端NioSocketChannel的时候会通过构造函数指定了parent属性为NioServerSocketChanel。并将JDK NIO 原生SocketChannel封装进Netty的客户端NioSocketChannel中。

而在Reactor启动过程中创立NioServerSocketChannel的时候parent属性指定是null。因为它就是顶层的Channel,负责创立客户端NioSocketChannel

    public NioServerSocketChannel(ServerSocketChannel channel) {
        super(null, channel, SelectionKey.OP_ACCEPT);
        config = new NioServerSocketChannelConfig(this, javaChannel().socket());
    }

2:向Reactor注册的IO事件不同

客户端NioSocketChannel向Sub Reactor注册的是SelectionKey.OP_READ事件,而服务端NioServerSocketChannel向Main Reactor注册的是SelectionKey.OP_ACCEPT事件

public abstract class AbstractNioByteChannel extends AbstractNioChannel {

    protected AbstractNioByteChannel(Channel parent, SelectableChannel ch) {
        super(parent, ch, SelectionKey.OP_READ);
    }

}

public class NioServerSocketChannel extends AbstractNioMessageChannel
                             implements io.netty.channel.socket.ServerSocketChannel {

   public NioServerSocketChannel(ServerSocketChannel channel) {
        //父类AbstractNioChannel中保留JDK NIO原生ServerSocketChannel以及要监听的事件OP_ACCEPT
        super(null, channel, SelectionKey.OP_ACCEPT);
        //DefaultChannelConfig中设置用于Channel接收数据用的buffer->AdaptiveRecvByteBufAllocator
        config = new NioServerSocketChannelConfig(this, javaChannel().socket());
    }
}

3: 功能属性不同造成继承构造的不同

客户端NioSocketChannel继承的是AbstractNioByteChannel,而服务端NioServerSocketChannel继承的是AbstractNioMessageChannel
它们继承的这两个抽象类一个前缀是Byte,一个前缀是Message有什么区别吗??

客户端NioSocketChannel次要解决的是服务端与客户端的通信,这里波及到接管客户端发送来的数据,而Sub Reactor线程NioSocketChannel中读取的正是网络通信数据单位为Byte

服务端NioServerSocketChannel次要负责解决OP_ACCEPT事件,创立用于通信的客户端NioSocketChannel。这时候客户端与服务端还没开始通信,所以Main Reactor线程NioServerSocketChannel的读取对象为Message。这里的Message指的就是底层的SocketChannel客户端连贯。


以上就是NioSocketChannelNioServerSocketChannel创立过程中的不同之处,前面的过程就一样了。

  • 在AbstractNioChannel 类中封装JDK NIO 原生的SocketChannel,并将其底层的IO模型设置为非阻塞,保留须要监听的IO事件OP_READ
    protected AbstractNioChannel(Channel parent, SelectableChannel ch, int readInterestOp) {
        super(parent);
        this.ch = ch;
        this.readInterestOp = readInterestOp;
        try {
            //设置Channel为非阻塞 配合IO多路复用模型
            ch.configureBlocking(false);
        } catch (IOException e) {

        }
    }
  • 为客户端NioSocketChannel创立全局惟一的channelId,创立客户端NioSocketChannel的底层操作类NioByteUnsafe,创立pipeline。
    protected AbstractChannel(Channel parent) {
        this.parent = parent;
        //channel全局惟一ID machineId+processId+sequence+timestamp+random
        id = newId();
        //unsafe用于底层socket的读写操作
        unsafe = newUnsafe();
        //为channel调配独立的pipeline用于IO事件编排
        pipeline = newChannelPipeline();
    }
  • 在NioSocketChannelConfig的创立过程中,将NioSocketChannel的RecvByteBufAllocator类型设置为AdaptiveRecvByteBufAllocator
    public DefaultChannelConfig(Channel channel) {
            this(channel, new AdaptiveRecvByteBufAllocator());
    }

在Bug修复后的版本中服务端NioServerSocketChannel的RecvByteBufAllocator类型设置为ServerChannelRecvByteBufAllocator

最终咱们失去的客户端NioSocketChannel构造如下:

6. ChannelRead事件的响应

在前边介绍接管连贯的整体外围流程框架的时候,咱们提到main reactor线程是在一个do{.....}while(...)循环read loop中一直的调用ServerSocketChannel#accept办法来接管客户端的连贯。

当满足退出read loop循环的条件有两个:

  1. 在限定的16次读取中,曾经没有新的客户端连贯要接管了。退出循环。
  2. 从NioServerSocketChannel中读取客户端连贯的次数达到了16次,无论此时是否还有客户端连贯都须要退出循环。

main reactor就会退出read loop循环,此时接管到的客户端连贯NioSocketChannel暂存与List<Object> readBuf汇合中。


    private final class NioMessageUnsafe extends AbstractNioUnsafe {

        private final List<Object> readBuf = new ArrayList<Object>();

        @Override
        public void read() {
            try {
                try {
                    do {
                        ........省略.........
                        //底层调用NioServerSocketChannel->doReadMessages 创立客户端SocketChannel
                        int localRead = doReadMessages(readBuf);
                        ........省略.........
                        allocHandle.incMessagesRead(localRead);
                    } while (allocHandle.continueReading());

                } catch (Throwable t) {
                    exception = t;
                }

                int size = readBuf.size();
                for (int i = 0; i < size; i ++) {
                    readPending = false;
                    pipeline.fireChannelRead(readBuf.get(i));
                }
                
                  ........省略.........
            } finally {
                  ........省略.........
            }
        }
    }

随后main reactor线程会遍历List<Object> readBuf汇合中的NioSocketChannel,并在NioServerSocketChannel的pipeline中流传ChannelRead事件。

最终ChannelRead事件会流传到ServerBootstrapAcceptor 中,这里正是Netty解决客户端连贯的外围逻辑所在。

ServerBootstrapAcceptor 次要的作用就是初始化客户端NioSocketChannel,并将客户端NioSocketChannel注册到Sub Reactor Group中,并监听OP_READ事件

在ServerBootstrapAcceptor 中会初始化客户端NioSocketChannel的这些属性。

比方:从Reactor组EventLoopGroup childGroup,用于初始化NioSocketChannel中的pipeline用到的ChannelHandler childHandler,以及NioSocketChannel中的一些childOptions childAttrs

private static class ServerBootstrapAcceptor extends ChannelInboundHandlerAdapter {

        private final EventLoopGroup childGroup;
        private final ChannelHandler childHandler;
        private final Entry<ChannelOption<?>, Object>[] childOptions;
        private final Entry<AttributeKey<?>, Object>[] childAttrs;

        @Override
        @SuppressWarnings("unchecked")
        public void channelRead(ChannelHandlerContext ctx, Object msg) {
            final Channel child = (Channel) msg;

            //向客户端NioSocketChannel的pipeline中
            //增加在启动配置类ServerBootstrap中配置的ChannelHandler
            child.pipeline().addLast(childHandler);

            //利用配置的属性初始化客户端NioSocketChannel
            setChannelOptions(child, childOptions, logger);
            setAttributes(child, childAttrs);

            try {
                /**
                 * 1:在Sub Reactor线程组中抉择一个Reactor绑定
                 * 2:将客户端SocketChannel注册到绑定的Reactor上
                 * 3:SocketChannel注册到sub reactor中的selector上,并监听OP_READ事件
                 * */
                childGroup.register(child).addListener(new ChannelFutureListener() {
                    @Override
                    public void operationComplete(ChannelFuture future) throws Exception {
                        if (!future.isSuccess()) {
                            forceClose(child, future.cause());
                        }
                    }
                });
            } catch (Throwable t) {
                forceClose(child, t);
            }
        }
}

正是在这里,netty会将咱们在《具体图解Netty Reactor启动全流程》的启动示例程序中在ServerBootstrap中配置的客户端NioSocketChannel的所有属性(child前缀配置)初始化到NioSocketChannel中。

public final class EchoServer {
    static final int PORT = Integer.parseInt(System.getProperty("port", "8007"));

    public static void main(String[] args) throws Exception {
        // Configure the server.
        //创立主从Reactor线程组
        EventLoopGroup bossGroup = new NioEventLoopGroup(1);
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        final EchoServerHandler serverHandler = new EchoServerHandler();
        try {
            ServerBootstrap b = new ServerBootstrap();
            b.group(bossGroup, workerGroup)//配置主从Reactor
             .channel(NioServerSocketChannel.class)//配置主Reactor中的channel类型
             .option(ChannelOption.SO_BACKLOG, 100)//设置主Reactor中channel的option选项
             .handler(new LoggingHandler(LogLevel.INFO))//设置主Reactor中Channel->pipline->handler
             .childHandler(new ChannelInitializer<SocketChannel>() {//设置从Reactor中注册channel的pipeline
                 @Override
                 public void initChannel(SocketChannel ch) throws Exception {
                     ChannelPipeline p = ch.pipeline();
                     //p.addLast(new LoggingHandler(LogLevel.INFO));
                     p.addLast(serverHandler);
                 }
             });

            // Start the server. 绑定端口启动服务,开始监听accept事件
            ChannelFuture f = b.bind(PORT).sync();
            // Wait until the server socket is closed.
            f.channel().closeFuture().sync();
        } finally {
            // Shut down all event loops to terminate all threads.
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }
}

以上示例代码中通过ServerBootstrap配置的NioSocketChannel相干属性,会在Netty启动并开始初始化NioServerSocketChannel的时候将ServerBootstrapAcceptor 的创立初始化工作封装成异步工作,而后在NioServerSocketChannel注册到Main Reactor中胜利后执行。

public class ServerBootstrap extends AbstractBootstrap<ServerBootstrap, ServerChannel> {

    @Override
    void init(Channel channel) {
        ................省略................

        p.addLast(new ChannelInitializer<Channel>() {
            @Override
            public void initChannel(final Channel ch) {
                final ChannelPipeline pipeline = ch.pipeline();
                ................省略................
                ch.eventLoop().execute(new Runnable() {
                    @Override
                    public void run() {
                        pipeline.addLast(new ServerBootstrapAcceptor(
                                ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));
                    }
                });
            }
        });
    }
}

在通过ServerBootstrapAccptor#chanelRead回调的解决之后,此时客户端NioSocketChannel中pipeline的构造为:

随后会将初始化好的客户端NioSocketChannel向Sub Reactor Group中注册,并监听OP_READ事件

如下图中的步骤3所示:

7. 向SubReactorGroup中注册NioSocketChannel

                childGroup.register(child).addListener(new ChannelFutureListener() {
                    @Override
                    public void operationComplete(ChannelFuture future) throws Exception {
                        if (!future.isSuccess()) {
                            forceClose(child, future.cause());
                        }
                    }
                });

客户端NioSocketChannel向Sub Reactor Group注册的流程齐全和服务端NioServerSocketChannel向Main Reactor Group注册流程一样。

对于服务端NioServerSocketChannel的注册流程,笔者曾经在《具体图解Netty Reactor启动全流程》一文中做出了具体的介绍,对相干细节感兴趣的同学能够在回看下。

这里笔者在带大家简要回顾下整个注册过程并着重区别比照客户端NioSocetChannel与服务端NioServerSocketChannel注册过程中不同的中央。

7.1 从Sub Reactor Group中选取一个Sub Reactor进行绑定

public abstract class MultithreadEventLoopGroup extends MultithreadEventExecutorGroup implements EventLoopGroup {

   @Override
    public ChannelFuture register(Channel channel) {
        return next().register(channel);
    }

    @Override
    public EventExecutor next() {
        return chooser.next();
    }

}

7.2 向绑定的Sub Reactor上注册NioSocketChannel

public abstract class SingleThreadEventLoop extends SingleThreadEventExecutor implements EventLoop {

    @Override
    public ChannelFuture register(Channel channel) {
        //注册channel到绑定的Reactor上
        return register(new DefaultChannelPromise(channel, this));
    }

    @Override
    public ChannelFuture register(final ChannelPromise promise) {
        ObjectUtil.checkNotNull(promise, "promise");
        //unsafe负责channel底层的各种操作
        promise.channel().unsafe().register(this, promise);
        return promise;
    }

}
  • 过后咱们在介绍NioServerSocketChannel的注册过程时,这里的promise.channel()NioServerSocketChannel。底层的unsafe操作类为NioMessageUnsafe
  • 此时这里的promise.channel()NioSocketChannel。底层的unsafe操作类为NioByteUnsafe
        @Override
        public final void register(EventLoop eventLoop, final ChannelPromise promise) {
            ..............省略....................
            //此时这里的eventLoop为Sub Reactor
            AbstractChannel.this.eventLoop = eventLoop;

            /**
             * 执行channel注册的操作必须是Reactor线程来实现
             *
             * 1: 如果以后执行线程是Reactor线程,则间接执行register0进行注册
             * 2:如果以后执行线程是内部线程,则须要将register0注册操作 封装程异步Task 由Reactor线程执行
             * */
            if (eventLoop.inEventLoop()) {
                register0(promise);
            } else {
                try {
                    eventLoop.execute(new Runnable() {
                        @Override
                        public void run() {
                            register0(promise);
                        }
                    });
                } catch (Throwable t) {
                    ..............省略....................
                }
            }
        }

留神此时传递进来的EventLoop eventLoop为Sub Reactor

但此时的执行线程为Main Reactor线程,并不是Sub Reactor线程(此时还未启动)

所以这里的eventLoop.inEventLoop()返回的是false

else分支中向绑定的Sub Reactor提交注册NioSocketChannel的工作。

当注册工作提交后,此时绑定的Sub Reactor线程启动。

7.3 register0

咱们又来到了Channel注册的老中央register0办法。在《具体图解Netty Reactor启动全流程》中咱们花了大量的篇幅介绍了这个办法。这里咱们只比照NioSocketChannelNioServerSocketChannel不同的中央。

 private void register0(ChannelPromise promise) {
            try {
                ................省略..................
                boolean firstRegistration = neverRegistered;
                //执行真正的注册操作
                doRegister();
                //批改注册状态
                neverRegistered = false;
                registered = true;

                pipeline.invokeHandlerAddedIfNeeded();

                if (isActive()) {
                    if (firstRegistration) {
                        //触发channelActive事件
                        pipeline.fireChannelActive();
                    } else if (config().isAutoRead()) {
                        beginRead();
                    }
                }
            } catch (Throwable t) {
                 ................省略..................
            }
        }

这里 doRegister()办法将NioSocketChannel注册到Sub Reactor中的Selector上。

public abstract class AbstractNioChannel extends AbstractChannel {

    @Override
    protected void doRegister() throws Exception {
        boolean selected = false;
        for (;;) {
            try {
                selectionKey = javaChannel().register(eventLoop().unwrappedSelector(), 0, this);
                return;
            } catch (CancelledKeyException e) {
                ...............省略...............
            }
        }
    }

}

这里是Netty客户端NioSocketChannel与JDK NIO 原生 SocketChannel关联的中央。此时注册的IO事件仍然是0。目标也是只是为了获取NioSocketChannel在Selector中的SelectionKey

同时通过SelectableChannel#register办法将Netty自定义的NioSocketChannel(这里的this指针)附着在SelectionKey的attechment属性上,实现Netty自定义Channel与JDK NIO Channel的关系绑定。这样在每次对Selector进行IO就绪事件轮询时,Netty 都能够从 JDK NIO Selector返回的SelectionKey中获取到自定义的Channel对象(这里指的就是NioSocketChannel)。

随后调用pipeline.invokeHandlerAddedIfNeeded()回调客户端NioSocketChannel上pipeline中的所有ChannelHandler的handlerAdded办法,此时pipeline的构造中只有一个ChannelInitializer。最终会在ChannelInitializer#handlerAdded回调办法中初始化客户端NioSocketChannelpipeline

public abstract class ChannelInitializer<C extends Channel> extends ChannelInboundHandlerAdapter {

    @Override
    public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
        if (ctx.channel().isRegistered()) {
            if (initChannel(ctx)) {
                //初始化工作实现后,须要将本身从pipeline中移除
                removeState(ctx);
            }
        }
    }

    protected abstract void initChannel(C ch) throws Exception;
}

对于对Channel中pipeline的具体初始化过程,对细节局部感兴趣的同学能够回看下《具体图解Netty Reactor启动全流程》

此时客户端NioSocketChannel中的pipeline中的构造就变为了咱们自定义的样子,在示例代码中咱们自定义的ChannelHandlerEchoServerHandler

@Sharable
public class EchoServerHandler extends ChannelInboundHandlerAdapter {

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) {
        ctx.write(msg);
    }

    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) {

        ctx.flush();
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        // Close the connection when an exception is raised.
        cause.printStackTrace();
        ctx.close();
    }
}

当客户端NioSocketChannel中的pipeline初始化结束后,netty就开始调用safeSetSuccess(promise)办法回调regFuture中注册的ChannelFutureListener,告诉客户端NioSocketChannel曾经胜利注册到Sub Reactor上了。

               childGroup.register(child).addListener(new ChannelFutureListener() {
                    @Override
                    public void operationComplete(ChannelFuture future) throws Exception {
                        if (!future.isSuccess()) {
                            forceClose(child, future.cause());
                        }
                    }
                });

在服务端NioServerSocketChannel注册的时候咱们会在listener中向Main Reactor提交bind绑定端口地址工作。然而在NioSocketChannel注册的时候,只会在listener中解决一下注册失败的状况。

当Sub Reactor线程告诉ChannelFutureListener注册胜利之后,随后就会调用pipeline.fireChannelRegistered()在客户端NioSocketChannel的pipeline中流传ChannelRegistered事件

这里笔者重点要强调下,在之前介绍NioServerSocketChannel注册的时候,咱们提到因为此时NioServerSocketChannel并未绑定端口地址,所以这时的NioServerSocketChannel并未激活,这里的isActive()返回falseregister0办法间接返回。

服务端NioServerSocketChannel判断是否激活的规范为端口是否绑定胜利。

public class NioServerSocketChannel extends AbstractNioMessageChannel
                             implements io.netty.channel.socket.ServerSocketChannel {
    @Override
    public boolean isActive() {
        return isOpen() && javaChannel().socket().isBound();
    }
}

客户端NioSocketChannel判断是否激活的规范为是否处于Connected状态。那么显然这里必定是处于connected状态的。

    @Override
    public boolean isActive() {
        SocketChannel ch = javaChannel();
        return ch.isOpen() && ch.isConnected();
    }

NioSocketChannel曾经处于connected状态,这里并不需要绑定端口,所以这里的isActive()返回true

           if (isActive()) {
                    /**
                     * 客户端SocketChannel注册胜利后会走这里,在channelActive事件回调中注册OP_READ事件
                     * */
                    if (firstRegistration) {
                        //触发channelActive事件
                        pipeline.fireChannelActive();
                    } else if (config().isAutoRead()) {
                        .......省略..........
                    }
                }
            }

最初调用pipeline.fireChannelActive()在NioSocketChannel中的pipeline流传ChannelActive事件,最终在pipeline的头结点HeadContext中响应并注册OP_READ事件Sub Reactor中的Selector上。

public abstract class AbstractNioChannel extends AbstractChannel { {

    @Override
    protected void doBeginRead() throws Exception {
        ..............省略................

        final int interestOps = selectionKey.interestOps();
        /**
         * 1:ServerSocketChannel 初始化时 readInterestOp设置的是OP_ACCEPT事件
         * 2:SocketChannel 初始化时 readInterestOp设置的是OP_READ事件
         * */
        if ((interestOps & readInterestOp) == 0) {
            //注册监听OP_ACCEPT或者OP_READ事件
            selectionKey.interestOps(interestOps | readInterestOp);
        }
    }

}

留神这里的readInterestOp为客户端NioSocketChannel在初始化时设置的OP_READ事件


到这里,Netty中的Main Reactor接管连贯的整个流程,咱们就介绍完了,此时Netty中主从Reactor组的构造就变为:

总结

本文咱们介绍了NioServerSocketChannel解决客户端连贯事件的整个过程。

  • 接管连贯的整个解决框架。
  • 影响Netty接管连贯吞吐的Bug产生的起因,以及修复的计划。
  • 创立并初始化客户端NioSocketChannel
  • 初始化NioSocketChannel中的pipeline
  • 客户端NioSocketChannelSub Reactor注册的过程

其中咱们也比照了NioServerSocketChannelNioSocketChannel在创立初始化以及前面向Reactor注册过程中的差别之处。

当客户端NioSocketChannel接管结束并向Sub Reactor注册胜利后,那么接下来Sub Reactor就开始监听注册其上的所有客户端NioSocketChannelOP_READ事件,并期待客户端向服务端发送网络数据。

前面Reactor的配角就该变为Sub Reactor以及注册在其上的客户端NioSocketChannel了。

下篇文章,咱们将会探讨Netty是如何接管网络数据的~~ 咱们下篇文章见

评论

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注

这个站点使用 Akismet 来减少垃圾评论。了解你的评论数据如何被处理