关于后端:Netty萌新入门三ChannelFuture-与-CloseFuture

4次阅读

共计 4854 个字符,预计需要花费 13 分钟才能阅读完成。

前言

本篇博文是《从 0 到 1 学习 Netty》中入门系列的第三篇博文,次要内容是 介绍 Netty 中 ChannelFuture 与 CloseFuture 的应用,解决连贯问题与敞开问题,往期系列文章请拜访博主的 Netty 专栏,博文中的所有代码全副收集在博主的 GitHub 仓库中;

连贯问题与 ChannelFuture

在 Netty 中,所有的 I/O 操作都是异步的,因而当你发动一个 I/O 操作时,它会立刻返回一个 ChannelFuture 对象,该对象代表了尚未实现的操作。ChannelFuture 提供了一种在操作实现时告诉应用程序的机制,以便应用程序能够执行某些操作或检索操作的后果。

例如,在写入数据到 Channel 时,调用 write() 办法将立刻返回一个 ChannelFuture 对象,而不是期待数据理论被写入。通过增加侦听器(Listener)到 ChannelFuture,当写操作实现时,侦听器将被告诉,从而使应用程序可能对写入数据的后果做出响应。

sync

sync() 办法是 ChannelFuture 接口中的一个 同步办法,它将阻塞以后线程,直到这个 ChannelFuture 执行结束。调用 sync() 办法后会期待对应的 I/O 操作实现,如果操作失败则会抛出异样。

复用上篇博文 从 0 到 1(七):入门 -EventLoop 中的服务端代码,稍微调整一下客户端代码如下:

@Slf4j
public class ChannelFutureClient {public static void main(String[] args) throws InterruptedException {ChannelFuture channelFuture = new Bootstrap()
                .group(new NioEventLoopGroup())
                .channel(NioSocketChannel.class)
                .handler(new ChannelInitializer<NioSocketChannel>() {
                    @Override
                    protected void initChannel(NioSocketChannel ch) throws Exception {ch.pipeline().addLast(new StringEncoder());
                    }
                })
                .connect(new InetSocketAddress(7999));

        channelFuture.sync();
        Channel channel = channelFuture.channel();
        log.debug(channel.toString());
        channel.writeAndFlush("sidiot.");
    }
}

服务端运行后果:

20:24:04 [DEBUG] [nioEventLoopGroup-4-1] c.s.n.c.EventLoopServer - h1: sidiot..
20:24:09 [DEBUG] [defaultEventLoopGroup-2-1] c.s.n.c.EventLoopServer - h2: sidiot..

但如果将 channelFuture.sync(); 正文掉后,会发现客户端运行之后,服务端并没有像之前一样接管到音讯。

客户端运行后果:

# 存在 sync()
20:24:04 [DEBUG] [main] c.s.n.c.ChannelFutureClient - [id: 0x473d8e1a, L:/169.254.80.84:57837 - R:IDIOT/169.254.80.84:7999]

# 正文 sync()
20:24:14 [DEBUG] [main] c.s.n.c.ChannelFutureClient - [id: 0x871ab919]

这是因为 ChannelFuture 是用于异步操作后果告诉的类。调用 sync() 将会阻塞以后线程,期待异步操作实现并获取其后果。如果正文掉了 sync() 办法,则程序不会等到连贯建设胜利后再向服务端发送音讯,而是间接执行 writeAndFlush() 办法,此时连贯还没有建设胜利,所以服务端收不到客户端发的音讯。

应用 sync() 办法能够保障在后续代码执行之前,实现以后的操作,这样能够防止一些并发问题。然而须要留神的是,因为 sync() 办法会阻塞以后线程,因而应该尽可能地防止在 I/O 线程中调用 sync() 办法,免得影响整个零碎的性能体现。


addListener

除了 sync() 办法之外,咱们还能够应用 addListener() 办法来处理结果。

在 Netty 中,addListener() 办法是 异步办法,其作用是向 ChannelFuture 增加一个或多个 GenericFutureListener 监听器,用于监听异步操作(例如网络 I/O 操作)执行实现时的事件。当异步操作实现后,这些监听器会被告诉,并且能够获取到操作的后果。

channelFuture.addListener(new ChannelFutureListener() {
    @Override
    public void operationComplete(ChannelFuture future) throws Exception {Channel channel = future.channel();
        log.debug(channel.toString());
        channel.writeAndFlush("sidiot.");
    }
});

运行后果:

# 服务端
21:21:03 [DEBUG] [nioEventLoopGroup-4-2] c.s.n.c.EventLoopServer - h1: sidiot..
21:21:08 [DEBUG] [defaultEventLoopGroup-2-6] c.s.n.c.EventLoopServer - h2: sidiot..

# 客户端
21:21:03 [DEBUG] [nioEventLoopGroup-2-1] c.s.n.c.ChannelFutureClient - [id: 0xc4465d09, L:/169.254.80.84:58393 - R:IDIOT/169.254.80.84:7999]

比照应用 sync()addListener() 两个办法的客户端后果能够发现,应用 sync() 的客户端的解决线程是以后线程,即 main 线程,而 addListener() 因为是异步办法的关系,其客户端的解决线程就不是以后线程,而是 NIO 线程 nioEventLoopGroup-2-1


小结

sync()addListener() 都是用于在不同组件之间进行通信的办法,但它们的实现形式略有不同。

sync() 是一种通过将属性绑定到一个共享状态来实现组件之间通信的办法。当某个组件更改该绑定的属性时,其余所有应用该属性的组件都会自动更新。这种办法的长处是简略间接,可能疾速实现组件之间的数据同步,但毛病是对于大型应用程序,应用全局状态治理可能会变得复杂和凌乱。

相比之下,addListener() 则是一种更加灵便的办法,它容许组件之间准确地管制何时以及如何进行通信。addListener() 能够被用于创立事件监听器,使得一个组件能够注册到另一个组件中产生的事件的告诉。当事件产生时,触发监听器并向其传递相应的数据。这种办法的长处是,更容易实现针对特定事件的精密管制,并且能够缩小对全局状态的依赖。

因而,总的来说,addListener() 更灵便,并且能够更好地适应简单的应用程序需要,而 sync() 则更适宜简略的利用场景。

敞开问题与 CloseFuture

在后面的博文中,博主都是以 DEBUG 的模式来操作客户端的,但这时的客户端都不是被失常敞开的,因而,接下来批改一下代码,使得客户端可能一直向服务端发送音讯,并在某一时刻可能被敞开:

Channel channel = channelFuture.sync().channel();
log.debug(channel.toString());

new Thread(() -> {Scanner scanner = new Scanner(System.in);
    while (true) {String line = scanner.nextLine();
        if ("quit".equals(line)) {channel.close();
            break;
        }
        channel.writeAndFlush(line);
    }
}, "input").start();

log.debug("解决 channel 敞开之后的操作");

运行后果:

能够发现“解决 channel 敞开之后的操作”并没有等 channel 敞开之后再进行,这是因为在 input 线程运行过程中并没有阻塞主线程,因而,主线程就会持续向下运行,造成了下面的状况;

那如果将“解决 channel 敞开之后的操作”挪动到 channel.close(); 前面是不是就能够了呢?

if ("quit".equals(line)) {channel.close();  
    log.debug("解决 channel 敞开之后的操作");  
    break;  
}

接下来咱们进行验证,在 pipeline 中新增一个 handler:

ch.pipeline().addLast(new LoggingHandler(LogLevel.DEBUG));

同时须要在配置文件 logback.xml 中减少下述代码:

<logger name="io.netty.handler.logging.LoggingHandler" level="DEBUG" additivity="false">  
    <appender-ref ref="STDOUT" />  
</logger>

运行后果:

依据运行后果能够发现,将“解决 channel 敞开之后的操作”挪动到 channel.close(); 前面的办法也是行不通的,因为这两个操作不属于同一个线程;

“解决 channel 敞开之后的操作”是在 input 线程中执行的,而 channel.close(); 则是在 NIO 线程 nioEventLoopGroup-2-1 中所执行的,因而两个线程谁先谁后是不肯定的,这是由 CPU 调度器决定的;


这里,咱们能够应用 closeFuture() 来解决问题,closeFuture() 办法能够让咱们监听 Channel 敞开事件,从而在 Channel 敞开后执行一些特定的逻辑。例如,在解决连贯断开的状况下,咱们能够期待 closeFuture() 的实现,并在其实现后开释资源或清理状态。

closeFuture()ChannelFuture() 类似,同样是有同步办法 sync 和异步办法 addaddListener 两种形式;

sync

ChannelFuture closeFuture = channel.closeFuture();
System.out.println("Waiting Close...");
closeFuture.sync();
log.debug("解决 channel 敞开之后的操作");

运行后果:

addaddListener

ChannelFuture closeFuture = channel.closeFuture();
System.out.println("Waiting Close...");
closeFuture.addListener((ChannelFutureListener) future -> {log.debug("解决 channel 敞开之后的操作");  
    group.shutdownGracefully();});

运行后果:

后记

以上就是 ChannelFuture 与 CloseFuture 的所有内容了,心愿本篇博文对大家有所帮忙!

参考:

  • Netty API reference;
  • 黑马程序员 Netty 全套教程;

📝 上篇精讲:「萌新入门」(二)分析 EventLoop

💖 我是 𝓼𝓲𝓭𝓲𝓸𝓽,期待你的关注;

👍 创作不易,请多多反对;

🔥 系列专栏:摸索 Netty:源码解析与利用案例分享

正文完
 0