Netty源码解析1-Buffer

5次阅读

共计 6391 个字符,预计需要花费 16 分钟才能阅读完成。

原文:GitHub 原文: https://github.com/wangzhiwub… 更多文章关注:多线程 / 集合 / 分布式 /Netty/NIO/RPC

Java 高级特性增强 - 集合
Java 高级特性增强 - 多线程
Java 高级特性增强 -Synchronized
Java 高级特性增强 -volatile
Java 高级特性增强 - 并发集合框架
Java 高级特性增强 - 分布式
Java 高级特性增强 -Zookeeper
Java 高级特性增强 -JVM
Java 高级特性增强 -NIO
RPC
zookeeper
JVM
NIO
其他更多

上一篇文章我们概要介绍了 Netty 的原理及结构,下面几篇文章我们开始对 Netty 的各个模块进行比较详细的分析。Netty 的结构最底层是 buffer 机制,这部分也相对独立,我们就先从 buffer 讲起。
What:buffer 简介
buffer 中文名又叫缓冲区,按照维基百科的解释,是 ” 在数据传输时,在内存里开辟的一块临时保存数据的区域 ”。它其实是一种化同步为异步的机制,可以解决数据传输的速率不对等以及不稳定的问题。
根据这个定义,我们可以知道涉及 I /O(特别是 I / O 写)的地方,基本会有 Buffer 了。就 Java 来说,我们非常熟悉的 Old I/O–InputStream&OutputStream 系列 API,基本都是在内部使用到了 buffer。Java 课程老师就教过,必须调用 OutputStream.flush(),才能保证数据写入生效!
而 NIO 中则直接将 buffer 这个概念封装成了对象,其中最常用的大概是 ByteBuffer 了。于是使用方式变为了:将数据写入 Buffer,flip()一下,然后将数据读出来。于是,buffer 的概念更加深入人心了!
Netty 中的 buffer 也不例外。不同的是,Netty 的 buffer 专为网络通讯而生,所以它又叫 ChannelBuffer(好吧其实没有什么因果关系…)。我们下面就来讲讲 Netty 中得 buffer。当然,关于 Netty,我们必须讲讲它的所谓 ”Zero-Copy-Capable” 机制。
TCP/IP 协议与 buffer
TCP/IP 协议是目前的主流网络协议。它是一个多层协议,最下层是物理层,最上层是应用层(HTTP 协议等),而做 Java 应用开发,一般只接触 TCP 以上,即传输层和应用层的内容。这也是 Netty 的主要应用场景。
TCP 报文有个比较大的特点,就是它传输的时候,会先把应用层的数据项拆开成字节,然后按照自己的传输需要,选择合适数量的字节进行传输。什么叫 ” 自己的传输需要 ”?首先 TCP 包有最大长度限制,那么太大的数据项肯定是要拆开的。其次因为 TCP 以及下层协议会附加一些协议头信息,如果数据项太小,那么可能报文大部分都是没有价值的头信息,这样传输是很不划算的。因此有了收集一定数量的小数据,并打包传输的 Nagle 算法 (这个东东在 HTTP 协议里会很讨厌,Netty 里可以用 setOption(“tcpNoDelay”, true) 关掉它)。
这么说可能太学院派了一点,我们举个例子吧:
发送时,我们这样分 3 次写入(‘|’ 表示两个 buffer 的分隔):
+—–+—–+—–+
| ABC | DEF | GHI |
+—–+—–+—–+

接收时,可能变成了这样:
+—-+——-+—+—+
| AB | CDEFG | H | I |
+—-+——-+—+—+

很好懂吧?可是,说了这么多,跟 buffer 有个什么关系呢?别急,我们来看下面一部分。
Buffer 中的分层思想
我们先回到之前的 messageReceived 方法:
public void messageReceived(
ChannelHandlerContext ctx, MessageEvent e) {
// Send back the received message to the remote peer.
transferredBytes.addAndGet(((ChannelBuffer) e.getMessage()).readableBytes());
e.getChannel().write(e.getMessage());
}
这里 MessageEvent.getMessage()默认的返回值是一个 ChannelBuffer。我们知道,业务中需要的 ”Message”,其实是一条应用层级别的完整消息,而一般的 buffer 工作在传输层,与 ”Message” 是不能对应上的。那么这个 ChannelBuffer 是什么呢?
来一个官方给的图,我想这个答案就很明显了:

这里可以看到,TCP 层 HTTP 报文被分成了两个 ChannelBuffer,这两个 Buffer 对我们上层的逻辑 (HTTP 处理) 是没有意义的。但是两个 ChannelBuffer 被组合起来,就成为了一个有意义的 HTTP 报文,这个报文对应的 ChannelBuffer,才是能称之为 ”Message” 的东西。这里用到了一个词 ”Virtual Buffer”,也就是所谓的 ”Zero-Copy-Capable Byte Buffer” 了。顿时觉得豁然开朗了有没有!
我这里总结一下,如果说 NIO 的 Buffer 和 Netty 的 ChannelBuffer 最大的区别的话,就是前者仅仅是传输上的 Buffer,而后者其实是传输 Buffer 和抽象后的逻辑 Buffer 的结合。延伸开来说,NIO 仅仅是一个网络传输框架,而 Netty 是一个网络应用框架,包括网络以及应用的分层结构。
当然,在 Netty 里,默认使用 ChannelBuffer 表示 ”Message”,不失为一个比较实用的方法,但是 MessageEvent.getMessage()是可以存放一个 POJO 的,这样子抽象程度又高了一些,这个我们在以后讲到 ChannelPipeline 的时候会说到。
Netty 中的 ChannelBuffer 及实现
好了,终于来到了代码实现部分。之所以啰嗦了这么多,因为我觉得,关于 ”Zero-Copy-Capable Rich Byte Buffer”,理解为什么需要它,比理解它是怎么实现的,可能要更重要一点。
我想可能很多朋友跟我一样,喜欢 ” 顺藤摸瓜 ” 式读代码 – 找到一个入口,然后顺着查看它的调用,直到理解清楚。很幸运,ChannelBuffers(注意有 s!)就是这样一根 ” 藤 ”,它是所有 ChannelBuffer 实现类的入口,它提供了很多静态的工具方法来创建不同的 Buffer,靠“顺藤摸瓜”式读代码方式,大致能把各种 ChannelBuffer 的实现类摸个遍。先列一下 ChannelBuffer 相关类图。

此外还有 WrappedChannelBuffer 系列也是继承自 AbstractChannelBuffer,图放到了后面。
ChannelBuffer 中的 readerIndex 和 writerIndex
开始以为 Netty 的 ChannelBuffer 是对 NIO ByteBuffer 的一个封装,其实不是的,它是把 ByteBuffer 重新实现了一遍。
以最常用的 HeapChannelBuffer 为例,其底层也是一个 byte[],与 ByteBuffer 不同的是,它是可以同时进行读和写的,而不需要使用 flip()进行读写切换。ChannelBuffer 读写的核心代码在 AbstactChannelBuffer 里,这里通过 readerIndex 和 writerIndex 两个整数,分别指向当前读的位置和当前写的位置,并且,readerIndex 总是小于 writerIndex 的。贴两段代码,让大家能看的更明白一点:
public void writeByte(int value) {
setByte(writerIndex ++, value);
}

public byte readByte() {
if (readerIndex == writerIndex) {
throw new IndexOutOfBoundsException(“Readable byte limit exceeded: ”
+ readerIndex);
}
return getByte(readerIndex ++);
}

public int writableBytes() {
return capacity() – writerIndex;
}

public int readableBytes() {
return writerIndex – readerIndex;
}
我倒是觉得这样的方式非常自然,比单指针与 flip()要更加好理解一些。AbstactChannelBuffer 还有两个相应的 mark 指针 markedReaderIndex 和 markedWriterIndex,跟 NIO 的原理是一样的,这里不再赘述了。
字节序 Endianness 与 HeapChannelBuffer
在创建 Buffer 时,我们注意到了这样一个方法:public static ChannelBuffer buffer(ByteOrder endianness, int capacity);,其中 ByteOrder 是什么意思呢?
这里有个很基础的概念:字节序 (ByteOrder/Endianness)。它规定了多余一个字节的数字(int 啊 long 什么的),如何在内存中表示。BIG_ENDIAN(大端序) 表示高位在前,整型数 12 会被存储为 0 0 0 12 四字节,而 LITTLE_ENDIAN 则正好相反。可能搞 C /C++ 的程序员对这个会比较熟悉,而 Javaer 则比较陌生一点,因为 Java 已经把内存给管理好了。但是在网络编程方面,根据协议的不同,不同的字节序也可能会被用到。目前大部分协议还是采用大端序,可参考 RFC1700。
了解了这些知识,我们也很容易就知道为什么会有 BigEndianHeapChannelBuffer 和 LittleEndianHeapChannelBuffer 了!
DynamicChannelBuffer
DynamicChannelBuffer 是一个很方便的 Buffer,之所以叫 Dynamic 是因为它的长度会根据内容的长度来扩充,你可以像使用 ArrayList 一样,无须关心其容量。实现自动扩容的核心在于 ensureWritableBytes 方法,算法很简单:在写入前做容量检查,容量不够时,新建一个容量 x2 的 buffer,跟 ArrayList 的扩容是相同的。贴一段代码吧(为了代码易懂,这里我删掉了一些边界检查,只保留主逻辑):
public void writeByte(int value) {
ensureWritableBytes(1);
super.writeByte(value);
}

public void ensureWritableBytes(int minWritableBytes) {
if (minWritableBytes <= writableBytes()) {
return;
}

int newCapacity = capacity();
int minNewCapacity = writerIndex() + minWritableBytes;
while (newCapacity < minNewCapacity) {
newCapacity <<= 1;
}

ChannelBuffer newBuffer = factory().getBuffer(order(), newCapacity);
newBuffer.writeBytes(buffer, 0, writerIndex());
buffer = newBuffer;
}
CompositeChannelBuffer
CompositeChannelBuffer 是由多个 ChannelBuffer 组合而成的,可以看做一个整体进行读写。这里有一个技巧:CompositeChannelBuffer 并不会开辟新的内存并直接复制所有 ChannelBuffer 内容,而是直接保存了所有 ChannelBuffer 的引用,并在子 ChannelBuffer 里进行读写,从而实现了 ”Zero-Copy-Capable” 了。来段简略版的代码吧:
public class CompositeChannelBuffer{

//components 保存所有内部 ChannelBuffer
private ChannelBuffer[] components;
//indices 记录在整个 CompositeChannelBuffer 中,每个 components 的起始位置
private int[] indices;
// 缓存上一次读写的 componentId
private int lastAccessedComponentId;

public byte getByte(int index) {
// 通过 indices 中记录的位置索引到对应第几个子 Buffer
int componentId = componentId(index);
return components[componentId].getByte(index – indices[componentId]);
}

public void setByte(int index, int value) {
int componentId = componentId(index);
components[componentId].setByte(index – indices[componentId], value);
}

}
查找 componentId 的算法再次不作介绍了,大家自己实现起来也不会太难。值得一提的是,基于 ChannelBuffer 连续读写的特性,使用了顺序查找(而不是二分查找),并且用 lastAccessedComponentId 来进行缓存。
ByteBufferBackedChannelBuffer
前面说 ChannelBuffer 是自己的实现的,其实只说对了一半。ByteBufferBackedChannelBuffer 就是封装了 NIO ByteBuffer 的类,用于实现堆外内存的 Buffer(使用 NIO 的 DirectByteBuffer)。当然,其实它也可以放其他的 ByteBuffer 的实现类。代码实现就不说了,也没啥可说的。
WrappedChannelBuffer

WrappedChannelBuffer 都是几个对已有 ChannelBuffer 进行包装,完成特定功能的类。代码不贴了,实现都比较简单,列一下功能吧。

可以看到,关于实现方面,Netty 3.7 的 buffer 相关内容还是比较简单的,也没有太多费脑细胞的地方。
而 Netty 4.0 之后就不同了。4.0,ChannelBuffer 改名 ByteBuf,成了单独项目 buffer,并且为了性能优化,加入了 BufferPool 之类的机制,已经变得比较复杂了(本质倒没怎么变)。性能优化是个很复杂的事情,研究源码时,建议先避开这些东西,除非你对算法情有独钟。举个例子,Netty4.0 里为了优化,将 Map 换成了 Java 8 里 6000 行的 ConcurrentHashMapV8,你们感受一下…
参考资料:

TCP/IP 协议 http://zh.wikipedia.org/zh-cn/TCP/IP%E5%8D%8F%E8%AE%AE

Data_buffer http://en.wikipedia.org/wiki/Data_buffer

Endianness http://en.wikipedia.org/wiki/Endianness

请戳 GitHub 原文: https://github.com/wangzhiwubigdata/God-Of-BigData

关注公众号, 内推, 面试, 资源下载, 关注更多大数据技术~
大数据成神之路~ 预计更新 500+ 篇文章,已经更新 60+ 篇~

正文完
 0