上一篇咱们讲了《Socket粘包问题的3种解决方案》,但没想到评论区居然炸了。介于大家的激情探讨,以及不同的反馈意见,本文就来做一个扩大和延长,试图找到问题的最优解,以及音讯通信的最优解决方案。

在正式开始之前,咱们先对上篇评论中的几个典型问题做一个简略的回复,不感兴趣的敌人可间接划过。

问题一:TCP存在粘包问题吗?

先说答案:TCP 自身并没有粘包和半包一说,因为 TCP 实质上只是一个传输控制协议(Transmission Control Protocol,TCP),它是一种面向连贯的、牢靠的、基于字节流的传输层通信协议,由 IETF 的 RFC 793 定义。

所谓的协定实质上是一个约定,就好比 Java 编程约定应用驼峰命名法一样,约定的意义是为了让通信单方,可能失常的进行音讯调换的,那粘包和半包问题又是如何产生的呢?

这是因为在 TCP 的交互中,数据是以字节流的模式进行传输的,而“流”的传输是没有边界的,因为没有边界所以就不能辨别音讯的归属,从而就会产生粘包和半包问题(粘包和半包的定义,详见上一篇)。所以说 TCP 协定自身并不存在粘包和半包问题,只是在应用中如果不能无效的确定流的边界就会产生粘包和半包问题。

问题二:分隔符是最优解决方案?

坦率的说,通过评论区大家的急躁“开导”,我也意识到了以结束符作为最终的解决方案存在肯定的局限性,比方当一条音讯两头如果呈现了结束符就会造成半包的问题,所以如果是简单的字符串要对内容进行编码和解码解决,这样能力保障结束符的正确性。

问题三:Socket 高效吗?

这个问题的答案是否定的,其实上文在结尾曾经形容了利用场景:「传统的 Socket 编程」,学习它的意义就在于了解更晚期更底层的一些常识,当然作为补充本文会提供更加高效的音讯通信计划——Netty 通信。


聊完了以上问题,接下来咱们先来补充一下上篇文章中提到的,将音讯分为音讯头和音讯体的代码实现。

一、封装音讯头和音讯体

在开始写服务器端和客户端之前,咱们先来编写一个音讯的封装类,应用它能够将音讯封装成音讯头和音讯体,如下图所示:

音讯头中存储音讯体的长度,从而确定了音讯的边界,便解决粘包和半包问题。

1.音讯封装类

音讯的封装类中提供了两个办法:一个是将音讯转换成音讯头 + 音讯体的办法,另一个是读取音讯头的办法,具体实现代码如下:

/** * 音讯封装类 */class SocketPacket {    // 音讯头存储的长度(占 8 字节)    static final int HEAD_SIZE = 8;    /**     * 将协定封装为:协定头 + 协定体     * @param context 音讯体(String 类型)     * @return byte[]     */    public byte[] toBytes(String context) {        // 协定体 byte 数组        byte[] bodyByte = context.getBytes();        int bodyByteLength = bodyByte.length;        // 最终封装对象        byte[] result = new byte[HEAD_SIZE + bodyByteLength];        // 借助 NumberFormat 将 int 转换为 byte[]        NumberFormat numberFormat = NumberFormat.getNumberInstance();        numberFormat.setMinimumIntegerDigits(HEAD_SIZE);        numberFormat.setGroupingUsed(false);        // 协定头 byte 数组        byte[] headByte = numberFormat.format(bodyByteLength).getBytes();        // 封装协定头        System.arraycopy(headByte, 0, result, 0, HEAD_SIZE);        // 封装协定体        System.arraycopy(bodyByte, 0, result, HEAD_SIZE, bodyByteLength);        return result;    }    /**     * 获取音讯头的内容(也就是音讯体的长度)     * @param inputStream     * @return     */    public int getHeader(InputStream inputStream) throws IOException {        int result = 0;        byte[] bytes = new byte[HEAD_SIZE];        inputStream.read(bytes, 0, HEAD_SIZE);        // 失去音讯体的字节长度        result = Integer.valueOf(new String(bytes));        return result;    }}

2.编写客户端

接下来咱们来定义客户端,在客户端中咱们增加一组待发送的音讯,随机给服务器端发送一个音讯,实现代码如下:

/** * 客户端 */class MySocketClient {    public static void main(String[] args) throws IOException {        // 启动 Socket 并尝试连贯服务器        Socket socket = new Socket("127.0.0.1", 9093);        // 发送音讯合集(随机发送一条音讯)        final String[] message = {"Hi,Java.", "Hi,SQL~", "关注公众号|Java中文社群."};        // 创立协定封装对象        SocketPacket socketPacket = new SocketPacket();        try (OutputStream outputStream = socket.getOutputStream()) {            // 给服务器端发送 10 次音讯            for (int i = 0; i < 10; i++) {                // 随机发送一条音讯                String msg = message[new Random().nextInt(message.length)];                // 将内容封装为:协定头+协定体                byte[] bytes = socketPacket.toBytes(msg);                // 发送音讯                outputStream.write(bytes, 0, bytes.length);                outputStream.flush();            }        }    }}

3.编写服务器端

服务器端咱们应用线程池来解决每个客户端的业务申请,实现代码如下:

/** * 服务器端 */class MySocketServer {    public static void main(String[] args) throws IOException {        // 创立 Socket 服务器端        ServerSocket serverSocket = new ServerSocket(9093);        // 获取客户端连贯        Socket clientSocket = serverSocket.accept();        // 应用线程池解决更多的客户端        ThreadPoolExecutor threadPool = new ThreadPoolExecutor(100, 150, 100,                TimeUnit.SECONDS, new LinkedBlockingQueue<>(1000));        threadPool.submit(() -> {            // 客户端音讯解决            processMessage(clientSocket);        });    }    /**     * 客户端音讯解决     * @param clientSocket     */    private static void processMessage(Socket clientSocket) {        // Socket 封装对象        SocketPacket socketPacket = new SocketPacket();        // 获取客户端发送的音讯对象        try (InputStream inputStream = clientSocket.getInputStream()) {            while (true) {                // 获取音讯头(也就是音讯体的长度)                int bodyLength = socketPacket.getHeader(inputStream);                // 音讯体 byte 数组                byte[] bodyByte = new byte[bodyLength];                // 每次理论读取字节数                int readCount = 0;                // 音讯体赋值下标                int bodyIndex = 0;                // 循环接管音讯头中定义的长度                while (bodyIndex <= (bodyLength - 1) &&                        (readCount = inputStream.read(bodyByte, bodyIndex, bodyLength)) != -1) {                    bodyIndex += readCount;                }                bodyIndex = 0;                // 胜利接管到客户端的音讯并打印                System.out.println("接管到客户端的信息:" + new String(bodyByte));            }        } catch (IOException ioException) {            System.out.println(ioException.getMessage());        }    }}

以上程序的执行后果如下:

从上述后果能够看出,音讯通信失常,客户端和服务器端的交互中并没有呈现粘包和半包的问题。

二、应用 Netty 实现高效通信

以上的内容都是针对传统 Socket 编程的,但要实现更加高效的通信和连贯对象的复用就要应用 NIO(Non-Blocking IO,非阻塞 IO)或者 AIO(Asynchronous IO,异步非阻塞 IO)了。

传统的 Socket 编程是 BIO(Blocking IO,同步阻塞 IO),它和 NIO 和 AIO 的区别如下:

  • BIO 来自传统的 java.io 包,它是基于流模型实现的,交互的形式是同步、阻塞形式,也就是说在读入输出流或者输入流时,在读写动作实现之前,线程会始终阻塞在那里,它们之间的调用是牢靠的线性程序。它的长处就是代码比较简单、直观;毛病就是 IO 的效率和扩展性很低,容易成为利用性能瓶颈。
  • NIO 是 Java 1.4 引入的 java.nio 包,提供了 Channel、Selector、Buffer 等新的形象,能够构建多路复用的、同步非阻塞 IO 程序,同时提供了更靠近操作系统底层高性能的数据操作形式。
  • AIO 是 Java 1.7 之后引入的包,是 NIO 的降级版本,提供了异步非梗塞的 IO 操作形式,因而人们叫它 AIO(Asynchronous IO),异步 IO 是基于事件和回调机制实现的,也就是利用操作之后会间接返回,不会梗塞在那里,当后盾解决实现,操作系统会告诉相应的线程进行后续的操作。
PS:AIO 能够看作是 NIO 的降级,它也叫 NIO 2。

传统 Socket 的通信流程:

NIO 的通信流程:

应用 Netty 代替传统 NIO 编程

NIO 的设计思路尽管很好,但它的代码编写比拟麻烦,比方 Buffer 的应用和 Selector 的编写等。并且在面对断线重连、包失落和粘包等简单问题时手动解决的老本都很大,因而咱们通常会应用 Netty 框架来代替传统的 NIO。

Netty 是什么?

Netty 是一个异步、事件驱动的用来做高性能、高可靠性的网络应用框架,应用它能够疾速轻松地开发网络应用程序,极大的简化了网络编程的复杂度。

Netty 次要长处有以下几个:

  1. 框架设计优雅,底层模型随便切换适应不同的网络协议要求;
  2. 提供很多规范的协定、平安、编码解码的反对;
  3. 简化了 NIO 应用中的诸多不便;
  4. 社区十分沉闷,很多开源框架中都应用了 Netty 框架,如 Dubbo、RocketMQ、Spark 等。

Netty 次要蕴含以下 3 个局部,如下图所示:

这 3 个局部的性能介绍如下。

1. Core 核心层

Core 核心层是 Netty 最精髓的内容,它提供了底层网络通信的通用形象和实现,包含可扩大的事件模型、通用的通信 API、反对零拷贝的 ByteBuf 等。

2. Protocol Support 协定反对层

协定反对层基本上笼罩了支流协定的编解码实现,如 HTTP、SSL、Protobuf、压缩、大文件传输、WebSocket、文本、二进制等支流协定,此外 Netty 还反对自定义应用层协定。Netty 丰盛的协定反对升高了用户的开发成本,基于 Netty 咱们能够疾速开发 HTTP、WebSocket 等服务。

3. Transport Service 传输服务层

传输服务层提供了网络传输能力的定义和实现办法。它反对 Socket、HTTP 隧道、虚拟机管道等传输方式。Netty 对 TCP、UDP 等数据传输做了形象和封装,用户能够更聚焦在业务逻辑实现上,而不用关系底层数据传输的细节。

Netty 应用

对 Netty 有了大略的意识之后,接下来咱们用 Netty 来编写一个根底的通信服务器,它蕴含两个端:服务器端和客户端,客户端负责发送音讯,服务器端负责接管并打印消息,具体的实现步骤如下。

1.增加 Netty 框架

首先咱们须要先增加 Netty 框架的反对,如果是 Maven 我的项目增加如下配置即可:

<!-- 增加 Netty 框架 --><!-- https://mvnrepository.com/artifact/io.netty/netty-all --><dependency>    <groupId>io.netty</groupId>    <artifactId>netty-all</artifactId>    <version>4.1.56.Final</version></dependency>
Netty 版本阐明

Netty 的 3.x 和 4.x 为支流的稳固版本,而最新的 5.x 曾经是放弃的测试版了,因而举荐应用 Netty 4.x 的最新稳定版。

2. 服务器端实现代码

依照官网的举荐,这里将服务器端的代码分为以下 3 个局部:

  • MyNettyServer:服务器端的外围业务代码;
  • ServerInitializer:服务器端通道(Channel)初始化;
  • ServerHandler:服务器端接管到信息之后的解决逻辑。
PS:Channel 字面意思为“通道”,它是网络通信的载体。Channel 提供了根本的 API 用于网络 I/O 操作,如 register、bind、connect、read、write、flush 等。Netty 本人实现的 Channel 是以 JDK NIO Channel 为根底的,相比拟于 JDK NIO,Netty 的 Channel 提供了更高层次的形象,同时屏蔽了底层 Socket 的复杂性,赋予了 Channel 更加弱小的性能,你在应用 Netty 时根本不须要再与 Java Socket 类间接打交道。

服务器端的实现代码如下:

// 定义服务器的端口号static final int PORT = 8007;/** * 服务器端 */static class MyNettyServer {    public static void main(String[] args) {        // 创立一个线程组,用来负责接管客户端连贯        EventLoopGroup bossGroup = new NioEventLoopGroup();        // 创立另一个线程组,用来负责 I/O 的读写        EventLoopGroup workerGroup = new NioEventLoopGroup();        try {            // 创立一个 Server 实例(可了解为 Netty 的入门类)            ServerBootstrap b = new ServerBootstrap();            // 将两个线程池设置到 Server 实例            b.group(bossGroup, workerGroup)                    // 设置 Netty 通道的类型为 NioServerSocket(非阻塞 I/O Socket 服务器)                    .channel(NioServerSocketChannel.class)                    // 设置建设连贯之后的执行器(ServerInitializer 是我创立的一个自定义类)                    .childHandler(new ServerInitializer());            // 绑定端口并且进行同步            ChannelFuture future = b.bind(PORT).sync();            // 对敞开通道进行监听            future.channel().closeFuture().sync();        } catch (InterruptedException e) {            e.printStackTrace();        } finally {            // 资源敞开            bossGroup.shutdownGracefully();            workerGroup.shutdownGracefully();        }    }}/** * 服务端通道初始化 */static class ServerInitializer extends ChannelInitializer<SocketChannel> {    // 字符串编码器和解码器    private static final StringDecoder DECODER = new StringDecoder();    private static final StringEncoder ENCODER = new StringEncoder();    // 服务器端连贯之后的执行器(自定义的类)    private static final ServerHandler SERVER_HANDLER = new ServerHandler();    /**     * 初始化通道的具体执行办法     */    @Override    public void initChannel(SocketChannel ch) {        // 通道 Channel 设置        ChannelPipeline pipeline = ch.pipeline();        // 设置(字符串)编码器和解码器        pipeline.addLast(DECODER);        pipeline.addLast(ENCODER);        // 服务器端连贯之后的执行器,接管到音讯之后的业务解决        pipeline.addLast(SERVER_HANDLER);    }}/** * 服务器端接管到音讯之后的业务解决类 */static class ServerHandler extends SimpleChannelInboundHandler<String> {    /**     * 读取到客户端的音讯     */    @Override    public void channelRead0(ChannelHandlerContext ctx, String request) {        if (!request.isEmpty()) {            System.out.println("接到客户端的音讯:" + request);        }    }    /**     * 数据读取结束     */    @Override    public void channelReadComplete(ChannelHandlerContext ctx) {        ctx.flush();    }    /**     * 异样解决,打印异样并敞开通道     */    @Override    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {        cause.printStackTrace();        ctx.close();    }}

3.客户端实现代码

客户端的代码实现也是分为以下 3 个局部:

  • MyNettyClient:客户端外围业务代码;
  • ClientInitializer:客户端通道初始化;
  • ClientHandler:接管到音讯之后的解决逻辑。

客户端的实现代码如下:

/** * 客户端 */static class MyNettyClient {    public static void main(String[] args) {        // 创立事件循环线程组(客户端的线程组只有一个)        EventLoopGroup group = new NioEventLoopGroup();        try {            // Netty 客户端启动对象            Bootstrap b = new Bootstrap();            // 设置启动参数            b.group(group)                    // 设置通道类型                    .channel(NioSocketChannel.class)                    // 设置启动执行器(负责启动事件的业务执行,ClientInitializer 为自定义的类)                    .handler(new ClientInitializer());            // 连贯服务器端并同步通道            Channel ch = b.connect("127.0.0.1", 8007).sync().channel();            // 发送音讯            ChannelFuture lastWriteFuture = null;            // 给服务器端发送 10 条音讯            for (int i = 0; i < 10; i++) {                // 发送给服务器音讯                lastWriteFuture = ch.writeAndFlush("Hi,Java.");            }            // 在敞开通道之前,同步刷新所有的音讯            if (lastWriteFuture != null) {                lastWriteFuture.sync();            }        } catch (InterruptedException e) {            e.printStackTrace();        } finally {            // 开释资源            group.shutdownGracefully();        }    }}/** * 客户端通道初始化类 */static class ClientInitializer extends ChannelInitializer<SocketChannel> {    // 字符串编码器和解码器    private static final StringDecoder DECODER = new StringDecoder();    private static final StringEncoder ENCODER = new StringEncoder();    // 客户端连贯胜利之后业务解决    private static final ClientHandler CLIENT_HANDLER = new ClientHandler();    /**     * 初始化客户端通道     */    @Override    public void initChannel(SocketChannel ch) {        ChannelPipeline pipeline = ch.pipeline();        // 设置(字符串)编码器和解码器        pipeline.addLast(DECODER);        pipeline.addLast(ENCODER);        // 客户端连贯胜利之后的业务解决        pipeline.addLast(CLIENT_HANDLER);    }}/** * 客户端连贯胜利之后的业务解决 */static class ClientHandler extends SimpleChannelInboundHandler<String> {    /**     * 读取到服务器端的音讯     */    @Override    protected void channelRead0(ChannelHandlerContext ctx, String msg) {        System.err.println("接到服务器的音讯:" + msg);    }    /**     * 异样解决,打印异样并敞开通道     */    @Override    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {        cause.printStackTrace();        ctx.close();    }}

从以上代码能够看出,咱们代码实现的性能是,客户端给服务器端发送 10 条音讯。

编写完上述代码之后,咱们就能够启动服务器端和客户端了,启动之后,它们的执行后果如下:

从上述后果中能够看出,尽管客户端和服务器端实现了通信,但在 Netty 的应用中仍然存在粘包的问题,服务器端一次收到了 10 条音讯,而不是每次只收到一条音讯,因而接下来咱们要解决掉 Netty 中的粘包问题。

三、解决 Netty 粘包问题

在 Netty 中,解决粘包问题的罕用计划有以下 3 种:

  1. 设置固定大小的音讯长度,如果长度有余则应用空字符补救,它的毛病比拟显著,比拟耗费网络流量,因而不倡议应用;
  2. 应用分隔符来确定音讯的边界,从而防止粘包和半包问题的产生;
  3. 将音讯分为音讯头和音讯体,在头部中保留有以后整个音讯的长度,只有在读取到足够长度的音讯之后才算是读到了一个残缺的音讯。

接下来咱们别离来看后两种举荐的解决方案。

1.应用分隔符解决粘包问题

在 Netty 中提供了 DelimiterBasedFrameDecoder 类用来以特殊符号作为音讯的结束符,从而解决粘包和半包的问题。

它的外围实现代码是在初始化通道(Channel)时,通过设置 DelimiterBasedFrameDecoder 来分隔音讯,须要在客户端和服务器端都进行设置,具体实现代码如下。

服务器端外围实现代码如下:

/** * 服务端通道初始化 */static class ServerInitializer extends ChannelInitializer<SocketChannel> {    // 字符串编码器和解码器    private static final StringDecoder DECODER = new StringDecoder();    private static final StringEncoder ENCODER = new StringEncoder();    // 服务器端连贯之后的执行器(自定义的类)    private static final ServerHandler SERVER_HANDLER = new ServerHandler();    /**     * 初始化通道的具体执行办法     */    @Override    public void initChannel(SocketChannel ch) {        // 通道 Channel 设置        ChannelPipeline pipeline = ch.pipeline();        // 19 行:设置结尾分隔符【外围代码】(参数1:为音讯的最大长度,可自定义;参数2:分隔符[此处以换行符为分隔符])        pipeline.addLast(new DelimiterBasedFrameDecoder(1024, Delimiters.lineDelimiter()));        // 设置(字符串)编码器和解码器        pipeline.addLast(DECODER);        pipeline.addLast(ENCODER);        // 服务器端连贯之后的执行器,接管到音讯之后的业务解决        pipeline.addLast(SERVER_HANDLER);    }}

外围代码为第 19 行,代码中曾经备注了办法的含意,这里就不再赘述。

客户端的外围实现代码如下:

/** * 客户端通道初始化类 */static class ClientInitializer extends ChannelInitializer<SocketChannel> {    // 字符串编码器和解码器    private static final StringDecoder DECODER = new StringDecoder();    private static final StringEncoder ENCODER = new StringEncoder();    // 客户端连贯胜利之后业务解决    private static final ClientHandler CLIENT_HANDLER = new ClientHandler();    /**     * 初始化客户端通道     */    @Override    public void initChannel(SocketChannel ch) {        ChannelPipeline pipeline = ch.pipeline();        // 17 行:设置结尾分隔符【外围代码】(参数1:为音讯的最大长度,可自定义;参数2:分隔符[此处以换行符为分隔符])        pipeline.addLast(new DelimiterBasedFrameDecoder(1024, Delimiters.lineDelimiter()));        // 设置(字符串)编码器和解码器        pipeline.addLast(DECODER);        pipeline.addLast(ENCODER);        // 客户端连贯胜利之后的业务解决        pipeline.addLast(CLIENT_HANDLER);    }}

残缺的服务器端和客户端的实现代码如下:

import io.netty.bootstrap.Bootstrap;import io.netty.bootstrap.ServerBootstrap;import io.netty.channel.*;import io.netty.channel.nio.NioEventLoopGroup;import io.netty.channel.socket.SocketChannel;import io.netty.channel.socket.nio.NioServerSocketChannel;import io.netty.channel.socket.nio.NioSocketChannel;import io.netty.handler.codec.DelimiterBasedFrameDecoder;import io.netty.handler.codec.Delimiters;import io.netty.handler.codec.string.StringDecoder;import io.netty.handler.codec.string.StringEncoder;public class NettyExample {    // 定义服务器的端口号    static final int PORT = 8007;    /**     * 服务器端     */    static class MyNettyServer {        public static void main(String[] args) {            // 创立一个线程组,用来负责接管客户端连贯            EventLoopGroup bossGroup = new NioEventLoopGroup();            // 创立另一个线程组,用来负责 I/O 的读写            EventLoopGroup workerGroup = new NioEventLoopGroup();            try {                // 创立一个 Server 实例(可了解为 Netty 的入门类)                ServerBootstrap b = new ServerBootstrap();                // 将两个线程池设置到 Server 实例                b.group(bossGroup, workerGroup)                        // 设置 Netty 通道的类型为 NioServerSocket(非阻塞 I/O Socket 服务器)                        .channel(NioServerSocketChannel.class)                        // 设置建设连贯之后的执行器(ServerInitializer 是我创立的一个自定义类)                        .childHandler(new ServerInitializer());                // 绑定端口并且进行同步                ChannelFuture future = b.bind(PORT).sync();                // 对敞开通道进行监听                future.channel().closeFuture().sync();            } catch (InterruptedException e) {                e.printStackTrace();            } finally {                // 资源敞开                bossGroup.shutdownGracefully();                workerGroup.shutdownGracefully();            }        }    }    /**     * 服务端通道初始化     */    static class ServerInitializer extends ChannelInitializer<SocketChannel> {        // 字符串编码器和解码器        private static final StringDecoder DECODER = new StringDecoder();        private static final StringEncoder ENCODER = new StringEncoder();        // 服务器端连贯之后的执行器(自定义的类)        private static final ServerHandler SERVER_HANDLER = new ServerHandler();        /**         * 初始化通道的具体执行办法         */        @Override        public void initChannel(SocketChannel ch) {            // 通道 Channel 设置            ChannelPipeline pipeline = ch.pipeline();            // 设置结尾分隔符            pipeline.addLast(new DelimiterBasedFrameDecoder(1024, Delimiters.lineDelimiter()));            // 设置(字符串)编码器和解码器            pipeline.addLast(DECODER);            pipeline.addLast(ENCODER);            // 服务器端连贯之后的执行器,接管到音讯之后的业务解决            pipeline.addLast(SERVER_HANDLER);        }    }    /**     * 服务器端接管到音讯之后的业务解决类     */    static class ServerHandler extends SimpleChannelInboundHandler<String> {        /**         * 读取到客户端的音讯         */        @Override        public void channelRead0(ChannelHandlerContext ctx, String request) {            if (!request.isEmpty()) {                System.out.println("接到客户端的音讯:" + request);            }        }        /**         * 数据读取结束         */        @Override        public void channelReadComplete(ChannelHandlerContext ctx) {            ctx.flush();        }        /**         * 异样解决,打印异样并敞开通道         */        @Override        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {            cause.printStackTrace();            ctx.close();        }    }    /**     * 客户端     */    static class MyNettyClient {        public static void main(String[] args) {            // 创立事件循环线程组(客户端的线程组只有一个)            EventLoopGroup group = new NioEventLoopGroup();            try {                // Netty 客户端启动对象                Bootstrap b = new Bootstrap();                // 设置启动参数                b.group(group)                        // 设置通道类型                        .channel(NioSocketChannel.class)                        // 设置启动执行器(负责启动事件的业务执行,ClientInitializer 为自定义的类)                        .handler(new ClientInitializer());                // 连贯服务器端并同步通道                Channel ch = b.connect("127.0.0.1", PORT).sync().channel();                // 发送音讯                ChannelFuture lastWriteFuture = null;                // 给服务器端发送 10 条音讯                for (int i = 0; i < 10; i++) {                    // 发送给服务器音讯                    lastWriteFuture = ch.writeAndFlush("Hi,Java.\n");                }                // 在敞开通道之前,同步刷新所有的音讯                if (lastWriteFuture != null) {                    lastWriteFuture.sync();                }            } catch (InterruptedException e) {                e.printStackTrace();            } finally {                // 开释资源                group.shutdownGracefully();            }        }    }    /**     * 客户端通道初始化类     */    static class ClientInitializer extends ChannelInitializer<SocketChannel> {        // 字符串编码器和解码器        private static final StringDecoder DECODER = new StringDecoder();        private static final StringEncoder ENCODER = new StringEncoder();        // 客户端连贯胜利之后业务解决        private static final ClientHandler CLIENT_HANDLER = new ClientHandler();        /**         * 初始化客户端通道         */        @Override        public void initChannel(SocketChannel ch) {            ChannelPipeline pipeline = ch.pipeline();            // 设置结尾分隔符            pipeline.addLast(new DelimiterBasedFrameDecoder(1024, Delimiters.lineDelimiter()));            // 设置(字符串)编码器和解码器            pipeline.addLast(DECODER);            pipeline.addLast(ENCODER);            // 客户端连贯胜利之后的业务解决            pipeline.addLast(CLIENT_HANDLER);        }    }    /**     * 客户端连贯胜利之后的业务解决     */    static class ClientHandler extends SimpleChannelInboundHandler<String> {        /**         * 读取到服务器端的音讯         */        @Override        protected void channelRead0(ChannelHandlerContext ctx, String msg) {            System.err.println("接到服务器的音讯:" + msg);        }        /**         * 异样解决,打印异样并敞开通道         */        @Override        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {            cause.printStackTrace();            ctx.close();        }    }}

最终的执行后果如下图所示:

从上述后果中能够看出,Netty 能够失常应用了,它曾经不存在粘包和半包问题了。

2.封装音讯解决粘包问题

此解决方案的外围是将音讯分为音讯头 + 音讯体,在音讯头中保留音讯体的长度,从而确定一条音讯的边界,这样就防止了粘包和半包问题了,它的实现过程如下图所示:

在 Netty 中能够通过 LengthFieldPrepender(编码)和 LengthFieldBasedFrameDecoder(解码)两个类实现音讯的封装。和上一个解决方案相似,咱们须要别离在服务器端和客户端通过设置通道(Channel)来解决粘包问题。

服务器端的外围代码如下:

/** * 服务端通道初始化 */static class ServerInitializer extends ChannelInitializer<SocketChannel> {    // 字符串编码器和解码器    private static final StringDecoder DECODER = new StringDecoder();    private static final StringEncoder ENCODER = new StringEncoder();    // 服务器端连贯之后的执行器(自定义的类)    private static final NettyExample.ServerHandler SERVER_HANDLER = new NettyExample.ServerHandler();    /**     * 初始化通道的具体执行办法     */    @Override    public void initChannel(SocketChannel ch) {        // 通道 Channel 设置        ChannelPipeline pipeline = ch.pipeline();        // 18 行:音讯解码:读取音讯头和音讯体        pipeline.addLast(new LengthFieldBasedFrameDecoder(1024, 0, 4, 0, 4));        // 20 行:音讯编码:将音讯封装为音讯头和音讯体,在音讯前增加音讯体的长度        pipeline.addLast(new LengthFieldPrepender(4));        // 设置(字符串)编码器和解码器        pipeline.addLast(DECODER);        pipeline.addLast(ENCODER);        // 服务器端连贯之后的执行器,接管到音讯之后的业务解决        pipeline.addLast(SERVER_HANDLER);    }}

其中外围代码是 18 行和 20 行,通过 LengthFieldPrepender 实现编码(将音讯打包成音讯头 + 音讯体),通过 LengthFieldBasedFrameDecoder 实现解码(从封装的音讯中取出音讯的内容)。

LengthFieldBasedFrameDecoder 的参数阐明如下:

  • 参数 1:maxFrameLength - 发送的数据包最大长度;
  • 参数 2:lengthFieldOffset - 长度域偏移量,指的是长度域位于整个数据包字节数组中的下标;
  • 参数 3:lengthFieldLength - 长度域本人的字节数长度;
  • 参数 4:lengthAdjustment – 长度域的偏移量改正。 如果长度域的值,除了蕴含无效数据域的长度外,还蕴含了其余域(如长度域本身)长度,那么,就须要进行改正。改正的值为:包长 - 长度域的值 – 长度域偏移 – 长度域长;
  • 参数 5:initialBytesToStrip – 抛弃的起始字节数。抛弃处于无效数据后面的字节数量。比方后面有 4 个节点的长度域,则它的值为 4。

LengthFieldBasedFrameDecoder(1024,0,4,0,4) 的意思是:数据包最大长度为 1024,长度域占首部的四个字节,在读数据的时候去掉首部四个字节(即长度域)。

客户端的外围实现代码如下:

/** * 客户端通道初始化类 */static class ClientInitializer extends ChannelInitializer<SocketChannel> {    // 字符串编码器和解码器    private static final StringDecoder DECODER = new StringDecoder();    private static final StringEncoder ENCODER = new StringEncoder();    // 客户端连贯胜利之后业务解决    private static final NettyExample.ClientHandler CLIENT_HANDLER = new NettyExample.ClientHandler();    /**     * 初始化客户端通道     */    @Override    public void initChannel(SocketChannel ch) {        ChannelPipeline pipeline = ch.pipeline();        // 音讯解码:读取音讯头和音讯体        pipeline.addLast(new LengthFieldBasedFrameDecoder(1024, 0, 4, 0, 4));        // 音讯编码:将音讯封装为音讯头和音讯体,在响应字节数据后面增加音讯体长度        pipeline.addLast(new LengthFieldPrepender(4));        // 设置(字符串)编码器和解码器        pipeline.addLast(DECODER);        pipeline.addLast(ENCODER);        // 客户端连贯胜利之后的业务解决        pipeline.addLast(CLIENT_HANDLER);    }}

残缺的服务器端和客户端的实现代码如下:

import io.netty.bootstrap.Bootstrap;import io.netty.bootstrap.ServerBootstrap;import io.netty.channel.*;import io.netty.channel.nio.NioEventLoopGroup;import io.netty.channel.socket.SocketChannel;import io.netty.channel.socket.nio.NioServerSocketChannel;import io.netty.channel.socket.nio.NioSocketChannel;import io.netty.handler.codec.LengthFieldBasedFrameDecoder;import io.netty.handler.codec.LengthFieldPrepender;import io.netty.handler.codec.string.StringDecoder;import io.netty.handler.codec.string.StringEncoder;/** * 通过封装 Netty 来解决粘包 */public class NettyExample {    // 定义服务器的端口号    static final int PORT = 8007;    /**     * 服务器端     */    static class MyNettyServer {        public static void main(String[] args) {            // 创立一个线程组,用来负责接管客户端连贯            EventLoopGroup bossGroup = new NioEventLoopGroup();            // 创立另一个线程组,用来负责 I/O 的读写            EventLoopGroup workerGroup = new NioEventLoopGroup();            try {                // 创立一个 Server 实例(可了解为 Netty 的入门类)                ServerBootstrap b = new ServerBootstrap();                // 将两个线程池设置到 Server 实例                b.group(bossGroup, workerGroup)                        // 设置 Netty 通道的类型为 NioServerSocket(非阻塞 I/O Socket 服务器)                        .channel(NioServerSocketChannel.class)                        // 设置建设连贯之后的执行器(ServerInitializer 是我创立的一个自定义类)                        .childHandler(new NettyExample.ServerInitializer());                // 绑定端口并且进行同步                ChannelFuture future = b.bind(PORT).sync();                // 对敞开通道进行监听                future.channel().closeFuture().sync();            } catch (InterruptedException e) {                e.printStackTrace();            } finally {                // 资源敞开                bossGroup.shutdownGracefully();                workerGroup.shutdownGracefully();            }        }    }    /**     * 服务端通道初始化     */    static class ServerInitializer extends ChannelInitializer<SocketChannel> {        // 字符串编码器和解码器        private static final StringDecoder DECODER = new StringDecoder();        private static final StringEncoder ENCODER = new StringEncoder();        // 服务器端连贯之后的执行器(自定义的类)        private static final NettyExample.ServerHandler SERVER_HANDLER = new NettyExample.ServerHandler();        /**         * 初始化通道的具体执行办法         */        @Override        public void initChannel(SocketChannel ch) {            // 通道 Channel 设置            ChannelPipeline pipeline = ch.pipeline();            // 音讯解码:读取音讯头和音讯体            pipeline.addLast(new LengthFieldBasedFrameDecoder(1024, 0, 4, 0, 4));            // 音讯编码:将音讯封装为音讯头和音讯体,在响应字节数据后面增加音讯体长度            pipeline.addLast(new LengthFieldPrepender(4));            // 设置(字符串)编码器和解码器            pipeline.addLast(DECODER);            pipeline.addLast(ENCODER);            // 服务器端连贯之后的执行器,接管到音讯之后的业务解决            pipeline.addLast(SERVER_HANDLER);        }    }    /**     * 服务器端接管到音讯之后的业务解决类     */    static class ServerHandler extends SimpleChannelInboundHandler<String> {        /**         * 读取到客户端的音讯         */        @Override        public void channelRead0(ChannelHandlerContext ctx, String request) {            if (!request.isEmpty()) {                System.out.println("接到客户端的音讯:" + request);            }        }        /**         * 数据读取结束         */        @Override        public void channelReadComplete(ChannelHandlerContext ctx) {            ctx.flush();        }        /**         * 异样解决,打印异样并敞开通道         */        @Override        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {            cause.printStackTrace();            ctx.close();        }    }    /**     * 客户端     */    static class MyNettyClient {        public static void main(String[] args) {            // 创立事件循环线程组(客户端的线程组只有一个)            EventLoopGroup group = new NioEventLoopGroup();            try {                // Netty 客户端启动对象                Bootstrap b = new Bootstrap();                // 设置启动参数                b.group(group)                        // 设置通道类型                        .channel(NioSocketChannel.class)                        // 设置启动执行器(负责启动事件的业务执行,ClientInitializer 为自定义的类)                        .handler(new NettyExample.ClientInitializer());                // 连贯服务器端并同步通道                Channel ch = b.connect("127.0.0.1", PORT).sync().channel();                // 发送音讯                ChannelFuture lastWriteFuture = null;                // 给服务器端发送 10 条音讯                for (int i = 0; i < 10; i++) {                    // 发送给服务器音讯                    lastWriteFuture = ch.writeAndFlush("Hi,Java.\n");                }                // 在敞开通道之前,同步刷新所有的音讯                if (lastWriteFuture != null) {                    lastWriteFuture.sync();                }            } catch (InterruptedException e) {                e.printStackTrace();            } finally {                // 开释资源                group.shutdownGracefully();            }        }    }    /**     * 客户端通道初始化类     */    static class ClientInitializer extends ChannelInitializer<SocketChannel> {        // 字符串编码器和解码器        private static final StringDecoder DECODER = new StringDecoder();        private static final StringEncoder ENCODER = new StringEncoder();        // 客户端连贯胜利之后业务解决        private static final NettyExample.ClientHandler CLIENT_HANDLER = new NettyExample.ClientHandler();        /**         * 初始化客户端通道         */        @Override        public void initChannel(SocketChannel ch) {            ChannelPipeline pipeline = ch.pipeline();            // 音讯解码:读取音讯头和音讯体            pipeline.addLast(new LengthFieldBasedFrameDecoder(1024, 0, 4, 0, 4));            // 音讯编码:将音讯封装为音讯头和音讯体,在响应字节数据后面增加音讯体长度            pipeline.addLast(new LengthFieldPrepender(4));            // 设置(字符串)编码器和解码器            pipeline.addLast(DECODER);            pipeline.addLast(ENCODER);            // 客户端连贯胜利之后的业务解决            pipeline.addLast(CLIENT_HANDLER);        }    }    /**     * 客户端连贯胜利之后的业务解决     */    static class ClientHandler extends SimpleChannelInboundHandler<String> {        /**         * 读取到服务器端的音讯         */        @Override        protected void channelRead0(ChannelHandlerContext ctx, String msg) {            System.err.println("接到服务器的音讯:" + msg);        }        /**         * 异样解决,打印异样并敞开通道         */        @Override        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {            cause.printStackTrace();            ctx.close();        }    }}

以上程序的执行后果为:

四、总结

本文提供了传统 Socket 通信将音讯分为音讯头和音讯体的具体代码实现,然而传统的 Socket 在性能和复用性上体现个别,为了更加高效的实现通信,咱们能够应用 Netty 框架来代替传统的 Socket 和 NIO 编程,但 Netty 在应用时仍然会呈现粘包的问题,于是咱们提供了两种最常见的解决方案:通过分隔符或将封装音讯的解决方案,其中最初一种解决方案的应用更加宽泛。

参考 & 鸣谢

《Netty 外围原理分析与 RPC 实际》

关注公众号「Java中文社群」发现更多干货。

查看 Github 发现更多精彩:https://github.com/vipstone/a...