EventLoop是Netty Server用于解决IO事件的事件轮询处理器,职责上相似于Redis的eventLoop,EventLoop通常是由EventLoopGroup来治理的,EventLoopGroup负责调度指派EventLoop,而EventLoop负责具体的执行。
先看一下罕用的NioEventLoopGroup构造关系图
继承关系:
办法图:
顺着关系图,先从各组件的根底性能说起。
EventExecutorGroup
本身提供shutdownGracefully执行器优雅敞开得接口
EventExecutorGroup接口继承ScheduleExecutorService和Iterable
ScheduleExecutorService负责任务调度
Iterable负责返回next()的EventExecutor对象
EventExecutor
EventExecutor继承EventExecutorGroup,在原有的接口根底上提供一些查看线程状态的接口
AbstractEventExecutorGroup
AbstractEventExecutorGroup是基于EventExecutorGroup的抽象类,提供简略的工作调用,次要是一些通过next()获取Executor并执行工作的简略模板,如下
@Overridepublic <T> Future<T> submit(Runnable task, T result) { return next().submit(task, result);}
MultithreadEventExecutorGroup
继承AbstractEventExecutorGroup的简略抽象类,初始化children,该字段保留EventLoop数组。提供缺省的线程工厂和Executor,还有一些批量解决children的实现(比方shutdown)
须要非凡留神的是,创立的EventLoop的接口申明也是在这个抽象类中
protected abstract EventExecutor newChild(Executor executor, Object... args) throws Exception;
最终NioEventLoopGroup结构器都会进入上面的父类结构器
protected MultithreadEventExecutorGroup(int nThreads, Executor executor, EventExecutorChooserFactory chooserFactory, Object... args) {
- nThread指定EventExecutor个数
- executor指定执行器,默认应用ThreadPerTaskExecutor执行器,提供最根本的线程执行性能
- chooserFactory生产EventExecutorChooser,chooser次要性能就是从executors列表中获取下一个EventExecutor(依据列表个数是否位2次幂抉择PowerOfTwoEventExecutorChooser或GenericEventExecutorChooser),罕用于next()办法用于获取下一个EventLoop
- args次要是提供结构Java Selector的SelectorProvider
MultithreadEventLoopGroup
MultithreadEventLoopGroup继承MultithreadEventExecutorGroup接口,并实现EventLoopGroup接口,提供Channel注册相干的模板。
NioEventLoopGroup
NioEventLoopGroup就是常见的Bootstrap(或ServerBootstrap)用于结构group()的实现类,其中实现了newChild接口用于创立具体的EventLoop实例。
@Override protected EventLoop newChild(Executor executor, Object... args) throws Exception { EventLoopTaskQueueFactory queueFactory = args.length == 4 ? (EventLoopTaskQueueFactory) args[3] : null; return new NioEventLoop(this, executor, (SelectorProvider) args[0], ((SelectStrategyFactory) args[1]).newSelectStrategy(), (RejectedExecutionHandler) args[2], queueFactory); }
NioEventLoop
最常见的Netty EventLoop实现类,先看下NioEventLoop关系图
看起来很简单,别慌上面会对几个组件大抵阐明一下,先看几个根底的抽象类
AbstractExecutorService
首先看AbstractExecutorService抽象类,相熟JUC线程池工具包的应该比拟眼生,罕用的用于结构线程池的ThreadPoolExecutor对象就是继承自它,netty并没有沿用JUC的线程池而是抉择本人实现,AbstractExecutorService类只是提供根底的task创立,submit和invoke等操作的根底实现。
AbstractEventExecutor
AbstractEventExecutor继承AbstractExecutorService和EventExecutor接口,是个形象基类,货色不多这里略过。
AbstractScheduledEventExecutor
AbstractScheduledEventExecutor有些相似JUC的ScheduledThreadPoolExecutor,次要是任务调度的模板。
SingleThreadEventExecutor 和SingleThreadEventLoop
SingleThreadEventExecutor,任务调度的根本实现都在这个类里,execute具体实现也在当中。
SingleThreadEventLoop继承SingleThreadEventExecutor,实现了局部注册Channel和执行全副已提交工作的模板。
NioEventLoop
接下来终于轮到NioEventLoop了,次要负责Nio轮询逻辑。
首先如上所述NioEventLoop是在构建NioEventLoopGroup时由其父类MultithreadEventExecutorGroup在结构器中初始EventExecutor数组(children)时调用newChild创立的。
上面是NioEventLoop的结构器,内有正文
NioEventLoop(NioEventLoopGroup parent, Executor executor, SelectorProvider selectorProvider, SelectStrategy strategy, RejectedExecutionHandler rejectedExecutionHandler, EventLoopTaskQueueFactory queueFactory) { //newTaskQueue创立队列(jctools) super(parent, executor, false, newTaskQueue(queueFactory), newTaskQueue(queueFactory), rejectedExecutionHandler); //设置nio selectorProvider this.provider = ObjectUtil.checkNotNull(selectorProvider, "selectorProvider"); //设置select策略选择器,负责管制nio loop逻辑 this.selectStrategy = ObjectUtil.checkNotNull(strategy, "selectStrategy"); //selectorTuple其实就是一个简略的bean,外部存有原生selector和包装后的selector final SelectorTuple selectorTuple = openSelector(); this.selector = selectorTuple.selector; this.unwrappedSelector = selectorTuple.unwrappedSelector;}
taskQueue工作队列也是在这个时候创立的,默认应用的是JCTools的MPSC队列,是一个多生产单生产的高性能队列。
工作的调度
下面次要是梳理了一下NioEventLoopGroup的继承关系,上面会详细分析netty是如何设计事件模型来进行IO任务调度。
为了更好的梳理流程,咱们无妨从一个简略的netty服务端demo登程
上面是一个netty官网提供的EchoServer
public final class EchoServer { static final boolean SSL = System.getProperty("ssl") != null; static final int PORT = Integer.parseInt(System.getProperty("port", "8007")); public static void main(String[] args) throws Exception { // Configure SSL. final SslContext sslCtx; if (SSL) { SelfSignedCertificate ssc = new SelfSignedCertificate(); sslCtx = SslContextBuilder.forServer(ssc.certificate(), ssc.privateKey()).build(); } else { sslCtx = null; } // Configure the server. EventLoopGroup bossGroup = new NioEventLoopGroup(1); EventLoopGroup workerGroup = new NioEventLoopGroup(); final EchoServerHandler serverHandler = new EchoServerHandler(); try { ServerBootstrap b = new ServerBootstrap(); b.group(bossGroup, workerGroup) .channel(NioServerSocketChannel.class) .option(ChannelOption.SO_BACKLOG, 100) .handler(new LoggingHandler(LogLevel.INFO)) .childHandler(new ChannelInitializer<SocketChannel>() { @Override public void initChannel(SocketChannel ch) { ChannelPipeline p = ch.pipeline(); if (sslCtx != null) { p.addLast(sslCtx.newHandler(ch.alloc())); } //p.addLast(new LoggingHandler(LogLevel.INFO)); p.addLast(serverHandler); } }); // Start the server. ChannelFuture f = b.bind(PORT).sync(); // Wait until the server socket is closed. f.channel().closeFuture().sync(); } finally { // Shut down all event loops to terminate all threads. bossGroup.shutdownGracefully(); workerGroup.shutdownGracefully(); } }}
疏忽掉ssl局部,首先构建了两个EventLoopGroup实例,在这过程中产生了什么后面曾经说过了,而后是设置channel和handler以及childHandler,最终调用bind(),上面会解释netty外部都做了些什么。
AbstractBootstrap
下面的例子执行bind()办法后,最初进入到AbstractBootstrap.doBind()办法中
private ChannelFuture doBind(final SocketAddress localAddress) { final ChannelFuture regFuture = initAndRegister(); final Channel channel = regFuture.channel(); if (regFuture.cause() != null) { return regFuture; } //疏忽掉一些细节,最次要的就是执行上面的一段代码 ChannelPromise promise = channel.newPromise(); doBind0(regFuture, channel, localAddress, promise); return promise; }}
initAndRegister
initAndRegister()负责创立和初始化channel,并返回ChannelFuture对象用于后续增加监听器来异步的解决后续的工作。
initAndRegister是初始化流程中十分重要的一步,channel的构建,注册, eventLoop线程启动都是在这之中,上面会顺次注明netty是如何初始化这几个对象,一些重要的说明会在代码中正文
final ChannelFuture initAndRegister() { Channel channel = null; try { //结构netty的channel实例,也是这个时候和java nio中的原生channel做绑定 channel = channelFactory.newChannel(); //初始化channel,server端和客户端的初始逻辑不同,server端会为pipelinee额定增加名为ServerBootstrapAcceptor的handler,而客户端只会增加初始化时用户指定的handler init(channel); } catch (Throwable t) { if (channel != null) { // channel can be null if newChannel crashed (eg SocketException("too many open files")) channel.unsafe().closeForcibly(); // as the Channel is not registered yet we need to force the usage of the GlobalEventExecutor return new DefaultChannelPromise(channel, GlobalEventExecutor.INSTANCE).setFailure(t); } // as the Channel is not registered yet we need to force the usage of the GlobalEventExecutor return new DefaultChannelPromise(new FailedChannel(), GlobalEventExecutor.INSTANCE).setFailure(t); } //为channel执行register注册逻辑,次要实现nio中channel的register操作,nio的selector在eventloop初始化时就曾经创立好了 ChannelFuture regFuture = config().group().register(channel); if (regFuture.cause() != null) { if (channel.isRegistered()) { channel.close(); } else { channel.unsafe().closeForcibly(); } } return regFuture;}
- init(channel) 初始化channel,server端和客户端的初始逻辑不同,server端会为pipeline额定增加名为ServerBootstrapAcceptor的handler,而客户端只会增加初始化时用户指定的handler.
- config().group().register(channel) 为channel执行register注册逻辑,次要实现nio中channel的register操作, selector在eventloop初始化时就曾经创立好了。
而eventloop轮询线程的启动也是在调用register()时触发的。
首先调用MultithreadEventLoopGroup.register()
public ChannelFuture register(Channel channel) { return next().register(channel);}
next()就是从children中获取下一个eventloop, 获取具体的eventloop实例后首先通过SingleThreadEventLoop抽象类把channel包装成ChannelPromise(channelFuture接口的可写模式)并获取unsafe()来实现底层的register性能
public ChannelFuture register(final ChannelPromise promise) { ObjectUtil.checkNotNull(promise, "promise"); promise.channel().unsafe().register(this, promise); return promise;}public ChannelFuture register(final Channel channel, final ChannelPromise promise) { ObjectUtil.checkNotNull(promise, "promise"); ObjectUtil.checkNotNull(channel, "channel"); channel.unsafe().register(this, promise); return promise;}
最初由AbstractChannel抽象类的外部类AbstractUnsafe来实现底层的register操作
public final void register(EventLoop eventLoop, final ChannelPromise promise) { ObjectUtil.checkNotNull(eventLoop, "eventLoop"); //已注册间接返回,future标记谬误 if (isRegistered()) { promise.setFailure(new IllegalStateException("registered to an event loop already")); return; } //校验是否兼容 if (!isCompatible(eventLoop)) { promise.setFailure( new IllegalStateException("incompatible event loop type: " + eventLoop.getClass().getName())); return; } //为channel绑定eventLoop AbstractChannel.this.eventLoop = eventLoop; //查看以后线程是否为ecentLoo绑定线程,绑定线程是在启动ecentloop时设置的 if (eventLoop.inEventLoop()) { //调用理论绑定办法 register0(promise); } else { try { //调用execure来执行refister0()理论注册逻辑 eventLoop.execute(new Runnable() { @Override public void run() { register0(promise); } }); } catch (Throwable t) { logger.warn( "Force-closing a channel whose registration task was not accepted by an event loop: {}", AbstractChannel.this, t); closeForcibly(); closeFuture.setClosed(); safeSetFailure(promise, t); } }}
通过一些校验后,通过register0()中的doRegister()来实现理论注册操作
private void register0(ChannelPromise promise) { try { // check if the channel is still open as it could be closed in the mean time when the register // call was outside of the eventLoop if (!promise.setUncancellable() || !ensureOpen(promise)) { return; } boolean firstRegistration = neverRegistered; doRegister();//实现channel的注册 neverRegistered = false; registered = true;//标记为已注册 ... pipeline局部逻辑省略}
doRegister()的实现则在AbstractNioChannel抽象类中
protected void doRegister() throws Exception {boolean selected = false;for (;;) { try { selectionKey = javaChannel().register(eventLoop().unwrappedSelector(), 0, this); return; } catch (CancelledKeyException e) { ...}}
到这里netty才实现理论的java NioChannel的注册逻辑。
EventLoop线程
下面讲到了如何为channel实现register绑定操作,那么回到正题EventLoop的轮询调度线程是何时被启动的呢?其实就是在方才AbstractChannel调用register时启动的。
public final void register(EventLoop eventLoop, final ChannelPromise promise) { ... if (eventLoop.inEventLoop()) { register0(promise); } else { try { eventLoop.execute(new Runnable() { @Override public void run() { register0(promise); } }); } catch (Throwable t) { ... } }}
register会调用eventLoop的execute办法来执行register0,当初来看一下execute做了些什么。首先会进入到SingleThreadEventExecutor.execute
private void execute(Runnable task, boolean immediate) { //inEventLoop判断以后线程是否与SingleThreadEventExecutor.thread雷同,thread是在启动loop线程时设置的,所以为启动前为null boolean inEventLoop = inEventLoop(); //增加工作至队列 addTask(task); if (!inEventLoop) { //启动eventLoop线程 startThread(); if (isShutdown()) { boolean reject = false; try { if (removeTask(task)) { reject = true; } } catch (UnsupportedOperationException e) {1 } if (reject) { reject(); } } } if (!addTaskWakesUp && immediate) { wakeup(inEventLoop); }}
inEventLoop判断以后线程是否与SingleThreadEventExecutor.thread雷同,thread是在启动loop线程时设置的,所以未启动前为null。
SingleThreadEventExecutor先将要执行的工作增加至队列,上文提到的register工作也会增加至该队列,队列初始则是由上文提到的NioEventLoop结构器来实现的。
startThread办法次要是对状态字段state作CAS查看并执行doStartThread()
private void startThread() { if (state == ST_NOT_STARTED) { if (STATE_UPDATER.compareAndSet(this, ST_NOT_STARTED, ST_STARTED)) { boolean success = false; try { doStartThread(); success = true; } finally { if (!success) { STATE_UPDATER.compareAndSet(this, ST_STARTED, ST_NOT_STARTED); } } } }}
doStartThread通过executor来执行真正负责轮询逻辑的SingleThreadEventExecutor.this.run()办法, 另外executor能够通过NioEventLoopGroup结构器指定, 默认应用ThreadPerTaskExecutor每一次执行工作创立一个新线程执行工作。
private void doStartThread() { assert thread == null; executor.execute(new Runnable() { @Override public void run() { thread = Thread.currentThread(); //查看是否中断 if (interrupted) { thread.interrupt(); } boolean success = false; //更新工夫单位为纳秒 updateLastExecutionTime(); try { //执行具体的轮询工作,该办法为形象办法 SingleThreadEventExecutor.this.run(); success = true; } catch (Throwable t) { logger.warn("Unexpected exception from an event executor: ", t); } finally { //省略掉了shutdown后续确认逻辑,感兴趣的能够看一下源码 } } });}
SingleThreadEventExecutor.this.run()是形象办法,而在netty中
实现类默认为NioEventLoop(NioEventLoop由NioEventLoopGroup确定,反对epoll的linux中用户也能够通过指定EpollEventLoopGroup来获取EpollEventLoop,java nio其实曾经反对epoll操作,不过相比nio来说EpollEventLoop性能更好些,因为采纳ET模式,同时更少的gc, 因为执行run的大抵逻辑雷同,这里就基于罕用的NioEventLoop来阐明)
run次要执行以下步骤:
- 查看以后是否有工作,如果有通过supplier非阻塞调用select获取事件个数(selectNow办法,即select超时工夫设置0),否则返回SelectStrategy.SELECT枚举示意进行阻塞select
- 获取scheduleTask定时工作堆顶工作的deadline工夫,如果枚举为SELECT则先通过deadline计算timeout并select阻塞。
- 获取ioRatio参数,该值决定一次轮询解决工作的 预计最大工夫 = io等待时间 * (100 - ratio)/ratio,假如ratio为50则解决工作最大耗时为io工夫雷同,默认设置为50.
- 调用processSelectedKeys解决selectKeys,通过ioRatio取得解决工作最大工夫并执行工作。
上面是源码附正文:
protected void run() { //计数器记录select次数 int selectCnt = 0; for (;;) { try { int strategy; try {//以后有工作通过supplier非阻塞调用select获取事件个数,否则返回SelectStrategy.SELECT枚举示意进行阻塞select strategy = selectStrategy.calculateStrategy(selectNowSupplier, hasTasks()); switch (strategy) { case SelectStrategy.CONTINUE: continue; case SelectStrategy.BUSY_WAIT: // fall-through to SELECT since the busy-wait is not supported with NIO case SelectStrategy.SELECT: //获取schedule定时工作的deadline long curDeadlineNanos = nextScheduledTaskDeadlineNanos(); if (curDeadlineNanos == -1L) { curDeadlineNanos = NONE; // nothing on the calendar } nextWakeupNanos.set(curDeadlineNanos); try { if (!hasTasks()) { strategy = select(curDeadlineNanos); } } finally { // This update is just to help block unnecessary selector wakeups // so use of lazySet is ok (no race condition) nextWakeupNanos.lazySet(AWAKE); } // fall through default: } } catch (IOException e) { // If we receive an IOException here its because the Selector is messed up. Let's rebuild // the selector and retry. https://github.com/netty/netty/issues/8566 rebuildSelector0(); selectCnt = 0; handleLoopException(e); continue; } selectCnt++; cancelledKeys = 0; needsToSelectAgain = false; //ioRatio默认为50 final int ioRatio = this.ioRatio; boolean ranTasks; //100一次性执行全副工作 if (ioRatio == 100) { try { if (strategy > 0) { processSelectedKeys(); } } finally { // Ensure we always run tasks. ranTasks = runAllTasks(); } } else if (strategy > 0) { final long ioStartTime = System.nanoTime(); try { //解决selectKey processSelectedKeys(); } finally { // Ensure we always run tasks. final long ioTime = System.nanoTime() - ioStartTime; //依据ioRatio设置工作的解决工夫 ranTasks = runAllTasks(ioTime * (100 - ioRatio) / ioRatio); } } else { ranTasks = runAllTasks(0); // This will run the minimum number of tasks } if (ranTasks || strategy > 0) { if (selectCnt > MIN_PREMATURE_SELECTOR_RETURNS && logger.isDebugEnabled()) { logger.debug("Selector.select() returned prematurely {} times in a row for Selector {}.", selectCnt - 1, selector); } selectCnt = 0; } else if (unexpectedSelectorWakeup(selectCnt)) { // Unexpected wakeup (unusual case) selectCnt = 0; } } catch (CancelledKeyException e) { // Harmless exception - log anyway if (logger.isDebugEnabled()) { logger.debug(CancelledKeyException.class.getSimpleName() + " raised by a Selector {} - JDK bug?", selector, e); } } catch (Throwable t) { handleLoopException(t); } // Always handle shutdown even if the loop processing threw an exception. try { if (isShuttingDown()) { closeAll(); if (confirmShutdown()) { return; } } } catch (Throwable t) { handleLoopException(t); } }}
对于轮询逻辑还有两个比拟重要的点:
- 这里解决工作最大工夫是预估工夫,原理是串行解决工作时刷新并比对deadline工夫,可想而知如果工作是阻塞的就会重大影响nioEventLoop的性能,因而要求用户不要执行阻塞的工作。
- 仔细的可能留神到有一个计数变量selectCnt用来示意select次数,在开端处会调用unexpectedSelectorWakeup办法校验selectCnt数值是否超出阈值(默认512),如果超出阈值则调用rebuildSelector从新创立新的selector,并把旧的selector已注册的channel重新加入到新的selector中.
private boolean unexpectedSelectorWakeup(int selectCnt) { ...省略局部代码 //查看selectCnt是否超出阈值,如果超出阈值则调用rebuildSelector从新创立新的selector,并把旧的selector已注册的channel重新加入到新的selector中 if (SELECTOR_AUTO_REBUILD_THRESHOLD > 0 && selectCnt >= SELECTOR_AUTO_REBUILD_THRESHOLD) { // The selector returned prematurely many times in a row. // Rebuild the selector to work around the problem. logger.warn("Selector.select() returned prematurely {} times in a row; rebuilding Selector {}.", selectCnt, selector); rebuildSelector(); return true; } return false;}
为什么超出阈值就要从新结构selector呢,这来源于jdk nio一个有名的epoll cpu 100%的bug,jdk-bug-6403933,该bug使select不能失常阻塞,因用户nio线程空转导致cpu飙升,所以netty应用计数的形式来检测该问题,并从新构建selector。
worker、boss的分工
BootStrap初始化的时候会要求设置worker和boss,例如EchoServer例子中的bossGroup和workerGroup,个别介绍中都会说到boss负责accpet,worker负责IO事件如下图,上面来解析netty如何调度流转worker和boss
首先Java NIO server须要指定ServerSocketChannel来实现bind、accept等操作,而netty则是对NIO的封装,那么ServerSocketChannel是在何时结构的呢,是在调用initAndRegister是,initAndRegister时会调用channelFactory.newChannel()
final ChannelFuture initAndRegister() { Channel channel = null; try { channel = channelFactory.newChannel(); init(channel); } ....}
channelFactory须要指定class来实例化channel,这个channel即便在结构BootStrap时申明的,如下实例申明的NioServerSocketChannel.class
ServerBootstrap b = new ServerBootstrap();b.group(bossGroup, workerGroup) .channel(NioServerSocketChannel.class) .option(ChannelOption.SO_BACKLOG, 100) .handler(new LoggingHandler(LogLevel.INFO))
之后就执行BossGroup下的EventLoop轮询逻辑,到目前为止其实与Worker还没有关系,先看一下如果用户申明的是ServerBootstrap,那么在调用initAndRegester时会调用init(),init又一个重要操作就是通过匿名函数将ServerBootstrapAcceptor退出到链表尾部。
void init(Channel channel) { //设置参数 setChannelOptions(channel, newOptionsArray(), logger); setAttributes(channel, attrs0().entrySet().toArray(EMPTY_ATTRIBUTE_ARRAY)); //获取pipelone ChannelPipeline p = channel.pipeline(); final EventLoopGroup currentChildGroup = childGroup; final ChannelHandler currentChildHandler = childHandler; final Entry<ChannelOption<?>, Object>[] currentChildOptions; synchronized (childOptions) { currentChildOptions = childOptions.entrySet().toArray(EMPTY_OPTION_ARRAY); } final Entry<AttributeKey<?>, Object>[] currentChildAttrs = childAttrs.entrySet().toArray(EMPTY_ATTRIBUTE_ARRAY); //将handler增加至尾部 p.addLast(new ChannelInitializer<Channel>() { @Override public void initChannel(final Channel ch) { final ChannelPipeline pipeline = ch.pipeline(); ChannelHandler handler = config.handler(); if (handler != null) { pipeline.addLast(handler); } ch.eventLoop().execute(new Runnable() { @Override public void run() { pipeline.addLast(new ServerBootstrapAcceptor( ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs)); } }); } });}
ServerBootstrapAcceptor是InboundHandler类型,会把BossGroup下的Eventloop监听到的Channel事件注册到childGroup(即worker)下,代码如下
public void channelRead(ChannelHandlerContext ctx, Object msg) { final Channel child = (Channel) msg; //将childHandler退出链表 child.pipeline().addLast(childHandler); setChannelOptions(child, childOptions, logger); setAttributes(child, childAttrs); try { //将channel注册至childGroup,即workerGroup下 childGroup.register(child).addListener(new ChannelFutureListener() { @Override public void operationComplete(ChannelFuture future) throws Exception { if (!future.isSuccess()) { forceClose(child, future.cause()); } } }); } catch (Throwable t) { //出现异常敞开channel forceClose(child, t); }}
到此就实现了boss和worker之间的流转,至于channel是如何流转,pipeline原理等其余内容就放到Channel篇来讲述。