本系列Netty源码解析文章基于 4.1.56.Final版本

本文笔者来为大家介绍下Netty的外围引擎Reactor的运行架构,心愿通过本文的介绍可能让大家对Reactor是如何驱动着整个Netty框架的运行有一个全面的意识。也为咱们后续进一步介绍Netty对于解决网络申请的整个生命周期的相干内容做一个前置常识的铺垫,不便大家后续了解。

那么在开始本文正式的内容之前,笔者先来带着大家回顾下前边文章介绍的对于Netty整个框架如何搭建的相干内容,没有看过笔者前边几篇文章的读者敌人也没关系,这些并不会影响到本文的浏览,只不过波及到相干细节的局部,大家能够在回看下。

前文回顾

在《聊聊Netty那些事儿之Reactor在Netty中的实现(创立篇)》一文中,咱们介绍了Netty服务端的外围引擎主从Reactor线程组的创立过程以及相干外围组件里的重要属性。在这个过程中,咱们还提到了Netty对各种细节进行的优化,比方针对JDK NIO 原生Selector做的一些优化,展示了Netty对性能极致的谋求。最终咱们创立出了如下构造的Reactor。

在上篇文章《具体图解Netty Reactor启动全流程》中,咱们残缺地介绍了Netty服务端启动的整个流程,并介绍了在启动过程中波及到的ServerBootstrap相干的属性以及配置形式。用于接管连贯的服务端NioServerSocketChannel的创立和初始化过程以及其类的继承构造。其中重点介绍了NioServerSocketChannel向Reactor的注册过程以及Reactor线程的启动机会和pipeline的初始化机会。最初介绍了NioServerSocketChannel绑定端口地址的整个流程。在这个过程中咱们理解了Netty的这些外围组件是如何串联起来的。

当Netty启动结束后,咱们失去了如下的框架结构:

主Reactor线程组中治理的是NioServerSocketChannel用于接管客户端连贯,并在本人的pipeline中的ServerBootstrapAcceptor里初始化接管到的客户端连贯,随后会将初始化好的客户端连贯注册到从Reactor线程组中。

从Reactor线程组次要负责监听解决注册其上的所有客户端连贯的IO就绪事件。

其中一个Channel只能调配给一个固定的Reactor。一个Reactor负责解决多个Channel上的IO就绪事件,这样能够将服务端承载的全量客户端连贯摊派到多个Reactor中解决,同时也能保障Channel上IO解决的线程安全性。Reactor与Channel之间的对应关系如下图所示:

以上内容就是对笔者前边几篇文章的相干内容回顾,大家能回顾起来更好,回顾不起来也没关系,一点也不影响大家了解本文的内容。如果对相干细节感兴趣的同学,能够在浏览完本文之后,在去回看下。

咱们言归正传,正式开始本文的内容,笔者接下来会为大家介绍这些外围组件是如何相互配合从而驱动着整个Netty Reactor框架运行的。


当Netty Reactor框架启动结束后,接下来第一件事件也是最重要的事件就是如何来高效的接管客户端的连贯。

那么在探讨Netty服务端如何接管连贯之前,咱们须要弄清楚Reactor线程的运行机制,它是如何监听并解决Channel上的IO就绪事件的。

本文相当于是后续咱们介绍Reactor线程监听解决ACCEPT事件Read事件Write事件的前置篇,本文专一于讲述Reactor线程的整个运行框架。了解了本文的内容,对了解前面Reactor线程如何解决IO事件会大有帮忙。

咱们在Netty框架的创立阶段启动阶段无数次的提到了Reactor线程,那么在本文要介绍的运行阶段就该这个Reactor线程来大显神威了。

通过前边文章的介绍,咱们理解到Netty中的Reactor线程次要干三件事件:

  • 轮询注册在Reactor上的所有Channel感兴趣的IO就绪事件
  • 解决Channel上的IO就绪事件
  • 执行Netty中的异步工作。

正是这三个局部组成了Reactor的运行框架,那么咱们当初来看下这个运行框架具体是怎么运行的~~

Reactor线程的整个运行框架

大家还记不记得笔者在《聊聊Netty那些事儿之从内核角度看IO模型》一文中提到的,IO模型的演变是围绕着"如何用尽可能少的线程去治理尽可能多的连贯"这一主题进行的。

Netty的IO模型是通过JDK NIO Selector实现的IO多路复用模型,而Netty的IO线程模型主从Reactor线程模型

依据《聊聊Netty那些事儿之从内核角度看IO模型》一文中介绍的IO多路复用模型咱们很容易就能了解到Netty会应用一个用户态的Reactor线程去一直的通过Selector在内核态去轮训Channel上的IO就绪事件

说白了Reactor线程其实执行的就是一个死循环,在死循环中一直的通过Selector去轮训IO就绪事件,如果产生IO就绪事件则从Selector零碎调用中返回并解决IO就绪事件,如果没有产生IO就绪事件则始终阻塞Selector零碎调用上,直到满足Selector唤醒条件

以下三个条件中只有满足任意一个条件,Reactor线程就会被从Selector上唤醒:

  • 当Selector轮询到有IO沉闷事件产生时。
  • 当Reactor线程须要执行的定时工作达到工作执行工夫deadline时。
  • 当有异步工作提交给Reactor时,Reactor线程须要从Selector上被唤醒,这样能力及时的去执行异步工作
这里能够看出Netty对Reactor线程的压迫还是比拟狠的,反正当初也没有IO就绪事件须要去解决,不能让Reactor线程在这里白白等着,要立刻唤醒它,转去解决提交过去的异步工作以及定时工作。Reactor线程堪称996榜样一刻不停歇地运作着。

在理解了Reactor线程的大略运行框架后,咱们接下来就到源码中去看下它的外围运行框架是如何实现进去的。

因为这块源码比拟宏大繁冗,所以笔者先把它的运行框架提取进去,不便大家整体的了解整个运行过程的全貌。

上图所展现的就是Reactor整个工作体系的全貌,次要分为如下几个重要的工作模块:

  1. Reactor线程在Selector上阻塞获取IO就绪事件。在这个模块中首先会去查看以后是否有异步工作须要执行,如果有异步须要执行,那么不论以后有没有IO就绪事件都不能阻塞在Selector上,随后会去非阻塞的轮询一下Selector上是否有IO就绪事件,如果有,正好能够和异步工作一起执行。优先解决IO就绪事件,在执行异步工作。
  2. 如果以后没有异步工作须要执行,那么Reactor线程会接着查看是否有定时工作须要执行,如果有则在Selector上阻塞直到定时工作的到期工夫deadline,或者满足其余唤醒条件被唤醒。如果没有定时工作须要执行,Reactor线程则会在Selector上始终阻塞直到满足唤醒条件。
  3. 当Reactor线程满足唤醒条件被唤醒后,首先会去判断以后是因为有IO就绪事件被唤醒还是因为有异步工作须要执行被唤醒或者是两者都有。随后Reactor线程就会去解决IO就绪事件和执行异步工作。
  4. 最初Reactor线程返回循环终点一直的反复上述三个步骤。

以上就是Reactor线程运行的整个外围逻辑,上面是笔者根据上述外围逻辑,将Reactor的整体代码设计框架提取进去,大家能够联合上边的Reactor工作流程图,从总体上先感触下整个源码实现框架,可能把Reactor的外围解决步骤和代码中相应的解决模块对应起来即可,这里不须要读懂每一行代码,要以逻辑解决模块为单位了解。前面笔者会将这些一个一个的逻辑解决模块在独自拎进去为大家具体介绍。

  @Override    protected void run() {        //记录轮询次数 用于解决JDK epoll的空轮训bug        int selectCnt = 0;        for (;;) {            try {                //轮询后果                int strategy;                try {                    //依据轮询策略获取轮询后果 这里的hasTasks()次要查看的是一般队列和尾部队列中是否有异步工作期待执行                    strategy = selectStrategy.calculateStrategy(selectNowSupplier, hasTasks());                    switch (strategy) {                    case SelectStrategy.CONTINUE:                        continue;                    case SelectStrategy.BUSY_WAIT:                        // NIO不反对自旋(BUSY_WAIT)                    case SelectStrategy.SELECT:                      外围逻辑是有工作须要执行,则Reactor线程立马执行异步工作,如果没有异步工作执行,则进行轮询IO事件                    default:                    }                } catch (IOException e) {                       ................省略...............                }                执行到这里阐明满足了唤醒条件,Reactor线程从selector上被唤醒开始解决IO就绪事件和执行异步工作                /**                 * Reactor线程须要保障及时的执行异步工作,只有有异步工作提交,就须要退出轮询。                 * 有IO事件就优先解决IO事件,而后解决异步工作                 * */                selectCnt++;                //次要用于从IO就绪的SelectedKeys汇合中剔除曾经生效的selectKey                needsToSelectAgain = false;                //调整Reactor线程执行IO事件和执行异步工作的CPU工夫比例 默认50,示意执行IO事件和异步工作的工夫比例是一比一                final int ioRatio = this.ioRatio;                            这里次要解决IO就绪事件,以及执行异步工作               须要优先解决IO就绪事件,而后依据ioRatio设置的解决IO事件CPU用时与异步工作CPU用时比例,               来决定执行多长时间的异步工作                //判断是否触发JDK Epoll BUG 触发空轮询                if (ranTasks || strategy > 0) {                    if (selectCnt > MIN_PREMATURE_SELECTOR_RETURNS && logger.isDebugEnabled()) {                        logger.debug("Selector.select() returned prematurely {} times in a row for Selector {}.",                                selectCnt - 1, selector);                    }                    selectCnt = 0;                } else if (unexpectedSelectorWakeup(selectCnt)) { // Unexpected wakeup (unusual case)                    //既没有IO就绪事件,也没有异步工作,Reactor线程从Selector上被异样唤醒 触发JDK Epoll空轮训BUG                    //从新构建Selector,selectCnt归零                    selectCnt = 0;                }            } catch (CancelledKeyException e) {                ................省略...............            } catch (Error e) {                ................省略...............            } catch (Throwable t) {              ................省略...............            } finally {              ................省略...............            }        }    }

从下面提取进去的Reactor的源码实现框架中,咱们能够看出Reactor线程次要做了上面几个事件:

  1. 通过JDK NIO Selector轮询注册在Reactor上的所有Channel感兴趣的IO事件。对于NioServerSocketChannel来说因为它次要负责接管客户端连贯所以监听的是OP_ACCEPT事件,对于客户端NioSocketChannel来说因为它次要负责解决连贯上的读写事件所以监听的是OP_READOP_WRITE事件。
这里须要留神的是netty只会主动注册OP_READ事件,而OP_WRITE事件是在当Socket写入缓冲区以满无奈持续写入发送数据时由用户本人注册。
  1. 如果有异步工作须要执行,则立马进行轮询操作,转去执行异步工作。这里分为两种状况:

    • 既有IO就绪事件产生,也有异步工作须要执行。则优先解决IO就绪事件,而后依据ioRatio设置的执行工夫比例决定执行多长时间的异步工作。这里Reactor线程须要管制异步工作的执行工夫,因为Reactor线程的外围是解决IO就绪事件,不能因为异步工作的执行而耽搁了最重要的事件。
    • 没有IO就绪事件产生,然而有异步工作或者定时工作到期须要执行。则只执行异步工作,尽可能的去压迫Reactor线程。没有IO就绪事件产生也不能闲着。

      这里第二种状况下只会执行64个异步工作,目标是为了避免适度执行异步工作,耽搁了最重要的事件轮询IO事件
  2. 在最初Netty会判断本次Reactor线程的唤醒是否是因为触发了JDK epoll 空轮询 BUG导致的,如果触发了该BUG,则重建Selector。绕过JDK BUG,达到解决问题的目标。

失常状况下Reactor线程从Selector中被唤醒有两种状况:

  • 轮询到有IO就绪事件产生。
  • 有异步工作或者定时工作须要执行。
    而JDK epoll 空轮询 BUG会在上述两种状况都没有产生的时候,Reactor线程会意外的从Selector中被唤醒,导致CPU空转。

JDK epoll 空轮询 BUG:https://bugs.java.com/bugdata...

好了,Reactor线程的总体运行构造框架咱们当初曾经理解了,上面咱们来深刻到这些外围解决模块中来各个击破它们~~

1. Reactor线程轮询IO就绪事件

在《聊聊Netty那些事儿之Reactor在Netty中的实现(创立篇)》一文中,笔者在讲述主从Reactor线程组NioEventLoopGroup的创立过程的时候,提到一个结构器参数SelectStrategyFactory

   public NioEventLoopGroup(            int nThreads, Executor executor, final SelectorProvider selectorProvider) {        this(nThreads, executor, selectorProvider, DefaultSelectStrategyFactory.INSTANCE);    }  public NioEventLoopGroup(int nThreads, Executor executor, final SelectorProvider selectorProvider,                             final SelectStrategyFactory selectStrategyFactory) {        super(nThreads, executor, selectorProvider, selectStrategyFactory, RejectedExecutionHandlers.reject());    }

Reactor线程最重要的一件事件就是轮询IO就绪事件SelectStrategyFactory 就是用于指定轮询策略的,默认实现为DefaultSelectStrategyFactory.INSTANCE

而在Reactor线程开启轮询的一开始,就是用这个selectStrategy 去计算一个轮询策略strategy ,后续会依据这个strategy 进行不同的逻辑解决。

  @Override    protected void run() {        //记录轮询次数 用于解决JDK epoll的空轮训bug        int selectCnt = 0;        for (;;) {            try {                //轮询后果                int strategy;                try {                    //依据轮询策略获取轮询后果 这里的hasTasks()次要查看的是一般队列和尾部队列中是否有异步工作期待执行                    strategy = selectStrategy.calculateStrategy(selectNowSupplier, hasTasks());                    switch (strategy) {                    case SelectStrategy.CONTINUE:                        continue;                    case SelectStrategy.BUSY_WAIT:                        // NIO不反对自旋(BUSY_WAIT)                    case SelectStrategy.SELECT:                      外围逻辑是有工作须要执行,则Reactor线程立马执行异步工作,如果没有异步工作执行,则进行轮询IO事件                    default:                    }                } catch (IOException e) {                       ................省略...............                }                ................省略...............}

上面咱们来看这个轮询策略strategy 具体的计算逻辑是什么样的?

1.1 轮询策略

public interface SelectStrategy {    /**     * Indicates a blocking select should follow.     */    int SELECT = -1;    /**     * Indicates the IO loop should be retried, no blocking select to follow directly.     */    int CONTINUE = -2;    /**     * Indicates the IO loop to poll for new events without blocking.     */    int BUSY_WAIT = -3;    int calculateStrategy(IntSupplier selectSupplier, boolean hasTasks) throws Exception;}

咱们首先来看下Netty中定义的这三种轮询策略:

  • SelectStrategy.SELECT:此时没有任何异步工作须要执行,Reactor线程能够安心的阻塞Selector上期待IO就绪事件的降临。
  • SelectStrategy.CONTINUE:从新开启一轮IO轮询
  • SelectStrategy.BUSY_WAIT: Reactor线程进行自旋轮询,因为NIO 不反对自旋操作,所以这里间接跳到SelectStrategy.SELECT策略。

上面咱们来看下轮询策略的计算逻辑calculateStrategy

final class DefaultSelectStrategy implements SelectStrategy {    static final SelectStrategy INSTANCE = new DefaultSelectStrategy();    private DefaultSelectStrategy() { }    @Override    public int calculateStrategy(IntSupplier selectSupplier, boolean hasTasks) throws Exception {        /**         * Reactor线程要保障及时的执行异步工作         * 1:如果有异步工作期待执行,则马上执行selectNow()非阻塞轮询一次IO就绪事件         * 2:没有异步工作,则跳到switch select分支         * */        return hasTasks ? selectSupplier.get() : SelectStrategy.SELECT;    }}
  • Reactor线程的轮询工作开始之前,须要首先判断下以后是否有异步工作须要执行。判断根据就是查看Reactor中的异步工作队列taskQueue和用于统计信息工作用的尾部队列tailTask是否有异步工作
    @Override    protected boolean hasTasks() {        return super.hasTasks() || !tailTasks.isEmpty();    }   protected boolean hasTasks() {        assert inEventLoop();        return !taskQueue.isEmpty();    }
  • 如果Reactor中有异步工作须要执行,那么Reactor线程须要立刻执行,不能阻塞在Selector上。在返回前须要再顺带调用selectNow()非阻塞查看一下以后是否有IO就绪事件产生。如果有,那么正好能够和异步工作一起被解决,如果没有,则及时地解决异步工作
这里Netty要表白的语义是:首先Reactor线程须要优先保障IO就绪事件的解决,而后在保障异步工作的及时执行。如果以后没有IO就绪事件然而有异步工作须要执行时,Reactor线程就要去及时执行异步工作而不是持续阻塞在Selector上期待IO就绪事件。
   private final IntSupplier selectNowSupplier = new IntSupplier() {        @Override        public int get() throws Exception {            return selectNow();        }    };   int selectNow() throws IOException {        //非阻塞        return selector.selectNow();    }
  • 如果以后Reactor线程没有异步工作须要执行,那么calculateStrategy 办法间接返回SelectStrategy.SELECT也就是SelectStrategy接口中定义的常量-1。当calculateStrategy 办法通过selectNow()返回非零数值时,示意此时有IO就绪Channel,返回的数值示意有多少个IO就绪Channel
  @Override    protected void run() {        //记录轮询次数 用于解决JDK epoll的空轮训bug        int selectCnt = 0;        for (;;) {            try {                //轮询后果                int strategy;                try {                    //依据轮询策略获取轮询后果 这里的hasTasks()次要查看的是一般队列和尾部队列中是否有异步工作期待执行                    strategy = selectStrategy.calculateStrategy(selectNowSupplier, hasTasks());                    switch (strategy) {                    case SelectStrategy.CONTINUE:                        continue;                    case SelectStrategy.BUSY_WAIT:                        // NIO不反对自旋(BUSY_WAIT)                    case SelectStrategy.SELECT:                      外围逻辑是有工作须要执行,则Reactor线程立马执行异步工作,如果没有异步工作执行,则进行轮询IO事件                    default:                    }                } catch (IOException e) {                       ................省略...............                }                ................解决IO就绪事件以及执行异步工作...............}

从默认的轮询策略咱们能够看出selectStrategy.calculateStrategy只会返回三种状况:

  • 返回 -1: switch逻辑分支进入SelectStrategy.SELECT分支,示意此时Reactor中没有异步工作须要执行,Reactor线程能够安心的阻塞在Selector上期待IO就绪事件产生。
  • 返回 0: switch逻辑分支进入default分支,示意此时Reactor中没有IO就绪事件然而有异步工作须要执行,流程通过default分支间接进入了解决异步工作的逻辑局部。
  • 返回 > 0:switch逻辑分支进入default分支,示意此时Reactor中既有IO就绪事件产生也有异步工作须要执行,流程通过default分支间接进入了解决IO就绪事件和执行异步工作逻辑局部。

当初Reactor的流程解决逻辑走向咱们分明了,那么接下来咱们把重点放在SelectStrategy.SELECT分支中的轮询逻辑上。这块是Reactor监听IO就绪事件的外围。

1.2 轮询逻辑

                    case SelectStrategy.SELECT:                        //以后没有异步工作执行,Reactor线程能够释怀的阻塞期待IO就绪事件                        //从定时工作队列中取出行将快要执行的定时工作deadline                        long curDeadlineNanos = nextScheduledTaskDeadlineNanos();                        if (curDeadlineNanos == -1L) {                            // -1代表以后定时工作队列中没有定时工作                            curDeadlineNanos = NONE; // nothing on the calendar                        }                        //最早执行定时工作的deadline作为 select的阻塞工夫,意思是到了定时工作的执行工夫                        //不论有无IO就绪事件,必须唤醒selector,从而使reactor线程执行定时工作                        nextWakeupNanos.set(curDeadlineNanos);                        try {                            if (!hasTasks()) {                                //再次查看一般工作队列中是否有异步工作                                //没有的话开始select阻塞轮询IO就绪事件                                strategy = select(curDeadlineNanos);                            }                        } finally {                            // 执行到这里阐明Reactor曾经从Selector上被唤醒了                            // 设置Reactor的状态为昏迷状态AWAKE                            // lazySet优化不必要的volatile操作,不应用内存屏障,不保障写操作的可见性(单线程不须要保障)                            nextWakeupNanos.lazySet(AWAKE);                        }

流程走到这里,阐明当初Reactor上没有任何事件可做,能够安心的阻塞Selector上期待IO就绪事件到来。

那么Reactor线程到底应该在Selector上阻塞多久呢??

在答复这个问题之前,咱们在回顾下《聊聊Netty那些事儿之Reactor在Netty中的实现(创立篇)》一文中在讲述Reactor的创立时提到,Reactor线程除了要轮询Channel上的IO就绪事件,以及解决IO就绪事件外,还有一个工作就是负责执行Netty框架中的异步工作

而Netty框架中的异步工作分为三类:

  • 寄存在一般工作队列taskQueue中的一般异步工作。
  • 寄存在尾部队列tailTasks 中的用于执行统计工作等收尾动作的尾部工作。
  • 还有一种就是这里行将提到的定时工作。寄存在Reactor中的定时工作队列scheduledTaskQueue中。

从ReactorNioEventLoop类中的继承构造咱们也能够看出,Reactor具备执行定时工作的能力。

既然Reactor须要执行定时工作,那么它就不能始终阻塞Selector上有限期待IO就绪事件

那么咱们回到本大节一开始提到的问题上,为了保障Reactor可能及时地执行定时工作Reactor线程须要在行将要执行的的第一个定时工作deadline达到之前被唤醒。

所以在Reactor线程开始轮询IO就绪事件之前,咱们须要首先计算出来Reactor线程Selector上的阻塞超时工夫。

1.2.1 Reactor的轮询超时工夫

首先咱们须要从Reactor的定时工作队列scheduledTaskQueue 中取出行将快要执行的定时工作deadline。将这个deadline作为Reactor线程Selector上轮询的超时工夫。这样能够保障在定时工作行将要执行时,Reactor当初能够及时的从Selector上被唤醒。

    private static final long AWAKE = -1L;    private static final long NONE = Long.MAX_VALUE;    // nextWakeupNanos is:    //    AWAKE            when EL is awake    //    NONE             when EL is waiting with no wakeup scheduled    //    other value T    when EL is waiting with wakeup scheduled at time T    private final AtomicLong nextWakeupNanos = new AtomicLong(AWAKE);      long curDeadlineNanos = nextScheduledTaskDeadlineNanos();      if (curDeadlineNanos == -1L) {            // -1代表以后定时工作队列中没有定时工作            curDeadlineNanos = NONE; // nothing on the calendar      }      nextWakeupNanos.set(curDeadlineNanos);
public abstract class AbstractScheduledEventExecutor extends AbstractEventExecutor {    PriorityQueue<ScheduledFutureTask<?>> scheduledTaskQueue;    protected final long nextScheduledTaskDeadlineNanos() {        ScheduledFutureTask<?> scheduledTask = peekScheduledTask();        return scheduledTask != null ? scheduledTask.deadlineNanos() : -1;    }    final ScheduledFutureTask<?> peekScheduledTask() {        Queue<ScheduledFutureTask<?>> scheduledTaskQueue = this.scheduledTaskQueue;        return scheduledTaskQueue != null ? scheduledTaskQueue.peek() : null;    }}

nextScheduledTaskDeadlineNanos 办法会返回以后Reactor定时工作队列中最近的一个定时工作deadline工夫点,如果定时工作队列中没有定时工作,则返回-1

NioEventLoopnextWakeupNanos 变量用来寄存Reactor从Selector上被唤醒的工夫点,设置为最近须要被执行定时工作的deadline,如果以后并没有定时工作须要执行,那么就设置为Long.MAX_VALUE始终阻塞,直到有IO就绪事件达到或者有异步工作须要执行。

1.2.2 Reactor开始轮询IO就绪事件

     if (!hasTasks()) {             //再次查看一般工作队列中是否有异步工作, 没有的话  开始select阻塞轮询IO就绪事件            strategy = select(curDeadlineNanos);     }

Reactor线程开始阻塞轮询IO就绪事件之前还须要再次检查一下是否有异步工作须要执行。

如果此时凑巧有异步工作提交,就须要进行IO就绪事件的轮询,转去执行异步工作。如果没有异步工作,则正式开始轮询IO就绪事件

    private int select(long deadlineNanos) throws IOException {        if (deadlineNanos == NONE) {            //无定时工作,无一般工作执行时,开始轮询IO就绪事件,没有就始终阻塞 直到唤醒条件成立            return selector.select();        }        long timeoutMillis = deadlineToDelayNanos(deadlineNanos + 995000L) / 1000000L;        return timeoutMillis <= 0 ? selector.selectNow() : selector.select(timeoutMillis);    }

如果deadlineNanos == NONE,通过上大节的介绍,咱们晓得NONE
示意以后Reactor中并没有定时工作,所以能够安心的阻塞Selector上期待IO就绪事件到来。

selector.select()调用是一个阻塞调用,如果没有IO就绪事件Reactor线程就会始终阻塞在这里直到IO就绪事件到来。这里占时不思考前边提到的JDK NIO Epoll的空轮询BUG.

读到这里那么问题来了,此时Reactor线程正阻塞在selector.select()调用上期待IO就绪事件的到来,如果此时正好有异步工作被提交到Reactor中须要执行,并且此时无任何IO就绪事件,而Reactor线程因为没有IO就绪事件到来,会持续在这里阻塞,那么如何去执行异步工作呢??

解铃还须系铃人,既然异步工作在被提交后心愿立马失去执行,那么就在提交异步工作的时候去唤醒Reactor线程

    //addTaskWakesUp = true 示意 当且仅当只有调用addTask办法时 才会唤醒Reactor线程    //addTaskWakesUp = false 示意 并不是只有addTask办法能力唤醒Reactor 还有其余办法能够唤醒Reactor 默认设置false    private final boolean addTaskWakesUp;    private void execute(Runnable task, boolean immediate) {        boolean inEventLoop = inEventLoop();        addTask(task);        if (!inEventLoop) {            //如果以后线程不是Reactor线程,则启动Reactor线程            //这里能够看出Reactor线程的启动是通过 向NioEventLoop增加异步工作时启动的            startThread();            .....................省略...................        }        if (!addTaskWakesUp && immediate) {            //io.netty.channel.nio.NioEventLoop.wakeup            wakeup(inEventLoop);        }    }

对于execute办法我想大家肯定不会生疏,在上篇文章《具体图解Netty Reactor启动全流程》中咱们在介绍Reactor线程的启动时介绍过该办法。

在启动过程中波及到的重要操作Register操作Bind操作都须要封装成异步工作通过该办法提交到Reactor中执行。

这里咱们将重点放在execute办法后半段wakeup逻辑局部。

咱们先介绍下和wakeup逻辑相干的两个参数boolean immediateboolean addTaskWakesUp

  • immediate:示意提交的task是否须要被立刻执行。Netty中只有你提交的工作类型不是LazyRunnable类型的工作,都是须要立刻执行的。immediate = true
  • addTaskWakesUp : true 示意当且仅当只有调用addTask办法时才会唤醒Reactor线程。调用别的办法并不会唤醒Reactor线程
    在初始化NioEventLoop时会设置为false,示意并不是只有addTask办法能力唤醒Reactor线程 还有其余办法能够唤醒Reactor线程,比方这里的execute办法就会唤醒Reactor线程

针对execute办法中的这个唤醒条件!addTaskWakesUp && immediatenetty这里要表白的语义是:当immediate参数为true的时候示意该异步工作须要立刻执行,addTaskWakesUp 默认设置为false 示意不仅只有addTask办法能够唤醒Reactor,还有其余办法比方这里的execute办法也能够唤醒。然而当设置为true时,语义就变为只有addTask才能够唤醒Reactor,即便execute办法里的immediate = true也不能唤醒Reactor,因为执行的是execute办法而不是addTask办法。

    private static final long AWAKE = -1L;    private final AtomicLong nextWakeupNanos = new AtomicLong(AWAKE);    protected void wakeup(boolean inEventLoop) {        if (!inEventLoop && nextWakeupNanos.getAndSet(AWAKE) != AWAKE) {            //将Reactor线程从Selector上唤醒            selector.wakeup();        }    }

nextWakeupNanos = AWAKE时示意以后Reactor正处于昏迷状态,既然是昏迷状态也就没有必要去执行 selector.wakeup()反复唤醒Reactor了,同时也能省去这一次的零碎调用开销。

在《1.2大节 轮询逻辑》开始介绍的源码实现框架里Reactor被唤醒之后执行代码会进入finally{...}语句块中,在那里会将nextWakeupNanos设置为AWAKE

                        try {                            if (!hasTasks()) {                                strategy = select(curDeadlineNanos);                            }                        } finally {                            // 执行到这里阐明Reactor曾经从Selector上被唤醒了                            // 设置Reactor的状态为昏迷状态AWAKE                            // lazySet优化不必要的volatile操作,不应用内存屏障,不保障写操作的可见性(单线程不须要保障)                            nextWakeupNanos.lazySet(AWAKE);                        }
这里Netty用了一个AtomicLong类型的变量nextWakeupNanos,既能示意以后Reactor线程的状态,又能示意Reactor线程的阻塞超时工夫。咱们在日常开发中也能够学习下这种技巧。

咱们持续回到Reactor线程轮询IO就绪事件的主线上。

    private int select(long deadlineNanos) throws IOException {        if (deadlineNanos == NONE) {            //无定时工作,无一般工作执行时,开始轮询IO就绪事件,没有就始终阻塞 直到唤醒条件成立            return selector.select();        }        long timeoutMillis = deadlineToDelayNanos(deadlineNanos + 995000L) / 1000000L;        return timeoutMillis <= 0 ? selector.selectNow() : selector.select(timeoutMillis);    }

deadlineNanos不为NONE,示意此时Reactor定时工作须要执行,Reactor线程须要阻塞在Selector上期待IO就绪事件直到最近的一个定时工作执行工夫点deadline达到。

这里的deadlineNanos示意的就是Reactor中最近的一个定时工作执行工夫点deadline,单位是纳秒。指的是一个相对工夫

而咱们须要计算的是Reactor线程阻塞在Selector的超时工夫timeoutMillis,单位是毫秒,指的是一个绝对工夫

所以在Reactor线程开始阻塞在Selector上之前,咱们须要将这个单位为纳秒的相对工夫deadlineNanos转化为单位为毫秒的绝对工夫timeoutMillis

    private int select(long deadlineNanos) throws IOException {        if (deadlineNanos == NONE) {            //无定时工作,无一般工作执行时,开始轮询IO就绪事件,没有就始终阻塞 直到唤醒条件成立            return selector.select();        }        long timeoutMillis = deadlineToDelayNanos(deadlineNanos + 995000L) / 1000000L;        return timeoutMillis <= 0 ? selector.selectNow() : selector.select(timeoutMillis);    }

这里大家可能会好奇,通过deadlineToDelayNanos办法计算timeoutMillis的时候,为什么要给deadlineNanos在加上0.995毫秒呢??

大家设想一下这样的场景,当最近的一个定时工作的deadline行将在5微秒内达到,那么这时将纳秒转换成毫秒计算出的timeoutMillis 会是0

而在Netty中timeoutMillis = 0 要表白的语义是:定时工作执行工夫曾经达到deadline工夫点,须要被执行。

而现实情况是定时工作还有5微秒才可能达到deadline,所以对于这种状况,须要在deadlineNanos在加上0.995毫秒凑成1毫秒不能让其为0。

所以从这里咱们能够看出,Reactor在有定时工作的状况下,至多要阻塞1毫秒
public abstract class AbstractScheduledEventExecutor extends AbstractEventExecutor {    protected static long deadlineToDelayNanos(long deadlineNanos) {        return ScheduledFutureTask.deadlineToDelayNanos(deadlineNanos);    }}
final class ScheduledFutureTask<V> extends PromiseTask<V> implements ScheduledFuture<V>, PriorityQueueNode {    static long deadlineToDelayNanos(long deadlineNanos) {        return deadlineNanos == 0L ? 0L : Math.max(0L, deadlineNanos - nanoTime());    }    //启动工夫点    private static final long START_TIME = System.nanoTime();    static long nanoTime() {        return System.nanoTime() - START_TIME;    }    static long deadlineNanos(long delay) {        //计算定时工作执行deadline  去除启动工夫        long deadlineNanos = nanoTime() + delay;        // Guard against overflow        return deadlineNanos < 0 ? Long.MAX_VALUE : deadlineNanos;    }}

这里须要留神一下,在创立定时工作时会通过deadlineNanos办法计算定时工作的执行deadlinedeadline的计算逻辑是以后工夫点+工作延时delay-系统启动工夫这里须要扣除系统启动的工夫

所以这里在通过deadline计算延时delay(也就是timeout)的时候须要在加上系统启动的工夫 : deadlineNanos - nanoTime()

当通过deadlineToDelayNanos 计算出的timeoutMillis <= 0时,示意Reactor目前有邻近的定时工作须要执行,这时候就须要立马返回,不能阻塞在Selector上影响定时工作的执行。当然在返回执行定时工作前,须要在棘手通过selector.selectNow()非阻塞轮询一下Channel上是否有IO就绪事件达到,避免耽搁IO事件的解决。真是操碎了心~~

timeoutMillis > 0时,Reactor线程就能够安心的阻塞在Selector上期待IO事件的到来,直到timeoutMillis超时工夫达到。

timeoutMillis <= 0 ? selector.selectNow() : selector.select(timeoutMillis)

当注册在Reactor上的Channel中有IO事件到来时,Reactor线程就会从selector.select(timeoutMillis)调用中唤醒,立刻去解决IO就绪事件

这里假如一种极其状况,如果最近的一个定时工作的deadline是在将来很远的一个工夫点,这样就会使timeoutMillis的工夫十分十分久,那么Reactor岂不是会始终阻塞在Selector上造成 Netty 无奈工作?

笔者感觉大家当初心里应该曾经有了答案,咱们在《1.2.2 Reactor开始轮询IO就绪事件》大节一开始介绍过,当Reactor正在Selector上阻塞时,如果此时用户线程向Reactor提交了异步工作,Reactor线程会通过execute办法被唤醒。


流程到这里,Reactor中最重要也是最外围的逻辑:轮询Channel上的IO就绪事件的解决流程咱们就解说完了。

当Reactor轮询到有IO沉闷事件或者有异步工作须要执行时,就会从Selector上被唤醒,上面就到了该介绍Reactor被唤醒之后是如何解决IO就绪事件以及如何执行异步工作的时候了。

Netty毕竟是一个网络框架,所以它会优先去解决Channel上的IO事件,基于这个事实,所以Netty不会容忍异步工作被无限度的执行从而影响IO吞吐

Netty通过ioRatio变量来调配Reactor线程在解决IO事件和执行异步工作之间的CPU工夫分配比例。

上面咱们就来看下这个执行工夫比例的调配逻辑是什么样的~~~

2. Reactor解决IO与解决异步工作的工夫比例调配

无论什么时候,当有IO就绪事件到来时,Reactor都须要保障IO事件被及时残缺的解决完,而ioRatio次要限度的是执行异步工作所需用时,避免Reactor线程解决异步工作工夫过长而导致 I/O 事件得不到及时地解决。

                //调整Reactor线程执行IO事件和执行异步工作的CPU工夫比例 默认50,示意执行IO事件和异步工作的工夫比例是一比一                final int ioRatio = this.ioRatio;                boolean ranTasks;                if (ioRatio == 100) { //先一股脑执行IO事件,在一股脑执行异步工作(无工夫限度)                    try {                        if (strategy > 0) {                            //如果有IO就绪事件 则解决IO就绪事件                            processSelectedKeys();                        }                    } finally {                        // Ensure we always run tasks.                        //解决所有异步工作                        ranTasks = runAllTasks();                    }                } else if (strategy > 0) {//先执行IO事件 用时ioTime  执行异步工作只能用时ioTime * (100 - ioRatio) / ioRatio                    final long ioStartTime = System.nanoTime();                    try {                        processSelectedKeys();                    } finally {                        // Ensure we always run tasks.                        final long ioTime = System.nanoTime() - ioStartTime;                        // 限定在超时工夫内 解决无限的异步工作 避免Reactor线程解决异步工作工夫过长而导致 I/O 事件阻塞                        ranTasks = runAllTasks(ioTime * (100 - ioRatio) / ioRatio);                    }                } else { //没有IO就绪事件处理,则只执行异步工作 最多执行64个 避免Reactor线程解决异步工作工夫过长而导致 I/O 事件阻塞                    ranTasks = runAllTasks(0); // This will run the minimum number of tasks                }
  • ioRatio = 100时,示意无需思考执行工夫的限度,当有IO就绪事件时(strategy > 0Reactor线程须要优先解决IO就绪事件,解决完IO事件后,执行所有的异步工作包含:一般工作,尾部工作,定时工作。无工夫限度。
strategy的数值示意IO就绪Channel个数。它是前边介绍的io.netty.channel.nio.NioEventLoop#select办法的返回值。
  • ioRatio设置的值不为100时,默认为50。须要先统计出执行IO事件的用时ioTime ,依据ioTime * (100 - ioRatio) / ioRatio计算出,前面执行异步工作的限度工夫。也就是说Reactor线程须要在这个限定的工夫内,执行无限的异步工作,避免Reactor线程因为解决异步工作工夫过长而导致I/O 事件得不到及时地解决。
默认状况下,执行IO事件用时和执行异步工作用时比例设置的是一比一。
ioRatio设置的越高,则Reactor线程执行异步工作的工夫占比越小

要想得到Reactor线程执行异步工作所需的工夫限度,必须晓得执行IO事件的用时ioTime而后在依据ioRatio计算出执行异步工作的工夫限度。

那如果此时并没有IO就绪事件须要Reactor线程解决的话,这种状况下咱们无奈失去ioTime,那怎么失去执行异步工作的限度工夫呢??

在这种非凡状况下,Netty只容许Reactor线程最多执行64个异步工作,而后就完结执行。转去持续轮训IO就绪事件。外围目标还是避免Reactor线程因为解决异步工作工夫过长而导致I/O 事件得不到及时地解决。

默认状况下,当Reactor异步工作须要解决然而没有IO就绪事件时,Netty只会容许Reactor线程执行最多64个异步工作。

当初咱们对Reactor解决IO事件异步工作的整体框架曾经理解了,上面咱们就来别离介绍下Reactor线程在解决IO事件异步工作的具体逻辑是什么样的?

3. Reactor线程解决IO就绪事件

    //该字段为持有selector对象selectedKeys的援用,当IO事件就绪时,间接从这里获取   private SelectedSelectionKeySet selectedKeys;   private void processSelectedKeys() {        //是否采纳netty优化后的selectedKey汇合类型 是由变量DISABLE_KEY_SET_OPTIMIZATION决定的 默认为false        if (selectedKeys != null) {            processSelectedKeysOptimized();        } else {            processSelectedKeysPlain(selector.selectedKeys());        }    }

看到这段代码大家眼生吗??

不知大家还记不记得咱们在《聊聊Netty那些事儿之Reactor在Netty中的实现(创立篇)》一文中介绍Reactor NioEventLoop类在创立Selector的过程中提到,出于对JDK NIO SelectorselectedKeys 汇合插入遍历操作性能的思考Netty将本人用数组实现的SelectedSelectionKeySet 汇合替换掉了JDK NIO SelectorselectedKeys HashSet实现。

public abstract class SelectorImpl extends AbstractSelector {    // The set of keys with data ready for an operation    // //IO就绪的SelectionKey(外面包裹着channel)    protected Set<SelectionKey> selectedKeys;    // The set of keys registered with this Selector    //注册在该Selector上的所有SelectionKey(外面包裹着channel)    protected HashSet<SelectionKey> keys;    ...............省略...................}

Netty中通过优化开关DISABLE_KEY_SET_OPTIMIZATION 管制是否对JDK NIO Selector进行优化。默认是须要优化。

在优化开关开启的状况下,Netty会将创立的SelectedSelectionKeySet 汇合保留在NioEventLoopprivate SelectedSelectionKeySet selectedKeys字段中,不便Reactor线程间接从这里获取IO就绪SelectionKey

在优化开关敞开的状况下,Netty会间接采纳JDK NIO Selector的默认实现。此时NioEventLoopselectedKeys字段就会为null

遗记这段的同学能够在回顾下《聊聊Netty那些事儿之Reactor在Netty中的实现(创立篇)》一文中对于Reactor的创立过程。

通过对前边内容的回顾,咱们看到了在Reactor解决IO就绪事件的逻辑也分为两个局部,一个是通过Netty优化的,一个是采纳JDK 原生的。

咱们先来看采纳JDK 原生Selector的解决形式,了解了这种形式,在看Netty优化的形式会更加容易。

3.1 processSelectedKeysPlain

咱们在《聊聊Netty那些事儿之Reactor在Netty中的实现(创立篇)》一文中介绍JDK NIO Selector的工作过程时讲过,当注册在Selector上的Channel产生IO就绪事件时,Selector会将IO就绪SelectionKey插入到Set<SelectionKey> selectedKeys汇合中。

这时Reactor线程会从java.nio.channels.Selector#select(long)调用中返回。随后调用java.nio.channels.Selector#selectedKeys获取IO就绪SelectionKey汇合。

所以Reactor线程在调用processSelectedKeysPlain办法解决IO就绪事件之前须要调用selector.selectedKeys()去获取所有IO就绪SelectionKeys

processSelectedKeysPlain(selector.selectedKeys())
    private void processSelectedKeysPlain(Set<SelectionKey> selectedKeys) {        if (selectedKeys.isEmpty()) {            return;        }        Iterator<SelectionKey> i = selectedKeys.iterator();        for (;;) {            final SelectionKey k = i.next();            final Object a = k.attachment();            //留神每次迭代开端的keyIterator.remove()调用。Selector不会本人从已选择键集中移除SelectionKey实例。            //必须在解决完通道时本人移除。下次该通道变成就绪时,Selector会再次将其放入已选择键集中。            i.remove();            if (a instanceof AbstractNioChannel) {                processSelectedKey(k, (AbstractNioChannel) a);            } else {                @SuppressWarnings("unchecked")                NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a;                processSelectedKey(k, task);            }            if (!i.hasNext()) {                break;            }            //目标是再次进入for循环 移除生效的selectKey(socketChannel可能从selector上移除)            if (needsToSelectAgain) {                selectAgain();                selectedKeys = selector.selectedKeys();                // Create the iterator again to avoid ConcurrentModificationException                if (selectedKeys.isEmpty()) {                    break;                } else {                    i = selectedKeys.iterator();                }            }        }    }

3.1.1 获取IO就绪的Channel

Set<SelectionKey> selectedKeys汇合外面装的全副是IO就绪SelectionKey,留神,此时Set<SelectionKey> selectedKeys的实现类型为HashSet类型。因为咱们这里首先介绍的是JDK NIO 原生实现。

通过获取HashSet的迭代器,开始一一解决IO就绪Channel

Iterator<SelectionKey> i = selectedKeys.iterator();final SelectionKey k = i.next();final Object a = k.attachment();

大家还记得这个SelectionKey中的attachment属性里寄存的是什么吗??

在上篇文章《具体图解Netty Reactor启动全流程》中咱们在讲NioServerSocketChannelMain Reactor注册的时候,通过this指针将本人作为SelectionKeyattachment属性注册到Selector中。这一步实现了Netty自定义ChannelJDK NIO Channel的绑定

public abstract class AbstractNioChannel extends AbstractChannel {    //channel注册到Selector后取得的SelectKey    volatile SelectionKey selectionKey;    @Override    protected void doRegister() throws Exception {        boolean selected = false;        for (;;) {            try {                selectionKey = javaChannel().register(eventLoop().unwrappedSelector(), 0, this);                return;            } catch (CancelledKeyException e) {                ...............省略....................            }        }    }}

而咱们也提到SelectionKey就相当于是ChannelSelector中的一种示意,当Channel上有IO就绪事件时,Selector会将Channel对应的SelectionKey返回给Reactor线程,咱们能够通过返回的这个SelectionKey里的attachment属性获取到对应的Netty自定义Channel

对于客户端连贯事件(OP_ACCEPT)沉闷时,这里的Channel类型NioServerSocketChannel
对于客户端读写事件(ReadWrite)沉闷时,这里的Channel类型NioSocketChannel

当咱们通过k.attachment()获取到Netty自定义的Channel时,就须要把这个Channel对应的SelectionKeySelector的就绪汇合Set<SelectionKey> selectedKeys中删除。因为Selector本人不会被动删除曾经解决完的SelectionKey,须要调用者本人被动删除,这样当这个Channel再次IO就绪时Selector会再次将这个Channel对应的SelectionKey放入就绪汇合Set<SelectionKey> selectedKeys中。

i.remove();

3.1.2 解决Channel上的IO事件

     if (a instanceof AbstractNioChannel) {                processSelectedKey(k, (AbstractNioChannel) a);     } else {                @SuppressWarnings("unchecked")                NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a;                processSelectedKey(k, task);     }

从这里咱们能够看出Netty向SelectionKey中的attachment属性附加的对象分为两种:

  • 一种是咱们相熟的Channel,无论是服务端应用的NioServerSocketChannel还是客户端应用的NioSocketChannel都属于AbstractNioChannel Channel上的IO事件是由Netty框架负责解决,也是本大节咱们要重点介绍的
  • 另一种就是NioTask,这种类型是Netty提供给用户能够自定义一些当Channel上产生IO就绪事件时的自定义解决。
public interface NioTask<C extends SelectableChannel> {    /**     * Invoked when the {@link SelectableChannel} has been selected by the {@link Selector}.     */    void channelReady(C ch, SelectionKey key) throws Exception;    /**     * Invoked when the {@link SelectionKey} of the specified {@link SelectableChannel} has been cancelled and thus     * this {@link NioTask} will not be notified anymore.     *     * @param cause the cause of the unregistration. {@code null} if a user called {@link SelectionKey#cancel()} or     *              the event loop has been shut down.     */    void channelUnregistered(C ch, Throwable cause) throws Exception;}
NioTaskChannel其实实质上是一样的都是负责解决Channel上的IO就绪事件,只不过一个是用户自定义解决,一个是Netty框架解决。这里咱们重点关注ChannelIO解决逻辑

    private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {        //获取Channel的底层操作类Unsafe        final AbstractNioChannel.NioUnsafe unsafe = ch.unsafe();        if (!k.isValid()) {            ......如果SelectionKey曾经生效则敞开对应的Channel......        }        try {            //获取IO就绪事件            int readyOps = k.readyOps();            //解决Connect事件            if ((readyOps & SelectionKey.OP_CONNECT) != 0) {                int ops = k.interestOps();                //移除对Connect事件的监听,否则Selector会始终告诉                ops &= ~SelectionKey.OP_CONNECT;                k.interestOps(ops);                //触发channelActive事件处理Connect事件                unsafe.finishConnect();            }            //解决Write事件            if ((readyOps & SelectionKey.OP_WRITE) != 0) {                ch.unsafe().forceFlush();            }             //解决Read事件或者Accept事件            if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {                unsafe.read();            }        } catch (CancelledKeyException ignored) {            unsafe.close(unsafe.voidPromise());        }    }
  • 首先咱们须要获取IO就绪Channel底层的操作类Unsafe,用于对具体IO就绪事件的解决。
这里能够看出,Netty对IO就绪事件的解决全副封装在Unsafe类中。比方:对OP_ACCEPT事件的具体解决逻辑是封装在NioServerSocketChannel中的UnSafe类中。对OP_READ或者OP_WRITE事件的解决是封装在NioSocketChannel中的Unsafe类中。
  • Selectionkey中获取具体IO就绪事件 readyOps

SelectonKey中对于IO事件的汇合有两个。一个是interestOps,用于记录Channel感兴趣的IO事件,在ChannelSelector注册结束后,通过pipeline中的HeadContext节点的ChannelActive事件回调中增加。上面这段代码就是在ChannelActive事件回调中Channel在向Selector注册本人感兴趣的IO事件。

    public abstract class AbstractNioChannel extends AbstractChannel {             @Override              protected void doBeginRead() throws Exception {                    // Channel.read() or ChannelHandlerContext.read() was called                    final SelectionKey selectionKey = this.selectionKey;                    if (!selectionKey.isValid()) {                        return;                    }                    readPending = true;                    final int interestOps = selectionKey.interestOps();                    /**                       * 1:ServerSocketChannel 初始化时 readInterestOp设置的是OP_ACCEPT事件                       * 2:SocketChannel 初始化时 readInterestOp设置的是OP_READ事件                     * */                    if ((interestOps & readInterestOp) == 0) {                        //注册监听OP_ACCEPT或者OP_READ事件                        selectionKey.interestOps(interestOps | readInterestOp);                    }              }    }

另一个就是这里的readyOps,用于记录在Channel感兴趣的IO事件中具体哪些IO事件就绪了。

Netty中将各种事件的汇合用一个int型变量来保留。

  • &操作判断,某个事件是否在事件汇合中:(readyOps & SelectionKey.OP_CONNECT) != 0,这里就是判断Channel是否对Connect事件感兴趣。
  • |操作向事件汇合中增加事件:interestOps | readInterestOp
  • 从事件汇合中删除某个事件,是通过先将要删除事件取反~,而后在和事件汇合做&操作:ops &= ~SelectionKey.OP_CONNECT

Netty这种对空间的极致利用思维,很值得咱们平时在日常开发中学习~~


当初咱们曾经晓得哪些Channel当初处于IO就绪状态,并且晓得了具体哪些类型的IO事件曾经就绪。

上面就该针对Channel上的不同IO就绪事件做出相应的解决了。

3.1.2.1 解决Connect事件

Netty客户端向服务端发动连贯,并向客户端的Reactor注册Connect事件,当连贯建设胜利后,客户端的NioSocketChannel就会产生Connect就绪事件,通过后面内容咱们讲的Reactor的运行框架,最终流程会走到这里。

      if ((readyOps & SelectionKey.OP_CONNECT) != 0) {                int ops = k.interestOps();                ops &= ~SelectionKey.OP_CONNECT;                k.interestOps(ops);                //触发channelActive事件                unsafe.finishConnect();     }

如果IO就绪的事件是Connect事件,那么就调用对应客户端NioSocketChannel中的Unsafe操作类中的finishConnect办法解决Connect事件。这时会在Netty客户端NioSocketChannel中的pipeline中流传ChannelActive事件

最初须要将OP_CONNECT事件从客户端NioSocketChannel所关怀的事件汇合interestOps中删除。否则Selector会始终告诉Connect事件就绪

3.1.2.2 解决Write事件

对于Reactor线程解决Netty中的Write事件的流程,笔者后续会专门用一篇文章来为大家介绍。本文咱们重点关注Reactor线程的整体运行框架。

      if ((readyOps & SelectionKey.OP_WRITE) != 0) {            ch.unsafe().forceFlush();      }

这里大家只须要记住,OP_WRITE事件的注册是由用户来实现的,当Socket发送缓冲区已满无奈持续写入数据时,用户会向Reactor注册OP_WRITE事件,等到Socket发送缓冲区变得可写时,Reactor会收到OP_WRITE事件沉闷告诉,随后在这里调用客户端NioSocketChannel中的forceFlush办法将残余数据发送进来。

3.1.2.3 解决Read事件或者Accept事件

      if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {            unsafe.read();     }

这里能够看出Netty中解决Read事件Accept事件都是由对应Channel中的Unsafe操作类中的read办法解决。

服务端NioServerSocketChannel中的Read办法解决的是Accept事件,客户端NioSocketChannel中的Read办法解决的是Read事件

这里大家只需记住各个IO事件在对应Channel中的解决入口,后续文章咱们会详细分析这些入口函数。

3.1.3 从Selector中移除生效的SelectionKey

            //用于及时从selectedKeys中革除生效的selectKey 比方 socketChannel从selector上被用户移除            private boolean needsToSelectAgain;             //目标是再次进入for循环 移除生效的selectKey(socketChannel可能被用户从selector上移除)            if (needsToSelectAgain) {                selectAgain();                selectedKeys = selector.selectedKeys();                // Create the iterator again to avoid ConcurrentModificationException                if (selectedKeys.isEmpty()) {                    break;                } else {                    i = selectedKeys.iterator();                }            }

在前边介绍Reactor运行框架的时候,咱们看到在每次Reactor线程轮询完结,筹备解决IO就绪事件以及异步工作的时候,都会将needsToSelectAgain 设置为false

那么这个needsToSelectAgain 到底是干嘛的?以及为什么咱们须要去“Select Again”呢?

首先咱们来看下在什么状况下会将needsToSelectAgain 这个变量设置为true,通过这个设置的过程,咱们是否可能从中找到一些线索?

咱们晓得Channel能够将本人注册到Selector上,那么当然也能够将本人从Selector上勾销移除。

在上篇文章中咱们也花了大量的篇幅解说了这个注册的过程,当初咱们来看下Channel的勾销注册。

public abstract class AbstractNioChannel extends AbstractChannel {   //channel注册到Selector后取得的SelectKey    volatile SelectionKey selectionKey;    @Override    protected void doDeregister() throws Exception {        eventLoop().cancel(selectionKey());    }    protected SelectionKey selectionKey() {        assert selectionKey != null;        return selectionKey;    }}

Channel勾销注册的过程很简略,间接调用NioChanneldoDeregister 办法,Channel绑定的Reactor会将其从Selector中勾销并进行监听Channel上的IO事件

public final class NioEventLoop extends SingleThreadEventLoop {    //记录Selector上移除socketChannel的个数 达到256个 则须要将有效的selectKey从SelectedKeys汇合中革除掉    private int cancelledKeys;    private static final int CLEANUP_INTERVAL = 256;    /**     * 将socketChannel从selector中移除 勾销监听IO事件     * */    void cancel(SelectionKey key) {        key.cancel();        cancelledKeys ++;        // 当从selector中移除的socketChannel数量达到256个,设置needsToSelectAgain为true        // 在io.netty.channel.nio.NioEventLoop.processSelectedKeysPlain 中从新做一次轮询,将生效的selectKey移除,        // 以保障selectKeySet的有效性        if (cancelledKeys >= CLEANUP_INTERVAL) {            cancelledKeys = 0;            needsToSelectAgain = true;        }    }}
  • 调用JDK NIO SelectionKey的API cancel办法,将ChannelSelector中勾销掉。SelectionKey#cancel办法调用结束后,此时调用SelectionKey#isValid将会返回falseSelectionKey#cancel办法调用后,Selector会将要勾销的这个SelectionKey退出到Selector中的cancelledKeys汇合
public abstract class AbstractSelector extends Selector {    private final Set<SelectionKey> cancelledKeys = new HashSet<SelectionKey>();    void cancel(SelectionKey k) {                              synchronized (cancelledKeys) {            cancelledKeys.add(k);        }    }}
  • Channel对应的SelectionKey勾销结束后,Channel勾销计数器cancelledKeys 会加1,当cancelledKeys = 256时,将needsToSelectAgain 设置为true
  • 随后在Selector下一次轮询过程中,会将cancelledKeys汇合中的SelectionKeySelector所有的KeySet中移除。这里的KeySet包含Selector用于寄存就绪SelectionKeyselectedKeys汇合,以及用于寄存所有注册的Channel对应的SelectionKeykeys汇合
public abstract class SelectorImpl extends AbstractSelector {    protected Set<SelectionKey> selectedKeys = new HashSet();    protected HashSet<SelectionKey> keys = new HashSet();         .....................省略...............}

咱们看到Reactor线程中对needsToSelectAgain 的判断是在processSelectedKeysPlain办法解决IO就绪SelectionKey的循环体中进行判断的。

之所以这里特地提到needsToSelectAgain 判断的地位,是要让大家留神到此时Reactor正在解决本次轮询的IO就绪事件

而前边也说了,当调用SelectionKey#cancel办法后,须要等到下次轮询的过程中Selector才会将这些勾销的SelectionKeySelector中的所有KeySet汇合中移除,当然这里也包含就绪汇合selectedKeys

当在本次轮询期间,如果大量的ChannelSelector中勾销,Selector中的就绪汇合selectedKeys 中仍然会保留这些Channel对应SelectionKey直到下次轮询。那么当然会影响本次轮询后果selectedKeys的有效性

所以为了保障Selector中所有KeySet的有效性,须要在Channel勾销个数达到256时,触发一次selectNow,目标是革除有效的SelectionKey

    private void selectAgain() {        needsToSelectAgain = false;        try {            selector.selectNow();        } catch (Throwable t) {            logger.warn("Failed to update SelectionKeys.", t);        }    }

到这里,咱们就对JDK 原生 Selector的解决形式processSelectedKeysPlain办法就介绍完了,其实 对IO就绪事件的解决逻辑都是一样的,在咱们了解了processSelectedKeysPlain办法后,processSelectedKeysOptimized办法IO就绪事件的解决,咱们了解起来就十分轻松了。

3.2 processSelectedKeysOptimized

Netty默认会采纳优化过的SelectorIO就绪事件的解决。然而解决逻辑是大同小异的。上面咱们次要介绍一下这两个办法的不同之处。

    private void processSelectedKeysOptimized() {        // 在openSelector的时候将JDK中selector实现类中得selectedKeys和publicSelectKeys字段类型        // 由原来的HashSet类型替换为 Netty优化后的数组实现的SelectedSelectionKeySet类型        for (int i = 0; i < selectedKeys.size; ++i) {            final SelectionKey k = selectedKeys.keys[i];            // 对应迭代器中得remove   selector不会本人革除selectedKey            selectedKeys.keys[i] = null;            final Object a = k.attachment();            if (a instanceof AbstractNioChannel) {                processSelectedKey(k, (AbstractNioChannel) a);            } else {                @SuppressWarnings("unchecked")                NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a;                processSelectedKey(k, task);            }            if (needsToSelectAgain) {                selectedKeys.reset(i + 1);                selectAgain();                i = -1;            }        }    }
  • JDK NIO 原生 Selector寄存IO就绪的SelectionKey的汇合为HashSet类型selectedKeys 。而Netty为了优化对selectedKeys 汇合遍历效率采纳了本人实现的SelectedSelectionKeySet类型,从而用对数组的遍历代替用HashSet的迭代器遍历。
  • Selector会在每次轮询到IO就绪事件时,将IO就绪的Channel对应的SelectionKey插入到selectedKeys汇合,然而Selector只管向selectedKeys汇合放入IO就绪的SelectionKeySelectionKey被处理完毕后,Selector是不会本人被动将其从selectedKeys汇合中移除的,典型的管杀不论埋。所以须要Netty本人在遍历到IO就绪的 SelectionKey后,将其删除。

    • processSelectedKeysPlain中是间接将其从迭代器中删除。
    • processSelectedKeysOptimized中将其在数组中对应的地位置为Null,不便垃圾回收。
  • 在最初革除有效的SelectionKey时,在processSelectedKeysPlain中因为采纳的是JDK NIO 原生的Selector,所以只须要执行SelectAgain就能够,Selector会主动革除有效Key。
    然而在processSelectedKeysOptimized 中因为是Netty本人实现的优化类型,所以须要Netty本人将SelectedSelectionKeySet数组中的SelectionKey全副革除,最初在执行SelectAgain

好了,到这里,咱们就将Reactor线程如何解决IO就绪事件的整个过程讲述完了,上面咱们就该到了介绍Reactor线程如何解决Netty框架中的异步工作了。

4. Reactor线程解决异步工作

Netty对于解决异步工作的办法有两个:

  • 一个是无超时工夫限度的runAllTasks()办法。当ioRatio设置为100时,Reactor线程会先一股脑的解决IO就绪事件,而后在一股脑的执行异步工作,并没有工夫的限度。
  • 另一个是有超时工夫限度的runAllTasks(long timeoutNanos)办法。当ioRatio != 100时,Reactor线程执行异步工作会有工夫限度,优先一股脑的解决完IO就绪事件统计出执行IO工作耗时ioTime。依据公式ioTime * (100 - ioRatio) / ioRatio)计算出Reactor线程执行异步工作的超时工夫。在超时工夫限定范畴内,执行无限的异步工作

上面咱们来别离看下这两个执行异步工作的办法解决逻辑:

4.1 runAllTasks()

    protected boolean runAllTasks() {        assert inEventLoop();        boolean fetchedAll;        boolean ranAtLeastOne = false;        do {            //将达到执行工夫的定时工作转存到一般工作队列taskQueue中,对立由Reactor线程从taskQueue中取出执行            fetchedAll = fetchFromScheduledTaskQueue();            if (runAllTasksFrom(taskQueue)) {                ranAtLeastOne = true;            }        } while (!fetchedAll); // keep on processing until we fetched all scheduled tasks.        if (ranAtLeastOne) {            lastExecutionTime = ScheduledFutureTask.nanoTime();        }        //执行尾部队列工作        afterRunningAllTasks();        return ranAtLeastOne;    }

Reactor线程执行异步工作的外围逻辑就是:

  • 先将到期的定时工作一股脑的从定时工作队列scheduledTaskQueue中取出并转存到一般工作队列taskQueue中。
  • Reactor线程对立从一般工作队列taskQueue中取出工作执行。
  • Reactor线程执行完定时工作一般工作后,开始执行存储于尾部工作队列tailTasks中的尾部工作

上面咱们来别离看下上述几个外围步骤的实现:

4.1.1 fetchFromScheduledTaskQueue

    /**     * 从定时工作队列中取出达到deadline执行工夫的定时工作     * 将定时工作 转存到 一般工作队列taskQueue中,对立由Reactor线程从taskQueue中取出执行     *     * */    private boolean fetchFromScheduledTaskQueue() {        if (scheduledTaskQueue == null || scheduledTaskQueue.isEmpty()) {            return true;        }        long nanoTime = AbstractScheduledEventExecutor.nanoTime();        for (;;) {            //从定时工作队列中取出达到执行deadline的定时工作  deadline <= nanoTime            Runnable scheduledTask = pollScheduledTask(nanoTime);            if (scheduledTask == null) {                return true;            }            if (!taskQueue.offer(scheduledTask)) {                // taskQueue没有空间包容 则在将定时工作从新塞进定时工作队列中期待下次执行                scheduledTaskQueue.add((ScheduledFutureTask<?>) scheduledTask);                return false;            }        }    }
  1. 获取以后要执行异步工作的工夫点nanoTime
final class ScheduledFutureTask<V> extends PromiseTask<V> implements ScheduledFuture<V>, PriorityQueueNode {    private static final long START_TIME = System.nanoTime();    static long nanoTime() {        return System.nanoTime() - START_TIME;    }}
  1. 从定时工作队列中找出deadline <= nanoTime的异步工作。也就是说找出所有到期的定时工作。
    protected final Runnable pollScheduledTask(long nanoTime) {        assert inEventLoop();        //从定时队列中取出要执行的定时工作  deadline <= nanoTime        ScheduledFutureTask<?> scheduledTask = peekScheduledTask();        if (scheduledTask == null || scheduledTask.deadlineNanos() - nanoTime > 0) {            return null;        }        //合乎取出条件 则取出        scheduledTaskQueue.remove();        scheduledTask.setConsumed();        return scheduledTask;    }
  1. 到期的定时工作插入到一般工作队列taskQueue中,如果taskQueue曾经没有空间包容新的工作,则将定时工作从新塞进定时工作队列中期待下次拉取。
            if (!taskQueue.offer(scheduledTask)) {                scheduledTaskQueue.add((ScheduledFutureTask<?>) scheduledTask);                return false;            }
  1. fetchFromScheduledTaskQueue办法的返回值为true时示意到期的定时工作曾经全副拉取出来并转存到一般工作队列中。
    返回值为false时示意到期的定时工作只拉取出来一部分,因为这时一般工作队列曾经满了,当执行完一般工作时,还须要在进行一次拉取。

到期的定时工作从定时工作队列中拉取结束或者当一般工作队列已满时,这时就会进行拉取,开始执行一般工作队列中的异步工作

4.1.2 runAllTasksFrom

    protected final boolean runAllTasksFrom(Queue<Runnable> taskQueue) {        Runnable task = pollTaskFrom(taskQueue);        if (task == null) {            return false;        }        for (;;) {            safeExecute(task);            task = pollTaskFrom(taskQueue);            if (task == null) {                return true;            }        }    }
  • 首先runAllTasksFrom 办法的返回值示意是否执行了至多一个异步工作。前面会赋值给ranAtLeastOne变量,这个返回值咱们后续会用到。
  • 从一般工作队列中拉取异步工作
    protected static Runnable pollTaskFrom(Queue<Runnable> taskQueue) {        for (;;) {            Runnable task = taskQueue.poll();            if (task != WAKEUP_TASK) {                return task;            }        }    }
  • Reactor线程执行异步工作
    protected static void safeExecute(Runnable task) {        try {            task.run();        } catch (Throwable t) {            logger.warn("A task raised an exception. Task: {}", task, t);        }    }

4.1.3 afterRunningAllTasks

        if (ranAtLeastOne) {            lastExecutionTime = ScheduledFutureTask.nanoTime();        }        //执行尾部队列工作        afterRunningAllTasks();        return ranAtLeastOne;

如果Reactor线程执行了至多一个异步工作,那么设置lastExecutionTime,并将ranAtLeastOne标识返回。这里的ranAtLeastOne标识就是runAllTasksFrom办法的返回值。

最初执行收尾工作,也就是执行尾部工作队列中的尾部工作。

    @Override    protected void afterRunningAllTasks() {        runAllTasksFrom(tailTasks);    }

4.2 runAllTasks(long timeoutNanos)

这里在解决异步工作的外围逻辑还是和之前一样的,只不过就是多了对超时工夫的管制。

    protected boolean runAllTasks(long timeoutNanos) {        fetchFromScheduledTaskQueue();        Runnable task = pollTask();        if (task == null) {            //一般队列中没有工作时  执行队尾队列的工作            afterRunningAllTasks();            return false;        }        //异步工作执行超时deadline        final long deadline = timeoutNanos > 0 ? ScheduledFutureTask.nanoTime() + timeoutNanos : 0;        long runTasks = 0;        long lastExecutionTime;        for (;;) {            safeExecute(task);            runTasks ++;            //每运行64个异步工作 检查一下 是否达到 执行deadline            if ((runTasks & 0x3F) == 0) {                lastExecutionTime = ScheduledFutureTask.nanoTime();                if (lastExecutionTime >= deadline) {                    //达到异步工作执行超时deadline,进行执行异步工作                    break;                }            }            task = pollTask();            if (task == null) {                lastExecutionTime = ScheduledFutureTask.nanoTime();                break;            }        }        afterRunningAllTasks();        this.lastExecutionTime = lastExecutionTime;        return true;    }
  • 首先还是通过fetchFromScheduledTaskQueue 办法Reactor中的定时工作队列中拉取到期的定时工作,转存到一般工作队列中。当一般工作队列已满或者到期定时工作全副拉取结束时,进行拉取。
  • ScheduledFutureTask.nanoTime() + timeoutNanos 作为Reactor线程执行异步工作的超时工夫点deadline
  • 因为零碎调用System.nanoTime()须要肯定的零碎开销,所以每执行完64异步工作的时候才会去检查一下执行工夫是否达到了deadline。如果达到了执行截止工夫deadline则退出进行执行异步工作。如果没有达到deadline则持续从一般工作队列中取出工作循环执行上来。
从这个细节又能够看出Netty对性能的考量还是相当考究的

流程走到这里,咱们就对Reactor的整个运行框架以及如何轮询IO就绪事件如何解决IO就绪事件如何执行异步工作的具体实现逻辑就分析完了。

上面还有一个小小的尾巴,就是Netty是如何解决文章结尾提到的JDK NIO Epoll 的空轮询BUG的,让咱们一起来看下吧~~~

5. 解决JDK Epoll空轮询BUG

前边提到,因为JDK NIO Epoll的空轮询BUG存在,这样会导致Reactor线程在没有任何事件可做的状况下被意外唤醒,导致CPU空转。

其实Netty也没有从根本上解决这个JDK BUG,而是抉择奇妙的绕过这个BUG

上面咱们来看下Netty是如何做到的。

                if (ranTasks || strategy > 0) {                    if (selectCnt > MIN_PREMATURE_SELECTOR_RETURNS && logger.isDebugEnabled()) {                        logger.debug("Selector.select() returned prematurely {} times in a row for Selector {}.",                                selectCnt - 1, selector);                    }                    selectCnt = 0;                } else if (unexpectedSelectorWakeup(selectCnt)) { // Unexpected wakeup (unusual case)                    //既没有IO就绪事件,也没有异步工作,Reactor线程从Selector上被异样唤醒 触发JDK Epoll空轮训BUG                    //从新构建Selector,selectCnt归零                    selectCnt = 0;                }

Reactor线程解决完IO就绪事件异步工作后,会查看这次Reactor线程被唤醒有没有执行过异步工作和有没有IO就绪的Channel

  • boolean ranTasks 这时候就派上了用场,这个ranTasks正是前边咱们在讲runAllTasks办法时提到的返回值。用来示意是否执行过至多一次异步工作
  • int strategy 正是JDK NIO Selectorselect办法的返回值,用来示意IO就绪Channel个数

如果ranTasks = false 并且 strategy = 0这代表Reactor线程本次既没有异步工作执行也没有IO就绪Channel须要解决却被意外的唤醒。等于是空转了一圈啥也没干。

这种状况下Netty就会认为可能曾经触发了JDK NIO Epoll的空轮询BUG

    int SELECTOR_AUTO_REBUILD_THRESHOLD = SystemPropertyUtil.getInt("io.netty.selectorAutoRebuildThreshold", 512);    private boolean unexpectedSelectorWakeup(int selectCnt) {          ..................省略...............        /**         * 走到这里的条件是 既没有IO就绪事件,也没有异步工作,Reactor线程从Selector上被异样唤醒         * 这种状况可能是曾经触发了JDK Epoll的空轮询BUG,如果这种状况继续512次 则认为可能曾经触发BUG,于是重建Selector         *         * */        if (SELECTOR_AUTO_REBUILD_THRESHOLD > 0 &&                selectCnt >= SELECTOR_AUTO_REBUILD_THRESHOLD) {            // The selector returned prematurely many times in a row.            // Rebuild the selector to work around the problem.            logger.warn("Selector.select() returned prematurely {} times in a row; rebuilding Selector {}.",                    selectCnt, selector);            rebuildSelector();            return true;        }        return false;    }
  • 如果Reactor这种意外唤醒的次数selectCnt 超过了配置的次数SELECTOR_AUTO_REBUILD_THRESHOLD ,那么Netty就会认定这种状况可能曾经触发了JDK NIO Epoll空轮询BUG,则重建Selector(将之前注册的所有Channel从新注册到新的Selector上并敞开旧的Selector),selectCnt计数0
SELECTOR_AUTO_REBUILD_THRESHOLD 默认为512,能够通过零碎变量-D io.netty.selectorAutoRebuildThreshold指定自定义数值。
  • 如果selectCnt小于SELECTOR_AUTO_REBUILD_THRESHOLD ,则返回不做任何解决,selectCnt持续计数。

Netty就这样通过计数Reactor被意外唤醒的次数,如果计数selectCnt达到了512次,则通过重建Selector 奇妙的绕开了JDK NIO Epoll空轮询BUG

咱们在日常开发中也能够借鉴Netty这种解决问题的思路,比方在我的项目开发中,当咱们发现咱们无奈保障彻底的解决一个问题时,或者为了解决这个问题导致咱们的投入产出比不高时,咱们就该思考是不是应该换一种思路去绕过这个问题,从而达到同样的成果。解决问题的最高境界就是不解决它,奇妙的绕过去~!!

总结

本文花了大量的篇幅介绍了Reactor整体的运行框架,并深刻介绍了Reactor外围的工作模块的具体实现逻辑。

通过本文的介绍咱们晓得了Reactor如何轮询注册在其上的所有Channel上感兴趣的IO事件,以及Reactor如何去解决IO就绪的事件,如何执行Netty框架中提交的异步工作和定时工作。

最初介绍了Netty如何奇妙的绕过JDK NIO Epoll空轮询的BUG,达到解决问题的目标。

提炼了新的解决问题的思路:解决问题的最高境界就是不解决它,奇妙的绕过去~!!

好了,本文的内容就到这里了,咱们下篇文章见~