简介

netty提供了一个从ByteBuf到用户自定义的message的解码器叫做ByteToMessageDecoder,要应用这个decoder,咱们须要继承这个decoder,并实现decode办法,从而在这个办法中实现ByteBuf中的内容到用户自定义message对象的转换。

那么在应用ByteToMessageDecoder的过程中会遇到什么问题呢?为什么又会有一个ReplayingDecoder呢?带着这个问题咱们一起来看看吧。

ByteToMessageDecoder可能遇到的问题

要想实现本人的解码器将ByteBuf转换成为本人的音讯对象,能够继承ByteToMessageDecoder,而后实现其中的decode办法即可,先来看下decode办法的定义:

     protected void decode(ChannelHandlerContext ctx,                             ByteBuf buf, List<Object> out) throws Exception 

输出的参数中buf是要解码的ByteBuf,out是解码过后的对象列表,咱们须要把ByteBuf中的数据转换成为咱们本人的对象退出out的list中。

那么这里可能会遇到一个问题,因为咱们在调用decode办法的时候buf中的数据可能还没有筹备好,比方咱们须要一个Integer,然而buf中的数据不够一个整数,那么就须要一些buf中数据逻辑的判断,咱们以一个带有音讯长度的Buf对象来形容一下这个过程。

所谓带有音讯长度的Buf对象,就是说Buf音讯中的前4位,形成了一个整数,这个整数示意的是buf中后续音讯的长度。

所以咱们读取音讯进行转换的流程是,先读取后面4个字节,失去音讯的长度,而后再读取该长度的字节,这就是咱们真正要获取的音讯内容。

来看一下如果是继承自ByteToMessageDecoder应该怎么实现这个逻辑呢?

   public class IntegerHeaderFrameDecoder extends ByteToMessageDecoder {        @Override     protected void decode(ChannelHandlerContext ctx,                             ByteBuf buf, List<Object> out) throws Exception {         if (buf.readableBytes() < 4) {          return;       }         buf.markReaderIndex();       int length = buf.readInt();         if (buf.readableBytes() < length) {          buf.resetReaderIndex();          return;       }         out.add(buf.readBytes(length));     }   }

在decode中,咱们首先须要判断buf中可读的字节有没有4个,没有的话间接返回。如果有,则先读取这4个字节的长度,而后再判断buf中的可读字节是否小于应该读取的长度,如果小于,则阐明数据还没有筹备好,须要调用resetReaderIndex进行重置。

最初,如果所有的条件都满足,才真正进行读取工作。

有没有一个方法能够不提前进行判断,能够间接依照本人想要的内容来读取buf的形式呢?答案就是ReplayingDecoder。

咱们先来看一下下面的例子用ReplayingDecoder重写是什么状况:

   public class IntegerHeaderFrameDecoder        extends ReplayingDecoder<Void> {       protected void decode(ChannelHandlerContext ctx,                             ByteBuf buf, List<Object> out) throws Exception {         out.add(buf.readBytes(buf.readInt()));     }   }

应用ReplayingDecoder,咱们能够疏忽buf是否曾经接管到了足够的可读数据,间接读取即可。

相比之下ReplayingDecoder十分的简略。接下来,咱们来探索一下ReplayingDecoder的实现原理。

ReplayingDecoder的实现原理

ReplayingDecoder实际上是ByteToMessageDecoder的一个子类,它的定义如下:

public abstract class ReplayingDecoder<S> extends ByteToMessageDecoder 

在ByteToMessageDecoder中,最重要的办法是channelRead,在这个办法中理论调用了callDecode(ctx, cumulation, out);来实现cumulation到out的解码过程。

ReplayingDecoder的机密就在于对这个办法的重写,咱们来看下这个办法的具体实现:

   protected void callDecode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) {        replayable.setCumulation(in);        try {            while (in.isReadable()) {                int oldReaderIndex = checkpoint = in.readerIndex();                int outSize = out.size();                if (outSize > 0) {                    fireChannelRead(ctx, out, outSize);                    out.clear();                    if (ctx.isRemoved()) {                        break;                    }                    outSize = 0;                }                S oldState = state;                int oldInputLength = in.readableBytes();                try {                    decodeRemovalReentryProtection(ctx, replayable, out);                    if (ctx.isRemoved()) {                        break;                    }                    if (outSize == out.size()) {                        if (oldInputLength == in.readableBytes() && oldState == state) {                            throw new DecoderException(                                    StringUtil.simpleClassName(getClass()) + ".decode() must consume the inbound " +                                    "data or change its state if it did not decode anything.");                        } else {                            continue;                        }                    }                } catch (Signal replay) {                    replay.expect(REPLAY);                    if (ctx.isRemoved()) {                        break;                    }                    // Return to the checkpoint (or oldPosition) and retry.                    int checkpoint = this.checkpoint;                    if (checkpoint >= 0) {                        in.readerIndex(checkpoint);                    } else {                    }                    break;                }                if (oldReaderIndex == in.readerIndex() && oldState == state) {                    throw new DecoderException(                           StringUtil.simpleClassName(getClass()) + ".decode() method must consume the inbound data " +                           "or change its state if it decoded something.");                }                if (isSingleDecode()) {                    break;                }            }        } catch (DecoderException e) {            throw e;        } catch (Exception cause) {            throw new DecoderException(cause);        }    }

这里的实现和ByteToMessageDecoder不同的是ReplayingDecoder中定义了一个checkpoint,这个checkpint是在尝试进行数据解码之初设置的:

int oldReaderIndex = checkpoint = in.readerIndex();

如果是在解码的过程中呈现了异样,则应用checkpoint重置index:

    int checkpoint = this.checkpoint;         if (checkpoint >= 0) {            in.readerIndex(checkpoint);        } else {    }

这里捕捉的异样是Signal,Signal是什么呢?

Signal是一个Error对象:

public final class Signal extends Error implements Constant<Signal> 

这个异样是从replayable中抛出来的。

replayable是一个特有的ByteBuf对象,叫做ReplayingDecoderByteBuf:

final class ReplayingDecoderByteBuf extends ByteBuf

在ReplayingDecoderByteBuf中定义了Signal属性:

    private static final Signal REPLAY = ReplayingDecoder.REPLAY;

这个Signal异样是从ReplayingDecoderByteBuf中的get办法中抛出的,这里以getInt为例,看一下异样是如何抛出的:

    public int getInt(int index) {        checkIndex(index, 4);        return buffer.getInt(index);    }

getInt办法首先会去调用checkIndex办法进行buff中的长度检测,如果小于要读取的长度,则会抛出异样REPLAY:

    private void checkIndex(int index, int length) {        if (index + length > buffer.writerIndex()) {            throw REPLAY;        }    }

这就是Signal异样的由来。

总结

以上就是对ReplayingDecoder的介绍,尽管ReplayingDecoder好用,然而从它的实现能够看出,ReplayingDecoder是通过抛出异样来一直的重试,所以在某些非凡的状况下会造成性能的降落。

也就是说在缩小咱们代码量的同时,升高了程序的执行效率。看来要想马儿跑又想马儿不吃草,这样的坏事是不可能的了。

本文已收录于 http://www.flydean.com/14-4-netty-replayingdecoder/

最艰深的解读,最粗浅的干货,最简洁的教程,泛滥你不晓得的小技巧等你来发现!

欢送关注我的公众号:「程序那些事」,懂技术,更懂你!