共计 40288 个字符,预计需要花费 101 分钟才能阅读完成。
本系列 Netty 源码解析文章基于 4.1.56.Final版本
本文笔者来为大家介绍下 Netty 的外围引擎 Reactor 的运行架构,心愿通过本文的介绍可能让大家对 Reactor 是如何驱动着整个 Netty 框架的运行有一个全面的意识。也为咱们后续进一步介绍 Netty 对于解决网络申请的整个生命周期的相干内容做一个前置常识的铺垫,不便大家后续了解。
那么在开始本文正式的内容之前,笔者先来带着大家回顾下前边文章介绍的对于 Netty 整个框架如何搭建的相干内容,没有看过笔者前边几篇文章的读者敌人也没关系,这些并不会影响到本文的浏览,只不过波及到相干细节的局部,大家能够在回看下。
前文回顾
在《聊聊 Netty 那些事儿之 Reactor 在 Netty 中的实现 (创立篇)》一文中,咱们介绍了 Netty 服务端的外围引擎 主从 Reactor 线程组
的创立过程以及相干外围组件里的重要属性。在这个过程中,咱们还提到了 Netty 对各种细节进行的优化,比方针对 JDK NIO 原生 Selector 做的一些优化,展示了 Netty 对性能极致的谋求。最终咱们创立出了如下构造的 Reactor。
在上篇文章《具体图解 Netty Reactor 启动全流程》中,咱们残缺地介绍了 Netty 服务端启动的整个流程,并介绍了在启动过程中波及到的 ServerBootstrap 相干的属性以及配置形式。用于接管连贯的服务端 NioServerSocketChannel 的创立和初始化过程以及其类的继承构造。其中重点介绍了 NioServerSocketChannel 向 Reactor 的注册过程以及 Reactor 线程的启动机会和 pipeline 的初始化机会。最初介绍了 NioServerSocketChannel 绑定端口地址的整个流程。在这个过程中咱们理解了 Netty 的这些外围组件是如何串联起来的。
当 Netty 启动结束后,咱们失去了如下的框架结构:
主 Reactor 线程组中治理的是 NioServerSocketChannel
用于接管客户端连贯,并在本人的 pipeline 中的 ServerBootstrapAcceptor
里初始化接管到的客户端连贯,随后会将初始化好的客户端连贯注册到从 Reactor 线程组中。
从 Reactor 线程组次要负责监听解决注册其上的所有客户端连贯的 IO 就绪事件。
其中一个 Channel 只能调配给一个固定的 Reactor。一个 Reactor 负责解决多个 Channel 上的 IO 就绪事件,这样能够将服务端承载的 全量客户端连贯
摊派到多个 Reactor
中解决,同时也能保障Channel 上 IO 解决的线程安全性
。Reactor 与 Channel 之间的对应关系如下图所示:
以上内容就是对笔者前边几篇文章的相干内容回顾,大家能回顾起来更好,回顾不起来也没关系,一点也不影响大家了解本文的内容。如果对相干细节感兴趣的同学,能够在浏览完本文之后,在去回看下。
咱们言归正传,正式开始本文的内容,笔者接下来会为大家介绍这些外围组件是如何相互配合从而驱动着整个 Netty Reactor 框架运行的。
当 Netty Reactor 框架启动结束后,接下来第一件事件也是最重要的事件就是如何来高效的接管客户端的连贯。
那么在探讨 Netty 服务端如何接管连贯之前,咱们须要弄清楚 Reactor 线程
的运行机制,它是如何监听并解决 Channel
上的 IO 就绪事件
的。
本文相当于是后续咱们介绍 Reactor 线程
监听解决 ACCEPT 事件
,Read 事件
,Write 事件
的前置篇,本文专一于讲述 Reactor 线程
的整个运行框架。了解了本文的内容,对了解前面 Reactor 线程
如何解决 IO 事件
会大有帮忙。
咱们在 Netty 框架的 创立阶段
和启动阶段
无数次的提到了 Reactor 线程
,那么在本文要介绍的 运行阶段
就该这个 Reactor 线程
来大显神威了。
通过前边文章的介绍,咱们理解到 Netty 中的 Reactor 线程
次要干三件事件:
- 轮询注册在
Reactor
上的所有Channel
感兴趣的IO 就绪事件
。 - 解决
Channel
上的IO 就绪事件
。 - 执行 Netty 中的异步工作。
正是这三个局部组成了 Reactor
的运行框架,那么咱们当初来看下这个运行框架具体是怎么运行的~~
Reactor 线程的整个运行框架
大家还记不记得笔者在《聊聊 Netty 那些事儿之从内核角度看 IO 模型》一文中提到的,IO 模型的演变
是围绕着 "如何用尽可能少的线程去治理尽可能多的连贯"
这一主题进行的。
Netty 的 IO 模型
是通过 JDK NIO Selector
实现的 IO 多路复用模型
,而 Netty 的IO 线程模型
为主从 Reactor 线程模型
。
依据《聊聊 Netty 那些事儿之从内核角度看 IO 模型》一文中介绍的 IO 多路复用模型
咱们很容易就能了解到 Netty 会应用一个用户态的 Reactor 线程
去一直的通过 Selector
在内核态去轮训 Channel
上的IO 就绪事件
。
说白了 Reactor 线程
其实执行的就是一个 死循环
,在 死循环
中一直的通过 Selector
去轮训 IO 就绪事件
,如果产生IO 就绪事件
则从 Selector
零碎调用中返回并解决 IO 就绪事件
,如果没有产生IO 就绪事件
则始终 阻塞
在Selector
零碎调用上,直到满足Selector 唤醒条件
。
以下三个条件中只有满足任意一个条件,Reactor 线程就会被从 Selector 上唤醒:
- 当 Selector 轮询到有 IO 沉闷事件产生时。
- 当 Reactor 线程须要执行的
定时工作
达到工作执行工夫deadline
时。 - 当有
异步工作
提交给 Reactor 时,Reactor 线程须要从Selector
上被唤醒,这样能力及时的去执行异步工作
。
这里能够看出 Netty 对
Reactor 线程
的压迫还是比拟狠的,反正当初也没有IO 就绪事件
须要去解决,不能让Reactor 线程
在这里白白等着,要立刻唤醒它,转去解决提交过去的异步工作以及定时工作。Reactor 线程
堪称996 榜样
一刻不停歇地运作着。
在理解了 Reactor 线程
的大略运行框架后,咱们接下来就到源码中去看下它的外围运行框架是如何实现进去的。
因为这块源码比拟宏大繁冗,所以笔者先把它的运行框架提取进去,不便大家整体的了解整个运行过程的全貌。
上图所展现的就是 Reactor 整个工作体系的全貌,次要分为如下几个重要的工作模块:
- Reactor 线程在 Selector 上阻塞获取 IO 就绪事件。在这个模块中首先会去查看以后是否有异步工作须要执行,如果有异步须要执行,那么不论以后有没有 IO 就绪事件都不能阻塞在 Selector 上,随后会去非阻塞的轮询一下 Selector 上是否有 IO 就绪事件,如果有,正好能够和异步工作一起执行。优先解决 IO 就绪事件,在执行异步工作。
- 如果以后没有异步工作须要执行,那么 Reactor 线程会接着查看是否有定时工作须要执行,如果有则在 Selector 上阻塞直到定时工作的到期工夫 deadline,或者满足其余唤醒条件被唤醒。如果没有定时工作须要执行,Reactor 线程则会在 Selector 上始终阻塞直到满足唤醒条件。
- 当 Reactor 线程满足唤醒条件被唤醒后,首先会去判断以后是因为有 IO 就绪事件被唤醒还是因为有异步工作须要执行被唤醒或者是两者都有。随后 Reactor 线程就会去解决 IO 就绪事件和执行异步工作。
- 最初 Reactor 线程返回循环终点一直的反复上述三个步骤。
以上就是 Reactor 线程运行的整个外围逻辑,上面是笔者根据上述外围逻辑,将 Reactor 的整体代码设计框架提取进去,大家能够联合上边的 Reactor 工作流程图,从总体上先感触下整个源码实现框架,可能把 Reactor 的外围解决步骤和代码中相应的解决模块对应起来即可,这里不须要读懂每一行代码,要以逻辑解决模块为单位了解。前面笔者会将这些一个一个的逻辑解决模块在独自拎进去为大家具体介绍。
@Override
protected void run() {
// 记录轮询次数 用于解决 JDK epoll 的空轮训 bug
int selectCnt = 0;
for (;;) {
try {
// 轮询后果
int strategy;
try {// 依据轮询策略获取轮询后果 这里的 hasTasks()次要查看的是一般队列和尾部队列中是否有异步工作期待执行
strategy = selectStrategy.calculateStrategy(selectNowSupplier, hasTasks());
switch (strategy) {
case SelectStrategy.CONTINUE:
continue;
case SelectStrategy.BUSY_WAIT:
// NIO 不反对自旋(BUSY_WAIT)case SelectStrategy.SELECT:
外围逻辑是有工作须要执行,则 Reactor 线程立马执行异步工作,如果没有异步工作执行,则进行轮询 IO 事件
default:
}
} catch (IOException e) {................ 省略...............}
执行到这里阐明满足了唤醒条件,Reactor 线程从 selector 上被唤醒开始解决 IO 就绪事件和执行异步工作
/**
* Reactor 线程须要保障及时的执行异步工作,只有有异步工作提交,就须要退出轮询。* 有 IO 事件就优先解决 IO 事件,而后解决异步工作
* */
selectCnt++;
// 次要用于从 IO 就绪的 SelectedKeys 汇合中剔除曾经生效的 selectKey
needsToSelectAgain = false;
// 调整 Reactor 线程执行 IO 事件和执行异步工作的 CPU 工夫比例 默认 50,示意执行 IO 事件和异步工作的工夫比例是一比一
final int ioRatio = this.ioRatio;
这里次要解决 IO 就绪事件,以及执行异步工作
须要优先解决 IO 就绪事件,而后依据 ioRatio 设置的解决 IO 事件 CPU 用时与异步工作 CPU 用时比例,来决定执行多长时间的异步工作
// 判断是否触发 JDK Epoll BUG 触发空轮询
if (ranTasks || strategy > 0) {if (selectCnt > MIN_PREMATURE_SELECTOR_RETURNS && logger.isDebugEnabled()) {logger.debug("Selector.select() returned prematurely {} times in a row for Selector {}.",
selectCnt - 1, selector);
}
selectCnt = 0;
} else if (unexpectedSelectorWakeup(selectCnt)) {// Unexpected wakeup (unusual case)
// 既没有 IO 就绪事件,也没有异步工作,Reactor 线程从 Selector 上被异样唤醒 触发 JDK Epoll 空轮训 BUG
// 从新构建 Selector,selectCnt 归零
selectCnt = 0;
}
} catch (CancelledKeyException e) {................ 省略...............} catch (Error e) {................ 省略...............} catch (Throwable t) {................ 省略...............} finally {................ 省略...............}
}
}
从下面提取进去的 Reactor 的源码实现框架中,咱们能够看出 Reactor 线程
次要做了上面几个事件:
- 通过
JDK NIO Selector
轮询注册在Reactor
上的所有Channel
感兴趣的IO 事件
。对于 NioServerSocketChannel 来说因为它次要负责接管客户端连贯所以监听的是OP_ACCEPT 事件
,对于客户端 NioSocketChannel 来说因为它次要负责解决连贯上的读写事件所以监听的是OP_READ
和OP_WRITE
事件。
这里须要留神的是 netty 只会主动注册
OP_READ
事件,而OP_WRITE 事件
是在当 Socket 写入缓冲区以满无奈持续写入发送数据时由用户本人注册。
-
如果有异步工作须要执行,则立马进行轮询操作,转去执行异步工作。这里分为两种状况:
- 既有
IO 就绪事件
产生,也有异步工作
须要执行。则优先解决IO 就绪事件
,而后依据ioRatio
设置的执行工夫比例
决定执行多长时间的异步工作。这里 Reactor 线程须要管制异步工作的执行工夫,因为 Reactor 线程的外围是解决 IO 就绪事件,不能因为异步工作的执行而耽搁了最重要的事件。 -
没有
IO 就绪事件
产生,然而有异步工作或者定时工作到期须要执行。则只执行异步工作
,尽可能的去压迫 Reactor 线程。没有 IO 就绪事件产生也不能闲着。这里第二种状况下只会执行
64
个异步工作,目标是为了避免适度
执行异步工作,耽搁了
最重要的事件轮询 IO 事件
。
- 既有
- 在最初 Netty 会判断本次
Reactor 线程
的唤醒是否是因为触发了 JDK epoll 空轮询 BUG 导致的,如果触发了该 BUG,则重建Selector
。绕过 JDK BUG,达到解决问题的目标。
失常状况下 Reactor 线程从 Selector 中被唤醒有两种状况:
- 轮询到有 IO 就绪事件产生。
- 有异步工作或者定时工作须要执行。
而 JDK epoll 空轮询 BUG 会在上述两种状况都没有产生的时候,Reactor 线程
会意外的从Selector
中被唤醒,导致 CPU 空转。JDK epoll 空轮询 BUG:https://bugs.java.com/bugdata…
好了,Reactor 线程
的总体运行构造框架咱们当初曾经理解了,上面咱们来深刻到这些外围解决模块中来各个击破它们~~
1. Reactor 线程轮询 IO 就绪事件
在《聊聊 Netty 那些事儿之 Reactor 在 Netty 中的实现 (创立篇)》一文中,笔者在讲述主从 Reactor 线程组NioEventLoopGroup
的创立过程的时候,提到一个结构器参数SelectStrategyFactory
。
public NioEventLoopGroup(int nThreads, Executor executor, final SelectorProvider selectorProvider) {this(nThreads, executor, selectorProvider, DefaultSelectStrategyFactory.INSTANCE);
}
public NioEventLoopGroup(int nThreads, Executor executor, final SelectorProvider selectorProvider,
final SelectStrategyFactory selectStrategyFactory) {super(nThreads, executor, selectorProvider, selectStrategyFactory, RejectedExecutionHandlers.reject());
}
Reactor 线程
最重要的一件事件就是 轮询 IO 就绪事件
,SelectStrategyFactory
就是用于指定轮询策略的,默认实现为DefaultSelectStrategyFactory.INSTANCE
。
而在 Reactor 线程
开启轮询的一开始,就是用这个 selectStrategy
去计算一个 轮询策略 strategy
,后续会依据这个 strategy
进行不同的逻辑解决。
@Override
protected void run() {
// 记录轮询次数 用于解决 JDK epoll 的空轮训 bug
int selectCnt = 0;
for (;;) {
try {
// 轮询后果
int strategy;
try {// 依据轮询策略获取轮询后果 这里的 hasTasks()次要查看的是一般队列和尾部队列中是否有异步工作期待执行
strategy = selectStrategy.calculateStrategy(selectNowSupplier, hasTasks());
switch (strategy) {
case SelectStrategy.CONTINUE:
continue;
case SelectStrategy.BUSY_WAIT:
// NIO 不反对自旋(BUSY_WAIT)case SelectStrategy.SELECT:
外围逻辑是有工作须要执行,则 Reactor 线程立马执行异步工作,如果没有异步工作执行,则进行轮询 IO 事件
default:
}
} catch (IOException e) {................ 省略...............}
................ 省略...............
}
上面咱们来看这个 轮询策略 strategy
具体的计算逻辑是什么样的?
1.1 轮询策略
public interface SelectStrategy {
/**
* Indicates a blocking select should follow.
*/
int SELECT = -1;
/**
* Indicates the IO loop should be retried, no blocking select to follow directly.
*/
int CONTINUE = -2;
/**
* Indicates the IO loop to poll for new events without blocking.
*/
int BUSY_WAIT = -3;
int calculateStrategy(IntSupplier selectSupplier, boolean hasTasks) throws Exception;
}
咱们首先来看下 Netty 中定义的这三种轮询策略:
SelectStrategy.SELECT:
此时没有任何异步工作须要执行,Reactor 线程
能够安心的阻塞
在Selector
上期待IO 就绪事件
的降临。SelectStrategy.CONTINUE:
从新开启一轮IO 轮询
。SelectStrategy.BUSY_WAIT:
Reactor 线程进行自旋轮询
,因为NIO 不反对自旋操作
,所以这里间接跳到SelectStrategy.SELECT
策略。
上面咱们来看下 轮询策略
的计算逻辑calculateStrategy
:
final class DefaultSelectStrategy implements SelectStrategy {static final SelectStrategy INSTANCE = new DefaultSelectStrategy();
private DefaultSelectStrategy() {}
@Override
public int calculateStrategy(IntSupplier selectSupplier, boolean hasTasks) throws Exception {
/**
* Reactor 线程要保障及时的执行异步工作
* 1:如果有异步工作期待执行,则马上执行 selectNow()非阻塞轮询一次 IO 就绪事件
* 2:没有异步工作,则跳到 switch select 分支
* */
return hasTasks ? selectSupplier.get() : SelectStrategy.SELECT;}
}
- 在
Reactor 线程
的轮询工作开始之前,须要首先判断下以后是否有异步工作
须要执行。判断根据就是查看Reactor
中的异步工作队列taskQueue
和用于统计信息工作用的尾部队列tailTask
是否有异步工作
。
@Override
protected boolean hasTasks() {return super.hasTasks() || !tailTasks.isEmpty();}
protected boolean hasTasks() {assert inEventLoop();
return !taskQueue.isEmpty();}
- 如果
Reactor
中有异步工作
须要执行,那么Reactor 线程
须要立刻执行,不能阻塞在Selector
上。在返回前须要再顺带调用selectNow()
非阻塞查看一下以后是否有IO 就绪事件
产生。如果有,那么正好能够和异步工作
一起被解决,如果没有,则及时地解决异步工作
。
这里 Netty 要表白的语义是:首先 Reactor 线程须要优先保障
IO 就绪事件
的解决,而后在保障异步工作
的及时执行。如果以后没有 IO 就绪事件然而有异步工作须要执行时,Reactor 线程就要去及时执行异步工作而不是持续阻塞在 Selector 上期待 IO 就绪事件。
private final IntSupplier selectNowSupplier = new IntSupplier() {
@Override
public int get() throws Exception {return selectNow();
}
};
int selectNow() throws IOException {
// 非阻塞
return selector.selectNow();}
- 如果以后
Reactor 线程
没有异步工作须要执行,那么calculateStrategy
办法间接返回SelectStrategy.SELECT
也就是SelectStrategy 接口
中定义的常量-1
。当calculateStrategy
办法通过selectNow()
返回非零
数值时,示意此时有IO 就绪
的Channel
,返回的数值示意有多少个IO 就绪
的Channel
。
@Override
protected void run() {
// 记录轮询次数 用于解决 JDK epoll 的空轮训 bug
int selectCnt = 0;
for (;;) {
try {
// 轮询后果
int strategy;
try {// 依据轮询策略获取轮询后果 这里的 hasTasks()次要查看的是一般队列和尾部队列中是否有异步工作期待执行
strategy = selectStrategy.calculateStrategy(selectNowSupplier, hasTasks());
switch (strategy) {
case SelectStrategy.CONTINUE:
continue;
case SelectStrategy.BUSY_WAIT:
// NIO 不反对自旋(BUSY_WAIT)case SelectStrategy.SELECT:
外围逻辑是有工作须要执行,则 Reactor 线程立马执行异步工作,如果没有异步工作执行,则进行轮询 IO 事件
default:
}
} catch (IOException e) {................ 省略...............}
................ 解决 IO 就绪事件以及执行异步工作...............
}
从默认的轮询策略咱们能够看出 selectStrategy.calculateStrategy
只会返回三种状况:
返回 -1:
switch 逻辑分支进入SelectStrategy.SELECT 分支
,示意此时Reactor
中没有异步工作
须要执行,Reactor 线程
能够安心的阻塞在Selector
上期待IO 就绪事件
产生。返回 0:
switch 逻辑分支进入default 分支
,示意此时Reactor
中没有IO 就绪事件
然而有异步工作
须要执行,流程通过default 分支
间接进入了解决异步工作
的逻辑局部。返回 > 0:
switch 逻辑分支进入default 分支
,示意此时Reactor
中既有IO 就绪事件
产生也有异步工作
须要执行,流程通过default 分支
间接进入了解决IO 就绪事件
和执行异步工作
逻辑局部。
当初 Reactor
的流程解决逻辑走向咱们分明了,那么接下来咱们把重点放在 SelectStrategy.SELECT 分支中的轮询逻辑上。这块是 Reactor 监听 IO 就绪事件的外围。
1.2 轮询逻辑
case SelectStrategy.SELECT:
// 以后没有异步工作执行,Reactor 线程能够释怀的阻塞期待 IO 就绪事件
// 从定时工作队列中取出行将快要执行的定时工作 deadline
long curDeadlineNanos = nextScheduledTaskDeadlineNanos();
if (curDeadlineNanos == -1L) {
// - 1 代表以后定时工作队列中没有定时工作
curDeadlineNanos = NONE; // nothing on the calendar
}
// 最早执行定时工作的 deadline 作为 select 的阻塞工夫,意思是到了定时工作的执行工夫
// 不论有无 IO 就绪事件,必须唤醒 selector,从而使 reactor 线程执行定时工作
nextWakeupNanos.set(curDeadlineNanos);
try {if (!hasTasks()) {
// 再次查看一般工作队列中是否有异步工作
// 没有的话开始 select 阻塞轮询 IO 就绪事件
strategy = select(curDeadlineNanos);
}
} finally {
// 执行到这里阐明 Reactor 曾经从 Selector 上被唤醒了
// 设置 Reactor 的状态为昏迷状态 AWAKE
// lazySet 优化不必要的 volatile 操作,不应用内存屏障,不保障写操作的可见性(单线程不须要保障)nextWakeupNanos.lazySet(AWAKE);
}
流程走到这里,阐明当初 Reactor
上没有任何事件可做,能够安心的 阻塞
在Selector
上期待 IO 就绪事件
到来。
那么 Reactor 线程
到底应该在 Selector
上阻塞多久呢??
在答复这个问题之前,咱们在回顾下《聊聊 Netty 那些事儿之 Reactor 在 Netty 中的实现 (创立篇)》一文中在讲述Reactor 的创立
时提到,Reactor 线程
除了要轮询 Channel
上的 IO 就绪事件
,以及解决IO 就绪事件
外,还有一个工作就是负责执行 Netty 框架中的 异步工作
。
而 Netty 框架中的 异步工作
分为三类:
- 寄存在一般工作队列
taskQueue
中的一般异步工作。 - 寄存在尾部队列
tailTasks
中的用于执行统计工作等收尾动作的尾部工作。 - 还有一种就是这里行将提到的
定时工作
。寄存在Reactor
中的定时工作队列scheduledTaskQueue
中。
从 ReactorNioEventLoop 类
中的继承构造咱们也能够看出,Reactor
具备执行定时工作的能力。
既然 Reactor
须要执行定时工作,那么它就不能始终 阻塞
在Selector
上有限期待IO 就绪事件
。
那么咱们回到本大节一开始提到的问题上,为了保障 Reactor
可能及时地执行 定时工作
,Reactor 线程
须要在行将要执行的的第一个定时工作 deadline
达到之前被唤醒。
所以在 Reactor
线程开始轮询 IO 就绪事件
之前,咱们须要首先计算出来 Reactor 线程
在Selector
上的阻塞超时工夫。
1.2.1 Reactor 的轮询超时工夫
首先咱们须要从 Reactor
的定时工作队列 scheduledTaskQueue
中取出行将快要执行的定时工作 deadline
。将这个deadline
作为 Reactor 线程
在Selector
上轮询的超时工夫。这样能够保障在定时工作行将要执行时,Reactor 当初能够及时的从 Selector 上被唤醒。
private static final long AWAKE = -1L;
private static final long NONE = Long.MAX_VALUE;
// nextWakeupNanos is:
// AWAKE when EL is awake
// NONE when EL is waiting with no wakeup scheduled
// other value T when EL is waiting with wakeup scheduled at time T
private final AtomicLong nextWakeupNanos = new AtomicLong(AWAKE);
long curDeadlineNanos = nextScheduledTaskDeadlineNanos();
if (curDeadlineNanos == -1L) {
// - 1 代表以后定时工作队列中没有定时工作
curDeadlineNanos = NONE; // nothing on the calendar
}
nextWakeupNanos.set(curDeadlineNanos);
public abstract class AbstractScheduledEventExecutor extends AbstractEventExecutor {
PriorityQueue<ScheduledFutureTask<?>> scheduledTaskQueue;
protected final long nextScheduledTaskDeadlineNanos() {ScheduledFutureTask<?> scheduledTask = peekScheduledTask();
return scheduledTask != null ? scheduledTask.deadlineNanos() : -1;}
final ScheduledFutureTask<?> peekScheduledTask() {
Queue<ScheduledFutureTask<?>> scheduledTaskQueue = this.scheduledTaskQueue;
return scheduledTaskQueue != null ? scheduledTaskQueue.peek() : null;}
}
nextScheduledTaskDeadlineNanos
办法会返回以后 Reactor
定时工作队列中最近的一个定时工作 deadline
工夫点,如果定时工作队列中没有定时工作,则返回-1
。
NioEventLoop
中 nextWakeupNanos
变量用来寄存 Reactor 从 Selector
上被唤醒的工夫点,设置为最近须要被执行定时工作的 deadline
,如果以后并没有定时工作须要执行,那么就设置为Long.MAX_VALUE
始终阻塞,直到有 IO 就绪事件
达到或者有 异步工作
须要执行。
1.2.2 Reactor 开始轮询 IO 就绪事件
if (!hasTasks()) {
// 再次查看一般工作队列中是否有异步工作,没有的话 开始 select 阻塞轮询 IO 就绪事件
strategy = select(curDeadlineNanos);
}
在 Reactor 线程
开始 阻塞
轮询 IO 就绪事件
之前还须要再次检查一下是否有 异步工作
须要执行。
如果此时凑巧有 异步工作
提交,就须要进行 IO 就绪事件
的轮询,转去执行 异步工作
。如果没有 异步工作
,则正式开始轮询IO 就绪事件
。
private int select(long deadlineNanos) throws IOException {if (deadlineNanos == NONE) {
// 无定时工作,无一般工作执行时,开始轮询 IO 就绪事件,没有就始终阻塞 直到唤醒条件成立
return selector.select();}
long timeoutMillis = deadlineToDelayNanos(deadlineNanos + 995000L) / 1000000L;
return timeoutMillis <= 0 ? selector.selectNow() : selector.select(timeoutMillis);
}
如果 deadlineNanos == NONE
,通过上大节的介绍,咱们晓得NONE
示意以后 Reactor
中并没有定时工作,所以能够安心的 阻塞
在Selector
上期待 IO 就绪事件
到来。
selector.select()
调用是一个阻塞调用,如果没有 IO 就绪事件
,Reactor 线程
就会始终阻塞在这里直到 IO 就绪事件
到来。这里占时不思考前边提到的JDK NIO Epoll 的空轮询 BUG
.
读到这里那么问题来了,此时 Reactor 线程
正阻塞在 selector.select()
调用上期待 IO 就绪事件
的到来,如果此时正好有 异步工作
被提交到 Reactor
中须要执行,并且此时无任何 IO 就绪事件
,而Reactor 线程
因为没有 IO 就绪事件
到来,会持续在这里阻塞,那么如何去执行 异步工作
呢??
解铃还须系铃人,既然 异步工作
在被提交后心愿立马失去执行,那么就在提交 异步工作
的时候去唤醒Reactor 线程
。
//addTaskWakesUp = true 示意 当且仅当只有调用 addTask 办法时 才会唤醒 Reactor 线程
//addTaskWakesUp = false 示意 并不是只有 addTask 办法能力唤醒 Reactor 还有其余办法能够唤醒 Reactor 默认设置 false
private final boolean addTaskWakesUp;
private void execute(Runnable task, boolean immediate) {boolean inEventLoop = inEventLoop();
addTask(task);
if (!inEventLoop) {
// 如果以后线程不是 Reactor 线程,则启动 Reactor 线程
// 这里能够看出 Reactor 线程的启动是通过 向 NioEventLoop 增加异步工作时启动的
startThread();
..................... 省略...................
}
if (!addTaskWakesUp && immediate) {
//io.netty.channel.nio.NioEventLoop.wakeup
wakeup(inEventLoop);
}
}
对于 execute 办法
我想大家肯定不会生疏,在上篇文章《具体图解 Netty Reactor 启动全流程》中咱们在介绍 Reactor 线程的启动
时介绍过该办法。
在启动过程中波及到的重要操作 Register 操作
,Bind 操作
都须要封装成 异步工作
通过该办法提交到 Reactor
中执行。
这里咱们将重点放在 execute 办法
后半段 wakeup
逻辑局部。
咱们先介绍下和 wakeup
逻辑相干的两个参数 boolean immediate
和boolean addTaskWakesUp
。
immediate:
示意提交的task
是否须要被立刻执行。Netty 中只有你提交的工作类型不是LazyRunnable
类型的工作,都是须要立刻执行的。immediate = true
addTaskWakesUp :
true
示意当且仅当只有
调用addTask
办法时才会唤醒Reactor 线程
。调用别的办法并不会唤醒Reactor 线程
。
在初始化NioEventLoop
时会设置为false
,示意并不是只有
addTask 办法能力唤醒Reactor 线程
还有其余办法能够唤醒Reactor 线程
,比方这里的execute 办法
就会唤醒Reactor 线程
。
针对 execute 办法中的这个唤醒条件!addTaskWakesUp && immediate
,netty 这里要表白的语义是:当 immediate 参数为 true 的时候示意该异步工作须要立刻执行,addTaskWakesUp 默认设置为 false 示意不仅只有 addTask 办法能够唤醒 Reactor,还有其余办法比方这里的 execute 办法也能够唤醒。然而当设置为 true 时,语义就变为只有 addTask 才能够唤醒 Reactor,即便 execute 办法里的 immediate = true 也不能唤醒 Reactor,因为执行的是 execute 办法而不是 addTask 办法。
private static final long AWAKE = -1L;
private final AtomicLong nextWakeupNanos = new AtomicLong(AWAKE);
protected void wakeup(boolean inEventLoop) {if (!inEventLoop && nextWakeupNanos.getAndSet(AWAKE) != AWAKE) {
// 将 Reactor 线程从 Selector 上唤醒
selector.wakeup();}
}
当 nextWakeupNanos = AWAKE
时示意以后 Reactor 正处于昏迷状态,既然是昏迷状态也就没有必要去执行 selector.wakeup()
反复唤醒 Reactor 了,同时也能省去这一次的零碎调用开销。
在《1.2 大节 轮询逻辑》开始介绍的源码实现框架里 Reactor 被唤醒之后执行代码会进入 finally{...}
语句块中,在那里会将 nextWakeupNanos
设置为AWAKE
。
try {if (!hasTasks()) {strategy = select(curDeadlineNanos);
}
} finally {
// 执行到这里阐明 Reactor 曾经从 Selector 上被唤醒了
// 设置 Reactor 的状态为昏迷状态 AWAKE
// lazySet 优化不必要的 volatile 操作,不应用内存屏障,不保障写操作的可见性(单线程不须要保障)nextWakeupNanos.lazySet(AWAKE);
}
这里 Netty 用了一个
AtomicLong 类型
的变量nextWakeupNanos
,既能示意以后Reactor 线程
的状态,又能示意Reactor 线程
的阻塞超时工夫。咱们在日常开发中也能够学习下这种技巧。
咱们持续回到 Reactor 线程
轮询 IO 就绪事件
的主线上。
private int select(long deadlineNanos) throws IOException {if (deadlineNanos == NONE) {
// 无定时工作,无一般工作执行时,开始轮询 IO 就绪事件,没有就始终阻塞 直到唤醒条件成立
return selector.select();}
long timeoutMillis = deadlineToDelayNanos(deadlineNanos + 995000L) / 1000000L;
return timeoutMillis <= 0 ? selector.selectNow() : selector.select(timeoutMillis);
}
当 deadlineNanos
不为 NONE
,示意此时Reactor
有定时工作
须要执行,Reactor 线程
须要阻塞在 Selector
上期待 IO 就绪事件
直到最近的一个定时工作执行工夫点 deadline
达到。
这里的 deadlineNanos
示意的就是 Reactor
中最近的一个定时工作执行工夫点 deadline
,单位是 纳秒
。指的是一个 相对工夫
。
而咱们须要计算的是 Reactor 线程
阻塞在 Selector
的超时工夫 timeoutMillis
,单位是 毫秒
,指的是一个 绝对工夫
。
所以在 Reactor 线程
开始阻塞在 Selector
上之前,咱们须要将这个单位为 纳秒
的相对工夫 deadlineNanos
转化为单位为 毫秒
的绝对工夫timeoutMillis
。
private int select(long deadlineNanos) throws IOException {if (deadlineNanos == NONE) {
// 无定时工作,无一般工作执行时,开始轮询 IO 就绪事件,没有就始终阻塞 直到唤醒条件成立
return selector.select();}
long timeoutMillis = deadlineToDelayNanos(deadlineNanos + 995000L) / 1000000L;
return timeoutMillis <= 0 ? selector.selectNow() : selector.select(timeoutMillis);
}
这里大家可能会好奇,通过 deadlineToDelayNanos 办法
计算 timeoutMillis
的时候,为什么要给 deadlineNanos
在加上 0.995 毫秒
呢??
大家设想一下这样的场景,当最近的一个定时工作的 deadline
行将在 5 微秒
内达到,那么这时将纳秒转换成毫秒计算出的 timeoutMillis
会是0
。
而在 Netty 中 timeoutMillis = 0
要表白的语义是:定时工作执行工夫曾经达到 deadline
工夫点,须要被执行。
而现实情况是定时工作还有 5 微秒
才可能达到 deadline
,所以对于这种状况,须要在deadlineNanos
在加上 0.995 毫秒
凑成 1 毫秒
不能让其为 0。
所以从这里咱们能够看出,
Reactor
在有定时工作的状况下,至多要阻塞 1 毫秒
。
public abstract class AbstractScheduledEventExecutor extends AbstractEventExecutor {protected static long deadlineToDelayNanos(long deadlineNanos) {return ScheduledFutureTask.deadlineToDelayNanos(deadlineNanos);
}
}
final class ScheduledFutureTask<V> extends PromiseTask<V> implements ScheduledFuture<V>, PriorityQueueNode {static long deadlineToDelayNanos(long deadlineNanos) {return deadlineNanos == 0L ? 0L : Math.max(0L, deadlineNanos - nanoTime());
}
// 启动工夫点
private static final long START_TIME = System.nanoTime();
static long nanoTime() {return System.nanoTime() - START_TIME;
}
static long deadlineNanos(long delay) {
// 计算定时工作执行 deadline 去除启动工夫
long deadlineNanos = nanoTime() + delay;
// Guard against overflow
return deadlineNanos < 0 ? Long.MAX_VALUE : deadlineNanos;
}
}
这里须要留神一下,在创立定时工作时会通过 deadlineNanos 办法
计算定时工作的执行 deadline
,deadline
的计算逻辑是 以后工夫点
+ 工作延时 delay
–系统启动工夫
。 这里须要扣除系统启动的工夫。
所以这里在通过 deadline
计算延时 delay
(也就是 timeout)的时候须要在加上 系统启动的工夫
: deadlineNanos - nanoTime()
当通过 deadlineToDelayNanos
计算出的 timeoutMillis <= 0
时,示意 Reactor
目前有邻近的 定时工作
须要执行,这时候就须要立马返回,不能阻塞在 Selector
上影响 定时工作
的执行。当然在返回执行 定时工作
前,须要在棘手通过 selector.selectNow()
非阻塞轮询一下 Channel
上是否有 IO 就绪事件
达到,避免耽搁 IO 事件
的解决。真是操碎了心~~
当 timeoutMillis > 0
时,Reactor 线程
就能够安心的阻塞在 Selector
上期待 IO 事件
的到来,直到 timeoutMillis
超时工夫达到。
timeoutMillis <= 0 ? selector.selectNow() : selector.select(timeoutMillis)
当注册在 Reactor
上的 Channel
中有 IO 事件
到来时,Reactor 线程
就会从 selector.select(timeoutMillis)
调用中唤醒,立刻去解决IO 就绪事件
。
这里假如一种极其状况,如果最近的一个定时工作的 deadline 是在将来很远的一个工夫点,这样就会使 timeoutMillis 的工夫十分十分久,那么 Reactor 岂不是会始终阻塞在 Selector 上造成 Netty 无奈工作?
笔者感觉大家当初心里应该曾经有了答案,咱们在《1.2.2 Reactor 开始轮询 IO 就绪事件》大节一开始介绍过,当 Reactor 正在 Selector 上阻塞时,如果此时用户线程向 Reactor 提交了异步工作,Reactor 线程会通过 execute 办法被唤醒。
流程到这里,Reactor 中最重要也是最外围的逻辑:轮询 Channel
上的 IO 就绪事件
的解决流程咱们就解说完了。
当 Reactor 轮询到有 IO 沉闷事件或者有异步工作须要执行时,就会从 Selector 上被唤醒,上面就到了该介绍 Reactor 被唤醒之后是如何解决 IO 就绪事件
以及如何执行 异步工作
的时候了。
Netty 毕竟是一个网络框架,所以它会优先去解决 Channel
上的 IO 事件
,基于这个事实,所以 Netty 不会容忍 异步工作
被无限度的执行从而影响IO 吞吐
。
Netty 通过 ioRatio 变量
来调配 Reactor 线程
在解决 IO 事件
和执行 异步工作
之间的 CPU 工夫
分配比例。
上面咱们就来看下这个执行工夫比例的调配逻辑是什么样的~~~
2. Reactor 解决 IO 与解决异步工作的工夫比例调配
无论什么时候,当有 IO 就绪事件
到来时,Reactor
都须要保障 IO 事件
被及时残缺的解决完,而 ioRatio
次要限度的是执行 异步工作
所需用时,避免 Reactor 线程
解决 异步工作
工夫过长而导致 I/O 事件
得不到及时地解决。
// 调整 Reactor 线程执行 IO 事件和执行异步工作的 CPU 工夫比例 默认 50,示意执行 IO 事件和异步工作的工夫比例是一比一
final int ioRatio = this.ioRatio;
boolean ranTasks;
if (ioRatio == 100) { // 先一股脑执行 IO 事件,在一股脑执行异步工作(无工夫限度)try {if (strategy > 0) {
// 如果有 IO 就绪事件 则解决 IO 就绪事件
processSelectedKeys();}
} finally {
// Ensure we always run tasks.
// 解决所有异步工作
ranTasks = runAllTasks();}
} else if (strategy > 0) {// 先执行 IO 事件 用时 ioTime 执行异步工作只能用时 ioTime * (100 - ioRatio) / ioRatio
final long ioStartTime = System.nanoTime();
try {processSelectedKeys();
} finally {
// Ensure we always run tasks.
final long ioTime = System.nanoTime() - ioStartTime;
// 限定在超时工夫内 解决无限的异步工作 避免 Reactor 线程解决异步工作工夫过长而导致 I/O 事件阻塞
ranTasks = runAllTasks(ioTime * (100 - ioRatio) / ioRatio);
}
} else { // 没有 IO 就绪事件处理,则只执行异步工作 最多执行 64 个 避免 Reactor 线程解决异步工作工夫过长而导致 I/O 事件阻塞
ranTasks = runAllTasks(0); // This will run the minimum number of tasks
}
- 当
ioRatio = 100
时,示意无需思考执行工夫的限度,当有IO 就绪事件
时(strategy > 0
)Reactor 线程
须要优先解决IO 就绪事件
,解决完IO 事件
后,执行所有的异步工作
包含:一般工作,尾部工作,定时工作。无工夫限度。
strategy
的数值示意IO 就绪
的Channel
个数。它是前边介绍的io.netty.channel.nio.NioEventLoop#select
办法的返回值。
- 当
ioRatio
设置的值不为100
时,默认为50
。须要先统计出执行IO 事件
的用时ioTime
,依据ioTime * (100 - ioRatio) / ioRatio
计算出,前面执行异步工作
的限度工夫。也就是说Reactor 线程
须要在这个限定的工夫内,执行无限的异步工作,避免Reactor 线程
因为解决异步工作
工夫过长而导致I/O 事件
得不到及时地解决。
默认状况下,执行
IO 事件
用时和执行异步工作
用时比例设置的是一比一。ioRatio
设置的越高,则Reactor 线程
执行异步工作的工夫占比越小
。
要想得到 Reactor 线程
执行 异步工作
所需的工夫限度,必须晓得执行 IO 事件
的用时 ioTime
而后在依据 ioRatio
计算出执行 异步工作
的工夫限度。
那如果此时并没有 IO 就绪事件
须要 Reactor 线程
解决的话,这种状况下咱们无奈失去 ioTime
,那怎么失去执行 异步工作
的限度工夫呢??
在这种非凡状况下,Netty 只容许 Reactor 线程
最多执行 64
个异步工作,而后就完结执行。转去持续轮训 IO 就绪事件
。外围目标还是避免Reactor 线程
因为解决 异步工作
工夫过长而导致 I/O 事件
得不到及时地解决。
默认状况下,当
Reactor
有异步工作
须要解决然而没有IO 就绪事件
时,Netty 只会容许Reactor 线程
执行最多64
个异步工作。
当初咱们对 Reactor
解决 IO 事件
和异步工作
的整体框架曾经理解了,上面咱们就来别离介绍下 Reactor 线程
在解决 IO 事件
和异步工作
的具体逻辑是什么样的?
3. Reactor 线程解决 IO 就绪事件
// 该字段为持有 selector 对象 selectedKeys 的援用,当 IO 事件就绪时,间接从这里获取
private SelectedSelectionKeySet selectedKeys;
private void processSelectedKeys() {
// 是否采纳 netty 优化后的 selectedKey 汇合类型 是由变量 DISABLE_KEY_SET_OPTIMIZATION 决定的 默认为 false
if (selectedKeys != null) {processSelectedKeysOptimized();
} else {processSelectedKeysPlain(selector.selectedKeys());
}
}
看到这段代码大家眼生吗??
不知大家还记不记得咱们在《聊聊 Netty 那些事儿之 Reactor 在 Netty 中的实现 (创立篇)》一文中介绍 Reactor NioEventLoop 类
在创立 Selector
的过程中提到,出于对 JDK NIO Selector
中selectedKeys 汇合
的插入
和遍历
操作性能的思考 Netty 将本人用数组实现的 SelectedSelectionKeySet 汇合
替换掉了 JDK NIO Selector
中selectedKeys
的 HashSet
实现。
public abstract class SelectorImpl extends AbstractSelector {
// The set of keys with data ready for an operation
// //IO 就绪的 SelectionKey(外面包裹着 channel)protected Set<SelectionKey> selectedKeys;
// The set of keys registered with this Selector
// 注册在该 Selector 上的所有 SelectionKey(外面包裹着 channel)protected HashSet<SelectionKey> keys;
............... 省略...................
}
Netty 中通过优化开关 DISABLE_KEY_SET_OPTIMIZATION
管制是否对 JDK NIO Selector
进行优化。默认是须要优化。
在优化开关开启的状况下,Netty 会将创立的 SelectedSelectionKeySet 汇合
保留在 NioEventLoop
的private SelectedSelectionKeySet selectedKeys
字段中,不便 Reactor 线程
间接从这里获取 IO 就绪
的SelectionKey
。
在优化开关敞开的状况下,Netty 会间接采纳 JDK NIO Selector
的默认实现。此时 NioEventLoop
的selectedKeys
字段就会为null
。
遗记这段的同学能够在回顾下《聊聊 Netty 那些事儿之 Reactor 在 Netty 中的实现 (创立篇)》一文中对于
Reactor
的创立过程。
通过对前边内容的回顾,咱们看到了在 Reactor
解决 IO 就绪事件
的逻辑也分为两个局部,一个是通过 Netty 优化的,一个是采纳 JDK 原生
的。
咱们先来看采纳 JDK 原生
的Selector
的解决形式,了解了这种形式,在看 Netty 优化的形式会更加容易。
3.1 processSelectedKeysPlain
咱们在《聊聊 Netty 那些事儿之 Reactor 在 Netty 中的实现 (创立篇)》一文中介绍JDK NIO Selector
的工作过程时讲过,当注册在 Selector
上的 Channel
产生 IO 就绪事件
时,Selector
会将 IO 就绪
的SelectionKey
插入到 Set<SelectionKey> selectedKeys
汇合中。
这时 Reactor 线程
会从 java.nio.channels.Selector#select(long)
调用中返回。随后调用 java.nio.channels.Selector#selectedKeys
获取 IO 就绪
的SelectionKey
汇合。
所以 Reactor 线程
在调用 processSelectedKeysPlain 办法
解决 IO 就绪事件
之前须要调用 selector.selectedKeys()
去获取所有 IO 就绪
的SelectionKeys
。
processSelectedKeysPlain(selector.selectedKeys())
private void processSelectedKeysPlain(Set<SelectionKey> selectedKeys) {if (selectedKeys.isEmpty()) {return;}
Iterator<SelectionKey> i = selectedKeys.iterator();
for (;;) {final SelectionKey k = i.next();
final Object a = k.attachment();
// 留神每次迭代开端的 keyIterator.remove()调用。Selector 不会本人从已选择键集中移除 SelectionKey 实例。// 必须在解决完通道时本人移除。下次该通道变成就绪时,Selector 会再次将其放入已选择键集中。i.remove();
if (a instanceof AbstractNioChannel) {processSelectedKey(k, (AbstractNioChannel) a);
} else {@SuppressWarnings("unchecked")
NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a;
processSelectedKey(k, task);
}
if (!i.hasNext()) {break;}
// 目标是再次进入 for 循环 移除生效的 selectKey(socketChannel 可能从 selector 上移除)
if (needsToSelectAgain) {selectAgain();
selectedKeys = selector.selectedKeys();
// Create the iterator again to avoid ConcurrentModificationException
if (selectedKeys.isEmpty()) {break;} else {i = selectedKeys.iterator();
}
}
}
}
3.1.1 获取 IO 就绪的 Channel
Set<SelectionKey> selectedKeys
汇合外面装的全副是 IO 就绪
的SelectionKey
,留神,此时 Set<SelectionKey> selectedKeys
的实现类型为HashSet 类型
。因为咱们这里首先介绍的是 JDK NIO 原生实现。
通过获取 HashSet
的迭代器,开始一一解决 IO 就绪
的Channel
。
Iterator<SelectionKey> i = selectedKeys.iterator();
final SelectionKey k = i.next();
final Object a = k.attachment();
大家还记得这个 SelectionKey
中的 attachment 属性
里寄存的是什么吗??
在上篇文章《具体图解 Netty Reactor 启动全流程》中咱们在讲 NioServerSocketChannel
向Main Reactor
注册的时候,通过 this 指针将本人作为 SelectionKey
的attachment 属性
注册到 Selector
中。这一步实现了 Netty 自定义 Channel
和JDK NIO Channel
的绑定。
public abstract class AbstractNioChannel extends AbstractChannel {
//channel 注册到 Selector 后取得的 SelectKey
volatile SelectionKey selectionKey;
@Override
protected void doRegister() throws Exception {
boolean selected = false;
for (;;) {
try {selectionKey = javaChannel().register(eventLoop().unwrappedSelector(), 0, this);
return;
} catch (CancelledKeyException e) {............... 省略....................}
}
}
}
而咱们也提到 SelectionKey
就相当于是 Channel
在Selector
中的一种示意,当 Channel
上有 IO 就绪事件
时,Selector
会将 Channel
对应的 SelectionKey
返回给 Reactor 线程
,咱们能够通过返回的这个SelectionKey
里的 attachment 属性
获取到对应的 Netty 自定义Channel
。
对于客户端连贯事件(
OP_ACCEPT
)沉闷时,这里的Channel 类型
为NioServerSocketChannel
。
对于客户端读写事件(Read
,Write
)沉闷时,这里的Channel 类型
为NioSocketChannel
。
当咱们通过 k.attachment()
获取到 Netty 自定义的 Channel
时,就须要把这个 Channel
对应的 SelectionKey
从Selector
的就绪汇合 Set<SelectionKey> selectedKeys
中删除。因为 Selector 本人不会被动删除曾经解决完的 SelectionKey,须要调用者本人被动删除,这样当这个 Channel
再次 IO 就绪时
,Selector
会再次将这个 Channel
对应的 SelectionKey
放入就绪汇合 Set<SelectionKey> selectedKeys
中。
i.remove();
3.1.2 解决 Channel 上的 IO 事件
if (a instanceof AbstractNioChannel) {processSelectedKey(k, (AbstractNioChannel) a);
} else {@SuppressWarnings("unchecked")
NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a;
processSelectedKey(k, task);
}
从这里咱们能够看出 Netty 向 SelectionKey
中的 attachment 属性
附加的对象分为两种:
- 一种是咱们相熟的
Channel
,无论是服务端应用的NioServerSocketChannel
还是客户端应用的NioSocketChannel
都属于AbstractNioChannel
。Channel
上的IO 事件
是由 Netty 框架负责解决,也是本大节咱们要重点介绍的 - 另一种就是
NioTask
,这种类型是 Netty 提供给用户能够自定义一些当Channel
上产生IO 就绪事件
时的自定义解决。
public interface NioTask<C extends SelectableChannel> {
/**
* Invoked when the {@link SelectableChannel} has been selected by the {@link Selector}.
*/
void channelReady(C ch, SelectionKey key) throws Exception;
/**
* Invoked when the {@link SelectionKey} of the specified {@link SelectableChannel} has been cancelled and thus
* this {@link NioTask} will not be notified anymore.
*
* @param cause the cause of the unregistration. {@code null} if a user called {@link SelectionKey#cancel()} or
* the event loop has been shut down.
*/
void channelUnregistered(C ch, Throwable cause) throws Exception;
}
NioTask
和Channel
其实实质上是一样的都是负责解决Channel
上的IO 就绪事件
,只不过一个是用户自定义解决
,一个是 Netty 框架解决。这里咱们重点关注Channel
的IO 解决逻辑
private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {
// 获取 Channel 的底层操作类 Unsafe
final AbstractNioChannel.NioUnsafe unsafe = ch.unsafe();
if (!k.isValid()) {...... 如果 SelectionKey 曾经生效则敞开对应的 Channel......}
try {
// 获取 IO 就绪事件
int readyOps = k.readyOps();
// 解决 Connect 事件
if ((readyOps & SelectionKey.OP_CONNECT) != 0) {int ops = k.interestOps();
// 移除对 Connect 事件的监听,否则 Selector 会始终告诉
ops &= ~SelectionKey.OP_CONNECT;
k.interestOps(ops);
// 触发 channelActive 事件处理 Connect 事件
unsafe.finishConnect();}
// 解决 Write 事件
if ((readyOps & SelectionKey.OP_WRITE) != 0) {ch.unsafe().forceFlush();}
// 解决 Read 事件或者 Accept 事件
if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {unsafe.read();
}
} catch (CancelledKeyException ignored) {unsafe.close(unsafe.voidPromise());
}
}
- 首先咱们须要获取
IO 就绪 Channel
底层的操作类Unsafe
,用于对具体IO 就绪事件
的解决。
这里能够看出,Netty 对
IO 就绪事件
的解决全副封装在Unsafe 类
中。比方:对OP_ACCEPT 事件
的具体解决逻辑是封装在NioServerSocketChannel
中的UnSafe 类
中。对OP_READ 或者 OP_WRITE 事件
的解决是封装在NioSocketChannel
中的Unsafe 类
中。
- 从
Selectionkey
中获取具体IO 就绪事件 readyOps
。
SelectonKey
中对于 IO 事件
的汇合有两个。一个是 interestOps
, 用于记录Channel
感兴趣的 IO 事件
,在Channel
向Selector
注册结束后,通过 pipeline
中的 HeadContext
节点的 ChannelActive 事件回调
中增加。上面这段代码就是在 ChannelActive 事件回调
中 Channel 在向 Selector 注册本人感兴趣的 IO 事件。
public abstract class AbstractNioChannel extends AbstractChannel {
@Override
protected void doBeginRead() throws Exception {// Channel.read() or ChannelHandlerContext.read() was called
final SelectionKey selectionKey = this.selectionKey;
if (!selectionKey.isValid()) {return;}
readPending = true;
final int interestOps = selectionKey.interestOps();
/**
* 1:ServerSocketChannel 初始化时 readInterestOp 设置的是 OP_ACCEPT 事件
* 2:SocketChannel 初始化时 readInterestOp 设置的是 OP_READ 事件
* */
if ((interestOps & readInterestOp) == 0) {
// 注册监听 OP_ACCEPT 或者 OP_READ 事件
selectionKey.interestOps(interestOps | readInterestOp);
}
}
}
另一个就是这里的 readyOps
,用于记录在Channel
感兴趣的 IO 事件
中具体哪些 IO 事件
就绪了。
Netty 中将各种事件的汇合用一个 int 型
变量来保留。
- 用
&
操作判断,某个事件是否在事件汇合中:(readyOps & SelectionKey.OP_CONNECT) != 0
,这里就是判断 Channel 是否对 Connect 事件感兴趣。 - 用
|
操作向事件汇合中增加事件:interestOps | readInterestOp
- 从事件汇合中删除某个事件,是通过先将要删除事件取反
~
,而后在和事件汇合做&
操作:ops &= ~SelectionKey.OP_CONNECT
Netty 这种对空间的极致利用思维,很值得咱们平时在日常开发中学习~~
当初咱们曾经晓得哪些 Channel
当初处于 IO 就绪状态
,并且晓得了具体哪些类型的IO 事件
曾经就绪。
上面就该针对 Channel
上的不同 IO 就绪事件
做出相应的解决了。
3.1.2.1 解决 Connect 事件
Netty 客户端向服务端发动连贯,并向客户端的 Reactor
注册 Connect 事件
,当连贯建设胜利后,客户端的NioSocketChannel
就会产生Connect 就绪事件
,通过后面内容咱们讲的Reactor 的运行框架
,最终流程会走到这里。
if ((readyOps & SelectionKey.OP_CONNECT) != 0) {int ops = k.interestOps();
ops &= ~SelectionKey.OP_CONNECT;
k.interestOps(ops);
// 触发 channelActive 事件
unsafe.finishConnect();}
如果 IO 就绪
的事件是 Connect 事件
,那么就调用对应客户端NioSocketChannel
中的 Unsafe 操作类
中的 finishConnect 办法
解决 Connect 事件
。这时会在 Netty 客户端NioSocketChannel
中的 pipeline
中流传ChannelActive 事件
。
最初须要将 OP_CONNECT 事件
从客户端 NioSocketChannel
所关怀的事件汇合 interestOps
中删除。否则 Selector
会始终告诉Connect 事件就绪
。
3.1.2.2 解决 Write 事件
对于 Reactor 线程
解决 Netty 中的 Write 事件
的流程,笔者后续会专门用一篇文章来为大家介绍。本文咱们重点关注 Reactor 线程
的整体运行框架。
if ((readyOps & SelectionKey.OP_WRITE) != 0) {ch.unsafe().forceFlush();}
这里大家只须要记住,OP_WRITE 事件的注册是由用户来实现的,当 Socket 发送缓冲区已满无奈持续写入数据时,用户会向 Reactor 注册 OP_WRITE 事件,等到 Socket 发送缓冲区变得可写时,Reactor 会收到 OP_WRITE 事件沉闷告诉,随后在这里调用客户端 NioSocketChannel
中的 forceFlush 办法
将残余数据发送进来。
3.1.2.3 解决 Read 事件或者 Accept 事件
if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {unsafe.read();
}
这里能够看出 Netty 中解决 Read 事件
和Accept 事件
都是由对应 Channel
中的 Unsafe 操作类
中的 read 办法
解决。
服务端 NioServerSocketChannel
中的 Read 办法
解决的是 Accept 事件
,客户端NioSocketChannel
中的 Read 办法
解决的是Read 事件
。
这里大家只需记住各个
IO 事件
在对应Channel
中的解决入口,后续文章咱们会详细分析这些入口函数。
3.1.3 从 Selector 中移除生效的 SelectionKey
// 用于及时从 selectedKeys 中革除生效的 selectKey 比方 socketChannel 从 selector 上被用户移除
private boolean needsToSelectAgain;
// 目标是再次进入 for 循环 移除生效的 selectKey(socketChannel 可能被用户从 selector 上移除)
if (needsToSelectAgain) {selectAgain();
selectedKeys = selector.selectedKeys();
// Create the iterator again to avoid ConcurrentModificationException
if (selectedKeys.isEmpty()) {break;} else {i = selectedKeys.iterator();
}
}
在前边介绍 Reactor 运行框架
的时候,咱们看到在每次 Reactor 线程
轮询完结,筹备解决 IO 就绪事件
以及 异步工作
的时候,都会将 needsToSelectAgain
设置为false
。
那么这个 needsToSelectAgain
到底是干嘛的?以及为什么咱们须要去 “Select Again”
呢?
首先咱们来看下在什么状况下会将 needsToSelectAgain
这个变量设置为true
,通过这个设置的过程,咱们是否可能从中找到一些线索?
咱们晓得 Channel
能够将本人注册到 Selector
上,那么当然也能够将本人从 Selector
上勾销移除。
在上篇文章中咱们也花了大量的篇幅解说了这个注册的过程,当初咱们来看下 Channel
的勾销注册。
public abstract class AbstractNioChannel extends AbstractChannel {
//channel 注册到 Selector 后取得的 SelectKey
volatile SelectionKey selectionKey;
@Override
protected void doDeregister() throws Exception {eventLoop().cancel(selectionKey());
}
protected SelectionKey selectionKey() {
assert selectionKey != null;
return selectionKey;
}
}
Channel
勾销注册的过程很简略,间接调用 NioChannel
的doDeregister
办法,Channel
绑定的 Reactor
会将其从 Selector
中勾销并进行监听 Channel
上的IO 事件
。
public final class NioEventLoop extends SingleThreadEventLoop {
// 记录 Selector 上移除 socketChannel 的个数 达到 256 个 则须要将有效的 selectKey 从 SelectedKeys 汇合中革除掉
private int cancelledKeys;
private static final int CLEANUP_INTERVAL = 256;
/**
* 将 socketChannel 从 selector 中移除 勾销监听 IO 事件
* */
void cancel(SelectionKey key) {key.cancel();
cancelledKeys ++;
// 当从 selector 中移除的 socketChannel 数量达到 256 个,设置 needsToSelectAgain 为 true
// 在 io.netty.channel.nio.NioEventLoop.processSelectedKeysPlain 中从新做一次轮询,将生效的 selectKey 移除,// 以保障 selectKeySet 的有效性
if (cancelledKeys >= CLEANUP_INTERVAL) {
cancelledKeys = 0;
needsToSelectAgain = true;
}
}
}
- 调用
JDK NIO SelectionKey
的 APIcancel 办法
,将Channel
从Selector
中勾销掉。SelectionKey#cancel 办法
调用结束后,此时调用SelectionKey#isValid
将会返回false
。SelectionKey#cancel 办法
调用后,Selector
会将要勾销的这个SelectionKey
退出到Selector
中的cancelledKeys 汇合
中。
public abstract class AbstractSelector extends Selector {private final Set<SelectionKey> cancelledKeys = new HashSet<SelectionKey>();
void cancel(SelectionKey k) {synchronized (cancelledKeys) {cancelledKeys.add(k);
}
}
}
- 当
Channel
对应的SelectionKey
勾销结束后,Channel
勾销计数器cancelledKeys
会加 1,当cancelledKeys = 256
时,将needsToSelectAgain
设置为true
。 - 随后在
Selector
的下一次
轮询过程中,会将cancelledKeys 汇合
中的SelectionKey
从Selector
中 所有的KeySet
中移除 。这里的KeySet
包含Selector
用于寄存就绪SelectionKey
的selectedKeys 汇合
,以及用于寄存所有注册的Channel
对应的SelectionKey
的keys 汇合
。
public abstract class SelectorImpl extends AbstractSelector {protected Set<SelectionKey> selectedKeys = new HashSet();
protected HashSet<SelectionKey> keys = new HashSet();
..................... 省略...............
}
咱们看到 Reactor 线程
中对 needsToSelectAgain
的判断是在 processSelectedKeysPlain 办法
解决 IO 就绪
的SelectionKey
的循环体中进行判断的。
之所以这里特地提到 needsToSelectAgain
判断的地位,是要让大家留神到此时 Reactor
正在解决 本次
轮询的IO 就绪事件
。
而前边也说了,当调用 SelectionKey#cancel 办法
后,须要等到 下次轮询
的过程中 Selector
才会将这些勾销的 SelectionKey
从Selector
中的所有 KeySet 汇合
中移除,当然这里也包含就绪汇合selectedKeys
。
当在 本次
轮询期间,如果大量的 Channel
从Selector
中勾销,Selector 中的就绪汇合 selectedKeys
中仍然会保留这些 Channel
对应 SelectionKey
直到 下次轮询
。那么当然会影响本次轮询后果selectedKeys
的有效性。
所以为了保障 Selector
中所有 KeySet
的有效性 ,须要在Channel
勾销个数达到 256
时,触发一次selectNow
,目标是革除有效的SelectionKey
。
private void selectAgain() {
needsToSelectAgain = false;
try {selector.selectNow();
} catch (Throwable t) {logger.warn("Failed to update SelectionKeys.", t);
}
}
到这里,咱们就对 JDK 原生 Selector
的解决形式 processSelectedKeysPlain 办法
就介绍完了,其实 对 IO 就绪事件
的解决逻辑都是一样的,在咱们了解了 processSelectedKeysPlain 办法
后,processSelectedKeysOptimized 办法
对IO 就绪事件
的解决,咱们了解起来就十分轻松了。
3.2 processSelectedKeysOptimized
Netty 默认会采纳优化过的 Selector
对IO 就绪事件
的解决。然而解决逻辑是大同小异的。上面咱们次要介绍一下这两个办法的不同之处。
private void processSelectedKeysOptimized() {
// 在 openSelector 的时候将 JDK 中 selector 实现类中得 selectedKeys 和 publicSelectKeys 字段类型
// 由原来的 HashSet 类型替换为 Netty 优化后的数组实现的 SelectedSelectionKeySet 类型
for (int i = 0; i < selectedKeys.size; ++i) {final SelectionKey k = selectedKeys.keys[i];
// 对应迭代器中得 remove selector 不会本人革除 selectedKey
selectedKeys.keys[i] = null;
final Object a = k.attachment();
if (a instanceof AbstractNioChannel) {processSelectedKey(k, (AbstractNioChannel) a);
} else {@SuppressWarnings("unchecked")
NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a;
processSelectedKey(k, task);
}
if (needsToSelectAgain) {selectedKeys.reset(i + 1);
selectAgain();
i = -1;
}
}
}
JDK NIO 原生 Selector
寄存IO 就绪的 SelectionKey
的汇合为HashSet 类型
的selectedKeys
。而 Netty 为了优化对selectedKeys 汇合
的遍历效率
采纳了本人实现的SelectedSelectionKeySet 类型
,从而用对数组
的遍历代替用HashSet
的迭代器遍历。-
Selector
会在每次轮询到IO 就绪事件
时,将IO 就绪的 Channel
对应的SelectionKey
插入到selectedKeys 汇合
,然而Selector
只管向selectedKeys 汇合
放入IO 就绪的 SelectionKey
, 当SelectionKey
被处理完毕后,Selector
是不会本人被动将其从selectedKeys 汇合
中移除的,典型的管杀不论埋
。所以须要 Netty 本人在遍历到IO 就绪的 SelectionKey
后,将其删除。- 在
processSelectedKeysPlain
中是间接将其从迭代器中删除。 - 在
processSelectedKeysOptimized
中将其在数组中对应的地位置为Null
,不便垃圾回收。
- 在
- 在最初革除有效的
SelectionKey
时,在processSelectedKeysPlain
中因为采纳的是JDK NIO 原生的 Selector
,所以只须要执行SelectAgain
就能够,Selector
会主动革除有效 Key。
然而在processSelectedKeysOptimized
中因为是 Netty 本人实现的优化类型,所以须要 Netty 本人将SelectedSelectionKeySet
数组中的SelectionKey
全副革除,最初在执行SelectAgain
。
好了,到这里,咱们就将 Reactor 线程
如何解决 IO 就绪事件
的整个过程讲述完了,上面咱们就该到了介绍 Reactor 线程
如何解决 Netty 框架中的异步工作了。
4. Reactor 线程解决异步工作
Netty 对于解决 异步工作
的办法有两个:
- 一个是无超时工夫限度的
runAllTasks() 办法
。当ioRatio
设置为100
时,Reactor 线程
会先一股脑的解决IO 就绪事件
,而后在一股脑的执行异步工作
,并没有工夫的限度。 - 另一个是有超时工夫限度的
runAllTasks(long timeoutNanos) 办法
。当ioRatio != 100
时,Reactor 线程
执行异步工作
会有工夫限度,优先一股脑的解决完IO 就绪事件
统计出执行IO 工作
耗时ioTime
。依据公式ioTime * (100 - ioRatio) / ioRatio)
计算出Reactor 线程
执行异步工作
的超时工夫。在超时工夫限定范畴内,执行无限的异步工作
。
上面咱们来别离看下这两个执行 异步工作
的办法解决逻辑:
4.1 runAllTasks()
protected boolean runAllTasks() {assert inEventLoop();
boolean fetchedAll;
boolean ranAtLeastOne = false;
do {
// 将达到执行工夫的定时工作转存到一般工作队列 taskQueue 中,对立由 Reactor 线程从 taskQueue 中取出执行
fetchedAll = fetchFromScheduledTaskQueue();
if (runAllTasksFrom(taskQueue)) {ranAtLeastOne = true;}
} while (!fetchedAll); // keep on processing until we fetched all scheduled tasks.
if (ranAtLeastOne) {lastExecutionTime = ScheduledFutureTask.nanoTime();
}
// 执行尾部队列工作
afterRunningAllTasks();
return ranAtLeastOne;
}
Reactor 线程
执行 异步工作
的外围逻辑就是:
- 先将到期的
定时工作
一股脑的从定时工作队列scheduledTaskQueue
中取出并转存到一般工作队列taskQueue
中。 - 由
Reactor 线程
对立从一般工作队列taskQueue
中取出工作执行。 - 在
Reactor 线程
执行完定时工作
和一般工作
后,开始执行存储于尾部工作队列tailTasks
中的尾部工作
。
上面咱们来别离看下上述几个外围步骤的实现:
4.1.1 fetchFromScheduledTaskQueue
/**
* 从定时工作队列中取出达到 deadline 执行工夫的定时工作
* 将定时工作 转存到 一般工作队列 taskQueue 中,对立由 Reactor 线程从 taskQueue 中取出执行
*
* */
private boolean fetchFromScheduledTaskQueue() {if (scheduledTaskQueue == null || scheduledTaskQueue.isEmpty()) {return true;}
long nanoTime = AbstractScheduledEventExecutor.nanoTime();
for (;;) {
// 从定时工作队列中取出达到执行 deadline 的定时工作 deadline <= nanoTime
Runnable scheduledTask = pollScheduledTask(nanoTime);
if (scheduledTask == null) {return true;}
if (!taskQueue.offer(scheduledTask)) {
// taskQueue 没有空间包容 则在将定时工作从新塞进定时工作队列中期待下次执行
scheduledTaskQueue.add((ScheduledFutureTask<?>) scheduledTask);
return false;
}
}
}
- 获取以后要执行
异步工作
的工夫点nanoTime
final class ScheduledFutureTask<V> extends PromiseTask<V> implements ScheduledFuture<V>, PriorityQueueNode {private static final long START_TIME = System.nanoTime();
static long nanoTime() {return System.nanoTime() - START_TIME;
}
}
- 从定时工作队列中找出
deadline <= nanoTime
的异步工作。也就是说找出所有到期的定时工作。
protected final Runnable pollScheduledTask(long nanoTime) {assert inEventLoop();
// 从定时队列中取出要执行的定时工作 deadline <= nanoTime
ScheduledFutureTask<?> scheduledTask = peekScheduledTask();
if (scheduledTask == null || scheduledTask.deadlineNanos() - nanoTime > 0) {return null;}
// 合乎取出条件 则取出
scheduledTaskQueue.remove();
scheduledTask.setConsumed();
return scheduledTask;
}
- 将
到期的定时工作
插入到一般工作队列taskQueue
中,如果taskQueue
曾经没有空间包容新的工作,则将定时工作
从新塞进定时工作队列
中期待下次拉取。
if (!taskQueue.offer(scheduledTask)) {scheduledTaskQueue.add((ScheduledFutureTask<?>) scheduledTask);
return false;
}
fetchFromScheduledTaskQueue 办法
的返回值为true
时示意到期的定时工作曾经全副拉取出来并转存到一般工作队列中。
返回值为false
时示意到期的定时工作只拉取出来一部分,因为这时一般工作队列曾经满了,当执行完一般工作时,还须要在进行一次拉取。
当 到期的定时工作
从定时工作队列中拉取结束或者当一般工作队列已满时,这时就会进行拉取,开始执行一般工作队列中的 异步工作
。
4.1.2 runAllTasksFrom
protected final boolean runAllTasksFrom(Queue<Runnable> taskQueue) {Runnable task = pollTaskFrom(taskQueue);
if (task == null) {return false;}
for (;;) {safeExecute(task);
task = pollTaskFrom(taskQueue);
if (task == null) {return true;}
}
}
- 首先
runAllTasksFrom 办法
的返回值示意是否执行了至多一个异步工作。前面会赋值给ranAtLeastOne 变量
,这个返回值咱们后续会用到。 - 从一般工作队列中拉取
异步工作
。
protected static Runnable pollTaskFrom(Queue<Runnable> taskQueue) {for (;;) {Runnable task = taskQueue.poll();
if (task != WAKEUP_TASK) {return task;}
}
}
Reactor 线程
执行异步工作
。
protected static void safeExecute(Runnable task) {
try {task.run();
} catch (Throwable t) {logger.warn("A task raised an exception. Task: {}", task, t);
}
}
4.1.3 afterRunningAllTasks
if (ranAtLeastOne) {lastExecutionTime = ScheduledFutureTask.nanoTime();
}
// 执行尾部队列工作
afterRunningAllTasks();
return ranAtLeastOne;
如果 Reactor 线程
执行了至多一个 异步工作
,那么设置lastExecutionTime
,并将ranAtLeastOne 标识
返回。这里的 ranAtLeastOne 标识
就是 runAllTasksFrom 办法
的返回值。
最初执行收尾工作,也就是执行尾部工作队列中的尾部工作。
@Override
protected void afterRunningAllTasks() {runAllTasksFrom(tailTasks);
}
4.2 runAllTasks(long timeoutNanos)
这里在解决 异步工作
的外围逻辑还是和之前一样的,只不过就是多了对 超时工夫
的管制。
protected boolean runAllTasks(long timeoutNanos) {fetchFromScheduledTaskQueue();
Runnable task = pollTask();
if (task == null) {
// 一般队列中没有工作时 执行队尾队列的工作
afterRunningAllTasks();
return false;
}
// 异步工作执行超时 deadline
final long deadline = timeoutNanos > 0 ? ScheduledFutureTask.nanoTime() + timeoutNanos : 0;
long runTasks = 0;
long lastExecutionTime;
for (;;) {safeExecute(task);
runTasks ++;
// 每运行 64 个异步工作 检查一下 是否达到 执行 deadline
if ((runTasks & 0x3F) == 0) {lastExecutionTime = ScheduledFutureTask.nanoTime();
if (lastExecutionTime >= deadline) {
// 达到异步工作执行超时 deadline,进行执行异步工作
break;
}
}
task = pollTask();
if (task == null) {lastExecutionTime = ScheduledFutureTask.nanoTime();
break;
}
}
afterRunningAllTasks();
this.lastExecutionTime = lastExecutionTime;
return true;
}
- 首先还是通过
fetchFromScheduledTaskQueue 办法
从Reactor
中的定时工作队列中拉取到期的定时工作
,转存到一般工作队列中。当一般工作队列已满或者到期定时工作
全副拉取结束时,进行拉取。 - 将
ScheduledFutureTask.nanoTime() + timeoutNanos
作为Reactor 线程
执行异步工作的超时工夫点deadline
。 - 因为零碎调用
System.nanoTime()
须要肯定的零碎开销,所以每执行完64
个异步工作
的时候才会去检查一下执行工夫
是否达到了deadline
。如果达到了执行截止工夫deadline
则退出进行执行异步工作
。如果没有达到deadline
则持续从一般工作队列中取出工作循环执行上来。
从这个细节又能够看出 Netty 对性能的考量还是相当考究的
流程走到这里,咱们就对 Reactor
的整个运行框架以及 如何轮询 IO 就绪事件
, 如何解决 IO 就绪事件
, 如何执行异步工作
的具体实现逻辑就分析完了。
上面还有一个小小的尾巴,就是 Netty 是如何解决文章结尾提到的 JDK NIO Epoll 的空轮询 BUG
的,让咱们一起来看下吧~~~
5. 解决 JDK Epoll 空轮询 BUG
前边提到,因为 JDK NIO Epoll 的空轮询 BUG
存在,这样会导致 Reactor 线程
在没有任何事件可做的状况下被意外唤醒,导致 CPU 空转。
其实 Netty 也没有从根本上解决这个JDK BUG
,而是抉择奇妙的绕过这个BUG
。
上面咱们来看下 Netty 是如何做到的。
if (ranTasks || strategy > 0) {if (selectCnt > MIN_PREMATURE_SELECTOR_RETURNS && logger.isDebugEnabled()) {logger.debug("Selector.select() returned prematurely {} times in a row for Selector {}.",
selectCnt - 1, selector);
}
selectCnt = 0;
} else if (unexpectedSelectorWakeup(selectCnt)) {// Unexpected wakeup (unusual case)
// 既没有 IO 就绪事件,也没有异步工作,Reactor 线程从 Selector 上被异样唤醒 触发 JDK Epoll 空轮训 BUG
// 从新构建 Selector,selectCnt 归零
selectCnt = 0;
}
在 Reactor 线程
解决完 IO 就绪事件
和异步工作
后,会查看这次 Reactor 线程
被唤醒有没有执行过异步工作和有没有IO 就绪的 Channel
。
boolean ranTasks
这时候就派上了用场,这个ranTasks
正是前边咱们在讲runAllTasks 办法
时提到的返回值。用来示意是否执行过至多一次异步工作
。int strategy
正是JDK NIO Selector
的select 办法
的返回值,用来示意IO 就绪
的Channel 个数
。
如果 ranTasks = false 并且 strategy = 0
这代表 Reactor 线程
本次既没有 异步工作
执行也没有 IO 就绪
的Channel
须要解决却被意外的唤醒。等于是空转了一圈啥也没干。
这种状况下 Netty 就会认为可能曾经触发了JDK NIO Epoll 的空轮询 BUG
int SELECTOR_AUTO_REBUILD_THRESHOLD = SystemPropertyUtil.getInt("io.netty.selectorAutoRebuildThreshold", 512);
private boolean unexpectedSelectorWakeup(int selectCnt) {
.................. 省略...............
/**
* 走到这里的条件是 既没有 IO 就绪事件,也没有异步工作,Reactor 线程从 Selector 上被异样唤醒
* 这种状况可能是曾经触发了 JDK Epoll 的空轮询 BUG,如果这种状况继续 512 次 则认为可能曾经触发 BUG,于是重建 Selector
*
* */
if (SELECTOR_AUTO_REBUILD_THRESHOLD > 0 &&
selectCnt >= SELECTOR_AUTO_REBUILD_THRESHOLD) {
// The selector returned prematurely many times in a row.
// Rebuild the selector to work around the problem.
logger.warn("Selector.select() returned prematurely {} times in a row; rebuilding Selector {}.",
selectCnt, selector);
rebuildSelector();
return true;
}
return false;
}
- 如果
Reactor
这种意外唤醒的次数selectCnt
超过了配置的次数SELECTOR_AUTO_REBUILD_THRESHOLD
, 那么 Netty 就会认定这种状况可能曾经触发了JDK NIO Epoll 空轮询 BUG
,则重建Selector
(将之前注册的所有 Channel 从新注册到新的 Selector 上并敞开旧的 Selector
),selectCnt 计数
归0
。
SELECTOR_AUTO_REBUILD_THRESHOLD
默认为512
,能够通过零碎变量-D io.netty.selectorAutoRebuildThreshold
指定自定义数值。
- 如果
selectCnt
小于SELECTOR_AUTO_REBUILD_THRESHOLD
,则返回不做任何解决,selectCnt
持续计数。
Netty 就这样通过计数 Reactor
被意外唤醒的次数,如果计数 selectCnt
达到了 512 次
,则通过 重建 Selector
奇妙的绕开了JDK NIO Epoll 空轮询 BUG
。
咱们在日常开发中也能够借鉴 Netty 这种解决问题的思路,比方在我的项目开发中,当咱们发现咱们无奈保障彻底的解决一个问题时,或者为了解决这个问题导致咱们的投入产出比不高时,咱们就该思考是不是应该换一种思路去绕过这个问题,从而达到同样的成果。解决问题的最高境界就是不解决它,奇妙的绕过去
~!!
总结
本文花了大量的篇幅介绍了 Reactor
整体的运行框架,并深刻介绍了 Reactor
外围的工作模块的具体实现逻辑。
通过本文的介绍咱们晓得了 Reactor
如何轮询注册在其上的所有 Channel 上感兴趣的 IO 事件,以及 Reactor 如何去解决 IO 就绪的事件,如何执行 Netty 框架中提交的异步工作和定时工作。
最初介绍了 Netty 如何奇妙的绕过 JDK NIO Epoll 空轮询的 BUG, 达到解决问题的目标。
提炼了新的解决问题的思路:解决问题的最高境界就是不解决它,奇妙的绕过去~!!
好了,本文的内容就到这里了,咱们下篇文章见~