本系列Netty源码解析文章基于 4.1.56.Final版本

前文回顾

在前边的系列文章中,咱们从内核如何收发网络数据开始以一个C10K的问题作为主线具体从内核角度论述了网络IO模型的演变,最终在此基础上引出了Netty的网络IO模型如下图所示:

具体内容可回看《从内核角度看IO模型的演变》

后续咱们又围绕着Netty的主从Reactor网络IO线程模型,在《Reactor模型在Netty中的实现》一文中具体论述了Netty的主从Reactor模型的创立,以及介绍了Reactor模型的要害组件。搭建了Netty的外围骨架如下图所示:

在外围骨架搭建结束之后,咱们随后又在《具体图解Reactor启动全流程》一文中论述了Reactor启动的全流程,一个十分重要的外围组件NioServerSocketChannel开始在这里首次亮相,承当着一个网络框架最重要的工作--高效接管网络连接。咱们介绍了NioServerSocketChannel的创立,初始化,向Main Reactor注册并监听OP_ACCEPT事件的整个流程。在此基础上,Netty得以整装待发,严阵以待开始迎接海量的客户端连贯。

随后紧接着咱们在《Netty如何高效接管网络连接》一文中具体介绍了Netty高效接管客户端网络连接的全流程,在这里Netty的外围重要组件NioServerSocketChannel开始正是退场,在NioServerSocketChannel中咱们创立了客户端连贯NioSocketChannel,并具体介绍了NioSocketChannel的初始化过程,随后通过在NioServerSocketChannel的pipeline中触发ChannelRead事件,并最终在ServerBootstrapAcceptor中将客户端连贯NioSocketChannel注册到Sub Reactor中开始监听客户端连贯上的OP_READ事件,筹备接管客户端发送的网络数据也就是本文的主题内容。

自此Netty的外围组件全副就绪并启动结束,开始腾飞~~~

之前文章中的配角是Netty中主Reactor组中的Main Reactor以及注册在Main Reactor上边的NioServerSocketChannel,那么从本文开始,咱们文章中的配角就切换为Sub Reactor以及注册在SubReactor上的NioSocketChannel了。

上面就让咱们正式进入明天的主题,看一下Netty是如何解决OP_READ事件以及如何高效接管网络数据的。

1. Sub Reactor解决OP_READ事件流程总览

客户端发动零碎IO调用向服务端发送数据之后,当网络数据达到服务端的网卡并通过内核协定栈的解决,最终数据达到Socket的接收缓冲区之后,Sub Reactor轮询到NioSocketChannel上的OP_READ事件就绪,随后Sub Reactor线程就会从JDK Selector上的阻塞轮询APIselector.select(timeoutMillis)调用中返回。转而去解决NioSocketChannel上的OP_READ事件

留神这里的Reactor为负责解决客户端连贯的Sub Reactor。连贯的类型为NioSocketChannel,解决的事件为OP_READ事件。

在之前的文章中笔者曾经屡次强调过了,Reactor在解决Channel上的IO事件入口函数为NioEventLoop#processSelectedKey

public final class NioEventLoop extends SingleThreadEventLoop {    private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {        final AbstractNioChannel.NioUnsafe unsafe = ch.unsafe();        ..............省略.................        try {            int readyOps = k.readyOps();            if ((readyOps & SelectionKey.OP_CONNECT) != 0) {               ..............解决OP_CONNECT事件.................            }            if ((readyOps & SelectionKey.OP_WRITE) != 0) {              ..............解决OP_WRITE事件.................            }            if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {                //本文重点解决OP_ACCEPT事件                unsafe.read();            }        } catch (CancelledKeyException ignored) {            unsafe.close(unsafe.voidPromise());        }    }}

这里须要重点强调的是,以后的执行线程当初曾经变成了Sub Reactor,而Sub Reactor上注册的正是netty客户端NioSocketChannel负责解决连贯上的读写事件。

所以这里入口函数的参数AbstractNioChannel ch则是IO就绪的客户端连贯NioSocketChannel

结尾通过ch.unsafe()获取到的NioUnsafe操作类正是NioSocketChannel中对底层JDK NIO SocketChannel的Unsafe底层操作类。实现类型为NioByteUnsafe定义在下图继承构造中的AbstractNioByteChannel父类中。

上面咱们到NioByteUnsafe#read办法中来看下Netty对OP_READ事件的具体处理过程:

2. Netty接管网络数据流程总览

咱们间接依照老规矩,先从整体上把整个OP_READ事件的逻辑解决框架提取进去,让大家先总体仰视下流程全貌,而后在针对每个外围点位进行各个击破。

流程中相干置灰的步骤为Netty解决连贯敞开时的逻辑,和本文宗旨无关,咱们这里临时疏忽,等后续笔者介绍连贯敞开时,会独自开一篇文章具体为大家介绍。

从下面这张Netty接管网络数据总体流程图能够看出NioSocketChannel在接管网络数据的整个流程和咱们在上篇文章《Netty如何高效接管网络连接》中介绍的NioServerSocketChannel在接管客户端连贯时的流程在总体框架上是一样的。

NioSocketChannel在接管网络数据的过程解决中,也是通过在一个do{....}while(...)循环read loop中一直的循环读取连贯NioSocketChannel上的数据。

同样在NioSocketChannel读取连贯数据的read loop中也是受最大读取次数的限度。默认配置最多只能读取16次,超过16次无论此时NioSocketChannel中是否还有数据可读都不能在进行读取了。

这里read loop循环最大读取次数可在启动配置类ServerBootstrap中通过ChannelOption.MAX_MESSAGES_PER_READ选项设置,默认为16。

ServerBootstrap b = new ServerBootstrap();b.group(bossGroup, workerGroup)  .channel(NioServerSocketChannel.class)  .option(ChannelOption.MAX_MESSAGES_PER_READ, 自定义次数)

Netty这里为什么非得限度read loop的最大读取次数呢?为什么不在read loop中一次性把数据读取完呢?

这时候就是考验咱们大局观的时候了,在前边的文章介绍中咱们提到Netty的IO模型为主从Reactor线程组模型,在Sub Reactor Group中蕴含了多个Sub Reactor专门用于监听解决客户端连贯上的IO事件。

为了可能高效有序的解决全量客户端连贯上的读写事件,Netty将服务端承载的全量客户端连贯摊派到多个Sub Reactor中解决,同时也能保障Channel上IO解决的线程安全性

其中一个Channel只能调配给一个固定的Reactor。一个Reactor负责解决多个Channel上的IO就绪事件,Reactor与Channel之间的对应关系如下图所示:

而一个Sub Reactor上注册了多个NioSocketChannel,Netty不可能在一个NioSocketChannel上无限度的解决上来,要将读取数据的机会平均摊派给其余NioSocketChannel,所以须要限定每个NioSocketChannel上的最大读取次数。

此外,Sub Reactor除了须要监听解决所有注册在它上边的NioSocketChannel中的IO就绪事件之外,还须要腾出事件来解决有用户线程提交过去的异步工作。从这一点看,Netty也不会始终停留在NioSocketChannel的IO解决上。所以限度read loop的最大读取次数是十分必要的。

对于Reactor的整体运行架构,对细节局部感兴趣的同学能够回看下笔者的《一文聊透Netty外围引擎Reactor的运行架构》这篇文章。

所以基于这个起因,咱们须要在read loop循环中,每当通过doReadBytes办法从NioSocketChannel中读取到数据时(办法返回值会大于0,并记录在allocHandle.lastBytesRead中),都须要通过allocHandle.incMessagesRead(1)办法统计曾经读取的次数。当达到16次时不论NioSocketChannel是否还有数据可读,都须要在read loop开端退出循环。转去执行Sub Reactor上的异步工作。以及其余NioSocketChannel上的IO就绪事件。平均分配,雨露均沾!!

public abstract class MaxMessageHandle implements ExtendedHandle {        //read loop总共读取了多少次        private int totalMessages;       @Override        public final void incMessagesRead(int amt) {            totalMessages += amt;        }}

本次read loop读取到的数据大小会记录在allocHandle.lastBytesRead

public abstract class MaxMessageHandle implements ExtendedHandle {         //本次read loop读取到的字节数        private int lastBytesRead;        //整个read loop循环总共读取的字节数        private int totalBytesRead;        @Override        public void lastBytesRead(int bytes) {            lastBytesRead = bytes;            if (bytes > 0) {                totalBytesRead += bytes;            }        }}
  • lastBytesRead < 0:示意客户端被动发动了连贯敞开流程,Netty开始连贯敞开解决流程。这个和本文的宗旨无关,咱们先不必管。前面笔者会专门用一篇文章来详解敞开流程。
  • lastBytesRead = 0:示意以后NioSocketChannel上的数据曾经全副读取结束,没有数据可读了。本次OP_READ事件圆满处理完毕,能够开开心心的退出read loop。
  • lastBytesRead > 0:示意在本次read loop中从NioSocketChannel中读取到了数据,会在NioSocketChannel的pipeline中触发ChannelRead事件。进而在pipeline中负责IO解决的ChannelHandelr中响应,解决网络申请。

public class EchoServerHandler extends ChannelInboundHandlerAdapter {    @Override    public void channelRead(ChannelHandlerContext ctx, Object msg) {          .......解决网络申请,比方解码,反序列化等操作.......    }}

最初会在read loop循环的开端调用allocHandle.continueReading()判断是否完结本次read loop循环。这里的完结循环条件的判断会比咱们在介绍NioServerSocketChannel接管连贯时的判断条件简单很多,笔者会将这个判断条件的具体解析放在文章前面细节局部为大家解读,这里大家只须要把握总体外围流程,不须要关注太多细节。

总体上在NioSocketChannel中读取网络数据的read loop循环完结条件须要满足以下几点:

  • 以后NioSocketChannel中的数据曾经全副读取结束,则退出循环。
  • 本轮read loop如果没有读到任何数据,则退出循环。
  • read loop的读取次数达到16次,退出循环。

当满足这里的read loop退出条件之后,Sub Reactor线程就会退出循环,随后会调用allocHandle.readComplete()办法依据本轮read loop总共读取到的字节数totalBytesRead来决定是否对用于接管下一轮OP_READ事件数据的ByteBuffer进行扩容或者缩容。

最初在NioSocketChannel的pipeline中触发ChannelReadComplete事件,告诉ChannelHandler本次OP_READ事件曾经处理完毕。

public class EchoServerHandler extends ChannelInboundHandlerAdapter {    @Override    public void channelRead(ChannelHandlerContext ctx, Object msg) {       .......解决网络申请,比方解码,反序列化等操作.......    }    @Override    public void channelReadComplete(ChannelHandlerContext ctx) {        ......本次OP_READ事件处理完毕.......        ......决定是否向客户端响应处理结果......    }}

2.1 ChannelRead与ChannelReadComplete事件的区别

有些小伙伴可能对Netty中的一些流传事件触发的机会,或者事件之间的区别了解的不是很分明,概念容易混同。在前面的文章中笔者也会从源码的角度登程给大家说分明Netty中定义的所有异步事件,以及这些事件之间的区别和分割和触发机会,流传机制。

这里咱们次要探讨本文主题中波及到的两个事件:ChannelRead事件与ChannelReadComplete事件。

从上述介绍的Netty接管网络数据流程总览中咱们能够看出ChannelRead事件ChannelReadComplete事件是不一样的,然而对于刚接触Netty的小伙伴来说从命名上乍一看感觉又差不多。

上面咱们来看这两个事件之间的差异:

Netty服务端对于一次OP_READ事件的解决,会在一个do{}while()循环read loop中分屡次从客户端NioSocketChannel中读取网络数据。每次读取咱们调配的ByteBuffer容量大小,初始容量为2048。

  • ChanneRead事件:一次循环读取一次数据,就触发一次ChannelRead事件。本次最多读取在read loop循环开始调配的DirectByteBuffer容量大小。这个容量会动静调整,文章后续笔者会具体介绍。
  • ChannelReadComplete事件:当读取不到数据或者不满足continueReading 的任意一个条件就会退出read loop,这时就会触发ChannelReadComplete事件。示意本次OP_READ事件处理完毕。
这里须要特地留神下触发ChannelReadComplete事件并不代表NioSocketChannel中的数据曾经读取完了,只能阐明本次OP_READ事件处理完毕。因为有可能是客户端发送的数据太多,Netty读了16次还没读完,那就只能等到下次OP_READ事件到来的时候在进行读取了。

以上内容就是Netty在接管客户端发送网络数据的全副外围逻辑。目前为止咱们还未波及到这部分的骨干外围源码,笔者想的是先给大家把外围逻辑解说分明之后,这样了解起来外围骨干源码会更加清晰透彻。

通过前边对网络数据接管的外围逻辑介绍,笔者在把这张流程图放进去,大家能够联合这张图在来回忆下骨干外围逻辑。

上面笔者会联合这张流程图,给大家把这部分的外围骨干源码框架展示进去,大家能够将咱们介绍过的外围逻辑与骨干源码做个一一对应,还是那句老话,咱们要从骨干框架层面把握整体解决流程,不须要读懂每一行代码,文章后续笔者会将这个过程中波及到的外围点位给大家拆开来各个击破!!

3. 源码外围框架总览

        @Override        public final void read() {            final ChannelConfig config = config();            ...............解决半敞开相干代码省略...............            //获取NioSocketChannel的pipeline            final ChannelPipeline pipeline = pipeline();            //PooledByteBufAllocator 具体用于理论调配ByteBuf的分配器            final ByteBufAllocator allocator = config.getAllocator();            //自适应ByteBuf分配器 AdaptiveRecvByteBufAllocator ,用于动静调节ByteBuf容量            //须要与具体的ByteBuf分配器配合应用 比方这里的PooledByteBufAllocator            final RecvByteBufAllocator.Handle allocHandle = recvBufAllocHandle();            //allocHandler用于统计每次读取数据的大小,不便下次调配适合大小的ByteBuf            //重置革除上次的统计指标            allocHandle.reset(config);            ByteBuf byteBuf = null;            boolean close = false;            try {                do {                    //利用PooledByteBufAllocator调配适合大小的byteBuf 初始大小为2048                    byteBuf = allocHandle.allocate(allocator);                    //记录本次读取了多少字节数                    allocHandle.lastBytesRead(doReadBytes(byteBuf));                    //如果本次没有读取到任何字节,则退出循环 进行下一轮事件轮询                    if (allocHandle.lastBytesRead() <= 0) {                        // nothing was read. release the buffer.                        byteBuf.release();                        byteBuf = null;                        close = allocHandle.lastBytesRead() < 0;                        if (close) {                            ......示意客户端发动连贯敞开.....                        }                        break;                    }                    //read loop读取数据次数+1                    allocHandle.incMessagesRead(1);                    //客户端NioSocketChannel的pipeline中触发ChannelRead事件                    pipeline.fireChannelRead(byteBuf);                    //解除本次读取数据调配的ByteBuffer援用,不便下一轮read loop调配                    byteBuf = null;                } while (allocHandle.continueReading());//判断是否应该持续read loop                //依据本次read loop总共读取的字节数,决定下次是否扩容或者缩容                allocHandle.readComplete();                //在NioSocketChannel的pipeline中触发ChannelReadComplete事件,示意一次read事件处理完毕                //但这并不示意 客户端发送来的数据曾经全副读完,因为如果数据太多的话,这里只会读取16次,剩下的会等到下次read事件到来后在解决                pipeline.fireChannelReadComplete();                .........省略连贯敞开流程解决.........            } catch (Throwable t) {                ...............省略...............            } finally {               ...............省略...............            }        }    }
这里再次强调下以后执行线程为Sub Reactor线程,解决连贯数据读取逻辑是在NioSocketChannel中。

首先通过config()获取客户端NioSocketChannel的Channel配置类NioSocketChannelConfig。

通过pipeline()获取NioSocketChannel的pipeline。咱们在《具体图解Netty Reactor启动全流程》一文中提到的Netty服务端模板所举的示例中,NioSocketChannelde pipeline中只有一个EchoChannelHandler。

3.1 调配DirectByteBuffer接管网络数据

Sub Reactor在接管NioSocketChannel上的IO数据时,都会调配一个ByteBuffer用来寄存接管到的IO数据。

这里大家可能感觉比拟奇怪,为什么在NioSocketChannel接收数据这里会有两个ByteBuffer分配器呢?一个是ByteBufAllocator,另一个是RecvByteBufAllocator。

    final ByteBufAllocator allocator = config.getAllocator();    final RecvByteBufAllocator.Handle allocHandle = recvBufAllocHandle();

这两个ByteBuffer又有什么区别和分割呢?

在上篇文章《抓到Netty一个Bug,顺带来透彻地聊一下Netty是如何高效接管网络连接》中,笔者为了论述上篇文章中提到的Netty在接管网络连接时的Bug时,简略和大家介绍了下这个RecvByteBufAllocator。

在上篇文章提到的NioServerSocketChannelConfig中,这里的RecvByteBufAllocator类型为ServerChannelRecvByteBufAllocator。

还记得这个ServerChannelRecvByteBufAllocator类型在4.1.69.final版本引入是为了解决笔者在上篇文章中提到的那个Bug吗?在4.1.69.final版本之前,NioServerSocketChannelConfig中的RecvByteBufAllocator类型为AdaptiveRecvByteBufAllocator。

而在本文中NioSocketChannelConfig中的RecvByteBufAllocator类型为AdaptiveRecvByteBufAllocator。

所以这里recvBufAllocHandle()取得到的RecvByteBufAllocator为AdaptiveRecvByteBufAllocator。顾名思义,这个类型的RecvByteBufAllocator能够依据NioSocketChannel上每次到来的IO数据大小来自适应动静调整ByteBuffer的容量。

对于客户端NioSocketChannel来说,它里边蕴含的IO数据时客户端发送来的网络数据,长度是不定的,所以才会须要这样一个能够依据每次IO数据的大小来自适应动静调整容量的ByteBuffer来接管。

如果咱们把用于接收数据用的ByteBuffer看做一个桶的话,那么小数据用大桶装或者大数据用小桶装必定是不适合的,所以咱们须要依据接收数据的大小来动静调整桶的容量。而AdaptiveRecvByteBufAllocator的作用正是用来依据每次接收数据的容量大小来动静调整ByteBuffer的容量的。

当初RecvByteBufAllocator笔者为大家解释分明了,接下来咱们持续看ByteBufAllocator。

大家这里须要留神的是AdaptiveRecvByteBufAllocator并不会真正的去调配ByteBuffer,它只是负责动静调整调配ByteBuffer的大小。

而真正具体执行内存调配动作的是这里的ByteBufAllocator类型为PooledByteBufAllocator。它会依据AdaptiveRecvByteBufAllocator动静调整进去的大小去真正的申请内存调配ByteBuffer。

PooledByteBufAllocator为Netty中的内存池,用来治理堆外内存DirectByteBuffer。

AdaptiveRecvByteBufAllocator中的allocHandle在上篇文章中咱们也介绍过了,它的理论类型为MaxMessageHandle。

public class AdaptiveRecvByteBufAllocator extends DefaultMaxMessagesRecvByteBufAllocator {    @Override    public Handle newHandle() {        return new HandleImpl(minIndex, maxIndex, initial);    }        private final class HandleImpl extends MaxMessageHandle {                  .................省略................    }}

在MaxMessageHandle中蕴含了用于动静调整ByteBuffer容量的统计指标。

   public abstract class MaxMessageHandle implements ExtendedHandle {        private ChannelConfig config;        //用于管制每次read loop里最大能够循环读取的次数,默认为16次        //可在启动配置类ServerBootstrap中通过ChannelOption.MAX_MESSAGES_PER_READ选项设置。        private int maxMessagePerRead;        //用于统计read loop中总共接管的连贯个数,NioSocketChannel中示意读取数据的次数        //每次read loop循环后会调用allocHandle.incMessagesRead减少记录接管到的连贯个数        private int totalMessages;        //用于统计在read loop中总共接管到客户端连贯上的数据大小        private int totalBytesRead;        //示意本次read loop 尝试读取多少字节,byteBuffer残余可写的字节数        private int attemptedBytesRead;        //本次read loop读取到的字节数        private int lastBytesRead;                //预计下一次调配buffer的容量,初始:2048        private int nextReceiveBufferSize;        ...........省略.............}

在每轮read loop开始之前,都会调用allocHandle.reset(config)重置清空上一轮read loop的统计指标。

        @Override        public void reset(ChannelConfig config) {            this.config = config;            //默认每次最多读取16次            maxMessagePerRead = maxMessagesPerRead();            totalMessages = totalBytesRead = 0;        }

在每次开始从NioSocketChannel中读取数据之前,须要利用PooledByteBufAllocator在内存池中为ByteBuffer分配内存,默认初始化大小为2048,这个容量由guess()办法决定。

        byteBuf = allocHandle.allocate(allocator);
        @Override        public ByteBuf allocate(ByteBufAllocator alloc) {            return alloc.ioBuffer(guess());        }        @Override        public int guess() {            //预计下一次调配buffer的容量,一开始为2048            return nextReceiveBufferSize;        }

在每次通过doReadBytes从NioSocketChannel中读取到数据后,都会调用allocHandle.lastBytesRead(doReadBytes(byteBuf))记录本次读取了多少字节数据,并统计本轮read loop目前总共读取了多少字节。

        @Override        public void lastBytesRead(int bytes) {            lastBytesRead = bytes;            if (bytes > 0) {                totalBytesRead += bytes;            }        }

每次循环从NioSocketChannel中读取数据之后,都会调用allocHandle.incMessagesRead(1)。统计以后曾经读取了多少次。如果超过了最大读取限度此时16次,就须要退出read loop。去解决其余NioSocketChannel上的IO事件。

        @Override        public final void incMessagesRead(int amt) {            totalMessages += amt;        }

在每次read loop循环的开端都须要通过调用allocHandle.continueReading()来判断是否持续read loop循环读取NioSocketChannel中的数据。

        @Override        public boolean continueReading() {            return continueReading(defaultMaybeMoreSupplier);        }        private final UncheckedBooleanSupplier defaultMaybeMoreSupplier = new UncheckedBooleanSupplier() {            @Override            public boolean get() {                //判断本次读取byteBuffer是否满载而归                return attemptedBytesRead == lastBytesRead;            }        };        @Override        public boolean continueReading(UncheckedBooleanSupplier maybeMoreDataSupplier) {            return config.isAutoRead() &&                   (!respectMaybeMoreData || maybeMoreDataSupplier.get()) &&                   totalMessages < maxMessagePerRead &&                   totalBytesRead > 0;        }
  • attemptedBytesRead :示意以后ByteBuffer预计尝试要写入的字节数。
  • lastBytesRead :示意本次read loop实在读取到了多少个字节。

defaultMaybeMoreSupplier 用于判断通过本次read loop读取数据后,ByteBuffer是否满载而归。如果是满载而归的话(attemptedBytesRead == lastBytesRead),表明可能NioSocketChannel里还有数据。如果不是满载而归,表明NioSocketChannel里没有数据了曾经。

是否持续进行read loop须要同时满足以下几个条件:

  • totalMessages < maxMessagePerRead 以后读取次数是否曾经超过16次,如果超过,就退出do(...)while()循环。进行下一轮OP_READ事件的轮询。因为每个Sub Reactor治理了多个NioSocketChannel,不能在一个NioSocketChannel上占用太多工夫,要将机会平均地调配给Sub Reactor所治理的所有NioSocketChannel。
  • totalBytesRead > 0 本次OP_READ事件解决是否读取到了数据,如果曾经没有数据可读了,那么就间接退出read loop。
  • !respectMaybeMoreData || maybeMoreDataSupplier.get() 这个条件比较复杂,它其实就是通过respectMaybeMoreData字段来管制NioSocketChannel中可能还有数据可读的状况下该如何解决。

    • maybeMoreDataSupplier.get():true示意本次读取从NioSocketChannel中读取数据,ByteBuffer满载而归。阐明可能NioSocketChannel中还有数据没读完。fasle示意ByteBuffer还没有装满,阐明NioSocketChannel中曾经没有数据可读了。
    • respectMaybeMoreData = true示意要对可能还有更多数据进行解决的这种状况要respect认真对待,如果本次循环读取到的数据曾经装满ByteBuffer,示意前面可能还有数据,那么就要进行读取。如果ByteBuffer还没装满示意曾经没有数据可读了那么就退出循环。

    • respectMaybeMoreData = false示意对可能还有更多数据的这种状况不认真对待 not respect。不论本次循环读取数据ByteBuffer是否满载而归,都要持续进行读取,直到读取不到数据在退出循环,属于无脑读取。

同时满足以上三个条件,那么read loop持续进行。持续从NioSocketChannel中读取数据,直到读取不到或者不满足三个条件中的任意一个为止。

3.2 从NioSocketChannel中读取数据

public class NioSocketChannel extends AbstractNioByteChannel implements io.netty.channel.socket.SocketChannel {    @Override    protected int doReadBytes(ByteBuf byteBuf) throws Exception {        final RecvByteBufAllocator.Handle allocHandle = unsafe().recvBufAllocHandle();        allocHandle.attemptedBytesRead(byteBuf.writableBytes());            return byteBuf.writeBytes(javaChannel(), allocHandle.attemptedBytesRead());    }}

这里会间接调用底层JDK NIO的SocketChannel#read办法将数据读取到DirectByteBuffer中。读取数据大小为本次调配的DirectByteBuffer容量,初始为2048。

4. ByteBuffer动静自适应扩缩容机制

因为咱们一开始并不知道客户端会发送多大的网络数据,所以这里先利用PooledByteBufAllocator调配一个初始容量为2048的DirectByteBuffer用于接收数据。

  byteBuf = allocHandle.allocate(allocator);

这就好比咱们须要拿着一个桶去排队装水,然而第一次去装的时候,咱们并不知道管理员会给咱们调配多少水,桶拿大了也不适合拿小了也不适合,于是咱们就先预估一个差不多容量大小的桶,如果调配的多了,咱们下次就拿更大一点的桶,如果调配少了,下次咱们就拿一个小点的桶。

在这种场景下,咱们须要ByteBuffer能够主动依据每次网络数据的大小来动静自适应调整本人的容量。

而ByteBuffer动静自适应扩缩容机制依赖于AdaptiveRecvByteBufAllocator类的实现。让咱们先回到AdaptiveRecvByteBufAllocator类的创立终点开始说起~~

4.1 AdaptiveRecvByteBufAllocator的创立

在前文《Netty是如何高效接管网络连接》中咱们提到,当Main Reactor监听到OP_ACCPET事件沉闷后,会在NioServerSocketChannel中accept实现三次握手的客户端连贯。并创立NioSocketChannel,随同着NioSocketChannel的创立其对应的配置类NioSocketChannelConfig类也会随之创立。

    public NioSocketChannel(Channel parent, SocketChannel socket) {        super(parent, socket);        config = new NioSocketChannelConfig(this, socket.socket());    }

最终会在NioSocketChannelConfig的父类DefaultChannelConfig的结构器中创立AdaptiveRecvByteBufAllocator 。并保留在RecvByteBufAllocator rcvBufAllocator字段中。

public class DefaultChannelConfig implements ChannelConfig {    //用于Channel接收数据用的buffer分配器  AdaptiveRecvByteBufAllocator    private volatile RecvByteBufAllocator rcvBufAllocator;    public DefaultChannelConfig(Channel channel) {            this(channel, new AdaptiveRecvByteBufAllocator());    }}

new AdaptiveRecvByteBufAllocator()创立AdaptiveRecvByteBufAllocator类实例的时候会先触发AdaptiveRecvByteBufAllocator类的初始化。

咱们先来看下AdaptiveRecvByteBufAllocator类的初始化都做了些什么事件:

4.2 AdaptiveRecvByteBufAllocator类的初始化

public class AdaptiveRecvByteBufAllocator extends DefaultMaxMessagesRecvByteBufAllocator {    //扩容步长    private static final int INDEX_INCREMENT = 4;    //缩容步长    private static final int INDEX_DECREMENT = 1;    //RecvBuf调配容量表(扩缩容索引表)依照表中记录的容量大小进行扩缩容    private static final int[] SIZE_TABLE;   static {        //初始化RecvBuf容量调配表        List<Integer> sizeTable = new ArrayList<Integer>();        //当调配容量小于512时,扩容单位为16递增        for (int i = 16; i < 512; i += 16) {            sizeTable.add(i);        }        //当调配容量大于512时,扩容单位为一倍        for (int i = 512; i > 0; i <<= 1) {            sizeTable.add(i);        }        //初始化RecbBuf扩缩容索引表        SIZE_TABLE = new int[sizeTable.size()];        for (int i = 0; i < SIZE_TABLE.length; i ++) {            SIZE_TABLE[i] = sizeTable.get(i);        }    }}

AdaptiveRecvByteBufAllocator 次要的作用就是为接收数据的ByteBuffer进行扩容缩容,那么每次怎么扩容?扩容多少?怎么缩容?缩容多少呢??

这四个问题将是本大节笔者要为大家解答的内容~~~

Netty中定义了一个int型的数组SIZE_TABLE 来存储每个扩容单位对应的容量大小。建设起扩缩容的容量索引表。每次扩容多少,缩容多少全副记录在这个容量索引表中。

在AdaptiveRecvByteBufAllocatorl类初始化的时候会在static{}动态代码块中对扩缩容索引表SIZE_TABLE 进行初始化。

从源码中咱们能够看出SIZE_TABLE 的初始化分为两个局部:

  • 当索引容量小于512时,SIZE_TABLE 中定义的容量索引是从16开始16递增。

  • 当索引容量大于512时,SIZE_TABLE 中定义的容量索引是按前一个索引容量的2倍递增。

4.3 扩缩容逻辑

当初扩缩容索引表SIZE_TABLE 曾经初始化结束了,那么当咱们须要对ByteBuffer进行扩容或者缩容的时候如何依据SIZE_TABLE 决定扩容多少或者缩容多少呢??

这就用到了在AdaptiveRecvByteBufAllocator类中定义的扩容步长INDEX_INCREMENT = 4,缩容步长INDEX_DECREMENT = 1了。

咱们就以下面两副扩缩容容量索引表SIZE_TABLE 中的容量索引展现截图为例,来介绍下扩缩容逻辑,假如咱们以后ByteBuffer的容量索引为33,对应的容量为2048

4.3.1 扩容

当对容量为2048的ByteBuffer进行扩容时,依据以后的容量索引index = 33 加上 扩容步长INDEX_INCREMENT = 4计算出扩容后的容量索引为37,那么扩缩容索引表SIZE_TABLE下标37对应的容量就是本次ByteBuffer扩容后的容量SIZE_TABLE[37] = 32768

4.3.1 缩容

同理对容量为2048的ByteBuffer进行缩容时,咱们就须要用以后容量索引index = 33 减去 缩容步长INDEX_DECREMENT = 1计算出缩容后的容量索引32,那么扩缩容索引表SIZE_TABLE下标32对应的容量就是本次ByteBuffer缩容后的容量SIZE_TABLE[32] = 1024

4.4 扩缩容机会

public abstract class AbstractNioByteChannel extends AbstractNioChannel {        @Override        public final void read() {            .........省略......            try {                do {                      .........省略......                } while (allocHandle.continueReading());                //依据本次read loop总共读取的字节数,决定下次是否扩容或者缩容                allocHandle.readComplete();                .........省略.........            } catch (Throwable t) {                ...............省略...............            } finally {               ...............省略...............            }        }}

在每轮read loop完结之后,咱们都会调用allocHandle.readComplete()来依据在allocHandle中统计的在本轮read loop中读取字节总大小,来决定在下一轮read loop中是否对DirectByteBuffer进行扩容或者缩容。

public abstract class MaxMessageHandle implements ExtendedHandle {       @Override       public void readComplete() {                //是否对recvbuf进行扩容缩容                record(totalBytesRead());       }       private void record(int actualReadBytes) {            if (actualReadBytes <= SIZE_TABLE[max(0, index - INDEX_DECREMENT)]) {                if (decreaseNow) {                    index = max(index - INDEX_DECREMENT, minIndex);                    nextReceiveBufferSize = SIZE_TABLE[index];                    decreaseNow = false;                } else {                    decreaseNow = true;                }            } else if (actualReadBytes >= nextReceiveBufferSize) {                index = min(index + INDEX_INCREMENT, maxIndex);                nextReceiveBufferSize = SIZE_TABLE[index];                decreaseNow = false;            }        }        }

咱们以以后ByteBuffer容量为2048,容量索引index = 33为例,对allocHandle的扩容缩容规定进行阐明。

扩容步长INDEX_INCREMENT = 4,缩容步长INDEX_DECREMENT = 1

4.4.1 缩容

  • 如果本次OP_READ事件理论读取到的总字节数actualReadBytes在SIZE_TABLE[index - INDEX_DECREMENT]与SIZE_TABLE[index]之间的话,也就是如果本轮read loop完结之后总共读取的字节数在[1024,2048]之间。阐明此时调配的ByteBuffer容量正好,不须要进行缩容也不须要进行扩容。
    比方本次actualReadBytes = 2000,正好处在10242048之间。阐明2048的容量正好。
  • 如果actualReadBytes 小于等于 SIZE_TABLE[index - INDEX_DECREMENT],也就是如果本轮read loop完结之后总共读取的字节数小于等于1024。示意本次读取到的字节数比以后ByteBuffer容量的下一级容量还要小,阐明以后ByteBuffer的容量调配的有些大了,设置缩容标识decreaseNow = true。当下次OP_READ事件持续满足缩容条件的时候,开始真正的进行缩容。缩容后的容量为SIZE_TABLE[index - INDEX_DECREMENT],但不能小于SIZE_TABLE[minIndex]。
留神须要满足两次缩容条件才会进行缩容,且缩容步长为1,缩容比拟审慎

4.4.2 扩容

如果本次OP_READ事件解决总共读取的字节数actualReadBytes 大于等于 以后ByteBuffer容量(nextReceiveBufferSize)时,阐明ByteBuffer调配的容量有点小了,须要进行扩容。扩容后的容量为SIZE_TABLE[index + INDEX_INCREMENT],但不能超过SIZE_TABLE[maxIndex]。

满足一次扩容条件就进行扩容,并且扩容步长为4, 扩容比拟奔放

4.5 AdaptiveRecvByteBufAllocator类的实例化

AdaptiveRecvByteBufAllocator类的实例化次要是确定ByteBuffer的初始容量,以及最小容量和最大容量在扩缩容索引表SIZE_TABLE中的下标:minIndex maxIndex

AdaptiveRecvByteBufAllocator定义了三个对于ByteBuffer容量的字段:

  • DEFAULT_MINIMUM :示意ByteBuffer最小的容量,默认为64,也就是无论ByteBuffer在怎么缩容,容量也不会低于64
  • DEFAULT_INITIAL :示意ByteBuffer的初始化容量。默认为2048
  • DEFAULT_MAXIMUM :示意ByteBuffer的最大容量,默认为65536,也就是无论ByteBuffer在怎么扩容,容量也不会超过65536
public class AdaptiveRecvByteBufAllocator extends DefaultMaxMessagesRecvByteBufAllocator {    static final int DEFAULT_MINIMUM = 64;    static final int DEFAULT_INITIAL = 2048;    static final int DEFAULT_MAXIMUM = 65536;    public AdaptiveRecvByteBufAllocator() {        this(DEFAULT_MINIMUM, DEFAULT_INITIAL, DEFAULT_MAXIMUM);    }    public AdaptiveRecvByteBufAllocator(int minimum, int initial, int maximum) {                .................省略异样查看逻辑.............        //计算minIndex maxIndex        //在SIZE_TABLE中二分查找最小 >= minimum的容量索引 :3        int minIndex = getSizeTableIndex(minimum);        if (SIZE_TABLE[minIndex] < minimum) {            this.minIndex = minIndex + 1;        } else {            this.minIndex = minIndex;        }        //在SIZE_TABLE中二分查找最大 <= maximum的容量索引 :38        int maxIndex = getSizeTableIndex(maximum);        if (SIZE_TABLE[maxIndex] > maximum) {            this.maxIndex = maxIndex - 1;        } else {            this.maxIndex = maxIndex;        }        this.initial = initial;    }}

接下来的事件就是确定最小容量DEFAULT_MINIMUM 在SIZE_TABLE中的下标minIndex,以及最大容量DEFAULT_MAXIMUM 在SIZE_TABLE中的下标maxIndex

从AdaptiveRecvByteBufAllocator类初始化的过程中,咱们能够看出SIZE_TABLE中存储的数据特色是一个有序的汇合。

咱们能够通过二分查找在SIZE_TABLE中找出第一个容量大于等于DEFAULT_MINIMUM的容量索引minIndex

同理通过二分查找在SIZE_TABLE中找出最初一个容量小于等于DEFAULT_MAXIMUM的容量索引maxIndex

依据上一大节对于SIZE_TABLE中容量数据分布的截图,咱们能够看出minIndex = 3maxIndex = 38

4.5.1 二分查找容量索引下标

    private static int getSizeTableIndex(final int size) {        for (int low = 0, high = SIZE_TABLE.length - 1;;) {            if (high < low) {                return low;            }            if (high == low) {                return high;            }            int mid = low + high >>> 1;//无符号右移,高位始终补0            int a = SIZE_TABLE[mid];            int b = SIZE_TABLE[mid + 1];            if (size > b) {                low = mid + 1;            } else if (size < a) {                high = mid - 1;            } else if (size == a) {                return mid;            } else {                return mid + 1;            }        }    }
常常刷LeetCode的小伙伴必定一眼就看出这个是二分查找的模板了。

它的目标就是依据给定容量,在扩缩容索引表SIZE_TABLE中,通过二分查找找到最贴近给定size的容量的索引下标(第一个大于等于 size的容量)

4.6 RecvByteBufAllocator.Handle

前边咱们提到最终动静调整ByteBuffer容量的是由AdaptiveRecvByteBufAllocator中的Handler负责的,咱们来看下这个allocHandle的创立过程。

protected abstract class AbstractUnsafe implements Unsafe {        private RecvByteBufAllocator.Handle recvHandle;        @Override        public RecvByteBufAllocator.Handle recvBufAllocHandle() {            if (recvHandle == null) {                recvHandle = config().getRecvByteBufAllocator().newHandle();            }            return recvHandle;        }}

从allocHandle的获取过程咱们看到最allocHandle的创立是由AdaptiveRecvByteBufAllocator#newHandle办法执行的。

public class AdaptiveRecvByteBufAllocator extends DefaultMaxMessagesRecvByteBufAllocator {    @Override    public Handle newHandle() {        return new HandleImpl(minIndex, maxIndex, initial);    }    private final class HandleImpl extends MaxMessageHandle {        //最小容量在扩缩容索引表中的index        private final int minIndex;        //最大容量在扩缩容索引表中的index        private final int maxIndex;        //以后容量在扩缩容索引表中的index 初始33 对应容量2048        private int index;        //预计下一次调配buffer的容量,初始:2048        private int nextReceiveBufferSize;        //是否缩容        private boolean decreaseNow;        HandleImpl(int minIndex, int maxIndex, int initial) {            this.minIndex = minIndex;            this.maxIndex = maxIndex;            //在扩缩容索引表中二分查找到最小大于等于initial 的容量            index = getSizeTableIndex(initial);            //2048            nextReceiveBufferSize = SIZE_TABLE[index];        }        .......................省略...................    }}

这里咱们看到Netty中用于动静调整ByteBuffer容量的allocHandle 的理论类型为MaxMessageHandle

上面咱们来介绍下HandleImpl 中的外围字段,它们都和ByteBuffer的容量无关:

  • minIndex :最小容量在扩缩容索引表SIZE_TABE中的index。默认是3
  • maxIndex :最大容量在扩缩容索引表SIZE_TABE中的index。默认是38
  • index :以后容量在扩缩容索引表SIZE_TABE中的index。初始是33
  • nextReceiveBufferSize :预计下一次调配buffer的容量,初始为2048。在每次申请内存调配ByteBuffer的时候,采纳nextReceiveBufferSize的值指定容量。
  • decreaseNow : 是否须要进行缩容。

5. 应用堆外内存为ByteBuffer分配内存

AdaptiveRecvByteBufAllocator类只是负责动静调整ByteBuffer的容量,而具体为ByteBuffer申请内存空间的是由PooledByteBufAllocator负责。

5.1 类名前缀Pooled的来历

在咱们应用Java进行日常开发过程中,在为对象分配内存空间的时候咱们都会抉择在JVM堆中为对象分配内存,这样做对咱们Java开发者特地的敌对,咱们只管应用就好而不用过多关怀这块申请的内存如何回收,因为JVM堆齐全受Java虚拟机管制治理,Java虚构机会帮忙咱们回收不再应用的内存。

然而JVM在进行垃圾回收时候的stop the world会对咱们应用程序的性能造成肯定的影响。

除此之外咱们在《聊聊Netty那些事儿之从内核角度看IO模型》一文中介绍IO模型的时候提到,当数据达到网卡时,网卡会通过DMA的形式将数据拷贝到内核空间中,这是第一次拷贝。当用户线程在用户空间发动零碎IO调用时,CPU会将内核空间的数据再次拷贝到用户空间。这是第二次拷贝

于此不同的是当咱们在JVM中发动IO调用时,比方咱们应用JVM堆内存读取Socket接收缓冲区中的数据时,会多一次内存拷贝,CPU在第二次拷贝中将数据从内核空间拷贝到用户空间时,此时的用户空间站在JVM角度是堆外内存,所以还须要将堆外内存中的数据拷贝到堆内内存中。这就是第三次内存拷贝

同理当咱们在JVM中发动IO调用向Socket发送缓冲区写入数据时,JVM会将IO数据先拷贝堆外内存,而后能力发动零碎IO调用。

那为什么操作系统不间接应用JVM的堆内内存进行IO操作呢?

因为JVM的内存布局和操作系统调配的内存是不一样的,操作系统不可能依照JVM标准来读写数据,所以就须要第三次拷贝两头做个转换将堆外内存中的数据拷贝到JVM堆中。


所以基于上述内容,在应用JVM堆内内存时会产生以下两点性能影响:

  1. JVM在垃圾回收堆内内存时,会产生stop the world导致应用程序卡顿。
  2. 在进行IO操作的时候,会多产生一次由堆外内存到堆内内存的拷贝。

基于以上两点应用JVM堆内内存对性能造成的影响,于是对性能有卓越谋求的Netty采纳堆外内存也就是DirectBuffer来为ByteBuffer分配内存空间。

采纳堆外内存为ByteBuffer分配内存的益处就是:

  • 堆外内存间接受操作系统的治理,不会受JVM的治理,所以JVM垃圾回收对应用程序的性能影响就没有了。
  • 网络数据达到之后间接在堆外内存上接管,过程读取网络数据时间接在堆外内存中读取,所以就防止了第三次内存拷贝

所以Netty在进行 I/O 操作时都是应用的堆外内存,能够防止数据从 JVM 堆内存到堆外内存的拷贝。然而因为堆外内存不受JVM的治理,所以就须要额定关注对内存的应用和开释,稍有不慎就会造成内存泄露,于是Netty就引入了内存池堆外内存进行对立治理。

PooledByteBufAllocator类的这个前缀Pooled就是内存池的意思,这个类会应用Netty的内存池为ByteBuffer调配堆外内存

5.2 PooledByteBufAllocator的创立

创立机会

在服务端NioServerSocketChannel的配置类NioServerSocketChannelConfig以及客户端NioSocketChannel的配置类NioSocketChannelConfig实例化的时候会触发PooledByteBufAllocator的创立。

public class DefaultChannelConfig implements ChannelConfig {    //PooledByteBufAllocator    private volatile ByteBufAllocator allocator = ByteBufAllocator.DEFAULT;    ..........省略......}

创立进去的PooledByteBufAllocator实例保留在DefaultChannelConfig类中的ByteBufAllocator allocator字段中。

创立过程

public interface ByteBufAllocator {    ByteBufAllocator DEFAULT = ByteBufUtil.DEFAULT_ALLOCATOR;        ..................省略............}
public final class ByteBufUtil {    static final ByteBufAllocator DEFAULT_ALLOCATOR;    static {        String allocType = SystemPropertyUtil.get(                "io.netty.allocator.type", PlatformDependent.isAndroid() ? "unpooled" : "pooled");        allocType = allocType.toLowerCase(Locale.US).trim();        ByteBufAllocator alloc;        if ("unpooled".equals(allocType)) {            alloc = UnpooledByteBufAllocator.DEFAULT;            logger.debug("-Dio.netty.allocator.type: {}", allocType);        } else if ("pooled".equals(allocType)) {            alloc = PooledByteBufAllocator.DEFAULT;            logger.debug("-Dio.netty.allocator.type: {}", allocType);        } else {            alloc = PooledByteBufAllocator.DEFAULT;            logger.debug("-Dio.netty.allocator.type: pooled (unknown: {})", allocType);        }        DEFAULT_ALLOCATOR = alloc;                ...................省略..................    }}

从ByteBufUtil类的初始化过程咱们能够看出,在为ByteBuffer分配内存的时候是否应用内存池在Netty中是能够配置的。

  • 通过零碎变量-D io.netty.allocator.type 能够配置是否应用内存池为ByteBuffer分配内存。默认状况下是须要应用内存池的。然而在安卓零碎中默认是不应用内存池的。
  • 通过PooledByteBufAllocator.DEFAULT获取内存池ByteBuffer分配器
   public static final PooledByteBufAllocator DEFAULT =            new PooledByteBufAllocator(PlatformDependent.directBufferPreferred());
因为本文的主线是介绍Sub Reactor解决OP_READ事件的残缺过程,所以这里只介绍主线相干的内容,这里只是简略介绍下在接收数据的时候为什么会用PooledByteBufAllocator来为ByteBuffer分配内存。而内存池的架构设计比较复杂,所以笔者前面会独自写一篇对于Netty内存治理的文章。

总结

本文介绍了Sub Reactor线程在解决OP_READ事件的整个过程。并深刻分析了AdaptiveRecvByteBufAllocator类动静调整ByteBuffer容量的原理。

同时也介绍了Netty为什么会应用堆外内存来为ByteBuffer分配内存,并由此引出了Netty的内存池分配器PooledByteBufAllocator 。

在介绍AdaptiveRecvByteBufAllocator类和PooledByteBufAllocator一起组合实现动静地为ByteBuffer调配容量的时候,笔者不禁想起了多年前看过的《Effective Java》中第16条 复合优先于继承

Netty在这里也遵循了这条军规,首先两个类设计的都是繁多的性能。

  • AdaptiveRecvByteBufAllocator类只负责动静的调整ByteBuffer容量,并不论具体的内存调配。
  • PooledByteBufAllocator类负责具体的内存调配,用内存池的形式。

这样设计的就比拟灵便,具体内存调配的工作交给具体的ByteBufAllocator,能够应用内存池的调配形式PooledByteBufAllocator,也能够不应用内存池的调配形式UnpooledByteBufAllocator。具体的内存能够采纳JVM堆内内存(HeapBuffer),也能够应用堆外内存(DirectBuffer)。

AdaptiveRecvByteBufAllocator只须要关注调整它们的容量工作就能够了,而并不需要关注它们具体的内存调配形式。

最初通过io.netty.channel.RecvByteBufAllocator.Handle#allocate办法灵便组合不同的内存调配形式。这也是装璜模式的一种利用。

byteBuf = allocHandle.allocate(allocator);

好了,明天的内容就到这里,咱们下篇文章见~~~~