欢送关注公众号:bin的技术小屋,浏览公众号原文
本系列Netty源码解析文章基于 4.1.56.Final版本
大家第一眼看到这幅流程图,是不是脑瓜子嗡嗡的呢?
大家先不要惊恐,问题不大,本文笔者的目标就是要让大家清晰的了解这幅流程图,从而粗浅的了解Netty Reactor的启动全流程,包含其中波及到的各种代码设计实现细节。
在上篇文章《聊聊Netty那些事儿之Reactor在Netty中的实现(创立篇)》中咱们具体介绍了Netty服务端外围引擎组件主从Reactor组模型 NioEventLoopGroup
以及Reactor模型 NioEventLoop
的创立过程。最终咱们失去了netty Reactor模型的运行骨架如下:
当初Netty服务端程序的骨架是搭建好了,本文咱们就基于这个骨架来深刻分析下Netty服务端的启动过程。
咱们持续回到上篇文章提到的Netty服务端代码模板中,在创立完主从Reactor线程组:bossGroup
,workerGroup
后,接下来就开始配置Netty服务端的启动辅助类ServerBootstrap
了。
public final class EchoServer { static final int PORT = Integer.parseInt(System.getProperty("port", "8007")); public static void main(String[] args) throws Exception { // Configure the server. //创立主从Reactor线程组 EventLoopGroup bossGroup = new NioEventLoopGroup(1); EventLoopGroup workerGroup = new NioEventLoopGroup(); final EchoServerHandler serverHandler = new EchoServerHandler(); try { ServerBootstrap b = new ServerBootstrap(); b.group(bossGroup, workerGroup)//配置主从Reactor .channel(NioServerSocketChannel.class)//配置主Reactor中的channel类型 .option(ChannelOption.SO_BACKLOG, 100)//设置主Reactor中channel的option选项 .handler(new LoggingHandler(LogLevel.INFO))//设置主Reactor中Channel->pipline->handler .childHandler(new ChannelInitializer<SocketChannel>() {//设置从Reactor中注册channel的pipeline @Override public void initChannel(SocketChannel ch) throws Exception { ChannelPipeline p = ch.pipeline(); //p.addLast(new LoggingHandler(LogLevel.INFO)); p.addLast(serverHandler); } }); // Start the server. 绑定端口启动服务,开始监听accept事件 ChannelFuture f = b.bind(PORT).sync(); // Wait until the server socket is closed. f.channel().closeFuture().sync(); } finally { // Shut down all event loops to terminate all threads. bossGroup.shutdownGracefully(); workerGroup.shutdownGracefully(); } }}
在上篇文章中咱们对代码模板中波及到ServerBootstrap
的一些配置办法做了简略的介绍,大家如果遗记的话,能够在返回去回顾一下。
ServerBootstrap类
其实没有什么特地的逻辑,次要是对Netty启动过程中须要用到的一些外围信息进行配置管理,比方:
- Netty的外围引擎组件
主从Reactor线程组: bossGroup,workerGroup
。通过ServerBootstrap#group办法
配置。 - Netty服务端应用到的Channel类型:
NioServerSocketChannel
,通过ServerBootstrap#channel办法
配置。
以及配置NioServerSocketChannel
时用到的SocketOption
。SocketOption
用于设置底层JDK NIO Socket的一些选项。通过ServerBootstrap#option办法
进行配置。
主ReactorGroup中的MainReactor治理的Channel类型为
NioServerSocketChannel
,如图所示次要用来监听端口,接管客户端连贯,为客户端创立初始化NioSocketChannel
,而后采纳round-robin
轮询的形式从图中从ReactorGroup中抉择一个SubReactor与该客户端NioSocketChannel
进行绑定。从ReactorGroup中的SubReactor治理的Channel类型为
NioSocketChannel
,它是netty中定义客户端连贯的一个模型,每个连贯对应一个。如图所示SubReactor负责监听解决绑定在其上的所有NioSocketChannel
上的IO事件。
- 保留服务端
NioServerSocketChannel
和客户端NioSocketChannel
对应pipeline
中指定的ChannelHandler
。用于后续Channel向Reactor注册胜利之后,初始化Channel里的pipeline。
不论是服务端用到的NioServerSocketChannel
还是客户端用到的NioSocketChannel
,每个Channel实例
都会有一个Pipeline
,Pipeline
中有多个ChannelHandler
用于编排解决对应Channel
上感兴趣的IO事件
。
ServerBootstrap
构造中蕴含了netty服务端程序启动的所有配置信息,在咱们介绍启动流程之前,先来看下ServerBootstrap
的源码构造:
ServerBootstrap
ServerBootstrap
的继承构造比较简单,继承档次的职责分工也比拟明确。
ServerBootstrap
次要负责对主从Reactor线程组
相干的配置进行治理,其中带child前缀的配置办法
是对从Reactor线程组
的相干配置管理。从Reactor线程组
中的Sub Reactor
负责管理的客户端NioSocketChannel
相干配置存储在ServerBootstrap
构造中。
父类AbstractBootstrap
则是次要负责对主Reactor线程组
相干的配置进行治理,以及主Reactor线程组
中的Main Reactor
负责解决的服务端ServerSocketChannel
相干的配置管理。
1. 配置主从Reactor线程组
ServerBootstrap b = new ServerBootstrap();b.group(bossGroup, workerGroup)//配置主从Reactor
public class ServerBootstrap extends AbstractBootstrap<ServerBootstrap, ServerChannel> { //Main Reactor线程组 volatile EventLoopGroup group; //Sub Reactor线程组 private volatile EventLoopGroup childGroup; public ServerBootstrap group(EventLoopGroup parentGroup, EventLoopGroup childGroup) { //父类治理主Reactor线程组 super.group(parentGroup); if (this.childGroup != null) { throw new IllegalStateException("childGroup set already"); } this.childGroup = ObjectUtil.checkNotNull(childGroup, "childGroup"); return this; }}
2. 配置服务端ServerSocketChannel
ServerBootstrap b = new ServerBootstrap();b.channel(NioServerSocketChannel.class);
public class ServerBootstrap extends AbstractBootstrap<ServerBootstrap, ServerChannel> { //用于创立ServerSocketChannel ReflectiveChannelFactory private volatile ChannelFactory<? extends C> channelFactory; public B channel(Class<? extends C> channelClass) { return channelFactory(new ReflectiveChannelFactory<C>( ObjectUtil.checkNotNull(channelClass, "channelClass") )); } @Deprecated public B channelFactory(ChannelFactory<? extends C> channelFactory) { ObjectUtil.checkNotNull(channelFactory, "channelFactory"); if (this.channelFactory != null) { throw new IllegalStateException("channelFactory set already"); } this.channelFactory = channelFactory; return self(); }}
在向ServerBootstrap
配置服务端ServerSocketChannel
的channel
办法中,其实是创立了一个ChannelFactory
工厂实例ReflectiveChannelFactory
,在Netty服务端启动的过程中,会通过这个ChannelFactory
去创立相应的Channel
实例。
咱们能够通过这个办法来配置netty的IO模型,上面为ServerSocketChannel
在不同IO模型下的实现:
BIO | NIO | AIO |
---|---|---|
OioServerSocketChannel | NioServerSocketChannel | AioServerSocketChannel |
EventLoopGroup
Reactor线程组在不同IO模型下的实现:
BIO | NIO | AIO |
---|---|---|
ThreadPerChannelEventLoopGroup | NioEventLoopGroup | AioEventLoopGroup |
咱们只须要将IO模型
的这些外围接口对应的实现类前缀
改为对应IO模型
的前缀,就能够轻松在Netty中实现对IO模型
的切换。
2.1 ReflectiveChannelFactory
public class ReflectiveChannelFactory<T extends Channel> implements ChannelFactory<T> { //NioServerSocketChannelde 结构器 private final Constructor<? extends T> constructor; public ReflectiveChannelFactory(Class<? extends T> clazz) { ObjectUtil.checkNotNull(clazz, "clazz"); try { //反射获取NioServerSocketChannel的结构器 this.constructor = clazz.getConstructor(); } catch (NoSuchMethodException e) { throw new IllegalArgumentException("Class " + StringUtil.simpleClassName(clazz) + " does not have a public non-arg constructor", e); } } @Override public T newChannel() { try { //创立NioServerSocketChannel实例 return constructor.newInstance(); } catch (Throwable t) { throw new ChannelException("Unable to create Channel from class " + constructor.getDeclaringClass(), t); } }}
从类的签名咱们能够看出,这个工厂类是通过泛型
加反射
的形式来创立对应的Channel
实例。
- 泛型参数
T extends Channel
示意的是要通过工厂类创立的Channel类型
,这里咱们初始化的是NioServerSocketChannel
。 - 在
ReflectiveChannelFactory
的结构器中通过反射
的形式获取NioServerSocketChannel
的结构器。 - 在
newChannel
办法中通过结构器反射创立NioServerSocketChannel
实例。
留神这时只是配置阶段,NioServerSocketChannel
此时并未被创立。它是在启动的时候才会被创立进去。
3. 为NioServerSocketChannel配置ChannelOption
ServerBootstrap b = new ServerBootstrap();//设置被MainReactor治理的NioServerSocketChannel的Socket选项b.option(ChannelOption.SO_BACKLOG, 100)
public abstract class AbstractBootstrap<B extends AbstractBootstrap<B, C>, C extends Channel> implements Cloneable { //serverSocketChannel中的ChannelOption配置 private final Map<ChannelOption<?>, Object> options = new LinkedHashMap<ChannelOption<?>, Object>(); public <T> B option(ChannelOption<T> option, T value) { ObjectUtil.checkNotNull(option, "option"); synchronized (options) { if (value == null) { options.remove(option); } else { options.put(option, value); } } return self(); }}
无论是服务端的NioServerSocketChannel
还是客户端的NioSocketChannel
它们的相干底层Socket选项ChannelOption
配置全副寄存于一个Map
类型的数据结构中。
因为客户端NioSocketChannel
是由从Reactor线程组
中的Sub Reactor
来负责解决,所以波及到客户端NioSocketChannel
所有的办法和配置全副是以child
前缀结尾。
ServerBootstrap b = new ServerBootstrap();.childOption(ChannelOption.TCP_NODELAY, Boolean.TRUE)
public class ServerBootstrap extends AbstractBootstrap<ServerBootstrap, ServerChannel> { //客户端SocketChannel对应的ChannelOption配置 private final Map<ChannelOption<?>, Object> childOptions = new LinkedHashMap<ChannelOption<?>, Object>(); public <T> ServerBootstrap childOption(ChannelOption<T> childOption, T value) { ObjectUtil.checkNotNull(childOption, "childOption"); synchronized (childOptions) { if (value == null) { childOptions.remove(childOption); } else { childOptions.put(childOption, value); } } return this; }}
相干的底层Socket选项,netty全副枚举在ChannelOption类中,笔者这里就不一一列举了,在本系列后续相干的文章中,笔者还会为大家具体的介绍这些参数的作用。
public class ChannelOption<T> extends AbstractConstant<ChannelOption<T>> { ..................省略.............. public static final ChannelOption<Boolean> SO_BROADCAST = valueOf("SO_BROADCAST"); public static final ChannelOption<Boolean> SO_KEEPALIVE = valueOf("SO_KEEPALIVE"); public static final ChannelOption<Integer> SO_SNDBUF = valueOf("SO_SNDBUF"); public static final ChannelOption<Integer> SO_RCVBUF = valueOf("SO_RCVBUF"); public static final ChannelOption<Boolean> SO_REUSEADDR = valueOf("SO_REUSEADDR"); public static final ChannelOption<Integer> SO_LINGER = valueOf("SO_LINGER"); public static final ChannelOption<Integer> SO_BACKLOG = valueOf("SO_BACKLOG"); public static final ChannelOption<Integer> SO_TIMEOUT = valueOf("SO_TIMEOUT"); ..................省略..............}
4. 为服务端NioServerSocketChannel中的Pipeline配置ChannelHandler
//serverSocketChannel中pipeline里的handler(次要是acceptor) private volatile ChannelHandler handler; public B handler(ChannelHandler handler) { this.handler = ObjectUtil.checkNotNull(handler, "handler"); return self(); }
向NioServerSocketChannel
中的Pipeline
增加ChannelHandler
分为两种形式:
显式增加:
显式增加的形式是由用户在main线程中通过ServerBootstrap#handler
的形式增加。如果须要增加多个ChannelHandler
,则能够通过ChannelInitializer
向pipeline
中进行增加。
对于ChannelInitializer
前面笔者会有具体介绍,这里大家只须要晓得ChannelInitializer
是一种非凡的ChannelHandler
,用于初始化pipeline
。实用于向pipeline中增加多个ChannelHandler的场景。
ServerBootstrap b = new ServerBootstrap(); b.group(bossGroup, workerGroup)//配置主从Reactor .channel(NioServerSocketChannel.class)//配置主Reactor中的channel类型 .handler(new ChannelInitializer<NioServerSocketChannel>() { @Override protected void initChannel(NioServerSocketChannel ch) throws Exception { ChannelPipeline p = ch.pipeline(); p.addLast(channelhandler1) .addLast(channelHandler2) ...... .addLast(channelHandler3); } })
隐式增加:
隐式增加次要增加的就是主ReactorGroup
的外围组件也就是下图中的acceptor
,Netty中的实现为ServerBootstrapAcceptor
,实质上也是一种ChannelHandler
,次要负责在客户端连贯建设好后,初始化客户端NioSocketChannel
,在从Reactor线程组中
选取一个Sub Reactor
,将客户端NioSocketChannel
注册到Sub Reactor
中的selector
上。
隐式增加ServerBootstrapAcceptor
是由Netty框架在启动的时候负责增加,用户无需关怀。
在本例中,NioServerSocketChannel
的PipeLine
中只有两个ChannelHandler
,一个由用户在内部显式增加的LoggingHandler
,另一个是由Netty框架隐式增加的ServerBootstrapAcceptor
。
其实咱们在理论我的项目应用的过程中,不会向netty服务端NioServerSocketChannel
增加额定的ChannelHandler,NioServerSocketChannel
只须要分心做好本人最重要的本职工作接管客户端连贯就好了。这里额定增加一个LoggingHandler
只是为了向大家展现ServerBootstrap
的配置办法。
5. 为客户端NioSocketChannel中的Pipeline配置ChannelHandler
final EchoServerHandler serverHandler = new EchoServerHandler(); serverBootstrap.childHandler(new ChannelInitializer<SocketChannel>() {//设置从Reactor中注册channel的pipeline @Override public void initChannel(SocketChannel ch) throws Exception { ChannelPipeline p = ch.pipeline(); p.addLast(new LoggingHandler(LogLevel.INFO)); p.addLast(serverHandler); } });
//socketChannel中pipeline中的解决handler private volatile ChannelHandler childHandler; public ServerBootstrap childHandler(ChannelHandler childHandler) { this.childHandler = ObjectUtil.checkNotNull(childHandler, "childHandler"); return this; }
向客户端NioSocketChannel
中的Pipeline
里增加ChannelHandler
齐全是由用户本人管制显式增加,增加的数量不受限制。
因为在Netty的IO线程模型
中,是由单个Sub Reactor线程
负责执行客户端NioSocketChannel
中的Pipeline
,一个Sub Reactor线程
负责解决多个NioSocketChannel
上的IO事件
,如果Pipeline
中的ChannelHandler
增加的太多,就会影响Sub Reactor线程
执行其余NioSocketChannel
上的Pipeline
,从而升高IO解决效率
,升高吞吐量。
所以Pipeline
中的ChannelHandler
不易增加过多,并且不能再ChannelHandler
中执行耗时的业务解决工作。
在咱们通过ServerBootstrap
配置netty服务端启动信息的时候,无论是向服务端NioServerSocketChannel
的pipeline中增加ChannelHandler,还是向客户端NioSocketChannel
的pipeline中增加ChannelHandler,当波及到多个ChannelHandler增加的时候,咱们都会用到ChannelInitializer
,那么这个ChannelInitializer
到底是何方圣神,为什么要这样做呢?咱们接着往下看~~
ChannelInitializer
首先ChannelInitializer
它继承于ChannelHandler
,它本人自身就是一个ChannelHandler,所以它能够增加到childHandler
中。
其余的父类大家这里能够不必管,前面文章中笔者会一一为大家具体介绍。
那为什么不间接增加ChannelHandler
而是抉择用ChannelInitializer
呢?
这里次要有两点起因:
- 前边咱们提到,客户端
NioSocketChannel
是在服务端accept连贯后,在服务端NioServerSocketChannel
中被创立进去的。然而此时咱们正处于配置ServerBootStrap
阶段,服务端还没有启动,更没有客户端连贯上来,此时客户端NioSocketChannel
还没有被创立进去,所以也就没方法向客户端NioSocketChannel
的pipeline中增加ChannelHandler
。 - 客户端
NioSocketChannel
中Pipeline
里能够增加任意多个ChannelHandler
,然而Netty框架无奈预知用户到底须要增加多少个ChannelHandler
,所以Netty框架提供了回调函数ChannelInitializer#initChannel
,使用户能够自定义ChannelHandler
的增加行为。
当客户端NioSocketChannel
注册到对应的Sub Reactor
上后,紧接着就会初始化NioSocketChannel
中的Pipeline
,此时Netty框架会回调ChannelInitializer#initChannel
执行用户自定义的增加逻辑。
public abstract class ChannelInitializer<C extends Channel> extends ChannelInboundHandlerAdapter { @Override @SuppressWarnings("unchecked") public final void channelRegistered(ChannelHandlerContext ctx) throws Exception { //当channelRegister事件产生时,调用initChannel初始化pipeline if (initChannel(ctx)) { .................省略............... } else { .................省略............... } } private boolean initChannel(ChannelHandlerContext ctx) throws Exception { if (initMap.add(ctx)) { // Guard against re-entrance. try { //此时客户单NioSocketChannel曾经创立并初始化好了 initChannel((C) ctx.channel()); } catch (Throwable cause) { .................省略............... } finally { .................省略............... } return true; } return false; } protected abstract void initChannel(C ch) throws Exception; .................省略...............}
这里由netty框架回调的ChannelInitializer#initChannel办法
正是咱们自定义的增加逻辑。
final EchoServerHandler serverHandler = new EchoServerHandler(); serverBootstrap.childHandler(new ChannelInitializer<SocketChannel>() {//设置从Reactor中注册channel的pipeline @Override public void initChannel(SocketChannel ch) throws Exception { ChannelPipeline p = ch.pipeline(); p.addLast(new LoggingHandler(LogLevel.INFO)); p.addLast(serverHandler); } });
到此为止,Netty服务端启动所须要的必要配置信息,曾经全副存入ServerBootStrap
启动辅助类中。
接下来要做的事件就是服务端的启动了。
// Start the server. 绑定端口启动服务,开始监听accept事件ChannelFuture f = serverBootStrap.bind(PORT).sync();
Netty服务端的启动
通过后面的铺垫终于来到了本文的核心内容----Netty服务端的启动过程。
如代码模板中的示例所示,Netty服务端的启动过程封装在io.netty.bootstrap.AbstractBootstrap#bind(int)
函数中。
接下来咱们看一下Netty服务端在启动过程中到底干了哪些事件?
大家看到这副启动流程图先不要慌,接下来的内容笔者会带大家各个击破它,在文章的最初保障让大家看懂这副流程图。
咱们先来从netty服务端启动的入口函数开始咱们明天的源码解析旅程:
public ChannelFuture bind(int inetPort) { return bind(new InetSocketAddress(inetPort)); } public ChannelFuture bind(SocketAddress localAddress) { //校验Netty外围组件是否配置齐全 validate(); //服务端开始启动,绑定端口地址,接管客户端连贯 return doBind(ObjectUtil.checkNotNull(localAddress, "localAddress")); } private ChannelFuture doBind(final SocketAddress localAddress) { //异步创立,初始化,注册ServerSocketChannel到main reactor上 final ChannelFuture regFuture = initAndRegister(); final Channel channel = regFuture.channel(); if (regFuture.cause() != null) { return regFuture; } if (regFuture.isDone()) { ........serverSocketChannel向Main Reactor注册胜利后开始绑定端口...., } else { //如果此时注册操作没有实现,则向regFuture增加operationComplete回调函数,注册胜利后回调。 regFuture.addListener(new ChannelFutureListener() { @Override public void operationComplete(ChannelFuture future) throws Exception { ........serverSocketChannel向Main Reactor注册胜利后开始绑定端口...., }); return promise; } }
Netty服务端的启动流程总体如下:
- 创立服务端
NioServerSocketChannel
并初始化。 - 将服务端
NioServerSocketChannel
注册到主Reactor线程组
中。 - 注册胜利后,开始初始化
NioServerSocketChannel
中的pipeline,而后在pipeline中触发channelRegister事件。 - 随后由
NioServerSocketChannel
绑定端口地址。 - 绑定端口地址胜利后,向
NioServerSocketChannel
对应的Pipeline
中触发流传ChannelActive事件
,在ChannelActive事件回调
中向Main Reactor
注册OP_ACCEPT事件
,开始期待客户端连贯。服务端启动实现。
当netty服务端启动胜利之后,最终咱们会失去如下构造的阵型,开始严阵以待,筹备接管客户端的连贯,Reactor开始运转。
接下来,咱们就来看下Netty源码是如何实现以上步骤的~~
1. initAndRegister
final ChannelFuture initAndRegister() { Channel channel = null; try { //创立NioServerSocketChannel //ReflectiveChannelFactory通过泛型,反射,工厂的形式灵便创立不同类型的channel channel = channelFactory.newChannel(); //初始化NioServerSocketChannel init(channel); } catch (Throwable t) { ..............省略................. } //向MainReactor注册ServerSocketChannel ChannelFuture regFuture = config().group().register(channel); ..............省略................. return regFuture; }
从函数命名中咱们能够看出,这个函数次要做的事件就是首先创立NioServerSocketChannel
,并对NioServerSocketChannel
进行初始化,最初将NioServerSocketChannel
注册到Main Reactor
中。
1.1 创立NioServerSocketChannel
还记得咱们在介绍ServerBootstrap
启动辅助类配置服务端ServerSocketChannel
类型的时候提到的工厂类ReflectiveChannelFactory
吗?
因为过后咱们在配置ServerBootstrap
启动辅助类的时候,还没到启动阶段,而配置阶段并不是创立具体ServerSocketChannel
的机会。
所以Netty通过工厂模式
将要创立的ServerSocketChannel
的类型(通过泛型指定)以及 创立的过程(封装在newChannel函数中
)通通先封装在工厂类ReflectiveChannelFactory
中。
ReflectiveChannelFactory
通过泛型
,反射
,工厂
的形式灵便
创立不同类型的channel
期待创立机会降临,咱们调用保留在ServerBootstrap
中的channelFactory
间接进行创立。
public class ReflectiveChannelFactory<T extends Channel> implements ChannelFactory<T> { private final Constructor<? extends T> constructor; @Override public T newChannel() { try { return constructor.newInstance(); } catch (Throwable t) { throw new ChannelException("Unable to create Channel from class " + constructor.getDeclaringClass(), t); } }}
上面咱们来看下NioServerSocketChannel
的构建过程:
1.1.1 NioServerSocketChannel
public class NioServerSocketChannel extends AbstractNioMessageChannel implements io.netty.channel.socket.ServerSocketChannel { //SelectorProvider(用于创立Selector和Selectable Channels) private static final SelectorProvider DEFAULT_SELECTOR_PROVIDER = SelectorProvider.provider(); public NioServerSocketChannel() { this(newSocket(DEFAULT_SELECTOR_PROVIDER)); } //创立JDK NIO ServerSocketChannel private static ServerSocketChannel newSocket(SelectorProvider provider) { try { return provider.openServerSocketChannel(); } catch (IOException e) { throw new ChannelException( "Failed to open a server socket.", e); } } //ServerSocketChannel相干的配置 private final ServerSocketChannelConfig config; public NioServerSocketChannel(ServerSocketChannel channel) { //父类AbstractNioChannel中保留JDK NIO原生ServerSocketChannel以及要监听的事件OP_ACCEPT super(null, channel, SelectionKey.OP_ACCEPT); //DefaultChannelConfig中设置用于Channel接收数据用的buffer->AdaptiveRecvByteBufAllocator config = new NioServerSocketChannelConfig(this, javaChannel().socket()); }}
- 首先调用
newSocket
创立JDK NIO 原生ServerSocketChannel
,这里调用了SelectorProvider#openServerSocketChannel
来创立JDK NIO 原生ServerSocketChannel
,咱们在上篇文章《聊聊Netty那些事儿之Reactor在Netty中的实现(创立篇)》中具体的介绍了SelectorProvider
相干内容,过后是用SelectorProvider
来创立Reactor
中的Selector
。大家还记得吗?? - 通过父类结构器设置
NioServerSocketChannel
感兴趣的IO事件
,这里设置的是SelectionKey.OP_ACCEPT
事件。并将JDK NIO 原生ServerSocketChannel
封装起来。 - 创立
Channel
的配置类NioServerSocketChannelConfig
,在配置类中封装了对Channel底层
的一些配置行为,以及JDK中的ServerSocket
。以及创立NioServerSocketChannel
接收数据用的Buffer
分配器AdaptiveRecvByteBufAllocator
。
NioServerSocketChannelConfig
没什么重要的货色,咱们这里也不用深究,它就是治理NioServerSocketChannel
相干的配置,这里惟一须要大家留神的是这个用于Channel
接收数据用的Buffer分配器
AdaptiveRecvByteBufAllocator,咱们前面在介绍Netty如何接管连贯的时候还会提到。
NioServerSocketChannel
的整体构建过程介绍完了,当初咱们来依照继承档次再回过头来看下NioServerSocketChannel
的档次构建,来看下每一层都创立了什么,封装了什么,这些信息都是Channel
的外围信息,所以有必要理解一下。
在NioServerSocketChannel
的创立过程中,咱们次要关注继承结构图中红框标注的三个类,其余的咱们占时先不必管。
其中AbstractNioMessageChannel类
次要是对NioServerSocketChannel
底层读写行为的封装和定义,比方accept接管客户端连贯。这个咱们后续会介绍到,这里咱们并不开展。
1.1.2 AbstractNioChannel
public abstract class AbstractNioChannel extends AbstractChannel { //JDK NIO原生Selectable Channel private final SelectableChannel ch; // Channel监听事件汇合 这里是SelectionKey.OP_ACCEPT事件 protected final int readInterestOp; protected AbstractNioChannel(Channel parent, SelectableChannel ch, int readInterestOp) { super(parent); this.ch = ch; this.readInterestOp = readInterestOp; try { //设置Channel为非阻塞 配合IO多路复用模型 ch.configureBlocking(false); } catch (IOException e) { .............省略................ } }}
- 封装由
SelectorProvider
创立进去的JDK NIO原生ServerSocketChannel
。 - 封装
Channel
在创立时指定感兴趣的IO事件
,对于NioServerSocketChannel
来说感兴趣的IO事件
为OP_ACCEPT事件
。 - 设置JDK NIO原生
ServerSocketChannel
为非阻塞模式, 配合IO多路复用模型。
1.1.3 AbstractChannel
public abstract class AbstractChannel extends DefaultAttributeMap implements Channel { //channel是由创立档次的,比方ServerSocketChannel 是 SocketChannel的 parent private final Channel parent; //channel全局惟一ID machineId+processId+sequence+timestamp+random private final ChannelId id; //unsafe用于封装对底层socket的相干操作 private final Unsafe unsafe; //为channel调配独立的pipeline用于IO事件编排 private final DefaultChannelPipeline pipeline; protected AbstractChannel(Channel parent) { this.parent = parent; //channel全局惟一ID machineId+processId+sequence+timestamp+random id = newId(); //unsafe用于定义实现对Channel的底层操作 unsafe = newUnsafe(); //为channel调配独立的pipeline用于IO事件编排 pipeline = newChannelPipeline(); }}
- Netty中的
Channel创立
是有档次的,这里的parent属性
用来保留上一级的Channel
,比方这里的NioServerSocketChannel
是顶级Channel
,所以它的parent = null
。客户端NioSocketChannel
是由NioServerSocketChannel
创立的,所以它的parent = NioServerSocketChannel
。 - 为
Channel
调配全局惟一的ChannelId
。ChannelId
由机器Id(machineId
),过程Id(processId
),序列号(sequence
),工夫戳(timestamp
),随机数(random
)形成
private DefaultChannelId() { data = new byte[MACHINE_ID.length + PROCESS_ID_LEN + SEQUENCE_LEN + TIMESTAMP_LEN + RANDOM_LEN]; int i = 0; // machineId System.arraycopy(MACHINE_ID, 0, data, i, MACHINE_ID.length); i += MACHINE_ID.length; // processId i = writeInt(i, PROCESS_ID); // sequence i = writeInt(i, nextSequence.getAndIncrement()); // timestamp (kind of) i = writeLong(i, Long.reverse(System.nanoTime()) ^ System.currentTimeMillis()); // random int random = PlatformDependent.threadLocalRandom().nextInt(); i = writeInt(i, random); assert i == data.length; hashCode = Arrays.hashCode(data); }
- 创立
NioServerSocketChannel
的底层操作类Unsafe
。这里创立的是io.netty.channel.nio.AbstractNioMessageChannel.NioMessageUnsafe
。
Unsafe
为Channel接口
的一个外部接口,用于定义实现对Channel底层的各种操作,Unsafe接口
定义的操作行为只能由Netty框架的Reactor线程
调用,用户线程禁止调用。
interface Unsafe { //调配接收数据用的Buffer RecvByteBufAllocator.Handle recvBufAllocHandle(); //服务端绑定的端口地址 SocketAddress localAddress(); //远端地址 SocketAddress remoteAddress(); //channel向Reactor注册 void register(EventLoop eventLoop, ChannelPromise promise); //服务端绑定端口地址 void bind(SocketAddress localAddress, ChannelPromise promise); //客户端连贯服务端 void connect(SocketAddress remoteAddress, SocketAddress localAddress, ChannelPromise promise); //敞开channle void close(ChannelPromise promise); //读数据 void beginRead(); //写数据 void write(Object msg, ChannelPromise promise); }
- 为
NioServerSocketChannel
调配独立的pipeline
用于IO事件编排。pipeline
其实是一个ChannelHandlerContext
类型的双向链表。头结点HeadContext
,尾结点TailContext
。ChannelHandlerContext
中包装着ChannelHandler
。
ChannelHandlerContext
保留 ChannelHandler上下文信息,用于事件流传。前面笔者会独自开一篇文章介绍,这里咱们还是聚焦于启动主线。
这里只是为了让大家简略了解pipeline
的一个大抵的构造,前面会写一篇文章专门具体解说pipeline
。
protected DefaultChannelPipeline(Channel channel) { this.channel = ObjectUtil.checkNotNull(channel, "channel"); succeededFuture = new SucceededChannelFuture(channel, null); voidPromise = new VoidChannelPromise(channel, true); tail = new TailContext(this); head = new HeadContext(this); head.next = tail; tail.prev = head; }
到了这里NioServerSocketChannel
就创立结束了,咱们来回顾下它到底蕴含了哪些外围信息。
1.2 初始化NioServerSocketChannel
void init(Channel channel) { //向NioServerSocketChannelConfig设置ServerSocketChannelOption setChannelOptions(channel, newOptionsArray(), logger); //向netty自定义的NioServerSocketChannel设置attributes setAttributes(channel, attrs0().entrySet().toArray(EMPTY_ATTRIBUTE_ARRAY)); ChannelPipeline p = channel.pipeline(); //获取从Reactor线程组 final EventLoopGroup currentChildGroup = childGroup; //获取用于初始化客户端NioSocketChannel的ChannelInitializer final ChannelHandler currentChildHandler = childHandler; //获取用户配置的客户端SocketChannel的channelOption以及attributes final Entry<ChannelOption<?>, Object>[] currentChildOptions; synchronized (childOptions) { currentChildOptions = childOptions.entrySet().toArray(EMPTY_OPTION_ARRAY); } final Entry<AttributeKey<?>, Object>[] currentChildAttrs = childAttrs.entrySet().toArray(EMPTY_ATTRIBUTE_ARRAY); //向NioServerSocketChannel中的pipeline增加初始化ChannelHandler的逻辑 p.addLast(new ChannelInitializer<Channel>() { @Override public void initChannel(final Channel ch) { final ChannelPipeline pipeline = ch.pipeline(); //ServerBootstrap中用户指定的channelHandler ChannelHandler handler = config.handler(); if (handler != null) { //LoggingHandler pipeline.addLast(handler); } //增加用于接管客户端连贯的acceptor ch.eventLoop().execute(new Runnable() { @Override public void run() { pipeline.addLast(new ServerBootstrapAcceptor( ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs)); } }); } }); }
- 向
NioServerSocketChannelConfig
设置ServerSocketChannelOption
。 - 向netty自定义的
NioServerSocketChannel
设置ChannelAttributes
Netty自定义的SocketChannel
类型均继承AttributeMap
接口以及DefaultAttributeMap
类,正是它们定义了ChannelAttributes
。用于向Channel
增加用户自定义的一些信息。
这个ChannelAttributes
的用途大有可为,Netty后边的许多个性都是依附这个ChannelAttributes
来实现的。这里先卖个关子,大家能够本人先想一下能够用这个ChannelAttributes
做哪些事件?
- 获取从Reactor线程组
childGroup
,以及用于初始化客户端NioSocketChannel
的ChannelInitializer
,ChannelOption
,ChannelAttributes
,这些信息均是由用户在启动的时候向ServerBootstrap
增加的客户端NioServerChannel
配置信息。这里用这些信息来初始化ServerBootstrapAcceptor
。因为后续会在ServerBootstrapAcceptor
中接管客户端连贯以及创立NioServerChannel
。 - 向
NioServerSocketChannel
中的pipeline
增加用于初始化pipeline
的ChannelInitializer
。
问题来了,这里为什么不罗唆间接将ChannelHandler
增加到pipeline
中,而是又应用到了ChannelInitializer
呢?
其实起因有两点:
- 为了保障
线程平安
地初始化pipeline
,所以初始化的动作须要由Reactor线程
进行,而以后线程是用户程序
的启动Main线程
并不是
Reactor线程。这里不能立刻初始化。 初始化
Channel
中pipeline
的动作,须要等到Channel
注册到对应的Reactor
中才能够进行初始化,以后只是创立好了NioServerSocketChannel
,但并未注册到Main Reactor
上。初始化
NioServerSocketChannel
中pipeline
的机会是:当NioServerSocketChannel
注册到Main Reactor
之后,绑定端口地址之前。
前边在介绍ServerBootstrap
配置childHandler
时也用到了ChannelInitializer
,还记得吗??
问题又来了,大家留神下ChannelInitializer#initChannel
办法,在该初始化回调办法中,增加LoggingHandler是间接向pipeline中增加,而增加Acceptor为什么不是间接增加而是封装成异步工作呢?
这里先给大家卖个关子,笔者会在后续流程中为大家解答~
此时NioServerSocketChannel
中的pipeline
构造如下图所示:
1.3 向Main Reactor注册NioServerSocketChannel
从ServerBootstrap
获取主Reactor线程组NioEventLoopGroup
,将NioServerSocketChannel
注册到NioEventLoopGroup
中。
ChannelFuture regFuture = config().group().register(channel);
上面咱们来看下具体的注册过程:
1.3.1 主Reactor线程组中选取一个Main Reactor进行注册
@Override public ChannelFuture register(Channel channel) { return next().register(channel); } @Override public EventExecutor next() { return chooser.next(); } //获取绑定策略 @Override public EventExecutorChooser newChooser(EventExecutor[] executors) { if (isPowerOfTwo(executors.length)) { return new PowerOfTwoEventExecutorChooser(executors); } else { return new GenericEventExecutorChooser(executors); } } //采纳轮询round-robin的形式抉择Reactor @Override public EventExecutor next() { return executors[(int) Math.abs(idx.getAndIncrement() % executors.length)]; }
Netty通过next()
办法依据上篇文章《聊聊Netty那些事儿之Reactor在Netty中的实现(创立篇)》提到的channel到reactor的绑定策略
,从ReactorGroup
中选取一个Reactor进行注册绑定。之后Channel
生命周期内的所有 IO 事件
都由这个 Reactor
负责解决,如 accept、connect、read、write
等 IO 事件。
一个channel
只能绑定到一个Reactor
上,一个Reactor
负责监听多个channel
。
因为这里是NioServerSocketChannle
向Main Reactor
进行注册绑定,所以Main Reactor
次要负责解决的IO事件
是OP_ACCEPT
事件。
1.3.2 向绑定后的Main Reactor进行注册
向Reactor
进行注册的行为定义在NioEventLoop
的父类SingleThreadEventLoop
中,印象含糊的同学能够在回看下上篇文章中的NioEventLoop继承构造
大节内容。
public abstract class SingleThreadEventLoop extends SingleThreadEventExecutor implements EventLoop { @Override public ChannelFuture register(Channel channel) { //注册channel到绑定的Reactor上 return register(new DefaultChannelPromise(channel, this)); } @Override public ChannelFuture register(final ChannelPromise promise) { ObjectUtil.checkNotNull(promise, "promise"); //unsafe负责channel底层的各种操作 promise.channel().unsafe().register(this, promise); return promise; }}
通过NioServerSocketChannel
中的Unsafe类
执行底层具体的注册动作。
protected abstract class AbstractUnsafe implements Unsafe { /** * 注册Channel到绑定的Reactor上 * */ @Override public final void register(EventLoop eventLoop, final ChannelPromise promise) { ObjectUtil.checkNotNull(eventLoop, "eventLoop"); if (isRegistered()) { promise.setFailure(new IllegalStateException("registered to an event loop already")); return; } //EventLoop的类型要与Channel的类型一样 Nio Oio Aio if (!isCompatible(eventLoop)) { promise.setFailure( new IllegalStateException("incompatible event loop type: " + eventLoop.getClass().getName())); return; } //在channel上设置绑定的Reactor AbstractChannel.this.eventLoop = eventLoop; /** * 执行channel注册的操作必须是Reactor线程来实现 * * 1: 如果以后执行线程是Reactor线程,则间接执行register0进行注册 * 2:如果以后执行线程是内部线程,则须要将register0注册操作 封装程异步Task 由Reactor线程执行 * */ if (eventLoop.inEventLoop()) { register0(promise); } else { try { eventLoop.execute(new Runnable() { @Override public void run() { register0(promise); } }); } catch (Throwable t) { ...............省略............... } } }}
- 首先查看
NioServerSocketChannel
是否曾经实现注册。如果以实现注册,则间接设置代表注册操作后果的ChannelPromise
为fail状态
。 - 通过
isCompatible
办法验证Reactor模型EventLoop
是否与Channel
的类型匹配。NioEventLoop
对应于NioServerSocketChannel
。
上篇文章咱们介绍过 Netty对三种IO模型
:Oio,Nio,Aio
的反对,用户能够通过扭转Netty外围类的前缀轻松切换IO模型
。isCompatible
办法目标就是须要保障Reactor
和Channel
应用的是同一种IO模型
。
- 在
Channel
中保留其绑定的Reactor实例
。 执行
Channel
向Reactor
注册的动作必须要确保是在Reactor线程
中执行。- 如果以后线程是
Reactor线程
则间接执行注册动作register0
- 如果以后线程不是
Reactor线程
,则须要将注册动作register0
封装成异步工作,寄存在Reactor
中的taskQueue
中,期待Reactor线程
执行。
- 如果以后线程是
以后执行线程并不是Reactor线程
,而是用户程序的启动线程Main线程
。
1.3.3 Reactor线程的启动
上篇文章中咱们在介绍NioEventLoopGroup
的创立过程中提到了一个结构器参数executor
,它用于启动Reactor线程
,类型为ThreadPerTaskExecutor
。
过后笔者向大家卖了一个关子~~“Reactor线程是何时启动的?”
那么当初就到了为大家揭晓谜底的时候了~~
Reactor线程
的启动是在向Reactor
提交第一个异步工作的时候启动的。
Netty中的主Reactor线程组NioEventLoopGroup
中的Main ReactorNioEventLoop
是在用户程序Main线程
向Main Reactor
提交用于注册NioServerSocketChannel
的异步工作时开始启动。
eventLoop.execute(new Runnable() { @Override public void run() { register0(promise); } });
接下来咱们关注下NioEventLoop
的execute办法
public abstract class SingleThreadEventExecutor extends AbstractScheduledEventExecutor implements OrderedEventExecutor { @Override public void execute(Runnable task) { ObjectUtil.checkNotNull(task, "task"); execute(task, !(task instanceof LazyRunnable) && wakesUpForTask(task)); } private void execute(Runnable task, boolean immediate) { //以后线程是否为Reactor线程 boolean inEventLoop = inEventLoop(); //addTaskWakesUp = true addTask唤醒Reactor线程执行工作 addTask(task); if (!inEventLoop) { //如果以后线程不是Reactor线程,则启动Reactor线程 //这里能够看出Reactor线程的启动是通过 向NioEventLoop增加异步工作时启动的 startThread(); .....................省略..................... } .....................省略..................... }}
- 首先将异步工作
task
增加到Reactor
中的taskQueue
中。 - 判断以后线程是否为
Reactor线程
,此时以后执行线程为用户程序启动线程,所以这里调用startThread
启动Reactor线程
。
1.3.4 startThread
public abstract class SingleThreadEventExecutor extends AbstractScheduledEventExecutor implements OrderedEventExecutor { //定义Reactor线程状态 private static final int ST_NOT_STARTED = 1; private static final int ST_STARTED = 2; private static final int ST_SHUTTING_DOWN = 3; private static final int ST_SHUTDOWN = 4; private static final int ST_TERMINATED = 5; //Reactor线程状态 初始为 未启动状态 private volatile int state = ST_NOT_STARTED; //Reactor线程状态字段state 原子更新器 private static final AtomicIntegerFieldUpdater<SingleThreadEventExecutor> STATE_UPDATER = AtomicIntegerFieldUpdater.newUpdater(SingleThreadEventExecutor.class, "state"); private void startThread() { if (state == ST_NOT_STARTED) { if (STATE_UPDATER.compareAndSet(this, ST_NOT_STARTED, ST_STARTED)) { boolean success = false; try { doStartThread(); success = true; } finally { if (!success) { STATE_UPDATER.compareAndSet(this, ST_STARTED, ST_NOT_STARTED); } } } } }}
Reactor线程
初始化状态为ST_NOT_STARTED
,首先CAS
更新状态为ST_STARTED
doStartThread
启动Reactor线程
- 启动失败的话,须要将
Reactor线程
状态改回ST_NOT_STARTED
//ThreadPerTaskExecutor 用于启动Reactor线程 private final Executor executor; private void doStartThread() { assert thread == null; executor.execute(new Runnable() { @Override public void run() { thread = Thread.currentThread(); if (interrupted) { thread.interrupt(); } boolean success = false; updateLastExecutionTime(); try { //Reactor线程开始启动 SingleThreadEventExecutor.this.run(); success = true; } ................省略.............. }
这里就来到了ThreadPerTaskExecutor
类型的executor
的用武之地了。
Reactor线程
的外围工作之前介绍过:轮询所有注册其上的Channel中的IO就绪事件
,解决对应Channel上的IO事件
,执行异步工作
。Netty将这些外围工作封装在io.netty.channel.nio.NioEventLoop#run
办法中。
- 将
NioEventLoop#run
封装在异步工作中,提交给executor
执行,Reactor
线程至此开始工作了就。
public final class ThreadPerTaskExecutor implements Executor { private final ThreadFactory threadFactory; @Override public void execute(Runnable command) { //启动Reactor线程 threadFactory.newThread(command).start(); }}
此时Reactor线程
曾经启动,前面的工作全副都由这个Reactor线程
来负责执行了。
而用户启动线程在向Reactor
提交完NioServerSocketChannel
的注册工作register0
后,就逐渐退出调用堆栈,回退到最开始的启动入口处ChannelFuture f = b.bind(PORT).sync()
。
此时Reactor
中的工作队列中只有一个工作register0
,Reactor线程
启动后,会从工作队列中取出工作执行。
至此NioServerSocketChannel
的注册工作正式拉开帷幕~~
1.3.5 register0
//true if the channel has never been registered, false otherwise private boolean neverRegistered = true; private void register0(ChannelPromise promise) { try { //查看注册操作是否曾经勾销,或者对应channel曾经敞开 if (!promise.setUncancellable() || !ensureOpen(promise)) { return; } boolean firstRegistration = neverRegistered; //执行真正的注册操作 doRegister(); //批改注册状态 neverRegistered = false; registered = true; //回调pipeline中增加的ChannelInitializer的handlerAdded办法,在这里初始化channelPipeline pipeline.invokeHandlerAddedIfNeeded(); //设置regFuture为success,触发operationComplete回调,将bind操作放入Reactor的工作队列中,期待Reactor线程执行。 safeSetSuccess(promise); //触发channelRegister事件 pipeline.fireChannelRegistered(); //对于服务端ServerSocketChannel来说 只有绑定端口地址胜利后 channel的状态才是active的。 //此时绑定操作作为异步工作在Reactor的工作队列中,绑定操作还没开始,所以这里的isActive()是false if (isActive()) { if (firstRegistration) { //触发channelActive事件 pipeline.fireChannelActive(); } else if (config().isAutoRead()) { beginRead(); } } } catch (Throwable t) { ............省略............. } }
register0
是驱动整个Channel
注册绑定流程的要害办法,上面咱们来看下它的外围逻辑:
- 首先须要查看
Channel
的注册动作是否在Reactor线程
外被勾销了曾经!promise.setUncancellable()
。查看要注册的Channel
是否曾经敞开!ensureOpen(promise)
。如果Channel
曾经敞开或者注册操作曾经被勾销,那么就间接返回,进行注册流程。 - 调用
doRegister()
办法,执行真正的注册操作。最终实现在AbstractChannel
的子类AbstractNioChannel
中,这个咱们一会在介绍,先关注整体流程。
public abstract class AbstractChannel extends DefaultAttributeMap implements Channel { /** * Is called after the {@link Channel} is registered with its {@link EventLoop} as part of the register process. * * Sub-classes may override this method */ protected void doRegister() throws Exception { // NOOP }}
- 当
Channel
向Reactor
注册结束后,调用pipeline.invokeHandlerAddedIfNeeded()
办法,触发回调pipeline中增加的ChannelInitializer的handlerAdded办法,在handlerAdded办法中利用后面提到的ChannelInitializer
初始化ChannelPipeline
。
初始化ChannelPipeline
的机会是当Channel
向对应的Reactor
注册胜利后,在handlerAdded事件回调
中利用ChannelInitializer
进行初始化。
- 设置
regFuture
为Success
,并回调注册在regFuture
上的ChannelFutureListener#operationComplete
办法,在operationComplete
回调办法中将绑定操作
封装成异步工作,提交到Reactor
的taskQueue
中。期待Reactor
的执行。
还记得这个regFuture
在哪里呈现的吗?它是在哪里被创立,又是在哪里增加的ChannelFutureListener
呢? 大家还有印象吗?回顾不起来也没关系,笔者前面还会提到
- 通过
pipeline.fireChannelRegistered()
在pipeline
中触发channelRegister事件
。
pipeline
中channelHandler
的channelRegistered办法
被回调。
- 对于Netty服务端
NioServerSocketChannel
来说, 只有绑定端口地址胜利
后 channel的状态才是active
的。此时绑定操作
在regFuture
上注册的ChannelFutureListener#operationComplete
回调办法中被作为异步工作提交到了Reactor
的工作队列中,Reactor线程
还没开始
执行绑定工作
。所以这里的isActive()
是false
。
当Reactor线程
执行完register0办法
后,才会去执行绑定工作
。
上面咱们来看下register0
办法中这些外围步骤
的具体实现:
1.3.6 doRegister()
public abstract class AbstractNioChannel extends AbstractChannel { //channel注册到Selector后取得的SelectKey volatile SelectionKey selectionKey; @Override protected void doRegister() throws Exception { boolean selected = false; for (;;) { try { selectionKey = javaChannel().register(eventLoop().unwrappedSelector(), 0, this); return; } catch (CancelledKeyException e) { ...............省略.................... } } }}
调用底层JDK NIO Channel
办法java.nio.channels.SelectableChannel#register(java.nio.channels.Selector, int, java.lang.Object)
,将NettyNioServerSocketChannel
中包装的JDK NIO ServerSocketChannel
注册到Reactor
中的JDK NIO Selector
上。
简略介绍下SelectableChannel#register
办法参数的含意:
Selector:
示意JDK NIO Channel
将要向哪个Selector
进行注册。int ops:
示意Channel
上感兴趣的IO事件
,当对应的IO事件就绪
时,Selector
会返回Channel
对应的SelectionKey
。
SelectionKey
能够了解为Channel
在Selector
上的非凡示意模式,SelectionKey
中封装了Channel
感兴趣的IO事件汇合~~~interestOps
,以及IO就绪的事件汇合~~readyOps
, 同时也封装了对应的JDK NIO Channel
以及注册的Selector
。最初还有一个重要的属性attachment
,能够容许咱们在SelectionKey
上附加一些自定义的对象。
Object attachment:
向SelectionKey
中增加用户自定义的附加对象。
这里
NioServerSocketChannel
向Reactor
中的Selector
注册的IO事件
为0
,这个操作的次要目标是先获取到Channel
在Selector
中对应的SelectionKey
,实现注册。当绑定操作实现后,在去向SelectionKey
增加感兴趣的IO事件
~~~OP_ACCEPT事件
。同时通过
SelectableChannel#register
办法将Netty自定义的NioServerSocketChannel
(这里的this
指针)附着在SelectionKey
的attechment
属性上,实现Netty自定义Channel
与JDK NIOChannel
的关系绑定。这样在每次对Selector
进行IO就绪事件
轮询时,Netty 都能够从JDK NIO Selector
返回的SelectionKey
中获取到自定义的Channel
对象(这里指的就是NioServerSocketChannel
)。
1.3.7 HandlerAdded事件回调中初始化ChannelPipeline
当NioServerSocketChannel
注册到Main Reactor
上的Selector
后,Netty通过调用pipeline.invokeHandlerAddedIfNeeded()
开始回调NioServerSocketChannel
中pipeline
里的ChannelHandler的handlerAdded办法
。
此时NioServerSocketChannel
的pipeline
构造如下:
此时pipeline
中只有在初始化NioServerSocketChannel
时增加的ChannelInitializer
。
咱们来看下ChannelInitializer
中handlerAdded回调办法
具体作了哪些事件~~
public abstract class ChannelInitializer<C extends Channel> extends ChannelInboundHandlerAdapter { @Override public void handlerAdded(ChannelHandlerContext ctx) throws Exception { if (ctx.channel().isRegistered()) { if (initChannel(ctx)) { //初始化工作实现后,须要将本身从pipeline中移除 removeState(ctx); } } } //ChannelInitializer实例是被所有的Channel共享的,用于初始化ChannelPipeline //通过Set汇合保留曾经初始化的ChannelPipeline,防止反复初始化同一ChannelPipeline private final Set<ChannelHandlerContext> initMap = Collections.newSetFromMap( new ConcurrentHashMap<ChannelHandlerContext, Boolean>()); private boolean initChannel(ChannelHandlerContext ctx) throws Exception { if (initMap.add(ctx)) { // Guard against re-entrance. try { initChannel((C) ctx.channel()); } catch (Throwable cause) { exceptionCaught(ctx, cause); } finally { ChannelPipeline pipeline = ctx.pipeline(); if (pipeline.context(this) != null) { //初始化结束后,从pipeline中移除本身 pipeline.remove(this); } } return true; } return false; } //匿名类实现,这里指定具体的初始化逻辑 protected abstract void initChannel(C ch) throws Exception; private void removeState(final ChannelHandlerContext ctx) { //从initMap防重Set汇合中删除ChannelInitializer if (ctx.isRemoved()) { initMap.remove(ctx); } else { ctx.executor().execute(new Runnable() { @Override public void run() { initMap.remove(ctx); } }); } }}
ChannelInitializer
中的初始化逻辑比拟简单明了:
- 首先要判断必须是以后
Channel
曾经实现注册后,才能够进行pipeline
的初始化。ctx.channel().isRegistered()
- 调用
ChannelInitializer
的匿名类指定的initChannel
执行自定义的初始化逻辑。
p.addLast(new ChannelInitializer<Channel>() { @Override public void initChannel(final Channel ch) { final ChannelPipeline pipeline = ch.pipeline(); //ServerBootstrap中用户指定的channelHandler ChannelHandler handler = config.handler(); if (handler != null) { pipeline.addLast(handler); } ch.eventLoop().execute(new Runnable() { @Override public void run() { pipeline.addLast(new ServerBootstrapAcceptor( ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs)); } }); } });
还记得在初始化NioServerSocketChannel
时。io.netty.bootstrap.ServerBootstrap#init
办法中向pipeline
中增加的ChannelInitializer
吗?
- 当执行完
initChannel 办法
后,ChannelPipeline
的初始化就完结了,此时ChannelInitializer
就没必要再持续呆在pipeline中了
,所须要将ChannelInitializer
从pipeline
中删除。pipeline.remove(this)
当初始化完pipeline
时,此时pipeline
的构造再次发生了变动:
此时Main Reactor
中的工作队列taskQueue
构造变动为:
增加ServerBootstrapAcceptor
的工作是在初始化NioServerSocketChannel
的时候向main reactor提交过来的。还记得吗?
1.3.8 回调regFuture的ChannelFutureListener
在本大节《Netty服务端的启动》的最开始,咱们介绍了服务端启动的入口函数io.netty.bootstrap.AbstractBootstrap#doBind
,在函数的最结尾调用了initAndRegister()
办法用来创立并初始化NioServerSocketChannel
,之后便会将NioServerSocketChannel
注册到Main Reactor
中。
注册的操作是一个异步的过程,所以在initAndRegister()
办法调用后返回一个代表注册后果的ChannelFuture regFuture
。
public abstract class AbstractBootstrap<B extends AbstractBootstrap<B, C>, C extends Channel> implements Cloneable { private ChannelFuture doBind(final SocketAddress localAddress) { //异步创立,初始化,注册ServerSocketChannel final ChannelFuture regFuture = initAndRegister(); final Channel channel = regFuture.channel(); if (regFuture.cause() != null) { return regFuture; } if (regFuture.isDone()) { //如果注册实现,则进行绑定操作 ChannelPromise promise = channel.newPromise(); doBind0(regFuture, channel, localAddress, promise); return promise; } else { final PendingRegistrationPromise promise = new PendingRegistrationPromise(channel); //增加注册实现 回调函数 regFuture.addListener(new ChannelFutureListener() { @Override public void operationComplete(ChannelFuture future) throws Exception { ...............省略............... // 注册实现后,Reactor线程回调这里 doBind0(regFuture, channel, localAddress, promise); } } }); return promise; } }}
之后会向ChannelFuture regFuture
增加一个注册实现后的回调函数~~~~ ChannelFutureListener
。在回调函数operationComplete
中开始发动绑端口地址流程
。
那么这个回调函数在什么时候?什么中央发动的呢??
让咱们在回到本大节的主题register0
办法的流程中:
当调用doRegister()
办法实现NioServerSocketChannel
向Main Reactor
的注册后,紧接着会调用pipeline.invokeHandlerAddedIfNeeded()
办法中触发ChannelInitializer#handlerAdded
回调中对pipeline
进行初始化。
最初在safeSetSuccess
办法中,开始回调注册在regFuture
上的ChannelFutureListener
。
protected final void safeSetSuccess(ChannelPromise promise) { if (!(promise instanceof VoidChannelPromise) && !promise.trySuccess()) { logger.warn("Failed to mark a promise as success because it is done already: {}", promise); } } @Override public boolean trySuccess() { return trySuccess(null); } @Override public boolean trySuccess(V result) { return setSuccess0(result); } private boolean setSuccess0(V result) { return setValue0(result == null ? SUCCESS : result); } private boolean setValue0(Object objResult) { if (RESULT_UPDATER.compareAndSet(this, null, objResult) || RESULT_UPDATER.compareAndSet(this, UNCANCELLABLE, objResult)) { if (checkNotifyWaiters()) { //回调注册在promise上的listeners notifyListeners(); } return true; } return false; }
safeSetSuccess
的逻辑比较简单,首先设置regFuture
后果为success
,并且回调注册在regFuture
上的ChannelFutureListener
。
须要揭示的是,执行
safeSetSuccess
办法,以及后边回调regFuture
上的ChannelFutureListener
这些动作都是由Reactor线程
执行的。对于Netty中的
Promise模型
后边我会在写一篇专门的文章进行剖析,这里大家只需分明大体的流程即可。不用在意过多的细节。
上面咱们把视角切换到regFuture
上的ChannelFutureListener
回调中,看看在Channel
注册实现后,Netty又会做哪些事件?
2. doBind0
public abstract class AbstractBootstrap<B extends AbstractBootstrap<B, C>, C extends Channel> implements Cloneable { private static void doBind0( final ChannelFuture regFuture, final Channel channel, final SocketAddress localAddress, final ChannelPromise promise) { channel.eventLoop().execute(new Runnable() { @Override public void run() { if (regFuture.isSuccess()) { channel.bind(localAddress, promise).addListener(ChannelFutureListener.CLOSE_ON_FAILURE); } else { promise.setFailure(regFuture.cause()); } } }); }}
这里Netty又将绑定端口地址
的操作封装成异步工作,提交给Reactor
执行。
然而这里有一个问题,其实此时执行doBind0
办法的线程正是Reactor线程
,那为什么不间接在这里去执行bind操作
,而是再次封装成异步工作提交给Reactor
中的taskQueue
呢?
反正最终都是由Reactor线程
执行,这其中又有什么别离呢?
通过上大节的介绍咱们晓得,bind0
办法的调用是由io.netty.channel.AbstractChannel.AbstractUnsafe#register0
办法在将NioServerSocketChannel
注册到Main Reactor
之后,并且NioServerSocketChannel
的pipeline
曾经初始化结束后,通过safeSetSuccess
办法回调过去的。
这个过程全程是由Reactor线程
来负责执行的,然而此时register0
办法并没有执行结束,还须要执行前面的逻辑。
而绑定逻辑须要在注册逻辑执行完之后执行,所以在doBind0
办法中Reactor线程
会将绑定操作
封装成异步工作先提交给taskQueue
中保留,这样能够使Reactor线程
立马从safeSetSuccess
中返回,继续执行剩下的register0
办法逻辑。
private void register0(ChannelPromise promise) { try { ................省略............ doRegister(); pipeline.invokeHandlerAddedIfNeeded(); safeSetSuccess(promise); //触发channelRegister事件 pipeline.fireChannelRegistered(); if (isActive()) { ................省略............ } } catch (Throwable t) { ................省略............ } }
当Reactor线程
执行完register0
办法后,就会从taskQueue
中取出异步工作执行。
此时Reactor线程
中的taskQueue
构造如下:
Reactor线程
会先取出位于taskQueue
队首的工作执行,这里是指向NioServerSocketChannel
的pipeline
中增加ServerBootstrapAcceptor
的异步工作。
此时NioServerSocketChannel
中pipeline
的构造如下:
Reactor线程
执行绑定工作。
3. 绑定端口地址
对Channel
的操作行为全副定义在ChannelOutboundInvoker接口中
。
public interface ChannelOutboundInvoker { /** * Request to bind to the given {@link SocketAddress} and notify the {@link ChannelFuture} once the operation * completes, either because the operation was successful or because of an error. * */ ChannelFuture bind(SocketAddress localAddress, ChannelPromise promise);}
bind
办法由子类AbstractChannel
实现。
public abstract class AbstractChannel extends DefaultAttributeMap implements Channel { @Override public ChannelFuture bind(SocketAddress localAddress, ChannelPromise promise) { return pipeline.bind(localAddress, promise); }}
调用pipeline.bind(localAddress, promise)
在pipeline
中流传bind事件
,触发回调pipeline
中所有ChannelHandler
的bind办法
。
事件在pipeline
中的流传具备方向性:
inbound事件
从HeadContext
开始一一向后流传直到TailContext
。outbound事件
则是反向流传,从TailContext
开始反向向前流传直到HeadContext
。
inbound事件
只能被pipeline
中的ChannelInboundHandler
响应解决outbound事件
只能被pipeline
中的ChannelOutboundHandler
响应解决
然而这里的bind事件
在Netty中被定义为outbound事件
,所以它在pipeline
中是反向流传。先从TailContext
开始反向流传直到HeadContext
。
然而bind
的外围逻辑也正是实现在HeadContext
中。
3.1 HeadContext
final class HeadContext extends AbstractChannelHandlerContext implements ChannelOutboundHandler, ChannelInboundHandler { @Override public void bind( ChannelHandlerContext ctx, SocketAddress localAddress, ChannelPromise promise) { //触发AbstractChannel->bind办法 执行JDK NIO SelectableChannel 执行底层绑定操作 unsafe.bind(localAddress, promise); }}
在HeadContext#bind
回调办法中,调用Channel
里的unsafe
操作类执行真正的绑定操作。
protected abstract class AbstractUnsafe implements Unsafe { @Override public final void bind(final SocketAddress localAddress, final ChannelPromise promise) { .................省略................ //这时channel还未激活 wasActive = false boolean wasActive = isActive(); try { //io.netty.channel.socket.nio.NioServerSocketChannel.doBind //调用具体channel实现类 doBind(localAddress); } catch (Throwable t) { .................省略................ return; } //绑定胜利后 channel激活 触发channelActive事件流传 if (!wasActive && isActive()) { invokeLater(new Runnable() { @Override public void run() { //pipeline中触发channelActive事件 pipeline.fireChannelActive(); } }); } //回调注册在promise上的ChannelFutureListener safeSetSuccess(promise); } protected abstract void doBind(SocketAddress localAddress) throws Exception;}
- 首先执行子类
NioServerSocketChannel
具体实现的doBind
办法,通过JDK NIO 原生 ServerSocketChannel
执行底层的绑定操作。
@Override protected void doBind(SocketAddress localAddress) throws Exception { //调用JDK NIO 底层SelectableChannel 执行绑定操作 if (PlatformDependent.javaVersion() >= 7) { javaChannel().bind(localAddress, config.getBacklog()); } else { javaChannel().socket().bind(localAddress, config.getBacklog()); } }
- 判断是否为首次绑定,如果是的话将
触发pipeline中的ChannelActive事件
封装成异步工作放入Reactor
中的taskQueue
中。 - 执行
safeSetSuccess(promise)
,回调注册在promise
上的ChannelFutureListener
。
还是同样的问题,以后执行线程曾经是Reactor线程
了,那么为何不间接触发pipeline
中的ChannelActive
事件而是又封装成异步工作呢??
因为如果间接在这里触发ChannelActive事件
,那么Reactor线程
就会去执行pipeline
中的ChannelHandler
的channelActive事件回调
。
这样的话就影响了safeSetSuccess(promise)
的执行,提早了
注册在promise
上的ChannelFutureListener
的回调。
到当初为止,Netty服务端就曾经实现了绑定端口地址的操作,NioServerSocketChannel
的状态当初变为Active
。
最初还有一件重要的事件要做,咱们接着来看pipeline
中对channelActive事件
解决。
3.2 channelActive事件处理
channelActive事件
在Netty中定义为inbound事件
,所以它在pipeline
中的流传为正向流传,从HeadContext
始终到TailContext
为止。
在channelActive事件
回调中须要触发向Selector
指定须要监听的IO事件
~~OP_ACCEPT事件
。
这块的逻辑次要在HeadContext
中实现。
final class HeadContext extends AbstractChannelHandlerContext implements ChannelOutboundHandler, ChannelInboundHandler { @Override public void channelActive(ChannelHandlerContext ctx) { //pipeline中持续向后流传channelActive事件 ctx.fireChannelActive(); //如果是autoRead 则主动触发read事件流传 //在read回调函数中 触发OP_ACCEPT注册 readIfIsAutoRead(); } private void readIfIsAutoRead() { if (channel.config().isAutoRead()) { //如果是autoRead 则触发read事件流传 channel.read(); } } //AbstractChannel public Channel read() { //触发read事件 pipeline.read(); return this; } @Override public void read(ChannelHandlerContext ctx) { //触发注册OP_ACCEPT或者OP_READ事件 unsafe.beginRead(); } }
- 在
HeadContext
中的channelActive
回调中触发pipeline
中的read事件
。 - 当
read事件
再次流传到HeadContext
时,触发HeadContext#read
办法的回调。在read回调
中调用channel
底层操作类unsafe
的beginRead
办法向selector
注册监听OP_ACCEPT事件
。
3.3 beginRead
protected abstract class AbstractUnsafe implements Unsafe { @Override public final void beginRead() { assertEventLoop(); //channel必须是Active if (!isActive()) { return; } try { // 触发在selector上注册channel感兴趣的监听事件 doBeginRead(); } catch (final Exception e) { .............省略.............. } }}public abstract class AbstractChannel extends DefaultAttributeMap implements Channel { //子类负责继承实现 protected abstract void doBeginRead() throws Exception;}
- 断言判断执行该办法的线程必须是
Reactor线程
。 - 此时
NioServerSocketChannel
曾经实现端口地址的绑定操作,isActive() = true
- 调用
doBeginRead
实现向Selector
注册监听事件OP_ACCEPT
public abstract class AbstractNioChannel extends AbstractChannel { //channel注册到Selector后取得的SelectKey volatile SelectionKey selectionKey; // Channel监听事件汇合 protected final int readInterestOp; @Override protected void doBeginRead() throws Exception { final SelectionKey selectionKey = this.selectionKey; if (!selectionKey.isValid()) { return; } readPending = true; final int interestOps = selectionKey.interestOps(); /** * 1:ServerSocketChannel 初始化时 readInterestOp设置的是OP_ACCEPT事件 * */ if ((interestOps & readInterestOp) == 0) { //增加OP_ACCEPT事件到interestOps汇合中 selectionKey.interestOps(interestOps | readInterestOp); } }}
- 前边提到在
NioServerSocketChannel
在向Main Reactor
中的Selector
注册后,会取得一个SelectionKey
。这里首先要获取这个SelectionKey
。 - 从
SelectionKey
中获取NioServerSocketChannel
感兴趣的IO事件汇合 interestOps
,过后在注册的时候interestOps
设置为0
。 - 将在
NioServerSocketChannel
初始化时设置的readInterestOp = OP_ACCEPT
,设置到SelectionKey
中的interestOps
汇合中。这样Reactor
中的Selector
就开始监听interestOps
汇合中蕴含的IO事件
了。
Main Reactor
中次要监听的是OP_ACCEPT事件
。
流程走到这里,Netty服务端就真正的启动起来了,下一步就开始期待接管客户端连贯了。大家此刻在来回看这副启动流程图,是不是清晰了很多呢?
此时Netty的Reactor模型
构造如下:
总结
本文咱们通过图解源码的形式残缺地介绍了整个Netty服务端启动流程,并介绍了在启动过程中波及到的ServerBootstrap
相干的属性以及配置形式。NioServerSocketChannel
的创立初始化过程以及类的继承构造。
其中重点介绍了NioServerSocketChannel
向Reactor
的注册过程以及Reactor线程
的启动机会和pipeline
的初始化机会。
最初介绍了NioServerSocketChannel
绑定端口地址的整个流程。
上述介绍的这些流程全副是异步操作,各种回调绕来绕去的,须要重复回忆下,读异步代码就是这样,须要理清各种回调之间的关系,并且时刻揭示本人以后的执行线程是什么?
好了,当初Netty服务端曾经启动起来,接着就该接管客户端连贯了,咱们下篇文章见~~~~