共计 19944 个字符,预计需要花费 50 分钟才能阅读完成。
Netty 服务端启动流程源码剖析
前记
哈喽,自从上篇《Netty 之旅二:口口相传的高性能 Netty 到底是什么?》后,迟迟两周才开启明天的 Netty
源码系列。源码剖析的第一篇文章,下一篇我会分享客户端的启动过程源码剖析。通过源码的浏览,咱们将会晓得,Netty
服务端启动的调用链是十分长的,同时必定也会发现一些新的问题,随着咱们源码浏览的不断深入,置信这些问题咱们也会一一攻破。
废话不多说,间接上号!
一、从 EchoServer 示例动手
示例从哪里来?任何开源框架都会有本人的示例代码,Netty 源码也不例外,如模块 netty-example
中就包含了最常见的 EchoServer
示例,上面通过这个示例进入服务端启动流程篇章。
public final class EchoServer {static final boolean SSL = System.getProperty("ssl") != null; | |
static final int PORT = Integer.parseInt(System.getProperty("port", "8007")); | |
public static void main(String[] args) throws Exception { | |
// Configure SSL. | |
final SslContext sslCtx; | |
if (SSL) {SelfSignedCertificate ssc = new SelfSignedCertificate(); | |
sslCtx = SslContextBuilder.forServer(ssc.certificate(), ssc.privateKey()).build();} else {sslCtx = null;} | |
// 1. 申明 Main-Sub Reactor 模式线程池:EventLoopGroup | |
// Configure the server. | |
EventLoopGroup bossGroup = new NioEventLoopGroup(1); | |
EventLoopGroup workerGroup = new NioEventLoopGroup(); | |
// 创立 EchoServerHandler 对象 | |
final EchoServerHandler serverHandler = new EchoServerHandler(); | |
try { | |
// 2. 申明服务端启动疏导器, 并设置相干属性 | |
ServerBootstrap b = new ServerBootstrap(); | |
b.group(bossGroup, workerGroup) | |
.channel(NioServerSocketChannel.class) | |
.option(ChannelOption.SO_BACKLOG, 100) | |
.handler(new LoggingHandler(LogLevel.INFO)) | |
.childHandler(new ChannelInitializer<SocketChannel>() { | |
@Override | |
public void initChannel(SocketChannel ch) throws Exception {ChannelPipeline p = ch.pipeline(); | |
if (sslCtx != null) {p.addLast(sslCtx.newHandler(ch.alloc())); | |
} | |
//p.addLast(new LoggingHandler(LogLevel.INFO)); | |
p.addLast(serverHandler); | |
} | |
}); | |
// 3. 绑定端口即启动服务端,并同步期待 | |
// Start the server. | |
ChannelFuture f = b.bind(PORT).sync(); | |
// 4. 监听服务端敞开,并阻塞期待 | |
// Wait until the server socket is closed. | |
f.channel().closeFuture().sync();} finally { | |
// 5. 优雅地敞开两个 EventLoopGroup 线程池 | |
// Shut down all event loops to terminate all threads. | |
bossGroup.shutdownGracefully(); | |
workerGroup.shutdownGracefully();} | |
} | |
} |
- [代码行 18、19]申明
Main-Sub Reactor
模式线程池:EventLoopGroup
创立两个
EventLoopGroup
对象。其中,bossGroup
用于服务端承受客户端的连贯,workerGroup
用于进行客户端的SocketChannel
的数据读写。(<u> 对于
EventLoopGroup
不是本文重点所以在后续文章中进行剖析 </u>) - [代码行 23-39]申明服务端启动疏导器, 并设置相干属性
AbstractBootstrap
是一个帮忙类,通过办法链(method chaining
)的形式,提供了一个简略易用的形式来配置启动一个Channel
。io.netty.bootstrap.ServerBootstrap
,实现AbstractBootstrap
抽象类,用于Server
的启动器实现类。io.netty.bootstrap.Bootstrap
,实现AbstractBootstrap
抽象类,用于Client
的启动器实现类。如下类图所示:(<u> 在
EchoServer
示例代码中,咱们看到ServerBootstrap
的group
、channel
、option
、childHandler
等属性链式设置都放到对于AbstractBootstrap
体系代码中具体介绍。</u>) - [代码行 43]绑定端口即启动服务端,并同步期待
先调用
#bind(int port)
办法,绑定端口,后调用ChannelFuture#sync()
办法,阻塞期待胜利。对于bind
操作就是本文要具体介绍的 ” 服务端启动流程 ”。 - [代码行 47]监听服务端敞开,并阻塞期待
先调用
#closeFuture()
办法,监听服务器敞开,后调用ChannelFuture#sync()
办法,阻塞期待胜利。留神,此处不是敞开服务器,而是channel
的监听敞开。 - [代码行 51、52]优雅地敞开两个
EventLoopGroup
线程池finally
代码块中执行阐明服务端将最终敞开,所以调用EventLoopGroup#shutdownGracefully()
办法,别离敞开两个EventLoopGroup
对象,终止所有线程。
二、服务启动过程
在服务启动过程的源码剖析之前,这里回顾一下咱们在通过 JDK NIO
编程在服务端启动初始的代码:
serverSocketChannel = ServerSocketChannel.open(); | |
serverSocketChannel.configureBlocking(false); | |
serverSocketChannel.socket().bind(new InetSocketAddress(port), 1024); | |
selector = Selector.open(); | |
serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT); |
这 5 行代码标示一个最为相熟的过程:
- 关上
serverSocketChannel
- 配置非阻塞模式
- 为
channel
的socket
绑定监听端口 - 创立
Selector
- 将
serverSocketChannel
注册到selector
前面等剖析完 Netty
的启动过程后,会对这些步骤有一个新的意识。在 EchoServer
示例中,进入 #bind(int port)
办法,AbstractBootstrap#bind()
其实有多个办法,不便不同地址参数的传递,理论调用的办法是AbstractBootstrap#doBind(final SocketAddress localAddress)
办法,代码如下:
private ChannelFuture doBind(final SocketAddress localAddress) {final ChannelFuture regFuture = initAndRegister(); | |
final Channel channel = regFuture.channel(); | |
if (regFuture.cause() != null) {return regFuture;} | |
if (regFuture.isDone()) { | |
// At this point we know that the registration was complete and successful. | |
ChannelPromise promise = channel.newPromise(); | |
doBind0(regFuture, channel, localAddress, promise); | |
return promise; | |
} else { | |
// Registration future is almost always fulfilled already, but just in case it's not. | |
final PendingRegistrationPromise promise = new PendingRegistrationPromise(channel); | |
regFuture.addListener(new ChannelFutureListener() { | |
@Override | |
public void operationComplete(ChannelFuture future) throws Exception {Throwable cause = future.cause(); | |
if (cause != null) { | |
// Registration on the EventLoop failed so fail the ChannelPromise directly to not cause an | |
// IllegalStateException once we try to access the EventLoop of the Channel. | |
promise.setFailure(cause); | |
} else { | |
// Registration was successful, so set the correct executor to use. | |
// See https://github.com/netty/netty/issues/2586 | |
promise.registered(); | |
doBind0(regFuture, channel, localAddress, promise); | |
} | |
} | |
}); | |
return promise; | |
} | |
} |
- [代码行 2]:调用
#initAndRegister()
办法,初始化并注册一个Channel
对象。因为注册是异步的过程,所以返回一个ChannelFuture
对象。具体解析,见「initAndRegister()
」。 - [代码行 4 -6]]:若产生异样,间接进行返回。
-
[代码行 9 -34]:因为注册是异步的过程,有可能已实现,有可能未实现。所以实现代码分成了【第 10 至 14 行】和【第 15 至 36 行】别离解决已实现和未实现的状况。
- 外围在[第 11、29 行],调用
#doBind0(final ChannelFuture regFuture, final Channel channel, final SocketAddress localAddress, final ChannelPromise promise)
办法,绑定 Channel 的端口,并注册 Channel 到SelectionKey
中。 - 如果异步注册对应的
ChanelFuture
未实现,则调用ChannelFuture#addListener(ChannelFutureListener)
办法,增加监听器,在注册实现后,进行回调执行#doBind0(...)
办法的逻辑。
- 外围在[第 11、29 行],调用
通过 doBind
办法能够晓得服务端启动流程大抵如下几个步骤:
1. 创立 Channel
从 #doBind(final SocketAddress localAddress)
进入到initAndRegister()
:
final ChannelFuture initAndRegister() { | |
Channel channel = null; | |
try {channel = channelFactory.newChannel(); | |
init(channel); | |
} catch (Throwable t) {if (channel != null) {// channel can be null if newChannel crashed (eg SocketException("too many open files")) | |
channel.unsafe().closeForcibly(); | |
// as the Channel is not registered yet we need to force the usage of the GlobalEventExecutor | |
return new DefaultChannelPromise(channel, GlobalEventExecutor.INSTANCE).setFailure(t); | |
} | |
// as the Channel is not registered yet we need to force the usage of the GlobalEventExecutor | |
return new DefaultChannelPromise(new FailedChannel(), GlobalEventExecutor.INSTANCE).setFailure(t); | |
} | |
ChannelFuture regFuture = config().group().register(channel); | |
if (regFuture.cause() != null) {if (channel.isRegistered()) {channel.close(); | |
} else {channel.unsafe().closeForcibly();} | |
} | |
return regFuture; | |
} |
[代码行 4]调用 ChannelFactory#newChannel()
办法,创立 Channel
对象。ChannelFactory
类继承如下:
能够在 ChannelFactory
正文看到 @deprecated Use {@link io.netty.channel.ChannelFactory} instead.
,这里只是包名的调整,对于继承构造不变。netty
默认应用ReflectiveChannelFactory
,咱们能够看到重载办法:
@Override | |
public T newChannel() { | |
try {return constructor.newInstance(); | |
} catch (Throwable t) {throw new ChannelException("Unable to create Channel from class" + constructor.getDeclaringClass(), t); | |
} | |
} |
很显著,正如其名是通过反射机制结构 Channel
对象实例的。constructor
是在其构造方法初始化的:this.constructor = clazz.getConstructor();
这个 clazz
按理说应该是咱们要创立的 Channel
的 Class 对象。那 Class
对象是什么呢?咱们接着看 channelFactory
是怎么初始化的。
首先在 AbstractBootstrap
找到如下代码:
@Deprecated | |
public B channelFactory(ChannelFactory<? extends C> channelFactory) {ObjectUtil.checkNotNull(channelFactory, "channelFactory"); | |
if (this.channelFactory != null) {throw new IllegalStateException("channelFactory set already"); | |
} | |
this.channelFactory = channelFactory; | |
return self();} |
调用这个办法的递推向上看到:
public B channel(Class<? extends C> channelClass) { | |
return channelFactory(new ReflectiveChannelFactory<C>(ObjectUtil.checkNotNull(channelClass, "channelClass") | |
)); | |
} |
这个办法正是在 EchoServer
中ServerBootstrap
链式设置时调用 .channel(NioServerSocketChannel.class)
的办法。咱们看到,channelClass
就是 NioServerSocketChannel.class
,channelFactory
也是以 ReflectiveChannelFactory
作为具体实例,并且将 NioServerSocketChannel.class
作为结构参数传递初始化的,所以这答复了反射机制结构的是 io.netty.channel.socket.nio.NioServerSocketChannel
对象。
持续看 NioServerSocketChannel
构造方法逻辑做了什么事件,看之前先给出 NioServerSocketChannel
类继承关系:
NioServerSocketChannel
与 NioSocketChannel
别离对应服务端和客户端,公共父类都是 AbstractNioChannel
和AbstractChannel
,上面介绍创立过程能够参照这个 Channel
类继承图。进入 NioServerSocketChannel
构造方法:
/** | |
* Create a new instance | |
*/ | |
public NioServerSocketChannel() {this(newSocket(DEFAULT_SELECTOR_PROVIDER)); | |
} |
点击 newSocket
进去:
private static ServerSocketChannel newSocket(SelectorProvider provider) { | |
try { | |
/** | |
* Use the {@link SelectorProvider} to open {@link SocketChannel} and so remove condition in | |
* {@link SelectorProvider#provider()} which is called by each ServerSocketChannel.open() otherwise. | |
* | |
* See <a href="https://github.com/netty/netty/issues/2308">#2308</a>. | |
*/ | |
return provider.openServerSocketChannel();} catch (IOException e) { | |
throw new ChannelException("Failed to open a server socket.", e); | |
} | |
} |
以上传进来的 provider
是DEFAULT_SELECTOR_PROVIDER
即默认的 java.nio.channels.spi.SelectorProvider
,[代码行 9] 就是相熟的 jdk nio
创立 ServerSocketChannel
。这样newSocket(DEFAULT_SELECTOR_PROVIDER)
就返回了后果 ServerSocketChannel
,回到NioServerSocketChannel()#this()
点进去:
/** | |
* Create a new instance using the given {@link ServerSocketChannel}. | |
*/ | |
public NioServerSocketChannel(ServerSocketChannel channel) {super(null, channel, SelectionKey.OP_ACCEPT); | |
config = new NioServerSocketChannelConfig(this, javaChannel().socket()); | |
} |
以上 super
代表父类 AbstractNioMessageChannel
构造方法,点进去看到:
/** | |
* @see AbstractNioChannel#AbstractNioChannel(Channel, SelectableChannel, int) | |
*/ | |
protected AbstractNioMessageChannel(Channel parent, SelectableChannel ch, int readInterestOp) {super(parent, ch, readInterestOp); | |
} |
以上 super
代表父类 AbstractNioChannel
构造方法,点进去看到:
protected AbstractNioChannel(Channel parent, SelectableChannel ch, int readInterestOp) {super(parent); | |
this.ch = ch; | |
this.readInterestOp = readInterestOp; | |
try {ch.configureBlocking(false); | |
} catch (IOException e) { | |
try {ch.close(); | |
} catch (IOException e2) {if (logger.isWarnEnabled()) {logger.warn("Failed to close a partially initialized socket.", e2); | |
} | |
} | |
throw new ChannelException("Failed to enter non-blocking mode.", e); | |
} | |
} |
以上 [代码行 3] 将ServerSocketChannel
保留到了 AbstractNioChannel#ch
成员变量,在下面提到的 NioServerSocketChannel
构造方法的 [代码行 6]javaChannel()
拿到的就是 ch
保留的 ServerSocketChannel
变量。
以上 [代码行 6] 就是相熟的 jdk nio
编程设置 ServerSocketChannel
非阻塞形式。这里还有 super
父类构造方法,点击进去看到:
protected AbstractChannel(Channel parent) { | |
this.parent = parent; | |
id = newId(); | |
unsafe = newUnsafe(); | |
pipeline = newChannelPipeline();} |
以上构造方法中:
parent
属性,代表父Channel
对象。对于NioServerSocketChannel
的parent
为null
。id
属性,Channel
编号对象。在构造方法中,通过调用#newId()
办法进行创立。(<u> 这里不细开展 Problem-1</u>)unsafe
属性,Unsafe
对象。因为Channel
真正的具体操作,是通过调用对应的Unsafe
对象施行。所以须要在构造方法中,通过调用#newUnsafe()
办法进行创立。这里的Unsafe
并不是咱们常说的jdk
自带的sun.misc.Unsafe
,而是io.netty.channel.Channel#Unsafe
。(<u> 这里不细开展 Problem-2</u>)pipeline
属性默认是DefaultChannelPipeline
对象,赋值后在前面为 channel 绑定端口的时候会用到
通过以上创立 channel
源码过程剖析,总结的流程时序图如下:
2. 初始化 Channel
回到一开始创立 Channel
的initAndRegister()
入口办法,在创立 Channel
后紧接着 init(channel)
进入初始化流程,因为是服务端初始化,所以是ServerBootstrap#init(Channel channel)
,代码如下:
@Override | |
void init(Channel channel) throws Exception {final Map<ChannelOption<?>, Object> options = options0(); | |
synchronized (options) {setChannelOptions(channel, options, logger); | |
} | |
final Map<AttributeKey<?>, Object> attrs = attrs0(); | |
synchronized (attrs) {for (Entry<AttributeKey<?>, Object> e: attrs.entrySet()) {@SuppressWarnings("unchecked") | |
AttributeKey<Object> key = (AttributeKey<Object>) e.getKey(); | |
channel.attr(key).set(e.getValue()); | |
} | |
} | |
ChannelPipeline p = channel.pipeline(); | |
final EventLoopGroup currentChildGroup = childGroup; | |
final ChannelHandler currentChildHandler = childHandler; | |
final Entry<ChannelOption<?>, Object>[] currentChildOptions; | |
final Entry<AttributeKey<?>, Object>[] currentChildAttrs; | |
synchronized (childOptions) {currentChildOptions = childOptions.entrySet().toArray(newOptionArray(0)); | |
} | |
synchronized (childAttrs) {currentChildAttrs = childAttrs.entrySet().toArray(newAttrArray(0)); | |
} | |
p.addLast(new ChannelInitializer<Channel>() { | |
@Override | |
public void initChannel(final Channel ch) throws Exception {final ChannelPipeline pipeline = ch.pipeline(); | |
ChannelHandler handler = config.handler(); | |
if (handler != null) {pipeline.addLast(handler); | |
} | |
ch.eventLoop().execute(new Runnable() { | |
@Override | |
public void run() { | |
pipeline.addLast(new ServerBootstrapAcceptor(ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs)); | |
} | |
}); | |
} | |
}); | |
} |
- [代码 3 – 6 行]:
options0()
办法返回的options
保留了用户在EchoServer
中设置自定义的可选项汇合,这样ServerBootstrap
将配置的选项汇合,设置到了Channel
的可选项汇合中。 - [代码 8 – 15 行]:
attrs0()
办法返回的attrs
保留了用户在EchoServer
中设置自定义的属性汇合,这样ServerBootstrap
将配置的属性汇合,设置到了Channel
的属性汇合中。 - [代码 21-28 行]:通过局部变量
currentChildOptions
和currentChildAttrs
保留了用户自定义的childOptions
和childAttrs
,用于[代码 43 行]ServerBootstrapAcceptor
构造方法。 -
[代码 30-47]]:创立
ChannelInitializer
对象,增加到pipeline
中,用于后续初始化ChannelHandler
到pipeline
中, 包含用户在EchoServer
配置的LoggingHandler
和创立的创立ServerBootstrapAcceptor
对象。- [代码行 34-37]:增加启动器配置的
LoggingHandler
到pipeline
中。 - [代码行 39-45]:创立
ServerBootstrapAcceptor
对象,增加到pipeline
中。从名字上就可以看进去,ServerBootstrapAcceptor
也是一个ChannelHandler
实现类,专门用于承受客户端的新连贯申请,把新的申请扔给某个事件循环器,咱们先不做过多剖析。咱们发现是应用EventLoop.execute
执行增加的过程,这是为什么呢?同样记录问题(<u>Problem-</u>3) - 须要阐明的是
pipeline
在之前介绍 Netty 外围组件的时候提到是一个蕴含ChannelHandlerContext
的双向链表,每一个context
对于惟一一个ChannelHandler
,这里初始化后,ChannelPipeline
里就是如下一个构造:
- [代码行 34-37]:增加启动器配置的
3. 注册 Channel
初始化 Channel
一些根本配置和属性结束后,回到一开始创立 Channel
的initAndRegister()
入口办法,在初始化 Channel
后紧接着 [代码行 17] ChannelFuture regFuture = config().group().register(channel);
显著这里是通过 EventLoopGroup
进入注册流程 (EventLoopGroup
体系将在后续文章解说)
在 EchoServer
中启动器同样通过 ServerBootstrap#group()
设置了 NioEventLoopGroup
,它继承自MultithreadEventLoopGroup
,所以注册流程会进入MultithreadEventLoopGroup
重载的 register(Channel channel)
办法,代码如下:
@Override | |
public ChannelFuture register(Channel channel) {return next().register(channel); | |
} |
这里会调用 next()
办法抉择进去一个 EventLoop
来注册 Channel
,外面实际上应用的是一个叫做 EventExecutorChooser
的货色来抉择,它实际上又有两种实现形式 ——PowerOfTwoEventExecutorChooser
和 GenericEventExecutorChooser
,实质上就是从 EventExecutor
数组中抉择一个 EventExecutor
,咱们这里就是 NioEventLoop
,那么,它们有什么区别呢?(<u>Problem-4: 在介绍EventLoopGroup
体系的后续文章中将会具体解说,这里简略地提一下,实质都是按数组长度取余数,不过,2 的 N 次方的模式更高效。</u>)
接着,来到 NioEventLoop
的 register(channel)
办法,你会不会问找不到该办法?提醒NioEventLoop
继承SingleThreadEventLoop
,所以父类办法:
@Override | |
public ChannelFuture register(Channel channel) {return register(new DefaultChannelPromise(channel, this)); | |
} | |
@Override | |
public ChannelFuture register(final ChannelPromise promise) {ObjectUtil.checkNotNull(promise, "promise"); | |
promise.channel().unsafe().register(this, promise); | |
return promise; | |
} |
能够看到,先创立了一个叫做 ChannelPromise
的货色,它是 ChannelFuture
的子类。[代码行 9] 又调回了 Channel
的 Unsafe
的 register ()
办法,这里第一个参数是 this
,也就是 NioEventLoop
,第二个参数是刚创立的 ChannelPromise
。
点击 AbstractUnsafe#register(EventLoop eventLoop, final ChannelPromise promise)
办法进去,代码如下:
public final void register(EventLoop eventLoop, final ChannelPromise promise) {if (eventLoop == null) {throw new NullPointerException("eventLoop"); | |
} | |
if (isRegistered()) {promise.setFailure(new IllegalStateException("registered to an event loop already")); | |
return; | |
} | |
if (!isCompatible(eventLoop)) { | |
promise.setFailure(new IllegalStateException("incompatible event loop type:" + eventLoop.getClass().getName())); | |
return; | |
} | |
AbstractChannel.this.eventLoop = eventLoop; | |
if (eventLoop.inEventLoop()) {register0(promise); | |
} else { | |
try {eventLoop.execute(new Runnable() { | |
@Override | |
public void run() {register0(promise); | |
} | |
}); | |
} catch (Throwable t) { | |
logger.warn("Force-closing a channel whose registration task was not accepted by an event loop: {}", | |
AbstractChannel.this, t); | |
closeForcibly(); | |
closeFuture.setClosed(); | |
safeSetFailure(promise, t); | |
} | |
} | |
} |
[代码行 15]这行代码是设置 Channel
的 eventLoop
属性。这行后面的代码次要是在校验传入的 eventLoop
参数非空,校验是否有注册过以及校验 Channel
和 eventLoop
类型是否匹配。
[代码 18、24]接着,跟踪到 AbstractUnsafe#register0(ChannelPromise promise)
办法中:
private void register0(ChannelPromise promise) { | |
try { | |
// check if the channel is still open as it could be closed in the mean time when the register | |
// call was outside of the eventLoop | |
if (!promise.setUncancellable() || !ensureOpen(promise)) {return;} | |
boolean firstRegistration = neverRegistered; | |
doRegister(); | |
neverRegistered = false; | |
registered = true; | |
// Ensure we call handlerAdded(...) before we actually notify the promise. This is needed as the | |
// user may already fire events through the pipeline in the ChannelFutureListener. | |
pipeline.invokeHandlerAddedIfNeeded(); | |
safeSetSuccess(promise); | |
pipeline.fireChannelRegistered(); | |
// Only fire a channelActive if the channel has never been registered. This prevents firing | |
// multiple channel actives if the channel is deregistered and re-registered. | |
if (isActive()) {if (firstRegistration) {pipeline.fireChannelActive(); | |
} else if (config().isAutoRead()) {// This channel was registered before and autoRead() is set. This means we need to begin read | |
// again so that we process inbound data. | |
// | |
// See https://github.com/netty/netty/issues/4805 | |
beginRead();} | |
} | |
} catch (Throwable t) { | |
// Close the channel directly to avoid FD leak. | |
closeForcibly(); | |
closeFuture.setClosed(); | |
safeSetFailure(promise, t); | |
} | |
} |
[代码行 9]进入 AbstractNioChannel#doRegister()
办法:
protected void doRegister() throws Exception { | |
boolean selected = false; | |
for (;;) { | |
try {selectionKey = javaChannel().register(eventLoop().unwrappedSelector(), 0, this); | |
return; | |
} catch (CancelledKeyException e) {if (!selected) { | |
// Force the Selector to select now as the "canceled" SelectionKey may still be | |
// cached and not removed because no Select.select(..) operation was called yet. | |
eventLoop().selectNow(); | |
selected = true; | |
} else { | |
// We forced a select operation on the selector before but the SelectionKey is still cached | |
// for whatever reason. JDK bug ? | |
throw e; | |
} | |
} | |
} | |
} |
[代码行 5]要害一行代码,将 Java 原生 NIO Selector
与 Java 原生 NIO
的 Channel
对象 (ServerSocketChannel
) 绑定在一起,并将以后 Netty 的 Channel
通过 attachment
的模式绑定到 SelectionKey
上:
- 调用
#unwrappedSelector()
办法,返回 Java 原生NIO Selector
对象,而且每个NioEventLoop
与Selector
唯一一对应。 - 调用
SelectableChannel#register(Selector sel, int ops, Object att)
办法,注册 Java 原生NIO
的Channel
对象到NIO Selector
对象上。
通过以上注册 channel 源码剖析,总结流程的时序图如下:
4. 绑定端口
注册完 Channel
最初回到 AbstractBootstrap#doBind()
办法,剖析 Channel
的端口绑定逻辑。进入doBind0
代码如下:
private static void doBind0( | |
final ChannelFuture regFuture, final Channel channel, | |
final SocketAddress localAddress, final ChannelPromise promise) {// This method is invoked before channelRegistered() is triggered. Give user handlers a chance to set up | |
// the pipeline in its channelRegistered() implementation. | |
channel.eventLoop().execute(new Runnable() { | |
@Override | |
public void run() {if (regFuture.isSuccess()) {channel.bind(localAddress, promise).addListener(ChannelFutureListener.CLOSE_ON_FAILURE); | |
} else {promise.setFailure(regFuture.cause()); | |
} | |
} | |
}); | |
} |
- [代码行 7]:在后面
Channel
注册胜利的条件下,调用EventLoop
执行Channel
的端口绑定逻辑。然而,实际上以后线程曾经是EventLoop
所在的线程了,为何还要这样操作呢?答案在【第 5 至 6 行】的英语正文,这里作为一个问题记着(<u>Problem-5</u>)。 - [代码行 11]:进入
AbstractChannel#bind(SocketAddress localAddress, ChannelPromise promise)
,同样立刻异步返回并增加ChannelFutureListener.CLOSE_ON_FAILURE
监听事件。 - [代码行 13]:如果绑定端口之前的操作并没有胜利,天然也就不能进行端口绑定操作了,通过 promise 记录异样起因。
AbstractChannel#bind(SocketAddress localAddress, ChannelPromise promise)
办法如下:
public ChannelFuture bind(SocketAddress localAddress, ChannelPromise promise) {return pipeline.bind(localAddress, promise); | |
} |
pipeline
是之前创立 channel
的时候创立的DefaultChannelPipeline
,进入该办法:
public final ChannelFuture bind(SocketAddress localAddress, ChannelPromise promise) {return tail.bind(localAddress, promise); | |
} |
[在剖析初始化流程的时候最初画一个 DefaultChannelPipeline
外部的构造,可能便于剖析前面进入 DefaultChannelPipeline
一系列 bind
办法。]
首先,tail
代表 TailContext
,进入AbstractChannelHandlerContext# bind(final SocketAddress localAddress, final ChannelPromise promise)
办法:
public ChannelFuture bind(final SocketAddress localAddress, final ChannelPromise promise) { | |
// 省略局部代码 | |
final AbstractChannelHandlerContext next = findContextOutbound(MASK_BIND); | |
EventExecutor executor = next.executor(); | |
if (executor.inEventLoop()) {next.invokeBind(localAddress, promise); | |
} else {safeExecute(executor, new Runnable() { | |
@Override | |
public void run() {next.invokeBind(localAddress, promise); | |
} | |
}, promise, null); | |
} | |
return promise; | |
} |
[代码行 3]:findContextOutbound
办法里次要是执行 ctx = ctx.prev;
那么失去的 next
就是绑定 LoggingHandler
的context
[代码行 6]:进入 invokeBind(localAddress, promise)
办法并间接执行LoggingHandler#bind(this, localAddress, promise)
,进入后的办法如下:
public void bind(ChannelHandlerContext ctx, SocketAddress localAddress, ChannelPromise promise) throws Exception {if (logger.isEnabled(internalLevel)) {logger.log(internalLevel, format(ctx, "BIND", localAddress)); | |
} | |
ctx.bind(localAddress, promise); | |
} |
设置了 LoggingHandler
的日志根本级别为默认的 INFO 后,进行绑定操作的信息打印。接着,持续循环到 AbstractChannelHandlerContext# bind(final SocketAddress localAddress, final ChannelPromise promise)
办法执行 ctx = ctx.prev
取出 HeadContext
进入到 bind 办法:
public void bind(ChannelHandlerContext ctx, SocketAddress localAddress, ChannelPromise promise) {unsafe.bind(localAddress, promise); | |
} |
兜兜转转,最终跳出了 pipeline
轮回到AbstractUnsafe#bind(final SocketAddress localAddress, final ChannelPromise promise)
办法,Channel 的端口绑定逻辑。代码如下:
public final void bind(final SocketAddress localAddress, final ChannelPromise promise) { | |
// 此处有省略... | |
boolean wasActive = isActive(); | |
try {doBind(localAddress); | |
} catch (Throwable t) {safeSetFailure(promise, t); | |
closeIfClosed(); | |
return; | |
} | |
// 此处有省略... | |
} |
做实事办法 doBind
进入后如下:
@Override | |
protected void doBind(SocketAddress localAddress) throws Exception {if (PlatformDependent.javaVersion() >= 7) {javaChannel().bind(localAddress, config.getBacklog()); | |
} else {javaChannel().socket().bind(localAddress, config.getBacklog()); | |
} | |
} |
到了此处,服务端的 Java 原生 NIO ServerSocketChannel
终于绑定上了端口。
三、问题演绎
- Problem-1: 创立
Channel
流程中AbstractChannel
构造函数中为channel
调配 ID 的算法如何实现? - Problem-2:
AbstractChannel
外部类AbstractUnsafe
的作用? - Problem-3: 初始化
channel
流程中pipeline
增加ServerBootstrapAcceptor
是通过EventLoop.execute
执行增加的过程,这是为什么呢? - Problem-4:注册
channel
流程中PowerOfTwoEventExecutorChooser
和GenericEventExecutorChooser
的区别和优化原理? - Problem-5:绑定端口流程中调用
EventLoop
执行Channel
的端口绑定逻辑。然而,实际上以后线程曾经是EventLoop
所在的线程了,为何还要这样操作呢?
小结
通过对 Netty 服务端启动流程源码剖析,咱们发现了在应用 NIO
的模式下,服务端启动流程其实就是封装了 JDK NIO
编程在服务端启动的流程。只不过对原生 JDK NIO
进行了加强和优化,同时从架构设计上简化了服务端流程的编写。
最重要的是感激彤哥、艿艿和俞超 - 闪电侠这些大佬后期的分享,可能让更多人学习源码的旅途少走很多弯路,谢谢!
欢送关注: