共计 5713 个字符,预计需要花费 15 分钟才能阅读完成。
最近在学习 Java 网络编程和 Netty 相关的知识,了解到 Netty 是 NIO 模式的网络框架,但是提供了不同的 Channel 来支持不同模式的网络通信处理,包括同步、异步、阻塞和非阻塞。学习要从基础开始,所以我们就要先了解一下相关的基础概念和 Java 原生的 NIO。这里,就将最近我学习的知识总结一下,以供大家了解。为了节约你的时间,本文主要内容如下:
异步,阻塞的概念
操作系统 I / O 的类型
Java NIO 的底层实现
异步,同步,阻塞,非阻塞
同步和异步关注的是消息通信机制,所谓同步就是调用者进行调用后,在没有得到结果之前,该调用一直不会返回,但是一旦调用返回,就得到了返回值,同步就是指调用者主动等待调用结果;而异步则相反,执行调用之后直接返回,所以可能没有返回值,等到有返回值时,由被调用者通过状态,通知来通知调用者.异步就是指被调用者来通知调用者调用结果就绪.所以,二者在消息通信机制上有所不同,一个是调用者检查调用结果是否就绪,一个是被调用者通知调用者结果就绪 阻塞和非阻塞关注的是程序在等待调用结果 (消息,返回值) 时的状态.阻塞调用是指在调用结果返回之前,当前线程会被挂起,调用线程只有在得到结果之后才会继续执行.非阻塞调用是指在不能立刻得到结构之前,调用线程不会被挂起,还是可以执行其他事情. 两组概念相互组合就有四种情况,分别是同步阻塞,同步非阻塞,异步阻塞,异步非阻塞.我们来举个例子来分别类比上诉四种情况. 比如你要从网上下载一个 1G 的文件,按下下载按钮之后,如果你一直在电脑旁边,等待下载结束,这种情况就是同步阻塞;如果你不需要一直呆在电脑旁边,你可以去看一会书,但是你还是隔一段时间来查看一下下载进度,这种情况就是同步非阻塞;如果你一直在电脑旁边,但是下载器在下载结束之后会响起音乐来提醒你,这就是异步阻塞;但是如果你不呆在电脑旁边,去看书,下载器下载结束后响起音乐来提醒你,那么这种情况就是异步非阻塞.
Unix 的 I / O 类型
知道上述两组概念之后,我们来看一下 Unix 下可用的 5 种 I / O 模型:
阻塞 I /O(bloking IO)
非阻塞 I /O(nonblocking IO)
多路复用 I /O(IO multiplexing)
信号驱动 I /O(signal driven IO)
异步 I /O(asynchronous IO)
前4种都是同步,只有最后一种是异步 I /O. 需要注意的是 Java NIO 依赖于 Unix 系统的多路复用 I /O, 对于 I / O 操作来说,它是同步 I /O,但是对于编程模型来说,它是异步网络调用. 下面我们就以系统 read 的调用来介绍不同的 I / O 类型. 当一个 read 发生时,它会经历两个阶段:
1 等待数据准备
2 将数据从内核内存空间拷贝到进程内存空间中
不同的 I / O 类型,在这两个阶段中有不同的行为.但是由于这块内容比较多,而且多为表述性的知识,所以这里我们只给出几张图片来解释,感觉兴趣的同学可以去具体了解一下。
Java NIO 的底层实现
我们都知道 Netty 通过 JNI 的方式提供了 Native Socket Transport,为什么 Netty 要提供自己的 Native 版本的 NIO 呢?明明 Java NIO 底层也是基于 epoll 调用 (最新的版本) 的.这里,我们先不明说,大家想一想可能的情况.下列的源码都来自于 OpenJDK-8u40-b25 版本.
open 方法
如果我们顺着 Selector.open()方法一个类一个类的找下去,很容易就发现 Selector 的初始化是由 DefaultSelectorProvider 根据不同操作系统平台生成的不同的 SelectorProvider,对于 Linux 系统,它会生成 EPollSelectorProvider 实例,而这个实例会生成 EPollSelectorImpl 作为最终的 Selector 实现.
class EPollSelectorImpl extends SelectorImpl
{
…..
// The poll object
EPollArrayWrapper pollWrapper;
…..
EPollSelectorImpl(SelectorProvider sp) throws IOException {
…..
pollWrapper = new EPollArrayWrapper();
pollWrapper.initInterrupt(fd0, fd1);
…..
}
…..
}
EpollArrayWapper 将 Linux 的 epoll 相关系统调用封装成了 native 方法供 EpollSelectorImpl 使用.
private native int epollCreate();
private native void epollCtl(int epfd, int opcode, int fd, int events);
private native int epollWait(long pollAddress, int numfds, long timeout,
int epfd) throws IOException;
上述三个 native 方法就对应 Linux 下 epoll 相关的三个系统调用
// 创建一个 epoll 句柄,size 是这个监听的数目的最大值.
int epoll_create(int size);
// 事件注册函数,告诉内核 epoll 监听什么类型的事件,参数是感兴趣的事件类型,回调和监听的 fd
int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event);
// 等待事件的产生,类似于 select 调用,events 参数用来从内核得到事件的集合
int epoll_wait(int epfd, struct epoll_event * events, int maxevents, int timeout);
所以,我们会发现在 EpollArrayWapper 的构造函数中调用了 epollCreate 方法,创建了一个 epoll 的句柄.这样,Selector 对象就算创造完毕了.
register 方法
与 open 类似,ServerSocketChannel 的 register 函数底层是调用了 SelectorImpl 类的 register 方法,这个 SelectorImpl 就是 EPollSelectorImpl 的父类.
protected final SelectionKey register(AbstractSelectableChannel ch,
int ops,
Object attachment)
{
if (!(ch instanceof SelChImpl))
throw new IllegalSelectorException();
// 生成 SelectorKey 来存储到 hashmap 中,一共之后获取
SelectionKeyImpl k = new SelectionKeyImpl((SelChImpl)ch, this);
//attach 用户想要存储的对象
k.attach(attachment);
// 调用子类的 implRegister 方法
synchronized (publicKeys) {
implRegister(k);
}
// 设置关注的 option
k.interestOps(ops);
return k;
}
EpollSelectorImpl 的相应的方法实现如下,它调用了 EPollArrayWrapper 的 add 方法,记录下 Channel 所对应的 fd 值, 然后将 ski 添加到 keys 变量中.在 EPollArrayWrapper 中有一个 byte 数组 eventLow 记录所有的 channel 的 fd 值.
protected void implRegister(SelectionKeyImpl ski) {
if (closed)
throw new ClosedSelectorException();
SelChImpl ch = ski.channel;
// 获取 Channel 所对应的 fd, 因为在 linux 下 socket 会被当作一个文件,也会有 fd
int fd = Integer.valueOf(ch.getFDVal());
fdToKey.put(fd, ski);
// 调用 pollWrapper 的 add 方法, 将 channel 的 fd 添加到监控列表中
pollWrapper.add(fd);
// 保存到 HashSet 中,keys 是 SelectorImpl 的成员变量
keys.add(ski);
}
我们会发现, 调用 register 方法并没有涉及到 EpollArrayWrapper 中的 native 方法 epollCtl 的调用, 这是因为他们将这个方法的调用推迟到 Select 方法中去了.
Select 方法
和 register 方法类似,SelectorImpl 中的 select 方法最终调用了其子类 EpollSelectorImpl 的 doSelect 方法
protected int doSelect(long timeout) throws IOException {
…..
try {
….
// 调用了 poll 方法, 底层调用了 native 的 epollCtl 和 epollWait 方法
pollWrapper.poll(timeout);
} finally {
….
}
….
// 更新 selectedKeys, 为之后的 selectedKeys 函数做准备
int numKeysUpdated = updateSelectedKeys();
….
return numKeysUpdated;
}
由上述的代码,可以看到,EPollSelectorImpl 先调用 EPollArrayWapper 的 poll 方法, 然后在更新 SelectedKeys.其中 poll 方法会先调用 epollCtl 来注册先前在 register 方法中保存的 Channel 的 fd 和感兴趣的事件类型,然后 epollWait 方法等待感兴趣事件的生成, 导致线程阻塞.
int poll(long timeout) throws IOException {
updateRegistrations(); //// 先调用 epollCtl, 更新关注的事件类型
//// 导致阻塞,等待事件产生
updated = epollWait(pollArrayAddress, NUM_EPOLLEVENTS, timeout, epfd);
…..
return updated;
}
等待关注的事件产生之后(或在等待时间超过预先设置的最大时间),epollWait 函数就会返回.select 函数从阻塞状态恢复.
selectedKeys 方法
我们先来看 SelectorImpl 中的 selectedKeys 方法.
// 是通过 Util.ungrowableSet 生成的, 不能添加, 只能减少
private Set<SelectionKey> publicSelectedKeys;
public Set<SelectionKey> selectedKeys() {
….
return publicSelectedKeys;
}
很奇怪啊, 怎麽直接就返回 publicSelectedKeys 了, 难道在 select 函数的执行过程中有修改过这个变量吗? publicSelectedKeys 这个对象其实是 selectedKeys 变量的一份副本, 你可以在 SelectorImpl 的构造函数中找到它们俩的关系, 我们再回头看一下 select 中 updateSelectedKeys 方法.
private int updateSelectedKeys() {
// 更新了的 keys 的个数, 或在说是产生的事件的个数
int entries = pollWrapper.updated;
int numKeysUpdated = 0;
for (int i=0; i<entries; i++) {
// 对应的 channel 的 fd
int nextFD = pollWrapper.getDescriptor(i);
// 通过 fd 找到对应的 SelectionKey
SelectionKeyImpl ski = fdToKey.get(Integer.valueOf(nextFD));
if (ski != null) {
int rOps = pollWrapper.getEventOps(i);
// 更新 selectedKey 变量, 并通知响应的 channel 来做响应的处理
if (selectedKeys.contains(ski)) {
if (ski.channel.translateAndSetReadyOps(rOps, ski)) {
numKeysUpdated++;
}
} else {
ski.channel.translateAndSetReadyOps(rOps, ski);
if ((ski.nioReadyOps() & ski.nioInterestOps()) != 0) {
selectedKeys.add(ski);
numKeysUpdated++;
}
}
}
}
return numKeysUpdated;
}
后记
看到这里, 详细大家都已经了解到了 NIO 的底层实现了吧. 这里我想在说两个问题. 一是为什么 Netty 自己又从新实现了一边 native 相关的 NIO 底层方法? 听听 Netty 的创始人是怎麽说的吧链接。因为 Java 的版本使用的 epoll 的 level-triggered 模式,而 Netty 则希望使用 edge-triggered 模式,而且 Java 版本没有将 epoll 的部分配置项暴露出来,比如说 TCP_CORK 和 SO_REUSEPORT。二是看这么多源码, 花费这么多时间有什么作用呢? 我感觉如果从非功利的角度来看, 那么就是纯粹的希望了解的更多, 有时候看完源码或在理解了底层原理之后, 都会用一种恍然大悟的感觉, 比如说 AQS 的原理. 如果从目的性的角度来看, 那么就是你知道底层原理之后, 你的把握性就更强了, 如果出了问题, 你可以更快的找出来, 并且解决. 除此之外, 你还可以按照具体的现实情况, 以源码为模板在自己造轮子, 实现一个更加符合你当前需求的版本. 后续如果有时间, 我希望好好了解一下 epoll 的操作系统级别的实现原理.