解码
ByteToMessageDecoder
解码步骤
- 累加字节流到 cumulation
- 调用子类的 decode 办法解析(子类实现了各种解码形式)
- 将解析到的 ByteBuf 向下流传
public static final Cumulator MERGE_CUMULATOR = new Cumulator() {
@Override
public ByteBuf cumulate(ByteBufAllocator alloc, ByteBuf cumulation, ByteBuf in) {
ByteBuf buffer;
// 如果累加器容量不够,就进行扩容
if (cumulation.writerIndex() > cumulation.maxCapacity() - in.readableBytes()
|| cumulation.refCnt() > 1) {buffer = expandCumulation(alloc, cumulation, in.readableBytes());
} else {buffer = cumulation;}
// 累加数据
buffer.writeBytes(in);
in.release();
return buffer;
}
};
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {if (msg instanceof ByteBuf) {CodecOutputList out = CodecOutputList.newInstance();
try {ByteBuf data = (ByteBuf) msg;
first = cumulation == null;
// 第一次写入累加器
if (first) {cumulation = data;} else {
// 累加数据
cumulation = cumulator.cumulate(ctx.alloc(), cumulation, data);
}
// 解析,将解析的后果传入 out(out 是一个 List)
callDecode(ctx, cumulation, out);
} catch (DecoderException e) {throw e;} catch (Throwable t) {throw new DecoderException(t);
} finally {if (cumulation != null && !cumulation.isReadable()) {
numReads = 0;
cumulation.release();
cumulation = null;
} else if (++ numReads >= discardAfterReads) {
numReads = 0;
discardSomeReadBytes();}
int size = out.size();
decodeWasNull = !out.insertSinceRecycled();
// 将解析到的内容向下流传
fireChannelRead(ctx, out, size);
out.recycle();}
} else {ctx.fireChannelRead(msg);
}
}
protected void callDecode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) {
try {while (in.isReadable()) {int outSize = out.size();
// 如果曾经解析出对象
if (outSize > 0) {
// 流传事件
fireChannelRead(ctx, out, outSize);
// 清空 out
out.clear();
if (ctx.isRemoved()) {break;}
outSize = 0;
}
int oldInputLength = in.readableBytes();
decode(ctx, in, out);
if (ctx.isRemoved()) {break;}
// outSize == out.size()表明本次解析没有解析出新的数据
if (outSize == out.size()) {
// 没有从累加器读取数据
// 以上两种状况同时产生,阐明累加器中的内容不足以拼装成一个残缺的数据包
// 所以就要进行解析,让累加器取获取更多的数据
if (oldInputLength == in.readableBytes()) {break;} else {continue;}
}
if (oldInputLength == in.readableBytes()) {
throw new DecoderException(StringUtil.simpleClassName(getClass()) +
".decode() did not read anything but decoded a message.");
}
if (isSingleDecode()) {break;}
}
} catch (DecoderException e) {throw e;} catch (Throwable cause) {throw new DecoderException(cause);
}
}
decoder
1. 基于固定长度的解码器 -FixedLengthFrameDecoder
/**
* A decoder that splits the received {@link ByteBuf}s by the fixed number
* of bytes. For example, if you received the following four fragmented packets:
* <pre>
* +---+----+------+----+
* | A | BC | DEFG | HI |
* +---+----+------+----+
* </pre>
* A {@link FixedLengthFrameDecoder}{@code (3)} will decode them into the
* following three packets with the fixed length:
* <pre>
* +-----+-----+-----+
* | ABC | DEF | GHI |
* +-----+-----+-----+
* </pre>
*/
每次从累加器读取指定长度的字节流解析。
2. 基于行的解码器 -LineBasedFrameDecoder
以换行符 (“\n” 或 ”\r\n”) 为宰割,将字节流解析成残缺的数据包。
如果两个换行符之间的字节流超过可能解析的最大长度,就会把这两个换行符之间的内容抛弃。
if (length > maxLength) {buffer.readerIndex(eol + delimLength);
fail(ctx, length);
return null;
}
通过挪动 ByteBuf 的读指针实现,下次从挪动后的地位开始读,抛弃之前的内容。
3. 基于分隔符的解码器 -DelimiterBasedFrameDecoder
依据传入的分隔符,将分隔符之间的字节流解析成数据包。
如果传入的分隔符只有 ”\n” 和 ”\r\n”,间接调用行解码器。
if (lineBasedDecoder != null) {return lineBasedDecoder.decode(ctx, buffer);
}
失常状况下,每次都找到下一个最近的分隔符。同样,如果超过可能解析的最大长度,会把这段抛弃。
4. 基于长度域的解码器
长度域示意从以后 (长度域) 地位要向后解析多少字节
lengthFieldOffset: 长度域在以后字节流的偏移量
lengthFieldLength: 长度域所占字节数
lengthAdjustment: 长度域加上 lengthAdjustment 才是真正要解析的字节长度
initialBytesToStrip: 从字节流开始的地位须要跳过多少字节
/**
* <pre>
* lengthFieldOffset = 1 (= the length of HDR1)
* lengthFieldLength = 2
* <b>lengthAdjustment</b> = <b>1</b> (= the length of HDR2)
* <b>initialBytesToStrip</b> = <b>3</b> (= the length of HDR1 + LEN)
*
* BEFORE DECODE (16 bytes) AFTER DECODE (13 bytes)
* +------+--------+------+----------------+ +------+----------------+
* | HDR1 | Length | HDR2 | Actual Content |----->| HDR2 | Actual Content |
* | 0xCA | 0x000C | 0xFE | "HELLO, WORLD" | | 0xFE | "HELLO, WORLD" |
* +------+--------+------+----------------+ +------+----------------+
* </pre>
*/
解码时,先依据以上属性计算要解码的字节流范畴,而后对它们分段解析成数据包。
编码
MessageToByteEncoder
编码总体流程: 匹配对象 -> 分配内存 -> 编码实现 -> 开释对象 -> 流传数据 -> 开释内存
MessageToByteEncoder#write()
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
ByteBuf buf = null;
try {// 匹配对象: 查看编码器能不能解决这个对象(依据定义解码器时传入的泛型判断)
if (acceptOutboundMessage(msg)) {@SuppressWarnings("unchecked")
I cast = (I) msg;
// 调配一个 ByteBuf 用于寄存编码后的数据
buf = allocateBuffer(ctx, cast, preferDirect);
try {
// 自定义编码方式,对 msg 编码,将后果放到 buf 里
encode(ctx, cast, buf);
} finally {
// 开释原始对象
ReferenceCountUtil.release(cast);
}
if (buf.isReadable()) {
// 向后面的节点流传
ctx.write(buf, promise);
} else {buf.release();
ctx.write(Unpooled.EMPTY_BUFFER, promise);
}
buf = null;
} else {ctx.write(msg, promise);
}
} catch (EncoderException e) {throw e;} catch (Throwable e) {throw new EncoderException(e);
} finally {if (buf != null) {
// 开释 buf
buf.release();}
}
}
自定义的一个编码器
public class Encoder extends MessageToByteEncoder<User> {
@Override
protected void encode(ChannelHandlerContext ctx, User user, ByteBuf out) throws Exception {byte[] bytes = user.getName().getBytes();
out.writeInt(4 + bytes.length);
out.writeInt(user.getAge());
out.writeBytes(bytes);
}
}
而后能够将这个编码器退出到 pipeline,对 User 类型的对象进行编码。
pipeline 中的节点调用 write 办法后,会将编码后的数据流传到 head 节点,最终调用 head 节点的 write 办法
head 节点的 unsafe.write()
public final void write(Object msg, ChannelPromise promise) {assertEventLoop();
ChannelOutboundBuffer outboundBuffer = this.outboundBuffer;
if (outboundBuffer == null) {safeSetFailure(promise, WRITE_CLOSED_CHANNEL_EXCEPTION);
ReferenceCountUtil.release(msg);
return;
}
int size;
try {
// direct 化 ByteBuf: 如果 msg 是堆内的,转化为堆外
msg = filterOutboundMessage(msg);
size = pipeline.estimatorHandle().size(msg);
if (size < 0) {size = 0;}
} catch (Throwable t) {safeSetFailure(promise, t);
ReferenceCountUtil.release(msg);
return;
}
// 将 msg 增加到写缓冲区
outboundBuffer.addMessage(msg, size, promise);
}
public void addMessage(Object msg, int size, ChannelPromise promise) {
// 先将 msg 封装成 Entry
Entry entry = Entry.newInstance(msg, size, total(msg), promise);
if (tailEntry == null) {
flushedEntry = null;
tailEntry = entry;
} else {
Entry tail = tailEntry;
tail.next = entry;
tailEntry = entry;
}
if (unflushedEntry == null) {unflushedEntry = entry;}
// 设置写状态
incrementPendingOutboundBytes(size, false);
}
outboundBuffer 相当于一个链表,通过三个指针标识 msg 的状态
flushedEntry: 曾经被刷新的 msg
unflushedEntry: 未被刷新的 msg
tailEntry: 链表末端的 msg
屡次 write()后三个指针的指向:
unflushedEntry 到 tailEntry 之间的都没有被刷新
incrementPendingOutboundBytes 会将 outboundBuffer 里有多少待刷新的字节统计进去,如果超过它的阈值,将会标记为不可写入。
要等到下次 flush 后,能力将状态改为可写。
head 节点的 unsafe.flush()
将 outboundBuffer 累加的字节传递给 socket
public final void flush() {assertEventLoop();
ChannelOutboundBuffer outboundBuffer = this.outboundBuffer;
if (outboundBuffer == null) {return;}
outboundBuffer.addFlush();
flush0();}
// 增加刷新标记并设置写状态
public void addFlush() {
// 找到还没有刷新的第一个 Entry
Entry entry = unflushedEntry;
if (entry != null) {if (flushedEntry == null) {
// there is no flushedEntry yet, so start with the entry
flushedEntry = entry;
}
do {
flushed ++;
if (!entry.promise.setUncancellable()) {
// Was cancelled so make sure we free up memory and notify about the freed bytes
int pending = entry.cancel();
// 如果之前 outboundBuffer 被标记为不可写,查看 outboundBuffer 里的字节数,判断是否可写并设置为可写状态
decrementPendingOutboundBytes(pending, false, true);
}
entry = entry.next;
} while (entry != null);
unflushedEntry = null;
}
}
调用 addFlush 后的状态
将要从第一个 Entry 向后把所有 Entry 刷新到 socket。
flush0() -> doWrite() -> doWriteBytes()
protected int doWriteBytes(ByteBuf buf) throws Exception {final int expectedWrittenBytes = buf.readableBytes();
// 用到了 JDK 创立的 channel,也就是 socket 连贯
return buf.readBytes(javaChannel(), expectedWrittenBytes);
}
public int readBytes(GatheringByteChannel out, int length) throws IOException {checkReadableBytes(length);
int readBytes = getBytes(readerIndex, out, length, true);
readerIndex += readBytes;
// 返回刷新的字节数
return readBytes;
}
private int getBytes(int index, GatheringByteChannel out, int length, boolean internal) throws IOException {checkIndex(index, length);
if (length == 0) {return 0;}
// JDK Nio 的 ByteBuffer
// 将 netty 的 ByteBuf 转到 ByteBuffer
ByteBuffer tmpBuf;
if (internal) {tmpBuf = internalNioBuffer();
} else {tmpBuf = memory.duplicate();
}
index = idx(index);
tmpBuf.clear().position(index).limit(index + length);
// 最终刷新到到 socket,返回字节数
return out.write(tmpBuf);
}