关于netty:Netty-如何高效接收网络数据一文聊透-ByteBuffer-动态自适应扩缩容机制

10次阅读

共计 26553 个字符,预计需要花费 67 分钟才能阅读完成。

本系列 Netty 源码解析文章基于 4.1.56.Final版本

前文回顾

在前边的系列文章中,咱们从内核如何收发网络数据开始以一个 C10K 的问题作为主线具体从内核角度论述了网络 IO 模型的演变,最终在此基础上引出了 Netty 的网络 IO 模型如下图所示:

具体内容可回看《从内核角度看 IO 模型的演变》

后续咱们又围绕着 Netty 的主从 Reactor 网络 IO 线程模型,在《Reactor 模型在 Netty 中的实现》一文中具体论述了 Netty 的主从 Reactor 模型的创立,以及介绍了 Reactor 模型的要害组件。搭建了 Netty 的外围骨架如下图所示:

在外围骨架搭建结束之后,咱们随后又在《具体图解 Reactor 启动全流程》一文中论述了 Reactor 启动的全流程,一个十分重要的外围组件 NioServerSocketChannel 开始在这里首次亮相,承当着一个网络框架最重要的工作 – 高效接管网络连接。咱们介绍了 NioServerSocketChannel 的创立,初始化,向 Main Reactor 注册并监听 OP_ACCEPT 事件的整个流程。在此基础上,Netty 得以整装待发,严阵以待开始迎接海量的客户端连贯。

随后紧接着咱们在《Netty 如何高效接管网络连接》一文中具体介绍了 Netty 高效接管客户端网络连接的全流程,在这里 Netty 的外围重要组件 NioServerSocketChannel 开始正是退场,在 NioServerSocketChannel 中咱们创立了客户端连贯 NioSocketChannel,并具体介绍了 NioSocketChannel 的初始化过程,随后通过在 NioServerSocketChannel 的 pipeline 中触发 ChannelRead 事件,并最终在 ServerBootstrapAcceptor 中将客户端连贯 NioSocketChannel 注册到 Sub Reactor 中开始监听客户端连贯上的 OP_READ 事件,筹备接管客户端发送的网络数据也就是本文的主题内容。

自此 Netty 的外围组件全副就绪并启动结束,开始腾飞~~~

之前文章中的配角是 Netty 中主 Reactor 组中的 Main Reactor 以及注册在 Main Reactor 上边的 NioServerSocketChannel,那么从本文开始,咱们文章中的配角就切换为 Sub Reactor 以及注册在 SubReactor 上的 NioSocketChannel 了。

上面就让咱们正式进入明天的主题,看一下 Netty 是如何解决 OP_READ 事件以及如何高效接管网络数据的。

1. Sub Reactor 解决 OP_READ 事件流程总览

客户端发动零碎 IO 调用向服务端发送数据之后,当网络数据达到服务端的网卡并通过内核协定栈的解决,最终数据达到 Socket 的接收缓冲区之后,Sub Reactor 轮询到 NioSocketChannel 上的 OP_READ 事件 就绪,随后 Sub Reactor 线程就会从 JDK Selector 上的阻塞轮询 APIselector.select(timeoutMillis)调用中返回。转而去解决 NioSocketChannel 上的OP_READ 事件

留神这里的 Reactor 为负责解决客户端连贯的 Sub Reactor。连贯的类型为 NioSocketChannel,解决的事件为 OP_READ 事件。

在之前的文章中笔者曾经屡次强调过了,Reactor 在解决 Channel 上的 IO 事件入口函数为NioEventLoop#processSelectedKey

public final class NioEventLoop extends SingleThreadEventLoop {private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {final AbstractNioChannel.NioUnsafe unsafe = ch.unsafe();
        .............. 省略.................

        try {int readyOps = k.readyOps();

            if ((readyOps & SelectionKey.OP_CONNECT) != 0) {.............. 解决 OP_CONNECT 事件.................}


            if ((readyOps & SelectionKey.OP_WRITE) != 0) {.............. 解决 OP_WRITE 事件.................}


            if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
                // 本文重点解决 OP_ACCEPT 事件
                unsafe.read();}
        } catch (CancelledKeyException ignored) {unsafe.close(unsafe.voidPromise());
        }
    }

}

这里须要重点强调的是,以后的执行线程当初曾经变成了 Sub Reactor,而 Sub Reactor 上注册的正是 netty 客户端 NioSocketChannel 负责解决连贯上的读写事件。

所以这里入口函数的参数 AbstractNioChannel ch 则是 IO 就绪的客户端连贯NioSocketChannel

结尾通过 ch.unsafe() 获取到的 NioUnsafe 操作类正是 NioSocketChannel 中对底层 JDK NIO SocketChannel 的 Unsafe 底层操作类。实现类型为 NioByteUnsafe 定义在下图继承构造中的 AbstractNioByteChannel 父类中。

上面咱们到 NioByteUnsafe#read 办法中来看下 Netty 对 OP_READ 事件 的具体处理过程:

2. Netty 接管网络数据流程总览

咱们间接依照老规矩,先从整体上把整个 OP_READ 事件的逻辑解决框架提取进去,让大家先总体仰视下流程全貌,而后在针对每个外围点位进行各个击破。

流程中相干置灰的步骤为 Netty 解决连贯敞开时的逻辑,和本文宗旨无关,咱们这里临时疏忽,等后续笔者介绍连贯敞开时,会独自开一篇文章具体为大家介绍。

从下面这张 Netty 接管网络数据总体流程图能够看出 NioSocketChannel 在接管网络数据的整个流程和咱们在上篇文章《Netty 如何高效接管网络连接》中介绍的 NioServerSocketChannel 在接管客户端连贯时的流程在总体框架上是一样的。

NioSocketChannel 在接管网络数据的过程解决中,也是通过在一个 do{....}while(...) 循环 read loop 中一直的循环读取连贯 NioSocketChannel 上的数据。

同样在 NioSocketChannel 读取连贯数据的 read loop 中也是受最大读取次数的限度。默认配置最多只能读取 16 次,超过 16 次无论此时 NioSocketChannel 中是否还有数据可读都不能在进行读取了。

这里 read loop 循环最大读取次数可在启动配置类 ServerBootstrap 中通过 ChannelOption.MAX_MESSAGES_PER_READ 选项设置,默认为 16。

ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workerGroup)
  .channel(NioServerSocketChannel.class)
  .option(ChannelOption.MAX_MESSAGES_PER_READ, 自定义次数)

Netty 这里为什么非得限度 read loop 的最大读取次数呢?为什么不在 read loop 中一次性把数据读取完呢?

这时候就是考验咱们大局观的时候了,在前边的文章介绍中咱们提到 Netty 的 IO 模型为主从 Reactor 线程组模型,在 Sub Reactor Group 中蕴含了多个 Sub Reactor 专门用于监听解决客户端连贯上的 IO 事件。

为了可能高效有序的解决全量客户端连贯上的读写事件,Netty 将服务端承载的全量客户端连贯摊派到多个 Sub Reactor 中解决,同时也能保障Channel 上 IO 解决的线程安全性

其中一个 Channel 只能调配给一个固定的 Reactor。一个 Reactor 负责解决多个 Channel 上的 IO 就绪事件,Reactor 与 Channel 之间的对应关系如下图所示:

而一个 Sub Reactor 上注册了多个 NioSocketChannel,Netty 不可能在一个 NioSocketChannel 上无限度的解决上来,要将读取数据的机会平均摊派给其余 NioSocketChannel,所以须要限定每个 NioSocketChannel 上的最大读取次数。

此外,Sub Reactor 除了须要监听解决所有注册在它上边的 NioSocketChannel 中的 IO 就绪事件之外,还须要腾出事件来解决有用户线程提交过去的异步工作。从这一点看,Netty 也不会始终停留在 NioSocketChannel 的 IO 解决上。所以限度 read loop 的最大读取次数是十分必要的。

对于 Reactor 的整体运行架构,对细节局部感兴趣的同学能够回看下笔者的《一文聊透 Netty 外围引擎 Reactor 的运行架构》这篇文章。

所以基于这个起因,咱们须要在 read loop 循环中,每当通过 doReadBytes 办法从 NioSocketChannel 中读取到数据时(办法返回值会大于 0,并记录在 allocHandle.lastBytesRead 中),都须要通过 allocHandle.incMessagesRead(1) 办法统计曾经读取的次数。当达到 16 次时不论 NioSocketChannel 是否还有数据可读,都须要在 read loop 开端退出循环。转去执行 Sub Reactor 上的异步工作。以及其余 NioSocketChannel 上的 IO 就绪事件。平均分配,雨露均沾!!

public abstract class MaxMessageHandle implements ExtendedHandle {

        //read loop 总共读取了多少次
        private int totalMessages;

       @Override
        public final void incMessagesRead(int amt) {totalMessages += amt;}

}

本次 read loop 读取到的数据大小会记录在 allocHandle.lastBytesRead

public abstract class MaxMessageHandle implements ExtendedHandle {

         // 本次 read loop 读取到的字节数
        private int lastBytesRead;
        // 整个 read loop 循环总共读取的字节数
        private int totalBytesRead;

        @Override
        public void lastBytesRead(int bytes) {
            lastBytesRead = bytes;
            if (bytes > 0) {totalBytesRead += bytes;}
        }
}
  • lastBytesRead < 0:示意客户端被动发动了连贯敞开流程,Netty 开始连贯敞开解决流程。这个和本文的宗旨无关,咱们先不必管。前面笔者会专门用一篇文章来详解敞开流程。
  • lastBytesRead = 0:示意以后 NioSocketChannel 上的数据曾经全副读取结束,没有数据可读了。本次 OP_READ 事件圆满处理完毕,能够开开心心的退出 read loop。
  • lastBytesRead > 0:示意在本次 read loop 中从 NioSocketChannel 中读取到了数据,会在 NioSocketChannel 的 pipeline 中触发 ChannelRead 事件。进而在 pipeline 中负责 IO 解决的 ChannelHandelr 中响应,解决网络申请。
public class EchoServerHandler extends ChannelInboundHandlerAdapter {

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) {....... 解决网络申请,比方解码, 反序列化等操作.......}
}

最初会在 read loop 循环的开端调用 allocHandle.continueReading() 判断是否完结本次 read loop 循环。这里的完结循环条件的判断会比咱们在介绍 NioServerSocketChannel 接管连贯时的判断条件简单很多,笔者会将这个判断条件的具体解析放在文章前面细节局部为大家解读,这里大家只须要把握总体外围流程,不须要关注太多细节。

总体上在 NioSocketChannel 中读取网络数据的 read loop 循环完结条件须要满足以下几点:

  • 以后 NioSocketChannel 中的数据曾经全副读取结束,则退出循环。
  • 本轮 read loop 如果没有读到任何数据,则退出循环。
  • read loop 的读取次数达到 16 次,退出循环。

当满足这里的 read loop 退出条件之后,Sub Reactor 线程就会退出循环,随后会调用 allocHandle.readComplete() 办法依据本轮 read loop 总共读取到的字节数 totalBytesRead 来决定是否对用于接管下一轮 OP_READ 事件数据的 ByteBuffer 进行扩容或者缩容。

最初在 NioSocketChannel 的 pipeline 中触发ChannelReadComplete 事件,告诉 ChannelHandler 本次 OP_READ 事件曾经处理完毕。


public class EchoServerHandler extends ChannelInboundHandlerAdapter {

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) {....... 解决网络申请,比方解码, 反序列化等操作.......}

    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) {
        ...... 本次 OP_READ 事件处理完毕.......
        ...... 决定是否向客户端响应处理结果......
    }
}

2.1 ChannelRead 与 ChannelReadComplete 事件的区别

有些小伙伴可能对 Netty 中的一些流传事件触发的机会,或者事件之间的区别了解的不是很分明,概念容易混同。在前面的文章中笔者也会从源码的角度登程给大家说分明 Netty 中定义的所有异步事件,以及这些事件之间的区别和分割和触发机会,流传机制。

这里咱们次要探讨本文主题中波及到的两个事件:ChannelRead 事件与 ChannelReadComplete 事件。

从上述介绍的 Netty 接管网络数据流程总览中咱们能够看出 ChannelRead 事件ChannelReadComplete 事件 是不一样的,然而对于刚接触 Netty 的小伙伴来说从命名上乍一看感觉又差不多。

上面咱们来看这两个事件之间的差异:

Netty 服务端对于一次 OP_READ 事件的解决,会在一个 do{}while() 循环 read loop 中分屡次从客户端 NioSocketChannel 中读取网络数据。每次读取咱们调配的 ByteBuffer 容量大小,初始容量为 2048。

  • ChanneRead 事件:一次循环读取一次数据,就触发一次ChannelRead 事件。本次最多读取在 read loop 循环开始调配的 DirectByteBuffer 容量大小。这个容量会动静调整,文章后续笔者会具体介绍。
  • ChannelReadComplete 事件 :当读取不到数据或者不满足continueReading 的任意一个条件就会退出 read loop,这时就会触发 ChannelReadComplete 事件。示意本次OP_READ 事件 处理完毕。

这里须要特地留神下 触发 ChannelReadComplete 事件 并不代表 NioSocketChannel 中的数据曾经读取完了,只能阐明本次 OP_READ 事件 处理完毕。因为有可能是客户端发送的数据太多,Netty 读了 16 次 还没读完,那就只能等到下次 OP_READ 事件 到来的时候在进行读取了。


以上内容就是 Netty 在接管客户端发送网络数据的全副外围逻辑。目前为止咱们还未波及到这部分的骨干外围源码,笔者想的是先给大家把外围逻辑解说分明之后,这样了解起来外围骨干源码会更加清晰透彻。

通过前边对网络数据接管的外围逻辑介绍,笔者在把这张流程图放进去,大家能够联合这张图在来回忆下骨干外围逻辑。

上面笔者会联合这张流程图,给大家把这部分的外围骨干源码框架展示进去,大家能够将咱们介绍过的外围逻辑与骨干源码做个一一对应,还是那句老话,咱们要从骨干框架层面把握整体解决流程,不须要读懂每一行代码,文章后续笔者会将这个过程中波及到的外围点位给大家拆开来各个击破!!

3. 源码外围框架总览

        @Override
        public final void read() {final ChannelConfig config = config();

            ............... 解决半敞开相干代码省略...............
            // 获取 NioSocketChannel 的 pipeline
            final ChannelPipeline pipeline = pipeline();
            //PooledByteBufAllocator 具体用于理论调配 ByteBuf 的分配器
            final ByteBufAllocator allocator = config.getAllocator();
            // 自适应 ByteBuf 分配器 AdaptiveRecvByteBufAllocator , 用于动静调节 ByteBuf 容量
            // 须要与具体的 ByteBuf 分配器配合应用 比方这里的 PooledByteBufAllocator
            final RecvByteBufAllocator.Handle allocHandle = recvBufAllocHandle();
            //allocHandler 用于统计每次读取数据的大小,不便下次调配适合大小的 ByteBuf
            // 重置革除上次的统计指标
            allocHandle.reset(config);

            ByteBuf byteBuf = null;
            boolean close = false;
            try {
                do {
                    // 利用 PooledByteBufAllocator 调配适合大小的 byteBuf 初始大小为 2048
                    byteBuf = allocHandle.allocate(allocator);
                    // 记录本次读取了多少字节数
                    allocHandle.lastBytesRead(doReadBytes(byteBuf));
                    // 如果本次没有读取到任何字节,则退出循环 进行下一轮事件轮询
                    if (allocHandle.lastBytesRead() <= 0) {
                        // nothing was read. release the buffer.
                        byteBuf.release();
                        byteBuf = null;
                        close = allocHandle.lastBytesRead() < 0;
                        if (close) {...... 示意客户端发动连贯敞开.....}
                        break;
                    }

                    //read loop 读取数据次数 +1
                    allocHandle.incMessagesRead(1);
                    // 客户端 NioSocketChannel 的 pipeline 中触发 ChannelRead 事件
                    pipeline.fireChannelRead(byteBuf);
                    // 解除本次读取数据调配的 ByteBuffer 援用,不便下一轮 read loop 调配
                    byteBuf = null;
                } while (allocHandle.continueReading());// 判断是否应该持续 read loop

                // 依据本次 read loop 总共读取的字节数,决定下次是否扩容或者缩容
                allocHandle.readComplete();
                // 在 NioSocketChannel 的 pipeline 中触发 ChannelReadComplete 事件,示意一次 read 事件处理完毕
                // 但这并不示意 客户端发送来的数据曾经全副读完,因为如果数据太多的话,这里只会读取 16 次,剩下的会等到下次 read 事件到来后在解决
                pipeline.fireChannelReadComplete();

                ......... 省略连贯敞开流程解决.........
            } catch (Throwable t) {............... 省略...............} finally {............... 省略...............}
        }
    }

这里再次强调下以后执行线程为 Sub Reactor 线程,解决连贯数据读取逻辑是在 NioSocketChannel 中。

首先通过 config() 获取客户端 NioSocketChannel 的 Channel 配置类 NioSocketChannelConfig。

通过 pipeline() 获取 NioSocketChannel 的 pipeline。咱们在《具体图解 Netty Reactor 启动全流程》一文中提到的 Netty 服务端模板所举的示例中,NioSocketChannelde pipeline 中只有一个 EchoChannelHandler。

3.1 调配 DirectByteBuffer 接管网络数据

Sub Reactor 在接管 NioSocketChannel 上的 IO 数据时,都会调配一个 ByteBuffer 用来寄存接管到的 IO 数据。

这里大家可能感觉比拟奇怪,为什么在 NioSocketChannel 接收数据这里会有两个 ByteBuffer 分配器呢?一个是 ByteBufAllocator,另一个是 RecvByteBufAllocator。

    final ByteBufAllocator allocator = config.getAllocator();
    final RecvByteBufAllocator.Handle allocHandle = recvBufAllocHandle();

这两个 ByteBuffer 又有什么区别和分割呢?

在上篇文章《抓到 Netty 一个 Bug,顺带来透彻地聊一下 Netty 是如何高效接管网络连接》中,笔者为了论述上篇文章中提到的 Netty 在接管网络连接时的 Bug 时,简略和大家介绍了下这个 RecvByteBufAllocator。

在上篇文章提到的 NioServerSocketChannelConfig 中,这里的 RecvByteBufAllocator 类型为 ServerChannelRecvByteBufAllocator。

还记得这个 ServerChannelRecvByteBufAllocator 类型在 4.1.69.final 版本引入是为了解决笔者在上篇文章中提到的那个 Bug 吗?在 4.1.69.final 版本之前,NioServerSocketChannelConfig 中的 RecvByteBufAllocator 类型为 AdaptiveRecvByteBufAllocator。

而在本文中 NioSocketChannelConfig 中的 RecvByteBufAllocator 类型为 AdaptiveRecvByteBufAllocator。

所以这里 recvBufAllocHandle() 取得到的 RecvByteBufAllocator 为 AdaptiveRecvByteBufAllocator。顾名思义,这个类型的 RecvByteBufAllocator 能够依据 NioSocketChannel 上每次到来的 IO 数据大小来自适应动静调整 ByteBuffer 的容量。

对于客户端 NioSocketChannel 来说,它里边蕴含的 IO 数据时客户端发送来的网络数据,长度是不定的,所以才会须要这样一个能够依据每次 IO 数据的大小来自适应动静调整容量的 ByteBuffer 来接管。

如果咱们把用于接收数据用的 ByteBuffer 看做一个桶的话,那么小数据用大桶装或者大数据用小桶装必定是不适合的,所以咱们须要依据接收数据的大小来动静调整桶的容量。而 AdaptiveRecvByteBufAllocator 的作用正是用来依据每次接收数据的容量大小来动静调整 ByteBuffer 的容量的。

当初 RecvByteBufAllocator 笔者为大家解释分明了,接下来咱们持续看 ByteBufAllocator。

大家这里须要留神的是 AdaptiveRecvByteBufAllocator 并不会真正的去调配 ByteBuffer,它只是负责动静调整调配 ByteBuffer 的大小。

而真正具体执行内存调配动作的是这里的 ByteBufAllocator 类型为 PooledByteBufAllocator。它会依据 AdaptiveRecvByteBufAllocator 动静调整进去的大小去真正的申请内存调配 ByteBuffer。

PooledByteBufAllocator 为 Netty 中的内存池,用来治理堆外内存 DirectByteBuffer。

AdaptiveRecvByteBufAllocator 中的 allocHandle 在上篇文章中咱们也介绍过了,它的理论类型为 MaxMessageHandle。

public class AdaptiveRecvByteBufAllocator extends DefaultMaxMessagesRecvByteBufAllocator {

    @Override
    public Handle newHandle() {return new HandleImpl(minIndex, maxIndex, initial);
    }
    
    private final class HandleImpl extends MaxMessageHandle {................. 省略................}
}

在 MaxMessageHandle 中蕴含了用于动静调整 ByteBuffer 容量的统计指标。

   public abstract class MaxMessageHandle implements ExtendedHandle {
        private ChannelConfig config;

        // 用于管制每次 read loop 里最大能够循环读取的次数,默认为 16 次
        // 可在启动配置类 ServerBootstrap 中通过 ChannelOption.MAX_MESSAGES_PER_READ 选项设置。private int maxMessagePerRead;

        // 用于统计 read loop 中总共接管的连贯个数,NioSocketChannel 中示意读取数据的次数
        // 每次 read loop 循环后会调用 allocHandle.incMessagesRead 减少记录接管到的连贯个数
        private int totalMessages;

        // 用于统计在 read loop 中总共接管到客户端连贯上的数据大小
        private int totalBytesRead;

        // 示意本次 read loop 尝试读取多少字节,byteBuffer 残余可写的字节数
        private int attemptedBytesRead;

        // 本次 read loop 读取到的字节数
        private int lastBytesRead;
        
        // 预计下一次调配 buffer 的容量,初始:2048
        private int nextReceiveBufferSize;
        ........... 省略.............
}

在每轮 read loop 开始之前,都会调用 allocHandle.reset(config) 重置清空上一轮 read loop 的统计指标。

        @Override
        public void reset(ChannelConfig config) {
            this.config = config;
            // 默认每次最多读取 16 次
            maxMessagePerRead = maxMessagesPerRead();
            totalMessages = totalBytesRead = 0;
        }

在每次开始从 NioSocketChannel 中读取数据之前,须要利用 PooledByteBufAllocator 在内存池中为 ByteBuffer 分配内存,默认初始化大小为 2048,这个容量由guess() 办法 决定。

        byteBuf = allocHandle.allocate(allocator);
        @Override
        public ByteBuf allocate(ByteBufAllocator alloc) {return alloc.ioBuffer(guess());
        }

        @Override
        public int guess() {
            // 预计下一次调配 buffer 的容量,一开始为 2048
            return nextReceiveBufferSize;
        }

在每次通过 doReadBytes 从 NioSocketChannel 中读取到数据后,都会调用 allocHandle.lastBytesRead(doReadBytes(byteBuf)) 记录本次读取了多少字节数据,并统计本轮 read loop 目前总共读取了多少字节。

        @Override
        public void lastBytesRead(int bytes) {
            lastBytesRead = bytes;
            if (bytes > 0) {totalBytesRead += bytes;}
        }

每次循环从 NioSocketChannel 中读取数据之后,都会调用allocHandle.incMessagesRead(1)。统计以后曾经读取了多少次。如果超过了最大读取限度此时 16 次,就须要退出 read loop。去解决其余 NioSocketChannel 上的 IO 事件。

        @Override
        public final void incMessagesRead(int amt) {totalMessages += amt;}

在每次 read loop 循环的开端都须要通过调用 allocHandle.continueReading() 来判断是否持续 read loop 循环读取 NioSocketChannel 中的数据。

        @Override
        public boolean continueReading() {return continueReading(defaultMaybeMoreSupplier);
        }

        private final UncheckedBooleanSupplier defaultMaybeMoreSupplier = new UncheckedBooleanSupplier() {
            @Override
            public boolean get() {
                // 判断本次读取 byteBuffer 是否满载而归
                return attemptedBytesRead == lastBytesRead;
            }
        };

        @Override
        public boolean continueReading(UncheckedBooleanSupplier maybeMoreDataSupplier) {return config.isAutoRead() &&
                   (!respectMaybeMoreData || maybeMoreDataSupplier.get()) &&
                   totalMessages < maxMessagePerRead &&
                   totalBytesRead > 0;
        }
  • attemptedBytesRead :示意以后 ByteBuffer 预计尝试要写入的字节数。
  • lastBytesRead :示意本次 read loop 实在读取到了多少个字节。

defaultMaybeMoreSupplier 用于判断通过本次 read loop 读取数据后,ByteBuffer 是否满载而归。如果是满载而归的话(attemptedBytesRead == lastBytesRead),表明可能 NioSocketChannel 里还有数据。如果不是满载而归,表明 NioSocketChannel 里没有数据了曾经。

是否持续进行 read loop 须要 同时 满足以下几个条件:

  • totalMessages < maxMessagePerRead 以后读取次数是否曾经超过 16 次,如果超过,就退出do(...)while() 循环。进行下一轮 OP_READ 事件 的轮询。因为每个 Sub Reactor 治理了多个 NioSocketChannel,不能在一个 NioSocketChannel 上占用太多工夫,要将机会平均地调配给 Sub Reactor 所治理的所有 NioSocketChannel。
  • totalBytesRead > 0 本次 OP_READ 事件 解决是否读取到了数据,如果曾经没有数据可读了,那么就间接退出 read loop。
  • !respectMaybeMoreData || maybeMoreDataSupplier.get() 这个条件比较复杂,它其实就是通过 respectMaybeMoreData 字段来管制 NioSocketChannel 中可能还有数据可读的状况下该如何解决。

    • maybeMoreDataSupplier.get():true 示意本次读取从 NioSocketChannel 中读取数据,ByteBuffer 满载而归。阐明可能 NioSocketChannel 中还有数据没读完。fasle 示意 ByteBuffer 还没有装满,阐明 NioSocketChannel 中曾经没有数据可读了。
    • respectMaybeMoreData = true示意要对可能还有更多数据进行解决的这种状况要 respect 认真对待, 如果本次循环读取到的数据曾经装满 ByteBuffer,示意前面可能还有数据,那么就要进行读取。如果ByteBuffer 还没装满示意曾经没有数据可读了那么就退出循环。
    • respectMaybeMoreData = false示意对可能还有更多数据的这种状况不认真对待 not respect。不论本次循环读取数据 ByteBuffer 是否满载而归,都要持续进行读取,直到读取不到数据在退出循环,属于无脑读取。

同时满足以上三个条件,那么 read loop 持续进行。持续从 NioSocketChannel 中读取数据,直到读取不到或者不满足三个条件中的任意一个为止。

3.2 从 NioSocketChannel 中读取数据

public class NioSocketChannel extends AbstractNioByteChannel implements io.netty.channel.socket.SocketChannel {

    @Override
    protected int doReadBytes(ByteBuf byteBuf) throws Exception {final RecvByteBufAllocator.Handle allocHandle = unsafe().recvBufAllocHandle();
        allocHandle.attemptedBytesRead(byteBuf.writableBytes());    
        return byteBuf.writeBytes(javaChannel(), allocHandle.attemptedBytesRead());
    }
}

这里会间接调用底层 JDK NIO 的 SocketChannel#read 办法将数据读取到 DirectByteBuffer 中。读取数据大小为本次调配的 DirectByteBuffer 容量,初始为 2048。

4. ByteBuffer 动静自适应扩缩容机制

因为咱们一开始并不知道客户端会发送多大的网络数据,所以这里先利用 PooledByteBufAllocator 调配一个初始容量为 2048 的 DirectByteBuffer 用于接收数据。

  byteBuf = allocHandle.allocate(allocator);

这就好比咱们须要拿着一个桶去排队装水,然而第一次去装的时候,咱们并不知道管理员会给咱们调配多少水,桶拿大了也不适合拿小了也不适合,于是咱们就先预估一个差不多容量大小的桶,如果调配的多了,咱们下次就拿更大一点的桶,如果调配少了,下次咱们就拿一个小点的桶。

在这种场景下,咱们须要 ByteBuffer 能够主动依据每次网络数据的大小来动静自适应调整本人的容量。

而 ByteBuffer 动静自适应扩缩容机制依赖于 AdaptiveRecvByteBufAllocator 类的实现。让咱们先回到 AdaptiveRecvByteBufAllocator 类的创立终点开始说起~~

4.1 AdaptiveRecvByteBufAllocator 的创立

在前文《Netty 是如何高效接管网络连接》中咱们提到,当 Main Reactor 监听到 OP_ACCPET 事件沉闷后,会在 NioServerSocketChannel 中 accept 实现三次握手的客户端连贯。并创立 NioSocketChannel,随同着 NioSocketChannel 的创立其对应的配置类 NioSocketChannelConfig 类也会随之创立。

    public NioSocketChannel(Channel parent, SocketChannel socket) {super(parent, socket);
        config = new NioSocketChannelConfig(this, socket.socket());
    }

最终会在 NioSocketChannelConfig 的父类 DefaultChannelConfig 的结构器中创立 AdaptiveRecvByteBufAllocator 。并保留在RecvByteBufAllocator rcvBufAllocator 字段中。

public class DefaultChannelConfig implements ChannelConfig {

    // 用于 Channel 接收数据用的 buffer 分配器  AdaptiveRecvByteBufAllocator
    private volatile RecvByteBufAllocator rcvBufAllocator;

    public DefaultChannelConfig(Channel channel) {this(channel, new AdaptiveRecvByteBufAllocator());
    }

}

new AdaptiveRecvByteBufAllocator() 创立 AdaptiveRecvByteBufAllocator 类实例的时候会先触发 AdaptiveRecvByteBufAllocator 类的初始化。

咱们先来看下 AdaptiveRecvByteBufAllocator 类的初始化都做了些什么事件:

4.2 AdaptiveRecvByteBufAllocator 类的初始化

public class AdaptiveRecvByteBufAllocator extends DefaultMaxMessagesRecvByteBufAllocator {

    // 扩容步长
    private static final int INDEX_INCREMENT = 4;
    // 缩容步长
    private static final int INDEX_DECREMENT = 1;

    //RecvBuf 调配容量表(扩缩容索引表)依照表中记录的容量大小进行扩缩容
    private static final int[] SIZE_TABLE;

   static {
        // 初始化 RecvBuf 容量调配表
        List<Integer> sizeTable = new ArrayList<Integer>();
        // 当调配容量小于 512 时,扩容单位为 16 递增
        for (int i = 16; i < 512; i += 16) {sizeTable.add(i);
        }

        // 当调配容量大于 512 时,扩容单位为一倍
        for (int i = 512; i > 0; i <<= 1) {sizeTable.add(i);
        }

        // 初始化 RecbBuf 扩缩容索引表
        SIZE_TABLE = new int[sizeTable.size()];
        for (int i = 0; i < SIZE_TABLE.length; i ++) {SIZE_TABLE[i] = sizeTable.get(i);
        }
    }
}

AdaptiveRecvByteBufAllocator 次要的作用就是为接收数据的 ByteBuffer 进行扩容缩容,那么每次怎么扩容?扩容多少?怎么缩容?缩容多少呢??

这四个问题将是本大节笔者要为大家解答的内容~~~

Netty 中定义了一个 int 型 的数组 SIZE_TABLE 来存储每个扩容单位对应的容量大小。建设起扩缩容的容量索引表。每次扩容多少,缩容多少全副记录在这个容量索引表中。

在 AdaptiveRecvByteBufAllocatorl 类初始化的时候会在 static{} 动态代码块中对扩缩容索引表 SIZE_TABLE 进行初始化。

从源码中咱们能够看出 SIZE_TABLE 的初始化分为两个局部:

  • 当索引容量小于 512 时,SIZE_TABLE 中定义的容量索引是从 16 开始16递增。
  • 当索引容量大于 512 时,SIZE_TABLE 中定义的容量索引是按前一个索引容量的 2 倍递增。

4.3 扩缩容逻辑

当初扩缩容索引表 SIZE_TABLE 曾经初始化结束了,那么当咱们须要对 ByteBuffer 进行扩容或者缩容的时候如何依据 SIZE_TABLE 决定扩容多少或者缩容多少呢??

这就用到了在 AdaptiveRecvByteBufAllocator 类中定义的扩容步长 INDEX_INCREMENT = 4,缩容步长INDEX_DECREMENT = 1 了。

咱们就以下面两副扩缩容容量索引表 SIZE_TABLE 中的容量索引展现截图为例,来介绍下扩缩容逻辑,假如咱们以后 ByteBuffer 的容量索引为33,对应的容量为2048

4.3.1 扩容

当对容量为 2048 的 ByteBuffer 进行扩容时,依据以后的容量索引 index = 33 加上 扩容步长INDEX_INCREMENT = 4 计算出扩容后的容量索引为 37,那么扩缩容索引表SIZE_TABLE 下标 37 对应的容量就是本次 ByteBuffer 扩容后的容量SIZE_TABLE[37] = 32768

4.3.1 缩容

同理对容量为 2048 的 ByteBuffer 进行缩容时,咱们就须要用以后容量索引 index = 33 减去 缩容步长INDEX_DECREMENT = 1 计算出缩容后的容量索引 32,那么扩缩容索引表SIZE_TABLE 下标 32 对应的容量就是本次 ByteBuffer 缩容后的容量SIZE_TABLE[32] = 1024

4.4 扩缩容机会

public abstract class AbstractNioByteChannel extends AbstractNioChannel {
        @Override
        public final void read() {
            ......... 省略......
            try {
                do {......... 省略......} while (allocHandle.continueReading());

                // 依据本次 read loop 总共读取的字节数,决定下次是否扩容或者缩容
                allocHandle.readComplete();

                ......... 省略.........

            } catch (Throwable t) {............... 省略...............} finally {............... 省略...............}
        }
}

在每轮 read loop 完结之后,咱们都会调用 allocHandle.readComplete() 来依据在 allocHandle 中统计的在本轮 read loop 中读取字节总大小,来决定在下一轮 read loop 中是否对 DirectByteBuffer 进行扩容或者缩容。

public abstract class MaxMessageHandle implements ExtendedHandle {

       @Override
       public void readComplete() {
                // 是否对 recvbuf 进行扩容缩容
                record(totalBytesRead());
       }

       private void record(int actualReadBytes) {if (actualReadBytes <= SIZE_TABLE[max(0, index - INDEX_DECREMENT)]) {if (decreaseNow) {index = max(index - INDEX_DECREMENT, minIndex);
                    nextReceiveBufferSize = SIZE_TABLE[index];
                    decreaseNow = false;
                } else {decreaseNow = true;}
            } else if (actualReadBytes >= nextReceiveBufferSize) {index = min(index + INDEX_INCREMENT, maxIndex);
                nextReceiveBufferSize = SIZE_TABLE[index];
                decreaseNow = false;
            }
        }        
}

咱们以以后 ByteBuffer 容量为 2048,容量索引index = 33 为例,对 allocHandle 的扩容缩容规定进行阐明。

扩容步长INDEX_INCREMENT = 4,缩容步长INDEX_DECREMENT = 1

4.4.1 缩容

  • 如果本次 OP_READ 事件 理论读取到的总字节数 actualReadBytes 在 SIZE_TABLE[index – INDEX_DECREMENT]与 SIZE_TABLE[index]之间的话,也就是如果本轮 read loop 完结之后总共读取的字节数在 [1024,2048] 之间。阐明此时调配的 ByteBuffer 容量正好,不须要进行缩容也不须要进行扩容。
    比方本次 actualReadBytes = 2000,正好处在10242048之间。阐明 2048 的容量正好。
  • 如果 actualReadBytes 小于等于 SIZE_TABLE[index – INDEX_DECREMENT],也就是如果本轮 read loop 完结之后总共读取的字节数小于等于1024。示意本次读取到的字节数比以后 ByteBuffer 容量的下一级容量还要小,阐明以后 ByteBuffer 的容量调配的有些大了,设置缩容标识decreaseNow = true。当下次OP_READ 事件 持续满足缩容条件的时候,开始真正的进行缩容。缩容后的容量为 SIZE_TABLE[index – INDEX_DECREMENT],但不能小于 SIZE_TABLE[minIndex]。

留神须要满足两次缩容条件才会进行缩容,且缩容步长为 1,缩容比拟审慎

4.4.2 扩容

如果本次 OP_READ 事件 解决总共读取的字节数 actualReadBytes 大于等于 以后 ByteBuffer 容量(nextReceiveBufferSize) 时,阐明 ByteBuffer 调配的容量有点小了,须要进行扩容。扩容后的容量为 SIZE_TABLE[index + INDEX_INCREMENT],但不能超过 SIZE_TABLE[maxIndex]。

满足一次扩容条件就进行扩容,并且扩容步长为 4,扩容比拟奔放

4.5 AdaptiveRecvByteBufAllocator 类的实例化

AdaptiveRecvByteBufAllocator 类的实例化次要是确定 ByteBuffer 的初始容量,以及最小容量和最大容量在扩缩容索引表 SIZE_TABLE 中的下标:minIndex maxIndex

AdaptiveRecvByteBufAllocator 定义了三个对于 ByteBuffer 容量的字段:

  • DEFAULT_MINIMUM:示意 ByteBuffer 最小的容量,默认为64,也就是无论 ByteBuffer 在怎么缩容,容量也不会低于64
  • DEFAULT_INITIAL :示意 ByteBuffer 的初始化容量。默认为2048
  • DEFAULT_MAXIMUM:示意 ByteBuffer 的最大容量,默认为65536,也就是无论 ByteBuffer 在怎么扩容,容量也不会超过65536
public class AdaptiveRecvByteBufAllocator extends DefaultMaxMessagesRecvByteBufAllocator {

    static final int DEFAULT_MINIMUM = 64;
    static final int DEFAULT_INITIAL = 2048;
    static final int DEFAULT_MAXIMUM = 65536;

    public AdaptiveRecvByteBufAllocator() {this(DEFAULT_MINIMUM, DEFAULT_INITIAL, DEFAULT_MAXIMUM);
    }

    public AdaptiveRecvByteBufAllocator(int minimum, int initial, int maximum) {
       
         ................. 省略异样查看逻辑.............

        // 计算 minIndex maxIndex
        // 在 SIZE_TABLE 中二分查找最小 >= minimum 的容量索引:3
        int minIndex = getSizeTableIndex(minimum);
        if (SIZE_TABLE[minIndex] < minimum) {this.minIndex = minIndex + 1;} else {this.minIndex = minIndex;}

        // 在 SIZE_TABLE 中二分查找最大 <= maximum 的容量索引:38
        int maxIndex = getSizeTableIndex(maximum);
        if (SIZE_TABLE[maxIndex] > maximum) {this.maxIndex = maxIndex - 1;} else {this.maxIndex = maxIndex;}

        this.initial = initial;
    }
}

接下来的事件就是确定最小容量 DEFAULT_MINIMUM 在 SIZE_TABLE 中的下标minIndex,以及最大容量 DEFAULT_MAXIMUM 在 SIZE_TABLE 中的下标maxIndex

从 AdaptiveRecvByteBufAllocator 类初始化的过程中,咱们能够看出 SIZE_TABLE 中存储的数据特色是一个有序的汇合。

咱们能够通过 二分查找 在 SIZE_TABLE 中找出 第一个 容量大于等于 DEFAULT_MINIMUM 的容量索引minIndex

同理通过 二分查找 在 SIZE_TABLE 中找出 最初一个 容量小于等于 DEFAULT_MAXIMUM 的容量索引maxIndex

依据上一大节对于 SIZE_TABLE 中容量数据分布的截图,咱们能够看出minIndex = 3maxIndex = 38

4.5.1 二分查找容量索引下标

    private static int getSizeTableIndex(final int size) {for (int low = 0, high = SIZE_TABLE.length - 1;;) {if (high < low) {return low;}
            if (high == low) {return high;}

            int mid = low + high >>> 1;// 无符号右移,高位始终补 0
            int a = SIZE_TABLE[mid];
            int b = SIZE_TABLE[mid + 1];
            if (size > b) {low = mid + 1;} else if (size < a) {high = mid - 1;} else if (size == a) {return mid;} else {return mid + 1;}
        }
    }

常常刷 LeetCode 的小伙伴必定一眼就看出这个是 二分查找的模板 了。

它的目标就是依据给定容量,在扩缩容索引表 SIZE_TABLE 中,通过 二分查找 找到 最贴近 给定 size 的容量的索引下标(第一个大于等于 size 的容量)

4.6 RecvByteBufAllocator.Handle

前边咱们提到最终动静调整 ByteBuffer 容量的是由 AdaptiveRecvByteBufAllocator 中的 Handler 负责的,咱们来看下这个 allocHandle 的创立过程。

protected abstract class AbstractUnsafe implements Unsafe {

        private RecvByteBufAllocator.Handle recvHandle;

        @Override
        public RecvByteBufAllocator.Handle recvBufAllocHandle() {if (recvHandle == null) {recvHandle = config().getRecvByteBufAllocator().newHandle();
            }
            return recvHandle;
        }

}

从 allocHandle 的获取过程咱们看到最 allocHandle 的创立是由 AdaptiveRecvByteBufAllocator#newHandle 办法执行的。

public class AdaptiveRecvByteBufAllocator extends DefaultMaxMessagesRecvByteBufAllocator {

    @Override
    public Handle newHandle() {return new HandleImpl(minIndex, maxIndex, initial);
    }

    private final class HandleImpl extends MaxMessageHandle {
        // 最小容量在扩缩容索引表中的 index
        private final int minIndex;
        // 最大容量在扩缩容索引表中的 index
        private final int maxIndex;
        // 以后容量在扩缩容索引表中的 index 初始 33 对应容量 2048
        private int index;
        // 预计下一次调配 buffer 的容量,初始:2048
        private int nextReceiveBufferSize;
        // 是否缩容
        private boolean decreaseNow;

        HandleImpl(int minIndex, int maxIndex, int initial) {
            this.minIndex = minIndex;
            this.maxIndex = maxIndex;

            // 在扩缩容索引表中二分查找到最小大于等于 initial 的容量
            index = getSizeTableIndex(initial);
            //2048
            nextReceiveBufferSize = SIZE_TABLE[index];
        }

        ....................... 省略...................
    }

}

这里咱们看到 Netty 中用于动静调整 ByteBuffer 容量的 allocHandle 的理论类型为MaxMessageHandle

上面咱们来介绍下 HandleImpl 中的外围字段,它们都和 ByteBuffer 的容量无关:

  • minIndex:最小容量在扩缩容索引表 SIZE_TABE 中的 index。默认是3
  • maxIndex:最大容量在扩缩容索引表 SIZE_TABE 中的 index。默认是38
  • index:以后容量在扩缩容索引表 SIZE_TABE 中的 index。初始是33
  • nextReceiveBufferSize:预计下一次调配 buffer 的容量,初始为 2048。在每次申请内存调配 ByteBuffer 的时候,采纳nextReceiveBufferSize 的值指定容量。
  • decreaseNow: 是否须要进行缩容。

5. 应用堆外内存为 ByteBuffer 分配内存

AdaptiveRecvByteBufAllocator 类只是负责动静调整 ByteBuffer 的容量,而具体为 ByteBuffer 申请内存空间的是由 PooledByteBufAllocator 负责。

5.1 类名前缀 Pooled 的来历

在咱们应用 Java 进行日常开发过程中,在为对象分配内存空间的时候咱们都会抉择在 JVM 堆中为对象分配内存,这样做对咱们 Java 开发者特地的敌对,咱们只管应用就好而不用过多关怀这块申请的内存如何回收,因为 JVM 堆齐全受 Java 虚拟机管制治理,Java 虚构机会帮忙咱们回收不再应用的内存。

然而 JVM 在进行垃圾回收时候的 stop the world 会对咱们应用程序的性能造成肯定的影响。

除此之外咱们在《聊聊 Netty 那些事儿之从内核角度看 IO 模型》一文中介绍 IO 模型的时候提到,当数据达到网卡时,网卡会通过 DMA 的形式将数据拷贝到内核空间中,这是 第一次拷贝 。当用户线程在用户空间发动零碎 IO 调用时,CPU 会将内核空间的数据再次拷贝到用户空间。这是 第二次拷贝

于此不同的是当咱们在 JVM 中发动 IO 调用时,比方咱们应用 JVM 堆内存读取 Socket 接收缓冲区 中的数据时,会多一次内存拷贝 ,CPU 在 第二次拷贝 中将数据从内核空间拷贝到用户空间时,此时的用户空间站在 JVM 角度是 堆外内存 ,所以还须要将堆外内存中的数据拷贝到 堆内内存 中。这就是 第三次内存拷贝

同理当咱们在 JVM 中发动 IO 调用向 Socket 发送缓冲区 写入数据时,JVM 会将 IO 数据先 拷贝 堆外内存,而后能力发动零碎 IO 调用。

那为什么操作系统不间接应用 JVM 的 堆内内存 进行 IO 操作 呢?

因为 JVM 的内存布局和操作系统调配的内存是不一样的,操作系统不可能依照 JVM 标准来读写数据,所以就须要 第三次拷贝 两头做个转换将堆外内存中的数据拷贝到 JVM 堆中。


所以基于上述内容,在应用 JVM 堆内内存时会产生以下两点性能影响:

  1. JVM 在垃圾回收堆内内存时,会产生 stop the world 导致应用程序卡顿。
  2. 在进行 IO 操作的时候,会多产生一次由堆外内存到堆内内存的拷贝。

基于以上两点应用 JVM 堆内内存 对性能造成的影响 ,于是对性能有卓越谋求的 Netty 采纳 堆外内存 也就是 DirectBuffer 来为 ByteBuffer 分配内存空间。

采纳堆外内存为 ByteBuffer 分配内存的益处就是:

  • 堆外内存间接受操作系统的治理,不会受 JVM 的治理,所以 JVM 垃圾回收对应用程序的性能影响就没有了。
  • 网络数据达到之后间接在 堆外内存 上接管,过程读取网络数据时间接在堆外内存中读取,所以就防止了 第三次内存拷贝

所以 Netty 在进行 I/O 操作时都是应用的堆外内存,能够防止数据从 JVM 堆内存到堆外内存的拷贝。然而因为堆外内存不受 JVM 的治理,所以就须要额定关注对内存的应用和开释,稍有不慎就会造成内存泄露,于是 Netty 就引入了 内存池 堆外内存 进行对立治理。

PooledByteBufAllocator 类的这个前缀 Pooled 就是 内存池 的意思,这个类会应用 Netty 的内存池为 ByteBuffer 调配 堆外内存

5.2 PooledByteBufAllocator 的创立

创立机会

在服务端 NioServerSocketChannel 的配置类 NioServerSocketChannelConfig 以及客户端 NioSocketChannel 的配置类 NioSocketChannelConfig实例化的时候会触发PooledByteBufAllocator 的创立。

public class DefaultChannelConfig implements ChannelConfig {
    //PooledByteBufAllocator
    private volatile ByteBufAllocator allocator = ByteBufAllocator.DEFAULT;

    .......... 省略......
}

创立进去的 PooledByteBufAllocator 实例保留在 DefaultChannelConfig 类 中的 ByteBufAllocator allocator 字段中。

创立过程

public interface ByteBufAllocator {

    ByteBufAllocator DEFAULT = ByteBufUtil.DEFAULT_ALLOCATOR;
    
    .................. 省略............
}
public final class ByteBufUtil {

    static final ByteBufAllocator DEFAULT_ALLOCATOR;

    static {
        String allocType = SystemPropertyUtil.get("io.netty.allocator.type", PlatformDependent.isAndroid() ? "unpooled" : "pooled");
        allocType = allocType.toLowerCase(Locale.US).trim();

        ByteBufAllocator alloc;
        if ("unpooled".equals(allocType)) {
            alloc = UnpooledByteBufAllocator.DEFAULT;
            logger.debug("-Dio.netty.allocator.type: {}", allocType);
        } else if ("pooled".equals(allocType)) {
            alloc = PooledByteBufAllocator.DEFAULT;
            logger.debug("-Dio.netty.allocator.type: {}", allocType);
        } else {
            alloc = PooledByteBufAllocator.DEFAULT;
            logger.debug("-Dio.netty.allocator.type: pooled (unknown: {})", allocType);
        }

        DEFAULT_ALLOCATOR = alloc;
        
        ................... 省略..................
    }
}

从 ByteBufUtil 类的初始化过程咱们能够看出,在为 ByteBuffer 分配内存的时候是否应用内存池在 Netty 中是能够配置的。

  • 通过零碎变量-D io.netty.allocator.type 能够配置是否应用内存池为 ByteBuffer 分配内存。默认状况下是须要应用内存池的。然而在安卓零碎中默认是不应用内存池的。
  • 通过 PooledByteBufAllocator.DEFAULT 获取 内存池 ByteBuffer 分配器
   public static final PooledByteBufAllocator DEFAULT =
            new PooledByteBufAllocator(PlatformDependent.directBufferPreferred());

因为本文的主线是介绍 Sub Reactor 解决 OP_READ 事件 的残缺过程,所以这里只介绍主线相干的内容,这里只是简略介绍下在接收数据的时候为什么会用 PooledByteBufAllocator 来为 ByteBuffer 分配内存。而内存池的架构设计比较复杂,所以笔者前面会独自写一篇对于 Netty 内存治理的文章。


总结

本文介绍了 Sub Reactor 线程在解决 OP_READ 事件的整个过程。并深刻分析了 AdaptiveRecvByteBufAllocator 类动静调整 ByteBuffer 容量的原理。

同时也介绍了 Netty 为什么会应用堆外内存来为 ByteBuffer 分配内存,并由此引出了 Netty 的内存池分配器 PooledByteBufAllocator。

在介绍 AdaptiveRecvByteBufAllocator 类和 PooledByteBufAllocator 一起组合实现动静地为 ByteBuffer 调配容量的时候,笔者不禁想起了多年前看过的《Effective Java》中第 16 条 复合优先于继承

Netty 在这里也遵循了这条军规,首先两个类设计的都是繁多的性能。

  • AdaptiveRecvByteBufAllocator 类只负责动静的调整 ByteBuffer 容量,并不论具体的内存调配。
  • PooledByteBufAllocator 类负责具体的内存调配,用内存池的形式。

这样设计的就比拟灵便,具体内存调配的工作交给具体的ByteBufAllocator, 能够应用内存池的调配形式PooledByteBufAllocator,也能够不应用内存池的调配形式UnpooledByteBufAllocator。具体的内存能够采纳 JVM 堆内内存(HeapBuffer),也能够应用堆外内存(DirectBuffer)。

AdaptiveRecvByteBufAllocator 只须要关注调整它们的容量工作就能够了,而并不需要关注它们具体的内存调配形式。

最初通过 io.netty.channel.RecvByteBufAllocator.Handle#allocate 办法灵便组合不同的内存调配形式。这也是 装璜模式 的一种利用。

byteBuf = allocHandle.allocate(allocator);

好了,明天的内容就到这里,咱们下篇文章见~~~~

正文完
 0