欢送关注微信公众号:bin的技术小屋,浏览公众号原文
本系列Netty源码解析文章基于 4.1.56.Final版本

在上篇文章《聊聊Netty那些事儿之从内核角度看IO模型》中咱们花了大量的篇幅来从内核角度具体讲述了五种IO模型的演进过程以及ReactorIO线程模型的底层基石IO多路复用技术在内核中的实现原理。

最初咱们引出了netty中应用的主从Reactor IO线程模型。

通过上篇文章的介绍,咱们曾经分明了在IO调用的过程中内核帮咱们搞了哪些事件,那么俗话说的好内核领进门,修行在netty,netty在用户空间又帮咱们搞了哪些事件?

那么从本文开始,笔者将从源码角度来带大家看下上图中的Reactor IO线程模型在Netty中是如何实现的。

本文作为Reactor在Netty中实现系列文章中的开篇文章,笔者先来为大家介绍Reactor的骨架是如何创立进去的。

在上篇文章中咱们提到Netty采纳的是主从Reactor多线程的模型,然而它在实现上又与Doug Lea在Scalable IO in Java论文中提到的经典主从Reactor多线程模型有所差别。

Netty中的Reactor是以Group的模式呈现的,主从Reactor在Netty中就是主从Reactor组,每个Reactor Group中会有多个Reactor用来执行具体的IO工作。当然在netty中Reactor不只用来执行IO工作,这个咱们前面再说。

  • Main Reactor Group中的Reactor数量取决于服务端要监听的端口个数,通常咱们的服务端程序只会监听一个端口,所以Main Reactor Group只会有一个Main Reactor线程来解决最重要的事件:绑定端口地址接管客户端连贯为客户端创立对应的SocketChannel将客户端SocketChannel调配给一个固定的Sub Reactor。也就是上篇文章笔者为大家举的例子,饭店最重要的工作就是先把客人迎接进来。“我家大门常关上,凋谢怀抱等你,拥抱过就有了默契你会爱上这里......”

  • Sub Reactor Group里有多个Reactor线程,Reactor线程的个数能够通过零碎参数 -D io.netty.eventLoopThreads指定。默认的Reactor的个数为CPU核数 * 2Sub Reactor线程次要用来轮询客户端SocketChannel上的IO就绪事件解决IO就绪事件执行异步工作Sub Reactor Group做的事件就是上篇饭店例子中服务员的工作,客人进来了要为客人调配座位,端茶送水,做菜上菜。“不论远近都是客人,请不必客气,相约好了在一起,咱们欢迎您......”

一个客户端SocketChannel只能调配给一个固定的Sub Reactor。一个Sub Reactor负责解决多个客户端SocketChannel,这样能够将服务端承载的全量客户端连贯摊派到多个Sub Reactor中解决,同时也能保障客户端SocketChannel上的IO解决的线程安全性

因为文章篇幅的关系,作为Reactor在netty中实现的第一篇咱们次要来介绍主从Reactor Group的创立流程,骨架脉络先搭好。

上面咱们来看一段Netty服务端代码的编写模板,从代码模板的流程中咱们来解析下主从Reactor的创立流程以及在这个过程中所波及到的Netty外围类。

Netty服务端代码模板

/** * Echoes back any received data from a client. */public final class EchoServer {    static final int PORT = Integer.parseInt(System.getProperty("port", "8007"));    public static void main(String[] args) throws Exception {        // Configure the server.        //创立主从Reactor线程组        EventLoopGroup bossGroup = new NioEventLoopGroup(1);        EventLoopGroup workerGroup = new NioEventLoopGroup();        final EchoServerHandler serverHandler = new EchoServerHandler();        try {            ServerBootstrap b = new ServerBootstrap();            b.group(bossGroup, workerGroup)//配置主从Reactor             .channel(NioServerSocketChannel.class)//配置主Reactor中的channel类型             .option(ChannelOption.SO_BACKLOG, 100)//设置主Reactor中channel的option选项             .handler(new LoggingHandler(LogLevel.INFO))//设置主Reactor中Channel->pipline->handler             .childHandler(new ChannelInitializer<SocketChannel>() {//设置从Reactor中注册channel的pipeline                 @Override                 public void initChannel(SocketChannel ch) throws Exception {                     ChannelPipeline p = ch.pipeline();                     //p.addLast(new LoggingHandler(LogLevel.INFO));                     p.addLast(serverHandler);                 }             });            // Start the server. 绑定端口启动服务,开始监听accept事件            ChannelFuture f = b.bind(PORT).sync();            // Wait until the server socket is closed.            f.channel().closeFuture().sync();        } finally {            // Shut down all event loops to terminate all threads.            bossGroup.shutdownGracefully();            workerGroup.shutdownGracefully();        }    }}
  1. 首先咱们要创立Netty最外围的局部 -> 创立主从Reactor Group,在Netty中EventLoopGroup就是Reactor Group的实现类。对应的EventLoop就是Reactor的实现类。
  //创立主从Reactor线程组  EventLoopGroup bossGroup = new NioEventLoopGroup(1);  EventLoopGroup workerGroup = new NioEventLoopGroup();
  1. 创立用于IO解决ChannelHandler,实现相应IO事件的回调函数,编写对应的IO解决逻辑。留神这里只是简略示例哈,具体的IO事件处理,笔者会独自开一篇文章专门讲述。
final EchoServerHandler serverHandler = new EchoServerHandler();/** * Handler implementation for the echo server. */@Sharablepublic class EchoServerHandler extends ChannelInboundHandlerAdapter {    @Override    public void channelRead(ChannelHandlerContext ctx, Object msg) {        ................省略IO解决逻辑................        ctx.write(msg);    }    @Override    public void channelReadComplete(ChannelHandlerContext ctx) {                ctx.flush();    }    @Override    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {        // Close the connection when an exception is raised.        cause.printStackTrace();        ctx.close();    }}
  1. 创立ServerBootstrap Netty服务端启动类,并在启动类中配置启动Netty服务端所须要的一些必备信息。

    • 通过 serverBootstrap.group(bossGroup, workerGroup)为Netty服务端配置主从Reactor Group实例。
    • 通过serverBootstrap.channel(NioServerSocketChannel.class)配置Netty服务端的ServerSocketChannel用于绑定端口地址以及创立客户端SocketChannel。Netty中的NioServerSocketChannel.class就是对JDK NIO中ServerSocketChannel的封装。而用于示意客户端连贯NioSocketChannel是对JDK NIO SocketChannel封装。

      在上篇文章介绍Socket内核构造大节中咱们提到,在编写服务端网络程序时,咱们首先要创立一个Socket用于listen和bind端口地址,咱们把这个叫做监听Socket,这里对应的就是NioServerSocketChannel.class。当客户端连贯实现三次握手,零碎调用accept函数会基于监听Socket创立进去一个新的Socket专门用于与客户端之间的网络通信咱们称为客户端连贯Socket,这里对应的就是NioSocketChannel.class
    • serverBootstrap.option(ChannelOption.SO_BACKLOG, 100)设置服务端ServerSocketChannel中的SocketOption。对于SocketOption的选项咱们后边的文章再聊,本文次要聚焦在Netty Main Reactor Group的创立及工作流程。
    • serverBootstrap.handler(....)设置服务端NioServerSocketChannel中对应Pipieline中的ChannelHandler

      netty有两种Channel类型:一种是服务端用于监听绑定端口地址的NioServerSocketChannel,一种是用于客户端通信的NioSocketChannel。每种Channel类型实例都会对应一个PipeLine用于编排对应channel实例上的IO事件处理逻辑。PipeLine中组织的就是ChannelHandler用于编写特定的IO解决逻辑。

      留神serverBootstrap.handler设置的是服务端NioServerSocketChannel PipeLine中的ChannelHandler

    • serverBootstrap.childHandler(ChannelHandler childHandler)用于设置客户端NioSocketChannel中对应Pipieline中的ChannelHandler。咱们通常配置的编码解码器就是在这里。

      ServerBootstrap 启动类办法带有child前缀的均是设置客户端NioSocketChannel属性的。

      ChannelInitializer 是用于当SocketChannel胜利注册到绑定的Reactor上后,用于初始化该SocketChannelPipeline。它的initChannel 办法会在注册胜利后执行。这里只是捎带提一下,让大家有个初步印象,前面我会专门介绍。

  2. ChannelFuture f = serverBootstrap.bind(PORT).sync()这一步会是下篇文章要重点剖析的主题Main Reactor Group的启动,绑定端口地址,开始监听客户端连贯事件(OP_ACCEPT)。本文咱们只关注创立流程。
  3. f.channel().closeFuture().sync()期待服务端NioServerSocketChannel敞开。Netty服务端到这里正式启动,并筹备好承受客户端连贯的筹备。
  4. shutdownGracefully优雅敞开主从Reactor线程组里的所有Reactor线程

Netty对IO模型的反对

在上篇文章中咱们介绍了五种IO模型,Netty中反对BIO,NIO,AIO以及多种操作系统下的IO多路复用技术实现。

在Netty中切换这几种IO模型也是十分的不便,上面咱们来看下Netty如何对这几种IO模型进行反对的。

首先咱们介绍下几个与IO模型相干的重要接口:

EventLoop

EventLoop就是Netty中的Reactor,能够说它就是Netty的引擎,负责Channel上IO就绪事件的监听IO就绪事件的解决异步工作的执行驱动着整个Netty的运行。

不同IO模型下,EventLoop有着不同的实现,咱们只须要切换不同的实现类就能够实现对NettyIO模型的切换。

BIONIOAIO
ThreadPerChannelEventLoopNioEventLoopAioEventLoop

NIO模型下Netty会主动依据操作系统以及版本的不同抉择对应的IO多路复用技术实现。比方Linux 2.6版本以上用的是Epoll,2.6版本以下用的是Poll,Mac下采纳的是Kqueue

其中Linux kernel 在5.1版本引入的异步IO库io_uring正在netty中孵化。

EventLoopGroup

Netty中的Reactor是以Group的模式呈现的,EventLoopGroup正是Reactor组的接口定义,负责管理Reactor,Netty中的Channel就是通过EventLoopGroup注册到具体的Reactor上的。

Netty的IO线程模型是主从Reactor多线程模型主从Reactor线程组在Netty源码中对应的其实就是两个EventLoopGroup实例。

不同的IO模型也有对应的实现:

BIONIOAIO
ThreadPerChannelEventLoopGroupNioEventLoopGroupAioEventLoopGroup

ServerSocketChannel

用于Netty服务端应用的ServerSocketChannel,对应于上篇文章提到的监听Socket,负责绑定监听端口地址,接管客户端连贯并创立用于与客户端通信的SocketChannel

不同的IO模型下的实现:

BIONIOAIO
OioServerSocketChannelNioServerSocketChannelAioServerSocketChannel

SocketChannel

用于与客户端通信的SocketChannel,对应于上篇文章提到的客户端连贯Socket,当客户端实现三次握手后,由零碎调用accept函数依据监听Socket创立。

不同的IO模型下的实现:

BIONIOAIO
OioSocketChannelNioSocketChannelAioSocketChannel

咱们看到在不同IO模型的实现中,Netty这些围绕IO模型的外围类只是前缀的不同:

  • BIO对应的前缀为Oio示意old io,当初曾经废除不举荐应用。
  • NIO对应的前缀为Nio,正是Netty举荐也是咱们罕用的非阻塞IO模型
  • AIO对应的前缀为Aio,因为Linux下的异步IO机制实现的并不成熟,性能晋升体现上也不显著,现已被删除。

咱们只须要将IO模型的这些外围接口对应的实现类前缀改为对应IO模型的前缀,就能够轻松在Netty中实现对IO模型的切换。

多种NIO的实现

CommonLinuxMac
NioEventLoopGroupEpollEventLoopGroupKQueueEventLoopGroup
NioEventLoopEpollEventLoopKQueueEventLoop
NioServerSocketChannelEpollServerSocketChannelKQueueServerSocketChannel
NioSocketChannelEpollSocketChannelKQueueSocketChannel

咱们通常在应用NIO模型的时候会应用Common列下的这些IO模型外围类,Common类也会依据操作系统的不同主动抉择JDK在对应平台下的IO多路复用技术的实现。

而Netty本身也依据操作系统的不同提供了本人对IO多路复用技术的实现,比JDK的实现性能更优。比方:

  • JDK 的 NIO 默认实现是程度触发,Netty 是边缘触发(默认)和程度触发可切换。。
  • Netty 实现的垃圾回收更少、性能更好。

咱们编写Netty服务端程序的时候也能够依据操作系统的不同,采纳Netty本身的实现来进一步优化程序。做法也很简略,间接将上图中红框里的实现类替换成Netty的本身实现类即可实现切换。


通过以上对Netty服务端代码编写模板以及IO模型相干外围类的简略介绍,咱们对Netty的创立流程有了一个简略粗略的总体意识,上面咱们来深刻分析下创立流程过程中的每一个步骤以及这个过程中波及到的外围类实现。

以下源码解析局部咱们均采纳Common列NIO相干的实现进行解析。

创立主从Reactor线程组

在Netty服务端程序编写模板的开始,咱们首先会创立两个Reactor线程组:

  • 一个是主Reactor线程组bossGroup用于监听客户端连贯,创立客户端连贯NioSocketChannel,并将创立好的客户端连贯NioSocketChannel注册到从Reactor线程组中一个固定的Reactor上。
  • 一个是从Reactor线程组workerGroupworkerGroup中的Reactor负责监听绑定在其上的客户端连贯NioSocketChannel上的IO就绪事件,并解决IO就绪事件执行异步工作
  //创立主从Reactor线程组  EventLoopGroup bossGroup = new NioEventLoopGroup(1);  EventLoopGroup workerGroup = new NioEventLoopGroup();

Netty中Reactor线程组的实现类为NioEventLoopGroup,在创立bossGroupworkerGroup的时候用到了NioEventLoopGroup的两个构造函数:

  • nThreads参数的构造函数public NioEventLoopGroup(int nThreads)
  • 不带nThreads参数的默认构造函数public NioEventLoopGroup()
public class NioEventLoopGroup extends MultithreadEventLoopGroup {    /**     * Create a new instance using the default number of threads, the default {@link ThreadFactory} and     * the {@link SelectorProvider} which is returned by {@link SelectorProvider#provider()}.     */    public NioEventLoopGroup() {        this(0);    }    /**     * Create a new instance using the specified number of threads, {@link ThreadFactory} and the     * {@link SelectorProvider} which is returned by {@link SelectorProvider#provider()}.     */    public NioEventLoopGroup(int nThreads) {        this(nThreads, (Executor) null);    }    ......................省略...........................}
nThreads参数示意以后要创立的Reactor线程组内蕴含多少个Reactor线程。不指定nThreads参数的话采纳默认的Reactor线程个数,用0示意。

最终会调用到构造函数

    public NioEventLoopGroup(int nThreads, Executor executor, final SelectorProvider selectorProvider,                             final SelectStrategyFactory selectStrategyFactory) {        super(nThreads, executor, selectorProvider, selectStrategyFactory, RejectedExecutionHandlers.reject());    }

上面简略介绍下构造函数中这几个参数的作用,前面咱们在解说本文主线的过程中还会提及这几个参数,到时在具体介绍,这里只是让大家有个初步印象,不用做过多的纠缠。

  • Executor executor:负责启动Reactor线程进而Reactor才能够开始工作。

    Reactor线程组NioEventLoopGroup 负责创立Reactor线程,在创立的时候会将executor传入。
  • RejectedExecutionHandler: 当向Reactor增加异步工作增加失败时,采纳的回绝策略。Reactor的工作不只是监听IO沉闷事件和IO工作的解决,还包含对异步工作的解决。这里大家只需有个这样的概念,前面笔者会专门具体介绍。
  • SelectorProvider selectorProvider: Reactor中的IO模型为IO多路复用模型,对应于JDK NIO中的实现为java.nio.channels.Selector(就是咱们上篇文章中提到的select,poll,epoll),每个Reator中都蕴含一个Selector,用于轮询注册在该Reactor上的所有Channel上的IO事件SelectorProvider就是用来创立Selector的。
  • SelectStrategyFactory selectStrategyFactory: Reactor最重要的事件就是轮询注册其上的Channel上的IO就绪事件,这里的SelectStrategyFactory用于指定轮询策略,默认为DefaultSelectStrategyFactory.INSTANCE

最终会将这些参数交给NioEventLoopGroup 的父类结构器,上面咱们来看下NioEventLoopGroup类的继承构造:

NioEventLoopGroup类的继承构造乍一看比较复杂,大家不要慌,笔者会随着主线的深刻缓缓地介绍这些父类接口,咱们当初重点关注Mutithread前缀的类。

咱们晓得NioEventLoopGroup是Netty中的Reactor线程组的实现,既然是线程组那么必定是负责管理和创立多个Reactor线程的,所以Mutithread前缀的类定义的行为天然是对Reactor线程组内多个Reactor线程的创立和管理工作。

MultithreadEventLoopGroup

public abstract class MultithreadEventLoopGroup extends MultithreadEventExecutorGroup implements EventLoopGroup {    private static final InternalLogger logger = InternalLoggerFactory.getInstance(MultithreadEventLoopGroup.class);    //默认Reactor个数    private static final int DEFAULT_EVENT_LOOP_THREADS;    static {        DEFAULT_EVENT_LOOP_THREADS = Math.max(1, SystemPropertyUtil.getInt(                "io.netty.eventLoopThreads", NettyRuntime.availableProcessors() * 2));        if (logger.isDebugEnabled()) {            logger.debug("-Dio.netty.eventLoopThreads: {}", DEFAULT_EVENT_LOOP_THREADS);        }    }    /**     * @see MultithreadEventExecutorGroup#MultithreadEventExecutorGroup(int, Executor, Object...)     */    protected MultithreadEventLoopGroup(int nThreads, Executor executor, Object... args) {        super(nThreads == 0 ? DEFAULT_EVENT_LOOP_THREADS : nThreads, executor, args);    }    ...................省略.....................}

MultithreadEventLoopGroup类次要的性能就是用来确定Reactor线程组Reactor的个数。

默认的Reactor的个数寄存于字段DEFAULT_EVENT_LOOP_THREADS 中。

static {}动态代码块中咱们能够看出默认Reactor的个数的获取逻辑:

  • 能够通过零碎变量 -D io.netty.eventLoopThreads"指定。
  • 如果不指定,那么默认的就是NettyRuntime.availableProcessors() * 2

nThread参数设置为0采纳默认设置时,Reactor线程组内的Reactor个数则设置为DEFAULT_EVENT_LOOP_THREADS

MultithreadEventExecutorGroup

MultithreadEventExecutorGroup这里就是本大节的外围,次要用来定义创立和治理Reactor的行为。

public abstract class MultithreadEventExecutorGroup extends AbstractEventExecutorGroup {    //Reactor线程组中的Reactor汇合    private final EventExecutor[] children;    private final Set<EventExecutor> readonlyChildren;    //从Reactor group中抉择一个特定的Reactor的抉择策略 用于channel注册绑定到一个固定的Reactor上    private final EventExecutorChooserFactory.EventExecutorChooser chooser;    /**     * Create a new instance.     *     * @param nThreads          the number of threads that will be used by this instance.     * @param executor          the Executor to use, or {@code null} if the default should be used.     * @param args              arguments which will passed to each {@link #newChild(Executor, Object...)} call     */    protected MultithreadEventExecutorGroup(int nThreads, Executor executor, Object... args) {        this(nThreads, executor, DefaultEventExecutorChooserFactory.INSTANCE, args);    }    ............................省略................................}

首先介绍一个新的结构器参数EventExecutorChooserFactory chooserFactory。当客户端连贯实现三次握手后,Main Reactor会创立客户端连贯NioSocketChannel,并将其绑定到Sub Reactor Group中的一个固定Reactor,那么具体要绑定到哪个具体的Sub Reactor上呢?这个绑定策略就是由chooserFactory来创立的。默认为DefaultEventExecutorChooserFactory

上面就是本大节的主题Reactor线程组的创立过程:

    protected MultithreadEventExecutorGroup(int nThreads, Executor executor,                                            EventExecutorChooserFactory chooserFactory, Object... args) {        if (nThreads <= 0) {            throw new IllegalArgumentException(String.format("nThreads: %d (expected: > 0)", nThreads));        }        if (executor == null) {            //用于创立Reactor线程            executor = new ThreadPerTaskExecutor(newDefaultThreadFactory());        }        children = new EventExecutor[nThreads];        //循环创立reaactor group中的Reactor        for (int i = 0; i < nThreads; i ++) {            boolean success = false;            try {                //创立reactor                children[i] = newChild(executor, args);                success = true;            } catch (Exception e) {                throw new IllegalStateException("failed to create a child event loop", e);            } finally {                     ................省略................                }            }        }        //创立channel到Reactor的绑定策略        chooser = chooserFactory.newChooser(children);         ................省略................        Set<EventExecutor> childrenSet = new LinkedHashSet<EventExecutor>(children.length);        Collections.addAll(childrenSet, children);        readonlyChildren = Collections.unmodifiableSet(childrenSet);    }

1. 创立用于启动Reactor线程的executor

在Netty Reactor Group中的单个ReactorIO线程模型为上篇文章提到的单Reactor单线程模型,一个Reactor线程负责轮询注册其上的所有Channel中的IO就绪事件,解决IO事件,执行Netty中的异步工作等工作。正是这个Reactor线程驱动着整个Netty的运行,堪称是Netty的外围引擎。

而这里的executor就是负责启动Reactor线程的,从创立源码中咱们能够看到executor的类型为ThreadPerTaskExecutor

ThreadPerTaskExecutor

public final class ThreadPerTaskExecutor implements Executor {    private final ThreadFactory threadFactory;    public ThreadPerTaskExecutor(ThreadFactory threadFactory) {        this.threadFactory = ObjectUtil.checkNotNull(threadFactory, "threadFactory");    }    @Override    public void execute(Runnable command) {        threadFactory.newThread(command).start();    }}

咱们看到ThreadPerTaskExecutor 做的事件很简略,从它的命名前缀ThreadPerTask咱们就能够猜出它的工作形式,就是来一个工作就创立一个线程执行。而创立的这个线程正是netty的外围引擎Reactor线程。

Reactor线程启动的时候,Netty会将Reactor线程要做的事件封装成Runnable,丢给exexutor启动。

Reactor线程的外围就是一个死循环不停的轮询IO就绪事件,解决IO事件,执行异步工作。一刻也不停歇,堪称996榜样

这里向大家先卖个关子,"Reactor线程是何时启动的呢??"

2. 创立Reactor

Reactor线程组NioEventLoopGroup蕴含多个Reactor,寄存于private final EventExecutor[] children数组中。

所以上面的事件就是创立nThreadReactor,并寄存于EventExecutor[] children字段中,

咱们来看下用于创立ReactornewChild(executor, args)办法:

newChild

newChild办法是MultithreadEventExecutorGroup中的一个形象办法,提供给具体子类实现。

protected abstract EventExecutor newChild(Executor executor, Object... args) throws Exception;

这里咱们解析的是NioEventLoopGroup,咱们来看下newChild在该类中的实现:

public class NioEventLoopGroup extends MultithreadEventLoopGroup {    @Override    protected EventLoop newChild(Executor executor, Object... args) throws Exception {        EventLoopTaskQueueFactory queueFactory = args.length == 4 ? (EventLoopTaskQueueFactory) args[3] : null;        return new NioEventLoop(this, executor, (SelectorProvider) args[0],            ((SelectStrategyFactory) args[1]).newSelectStrategy(), (RejectedExecutionHandler) args[2], queueFactory);    }}

前边提到的泛滥结构器参数,这里会通过可变参数Object... args传入到Reactor类NioEventLoop的结构器中。

这里介绍下新的参数EventLoopTaskQueueFactory queueFactory,前边提到Netty中的Reactor次要工作是轮询注册其上的所有Channel上的IO就绪事件,解决IO就绪事件。除了这些次要的工作外,Netty为了极致的压迫Reactor的性能,还会让它做一些异步工作的执行工作。既然要执行异步工作,那么Reactor中就须要一个队列来保留工作。

这里的EventLoopTaskQueueFactory就是用来创立这样的一个队列来保留Reactor中待执行的异步工作。

能够把Reactor了解成为一个单线程的线程池相似JDK中的SingleThreadExecutor,仅用一个线程来执行轮询IO就绪事件解决IO就绪事件执行异步工作。同时待执行的异步工作保留在Reactor里的taskQueue中。

NioEventLoop

public final class NioEventLoop extends SingleThreadEventLoop {    //用于创立JDK NIO Selector,ServerSocketChannel    private final SelectorProvider provider;    //Selector轮询策略 决定什么时候轮询,什么时候解决IO事件,什么时候执行异步工作    private final SelectStrategy selectStrategy;    /**     * The NIO {@link Selector}.     */    private Selector selector;    private Selector unwrappedSelector;    NioEventLoop(NioEventLoopGroup parent, Executor executor, SelectorProvider selectorProvider,                 SelectStrategy strategy, RejectedExecutionHandler rejectedExecutionHandler,                 EventLoopTaskQueueFactory queueFactory) {        super(parent, executor, false, newTaskQueue(queueFactory), newTaskQueue(queueFactory),                rejectedExecutionHandler);        this.provider = ObjectUtil.checkNotNull(selectorProvider, "selectorProvider");        this.selectStrategy = ObjectUtil.checkNotNull(strategy, "selectStrategy");        final SelectorTuple selectorTuple = openSelector();        this.selector = selectorTuple.selector;        this.unwrappedSelector = selectorTuple.unwrappedSelector;    }}

这里就正式开始了Reactor的创立过程,咱们晓得Reactor的外围是采纳的IO多路复用模型来对客户端连贯上的IO事件进行监听,所以最重要的事件是创立Selector(JDK NIO 中IO多路复用技术的实现)。

能够把Selector了解为咱们上篇文章介绍的Select,poll,epoll,它是JDK NIO对操作系统内核提供的这些IO多路复用技术的封装。

openSelector

openSelectorNioEventLoop类中用于创立IO多路复用Selector,并对创立进去的JDK NIO 原生的Selector进行性能优化。

首先会通过SelectorProvider#openSelector 创立JDK NIO原生的Selector

 private SelectorTuple openSelector() {        final Selector unwrappedSelector;        try {            //通过JDK NIO SelectorProvider创立Selector            unwrappedSelector = provider.openSelector();        } catch (IOException e) {            throw new ChannelException("failed to open a new selector", e);        }        ..................省略.............}

SelectorProvider会依据操作系统的不同抉择JDK在不同操作系统版本下的对应Selector的实现。Linux下会抉择Epoll,Mac下会抉择Kqueue

上面咱们就来看下SelectorProvider是如何做到主动适配不同操作系统下IO多路复用实现的

SelectorProvider

    public NioEventLoopGroup(ThreadFactory threadFactory) {        this(0, threadFactory, SelectorProvider.provider());    }

SelectorProvider是在后面介绍的NioEventLoopGroup类构造函数中通过调用SelectorProvider.provider()被加载,并通过NioEventLoopGroup#newChild办法中的可变长参数Object... args传递到NioEventLoop中的private final SelectorProvider provider字段中。

SelectorProvider的加载过程:

public abstract class SelectorProvider {    public static SelectorProvider provider() {        synchronized (lock) {            if (provider != null)                return provider;            return AccessController.doPrivileged(                new PrivilegedAction<SelectorProvider>() {                    public SelectorProvider run() {                            if (loadProviderFromProperty())                                return provider;                            if (loadProviderAsService())                                return provider;                            provider = sun.nio.ch.DefaultSelectorProvider.create();                            return provider;                        }                    });        }    }}

SelectorProvider加载源码中咱们能够看出,SelectorProvider的加载形式有三种,优先级如下:

  1. 通过零碎变量-D java.nio.channels.spi.SelectorProvider指定SelectorProvider的自定义实现类全限定名。通过应用程序类加载器(Application Classloader)加载。
    private static boolean loadProviderFromProperty() {        String cn = System.getProperty("java.nio.channels.spi.SelectorProvider");        if (cn == null)            return false;        try {            Class<?> c = Class.forName(cn, true,                                       ClassLoader.getSystemClassLoader());            provider = (SelectorProvider)c.newInstance();            return true;        }         .................省略.............    }
  1. 通过SPI形式加载。在工程目录META-INF/services下定义名为java.nio.channels.spi.SelectorProviderSPI文件,文件中第一个定义的SelectorProvider实现类全限定名就会被加载。
    private static boolean loadProviderAsService() {        ServiceLoader<SelectorProvider> sl =            ServiceLoader.load(SelectorProvider.class,                               ClassLoader.getSystemClassLoader());        Iterator<SelectorProvider> i = sl.iterator();        for (;;) {            try {                if (!i.hasNext())                    return false;                provider = i.next();                return true;            } catch (ServiceConfigurationError sce) {                if (sce.getCause() instanceof SecurityException) {                    // Ignore the security exception, try the next provider                    continue;                }                throw sce;            }        }    }
  1. 如果以上两种形式均未被定义,那么就采纳SelectorProvider零碎默认实现sun.nio.ch.DefaultSelectorProvider。笔者以后应用的操作系统是MacOS,从源码中咱们能够看到主动适配了KQueue实现。
public class DefaultSelectorProvider {    private DefaultSelectorProvider() {    }    public static SelectorProvider create() {        return new KQueueSelectorProvider();    }}
不同操作系统中JDK对于DefaultSelectorProvider会有所不同,Linux内核版本2.6以上对应的Epoll,Linux内核版本2.6以下对应的Poll,MacOS对应的是KQueue

上面咱们接着回到io.netty.channel.nio.NioEventLoop#openSelector的主线上来。

Netty对JDK NIO 原生Selector的优化

首先在NioEventLoop中有一个Selector优化开关DISABLE_KEY_SET_OPTIMIZATION,通过零碎变量-D io.netty.noKeySetOptimization指定,默认是开启的,示意须要对JDK NIO原生Selector进行优化。

public final class NioEventLoop extends SingleThreadEventLoop {   //Selector优化开关 默认开启 为了遍历的效率 会对Selector中的SelectedKeys进行数据结构优化    private static final boolean DISABLE_KEY_SET_OPTIMIZATION =            SystemPropertyUtil.getBoolean("io.netty.noKeySetOptimization", false);}

如果优化开关DISABLE_KEY_SET_OPTIMIZATION 是敞开的,那么间接返回JDK NIO原生的Selector

private SelectorTuple openSelector() {        ..........SelectorProvider创立JDK NIO  原生Selector..............        if (DISABLE_KEY_SET_OPTIMIZATION) {            //JDK NIO原生Selector ,Selector优化开关 默认开启须要对Selector进行优化            return new SelectorTuple(unwrappedSelector);        }}

上面为Netty对JDK NIO原生的Selector的优化过程:

  1. 获取JDK NIO原生Selector的形象实现类sun.nio.ch.SelectorImplJDK NIO原生Selector的实现均继承于该抽象类。用于判断由SelectorProvider创立进去的Selector是否为JDK默认实现SelectorProvider第三种加载形式)。因为SelectorProvider能够是自定义加载,所以它创立进去的Selector并不一定是JDK NIO 原生的。
       Object maybeSelectorImplClass = AccessController.doPrivileged(new PrivilegedAction<Object>() {            @Override            public Object run() {                try {                    return Class.forName(                            "sun.nio.ch.SelectorImpl",                            false,                            PlatformDependent.getSystemClassLoader());                } catch (Throwable cause) {                    return cause;                }            }        });

JDK NIO Selector的抽象类sun.nio.ch.SelectorImpl

public abstract class SelectorImpl extends AbstractSelector {    // The set of keys with data ready for an operation    // //IO就绪的SelectionKey(外面包裹着channel)    protected Set<SelectionKey> selectedKeys;    // The set of keys registered with this Selector    //注册在该Selector上的所有SelectionKey(外面包裹着channel)    protected HashSet<SelectionKey> keys;    // Public views of the key sets    //用于向调用线程返回的keys,不可变    private Set<SelectionKey> publicKeys;             // Immutable    //当有IO就绪的SelectionKey时,向调用线程返回。只可删除其中元素,不可减少    private Set<SelectionKey> publicSelectedKeys;     // Removal allowed, but not addition    protected SelectorImpl(SelectorProvider sp) {        super(sp);        keys = new HashSet<SelectionKey>();        selectedKeys = new HashSet<SelectionKey>();        if (Util.atBugLevel("1.4")) {            publicKeys = keys;            publicSelectedKeys = selectedKeys;        } else {            //不可变            publicKeys = Collections.unmodifiableSet(keys);            //只可删除其中元素,不可减少            publicSelectedKeys = Util.ungrowableSet(selectedKeys);        }    }}

这里笔者来简略介绍下JDK NIO中的Selector中这几个字段的含意,咱们能够和上篇文章讲到的epoll在内核中的构造做类比,不便大家后续的了解:

  • Set<SelectionKey> selectedKeys 相似于咱们上篇文章解说Epoll时提到的就绪队列eventpoll->rdllistSelector这里大家能够了解为EpollSelector会将本人监听到的IO就绪Channel放到selectedKeys中。
这里的SelectionKey暂且能够了解为ChannelSelector中的示意,类比上图中epitem构造里的epoll_event,封装IO就绪Socket的信息。
其实SelectionKey里蕴含的信息不止是Channel还有很多IO相干的信息。前面咱们在具体介绍。
  • HashSet<SelectionKey> keys:这里寄存的是所有注册到该Selector上的Channel。类比epoll中的红黑树结构rb_root

    SelectionKeyChannel注册到Selector中后生成。
  • Set<SelectionKey> publicSelectedKeys 相当于是selectedKeys 的视图,用于向内部线程返回IO就绪SelectionKey。这个汇合在内部线程中只能做删除操作不可减少元素,并且不是线程平安的
  • Set<SelectionKey> publicKeys相当于keys 的不可变视图,用于向内部线程返回所有注册在该Selector上的SelectionKey

这里须要重点关注抽象类sun.nio.ch.SelectorImpl中的selectedKeyspublicSelectedKeys这两个字段,留神它们的类型都是HashSet ,一会优化的就是这里!!!!

  1. 判断由SelectorProvider创立进去的Selector是否是JDK NIO原生的Selector实现。因为Netty优化针对的是JDK NIO 原生Selector。判断规范为sun.nio.ch.SelectorImpl类是否为SelectorProvider创立出Selector的父类。如果不是则间接返回。不在持续上面的优化过程。
        //判断是否能够对Selector进行优化,这里次要针对JDK NIO原生Selector的实现类进行优化,因为SelectorProvider能够加载的是自定义Selector实现        //如果SelectorProvider创立的Selector不是JDK原生sun.nio.ch.SelectorImpl的实现类,那么无奈进行优化,间接返回        if (!(maybeSelectorImplClass instanceof Class) ||            !((Class<?>) maybeSelectorImplClass).isAssignableFrom(unwrappedSelector.getClass())) {            if (maybeSelectorImplClass instanceof Throwable) {                Throwable t = (Throwable) maybeSelectorImplClass;                logger.trace("failed to instrument a special java.util.Set into: {}", unwrappedSelector, t);            }            return new SelectorTuple(unwrappedSelector);        }

通过后面对SelectorProvider的介绍咱们晓得,这里通过provider.openSelector()创立进去的Selector实现类为KQueueSelectorImpl类,它继承实现了sun.nio.ch.SelectorImpl,所以它是JDK NIO 原生的Selector实现

class KQueueSelectorImpl extends SelectorImpl {}
  1. 创立SelectedSelectionKeySet通过反射替换掉sun.nio.ch.SelectorImpl类selectedKeys publicSelectedKeys 的默认HashSet实现。

为什么要用SelectedSelectionKeySet替换掉原来的HashSet呢??

因为这里波及到对HashSet类型sun.nio.ch.SelectorImpl#selectedKeys汇合的两种操作:

  • 插入操作: 通过前边对sun.nio.ch.SelectorImpl类中字段的介绍咱们晓得,在Selector监听到IO就绪SelectionKey 后,会将IO就绪SelectionKey 插入sun.nio.ch.SelectorImpl#selectedKeys汇合中,这时Reactor线程会从java.nio.channels.Selector#select(long)阻塞调用中返回(相似上篇文章提到的epoll_wait)。
  • 遍历操作:Reactor线程返回后,会从Selector中获取IO就绪SelectionKey汇合(也就是sun.nio.ch.SelectorImpl#selectedKeys),Reactor线程遍历selectedKeys,获取IO就绪SocketChannel,并解决SocketChannel上的IO事件

咱们都晓得HashSet底层数据结构是一个哈希表,因为Hash抵触这种状况的存在,所以导致对哈希表进行插入遍历操作的性能不如对数组进行插入遍历操作的性能好。

还有一个重要起因是,数组能够利用CPU缓存的劣势来进步遍历的效率。前面笔者会有一篇专门的文章来讲述利用CPU缓存行如何为咱们带来性能劣势。

所以Netty为了优化对sun.nio.ch.SelectorImpl#selectedKeys汇合的插入,遍历性能,本人用数组这种数据结构实现了SelectedSelectionKeySet ,用它来替换原来的HashSet实现。

SelectedSelectionKeySet

  • 初始化SelectionKey[] keys数组大小为1024,当数组容量不够时,扩容为原来的两倍大小。
  • 通过数组尾部指针size,在向数组插入元素的时候能够间接定位到插入地位keys[size++]。操作一步到位,不必像哈希表那样还须要解决Hash抵触
  • 对数组的遍历操作也是如丝般顺滑,CPU间接能够在缓存行中遍历读取数组元素无需拜访内存。比HashSet的迭代器java.util.HashMap.KeyIterator 遍历形式性能不知高到哪里去了。
final class SelectedSelectionKeySet extends AbstractSet<SelectionKey> {    //采纳数组替换到JDK中的HashSet,这样add操作和遍历操作效率更高,不须要思考hash抵触    SelectionKey[] keys;    //数组尾部指针    int size;    SelectedSelectionKeySet() {        keys = new SelectionKey[1024];    }    /**     * 数组的增加效率高于 HashSet 因为不须要思考hash抵触     * */    @Override    public boolean add(SelectionKey o) {        if (o == null) {            return false;        }        //工夫复杂度O(1)        keys[size++] = o;        if (size == keys.length) {            //扩容为原来的两倍大小            increaseCapacity();        }        return true;    }    private void increaseCapacity() {        SelectionKey[] newKeys = new SelectionKey[keys.length << 1];        System.arraycopy(keys, 0, newKeys, 0, size);        keys = newKeys;    }    /**     * 采纳数组的遍历效率 高于 HashSet     * */    @Override    public Iterator<SelectionKey> iterator() {        return new Iterator<SelectionKey>() {            private int idx;            @Override            public boolean hasNext() {                return idx < size;            }            @Override            public SelectionKey next() {                if (!hasNext()) {                    throw new NoSuchElementException();                }                return keys[idx++];            }            @Override            public void remove() {                throw new UnsupportedOperationException();            }        };    }}
看到这里不禁感叹,从各种小的细节能够看出Netty对性能的优化几乎酣畅淋漓,对性能的谋求令人发指。细节真的是魔鬼。
  1. Netty通过反射的形式用SelectedSelectionKeySet替换掉sun.nio.ch.SelectorImpl#selectedKeyssun.nio.ch.SelectorImpl#publicSelectedKeys这两个汇合中原来HashSet的实现。
  • 反射获取sun.nio.ch.SelectorImpl类中selectedKeyspublicSelectedKeys
  Field selectedKeysField = selectorImplClass.getDeclaredField("selectedKeys");  Field publicSelectedKeysField = selectorImplClass.getDeclaredField("publicSelectedKeys");
  • Java9版本以上通过sun.misc.Unsafe设置字段值的形式
       if (PlatformDependent.javaVersion() >= 9 && PlatformDependent.hasUnsafe()) {                        long selectedKeysFieldOffset = PlatformDependent.objectFieldOffset(selectedKeysField);                        long publicSelectedKeysFieldOffset =                                PlatformDependent.objectFieldOffset(publicSelectedKeysField);                        if (selectedKeysFieldOffset != -1 && publicSelectedKeysFieldOffset != -1) {                            PlatformDependent.putObject(                                    unwrappedSelector, selectedKeysFieldOffset, selectedKeySet);                            PlatformDependent.putObject(                                    unwrappedSelector, publicSelectedKeysFieldOffset, selectedKeySet);                            return null;                        }                                            }
  • 通过反射的形式用SelectedSelectionKeySet替换掉hashSet实现的sun.nio.ch.SelectorImpl#selectedKeys,sun.nio.ch.SelectorImpl#publicSelectedKeys
          Throwable cause = ReflectionUtil.trySetAccessible(selectedKeysField, true);          if (cause != null) {                return cause;          }          cause = ReflectionUtil.trySetAccessible(publicSelectedKeysField, true);          if (cause != null) {                return cause;          }          //Java8反射替换字段          selectedKeysField.set(unwrappedSelector, selectedKeySet);          publicSelectedKeysField.set(unwrappedSelector, selectedKeySet);
  1. 将与sun.nio.ch.SelectorImpl类中selectedKeyspublicSelectedKeys关联好的Netty优化实现SelectedSelectionKeySet,设置到io.netty.channel.nio.NioEventLoop#selectedKeys字段中保留。
   //会通过反射替换selector对象中的selectedKeySet保留就绪的selectKey    //该字段为持有selector对象selectedKeys的援用,当IO事件就绪时,间接从这里获取    private SelectedSelectionKeySet selectedKeys;
后续Reactor线程就会间接从io.netty.channel.nio.NioEventLoop#selectedKeys中获取IO就绪SocketChannel
  1. SelectorTuple封装unwrappedSelectorwrappedSelector返回给NioEventLoop构造函数。到此Reactor中的Selector就创立结束了。
return new SelectorTuple(unwrappedSelector,                      new SelectedSelectionKeySetSelector(unwrappedSelector, selectedKeySet));
    private static final class SelectorTuple {        final Selector unwrappedSelector;        final Selector selector;        SelectorTuple(Selector unwrappedSelector) {            this.unwrappedSelector = unwrappedSelector;            this.selector = unwrappedSelector;        }        SelectorTuple(Selector unwrappedSelector, Selector selector) {            this.unwrappedSelector = unwrappedSelector;            this.selector = selector;        }    }
  • 所谓的unwrappedSelector 是指被Netty优化过的JDK NIO原生Selector。
  • 所谓的wrappedSelector就是用SelectedSelectionKeySetSelector装璜类将unwrappedSelector和与sun.nio.ch.SelectorImpl类关联好的Netty优化实现SelectedSelectionKeySet 封装装璜起来。

wrappedSelector会将所有对Selector的操作全副代理给unwrappedSelector,并在发动轮询IO事件的相干操作中,重置SelectedSelectionKeySet清空上一次的轮询后果。

final class SelectedSelectionKeySetSelector extends Selector {    //Netty优化后的 SelectedKey就绪汇合    private final SelectedSelectionKeySet selectionKeys;    //优化后的JDK NIO 原生Selector    private final Selector delegate;    SelectedSelectionKeySetSelector(Selector delegate, SelectedSelectionKeySet selectionKeys) {        this.delegate = delegate;        this.selectionKeys = selectionKeys;    }    @Override    public boolean isOpen() {        return delegate.isOpen();    }    @Override    public SelectorProvider provider() {        return delegate.provider();    }    @Override    public Set<SelectionKey> keys() {        return delegate.keys();    }    @Override    public Set<SelectionKey> selectedKeys() {        return delegate.selectedKeys();    }    @Override    public int selectNow() throws IOException {        //重置SelectedKeys汇合        selectionKeys.reset();        return delegate.selectNow();    }    @Override    public int select(long timeout) throws IOException {        //重置SelectedKeys汇合        selectionKeys.reset();        return delegate.select(timeout);    }    @Override    public int select() throws IOException {        //重置SelectedKeys汇合        selectionKeys.reset();        return delegate.select();    }    @Override    public Selector wakeup() {        return delegate.wakeup();    }    @Override    public void close() throws IOException {        delegate.close();    }}

到这里Reactor的外围Selector就创立好了,上面咱们来看下用于保留异步工作的队列是如何创立进去的。

newTaskQueue

    NioEventLoop(NioEventLoopGroup parent, Executor executor, SelectorProvider selectorProvider,                 SelectStrategy strategy, RejectedExecutionHandler rejectedExecutionHandler,                 EventLoopTaskQueueFactory queueFactory) {        super(parent, executor, false, newTaskQueue(queueFactory), newTaskQueue(queueFactory),                rejectedExecutionHandler);        this.provider = ObjectUtil.checkNotNull(selectorProvider, "selectorProvider");        this.selectStrategy = ObjectUtil.checkNotNull(strategy, "selectStrategy");        final SelectorTuple selectorTuple = openSelector();        //通过用SelectedSelectionKeySet装璜后的unwrappedSelector        this.selector = selectorTuple.selector;        //Netty优化过的JDK NIO近程Selector        this.unwrappedSelector = selectorTuple.unwrappedSelector;    }

咱们持续回到创立Reactor的主线上,到目前为止Reactor的外围Selector就创立好了,前边咱们提到Reactor除了须要监听IO就绪事件以及解决IO就绪事件外,还须要执行一些异步工作,当内部线程向Reactor提交异步工作后,Reactor就须要一个队列来保留这些异步工作,期待Reactor线程执行。

上面咱们来看下Reactor中工作队列的创立过程:

    //工作队列大小,默认是无界队列    protected static final int DEFAULT_MAX_PENDING_TASKS = Math.max(16,            SystemPropertyUtil.getInt("io.netty.eventLoop.maxPendingTasks", Integer.MAX_VALUE));    private static Queue<Runnable> newTaskQueue(            EventLoopTaskQueueFactory queueFactory) {        if (queueFactory == null) {            return newTaskQueue0(DEFAULT_MAX_PENDING_TASKS);        }        return queueFactory.newTaskQueue(DEFAULT_MAX_PENDING_TASKS);    }    private static Queue<Runnable> newTaskQueue0(int maxPendingTasks) {        // This event loop never calls takeTask()        return maxPendingTasks == Integer.MAX_VALUE ? PlatformDependent.<Runnable>newMpscQueue()                : PlatformDependent.<Runnable>newMpscQueue(maxPendingTasks);    }  
  • NioEventLoop的父类SingleThreadEventLoop中提供了一个动态变量DEFAULT_MAX_PENDING_TASKS 用来指定Reactor工作队列的大小。能够通过零碎变量-D io.netty.eventLoop.maxPendingTasks进行设置,默认为Integer.MAX_VALUE,示意工作队列默认为无界队列
  • 依据DEFAULT_MAX_PENDING_TASKS 变量的设定,来决定创立无界工作队列还是有界工作队列。
    //创立无界工作队列    PlatformDependent.<Runnable>newMpscQueue()    //创立有界工作队列    PlatformDependent.<Runnable>newMpscQueue(maxPendingTasks)    public static <T> Queue<T> newMpscQueue() {        return Mpsc.newMpscQueue();    }    public static <T> Queue<T> newMpscQueue(final int maxCapacity) {        return Mpsc.newMpscQueue(maxCapacity);    }

Reactor内的异步工作队列的类型为MpscQueue,它是由JCTools提供的一个高性能无锁队列,从命名前缀Mpsc能够看出,它实用于多生产者单消费者的场景,它反对多个生产者线程平安的拜访队列,同一时刻只容许一个消费者线程读取队列中的元素。

咱们晓得Netty中的Reactor能够线程平安的解决注册其上的多个SocketChannel上的IO数据,保障Reactor线程平安的外围起因正是因为这个MpscQueue,它能够反对多个业务线程在解决完业务逻辑后,线程平安的向MpscQueue增加异步写工作,而后由单个Reactor线程来执行这些写工作。既然是单线程执行,那必定是线程平安的了。

Reactor对应的NioEventLoop类型继承构造

NioEventLoop的继承构造也是比较复杂,这里咱们只关注在Reactor创立过程中波及的到两个父类SingleThreadEventLoop,SingleThreadEventExecutor

剩下的继承体系,咱们在后边随着Netty源码的深刻在缓缓介绍。

前边咱们提到,其实Reactor咱们能够看作是一个单线程的线程池,只有一个线程用来执行IO就绪事件的监听IO事件的解决异步工作的执行。用MpscQueue 来存储待执行的异步工作。

命名前缀为SingleThread的父类都是对Reactor这些行为的分层定义。也是本大节要介绍的对象

SingleThreadEventLoop

Reactor负责执行的异步工作分为三类:

  • 一般工作:这是Netty最次要执行的异步工作,寄存在一般工作队列taskQueue 中。在NioEventLoop构造函数中创立。
  • 定时工作: 寄存在优先级队列中。后续咱们介绍。
  • 尾部工作: 寄存于尾部工作队列tailTasks 中,尾部工作个别不罕用,在一般工作执行完后 Reactor线程会执行尾部工作。应用场景:比方对Netty 的运行状态做一些统计数据,例如工作循环的耗时、占用物理内存的大小等等都能够向尾部队列增加一个收尾工作实现统计数据的实时更新。

SingleThreadEventLoop 负责对尾部工作队列tailTasks进行治理。并且提供ChannelReactor注册的行为。

public abstract class SingleThreadEventLoop extends SingleThreadEventExecutor implements EventLoop {    //工作队列大小,默认是无界队列    protected static final int DEFAULT_MAX_PENDING_TASKS = Math.max(16,            SystemPropertyUtil.getInt("io.netty.eventLoop.maxPendingTasks", Integer.MAX_VALUE));        //尾部工作队列    private final Queue<Runnable> tailTasks;    protected SingleThreadEventLoop(EventLoopGroup parent, Executor executor,                                    boolean addTaskWakesUp, Queue<Runnable> taskQueue, Queue<Runnable> tailTaskQueue,                                    RejectedExecutionHandler rejectedExecutionHandler) {        super(parent, executor, addTaskWakesUp, taskQueue, rejectedExecutionHandler);        //尾部队列 执行一些统计工作 不罕用        tailTasks = ObjectUtil.checkNotNull(tailTaskQueue, "tailTaskQueue");    }    @Override    public ChannelFuture register(Channel channel) {        //注册channel到绑定的Reactor上        return register(new DefaultChannelPromise(channel, this));    }}

SingleThreadEventExecutor

SingleThreadEventExecutor次要负责对一般工作队列的治理,以及异步工作的执行Reactor线程的启停

public abstract class SingleThreadEventExecutor extends AbstractScheduledEventExecutor implements OrderedEventExecutor {    protected SingleThreadEventExecutor(EventExecutorGroup parent, Executor executor,                                        boolean addTaskWakesUp, Queue<Runnable> taskQueue, RejectedExecutionHandler rejectedHandler) {        //parent为Reactor所属的NioEventLoopGroup Reactor线程组        super(parent);        //向Reactor增加工作时,是否唤醒Selector进行轮询IO就绪事件,马上执行异步工作        this.addTaskWakesUp = addTaskWakesUp;        //Reactor异步工作队列的大小        this.maxPendingTasks = DEFAULT_MAX_PENDING_EXECUTOR_TASKS;        //用于启动Reactor线程的executor -> ThreadPerTaskExecutor        this.executor = ThreadExecutorMap.apply(executor, this);        //一般工作队列        this.taskQueue = ObjectUtil.checkNotNull(taskQueue, "taskQueue");        //工作队列满时的回绝策略        this.rejectedExecutionHandler = ObjectUtil.checkNotNull(rejectedHandler, "rejectedHandler");    }}

到当初为止,一个残缺的Reactor架构就被创立进去了。

3. 创立Channel到Reactor的绑定策略

到这一步,Reactor线程组NioEventLoopGroup里边的所有Reactor就曾经全副创立结束。

无论是Netty服务端NioServerSocketChannel关注的OP_ACCEPT事件也好,还是Netty客户端NioSocketChannel关注的OP_READOP_WRITE事件也好,都须要先注册到Reactor上,Reactor能力监听Channel上关注的IO事件实现IO多路复用

NioEventLoopGroup(Reactor线程组)里边有泛滥的Reactor,那么以上提到的这些Channel到底应该注册到哪个Reactor上呢?这就须要一个绑定的策略来平均分配。

还记得咱们前边介绍MultithreadEventExecutorGroup类的时候提到的结构器参数EventExecutorChooserFactory 吗?

这时候它就派上用场了,它次要用来创立ChannelReactor的绑定策略。默认为DefaultEventExecutorChooserFactory.INSTANCE

public abstract class MultithreadEventExecutorGroup extends AbstractEventExecutorGroup {   //从Reactor汇合中抉择一个特定的Reactor的绑定策略 用于channel注册绑定到一个固定的Reactor上    private final EventExecutorChooserFactory.EventExecutorChooser chooser;    chooser = chooserFactory.newChooser(children);}

上面咱们来看下具体的绑定策略:

DefaultEventExecutorChooserFactory

public final class DefaultEventExecutorChooserFactory implements EventExecutorChooserFactory {    public static final DefaultEventExecutorChooserFactory INSTANCE = new DefaultEventExecutorChooserFactory();    private DefaultEventExecutorChooserFactory() { }    @Override    public EventExecutorChooser newChooser(EventExecutor[] executors) {        if (isPowerOfTwo(executors.length)) {            return new PowerOfTwoEventExecutorChooser(executors);        } else {            return new GenericEventExecutorChooser(executors);        }    }    private static boolean isPowerOfTwo(int val) {        return (val & -val) == val;    }    ...................省略.................}

咱们看到在newChooser 办法绑定策略有两个分支,不同之处在于须要判断Reactor线程组中的Reactor个数是否为2的次幂

Netty中的绑定策略就是采纳round-robin轮询的形式来挨个抉择Reactor进行绑定。

采纳round-robin的形式进行负载平衡,咱们个别会用round % reactor.length取余的形式来挨个均匀的定位到对应的Reactor上。

如果Reactor的个数reactor.length恰好是2的次幂,那么就能够用位操作&运算round & reactor.length -1来代替%运算round % reactor.length,因为位运算的性能更高。具体为什么&运算可能代替%运算,笔者会在前面讲述工夫轮的时候为大家具体证实,这里大家只需记住这个公式,咱们还是聚焦本文的主线。

理解了优化原理,咱们在看代码实现就很容易了解了。

利用%运算的形式Math.abs(idx.getAndIncrement() % executors.length)来进行绑定。

    private static final class GenericEventExecutorChooser implements EventExecutorChooser {        private final AtomicLong idx = new AtomicLong();        private final EventExecutor[] executors;        GenericEventExecutorChooser(EventExecutor[] executors) {            this.executors = executors;        }        @Override        public EventExecutor next() {            return executors[(int) Math.abs(idx.getAndIncrement() % executors.length)];        }    }

利用&运算的形式idx.getAndIncrement() & executors.length - 1来进行绑定。

    private static final class PowerOfTwoEventExecutorChooser implements EventExecutorChooser {        private final AtomicInteger idx = new AtomicInteger();        private final EventExecutor[] executors;        PowerOfTwoEventExecutorChooser(EventExecutor[] executors) {            this.executors = executors;        }        @Override        public EventExecutor next() {            return executors[idx.getAndIncrement() & executors.length - 1];        }    }
又一次被Netty对性能的极致谋求所折服~~~~

4. 向Reactor线程组中所有的Reactor注册terminated回调函数

当Reactor线程组NioEventLoopGroup中所有的Reactor曾经创立结束,ChannelReactor的绑定策略也创立结束后,咱们就来到了创立NioEventGroup的最初一步。

俗话说的好,有创立就有启动,有启动就有敞开,这里会创立Reactor敞开的回调函数terminationListener,在Reactor敞开时回调。

terminationListener回调的逻辑很简略:

  • 通过AtomicInteger terminatedChildren变量记录曾经敞开的Reactor个数,用来判断NioEventLoopGroup中的Reactor是否曾经全副敞开。
  • 如果所有Reactor均已敞开,设置NioEventLoopGroup中的terminationFuturesuccess。示意Reactor线程组敞开胜利。
       //记录敞开的Reactor个数,当Reactor全副敞开后,才能够认为敞开胜利        private final AtomicInteger terminatedChildren = new AtomicInteger();        //敞开future        private final Promise<?> terminationFuture = new DefaultPromise(GlobalEventExecutor.INSTANCE);        final FutureListener<Object> terminationListener = new FutureListener<Object>() {            @Override            public void operationComplete(Future<Object> future) throws Exception {                if (terminatedChildren.incrementAndGet() == children.length) {                    //当所有Reactor敞开后 才认为是敞开胜利                    terminationFuture.setSuccess(null);                }            }        };                //为所有Reactor增加terminationListener        for (EventExecutor e: children) {            e.terminationFuture().addListener(terminationListener);        }

咱们在回到文章结尾Netty服务端代码模板

public final class EchoServer {    static final int PORT = Integer.parseInt(System.getProperty("port", "8007"));    public static void main(String[] args) throws Exception {        // Configure the server.        //创立主从Reactor线程组        EventLoopGroup bossGroup = new NioEventLoopGroup(1);        EventLoopGroup workerGroup = new NioEventLoopGroup();        ...........省略............    }}

当初Netty的主从Reactor线程组就曾经创立结束,此时Netty服务端的骨架曾经搭建结束,骨架如下:


总结

本文介绍了首先介绍了Netty对各种IO模型的反对以及如何轻松切换各种IO模型

还花了大量的篇幅介绍Netty服务端的外围引擎主从Reactor线程组的创立过程。在这个过程中,咱们还提到了Netty对各种细节进行的优化,展示了Netty对性能极致的谋求。

好了,Netty服务端的骨架曾经搭好,剩下的事件就该绑定端口地址而后接管连贯了,咱们下篇文章再见~~~

欢送关注微信公众号:bin的技术小屋,浏览公众号原文