NioEventLoopGroup的初始化源码

一、寻找源码的过程

咱们后面说到过,NioEventLoopGroup咱们能够近乎把它看作是一个线程池,该线程池会执行一个一个的工作,咱们罕用的NioEventLoopGroup大略有两种,NioEventLoopGroup(int nThreads),NioEventLoopGroup(),即一个是指定线程数量的,一个是默认指定线程数量的!这里咱们以无参结构为入口进行剖析!

EventLoopGroup work = new NioEventLoopGroup();
public NioEventLoopGroup() {    this(0);}

当咱们应用默认的数量的时候,他会传递一个0,咱们持续往下跟!

public NioEventLoopGroup(int nThreads) {    this(nThreads, (Executor) null);}

留神这里传递的参数是:0,null

public NioEventLoopGroup(int nThreads, Executor executor) {    //每个 group保护一个 SelectorProvider 次要用它获取selector选择器    this(nThreads, executor, SelectorProvider.provider());}
这外面多传递了一个 SelectorProvider.provider(),该办法是JDK NIO提供的API次要能够获取NIO选择器或者Channel,如下图:

咱们回归主线持续跟:

public NioEventLoopGroup(            int nThreads, Executor executor, final SelectorProvider selectorProvider) {    this(nThreads, executor, selectorProvider, DefaultSelectStrategyFactory.INSTANCE);}

这里多传递了一个 DefaultSelectStrategy抉择策略,这在前面解说NioEventLoop会具体解说,不做论述!

public NioEventLoopGroup(int nThreads, Executor executor, final SelectorProvider selectorProvider,                             final SelectStrategyFactory selectStrategyFactory) {    super(nThreads, executor, selectorProvider, selectStrategyFactory, RejectedExecutionHandlers.reject());}

咱们会发现这里还会默认传递一个回绝策略RejectedExecutionHandlers.reject(),这个回绝策略是干嘛的呢?

@Overridepublic void rejected(Runnable task, SingleThreadEventExecutor executor) {    throw new RejectedExecutionException();}

咱们失去一个论断,当某些条件触发这个回绝策略,那么他会抛出一个RejectedExecutionException异样,具体什么时候触发,后续也会具体阐明,这里只须要记住就OK了!

咱们持续回到主线, 这里咱们开始调用父类,还记得上一节课咱们剖析的NioEventLoopGroup的父类是谁吗?没错是:MultithreadEventLoopGroup, 咱们会进入到MultithreadEventLoopGroup外面:

protected MultithreadEventLoopGroup(int nThreads, Executor executor, Object... args) {    //线程数量为0时  应用默认的cpu * 2    super(nThreads == 0 ? DEFAULT_EVENT_LOOP_THREADS : nThreads, executor, args);}

nThreads还记得是几吗?是0对不对,这里有个判断,当你的线程数量为0的时候,会应用DEFAULT_EVENT_LOOP_THREADS当作线程池的数量,DEFAULT_EVENT_LOOP_THREADS是多少呢?

DEFAULT_EVENT_LOOP_THREADS = Math.max(1, SystemPropertyUtil.getInt("io.netty.eventLoopThreads", NettyRuntime.availableProcessors() * 2));

默认是CPU的两倍,所以咱们当初失去一个论断,当咱们应用默认的NioEventLoopGroup的时候,零碎会默认应用零碎CPU核数*2当作线程池的数量!

咱们上一步传递过去的selectorProvider、回绝策略、selectStrategyFactory被封装为数组,并放在args[0],args[1], args[2]的地位!

咱们持续回到主线,这里又再次调用到父类,MultithreadEventExecutorGroup:

protected MultithreadEventExecutorGroup(int nThreads, Executor executor, Object... args) {    this(nThreads, executor, DefaultEventExecutorChooserFactory.INSTANCE, args);}

留神,这里又再次多传递了一个参数:DefaultEventExecutorChooserFactory一个选择器工厂,这里会返回一个选择器,他是DefaultEventExecutorChooserFactory类型的,具体分析前面会剖析!咱们持续回到主线:

protected MultithreadEventExecutorGroup(int nThreads, Executor executor, EventExecutorChooserFactory chooserFactory, Object... args) {    ..........后续源码补充..........}

到这里咱们终于看到了一大段代码,这里是EventLoopGroup的次要逻辑,咱们逐行剖析:

二、构建线程执行器

1. 源码解析

//newDefaultThreadFactory  构建线程工厂if (executor == null) {    //创立并保留线程执行器  执行器  执行工作的  默认是  DefaultThreadFactory 线程池    executor = new ThreadPerTaskExecutor(newDefaultThreadFactory());}

这里会判断咱们传入的执行器是否为空,否则就新建一个,咱们还记得executor是什么值吗?是null,对不对,所以它会进入到这里的逻辑咱们进入到newDefaultThreadFactory源码外面看一下:

newDefaultThreadFactory()次要逻辑

protected ThreadFactory newDefaultThreadFactory() {    return new DefaultThreadFactory(getClass());}

能够看到,这里向执行器外面传入了一个 DefaultThreadFactory一个默认的线程工厂!

ThreadPerTaskExecutor次要逻辑

/** * io.netty.util.concurrent.DefaultThreadFactory#newThread(java.lang.Runnable) * * 执行工作  每次执行工作都会创立一个线程实体对象 * @param command 线程 */@Overridepublic void execute(Runnable command) {    //执行工作    threadFactory.newThread(command).start();}

咱们发现,这里调用了一个咱们传入的线程工厂,创立了一个新的线程并调用start办法启动了起来,那么他是如何创立的呢? 咱们进入到newThread源码外面查看,因为咱们默认应用的线程工厂是 DefaultThreadFactory, 所以,咱们会进入到 DefaultThreadFactory#newThread

@Overridepublic Thread newThread(Runnable r) {    //创立一个线程 每次执行工作的时候都会创立一个线程实体    Thread t = newThread(FastThreadLocalRunnable.wrap(r), prefix + nextId.incrementAndGet());    try {        if (t.isDaemon() != daemon) {            t.setDaemon(daemon);        }        if (t.getPriority() != priority) {            t.setPriority(priority);        }    } catch (Exception ignored) {        // Doesn't matter even if failed to set.    }    return t;}

这里没有太多的操作,只是会将一个 Runnable封装为一个 Thread进行返回,咱们重点关注一下这个Thread,它和咱们传统应用的Thread是一样的吗? 咱们跟进到 newThread办法看一下:

protected Thread newThread(Runnable r, String name) {    //Netty本人封装的线程    return new FastThreadLocalThread(threadGroup, r, name);}

逻辑很简略,就是将一个Thread包装为Netty自定义的 FastThreadLocalThread,至于为什么,咱们临时不往下多做解释,后续章节会很具体的解释它!

2. 线程执行器总结

这里咱们会创立一个线程执行器 ThreadPerTaskExecutor,应用默认的线程工厂DefaultThreadFactory,线程执行器会将一个工作包装为一个 FastThreadLocalThread对象,而后调用start办法开启一个新的线程执行工作!

三、创立对应数量的执行器

//创立执行器数组  数量和预设线程数量统一children = new EventExecutor[nThreads];for (int i = 0; i < nThreads; i ++) {    boolean success = false;    try {        //创立执行器  开始创立执行器   这里的执行机预计就会EventLoop  是NioEventLoop        children[i] = newChild(executor, args);        success = true;    } catch (Exception e) {        .....省略不必要代码    } finally {        .....省略不必要代码    }}

1. 源码解析

children = new EventExecutor[nThreads];

首先他会创立一个空的EventExecutor执行器数组,而后遍历填充!

还记得 nThreads是几吗? 默认是CPU2的大小,所以这里会创立 CPU 2数量的执行器! 咱们发现,for循环中填充的次要逻辑是newChild,所以,咱们进入到 newChild办法, 这里提醒一点,咱们创立的Group对象是一个什么对象? 是NioEventLoopGroup对象对不对,所以咱们这里会进入到 NioEventLoopGroup#newChild办法:

@Overrideprotected EventLoop newChild(Executor executor, Object... args) throws Exception {    EventLoopTaskQueueFactory queueFactory = args.length == 4 ? (EventLoopTaskQueueFactory) args[3] : null;    return new NioEventLoop(this, executor, (SelectorProvider) args[0],                            ((SelectStrategyFactory) args[1]).newSelectStrategy(), (RejectedExecutionHandler) args[2], queueFactory);}

咱们传递过去 的args长度为3,后面做过解析 :

args[0]为 selectorProvider、args[1]为回绝策略、args[2]为selectStrategyFactory

所以 queueFactory为null, 而后咱们再重点关注 NioEventLoop对象,能够看出,newChild办法返回的是 NioEventLoop,那么咱们初步就能够确定,EventExecutor数组外面存在的是NioEventLoop对象!至此,咱们就不深究了,NioEventLoop的初始化源码剖析我会放到下一节课剖析,这里咱们能够确定一件事, EventExecutor数组外面存在的是NioEventLoop对象!咱们持续回到主线:

2. 执行器数组总结

for循环结束之后,此时的EventExecutor[nThreads];数组就被填充斥了,外面的每一个元素都是NioEventLoop对象,每一个NioEventLoop对象都蕴含一个 ThreadPerTaskExecutor线程执行器对象!

四、创立一个执行器选择器

1. 源码解析

chooser = chooserFactory.newChooser(children);

还记得 chooserFactory是什么类型的吗? 是DefaultEventExecutorChooserFactory类型的,忘了的能够往上翻一下寻找源码的过程中的代码或者调试一下!

咱们进入到 DefaultEventExecutorChooserFactory#newChooser 源码逻辑中,并传入刚刚咱们循环填充好的数组:

@Overridepublic EventExecutorChooser newChooser(EventExecutor[] executors) {    //判断2的幂 isPowerOfTwo    if (isPowerOfTwo(executors.length)) {        return new PowerOfTwoEventExecutorChooser(executors);    } else {        //简略的        return new GenericEventExecutorChooser(executors);    }}

能够看到,这里仿佛有两种状况,返回的是不同的策略对象,当你的数组长度是2的幂等次方的时候,返回的是 PowerOfTwoEventExecutorChooser对象,否则返回 GenericEventExecutorChooser对象,咱们就两种状况全副剖析一下:

I、PowerOfTwoEventExecutorChooser
PowerOfTwoEventExecutorChooser(EventExecutor[] executors) {    this.executors = executors;}@Overridepublic EventExecutor next() {    //2的幂等性  实现这个  也能实现循环取数的    //executors   就是NioEventLoop数组  依照2次幂求本次获取的EventLoop是个啥    return executors[idx.getAndIncrement() & executors.length - 1];}

这段代码的次要逻辑是,取一个自增的CAS类,与数组长度做&运算,最终会呈现循环取数的后果:

<img src="![](http://images.huangfusuper.cn/typora/huanghuang.gif)" style="zoom:50%;" />

从下面的图片能够根本看进去, 该性能能够实现一个循环取数的性能,每次达到数组的尾部部都会从新回到头部从新获取!

代码案例:

public static void main(String[] args) {    String[] strings = {"第一个", "第二个", "第三个", "第四个"};    AtomicInteger idx = new AtomicInteger();    for (int i = 0; i < 9; i++) {        System.out.println(strings[idx.getAndIncrement() & strings.length -1]);    }}

后果集

第一个第二个第三个第四个第一个第二个第三个第四个第一个
II、GenericEventExecutorChooser

当你的线程数量不是2的幂次方的时候,会走一个通用的选择器,具体实现源码如下:

GenericEventExecutorChooser(EventExecutor[] executors) {    this.executors = executors;}@Overridepublic EventExecutor next() {    //自增  取模   以达到循环的目标    //假如executors 长度为5  那么 一直的循环就会一直的失去 0 1 2 3 4  0 1 2 3 4。。。    return executors[Math.abs(idx.getAndIncrement() % executors.length)];}

这个代码就不必了我做演示了吧,他的性能和下面那个性能是一样的能 可能达到一个循环取数的性能

思考

为什么Netty要分为两个策略类来实现呢,间接用第二种不行吗?

Netty官网对性能的要求达到了极致,大家要晓得位运算速度要高于间接取模运算的,所以Netty官网即便是这一点也做了一个优化!

2. 执行器选择器总结

咱们通过上述能够理解到,这里会通过一个选择器工厂创立一个选择器,并保留在NioEvenetLoopGroup中,调用该选择器的next办法会返回一个NioEventLoop对象,其中的获取形式是一直的循环,顺次获取NioEventLoop对象,这也是一个NioEventLoop对SocketChannel为一对多的根底!这都是后话!

NioEventLoopGroup源码总结

  1. 创立一个线程执行器,当调用该线程执行器的execute办法的时候,会讲一个Runable对象包装为Thread对象,再将Thread对象包装为FastThreadLocalThread对象,而后启动起来! 简略来说,每调用一次execute办法,都去创立并启动一条新线程执行工作!
  2. 创立一个执行器数组,数组长度与咱们传递的数量无关,默认为CPU*2个数量,而后再循环填充这个空数组,数组外面的元素是一个NioEventLoop对象,每一个NioEventLoop对会持有一个线程执行器的援用!
  3. 创立一个执行器选择器,调用该执行器选择器的next办法能够返回一个NioEventLoop对象,外部是进行循环取数的,每一个NioEventLoop都可能会被屡次获取!