一、简介
在netty中,事件循环EventLoop
是一个很重要的组件,用于解决已注册Channel
的各种IO事件
,而EventLoopGroup对应了一个或多个EventLoop,能够看做EvenLoopGroup就是EventLoop的汇合
。上面是EventLoop和EventLoopGroup相干类图:
从下面类图能够看到,netty在jdk原生接口ScheduledExecutorService上衍生了EventExecutorGroup接口
,其通过next()办法
来获取EventExecutor事件执行器,并在ScheduledExecutorService的根底上增加了优雅敞开、是否正在敞开
等操作,如下图
而EventLoopGroup继承了EventExecutorGroup接口,重写next()办法
并增加注册Channel
的操作,如下图
EventLoop接口自身比较简单,继承于EventExecutor及EventLoopGroup接口,如下
最常应用的 NioEventLoopGroup 和 NioEventLoop,别离继承于抽象类 MultithreadEventLoopGroup 和 SingleThreadEventLoop
,而这两个抽象类自身实现不难,其次要是继承了 MultithreadEventExecutorGroup 和 SingleThreadEventExecutor
,所以上面来看下 MultithreadEventExecutorGroup 和 SingleThreadEventExecutor 的次要代码逻辑
二、MultithreadEventExecutorGroup和SingleThreadEventExecutor
2.1 MultithreadEventExecutorGroup
MultithreadEventExecutorGroup示意通过多个EventExecutor来解决所提交的工作
2.1.1 重要属性
有两个较为重要的属性children和chooser
,children对应EventExecutor数组,而chooser选取器的作用是从children选取EventExecutor来执行工作。如下
// 对应的EventExecutor数组private final EventExecutor[] children;// 选取器,作用是从children里选取EventExecutor来执行工作private final EventExecutorChooserFactory.EventExecutorChooser chooser;
2.1.2 初始化
MultithreadEventExecutorGroup的构造函数会对children和chooser进行初始化,大抵步骤如下:
- 依据传进来的nThreads线程数来初始化children数组
children = new EventExecutor[nThreads]
- 通过
newChild()办法
来实例化children的每个EventExecutor,newChild()为形象办法,须要子类(如NioEventLoopGroup)具体实现。如果没有胜利的实例化children数组,则逐个优雅敞开EventExecutor - 初始化chooser选取器
- 给children中的每个EventExecutor增加
terminationListener终止监听器
,每有一个EventExecutor终止了,就会将terminatedChildren加1,等到terminatedChildren==children总数时,阐明所有的EventExecutor曾经全副终止
源码如下:
protected MultithreadEventExecutorGroup(int nThreads, Executor executor, EventExecutorChooserFactory chooserFactory, Object... args) { checkPositive(nThreads, "nThreads"); if (executor == null) { // 如果传进的executor执行器为空,设置为ThreadPerTaskExecutor执行器,该执行器会独自创立一个线程来解决每个工作 executor = new ThreadPerTaskExecutor(newDefaultThreadFactory()); } // 依据传进来的nThreads线程数来实例化children children = new EventExecutor[nThreads]; for (int i = 0; i < nThreads; i ++) { boolean success = false; try { // newChild作用是生成具体的EventExecutor,其为形象办法,须要子类(如NioEventLoopGroup)去具体实现 children[i] = newChild(executor, args); success = true; } catch (Exception e) { throw new IllegalStateException("failed to create a child event loop", e); } finally { // 如果没有胜利的实例化children数组,则逐个优雅敞开EventExecutor if (!success) { for (int j = 0; j < i; j ++) { children[j].shutdownGracefully(); } // 期待终止所有的EventExecutor for (int j = 0; j < i; j ++) { EventExecutor e = children[j]; try { while (!e.isTerminated()) { e.awaitTermination(Integer.MAX_VALUE, TimeUnit.SECONDS); } } catch (InterruptedException interrupted) { Thread.currentThread().interrupt(); break; } } } } } // 初始化chooser选取器 chooser = chooserFactory.newChooser(children); // 给children中的每个EventExecutor增加终止监听器 // 每有一个EventExecutor终止了,就会将terminatedChildren加1 // 等到terminatedChildren==children总数时,阐明所有的EventExecutor曾经全副终止 final FutureListener<Object> terminationListener = new FutureListener<Object>() { @Override public void operationComplete(Future<Object> future) throws Exception { if (terminatedChildren.incrementAndGet() == children.length) { terminationFuture.setSuccess(null); } } }; for (EventExecutor e: children) { e.terminationFuture().addListener(terminationListener); }}
2.1.3 提交工作
MultithreadEventExecutorGroup提交工作的大抵流程如下图:
提交工作时,MultithreadEventExecutorGroup是间接应用父类AbstractEventExecutorGroup的submit办法来提交,而该submit办法中是通过调用next()办法来选取到某个EventExecutor,再调用EventExecutor的submit()办法来提交的
,如下
@Overridepublic Future<?> submit(Runnable task) { return next().submit(task);}
而next()办法则是通过chooser选取器来选取到某个EventExecutor的,如下
@Overridepublic EventExecutor next() { return chooser.next();}
2.2 SingleThreadEventExecutor
从下面咱们能够得悉MultithreadEventExecutorGroup提交工作时,本质上是选取到某个EventExecutor,再由该EventExecutor来进行提交
。
因为咱们罕用的NioEventLoop的大多数操作其实是由SingleThreadEventExecutor提供了默认实现
(当然NioEventLoop也有其具体的一些操作,后续会详解),所以在深刻NioEventLoop之前,有必要先理解一下SingleThreadEventExecutor
2.2.1 重要属性
SingleThreadEventExecutor中有一个寄存工作的taskQueue工作队列
,还有一个与之绑定的thread线程
,还有一些优雅敞开相干属性,如下
// 寄存工作的队列private final Queue<Runnable> taskQueue;// 与该SingleThreadEventExecutor绑定的threadprivate volatile Thread thread;// 执行器,首次启动时通过该执行器来启动线程,再由该线程来生产taskQueue的工作private final Executor executor;// 该属性很重要,示意addTask增加工作时,是否主动唤醒线程,如果不能主动唤醒,须要被动调用wakeup办法来唤醒// 如:DefaultEventExecutor的addTaskWakesUp为true,而NioEventLoop为falseprivate final boolean addTaskWakesUp;// 队列的最大容量private final int maxPendingTasks;// 优雅敞开的静默工夫private volatile long gracefulShutdownQuietPeriod;// 优雅敞开的超时工夫private volatile long gracefulShutdownTimeout;// 优雅敞开的开始工夫private long gracefulShutdownStartTime;
2.2.2 状态治理
SingleThreadEventExecutor总共有5种状态,如下
- ST_NOT_STARTED (未启动)
- ST_STARTED (启动)
- ST_SHUTTING_DOWN (敞开中)
- ST_SHUTDOWN (已敞开)
- ST_TERMINATED (已终止)
初始状态为ST_NOT_STARTED未启动
,如下
private static final int ST_NOT_STARTED = 1;private static final int ST_STARTED = 2;private static final int ST_SHUTTING_DOWN = 3;private static final int ST_SHUTDOWN = 4;private static final int ST_TERMINATED = 5;private volatile int state = ST_NOT_STARTED;
状态转换:ST_NOT_STARTED --> ST_STARTED --> ST_SHUTTING_DOWN --> ST_SHUTDOWN --> ST_TERMINATED
2.2.3 提交工作
SingleThreadEventExecutor提交工作的流程图如下:
SingleThreadEventExecutor在首次提交工作
时,会将state设置为已启动,启动工作线程,并将该工作线程与thread属性进行绑定
,后续再次提交工作时,只会将工作增加到taskQueue工作队列中。源码如下
private void execute(Runnable task, boolean immediate) { // 判断以后线程与该SingleThreadEventExecutor绑定的线程是否是同一个 boolean inEventLoop = inEventLoop(); // 增加工作至taskQueue工作队列 addTask(task); if (!inEventLoop) { // 如果state为未启动,则将state更新为已启动,启动工作线程,并将工作线程与该SingleThreadEventExecutor绑定 startThread(); // 如果state为已敞开,则回绝增加工作 if (isShutdown()) { boolean reject = false; try { if (removeTask(task)) { reject = true; } } catch (UnsupportedOperationException e) { } if (reject) { reject(); } } } if (!addTaskWakesUp && immediate) { wakeup(inEventLoop); }}
private void startThread() { // 如果state为未启动 if (state == ST_NOT_STARTED) { // 将state更新为已启动 if (STATE_UPDATER.compareAndSet(this, ST_NOT_STARTED, ST_STARTED)) { boolean success = false; try { // 启动工作线程 doStartThread(); success = true; } finally { if (!success) { STATE_UPDATER.compareAndSet(this, ST_STARTED, ST_NOT_STARTED); } } } }}
doStartThread局部代码如下:
private void doStartThread() { assert thread == null; executor.execute(new Runnable() { @Override public void run() { // 绑定工作线程 thread = Thread.currentThread(); if (interrupted) { thread.interrupt(); } boolean success = false; updateLastExecutionTime(); try { // run办法,从taskQueue中获取工作来执行,由子类去具体实现 SingleThreadEventExecutor.this.run(); success = true; } catch (Throwable t) { logger.warn("Unexpected exception from an event executor: ", t); }
2.2.4 优雅敞开
下面曾经介绍了有3个属性是跟优雅敞开
相干的,有gracefulShutdownQuietPeriod静默工夫、gracefulShutdownTimeout超时工夫、gracefulShutdownStartTime开始工夫
gracefulShutdownQuietPeriod:如果以后工夫-上一次执行工夫 < 静默工夫,那么临时先不敞开,否则进行敞开
gracefulShutdownTimeout:如果以后工夫-优雅敞开的开始工夫 > 超时工夫,那么进行敞开
gracefulShutdownStartTime:优雅敞开的开始工夫
接下来看下优雅敞开shutdownGracefully办法,该办法会将state状态设置为敞开中,并间接返回terminationFuture,源码如下:
public Future<?> shutdownGracefully(long quietPeriod, long timeout, TimeUnit unit) { ObjectUtil.checkPositiveOrZero(quietPeriod, "quietPeriod"); if (timeout < quietPeriod) { throw new IllegalArgumentException( "timeout: " + timeout + " (expected >= quietPeriod (" + quietPeriod + "))"); } ObjectUtil.checkNotNull(unit, "unit"); // 如果state >= 敞开中,间接返回terminationFuture if (isShuttingDown()) { return terminationFuture(); } boolean inEventLoop = inEventLoop(); boolean wakeup; int oldState; for (;;) { // 再次判断,如果state >= 敞开中,间接返回terminationFuture if (isShuttingDown()) { return terminationFuture(); } int newState; wakeup = true; // 是否须要唤醒 oldState = state; if (inEventLoop) { newState = ST_SHUTTING_DOWN; } else { switch (oldState) { case ST_NOT_STARTED: case ST_STARTED: // 如果旧状态为已启动,则设置新状态为敞开中 newState = ST_SHUTTING_DOWN; break; default: // 如果旧状态 >= 已启动,那么将wakeup设置为false,不唤醒工作线程 newState = oldState; wakeup = false; } } // 通过CAS操作来更新状态 if (STATE_UPDATER.compareAndSet(this, oldState, newState)) { break; } } gracefulShutdownQuietPeriod = unit.toNanos(quietPeriod); gracefulShutdownTimeout = unit.toNanos(timeout); if (ensureThreadStarted(oldState)) { return terminationFuture; } // 如果须要唤醒,则将WAKEUP_TASK放到队列中,来唤醒工作线程 if (wakeup) { taskQueue.offer(WAKEUP_TASK); if (!addTaskWakesUp) { wakeup(inEventLoop); } } return terminationFuture();}
state状态更新为敞开中后,工作线程从taskQueue队列中每次拿到工作后,将会对state状态进行判断,如果是敞开中,会进一步判断是否确认敞开,如果确认敞开,则会跳出run办法,工作线程执行完结,最终该SingleThreadEventExecutor的state状态更新为已终止
。这里咱们能够通过查看SingleThreadEventExecutor的默认实现DefaultEventExecutor的run办法
,源码如下:
protected void run() { for (;;) { // 从taskQueue中获取工作,如果是WAKEUP_TASK,则拿到的task为null Runnable task = takeTask(); if (task != null) { // 执行工作 runTask(task); // 更新上一次执行工夫 updateLastExecutionTime(); } // 确认是否敞开,如果是,跳出死循环 if (confirmShutdown()) { break; } }}
confirmShutdown的源码如下:
protected boolean confirmShutdown() { // 如果state < 敞开中,间接返回false if (!isShuttingDown()) { return false; } // 勾销定时工作 cancelScheduledTasks(); if (gracefulShutdownStartTime == 0) { // 设置优雅敞开的开始工夫为以后工夫 gracefulShutdownStartTime = ScheduledFutureTask.nanoTime(); } // 如果taskQueue里还有工作,运行所有工作,否则运行敞开钩子 if (runAllTasks() || runShutdownHooks()) { if (isShutdown()) { return true; } // 如果静默期为0,返回true确认敞开 if (gracefulShutdownQuietPeriod == 0) { return true; } taskQueue.offer(WAKEUP_TASK); return false; } final long nanoTime = ScheduledFutureTask.nanoTime(); // 如果state >= 已敞开,返回true确认敞开 // 否则判断以后工夫-优雅敞开的开始工夫 是否大于 超时工夫,如果大于,返回true确认敞开 if (isShutdown() || nanoTime - gracefulShutdownStartTime > gracefulShutdownTimeout) { return true; } // 如果以后工夫 - 上一次工作执行工夫 小于等于 静默工夫,阐明在这一段时间内还有工作执行,则线程休眠100毫秒,返回false暂不敞开 if (nanoTime - lastExecutionTime <= gracefulShutdownQuietPeriod) { // 休眠100毫秒后,用于持续唤醒工作线程 taskQueue.offer(WAKEUP_TASK); try { Thread.sleep(100); } catch (InterruptedException e) { } return false; } return true;}
当confirmShutdown返回true跳出run办法后
- 工作线程开始收尾工作
- 因为此时可能又有一些工作被增加到taskQueue外面,所以须要再次调用confirmShutdown办法
- 将
state状态更新为已敞开
(这时曾经不能再接管新的工作了) - 再次调用confirmShutdown办法(理由同上,如果是这样的话,第一次的confirmShutdown调用是不是没有必要?)。
- 最终调用cleanup()钩子办法,并
将state状态更新为已终止
,设置terminationFuture后果为胜利
三、NioEventLoopGroup和NioEventLoop
3.1 NioEventLoopGroup
NioEventLoopGroup
能够说是netty中咱们最相熟的类之一,继承于MultithreadEventLoopGroup,而MultithreadEventLoopGroup继承于MultithreadEventExecutorGroup(下面曾经剖析过该类),如下图
因为NioEventLoopGroup自身实现比较简单,所以这里咱们只看下它的newChild()办法
,这个办法之前在介绍MultithreadEventExecutorGroup也有提及,该办法是一个形象办法,须要子类具体实现,源码如下:
@Overrideprotected EventLoop newChild(Executor executor, Object... args) throws Exception { // SelectorProvider有关上选择器openSelector、关上服务端通道openServerSocketChannel等办法 SelectorProvider selectorProvider = (SelectorProvider) args[0]; // select策略工厂,用于产生SelectStrategy SelectStrategyFactory selectStrategyFactory = (SelectStrategyFactory) args[1]; // 拒绝执行处理器 RejectedExecutionHandler rejectedExecutionHandler = (RejectedExecutionHandler) args[2]; // taskQueue工厂,taskQueue之前已有提及 EventLoopTaskQueueFactory taskQueueFactory = null; // tailTaskQueue工厂 EventLoopTaskQueueFactory tailTaskQueueFactory = null; int argsLength = args.length; if (argsLength > 3) { taskQueueFactory = (EventLoopTaskQueueFactory) args[3]; } if (argsLength > 4) { tailTaskQueueFactory = (EventLoopTaskQueueFactory) args[4]; } // new 一个NioEventLoop实例 return new NioEventLoop(this, executor, selectorProvider, selectStrategyFactory.newSelectStrategy(), rejectedExecutionHandler, taskQueueFactory, tailTaskQueueFactory);}
能够看到,newChild办法的最初就是new一个NioEventLoop实例
,所以最初咱们须要来看下NioEventLoop中的源码,看看它到底是如何来运作的?
3.2 NioEventLoop
学习过NIO的同学都晓得,多个Channel能够注册到一个Selector,这样咱们就能够在单线程中通过一个Selector来治理多个Channel,这就是IO多路复用
,而NioEventLoop就是IO多路复用的一个具体实现
。
在最开始的类图咱们能够看到 NioEventLoop 和 EpollEventLoop
都继承于SingleThreadEventLoop,因为NioEventLoop更为常常应用,所以这里只介绍NioEventLoop
。NioEventLoop继承于SingleThreadEventLoop,而SingleThreadEventLoop继承于SingleThreadEventExecutor,如下图
SinleThreadEventLoop 在 SingleThreadEventExecutor 的根底上,增加了一个tailTasks工作队列(runAllTasks办法中执行完taskQueue中的工作后,会再执行tailTasks中的工作
)
3.2.1 构造函数
NioEventLoop的构造函数,接管多个参数,有executor执行器、select策略、拒绝执行处理器、taskQueue工厂、tailTaskQueue工厂,如下
NioEventLoop(NioEventLoopGroup parent, Executor executor, SelectorProvider selectorProvider, SelectStrategy strategy, RejectedExecutionHandler rejectedExecutionHandler, EventLoopTaskQueueFactory taskQueueFactory, EventLoopTaskQueueFactory tailTaskQueueFactory) { super(parent, executor, false, newTaskQueue(taskQueueFactory), newTaskQueue(tailTaskQueueFactory), rejectedExecutionHandler); this.provider = ObjectUtil.checkNotNull(selectorProvider, "selectorProvider"); // select策略 this.selectStrategy = ObjectUtil.checkNotNull(strategy, "selectStrategy"); // 创立SelectorTuple, final SelectorTuple selectorTuple = openSelector(); // 包装后的Selector,类型为SelectedSelectionKeySetSelector,蕴含了SelectedSelectionKeySet this.selector = selectorTuple.selector; // 未包装的Selector this.unwrappedSelector = selectorTuple.unwrappedSelector;}
3.2.2 工作流程
在之前SingleThreadEventExecutor的介绍中,咱们曾经晓得,它会在启动工作线程后,调用run办法,而run办法由子类具体实现
,所以NioEventLoop的run办法就是其工作流程,大抵的工作如下:
- 判断
taskQueue和tailTaskQueue
是否有工作,如果没有工作,则通过调用Selector.select办法来阻塞或超时阻塞获取IO事件
- 如果有工作,调用Selector.selectNow办法非阻塞获取IO事件
- 判断是否有IO事件筹备好,
如果有,先解决IO事件
- 再解决taskQueue和tailTaskQueue中的工作
- 返回第一步,
有限循环
流程图如下:
联合源码来看,源码如下:
protected void run() { int selectCnt = 0; // 死循环 for (;;) { try { int strategy; try { // 计算strategy的值 // 如果hasTasks为true,代表taskQueue或tailQueue里有工作,则间接调用Selector.selectNow()来获取以后已筹备好的IO事件数量,并间接返回 // 如果hasTask为false,代表taskQueue或tailQueue里没有工作,则返回SelectStrategy.SELECT(值为-1) strategy = selectStrategy.calculateStrategy(selectNowSupplier, hasTasks()); switch (strategy) { case SelectStrategy.CONTINUE: continue; case SelectStrategy.BUSY_WAIT: case SelectStrategy.SELECT: // 查看scheduledTaskQueue里的定时工作,如果定时工作不为空,将定时工作的deadlineNanos过期工夫赋值给curDeadlineNanos long curDeadlineNanos = nextScheduledTaskDeadlineNanos(); if (curDeadlineNanos == -1L) { curDeadlineNanos = NONE; } nextWakeupNanos.set(curDeadlineNanos); try { // 如果没有工作,则进一步判断 // 如果curDeadlineNanos为NONE,则调用Selector.select()进行阻塞,直到有IO事件筹备好 // 如果curDeadlineNanos不为NONE,则调用selector.select(timeoutMillis)进行超时阻塞 if (!hasTasks()) { strategy = select(curDeadlineNanos); } } finally { nextWakeupNanos.lazySet(AWAKE); } default: } } catch (IOException e) { // 这里是selector.select()的一个bug,即在某种状况下,在没有IO事件筹备好时,select()也没有进行阻塞,此时须要重建Selector // 后续会进行具体介绍 rebuildSelector0(); selectCnt = 0; handleLoopException(e); continue; } // select的次数 selectCnt++; cancelledKeys = 0; needsToSelectAgain = false; // ioRatio示意processSelectedKeys办法(解决IO事件)和runAllTasks()办法所用的事件占比 // 如果ioRatio为50,则工夫比为1:1,如果为60,则工夫比为3:2 final int ioRatio = this.ioRatio; boolean ranTasks; if (ioRatio == 100) { // 如果ioRatio为100 try { if (strategy > 0) { // 解决筹备好的IO事件 processSelectedKeys(); } } finally { // 执行taskQueue里的所有工作 ranTasks = runAllTasks(); } } else if (strategy > 0) { final long ioStartTime = System.nanoTime(); try { // 解决筹备好的IO事件 processSelectedKeys(); } finally { final long ioTime = System.nanoTime() - ioStartTime; // 限时执行,工夫到了之后须要先返回,所以可能只能执行taskQueue里的局部工作 ranTasks = runAllTasks(ioTime * (100 - ioRatio) / ioRatio); } } else { // 只执行一个工作 ranTasks = runAllTasks(0); } // 如果至多有一个工作执行胜利,runTasks则为true,则重置selectCnt为0 if (ranTasks || strategy > 0) { if (selectCnt > MIN_PREMATURE_SELECTOR_RETURNS && logger.isDebugEnabled()) { logger.debug("Selector.select() returned prematurely {} times in a row for Selector {}.", selectCnt - 1, selector); } selectCnt = 0; } else if (unexpectedSelectorWakeup(selectCnt)) { // 如果未预料到的Selector被唤醒,阐明可能是bug呈现了,重建Selector并重置selectCnt selectCnt = 0; } } catch (CancelledKeyException e) { if (logger.isDebugEnabled()) { logger.debug(CancelledKeyException.class.getSimpleName() + " raised by a Selector {} - JDK bug?", selector, e); } } catch (Error e) { throw e; } catch (Throwable t) { handleLoopException(t); } finally { // 判断是否敞开,代码这里省略 } }}
3.2.3 解决IO事件
下面曾经介绍了NioEventLoop的工作流程,那么在判断如果有筹备好的IO事件,那么会调用processSelectedKeys来解决这些IO事件
,接下来就来看下它的源码
private void processSelectedKeysOptimized() { for (int i = 0; i < selectedKeys.size; ++i) { // 获取对应IO事件的SelectionKey,以及它的附件 final SelectionKey k = selectedKeys.keys[i]; selectedKeys.keys[i] = null; final Object a = k.attachment(); if (a instanceof AbstractNioChannel) { // 真正来解决对应的IO事件,将SelectionKey以及对应的NioChannel传入 processSelectedKey(k, (AbstractNioChannel) a); } else { NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a; processSelectedKey(k, task); } if (needsToSelectAgain) { selectedKeys.reset(i + 1); selectAgain(); i = -1; } }}
这里来看下processSelectedKey是如何真正的来解决每个IO事件的
private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) { final AbstractNioChannel.NioUnsafe unsafe = ch.unsafe(); // 判断SelectionKey是否无效,如果有效,间接敞开channel通道并返回 if (!k.isValid()) { final EventLoop eventLoop; try { eventLoop = ch.eventLoop(); } catch (Throwable ignored) { return; } if (eventLoop == this) { // close the channel if the key is not valid anymore unsafe.close(unsafe.voidPromise()); } return; } try { // 获取以后SelectionKey的已筹备好的事件集 int readyOps = k.readyOps(); if ((readyOps & SelectionKey.OP_CONNECT) != 0) { //如果有OP_CONNECT连贯事件 int ops = k.interestOps(); ops &= ~SelectionKey.OP_CONNECT; k.interestOps(ops); // 设置ops,示意该SelectionKey不再关注连贯事件 unsafe.finishConnect(); // 下一章节会具体介绍,外部就是开始执行pipeline,去执行各个ChannelHandler对应的办法 } if ((readyOps & SelectionKey.OP_WRITE) != 0) { //如果有OP_WRITE写事件 ch.unsafe().forceFlush(); // 下一章节会具体介绍,外部就是开始执行pipeline,去执行各个ChannelHandler对应的办法 } // 如果有OP_READ读事件或者OP_ACCEPT接管事件 if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) { unsafe.read(); // 下一章节会具体介绍,外部就是开始执行pipeline,去执行各个ChannelHandler对应的办法 } } catch (CancelledKeyException ignored) { unsafe.close(unsafe.voidPromise()); }}
3.2.4 空轮询问题
在执行Selector.select()办法时,失常状况下如果没有筹备好的Channel时,线程会被阻塞 。
而空轮询是因为Selector.select()没有正确工作,在没有筹备好的Channel时,就间接被唤醒,而没有进行阻塞。从而导致run办法始终在死循环,CPU达到了100%
。
那么在什么时候,代表该bug呈现了?
当空轮询次数selectCnt大于等于SELECTOR_AUTO_REBUILD_THRESHOLD(默认为512)
时,示意该bug呈现,则进行重建Selector
,源码如下:
private boolean unexpectedSelectorWakeup(int selectCnt) { if (Thread.interrupted()) { if (logger.isDebugEnabled()) { logger.debug("Selector.select() returned prematurely because " + "Thread.currentThread().interrupt() was called. Use " + "NioEventLoop.shutdownGracefully() to shutdown the NioEventLoop."); } return true; } // 判断如果空轮询次数selectCnt大于SELECTOR_AUTO_REBUILD_THRESHOLD时 if (SELECTOR_AUTO_REBUILD_THRESHOLD > 0 && selectCnt >= SELECTOR_AUTO_REBUILD_THRESHOLD) { logger.warn("Selector.select() returned prematurely {} times in a row; rebuilding Selector {}.", selectCnt, selector); // 重建Selector rebuildSelector(); return true; } return false;}
如何重建Selector?
大抵步骤:
- 从新关上一个Selector
- 将旧Selector的所有Channel和对应的附件,都注册到新的Selector上
- 敞开旧Selector
private void rebuildSelector0() { final Selector oldSelector = selector; final SelectorTuple newSelectorTuple; if (oldSelector == null) { return; } // 关上一个新的Selector newSelectorTuple = openSelector(); int nChannels = 0; for (SelectionKey key: oldSelector.keys()) { Object a = key.attachment(); try { if (!key.isValid() || key.channel().keyFor(newSelectorTuple.unwrappedSelector) != null) { continue; } int interestOps = key.interestOps(); key.cancel(); // 将旧Selector上的channel都注册到新的Selector SelectionKey newKey = key.channel().register(newSelectorTuple.unwrappedSelector, interestOps, a); if (a instanceof AbstractNioChannel) { // Update SelectionKey ((AbstractNioChannel) a).selectionKey = newKey; } nChannels ++; } catch (Exception e) { // 省略局部代码 } } selector = newSelectorTuple.selector; unwrappedSelector = newSelectorTuple.unwrappedSelector; try { // 敞开旧Selector oldSelector.close(); } catch (Throwable t) { if (logger.isWarnEnabled()) { logger.warn("Failed to close the old Selector.", t); } }}
四、总结
EventLoop和EventLoopGroup是netty中最重要的组件之一,也是netty可能构建高性能程序的关键所在,理解其工作机制与原理是十分有必要的。