关于java:基于大量图片与实例深度解析Netty中的核心组件

本篇文章次要详细分析Netty中的外围组件。

启动器Bootstrap和ServerBootstrap作为Netty构建客户端和服务端的路口,是编写Netty网络程序的第一步。它能够让咱们把Netty的外围组件像搭积木一样组装在一起。在Netty Server端构建的过程中,咱们须要关注三个重要的步骤

  • 配置线程池
  • Channel初始化
  • Handler处理器构建

调度器详解

后面咱们讲过NIO多路复用的设计模式之Reactor模型,Reactor模型的次要思维就是把网络连接、事件散发、工作解决的职责进行拆散,并且通过引入多线程来进步Reactor模型中的吞吐量。其中包含三种Reactor模型

  • 单线程单Reactor模型
  • 多线程单Reactor模型
  • 多线程多Reactor模型

在Netty中,能够十分轻松的实现上述三种线程模型,并且Netty举荐应用主从多线程模型,这样就能够轻松的实现成千上万的客户端连贯的解决。在海量的客户端并发申请中,主从多线程模型能够通过减少SubReactor线程数量,充分利用多核能力晋升零碎吞吐量。

Reactor模型的运行机制分为四个步骤,如图2-10所示。

  • 连贯注册,Channel建设后,注册到Reactor线程中的Selector选择器
  • 事件轮询,轮询Selector选择器中曾经注册的所有Channel的I/O事件
  • 事件散发,为准备就绪的I/O事件调配相应的解决线程
  • 工作解决,Reactor线程还负责工作队列中的非I/O工作,每个Worker线程从各自保护的工作队列中取出工作异步执行。

<center>图2-10 Reactor工作流程</center>

EventLoop事件循环

在Netty中,Reactor模型的事件处理器是应用EventLoop来实现的,一个EventLoop对应一个线程,EventLoop外部保护了一个Selector和taskQueue,别离用来解决网络IO事件以及外部工作,它的工作原理如图2-11所示。

<center>图2-11 NioEventLoop原理</center>

EventLoop根本利用

上面这段代码示意EventLoop,别离实现Selector注册以及一般工作提交性能。

public class EventLoopExample {

    public static void main(String[] args) {
        EventLoopGroup group=new NioEventLoopGroup(2);
        System.out.println(group.next()); //输入第一个NioEventLoop
        System.out.println(group.next()); //输入第二个NioEventLoop
        System.out.println(group.next()); //因为只有两个,所以又会从第一个开始
        //获取一个事件循环对象NioEventLoop
        group.next().register(); //注册到selector上
        group.next().submit(()->{
            System.out.println(Thread.currentThread().getName()+"-----");
        });
    }
}

EventLoop的外围流程

基于上述的解说,了解了EventLoop的工作机制后,咱们再通过一个整体的流程图来阐明,如图2-12所示。

EventLoop是一个Reactor模型的事件处理器,一个EventLoop对应一个线程,其外部会保护一个selector和taskQueue,负责解决IO事件和外部工作。IO事件和外部工作执行工夫百分比通过ioRatio来调节,ioRatio示意执行IO工夫所占百分比。工作包含一般工作和曾经到时的提早工作,提早工作寄存到一个优先级队列PriorityQueue中,执行工作前从PriorityQueue读取所有到时的task,而后增加到taskQueue中,最初对立执行task。

<center>图2-12 EventLoop工作机制</center>

EventLoop如何实现多种Reactor模型

  • 单线程模式

    EventLoopGroup group=new NioEventLoopGroup(1);
    ServerBootstrap b=new ServerBootstrap();
    b.group(group);
  • 多线程模式

    EventLoopGroup group =new NioEventLoopGroup(); //默认会设置cpu外围数的2倍
    ServerBootstrap b=new ServerBootstrap();
    b.group(group);
  • 多线程主从模式

    EventLoopGroup boss=new NioEventLoopGroup(1);
    EventLoopGroup work=new NioEventLoopGroup();
    ServerBootstrap b=new ServerBootstrap();
    b.group(boss,work);

EventLoop实现原理

  • EventLoopGroup初始化办法,在MultithreadEventExecutorGroup.java中,依据配置的nThreads数量,构建一个EventExecutor数组

    protected MultithreadEventExecutorGroup(int nThreads, Executor executor,
                                            EventExecutorChooserFactory chooserFactory, Object... args) {
        checkPositive(nThreads, "nThreads");
    
        if (executor == null) {
            executor = new ThreadPerTaskExecutor(newDefaultThreadFactory());
        }
    
        children = new EventExecutor[nThreads];
    
        for (int i = 0; i < nThreads; i ++) {
            boolean success = false;
            try {
                children[i] = newChild(executor, args);
            }
        }
    }
  • 注册channel到多路复用器的实现,MultithreadEventLoopGroup.register办法()

    SingleThreadEventLoop ->AbstractUnsafe.register ->AbstractChannel.register0->AbstractNioChannel.doRegister()

    能够看到会把channel注册到某一个eventLoop中的unwrappedSelector复路器中。

    protected void doRegister() throws Exception {
            boolean selected = false;
            for (;;) {
                try {
                    selectionKey = javaChannel().register(eventLoop().unwrappedSelector(), 0, this);
                    return;
                }
            }
    }
  • 事件处理过程,通过NioEventLoop中的run办法一直遍历

    protected void run() {
        int selectCnt = 0;
        for (;;) {
            try {
                int strategy;
                try {
                    //计算策略,依据阻塞队列中是否含有工作来决定以后的解决形式
                    strategy = selectStrategy.calculateStrategy(selectNowSupplier, hasTasks());
                    switch (strategy) {
                        case SelectStrategy.CONTINUE:
                            continue;
                        case SelectStrategy.BUSY_WAIT:
                            // fall-through to SELECT since the busy-wait is not supported with NIO
                        case SelectStrategy.SELECT:
                            long curDeadlineNanos = nextScheduledTaskDeadlineNanos();
                            if (curDeadlineNanos == -1L) {
                                curDeadlineNanos = NONE; // nothing on the calendar
                            }
                            nextWakeupNanos.set(curDeadlineNanos);
                            try {
                                if (!hasTasks()) { //如果队列中数据为空,则调用select查问就绪事件
                                    strategy = select(curDeadlineNanos);
                                }
                            } finally {
                                nextWakeupNanos.lazySet(AWAKE);
                            }
                        default:
                    }
                }
                selectCnt++;
                cancelledKeys = 0;
                needsToSelectAgain = false;
                   /* ioRatio调节连贯事件和外部工作执行事件百分比
                    * ioRatio越大,连贯事件处理占用百分比越大 */
                final int ioRatio = this.ioRatio;
                boolean ranTasks;
                if (ioRatio == 100) {
                    try {
                        if (strategy > 0) { //解决IO工夫
                            processSelectedKeys();
                        }
                    } finally {
                        //确保每次都要执行队列中的工作
                        ranTasks = runAllTasks();
                    }
                } else if (strategy > 0) {
                    final long ioStartTime = System.nanoTime();
                    try {
                        processSelectedKeys();
                    } finally {
                        // Ensure we always run tasks.
                        final long ioTime = System.nanoTime() - ioStartTime;
                        ranTasks = runAllTasks(ioTime * (100 - ioRatio) / ioRatio);
                    }
                } else {
                    ranTasks = runAllTasks(0); // This will run the minimum number of tasks
                }
                if (ranTasks || strategy > 0) {
                    if (selectCnt > MIN_PREMATURE_SELECTOR_RETURNS && logger.isDebugEnabled()) {
                        logger.debug("Selector.select() returned prematurely {} times in a row for Selector {}.",
                                     selectCnt - 1, selector);
                    }
                    selectCnt = 0;
                } else if (unexpectedSelectorWakeup(selectCnt)) { // Unexpected wakeup (unusual case)
                    selectCnt = 0;
                }
            }
    }

服务编排层Pipeline的协调解决

通过EventLoop能够实现工作的调度,负责监听I/O事件、信号事件等,当收到相干事件后,须要有人来响应这些事件和数据,而这些事件是通过ChannelPipeline中所定义的ChannelHandler实现的,他们是Netty中服务编排层的外围组件。

在上面这段代码中,咱们减少了h1和h2两个InboundHandler,用来解决客户端数据的读取操作,代码如下。

ServerBootstrap bootstrap = new ServerBootstrap();
bootstrap.group(bossGroup, workerGroup)
    //配置Server的通道,相当于NIO中的ServerSocketChannel
    .channel(NioServerSocketChannel.class)
    //childHandler示意给worker那些线程配置了一个处理器,
    // 这个就是下面NIO中说的,把解决业务的具体逻辑形象进去,放到Handler外面
    .childHandler(new ChannelInitializer<SocketChannel>() {
        @Override
        protected void initChannel(SocketChannel socketChannel) throws Exception {
            //                            socketChannel.pipeline().addLast(new NormalMessageHandler());
            socketChannel.pipeline().addLast("h1",new ChannelInboundHandlerAdapter(){
                @Override
                public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
                    System.out.println("handler-01");
                    super.channelRead(ctx, msg);
                }
            }).addLast("h2",new ChannelInboundHandlerAdapter(){
                @Override
                public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
                    System.out.println("handler-02");
                    super.channelRead(ctx, msg);
                }
            });
        }
    });

上述代码构建了一个ChannelPipeline,失去如图2-13所示的构造,每个Channel都会绑定一个ChannelPipeline,一个ChannelPipeline蕴含多个ChannelHandler,这些Handler会被包装成ChannelHandlerContext退出到Pipeline构建的双向链表中。

ChannelHandlerContext用来保留ChannelHandler的上下文,它蕴含了ChannelHandler生命周期中的所有事件,比方connect/bind/read/write等,这样设计的益处是,各个ChannelHandler进行数据传递时,前置和后置的通用逻辑就能够间接保留到ChannelHandlerContext中进行传递。

<center>图2-13</center>

出站和入站操作

依据网络数据的流向,ChannelPipeline分为入站ChannelInBoundHandler和出站ChannelOutboundHandler两个处理器,如图2-14所示,客户端与服务端通信过程中,数据从客户端发向服务端的过程叫出站,对于服务端来说,数据从客户端流入到服务端,这个时候是入站。

<center>图2-14 InBound和OutBound的关系</center>

ChannelHandler事件触发机制

当某个Channel触发了IO事件后,会通过Handler进行解决,而ChannelHandler是围绕I/O事件的生命周期来设计的,比方建设连贯、读数据、写数据、连贯销毁等。

ChannelHandler有两个重要的子接口实现,别离拦挡数据流入和数据流出的I/O事件

  • ChannelInboundHandler
  • ChannelOutboundHandler

图2-15中显示的Adapter类,提供很多默认操作,比方ChannelHandler中有很多很多办法,咱们用户自定义的办法有时候不须要重载全副,只须要重载一两个办法,那么能够应用Adapter类,它外面有很多默认的办法。其它框架中结尾是Adapter的类的作用也大都是如此。所以咱们在应用netty的时候,往往很少间接实现ChannelHandler的接口,常常是继承Adapter类。

<img src=”https://mic-blob-bucket.oss-cn-beijing.aliyuncs.com/202111090025881.png” alt=”image-20210816200206761″ style=”zoom:67%;” />

<center>图2-15 ChannelHandler类关系图</center>

ChannelInboundHandler事件回调和触发机会如下

事件回调办法 触发机会
channelRegistered Channel 被注册到 EventLoop
channelUnregistered Channel 从 EventLoop 中勾销注册
channelActive Channel 处于就绪状态,能够被读写
channelInactive Channel 处于非就绪状态
channelRead Channel 能够从远端读取到数据
channelReadComplete Channel 读取数据实现
userEventTriggered 用户事件触发时
channelWritabilityChanged Channel 的写状态发生变化

ChannelOutboundHandler工夫回调触发机会

事件回调办法 触发机会
bind 当申请将channel绑定到本地地址时被调用
connect 当申请将channel连贯到近程节点时被调用
disconnect 当申请将channel从近程节点断开时被调用
close 当申请敞开channel时被调用
deregister 当申请将channel从它的EventLoop登记时被调用
read 当申请通过channel读取数据时被调用
flush 当申请通过channel将入队数据刷新到近程节点时调用
write 当申请通过channel将数据写到近程节点时被调用

事件流传机制演示

public class NormalOutBoundHandler extends ChannelOutboundHandlerAdapter {
    private final String name;

    public NormalOutBoundHandler(String name) {
        this.name = name;
    }

    @Override
    public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
        System.out.println("OutBoundHandler:"+name);
        super.write(ctx, msg, promise);
    }
}
public class NormalInBoundHandler extends ChannelInboundHandlerAdapter {
    private final String name;
    private final boolean flush;

    public NormalInBoundHandler(String name, boolean flush) {
        this.name = name;
        this.flush = flush;
    }

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        System.out.println("InboundHandler:"+name);
        if(flush){
            ctx.channel().writeAndFlush(msg);
        }else {
            super.channelRead(ctx, msg);
        }
    }
}
ServerBootstrap bootstrap = new ServerBootstrap();
bootstrap.group(bossGroup, workerGroup)
    //配置Server的通道,相当于NIO中的ServerSocketChannel
    .channel(NioServerSocketChannel.class)
    //childHandler示意给worker那些线程配置了一个处理器,
    // 这个就是下面NIO中说的,把解决业务的具体逻辑形象进去,放到Handler外面
    .childHandler(new ChannelInitializer<SocketChannel>() {
        @Override
        protected void initChannel(SocketChannel socketChannel) throws Exception {
            socketChannel.pipeline()
                .addLast(new NormalInBoundHandler("NormalInBoundA",false))
                .addLast(new NormalInBoundHandler("NormalInBoundB",false))
                .addLast(new NormalInBoundHandler("NormalInBoundC",true));
            socketChannel.pipeline()
                .addLast(new NormalOutBoundHandler("NormalOutBoundA"))
                .addLast(new NormalOutBoundHandler("NormalOutBoundB"))
                .addLast(new NormalOutBoundHandler("NormalOutBoundC"));
        }
    });

上述代码运行后会失去如下执行后果

InboundHandler:NormalInBoundA
InboundHandler:NormalInBoundB
InboundHandler:NormalInBoundC
OutBoundHandler:NormalOutBoundC
OutBoundHandler:NormalOutBoundB
OutBoundHandler:NormalOutBoundA

当客户端向服务端发送申请时,会触发服务端的NormalInBound调用链,依照排列程序一一调用Handler,当InBound解决实现后调用WriteAndFlush办法向客户端写回数据,此时会触发NormalOutBoundHandler调用链的write事件。

从执行后果来看,Inbound和Outbound的事件流传方向是不同的,Inbound流传方向是head->tail,Outbound流传方向是Tail-Head。

异样流传机制

ChannelPipeline工夫流传机制是典型的责任链模式,那么有同学必定会有疑难,如果这条链路中某个handler出现异常,那会导致什么问题呢?咱们对后面的例子批改NormalInBoundHandler

public class NormalInBoundHandler extends ChannelInboundHandlerAdapter {
    private final String name;
    private final boolean flush;

    public NormalInBoundHandler(String name, boolean flush) {
        this.name = name;
        this.flush = flush;
    }

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        System.out.println("InboundHandler:"+name);
        if(flush){
            ctx.channel().writeAndFlush(msg);
        }else {
            //减少异样解决
            throw new RuntimeException("InBoundHandler:"+name);
        }
    }
}

这个时候一旦抛出异样,会导致整个申请链被中断,在ChannelHandler中提供了一个异样捕捉办法,这个办法能够防止ChannelHandler链中某个Handler异样导致申请链路中断。它会把异样依照Handler链路的程序从head节点流传到Tail节点。如果用户最终没有对异样进行解决,则最初由Tail节点进行对立解决

批改NormalInboundHandler,重写上面这个办法。

@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
    System.out.println("InboundHandlerException:"+name);
    super.exceptionCaught(ctx, cause);
}

在Netty利用开发中,好的异样解决十分重要可能让问题排查变得很轻松,所以咱们能够通过一种对立拦挡的形式来解决异样解决问题。

增加一个复合处理器实现类

public class ExceptionHandler extends ChannelDuplexHandler {

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        if(cause instanceof RuntimeException){
            System.out.println("解决业务异样");
        }
        super.exceptionCaught(ctx, cause);
    }
}

把新增的ExceptionHandler增加到ChannelPipeline中

bootstrap.group(bossGroup, workerGroup)
    //配置Server的通道,相当于NIO中的ServerSocketChannel
    .channel(NioServerSocketChannel.class)
    //childHandler示意给worker那些线程配置了一个处理器,
    // 这个就是下面NIO中说的,把解决业务的具体逻辑形象进去,放到Handler外面
    .childHandler(new ChannelInitializer<SocketChannel>() {
        @Override
        protected void initChannel(SocketChannel socketChannel) throws Exception {
            socketChannel.pipeline()
                .addLast(new NormalInBoundHandler("NormalInBoundA",false))
                .addLast(new NormalInBoundHandler("NormalInBoundB",false))
                .addLast(new NormalInBoundHandler("NormalInBoundC",true));
            socketChannel.pipeline()
                .addLast(new NormalOutBoundHandler("NormalOutBoundA"))
                .addLast(new NormalOutBoundHandler("NormalOutBoundB"))
                .addLast(new NormalOutBoundHandler("NormalOutBoundC"))
                .addLast(new ExceptionHandler());
        }
    });

最终,咱们就可能实现异样的对立解决。

版权申明:本博客所有文章除特地申明外,均采纳 CC BY-NC-SA 4.0 许可协定。转载请注明来自 Mic带你学架构
如果本篇文章对您有帮忙,还请帮忙点个关注和赞,您的保持是我一直创作的能源。欢送关注同名微信公众号获取更多技术干货!

评论

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注

这个站点使用 Akismet 来减少垃圾评论。了解你的评论数据如何被处理