关于netty:Netty源码解析-零拷贝机制与ByteBuf

50次阅读

共计 5000 个字符,预计需要花费 13 分钟才能阅读完成。

本文来分享 Netty 中的零拷贝机制以及内存缓冲区 ByteBuf 的实现。
源码剖析基于 Netty 4.1.52

Netty 中的零拷贝

Netty 中零拷贝机制次要有以下几种
1. 文件传输类 DefaultFileRegion#transferTo,调用 FileChannel#transferTo,间接将文件缓冲区的数据发送到指标 Channel,缩小用户缓冲区的拷贝(通过 linux 的 sendfile 函数)。
应用 read 和 write 过程如下

应用 sendfile

能够看到,应用 sendfile 函数能够缩小数据拷贝以及用户态,内核态的切换

可参考: 操作系统和 Web 服务器那点事儿

2.Netty 中提供了一些操作内存缓冲区的办法,如
Unpooled#wrappedBuffer 办法,将 byte 数据,(jvm)ByteBuffer 转换为 ByteBuf
CompositeByteBuf#addComponents 办法,合并 ByteBuf
ByteBuf#slice 办法,提取 ByteBuf 中局部数据片段
ByteBuf#duplicate,复制一个内存缓冲区
这些办法都是基于对象援用的操作,并没有内存拷贝,而是内存共享

3. 应用堆外内存 (jvm)ByteBuffer 对 Socket 读写
如果应用 JVM 的堆内存读取 Socket 数据,JVM 会将 Socket 数据读取到间接内存,再拷贝一份到堆内存中,写入数据到 Socket 也须要将堆内存拷贝一份到间接内存中,而后才写入 Socket 中。
因为操作系统进行 io 操作须要一个稳固的间断空间的字节空间, 然而 java 堆上的字节空间会随着 gc 进行而进行挪动, 如果操作系统读取堆上的空间, 就会出错。
应用堆外内存能够防止该拷贝操作。
留神,这里从内核缓冲区拷贝到用户缓冲区的操作并不能省略,毕竟咱们须要对数据进行操作,所以还是要拷贝到用户态的。
可参考:
知乎 –Java NIO 中,对于 DirectBuffer,HeapBuffer 的疑难
知乎 –Java NIO direct buffer 的劣势在哪儿?

ByteBuf

ByteBuf 是用于与 Channel 交互的内存缓冲区,提供程序拜访和随机拜访。
Netty4 中将 ByteBuf 调整为抽象类,从而晋升吞吐量。

1.ByteBuffer
先理解一下 ByteBuffer,ByteBuffer 是 JVM 提供的字节内存缓冲区。ByteBuf 是在 ByteBuffer 上进行的扩大,底层还是应用 ByteBuffer。

ByteBuffer 有两个子类,DirectByteBuffer 和 HeapByteBuffer。
HeapByteBuffer 应用 ByteBuffer#hb(byte[])存储数据。
DirectByteBuffer 是堆外内存,应用的是操作系统的间接内存,它保护了一个援用 address 指向了底层数据,从而操作数据。(并没有应用 ByteBuffer#buff)

Buffer 外围属性

int position; // 以后操作地位。int mark;     // 为某一读过的地位做标记,便于某些时候回退到该地位。int capacity; // 初始化时候的容量。int limit;    // 读写的限度地位,读写超出该地位会报错

读写操作都是基于 position,并以 limit 为限度的。mark,position,limit,capacity 关系如下

0 <= mark <= position <= limit <= capacity

ByteBuffer 提供了如下办法调整这些标记地位:

  • clear

limit = position = 0
个别在把数据写入 Buffer 前调用

  • flip

limit = position
position = 0
个别在从 Buffer 读出数据前调用

  • rewind

position=0
limit 不变
个别在把数据重写入 Buffer 前调用。

  • compacting

革除曾经读过的数据。任何未读的数据都被移到缓冲区的起始处,新写入的数据将放到缓冲区未读数据的前面

ByteBuffer 还提供了一些操作缓冲区的办法

  • duplicate

创立新字节缓冲区,共享以后缓冲区内容

  • slice

创立新字节缓冲区,共享以后缓冲区内容子序列。

Netty 的 ByteBuf 应用 readerIndex 标记读地位,writerIndex 标记写地位,比(jvm)ByteBuffer 设计更优雅。

  +-------------------+------------------+------------------+
  | discardable bytes |  readable bytes  |  writable bytes  |
  |                   |     (CONTENT)    |                  |
  +-------------------+------------------+------------------+
  |                   |                  |                  |
  0      <=      readerIndex   <=   writerIndex    <=    capacity

ByteBuf 提供 readerIndex/writerIndex 等办法获取或设置这两个值,十分直观。另外,ByteBuf 提供了如下办法操作缓冲区

  • discardReadBytes

革除曾经读过的数据。未读的数据都被移到缓冲区的起始处,新写入的数据将放到缓冲区未读数据的前面

  • duplicate

创立新字节缓冲区,共享以后缓冲区内容

  • slice(int index, int length)

创立共享内存的 ByteBuf,从 index 开始,长度为 length

  • readSlice(int length)

创立共享内存的 ByteBuf,从 readerIndex 开始,长度为 length

  • retainedDuplicate()

创立共享内存的 ByteBuf,并且以后 ByteBuf 的援用计数加 1

2. 接口关系

AbstractByteBuf:实现一些公共逻辑,如读写前查看地位。
AbstractReferenceCountedByteBuf,增加援用计数逻辑,实现援用计数回收间接内存。
PooledByteBuf:实现池化 ByteBuf 的公共逻辑。对于 Netty 中的内存池前面有文章解析。
PooledByteBuf#memory 是底层的内存存储,PooledDirectByteBuf 该字段是 ByteBuffer,PooledHeapByteBuf 则是 byte[]。

上面能够分为 Unsafe,No_Unsafe 两个维度。Unsafe 就是 sun.misc.Unsafe。
应用 Unsafe 能够进步性能,但 Unsafe 是 JDK 外部的类,并非公开规范,不肯定所有 JDK 都存在这个类,JDK 当前也有可能去掉这个类,所以 Netty 提供了两套实现。

3. 内存调配
前面有文章解析 Netty 内存池,分享 Netty 中如何分配内存给 ByteBuf。这里先不深刻。

4. 读写过程
上面看一下 ByteBuf 与 Channel 如何交互数据。
后面分享 Netty 读写过程的文章说过了,NioByteUnsafe#read 办法读取数据。

NioByteUnsafe#read -> NioSocketChannel#doReadBytes -> AbstractByteBuf#writeBytes -> PooledByteBuf#setBytes

public final int setBytes(int index, ScatteringByteChannel in, int length) throws IOException {
    try {return in.read(internalNioBuffer(index, length));
    } catch (ClosedChannelException ignored) {return -1;}
}

index 参数就是 writerIndex,internalNioBuffer 办法会结构一个新的 ByteBuffer,并设置 ByteBuffer#position 为 index
间接调用 ReadableByteChannel#read 读取数据

在《ChannelOutboundBuffer 与 flush 操作》中曾经分享过,
ChannelOutboundBuffer#nioBuffers 也是通过 internalNioBuffer 办法生成 ByteBuffer,
作为参数调用 NioSocketChannel#doWrite 办法,间接将数据拷贝到 Channel。
ByteBuf#internalNioBuffer -> PooledByteBuf#_internalNioBuffer

  final ByteBuffer _internalNioBuffer(int index, int length, boolean duplicate) {index = idx(index);
      ByteBuffer buffer = duplicate ? newInternalNioBuffer(memory) : internalNioBuffer();
      buffer.limit(index + length).position(index);
      return buffer;
  }

newInternalNioBuffer 由子类实现,构建对应的 DirectByteBuffer 或者 HeapByteBuffer,留神,这里的内存是共享的。

5. 援用计数
因为应用了间接内存,不能依赖 JVM 垃圾回收器开释内存,Netty 应用援用计数算法开释内存。

ReferenceCounted 接口,代表须要显式开释的援用计数对象,retain 办法减少援用计数,release 办法缩小援用计数。

AbstractReferenceCountedByteBuf 实现了 ReferenceCounted 接口,它保护了 refCnt 变量作为援用计数。
结构一个 AbstractReferenceCountedByteBuf 时,refCnt 为 1。
当援用计数 release 到 0 时,调用 deallocate()办法开释内存。

PooledByteBuf#deallocate

protected final void deallocate() {if (handle >= 0) {
        final long handle = this.handle;
        this.handle = -1;
        memory = null;
        tmpNioBuf = null;
        chunk.arena.free(chunk, handle, maxLength, cache);
        chunk = null;
        recycle();}
}

这里调用的是 PoolArena#free。
PoolArena 能够了解为一个内存池,这里 free 理论是将内寄存回内存池中,由内存池决定是否须要销毁底层间接内存。
PoolArena 前面有对应文章解析。

6. 内存销毁
销毁 DirectByteBuf,有两个形式
利用反射获取 Unsafe,调用 Unsafe#freeMemory
利用反射获取 DirectByteBuffer#cleaner(sun.misc.Cleaner),通过反射调用 cleaner#clean 办法
因为 Netty 不确认 JDK 中是否存在 sun.misc.Cleaner,所以它也实现了两套机制。

PoolArenaDirect#free -> Arena#destroyChunk

protected void destroyChunk(PoolChunk<ByteBuffer> chunk) {if (PlatformDependent.useDirectBufferNoCleaner()) {PlatformDependent.freeDirectNoCleaner(chunk.memory);
    } else {PlatformDependent.freeDirectBuffer(chunk.memory);
    }
}

从 PlatformDependent 中确认是否应用 CLEANER

if (maxDirectMemory == 0 || !hasUnsafe() || !PlatformDependent0.hasDirectBufferNoCleanerConstructor()) {
    USE_DIRECT_BUFFER_NO_CLEANER = false;
    DIRECT_MEMORY_COUNTER = null;
}

满足以下条件中一个就应用 CLEANER,否则应用 NO_CLEANER

  1. 没有应用间接内存
  2. JVM 不反对 Unsafe
  3. ByteBuffer 不存在无 Cleaner 的构造函数

正文完
 0