关于java:netty系列之netty中的自动解码器ReplayingDecoder

39次阅读

共计 4343 个字符,预计需要花费 11 分钟才能阅读完成。

简介

netty 提供了一个从 ByteBuf 到用户自定义的 message 的解码器叫做 ByteToMessageDecoder, 要应用这个 decoder,咱们须要继承这个 decoder,并实现 decode 办法,从而在这个办法中实现 ByteBuf 中的内容到用户自定义 message 对象的转换。

那么在应用 ByteToMessageDecoder 的过程中会遇到什么问题呢?为什么又会有一个 ReplayingDecoder 呢?带着这个问题咱们一起来看看吧。

ByteToMessageDecoder 可能遇到的问题

要想实现本人的解码器将 ByteBuf 转换成为本人的音讯对象,能够继承 ByteToMessageDecoder,而后实现其中的 decode 办法即可,先来看下 decode 办法的定义:

     protected void decode(ChannelHandlerContext ctx,
                             ByteBuf buf, List<Object> out) throws Exception 

输出的参数中 buf 是要解码的 ByteBuf,out 是解码过后的对象列表,咱们须要把 ByteBuf 中的数据转换成为咱们本人的对象退出 out 的 list 中。

那么这里可能会遇到一个问题,因为咱们在调用 decode 办法的时候 buf 中的数据可能还没有筹备好,比方咱们须要一个 Integer,然而 buf 中的数据不够一个整数,那么就须要一些 buf 中数据逻辑的判断, 咱们以一个带有音讯长度的 Buf 对象来形容一下这个过程。

所谓带有音讯长度的 Buf 对象,就是说 Buf 音讯中的前 4 位,形成了一个整数,这个整数示意的是 buf 中后续音讯的长度。

所以咱们读取音讯进行转换的流程是,先读取后面 4 个字节,失去音讯的长度,而后再读取该长度的字节,这就是咱们真正要获取的音讯内容。

来看一下如果是继承自 ByteToMessageDecoder 应该怎么实现这个逻辑呢?

   public class IntegerHeaderFrameDecoder extends ByteToMessageDecoder {
  
      @Override
     protected void decode(ChannelHandlerContext ctx,
                             ByteBuf buf, List<Object> out) throws Exception {if (buf.readableBytes() < 4) {return;}
  
       buf.markReaderIndex();
       int length = buf.readInt();
  
       if (buf.readableBytes() < length) {buf.resetReaderIndex();
          return;
       }
  
       out.add(buf.readBytes(length));
     }
   }

在 decode 中,咱们首先须要判断 buf 中可读的字节有没有 4 个,没有的话间接返回。如果有,则先读取这 4 个字节的长度,而后再判断 buf 中的可读字节是否小于应该读取的长度,如果小于,则阐明数据还没有筹备好,须要调用 resetReaderIndex 进行重置。

最初,如果所有的条件都满足,才真正进行读取工作。

有没有一个方法能够不提前进行判断,能够间接依照本人想要的内容来读取 buf 的形式呢?答案就是 ReplayingDecoder。

咱们先来看一下下面的例子用 ReplayingDecoder 重写是什么状况:

   public class IntegerHeaderFrameDecoder
        extends ReplayingDecoder<Void> {
  
     protected void decode(ChannelHandlerContext ctx,
                             ByteBuf buf, List<Object> out) throws Exception {out.add(buf.readBytes(buf.readInt()));
     }
   }

应用 ReplayingDecoder,咱们能够疏忽 buf 是否曾经接管到了足够的可读数据,间接读取即可。

相比之下 ReplayingDecoder 十分的简略。接下来,咱们来探索一下 ReplayingDecoder 的实现原理。

ReplayingDecoder 的实现原理

ReplayingDecoder 实际上是 ByteToMessageDecoder 的一个子类,它的定义如下:

public abstract class ReplayingDecoder<S> extends ByteToMessageDecoder 

在 ByteToMessageDecoder 中,最重要的办法是 channelRead, 在这个办法中理论调用了 callDecode(ctx, cumulation, out); 来实现 cumulation 到 out 的解码过程。

ReplayingDecoder 的机密就在于对这个办法的重写,咱们来看下这个办法的具体实现:

   protected void callDecode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) {replayable.setCumulation(in);
        try {while (in.isReadable()) {int oldReaderIndex = checkpoint = in.readerIndex();
                int outSize = out.size();
                if (outSize > 0) {fireChannelRead(ctx, out, outSize);
                    out.clear();
                    if (ctx.isRemoved()) {break;}
                    outSize = 0;
                }
                S oldState = state;
                int oldInputLength = in.readableBytes();
                try {decodeRemovalReentryProtection(ctx, replayable, out);
                    if (ctx.isRemoved()) {break;}
                    if (outSize == out.size()) {if (oldInputLength == in.readableBytes() && oldState == state) {
                            throw new DecoderException(StringUtil.simpleClassName(getClass()) + ".decode() must consume the inbound" +
                                    "data or change its state if it did not decode anything.");
                        } else {continue;}
                    }
                } catch (Signal replay) {replay.expect(REPLAY);
                    if (ctx.isRemoved()) {break;}

                    // Return to the checkpoint (or oldPosition) and retry.
                    int checkpoint = this.checkpoint;
                    if (checkpoint >= 0) {in.readerIndex(checkpoint);
                    } else { }
                    break;
                }
                if (oldReaderIndex == in.readerIndex() && oldState == state) {
                    throw new DecoderException(StringUtil.simpleClassName(getClass()) + ".decode() method must consume the inbound data" +
                           "or change its state if it decoded something.");
                }
                if (isSingleDecode()) {break;}
            }
        } catch (DecoderException e) {throw e;} catch (Exception cause) {throw new DecoderException(cause);
        }
    }

这里的实现和 ByteToMessageDecoder 不同的是 ReplayingDecoder 中定义了一个 checkpoint, 这个 checkpint 是在尝试进行数据解码之初设置的:

int oldReaderIndex = checkpoint = in.readerIndex();

如果是在解码的过程中呈现了异样,则应用 checkpoint 重置 index:

    int checkpoint = this.checkpoint;
         if (checkpoint >= 0) {in.readerIndex(checkpoint);
        } else {}

这里捕捉的异样是 Signal,Signal 是什么呢?

Signal 是一个 Error 对象:

public final class Signal extends Error implements Constant<Signal> 

这个异样是从 replayable 中抛出来的。

replayable 是一个特有的 ByteBuf 对象,叫做 ReplayingDecoderByteBuf:

final class ReplayingDecoderByteBuf extends ByteBuf

在 ReplayingDecoderByteBuf 中定义了 Signal 属性:

    private static final Signal REPLAY = ReplayingDecoder.REPLAY;

这个 Signal 异样是从 ReplayingDecoderByteBuf 中的 get 办法中抛出的,这里以 getInt 为例, 看一下异样是如何抛出的:

    public int getInt(int index) {checkIndex(index, 4);
        return buffer.getInt(index);
    }

getInt 办法首先会去调用 checkIndex 办法进行 buff 中的长度检测,如果小于要读取的长度,则会抛出异样 REPLAY:

    private void checkIndex(int index, int length) {if (index + length > buffer.writerIndex()) {throw REPLAY;}
    }

这就是 Signal 异样的由来。

总结

以上就是对 ReplayingDecoder 的介绍,尽管 ReplayingDecoder 好用,然而从它的实现能够看出,ReplayingDecoder 是通过抛出异样来一直的重试,所以在某些非凡的状况下会造成性能的降落。

也就是说在缩小咱们代码量的同时,升高了程序的执行效率。看来要想马儿跑又想马儿不吃草,这样的坏事是不可能的了。

本文已收录于 http://www.flydean.com/14-4-netty-replayingdecoder/

最艰深的解读,最粗浅的干货,最简洁的教程,泛滥你不晓得的小技巧等你来发现!

欢送关注我的公众号:「程序那些事」, 懂技术,更懂你!

正文完
 0