简介

netty作为一个异步NIO框架,多线程必定是它的根底,然而对于netty的理论使用者来说,个别是不须要接触到多线程的,咱们只须要依照netty框架规定的流程走上来,自定义handler来解决对应的音讯即可。

那么有敌人会问了,作为一个NIO框架,netty的多线程到底体现在什么中央呢?它的底层原理是什么呢?

明天带大家来看看netty中的工作执行器EventExecutor和EventExecutorGroup。

EventExecutorGroup

因为EventExecutor继承自EventExecutorGroup,这里咱们先来具体解说一下EventExecutorGroup。

先看下EventExecutorGroup的定义:

public interface EventExecutorGroup extends ScheduledExecutorService, Iterable<EventExecutor>

EventExecutorGroup继承自JDK的ScheduledExecutorService,即能够执行定时工作,也能够像一般的工作执行器一样提交工作去执行。

同时EventExecutorGroup还继承了Iterable接口,示意EventExecutorGroup是可遍历的,它的遍历对象是EventExecutor。

EventExecutorGroup有两个和Iterable相干的办法,别离是next和iterator:

    EventExecutor next();    @Override    Iterator<EventExecutor> iterator();

在EventExecutorGroup中调用next办法会返回一个EventExecutor对象,那么EventExecutorGroup和EventExecutor是什么关系呢?

咱们再来看一下EventExecutor的定义:

public interface EventExecutor extends EventExecutorGroup

能够看到EventExecutor实际上是EventExecutorGroup的子类。然而在父类EventExecutorGroup中竟然有对子类EventExecutor的援用。

这种在父类的Group中援用返回子类的设计模式在netty中十分常见,大家能够自行领会一下这样的设计到底是好还是坏。

EventExecutorGroup作为一个EventExecutor的Group对象,它是用来治理group中的EventExecutor的。所以在EventExecutorGroup中设计了一些对EventExecutor的对立治理接口。

比方boolean isShuttingDown()办法用来判断这个group中的所有EventExecutor全都在被shutdown或者曾经被shutdown。

另外EventExecutorGroupt提供了shutdown group中所有EventExector的办法:Future<?> shutdownGracefully() 和 terminate办法:Future<?> terminationFuture()

这两个办法都返回了Future,所以咱们能够认为这两个办法是异步办法。

EventExecutorGroup中其余的办法都是一些对JDK中ScheduledExecutorService办法的重写,比方submit,schedule,scheduleAtFixedRate,scheduleWithFixedDelay等。

EventExecutor

接下来咱们再钻研一下EventExecutor,在上一节中,咱们简略的提到了EventExecutor继承自EventExecutorGroup,和EventExecutorGroup相比,EventExecutor有哪些新的办法呢?

咱们晓得EventExecutorGroup继承了Iterable,并且定义了一个next办法用来返回Group中的一个EventExecutor对象。

因为Group中有很多个EventExecutor,至于具体返回哪一个EventExecutor,还是由具体的实现类来实现的。

在EventExecutor中,它重写了这个办法:

@OverrideEventExecutor next();

这里的next办法,返回的是EventExecutor自身。

另外,因为EventExecutor是由EventExecutorGroup来治理的,所以EventExecutor中还存在一个parent办法,用来返回治理EventExecutor的EventExecutorGroup:

EventExecutorGroup parent();

EventExecutor中新加了两个inEventLoop办法,用来判断给定的线程是否在event loop中执行。

    boolean inEventLoop();    boolean inEventLoop(Thread thread);

EventExecutor还提供两个办法能够返回Promise和ProgressivePromise.

<V> Promise<V> newPromise();<V> ProgressivePromise<V> newProgressivePromise();

相熟ECMAScript的敌人可能晓得,Promise是ES6引入的一个新的语法性能,用来解决回调天堂的问题。这里的netty引入的Promise继承自Future,并且增加了两个success和failure的状态。

ProgressivePromise更进一步,在Promise根底上,提供了一个progress来示意进度。

除此之外,EventExecutor还提供了对Succeeded的后果和Failed异样封装成为Future的办法。

    <V> Future<V> newSucceededFuture(V result);    <V> Future<V> newFailedFuture(Throwable cause);

EventExecutorGroup在netty中的根本实现

EventExecutorGroup和EventExecutor在netty中有很多十分重要的实现,其中最常见的就是EventLoop和EventLoopGroup,鉴于EventLoop和EventLoopGroup的重要性,咱们会在前面的章节中重点解说他们。这里先来看下netty中的其余实现。

netty中EventExecutorGroup的默认实现叫做DefaultEventExecutorGroup,它的继承关系如下所示:

<img src="https://img-blog.csdnimg.cn/ae242899a8234b668045716daa2eec1a.png" style="zoom:67%;" />

能够看到DefaultEventExecutorGroup继承自MultithreadEventExecutorGroup,而MultithreadEventExecutorGroup又继承自AbstractEventExecutorGroup。

先看下AbstractEventExecutorGroup的逻辑。AbstractEventExecutorGroup基本上是对EventExecutorGroup中接口的一些实现。

咱们晓得EventExecutorGroup中定义了一个next()办法,能够返回Group中的一个EventExecutor。

在AbstractEventExecutorGroup中,简直所有EventExecutorGroup中的办法实现,都是调用next()办法来实现的,以submit办法为例:

public Future<?> submit(Runnable task) {        return next().submit(task);    }

能够看到submit办法首先调用next获取到的EventExecutor,而后再调用EventExecutor中的submit办法。

AbstractEventExecutorGroup中的其余办法都是这样的实现。然而AbstractEventExecutorGroup中并没有实现next()办法,具体如何从Group中获取到EventExecutor,还须要看底层的具体实现。

MultithreadEventExecutorGroup继承自AbstractEventExecutorGroup,提供了多线程工作的反对。

MultithreadEventExecutorGroup有两类构造函数,在构造函数中能够指定多线程的个数,还有工作执行器Executor,如果没有提供Executor的话,能够提供一个ThreadFactory,MultithreadEventExecutorGroup会调用new ThreadPerTaskExecutor(threadFactory)来为每一个线程结构一个Executor:

    protected MultithreadEventExecutorGroup(int nThreads, ThreadFactory threadFactory, Object... args) {        this(nThreads, threadFactory == null ? null : new ThreadPerTaskExecutor(threadFactory), args);    }        protected MultithreadEventExecutorGroup(int nThreads, Executor executor, Object... args) {        this(nThreads, executor, DefaultEventExecutorChooserFactory.INSTANCE, args);    }

MultithreadEventExecutorGroup对多线程的反对是怎么实现的呢?

首先MultithreadEventExecutorGroup提供了两个children,别离是children和readonlyChildren:

    private final EventExecutor[] children;    private final Set<EventExecutor> readonlyChildren;

children和MultithreadEventExecutorGroup中的线程个数是一一对应的,有多少个线程,children就有多大。

children = new EventExecutor[nThreads];

而后通过调用newChild办法,将传入的executor结构成为EventExecutor返回:

children[i] = newChild(executor, args);

看一下newChild办法的定义:

 protected abstract EventExecutor newChild(Executor executor, Object... args) throws Exception;

这个办法在MultithreadEventExecutorGroup中并没有实现,须要在更具体的类中实现它。

readonlyChildren是child的只读版本,用来在遍历办法中返回:

readonlyChildren = Collections.unmodifiableSet(childrenSet);public Iterator<EventExecutor> iterator() {        return readonlyChildren.iterator();    }

咱们当初有了Group中的所有EventExecutor,那么在MultithreadEventExecutorGroup中,next办法是怎么抉择具体返回哪一个EventExecutor呢?

先来看一下next办法的定义:

private final EventExecutorChooserFactory.EventExecutorChooser chooser;chooser = chooserFactory.newChooser(children);public EventExecutor next() {        return chooser.next();    }

next办法调用的是chooser的next办法,看一下chooser的next办法具体实现:

public EventExecutor next() {            return executors[(int) Math.abs(idx.getAndIncrement() % executors.length)];        }

能够看到,其实是一个很简略的依据index来获取对象的操作。

最初看一下DefaultEventExecutorGroup中对newChild办法的实现:

    protected EventExecutor newChild(Executor executor, Object... args) throws Exception {        return new DefaultEventExecutor(this, executor, (Integer) args[0], (RejectedExecutionHandler) args[1]);    }

newChild返回的EventExecutor应用的是DefaultEventExecutor。这个类是EventExecutor在netty中的默认实现,咱们在下一小结中具体进行解说。

EventExecutor在netty中的根本实现

EventExecutor在netty中的默认实现是DefaultEventExecutor,先看下它的继承构造:

<img src="https://img-blog.csdnimg.cn/67a3434b3dc14e1d98baf8cb5dbf07cf.png" style="zoom:67%;" />

DefaultEventExecutor继承自SingleThreadEventExecutor,而SingleThreadEventExecutor又继承自AbstractScheduledEventExecutor,AbstractScheduledEventExecutor继承自AbstractEventExecutor。

先来看一下AbstractEventExecutor的定义:

public abstract class AbstractEventExecutor extends AbstractExecutorService implements EventExecutor

AbstractEventExecutor继承了AbstractExecutorService,并且实现了EventExecutor接口。

AbstractExecutorService是JDK中的类,它提供了 ExecutorService 的一些实现,比方submit, invokeAny and invokeAll等办法。

AbstractEventExecutor作为ExecutorGroup的一员,它提供了一个EventExecutorGroup类型的parent属性:

private final EventExecutorGroup parent;public EventExecutorGroup parent() {        return parent;    }

对于next办法来说,AbstractEventExecutor返回的是它自身:

public EventExecutor next() {        return this;    }

AbstractScheduledEventExecutor继承自AbstractEventExecutor,它外部应用了一个PriorityQueue来存储蕴含定时工作的ScheduledFutureTask,从而实现定时工作的性能:

PriorityQueue<ScheduledFutureTask<?>> scheduledTaskQueue;

接下来是SingleThreadEventExecutor,从名字能够看出,SingleThreadEventExecutor应用的是单线程来执行提交的tasks,SingleThreadEventExecutor提供了一个默认的pending执行task的工作大小:DEFAULT_MAX_PENDING_EXECUTOR_TASKS,还定义了工作执行的几种状态:

    private static final int ST_NOT_STARTED = 1;    private static final int ST_STARTED = 2;    private static final int ST_SHUTTING_DOWN = 3;    private static final int ST_SHUTDOWN = 4;    private static final int ST_TERMINATED = 5;

之前提到了EventExecutor中有一个特有的inEventLoop办法,判断给定的thread是否在eventLoop中,在SingleThreadEventExecutor中,咱们看一下具体的实现:

    public boolean inEventLoop(Thread thread) {        return thread == this.thread;    }

具体而言就是判断给定的线程和SingleThreadEventExecutor中定义的thread属性是不是同一个thread,SingleThreadEventExecutor中的thread是这样定义的:

这个thread是在doStartThread办法中进行初始化的:

executor.execute(new Runnable() {            @Override            public void run() {                thread = Thread.currentThread();

所以这个thread是工作执行的线程,也就是executor中执行工作用到的线程。

再看一下十分要害的execute办法:

private void execute(Runnable task, boolean immediate) {        boolean inEventLoop = inEventLoop();        addTask(task);        if (!inEventLoop) {            startThread();

这个办法首先将task增加到工作队列中,而后调用startThread开启线程来执行工作。

最初来看一下DefaultEventExecutor,这个netty中的默认实现:

public final class DefaultEventExecutor extends SingleThreadEventExecutor 

DefaultEventExecutor继承自SingleThreadEventExecutor,这个类中,它定义了run办法如何实现:

    protected void run() {        for (;;) {            Runnable task = takeTask();            if (task != null) {                task.run();                updateLastExecutionTime();            }            if (confirmShutdown()) {                break;            }        }    }

在SingleThreadEventExecutor中,咱们会把工作退出到task queue中,在run办法中,会从task queue中取出对应的task,而后调用task的run办法执行。

总结

DefaultEventExecutorGroup继承了MultithreadEventExecutorGroup,MultithreadEventExecutorGroup中理论调用的是SingleThreadEventExecutor来执行具体的工作。

本文已收录于 http://www.flydean.com/05-1-netty-event…entexecutorgroup/

最艰深的解读,最粗浅的干货,最简洁的教程,泛滥你不晓得的小技巧等你来发现!

欢送关注我的公众号:「程序那些事」,懂技术,更懂你!