关于后端:一文聊透-Netty-核心引擎-Reactor-的运转架构

46次阅读

共计 40288 个字符,预计需要花费 101 分钟才能阅读完成。

本系列 Netty 源码解析文章基于 4.1.56.Final版本

本文笔者来为大家介绍下 Netty 的外围引擎 Reactor 的运行架构,心愿通过本文的介绍可能让大家对 Reactor 是如何驱动着整个 Netty 框架的运行有一个全面的意识。也为咱们后续进一步介绍 Netty 对于解决网络申请的整个生命周期的相干内容做一个前置常识的铺垫,不便大家后续了解。

那么在开始本文正式的内容之前,笔者先来带着大家回顾下前边文章介绍的对于 Netty 整个框架如何搭建的相干内容,没有看过笔者前边几篇文章的读者敌人也没关系,这些并不会影响到本文的浏览,只不过波及到相干细节的局部,大家能够在回看下。

前文回顾

在《聊聊 Netty 那些事儿之 Reactor 在 Netty 中的实现 (创立篇)》一文中,咱们介绍了 Netty 服务端的外围引擎 主从 Reactor 线程组 的创立过程以及相干外围组件里的重要属性。在这个过程中,咱们还提到了 Netty 对各种细节进行的优化,比方针对 JDK NIO 原生 Selector 做的一些优化,展示了 Netty 对性能极致的谋求。最终咱们创立出了如下构造的 Reactor。

在上篇文章《具体图解 Netty Reactor 启动全流程》中,咱们残缺地介绍了 Netty 服务端启动的整个流程,并介绍了在启动过程中波及到的 ServerBootstrap 相干的属性以及配置形式。用于接管连贯的服务端 NioServerSocketChannel 的创立和初始化过程以及其类的继承构造。其中重点介绍了 NioServerSocketChannel 向 Reactor 的注册过程以及 Reactor 线程的启动机会和 pipeline 的初始化机会。最初介绍了 NioServerSocketChannel 绑定端口地址的整个流程。在这个过程中咱们理解了 Netty 的这些外围组件是如何串联起来的。

当 Netty 启动结束后,咱们失去了如下的框架结构:

主 Reactor 线程组中治理的是 NioServerSocketChannel 用于接管客户端连贯,并在本人的 pipeline 中的 ServerBootstrapAcceptor 里初始化接管到的客户端连贯,随后会将初始化好的客户端连贯注册到从 Reactor 线程组中。

从 Reactor 线程组次要负责监听解决注册其上的所有客户端连贯的 IO 就绪事件。

其中一个 Channel 只能调配给一个固定的 Reactor。一个 Reactor 负责解决多个 Channel 上的 IO 就绪事件,这样能够将服务端承载的 全量客户端连贯 摊派到多个 Reactor 中解决,同时也能保障Channel 上 IO 解决的线程安全性。Reactor 与 Channel 之间的对应关系如下图所示:

以上内容就是对笔者前边几篇文章的相干内容回顾,大家能回顾起来更好,回顾不起来也没关系,一点也不影响大家了解本文的内容。如果对相干细节感兴趣的同学,能够在浏览完本文之后,在去回看下。

咱们言归正传,正式开始本文的内容,笔者接下来会为大家介绍这些外围组件是如何相互配合从而驱动着整个 Netty Reactor 框架运行的。


当 Netty Reactor 框架启动结束后,接下来第一件事件也是最重要的事件就是如何来高效的接管客户端的连贯。

那么在探讨 Netty 服务端如何接管连贯之前,咱们须要弄清楚 Reactor 线程 的运行机制,它是如何监听并解决 Channel 上的 IO 就绪事件 的。

本文相当于是后续咱们介绍 Reactor 线程 监听解决 ACCEPT 事件Read 事件Write 事件 的前置篇,本文专一于讲述 Reactor 线程 的整个运行框架。了解了本文的内容,对了解前面 Reactor 线程 如何解决 IO 事件 会大有帮忙。

咱们在 Netty 框架的 创立阶段 启动阶段 无数次的提到了 Reactor 线程,那么在本文要介绍的 运行阶段 就该这个 Reactor 线程 来大显神威了。

通过前边文章的介绍,咱们理解到 Netty 中的 Reactor 线程 次要干三件事件:

  • 轮询注册在 Reactor 上的所有 Channel 感兴趣的IO 就绪事件
  • 解决 Channel 上的IO 就绪事件
  • 执行 Netty 中的异步工作。

正是这三个局部组成了 Reactor 的运行框架,那么咱们当初来看下这个运行框架具体是怎么运行的~~

Reactor 线程的整个运行框架

大家还记不记得笔者在《聊聊 Netty 那些事儿之从内核角度看 IO 模型》一文中提到的,IO 模型的演变 是围绕着 "如何用尽可能少的线程去治理尽可能多的连贯" 这一主题进行的。

Netty 的 IO 模型 是通过 JDK NIO Selector 实现的 IO 多路复用模型,而 Netty 的IO 线程模型主从 Reactor 线程模型

依据《聊聊 Netty 那些事儿之从内核角度看 IO 模型》一文中介绍的 IO 多路复用模型 咱们很容易就能了解到 Netty 会应用一个用户态的 Reactor 线程 去一直的通过 Selector 在内核态去轮训 Channel 上的IO 就绪事件

说白了 Reactor 线程 其实执行的就是一个 死循环 ,在 死循环 中一直的通过 Selector 去轮训 IO 就绪事件,如果产生IO 就绪事件 则从 Selector 零碎调用中返回并解决 IO 就绪事件,如果没有产生IO 就绪事件 则始终 阻塞 Selector零碎调用上,直到满足Selector 唤醒条件

以下三个条件中只有满足任意一个条件,Reactor 线程就会被从 Selector 上唤醒:

  • 当 Selector 轮询到有 IO 沉闷事件产生时。
  • 当 Reactor 线程须要执行的 定时工作 达到工作执行工夫 deadline 时。
  • 当有 异步工作 提交给 Reactor 时,Reactor 线程须要从 Selector 上被唤醒,这样能力及时的去执行 异步工作

这里能够看出 Netty 对 Reactor 线程 的压迫还是比拟狠的,反正当初也没有 IO 就绪事件 须要去解决,不能让 Reactor 线程 在这里白白等着,要立刻唤醒它,转去解决提交过去的异步工作以及定时工作。Reactor 线程 堪称 996 榜样 一刻不停歇地运作着。

在理解了 Reactor 线程 的大略运行框架后,咱们接下来就到源码中去看下它的外围运行框架是如何实现进去的。

因为这块源码比拟宏大繁冗,所以笔者先把它的运行框架提取进去,不便大家整体的了解整个运行过程的全貌。

上图所展现的就是 Reactor 整个工作体系的全貌,次要分为如下几个重要的工作模块:

  1. Reactor 线程在 Selector 上阻塞获取 IO 就绪事件。在这个模块中首先会去查看以后是否有异步工作须要执行,如果有异步须要执行,那么不论以后有没有 IO 就绪事件都不能阻塞在 Selector 上,随后会去非阻塞的轮询一下 Selector 上是否有 IO 就绪事件,如果有,正好能够和异步工作一起执行。优先解决 IO 就绪事件,在执行异步工作。
  2. 如果以后没有异步工作须要执行,那么 Reactor 线程会接着查看是否有定时工作须要执行,如果有则在 Selector 上阻塞直到定时工作的到期工夫 deadline,或者满足其余唤醒条件被唤醒。如果没有定时工作须要执行,Reactor 线程则会在 Selector 上始终阻塞直到满足唤醒条件。
  3. 当 Reactor 线程满足唤醒条件被唤醒后,首先会去判断以后是因为有 IO 就绪事件被唤醒还是因为有异步工作须要执行被唤醒或者是两者都有。随后 Reactor 线程就会去解决 IO 就绪事件和执行异步工作。
  4. 最初 Reactor 线程返回循环终点一直的反复上述三个步骤。

以上就是 Reactor 线程运行的整个外围逻辑,上面是笔者根据上述外围逻辑,将 Reactor 的整体代码设计框架提取进去,大家能够联合上边的 Reactor 工作流程图,从总体上先感触下整个源码实现框架,可能把 Reactor 的外围解决步骤和代码中相应的解决模块对应起来即可,这里不须要读懂每一行代码,要以逻辑解决模块为单位了解。前面笔者会将这些一个一个的逻辑解决模块在独自拎进去为大家具体介绍。

  @Override
    protected void run() {
        // 记录轮询次数 用于解决 JDK epoll 的空轮训 bug
        int selectCnt = 0;
        for (;;) {
            try {
                // 轮询后果
                int strategy;
                try {// 依据轮询策略获取轮询后果 这里的 hasTasks()次要查看的是一般队列和尾部队列中是否有异步工作期待执行
                    strategy = selectStrategy.calculateStrategy(selectNowSupplier, hasTasks());
                    switch (strategy) {
                    case SelectStrategy.CONTINUE:
                        continue;

                    case SelectStrategy.BUSY_WAIT:
                        // NIO 不反对自旋(BUSY_WAIT)case SelectStrategy.SELECT:

                      外围逻辑是有工作须要执行,则 Reactor 线程立马执行异步工作,如果没有异步工作执行,则进行轮询 IO 事件

                    default:
                    }
                } catch (IOException e) {................ 省略...............}

                执行到这里阐明满足了唤醒条件,Reactor 线程从 selector 上被唤醒开始解决 IO 就绪事件和执行异步工作
                /**
                 * Reactor 线程须要保障及时的执行异步工作,只有有异步工作提交,就须要退出轮询。* 有 IO 事件就优先解决 IO 事件,而后解决异步工作
                 * */

                selectCnt++;
                // 次要用于从 IO 就绪的 SelectedKeys 汇合中剔除曾经生效的 selectKey
                needsToSelectAgain = false;
                // 调整 Reactor 线程执行 IO 事件和执行异步工作的 CPU 工夫比例 默认 50,示意执行 IO 事件和异步工作的工夫比例是一比一
                final int ioRatio = this.ioRatio;
             
               这里次要解决 IO 就绪事件,以及执行异步工作
               须要优先解决 IO 就绪事件,而后依据 ioRatio 设置的解决 IO 事件 CPU 用时与异步工作 CPU 用时比例,来决定执行多长时间的异步工作

                // 判断是否触发 JDK Epoll BUG 触发空轮询
                if (ranTasks || strategy > 0) {if (selectCnt > MIN_PREMATURE_SELECTOR_RETURNS && logger.isDebugEnabled()) {logger.debug("Selector.select() returned prematurely {} times in a row for Selector {}.",
                                selectCnt - 1, selector);
                    }
                    selectCnt = 0;
                } else if (unexpectedSelectorWakeup(selectCnt)) {// Unexpected wakeup (unusual case)
                    // 既没有 IO 就绪事件,也没有异步工作,Reactor 线程从 Selector 上被异样唤醒 触发 JDK Epoll 空轮训 BUG
                    // 从新构建 Selector,selectCnt 归零
                    selectCnt = 0;
                }
            } catch (CancelledKeyException e) {................ 省略...............} catch (Error e) {................ 省略...............} catch (Throwable t) {................ 省略...............} finally {................ 省略...............}
        }
    }

从下面提取进去的 Reactor 的源码实现框架中,咱们能够看出 Reactor 线程 次要做了上面几个事件:

  1. 通过 JDK NIO Selector 轮询注册在 Reactor 上的所有 Channel 感兴趣的 IO 事件。对于 NioServerSocketChannel 来说因为它次要负责接管客户端连贯所以监听的是OP_ACCEPT 事件,对于客户端 NioSocketChannel 来说因为它次要负责解决连贯上的读写事件所以监听的是OP_READOP_WRITE事件。

这里须要留神的是 netty 只会主动注册 OP_READ 事件,而 OP_WRITE 事件 是在当 Socket 写入缓冲区以满无奈持续写入发送数据时由用户本人注册。

  1. 如果有异步工作须要执行,则立马进行轮询操作,转去执行异步工作。这里分为两种状况:

    • 既有 IO 就绪事件 产生,也有 异步工作 须要执行。则优先解决 IO 就绪事件,而后依据ioRatio 设置的 执行工夫比例 决定执行多长时间的异步工作。这里 Reactor 线程须要管制异步工作的执行工夫,因为 Reactor 线程的外围是解决 IO 就绪事件,不能因为异步工作的执行而耽搁了最重要的事件。
    • 没有 IO 就绪事件 产生,然而有异步工作或者定时工作到期须要执行。则只执行 异步工作,尽可能的去压迫 Reactor 线程。没有 IO 就绪事件产生也不能闲着。

      这里第二种状况下只会执行 64 个异步工作,目标是为了 避免适度 执行异步工作,耽搁了 最重要的事件 轮询 IO 事件

  2. 在最初 Netty 会判断本次 Reactor 线程 的唤醒是否是因为触发了 JDK epoll 空轮询 BUG 导致的,如果触发了该 BUG,则重建Selector。绕过 JDK BUG,达到解决问题的目标。

失常状况下 Reactor 线程从 Selector 中被唤醒有两种状况:

  • 轮询到有 IO 就绪事件产生。
  • 有异步工作或者定时工作须要执行。
    而 JDK epoll 空轮询 BUG 会在上述两种状况都没有产生的时候,Reactor 线程 会意外的从 Selector 中被唤醒,导致 CPU 空转。

JDK epoll 空轮询 BUG:https://bugs.java.com/bugdata…

好了,Reactor 线程 的总体运行构造框架咱们当初曾经理解了,上面咱们来深刻到这些外围解决模块中来各个击破它们~~

1. Reactor 线程轮询 IO 就绪事件

在《聊聊 Netty 那些事儿之 Reactor 在 Netty 中的实现 (创立篇)》一文中,笔者在讲述主从 Reactor 线程组NioEventLoopGroup 的创立过程的时候,提到一个结构器参数SelectStrategyFactory

   public NioEventLoopGroup(int nThreads, Executor executor, final SelectorProvider selectorProvider) {this(nThreads, executor, selectorProvider, DefaultSelectStrategyFactory.INSTANCE);
    }

  public NioEventLoopGroup(int nThreads, Executor executor, final SelectorProvider selectorProvider,
                             final SelectStrategyFactory selectStrategyFactory) {super(nThreads, executor, selectorProvider, selectStrategyFactory, RejectedExecutionHandlers.reject());
    }

Reactor 线程 最重要的一件事件就是 轮询 IO 就绪事件 SelectStrategyFactory 就是用于指定轮询策略的,默认实现为DefaultSelectStrategyFactory.INSTANCE

而在 Reactor 线程 开启轮询的一开始,就是用这个 selectStrategy 去计算一个 轮询策略 strategy ,后续会依据这个 strategy 进行不同的逻辑解决。

  @Override
    protected void run() {
        // 记录轮询次数 用于解决 JDK epoll 的空轮训 bug
        int selectCnt = 0;
        for (;;) {
            try {
                // 轮询后果
                int strategy;
                try {// 依据轮询策略获取轮询后果 这里的 hasTasks()次要查看的是一般队列和尾部队列中是否有异步工作期待执行
                    strategy = selectStrategy.calculateStrategy(selectNowSupplier, hasTasks());
                    switch (strategy) {
                    case SelectStrategy.CONTINUE:
                        continue;

                    case SelectStrategy.BUSY_WAIT:
                        // NIO 不反对自旋(BUSY_WAIT)case SelectStrategy.SELECT:

                      外围逻辑是有工作须要执行,则 Reactor 线程立马执行异步工作,如果没有异步工作执行,则进行轮询 IO 事件

                    default:
                    }
                } catch (IOException e) {................ 省略...............}

                ................ 省略...............
}

上面咱们来看这个 轮询策略 strategy 具体的计算逻辑是什么样的?

1.1 轮询策略

public interface SelectStrategy {

    /**
     * Indicates a blocking select should follow.
     */
    int SELECT = -1;
    /**
     * Indicates the IO loop should be retried, no blocking select to follow directly.
     */
    int CONTINUE = -2;
    /**
     * Indicates the IO loop to poll for new events without blocking.
     */
    int BUSY_WAIT = -3;

    int calculateStrategy(IntSupplier selectSupplier, boolean hasTasks) throws Exception;
}

咱们首先来看下 Netty 中定义的这三种轮询策略:

  • SelectStrategy.SELECT:此时没有任何异步工作须要执行,Reactor 线程 能够安心的 阻塞 Selector上期待 IO 就绪事件 的降临。
  • SelectStrategy.CONTINUE:从新开启一轮IO 轮询
  • SelectStrategy.BUSY_WAIT: Reactor 线程进行 自旋轮询 ,因为NIO 不反对自旋操作,所以这里间接跳到SelectStrategy.SELECT 策略。

上面咱们来看下 轮询策略 的计算逻辑calculateStrategy

final class DefaultSelectStrategy implements SelectStrategy {static final SelectStrategy INSTANCE = new DefaultSelectStrategy();

    private DefaultSelectStrategy() {}

    @Override
    public int calculateStrategy(IntSupplier selectSupplier, boolean hasTasks) throws Exception {
        /**
         * Reactor 线程要保障及时的执行异步工作
         * 1:如果有异步工作期待执行,则马上执行 selectNow()非阻塞轮询一次 IO 就绪事件
         * 2:没有异步工作,则跳到 switch select 分支
         * */
        return hasTasks ? selectSupplier.get() : SelectStrategy.SELECT;}
}
  • Reactor 线程 的轮询工作开始之前,须要首先判断下以后是否有 异步工作 须要执行。判断根据就是查看 Reactor 中的异步工作队列 taskQueue 和用于统计信息工作用的尾部队列 tailTask 是否有 异步工作
    @Override
    protected boolean hasTasks() {return super.hasTasks() || !tailTasks.isEmpty();}

   protected boolean hasTasks() {assert inEventLoop();
        return !taskQueue.isEmpty();}
  • 如果 Reactor 中有 异步工作 须要执行,那么 Reactor 线程 须要立刻执行,不能阻塞在 Selector 上。在返回前须要再顺带调用 selectNow() 非阻塞查看一下以后是否有 IO 就绪事件 产生。如果有,那么正好能够和 异步工作 一起被解决,如果没有,则及时地解决 异步工作

这里 Netty 要表白的语义是:首先 Reactor 线程须要优先保障 IO 就绪事件 的解决,而后在保障 异步工作 的及时执行。如果以后没有 IO 就绪事件然而有异步工作须要执行时,Reactor 线程就要去及时执行异步工作而不是持续阻塞在 Selector 上期待 IO 就绪事件。

   private final IntSupplier selectNowSupplier = new IntSupplier() {
        @Override
        public int get() throws Exception {return selectNow();
        }
    };

   int selectNow() throws IOException {
        // 非阻塞
        return selector.selectNow();}
  • 如果以后 Reactor 线程 没有异步工作须要执行,那么 calculateStrategy 办法间接返回 SelectStrategy.SELECT 也就是 SelectStrategy 接口 中定义的常量 -1。当calculateStrategy 办法通过 selectNow() 返回 非零 数值时,示意此时有 IO 就绪Channel,返回的数值示意有多少个 IO 就绪Channel
  @Override
    protected void run() {
        // 记录轮询次数 用于解决 JDK epoll 的空轮训 bug
        int selectCnt = 0;
        for (;;) {
            try {
                // 轮询后果
                int strategy;
                try {// 依据轮询策略获取轮询后果 这里的 hasTasks()次要查看的是一般队列和尾部队列中是否有异步工作期待执行
                    strategy = selectStrategy.calculateStrategy(selectNowSupplier, hasTasks());
                    switch (strategy) {
                    case SelectStrategy.CONTINUE:
                        continue;

                    case SelectStrategy.BUSY_WAIT:
                        // NIO 不反对自旋(BUSY_WAIT)case SelectStrategy.SELECT:

                      外围逻辑是有工作须要执行,则 Reactor 线程立马执行异步工作,如果没有异步工作执行,则进行轮询 IO 事件

                    default:
                    }
                } catch (IOException e) {................ 省略...............}

                ................ 解决 IO 就绪事件以及执行异步工作...............
}

从默认的轮询策略咱们能够看出 selectStrategy.calculateStrategy 只会返回三种状况:

  • 返回 -1: switch 逻辑分支进入 SelectStrategy.SELECT 分支,示意此时Reactor 中没有 异步工作 须要执行,Reactor 线程 能够安心的阻塞在 Selector 上期待 IO 就绪事件 产生。
  • 返回 0: switch 逻辑分支进入 default 分支,示意此时Reactor 中没有 IO 就绪事件 然而有 异步工作 须要执行,流程通过 default 分支 间接进入了解决 异步工作 的逻辑局部。
  • 返回 > 0:switch 逻辑分支进入 default 分支,示意此时Reactor 中既有 IO 就绪事件 产生也有 异步工作 须要执行,流程通过 default 分支 间接进入了解决 IO 就绪事件 和执行 异步工作 逻辑局部。

当初 Reactor 的流程解决逻辑走向咱们分明了,那么接下来咱们把重点放在 SelectStrategy.SELECT 分支中的轮询逻辑上。这块是 Reactor 监听 IO 就绪事件的外围。

1.2 轮询逻辑

                    case SelectStrategy.SELECT:
                        // 以后没有异步工作执行,Reactor 线程能够释怀的阻塞期待 IO 就绪事件

                        // 从定时工作队列中取出行将快要执行的定时工作 deadline
                        long curDeadlineNanos = nextScheduledTaskDeadlineNanos();
                        if (curDeadlineNanos == -1L) {
                            // - 1 代表以后定时工作队列中没有定时工作
                            curDeadlineNanos = NONE; // nothing on the calendar
                        }

                        // 最早执行定时工作的 deadline 作为 select 的阻塞工夫,意思是到了定时工作的执行工夫
                        // 不论有无 IO 就绪事件,必须唤醒 selector,从而使 reactor 线程执行定时工作
                        nextWakeupNanos.set(curDeadlineNanos);
                        try {if (!hasTasks()) {
                                // 再次查看一般工作队列中是否有异步工作
                                // 没有的话开始 select 阻塞轮询 IO 就绪事件
                                strategy = select(curDeadlineNanos);
                            }
                        } finally {
                            // 执行到这里阐明 Reactor 曾经从 Selector 上被唤醒了
                            // 设置 Reactor 的状态为昏迷状态 AWAKE
                            // lazySet 优化不必要的 volatile 操作,不应用内存屏障,不保障写操作的可见性(单线程不须要保障)nextWakeupNanos.lazySet(AWAKE);
                        }

流程走到这里,阐明当初 Reactor 上没有任何事件可做,能够安心的 阻塞 Selector上期待 IO 就绪事件 到来。

那么 Reactor 线程 到底应该在 Selector 上阻塞多久呢??

在答复这个问题之前,咱们在回顾下《聊聊 Netty 那些事儿之 Reactor 在 Netty 中的实现 (创立篇)》一文中在讲述Reactor 的创立 时提到,Reactor 线程 除了要轮询 Channel 上的 IO 就绪事件,以及解决IO 就绪事件 外,还有一个工作就是负责执行 Netty 框架中的 异步工作

而 Netty 框架中的 异步工作 分为三类:

  • 寄存在一般工作队列 taskQueue 中的一般异步工作。
  • 寄存在尾部队列 tailTasks 中的用于执行统计工作等收尾动作的尾部工作。
  • 还有一种就是这里行将提到的 定时工作 。寄存在Reactor 中的定时工作队列 scheduledTaskQueue 中。

从 ReactorNioEventLoop 类 中的继承构造咱们也能够看出,Reactor具备执行定时工作的能力。

既然 Reactor 须要执行定时工作,那么它就不能始终 阻塞 Selector上有限期待IO 就绪事件

那么咱们回到本大节一开始提到的问题上,为了保障 Reactor 可能及时地执行 定时工作 Reactor 线程 须要在行将要执行的的第一个定时工作 deadline 达到之前被唤醒。

所以在 Reactor 线程开始轮询 IO 就绪事件 之前,咱们须要首先计算出来 Reactor 线程Selector上的阻塞超时工夫。

1.2.1 Reactor 的轮询超时工夫

首先咱们须要从 Reactor 的定时工作队列 scheduledTaskQueue 中取出行将快要执行的定时工作 deadline。将这个deadline 作为 Reactor 线程Selector上轮询的超时工夫。这样能够保障在定时工作行将要执行时,Reactor 当初能够及时的从 Selector 上被唤醒。

    private static final long AWAKE = -1L;
    private static final long NONE = Long.MAX_VALUE;

    // nextWakeupNanos is:
    //    AWAKE            when EL is awake
    //    NONE             when EL is waiting with no wakeup scheduled
    //    other value T    when EL is waiting with wakeup scheduled at time T
    private final AtomicLong nextWakeupNanos = new AtomicLong(AWAKE);

      long curDeadlineNanos = nextScheduledTaskDeadlineNanos();
      if (curDeadlineNanos == -1L) {
            // - 1 代表以后定时工作队列中没有定时工作
            curDeadlineNanos = NONE; // nothing on the calendar
      }

      nextWakeupNanos.set(curDeadlineNanos);
public abstract class AbstractScheduledEventExecutor extends AbstractEventExecutor {

    PriorityQueue<ScheduledFutureTask<?>> scheduledTaskQueue;

    protected final long nextScheduledTaskDeadlineNanos() {ScheduledFutureTask<?> scheduledTask = peekScheduledTask();
        return scheduledTask != null ? scheduledTask.deadlineNanos() : -1;}

    final ScheduledFutureTask<?> peekScheduledTask() {
        Queue<ScheduledFutureTask<?>> scheduledTaskQueue = this.scheduledTaskQueue;
        return scheduledTaskQueue != null ? scheduledTaskQueue.peek() : null;}

}

nextScheduledTaskDeadlineNanos 办法会返回以后 Reactor 定时工作队列中最近的一个定时工作 deadline 工夫点,如果定时工作队列中没有定时工作,则返回-1

NioEventLoopnextWakeupNanos 变量用来寄存 Reactor 从 Selector 上被唤醒的工夫点,设置为最近须要被执行定时工作的 deadline,如果以后并没有定时工作须要执行,那么就设置为Long.MAX_VALUE 始终阻塞,直到有 IO 就绪事件 达到或者有 异步工作 须要执行。

1.2.2 Reactor 开始轮询 IO 就绪事件

     if (!hasTasks()) {
             // 再次查看一般工作队列中是否有异步工作,没有的话  开始 select 阻塞轮询 IO 就绪事件
            strategy = select(curDeadlineNanos);
     }

Reactor 线程 开始 阻塞 轮询 IO 就绪事件 之前还须要再次检查一下是否有 异步工作 须要执行。

如果此时凑巧有 异步工作 提交,就须要进行 IO 就绪事件 的轮询,转去执行 异步工作 。如果没有 异步工作,则正式开始轮询IO 就绪事件

    private int select(long deadlineNanos) throws IOException {if (deadlineNanos == NONE) {
            // 无定时工作,无一般工作执行时,开始轮询 IO 就绪事件,没有就始终阻塞 直到唤醒条件成立
            return selector.select();}

        long timeoutMillis = deadlineToDelayNanos(deadlineNanos + 995000L) / 1000000L;

        return timeoutMillis <= 0 ? selector.selectNow() : selector.select(timeoutMillis);
    }

如果 deadlineNanos == NONE,通过上大节的介绍,咱们晓得NONE
示意以后 Reactor 中并没有定时工作,所以能够安心的 阻塞 Selector上期待 IO 就绪事件 到来。

selector.select()调用是一个阻塞调用,如果没有 IO 就绪事件Reactor 线程 就会始终阻塞在这里直到 IO 就绪事件 到来。这里占时不思考前边提到的JDK NIO Epoll 的空轮询 BUG.

读到这里那么问题来了,此时 Reactor 线程 正阻塞在 selector.select() 调用上期待 IO 就绪事件 的到来,如果此时正好有 异步工作 被提交到 Reactor 中须要执行,并且此时无任何 IO 就绪事件,而Reactor 线程 因为没有 IO 就绪事件 到来,会持续在这里阻塞,那么如何去执行 异步工作 呢??

解铃还须系铃人,既然 异步工作 在被提交后心愿立马失去执行,那么就在提交 异步工作 的时候去唤醒Reactor 线程

    //addTaskWakesUp = true 示意 当且仅当只有调用 addTask 办法时 才会唤醒 Reactor 线程
    //addTaskWakesUp = false 示意 并不是只有 addTask 办法能力唤醒 Reactor 还有其余办法能够唤醒 Reactor 默认设置 false
    private final boolean addTaskWakesUp;

    private void execute(Runnable task, boolean immediate) {boolean inEventLoop = inEventLoop();
        addTask(task);
        if (!inEventLoop) {
            // 如果以后线程不是 Reactor 线程,则启动 Reactor 线程
            // 这里能够看出 Reactor 线程的启动是通过 向 NioEventLoop 增加异步工作时启动的
            startThread();
            ..................... 省略...................
        }

        if (!addTaskWakesUp && immediate) {
            //io.netty.channel.nio.NioEventLoop.wakeup
            wakeup(inEventLoop);
        }
    }

对于 execute 办法 我想大家肯定不会生疏,在上篇文章《具体图解 Netty Reactor 启动全流程》中咱们在介绍 Reactor 线程的启动 时介绍过该办法。

在启动过程中波及到的重要操作 Register 操作Bind 操作 都须要封装成 异步工作 通过该办法提交到 Reactor 中执行。

这里咱们将重点放在 execute 办法 后半段 wakeup 逻辑局部。

咱们先介绍下和 wakeup 逻辑相干的两个参数 boolean immediateboolean addTaskWakesUp

  • immediate:示意提交的 task 是否须要被立刻执行。Netty 中只有你提交的工作类型不是 LazyRunnable 类型的工作,都是须要立刻执行的。immediate = true
  • addTaskWakesUp : true 示意 当且仅当只有 调用 addTask 办法时才会唤醒 Reactor 线程。调用别的办法并不会唤醒Reactor 线程
    在初始化 NioEventLoop 时会设置为 false,示意 并不是只有 addTask 办法能力唤醒Reactor 线程 还有其余办法能够唤醒Reactor 线程,比方这里的execute 办法 就会唤醒Reactor 线程

针对 execute 办法中的这个唤醒条件!addTaskWakesUp && immediatenetty 这里要表白的语义是:当 immediate 参数为 true 的时候示意该异步工作须要立刻执行,addTaskWakesUp 默认设置为 false 示意不仅只有 addTask 办法能够唤醒 Reactor,还有其余办法比方这里的 execute 办法也能够唤醒。然而当设置为 true 时,语义就变为只有 addTask 才能够唤醒 Reactor,即便 execute 办法里的 immediate = true 也不能唤醒 Reactor,因为执行的是 execute 办法而不是 addTask 办法。

    private static final long AWAKE = -1L;
    private final AtomicLong nextWakeupNanos = new AtomicLong(AWAKE);

    protected void wakeup(boolean inEventLoop) {if (!inEventLoop && nextWakeupNanos.getAndSet(AWAKE) != AWAKE) {
            // 将 Reactor 线程从 Selector 上唤醒
            selector.wakeup();}
    }

nextWakeupNanos = AWAKE 时示意以后 Reactor 正处于昏迷状态,既然是昏迷状态也就没有必要去执行 selector.wakeup() 反复唤醒 Reactor 了,同时也能省去这一次的零碎调用开销。

在《1.2 大节 轮询逻辑》开始介绍的源码实现框架里 Reactor 被唤醒之后执行代码会进入 finally{...} 语句块中,在那里会将 nextWakeupNanos 设置为AWAKE

                        try {if (!hasTasks()) {strategy = select(curDeadlineNanos);
                            }
                        } finally {
                            // 执行到这里阐明 Reactor 曾经从 Selector 上被唤醒了
                            // 设置 Reactor 的状态为昏迷状态 AWAKE
                            // lazySet 优化不必要的 volatile 操作,不应用内存屏障,不保障写操作的可见性(单线程不须要保障)nextWakeupNanos.lazySet(AWAKE);
                        }

这里 Netty 用了一个 AtomicLong 类型 的变量 nextWakeupNanos,既能示意以后Reactor 线程 的状态,又能示意 Reactor 线程 的阻塞超时工夫。咱们在日常开发中也能够学习下这种技巧。


咱们持续回到 Reactor 线程 轮询 IO 就绪事件 的主线上。

    private int select(long deadlineNanos) throws IOException {if (deadlineNanos == NONE) {
            // 无定时工作,无一般工作执行时,开始轮询 IO 就绪事件,没有就始终阻塞 直到唤醒条件成立
            return selector.select();}

        long timeoutMillis = deadlineToDelayNanos(deadlineNanos + 995000L) / 1000000L;

        return timeoutMillis <= 0 ? selector.selectNow() : selector.select(timeoutMillis);
    }

deadlineNanos 不为 NONE,示意此时Reactor定时工作 须要执行,Reactor 线程 须要阻塞在 Selector 上期待 IO 就绪事件 直到最近的一个定时工作执行工夫点 deadline 达到。

这里的 deadlineNanos 示意的就是 Reactor 中最近的一个定时工作执行工夫点 deadline,单位是 纳秒 。指的是一个 相对工夫

而咱们须要计算的是 Reactor 线程 阻塞在 Selector 的超时工夫 timeoutMillis,单位是 毫秒 ,指的是一个 绝对工夫

所以在 Reactor 线程 开始阻塞在 Selector 上之前,咱们须要将这个单位为 纳秒 的相对工夫 deadlineNanos 转化为单位为 毫秒 的绝对工夫timeoutMillis

    private int select(long deadlineNanos) throws IOException {if (deadlineNanos == NONE) {
            // 无定时工作,无一般工作执行时,开始轮询 IO 就绪事件,没有就始终阻塞 直到唤醒条件成立
            return selector.select();}

        long timeoutMillis = deadlineToDelayNanos(deadlineNanos + 995000L) / 1000000L;

        return timeoutMillis <= 0 ? selector.selectNow() : selector.select(timeoutMillis);
    }

这里大家可能会好奇,通过 deadlineToDelayNanos 办法 计算 timeoutMillis 的时候,为什么要给 deadlineNanos 在加上 0.995 毫秒 呢??

大家设想一下这样的场景,当最近的一个定时工作的 deadline 行将在 5 微秒 内达到,那么这时将纳秒转换成毫秒计算出的 timeoutMillis 会是0

而在 Netty 中 timeoutMillis = 0 要表白的语义是:定时工作执行工夫曾经达到 deadline 工夫点,须要被执行。

而现实情况是定时工作还有 5 微秒 才可能达到 deadline,所以对于这种状况,须要在deadlineNanos 在加上 0.995 毫秒 凑成 1 毫秒 不能让其为 0。

所以从这里咱们能够看出,Reactor在有定时工作的状况下,至多要阻塞 1 毫秒

public abstract class AbstractScheduledEventExecutor extends AbstractEventExecutor {protected static long deadlineToDelayNanos(long deadlineNanos) {return ScheduledFutureTask.deadlineToDelayNanos(deadlineNanos);
    }
}
final class ScheduledFutureTask<V> extends PromiseTask<V> implements ScheduledFuture<V>, PriorityQueueNode {static long deadlineToDelayNanos(long deadlineNanos) {return deadlineNanos == 0L ? 0L : Math.max(0L, deadlineNanos - nanoTime());
    }

    // 启动工夫点
    private static final long START_TIME = System.nanoTime();

    static long nanoTime() {return System.nanoTime() - START_TIME;
    }

    static long deadlineNanos(long delay) {
        // 计算定时工作执行 deadline  去除启动工夫
        long deadlineNanos = nanoTime() + delay;
        // Guard against overflow
        return deadlineNanos < 0 ? Long.MAX_VALUE : deadlineNanos;
    }

}

这里须要留神一下,在创立定时工作时会通过 deadlineNanos 办法 计算定时工作的执行 deadlinedeadline 的计算逻辑是 以后工夫点 + 工作延时 delay系统启动工夫 这里须要扣除系统启动的工夫

所以这里在通过 deadline 计算延时 delay(也就是 timeout)的时候须要在加上 系统启动的工夫 : deadlineNanos - nanoTime()

当通过 deadlineToDelayNanos 计算出的 timeoutMillis <= 0 时,示意 Reactor 目前有邻近的 定时工作 须要执行,这时候就须要立马返回,不能阻塞在 Selector 上影响 定时工作 的执行。当然在返回执行 定时工作 前,须要在棘手通过 selector.selectNow() 非阻塞轮询一下 Channel 上是否有 IO 就绪事件 达到,避免耽搁 IO 事件 的解决。真是操碎了心~~

timeoutMillis > 0 时,Reactor 线程 就能够安心的阻塞在 Selector 上期待 IO 事件 的到来,直到 timeoutMillis 超时工夫达到。

timeoutMillis <= 0 ? selector.selectNow() : selector.select(timeoutMillis)

当注册在 Reactor 上的 Channel 中有 IO 事件 到来时,Reactor 线程 就会从 selector.select(timeoutMillis) 调用中唤醒,立刻去解决IO 就绪事件

这里假如一种极其状况,如果最近的一个定时工作的 deadline 是在将来很远的一个工夫点,这样就会使 timeoutMillis 的工夫十分十分久,那么 Reactor 岂不是会始终阻塞在 Selector 上造成 Netty 无奈工作?

笔者感觉大家当初心里应该曾经有了答案,咱们在《1.2.2 Reactor 开始轮询 IO 就绪事件》大节一开始介绍过,当 Reactor 正在 Selector 上阻塞时,如果此时用户线程向 Reactor 提交了异步工作,Reactor 线程会通过 execute 办法被唤醒。


流程到这里,Reactor 中最重要也是最外围的逻辑:轮询 Channel 上的 IO 就绪事件 的解决流程咱们就解说完了。

当 Reactor 轮询到有 IO 沉闷事件或者有异步工作须要执行时,就会从 Selector 上被唤醒,上面就到了该介绍 Reactor 被唤醒之后是如何解决 IO 就绪事件 以及如何执行 异步工作 的时候了。

Netty 毕竟是一个网络框架,所以它会优先去解决 Channel 上的 IO 事件,基于这个事实,所以 Netty 不会容忍 异步工作 被无限度的执行从而影响IO 吞吐

Netty 通过 ioRatio 变量 来调配 Reactor 线程 在解决 IO 事件 和执行 异步工作 之间的 CPU 工夫 分配比例。

上面咱们就来看下这个执行工夫比例的调配逻辑是什么样的~~~

2. Reactor 解决 IO 与解决异步工作的工夫比例调配

无论什么时候,当有 IO 就绪事件 到来时,Reactor都须要保障 IO 事件 被及时残缺的解决完,而 ioRatio 次要限度的是执行 异步工作 所需用时,避免 Reactor 线程 解决 异步工作 工夫过长而导致 I/O 事件 得不到及时地解决。

                // 调整 Reactor 线程执行 IO 事件和执行异步工作的 CPU 工夫比例 默认 50,示意执行 IO 事件和异步工作的工夫比例是一比一
                final int ioRatio = this.ioRatio;
                boolean ranTasks;
                if (ioRatio == 100) { // 先一股脑执行 IO 事件,在一股脑执行异步工作(无工夫限度)try {if (strategy > 0) {
                            // 如果有 IO 就绪事件 则解决 IO 就绪事件
                            processSelectedKeys();}
                    } finally {
                        // Ensure we always run tasks.
                        // 解决所有异步工作
                        ranTasks = runAllTasks();}
                } else if (strategy > 0) {// 先执行 IO 事件 用时 ioTime  执行异步工作只能用时 ioTime * (100 - ioRatio) / ioRatio
                    final long ioStartTime = System.nanoTime();
                    try {processSelectedKeys();
                    } finally {
                        // Ensure we always run tasks.
                        final long ioTime = System.nanoTime() - ioStartTime;
                        // 限定在超时工夫内 解决无限的异步工作 避免 Reactor 线程解决异步工作工夫过长而导致 I/O 事件阻塞
                        ranTasks = runAllTasks(ioTime * (100 - ioRatio) / ioRatio);
                    }
                } else { // 没有 IO 就绪事件处理,则只执行异步工作 最多执行 64 个 避免 Reactor 线程解决异步工作工夫过长而导致 I/O 事件阻塞
                    ranTasks = runAllTasks(0); // This will run the minimum number of tasks
                }
  • ioRatio = 100 时,示意无需思考执行工夫的限度,当有 IO 就绪事件 时(strategy > 0Reactor 线程 须要优先解决 IO 就绪事件,解决完IO 事件 后,执行所有的 异步工作 包含:一般工作,尾部工作,定时工作。无工夫限度。

strategy的数值示意 IO 就绪Channel个数。它是前边介绍的 io.netty.channel.nio.NioEventLoop#select 办法的返回值。

  • ioRatio 设置的值不为 100 时,默认为 50。须要先统计出执行IO 事件 的用时 ioTime ,依据ioTime * (100 - ioRatio) / ioRatio 计算出,前面执行 异步工作 的限度工夫。也就是说 Reactor 线程 须要在这个限定的工夫内,执行无限的异步工作,避免 Reactor 线程 因为解决 异步工作 工夫过长而导致 I/O 事件 得不到及时地解决。

默认状况下,执行 IO 事件 用时和执行 异步工作 用时比例设置的是一比一。
ioRatio设置的越高,则 Reactor 线程 执行异步工作的工夫占比 越小

要想得到 Reactor 线程 执行 异步工作 所需的工夫限度,必须晓得执行 IO 事件 的用时 ioTime 而后在依据 ioRatio 计算出执行 异步工作 的工夫限度。

那如果此时并没有 IO 就绪事件 须要 Reactor 线程 解决的话,这种状况下咱们无奈失去 ioTime,那怎么失去执行 异步工作 的限度工夫呢??

在这种非凡状况下,Netty 只容许 Reactor 线程 最多执行 64 个异步工作,而后就完结执行。转去持续轮训 IO 就绪事件。外围目标还是避免Reactor 线程 因为解决 异步工作 工夫过长而导致 I/O 事件 得不到及时地解决。

默认状况下,当 Reactor异步工作 须要解决然而没有 IO 就绪事件 时,Netty 只会容许 Reactor 线程 执行最多 64 个异步工作。


当初咱们对 Reactor 解决 IO 事件异步工作 的整体框架曾经理解了,上面咱们就来别离介绍下 Reactor 线程 在解决 IO 事件异步工作 的具体逻辑是什么样的?

3. Reactor 线程解决 IO 就绪事件

    // 该字段为持有 selector 对象 selectedKeys 的援用,当 IO 事件就绪时,间接从这里获取
   private SelectedSelectionKeySet selectedKeys;

   private void processSelectedKeys() {
        // 是否采纳 netty 优化后的 selectedKey 汇合类型 是由变量 DISABLE_KEY_SET_OPTIMIZATION 决定的 默认为 false
        if (selectedKeys != null) {processSelectedKeysOptimized();
        } else {processSelectedKeysPlain(selector.selectedKeys());
        }
    }

看到这段代码大家眼生吗??

不知大家还记不记得咱们在《聊聊 Netty 那些事儿之 Reactor 在 Netty 中的实现 (创立篇)》一文中介绍 Reactor NioEventLoop 类 在创立 Selector 的过程中提到,出于对 JDK NIO SelectorselectedKeys 汇合 插入 遍历 操作性能的思考 Netty 将本人用数组实现的 SelectedSelectionKeySet 汇合 替换掉了 JDK NIO SelectorselectedKeys HashSet 实现。

public abstract class SelectorImpl extends AbstractSelector {

    // The set of keys with data ready for an operation
    // //IO 就绪的 SelectionKey(外面包裹着 channel)protected Set<SelectionKey> selectedKeys;

    // The set of keys registered with this Selector
    // 注册在该 Selector 上的所有 SelectionKey(外面包裹着 channel)protected HashSet<SelectionKey> keys;

    ............... 省略...................
}

Netty 中通过优化开关 DISABLE_KEY_SET_OPTIMIZATION 管制是否对 JDK NIO Selector 进行优化。默认是须要优化。

在优化开关开启的状况下,Netty 会将创立的 SelectedSelectionKeySet 汇合 保留在 NioEventLoopprivate SelectedSelectionKeySet selectedKeys字段中,不便 Reactor 线程 间接从这里获取 IO 就绪SelectionKey

在优化开关敞开的状况下,Netty 会间接采纳 JDK NIO Selector 的默认实现。此时 NioEventLoopselectedKeys字段就会为null

遗记这段的同学能够在回顾下《聊聊 Netty 那些事儿之 Reactor 在 Netty 中的实现 (创立篇)》一文中对于Reactor 的创立过程。

通过对前边内容的回顾,咱们看到了在 Reactor 解决 IO 就绪事件 的逻辑也分为两个局部,一个是通过 Netty 优化的,一个是采纳 JDK 原生 的。

咱们先来看采纳 JDK 原生Selector的解决形式,了解了这种形式,在看 Netty 优化的形式会更加容易。

3.1 processSelectedKeysPlain

咱们在《聊聊 Netty 那些事儿之 Reactor 在 Netty 中的实现 (创立篇)》一文中介绍JDK NIO Selector 的工作过程时讲过,当注册在 Selector 上的 Channel 产生 IO 就绪事件 时,Selector会将 IO 就绪SelectionKey插入到 Set<SelectionKey> selectedKeys 汇合中。

这时 Reactor 线程 会从 java.nio.channels.Selector#select(long) 调用中返回。随后调用 java.nio.channels.Selector#selectedKeys 获取 IO 就绪SelectionKey汇合。

所以 Reactor 线程 在调用 processSelectedKeysPlain 办法 解决 IO 就绪事件 之前须要调用 selector.selectedKeys() 去获取所有 IO 就绪SelectionKeys

processSelectedKeysPlain(selector.selectedKeys())
    private void processSelectedKeysPlain(Set<SelectionKey> selectedKeys) {if (selectedKeys.isEmpty()) {return;}

        Iterator<SelectionKey> i = selectedKeys.iterator();
        for (;;) {final SelectionKey k = i.next();
            final Object a = k.attachment();
            // 留神每次迭代开端的 keyIterator.remove()调用。Selector 不会本人从已选择键集中移除 SelectionKey 实例。// 必须在解决完通道时本人移除。下次该通道变成就绪时,Selector 会再次将其放入已选择键集中。i.remove();

            if (a instanceof AbstractNioChannel) {processSelectedKey(k, (AbstractNioChannel) a);
            } else {@SuppressWarnings("unchecked")
                NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a;
                processSelectedKey(k, task);
            }

            if (!i.hasNext()) {break;}

            // 目标是再次进入 for 循环 移除生效的 selectKey(socketChannel 可能从 selector 上移除)
            if (needsToSelectAgain) {selectAgain();
                selectedKeys = selector.selectedKeys();

                // Create the iterator again to avoid ConcurrentModificationException
                if (selectedKeys.isEmpty()) {break;} else {i = selectedKeys.iterator();
                }
            }
        }
    }

3.1.1 获取 IO 就绪的 Channel

Set<SelectionKey> selectedKeys汇合外面装的全副是 IO 就绪SelectionKey,留神,此时 Set<SelectionKey> selectedKeys 的实现类型为HashSet 类型。因为咱们这里首先介绍的是 JDK NIO 原生实现。

通过获取 HashSet 的迭代器,开始一一解决 IO 就绪Channel

Iterator<SelectionKey> i = selectedKeys.iterator();
final SelectionKey k = i.next();
final Object a = k.attachment();

大家还记得这个 SelectionKey 中的 attachment 属性 里寄存的是什么吗??

在上篇文章《具体图解 Netty Reactor 启动全流程》中咱们在讲 NioServerSocketChannelMain Reactor注册的时候,通过 this 指针将本人作为 SelectionKeyattachment 属性 注册到 Selector 中。这一步实现了 Netty 自定义 ChannelJDK NIO Channel的绑定

public abstract class AbstractNioChannel extends AbstractChannel {

    //channel 注册到 Selector 后取得的 SelectKey
    volatile SelectionKey selectionKey;

    @Override
    protected void doRegister() throws Exception {
        boolean selected = false;
        for (;;) {
            try {selectionKey = javaChannel().register(eventLoop().unwrappedSelector(), 0, this);
                return;
            } catch (CancelledKeyException e) {............... 省略....................}
        }
    }

}

而咱们也提到 SelectionKey 就相当于是 ChannelSelector中的一种示意,当 Channel 上有 IO 就绪事件 时,Selector会将 Channel 对应的 SelectionKey 返回给 Reactor 线程,咱们能够通过返回的这个SelectionKey 里的 attachment 属性 获取到对应的 Netty 自定义Channel

对于客户端连贯事件(OP_ACCEPT)沉闷时,这里的 Channel 类型NioServerSocketChannel
对于客户端读写事件(ReadWrite)沉闷时,这里的 Channel 类型NioSocketChannel

当咱们通过 k.attachment() 获取到 Netty 自定义的 Channel 时,就须要把这个 Channel 对应的 SelectionKeySelector的就绪汇合 Set<SelectionKey> selectedKeys 中删除。因为 Selector 本人不会被动删除曾经解决完的 SelectionKey,须要调用者本人被动删除,这样当这个 Channel 再次 IO 就绪时Selector 会再次将这个 Channel 对应的 SelectionKey 放入就绪汇合 Set<SelectionKey> selectedKeys 中。

i.remove();

3.1.2 解决 Channel 上的 IO 事件

     if (a instanceof AbstractNioChannel) {processSelectedKey(k, (AbstractNioChannel) a);
     } else {@SuppressWarnings("unchecked")
                NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a;
                processSelectedKey(k, task);
     }

从这里咱们能够看出 Netty 向 SelectionKey 中的 attachment 属性 附加的对象分为两种:

  • 一种是咱们相熟的 Channel,无论是服务端应用的NioServerSocketChannel 还是客户端应用的 NioSocketChannel 都属于 AbstractNioChannel Channel 上的 IO 事件 是由 Netty 框架负责解决,也是本大节咱们要重点介绍的
  • 另一种就是 NioTask,这种类型是 Netty 提供给用户能够自定义一些当Channel 上产生 IO 就绪事件 时的自定义解决。

public interface NioTask<C extends SelectableChannel> {
    /**
     * Invoked when the {@link SelectableChannel} has been selected by the {@link Selector}.
     */
    void channelReady(C ch, SelectionKey key) throws Exception;

    /**
     * Invoked when the {@link SelectionKey} of the specified {@link SelectableChannel} has been cancelled and thus
     * this {@link NioTask} will not be notified anymore.
     *
     * @param cause the cause of the unregistration. {@code null} if a user called {@link SelectionKey#cancel()} or
     *              the event loop has been shut down.
     */
    void channelUnregistered(C ch, Throwable cause) throws Exception;
}

NioTaskChannel 其实实质上是一样的都是负责解决 Channel 上的 IO 就绪事件,只不过一个是 用户自定义解决 ,一个是 Netty 框架解决。这里咱们重点关注ChannelIO 解决逻辑


    private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {
        // 获取 Channel 的底层操作类 Unsafe
        final AbstractNioChannel.NioUnsafe unsafe = ch.unsafe();
        if (!k.isValid()) {...... 如果 SelectionKey 曾经生效则敞开对应的 Channel......}

        try {
            // 获取 IO 就绪事件
            int readyOps = k.readyOps();
            // 解决 Connect 事件
            if ((readyOps & SelectionKey.OP_CONNECT) != 0) {int ops = k.interestOps();
                // 移除对 Connect 事件的监听,否则 Selector 会始终告诉
                ops &= ~SelectionKey.OP_CONNECT;
                k.interestOps(ops);
                // 触发 channelActive 事件处理 Connect 事件
                unsafe.finishConnect();}

            // 解决 Write 事件
            if ((readyOps & SelectionKey.OP_WRITE) != 0) {ch.unsafe().forceFlush();}

             // 解决 Read 事件或者 Accept 事件
            if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {unsafe.read();
            }
        } catch (CancelledKeyException ignored) {unsafe.close(unsafe.voidPromise());
        }
    }
  • 首先咱们须要获取 IO 就绪 Channel 底层的操作类 Unsafe,用于对具体IO 就绪事件 的解决。

这里能够看出,Netty 对 IO 就绪事件 的解决全副封装在 Unsafe 类 中。比方:对 OP_ACCEPT 事件 的具体解决逻辑是封装在 NioServerSocketChannel 中的 UnSafe 类 中。对 OP_READ 或者 OP_WRITE 事件 的解决是封装在 NioSocketChannel 中的 Unsafe 类 中。

  • Selectionkey 中获取具体IO 就绪事件 readyOps

SelectonKey中对于 IO 事件 的汇合有两个。一个是 interestOps, 用于记录Channel 感兴趣的 IO 事件,在ChannelSelector注册结束后,通过 pipeline 中的 HeadContext 节点的 ChannelActive 事件回调 中增加。上面这段代码就是在 ChannelActive 事件回调 中 Channel 在向 Selector 注册本人感兴趣的 IO 事件。

    public abstract class AbstractNioChannel extends AbstractChannel {
             @Override
              protected void doBeginRead() throws Exception {// Channel.read() or ChannelHandlerContext.read() was called
                    final SelectionKey selectionKey = this.selectionKey;
                    if (!selectionKey.isValid()) {return;}

                    readPending = true;

                    final int interestOps = selectionKey.interestOps();
                    /**
                       * 1:ServerSocketChannel 初始化时 readInterestOp 设置的是 OP_ACCEPT 事件
                       * 2:SocketChannel 初始化时 readInterestOp 设置的是 OP_READ 事件
                     * */
                    if ((interestOps & readInterestOp) == 0) {
                        // 注册监听 OP_ACCEPT 或者 OP_READ 事件
                        selectionKey.interestOps(interestOps | readInterestOp);
                    }
              }
    }

另一个就是这里的 readyOps,用于记录在Channel 感兴趣的 IO 事件 中具体哪些 IO 事件 就绪了。

Netty 中将各种事件的汇合用一个 int 型 变量来保留。

  • & 操作判断,某个事件是否在事件汇合中:(readyOps & SelectionKey.OP_CONNECT) != 0,这里就是判断 Channel 是否对 Connect 事件感兴趣。
  • | 操作向事件汇合中增加事件:interestOps | readInterestOp
  • 从事件汇合中删除某个事件,是通过先将要删除事件取反 ~,而后在和事件汇合做& 操作:ops &= ~SelectionKey.OP_CONNECT

Netty 这种对空间的极致利用思维,很值得咱们平时在日常开发中学习~~


当初咱们曾经晓得哪些 Channel 当初处于 IO 就绪状态,并且晓得了具体哪些类型的IO 事件 曾经就绪。

上面就该针对 Channel 上的不同 IO 就绪事件 做出相应的解决了。

3.1.2.1 解决 Connect 事件

Netty 客户端向服务端发动连贯,并向客户端的 Reactor 注册 Connect 事件,当连贯建设胜利后,客户端的NioSocketChannel 就会产生Connect 就绪事件,通过后面内容咱们讲的Reactor 的运行框架,最终流程会走到这里。

      if ((readyOps & SelectionKey.OP_CONNECT) != 0) {int ops = k.interestOps();
                ops &= ~SelectionKey.OP_CONNECT;
                k.interestOps(ops);
                // 触发 channelActive 事件
                unsafe.finishConnect();}

如果 IO 就绪 的事件是 Connect 事件,那么就调用对应客户端NioSocketChannel 中的 Unsafe 操作类 中的 finishConnect 办法 解决 Connect 事件。这时会在 Netty 客户端NioSocketChannel 中的 pipeline 中流传ChannelActive 事件

最初须要将 OP_CONNECT 事件 从客户端 NioSocketChannel 所关怀的事件汇合 interestOps 中删除。否则 Selector 会始终告诉Connect 事件就绪

3.1.2.2 解决 Write 事件

对于 Reactor 线程 解决 Netty 中的 Write 事件 的流程,笔者后续会专门用一篇文章来为大家介绍。本文咱们重点关注 Reactor 线程 的整体运行框架。

      if ((readyOps & SelectionKey.OP_WRITE) != 0) {ch.unsafe().forceFlush();}

这里大家只须要记住,OP_WRITE 事件的注册是由用户来实现的,当 Socket 发送缓冲区已满无奈持续写入数据时,用户会向 Reactor 注册 OP_WRITE 事件,等到 Socket 发送缓冲区变得可写时,Reactor 会收到 OP_WRITE 事件沉闷告诉,随后在这里调用客户端 NioSocketChannel 中的 forceFlush 办法 将残余数据发送进来。

3.1.2.3 解决 Read 事件或者 Accept 事件

      if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {unsafe.read();
     }

这里能够看出 Netty 中解决 Read 事件Accept 事件 都是由对应 Channel 中的 Unsafe 操作类 中的 read 办法 解决。

服务端 NioServerSocketChannel 中的 Read 办法 解决的是 Accept 事件,客户端NioSocketChannel 中的 Read 办法 解决的是Read 事件

这里大家只需记住各个 IO 事件 在对应 Channel 中的解决入口,后续文章咱们会详细分析这些入口函数。

3.1.3 从 Selector 中移除生效的 SelectionKey

            // 用于及时从 selectedKeys 中革除生效的 selectKey 比方 socketChannel 从 selector 上被用户移除
            private boolean needsToSelectAgain;

             // 目标是再次进入 for 循环 移除生效的 selectKey(socketChannel 可能被用户从 selector 上移除)
            if (needsToSelectAgain) {selectAgain();
                selectedKeys = selector.selectedKeys();

                // Create the iterator again to avoid ConcurrentModificationException
                if (selectedKeys.isEmpty()) {break;} else {i = selectedKeys.iterator();
                }
            }

在前边介绍 Reactor 运行框架 的时候,咱们看到在每次 Reactor 线程 轮询完结,筹备解决 IO 就绪事件 以及 异步工作 的时候,都会将 needsToSelectAgain 设置为false

那么这个 needsToSelectAgain 到底是干嘛的?以及为什么咱们须要去 “Select Again” 呢?

首先咱们来看下在什么状况下会将 needsToSelectAgain 这个变量设置为true,通过这个设置的过程,咱们是否可能从中找到一些线索?

咱们晓得 Channel 能够将本人注册到 Selector 上,那么当然也能够将本人从 Selector 上勾销移除。

在上篇文章中咱们也花了大量的篇幅解说了这个注册的过程,当初咱们来看下 Channel 的勾销注册。

public abstract class AbstractNioChannel extends AbstractChannel {

   //channel 注册到 Selector 后取得的 SelectKey
    volatile SelectionKey selectionKey;

    @Override
    protected void doDeregister() throws Exception {eventLoop().cancel(selectionKey());
    }

    protected SelectionKey selectionKey() {
        assert selectionKey != null;
        return selectionKey;
    }
}

Channel勾销注册的过程很简略,间接调用 NioChanneldoDeregister 办法,Channel绑定的 Reactor 会将其从 Selector 中勾销并进行监听 Channel 上的IO 事件

public final class NioEventLoop extends SingleThreadEventLoop {

    // 记录 Selector 上移除 socketChannel 的个数 达到 256 个 则须要将有效的 selectKey 从 SelectedKeys 汇合中革除掉
    private int cancelledKeys;

    private static final int CLEANUP_INTERVAL = 256;

    /**
     * 将 socketChannel 从 selector 中移除 勾销监听 IO 事件
     * */
    void cancel(SelectionKey key) {key.cancel();
        cancelledKeys ++;
        // 当从 selector 中移除的 socketChannel 数量达到 256 个,设置 needsToSelectAgain 为 true
        // 在 io.netty.channel.nio.NioEventLoop.processSelectedKeysPlain 中从新做一次轮询,将生效的 selectKey 移除,// 以保障 selectKeySet 的有效性
        if (cancelledKeys >= CLEANUP_INTERVAL) {
            cancelledKeys = 0;
            needsToSelectAgain = true;
        }
    }
}
  • 调用 JDK NIO SelectionKey 的 API cancel 办法 ,将ChannelSelector中勾销掉。SelectionKey#cancel 办法 调用结束后,此时调用 SelectionKey#isValid 将会返回 falseSelectionKey#cancel 办法 调用后,Selector会将要勾销的这个 SelectionKey 退出到 Selector 中的 cancelledKeys 汇合
public abstract class AbstractSelector extends Selector {private final Set<SelectionKey> cancelledKeys = new HashSet<SelectionKey>();

    void cancel(SelectionKey k) {synchronized (cancelledKeys) {cancelledKeys.add(k);
        }
    }
}
  • Channel 对应的 SelectionKey 勾销结束后,Channel勾销计数器 cancelledKeys 会加 1,当 cancelledKeys = 256 时,将 needsToSelectAgain 设置为true
  • 随后在 Selector下一次 轮询过程中,会将 cancelledKeys 汇合 中的 SelectionKeySelector 所有的 KeySet 中移除 。这里的KeySet 包含 Selector 用于寄存就绪 SelectionKeyselectedKeys 汇合 ,以及用于寄存所有注册的Channel 对应的 SelectionKeykeys 汇合
public abstract class SelectorImpl extends AbstractSelector {protected Set<SelectionKey> selectedKeys = new HashSet();
    protected HashSet<SelectionKey> keys = new HashSet();
    
     ..................... 省略...............
}

咱们看到 Reactor 线程 中对 needsToSelectAgain 的判断是在 processSelectedKeysPlain 办法 解决 IO 就绪SelectionKey的循环体中进行判断的。

之所以这里特地提到 needsToSelectAgain 判断的地位,是要让大家留神到此时 Reactor 正在解决 本次 轮询的IO 就绪事件

而前边也说了,当调用 SelectionKey#cancel 办法 后,须要等到 下次轮询 的过程中 Selector 才会将这些勾销的 SelectionKeySelector中的所有 KeySet 汇合 中移除,当然这里也包含就绪汇合selectedKeys

当在 本次 轮询期间,如果大量的 ChannelSelector中勾销,Selector 中的就绪汇合 selectedKeys 中仍然会保留这些 Channel 对应 SelectionKey 直到 下次轮询 。那么当然会影响本次轮询后果selectedKeys 的有效性

所以为了保障 Selector 中所有 KeySet 的有效性 ,须要在Channel 勾销个数达到 256 时,触发一次selectNow,目标是革除有效的SelectionKey

    private void selectAgain() {
        needsToSelectAgain = false;
        try {selector.selectNow();
        } catch (Throwable t) {logger.warn("Failed to update SelectionKeys.", t);
        }
    }

到这里,咱们就对 JDK 原生 Selector 的解决形式 processSelectedKeysPlain 办法 就介绍完了,其实 对 IO 就绪事件 的解决逻辑都是一样的,在咱们了解了 processSelectedKeysPlain 办法 后,processSelectedKeysOptimized 办法 IO 就绪事件 的解决,咱们了解起来就十分轻松了。

3.2 processSelectedKeysOptimized

Netty 默认会采纳优化过的 SelectorIO 就绪事件 的解决。然而解决逻辑是大同小异的。上面咱们次要介绍一下这两个办法的不同之处。

    private void processSelectedKeysOptimized() {
        // 在 openSelector 的时候将 JDK 中 selector 实现类中得 selectedKeys 和 publicSelectKeys 字段类型
        // 由原来的 HashSet 类型替换为 Netty 优化后的数组实现的 SelectedSelectionKeySet 类型
        for (int i = 0; i < selectedKeys.size; ++i) {final SelectionKey k = selectedKeys.keys[i];
            // 对应迭代器中得 remove   selector 不会本人革除 selectedKey
            selectedKeys.keys[i] = null;

            final Object a = k.attachment();

            if (a instanceof AbstractNioChannel) {processSelectedKey(k, (AbstractNioChannel) a);
            } else {@SuppressWarnings("unchecked")
                NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a;
                processSelectedKey(k, task);
            }

            if (needsToSelectAgain) {selectedKeys.reset(i + 1);

                selectAgain();
                i = -1;
            }
        }
    }
  • JDK NIO 原生 Selector寄存 IO 就绪的 SelectionKey 的汇合为 HashSet 类型selectedKeys 。而 Netty 为了优化对 selectedKeys 汇合遍历效率 采纳了本人实现的 SelectedSelectionKeySet 类型,从而用对 数组 的遍历代替用 HashSet 的迭代器遍历。
  • Selector会在每次轮询到 IO 就绪事件 时,将 IO 就绪的 Channel 对应的 SelectionKey 插入到 selectedKeys 汇合,然而Selector 只管向 selectedKeys 汇合 放入 IO 就绪的 SelectionKeySelectionKey被处理完毕后,Selector是不会本人被动将其从 selectedKeys 汇合 中移除的,典型的 管杀不论埋 。所以须要 Netty 本人在遍历到IO 就绪的 SelectionKey 后,将其删除。

    • processSelectedKeysPlain 中是间接将其从迭代器中删除。
    • processSelectedKeysOptimized 中将其在数组中对应的地位置为Null,不便垃圾回收。
  • 在最初革除有效的 SelectionKey 时,在 processSelectedKeysPlain 中因为采纳的是 JDK NIO 原生的 Selector,所以只须要执行SelectAgain 就能够,Selector会主动革除有效 Key。
    然而在 processSelectedKeysOptimized 中因为是 Netty 本人实现的优化类型,所以须要 Netty 本人将 SelectedSelectionKeySet 数组中的 SelectionKey 全副革除,最初在执行SelectAgain

好了,到这里,咱们就将 Reactor 线程 如何解决 IO 就绪事件 的整个过程讲述完了,上面咱们就该到了介绍 Reactor 线程 如何解决 Netty 框架中的异步工作了。

4. Reactor 线程解决异步工作

Netty 对于解决 异步工作 的办法有两个:

  • 一个是无超时工夫限度的 runAllTasks() 办法 。当ioRatio 设置为 100 时,Reactor 线程 会先一股脑的解决 IO 就绪事件,而后在一股脑的执行 异步工作,并没有工夫的限度。
  • 另一个是有超时工夫限度的 runAllTasks(long timeoutNanos) 办法 。当ioRatio != 100 时,Reactor 线程 执行 异步工作 会有工夫限度,优先一股脑的解决完 IO 就绪事件 统计出执行 IO 工作 耗时 ioTime。依据公式ioTime * (100 - ioRatio) / ioRatio) 计算出 Reactor 线程 执行 异步工作 的超时工夫。在超时工夫限定范畴内,执行无限的 异步工作

上面咱们来别离看下这两个执行 异步工作 的办法解决逻辑:

4.1 runAllTasks()

    protected boolean runAllTasks() {assert inEventLoop();
        boolean fetchedAll;
        boolean ranAtLeastOne = false;

        do {
            // 将达到执行工夫的定时工作转存到一般工作队列 taskQueue 中,对立由 Reactor 线程从 taskQueue 中取出执行
            fetchedAll = fetchFromScheduledTaskQueue();
            if (runAllTasksFrom(taskQueue)) {ranAtLeastOne = true;}
        } while (!fetchedAll); // keep on processing until we fetched all scheduled tasks.

        if (ranAtLeastOne) {lastExecutionTime = ScheduledFutureTask.nanoTime();
        }
        // 执行尾部队列工作
        afterRunningAllTasks();
        return ranAtLeastOne;
    }

Reactor 线程 执行 异步工作 的外围逻辑就是:

  • 先将到期的 定时工作 一股脑的从定时工作队列 scheduledTaskQueue 中取出并转存到一般工作队列 taskQueue 中。
  • Reactor 线程 对立从一般工作队列 taskQueue 中取出工作执行。
  • Reactor 线程 执行完 定时工作 一般工作 后,开始执行存储于尾部工作队列 tailTasks 中的 尾部工作

上面咱们来别离看下上述几个外围步骤的实现:

4.1.1 fetchFromScheduledTaskQueue

    /**
     * 从定时工作队列中取出达到 deadline 执行工夫的定时工作
     * 将定时工作 转存到 一般工作队列 taskQueue 中,对立由 Reactor 线程从 taskQueue 中取出执行
     *
     * */
    private boolean fetchFromScheduledTaskQueue() {if (scheduledTaskQueue == null || scheduledTaskQueue.isEmpty()) {return true;}
        long nanoTime = AbstractScheduledEventExecutor.nanoTime();
        for (;;) {
            // 从定时工作队列中取出达到执行 deadline 的定时工作  deadline <= nanoTime
            Runnable scheduledTask = pollScheduledTask(nanoTime);
            if (scheduledTask == null) {return true;}
            if (!taskQueue.offer(scheduledTask)) {
                // taskQueue 没有空间包容 则在将定时工作从新塞进定时工作队列中期待下次执行
                scheduledTaskQueue.add((ScheduledFutureTask<?>) scheduledTask);
                return false;
            }
        }
    }
  1. 获取以后要执行 异步工作 的工夫点nanoTime
final class ScheduledFutureTask<V> extends PromiseTask<V> implements ScheduledFuture<V>, PriorityQueueNode {private static final long START_TIME = System.nanoTime();

    static long nanoTime() {return System.nanoTime() - START_TIME;
    }
}
  1. 从定时工作队列中找出 deadline <= nanoTime 的异步工作。也就是说找出所有到期的定时工作。
    protected final Runnable pollScheduledTask(long nanoTime) {assert inEventLoop();

        // 从定时队列中取出要执行的定时工作  deadline <= nanoTime
        ScheduledFutureTask<?> scheduledTask = peekScheduledTask();
        if (scheduledTask == null || scheduledTask.deadlineNanos() - nanoTime > 0) {return null;}
        // 合乎取出条件 则取出
        scheduledTaskQueue.remove();
        scheduledTask.setConsumed();
        return scheduledTask;
    }
  1. 到期的定时工作 插入到一般工作队列 taskQueue 中,如果 taskQueue 曾经没有空间包容新的工作,则将 定时工作 从新塞进 定时工作队列 中期待下次拉取。
            if (!taskQueue.offer(scheduledTask)) {scheduledTaskQueue.add((ScheduledFutureTask<?>) scheduledTask);
                return false;
            }
  1. fetchFromScheduledTaskQueue 办法 的返回值为 true 时示意到期的定时工作曾经全副拉取出来并转存到一般工作队列中。
    返回值为 false 时示意到期的定时工作只拉取出来一部分,因为这时一般工作队列曾经满了,当执行完一般工作时,还须要在进行一次拉取。

到期的定时工作 从定时工作队列中拉取结束或者当一般工作队列已满时,这时就会进行拉取,开始执行一般工作队列中的 异步工作

4.1.2 runAllTasksFrom

    protected final boolean runAllTasksFrom(Queue<Runnable> taskQueue) {Runnable task = pollTaskFrom(taskQueue);
        if (task == null) {return false;}
        for (;;) {safeExecute(task);
            task = pollTaskFrom(taskQueue);
            if (task == null) {return true;}
        }
    }
  • 首先 runAllTasksFrom 办法 的返回值示意是否执行了至多一个异步工作。前面会赋值给ranAtLeastOne 变量,这个返回值咱们后续会用到。
  • 从一般工作队列中拉取 异步工作
    protected static Runnable pollTaskFrom(Queue<Runnable> taskQueue) {for (;;) {Runnable task = taskQueue.poll();
            if (task != WAKEUP_TASK) {return task;}
        }
    }
  • Reactor 线程 执行 异步工作
    protected static void safeExecute(Runnable task) {
        try {task.run();
        } catch (Throwable t) {logger.warn("A task raised an exception. Task: {}", task, t);
        }
    }

4.1.3 afterRunningAllTasks

        if (ranAtLeastOne) {lastExecutionTime = ScheduledFutureTask.nanoTime();
        }
        // 执行尾部队列工作
        afterRunningAllTasks();
        return ranAtLeastOne;

如果 Reactor 线程 执行了至多一个 异步工作 ,那么设置lastExecutionTime,并将ranAtLeastOne 标识 返回。这里的 ranAtLeastOne 标识 就是 runAllTasksFrom 办法 的返回值。

最初执行收尾工作,也就是执行尾部工作队列中的尾部工作。

    @Override
    protected void afterRunningAllTasks() {runAllTasksFrom(tailTasks);
    }

4.2 runAllTasks(long timeoutNanos)

这里在解决 异步工作 的外围逻辑还是和之前一样的,只不过就是多了对 超时工夫 的管制。

    protected boolean runAllTasks(long timeoutNanos) {fetchFromScheduledTaskQueue();
        Runnable task = pollTask();
        if (task == null) {
            // 一般队列中没有工作时  执行队尾队列的工作
            afterRunningAllTasks();
            return false;
        }

        // 异步工作执行超时 deadline
        final long deadline = timeoutNanos > 0 ? ScheduledFutureTask.nanoTime() + timeoutNanos : 0;
        long runTasks = 0;
        long lastExecutionTime;
        for (;;) {safeExecute(task);
            runTasks ++;
            // 每运行 64 个异步工作 检查一下 是否达到 执行 deadline
            if ((runTasks & 0x3F) == 0) {lastExecutionTime = ScheduledFutureTask.nanoTime();
                if (lastExecutionTime >= deadline) {
                    // 达到异步工作执行超时 deadline,进行执行异步工作
                    break;
                }
            }

            task = pollTask();
            if (task == null) {lastExecutionTime = ScheduledFutureTask.nanoTime();
                break;
            }
        }

        afterRunningAllTasks();
        this.lastExecutionTime = lastExecutionTime;
        return true;
    }
  • 首先还是通过 fetchFromScheduledTaskQueue 办法Reactor中的定时工作队列中拉取 到期的定时工作 ,转存到一般工作队列中。当一般工作队列已满或者 到期定时工作 全副拉取结束时,进行拉取。
  • ScheduledFutureTask.nanoTime() + timeoutNanos 作为 Reactor 线程 执行异步工作的超时工夫点deadline
  • 因为零碎调用 System.nanoTime() 须要肯定的零碎开销,所以每执行完 64异步工作 的时候才会去检查一下 执行工夫 是否达到了 deadline。如果达到了执行截止工夫deadline 则退出进行执行 异步工作 。如果没有达到deadline 则持续从一般工作队列中取出工作循环执行上来。

从这个细节又能够看出 Netty 对性能的考量还是相当考究的


流程走到这里,咱们就对 Reactor 的整个运行框架以及 如何轮询 IO 就绪事件 如何解决 IO 就绪事件 如何执行异步工作 的具体实现逻辑就分析完了。

上面还有一个小小的尾巴,就是 Netty 是如何解决文章结尾提到的 JDK NIO Epoll 的空轮询 BUG 的,让咱们一起来看下吧~~~

5. 解决 JDK Epoll 空轮询 BUG

前边提到,因为 JDK NIO Epoll 的空轮询 BUG 存在,这样会导致 Reactor 线程 在没有任何事件可做的状况下被意外唤醒,导致 CPU 空转。

其实 Netty 也没有从根本上解决这个JDK BUG,而是抉择奇妙的绕过这个BUG

上面咱们来看下 Netty 是如何做到的。

                if (ranTasks || strategy > 0) {if (selectCnt > MIN_PREMATURE_SELECTOR_RETURNS && logger.isDebugEnabled()) {logger.debug("Selector.select() returned prematurely {} times in a row for Selector {}.",
                                selectCnt - 1, selector);
                    }
                    selectCnt = 0;
                } else if (unexpectedSelectorWakeup(selectCnt)) {// Unexpected wakeup (unusual case)
                    // 既没有 IO 就绪事件,也没有异步工作,Reactor 线程从 Selector 上被异样唤醒 触发 JDK Epoll 空轮训 BUG
                    // 从新构建 Selector,selectCnt 归零
                    selectCnt = 0;
                }

Reactor 线程 解决完 IO 就绪事件异步工作 后,会查看这次 Reactor 线程 被唤醒有没有执行过异步工作和有没有IO 就绪的 Channel

  • boolean ranTasks 这时候就派上了用场,这个 ranTasks 正是前边咱们在讲 runAllTasks 办法 时提到的返回值。用来示意是否执行过至多一次 异步工作
  • int strategy 正是 JDK NIO Selectorselect 办法 的返回值,用来示意 IO 就绪Channel 个数

如果 ranTasks = false 并且 strategy = 0 这代表 Reactor 线程 本次既没有 异步工作 执行也没有 IO 就绪Channel须要解决却被意外的唤醒。等于是空转了一圈啥也没干。

这种状况下 Netty 就会认为可能曾经触发了JDK NIO Epoll 的空轮询 BUG

    int SELECTOR_AUTO_REBUILD_THRESHOLD = SystemPropertyUtil.getInt("io.netty.selectorAutoRebuildThreshold", 512);

    private boolean unexpectedSelectorWakeup(int selectCnt) {
          .................. 省略...............

        /**
         * 走到这里的条件是 既没有 IO 就绪事件,也没有异步工作,Reactor 线程从 Selector 上被异样唤醒
         * 这种状况可能是曾经触发了 JDK Epoll 的空轮询 BUG,如果这种状况继续 512 次 则认为可能曾经触发 BUG,于是重建 Selector
         *
         * */
        if (SELECTOR_AUTO_REBUILD_THRESHOLD > 0 &&
                selectCnt >= SELECTOR_AUTO_REBUILD_THRESHOLD) {
            // The selector returned prematurely many times in a row.
            // Rebuild the selector to work around the problem.
            logger.warn("Selector.select() returned prematurely {} times in a row; rebuilding Selector {}.",
                    selectCnt, selector);
            rebuildSelector();
            return true;
        }
        return false;
    }
  • 如果 Reactor 这种意外唤醒的次数 selectCnt 超过了配置的次数 SELECTOR_AUTO_REBUILD_THRESHOLD , 那么 Netty 就会认定这种状况可能曾经触发了JDK NIO Epoll 空轮询 BUG,则重建Selector( 将之前注册的所有 Channel 从新注册到新的 Selector 上并敞开旧的 Selector),selectCnt 计数 0

SELECTOR_AUTO_REBUILD_THRESHOLD 默认为 512,能够通过零碎变量-D io.netty.selectorAutoRebuildThreshold 指定自定义数值。

  • 如果 selectCnt 小于 SELECTOR_AUTO_REBUILD_THRESHOLD ,则返回不做任何解决,selectCnt 持续计数。

Netty 就这样通过计数 Reactor 被意外唤醒的次数,如果计数 selectCnt 达到了 512 次,则通过 重建 Selector 奇妙的绕开了JDK NIO Epoll 空轮询 BUG

咱们在日常开发中也能够借鉴 Netty 这种解决问题的思路,比方在我的项目开发中,当咱们发现咱们无奈保障彻底的解决一个问题时,或者为了解决这个问题导致咱们的投入产出比不高时,咱们就该思考是不是应该换一种思路去绕过这个问题,从而达到同样的成果。解决问题的最高境界就是不解决它,奇妙的绕过去~!!


总结

本文花了大量的篇幅介绍了 Reactor 整体的运行框架,并深刻介绍了 Reactor 外围的工作模块的具体实现逻辑。

通过本文的介绍咱们晓得了 Reactor 如何轮询注册在其上的所有 Channel 上感兴趣的 IO 事件,以及 Reactor 如何去解决 IO 就绪的事件,如何执行 Netty 框架中提交的异步工作和定时工作。

最初介绍了 Netty 如何奇妙的绕过 JDK NIO Epoll 空轮询的 BUG, 达到解决问题的目标。

提炼了新的解决问题的思路:解决问题的最高境界就是不解决它,奇妙的绕过去~!!

好了,本文的内容就到这里了,咱们下篇文章见~

正文完
 0