关于java:Netty-NIO-Server启动流程

34次阅读

共计 16508 个字符,预计需要花费 42 分钟才能阅读完成。

欢送关注公众号: NullObject,欢送扫码关注:

文章首发在集体博客 https://www.nullobject.cn,公众号 NullObject 同步更新。

本文基于 Java 8Netty 4.1.69.Final

Netty 是一个 Java 异步网络通信框架。Netty 中基于 Java NIO 实现了异步 IO 通信,其中实现的 NioServerSocketChannel/NioSocketChannel 等相干的组件,实际上就是对 Java NIO 绝对组件的封装。本文将从 Java NIO 启动过程开始,对 Netty 中 NIO Server 的启动流程进行学习和剖析。

1.0 NIO Server 启动流程

​ Netty 中 NioServerSocketChannel 等组件是在 Java NIO 根底上封装的,因而,在开始学习 Netty NIO 启动流程之前,先理清 Java NIO 的应用步骤,以便接下来的剖析中更易于了解 Netty Server 启动和运行过程。

1.1 启动步骤概述

NIO Server 启动和初始化次要通过以下步骤:

  • 初始化选择器 / 多路复用器 Selector
  • 初始化服务器通道 ServerSocketChannel 并配置异步工作模式
  • 将 ServerSocketChannel 注册到 Selector,获取对应的 SelectionKey 并注册须要关注的 Accept 事件
  • 绑定监听地址端口
  • 解决客户端连贯
  • 接收数据

启动流程图:

1.2 注册 ServerSocketChannel

须要将服务端通道 ssc 注册到选择器 selector 上,并注册须要关注的事件:

mKey = mSSC.register(mSelector,0);
mKey.interestOps(SelectionKey.OP_ACCEPT);

ServerSocketChannel#register 办法返回一个SelectionKey 对象,selectionKey 对象用于示意以后 SocketChannel 及其关联的 Selector 的注册关系,外部持有 SocketChannel 和 Selector 单方对象援用,并负责保护以后通道关注的事件以及曾经产生 / 就绪事件信息;SelectionKey#interestOps 办法用于注册对应的 Channel 所须要关注的事件,可关注的事件封装在 SelectionKey 中:

  • OP_READ

    读事件 ,示意 Selector 检测到其所关联的 Channel 中 有可读事件产生,包含 Channel 数据读取就绪、已达到流末端、近程读取通道被敞开或 Channel 中产生谬误挂起等,随后 Selector 会将 OP_READ 事件增加到 SelectionKey 对象中,并标识该事件为就绪事件。

  • OP_WRITE

    写事件 ,示意 Selector 检测到其所关联的 Channel 中 有可写事件产生,包含 Channel 数据写出就绪、近程写入通道被敞开,或 Channel 中产生谬误挂起等,随后 Selector 会将 OP_WRITE 事件增加到对应的 SelectionKey 中,并标识该事件为就绪事件。

  • OP_CONNECT

    连贯事件 ,示意 Selector 检测到所关联的 Channel 中 有连贯事件产生,包含 Channel 连贯结束就绪或 Channel 中产生谬误等,该事件通常产生在 NIO Client 胜利连贯服务器后,Selector 会将OP_CONNECT 事件增加到对应的 SelectionKey 中,并标识该事件为就绪事件。

  • OP_ACCEPT

    承受事件,标识 Selector 检测到所关联的 ServerSocketChannel 已准备就绪承受新到来的客户端连贯申请,或通道中产生了谬误,随后 Selector 会将 OP_ACCEPT 事件增加到对应的 SelectionKey 中,并标识该事件为就绪事件。

1.3 Selector 收集 IO 事件

Selector 是 NIO 中的多路复用器,其能够同时治理多个 IO 通道。注册 serverSocketChannel 之后,开始循环调用 selector.select()收集 serverSocketChannel 通道上产生的事件:

// 检测 selector 关联的所有 Channel 中已就绪的 IO 事件
// count:产生的 IO 事件数量
int count = mSelector.select();

select()办法会阻塞以后线程,直到收集到 IO 事件产生,才会持续运行。如果须要提前结束阻塞,能够调用其重载办法 mSelector.selector(interval),指定阻塞时长,当超过该时长没有收集到 IO 事件,则完结阻塞。或者在其余线程调用 mSelector.wakeup() 手动完结阻塞,继续执行,wakeup()执行后,select()办法会立刻返回。

1.4 解决客户端连贯申请

当检测到有 OP_ACCEPT 事件产生,示意有新的客户端连贯申请,须要调用 ServerSocketChannel#accept() 办法承受并建设客户端连贯,而后将该客户端连贯通道 SocketChannel 注册到 Selector 上,并注册读信息事件,以接管解决客户端发送的数据:

private void handleNewConnection(final SelectionKey key) throws IOException {
    // 解决新接入的申请音讯
    // Accept the new connection
    ServerSocketChannel serverSocketChannel = (ServerSocketChannel) key.channel();
    // 承受新的连贯,创立 SocketChannel 示例,至此,相当于实现了 TCP 的三次握手,TCP 物理链路正式建设。SocketChannel socketChannel = serverSocketChannel.accept();
    // 设置为异步非阻塞模式
    socketChannel.configureBlocking(false);
    // 将新连贯注册到多路复用器,关联一个 bytebuffer 附件以寄存接管到的数据
    ByteBuffer buffer = ByteBuffer.allocate(INIT_READ_BUFFER_SIZE);
    // Register the new connection to the selector
    socketChannel.register(mSelector, SelectionKey.OP_READ, buffer);
}

1.5 接收数据

当检测到 OP_READ 事件产生,示意通道中接管到对端发送的数据,或者与对方的连贯断开,这时能够从 SocketChannel 中尝试读取数据:

private void handleReadData(final SelectionKey key) throws IOException {
    // Read the data
    // 读取客户端的申请音讯
    // 获取连贯通道实例
    SocketChannel socketChannel = (SocketChannel) key.channel();
    // 获取关联的 buffer 附件
    ByteBuffer readBuffer = (ByteBuffer) key.attachment();
    // 从 TCP 缓冲区读数据
    int readBytes;
    if ((readBytes = socketChannel.read(readBuffer)) > 0) {
        // 解决读取到的数据
        handleData(readBuffer);
        // 决定是否须要扩容
        if (readBuffer.position() == readBuffer.limit()) {final ByteBuffer buffer = ByteBuffer.allocate(readBuffer.capacity() * 2);
            readBuffer.flip();
            buffer.put(readBuffer);
            key.attach(buffer);
        }
    }
    // 链路曾经敞开,此时须要敞开 SocketChannel,开释资源
    if (readBytes < 0) {System.out.println("connection closed:" + socketChannel.getRemoteAddress());
        // 对端链路敞开
        key.cancel();
        socketChannel.close();}
}

能够看到,相比于传统的 Java BIO,NIO 的应用过程要简单一些,但得益于其 同步非阻塞 的 IO 模型,使 NIO 解决高并发与海量连贯成为可能。而 Netty 对 NIO 做了进一步的优化和封装,简化了异步 IO 的开发流程。

理清了 NIO 的启动流程,接下来剖析 Netty 中对 NIO 每个关键步骤的封装和执行过程。

2.0 启动 Netty Server

​ 能够十分不便地启动一个 Netty Server,通过简略的链式调用初始化相干组件,最初绑定端口即可启动 Socket 监听:

@Slf4j
public class TestSourceServer {public static void main(String[] args) {final ServerBootstrap bootstrap = new ServerBootstrap();
      // 执行 IO 事件的事件循环组
        final NioEventLoopGroup bossGroup = new NioEventLoopGroup();
      // 配置服务端启动器
        bootstrap.group(bossGroup)
                 .channel(NioServerSocketChannel.class)
                 .childHandler(new ChannelInitializer<NioSocketChannel>() {
                     @Override
                     protected void initChannel(final NioSocketChannel ch) throws Exception {ch.pipeline().addLast(new LoggingHandler());
                     }
                 });
      // 开始监听指定端口
        bootstrap.bind(8080);
    }
}

ServerBootStrap 是 Netty 服务端的启动器,通过 serverBootStrap 对象的一系列链式调用初始化各个组件:

  • group(bossGroup):配置用于执行 IO 事件 / 一般事件的事件循环组。
  • channel(NioServerSocketChannel.class):配置须要应用的 socket 服务端组件,本文剖析的是 NIO 相干的启动流程,所以须要配置服务端组件为 NioServerSocketChannel.class。
  • childHandler(initializer):配置客户端连贯通道初始化器。服务端建设与客户端连贯通道 SocketChannel 后,Netty 会通过调用链最终回调 initChannel 办法告诉用户 socketChannel 通道曾经创立好,此时用户能够在此对 socketChannel 进行一些业务上的初始化工作,比方设置音讯编解码器、增加日志处理器等。下面示例中,在每一次创立的客户端连贯通道中的事件处理管道上都增加了一个日志处理器 LoggingHandler。

serverBootstrap 对象在执行 ind()办法绑定端口之前的所有链式调用,都是对服务端进行一些配置,真正启动服务端监听,是从执行 bind()办法开始。

3.0 注册 ServerSocketChannel

3.1 创立 ServerSocketChannel 对象

​ 接下来以 debug 模式启动第 1 节中的示例,在 bootstrap.bind(8080) 一行开始执行,始终到 AbstractBootstrap#doBind() 办法(Line 272 ):

办法中第一行调用了一个 initAndRegister() 办法并返回一个 ChannelFuture 对象,从返回参数能够推断该办法外部执行了异步操作,并且依据办法名能够猜测,该办法内可能实现了 注册 serverSocketChannel这一步骤。接着往下执行进入 initAndRegister() 办法一探到底:

能够看到,办法中先是通过 channelFactory 工厂对象创立了一个 channel 对象Channel 是 Netty 中定义的形象接口,用于示意 Netty 中具备 IO 通信能力的组件,示例中的 NioServerSocketChannel 类便实现了 Channel 接口。接着往下执行,发现创立的 channel 对象正是在配置 serverBootStrap 时配置的 NioServerSocketChannel 类的实例:

在配置 serverBootStrap 步骤中,通过 bootstrap.channel(NioServerSocketChannel.class) 办法设置了实现服务端监听的具体组件,办法中创立了一个 ReflectiveChannelFactory 对象并将 NioServerSocketChannel 的类型信息传入,再将 reflectiveChannelFactory 对象赋值给 channelFactory 域,实现 Server IO Channel 组件的配置:

而 reflectiveChannelFactory 对象中的 newChannel 的实现,则通过反射调用 NioServerSocketChanel 的无参构造方法,实现 nioServerSocketChannel 对象的创立。因而,接下来程序会运行跳转到 NioServerSocketChannel 类的无参构造方法:

NioServerSocketChannel 类的构造方法中,又通过 newSocket 办法和调用了 selectorProvider.openServerSocketChannel()办法,最终实现对 Java NIO 的 ServerSocketChannel 对象的创立:

这里间接通过 selectorProvider.openServerSocketChannel() 创立 ServerSocketChannel 对象,而在 NIO 启动步骤中调用 ServerSocketChannel.open() 创立的形式,其外部也是通过 selectorProvider.openServerSocketChannel()办法来创立:

程序到这里,①实现了创立 ServerSocketChannel 的步骤

3.2 执行 channel 注册

​ 持续回到 abstractBootStrap.initAndRegister() 办法,获取到 channel 对象后,通过 init(channel)办法对 channel 进行初始化,而后进行注册:

其中 config().group() 获取到的是在初始化 serverBootStrap 配置时设置的事件循环组对象 bossGroup,config().group().register(channel)形象办法的实现位于 NioEventLoopGroup 的父类 MultithreadEventLoopGroup 中:

// MultithreadEventLoopGroup.java Line 85
@Override
public ChannelFuture register(Channel channel) {return next().register(channel);
}

next()办法会从初始化 bossGroup 时创立的事件循环池 (能够了解为线程池,一个事件循环对应一个线程,事件循环对应类是 NioEventLoop) 中抉择一个事件循环对象来执行 register(channel)办法。nioEventLoop.register()形象办法的实现位于 NioEventLoop 的父类 SingleThreadEventLoop 类中:

接着运行到 AbstractChannel$AbstractUnsafe.register 办法 (AbstractChannel.java Line 465),而该办法最终又调用 register0(promise)办法实现了注册的细节:

这里应用传入的 eventLoop 对象初始化了 channel 外部的 eventLoop 对象,并通过 eventLoop.inEventLoop()办法判断是否将注册工作交由 eventLoop 事件循环对象来执行,理论就是判断 eventLoop 中的线程是否与以后线程为同一线程环境,来决定是间接调用 register0 办法,还是交给 eventLoop。留神 这里的 eventLoop 对象在 nioBossGroup 事件循环组对象创立时便曾经被创立进去,但 eventLoop 中所蕴含的子线程对象尚未被初始化,并没有真正启动线程,只有在第一次向 eventLoop 提交异步工作时,eventLoop 才会初始化并启动线程,也就是将线程提早启动,这也是 Netty 中对应用线程资源的优化措施。接着进入 regster0 办法,外部又调用了 doRegster() 办法进行真正的注册:

通过 javaChannel() 获取后面步骤中创立的 java NIO ServerSocketChannel 对象,通过 register 办法将其注册到 selector 中,最初将以后 channel 对象作为附件保留到 selectionKey(在后续解决 IO 事件中,会以从 selectionKey 获取 selectionKey 关联的 channel 对象),并将注册后果 (SelectionKey) 赋值给以后 channl 的 selectionKey 域。至此,②Netty 中 ServerSocketChannel 注册结束

4.0 创立 Selector

​ 在 serverSocketChannel 到 selector 时,看到 selector 对象是通过 eventLoop().unwrappedSelector() 获取的,那 selector 对象是何时创立的呢?首先通过 eventLoop()获取以后 channel 对象的 eventLoop 对象,再通过 eventLoop.unWrappedSelector()办法获取到 nioEventLoop 内的 unwrappedSelector 对象作为 channel 的 selector,unwrappedSelector 则在 NioEventLoop 中被初始化:先调用 openSelector()并返回一个 SelectorTuple 对象,再通过 selectorTuple.unwrappedSelector 对 unwrappedSelector 进行赋值,openSelector()办法中,则最终调用了 selectorProvider.openSelector()办法创立了 selector,并将创立的 selector 包装到 selectorTuple 中返回:

其中 provider 就是 SelectorProvider 对象实例。至此,③Selector 创立实现

这里没有间接应用 selectorProvider.openSelector()创立的 selector,而是先将其封装了一个 selectorTuple 对象中再应用 unwrappedSelector,是因为 Netty 在封装该步骤时做了性能上的一些优化,具体过程在 NioEventLoop 类的 selector()办法实现中。

5.0 bind 开始监听

​ 程序调用 AbstractNioChannel.doRegister()办法实现 serverSocketChannel 注册后,接着开始绑定监听端口。回到 AbstractChannel.register0()办法中,doRegister()执行结束,将 registered 标记为 true,示意以后通道曾经注册结束,接着通过 pipeline.fireChannelRegistered() 办法将 serverSocketChannel 注册实现的事件发送到事件处理管道中以告诉下层代码解决。回到 AbstractBootstrap.doBind()办法中,initAndRegister()执行完通道注册流程,doBind()持续向下运行,通过 regFuture 判断注册曾经实现还是在异步注册,不论如何,接下来都会持续调用 doBind0 办法进行端口绑定。doBind0 办法中,则将绑定工作交给 serverSocketChannel 的事件循环来执行:

最终在 AbstractChannel$AbstractUnsafe.bind 办法中调用 doBind 实现端口绑定,NioServerSocketChannel 继承了 AbstractChannel 并实现了 doBind:

能够看到,NioServerSocketChannel.doBind 实现中,调用了 Java NIO ServerSocketChannel.bind()办法实现端口绑定,④Netty Server 绑定监听端口步骤至此实现

6.0 关注 ACCEPT 事件

​ AbstractChannel$AbstractUnsafe.bind()办法中实现端口绑定后,接着持续向像 eventLoop 中提交一个工作用于向 serverSocketChannel 的事件处理管道发送 channelActive 事件以告知下层代码解决:

pipeline.fireChannelActive() 调用链 中,执行到 DefaultChannelPipeline&dollar;HeadContext.channelActive():

ctx.fireChannelActive()会将事件持续传递,而 readIfIsAutoRead() 往下运行则实现了注册 OP_ACCEPT 事件。readIfIsAutoRead()始终运行到 AbstractNioChannel.doBeginRead()办法,并在该办法中实现注册 OP_READ 事件:

先从 serverSocketChannel 对应的 selectionKey 判断,如果 channel 尚未注册感兴趣的事件,则调用 selectionKey.interestOps 办法进行注册:

能够看到,未注册感兴趣事件前,channel 以后注册的事件值为 0,0 示意未注册任何事件;serverSocketChannel 须要注册的事件 (readInterestOp 域 ) 值未为 16,即对应 OP_ACCEPT 事件,readInterestOp 域在创立 srverSocketChannel 对象时通过向父类构造方法传入 SelectionKey.OP_ACCEPT 来赋值:

执行完 doBeginRead,⑤关注 ACCEPT 事件步骤实现。

至此,Netty NIO Server 的启动和初始化过程完结,接下来开始监听和解决客户端连贯申请,接管客户端发送的数据等。

7.0 收集和解决 IO 事件

7.1 启动 eventLoop 线程

​ 在 NIO 运行过程中,为了不阻塞主线程的运行,同时保障 selector 可能始终一直地收集到产生的 IO 事件,通常会将 selector.select()和解决 IO 事件的代码放到子线程中执行。Netty 中也是在子线程中收集和解决所产生的 IO 事件:将这些工作交给 serverSocketChannel 的 eventLoop 事件循环对象异步执行。上文中提到 eventLoop 在第一次执行工作时才会初始化外部线程对象并启动线程。回顾 Netty 的启动步骤,程序第一次向 eventLoop 提交工作是在 serverSocketChannel 的注册过程,进行真正注册阶段的代码,位于 AbstractChannel&dollar;AbstractUnsafe.register()办法中(Line 483 ):

通过 eventLoop.execute 办法提交 register0 工作。这里 eventLoop 对象是 NioEventLoop 类的实例,其 execute 办法实现在 NioEventLoop 父类 SingleThreadEventExecutor 中:

再看公有的 execute(runnable,boolean)办法,先是通过 inEventLoop()办法判断提交工作的线程环境与以后事件循环的子线程环境是否为同一个事件循环的子线程。跟进去不难发现,首次提交工作时其实就是判断 eventLoop 外部线程是否曾经启动(因为线程还未启动,所以这里会返回 false):

// SingleThreadEventExecutor(Line 558)
// thread: Thread.currentThread()
// this.thread: eventLoop 外部线程,未执行过工作前,this.thread 还未启动,this.thread 值为 null
@Override
public boolean inEventLoop(Thread thread) {return thread == this.thread;}

接着将工作增加到工作队列,判断如果以后 eventLoop 对象的线程尚未开始运行,则调用 startThread()->doStartThread() 办法对线程初始化:

能够看到,最终是通过 eventLoop 外部的线程池对象 executor 创立出线程,并赋值给 eventLoop 的 thread 对象,实现初始化工作,紧接着调用 SingleThreadEventExecutor.run()办法,启动无线循环。NioEventLoop 类中实现了 SingleThreadEventExecutor.run()办法:

// NioEventLoop.java Line 435
@Override
protected void run() {
  int selectCnt = 0;
  for (;;) {
    try {
      int strategy;
      try {strategy = selectStrategy.calculateStrategy(selectNowSupplier, hasTasks());
        switch (strategy) {
          case SelectStrategy.CONTINUE:
            continue;

          case SelectStrategy.BUSY_WAIT:
            // fall-through to SELECT since the busy-wait is not supported with NIO

          case SelectStrategy.SELECT:
            long curDeadlineNanos = nextScheduledTaskDeadlineNanos();
            if (curDeadlineNanos == -1L) {curDeadlineNanos = NONE; // nothing on the calendar}
            nextWakeupNanos.set(curDeadlineNanos);
            try {if (!hasTasks()) {strategy = select(curDeadlineNanos);
              }
            } finally {
              // This update is just to help block unnecessary selector wakeups
              // so use of lazySet is ok (no race condition)
              nextWakeupNanos.lazySet(AWAKE);
            }
            // fall through
          default:
        }
      } catch (IOException e) {
        // If we receive an IOException here its because the Selector is messed up. Let's rebuild
        // the selector and retry. https://github.com/netty/netty/issues/8566
        rebuildSelector0();
        selectCnt = 0;
        handleLoopException(e);
        continue;
      }

      selectCnt++;
      cancelledKeys = 0;
      needsToSelectAgain = false;
      final int ioRatio = this.ioRatio;
      boolean ranTasks;
      if (ioRatio == 100) {
        try {if (strategy > 0) {processSelectedKeys();
          }
        } finally {
          // Ensure we always run tasks.
          ranTasks = runAllTasks();}
      } else if (strategy > 0) {final long ioStartTime = System.nanoTime();
        try {processSelectedKeys();
        } finally {
          // Ensure we always run tasks.
          final long ioTime = System.nanoTime() - ioStartTime;
          ranTasks = runAllTasks(ioTime * (100 - ioRatio) / ioRatio);
        }
      } else {ranTasks = runAllTasks(0); // This will run the minimum number of tasks
      }

      if (ranTasks || strategy > 0) {if (selectCnt > MIN_PREMATURE_SELECTOR_RETURNS && logger.isDebugEnabled()) {logger.debug("Selector.select() returned prematurely {} times in a row for Selector {}.",
                       selectCnt - 1, selector);
        }
        selectCnt = 0;
      } else if (unexpectedSelectorWakeup(selectCnt)) {// Unexpected wakeup (unusual case)
        selectCnt = 0;
      }
    } catch (CancelledKeyException e) {
      // Harmless exception - log anyway
      if (logger.isDebugEnabled()) {logger.debug(CancelledKeyException.class.getSimpleName() + "raised by a Selector {} - JDK bug?",
                     selector, e);
      }
    } catch (Error e) {throw e;} catch (Throwable t) {handleLoopException(t);
    } finally {
      // Always handle shutdown even if the loop processing threw an exception.
      try {if (isShuttingDown()) {closeAll();
          if (confirmShutdown()) {return;}
        }
      } catch (Error e) {throw e;} catch (Throwable t) {handleLoopException(t);
      }
    }
  }
}

7.2 收集 IO 事件

​ 在子线程执行的有限循环中,eventLoop 一直解决 IO 事件以及程序中提交到 eventLoop 队列中的非 IO 工作 (NioEventLoop 外部不仅会解决 IO 事件,同时还会执行一般工作:包含 Netty 程序外部提交的工作,或者用户提交的工作)。Netty NioEventLoop 事件循环中解决 IO 事件,也是先调用 selector 收集产生的 IO 事件( 收集阶段 ),再遍历后果进行一一解决( 解决阶段 )。只不过因为 Netty 的封装和优化,这过程看起来要比 Java NIO 中的解决步骤要简单多一些。首先看 IO 事件收集阶段 的执行过程,eventLoop 中联合了 selector 的 select()select(interval)selectNow() 办法来实现收集 IO 事件,执行过程又分为以后 有一般工作 没有一般工作 两种状况(通过 hasTasks() 判断)进行。

  • eventLoop 中有一般工作待处理 :eventLoop 工作队列以后有未解决的工作时,间接执行 selector.selectNow() 办法,selectNow()办法不论有没有 IO 事件产生,都会立刻返回收集后果,不会阻塞线程运行,这过程对应下图代码中的步骤 :selectStrategy.calculateStrategy 办法中判断如果以后有未解决的一般工作,则调用 selectNowSupplier.get()办法,该办法实现中调用了 selector.selectNow()并返回收集后果,若没有待处理的一般工作,则将 SelectStrategy.SELECT 返回赋值给 strategy 变量;

  • eventLoop 工作队列中没有待处理工作: 这时程序会执行到 swith 的 SELECT 分支,对应上图中步骤 :此处又判断是否存在未解决的一般工作是因为提交和执行工作是异步执行,在步骤 执行到步骤 期间程序其余中央可能会提交工作进来。当工作队列仍旧为空时,调用 select(interval)并返回收集后果给 strategy 变量。select(interval)办法进一步判断决定是执行 selector.selectNow()还是 selector.select(interval),selector.select(interval)执行如果在 interval 指定的工夫内没有事件产生则完结阻塞,返回后果:
// NioEventLoop.java Line 808
private int select(long deadlineNanos) throws IOException {if (deadlineNanos == NONE) {return selector.select();
  }
  // Timeout will only be 0 if deadline is within 5 microsecs
  long timeoutMillis = deadlineToDelayNanos(deadlineNanos + 995000L) / 1000000L;
  return timeoutMillis <= 0 ? selector.selectNow() : selector.select(timeoutMillis);
}

7.3 解决 IO 事件

从下面收集 IO 事件的过程能够看到,strategy 变量作用就是记录收集到的 IO 事件数量(strategy>0 阐明有 IO 事件产生并收集到了)。接着到 IO 事件处理阶段,对应的要害代码:

其中依据判断条件执行了 processSelectedKeys()或 runAllTasks()办法,processSelectedKeys()实现了接管和解决 IO 事件,runAllTask()则实现了执行 eventLoop 队列中的工作 。这里除了 strategy,还有个要害的局部变量 ioRatio:因为解决 IO 事件是一个相当耗时的过程,为了均衡解决 IO 事件和执行队列工作的工夫,保障 eventLoop 队列中的非 IO 工作可能被执行,这段代码通过 ioRatio 变量来管制解决 IO 事件和执行队列工作所耗时间的比例。若本轮循环中收集到 IO 事件(strategy>0 ),则通过 processSelectedKeys() 一一解决收集到的 IO 事件。

7.3.1 解决连贯申请

processSelectedKeys() 办法的调用链中,会遍历产生的 IO 事件集,并交给 processSelectedKey(SelectionKey k, AbstractNioChannel ch) 办法解决:

先是获取到产生的 IO 事件,判断如果是 OP_ACCEPT 或是 OP_READ 事件则调用 ch.unsafe().read()进行进一步解决。接下来在 processSelectedKey(SelectionKey k, AbstractNioChannel ch) 办法内打下断点,调试模式启动 Netty Server 端监听 8080 端口,并应用 TCP 客户端工具开启一个连贯申请:

点击开始连贯,返回 Idea,发现程序曾经执行到办法第一行断点处,持续往下运行到 Line 718 处:

首先步骤 1 先获取 IO 通道对象 ch 的外部类 NioUnsafe 对象实例 unsafe,留神此时 serverSocketChannel 接管到的该当是客户端连贯申请事件 OP_ACCEPT(方才在 TCP 客户端点击发动了连贯申请),ch 对应的是 NioServerSocketChannel 类,此时 ch 对象示意的是服务端 channel 对象 serverSocketChannel,对应的 ch.unsafe()对象的实现位于 NioServerSocketChannel 的父类 AbstractNioMessageChannel 中 。接着步骤 2 获取 SelectionKey 中所有收集到的事件并赋值给 readyOps 变量,通过单步调试能够看到,此时 readyOps 的值是 16,对应的正是 OP_ACCEPT 事件,因而程序会运行到 Line 719,执行unsafe.read()->doReadMessages(readBuf) 解决客户端连贯申请:

SocketUtils.accept 办法中实现了 serverSocketChannel.accept()办法调用,承受客户端连贯申请并返回对应的 NIO 连贯通道对象,接着用 NIO channel 对象初始化 Netty 的 NioSocketChannel 客户端连贯通道对象,并将 Netty socketChannel 返回给下层调用。至此,⑥承受客户端连贯申请,创立连贯通道步骤实现。最初,将承受连贯事件通过 pipeline.fireChannelRead 办法发送至 serverSocketChannel 的事件处理管道,告诉下层程序和用户,能够对客户端连贯通道 socketChannel 做一些进一步的初始化工作(包含 SocketChannel 的注册和用户做的业务上的初始化内容)。接着往下执行,最终会回调到用户在初始化 serverBootStrap 时设置的 childHandler 对象办法中:

这里的 NioSocketChannel 对象,就是在承受客户端连贯时创立连贯通道。在这里通过 ch.pipeline().addLast()办法,把自定义的 handler 增加到 socketChanel 的事件处理管道 pipeline 中,接下来便能够在 handler 中接管解决和发送音讯数据。

7.3.2 接管 IO 音讯

​ Netty Server 承受并创立客户端连贯通道后,就能够相互收发音讯了。无关客户端连贯通道 NioSocketChannel 对象的创立和初始化,与服务端通道 NioServerSocketChannel 对象的初始化过程根本大致相同,这里不再赘述。从后面的步骤中晓得,eventLoop 解决 IO 事件最终都会运行到 processSelectedKey(SelectionKey k, AbstractNioChannel ch) 办法来进行解决,在这个办法第一行打上断点,并服务端程序放弃运行,从 TCP 客户端发送字串“Hello”给服务器:

接着回到 Idea 会看到程序曾经运行断点处:

能够看到,这时的 ch 对象为客户端连贯通道 NioSocketChannel 的实例,收集到的 IO 事件值为 1,对应 OP_READ 读事件 (方才客户端向服务端发送了数据), 此时的 unsafe.read()办法对应的实现类是位于 NioSocketChannel 父类 AbstractNioByteChannel 中的 NioByteUnsafe 类 ,接着运行能够看到,程序最初在 NioSocketChannel.doReadBytes(byteBuf) 办法中实现对 socketChannel 中数据的读取:

最初将读取到的音讯数据通过 socketChannel.pipeline().fireChannelRead(byteBuf)办法将读到的数据传递给设置的 handler 处理器进行解决:

至此,⑦Netty Server 端接管并解决用户音讯步骤执行结束

8.0 小结

​ 以上便是 Netty Server 从初始化配置,启动服务端,初始化 NIO ServerSocketChannel、Selector 等组件,监听端口,到承受建设客户端连贯通道,接管并解决客户端音讯的整个过程。本文的指标在于理清 Netty Server 启动和初始化的整体过程,文章中根据 java NIO 的启动流程对 Netty Server 的运行过程进行了梳理,整个剖析过程并没有太大难度。而文章中没有剖析阐明的诸如事件循环 EventLoopGroup、EventLoop 组件,Channel 的事件处理管道 Pipeline 组件、事件处理器 Handler 相干组件、和音讯相干的 Bytebuf 组件等这些 Netty 的组成部分,是进一步学习了解 Netty 的要害,在后续对 Netty 的钻研中,将尝试对这些组件进行一一解读。

正文完
 0