本篇文章次要详细分析Netty中的外围组件。

启动器Bootstrap和ServerBootstrap作为Netty构建客户端和服务端的路口,是编写Netty网络程序的第一步。它能够让咱们把Netty的外围组件像搭积木一样组装在一起。在Netty Server端构建的过程中,咱们须要关注三个重要的步骤

  • 配置线程池
  • Channel初始化
  • Handler处理器构建

调度器详解

后面咱们讲过NIO多路复用的设计模式之Reactor模型,Reactor模型的次要思维就是把网络连接、事件散发、工作解决的职责进行拆散,并且通过引入多线程来进步Reactor模型中的吞吐量。其中包含三种Reactor模型

  • 单线程单Reactor模型
  • 多线程单Reactor模型
  • 多线程多Reactor模型

在Netty中,能够十分轻松的实现上述三种线程模型,并且Netty举荐应用主从多线程模型,这样就能够轻松的实现成千上万的客户端连贯的解决。在海量的客户端并发申请中,主从多线程模型能够通过减少SubReactor线程数量,充分利用多核能力晋升零碎吞吐量。

Reactor模型的运行机制分为四个步骤,如图2-10所示。

  • 连贯注册,Channel建设后,注册到Reactor线程中的Selector选择器
  • 事件轮询,轮询Selector选择器中曾经注册的所有Channel的I/O事件
  • 事件散发,为准备就绪的I/O事件调配相应的解决线程
  • 工作解决,Reactor线程还负责工作队列中的非I/O工作,每个Worker线程从各自保护的工作队列中取出工作异步执行。

<center>图2-10 Reactor工作流程</center>

EventLoop事件循环

在Netty中,Reactor模型的事件处理器是应用EventLoop来实现的,一个EventLoop对应一个线程,EventLoop外部保护了一个Selector和taskQueue,别离用来解决网络IO事件以及外部工作,它的工作原理如图2-11所示。

<center>图2-11 NioEventLoop原理</center>

EventLoop根本利用

上面这段代码示意EventLoop,别离实现Selector注册以及一般工作提交性能。

public class EventLoopExample {    public static void main(String[] args) {        EventLoopGroup group=new NioEventLoopGroup(2);        System.out.println(group.next()); //输入第一个NioEventLoop        System.out.println(group.next()); //输入第二个NioEventLoop        System.out.println(group.next()); //因为只有两个,所以又会从第一个开始        //获取一个事件循环对象NioEventLoop        group.next().register(); //注册到selector上        group.next().submit(()->{            System.out.println(Thread.currentThread().getName()+"-----");        });    }}

EventLoop的外围流程

基于上述的解说,了解了EventLoop的工作机制后,咱们再通过一个整体的流程图来阐明,如图2-12所示。

EventLoop是一个Reactor模型的事件处理器,一个EventLoop对应一个线程,其外部会保护一个selector和taskQueue,负责解决IO事件和外部工作。IO事件和外部工作执行工夫百分比通过ioRatio来调节,ioRatio示意执行IO工夫所占百分比。工作包含一般工作和曾经到时的提早工作,提早工作寄存到一个优先级队列PriorityQueue中,执行工作前从PriorityQueue读取所有到时的task,而后增加到taskQueue中,最初对立执行task。

<center>图2-12 EventLoop工作机制</center>

EventLoop如何实现多种Reactor模型

  • 单线程模式

    EventLoopGroup group=new NioEventLoopGroup(1);ServerBootstrap b=new ServerBootstrap();b.group(group);
  • 多线程模式

    EventLoopGroup group =new NioEventLoopGroup(); //默认会设置cpu外围数的2倍ServerBootstrap b=new ServerBootstrap();b.group(group);
  • 多线程主从模式

    EventLoopGroup boss=new NioEventLoopGroup(1);EventLoopGroup work=new NioEventLoopGroup();ServerBootstrap b=new ServerBootstrap();b.group(boss,work);

EventLoop实现原理

  • EventLoopGroup初始化办法,在MultithreadEventExecutorGroup.java中,依据配置的nThreads数量,构建一个EventExecutor数组

    protected MultithreadEventExecutorGroup(int nThreads, Executor executor,                                        EventExecutorChooserFactory chooserFactory, Object... args) {    checkPositive(nThreads, "nThreads");    if (executor == null) {        executor = new ThreadPerTaskExecutor(newDefaultThreadFactory());    }    children = new EventExecutor[nThreads];    for (int i = 0; i < nThreads; i ++) {        boolean success = false;        try {            children[i] = newChild(executor, args);        }    }}
  • 注册channel到多路复用器的实现,MultithreadEventLoopGroup.register办法()

    SingleThreadEventLoop ->AbstractUnsafe.register ->AbstractChannel.register0->AbstractNioChannel.doRegister()

    能够看到会把channel注册到某一个eventLoop中的unwrappedSelector复路器中。

    protected void doRegister() throws Exception {        boolean selected = false;        for (;;) {            try {                selectionKey = javaChannel().register(eventLoop().unwrappedSelector(), 0, this);                return;            }        }}
  • 事件处理过程,通过NioEventLoop中的run办法一直遍历

    protected void run() {    int selectCnt = 0;    for (;;) {        try {            int strategy;            try {                //计算策略,依据阻塞队列中是否含有工作来决定以后的解决形式                strategy = selectStrategy.calculateStrategy(selectNowSupplier, hasTasks());                switch (strategy) {                    case SelectStrategy.CONTINUE:                        continue;                    case SelectStrategy.BUSY_WAIT:                        // fall-through to SELECT since the busy-wait is not supported with NIO                    case SelectStrategy.SELECT:                        long curDeadlineNanos = nextScheduledTaskDeadlineNanos();                        if (curDeadlineNanos == -1L) {                            curDeadlineNanos = NONE; // nothing on the calendar                        }                        nextWakeupNanos.set(curDeadlineNanos);                        try {                            if (!hasTasks()) { //如果队列中数据为空,则调用select查问就绪事件                                strategy = select(curDeadlineNanos);                            }                        } finally {                            nextWakeupNanos.lazySet(AWAKE);                        }                    default:                }            }            selectCnt++;            cancelledKeys = 0;            needsToSelectAgain = false;               /* ioRatio调节连贯事件和外部工作执行事件百分比                * ioRatio越大,连贯事件处理占用百分比越大 */            final int ioRatio = this.ioRatio;            boolean ranTasks;            if (ioRatio == 100) {                try {                    if (strategy > 0) { //解决IO工夫                        processSelectedKeys();                    }                } finally {                    //确保每次都要执行队列中的工作                    ranTasks = runAllTasks();                }            } else if (strategy > 0) {                final long ioStartTime = System.nanoTime();                try {                    processSelectedKeys();                } finally {                    // Ensure we always run tasks.                    final long ioTime = System.nanoTime() - ioStartTime;                    ranTasks = runAllTasks(ioTime * (100 - ioRatio) / ioRatio);                }            } else {                ranTasks = runAllTasks(0); // This will run the minimum number of tasks            }            if (ranTasks || strategy > 0) {                if (selectCnt > MIN_PREMATURE_SELECTOR_RETURNS && logger.isDebugEnabled()) {                    logger.debug("Selector.select() returned prematurely {} times in a row for Selector {}.",                                 selectCnt - 1, selector);                }                selectCnt = 0;            } else if (unexpectedSelectorWakeup(selectCnt)) { // Unexpected wakeup (unusual case)                selectCnt = 0;            }        }}

服务编排层Pipeline的协调解决

通过EventLoop能够实现工作的调度,负责监听I/O事件、信号事件等,当收到相干事件后,须要有人来响应这些事件和数据,而这些事件是通过ChannelPipeline中所定义的ChannelHandler实现的,他们是Netty中服务编排层的外围组件。

在上面这段代码中,咱们减少了h1和h2两个InboundHandler,用来解决客户端数据的读取操作,代码如下。

ServerBootstrap bootstrap = new ServerBootstrap();bootstrap.group(bossGroup, workerGroup)    //配置Server的通道,相当于NIO中的ServerSocketChannel    .channel(NioServerSocketChannel.class)    //childHandler示意给worker那些线程配置了一个处理器,    // 这个就是下面NIO中说的,把解决业务的具体逻辑形象进去,放到Handler外面    .childHandler(new ChannelInitializer<SocketChannel>() {        @Override        protected void initChannel(SocketChannel socketChannel) throws Exception {            //                            socketChannel.pipeline().addLast(new NormalMessageHandler());            socketChannel.pipeline().addLast("h1",new ChannelInboundHandlerAdapter(){                @Override                public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {                    System.out.println("handler-01");                    super.channelRead(ctx, msg);                }            }).addLast("h2",new ChannelInboundHandlerAdapter(){                @Override                public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {                    System.out.println("handler-02");                    super.channelRead(ctx, msg);                }            });        }    });

上述代码构建了一个ChannelPipeline,失去如图2-13所示的构造,每个Channel都会绑定一个ChannelPipeline,一个ChannelPipeline蕴含多个ChannelHandler,这些Handler会被包装成ChannelHandlerContext退出到Pipeline构建的双向链表中。

ChannelHandlerContext用来保留ChannelHandler的上下文,它蕴含了ChannelHandler生命周期中的所有事件,比方connect/bind/read/write等,这样设计的益处是,各个ChannelHandler进行数据传递时,前置和后置的通用逻辑就能够间接保留到ChannelHandlerContext中进行传递。

<center>图2-13</center>

出站和入站操作

依据网络数据的流向,ChannelPipeline分为入站ChannelInBoundHandler和出站ChannelOutboundHandler两个处理器,如图2-14所示,客户端与服务端通信过程中,数据从客户端发向服务端的过程叫出站,对于服务端来说,数据从客户端流入到服务端,这个时候是入站。

<center>图2-14 InBound和OutBound的关系</center>

ChannelHandler事件触发机制

当某个Channel触发了IO事件后,会通过Handler进行解决,而ChannelHandler是围绕I/O事件的生命周期来设计的,比方建设连贯、读数据、写数据、连贯销毁等。

ChannelHandler有两个重要的子接口实现,别离拦挡数据流入和数据流出的I/O事件

  • ChannelInboundHandler
  • ChannelOutboundHandler

图2-15中显示的Adapter类,提供很多默认操作,比方ChannelHandler中有很多很多办法,咱们用户自定义的办法有时候不须要重载全副,只须要重载一两个办法,那么能够应用Adapter类,它外面有很多默认的办法。其它框架中结尾是Adapter的类的作用也大都是如此。所以咱们在应用netty的时候,往往很少间接实现ChannelHandler的接口,常常是继承Adapter类。

<img src="https://mic-blob-bucket.oss-cn-beijing.aliyuncs.com/202111090025881.png" alt="image-20210816200206761" style="zoom:67%;" />

<center>图2-15 ChannelHandler类关系图</center>

ChannelInboundHandler事件回调和触发机会如下

事件回调办法触发机会
channelRegisteredChannel 被注册到 EventLoop
channelUnregisteredChannel 从 EventLoop 中勾销注册
channelActiveChannel 处于就绪状态,能够被读写
channelInactiveChannel 处于非就绪状态
channelReadChannel 能够从远端读取到数据
channelReadCompleteChannel 读取数据实现
userEventTriggered用户事件触发时
channelWritabilityChangedChannel 的写状态发生变化

ChannelOutboundHandler工夫回调触发机会

事件回调办法触发机会
bind当申请将channel绑定到本地地址时被调用
connect当申请将channel连贯到近程节点时被调用
disconnect当申请将channel从近程节点断开时被调用
close当申请敞开channel时被调用
deregister当申请将channel从它的EventLoop登记时被调用
read当申请通过channel读取数据时被调用
flush当申请通过channel将入队数据刷新到近程节点时调用
write当申请通过channel将数据写到近程节点时被调用

事件流传机制演示

public class NormalOutBoundHandler extends ChannelOutboundHandlerAdapter {    private final String name;    public NormalOutBoundHandler(String name) {        this.name = name;    }    @Override    public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {        System.out.println("OutBoundHandler:"+name);        super.write(ctx, msg, promise);    }}
public class NormalInBoundHandler extends ChannelInboundHandlerAdapter {    private final String name;    private final boolean flush;    public NormalInBoundHandler(String name, boolean flush) {        this.name = name;        this.flush = flush;    }    @Override    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {        System.out.println("InboundHandler:"+name);        if(flush){            ctx.channel().writeAndFlush(msg);        }else {            super.channelRead(ctx, msg);        }    }}
ServerBootstrap bootstrap = new ServerBootstrap();bootstrap.group(bossGroup, workerGroup)    //配置Server的通道,相当于NIO中的ServerSocketChannel    .channel(NioServerSocketChannel.class)    //childHandler示意给worker那些线程配置了一个处理器,    // 这个就是下面NIO中说的,把解决业务的具体逻辑形象进去,放到Handler外面    .childHandler(new ChannelInitializer<SocketChannel>() {        @Override        protected void initChannel(SocketChannel socketChannel) throws Exception {            socketChannel.pipeline()                .addLast(new NormalInBoundHandler("NormalInBoundA",false))                .addLast(new NormalInBoundHandler("NormalInBoundB",false))                .addLast(new NormalInBoundHandler("NormalInBoundC",true));            socketChannel.pipeline()                .addLast(new NormalOutBoundHandler("NormalOutBoundA"))                .addLast(new NormalOutBoundHandler("NormalOutBoundB"))                .addLast(new NormalOutBoundHandler("NormalOutBoundC"));        }    });

上述代码运行后会失去如下执行后果

InboundHandler:NormalInBoundAInboundHandler:NormalInBoundBInboundHandler:NormalInBoundCOutBoundHandler:NormalOutBoundCOutBoundHandler:NormalOutBoundBOutBoundHandler:NormalOutBoundA

当客户端向服务端发送申请时,会触发服务端的NormalInBound调用链,依照排列程序一一调用Handler,当InBound解决实现后调用WriteAndFlush办法向客户端写回数据,此时会触发NormalOutBoundHandler调用链的write事件。

从执行后果来看,Inbound和Outbound的事件流传方向是不同的,Inbound流传方向是head->tail,Outbound流传方向是Tail-Head。

异样流传机制

ChannelPipeline工夫流传机制是典型的责任链模式,那么有同学必定会有疑难,如果这条链路中某个handler出现异常,那会导致什么问题呢?咱们对后面的例子批改NormalInBoundHandler

public class NormalInBoundHandler extends ChannelInboundHandlerAdapter {    private final String name;    private final boolean flush;    public NormalInBoundHandler(String name, boolean flush) {        this.name = name;        this.flush = flush;    }    @Override    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {        System.out.println("InboundHandler:"+name);        if(flush){            ctx.channel().writeAndFlush(msg);        }else {            //减少异样解决            throw new RuntimeException("InBoundHandler:"+name);        }    }}

这个时候一旦抛出异样,会导致整个申请链被中断,在ChannelHandler中提供了一个异样捕捉办法,这个办法能够防止ChannelHandler链中某个Handler异样导致申请链路中断。它会把异样依照Handler链路的程序从head节点流传到Tail节点。如果用户最终没有对异样进行解决,则最初由Tail节点进行对立解决

批改NormalInboundHandler,重写上面这个办法。
@Overridepublic void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {    System.out.println("InboundHandlerException:"+name);    super.exceptionCaught(ctx, cause);}

在Netty利用开发中,好的异样解决十分重要可能让问题排查变得很轻松,所以咱们能够通过一种对立拦挡的形式来解决异样解决问题。

增加一个复合处理器实现类

public class ExceptionHandler extends ChannelDuplexHandler {    @Override    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {        if(cause instanceof RuntimeException){            System.out.println("解决业务异样");        }        super.exceptionCaught(ctx, cause);    }}

把新增的ExceptionHandler增加到ChannelPipeline中

bootstrap.group(bossGroup, workerGroup)    //配置Server的通道,相当于NIO中的ServerSocketChannel    .channel(NioServerSocketChannel.class)    //childHandler示意给worker那些线程配置了一个处理器,    // 这个就是下面NIO中说的,把解决业务的具体逻辑形象进去,放到Handler外面    .childHandler(new ChannelInitializer<SocketChannel>() {        @Override        protected void initChannel(SocketChannel socketChannel) throws Exception {            socketChannel.pipeline()                .addLast(new NormalInBoundHandler("NormalInBoundA",false))                .addLast(new NormalInBoundHandler("NormalInBoundB",false))                .addLast(new NormalInBoundHandler("NormalInBoundC",true));            socketChannel.pipeline()                .addLast(new NormalOutBoundHandler("NormalOutBoundA"))                .addLast(new NormalOutBoundHandler("NormalOutBoundB"))                .addLast(new NormalOutBoundHandler("NormalOutBoundC"))                .addLast(new ExceptionHandler());        }    });

最终,咱们就可能实现异样的对立解决。

版权申明:本博客所有文章除特地申明外,均采纳 CC BY-NC-SA 4.0 许可协定。转载请注明来自 Mic带你学架构
如果本篇文章对您有帮忙,还请帮忙点个关注和赞,您的保持是我一直创作的能源。欢送关注同名微信公众号获取更多技术干货!