基本概念
IO 多路复用是指内核一旦发现进程指定的一个或者多个 IO 条件准备读取, 它就通知该进程. IO 多路复用适用如下场合:
(1)当客户处理多个描述字时 (一般是交互式输入和网络套接口), 必须使用 I / O 复用.
(2)当一个客户同时处理多个套接口时, 而这种情况是可能的, 但很少出现.
(3)如果一个 TCP 服务器既要处理监听套接口, 又要处理已连接套接口, 一般也要用到 I / O 复用.
(4)如果一个服务器即要处理 TCP, 又要处理 UDP, 一般要使用 I / O 复用.
(5)如果一个服务器要处理多个服务或多个协议, 一般要使用 I / O 复用.
与多进程和多线程技术相比, I/ O 多路复用技术的最大优势是系统开销小, 系统不必创建进程 / 线程, 也不必维护这些进程 / 线程, 从而大大减小了系统的开销.
Selector(选择器)
在 Java 中, Selector
这个类是 select/epoll/poll 的外包类, 在不同的平台上, 底层的实现可能有所不同, 但其基本原理是一样的, 其原理图如下所示:
所有的 Channel
都归 Selector
管理, 这些 channel
中只要有至少一个有 IO 动作, 就可以通过 Selector.select
方法检测到, 并且使用 selectedKeys
得到这些有 IO 的 channel
, 然后对它们调用相应的 IO 操作.
我这里有一个服务端的例子:
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.Set;
public class EpollServer {public static void main(String[] args) {
try {ServerSocketChannel ssc = ServerSocketChannel.open();
ssc.socket().bind(new InetSocketAddress("127.0.0.1", 8000));
// 不设置阻塞队列
ssc.configureBlocking(false);
Selector selector = Selector.open();
// 注册 channel,并且指定感兴趣的事件是 Accept
ssc.register(selector, SelectionKey.OP_ACCEPT);
ByteBuffer readBuff = ByteBuffer.allocate(1024);
ByteBuffer writeBuff = ByteBuffer.allocate(128);
writeBuff.put("received".getBytes());
writeBuff.flip();
while (true) {int nReady = selector.select();
Set<SelectionKey> keys = selector.selectedKeys();
Iterator<SelectionKey> it = keys.iterator();
while (it.hasNext()) {SelectionKey key = it.next();
it.remove();
if (key.isAcceptable()) {
// 创建新的连接,并且把连接注册到 selector 上,而且,// 声明这个 channel 只对读操作感兴趣。SocketChannel socketChannel = ssc.accept();
socketChannel.configureBlocking(false);
socketChannel.register(selector, SelectionKey.OP_READ);
}
else if (key.isReadable()) {SocketChannel socketChannel = (SocketChannel) key.channel();
readBuff.clear();
socketChannel.read(readBuff);
readBuff.flip();
System.out.println("received :" + new String(readBuff.array()));
key.interestOps(SelectionKey.OP_WRITE);
}
else if (key.isWritable()) {writeBuff.rewind();
SocketChannel socketChannel = (SocketChannel) key.channel();
socketChannel.write(writeBuff);
key.interestOps(SelectionKey.OP_READ);
}
}
}
} catch (IOException e) {e.printStackTrace();
}
}
}
这个例子的关键点:
- 创建一个 ServerSocketChannel, 和一个 Selector, 并且把这个 server channel 注册到 selector 上, 注册的时间指定, 这个 channel 所感觉兴趣的事件是
SelectionKey.OP_ACCEPT
, 这个事件代表的是有客户端发起 TCP 连接请求. - 使用 select 方法阻塞住线程, 当 select 返回的时候, 线程被唤醒. 再通过 selectedKeys 方法得到所有可用 channel 的集合.
- 遍历这个集合, 如果其中 channel 上有连接到达, 就接受新的连接, 然后把这个新的连接也注册到 selector 中去.
- 如果有 channel 是读, 那就把数据读出来, 并且把它感兴趣的事件改成写. 如果是写, 就把数据写出去, 并且把感兴趣的事件改成读.
Selector.open
在不同的系统里实现方式不同
sunOS 使用 DevPollSelectorProvider, Linux 就会使用 EPollSelectorProvider, 而默认则使用 PollSelectorProvider
也就是说 selector.select()
用来阻塞线程, 直到一个或多个 channle 进行 io 操作. 比如 SelectionKey.OP_ACCEPT
.
然后使用 selector.selectedKeys()
方法获取出, 这些通道.
那么 selector.select()
是怎么直到已经有 io 操作了呢?
原因是因为 poll
poll
# include <poll.h>
int poll (struct pollfd * fds, unsigned int nfds, int timeout);
pollfd 结构体定义如下:
struct pollfd {
int fd; /* 文件描述符 */
short events; /* 等待的事件 */
short revents; /* 实际发生了的事件 */
};
每一个 pollfd
结构体指定了一个被监视的文件描述符, 可以传递多个结构体, 指示 poll()
监视多个文件描述符.
每个结构体的 events
域是监视该文件描述符的事件掩码, 由用户来设置这个域. revents
域是文件描述符的操作结果事件掩码, 内核在调用返回时设置这个域.
events
域中请求的任何事件都可能在 revents
域中返回. 事件如下:
值 | 描述 |
---|---|
POLLIN | 有数据可读 |
POLLRDNORM | 有普通数据可读 |
POLLRDBAND | 有优先数据可读 |
POLLPRI | 有紧迫数据可读 |
POLLOUT | 写数据不会导致阻塞 |
POLLWRNORM | 写普通数据不会导致阻塞 |
POLLWRBAND | 写优先数据不会导致阻塞 |
POLLMSGSIGPOLL | 消息可用 |
POLLER | 指定的文件描述符发生错误 |
POLLHUP | 指定的文件描述符挂起事件 |
POLLNVAL | 指定的文件描述符非法 |
说白了 poll()
可以监视多个文件描述符.
如果返回值是 3, 我们需要逐个去遍历出返回值是 3 的 socket, 然后在做对应操作.
epoll
poll 方法有一个非常大的缺陷. poll 函数的返回值是一个整数, 得到了这个返回值以后, 我们还是要逐个去检查, 比如说, 有一万个 socket 同时 poll, 返回值是 3, 我们还是只能去遍历这一万个 socket, 看看它们是否有 IO 动作.
这就很低效了, 于是, 就有了 epoll 的改进, epoll 可以直接通过“输出参数”(可以理解为 C 语言中的指针类型的参数), 一个 epoll_event 数组, 直接获得这三个 socket, 这就比较快了.