乐趣区

关于java:netty系列之EventExecutorEventExecutorGroup和netty中的实现

简介

netty 作为一个异步 NIO 框架,多线程必定是它的根底,然而对于 netty 的理论使用者来说,个别是不须要接触到多线程的,咱们只须要依照 netty 框架规定的流程走上来,自定义 handler 来解决对应的音讯即可。

那么有敌人会问了,作为一个 NIO 框架,netty 的多线程到底体现在什么中央呢?它的底层原理是什么呢?

明天带大家来看看 netty 中的工作执行器 EventExecutor 和 EventExecutorGroup。

EventExecutorGroup

因为 EventExecutor 继承自 EventExecutorGroup,这里咱们先来具体解说一下 EventExecutorGroup。

先看下 EventExecutorGroup 的定义:

public interface EventExecutorGroup extends ScheduledExecutorService, Iterable<EventExecutor>

EventExecutorGroup 继承自 JDK 的 ScheduledExecutorService,即能够执行定时工作,也能够像一般的工作执行器一样提交工作去执行。

同时 EventExecutorGroup 还继承了 Iterable 接口,示意 EventExecutorGroup 是可遍历的,它的遍历对象是 EventExecutor。

EventExecutorGroup 有两个和 Iterable 相干的办法,别离是 next 和 iterator:

    EventExecutor next();

    @Override
    Iterator<EventExecutor> iterator();

在 EventExecutorGroup 中调用 next 办法会返回一个 EventExecutor 对象,那么 EventExecutorGroup 和 EventExecutor 是什么关系呢?

咱们再来看一下 EventExecutor 的定义:

public interface EventExecutor extends EventExecutorGroup

能够看到 EventExecutor 实际上是 EventExecutorGroup 的子类。然而在父类 EventExecutorGroup 中竟然有对子类 EventExecutor 的援用。

这种在父类的 Group 中援用返回子类的设计模式在 netty 中十分常见,大家能够自行领会一下这样的设计到底是好还是坏。

EventExecutorGroup 作为一个 EventExecutor 的 Group 对象,它是用来治理 group 中的 EventExecutor 的。所以在 EventExecutorGroup 中设计了一些对 EventExecutor 的对立治理接口。

比方 boolean isShuttingDown() 办法用来判断这个 group 中的所有 EventExecutor 全都在被 shutdown 或者曾经被 shutdown。

另外 EventExecutorGroupt 提供了 shutdown group 中所有 EventExector 的办法:Future<?> shutdownGracefully() 和 terminate 办法:Future<?> terminationFuture()

这两个办法都返回了 Future,所以咱们能够认为这两个办法是异步办法。

EventExecutorGroup 中其余的办法都是一些对 JDK 中 ScheduledExecutorService 办法的重写,比方 submit,schedule,scheduleAtFixedRate,scheduleWithFixedDelay 等。

EventExecutor

接下来咱们再钻研一下 EventExecutor,在上一节中,咱们简略的提到了 EventExecutor 继承自 EventExecutorGroup,和 EventExecutorGroup 相比,EventExecutor 有哪些新的办法呢?

咱们晓得 EventExecutorGroup 继承了 Iterable, 并且定义了一个 next 办法用来返回 Group 中的一个 EventExecutor 对象。

因为 Group 中有很多个 EventExecutor,至于具体返回哪一个 EventExecutor, 还是由具体的实现类来实现的。

在 EventExecutor 中,它重写了这个办法:

@Override
EventExecutor next();

这里的 next 办法,返回的是 EventExecutor 自身。

另外,因为 EventExecutor 是由 EventExecutorGroup 来治理的,所以 EventExecutor 中还存在一个 parent 办法,用来返回治理 EventExecutor 的 EventExecutorGroup:

EventExecutorGroup parent();

EventExecutor 中新加了两个 inEventLoop 办法,用来判断给定的线程是否在 event loop 中执行。

    boolean inEventLoop();

    boolean inEventLoop(Thread thread);

EventExecutor 还提供两个办法能够返回 Promise 和 ProgressivePromise.

<V> Promise<V> newPromise();
<V> ProgressivePromise<V> newProgressivePromise();

相熟 ECMAScript 的敌人可能晓得,Promise 是 ES6 引入的一个新的语法性能,用来解决回调天堂的问题。这里的 netty 引入的 Promise 继承自 Future,并且增加了两个 success 和 failure 的状态。

ProgressivePromise 更进一步,在 Promise 根底上,提供了一个 progress 来示意进度。

除此之外,EventExecutor 还提供了对 Succeeded 的后果和 Failed 异样封装成为 Future 的办法。

    <V> Future<V> newSucceededFuture(V result);

    <V> Future<V> newFailedFuture(Throwable cause);

EventExecutorGroup 在 netty 中的根本实现

EventExecutorGroup 和 EventExecutor 在 netty 中有很多十分重要的实现,其中最常见的就是 EventLoop 和 EventLoopGroup,鉴于 EventLoop 和 EventLoopGroup 的重要性,咱们会在前面的章节中重点解说他们。这里先来看下 netty 中的其余实现。

netty 中 EventExecutorGroup 的默认实现叫做 DefaultEventExecutorGroup, 它的继承关系如下所示:

<img src=”https://img-blog.csdnimg.cn/ae242899a8234b668045716daa2eec1a.png” style=”zoom:67%;” />

能够看到 DefaultEventExecutorGroup 继承自 MultithreadEventExecutorGroup, 而 MultithreadEventExecutorGroup 又继承自 AbstractEventExecutorGroup。

先看下 AbstractEventExecutorGroup 的逻辑。AbstractEventExecutorGroup 基本上是对 EventExecutorGroup 中接口的一些实现。

咱们晓得 EventExecutorGroup 中定义了一个 next()办法,能够返回 Group 中的一个 EventExecutor。

在 AbstractEventExecutorGroup 中,简直所有 EventExecutorGroup 中的办法实现,都是调用 next()办法来实现的,以 submit 办法为例:

public Future<?> submit(Runnable task) {return next().submit(task);
    }

能够看到 submit 办法首先调用 next 获取到的 EventExecutor,而后再调用 EventExecutor 中的 submit 办法。

AbstractEventExecutorGroup 中的其余办法都是这样的实现。然而 AbstractEventExecutorGroup 中并没有实现 next()办法,具体如何从 Group 中获取到 EventExecutor,还须要看底层的具体实现。

MultithreadEventExecutorGroup 继承自 AbstractEventExecutorGroup,提供了多线程工作的反对。

MultithreadEventExecutorGroup 有两类构造函数,在构造函数中能够指定多线程的个数,还有工作执行器 Executor, 如果没有提供 Executor 的话,能够提供一个 ThreadFactory,MultithreadEventExecutorGroup 会调用 new ThreadPerTaskExecutor(threadFactory) 来为每一个线程结构一个 Executor:

    protected MultithreadEventExecutorGroup(int nThreads, ThreadFactory threadFactory, Object... args) {this(nThreads, threadFactory == null ? null : new ThreadPerTaskExecutor(threadFactory), args);
    }

        protected MultithreadEventExecutorGroup(int nThreads, Executor executor, Object... args) {this(nThreads, executor, DefaultEventExecutorChooserFactory.INSTANCE, args);
    }

MultithreadEventExecutorGroup 对多线程的反对是怎么实现的呢?

首先 MultithreadEventExecutorGroup 提供了两个 children,别离是 children 和 readonlyChildren:

    private final EventExecutor[] children;
    private final Set<EventExecutor> readonlyChildren;

children 和 MultithreadEventExecutorGroup 中的线程个数是一一对应的,有多少个线程,children 就有多大。

children = new EventExecutor[nThreads];

而后通过调用 newChild 办法,将传入的 executor 结构成为 EventExecutor 返回:

children[i] = newChild(executor, args);

看一下 newChild 办法的定义:

 protected abstract EventExecutor newChild(Executor executor, Object... args) throws Exception;

这个办法在 MultithreadEventExecutorGroup 中并没有实现,须要在更具体的类中实现它。

readonlyChildren 是 child 的只读版本,用来在遍历办法中返回:

readonlyChildren = Collections.unmodifiableSet(childrenSet);

public Iterator<EventExecutor> iterator() {return readonlyChildren.iterator();
    }

咱们当初有了 Group 中的所有 EventExecutor,那么在 MultithreadEventExecutorGroup 中,next 办法是怎么抉择具体返回哪一个 EventExecutor 呢?

先来看一下 next 办法的定义:


private final EventExecutorChooserFactory.EventExecutorChooser chooser;

chooser = chooserFactory.newChooser(children);

public EventExecutor next() {return chooser.next();
    }

next 办法调用的是 chooser 的 next 办法,看一下 chooser 的 next 办法具体实现:

public EventExecutor next() {return executors[(int) Math.abs(idx.getAndIncrement() % executors.length)];
        }

能够看到,其实是一个很简略的依据 index 来获取对象的操作。

最初看一下 DefaultEventExecutorGroup 中对 newChild 办法的实现:

    protected EventExecutor newChild(Executor executor, Object... args) throws Exception {return new DefaultEventExecutor(this, executor, (Integer) args[0], (RejectedExecutionHandler) args[1]);
    }

newChild 返回的 EventExecutor 应用的是 DefaultEventExecutor。这个类是 EventExecutor 在 netty 中的默认实现,咱们在下一小结中具体进行解说。

EventExecutor 在 netty 中的根本实现

EventExecutor 在 netty 中的默认实现是 DefaultEventExecutor,先看下它的继承构造:

<img src=”https://img-blog.csdnimg.cn/67a3434b3dc14e1d98baf8cb5dbf07cf.png” style=”zoom:67%;” />

DefaultEventExecutor 继承自 SingleThreadEventExecutor, 而 SingleThreadEventExecutor 又继承自 AbstractScheduledEventExecutor,AbstractScheduledEventExecutor 继承自 AbstractEventExecutor。

先来看一下 AbstractEventExecutor 的定义:

public abstract class AbstractEventExecutor extends AbstractExecutorService implements EventExecutor

AbstractEventExecutor 继承了 AbstractExecutorService,并且实现了 EventExecutor 接口。

AbstractExecutorService 是 JDK 中的类,它提供了 ExecutorService 的一些实现,比方 submit, invokeAny and invokeAll 等办法。

AbstractEventExecutor 作为 ExecutorGroup 的一员,它提供了一个 EventExecutorGroup 类型的 parent 属性:

private final EventExecutorGroup parent;

public EventExecutorGroup parent() {return parent;}

对于 next 办法来说,AbstractEventExecutor 返回的是它自身:

public EventExecutor next() {return this;}

AbstractScheduledEventExecutor 继承自 AbstractEventExecutor, 它外部应用了一个 PriorityQueue 来存储蕴含定时工作的 ScheduledFutureTask, 从而实现定时工作的性能:

PriorityQueue<ScheduledFutureTask<?>> scheduledTaskQueue;

接下来是 SingleThreadEventExecutor,从名字能够看出,SingleThreadEventExecutor 应用的是单线程来执行提交的 tasks,SingleThreadEventExecutor 提供了一个默认的 pending 执行 task 的工作大小:DEFAULT_MAX_PENDING_EXECUTOR_TASKS,还定义了工作执行的几种状态:

    private static final int ST_NOT_STARTED = 1;
    private static final int ST_STARTED = 2;
    private static final int ST_SHUTTING_DOWN = 3;
    private static final int ST_SHUTDOWN = 4;
    private static final int ST_TERMINATED = 5;

之前提到了 EventExecutor 中有一个特有的 inEventLoop 办法,判断给定的 thread 是否在 eventLoop 中,在 SingleThreadEventExecutor 中,咱们看一下具体的实现:

    public boolean inEventLoop(Thread thread) {return thread == this.thread;}

具体而言就是判断给定的线程和 SingleThreadEventExecutor 中定义的 thread 属性是不是同一个 thread,SingleThreadEventExecutor 中的 thread 是这样定义的:

这个 thread 是在 doStartThread 办法中进行初始化的:

executor.execute(new Runnable() {
            @Override
            public void run() {thread = Thread.currentThread();

所以这个 thread 是工作执行的线程,也就是 executor 中执行工作用到的线程。

再看一下十分要害的 execute 办法:

private void execute(Runnable task, boolean immediate) {boolean inEventLoop = inEventLoop();
        addTask(task);
        if (!inEventLoop) {startThread();

这个办法首先将 task 增加到工作队列中,而后调用 startThread 开启线程来执行工作。

最初来看一下 DefaultEventExecutor,这个 netty 中的默认实现:

public final class DefaultEventExecutor extends SingleThreadEventExecutor 

DefaultEventExecutor 继承自 SingleThreadEventExecutor,这个类中,它定义了 run 办法如何实现:

    protected void run() {for (;;) {Runnable task = takeTask();
            if (task != null) {task.run();
                updateLastExecutionTime();}

            if (confirmShutdown()) {break;}
        }
    }

在 SingleThreadEventExecutor 中,咱们会把工作退出到 task queue 中,在 run 办法中,会从 task queue 中取出对应的 task,而后调用 task 的 run 办法执行。

总结

DefaultEventExecutorGroup 继承了 MultithreadEventExecutorGroup,MultithreadEventExecutorGroup 中理论调用的是 SingleThreadEventExecutor 来执行具体的工作。

本文已收录于 http://www.flydean.com/05-1-netty-event…entexecutorgroup/

最艰深的解读,最粗浅的干货,最简洁的教程,泛滥你不晓得的小技巧等你来发现!

欢送关注我的公众号:「程序那些事」, 懂技术,更懂你!

退出移动版