一、简介
在 netty 中,事件循环 EventLoop
是一个很重要的组件,用于解决 已注册 Channel
的各种IO 事件
,而 EventLoopGroup 对应了一个或多个 EventLoop,能够看做EvenLoopGroup 就是 EventLoop 的汇合
。上面是 EventLoop 和 EventLoopGroup 相干类图:
从下面类图能够看到,netty 在 jdk 原生接口 ScheduledExecutorService 上衍生了 EventExecutorGroup 接口
,其通过next() 办法
来获取 EventExecutor 事件执行器,并在 ScheduledExecutorService 的根底上增加了 优雅敞开、是否正在敞开
等操作,如下图
而 EventLoopGroup 继承了 EventExecutorGroup 接口,重写 next()办法
并增加 注册 Channel
的操作,如下图
EventLoop 接口自身比较简单,继承于 EventExecutor 及 EventLoopGroup 接口,如下
最常应用的 NioEventLoopGroup 和 NioEventLoop,别离继承于抽象类 MultithreadEventLoopGroup 和 SingleThreadEventLoop
,而这两个抽象类自身实现不难,其次要是继承了 MultithreadEventExecutorGroup 和 SingleThreadEventExecutor
,所以上面来看下 MultithreadEventExecutorGroup 和 SingleThreadEventExecutor 的次要代码逻辑
二、MultithreadEventExecutorGroup 和 SingleThreadEventExecutor
2.1 MultithreadEventExecutorGroup
MultithreadEventExecutorGroup 示意 通过多个 EventExecutor 来解决所提交的工作
2.1.1 重要属性
有两个较为重要的属性children 和 chooser
,children 对应 EventExecutor 数组,而 chooser 选取器的作用是从 children 选取 EventExecutor 来执行工作。如下
// 对应的 EventExecutor 数组
private final EventExecutor[] children;
// 选取器,作用是从 children 里选取 EventExecutor 来执行工作
private final EventExecutorChooserFactory.EventExecutorChooser chooser;
2.1.2 初始化
MultithreadEventExecutorGroup 的构造函数会对 children 和 chooser 进行初始化,大抵步骤如下:
- 依据传进来的 nThreads 线程数来初始化 children 数组
children = new EventExecutor[nThreads]
- 通过
newChild() 办法
来实例化 children 的每个 EventExecutor,newChild()为形象办法,须要子类 (如 NioEventLoopGroup) 具体实现。如果没有胜利的实例化 children 数组,则逐个优雅敞开 EventExecutor - 初始化 chooser 选取器
- 给 children 中的每个 EventExecutor 增加
terminationListener 终止监听器
,每有一个 EventExecutor 终止了,就会将 terminatedChildren 加 1,等到 terminatedChildren==children 总数时,阐明所有的 EventExecutor 曾经全副终止
源码如下:
protected MultithreadEventExecutorGroup(int nThreads, Executor executor,
EventExecutorChooserFactory chooserFactory, Object... args) {checkPositive(nThreads, "nThreads");
if (executor == null) {
// 如果传进的 executor 执行器为空,设置为 ThreadPerTaskExecutor 执行器,该执行器会独自创立一个线程来解决每个工作
executor = new ThreadPerTaskExecutor(newDefaultThreadFactory());
}
// 依据传进来的 nThreads 线程数来实例化 children
children = new EventExecutor[nThreads];
for (int i = 0; i < nThreads; i ++) {
boolean success = false;
try {
// newChild 作用是生成具体的 EventExecutor,其为形象办法,须要子类(如 NioEventLoopGroup)去具体实现
children[i] = newChild(executor, args);
success = true;
} catch (Exception e) {throw new IllegalStateException("failed to create a child event loop", e);
} finally {
// 如果没有胜利的实例化 children 数组,则逐个优雅敞开 EventExecutor
if (!success) {for (int j = 0; j < i; j ++) {children[j].shutdownGracefully();}
// 期待终止所有的 EventExecutor
for (int j = 0; j < i; j ++) {EventExecutor e = children[j];
try {while (!e.isTerminated()) {e.awaitTermination(Integer.MAX_VALUE, TimeUnit.SECONDS);
}
} catch (InterruptedException interrupted) {Thread.currentThread().interrupt();
break;
}
}
}
}
}
// 初始化 chooser 选取器
chooser = chooserFactory.newChooser(children);
// 给 children 中的每个 EventExecutor 增加终止监听器
// 每有一个 EventExecutor 终止了,就会将 terminatedChildren 加 1
// 等到 terminatedChildren==children 总数时,阐明所有的 EventExecutor 曾经全副终止
final FutureListener<Object> terminationListener = new FutureListener<Object>() {
@Override
public void operationComplete(Future<Object> future) throws Exception {if (terminatedChildren.incrementAndGet() == children.length) {terminationFuture.setSuccess(null);
}
}
};
for (EventExecutor e: children) {e.terminationFuture().addListener(terminationListener);
}
}
2.1.3 提交工作
MultithreadEventExecutorGroup 提交工作的大抵流程如下图:
提交工作时,MultithreadEventExecutorGroup 是间接应用父类 AbstractEventExecutorGroup 的 submit 办法来提交,而该 submit 办法中是 通过调用 next()办法来选取到某个 EventExecutor,再调用 EventExecutor 的 submit()办法来提交的
,如下
@Override
public Future<?> submit(Runnable task) {return next().submit(task);
}
而 next()办法则是通过 chooser 选取器来选取到某个 EventExecutor 的,如下
@Override
public EventExecutor next() {return chooser.next();
}
2.2 SingleThreadEventExecutor
从下面咱们能够得悉 MultithreadEventExecutorGroup 提交工作时,本质上是 选取到某个 EventExecutor,再由该 EventExecutor 来进行提交
。
因为咱们罕用的 NioEventLoop 的大多数操作其实 是由 SingleThreadEventExecutor 提供了默认实现
(当然 NioEventLoop 也有其具体的一些操作,后续会详解),所以在深刻 NioEventLoop 之前,有必要先理解一下 SingleThreadEventExecutor
2.2.1 重要属性
SingleThreadEventExecutor 中有一个 寄存工作的 taskQueue 工作队列
,还有一个 与之绑定的 thread 线程
,还有一些优雅敞开相干属性,如下
// 寄存工作的队列
private final Queue<Runnable> taskQueue;
// 与该 SingleThreadEventExecutor 绑定的 thread
private volatile Thread thread;
// 执行器,首次启动时通过该执行器来启动线程,再由该线程来生产 taskQueue 的工作
private final Executor executor;
// 该属性很重要,示意 addTask 增加工作时,是否主动唤醒线程,如果不能主动唤醒,须要被动调用 wakeup 办法来唤醒
// 如:DefaultEventExecutor 的 addTaskWakesUp 为 true,而 NioEventLoop 为 false
private final boolean addTaskWakesUp;
// 队列的最大容量
private final int maxPendingTasks;
// 优雅敞开的静默工夫
private volatile long gracefulShutdownQuietPeriod;
// 优雅敞开的超时工夫
private volatile long gracefulShutdownTimeout;
// 优雅敞开的开始工夫
private long gracefulShutdownStartTime;
2.2.2 状态治理
SingleThreadEventExecutor 总共有 5 种状态,如下
- ST_NOT_STARTED (未启动)
- ST_STARTED (启动)
- ST_SHUTTING_DOWN (敞开中)
- ST_SHUTDOWN (已敞开)
- ST_TERMINATED (已终止)
初始状态为ST_NOT_STARTED 未启动
,如下
private static final int ST_NOT_STARTED = 1;
private static final int ST_STARTED = 2;
private static final int ST_SHUTTING_DOWN = 3;
private static final int ST_SHUTDOWN = 4;
private static final int ST_TERMINATED = 5;
private volatile int state = ST_NOT_STARTED;
状态转换:ST_NOT_STARTED –> ST_STARTED –> ST_SHUTTING_DOWN –> ST_SHUTDOWN –> ST_TERMINATED
2.2.3 提交工作
SingleThreadEventExecutor 提交工作的流程图如下:
SingleThreadEventExecutor 在 首次提交工作
时,会将state 设置为已启动,启动工作线程,并将该工作线程与 thread 属性进行绑定
,后续再次提交工作时,只会将工作增加到 taskQueue 工作队列中。源码如下
private void execute(Runnable task, boolean immediate) {
// 判断以后线程与该 SingleThreadEventExecutor 绑定的线程是否是同一个
boolean inEventLoop = inEventLoop();
// 增加工作至 taskQueue 工作队列
addTask(task);
if (!inEventLoop) {
// 如果 state 为未启动,则将 state 更新为已启动,启动工作线程,并将工作线程与该 SingleThreadEventExecutor 绑定
startThread();
// 如果 state 为已敞开,则回绝增加工作
if (isShutdown()) {
boolean reject = false;
try {if (removeTask(task)) {reject = true;}
} catch (UnsupportedOperationException e) { }
if (reject) {reject();
}
}
}
if (!addTaskWakesUp && immediate) {wakeup(inEventLoop);
}
}
private void startThread() {
// 如果 state 为未启动
if (state == ST_NOT_STARTED) {
// 将 state 更新为已启动
if (STATE_UPDATER.compareAndSet(this, ST_NOT_STARTED, ST_STARTED)) {
boolean success = false;
try {
// 启动工作线程
doStartThread();
success = true;
} finally {if (!success) {STATE_UPDATER.compareAndSet(this, ST_STARTED, ST_NOT_STARTED);
}
}
}
}
}
doStartThread 局部代码如下:
private void doStartThread() {
assert thread == null;
executor.execute(new Runnable() {
@Override
public void run() {
// 绑定工作线程
thread = Thread.currentThread();
if (interrupted) {thread.interrupt();
}
boolean success = false;
updateLastExecutionTime();
try {
// run 办法,从 taskQueue 中获取工作来执行,由子类去具体实现
SingleThreadEventExecutor.this.run();
success = true;
} catch (Throwable t) {logger.warn("Unexpected exception from an event executor:", t);
}
2.2.4 优雅敞开
下面曾经介绍了有 3 个属性是跟 优雅敞开
相干的,有gracefulShutdownQuietPeriod 静默工夫、gracefulShutdownTimeout 超时工夫、gracefulShutdownStartTime 开始工夫
gracefulShutdownQuietPeriod:如果以后工夫 - 上一次执行工夫 < 静默工夫,那么临时先不敞开,否则进行敞开
gracefulShutdownTimeout:如果以后工夫 - 优雅敞开的开始工夫 > 超时工夫,那么进行敞开
gracefulShutdownStartTime:优雅敞开的开始工夫
接下来看下优雅敞开 shutdownGracefully 办法,该办法会将 state 状态设置为敞开中,并间接返回 terminationFuture,源码如下:
public Future<?> shutdownGracefully(long quietPeriod, long timeout, TimeUnit unit) {ObjectUtil.checkPositiveOrZero(quietPeriod, "quietPeriod");
if (timeout < quietPeriod) {
throw new IllegalArgumentException("timeout:" + timeout + "(expected >= quietPeriod (" + quietPeriod + "))");
}
ObjectUtil.checkNotNull(unit, "unit");
// 如果 state >= 敞开中,间接返回 terminationFuture
if (isShuttingDown()) {return terminationFuture();
}
boolean inEventLoop = inEventLoop();
boolean wakeup;
int oldState;
for (;;) {
// 再次判断,如果 state >= 敞开中,间接返回 terminationFuture
if (isShuttingDown()) {return terminationFuture();
}
int newState;
wakeup = true; // 是否须要唤醒
oldState = state;
if (inEventLoop) {newState = ST_SHUTTING_DOWN;} else {switch (oldState) {
case ST_NOT_STARTED:
case ST_STARTED: // 如果旧状态为已启动,则设置新状态为敞开中
newState = ST_SHUTTING_DOWN;
break;
default: // 如果旧状态 >= 已启动,那么将 wakeup 设置为 false,不唤醒工作线程
newState = oldState;
wakeup = false;
}
}
// 通过 CAS 操作来更新状态
if (STATE_UPDATER.compareAndSet(this, oldState, newState)) {break;}
}
gracefulShutdownQuietPeriod = unit.toNanos(quietPeriod);
gracefulShutdownTimeout = unit.toNanos(timeout);
if (ensureThreadStarted(oldState)) {return terminationFuture;}
// 如果须要唤醒,则将 WAKEUP_TASK 放到队列中,来唤醒工作线程
if (wakeup) {taskQueue.offer(WAKEUP_TASK);
if (!addTaskWakesUp) {wakeup(inEventLoop);
}
}
return terminationFuture();}
state 状态更新为敞开中后,工作线程从 taskQueue 队列中每次拿到工作后,将会对 state 状态进行判断,如果是敞开中,会进一步判断是否确认敞开,如果确认敞开,则会跳出 run 办法,工作线程执行完结,最终该 SingleThreadEventExecutor 的 state 状态更新为已终止
。这里咱们能够通过查看 SingleThreadEventExecutor 的默认实现DefaultEventExecutor 的 run 办法
,源码如下:
protected void run() {for (;;) {
// 从 taskQueue 中获取工作,如果是 WAKEUP_TASK,则拿到的 task 为 null
Runnable task = takeTask();
if (task != null) {
// 执行工作
runTask(task);
// 更新上一次执行工夫
updateLastExecutionTime();}
// 确认是否敞开,如果是,跳出死循环
if (confirmShutdown()) {break;}
}
}
confirmShutdown 的源码如下:
protected boolean confirmShutdown() {
// 如果 state < 敞开中,间接返回 false
if (!isShuttingDown()) {return false;}
// 勾销定时工作
cancelScheduledTasks();
if (gracefulShutdownStartTime == 0) {
// 设置优雅敞开的开始工夫为以后工夫
gracefulShutdownStartTime = ScheduledFutureTask.nanoTime();}
// 如果 taskQueue 里还有工作,运行所有工作,否则运行敞开钩子
if (runAllTasks() || runShutdownHooks()) {if (isShutdown()) {return true;}
// 如果静默期为 0,返回 true 确认敞开
if (gracefulShutdownQuietPeriod == 0) {return true;}
taskQueue.offer(WAKEUP_TASK);
return false;
}
final long nanoTime = ScheduledFutureTask.nanoTime();
// 如果 state >= 已敞开,返回 true 确认敞开
// 否则判断以后工夫 - 优雅敞开的开始工夫 是否大于 超时工夫,如果大于,返回 true 确认敞开
if (isShutdown() || nanoTime - gracefulShutdownStartTime > gracefulShutdownTimeout) {return true;}
// 如果以后工夫 - 上一次工作执行工夫 小于等于 静默工夫,阐明在这一段时间内还有工作执行,则线程休眠 100 毫秒,返回 false 暂不敞开
if (nanoTime - lastExecutionTime <= gracefulShutdownQuietPeriod) {
// 休眠 100 毫秒后,用于持续唤醒工作线程
taskQueue.offer(WAKEUP_TASK);
try {Thread.sleep(100);
} catch (InterruptedException e) { }
return false;
}
return true;
}
当 confirmShutdown 返回 true 跳出 run 办法后
- 工作线程开始收尾工作
- 因为此时可能又有一些工作被增加到 taskQueue 外面,所以须要再次调用 confirmShutdown 办法
- 将
state 状态更新为已敞开
(这时曾经不能再接管新的工作了) - 再次调用 confirmShutdown 办法(理由同上,如果是这样的话,第一次的 confirmShutdown 调用是不是没有必要?)。
- 最终调用 cleanup()钩子办法,并
将 state 状态更新为已终止
,设置 terminationFuture 后果为胜利
三、NioEventLoopGroup 和 NioEventLoop
3.1 NioEventLoopGroup
NioEventLoopGroup
能够说是 netty 中咱们最相熟的类之一,继承于 MultithreadEventLoopGroup,而 MultithreadEventLoopGroup 继承于 MultithreadEventExecutorGroup(下面曾经剖析过该类),如下图
因为 NioEventLoopGroup 自身实现比较简单,所以这里咱们只看下它的 newChild() 办法
,这个办法之前在介绍 MultithreadEventExecutorGroup 也有提及,该办法是一个形象办法,须要子类具体实现,源码如下:
@Override
protected EventLoop newChild(Executor executor, Object... args) throws Exception {
// SelectorProvider 有关上选择器 openSelector、关上服务端通道 openServerSocketChannel 等办法
SelectorProvider selectorProvider = (SelectorProvider) args[0];
// select 策略工厂,用于产生 SelectStrategy
SelectStrategyFactory selectStrategyFactory = (SelectStrategyFactory) args[1];
// 拒绝执行处理器
RejectedExecutionHandler rejectedExecutionHandler = (RejectedExecutionHandler) args[2];
// taskQueue 工厂,taskQueue 之前已有提及
EventLoopTaskQueueFactory taskQueueFactory = null;
// tailTaskQueue 工厂
EventLoopTaskQueueFactory tailTaskQueueFactory = null;
int argsLength = args.length;
if (argsLength > 3) {taskQueueFactory = (EventLoopTaskQueueFactory) args[3];
}
if (argsLength > 4) {tailTaskQueueFactory = (EventLoopTaskQueueFactory) args[4];
}
// new 一个 NioEventLoop 实例
return new NioEventLoop(this, executor, selectorProvider,
selectStrategyFactory.newSelectStrategy(),
rejectedExecutionHandler, taskQueueFactory, tailTaskQueueFactory);
}
能够看到,newChild 办法的最初就是new 一个 NioEventLoop 实例
,所以最初咱们须要来看下 NioEventLoop 中的源码,看看它到底是如何来运作的?
3.2 NioEventLoop
学习过 NIO 的同学都晓得,多个 Channel 能够注册到一个 Selector,这样咱们就能够在单线程中通过一个 Selector 来治理多个 Channel,这就是IO 多路复用
,而NioEventLoop 就是 IO 多路复用的一个具体实现
。
在最开始的类图咱们能够看到 NioEventLoop 和 EpollEventLoop
都继承于 SingleThreadEventLoop,因为 NioEventLoop 更为常常应用,所以这里只介绍 NioEventLoop
。NioEventLoop 继承于 SingleThreadEventLoop,而 SingleThreadEventLoop 继承于 SingleThreadEventExecutor,如下图
SinleThreadEventLoop 在 SingleThreadEventExecutor 的根底上,增加了一个 tailTasks 工作队列(runAllTasks 办法中执行完 taskQueue 中的工作后,会再执行 tailTasks 中的工作
)
3.2.1 构造函数
NioEventLoop 的构造函数,接管多个参数,有 executor 执行器、select 策略、拒绝执行处理器、taskQueue 工厂、tailTaskQueue 工厂,如下
NioEventLoop(NioEventLoopGroup parent, Executor executor, SelectorProvider selectorProvider,
SelectStrategy strategy, RejectedExecutionHandler rejectedExecutionHandler,
EventLoopTaskQueueFactory taskQueueFactory, EventLoopTaskQueueFactory tailTaskQueueFactory) {super(parent, executor, false, newTaskQueue(taskQueueFactory), newTaskQueue(tailTaskQueueFactory),
rejectedExecutionHandler);
this.provider = ObjectUtil.checkNotNull(selectorProvider, "selectorProvider");
// select 策略
this.selectStrategy = ObjectUtil.checkNotNull(strategy, "selectStrategy");
// 创立 SelectorTuple,final SelectorTuple selectorTuple = openSelector();
// 包装后的 Selector,类型为 SelectedSelectionKeySetSelector,蕴含了 SelectedSelectionKeySet
this.selector = selectorTuple.selector;
// 未包装的 Selector
this.unwrappedSelector = selectorTuple.unwrappedSelector;
}
3.2.2 工作流程
在之前 SingleThreadEventExecutor 的介绍中,咱们曾经晓得,它会在启动工作线程后,调用 run 办法,而 run 办法由子类具体实现
,所以 NioEventLoop 的 run 办法就是其工作流程,大抵的工作如下:
- 判断
taskQueue 和 tailTaskQueue
是否有工作,如果没有工作,则通过调用Selector.select 办法来阻塞或超时阻塞获取 IO 事件
- 如果有工作,调用 Selector.selectNow 办法非阻塞获取 IO 事件
- 判断是否有 IO 事件筹备好,
如果有,先解决 IO 事件
- 再解决 taskQueue 和 tailTaskQueue 中的工作
- 返回第一步,
有限循环
流程图如下:
联合源码来看,源码如下:
protected void run() {
int selectCnt = 0;
// 死循环
for (;;) {
try {
int strategy;
try {
// 计算 strategy 的值
// 如果 hasTasks 为 true,代表 taskQueue 或 tailQueue 里有工作,则间接调用 Selector.selectNow()来获取以后已筹备好的 IO 事件数量,并间接返回
// 如果 hasTask 为 false,代表 taskQueue 或 tailQueue 里没有工作,则返回 SelectStrategy.SELECT(值为 -1)
strategy = selectStrategy.calculateStrategy(selectNowSupplier, hasTasks());
switch (strategy) {
case SelectStrategy.CONTINUE:
continue;
case SelectStrategy.BUSY_WAIT:
case SelectStrategy.SELECT:
// 查看 scheduledTaskQueue 里的定时工作,如果定时工作不为空,将定时工作的 deadlineNanos 过期工夫赋值给 curDeadlineNanos
long curDeadlineNanos = nextScheduledTaskDeadlineNanos();
if (curDeadlineNanos == -1L) {curDeadlineNanos = NONE;}
nextWakeupNanos.set(curDeadlineNanos);
try {
// 如果没有工作,则进一步判断
// 如果 curDeadlineNanos 为 NONE,则调用 Selector.select()进行阻塞,直到有 IO 事件筹备好
// 如果 curDeadlineNanos 不为 NONE,则调用 selector.select(timeoutMillis)进行超时阻塞
if (!hasTasks()) {strategy = select(curDeadlineNanos);
}
} finally {nextWakeupNanos.lazySet(AWAKE);
}
default:
}
} catch (IOException e) {// 这里是 selector.select()的一个 bug,即在某种状况下,在没有 IO 事件筹备好时,select()也没有进行阻塞,此时须要重建 Selector
// 后续会进行具体介绍
rebuildSelector0();
selectCnt = 0;
handleLoopException(e);
continue;
}
// select 的次数
selectCnt++;
cancelledKeys = 0;
needsToSelectAgain = false;
// ioRatio 示意 processSelectedKeys 办法(解决 IO 事件)和 runAllTasks()办法所用的事件占比
// 如果 ioRatio 为 50,则工夫比为 1:1,如果为 60,则工夫比为 3:2
final int ioRatio = this.ioRatio;
boolean ranTasks;
if (ioRatio == 100) { // 如果 ioRatio 为 100
try {if (strategy > 0) {
// 解决筹备好的 IO 事件
processSelectedKeys();}
} finally {
// 执行 taskQueue 里的所有工作
ranTasks = runAllTasks();}
} else if (strategy > 0) {final long ioStartTime = System.nanoTime();
try {
// 解决筹备好的 IO 事件
processSelectedKeys();} finally {final long ioTime = System.nanoTime() - ioStartTime;
// 限时执行,工夫到了之后须要先返回,所以可能只能执行 taskQueue 里的局部工作
ranTasks = runAllTasks(ioTime * (100 - ioRatio) / ioRatio);
}
} else {
// 只执行一个工作
ranTasks = runAllTasks(0);
}
// 如果至多有一个工作执行胜利,runTasks 则为 true,则重置 selectCnt 为 0
if (ranTasks || strategy > 0) {if (selectCnt > MIN_PREMATURE_SELECTOR_RETURNS && logger.isDebugEnabled()) {logger.debug("Selector.select() returned prematurely {} times in a row for Selector {}.",
selectCnt - 1, selector);
}
selectCnt = 0;
} else if (unexpectedSelectorWakeup(selectCnt)) { // 如果未预料到的 Selector 被唤醒,阐明可能是 bug 呈现了,重建 Selector 并重置 selectCnt
selectCnt = 0;
}
} catch (CancelledKeyException e) {if (logger.isDebugEnabled()) {logger.debug(CancelledKeyException.class.getSimpleName() + "raised by a Selector {} - JDK bug?",
selector, e);
}
} catch (Error e) {throw e;} catch (Throwable t) {handleLoopException(t);
} finally {// 判断是否敞开,代码这里省略}
}
}
3.2.3 解决 IO 事件
下面曾经介绍了 NioEventLoop 的工作流程,那么在判断如果有筹备好的 IO 事件,那么会调用 processSelectedKeys 来解决这些 IO 事件
,接下来就来看下它的源码
private void processSelectedKeysOptimized() {for (int i = 0; i < selectedKeys.size; ++i) {
// 获取对应 IO 事件的 SelectionKey,以及它的附件
final SelectionKey k = selectedKeys.keys[i];
selectedKeys.keys[i] = null;
final Object a = k.attachment();
if (a instanceof AbstractNioChannel) {
// 真正来解决对应的 IO 事件,将 SelectionKey 以及对应的 NioChannel 传入
processSelectedKey(k, (AbstractNioChannel) a);
} else {NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a;
processSelectedKey(k, task);
}
if (needsToSelectAgain) {selectedKeys.reset(i + 1);
selectAgain();
i = -1;
}
}
}
这里来看下 processSelectedKey 是如何真正的来解决每个 IO 事件的
private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {final AbstractNioChannel.NioUnsafe unsafe = ch.unsafe();
// 判断 SelectionKey 是否无效,如果有效,间接敞开 channel 通道并返回
if (!k.isValid()) {
final EventLoop eventLoop;
try {eventLoop = ch.eventLoop();
} catch (Throwable ignored) {return;}
if (eventLoop == this) {
// close the channel if the key is not valid anymore
unsafe.close(unsafe.voidPromise());
}
return;
}
try {
// 获取以后 SelectionKey 的已筹备好的事件集
int readyOps = k.readyOps();
if ((readyOps & SelectionKey.OP_CONNECT) != 0) { // 如果有 OP_CONNECT 连贯事件
int ops = k.interestOps();
ops &= ~SelectionKey.OP_CONNECT;
k.interestOps(ops); // 设置 ops,示意该 SelectionKey 不再关注连贯事件
unsafe.finishConnect(); // 下一章节会具体介绍,外部就是开始执行 pipeline,去执行各个 ChannelHandler 对应的办法}
if ((readyOps & SelectionKey.OP_WRITE) != 0) { // 如果有 OP_WRITE 写事件
ch.unsafe().forceFlush(); // 下一章节会具体介绍,外部就是开始执行 pipeline,去执行各个 ChannelHandler 对应的办法
}
// 如果有 OP_READ 读事件或者 OP_ACCEPT 接管事件
if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {unsafe.read(); // 下一章节会具体介绍,外部就是开始执行 pipeline,去执行各个 ChannelHandler 对应的办法
}
} catch (CancelledKeyException ignored) {unsafe.close(unsafe.voidPromise());
}
}
3.2.4 空轮询问题
在执行 Selector.select()办法时,失常状况下如果没有筹备好的 Channel 时,线程会被阻塞。
而 空轮询是因为 Selector.select()没有正确工作,在没有筹备好的 Channel 时,就间接被唤醒,而没有进行阻塞。从而导致 run 办法始终在死循环,CPU 达到了 100%
。
那么在什么时候,代表该 bug 呈现了?
当 空轮询次数 selectCnt 大于等于 SELECTOR_AUTO_REBUILD_THRESHOLD(默认为 512)
时,示意该 bug 呈现,则进行 重建 Selector
,源码如下:
private boolean unexpectedSelectorWakeup(int selectCnt) {if (Thread.interrupted()) {if (logger.isDebugEnabled()) {logger.debug("Selector.select() returned prematurely because" +
"Thread.currentThread().interrupt() was called. Use" +
"NioEventLoop.shutdownGracefully() to shutdown the NioEventLoop.");
}
return true;
}
// 判断如果空轮询次数 selectCnt 大于 SELECTOR_AUTO_REBUILD_THRESHOLD 时
if (SELECTOR_AUTO_REBUILD_THRESHOLD > 0 &&
selectCnt >= SELECTOR_AUTO_REBUILD_THRESHOLD) {logger.warn("Selector.select() returned prematurely {} times in a row; rebuilding Selector {}.",
selectCnt, selector);
// 重建 Selector
rebuildSelector();
return true;
}
return false;
}
如何重建 Selector?
大抵步骤:
- 从新关上一个 Selector
- 将旧 Selector 的所有 Channel 和对应的附件,都注册到新的 Selector 上
- 敞开旧 Selector
private void rebuildSelector0() {
final Selector oldSelector = selector;
final SelectorTuple newSelectorTuple;
if (oldSelector == null) {return;}
// 关上一个新的 Selector
newSelectorTuple = openSelector();
int nChannels = 0;
for (SelectionKey key: oldSelector.keys()) {Object a = key.attachment();
try {if (!key.isValid() || key.channel().keyFor(newSelectorTuple.unwrappedSelector) != null) {continue;}
int interestOps = key.interestOps();
key.cancel();
// 将旧 Selector 上的 channel 都注册到新的 Selector
SelectionKey newKey = key.channel().register(newSelectorTuple.unwrappedSelector, interestOps, a);
if (a instanceof AbstractNioChannel) {
// Update SelectionKey
((AbstractNioChannel) a).selectionKey = newKey;
}
nChannels ++;
} catch (Exception e) {// 省略局部代码}
}
selector = newSelectorTuple.selector;
unwrappedSelector = newSelectorTuple.unwrappedSelector;
try {
// 敞开旧 Selector
oldSelector.close();} catch (Throwable t) {if (logger.isWarnEnabled()) {logger.warn("Failed to close the old Selector.", t);
}
}
}
四、总结
EventLoop 和 EventLoopGroup 是 netty 中最重要的组件之一,也是 netty 可能构建高性能程序的关键所在,理解其工作机制与原理是十分有必要的。