本篇文章次要详细分析 Netty 中的外围组件。
启动器 Bootstrap 和 ServerBootstrap 作为 Netty 构建客户端和服务端的路口,是编写 Netty 网络程序的第一步。它能够让咱们把 Netty 的外围组件像搭积木一样组装在一起。在 Netty Server 端构建的过程中,咱们须要关注三个重要的步骤
- 配置线程池
- Channel 初始化
- Handler 处理器构建
后面咱们讲过 NIO 多路复用的设计模式之 Reactor 模型,Reactor 模型的次要思维就是把网络连接、事件散发、工作解决的职责进行拆散,并且通过引入多线程来进步 Reactor 模型中的吞吐量。其中包含三种 Reactor 模型
- 单线程单 Reactor 模型
- 多线程单 Reactor 模型
- 多线程多 Reactor 模型
在 Netty 中,能够十分轻松的实现上述三种线程模型,并且 Netty 举荐应用主从多线程模型,这样就能够轻松的实现成千上万的客户端连贯的解决。在海量的客户端并发申请中,主从多线程模型能够通过减少 SubReactor 线程数量,充分利用多核能力晋升零碎吞吐量。
Reactor 模型的运行机制分为四个步骤,如图 2 -10 所示。
- 连贯注册,Channel 建设后,注册到 Reactor 线程中的 Selector 选择器
- 事件轮询,轮询 Selector 选择器中曾经注册的所有 Channel 的 I / O 事件
- 事件散发,为准备就绪的 I / O 事件调配相应的解决线程
- 工作解决,Reactor 线程还负责工作队列中的非 I / O 工作,每个 Worker 线程从各自保护的工作队列中取出工作异步执行。
<center> 图 2 -10 Reactor 工作流程 </center>
EventLoop 事件循环
在 Netty 中,Reactor 模型的事件处理器是应用 EventLoop 来实现的,一个 EventLoop 对应一个线程,EventLoop 外部保护了一个 Selector 和 taskQueue,别离用来解决网络 IO 事件以及外部工作,它的工作原理如图 2 -11 所示。
<center> 图 2 -11 NioEventLoop 原理 </center>
EventLoop 根本利用
上面这段代码示意 EventLoop,别离实现 Selector 注册以及一般工作提交性能。
public class EventLoopExample {public static void main(String[] args) {EventLoopGroup group=new NioEventLoopGroup(2);
System.out.println(group.next()); // 输入第一个 NioEventLoop
System.out.println(group.next()); // 输入第二个 NioEventLoop
System.out.println(group.next()); // 因为只有两个,所以又会从第一个开始
// 获取一个事件循环对象 NioEventLoop
group.next().register(); // 注册到 selector 上
EventLoop 的外围流程
基于上述的解说,了解了 EventLoop 的工作机制后,咱们再通过一个整体的流程图来阐明,如图 2 -12 所示。
EventLoop 是一个 Reactor 模型 的事件处理器,一个 EventLoop 对应一个线程,其外部会保护一个 selector 和 taskQueue,负责解决 IO 事件和外部工作。IO 事件和外部工作执行工夫百分比通过 ioRatio 来调节,ioRatio 示意执行 IO 工夫所占百分比。工作包含一般工作和曾经到时的提早工作,提早工作寄存到一个优先级队列 PriorityQueue 中,执行工作前从 PriorityQueue 读取所有到时的 task,而后增加到 taskQueue 中,最初对立执行 task。
<center> 图 2 -12 EventLoop 工作机制 </center>
EventLoop 如何实现多种 Reactor 模型
EventLoopGroup group=new NioEventLoopGroup(1); ServerBootstrap b=new ServerBootstrap(); b.group(group);
EventLoopGroup group =new NioEventLoopGroup(); // 默认会设置 cpu 外围数的 2 倍 ServerBootstrap b=new ServerBootstrap(); b.group(group);
EventLoopGroup boss=new NioEventLoopGroup(1); EventLoopGroup work=new NioEventLoopGroup(); ServerBootstrap b=new ServerBootstrap(); b.group(boss,work);
EventLoop 实现原理
EventLoopGroup 初始化办法,在 MultithreadEventExecutorGroup.java 中,依据配置的 nThreads 数量,构建一个 EventExecutor 数组
protected MultithreadEventExecutorGroup(int nThreads, Executor executor, EventExecutorChooserFactory chooserFactory, Object... args) {checkPositive(nThreads, "nThreads"); if (executor == null) {executor = new ThreadPerTaskExecutor(newDefaultThreadFactory()); } children = new EventExecutor[nThreads]; for (int i = 0; i < nThreads; i ++) { boolean success = false; try {children[i] = newChild(executor, args); } } }
注册 channel 到多路复用器的实现,MultithreadEventLoopGroup.register 办法()
SingleThreadEventLoop ->AbstractUnsafe.register ->AbstractChannel.register0->AbstractNioChannel.doRegister()
能够看到会把 channel 注册到某一个 eventLoop 中的 unwrappedSelector 复路器中。
protected void doRegister() throws Exception { boolean selected = false; for (;;) { try {selectionKey = javaChannel().register(eventLoop().unwrappedSelector(), 0, this); return; } } }
事件处理过程,通过 NioEventLoop 中的 run 办法一直遍历
protected void run() { int selectCnt = 0; for (;;) { try { int strategy; try { // 计算策略,依据阻塞队列中是否含有工作来决定以后的解决形式 strategy = selectStrategy.calculateStrategy(selectNowSupplier, hasTasks()); switch (strategy) { case SelectStrategy.CONTINUE: continue; case SelectStrategy.BUSY_WAIT: // fall-through to SELECT since the busy-wait is not supported with NIO case SelectStrategy.SELECT: long curDeadlineNanos = nextScheduledTaskDeadlineNanos(); if (curDeadlineNanos == -1L) {curDeadlineNanos = NONE; // nothing on the calendar} nextWakeupNanos.set(curDeadlineNanos); try {if (!hasTasks()) { // 如果队列中数据为空,则调用 select 查问就绪事件 strategy = select(curDeadlineNanos); } } finally {nextWakeupNanos.lazySet(AWAKE); } default: } } selectCnt++; cancelledKeys = 0; needsToSelectAgain = false; /* ioRatio 调节连贯事件和外部工作执行事件百分比 * ioRatio 越大,连贯事件处理占用百分比越大 */ final int ioRatio = this.ioRatio; boolean ranTasks; if (ioRatio == 100) { try {if (strategy > 0) { // 解决 IO 工夫 processSelectedKeys();} } finally { // 确保每次都要执行队列中的工作 ranTasks = runAllTasks();} } else if (strategy > 0) {final long ioStartTime = System.nanoTime(); try {processSelectedKeys(); } finally { // Ensure we always run tasks. final long ioTime = System.nanoTime() - ioStartTime; ranTasks = runAllTasks(ioTime * (100 - ioRatio) / ioRatio); } } else {ranTasks = runAllTasks(0); // This will run the minimum number of tasks } if (ranTasks || strategy > 0) {if (selectCnt > MIN_PREMATURE_SELECTOR_RETURNS && logger.isDebugEnabled()) {logger.debug("Selector.select() returned prematurely {} times in a row for Selector {}.", selectCnt - 1, selector); } selectCnt = 0; } else if (unexpectedSelectorWakeup(selectCnt)) {// Unexpected wakeup (unusual case) selectCnt = 0; } } }
服务编排层 Pipeline 的协调解决
通过 EventLoop 能够实现工作的调度,负责监听 I / O 事件、信号事件等,当收到相干事件后,须要有人来响应这些事件和数据,而这些事件是通过 ChannelPipeline 中所定义的 ChannelHandler 实现的,他们是 Netty 中服务编排层的外围组件。
在上面这段代码中,咱们减少了 h1 和 h2 两个 InboundHandler,用来解决客户端数据的读取操作,代码如下。
ServerBootstrap bootstrap = new ServerBootstrap();
bootstrap.group(bossGroup, workerGroup)
// 配置 Server 的通道,相当于 NIO 中的 ServerSocketChannel
//childHandler 示意给 worker 那些线程配置了一个处理器,// 这个就是下面 NIO 中说的,把解决业务的具体逻辑形象进去,放到 Handler 外面
.childHandler(new ChannelInitializer<SocketChannel>() {
protected void initChannel(SocketChannel socketChannel) throws Exception {// socketChannel.pipeline().addLast(new NormalMessageHandler());
socketChannel.pipeline().addLast("h1",new ChannelInboundHandlerAdapter(){
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {System.out.println("handler-01");
super.channelRead(ctx, msg);
}).addLast("h2",new ChannelInboundHandlerAdapter(){
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {System.out.println("handler-02");
super.channelRead(ctx, msg);
上述代码构建了一个 ChannelPipeline,失去如图 2 -13 所示的构造,每个 Channel 都会绑定一个 ChannelPipeline,一个 ChannelPipeline 蕴含多个 ChannelHandler,这些 Handler 会被包装成 ChannelHandlerContext 退出到 Pipeline 构建的双向链表中。
ChannelHandlerContext 用来保留 ChannelHandler 的上下文,它蕴含了 ChannelHandler 生命周期中的所有事件,比方 connect/bind/read/write 等,这样设计的益处是,各个 ChannelHandler 进行数据传递时,前置和后置的通用逻辑就能够间接保留到 ChannelHandlerContext 中进行传递。
<center> 图 2 -13</center>
依据网络数据的流向,ChannelPipeline 分为入站 ChannelInBoundHandler 和出站 ChannelOutboundHandler 两个处理器,如图 2 -14 所示,客户端与服务端通信过程中,数据从客户端发向服务端的过程叫出站,对于服务端来说,数据从客户端流入到服务端,这个时候是入站。
<center> 图 2 -14 InBound 和 OutBound 的关系 </center>
ChannelHandler 事件触发机制
当某个 Channel 触发了 IO 事件后,会通过 Handler 进行解决,而 ChannelHandler 是围绕 I / O 事件的生命周期来设计的,比方建设连贯、读数据、写数据、连贯销毁等。
ChannelHandler 有两个重要的子接口实现,别离拦挡数据流入和数据流出的 I / O 事件
- ChannelInboundHandler
- ChannelOutboundHandler
图 2 -15 中显示的 Adapter 类,提供很多默认操作,比方 ChannelHandler 中有很多很多办法,咱们用户自定义的办法有时候不须要重载全副,只须要重载一两个办法,那么能够应用 Adapter 类,它外面有很多默认的办法。其它框架中结尾是 Adapter 的类的作用也大都是如此。所以咱们在应用 netty 的时候,往往很少间接实现 ChannelHandler 的接口,常常是继承 Adapter 类。
<img src=”https://mic-blob-bucket.oss-cn-beijing.aliyuncs.com/202111090025881.png” alt=”image-20210816200206761″ style=”zoom:67%;” />
<center> 图 2 -15 ChannelHandler 类关系图 </center>
ChannelInboundHandler 事件回调和触发机会如下
事件回调办法 | 触发机会 |
channelRegistered | Channel 被注册到 EventLoop |
channelUnregistered | Channel 从 EventLoop 中勾销注册 |
channelActive | Channel 处于就绪状态,能够被读写 |
channelInactive | Channel 处于非就绪状态 |
channelRead | Channel 能够从远端读取到数据 |
channelReadComplete | Channel 读取数据实现 |
userEventTriggered | 用户事件触发时 |
channelWritabilityChanged | Channel 的写状态发生变化 |
ChannelOutboundHandler 工夫回调触发机会
事件回调办法 | 触发机会 |
bind | 当申请将 channel 绑定到本地地址时被调用 |
connect | 当申请将 channel 连贯到近程节点时被调用 |
disconnect | 当申请将 channel 从近程节点断开时被调用 |
close | 当申请敞开 channel 时被调用 |
deregister | 当申请将 channel 从它的 EventLoop 登记时被调用 |
read | 当申请通过 channel 读取数据时被调用 |
flush | 当申请通过 channel 将入队数据刷新到近程节点时调用 |
write | 当申请通过 channel 将数据写到近程节点时被调用 |
public class NormalOutBoundHandler extends ChannelOutboundHandlerAdapter {
private final String name;
public NormalOutBoundHandler(String name) {this.name = name;}
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {System.out.println("OutBoundHandler:"+name);
super.write(ctx, msg, promise);
public class NormalInBoundHandler extends ChannelInboundHandlerAdapter {
private final String name;
private final boolean flush;
public NormalInBoundHandler(String name, boolean flush) {
this.name = name;
this.flush = flush;
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {System.out.println("InboundHandler:"+name);
}else {super.channelRead(ctx, msg);
ServerBootstrap bootstrap = new ServerBootstrap();
bootstrap.group(bossGroup, workerGroup)
// 配置 Server 的通道,相当于 NIO 中的 ServerSocketChannel
//childHandler 示意给 worker 那些线程配置了一个处理器,// 这个就是下面 NIO 中说的,把解决业务的具体逻辑形象进去,放到 Handler 外面
.childHandler(new ChannelInitializer<SocketChannel>() {
protected void initChannel(SocketChannel socketChannel) throws Exception {socketChannel.pipeline()
.addLast(new NormalInBoundHandler("NormalInBoundA",false))
.addLast(new NormalInBoundHandler("NormalInBoundB",false))
.addLast(new NormalInBoundHandler("NormalInBoundC",true));
.addLast(new NormalOutBoundHandler("NormalOutBoundA"))
.addLast(new NormalOutBoundHandler("NormalOutBoundB"))
.addLast(new NormalOutBoundHandler("NormalOutBoundC"));
当客户端向服务端发送申请时,会触发服务端的 NormalInBound 调用链,依照排列程序一一调用 Handler,当 InBound 解决实现后调用 WriteAndFlush 办法向客户端写回数据,此时会触发 NormalOutBoundHandler 调用链的 write 事件。
从执行后果来看,Inbound 和 Outbound 的事件流传方向是不同的,Inbound 流传方向是 head->tail,Outbound 流传方向是 Tail-Head。
ChannelPipeline 工夫流传机制是典型的责任链模式,那么有同学必定会有疑难,如果这条链路中某个 handler 出现异常,那会导致什么问题呢?咱们对后面的例子批改 NormalInBoundHandler
public class NormalInBoundHandler extends ChannelInboundHandlerAdapter {
private final String name;
private final boolean flush;
public NormalInBoundHandler(String name, boolean flush) {
this.name = name;
this.flush = flush;
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {System.out.println("InboundHandler:"+name);
}else {
// 减少异样解决
throw new RuntimeException("InBoundHandler:"+name);
这个时候一旦抛出异样,会导致整个申请链被中断,在 ChannelHandler 中提供了一个异样捕捉办法,这个办法能够防止 ChannelHandler 链中某个 Handler 异样导致申请链路中断。它会把异样依照 Handler 链路的程序从 head 节点流传到 Tail 节点。如果用户最终没有对异样进行解决,则最初由 Tail 节点进行对立解决
批改 NormalInboundHandler,重写上面这个办法。
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {System.out.println("InboundHandlerException:"+name);
super.exceptionCaught(ctx, cause);
在 Netty 利用开发中,好的异样解决十分重要可能让问题排查变得很轻松,所以咱们能够通过一种对立拦挡的形式来解决异样解决问题。
public class ExceptionHandler extends ChannelDuplexHandler {
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {if(cause instanceof RuntimeException){System.out.println("解决业务异样");
super.exceptionCaught(ctx, cause);
把新增的 ExceptionHandler 增加到 ChannelPipeline 中
bootstrap.group(bossGroup, workerGroup)
// 配置 Server 的通道,相当于 NIO 中的 ServerSocketChannel
//childHandler 示意给 worker 那些线程配置了一个处理器,// 这个就是下面 NIO 中说的,把解决业务的具体逻辑形象进去,放到 Handler 外面
.childHandler(new ChannelInitializer<SocketChannel>() {
protected void initChannel(SocketChannel socketChannel) throws Exception {socketChannel.pipeline()
.addLast(new NormalInBoundHandler("NormalInBoundA",false))
.addLast(new NormalInBoundHandler("NormalInBoundB",false))
.addLast(new NormalInBoundHandler("NormalInBoundC",true));
.addLast(new NormalOutBoundHandler("NormalOutBoundA"))
.addLast(new NormalOutBoundHandler("NormalOutBoundB"))
.addLast(new NormalOutBoundHandler("NormalOutBoundC"))
.addLast(new ExceptionHandler());
