关于netty:Netty之旅四你一定看得懂的Netty客户端启动源码分析

90次阅读

共计 13196 个字符,预计需要花费 33 分钟才能阅读完成。

前言

后面小飞曾经解说了 NIONetty服务端启动,这一讲是 Client 的启动过程。

源码系列的文章仍旧还是遵循大白话 + 画图的格调来解说,本文 Netty 源码及当前的文章版本都基于:4.1.22.Final

本篇是以 NettyClient 启动为切入点,带大家一步步进入 Netty 源码的世界。

Client 启动流程揭秘

1、探秘的入口:netty-client demo

这里用 netty-exmaple 中的 EchoClient 来作为例子:

public final class EchoClient {public static void main(String[] args) throws Exception {EventLoopGroup group = new NioEventLoopGroup();
        try {Bootstrap b = new Bootstrap();
            b.group(group)
             .channel(NioSocketChannel.class)
             .option(ChannelOption.TCP_NODELAY, true)
             .handler(new ChannelInitializer<SocketChannel>() {
                 @Override
                 public void initChannel(SocketChannel ch) throws Exception {ChannelPipeline p = ch.pipeline();
                     p.addLast(new EchoClientHandler());
                 }
             });

            ChannelFuture f = b.connect(HOST, PORT).sync();

            f.channel().closeFuture().sync();} finally {group.shutdownGracefully();
        }
    }
}

代码没有什么独特的中央,咱们上一篇文章时也梳理过 Netty 网络编程的一些套路,这里就不再赘述了。
(遗记的小朋友能够查看 Netty 系列文章中查找~)

下面的客户端代码尽管简略, 然而却展现了Netty 客户端初始化时所需的所有内容:

  • EventLoopGroupNetty服务端或者客户端,都必须指定EventLoopGroup,客户端指定的是NioEventLoopGroup
  • Bootstrap: Netty客户端启动类,负责客户端的启动和初始化过程
  • channel()类型:指定 Channel 的类型,因为这里是客户端,所以应用的是NioSocketChannel,服务端会应用NioServerSocketChannel
  • Handler:设置数据的处理器
  • bootstrap.connect(): 客户端连贯 netty 服务的办法

2、NioEventLoopGroup 流程解析

咱们先从 NioEventLoopGroup 开始,一行行代码解析,先看看其类构造:

下面是大抵的类构造,而 EventLoop 又继承自 EventLoopGroup,所以类的大抵构造咱们可想而知。这里一些外围逻辑会在MultithreadEventExecutorGroup 中,蕴含 EventLoopGroup 的创立和初始化操作等。

接着从 NioEventLoopGroup 构造方法开始看起,一步步往下跟(代码都只展现重点的局部,省去很多临时不须要关怀的代码,以下代码都遵循这个准则):

EventLoopGroup group = new NioEventLoopGroup();

public NioEventLoopGroup() {this(0);
}

public NioEventLoopGroup(int nThreads, Executor executor, final SelectorProvider selectorProvider) {this(nThreads, executor, selectorProvider, DefaultSelectStrategyFactory.INSTANCE);
}

protected MultithreadEventLoopGroup(int nThreads, Executor executor, Object... args) {super(nThreads == 0 ? DEFAULT_EVENT_LOOP_THREADS : nThreads, executor, args);
}

这里通过调用 this()super()办法一路往下传递,期间会结构一些默认属性,始终传递到 MultithreadEventExecutorGroup 类中,接着往西看。

2.1、MultithreadEventExecutorGroup

下面构造函数有一个重要的参数传递:DEFAULT_EVENT_LOOP_THREADS,这个值默认是CPU 核数 * 2

为什么要传递这个参数呢?咱们之前说过 EventLoopGroup 能够了解成一个线程池,MultithreadEventExecutorGroup有一个线程数组 EventExecutor[] children 属性,而传递过去的 DEFAULT_EVENT_LOOP_THREADS 就是数组的长度。

先看下 MultithreadEventExecutorGroup 中的构造方法:

protected MultithreadEventExecutorGroup(int nThreads, Executor executor,
                                            EventExecutorChooserFactory chooserFactory, Object... args) {if (executor == null) {executor = new ThreadPerTaskExecutor(newDefaultThreadFactory());
    }
    
    children = new EventExecutor[nThreads];
    
    for (int i = 0; i < nThreads; i ++) {children[i] = newChild(executor, args);
    }
    
    // ... 省略
}

这段代码执行逻辑能够了解为:

  • 通过 ThreadPerTaskExecutor 结构一个 Executor 执行器,前面会细说,外面蕴含了线程执行的 execute() 办法
  • 接着创立一个 EventExecutor 数组对象,大小为传递进来的 threads 数量,这个所谓的 EventExecutor 能够了解为咱们的 EventLoop,在这个 demo 中就是NioEventLoop 对象
  • 最初调用 newChild 办法一一初始化 EventLoopGroup 中的 EventLoop 对象

下面只是大略说了下 MultithreadEventExecutorGroup 中的构造方法做的事件,前面还会一个个具体开展,先不必焦急,咱们先有个整体的认知就好。

再回到 MultithreadEventExecutorGroup 中的构造方法入参中,有个 EventExecutorChooserFactory 对象,这外面是有个很亮眼的细节设计,通过它咱们来洞悉 Netty 的良苦用心。

2.1、亮点设计:DefaultEventExecutorChooserFactory

EventExecutorChooserFactory这个类的作用是用来抉择 EventLoop 执行器的,咱们晓得 EventLoopGroup 是一个蕴含了 CPU * 2 个数量的 EventLoop 数组对象,那每次抉择 EventLoop 来执行工作是抉择数组中的哪一个呢?

咱们看一下这个类的具体实现,红框中 都是须要重点查看的中央:

DefaultEventExecutorChooserFactory是一个选择器工厂类,调用外面的 next() 办法达到一个轮询抉择的目标。

数组的长度是 length,执行第 n 次,取数组中的哪个元素就是对 length 取余

持续回到代码的实现,这里的优化就是在于先通过 isPowerOfTwo() 办法判断数组的长度是否为 2 的 n 次幂,判断的形式很奇妙,应用 val & -val == val,这里我不做过多的解释,网上还有很多判断 2 的 n 次幂的优良解法,我就不班门弄斧了。( 可参考:https://leetcode-cn.com/probl…)

当然我认为这里还有更容易了解的一个算法:x & (x - 1) == 0 大家能够看上面的图就懂了,这里就不延展了:

BUT!!! 这里为什么要去殚精竭虑的判断数组的长度是 2 的 n 次幂?

不晓得小伙伴们是否还记得 大明湖畔 HashMap?个别咱们要求 HashMap 数组的长度须要是 2 的 n 次幂,因为在 key 值寻找数组地位的办法:(n - 1) & hash n 是数组长度,这里如果数组长度是 2 的 n 次幂就能够通过位运算来晋升性能,当 length 为 2 的 n 次幂时上面公式是等价的:

n & (length – 1) <=> n % length

还记得下面说过,数组的长度默认都是CPU * 2,而个别服务器 CPU 外围数都是 2、4、8、16 等等,所以这一个小优化就很实用了,再认真想想,原来数组长度的初始化也是很考究的。

这里位运算的益处就是效率远远高于与运算,Netty针对于这个小细节都做了优化,真是太棒了。

2.3、线程执行器:ThreadPerTaskExecutor

接着看下 ThreadPerTaskExecutor 线程执行器,每次执行工作都会通过它来创立一个线程实体。

public final class ThreadPerTaskExecutor implements Executor {
    private final ThreadFactory threadFactory;

    public ThreadPerTaskExecutor(ThreadFactory threadFactory) {if (threadFactory == null) {throw new NullPointerException("threadFactory");
        }
        this.threadFactory = threadFactory;
    }

    @Override
    public void execute(Runnable command) {threadFactory.newThread(command).start();}
}

传递进来的 threadFactoryDefaultThreadFactory,这外面会结构 NioEventLoop 线程命名规定为 nioEventLoop-1-xxx,咱们就不细看这个了。当线程执行的时候会调用execute() 办法,这里会创立一个 FastThreadLocalThread 线程,具体看代码:

public class DefaultThreadFactory implements ThreadFactory {
    @Override
    public Thread newThread(Runnable r) {Thread t = newThread(FastThreadLocalRunnable.wrap(r), prefix + nextId.incrementAndGet());
        return t;
    }

    protected Thread newThread(Runnable r, String name) {return new FastThreadLocalThread(threadGroup, r, name);
    }
}

这里通过 newThread() 来创立一个线程,而后初始化线程对象数据,最终会调用到 Thread.init() 中。

2.4、EventLoop 初始化

接着持续看 MultithreadEventExecutorGroup 构造方法:

protected MultithreadEventExecutorGroup(int nThreads, Executor executor,
                                            EventExecutorChooserFactory chooserFactory, Object... args) {children = new EventExecutor[nThreads];
    for (int i = 0; i < nThreads; i ++) {children[i] = newChild(executor, args);
        // .... 省略局部代码
    }
}

下面代码的最初一部分是 newChild 办法, 这个是一个形象办法, 它的工作是实例化 EventLoop 对象. 咱们跟踪一下它的代码, 能够发现, 这个办法在 NioEventLoopGroup 类中实现了, 其内容很简略:

@Override
protected EventLoop newChild(Executor executor, Object... args) throws Exception {return new NioEventLoop(this, executor, (SelectorProvider) args[0],
        ((SelectStrategyFactory) args[1]).newSelectStrategy(), (RejectedExecutionHandler) args[2]);
}

NioEventLoop(NioEventLoopGroup parent, Executor executor, SelectorProvider selectorProvider,
                 SelectStrategy strategy, RejectedExecutionHandler rejectedExecutionHandler) {super(parent, executor, false, DEFAULT_MAX_PENDING_TASKS, rejectedExecutionHandler);
    if (selectorProvider == null) {throw new NullPointerException("selectorProvider");
    }
    if (strategy == null) {throw new NullPointerException("selectStrategy");
    }
    provider = selectorProvider;
    final SelectorTuple selectorTuple = openSelector();
    selector = selectorTuple.selector;
    unwrappedSelector = selectorTuple.unwrappedSelector;
    selectStrategy = strategy;
}

其实就是实例化一个 NioEventLoop 对象, 而后返回。NioEventLoop构造函数中会保留 provider 和事件轮询器selector,在其父类中还会创立一个MpscQueue 队列,而后保留线程执行器executor

再回过头来想一想,MultithreadEventExecutorGroup 外部保护了一个 EventExecutor[] children数组, NettyEventLoopGroup 的实现机制其实就建设在 MultithreadEventExecutorGroup 之上。

每当 Netty 须要一个 EventLoop 时, 会调用 next() 办法从 EventLoopGroup 数组中获取一个可用的 EventLoop对象。其中 next 办法的实现是通过 NioEventLoopGroup.next() 来实现的,就是用的下面有过解说的通过轮询算法来计算得出的。

最初总结一下整个 EventLoopGroup 的初始化过程:

  • EventLoopGroup(其实是MultithreadEventExecutorGroup) 外部保护一个类型为 EventExecutor children 数组,数组长度是nThreads
  • 如果咱们在实例化 NioEventLoopGroup 时, 如果指定线程池大小, 则 nThreads 就是指定的值, 反之是 处理器外围数 * 2
  • MultithreadEventExecutorGroup 中会调用 newChild 形象办法来初始化 children 数组
  • 形象办法 newChild 是在 NioEventLoopGroup 中实现的, 它返回一个 NioEventLoop 实例.
  • NioEventLoop 属性:

    • SelectorProvider provider 属性: NioEventLoopGroup 结构器中通过 SelectorProvider.provider() 获取一个 SelectorProvider
    • Selector selector 属性: NioEventLoop 结构器中通过调用通过 selector = provider.openSelector() 获取一个 selector 对象.

2.5、NioSocketChannel

Netty 中,Channel是对 Socket 的形象,每当 Netty 建设一个连贯后,都会有一个与其对应的 Channel 实例。

咱们在结尾的 Demo 中,设置了 channel(NioSocketChannel.class)NioSocketChannel 的类构造如下:

接着剖析代码,当咱们调用 b.channel() 时实际上会进入 AbstractBootstrap.channel() 逻辑,接着看 AbstractBootstrap 中代码:

public B channel(Class<? extends C> channelClass) {if (channelClass == null) {throw new NullPointerException("channelClass");
    }
    return channelFactory(new ReflectiveChannelFactory<C>(channelClass));
}

public ReflectiveChannelFactory(Class<? extends T> clazz) {if (clazz == null) {throw new NullPointerException("clazz");
    }
    this.clazz = clazz;
}

public B channelFactory(ChannelFactory<? extends C> channelFactory) {if (channelFactory == null) {throw new NullPointerException("channelFactory");
    }
    if (this.channelFactory != null) {throw new IllegalStateException("channelFactory set already");
    }

    this.channelFactory = channelFactory;
    return self();}

能够看到,这里 ReflectiveChannelFactory 其实就是返回咱们指定的 channelClass:NioSocketChannel, 而后指定AbstractBootstrap 中的channelFactory = new ReflectiveChannelFactory()

2.6、Channel 初始化流程

到了这一步,咱们曾经晓得 NioEventLoopGroupchannel()的流程,接着来看看 Channel 的 初始化流程,这也是 Netty 客户端启动的的外围流程之一:

ChannelFuture f = b.connect(HOST, PORT).sync();

接着就开始从 b.connect() 为入口一步步往后跟,先看下 NioSocketChannel 结构的整体流程:

connet 往后梳理下整体流程:

Bootstrap.connect -> Bootstrap.doResolveAndConnect -> AbstractBootstrap.initAndRegister

final ChannelFuture initAndRegister() {Channel channel = channelFactory.newChannel();
    init(channel);
    
    ChannelFuture regFuture = config().group().register(channel);
    return regFuture;
}

为了更易读,这里代码都做了简化,只保留了一些重要的代码。

紧接着咱们看看 channelFactory.newChannel() 做了什么,这里 channelFactoryReflectiveChannelFactory,咱们在下面的章节剖析过:

@Override
public T newChannel() {
    try {return clazz.getConstructor().newInstance();} catch (Throwable t) {throw new ChannelException("Unable to create Channel from class" + clazz, t);
    }
}

这里的 clazzNioSocketChannel,同样是在下面章节讲到过,这里是调用 NioSocketChannel 的构造函数而后初始化一个 Channel 实例。

public class NioSocketChannel extends AbstractNioByteChannel implements io.netty.channel.socket.SocketChannel {public NioSocketChannel() {this(DEFAULT_SELECTOR_PROVIDER);
    }

    public NioSocketChannel(SelectorProvider provider) {this(newSocket(provider));
    }

    private static SocketChannel newSocket(SelectorProvider provider) {
        try {return provider.openSocketChannel();
        } catch (IOException e) {throw new ChannelException("Failed to open a socket.", e);
        }
    }
}

这里其实也很简略,就是创立一个 Java NIO SocketChannel 而已,接着看看 NioSocketChannel 的父类还做了哪些事件,这里梳理下类的关系:

NioSocketChannel -> extends AbstractNioByteChannel -> exntends AbstractNioChannel

public abstract class AbstractNioChannel extends AbstractChannel {protected AbstractNioByteChannel(Channel parent, SelectableChannel ch) {super(parent, ch, SelectionKey.OP_READ);
    }

    protected AbstractNioChannel(Channel parent, SelectableChannel ch, int readInterestOp) {super(parent);
        ch.configureBlocking(false);
    }
}

这里会调用父类的结构参数,并且传递 readInterestOp = SelectionKey.OP_READ:,这里还有一个很重要的点,配置 Java NIO SocketChannel 为非阻塞的,咱们之前在NIO 章节的时候解说过,这里也不再赘述。

接着持续看 AbstractChannel 的构造函数:

public abstract class AbstractChannel extends DefaultAttributeMap implements Channel {protected AbstractChannel(Channel parent) {
        this.parent = parent;
        id = newId();
        unsafe = newUnsafe();
        pipeline = newChannelPipeline();}
}

这里创立一个 ChannelId,创立一个Unsafe 对象,这里的 Unsafe 并不是 Java 中的 Unsafe,前面也会讲到。而后创立一个ChannelPipeline,前面也会讲到,到了这里,一个残缺的NioSocketChannel 就初始化实现了,咱们再来总结一下:

  • NettySocketChannel 会与 Java 原生的 SocketChannel 绑定在一起;
  • 会注册 Read 事件;
  • 会为每一个 Channel 调配一个 channelId
  • 会为每一个 Channel 创立一个 Unsafe 对象;
  • 会为每一个 Channel 调配一个 ChannelPipeline

2.7、Channel 注册流程

还是回到最下面 initAndRegister 办法,咱们下面都是在剖析外面 newChannel 的操作,这个办法是 NioSocketChannel 创立的一个流程,接着咱们在持续跟 init()register()的过程:

 public abstract class AbstractBootstrap<B extends AbstractBootstrap<B, C>, C extends Channel> implements Cloneable {final ChannelFuture initAndRegister() {Channel channel = channelFactory.newChannel();
        init(channel);
        ChannelFuture regFuture = config().group().register(channel);
    }
}

init()就是将一些参数 optionsattrs设置到 channel 中,咱们重点须要看的是 register 办法,其调用链为:

AbstractBootstrap.initAndRegister -> MultithreadEventLoopGroup.register -> SingleThreadEventLoop.register -> AbstractUnsafe.register

这里最初到了 unsaferegister()办法,最终调用到AbstractNioChannel.doRegister():

@Override
protected void doRegister() throws Exception {
    boolean selected = false;
    for (;;) {selectionKey = javaChannel().register(eventLoop().unwrappedSelector(), 0, this);
        return;
    }
}

javaChannel()就是 Java NIO 中的 SocketChannel,这里是将SocketChannel 注册到与 eventLoop 相关联的 selector 上。

最初咱们整顿一下服务启动的整体流程:

  1. initAndRegister()初始化并注册什么呢?

    • channelFactory.newChannel()
    • 通过反射创立一个 NioSocketChannel
    • Java 原生 Channel 绑定到 NettyChannel
    • 注册 Read 事件
    • Channel 调配 id
    • Channel 创立 unsafe对象
    • Channel 创立 ChannelPipeline(默认是 head<=>tail 的双向链表)
  2. init(channel)`

    • Bootstrap 中的配置设置到 Channel
  3. register(channel)

    • Channel 绑定到一个 EventLoop
    • Java 原生 Channel、NettyChannel、Selector 绑定到 SelectionKey
    • 触发 Register 相干的事件

2.8 unsafe 初始化

下面有提到过在初始化 Channel 的过程中会创立一个 Unsafe 的对象,而后绑定到 Channel 上:

protected AbstractChannel(Channel parent) {
    this.parent = parent;
    id = newId();
    unsafe = newUnsafe();
    pipeline = newChannelPipeline();}

newUnsafe间接调用到了 NioSocketChannel 中的办法:

@Override
protected AbstractNioUnsafe newUnsafe() {return new NioSocketChannelUnsafe();
}

NioSocketChannelUnsafeNioSocketChannel 中的一个外部类,而后向上还有几个父类继承,这里次要是对应到相干 Java 底层的 Socket 操作。

2.9 pipeline 初始化

咱们还是回到 pipeline 初始化的过程,来看一下 newChannelPipeline() 的具体实现:

protected DefaultChannelPipeline newChannelPipeline() {return new DefaultChannelPipeline(this);
}

protected DefaultChannelPipeline(Channel channel) {this.channel = ObjectUtil.checkNotNull(channel, "channel");
    succeededFuture = new SucceededChannelFuture(channel, null);
    voidPromise =  new VoidChannelPromise(channel, true);

    tail = new TailContext(this);
    head = new HeadContext(this);

    head.next = tail;
    tail.prev = head;
}

咱们调用 DefaultChannelPipeline 的结构器, 传入了一个 channel, 而这个 channel 其实就是咱们实例化的 NioSocketChannel

DefaultChannelPipeline 会将这个 NioSocketChannel 对象保留在channel 字段中. DefaultChannelPipeline 中, 还有两个非凡的字段, 即 headtail, 而这两个字段是一个双向链表的头和尾. 其实在 DefaultChannelPipeline 中, 保护了一个以 AbstractChannelHandlerContext 为节点的双向链表, 这个链表是 Netty 实现 Pipeline 机制的要害.

对于 DefaultChannelPipeline 中的双向链表以及它所起的作用, 咱们会在后续章节具体解说。这里只是对 pipeline 做个初步的意识。

HeadContext 的继承层次结构如下所示:

TailContext 的继承层次结构如下所示:

咱们能够看到, 链表中 head 是一个 ChannelOutboundHandler, 而 tail 则是一个 ChannelInboundHandler.

3.0、客户端 connect 过程

客户端连贯的入口办法还是在 Bootstrap.connect() 中,下面也剖析过一部分内容,申请的具体流程是:

Bootstrap.connect() -> AbstractChannel.coonnect() -> NioSocketChannel.doConnect()

public static boolean connect(final SocketChannel socketChannel, final SocketAddress remoteAddress)
            throws IOException {
    try {return AccessController.doPrivileged(new PrivilegedExceptionAction<Boolean>() {
            @Override
            public Boolean run() throws IOException {return socketChannel.connect(remoteAddress);
            }
        });
    } catch (PrivilegedActionException e) {throw (IOException) e.getCause();}
}

看到这里,还是用 Java NIO SocketChannel 发送的 connect 申请进行客户端连贯申请。

总结

本篇文章以一个 Netty Client demo 为入口,而后解析了 NioEventLoopGroup 创立的流程、Channel的创立和注册的流程,以及客户端发动 connect 的具体流程,这里对于很多细节并没有很深的深刻上来,这些会放到后续的源码剖析文章,敬请期待~

正文完
 0