关于java:netty系列之netty中的核心MessageToMessage编码器

40次阅读

共计 6512 个字符,预计需要花费 17 分钟才能阅读完成。

简介

在 netty 中咱们须要传递各种类型的音讯,这些 message 能够是字符串,能够是数组,也能够是自定义的对象。不同的对象之间可能须要相互转换,这样就须要一个能够自在进行转换的转换器,为了对立编码规定和不便用户的扩大,netty 提供了一套音讯之间进行转换的框架。本文将会解说这个框架的具体实现。

框架简介

netty 为音讯和音讯之间的转换提供了三个类,这三个类都是抽象类,别离是 MessageToMessageDecoder,MessageToMessageEncoder 和 MessageToMessageCodec。

先来看下他们的定义:

public abstract class MessageToMessageEncoder<I> extends ChannelOutboundHandlerAdapter
public abstract class MessageToMessageDecoder<I> extends ChannelInboundHandlerAdapter
public abstract class MessageToMessageCodec<INBOUND_IN, OUTBOUND_IN> extends ChannelDuplexHandler 

MessageToMessageEncoder 继承自 ChannelOutboundHandlerAdapter,负责向 channel 中写音讯。

MessageToMessageDecoder 继承自 ChannelInboundHandlerAdapter,负责从 channel 中读取音讯。

MessageToMessageCodec 继承自 ChannelDuplexHandler,它是一个双向的 handler,能够从 channel 中读取音讯,也能够向 channel 中写入音讯。

有了这三个抽象类,咱们再看下这三个类的具体实现。

MessageToMessageEncoder

先看一下音讯的编码器 MessageToMessageEncoder,编码器中最重要的办法就是 write, 看下 write 的实现:

    public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
        CodecOutputList out = null;
        try {if (acceptOutboundMessage(msg)) {out = CodecOutputList.newInstance();
                @SuppressWarnings("unchecked")
                I cast = (I) msg;
                try {encode(ctx, cast, out);
                } finally {ReferenceCountUtil.release(cast);
                }

                if (out.isEmpty()) {
                    throw new EncoderException(StringUtil.simpleClassName(this) + "must produce at least one message.");
                }
            } else {ctx.write(msg, promise);
            }
        } catch (EncoderException e) {throw e;} catch (Throwable t) {throw new EncoderException(t);
        } finally {if (out != null) {
                try {final int sizeMinusOne = out.size() - 1;
                    if (sizeMinusOne == 0) {ctx.write(out.getUnsafe(0), promise);
                    } else if (sizeMinusOne > 0) {if (promise == ctx.voidPromise()) {writeVoidPromise(ctx, out);
                        } else {writePromiseCombiner(ctx, out, promise);
                        }
                    }
                } finally {out.recycle();
                }
            }
        }
    }

write 办法承受一个须要转换的原始对象 msg,和一个示意 channel 读写进度的 ChannelPromise。

首先会对 msg 进行一个类型判断,这个判断办法是在 acceptOutboundMessage 中实现的。

    public boolean acceptOutboundMessage(Object msg) throws Exception {return matcher.match(msg);
    }

这里的 matcher 是一个 TypeParameterMatcher 对象,它是一个在 MessageToMessageEncoder 构造函数中初始化的属性:

    protected MessageToMessageEncoder() {matcher = TypeParameterMatcher.find(this, MessageToMessageEncoder.class, "I");
    }

这里的 I 就是要匹配的 msg 类型。

如果不匹配,则持续调用 ctx.write(msg, promise); 将音讯不做任何转换的写入到 channel 中,供下一个 handler 调用。

如果匹配胜利,则会调用外围的 encode 办法:encode(ctx, cast, out);

留神,encode 办法在 MessageToMessageEncoder 中是一个形象办法,须要用户在继承类中自行扩大。

encode 办法实际上是将 msg 对象转换成为要转换的对象,而后增加到 out 中。这个 out 是一个 list 对象,具体而言是一个 CodecOutputList 对象,作为一个 list,out 是一个能够存储多个对象的列表。

那么 out 是什么时候写入到 channel 中去的呢?

别急,在 write 办法中最初有一个 finally 代码块,在这个代码块中,会将 out 写入到 channel 外面。

因为 out 是一个 List,可能会呈现 out 中的对象局部写胜利的状况,所以这里须要特地解决。

首先判断 out 中是否只有一个对象,如果是一个对象,那么间接写到 channel 中即可。如果 out 中多于一个对象,那么又分成两种状况,第一种状况是传入的 promise 是一个 voidPromise,那么调用 writeVoidPromise 办法。

什么是 voidPromise 呢?

咱们晓得 Promise 有多种状态,能够通过 promise 的状态变动理解到数据写入的状况。对于 voidPromise 来说,它只关怀一种失败的状态,其余的状态都不关怀。

如果用户关怀 promise 的其余状态,则会调用 writePromiseCombiner 办法,将多个对象的状态合并为一个 promise 返回。

事实上,在 writeVoidPromise 和 writePromiseCombiner 中,out 中的对象都是一个一个的取出来,写入到 channel 中的, 所以才会生成多个 promise 和须要将 promise 进行合并的状况:

    private static void writeVoidPromise(ChannelHandlerContext ctx, CodecOutputList out) {final ChannelPromise voidPromise = ctx.voidPromise();
        for (int i = 0; i < out.size(); i++) {ctx.write(out.getUnsafe(i), voidPromise);
        }
    }

    private static void writePromiseCombiner(ChannelHandlerContext ctx, CodecOutputList out, ChannelPromise promise) {final PromiseCombiner combiner = new PromiseCombiner(ctx.executor());
        for (int i = 0; i < out.size(); i++) {combiner.add(ctx.write(out.getUnsafe(i)));
        }
        combiner.finish(promise);
    }

MessageToMessageDecoder

和 encoder 对应的就是 decoder 了,MessageToMessageDecoder 的逻辑和 MessageToMessageEncoder 差不多。

首先也是须要判断读取的音讯类型,这里也定义了一个 TypeParameterMatcher 对象,用来检测传入的音讯类型:

    protected MessageToMessageDecoder() {matcher = TypeParameterMatcher.find(this, MessageToMessageDecoder.class, "I");
    }

decoder 中重要的办法是 channelRead 办法,咱们看下它的实现:

    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {CodecOutputList out = CodecOutputList.newInstance();
        try {if (acceptInboundMessage(msg)) {@SuppressWarnings("unchecked")
                I cast = (I) msg;
                try {decode(ctx, cast, out);
                } finally {ReferenceCountUtil.release(cast);
                }
            } else {out.add(msg);
            }
        } catch (DecoderException e) {throw e;} catch (Exception e) {throw new DecoderException(e);
        } finally {
            try {int size = out.size();
                for (int i = 0; i < size; i++) {ctx.fireChannelRead(out.getUnsafe(i));
                }
            } finally {out.recycle();
            }
        }
    }

首先检测 msg 的类型,只有承受的类型才进行 decode 解决,否则将 msg 退出到 CodecOutputList 中。

最初在 finally 代码块中将 out 中的对象一个个取出来,调用 ctx.fireChannelRead 进行读取。

音讯转换的要害办法是 decode,这个办法也是一个形象办法,须要在继承类中实现具体的性能。

MessageToMessageCodec

后面解说了一个编码器和一个解码器,他们都是单向的。最初要解说的 codec 叫做 MessageToMessageCodec,这个 codec 是一个双向的,即能够接管音讯,也能够发送音讯。

先看下它的定义:

public abstract class MessageToMessageCodec<INBOUND_IN, OUTBOUND_IN> extends ChannelDuplexHandler

MessageToMessageCodec 继承自 ChannelDuplexHandler,接管两个泛型参数别离是 INBOUND_IN 和 OUTBOUND_IN。

它定义了两个 TypeParameterMatcher,别离用来过滤 inboundMsg 和 outboundMsg:

    protected MessageToMessageCodec() {inboundMsgMatcher = TypeParameterMatcher.find(this, MessageToMessageCodec.class, "INBOUND_IN");
        outboundMsgMatcher = TypeParameterMatcher.find(this, MessageToMessageCodec.class, "OUTBOUND_IN");
    }

别离实现了 channelRead 和 write 办法,用来读写音讯:

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {decoder.channelRead(ctx, msg);
    }

    @Override
    public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {encoder.write(ctx, msg, promise);
    }

这里的 decoder 和 encoder 实际上就是后面咱们讲到的 MessageToMessageDecoder 和 MessageToMessageEncoder:

    private final MessageToMessageEncoder<Object> encoder = new MessageToMessageEncoder<Object>() {

        @Override
        public boolean acceptOutboundMessage(Object msg) throws Exception {return MessageToMessageCodec.this.acceptOutboundMessage(msg);
        }

        @Override
        @SuppressWarnings("unchecked")
        protected void encode(ChannelHandlerContext ctx, Object msg, List<Object> out) throws Exception {MessageToMessageCodec.this.encode(ctx, (OUTBOUND_IN) msg, out);
        }
    };

    private final MessageToMessageDecoder<Object> decoder = new MessageToMessageDecoder<Object>() {

        @Override
        public boolean acceptInboundMessage(Object msg) throws Exception {return MessageToMessageCodec.this.acceptInboundMessage(msg);
        }

        @Override
        @SuppressWarnings("unchecked")
        protected void decode(ChannelHandlerContext ctx, Object msg, List<Object> out) throws Exception {MessageToMessageCodec.this.decode(ctx, (INBOUND_IN) msg, out);
        }
    };

能够看到 MessageToMessageCodec 实际上就是对 MessageToMessageDecoder 和 MessageToMessageEncoder 的封装,如果须要对 MessageToMessageCodec 进行扩大的话,须要实现上面两个办法:

    protected abstract void encode(ChannelHandlerContext ctx, OUTBOUND_IN msg, List<Object> out)
            throws Exception;

    protected abstract void decode(ChannelHandlerContext ctx, INBOUND_IN msg, List<Object> out)
            throws Exception;

总结

netty 中提供的 MessageToMessage 的编码框架是前面对编码解码器进行扩大的根底。只有深刻理解其中的原理,咱们对于新的编码解码器使用起来能力得心应手。

本文已收录于 http://www.flydean.com/14-0-1-netty-codec-msg-to-msg/

最艰深的解读,最粗浅的干货,最简洁的教程,泛滥你不晓得的小技巧等你来发现!

欢送关注我的公众号:「程序那些事」, 懂技术,更懂你!

正文完
 0