简介

在netty中咱们须要传递各种类型的音讯,这些message能够是字符串,能够是数组,也能够是自定义的对象。不同的对象之间可能须要相互转换,这样就须要一个能够自在进行转换的转换器,为了对立编码规定和不便用户的扩大,netty提供了一套音讯之间进行转换的框架。本文将会解说这个框架的具体实现。

框架简介

netty为音讯和音讯之间的转换提供了三个类,这三个类都是抽象类,别离是MessageToMessageDecoder,MessageToMessageEncoder和MessageToMessageCodec。

先来看下他们的定义:

public abstract class MessageToMessageEncoder<I> extends ChannelOutboundHandlerAdapter
public abstract class MessageToMessageDecoder<I> extends ChannelInboundHandlerAdapter
public abstract class MessageToMessageCodec<INBOUND_IN, OUTBOUND_IN> extends ChannelDuplexHandler 

MessageToMessageEncoder继承自ChannelOutboundHandlerAdapter,负责向channel中写音讯。

MessageToMessageDecoder继承自ChannelInboundHandlerAdapter,负责从channel中读取音讯。

MessageToMessageCodec继承自ChannelDuplexHandler,它是一个双向的handler,能够从channel中读取音讯,也能够向channel中写入音讯。

有了这三个抽象类,咱们再看下这三个类的具体实现。

MessageToMessageEncoder

先看一下音讯的编码器MessageToMessageEncoder,编码器中最重要的办法就是write,看下write的实现:

    public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {        CodecOutputList out = null;        try {            if (acceptOutboundMessage(msg)) {                out = CodecOutputList.newInstance();                @SuppressWarnings("unchecked")                I cast = (I) msg;                try {                    encode(ctx, cast, out);                } finally {                    ReferenceCountUtil.release(cast);                }                if (out.isEmpty()) {                    throw new EncoderException(                            StringUtil.simpleClassName(this) + " must produce at least one message.");                }            } else {                ctx.write(msg, promise);            }        } catch (EncoderException e) {            throw e;        } catch (Throwable t) {            throw new EncoderException(t);        } finally {            if (out != null) {                try {                    final int sizeMinusOne = out.size() - 1;                    if (sizeMinusOne == 0) {                        ctx.write(out.getUnsafe(0), promise);                    } else if (sizeMinusOne > 0) {                        if (promise == ctx.voidPromise()) {                            writeVoidPromise(ctx, out);                        } else {                            writePromiseCombiner(ctx, out, promise);                        }                    }                } finally {                    out.recycle();                }            }        }    }

write办法承受一个须要转换的原始对象msg,和一个示意channel读写进度的ChannelPromise。

首先会对msg进行一个类型判断,这个判断办法是在acceptOutboundMessage中实现的。

    public boolean acceptOutboundMessage(Object msg) throws Exception {        return matcher.match(msg);    }

这里的matcher是一个TypeParameterMatcher对象,它是一个在MessageToMessageEncoder构造函数中初始化的属性:

    protected MessageToMessageEncoder() {        matcher = TypeParameterMatcher.find(this, MessageToMessageEncoder.class, "I");    }

这里的I就是要匹配的msg类型。

如果不匹配,则持续调用ctx.write(msg, promise); 将音讯不做任何转换的写入到channel中,供下一个handler调用。

如果匹配胜利,则会调用外围的encode办法:encode(ctx, cast, out);

留神,encode办法在MessageToMessageEncoder中是一个形象办法,须要用户在继承类中自行扩大。

encode办法实际上是将msg对象转换成为要转换的对象,而后增加到out中。这个out是一个list对象,具体而言是一个CodecOutputList对象,作为一个list,out是一个能够存储多个对象的列表。

那么out是什么时候写入到channel中去的呢?

别急,在write办法中最初有一个finally代码块,在这个代码块中,会将out写入到channel外面。

因为out是一个List,可能会呈现out中的对象局部写胜利的状况,所以这里须要特地解决。

首先判断out中是否只有一个对象,如果是一个对象,那么间接写到channel中即可。如果out中多于一个对象,那么又分成两种状况,第一种状况是传入的promise是一个voidPromise,那么调用writeVoidPromise办法。

什么是voidPromise呢?

咱们晓得Promise有多种状态,能够通过promise的状态变动理解到数据写入的状况。对于voidPromise来说,它只关怀一种失败的状态,其余的状态都不关怀。

如果用户关怀promise的其余状态,则会调用writePromiseCombiner办法,将多个对象的状态合并为一个promise返回。

事实上,在writeVoidPromise和writePromiseCombiner中,out中的对象都是一个一个的取出来,写入到channel中的,所以才会生成多个promise和须要将promise进行合并的状况:

    private static void writeVoidPromise(ChannelHandlerContext ctx, CodecOutputList out) {        final ChannelPromise voidPromise = ctx.voidPromise();        for (int i = 0; i < out.size(); i++) {            ctx.write(out.getUnsafe(i), voidPromise);        }    }    private static void writePromiseCombiner(ChannelHandlerContext ctx, CodecOutputList out, ChannelPromise promise) {        final PromiseCombiner combiner = new PromiseCombiner(ctx.executor());        for (int i = 0; i < out.size(); i++) {            combiner.add(ctx.write(out.getUnsafe(i)));        }        combiner.finish(promise);    }

MessageToMessageDecoder

和encoder对应的就是decoder了,MessageToMessageDecoder的逻辑和MessageToMessageEncoder差不多。

首先也是须要判断读取的音讯类型,这里也定义了一个TypeParameterMatcher对象,用来检测传入的音讯类型:

    protected MessageToMessageDecoder() {        matcher = TypeParameterMatcher.find(this, MessageToMessageDecoder.class, "I");    }

decoder中重要的办法是channelRead办法,咱们看下它的实现:

    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {        CodecOutputList out = CodecOutputList.newInstance();        try {            if (acceptInboundMessage(msg)) {                @SuppressWarnings("unchecked")                I cast = (I) msg;                try {                    decode(ctx, cast, out);                } finally {                    ReferenceCountUtil.release(cast);                }            } else {                out.add(msg);            }        } catch (DecoderException e) {            throw e;        } catch (Exception e) {            throw new DecoderException(e);        } finally {            try {                int size = out.size();                for (int i = 0; i < size; i++) {                    ctx.fireChannelRead(out.getUnsafe(i));                }            } finally {                out.recycle();            }        }    }

首先检测msg的类型,只有承受的类型才进行decode解决,否则将msg退出到CodecOutputList中。

最初在finally代码块中将out中的对象一个个取出来,调用ctx.fireChannelRead进行读取。

音讯转换的要害办法是decode,这个办法也是一个形象办法,须要在继承类中实现具体的性能。

MessageToMessageCodec

后面解说了一个编码器和一个解码器,他们都是单向的。最初要解说的codec叫做MessageToMessageCodec,这个codec是一个双向的,即能够接管音讯,也能够发送音讯。

先看下它的定义:

public abstract class MessageToMessageCodec<INBOUND_IN, OUTBOUND_IN> extends ChannelDuplexHandler

MessageToMessageCodec继承自ChannelDuplexHandler,接管两个泛型参数别离是INBOUND_IN和OUTBOUND_IN。

它定义了两个TypeParameterMatcher,别离用来过滤inboundMsg和outboundMsg:

    protected MessageToMessageCodec() {        inboundMsgMatcher = TypeParameterMatcher.find(this, MessageToMessageCodec.class, "INBOUND_IN");        outboundMsgMatcher = TypeParameterMatcher.find(this, MessageToMessageCodec.class, "OUTBOUND_IN");    }

别离实现了channelRead和write办法,用来读写音讯:

    @Override    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {        decoder.channelRead(ctx, msg);    }    @Override    public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {        encoder.write(ctx, msg, promise);    }

这里的decoder和encoder实际上就是后面咱们讲到的MessageToMessageDecoder和MessageToMessageEncoder:

    private final MessageToMessageEncoder<Object> encoder = new MessageToMessageEncoder<Object>() {        @Override        public boolean acceptOutboundMessage(Object msg) throws Exception {            return MessageToMessageCodec.this.acceptOutboundMessage(msg);        }        @Override        @SuppressWarnings("unchecked")        protected void encode(ChannelHandlerContext ctx, Object msg, List<Object> out) throws Exception {            MessageToMessageCodec.this.encode(ctx, (OUTBOUND_IN) msg, out);        }    };    private final MessageToMessageDecoder<Object> decoder = new MessageToMessageDecoder<Object>() {        @Override        public boolean acceptInboundMessage(Object msg) throws Exception {            return MessageToMessageCodec.this.acceptInboundMessage(msg);        }        @Override        @SuppressWarnings("unchecked")        protected void decode(ChannelHandlerContext ctx, Object msg, List<Object> out) throws Exception {            MessageToMessageCodec.this.decode(ctx, (INBOUND_IN) msg, out);        }    };

能够看到MessageToMessageCodec实际上就是对MessageToMessageDecoder和MessageToMessageEncoder的封装,如果须要对MessageToMessageCodec进行扩大的话,须要实现上面两个办法:

    protected abstract void encode(ChannelHandlerContext ctx, OUTBOUND_IN msg, List<Object> out)            throws Exception;    protected abstract void decode(ChannelHandlerContext ctx, INBOUND_IN msg, List<Object> out)            throws Exception;

总结

netty中提供的MessageToMessage的编码框架是前面对编码解码器进行扩大的根底。只有深刻理解其中的原理,咱们对于新的编码解码器使用起来能力得心应手。

本文已收录于 http://www.flydean.com/14-0-1-netty-codec-msg-to-msg/

最艰深的解读,最粗浅的干货,最简洁的教程,泛滥你不晓得的小技巧等你来发现!

欢送关注我的公众号:「程序那些事」,懂技术,更懂你!