前言

后面小飞曾经解说了NIONetty服务端启动,这一讲是Client的启动过程。

源码系列的文章仍旧还是遵循大白话+画图的格调来解说,本文Netty源码及当前的文章版本都基于:4.1.22.Final

本篇是以NettyClient启动为切入点,带大家一步步进入Netty源码的世界。

Client启动流程揭秘

1、探秘的入口:netty-client demo

这里用netty-exmaple中的EchoClient来作为例子:

public final class EchoClient {    public static void main(String[] args) throws Exception {        EventLoopGroup group = new NioEventLoopGroup();        try {            Bootstrap b = new Bootstrap();            b.group(group)             .channel(NioSocketChannel.class)             .option(ChannelOption.TCP_NODELAY, true)             .handler(new ChannelInitializer<SocketChannel>() {                 @Override                 public void initChannel(SocketChannel ch) throws Exception {                     ChannelPipeline p = ch.pipeline();                     p.addLast(new EchoClientHandler());                 }             });            ChannelFuture f = b.connect(HOST, PORT).sync();            f.channel().closeFuture().sync();        } finally {            group.shutdownGracefully();        }    }}

代码没有什么独特的中央,咱们上一篇文章时也梳理过Netty网络编程的一些套路,这里就不再赘述了。
(遗记的小朋友能够查看Netty系列文章中查找~)

下面的客户端代码尽管简略, 然而却展现了Netty 客户端初始化时所需的所有内容:

  • EventLoopGroupNetty服务端或者客户端,都必须指定EventLoopGroup,客户端指定的是NioEventLoopGroup
  • Bootstrap: Netty客户端启动类,负责客户端的启动和初始化过程
  • channel()类型:指定Channel的类型,因为这里是客户端,所以应用的是NioSocketChannel,服务端会应用NioServerSocketChannel
  • Handler:设置数据的处理器
  • bootstrap.connect(): 客户端连贯netty服务的办法

2、NioEventLoopGroup 流程解析

咱们先从NioEventLoopGroup开始,一行行代码解析,先看看其类构造:

下面是大抵的类构造,而 EventLoop 又继承自EventLoopGroup,所以类的大抵构造咱们可想而知。这里一些外围逻辑会在MultithreadEventExecutorGroup中,蕴含EventLoopGroup的创立和初始化操作等。

接着从NioEventLoopGroup构造方法开始看起,一步步往下跟(代码都只展现重点的局部,省去很多临时不须要关怀的代码,以下代码都遵循这个准则):

EventLoopGroup group = new NioEventLoopGroup();public NioEventLoopGroup() {    this(0);}public NioEventLoopGroup(int nThreads, Executor executor, final SelectorProvider selectorProvider) {    this(nThreads, executor, selectorProvider, DefaultSelectStrategyFactory.INSTANCE);}protected MultithreadEventLoopGroup(int nThreads, Executor executor, Object... args) {    super(nThreads == 0 ? DEFAULT_EVENT_LOOP_THREADS : nThreads, executor, args);}

这里通过调用this()super()办法一路往下传递,期间会结构一些默认属性,始终传递到MultithreadEventExecutorGroup类中,接着往西看。

2.1、MultithreadEventExecutorGroup

下面构造函数有一个重要的参数传递:DEFAULT_EVENT_LOOP_THREADS,这个值默认是CPU核数 * 2

为什么要传递这个参数呢?咱们之前说过EventLoopGroup能够了解成一个线程池,MultithreadEventExecutorGroup有一个线程数组EventExecutor[] children属性,而传递过去的DEFAULT_EVENT_LOOP_THREADS就是数组的长度。

先看下MultithreadEventExecutorGroup中的构造方法:

protected MultithreadEventExecutorGroup(int nThreads, Executor executor,                                            EventExecutorChooserFactory chooserFactory, Object... args) {    if (executor == null) {        executor = new ThreadPerTaskExecutor(newDefaultThreadFactory());    }        children = new EventExecutor[nThreads];        for (int i = 0; i < nThreads; i ++) {        children[i] = newChild(executor, args);    }        // ... 省略}

这段代码执行逻辑能够了解为:

  • 通过ThreadPerTaskExecutor结构一个Executor执行器,前面会细说,外面蕴含了线程执行的execute()办法
  • 接着创立一个EventExecutor数组对象,大小为传递进来的threads数量,这个所谓的EventExecutor能够了解为咱们的EventLoop,在这个demo中就是NioEventLoop对象
  • 最初调用 newChild 办法一一初始化EventLoopGroup中的EventLoop对象

下面只是大略说了下MultithreadEventExecutorGroup中的构造方法做的事件,前面还会一个个具体开展,先不必焦急,咱们先有个整体的认知就好。

再回到MultithreadEventExecutorGroup中的构造方法入参中,有个EventExecutorChooserFactory对象,这外面是有个很亮眼的细节设计,通过它咱们来洞悉Netty的良苦用心。

2.1、亮点设计:DefaultEventExecutorChooserFactory

EventExecutorChooserFactory这个类的作用是用来抉择EventLoop执行器的,咱们晓得EventLoopGroup是一个蕴含了CPU * 2个数量的EventLoop数组对象,那每次抉择EventLoop来执行工作是抉择数组中的哪一个呢?

咱们看一下这个类的具体实现,红框中都是须要重点查看的中央:

DefaultEventExecutorChooserFactory是一个选择器工厂类,调用外面的next()办法达到一个轮询抉择的目标。

数组的长度是length,执行第n次,取数组中的哪个元素就是对length取余

持续回到代码的实现,这里的优化就是在于先通过isPowerOfTwo()办法判断数组的长度是否为2的n次幂,判断的形式很奇妙,应用val & -val == val,这里我不做过多的解释,网上还有很多判断2的n次幂的优良解法,我就不班门弄斧了。(可参考:https://leetcode-cn.com/probl...)

当然我认为这里还有更容易了解的一个算法:x & (x - 1) == 0 大家能够看上面的图就懂了,这里就不延展了:

BUT!!! 这里为什么要去殚精竭虑的判断数组的长度是2的n次幂?

不晓得小伙伴们是否还记得大明湖畔HashMap?个别咱们要求HashMap数组的长度须要是2的n次幂,因为在key值寻找数组地位的办法:(n - 1) & hash n是数组长度,这里如果数组长度是2的n次幂就能够通过位运算来晋升性能,当length为2的n次幂时上面公式是等价的:

n & (length - 1) <=> n % length

还记得下面说过,数组的长度默认都是CPU * 2,而个别服务器CPU外围数都是2、4、8、16等等,所以这一个小优化就很实用了,再认真想想,原来数组长度的初始化也是很考究的。

这里位运算的益处就是效率远远高于与运算,Netty针对于这个小细节都做了优化,真是太棒了。

2.3、线程执行器:ThreadPerTaskExecutor

接着看下ThreadPerTaskExecutor线程执行器,每次执行工作都会通过它来创立一个线程实体。

public final class ThreadPerTaskExecutor implements Executor {    private final ThreadFactory threadFactory;    public ThreadPerTaskExecutor(ThreadFactory threadFactory) {        if (threadFactory == null) {            throw new NullPointerException("threadFactory");        }        this.threadFactory = threadFactory;    }    @Override    public void execute(Runnable command) {        threadFactory.newThread(command).start();    }}

传递进来的threadFactoryDefaultThreadFactory,这外面会结构NioEventLoop线程命名规定为nioEventLoop-1-xxx,咱们就不细看这个了。当线程执行的时候会调用execute()办法,这里会创立一个FastThreadLocalThread线程,具体看代码:

public class DefaultThreadFactory implements ThreadFactory {    @Override    public Thread newThread(Runnable r) {        Thread t = newThread(FastThreadLocalRunnable.wrap(r), prefix + nextId.incrementAndGet());        return t;    }    protected Thread newThread(Runnable r, String name) {        return new FastThreadLocalThread(threadGroup, r, name);    }}

这里通过newThread()来创立一个线程,而后初始化线程对象数据,最终会调用到Thread.init()中。

2.4、EventLoop初始化

接着持续看MultithreadEventExecutorGroup构造方法:

protected MultithreadEventExecutorGroup(int nThreads, Executor executor,                                            EventExecutorChooserFactory chooserFactory, Object... args) {    children = new EventExecutor[nThreads];    for (int i = 0; i < nThreads; i ++) {        children[i] = newChild(executor, args);        // .... 省略局部代码    }}

下面代码的最初一部分是 newChild 办法, 这个是一个形象办法, 它的工作是实例化 EventLoop 对象. 咱们跟踪一下它的代码, 能够发现, 这个办法在 NioEventLoopGroup 类中实现了, 其内容很简略:

@Overrideprotected EventLoop newChild(Executor executor, Object... args) throws Exception {    return new NioEventLoop(this, executor, (SelectorProvider) args[0],        ((SelectStrategyFactory) args[1]).newSelectStrategy(), (RejectedExecutionHandler) args[2]);}NioEventLoop(NioEventLoopGroup parent, Executor executor, SelectorProvider selectorProvider,                 SelectStrategy strategy, RejectedExecutionHandler rejectedExecutionHandler) {    super(parent, executor, false, DEFAULT_MAX_PENDING_TASKS, rejectedExecutionHandler);    if (selectorProvider == null) {        throw new NullPointerException("selectorProvider");    }    if (strategy == null) {        throw new NullPointerException("selectStrategy");    }    provider = selectorProvider;    final SelectorTuple selectorTuple = openSelector();    selector = selectorTuple.selector;    unwrappedSelector = selectorTuple.unwrappedSelector;    selectStrategy = strategy;}

其实就是实例化一个 NioEventLoop 对象, 而后返回。NioEventLoop构造函数中会保留provider和事件轮询器selector,在其父类中还会创立一个MpscQueue队列,而后保留线程执行器executor

再回过头来想一想,MultithreadEventExecutorGroup 外部保护了一个 EventExecutor[] children数组, NettyEventLoopGroup 的实现机制其实就建设在 MultithreadEventExecutorGroup 之上。

每当 Netty 须要一个 EventLoop 时, 会调用 next() 办法从EventLoopGroup数组中获取一个可用的 EventLoop对象。其中next办法的实现是通过NioEventLoopGroup.next()来实现的,就是用的下面有过解说的通过轮询算法来计算得出的。

最初总结一下整个 EventLoopGroup 的初始化过程:

  • EventLoopGroup(其实是MultithreadEventExecutorGroup) 外部保护一个类型为 EventExecutor children 数组,数组长度是nThreads
  • 如果咱们在实例化 NioEventLoopGroup 时, 如果指定线程池大小, 则 nThreads 就是指定的值, 反之是处理器外围数 * 2
  • MultithreadEventExecutorGroup 中会调用 newChild 形象办法来初始化 children 数组
  • 形象办法 newChild 是在 NioEventLoopGroup 中实现的, 它返回一个 NioEventLoop 实例.
  • NioEventLoop 属性:

    • SelectorProvider provider 属性: NioEventLoopGroup 结构器中通过 SelectorProvider.provider() 获取一个 SelectorProvider
    • Selector selector 属性: NioEventLoop 结构器中通过调用通过 selector = provider.openSelector() 获取一个 selector 对象.

2.5、NioSocketChannel

Netty中,Channel是对Socket的形象,每当Netty建设一个连贯后,都会有一个与其对应的Channel实例。

咱们在结尾的Demo中,设置了channel(NioSocketChannel.class)NioSocketChannel的类构造如下:

接着剖析代码,当咱们调用b.channel()时实际上会进入AbstractBootstrap.channel()逻辑,接着看AbstractBootstrap中代码:

public B channel(Class<? extends C> channelClass) {    if (channelClass == null) {        throw new NullPointerException("channelClass");    }    return channelFactory(new ReflectiveChannelFactory<C>(channelClass));}public ReflectiveChannelFactory(Class<? extends T> clazz) {    if (clazz == null) {        throw new NullPointerException("clazz");    }    this.clazz = clazz;}public B channelFactory(ChannelFactory<? extends C> channelFactory) {    if (channelFactory == null) {        throw new NullPointerException("channelFactory");    }    if (this.channelFactory != null) {        throw new IllegalStateException("channelFactory set already");    }    this.channelFactory = channelFactory;    return self();}

能够看到,这里ReflectiveChannelFactory其实就是返回咱们指定的channelClass:NioSocketChannel, 而后指定AbstractBootstrap中的channelFactory = new ReflectiveChannelFactory()

2.6、Channel初始化流程

到了这一步,咱们曾经晓得NioEventLoopGroupchannel()的流程,接着来看看Channel的 初始化流程,这也是Netty客户端启动的的外围流程之一:

ChannelFuture f = b.connect(HOST, PORT).sync();

接着就开始从b.connect()为入口一步步往后跟,先看下NioSocketChannel结构的整体流程:

connet往后梳理下整体流程:

Bootstrap.connect -> Bootstrap.doResolveAndConnect -> AbstractBootstrap.initAndRegister
final ChannelFuture initAndRegister() {    Channel channel = channelFactory.newChannel();    init(channel);        ChannelFuture regFuture = config().group().register(channel);    return regFuture;}

为了更易读,这里代码都做了简化,只保留了一些重要的代码。

紧接着咱们看看channelFactory.newChannel()做了什么,这里channelFactoryReflectiveChannelFactory,咱们在下面的章节剖析过:

@Overridepublic T newChannel() {    try {        return clazz.getConstructor().newInstance();    } catch (Throwable t) {        throw new ChannelException("Unable to create Channel from class " + clazz, t);    }}

这里的clazzNioSocketChannel,同样是在下面章节讲到过,这里是调用NioSocketChannel的构造函数而后初始化一个Channel实例。

public class NioSocketChannel extends AbstractNioByteChannel implements io.netty.channel.socket.SocketChannel {    public NioSocketChannel() {        this(DEFAULT_SELECTOR_PROVIDER);    }    public NioSocketChannel(SelectorProvider provider) {        this(newSocket(provider));    }    private static SocketChannel newSocket(SelectorProvider provider) {        try {            return provider.openSocketChannel();        } catch (IOException e) {            throw new ChannelException("Failed to open a socket.", e);        }    }}

这里其实也很简略,就是创立一个Java NIO SocketChannel而已,接着看看NioSocketChannel的父类还做了哪些事件,这里梳理下类的关系:

NioSocketChannel -> extends AbstractNioByteChannel -> exntends AbstractNioChannel
public abstract class AbstractNioChannel extends AbstractChannel {    protected AbstractNioByteChannel(Channel parent, SelectableChannel ch) {        super(parent, ch, SelectionKey.OP_READ);    }    protected AbstractNioChannel(Channel parent, SelectableChannel ch, int readInterestOp) {        super(parent);        ch.configureBlocking(false);    }}

这里会调用父类的结构参数,并且传递readInterestOp = SelectionKey.OP_READ:,这里还有一个很重要的点,配置 Java NIO SocketChannel 为非阻塞的,咱们之前在NIO章节的时候解说过,这里也不再赘述。

接着持续看AbstractChannel的构造函数:

public abstract class AbstractChannel extends DefaultAttributeMap implements Channel {    protected AbstractChannel(Channel parent) {        this.parent = parent;        id = newId();        unsafe = newUnsafe();        pipeline = newChannelPipeline();    }}

这里创立一个ChannelId,创立一个Unsafe对象,这里的Unsafe并不是Java中的Unsafe,前面也会讲到。而后创立一个ChannelPipeline,前面也会讲到,到了这里,一个残缺的NioSocketChannel 就初始化实现了,咱们再来总结一下:

  • NettySocketChannel 会与 Java 原生的 SocketChannel 绑定在一起;
  • 会注册 Read 事件;
  • 会为每一个 Channel 调配一个 channelId
  • 会为每一个 Channel 创立一个Unsafe对象;
  • 会为每一个 Channel 调配一个 ChannelPipeline

2.7、Channel 注册流程

还是回到最下面initAndRegister办法,咱们下面都是在剖析外面newChannel的操作,这个办法是NioSocketChannel创立的一个流程,接着咱们在持续跟init()register()的过程:

 public abstract class AbstractBootstrap<B extends AbstractBootstrap<B, C>, C extends Channel> implements Cloneable {    final ChannelFuture initAndRegister() {        Channel channel = channelFactory.newChannel();        init(channel);        ChannelFuture regFuture = config().group().register(channel);    }}

init()就是将一些参数optionsattrs设置到channel中,咱们重点须要看的是register办法,其调用链为:

AbstractBootstrap.initAndRegister -> MultithreadEventLoopGroup.register -> SingleThreadEventLoop.register -> AbstractUnsafe.register

这里最初到了unsaferegister()办法,最终调用到AbstractNioChannel.doRegister():

@Overrideprotected void doRegister() throws Exception {    boolean selected = false;    for (;;) {        selectionKey = javaChannel().register(eventLoop().unwrappedSelector(), 0, this);        return;    }}

javaChannel()就是Java NIO中的SocketChannel,这里是将SocketChannel注册到与eventLoop相关联的selector上。

最初咱们整顿一下服务启动的整体流程:

  1. initAndRegister()初始化并注册什么呢?

    • channelFactory.newChannel()
    • 通过反射创立一个 NioSocketChannel
    • Java 原生 Channel 绑定到 NettyChannel
    • 注册 Read 事件
    • Channel 调配 id
    • Channel 创立 unsafe对象
    • Channel 创立 ChannelPipeline(默认是 head<=>tail 的双向链表)
  2. init(channel)`

    • Bootstrap 中的配置设置到 Channel
  3. register(channel)

    • Channel 绑定到一个 EventLoop
    • Java 原生 Channel、NettyChannel、Selector 绑定到 SelectionKey
    • 触发 Register 相干的事件

2.8 unsafe初始化

下面有提到过在初始化Channel的过程中会创立一个Unsafe的对象,而后绑定到Channel上:

protected AbstractChannel(Channel parent) {    this.parent = parent;    id = newId();    unsafe = newUnsafe();    pipeline = newChannelPipeline();}

newUnsafe间接调用到了NioSocketChannel中的办法:

@Overrideprotected AbstractNioUnsafe newUnsafe() {    return new NioSocketChannelUnsafe();}

NioSocketChannelUnsafeNioSocketChannel中的一个外部类,而后向上还有几个父类继承,这里次要是对应到相干Java底层的Socket操作。

2.9 pipeline初始化

咱们还是回到pipeline初始化的过程,来看一下newChannelPipeline()的具体实现:

protected DefaultChannelPipeline newChannelPipeline() {    return new DefaultChannelPipeline(this);}protected DefaultChannelPipeline(Channel channel) {    this.channel = ObjectUtil.checkNotNull(channel, "channel");    succeededFuture = new SucceededChannelFuture(channel, null);    voidPromise =  new VoidChannelPromise(channel, true);    tail = new TailContext(this);    head = new HeadContext(this);    head.next = tail;    tail.prev = head;}

咱们调用 DefaultChannelPipeline 的结构器, 传入了一个 channel, 而这个 channel 其实就是咱们实例化的 NioSocketChannel

DefaultChannelPipeline 会将这个 NioSocketChannel 对象保留在channel 字段中. DefaultChannelPipeline 中, 还有两个非凡的字段, 即 headtail, 而这两个字段是一个双向链表的头和尾. 其实在 DefaultChannelPipeline 中, 保护了一个以 AbstractChannelHandlerContext 为节点的双向链表, 这个链表是 Netty 实现 Pipeline 机制的要害.

对于 DefaultChannelPipeline 中的双向链表以及它所起的作用, 咱们会在后续章节具体解说。这里只是对pipeline做个初步的意识。

HeadContext 的继承层次结构如下所示:

TailContext 的继承层次结构如下所示:

咱们能够看到, 链表中 head 是一个 ChannelOutboundHandler, 而 tail 则是一个 ChannelInboundHandler.

3.0、客户端connect过程

客户端连贯的入口办法还是在Bootstrap.connect()中,下面也剖析过一部分内容,申请的具体流程是:

Bootstrap.connect() -> AbstractChannel.coonnect() -> NioSocketChannel.doConnect()
public static boolean connect(final SocketChannel socketChannel, final SocketAddress remoteAddress)            throws IOException {    try {        return AccessController.doPrivileged(new PrivilegedExceptionAction<Boolean>() {            @Override            public Boolean run() throws IOException {                return socketChannel.connect(remoteAddress);            }        });    } catch (PrivilegedActionException e) {        throw (IOException) e.getCause();    }}

看到这里,还是用Java NIO SocketChannel发送的connect申请进行客户端连贯申请。

总结

本篇文章以一个Netty Client demo为入口,而后解析了NioEventLoopGroup创立的流程、Channel的创立和注册的流程,以及客户端发动connect的具体流程,这里对于很多细节并没有很深的深刻上来,这些会放到后续的源码剖析文章,敬请期待~