解码
ByteToMessageDecoder
解码步骤
- 累加字节流到cumulation
- 调用子类的decode办法解析(子类实现了各种解码形式)
- 将解析到的ByteBuf向下流传
public static final Cumulator MERGE_CUMULATOR = new Cumulator() { @Override public ByteBuf cumulate(ByteBufAllocator alloc, ByteBuf cumulation, ByteBuf in) { ByteBuf buffer; // 如果累加器容量不够,就进行扩容 if (cumulation.writerIndex() > cumulation.maxCapacity() - in.readableBytes() || cumulation.refCnt() > 1) { buffer = expandCumulation(alloc, cumulation, in.readableBytes()); } else { buffer = cumulation; } // 累加数据 buffer.writeBytes(in); in.release(); return buffer; }};
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { if (msg instanceof ByteBuf) { CodecOutputList out = CodecOutputList.newInstance(); try { ByteBuf data = (ByteBuf) msg; first = cumulation == null; // 第一次写入累加器 if (first) { cumulation = data; } else { // 累加数据 cumulation = cumulator.cumulate(ctx.alloc(), cumulation, data); } // 解析,将解析的后果传入out(out是一个List) callDecode(ctx, cumulation, out); } catch (DecoderException e) { throw e; } catch (Throwable t) { throw new DecoderException(t); } finally { if (cumulation != null && !cumulation.isReadable()) { numReads = 0; cumulation.release(); cumulation = null; } else if (++ numReads >= discardAfterReads) { numReads = 0; discardSomeReadBytes(); } int size = out.size(); decodeWasNull = !out.insertSinceRecycled(); // 将解析到的内容向下流传 fireChannelRead(ctx, out, size); out.recycle(); } } else { ctx.fireChannelRead(msg); }}protected void callDecode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) { try { while (in.isReadable()) { int outSize = out.size(); // 如果曾经解析出对象 if (outSize > 0) { // 流传事件 fireChannelRead(ctx, out, outSize); // 清空out out.clear(); if (ctx.isRemoved()) { break; } outSize = 0; } int oldInputLength = in.readableBytes(); decode(ctx, in, out); if (ctx.isRemoved()) { break; } // outSize == out.size()表明本次解析没有解析出新的数据 if (outSize == out.size()) { // 没有从累加器读取数据 // 以上两种状况同时产生,阐明累加器中的内容不足以拼装成一个残缺的数据包 // 所以就要进行解析,让累加器取获取更多的数据 if (oldInputLength == in.readableBytes()) { break; } else { continue; } } if (oldInputLength == in.readableBytes()) { throw new DecoderException( StringUtil.simpleClassName(getClass()) + ".decode() did not read anything but decoded a message."); } if (isSingleDecode()) { break; } } } catch (DecoderException e) { throw e; } catch (Throwable cause) { throw new DecoderException(cause); }}
decoder
1.基于固定长度的解码器-FixedLengthFrameDecoder
/** * A decoder that splits the received {@link ByteBuf}s by the fixed number * of bytes. For example, if you received the following four fragmented packets: * <pre> * +---+----+------+----+ * | A | BC | DEFG | HI | * +---+----+------+----+ * </pre> * A {@link FixedLengthFrameDecoder}{@code (3)} will decode them into the * following three packets with the fixed length: * <pre> * +-----+-----+-----+ * | ABC | DEF | GHI | * +-----+-----+-----+ * </pre> */
每次从累加器读取指定长度的字节流解析。
2.基于行的解码器-LineBasedFrameDecoder
以换行符("\n"或"\r\n")为宰割,将字节流解析成残缺的数据包。
如果两个换行符之间的字节流超过可能解析的最大长度,就会把这两个换行符之间的内容抛弃。
if (length > maxLength) { buffer.readerIndex(eol + delimLength); fail(ctx, length); return null;}
通过挪动ByteBuf的读指针实现,下次从挪动后的地位开始读,抛弃之前的内容。
3.基于分隔符的解码器-DelimiterBasedFrameDecoder
依据传入的分隔符,将分隔符之间的字节流解析成数据包。
如果传入的分隔符只有"\n"和"\r\n",间接调用行解码器。
if (lineBasedDecoder != null) { return lineBasedDecoder.decode(ctx, buffer);}
失常状况下,每次都找到下一个最近的分隔符。同样,如果超过可能解析的最大长度,会把这段抛弃。
4.基于长度域的解码器
长度域示意从以后(长度域)地位要向后解析多少字节
lengthFieldOffset: 长度域在以后字节流的偏移量
lengthFieldLength: 长度域所占字节数
lengthAdjustment: 长度域加上lengthAdjustment才是真正要解析的字节长度
initialBytesToStrip: 从字节流开始的地位须要跳过多少字节
/*** <pre> * lengthFieldOffset = 1 (= the length of HDR1) * lengthFieldLength = 2 * <b>lengthAdjustment</b> = <b>1</b> (= the length of HDR2) * <b>initialBytesToStrip</b> = <b>3</b> (= the length of HDR1 + LEN) * * BEFORE DECODE (16 bytes) AFTER DECODE (13 bytes) * +------+--------+------+----------------+ +------+----------------+ * | HDR1 | Length | HDR2 | Actual Content |----->| HDR2 | Actual Content | * | 0xCA | 0x000C | 0xFE | "HELLO, WORLD" | | 0xFE | "HELLO, WORLD" | * +------+--------+------+----------------+ +------+----------------+ * </pre> */
解码时,先依据以上属性计算要解码的字节流范畴,而后对它们分段解析成数据包。
编码
MessageToByteEncoder
编码总体流程: 匹配对象 -> 分配内存 -> 编码实现 -> 开释对象 -> 流传数据 -> 开释内存
MessageToByteEncoder#write()
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception { ByteBuf buf = null; try { // 匹配对象: 查看编码器能不能解决这个对象(依据定义解码器时传入的泛型判断) if (acceptOutboundMessage(msg)) { @SuppressWarnings("unchecked") I cast = (I) msg; // 调配一个ByteBuf用于寄存编码后的数据 buf = allocateBuffer(ctx, cast, preferDirect); try { // 自定义编码方式,对msg编码,将后果放到buf里 encode(ctx, cast, buf); } finally { // 开释原始对象 ReferenceCountUtil.release(cast); } if (buf.isReadable()) { // 向后面的节点流传 ctx.write(buf, promise); } else { buf.release(); ctx.write(Unpooled.EMPTY_BUFFER, promise); } buf = null; } else { ctx.write(msg, promise); } } catch (EncoderException e) { throw e; } catch (Throwable e) { throw new EncoderException(e); } finally { if (buf != null) { // 开释buf buf.release(); } }}
自定义的一个编码器
public class Encoder extends MessageToByteEncoder<User> { @Override protected void encode(ChannelHandlerContext ctx, User user, ByteBuf out) throws Exception { byte[] bytes = user.getName().getBytes(); out.writeInt(4 + bytes.length); out.writeInt(user.getAge()); out.writeBytes(bytes); }}
而后能够将这个编码器退出到pipeline,对User类型的对象进行编码。
pipeline中的节点调用write办法后,会将编码后的数据流传到head节点,最终调用head节点的write办法
head节点的unsafe.write()
public final void write(Object msg, ChannelPromise promise) { assertEventLoop(); ChannelOutboundBuffer outboundBuffer = this.outboundBuffer; if (outboundBuffer == null) { safeSetFailure(promise, WRITE_CLOSED_CHANNEL_EXCEPTION); ReferenceCountUtil.release(msg); return; } int size; try { // direct化ByteBuf: 如果msg是堆内的,转化为堆外 msg = filterOutboundMessage(msg); size = pipeline.estimatorHandle().size(msg); if (size < 0) { size = 0; } } catch (Throwable t) { safeSetFailure(promise, t); ReferenceCountUtil.release(msg); return; } // 将msg增加到写缓冲区 outboundBuffer.addMessage(msg, size, promise);}public void addMessage(Object msg, int size, ChannelPromise promise) { // 先将msg封装成Entry Entry entry = Entry.newInstance(msg, size, total(msg), promise); if (tailEntry == null) { flushedEntry = null; tailEntry = entry; } else { Entry tail = tailEntry; tail.next = entry; tailEntry = entry; } if (unflushedEntry == null) { unflushedEntry = entry; } // 设置写状态 incrementPendingOutboundBytes(size, false);}
outboundBuffer相当于一个链表,通过三个指针标识msg的状态
flushedEntry: 曾经被刷新的msg
unflushedEntry: 未被刷新的msg
tailEntry: 链表末端的msg
屡次write()后三个指针的指向:
unflushedEntry到tailEntry之间的都没有被刷新
incrementPendingOutboundBytes会将outboundBuffer里有多少待刷新的字节统计进去,如果超过它的阈值,将会标记为不可写入。
要等到下次flush后,能力将状态改为可写。
head节点的unsafe.flush()
将outboundBuffer累加的字节传递给socket
public final void flush() { assertEventLoop(); ChannelOutboundBuffer outboundBuffer = this.outboundBuffer; if (outboundBuffer == null) { return; } outboundBuffer.addFlush(); flush0();}// 增加刷新标记并设置写状态public void addFlush() { // 找到还没有刷新的第一个Entry Entry entry = unflushedEntry; if (entry != null) { if (flushedEntry == null) { // there is no flushedEntry yet, so start with the entry flushedEntry = entry; } do { flushed ++; if (!entry.promise.setUncancellable()) { // Was cancelled so make sure we free up memory and notify about the freed bytes int pending = entry.cancel(); // 如果之前outboundBuffer被标记为不可写,查看outboundBuffer里的字节数,判断是否可写并设置为可写状态 decrementPendingOutboundBytes(pending, false, true); } entry = entry.next; } while (entry != null); unflushedEntry = null; }}
调用addFlush后的状态
将要从第一个Entry向后把所有Entry刷新到socket。
flush0() -> doWrite() -> doWriteBytes()
protected int doWriteBytes(ByteBuf buf) throws Exception { final int expectedWrittenBytes = buf.readableBytes(); // 用到了JDK创立的channel,也就是socket连贯 return buf.readBytes(javaChannel(), expectedWrittenBytes);}public int readBytes(GatheringByteChannel out, int length) throws IOException { checkReadableBytes(length); int readBytes = getBytes(readerIndex, out, length, true); readerIndex += readBytes; // 返回刷新的字节数 return readBytes;}private int getBytes(int index, GatheringByteChannel out, int length, boolean internal) throws IOException { checkIndex(index, length); if (length == 0) { return 0; } // JDK Nio的ByteBuffer // 将netty的ByteBuf转到ByteBuffer ByteBuffer tmpBuf; if (internal) { tmpBuf = internalNioBuffer(); } else { tmpBuf = memory.duplicate(); } index = idx(index); tmpBuf.clear().position(index).limit(index + length); // 最终刷新到到socket,返回字节数 return out.write(tmpBuf);}