上一篇文章,剖析了Netty服务端启动的初始化过程,明天咱们来剖析一下Netty中的Reactor线程模型
在剖析源码之前,咱们先剖析,哪些地方用到了EventLoop?
- NioServerSocketChannel的连贯监听注册
- NioSocketChannel的IO事件注册
NioServerSocketChannel连贯监听
在AbstractBootstrap类的initAndRegister()办法中,当NioServerSocketChannel初始化实现后,会调用case
标记地位的代码进行注册。
final ChannelFuture initAndRegister() { Channel channel = null; try { channel = channelFactory.newChannel(); init(channel); } catch (Throwable t) { } //注册到boss线程的selector上。 ChannelFuture regFuture = config().group().register(channel); if (regFuture.cause() != null) { if (channel.isRegistered()) { channel.close(); } else { channel.unsafe().closeForcibly(); } } return regFuture;}
AbstractNioChannel.doRegister
依照代码的执行逻辑,最终会执行到AbstractNioChannel的doRegister()
办法中。
@Overrideprotected void doRegister() throws Exception { boolean selected = false; for (;;) { try { //调用ServerSocketChannel的register办法,把以后服务端对象注册到boss线程的selector上 selectionKey = javaChannel().register(eventLoop().unwrappedSelector(), 0, this); return; } catch (CancelledKeyException e) { if (!selected) { // Force the Selector to select now as the "canceled" SelectionKey may still be // cached and not removed because no Select.select(..) operation was called yet. eventLoop().selectNow(); selected = true; } else { // We forced a select operation on the selector before but the SelectionKey is still cached // for whatever reason. JDK bug ? throw e; } } }}
NioEventLoop的启动过程
NioEventLoop是一个线程,它的启动过程如下。
在AbstractBootstrap的doBind0办法中,获取了NioServerSocketChannel中的NioEventLoop,而后应用它来执行绑定端口的工作。
private static void doBind0( final ChannelFuture regFuture, final Channel channel, final SocketAddress localAddress, final ChannelPromise promise) { //启动 channel.eventLoop().execute(new Runnable() { @Override public void run() { if (regFuture.isSuccess()) { channel.bind(localAddress, promise).addListener(ChannelFutureListener.CLOSE_ON_FAILURE); } else { promise.setFailure(regFuture.cause()); } } });}
SingleThreadEventExecutor.execute
而后一路执行到SingleThreadEventExecutor.execute办法中,调用startThread()
办法启动线程。
private void execute(Runnable task, boolean immediate) { boolean inEventLoop = inEventLoop(); addTask(task); if (!inEventLoop) { startThread(); //启动线程 if (isShutdown()) { boolean reject = false; try { if (removeTask(task)) { reject = true; } } catch (UnsupportedOperationException e) { // The task queue does not support removal so the best thing we can do is to just move on and // hope we will be able to pick-up the task before its completely terminated. // In worst case we will log on termination. } if (reject) { reject(); } } } if (!addTaskWakesUp && immediate) { wakeup(inEventLoop); }}
startThread
private void startThread() { if (state == ST_NOT_STARTED) { if (STATE_UPDATER.compareAndSet(this, ST_NOT_STARTED, ST_STARTED)) { boolean success = false; try { doStartThread(); //执行启动过程 success = true; } finally { if (!success) { STATE_UPDATER.compareAndSet(this, ST_STARTED, ST_NOT_STARTED); } } } }}
接着调用doStartThread()办法,通过executor.execute
执行一个工作,在该工作中启动了NioEventLoop线程
private void doStartThread() { assert thread == null; executor.execute(new Runnable() { //通过线程池执行一个工作 @Override public void run() { thread = Thread.currentThread(); if (interrupted) { thread.interrupt(); } boolean success = false; updateLastExecutionTime(); try { SingleThreadEventExecutor.this.run(); //调用boss的NioEventLoop的run办法,开启轮询 } //省略.... } });}
NioEventLoop的轮询过程
当NioEventLoop线程被启动后,就间接进入到NioEventLoop的run办法中。
protected void run() { int selectCnt = 0; for (;;) { try { int strategy; try { strategy = selectStrategy.calculateStrategy(selectNowSupplier, hasTasks()); switch (strategy) { case SelectStrategy.CONTINUE: continue; case SelectStrategy.BUSY_WAIT: case SelectStrategy.SELECT: long curDeadlineNanos = nextScheduledTaskDeadlineNanos(); if (curDeadlineNanos == -1L) { curDeadlineNanos = NONE; // nothing on the calendar } nextWakeupNanos.set(curDeadlineNanos); try { if (!hasTasks()) { strategy = select(curDeadlineNanos); } } finally { // This update is just to help block unnecessary selector wakeups // so use of lazySet is ok (no race condition) nextWakeupNanos.lazySet(AWAKE); } // fall through default: } } catch (IOException e) { // If we receive an IOException here its because the Selector is messed up. Let's rebuild // the selector and retry. https://github.com/netty/netty/issues/8566 rebuildSelector0(); selectCnt = 0; handleLoopException(e); continue; } selectCnt++; cancelledKeys = 0; needsToSelectAgain = false; final int ioRatio = this.ioRatio; boolean ranTasks; if (ioRatio == 100) { try { if (strategy > 0) { processSelectedKeys(); } } finally { // Ensure we always run tasks. ranTasks = runAllTasks(); } } else if (strategy > 0) { final long ioStartTime = System.nanoTime(); try { processSelectedKeys(); } finally { // Ensure we always run tasks. final long ioTime = System.nanoTime() - ioStartTime; ranTasks = runAllTasks(ioTime * (100 - ioRatio) / ioRatio); } } else { ranTasks = runAllTasks(0); // This will run the minimum number of tasks } if (ranTasks || strategy > 0) { if (selectCnt > MIN_PREMATURE_SELECTOR_RETURNS && logger.isDebugEnabled()) { logger.debug("Selector.select() returned prematurely {} times in a row for Selector {}.", selectCnt - 1, selector); } selectCnt = 0; } else if (unexpectedSelectorWakeup(selectCnt)) { // Unexpected wakeup (unusual case) selectCnt = 0; } } catch (CancelledKeyException e) { // Harmless exception - log anyway if (logger.isDebugEnabled()) { logger.debug(CancelledKeyException.class.getSimpleName() + " raised by a Selector {} - JDK bug?", selector, e); } } catch (Error e) { throw (Error) e; } catch (Throwable t) { handleLoopException(t); } finally { // Always handle shutdown even if the loop processing threw an exception. try { if (isShuttingDown()) { closeAll(); if (confirmShutdown()) { return; } } } catch (Error e) { throw (Error) e; } catch (Throwable t) { handleLoopException(t); } } }}
NioEventLoop的执行流程
NioEventLoop中的run办法是一个有限循环的线程,在该循环中次要做三件事件,如图9-1所示。
<center>图9-1</center>
- 轮询解决I/O事件(select),轮询Selector选择器中曾经注册的所有Channel的I/O就绪事件
- 解决I/O事件,如果存在曾经就绪的Channel的I/O事件,则调用
processSelectedKeys
进行解决 - 解决异步工作(runAllTasks),Reactor线程有一个十分重要的职责,就是解决工作队列中的非I/O工作,Netty提供了ioRadio参数用来调整I/O工夫和工作解决的工夫比例。
轮询I/O就绪事件
咱们先来看I/O工夫相干的代码片段:
- 通过
selectStrategy.calculateStrategy(selectNowSupplier, hasTasks())
获取以后的执行策略 - 依据不同的策略,用来管制每次轮询时的执行策略。
protected void run() { int selectCnt = 0; for (;;) { try { int strategy; try { strategy = selectStrategy.calculateStrategy(selectNowSupplier, hasTasks()); switch (strategy) { case SelectStrategy.CONTINUE: continue; case SelectStrategy.BUSY_WAIT: // fall-through to SELECT since the busy-wait is not supported with NIO case SelectStrategy.SELECT: long curDeadlineNanos = nextScheduledTaskDeadlineNanos(); if (curDeadlineNanos == -1L) { curDeadlineNanos = NONE; // nothing on the calendar } nextWakeupNanos.set(curDeadlineNanos); try { if (!hasTasks()) { strategy = select(curDeadlineNanos); } } finally { // This update is just to help block unnecessary selector wakeups // so use of lazySet is ok (no race condition) nextWakeupNanos.lazySet(AWAKE); } // fall through default: } } //省略.... } }}
selectStrategy解决逻辑
@Overridepublic int calculateStrategy(IntSupplier selectSupplier, boolean hasTasks) throws Exception { return hasTasks ? selectSupplier.get() : SelectStrategy.SELECT;}
如果hasTasks
为true,示意以后NioEventLoop线程存在异步工作的状况下,则调用selectSupplier.get()
,否则间接返回SELECT
。
其中selectSupplier.get()
的定义如下:
private final IntSupplier selectNowSupplier = new IntSupplier() { @Override public int get() throws Exception { return selectNow(); }};
该办法中调用的是selectNow()
办法,这个办法是Selector选择器中的提供的非阻塞办法,执行后会立即返回。
- 如果以后曾经有就绪的Channel,则会返回对应就绪Channel的数量
- 否则,返回0.
分支解决
在下面一个步骤中取得了strategy之后,会依据不同的后果进行分支解决。
- CONTINUE,示意须要重试。
- BUSY_WAIT,因为在NIO中并不反对BUSY_WAIT,所以BUSY_WAIT和SELECT的执行逻辑是一样的
- SELECT,示意须要通过select办法获取就绪的Channel列表,当NioEventLoop中不存在异步工作时,也就是工作队列为空,则返回该策略。
switch (strategy) { case SelectStrategy.CONTINUE: continue; case SelectStrategy.BUSY_WAIT: // fall-through to SELECT since the busy-wait is not supported with NIO case SelectStrategy.SELECT: long curDeadlineNanos = nextScheduledTaskDeadlineNanos(); if (curDeadlineNanos == -1L) { curDeadlineNanos = NONE; // nothing on the calendar } nextWakeupNanos.set(curDeadlineNanos); try { if (!hasTasks()) { strategy = select(curDeadlineNanos); } } finally { // This update is just to help block unnecessary selector wakeups // so use of lazySet is ok (no race condition) nextWakeupNanos.lazySet(AWAKE); } // fall through default:}
SelectStrategy.SELECT
当NioEventLoop线程中不存在异步工作时,则开始执行SELECT策略
//下一次定时工作触发截至工夫,默认不是定时工作,返回 -1Llong curDeadlineNanos = nextScheduledTaskDeadlineNanos();if (curDeadlineNanos == -1L) { curDeadlineNanos = NONE; // nothing on the calendar}nextWakeupNanos.set(curDeadlineNanos);try { if (!hasTasks()) { //2. taskQueue中工作执行完,开始执行select进行阻塞 strategy = select(curDeadlineNanos); }} finally { // This update is just to help block unnecessary selector wakeups // so use of lazySet is ok (no race condition) nextWakeupNanos.lazySet(AWAKE);}
select办法定义如下,默认状况下deadlineNanos=NONE
,所以会调用select()
办法阻塞。
private int select(long deadlineNanos) throws IOException { if (deadlineNanos == NONE) { return selector.select(); } //计算select()办法的阻塞超时工夫 long timeoutMillis = deadlineToDelayNanos(deadlineNanos + 995000L) / 1000000L; return timeoutMillis <= 0 ? selector.selectNow() : selector.select(timeoutMillis);}
最终返回就绪的channel个数,后续的逻辑中会依据返回的就绪channel个数来决定执行逻辑。
NioEventLoop.run中的业务解决
业务解决的逻辑相对来说比拟容易了解
- 如果有就绪的channel,则解决就绪channel的IO事件
- 解决实现后同步执行异步队列中的工作。
- 另外,这里为了解决Java NIO中的空转问题,通过selectCnt记录了空转次数,一次循环产生了空转(既没有IO须要解决、也没有执行任何工作),那么记录下来(selectCnt); ,如果间断产生空转(selectCnt达到肯定值),netty认为触发了NIO的BUG(unexpectedSelectorWakeup解决);
Java Nio中有一个bug,Java nio在Linux零碎下的epoll空轮询问题。也就是在select()
办法中,及时就绪的channel为0,也会从原本应该阻塞的操作中被唤醒,从而导致CPU 使用率达到100%。
@Overrideprotected void run() { int selectCnt = 0; for (;;) { //省略.... selectCnt++;//selectCnt记录的是无功而返的select次数,即eventLoop空转的次数,为解决NIO BUG cancelledKeys = 0; needsToSelectAgain = false; final int ioRatio = this.ioRatio; boolean ranTasks; if (ioRatio == 100) { //ioRadio执行工夫占比是100%,默认是50% try { if (strategy > 0) { //strategy>0示意存在就绪的SocketChannel processSelectedKeys(); //执行就绪SocketChannel的工作 } } finally { //留神,将ioRatio设置为100,并不代表工作不执行,反而是每次将工作队列执行完 ranTasks = runAllTasks(); //确保总是执行队列中的工作 } } else if (strategy > 0) { //strategy>0示意存在就绪的SocketChannel final long ioStartTime = System.nanoTime(); //io工夫解决开始工夫 try { processSelectedKeys(); //开始解决IO就绪事件 } finally { // io事件执行完结工夫 final long ioTime = System.nanoTime() - ioStartTime; //基于本次循环解决IO的工夫,ioRatio,计算出执行工作耗时的下限,也就是只容许解决多长时间异步工作 ranTasks = runAllTasks(ioTime * (100 - ioRatio) / ioRatio); } } else { //这个分支代表:strategy=0,ioRatio<100,此时工作限时=0,意为:尽量少地执行异步工作 //这个分支和strategy>0理论是一码事,代码简化了一下而已 ranTasks = runAllTasks(0); // This will run the minimum number of tasks } if (ranTasks || strategy > 0) { //ranTasks=true,或strategy>0,阐明eventLoop干活了,没有空转,清空selectCnt if (selectCnt > MIN_PREMATURE_SELECTOR_RETURNS && logger.isDebugEnabled()) { logger.debug("Selector.select() returned prematurely {} times in a row for Selector {}.", selectCnt - 1, selector); } selectCnt = 0; } //unexpectedSelectorWakeup解决NIO BUG else if (unexpectedSelectorWakeup(selectCnt)) { // Unexpected wakeup (unusual case) selectCnt = 0; } }}
processSelectedKeys
通过在select
办法中,咱们能够取得就绪的I/O事件数量,从而触发执行processSelectedKeys
办法。
private void processSelectedKeys() { if (selectedKeys != null) { processSelectedKeysOptimized(); } else { processSelectedKeysPlain(selector.selectedKeys()); }}
解决I/O事件时,有两个逻辑分支解决:
- 一种是解决Netty优化过的selectedKeys,
- 另一种是失常的解决逻辑
processSelectedKeys办法中依据是否设置了selectedKeys
来判断应用哪种策略,默认应用的是Netty优化过的selectedKeys,它返回的对象是SelectedSelectionKeySet
。
processSelectedKeysOptimized
private void processSelectedKeysOptimized() { for (int i = 0; i < selectedKeys.size; ++i) { //1. 取出IO事件以及对应的channel final SelectionKey k = selectedKeys.keys[i]; selectedKeys.keys[i] = null;//k的援用置null,便于gc回收,也示意该channel的事件处理实现防止反复解决 final Object a = k.attachment(); //获取保留在以后channel中的attachment,此时应该是NioServerSocketChannel //解决以后的channel if (a instanceof AbstractNioChannel) { //对于boss NioEventLoop,轮询到的根本是连贯事件,后续的事件就是通过他的pipeline将连贯扔给一个worker NioEventLoop解决 //对于worker NioEventLoop来说,轮循道的根本商是IO读写事件,后续的事件就是通过他的pipeline将读取到的字节流传递给每个channelHandler来解决 processSelectedKey(k, (AbstractNioChannel) a); } else { @SuppressWarnings("unchecked") NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a; processSelectedKey(k, task); } if (needsToSelectAgain) { // null out entries in the array to allow to have it GC'ed once the Channel close // See https://github.com/netty/netty/issues/2363 selectedKeys.reset(i + 1); selectAgain(); i = -1; } }}
processSelectedKey
private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) { final AbstractNioChannel.NioUnsafe unsafe = ch.unsafe(); if (!k.isValid()) { final EventLoop eventLoop; try { eventLoop = ch.eventLoop(); } catch (Throwable ignored) { } if (eventLoop == this) { // close the channel if the key is not valid anymore unsafe.close(unsafe.voidPromise()); } return; } try { int readyOps = k.readyOps(); //获取以后key所属的操作类型 if ((readyOps & SelectionKey.OP_CONNECT) != 0) {//如果是连贯类型 int ops = k.interestOps(); ops &= ~SelectionKey.OP_CONNECT; k.interestOps(ops); unsafe.finishConnect(); } if ((readyOps & SelectionKey.OP_WRITE) != 0) { //如果是写类型 ch.unsafe().forceFlush(); } //如果是读类型或者ACCEPT类型。则执行unsafe.read()办法,unsafe的实例对象为 NioMessageUnsafe if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) { unsafe.read(); } } catch (CancelledKeyException ignored) { unsafe.close(unsafe.voidPromise()); }}
NioMessageUnsafe.read()
假如此时是一个读操作,或者是客户端建设连贯,那么代码执行逻辑如下,
@Overridepublic void read() { assert eventLoop().inEventLoop(); final ChannelConfig config = config(); final ChannelPipeline pipeline = pipeline(); //如果是第一次建设连贯,此时的pipeline是ServerBootstrapAcceptor final RecvByteBufAllocator.Handle allocHandle = unsafe().recvBufAllocHandle(); allocHandle.reset(config); boolean closed = false; Throwable exception = null; try { try { do { int localRead = doReadMessages(readBuf); if (localRead == 0) { break; } if (localRead < 0) { closed = true; break; } allocHandle.incMessagesRead(localRead); } while (continueReading(allocHandle)); } catch (Throwable t) { exception = t; } int size = readBuf.size(); for (int i = 0; i < size; i ++) { readPending = false; pipeline.fireChannelRead(readBuf.get(i)); //调用pipeline中的channelRead办法 } readBuf.clear(); allocHandle.readComplete(); pipeline.fireChannelReadComplete(); if (exception != null) { closed = closeOnReadError(exception); pipeline.fireExceptionCaught(exception); //调用pipeline中的ExceptionCaught办法 } if (closed) { inputShutdown = true; if (isOpen()) { close(voidPromise()); } } } finally { if (!readPending && !config.isAutoRead()) { removeReadOp(); } }}
SelectedSelectionKeySet的优化
Netty中本人封装实现了一个SelectedSelectionKeySet,用来优化本来SelectorKeys的构造,它是怎么进行优化的呢?先来看它的代码定义
final class SelectedSelectionKeySet extends AbstractSet<SelectionKey> { SelectionKey[] keys; int size; SelectedSelectionKeySet() { keys = new SelectionKey[1024]; } @Override public boolean add(SelectionKey o) { if (o == null) { return false; } keys[size++] = o; if (size == keys.length) { increaseCapacity(); } return true; }}
SelectedSelectionKeySet外部应用的是SelectionKey数组,所有在processSelectedKeysOptimized办法中能够间接通过遍历数组来取出就绪的I/O事件。
而原来的Set<SelectionKey>
返回的是HashSet类型,两者相比,SelectionKey[]不须要思考哈希抵触的问题,所以能够实现O(1)工夫复杂度的add操作。
SelectedSelectionKeySet的初始化
netty通过反射的形式,把Selector对象外部的selectedKeys和publicSelectedKeys替换为SelectedSelectionKeySet。
本来的selectedKeys和publicSelectedKeys这两个字段都是HashSet类型,替换之后变成了SelectedSelectionKeySet。当有就绪的key时,会间接填充到SelectedSelectionKeySet的数组中。后续只须要遍历即可。
private SelectorTuple openSelector() { final Class<?> selectorImplClass = (Class<?>) maybeSelectorImplClass; final SelectedSelectionKeySet selectedKeySet = new SelectedSelectionKeySet(); //应用反射 Object maybeException = AccessController.doPrivileged(new PrivilegedAction<Object>() { @Override public Object run() { try { //Selector外部的selectedKeys字段 Field selectedKeysField = selectorImplClass.getDeclaredField("selectedKeys"); //Selector外部的publicSelectedKeys字段 Field publicSelectedKeysField = selectorImplClass.getDeclaredField("publicSelectedKeys"); if (PlatformDependent.javaVersion() >= 9 && PlatformDependent.hasUnsafe()) { //获取selectedKeysField字段偏移量 long selectedKeysFieldOffset = PlatformDependent.objectFieldOffset(selectedKeysField); //获取publicSelectedKeysField字段偏移量 long publicSelectedKeysFieldOffset = PlatformDependent.objectFieldOffset(publicSelectedKeysField); if (selectedKeysFieldOffset != -1 && publicSelectedKeysFieldOffset != -1) { //替换为selectedKeySet PlatformDependent.putObject( unwrappedSelector, selectedKeysFieldOffset, selectedKeySet); PlatformDependent.putObject( unwrappedSelector, publicSelectedKeysFieldOffset, selectedKeySet); return null; } // We could not retrieve the offset, lets try reflection as last-resort. } Throwable cause = ReflectionUtil.trySetAccessible(selectedKeysField, true); if (cause != null) { return cause; } cause = ReflectionUtil.trySetAccessible(publicSelectedKeysField, true); if (cause != null) { return cause; } selectedKeysField.set(unwrappedSelector, selectedKeySet); publicSelectedKeysField.set(unwrappedSelector, selectedKeySet); return null; } catch (NoSuchFieldException e) { return e; } catch (IllegalAccessException e) { return e; } } }); if (maybeException instanceof Exception) { selectedKeys = null; Exception e = (Exception) maybeException; logger.trace("failed to instrument a special java.util.Set into: {}", unwrappedSelector, e); return new SelectorTuple(unwrappedSelector); } selectedKeys = selectedKeySet;}
异步工作的执行流程
剖析完下面的流程后,咱们持续来看NioEventLoop中的run办法中,针对异步工作的解决流程
@Overrideprotected void run() { int selectCnt = 0; for (;;) { ranTasks = runAllTasks(); }}
runAllTask
须要留神,NioEventLoop能够反对定时工作的执行,通过nioEventLoop.schedule()
来实现。
protected boolean runAllTasks() { assert inEventLoop(); boolean fetchedAll; boolean ranAtLeastOne = false; do { fetchedAll = fetchFromScheduledTaskQueue(); //合并定时工作到一般工作队列 if (runAllTasksFrom(taskQueue)) { //循环执行taskQueue中的工作 ranAtLeastOne = true; } } while (!fetchedAll); if (ranAtLeastOne) { //如果工作全副执行实现,记录执行完实现工夫 lastExecutionTime = ScheduledFutureTask.nanoTime(); } afterRunningAllTasks();//执行收尾工作 return ranAtLeastOne;}
fetchFromScheduledTaskQueue
遍历scheduledTaskQueue中的工作,增加到taskQueue中。
private boolean fetchFromScheduledTaskQueue() { if (scheduledTaskQueue == null || scheduledTaskQueue.isEmpty()) { return true; } long nanoTime = AbstractScheduledEventExecutor.nanoTime(); for (;;) { Runnable scheduledTask = pollScheduledTask(nanoTime); if (scheduledTask == null) { return true; } if (!taskQueue.offer(scheduledTask)) { // No space left in the task queue add it back to the scheduledTaskQueue so we pick it up again. scheduledTaskQueue.add((ScheduledFutureTask<?>) scheduledTask); return false; } }}
工作增加办法execute
NioEventLoop外部有两个十分重要的异步工作队列,别离是一般工作和定时工作队列,针对这两个队列提供了两个办法别离向两个队列中增加工作。
- execute()
- schedule()
其中,execute办法的定义如下。
private void execute(Runnable task, boolean immediate) { boolean inEventLoop = inEventLoop(); addTask(task); //把当前任务增加到阻塞队列中 if (!inEventLoop) { //如果是非NioEventLoop startThread(); //启动线程 if (isShutdown()) { //如果以后NioEventLoop曾经是进行状态 boolean reject = false; try { if (removeTask(task)) { reject = true; } } catch (UnsupportedOperationException e) { // The task queue does not support removal so the best thing we can do is to just move on and // hope we will be able to pick-up the task before its completely terminated. // In worst case we will log on termination. } if (reject) { reject(); } } } if (!addTaskWakesUp && immediate) { wakeup(inEventLoop); }}
Nio的空轮转问题
所谓的空轮训,是指咱们在执行selector.select()
办法时,如果没有就绪的SocketChannel时,以后线程会被阻塞 。 而空轮询是指当没有就绪SocketChannel时,会被触发唤醒。
而这个唤醒是没有任何读写申请的,从而导致线程在做有效的轮询,使得CPU占用率较高。
导致这个问题的根本原因是:
在局部Linux的2.6的kernel中,poll和epoll对于忽然中断的连贯socket会对返回的eventSet事件汇合置为POLLHUP,也可能是POLLERR,eventSet事件汇合产生了变动,这就可能导致Selector会被唤醒。这是与操作系统机制有关系的,JDK尽管仅仅是一个兼容各个操作系统平台的软件,但很遗憾在JDK5和JDK6最后的版本中(严格意义上来将,JDK局部版本都是),这个问题并没有解决,而将这个帽子抛给了操作系统方,这也就是这个bug最终始终到2013年才最终修复的起因,最终影响力太广。
Netty是如何解决这个问题的呢?咱们回到NioEventLoop的run办法中
@Overrideprotected void run() { int selectCnt = 0; for (;;) { //selectCnt记录的是无功而返的select次数,即eventLoop空转的次数,为解决NIO BUG selectCnt++; //ranTasks=true,或strategy>0,阐明eventLoop干活了,没有空转,清空selectCnt if (ranTasks || strategy > 0) { //如果抉择操作计数器的值,大于最小选择器重构阈值,则输入log if (selectCnt > MIN_PREMATURE_SELECTOR_RETURNS && logger.isDebugEnabled()) { logger.debug("Selector.select() returned prematurely {} times in a row for Selector {}.", selectCnt - 1, selector); } selectCnt = 0; } //unexpectedSelectorWakeup解决NIO BUG else if (unexpectedSelectorWakeup(selectCnt)) { // Unexpected wakeup (unusual case) selectCnt = 0; } }}
unexpectedSelectorWakeup
private boolean unexpectedSelectorWakeup(int selectCnt) { if (Thread.interrupted()) { if (logger.isDebugEnabled()) { logger.debug("Selector.select() returned prematurely because " + "Thread.currentThread().interrupt() was called. Use " + "NioEventLoop.shutdownGracefully() to shutdown the NioEventLoop."); } return true; } //如果抉择重构的阈值大于0, 默认值是512次、 并且以后触发的空轮询次数大于 512次。,则触发重构 if (SELECTOR_AUTO_REBUILD_THRESHOLD > 0 && selectCnt >= SELECTOR_AUTO_REBUILD_THRESHOLD) { // The selector returned prematurely many times in a row. // Rebuild the selector to work around the problem. logger.warn("Selector.select() returned prematurely {} times in a row; rebuilding Selector {}.", selectCnt, selector); rebuildSelector(); return true; } return false;}
rebuildSelector()
public void rebuildSelector() { if (!inEventLoop()) { //如果不是在eventLoop中执行,则应用异步线程执行 execute(new Runnable() { @Override public void run() { rebuildSelector0(); } }); return; } rebuildSelector0();}
rebuildSelector0
这个办法的次要作用: 从新创立一个选择器,代替以后事件循环中的选择器
private void rebuildSelector0() { final Selector oldSelector = selector; //获取老的selector选择器 final SelectorTuple newSelectorTuple; //定义新的选择器 if (oldSelector == null) { //如果老的选择器为空,间接返回 return; } try { newSelectorTuple = openSelector(); //创立一个新的选择器 } catch (Exception e) { logger.warn("Failed to create a new Selector.", e); return; } // Register all channels to the new Selector. int nChannels = 0; for (SelectionKey key: oldSelector.keys()) {//遍历注册到选择器的抉择key汇合 Object a = key.attachment(); try { //如果抉择key有效或抉择关联的通道曾经注册到新的选择器,则跳出以后循环 if (!key.isValid() || key.channel().keyFor(newSelectorTuple.unwrappedSelector) != null) { continue; } //获取key的抉择关注事件集 int interestOps = key.interestOps(); key.cancel();//勾销抉择key //注册抉择key到新的选择器 SelectionKey newKey = key.channel().register(newSelectorTuple.unwrappedSelector, interestOps, a); if (a instanceof AbstractNioChannel) {//如果是nio通道,则更新通道的抉择key // Update SelectionKey ((AbstractNioChannel) a).selectionKey = newKey; } nChannels ++; } catch (Exception e) { logger.warn("Failed to re-register a Channel to the new Selector.", e); if (a instanceof AbstractNioChannel) { AbstractNioChannel ch = (AbstractNioChannel) a; ch.unsafe().close(ch.unsafe().voidPromise()); } else { @SuppressWarnings("unchecked") NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a; invokeChannelUnregistered(task, key, e); } } } //更新以后事件循环选择器 selector = newSelectorTuple.selector; unwrappedSelector = newSelectorTuple.unwrappedSelector; try { // time to close the old selector as everything else is registered to the new one oldSelector.close(); //敞开原始选择器 } catch (Throwable t) { if (logger.isWarnEnabled()) { logger.warn("Failed to close the old Selector.", t); } } if (logger.isInfoEnabled()) { logger.info("Migrated " + nChannels + " channel(s) to the new Selector."); }}
从上述过程中咱们发现,Netty解决NIO空轮转问题的形式,是通过重建Selector对象来实现的,在这个重建过程中,外围是把Selector中所有的SelectionKey从新注册到新的Selector上,从而奇妙的防止了JDK epoll空轮训问题。
连贯的建设及处理过程
在9.2.4.3节中,提到了当客户端有连贯或者读事件发送到服务端时,会调用NioMessageUnsafe类的read()办法。
public void read() { assert eventLoop().inEventLoop(); final ChannelConfig config = config(); final ChannelPipeline pipeline = pipeline(); final RecvByteBufAllocator.Handle allocHandle = unsafe().recvBufAllocHandle(); allocHandle.reset(config); boolean closed = false; Throwable exception = null; try { try { do { //如果有客户端连贯进来,则localRead为1,否则返回0 int localRead = doReadMessages(readBuf); if (localRead == 0) { break; } if (localRead < 0) { closed = true; break; } allocHandle.incMessagesRead(localRead); //累计减少read音讯数量 } while (continueReading(allocHandle)); } catch (Throwable t) { exception = t; } int size = readBuf.size(); //遍历客户端连贯列表 for (int i = 0; i < size; i ++) { readPending = false; pipeline.fireChannelRead(readBuf.get(i)); //调用pipeline中handler的channelRead办法。 } readBuf.clear(); //清空集合 allocHandle.readComplete(); pipeline.fireChannelReadComplete(); //触发pipeline中handler的readComplete办法 if (exception != null) { closed = closeOnReadError(exception); pipeline.fireExceptionCaught(exception); } if (closed) { inputShutdown = true; if (isOpen()) { close(voidPromise()); } } } finally { if (!readPending && !config.isAutoRead()) { removeReadOp(); } }}
pipeline.fireChannelRead(readBuf.get(i))
持续来看pipeline的触发办法,此时的pipeline组成,如果以后是连贯事件,那么pipeline = ServerBootstrap$ServerBootstrapAcceptor。
static void invokeChannelRead(final AbstractChannelHandlerContext next, Object msg) { final Object m = next.pipeline.touch(ObjectUtil.checkNotNull(msg, "msg"), next); EventExecutor executor = next.executor(); if (executor.inEventLoop()) { next.invokeChannelRead(m); //获取pipeline中的下一个节点,调用该handler的channelRead办法 } else { executor.execute(new Runnable() { @Override public void run() { next.invokeChannelRead(m); } }); }}
ServerBootstrapAcceptor
ServerBootstrapAcceptor是NioServerSocketChannel中一个非凡的Handler,专门用来解决客户端连贯事件,该办法中外围的目标是把针对SocketChannel的handler链表,增加到以后NioSocketChannel中的pipeline中。
public void channelRead(ChannelHandlerContext ctx, Object msg) { final Channel child = (Channel) msg; child.pipeline().addLast(childHandler); //把服务端配置的childHandler,增加到以后NioSocketChannel中的pipeline中 setChannelOptions(child, childOptions, logger); //设置NioSocketChannel的属性 setAttributes(child, childAttrs); try { //把以后的NioSocketChannel注册到Selector上,并且监听一个异步事件。 childGroup.register(child).addListener(new ChannelFutureListener() { @Override public void operationComplete(ChannelFuture future) throws Exception { if (!future.isSuccess()) { forceClose(child, future.cause()); } } }); } catch (Throwable t) { forceClose(child, t); }}
pipeline的构建过程
9.6.2节中,child其实就是一个NioSocketChannel,它是在NioServerSocketChannel中,当接管到一个新的链接时,创建对象。
@Overrideprotected int doReadMessages(List<Object> buf) throws Exception { SocketChannel ch = SocketUtils.accept(javaChannel()); try { if (ch != null) { buf.add(new NioSocketChannel(this, ch)); //这里 return 1; } } catch (Throwable t) { logger.warn("Failed to create a new channel from an accepted socket.", t); try { ch.close(); } catch (Throwable t2) { logger.warn("Failed to close a socket.", t2); } } return 0;}
而NioSocketChannel在结构时,调用了父类AbstractChannel中的构造方法,初始化了一个pipeline.
protected AbstractChannel(Channel parent) { this.parent = parent; id = newId(); unsafe = newUnsafe(); pipeline = newChannelPipeline();}
DefaultChannelPipeline
pipeline的默认实例是DefaultChannelPipeline,构造方法如下。
protected DefaultChannelPipeline(Channel channel) { this.channel = ObjectUtil.checkNotNull(channel, "channel"); succeededFuture = new SucceededChannelFuture(channel, null); voidPromise = new VoidChannelPromise(channel, true); tail = new TailContext(this); head = new HeadContext(this); head.next = tail; tail.prev = head;}
初始化了一个头节点和尾节点,组成一个双向链表,如图9-2所示
<center>图9-2</center>
NioSocketChannel中handler链的形成
再回到ServerBootstrapAccepter的channelRead办法中,收到客户端连贯时,触发了NioSocketChannel中的pipeline的增加
以下代码是DefaultChannelPipeline的addLast办法。
@Overridepublic final ChannelPipeline addLast(EventExecutorGroup executor, ChannelHandler... handlers) { ObjectUtil.checkNotNull(handlers, "handlers"); for (ChannelHandler h: handlers) { //遍历handlers列表,此时这里的handler是ChannelInitializer回调办法 if (h == null) { break; } addLast(executor, null, h); } return this;}
addLast
把服务端配置的ChannelHandler,增加到pipeline中,留神,此时的pipeline中保留的是ChannelInitializer回调办法。
@Overridepublic final ChannelPipeline addLast(EventExecutorGroup group, String name, ChannelHandler handler) { final AbstractChannelHandlerContext newCtx; synchronized (this) { checkMultiplicity(handler); //查看是否有反复的handler //创立新的DefaultChannelHandlerContext节点 newCtx = newContext(group, filterName(name, handler), handler); addLast0(newCtx); //增加新的DefaultChannelHandlerContext到ChannelPipeline if (!registered) { newCtx.setAddPending(); callHandlerCallbackLater(newCtx, true); return this; } EventExecutor executor = newCtx.executor(); if (!executor.inEventLoop()) { callHandlerAddedInEventLoop(newCtx, executor); return this; } } callHandlerAdded0(newCtx); return this;}
这个回调办法什么时候触发调用呢?其实就是在ServerBootstrapAcceptor
这个类的channelRead办法中,注册以后NioSocketChannel时
childGroup.register(child).addListener(new ChannelFutureListener() {}
最终依照之前咱们上一节课源码剖析的思路,定位到AbstractChannel中的register0办法中。
private void register0(ChannelPromise promise) { try { // check if the channel is still open as it could be closed in the mean time when the register // call was outside of the eventLoop if (!promise.setUncancellable() || !ensureOpen(promise)) { return; } boolean firstRegistration = neverRegistered; doRegister(); neverRegistered = false; registered = true; // pipeline.invokeHandlerAddedIfNeeded(); }}
callHandlerAddedForAllHandlers
pipeline.invokeHandlerAddedIfNeeded()办法,向下执行,会进入到DefaultChannelPipeline这个类中的callHandlerAddedForAllHandlers办法中
private void callHandlerAddedForAllHandlers() { final PendingHandlerCallback pendingHandlerCallbackHead; synchronized (this) { assert !registered; // This Channel itself was registered. registered = true; pendingHandlerCallbackHead = this.pendingHandlerCallbackHead; // Null out so it can be GC'ed. this.pendingHandlerCallbackHead = null; } //从期待被调用的handler 回调列表中,取出工作来执行。 PendingHandlerCallback task = pendingHandlerCallbackHead; while (task != null) { task.execute(); task = task.next; }}
咱们发现,pendingHandlerCallbackHead这个单向链表,是在callHandlerCallbackLater办法中被增加的,
而callHandlerCallbackLater又是在addLast办法中增加的,所以形成了一个异步残缺的闭环。
ChannelInitializer.handlerAdded
task.execute()办法执行门路是
callHandlerAdded0 -> ctx.callHandlerAdded ->
-------> AbstractChannelHandlerContext.callHandlerAddded()
---------------> ChannelInitializer.handlerAdded
调用initChannel办法来初始化NioSocketChannel中的Channel.
@Overridepublic void handlerAdded(ChannelHandlerContext ctx) throws Exception { if (ctx.channel().isRegistered()) { // This should always be true with our current DefaultChannelPipeline implementation. // The good thing about calling initChannel(...) in handlerAdded(...) is that there will be no ordering // surprises if a ChannelInitializer will add another ChannelInitializer. This is as all handlers // will be added in the expected order. if (initChannel(ctx)) { // We are done with init the Channel, removing the initializer now. removeState(ctx); } }}
接着,调用initChannel形象办法,该办法由具体的实现类来实现。
private boolean initChannel(ChannelHandlerContext ctx) throws Exception { if (initMap.add(ctx)) { // Guard against re-entrance. try { initChannel((C) ctx.channel()); } catch (Throwable cause) { // Explicitly call exceptionCaught(...) as we removed the handler before calling initChannel(...). // We do so to prevent multiple calls to initChannel(...). exceptionCaught(ctx, cause); } finally { ChannelPipeline pipeline = ctx.pipeline(); if (pipeline.context(this) != null) { pipeline.remove(this); } } return true; } return false;}
ChannelInitializer的实现,是咱们自定义Server中的匿名外部类,ChannelInitializer。因而通过这个回调来实现以后NioSocketChannel的pipeline的构建过程。
public static void main(String[] args){ EventLoopGroup boss = new NioEventLoopGroup(); //2 用于对承受客户端连贯读写操作的线程工作组 EventLoopGroup work = new NioEventLoopGroup(); ServerBootstrap b = new ServerBootstrap(); b.group(boss, work) //绑定两个工作线程组 .channel(NioServerSocketChannel.class) //设置NIO的模式 // 初始化绑定服务通道 .childHandler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel sc) throws Exception { sc.pipeline() .addLast( new LengthFieldBasedFrameDecoder(1024, 9,4,0,0)) .addLast(new MessageRecordEncoder()) .addLast(new MessageRecordDecode()) .addLast(new ServerHandler()); } });}
版权申明:本博客所有文章除特地申明外,均采纳 CC BY-NC-SA 4.0 许可协定。转载请注明来自Mic带你学架构
!
如果本篇文章对您有帮忙,还请帮忙点个关注和赞,您的保持是我一直创作的能源。欢送关注同名微信公众号获取更多技术干货!