前言

本篇博文是《从0到1学习 Netty》中入门系列的第二篇博文,次要内容是介绍 Netty 中 EventLoop 的应用,优化及源码解析,往期系列文章请拜访博主的 Netty 专栏,博文中的所有代码全副收集在博主的 GitHub 仓库中;

概述

事件循环对象 EventLoop

在 Netty 中,EventLoop 是用于解决 I/O 事件的线程。它容许咱们在单线程中同时解决多个连贯,防止了阻塞和期待。

简略来说,Netty 的 EventLoop 具备以下几个特点:

  1. 单线程执行:每个 EventLoop 都是由一个线程负责执行,这样能够缩小线程切换开销,进步网络应用程序的性能。
  2. 多路复用:EventLoop 应用底层操作系统提供的 I/O 模型(例如 Java NIO)实现多路复用,即一条线程能够监听多个 Channel,从而实现并发解决多个连贯的能力。
  3. 事件驱动:EventLoop 通过监听与 Channel 相干的 I/O 事件(例如读、写、连贯等),并将其转化为事件对象(例如 ChannelReadEventChannelWriteEvent 等)进行解决。这种事件驱动模型能够让 Netty 高效地响应 I/O 事件,防止了轮询等不必要的操作。
  4. 非阻塞操作:EventLoop 中所有的操作都是非阻塞的,包含 I/O 操作和异步工作的执行。这样能够确保整个零碎始终处于高效运行状态,并进步了零碎的可伸缩性。

其中,EventLoop 的继承关系如下:

public interface EventLoop extends OrderedEventExecutor, EventLoopGroup {    EventLoopGroup parent();}public interface EventLoopGroup extends EventExecutorGroup {...}public interface EventExecutorGroup extends     ScheduledExecutorService, Iterable<EventExecutor> {...}
  • 继承自 java.util.concurrent.ScheduledExecutorService; 因而蕴含了线程池中所有的办法;
  • 继承自 Netty 本人的 OrderedEventExecutor

    • 提供了 boolean inEventLoop(Thread var1) 办法判断一个线程是否属于此 EventLoop;
    • 提供了 EventLoopGroup parent() 办法来看看本人属于哪个 EventLoopGroup;

总之,Netty 中的 EventLoop 提供了一种高性能、高可靠性的网络编程模型,它解决了网络应用程序中的并发、高负载等问题,是构建高性能网络应用程序的重要组成部分。


事件循环组 EventLoopGroup

在 Netty 中,EventLoopGroup 是用于解决 I/O 操作和工作执行的线程池。它负责管理一个或多个 EventLoop 实例,每个 EventLoop 负责解决其调配的所有 Channel 的生命周期中产生的事件。

EventLoopGroup 通常会创立两种类型的线程池:Boss GroupWorker Group。Boss Group 负责监听传入连贯的申请,而 Worker Group 则负责解决曾经建设的连贯的读写操作

当一个新的连贯到来时,Boss Group 会将连贯申请注册到某个 EventLoop 的 Selector 上,并将其关联到对应的 Channel 对象。随后,Worker Group 将负责解决新连贯上的所有 I/O 操作。

EventLoopGroup 还提供了一些办法,例如 shutdownGracefully(),能够优雅敞开 EventLoopGroup 的所有线程,并期待它们实现未实现的工作。这对于保障程序的平安敞开十分有用。

其中,EventLoopGroup 的继承关系如下:

public interface EventLoopGroup extends EventExecutorGroup {...}public interface EventExecutorGroup extends     ScheduledExecutorService, Iterable<EventExecutor> {...}
  • 继承自 Netty 本人的 EventExecutorGroup

    • 实现了 Iterable 接口提供遍历 EventLoop 的能力;
    • 另有 next 办法获取汇合中下一个 EventLoop;

执行工作

1、创立事件循环组,线程数 nThreads 可采纳默认设置或者依据需要自定义:

EventLoopGroup group = new NioEventLoopGroup(2);

2、获取下一个事件循环对象:

group.next();

这里采纳的是轮询的形式进行调配,次要是为了工作分工比拟平均:

System.out.println(group.next());  System.out.println(group.next());  System.out.println(group.next());  System.out.println(group.next());

运行后果:

io.netty.channel.nio.NioEventLoop@4b14c583io.netty.channel.nio.NioEventLoop@65466a6aio.netty.channel.nio.NioEventLoop@4b14c583io.netty.channel.nio.NioEventLoop@65466a6a

3、执行一般工作,应用 submit

group.next().submit(() -> {    try {        Thread.sleep(1000);    } catch (InterruptedException e) {        throw new RuntimeException(e);    }    log.debug(Thread.currentThread().getName());});log.debug(Thread.currentThread().getName());

运行后果:

15:49:31 [DEBUG] [main] c.s.n.c.TestEventLoop - main15:49:32 [DEBUG] [nioEventLoopGroup-2-1] c.s.n.c.TestEventLoop - nioEventLoopGroup-2-1

4、执行定时工作,应用 scheduleAtFixedRate

group.next().scheduleAtFixedRate(() -> {    log.debug("sidiot.");}, 0, 1, TimeUnit.SECONDS);log.debug(Thread.currentThread().getName());

运行后果:

16:26:44 [DEBUG] [nioEventLoopGroup-2-1] c.s.n.c.TestEventLoop - sidiot.16:26:44 [DEBUG] [main] c.s.n.c.TestEventLoop - main16:26:45 [DEBUG] [nioEventLoopGroup-2-1] c.s.n.c.TestEventLoop - sidiot.16:26:46 [DEBUG] [nioEventLoopGroup-2-1] c.s.n.c.TestEventLoop - sidiot.16:26:47 [DEBUG] [nioEventLoopGroup-2-1] c.s.n.c.TestEventLoop - sidiot.

5、敞开 EventLoopGroup

group.shutdownGracefully();

shutdownGracefully 办法会优雅地敞开 EventLoopGroup。该办法首先会切换 EventLoopGroup 到敞开状态从而回绝新的工作的退出,而后在工作队列的工作都解决实现后,进行线程的运行,从而确保整体利用是在失常有序的状态下退出的。

IO 工作

在上篇博文 从0到1(六):入门-Hello World 中有具体的解析,因而这里就不开展叙述,间接给出代码;

服务端

@Slf4jpublic class EventLoopServer {    public static void main(String[] args) {        new ServerBootstrap()                .group(new NioEventLoopGroup())                .channel(NioServerSocketChannel.class)                .childHandler(new ChannelInitializer<NioSocketChannel>() {                    @Override                    protected void initChannel(NioSocketChannel ch) throws Exception {                        ch.pipeline().addLast(new ChannelInboundHandlerAdapter() {                            @Override                            public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {                                ByteBuf buf = (ByteBuf) msg;                                log.debug(buf.toString(StandardCharsets.UTF_8));                            }                        });                    }                })                .bind(7999);    }}

客户端

public class EventLoopClient {    public static void main(String[] args) throws InterruptedException {        Channel channel = new Bootstrap()                .group(new NioEventLoopGroup())                .channel(NioSocketChannel.class)                .handler(new ChannelInitializer<NioSocketChannel>() {                    @Override                    protected void initChannel(NioSocketChannel ch) throws Exception {                        ch.pipeline().addLast(new StringEncoder());                    }                })                .connect(new InetSocketAddress(7999))                .sync()                .channel();        System.out.println(channel);    }}

运行后果:

17:57:12 [DEBUG] [nioEventLoopGroup-2-2] c.s.n.c.EventLoopServer - C1: sidiot.17:57:34 [DEBUG] [nioEventLoopGroup-2-3] c.s.n.c.EventLoopServer - C2: sidiot.17:57:50 [DEBUG] [nioEventLoopGroup-2-3] c.s.n.c.EventLoopServer - C2: sid10t.17:57:58 [DEBUG] [nioEventLoopGroup-2-2] c.s.n.c.EventLoopServer - C1: sid10t.

从运行后果中不难发现,每个客户端都会绑定一个固定的线程进行解决,这是因为当一个客户端连贯到服务器时,Netty 会创立一个新的 Channel,并将其注册到一个 EventLoop 上。

每个客户端所对应的 Channel 将由同一个 EventLoop 来解决,这样能够保障解决音讯的程序,并且防止了线程上下文切换的开销。

细化分工

在上述 IO 工作中,咱们只应用了一个 NioEventLoopGroup 对所有事件进行解决。为了提高效率,须要进一步的细化分工,咱们将应用两个 EventLoopGroup 别离作为 BossWorkerBoss 负责解决 Accept 事件,而 Worker 负责 Read & Write 事件;

.group(new NioEventLoopGroup(), new NioEventLoopGroup(2))

同时思考到,如果 handler 的执行工夫过长,则会占用 Worker 的 NIO 线程,即会影响 Worker 的读写效率,因而还须要再次进行细分,创立一个独立的 EventLoopGroup 用来解决比拟耗时的 handler;

在不细分的状况下,运行后果如下:

10:30:19 [DEBUG] [nioEventLoopGroup-4-2] c.s.n.c.EventLoopServer - h1: C1: sid10t.10:30:20 [DEBUG] [nioEventLoopGroup-4-1] c.s.n.c.EventLoopServer - h1: C3: sid10t.10:30:24 [DEBUG] [nioEventLoopGroup-4-2] c.s.n.c.EventLoopServer - h2: C1: sid10t.10:30:24 [DEBUG] [nioEventLoopGroup-4-2] c.s.n.c.EventLoopServer - h1: C2: sid10t.10:30:25 [DEBUG] [nioEventLoopGroup-4-1] c.s.n.c.EventLoopServer - h2: C3: sid10t.10:30:29 [DEBUG] [nioEventLoopGroup-4-2] c.s.n.c.EventLoopServer - h2: C2: sid10t.10:30:29 [DEBUG] [nioEventLoopGroup-4-2] c.s.n.c.EventLoopServer - h1: C4: sid10t.10:30:34 [DEBUG] [nioEventLoopGroup-4-2] c.s.n.c.EventLoopServer - h2: C4: sid10t.

批改后的代码如下:

EventLoopGroup group = new DefaultEventLoopGroup();.childHandler(new ChannelInitializer<NioSocketChannel>() {    @Override    protected void initChannel(NioSocketChannel ch) throws Exception {        ch.pipeline().addLast("h1", new ChannelInboundHandlerAdapter() {            @Override            public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {                ByteBuf buf = (ByteBuf) msg;                log.debug("h1: {}.", buf.toString(StandardCharsets.UTF_8));                ctx.fireChannelRead(msg);            }        }).addLast(group, "h2", new ChannelInboundHandlerAdapter() {            @Override            public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {                try {                    Thread.sleep(5000);                } catch (InterruptedException e) {                    throw new RuntimeException(e);                }                ByteBuf buf = (ByteBuf) msg;                log.debug("h2: {}.", buf.toString(StandardCharsets.UTF_8));            }        });    }})

运行后果:

10:35:31 [DEBUG] [nioEventLoopGroup-4-2] c.s.n.c.EventLoopServer - h1: C1: sidiot..10:35:32 [DEBUG] [nioEventLoopGroup-4-1] c.s.n.c.EventLoopServer - h1: C2: sidiot..10:35:32 [DEBUG] [nioEventLoopGroup-4-2] c.s.n.c.EventLoopServer - h1: C3: sidiot..10:35:33 [DEBUG] [nioEventLoopGroup-4-1] c.s.n.c.EventLoopServer - h1: C4: sidiot..10:35:36 [DEBUG] [defaultEventLoopGroup-2-2] c.s.n.c.EventLoopServer - h2: C1: sidiot..10:35:37 [DEBUG] [defaultEventLoopGroup-2-3] c.s.n.c.EventLoopServer - h2: C2: sidiot..10:35:37 [DEBUG] [defaultEventLoopGroup-2-4] c.s.n.c.EventLoopServer - h2: C3: sidiot..10:35:38 [DEBUG] [defaultEventLoopGroup-2-1] c.s.n.c.EventLoopServer - h2: C4: sidiot..

通过前后的后果比照,能够看出,在不应用独立的 EventLoopGroup 进行细分的状况下,耗时的 handler 会始终占用 NIO 线程,从而使得其余的 channel 须要进行期待,导致效率低下;

源码浅析

在上述过程中,多个 handler 用的是不同的 EventLoop,那它们是如何进行切换的呢?

在 Netty 框架中,当有数据须要被读取时,会将读取操作封装成一个 ChannelRead 事件,并通过 ChannelHandlerContext 传递给后续的处理器来解决。

其中,有一个重要的办法 invokeChannelRead,用于调用下一个 ChannelHandler 的 channelRead 办法,即实现数据的传递和解决的过程。

static void invokeChannelRead(final AbstractChannelHandlerContext next, Object msg) {    final Object m = next.pipeline.touch(ObjectUtil.checkNotNull(msg, "msg"), next);    EventExecutor executor = next.executor();    if (executor.inEventLoop()) {        next.invokeChannelRead(m);    } else {        executor.execute(new Runnable() {            @Override            public void run() {                next.invokeChannelRead(m);            }        });    }}

在上述代码中,抽象类 AbstractChannelHandlerContext 示意了一个 ChannelHandler 上下文对象,它记录了解决以后 ChannelHandler 的信息,以及与之前后的 ChannelHandler 的关系等。

接着,应用了 pipeline.touch() 办法来标记并查看音讯对象 msg 是否为空。该办法能够防止在处理过程中反复解决同一个音讯,进步解决效率。如果 msg 为空,则会抛出异样提醒 “msg 不能为空”。

而后,通过 next.executor() 获取到 EventExecutor,即执行 ChannelHandler 的线程池。如果以后线程为 Netty 线程(即 IO 线程),则间接调用 next.invokeChannelRead(m) 执行下一个 ChannelHandler 的 channelRead 办法;否则,将该工作退出到线程池中异步执行。

最初,应用匿名外部类实现 Runnable 接口,定义 run 办法来执行下一个 ChannelHandler 的 channelRead 办法。

总之,invokeChannelRead 办法的次要作用就是封装了 ChannelHandlerContext 的读取操作,依据以后线程的执行状况抉择是否异步执行。这种机制能够进步网络读取的效率和性能,防止了阻塞 IO 操作带来的性能问题,并使得处理过程更加灵便和可控。

后记

以上就是 分析 EventLoop 的所有内容了,心愿本篇博文对大家有所帮忙!

参考:

  • Netty API reference;
  • 黑马程序员Netty全套教程 ;

上篇精讲:「萌新入门」(一)Hello, World!

我是 ,期待你的关注;

创作不易,请多多反对;

系列专栏:摸索 Netty:源码解析与利用案例分享