乐趣区

关于netty:NettyNIO三剖析-Selector

前言

本篇博文是《从 0 到 1 学习 Netty》中 NIO 系列的第三篇博文,次要内容是 介绍通过应用 Selector,一个独自的线程能够无效地监督多个通道,从而进步应用程序的解决效率,往期系列文章请拜访博主的 Netty 专栏,博文中的所有代码全副收集在博主的 GitHub 仓库中;

介绍

在 Java 中,Selector 是 NIO(New Input/Output)库中的一种对象,用于监控多个通道的状态,例如文件 I/O 或者网络 I/O。

Selector 的工作原理是应用 select() 办法轮询已注册的通道,获取它们的就绪状态,并返回一个已筹备好进行 I/O 操作的通道汇合。通过应用此机制,能够监督几个通道的状态,并且只有当至多一个通道处于就绪状态时才会执行 I/O 操作,从根本上防止了 CPU 的节约。

总之,Selector 是一种弱小的工具,可实现高效的 I/O 操作和网络编程,因为它可能轻松地监督多个通道的状态并在须要时对它们进行操作。

应用

1、创立 selector,治理多个 channel

Selector selector = Selector.open();

2、注册 selectorchannel 的分割;

SelectionKey sscKey = ssc.register(selector, 0, null);  
sscKey.interestOps(SelectionKey.OP_ACCEPT);

SelectionKey 示意一个通道(Channel)与一个选择器(Selector)之间的注册关系。每个通道在与选择器进行注册时都会创立一个对应的 SelectionKey 对象,这个对象蕴含了对于通道和选择器的一些元数据信息。

这里 SelectionKey 调用 register() 办法指定感兴趣的事件类型,绑定的 事件类型 有以下几种:

  • connect – 客户端连贯胜利时触发;
  • accept – 服务器端胜利承受连贯时触发;
  • read – 数据可读入时触发,有因为接管能力弱,数据暂不能读入的状况;
  • write – 数据可写出时触发,有因为发送能力弱,数据暂不能写出的状况;

3、通过 selector 监听事件,并取得就绪的通道个数,若没有通道就绪,线程会被阻塞:

  • 阻塞直到绑定事件产生;

    int count = selector.select();
  • 阻塞直到绑定事件产生,或是 超时(工夫单位为 ms);

    int count = selector.select(long timeout);
  • 不会阻塞,即不论有没有事件,立即返回,依据返回值查看是否有事件;

    int count = selector.selectNow();

4、处理事件,SelectionKey 外部蕴含了所有产生的事件:

selector.select();
Iterator<SelectionKey> iter = selector.selectedKeys().iterator();
while (iter.hasNext()) {SelectionKey key = iter.next();
    ServerSocketChannel channel = (ServerSocketChannel) key.channel();
    SocketChannel sc = channel.accept();}

5、整体代码如下所示:

@Slf4j
public class SelectorTest {public static void main(String[] args) {
        try {
            // 1. 创立选择器来治理多个 channel
            Selector selector = Selector.open();

            ByteBuffer buffer = ByteBuffer.allocate(16);
            ServerSocketChannel ssc = ServerSocketChannel.open();
            // 通道必须设置为非阻塞模式
            ssc.configureBlocking(false);

            // 2. 注册 selector 和 channel 的分割
            SelectionKey sscKey = ssc.register(selector, 0, null);
            sscKey.interestOps(SelectionKey.OP_ACCEPT);
            log.debug("Register Key: {}", sscKey);

            ssc.bind(new InetSocketAddress(7999));

            while (true) {
                // 3. 在没有事件产生时,线程阻塞;反之,则线程复原运行
                selector.select();

                // 4. 处理事件,SelectionKey 外部蕴含了所有产生的事件
                Iterator<SelectionKey> iter = selector.selectedKeys().iterator();

                while (iter.hasNext()) {SelectionKey key = iter.next();
                    log.debug("Key: {}", key);
                    ServerSocketChannel channel = (ServerSocketChannel) key.channel();
                    SocketChannel sc = channel.accept();
                    log.debug("{}", sc);
                }
            }
        } catch (IOException e) {e.printStackTrace();
        }
    }
}

运行后果:

20:22:20 [DEBUG] [main] c.s.n.c.SelectorTest - Register Key: channel=sun.nio.ch.ServerSocketChannelImpl[unbound], selector=sun.nio.ch.WindowsSelectorImpl@4f51b3e0, interestOps=16, readyOps=0
20:22:35 [DEBUG] [main] c.s.n.c.SelectorTest - Key: channel=sun.nio.ch.ServerSocketChannelImpl[/[0:0:0:0:0:0:0:0]:7999], selector=sun.nio.ch.WindowsSelectorImpl@4f51b3e0, interestOps=16, readyOps=16
20:22:35 [DEBUG] [main] c.s.n.c.SelectorTest - java.nio.channels.SocketChannel[connected local=/127.0.0.1:7999 remote=/127.0.0.1:61605]

进阶

在上个阶段,咱们只是简略地应用了 selector,但对于其绑定的 事件类型 ,咱们并没有进行特地的关注,然而,在理论利用中,咱们不可能只应用一种 事件类型,因而,咱们须要改良咱们的代码;

这里咱们须要对 事件类型 进行判断,SelectionKey 正好提供了相干的办法:

改良代码如下所示:

while (true) {
    // 3. 在没有事件产生时,线程阻塞;反之,则线程复原运行
    selector.select();

    // 4. 处理事件,SelectionKey 外部蕴含了所有产生的事件
    Set<SelectionKey> keySet = selector.selectedKeys();
    Iterator<SelectionKey> iter = keySet.iterator();
    log.debug("count: {}", keySet.size());

    while (iter.hasNext()) {SelectionKey key = iter.next();
        log.debug("Selection Key: {}", key);

        // 5. 辨别事件类型
        if (key.isAcceptable()) {ServerSocketChannel channel = (ServerSocketChannel) key.channel();
            SocketChannel sc = channel.accept();
            sc.configureBlocking(false);
            sc.register(selector, SelectionKey.OP_READ);
            log.debug("sc Key: {}", sc);
        } else if (key.isReadable()) {SocketChannel channel = (SocketChannel) key.channel();
            ByteBuffer buffer = ByteBuffer.allocate(16);
            channel.read(buffer);
            buffer.flip();
            debugRead(buffer);
            buffer.clear();}

    }
}

然而在运行时会发现报错空指针异样 NullPointerException

16:42:24 [DEBUG] [main] c.s.n.c.SelectorTest - Register Key: channel=sun.nio.ch.ServerSocketChannelImpl[unbound], selector=sun.nio.ch.WindowsSelectorImpl@4f51b3e0, interestOps=16, readyOps=0
16:42:45 [DEBUG] [main] c.s.n.c.SelectorTest - count: 1
16:42:45 [DEBUG] [main] c.s.n.c.SelectorTest - Selection Key: channel=sun.nio.ch.ServerSocketChannelImpl[/[0:0:0:0:0:0:0:0]:7999], selector=sun.nio.ch.WindowsSelectorImpl@4f51b3e0, interestOps=16, readyOps=16
16:42:45 [DEBUG] [main] c.s.n.c.SelectorTest - sc Key: java.nio.channels.SocketChannel[connected local=/127.0.0.1:7999 remote=/127.0.0.1:60700]
16:43:02 [DEBUG] [main] c.s.n.c.SelectorTest - count: 2
16:43:02 [DEBUG] [main] c.s.n.c.SelectorTest - Selection Key: channel=java.nio.channels.SocketChannel[connected local=/127.0.0.1:7999 remote=/127.0.0.1:60700], selector=sun.nio.ch.WindowsSelectorImpl@4f51b3e0, interestOps=1, readyOps=1

+--------+-------------------- read -----------------------+----------------+
position: [0], limit: [6]
         +-------------------------------------------------+
         |  0  1  2  3  4  5  6  7  8  9  a  b  c  d  e  f |
+--------+-------------------------------------------------+----------------+
|00000000| 73 69 64 69 6f 74                               |sidiot          |
+--------+-------------------------------------------------+----------------+

16:43:02 [DEBUG] [main] c.s.n.c.SelectorTest - Selection Key: channel=sun.nio.ch.ServerSocketChannelImpl[/[0:0:0:0:0:0:0:0]:7999], selector=sun.nio.ch.WindowsSelectorImpl@4f51b3e0, interestOps=16, readyOps=16
Exception in thread "main" java.lang.NullPointerException
        at com.sidiot.netty.c2.SelectorTest.main(SelectorTest.java:52)

这是因为咱们没有及时 remove() 造成的,当调用了 server.register() 办法后,Selector 中保护了一个汇合,用于寄存 SelectionKey 以及其对应的通道

Selector 的通道对应的事件产生后 SelecionKey 会被放到另一个汇合中,在这个汇合中,即便 SelecionKey 被应用了, 它也不会主动移除,所以在解决完一个事件后,须要手动移除迭代器中的 SelecionKey,否则会导致已被解决过的事件再次被解决,引发一些谬误,例如上述的空指针异样。


当客户端被动断开连接时,也会出现异常,控制台输入如下:

java.net.SocketException: Connection reset
        at java.base/sun.nio.ch.SocketChannelImpl.throwConnectionReset(SocketChannelImpl.java:345)
        at java.base/sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:376)
        at com.sidiot.netty.c2.SelectorTest.main(SelectorTest.java:60)

这是因为当客户端与服务器之间的连贯断开时,会给服务器端发送一个读事件,因而,咱们须要进行判断,当 channel.read() 的返回值为 - 1 时,示意连贯断开,须要调用 key.cancel() 办法勾销此事件;

改良代码如下所示:

if (key.isReadable()) {SocketChannel channel = (SocketChannel) key.channel();
    ByteBuffer buffer = ByteBuffer.allocate(16);
    try {int read = channel.read(buffer);
        if (read == -1) {key.cancel();
            channel.close();} else {buffer.flip();
            debugRead(buffer);
            buffer.clear();}
        iter.remove();} catch (IOException e) {e.printStackTrace();
        key.cancel();
        channel.close();
        iter.remove();}
}

整体代码如下所示:

@Slf4j
public class SelectorTest {public static void main(String[] args) {
        try {
            // 1. 创立选择器来治理多个 channel
            Selector selector = Selector.open();

            ServerSocketChannel ssc = ServerSocketChannel.open();
            // 通道必须设置为非阻塞模式
            ssc.configureBlocking(false);

            // 2. 注册 selector 和 channel 的分割
            SelectionKey sscKey = ssc.register(selector, 0, null);
            sscKey.interestOps(SelectionKey.OP_ACCEPT);
            log.debug("Register Key: {}", sscKey);

            ssc.bind(new InetSocketAddress(7999));

            while (true) {
                // 3. 在没有事件产生时,线程阻塞;反之,则线程复原运行
                selector.select();

                // 4. 处理事件,SelectionKey 外部蕴含了所有产生的事件
                Set<SelectionKey> keySet = selector.selectedKeys();
                Iterator<SelectionKey> iter = keySet.iterator();
                log.debug("count: {}", keySet.size());

                while (iter.hasNext()) {SelectionKey key = iter.next();
                    log.debug("Selection Key: {}", key);

                    // 5. 辨别事件类型
                    if (key.isAcceptable()) {ServerSocketChannel channel = (ServerSocketChannel) key.channel();
                        SocketChannel sc = channel.accept();
                        sc.configureBlocking(false);
                        sc.register(selector, SelectionKey.OP_READ);
                        log.debug("sc Key: {}", sc);
                        iter.remove();} else if (key.isReadable()) {SocketChannel channel = (SocketChannel) key.channel();
                        ByteBuffer buffer = ByteBuffer.allocate(4);
                        
                        try {int read = channel.read(buffer);
                            if (read == -1) {key.cancel();
                                channel.close();} else {buffer.flip();
                                debugRead(buffer);
                                buffer.clear();}
                            iter.remove();} catch (IOException e) {e.printStackTrace();
                            key.cancel();
                            channel.close();
                            iter.remove();}
                    }
                }
            }
        } catch (IOException e) {e.printStackTrace();
        }
    }
}

后记

以上就是 分析 Selector 的所有内容了,心愿本篇博文对大家有所帮忙!

参考:

  • Netty API reference;
  • 黑马程序员 Netty 全套教程;

📝 上篇精讲:「NIO」(二)阻塞模式与非阻塞模式

💖 我是 𝓼𝓲𝓭𝓲𝓸𝓽,期待你的关注;

👍 创作不易,请多多反对;

🔥 系列专栏:摸索 Netty:源码解析与利用案例分享

退出移动版