Netty-ByteBuf

57次阅读

共计 10823 个字符,预计需要花费 28 分钟才能阅读完成。

Java NIO 提供了 ByteBuffer 作为它的字节容器, 但是这个类使用起来过于复杂, 而且也有些繁琐.

Netty 的 ByteBuffer 的代替品是 ByteBuf.

ByteBuf 的 API

Netty 的数据处理 API 通过两个组件暴露

public abstract class ByteBuf implements ReferenceCounted, Comparable<ByteBuf>
public interface ByteBufHolder extends ReferenceCounted

下面是 ByteBuf API 的优点:

  • 它可以被用户自定义的缓冲区类扩展;
  • 通过内置的复合缓冲区类型实现了透明的零拷贝;
  • 容量可以按需增长;
  • 在读和写这两种模式之间雀环不需要调用 ByteBuffer 的 flip() 方法;
  • 读和写试用了不同的索引;
  • 支持方法的链式调用;
  • 支持引用计数;
  • 支持池化.

它是如何工作的

ByteBuf 维护了两个不同的索引: 一个用于读取, 一个用于写入. 当你从 ByteBuf 读取时, 它的 readerIndex 将会被递增到读取的字节数. 同样的, 当你写入 ByteBuf 时, 它的 writerIndex 也会被递增.

要了解这些索引两两之间的关系, 请考虑一下, 如果打算读取字节直到 readerIndex 达到和 writerIndex 同样的值时会发生什么.

在那时, 你将会到达 “ 可以读取的 ” 数据的末尾. 就如同试图读取超出数组末尾的数据一样, 会触发一个 IndexOutOfBoundsException.

名称以 read 或 write 开头的 ByteBuf 方法, 将会推进其对应的索引, 而名称以 set 或 get 开头的操作则不会. 后面的这些方法将在作为一个参数传入的一个相对索引上执行操作.

可以指定 ByteBuf 的最大容量. 默认的限制是 Interger.MAX_VALUE.

ByteBuf 的使用模式

堆缓冲区

最常用的 ByteBuf 模式是将数据存储在 JVM 的堆空间中. 这种模式被称为 支撑数组 , 它能在没有使用池化的情况下提供快速的分配和释放. 代码如下:

public static void main(String args[]) {ByteBuf heapBuf = Unpooled.copiedBuffer("heap space", CharsetUtil.UTF_8);
    if (heapBuf.hasArray()) { // 检查 ByteBuf 是否有一个支持数组
        byte[] array = heapBuf.array();
        int offset = heapBuf.arrayOffset() + heapBuf.readerIndex();// 计算第一个字节的偏移量
        int length = heapBuf.readableBytes();// 计算可读字节数
        System.out.println(Arrays.toString(array));
        System.out.println(offset);
        System.out.println(length);
    } else {System.out.println("No Heap Array");
    }
}

hasArray() 返回 false 时尝试访问支持数组会抛出 UnsupportedOperationException. 这个用法与 JDK 的 ByteBuffer 类似.

直接缓冲区

直接缓冲区是另外一种 ByteBuf 模式.

直接缓冲区的内容并不是驻留在 Java 的堆上, 利用 java.nio.ByteBuffer 是借助于 JVM 调用操作系统的底层通信函数, 直接操作直接缓冲区可以减少中间缓冲区的复制操作, 进而提供程序性能, 其目的是:

  • 通过免去中间交换的内存拷贝(从程序内存拷贝到系统内存), 提升 IO 处理速度; 直接缓冲区的内容可以驻留在垃圾回收扫描的堆区以外.
  • DirectBuffer 在 -XX:MaxDirectMemorySize=xxM 大小限制下, 使用 Heap 之外的内存, GC 对此”无能为力”, 也就意味着规避了在高负载下频繁的 GC 过程对应用线程的中断影响.

但是直接缓冲区的缺点是在内存空间的分配和释放上比堆缓冲区更复杂, 另外一个缺点是如果要将数据传递给遗留代码处理, 因为数据不是在堆上, 你可能不得不作出一个副本, 如下:

public static void main(String args[]) {ByteBuf directBuf = Unpooled.directBuffer(100);
    directBuf.writeBytes("direct buffer".getBytes());
    if (!directBuf.hasArray()) { // 检查 ByteBuf 是否由数组支撑. 如果不是, 则这是一个直接缓冲区
        int length = directBuf.readableBytes(); // 获取可读字节数
        byte[] array = new byte[length]; // 分配一个新的数组来保存具有该长度的字节数据
        directBuf.getBytes(directBuf.readerIndex(), array); // 将字节复制到该数组
        System.out.println(Arrays.toString(array));
        System.out.println(length);
    }
}

复合缓冲区

复合缓冲区为多个 ByteBuf 提供一个聚合视图, 可以根据需要添加或者删除 ByteBuf 实例.

Netty 提供了 ByteBuf 的子类 CompositeByteBuf 类来处理复合缓冲区, CompositeByteBuf 只是一个视图.

CompositeByteBuf 中的 ByteBuf 实例可能同时包含直接内存分配和非直接内存分配.

如果其中只有一个实例, 那么对 CompositeByteBuf 上的 hasArray() 方法的调用将返回该组件上的 hasArray()方法的值; 否则它将返回 false. 因为有可能包含直接缓冲区或堆缓冲区.

例如, 一条消息由 headerbody 两部分组成, 将 headerbody 组装成一条消息发送出去, 可能 body 相同, 只是 header 不同, 使用 CompositeByteBuf 就不用每次都重新分配一个新的缓冲区.

下图显示CompositeByteBuf 组成 headerbody:

下面代码显示了使用 JDK 的 ByteBuffer 的一个实现. 两个 ByteBuffer 的数组创建保存消息的组件, 第三个创建用于保存所有数据的副本.

// 使用数组保存消息的各个部分
ByteBuffer[] message = { header, body};

// 使用副本来合并这两个部分
ByteBuffer message2 = ByteBuffer.allocate(header.remaining() + body.remaining());
message2.put(header);
message2.put(body);
message2.flip();

这种做法显然是低效的; 分配和复制操作不是最优的方法, 操纵数组使代码显得很笨拙.

下面看使用 CompositeByteBuf 的改进版本.

/**
 * Netty 通过一个 ByteBuf 子类——CompositeByteBuf——实现了组合模式,它提供了一
 个将多个缓冲区表示为单个合并缓冲区的虚拟表示
 */
public class CompositeBuf {public static void main(String args[]){CompositeByteBuf messageBuf = Unpooled.compositeBuffer();
        ByteBuf headerBuf = Unpooled.copiedBuffer("head", CharsetUtil.UTF_8); // can be backing or direct
        ByteBuf bodyBuf = Unpooled.copiedBuffer("body", CharsetUtil.UTF_8); // can be backing or direct
        messageBuf.addComponents(headerBuf, bodyBuf);
        System.out.println("Remove Head Before------------------");
        printCompositeBuffer(messageBuf);
        for (ByteBuf buf : messageBuf) {System.out.println(buf.toString(CharsetUtil.UTF_8));
        }
        messageBuf.removeComponent(0); // remove the header
        System.out.println("Remove Head After------------------");
        printCompositeBuffer(messageBuf);
        for (ByteBuf buf : messageBuf) {System.out.println(buf.toString(CharsetUtil.UTF_8));
        }
    }

    public static void printCompositeBuffer(CompositeByteBuf compBuf){int length = compBuf.readableBytes();
        byte[] array = new byte[length];
        compBuf.getBytes(compBuf.readerIndex(), array);
        System.out.println (Arrays.toString(array));
        System.out.println (length);
    }
}

Netty 尝试使用 CompositeByteBuf 优化 socket I/O 操作, 消除原生 JDK 中可能存在的的性能低和内存消耗问题.

虽然这是在 Netty 的核心代码中进行的优化, 并且是不对外暴露的, 但是作为开发者还是应该意识到其影响.

ByteBuf 分配

从空间初始化方式上来分, ByteBuf 分为: 缓存方式分配和非缓存方式分配.

5.1 使用 ByteBufAllocator 接口分配

为了降低分配和释放内存的开销, Netty 通过 interface ByteBufAllocator 实现了 ByteBuf 的池化, 它可以用来分配我们所描述过的任意类型的 ByteBuf 实例 (基于堆缓冲区的, 基于直接缓冲区的, 基于复合缓冲区的).

// 返回一个基于堆或者直接内存存储的 ByteBuf
ByteBuf buffer();
ByteBuf buffer(int initialCapacity);
ByteBuf buffer(int initialCapacity, int maxCapacity);

// 返回一个基于堆内存的 ByteBuf
ByteBuf heapBuffer();
ByteBuf heapBuffer(int initialCapacity);
ByteBuf heapBuffer(int initialCapacity, int maxCapacity);

// 返回一个基于直接内存的 ByteBuf
ByteBuf directBuffer();
ByteBuf directBuffer(int initialCapacity);
ByteBuf directBuffer(int initialCapacity, int maxCapacity);

// 通过设置一个最大值, 基于堆内存或直接内存, 返回一个混合缓冲区.
CompositeByteBuf compositeBuffer(int maxNumComponents);
CompositeByteBuf compositeHeapBuffer();
CompositeByteBuf compositeHeapBuffer(int maxNumComponents);
CompositeByteBuf compositeDirectBuffer();
CompositeByteBuf compositeDirectBuffer(int maxNumComponents);

// 返回一个用于套接字 I/O 操作的 ByteBuf
ioBuffer()

ioBuffer() 重点
默认的, 当所运行的环境具有 sun.misc.Unsafe 支持时, 返回基于直接内存的 ByteBuf, 否则返回一个基于堆内存的 ByteBuf; 当指定使用 PreferHeapByteBufAllocator 时, 则只会返回基于堆内存的 ByteBuf.

可以通过 Channle(每个都可以有一个不同的 ByteBufAllocator 实例) 或者 ChannleHandlerContext 获取到 ByteBufAllocator.

Netty 提供了两种 ByteBufAllocator 的实现:

Channel channel = ...;
ByteBufAllocator byteBufAllocator = channel.alloc();

ChannelHandlerContext channelHandlerContext = ...;
ByteBufAllocator bufAllocator = channelHandlerContext.alloc();

前者池化了 ByteBuf 的实例以提高性能并最大限度地减少内存碎片(PooledByteBufAllocator 默认).
后者不池化 ByteBuf 实例, 并且在每次它被调用时都会返回一个新的实例(UnpooledByteBufAllocator).

使用 Unpooled 缓冲区分配

可能某些情况下, 你未能获取一个到 ByteBufAllocator 的引用. 对于这种情况, Netty 提供了一个简单的称为 Unpooled(封装了 UnpooledByteBufAllocator) 的工具类, 它提供了静态的辅助方法来创建未池化的 ByteBuf 实例. 提供的方法如下:

  • buffer: 返回一个未池化的基于堆内存存储的 ByteBuf.
  • directBuffer: 返回一个未池化的基于直接内存存储的 ByteBuf
  • wrappedBuffer: 返回一个包装了给定数据的 ByteBuf
  • copiedBuffer: 返回一个复制了给定数据的 ByteBuf
ByteBuf directBuf = Unpooled.directBuffer(100);
directBuf.writeBytes("direct buffer".getBytes());
System.out.println(directBuf.indexOf(0,directBuf.readableBytes(), (byte) 'u'));

Unpooled 类还使得 ByteBuf 同样可用于那些并不需要 Netty 的其他组件的非网络项目, 使得其能得益于高性能的可扩展的缓冲区 API.

随机访问索引

如同在普通的 Java 字节数组中一样, ByteBuf 的索引是从零开始的: 第一个字节的索引是 0, 最后一个字节的索引总是 capacity() - 1, 可以通过 index 进行访问:

public static void main(String args[]) {ByteBuf directBuf = Unpooled.directBuffer(100);
    directBuf.writeBytes("direct buffer".getBytes());
    System.out.println(directBuf.getByte(5));
}

这种方式并不会改变 readerIndexweiterIndex. 如果有需要, 也可以通过调用 readerIndex(index) 或者 writerIndex(index) 来手动移动这两者.

顺序访问索引

虽然 ByteBuf 同时具有读索引和写索引, 但是 JDK 的 ByteBuffer 却只有一个索引, 这也就是为什么必须调用 flip() 方法来在读模式和写模式之间进行切换的原因. ByteBuf 被两个索引划分成 3 个区域.

可丢弃的字节

可丢弃字节的分段包含了已经被读过的字节. 通过调用 discardReadBytes() 方法, 可以丢弃它们并回收空间. 这个分段的初始大小为 0, 存储在 readerIndex 中, 会随着 read 操作的执行而增加.

下图展示的缓冲区上调用 discardReadBytes() 方法后的结果. 可以看到, 可丢弃字节分段中的空间已经变为可写的了.

注意, 在调用 discardReadBytes() 之后, 对可写分段的内容并没有任何的保证.

频繁地调用 discardReadBytes() 方法可以确保可写分段的最大化, 但是这将极有可能会导致内存复制, 因为可读字节 (图中标记为 CONTENT 的部分) 必须被移动到缓冲区的开始位置. 我们建议只在有真正需要的时候才这样做, 例如, 当内存非常宝贵的时候.

可读字节

ByteBuf 的可读字节分段存储了实际数据. 新分配的、包装的或者复制的缓冲区默认 readerIndex 值为 0. 任何名称以 read 或者 skip 开头的操作都将检索或者跳过位于当前 readerIndex 的数据, 并且将它增加已读字节数.

如果尝试在缓冲区的可读字节数已经耗尽时从中读取数据, 那么将会引发一个 IndexOutOfBoundsException.

可写字节

可写字节分段是指一个拥有未定义内容的、写入就绪的内存区域. 新分配的缓冲区的 writerIndex 的默认值为 0. 任何名称以 write 开头的操作都将从当前的 writerIndex 处开始写数据, 并将它增加已经写入的字节数.

如果写操作的目标也是 ByteBuf, 并且没有指定源索引的值, 则源缓冲区的 readerIndex 也同样会被增加相同的大小.

如果尝试往目标写入超过目标容量的数据, 将会引发一个 IndexOutOfBoundException.

例子

        ByteBuf directBuf = Unpooled.directBuffer(100);
        directBuf.writeBytes("direct buffer".getBytes());
        System.out.println("可写字节容量:"+directBuf.writableBytes());
        System.out.println("初始化可读字节:"+directBuf.readableBytes());
        System.out.println("初始化可丢弃字节:"+directBuf.readerIndex()+"\n");
        directBuf.readBytes(2);
        System.out.println("读取两个字节"+"\n");
        System.out.println("读取后可写字节容量:"+directBuf.writableBytes());
        System.out.println("读取后可读字节:"+directBuf.readableBytes());
        System.out.println("读取后可丢弃字节:"+directBuf.readerIndex()+"\n");
        directBuf.discardReadBytes();
        System.out.println("执行 discardReadBytes 后可写字节容量:"+directBuf.writableBytes());
        System.out.println("执行 discardReadBytes 后可读字节:"+directBuf.readableBytes());
        System.out.println("执行 discardReadBytes 后可丢弃字节:"+directBuf.readerIndex());

输出为:

可写字节容量:87
初始化可读字节:13
初始化可丢弃字节:0

读取两个字节

读取后可写字节容量:87
读取后可读字节:11
读取后可丢弃字节:2

执行 discardReadBytes 后可写字节容量:89
执行 discardReadBytes 后可读字节:11
执行 discardReadBytes 后可丢弃字节:0

索引管理

索引管理的相关操作如下:

  • 可以通过调用 markReaderIndex()markWriterIndex()resetWriterIndex()resetReaderIndex() 来标记和重置 ByteBufreaderIndexwriterIndex.
  • 可以通过调用 readerIndex(int) 或者 writerIndex(int) 来将索引移动到指定位置. 试图将任何一个索引设置到一个无效的位置都将导致一个 IndexOutOfBoundsException.
  • 可以通过调用 clear() 方法来将 readerIndexwriterIndex 都设置为 0. 调用 clear() 比调用 discardReadBytes() 轻量得多, 因为它将只是重置索引而不会复制任何的内存.

示例如下:

        ByteBuf directBuf = Unpooled.directBuffer(100);
        directBuf.writeBytes("direct buffer".getBytes());
        System.out.println("初始化可读字节:"+directBuf.readableBytes());
        directBuf.markReaderIndex();
        System.out.println("执行 markReaderIndex"+"\n");// 标记读索引
        directBuf.readBytes(2);
        System.out.println("读取两个字节"+"\n");
        System.out.println("读取后可读字节:"+directBuf.readableBytes());
        directBuf.resetReaderIndex();// 恢复读索引
        System.out.println("执行 resetReaderIndex 后可读字节:"+directBuf.readableBytes());
        directBuf.clear();
        System.out.println("执行 clear 后可读字节:"+directBuf.readableBytes());
        directBuf.readBytes(2);// 可读字节变为 0,此时再读取会抛出 IndexOutOfBoundsException

输出如下:

初始化可读字节:13
执行 markReaderIndex

读取两个字节

读取后可读字节:11
执行 resetReaderIndex 后可读字节:13
执行 clear 后可读字节:0
Exception in thread "main" java.lang.IndexOutOfBoundsException: readerIndex(0) + length(2) exceeds writerIndex(0): UnpooledByteBufAllocator$InstrumentedUnpooledUnsafeNoCleanerDirectByteBuf(ridx: 0, widx: 0, cap: 100)
    at io.netty.buffer.AbstractByteBuf.checkReadableBytes0(AbstractByteBuf.java:1403)
    at io.netty.buffer.AbstractByteBuf.checkReadableBytes(AbstractByteBuf.java:1390)
    at io.netty.buffer.AbstractByteBuf.readBytes(AbstractByteBuf.java:843)
    at com.eric.bytebuf.OperationSample.indexManage(OperationSample.java:32)
    at com.eric.bytebuf.OperationSample.main(OperationSample.java:16)

派生缓冲区

派生缓冲区为 ByteBuf 提供了以专门的方式来呈现其内容的视图。这类视图是通过以下方法被创建的:

duplicate()
slice()
slice(int, int)
Unpooled.unmodifiableBuffer(…)
order(ByteOrder)
readSlice(int)

上述这些方法都将返回一个新的 ByteBuf 实例, 它具有自己的读索引、写索引和标记 索引.

但是其内部存储和原始对象是共享的. 该种方式创建成本很低廉, 但是这也意味着, 如果你修改了它的内容, 也同时修改了其对应的源实例, 所以要小心.

如果需要一个现有缓冲区的真实副本, 请使用 copy() 或者 copy(int, int) 方法. 不同于派生缓冲区, 由这个调用所返回的 ByteBuf 拥有独立的数据副本.

        Charset utf8 = Charset.forName("UTF-8");
        ByteBuf buf = Unpooled.copiedBuffer("Netty in Action rocks!", utf8);
        ByteBuf slice = buf.slice(0, 15);
        System.out.println(slice.toString(utf8));
        buf.setByte(0,'J');
        assert buf.getByte(0) == slice.getByte(0);

读 / 写操作

正如前文所提到过的, 有两种类别的读 / 写操作:

  • get()set() 操作, 从给定的索引开始,并且保持索引不变;
  • read()write() 操作, 从给定的索引开始, 并且会根据已经访问过的字节数对索引进行调整.

ByteBufHolder

我们经常发现, 除了实际的数据负载之外, 我们还需要存储各种属性值. HTTP 响应便是一个很好的例子, 除了表示为字节的内容, 还包括状态码、cookie 等.

为了处理这种常见的用例, Netty 提供了 ByteBufHolder. ByteBufHolder 也为 Netty 的高级特性提供了支持, 如缓冲区池化, 其中可以从池中借用 ByteBuf, 并且在需要时自动释放.

ByteBufHolder 只有几种用于访问底层数据和引用计数的方法:

  • content(): 返回由这个 ByteBufHolder 所持有的 ByteBuf.
  • copy(): 返回这个 ByteBufHolder 的一个深拷贝, 包括一个其所包含的 ByteBuf 的非共享拷贝.
  • duplicate(): 返回这个 ByteBufHolder 的一个浅拷贝, 包括一个其所包含的 ByteBuf 的共享拷贝.

系统默认自带了一系列的 ByteBufHolder, 以 MemoryFileUpload 为例, 该类通过封装将 filename, contentType, contentTransferEncoding 属性与对应的 file 进行关联.

正文完
 0