共计 7842 个字符,预计需要花费 20 分钟才能阅读完成。
本篇咱们就尝试写一个一个聊天室的例子来领会多路复用,如果你不懂什么叫 I / O 多路复用,能够看下我写的这篇文章《Socket 简介和 I / O 多路复用》。咱们该当清晰的意识到没有操作系统提供的 I / O 多路复用机制,JVM 也是无奈做到多路复用的。
Selector 监督者 轮询者 选择器
Selector 中的一个类,也就是监督 Socket 流状态的类,调用的还是操作系统提供的服务,须要配合通道和缓冲区应用。
不论是多路复用还是非多路复用咱们总须要一个 socket,不过多路复用的时候在 java 中是 ServerSocketChannel。
ServerSocketChannel serverSocketChannel1 = ServerSocketChannel.open();
// 将通道设置为非阻塞,为多路复用做筹备。serverSocketChannel1.configureBlocking(false);
ServerSocket serverSocket = serverSocketChannel1.socket();
// 绑定端口
serverSocket.bind(new InetSocketAddress(8888));
选择器呢,怎么体现哪个客户端有数据就调用对应的代码来解决客户端呢? 还是在 ServerSocketChannel 上,当有数据到来时咱们就激活这个通道,从缓冲区中获取数据。因而咱们须要让 Selector 来治理 ServerSocketChannel,那么就是通过注册。
也就是这样:
Selector selector = Selector.open();
ServerSocketChannel serverSocketChannel1 = ServerSocketChannel.open();
serverSocketChannel1.configureBlocking(false);
ServerSocket serverSocket = serverSocketChannel1.socket();
serverSocket.bind(new InetSocketAddress(8888));
serverSocketChannel1.register(selector, SelectionKey.OP_ACCEPT);
网络通信中 JVM 形象出了四种通信状态,在 SelectionKey 中寄存:
- OP_READ 数据曾经筹备好了,能够读了,也成为了读就绪。
- OP_WRITE 曾经能够通过通道向缓冲区中写数据了,也称为写就绪。
- OP_CONNECT 连贯曾经就绪
- OP_ACCEPT 服务端曾经监听到了客户端发动的连贯申请,能够与服务端建设连贯的。
从 jdk 的正文咱们就能够看出,OP_ACCEPT 用于服务端程序,因为一个服务端会有许多客户端,先有是否有连贯的申请,再有数据是否筹备实现,是否可写。OP_CONNECT 用于客户端,当连贯建设,我才能够向服务端写数据和读数据。
当通道上有选择器感兴趣的事件产生,比方: 连贯的申请,再客户端和服务端连贯建设后是可读和可写。这些事件会被选择器放入一个汇合中。咱们就能够循环这个汇合,从客户端读取或写。一个客户端对应一些事件,这些事件不可能同时实现,总会有先后顺序,在收到客户端发动的连贯申请之后,咱们从中提取申请。
特地留神 OP_ACCEPT 只能用于选择器治理 ServerSocketChannel。OP_CONNECT 只能用于选择器治理 SocketChannel。
聊天室服务端
介绍选择器的一部分代码就是聊天室服务端代码的一部分。咱们先介绍大抵的思路,在上对应的代码。
惯例状况下咱们还是创立用 ServerSocketChannel 创立 Socket,而后绑定端口。用选择器治理通道,当通道上有选择器最后所感兴趣的事件之后,留神尽管咱们只注册了一个对连贯感兴趣的事件,然而实质上还是用的 Socket,所以在服务端发现有发动建设连贯的申请之后,服务端就会尝试和客户端建设连贯,连贯还是为了通信。
上文咱们提到,在通道有选择器所感兴趣的事件之后,会将放入到一个汇合中,也就是 Set<SelectionKey>
。通过 SelectionKey,咱们能够为减少选择器和通道之间的关联,最后选择器只会在通道有申请的连贯时被激活,只会向汇合中增加一个读就绪事件。咱们能够通过 SelectionKey 的 interestOps() 为选择器增加读就绪事件和写就绪事件。而后就能够灵便的解决了,也就是说在读就绪的时候读,写就绪的时候写。
每次向选择器注册通道时就会创立一个 SelectionKey 键,也就是抉择治理这个通道。通过调用某个 SelectionKey 的 cancel 办法、敞开其通道,或者通过敞开其选择器来勾销 该键之前,它始终放弃无效。
public class ChatServer {
// 存储客户端信息, 一个客户端一个 Channel
private static Map<String, SocketChannel> map = new HashMap<>();
public static void main(String[] args) throws IOException {
// 创立选择器
Selector selector = Selector.open();
// 创立服务端的通道
ServerSocketChannel serverSocketChannel1 = ServerSocketChannel.open();
// 将该通道设置为非阻塞形式
serverSocketChannel1.configureBlocking(false);
// 创立 Socket
ServerSocket serverSocket = serverSocketChannel1.socket();
// 绑定端口
serverSocket.bind(new InetSocketAddress(8888));
// 为 ServerSocketChannel 注册对对客户端申请连贯事件感兴趣
// 此时该 channel 处于选择器得治理之下
serverSocketChannel1.register(selector, SelectionKey.OP_ACCEPT);
// 因为不分明有多少客户端会连贯, 所以是 while(true)
while (true) {
// 该办法将始终阻塞,晓得选择器治理的通道上有选择器感兴趣的事件产生。selector.select();
// 每次向选择器注册通道时就会创立一个 SelectionKey 键
// 每个 SelectionKey 与对应的通道相连, 存储通道的状态
// 一个客户端一个 SocketChannel
Set<SelectionKey> keys = selector.selectedKeys();
Iterator<SelectionKey> iterator = keys.iterator();
while (iterator.hasNext()) {
SocketChannel clietChannel = null;
SelectionKey key = iterator.next();
// 如果有客户端申请建设连贯
if (key.isAcceptable()) {
// 获取 SelectionKey 关联的通道
ServerSocketChannel serverSocketChannel = (ServerSocketChannel) key.channel();
// 提取申请, 获取对应的 socketChannel
clietChannel = serverSocketChannel.accept();
// 将解决对应的客户端信息 socketChannel 设置为非阻塞
clietChannel.configureBlocking(false);
// 为该通道注册读就绪事件, 选择器会询问该通道的状态, 当该通道就绪时,
clietChannel.register(selector, SelectionKey.OP_READ);
// 记录解决对应客户端的 channel
String id = "key=" + (int) (Math.random() * 9000 + 1000);
map.put(id, clietChannel);
} else if (key.isReadable()) {
// 读就绪, 通过通道向缓冲区中取数据
clietChannel = (SocketChannel) key.channel();
ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
String receiveMsg = null;
int result = -1;
try {result = clietChannel.read(byteBuffer);
} catch (IOException e) {
// 正在读的时候服务端断开连接, 此时数据无奈持续读取
String clientKey = getClientKey(clietChannel);
System.out.println("退出了战场:" + clientKey);
map.remove(clientKey);
// 开释资源
clietChannel.close();
key.cancel();
continue;
}
if (result > 0) {
// 读之前要 flip
byteBuffer.flip();
// 避免乱码
Charset charset = Charset.forName("UTF-8");
receiveMsg = String.valueOf(charset.decode(byteBuffer).array());
System.out.println(clietChannel + ":" + receiveMsg);
if ("conncet".equals(receiveMsg)) {receiveMsg = "新客户端上线";}
System.out.println("读到了信息" + receiveMsg);
// 将信息搭载在 key 上, 为播送进来做筹备。// 因为咱们做的是聊天室, 相当于群聊。key.attach(receiveMsg);
// key 关联通道, 给 key 注册就相当于给通道注册。// 给对应的通道注册写事件。key.interestOps(SelectionKey.OP_WRITE);
}
} else if (key.isWritable()) {
//channel 能够读能够写
clietChannel = (SocketChannel) key.channel();
// 获取对应的客户端
String name = getClientKey(clietChannel);
for (Map.Entry<String, SocketChannel> entrySet : map.entrySet()) {SocketChannel entryValue = entrySet.getValue();
ByteBuffer broadCast = ByteBuffer.allocate(1024);
broadCast.put((name + ":" + key.attachment()).getBytes());
broadCast.flip();
// 将缓冲区的货色发送给每一个通道
entryValue.write(broadCast);
}
// 给 key 关联的通道注册读事件。key.interestOps(SelectionKey.OP_READ);
} else {System.out.println("hello world");
}
}
// 清空
keys.clear();}
}
private static String getClientKey(SocketChannel clietChannel) {
String key = null;
Set<Map.Entry<String, SocketChannel>> entrySet = map.entrySet();
for (Map.Entry<String, SocketChannel> value : entrySet) {if (value.getValue() == clietChannel) {key = value.getKey();
break;
}
}
return key;
}
}
为什么清空 keys?
为什么最初清空 Set<SelectionKey>
?我清空了还怎么用? 让我来解释一下,事实上选择器保护了三张表(三个汇合)。
- 注册表: 当咱们为通道注册事件,也就是将该通道纳入到选择器得管辖范畴之内时,也就是 channel.register。就会产生一个对应的 key 进入这张表。只有当咱们调用 key.cancel()办法时, 这个 key 才会被移除。通过 Selector 类中的 keys()返回这个汇合。
- 已就绪表: 当咱们调用 selector.select()办法, 这个选择器进入注册表查找, 查看哪个通道上的事件是曾经就绪了, 而后将这些可就绪的 key(就是 selectionKey, 通过 selectionKey 获取所关联通道的状态, 即通道上的事件是否就绪), 这个表不会被选择器清空, 即便咱们再次调用 selector.select(), 他也不会清空这张表中已存在的 key。
- 通道脱离管辖表: 也就是说 selectionKey 调用 cancle()办法, 那么该 selectionKey 所对应的通道将会脱离选择器的管辖。
这就是咱们最初调用清空 keys 汇合的起因, 如果遍历该汇合之后,所有就绪的事件都曾经解决而不清空,那么下次就绪的 key 咱们就获取不到。因为 set 是不可反复的。每次执行完 selector.select(), 选择器都会轮询被治理的通道,将通道上感兴趣的事件就绪的通道所对应的 selectionKey 退出到已就绪的表中。
为什么在写事件就绪之后, 还要为通道注册读事件
咱们要聊聊写事件, 写事件就绪的条件很容易满足, 就是操作系统给对应 socket 调配的缓冲区还有闲暇, 个别状况下都要满足。
key.interestOps(SelectionKey.OP_READ)相当于给该通道注册读事件, 等价于勾销掉 key 关联通道对写事件感兴趣。因为一个 key 目前只能对一个事件感兴趣。如果你不勾销对写事件感兴趣, 那么写事件会一直的被触发。
聊天室客户端
public class ChatClient {public static void main(String[] args) {
try {
// 客户端用 socketchannel
SocketChannel socketChannel = SocketChannel.open();
// 将该通道设置为非阻塞
socketChannel.configureBlocking(false);
// 创立选择器
Selector selector = Selector.open();
// 确定服务端
socketChannel.connect(new InetSocketAddress("127.0.0.1",8888));
// 将该通道纳入选择器得治理范畴之下。客户端只有跟服务端建设连贯, 因而对连贯感兴趣就好
socketChannel.register(selector, SelectionKey.OP_CONNECT);
while (true){
// 轮询通道的感兴趣事件是否就绪, 有就绪事件即返回。没有就始终陷入阻塞
selector.select();
// 获取就绪的通道
Set<SelectionKey> selectKeys = selector.selectedKeys();
Iterator<SelectionKey> keyIterator = selectKeys.iterator();
while(keyIterator.hasNext() ){SelectionKey selectKey = keyIterator.next();
// 判断连贯是否实现
if (selectKey.isConnectable()){ByteBuffer sendBuffer = ByteBuffer.allocate(1024);
SocketChannel clientChannel = (SocketChannel)selectKey.channel();
// 首次连贯实现后, 程序外部依然须要一些解决,判断解决是否实现。// 这一行能够去掉, 只是为了阐明这个办法。未实现连贯时返回 false
if (clientChannel.isConnectionPending()){
// 只实现一次, 当通道实现连贯后, 返回为 true
if (clientChannel.finishConnect()) {System.out.print("连贯服务端胜利");
sendBuffer.put("conncet".getBytes());
sendBuffer.flip();
clientChannel.write(sendBuffer);
}
}
// 启动一个线程监听用户的输出, 在目前这种模式下。用户随时能够写。随时发送信息
new Thread(()->{while (true){sendBuffer.clear();
InputStreamReader reader = new InputStreamReader(System.in);
BufferedReader bufferReader = new BufferedReader(reader);
try {String message = bufferReader.readLine();
sendBuffer.put(message.getBytes());
sendBuffer.flip();
clientChannel.write(sendBuffer);
} catch (IOException e) {e.printStackTrace();
}
}
}).start();
// 连贯就绪, 注册对读事件感兴趣
clientChannel.register(selector,SelectionKey.OP_READ);
}else if(selectKey.isReadable()){ByteBuffer readBuffer = ByteBuffer.allocate(1024);
SocketChannel clientChannel = (SocketChannel)selectKey.channel();
int len = clientChannel.read(readBuffer);
if (len > 0){String receive = new String(readBuffer.array(), 0, len);
System.out.println(receive);
}
}
}
selectKeys.clear();}
} catch (IOException e) {e.printStackTrace();
}
}
}
运行后果截图
小结
这方面网络编程相干方面优良的材料还是比拟少的,值得注意的是 java 的 NIO 中的选择器还存在着一个危险的空轮询 BUG, 这是 JDK 貌似还没解决的问题,有工夫会再专门写篇博客来探讨这个问题,不过不要放心,曾经被 netty 给解决了,生态好就是强,这是后话了,
参考资料:
- Why the key should be removed in
selector.selectedKeys().iterator()
in java nio? - JAVA NIO 工作原理及代码示例