共计 6998 个字符,预计需要花费 18 分钟才能阅读完成。
一. 什么是 TCP 粘包半包
客户端发送数据包给服务端, 因服务端一次读取到的字节数是不确定的, 有好几种状况.
- 服务端分两次读取到了两个独立的数据包,没有粘包和拆包;
- 服务端一次接管到了两个数据包,粘合在一起,被称为 TCP 粘包;
- 服务端分两次读取到了两个数据包,第一次读取到了残缺的包和另外一个包的局部内容,第二次读取到了另一个包的残余内容,这被称为 TCP 拆包;
- 服务端分两次读取到了两个数据包,第一次读取到了包的局部内容,第二次读取到了之前未读完的包残余内容和另一个包, 产生了拆包和粘包。
- 服务端 TCP 接管滑动窗口很小,数据包比拟大, 即服务端分屡次能力将 包接管齐全, 产生屡次拆包
二. 粘包半包的起因
1. 粘包
TCP 协定 :自身是 面向连贯的牢靠地协定 - 三次握手机制。
客户端与服务器会维持一个连贯(Channel),在连接不断开的状况下,能够将多个数据包发往服务器,然而发送的网络数据包太小,那么自身会启用 Nagle 算法(可配置是否启用)对较小的数据包进行合并(因而,TCP 的网络提早要 UDP 的高些)而后再发送(超时或者包大小足够)。
服务器在接管到音讯(数据流)的时候就无奈辨别哪些数据包是客户端本人离开发送的,这样产生了粘包;
服务器在接管到数据后,放到缓冲区中,如果音讯没有被及时从缓存区取走,下次在取数据的时候可能就会呈现一次取出多个
数据包的状况,造成粘包景象。
UDP:自身作为无连贯的不牢靠的传输协定(适宜频繁发送较小的数据包),他不会对数据包进行合并发送,间接是一端发送什么数据,间接就收回去了,既然他不会对数据合并,每一个数据包都是残缺的(数据 +UDP 头 +IP 头等等发一 次数据封装一次)也就没有粘包了。
2. 半包
分包产生的起因:可能是 IP 分片传输导致的,也可能是传输过程中失落部 分包导致呈现的半包,还有可能就是一个包可能被分成了两次传输,在取数据的时候,先取到了一部分(还可能与接管的缓冲区大小有关系),总之就是一个数据包被分成了屡次接管。
更具体的起因有三个,别离如下。
- 应用程序写入数据的字节大小大于套接字发送缓冲区的大小
- 进行 MSS 大小的 TCP 分段。MSS 是最大报文段长度的缩写。MSS 是 TCP 报文段中的数据字段的最大长度。数据字段加上 TCP 首部才等于整个的 TCP 报文段。所以 MSS 并不是
TCP 报文段的最大长度,而是:MSS=TCP 报文段长度 -TCP 首部长度
- 以太网的 payload 大于 MTU 进行 IP 分片。MTU 指:一种通信协议的某一层下面所能
通过的最大数据包大小。如果 IP 层有一个数据包要传,而且数据的长度比链路层的 MTU 大,
那么 IP 层就会进行分片,把数据包分成托干片,让每一片都不超过 MTU。留神,IP 分片可
以产生在原始发送端主机上,也能够产生在两头路由器上。
3. 解决粘包半包问题
因为底层的 TCP 无奈了解下层的业务数据,所以在底层是无奈保障数据包不被拆分和重组的,这个问题只能通过下层的利用协定栈设计来解决。业界的支流协定的解决方案,能够归纳如下。
(1)在包尾减少宰割符,比方回车换行符进行宰割,例如 FTP 协定;
(2)音讯定长,例如每个报文的大小为固定长度 200 字节,如果不够,空位补空格;
(3)将音讯分为音讯头和音讯体,音讯头中蕴含示意音讯总长度(或者音讯体长度)的字段,通常设计思路为音讯头的第一个字段应用 int32 来示意音讯的总长度,LengthFieldBasedFrameDecoder。
上面列举一个包尾减少分隔符的例子:
服务端程序:
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.util.CharsetUtil;
import java.util.concurrent.atomic.AtomicInteger;
/**
* 入站处理器
*/
@ChannelHandler.Sharable
public class DelimiterServerHandler extends ChannelInboundHandlerAdapter {private AtomicInteger counter = new AtomicInteger(0);
private AtomicInteger completeCounter = new AtomicInteger(0);
/*** 服务端读取到网络数据后的解决 */
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {ByteBuf in = (ByteBuf)msg;
String request = in.toString(CharsetUtil.UTF_8);
System.out.println("Server Accept["+request
+"] and the counter is:"+counter.incrementAndGet());
String resp = "Hello,"+request+". Welcome to Netty World!"
+ DelimiterEchoServer.DELIMITER_SYMBOL;
ctx.writeAndFlush(Unpooled.copiedBuffer(resp.getBytes()));
}
/*** 服务端读取实现网络数据后的解决 */
@Override
public void channelReadComplete(ChannelHandlerContext ctx)
throws Exception {ctx.fireChannelReadComplete();
System.out.println("the ReadComplete count is"
+completeCounter.incrementAndGet());
}
/*** 产生异样后的解决 */
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {cause.printStackTrace();
ctx.close();}
}
import io.netty.bootstrap.ServerBootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.DelimiterBasedFrameDecoder;
import java.net.InetSocketAddress;
/**
* 服务端
*/
public class DelimiterEchoServer {
public static final String DELIMITER_SYMBOL = "@~";
public static final int PORT = 9997;
public static void main(String[] args) throws InterruptedException {DelimiterEchoServer delimiterEchoServer = new DelimiterEchoServer();
System.out.println("服务器行将启动");
delimiterEchoServer.start();}
public void start() throws InterruptedException {final DelimiterServerHandler serverHandler = new DelimiterServerHandler();
EventLoopGroup group = new NioEventLoopGroup();/* 线程组 */
try {ServerBootstrap b = new ServerBootstrap();/* 服务端启动必须 */
b.group(group)/* 将线程组传入 */
.channel(NioServerSocketChannel.class)/* 指定应用 NIO 进行网络传输 */
.localAddress(new InetSocketAddress(PORT))/* 指定服务器监听端口 */
/* 服务端每接管到一个连贯申请,就会新启一个 socket 通信,也就是 channel,所以上面这段代码的作用就是为这个子 channel 减少 handle*/
.childHandler(new ChannelInitializerImp());
ChannelFuture f = b.bind().sync();/* 异步绑定到服务器,sync() 会阻塞直到实现 */
System.out.println("服务器启动实现,期待客户端的连贯和数据.....");
f.channel().closeFuture().sync();/* 阻塞直到服务器的 channel 敞开 */} finally {group.shutdownGracefully().sync();/* 优雅敞开线程组 */}
}
private static class ChannelInitializerImp extends ChannelInitializer<Channel> {
@Override
protected void initChannel(Channel ch) throws Exception {
ByteBuf delimiter = Unpooled.copiedBuffer(DELIMITER_SYMBOL
.getBytes());
ch.pipeline().addLast( new DelimiterBasedFrameDecoder(1024,
delimiter));
ch.pipeline().addLast(new DelimiterServerHandler());
}
}
}
客户端程序
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.util.CharsetUtil;
/**
* 入站处理器
*/
public class DelimiterClientHandler extends SimpleChannelInboundHandler<ByteBuf> {private AtomicInteger counter = new AtomicInteger(0);
/*** 客户端读取到网络数据后的解决 */
protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) throws Exception {System.out.println("client Accept["+msg.toString(CharsetUtil.UTF_8)
+"] and the counter is:"+counter.incrementAndGet());
}
/*** 客户端被告诉 channel 沉闷后,做事 */
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
ByteBuf msg = null;
String request = "Mark,Lison,Peter,James,Deer"
+ DelimiterEchoServer.DELIMITER_SYMBOL;
for(int i=0;i<10;i++){msg = Unpooled.buffer(request.length());
msg.writeBytes(request.getBytes());
ctx.writeAndFlush(msg);
}
}
/*** 产生异样后的解决 */
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {cause.printStackTrace();
ctx.close();}
}
import io.netty.bootstrap.Bootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.DelimiterBasedFrameDecoder;
/**
* 客户端
*/
public class DelimiterEchoClient {
private final String host;
public DelimiterEchoClient(String host) {this.host = host;}
public void start() throws InterruptedException {EventLoopGroup group = new NioEventLoopGroup();/* 线程组 */
try {final Bootstrap b = new Bootstrap();;/* 客户端启动必须 */
b.group(group)/* 将线程组传入 */
.channel(NioSocketChannel.class)/* 指定应用 NIO 进行网络传输 */
.remoteAddress(new InetSocketAddress(host,DelimiterEchoServer.PORT))/* 配置要连贯服务器的 ip 地址和端口 */
.handler(new ChannelInitializerImp());
ChannelFuture f = b.connect().sync();
System.out.println("已连贯到服务器.....");
f.channel().closeFuture().sync();} finally {group.shutdownGracefully().sync();}
}
private static class ChannelInitializerImp extends ChannelInitializer<Channel> {
@Override
protected void initChannel(Channel ch) throws Exception {
ByteBuf delimiter
= Unpooled.copiedBuffer(DelimiterEchoServer.DELIMITER_SYMBOL
.getBytes());
ch.pipeline().addLast( new DelimiterBasedFrameDecoder(1024,
delimiter));
ch.pipeline().addLast(new DelimiterClientHandler());
}
}
public static void main(String[] args) throws InterruptedException {new DelimiterEchoClient("127.0.0.1").start();}
}
要害代码:
1. 建设连贯后, 客户端给服务端发数据包, 每次发送已特殊字符 `
@~ 结尾。
2. 服务端收到数据包后通过 DelimiterBasedFrameDecoder 即分隔符根底框架解码器解码为一个个带有分隔符的数据包。
3. 再到服务端的业务层处理器 DelimiterServerHandler