乐趣区

关于java:Netty-是如何解决-TCP-粘包拆包的

作者:rickiyang

出处:www.cnblogs.com/rickiyang/p/11074235.html

咱们都晓得 TCP 是基于字节流的传输协定。

那么数据在通信层流传其实就像河水一样并没有显著的分界线,而数据具体示意什么意思什么中央有句号什么中央有分号这个对于 TCP 底层来说并不分明。应用层向 TCP 层发送用于网间传输的、用 8 位字节示意的数据流,而后 TCP 把数据流分区成适当长度的报文段,之后 TCP 把后果包传给 IP 层,由它来通过网络将包传送给接收端实体的 TCP 层。

所以对于这个数据拆分成大包小包的问题就是咱们明天要讲的粘包和拆包的问题。

1、TCP 粘包拆包问题阐明

粘包和拆包这两个概念预计大家还不分明,通过上面这张图咱们来剖析一下:

假如客户端别离发送两个数据包 D1,D2 个服务端,然而发送过程中数据是何种模式进行流传这个并不分明,别离有下列 4 种状况:

  1. 服务端一次承受到了 D1 和 D2 两个数据包,两个包粘在一起,称为粘包;
  2. 服务端分两次读取到数据包 D1 和 D2,没有产生粘包和拆包;
  3. 服务端分两次读到了数据包,第一次读到了 D1 和 D2 的局部内容,第二次读到了 D2 的剩下局部,这个称为拆包;
  4. 服务器分三次读到了数据局部,第一次读到了 D1 包,第二次读到了 D2 包的局部内容,第三次读到了 D2 包的剩下内容。

2、TCP 粘包产生起因

咱们晓得在 TCP 协定中,利用数据宰割成 TCP 认为最适宜发送的数据块,这部分是通过“MSS”(最大数据包长度)选项来管制的,通常这种机制也被称为一种协商机制,MSS 规定了 TCP 传往另一端的最大数据块的长度。这个值 TCP 协定在实现的时候往往用 MTU 值代替(须要减去 IP 数据包包头的大小 20Bytes 和 TCP 数据段的包头 20Bytes)所以往往 MSS 为 1460。通信单方会依据单方提供的 MSS 值得最小值确定为这次连贯的最大 MSS 值。

tcp 为进步性能,发送端会将须要发送的数据发送到缓冲区,期待缓冲区满了之后,再将缓冲中的数据发送到接管方。同理,接管方也有缓冲区这样的机制,来接收数据。

产生粘包拆包的起因次要有以下这些:

  1. 应用程序写入数据的字节大小大于套接字发送缓冲区的大小将产生拆包;
  2. 进行 MSS 大小的 TCP 分段。MSS 是 TCP 报文段中的数据字段的最大长度,当 TCP 报文长度 -TCP 头部长度 >mss 的时候将产生拆包;
  3. 应用程序写入数据小于套接字缓冲区大小,网卡将利用屡次写入的数据发送到网络上, 将产生粘包;
  4. 数据包大于 MTU 的时候将会进行切片。MTU 即 (Maxitum Transmission Unit) 最大传输单元, 因为以太网传输电气方面的限度,每个以太网帧都有最小的大小 64bytes 最大不能超过 1518bytes, 刨去以太网帧的帧头 14Bytes 和帧尾 CRC 校验局部 4Bytes, 那么剩下承载下层协定的中央也就是 Data 域最大就只能有 1500Bytes 这个值咱们就把它称之为 MTU。这个就是网络层协定十分关怀的中央,因为网络层协定比方 IP 协定会依据这个值来决定是否把下层传下来的数据进行分片。

3、如何解决 TCP 粘包拆包

咱们晓得 tcp 是无界的数据流,且协定自身无奈防止粘包,拆包的产生,那咱们只能在应用层数据协定上,加以控制。通常在制订传输数据时,能够应用如下办法:

  1. 设置定长音讯,服务端每次读取既定长度的内容作为一条残缺音讯;
  2. 应用带音讯头的协定、音讯头存储音讯开始标识及音讯长度信息,服务端获取音讯头的时候解析出音讯长度,而后向后读取该长度的内容;
  3. 设置音讯边界,服务端从网络流中按音讯边界拆散出音讯内容。比方在音讯开端加上换行符用以辨别音讯完结。

当然应用层还有更多简单的形式能够解决这个问题,这个就属于网络层的问题了,咱们还是用 java 提供的形式来解决这个问题。咱们先看一个例子看看粘包是如何产生的。

服务端:

public class HelloWordServer {
    private int port;

    public HelloWordServer(int port) {this.port = port;}

    public void start(){EventLoopGroup bossGroup = new NioEventLoopGroup();
        EventLoopGroup workGroup = new NioEventLoopGroup();

        ServerBootstrap server = new ServerBootstrap().group(bossGroup,workGroup)
                                    .channel(NioServerSocketChannel.class)
                                    .childHandler(new ServerChannelInitializer());

        try {ChannelFuture future = server.bind(port).sync();
            future.channel().closeFuture().sync();} catch (InterruptedException e) {e.printStackTrace();
        }finally {bossGroup.shutdownGracefully();
            workGroup.shutdownGracefully();}
    }

    public static void main(String[] args) {HelloWordServer server = new HelloWordServer(7788);
        server.start();}
}

服务端 Initializer:

public class ServerChannelInitializer extends ChannelInitializer<SocketChannel> {
    @Override
    protected void initChannel(SocketChannel socketChannel) throws Exception {ChannelPipeline pipeline = socketChannel.pipeline();

        // 字符串解码 和 编码
        pipeline.addLast("decoder", new StringDecoder());
        pipeline.addLast("encoder", new StringEncoder());

        // 本人的逻辑 Handler
        pipeline.addLast("handler", new HelloWordServerHandler());
    }
}

服务端 handler:

public class HelloWordServerHandler extends ChannelInboundHandlerAdapter {
    private int counter;

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {String body = (String)msg;
        System.out.println("server receive order :" + body + ";the counter is:" + ++counter);
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {super.exceptionCaught(ctx, cause);
    }
}

客户端:

public class HelloWorldClient {
    private  int port;
    private  String address;

    public HelloWorldClient(int port,String address) {
        this.port = port;
        this.address = address;
    }

    public void start(){EventLoopGroup group = new NioEventLoopGroup();

        Bootstrap bootstrap = new Bootstrap();
        bootstrap.group(group)
                .channel(NioSocketChannel.class)
                .handler(new ClientChannelInitializer());

        try {ChannelFuture future = bootstrap.connect(address,port).sync();         
            future.channel().closeFuture().sync();} catch (Exception e) {e.printStackTrace();
        }finally {group.shutdownGracefully();
        }

    }

    public static void main(String[] args) {HelloWorldClient client = new HelloWorldClient(7788,"127.0.0.1");
        client.start();}
}

客户端 Initializer:

public class ClientChannelInitializer extends  ChannelInitializer<SocketChannel> {protected void initChannel(SocketChannel socketChannel) throws Exception {ChannelPipeline pipeline = socketChannel.pipeline();

        pipeline.addLast("decoder", new StringDecoder());
        pipeline.addLast("encoder", new StringEncoder());

        // 客户端的逻辑
        pipeline.addLast("handler", new HelloWorldClientHandler());
    }
}

客户端 handler:

public class HelloWorldClientHandler extends ChannelInboundHandlerAdapter {private byte[] req;
    private int counter;

    public BaseClientHandler() {
        req = ("Unless required by applicable law or agreed to in writing, software\n" +
                "distributed under the License is distributed on an \"AS IS\"BASIS,\n" +
                "WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.\n" +
                "See the License for the specific language governing permissions and\n" +
                "limitations under the License.This connector uses the BIO implementation that requires the JSSE\n" +
                "style configuration. When using the APR/native implementation, the\n" +
                "penSSL style configuration is required as described in the APR/native\n" +
                "documentation.An Engine represents the entry point (within Catalina) that processes\n" +
                "every request.  The Engine implementation for Tomcat stand alone\n" +
                "analyzes the HTTP headers included with the request, and passes them\n" +
                "on to the appropriate Host (virtual host)# Unless required by applicable law or agreed to in writing, software\n" +
                "# distributed under the License is distributed on an \"AS IS\"BASIS,\n" +
                "# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.\n" +
                "# See the License for the specific language governing permissions and\n" +
                "# limitations under the License.# For example, set the org.apache.catalina.util.LifecycleBase logger to log\n" +
                "# each component that extends LifecycleBase changing state:\n" +
                "#org.apache.catalina.util.LifecycleBase.level = FINE"
                ).getBytes();}

    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        ByteBuf message;

        // 将下面的所有字符串作为一个音讯体发送进来
        message = Unpooled.buffer(req.length);
        message.writeBytes(req);
        ctx.writeAndFlush(message);
    }

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {String buf = (String)msg;
        System.out.println("Now is :" + buf + "; the counter is :"+ (++counter));
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {ctx.close();
    }
}

运行客户端和服务端咱们能看到:

咱们看到这个长长的字符串被截成了 2 段发送,这就是产生了拆包的景象。同样粘包咱们也很容易去模仿,咱们把 BaseClientHandler 中的 channelActive 办法外面的:

message = Unpooled.buffer(req.length);
message.writeBytes(req);
ctx.writeAndFlush(message);

这几行代码是把咱们下面的一长串字符转成的 byte 数组写进流里发送进来,那么咱们能够在这里把下面发送音讯的这几行循环几遍这样发送的内容增多了就有可能在拆包的时候把上一条音讯的一部分调配到下一条音讯外面了,批改如下:

for (int i = 0; i < 3; i++) {message = Unpooled.buffer(req.length);
    message.writeBytes(req);
    ctx.writeAndFlush(message);
}

改完之后咱们再运行一下,输入太长不好截图,咱们在输入后果中能看到循环 3 次之后的音讯服务端收到的就不是之前的残缺的一条了,而是被拆分了 4 次发送。

对于下面呈现的粘包和拆包的问题,Netty 已有思考,并且有施行的计划:LineBasedFrameDecoder。
咱们从新改写一下 ServerChannelInitializer:

public class ServerChannelInitializer extends ChannelInitializer<SocketChannel> {
    @Override
    protected void initChannel(SocketChannel socketChannel) throws Exception {ChannelPipeline pipeline = socketChannel.pipeline();


        pipeline.addLast(new LineBasedFrameDecoder(2048));       
        // 字符串解码 和 编码
        pipeline.addLast("decoder", new StringDecoder());
        pipeline.addLast("encoder", new StringEncoder());

        // 本人的逻辑 Handler
        pipeline.addLast("handler", new BaseServerHandler());
    }
}

新增:pipeline.addLast(new LineBasedFrameDecoder(2048))。同时,咱们还得对下面发送的音讯进行革新 BaseClientHandler:

public class BaseClientHandler extends ChannelInboundHandlerAdapter {private byte[] req;
    private int counter;

    req = ("Unless required by applicable dfslaw or agreed to in writing, software" +
                "distributed under the License is distributed on an \"AS IS\"BASIS," +
                "WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied." +
                "See the License for the specific language governing permissions and" +
                "limitations under the License.This connector uses the BIO implementation that requires the JSSE" +
                "style configuration. When using the APR/native implementation, the" +
                "penSSL style configuration is required as described in the APR/native" +
                "documentation.An Engine represents the entry point (within Catalina) that processes" +
                "every request.  The Engine implementation for Tomcat stand alone" +
                "analyzes the HTTP headers included with the request, and passes them" +
                "on to the appropriate Host (virtual host)# Unless required by applicable law or agreed to in writing, software" +
                "# distributed under the License is distributed on an \"AS IS\"BASIS," +
                "# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied." +
                "# See the License for the specific language governing permissions and" +
                "# limitations under the License.# For example, set the org.apache.catalina.util.LifecycleBase logger to log" +
                "# each component that extends LifecycleBase changing state:" +
                "#org.apache.catalina.util.LifecycleBase.level = FINE\n"
                ).getBytes();  


    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        ByteBuf message;

        message = Unpooled.buffer(req.length);
        message.writeBytes(req);
        ctx.writeAndFlush(message);
    }

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {String buf = (String)msg;
        System.out.println("Now is :" + buf + "; the counter is :"+ (++counter));
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {ctx.close();
    }
}

去掉所有的”\n”, 只保留字符串开端的这一个。起因稍后再说。channelActive 办法中咱们不用再用循环屡次发送音讯了,只发送一次就好 (第一个例子中发送一次的时候是产生了拆包的),而后咱们再次运行,大家会看到这么长一串字符只发送了一串就发送结束。程序输入我就不截图了。上面来解释一下 LineBasedFrameDecoder。

LineBasedFrameDecoder 的工作原理是它顺次遍历 ByteBuf 中的可读字节,判断看是否有”\n”或者”\r\n”,如果有,就以此地位为完结地位,从可读索引到完结地位区间的字节就组成了一行。它是以换行符为完结标记的解码器。反对携带结束符或者不携带结束符两种解码形式,同时反对配置单行的最大长度。如果间断读取到最大长度后依然没有发现换行符,就会抛出异样,同时疏忽掉之前读到的异样码流。这个对于咱们确定音讯最大长度的利用场景还是很有帮忙。

对于下面的判断看是否有”\n”或者”\r\n”以此作为完结的标记咱们可能回忆,要是没有”\n”或者”\r\n”那还有什么别的形式能够判断音讯是否完结呢。别放心,Netty 对于此曾经有思考,还有别的解码器能够帮忙咱们解决问题,

近期热文举荐:

1.1,000+ 道 Java 面试题及答案整顿 (2021 最新版)

2. 终于靠开源我的项目弄到 IntelliJ IDEA 激活码了,真香!

3. 阿里 Mock 工具正式开源,干掉市面上所有 Mock 工具!

4.Spring Cloud 2020.0.0 正式公布,全新颠覆性版本!

5.《Java 开发手册(嵩山版)》最新公布,速速下载!

感觉不错,别忘了顺手点赞 + 转发哦!

退出移动版