EventLoop是 Netty Server 用于解决 IO 事件的事件轮询处理器,职责上相似于 Redis 的 eventLoop,EventLoop 通常是由 EventLoopGroup 来治理的,EventLoopGroup 负责调度指派 EventLoop,而 EventLoop 负责具体的执行。
先看一下罕用的 NioEventLoopGroup 构造关系图
继承关系:
办法图:
顺着关系图,先从各组件的根底性能说起。
EventExecutorGroup
本身提供 shutdownGracefully 执行器优雅敞开得接口
EventExecutorGroup 接口继承 ScheduleExecutorService 和 Iterable
ScheduleExecutorService 负责任务调度
Iterable 负责返回 next() 的 EventExecutor 对象
EventExecutor
EventExecutor 继承 EventExecutorGroup,在原有的接口根底上提供一些查看线程状态的接口
AbstractEventExecutorGroup
AbstractEventExecutorGroup 是基于 EventExecutorGroup 的抽象类,提供简略的工作调用,次要是一些通过 next()获取 Executor 并执行工作的简略模板,如下
@Override
public <T> Future<T> submit(Runnable task, T result) {return next().submit(task, result);
}
MultithreadEventExecutorGroup
继承 AbstractEventExecutorGroup 的简略抽象类,初始化children,该字段保留 EventLoop 数组。提供缺省的线程工厂和 Executor,还有一些批量解决 children 的实现(比方 shutdown)
须要非凡留神的是,创立的 EventLoop 的接口申明也是在这个抽象类中
protected abstract EventExecutor newChild(Executor executor, Object... args) throws Exception;
最终 NioEventLoopGroup 结构器都会进入上面的父类结构器
protected MultithreadEventExecutorGroup(int nThreads, Executor executor,
EventExecutorChooserFactory chooserFactory, Object... args) {
- nThread 指定 EventExecutor 个数
- executor 指定执行器,默认应用 ThreadPerTaskExecutor 执行器,提供最根本的线程执行性能
- chooserFactory 生产 EventExecutorChooser,chooser 次要性能就是从 executors 列表中获取下一个 EventExecutor(依据列表个数是否位 2 次幂抉择 PowerOfTwoEventExecutorChooser 或 GenericEventExecutorChooser),罕用于 next()办法用于获取下一个 EventLoop
- args 次要是提供结构 Java Selector 的 SelectorProvider
MultithreadEventLoopGroup
MultithreadEventLoopGroup 继承 MultithreadEventExecutorGroup 接口,并实现 EventLoopGroup 接口,提供 Channel 注册相干的模板。
NioEventLoopGroup
NioEventLoopGroup 就是常见的 Bootstrap(或 ServerBootstrap)用于结构 group()的实现类,其中实现了 newChild 接口用于创立具体的 EventLoop 实例。
@Override
protected EventLoop newChild(Executor executor, Object... args) throws Exception {EventLoopTaskQueueFactory queueFactory = args.length == 4 ? (EventLoopTaskQueueFactory) args[3] : null;
return new NioEventLoop(this, executor, (SelectorProvider) args[0],
((SelectStrategyFactory) args[1]).newSelectStrategy(), (RejectedExecutionHandler) args[2], queueFactory);
}
NioEventLoop
最常见的 Netty EventLoop 实现类,先看下 NioEventLoop 关系图
看起来很简单,别慌上面会对几个组件大抵阐明一下,先看几个根底的抽象类
AbstractExecutorService
首先看 AbstractExecutorService 抽象类,相熟 JUC 线程池工具包的应该比拟眼生,罕用的用于结构线程池的 ThreadPoolExecutor 对象就是继承自它,netty 并没有沿用 JUC 的线程池而是抉择本人实现,AbstractExecutorService 类只是提供根底的 task 创立,submit 和 invoke 等操作的根底实现。
AbstractEventExecutor
AbstractEventExecutor 继承 AbstractExecutorService 和 EventExecutor 接口,是个形象基类,货色不多这里略过。
AbstractScheduledEventExecutor
AbstractScheduledEventExecutor 有些相似 JUC 的 ScheduledThreadPoolExecutor,次要是任务调度的模板。
SingleThreadEventExecutor 和 SingleThreadEventLoop
SingleThreadEventExecutor,任务调度的根本实现都在这个类里,execute 具体实现也在当中。
SingleThreadEventLoop 继承 SingleThreadEventExecutor,实现了局部注册 Channel 和执行全副已提交工作的模板。
NioEventLoop
接下来终于轮到 NioEventLoop 了,次要负责 Nio 轮询逻辑。
首先如上所述 NioEventLoop 是在构建 NioEventLoopGroup 时由其父类 MultithreadEventExecutorGroup 在结构器中初始 EventExecutor 数组(children)时调用 newChild 创立的。
上面是 NioEventLoop 的结构器,内有正文
NioEventLoop(NioEventLoopGroup parent, Executor executor, SelectorProvider selectorProvider,
SelectStrategy strategy, RejectedExecutionHandler rejectedExecutionHandler,
EventLoopTaskQueueFactory queueFactory) {//newTaskQueue 创立队列(jctools)
super(parent, executor, false, newTaskQueue(queueFactory), newTaskQueue(queueFactory),
rejectedExecutionHandler);
// 设置 nio selectorProvider
this.provider = ObjectUtil.checkNotNull(selectorProvider, "selectorProvider");
// 设置 select 策略选择器,负责管制 nio loop 逻辑
this.selectStrategy = ObjectUtil.checkNotNull(strategy, "selectStrategy");
//selectorTuple 其实就是一个简略的 bean,外部存有原生 selector 和包装后的 selector
final SelectorTuple selectorTuple = openSelector();
this.selector = selectorTuple.selector;
this.unwrappedSelector = selectorTuple.unwrappedSelector;
}
taskQueue 工作队列也是在这个时候创立的,默认应用的是 JCTools 的 MPSC 队列,是一个多生产单生产的高性能队列。
工作的调度
下面次要是梳理了一下 NioEventLoopGroup 的继承关系,上面会详细分析 netty 是如何设计事件模型来进行 IO 任务调度。
为了更好的梳理流程,咱们无妨从一个简略的 netty 服务端 demo 登程
上面是一个 netty 官网提供的 EchoServer
public final class EchoServer {static final boolean SSL = System.getProperty("ssl") != null;
static final int PORT = Integer.parseInt(System.getProperty("port", "8007"));
public static void main(String[] args) throws Exception {
// Configure SSL.
final SslContext sslCtx;
if (SSL) {SelfSignedCertificate ssc = new SelfSignedCertificate();
sslCtx = SslContextBuilder.forServer(ssc.certificate(), ssc.privateKey()).build();} else {sslCtx = null;}
// Configure the server.
EventLoopGroup bossGroup = new NioEventLoopGroup(1);
EventLoopGroup workerGroup = new NioEventLoopGroup();
final EchoServerHandler serverHandler = new EchoServerHandler();
try {ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.option(ChannelOption.SO_BACKLOG, 100)
.handler(new LoggingHandler(LogLevel.INFO))
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) {ChannelPipeline p = ch.pipeline();
if (sslCtx != null) {p.addLast(sslCtx.newHandler(ch.alloc()));
}
//p.addLast(new LoggingHandler(LogLevel.INFO));
p.addLast(serverHandler);
}
});
// Start the server.
ChannelFuture f = b.bind(PORT).sync();
// Wait until the server socket is closed.
f.channel().closeFuture().sync();} finally {
// Shut down all event loops to terminate all threads.
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();}
}
}
疏忽掉 ssl 局部,首先构建了两个 EventLoopGroup 实例,在这过程中产生了什么后面曾经说过了,而后是设置 channel 和 handler 以及 childHandler,最终调用 bind(),上面会解释 netty 外部都做了些什么。
AbstractBootstrap
下面的例子执行 bind()办法后,最初进入到 AbstractBootstrap.doBind()办法中
private ChannelFuture doBind(final SocketAddress localAddress) {final ChannelFuture regFuture = initAndRegister();
final Channel channel = regFuture.channel();
if (regFuture.cause() != null) {return regFuture;}
// 疏忽掉一些细节,最次要的就是执行上面的一段代码
ChannelPromise promise = channel.newPromise();
doBind0(regFuture, channel, localAddress, promise);
return promise;
}
}
initAndRegister
initAndRegister()负责创立和初始化 channel,并返回 ChannelFuture 对象用于后续增加监听器来异步的解决后续的工作。
initAndRegister 是初始化流程中十分重要的一步,channel 的构建,注册, eventLoop 线程启动都是在这之中,上面会顺次注明 netty 是如何初始化这几个对象,一些重要的说明会在代码中正文
final ChannelFuture initAndRegister() {
Channel channel = null;
try {
// 结构 netty 的 channel 实例,也是这个时候和 java nio 中的原生 channel 做绑定
channel = channelFactory.newChannel();
// 初始化 channel,server 端和客户端的初始逻辑不同,server 端会为 pipelinee 额定增加名为 ServerBootstrapAcceptor 的 handler,而客户端只会增加初始化时用户指定的 handler
init(channel);
} catch (Throwable t) {if (channel != null) {// channel can be null if newChannel crashed (eg SocketException("too many open files"))
channel.unsafe().closeForcibly();
// as the Channel is not registered yet we need to force the usage of the GlobalEventExecutor
return new DefaultChannelPromise(channel, GlobalEventExecutor.INSTANCE).setFailure(t);
}
// as the Channel is not registered yet we need to force the usage of the GlobalEventExecutor
return new DefaultChannelPromise(new FailedChannel(), GlobalEventExecutor.INSTANCE).setFailure(t);
}
// 为 channel 执行 register 注册逻辑,次要实现 nio 中 channel 的 register 操作,nio 的 selector 在 eventloop 初始化时就曾经创立好了
ChannelFuture regFuture = config().group().register(channel);
if (regFuture.cause() != null) {if (channel.isRegistered()) {channel.close();
} else {channel.unsafe().closeForcibly();}
}
return regFuture;
}
- init(channel) 初始化 channel,server 端和客户端的初始逻辑不同,server 端会为 pipeline 额定增加名为 ServerBootstrapAcceptor 的 handler,而客户端只会增加初始化时用户指定的 handler.
- config().group().register(channel) 为 channel 执行 register 注册逻辑,次要实现 nio 中 channel 的 register 操作, selector 在 eventloop 初始化时就曾经创立好了。
而 eventloop 轮询线程的启动也是在调用 register() 时触发的。
首先调用MultithreadEventLoopGroup.register()
public ChannelFuture register(Channel channel) {return next().register(channel);
}
next()就是从 children 中获取下一个 eventloop, 获取具体的 eventloop 实例后首先通过 SingleThreadEventLoop 抽象类把 channel 包装成 ChannelPromise(channelFuture 接口的可写模式)并获取 unsafe()来实现底层的 register 性能
public ChannelFuture register(final ChannelPromise promise) {ObjectUtil.checkNotNull(promise, "promise");
promise.channel().unsafe().register(this, promise);
return promise;
}
public ChannelFuture register(final Channel channel, final ChannelPromise promise) {ObjectUtil.checkNotNull(promise, "promise");
ObjectUtil.checkNotNull(channel, "channel");
channel.unsafe().register(this, promise);
return promise;
}
最初由 AbstractChannel 抽象类的外部类 AbstractUnsafe 来实现底层的 register 操作
public final void register(EventLoop eventLoop, final ChannelPromise promise) {ObjectUtil.checkNotNull(eventLoop, "eventLoop");
// 已注册间接返回,future 标记谬误
if (isRegistered()) {promise.setFailure(new IllegalStateException("registered to an event loop already"));
return;
}
// 校验是否兼容
if (!isCompatible(eventLoop)) {
promise.setFailure(new IllegalStateException("incompatible event loop type:" + eventLoop.getClass().getName()));
return;
}
// 为 channel 绑定 eventLoop
AbstractChannel.this.eventLoop = eventLoop;
// 查看以后线程是否为 ecentLoo 绑定线程,绑定线程是在启动 ecentloop 时设置的
if (eventLoop.inEventLoop()) {
// 调用理论绑定办法
register0(promise);
} else {
try {// 调用 execure 来执行 refister0()理论注册逻辑
eventLoop.execute(new Runnable() {
@Override
public void run() {register0(promise);
}
});
} catch (Throwable t) {
logger.warn("Force-closing a channel whose registration task was not accepted by an event loop: {}",
AbstractChannel.this, t);
closeForcibly();
closeFuture.setClosed();
safeSetFailure(promise, t);
}
}
}
通过一些校验后,通过 register0()中的 doRegister() 来实现理论注册操作
private void register0(ChannelPromise promise) {
try {
// check if the channel is still open as it could be closed in the mean time when the register
// call was outside of the eventLoop
if (!promise.setUncancellable() || !ensureOpen(promise)) {return;}
boolean firstRegistration = neverRegistered;
doRegister();// 实现 channel 的注册
neverRegistered = false;
registered = true;// 标记为已注册
... pipeline 局部逻辑省略
}
doRegister()的实现则在 AbstractNioChannel 抽象类中
protected void doRegister() throws Exception {
boolean selected = false;
for (;;) {
try {selectionKey = javaChannel().register(eventLoop().unwrappedSelector(), 0, this);
return;
} catch (CancelledKeyException e) {...}
}
到这里 netty 才实现理论的 java NioChannel 的注册逻辑。
EventLoop 线程
下面讲到了如何为 channel 实现 register 绑定操作,那么回到正题 EventLoop 的轮询调度线程是何时被启动的呢?其实就是在方才 AbstractChannel 调用 register 时启动的。
public final void register(EventLoop eventLoop, final ChannelPromise promise) {
...
if (eventLoop.inEventLoop()) {register0(promise);
} else {
try {eventLoop.execute(new Runnable() {
@Override
public void run() {register0(promise);
}
});
} catch (Throwable t) {...}
}
}
register 会调用 eventLoop 的 execute 办法来执行 register0,当初来看一下 execute 做了些什么。首先会进入到SingleThreadEventExecutor.execute
private void execute(Runnable task, boolean immediate) {
//inEventLoop 判断以后线程是否与 SingleThreadEventExecutor.thread 雷同,thread 是在启动 loop 线程时设置的,所以为启动前为 null
boolean inEventLoop = inEventLoop();
// 增加工作至队列
addTask(task);
if (!inEventLoop) {
// 启动 eventLoop 线程
startThread();
if (isShutdown()) {
boolean reject = false;
try {if (removeTask(task)) {reject = true;}
} catch (UnsupportedOperationException e) {1}
if (reject) {reject();
}
}
}
if (!addTaskWakesUp && immediate) {wakeup(inEventLoop);
}
}
inEventLoop 判断以后线程是否与 SingleThreadEventExecutor.thread 雷同,thread 是在启动 loop 线程时设置的,所以未启动前为 null。
SingleThreadEventExecutor先将要执行的工作增加至队列,上文提到的 register 工作也会增加至该队列,队列初始则是由上文提到的 NioEventLoop 结构器来实现的。
startThread 办法次要是对状态字段 state 作 CAS 查看并执行 doStartThread()
private void startThread() {if (state == ST_NOT_STARTED) {if (STATE_UPDATER.compareAndSet(this, ST_NOT_STARTED, ST_STARTED)) {
boolean success = false;
try {doStartThread();
success = true;
} finally {if (!success) {STATE_UPDATER.compareAndSet(this, ST_STARTED, ST_NOT_STARTED);
}
}
}
}
}
doStartThread 通过 executor 来执行真正负责轮询逻辑的 SingleThreadEventExecutor.this.run() 办法, 另外 executor 能够通过 NioEventLoopGroup 结构器指定, 默认应用 ThreadPerTaskExecutor 每一次执行工作创立一个新线程执行工作。
private void doStartThread() {
assert thread == null;
executor.execute(new Runnable() {
@Override
public void run() {thread = Thread.currentThread();
// 查看是否中断
if (interrupted) {thread.interrupt();
}
boolean success = false;
// 更新工夫单位为纳秒
updateLastExecutionTime();
try {
// 执行具体的轮询工作,该办法为形象办法
SingleThreadEventExecutor.this.run();
success = true;
} catch (Throwable t) {logger.warn("Unexpected exception from an event executor:", t);
} finally {// 省略掉了 shutdown 后续确认逻辑,感兴趣的能够看一下源码}
}
});
}
SingleThreadEventExecutor.this.run()是形象办法,而在 netty 中
实现类默认为 NioEventLoop(NioEventLoop 由NioEventLoopGroup 确定,反对 epoll 的 linux 中用户也能够通过指定 EpollEventLoopGroup 来获取 EpollEventLoop,java nio 其实曾经反对 epoll 操作,不过相比 nio 来说 EpollEventLoop 性能更好些,因为采纳 ET 模式,同时更少的 gc, 因为执行 run 的大抵逻辑雷同,这里就基于罕用的 NioEventLoop 来阐明)
run 次要执行以下步骤:
- 查看以后是否有工作,如果有通过 supplier 非阻塞调用 select 获取事件个数 (selectNow 办法,即 select 超时工夫设置 0),否则返回SelectStrategy.SELECT 枚举示意进行阻塞 select
- 获取 scheduleTask 定时工作堆顶工作的 deadline 工夫,如果枚举为 SELECT 则先通过 deadline 计算 timeout 并 select 阻塞。
- 获取 ioRatio 参数,该值决定一次轮询解决工作的 预计最大工夫 = io 等待时间 *(100 – ratio)/ratio,假如 ratio 为 50 则解决工作最大耗时为 io 工夫雷同,默认设置为 50.
- 调用 processSelectedKeys 解决 selectKeys,通过 ioRatio 取得解决工作最大工夫并执行工作。
上面是源码附正文:
protected void run() {
// 计数器记录 select 次数
int selectCnt = 0;
for (;;) {
try {
int strategy;
try {// 以后有工作通过 supplier 非阻塞调用 select 获取事件个数,否则返回 SelectStrategy.SELECT 枚举示意进行阻塞 select
strategy = selectStrategy.calculateStrategy(selectNowSupplier, hasTasks());
switch (strategy) {
case SelectStrategy.CONTINUE:
continue;
case SelectStrategy.BUSY_WAIT:
// fall-through to SELECT since the busy-wait is not supported with NIO
case SelectStrategy.SELECT:
// 获取 schedule 定时工作的 deadline
long curDeadlineNanos = nextScheduledTaskDeadlineNanos();
if (curDeadlineNanos == -1L) {curDeadlineNanos = NONE; // nothing on the calendar}
nextWakeupNanos.set(curDeadlineNanos);
try {if (!hasTasks()) {strategy = select(curDeadlineNanos);
}
} finally {
// This update is just to help block unnecessary selector wakeups
// so use of lazySet is ok (no race condition)
nextWakeupNanos.lazySet(AWAKE);
}
// fall through
default:
}
} catch (IOException e) {
// If we receive an IOException here its because the Selector is messed up. Let's rebuild
// the selector and retry. https://github.com/netty/netty/issues/8566
rebuildSelector0();
selectCnt = 0;
handleLoopException(e);
continue;
}
selectCnt++;
cancelledKeys = 0;
needsToSelectAgain = false;
//ioRatio 默认为 50
final int ioRatio = this.ioRatio;
boolean ranTasks;
//100 一次性执行全副工作
if (ioRatio == 100) {
try {if (strategy > 0) {processSelectedKeys();
}
} finally {
// Ensure we always run tasks.
ranTasks = runAllTasks();}
} else if (strategy > 0) {final long ioStartTime = System.nanoTime();
try {
// 解决 selectKey
processSelectedKeys();} finally {
// Ensure we always run tasks.
final long ioTime = System.nanoTime() - ioStartTime;
// 依据 ioRatio 设置工作的解决工夫
ranTasks = runAllTasks(ioTime * (100 - ioRatio) / ioRatio);
}
} else {ranTasks = runAllTasks(0); // This will run the minimum number of tasks
}
if (ranTasks || strategy > 0) {if (selectCnt > MIN_PREMATURE_SELECTOR_RETURNS && logger.isDebugEnabled()) {logger.debug("Selector.select() returned prematurely {} times in a row for Selector {}.",
selectCnt - 1, selector);
}
selectCnt = 0;
} else if (unexpectedSelectorWakeup(selectCnt)) {// Unexpected wakeup (unusual case)
selectCnt = 0;
}
} catch (CancelledKeyException e) {
// Harmless exception - log anyway
if (logger.isDebugEnabled()) {logger.debug(CancelledKeyException.class.getSimpleName() + "raised by a Selector {} - JDK bug?",
selector, e);
}
} catch (Throwable t) {handleLoopException(t);
}
// Always handle shutdown even if the loop processing threw an exception.
try {if (isShuttingDown()) {closeAll();
if (confirmShutdown()) {return;}
}
} catch (Throwable t) {handleLoopException(t);
}
}
}
对于轮询逻辑还有两个比拟重要的点:
- 这里解决工作最大工夫是预估工夫,原理是串行解决工作时刷新并比对 deadline 工夫,可想而知如果工作是阻塞的就会重大影响 nioEventLoop 的性能,因而要求用户不要执行阻塞的工作。
- 仔细的可能留神到有一个计数变量 selectCnt 用来示意 select 次数,在开端处会调用 unexpectedSelectorWakeup 办法校验 selectCnt 数值是否超出阈值(默认 512), 如果超出阈值则调用 rebuildSelector 从新创立新的 selector,并把旧的 selector 已注册的 channel 重新加入到新的 selector 中.
private boolean unexpectedSelectorWakeup(int selectCnt) {
... 省略局部代码
// 查看 selectCnt 是否超出阈值,如果超出阈值则调用 rebuildSelector 从新创立新的 selector,并把旧的 selector 已注册的 channel 重新加入到新的 selector 中
if (SELECTOR_AUTO_REBUILD_THRESHOLD > 0 &&
selectCnt >= SELECTOR_AUTO_REBUILD_THRESHOLD) {
// The selector returned prematurely many times in a row.
// Rebuild the selector to work around the problem.
logger.warn("Selector.select() returned prematurely {} times in a row; rebuilding Selector {}.",
selectCnt, selector);
rebuildSelector();
return true;
}
return false;
}
为什么超出阈值就要从新结构 selector 呢,这来源于 jdk nio 一个有名的 epoll cpu 100% 的 bug,jdk-bug-6403933,该 bug 使 select 不能失常阻塞,因用户 nio 线程空转导致 cpu 飙升,所以 netty 应用计数的形式来检测该问题,并从新构建 selector。
worker、boss 的分工
BootStrap 初始化的时候会要求设置 worker 和 boss,例如 EchoServer 例子中的 bossGroup 和 workerGroup,个别介绍中都会说到 boss 负责 accpet,worker 负责 IO 事件如下图,上面来解析 netty 如何调度流转 worker 和 boss
首先 Java NIO server 须要指定 ServerSocketChannel 来实现 bind、accept 等操作,而 netty 则是对 NIO 的封装,那么 ServerSocketChannel 是在何时结构的呢,是在调用 initAndRegister 是,initAndRegister 时会调用 channelFactory.newChannel()
final ChannelFuture initAndRegister() {
Channel channel = null;
try {channel = channelFactory.newChannel();
init(channel);
}
....
}
channelFactory 须要指定 class 来实例化 channel,这个 channel 即便在结构 BootStrap 时申明的,如下实例申明的 NioServerSocketChannel.class
ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.option(ChannelOption.SO_BACKLOG, 100)
.handler(new LoggingHandler(LogLevel.INFO))
之后就执行 BossGroup 下的 EventLoop 轮询逻辑,到目前为止其实与 Worker 还没有关系,先看一下如果用户申明的是 ServerBootstrap,那么在调用 initAndRegester 时会调用 init(),init 又一个重要操作就是通过匿名函数将 ServerBootstrapAcceptor 退出到链表尾部。
void init(Channel channel) {
// 设置参数
setChannelOptions(channel, newOptionsArray(), logger);
setAttributes(channel, attrs0().entrySet().toArray(EMPTY_ATTRIBUTE_ARRAY));
// 获取 pipelone
ChannelPipeline p = channel.pipeline();
final EventLoopGroup currentChildGroup = childGroup;
final ChannelHandler currentChildHandler = childHandler;
final Entry<ChannelOption<?>, Object>[] currentChildOptions;
synchronized (childOptions) {currentChildOptions = childOptions.entrySet().toArray(EMPTY_OPTION_ARRAY);
}
final Entry<AttributeKey<?>, Object>[] currentChildAttrs = childAttrs.entrySet().toArray(EMPTY_ATTRIBUTE_ARRAY);
// 将 handler 增加至尾部
p.addLast(new ChannelInitializer<Channel>() {
@Override
public void initChannel(final Channel ch) {final ChannelPipeline pipeline = ch.pipeline();
ChannelHandler handler = config.handler();
if (handler != null) {pipeline.addLast(handler);
}
ch.eventLoop().execute(new Runnable() {
@Override
public void run() {
pipeline.addLast(new ServerBootstrapAcceptor(ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));
}
});
}
});
}
ServerBootstrapAcceptor 是 InboundHandler 类型,会把 BossGroup 下的 Eventloop 监听到的 Channel 事件注册到 childGroup(即 worker)下,代码如下
public void channelRead(ChannelHandlerContext ctx, Object msg) {final Channel child = (Channel) msg;
// 将 childHandler 退出链表
child.pipeline().addLast(childHandler);
setChannelOptions(child, childOptions, logger);
setAttributes(child, childAttrs);
try {
// 将 channel 注册至 childGroup,即 workerGroup 下
childGroup.register(child).addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {if (!future.isSuccess()) {forceClose(child, future.cause());
}
}
});
} catch (Throwable t) {
// 出现异常敞开 channel
forceClose(child, t);
}
}
到此就实现了 boss 和 worker 之间的流转,至于 channel 是如何流转,pipeline 原理等其余内容就放到 Channel 篇来讲述。