关于java:聊聊-Netty-那些事儿之-Reactor-在-Netty-中的实现创建篇

36次阅读

共计 36729 个字符,预计需要花费 92 分钟才能阅读完成。

欢送关注微信公众号:bin 的技术小屋,浏览公众号原文

本系列 Netty 源码解析文章基于 4.1.56.Final版本

在上篇文章《聊聊 Netty 那些事儿之从内核角度看 IO 模型》中咱们花了大量的篇幅来从内核角度具体讲述了五种 IO 模型 的演进过程以及 ReactorIO 线程模型 的底层基石 IO 多路复用技术在内核中的实现原理。

最初咱们引出了 netty 中应用的主从 Reactor IO 线程模型。

通过上篇文章的介绍,咱们曾经分明了在 IO 调用的过程中内核帮咱们搞了哪些事件,那么俗话说的好 内核领进门,修行在 netty,netty 在用户空间又帮咱们搞了哪些事件?

那么从本文开始,笔者将从源码角度来带大家看下上图中的 Reactor IO 线程模型 在 Netty 中是如何实现的。

本文作为 Reactor 在 Netty 中实现系列文章中的开篇文章,笔者先来为大家介绍 Reactor 的骨架是如何创立进去的。

在上篇文章中咱们提到 Netty 采纳的是 主从 Reactor 多线程 的模型,然而它在实现上又与 Doug Lea 在 Scalable IO in Java 论文中提到的经典 主从 Reactor 多线程模型 有所差别。

Netty 中的 Reactor 是以 Group 的模式呈现的,主从 Reactor在 Netty 中就是 主从 Reactor 组 ,每个Reactor Group 中会有多个 Reactor 用来执行具体的 IO 工作。当然在 netty 中Reactor 不只用来执行IO 工作,这个咱们前面再说。

  • Main Reactor Group中的 Reactor 数量取决于服务端要监听的端口个数,通常咱们的服务端程序只会监听一个端口,所以 Main Reactor Group 只会有一个 Main Reactor 线程来解决最重要的事件:绑定端口地址 接管客户端连贯 为客户端创立对应的 SocketChannel将客户端 SocketChannel 调配给一个固定的 Sub Reactor。也就是上篇文章笔者为大家举的例子,饭店最重要的工作就是先把客人迎接进来。“我家大门常关上,凋谢怀抱等你,拥抱过就有了默契你会爱上这里 ……”
  • Sub Reactor Group里有多个 Reactor 线程,Reactor线程的个数能够通过零碎参数 -D io.netty.eventLoopThreads 指定。默认的 Reactor 的个数为 CPU 核数 * 2Sub Reactor 线程次要用来 轮询客户端 SocketChannel 上的 IO 就绪事件 解决 IO 就绪事件 执行异步工作 Sub Reactor Group 做的事件就是上篇饭店例子中服务员的工作,客人进来了要为客人调配座位,端茶送水,做菜上菜。“不论远近都是客人,请不必客气,相约好了在一起,咱们欢迎您 ……”

一个 客户端 SocketChannel只能调配给一个固定的 Sub Reactor。一个Sub Reactor 负责解决多个 客户端 SocketChannel,这样能够将服务端承载的 全量客户端连贯 摊派到多个 Sub Reactor 中解决,同时也能保障 客户端 SocketChannel 上的 IO 解决的线程安全性

因为文章篇幅的关系,作为 Reactor 在 netty 中实现的第一篇咱们次要来介绍 主从 Reactor Group的创立流程,骨架脉络先搭好。

上面咱们来看一段 Netty 服务端代码的编写模板,从代码模板的流程中咱们来解析下主从 Reactor 的创立流程以及在这个过程中所波及到的 Netty 外围类。

Netty 服务端代码模板

/**
 * Echoes back any received data from a client.
 */
public final class EchoServer {static final int PORT = Integer.parseInt(System.getProperty("port", "8007"));

    public static void main(String[] args) throws Exception {
        // Configure the server.
        // 创立主从 Reactor 线程组
        EventLoopGroup bossGroup = new NioEventLoopGroup(1);
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        final EchoServerHandler serverHandler = new EchoServerHandler();
        try {ServerBootstrap b = new ServerBootstrap();
            b.group(bossGroup, workerGroup)// 配置主从 Reactor
             .channel(NioServerSocketChannel.class)// 配置主 Reactor 中的 channel 类型
             .option(ChannelOption.SO_BACKLOG, 100)// 设置主 Reactor 中 channel 的 option 选项
             .handler(new LoggingHandler(LogLevel.INFO))// 设置主 Reactor 中 Channel->pipline->handler
             .childHandler(new ChannelInitializer<SocketChannel>() {// 设置从 Reactor 中注册 channel 的 pipeline
                 @Override
                 public void initChannel(SocketChannel ch) throws Exception {ChannelPipeline p = ch.pipeline();
                     //p.addLast(new LoggingHandler(LogLevel.INFO));
                     p.addLast(serverHandler);
                 }
             });

            // Start the server. 绑定端口启动服务,开始监听 accept 事件
            ChannelFuture f = b.bind(PORT).sync();
            // Wait until the server socket is closed.
            f.channel().closeFuture().sync();} finally {
            // Shut down all event loops to terminate all threads.
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();}
    }
}

  1. 首先咱们要创立 Netty 最外围的局部 -> 创立主从 Reactor Group,在 Netty 中 EventLoopGroup 就是 Reactor Group 的实现类。对应的 EventLoop 就是 Reactor 的实现类。
  // 创立主从 Reactor 线程组
  EventLoopGroup bossGroup = new NioEventLoopGroup(1);
  EventLoopGroup workerGroup = new NioEventLoopGroup();
  1. 创立用于 IO 解决ChannelHandler,实现相应 IO 事件 的回调函数,编写对应的 IO 解决 逻辑。留神这里只是简略示例哈,具体的 IO 事件处理,笔者会独自开一篇文章专门讲述。
final EchoServerHandler serverHandler = new EchoServerHandler();

/**
 * Handler implementation for the echo server.
 */
@Sharable
public class EchoServerHandler extends ChannelInboundHandlerAdapter {

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) {
        ................ 省略 IO 解决逻辑................
        ctx.write(msg);
    }

    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) {ctx.flush();
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        // Close the connection when an exception is raised.
        cause.printStackTrace();
        ctx.close();}
}
  1. 创立ServerBootstrap Netty 服务端启动类,并在启动类中配置启动 Netty 服务端所须要的一些必备信息。

    • 通过 serverBootstrap.group(bossGroup, workerGroup) 为 Netty 服务端配置 主从 Reactor Group实例。
    • 通过 serverBootstrap.channel(NioServerSocketChannel.class) 配置 Netty 服务端的 ServerSocketChannel 用于 绑定端口地址 以及 创立客户端 SocketChannel。Netty 中的 NioServerSocketChannel.class 就是对 JDK NIO 中 ServerSocketChannel 的封装。而用于示意 客户端连贯 NioSocketChannel是对 JDK NIO SocketChannel封装。

      在上篇文章介绍 Socket 内核构造 大节中咱们提到,在编写服务端网络程序时,咱们首先要创立一个 Socket 用于 listen 和 bind 端口地址,咱们把这个叫做 监听 Socket, 这里对应的就是 NioServerSocketChannel.class。当客户端连贯实现三次握手,零碎调用accept 函数会基于 监听 Socket创立进去一个 新的 Socket专门用于与客户端之间的网络通信咱们称为 客户端连贯 Socket, 这里对应的就是NioSocketChannel.class

    • serverBootstrap.option(ChannelOption.SO_BACKLOG, 100)设置服务端 ServerSocketChannel 中的 SocketOption。对于SocketOption 的选项咱们后边的文章再聊,本文次要聚焦在 Netty Main Reactor Group的创立及工作流程。
    • serverBootstrap.handler(....)设置服务端 NioServerSocketChannel 中对应 Pipieline 中的ChannelHandler

      netty 有两种 Channel 类型:一种是服务端用于监听绑定端口地址的NioServerSocketChannel, 一种是用于客户端通信的NioSocketChannel。每种Channel 类型实例 都会对应一个 PipeLine 用于编排 对应 channel 实例 上的 IO 事件处理逻辑。PipeLine中组织的就是 ChannelHandler 用于编写特定的 IO 解决逻辑。

      留神 serverBootstrap.handler 设置的是服务端 NioServerSocketChannel PipeLine 中的ChannelHandler

    • serverBootstrap.childHandler(ChannelHandler childHandler)用于设置客户端 NioSocketChannel 中对应 Pipieline 中的ChannelHandler。咱们通常配置的编码解码器就是在这里。

      ServerBootstrap 启动类办法带有 child 前缀的均是设置客户端 NioSocketChannel 属性的。

      ChannelInitializer 是用于当 SocketChannel 胜利注册到绑定的 Reactor 上后,用于初始化该 SocketChannelPipeline。它的 initChannel 办法会在注册胜利后执行。这里只是捎带提一下,让大家有个初步印象,前面我会专门介绍。

  2. ChannelFuture f = serverBootstrap.bind(PORT).sync()这一步会是下篇文章要重点剖析的主题 Main Reactor Group 的启动,绑定端口地址,开始监听客户端连贯事件(OP_ACCEPT)。本文咱们只关注创立流程。
  3. f.channel().closeFuture().sync()期待服务端 NioServerSocketChannel 敞开。Netty 服务端到这里正式启动,并筹备好承受客户端连贯的筹备。
  4. shutdownGracefully优雅敞开 主从 Reactor 线程组 里的所有Reactor 线程

Netty 对 IO 模型的反对

在上篇文章中咱们介绍了五种 IO 模型,Netty 中反对BIO,NIO,AIO 以及多种操作系统下的 IO 多路复用技术 实现。

在 Netty 中切换这几种 IO 模型 也是十分的不便,上面咱们来看下 Netty 如何对这几种 IO 模型进行反对的。

首先咱们介绍下几个与 IO 模型 相干的重要接口:

EventLoop

EventLoop就是 Netty 中的 Reactor,能够说它就是 Netty 的引擎,负责 Channel 上IO 就绪事件的监听IO 就绪事件的解决 异步工作的执行 驱动着整个 Netty 的运行。

不同 IO 模型 下,EventLoop有着不同的实现,咱们只须要切换不同的实现类就能够实现对 NettyIO 模型 的切换。

BIO NIO AIO
ThreadPerChannelEventLoop NioEventLoop AioEventLoop

NIO 模型 下 Netty 会 主动 依据操作系统以及版本的不同抉择对应的IO 多路复用技术实现。比方 Linux 2.6 版本以上用的是Epoll,2.6 版本以下用的是Poll,Mac 下采纳的是Kqueue

其中 Linux kernel 在 5.1 版本引入的异步 IO 库 io_uring 正在 netty 中孵化。

EventLoopGroup

Netty 中的 Reactor 是以 Group 的模式呈现的,EventLoopGroup正是 Reactor 组 的接口定义,负责管理 Reactor,Netty 中的Channel 就是通过 EventLoopGroup 注册到具体的 Reactor 上的。

Netty 的 IO 线程模型是 主从 Reactor 多线程模型 主从 Reactor 线程组 在 Netty 源码中对应的其实就是两个 EventLoopGroup 实例。

不同的 IO 模型 也有对应的实现:

BIO NIO AIO
ThreadPerChannelEventLoopGroup NioEventLoopGroup AioEventLoopGroup

ServerSocketChannel

用于 Netty 服务端应用的 ServerSocketChannel,对应于上篇文章提到的 监听 Socket,负责绑定监听端口地址,接管客户端连贯并创立用于与客户端通信的SocketChannel

不同的 IO 模型 下的实现:

BIO NIO AIO
OioServerSocketChannel NioServerSocketChannel AioServerSocketChannel

SocketChannel

用于与客户端通信的 SocketChannel,对应于上篇文章提到的 客户端连贯 Socket,当客户端实现三次握手后,由零碎调用 accept 函数依据 监听 Socket创立。

不同的 IO 模型 下的实现:

BIO NIO AIO
OioSocketChannel NioSocketChannel AioSocketChannel

咱们看到在 不同 IO 模型 的实现中,Netty 这些围绕 IO 模型 的外围类只是前缀的不同:

  • BIO 对应的前缀为 Oio 示意old io,当初曾经废除不举荐应用。
  • NIO 对应的前缀为 Nio,正是 Netty 举荐也是咱们罕用的 非阻塞 IO 模型
  • AIO 对应的前缀为 Aio,因为 Linux 下的 异步 IO机制实现的并不成熟,性能晋升体现上也不显著,现已被删除。

咱们只须要将 IO 模型 的这些外围接口对应的实现类 前缀 改为对应 IO 模型 的前缀,就能够轻松在 Netty 中实现对 IO 模型 的切换。

多种 NIO 的实现

Common Linux Mac
NioEventLoopGroup EpollEventLoopGroup KQueueEventLoopGroup
NioEventLoop EpollEventLoop KQueueEventLoop
NioServerSocketChannel EpollServerSocketChannel KQueueServerSocketChannel
NioSocketChannel EpollSocketChannel KQueueSocketChannel

咱们通常在应用 NIO 模型 的时候会应用 Common 列 下的这些 IO 模型 外围类,Common 类 也会依据操作系统的不同主动抉择 JDK 在对应平台下的 IO 多路复用技术 的实现。

而 Netty 本身也依据操作系统的不同提供了本人对 IO 多路复用技术 的实现,比 JDK 的实现性能更优。比方:

  • JDK 的 NIO 默认 实现是 程度触发 ,Netty 是 边缘触发 (默认) 和程度触发可切换。。
  • Netty 实现的垃圾回收更少、性能更好。

咱们编写 Netty 服务端程序的时候也能够依据操作系统的不同,采纳 Netty 本身的实现来进一步优化程序。做法也很简略,间接将上图中红框里的实现类替换成 Netty 的本身实现类即可实现切换。


通过以上对 Netty 服务端代码编写模板以及 IO 模型 相干外围类的简略介绍,咱们对 Netty 的创立流程有了一个简略粗略的总体意识,上面咱们来深刻分析下创立流程过程中的每一个步骤以及这个过程中波及到的外围类实现。

以下源码解析局部咱们均采纳 Common 列NIO相干的实现进行解析。

创立主从 Reactor 线程组

在 Netty 服务端程序编写模板的开始,咱们首先会创立两个 Reactor 线程组:

  • 一个是主 Reactor 线程组 bossGroup 用于监听客户端连贯,创立客户端连贯 NioSocketChannel,并将创立好的客户端连贯NioSocketChannel 注册到从 Reactor 线程组中一个固定的 Reactor 上。
  • 一个是从 Reactor 线程组 workerGroupworkerGroup 中的 Reactor 负责监听绑定在其上的客户端连贯 NioSocketChannel 上的 IO 就绪事件,并解决IO 就绪事件 执行异步工作
  // 创立主从 Reactor 线程组
  EventLoopGroup bossGroup = new NioEventLoopGroup(1);
  EventLoopGroup workerGroup = new NioEventLoopGroup();

Netty 中 Reactor 线程组的实现类为 NioEventLoopGroup,在创立bossGroupworkerGroup的时候用到了 NioEventLoopGroup 的两个构造函数:

  • nThreads 参数的构造函数public NioEventLoopGroup(int nThreads)
  • 不带 nThreads 参数的 默认 构造函数public NioEventLoopGroup()
public class NioEventLoopGroup extends MultithreadEventLoopGroup {

    /**
     * Create a new instance using the default number of threads, the default {@link ThreadFactory} and
     * the {@link SelectorProvider} which is returned by {@link SelectorProvider#provider()}.
     */
    public NioEventLoopGroup() {this(0);
    }

    /**
     * Create a new instance using the specified number of threads, {@link ThreadFactory} and the
     * {@link SelectorProvider} which is returned by {@link SelectorProvider#provider()}.
     */
    public NioEventLoopGroup(int nThreads) {this(nThreads, (Executor) null);
    }

    ...................... 省略...........................
}

nThreads参数示意以后要创立的 Reactor 线程组 内蕴含多少个 Reactor 线程。不指定nThreads 参数的话采纳默认的 Reactor 线程 个数,用 0 示意。

最终会调用到构造函数

    public NioEventLoopGroup(int nThreads, Executor executor, final SelectorProvider selectorProvider,
                             final SelectStrategyFactory selectStrategyFactory) {super(nThreads, executor, selectorProvider, selectStrategyFactory, RejectedExecutionHandlers.reject());
    }

上面简略介绍下构造函数中这几个参数的作用,前面咱们在解说本文主线的过程中还会提及这几个参数,到时在具体介绍,这里只是让大家有个初步印象,不用做过多的纠缠。

  • Executor executor:负责启动 Reactor 线程 进而 Reactor 才能够开始工作。

    Reactor 线程组 NioEventLoopGroup 负责创立 Reactor 线程,在创立的时候会将executor 传入。

  • RejectedExecutionHandler: 当向 Reactor 增加异步工作增加失败时,采纳的回绝策略。Reactor 的工作不只是监听 IO 沉闷事件和 IO 工作的解决,还包含对异步工作的解决。这里大家只需有个这样的概念,前面笔者会专门具体介绍。
  • SelectorProvider selectorProvider: Reactor 中的 IO 模型为 IO 多路复用模型,对应于 JDK NIO 中的实现为java.nio.channels.Selector(就是咱们上篇文章中提到的select,poll,epoll),每个 Reator 中都蕴含一个Selector,用于 轮询 注册在该 Reactor 上的所有 Channel 上的 IO 事件SelectorProvider 就是用来创立 Selector 的。
  • SelectStrategyFactory selectStrategyFactory: Reactor 最重要的事件就是 轮询 注册其上的 Channel 上的 IO 就绪事件,这里的SelectStrategyFactory 用于指定 轮询策略,默认为DefaultSelectStrategyFactory.INSTANCE

最终会将这些参数交给 NioEventLoopGroup 的父类结构器,上面咱们来看下 NioEventLoopGroup 类 的继承构造:

NioEventLoopGroup 类 的继承构造乍一看比较复杂,大家不要慌,笔者会随着主线的深刻缓缓地介绍这些父类接口,咱们当初重点关注 Mutithread 前缀的类。

咱们晓得 NioEventLoopGroup 是 Netty 中的 Reactor 线程组 的实现,既然是线程组那么必定是负责管理和创立 多个 Reactor 线程的 ,所以Mutithread 前缀的类定义的行为天然是对 Reactor 线程组 内多个 Reactor 线程 的创立和管理工作。

MultithreadEventLoopGroup

public abstract class MultithreadEventLoopGroup extends MultithreadEventExecutorGroup implements EventLoopGroup {private static final InternalLogger logger = InternalLoggerFactory.getInstance(MultithreadEventLoopGroup.class);
    // 默认 Reactor 个数
    private static final int DEFAULT_EVENT_LOOP_THREADS;

    static {
        DEFAULT_EVENT_LOOP_THREADS = Math.max(1, SystemPropertyUtil.getInt("io.netty.eventLoopThreads", NettyRuntime.availableProcessors() * 2));

        if (logger.isDebugEnabled()) {logger.debug("-Dio.netty.eventLoopThreads: {}", DEFAULT_EVENT_LOOP_THREADS);
        }
    }

    /**
     * @see MultithreadEventExecutorGroup#MultithreadEventExecutorGroup(int, Executor, Object...)
     */
    protected MultithreadEventLoopGroup(int nThreads, Executor executor, Object... args) {super(nThreads == 0 ? DEFAULT_EVENT_LOOP_THREADS : nThreads, executor, args);
    }

    ................... 省略.....................
}

MultithreadEventLoopGroup 类 次要的性能就是用来确定 Reactor 线程组Reactor的个数。

默认的 Reactor 的个数寄存于字段 DEFAULT_EVENT_LOOP_THREADS 中。

static {} 动态代码块中咱们能够看出默认 Reactor 的个数的获取逻辑:

  • 能够通过零碎变量 -D io.netty.eventLoopThreads"指定。
  • 如果不指定,那么默认的就是NettyRuntime.availableProcessors() * 2

nThread 参数设置为 0 采纳默认设置时,Reactor 线程组 内的 Reactor 个数则设置为DEFAULT_EVENT_LOOP_THREADS

MultithreadEventExecutorGroup

MultithreadEventExecutorGroup这里就是本大节的外围,次要用来定义创立和治理 Reactor 的行为。

public abstract class MultithreadEventExecutorGroup extends AbstractEventExecutorGroup {

    //Reactor 线程组中的 Reactor 汇合
    private final EventExecutor[] children;
    private final Set<EventExecutor> readonlyChildren;
    // 从 Reactor group 中抉择一个特定的 Reactor 的抉择策略 用于 channel 注册绑定到一个固定的 Reactor 上
    private final EventExecutorChooserFactory.EventExecutorChooser chooser;

    /**
     * Create a new instance.
     *
     * @param nThreads          the number of threads that will be used by this instance.
     * @param executor          the Executor to use, or {@code null} if the default should be used.
     * @param args              arguments which will passed to each {@link #newChild(Executor, Object...)} call
     */
    protected MultithreadEventExecutorGroup(int nThreads, Executor executor, Object... args) {this(nThreads, executor, DefaultEventExecutorChooserFactory.INSTANCE, args);
    }

    ............................ 省略................................
}

首先介绍一个新的结构器参数 EventExecutorChooserFactory chooserFactory。当客户端连贯实现三次握手后,Main Reactor 会创立客户端连贯 NioSocketChannel,并将其绑定到Sub Reactor Group 中的一个固定 Reactor,那么具体要绑定到哪个具体的Sub Reactor 上呢?这个绑定策略就是由 chooserFactory 来创立的。默认为DefaultEventExecutorChooserFactory

上面就是本大节的主题 Reactor 线程组 的创立过程:

    protected MultithreadEventExecutorGroup(int nThreads, Executor executor,
                                            EventExecutorChooserFactory chooserFactory, Object... args) {if (nThreads <= 0) {throw new IllegalArgumentException(String.format("nThreads: %d (expected: > 0)", nThreads));
        }

        if (executor == null) {
            // 用于创立 Reactor 线程
            executor = new ThreadPerTaskExecutor(newDefaultThreadFactory());
        }

        children = new EventExecutor[nThreads];
        // 循环创立 reaactor group 中的 Reactor
        for (int i = 0; i < nThreads; i ++) {
            boolean success = false;
            try {
                // 创立 reactor
                children[i] = newChild(executor, args);
                success = true;
            } catch (Exception e) {throw new IllegalStateException("failed to create a child event loop", e);
            } finally {................ 省略................}
            }
        }
        // 创立 channel 到 Reactor 的绑定策略
        chooser = chooserFactory.newChooser(children);

         ................ 省略................

        Set<EventExecutor> childrenSet = new LinkedHashSet<EventExecutor>(children.length);
        Collections.addAll(childrenSet, children);
        readonlyChildren = Collections.unmodifiableSet(childrenSet);
    }

1. 创立用于启动 Reactor 线程的 executor

在 Netty Reactor Group 中的单个 ReactorIO 线程模型 为上篇文章提到的 单 Reactor 单线程模型 ,一个Reactor 线程 负责 轮询 注册其上的所有 Channel 中的 IO 就绪事件,解决 IO 事件,执行 Netty 中的异步工作等工作。正是这个Reactor 线程 驱动着整个 Netty 的运行,堪称是 Netty 的外围引擎。

而这里的 executor 就是负责启动 Reactor 线程 的,从创立源码中咱们能够看到 executor 的类型为ThreadPerTaskExecutor

ThreadPerTaskExecutor

public final class ThreadPerTaskExecutor implements Executor {
    private final ThreadFactory threadFactory;

    public ThreadPerTaskExecutor(ThreadFactory threadFactory) {this.threadFactory = ObjectUtil.checkNotNull(threadFactory, "threadFactory");
    }

    @Override
    public void execute(Runnable command) {threadFactory.newThread(command).start();}
}

咱们看到 ThreadPerTaskExecutor 做的事件很简略,从它的命名前缀 ThreadPerTask 咱们就能够猜出它的工作形式,就是来一个工作就创立一个线程执行。而创立的这个线程正是 netty 的外围引擎 Reactor 线程。

Reactor 线程 启动的时候,Netty 会将 Reactor 线程 要做的事件封装成 Runnable,丢给exexutor 启动。

Reactor 线程 的外围就是一个 死循环 不停的 轮询IO 就绪事件,解决 IO 事件,执行异步工作。一刻也不停歇,堪称996 榜样

这里向大家先卖个关子,"Reactor 线程是何时启动的呢??"

2. 创立 Reactor

Reactor 线程组 NioEventLoopGroup蕴含多个 Reactor,寄存于private final EventExecutor[] children 数组中。

所以上面的事件就是创立 nThreadReactor,并寄存于 EventExecutor[] children 字段中,

咱们来看下用于创立 ReactornewChild(executor, args)办法:

newChild

newChild办法是 MultithreadEventExecutorGroup 中的一个形象办法,提供给具体子类实现。

protected abstract EventExecutor newChild(Executor executor, Object... args) throws Exception;

这里咱们解析的是 NioEventLoopGroup,咱们来看下newChild 在该类中的实现:

public class NioEventLoopGroup extends MultithreadEventLoopGroup {
    @Override
    protected EventLoop newChild(Executor executor, Object... args) throws Exception {EventLoopTaskQueueFactory queueFactory = args.length == 4 ? (EventLoopTaskQueueFactory) args[3] : null;
        return new NioEventLoop(this, executor, (SelectorProvider) args[0],
            ((SelectStrategyFactory) args[1]).newSelectStrategy(), (RejectedExecutionHandler) args[2], queueFactory);
    }
}

前边提到的泛滥结构器参数,这里会通过可变参数 Object... args 传入到 Reactor 类 NioEventLoop 的结构器中。

这里介绍下新的参数 EventLoopTaskQueueFactory queueFactory,前边提到 Netty 中的Reactor 次要工作是 轮询 注册其上的所有 Channel 上的 IO 就绪事件,解决IO 就绪事件。除了这些次要的工作外,Netty 为了极致的压迫Reactor 的性能,还会让它做一些异步工作的执行工作。既然要执行异步工作,那么 Reactor 中就须要一个 队列 来保留工作。

这里的 EventLoopTaskQueueFactory 就是用来创立这样的一个队列来保留 Reactor 中待执行的异步工作。

能够把 Reactor 了解成为一个 单线程的线程池 相似 JDK中的 SingleThreadExecutor,仅用一个线程来执行 轮询 IO 就绪事件 解决 IO 就绪事件 执行异步工作 。同时待执行的异步工作保留在Reactor 里的 taskQueue 中。

NioEventLoop

public final class NioEventLoop extends SingleThreadEventLoop {
    // 用于创立 JDK NIO Selector,ServerSocketChannel
    private final SelectorProvider provider;
    //Selector 轮询策略 决定什么时候轮询,什么时候解决 IO 事件,什么时候执行异步工作
    private final SelectStrategy selectStrategy;
    /**
     * The NIO {@link Selector}.
     */
    private Selector selector;
    private Selector unwrappedSelector;

    NioEventLoop(NioEventLoopGroup parent, Executor executor, SelectorProvider selectorProvider,
                 SelectStrategy strategy, RejectedExecutionHandler rejectedExecutionHandler,
                 EventLoopTaskQueueFactory queueFactory) {super(parent, executor, false, newTaskQueue(queueFactory), newTaskQueue(queueFactory),
                rejectedExecutionHandler);
        this.provider = ObjectUtil.checkNotNull(selectorProvider, "selectorProvider");
        this.selectStrategy = ObjectUtil.checkNotNull(strategy, "selectStrategy");
        final SelectorTuple selectorTuple = openSelector();
        this.selector = selectorTuple.selector;
        this.unwrappedSelector = selectorTuple.unwrappedSelector;
    }
}

这里就正式开始了 Reactor 的创立过程,咱们晓得 Reactor 的外围是采纳的 IO 多路复用模型 来对客户端连贯上的 IO 事件 进行 监听,所以最重要的事件是创立Selector(JDK NIO 中 IO 多路复用技术的实现)。

能够把 Selector 了解为咱们上篇文章介绍的 Select,poll,epoll,它是JDK NIO 对操作系统内核提供的这些 IO 多路复用技术 的封装。

openSelector

openSelectorNioEventLoop 类 中用于创立 IO 多路复用Selector,并对创立进去的 JDK NIO 原生的Selector 进行性能优化。

首先会通过 SelectorProvider#openSelector 创立 JDK NIO 原生的Selector

 private SelectorTuple openSelector() {
        final Selector unwrappedSelector;
        try {
            // 通过 JDK NIO SelectorProvider 创立 Selector
            unwrappedSelector = provider.openSelector();} catch (IOException e) {throw new ChannelException("failed to open a new selector", e);
        }

        .................. 省略.............
}

SelectorProvider会依据操作系统的不同抉择 JDK 在不同操作系统版本下的对应 Selector 的实现。Linux 下会抉择Epoll,Mac 下会抉择Kqueue

上面咱们就来看下 SelectorProvider 是如何做到主动适配不同操作系统下 IO 多路复用 实现的

SelectorProvider

    public NioEventLoopGroup(ThreadFactory threadFactory) {this(0, threadFactory, SelectorProvider.provider());
    }

SelectorProvider是在后面介绍的 NioEventLoopGroup 类 构造函数中通过调用 SelectorProvider.provider() 被加载,并通过 NioEventLoopGroup#newChild 办法中的可变长参数 Object... args 传递到 NioEventLoop 中的 private final SelectorProvider provider 字段中。

SelectorProvider 的加载过程:

public abstract class SelectorProvider {public static SelectorProvider provider() {synchronized (lock) {if (provider != null)
                return provider;
            return AccessController.doPrivileged(new PrivilegedAction<SelectorProvider>() {public SelectorProvider run() {if (loadProviderFromProperty())
                                return provider;
                            if (loadProviderAsService())
                                return provider;
                            provider = sun.nio.ch.DefaultSelectorProvider.create();
                            return provider;
                        }
                    });
        }
    }
}

SelectorProvider 加载源码中咱们能够看出,SelectorProvider的加载形式有三种,优先级如下:

  1. 通过零碎变量 -D java.nio.channels.spi.SelectorProvider 指定 SelectorProvider 的自定义实现类 全限定名 。通过 应用程序类加载器 (Application Classloader) 加载。
    private static boolean loadProviderFromProperty() {String cn = System.getProperty("java.nio.channels.spi.SelectorProvider");
        if (cn == null)
            return false;
        try {
            Class<?> c = Class.forName(cn, true,
                                       ClassLoader.getSystemClassLoader());
            provider = (SelectorProvider)c.newInstance();
            return true;
        } 
        ................. 省略.............
    }
  1. 通过 SPI 形式加载。在工程目录 META-INF/services 下定义名为 java.nio.channels.spi.SelectorProviderSPI 文件 ,文件中第一个定义的SelectorProvider 实现类全限定名就会被加载。
    private static boolean loadProviderAsService() {

        ServiceLoader<SelectorProvider> sl =
            ServiceLoader.load(SelectorProvider.class,
                               ClassLoader.getSystemClassLoader());
        Iterator<SelectorProvider> i = sl.iterator();
        for (;;) {
            try {if (!i.hasNext())
                    return false;
                provider = i.next();
                return true;
            } catch (ServiceConfigurationError sce) {if (sce.getCause() instanceof SecurityException) {
                    // Ignore the security exception, try the next provider
                    continue;
                }
                throw sce;
            }
        }
    }
  1. 如果以上两种形式均未被定义,那么就采纳 SelectorProvider 零碎默认实现 sun.nio.ch.DefaultSelectorProvider。笔者以后应用的操作系统是MacOS,从源码中咱们能够看到主动适配了KQueue 实现。
public class DefaultSelectorProvider {private DefaultSelectorProvider() { }

    public static SelectorProvider create() {return new KQueueSelectorProvider();
    }
}

不同操作系统中 JDK 对于 DefaultSelectorProvider 会有所不同,Linux 内核版本 2.6 以上对应的Epoll,Linux 内核版本 2.6 以下对应的Poll,MacOS 对应的是KQueue

上面咱们接着回到 io.netty.channel.nio.NioEventLoop#openSelector 的主线上来。

Netty 对 JDK NIO 原生 Selector 的优化

首先在 NioEventLoop 中有一个 Selector 优化开关 DISABLE_KEY_SET_OPTIMIZATION, 通过零碎变量-D io.netty.noKeySetOptimization 指定,默认是开启的,示意须要对 JDK NIO 原生 Selector 进行优化。

public final class NioEventLoop extends SingleThreadEventLoop {
   //Selector 优化开关 默认开启 为了遍历的效率 会对 Selector 中的 SelectedKeys 进行数据结构优化
    private static final boolean DISABLE_KEY_SET_OPTIMIZATION =
            SystemPropertyUtil.getBoolean("io.netty.noKeySetOptimization", false);
}

如果优化开关 DISABLE_KEY_SET_OPTIMIZATION 是敞开的,那么间接返回 JDK NIO 原生的Selector

private SelectorTuple openSelector() {
        ..........SelectorProvider 创立 JDK NIO  原生 Selector..............

        if (DISABLE_KEY_SET_OPTIMIZATION) {
            //JDK NIO 原生 Selector,Selector 优化开关 默认开启须要对 Selector 进行优化
            return new SelectorTuple(unwrappedSelector);
        }
}

上面为 Netty 对 JDK NIO 原生的 Selector 的优化过程:

  1. 获取 JDK NIO 原生 Selector 的形象实现类 sun.nio.ch.SelectorImplJDK NIO 原生 Selector 的实现均继承于该抽象类。用于判断由 SelectorProvider 创立进去的 Selector 是否为 JDK 默认实现SelectorProvider 第三种加载形式)。因为 SelectorProvider 能够是自定义加载,所以它创立进去的 Selector 并不一定是 JDK NIO 原生的。
       Object maybeSelectorImplClass = AccessController.doPrivileged(new PrivilegedAction<Object>() {
            @Override
            public Object run() {
                try {
                    return Class.forName(
                            "sun.nio.ch.SelectorImpl",
                            false,
                            PlatformDependent.getSystemClassLoader());
                } catch (Throwable cause) {return cause;}
            }
        });

JDK NIO Selector 的抽象类sun.nio.ch.SelectorImpl

public abstract class SelectorImpl extends AbstractSelector {

    // The set of keys with data ready for an operation
    // //IO 就绪的 SelectionKey(外面包裹着 channel)protected Set<SelectionKey> selectedKeys;

    // The set of keys registered with this Selector
    // 注册在该 Selector 上的所有 SelectionKey(外面包裹着 channel)protected HashSet<SelectionKey> keys;

    // Public views of the key sets
    // 用于向调用线程返回的 keys,不可变
    private Set<SelectionKey> publicKeys;             // Immutable
    // 当有 IO 就绪的 SelectionKey 时,向调用线程返回。只可删除其中元素,不可减少
    private Set<SelectionKey> publicSelectedKeys;     // Removal allowed, but not addition

    protected SelectorImpl(SelectorProvider sp) {super(sp);
        keys = new HashSet<SelectionKey>();
        selectedKeys = new HashSet<SelectionKey>();
        if (Util.atBugLevel("1.4")) {
            publicKeys = keys;
            publicSelectedKeys = selectedKeys;
        } else {
            // 不可变
            publicKeys = Collections.unmodifiableSet(keys);
            // 只可删除其中元素,不可减少
            publicSelectedKeys = Util.ungrowableSet(selectedKeys);
        }
    }
}

这里笔者来简略介绍下 JDK NIO 中的 Selector 中这几个字段的含意,咱们能够和上篇文章讲到的 epoll 在内核中的构造做类比,不便大家后续的了解:

  • Set<SelectionKey> selectedKeys 相似于咱们上篇文章解说 Epoll 时提到的 就绪队列 eventpoll->rdllistSelector这里大家能够了解为 EpollSelector 会将本人监听到的 IO 就绪Channel放到 selectedKeys 中。

这里的 SelectionKey 暂且能够了解为 ChannelSelector中的示意,类比上图中 epitem 构造 里的 epoll_event,封装 IO 就绪 Socket 的信息。
其实 SelectionKey 里蕴含的信息不止是 Channel 还有很多 IO 相干的信息。前面咱们在具体介绍。

  • HashSet<SelectionKey> keys:这里寄存的是所有注册到该 Selector 上的Channel。类比epoll 中的红黑树结构 rb_root

    SelectionKeyChannel 注册到 Selector 中后生成。

  • Set<SelectionKey> publicSelectedKeys 相当于是 selectedKeys 的视图,用于向内部线程返回 IO 就绪SelectionKey。这个汇合在内部线程中只能做删除操作 不可减少元素 ,并且 不是线程平安的
  • Set<SelectionKey> publicKeys相当于 keys 的不可变视图,用于向内部线程返回所有注册在该 Selector 上的SelectionKey

这里须要 重点关注 抽象类 sun.nio.ch.SelectorImpl 中的 selectedKeyspublicSelectedKeys这两个字段,留神它们的类型都是HashSet ,一会优化的就是这里!!!!

  1. 判断由 SelectorProvider 创立进去的 Selector 是否是 JDK NIO 原生的 Selector 实现。因为 Netty 优化针对的是 JDK NIO 原生 Selector。判断规范为sun.nio.ch.SelectorImpl 类是否为 SelectorProvider 创立出 Selector 的父类。如果不是则间接返回。不在持续上面的优化过程。
        // 判断是否能够对 Selector 进行优化,这里次要针对 JDK NIO 原生 Selector 的实现类进行优化,因为 SelectorProvider 能够加载的是自定义 Selector 实现
        // 如果 SelectorProvider 创立的 Selector 不是 JDK 原生 sun.nio.ch.SelectorImpl 的实现类,那么无奈进行优化,间接返回
        if (!(maybeSelectorImplClass instanceof Class) ||
            !((Class<?>) maybeSelectorImplClass).isAssignableFrom(unwrappedSelector.getClass())) {if (maybeSelectorImplClass instanceof Throwable) {Throwable t = (Throwable) maybeSelectorImplClass;
                logger.trace("failed to instrument a special java.util.Set into: {}", unwrappedSelector, t);
            }
            return new SelectorTuple(unwrappedSelector);
        }

通过后面对 SelectorProvider 的介绍咱们晓得,这里通过 provider.openSelector() 创立进去的 Selector 实现类为 KQueueSelectorImpl 类,它继承实现了sun.nio.ch.SelectorImpl,所以它是 JDK NIO 原生的Selector 实现

class KQueueSelectorImpl extends SelectorImpl {}
  1. 创立 SelectedSelectionKeySet 通过反射替换掉 sun.nio.ch.SelectorImpl 类selectedKeys publicSelectedKeys 的默认 HashSet 实现。

为什么要用 SelectedSelectionKeySet 替换掉原来的 HashSet 呢??

因为这里波及到对 HashSet 类型sun.nio.ch.SelectorImpl#selectedKeys汇合的两种操作:

  • 插入操作: 通过前边对 sun.nio.ch.SelectorImpl 类 中字段的介绍咱们晓得,在 Selector 监听到 IO 就绪SelectionKey 后,会将 IO 就绪SelectionKey 插入 sun.nio.ch.SelectorImpl#selectedKeys 汇合中,这时 Reactor 线程 会从 java.nio.channels.Selector#select(long) 阻塞调用中返回(相似上篇文章提到的epoll_wait)。
  • 遍历操作:Reactor 线程 返回后,会从 Selector 中获取 IO 就绪SelectionKey汇合(也就是 sun.nio.ch.SelectorImpl#selectedKeys),Reactor 线程 遍历 selectedKeys, 获取IO 就绪SocketChannel,并解决 SocketChannel 上的IO 事件

咱们都晓得 HashSet 底层数据结构是一个 哈希表 ,因为Hash 抵触 这种状况的存在,所以导致对 哈希表 进行 插入 遍历 操作的性能不如对 数组 进行 插入 遍历 操作的性能好。

还有一个重要起因是,数组能够利用 CPU 缓存的劣势来进步遍历的效率。前面笔者会有一篇专门的文章来讲述利用 CPU 缓存行如何为咱们带来性能劣势。

所以 Netty 为了优化对 sun.nio.ch.SelectorImpl#selectedKeys 汇合的 插入,遍历 性能,本人用 数组 这种数据结构实现了 SelectedSelectionKeySet ,用它来替换原来的HashSet 实现。

SelectedSelectionKeySet

  • 初始化 SelectionKey[] keys 数组大小为1024,当数组容量不够时,扩容为原来的两倍大小。
  • 通过数组尾部指针 size,在向数组插入元素的时候能够间接定位到插入地位keys[size++]。操作一步到位,不必像 哈希表 那样还须要解决Hash 抵触
  • 对数组的遍历操作也是如丝般顺滑,CPU 间接能够在缓存行中遍历读取数组元素无需拜访内存。比 HashSet 的迭代器java.util.HashMap.KeyIterator 遍历形式性能不知高到哪里去了。
final class SelectedSelectionKeySet extends AbstractSet<SelectionKey> {

    // 采纳数组替换到 JDK 中的 HashSet, 这样 add 操作和遍历操作效率更高,不须要思考 hash 抵触
    SelectionKey[] keys;
    // 数组尾部指针
    int size;

    SelectedSelectionKeySet() {keys = new SelectionKey[1024];
    }

    /**
     * 数组的增加效率高于 HashSet 因为不须要思考 hash 抵触
     * */
    @Override
    public boolean add(SelectionKey o) {if (o == null) {return false;}
        // 工夫复杂度 O(1)keys[size++] = o;
        if (size == keys.length) {
            // 扩容为原来的两倍大小
            increaseCapacity();}

        return true;
    }

    private void increaseCapacity() {SelectionKey[] newKeys = new SelectionKey[keys.length << 1];
        System.arraycopy(keys, 0, newKeys, 0, size);
        keys = newKeys;
    }

    /**
     * 采纳数组的遍历效率 高于 HashSet
     * */
    @Override
    public Iterator<SelectionKey> iterator() {return new Iterator<SelectionKey>() {
            private int idx;

            @Override
            public boolean hasNext() {return idx < size;}

            @Override
            public SelectionKey next() {if (!hasNext()) {throw new NoSuchElementException();
                }
                return keys[idx++];
            }

            @Override
            public void remove() {throw new UnsupportedOperationException();
            }
        };
    }
}

看到这里不禁感叹,从各种小的细节能够看出 Netty 对性能的优化几乎酣畅淋漓,对性能的谋求令人发指。细节真的是魔鬼。

  1. Netty 通过反射的形式用 SelectedSelectionKeySet 替换掉 sun.nio.ch.SelectorImpl#selectedKeyssun.nio.ch.SelectorImpl#publicSelectedKeys 这两个汇合中原来 HashSet 的实现。
  • 反射获取 sun.nio.ch.SelectorImpl 类中 selectedKeyspublicSelectedKeys
  Field selectedKeysField = selectorImplClass.getDeclaredField("selectedKeys");
  Field publicSelectedKeysField = selectorImplClass.getDeclaredField("publicSelectedKeys");
  • Java9版本以上通过 sun.misc.Unsafe 设置字段值的形式
       if (PlatformDependent.javaVersion() >= 9 && PlatformDependent.hasUnsafe()) {long selectedKeysFieldOffset = PlatformDependent.objectFieldOffset(selectedKeysField);
                        long publicSelectedKeysFieldOffset =
                                PlatformDependent.objectFieldOffset(publicSelectedKeysField);

                        if (selectedKeysFieldOffset != -1 && publicSelectedKeysFieldOffset != -1) {
                            PlatformDependent.putObject(unwrappedSelector, selectedKeysFieldOffset, selectedKeySet);
                            PlatformDependent.putObject(unwrappedSelector, publicSelectedKeysFieldOffset, selectedKeySet);
                            return null;
                        }
                        
                    }
  • 通过反射的形式用 SelectedSelectionKeySet 替换掉 hashSet 实现的sun.nio.ch.SelectorImpl#selectedKeys,sun.nio.ch.SelectorImpl#publicSelectedKeys
          Throwable cause = ReflectionUtil.trySetAccessible(selectedKeysField, true);
          if (cause != null) {return cause;}
          cause = ReflectionUtil.trySetAccessible(publicSelectedKeysField, true);
          if (cause != null) {return cause;}
          //Java8 反射替换字段
          selectedKeysField.set(unwrappedSelector, selectedKeySet);
          publicSelectedKeysField.set(unwrappedSelector, selectedKeySet);
  1. 将与 sun.nio.ch.SelectorImpl 类中 selectedKeyspublicSelectedKeys关联好的 Netty 优化实现 SelectedSelectionKeySet,设置到io.netty.channel.nio.NioEventLoop#selectedKeys 字段中保留。
   // 会通过反射替换 selector 对象中的 selectedKeySet 保留就绪的 selectKey
    // 该字段为持有 selector 对象 selectedKeys 的援用,当 IO 事件就绪时,间接从这里获取
    private SelectedSelectionKeySet selectedKeys;

后续 Reactor 线程 就会间接从 io.netty.channel.nio.NioEventLoop#selectedKeys 中获取 IO 就绪SocketChannel

  1. SelectorTuple 封装 unwrappedSelectorwrappedSelector返回给 NioEventLoop 构造函数。到此 Reactor 中的 Selector 就创立结束了。
return new SelectorTuple(unwrappedSelector,
                      new SelectedSelectionKeySetSelector(unwrappedSelector, selectedKeySet));
    private static final class SelectorTuple {
        final Selector unwrappedSelector;
        final Selector selector;

        SelectorTuple(Selector unwrappedSelector) {
            this.unwrappedSelector = unwrappedSelector;
            this.selector = unwrappedSelector;
        }

        SelectorTuple(Selector unwrappedSelector, Selector selector) {
            this.unwrappedSelector = unwrappedSelector;
            this.selector = selector;
        }
    }
  • 所谓的 unwrappedSelector 是指被 Netty 优化过的 JDK NIO 原生 Selector。
  • 所谓的 wrappedSelector 就是用 SelectedSelectionKeySetSelector 装璜类将 unwrappedSelector 和与 sun.nio.ch.SelectorImpl 类 关联好的 Netty 优化实现 SelectedSelectionKeySet 封装装璜起来。

wrappedSelector会将所有对 Selector 的操作全副代理给 unwrappedSelector,并在 发动轮询 IO 事件 的相干操作中,重置 SelectedSelectionKeySet 清空上一次的轮询后果。

final class SelectedSelectionKeySetSelector extends Selector {
    //Netty 优化后的 SelectedKey 就绪汇合
    private final SelectedSelectionKeySet selectionKeys;
    // 优化后的 JDK NIO 原生 Selector
    private final Selector delegate;

    SelectedSelectionKeySetSelector(Selector delegate, SelectedSelectionKeySet selectionKeys) {
        this.delegate = delegate;
        this.selectionKeys = selectionKeys;
    }

    @Override
    public boolean isOpen() {return delegate.isOpen();
    }

    @Override
    public SelectorProvider provider() {return delegate.provider();
    }

    @Override
    public Set<SelectionKey> keys() {return delegate.keys();
    }

    @Override
    public Set<SelectionKey> selectedKeys() {return delegate.selectedKeys();
    }

    @Override
    public int selectNow() throws IOException {
        // 重置 SelectedKeys 汇合
        selectionKeys.reset();
        return delegate.selectNow();}

    @Override
    public int select(long timeout) throws IOException {
        // 重置 SelectedKeys 汇合
        selectionKeys.reset();
        return delegate.select(timeout);
    }

    @Override
    public int select() throws IOException {
        // 重置 SelectedKeys 汇合
        selectionKeys.reset();
        return delegate.select();}

    @Override
    public Selector wakeup() {return delegate.wakeup();
    }

    @Override
    public void close() throws IOException {delegate.close();
    }
}

到这里 Reactor 的外围 Selector 就创立好了,上面咱们来看下用于保留异步工作的队列是如何创立进去的。

newTaskQueue

    NioEventLoop(NioEventLoopGroup parent, Executor executor, SelectorProvider selectorProvider,
                 SelectStrategy strategy, RejectedExecutionHandler rejectedExecutionHandler,
                 EventLoopTaskQueueFactory queueFactory) {super(parent, executor, false, newTaskQueue(queueFactory), newTaskQueue(queueFactory),
                rejectedExecutionHandler);
        this.provider = ObjectUtil.checkNotNull(selectorProvider, "selectorProvider");
        this.selectStrategy = ObjectUtil.checkNotNull(strategy, "selectStrategy");
        final SelectorTuple selectorTuple = openSelector();
        // 通过用 SelectedSelectionKeySet 装璜后的 unwrappedSelector
        this.selector = selectorTuple.selector;
        //Netty 优化过的 JDK NIO 近程 Selector
        this.unwrappedSelector = selectorTuple.unwrappedSelector;
    }

咱们持续回到创立 Reactor 的主线上,到目前为止 Reactor 的外围 Selector 就创立好了,前边咱们提到 Reactor 除了须要 监听 IO 就绪事件 以及解决 IO 就绪事件 外,还须要执行一些异步工作,当内部线程向 Reactor 提交异步工作后,Reactor就须要一个队列来保留这些异步工作,期待 Reactor 线程 执行。

上面咱们来看下 Reactor 中工作队列的创立过程:

    // 工作队列大小,默认是无界队列
    protected static final int DEFAULT_MAX_PENDING_TASKS = Math.max(16,
            SystemPropertyUtil.getInt("io.netty.eventLoop.maxPendingTasks", Integer.MAX_VALUE));

    private static Queue<Runnable> newTaskQueue(EventLoopTaskQueueFactory queueFactory) {if (queueFactory == null) {return newTaskQueue0(DEFAULT_MAX_PENDING_TASKS);
        }
        return queueFactory.newTaskQueue(DEFAULT_MAX_PENDING_TASKS);
    }

    private static Queue<Runnable> newTaskQueue0(int maxPendingTasks) {// This event loop never calls takeTask()
        return maxPendingTasks == Integer.MAX_VALUE ? PlatformDependent.<Runnable>newMpscQueue()
                : PlatformDependent.<Runnable>newMpscQueue(maxPendingTasks);
    }  
  • NioEventLoop 的父类 SingleThreadEventLoop 中提供了一个动态变量 DEFAULT_MAX_PENDING_TASKS 用来指定 Reactor 工作队列的大小。能够通过零碎变量 -D io.netty.eventLoop.maxPendingTasks 进行设置,默认为 Integer.MAX_VALUE,示意工作队列默认为 无界队列
  • 依据 DEFAULT_MAX_PENDING_TASKS 变量的设定,来决定创立无界工作队列还是有界工作队列。
    // 创立无界工作队列
    PlatformDependent.<Runnable>newMpscQueue()
    // 创立有界工作队列
    PlatformDependent.<Runnable>newMpscQueue(maxPendingTasks)

    public static <T> Queue<T> newMpscQueue() {return Mpsc.newMpscQueue();
    }

    public static <T> Queue<T> newMpscQueue(final int maxCapacity) {return Mpsc.newMpscQueue(maxCapacity);
    }

Reactor内的异步工作队列的类型为 MpscQueue, 它是由JCTools 提供的一个高性能无锁队列,从命名前缀 Mpsc 能够看出,它实用于 多生产者单消费者 的场景,它反对多个生产者线程平安的拜访队列,同一时刻只容许一个消费者线程读取队列中的元素。

咱们晓得 Netty 中的 Reactor 能够 线程平安 的解决注册其上的多个 SocketChannel 上的 IO 数据,保障Reactor 线程平安 的外围起因正是因为这个 MpscQueue,它能够反对多个业务线程在解决完业务逻辑后,线程平安的向MpscQueue 增加 异步写工作 ,而后由单个Reactor 线程 来执行这些 写工作。既然是单线程执行,那必定是线程平安的了。

Reactor 对应的 NioEventLoop 类型继承构造

NioEventLoop的继承构造也是比较复杂,这里咱们只关注在 Reactor 创立过程中波及的到两个父类SingleThreadEventLoop,SingleThreadEventExecutor

剩下的继承体系,咱们在后边随着 Netty 源码的深刻在缓缓介绍。

前边咱们提到,其实 Reactor 咱们能够看作是一个单线程的线程池,只有一个线程用来执行 IO 就绪事件的监听IO 事件的解决 异步工作的执行 。用MpscQueue 来存储待执行的异步工作。

命名前缀为 SingleThread 的父类都是对 Reactor 这些行为的分层定义。也是本大节要介绍的对象

SingleThreadEventLoop

Reactor负责执行的异步工作分为三类:

  • 一般工作:这是 Netty 最次要执行的异步工作,寄存在一般工作队列 taskQueue 中。在 NioEventLoop 构造函数中创立。
  • 定时工作: 寄存在优先级队列中。后续咱们介绍。
  • 尾部工作: 寄存于尾部工作队列 tailTasks 中,尾部工作个别不罕用,在一般工作执行完后 Reactor 线程会执行尾部工作。应用场景:比方对 Netty 的运行状态做一些统计数据,例如工作循环的耗时、占用物理内存的大小等等都能够向尾部队列增加一个收尾工作实现统计数据的实时更新。

SingleThreadEventLoop 负责对 尾部工作队列 tailTasks进行治理。并且提供 ChannelReactor注册的行为。

public abstract class SingleThreadEventLoop extends SingleThreadEventExecutor implements EventLoop {

    // 工作队列大小,默认是无界队列
    protected static final int DEFAULT_MAX_PENDING_TASKS = Math.max(16,
            SystemPropertyUtil.getInt("io.netty.eventLoop.maxPendingTasks", Integer.MAX_VALUE));
    
    // 尾部工作队列
    private final Queue<Runnable> tailTasks;

    protected SingleThreadEventLoop(EventLoopGroup parent, Executor executor,
                                    boolean addTaskWakesUp, Queue<Runnable> taskQueue, Queue<Runnable> tailTaskQueue,
                                    RejectedExecutionHandler rejectedExecutionHandler) {super(parent, executor, addTaskWakesUp, taskQueue, rejectedExecutionHandler);
        // 尾部队列 执行一些统计工作 不罕用
        tailTasks = ObjectUtil.checkNotNull(tailTaskQueue, "tailTaskQueue");
    }

    @Override
    public ChannelFuture register(Channel channel) {
        // 注册 channel 到绑定的 Reactor 上
        return register(new DefaultChannelPromise(channel, this));
    }
}

SingleThreadEventExecutor

SingleThreadEventExecutor次要负责对 一般工作队列 的治理,以及 异步工作的执行Reactor 线程的启停

public abstract class SingleThreadEventExecutor extends AbstractScheduledEventExecutor implements OrderedEventExecutor {

    protected SingleThreadEventExecutor(EventExecutorGroup parent, Executor executor,
                                        boolean addTaskWakesUp, Queue<Runnable> taskQueue, RejectedExecutionHandler rejectedHandler) {
        //parent 为 Reactor 所属的 NioEventLoopGroup Reactor 线程组
        super(parent);
        // 向 Reactor 增加工作时,是否唤醒 Selector 进行轮询 IO 就绪事件,马上执行异步工作
        this.addTaskWakesUp = addTaskWakesUp;
        //Reactor 异步工作队列的大小
        this.maxPendingTasks = DEFAULT_MAX_PENDING_EXECUTOR_TASKS;
        // 用于启动 Reactor 线程的 executor -> ThreadPerTaskExecutor
        this.executor = ThreadExecutorMap.apply(executor, this);
        // 一般工作队列
        this.taskQueue = ObjectUtil.checkNotNull(taskQueue, "taskQueue");
        // 工作队列满时的回绝策略
        this.rejectedExecutionHandler = ObjectUtil.checkNotNull(rejectedHandler, "rejectedHandler");
    }
}

到当初为止,一个残缺的 Reactor 架构 就被创立进去了。

3. 创立 Channel 到 Reactor 的绑定策略

到这一步,Reactor 线程组 NioEventLoopGroup 里边的所有 Reactor 就曾经全副创立结束。

无论是 Netty 服务端 NioServerSocketChannel 关注的 OP_ACCEPT 事件也好,还是 Netty 客户端 NioSocketChannel 关注的 OP_READOP_WRITE事件也好,都须要先注册到 Reactor 上,Reactor能力监听 Channel 上关注的 IO 事件 实现IO 多路复用

NioEventLoopGroup(Reactor 线程组)里边有泛滥的 Reactor,那么以上提到的这些Channel 到底应该注册到哪个 Reactor 上呢?这就须要一个绑定的策略来平均分配。

还记得咱们前边介绍 MultithreadEventExecutorGroup 类 的时候提到的结构器参数 EventExecutorChooserFactory 吗?

这时候它就派上用场了,它次要用来创立 ChannelReactor的绑定策略。默认为DefaultEventExecutorChooserFactory.INSTANCE

public abstract class MultithreadEventExecutorGroup extends AbstractEventExecutorGroup {
   // 从 Reactor 汇合中抉择一个特定的 Reactor 的绑定策略 用于 channel 注册绑定到一个固定的 Reactor 上
    private final EventExecutorChooserFactory.EventExecutorChooser chooser;

    chooser = chooserFactory.newChooser(children);
}

上面咱们来看下具体的绑定策略:

DefaultEventExecutorChooserFactory

public final class DefaultEventExecutorChooserFactory implements EventExecutorChooserFactory {public static final DefaultEventExecutorChooserFactory INSTANCE = new DefaultEventExecutorChooserFactory();

    private DefaultEventExecutorChooserFactory() {}

    @Override
    public EventExecutorChooser newChooser(EventExecutor[] executors) {if (isPowerOfTwo(executors.length)) {return new PowerOfTwoEventExecutorChooser(executors);
        } else {return new GenericEventExecutorChooser(executors);
        }
    }

    private static boolean isPowerOfTwo(int val) {return (val & -val) == val;
    }
    ................... 省略.................
}

咱们看到在 newChooser 办法绑定策略有两个分支,不同之处在于须要判断 Reactor 线程组中的 Reactor 个数是否为 2 的次幂

Netty 中的绑定策略就是采纳 round-robin 轮询的形式来挨个抉择 Reactor 进行绑定。

采纳 round-robin 的形式进行负载平衡,咱们个别会用 round % reactor.length 取余的形式来挨个均匀的定位到对应的 Reactor 上。

如果 Reactor 的个数 reactor.length 恰好是 2 的次幂,那么就能够用位操作& 运算 round & reactor.length -1 来代替 % 运算 round % reactor.length,因为位运算的性能更高。具体为什么& 运算可能代替 % 运算,笔者会在前面讲述工夫轮的时候为大家具体证实,这里大家只需记住这个公式,咱们还是聚焦本文的主线。

理解了优化原理,咱们在看代码实现就很容易了解了。

利用 % 运算的形式 Math.abs(idx.getAndIncrement() % executors.length) 来进行绑定。

    private static final class GenericEventExecutorChooser implements EventExecutorChooser {private final AtomicLong idx = new AtomicLong();
        private final EventExecutor[] executors;

        GenericEventExecutorChooser(EventExecutor[] executors) {this.executors = executors;}

        @Override
        public EventExecutor next() {return executors[(int) Math.abs(idx.getAndIncrement() % executors.length)];
        }
    }

利用 & 运算的形式 idx.getAndIncrement() & executors.length - 1 来进行绑定。

    private static final class PowerOfTwoEventExecutorChooser implements EventExecutorChooser {private final AtomicInteger idx = new AtomicInteger();
        private final EventExecutor[] executors;

        PowerOfTwoEventExecutorChooser(EventExecutor[] executors) {this.executors = executors;}

        @Override
        public EventExecutor next() {return executors[idx.getAndIncrement() & executors.length - 1];
        }
    }

又一次被 Netty 对性能的极致谋求所折服~~~~

4. 向 Reactor 线程组中所有的 Reactor 注册 terminated 回调函数

当 Reactor 线程组 NioEventLoopGroup 中所有的 Reactor 曾经创立结束,ChannelReactor 的绑定策略也创立结束后,咱们就来到了创立 NioEventGroup 的最初一步。

俗话说的好,有创立就有启动,有启动就有敞开,这里会创立 Reactor 敞开 的回调函数 terminationListener,在Reactor 敞开时回调。

terminationListener回调的逻辑很简略:

  • 通过 AtomicInteger terminatedChildren 变量记录曾经敞开的 Reactor 个数,用来判断 NioEventLoopGroup 中的 Reactor 是否曾经全副敞开。
  • 如果所有 Reactor 均已敞开,设置 NioEventLoopGroup 中的 terminationFuturesuccess。示意 Reactor 线程组 敞开胜利。
       // 记录敞开的 Reactor 个数,当 Reactor 全副敞开后,才能够认为敞开胜利
        private final AtomicInteger terminatedChildren = new AtomicInteger();
        // 敞开 future
        private final Promise<?> terminationFuture = new DefaultPromise(GlobalEventExecutor.INSTANCE);

        final FutureListener<Object> terminationListener = new FutureListener<Object>() {
            @Override
            public void operationComplete(Future<Object> future) throws Exception {if (terminatedChildren.incrementAndGet() == children.length) {
                    // 当所有 Reactor 敞开后 才认为是敞开胜利
                    terminationFuture.setSuccess(null);
                }
            }
        };
        
        // 为所有 Reactor 增加 terminationListener
        for (EventExecutor e: children) {e.terminationFuture().addListener(terminationListener);
        }

咱们在回到文章结尾Netty 服务端代码模板

public final class EchoServer {static final int PORT = Integer.parseInt(System.getProperty("port", "8007"));

    public static void main(String[] args) throws Exception {
        // Configure the server.
        // 创立主从 Reactor 线程组
        EventLoopGroup bossGroup = new NioEventLoopGroup(1);
        EventLoopGroup workerGroup = new NioEventLoopGroup();

        ........... 省略............
    }
}

当初 Netty 的 主从 Reactor 线程组 就曾经创立结束,此时 Netty 服务端的骨架曾经搭建结束,骨架如下:


总结

本文介绍了首先介绍了 Netty 对各种 IO 模型 的反对以及如何轻松切换各种IO 模型

还花了大量的篇幅介绍 Netty 服务端的外围引擎 主从 Reactor 线程组 的创立过程。在这个过程中,咱们还提到了 Netty 对各种细节进行的优化,展示了 Netty 对性能极致的谋求。

好了,Netty 服务端的骨架曾经搭好,剩下的事件就该绑定端口地址而后接管连贯了,咱们下篇文章再见~~~

欢送关注微信公众号:bin 的技术小屋,浏览公众号原文

正文完
 0