关于java:Netty解决粘包半包问题

3次阅读

共计 6998 个字符,预计需要花费 18 分钟才能阅读完成。

一. 什么是 TCP 粘包半包

客户端发送数据包给服务端, 因服务端一次读取到的字节数是不确定的, 有好几种状况.

  1. 服务端分两次读取到了两个独立的数据包,没有粘包和拆包;
  2. 服务端一次接管到了两个数据包,粘合在一起,被称为 TCP 粘包;
  3. 服务端分两次读取到了两个数据包,第一次读取到了残缺的包和另外一个包的局部内容,第二次读取到了另一个包的残余内容,这被称为 TCP 拆包;
  4. 服务端分两次读取到了两个数据包,第一次读取到了包的局部内容,第二次读取到了之前未读完的包残余内容和另一个包, 产生了拆包和粘包。
  5. 服务端 TCP 接管滑动窗口很小,数据包比拟大, 即服务端分屡次能力将 包接管齐全, 产生屡次拆包

二. 粘包半包的起因

1. 粘包

TCP 协定 :自身是 面向连贯的牢靠地协定 - 三次握手机制。

客户端与服务器会维持一个连贯(Channel),在连接不断开的状况下,能够将多个数据包发往服务器,然而发送的网络数据包太小,那么自身会启用 Nagle 算法(可配置是否启用)对较小的数据包进行合并(因而,TCP 的网络提早要 UDP 的高些)而后再发送(超时或者包大小足够)。

服务器在接管到音讯(数据流)的时候就无奈辨别哪些数据包是客户端本人离开发送的,这样产生了粘包;
服务器在接管到数据后,放到缓冲区中,如果音讯没有被及时从缓存区取走,下次在取数据的时候可能就会呈现一次取出多个
数据包的状况,造成粘包景象。


UDP:自身作为无连贯的不牢靠的传输协定(适宜频繁发送较小的数据包),他不会对数据包进行合并发送,间接是一端发送什么数据,间接就收回去了,既然他不会对数据合并,每一个数据包都是残缺的(数据 +UDP 头 +IP 头等等发一 次数据封装一次)也就没有粘包了。


2. 半包

分包产生的起因:可能是 IP 分片传输导致的,也可能是传输过程中失落部 分包导致呈现的半包,还有可能就是一个包可能被分成了两次传输,在取数据的时候,先取到了一部分(还可能与接管的缓冲区大小有关系),总之就是一个数据包被分成了屡次接管。
更具体的起因有三个,别离如下。

  1. 应用程序写入数据的字节大小大于套接字发送缓冲区的大小
  2. 进行 MSS 大小的 TCP 分段。MSS 是最大报文段长度的缩写。MSS 是 TCP 报文段中的数据字段的最大长度。数据字段加上 TCP 首部才等于整个的 TCP 报文段。所以 MSS 并不是

TCP 报文段的最大长度,而是:MSS=TCP 报文段长度 -TCP 首部长度

  1. 以太网的 payload 大于 MTU 进行 IP 分片。MTU 指:一种通信协议的某一层下面所能

通过的最大数据包大小。如果 IP 层有一个数据包要传,而且数据的长度比链路层的 MTU 大,
那么 IP 层就会进行分片,把数据包分成托干片,让每一片都不超过 MTU。留神,IP 分片可
以产生在原始发送端主机上,也能够产生在两头路由器上。


3. 解决粘包半包问题

因为底层的 TCP 无奈了解下层的业务数据,所以在底层是无奈保障数据包不被拆分和重组的,这个问题只能通过下层的利用协定栈设计来解决。业界的支流协定的解决方案,能够归纳如下。
(1)在包尾减少宰割符,比方回车换行符进行宰割,例如 FTP 协定;
(2)音讯定长,例如每个报文的大小为固定长度 200 字节,如果不够,空位补空格;
(3)将音讯分为音讯头和音讯体,音讯头中蕴含示意音讯总长度(或者音讯体长度)的字段,通常设计思路为音讯头的第一个字段应用 int32 来示意音讯的总长度,LengthFieldBasedFrameDecoder。

上面列举一个包尾减少分隔符的例子:

服务端程序:

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.util.CharsetUtil;

import java.util.concurrent.atomic.AtomicInteger;

/**
 * 入站处理器
 */
@ChannelHandler.Sharable
public class DelimiterServerHandler extends ChannelInboundHandlerAdapter {private AtomicInteger counter = new AtomicInteger(0);
    private AtomicInteger completeCounter = new AtomicInteger(0);

    /*** 服务端读取到网络数据后的解决 */
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {ByteBuf in = (ByteBuf)msg;
        String request = in.toString(CharsetUtil.UTF_8);
        System.out.println("Server Accept["+request
                +"] and the counter is:"+counter.incrementAndGet());
        String resp = "Hello,"+request+". Welcome to Netty World!"
                + DelimiterEchoServer.DELIMITER_SYMBOL;
        ctx.writeAndFlush(Unpooled.copiedBuffer(resp.getBytes()));
    }

    /*** 服务端读取实现网络数据后的解决 */
    @Override
    public void channelReadComplete(ChannelHandlerContext ctx)
            throws Exception {ctx.fireChannelReadComplete();
        System.out.println("the ReadComplete count is"
                +completeCounter.incrementAndGet());
    }

    /*** 产生异样后的解决 */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {cause.printStackTrace();
        ctx.close();}
}
import io.netty.bootstrap.ServerBootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.DelimiterBasedFrameDecoder;

import java.net.InetSocketAddress;

/**
 * 服务端
 */
public class DelimiterEchoServer {

    public static final String DELIMITER_SYMBOL = "@~";
    public static final int PORT = 9997;

    public static void main(String[] args) throws InterruptedException {DelimiterEchoServer delimiterEchoServer = new DelimiterEchoServer();
        System.out.println("服务器行将启动");
        delimiterEchoServer.start();}

    public void start() throws InterruptedException {final DelimiterServerHandler serverHandler = new DelimiterServerHandler();
        EventLoopGroup group = new NioEventLoopGroup();/* 线程组 */
        try {ServerBootstrap b = new ServerBootstrap();/* 服务端启动必须 */
            b.group(group)/* 将线程组传入 */
                .channel(NioServerSocketChannel.class)/* 指定应用 NIO 进行网络传输 */
                .localAddress(new InetSocketAddress(PORT))/* 指定服务器监听端口 */
                /* 服务端每接管到一个连贯申请,就会新启一个 socket 通信,也就是 channel,所以上面这段代码的作用就是为这个子 channel 减少 handle*/
                .childHandler(new ChannelInitializerImp());
            ChannelFuture f = b.bind().sync();/* 异步绑定到服务器,sync() 会阻塞直到实现 */
            System.out.println("服务器启动实现,期待客户端的连贯和数据.....");
            f.channel().closeFuture().sync();/* 阻塞直到服务器的 channel 敞开 */} finally {group.shutdownGracefully().sync();/* 优雅敞开线程组 */}
    }

    private static class ChannelInitializerImp extends ChannelInitializer<Channel> {

        @Override
        protected void initChannel(Channel ch) throws Exception {
            ByteBuf delimiter = Unpooled.copiedBuffer(DELIMITER_SYMBOL
                    .getBytes());
            ch.pipeline().addLast( new DelimiterBasedFrameDecoder(1024,
                    delimiter));
            ch.pipeline().addLast(new DelimiterServerHandler());
        }
    }

}

客户端程序



import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.util.CharsetUtil;


/**
 * 入站处理器
 */
public class DelimiterClientHandler extends SimpleChannelInboundHandler<ByteBuf> {private AtomicInteger counter = new AtomicInteger(0);

    /*** 客户端读取到网络数据后的解决 */
    protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) throws Exception {System.out.println("client Accept["+msg.toString(CharsetUtil.UTF_8)
                +"] and the counter is:"+counter.incrementAndGet());
    }

    /*** 客户端被告诉 channel 沉闷后,做事 */
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        ByteBuf msg = null;
        String request = "Mark,Lison,Peter,James,Deer"
                +  DelimiterEchoServer.DELIMITER_SYMBOL;
        for(int i=0;i<10;i++){msg = Unpooled.buffer(request.length());
            msg.writeBytes(request.getBytes());
            ctx.writeAndFlush(msg);
        }
    }

    /*** 产生异样后的解决 */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {cause.printStackTrace();
        ctx.close();}
}


import io.netty.bootstrap.Bootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.DelimiterBasedFrameDecoder;

/**
 * 客户端
 */
public class DelimiterEchoClient {

    private final String host;

    public DelimiterEchoClient(String host) {this.host = host;}

    public void start() throws InterruptedException {EventLoopGroup group = new NioEventLoopGroup();/* 线程组 */
        try {final Bootstrap b = new Bootstrap();;/* 客户端启动必须 */
            b.group(group)/* 将线程组传入 */
                    .channel(NioSocketChannel.class)/* 指定应用 NIO 进行网络传输 */
                    .remoteAddress(new InetSocketAddress(host,DelimiterEchoServer.PORT))/* 配置要连贯服务器的 ip 地址和端口 */
                    .handler(new ChannelInitializerImp());
            ChannelFuture f = b.connect().sync();
            System.out.println("已连贯到服务器.....");
            f.channel().closeFuture().sync();} finally {group.shutdownGracefully().sync();}
    }

    private static class ChannelInitializerImp extends ChannelInitializer<Channel> {

        @Override
        protected void initChannel(Channel ch) throws Exception {
            ByteBuf delimiter
                    = Unpooled.copiedBuffer(DelimiterEchoServer.DELIMITER_SYMBOL
                    .getBytes());
            ch.pipeline().addLast( new DelimiterBasedFrameDecoder(1024,
                    delimiter));
            ch.pipeline().addLast(new DelimiterClientHandler());
        }
    }

    public static void main(String[] args) throws InterruptedException {new DelimiterEchoClient("127.0.0.1").start();}
}

要害代码:
1. 建设连贯后, 客户端给服务端发数据包, 每次发送已特殊字符 `
@~ 结尾。

2. 服务端收到数据包后通过 DelimiterBasedFrameDecoder 即分隔符根底框架解码器解码为一个个带有分隔符的数据包。

3. 再到服务端的业务层处理器 DelimiterServerHandler

正文完
 0