关于java:netty系列之netty中的核心MessageToByte编码器

42次阅读

共计 4620 个字符,预计需要花费 12 分钟才能阅读完成。

简介

之前的文章中,咱们解说了 netty 中从一个 message 转换成为另外一个 message 的框架叫做 MessageToMessage 编码器。然而 message to message 只思考了 channel 中音讯在处理过程中的转换,然而咱们晓得 channel 中最终传输的数据肯定是 ByteBuf,所以咱们还须要一个 message 和 ByteBuf 互相转换的框架,这个框架就叫做 MessageToByte。

留神,这里的 byte 指的是 ByteBuf 而不是 byte 这个字节类型。

MessageToByte 框架简介

为了不便扩大和用户的自定义,netty 封装了一套 MessageToByte 框架,这个框架中有三个外围的类,别离是 MessageToByteEncoder,ByteToMessageDecoder 和 ByteToMessageCodec。

咱们别离看一下这三个外围类的定义:

public abstract class MessageToByteEncoder<I> extends ChannelOutboundHandlerAdapter
public abstract class ByteToMessageDecoder extends ChannelInboundHandlerAdapter 
public abstract class ByteToMessageCodec<I> extends ChannelDuplexHandler 

这三个类别离继承自 ChannelOutboundHandlerAdapter,ChannelInboundHandlerAdapter 和 ChannelDuplexHandler,别离示意的是向 channel 中写音讯,从 channel 中读音讯和一个向 channel 中读写音讯的双向操作。

这三个类都是抽象类,接下来咱们会详细分析这三个类的具体实现逻辑。

MessageToByteEncoder

先来看 encoder,如果你比照 MessageToByteEncoder 和 MessageToMessageEncoder 的源码实现,能够发现他们有诸多相似之处。

首先在 MessageToByteEncoder 中定义了一个用作音讯类型匹配的 TypeParameterMatcher。

这个 matcher 用来匹配收到的音讯类型,如果类型匹配则进行音讯的转换操作,否则间接将音讯写入 channel 中。

和 MessageToMessageEncoder 不同的是,MessageToByteEncoder 多了一个 preferDirect 字段,这个字段示意音讯转换成为 ByteBuf 的时候是应用 diret Buf 还是 heap Buf。

这个字段的应用状况如下:

    protected ByteBuf allocateBuffer(ChannelHandlerContext ctx, @SuppressWarnings("unused") I msg,
                               boolean preferDirect) throws Exception {if (preferDirect) {return ctx.alloc().ioBuffer();} else {return ctx.alloc().heapBuffer();}
    }

最初来看一下它的外围办法 write:

    public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
        ByteBuf buf = null;
        try {if (acceptOutboundMessage(msg)) {@SuppressWarnings("unchecked")
                I cast = (I) msg;
                buf = allocateBuffer(ctx, cast, preferDirect);
                try {encode(ctx, cast, buf);
                } finally {ReferenceCountUtil.release(cast);
                }

                if (buf.isReadable()) {ctx.write(buf, promise);
                } else {buf.release();
                    ctx.write(Unpooled.EMPTY_BUFFER, promise);
                }
                buf = null;
            } else {ctx.write(msg, promise);
            }
        } catch (EncoderException e) {throw e;} catch (Throwable e) {throw new EncoderException(e);
        } finally {if (buf != null) {buf.release();
            }
        }
    }

下面咱们曾经提到了,write 办法首先通过 matcher 来判断是否是要承受的音讯类型,如果是的话就调用 encode 办法,将音讯对象转换成为 ByteBuf,如果不是,则间接将音讯写入 channel 中。

和 MessageToMessageEncoder 不同的是,encode 办法须要传入一个 ByteBuf 对象,而不是 CodecOutputList。

MessageToByteEncoder 有一个须要实现的形象办法 encode 如下,

    protected abstract void encode(ChannelHandlerContext ctx, I msg, ByteBuf out) throws Exception;

ByteToMessageDecoder

ByteToMessageDecoder 用来将 channel 中的 ByteBuf 音讯转换成为特定的音讯类型,其中 Decoder 中最重要的办法就是好 channelRead 办法, 如下所示:

    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {if (msg instanceof ByteBuf) {CodecOutputList out = CodecOutputList.newInstance();
            try {
                first = cumulation == null;
                cumulation = cumulator.cumulate(ctx.alloc(),
                        first ? Unpooled.EMPTY_BUFFER : cumulation, (ByteBuf) msg);
                callDecode(ctx, cumulation, out);
            } catch (DecoderException e) {throw e;} catch (Exception e) {throw new DecoderException(e);
            } finally {
                try {if (cumulation != null && !cumulation.isReadable()) {
                        numReads = 0;
                        cumulation.release();
                        cumulation = null;
                    } else if (++numReads >= discardAfterReads) {
                        numReads = 0;
                        discardSomeReadBytes();}

                    int size = out.size();
                    firedChannelRead |= out.insertSinceRecycled();
                    fireChannelRead(ctx, out, size);
                } finally {out.recycle();
                }
            }
        } else {ctx.fireChannelRead(msg);
        }
    }

channelRead 接管要进行音讯读取的 Object 对象,因为这里只承受 ByteBuf 音讯,所以在办法外部调用了 msg instanceof ByteBuf 来判断音讯的类型,如果不是 ByteBuf 类型的音讯则不进行音讯的转换。

输入的对象是 CodecOutputList,在将 ByteBuf 转换成为 CodecOutputList 之后,调用 fireChannelRead 办法将 out 对象传递上来。

这里的要害就是如何将接管到的 ByteBuf 转换成为 CodecOutputList。

转换的办法叫做 callDecode,它接管一个叫做 cumulation 的参数,在下面的办法中,咱们还看到一个和 cumulation 十分相似的名称叫做 cumulator。那么他们两个有什么区别呢?

在 ByteToMessageDecoder 中 cumulation 是一个 ByteBuf 对象,而 Cumulator 是一个接口,这个接口定义了一个 cumulate 办法:

    public interface Cumulator {ByteBuf cumulate(ByteBufAllocator alloc, ByteBuf cumulation, ByteBuf in);
    }

Cumulator 用来将传入的 ByteBuf 合并成为一个新的 ByteBuf。

ByteToMessageDecoder 中定义了两种 Cumulator,别离是 MERGE_CUMULATOR 和 COMPOSITE_CUMULATOR。

MERGE_CUMULATOR 是将传入的 ByteBuf 通过 memory copy 的形式拷贝到指标 ByteBuf cumulation 中。

而 COMPOSITE_CUMULATOR 则是将 ByteBuf 增加到一个 CompositeByteBuf 的构造中,并不做 memory copy,因为指标的构造比较复杂,所以速度会比间接进行 memory copy 要慢。

用户要扩大的办法就是 decode 办法,用来将一个 ByteBuf 转换成为其余对象:

    protected abstract void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception;

ByteToMessageCodec

最初要介绍的类是 ByteToMessageCodec,ByteToMessageCodec 示意的是 message 和 ByteBuf 之间的相互转换,它外面的 encoder 和 decoder 别离就是下面讲到的 MessageToByteEncoder 和 ByteToMessageDecoder。

用户能够继承 ByteToMessageCodec 来同时实现 encode 和 decode 的性能,所以须要实现 encode 和 decode 这两个办法:

    protected abstract void encode(ChannelHandlerContext ctx, I msg, ByteBuf out) throws Exception;

    protected abstract void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception;

ByteToMessageCodec 的实质就是封装了 MessageToByteEncoder 和 ByteToMessageDecoder,而后实现了编码和解码的性能。

总结

如果想实现 ByteBuf 和用户自定义音讯的间接转换,那么抉择 netty 提供的下面三个编码器是一个很好的抉择。

本文已收录于 http://www.flydean.com/14-0-2-netty-codec-msg-to-bytebuf/

最艰深的解读,最粗浅的干货,最简洁的教程,泛滥你不晓得的小技巧等你来发现!

欢送关注我的公众号:「程序那些事」, 懂技术,更懂你!

正文完
 0