关于java:通过大量实战案例分解Netty中是如何解决拆包黏包问题的

22次阅读

共计 11771 个字符,预计需要花费 30 分钟才能阅读完成。

TCP 传输协定是基于数据流传输的,而基于流化的数据是没有界线的,当客户端向服务端发送数据时,可能会把一个残缺的数据报文拆分成多个小报文进行发送,也可能将多个报文合并成一个大报文进行发送。

在这样的状况下,有可能会呈现图 3 - 1 所示的状况。

  • 服务端凑巧读到了两个残缺的数据包 A 和 B,没有呈现拆包 / 粘包问题;
  • 服务端接管到 A 和 B 粘在一起的数据包,服务端须要解析出 A 和 B;
  • 服务端收到残缺的 A 和 B 的一部分数据包 B-1,服务端须要解析出残缺的 A,并期待读取残缺的 B 数据包;
  • 服务端接管到 A 的一部分数据包 A-1,此时须要期待接管到残缺的 A 数据包;
  • 数据包 A 较大,服务端须要屡次才能够接管完数据包 A。

<center> 图 3 -1 粘包和拆包问题 </center>

因为存在拆包 / 粘包问题,接管方很难界定数据包的边界在哪里,所以可能会读取到不残缺的数据导致数据解析呈现问题。

拆包粘包问题实战

上面演示一个拆包粘包问题

PackageNettyServer

public class PackageNettyServer {public static void main(String[] args) {EventLoopGroup bossGroup=new NioEventLoopGroup();
        EventLoopGroup workGroup=new NioEventLoopGroup();
        try{ServerBootstrap serverBootstrap=new ServerBootstrap();
            serverBootstrap.group(bossGroup,workGroup)
                    .channel(NioServerSocketChannel.class)
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel ch) throws Exception {ch.pipeline().addLast(new SimpleServerHandler());
                        }
                    });
            ChannelFuture channelFuture=serverBootstrap.bind(8080).sync(); // 绑定端口
            channelFuture.channel().closeFuture().sync();} catch (InterruptedException e) {e.printStackTrace();
        } finally {bossGroup.shutdownGracefully();
            workGroup.shutdownGracefully();}
    }
}

SimpleServerHandler

public class SimpleServerHandler extends ChannelInboundHandlerAdapter {
    private int count;
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {ByteBuf in=(ByteBuf) msg;
        byte[] buffer=new byte[in.readableBytes()]; // 长度为可读的字节数
        in.readBytes(buffer); // 读取到字节数组中
        String message=new String (buffer,"UTF-8");
        System.out.println("服务端收到的音讯内容:"+message+"\n 服务端收到的音讯数量"+(++count));
        ByteBuf resBB= Unpooled.copiedBuffer(UUID.randomUUID().toString(), Charset.forName("utf-8"));
        ctx.writeAndFlush(resBB);
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {ctx.close();// 敞开连贯
    }
}

PackageNettyClient

public class PackageNettyClient {public static void main(String[] args) {EventLoopGroup eventLoopGroup=new NioEventLoopGroup();
        try {Bootstrap bootstrap = new Bootstrap();
            bootstrap.group(eventLoopGroup)
                .channel(NioSocketChannel.class)
                .handler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    protected void initChannel(SocketChannel ch) throws Exception {ch.pipeline().addLast(new SimpleClientHandler());
                    }
                });
            ChannelFuture channelFuture=bootstrap.connect("localhost",8080).sync();
            channelFuture.channel().closeFuture().sync();} catch (InterruptedException e) {e.printStackTrace();
        } finally {eventLoopGroup.shutdownGracefully();
        }
    }
}

SimpleClientHandler

public class SimpleClientHandler extends ChannelInboundHandlerAdapter {

    private int count;
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {System.out.println("客户端和服务端胜利建设连贯");
        // 客户端和服务端建设连贯后,发送十次音讯给服务端
        for (int i = 0; i < 10; i++) {ByteBuf buf= Unpooled.copiedBuffer("客户端音讯"+i, Charset.forName("utf-8"));
            ctx.writeAndFlush(buf);
        }
        super.channelActive(ctx);
    }

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        // 接管服务端发过来的音讯
        System.out.println("接管到服务端返回的信息");
        ByteBuf buf=(ByteBuf)msg;
        byte[] buffer=new byte[buf.readableBytes()];
        buf.readBytes(buffer);
        String message=new String(buffer,Charset.forName("utf-8"));
        System.out.println("客户端收到的音讯内容为:"+message);
        System.out.println("客户端收到的音讯数量为:"+(++count));
        super.channelRead(ctx, msg);
    }
}

运行上述案例后,会呈现粘包和拆包问题。

应用层定义通信协议

如何解决拆包和粘包问题呢?

个别咱们会在应用层定义通信协议。其实思维也很简略,就是通信单方约定一个通信报文协定,服务端收到报文之后,依照约定的协定进行解码,从而避免出现粘包和拆包问题。

其实大家把这个问题往深度思考一下就不难发现,之所以在拆包粘包之后导致收到音讯端的内容解析呈现谬误,是因为程序无奈辨认一个残缺音讯,也就是不晓得如何把拆包之后的音讯组合成一个残缺音讯,以及将粘包的数据依照某个规定拆分造成多个残缺音讯。所以基于这个角度思考,咱们只须要针对音讯做一个通信单方约定的辨认规定即可。

音讯长度固定

每个数据报文都须要一个固定的长度,当接管方累计读取到固定长度的报文后,就认为曾经取得了一个残缺的音讯,当发送方的数据小于固定长度时,则须要空位补齐.

如图 3 - 2 所示,假如咱们固定音讯长度是 4,那么没有达到长度的报文,须要通过一个空位来补齐,从而使得音讯可能造成一个整体。

<center> 图 3 -2</center>

这种形式很简略,然而毛病也很显著,对于没有固定长度的音讯,不分明如何设置长度,而且如果长度设置过大会造成字节节约,长度太小又会影响音讯传输,所以个别状况下不会采纳这种形式。

特定分隔符

既然没方法通过固定长度来宰割音讯,那能不能在消息报文中减少一个宰割符呢?而后接管方依据特定的分隔符来进行音讯拆分。比方咱们采纳 \r\n 来进行宰割,如图 3 - 3 所示。

<center> 图 3 -3</center>

对于特定分隔符的应用场景中,须要留神分隔符和音讯体中的字符不要存在抵触,否则会呈现音讯拆分谬误的问题。

音讯长度加音讯内容加分隔符

基于音讯长度 + 音讯内容 + 分隔符的形式进行数据通信,这个之前大家在 Redis 中学习过,redis 的报文协定定义如下。

*3\r\n$3\r\nSET\r\n$4\r\nname\r\n$3\r\nmic

能够发现消息报文蕴含三个维度

  • 音讯长度
  • 音讯分隔符
  • 音讯内容

这种形式在我的项目中是十分常见的协定,首先通过音讯头中的总长度来判断以后一个残缺音讯所携带的参数个数。而后在音讯体中,再通过音讯内容长度以及音讯体作为一个组合,最初通过 \r\n 进行宰割。服务端收到这个音讯后,就能够依照该规定进行解析失去一个残缺的命令进行执行。

Zookeeper 中的音讯协定

在 Zookeeper 中应用了 Jute 协定,这是 zookeeper 自定义音讯协定,申请协定定义如图 3 - 4 所示。

xid 用于记录客户端申请发动的先后序号,用来确保单个客户端申请的响应程序。type 代表申请的操作类型,常见的包含创立节点、删除节点和获取节点数据等。
协定的申请体局部是指申请的主体内容局部,蕴含了申请的所有操作内容。不同的申请类型,其申请体局部的构造是不同的。

<img src=”https://mic-blob-bucket.oss-cn-beijing.aliyuncs.com/202111122058016.png” alt=”img” style=”zoom:80%;” />

<center> 图 3 -4</center>

响应协定定义如图 3 - 5 所示。

协定的响应头中的 xid 和上文中提到的申请头中的 xid 是统一的,响应中只是将申请中的 xid 原值返回。zxid 代表 ZooKeeper 服务器上以后最新的事务 ID。err 则是一个错误码,当申请处理过程中出现异常状况时,会在这个错误码中标识进去。协定的响应体局部是指响应的主体内容局部,蕴含了响应的所有返回数据。不同的响应类型,其响应体局部的构造是不同的。

<img src=”https://mic-blob-bucket.oss-cn-beijing.aliyuncs.com/202111122058905.png” alt=”img” style=”zoom: 67%;” />

<center> 图 3 -5</center>

Netty 中的编解码器

在 Netty 中,默认帮咱们提供了一些罕用的编解码器用来解决拆包粘包的问题。上面简略演示几种解码器的应用。

FixedLengthFrameDecoder 解码器

固定长度解码器 FixedLengthFrameDecoder 的原理很简略,就是通过构造方法设置一个固定音讯大小 frameLength,无论接管方一次收到多大的数据,都会严格依照 frameLength 进行解码。

如果累计读取的长度大小为 frameLength 的音讯,那么解码器会认为曾经获取到了一个残缺的音讯,如果音讯长度小于 frameLength,那么该解码器会始终期待后续数据包的达到,晓得取得指定长度后返回。

应用办法如下,在 3.3 节中演示的代码的 Server 端,减少一个 FixedLengthFrameDecoder,长度为 10。

ServerBootstrap serverBootstrap=new ServerBootstrap();
serverBootstrap.group(bossGroup,workGroup)
    .channel(NioServerSocketChannel.class)
    .childHandler(new ChannelInitializer<SocketChannel>() {
        @Override
        protected void initChannel(SocketChannel ch) throws Exception {ch.pipeline()
                .addLast(new FixedLengthFrameDecoder(10)) // 减少解码器
                .addLast(new SimpleServerHandler());
        }
    });

DelimiterBasedFrameDecoder 解码器

非凡分隔符解码器:DelimiterBasedFrameDecoder,它有以下几个属性

  • delimiters,delimiters 指定非凡分隔符,参数类型是 ByteBuf,ByteBuf 能够传递一个数组,意味着咱们能够同时指定多个分隔符,但最终会抉择长度最短的分隔符进行拆分。

    比方接管方收到的音讯体为

    hello\nworld\r\n

    此时指定多个分隔符 \n\r\n,那么最终会抉择最短的分隔符解码,失去如下数据

    hello | world |

  • maxLength,示意报文的最大长度限度,如果超过 maxLength 还没检测到指定分隔符,将会抛出 TooLongFrameException。
  • failFast,示意容错机制,它与 maxLength 配合应用。如果 failFast=true,当超过 maxLength 后会立即抛出 TooLongFrameException,不再进行解码。如果 failFast=false,那么会等到解码出一个残缺的音讯后才会抛出 TooLongFrameException
  • stripDelimiter,它的作用是判断解码后的音讯是否去除分隔符,如果 stripDelimiter=false,而制订的特定分隔符是\n,那么数据解码的形式如下。

    hello\nworld\r\n

    当 stripDelimiter=false 时,解码后失去

    hello\n | world\r\n

DecoderNettyServer

上面演示一下 DelimiterBasedFrameDecoder 的用法。

public class DecoderNettyServer {public static void main(String[] args) {EventLoopGroup bossGroup = new NioEventLoopGroup();
        EventLoopGroup workGroup = new NioEventLoopGroup();
        try {ServerBootstrap serverBootstrap = new ServerBootstrap();
            serverBootstrap.group(bossGroup, workGroup)
                    .channel(NioServerSocketChannel.class)
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel ch) throws Exception {ByteBuf delimiter= Unpooled.copiedBuffer("&".getBytes());
                            ch.pipeline()
                                    .addLast(new DelimiterBasedFrameDecoder(10,true,true,delimiter))
                                    .addLast(new PrintServerHandler());
                        }
                    });
            ChannelFuture channelFuture = serverBootstrap.bind(8080).sync(); // 绑定端口
            channelFuture.channel().closeFuture().sync();} catch (InterruptedException e) {e.printStackTrace();
        } finally {}}
}

PrintServerHandler

定义一个一般的 Inbound,打印接管到的数据。

public class PrintServerHandler extends ChannelInboundHandlerAdapter {

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {ByteBuf buf=(ByteBuf)msg;
        System.out.println("Receive client Msg:"+buf.toString(CharsetUtil.UTF_8));
    }
}

演示办法

  • 进入到 cmd 的命令窗口,执行 telnet localhost 8080 回车
  • 在 telnet 窗口按下 Ctrl+]组合键,进入到一个 telnet 界面
  • 在该界面持续按回车,进入到一个新的窗口,这个时候能够开始输出字符,此时的命令窗口会带有数据回写。
  • 开始输出字符 hello&world,就能够看到演示成果

LengthFieldBasedFrameDecoder 解码器

LengthFieldBasedFrameDecoder 是长度域解码器,它是解决拆包粘包最罕用的解码器,基本上能笼罩大部分基于长度拆包的场景。其中开源的消息中间件 RocketMQ 就是应用该解码器进行解码的。

首先来阐明一下该解码器的外围参数

  • lengthFieldOffset,长度字段的偏移量,也就是寄存长度数据的起始地位
  • lengthFieldLength,长度字段锁占用的字节数
  • lengthAdjustment,在一些较为简单的协定设计中,长度域不仅仅蕴含音讯的长度,还蕴含其余数据比方版本号、数据类型、数据状态等,这个时候咱们能够应用 lengthAdjustment 进行修改,它的值 = 包体的长度值 - 长度域的值
  • initialBytesToStrip,解码后须要跳过的初始字节数,也就是音讯内容字段的起始地位
  • lengthFieldEndOffset,长度字段完结的偏移量,该属性的值 =lengthFieldOffset+lengthFieldLength

下面这些参数了解起来比拟难,咱们通过几个案例来阐明一下。

音讯长度 + 音讯内容的解码

假如存在图 3 - 6 所示的由长度和音讯内容组成的数据包,其中 length 示意报文长度,用 16 进制示意,共占用 2 个字节,那么该协定对应的编解码器参数设置如下。

  • lengthFieldOffset=0,因为 Length 字段就在报文的开始地位
  • lengthFieldLength=2,协定设计的固定长度为 2 个字节
  • lengthAdjustment=0,Length 字段质保函音讯长度,不须要做修改
  • initialBytesToStrip=0,解码内容是 Length+content,不须要跳过任何初始字节。

<center> 图 3 -6</center>

截断解码后果

如果咱们心愿解码后的后果中只蕴含音讯内容,其余局部不变,如图 3 - 7 所示。对应解码器参数组合如下

  • lengthFieldOffset=0,因为 Length 字段就在报文开始地位
  • lengthFieldLength=2,协定设计的固定长度
  • lengthAdjustment=0,Length 字段只蕴含音讯长度,不须要做任何修改
  • initialBytesToStrip=2,跳过 length 字段的字节长度,解码后 ByteBuf 只蕴含 Content 字段。

<center> 图 3 -7</center>

长度字段蕴含音讯内容

如图 3 - 8 所示,如果 Length 字段中蕴含 Length 字段本身的长度以及 Content 字段所占用的字节数,那么 Length 的值为 0x00d(2+11=13 字节),在这种状况下解码器的参数组合如下

  • lengthFieldOffset=0,因为 Length 字段就在报文开始的地位
  • lengthFieldLength=2,协定设计的固定长度
  • lengthAdjustment=-2,长度字段为 13 字节,须要减 2 才是拆包所须要的长度。
  • initialBytesToStrip=0,解码后内容仍然是 Length+Content,不须要跳过任何初始字节

<center> 图 3 -8</center>

基于长度字段偏移的解码

如图 3 - 9 所示,Length 字段曾经不再是报文的起始地位,Length 字段的值是 0x000b,示意 content 字段占 11 个字节,那么此时解码器的参数配置如下:

  • lengthFieldOffset=2,须要跳过 Header 所占用的 2 个字节,才是 Length 的起始地位
  • lengthFieldLength=2,协定设计的固定长度
  • lengthAdjustment=0,Length 字段只蕴含音讯长度,不须要做任何修改
  • initialBytesToStrip=0,解码后内容仍然是 Length+Content,不须要跳过任何初始字节

<center> 图 3 -9</center>

基于长度偏移和长度修改解码

如图 3 -10 所示,Length 字段前后别离有 hdr1 和 hdr2 字段,各占据 1 个字节,所以须要做长度字段的便宜,还须要做 lengthAdjustment 的修改,相干参数配置如下。

  • lengthFieldOffset=1,须要跳过 hdr1 所占用的 1 个字节,才是 Length 的起始地位
  • lengthFieldLength=2,协定设计的固定长度
  • lengthAdjustment=1,因为 hdr2+content 一共占了 1 +11=12 字节,所以 Length 字段值 (11 字节) 加上 lengthAdjustment(1)能力失去 hdr2+Content 的内容(12 字节)
  • initialBytesToStrip=3,解码后跳过 hdr1 和 length 字段,共 3 个字节

<center> 图 3 -10</center>

解码器实战

比方咱们定义如下音讯头,客户端通过该音讯协定发送数据,服务端收到该音讯后须要进行解码

先定义客户端,其中 Length 局部,能够应用 Netty 自带的 LengthFieldPrepender 来实现,它能够计算以后发送音讯的二进制字节长度,而后把该长度增加到 ByteBuf 的缓冲区头中。

public class LengthFieldBasedFrameDecoderClient {public static void main(String[] args) {EventLoopGroup workGroup=new NioEventLoopGroup();
        Bootstrap b=new Bootstrap();
        b.group(workGroup)
            .channel(NioSocketChannel.class)
            .handler(new ChannelInitializer<SocketChannel>() {
                @Override
                protected void initChannel(SocketChannel ch) throws Exception {ch.pipeline()
                        // 如果协定中的第一个字段为长度字段,// netty 提供了 LengthFieldPrepender 编码器,// 它能够计算以后待发送音讯的二进制字节长度,将该长度增加到 ByteBuf 的缓冲区头中
                        .addLast(new LengthFieldPrepender(2,0,false))
                        // 应用 StringEncoder,在通过 writeAndFlush 时,不须要本人转化成 ByteBuf
                        //StringEncoder 会主动做这个事件
                        .addLast(new StringEncoder())
                        .addLast(new ChannelInboundHandlerAdapter(){
                            @Override
                            public void channelActive(ChannelHandlerContext ctx) throws Exception {ctx.writeAndFlush("i am request!");
                                ctx.writeAndFlush("i am a another request!");
                            }
                        });
                }
            });
        try {ChannelFuture channelFuture=b.connect("localhost",8080).sync();
            channelFuture.channel().closeFuture().sync();} catch (InterruptedException e) {e.printStackTrace();
        }finally {workGroup.shutdownGracefully();
        }
    }
}

上述代码运行时,会失去两个报文。

上面是 Server 端的代码,减少了 LengthFieldBasedFrameDecoder 解码器,其中有两个参数的值如下

  • lengthFieldLength:2,示意 length 所占用的字节数为 2
  • initialBytesToStrip: 2,示意解码后跳过 length 的 2 个字节,失去 content 内容
public class LengthFieldBasedFrameDecoderServer {public static void main(String[] args) {EventLoopGroup bossGroup=new NioEventLoopGroup();
        EventLoopGroup workGroup=new NioEventLoopGroup();
        try{ServerBootstrap serverBootstrap=new ServerBootstrap();
            serverBootstrap.group(bossGroup,workGroup)
                    .channel(NioServerSocketChannel.class)
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel ch) throws Exception {ch.pipeline()
                                    .addLast(new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE,0,2,0,2))
                                    .addLast(new StringDecoder())
                                    .addLast(new ChannelInboundHandlerAdapter(){
                                        @Override
                                        public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {System.out.println("receive message:"+msg);
                                        }
                                    });
                        }
                    });
            ChannelFuture channelFuture=serverBootstrap.bind(8080).sync(); // 绑定端口
            channelFuture.channel().closeFuture().sync();} catch (InterruptedException e) {e.printStackTrace();
        } finally {bossGroup.shutdownGracefully();
            workGroup.shutdownGracefully();}
    }
}

总结

后面咱们剖析的几个罕用解码器,只是帮咱们解决了半包和粘包的问题,最终会让接受者收到一个残缺无效的申请报文并且封装到 ByteBuf 中,而这个报文内容是否有其余的编码方式,比方序列化等,还须要独自进行解析解决。

另外,很多的中间件,都会定义本人的报文协定,这些报文协定除了自身解决粘包半包问题以外,还会传递一些其余有意义的数据,比方 zookeeper 的 jute、dubbo 框架的 dubbo 协定等。

版权申明:本博客所有文章除特地申明外,均采纳 CC BY-NC-SA 4.0 许可协定。转载请注明来自 Mic 带你学架构
如果本篇文章对您有帮忙,还请帮忙点个关注和赞,您的保持是我一直创作的能源。欢送关注同名微信公众号获取更多技术干货!

正文完
 0