关于netty:Netty之旅三Netty服务端启动源码分析一梭子带走

3次阅读

共计 19944 个字符,预计需要花费 50 分钟才能阅读完成。

Netty 服务端启动流程源码剖析

前记

哈喽,自从上篇《Netty 之旅二:口口相传的高性能 Netty 到底是什么?》后,迟迟两周才开启明天的 Netty 源码系列。源码剖析的第一篇文章,下一篇我会分享客户端的启动过程源码剖析。通过源码的浏览,咱们将会晓得,Netty 服务端启动的调用链是十分长的,同时必定也会发现一些新的问题,随着咱们源码浏览的不断深入,置信这些问题咱们也会一一攻破。

废话不多说,间接上号!

一、从 EchoServer 示例动手

示例从哪里来?任何开源框架都会有本人的示例代码,Netty 源码也不例外,如模块 netty-example 中就包含了最常见的 EchoServer 示例,上面通过这个示例进入服务端启动流程篇章。

public final class EchoServer {static final boolean SSL = System.getProperty("ssl") != null;
    static final int PORT = Integer.parseInt(System.getProperty("port", "8007"));

    public static void main(String[] args) throws Exception {
        // Configure SSL.
        final SslContext sslCtx;
        if (SSL) {SelfSignedCertificate ssc = new SelfSignedCertificate();
            sslCtx = SslContextBuilder.forServer(ssc.certificate(), ssc.privateKey()).build();} else {sslCtx = null;}

        // 1. 申明 Main-Sub Reactor 模式线程池:EventLoopGroup
        // Configure the server.
        EventLoopGroup bossGroup = new NioEventLoopGroup(1);
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        // 创立 EchoServerHandler 对象
        final EchoServerHandler serverHandler = new EchoServerHandler();
        try {
            // 2. 申明服务端启动疏导器, 并设置相干属性
            ServerBootstrap b = new ServerBootstrap();
            b.group(bossGroup, workerGroup)
             .channel(NioServerSocketChannel.class)
             .option(ChannelOption.SO_BACKLOG, 100)
             .handler(new LoggingHandler(LogLevel.INFO))
             .childHandler(new ChannelInitializer<SocketChannel>() {
                 @Override
                 public void initChannel(SocketChannel ch) throws Exception {ChannelPipeline p = ch.pipeline();
                     if (sslCtx != null) {p.addLast(sslCtx.newHandler(ch.alloc()));
                     }
                     //p.addLast(new LoggingHandler(LogLevel.INFO));
                     p.addLast(serverHandler);
                 }
             });

            // 3. 绑定端口即启动服务端,并同步期待
            // Start the server.
            ChannelFuture f = b.bind(PORT).sync();

            // 4. 监听服务端敞开,并阻塞期待
            // Wait until the server socket is closed.
            f.channel().closeFuture().sync();} finally {
            // 5. 优雅地敞开两个 EventLoopGroup 线程池 
            // Shut down all event loops to terminate all threads.
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();}
    }
}
  1. [代码行 18、19]申明 Main-Sub Reactor 模式线程池:EventLoopGroup

    创立两个 EventLoopGroup 对象。其中,bossGroup 用于服务端承受客户端的连贯,workerGroup用于进行客户端的 SocketChannel 的数据读写。

    (<u> 对于 EventLoopGroup 不是本文重点所以在后续文章中进行剖析 </u>)

  2. [代码行 23-39]申明服务端启动疏导器, 并设置相干属性

    AbstractBootstrap是一个帮忙类,通过办法链(method chaining)的形式,提供了一个简略易用的形式来配置启动一个Channelio.netty.bootstrap.ServerBootstrap,实现 AbstractBootstrap 抽象类,用于 Server 的启动器实现类。io.netty.bootstrap.Bootstrap,实现 AbstractBootstrap 抽象类,用于 Client 的启动器实现类。如下类图所示:

    (<u> 在 EchoServer 示例代码中,咱们看到 ServerBootstrap groupchanneloptionchildHandler 等属性链式设置都放到对于 AbstractBootstrap 体系代码中具体介绍。</u>)

  3. [代码行 43]绑定端口即启动服务端,并同步期待

    先调用 #bind(int port) 办法,绑定端口,后调用 ChannelFuture#sync() 办法,阻塞期待胜利。对于 bind 操作就是本文要具体介绍的 ” 服务端启动流程 ”。

  4. [代码行 47]监听服务端敞开,并阻塞期待

    先调用 #closeFuture() 办法,监听服务器敞开,后调用 ChannelFuture#sync() 办法,阻塞期待胜利。留神,此处不是敞开服务器,而是 channel 的监听敞开。

  5. [代码行 51、52]优雅地敞开两个 EventLoopGroup 线程池

    finally代码块中执行阐明服务端将最终敞开,所以调用 EventLoopGroup#shutdownGracefully() 办法,别离敞开两个 EventLoopGroup 对象,终止所有线程。

二、服务启动过程

在服务启动过程的源码剖析之前,这里回顾一下咱们在通过 JDK NIO 编程在服务端启动初始的代码:

 serverSocketChannel = ServerSocketChannel.open();
 serverSocketChannel.configureBlocking(false);
 serverSocketChannel.socket().bind(new InetSocketAddress(port), 1024);
 selector = Selector.open();
 serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);

这 5 行代码标示一个最为相熟的过程:

  • 关上serverSocketChannel
  • 配置非阻塞模式
  • channelsocket绑定监听端口
  • 创立Selector
  • serverSocketChannel 注册到 selector

前面等剖析完 Netty 的启动过程后,会对这些步骤有一个新的意识。在 EchoServer 示例中,进入 #bind(int port) 办法,AbstractBootstrap#bind()其实有多个办法,不便不同地址参数的传递,理论调用的办法是AbstractBootstrap#doBind(final SocketAddress localAddress) 办法,代码如下:

private ChannelFuture doBind(final SocketAddress localAddress) {final ChannelFuture regFuture = initAndRegister();
        final Channel channel = regFuture.channel();
        if (regFuture.cause() != null) {return regFuture;}

        if (regFuture.isDone()) {
            // At this point we know that the registration was complete and successful.
            ChannelPromise promise = channel.newPromise();
            doBind0(regFuture, channel, localAddress, promise);
            return promise;
        } else {
            // Registration future is almost always fulfilled already, but just in case it's not.
            final PendingRegistrationPromise promise = new PendingRegistrationPromise(channel);
            regFuture.addListener(new ChannelFutureListener() {
                @Override
                public void operationComplete(ChannelFuture future) throws Exception {Throwable cause = future.cause();
                    if (cause != null) {
                        // Registration on the EventLoop failed so fail the ChannelPromise directly to not cause an
                        // IllegalStateException once we try to access the EventLoop of the Channel.
                        promise.setFailure(cause);
                    } else {
                        // Registration was successful, so set the correct executor to use.
                        // See https://github.com/netty/netty/issues/2586
                        promise.registered();

                        doBind0(regFuture, channel, localAddress, promise);
                    }
                }
            });
            return promise;
        }
}
  • [代码行 2]:调用 #initAndRegister() 办法,初始化并注册一个 Channel 对象。因为注册是异步的过程,所以返回一个 ChannelFuture 对象。具体解析,见「initAndRegister()」。
  • [代码行 4 -6]]:若产生异样,间接进行返回。
  • [代码行 9 -34]:因为注册是异步的过程,有可能已实现,有可能未实现。所以实现代码分成了【第 10 至 14 行】和【第 15 至 36 行】别离解决已实现和未实现的状况。

    • 外围在[第 11、29 行],调用 #doBind0(final ChannelFuture regFuture, final Channel channel, final SocketAddress localAddress, final ChannelPromise promise) 办法,绑定 Channel 的端口,并注册 Channel 到 SelectionKey 中。
    • 如果异步注册对应的 ChanelFuture 未实现,则调用 ChannelFuture#addListener(ChannelFutureListener) 办法,增加监听器,在注册实现后,进行回调执行 #doBind0(...) 办法的逻辑。

通过 doBind 办法能够晓得服务端启动流程大抵如下几个步骤:

1. 创立 Channel

#doBind(final SocketAddress localAddress) 进入到initAndRegister():

 final ChannelFuture initAndRegister() {
     Channel channel = null;
     try {channel = channelFactory.newChannel();
         init(channel);
     } catch (Throwable t) {if (channel != null) {// channel can be null if newChannel crashed (eg SocketException("too many open files"))
             channel.unsafe().closeForcibly();
             // as the Channel is not registered yet we need to force the usage of the GlobalEventExecutor
             return new DefaultChannelPromise(channel, GlobalEventExecutor.INSTANCE).setFailure(t);
         }
         // as the Channel is not registered yet we need to force the usage of the GlobalEventExecutor
         return new DefaultChannelPromise(new FailedChannel(), GlobalEventExecutor.INSTANCE).setFailure(t);
     }

     ChannelFuture regFuture = config().group().register(channel);
     if (regFuture.cause() != null) {if (channel.isRegistered()) {channel.close();
         } else {channel.unsafe().closeForcibly();}
     }

     return regFuture;
}

[代码行 4]调用 ChannelFactory#newChannel() 办法,创立 Channel 对象。ChannelFactory类继承如下:

能够在 ChannelFactory 正文看到 @deprecated Use {@link io.netty.channel.ChannelFactory} instead.,这里只是包名的调整,对于继承构造不变。netty 默认应用ReflectiveChannelFactory,咱们能够看到重载办法:

@Override
public T newChannel() {
    try {return constructor.newInstance();
    } catch (Throwable t) {throw new ChannelException("Unable to create Channel from class" + constructor.getDeclaringClass(), t);
    }
}

很显著,正如其名是通过反射机制结构 Channel 对象实例的。constructor是在其构造方法初始化的:this.constructor = clazz.getConstructor();这个 clazz 按理说应该是咱们要创立的 Channel 的 Class 对象。那 Class 对象是什么呢?咱们接着看 channelFactory 是怎么初始化的。

首先在 AbstractBootstrap 找到如下代码:

@Deprecated
public B channelFactory(ChannelFactory<? extends C> channelFactory) {ObjectUtil.checkNotNull(channelFactory, "channelFactory");
    if (this.channelFactory != null) {throw new IllegalStateException("channelFactory set already");
    }

    this.channelFactory = channelFactory;
    return self();}

调用这个办法的递推向上看到:

public B channel(Class<? extends C> channelClass) {
    return channelFactory(new ReflectiveChannelFactory<C>(ObjectUtil.checkNotNull(channelClass, "channelClass")
    ));
}

这个办法正是在 EchoServerServerBootstrap链式设置时调用 .channel(NioServerSocketChannel.class) 的办法。咱们看到,channelClass就是 NioServerSocketChannel.classchannelFactory 也是以 ReflectiveChannelFactory 作为具体实例,并且将 NioServerSocketChannel.class 作为结构参数传递初始化的,所以这答复了反射机制结构的是 io.netty.channel.socket.nio.NioServerSocketChannel 对象。

持续看 NioServerSocketChannel 构造方法逻辑做了什么事件,看之前先给出 NioServerSocketChannel 类继承关系:

NioServerSocketChannelNioSocketChannel 别离对应服务端和客户端,公共父类都是 AbstractNioChannelAbstractChannel,上面介绍创立过程能够参照这个 Channel 类继承图。进入 NioServerSocketChannel 构造方法:

/**
  * Create a new instance
  */
public NioServerSocketChannel() {this(newSocket(DEFAULT_SELECTOR_PROVIDER));
}

点击 newSocket 进去:

private static ServerSocketChannel newSocket(SelectorProvider provider) {
    try {
        /**
          *  Use the {@link SelectorProvider} to open {@link SocketChannel} and so remove condition in
          *  {@link SelectorProvider#provider()} which is called by each ServerSocketChannel.open() otherwise.
          *
          *  See <a href="https://github.com/netty/netty/issues/2308">#2308</a>.
          */
        return provider.openServerSocketChannel();} catch (IOException e) {
        throw new ChannelException("Failed to open a server socket.", e);
    }
}

以上传进来的 providerDEFAULT_SELECTOR_PROVIDER即默认的 java.nio.channels.spi.SelectorProvider,[代码行 9] 就是相熟的 jdk nio 创立 ServerSocketChannel。这样newSocket(DEFAULT_SELECTOR_PROVIDER) 就返回了后果 ServerSocketChannel,回到NioServerSocketChannel()#this() 点进去:

/**
  * Create a new instance using the given {@link ServerSocketChannel}.
  */
public NioServerSocketChannel(ServerSocketChannel channel) {super(null, channel, SelectionKey.OP_ACCEPT);
    config = new NioServerSocketChannelConfig(this, javaChannel().socket());
}

以上 super 代表父类 AbstractNioMessageChannel 构造方法,点进去看到:

 /**
   * @see AbstractNioChannel#AbstractNioChannel(Channel, SelectableChannel, int)
   */
protected AbstractNioMessageChannel(Channel parent, SelectableChannel ch, int readInterestOp) {super(parent, ch, readInterestOp);
}

以上 super 代表父类 AbstractNioChannel 构造方法,点进去看到:

 protected AbstractNioChannel(Channel parent, SelectableChannel ch, int readInterestOp) {super(parent);
     this.ch = ch;
     this.readInterestOp = readInterestOp;
     try {ch.configureBlocking(false);
     } catch (IOException e) {
         try {ch.close();
         } catch (IOException e2) {if (logger.isWarnEnabled()) {logger.warn("Failed to close a partially initialized socket.", e2);
             }
         }

         throw new ChannelException("Failed to enter non-blocking mode.", e);
     }
}

以上 [代码行 3] 将ServerSocketChannel保留到了 AbstractNioChannel#ch 成员变量,在下面提到的 NioServerSocketChannel 构造方法的 [代码行 6]javaChannel() 拿到的就是 ch 保留的 ServerSocketChannel 变量。

以上 [代码行 6] 就是相熟的 jdk nio 编程设置 ServerSocketChannel 非阻塞形式。这里还有 super 父类构造方法,点击进去看到:

 protected AbstractChannel(Channel parent) {
     this.parent = parent;
     id = newId();
     unsafe = newUnsafe();
     pipeline = newChannelPipeline();}

以上构造方法中:

  • parent 属性,代表父 Channel 对象。对于 NioServerSocketChannel parentnull
  • id 属性,Channel 编号对象。在构造方法中,通过调用 #newId() 办法进行创立。(<u> 这里不细开展 Problem-1</u>)
  • unsafe 属性,Unsafe 对象。因为 Channel 真正的具体操作,是通过调用对应的 Unsafe 对象施行。所以须要在构造方法中,通过调用 #newUnsafe() 办法进行创立。这里的 Unsafe 并不是咱们常说的 jdk 自带的sun.misc.Unsafe,而是 io.netty.channel.Channel#Unsafe。(<u> 这里不细开展 Problem-2</u>)
  • pipeline属性默认是 DefaultChannelPipeline 对象,赋值后在前面为 channel 绑定端口的时候会用到

通过以上创立 channel 源码过程剖析,总结的流程时序图如下:

2. 初始化 Channel

回到一开始创立 ChannelinitAndRegister()入口办法,在创立 Channel 后紧接着 init(channel) 进入初始化流程,因为是服务端初始化,所以是ServerBootstrap#init(Channel channel),代码如下:

@Override
void init(Channel channel) throws Exception {final Map<ChannelOption<?>, Object> options = options0();
    synchronized (options) {setChannelOptions(channel, options, logger);
    }

    final Map<AttributeKey<?>, Object> attrs = attrs0();
    synchronized (attrs) {for (Entry<AttributeKey<?>, Object> e: attrs.entrySet()) {@SuppressWarnings("unchecked")
            AttributeKey<Object> key = (AttributeKey<Object>) e.getKey();
            channel.attr(key).set(e.getValue());
        }
    }

    ChannelPipeline p = channel.pipeline();

    final EventLoopGroup currentChildGroup = childGroup;
    final ChannelHandler currentChildHandler = childHandler;
    final Entry<ChannelOption<?>, Object>[] currentChildOptions;
    final Entry<AttributeKey<?>, Object>[] currentChildAttrs;
    synchronized (childOptions) {currentChildOptions = childOptions.entrySet().toArray(newOptionArray(0));
    }
    synchronized (childAttrs) {currentChildAttrs = childAttrs.entrySet().toArray(newAttrArray(0));
    }

    p.addLast(new ChannelInitializer<Channel>() {
        @Override
        public void initChannel(final Channel ch) throws Exception {final ChannelPipeline pipeline = ch.pipeline();
            ChannelHandler handler = config.handler();
            if (handler != null) {pipeline.addLast(handler);
            }

            ch.eventLoop().execute(new Runnable() {
                @Override
                public void run() {
                    pipeline.addLast(new ServerBootstrapAcceptor(ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));
                }
            });
        }
    });
}
  • [代码 3 – 6 行]: options0()办法返回的 options 保留了用户在 EchoServer 中设置自定义的可选项汇合,这样 ServerBootstrap 将配置的选项汇合,设置到了 Channel 的可选项汇合中。
  • [代码 8 – 15 行]: attrs0()办法返回的 attrs 保留了用户在 EchoServer 中设置自定义的属性汇合,这样 ServerBootstrap 将配置的属性汇合,设置到了 Channel 的属性汇合中。
  • [代码 21-28 行]:通过局部变量 currentChildOptionscurrentChildAttrs保留了用户自定义的 childOptionschildAttrs,用于[代码 43 行] ServerBootstrapAcceptor 构造方法。
  • [代码 30-47]]:创立 ChannelInitializer 对象,增加到 pipeline 中,用于后续初始化 ChannelHandler pipeline 中, 包含用户在EchoServer 配置的 LoggingHandler 和创立的创立 ServerBootstrapAcceptor 对象。

    • [代码行 34-37]:增加启动器配置的 LoggingHandlerpipeline 中。
    • [代码行 39-45]:创立 ServerBootstrapAcceptor 对象,增加到 pipeline 中。从名字上就可以看进去,ServerBootstrapAcceptor 也是一个 ChannelHandler 实现类,专门用于承受客户端的新连贯申请,把新的申请扔给某个事件循环器,咱们先不做过多剖析。咱们发现是应用EventLoop.execute 执行增加的过程,这是为什么呢?同样记录问题(<u>Problem-</u>3)
    • 须要阐明的是 pipeline 在之前介绍 Netty 外围组件的时候提到是一个蕴含ChannelHandlerContext 的双向链表,每一个 context 对于惟一一个 ChannelHandler,这里初始化后,ChannelPipeline 里就是如下一个构造:

3. 注册 Channel

初始化 Channel 一些根本配置和属性结束后,回到一开始创立 ChannelinitAndRegister()入口办法,在初始化 Channel 后紧接着 [代码行 17] ChannelFuture regFuture = config().group().register(channel); 显著这里是通过 EventLoopGroup 进入注册流程 (EventLoopGroup 体系将在后续文章解说)

EchoServer 中启动器同样通过 ServerBootstrap#group() 设置了 NioEventLoopGroup,它继承自MultithreadEventLoopGroup,所以注册流程会进入MultithreadEventLoopGroup 重载的 register(Channel channel) 办法,代码如下:

@Override
public ChannelFuture register(Channel channel) {return next().register(channel);
}

这里会调用 next() 办法抉择进去一个 EventLoop 来注册 Channel,外面实际上应用的是一个叫做 EventExecutorChooser 的货色来抉择,它实际上又有两种实现形式 ——PowerOfTwoEventExecutorChooserGenericEventExecutorChooser,实质上就是从 EventExecutor 数组中抉择一个 EventExecutor,咱们这里就是 NioEventLoop,那么,它们有什么区别呢?(<u>Problem-4: 在介绍EventLoopGroup 体系的后续文章中将会具体解说,这里简略地提一下,实质都是按数组长度取余数,不过,2 的 N 次方的模式更高效。</u>)

接着,来到 NioEventLoopregister(channel) 办法,你会不会问找不到该办法?提醒NioEventLoop 继承SingleThreadEventLoop,所以父类办法:

@Override
public ChannelFuture register(Channel channel) {return register(new DefaultChannelPromise(channel, this));
}

@Override
public ChannelFuture register(final ChannelPromise promise) {ObjectUtil.checkNotNull(promise, "promise");
    promise.channel().unsafe().register(this, promise);
    return promise;
}

能够看到,先创立了一个叫做 ChannelPromise 的货色,它是 ChannelFuture 的子类。[代码行 9] 又调回了 ChannelUnsaferegister () 办法,这里第一个参数是 this,也就是 NioEventLoop,第二个参数是刚创立的 ChannelPromise

点击 AbstractUnsafe#register(EventLoop eventLoop, final ChannelPromise promise) 办法进去,代码如下:

 public final void register(EventLoop eventLoop, final ChannelPromise promise) {if (eventLoop == null) {throw new NullPointerException("eventLoop");
     }
     if (isRegistered()) {promise.setFailure(new IllegalStateException("registered to an event loop already"));
         return;
     }
     if (!isCompatible(eventLoop)) {
         promise.setFailure(new IllegalStateException("incompatible event loop type:" + eventLoop.getClass().getName()));
         return;
     }

     AbstractChannel.this.eventLoop = eventLoop;

     if (eventLoop.inEventLoop()) {register0(promise);
     } else {
         try {eventLoop.execute(new Runnable() {
                 @Override
                 public void run() {register0(promise);
                 }
             });
         } catch (Throwable t) {
             logger.warn("Force-closing a channel whose registration task was not accepted by an event loop: {}",
                 AbstractChannel.this, t);
             closeForcibly();
             closeFuture.setClosed();
             safeSetFailure(promise, t);
         }
     }
}

[代码行 15]这行代码是设置 ChanneleventLoop 属性。这行后面的代码次要是在校验传入的 eventLoop 参数非空,校验是否有注册过以及校验 ChanneleventLoop 类型是否匹配。

[代码 18、24]接着,跟踪到 AbstractUnsafe#register0(ChannelPromise promise) 办法中:

private void register0(ChannelPromise promise) {
    try {
        // check if the channel is still open as it could be closed in the mean time when the register
        // call was outside of the eventLoop
        if (!promise.setUncancellable() || !ensureOpen(promise)) {return;}
        boolean firstRegistration = neverRegistered;
        doRegister();
        neverRegistered = false;
        registered = true;

        // Ensure we call handlerAdded(...) before we actually notify the promise. This is needed as the
        // user may already fire events through the pipeline in the ChannelFutureListener.
        pipeline.invokeHandlerAddedIfNeeded();

        safeSetSuccess(promise);
        pipeline.fireChannelRegistered();
        // Only fire a channelActive if the channel has never been registered. This prevents firing
        // multiple channel actives if the channel is deregistered and re-registered.
        if (isActive()) {if (firstRegistration) {pipeline.fireChannelActive();
            } else if (config().isAutoRead()) {// This channel was registered before and autoRead() is set. This means we need to begin read
                // again so that we process inbound data.
                //
                // See https://github.com/netty/netty/issues/4805
                beginRead();}
        }
    } catch (Throwable t) {
        // Close the channel directly to avoid FD leak.
        closeForcibly();
        closeFuture.setClosed();
        safeSetFailure(promise, t);
    }
}

[代码行 9]进入 AbstractNioChannel#doRegister() 办法:

protected void doRegister() throws Exception {
    boolean selected = false;
    for (;;) {
        try {selectionKey = javaChannel().register(eventLoop().unwrappedSelector(), 0, this);
            return;
        } catch (CancelledKeyException e) {if (!selected) {
                // Force the Selector to select now as the "canceled" SelectionKey may still be
                // cached and not removed because no Select.select(..) operation was called yet.
                eventLoop().selectNow();
                selected = true;
            } else {
                // We forced a select operation on the selector before but the SelectionKey is still cached
                // for whatever reason. JDK bug ?
                throw e;
            }
        }
    }
}

[代码行 5]要害一行代码,将 Java 原生 NIO Selector 与 Java 原生 NIOChannel 对象 (ServerSocketChannel) 绑定在一起,并将以后 Netty 的 Channel 通过 attachment 的模式绑定到 SelectionKey 上:

  • 调用 #unwrappedSelector() 办法,返回 Java 原生 NIO Selector 对象,而且每个 NioEventLoop Selector 唯一一对应。
  • 调用 SelectableChannel#register(Selector sel, int ops, Object att) 办法,注册 Java 原生 NIOChannel 对象到 NIO Selector 对象上。

通过以上注册 channel 源码剖析,总结流程的时序图如下:

4. 绑定端口

注册完 Channel 最初回到 AbstractBootstrap#doBind() 办法,剖析 Channel 的端口绑定逻辑。进入doBind0 代码如下:

private static void doBind0(
    final ChannelFuture regFuture, final Channel channel,
    final SocketAddress localAddress, final ChannelPromise promise) {// This method is invoked before channelRegistered() is triggered.  Give user handlers a chance to set up
    // the pipeline in its channelRegistered() implementation.
    channel.eventLoop().execute(new Runnable() {
        @Override
        public void run() {if (regFuture.isSuccess()) {channel.bind(localAddress, promise).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
            } else {promise.setFailure(regFuture.cause());
            }
        }
    });
}
  • [代码行 7]:在后面 Channel 注册胜利的条件下,调用 EventLoop 执行 Channel 的端口绑定逻辑。然而,实际上以后线程曾经是 EventLoop所在的线程了,为何还要这样操作呢?答案在【第 5 至 6 行】的英语正文,这里作为一个问题记着(<u>Problem-5</u>)。
  • [代码行 11]:进入 AbstractChannel#bind(SocketAddress localAddress, ChannelPromise promise),同样立刻异步返回并增加ChannelFutureListener.CLOSE_ON_FAILURE 监听事件。
  • [代码行 13]:如果绑定端口之前的操作并没有胜利,天然也就不能进行端口绑定操作了,通过 promise 记录异样起因。

AbstractChannel#bind(SocketAddress localAddress, ChannelPromise promise)办法如下:

 public ChannelFuture bind(SocketAddress localAddress, ChannelPromise promise) {return pipeline.bind(localAddress, promise);
 }

pipeline是之前创立 channel 的时候创立的DefaultChannelPipeline,进入该办法:

 public final ChannelFuture bind(SocketAddress localAddress, ChannelPromise promise) {return tail.bind(localAddress, promise);
 }

[在剖析初始化流程的时候最初画一个 DefaultChannelPipeline 外部的构造,可能便于剖析前面进入 DefaultChannelPipeline 一系列 bind 办法。]

首先,tail代表 TailContext,进入AbstractChannelHandlerContext# bind(final SocketAddress localAddress, final ChannelPromise promise) 办法:

 public ChannelFuture bind(final SocketAddress localAddress, final ChannelPromise promise) {
     // 省略局部代码
     final AbstractChannelHandlerContext next = findContextOutbound(MASK_BIND);
     EventExecutor executor = next.executor();
     if (executor.inEventLoop()) {next.invokeBind(localAddress, promise);
     } else {safeExecute(executor, new Runnable() {
             @Override
             public void run() {next.invokeBind(localAddress, promise);
             }
         }, promise, null);
     }
     return promise;
}

[代码行 3]:findContextOutbound办法里次要是执行 ctx = ctx.prev; 那么失去的 next 就是绑定 LoggingHandlercontext

[代码行 6]:进入 invokeBind(localAddress, promise) 办法并间接执行LoggingHandler#bind(this, localAddress, promise),进入后的办法如下:

 public void bind(ChannelHandlerContext ctx, SocketAddress localAddress, ChannelPromise promise) throws Exception {if (logger.isEnabled(internalLevel)) {logger.log(internalLevel, format(ctx, "BIND", localAddress));
     }
     ctx.bind(localAddress, promise);
 }

设置了 LoggingHandler 的日志根本级别为默认的 INFO 后,进行绑定操作的信息打印。接着,持续循环到 AbstractChannelHandlerContext# bind(final SocketAddress localAddress, final ChannelPromise promise) 办法执行 ctx = ctx.prev 取出 HeadContext 进入到 bind 办法:

 public void bind(ChannelHandlerContext ctx, SocketAddress localAddress, ChannelPromise promise) {unsafe.bind(localAddress, promise);
 }

兜兜转转,最终跳出了 pipeline 轮回到AbstractUnsafe#bind(final SocketAddress localAddress, final ChannelPromise promise) 办法,Channel 的端口绑定逻辑。代码如下:

public final void bind(final SocketAddress localAddress, final ChannelPromise promise) {
    // 此处有省略...
    boolean wasActive = isActive();
    try {doBind(localAddress);
    } catch (Throwable t) {safeSetFailure(promise, t);
        closeIfClosed();
        return;
    }
    // 此处有省略...
}

做实事办法 doBind 进入后如下:

@Override
protected void doBind(SocketAddress localAddress) throws Exception {if (PlatformDependent.javaVersion() >= 7) {javaChannel().bind(localAddress, config.getBacklog());
    } else {javaChannel().socket().bind(localAddress, config.getBacklog());
    }
}

到了此处,服务端的 Java 原生 NIO ServerSocketChannel 终于绑定上了端口。

三、问题演绎

  • Problem-1: 创立 Channel 流程中 AbstractChannel 构造函数中为 channel 调配 ID 的算法如何实现?
  • Problem-2: AbstractChannel外部类 AbstractUnsafe 的作用?
  • Problem-3: 初始化 channel 流程中pipeline 增加ServerBootstrapAcceptor 是通过EventLoop.execute 执行增加的过程,这是为什么呢?
  • Problem-4:注册 channel 流程中 PowerOfTwoEventExecutorChooserGenericEventExecutorChooser 的区别和优化原理?
  • Problem-5:绑定端口流程中调用 EventLoop 执行 Channel 的端口绑定逻辑。然而,实际上以后线程曾经是 EventLoop所在的线程了,为何还要这样操作呢?

小结

通过对 Netty 服务端启动流程源码剖析,咱们发现了在应用 NIO 的模式下,服务端启动流程其实就是封装了 JDK NIO 编程在服务端启动的流程。只不过对原生 JDK NIO 进行了加强和优化,同时从架构设计上简化了服务端流程的编写。

最重要的是感激彤哥、艿艿和俞超 - 闪电侠这些大佬后期的分享,可能让更多人学习源码的旅途少走很多弯路,谢谢!

欢送关注:

正文完
 0