简介

之前的文章中,咱们解说了netty中从一个message转换成为另外一个message的框架叫做MessageToMessage编码器。然而message to message只思考了channel中音讯在处理过程中的转换,然而咱们晓得channel中最终传输的数据肯定是ByteBuf,所以咱们还须要一个message和ByteBuf互相转换的框架,这个框架就叫做MessageToByte。

留神,这里的byte指的是ByteBuf而不是byte这个字节类型。

MessageToByte框架简介

为了不便扩大和用户的自定义,netty封装了一套MessageToByte框架,这个框架中有三个外围的类,别离是MessageToByteEncoder,ByteToMessageDecoder和ByteToMessageCodec。

咱们别离看一下这三个外围类的定义:

public abstract class MessageToByteEncoder<I> extends ChannelOutboundHandlerAdapter
public abstract class ByteToMessageDecoder extends ChannelInboundHandlerAdapter 
public abstract class ByteToMessageCodec<I> extends ChannelDuplexHandler 

这三个类别离继承自ChannelOutboundHandlerAdapter,ChannelInboundHandlerAdapter和ChannelDuplexHandler,别离示意的是向channel中写音讯,从channel中读音讯和一个向channel中读写音讯的双向操作。

这三个类都是抽象类,接下来咱们会详细分析这三个类的具体实现逻辑。

MessageToByteEncoder

先来看encoder,如果你比照MessageToByteEncoder和MessageToMessageEncoder的源码实现,能够发现他们有诸多相似之处。

首先在MessageToByteEncoder中定义了一个用作音讯类型匹配的TypeParameterMatcher。

这个matcher用来匹配收到的音讯类型,如果类型匹配则进行音讯的转换操作,否则间接将音讯写入channel中。

和MessageToMessageEncoder不同的是,MessageToByteEncoder多了一个preferDirect字段,这个字段示意音讯转换成为ByteBuf的时候是应用diret Buf还是heap Buf。

这个字段的应用状况如下:

    protected ByteBuf allocateBuffer(ChannelHandlerContext ctx, @SuppressWarnings("unused") I msg,                               boolean preferDirect) throws Exception {        if (preferDirect) {            return ctx.alloc().ioBuffer();        } else {            return ctx.alloc().heapBuffer();        }    }

最初来看一下它的外围办法write:

    public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {        ByteBuf buf = null;        try {            if (acceptOutboundMessage(msg)) {                @SuppressWarnings("unchecked")                I cast = (I) msg;                buf = allocateBuffer(ctx, cast, preferDirect);                try {                    encode(ctx, cast, buf);                } finally {                    ReferenceCountUtil.release(cast);                }                if (buf.isReadable()) {                    ctx.write(buf, promise);                } else {                    buf.release();                    ctx.write(Unpooled.EMPTY_BUFFER, promise);                }                buf = null;            } else {                ctx.write(msg, promise);            }        } catch (EncoderException e) {            throw e;        } catch (Throwable e) {            throw new EncoderException(e);        } finally {            if (buf != null) {                buf.release();            }        }    }

下面咱们曾经提到了,write办法首先通过matcher来判断是否是要承受的音讯类型,如果是的话就调用encode办法,将音讯对象转换成为ByteBuf,如果不是,则间接将音讯写入channel中。

和MessageToMessageEncoder不同的是,encode办法须要传入一个ByteBuf对象,而不是CodecOutputList。

MessageToByteEncoder有一个须要实现的形象办法encode如下,

    protected abstract void encode(ChannelHandlerContext ctx, I msg, ByteBuf out) throws Exception;

ByteToMessageDecoder

ByteToMessageDecoder用来将channel中的ByteBuf音讯转换成为特定的音讯类型,其中Decoder中最重要的办法就是好channelRead办法,如下所示:

    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {        if (msg instanceof ByteBuf) {            CodecOutputList out = CodecOutputList.newInstance();            try {                first = cumulation == null;                cumulation = cumulator.cumulate(ctx.alloc(),                        first ? Unpooled.EMPTY_BUFFER : cumulation, (ByteBuf) msg);                callDecode(ctx, cumulation, out);            } catch (DecoderException e) {                throw e;            } catch (Exception e) {                throw new DecoderException(e);            } finally {                try {                    if (cumulation != null && !cumulation.isReadable()) {                        numReads = 0;                        cumulation.release();                        cumulation = null;                    } else if (++numReads >= discardAfterReads) {                        numReads = 0;                        discardSomeReadBytes();                    }                    int size = out.size();                    firedChannelRead |= out.insertSinceRecycled();                    fireChannelRead(ctx, out, size);                } finally {                    out.recycle();                }            }        } else {            ctx.fireChannelRead(msg);        }    }

channelRead接管要进行音讯读取的Object对象,因为这里只承受ByteBuf音讯,所以在办法外部调用了msg instanceof ByteBuf 来判断音讯的类型,如果不是ByteBuf类型的音讯则不进行音讯的转换。

输入的对象是CodecOutputList,在将ByteBuf转换成为CodecOutputList之后,调用fireChannelRead办法将out对象传递上来。

这里的要害就是如何将接管到的ByteBuf转换成为CodecOutputList。

转换的办法叫做callDecode,它接管一个叫做cumulation的参数,在下面的办法中,咱们还看到一个和cumulation十分相似的名称叫做cumulator。那么他们两个有什么区别呢?

在ByteToMessageDecoder中cumulation是一个ByteBuf对象,而Cumulator是一个接口,这个接口定义了一个cumulate办法:

    public interface Cumulator {        ByteBuf cumulate(ByteBufAllocator alloc, ByteBuf cumulation, ByteBuf in);    }

Cumulator用来将传入的ByteBuf合并成为一个新的ByteBuf。

ByteToMessageDecoder中定义了两种Cumulator,别离是MERGE_CUMULATOR和COMPOSITE_CUMULATOR。

MERGE_CUMULATOR是将传入的ByteBuf通过memory copy的形式拷贝到指标ByteBuf cumulation中。

而COMPOSITE_CUMULATOR则是将ByteBuf增加到一个 CompositeByteBuf 的构造中,并不做memory copy,因为指标的构造比较复杂,所以速度会比间接进行memory copy要慢。

用户要扩大的办法就是decode办法,用来将一个ByteBuf转换成为其余对象:

    protected abstract void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception;

ByteToMessageCodec

最初要介绍的类是ByteToMessageCodec,ByteToMessageCodec示意的是message和ByteBuf之间的相互转换,它外面的encoder和decoder别离就是下面讲到的MessageToByteEncoder和ByteToMessageDecoder。

用户能够继承ByteToMessageCodec来同时实现encode和decode的性能,所以须要实现encode和decode这两个办法:

    protected abstract void encode(ChannelHandlerContext ctx, I msg, ByteBuf out) throws Exception;    protected abstract void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception;

ByteToMessageCodec的实质就是封装了MessageToByteEncoder和ByteToMessageDecoder,而后实现了编码和解码的性能。

总结

如果想实现ByteBuf和用户自定义音讯的间接转换,那么抉择netty提供的下面三个编码器是一个很好的抉择。

本文已收录于 http://www.flydean.com/14-0-2-netty-codec-msg-to-bytebuf/

最艰深的解读,最粗浅的干货,最简洁的教程,泛滥你不晓得的小技巧等你来发现!

欢送关注我的公众号:「程序那些事」,懂技术,更懂你!