共计 8367 个字符,预计需要花费 21 分钟才能阅读完成。
前言
本篇博文是《从 0 到 1 学习 Netty》中入门系列的第二篇博文,次要内容是 介绍 Netty 中 EventLoop 的应用,优化及源码解析,往期系列文章请拜访博主的 Netty 专栏,博文中的所有代码全副收集在博主的 GitHub 仓库中;
概述
事件循环对象 EventLoop
在 Netty 中,EventLoop
是用于解决 I/O 事件的线程。它容许咱们在单线程中同时解决多个连贯,防止了阻塞和期待。
简略来说,Netty 的 EventLoop
具备以下几个特点:
- 单线程执行:每个 EventLoop 都是由一个线程负责执行,这样能够缩小线程切换开销,进步网络应用程序的性能。
- 多路复用:EventLoop 应用底层操作系统提供的 I/O 模型(例如 Java NIO)实现多路复用,即一条线程能够监听多个 Channel,从而实现并发解决多个连贯的能力。
- 事件驱动:EventLoop 通过监听与 Channel 相干的 I/O 事件(例如读、写、连贯等),并将其转化为事件对象(例如
ChannelReadEvent
、ChannelWriteEvent
等)进行解决。这种事件驱动模型能够让 Netty 高效地响应 I/O 事件,防止了轮询等不必要的操作。 - 非阻塞操作:EventLoop 中所有的操作都是非阻塞的,包含 I/O 操作和异步工作的执行。这样能够确保整个零碎始终处于高效运行状态,并进步了零碎的可伸缩性。
其中,EventLoop
的继承关系如下:
public interface EventLoop extends OrderedEventExecutor, EventLoopGroup {EventLoopGroup parent();
}
public interface EventLoopGroup extends EventExecutorGroup {...}
public interface EventExecutorGroup extends
ScheduledExecutorService, Iterable<EventExecutor> {...}
- 继承自
java.util.concurrent.ScheduledExecutorService;
因而蕴含了线程池中所有的办法; -
继承自 Netty 本人的
OrderedEventExecutor
:- 提供了
boolean inEventLoop(Thread var1)
办法判断一个线程是否属于此 EventLoop; - 提供了
EventLoopGroup parent()
办法来看看本人属于哪个 EventLoopGroup;
- 提供了
总之,Netty 中的 EventLoop 提供了一种高性能、高可靠性的网络编程模型,它解决了网络应用程序中的并发、高负载等问题,是构建高性能网络应用程序的重要组成部分。
事件循环组 EventLoopGroup
在 Netty 中,EventLoopGroup
是用于解决 I/O 操作和工作执行的线程池。它负责管理一个或多个 EventLoop 实例,每个 EventLoop 负责解决其调配的所有 Channel 的生命周期中产生的事件。
EventLoopGroup 通常会创立两种类型的线程池:Boss Group 和 Worker Group。Boss Group 负责监听传入连贯的 申请 ,而 Worker Group 则负责解决曾经建设的连贯的 读写操作。
当一个新的连贯到来时,Boss Group 会将连贯申请注册到某个 EventLoop 的 Selector 上,并将其关联到对应的 Channel 对象。随后,Worker Group 将负责解决新连贯上的所有 I/O 操作。
EventLoopGroup 还提供了一些办法,例如 shutdownGracefully()
,能够优雅敞开 EventLoopGroup 的所有线程,并期待它们实现未实现的工作。这对于保障程序的平安敞开十分有用。
其中,EventLoopGroup
的继承关系如下:
public interface EventLoopGroup extends EventExecutorGroup {...}
public interface EventExecutorGroup extends
ScheduledExecutorService, Iterable<EventExecutor> {...}
-
继承自 Netty 本人的
EventExecutorGroup
:- 实现了
Iterable
接口提供遍历 EventLoop 的能力; - 另有
next
办法获取汇合中下一个 EventLoop;
- 实现了
执行工作
1、创立事件循环组,线程数 nThreads
可采纳默认设置或者依据需要自定义:
EventLoopGroup group = new NioEventLoopGroup(2);
2、获取下一个事件循环对象:
group.next();
这里采纳的是轮询的形式进行调配,次要是为了工作分工比拟平均:
System.out.println(group.next());
System.out.println(group.next());
System.out.println(group.next());
System.out.println(group.next());
运行后果:
io.netty.channel.nio.NioEventLoop@4b14c583
io.netty.channel.nio.NioEventLoop@65466a6a
io.netty.channel.nio.NioEventLoop@4b14c583
io.netty.channel.nio.NioEventLoop@65466a6a
3、执行一般工作,应用 submit
:
group.next().submit(() -> {
try {Thread.sleep(1000);
} catch (InterruptedException e) {throw new RuntimeException(e);
}
log.debug(Thread.currentThread().getName());
});
log.debug(Thread.currentThread().getName());
运行后果:
15:49:31 [DEBUG] [main] c.s.n.c.TestEventLoop - main
15:49:32 [DEBUG] [nioEventLoopGroup-2-1] c.s.n.c.TestEventLoop - nioEventLoopGroup-2-1
4、执行定时工作,应用 scheduleAtFixedRate
:
group.next().scheduleAtFixedRate(() -> {log.debug("sidiot.");
}, 0, 1, TimeUnit.SECONDS);
log.debug(Thread.currentThread().getName());
运行后果:
16:26:44 [DEBUG] [nioEventLoopGroup-2-1] c.s.n.c.TestEventLoop - sidiot.
16:26:44 [DEBUG] [main] c.s.n.c.TestEventLoop - main
16:26:45 [DEBUG] [nioEventLoopGroup-2-1] c.s.n.c.TestEventLoop - sidiot.
16:26:46 [DEBUG] [nioEventLoopGroup-2-1] c.s.n.c.TestEventLoop - sidiot.
16:26:47 [DEBUG] [nioEventLoopGroup-2-1] c.s.n.c.TestEventLoop - sidiot.
5、敞开 EventLoopGroup
:
group.shutdownGracefully();
shutdownGracefully
办法会优雅地敞开 EventLoopGroup
。该办法首先会切换 EventLoopGroup
到敞开状态从而回绝新的工作的退出,而后在工作队列的工作都解决实现后,进行线程的运行,从而确保整体利用是在失常有序的状态下退出的。
IO 工作
在上篇博文 从 0 到 1(六):入门 -Hello World 中有具体的解析,因而这里就不开展叙述,间接给出代码;
服务端
@Slf4j
public class EventLoopServer {public static void main(String[] args) {new ServerBootstrap()
.group(new NioEventLoopGroup())
.channel(NioServerSocketChannel.class)
.childHandler(new ChannelInitializer<NioSocketChannel>() {
@Override
protected void initChannel(NioSocketChannel ch) throws Exception {ch.pipeline().addLast(new ChannelInboundHandlerAdapter() {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {ByteBuf buf = (ByteBuf) msg;
log.debug(buf.toString(StandardCharsets.UTF_8));
}
});
}
})
.bind(7999);
}
}
客户端
public class EventLoopClient {public static void main(String[] args) throws InterruptedException {Channel channel = new Bootstrap()
.group(new NioEventLoopGroup())
.channel(NioSocketChannel.class)
.handler(new ChannelInitializer<NioSocketChannel>() {
@Override
protected void initChannel(NioSocketChannel ch) throws Exception {ch.pipeline().addLast(new StringEncoder());
}
})
.connect(new InetSocketAddress(7999))
.sync()
.channel();
System.out.println(channel);
}
}
运行后果:
17:57:12 [DEBUG] [nioEventLoopGroup-2-2] c.s.n.c.EventLoopServer - C1: sidiot.
17:57:34 [DEBUG] [nioEventLoopGroup-2-3] c.s.n.c.EventLoopServer - C2: sidiot.
17:57:50 [DEBUG] [nioEventLoopGroup-2-3] c.s.n.c.EventLoopServer - C2: sid10t.
17:57:58 [DEBUG] [nioEventLoopGroup-2-2] c.s.n.c.EventLoopServer - C1: sid10t.
从运行后果中不难发现,每个客户端都会绑定一个固定的线程进行解决,这是因为当一个客户端连贯到服务器时,Netty 会创立一个新的 Channel,并将其注册到一个 EventLoop 上。
每个客户端所对应的 Channel 将由同一个 EventLoop 来解决,这样能够保障解决音讯的程序,并且防止了线程上下文切换的开销。
细化分工
在上述 IO 工作中,咱们只应用了一个 NioEventLoopGroup
对所有事件进行解决。为了提高效率,须要进一步的细化分工,咱们将应用两个 EventLoopGroup
别离作为 Boss 和 Worker,Boss 负责解决 Accept 事件,而 Worker 负责 Read & Write 事件;
.group(new NioEventLoopGroup(), new NioEventLoopGroup(2))
同时思考到,如果 handler 的执行工夫过长,则会占用 Worker 的 NIO 线程,即会影响 Worker 的读写效率,因而还须要再次进行细分,创立一个独立的 EventLoopGroup
用来解决比拟耗时的 handler;
在不细分的状况下,运行后果如下:
10:30:19 [DEBUG] [nioEventLoopGroup-4-2] c.s.n.c.EventLoopServer - h1: C1: sid10t.
10:30:20 [DEBUG] [nioEventLoopGroup-4-1] c.s.n.c.EventLoopServer - h1: C3: sid10t.
10:30:24 [DEBUG] [nioEventLoopGroup-4-2] c.s.n.c.EventLoopServer - h2: C1: sid10t.
10:30:24 [DEBUG] [nioEventLoopGroup-4-2] c.s.n.c.EventLoopServer - h1: C2: sid10t.
10:30:25 [DEBUG] [nioEventLoopGroup-4-1] c.s.n.c.EventLoopServer - h2: C3: sid10t.
10:30:29 [DEBUG] [nioEventLoopGroup-4-2] c.s.n.c.EventLoopServer - h2: C2: sid10t.
10:30:29 [DEBUG] [nioEventLoopGroup-4-2] c.s.n.c.EventLoopServer - h1: C4: sid10t.
10:30:34 [DEBUG] [nioEventLoopGroup-4-2] c.s.n.c.EventLoopServer - h2: C4: sid10t.
批改后的代码如下:
EventLoopGroup group = new DefaultEventLoopGroup();
.childHandler(new ChannelInitializer<NioSocketChannel>() {
@Override
protected void initChannel(NioSocketChannel ch) throws Exception {ch.pipeline().addLast("h1", new ChannelInboundHandlerAdapter() {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {ByteBuf buf = (ByteBuf) msg;
log.debug("h1: {}.", buf.toString(StandardCharsets.UTF_8));
ctx.fireChannelRead(msg);
}
}).addLast(group, "h2", new ChannelInboundHandlerAdapter() {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
try {Thread.sleep(5000);
} catch (InterruptedException e) {throw new RuntimeException(e);
}
ByteBuf buf = (ByteBuf) msg;
log.debug("h2: {}.", buf.toString(StandardCharsets.UTF_8));
}
});
}
})
运行后果:
10:35:31 [DEBUG] [nioEventLoopGroup-4-2] c.s.n.c.EventLoopServer - h1: C1: sidiot..
10:35:32 [DEBUG] [nioEventLoopGroup-4-1] c.s.n.c.EventLoopServer - h1: C2: sidiot..
10:35:32 [DEBUG] [nioEventLoopGroup-4-2] c.s.n.c.EventLoopServer - h1: C3: sidiot..
10:35:33 [DEBUG] [nioEventLoopGroup-4-1] c.s.n.c.EventLoopServer - h1: C4: sidiot..
10:35:36 [DEBUG] [defaultEventLoopGroup-2-2] c.s.n.c.EventLoopServer - h2: C1: sidiot..
10:35:37 [DEBUG] [defaultEventLoopGroup-2-3] c.s.n.c.EventLoopServer - h2: C2: sidiot..
10:35:37 [DEBUG] [defaultEventLoopGroup-2-4] c.s.n.c.EventLoopServer - h2: C3: sidiot..
10:35:38 [DEBUG] [defaultEventLoopGroup-2-1] c.s.n.c.EventLoopServer - h2: C4: sidiot..
通过前后的后果比照,能够看出,在不应用独立的 EventLoopGroup
进行细分的状况下,耗时的 handler 会始终占用 NIO 线程,从而使得其余的 channel 须要进行期待,导致效率低下;
源码浅析
在上述过程中,多个 handler 用的是不同的 EventLoop
,那它们是如何进行切换的呢?
在 Netty 框架中,当有数据须要被读取时,会将读取操作封装成一个 ChannelRead
事件,并通过 ChannelHandlerContext 传递给后续的处理器来解决。
其中,有一个重要的办法 invokeChannelRead
,用于调用下一个 ChannelHandler 的 channelRead 办法,即实现数据的传递和解决的过程。
static void invokeChannelRead(final AbstractChannelHandlerContext next, Object msg) {final Object m = next.pipeline.touch(ObjectUtil.checkNotNull(msg, "msg"), next);
EventExecutor executor = next.executor();
if (executor.inEventLoop()) {next.invokeChannelRead(m);
} else {executor.execute(new Runnable() {
@Override
public void run() {next.invokeChannelRead(m);
}
});
}
}
在上述代码中,抽象类 AbstractChannelHandlerContext
示意了一个 ChannelHandler 上下文对象,它记录了解决以后 ChannelHandler 的信息,以及与之前后的 ChannelHandler 的关系等。
接着,应用了 pipeline.touch()
办法来标记并查看音讯对象 msg 是否为空。该办法能够防止在处理过程中反复解决同一个音讯,进步解决效率。如果 msg 为空,则会抛出异样提醒“msg 不能为空”。
而后,通过 next.executor()
获取到 EventExecutor,即执行 ChannelHandler 的线程池。如果以后线程为 Netty 线程(即 IO 线程),则间接调用 next.invokeChannelRead(m)
执行下一个 ChannelHandler 的 channelRead 办法;否则,将该工作退出到线程池中异步执行。
最初,应用匿名外部类实现 Runnable
接口,定义 run
办法来执行下一个 ChannelHandler 的 channelRead 办法。
总之,invokeChannelRead
办法的次要作用就是封装了 ChannelHandlerContext 的读取操作,依据以后线程的执行状况抉择是否异步执行。这种机制能够进步网络读取的效率和性能,防止了阻塞 IO 操作带来的性能问题,并使得处理过程更加灵便和可控。
后记
以上就是 分析 EventLoop 的所有内容了,心愿本篇博文对大家有所帮忙!
参考:
- Netty API reference;
- 黑马程序员 Netty 全套教程;
📝 上篇精讲:「萌新入门」(一)Hello, World!
💖 我是 𝓼𝓲𝓭𝓲𝓸𝓽,期待你的关注;
👍 创作不易,请多多反对;
🔥 系列专栏:摸索 Netty:源码解析与利用案例分享