解码

ByteToMessageDecoder

解码步骤

  1. 累加字节流到cumulation
  2. 调用子类的decode办法解析(子类实现了各种解码形式)
  3. 将解析到的ByteBuf向下流传
public static final Cumulator MERGE_CUMULATOR = new Cumulator() {    @Override    public ByteBuf cumulate(ByteBufAllocator alloc, ByteBuf cumulation, ByteBuf in) {        ByteBuf buffer;        // 如果累加器容量不够,就进行扩容        if (cumulation.writerIndex() > cumulation.maxCapacity() - in.readableBytes()                || cumulation.refCnt() > 1) {            buffer = expandCumulation(alloc, cumulation, in.readableBytes());        } else {            buffer = cumulation;        }        // 累加数据        buffer.writeBytes(in);        in.release();        return buffer;    }};
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {    if (msg instanceof ByteBuf) {        CodecOutputList out = CodecOutputList.newInstance();        try {            ByteBuf data = (ByteBuf) msg;            first = cumulation == null;            // 第一次写入累加器            if (first) {                cumulation = data;            } else {                // 累加数据                cumulation = cumulator.cumulate(ctx.alloc(), cumulation, data);            }            // 解析,将解析的后果传入out(out是一个List)            callDecode(ctx, cumulation, out);        } catch (DecoderException e) {            throw e;        } catch (Throwable t) {            throw new DecoderException(t);        } finally {            if (cumulation != null && !cumulation.isReadable()) {                numReads = 0;                cumulation.release();                cumulation = null;            } else if (++ numReads >= discardAfterReads) {                numReads = 0;                discardSomeReadBytes();            }            int size = out.size();            decodeWasNull = !out.insertSinceRecycled();            // 将解析到的内容向下流传            fireChannelRead(ctx, out, size);            out.recycle();        }    } else {        ctx.fireChannelRead(msg);    }}protected void callDecode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) {    try {        while (in.isReadable()) {            int outSize = out.size();            // 如果曾经解析出对象            if (outSize > 0) {                // 流传事件                fireChannelRead(ctx, out, outSize);                // 清空out                out.clear();                if (ctx.isRemoved()) {                    break;                }                outSize = 0;            }            int oldInputLength = in.readableBytes();            decode(ctx, in, out);            if (ctx.isRemoved()) {                break;            }            // outSize == out.size()表明本次解析没有解析出新的数据            if (outSize == out.size()) {                // 没有从累加器读取数据                // 以上两种状况同时产生,阐明累加器中的内容不足以拼装成一个残缺的数据包                // 所以就要进行解析,让累加器取获取更多的数据                if (oldInputLength == in.readableBytes()) {                    break;                } else {                    continue;                }            }            if (oldInputLength == in.readableBytes()) {                throw new DecoderException(                        StringUtil.simpleClassName(getClass()) +                        ".decode() did not read anything but decoded a message.");            }            if (isSingleDecode()) {                break;            }        }    } catch (DecoderException e) {        throw e;    } catch (Throwable cause) {        throw new DecoderException(cause);    }}

decoder

1.基于固定长度的解码器-FixedLengthFrameDecoder

/** * A decoder that splits the received {@link ByteBuf}s by the fixed number * of bytes. For example, if you received the following four fragmented packets: * <pre> * +---+----+------+----+ * | A | BC | DEFG | HI | * +---+----+------+----+ * </pre> * A {@link FixedLengthFrameDecoder}{@code (3)} will decode them into the * following three packets with the fixed length: * <pre> * +-----+-----+-----+ * | ABC | DEF | GHI | * +-----+-----+-----+ * </pre> */

每次从累加器读取指定长度的字节流解析。

2.基于行的解码器-LineBasedFrameDecoder

以换行符("\n"或"\r\n")为宰割,将字节流解析成残缺的数据包。
如果两个换行符之间的字节流超过可能解析的最大长度,就会把这两个换行符之间的内容抛弃。

if (length > maxLength) {    buffer.readerIndex(eol + delimLength);    fail(ctx, length);    return null;}

通过挪动ByteBuf的读指针实现,下次从挪动后的地位开始读,抛弃之前的内容。

3.基于分隔符的解码器-DelimiterBasedFrameDecoder

依据传入的分隔符,将分隔符之间的字节流解析成数据包。

如果传入的分隔符只有"\n"和"\r\n",间接调用行解码器。

if (lineBasedDecoder != null) {    return lineBasedDecoder.decode(ctx, buffer);}

失常状况下,每次都找到下一个最近的分隔符。同样,如果超过可能解析的最大长度,会把这段抛弃。

4.基于长度域的解码器

长度域示意从以后(长度域)地位要向后解析多少字节

lengthFieldOffset: 长度域在以后字节流的偏移量
lengthFieldLength: 长度域所占字节数
lengthAdjustment: 长度域加上lengthAdjustment才是真正要解析的字节长度
initialBytesToStrip: 从字节流开始的地位须要跳过多少字节

/*** <pre> * lengthFieldOffset   = 1 (= the length of HDR1) * lengthFieldLength   = 2 * <b>lengthAdjustment</b>    = <b>1</b> (= the length of HDR2) * <b>initialBytesToStrip</b> = <b>3</b> (= the length of HDR1 + LEN) * * BEFORE DECODE (16 bytes)                       AFTER DECODE (13 bytes) * +------+--------+------+----------------+      +------+----------------+ * | HDR1 | Length | HDR2 | Actual Content |----->| HDR2 | Actual Content | * | 0xCA | 0x000C | 0xFE | "HELLO, WORLD" |      | 0xFE | "HELLO, WORLD" | * +------+--------+------+----------------+      +------+----------------+ * </pre> */

解码时,先依据以上属性计算要解码的字节流范畴,而后对它们分段解析成数据包。

编码

MessageToByteEncoder

编码总体流程: 匹配对象 -> 分配内存 -> 编码实现 -> 开释对象 -> 流传数据 -> 开释内存

MessageToByteEncoder#write()

public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {    ByteBuf buf = null;    try {        // 匹配对象: 查看编码器能不能解决这个对象(依据定义解码器时传入的泛型判断)        if (acceptOutboundMessage(msg)) {            @SuppressWarnings("unchecked")            I cast = (I) msg;            // 调配一个ByteBuf用于寄存编码后的数据            buf = allocateBuffer(ctx, cast, preferDirect);            try {                // 自定义编码方式,对msg编码,将后果放到buf里                encode(ctx, cast, buf);            } finally {                // 开释原始对象                ReferenceCountUtil.release(cast);            }            if (buf.isReadable()) {                // 向后面的节点流传                ctx.write(buf, promise);            } else {                buf.release();                ctx.write(Unpooled.EMPTY_BUFFER, promise);            }            buf = null;        } else {            ctx.write(msg, promise);        }    } catch (EncoderException e) {        throw e;    } catch (Throwable e) {        throw new EncoderException(e);    } finally {        if (buf != null) {            // 开释buf            buf.release();        }    }}

自定义的一个编码器

public class Encoder extends MessageToByteEncoder<User> {    @Override    protected void encode(ChannelHandlerContext ctx, User user, ByteBuf out) throws Exception {        byte[] bytes = user.getName().getBytes();        out.writeInt(4 + bytes.length);        out.writeInt(user.getAge());        out.writeBytes(bytes);    }}

而后能够将这个编码器退出到pipeline,对User类型的对象进行编码。

pipeline中的节点调用write办法后,会将编码后的数据流传到head节点,最终调用head节点的write办法

head节点的unsafe.write()

public final void write(Object msg, ChannelPromise promise) {    assertEventLoop();    ChannelOutboundBuffer outboundBuffer = this.outboundBuffer;    if (outboundBuffer == null) {        safeSetFailure(promise, WRITE_CLOSED_CHANNEL_EXCEPTION);        ReferenceCountUtil.release(msg);        return;    }    int size;    try {        // direct化ByteBuf: 如果msg是堆内的,转化为堆外        msg = filterOutboundMessage(msg);        size = pipeline.estimatorHandle().size(msg);        if (size < 0) {            size = 0;        }    } catch (Throwable t) {        safeSetFailure(promise, t);        ReferenceCountUtil.release(msg);        return;    }    // 将msg增加到写缓冲区    outboundBuffer.addMessage(msg, size, promise);}public void addMessage(Object msg, int size, ChannelPromise promise) {    // 先将msg封装成Entry    Entry entry = Entry.newInstance(msg, size, total(msg), promise);    if (tailEntry == null) {        flushedEntry = null;        tailEntry = entry;    } else {        Entry tail = tailEntry;        tail.next = entry;        tailEntry = entry;    }    if (unflushedEntry == null) {        unflushedEntry = entry;    }    // 设置写状态    incrementPendingOutboundBytes(size, false);}

outboundBuffer相当于一个链表,通过三个指针标识msg的状态

flushedEntry: 曾经被刷新的msg
unflushedEntry: 未被刷新的msg
tailEntry: 链表末端的msg

屡次write()后三个指针的指向:

unflushedEntry到tailEntry之间的都没有被刷新

incrementPendingOutboundBytes会将outboundBuffer里有多少待刷新的字节统计进去,如果超过它的阈值,将会标记为不可写入。
要等到下次flush后,能力将状态改为可写。

head节点的unsafe.flush()

将outboundBuffer累加的字节传递给socket

public final void flush() {    assertEventLoop();    ChannelOutboundBuffer outboundBuffer = this.outboundBuffer;    if (outboundBuffer == null) {        return;    }    outboundBuffer.addFlush();    flush0();}// 增加刷新标记并设置写状态public void addFlush() {    // 找到还没有刷新的第一个Entry    Entry entry = unflushedEntry;    if (entry != null) {        if (flushedEntry == null) {            // there is no flushedEntry yet, so start with the entry            flushedEntry = entry;        }        do {            flushed ++;            if (!entry.promise.setUncancellable()) {                // Was cancelled so make sure we free up memory and notify about the freed bytes                int pending = entry.cancel();                // 如果之前outboundBuffer被标记为不可写,查看outboundBuffer里的字节数,判断是否可写并设置为可写状态                decrementPendingOutboundBytes(pending, false, true);            }            entry = entry.next;        } while (entry != null);        unflushedEntry = null;    }}

调用addFlush后的状态

将要从第一个Entry向后把所有Entry刷新到socket。

flush0() -> doWrite() -> doWriteBytes()

protected int doWriteBytes(ByteBuf buf) throws Exception {    final int expectedWrittenBytes = buf.readableBytes();    // 用到了JDK创立的channel,也就是socket连贯    return buf.readBytes(javaChannel(), expectedWrittenBytes);}public int readBytes(GatheringByteChannel out, int length) throws IOException {    checkReadableBytes(length);    int readBytes = getBytes(readerIndex, out, length, true);    readerIndex += readBytes;    // 返回刷新的字节数    return readBytes;}private int getBytes(int index, GatheringByteChannel out, int length, boolean internal) throws IOException {    checkIndex(index, length);    if (length == 0) {        return 0;    }    // JDK Nio的ByteBuffer    // 将netty的ByteBuf转到ByteBuffer    ByteBuffer tmpBuf;    if (internal) {        tmpBuf = internalNioBuffer();    } else {        tmpBuf = memory.duplicate();    }    index = idx(index);    tmpBuf.clear().position(index).limit(index + length);    // 最终刷新到到socket,返回字节数    return out.write(tmpBuf);}