简介

咱们晓得由两种数据的传输方式,别离是字符流和字节流,字符流的意思是传输的对象就是字符串,格局曾经被设置好了,发送方和接管方依照特定的格局去读取就行了,而字节流是指将数据作为最原始的二进制字节来进行传输。

明天给大家介绍一下在netty中的基于流的数据传输。

package和byte

相熟TCP/IP协定的同学应该晓得,在TCP/IP中,因为底层协定有反对的数据包的最大值,所以对于大数据传输来说,须要对数据进行拆分和封包解决,并将这些拆分组装过的包进行发送,最初在接管方对这些包进行组合。在各个包中有固定的构造,所以接管方能够很分明的晓得到底应该组合多少个包作为最终的后果。

那么对于netty来说,channel中传输的是ByteBuf,实际上最最最底层的就是byte数组。对于这种byte数组来说,接管方并不知道到底应该组合多少个byte来合成原来的音讯,所以须要在接收端对收到的byte进行组合,从而生成最终的数据。

那么对于netty中的byte数据流应该怎么组合呢?咱们接下来看两种组合办法。

手动组合

这种组合的形式的基本思路是结构一个指标大小的ByteBuf,而后将接管到的byte通过调用ByteBuf的writeBytes办法写入到ByteBuf中。最初从ByteBuf中读取对应的数据。

比方咱们想从服务端发送一个int数字给客户端,一般来说int是32bits,而后一个byte是8bits,那么一个int就须要4个bytes组成。

在server端,能够建设一个byte的数组,数组中蕴含4个元素。将4个元素的byte发送给客户端,那么客户端该如何解决呢?

首先咱们须要建设一个clientHander,这个handler应该继承ChannelInboundHandlerAdapter,并且在其handler被增加到ChannelPipeline的时候初始化一个蕴含4个byte的byteBuf。

handler被增加的时候会触发一个handlerAdded事件,所以咱们能够这样写:

    private ByteBuf buf;        @Override    public void handlerAdded(ChannelHandlerContext ctx) {        //创立一个4个byte的缓冲器        buf = ctx.alloc().buffer(4);     }

上例中,咱们从ctx调配了一个4个字节的缓冲器,并将其赋值给handler中的公有变量buf。

当handler执行结束,从ChannelPipeline中删除的时候,会触发handlerRemoved事件,在这个事件中,咱们能够对调配的Bytebuf进行清理,通常来说,能够调用其release办法,如下所示:

    public void handlerRemoved(ChannelHandlerContext ctx) {        buf.release(); // 开释buf        buf = null;    }

而后最要害的一步就是从channel中读取byte并将其放到4个字节的byteBuf中。在之前的文章中咱们提到了,能够在channelRead办法中,解决音讯读取的逻辑。

    public void channelRead(ChannelHandlerContext ctx, Object msg) {        ByteBuf m = (ByteBuf) msg;        buf.writeBytes(m); // 写入一个byte        m.release();                if (buf.readableBytes() >= 4) { // 曾经凑够4个byte,将4个byte组合称为一个int            long result = buf.readUnsignedInt();            ctx.close();        }    }

每次触发channelRead办法,都会将读取到的一个字节的byte通过调用writeBytes办法写入buf中。当buf的可读byte大于等于4个的时候就阐明4个字节曾经读满了,能够对其进行操作了。

这里咱们将4个字节组合成一个unsignedInt,并应用readUnsignedInt办法从buf中读取进去组合称为一个int数字。

下面的例子尽管能够解决4个字节的byte问题,然而如果数据结构再负责一点,下面的形式就会力不从心,须要思考太多的数据组合问题。接下来咱们看另外一种形式。

Byte的转换类

netty提供了一个ByteToMessageDecoder的转换类,能够不便的对Byte转换为其余的类型。

咱们只须要从新其中的decode办法,就能够实现对ByteBuf的转换:

       public class SquareDecoder extends ByteToMessageDecoder {            @Override           public void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out)                   throws Exception {               out.add(in.readBytes(in.readableBytes()));           }       }

下面的例子将byte从input转换到output中,当然,你还能够在下面的办法中进行格局转换,如下所示:

public class TimeDecoder extends ByteToMessageDecoder {     @Override    protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) {         if (in.readableBytes() < 4) {            return;         }                out.add(in.readBytes(4));     }}

下面的例子会先判断in中是否有4个byte,如果有就将其读出来放到out中去。那么有同学会问了,输出不是一个byte一个byte来的吗?为什么这里能够一次读取到4个byte?这是因为ByteToMessageDecoder内置了一个缓存安装,所以这里的in实际上是一个缓存汇合。

ReplayingDecoder

netty还提供了一个更简略的转换ReplayingDecoder,如果应用ReplayingDecoder从新下面的逻辑就是这样的:

public class TimeDecoder extends ReplayingDecoder<Void> {    @Override    protected void decode(            ChannelHandlerContext ctx, ByteBuf in, List<Object> out) {        out.add(in.readBytes(4));    }}

只须要一行代码即可。

事实上ReplayingDecoder 是ByteToMessageDecoder 的子类,是在ByteToMessageDecoder上丰盛了一些性能的后果。

他们两的区别在于ByteToMessageDecoder 还须要通过调用readableBytes来判断是否有足够的能够读byte,而应用ReplayingDecoder间接读取即可,它假如的是所有的bytes都曾经承受胜利了。

比方上面应用ByteToMessageDecoder的代码:

   public class IntegerHeaderFrameDecoder extends ByteToMessageDecoder {        @Override     protected void decode(ChannelHandlerContext ctx,                             ByteBuf buf, List<Object> out) throws Exception {         if (buf.readableBytes() < 4) {          return;       }         buf.markReaderIndex();       int length = buf.readInt();         if (buf.readableBytes() < length) {          buf.resetReaderIndex();          return;       }         out.add(buf.readBytes(length));     }   }   

上例假设在byte的头部是一个int大小的数组,代表着byte数组的长度,须要先读取int值,而后再依据int值来读取对应的byte数据。

和上面的代码是等价的:

   public class IntegerHeaderFrameDecoder        extends ReplayingDecoder<Void> {       protected void decode(ChannelHandlerContext ctx,                             ByteBuf buf, List<Object> out) throws Exception {         out.add(buf.readBytes(buf.readInt()));     }   }   

下面代码少了判断的步骤。

那么这是怎么实现的呢?

事实上ReplayingDecoder 会传递一个会抛出 Error的 ByteBuf , 当 ByteBuf 读取的byte个数不满足要求的时候,会抛出异样,当ReplayingDecoder 捕捉到这个异样之后,会重置buffer的readerIndex到最后的状态,而后期待后续的数据进来,而后再次调用decode办法。

所以,ReplayingDecoder的效率会比拟低,为了解决这个问题,netty提供了checkpoint() 办法。这是一个保留点,当报错的时候,能够不会退到最后的状态,而是回退到checkpoint() 调用时候保留的状态,从而能够缩小不必要的节约。

总结

本文介绍了在netty中进行stream操作和变换的几种形式,心愿大家可能喜爱。

本文已收录于 http://www.flydean.com/07-netty-stream-based-transport/

最艰深的解读,最粗浅的干货,最简洁的教程,泛滥你不晓得的小技巧等你来发现!

欢送关注我的公众号:「程序那些事」,懂技术,更懂你!