关于java:详细图解Netty-Reactor启动全流程-万字长文-多图预警

55次阅读

共计 45367 个字符,预计需要花费 114 分钟才能阅读完成。

欢送关注公众号:bin 的技术小屋,浏览公众号原文

本系列 Netty 源码解析文章基于 4.1.56.Final版本

大家第一眼看到这幅流程图,是不是脑瓜子嗡嗡的呢?

大家先不要惊恐,问题不大,本文笔者的目标就是要让大家清晰的了解这幅流程图,从而粗浅的了解 Netty Reactor 的启动全流程,包含其中波及到的各种代码设计实现细节。

在上篇文章《聊聊 Netty 那些事儿之 Reactor 在 Netty 中的实现 (创立篇)》中咱们具体介绍了 Netty 服务端外围引擎组件 主从 Reactor 组模型 NioEventLoopGroup以及 Reactor 模型 NioEventLoop 的创立过程。最终咱们失去了 netty Reactor 模型的运行骨架如下:

当初 Netty 服务端程序的骨架是搭建好了,本文咱们就基于这个骨架来深刻分析下 Netty 服务端的启动过程。

咱们持续回到上篇文章提到的 Netty 服务端代码模板中,在创立完主从 Reactor 线程组:bossGroupworkerGroup后,接下来就开始配置 Netty 服务端的启动辅助类 ServerBootstrap 了。

public final class EchoServer {static final int PORT = Integer.parseInt(System.getProperty("port", "8007"));

    public static void main(String[] args) throws Exception {
        // Configure the server.
        // 创立主从 Reactor 线程组
        EventLoopGroup bossGroup = new NioEventLoopGroup(1);
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        final EchoServerHandler serverHandler = new EchoServerHandler();
        try {ServerBootstrap b = new ServerBootstrap();
            b.group(bossGroup, workerGroup)// 配置主从 Reactor
             .channel(NioServerSocketChannel.class)// 配置主 Reactor 中的 channel 类型
             .option(ChannelOption.SO_BACKLOG, 100)// 设置主 Reactor 中 channel 的 option 选项
             .handler(new LoggingHandler(LogLevel.INFO))// 设置主 Reactor 中 Channel->pipline->handler
             .childHandler(new ChannelInitializer<SocketChannel>() {// 设置从 Reactor 中注册 channel 的 pipeline
                 @Override
                 public void initChannel(SocketChannel ch) throws Exception {ChannelPipeline p = ch.pipeline();
                     //p.addLast(new LoggingHandler(LogLevel.INFO));
                     p.addLast(serverHandler);
                 }
             });

            // Start the server. 绑定端口启动服务,开始监听 accept 事件
            ChannelFuture f = b.bind(PORT).sync();
            // Wait until the server socket is closed.
            f.channel().closeFuture().sync();} finally {
            // Shut down all event loops to terminate all threads.
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();}
    }
}

在上篇文章中咱们对代码模板中波及到 ServerBootstrap 的一些配置办法做了简略的介绍,大家如果遗记的话,能够在返回去回顾一下。

ServerBootstrap 类 其实没有什么特地的逻辑,次要是对 Netty 启动过程中须要用到的一些外围信息进行配置管理,比方:

  • Netty 的外围引擎组件 主从 Reactor 线程组:bossGroup,workerGroup。通过 ServerBootstrap#group 办法 配置。
  • Netty 服务端应用到的 Channel 类型:NioServerSocketChannel , 通过 ServerBootstrap#channel 办法 配置。
    以及配置 NioServerSocketChannel 时用到的 SocketOptionSocketOption 用于设置底层 JDK NIO Socket 的一些选项。通过 ServerBootstrap#option 办法 进行配置。

主 ReactorGroup 中的 MainReactor 治理的 Channel 类型为 NioServerSocketChannel,如图所示次要用来监听端口,接管客户端连贯,为客户端创立初始化NioSocketChannel,而后采纳round-robin 轮询的形式从图中从 ReactorGroup 中抉择一个 SubReactor 与该客户端 NioSocketChannel 进行绑定。

从 ReactorGroup 中的 SubReactor 治理的 Channel 类型为 NioSocketChannel,它是 netty 中定义客户端连贯的一个模型,每个连贯对应一个。如图所示 SubReactor 负责监听解决绑定在其上的所有NioSocketChannel 上的 IO 事件。

  • 保留服务端 NioServerSocketChannel 和客户端 NioSocketChannel 对应 pipeline 中指定的ChannelHandler。用于后续 Channel 向 Reactor 注册胜利之后,初始化 Channel 里的 pipeline。

不论是服务端用到的 NioServerSocketChannel 还是客户端用到的 NioSocketChannel,每个Channel 实例 都会有一个 PipelinePipeline 中有多个 ChannelHandler 用于编排解决对应 Channel 上感兴趣的IO 事件

ServerBootstrap构造中蕴含了 netty 服务端程序启动的所有配置信息,在咱们介绍启动流程之前,先来看下 ServerBootstrap 的源码构造:

ServerBootstrap

ServerBootstrap的继承构造比较简单,继承档次的职责分工也比拟明确。

ServerBootstrap次要负责对 主从 Reactor 线程组 相干的配置进行治理,其中带 child 前缀的配置办法 是对 从 Reactor 线程组 的相干配置管理。从 Reactor 线程组 中的 Sub Reactor 负责管理的客户端 NioSocketChannel 相干配置存储在 ServerBootstrap 构造中。

父类 AbstractBootstrap 则是次要负责对 主 Reactor 线程组 相干的配置进行治理,以及 主 Reactor 线程组 中的 Main Reactor 负责解决的服务端 ServerSocketChannel 相干的配置管理。

1. 配置主从 Reactor 线程组

ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workerGroup)// 配置主从 Reactor
public class ServerBootstrap extends AbstractBootstrap<ServerBootstrap, ServerChannel> {

     //Main Reactor 线程组
    volatile EventLoopGroup group;
    //Sub Reactor 线程组
    private volatile EventLoopGroup childGroup;

    public ServerBootstrap group(EventLoopGroup parentGroup, EventLoopGroup childGroup) {
        // 父类治理主 Reactor 线程组
        super.group(parentGroup);
        if (this.childGroup != null) {throw new IllegalStateException("childGroup set already");
        }
        this.childGroup = ObjectUtil.checkNotNull(childGroup, "childGroup");
        return this;
    }

}

2. 配置服务端 ServerSocketChannel

ServerBootstrap b = new ServerBootstrap();
b.channel(NioServerSocketChannel.class);
public class ServerBootstrap extends AbstractBootstrap<ServerBootstrap, ServerChannel> {

    // 用于创立 ServerSocketChannel  ReflectiveChannelFactory
    private volatile ChannelFactory<? extends C> channelFactory;

    public B channel(Class<? extends C> channelClass) {
        return channelFactory(new ReflectiveChannelFactory<C>(ObjectUtil.checkNotNull(channelClass, "channelClass")
        ));
    }

    @Deprecated
    public B channelFactory(ChannelFactory<? extends C> channelFactory) {ObjectUtil.checkNotNull(channelFactory, "channelFactory");
        if (this.channelFactory != null) {throw new IllegalStateException("channelFactory set already");
        }

        this.channelFactory = channelFactory;
        return self();}

}

在向 ServerBootstrap 配置服务端 ServerSocketChannelchannel 办法中,其实是创立了一个 ChannelFactory 工厂实例 ReflectiveChannelFactory, 在 Netty 服务端启动的过程中,会通过这个ChannelFactory 去创立相应的 Channel 实例。

咱们能够通过这个办法来配置 netty 的 IO 模型,上面为 ServerSocketChannel 在不同 IO 模型下的实现:

BIO NIO AIO
OioServerSocketChannel NioServerSocketChannel AioServerSocketChannel

EventLoopGroup Reactor 线程组在不同 IO 模型下的实现:

BIO NIO AIO
ThreadPerChannelEventLoopGroup NioEventLoopGroup AioEventLoopGroup

咱们只须要将 IO 模型 的这些外围接口对应的实现类 前缀 改为对应 IO 模型 的前缀,就能够轻松在 Netty 中实现对 IO 模型 的切换。

2.1 ReflectiveChannelFactory

public class ReflectiveChannelFactory<T extends Channel> implements ChannelFactory<T> {
    //NioServerSocketChannelde 结构器
    private final Constructor<? extends T> constructor;

    public ReflectiveChannelFactory(Class<? extends T> clazz) {ObjectUtil.checkNotNull(clazz, "clazz");
        try {
            // 反射获取 NioServerSocketChannel 的结构器
            this.constructor = clazz.getConstructor();} catch (NoSuchMethodException e) {throw new IllegalArgumentException("Class" + StringUtil.simpleClassName(clazz) +
                    "does not have a public non-arg constructor", e);
        }
    }

    @Override
    public T newChannel() {
        try {
            // 创立 NioServerSocketChannel 实例
            return constructor.newInstance();} catch (Throwable t) {throw new ChannelException("Unable to create Channel from class" + constructor.getDeclaringClass(), t);
        }
    }
}

从类的签名咱们能够看出,这个工厂类是通过 泛型 反射 的形式来创立对应的 Channel 实例。

  • 泛型参数 T extends Channel 示意的是要通过工厂类创立的Channel 类型,这里咱们初始化的是NioServerSocketChannel
  • ReflectiveChannelFactory 的结构器中通过 反射 的形式获取 NioServerSocketChannel 的结构器。
  • newChannel 办法中通过结构器反射创立 NioServerSocketChannel 实例。

留神 这时只是配置阶段,NioServerSocketChannel此时并未被创立。它是在启动的时候才会被创立进去。

3. 为 NioServerSocketChannel 配置 ChannelOption

ServerBootstrap b = new ServerBootstrap();
// 设置被 MainReactor 治理的 NioServerSocketChannel 的 Socket 选项
b.option(ChannelOption.SO_BACKLOG, 100)
public abstract class AbstractBootstrap<B extends AbstractBootstrap<B, C>, C extends Channel> implements Cloneable {

    //serverSocketChannel 中的 ChannelOption 配置
    private final Map<ChannelOption<?>, Object> options = new LinkedHashMap<ChannelOption<?>, Object>();

    public <T> B option(ChannelOption<T> option, T value) {ObjectUtil.checkNotNull(option, "option");
        synchronized (options) {if (value == null) {options.remove(option);
            } else {options.put(option, value);
            }
        }
        return self();}
}

无论是服务端的 NioServerSocketChannel 还是客户端的 NioSocketChannel 它们的相干底层 Socket 选项 ChannelOption 配置全副寄存于一个 Map 类型的数据结构中。

因为客户端 NioSocketChannel 是由 从 Reactor 线程组 中的 Sub Reactor 来负责解决,所以波及到客户端 NioSocketChannel 所有的办法和配置全副是以 child 前缀结尾。

ServerBootstrap b = new ServerBootstrap();
.childOption(ChannelOption.TCP_NODELAY, Boolean.TRUE)
public class ServerBootstrap extends AbstractBootstrap<ServerBootstrap, ServerChannel> {

   // 客户端 SocketChannel 对应的 ChannelOption 配置
    private final Map<ChannelOption<?>, Object> childOptions = new LinkedHashMap<ChannelOption<?>, Object>();

    public <T> ServerBootstrap childOption(ChannelOption<T> childOption, T value) {ObjectUtil.checkNotNull(childOption, "childOption");
        synchronized (childOptions) {if (value == null) {childOptions.remove(childOption);
            } else {childOptions.put(childOption, value);
            }
        }
        return this;
    }
}

相干的底层 Socket 选项,netty 全副枚举在 ChannelOption 类中,笔者这里就不一一列举了,在本系列后续相干的文章中,笔者还会为大家具体的介绍这些参数的作用。

public class ChannelOption<T> extends AbstractConstant<ChannelOption<T>> {

    .................. 省略..............

    public static final ChannelOption<Boolean> SO_BROADCAST = valueOf("SO_BROADCAST");
    public static final ChannelOption<Boolean> SO_KEEPALIVE = valueOf("SO_KEEPALIVE");
    public static final ChannelOption<Integer> SO_SNDBUF = valueOf("SO_SNDBUF");
    public static final ChannelOption<Integer> SO_RCVBUF = valueOf("SO_RCVBUF");
    public static final ChannelOption<Boolean> SO_REUSEADDR = valueOf("SO_REUSEADDR");
    public static final ChannelOption<Integer> SO_LINGER = valueOf("SO_LINGER");
    public static final ChannelOption<Integer> SO_BACKLOG = valueOf("SO_BACKLOG");
    public static final ChannelOption<Integer> SO_TIMEOUT = valueOf("SO_TIMEOUT");

    .................. 省略..............

}

4. 为服务端 NioServerSocketChannel 中的 Pipeline 配置 ChannelHandler

    //serverSocketChannel 中 pipeline 里的 handler(次要是 acceptor)
    private volatile ChannelHandler handler;

    public B handler(ChannelHandler handler) {this.handler = ObjectUtil.checkNotNull(handler, "handler");
        return self();}

NioServerSocketChannel 中的 Pipeline 增加 ChannelHandler 分为两种形式:

  • 显式增加: 显式增加的形式是由用户在 main 线程中通过 ServerBootstrap#handler 的形式增加。如果须要增加多个 ChannelHandler,则能够通过ChannelInitializerpipeline中进行增加。

对于 ChannelInitializer 前面笔者会有具体介绍,这里大家只须要晓得 ChannelInitializer 是一种非凡的ChannelHandler,用于初始化pipeline。实用于向 pipeline 中增加多个 ChannelHandler 的场景。

            ServerBootstrap b = new ServerBootstrap();
            b.group(bossGroup, workerGroup)// 配置主从 Reactor
             .channel(NioServerSocketChannel.class)// 配置主 Reactor 中的 channel 类型
             .handler(new ChannelInitializer<NioServerSocketChannel>() {
                 @Override
                 protected void initChannel(NioServerSocketChannel ch) throws Exception {ChannelPipeline p = ch.pipeline();
                     p.addLast(channelhandler1)
                      .addLast(channelHandler2)
                      
                      ......
                     
                      .addLast(channelHandler3);
                 }
             })
  • 隐式增加:隐式增加次要增加的就是 主 ReactorGroup的外围组件也就是下图中的 acceptor,Netty 中的实现为ServerBootstrapAcceptor,实质上也是一种ChannelHandler,次要负责在客户端连贯建设好后,初始化客户端NioSocketChannel,在 从 Reactor 线程组中 选取一个 Sub Reactor,将客户端NioSocketChannel 注册到Sub Reactor 中的 selector 上。

隐式增加 ServerBootstrapAcceptor 是由 Netty 框架在启动的时候负责增加,用户无需关怀。

在本例中,NioServerSocketChannelPipeLine 中只有两个ChannelHandler, 一个由用户在内部显式增加的LoggingHandler, 另一个是由 Netty 框架隐式增加的ServerBootstrapAcceptor

其实咱们在理论我的项目应用的过程中,不会向 netty 服务端 NioServerSocketChannel 增加额定的 ChannelHandler,NioServerSocketChannel只须要分心做好本人最重要的本职工作接管客户端连贯就好了。这里额定增加一个 LoggingHandler 只是为了向大家展现 ServerBootstrap 的配置办法。

5. 为客户端 NioSocketChannel 中的 Pipeline 配置 ChannelHandler

            final EchoServerHandler serverHandler = new EchoServerHandler();

            serverBootstrap.childHandler(new ChannelInitializer<SocketChannel>() {// 设置从 Reactor 中注册 channel 的 pipeline
                 @Override
                 public void initChannel(SocketChannel ch) throws Exception {ChannelPipeline p = ch.pipeline();
            
                     p.addLast(new LoggingHandler(LogLevel.INFO));
                     p.addLast(serverHandler);
                 }
             });
    //socketChannel 中 pipeline 中的解决 handler
    private volatile ChannelHandler childHandler;

    public ServerBootstrap childHandler(ChannelHandler childHandler) {this.childHandler = ObjectUtil.checkNotNull(childHandler, "childHandler");
        return this;
    }

向客户端 NioSocketChannel 中的 Pipeline 里增加 ChannelHandler 齐全是由用户本人管制显式增加,增加的数量不受限制。

因为在 Netty 的 IO 线程模型 中,是由单个 Sub Reactor 线程 负责执行客户端 NioSocketChannel 中的 Pipeline,一个Sub Reactor 线程 负责解决多个 NioSocketChannel 上的 IO 事件,如果Pipeline 中的 ChannelHandler 增加的太多,就会影响 Sub Reactor 线程 执行其余 NioSocketChannel 上的Pipeline,从而升高IO 解决效率,升高吞吐量。

所以 Pipeline 中的 ChannelHandler 不易增加过多,并且不能再 ChannelHandler 中执行耗时的业务解决工作。

在咱们通过 ServerBootstrap 配置 netty 服务端启动信息的时候,无论是向服务端 NioServerSocketChannel 的 pipeline 中增加 ChannelHandler,还是向客户端 NioSocketChannel 的 pipeline 中增加 ChannelHandler,当波及到多个 ChannelHandler 增加的时候,咱们都会用到 ChannelInitializer,那么这个ChannelInitializer 到底是何方圣神,为什么要这样做呢?咱们接着往下看~~

ChannelInitializer

首先 ChannelInitializer 它继承于 ChannelHandler,它本人自身就是一个 ChannelHandler,所以它能够增加到childHandler 中。

其余的父类大家这里能够不必管,前面文章中笔者会一一为大家具体介绍。

那为什么不间接增加 ChannelHandler 而是抉择用 ChannelInitializer 呢?

这里次要有两点起因:

  • 前边咱们提到,客户端 NioSocketChannel 是在服务端 accept 连贯后,在服务端 NioServerSocketChannel 中被创立进去的。然而此时咱们正处于配置 ServerBootStrap 阶段,服务端还没有启动,更没有客户端连贯上来 ,此时客户端NioSocketChannel 还没有被创立进去,所以也就没方法向客户端 NioSocketChannel 的 pipeline 中增加ChannelHandler
  • 客户端 NioSocketChannelPipeline里能够增加任意多个 ChannelHandler,然而 Netty 框架无奈预知用户到底须要增加多少个ChannelHandler,所以 Netty 框架提供了回调函数ChannelInitializer#initChannel,使用户能够自定义ChannelHandler 的增加行为。

当客户端 NioSocketChannel 注册到对应的 Sub Reactor 上后,紧接着就会初始化 NioSocketChannel 中的 Pipeline,此时 Netty 框架会回调ChannelInitializer#initChannel 执行用户自定义的增加逻辑。

public abstract class ChannelInitializer<C extends Channel> extends ChannelInboundHandlerAdapter {

    @Override
    @SuppressWarnings("unchecked")
    public final void channelRegistered(ChannelHandlerContext ctx) throws Exception {
        // 当 channelRegister 事件产生时,调用 initChannel 初始化 pipeline
        if (initChannel(ctx)) {................. 省略...............} else {................. 省略...............}
    }

    private boolean initChannel(ChannelHandlerContext ctx) throws Exception {if (initMap.add(ctx)) { // Guard against re-entrance.
            try {
                // 此时客户单 NioSocketChannel 曾经创立并初始化好了
                initChannel((C) ctx.channel());
            } catch (Throwable cause) {................. 省略...............} finally {................. 省略...............}
            return true;
        }
        return false;
    }

    protected abstract void initChannel(C ch) throws Exception;
    
    ................. 省略...............
}

这里由 netty 框架回调的 ChannelInitializer#initChannel 办法 正是咱们自定义的增加逻辑。

            final EchoServerHandler serverHandler = new EchoServerHandler();

            serverBootstrap.childHandler(new ChannelInitializer<SocketChannel>() {// 设置从 Reactor 中注册 channel 的 pipeline
                 @Override
                 public void initChannel(SocketChannel ch) throws Exception {ChannelPipeline p = ch.pipeline();
            
                     p.addLast(new LoggingHandler(LogLevel.INFO));
                     p.addLast(serverHandler);
                 }
             });

到此为止,Netty 服务端启动所须要的必要配置信息,曾经全副存入 ServerBootStrap 启动辅助类中。

接下来要做的事件就是服务端的启动了。

// Start the server. 绑定端口启动服务,开始监听 accept 事件
ChannelFuture f = serverBootStrap.bind(PORT).sync();

Netty 服务端的启动

通过后面的铺垫终于来到了本文的核心内容 —-Netty 服务端的启动过程。

如代码模板中的示例所示,Netty 服务端的启动过程封装在 io.netty.bootstrap.AbstractBootstrap#bind(int) 函数中。

接下来咱们看一下 Netty 服务端在启动过程中到底干了哪些事件?

大家看到这副启动流程图先不要慌,接下来的内容笔者会带大家各个击破它,在文章的最初保障让大家看懂这副流程图。

咱们先来从 netty 服务端启动的入口函数开始咱们明天的源码解析旅程:

    public ChannelFuture bind(int inetPort) {return bind(new InetSocketAddress(inetPort));
    }

    public ChannelFuture bind(SocketAddress localAddress) {
        // 校验 Netty 外围组件是否配置齐全
        validate();
        // 服务端开始启动,绑定端口地址,接管客户端连贯
        return doBind(ObjectUtil.checkNotNull(localAddress, "localAddress"));
    }

   private ChannelFuture doBind(final SocketAddress localAddress) {
        // 异步创立,初始化,注册 ServerSocketChannel 到 main reactor 上
        final ChannelFuture regFuture = initAndRegister();
        final Channel channel = regFuture.channel();
        if (regFuture.cause() != null) {return regFuture;}

        if (regFuture.isDone()) {........serverSocketChannel 向 Main Reactor 注册胜利后开始绑定端口....,} else {
            // 如果此时注册操作没有实现,则向 regFuture 增加 operationComplete 回调函数,注册胜利后回调。regFuture.addListener(new ChannelFutureListener() {
                @Override
                public void operationComplete(ChannelFuture future) throws Exception {........serverSocketChannel 向 Main Reactor 注册胜利后开始绑定端口....,});
            return promise;
        }
    }

Netty 服务端的启动流程总体如下:

  • 创立服务端 NioServerSocketChannel 并初始化。
  • 将服务端 NioServerSocketChannel 注册到 主 Reactor 线程组 中。
  • 注册胜利后,开始初始化 NioServerSocketChannel 中的 pipeline,而后在 pipeline 中触发 channelRegister 事件。
  • 随后由 NioServerSocketChannel 绑定端口地址。
  • 绑定端口地址胜利后,向 NioServerSocketChannel 对应的 Pipeline 中触发流传 ChannelActive 事件,在ChannelActive 事件回调 中向 Main Reactor 注册OP_ACCEPT 事件,开始期待客户端连贯。服务端启动实现。

当 netty 服务端启动胜利之后,最终咱们会失去如下构造的阵型,开始严阵以待,筹备接管客户端的连贯,Reactor 开始运转。

接下来,咱们就来看下 Netty 源码是如何实现以上步骤的~~

1. initAndRegister

    final ChannelFuture initAndRegister() {
        Channel channel = null;
        try {
            // 创立 NioServerSocketChannel
            //ReflectiveChannelFactory 通过泛型,反射,工厂的形式灵便创立不同类型的 channel
            channel = channelFactory.newChannel();
            // 初始化 NioServerSocketChannel
            init(channel);
        } catch (Throwable t) {.............. 省略.................}

        // 向 MainReactor 注册 ServerSocketChannel
        ChannelFuture regFuture = config().group().register(channel);

           .............. 省略.................

        return regFuture;
    }

从函数命名中咱们能够看出,这个函数次要做的事件就是首先创立 NioServerSocketChannel ,并对NioServerSocketChannel 进行初始化,最初将 NioServerSocketChannel 注册到 Main Reactor 中。

1.1 创立 NioServerSocketChannel

还记得咱们在介绍 ServerBootstrap 启动辅助类配置服务端 ServerSocketChannel 类型的时候提到的工厂类 ReflectiveChannelFactory 吗?

因为过后咱们在配置 ServerBootstrap 启动辅助类的时候,还没到启动阶段,而配置阶段并不是创立具体 ServerSocketChannel 的机会。

所以 Netty 通过 工厂模式 将要创立的 ServerSocketChannel 的类型(通过泛型指定)以及 创立的过程 ( 封装在 newChannel 函数中 ) 通通先封装在工厂类 ReflectiveChannelFactory 中。

ReflectiveChannelFactory通过 泛型 反射 工厂 的形式 灵便 创立不同类型的channel

期待创立机会降临,咱们调用保留在 ServerBootstrap 中的 channelFactory 间接进行创立。

public class ReflectiveChannelFactory<T extends Channel> implements ChannelFactory<T> {

    private final Constructor<? extends T> constructor;

    @Override
    public T newChannel() {
        try {return constructor.newInstance();
        } catch (Throwable t) {throw new ChannelException("Unable to create Channel from class" + constructor.getDeclaringClass(), t);
        }
    }
}

上面咱们来看下 NioServerSocketChannel 的构建过程:

1.1.1 NioServerSocketChannel

public class NioServerSocketChannel extends AbstractNioMessageChannel
                             implements io.netty.channel.socket.ServerSocketChannel {//SelectorProvider(用于创立 Selector 和 Selectable Channels)
    private static final SelectorProvider DEFAULT_SELECTOR_PROVIDER = SelectorProvider.provider();

    public NioServerSocketChannel() {this(newSocket(DEFAULT_SELECTOR_PROVIDER));
    }

    // 创立 JDK NIO ServerSocketChannel
    private static ServerSocketChannel newSocket(SelectorProvider provider) {
        try {return provider.openServerSocketChannel();
        } catch (IOException e) {
            throw new ChannelException("Failed to open a server socket.", e);
        }
    }

     //ServerSocketChannel 相干的配置
    private final ServerSocketChannelConfig config;

    public NioServerSocketChannel(ServerSocketChannel channel) {
        // 父类 AbstractNioChannel 中保留 JDK NIO 原生 ServerSocketChannel 以及要监听的事件 OP_ACCEPT
        super(null, channel, SelectionKey.OP_ACCEPT);
        //DefaultChannelConfig 中设置用于 Channel 接收数据用的 buffer->AdaptiveRecvByteBufAllocator
        config = new NioServerSocketChannelConfig(this, javaChannel().socket());
    }

}
  • 首先调用 newSocket 创立 JDK NIO 原生 ServerSocketChannel,这里调用了SelectorProvider#openServerSocketChannel 来创立 JDK NIO 原生 ServerSocketChannel,咱们在上篇文章《聊聊 Netty 那些事儿之 Reactor 在 Netty 中的实现(创立篇)》中具体的介绍了SelectorProvider 相干内容,过后是用 SelectorProvider 来创立 Reactor 中的Selector。大家还记得吗??
  • 通过父类结构器设置 NioServerSocketChannel 感兴趣的 IO 事件, 这里设置的是SelectionKey.OP_ACCEPT 事件。并将 JDK NIO 原生 ServerSocketChannel 封装起来。
  • 创立 Channel 的配置类 NioServerSocketChannelConfig,在配置类中封装了对Channel 底层 的一些配置行为,以及 JDK 中的 ServerSocket。以及创立NioServerSocketChannel 接收数据用的 Buffer 分配器AdaptiveRecvByteBufAllocator

NioServerSocketChannelConfig没什么重要的货色,咱们这里也不用深究,它就是治理 NioServerSocketChannel 相干的配置,这里惟一须要大家留神的是这个用于 Channel 接收数据用的Buffer 分配器AdaptiveRecvByteBufAllocator,咱们前面在介绍 Netty 如何接管连贯的时候还会提到。

NioServerSocketChannel 的整体构建过程介绍完了,当初咱们来依照继承档次再回过头来看下 NioServerSocketChannel 的档次构建,来看下每一层都创立了什么,封装了什么,这些信息都是 Channel 的外围信息,所以有必要理解一下。

NioServerSocketChannel 的创立过程中,咱们次要关注继承结构图中红框标注的三个类,其余的咱们占时先不必管。

其中 AbstractNioMessageChannel 类 次要是对 NioServerSocketChannel 底层读写行为的封装和定义,比方 accept 接管客户端连贯。这个咱们后续会介绍到,这里咱们并不开展。

1.1.2 AbstractNioChannel

public abstract class AbstractNioChannel extends AbstractChannel {
   //JDK NIO 原生 Selectable Channel
    private final SelectableChannel ch;
    // Channel 监听事件汇合 这里是 SelectionKey.OP_ACCEPT 事件
    protected final int readInterestOp;

    protected AbstractNioChannel(Channel parent, SelectableChannel ch, int readInterestOp) {super(parent);
        this.ch = ch;
        this.readInterestOp = readInterestOp;
        try {
            // 设置 Channel 为非阻塞 配合 IO 多路复用模型
            ch.configureBlocking(false);
        } catch (IOException e) {............. 省略................}
    }
}
  • 封装由 SelectorProvider 创立进去的 JDK NIO 原生ServerSocketChannel
  • 封装 Channel 在创立时指定感兴趣的 IO 事件,对于NioServerSocketChannel 来说感兴趣的 IO 事件OP_ACCEPT 事件
  • 设置 JDK NIO 原生 ServerSocketChannel 为非阻塞模式,配合 IO 多路复用模型。

1.1.3 AbstractChannel

public abstract class AbstractChannel extends DefaultAttributeMap implements Channel {

    //channel 是由创立档次的,比方 ServerSocketChannel 是 SocketChannel 的 parent
    private final Channel parent;
    //channel 全局惟一 ID machineId+processId+sequence+timestamp+random
    private final ChannelId id;
    //unsafe 用于封装对底层 socket 的相干操作
    private final Unsafe unsafe;
    // 为 channel 调配独立的 pipeline 用于 IO 事件编排
    private final DefaultChannelPipeline pipeline;

    protected AbstractChannel(Channel parent) {
        this.parent = parent;
        //channel 全局惟一 ID machineId+processId+sequence+timestamp+random
        id = newId();
        //unsafe 用于定义实现对 Channel 的底层操作
        unsafe = newUnsafe();
        // 为 channel 调配独立的 pipeline 用于 IO 事件编排
        pipeline = newChannelPipeline();}
}
  • Netty 中的 Channel 创立 是有档次的,这里的 parent 属性 用来保留上一级的 Channel,比方这里的NioServerSocketChannel 是顶级 Channel,所以它的parent = null。客户端NioSocketChannel 是由 NioServerSocketChannel 创立的,所以它的parent = NioServerSocketChannel
  • Channel 调配全局惟一的 ChannelIdChannelId 由机器 Id(machineId),过程 Id(processId),序列号(sequence),工夫戳(timestamp),随机数(random)形成
   private DefaultChannelId() {data = new byte[MACHINE_ID.length + PROCESS_ID_LEN + SEQUENCE_LEN + TIMESTAMP_LEN + RANDOM_LEN];
        int i = 0;

        // machineId
        System.arraycopy(MACHINE_ID, 0, data, i, MACHINE_ID.length);
        i += MACHINE_ID.length;

        // processId
        i = writeInt(i, PROCESS_ID);

        // sequence
        i = writeInt(i, nextSequence.getAndIncrement());

        // timestamp (kind of)
        i = writeLong(i, Long.reverse(System.nanoTime()) ^ System.currentTimeMillis());

        // random
        int random = PlatformDependent.threadLocalRandom().nextInt();
        i = writeInt(i, random);
        assert i == data.length;

        hashCode = Arrays.hashCode(data);
    }
  • 创立 NioServerSocketChannel 的底层操作类Unsafe 。这里创立的是io.netty.channel.nio.AbstractNioMessageChannel.NioMessageUnsafe

UnsafeChannel 接口 的一个外部接口,用于定义实现对 Channel 底层的各种操作,Unsafe 接口 定义的操作行为只能由 Netty 框架的 Reactor 线程 调用,用户线程禁止调用。

interface Unsafe {
        
        // 调配接收数据用的 Buffer
        RecvByteBufAllocator.Handle recvBufAllocHandle();

        // 服务端绑定的端口地址
        SocketAddress localAddress();
        // 远端地址
        SocketAddress remoteAddress();
        //channel 向 Reactor 注册
        void register(EventLoop eventLoop, ChannelPromise promise);

        // 服务端绑定端口地址
        void bind(SocketAddress localAddress, ChannelPromise promise);
        // 客户端连贯服务端
        void connect(SocketAddress remoteAddress, SocketAddress localAddress, ChannelPromise promise);
        // 敞开 channle
        void close(ChannelPromise promise);
        // 读数据
        void beginRead();
        // 写数据
        void write(Object msg, ChannelPromise promise);

    }
  • NioServerSocketChannel 调配独立的 pipeline 用于 IO 事件编排。pipeline其实是一个 ChannelHandlerContext 类型的双向链表。头结点 HeadContext, 尾结点TailContextChannelHandlerContext 中包装着ChannelHandler

ChannelHandlerContext 保留 ChannelHandler 上下文信息,用于事件流传。前面笔者会独自开一篇文章介绍,这里咱们还是聚焦于启动主线。

这里只是为了让大家简略了解 pipeline 的一个大抵的构造,前面会写一篇文章专门具体解说pipeline

    protected DefaultChannelPipeline(Channel channel) {this.channel = ObjectUtil.checkNotNull(channel, "channel");
        succeededFuture = new SucceededChannelFuture(channel, null);
        voidPromise =  new VoidChannelPromise(channel, true);

        tail = new TailContext(this);
        head = new HeadContext(this);

        head.next = tail;
        tail.prev = head;
    }


到了这里 NioServerSocketChannel 就创立结束了,咱们来回顾下它到底蕴含了哪些外围信息。

1.2 初始化 NioServerSocketChannel

   void init(Channel channel) {
        // 向 NioServerSocketChannelConfig 设置 ServerSocketChannelOption
        setChannelOptions(channel, newOptionsArray(), logger);
        // 向 netty 自定义的 NioServerSocketChannel 设置 attributes
        setAttributes(channel, attrs0().entrySet().toArray(EMPTY_ATTRIBUTE_ARRAY));

        ChannelPipeline p = channel.pipeline();
        
        // 获取从 Reactor 线程组
        final EventLoopGroup currentChildGroup = childGroup;
        // 获取用于初始化客户端 NioSocketChannel 的 ChannelInitializer
        final ChannelHandler currentChildHandler = childHandler;
        // 获取用户配置的客户端 SocketChannel 的 channelOption 以及 attributes
        final Entry<ChannelOption<?>, Object>[] currentChildOptions;
        synchronized (childOptions) {currentChildOptions = childOptions.entrySet().toArray(EMPTY_OPTION_ARRAY);
        }
        final Entry<AttributeKey<?>, Object>[] currentChildAttrs = childAttrs.entrySet().toArray(EMPTY_ATTRIBUTE_ARRAY);

        // 向 NioServerSocketChannel 中的 pipeline 增加初始化 ChannelHandler 的逻辑
        p.addLast(new ChannelInitializer<Channel>() {
            @Override
            public void initChannel(final Channel ch) {final ChannelPipeline pipeline = ch.pipeline();
                //ServerBootstrap 中用户指定的 channelHandler
                ChannelHandler handler = config.handler();
                if (handler != null) {
                    //LoggingHandler
                    pipeline.addLast(handler);
                }
                // 增加用于接管客户端连贯的 acceptor
                ch.eventLoop().execute(new Runnable() {
                    @Override
                    public void run() {
                        pipeline.addLast(new ServerBootstrapAcceptor(ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));
                    }
                });
            }
        });
    }
  • NioServerSocketChannelConfig 设置ServerSocketChannelOption
  • 向 netty 自定义的 NioServerSocketChannel 设置ChannelAttributes

Netty 自定义的 SocketChannel 类型均继承 AttributeMap 接口以及 DefaultAttributeMap 类,正是它们定义了 ChannelAttributes。用于向Channel 增加用户自定义的一些信息。

这个 ChannelAttributes 的用途大有可为,Netty 后边的许多个性都是依附这个 ChannelAttributes 来实现的。这里先卖个关子,大家能够本人先想一下能够用这个 ChannelAttributes 做哪些事件?

  • 获取从 Reactor 线程组 childGroup,以及用于初始化客户端NioSocketChannelChannelInitializer,ChannelOption,ChannelAttributes,这些信息均是由用户在启动的时候向 ServerBootstrap 增加的客户端 NioServerChannel 配置信息。这里用这些信息来初始化 ServerBootstrapAcceptor。因为后续会在ServerBootstrapAcceptor 中接管客户端连贯以及创立NioServerChannel
  • NioServerSocketChannel 中的 pipeline 增加用于初始化 pipelineChannelInitializer

问题来了,这里为什么不罗唆间接将 ChannelHandler 增加到 pipeline 中,而是又应用到了 ChannelInitializer 呢?

其实起因有两点:

  • 为了保障 线程平安 地初始化 pipeline,所以初始化的动作须要由Reactor 线程 进行,而以后线程是 用户程序 启动 Main 线程 不是Reactor 线程。这里不能立刻初始化。
  • 初始化 Channelpipeline的动作,须要等到 Channel 注册到对应的 Reactor 中才能够进行初始化,以后只是创立好了 NioServerSocketChannel,但并未注册到Main Reactor 上。

    初始化 NioServerSocketChannelpipeline的机会是:当 NioServerSocketChannel 注册到 Main Reactor 之后,绑定端口地址之前。

前边在介绍 ServerBootstrap 配置 childHandler 时也用到了ChannelInitializer,还记得吗??

问题又来了,大家留神下 ChannelInitializer#initChannel 办法,在该初始化回调办法中,增加 LoggingHandler 是间接向 pipeline 中增加,而增加 Acceptor 为什么不是间接增加而是封装成异步工作呢?

这里先给大家卖个关子,笔者会在后续流程中为大家解答~

此时 NioServerSocketChannel 中的 pipeline 构造如下图所示:

1.3 向 Main Reactor 注册 NioServerSocketChannel

ServerBootstrap 获取主 Reactor 线程组 NioEventLoopGroup,将NioServerSocketChannel 注册到 NioEventLoopGroup 中。

ChannelFuture regFuture = config().group().register(channel);

上面咱们来看下具体的注册过程:

1.3.1 主 Reactor 线程组中选取一个 Main Reactor 进行注册

    @Override
    public ChannelFuture register(Channel channel) {return next().register(channel);
    }

    @Override
    public EventExecutor next() {return chooser.next();
    }

    // 获取绑定策略
    @Override
    public EventExecutorChooser newChooser(EventExecutor[] executors) {if (isPowerOfTwo(executors.length)) {return new PowerOfTwoEventExecutorChooser(executors);
        } else {return new GenericEventExecutorChooser(executors);
        }
    }
    
    // 采纳轮询 round-robin 的形式抉择 Reactor
    @Override
    public EventExecutor next() {return executors[(int) Math.abs(idx.getAndIncrement() % executors.length)];
    }

Netty 通过 next() 办法依据上篇文章《聊聊 Netty 那些事儿之 Reactor 在 Netty 中的实现 (创立篇)》提到的channel 到 reactor 的绑定策略,从ReactorGroup 中选取一个 Reactor 进行注册绑定。之后 Channel 生命周期内的所有 IO 事件 都由这个 Reactor 负责解决,如 accept、connect、read、write 等 IO 事件。

一个 channel 只能绑定到一个 Reactor 上,一个 Reactor 负责监听 多个 channel

因为这里是 NioServerSocketChannleMain Reactor进行注册绑定,所以 Main Reactor 次要负责解决的 IO 事件OP_ACCEPT事件。

1.3.2 向绑定后的 Main Reactor 进行注册

Reactor 进行注册的行为定义在 NioEventLoop 的父类 SingleThreadEventLoop 中,印象含糊的同学能够在回看下上篇文章中的 NioEventLoop 继承构造 大节内容。

public abstract class SingleThreadEventLoop extends SingleThreadEventExecutor implements EventLoop {

    @Override
    public ChannelFuture register(Channel channel) {
        // 注册 channel 到绑定的 Reactor 上
        return register(new DefaultChannelPromise(channel, this));
    }

    @Override
    public ChannelFuture register(final ChannelPromise promise) {ObjectUtil.checkNotNull(promise, "promise");
        //unsafe 负责 channel 底层的各种操作
        promise.channel().unsafe().register(this, promise);
        return promise;
    }
}

通过 NioServerSocketChannel 中的 Unsafe 类 执行底层具体的注册动作。

protected abstract class AbstractUnsafe implements Unsafe {

        /**
         * 注册 Channel 到绑定的 Reactor 上
         * */
        @Override
        public final void register(EventLoop eventLoop, final ChannelPromise promise) {ObjectUtil.checkNotNull(eventLoop, "eventLoop");
            if (isRegistered()) {promise.setFailure(new IllegalStateException("registered to an event loop already"));
                return;
            }
            //EventLoop 的类型要与 Channel 的类型一样  Nio Oio Aio
            if (!isCompatible(eventLoop)) {
                promise.setFailure(new IllegalStateException("incompatible event loop type:" + eventLoop.getClass().getName()));
                return;
            }

            // 在 channel 上设置绑定的 Reactor
            AbstractChannel.this.eventLoop = eventLoop;

            /**
             * 执行 channel 注册的操作必须是 Reactor 线程来实现
             *
             * 1: 如果以后执行线程是 Reactor 线程,则间接执行 register0 进行注册
             * 2:如果以后执行线程是内部线程,则须要将 register0 注册操作 封装程异步 Task 由 Reactor 线程执行
             * */
            if (eventLoop.inEventLoop()) {register0(promise);
            } else {
                try {eventLoop.execute(new Runnable() {
                        @Override
                        public void run() {register0(promise);
                        }
                    });
                } catch (Throwable t) {............... 省略...............}
            }
        }
}
  • 首先查看 NioServerSocketChannel 是否曾经实现注册。如果以实现注册,则间接设置代表注册操作后果的 ChannelPromisefail 状态
  • 通过 isCompatible 办法验证 Reactor 模型 EventLoop 是否与 Channel 的类型匹配。NioEventLoop对应于NioServerSocketChannel

上篇文章咱们介绍过 Netty 对三种 IO 模型Oio,Nio,Aio 的反对,用户能够通过扭转 Netty 外围类的前缀轻松切换 IO 模型isCompatible 办法目标就是须要保障 ReactorChannel应用的是同一种IO 模型

  • Channel 中保留其绑定的Reactor 实例
  • 执行 ChannelReactor注册的动作必须要确保是在 Reactor 线程 中执行。

    • 如果以后线程是 Reactor 线程 则间接执行注册动作register0
    • 如果以后线程不是 Reactor 线程,则须要将注册动作register0 封装成异步工作,寄存在 Reactor 中的 taskQueue 中,期待 Reactor 线程 执行。

以后执行线程并不是Reactor 线程,而是用户程序的启动线程Main 线程

1.3.3 Reactor 线程的启动

上篇文章中咱们在介绍 NioEventLoopGroup 的创立过程中提到了一个结构器参数executor,它用于启动Reactor 线程,类型为ThreadPerTaskExecutor

过后笔者向大家卖了一个关子~~“Reactor 线程是何时启动的?”

那么当初就到了为大家揭晓谜底的时候了~~

Reactor 线程 的启动是在向 Reactor 提交第一个异步工作的时候启动的。

Netty 中的主 Reactor 线程组 NioEventLoopGroup 中的 Main ReactorNioEventLoop是在用户程序 Main 线程Main Reactor提交用于注册 NioServerSocketChannel 的异步工作时开始启动。

   eventLoop.execute(new Runnable() {
                        @Override
                        public void run() {register0(promise);
                        }
                    });

接下来咱们关注下 NioEventLoopexecute 办法

public abstract class SingleThreadEventExecutor extends AbstractScheduledEventExecutor implements OrderedEventExecutor {

    @Override
    public void execute(Runnable task) {ObjectUtil.checkNotNull(task, "task");
        execute(task, !(task instanceof LazyRunnable) && wakesUpForTask(task));
    }

    private void execute(Runnable task, boolean immediate) {
        // 以后线程是否为 Reactor 线程
        boolean inEventLoop = inEventLoop();
        //addTaskWakesUp = true  addTask 唤醒 Reactor 线程执行工作
        addTask(task);
        if (!inEventLoop) {
            // 如果以后线程不是 Reactor 线程,则启动 Reactor 线程
            // 这里能够看出 Reactor 线程的启动是通过 向 NioEventLoop 增加异步工作时启动的
            startThread();

            ..................... 省略.....................
        }
        ..................... 省略.....................
    }

}
  • 首先将异步工作 task 增加到 Reactor 中的 taskQueue 中。
  • 判断以后线程是否为 Reactor 线程,此时以后执行线程为用户程序启动线程,所以这里调用startThread 启动Reactor 线程

1.3.4 startThread

public abstract class SingleThreadEventExecutor extends AbstractScheduledEventExecutor implements OrderedEventExecutor {
    // 定义 Reactor 线程状态
    private static final int ST_NOT_STARTED = 1;
    private static final int ST_STARTED = 2;
    private static final int ST_SHUTTING_DOWN = 3;
    private static final int ST_SHUTDOWN = 4;
    private static final int ST_TERMINATED = 5;

     //Reactor 线程状态  初始为 未启动状态
    private volatile int state = ST_NOT_STARTED;

    //Reactor 线程状态字段 state 原子更新器
    private static final AtomicIntegerFieldUpdater<SingleThreadEventExecutor> STATE_UPDATER =
    AtomicIntegerFieldUpdater.newUpdater(SingleThreadEventExecutor.class, "state");

    private void startThread() {if (state == ST_NOT_STARTED) {if (STATE_UPDATER.compareAndSet(this, ST_NOT_STARTED, ST_STARTED)) {
                boolean success = false;
                try {doStartThread();
                    success = true;
                } finally {if (!success) {STATE_UPDATER.compareAndSet(this, ST_STARTED, ST_NOT_STARTED);
                    }
                }
            }
        }
    }

}
  • Reactor 线程 初始化状态为 ST_NOT_STARTED , 首先CAS 更新状态为ST_STARTED
  • doStartThread 启动Reactor 线程
  • 启动失败的话,须要将 Reactor 线程 状态改回ST_NOT_STARTED
    //ThreadPerTaskExecutor 用于启动 Reactor 线程
    private final Executor executor;

    private void doStartThread() {
        assert thread == null;
        executor.execute(new Runnable() {
            @Override
            public void run() {thread = Thread.currentThread();
                if (interrupted) {thread.interrupt();
                }

                boolean success = false;
                updateLastExecutionTime();
                try {
                    //Reactor 线程开始启动
                    SingleThreadEventExecutor.this.run();
                    success = true;
                }
              
                ................ 省略..............
        }

这里就来到了 ThreadPerTaskExecutor 类型的 executor 的用武之地了。

  • Reactor 线程 的外围工作之前介绍过:轮询所有注册其上的 Channel 中的 IO 就绪事件 解决对应 Channel 上的 IO 事件 执行异步工作 。Netty 将这些外围工作封装在io.netty.channel.nio.NioEventLoop#run 办法中。
  • NioEventLoop#run 封装在异步工作中,提交给 executor 执行,Reactor线程至此开始工作了就。
public final class ThreadPerTaskExecutor implements Executor {
    private final ThreadFactory threadFactory;

    @Override
    public void execute(Runnable command) {
        // 启动 Reactor 线程
        threadFactory.newThread(command).start();}
}

此时 Reactor 线程 曾经启动,前面的工作全副都由这个 Reactor 线程 来负责执行了。

而用户启动线程在向 Reactor 提交完 NioServerSocketChannel 的注册工作 register0 后,就逐渐退出调用堆栈,回退到最开始的启动入口处ChannelFuture f = b.bind(PORT).sync()

此时 Reactor 中的工作队列中只有一个工作 register0Reactor 线程 启动后,会从工作队列中取出工作执行。

至此 NioServerSocketChannel 的注册工作正式拉开帷幕~~

1.3.5 register0

       //true if the channel has never been registered, false otherwise 
        private boolean neverRegistered = true;

        private void register0(ChannelPromise promise) {
            try {
                // 查看注册操作是否曾经勾销,或者对应 channel 曾经敞开
                if (!promise.setUncancellable() || !ensureOpen(promise)) {return;}
                boolean firstRegistration = neverRegistered;
                // 执行真正的注册操作
                doRegister();
                // 批改注册状态
                neverRegistered = false;
                registered = true;
                // 回调 pipeline 中增加的 ChannelInitializer 的 handlerAdded 办法,在这里初始化 channelPipeline
                pipeline.invokeHandlerAddedIfNeeded();
                // 设置 regFuture 为 success,触发 operationComplete 回调, 将 bind 操作放入 Reactor 的工作队列中,期待 Reactor 线程执行。safeSetSuccess(promise);
                // 触发 channelRegister 事件
                pipeline.fireChannelRegistered();
                // 对于服务端 ServerSocketChannel 来说 只有绑定端口地址胜利后 channel 的状态才是 active 的。// 此时绑定操作作为异步工作在 Reactor 的工作队列中,绑定操作还没开始,所以这里的 isActive()是 false
                if (isActive()) {if (firstRegistration) {
                        // 触发 channelActive 事件
                        pipeline.fireChannelActive();} else if (config().isAutoRead()) {beginRead();
                    }
                }
            } catch (Throwable t) {............ 省略.............}
        }

register0是驱动整个 Channel 注册绑定流程的要害办法,上面咱们来看下它的外围逻辑:

  • 首先须要查看 Channel 的注册动作是否在 Reactor 线程 外被勾销了曾经 !promise.setUncancellable()。查看要注册的Channel 是否曾经敞开 !ensureOpen(promise)。如果Channel 曾经敞开或者注册操作曾经被勾销,那么就间接返回,进行注册流程。
  • 调用 doRegister() 办法,执行真正的注册操作。最终实现在 AbstractChannel 的子类 AbstractNioChannel 中,这个咱们一会在介绍,先关注整体流程。
public abstract class AbstractChannel extends DefaultAttributeMap implements Channel {

   /**
     * Is called after the {@link Channel} is registered with its {@link EventLoop} as part of the register process.
     *
     * Sub-classes may override this method
     */
    protected void doRegister() throws Exception {// NOOP}

}
  • ChannelReactor注册结束后,调用 pipeline.invokeHandlerAddedIfNeeded() 办法,触发回调 pipeline 中增加的 ChannelInitializer 的 handlerAdded 办法,在 handlerAdded 办法中利用后面提到的 ChannelInitializer 初始化ChannelPipeline

初始化 ChannelPipeline 的机会是当 Channel 向对应的 Reactor 注册胜利后,在 handlerAdded 事件回调 中利用 ChannelInitializer 进行初始化。

  • 设置 regFutureSuccess,并回调注册在 regFuture 上的 ChannelFutureListener#operationComplete 办法,在 operationComplete 回调办法中将 绑定操作 封装成异步工作,提交到 ReactortaskQueue中。期待 Reactor 的执行。

还记得这个 regFuture 在哪里呈现的吗?它是在哪里被创立,又是在哪里增加的 ChannelFutureListener 呢?大家还有印象吗?回顾不起来也没关系,笔者前面还会提到

  • 通过 pipeline.fireChannelRegistered()pipeline中触发channelRegister 事件

pipelinechannelHandlerchannelRegistered 办法 被回调。

  • 对于 Netty 服务端 NioServerSocketChannel 来说,只有 绑定端口地址胜利 后 channel 的状态才是 active 的。此时 绑定操作 regFuture上注册的 ChannelFutureListener#operationComplete 回调办法中被作为异步工作提交到了 Reactor 的工作队列中,Reactor 线程 没开始 执行 绑定工作 。所以这里的isActive()false

Reactor 线程 执行完 register0 办法 后,才会去执行 绑定工作

上面咱们来看下 register0 办法中这些 外围步骤 的具体实现:

1.3.6 doRegister()

public abstract class AbstractNioChannel extends AbstractChannel {

    //channel 注册到 Selector 后取得的 SelectKey
    volatile SelectionKey selectionKey;

    @Override
    protected void doRegister() throws Exception {
        boolean selected = false;
        for (;;) {
            try {selectionKey = javaChannel().register(eventLoop().unwrappedSelector(), 0, this);
                return;
            } catch (CancelledKeyException e) {............... 省略....................}
        }
    }

}

调用底层 JDK NIO Channel 办法 java.nio.channels.SelectableChannel#register(java.nio.channels.Selector, int, java.lang.Object),将 NettyNioServerSocketChannel 中包装的 JDK NIO ServerSocketChannel 注册到 Reactor 中的 JDK NIO Selector 上。

简略介绍下 SelectableChannel#register 办法参数的含意:

  • Selector:示意 JDK NIO Channel 将要向哪个 Selector 进行注册。
  • int ops: 示意 Channel 上感兴趣的 IO 事件,当对应的IO 事件就绪 时,Selector会返回 Channel 对应的SelectionKey

SelectionKey能够了解为 ChannelSelector上的非凡示意模式,SelectionKey中封装了 Channel 感兴趣的 IO 事件汇合~~~interestOps,以及IO 就绪的事件汇合~~readyOps,同时也封装了对应的JDK NIO Channel 以及注册的 Selector。最初还有一个重要的属性attachment,能够容许咱们在SelectionKey 上附加一些自定义的对象。

  • Object attachment:SelectionKey 中增加用户自定义的附加对象。

这里 NioServerSocketChannelReactor中的 Selector 注册的 IO 事件0,这个操作的次要目标是先获取到 ChannelSelector中对应的 SelectionKey,实现注册。当绑定操作实现后,在去向SelectionKey 增加感兴趣的IO 事件~~~OP_ACCEPT 事件

同时通过 SelectableChannel#register 办法将 Netty 自定义的 NioServerSocketChannel(这里的this 指针)附着在 SelectionKeyattechment属性上,实现 Netty 自定义 Channel 与 JDK NIO Channel的关系绑定 。这样在每次对Selector 进行 IO 就绪事件 轮询时,Netty 都能够从 JDK NIO Selector返回的 SelectionKey 中获取到自定义的 Channel 对象(这里指的就是NioServerSocketChannel)。

1.3.7 HandlerAdded 事件回调中初始化 ChannelPipeline

NioServerSocketChannel 注册到 Main Reactor 上的 Selector 后,Netty 通过调用 pipeline.invokeHandlerAddedIfNeeded() 开始回调 NioServerSocketChannelpipeline里的 ChannelHandler 的handlerAdded 办法

此时 NioServerSocketChannelpipeline构造如下:

此时 pipeline 中只有在初始化 NioServerSocketChannel 时增加的ChannelInitializer

咱们来看下 ChannelInitializerhandlerAdded 回调办法 具体作了哪些事件~~

public abstract class ChannelInitializer<C extends Channel> extends ChannelInboundHandlerAdapter {

    @Override
    public void handlerAdded(ChannelHandlerContext ctx) throws Exception {if (ctx.channel().isRegistered()) {if (initChannel(ctx)) {
                // 初始化工作实现后,须要将本身从 pipeline 中移除
                removeState(ctx);
            }
        }
    }

    //ChannelInitializer 实例是被所有的 Channel 共享的,用于初始化 ChannelPipeline
    // 通过 Set 汇合保留曾经初始化的 ChannelPipeline,防止反复初始化同一 ChannelPipeline
    private final Set<ChannelHandlerContext> initMap = Collections.newSetFromMap(new ConcurrentHashMap<ChannelHandlerContext, Boolean>());

    private boolean initChannel(ChannelHandlerContext ctx) throws Exception {if (initMap.add(ctx)) { // Guard against re-entrance.
            try {initChannel((C) ctx.channel());
            } catch (Throwable cause) {exceptionCaught(ctx, cause);
            } finally {ChannelPipeline pipeline = ctx.pipeline();
                if (pipeline.context(this) != null) {
                     // 初始化结束后,从 pipeline 中移除本身
                    pipeline.remove(this);
                }
            }
            return true;
        }
        return false;
    }

    // 匿名类实现,这里指定具体的初始化逻辑
    protected abstract void initChannel(C ch) throws Exception;

    private void removeState(final ChannelHandlerContext ctx) {
        // 从 initMap 防重 Set 汇合中删除 ChannelInitializer
        if (ctx.isRemoved()) {initMap.remove(ctx);
        } else {ctx.executor().execute(new Runnable() {
                @Override
                public void run() {initMap.remove(ctx);
                }
            });
        }
    }
}

ChannelInitializer 中的初始化逻辑比拟简单明了:

  • 首先要判断必须是以后 Channel 曾经实现注册后,才能够进行 pipeline 的初始化。ctx.channel().isRegistered()
  • 调用 ChannelInitializer 的匿名类指定的 initChannel 执行自定义的初始化逻辑。
        p.addLast(new ChannelInitializer<Channel>() {
            @Override
            public void initChannel(final Channel ch) {final ChannelPipeline pipeline = ch.pipeline();
                //ServerBootstrap 中用户指定的 channelHandler
                ChannelHandler handler = config.handler();
                if (handler != null) {pipeline.addLast(handler);
                }

                ch.eventLoop().execute(new Runnable() {
                    @Override
                    public void run() {
                        pipeline.addLast(new ServerBootstrapAcceptor(ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));
                    }
                });
            }
        });

还记得在初始化 NioServerSocketChannel 时。io.netty.bootstrap.ServerBootstrap#init办法中向 pipeline 中增加的 ChannelInitializer 吗?

  • 当执行完 initChannel 办法 后,ChannelPipeline的初始化就完结了,此时 ChannelInitializer 就没必要再持续呆在 pipeline 中了,所须要将ChannelInitializer pipeline中删除。pipeline.remove(this)

当初始化完 pipeline 时,此时 pipeline 的构造再次发生了变动:

此时 Main Reactor 中的工作队列 taskQueue 构造变动为:

增加 ServerBootstrapAcceptor 的工作是在初始化 NioServerSocketChannel 的时候向 main reactor 提交过来的。还记得吗?

1.3.8 回调 regFuture 的 ChannelFutureListener

在本大节《Netty 服务端的启动》的最开始,咱们介绍了服务端启动的入口函数 io.netty.bootstrap.AbstractBootstrap#doBind,在函数的最结尾调用了initAndRegister() 办法用来创立并初始化 NioServerSocketChannel,之后便会将NioServerSocketChannel 注册到 Main Reactor 中。

注册的操作是一个异步的过程,所以在 initAndRegister() 办法调用后返回一个代表注册后果的ChannelFuture regFuture

public abstract class AbstractBootstrap<B extends AbstractBootstrap<B, C>, C extends Channel> implements Cloneable {private ChannelFuture doBind(final SocketAddress localAddress) {
        // 异步创立,初始化,注册 ServerSocketChannel
        final ChannelFuture regFuture = initAndRegister();
        final Channel channel = regFuture.channel();
        if (regFuture.cause() != null) {return regFuture;}

        if (regFuture.isDone()) {
            // 如果注册实现,则进行绑定操作
            ChannelPromise promise = channel.newPromise();
            doBind0(regFuture, channel, localAddress, promise);
            return promise;
        } else {final PendingRegistrationPromise promise = new PendingRegistrationPromise(channel);
            // 增加注册实现 回调函数
            regFuture.addListener(new ChannelFutureListener() {
                @Override
                public void operationComplete(ChannelFuture future) throws Exception {

                         ............... 省略...............
                          // 注册实现后,Reactor 线程回调这里
                        doBind0(regFuture, channel, localAddress, promise);
                    }
                }
            });
            return promise;
        }
    }
}

之后会向 ChannelFuture regFuture 增加一个 注册实现后的回调函数~~~~ ChannelFutureListener 。在回调函数 operationComplete 中开始发动 绑端口地址流程

那么这个回调函数在什么时候?什么中央发动的呢??

让咱们在回到本大节的主题 register0 办法的流程中:

当调用 doRegister() 办法实现 NioServerSocketChannelMain Reactor的注册后,紧接着会调用 pipeline.invokeHandlerAddedIfNeeded() 办法中触发 ChannelInitializer#handlerAdded 回调中对 pipeline 进行初始化。

最初在 safeSetSuccess 办法中,开始回调注册在 regFuture 上的ChannelFutureListener

   protected final void safeSetSuccess(ChannelPromise promise) {if (!(promise instanceof VoidChannelPromise) && !promise.trySuccess()) {logger.warn("Failed to mark a promise as success because it is done already: {}", promise);
        }
   }

   @Override
    public boolean trySuccess() {return trySuccess(null);
    }

    @Override
    public boolean trySuccess(V result) {return setSuccess0(result);
    }

   private boolean setSuccess0(V result) {return setValue0(result == null ? SUCCESS : result);
    }

    private boolean setValue0(Object objResult) {if (RESULT_UPDATER.compareAndSet(this, null, objResult) ||
            RESULT_UPDATER.compareAndSet(this, UNCANCELLABLE, objResult)) {if (checkNotifyWaiters()) {
                // 回调注册在 promise 上的 listeners
                notifyListeners();}
            return true;
        }
        return false;
    }

safeSetSuccess 的逻辑比较简单,首先设置 regFuture 后果为 success,并且回调注册在regFuture 上的ChannelFutureListener

须要揭示的是,执行 safeSetSuccess 办法,以及后边回调 regFuture 上的 ChannelFutureListener 这些动作都是由 Reactor 线程 执行的。

对于 Netty 中的 Promise 模型 后边我会在写一篇专门的文章进行剖析,这里大家只需分明大体的流程即可。不用在意过多的细节。

上面咱们把视角切换到 regFuture 上的 ChannelFutureListener 回调中,看看在 Channel 注册实现后,Netty 又会做哪些事件?

2. doBind0

public abstract class AbstractBootstrap<B extends AbstractBootstrap<B, C>, C extends Channel> implements Cloneable {

    private static void doBind0(
            final ChannelFuture regFuture, final Channel channel,
            final SocketAddress localAddress, final ChannelPromise promise) {channel.eventLoop().execute(new Runnable() {
            @Override
            public void run() {if (regFuture.isSuccess()) {channel.bind(localAddress, promise).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
                } else {promise.setFailure(regFuture.cause());
                }
            }
        });
    }

}

这里 Netty 又将 绑定端口地址 的操作封装成异步工作,提交给 Reactor 执行。

然而这里有一个问题,其实此时执行 doBind0 办法的线程正是 Reactor 线程,那为什么不间接在这里去执行bind 操作,而是再次封装成异步工作提交给Reactor 中的 taskQueue 呢?

反正最终都是由 Reactor 线程 执行,这其中又有什么别离呢?

通过上大节的介绍咱们晓得,bind0办法的调用是由 io.netty.channel.AbstractChannel.AbstractUnsafe#register0 办法在将 NioServerSocketChannel 注册到 Main Reactor 之后,并且 NioServerSocketChannelpipeline曾经初始化结束后,通过 safeSetSuccess 办法回调过去的。

这个过程全程是由 Reactor 线程 来负责执行的,然而此时 register0 办法并没有执行结束,还须要执行前面的逻辑。

而绑定逻辑须要在注册逻辑执行完之后执行 ,所以在doBind0 办法中 Reactor 线程 会将 绑定操作 封装成异步工作先提交给 taskQueue 中保留,这样能够使 Reactor 线程 立马从 safeSetSuccess 中返回,继续执行剩下的 register0 办法逻辑。

        private void register0(ChannelPromise promise) {
            try {
                ................ 省略............

                doRegister();
                pipeline.invokeHandlerAddedIfNeeded();
                safeSetSuccess(promise);
                // 触发 channelRegister 事件
                pipeline.fireChannelRegistered();

                if (isActive()) {................ 省略............}
            } catch (Throwable t) {................ 省略............}
        }

Reactor 线程 执行完 register0 办法后,就会从 taskQueue 中取出异步工作执行。

此时 Reactor 线程 中的 taskQueue 构造如下:

  • Reactor 线程 会先取出位于 taskQueue 队首的工作执行,这里是指向 NioServerSocketChannelpipeline中增加 ServerBootstrapAcceptor 的异步工作。

此时 NioServerSocketChannelpipeline的构造如下:

  • Reactor 线程 执行绑定工作。

3. 绑定端口地址

Channel 的操作行为全副定义在ChannelOutboundInvoker 接口中

public interface ChannelOutboundInvoker {

    /**
     * Request to bind to the given {@link SocketAddress} and notify the {@link ChannelFuture} once the operation
     * completes, either because the operation was successful or because of an error.
     *
     */
    ChannelFuture bind(SocketAddress localAddress, ChannelPromise promise);
}

bind办法由子类 AbstractChannel 实现。

public abstract class AbstractChannel extends DefaultAttributeMap implements Channel {

   @Override
    public ChannelFuture bind(SocketAddress localAddress, ChannelPromise promise) {return pipeline.bind(localAddress, promise);
    }

}

调用 pipeline.bind(localAddress, promise)pipeline中流传 bind 事件,触发回调pipeline 中所有 ChannelHandlerbind 办法

事件在 pipeline 中的流传具备方向性:

  • inbound 事件 HeadContext开始一一向后流传直到TailContext
  • outbound 事件 则是反向流传,从 TailContext 开始反向向前流传直到HeadContext

inbound 事件 只能被 pipeline 中的 ChannelInboundHandler 响应解决
outbound 事件 只能被 pipeline 中的 ChannelOutboundHandler 响应解决

然而这里的 bind 事件 在 Netty 中被定义为 outbound 事件,所以它在pipeline 中是反向流传。先从 TailContext 开始反向流传直到HeadContext

然而 bind 的外围逻辑也正是实现在 HeadContext 中。

3.1 HeadContext

  final class HeadContext extends AbstractChannelHandlerContext
            implements ChannelOutboundHandler, ChannelInboundHandler {

     @Override
        public void bind(ChannelHandlerContext ctx, SocketAddress localAddress, ChannelPromise promise) {
            // 触发 AbstractChannel->bind 办法 执行 JDK NIO SelectableChannel 执行底层绑定操作
            unsafe.bind(localAddress, promise);
        }

}

HeadContext#bind 回调办法中,调用 Channel 里的 unsafe 操作类执行真正的绑定操作。

protected abstract class AbstractUnsafe implements Unsafe {

      @Override
        public final void bind(final SocketAddress localAddress, final ChannelPromise promise) {
            ................. 省略................

            // 这时 channel 还未激活  wasActive = false
            boolean wasActive = isActive();
            try {
                //io.netty.channel.socket.nio.NioServerSocketChannel.doBind
                // 调用具体 channel 实现类
                doBind(localAddress);
            } catch (Throwable t) {
                ................. 省略................
                return;
            }

            // 绑定胜利后 channel 激活 触发 channelActive 事件流传
            if (!wasActive && isActive()) {invokeLater(new Runnable() {
                    @Override
                    public void run() {
                        //pipeline 中触发 channelActive 事件
                        pipeline.fireChannelActive();}
                });
            }
            // 回调注册在 promise 上的 ChannelFutureListener
            safeSetSuccess(promise);
        }

        protected abstract void doBind(SocketAddress localAddress) throws Exception;
}
  • 首先执行子类 NioServerSocketChannel 具体实现的 doBind 办法,通过 JDK NIO 原生 ServerSocketChannel 执行底层的绑定操作。
    @Override
    protected void doBind(SocketAddress localAddress) throws Exception {
        // 调用 JDK NIO 底层 SelectableChannel 执行绑定操作
        if (PlatformDependent.javaVersion() >= 7) {javaChannel().bind(localAddress, config.getBacklog());
        } else {javaChannel().socket().bind(localAddress, config.getBacklog());
        }
    }
  • 判断是否为首次绑定,如果是的话将 触发 pipeline 中的 ChannelActive 事件 封装成异步工作放入 Reactor 中的 taskQueue 中。
  • 执行 safeSetSuccess(promise), 回调注册在promise 上的ChannelFutureListener

还是同样的问题,以后执行线程曾经是 Reactor 线程 了,那么为何不间接触发 pipeline 中的 ChannelActive 事件而是又封装成异步工作呢??

因为如果间接在这里触发 ChannelActive 事件,那么Reactor 线程 就会去执行 pipeline 中的 ChannelHandlerchannelActive 事件回调

这样的话就影响了 safeSetSuccess(promise) 的执行,提早了 注册在 promise 上的 ChannelFutureListener 的回调。

到当初为止,Netty 服务端就曾经实现了绑定端口地址的操作,NioServerSocketChannel的状态当初变为Active

最初还有一件重要的事件要做,咱们接着来看 pipeline 中对 channelActive 事件 解决。

3.2 channelActive 事件处理

channelActive 事件 在 Netty 中定义为 inbound 事件,所以它在pipeline 中的流传为正向流传,从 HeadContext 始终到 TailContext 为止。

channelActive 事件 回调中须要触发向 Selector 指定须要监听的IO 事件~~OP_ACCEPT 事件

这块的逻辑次要在 HeadContext 中实现。

    final class HeadContext extends AbstractChannelHandlerContext
            implements ChannelOutboundHandler, ChannelInboundHandler {

        @Override
        public void channelActive(ChannelHandlerContext ctx) {
            //pipeline 中持续向后流传 channelActive 事件
            ctx.fireChannelActive();
            // 如果是 autoRead 则主动触发 read 事件流传
            // 在 read 回调函数中 触发 OP_ACCEPT 注册
            readIfIsAutoRead();}

        private void readIfIsAutoRead() {if (channel.config().isAutoRead()) {
                // 如果是 autoRead 则触发 read 事件流传
                channel.read();}
        }

        //AbstractChannel
        public Channel read() {
                // 触发 read 事件
                pipeline.read();
                return this;
        }

       @Override
        public void read(ChannelHandlerContext ctx) {
            // 触发注册 OP_ACCEPT 或者 OP_READ 事件
            unsafe.beginRead();}
   }
  • HeadContext 中的 channelActive 回调中触发 pipeline 中的read 事件
  • read 事件 再次流传到 HeadContext 时,触发 HeadContext#read 办法的回调。在 read 回调 中调用 channel 底层操作类 unsafebeginRead办法向 selector 注册监听OP_ACCEPT 事件

3.3 beginRead

protected abstract class AbstractUnsafe implements Unsafe {

     @Override
        public final void beginRead() {assertEventLoop();
            //channel 必须是 Active
            if (!isActive()) {return;}

            try {
                // 触发在 selector 上注册 channel 感兴趣的监听事件
                doBeginRead();} catch (final Exception e) {............. 省略..............}
        }
}

public abstract class AbstractChannel extends DefaultAttributeMap implements Channel {
    // 子类负责继承实现
    protected abstract void doBeginRead() throws Exception;}
  • 断言判断执行该办法的线程必须是Reactor 线程
  • 此时 NioServerSocketChannel 曾经实现端口地址的绑定操作,isActive() = true
  • 调用 doBeginRead 实现向 Selector 注册监听事件OP_ACCEPT
public abstract class AbstractNioChannel extends AbstractChannel {

    //channel 注册到 Selector 后取得的 SelectKey
    volatile SelectionKey selectionKey;
    // Channel 监听事件汇合
    protected final int readInterestOp;

    @Override
    protected void doBeginRead() throws Exception {
      
        final SelectionKey selectionKey = this.selectionKey;
        if (!selectionKey.isValid()) {return;}

        readPending = true;

        final int interestOps = selectionKey.interestOps();
        /**
         * 1:ServerSocketChannel 初始化时 readInterestOp 设置的是 OP_ACCEPT 事件
         * */
        if ((interestOps & readInterestOp) == 0) {
            // 增加 OP_ACCEPT 事件到 interestOps 汇合中
            selectionKey.interestOps(interestOps | readInterestOp);
        }
    }
}
  • 前边提到在 NioServerSocketChannel 在向 Main Reactor 中的 Selector 注册后,会取得一个SelectionKey。这里首先要获取这个SelectionKey
  • SelectionKey 中获取 NioServerSocketChannel 感兴趣的 IO 事件汇合 interestOps ,过后在注册的时候interestOps 设置为0
  • 将在 NioServerSocketChannel 初始化时设置的 readInterestOp = OP_ACCEPT,设置到SelectionKey 中的 interestOps 汇合中。这样 Reactor 中的 Selector 就开始监听 interestOps 汇合中蕴含的 IO 事件 了。

Main Reactor中次要监听的是OP_ACCEPT 事件

流程走到这里,Netty 服务端就真正的启动起来了,下一步就开始期待接管客户端连贯了。大家此刻在来回看这副启动流程图,是不是清晰了很多呢?

此时 Netty 的 Reactor 模型 构造如下:


总结

本文咱们通过图解源码的形式残缺地介绍了整个 Netty 服务端启动流程,并介绍了在启动过程中波及到的 ServerBootstrap 相干的属性以及配置形式。NioServerSocketChannel 的创立初始化过程以及类的继承构造。

其中重点介绍了 NioServerSocketChannel Reactor的注册过程以及 Reactor 线程 的启动机会和 pipeline 的初始化机会。

最初介绍了 NioServerSocketChannel 绑定端口地址的整个流程。

上述介绍的这些流程全副是异步操作,各种回调绕来绕去的,须要重复回忆下,读异步代码就是这样,须要理清各种回调之间的关系,并且时刻揭示本人以后的执行线程是什么?

好了,当初 Netty 服务端曾经启动起来,接着就该接管客户端连贯了,咱们下篇文章见~~~~

正文完
 0