关于java:详细图解Netty-Reactor启动全流程-万字长文-多图预警

欢送关注公众号:bin的技术小屋,浏览公众号原文

本系列Netty源码解析文章基于 4.1.56.Final版本

大家第一眼看到这幅流程图,是不是脑瓜子嗡嗡的呢?

大家先不要惊恐,问题不大,本文笔者的目标就是要让大家清晰的了解这幅流程图,从而粗浅的了解Netty Reactor的启动全流程,包含其中波及到的各种代码设计实现细节。

在上篇文章《聊聊Netty那些事儿之Reactor在Netty中的实现(创立篇)》中咱们具体介绍了Netty服务端外围引擎组件主从Reactor组模型 NioEventLoopGroup以及Reactor模型 NioEventLoop的创立过程。最终咱们失去了netty Reactor模型的运行骨架如下:

当初Netty服务端程序的骨架是搭建好了,本文咱们就基于这个骨架来深刻分析下Netty服务端的启动过程。

咱们持续回到上篇文章提到的Netty服务端代码模板中,在创立完主从Reactor线程组:bossGroupworkerGroup后,接下来就开始配置Netty服务端的启动辅助类ServerBootstrap 了。

public final class EchoServer {
    static final int PORT = Integer.parseInt(System.getProperty("port", "8007"));

    public static void main(String[] args) throws Exception {
        // Configure the server.
        //创立主从Reactor线程组
        EventLoopGroup bossGroup = new NioEventLoopGroup(1);
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        final EchoServerHandler serverHandler = new EchoServerHandler();
        try {
            ServerBootstrap b = new ServerBootstrap();
            b.group(bossGroup, workerGroup)//配置主从Reactor
             .channel(NioServerSocketChannel.class)//配置主Reactor中的channel类型
             .option(ChannelOption.SO_BACKLOG, 100)//设置主Reactor中channel的option选项
             .handler(new LoggingHandler(LogLevel.INFO))//设置主Reactor中Channel->pipline->handler
             .childHandler(new ChannelInitializer<SocketChannel>() {//设置从Reactor中注册channel的pipeline
                 @Override
                 public void initChannel(SocketChannel ch) throws Exception {
                     ChannelPipeline p = ch.pipeline();
                     //p.addLast(new LoggingHandler(LogLevel.INFO));
                     p.addLast(serverHandler);
                 }
             });

            // Start the server. 绑定端口启动服务,开始监听accept事件
            ChannelFuture f = b.bind(PORT).sync();
            // Wait until the server socket is closed.
            f.channel().closeFuture().sync();
        } finally {
            // Shut down all event loops to terminate all threads.
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }
}

在上篇文章中咱们对代码模板中波及到ServerBootstrap 的一些配置办法做了简略的介绍,大家如果遗记的话,能够在返回去回顾一下。

ServerBootstrap类其实没有什么特地的逻辑,次要是对Netty启动过程中须要用到的一些外围信息进行配置管理,比方:

  • Netty的外围引擎组件主从Reactor线程组: bossGroup,workerGroup。通过ServerBootstrap#group办法配置。
  • Netty服务端应用到的Channel类型:NioServerSocketChannel ,通过ServerBootstrap#channel办法配置。
    以及配置NioServerSocketChannel时用到的SocketOptionSocketOption用于设置底层JDK NIO Socket的一些选项。通过ServerBootstrap#option办法进行配置。

主ReactorGroup中的MainReactor治理的Channel类型为NioServerSocketChannel,如图所示次要用来监听端口,接管客户端连贯,为客户端创立初始化NioSocketChannel,而后采纳round-robin轮询的形式从图中从ReactorGroup中抉择一个SubReactor与该客户端NioSocketChannel进行绑定。

从ReactorGroup中的SubReactor治理的Channel类型为NioSocketChannel,它是netty中定义客户端连贯的一个模型,每个连贯对应一个。如图所示SubReactor负责监听解决绑定在其上的所有NioSocketChannel上的IO事件。

  • 保留服务端NioServerSocketChannel和客户端NioSocketChannel对应pipeline中指定的ChannelHandler。用于后续Channel向Reactor注册胜利之后,初始化Channel里的pipeline。

不论是服务端用到的NioServerSocketChannel还是客户端用到的NioSocketChannel,每个Channel实例都会有一个PipelinePipeline中有多个ChannelHandler用于编排解决对应Channel上感兴趣的IO事件

ServerBootstrap构造中蕴含了netty服务端程序启动的所有配置信息,在咱们介绍启动流程之前,先来看下ServerBootstrap的源码构造:

ServerBootstrap

ServerBootstrap的继承构造比较简单,继承档次的职责分工也比拟明确。

ServerBootstrap次要负责对主从Reactor线程组相干的配置进行治理,其中带child前缀的配置办法是对从Reactor线程组的相干配置管理。从Reactor线程组中的Sub Reactor负责管理的客户端NioSocketChannel相干配置存储在ServerBootstrap构造中。

父类AbstractBootstrap则是次要负责对主Reactor线程组相干的配置进行治理,以及主Reactor线程组中的Main Reactor负责解决的服务端ServerSocketChannel相干的配置管理。

1. 配置主从Reactor线程组

ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workerGroup)//配置主从Reactor
public class ServerBootstrap extends AbstractBootstrap<ServerBootstrap, ServerChannel> {

     //Main Reactor线程组
    volatile EventLoopGroup group;
    //Sub Reactor线程组
    private volatile EventLoopGroup childGroup;

    public ServerBootstrap group(EventLoopGroup parentGroup, EventLoopGroup childGroup) {
        //父类治理主Reactor线程组
        super.group(parentGroup);
        if (this.childGroup != null) {
            throw new IllegalStateException("childGroup set already");
        }
        this.childGroup = ObjectUtil.checkNotNull(childGroup, "childGroup");
        return this;
    }

}

2. 配置服务端ServerSocketChannel

ServerBootstrap b = new ServerBootstrap();
b.channel(NioServerSocketChannel.class);
public class ServerBootstrap extends AbstractBootstrap<ServerBootstrap, ServerChannel> {

    //用于创立ServerSocketChannel  ReflectiveChannelFactory
    private volatile ChannelFactory<? extends C> channelFactory;

    public B channel(Class<? extends C> channelClass) {
        return channelFactory(new ReflectiveChannelFactory<C>(
                ObjectUtil.checkNotNull(channelClass, "channelClass")
        ));
    }

    @Deprecated
    public B channelFactory(ChannelFactory<? extends C> channelFactory) {
        ObjectUtil.checkNotNull(channelFactory, "channelFactory");
        if (this.channelFactory != null) {
            throw new IllegalStateException("channelFactory set already");
        }

        this.channelFactory = channelFactory;
        return self();
    }

}

在向ServerBootstrap配置服务端ServerSocketChannelchannel 办法中,其实是创立了一个ChannelFactory工厂实例ReflectiveChannelFactory,在Netty服务端启动的过程中,会通过这个ChannelFactory去创立相应的Channel实例。

咱们能够通过这个办法来配置netty的IO模型,上面为ServerSocketChannel在不同IO模型下的实现:

BIO NIO AIO
OioServerSocketChannel NioServerSocketChannel AioServerSocketChannel

EventLoopGroup Reactor线程组在不同IO模型下的实现:

BIO NIO AIO
ThreadPerChannelEventLoopGroup NioEventLoopGroup AioEventLoopGroup

咱们只须要将IO模型的这些外围接口对应的实现类前缀改为对应IO模型的前缀,就能够轻松在Netty中实现对IO模型的切换。

2.1 ReflectiveChannelFactory

public class ReflectiveChannelFactory<T extends Channel> implements ChannelFactory<T> {
    //NioServerSocketChannelde 结构器
    private final Constructor<? extends T> constructor;

    public ReflectiveChannelFactory(Class<? extends T> clazz) {
        ObjectUtil.checkNotNull(clazz, "clazz");
        try {
            //反射获取NioServerSocketChannel的结构器
            this.constructor = clazz.getConstructor();
        } catch (NoSuchMethodException e) {
            throw new IllegalArgumentException("Class " + StringUtil.simpleClassName(clazz) +
                    " does not have a public non-arg constructor", e);
        }
    }

    @Override
    public T newChannel() {
        try {
            //创立NioServerSocketChannel实例
            return constructor.newInstance();
        } catch (Throwable t) {
            throw new ChannelException("Unable to create Channel from class " + constructor.getDeclaringClass(), t);
        }
    }
}

从类的签名咱们能够看出,这个工厂类是通过泛型反射的形式来创立对应的Channel实例。

  • 泛型参数T extends Channel示意的是要通过工厂类创立的Channel类型,这里咱们初始化的是NioServerSocketChannel
  • ReflectiveChannelFactory 的结构器中通过反射的形式获取NioServerSocketChannel的结构器。
  • newChannel 办法中通过结构器反射创立NioServerSocketChannel实例。

留神这时只是配置阶段,NioServerSocketChannel此时并未被创立。它是在启动的时候才会被创立进去。

3. 为NioServerSocketChannel配置ChannelOption

ServerBootstrap b = new ServerBootstrap();
//设置被MainReactor治理的NioServerSocketChannel的Socket选项
b.option(ChannelOption.SO_BACKLOG, 100)
public abstract class AbstractBootstrap<B extends AbstractBootstrap<B, C>, C extends Channel> implements Cloneable {

    //serverSocketChannel中的ChannelOption配置
    private final Map<ChannelOption<?>, Object> options = new LinkedHashMap<ChannelOption<?>, Object>();

    public <T> B option(ChannelOption<T> option, T value) {
        ObjectUtil.checkNotNull(option, "option");
        synchronized (options) {
            if (value == null) {
                options.remove(option);
            } else {
                options.put(option, value);
            }
        }
        return self();
    }
}

无论是服务端的NioServerSocketChannel还是客户端的NioSocketChannel它们的相干底层Socket选项ChannelOption配置全副寄存于一个Map类型的数据结构中。

因为客户端NioSocketChannel是由从Reactor线程组中的Sub Reactor来负责解决,所以波及到客户端NioSocketChannel所有的办法和配置全副是以child前缀结尾。

ServerBootstrap b = new ServerBootstrap();
.childOption(ChannelOption.TCP_NODELAY, Boolean.TRUE)
public class ServerBootstrap extends AbstractBootstrap<ServerBootstrap, ServerChannel> {

   //客户端SocketChannel对应的ChannelOption配置
    private final Map<ChannelOption<?>, Object> childOptions = new LinkedHashMap<ChannelOption<?>, Object>();

    public <T> ServerBootstrap childOption(ChannelOption<T> childOption, T value) {
        ObjectUtil.checkNotNull(childOption, "childOption");
        synchronized (childOptions) {
            if (value == null) {
                childOptions.remove(childOption);
            } else {
                childOptions.put(childOption, value);
            }
        }
        return this;
    }
}

相干的底层Socket选项,netty全副枚举在ChannelOption类中,笔者这里就不一一列举了,在本系列后续相干的文章中,笔者还会为大家具体的介绍这些参数的作用。

public class ChannelOption<T> extends AbstractConstant<ChannelOption<T>> {

    ..................省略..............

    public static final ChannelOption<Boolean> SO_BROADCAST = valueOf("SO_BROADCAST");
    public static final ChannelOption<Boolean> SO_KEEPALIVE = valueOf("SO_KEEPALIVE");
    public static final ChannelOption<Integer> SO_SNDBUF = valueOf("SO_SNDBUF");
    public static final ChannelOption<Integer> SO_RCVBUF = valueOf("SO_RCVBUF");
    public static final ChannelOption<Boolean> SO_REUSEADDR = valueOf("SO_REUSEADDR");
    public static final ChannelOption<Integer> SO_LINGER = valueOf("SO_LINGER");
    public static final ChannelOption<Integer> SO_BACKLOG = valueOf("SO_BACKLOG");
    public static final ChannelOption<Integer> SO_TIMEOUT = valueOf("SO_TIMEOUT");

    ..................省略..............

}

4. 为服务端NioServerSocketChannel中的Pipeline配置ChannelHandler

    //serverSocketChannel中pipeline里的handler(次要是acceptor)
    private volatile ChannelHandler handler;

    public B handler(ChannelHandler handler) {
        this.handler = ObjectUtil.checkNotNull(handler, "handler");
        return self();
    }

NioServerSocketChannel中的Pipeline增加ChannelHandler分为两种形式:

  • 显式增加: 显式增加的形式是由用户在main线程中通过ServerBootstrap#handler的形式增加。如果须要增加多个ChannelHandler,则能够通过ChannelInitializerpipeline中进行增加。

对于ChannelInitializer前面笔者会有具体介绍,这里大家只须要晓得ChannelInitializer是一种非凡的ChannelHandler,用于初始化pipeline。实用于向pipeline中增加多个ChannelHandler的场景。

            ServerBootstrap b = new ServerBootstrap();
            b.group(bossGroup, workerGroup)//配置主从Reactor
             .channel(NioServerSocketChannel.class)//配置主Reactor中的channel类型
             .handler(new ChannelInitializer<NioServerSocketChannel>() {
                 @Override
                 protected void initChannel(NioServerSocketChannel ch) throws Exception {
                     ChannelPipeline p = ch.pipeline();
                     p.addLast(channelhandler1)
                      .addLast(channelHandler2)
                      
                      ......
                     
                      .addLast(channelHandler3);
                 }
             })
  • 隐式增加:隐式增加次要增加的就是主ReactorGroup的外围组件也就是下图中的acceptor,Netty中的实现为ServerBootstrapAcceptor,实质上也是一种ChannelHandler,次要负责在客户端连贯建设好后,初始化客户端NioSocketChannel,在从Reactor线程组中选取一个Sub Reactor,将客户端NioSocketChannel 注册到Sub Reactor中的selector上。

隐式增加ServerBootstrapAcceptor是由Netty框架在启动的时候负责增加,用户无需关怀。

在本例中,NioServerSocketChannelPipeLine中只有两个ChannelHandler,一个由用户在内部显式增加的LoggingHandler,另一个是由Netty框架隐式增加的ServerBootstrapAcceptor

其实咱们在理论我的项目应用的过程中,不会向netty服务端NioServerSocketChannel增加额定的ChannelHandler,NioServerSocketChannel只须要分心做好本人最重要的本职工作接管客户端连贯就好了。这里额定增加一个LoggingHandler只是为了向大家展现ServerBootstrap的配置办法。

5. 为客户端NioSocketChannel中的Pipeline配置ChannelHandler

            final EchoServerHandler serverHandler = new EchoServerHandler();

            serverBootstrap.childHandler(new ChannelInitializer<SocketChannel>() {//设置从Reactor中注册channel的pipeline
                 @Override
                 public void initChannel(SocketChannel ch) throws Exception {
                     ChannelPipeline p = ch.pipeline();
            
                     p.addLast(new LoggingHandler(LogLevel.INFO));
                     p.addLast(serverHandler);
                 }
             });
    //socketChannel中pipeline中的解决handler
    private volatile ChannelHandler childHandler;

    public ServerBootstrap childHandler(ChannelHandler childHandler) {
        this.childHandler = ObjectUtil.checkNotNull(childHandler, "childHandler");
        return this;
    }

向客户端NioSocketChannel中的Pipeline里增加ChannelHandler齐全是由用户本人管制显式增加,增加的数量不受限制。

因为在Netty的IO线程模型中,是由单个Sub Reactor线程负责执行客户端NioSocketChannel中的Pipeline,一个Sub Reactor线程负责解决多个NioSocketChannel上的IO事件,如果Pipeline中的ChannelHandler增加的太多,就会影响Sub Reactor线程执行其余NioSocketChannel上的Pipeline,从而升高IO解决效率,升高吞吐量。

所以Pipeline中的ChannelHandler不易增加过多,并且不能再ChannelHandler中执行耗时的业务解决工作。

在咱们通过ServerBootstrap配置netty服务端启动信息的时候,无论是向服务端NioServerSocketChannel的pipeline中增加ChannelHandler,还是向客户端NioSocketChannel的pipeline中增加ChannelHandler,当波及到多个ChannelHandler增加的时候,咱们都会用到ChannelInitializer,那么这个ChannelInitializer到底是何方圣神,为什么要这样做呢?咱们接着往下看~~

ChannelInitializer

首先ChannelInitializer它继承于ChannelHandler,它本人自身就是一个ChannelHandler,所以它能够增加到childHandler中。

其余的父类大家这里能够不必管,前面文章中笔者会一一为大家具体介绍。

那为什么不间接增加ChannelHandler而是抉择用ChannelInitializer呢?

这里次要有两点起因:

  • 前边咱们提到,客户端NioSocketChannel是在服务端accept连贯后,在服务端NioServerSocketChannel中被创立进去的。然而此时咱们正处于配置ServerBootStrap阶段,服务端还没有启动,更没有客户端连贯上来,此时客户端NioSocketChannel还没有被创立进去,所以也就没方法向客户端NioSocketChannel的pipeline中增加ChannelHandler
  • 客户端NioSocketChannelPipeline里能够增加任意多个ChannelHandler,然而Netty框架无奈预知用户到底须要增加多少个ChannelHandler,所以Netty框架提供了回调函数ChannelInitializer#initChannel,使用户能够自定义ChannelHandler的增加行为。

当客户端NioSocketChannel注册到对应的Sub Reactor上后,紧接着就会初始化NioSocketChannel中的Pipeline,此时Netty框架会回调ChannelInitializer#initChannel执行用户自定义的增加逻辑。

public abstract class ChannelInitializer<C extends Channel> extends ChannelInboundHandlerAdapter {

    @Override
    @SuppressWarnings("unchecked")
    public final void channelRegistered(ChannelHandlerContext ctx) throws Exception {
        //当channelRegister事件产生时,调用initChannel初始化pipeline
        if (initChannel(ctx)) {
                 .................省略...............
        } else {
                 .................省略...............
        }
    }

    private boolean initChannel(ChannelHandlerContext ctx) throws Exception {
        if (initMap.add(ctx)) { // Guard against re-entrance.
            try {
                //此时客户单NioSocketChannel曾经创立并初始化好了
                initChannel((C) ctx.channel());
            } catch (Throwable cause) {
                 .................省略...............
            } finally {
                  .................省略...............
            }
            return true;
        }
        return false;
    }

    protected abstract void initChannel(C ch) throws Exception;
    
    .................省略...............
}

这里由netty框架回调的ChannelInitializer#initChannel办法正是咱们自定义的增加逻辑。

            final EchoServerHandler serverHandler = new EchoServerHandler();

            serverBootstrap.childHandler(new ChannelInitializer<SocketChannel>() {//设置从Reactor中注册channel的pipeline
                 @Override
                 public void initChannel(SocketChannel ch) throws Exception {
                     ChannelPipeline p = ch.pipeline();
            
                     p.addLast(new LoggingHandler(LogLevel.INFO));
                     p.addLast(serverHandler);
                 }
             });

到此为止,Netty服务端启动所须要的必要配置信息,曾经全副存入ServerBootStrap启动辅助类中。

接下来要做的事件就是服务端的启动了。

// Start the server. 绑定端口启动服务,开始监听accept事件
ChannelFuture f = serverBootStrap.bind(PORT).sync();

Netty服务端的启动

通过后面的铺垫终于来到了本文的核心内容—-Netty服务端的启动过程。

如代码模板中的示例所示,Netty服务端的启动过程封装在io.netty.bootstrap.AbstractBootstrap#bind(int)函数中。

接下来咱们看一下Netty服务端在启动过程中到底干了哪些事件?

大家看到这副启动流程图先不要慌,接下来的内容笔者会带大家各个击破它,在文章的最初保障让大家看懂这副流程图。

咱们先来从netty服务端启动的入口函数开始咱们明天的源码解析旅程:

    public ChannelFuture bind(int inetPort) {
        return bind(new InetSocketAddress(inetPort));
    }

    public ChannelFuture bind(SocketAddress localAddress) {
        //校验Netty外围组件是否配置齐全
        validate();
        //服务端开始启动,绑定端口地址,接管客户端连贯
        return doBind(ObjectUtil.checkNotNull(localAddress, "localAddress"));
    }

   private ChannelFuture doBind(final SocketAddress localAddress) {
        //异步创立,初始化,注册ServerSocketChannel到main reactor上
        final ChannelFuture regFuture = initAndRegister();
        final Channel channel = regFuture.channel();
        if (regFuture.cause() != null) {
            return regFuture;
        }

        if (regFuture.isDone()) {   

           ........serverSocketChannel向Main Reactor注册胜利后开始绑定端口....,               
             
        } else {
            //如果此时注册操作没有实现,则向regFuture增加operationComplete回调函数,注册胜利后回调。
            regFuture.addListener(new ChannelFutureListener() {
                @Override
                public void operationComplete(ChannelFuture future) throws Exception {

                   ........serverSocketChannel向Main Reactor注册胜利后开始绑定端口...., 
            });
            return promise;
        }
    }

Netty服务端的启动流程总体如下:

  • 创立服务端NioServerSocketChannel并初始化。
  • 将服务端NioServerSocketChannel注册到主Reactor线程组中。
  • 注册胜利后,开始初始化NioServerSocketChannel中的pipeline,而后在pipeline中触发channelRegister事件。
  • 随后由NioServerSocketChannel绑定端口地址。
  • 绑定端口地址胜利后,向NioServerSocketChannel对应的Pipeline中触发流传ChannelActive事件,在ChannelActive事件回调中向Main Reactor注册OP_ACCEPT事件,开始期待客户端连贯。服务端启动实现。

当netty服务端启动胜利之后,最终咱们会失去如下构造的阵型,开始严阵以待,筹备接管客户端的连贯,Reactor开始运转。

接下来,咱们就来看下Netty源码是如何实现以上步骤的~~

1. initAndRegister

    final ChannelFuture initAndRegister() {
        Channel channel = null;
        try {
            //创立NioServerSocketChannel
            //ReflectiveChannelFactory通过泛型,反射,工厂的形式灵便创立不同类型的channel
            channel = channelFactory.newChannel();
            //初始化NioServerSocketChannel
            init(channel);
        } catch (Throwable t) {
            ..............省略.................
        }

        //向MainReactor注册ServerSocketChannel
        ChannelFuture regFuture = config().group().register(channel);

           ..............省略.................

        return regFuture;
    }

从函数命名中咱们能够看出,这个函数次要做的事件就是首先创立NioServerSocketChannel ,并对NioServerSocketChannel 进行初始化,最初将NioServerSocketChannel 注册到Main Reactor中。

1.1 创立NioServerSocketChannel

还记得咱们在介绍ServerBootstrap启动辅助类配置服务端ServerSocketChannel类型的时候提到的工厂类ReflectiveChannelFactory 吗?

因为过后咱们在配置ServerBootstrap启动辅助类的时候,还没到启动阶段,而配置阶段并不是创立具体ServerSocketChannel的机会。

所以Netty通过工厂模式将要创立的ServerSocketChannel的类型(通过泛型指定)以及 创立的过程(封装在newChannel函数中)通通先封装在工厂类ReflectiveChannelFactory中。

ReflectiveChannelFactory通过泛型反射工厂的形式灵便创立不同类型的channel

期待创立机会降临,咱们调用保留在ServerBootstrap中的channelFactory间接进行创立。

public class ReflectiveChannelFactory<T extends Channel> implements ChannelFactory<T> {

    private final Constructor<? extends T> constructor;

    @Override
    public T newChannel() {
        try {
            return constructor.newInstance();
        } catch (Throwable t) {
            throw new ChannelException("Unable to create Channel from class " + constructor.getDeclaringClass(), t);
        }
    }
}

上面咱们来看下NioServerSocketChannel的构建过程:

1.1.1 NioServerSocketChannel

public class NioServerSocketChannel extends AbstractNioMessageChannel
                             implements io.netty.channel.socket.ServerSocketChannel {

    //SelectorProvider(用于创立Selector和Selectable Channels)
    private static final SelectorProvider DEFAULT_SELECTOR_PROVIDER = SelectorProvider.provider();

    public NioServerSocketChannel() {
        this(newSocket(DEFAULT_SELECTOR_PROVIDER));
    }

    //创立JDK NIO ServerSocketChannel
    private static ServerSocketChannel newSocket(SelectorProvider provider) {
        try {
            return provider.openServerSocketChannel();
        } catch (IOException e) {
            throw new ChannelException(
                    "Failed to open a server socket.", e);
        }
    }

     //ServerSocketChannel相干的配置
    private final ServerSocketChannelConfig config;

    public NioServerSocketChannel(ServerSocketChannel channel) {
        //父类AbstractNioChannel中保留JDK NIO原生ServerSocketChannel以及要监听的事件OP_ACCEPT
        super(null, channel, SelectionKey.OP_ACCEPT);
        //DefaultChannelConfig中设置用于Channel接收数据用的buffer->AdaptiveRecvByteBufAllocator
        config = new NioServerSocketChannelConfig(this, javaChannel().socket());
    }

}
  • 首先调用newSocket 创立JDK NIO 原生ServerSocketChannel,这里调用了SelectorProvider#openServerSocketChannel 来创立JDK NIO 原生ServerSocketChannel,咱们在上篇文章《聊聊Netty那些事儿之Reactor在Netty中的实现(创立篇)》中具体的介绍了SelectorProvider相干内容,过后是用SelectorProvider来创立Reactor中的Selector。大家还记得吗??
  • 通过父类结构器设置NioServerSocketChannel感兴趣的IO事件,这里设置的是SelectionKey.OP_ACCEPT事件。并将JDK NIO 原生ServerSocketChannel封装起来。
  • 创立Channel的配置类NioServerSocketChannelConfig,在配置类中封装了对Channel底层的一些配置行为,以及JDK中的ServerSocket。以及创立NioServerSocketChannel接收数据用的Buffer分配器AdaptiveRecvByteBufAllocator

NioServerSocketChannelConfig没什么重要的货色,咱们这里也不用深究,它就是治理NioServerSocketChannel相干的配置,这里惟一须要大家留神的是这个用于Channel接收数据用的Buffer分配器AdaptiveRecvByteBufAllocator,咱们前面在介绍Netty如何接管连贯的时候还会提到。

NioServerSocketChannel 的整体构建过程介绍完了,当初咱们来依照继承档次再回过头来看下NioServerSocketChannel 的档次构建,来看下每一层都创立了什么,封装了什么,这些信息都是Channel的外围信息,所以有必要理解一下。

NioServerSocketChannel 的创立过程中,咱们次要关注继承结构图中红框标注的三个类,其余的咱们占时先不必管。

其中AbstractNioMessageChannel类次要是对NioServerSocketChannel底层读写行为的封装和定义,比方accept接管客户端连贯。这个咱们后续会介绍到,这里咱们并不开展。

1.1.2 AbstractNioChannel

public abstract class AbstractNioChannel extends AbstractChannel {
   //JDK NIO原生Selectable Channel
    private final SelectableChannel ch;
    // Channel监听事件汇合 这里是SelectionKey.OP_ACCEPT事件
    protected final int readInterestOp;

    protected AbstractNioChannel(Channel parent, SelectableChannel ch, int readInterestOp) {
        super(parent);
        this.ch = ch;
        this.readInterestOp = readInterestOp;
        try {
            //设置Channel为非阻塞 配合IO多路复用模型
            ch.configureBlocking(false);
        } catch (IOException e) {
            .............省略................
        }
    }
}
  • 封装由SelectorProvider创立进去的JDK NIO原生ServerSocketChannel
  • 封装Channel在创立时指定感兴趣的IO事件,对于NioServerSocketChannel来说感兴趣的IO事件OP_ACCEPT事件
  • 设置JDK NIO原生ServerSocketChannel为非阻塞模式, 配合IO多路复用模型。

1.1.3 AbstractChannel

public abstract class AbstractChannel extends DefaultAttributeMap implements Channel {

    //channel是由创立档次的,比方ServerSocketChannel 是 SocketChannel的 parent
    private final Channel parent;
    //channel全局惟一ID machineId+processId+sequence+timestamp+random
    private final ChannelId id;
    //unsafe用于封装对底层socket的相干操作
    private final Unsafe unsafe;
    //为channel调配独立的pipeline用于IO事件编排
    private final DefaultChannelPipeline pipeline;

    protected AbstractChannel(Channel parent) {
        this.parent = parent;
        //channel全局惟一ID machineId+processId+sequence+timestamp+random
        id = newId();
        //unsafe用于定义实现对Channel的底层操作
        unsafe = newUnsafe();
        //为channel调配独立的pipeline用于IO事件编排
        pipeline = newChannelPipeline();
    }
}
  • Netty中的Channel创立是有档次的,这里的parent属性用来保留上一级的Channel,比方这里的NioServerSocketChannel是顶级Channel,所以它的parent = null。客户端NioSocketChannel是由NioServerSocketChannel创立的,所以它的parent = NioServerSocketChannel
  • Channel调配全局惟一的ChannelIdChannelId由机器Id(machineId),过程Id(processId),序列号(sequence),工夫戳(timestamp),随机数(random)形成
   private DefaultChannelId() {
        data = new byte[MACHINE_ID.length + PROCESS_ID_LEN + SEQUENCE_LEN + TIMESTAMP_LEN + RANDOM_LEN];
        int i = 0;

        // machineId
        System.arraycopy(MACHINE_ID, 0, data, i, MACHINE_ID.length);
        i += MACHINE_ID.length;

        // processId
        i = writeInt(i, PROCESS_ID);

        // sequence
        i = writeInt(i, nextSequence.getAndIncrement());

        // timestamp (kind of)
        i = writeLong(i, Long.reverse(System.nanoTime()) ^ System.currentTimeMillis());

        // random
        int random = PlatformDependent.threadLocalRandom().nextInt();
        i = writeInt(i, random);
        assert i == data.length;

        hashCode = Arrays.hashCode(data);
    }
  • 创立NioServerSocketChannel的底层操作类Unsafe 。这里创立的是io.netty.channel.nio.AbstractNioMessageChannel.NioMessageUnsafe

UnsafeChannel接口的一个外部接口,用于定义实现对Channel底层的各种操作,Unsafe接口定义的操作行为只能由Netty框架的Reactor线程调用,用户线程禁止调用。

interface Unsafe {
        
        //调配接收数据用的Buffer
        RecvByteBufAllocator.Handle recvBufAllocHandle();

        //服务端绑定的端口地址
        SocketAddress localAddress();
        //远端地址
        SocketAddress remoteAddress();
        //channel向Reactor注册
        void register(EventLoop eventLoop, ChannelPromise promise);

        //服务端绑定端口地址
        void bind(SocketAddress localAddress, ChannelPromise promise);
        //客户端连贯服务端
        void connect(SocketAddress remoteAddress, SocketAddress localAddress, ChannelPromise promise);
        //敞开channle
        void close(ChannelPromise promise);
        //读数据
        void beginRead();
        //写数据
        void write(Object msg, ChannelPromise promise);

    }
  • NioServerSocketChannel调配独立的pipeline用于IO事件编排。pipeline其实是一个ChannelHandlerContext类型的双向链表。头结点HeadContext,尾结点TailContextChannelHandlerContext中包装着ChannelHandler

ChannelHandlerContext 保留 ChannelHandler上下文信息,用于事件流传。前面笔者会独自开一篇文章介绍,这里咱们还是聚焦于启动主线。

这里只是为了让大家简略了解pipeline的一个大抵的构造,前面会写一篇文章专门具体解说pipeline

    protected DefaultChannelPipeline(Channel channel) {
        this.channel = ObjectUtil.checkNotNull(channel, "channel");
        succeededFuture = new SucceededChannelFuture(channel, null);
        voidPromise =  new VoidChannelPromise(channel, true);

        tail = new TailContext(this);
        head = new HeadContext(this);

        head.next = tail;
        tail.prev = head;
    }


到了这里NioServerSocketChannel就创立结束了,咱们来回顾下它到底蕴含了哪些外围信息。

1.2 初始化NioServerSocketChannel

   void init(Channel channel) {
        //向NioServerSocketChannelConfig设置ServerSocketChannelOption
        setChannelOptions(channel, newOptionsArray(), logger);
        //向netty自定义的NioServerSocketChannel设置attributes
        setAttributes(channel, attrs0().entrySet().toArray(EMPTY_ATTRIBUTE_ARRAY));

        ChannelPipeline p = channel.pipeline();
        
        //获取从Reactor线程组
        final EventLoopGroup currentChildGroup = childGroup;
        //获取用于初始化客户端NioSocketChannel的ChannelInitializer
        final ChannelHandler currentChildHandler = childHandler;
        //获取用户配置的客户端SocketChannel的channelOption以及attributes
        final Entry<ChannelOption<?>, Object>[] currentChildOptions;
        synchronized (childOptions) {
            currentChildOptions = childOptions.entrySet().toArray(EMPTY_OPTION_ARRAY);
        }
        final Entry<AttributeKey<?>, Object>[] currentChildAttrs = childAttrs.entrySet().toArray(EMPTY_ATTRIBUTE_ARRAY);

        //向NioServerSocketChannel中的pipeline增加初始化ChannelHandler的逻辑
        p.addLast(new ChannelInitializer<Channel>() {
            @Override
            public void initChannel(final Channel ch) {
                final ChannelPipeline pipeline = ch.pipeline();
                //ServerBootstrap中用户指定的channelHandler
                ChannelHandler handler = config.handler();
                if (handler != null) {
                    //LoggingHandler
                    pipeline.addLast(handler);
                }
                //增加用于接管客户端连贯的acceptor
                ch.eventLoop().execute(new Runnable() {
                    @Override
                    public void run() {
                        pipeline.addLast(new ServerBootstrapAcceptor(
                                ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));
                    }
                });
            }
        });
    }
  • NioServerSocketChannelConfig设置ServerSocketChannelOption
  • 向netty自定义的NioServerSocketChannel设置ChannelAttributes

Netty自定义的SocketChannel类型均继承AttributeMap接口以及DefaultAttributeMap类,正是它们定义了ChannelAttributes。用于向Channel增加用户自定义的一些信息。

这个ChannelAttributes的用途大有可为,Netty后边的许多个性都是依附这个ChannelAttributes来实现的。这里先卖个关子,大家能够本人先想一下能够用这个ChannelAttributes做哪些事件?

  • 获取从Reactor线程组childGroup,以及用于初始化客户端NioSocketChannelChannelInitializer,ChannelOption,ChannelAttributes,这些信息均是由用户在启动的时候向ServerBootstrap增加的客户端NioServerChannel配置信息。这里用这些信息来初始化ServerBootstrapAcceptor。因为后续会在ServerBootstrapAcceptor中接管客户端连贯以及创立NioServerChannel
  • NioServerSocketChannel中的pipeline增加用于初始化pipelineChannelInitializer

问题来了,这里为什么不罗唆间接将ChannelHandler增加到pipeline中,而是又应用到了ChannelInitializer呢?

其实起因有两点:

  • 为了保障线程平安地初始化pipeline,所以初始化的动作须要由Reactor线程进行,而以后线程是用户程序启动Main线程不是Reactor线程。这里不能立刻初始化。
  • 初始化Channelpipeline的动作,须要等到Channel注册到对应的Reactor中才能够进行初始化,以后只是创立好了NioServerSocketChannel,但并未注册到Main Reactor上。

    初始化NioServerSocketChannelpipeline的机会是:当NioServerSocketChannel注册到Main Reactor之后,绑定端口地址之前。

前边在介绍ServerBootstrap配置childHandler时也用到了ChannelInitializer,还记得吗??

问题又来了,大家留神下ChannelInitializer#initChannel办法,在该初始化回调办法中,增加LoggingHandler是间接向pipeline中增加,而增加Acceptor为什么不是间接增加而是封装成异步工作呢?

这里先给大家卖个关子,笔者会在后续流程中为大家解答~

此时NioServerSocketChannel中的pipeline构造如下图所示:

1.3 向Main Reactor注册NioServerSocketChannel

ServerBootstrap获取主Reactor线程组NioEventLoopGroup,将NioServerSocketChannel注册到NioEventLoopGroup中。

ChannelFuture regFuture = config().group().register(channel);

上面咱们来看下具体的注册过程:

1.3.1 主Reactor线程组中选取一个Main Reactor进行注册

    @Override
    public ChannelFuture register(Channel channel) {
        return next().register(channel);
    }

    @Override
    public EventExecutor next() {
        return chooser.next();
    }

    //获取绑定策略
    @Override
    public EventExecutorChooser newChooser(EventExecutor[] executors) {
        if (isPowerOfTwo(executors.length)) {
            return new PowerOfTwoEventExecutorChooser(executors);
        } else {
            return new GenericEventExecutorChooser(executors);
        }
    }
    
    //采纳轮询round-robin的形式抉择Reactor
    @Override
    public EventExecutor next() {
            return executors[(int) Math.abs(idx.getAndIncrement() % executors.length)];
    }

Netty通过next()办法依据上篇文章《聊聊Netty那些事儿之Reactor在Netty中的实现(创立篇)》提到的channel到reactor的绑定策略,从ReactorGroup中选取一个Reactor进行注册绑定。之后Channel生命周期内的所有 IO 事件都由这个 Reactor 负责解决,如 accept、connect、read、write 等 IO 事件。

一个channel只能绑定到一个Reactor上,一个Reactor负责监听多个channel

因为这里是NioServerSocketChannleMain Reactor进行注册绑定,所以Main Reactor次要负责解决的IO事件OP_ACCEPT事件。

1.3.2 向绑定后的Main Reactor进行注册

Reactor进行注册的行为定义在NioEventLoop的父类SingleThreadEventLoop中,印象含糊的同学能够在回看下上篇文章中的NioEventLoop继承构造大节内容。

public abstract class SingleThreadEventLoop extends SingleThreadEventExecutor implements EventLoop {

    @Override
    public ChannelFuture register(Channel channel) {
        //注册channel到绑定的Reactor上
        return register(new DefaultChannelPromise(channel, this));
    }

    @Override
    public ChannelFuture register(final ChannelPromise promise) {
        ObjectUtil.checkNotNull(promise, "promise");
        //unsafe负责channel底层的各种操作
        promise.channel().unsafe().register(this, promise);
        return promise;
    }
}

通过NioServerSocketChannel中的Unsafe类执行底层具体的注册动作。

protected abstract class AbstractUnsafe implements Unsafe {

        /**
         * 注册Channel到绑定的Reactor上
         * */
        @Override
        public final void register(EventLoop eventLoop, final ChannelPromise promise) {
            ObjectUtil.checkNotNull(eventLoop, "eventLoop");
            if (isRegistered()) {
                promise.setFailure(new IllegalStateException("registered to an event loop already"));
                return;
            }
            //EventLoop的类型要与Channel的类型一样  Nio Oio Aio
            if (!isCompatible(eventLoop)) {
                promise.setFailure(
                        new IllegalStateException("incompatible event loop type: " + eventLoop.getClass().getName()));
                return;
            }

            //在channel上设置绑定的Reactor
            AbstractChannel.this.eventLoop = eventLoop;

            /**
             * 执行channel注册的操作必须是Reactor线程来实现
             *
             * 1: 如果以后执行线程是Reactor线程,则间接执行register0进行注册
             * 2:如果以后执行线程是内部线程,则须要将register0注册操作 封装程异步Task 由Reactor线程执行
             * */
            if (eventLoop.inEventLoop()) {
                register0(promise);
            } else {
                try {
                    eventLoop.execute(new Runnable() {
                        @Override
                        public void run() {
                            register0(promise);
                        }
                    });
                } catch (Throwable t) {
                   ...............省略...............
                }
            }
        }
}
  • 首先查看NioServerSocketChannel是否曾经实现注册。如果以实现注册,则间接设置代表注册操作后果的ChannelPromisefail状态
  • 通过isCompatible办法验证Reactor模型EventLoop是否与Channel的类型匹配。NioEventLoop对应于NioServerSocketChannel

上篇文章咱们介绍过 Netty对三种IO模型Oio,Nio,Aio的反对,用户能够通过扭转Netty外围类的前缀轻松切换IO模型isCompatible办法目标就是须要保障ReactorChannel应用的是同一种IO模型

  • Channel中保留其绑定的Reactor实例
  • 执行ChannelReactor注册的动作必须要确保是在Reactor线程中执行。

    • 如果以后线程是Reactor线程则间接执行注册动作register0
    • 如果以后线程不是Reactor线程,则须要将注册动作register0封装成异步工作,寄存在Reactor中的taskQueue中,期待Reactor线程执行。

以后执行线程并不是Reactor线程,而是用户程序的启动线程Main线程

1.3.3 Reactor线程的启动

上篇文章中咱们在介绍NioEventLoopGroup的创立过程中提到了一个结构器参数executor,它用于启动Reactor线程,类型为ThreadPerTaskExecutor

过后笔者向大家卖了一个关子~~“Reactor线程是何时启动的?”

那么当初就到了为大家揭晓谜底的时候了~~

Reactor线程的启动是在向Reactor提交第一个异步工作的时候启动的。

Netty中的主Reactor线程组NioEventLoopGroup中的Main ReactorNioEventLoop是在用户程序Main线程Main Reactor提交用于注册NioServerSocketChannel的异步工作时开始启动。

   eventLoop.execute(new Runnable() {
                        @Override
                        public void run() {
                            register0(promise);
                        }
                    });

接下来咱们关注下NioEventLoopexecute办法

public abstract class SingleThreadEventExecutor extends AbstractScheduledEventExecutor implements OrderedEventExecutor {

    @Override
    public void execute(Runnable task) {
        ObjectUtil.checkNotNull(task, "task");
        execute(task, !(task instanceof LazyRunnable) && wakesUpForTask(task));
    }

    private void execute(Runnable task, boolean immediate) {
        //以后线程是否为Reactor线程
        boolean inEventLoop = inEventLoop();
        //addTaskWakesUp = true  addTask唤醒Reactor线程执行工作
        addTask(task);
        if (!inEventLoop) {
            //如果以后线程不是Reactor线程,则启动Reactor线程
            //这里能够看出Reactor线程的启动是通过 向NioEventLoop增加异步工作时启动的
            startThread();

            .....................省略.....................
        }
        .....................省略.....................
    }

}
  • 首先将异步工作task增加到Reactor中的taskQueue中。
  • 判断以后线程是否为Reactor线程,此时以后执行线程为用户程序启动线程,所以这里调用startThread 启动Reactor线程

1.3.4 startThread

public abstract class SingleThreadEventExecutor extends AbstractScheduledEventExecutor implements OrderedEventExecutor {
    //定义Reactor线程状态
    private static final int ST_NOT_STARTED = 1;
    private static final int ST_STARTED = 2;
    private static final int ST_SHUTTING_DOWN = 3;
    private static final int ST_SHUTDOWN = 4;
    private static final int ST_TERMINATED = 5;

     //Reactor线程状态  初始为 未启动状态
    private volatile int state = ST_NOT_STARTED;

    //Reactor线程状态字段state 原子更新器
    private static final AtomicIntegerFieldUpdater<SingleThreadEventExecutor> STATE_UPDATER =
    AtomicIntegerFieldUpdater.newUpdater(SingleThreadEventExecutor.class, "state");

    private void startThread() {
        if (state == ST_NOT_STARTED) {
            if (STATE_UPDATER.compareAndSet(this, ST_NOT_STARTED, ST_STARTED)) {
                boolean success = false;
                try {
                    doStartThread();
                    success = true;
                } finally {
                    if (!success) {
                        STATE_UPDATER.compareAndSet(this, ST_STARTED, ST_NOT_STARTED);
                    }
                }
            }
        }
    }

}
  • Reactor线程初始化状态为ST_NOT_STARTED ,首先CAS更新状态为ST_STARTED
  • doStartThread 启动Reactor线程
  • 启动失败的话,须要将Reactor线程状态改回ST_NOT_STARTED
    //ThreadPerTaskExecutor 用于启动Reactor线程
    private final Executor executor;

    private void doStartThread() {
        assert thread == null;
        executor.execute(new Runnable() {
            @Override
            public void run() {
                thread = Thread.currentThread();
                if (interrupted) {
                    thread.interrupt();
                }

                boolean success = false;
                updateLastExecutionTime();
                try {
                    //Reactor线程开始启动
                    SingleThreadEventExecutor.this.run();
                    success = true;
                }
              
                ................省略..............
        }

这里就来到了ThreadPerTaskExecutor 类型的executor的用武之地了。

  • Reactor线程的外围工作之前介绍过:轮询所有注册其上的Channel中的IO就绪事件解决对应Channel上的IO事件执行异步工作。Netty将这些外围工作封装在io.netty.channel.nio.NioEventLoop#run办法中。
  • NioEventLoop#run封装在异步工作中,提交给executor执行,Reactor线程至此开始工作了就。
public final class ThreadPerTaskExecutor implements Executor {
    private final ThreadFactory threadFactory;

    @Override
    public void execute(Runnable command) {
        //启动Reactor线程
        threadFactory.newThread(command).start();
    }
}

此时Reactor线程曾经启动,前面的工作全副都由这个Reactor线程来负责执行了。

而用户启动线程在向Reactor提交完NioServerSocketChannel的注册工作register0后,就逐渐退出调用堆栈,回退到最开始的启动入口处ChannelFuture f = b.bind(PORT).sync()

此时Reactor中的工作队列中只有一个工作register0Reactor线程启动后,会从工作队列中取出工作执行。

至此NioServerSocketChannel的注册工作正式拉开帷幕~~

1.3.5 register0

       //true if the channel has never been registered, false otherwise 
        private boolean neverRegistered = true;

        private void register0(ChannelPromise promise) {
            try {
                //查看注册操作是否曾经勾销,或者对应channel曾经敞开
                if (!promise.setUncancellable() || !ensureOpen(promise)) {
                    return;
                }
                boolean firstRegistration = neverRegistered;
                //执行真正的注册操作
                doRegister();
                //批改注册状态
                neverRegistered = false;
                registered = true;
                //回调pipeline中增加的ChannelInitializer的handlerAdded办法,在这里初始化channelPipeline
                pipeline.invokeHandlerAddedIfNeeded();
                //设置regFuture为success,触发operationComplete回调,将bind操作放入Reactor的工作队列中,期待Reactor线程执行。
                safeSetSuccess(promise);
                //触发channelRegister事件
                pipeline.fireChannelRegistered();
                //对于服务端ServerSocketChannel来说 只有绑定端口地址胜利后 channel的状态才是active的。
                //此时绑定操作作为异步工作在Reactor的工作队列中,绑定操作还没开始,所以这里的isActive()是false
                if (isActive()) {
                    if (firstRegistration) {
                        //触发channelActive事件
                        pipeline.fireChannelActive();
                    } else if (config().isAutoRead()) {
                        beginRead();
                    }
                }
            } catch (Throwable t) {
                 ............省略.............
            }
        }

register0是驱动整个Channel注册绑定流程的要害办法,上面咱们来看下它的外围逻辑:

  • 首先须要查看Channel的注册动作是否在Reactor线程外被勾销了曾经!promise.setUncancellable()。查看要注册的Channel是否曾经敞开!ensureOpen(promise)。如果Channel曾经敞开或者注册操作曾经被勾销,那么就间接返回,进行注册流程。
  • 调用doRegister()办法,执行真正的注册操作。最终实现在AbstractChannel的子类AbstractNioChannel中,这个咱们一会在介绍,先关注整体流程。
public abstract class AbstractChannel extends DefaultAttributeMap implements Channel {

   /**
     * Is called after the {@link Channel} is registered with its {@link EventLoop} as part of the register process.
     *
     * Sub-classes may override this method
     */
    protected void doRegister() throws Exception {
        // NOOP
    }

}
  • ChannelReactor注册结束后,调用pipeline.invokeHandlerAddedIfNeeded()办法,触发回调pipeline中增加的ChannelInitializer的handlerAdded办法,在handlerAdded办法中利用后面提到的ChannelInitializer初始化ChannelPipeline

初始化ChannelPipeline的机会是当Channel向对应的Reactor注册胜利后,在handlerAdded事件回调中利用ChannelInitializer进行初始化。

  • 设置regFutureSuccess,并回调注册在regFuture上的ChannelFutureListener#operationComplete办法,在operationComplete回调办法中将绑定操作封装成异步工作,提交到ReactortaskQueue中。期待Reactor的执行。

还记得这个regFuture在哪里呈现的吗?它是在哪里被创立,又是在哪里增加的ChannelFutureListener呢? 大家还有印象吗?回顾不起来也没关系,笔者前面还会提到

  • 通过pipeline.fireChannelRegistered()pipeline中触发channelRegister事件

pipelinechannelHandlerchannelRegistered办法被回调。

  • 对于Netty服务端NioServerSocketChannel来说, 只有绑定端口地址胜利后 channel的状态才是active的。此时绑定操作regFuture上注册的ChannelFutureListener#operationComplete回调办法中被作为异步工作提交到了Reactor的工作队列中,Reactor线程没开始执行绑定工作。所以这里的isActive()false

Reactor线程执行完register0办法后,才会去执行绑定工作

上面咱们来看下register0办法中这些外围步骤的具体实现:

1.3.6 doRegister()

public abstract class AbstractNioChannel extends AbstractChannel {

    //channel注册到Selector后取得的SelectKey
    volatile SelectionKey selectionKey;

    @Override
    protected void doRegister() throws Exception {
        boolean selected = false;
        for (;;) {
            try {
                selectionKey = javaChannel().register(eventLoop().unwrappedSelector(), 0, this);
                return;
            } catch (CancelledKeyException e) {
                ...............省略....................
            }
        }
    }

}

调用底层JDK NIO Channel办法java.nio.channels.SelectableChannel#register(java.nio.channels.Selector, int, java.lang.Object),将NettyNioServerSocketChannel中包装的JDK NIO ServerSocketChannel注册到Reactor中的JDK NIO Selector上。

简略介绍下SelectableChannel#register办法参数的含意:

  • Selector:示意JDK NIO Channel将要向哪个Selector进行注册。
  • int ops: 示意Channel上感兴趣的IO事件,当对应的IO事件就绪时,Selector会返回Channel对应的SelectionKey

SelectionKey能够了解为ChannelSelector上的非凡示意模式, SelectionKey中封装了Channel感兴趣的IO事件汇合~~~interestOps,以及IO就绪的事件汇合~~readyOps, 同时也封装了对应的JDK NIO Channel以及注册的Selector。最初还有一个重要的属性attachment,能够容许咱们在SelectionKey上附加一些自定义的对象。

  • Object attachment:SelectionKey中增加用户自定义的附加对象。

这里NioServerSocketChannelReactor中的Selector注册的IO事件0,这个操作的次要目标是先获取到ChannelSelector中对应的SelectionKey,实现注册。当绑定操作实现后,在去向SelectionKey增加感兴趣的IO事件~~~OP_ACCEPT事件

同时通过SelectableChannel#register办法将Netty自定义的NioServerSocketChannel(这里的this指针)附着在SelectionKeyattechment属性上,实现Netty自定义Channel与JDK NIO Channel的关系绑定。这样在每次对Selector 进行IO就绪事件轮询时,Netty 都能够从 JDK NIO Selector返回的SelectionKey中获取到自定义的Channel对象(这里指的就是NioServerSocketChannel)。

1.3.7 HandlerAdded事件回调中初始化ChannelPipeline

NioServerSocketChannel注册到Main Reactor上的Selector后,Netty通过调用pipeline.invokeHandlerAddedIfNeeded()开始回调NioServerSocketChannelpipeline里的ChannelHandler的handlerAdded办法

此时NioServerSocketChannelpipeline构造如下:

此时pipeline中只有在初始化NioServerSocketChannel时增加的ChannelInitializer

咱们来看下ChannelInitializerhandlerAdded回调办法具体作了哪些事件~~

public abstract class ChannelInitializer<C extends Channel> extends ChannelInboundHandlerAdapter {

    @Override
    public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
        if (ctx.channel().isRegistered()) {
            if (initChannel(ctx)) {
                //初始化工作实现后,须要将本身从pipeline中移除
                removeState(ctx);
            }
        }
    }

    //ChannelInitializer实例是被所有的Channel共享的,用于初始化ChannelPipeline
    //通过Set汇合保留曾经初始化的ChannelPipeline,防止反复初始化同一ChannelPipeline
    private final Set<ChannelHandlerContext> initMap = Collections.newSetFromMap(
            new ConcurrentHashMap<ChannelHandlerContext, Boolean>());

    private boolean initChannel(ChannelHandlerContext ctx) throws Exception {
        if (initMap.add(ctx)) { // Guard against re-entrance.
            try {
                initChannel((C) ctx.channel());
            } catch (Throwable cause) {
                exceptionCaught(ctx, cause);
            } finally {
                ChannelPipeline pipeline = ctx.pipeline();
                if (pipeline.context(this) != null) {
                     //初始化结束后,从pipeline中移除本身
                    pipeline.remove(this);
                }
            }
            return true;
        }
        return false;
    }

    //匿名类实现,这里指定具体的初始化逻辑
    protected abstract void initChannel(C ch) throws Exception;

    private void removeState(final ChannelHandlerContext ctx) {
        //从initMap防重Set汇合中删除ChannelInitializer
        if (ctx.isRemoved()) {
            initMap.remove(ctx);
        } else {
            ctx.executor().execute(new Runnable() {
                @Override
                public void run() {
                    initMap.remove(ctx);
                }
            });
        }
    }
}

ChannelInitializer 中的初始化逻辑比拟简单明了:

  • 首先要判断必须是以后Channel曾经实现注册后,才能够进行pipeline的初始化。ctx.channel().isRegistered()
  • 调用ChannelInitializer 的匿名类指定的initChannel 执行自定义的初始化逻辑。
        p.addLast(new ChannelInitializer<Channel>() {
            @Override
            public void initChannel(final Channel ch) {
                final ChannelPipeline pipeline = ch.pipeline();
                //ServerBootstrap中用户指定的channelHandler
                ChannelHandler handler = config.handler();
                if (handler != null) {
                    pipeline.addLast(handler);
                }

                ch.eventLoop().execute(new Runnable() {
                    @Override
                    public void run() {
                        pipeline.addLast(new ServerBootstrapAcceptor(
                                ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));
                    }
                });
            }
        });

还记得在初始化NioServerSocketChannel时。io.netty.bootstrap.ServerBootstrap#init办法中向pipeline中增加的ChannelInitializer吗?

  • 当执行完initChannel 办法后,ChannelPipeline的初始化就完结了,此时ChannelInitializer 就没必要再持续呆在pipeline中了,所须要将ChannelInitializer pipeline中删除。pipeline.remove(this)

当初始化完pipeline时,此时pipeline的构造再次发生了变动:

此时Main Reactor中的工作队列taskQueue构造变动为:

增加ServerBootstrapAcceptor的工作是在初始化NioServerSocketChannel的时候向main reactor提交过来的。还记得吗?

1.3.8 回调regFuture的ChannelFutureListener

在本大节《Netty服务端的启动》的最开始,咱们介绍了服务端启动的入口函数io.netty.bootstrap.AbstractBootstrap#doBind,在函数的最结尾调用了initAndRegister()办法用来创立并初始化NioServerSocketChannel,之后便会将NioServerSocketChannel注册到Main Reactor中。

注册的操作是一个异步的过程,所以在initAndRegister()办法调用后返回一个代表注册后果的ChannelFuture regFuture

public abstract class AbstractBootstrap<B extends AbstractBootstrap<B, C>, C extends Channel> implements Cloneable {

    private ChannelFuture doBind(final SocketAddress localAddress) {
        //异步创立,初始化,注册ServerSocketChannel
        final ChannelFuture regFuture = initAndRegister();
        final Channel channel = regFuture.channel();
        if (regFuture.cause() != null) {
            return regFuture;
        }

        if (regFuture.isDone()) {
            //如果注册实现,则进行绑定操作
            ChannelPromise promise = channel.newPromise();
            doBind0(regFuture, channel, localAddress, promise);
            return promise;
        } else {
            final PendingRegistrationPromise promise = new PendingRegistrationPromise(channel);
            //增加注册实现 回调函数
            regFuture.addListener(new ChannelFutureListener() {
                @Override
                public void operationComplete(ChannelFuture future) throws Exception {

                         ...............省略...............
                          // 注册实现后,Reactor线程回调这里
                        doBind0(regFuture, channel, localAddress, promise);
                    }
                }
            });
            return promise;
        }
    }
}

之后会向ChannelFuture regFuture增加一个注册实现后的回调函数~~~~ ChannelFutureListener 。在回调函数operationComplete 中开始发动绑端口地址流程

那么这个回调函数在什么时候?什么中央发动的呢??

让咱们在回到本大节的主题register0 办法的流程中:

当调用doRegister()办法实现NioServerSocketChannelMain Reactor的注册后,紧接着会调用pipeline.invokeHandlerAddedIfNeeded()办法中触发ChannelInitializer#handlerAdded回调中对pipeline进行初始化。

最初在safeSetSuccess办法中,开始回调注册在regFuture 上的ChannelFutureListener

   protected final void safeSetSuccess(ChannelPromise promise) {
        if (!(promise instanceof VoidChannelPromise) && !promise.trySuccess()) {
           logger.warn("Failed to mark a promise as success because it is done already: {}", promise);
        }
   }

   @Override
    public boolean trySuccess() {
        return trySuccess(null);
    }

    @Override
    public boolean trySuccess(V result) {
        return setSuccess0(result);
    }

   private boolean setSuccess0(V result) {
        return setValue0(result == null ? SUCCESS : result);
    }

    private boolean setValue0(Object objResult) {
        if (RESULT_UPDATER.compareAndSet(this, null, objResult) ||
            RESULT_UPDATER.compareAndSet(this, UNCANCELLABLE, objResult)) {
            if (checkNotifyWaiters()) {
                //回调注册在promise上的listeners
                notifyListeners();
            }
            return true;
        }
        return false;
    }

safeSetSuccess 的逻辑比较简单,首先设置regFuture后果为success,并且回调注册在regFuture上的ChannelFutureListener

须要揭示的是,执行safeSetSuccess 办法,以及后边回调regFuture上的ChannelFutureListener 这些动作都是由Reactor线程执行的。

对于Netty中的Promise模型后边我会在写一篇专门的文章进行剖析,这里大家只需分明大体的流程即可。不用在意过多的细节。

上面咱们把视角切换到regFuture上的ChannelFutureListener 回调中,看看在Channel注册实现后,Netty又会做哪些事件?

2. doBind0

public abstract class AbstractBootstrap<B extends AbstractBootstrap<B, C>, C extends Channel> implements Cloneable {

    private static void doBind0(
            final ChannelFuture regFuture, final Channel channel,
            final SocketAddress localAddress, final ChannelPromise promise) {

        channel.eventLoop().execute(new Runnable() {
            @Override
            public void run() {
                if (regFuture.isSuccess()) {
                    channel.bind(localAddress, promise).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
                } else {
                    promise.setFailure(regFuture.cause());
                }
            }
        });
    }

}

这里Netty又将绑定端口地址的操作封装成异步工作,提交给Reactor执行。

然而这里有一个问题,其实此时执行doBind0办法的线程正是Reactor线程,那为什么不间接在这里去执行bind操作,而是再次封装成异步工作提交给Reactor中的taskQueue呢?

反正最终都是由Reactor线程执行,这其中又有什么别离呢?

通过上大节的介绍咱们晓得,bind0办法的调用是由io.netty.channel.AbstractChannel.AbstractUnsafe#register0办法在将NioServerSocketChannel注册到Main Reactor之后,并且NioServerSocketChannelpipeline曾经初始化结束后,通过safeSetSuccess 办法回调过去的。

这个过程全程是由Reactor线程来负责执行的,然而此时register0办法并没有执行结束,还须要执行前面的逻辑。

而绑定逻辑须要在注册逻辑执行完之后执行,所以在doBind0办法中Reactor线程会将绑定操作封装成异步工作先提交给taskQueue中保留,这样能够使Reactor线程立马从safeSetSuccess 中返回,继续执行剩下的register0办法逻辑。

        private void register0(ChannelPromise promise) {
            try {
                ................省略............

                doRegister();
                pipeline.invokeHandlerAddedIfNeeded();
                safeSetSuccess(promise);
                //触发channelRegister事件
                pipeline.fireChannelRegistered();

                if (isActive()) {
                     ................省略............
                }
            } catch (Throwable t) {
                  ................省略............
            }
        }

Reactor线程执行完register0办法后,就会从taskQueue中取出异步工作执行。

此时Reactor线程中的taskQueue构造如下:

  • Reactor线程会先取出位于taskQueue队首的工作执行,这里是指向NioServerSocketChannelpipeline中增加ServerBootstrapAcceptor的异步工作。

此时NioServerSocketChannelpipeline的构造如下:

  • Reactor线程执行绑定工作。

3. 绑定端口地址

Channel的操作行为全副定义在ChannelOutboundInvoker接口中

public interface ChannelOutboundInvoker {

    /**
     * Request to bind to the given {@link SocketAddress} and notify the {@link ChannelFuture} once the operation
     * completes, either because the operation was successful or because of an error.
     *
     */
    ChannelFuture bind(SocketAddress localAddress, ChannelPromise promise);
}

bind办法由子类AbstractChannel实现。

public abstract class AbstractChannel extends DefaultAttributeMap implements Channel {

   @Override
    public ChannelFuture bind(SocketAddress localAddress, ChannelPromise promise) {
        return pipeline.bind(localAddress, promise);
    }

}

调用pipeline.bind(localAddress, promise)pipeline中流传bind事件,触发回调pipeline中所有ChannelHandlerbind办法

事件在pipeline中的流传具备方向性:

  • inbound事件HeadContext开始一一向后流传直到TailContext
  • outbound事件则是反向流传,从TailContext开始反向向前流传直到HeadContext

inbound事件只能被pipeline中的ChannelInboundHandler响应解决
outbound事件只能被pipeline中的ChannelOutboundHandler响应解决

然而这里的bind事件在Netty中被定义为outbound事件,所以它在pipeline中是反向流传。先从TailContext开始反向流传直到HeadContext

然而bind的外围逻辑也正是实现在HeadContext中。

3.1 HeadContext

  final class HeadContext extends AbstractChannelHandlerContext
            implements ChannelOutboundHandler, ChannelInboundHandler {

     @Override
        public void bind(
                ChannelHandlerContext ctx, SocketAddress localAddress, ChannelPromise promise) {
            //触发AbstractChannel->bind办法 执行JDK NIO SelectableChannel 执行底层绑定操作
            unsafe.bind(localAddress, promise);
        }

}

HeadContext#bind回调办法中,调用Channel里的unsafe操作类执行真正的绑定操作。

protected abstract class AbstractUnsafe implements Unsafe {

      @Override
        public final void bind(final SocketAddress localAddress, final ChannelPromise promise) {
            .................省略................

            //这时channel还未激活  wasActive = false
            boolean wasActive = isActive();
            try {
                //io.netty.channel.socket.nio.NioServerSocketChannel.doBind
                //调用具体channel实现类
                doBind(localAddress);
            } catch (Throwable t) {
                .................省略................
                return;
            }

            //绑定胜利后 channel激活 触发channelActive事件流传
            if (!wasActive && isActive()) {
                invokeLater(new Runnable() {
                    @Override
                    public void run() {
                        //pipeline中触发channelActive事件
                        pipeline.fireChannelActive();
                    }
                });
            }
            //回调注册在promise上的ChannelFutureListener
            safeSetSuccess(promise);
        }

        protected abstract void doBind(SocketAddress localAddress) throws Exception;
}
  • 首先执行子类NioServerSocketChannel具体实现的doBind办法,通过JDK NIO 原生 ServerSocketChannel执行底层的绑定操作。
    @Override
    protected void doBind(SocketAddress localAddress) throws Exception {
        //调用JDK NIO 底层SelectableChannel 执行绑定操作
        if (PlatformDependent.javaVersion() >= 7) {
            javaChannel().bind(localAddress, config.getBacklog());
        } else {
            javaChannel().socket().bind(localAddress, config.getBacklog());
        }
    }
  • 判断是否为首次绑定,如果是的话将触发pipeline中的ChannelActive事件封装成异步工作放入Reactor中的taskQueue中。
  • 执行safeSetSuccess(promise),回调注册在promise上的ChannelFutureListener

还是同样的问题,以后执行线程曾经是Reactor线程了,那么为何不间接触发pipeline中的ChannelActive事件而是又封装成异步工作呢??

因为如果间接在这里触发ChannelActive事件,那么Reactor线程就会去执行pipeline中的ChannelHandlerchannelActive事件回调

这样的话就影响了safeSetSuccess(promise)的执行,提早了注册在promise上的ChannelFutureListener的回调。

到当初为止,Netty服务端就曾经实现了绑定端口地址的操作,NioServerSocketChannel的状态当初变为Active

最初还有一件重要的事件要做,咱们接着来看pipeline中对channelActive事件解决。

3.2 channelActive事件处理

channelActive事件在Netty中定义为inbound事件,所以它在pipeline中的流传为正向流传,从HeadContext始终到TailContext为止。

channelActive事件回调中须要触发向Selector指定须要监听的IO事件~~OP_ACCEPT事件

这块的逻辑次要在HeadContext中实现。

    final class HeadContext extends AbstractChannelHandlerContext
            implements ChannelOutboundHandler, ChannelInboundHandler {

        @Override
        public void channelActive(ChannelHandlerContext ctx) {
            //pipeline中持续向后流传channelActive事件
            ctx.fireChannelActive();
            //如果是autoRead 则主动触发read事件流传
            //在read回调函数中 触发OP_ACCEPT注册
            readIfIsAutoRead();
        }

        private void readIfIsAutoRead() {
            if (channel.config().isAutoRead()) {
                //如果是autoRead 则触发read事件流传
                channel.read();
            }
        }

        //AbstractChannel
        public Channel read() {
                //触发read事件
                pipeline.read();
                return this;
        }

       @Override
        public void read(ChannelHandlerContext ctx) {
            //触发注册OP_ACCEPT或者OP_READ事件
            unsafe.beginRead();
        }
   }
  • HeadContext中的channelActive回调中触发pipeline中的read事件
  • read事件再次流传到HeadContext时,触发HeadContext#read办法的回调。在read回调中调用channel底层操作类unsafebeginRead办法向selector注册监听OP_ACCEPT事件

3.3 beginRead

protected abstract class AbstractUnsafe implements Unsafe {

     @Override
        public final void beginRead() {
            assertEventLoop();
            //channel必须是Active
            if (!isActive()) {
                return;
            }

            try {
                // 触发在selector上注册channel感兴趣的监听事件
                doBeginRead();
            } catch (final Exception e) {
               .............省略..............
            }
        }
}

public abstract class AbstractChannel extends DefaultAttributeMap implements Channel {
    //子类负责继承实现
    protected abstract void doBeginRead() throws Exception;

}
  • 断言判断执行该办法的线程必须是Reactor线程
  • 此时NioServerSocketChannel曾经实现端口地址的绑定操作,isActive() = true
  • 调用doBeginRead实现向Selector注册监听事件OP_ACCEPT
public abstract class AbstractNioChannel extends AbstractChannel {

    //channel注册到Selector后取得的SelectKey
    volatile SelectionKey selectionKey;
    // Channel监听事件汇合
    protected final int readInterestOp;

    @Override
    protected void doBeginRead() throws Exception {
      
        final SelectionKey selectionKey = this.selectionKey;
        if (!selectionKey.isValid()) {
            return;
        }

        readPending = true;

        final int interestOps = selectionKey.interestOps();
        /**
         * 1:ServerSocketChannel 初始化时 readInterestOp设置的是OP_ACCEPT事件
         * */
        if ((interestOps & readInterestOp) == 0) {
            //增加OP_ACCEPT事件到interestOps汇合中
            selectionKey.interestOps(interestOps | readInterestOp);
        }
    }
}
  • 前边提到在NioServerSocketChannel在向Main Reactor中的Selector注册后,会取得一个SelectionKey。这里首先要获取这个SelectionKey
  • SelectionKey中获取NioServerSocketChannel感兴趣的IO事件汇合 interestOps ,过后在注册的时候interestOps设置为0
  • 将在NioServerSocketChannel初始化时设置的readInterestOp = OP_ACCEPT,设置到SelectionKey中的interestOps 汇合中。这样Reactor中的Selector就开始监听interestOps 汇合中蕴含的IO事件了。

Main Reactor中次要监听的是OP_ACCEPT事件

流程走到这里,Netty服务端就真正的启动起来了,下一步就开始期待接管客户端连贯了。大家此刻在来回看这副启动流程图,是不是清晰了很多呢?

此时Netty的Reactor模型构造如下:


总结

本文咱们通过图解源码的形式残缺地介绍了整个Netty服务端启动流程,并介绍了在启动过程中波及到的ServerBootstrap 相干的属性以及配置形式。NioServerSocketChannel 的创立初始化过程以及类的继承构造。

其中重点介绍了NioServerSocketChannel Reactor的注册过程以及Reactor线程的启动机会和pipeline的初始化机会。

最初介绍了NioServerSocketChannel绑定端口地址的整个流程。

上述介绍的这些流程全副是异步操作,各种回调绕来绕去的,须要重复回忆下,读异步代码就是这样,须要理清各种回调之间的关系,并且时刻揭示本人以后的执行线程是什么?

好了,当初Netty服务端曾经启动起来,接着就该接管客户端连贯了,咱们下篇文章见~~~~

评论

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注

这个站点使用 Akismet 来减少垃圾评论。了解你的评论数据如何被处理