共计 36729 个字符,预计需要花费 92 分钟才能阅读完成。
欢送关注微信公众号:bin 的技术小屋,浏览公众号原文
本系列 Netty 源码解析文章基于 4.1.56.Final版本
在上篇文章《聊聊 Netty 那些事儿之从内核角度看 IO 模型》中咱们花了大量的篇幅来从内核角度具体讲述了五种 IO 模型
的演进过程以及 ReactorIO 线程模型
的底层基石 IO 多路复用技术在内核中的实现原理。
最初咱们引出了 netty 中应用的主从 Reactor IO 线程模型。
通过上篇文章的介绍,咱们曾经分明了在 IO 调用的过程中内核帮咱们搞了哪些事件,那么俗话说的好 内核领进门,修行在 netty
,netty 在用户空间又帮咱们搞了哪些事件?
那么从本文开始,笔者将从源码角度来带大家看下上图中的 Reactor IO 线程模型
在 Netty 中是如何实现的。
本文作为 Reactor 在 Netty 中实现系列文章中的开篇文章,笔者先来为大家介绍 Reactor 的骨架是如何创立进去的。
在上篇文章中咱们提到 Netty 采纳的是 主从 Reactor 多线程
的模型,然而它在实现上又与 Doug Lea 在 Scalable IO in Java 论文中提到的经典 主从 Reactor 多线程模型
有所差别。
Netty 中的 Reactor
是以 Group
的模式呈现的,主从 Reactor
在 Netty 中就是 主从 Reactor 组
,每个Reactor Group
中会有多个 Reactor
用来执行具体的 IO 工作
。当然在 netty 中Reactor
不只用来执行IO 工作
,这个咱们前面再说。
Main Reactor Group
中的Reactor
数量取决于服务端要监听的端口个数,通常咱们的服务端程序只会监听一个端口,所以Main Reactor Group
只会有一个Main Reactor
线程来解决最重要的事件:绑定端口地址
,接管客户端连贯
,为客户端创立对应的 SocketChannel
,将客户端 SocketChannel 调配给一个固定的 Sub Reactor
。也就是上篇文章笔者为大家举的例子,饭店最重要的工作就是先把客人迎接进来。“我家大门常关上,凋谢怀抱等你,拥抱过就有了默契你会爱上这里 ……”
Sub Reactor Group
里有多个Reactor
线程,Reactor
线程的个数能够通过零碎参数-D io.netty.eventLoopThreads
指定。默认的Reactor
的个数为CPU 核数 * 2
。Sub Reactor
线程次要用来轮询客户端 SocketChannel 上的 IO 就绪事件
,解决 IO 就绪事件
,执行异步工作
。Sub Reactor Group
做的事件就是上篇饭店例子中服务员的工作,客人进来了要为客人调配座位,端茶送水,做菜上菜。“不论远近都是客人,请不必客气,相约好了在一起,咱们欢迎您 ……”
一个
客户端 SocketChannel
只能调配给一个固定的Sub Reactor
。一个Sub Reactor
负责解决多个客户端 SocketChannel
,这样能够将服务端承载的全量客户端连贯
摊派到多个Sub Reactor
中解决,同时也能保障客户端 SocketChannel 上的 IO 解决的线程安全性
。
因为文章篇幅的关系,作为 Reactor 在 netty 中实现的第一篇咱们次要来介绍 主从 Reactor Group
的创立流程,骨架脉络先搭好。
上面咱们来看一段 Netty 服务端代码的编写模板,从代码模板的流程中咱们来解析下主从 Reactor 的创立流程以及在这个过程中所波及到的 Netty 外围类。
Netty 服务端代码模板
/**
* Echoes back any received data from a client.
*/
public final class EchoServer {static final int PORT = Integer.parseInt(System.getProperty("port", "8007"));
public static void main(String[] args) throws Exception {
// Configure the server.
// 创立主从 Reactor 线程组
EventLoopGroup bossGroup = new NioEventLoopGroup(1);
EventLoopGroup workerGroup = new NioEventLoopGroup();
final EchoServerHandler serverHandler = new EchoServerHandler();
try {ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workerGroup)// 配置主从 Reactor
.channel(NioServerSocketChannel.class)// 配置主 Reactor 中的 channel 类型
.option(ChannelOption.SO_BACKLOG, 100)// 设置主 Reactor 中 channel 的 option 选项
.handler(new LoggingHandler(LogLevel.INFO))// 设置主 Reactor 中 Channel->pipline->handler
.childHandler(new ChannelInitializer<SocketChannel>() {// 设置从 Reactor 中注册 channel 的 pipeline
@Override
public void initChannel(SocketChannel ch) throws Exception {ChannelPipeline p = ch.pipeline();
//p.addLast(new LoggingHandler(LogLevel.INFO));
p.addLast(serverHandler);
}
});
// Start the server. 绑定端口启动服务,开始监听 accept 事件
ChannelFuture f = b.bind(PORT).sync();
// Wait until the server socket is closed.
f.channel().closeFuture().sync();} finally {
// Shut down all event loops to terminate all threads.
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();}
}
}
- 首先咱们要创立 Netty 最外围的局部 ->
创立主从 Reactor Group
,在 Netty 中EventLoopGroup
就是Reactor Group
的实现类。对应的EventLoop
就是Reactor
的实现类。
// 创立主从 Reactor 线程组
EventLoopGroup bossGroup = new NioEventLoopGroup(1);
EventLoopGroup workerGroup = new NioEventLoopGroup();
- 创立用于
IO 解决
的ChannelHandler
,实现相应IO 事件
的回调函数,编写对应的IO 解决
逻辑。留神这里只是简略示例哈,具体的 IO 事件处理,笔者会独自开一篇文章专门讲述。
final EchoServerHandler serverHandler = new EchoServerHandler();
/**
* Handler implementation for the echo server.
*/
@Sharable
public class EchoServerHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
................ 省略 IO 解决逻辑................
ctx.write(msg);
}
@Override
public void channelReadComplete(ChannelHandlerContext ctx) {ctx.flush();
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
// Close the connection when an exception is raised.
cause.printStackTrace();
ctx.close();}
}
-
创立
ServerBootstrap
Netty 服务端启动类,并在启动类中配置启动 Netty 服务端所须要的一些必备信息。- 通过
serverBootstrap.group(bossGroup, workerGroup)
为 Netty 服务端配置主从 Reactor Group
实例。 -
通过
serverBootstrap.channel(NioServerSocketChannel.class)
配置 Netty 服务端的ServerSocketChannel
用于绑定端口地址
以及创立客户端 SocketChannel
。Netty 中的NioServerSocketChannel.class
就是对 JDK NIO 中ServerSocketChannel
的封装。而用于示意客户端连贯
的NioSocketChannel
是对 JDK NIOSocketChannel
封装。在上篇文章介绍
Socket 内核构造
大节中咱们提到,在编写服务端网络程序时,咱们首先要创立一个Socket
用于listen 和 bind
端口地址,咱们把这个叫做监听 Socket
, 这里对应的就是NioServerSocketChannel.class
。当客户端连贯实现三次握手,零碎调用accept
函数会基于监听 Socket
创立进去一个新的 Socket
专门用于与客户端之间的网络通信咱们称为客户端连贯 Socket
, 这里对应的就是NioSocketChannel.class
serverBootstrap.option(ChannelOption.SO_BACKLOG, 100)
设置服务端ServerSocketChannel
中的SocketOption
。对于SocketOption
的选项咱们后边的文章再聊,本文次要聚焦在 NettyMain Reactor Group
的创立及工作流程。-
serverBootstrap.handler(....)
设置服务端NioServerSocketChannel
中对应Pipieline
中的ChannelHandler
。netty 有两种
Channel 类型
:一种是服务端用于监听绑定端口地址的NioServerSocketChannel
, 一种是用于客户端通信的NioSocketChannel
。每种Channel 类型实例
都会对应一个PipeLine
用于编排对应 channel 实例
上的 IO 事件处理逻辑。PipeLine
中组织的就是ChannelHandler
用于编写特定的 IO 解决逻辑。留神
serverBootstrap.handler
设置的是服务端NioServerSocketChannel PipeLine
中的ChannelHandler
。 -
serverBootstrap.childHandler(ChannelHandler childHandler)
用于设置客户端NioSocketChannel
中对应Pipieline
中的ChannelHandler
。咱们通常配置的编码解码器就是在这里。ServerBootstrap
启动类办法带有child
前缀的均是设置客户端NioSocketChannel
属性的。ChannelInitializer
是用于当SocketChannel
胜利注册到绑定的Reactor
上后,用于初始化该SocketChannel
的Pipeline
。它的initChannel
办法会在注册胜利后执行。这里只是捎带提一下,让大家有个初步印象,前面我会专门介绍。
- 通过
ChannelFuture f = serverBootstrap.bind(PORT).sync()
这一步会是下篇文章要重点剖析的主题Main Reactor Group
的启动,绑定端口地址,开始监听客户端连贯事件(OP_ACCEPT
)。本文咱们只关注创立流程。f.channel().closeFuture().sync()
期待服务端NioServerSocketChannel
敞开。Netty 服务端到这里正式启动,并筹备好承受客户端连贯的筹备。shutdownGracefully
优雅敞开主从 Reactor 线程组
里的所有Reactor 线程
。
Netty 对 IO 模型的反对
在上篇文章中咱们介绍了五种 IO 模型
,Netty 中反对BIO
,NIO
,AIO
以及多种操作系统下的 IO 多路复用技术
实现。
在 Netty 中切换这几种 IO 模型
也是十分的不便,上面咱们来看下 Netty 如何对这几种 IO 模型进行反对的。
首先咱们介绍下几个与 IO 模型
相干的重要接口:
EventLoop
EventLoop
就是 Netty 中的 Reactor
,能够说它就是 Netty 的引擎,负责 Channel 上IO 就绪事件的监听
,IO 就绪事件的解决
, 异步工作的执行
驱动着整个 Netty 的运行。
不同 IO 模型
下,EventLoop
有着不同的实现,咱们只须要切换不同的实现类就能够实现对 NettyIO 模型
的切换。
BIO | NIO | AIO |
---|---|---|
ThreadPerChannelEventLoop | NioEventLoop | AioEventLoop |
在 NIO 模型
下 Netty 会 主动
依据操作系统以及版本的不同抉择对应的IO 多路复用技术实现
。比方 Linux 2.6 版本以上用的是Epoll
,2.6 版本以下用的是Poll
,Mac 下采纳的是Kqueue
。
其中 Linux kernel 在 5.1 版本引入的异步 IO 库 io_uring 正在 netty 中孵化。
EventLoopGroup
Netty 中的 Reactor
是以 Group
的模式呈现的,EventLoopGroup
正是 Reactor 组
的接口定义,负责管理 Reactor
,Netty 中的Channel
就是通过 EventLoopGroup
注册到具体的 Reactor
上的。
Netty 的 IO 线程模型是 主从 Reactor 多线程模型
, 主从 Reactor 线程组
在 Netty 源码中对应的其实就是两个 EventLoopGroup
实例。
不同的 IO 模型
也有对应的实现:
BIO | NIO | AIO |
---|---|---|
ThreadPerChannelEventLoopGroup | NioEventLoopGroup | AioEventLoopGroup |
ServerSocketChannel
用于 Netty 服务端应用的 ServerSocketChannel
,对应于上篇文章提到的 监听 Socket
,负责绑定监听端口地址,接管客户端连贯并创立用于与客户端通信的SocketChannel
。
不同的 IO 模型
下的实现:
BIO | NIO | AIO |
---|---|---|
OioServerSocketChannel | NioServerSocketChannel | AioServerSocketChannel |
SocketChannel
用于与客户端通信的 SocketChannel
,对应于上篇文章提到的 客户端连贯 Socket
,当客户端实现三次握手后,由零碎调用 accept
函数依据 监听 Socket
创立。
不同的 IO 模型
下的实现:
BIO | NIO | AIO |
---|---|---|
OioSocketChannel | NioSocketChannel | AioSocketChannel |
咱们看到在 不同 IO 模型
的实现中,Netty 这些围绕 IO 模型
的外围类只是前缀的不同:
- BIO 对应的前缀为
Oio
示意old io
,当初曾经废除不举荐应用。 - NIO 对应的前缀为
Nio
,正是 Netty 举荐也是咱们罕用的非阻塞 IO 模型
。 - AIO 对应的前缀为
Aio
,因为 Linux 下的异步 IO
机制实现的并不成熟,性能晋升体现上也不显著,现已被删除。
咱们只须要将 IO 模型
的这些外围接口对应的实现类 前缀
改为对应 IO 模型
的前缀,就能够轻松在 Netty 中实现对 IO 模型
的切换。
多种 NIO 的实现
Common | Linux | Mac |
---|---|---|
NioEventLoopGroup | EpollEventLoopGroup | KQueueEventLoopGroup |
NioEventLoop | EpollEventLoop | KQueueEventLoop |
NioServerSocketChannel | EpollServerSocketChannel | KQueueServerSocketChannel |
NioSocketChannel | EpollSocketChannel | KQueueSocketChannel |
咱们通常在应用 NIO 模型
的时候会应用 Common 列
下的这些 IO 模型
外围类,Common 类
也会依据操作系统的不同主动抉择 JDK
在对应平台下的 IO 多路复用技术
的实现。
而 Netty 本身也依据操作系统的不同提供了本人对 IO 多路复用技术
的实现,比 JDK
的实现性能更优。比方:
JDK
的 NIO默认
实现是程度触发
,Netty 是边缘触发 (默认)
和程度触发可切换。。- Netty 实现的垃圾回收更少、性能更好。
咱们编写 Netty 服务端程序的时候也能够依据操作系统的不同,采纳 Netty 本身的实现来进一步优化程序。做法也很简略,间接将上图中红框里的实现类替换成 Netty 的本身实现类即可实现切换。
通过以上对 Netty 服务端代码编写模板以及 IO 模型
相干外围类的简略介绍,咱们对 Netty 的创立流程有了一个简略粗略的总体意识,上面咱们来深刻分析下创立流程过程中的每一个步骤以及这个过程中波及到的外围类实现。
以下源码解析局部咱们均采纳 Common 列
下NIO
相干的实现进行解析。
创立主从 Reactor 线程组
在 Netty 服务端程序编写模板的开始,咱们首先会创立两个 Reactor 线程组:
- 一个是主 Reactor 线程组
bossGroup
用于监听客户端连贯,创立客户端连贯NioSocketChannel
,并将创立好的客户端连贯NioSocketChannel
注册到从 Reactor 线程组中一个固定的Reactor
上。 - 一个是从 Reactor 线程组
workerGroup
,workerGroup
中的Reactor
负责监听绑定在其上的客户端连贯NioSocketChannel
上的IO 就绪事件
,并解决IO 就绪事件
,执行异步工作
。
// 创立主从 Reactor 线程组
EventLoopGroup bossGroup = new NioEventLoopGroup(1);
EventLoopGroup workerGroup = new NioEventLoopGroup();
Netty 中 Reactor 线程组的实现类为 NioEventLoopGroup
,在创立bossGroup
和workerGroup
的时候用到了 NioEventLoopGroup
的两个构造函数:
- 带
nThreads
参数的构造函数public NioEventLoopGroup(int nThreads)
。 - 不带
nThreads
参数的默认
构造函数public NioEventLoopGroup()
public class NioEventLoopGroup extends MultithreadEventLoopGroup {
/**
* Create a new instance using the default number of threads, the default {@link ThreadFactory} and
* the {@link SelectorProvider} which is returned by {@link SelectorProvider#provider()}.
*/
public NioEventLoopGroup() {this(0);
}
/**
* Create a new instance using the specified number of threads, {@link ThreadFactory} and the
* {@link SelectorProvider} which is returned by {@link SelectorProvider#provider()}.
*/
public NioEventLoopGroup(int nThreads) {this(nThreads, (Executor) null);
}
...................... 省略...........................
}
nThreads
参数示意以后要创立的Reactor 线程组
内蕴含多少个Reactor 线程
。不指定nThreads
参数的话采纳默认的Reactor 线程
个数,用0
示意。
最终会调用到构造函数
public NioEventLoopGroup(int nThreads, Executor executor, final SelectorProvider selectorProvider,
final SelectStrategyFactory selectStrategyFactory) {super(nThreads, executor, selectorProvider, selectStrategyFactory, RejectedExecutionHandlers.reject());
}
上面简略介绍下构造函数中这几个参数的作用,前面咱们在解说本文主线的过程中还会提及这几个参数,到时在具体介绍,这里只是让大家有个初步印象,不用做过多的纠缠。
-
Executor executor:
负责启动Reactor 线程
进而 Reactor 才能够开始工作。Reactor 线程组
NioEventLoopGroup
负责创立Reactor 线程
,在创立的时候会将executor
传入。 RejectedExecutionHandler:
当向Reactor
增加异步工作增加失败时,采纳的回绝策略。Reactor 的工作不只是监听 IO 沉闷事件和 IO 工作的解决,还包含对异步工作的解决。这里大家只需有个这样的概念,前面笔者会专门具体介绍。SelectorProvider selectorProvider:
Reactor 中的 IO 模型为IO 多路复用模型
,对应于 JDK NIO 中的实现为java.nio.channels.Selector
(就是咱们上篇文章中提到的select,poll,epoll
),每个 Reator 中都蕴含一个Selector
,用于轮询
注册在该 Reactor 上的所有Channel
上的IO 事件
。SelectorProvider
就是用来创立Selector
的。SelectStrategyFactory selectStrategyFactory:
Reactor 最重要的事件就是轮询
注册其上的Channel
上的IO 就绪事件
,这里的SelectStrategyFactory
用于指定轮询策略
,默认为DefaultSelectStrategyFactory.INSTANCE
。
最终会将这些参数交给 NioEventLoopGroup
的父类结构器,上面咱们来看下 NioEventLoopGroup 类
的继承构造:
NioEventLoopGroup 类
的继承构造乍一看比较复杂,大家不要慌,笔者会随着主线的深刻缓缓地介绍这些父类接口,咱们当初重点关注 Mutithread
前缀的类。
咱们晓得 NioEventLoopGroup
是 Netty 中的 Reactor 线程组
的实现,既然是线程组那么必定是负责管理和创立 多个 Reactor 线程的
,所以Mutithread
前缀的类定义的行为天然是对 Reactor 线程组
内多个 Reactor 线程
的创立和管理工作。
MultithreadEventLoopGroup
public abstract class MultithreadEventLoopGroup extends MultithreadEventExecutorGroup implements EventLoopGroup {private static final InternalLogger logger = InternalLoggerFactory.getInstance(MultithreadEventLoopGroup.class);
// 默认 Reactor 个数
private static final int DEFAULT_EVENT_LOOP_THREADS;
static {
DEFAULT_EVENT_LOOP_THREADS = Math.max(1, SystemPropertyUtil.getInt("io.netty.eventLoopThreads", NettyRuntime.availableProcessors() * 2));
if (logger.isDebugEnabled()) {logger.debug("-Dio.netty.eventLoopThreads: {}", DEFAULT_EVENT_LOOP_THREADS);
}
}
/**
* @see MultithreadEventExecutorGroup#MultithreadEventExecutorGroup(int, Executor, Object...)
*/
protected MultithreadEventLoopGroup(int nThreads, Executor executor, Object... args) {super(nThreads == 0 ? DEFAULT_EVENT_LOOP_THREADS : nThreads, executor, args);
}
................... 省略.....................
}
MultithreadEventLoopGroup 类
次要的性能就是用来确定 Reactor 线程组
内Reactor
的个数。
默认的 Reactor
的个数寄存于字段 DEFAULT_EVENT_LOOP_THREADS
中。
从 static {}
动态代码块中咱们能够看出默认 Reactor
的个数的获取逻辑:
- 能够通过零碎变量
-D io.netty.eventLoopThreads"
指定。 - 如果不指定,那么默认的就是
NettyRuntime.availableProcessors() * 2
当 nThread
参数设置为 0
采纳默认设置时,Reactor 线程组
内的 Reactor
个数则设置为DEFAULT_EVENT_LOOP_THREADS
。
MultithreadEventExecutorGroup
MultithreadEventExecutorGroup
这里就是本大节的外围,次要用来定义创立和治理 Reactor
的行为。
public abstract class MultithreadEventExecutorGroup extends AbstractEventExecutorGroup {
//Reactor 线程组中的 Reactor 汇合
private final EventExecutor[] children;
private final Set<EventExecutor> readonlyChildren;
// 从 Reactor group 中抉择一个特定的 Reactor 的抉择策略 用于 channel 注册绑定到一个固定的 Reactor 上
private final EventExecutorChooserFactory.EventExecutorChooser chooser;
/**
* Create a new instance.
*
* @param nThreads the number of threads that will be used by this instance.
* @param executor the Executor to use, or {@code null} if the default should be used.
* @param args arguments which will passed to each {@link #newChild(Executor, Object...)} call
*/
protected MultithreadEventExecutorGroup(int nThreads, Executor executor, Object... args) {this(nThreads, executor, DefaultEventExecutorChooserFactory.INSTANCE, args);
}
............................ 省略................................
}
首先介绍一个新的结构器参数 EventExecutorChooserFactory chooserFactory
。当客户端连贯实现三次握手后,Main Reactor
会创立客户端连贯 NioSocketChannel
,并将其绑定到Sub Reactor Group
中的一个固定 Reactor
,那么具体要绑定到哪个具体的Sub Reactor
上呢?这个绑定策略就是由 chooserFactory
来创立的。默认为DefaultEventExecutorChooserFactory
。
上面就是本大节的主题 Reactor 线程组
的创立过程:
protected MultithreadEventExecutorGroup(int nThreads, Executor executor,
EventExecutorChooserFactory chooserFactory, Object... args) {if (nThreads <= 0) {throw new IllegalArgumentException(String.format("nThreads: %d (expected: > 0)", nThreads));
}
if (executor == null) {
// 用于创立 Reactor 线程
executor = new ThreadPerTaskExecutor(newDefaultThreadFactory());
}
children = new EventExecutor[nThreads];
// 循环创立 reaactor group 中的 Reactor
for (int i = 0; i < nThreads; i ++) {
boolean success = false;
try {
// 创立 reactor
children[i] = newChild(executor, args);
success = true;
} catch (Exception e) {throw new IllegalStateException("failed to create a child event loop", e);
} finally {................ 省略................}
}
}
// 创立 channel 到 Reactor 的绑定策略
chooser = chooserFactory.newChooser(children);
................ 省略................
Set<EventExecutor> childrenSet = new LinkedHashSet<EventExecutor>(children.length);
Collections.addAll(childrenSet, children);
readonlyChildren = Collections.unmodifiableSet(childrenSet);
}
1. 创立用于启动 Reactor 线程的 executor
在 Netty Reactor Group 中的单个 Reactor
的IO 线程模型
为上篇文章提到的 单 Reactor 单线程模型
,一个Reactor 线程
负责 轮询
注册其上的所有 Channel
中的 IO 就绪事件
,解决 IO 事件,执行 Netty 中的异步工作等工作。正是这个Reactor 线程
驱动着整个 Netty 的运行,堪称是 Netty 的外围引擎。
而这里的 executor
就是负责启动 Reactor 线程
的,从创立源码中咱们能够看到 executor
的类型为ThreadPerTaskExecutor
。
ThreadPerTaskExecutor
public final class ThreadPerTaskExecutor implements Executor {
private final ThreadFactory threadFactory;
public ThreadPerTaskExecutor(ThreadFactory threadFactory) {this.threadFactory = ObjectUtil.checkNotNull(threadFactory, "threadFactory");
}
@Override
public void execute(Runnable command) {threadFactory.newThread(command).start();}
}
咱们看到 ThreadPerTaskExecutor
做的事件很简略,从它的命名前缀 ThreadPerTask
咱们就能够猜出它的工作形式,就是来一个工作就创立一个线程执行。而创立的这个线程正是 netty 的外围引擎 Reactor 线程。
在 Reactor 线程
启动的时候,Netty 会将 Reactor 线程
要做的事件封装成 Runnable
,丢给exexutor
启动。
而 Reactor 线程
的外围就是一个 死循环
不停的 轮询
IO 就绪事件,解决 IO 事件,执行异步工作。一刻也不停歇,堪称996 榜样
。
这里向大家先卖个关子,"Reactor 线程是何时启动的呢??"
2. 创立 Reactor
Reactor 线程组 NioEventLoopGroup
蕴含多个 Reactor
,寄存于private final EventExecutor[] children
数组中。
所以上面的事件就是创立 nThread
个Reactor
,并寄存于 EventExecutor[] children
字段中,
咱们来看下用于创立 Reactor
的newChild(executor, args)
办法:
newChild
newChild
办法是 MultithreadEventExecutorGroup
中的一个形象办法,提供给具体子类实现。
protected abstract EventExecutor newChild(Executor executor, Object... args) throws Exception;
这里咱们解析的是 NioEventLoopGroup
,咱们来看下newChild
在该类中的实现:
public class NioEventLoopGroup extends MultithreadEventLoopGroup {
@Override
protected EventLoop newChild(Executor executor, Object... args) throws Exception {EventLoopTaskQueueFactory queueFactory = args.length == 4 ? (EventLoopTaskQueueFactory) args[3] : null;
return new NioEventLoop(this, executor, (SelectorProvider) args[0],
((SelectStrategyFactory) args[1]).newSelectStrategy(), (RejectedExecutionHandler) args[2], queueFactory);
}
}
前边提到的泛滥结构器参数,这里会通过可变参数 Object... args
传入到 Reactor 类 NioEventLoop
的结构器中。
这里介绍下新的参数 EventLoopTaskQueueFactory queueFactory
,前边提到 Netty 中的Reactor
次要工作是 轮询
注册其上的所有 Channel
上的 IO 就绪事件
,解决IO 就绪事件
。除了这些次要的工作外,Netty 为了极致的压迫Reactor
的性能,还会让它做一些异步工作的执行工作。既然要执行异步工作,那么 Reactor
中就须要一个 队列
来保留工作。
这里的 EventLoopTaskQueueFactory
就是用来创立这样的一个队列来保留 Reactor
中待执行的异步工作。
能够把 Reactor
了解成为一个 单线程的线程池
, 相似
于JDK
中的 SingleThreadExecutor
,仅用一个线程来执行 轮询 IO 就绪事件
, 解决 IO 就绪事件
, 执行异步工作
。同时待执行的异步工作保留在Reactor
里的 taskQueue
中。
NioEventLoop
public final class NioEventLoop extends SingleThreadEventLoop {
// 用于创立 JDK NIO Selector,ServerSocketChannel
private final SelectorProvider provider;
//Selector 轮询策略 决定什么时候轮询,什么时候解决 IO 事件,什么时候执行异步工作
private final SelectStrategy selectStrategy;
/**
* The NIO {@link Selector}.
*/
private Selector selector;
private Selector unwrappedSelector;
NioEventLoop(NioEventLoopGroup parent, Executor executor, SelectorProvider selectorProvider,
SelectStrategy strategy, RejectedExecutionHandler rejectedExecutionHandler,
EventLoopTaskQueueFactory queueFactory) {super(parent, executor, false, newTaskQueue(queueFactory), newTaskQueue(queueFactory),
rejectedExecutionHandler);
this.provider = ObjectUtil.checkNotNull(selectorProvider, "selectorProvider");
this.selectStrategy = ObjectUtil.checkNotNull(strategy, "selectStrategy");
final SelectorTuple selectorTuple = openSelector();
this.selector = selectorTuple.selector;
this.unwrappedSelector = selectorTuple.unwrappedSelector;
}
}
这里就正式开始了 Reactor
的创立过程,咱们晓得 Reactor
的外围是采纳的 IO 多路复用模型
来对客户端连贯上的 IO 事件
进行 监听
,所以最重要的事件是创立Selector
(JDK NIO 中 IO 多路复用技术的实现
)。
能够把
Selector
了解为咱们上篇文章介绍的Select,poll,epoll
,它是JDK NIO
对操作系统内核提供的这些IO 多路复用技术
的封装。
openSelector
openSelector
是 NioEventLoop 类
中用于创立 IO 多路复用
的Selector
,并对创立进去的 JDK NIO
原生的Selector
进行性能优化。
首先会通过 SelectorProvider#openSelector
创立 JDK NIO 原生的Selector
。
private SelectorTuple openSelector() {
final Selector unwrappedSelector;
try {
// 通过 JDK NIO SelectorProvider 创立 Selector
unwrappedSelector = provider.openSelector();} catch (IOException e) {throw new ChannelException("failed to open a new selector", e);
}
.................. 省略.............
}
SelectorProvider
会依据操作系统的不同抉择 JDK 在不同操作系统版本下的对应 Selector
的实现。Linux 下会抉择Epoll
,Mac 下会抉择Kqueue
。
上面咱们就来看下 SelectorProvider
是如何做到主动适配不同操作系统下 IO 多路复用
实现的
SelectorProvider
public NioEventLoopGroup(ThreadFactory threadFactory) {this(0, threadFactory, SelectorProvider.provider());
}
SelectorProvider
是在后面介绍的 NioEventLoopGroup 类
构造函数中通过调用 SelectorProvider.provider()
被加载,并通过 NioEventLoopGroup#newChild
办法中的可变长参数 Object... args
传递到 NioEventLoop
中的 private final SelectorProvider provider
字段中。
SelectorProvider 的加载过程:
public abstract class SelectorProvider {public static SelectorProvider provider() {synchronized (lock) {if (provider != null)
return provider;
return AccessController.doPrivileged(new PrivilegedAction<SelectorProvider>() {public SelectorProvider run() {if (loadProviderFromProperty())
return provider;
if (loadProviderAsService())
return provider;
provider = sun.nio.ch.DefaultSelectorProvider.create();
return provider;
}
});
}
}
}
从 SelectorProvider
加载源码中咱们能够看出,SelectorProvider
的加载形式有三种,优先级如下:
- 通过零碎变量
-D java.nio.channels.spi.SelectorProvider
指定SelectorProvider
的自定义实现类全限定名
。通过应用程序类加载器 (Application Classloader)
加载。
private static boolean loadProviderFromProperty() {String cn = System.getProperty("java.nio.channels.spi.SelectorProvider");
if (cn == null)
return false;
try {
Class<?> c = Class.forName(cn, true,
ClassLoader.getSystemClassLoader());
provider = (SelectorProvider)c.newInstance();
return true;
}
................. 省略.............
}
- 通过
SPI
形式加载。在工程目录META-INF/services
下定义名为java.nio.channels.spi.SelectorProvider
的SPI 文件
,文件中第一个定义的SelectorProvider
实现类全限定名就会被加载。
private static boolean loadProviderAsService() {
ServiceLoader<SelectorProvider> sl =
ServiceLoader.load(SelectorProvider.class,
ClassLoader.getSystemClassLoader());
Iterator<SelectorProvider> i = sl.iterator();
for (;;) {
try {if (!i.hasNext())
return false;
provider = i.next();
return true;
} catch (ServiceConfigurationError sce) {if (sce.getCause() instanceof SecurityException) {
// Ignore the security exception, try the next provider
continue;
}
throw sce;
}
}
}
- 如果以上两种形式均未被定义,那么就采纳
SelectorProvider
零碎默认实现sun.nio.ch.DefaultSelectorProvider
。笔者以后应用的操作系统是MacOS
,从源码中咱们能够看到主动适配了KQueue
实现。
public class DefaultSelectorProvider {private DefaultSelectorProvider() { }
public static SelectorProvider create() {return new KQueueSelectorProvider();
}
}
不同操作系统中 JDK 对于
DefaultSelectorProvider
会有所不同,Linux 内核版本 2.6 以上对应的Epoll
,Linux 内核版本 2.6 以下对应的Poll
,MacOS 对应的是KQueue
。
上面咱们接着回到 io.netty.channel.nio.NioEventLoop#openSelector
的主线上来。
Netty 对 JDK NIO 原生 Selector 的优化
首先在 NioEventLoop
中有一个 Selector 优化开关 DISABLE_KEY_SET_OPTIMIZATION
, 通过零碎变量-D io.netty.noKeySetOptimization
指定,默认是开启的,示意须要对 JDK NIO 原生 Selector
进行优化。
public final class NioEventLoop extends SingleThreadEventLoop {
//Selector 优化开关 默认开启 为了遍历的效率 会对 Selector 中的 SelectedKeys 进行数据结构优化
private static final boolean DISABLE_KEY_SET_OPTIMIZATION =
SystemPropertyUtil.getBoolean("io.netty.noKeySetOptimization", false);
}
如果优化开关 DISABLE_KEY_SET_OPTIMIZATION
是敞开的,那么间接返回 JDK NIO 原生的Selector
。
private SelectorTuple openSelector() {
..........SelectorProvider 创立 JDK NIO 原生 Selector..............
if (DISABLE_KEY_SET_OPTIMIZATION) {
//JDK NIO 原生 Selector,Selector 优化开关 默认开启须要对 Selector 进行优化
return new SelectorTuple(unwrappedSelector);
}
}
上面为 Netty 对 JDK NIO 原生的 Selector
的优化过程:
- 获取
JDK NIO 原生 Selector
的形象实现类sun.nio.ch.SelectorImpl
。JDK NIO 原生 Selector
的实现均继承于该抽象类。用于判断由SelectorProvider
创立进去的Selector
是否为JDK 默认实现
(SelectorProvider
第三种加载形式)。因为SelectorProvider
能够是自定义加载,所以它创立进去的Selector
并不一定是 JDK NIO 原生的。
Object maybeSelectorImplClass = AccessController.doPrivileged(new PrivilegedAction<Object>() {
@Override
public Object run() {
try {
return Class.forName(
"sun.nio.ch.SelectorImpl",
false,
PlatformDependent.getSystemClassLoader());
} catch (Throwable cause) {return cause;}
}
});
JDK NIO Selector 的抽象类sun.nio.ch.SelectorImpl
public abstract class SelectorImpl extends AbstractSelector {
// The set of keys with data ready for an operation
// //IO 就绪的 SelectionKey(外面包裹着 channel)protected Set<SelectionKey> selectedKeys;
// The set of keys registered with this Selector
// 注册在该 Selector 上的所有 SelectionKey(外面包裹着 channel)protected HashSet<SelectionKey> keys;
// Public views of the key sets
// 用于向调用线程返回的 keys,不可变
private Set<SelectionKey> publicKeys; // Immutable
// 当有 IO 就绪的 SelectionKey 时,向调用线程返回。只可删除其中元素,不可减少
private Set<SelectionKey> publicSelectedKeys; // Removal allowed, but not addition
protected SelectorImpl(SelectorProvider sp) {super(sp);
keys = new HashSet<SelectionKey>();
selectedKeys = new HashSet<SelectionKey>();
if (Util.atBugLevel("1.4")) {
publicKeys = keys;
publicSelectedKeys = selectedKeys;
} else {
// 不可变
publicKeys = Collections.unmodifiableSet(keys);
// 只可删除其中元素,不可减少
publicSelectedKeys = Util.ungrowableSet(selectedKeys);
}
}
}
这里笔者来简略介绍下 JDK NIO 中的 Selector
中这几个字段的含意,咱们能够和上篇文章讲到的 epoll 在内核中的构造做类比,不便大家后续的了解:
Set<SelectionKey> selectedKeys
相似于咱们上篇文章解说Epoll
时提到的就绪队列 eventpoll->rdllist
,Selector
这里大家能够了解为Epoll
。Selector
会将本人监听到的IO 就绪
的Channel
放到selectedKeys
中。
这里的
SelectionKey
暂且能够了解为Channel
在Selector
中的示意,类比上图中epitem 构造
里的epoll_event
,封装 IO 就绪 Socket 的信息。
其实SelectionKey
里蕴含的信息不止是Channel
还有很多 IO 相干的信息。前面咱们在具体介绍。
-
HashSet<SelectionKey> keys:
这里寄存的是所有注册到该Selector
上的Channel
。类比epoll 中的红黑树结构 rb_root
SelectionKey
在Channel
注册到Selector
中后生成。 Set<SelectionKey> publicSelectedKeys
相当于是selectedKeys
的视图,用于向内部线程返回IO 就绪
的SelectionKey
。这个汇合在内部线程中只能做删除操作不可减少元素
,并且不是线程平安的
。Set<SelectionKey> publicKeys
相当于keys
的不可变视图,用于向内部线程返回所有注册在该Selector
上的SelectionKey
这里须要 重点关注
抽象类 sun.nio.ch.SelectorImpl
中的 selectedKeys
和publicSelectedKeys
这两个字段,留神它们的类型都是HashSet
,一会优化的就是这里!!!!
- 判断由
SelectorProvider
创立进去的Selector
是否是 JDK NIO 原生的Selector
实现。因为 Netty 优化针对的是 JDK NIO 原生Selector
。判断规范为sun.nio.ch.SelectorImpl
类是否为SelectorProvider
创立出Selector
的父类。如果不是则间接返回。不在持续上面的优化过程。
// 判断是否能够对 Selector 进行优化,这里次要针对 JDK NIO 原生 Selector 的实现类进行优化,因为 SelectorProvider 能够加载的是自定义 Selector 实现
// 如果 SelectorProvider 创立的 Selector 不是 JDK 原生 sun.nio.ch.SelectorImpl 的实现类,那么无奈进行优化,间接返回
if (!(maybeSelectorImplClass instanceof Class) ||
!((Class<?>) maybeSelectorImplClass).isAssignableFrom(unwrappedSelector.getClass())) {if (maybeSelectorImplClass instanceof Throwable) {Throwable t = (Throwable) maybeSelectorImplClass;
logger.trace("failed to instrument a special java.util.Set into: {}", unwrappedSelector, t);
}
return new SelectorTuple(unwrappedSelector);
}
通过后面对 SelectorProvider
的介绍咱们晓得,这里通过 provider.openSelector()
创立进去的 Selector
实现类为 KQueueSelectorImpl 类
,它继承实现了sun.nio.ch.SelectorImpl
,所以它是 JDK NIO 原生的Selector
实现
class KQueueSelectorImpl extends SelectorImpl {}
- 创立
SelectedSelectionKeySet
通过反射替换掉sun.nio.ch.SelectorImpl 类
中selectedKeys
和publicSelectedKeys
的默认HashSet
实现。
为什么要用 SelectedSelectionKeySet
替换掉原来的 HashSet
呢??
因为这里波及到对 HashSet 类型
的sun.nio.ch.SelectorImpl#selectedKeys
汇合的两种操作:
- 插入操作: 通过前边对
sun.nio.ch.SelectorImpl 类
中字段的介绍咱们晓得,在Selector
监听到IO 就绪
的SelectionKey
后,会将IO 就绪
的SelectionKey
插入sun.nio.ch.SelectorImpl#selectedKeys
汇合中,这时Reactor 线程
会从java.nio.channels.Selector#select(long)
阻塞调用中返回(相似上篇文章提到的epoll_wait
)。 - 遍历操作:
Reactor 线程
返回后,会从Selector
中获取IO 就绪
的SelectionKey
汇合(也就是sun.nio.ch.SelectorImpl#selectedKeys
),Reactor 线程
遍历selectedKeys
, 获取IO 就绪
的SocketChannel
,并解决SocketChannel
上的IO 事件
。
咱们都晓得 HashSet
底层数据结构是一个 哈希表
,因为Hash 抵触
这种状况的存在,所以导致对 哈希表
进行 插入
和遍历
操作的性能不如对 数组
进行 插入
和遍历
操作的性能好。
还有一个重要起因是,数组能够利用 CPU 缓存的劣势来进步遍历的效率。前面笔者会有一篇专门的文章来讲述利用 CPU 缓存行如何为咱们带来性能劣势。
所以 Netty 为了优化对 sun.nio.ch.SelectorImpl#selectedKeys
汇合的 插入,遍历
性能,本人用 数组
这种数据结构实现了 SelectedSelectionKeySet
,用它来替换原来的HashSet
实现。
SelectedSelectionKeySet
- 初始化
SelectionKey[] keys
数组大小为1024
,当数组容量不够时,扩容为原来的两倍大小。 - 通过数组尾部指针
size
,在向数组插入元素的时候能够间接定位到插入地位keys[size++]
。操作一步到位,不必像哈希表
那样还须要解决Hash 抵触
。 - 对数组的遍历操作也是如丝般顺滑,CPU 间接能够在缓存行中遍历读取数组元素无需拜访内存。比
HashSet
的迭代器java.util.HashMap.KeyIterator
遍历形式性能不知高到哪里去了。
final class SelectedSelectionKeySet extends AbstractSet<SelectionKey> {
// 采纳数组替换到 JDK 中的 HashSet, 这样 add 操作和遍历操作效率更高,不须要思考 hash 抵触
SelectionKey[] keys;
// 数组尾部指针
int size;
SelectedSelectionKeySet() {keys = new SelectionKey[1024];
}
/**
* 数组的增加效率高于 HashSet 因为不须要思考 hash 抵触
* */
@Override
public boolean add(SelectionKey o) {if (o == null) {return false;}
// 工夫复杂度 O(1)keys[size++] = o;
if (size == keys.length) {
// 扩容为原来的两倍大小
increaseCapacity();}
return true;
}
private void increaseCapacity() {SelectionKey[] newKeys = new SelectionKey[keys.length << 1];
System.arraycopy(keys, 0, newKeys, 0, size);
keys = newKeys;
}
/**
* 采纳数组的遍历效率 高于 HashSet
* */
@Override
public Iterator<SelectionKey> iterator() {return new Iterator<SelectionKey>() {
private int idx;
@Override
public boolean hasNext() {return idx < size;}
@Override
public SelectionKey next() {if (!hasNext()) {throw new NoSuchElementException();
}
return keys[idx++];
}
@Override
public void remove() {throw new UnsupportedOperationException();
}
};
}
}
看到这里不禁感叹,从各种小的细节能够看出 Netty 对性能的优化几乎酣畅淋漓,对性能的谋求令人发指。细节真的是魔鬼。
- Netty 通过反射的形式用
SelectedSelectionKeySet
替换掉sun.nio.ch.SelectorImpl#selectedKeys
,sun.nio.ch.SelectorImpl#publicSelectedKeys
这两个汇合中原来HashSet
的实现。
- 反射获取
sun.nio.ch.SelectorImpl
类中selectedKeys
和publicSelectedKeys
。
Field selectedKeysField = selectorImplClass.getDeclaredField("selectedKeys");
Field publicSelectedKeysField = selectorImplClass.getDeclaredField("publicSelectedKeys");
Java9
版本以上通过sun.misc.Unsafe
设置字段值的形式
if (PlatformDependent.javaVersion() >= 9 && PlatformDependent.hasUnsafe()) {long selectedKeysFieldOffset = PlatformDependent.objectFieldOffset(selectedKeysField);
long publicSelectedKeysFieldOffset =
PlatformDependent.objectFieldOffset(publicSelectedKeysField);
if (selectedKeysFieldOffset != -1 && publicSelectedKeysFieldOffset != -1) {
PlatformDependent.putObject(unwrappedSelector, selectedKeysFieldOffset, selectedKeySet);
PlatformDependent.putObject(unwrappedSelector, publicSelectedKeysFieldOffset, selectedKeySet);
return null;
}
}
- 通过反射的形式用
SelectedSelectionKeySet
替换掉hashSet
实现的sun.nio.ch.SelectorImpl#selectedKeys,sun.nio.ch.SelectorImpl#publicSelectedKeys
。
Throwable cause = ReflectionUtil.trySetAccessible(selectedKeysField, true);
if (cause != null) {return cause;}
cause = ReflectionUtil.trySetAccessible(publicSelectedKeysField, true);
if (cause != null) {return cause;}
//Java8 反射替换字段
selectedKeysField.set(unwrappedSelector, selectedKeySet);
publicSelectedKeysField.set(unwrappedSelector, selectedKeySet);
- 将与
sun.nio.ch.SelectorImpl
类中selectedKeys
和publicSelectedKeys
关联好的 Netty 优化实现SelectedSelectionKeySet
,设置到io.netty.channel.nio.NioEventLoop#selectedKeys
字段中保留。
// 会通过反射替换 selector 对象中的 selectedKeySet 保留就绪的 selectKey
// 该字段为持有 selector 对象 selectedKeys 的援用,当 IO 事件就绪时,间接从这里获取
private SelectedSelectionKeySet selectedKeys;
后续
Reactor 线程
就会间接从io.netty.channel.nio.NioEventLoop#selectedKeys
中获取IO 就绪
的SocketChannel
- 用
SelectorTuple
封装unwrappedSelector
和wrappedSelector
返回给NioEventLoop
构造函数。到此Reactor
中的Selector
就创立结束了。
return new SelectorTuple(unwrappedSelector,
new SelectedSelectionKeySetSelector(unwrappedSelector, selectedKeySet));
private static final class SelectorTuple {
final Selector unwrappedSelector;
final Selector selector;
SelectorTuple(Selector unwrappedSelector) {
this.unwrappedSelector = unwrappedSelector;
this.selector = unwrappedSelector;
}
SelectorTuple(Selector unwrappedSelector, Selector selector) {
this.unwrappedSelector = unwrappedSelector;
this.selector = selector;
}
}
- 所谓的
unwrappedSelector
是指被 Netty 优化过的 JDK NIO 原生 Selector。 - 所谓的
wrappedSelector
就是用SelectedSelectionKeySetSelector
装璜类将unwrappedSelector
和与sun.nio.ch.SelectorImpl 类
关联好的 Netty 优化实现SelectedSelectionKeySet
封装装璜起来。
wrappedSelector
会将所有对 Selector
的操作全副代理给 unwrappedSelector
,并在 发动轮询 IO 事件
的相干操作中,重置 SelectedSelectionKeySet
清空上一次的轮询后果。
final class SelectedSelectionKeySetSelector extends Selector {
//Netty 优化后的 SelectedKey 就绪汇合
private final SelectedSelectionKeySet selectionKeys;
// 优化后的 JDK NIO 原生 Selector
private final Selector delegate;
SelectedSelectionKeySetSelector(Selector delegate, SelectedSelectionKeySet selectionKeys) {
this.delegate = delegate;
this.selectionKeys = selectionKeys;
}
@Override
public boolean isOpen() {return delegate.isOpen();
}
@Override
public SelectorProvider provider() {return delegate.provider();
}
@Override
public Set<SelectionKey> keys() {return delegate.keys();
}
@Override
public Set<SelectionKey> selectedKeys() {return delegate.selectedKeys();
}
@Override
public int selectNow() throws IOException {
// 重置 SelectedKeys 汇合
selectionKeys.reset();
return delegate.selectNow();}
@Override
public int select(long timeout) throws IOException {
// 重置 SelectedKeys 汇合
selectionKeys.reset();
return delegate.select(timeout);
}
@Override
public int select() throws IOException {
// 重置 SelectedKeys 汇合
selectionKeys.reset();
return delegate.select();}
@Override
public Selector wakeup() {return delegate.wakeup();
}
@Override
public void close() throws IOException {delegate.close();
}
}
到这里 Reactor 的外围 Selector 就创立好了,上面咱们来看下用于保留异步工作的队列是如何创立进去的。
newTaskQueue
NioEventLoop(NioEventLoopGroup parent, Executor executor, SelectorProvider selectorProvider,
SelectStrategy strategy, RejectedExecutionHandler rejectedExecutionHandler,
EventLoopTaskQueueFactory queueFactory) {super(parent, executor, false, newTaskQueue(queueFactory), newTaskQueue(queueFactory),
rejectedExecutionHandler);
this.provider = ObjectUtil.checkNotNull(selectorProvider, "selectorProvider");
this.selectStrategy = ObjectUtil.checkNotNull(strategy, "selectStrategy");
final SelectorTuple selectorTuple = openSelector();
// 通过用 SelectedSelectionKeySet 装璜后的 unwrappedSelector
this.selector = selectorTuple.selector;
//Netty 优化过的 JDK NIO 近程 Selector
this.unwrappedSelector = selectorTuple.unwrappedSelector;
}
咱们持续回到创立 Reactor
的主线上,到目前为止 Reactor
的外围 Selector
就创立好了,前边咱们提到 Reactor
除了须要 监听 IO 就绪事件
以及解决 IO 就绪事件
外,还须要执行一些异步工作,当内部线程向 Reactor
提交异步工作后,Reactor
就须要一个队列来保留这些异步工作,期待 Reactor 线程
执行。
上面咱们来看下 Reactor
中工作队列的创立过程:
// 工作队列大小,默认是无界队列
protected static final int DEFAULT_MAX_PENDING_TASKS = Math.max(16,
SystemPropertyUtil.getInt("io.netty.eventLoop.maxPendingTasks", Integer.MAX_VALUE));
private static Queue<Runnable> newTaskQueue(EventLoopTaskQueueFactory queueFactory) {if (queueFactory == null) {return newTaskQueue0(DEFAULT_MAX_PENDING_TASKS);
}
return queueFactory.newTaskQueue(DEFAULT_MAX_PENDING_TASKS);
}
private static Queue<Runnable> newTaskQueue0(int maxPendingTasks) {// This event loop never calls takeTask()
return maxPendingTasks == Integer.MAX_VALUE ? PlatformDependent.<Runnable>newMpscQueue()
: PlatformDependent.<Runnable>newMpscQueue(maxPendingTasks);
}
- 在
NioEventLoop
的父类SingleThreadEventLoop
中提供了一个动态变量DEFAULT_MAX_PENDING_TASKS
用来指定Reactor
工作队列的大小。能够通过零碎变量-D io.netty.eventLoop.maxPendingTasks
进行设置,默认为Integer.MAX_VALUE
,示意工作队列默认为无界队列
。 - 依据
DEFAULT_MAX_PENDING_TASKS
变量的设定,来决定创立无界工作队列还是有界工作队列。
// 创立无界工作队列
PlatformDependent.<Runnable>newMpscQueue()
// 创立有界工作队列
PlatformDependent.<Runnable>newMpscQueue(maxPendingTasks)
public static <T> Queue<T> newMpscQueue() {return Mpsc.newMpscQueue();
}
public static <T> Queue<T> newMpscQueue(final int maxCapacity) {return Mpsc.newMpscQueue(maxCapacity);
}
Reactor
内的异步工作队列的类型为MpscQueue
, 它是由JCTools
提供的一个高性能无锁队列,从命名前缀Mpsc
能够看出,它实用于多生产者单消费者
的场景,它反对多个生产者线程平安的拜访队列,同一时刻只容许一个消费者线程读取队列中的元素。咱们晓得 Netty 中的
Reactor
能够线程平安
的解决注册其上的多个SocketChannel
上的IO 数据
,保障Reactor 线程平安
的外围起因正是因为这个MpscQueue
,它能够反对多个业务线程在解决完业务逻辑后,线程平安的向MpscQueue
增加异步写工作
,而后由单个Reactor 线程
来执行这些写工作
。既然是单线程执行,那必定是线程平安的了。
Reactor 对应的 NioEventLoop 类型继承构造
NioEventLoop
的继承构造也是比较复杂,这里咱们只关注在 Reactor
创立过程中波及的到两个父类SingleThreadEventLoop
,SingleThreadEventExecutor
。
剩下的继承体系,咱们在后边随着 Netty
源码的深刻在缓缓介绍。
前边咱们提到,其实 Reactor
咱们能够看作是一个单线程的线程池,只有一个线程用来执行 IO 就绪事件的监听
,IO 事件的解决
, 异步工作的执行
。用MpscQueue
来存储待执行的异步工作。
命名前缀为 SingleThread
的父类都是对 Reactor
这些行为的分层定义。也是本大节要介绍的对象
SingleThreadEventLoop
Reactor
负责执行的异步工作分为三类:
一般工作:
这是 Netty 最次要执行的异步工作,寄存在一般工作队列taskQueue
中。在NioEventLoop
构造函数中创立。定时工作:
寄存在优先级队列中。后续咱们介绍。尾部工作:
寄存于尾部工作队列tailTasks
中,尾部工作个别不罕用,在一般工作执行完后 Reactor 线程会执行尾部工作。应用场景:比方对 Netty 的运行状态做一些统计数据,例如工作循环的耗时、占用物理内存的大小等等都能够向尾部队列增加一个收尾工作实现统计数据的实时更新。
SingleThreadEventLoop
负责对 尾部工作队列 tailTasks
进行治理。并且提供 Channel
向Reactor
注册的行为。
public abstract class SingleThreadEventLoop extends SingleThreadEventExecutor implements EventLoop {
// 工作队列大小,默认是无界队列
protected static final int DEFAULT_MAX_PENDING_TASKS = Math.max(16,
SystemPropertyUtil.getInt("io.netty.eventLoop.maxPendingTasks", Integer.MAX_VALUE));
// 尾部工作队列
private final Queue<Runnable> tailTasks;
protected SingleThreadEventLoop(EventLoopGroup parent, Executor executor,
boolean addTaskWakesUp, Queue<Runnable> taskQueue, Queue<Runnable> tailTaskQueue,
RejectedExecutionHandler rejectedExecutionHandler) {super(parent, executor, addTaskWakesUp, taskQueue, rejectedExecutionHandler);
// 尾部队列 执行一些统计工作 不罕用
tailTasks = ObjectUtil.checkNotNull(tailTaskQueue, "tailTaskQueue");
}
@Override
public ChannelFuture register(Channel channel) {
// 注册 channel 到绑定的 Reactor 上
return register(new DefaultChannelPromise(channel, this));
}
}
SingleThreadEventExecutor
SingleThreadEventExecutor
次要负责对 一般工作队列
的治理,以及 异步工作的执行
,Reactor 线程的启停
。
public abstract class SingleThreadEventExecutor extends AbstractScheduledEventExecutor implements OrderedEventExecutor {
protected SingleThreadEventExecutor(EventExecutorGroup parent, Executor executor,
boolean addTaskWakesUp, Queue<Runnable> taskQueue, RejectedExecutionHandler rejectedHandler) {
//parent 为 Reactor 所属的 NioEventLoopGroup Reactor 线程组
super(parent);
// 向 Reactor 增加工作时,是否唤醒 Selector 进行轮询 IO 就绪事件,马上执行异步工作
this.addTaskWakesUp = addTaskWakesUp;
//Reactor 异步工作队列的大小
this.maxPendingTasks = DEFAULT_MAX_PENDING_EXECUTOR_TASKS;
// 用于启动 Reactor 线程的 executor -> ThreadPerTaskExecutor
this.executor = ThreadExecutorMap.apply(executor, this);
// 一般工作队列
this.taskQueue = ObjectUtil.checkNotNull(taskQueue, "taskQueue");
// 工作队列满时的回绝策略
this.rejectedExecutionHandler = ObjectUtil.checkNotNull(rejectedHandler, "rejectedHandler");
}
}
到当初为止,一个残缺的 Reactor 架构
就被创立进去了。
3. 创立 Channel 到 Reactor 的绑定策略
到这一步,Reactor 线程组 NioEventLoopGroup
里边的所有 Reactor
就曾经全副创立结束。
无论是 Netty 服务端 NioServerSocketChannel
关注的 OP_ACCEPT
事件也好,还是 Netty 客户端 NioSocketChannel
关注的 OP_READ
和OP_WRITE
事件也好,都须要先注册到 Reactor
上,Reactor
能力监听 Channel
上关注的 IO 事件
实现IO 多路复用
。
NioEventLoopGroup
(Reactor 线程组)里边有泛滥的 Reactor
,那么以上提到的这些Channel
到底应该注册到哪个 Reactor
上呢?这就须要一个绑定的策略来平均分配。
还记得咱们前边介绍 MultithreadEventExecutorGroup 类
的时候提到的结构器参数 EventExecutorChooserFactory
吗?
这时候它就派上用场了,它次要用来创立 Channel
到Reactor
的绑定策略。默认为DefaultEventExecutorChooserFactory.INSTANCE
。
public abstract class MultithreadEventExecutorGroup extends AbstractEventExecutorGroup {
// 从 Reactor 汇合中抉择一个特定的 Reactor 的绑定策略 用于 channel 注册绑定到一个固定的 Reactor 上
private final EventExecutorChooserFactory.EventExecutorChooser chooser;
chooser = chooserFactory.newChooser(children);
}
上面咱们来看下具体的绑定策略:
DefaultEventExecutorChooserFactory
public final class DefaultEventExecutorChooserFactory implements EventExecutorChooserFactory {public static final DefaultEventExecutorChooserFactory INSTANCE = new DefaultEventExecutorChooserFactory();
private DefaultEventExecutorChooserFactory() {}
@Override
public EventExecutorChooser newChooser(EventExecutor[] executors) {if (isPowerOfTwo(executors.length)) {return new PowerOfTwoEventExecutorChooser(executors);
} else {return new GenericEventExecutorChooser(executors);
}
}
private static boolean isPowerOfTwo(int val) {return (val & -val) == val;
}
................... 省略.................
}
咱们看到在 newChooser
办法绑定策略有两个分支,不同之处在于须要判断 Reactor 线程组中的 Reactor
个数是否为 2 的次幂
。
Netty 中的绑定策略就是采纳 round-robin
轮询的形式来挨个抉择 Reactor
进行绑定。
采纳 round-robin
的形式进行负载平衡,咱们个别会用 round % reactor.length
取余的形式来挨个均匀的定位到对应的 Reactor
上。
如果 Reactor
的个数 reactor.length
恰好是 2 的次幂
,那么就能够用位操作&
运算 round & reactor.length -1
来代替 %
运算 round % reactor.length
,因为位运算的性能更高。具体为什么&
运算可能代替 %
运算,笔者会在前面讲述工夫轮的时候为大家具体证实,这里大家只需记住这个公式,咱们还是聚焦本文的主线。
理解了优化原理,咱们在看代码实现就很容易了解了。
利用 %
运算的形式 Math.abs(idx.getAndIncrement() % executors.length)
来进行绑定。
private static final class GenericEventExecutorChooser implements EventExecutorChooser {private final AtomicLong idx = new AtomicLong();
private final EventExecutor[] executors;
GenericEventExecutorChooser(EventExecutor[] executors) {this.executors = executors;}
@Override
public EventExecutor next() {return executors[(int) Math.abs(idx.getAndIncrement() % executors.length)];
}
}
利用 &
运算的形式 idx.getAndIncrement() & executors.length - 1
来进行绑定。
private static final class PowerOfTwoEventExecutorChooser implements EventExecutorChooser {private final AtomicInteger idx = new AtomicInteger();
private final EventExecutor[] executors;
PowerOfTwoEventExecutorChooser(EventExecutor[] executors) {this.executors = executors;}
@Override
public EventExecutor next() {return executors[idx.getAndIncrement() & executors.length - 1];
}
}
又一次被 Netty 对性能的极致谋求所折服~~~~
4. 向 Reactor 线程组中所有的 Reactor 注册 terminated 回调函数
当 Reactor 线程组 NioEventLoopGroup
中所有的 Reactor
曾经创立结束,Channel
到 Reactor
的绑定策略也创立结束后,咱们就来到了创立 NioEventGroup
的最初一步。
俗话说的好,有创立就有启动,有启动就有敞开,这里会创立 Reactor 敞开
的回调函数 terminationListener
,在Reactor
敞开时回调。
terminationListener
回调的逻辑很简略:
- 通过
AtomicInteger terminatedChildren
变量记录曾经敞开的Reactor
个数,用来判断NioEventLoopGroup
中的Reactor
是否曾经全副敞开。 - 如果所有
Reactor
均已敞开,设置NioEventLoopGroup
中的terminationFuture
为success
。示意Reactor 线程组
敞开胜利。
// 记录敞开的 Reactor 个数,当 Reactor 全副敞开后,才能够认为敞开胜利
private final AtomicInteger terminatedChildren = new AtomicInteger();
// 敞开 future
private final Promise<?> terminationFuture = new DefaultPromise(GlobalEventExecutor.INSTANCE);
final FutureListener<Object> terminationListener = new FutureListener<Object>() {
@Override
public void operationComplete(Future<Object> future) throws Exception {if (terminatedChildren.incrementAndGet() == children.length) {
// 当所有 Reactor 敞开后 才认为是敞开胜利
terminationFuture.setSuccess(null);
}
}
};
// 为所有 Reactor 增加 terminationListener
for (EventExecutor e: children) {e.terminationFuture().addListener(terminationListener);
}
咱们在回到文章结尾Netty 服务端代码模板
public final class EchoServer {static final int PORT = Integer.parseInt(System.getProperty("port", "8007"));
public static void main(String[] args) throws Exception {
// Configure the server.
// 创立主从 Reactor 线程组
EventLoopGroup bossGroup = new NioEventLoopGroup(1);
EventLoopGroup workerGroup = new NioEventLoopGroup();
........... 省略............
}
}
当初 Netty 的 主从 Reactor 线程组
就曾经创立结束,此时 Netty 服务端的骨架曾经搭建结束,骨架如下:
总结
本文介绍了首先介绍了 Netty 对各种 IO 模型
的反对以及如何轻松切换各种IO 模型
。
还花了大量的篇幅介绍 Netty 服务端的外围引擎 主从 Reactor 线程组
的创立过程。在这个过程中,咱们还提到了 Netty 对各种细节进行的优化,展示了 Netty 对性能极致的谋求。
好了,Netty 服务端的骨架曾经搭好,剩下的事件就该绑定端口地址而后接管连贯了,咱们下篇文章再见~~~
欢送关注微信公众号:bin 的技术小屋,浏览公众号原文