本系列Netty源码解析文章基于 4.1.56.Final版本
本文笔者来为大家介绍下Netty的外围引擎Reactor的运行架构,心愿通过本文的介绍可能让大家对Reactor是如何驱动着整个Netty框架的运行有一个全面的意识。也为咱们后续进一步介绍Netty对于解决网络申请的整个生命周期的相干内容做一个前置常识的铺垫,不便大家后续了解。
那么在开始本文正式的内容之前,笔者先来带着大家回顾下前边文章介绍的对于Netty整个框架如何搭建的相干内容,没有看过笔者前边几篇文章的读者敌人也没关系,这些并不会影响到本文的浏览,只不过波及到相干细节的局部,大家能够在回看下。
前文回顾
在《聊聊Netty那些事儿之Reactor在Netty中的实现(创立篇)》一文中,咱们介绍了Netty服务端的外围引擎主从Reactor线程组
的创立过程以及相干外围组件里的重要属性。在这个过程中,咱们还提到了Netty对各种细节进行的优化,比方针对JDK NIO 原生Selector做的一些优化,展示了Netty对性能极致的谋求。最终咱们创立出了如下构造的Reactor。
在上篇文章《具体图解Netty Reactor启动全流程》中,咱们残缺地介绍了Netty服务端启动的整个流程,并介绍了在启动过程中波及到的ServerBootstrap相干的属性以及配置形式。用于接管连贯的服务端NioServerSocketChannel的创立和初始化过程以及其类的继承构造。其中重点介绍了NioServerSocketChannel向Reactor的注册过程以及Reactor线程的启动机会和pipeline的初始化机会。最初介绍了NioServerSocketChannel绑定端口地址的整个流程。在这个过程中咱们理解了Netty的这些外围组件是如何串联起来的。
当Netty启动结束后,咱们失去了如下的框架结构:
主Reactor线程组中治理的是NioServerSocketChannel
用于接管客户端连贯,并在本人的pipeline中的ServerBootstrapAcceptor
里初始化接管到的客户端连贯,随后会将初始化好的客户端连贯注册到从Reactor线程组中。
从Reactor线程组次要负责监听解决注册其上的所有客户端连贯的IO就绪事件。
其中一个Channel只能调配给一个固定的Reactor。一个Reactor负责解决多个Channel上的IO就绪事件,这样能够将服务端承载的全量客户端连贯
摊派到多个Reactor
中解决,同时也能保障Channel上IO解决的线程安全性
。Reactor与Channel之间的对应关系如下图所示:
以上内容就是对笔者前边几篇文章的相干内容回顾,大家能回顾起来更好,回顾不起来也没关系,一点也不影响大家了解本文的内容。如果对相干细节感兴趣的同学,能够在浏览完本文之后,在去回看下。
咱们言归正传,正式开始本文的内容,笔者接下来会为大家介绍这些外围组件是如何相互配合从而驱动着整个Netty Reactor框架运行的。
当Netty Reactor框架启动结束后,接下来第一件事件也是最重要的事件就是如何来高效的接管客户端的连贯。
那么在探讨Netty服务端如何接管连贯之前,咱们须要弄清楚Reactor线程
的运行机制,它是如何监听并解决Channel
上的IO就绪事件
的。
本文相当于是后续咱们介绍Reactor线程
监听解决ACCEPT事件
,Read事件
,Write事件
的前置篇,本文专一于讲述Reactor线程
的整个运行框架。了解了本文的内容,对了解前面Reactor线程
如何解决IO事件
会大有帮忙。
咱们在Netty框架的创立阶段
和启动阶段
无数次的提到了Reactor线程
,那么在本文要介绍的运行阶段
就该这个Reactor线程
来大显神威了。
通过前边文章的介绍,咱们理解到Netty中的Reactor线程
次要干三件事件:
- 轮询注册在
Reactor
上的所有Channel
感兴趣的IO就绪事件
。 - 解决
Channel
上的IO就绪事件
。 - 执行Netty中的异步工作。
正是这三个局部组成了Reactor
的运行框架,那么咱们当初来看下这个运行框架具体是怎么运行的~~
Reactor线程的整个运行框架
大家还记不记得笔者在《聊聊Netty那些事儿之从内核角度看IO模型》一文中提到的,IO模型的演变
是围绕着"如何用尽可能少的线程去治理尽可能多的连贯"
这一主题进行的。
Netty的IO模型
是通过JDK NIO Selector
实现的IO多路复用模型
,而Netty的IO线程模型
为主从Reactor线程模型
。
依据《聊聊Netty那些事儿之从内核角度看IO模型》一文中介绍的IO多路复用模型
咱们很容易就能了解到Netty会应用一个用户态的Reactor线程
去一直的通过Selector
在内核态去轮训Channel
上的IO就绪事件
。
说白了Reactor线程
其实执行的就是一个死循环
,在死循环
中一直的通过Selector
去轮训IO就绪事件
,如果产生IO就绪事件
则从Selector
零碎调用中返回并解决IO就绪事件
,如果没有产生IO就绪事件
则始终阻塞
在Selector
零碎调用上,直到满足Selector唤醒条件
。
以下三个条件中只有满足任意一个条件,Reactor线程就会被从Selector上唤醒:
- 当Selector轮询到有IO沉闷事件产生时。
- 当Reactor线程须要执行的
定时工作
达到工作执行工夫deadline
时。 - 当有
异步工作
提交给Reactor时,Reactor线程须要从Selector
上被唤醒,这样能力及时的去执行异步工作
。
这里能够看出Netty对Reactor线程
的压迫还是比拟狠的,反正当初也没有IO就绪事件
须要去解决,不能让Reactor线程
在这里白白等着,要立刻唤醒它,转去解决提交过去的异步工作以及定时工作。Reactor线程
堪称996榜样
一刻不停歇地运作着。
在理解了Reactor线程
的大略运行框架后,咱们接下来就到源码中去看下它的外围运行框架是如何实现进去的。
因为这块源码比拟宏大繁冗,所以笔者先把它的运行框架提取进去,不便大家整体的了解整个运行过程的全貌。
上图所展现的就是Reactor整个工作体系的全貌,次要分为如下几个重要的工作模块:
- Reactor线程在Selector上阻塞获取IO就绪事件。在这个模块中首先会去查看以后是否有异步工作须要执行,如果有异步须要执行,那么不论以后有没有IO就绪事件都不能阻塞在Selector上,随后会去非阻塞的轮询一下Selector上是否有IO就绪事件,如果有,正好能够和异步工作一起执行。优先解决IO就绪事件,在执行异步工作。
- 如果以后没有异步工作须要执行,那么Reactor线程会接着查看是否有定时工作须要执行,如果有则在Selector上阻塞直到定时工作的到期工夫deadline,或者满足其余唤醒条件被唤醒。如果没有定时工作须要执行,Reactor线程则会在Selector上始终阻塞直到满足唤醒条件。
- 当Reactor线程满足唤醒条件被唤醒后,首先会去判断以后是因为有IO就绪事件被唤醒还是因为有异步工作须要执行被唤醒或者是两者都有。随后Reactor线程就会去解决IO就绪事件和执行异步工作。
- 最初Reactor线程返回循环终点一直的反复上述三个步骤。
以上就是Reactor线程运行的整个外围逻辑,上面是笔者根据上述外围逻辑,将Reactor的整体代码设计框架提取进去,大家能够联合上边的Reactor工作流程图,从总体上先感触下整个源码实现框架,可能把Reactor的外围解决步骤和代码中相应的解决模块对应起来即可,这里不须要读懂每一行代码,要以逻辑解决模块为单位了解。前面笔者会将这些一个一个的逻辑解决模块在独自拎进去为大家具体介绍。
@Override protected void run() { //记录轮询次数 用于解决JDK epoll的空轮训bug int selectCnt = 0; for (;;) { try { //轮询后果 int strategy; try { //依据轮询策略获取轮询后果 这里的hasTasks()次要查看的是一般队列和尾部队列中是否有异步工作期待执行 strategy = selectStrategy.calculateStrategy(selectNowSupplier, hasTasks()); switch (strategy) { case SelectStrategy.CONTINUE: continue; case SelectStrategy.BUSY_WAIT: // NIO不反对自旋(BUSY_WAIT) case SelectStrategy.SELECT: 外围逻辑是有工作须要执行,则Reactor线程立马执行异步工作,如果没有异步工作执行,则进行轮询IO事件 default: } } catch (IOException e) { ................省略............... } 执行到这里阐明满足了唤醒条件,Reactor线程从selector上被唤醒开始解决IO就绪事件和执行异步工作 /** * Reactor线程须要保障及时的执行异步工作,只有有异步工作提交,就须要退出轮询。 * 有IO事件就优先解决IO事件,而后解决异步工作 * */ selectCnt++; //次要用于从IO就绪的SelectedKeys汇合中剔除曾经生效的selectKey needsToSelectAgain = false; //调整Reactor线程执行IO事件和执行异步工作的CPU工夫比例 默认50,示意执行IO事件和异步工作的工夫比例是一比一 final int ioRatio = this.ioRatio; 这里次要解决IO就绪事件,以及执行异步工作 须要优先解决IO就绪事件,而后依据ioRatio设置的解决IO事件CPU用时与异步工作CPU用时比例, 来决定执行多长时间的异步工作 //判断是否触发JDK Epoll BUG 触发空轮询 if (ranTasks || strategy > 0) { if (selectCnt > MIN_PREMATURE_SELECTOR_RETURNS && logger.isDebugEnabled()) { logger.debug("Selector.select() returned prematurely {} times in a row for Selector {}.", selectCnt - 1, selector); } selectCnt = 0; } else if (unexpectedSelectorWakeup(selectCnt)) { // Unexpected wakeup (unusual case) //既没有IO就绪事件,也没有异步工作,Reactor线程从Selector上被异样唤醒 触发JDK Epoll空轮训BUG //从新构建Selector,selectCnt归零 selectCnt = 0; } } catch (CancelledKeyException e) { ................省略............... } catch (Error e) { ................省略............... } catch (Throwable t) { ................省略............... } finally { ................省略............... } } }
从下面提取进去的Reactor的源码实现框架中,咱们能够看出Reactor线程
次要做了上面几个事件:
- 通过
JDK NIO Selector
轮询注册在Reactor
上的所有Channel
感兴趣的IO事件
。对于NioServerSocketChannel来说因为它次要负责接管客户端连贯所以监听的是OP_ACCEPT事件
,对于客户端NioSocketChannel来说因为它次要负责解决连贯上的读写事件所以监听的是OP_READ
和OP_WRITE
事件。
这里须要留神的是netty只会主动注册OP_READ
事件,而OP_WRITE事件
是在当Socket写入缓冲区以满无奈持续写入发送数据时由用户本人注册。
如果有异步工作须要执行,则立马进行轮询操作,转去执行异步工作。这里分为两种状况:
- 既有
IO就绪事件
产生,也有异步工作
须要执行。则优先解决IO就绪事件
,而后依据ioRatio
设置的执行工夫比例
决定执行多长时间的异步工作。这里Reactor线程须要管制异步工作的执行工夫,因为Reactor线程的外围是解决IO就绪事件,不能因为异步工作的执行而耽搁了最重要的事件。 没有
IO就绪事件
产生,然而有异步工作或者定时工作到期须要执行。则只执行异步工作
,尽可能的去压迫Reactor线程。没有IO就绪事件产生也不能闲着。这里第二种状况下只会执行
64
个异步工作,目标是为了避免适度
执行异步工作,耽搁了
最重要的事件轮询IO事件
。
- 既有
- 在最初Netty会判断本次
Reactor线程
的唤醒是否是因为触发了JDK epoll 空轮询 BUG导致的,如果触发了该BUG,则重建Selector
。绕过JDK BUG,达到解决问题的目标。
失常状况下Reactor线程从Selector中被唤醒有两种状况:
- 轮询到有IO就绪事件产生。
- 有异步工作或者定时工作须要执行。
而JDK epoll 空轮询 BUG会在上述两种状况都没有产生的时候,Reactor线程
会意外的从Selector
中被唤醒,导致CPU空转。JDK epoll 空轮询 BUG:https://bugs.java.com/bugdata...
好了,Reactor线程
的总体运行构造框架咱们当初曾经理解了,上面咱们来深刻到这些外围解决模块中来各个击破它们~~
1. Reactor线程轮询IO就绪事件
在《聊聊Netty那些事儿之Reactor在Netty中的实现(创立篇)》一文中,笔者在讲述主从Reactor线程组NioEventLoopGroup
的创立过程的时候,提到一个结构器参数SelectStrategyFactory
。
public NioEventLoopGroup( int nThreads, Executor executor, final SelectorProvider selectorProvider) { this(nThreads, executor, selectorProvider, DefaultSelectStrategyFactory.INSTANCE); } public NioEventLoopGroup(int nThreads, Executor executor, final SelectorProvider selectorProvider, final SelectStrategyFactory selectStrategyFactory) { super(nThreads, executor, selectorProvider, selectStrategyFactory, RejectedExecutionHandlers.reject()); }
Reactor线程
最重要的一件事件就是轮询IO就绪事件
,SelectStrategyFactory
就是用于指定轮询策略的,默认实现为DefaultSelectStrategyFactory.INSTANCE
。
而在Reactor线程
开启轮询的一开始,就是用这个selectStrategy
去计算一个轮询策略strategy
,后续会依据这个strategy
进行不同的逻辑解决。
@Override protected void run() { //记录轮询次数 用于解决JDK epoll的空轮训bug int selectCnt = 0; for (;;) { try { //轮询后果 int strategy; try { //依据轮询策略获取轮询后果 这里的hasTasks()次要查看的是一般队列和尾部队列中是否有异步工作期待执行 strategy = selectStrategy.calculateStrategy(selectNowSupplier, hasTasks()); switch (strategy) { case SelectStrategy.CONTINUE: continue; case SelectStrategy.BUSY_WAIT: // NIO不反对自旋(BUSY_WAIT) case SelectStrategy.SELECT: 外围逻辑是有工作须要执行,则Reactor线程立马执行异步工作,如果没有异步工作执行,则进行轮询IO事件 default: } } catch (IOException e) { ................省略............... } ................省略...............}
上面咱们来看这个轮询策略strategy
具体的计算逻辑是什么样的?
1.1 轮询策略
public interface SelectStrategy { /** * Indicates a blocking select should follow. */ int SELECT = -1; /** * Indicates the IO loop should be retried, no blocking select to follow directly. */ int CONTINUE = -2; /** * Indicates the IO loop to poll for new events without blocking. */ int BUSY_WAIT = -3; int calculateStrategy(IntSupplier selectSupplier, boolean hasTasks) throws Exception;}
咱们首先来看下Netty中定义的这三种轮询策略:
SelectStrategy.SELECT:
此时没有任何异步工作须要执行,Reactor线程
能够安心的阻塞
在Selector
上期待IO就绪事件
的降临。SelectStrategy.CONTINUE:
从新开启一轮IO轮询
。SelectStrategy.BUSY_WAIT:
Reactor线程进行自旋轮询
,因为NIO 不反对自旋操作
,所以这里间接跳到SelectStrategy.SELECT
策略。
上面咱们来看下轮询策略
的计算逻辑calculateStrategy
:
final class DefaultSelectStrategy implements SelectStrategy { static final SelectStrategy INSTANCE = new DefaultSelectStrategy(); private DefaultSelectStrategy() { } @Override public int calculateStrategy(IntSupplier selectSupplier, boolean hasTasks) throws Exception { /** * Reactor线程要保障及时的执行异步工作 * 1:如果有异步工作期待执行,则马上执行selectNow()非阻塞轮询一次IO就绪事件 * 2:没有异步工作,则跳到switch select分支 * */ return hasTasks ? selectSupplier.get() : SelectStrategy.SELECT; }}
- 在
Reactor线程
的轮询工作开始之前,须要首先判断下以后是否有异步工作
须要执行。判断根据就是查看Reactor
中的异步工作队列taskQueue
和用于统计信息工作用的尾部队列tailTask
是否有异步工作
。
@Override protected boolean hasTasks() { return super.hasTasks() || !tailTasks.isEmpty(); } protected boolean hasTasks() { assert inEventLoop(); return !taskQueue.isEmpty(); }
- 如果
Reactor
中有异步工作
须要执行,那么Reactor线程
须要立刻执行,不能阻塞在Selector
上。在返回前须要再顺带调用selectNow()
非阻塞查看一下以后是否有IO就绪事件
产生。如果有,那么正好能够和异步工作
一起被解决,如果没有,则及时地解决异步工作
。
这里Netty要表白的语义是:首先Reactor线程须要优先保障IO就绪事件
的解决,而后在保障异步工作
的及时执行。如果以后没有IO就绪事件然而有异步工作须要执行时,Reactor线程就要去及时执行异步工作而不是持续阻塞在Selector上期待IO就绪事件。
private final IntSupplier selectNowSupplier = new IntSupplier() { @Override public int get() throws Exception { return selectNow(); } }; int selectNow() throws IOException { //非阻塞 return selector.selectNow(); }
- 如果以后
Reactor线程
没有异步工作须要执行,那么calculateStrategy
办法间接返回SelectStrategy.SELECT
也就是SelectStrategy接口
中定义的常量-1
。当calculateStrategy
办法通过selectNow()
返回非零
数值时,示意此时有IO就绪
的Channel
,返回的数值示意有多少个IO就绪
的Channel
。
@Override protected void run() { //记录轮询次数 用于解决JDK epoll的空轮训bug int selectCnt = 0; for (;;) { try { //轮询后果 int strategy; try { //依据轮询策略获取轮询后果 这里的hasTasks()次要查看的是一般队列和尾部队列中是否有异步工作期待执行 strategy = selectStrategy.calculateStrategy(selectNowSupplier, hasTasks()); switch (strategy) { case SelectStrategy.CONTINUE: continue; case SelectStrategy.BUSY_WAIT: // NIO不反对自旋(BUSY_WAIT) case SelectStrategy.SELECT: 外围逻辑是有工作须要执行,则Reactor线程立马执行异步工作,如果没有异步工作执行,则进行轮询IO事件 default: } } catch (IOException e) { ................省略............... } ................解决IO就绪事件以及执行异步工作...............}
从默认的轮询策略咱们能够看出selectStrategy.calculateStrategy
只会返回三种状况:
返回 -1:
switch逻辑分支进入SelectStrategy.SELECT分支
,示意此时Reactor
中没有异步工作
须要执行,Reactor线程
能够安心的阻塞在Selector
上期待IO就绪事件
产生。返回 0:
switch逻辑分支进入default分支
,示意此时Reactor
中没有IO就绪事件
然而有异步工作
须要执行,流程通过default分支
间接进入了解决异步工作
的逻辑局部。返回 > 0:
switch逻辑分支进入default分支
,示意此时Reactor
中既有IO就绪事件
产生也有异步工作
须要执行,流程通过default分支
间接进入了解决IO就绪事件
和执行异步工作
逻辑局部。
当初Reactor
的流程解决逻辑走向咱们分明了,那么接下来咱们把重点放在SelectStrategy.SELECT分支中的轮询逻辑上。这块是Reactor监听IO就绪事件的外围。
1.2 轮询逻辑
case SelectStrategy.SELECT: //以后没有异步工作执行,Reactor线程能够释怀的阻塞期待IO就绪事件 //从定时工作队列中取出行将快要执行的定时工作deadline long curDeadlineNanos = nextScheduledTaskDeadlineNanos(); if (curDeadlineNanos == -1L) { // -1代表以后定时工作队列中没有定时工作 curDeadlineNanos = NONE; // nothing on the calendar } //最早执行定时工作的deadline作为 select的阻塞工夫,意思是到了定时工作的执行工夫 //不论有无IO就绪事件,必须唤醒selector,从而使reactor线程执行定时工作 nextWakeupNanos.set(curDeadlineNanos); try { if (!hasTasks()) { //再次查看一般工作队列中是否有异步工作 //没有的话开始select阻塞轮询IO就绪事件 strategy = select(curDeadlineNanos); } } finally { // 执行到这里阐明Reactor曾经从Selector上被唤醒了 // 设置Reactor的状态为昏迷状态AWAKE // lazySet优化不必要的volatile操作,不应用内存屏障,不保障写操作的可见性(单线程不须要保障) nextWakeupNanos.lazySet(AWAKE); }
流程走到这里,阐明当初Reactor
上没有任何事件可做,能够安心的阻塞
在Selector
上期待IO就绪事件
到来。
那么Reactor线程
到底应该在Selector
上阻塞多久呢??
在答复这个问题之前,咱们在回顾下《聊聊Netty那些事儿之Reactor在Netty中的实现(创立篇)》一文中在讲述Reactor的创立
时提到,Reactor线程
除了要轮询Channel
上的IO就绪事件
,以及解决IO就绪事件
外,还有一个工作就是负责执行Netty框架中的异步工作
。
而Netty框架中的异步工作
分为三类:
- 寄存在一般工作队列
taskQueue
中的一般异步工作。 - 寄存在尾部队列
tailTasks
中的用于执行统计工作等收尾动作的尾部工作。 - 还有一种就是这里行将提到的
定时工作
。寄存在Reactor
中的定时工作队列scheduledTaskQueue
中。
从ReactorNioEventLoop类
中的继承构造咱们也能够看出,Reactor
具备执行定时工作的能力。
既然Reactor
须要执行定时工作,那么它就不能始终阻塞
在Selector
上有限期待IO就绪事件
。
那么咱们回到本大节一开始提到的问题上,为了保障Reactor
可能及时地执行定时工作
,Reactor线程
须要在行将要执行的的第一个定时工作deadline
达到之前被唤醒。
所以在Reactor
线程开始轮询IO就绪事件
之前,咱们须要首先计算出来Reactor线程
在Selector
上的阻塞超时工夫。
1.2.1 Reactor的轮询超时工夫
首先咱们须要从Reactor
的定时工作队列scheduledTaskQueue
中取出行将快要执行的定时工作deadline
。将这个deadline
作为Reactor线程
在Selector
上轮询的超时工夫。这样能够保障在定时工作行将要执行时,Reactor当初能够及时的从Selector上被唤醒。
private static final long AWAKE = -1L; private static final long NONE = Long.MAX_VALUE; // nextWakeupNanos is: // AWAKE when EL is awake // NONE when EL is waiting with no wakeup scheduled // other value T when EL is waiting with wakeup scheduled at time T private final AtomicLong nextWakeupNanos = new AtomicLong(AWAKE); long curDeadlineNanos = nextScheduledTaskDeadlineNanos(); if (curDeadlineNanos == -1L) { // -1代表以后定时工作队列中没有定时工作 curDeadlineNanos = NONE; // nothing on the calendar } nextWakeupNanos.set(curDeadlineNanos);
public abstract class AbstractScheduledEventExecutor extends AbstractEventExecutor { PriorityQueue<ScheduledFutureTask<?>> scheduledTaskQueue; protected final long nextScheduledTaskDeadlineNanos() { ScheduledFutureTask<?> scheduledTask = peekScheduledTask(); return scheduledTask != null ? scheduledTask.deadlineNanos() : -1; } final ScheduledFutureTask<?> peekScheduledTask() { Queue<ScheduledFutureTask<?>> scheduledTaskQueue = this.scheduledTaskQueue; return scheduledTaskQueue != null ? scheduledTaskQueue.peek() : null; }}
nextScheduledTaskDeadlineNanos
办法会返回以后Reactor
定时工作队列中最近的一个定时工作deadline
工夫点,如果定时工作队列中没有定时工作,则返回-1
。
NioEventLoop
中nextWakeupNanos
变量用来寄存Reactor从Selector
上被唤醒的工夫点,设置为最近须要被执行定时工作的deadline
,如果以后并没有定时工作须要执行,那么就设置为Long.MAX_VALUE
始终阻塞,直到有IO就绪事件
达到或者有异步工作
须要执行。
1.2.2 Reactor开始轮询IO就绪事件
if (!hasTasks()) { //再次查看一般工作队列中是否有异步工作, 没有的话 开始select阻塞轮询IO就绪事件 strategy = select(curDeadlineNanos); }
在Reactor线程
开始阻塞
轮询IO就绪事件
之前还须要再次检查一下是否有异步工作
须要执行。
如果此时凑巧有异步工作
提交,就须要进行IO就绪事件
的轮询,转去执行异步工作
。如果没有异步工作
,则正式开始轮询IO就绪事件
。
private int select(long deadlineNanos) throws IOException { if (deadlineNanos == NONE) { //无定时工作,无一般工作执行时,开始轮询IO就绪事件,没有就始终阻塞 直到唤醒条件成立 return selector.select(); } long timeoutMillis = deadlineToDelayNanos(deadlineNanos + 995000L) / 1000000L; return timeoutMillis <= 0 ? selector.selectNow() : selector.select(timeoutMillis); }
如果deadlineNanos == NONE
,通过上大节的介绍,咱们晓得NONE
示意以后Reactor
中并没有定时工作,所以能够安心的阻塞
在Selector
上期待IO就绪事件
到来。
selector.select()
调用是一个阻塞调用,如果没有IO就绪事件
,Reactor线程
就会始终阻塞在这里直到IO就绪事件
到来。这里占时不思考前边提到的JDK NIO Epoll的空轮询BUG
.
读到这里那么问题来了,此时Reactor线程
正阻塞在selector.select()
调用上期待IO就绪事件
的到来,如果此时正好有异步工作
被提交到Reactor
中须要执行,并且此时无任何IO就绪事件
,而Reactor线程
因为没有IO就绪事件
到来,会持续在这里阻塞,那么如何去执行异步工作
呢??
解铃还须系铃人,既然异步工作
在被提交后心愿立马失去执行,那么就在提交异步工作
的时候去唤醒Reactor线程
。
//addTaskWakesUp = true 示意 当且仅当只有调用addTask办法时 才会唤醒Reactor线程 //addTaskWakesUp = false 示意 并不是只有addTask办法能力唤醒Reactor 还有其余办法能够唤醒Reactor 默认设置false private final boolean addTaskWakesUp; private void execute(Runnable task, boolean immediate) { boolean inEventLoop = inEventLoop(); addTask(task); if (!inEventLoop) { //如果以后线程不是Reactor线程,则启动Reactor线程 //这里能够看出Reactor线程的启动是通过 向NioEventLoop增加异步工作时启动的 startThread(); .....................省略................... } if (!addTaskWakesUp && immediate) { //io.netty.channel.nio.NioEventLoop.wakeup wakeup(inEventLoop); } }
对于execute办法
我想大家肯定不会生疏,在上篇文章《具体图解Netty Reactor启动全流程》中咱们在介绍Reactor线程的启动
时介绍过该办法。
在启动过程中波及到的重要操作Register操作
,Bind操作
都须要封装成异步工作
通过该办法提交到Reactor
中执行。
这里咱们将重点放在execute办法
后半段wakeup
逻辑局部。
咱们先介绍下和wakeup
逻辑相干的两个参数boolean immediate
和boolean addTaskWakesUp
。
immediate:
示意提交的task
是否须要被立刻执行。Netty中只有你提交的工作类型不是LazyRunnable
类型的工作,都是须要立刻执行的。immediate = true
addTaskWakesUp :
true
示意当且仅当只有
调用addTask
办法时才会唤醒Reactor线程
。调用别的办法并不会唤醒Reactor线程
。
在初始化NioEventLoop
时会设置为false
,示意并不是只有
addTask办法能力唤醒Reactor线程
还有其余办法能够唤醒Reactor线程
,比方这里的execute办法
就会唤醒Reactor线程
。
针对execute办法中的这个唤醒条件!addTaskWakesUp && immediate
,netty这里要表白的语义是:当immediate参数为true的时候示意该异步工作须要立刻执行,addTaskWakesUp 默认设置为false 示意不仅只有addTask办法能够唤醒Reactor,还有其余办法比方这里的execute办法也能够唤醒。然而当设置为true时,语义就变为只有addTask才能够唤醒Reactor,即便execute办法里的immediate = true也不能唤醒Reactor,因为执行的是execute办法而不是addTask办法。
private static final long AWAKE = -1L; private final AtomicLong nextWakeupNanos = new AtomicLong(AWAKE); protected void wakeup(boolean inEventLoop) { if (!inEventLoop && nextWakeupNanos.getAndSet(AWAKE) != AWAKE) { //将Reactor线程从Selector上唤醒 selector.wakeup(); } }
当nextWakeupNanos = AWAKE
时示意以后Reactor正处于昏迷状态,既然是昏迷状态也就没有必要去执行 selector.wakeup()
反复唤醒Reactor了,同时也能省去这一次的零碎调用开销。
在《1.2大节 轮询逻辑》开始介绍的源码实现框架里Reactor被唤醒之后执行代码会进入finally{...}
语句块中,在那里会将nextWakeupNanos
设置为AWAKE
。
try { if (!hasTasks()) { strategy = select(curDeadlineNanos); } } finally { // 执行到这里阐明Reactor曾经从Selector上被唤醒了 // 设置Reactor的状态为昏迷状态AWAKE // lazySet优化不必要的volatile操作,不应用内存屏障,不保障写操作的可见性(单线程不须要保障) nextWakeupNanos.lazySet(AWAKE); }
这里Netty用了一个AtomicLong类型
的变量nextWakeupNanos
,既能示意以后Reactor线程
的状态,又能示意Reactor线程
的阻塞超时工夫。咱们在日常开发中也能够学习下这种技巧。
咱们持续回到Reactor线程
轮询IO就绪事件
的主线上。
private int select(long deadlineNanos) throws IOException { if (deadlineNanos == NONE) { //无定时工作,无一般工作执行时,开始轮询IO就绪事件,没有就始终阻塞 直到唤醒条件成立 return selector.select(); } long timeoutMillis = deadlineToDelayNanos(deadlineNanos + 995000L) / 1000000L; return timeoutMillis <= 0 ? selector.selectNow() : selector.select(timeoutMillis); }
当deadlineNanos
不为NONE
,示意此时Reactor
有定时工作
须要执行,Reactor线程
须要阻塞在Selector
上期待IO就绪事件
直到最近的一个定时工作执行工夫点deadline
达到。
这里的deadlineNanos
示意的就是Reactor
中最近的一个定时工作执行工夫点deadline
,单位是纳秒
。指的是一个相对工夫
。
而咱们须要计算的是Reactor线程
阻塞在Selector
的超时工夫timeoutMillis
,单位是毫秒
,指的是一个绝对工夫
。
所以在Reactor线程
开始阻塞在Selector
上之前,咱们须要将这个单位为纳秒
的相对工夫deadlineNanos
转化为单位为毫秒
的绝对工夫timeoutMillis
。
private int select(long deadlineNanos) throws IOException { if (deadlineNanos == NONE) { //无定时工作,无一般工作执行时,开始轮询IO就绪事件,没有就始终阻塞 直到唤醒条件成立 return selector.select(); } long timeoutMillis = deadlineToDelayNanos(deadlineNanos + 995000L) / 1000000L; return timeoutMillis <= 0 ? selector.selectNow() : selector.select(timeoutMillis); }
这里大家可能会好奇,通过deadlineToDelayNanos办法
计算timeoutMillis
的时候,为什么要给deadlineNanos
在加上0.995毫秒
呢??
大家设想一下这样的场景,当最近的一个定时工作的deadline
行将在5微秒
内达到,那么这时将纳秒转换成毫秒计算出的timeoutMillis
会是0
。
而在Netty中timeoutMillis = 0
要表白的语义是:定时工作执行工夫曾经达到deadline
工夫点,须要被执行。
而现实情况是定时工作还有5微秒
才可能达到deadline
,所以对于这种状况,须要在deadlineNanos
在加上0.995毫秒
凑成1毫秒
不能让其为0。
所以从这里咱们能够看出,Reactor
在有定时工作的状况下,至多要阻塞1毫秒
。
public abstract class AbstractScheduledEventExecutor extends AbstractEventExecutor { protected static long deadlineToDelayNanos(long deadlineNanos) { return ScheduledFutureTask.deadlineToDelayNanos(deadlineNanos); }}
final class ScheduledFutureTask<V> extends PromiseTask<V> implements ScheduledFuture<V>, PriorityQueueNode { static long deadlineToDelayNanos(long deadlineNanos) { return deadlineNanos == 0L ? 0L : Math.max(0L, deadlineNanos - nanoTime()); } //启动工夫点 private static final long START_TIME = System.nanoTime(); static long nanoTime() { return System.nanoTime() - START_TIME; } static long deadlineNanos(long delay) { //计算定时工作执行deadline 去除启动工夫 long deadlineNanos = nanoTime() + delay; // Guard against overflow return deadlineNanos < 0 ? Long.MAX_VALUE : deadlineNanos; }}
这里须要留神一下,在创立定时工作时会通过deadlineNanos办法
计算定时工作的执行deadline
,deadline
的计算逻辑是以后工夫点
+工作延时delay
-系统启动工夫
。这里须要扣除系统启动的工夫。
所以这里在通过deadline
计算延时delay
(也就是timeout)的时候须要在加上系统启动的工夫
: deadlineNanos - nanoTime()
当通过deadlineToDelayNanos
计算出的timeoutMillis <= 0
时,示意Reactor
目前有邻近的定时工作
须要执行,这时候就须要立马返回,不能阻塞在Selector
上影响定时工作
的执行。当然在返回执行定时工作
前,须要在棘手通过selector.selectNow()
非阻塞轮询一下Channel
上是否有IO就绪事件
达到,避免耽搁IO事件
的解决。真是操碎了心~~
当timeoutMillis > 0
时,Reactor线程
就能够安心的阻塞在Selector
上期待IO事件
的到来,直到timeoutMillis
超时工夫达到。
timeoutMillis <= 0 ? selector.selectNow() : selector.select(timeoutMillis)
当注册在Reactor
上的Channel
中有IO事件
到来时,Reactor线程
就会从selector.select(timeoutMillis)
调用中唤醒,立刻去解决IO就绪事件
。
这里假如一种极其状况,如果最近的一个定时工作的deadline是在将来很远的一个工夫点,这样就会使timeoutMillis的工夫十分十分久,那么Reactor岂不是会始终阻塞在Selector上造成 Netty 无奈工作?
笔者感觉大家当初心里应该曾经有了答案,咱们在《1.2.2 Reactor开始轮询IO就绪事件》大节一开始介绍过,当Reactor正在Selector上阻塞时,如果此时用户线程向Reactor提交了异步工作,Reactor线程会通过execute办法被唤醒。
流程到这里,Reactor中最重要也是最外围的逻辑:轮询Channel
上的IO就绪事件
的解决流程咱们就解说完了。
当Reactor轮询到有IO沉闷事件或者有异步工作须要执行时,就会从Selector上被唤醒,上面就到了该介绍Reactor被唤醒之后是如何解决IO就绪事件
以及如何执行异步工作
的时候了。
Netty毕竟是一个网络框架,所以它会优先去解决Channel
上的IO事件
,基于这个事实,所以Netty不会容忍异步工作
被无限度的执行从而影响IO吞吐
。
Netty通过ioRatio变量
来调配Reactor线程
在解决IO事件
和执行异步工作
之间的CPU工夫
分配比例。
上面咱们就来看下这个执行工夫比例的调配逻辑是什么样的~~~
2. Reactor解决IO与解决异步工作的工夫比例调配
无论什么时候,当有IO就绪事件
到来时,Reactor
都须要保障IO事件
被及时残缺的解决完,而ioRatio
次要限度的是执行异步工作
所需用时,避免Reactor线程
解决异步工作
工夫过长而导致 I/O 事件
得不到及时地解决。
//调整Reactor线程执行IO事件和执行异步工作的CPU工夫比例 默认50,示意执行IO事件和异步工作的工夫比例是一比一 final int ioRatio = this.ioRatio; boolean ranTasks; if (ioRatio == 100) { //先一股脑执行IO事件,在一股脑执行异步工作(无工夫限度) try { if (strategy > 0) { //如果有IO就绪事件 则解决IO就绪事件 processSelectedKeys(); } } finally { // Ensure we always run tasks. //解决所有异步工作 ranTasks = runAllTasks(); } } else if (strategy > 0) {//先执行IO事件 用时ioTime 执行异步工作只能用时ioTime * (100 - ioRatio) / ioRatio final long ioStartTime = System.nanoTime(); try { processSelectedKeys(); } finally { // Ensure we always run tasks. final long ioTime = System.nanoTime() - ioStartTime; // 限定在超时工夫内 解决无限的异步工作 避免Reactor线程解决异步工作工夫过长而导致 I/O 事件阻塞 ranTasks = runAllTasks(ioTime * (100 - ioRatio) / ioRatio); } } else { //没有IO就绪事件处理,则只执行异步工作 最多执行64个 避免Reactor线程解决异步工作工夫过长而导致 I/O 事件阻塞 ranTasks = runAllTasks(0); // This will run the minimum number of tasks }
- 当
ioRatio = 100
时,示意无需思考执行工夫的限度,当有IO就绪事件
时(strategy > 0
)Reactor线程
须要优先解决IO就绪事件
,解决完IO事件
后,执行所有的异步工作
包含:一般工作,尾部工作,定时工作。无工夫限度。
strategy
的数值示意IO就绪
的Channel
个数。它是前边介绍的io.netty.channel.nio.NioEventLoop#select
办法的返回值。
- 当
ioRatio
设置的值不为100
时,默认为50
。须要先统计出执行IO事件
的用时ioTime
,依据ioTime * (100 - ioRatio) / ioRatio
计算出,前面执行异步工作
的限度工夫。也就是说Reactor线程
须要在这个限定的工夫内,执行无限的异步工作,避免Reactor线程
因为解决异步工作
工夫过长而导致I/O 事件
得不到及时地解决。
默认状况下,执行IO事件
用时和执行异步工作
用时比例设置的是一比一。ioRatio
设置的越高,则Reactor线程
执行异步工作的工夫占比越小
。
要想得到Reactor线程
执行异步工作
所需的工夫限度,必须晓得执行IO事件
的用时ioTime
而后在依据ioRatio
计算出执行异步工作
的工夫限度。
那如果此时并没有IO就绪事件
须要Reactor线程
解决的话,这种状况下咱们无奈失去ioTime
,那怎么失去执行异步工作
的限度工夫呢??
在这种非凡状况下,Netty只容许Reactor线程
最多执行64
个异步工作,而后就完结执行。转去持续轮训IO就绪事件
。外围目标还是避免Reactor线程
因为解决异步工作
工夫过长而导致I/O 事件
得不到及时地解决。
默认状况下,当Reactor
有异步工作
须要解决然而没有IO就绪事件
时,Netty只会容许Reactor线程
执行最多64
个异步工作。
当初咱们对Reactor
解决IO事件
和异步工作
的整体框架曾经理解了,上面咱们就来别离介绍下Reactor线程
在解决IO事件
和异步工作
的具体逻辑是什么样的?
3. Reactor线程解决IO就绪事件
//该字段为持有selector对象selectedKeys的援用,当IO事件就绪时,间接从这里获取 private SelectedSelectionKeySet selectedKeys; private void processSelectedKeys() { //是否采纳netty优化后的selectedKey汇合类型 是由变量DISABLE_KEY_SET_OPTIMIZATION决定的 默认为false if (selectedKeys != null) { processSelectedKeysOptimized(); } else { processSelectedKeysPlain(selector.selectedKeys()); } }
看到这段代码大家眼生吗??
不知大家还记不记得咱们在《聊聊Netty那些事儿之Reactor在Netty中的实现(创立篇)》一文中介绍Reactor NioEventLoop类
在创立Selector
的过程中提到,出于对JDK NIO Selector
中selectedKeys 汇合
的插入
和遍历
操作性能的思考Netty将本人用数组实现的SelectedSelectionKeySet 汇合
替换掉了JDK NIO Selector
中selectedKeys
的HashSet
实现。
public abstract class SelectorImpl extends AbstractSelector { // The set of keys with data ready for an operation // //IO就绪的SelectionKey(外面包裹着channel) protected Set<SelectionKey> selectedKeys; // The set of keys registered with this Selector //注册在该Selector上的所有SelectionKey(外面包裹着channel) protected HashSet<SelectionKey> keys; ...............省略...................}
Netty中通过优化开关DISABLE_KEY_SET_OPTIMIZATION
管制是否对JDK NIO Selector
进行优化。默认是须要优化。
在优化开关开启的状况下,Netty会将创立的SelectedSelectionKeySet 汇合
保留在NioEventLoop
的private SelectedSelectionKeySet selectedKeys
字段中,不便Reactor线程
间接从这里获取IO就绪
的SelectionKey
。
在优化开关敞开的状况下,Netty会间接采纳JDK NIO Selector
的默认实现。此时NioEventLoop
的selectedKeys
字段就会为null
。
遗记这段的同学能够在回顾下《聊聊Netty那些事儿之Reactor在Netty中的实现(创立篇)》一文中对于Reactor
的创立过程。
通过对前边内容的回顾,咱们看到了在Reactor
解决IO就绪事件
的逻辑也分为两个局部,一个是通过Netty优化的,一个是采纳JDK 原生
的。
咱们先来看采纳JDK 原生
的Selector
的解决形式,了解了这种形式,在看Netty优化的形式会更加容易。
3.1 processSelectedKeysPlain
咱们在《聊聊Netty那些事儿之Reactor在Netty中的实现(创立篇)》一文中介绍JDK NIO Selector
的工作过程时讲过,当注册在Selector
上的Channel
产生IO就绪事件
时,Selector
会将IO就绪
的SelectionKey
插入到Set<SelectionKey> selectedKeys
汇合中。
这时Reactor线程
会从java.nio.channels.Selector#select(long)
调用中返回。随后调用java.nio.channels.Selector#selectedKeys
获取IO就绪
的SelectionKey
汇合。
所以Reactor线程
在调用processSelectedKeysPlain办法
解决IO就绪事件
之前须要调用selector.selectedKeys()
去获取所有IO就绪
的SelectionKeys
。
processSelectedKeysPlain(selector.selectedKeys())
private void processSelectedKeysPlain(Set<SelectionKey> selectedKeys) { if (selectedKeys.isEmpty()) { return; } Iterator<SelectionKey> i = selectedKeys.iterator(); for (;;) { final SelectionKey k = i.next(); final Object a = k.attachment(); //留神每次迭代开端的keyIterator.remove()调用。Selector不会本人从已选择键集中移除SelectionKey实例。 //必须在解决完通道时本人移除。下次该通道变成就绪时,Selector会再次将其放入已选择键集中。 i.remove(); if (a instanceof AbstractNioChannel) { processSelectedKey(k, (AbstractNioChannel) a); } else { @SuppressWarnings("unchecked") NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a; processSelectedKey(k, task); } if (!i.hasNext()) { break; } //目标是再次进入for循环 移除生效的selectKey(socketChannel可能从selector上移除) if (needsToSelectAgain) { selectAgain(); selectedKeys = selector.selectedKeys(); // Create the iterator again to avoid ConcurrentModificationException if (selectedKeys.isEmpty()) { break; } else { i = selectedKeys.iterator(); } } } }
3.1.1 获取IO就绪的Channel
Set<SelectionKey> selectedKeys
汇合外面装的全副是IO就绪
的SelectionKey
,留神,此时Set<SelectionKey> selectedKeys
的实现类型为HashSet类型
。因为咱们这里首先介绍的是JDK NIO 原生实现。
通过获取HashSet
的迭代器,开始一一解决IO就绪
的Channel
。
Iterator<SelectionKey> i = selectedKeys.iterator();final SelectionKey k = i.next();final Object a = k.attachment();
大家还记得这个SelectionKey
中的attachment属性
里寄存的是什么吗??
在上篇文章《具体图解Netty Reactor启动全流程》中咱们在讲NioServerSocketChannel
向Main Reactor
注册的时候,通过this指针将本人作为SelectionKey
的attachment属性
注册到Selector
中。这一步实现了Netty自定义Channel
和JDK NIO Channel
的绑定。
public abstract class AbstractNioChannel extends AbstractChannel { //channel注册到Selector后取得的SelectKey volatile SelectionKey selectionKey; @Override protected void doRegister() throws Exception { boolean selected = false; for (;;) { try { selectionKey = javaChannel().register(eventLoop().unwrappedSelector(), 0, this); return; } catch (CancelledKeyException e) { ...............省略.................... } } }}
而咱们也提到SelectionKey
就相当于是Channel
在Selector
中的一种示意,当Channel
上有IO就绪事件
时,Selector
会将Channel
对应的SelectionKey
返回给Reactor线程
,咱们能够通过返回的这个SelectionKey
里的attachment属性
获取到对应的Netty自定义Channel
。
对于客户端连贯事件(OP_ACCEPT
)沉闷时,这里的Channel类型
为NioServerSocketChannel
。
对于客户端读写事件(Read
,Write
)沉闷时,这里的Channel类型
为NioSocketChannel
。
当咱们通过k.attachment()
获取到Netty自定义的Channel
时,就须要把这个Channel
对应的SelectionKey
从Selector
的就绪汇合Set<SelectionKey> selectedKeys
中删除。因为Selector本人不会被动删除曾经解决完的SelectionKey,须要调用者本人被动删除,这样当这个Channel
再次IO就绪时
,Selector
会再次将这个Channel
对应的SelectionKey
放入就绪汇合Set<SelectionKey> selectedKeys
中。
i.remove();
3.1.2 解决Channel上的IO事件
if (a instanceof AbstractNioChannel) { processSelectedKey(k, (AbstractNioChannel) a); } else { @SuppressWarnings("unchecked") NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a; processSelectedKey(k, task); }
从这里咱们能够看出Netty向SelectionKey
中的attachment属性
附加的对象分为两种:
- 一种是咱们相熟的
Channel
,无论是服务端应用的NioServerSocketChannel
还是客户端应用的NioSocketChannel
都属于AbstractNioChannel
。Channel
上的IO事件
是由Netty框架负责解决,也是本大节咱们要重点介绍的 - 另一种就是
NioTask
,这种类型是Netty提供给用户能够自定义一些当Channel
上产生IO就绪事件
时的自定义解决。
public interface NioTask<C extends SelectableChannel> { /** * Invoked when the {@link SelectableChannel} has been selected by the {@link Selector}. */ void channelReady(C ch, SelectionKey key) throws Exception; /** * Invoked when the {@link SelectionKey} of the specified {@link SelectableChannel} has been cancelled and thus * this {@link NioTask} will not be notified anymore. * * @param cause the cause of the unregistration. {@code null} if a user called {@link SelectionKey#cancel()} or * the event loop has been shut down. */ void channelUnregistered(C ch, Throwable cause) throws Exception;}
NioTask
和Channel
其实实质上是一样的都是负责解决Channel
上的IO就绪事件
,只不过一个是用户自定义解决
,一个是Netty框架解决。这里咱们重点关注Channel
的IO解决逻辑
private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) { //获取Channel的底层操作类Unsafe final AbstractNioChannel.NioUnsafe unsafe = ch.unsafe(); if (!k.isValid()) { ......如果SelectionKey曾经生效则敞开对应的Channel...... } try { //获取IO就绪事件 int readyOps = k.readyOps(); //解决Connect事件 if ((readyOps & SelectionKey.OP_CONNECT) != 0) { int ops = k.interestOps(); //移除对Connect事件的监听,否则Selector会始终告诉 ops &= ~SelectionKey.OP_CONNECT; k.interestOps(ops); //触发channelActive事件处理Connect事件 unsafe.finishConnect(); } //解决Write事件 if ((readyOps & SelectionKey.OP_WRITE) != 0) { ch.unsafe().forceFlush(); } //解决Read事件或者Accept事件 if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) { unsafe.read(); } } catch (CancelledKeyException ignored) { unsafe.close(unsafe.voidPromise()); } }
- 首先咱们须要获取
IO就绪Channel
底层的操作类Unsafe
,用于对具体IO就绪事件
的解决。
这里能够看出,Netty对IO就绪事件
的解决全副封装在Unsafe类
中。比方:对OP_ACCEPT事件
的具体解决逻辑是封装在NioServerSocketChannel
中的UnSafe类
中。对OP_READ或者OP_WRITE事件
的解决是封装在NioSocketChannel
中的Unsafe类
中。
- 从
Selectionkey
中获取具体IO就绪事件 readyOps
。
SelectonKey
中对于IO事件
的汇合有两个。一个是interestOps
,用于记录Channel
感兴趣的IO事件
,在Channel
向Selector
注册结束后,通过pipeline
中的HeadContext
节点的ChannelActive事件回调
中增加。上面这段代码就是在ChannelActive事件回调
中Channel在向Selector注册本人感兴趣的IO事件。
public abstract class AbstractNioChannel extends AbstractChannel { @Override protected void doBeginRead() throws Exception { // Channel.read() or ChannelHandlerContext.read() was called final SelectionKey selectionKey = this.selectionKey; if (!selectionKey.isValid()) { return; } readPending = true; final int interestOps = selectionKey.interestOps(); /** * 1:ServerSocketChannel 初始化时 readInterestOp设置的是OP_ACCEPT事件 * 2:SocketChannel 初始化时 readInterestOp设置的是OP_READ事件 * */ if ((interestOps & readInterestOp) == 0) { //注册监听OP_ACCEPT或者OP_READ事件 selectionKey.interestOps(interestOps | readInterestOp); } } }
另一个就是这里的readyOps
,用于记录在Channel
感兴趣的IO事件
中具体哪些IO事件
就绪了。
Netty中将各种事件的汇合用一个int型
变量来保留。
- 用
&
操作判断,某个事件是否在事件汇合中:(readyOps & SelectionKey.OP_CONNECT) != 0
,这里就是判断Channel是否对Connect事件感兴趣。 - 用
|
操作向事件汇合中增加事件:interestOps | readInterestOp
- 从事件汇合中删除某个事件,是通过先将要删除事件取反
~
,而后在和事件汇合做&
操作:ops &= ~SelectionKey.OP_CONNECT
Netty这种对空间的极致利用思维,很值得咱们平时在日常开发中学习~~
当初咱们曾经晓得哪些Channel
当初处于IO就绪状态
,并且晓得了具体哪些类型的IO事件
曾经就绪。
上面就该针对Channel
上的不同IO就绪事件
做出相应的解决了。
3.1.2.1 解决Connect事件
Netty客户端向服务端发动连贯,并向客户端的Reactor
注册Connect事件
,当连贯建设胜利后,客户端的NioSocketChannel
就会产生Connect就绪事件
,通过后面内容咱们讲的Reactor的运行框架
,最终流程会走到这里。
if ((readyOps & SelectionKey.OP_CONNECT) != 0) { int ops = k.interestOps(); ops &= ~SelectionKey.OP_CONNECT; k.interestOps(ops); //触发channelActive事件 unsafe.finishConnect(); }
如果IO就绪
的事件是Connect事件
,那么就调用对应客户端NioSocketChannel
中的Unsafe操作类
中的finishConnect办法
解决Connect事件
。这时会在Netty客户端NioSocketChannel
中的pipeline
中流传ChannelActive事件
。
最初须要将OP_CONNECT事件
从客户端NioSocketChannel
所关怀的事件汇合interestOps
中删除。否则Selector
会始终告诉Connect事件就绪
。
3.1.2.2 解决Write事件
对于Reactor线程
解决Netty中的Write事件
的流程,笔者后续会专门用一篇文章来为大家介绍。本文咱们重点关注Reactor线程
的整体运行框架。
if ((readyOps & SelectionKey.OP_WRITE) != 0) { ch.unsafe().forceFlush(); }
这里大家只须要记住,OP_WRITE事件的注册是由用户来实现的,当Socket发送缓冲区已满无奈持续写入数据时,用户会向Reactor注册OP_WRITE事件,等到Socket发送缓冲区变得可写时,Reactor会收到OP_WRITE事件沉闷告诉,随后在这里调用客户端NioSocketChannel
中的forceFlush办法
将残余数据发送进来。
3.1.2.3 解决Read事件或者Accept事件
if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) { unsafe.read(); }
这里能够看出Netty中解决Read事件
和Accept事件
都是由对应Channel
中的Unsafe操作类
中的read办法
解决。
服务端NioServerSocketChannel
中的Read办法
解决的是Accept事件
,客户端NioSocketChannel
中的Read办法
解决的是Read事件
。
这里大家只需记住各个IO事件
在对应Channel
中的解决入口,后续文章咱们会详细分析这些入口函数。
3.1.3 从Selector中移除生效的SelectionKey
//用于及时从selectedKeys中革除生效的selectKey 比方 socketChannel从selector上被用户移除 private boolean needsToSelectAgain; //目标是再次进入for循环 移除生效的selectKey(socketChannel可能被用户从selector上移除) if (needsToSelectAgain) { selectAgain(); selectedKeys = selector.selectedKeys(); // Create the iterator again to avoid ConcurrentModificationException if (selectedKeys.isEmpty()) { break; } else { i = selectedKeys.iterator(); } }
在前边介绍Reactor运行框架
的时候,咱们看到在每次Reactor线程
轮询完结,筹备解决IO就绪事件
以及异步工作
的时候,都会将needsToSelectAgain
设置为false
。
那么这个needsToSelectAgain
到底是干嘛的?以及为什么咱们须要去“Select Again”
呢?
首先咱们来看下在什么状况下会将needsToSelectAgain
这个变量设置为true
,通过这个设置的过程,咱们是否可能从中找到一些线索?
咱们晓得Channel
能够将本人注册到Selector
上,那么当然也能够将本人从Selector
上勾销移除。
在上篇文章中咱们也花了大量的篇幅解说了这个注册的过程,当初咱们来看下Channel
的勾销注册。
public abstract class AbstractNioChannel extends AbstractChannel { //channel注册到Selector后取得的SelectKey volatile SelectionKey selectionKey; @Override protected void doDeregister() throws Exception { eventLoop().cancel(selectionKey()); } protected SelectionKey selectionKey() { assert selectionKey != null; return selectionKey; }}
Channel
勾销注册的过程很简略,间接调用NioChannel
的doDeregister
办法,Channel
绑定的Reactor
会将其从Selector
中勾销并进行监听Channel
上的IO事件
。
public final class NioEventLoop extends SingleThreadEventLoop { //记录Selector上移除socketChannel的个数 达到256个 则须要将有效的selectKey从SelectedKeys汇合中革除掉 private int cancelledKeys; private static final int CLEANUP_INTERVAL = 256; /** * 将socketChannel从selector中移除 勾销监听IO事件 * */ void cancel(SelectionKey key) { key.cancel(); cancelledKeys ++; // 当从selector中移除的socketChannel数量达到256个,设置needsToSelectAgain为true // 在io.netty.channel.nio.NioEventLoop.processSelectedKeysPlain 中从新做一次轮询,将生效的selectKey移除, // 以保障selectKeySet的有效性 if (cancelledKeys >= CLEANUP_INTERVAL) { cancelledKeys = 0; needsToSelectAgain = true; } }}
- 调用
JDK NIO SelectionKey
的APIcancel办法
,将Channel
从Selector
中勾销掉。SelectionKey#cancel办法
调用结束后,此时调用SelectionKey#isValid
将会返回false
。SelectionKey#cancel办法
调用后,Selector
会将要勾销的这个SelectionKey
退出到Selector
中的cancelledKeys汇合
中。
public abstract class AbstractSelector extends Selector { private final Set<SelectionKey> cancelledKeys = new HashSet<SelectionKey>(); void cancel(SelectionKey k) { synchronized (cancelledKeys) { cancelledKeys.add(k); } }}
- 当
Channel
对应的SelectionKey
勾销结束后,Channel
勾销计数器cancelledKeys
会加1,当cancelledKeys = 256
时,将needsToSelectAgain
设置为true
。 - 随后在
Selector
的下一次
轮询过程中,会将cancelledKeys汇合
中的SelectionKey
从Selector
中所有的KeySet
中移除。这里的KeySet
包含Selector
用于寄存就绪SelectionKey
的selectedKeys汇合
,以及用于寄存所有注册的Channel
对应的SelectionKey
的keys汇合
。
public abstract class SelectorImpl extends AbstractSelector { protected Set<SelectionKey> selectedKeys = new HashSet(); protected HashSet<SelectionKey> keys = new HashSet(); .....................省略...............}
咱们看到Reactor线程
中对needsToSelectAgain
的判断是在processSelectedKeysPlain办法
解决IO就绪
的SelectionKey
的循环体中进行判断的。
之所以这里特地提到needsToSelectAgain
判断的地位,是要让大家留神到此时Reactor
正在解决本次
轮询的IO就绪事件
。
而前边也说了,当调用SelectionKey#cancel办法
后,须要等到下次轮询
的过程中Selector
才会将这些勾销的SelectionKey
从Selector
中的所有KeySet汇合
中移除,当然这里也包含就绪汇合selectedKeys
。
当在本次
轮询期间,如果大量的Channel
从Selector
中勾销,Selector中的就绪汇合selectedKeys
中仍然会保留这些Channel
对应SelectionKey
直到下次轮询
。那么当然会影响本次轮询后果selectedKeys
的有效性。
所以为了保障Selector
中所有KeySet
的有效性,须要在Channel
勾销个数达到256
时,触发一次selectNow
,目标是革除有效的SelectionKey
。
private void selectAgain() { needsToSelectAgain = false; try { selector.selectNow(); } catch (Throwable t) { logger.warn("Failed to update SelectionKeys.", t); } }
到这里,咱们就对JDK 原生 Selector
的解决形式processSelectedKeysPlain办法
就介绍完了,其实 对IO就绪事件
的解决逻辑都是一样的,在咱们了解了processSelectedKeysPlain办法
后,processSelectedKeysOptimized办法
对IO就绪事件
的解决,咱们了解起来就十分轻松了。
3.2 processSelectedKeysOptimized
Netty默认会采纳优化过的Selector
对IO就绪事件
的解决。然而解决逻辑是大同小异的。上面咱们次要介绍一下这两个办法的不同之处。
private void processSelectedKeysOptimized() { // 在openSelector的时候将JDK中selector实现类中得selectedKeys和publicSelectKeys字段类型 // 由原来的HashSet类型替换为 Netty优化后的数组实现的SelectedSelectionKeySet类型 for (int i = 0; i < selectedKeys.size; ++i) { final SelectionKey k = selectedKeys.keys[i]; // 对应迭代器中得remove selector不会本人革除selectedKey selectedKeys.keys[i] = null; final Object a = k.attachment(); if (a instanceof AbstractNioChannel) { processSelectedKey(k, (AbstractNioChannel) a); } else { @SuppressWarnings("unchecked") NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a; processSelectedKey(k, task); } if (needsToSelectAgain) { selectedKeys.reset(i + 1); selectAgain(); i = -1; } } }
JDK NIO 原生 Selector
寄存IO就绪的SelectionKey
的汇合为HashSet类型
的selectedKeys
。而Netty为了优化对selectedKeys 汇合
的遍历效率
采纳了本人实现的SelectedSelectionKeySet类型
,从而用对数组
的遍历代替用HashSet
的迭代器遍历。Selector
会在每次轮询到IO就绪事件
时,将IO就绪的Channel
对应的SelectionKey
插入到selectedKeys汇合
,然而Selector
只管向selectedKeys汇合
放入IO就绪的SelectionKey
,当SelectionKey
被处理完毕后,Selector
是不会本人被动将其从selectedKeys汇合
中移除的,典型的管杀不论埋
。所以须要Netty本人在遍历到IO就绪的 SelectionKey
后,将其删除。- 在
processSelectedKeysPlain
中是间接将其从迭代器中删除。 - 在
processSelectedKeysOptimized
中将其在数组中对应的地位置为Null
,不便垃圾回收。
- 在
- 在最初革除有效的
SelectionKey
时,在processSelectedKeysPlain
中因为采纳的是JDK NIO 原生的Selector
,所以只须要执行SelectAgain
就能够,Selector
会主动革除有效Key。
然而在processSelectedKeysOptimized
中因为是Netty本人实现的优化类型,所以须要Netty本人将SelectedSelectionKeySet
数组中的SelectionKey
全副革除,最初在执行SelectAgain
。
好了,到这里,咱们就将Reactor线程
如何解决IO就绪事件
的整个过程讲述完了,上面咱们就该到了介绍Reactor线程
如何解决Netty框架中的异步工作了。
4. Reactor线程解决异步工作
Netty对于解决异步工作
的办法有两个:
- 一个是无超时工夫限度的
runAllTasks()办法
。当ioRatio
设置为100
时,Reactor线程
会先一股脑的解决IO就绪事件
,而后在一股脑的执行异步工作
,并没有工夫的限度。 - 另一个是有超时工夫限度的
runAllTasks(long timeoutNanos)办法
。当ioRatio != 100
时,Reactor线程
执行异步工作
会有工夫限度,优先一股脑的解决完IO就绪事件
统计出执行IO工作
耗时ioTime
。依据公式ioTime * (100 - ioRatio) / ioRatio)
计算出Reactor线程
执行异步工作
的超时工夫。在超时工夫限定范畴内,执行无限的异步工作
。
上面咱们来别离看下这两个执行异步工作
的办法解决逻辑:
4.1 runAllTasks()
protected boolean runAllTasks() { assert inEventLoop(); boolean fetchedAll; boolean ranAtLeastOne = false; do { //将达到执行工夫的定时工作转存到一般工作队列taskQueue中,对立由Reactor线程从taskQueue中取出执行 fetchedAll = fetchFromScheduledTaskQueue(); if (runAllTasksFrom(taskQueue)) { ranAtLeastOne = true; } } while (!fetchedAll); // keep on processing until we fetched all scheduled tasks. if (ranAtLeastOne) { lastExecutionTime = ScheduledFutureTask.nanoTime(); } //执行尾部队列工作 afterRunningAllTasks(); return ranAtLeastOne; }
Reactor线程
执行异步工作
的外围逻辑就是:
- 先将到期的
定时工作
一股脑的从定时工作队列scheduledTaskQueue
中取出并转存到一般工作队列taskQueue
中。 - 由
Reactor线程
对立从一般工作队列taskQueue
中取出工作执行。 - 在
Reactor线程
执行完定时工作
和一般工作
后,开始执行存储于尾部工作队列tailTasks
中的尾部工作
。
上面咱们来别离看下上述几个外围步骤的实现:
4.1.1 fetchFromScheduledTaskQueue
/** * 从定时工作队列中取出达到deadline执行工夫的定时工作 * 将定时工作 转存到 一般工作队列taskQueue中,对立由Reactor线程从taskQueue中取出执行 * * */ private boolean fetchFromScheduledTaskQueue() { if (scheduledTaskQueue == null || scheduledTaskQueue.isEmpty()) { return true; } long nanoTime = AbstractScheduledEventExecutor.nanoTime(); for (;;) { //从定时工作队列中取出达到执行deadline的定时工作 deadline <= nanoTime Runnable scheduledTask = pollScheduledTask(nanoTime); if (scheduledTask == null) { return true; } if (!taskQueue.offer(scheduledTask)) { // taskQueue没有空间包容 则在将定时工作从新塞进定时工作队列中期待下次执行 scheduledTaskQueue.add((ScheduledFutureTask<?>) scheduledTask); return false; } } }
- 获取以后要执行
异步工作
的工夫点nanoTime
final class ScheduledFutureTask<V> extends PromiseTask<V> implements ScheduledFuture<V>, PriorityQueueNode { private static final long START_TIME = System.nanoTime(); static long nanoTime() { return System.nanoTime() - START_TIME; }}
- 从定时工作队列中找出
deadline <= nanoTime
的异步工作。也就是说找出所有到期的定时工作。
protected final Runnable pollScheduledTask(long nanoTime) { assert inEventLoop(); //从定时队列中取出要执行的定时工作 deadline <= nanoTime ScheduledFutureTask<?> scheduledTask = peekScheduledTask(); if (scheduledTask == null || scheduledTask.deadlineNanos() - nanoTime > 0) { return null; } //合乎取出条件 则取出 scheduledTaskQueue.remove(); scheduledTask.setConsumed(); return scheduledTask; }
- 将
到期的定时工作
插入到一般工作队列taskQueue
中,如果taskQueue
曾经没有空间包容新的工作,则将定时工作
从新塞进定时工作队列
中期待下次拉取。
if (!taskQueue.offer(scheduledTask)) { scheduledTaskQueue.add((ScheduledFutureTask<?>) scheduledTask); return false; }
fetchFromScheduledTaskQueue办法
的返回值为true
时示意到期的定时工作曾经全副拉取出来并转存到一般工作队列中。
返回值为false
时示意到期的定时工作只拉取出来一部分,因为这时一般工作队列曾经满了,当执行完一般工作时,还须要在进行一次拉取。
当到期的定时工作
从定时工作队列中拉取结束或者当一般工作队列已满时,这时就会进行拉取,开始执行一般工作队列中的异步工作
。
4.1.2 runAllTasksFrom
protected final boolean runAllTasksFrom(Queue<Runnable> taskQueue) { Runnable task = pollTaskFrom(taskQueue); if (task == null) { return false; } for (;;) { safeExecute(task); task = pollTaskFrom(taskQueue); if (task == null) { return true; } } }
- 首先
runAllTasksFrom 办法
的返回值示意是否执行了至多一个异步工作。前面会赋值给ranAtLeastOne变量
,这个返回值咱们后续会用到。 - 从一般工作队列中拉取
异步工作
。
protected static Runnable pollTaskFrom(Queue<Runnable> taskQueue) { for (;;) { Runnable task = taskQueue.poll(); if (task != WAKEUP_TASK) { return task; } } }
Reactor线程
执行异步工作
。
protected static void safeExecute(Runnable task) { try { task.run(); } catch (Throwable t) { logger.warn("A task raised an exception. Task: {}", task, t); } }
4.1.3 afterRunningAllTasks
if (ranAtLeastOne) { lastExecutionTime = ScheduledFutureTask.nanoTime(); } //执行尾部队列工作 afterRunningAllTasks(); return ranAtLeastOne;
如果Reactor线程
执行了至多一个异步工作
,那么设置lastExecutionTime
,并将ranAtLeastOne标识
返回。这里的ranAtLeastOne标识
就是runAllTasksFrom办法
的返回值。
最初执行收尾工作,也就是执行尾部工作队列中的尾部工作。
@Override protected void afterRunningAllTasks() { runAllTasksFrom(tailTasks); }
4.2 runAllTasks(long timeoutNanos)
这里在解决异步工作
的外围逻辑还是和之前一样的,只不过就是多了对超时工夫
的管制。
protected boolean runAllTasks(long timeoutNanos) { fetchFromScheduledTaskQueue(); Runnable task = pollTask(); if (task == null) { //一般队列中没有工作时 执行队尾队列的工作 afterRunningAllTasks(); return false; } //异步工作执行超时deadline final long deadline = timeoutNanos > 0 ? ScheduledFutureTask.nanoTime() + timeoutNanos : 0; long runTasks = 0; long lastExecutionTime; for (;;) { safeExecute(task); runTasks ++; //每运行64个异步工作 检查一下 是否达到 执行deadline if ((runTasks & 0x3F) == 0) { lastExecutionTime = ScheduledFutureTask.nanoTime(); if (lastExecutionTime >= deadline) { //达到异步工作执行超时deadline,进行执行异步工作 break; } } task = pollTask(); if (task == null) { lastExecutionTime = ScheduledFutureTask.nanoTime(); break; } } afterRunningAllTasks(); this.lastExecutionTime = lastExecutionTime; return true; }
- 首先还是通过
fetchFromScheduledTaskQueue 办法
从Reactor
中的定时工作队列中拉取到期的定时工作
,转存到一般工作队列中。当一般工作队列已满或者到期定时工作
全副拉取结束时,进行拉取。 - 将
ScheduledFutureTask.nanoTime() + timeoutNanos
作为Reactor线程
执行异步工作的超时工夫点deadline
。 - 因为零碎调用
System.nanoTime()
须要肯定的零碎开销,所以每执行完64
个异步工作
的时候才会去检查一下执行工夫
是否达到了deadline
。如果达到了执行截止工夫deadline
则退出进行执行异步工作
。如果没有达到deadline
则持续从一般工作队列中取出工作循环执行上来。
从这个细节又能够看出Netty对性能的考量还是相当考究的
流程走到这里,咱们就对Reactor
的整个运行框架以及如何轮询IO就绪事件
,如何解决IO就绪事件
,如何执行异步工作
的具体实现逻辑就分析完了。
上面还有一个小小的尾巴,就是Netty是如何解决文章结尾提到的JDK NIO Epoll 的空轮询BUG
的,让咱们一起来看下吧~~~
5. 解决JDK Epoll空轮询BUG
前边提到,因为JDK NIO Epoll的空轮询BUG
存在,这样会导致Reactor线程
在没有任何事件可做的状况下被意外唤醒,导致CPU空转。
其实Netty也没有从根本上解决这个JDK BUG
,而是抉择奇妙的绕过这个BUG
。
上面咱们来看下Netty是如何做到的。
if (ranTasks || strategy > 0) { if (selectCnt > MIN_PREMATURE_SELECTOR_RETURNS && logger.isDebugEnabled()) { logger.debug("Selector.select() returned prematurely {} times in a row for Selector {}.", selectCnt - 1, selector); } selectCnt = 0; } else if (unexpectedSelectorWakeup(selectCnt)) { // Unexpected wakeup (unusual case) //既没有IO就绪事件,也没有异步工作,Reactor线程从Selector上被异样唤醒 触发JDK Epoll空轮训BUG //从新构建Selector,selectCnt归零 selectCnt = 0; }
在Reactor线程
解决完IO就绪事件
和异步工作
后,会查看这次Reactor线程
被唤醒有没有执行过异步工作和有没有IO就绪的Channel
。
boolean ranTasks
这时候就派上了用场,这个ranTasks
正是前边咱们在讲runAllTasks办法
时提到的返回值。用来示意是否执行过至多一次异步工作
。int strategy
正是JDK NIO Selector
的select办法
的返回值,用来示意IO就绪
的Channel个数
。
如果ranTasks = false 并且 strategy = 0
这代表Reactor线程
本次既没有异步工作
执行也没有IO就绪
的Channel
须要解决却被意外的唤醒。等于是空转了一圈啥也没干。
这种状况下Netty就会认为可能曾经触发了JDK NIO Epoll的空轮询BUG
int SELECTOR_AUTO_REBUILD_THRESHOLD = SystemPropertyUtil.getInt("io.netty.selectorAutoRebuildThreshold", 512); private boolean unexpectedSelectorWakeup(int selectCnt) { ..................省略............... /** * 走到这里的条件是 既没有IO就绪事件,也没有异步工作,Reactor线程从Selector上被异样唤醒 * 这种状况可能是曾经触发了JDK Epoll的空轮询BUG,如果这种状况继续512次 则认为可能曾经触发BUG,于是重建Selector * * */ if (SELECTOR_AUTO_REBUILD_THRESHOLD > 0 && selectCnt >= SELECTOR_AUTO_REBUILD_THRESHOLD) { // The selector returned prematurely many times in a row. // Rebuild the selector to work around the problem. logger.warn("Selector.select() returned prematurely {} times in a row; rebuilding Selector {}.", selectCnt, selector); rebuildSelector(); return true; } return false; }
- 如果
Reactor
这种意外唤醒的次数selectCnt
超过了配置的次数SELECTOR_AUTO_REBUILD_THRESHOLD
,那么Netty就会认定这种状况可能曾经触发了JDK NIO Epoll空轮询BUG
,则重建Selector
(将之前注册的所有Channel从新注册到新的Selector上并敞开旧的Selector
),selectCnt计数
归0
。
SELECTOR_AUTO_REBUILD_THRESHOLD
默认为512
,能够通过零碎变量-D io.netty.selectorAutoRebuildThreshold
指定自定义数值。
- 如果
selectCnt
小于SELECTOR_AUTO_REBUILD_THRESHOLD
,则返回不做任何解决,selectCnt
持续计数。
Netty就这样通过计数Reactor
被意外唤醒的次数,如果计数selectCnt
达到了512次
,则通过重建Selector
奇妙的绕开了JDK NIO Epoll空轮询BUG
。
咱们在日常开发中也能够借鉴Netty这种解决问题的思路,比方在我的项目开发中,当咱们发现咱们无奈保障彻底的解决一个问题时,或者为了解决这个问题导致咱们的投入产出比不高时,咱们就该思考是不是应该换一种思路去绕过这个问题,从而达到同样的成果。解决问题的最高境界就是不解决它,奇妙的绕过去~!!
总结
本文花了大量的篇幅介绍了Reactor
整体的运行框架,并深刻介绍了Reactor
外围的工作模块的具体实现逻辑。
通过本文的介绍咱们晓得了Reactor
如何轮询注册在其上的所有Channel上感兴趣的IO事件,以及Reactor如何去解决IO就绪的事件,如何执行Netty框架中提交的异步工作和定时工作。
最初介绍了Netty如何奇妙的绕过JDK NIO Epoll空轮询的BUG,达到解决问题的目标。
提炼了新的解决问题的思路:解决问题的最高境界就是不解决它,奇妙的绕过去~!!
好了,本文的内容就到这里了,咱们下篇文章见~