关于netty:Netty开发及粘包解决

80次阅读

共计 17505 个字符,预计需要花费 44 分钟才能阅读完成。

1. Netty 介绍

Netty 是一款开源的 Java 网络编程框架,广泛应用于很多高流量的服务器端应用程序:

  • 异步和事件驱动:Netty 基于 NIO(非阻塞 I /O)构建,操作都是异步回调来触发事件,如连贯建设、数据达到等。
  • 高性能:Netty 的一大长处就是高性能。它的设计可能让你最大限度地利用古代的多核硬件。
  • 灵便的协定反对:Netty 反对各种协定,包含 TCP、UDP、HTTP/HTTPS、Unix Socket、WebSockets 等。
  • 零拷贝:Netty 反对“零拷贝”,这能够缩小不必要的零碎调用,显著进步数据处理性能。

Netty 目前最新版本是 4.1.95Final

很久之前 Netty 就公布了 5 的测试版本,市场上都有很多介绍 Netty5 的书在卖了,但惋惜问题太多,最终废除了,目前仍然只保护 4 的版本。

1.1. 组件

1.1.1. EventLoopGroup

EventLoopGroup 是一个线程池,用于治理和调度 EventLoop 对象。在 Netty 中,每个 EventLoopGroup 有一个或多个 EventLoop,用于解决连贯申请和 I/O 操作,而每个 EventLoop 是单线程的。

所以 Netty 能够通过 EventLoopGroup 的结构调参,来实现不同的 Reactor 模型:

(1)既可也是单 Reactor 单线程模型:

EventLoopGroup group = new NioEventLoopGroup(1);
Bootstrap bootstrap = new Bootstrap();
bootstrap.group(group)

(2)也能够是 主从 Reactor 多线程模型:

EventLoopGroup bossGroup = new NioEventLoopGroup();
EventLoopGroup workerGroup = new NioEventLoopGroup(n);
ServerBootstrap bootstrap = new ServerBootstrap();
bootstrap.group(bossGroup, workerGroup)

主从分工

  • BossEventLoopGroup:负责接管客户端的连贯申请。并将连贯申请调配给 workerEventLoopGroup 中的某个 EventLoop 进行解决。BossGroup 中通常只需一个 EventLoop;
  • WorkerEventLoopGroup:负责解决服务器端的连贯和数据读写操作。每个 EventLoop 都绑定在一个具体的线程上,在运行过程中只解决该线程所监听的 IO 事件。workerGroup 通常须要多个 EventLoop。

1.1.2. EventLoop

EventLoop 则是事件循环的外围,负责监听和解决 Channel 中的事件和 I/O 操作。在 EventLoopGroup 中,每个 EventLoop 都是独立的,能够并发地解决多个 Channel 上的事件和 I/O 操作。

1.1.3. Channel 和 ByteBuf

定义

  • Channel:代表了一个网络通道,能够用于进行网络通信。通过应用 Channel,咱们能够连贯到近程服务器、发送和接收数据等。
  • ByteBuf:则是用于治理和操作数据的缓冲区,通过应用 ByteBuf,咱们能够进行数据的读、写、复制、切片、合并等操作。

搭配应用

在 Netty 中,Channel 和 ByteBuf 是紧密结合的,通常一次数据传输会波及到两个 Channel 和两个 ByteBuf 对象,别离代表了发送端和接收端的数据缓冲区。以下是 Channel 和 ByteBuf 的搭配应用流程:

  1. 创立 Channel:首先,咱们须要创立一个 Channel 对象,用于示意一个网络通道。能够通过 Bootstrap 或 ServerBootstrap 类来创立 Channel 对象,并配置其参数和属性。
  2. 写入数据:当须要向近程服务器发送数据时,咱们须要先将数据写入到 ByteBuf 对象中,而后将 ByteBuf 对象写入到 Channel 对象中。在写入数据时,能够通过 write() 或 writeAndFlush() 办法来实现。
  3. 读取数据:当近程服务器发送数据时,咱们须要通过 Channel 对象来读取数据。读取数据时,Channel 对象会将数据存储到 ByteBuf 对象中,咱们能够通过 read() 办法来获取数据或数据大小。在读取数据之后,咱们须要及时开释 ByteBuf 对象,以便防止内存透露和内存溢出等问题。
  4. 开释资源:当数据传输实现后,咱们须要开释 Channel 和 ByteBuf 对象的资源。在 Netty 中,Channel 和 ByteBuf 对象都须要显式地开释资源,以防止内存透露和内存溢出等问题。能够通过 release() 办法来开释 ByteBuf 对象,通过 close() 办法来开释 Channel 对象。

1.1.4. ChannelPipeline 和 Channel

定义

  • Channel:对象示意一个通信通道,能够进行数据的读写和事件的触发等操作。
  • ChannelPipeline:则是一个事件处理器的链表,用于解决 Channel 中的事件和数据。

每个 Channel 都有一个关联的 ChannelPipeline 对象,当有事件产生时,Netty 会将事件从 Channel 中传递到 ChannelPipeline 中,而后依照程序顺次触发各个事件处理器 ChannelHandler 的逻辑。当事件处理完毕后,Netty 会将处理结果返回到 Channel 中,以便进行数据的读写等操作。

在 ChannelPipeline 中,能够增加多个事件处理器,用于解决不同类型的事件和数据。例如,能够增加一个音讯解码器、一个音讯编码器、一个业务逻辑处理器等。每个事件处理器都能够进行特定的逻辑解决,并将处理结果传递给下一个事件处理器。

1.2. 网络协议

Netty 是一个十分弱小和灵便的网络编程框架,它反对多种通信协议。以下是一些 Netty 反对的通信协议:

  • TCP/IP 和 UDP/IP:Netty 提供了底层的网络通信反对,能够构建基于 TCP/IP 或 UDP/IP 的利用。
  • HTTP/HTTPS and HTTP/2:Netty 提供了 HTTP、HTTPS 以及 HTTP/ 2 的高级反对。
  • WebSocket:Netty 反对 WebSocket,容许 Web 浏览器和服务器之间进行全双工通信。
  • Google Protobuf:Netty 为 Google 的 Protobuf 序列化库提供了反对。
  • SSL/TLS:通过 JDK 的 Secure Socket Extension (JSSE),Netty 反对 SSL/TLS 实现平安通信。
  • Unix Domain Socket:从 Netty 4.1 版本开始,Netty 也开始反对 Unix Domain Socket。

因为 Netty 反对的网络协议丰盛,所以当有非 Http 协定网络通信的需要时,大家第一工夫会想到 Netty。

2. 代码示例

2.1. 基于 tcp 协定

2.1.1. 服务端

pom

        <dependency>
            <groupId>io.netty</groupId>
            <artifactId>netty-all</artifactId>
            <version>4.1.63.Final</version>
        </dependency>

服务端

@Component
public class NettyServer {
    // 创立两个线程组,别离用于接管客户端连贯和解决网络 IO 操作
    private final EventLoopGroup bossGroup = new NioEventLoopGroup();
    private final EventLoopGroup workerGroup = new NioEventLoopGroup();

    @PostConstruct
    public void start() throws InterruptedException {ServerBootstrap b = new ServerBootstrap();
        b.group(bossGroup, workerGroup)
                // 指定应用 NioServerSocketChannel 作为通道实现
                .channel(NioServerSocketChannel.class)
                // 定义 ChannelPipeline(多个 ChannelHandler 组合).childHandler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    public void initChannel(SocketChannel ch) throws Exception {ch.pipeline().addLast(new StringDecoder(CharsetUtil.UTF_8));
                        ch.pipeline().addLast(new StringEncoder(CharsetUtil.UTF_8));
                        ch.pipeline().addLast(new ServerHandler());
                    }
                });
        // 绑定端口,开始接管进来的连贯
        ChannelFuture f = b.bind(8080).sync();
        if (f.isSuccess()) {System.out.println("启动 Netty 服务胜利,端口号:" + 8080);
        }
    }

    @PreDestroy
    public void shutdown() {bossGroup.shutdownGracefully();
        workerGroup.shutdownGracefully();}
}

ChannelHandler 音讯解决

public class ServerHandler extends SimpleChannelInboundHandler<String> {

    @Override
    protected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception {System.out.println("Received message from client:" + msg);
        // 回复音讯给客户端
        //ctx.writeAndFlush("Received your message:" + msg);
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {cause.printStackTrace();
        ctx.close();}
}

2.1.2. 客户端

客户端

@DependsOn({"nettyServer"})
@Component
public class NettyClient {
    private EventLoopGroup group;
    private Channel channel;

    @PostConstruct
    public void start() throws InterruptedException {group = new NioEventLoopGroup();
        Bootstrap bootstrap = new Bootstrap();
        bootstrap.group(group)
                .channel(NioSocketChannel.class)
                .handler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    protected void initChannel(SocketChannel socketChannel) {ch.pipeline().addLast(new StringDecoder(CharsetUtil.UTF_8));
                        ch.pipeline().addLast(new StringEncoder(CharsetUtil.UTF_8));
                        socketChannel.pipeline().addLast(new ClientHandler());
                    }
                });
        ChannelFuture future = bootstrap.connect("127.0.0.1", 8080).sync();
        if (future.isSuccess()) {System.out.println("连贯服务器胜利");
        }
        channel = future.channel();}

    @PreDestroy
    public void destroy() {if (group != null) {group.shutdownGracefully();
        }
    }

ChannelHandler 音讯解决

public class ClientHandler extends SimpleChannelInboundHandler<String> {

    @Override
    protected void channelRead0(ChannelHandlerContext ctx, String msg) {System.out.println("Server response:" + msg);
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {cause.printStackTrace();
        ctx.close();}
}

2.2. 基于 Unix Socket 协定

其余的不变,这里只关注客户服务端代码。

2.2.1. 代码

服务端

private final EventLoopGroup bossGroup = new KQueueEventLoopGroup();
    private final EventLoopGroup workerGroup = new KQueueEventLoopGroup();

    @PostConstruct
    public void start() throws InterruptedException {ServerBootstrap b = new ServerBootstrap();
        b.group(bossGroup, workerGroup)
                .channel(KQueueServerDomainSocketChannel.class)
                .childHandler(new ChannelInitializer<KQueueDomainSocketChannel>() {
                    @Override
                    public void initChannel(KQueueDomainSocketChannel ch) throws Exception {ch.pipeline().addLast(new StringDecoder(CharsetUtil.UTF_8));
                        ch.pipeline().addLast(new StringEncoder(CharsetUtil.UTF_8));
                        ch.pipeline().addLast(new ServerHandler());
                    }
                });
        ChannelFuture f = b.bind(new DomainSocketAddress("/tmp/test.sock")).sync();
        if (f.isSuccess()) {System.out.println("启动 Netty 服务胜利,文件:" + "/tmp/test.sock");
        }
    }

客户端

public void start() throws InterruptedException {group = new KQueueEventLoopGroup();
        Bootstrap bootstrap = new Bootstrap();
        bootstrap.group(group)
                .channel(KQueueDomainSocketChannel.class)
                .handler(new ChannelInitializer<KQueueDomainSocketChannel>() {
                    @Override
                    protected void initChannel(KQueueDomainSocketChannel socketChannel) {socketChannel.pipeline().addLast(new StringDecoder(CharsetUtil.UTF_8));
                        socketChannel.pipeline().addLast(new StringEncoder(CharsetUtil.UTF_8));
                        socketChannel.pipeline().addLast(new ClientHandler());
                    }
                });

        ChannelFuture future = bootstrap.connect(new DomainSocketAddress("/tmp/test.sock")).sync();

        if (future.isSuccess()) {System.out.println("连贯服务器胜利");
        }

        channel = future.channel();}

2.2.2. 剖析

Unix Socket 协定

Unix Domain Socket(简称 UDS)是一个用于实现本地过程间通信的协定。与应用网络套接字(socket)进行通信不同,UDS 仅用于同一台机器上的相邻过程之间的通信。

在 Unix/Linux 零碎中,UDS 通常被用于代替 TCP/IP 套接字来进步性能和安全性。不过它们能够通过文件系统门路来建设连贯,不能跨机器通信。

Netty 中协定切换

通过比照上述代码,能够看出 netty 中切换协定是比较简单的,换成对应的 Channel 实现类,以及连贯形式就能够了。

因为是 mac 中运行,示例代码中用 KQueueDomainSocketChannel 代替 DomainSocketChannel

2.3. 测试

Controller 发消息

@RestController
public class MsgController {
    @Autowired
    private NettyClient nettyClient;

    @PostMapping("/send")
    public ResponseEntity<Void> sendMsg(@RequestBody String msg) {System.out.println(msg.getBytes(StandardCharsets.UTF_8).length);
        try {for (int i = 0; i < 1000; i++) {nettyClient.send(msg);
            }
            return new ResponseEntity<>(HttpStatus.OK);
        } catch (Exception e) {return new ResponseEntity<>(HttpStatus.INTERNAL_SERVER_ERROR);
        }
    }
}

测试后果

后面曾经基于 TCP 协定写好了 netty 的客户端、服务端,
当初写接口,能够通过客户端给服务端发消息,不过单次调用会一次性发 1000 遍。

接口调用传入:hello

预期后果:

Received message from client: hello
Received message from client: hello
... ... // 同样输入 1000 遍

理论后果:

Received message from client: hello
Received message from client: hellohello
Received message from client: hellohe
Received message from client: llohellohellohellohello
Received message from client: hellohellohello
... ... // 无规则

呈现问题的起因就是下一章要将的粘包、拆包问题。

3. 粘包、拆包问题

3.1. 问题剖析

3.1.1. tcp 协定呈现问题的起因

粘包 / 拆包问题是由 TCP 协定自身造成的,和 Netty 自身无关,任何基于 TCP 协定实现数据传输的技术都会面临这个问题,起因如下:

  • 应用程序写入数据的字节大小大于套接字发送缓冲区的大小:这种状况下,会产生拆包景象,发送方的 TCP 协定栈会把一次应用程序的发送操作分成多个数据段进行发送。
  • 进行了屡次写入操作,但数据没有被及时发送进来:这可能是因为 TCP 的 Nagle 算法造成的。
  • 应用程序读取操作不及时:如果接管方面的应用层没有及时读取接收缓冲区的数据,造成沉积,从而造成一个大的数据块。如果此时应用层进行数据读取,就容易读取到多个 TCP 数据段的数据,造成了粘包景象。
  • 网络环境等硬件问题:如网络提早、抖动等,也可能导致多个小的数据包合并为一个大包进行传送,从而导致粘包。

解决粘包和拆包问题的根本策略就是在应用层引入数据边界。常见的办法有:固定长度、分隔符、在包头中退出长度字段 等。

3.1.2. 其余协定为什么没问题

HTTP 协定

HTTP 协定 基于 TCP 协定构建,而 TCP 是一种面向流的协定,所以实践上可能会有粘包问题。然而在理论利用中,HTTP 协定曾经做了明确的分包解决,因而通常不须要开发者去解决粘包问题,HTTP 应用了一些特定的形式来定义数据包的边界:

对于 HTTP/1.0 和 HTTP/1.1,一次残缺的 HTTP 交互由一个申请和一个响应组成,它们都是绝对独立的。申请和响应都有明确的开始行(申请行或状态行)和完结标记(如 Content-Length 头或 chunked 编码表示的音讯体长度)。这样能够很分明地晓得报文的开始和完结,防止了粘包问题。
对于 HTTP/2,它引入了二进制帧的概念。每个帧有明确的长度和类型,这也使得在接收端能够精确地解析出各个帧,防止粘包问题。

UDP 协定

UDP 协定 是一种无连贯的、不牢靠的协定,它并没有像 TCP 协定那样提供流量管制和拥塞管制等性能,因而在传输过程中可能会呈现丢包或乱序等问题。因为 UDP 协定采纳数据报形式进行传输,每个 UDP 数据报都有独立的头部标识,因而不会呈现粘包问题。

WebSocket 协定

WebSocket 协定 建设连贯后,客户端和服务器之间会放弃长时间的连贯状态,能够随时发送和接收数据。当服务器发送数据时,会将数据封装到一个残缺的 WebSocket 帧中,并通过 TCP 协定进行传输。而客户端收到数据后,会从 WebSocket 帧中解析出数据,并进行相应解决。这样就防止了 TCP 协定中的“粘包”和“拆包”问题。

3.1.3. Unix Socket 为什么也有问题

Unix Socket(也被称为 Unix Domain Socket,UDS)次要反对以下两种类型的通信协议:

  • 流式协定 (SOCK_STREAM): 相似于 TCP,在发送和接收数据时提供了字节流服务。数据在两个方向上都是有序的,并且不会反复或者失落。这种模式下,一端发送的数据程序和另一端接管的数据程序是雷同的。
  • 数据报协定 (SOCK_DGRAM): 这种类型的 socket 提供了一种无需连贯的、固定大小的音讯服务,相似于 UDP。每次读操作都返回最多一条残缺的音讯;如果音讯超出缓冲区的大小,那么该音讯可能会被截断。

Unix Socket 的这两种模式在行为上与 TCP 和 UDP 很类似。因而在基于 SOCK_STREAM 协定应用 Netty 开发服务端和客户端时,可能会呈现相似粘包的问题。

后面有现成的基于 Unix Stream 协定实现的代码,咱们同样调用接口试一下,发现 Unix Socket 同样会产生粘包问题

3.1.4. 解决思路

联合 HTTP、UDP、WebSocket 解决粘包 / 拆包问题的思路,同样也能够推导解决 TCP 问题的思路:在发送数据时,应该设计一种协定来确定音讯的边界,比方:增加非凡的分隔符,或者在每个音讯的头部蕴含音讯的长度等。

基于这个思路,Netty 框架提供了 LineBasedFrameDecoder、DelimiterBasedFrameDecoder 和 LengthFieldBasedFrameDecoder 等解决方案,上面一一介绍。

3.2. 解决方案

3.2.1. LineBasedFrameDecoder

解决形式

应用行结束符作为数据包的分隔符。每条音讯前面都有一个行结束符(例如 \n 或 \r\n),它会始终读取字节直到遇到这个结束符,而后把之前读取到的字节组装成一条音讯。

如果没有找到行结束符,那么就认为以后还没有读取到残缺的数据包,须要将曾经读取到的字节保存起来,期待下次读取。

代码 - 客户端批改

发送音讯的办法中,每条音讯结尾都加上行结束符后缀:

public void send(String msg) {if (channel != null) {channel.writeAndFlush(msg + "\\n");
        } else {System.out.println("message sending failed, connection not established");
        }
    }

代码 - 服务端批改

在 ChannelPipeline 中加上下列解码的 ChannelHandler:

ch.pipeline().addLast(new LineBasedFrameDecoder(1024));

局限性

  • 固定的分隔符:次要是通过 \n 或 \r\n 来标识一个残缺的音讯。这意味着如果你的协定中没有应用这两个字符作为完结标记,或者这两个字符在音讯体中有非凡含意,则不能正确工作。
  • 只反对文本数据:次要设计为解决文本协定。对于二进制数据,尤其是蕴含 \n\r\n 的二进制数据,可能会呈现误切割的状况。
  • 无奈解决大数据包:如果一个十分大的数据块在没有任何分隔符的状况下被发送,会耗费大量内存来存储这些数据,直到找到一个分隔符。这可能会导致内存溢出问题。所以构造方法中要设置 maxLength 参数(如示例中的 1024)。

3.2.2. DelimiterBasedFrameDecoder

解决形式

和 LineBasedFrameDecoder 相似,当接管到数据时,会查看是否存在分隔符。如果存在,它就认为曾经读取到了一个残缺的音讯,并将这个消息传递给下一个 ChannelHandler 进行解决。如果不存在,它将持续期待,直到读取到分隔符。

区别在于,前者的分隔符固定,而它的分隔符能够自定义。

代码 - 客户端批改

发送音讯的办法中,每条音讯结尾都加上行结束符后缀:

public void send(String msg) {if (channel != null) {channel.writeAndFlush(msg + "$_");
        } else {System.out.println("message sending failed, connection not established");
        }
    }

代码 - 服务端批改

在 ChannelPipeline 中加上下列解码的 ChannelHandler:

ByteBuf delimiter = Unpooled.copiedBuffer("$_".getBytes());
ch.pipeline().addLast(new DelimiterBasedFrameDecoder(1024, delimiter));

局限性

  • 依赖于特定的分隔符:须要依赖特定的分隔符来断定一个音讯的完结,然而在某些状况下,这样的分隔符可能并不存在,或者不能很好地被利用在该协定上。同样可能呈现误判。
  • 不适宜二进制协定:因为 DelimiterBasedFrameDecoder 次要是针对文本协定设计的,所以在解决一些二进制协定时可能会遇到困难。
  • 内存问题:如果一个十分大的数据块在没有任何分隔符的状况下被发送,DelimiterBasedFrameDecoder 可能会耗费过多的内存来存储这些数据,直到找到一个分隔符。这可能会导致内存溢出问题。所以也须要设置 maxFrameLength(如示例中的 1024)。

3.2.3. FixedLengthFrameDecoder

解决形式

工作原理次要是每次从 ByteBuf 中读取固定长度的字节,而后结构成一个独立的 frame 对象,传递给下一个 handler 解决。

这样能够确保不会因为 TCP 粘包导致多个音讯被当作一个音讯解决,也不会因为 TCP 拆包导致一个音讯被当作多个音讯解决。

代码 - 服务端批改

在 ChannelPipeline 中加上下列解码的 ChannelHandler:

ch.pipeline().addLast(new FixedLengthFrameDecoder(5));

因为传输的参数“hello”是 5 个字节,这类就固定为 5.

局限性

  • 固定长度限度: FixedLengthFrameDecoder 只能解决固定长度的音讯,如果理论利用中的音讯长度不固定,那么就无奈应用 FixedLengthFrameDecoder 进行解码。相应地,如果音讯长度小于固定长度,那么必须填充到固定长度,这就可能会节约带宽。
  • 无内置校验: FixedLengthFrameDecoder 仅仅是依照固定长度切分音讯,它并不关怀音讯的完整性和正确性。如果你想对音讯进行校验,须要本人实现。

3.2.4. LengthFieldBasedFrameDecoder

解决形式

  • 长度字段标识: LengthFieldBasedFrameDecoder 解决粘包问题的形式次要是通过在数据包中增加一个示意后续数据长度的字段,这个字段的地位和长度能够由开发者自定义,解码器会依据这个长度字段得悉具体的音讯体长度,而后进行正确的截取。
  • 校验读取: 当接管到新的数据包时,解码器首先找到长度字段,读取出音讯体的长度,而后期待足够长度的数据达到后,再从 ByteBuf 中读取,造成一个残缺的音讯帧。
  • 打消半包读取: 通过以上形式,LengthFieldBasedFrameDecoder 能够确保每次都能从 ByteBuf 中读取到残缺的音讯帧,不会呈现只读取到半个音讯帧的状况。

在网络通信中,发送和接收数据须要遵循同一种协定。LengthFieldBasedFrameDecoder 是一个基于长度字段的解码器,而 LengthFieldPrepender 则是一个对应的编码器,它会在音讯体后面加上一个长度字段。

它们个别会配套应用,这样发送端发送的数据和接收端接管的数据结构就会保持一致,从而可能正确地进行解码。

代码 - 客户端批改

增加 ChannelHandler 实现,通过 LengthFieldPrepender 这个编码器,在发送的音讯前增加长度字段(这里的 4 是指长度字段自身占用的字节数量):

socketChannel.pipeline().addLast(new LengthFieldPrepender(4));

代码 - 服务端批改

在 ChannelPipeline 中加上下列解码的 ChannelHandler:

ch.pipeline().addLast(new LengthFieldBasedFrameDecoder(1024, 0, 4, 0, 4));

4. Netty 个性优化

4.1. 内存池 PooledByteBufAllocator

内存池是一种用于治理和复用内存块的技术。能够防止频繁地调配和开释内存,从而缩小零碎开销和内存碎片问题,进步零碎的效率和性能。

PooledByteBufAllocator(分配器 [ˈæləˌkeɪtər])是 Netty 提供的一个基于内存池的 ByteBuf 分配器。与间接创立新的 ByteBuf 实例相比,PooledByteBufAllocator 提供了重用内存的能力,这能够显著缩小内存调配和垃圾收集的开销,进步性能:

  • 内存分区:PooledByteBufAllocator 将内存划分为多个 Arena,每个 Arena 进一步划分为多个 Chunk 和 Page。通过这种形式,PooledByteBufAllocator 可能满足不同大小的内存需要,并且可能疾速找到适合的内存块进行调配。
  • 对象复用:当 ByteBuf 的援用计数为 0 时,它的内存会被返回到原来的 Arena 并能够被重用。这防止了频繁创立和销毁对象,升高了零碎开销。
  • 线程本地缓存:PooledByteBufAllocator 应用了线程本地缓存技术(Thread Local Cache),每个线程都有本人的一份缓存池,这能够缩小线程间的竞争,进一步提高性能。
  • 内存调配策略:对于小于 Page 大小的内存调配申请,PooledByteBufAllocator 应用 jemalloc 策略进行内存调配。这是一种高效的内存调配策略,可能缩小内存碎片,进步内存使用率。

通过这些形式,PooledByteBufAllocator 能够无效地复用内存,进步了内存应用的效率和性能。

PooledByteBufAllocator 创立 ByteBuf 过程

PooledByteBufAllocator allocator = new PooledByteBufAllocator();

// 别离调配堆内存、堆外内存,内存大小也能够指定,如: allocator.heapBuffer(1024);
ByteBuf heapBuffer = allocator.heapBuffer();
ByteBuf directBuffer = allocator.directBuffer(); 

// 失常将写入数据或读取
heapBuffer.writeBytes(data);
byte b = heapBuffer.readByte();

// 记得不必时开释内存,堆外内存不受垃圾回收,不开释会有内存泄露
heapBuffer.release();
directBuffer.release();

不过理论我的项目中,很少有见过通过创立 PooledByteBufAllocator,再创立 ByteBuf 的。

根本都是由 Unpooled 工具类 创立 ByteBuf。

创立:堆内内存 OR 堆外内存?

(1)堆内内存:

如果你须要解决的数据比拟小(比方几 KB 或几百 KB),而且须要进行频繁的读写操作,那么倡议应用堆内内存。

(2)堆外内存:

如果你须要解决的数据比拟大(比方几 MB 或几十 MB),而且须要进行频繁的 IO 操作,那么倡议应用堆外内存。堆外内存是由操作系统治理的,数据存储在操作系统的内存中,能够间接进行 IO 操作。此外,在应用堆外内存时,能够防止 Java 堆和操作系统之间的数据拷贝,缩小了零碎的开销和提早。

须要留神的是,堆外内存的申请和开释须要调用 JNI 接口,因而申请和开释堆外内存的开销会比拟高。因而一般来说:

对于小规模的数据处理利用,倡议应用堆内内存;对于大规模的数据处理利用,倡议应用堆外内存

4.2. 内存池 Unpooled

Unpooled 是 Netty 中一个工具类,用于创立不同类型的 ByteBuf 对象,而且同样是应用 PooledByteBufAllocator 类来调配和治理内存。

只不过它提供了一些静态方法,能够很不便地创立 HeapBuf、DirectBuf、CompositeBuf 等类型的 ByteBuf 对象。常见办法:

  • buffer():创立一个 HeapBuf 对象,应用 JVM 堆内存来存储数据。
  • directBuffer():创立一个 DirectBuf 对象,应用间接内存来存储数据。
  • wrappedBuffer():创立一个 CompositeBuf 对象,能够将多个 ByteBuf 对象合并成一个虚构的 ByteBuf 对象。
  • copiedBuffer():创立一个 HeapBuf 对象,并将字节数组的内容复制到 HeapBuf 中。
  • unsafeBuffer():创立一个不平安的 ByteBuf 对象,用于一些非凡的场景,例如 JNI 调用等。

不过同样要记得在应用结束后,应该及时调用 release() 办法来开释 ByteBuf 对象的资源哦。

回顾一下:思考到 Netty 中 ByteBuf 等罕用类,为防止频繁地调配和开释内存,通过内存池实现内存复用。但 ByteBuf 也是类,频繁地创立、销毁对象同样有大量的性能开销,怎么优化?

那么接下来咱们看一下 对象池。

4.3. 对象池 Recycler

Recycler(回收器,[ˌriːˈsaɪkl])是 Netty 是一个对象池,次要用于重用对象,防止频繁创立和销毁带来的性能开销。被宽泛地利用于各种场景中,例如 ByteBuf 对象池、EventExecutor 对象池、ChannelHandlerContext 对象池等等。咱们还是来看看 ByteBuf。

ByteBuf 中蕴含一个 Recycler.Handle 对象,用于治理 ByteBuf 对象池的创立和销毁。当须要创立一个新的 ByteBuf 对象时,无论通过后面介绍的 PooledByteBufAllocator、Unpooled,都是通过 ByteBufAllocator 接口提供的 directBuffer() 或 heapBuffer() 等办法来创立。

这些办法就是基于 Recycler,会主动从线程本地的对象池中获取一个 ByteBuf 对象,如果对象池为空,则会创立一个新对象,并将其退出对象池中。当不再须要这个对象时,能够通过调用 release() 办法将其回收到对象池中,期待下次应用。

ChannelHandlerContext 对象池也相似,在 Netty 中,能够通过 ChannelHandlerContext 的 newContext() 办法来获取一个新的 ChannelHandlerContext 对象,这个办法会从 Recycler 对象池中获取一个 ChannelHandlerContext 对象并进行初始化,如果没有可用的对象,则会创立一个新对象。在应用完后,通过调用 ChannelHandlerContext 的 recycle() 办法将其回收到对象池中,期待下次应用。

当然 Recycler 是 Netty 中实现对象池的机制,并不局限于只有 Netty 的这些组件类能够用,任何咱们自定义的类都能够。上面看一个例子。

示例(任何对象)

public class UserCache {private static final Recycler<User> userRecycler = new Recycler<User>() {
        @Override
        protected User newObject(Handle<User> handle) {return new User(handle);
        }
    };

    static final class User {

        private String name;
        private Recycler.Handle<User> handle;

        public void setName(String name) {this.name = name;}

        public String getName() {return name;}

        public User(Recycler.Handle<User> handle) {this.handle = handle;}

        public void recycle() {handle.recycle(this);
        }
    }

    public static void main(String[] args) {User user1 = userRecycler.get();
        user1.setName("hello");
        user1.recycle();
        User user2 = userRecycler.get();
        System.out.println(user1 == user2);
    }
}

右边的例子中,咱们定义了一个 User 类,main 办法中,user1.recycle(),user1 回收了之后,而后 user2 再获取。

  • (1)user2 获取的仍然是同一个对象,所以打印出的后果是:hello 和 true。
  • (2)如果咱们正文掉 user1.cecycle(),user2 会获取不到对象,打印的后果就是:null 和 false。

线程平安

另外,Recycler 应用线程本地变量(FastThreadLocal)来存储对象,每个线程都有一个独立的对象池。这个机制能够保障对象的安全性和线程相互独立,防止了线程平安问题和竞争条件的呈现。

那么这个 FastThreadLocal 是啥?和常见的 ThreadLocal 有啥关系呢?

4.4. 本地线程优化 FastThreadLocal

FastThreadLocal(更快的 ThreadLocal)是 Netty 本人研发的一个工具类,用于替换 Java 原生的 ThreadLocal。次要有以下几个起因:

  • 性能:与 ThreadLocal 相比,FastThreadLocal 在存取线程局部变量时有更快的速度。在 ThreadLocal 中,每次获取变量都须要通过哈希映射进行查找,当线程局部变量很多时,这会成为一个性能瓶颈。而 FastThreadLocal 则将所有线程的局部变量存储在一个数组中,通过索引疾速定位,进步了存取速度。
  • 防止内存透露 :ThreadLocal 在应用不过后,很容易造成内存透露,须要咱们在应用后再手动调用 reomve() 办法。而 FastThreadLocal 能无效防止这个问题。它会在每个线程完结时主动清理线程局部变量,而不是依赖于 JVM 的垃圾回收。
  • 更好的整合:Netty 中很多中央应用了线程局部变量,例如 ByteBuf 的内存池、Recycler 对象池等。有了本人的 FastThreadLocal,Netty 能够更好地管制和优化这些性能,进步整体性能。

代码示例

public class FastThreadLocalDemo {private static final FastThreadLocal<Integer> THREAD_LOCAL = new FastThreadLocal<Integer>() {
        @Override
        protected Integer initialValue() throws Exception {return 1;}
    };

    public static void main(String[] args) {new FastThreadLocalThread(() -> {for (int i = 0; i < 10; i++) {System.out.println(Thread.currentThread().getName() + "-->" + THREAD_LOCAL.get());
                THREAD_LOCAL.set(THREAD_LOCAL.get() + 1);
            }
        }, "FastThreadLocalThread-1").start();}
}

注意事项

FastThreadLocal 的应用形式和 ThreadLocal 差异不大,然而有几点须要留神:

  • 应用 FastThreadLocal 的线程最好是 FastThreadLocalThread 类型或者其子类。FastThreadLocal 会在这些线程中有更好的性能。如果应用的是 Thread 或其余实现的话,FastThreadLocal 依然能够工作,但性能会降级。
  • 相比于 ThreadLocal,FastThreadLocal 的劣势在于当一个线程有多个线程本地变量时,它能够通过缩小哈希抵触和查找来进步性能。然而如果一个线程只有一个或者很少的线程本地变量,那么 ThreadLocal 可能会有更好的性能。
  • 当你不再须要应用 FastThreadLocal 中的对象时,还是应该调用 remove() 来防止内存透露。

虽说在应用了 FastThreadLocalThread 实例的状况下,在线程完结时,FastThreadLocal 会主动清理所有线程局部变量。但显式地调用 remove() 办法依然是一个好的实际。特地是在长生命周期的线程或者应用了线程池的状况下,显式地清理线程局部变量能够帮忙防止潜在的内存透露问题。

正文完
 0