关于java:基于大量图片与实例深度解析Netty中的核心组件

6次阅读

共计 11217 个字符,预计需要花费 29 分钟才能阅读完成。

本篇文章次要详细分析 Netty 中的外围组件。

启动器 Bootstrap 和 ServerBootstrap 作为 Netty 构建客户端和服务端的路口,是编写 Netty 网络程序的第一步。它能够让咱们把 Netty 的外围组件像搭积木一样组装在一起。在 Netty Server 端构建的过程中,咱们须要关注三个重要的步骤

  • 配置线程池
  • Channel 初始化
  • Handler 处理器构建

调度器详解

后面咱们讲过 NIO 多路复用的设计模式之 Reactor 模型,Reactor 模型的次要思维就是把网络连接、事件散发、工作解决的职责进行拆散,并且通过引入多线程来进步 Reactor 模型中的吞吐量。其中包含三种 Reactor 模型

  • 单线程单 Reactor 模型
  • 多线程单 Reactor 模型
  • 多线程多 Reactor 模型

在 Netty 中,能够十分轻松的实现上述三种线程模型,并且 Netty 举荐应用主从多线程模型,这样就能够轻松的实现成千上万的客户端连贯的解决。在海量的客户端并发申请中,主从多线程模型能够通过减少 SubReactor 线程数量,充分利用多核能力晋升零碎吞吐量。

Reactor 模型的运行机制分为四个步骤,如图 2 -10 所示。

  • 连贯注册,Channel 建设后,注册到 Reactor 线程中的 Selector 选择器
  • 事件轮询,轮询 Selector 选择器中曾经注册的所有 Channel 的 I / O 事件
  • 事件散发,为准备就绪的 I / O 事件调配相应的解决线程
  • 工作解决,Reactor 线程还负责工作队列中的非 I / O 工作,每个 Worker 线程从各自保护的工作队列中取出工作异步执行。

<center> 图 2 -10 Reactor 工作流程 </center>

EventLoop 事件循环

在 Netty 中,Reactor 模型的事件处理器是应用 EventLoop 来实现的,一个 EventLoop 对应一个线程,EventLoop 外部保护了一个 Selector 和 taskQueue,别离用来解决网络 IO 事件以及外部工作,它的工作原理如图 2 -11 所示。

<center> 图 2 -11 NioEventLoop 原理 </center>

EventLoop 根本利用

上面这段代码示意 EventLoop,别离实现 Selector 注册以及一般工作提交性能。

public class EventLoopExample {public static void main(String[] args) {EventLoopGroup group=new NioEventLoopGroup(2);
        System.out.println(group.next()); // 输入第一个 NioEventLoop
        System.out.println(group.next()); // 输入第二个 NioEventLoop
        System.out.println(group.next()); // 因为只有两个,所以又会从第一个开始
        // 获取一个事件循环对象 NioEventLoop
        group.next().register(); // 注册到 selector 上
        group.next().submit(()->{System.out.println(Thread.currentThread().getName()+"-----");
        });
    }
}

EventLoop 的外围流程

基于上述的解说,了解了 EventLoop 的工作机制后,咱们再通过一个整体的流程图来阐明,如图 2 -12 所示。

EventLoop 是一个 Reactor 模型 的事件处理器,一个 EventLoop 对应一个线程,其外部会保护一个 selector 和 taskQueue,负责解决 IO 事件和外部工作。IO 事件和外部工作执行工夫百分比通过 ioRatio 来调节,ioRatio 示意执行 IO 工夫所占百分比。工作包含一般工作和曾经到时的提早工作,提早工作寄存到一个优先级队列 PriorityQueue 中,执行工作前从 PriorityQueue 读取所有到时的 task,而后增加到 taskQueue 中,最初对立执行 task。

<center> 图 2 -12 EventLoop 工作机制 </center>

EventLoop 如何实现多种 Reactor 模型

  • 单线程模式

    EventLoopGroup group=new NioEventLoopGroup(1);
    ServerBootstrap b=new ServerBootstrap();
    b.group(group);
  • 多线程模式

    EventLoopGroup group =new NioEventLoopGroup(); // 默认会设置 cpu 外围数的 2 倍
    ServerBootstrap b=new ServerBootstrap();
    b.group(group);
  • 多线程主从模式

    EventLoopGroup boss=new NioEventLoopGroup(1);
    EventLoopGroup work=new NioEventLoopGroup();
    ServerBootstrap b=new ServerBootstrap();
    b.group(boss,work);

EventLoop 实现原理

  • EventLoopGroup 初始化办法,在 MultithreadEventExecutorGroup.java 中,依据配置的 nThreads 数量,构建一个 EventExecutor 数组

    protected MultithreadEventExecutorGroup(int nThreads, Executor executor,
                                            EventExecutorChooserFactory chooserFactory, Object... args) {checkPositive(nThreads, "nThreads");
    
        if (executor == null) {executor = new ThreadPerTaskExecutor(newDefaultThreadFactory());
        }
    
        children = new EventExecutor[nThreads];
    
        for (int i = 0; i < nThreads; i ++) {
            boolean success = false;
            try {children[i] = newChild(executor, args);
            }
        }
    }
  • 注册 channel 到多路复用器的实现,MultithreadEventLoopGroup.register 办法()

    SingleThreadEventLoop ->AbstractUnsafe.register ->AbstractChannel.register0->AbstractNioChannel.doRegister()

    能够看到会把 channel 注册到某一个 eventLoop 中的 unwrappedSelector 复路器中。

    protected void doRegister() throws Exception {
            boolean selected = false;
            for (;;) {
                try {selectionKey = javaChannel().register(eventLoop().unwrappedSelector(), 0, this);
                    return;
                }
            }
    }
  • 事件处理过程,通过 NioEventLoop 中的 run 办法一直遍历

    protected void run() {
        int selectCnt = 0;
        for (;;) {
            try {
                int strategy;
                try {
                    // 计算策略,依据阻塞队列中是否含有工作来决定以后的解决形式
                    strategy = selectStrategy.calculateStrategy(selectNowSupplier, hasTasks());
                    switch (strategy) {
                        case SelectStrategy.CONTINUE:
                            continue;
                        case SelectStrategy.BUSY_WAIT:
                            // fall-through to SELECT since the busy-wait is not supported with NIO
                        case SelectStrategy.SELECT:
                            long curDeadlineNanos = nextScheduledTaskDeadlineNanos();
                            if (curDeadlineNanos == -1L) {curDeadlineNanos = NONE; // nothing on the calendar}
                            nextWakeupNanos.set(curDeadlineNanos);
                            try {if (!hasTasks()) { // 如果队列中数据为空,则调用 select 查问就绪事件
                                    strategy = select(curDeadlineNanos);
                                }
                            } finally {nextWakeupNanos.lazySet(AWAKE);
                            }
                        default:
                    }
                }
                selectCnt++;
                cancelledKeys = 0;
                needsToSelectAgain = false;
                   /* ioRatio 调节连贯事件和外部工作执行事件百分比
                    * ioRatio 越大,连贯事件处理占用百分比越大 */
                final int ioRatio = this.ioRatio;
                boolean ranTasks;
                if (ioRatio == 100) {
                    try {if (strategy > 0) { // 解决 IO 工夫
                            processSelectedKeys();}
                    } finally {
                        // 确保每次都要执行队列中的工作
                        ranTasks = runAllTasks();}
                } else if (strategy > 0) {final long ioStartTime = System.nanoTime();
                    try {processSelectedKeys();
                    } finally {
                        // Ensure we always run tasks.
                        final long ioTime = System.nanoTime() - ioStartTime;
                        ranTasks = runAllTasks(ioTime * (100 - ioRatio) / ioRatio);
                    }
                } else {ranTasks = runAllTasks(0); // This will run the minimum number of tasks
                }
                if (ranTasks || strategy > 0) {if (selectCnt > MIN_PREMATURE_SELECTOR_RETURNS && logger.isDebugEnabled()) {logger.debug("Selector.select() returned prematurely {} times in a row for Selector {}.",
                                     selectCnt - 1, selector);
                    }
                    selectCnt = 0;
                } else if (unexpectedSelectorWakeup(selectCnt)) {// Unexpected wakeup (unusual case)
                    selectCnt = 0;
                }
            }
    }

服务编排层 Pipeline 的协调解决

通过 EventLoop 能够实现工作的调度,负责监听 I / O 事件、信号事件等,当收到相干事件后,须要有人来响应这些事件和数据,而这些事件是通过 ChannelPipeline 中所定义的 ChannelHandler 实现的,他们是 Netty 中服务编排层的外围组件。

在上面这段代码中,咱们减少了 h1 和 h2 两个 InboundHandler,用来解决客户端数据的读取操作,代码如下。

ServerBootstrap bootstrap = new ServerBootstrap();
bootstrap.group(bossGroup, workerGroup)
    // 配置 Server 的通道,相当于 NIO 中的 ServerSocketChannel
    .channel(NioServerSocketChannel.class)
    //childHandler 示意给 worker 那些线程配置了一个处理器,// 这个就是下面 NIO 中说的,把解决业务的具体逻辑形象进去,放到 Handler 外面
    .childHandler(new ChannelInitializer<SocketChannel>() {
        @Override
        protected void initChannel(SocketChannel socketChannel) throws Exception {//                            socketChannel.pipeline().addLast(new NormalMessageHandler());
            socketChannel.pipeline().addLast("h1",new ChannelInboundHandlerAdapter(){
                @Override
                public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {System.out.println("handler-01");
                    super.channelRead(ctx, msg);
                }
            }).addLast("h2",new ChannelInboundHandlerAdapter(){
                @Override
                public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {System.out.println("handler-02");
                    super.channelRead(ctx, msg);
                }
            });
        }
    });

上述代码构建了一个 ChannelPipeline,失去如图 2 -13 所示的构造,每个 Channel 都会绑定一个 ChannelPipeline,一个 ChannelPipeline 蕴含多个 ChannelHandler,这些 Handler 会被包装成 ChannelHandlerContext 退出到 Pipeline 构建的双向链表中。

ChannelHandlerContext 用来保留 ChannelHandler 的上下文,它蕴含了 ChannelHandler 生命周期中的所有事件,比方 connect/bind/read/write 等,这样设计的益处是,各个 ChannelHandler 进行数据传递时,前置和后置的通用逻辑就能够间接保留到 ChannelHandlerContext 中进行传递。

<center> 图 2 -13</center>

出站和入站操作

依据网络数据的流向,ChannelPipeline 分为入站 ChannelInBoundHandler 和出站 ChannelOutboundHandler 两个处理器,如图 2 -14 所示,客户端与服务端通信过程中,数据从客户端发向服务端的过程叫出站,对于服务端来说,数据从客户端流入到服务端,这个时候是入站。

<center> 图 2 -14 InBound 和 OutBound 的关系 </center>

ChannelHandler 事件触发机制

当某个 Channel 触发了 IO 事件后,会通过 Handler 进行解决,而 ChannelHandler 是围绕 I / O 事件的生命周期来设计的,比方建设连贯、读数据、写数据、连贯销毁等。

ChannelHandler 有两个重要的子接口实现,别离拦挡数据流入和数据流出的 I / O 事件

  • ChannelInboundHandler
  • ChannelOutboundHandler

图 2 -15 中显示的 Adapter 类,提供很多默认操作,比方 ChannelHandler 中有很多很多办法,咱们用户自定义的办法有时候不须要重载全副,只须要重载一两个办法,那么能够应用 Adapter 类,它外面有很多默认的办法。其它框架中结尾是 Adapter 的类的作用也大都是如此。所以咱们在应用 netty 的时候,往往很少间接实现 ChannelHandler 的接口,常常是继承 Adapter 类。

<img src=”https://mic-blob-bucket.oss-cn-beijing.aliyuncs.com/202111090025881.png” alt=”image-20210816200206761″ style=”zoom:67%;” />

<center> 图 2 -15 ChannelHandler 类关系图 </center>

ChannelInboundHandler 事件回调和触发机会如下

事件回调办法 触发机会
channelRegistered Channel 被注册到 EventLoop
channelUnregistered Channel 从 EventLoop 中勾销注册
channelActive Channel 处于就绪状态,能够被读写
channelInactive Channel 处于非就绪状态
channelRead Channel 能够从远端读取到数据
channelReadComplete Channel 读取数据实现
userEventTriggered 用户事件触发时
channelWritabilityChanged Channel 的写状态发生变化

ChannelOutboundHandler 工夫回调触发机会

事件回调办法 触发机会
bind 当申请将 channel 绑定到本地地址时被调用
connect 当申请将 channel 连贯到近程节点时被调用
disconnect 当申请将 channel 从近程节点断开时被调用
close 当申请敞开 channel 时被调用
deregister 当申请将 channel 从它的 EventLoop 登记时被调用
read 当申请通过 channel 读取数据时被调用
flush 当申请通过 channel 将入队数据刷新到近程节点时调用
write 当申请通过 channel 将数据写到近程节点时被调用

事件流传机制演示

public class NormalOutBoundHandler extends ChannelOutboundHandlerAdapter {
    private final String name;

    public NormalOutBoundHandler(String name) {this.name = name;}

    @Override
    public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {System.out.println("OutBoundHandler:"+name);
        super.write(ctx, msg, promise);
    }
}
public class NormalInBoundHandler extends ChannelInboundHandlerAdapter {
    private final String name;
    private final boolean flush;

    public NormalInBoundHandler(String name, boolean flush) {
        this.name = name;
        this.flush = flush;
    }

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {System.out.println("InboundHandler:"+name);
        if(flush){ctx.channel().writeAndFlush(msg);
        }else {super.channelRead(ctx, msg);
        }
    }
}
ServerBootstrap bootstrap = new ServerBootstrap();
bootstrap.group(bossGroup, workerGroup)
    // 配置 Server 的通道,相当于 NIO 中的 ServerSocketChannel
    .channel(NioServerSocketChannel.class)
    //childHandler 示意给 worker 那些线程配置了一个处理器,// 这个就是下面 NIO 中说的,把解决业务的具体逻辑形象进去,放到 Handler 外面
    .childHandler(new ChannelInitializer<SocketChannel>() {
        @Override
        protected void initChannel(SocketChannel socketChannel) throws Exception {socketChannel.pipeline()
                .addLast(new NormalInBoundHandler("NormalInBoundA",false))
                .addLast(new NormalInBoundHandler("NormalInBoundB",false))
                .addLast(new NormalInBoundHandler("NormalInBoundC",true));
            socketChannel.pipeline()
                .addLast(new NormalOutBoundHandler("NormalOutBoundA"))
                .addLast(new NormalOutBoundHandler("NormalOutBoundB"))
                .addLast(new NormalOutBoundHandler("NormalOutBoundC"));
        }
    });

上述代码运行后会失去如下执行后果

InboundHandler:NormalInBoundA
InboundHandler:NormalInBoundB
InboundHandler:NormalInBoundC
OutBoundHandler:NormalOutBoundC
OutBoundHandler:NormalOutBoundB
OutBoundHandler:NormalOutBoundA

当客户端向服务端发送申请时,会触发服务端的 NormalInBound 调用链,依照排列程序一一调用 Handler,当 InBound 解决实现后调用 WriteAndFlush 办法向客户端写回数据,此时会触发 NormalOutBoundHandler 调用链的 write 事件。

从执行后果来看,Inbound 和 Outbound 的事件流传方向是不同的,Inbound 流传方向是 head->tail,Outbound 流传方向是 Tail-Head。

异样流传机制

ChannelPipeline 工夫流传机制是典型的责任链模式,那么有同学必定会有疑难,如果这条链路中某个 handler 出现异常,那会导致什么问题呢?咱们对后面的例子批改 NormalInBoundHandler

public class NormalInBoundHandler extends ChannelInboundHandlerAdapter {
    private final String name;
    private final boolean flush;

    public NormalInBoundHandler(String name, boolean flush) {
        this.name = name;
        this.flush = flush;
    }

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {System.out.println("InboundHandler:"+name);
        if(flush){ctx.channel().writeAndFlush(msg);
        }else {
            // 减少异样解决
            throw new RuntimeException("InBoundHandler:"+name);
        }
    }
}

这个时候一旦抛出异样,会导致整个申请链被中断,在 ChannelHandler 中提供了一个异样捕捉办法,这个办法能够防止 ChannelHandler 链中某个 Handler 异样导致申请链路中断。它会把异样依照 Handler 链路的程序从 head 节点流传到 Tail 节点。如果用户最终没有对异样进行解决,则最初由 Tail 节点进行对立解决

批改 NormalInboundHandler,重写上面这个办法。

@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {System.out.println("InboundHandlerException:"+name);
    super.exceptionCaught(ctx, cause);
}

在 Netty 利用开发中,好的异样解决十分重要可能让问题排查变得很轻松,所以咱们能够通过一种对立拦挡的形式来解决异样解决问题。

增加一个复合处理器实现类

public class ExceptionHandler extends ChannelDuplexHandler {

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {if(cause instanceof RuntimeException){System.out.println("解决业务异样");
        }
        super.exceptionCaught(ctx, cause);
    }
}

把新增的 ExceptionHandler 增加到 ChannelPipeline 中

bootstrap.group(bossGroup, workerGroup)
    // 配置 Server 的通道,相当于 NIO 中的 ServerSocketChannel
    .channel(NioServerSocketChannel.class)
    //childHandler 示意给 worker 那些线程配置了一个处理器,// 这个就是下面 NIO 中说的,把解决业务的具体逻辑形象进去,放到 Handler 外面
    .childHandler(new ChannelInitializer<SocketChannel>() {
        @Override
        protected void initChannel(SocketChannel socketChannel) throws Exception {socketChannel.pipeline()
                .addLast(new NormalInBoundHandler("NormalInBoundA",false))
                .addLast(new NormalInBoundHandler("NormalInBoundB",false))
                .addLast(new NormalInBoundHandler("NormalInBoundC",true));
            socketChannel.pipeline()
                .addLast(new NormalOutBoundHandler("NormalOutBoundA"))
                .addLast(new NormalOutBoundHandler("NormalOutBoundB"))
                .addLast(new NormalOutBoundHandler("NormalOutBoundC"))
                .addLast(new ExceptionHandler());
        }
    });

最终,咱们就可能实现异样的对立解决。

版权申明:本博客所有文章除特地申明外,均采纳 CC BY-NC-SA 4.0 许可协定。转载请注明来自 Mic 带你学架构
如果本篇文章对您有帮忙,还请帮忙点个关注和赞,您的保持是我一直创作的能源。欢送关注同名微信公众号获取更多技术干货!

正文完
 0