乐趣区

关于netty:netty系列之基于流的数据传输

简介

咱们晓得由两种数据的传输方式,别离是字符流和字节流,字符流的意思是传输的对象就是字符串,格局曾经被设置好了,发送方和接管方依照特定的格局去读取就行了,而字节流是指将数据作为最原始的二进制字节来进行传输。

明天给大家介绍一下在 netty 中的基于流的数据传输。

package 和 byte

相熟 TCP/IP 协定的同学应该晓得,在 TCP/IP 中,因为底层协定有反对的数据包的最大值,所以对于大数据传输来说,须要对数据进行拆分和封包解决,并将这些拆分组装过的包进行发送,最初在接管方对这些包进行组合。在各个包中有固定的构造,所以接管方能够很分明的晓得到底应该组合多少个包作为最终的后果。

那么对于 netty 来说,channel 中传输的是 ByteBuf,实际上最最最底层的就是 byte 数组。对于这种 byte 数组来说,接管方并不知道到底应该组合多少个 byte 来合成原来的音讯,所以须要在接收端对收到的 byte 进行组合,从而生成最终的数据。

那么对于 netty 中的 byte 数据流应该怎么组合呢?咱们接下来看两种组合办法。

手动组合

这种组合的形式的基本思路是结构一个指标大小的 ByteBuf,而后将接管到的 byte 通过调用 ByteBuf 的 writeBytes 办法写入到 ByteBuf 中。最初从 ByteBuf 中读取对应的数据。

比方咱们想从服务端发送一个 int 数字给客户端,一般来说 int 是 32bits, 而后一个 byte 是 8bits,那么一个 int 就须要 4 个 bytes 组成。

在 server 端,能够建设一个 byte 的数组,数组中蕴含 4 个元素。将 4 个元素的 byte 发送给客户端,那么客户端该如何解决呢?

首先咱们须要建设一个 clientHander,这个 handler 应该继承 ChannelInboundHandlerAdapter,并且在其 handler 被增加到 ChannelPipeline 的时候初始化一个蕴含 4 个 byte 的 byteBuf。

handler 被增加的时候会触发一个 handlerAdded 事件,所以咱们能够这样写:

    private ByteBuf buf;
    
    @Override
    public void handlerAdded(ChannelHandlerContext ctx) {
        // 创立一个 4 个 byte 的缓冲器
        buf = ctx.alloc().buffer(4); 
    }

上例中,咱们从 ctx 调配了一个 4 个字节的缓冲器,并将其赋值给 handler 中的公有变量 buf。

当 handler 执行结束,从 ChannelPipeline 中删除的时候,会触发 handlerRemoved 事件,在这个事件中,咱们能够对调配的 Bytebuf 进行清理,通常来说,能够调用其 release 办法,如下所示:

    public void handlerRemoved(ChannelHandlerContext ctx) {buf.release(); // 开释 buf
        buf = null;
    }

而后最要害的一步就是从 channel 中读取 byte 并将其放到 4 个字节的 byteBuf 中。在之前的文章中咱们提到了,能够在 channelRead 办法中,解决音讯读取的逻辑。

    public void channelRead(ChannelHandlerContext ctx, Object msg) {ByteBuf m = (ByteBuf) msg;
        buf.writeBytes(m); // 写入一个 byte
        m.release();
        
        if (buf.readableBytes() >= 4) { // 曾经凑够 4 个 byte,将 4 个 byte 组合称为一个 int
            long result = buf.readUnsignedInt();
            ctx.close();}
    }

每次触发 channelRead 办法,都会将读取到的一个字节的 byte 通过调用 writeBytes 办法写入 buf 中。当 buf 的可读 byte 大于等于 4 个的时候就阐明 4 个字节曾经读满了,能够对其进行操作了。

这里咱们将 4 个字节组合成一个 unsignedInt,并应用 readUnsignedInt 办法从 buf 中读取进去组合称为一个 int 数字。

下面的例子尽管能够解决 4 个字节的 byte 问题,然而如果数据结构再负责一点,下面的形式就会力不从心,须要思考太多的数据组合问题。接下来咱们看另外一种形式。

Byte 的转换类

netty 提供了一个 ByteToMessageDecoder 的转换类,能够不便的对 Byte 转换为其余的类型。

咱们只须要从新其中的 decode 办法,就能够实现对 ByteBuf 的转换:

       public class SquareDecoder extends ByteToMessageDecoder {
            @Override
           public void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out)
                   throws Exception {out.add(in.readBytes(in.readableBytes()));
           }
       }

下面的例子将 byte 从 input 转换到 output 中,当然,你还能够在下面的办法中进行格局转换,如下所示:

public class TimeDecoder extends ByteToMessageDecoder { 
    @Override
    protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) {if (in.readableBytes() < 4) {return;}
        
        out.add(in.readBytes(4)); 
    }
}

下面的例子会先判断 in 中是否有 4 个 byte,如果有就将其读出来放到 out 中去。那么有同学会问了,输出不是一个 byte 一个 byte 来的吗?为什么这里能够一次读取到 4 个 byte?这是因为 ByteToMessageDecoder 内置了一个缓存安装,所以这里的 in 实际上是一个缓存汇合。

ReplayingDecoder

netty 还提供了一个更简略的转换 ReplayingDecoder,如果应用 ReplayingDecoder 从新下面的逻辑就是这样的:

public class TimeDecoder extends ReplayingDecoder<Void> {
    @Override
    protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) {out.add(in.readBytes(4));
    }
}

只须要一行代码即可。

事实上 ReplayingDecoder 是 ByteToMessageDecoder 的子类,是在 ByteToMessageDecoder 上丰盛了一些性能的后果。

他们两的区别在于 ByteToMessageDecoder 还须要通过调用 readableBytes 来判断是否有足够的能够读 byte,而应用 ReplayingDecoder 间接读取即可,它假如的是所有的 bytes 都曾经承受胜利了。

比方上面应用 ByteToMessageDecoder 的代码:

   public class IntegerHeaderFrameDecoder extends ByteToMessageDecoder {
  
      @Override
     protected void decode(ChannelHandlerContext ctx,
                             ByteBuf buf, List<Object> out) throws Exception {if (buf.readableBytes() < 4) {return;}
  
       buf.markReaderIndex();
       int length = buf.readInt();
  
       if (buf.readableBytes() < length) {buf.resetReaderIndex();
          return;
       }
  
       out.add(buf.readBytes(length));
     }
   }
   

上例假设在 byte 的头部是一个 int 大小的数组,代表着 byte 数组的长度,须要先读取 int 值,而后再依据 int 值来读取对应的 byte 数据。

和上面的代码是等价的:

   public class IntegerHeaderFrameDecoder
        extends ReplayingDecoder<Void> {
  
     protected void decode(ChannelHandlerContext ctx,
                             ByteBuf buf, List<Object> out) throws Exception {out.add(buf.readBytes(buf.readInt()));
     }
   }
   

下面代码少了判断的步骤。

那么这是怎么实现的呢?

事实上 ReplayingDecoder 会传递一个会抛出 Error 的 ByteBuf,当 ByteBuf 读取的 byte 个数不满足要求的时候,会抛出异样,当 ReplayingDecoder 捕捉到这个异样之后,会重置 buffer 的 readerIndex 到最后的状态,而后期待后续的数据进来,而后再次调用 decode 办法。

所以,ReplayingDecoder 的效率会比拟低,为了解决这个问题,netty 提供了 checkpoint() 办法。这是一个保留点,当报错的时候,能够不会退到最后的状态,而是回退到 checkpoint() 调用时候保留的状态,从而能够缩小不必要的节约。

总结

本文介绍了在 netty 中进行 stream 操作和变换的几种形式,心愿大家可能喜爱。

本文已收录于 http://www.flydean.com/07-netty-stream-based-transport/

最艰深的解读,最粗浅的干货,最简洁的教程,泛滥你不晓得的小技巧等你来发现!

欢送关注我的公众号:「程序那些事」, 懂技术,更懂你!

退出移动版