本篇咱们就尝试写一个一个聊天室的例子来领会多路复用,如果你不懂什么叫I/O多路复用,能够看下我写的这篇文章 《Socket简介和I/O多路复用》。 咱们该当清晰的意识到没有操作系统提供的I/O多路复用机制,JVM也是无奈做到多路复用的。

Selector 监督者 轮询者 选择器

Selector中的一个类,也就是监督Socket流状态的类,调用的还是操作系统提供的服务,须要配合通道和缓冲区应用。
不论是多路复用还是非多路复用咱们总须要一个socket,不过多路复用的时候在java中是ServerSocketChannel。

   ServerSocketChannel serverSocketChannel1 = ServerSocketChannel.open();   // 将通道设置为非阻塞,为多路复用做筹备。   serverSocketChannel1.configureBlocking(false);   ServerSocket serverSocket = serverSocketChannel1.socket();   // 绑定端口   serverSocket.bind(new InetSocketAddress(8888));

选择器呢,怎么体现哪个客户端有数据就调用对应的代码来解决客户端呢? 还是在ServerSocketChannel上,当有数据到来时咱们就激活这个通道,从缓冲区中获取数据。因而咱们须要让Selector来治理ServerSocketChannel,那么就是通过注册。
也就是这样:

    Selector selector = Selector.open();    ServerSocketChannel serverSocketChannel1 = ServerSocketChannel.open();    serverSocketChannel1.configureBlocking(false);    ServerSocket serverSocket = serverSocketChannel1.socket();    serverSocket.bind(new InetSocketAddress(8888));    serverSocketChannel1.register(selector, SelectionKey.OP_ACCEPT);

网络通信中JVM形象出了四种通信状态,在SelectionKey中寄存:

  • OP_READ 数据曾经筹备好了,能够读了,也成为了读就绪。
  • OP_WRITE 曾经能够通过通道向缓冲区中写数据了,也称为写就绪。
  • OP_CONNECT 连贯曾经就绪
  • OP_ACCEPT 服务端曾经监听到了客户端发动的连贯申请,能够与服务端建设连贯的。

从jdk的正文咱们就能够看出,OP_ACCEPT用于服务端程序,因为一个服务端会有许多客户端,先有是否有连贯的申请,再有数据是否筹备实现,是否可写。OP_CONNECT用于客户端,当连贯建设,我才能够向服务端写数据和读数据。

当通道上有选择器感兴趣的事件产生,比方: 连贯的申请,再客户端和服务端连贯建设后是可读和可写。这些事件会被选择器放入一个汇合中。咱们就能够循环这个汇合,从客户端读取或写。一个客户端对应一些事件,这些事件不可能同时实现,总会有先后顺序,在收到客户端发动的连贯申请之后,咱们从中提取申请。

特地留神OP_ACCEPT只能用于选择器治理ServerSocketChannel。OP_CONNECT只能用于选择器治理SocketChannel。

聊天室服务端

介绍选择器的一部分代码就是聊天室服务端代码的一部分。咱们先介绍大抵的思路,在上对应的代码。

惯例状况下咱们还是创立用ServerSocketChannel创立Socket,而后绑定端口。用选择器治理通道,当通道上有选择器最后所感兴趣的事件之后,留神尽管咱们只注册了一个对连贯感兴趣的事件,然而实质上还是用的Socket,所以在服务端发现有发动建设连贯的申请之后,服务端就会尝试和客户端建设连贯,连贯还是为了通信。

上文咱们提到,在通道有选择器所感兴趣的事件之后,会将放入到一个汇合中,也就是Set<SelectionKey>。通过SelectionKey,咱们能够为减少选择器和通道之间的关联,最后选择器只会在通道有申请的连贯时被激活,只会向汇合中增加一个读就绪事件。咱们能够通过SelectionKey的interestOps()为选择器增加读就绪事件和写就绪事件。而后就能够灵便的解决了,也就是说在读就绪的时候读,写就绪的时候写。

每次向选择器注册通道时就会创立一个SelectionKey键,也就是抉择治理这个通道。通过调用某个SelectionKey的 cancel 办法、敞开其通道,或者通过敞开其选择器来勾销 该键之前,它始终放弃无效。

public class ChatServer {    // 存储客户端信息,一个客户端一个Channel            private static Map<String, SocketChannel> map = new HashMap<>();    public static void main(String[] args) throws IOException {        // 创立选择器                Selector selector = Selector.open();        //创立服务端的通道        ServerSocketChannel serverSocketChannel1 = ServerSocketChannel.open();        // 将该通道设置为非阻塞形式        serverSocketChannel1.configureBlocking(false);        // 创立Socket        ServerSocket serverSocket = serverSocketChannel1.socket();        // 绑定端口        serverSocket.bind(new InetSocketAddress(8888));        // 为ServerSocketChannel注册对对客户端申请连贯事件感兴趣        // 此时该channel处于选择器得治理之下        serverSocketChannel1.register(selector, SelectionKey.OP_ACCEPT);        // 因为不分明有多少客户端会连贯,所以是while(true)        while (true) {            // 该办法将始终阻塞,晓得选择器治理的通道上有选择器感兴趣的事件产生。            selector.select();            // 每次向选择器注册通道时就会创立一个SelectionKey键            // 每个SelectionKey与对应的通道相连,存储通道的状态            // 一个客户端一个SocketChannel            Set<SelectionKey> keys = selector.selectedKeys();            Iterator<SelectionKey> iterator = keys.iterator();            while (iterator.hasNext()) {                SocketChannel clietChannel = null;                SelectionKey key = iterator.next();                // 如果有客户端申请建设连贯                if (key.isAcceptable()) {                   // 获取SelectionKey关联的通道                    ServerSocketChannel serverSocketChannel = (ServerSocketChannel) key.channel();                    // 提取申请,获取对应的socketChannel                    clietChannel = serverSocketChannel.accept();                    // 将解决对应的客户端信息socketChannel设置为非阻塞                    clietChannel.configureBlocking(false);                    // 为该通道注册读就绪事件, 选择器会询问该通道的状态,当该通道就绪时,                    clietChannel.register(selector, SelectionKey.OP_READ);                    // 记录解决对应客户端的channel                    String id = "key=" + (int) (Math.random() * 9000 + 1000);                    map.put(id, clietChannel);                } else if (key.isReadable()) {                    //读就绪,通过通道向缓冲区中取数据                    clietChannel = (SocketChannel) key.channel();                    ByteBuffer byteBuffer = ByteBuffer.allocate(1024);                    String receiveMsg = null;                    int result = -1;                    try {                        result = clietChannel.read(byteBuffer);                    } catch (IOException e) {                        // 正在读的时候服务端断开连接,此时数据无奈持续读取                        String clientKey = getClientKey(clietChannel);                        System.out.println("退出了战场:" + clientKey);                        map.remove(clientKey);                        // 开释资源                        clietChannel.close();                        key.cancel();                        continue;                    }                    if (result > 0) {                        // 读之前要flip                        byteBuffer.flip();                        //避免乱码                        Charset charset = Charset.forName("UTF-8");                        receiveMsg = String.valueOf(charset.decode(byteBuffer).array());                        System.out.println(clietChannel + ":" + receiveMsg);                        if ("conncet".equals(receiveMsg)) {                            receiveMsg = "新客户端上线";                        }                        System.out.println("读到了信息" + receiveMsg);                        //将信息搭载在key上,为播送进来做筹备。                        // 因为咱们做的是聊天室,相当于群聊。                           key.attach(receiveMsg);                        // key关联通道,给key注册就相当于给通道注册。                         // 给对应的通道注册写事件。                        key.interestOps(SelectionKey.OP_WRITE);                    }                } else if (key.isWritable()) {                    //channel能够读能够写                    clietChannel = (SocketChannel) key.channel();                    // 获取对应的客户端                    String name = getClientKey(clietChannel);                    for (Map.Entry<String, SocketChannel> entrySet : map.entrySet()) {                        SocketChannel entryValue = entrySet.getValue();                        ByteBuffer broadCast = ByteBuffer.allocate(1024);                        broadCast.put((name + ":" + key.attachment()).getBytes());                        broadCast.flip();                        //将缓冲区的货色发送给每一个通道                        entryValue.write(broadCast);                    }                    // 给key关联的通道注册读事件。                    key.interestOps(SelectionKey.OP_READ);                } else {                      System.out.println("hello world");                }            }            //清空            keys.clear();        }    }        private static String getClientKey(SocketChannel clietChannel) {        String key = null;        Set<Map.Entry<String, SocketChannel>> entrySet = map.entrySet();        for (Map.Entry<String, SocketChannel> value : entrySet) {            if (value.getValue() == clietChannel) {                key = value.getKey();                break;            }        }        return key;    }}

为什么清空keys?

为什么最初清空 Set<SelectionKey>?我清空了还怎么用?让我来解释一下,事实上选择器保护了三张表(三个汇合)。

  • 注册表: 当咱们为通道注册事件,也就是将该通道纳入到选择器得管辖范畴之内时,也就是channel.register。就会产生一个对应的key进入这张表。只有当咱们调用key.cancel()办法时,这个key才会被移除。通过Selector类中的keys()返回这个汇合。
  • 已就绪表: 当咱们调用selector.select()办法,这个选择器进入注册表查找,查看哪个通道上的事件是曾经就绪了,而后将这些可就绪的key(就是selectionKey,通过selectionKey获取所关联通道的状态,即通道上的事件是否就绪),这个表不会被选择器清空,即便咱们再次调用selector.select(),他也不会清空这张表中已存在的key。
  • 通道脱离管辖表: 也就是说selectionKey调用cancle()办法,那么该selectionKey所对应的通道将会脱离选择器的管辖。
这就是咱们最初调用清空keys汇合的起因,如果遍历该汇合之后,所有就绪的事件都曾经解决而不清空,那么下次就绪的key咱们就获取不到。因为set是不可反复的。每次执行完selector.select(),选择器都会轮询被治理的通道,将通道上感兴趣的事件就绪的通道所对应的selectionKey退出到已就绪的表中。

为什么在写事件就绪之后,还要为通道注册读事件

咱们要聊聊写事件,写事件就绪的条件很容易满足,就是操作系统给对应socket调配的缓冲区还有闲暇,个别状况下都要满足。
key.interestOps(SelectionKey.OP_READ)相当于给该通道注册读事件,等价于勾销掉key关联通道对写事件感兴趣。因为一个key目前只能对一个事件感兴趣。如果你不勾销对写事件感兴趣,那么写事件会一直的被触发。

聊天室客户端

public class ChatClient {    public static void main(String[] args) {        try {            //客户端用socketchannel            SocketChannel socketChannel = SocketChannel.open();            // 将该通道设置为非阻塞            socketChannel.configureBlocking(false);            // 创立选择器                Selector selector = Selector.open();            // 确定服务端            socketChannel.connect(new InetSocketAddress("127.0.0.1",8888));            // 将该通道纳入选择器得治理范畴之下。客户端只有跟服务端建设连贯,因而对连贯感兴趣就好            socketChannel.register(selector, SelectionKey.OP_CONNECT);            while (true){               // 轮询通道的感兴趣事件是否就绪,有就绪事件即返回。没有就始终陷入阻塞               selector.select();               // 获取就绪的通道                  Set<SelectionKey> selectKeys = selector.selectedKeys();                Iterator<SelectionKey> keyIterator = selectKeys.iterator();                while(keyIterator.hasNext() ){                    SelectionKey selectKey = keyIterator.next();                    // 判断连贯是否实现                    if (selectKey.isConnectable()){                        ByteBuffer sendBuffer = ByteBuffer.allocate(1024);                        SocketChannel clientChannel = (SocketChannel)selectKey.channel();                        // 首次连贯实现后,程序外部依然须要一些解决,判断解决是否实现。                        // 这一行能够去掉,只是为了阐明这个办法。未实现连贯时返回false                        if (clientChannel.isConnectionPending()){                            // 只实现一次,当通道实现连贯后,返回为true                            if (clientChannel.finishConnect()) {                                System.out.print("连贯服务端胜利");                                sendBuffer.put("conncet".getBytes());                                sendBuffer.flip();                                clientChannel.write(sendBuffer);                            }                        }                        // 启动一个线程监听用户的输出,在目前这种模式下。用户随时能够写。随时发送信息                        new Thread(()->{                            while (true){                                sendBuffer.clear();                                InputStreamReader reader = new InputStreamReader(System.in);                                BufferedReader bufferReader = new BufferedReader(reader);                                try {                                    String message = bufferReader.readLine();                                    sendBuffer.put(message.getBytes());                                    sendBuffer.flip();                                    clientChannel.write(sendBuffer);                                } catch (IOException e) {                                    e.printStackTrace();                                }                            }                        }).start();                        // 连贯就绪,注册对读事件感兴趣                        clientChannel.register(selector,SelectionKey.OP_READ);                    }else if(selectKey.isReadable()){                        ByteBuffer readBuffer = ByteBuffer.allocate(1024);                        SocketChannel clientChannel = (SocketChannel)selectKey.channel();                        int len = clientChannel.read(readBuffer);                        if (len > 0){                            String receive = new String(readBuffer.array(), 0, len);                            System.out.println(receive);                        }                    }                }                selectKeys.clear();            }        } catch (IOException e) {            e.printStackTrace();        }    }}

运行后果截图


小结

这方面网络编程相干方面优良的材料还是比拟少的,值得注意的是java的NIO中的选择器还存在着一个危险的空轮询BUG,这是JDK貌似还没解决的问题,有工夫会再专门写篇博客来探讨这个问题,不过不要放心,曾经被netty给解决了,生态好就是强,这是后话了,

参考资料:

  • Why the key should be removed in selector.selectedKeys().iterator() in java nio?
  • JAVA NIO工作原理及代码示例