欢送关注微信公众号:bin的技术小屋,浏览公众号原文
本系列Netty源码解析文章基于 4.1.56.Final版本
在上篇文章《聊聊Netty那些事儿之从内核角度看IO模型》中咱们花了大量的篇幅来从内核角度具体讲述了五种IO模型
的演进过程以及ReactorIO线程模型
的底层基石IO多路复用技术在内核中的实现原理。
最初咱们引出了netty中应用的主从Reactor IO线程模型。
通过上篇文章的介绍,咱们曾经分明了在IO调用的过程中内核帮咱们搞了哪些事件,那么俗话说的好内核领进门,修行在netty
,netty在用户空间又帮咱们搞了哪些事件?
那么从本文开始,笔者将从源码角度来带大家看下上图中的Reactor IO线程模型
在Netty中是如何实现的。
本文作为Reactor在Netty中实现系列文章中的开篇文章,笔者先来为大家介绍Reactor的骨架是如何创立进去的。
在上篇文章中咱们提到Netty采纳的是主从Reactor多线程
的模型,然而它在实现上又与Doug Lea在Scalable IO in Java论文中提到的经典主从Reactor多线程模型
有所差别。
Netty中的Reactor
是以Group
的模式呈现的,主从Reactor
在Netty中就是主从Reactor组
,每个Reactor Group
中会有多个Reactor
用来执行具体的IO工作
。当然在netty中Reactor
不只用来执行IO工作
,这个咱们前面再说。
Main Reactor Group
中的Reactor
数量取决于服务端要监听的端口个数,通常咱们的服务端程序只会监听一个端口,所以Main Reactor Group
只会有一个Main Reactor
线程来解决最重要的事件:绑定端口地址
,接管客户端连贯
,为客户端创立对应的SocketChannel
,将客户端SocketChannel调配给一个固定的Sub Reactor
。也就是上篇文章笔者为大家举的例子,饭店最重要的工作就是先把客人迎接进来。“我家大门常关上,凋谢怀抱等你,拥抱过就有了默契你会爱上这里......”
Sub Reactor Group
里有多个Reactor
线程,Reactor
线程的个数能够通过零碎参数-D io.netty.eventLoopThreads
指定。默认的Reactor
的个数为CPU核数 * 2
。Sub Reactor
线程次要用来轮询客户端SocketChannel上的IO就绪事件
,解决IO就绪事件
,执行异步工作
。Sub Reactor Group
做的事件就是上篇饭店例子中服务员的工作,客人进来了要为客人调配座位,端茶送水,做菜上菜。“不论远近都是客人,请不必客气,相约好了在一起,咱们欢迎您......”
一个客户端SocketChannel
只能调配给一个固定的Sub Reactor
。一个Sub Reactor
负责解决多个客户端SocketChannel
,这样能够将服务端承载的全量客户端连贯
摊派到多个Sub Reactor
中解决,同时也能保障客户端SocketChannel上的IO解决的线程安全性
。
因为文章篇幅的关系,作为Reactor在netty中实现的第一篇咱们次要来介绍主从Reactor Group
的创立流程,骨架脉络先搭好。
上面咱们来看一段Netty服务端代码的编写模板,从代码模板的流程中咱们来解析下主从Reactor的创立流程以及在这个过程中所波及到的Netty外围类。
Netty服务端代码模板
/** * Echoes back any received data from a client. */public final class EchoServer { static final int PORT = Integer.parseInt(System.getProperty("port", "8007")); public static void main(String[] args) throws Exception { // Configure the server. //创立主从Reactor线程组 EventLoopGroup bossGroup = new NioEventLoopGroup(1); EventLoopGroup workerGroup = new NioEventLoopGroup(); final EchoServerHandler serverHandler = new EchoServerHandler(); try { ServerBootstrap b = new ServerBootstrap(); b.group(bossGroup, workerGroup)//配置主从Reactor .channel(NioServerSocketChannel.class)//配置主Reactor中的channel类型 .option(ChannelOption.SO_BACKLOG, 100)//设置主Reactor中channel的option选项 .handler(new LoggingHandler(LogLevel.INFO))//设置主Reactor中Channel->pipline->handler .childHandler(new ChannelInitializer<SocketChannel>() {//设置从Reactor中注册channel的pipeline @Override public void initChannel(SocketChannel ch) throws Exception { ChannelPipeline p = ch.pipeline(); //p.addLast(new LoggingHandler(LogLevel.INFO)); p.addLast(serverHandler); } }); // Start the server. 绑定端口启动服务,开始监听accept事件 ChannelFuture f = b.bind(PORT).sync(); // Wait until the server socket is closed. f.channel().closeFuture().sync(); } finally { // Shut down all event loops to terminate all threads. bossGroup.shutdownGracefully(); workerGroup.shutdownGracefully(); } }}
- 首先咱们要创立Netty最外围的局部 ->
创立主从Reactor Group
,在Netty中EventLoopGroup
就是Reactor Group
的实现类。对应的EventLoop
就是Reactor
的实现类。
//创立主从Reactor线程组 EventLoopGroup bossGroup = new NioEventLoopGroup(1); EventLoopGroup workerGroup = new NioEventLoopGroup();
- 创立用于
IO解决
的ChannelHandler
,实现相应IO事件
的回调函数,编写对应的IO解决
逻辑。留神这里只是简略示例哈,具体的IO事件处理,笔者会独自开一篇文章专门讲述。
final EchoServerHandler serverHandler = new EchoServerHandler();/** * Handler implementation for the echo server. */@Sharablepublic class EchoServerHandler extends ChannelInboundHandlerAdapter { @Override public void channelRead(ChannelHandlerContext ctx, Object msg) { ................省略IO解决逻辑................ ctx.write(msg); } @Override public void channelReadComplete(ChannelHandlerContext ctx) { ctx.flush(); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { // Close the connection when an exception is raised. cause.printStackTrace(); ctx.close(); }}
创立
ServerBootstrap
Netty服务端启动类,并在启动类中配置启动Netty服务端所须要的一些必备信息。- 通过
serverBootstrap.group(bossGroup, workerGroup)
为Netty服务端配置主从Reactor Group
实例。 通过
serverBootstrap.channel(NioServerSocketChannel.class)
配置Netty服务端的ServerSocketChannel
用于绑定端口地址
以及创立客户端SocketChannel
。Netty中的NioServerSocketChannel.class
就是对JDK NIO中ServerSocketChannel
的封装。而用于示意客户端连贯
的NioSocketChannel
是对JDK NIOSocketChannel
封装。在上篇文章介绍
Socket内核构造
大节中咱们提到,在编写服务端网络程序时,咱们首先要创立一个Socket
用于listen和bind
端口地址,咱们把这个叫做监听Socket
,这里对应的就是NioServerSocketChannel.class
。当客户端连贯实现三次握手,零碎调用accept
函数会基于监听Socket
创立进去一个新的Socket
专门用于与客户端之间的网络通信咱们称为客户端连贯Socket
,这里对应的就是NioSocketChannel.class
serverBootstrap.option(ChannelOption.SO_BACKLOG, 100)
设置服务端ServerSocketChannel
中的SocketOption
。对于SocketOption
的选项咱们后边的文章再聊,本文次要聚焦在NettyMain Reactor Group
的创立及工作流程。serverBootstrap.handler(....)
设置服务端NioServerSocketChannel
中对应Pipieline
中的ChannelHandler
。netty有两种
Channel类型
:一种是服务端用于监听绑定端口地址的NioServerSocketChannel
,一种是用于客户端通信的NioSocketChannel
。每种Channel类型实例
都会对应一个PipeLine
用于编排对应channel实例
上的IO事件处理逻辑。PipeLine
中组织的就是ChannelHandler
用于编写特定的IO解决逻辑。留神
serverBootstrap.handler
设置的是服务端NioServerSocketChannel PipeLine
中的ChannelHandler
。serverBootstrap.childHandler(ChannelHandler childHandler)
用于设置客户端NioSocketChannel
中对应Pipieline
中的ChannelHandler
。咱们通常配置的编码解码器就是在这里。ServerBootstrap
启动类办法带有child
前缀的均是设置客户端NioSocketChannel
属性的。ChannelInitializer
是用于当SocketChannel
胜利注册到绑定的Reactor
上后,用于初始化该SocketChannel
的Pipeline
。它的initChannel
办法会在注册胜利后执行。这里只是捎带提一下,让大家有个初步印象,前面我会专门介绍。
- 通过
ChannelFuture f = serverBootstrap.bind(PORT).sync()
这一步会是下篇文章要重点剖析的主题Main Reactor Group
的启动,绑定端口地址,开始监听客户端连贯事件(OP_ACCEPT
)。本文咱们只关注创立流程。f.channel().closeFuture().sync()
期待服务端NioServerSocketChannel
敞开。Netty服务端到这里正式启动,并筹备好承受客户端连贯的筹备。shutdownGracefully
优雅敞开主从Reactor线程组
里的所有Reactor线程
。
Netty对IO模型的反对
在上篇文章中咱们介绍了五种IO模型
,Netty中反对BIO
,NIO
,AIO
以及多种操作系统下的IO多路复用技术
实现。
在Netty中切换这几种IO模型
也是十分的不便,上面咱们来看下Netty如何对这几种IO模型进行反对的。
首先咱们介绍下几个与IO模型
相干的重要接口:
EventLoop
EventLoop
就是Netty中的Reactor
,能够说它就是Netty的引擎,负责Channel上IO就绪事件的监听
,IO就绪事件的解决
,异步工作的执行
驱动着整个Netty的运行。
不同IO模型
下,EventLoop
有着不同的实现,咱们只须要切换不同的实现类就能够实现对NettyIO模型
的切换。
BIO | NIO | AIO |
---|---|---|
ThreadPerChannelEventLoop | NioEventLoop | AioEventLoop |
在NIO模型
下Netty会主动
依据操作系统以及版本的不同抉择对应的IO多路复用技术实现
。比方Linux 2.6版本以上用的是Epoll
,2.6版本以下用的是Poll
,Mac下采纳的是Kqueue
。
其中Linux kernel 在5.1版本引入的异步IO库io_uring正在netty中孵化。
EventLoopGroup
Netty中的Reactor
是以Group
的模式呈现的,EventLoopGroup
正是Reactor组
的接口定义,负责管理Reactor
,Netty中的Channel
就是通过EventLoopGroup
注册到具体的Reactor
上的。
Netty的IO线程模型是主从Reactor多线程模型
,主从Reactor线程组
在Netty源码中对应的其实就是两个EventLoopGroup
实例。
不同的IO模型
也有对应的实现:
BIO | NIO | AIO |
---|---|---|
ThreadPerChannelEventLoopGroup | NioEventLoopGroup | AioEventLoopGroup |
ServerSocketChannel
用于Netty服务端应用的ServerSocketChannel
,对应于上篇文章提到的监听Socket
,负责绑定监听端口地址,接管客户端连贯并创立用于与客户端通信的SocketChannel
。
不同的IO模型
下的实现:
BIO | NIO | AIO |
---|---|---|
OioServerSocketChannel | NioServerSocketChannel | AioServerSocketChannel |
SocketChannel
用于与客户端通信的SocketChannel
,对应于上篇文章提到的客户端连贯Socket
,当客户端实现三次握手后,由零碎调用accept
函数依据监听Socket
创立。
不同的IO模型
下的实现:
BIO | NIO | AIO |
---|---|---|
OioSocketChannel | NioSocketChannel | AioSocketChannel |
咱们看到在不同IO模型
的实现中,Netty这些围绕IO模型
的外围类只是前缀的不同:
- BIO对应的前缀为
Oio
示意old io
,当初曾经废除不举荐应用。 - NIO对应的前缀为
Nio
,正是Netty举荐也是咱们罕用的非阻塞IO模型
。 - AIO对应的前缀为
Aio
,因为Linux下的异步IO
机制实现的并不成熟,性能晋升体现上也不显著,现已被删除。
咱们只须要将IO模型
的这些外围接口对应的实现类前缀
改为对应IO模型
的前缀,就能够轻松在Netty中实现对IO模型
的切换。
多种NIO的实现
Common | Linux | Mac |
---|---|---|
NioEventLoopGroup | EpollEventLoopGroup | KQueueEventLoopGroup |
NioEventLoop | EpollEventLoop | KQueueEventLoop |
NioServerSocketChannel | EpollServerSocketChannel | KQueueServerSocketChannel |
NioSocketChannel | EpollSocketChannel | KQueueSocketChannel |
咱们通常在应用NIO模型
的时候会应用Common列
下的这些IO模型
外围类,Common类
也会依据操作系统的不同主动抉择JDK
在对应平台下的IO多路复用技术
的实现。
而Netty本身也依据操作系统的不同提供了本人对IO多路复用技术
的实现,比JDK
的实现性能更优。比方:
JDK
的 NIO默认
实现是程度触发
,Netty 是边缘触发(默认)
和程度触发可切换。。- Netty 实现的垃圾回收更少、性能更好。
咱们编写Netty服务端程序的时候也能够依据操作系统的不同,采纳Netty本身的实现来进一步优化程序。做法也很简略,间接将上图中红框里的实现类替换成Netty的本身实现类即可实现切换。
通过以上对Netty服务端代码编写模板以及IO模型
相干外围类的简略介绍,咱们对Netty的创立流程有了一个简略粗略的总体意识,上面咱们来深刻分析下创立流程过程中的每一个步骤以及这个过程中波及到的外围类实现。
以下源码解析局部咱们均采纳Common列
下NIO
相干的实现进行解析。
创立主从Reactor线程组
在Netty服务端程序编写模板的开始,咱们首先会创立两个Reactor线程组:
- 一个是主Reactor线程组
bossGroup
用于监听客户端连贯,创立客户端连贯NioSocketChannel
,并将创立好的客户端连贯NioSocketChannel
注册到从Reactor线程组中一个固定的Reactor
上。 - 一个是从Reactor线程组
workerGroup
,workerGroup
中的Reactor
负责监听绑定在其上的客户端连贯NioSocketChannel
上的IO就绪事件
,并解决IO就绪事件
,执行异步工作
。
//创立主从Reactor线程组 EventLoopGroup bossGroup = new NioEventLoopGroup(1); EventLoopGroup workerGroup = new NioEventLoopGroup();
Netty中Reactor线程组的实现类为NioEventLoopGroup
,在创立bossGroup
和workerGroup
的时候用到了NioEventLoopGroup
的两个构造函数:
- 带
nThreads
参数的构造函数public NioEventLoopGroup(int nThreads)
。 - 不带
nThreads
参数的默认
构造函数public NioEventLoopGroup()
public class NioEventLoopGroup extends MultithreadEventLoopGroup { /** * Create a new instance using the default number of threads, the default {@link ThreadFactory} and * the {@link SelectorProvider} which is returned by {@link SelectorProvider#provider()}. */ public NioEventLoopGroup() { this(0); } /** * Create a new instance using the specified number of threads, {@link ThreadFactory} and the * {@link SelectorProvider} which is returned by {@link SelectorProvider#provider()}. */ public NioEventLoopGroup(int nThreads) { this(nThreads, (Executor) null); } ......................省略...........................}
nThreads
参数示意以后要创立的Reactor线程组
内蕴含多少个Reactor线程
。不指定nThreads
参数的话采纳默认的Reactor线程
个数,用0
示意。
最终会调用到构造函数
public NioEventLoopGroup(int nThreads, Executor executor, final SelectorProvider selectorProvider, final SelectStrategyFactory selectStrategyFactory) { super(nThreads, executor, selectorProvider, selectStrategyFactory, RejectedExecutionHandlers.reject()); }
上面简略介绍下构造函数中这几个参数的作用,前面咱们在解说本文主线的过程中还会提及这几个参数,到时在具体介绍,这里只是让大家有个初步印象,不用做过多的纠缠。
Executor executor:
负责启动Reactor线程
进而Reactor才能够开始工作。Reactor线程组
NioEventLoopGroup
负责创立Reactor线程
,在创立的时候会将executor
传入。RejectedExecutionHandler:
当向Reactor
增加异步工作增加失败时,采纳的回绝策略。Reactor的工作不只是监听IO沉闷事件和IO工作的解决,还包含对异步工作的解决。这里大家只需有个这样的概念,前面笔者会专门具体介绍。SelectorProvider selectorProvider:
Reactor中的IO模型为IO多路复用模型
,对应于JDK NIO中的实现为java.nio.channels.Selector
(就是咱们上篇文章中提到的select,poll,epoll
),每个Reator中都蕴含一个Selector
,用于轮询
注册在该Reactor上的所有Channel
上的IO事件
。SelectorProvider
就是用来创立Selector
的。SelectStrategyFactory selectStrategyFactory:
Reactor最重要的事件就是轮询
注册其上的Channel
上的IO就绪事件
,这里的SelectStrategyFactory
用于指定轮询策略
,默认为DefaultSelectStrategyFactory.INSTANCE
。
最终会将这些参数交给NioEventLoopGroup
的父类结构器,上面咱们来看下NioEventLoopGroup类
的继承构造:
NioEventLoopGroup类
的继承构造乍一看比较复杂,大家不要慌,笔者会随着主线的深刻缓缓地介绍这些父类接口,咱们当初重点关注Mutithread
前缀的类。
咱们晓得NioEventLoopGroup
是Netty中的Reactor线程组
的实现,既然是线程组那么必定是负责管理和创立多个Reactor线程的
,所以Mutithread
前缀的类定义的行为天然是对Reactor线程组
内多个Reactor线程
的创立和管理工作。
MultithreadEventLoopGroup
public abstract class MultithreadEventLoopGroup extends MultithreadEventExecutorGroup implements EventLoopGroup { private static final InternalLogger logger = InternalLoggerFactory.getInstance(MultithreadEventLoopGroup.class); //默认Reactor个数 private static final int DEFAULT_EVENT_LOOP_THREADS; static { DEFAULT_EVENT_LOOP_THREADS = Math.max(1, SystemPropertyUtil.getInt( "io.netty.eventLoopThreads", NettyRuntime.availableProcessors() * 2)); if (logger.isDebugEnabled()) { logger.debug("-Dio.netty.eventLoopThreads: {}", DEFAULT_EVENT_LOOP_THREADS); } } /** * @see MultithreadEventExecutorGroup#MultithreadEventExecutorGroup(int, Executor, Object...) */ protected MultithreadEventLoopGroup(int nThreads, Executor executor, Object... args) { super(nThreads == 0 ? DEFAULT_EVENT_LOOP_THREADS : nThreads, executor, args); } ...................省略.....................}
MultithreadEventLoopGroup类
次要的性能就是用来确定Reactor线程组
内Reactor
的个数。
默认的Reactor
的个数寄存于字段DEFAULT_EVENT_LOOP_THREADS
中。
从static {}
动态代码块中咱们能够看出默认Reactor
的个数的获取逻辑:
- 能够通过零碎变量
-D io.netty.eventLoopThreads"
指定。 - 如果不指定,那么默认的就是
NettyRuntime.availableProcessors() * 2
当nThread
参数设置为0
采纳默认设置时,Reactor线程组
内的Reactor
个数则设置为DEFAULT_EVENT_LOOP_THREADS
。
MultithreadEventExecutorGroup
MultithreadEventExecutorGroup
这里就是本大节的外围,次要用来定义创立和治理Reactor
的行为。
public abstract class MultithreadEventExecutorGroup extends AbstractEventExecutorGroup { //Reactor线程组中的Reactor汇合 private final EventExecutor[] children; private final Set<EventExecutor> readonlyChildren; //从Reactor group中抉择一个特定的Reactor的抉择策略 用于channel注册绑定到一个固定的Reactor上 private final EventExecutorChooserFactory.EventExecutorChooser chooser; /** * Create a new instance. * * @param nThreads the number of threads that will be used by this instance. * @param executor the Executor to use, or {@code null} if the default should be used. * @param args arguments which will passed to each {@link #newChild(Executor, Object...)} call */ protected MultithreadEventExecutorGroup(int nThreads, Executor executor, Object... args) { this(nThreads, executor, DefaultEventExecutorChooserFactory.INSTANCE, args); } ............................省略................................}
首先介绍一个新的结构器参数EventExecutorChooserFactory chooserFactory
。当客户端连贯实现三次握手后,Main Reactor
会创立客户端连贯NioSocketChannel
,并将其绑定到Sub Reactor Group
中的一个固定Reactor
,那么具体要绑定到哪个具体的Sub Reactor
上呢?这个绑定策略就是由chooserFactory
来创立的。默认为DefaultEventExecutorChooserFactory
。
上面就是本大节的主题Reactor线程组
的创立过程:
protected MultithreadEventExecutorGroup(int nThreads, Executor executor, EventExecutorChooserFactory chooserFactory, Object... args) { if (nThreads <= 0) { throw new IllegalArgumentException(String.format("nThreads: %d (expected: > 0)", nThreads)); } if (executor == null) { //用于创立Reactor线程 executor = new ThreadPerTaskExecutor(newDefaultThreadFactory()); } children = new EventExecutor[nThreads]; //循环创立reaactor group中的Reactor for (int i = 0; i < nThreads; i ++) { boolean success = false; try { //创立reactor children[i] = newChild(executor, args); success = true; } catch (Exception e) { throw new IllegalStateException("failed to create a child event loop", e); } finally { ................省略................ } } } //创立channel到Reactor的绑定策略 chooser = chooserFactory.newChooser(children); ................省略................ Set<EventExecutor> childrenSet = new LinkedHashSet<EventExecutor>(children.length); Collections.addAll(childrenSet, children); readonlyChildren = Collections.unmodifiableSet(childrenSet); }
1. 创立用于启动Reactor线程的executor
在Netty Reactor Group中的单个Reactor
的IO线程模型
为上篇文章提到的单Reactor单线程模型
,一个Reactor线程
负责轮询
注册其上的所有Channel
中的IO就绪事件
,解决IO事件,执行Netty中的异步工作等工作。正是这个Reactor线程
驱动着整个Netty的运行,堪称是Netty的外围引擎。
而这里的executor
就是负责启动Reactor线程
的,从创立源码中咱们能够看到executor
的类型为ThreadPerTaskExecutor
。
ThreadPerTaskExecutor
public final class ThreadPerTaskExecutor implements Executor { private final ThreadFactory threadFactory; public ThreadPerTaskExecutor(ThreadFactory threadFactory) { this.threadFactory = ObjectUtil.checkNotNull(threadFactory, "threadFactory"); } @Override public void execute(Runnable command) { threadFactory.newThread(command).start(); }}
咱们看到ThreadPerTaskExecutor
做的事件很简略,从它的命名前缀ThreadPerTask
咱们就能够猜出它的工作形式,就是来一个工作就创立一个线程执行。而创立的这个线程正是netty的外围引擎Reactor线程。
在Reactor线程
启动的时候,Netty会将Reactor线程
要做的事件封装成Runnable
,丢给exexutor
启动。
而Reactor线程
的外围就是一个死循环
不停的轮询
IO就绪事件,解决IO事件,执行异步工作。一刻也不停歇,堪称996榜样
。
这里向大家先卖个关子,"Reactor线程是何时启动的呢??"
2. 创立Reactor
Reactor线程组NioEventLoopGroup
蕴含多个Reactor
,寄存于private final EventExecutor[] children
数组中。
所以上面的事件就是创立nThread
个Reactor
,并寄存于EventExecutor[] children
字段中,
咱们来看下用于创立Reactor
的newChild(executor, args)
办法:
newChild
newChild
办法是MultithreadEventExecutorGroup
中的一个形象办法,提供给具体子类实现。
protected abstract EventExecutor newChild(Executor executor, Object... args) throws Exception;
这里咱们解析的是NioEventLoopGroup
,咱们来看下newChild
在该类中的实现:
public class NioEventLoopGroup extends MultithreadEventLoopGroup { @Override protected EventLoop newChild(Executor executor, Object... args) throws Exception { EventLoopTaskQueueFactory queueFactory = args.length == 4 ? (EventLoopTaskQueueFactory) args[3] : null; return new NioEventLoop(this, executor, (SelectorProvider) args[0], ((SelectStrategyFactory) args[1]).newSelectStrategy(), (RejectedExecutionHandler) args[2], queueFactory); }}
前边提到的泛滥结构器参数,这里会通过可变参数Object... args
传入到Reactor类NioEventLoop
的结构器中。
这里介绍下新的参数EventLoopTaskQueueFactory queueFactory
,前边提到Netty中的Reactor
次要工作是轮询
注册其上的所有Channel
上的IO就绪事件
,解决IO就绪事件
。除了这些次要的工作外,Netty为了极致的压迫Reactor
的性能,还会让它做一些异步工作的执行工作。既然要执行异步工作,那么Reactor
中就须要一个队列
来保留工作。
这里的EventLoopTaskQueueFactory
就是用来创立这样的一个队列来保留Reactor
中待执行的异步工作。
能够把Reactor
了解成为一个单线程的线程池
,相似
于JDK
中的SingleThreadExecutor
,仅用一个线程来执行轮询IO就绪事件
,解决IO就绪事件
,执行异步工作
。同时待执行的异步工作保留在Reactor
里的taskQueue
中。
NioEventLoop
public final class NioEventLoop extends SingleThreadEventLoop { //用于创立JDK NIO Selector,ServerSocketChannel private final SelectorProvider provider; //Selector轮询策略 决定什么时候轮询,什么时候解决IO事件,什么时候执行异步工作 private final SelectStrategy selectStrategy; /** * The NIO {@link Selector}. */ private Selector selector; private Selector unwrappedSelector; NioEventLoop(NioEventLoopGroup parent, Executor executor, SelectorProvider selectorProvider, SelectStrategy strategy, RejectedExecutionHandler rejectedExecutionHandler, EventLoopTaskQueueFactory queueFactory) { super(parent, executor, false, newTaskQueue(queueFactory), newTaskQueue(queueFactory), rejectedExecutionHandler); this.provider = ObjectUtil.checkNotNull(selectorProvider, "selectorProvider"); this.selectStrategy = ObjectUtil.checkNotNull(strategy, "selectStrategy"); final SelectorTuple selectorTuple = openSelector(); this.selector = selectorTuple.selector; this.unwrappedSelector = selectorTuple.unwrappedSelector; }}
这里就正式开始了Reactor
的创立过程,咱们晓得Reactor
的外围是采纳的IO多路复用模型
来对客户端连贯上的IO事件
进行监听
,所以最重要的事件是创立Selector
(JDK NIO 中IO多路复用技术的实现
)。
能够把Selector
了解为咱们上篇文章介绍的Select,poll,epoll
,它是JDK NIO
对操作系统内核提供的这些IO多路复用技术
的封装。
openSelector
openSelector
是NioEventLoop类
中用于创立IO多路复用
的Selector
,并对创立进去的JDK NIO
原生的Selector
进行性能优化。
首先会通过SelectorProvider#openSelector
创立JDK NIO原生的Selector
。
private SelectorTuple openSelector() { final Selector unwrappedSelector; try { //通过JDK NIO SelectorProvider创立Selector unwrappedSelector = provider.openSelector(); } catch (IOException e) { throw new ChannelException("failed to open a new selector", e); } ..................省略.............}
SelectorProvider
会依据操作系统的不同抉择JDK在不同操作系统版本下的对应Selector
的实现。Linux下会抉择Epoll
,Mac下会抉择Kqueue
。
上面咱们就来看下SelectorProvider
是如何做到主动适配不同操作系统下IO多路复用
实现的
SelectorProvider
public NioEventLoopGroup(ThreadFactory threadFactory) { this(0, threadFactory, SelectorProvider.provider()); }
SelectorProvider
是在后面介绍的NioEventLoopGroup类
构造函数中通过调用SelectorProvider.provider()
被加载,并通过NioEventLoopGroup#newChild
办法中的可变长参数Object... args
传递到NioEventLoop
中的private final SelectorProvider provider
字段中。
SelectorProvider的加载过程:
public abstract class SelectorProvider { public static SelectorProvider provider() { synchronized (lock) { if (provider != null) return provider; return AccessController.doPrivileged( new PrivilegedAction<SelectorProvider>() { public SelectorProvider run() { if (loadProviderFromProperty()) return provider; if (loadProviderAsService()) return provider; provider = sun.nio.ch.DefaultSelectorProvider.create(); return provider; } }); } }}
从SelectorProvider
加载源码中咱们能够看出,SelectorProvider
的加载形式有三种,优先级如下:
- 通过零碎变量
-D java.nio.channels.spi.SelectorProvider
指定SelectorProvider
的自定义实现类全限定名
。通过应用程序类加载器(Application Classloader)
加载。
private static boolean loadProviderFromProperty() { String cn = System.getProperty("java.nio.channels.spi.SelectorProvider"); if (cn == null) return false; try { Class<?> c = Class.forName(cn, true, ClassLoader.getSystemClassLoader()); provider = (SelectorProvider)c.newInstance(); return true; } .................省略............. }
- 通过
SPI
形式加载。在工程目录META-INF/services
下定义名为java.nio.channels.spi.SelectorProvider
的SPI文件
,文件中第一个定义的SelectorProvider
实现类全限定名就会被加载。
private static boolean loadProviderAsService() { ServiceLoader<SelectorProvider> sl = ServiceLoader.load(SelectorProvider.class, ClassLoader.getSystemClassLoader()); Iterator<SelectorProvider> i = sl.iterator(); for (;;) { try { if (!i.hasNext()) return false; provider = i.next(); return true; } catch (ServiceConfigurationError sce) { if (sce.getCause() instanceof SecurityException) { // Ignore the security exception, try the next provider continue; } throw sce; } } }
- 如果以上两种形式均未被定义,那么就采纳
SelectorProvider
零碎默认实现sun.nio.ch.DefaultSelectorProvider
。笔者以后应用的操作系统是MacOS
,从源码中咱们能够看到主动适配了KQueue
实现。
public class DefaultSelectorProvider { private DefaultSelectorProvider() { } public static SelectorProvider create() { return new KQueueSelectorProvider(); }}
不同操作系统中JDK对于DefaultSelectorProvider
会有所不同,Linux内核版本2.6以上对应的Epoll
,Linux内核版本2.6以下对应的Poll
,MacOS对应的是KQueue
。
上面咱们接着回到io.netty.channel.nio.NioEventLoop#openSelector
的主线上来。
Netty对JDK NIO 原生Selector的优化
首先在NioEventLoop
中有一个Selector优化开关DISABLE_KEY_SET_OPTIMIZATION
,通过零碎变量-D io.netty.noKeySetOptimization
指定,默认是开启的,示意须要对JDK NIO原生Selector
进行优化。
public final class NioEventLoop extends SingleThreadEventLoop { //Selector优化开关 默认开启 为了遍历的效率 会对Selector中的SelectedKeys进行数据结构优化 private static final boolean DISABLE_KEY_SET_OPTIMIZATION = SystemPropertyUtil.getBoolean("io.netty.noKeySetOptimization", false);}
如果优化开关DISABLE_KEY_SET_OPTIMIZATION
是敞开的,那么间接返回JDK NIO原生的Selector
。
private SelectorTuple openSelector() { ..........SelectorProvider创立JDK NIO 原生Selector.............. if (DISABLE_KEY_SET_OPTIMIZATION) { //JDK NIO原生Selector ,Selector优化开关 默认开启须要对Selector进行优化 return new SelectorTuple(unwrappedSelector); }}
上面为Netty对JDK NIO原生的Selector
的优化过程:
- 获取
JDK NIO原生Selector
的形象实现类sun.nio.ch.SelectorImpl
。JDK NIO原生Selector
的实现均继承于该抽象类。用于判断由SelectorProvider
创立进去的Selector
是否为JDK默认实现
(SelectorProvider
第三种加载形式)。因为SelectorProvider
能够是自定义加载,所以它创立进去的Selector
并不一定是JDK NIO 原生的。
Object maybeSelectorImplClass = AccessController.doPrivileged(new PrivilegedAction<Object>() { @Override public Object run() { try { return Class.forName( "sun.nio.ch.SelectorImpl", false, PlatformDependent.getSystemClassLoader()); } catch (Throwable cause) { return cause; } } });
JDK NIO Selector的抽象类sun.nio.ch.SelectorImpl
public abstract class SelectorImpl extends AbstractSelector { // The set of keys with data ready for an operation // //IO就绪的SelectionKey(外面包裹着channel) protected Set<SelectionKey> selectedKeys; // The set of keys registered with this Selector //注册在该Selector上的所有SelectionKey(外面包裹着channel) protected HashSet<SelectionKey> keys; // Public views of the key sets //用于向调用线程返回的keys,不可变 private Set<SelectionKey> publicKeys; // Immutable //当有IO就绪的SelectionKey时,向调用线程返回。只可删除其中元素,不可减少 private Set<SelectionKey> publicSelectedKeys; // Removal allowed, but not addition protected SelectorImpl(SelectorProvider sp) { super(sp); keys = new HashSet<SelectionKey>(); selectedKeys = new HashSet<SelectionKey>(); if (Util.atBugLevel("1.4")) { publicKeys = keys; publicSelectedKeys = selectedKeys; } else { //不可变 publicKeys = Collections.unmodifiableSet(keys); //只可删除其中元素,不可减少 publicSelectedKeys = Util.ungrowableSet(selectedKeys); } }}
这里笔者来简略介绍下JDK NIO中的Selector
中这几个字段的含意,咱们能够和上篇文章讲到的epoll在内核中的构造做类比,不便大家后续的了解:
Set<SelectionKey> selectedKeys
相似于咱们上篇文章解说Epoll
时提到的就绪队列eventpoll->rdllist
,Selector
这里大家能够了解为Epoll
。Selector
会将本人监听到的IO就绪
的Channel
放到selectedKeys
中。
这里的SelectionKey
暂且能够了解为Channel
在Selector
中的示意,类比上图中epitem构造
里的epoll_event
,封装IO就绪Socket的信息。
其实SelectionKey
里蕴含的信息不止是Channel
还有很多IO相干的信息。前面咱们在具体介绍。
HashSet<SelectionKey> keys:
这里寄存的是所有注册到该Selector
上的Channel
。类比epoll中的红黑树结构rb_root
SelectionKey
在Channel
注册到Selector
中后生成。Set<SelectionKey> publicSelectedKeys
相当于是selectedKeys
的视图,用于向内部线程返回IO就绪
的SelectionKey
。这个汇合在内部线程中只能做删除操作不可减少元素
,并且不是线程平安的
。Set<SelectionKey> publicKeys
相当于keys
的不可变视图,用于向内部线程返回所有注册在该Selector
上的SelectionKey
这里须要重点关注
抽象类sun.nio.ch.SelectorImpl
中的selectedKeys
和publicSelectedKeys
这两个字段,留神它们的类型都是HashSet
,一会优化的就是这里!!!!
- 判断由
SelectorProvider
创立进去的Selector
是否是JDK NIO原生的Selector
实现。因为Netty优化针对的是JDK NIO 原生Selector
。判断规范为sun.nio.ch.SelectorImpl
类是否为SelectorProvider
创立出Selector
的父类。如果不是则间接返回。不在持续上面的优化过程。
//判断是否能够对Selector进行优化,这里次要针对JDK NIO原生Selector的实现类进行优化,因为SelectorProvider能够加载的是自定义Selector实现 //如果SelectorProvider创立的Selector不是JDK原生sun.nio.ch.SelectorImpl的实现类,那么无奈进行优化,间接返回 if (!(maybeSelectorImplClass instanceof Class) || !((Class<?>) maybeSelectorImplClass).isAssignableFrom(unwrappedSelector.getClass())) { if (maybeSelectorImplClass instanceof Throwable) { Throwable t = (Throwable) maybeSelectorImplClass; logger.trace("failed to instrument a special java.util.Set into: {}", unwrappedSelector, t); } return new SelectorTuple(unwrappedSelector); }
通过后面对SelectorProvider
的介绍咱们晓得,这里通过provider.openSelector()
创立进去的Selector
实现类为KQueueSelectorImpl类
,它继承实现了sun.nio.ch.SelectorImpl
,所以它是JDK NIO 原生的Selector
实现
class KQueueSelectorImpl extends SelectorImpl {}
- 创立
SelectedSelectionKeySet
通过反射替换掉sun.nio.ch.SelectorImpl类
中selectedKeys
和publicSelectedKeys
的默认HashSet
实现。
为什么要用SelectedSelectionKeySet
替换掉原来的HashSet
呢??
因为这里波及到对HashSet类型
的sun.nio.ch.SelectorImpl#selectedKeys
汇合的两种操作:
- 插入操作: 通过前边对
sun.nio.ch.SelectorImpl类
中字段的介绍咱们晓得,在Selector
监听到IO就绪
的SelectionKey
后,会将IO就绪
的SelectionKey
插入sun.nio.ch.SelectorImpl#selectedKeys
汇合中,这时Reactor线程
会从java.nio.channels.Selector#select(long)
阻塞调用中返回(相似上篇文章提到的epoll_wait
)。 - 遍历操作:
Reactor线程
返回后,会从Selector
中获取IO就绪
的SelectionKey
汇合(也就是sun.nio.ch.SelectorImpl#selectedKeys
),Reactor线程
遍历selectedKeys
,获取IO就绪
的SocketChannel
,并解决SocketChannel
上的IO事件
。
咱们都晓得HashSet
底层数据结构是一个哈希表
,因为Hash抵触
这种状况的存在,所以导致对哈希表
进行插入
和遍历
操作的性能不如对数组
进行插入
和遍历
操作的性能好。
还有一个重要起因是,数组能够利用CPU缓存的劣势来进步遍历的效率。前面笔者会有一篇专门的文章来讲述利用CPU缓存行如何为咱们带来性能劣势。
所以Netty为了优化对sun.nio.ch.SelectorImpl#selectedKeys
汇合的插入,遍历
性能,本人用数组
这种数据结构实现了SelectedSelectionKeySet
,用它来替换原来的HashSet
实现。
SelectedSelectionKeySet
- 初始化
SelectionKey[] keys
数组大小为1024
,当数组容量不够时,扩容为原来的两倍大小。 - 通过数组尾部指针
size
,在向数组插入元素的时候能够间接定位到插入地位keys[size++]
。操作一步到位,不必像哈希表
那样还须要解决Hash抵触
。 - 对数组的遍历操作也是如丝般顺滑,CPU间接能够在缓存行中遍历读取数组元素无需拜访内存。比
HashSet
的迭代器java.util.HashMap.KeyIterator
遍历形式性能不知高到哪里去了。
final class SelectedSelectionKeySet extends AbstractSet<SelectionKey> { //采纳数组替换到JDK中的HashSet,这样add操作和遍历操作效率更高,不须要思考hash抵触 SelectionKey[] keys; //数组尾部指针 int size; SelectedSelectionKeySet() { keys = new SelectionKey[1024]; } /** * 数组的增加效率高于 HashSet 因为不须要思考hash抵触 * */ @Override public boolean add(SelectionKey o) { if (o == null) { return false; } //工夫复杂度O(1) keys[size++] = o; if (size == keys.length) { //扩容为原来的两倍大小 increaseCapacity(); } return true; } private void increaseCapacity() { SelectionKey[] newKeys = new SelectionKey[keys.length << 1]; System.arraycopy(keys, 0, newKeys, 0, size); keys = newKeys; } /** * 采纳数组的遍历效率 高于 HashSet * */ @Override public Iterator<SelectionKey> iterator() { return new Iterator<SelectionKey>() { private int idx; @Override public boolean hasNext() { return idx < size; } @Override public SelectionKey next() { if (!hasNext()) { throw new NoSuchElementException(); } return keys[idx++]; } @Override public void remove() { throw new UnsupportedOperationException(); } }; }}
看到这里不禁感叹,从各种小的细节能够看出Netty对性能的优化几乎酣畅淋漓,对性能的谋求令人发指。细节真的是魔鬼。
- Netty通过反射的形式用
SelectedSelectionKeySet
替换掉sun.nio.ch.SelectorImpl#selectedKeys
,sun.nio.ch.SelectorImpl#publicSelectedKeys
这两个汇合中原来HashSet
的实现。
- 反射获取
sun.nio.ch.SelectorImpl
类中selectedKeys
和publicSelectedKeys
。
Field selectedKeysField = selectorImplClass.getDeclaredField("selectedKeys"); Field publicSelectedKeysField = selectorImplClass.getDeclaredField("publicSelectedKeys");
Java9
版本以上通过sun.misc.Unsafe
设置字段值的形式
if (PlatformDependent.javaVersion() >= 9 && PlatformDependent.hasUnsafe()) { long selectedKeysFieldOffset = PlatformDependent.objectFieldOffset(selectedKeysField); long publicSelectedKeysFieldOffset = PlatformDependent.objectFieldOffset(publicSelectedKeysField); if (selectedKeysFieldOffset != -1 && publicSelectedKeysFieldOffset != -1) { PlatformDependent.putObject( unwrappedSelector, selectedKeysFieldOffset, selectedKeySet); PlatformDependent.putObject( unwrappedSelector, publicSelectedKeysFieldOffset, selectedKeySet); return null; } }
- 通过反射的形式用
SelectedSelectionKeySet
替换掉hashSet
实现的sun.nio.ch.SelectorImpl#selectedKeys,sun.nio.ch.SelectorImpl#publicSelectedKeys
。
Throwable cause = ReflectionUtil.trySetAccessible(selectedKeysField, true); if (cause != null) { return cause; } cause = ReflectionUtil.trySetAccessible(publicSelectedKeysField, true); if (cause != null) { return cause; } //Java8反射替换字段 selectedKeysField.set(unwrappedSelector, selectedKeySet); publicSelectedKeysField.set(unwrappedSelector, selectedKeySet);
- 将与
sun.nio.ch.SelectorImpl
类中selectedKeys
和publicSelectedKeys
关联好的Netty优化实现SelectedSelectionKeySet
,设置到io.netty.channel.nio.NioEventLoop#selectedKeys
字段中保留。
//会通过反射替换selector对象中的selectedKeySet保留就绪的selectKey //该字段为持有selector对象selectedKeys的援用,当IO事件就绪时,间接从这里获取 private SelectedSelectionKeySet selectedKeys;
后续Reactor线程
就会间接从io.netty.channel.nio.NioEventLoop#selectedKeys
中获取IO就绪
的SocketChannel
- 用
SelectorTuple
封装unwrappedSelector
和wrappedSelector
返回给NioEventLoop
构造函数。到此Reactor
中的Selector
就创立结束了。
return new SelectorTuple(unwrappedSelector, new SelectedSelectionKeySetSelector(unwrappedSelector, selectedKeySet));
private static final class SelectorTuple { final Selector unwrappedSelector; final Selector selector; SelectorTuple(Selector unwrappedSelector) { this.unwrappedSelector = unwrappedSelector; this.selector = unwrappedSelector; } SelectorTuple(Selector unwrappedSelector, Selector selector) { this.unwrappedSelector = unwrappedSelector; this.selector = selector; } }
- 所谓的
unwrappedSelector
是指被Netty优化过的JDK NIO原生Selector。 - 所谓的
wrappedSelector
就是用SelectedSelectionKeySetSelector
装璜类将unwrappedSelector
和与sun.nio.ch.SelectorImpl类
关联好的Netty优化实现SelectedSelectionKeySet
封装装璜起来。
wrappedSelector
会将所有对Selector
的操作全副代理给unwrappedSelector
,并在发动轮询IO事件
的相干操作中,重置SelectedSelectionKeySet
清空上一次的轮询后果。
final class SelectedSelectionKeySetSelector extends Selector { //Netty优化后的 SelectedKey就绪汇合 private final SelectedSelectionKeySet selectionKeys; //优化后的JDK NIO 原生Selector private final Selector delegate; SelectedSelectionKeySetSelector(Selector delegate, SelectedSelectionKeySet selectionKeys) { this.delegate = delegate; this.selectionKeys = selectionKeys; } @Override public boolean isOpen() { return delegate.isOpen(); } @Override public SelectorProvider provider() { return delegate.provider(); } @Override public Set<SelectionKey> keys() { return delegate.keys(); } @Override public Set<SelectionKey> selectedKeys() { return delegate.selectedKeys(); } @Override public int selectNow() throws IOException { //重置SelectedKeys汇合 selectionKeys.reset(); return delegate.selectNow(); } @Override public int select(long timeout) throws IOException { //重置SelectedKeys汇合 selectionKeys.reset(); return delegate.select(timeout); } @Override public int select() throws IOException { //重置SelectedKeys汇合 selectionKeys.reset(); return delegate.select(); } @Override public Selector wakeup() { return delegate.wakeup(); } @Override public void close() throws IOException { delegate.close(); }}
到这里Reactor的外围Selector就创立好了,上面咱们来看下用于保留异步工作的队列是如何创立进去的。
newTaskQueue
NioEventLoop(NioEventLoopGroup parent, Executor executor, SelectorProvider selectorProvider, SelectStrategy strategy, RejectedExecutionHandler rejectedExecutionHandler, EventLoopTaskQueueFactory queueFactory) { super(parent, executor, false, newTaskQueue(queueFactory), newTaskQueue(queueFactory), rejectedExecutionHandler); this.provider = ObjectUtil.checkNotNull(selectorProvider, "selectorProvider"); this.selectStrategy = ObjectUtil.checkNotNull(strategy, "selectStrategy"); final SelectorTuple selectorTuple = openSelector(); //通过用SelectedSelectionKeySet装璜后的unwrappedSelector this.selector = selectorTuple.selector; //Netty优化过的JDK NIO近程Selector this.unwrappedSelector = selectorTuple.unwrappedSelector; }
咱们持续回到创立Reactor
的主线上,到目前为止Reactor
的外围Selector
就创立好了,前边咱们提到Reactor
除了须要监听IO就绪事件
以及解决IO就绪事件
外,还须要执行一些异步工作,当内部线程向Reactor
提交异步工作后,Reactor
就须要一个队列来保留这些异步工作,期待Reactor线程
执行。
上面咱们来看下Reactor
中工作队列的创立过程:
//工作队列大小,默认是无界队列 protected static final int DEFAULT_MAX_PENDING_TASKS = Math.max(16, SystemPropertyUtil.getInt("io.netty.eventLoop.maxPendingTasks", Integer.MAX_VALUE)); private static Queue<Runnable> newTaskQueue( EventLoopTaskQueueFactory queueFactory) { if (queueFactory == null) { return newTaskQueue0(DEFAULT_MAX_PENDING_TASKS); } return queueFactory.newTaskQueue(DEFAULT_MAX_PENDING_TASKS); } private static Queue<Runnable> newTaskQueue0(int maxPendingTasks) { // This event loop never calls takeTask() return maxPendingTasks == Integer.MAX_VALUE ? PlatformDependent.<Runnable>newMpscQueue() : PlatformDependent.<Runnable>newMpscQueue(maxPendingTasks); }
- 在
NioEventLoop
的父类SingleThreadEventLoop
中提供了一个动态变量DEFAULT_MAX_PENDING_TASKS
用来指定Reactor
工作队列的大小。能够通过零碎变量-D io.netty.eventLoop.maxPendingTasks
进行设置,默认为Integer.MAX_VALUE
,示意工作队列默认为无界队列
。 - 依据
DEFAULT_MAX_PENDING_TASKS
变量的设定,来决定创立无界工作队列还是有界工作队列。
//创立无界工作队列 PlatformDependent.<Runnable>newMpscQueue() //创立有界工作队列 PlatformDependent.<Runnable>newMpscQueue(maxPendingTasks) public static <T> Queue<T> newMpscQueue() { return Mpsc.newMpscQueue(); } public static <T> Queue<T> newMpscQueue(final int maxCapacity) { return Mpsc.newMpscQueue(maxCapacity); }
Reactor
内的异步工作队列的类型为MpscQueue
,它是由JCTools
提供的一个高性能无锁队列,从命名前缀Mpsc
能够看出,它实用于多生产者单消费者
的场景,它反对多个生产者线程平安的拜访队列,同一时刻只容许一个消费者线程读取队列中的元素。咱们晓得Netty中的
Reactor
能够线程平安
的解决注册其上的多个SocketChannel
上的IO数据
,保障Reactor线程平安
的外围起因正是因为这个MpscQueue
,它能够反对多个业务线程在解决完业务逻辑后,线程平安的向MpscQueue
增加异步写工作
,而后由单个Reactor线程
来执行这些写工作
。既然是单线程执行,那必定是线程平安的了。
Reactor对应的NioEventLoop类型继承构造
NioEventLoop
的继承构造也是比较复杂,这里咱们只关注在Reactor
创立过程中波及的到两个父类SingleThreadEventLoop
,SingleThreadEventExecutor
。
剩下的继承体系,咱们在后边随着Netty
源码的深刻在缓缓介绍。
前边咱们提到,其实Reactor
咱们能够看作是一个单线程的线程池,只有一个线程用来执行IO就绪事件的监听
,IO事件的解决
,异步工作的执行
。用MpscQueue
来存储待执行的异步工作。
命名前缀为SingleThread
的父类都是对Reactor
这些行为的分层定义。也是本大节要介绍的对象
SingleThreadEventLoop
Reactor
负责执行的异步工作分为三类:
一般工作:
这是Netty最次要执行的异步工作,寄存在一般工作队列taskQueue
中。在NioEventLoop
构造函数中创立。定时工作:
寄存在优先级队列中。后续咱们介绍。尾部工作:
寄存于尾部工作队列tailTasks
中,尾部工作个别不罕用,在一般工作执行完后 Reactor线程会执行尾部工作。应用场景:比方对Netty 的运行状态做一些统计数据,例如工作循环的耗时、占用物理内存的大小等等都能够向尾部队列增加一个收尾工作实现统计数据的实时更新。
SingleThreadEventLoop
负责对尾部工作队列tailTasks
进行治理。并且提供Channel
向Reactor
注册的行为。
public abstract class SingleThreadEventLoop extends SingleThreadEventExecutor implements EventLoop { //工作队列大小,默认是无界队列 protected static final int DEFAULT_MAX_PENDING_TASKS = Math.max(16, SystemPropertyUtil.getInt("io.netty.eventLoop.maxPendingTasks", Integer.MAX_VALUE)); //尾部工作队列 private final Queue<Runnable> tailTasks; protected SingleThreadEventLoop(EventLoopGroup parent, Executor executor, boolean addTaskWakesUp, Queue<Runnable> taskQueue, Queue<Runnable> tailTaskQueue, RejectedExecutionHandler rejectedExecutionHandler) { super(parent, executor, addTaskWakesUp, taskQueue, rejectedExecutionHandler); //尾部队列 执行一些统计工作 不罕用 tailTasks = ObjectUtil.checkNotNull(tailTaskQueue, "tailTaskQueue"); } @Override public ChannelFuture register(Channel channel) { //注册channel到绑定的Reactor上 return register(new DefaultChannelPromise(channel, this)); }}
SingleThreadEventExecutor
SingleThreadEventExecutor
次要负责对一般工作队列
的治理,以及异步工作的执行
,Reactor线程的启停
。
public abstract class SingleThreadEventExecutor extends AbstractScheduledEventExecutor implements OrderedEventExecutor { protected SingleThreadEventExecutor(EventExecutorGroup parent, Executor executor, boolean addTaskWakesUp, Queue<Runnable> taskQueue, RejectedExecutionHandler rejectedHandler) { //parent为Reactor所属的NioEventLoopGroup Reactor线程组 super(parent); //向Reactor增加工作时,是否唤醒Selector进行轮询IO就绪事件,马上执行异步工作 this.addTaskWakesUp = addTaskWakesUp; //Reactor异步工作队列的大小 this.maxPendingTasks = DEFAULT_MAX_PENDING_EXECUTOR_TASKS; //用于启动Reactor线程的executor -> ThreadPerTaskExecutor this.executor = ThreadExecutorMap.apply(executor, this); //一般工作队列 this.taskQueue = ObjectUtil.checkNotNull(taskQueue, "taskQueue"); //工作队列满时的回绝策略 this.rejectedExecutionHandler = ObjectUtil.checkNotNull(rejectedHandler, "rejectedHandler"); }}
到当初为止,一个残缺的Reactor架构
就被创立进去了。
3. 创立Channel到Reactor的绑定策略
到这一步,Reactor线程组NioEventLoopGroup
里边的所有Reactor
就曾经全副创立结束。
无论是Netty服务端NioServerSocketChannel
关注的OP_ACCEPT
事件也好,还是Netty客户端NioSocketChannel
关注的OP_READ
和OP_WRITE
事件也好,都须要先注册到Reactor
上,Reactor
能力监听Channel
上关注的IO事件
实现IO多路复用
。
NioEventLoopGroup
(Reactor线程组)里边有泛滥的Reactor
,那么以上提到的这些Channel
到底应该注册到哪个Reactor
上呢?这就须要一个绑定的策略来平均分配。
还记得咱们前边介绍MultithreadEventExecutorGroup类
的时候提到的结构器参数EventExecutorChooserFactory
吗?
这时候它就派上用场了,它次要用来创立Channel
到Reactor
的绑定策略。默认为DefaultEventExecutorChooserFactory.INSTANCE
。
public abstract class MultithreadEventExecutorGroup extends AbstractEventExecutorGroup { //从Reactor汇合中抉择一个特定的Reactor的绑定策略 用于channel注册绑定到一个固定的Reactor上 private final EventExecutorChooserFactory.EventExecutorChooser chooser; chooser = chooserFactory.newChooser(children);}
上面咱们来看下具体的绑定策略:
DefaultEventExecutorChooserFactory
public final class DefaultEventExecutorChooserFactory implements EventExecutorChooserFactory { public static final DefaultEventExecutorChooserFactory INSTANCE = new DefaultEventExecutorChooserFactory(); private DefaultEventExecutorChooserFactory() { } @Override public EventExecutorChooser newChooser(EventExecutor[] executors) { if (isPowerOfTwo(executors.length)) { return new PowerOfTwoEventExecutorChooser(executors); } else { return new GenericEventExecutorChooser(executors); } } private static boolean isPowerOfTwo(int val) { return (val & -val) == val; } ...................省略.................}
咱们看到在newChooser
办法绑定策略有两个分支,不同之处在于须要判断Reactor线程组中的Reactor
个数是否为2的次幂
。
Netty中的绑定策略就是采纳round-robin
轮询的形式来挨个抉择Reactor
进行绑定。
采纳round-robin
的形式进行负载平衡,咱们个别会用round % reactor.length
取余的形式来挨个均匀的定位到对应的Reactor
上。
如果Reactor
的个数reactor.length
恰好是2的次幂
,那么就能够用位操作&
运算round & reactor.length -1
来代替%
运算round % reactor.length
,因为位运算的性能更高。具体为什么&
运算可能代替%
运算,笔者会在前面讲述工夫轮的时候为大家具体证实,这里大家只需记住这个公式,咱们还是聚焦本文的主线。
理解了优化原理,咱们在看代码实现就很容易了解了。
利用%
运算的形式Math.abs(idx.getAndIncrement() % executors.length)
来进行绑定。
private static final class GenericEventExecutorChooser implements EventExecutorChooser { private final AtomicLong idx = new AtomicLong(); private final EventExecutor[] executors; GenericEventExecutorChooser(EventExecutor[] executors) { this.executors = executors; } @Override public EventExecutor next() { return executors[(int) Math.abs(idx.getAndIncrement() % executors.length)]; } }
利用&
运算的形式idx.getAndIncrement() & executors.length - 1
来进行绑定。
private static final class PowerOfTwoEventExecutorChooser implements EventExecutorChooser { private final AtomicInteger idx = new AtomicInteger(); private final EventExecutor[] executors; PowerOfTwoEventExecutorChooser(EventExecutor[] executors) { this.executors = executors; } @Override public EventExecutor next() { return executors[idx.getAndIncrement() & executors.length - 1]; } }
又一次被Netty对性能的极致谋求所折服~~~~
4. 向Reactor线程组中所有的Reactor注册terminated回调函数
当Reactor线程组NioEventLoopGroup
中所有的Reactor
曾经创立结束,Channel
到Reactor
的绑定策略也创立结束后,咱们就来到了创立NioEventGroup
的最初一步。
俗话说的好,有创立就有启动,有启动就有敞开,这里会创立Reactor敞开
的回调函数terminationListener
,在Reactor
敞开时回调。
terminationListener
回调的逻辑很简略:
- 通过
AtomicInteger terminatedChildren
变量记录曾经敞开的Reactor
个数,用来判断NioEventLoopGroup
中的Reactor
是否曾经全副敞开。 - 如果所有
Reactor
均已敞开,设置NioEventLoopGroup
中的terminationFuture
为success
。示意Reactor线程组
敞开胜利。
//记录敞开的Reactor个数,当Reactor全副敞开后,才能够认为敞开胜利 private final AtomicInteger terminatedChildren = new AtomicInteger(); //敞开future private final Promise<?> terminationFuture = new DefaultPromise(GlobalEventExecutor.INSTANCE); final FutureListener<Object> terminationListener = new FutureListener<Object>() { @Override public void operationComplete(Future<Object> future) throws Exception { if (terminatedChildren.incrementAndGet() == children.length) { //当所有Reactor敞开后 才认为是敞开胜利 terminationFuture.setSuccess(null); } } }; //为所有Reactor增加terminationListener for (EventExecutor e: children) { e.terminationFuture().addListener(terminationListener); }
咱们在回到文章结尾Netty服务端代码模板
public final class EchoServer { static final int PORT = Integer.parseInt(System.getProperty("port", "8007")); public static void main(String[] args) throws Exception { // Configure the server. //创立主从Reactor线程组 EventLoopGroup bossGroup = new NioEventLoopGroup(1); EventLoopGroup workerGroup = new NioEventLoopGroup(); ...........省略............ }}
当初Netty的主从Reactor线程组
就曾经创立结束,此时Netty服务端的骨架曾经搭建结束,骨架如下:
总结
本文介绍了首先介绍了Netty对各种IO模型
的反对以及如何轻松切换各种IO模型
。
还花了大量的篇幅介绍Netty服务端的外围引擎主从Reactor线程组
的创立过程。在这个过程中,咱们还提到了Netty对各种细节进行的优化,展示了Netty对性能极致的谋求。
好了,Netty服务端的骨架曾经搭好,剩下的事件就该绑定端口地址而后接管连贯了,咱们下篇文章再见~~~
欢送关注微信公众号:bin的技术小屋,浏览公众号原文