乐趣区

关于后端:Netty萌新入门二剖析-EventLoop

前言

本篇博文是《从 0 到 1 学习 Netty》中入门系列的第二篇博文,次要内容是 介绍 Netty 中 EventLoop 的应用,优化及源码解析,往期系列文章请拜访博主的 Netty 专栏,博文中的所有代码全副收集在博主的 GitHub 仓库中;

概述

事件循环对象 EventLoop

在 Netty 中,EventLoop 是用于解决 I/O 事件的线程。它容许咱们在单线程中同时解决多个连贯,防止了阻塞和期待。

简略来说,Netty 的 EventLoop 具备以下几个特点:

  1. 单线程执行:每个 EventLoop 都是由一个线程负责执行,这样能够缩小线程切换开销,进步网络应用程序的性能。
  2. 多路复用:EventLoop 应用底层操作系统提供的 I/O 模型(例如 Java NIO)实现多路复用,即一条线程能够监听多个 Channel,从而实现并发解决多个连贯的能力。
  3. 事件驱动:EventLoop 通过监听与 Channel 相干的 I/O 事件(例如读、写、连贯等),并将其转化为事件对象(例如 ChannelReadEventChannelWriteEvent 等)进行解决。这种事件驱动模型能够让 Netty 高效地响应 I/O 事件,防止了轮询等不必要的操作。
  4. 非阻塞操作:EventLoop 中所有的操作都是非阻塞的,包含 I/O 操作和异步工作的执行。这样能够确保整个零碎始终处于高效运行状态,并进步了零碎的可伸缩性。

其中,EventLoop 的继承关系如下:

public interface EventLoop extends OrderedEventExecutor, EventLoopGroup {EventLoopGroup parent();
}

public interface EventLoopGroup extends EventExecutorGroup {...}

public interface EventExecutorGroup extends 
    ScheduledExecutorService, Iterable<EventExecutor> {...}
  • 继承自 java.util.concurrent.ScheduledExecutorService; 因而蕴含了线程池中所有的办法;
  • 继承自 Netty 本人的 OrderedEventExecutor

    • 提供了 boolean inEventLoop(Thread var1) 办法判断一个线程是否属于此 EventLoop;
    • 提供了 EventLoopGroup parent() 办法来看看本人属于哪个 EventLoopGroup;

总之,Netty 中的 EventLoop 提供了一种高性能、高可靠性的网络编程模型,它解决了网络应用程序中的并发、高负载等问题,是构建高性能网络应用程序的重要组成部分。


事件循环组 EventLoopGroup

在 Netty 中,EventLoopGroup 是用于解决 I/O 操作和工作执行的线程池。它负责管理一个或多个 EventLoop 实例,每个 EventLoop 负责解决其调配的所有 Channel 的生命周期中产生的事件。

EventLoopGroup 通常会创立两种类型的线程池:Boss GroupWorker Group。Boss Group 负责监听传入连贯的 申请 ,而 Worker Group 则负责解决曾经建设的连贯的 读写操作

当一个新的连贯到来时,Boss Group 会将连贯申请注册到某个 EventLoop 的 Selector 上,并将其关联到对应的 Channel 对象。随后,Worker Group 将负责解决新连贯上的所有 I/O 操作。

EventLoopGroup 还提供了一些办法,例如 shutdownGracefully(),能够优雅敞开 EventLoopGroup 的所有线程,并期待它们实现未实现的工作。这对于保障程序的平安敞开十分有用。

其中,EventLoopGroup 的继承关系如下:

public interface EventLoopGroup extends EventExecutorGroup {...}

public interface EventExecutorGroup extends 
    ScheduledExecutorService, Iterable<EventExecutor> {...}
  • 继承自 Netty 本人的 EventExecutorGroup

    • 实现了 Iterable 接口提供遍历 EventLoop 的能力;
    • 另有 next 办法获取汇合中下一个 EventLoop;

执行工作

1、创立事件循环组,线程数 nThreads 可采纳默认设置或者依据需要自定义:

EventLoopGroup group = new NioEventLoopGroup(2);

2、获取下一个事件循环对象:

group.next();

这里采纳的是轮询的形式进行调配,次要是为了工作分工比拟平均:

System.out.println(group.next());  
System.out.println(group.next());  
System.out.println(group.next());  
System.out.println(group.next());

运行后果:

io.netty.channel.nio.NioEventLoop@4b14c583
io.netty.channel.nio.NioEventLoop@65466a6a
io.netty.channel.nio.NioEventLoop@4b14c583
io.netty.channel.nio.NioEventLoop@65466a6a

3、执行一般工作,应用 submit

group.next().submit(() -> {
    try {Thread.sleep(1000);
    } catch (InterruptedException e) {throw new RuntimeException(e);
    }
    log.debug(Thread.currentThread().getName());
});

log.debug(Thread.currentThread().getName());

运行后果:

15:49:31 [DEBUG] [main] c.s.n.c.TestEventLoop - main
15:49:32 [DEBUG] [nioEventLoopGroup-2-1] c.s.n.c.TestEventLoop - nioEventLoopGroup-2-1

4、执行定时工作,应用 scheduleAtFixedRate

group.next().scheduleAtFixedRate(() -> {log.debug("sidiot.");
}, 0, 1, TimeUnit.SECONDS);

log.debug(Thread.currentThread().getName());

运行后果:

16:26:44 [DEBUG] [nioEventLoopGroup-2-1] c.s.n.c.TestEventLoop - sidiot.
16:26:44 [DEBUG] [main] c.s.n.c.TestEventLoop - main
16:26:45 [DEBUG] [nioEventLoopGroup-2-1] c.s.n.c.TestEventLoop - sidiot.
16:26:46 [DEBUG] [nioEventLoopGroup-2-1] c.s.n.c.TestEventLoop - sidiot.
16:26:47 [DEBUG] [nioEventLoopGroup-2-1] c.s.n.c.TestEventLoop - sidiot.

5、敞开 EventLoopGroup

group.shutdownGracefully();

shutdownGracefully 办法会优雅地敞开 EventLoopGroup。该办法首先会切换 EventLoopGroup 到敞开状态从而回绝新的工作的退出,而后在工作队列的工作都解决实现后,进行线程的运行,从而确保整体利用是在失常有序的状态下退出的。

IO 工作

在上篇博文 从 0 到 1(六):入门 -Hello World 中有具体的解析,因而这里就不开展叙述,间接给出代码;

服务端

@Slf4j
public class EventLoopServer {public static void main(String[] args) {new ServerBootstrap()
                .group(new NioEventLoopGroup())
                .channel(NioServerSocketChannel.class)
                .childHandler(new ChannelInitializer<NioSocketChannel>() {
                    @Override
                    protected void initChannel(NioSocketChannel ch) throws Exception {ch.pipeline().addLast(new ChannelInboundHandlerAdapter() {
                            @Override
                            public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {ByteBuf buf = (ByteBuf) msg;
                                log.debug(buf.toString(StandardCharsets.UTF_8));
                            }
                        });
                    }
                })
                .bind(7999);
    }
}

客户端

public class EventLoopClient {public static void main(String[] args) throws InterruptedException {Channel channel = new Bootstrap()
                .group(new NioEventLoopGroup())
                .channel(NioSocketChannel.class)
                .handler(new ChannelInitializer<NioSocketChannel>() {
                    @Override
                    protected void initChannel(NioSocketChannel ch) throws Exception {ch.pipeline().addLast(new StringEncoder());
                    }
                })
                .connect(new InetSocketAddress(7999))
                .sync()
                .channel();

        System.out.println(channel);
    }
}

运行后果:

17:57:12 [DEBUG] [nioEventLoopGroup-2-2] c.s.n.c.EventLoopServer - C1: sidiot.
17:57:34 [DEBUG] [nioEventLoopGroup-2-3] c.s.n.c.EventLoopServer - C2: sidiot.
17:57:50 [DEBUG] [nioEventLoopGroup-2-3] c.s.n.c.EventLoopServer - C2: sid10t.
17:57:58 [DEBUG] [nioEventLoopGroup-2-2] c.s.n.c.EventLoopServer - C1: sid10t.

从运行后果中不难发现,每个客户端都会绑定一个固定的线程进行解决,这是因为当一个客户端连贯到服务器时,Netty 会创立一个新的 Channel,并将其注册到一个 EventLoop 上。

每个客户端所对应的 Channel 将由同一个 EventLoop 来解决,这样能够保障解决音讯的程序,并且防止了线程上下文切换的开销。

细化分工

在上述 IO 工作中,咱们只应用了一个 NioEventLoopGroup 对所有事件进行解决。为了提高效率,须要进一步的细化分工,咱们将应用两个 EventLoopGroup 别离作为 BossWorkerBoss 负责解决 Accept 事件,而 Worker 负责 Read & Write 事件;

.group(new NioEventLoopGroup(), new NioEventLoopGroup(2))

同时思考到,如果 handler 的执行工夫过长,则会占用 Worker 的 NIO 线程,即会影响 Worker 的读写效率,因而还须要再次进行细分,创立一个独立的 EventLoopGroup 用来解决比拟耗时的 handler;

在不细分的状况下,运行后果如下:

10:30:19 [DEBUG] [nioEventLoopGroup-4-2] c.s.n.c.EventLoopServer - h1: C1: sid10t.
10:30:20 [DEBUG] [nioEventLoopGroup-4-1] c.s.n.c.EventLoopServer - h1: C3: sid10t.
10:30:24 [DEBUG] [nioEventLoopGroup-4-2] c.s.n.c.EventLoopServer - h2: C1: sid10t.
10:30:24 [DEBUG] [nioEventLoopGroup-4-2] c.s.n.c.EventLoopServer - h1: C2: sid10t.
10:30:25 [DEBUG] [nioEventLoopGroup-4-1] c.s.n.c.EventLoopServer - h2: C3: sid10t.
10:30:29 [DEBUG] [nioEventLoopGroup-4-2] c.s.n.c.EventLoopServer - h2: C2: sid10t.
10:30:29 [DEBUG] [nioEventLoopGroup-4-2] c.s.n.c.EventLoopServer - h1: C4: sid10t.
10:30:34 [DEBUG] [nioEventLoopGroup-4-2] c.s.n.c.EventLoopServer - h2: C4: sid10t.

批改后的代码如下:

EventLoopGroup group = new DefaultEventLoopGroup();

.childHandler(new ChannelInitializer<NioSocketChannel>() {
    @Override
    protected void initChannel(NioSocketChannel ch) throws Exception {ch.pipeline().addLast("h1", new ChannelInboundHandlerAdapter() {
            @Override
            public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {ByteBuf buf = (ByteBuf) msg;
                log.debug("h1: {}.", buf.toString(StandardCharsets.UTF_8));
                ctx.fireChannelRead(msg);
            }
        }).addLast(group, "h2", new ChannelInboundHandlerAdapter() {
            @Override
            public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
                try {Thread.sleep(5000);
                } catch (InterruptedException e) {throw new RuntimeException(e);
                }
                ByteBuf buf = (ByteBuf) msg;
                log.debug("h2: {}.", buf.toString(StandardCharsets.UTF_8));
            }
        });
    }
})

运行后果:

10:35:31 [DEBUG] [nioEventLoopGroup-4-2] c.s.n.c.EventLoopServer - h1: C1: sidiot..
10:35:32 [DEBUG] [nioEventLoopGroup-4-1] c.s.n.c.EventLoopServer - h1: C2: sidiot..
10:35:32 [DEBUG] [nioEventLoopGroup-4-2] c.s.n.c.EventLoopServer - h1: C3: sidiot..
10:35:33 [DEBUG] [nioEventLoopGroup-4-1] c.s.n.c.EventLoopServer - h1: C4: sidiot..
10:35:36 [DEBUG] [defaultEventLoopGroup-2-2] c.s.n.c.EventLoopServer - h2: C1: sidiot..
10:35:37 [DEBUG] [defaultEventLoopGroup-2-3] c.s.n.c.EventLoopServer - h2: C2: sidiot..
10:35:37 [DEBUG] [defaultEventLoopGroup-2-4] c.s.n.c.EventLoopServer - h2: C3: sidiot..
10:35:38 [DEBUG] [defaultEventLoopGroup-2-1] c.s.n.c.EventLoopServer - h2: C4: sidiot..

通过前后的后果比照,能够看出,在不应用独立的 EventLoopGroup 进行细分的状况下,耗时的 handler 会始终占用 NIO 线程,从而使得其余的 channel 须要进行期待,导致效率低下;

源码浅析

在上述过程中,多个 handler 用的是不同的 EventLoop,那它们是如何进行切换的呢?

在 Netty 框架中,当有数据须要被读取时,会将读取操作封装成一个 ChannelRead 事件,并通过 ChannelHandlerContext 传递给后续的处理器来解决。

其中,有一个重要的办法 invokeChannelRead,用于调用下一个 ChannelHandler 的 channelRead 办法,即实现数据的传递和解决的过程。

static void invokeChannelRead(final AbstractChannelHandlerContext next, Object msg) {final Object m = next.pipeline.touch(ObjectUtil.checkNotNull(msg, "msg"), next);
    EventExecutor executor = next.executor();
    if (executor.inEventLoop()) {next.invokeChannelRead(m);
    } else {executor.execute(new Runnable() {
            @Override
            public void run() {next.invokeChannelRead(m);
            }
        });
    }
}

在上述代码中,抽象类 AbstractChannelHandlerContext 示意了一个 ChannelHandler 上下文对象,它记录了解决以后 ChannelHandler 的信息,以及与之前后的 ChannelHandler 的关系等。

接着,应用了 pipeline.touch() 办法来标记并查看音讯对象 msg 是否为空。该办法能够防止在处理过程中反复解决同一个音讯,进步解决效率。如果 msg 为空,则会抛出异样提醒“msg 不能为空”。

而后,通过 next.executor() 获取到 EventExecutor,即执行 ChannelHandler 的线程池。如果以后线程为 Netty 线程(即 IO 线程),则间接调用 next.invokeChannelRead(m) 执行下一个 ChannelHandler 的 channelRead 办法;否则,将该工作退出到线程池中异步执行。

最初,应用匿名外部类实现 Runnable 接口,定义 run 办法来执行下一个 ChannelHandler 的 channelRead 办法。

总之,invokeChannelRead 办法的次要作用就是封装了 ChannelHandlerContext 的读取操作,依据以后线程的执行状况抉择是否异步执行。这种机制能够进步网络读取的效率和性能,防止了阻塞 IO 操作带来的性能问题,并使得处理过程更加灵便和可控。

后记

以上就是 分析 EventLoop 的所有内容了,心愿本篇博文对大家有所帮忙!

参考:

  • Netty API reference;
  • 黑马程序员 Netty 全套教程;

📝 上篇精讲:「萌新入门」(一)Hello, World!

💖 我是 𝓼𝓲𝓭𝓲𝓸𝓽,期待你的关注;

👍 创作不易,请多多反对;

🔥 系列专栏:摸索 Netty:源码解析与利用案例分享

退出移动版