一.什么是TCP粘包半包
客户端发送数据包给服务端,因服务端一次读取到的字节数是不确定的,有好几种状况.
- 服务端分两次读取到了两个独立的数据包,没有粘包和拆包;
- 服务端一次接管到了两个数据包,粘合在一起,被称为 TCP 粘包;
- 服务端分两次读取到了两个数据包, 第一次读取到了残缺的包和另外一个包的局部内容,第二次读取到了另一个包的残余内容, 这被称为 TCP 拆包;
- 服务端分两次读取到了两个数据包, 第一次读取到了包的局部内容 , 第二次读取到了之前未读完的包残余内容和另一个包,产生了拆包和粘包。
- 服务端 TCP 接管滑动窗口很小,数据包比拟大, 即服务端分屡次能力将 包接管齐全,产生屡次拆包
二.粘包半包的起因
1.粘包
TCP协定:自身是 面向连贯的牢靠地协定-三次握手机制。
客户端与服务器会维持一个连贯(Channel) ,在连接不断开的状况下, 能够将多个数据包发往服务器,然而发送的网络数据包太小, 那么自身会启用 Nagle 算法(可配置是否启用) 对较小的数据包进行合并(因而,TCP 的网络提早要 UDP 的高些)而后再发送(超时或者包大小足够)。
服务器在接管到音讯(数据流)的时候就无奈辨别哪些数据包是客户端本人离开发送的,这样产生了粘包;
服务器在接管到数据后,放到缓冲区中,如果音讯没有被及时从缓存区取走,下次在取数据的时候可能就会呈现一次取出多个
数据包的状况,造成粘包景象。
UDP: 自身作为无连贯的不牢靠的传输协定(适宜频繁发送较小的数据包) , 他不会对数据包进行合并发送,间接是一端发送什么数据, 间接就收回去了, 既然他不会对数据合并, 每一个数据包都是残缺的(数据+UDP 头+IP 头等等发一 次数据封装一次) 也就没有粘包了。
2.半包
分包产生的起因:可能是IP分片传输导致的, 也可能是传输过程中失落部 分包导致呈现的半包, 还有可能就是一个包可能被分成了两次传输, 在取数据的时候,先取到了一部分(还可能与接管的缓冲区大小有关系) , 总之就是一个数据包被分成了屡次接管。
更具体的起因有三个, 别离如下。
- 应用程序写入数据的字节大小大于套接字发送缓冲区的大小
- 进行 MSS 大小的 TCP 分段。 MSS 是最大报文段长度的缩写。 MSS 是 TCP 报文段中的数据字段的最大长度。 数据字段加上 TCP 首部才等于整个的 TCP 报文段。 所以 MSS 并不是
TCP 报文段的最大长度, 而是: MSS=TCP 报文段长度-TCP 首部长度
- 以太网的 payload 大于 MTU 进行 IP 分片。 MTU 指: 一种通信协议的某一层下面所能
通过的最大数据包大小。 如果 IP 层有一个数据包要传, 而且数据的长度比链路层的 MTU 大,
那么 IP 层就会进行分片, 把数据包分成托干片, 让每一片都不超过 MTU。 留神, IP 分片可
以产生在原始发送端主机上, 也能够产生在两头路由器上。
3.解决粘包半包问题
因为底层的 TCP 无奈了解下层的业务数据, 所以在底层是无奈保障数据包不被拆分和重组的, 这个问题只能通过下层的利用协定栈设计来解决。业界的支流协定的解决方案,能够归纳如下。
(1) 在包尾减少宰割符, 比方回车换行符进行宰割, 例如 FTP 协定;
(2) 音讯定长, 例如每个报文的大小为固定长度 200 字节, 如果不够, 空位补空格;
(3) 将音讯分为音讯头和音讯体, 音讯头中蕴含示意音讯总长度(或者音讯体长度)的字段, 通常设计思路为音讯头的第一个字段应用 int32 来示意音讯的总长度,LengthFieldBasedFrameDecoder。
上面列举一个包尾减少分隔符的例子:
服务端程序:
import io.netty.buffer.ByteBuf;import io.netty.buffer.Unpooled;import io.netty.channel.ChannelHandler;import io.netty.channel.ChannelHandlerContext;import io.netty.channel.ChannelInboundHandlerAdapter;import io.netty.util.CharsetUtil;import java.util.concurrent.atomic.AtomicInteger;/** * 入站处理器 */@ChannelHandler.Sharablepublic class DelimiterServerHandler extends ChannelInboundHandlerAdapter { private AtomicInteger counter = new AtomicInteger(0); private AtomicInteger completeCounter = new AtomicInteger(0); /*** 服务端读取到网络数据后的解决*/ @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { ByteBuf in = (ByteBuf)msg; String request = in.toString(CharsetUtil.UTF_8); System.out.println("Server Accept["+request +"] and the counter is:"+counter.incrementAndGet()); String resp = "Hello,"+request+". Welcome to Netty World!" + DelimiterEchoServer.DELIMITER_SYMBOL; ctx.writeAndFlush(Unpooled.copiedBuffer(resp.getBytes())); } /*** 服务端读取实现网络数据后的解决*/ @Override public void channelReadComplete(ChannelHandlerContext ctx) throws Exception { ctx.fireChannelReadComplete(); System.out.println("the ReadComplete count is " +completeCounter.incrementAndGet()); } /*** 产生异样后的解决*/ @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { cause.printStackTrace(); ctx.close(); }}
import io.netty.bootstrap.ServerBootstrap;import io.netty.buffer.ByteBuf;import io.netty.buffer.Unpooled;import io.netty.channel.Channel;import io.netty.channel.ChannelFuture;import io.netty.channel.ChannelInitializer;import io.netty.channel.EventLoopGroup;import io.netty.channel.nio.NioEventLoopGroup;import io.netty.channel.socket.nio.NioServerSocketChannel;import io.netty.handler.codec.DelimiterBasedFrameDecoder;import java.net.InetSocketAddress;/** * 服务端 */public class DelimiterEchoServer { public static final String DELIMITER_SYMBOL = "@~"; public static final int PORT = 9997; public static void main(String[] args) throws InterruptedException { DelimiterEchoServer delimiterEchoServer = new DelimiterEchoServer(); System.out.println("服务器行将启动"); delimiterEchoServer.start(); } public void start() throws InterruptedException { final DelimiterServerHandler serverHandler = new DelimiterServerHandler(); EventLoopGroup group = new NioEventLoopGroup();/*线程组*/ try { ServerBootstrap b = new ServerBootstrap();/*服务端启动必须*/ b.group(group)/*将线程组传入*/ .channel(NioServerSocketChannel.class)/*指定应用NIO进行网络传输*/ .localAddress(new InetSocketAddress(PORT))/*指定服务器监听端口*/ /*服务端每接管到一个连贯申请,就会新启一个socket通信,也就是channel, 所以上面这段代码的作用就是为这个子channel减少handle*/ .childHandler(new ChannelInitializerImp()); ChannelFuture f = b.bind().sync();/*异步绑定到服务器,sync()会阻塞直到实现*/ System.out.println("服务器启动实现,期待客户端的连贯和数据....."); f.channel().closeFuture().sync();/*阻塞直到服务器的channel敞开*/ } finally { group.shutdownGracefully().sync();/*优雅敞开线程组*/ } } private static class ChannelInitializerImp extends ChannelInitializer<Channel> { @Override protected void initChannel(Channel ch) throws Exception { ByteBuf delimiter = Unpooled.copiedBuffer(DELIMITER_SYMBOL .getBytes()); ch.pipeline().addLast( new DelimiterBasedFrameDecoder(1024, delimiter)); ch.pipeline().addLast(new DelimiterServerHandler()); } }}
客户端程序
import io.netty.buffer.ByteBuf;import io.netty.buffer.Unpooled;import io.netty.channel.ChannelHandlerContext;import io.netty.channel.SimpleChannelInboundHandler;import io.netty.util.CharsetUtil;/** * 入站处理器 */public class DelimiterClientHandler extends SimpleChannelInboundHandler<ByteBuf> { private AtomicInteger counter = new AtomicInteger(0); /*** 客户端读取到网络数据后的解决*/ protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) throws Exception { System.out.println("client Accept["+msg.toString(CharsetUtil.UTF_8) +"] and the counter is:"+counter.incrementAndGet()); } /*** 客户端被告诉channel沉闷后,做事*/ @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { ByteBuf msg = null; String request = "Mark,Lison,Peter,James,Deer" + DelimiterEchoServer.DELIMITER_SYMBOL; for(int i=0;i<10;i++){ msg = Unpooled.buffer(request.length()); msg.writeBytes(request.getBytes()); ctx.writeAndFlush(msg); } } /*** 产生异样后的解决*/ @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { cause.printStackTrace(); ctx.close(); }}
import io.netty.bootstrap.Bootstrap;import io.netty.buffer.ByteBuf;import io.netty.buffer.Unpooled;import io.netty.channel.Channel;import io.netty.channel.ChannelFuture;import io.netty.channel.ChannelInitializer;import io.netty.channel.EventLoopGroup;import io.netty.channel.nio.NioEventLoopGroup;import io.netty.channel.socket.nio.NioSocketChannel;import io.netty.handler.codec.DelimiterBasedFrameDecoder;/** * 客户端 */public class DelimiterEchoClient { private final String host; public DelimiterEchoClient(String host) { this.host = host; } public void start() throws InterruptedException { EventLoopGroup group = new NioEventLoopGroup();/*线程组*/ try { final Bootstrap b = new Bootstrap();;/*客户端启动必须*/ b.group(group)/*将线程组传入*/ .channel(NioSocketChannel.class)/*指定应用NIO进行网络传输*/ .remoteAddress(new InetSocketAddress(host,DelimiterEchoServer.PORT))/*配置要连贯服务器的ip地址和端口*/ .handler(new ChannelInitializerImp()); ChannelFuture f = b.connect().sync(); System.out.println("已连贯到服务器....."); f.channel().closeFuture().sync(); } finally { group.shutdownGracefully().sync(); } } private static class ChannelInitializerImp extends ChannelInitializer<Channel> { @Override protected void initChannel(Channel ch) throws Exception { ByteBuf delimiter = Unpooled.copiedBuffer(DelimiterEchoServer.DELIMITER_SYMBOL .getBytes()); ch.pipeline().addLast( new DelimiterBasedFrameDecoder(1024, delimiter)); ch.pipeline().addLast(new DelimiterClientHandler()); } } public static void main(String[] args) throws InterruptedException { new DelimiterEchoClient("127.0.0.1").start(); }}
要害代码:
1.建设连贯后,客户端给服务端发数据包,每次发送已特殊字符`
@~结尾。
2.服务端收到数据包后通过DelimiterBasedFrameDecoder即分隔符根底框架解码器解码为一个个带有分隔符的数据包。
3.再到服务端的业务层处理器DelimiterServerHandler