关于Netty:netty

86次阅读

共计 5139 个字符,预计需要花费 13 分钟才能阅读完成。

根本组件

  • NioEventLoop: 监听客户端连贯和解决客户端读写
  • Channel: 对一个 Socket 连贯的封装,可进行数据的读写
  • Pipline: 对数据的逻辑解决链
  • ChannelHandler: Pipline 里的一个解决逻辑
  • ByteBuf: 字节缓冲的容器

Netty 服务端启动步骤:

1. 创立服务端 Channel

创立 JDK 定义的 Channel,将它包装成 netty 的 Channel,并创立一些根本组件绑定在 Channel 上

newChannel 办法里通过反射调用 clazz.newInstance()返回一个 Channel 对象,这个 clazz 的类型是在创立 ServerBootStrap 的时候传入的 NioServerSocketChannel。

ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workerGroup)
    .channel(NioServerSocketChannel.class)
    .childOption(ChannelOption.TCP_NODELAY, true)
    .childAttr(AttributeKey.newInstance("childAttr"), "childAttrValue")
    .handler(new ServerHandler())
    .childHandler(null);

NioServerSocketChannel 的创立流程:

2. 初始化服务端 Channel

初始化一些根本属性,增加一些逻辑解决

3. 注册 Selector

将 JDK 的 Channel 注册到事件轮询器 Selector 上,并把 netty 的服务端 Channel 作为一个 attachment 绑定在 JDK 的 Channel 上

4. 端口绑定

调用 JDK API,实现对本地端口的监听

AbstractChannel#bind

public final void bind(final SocketAddress localAddress, final ChannelPromise promise) {
    ...

    // wasActive 示意端口绑定之前的状态
    boolean wasActive = isActive();
    try {// 调用 javaChannel().bind 将 JDK 的 channel 绑定到指定端口
        doBind(localAddress);
    } catch (Throwable t) {safeSetFailure(promise, t);
        closeIfClosed();
        return;
    }
    // 端口绑定之后,isActive 返回 true
    if (!wasActive && isActive()) {invokeLater(new Runnable() {
            @Override
            public void run() {pipeline.fireChannelActive();
            }
        });
    }

    safeSetSuccess(promise);
}

fireChannelActive 办法流传事件调用到 doBeginRead 办法,增加 selectionKey 感兴趣的事件。
就是在服务端端口绑定胜利之后,开始承受连贯申请

protected void doBeginRead() throws Exception {
    // selectionKey 是注册服务端 channel 到 selector 上的时候返回的
    final SelectionKey selectionKey = this.selectionKey;
    if (!selectionKey.isValid()) {return;}

    readPending = true;
    // 获取以后感兴趣的事件,初始值是 0
    final int interestOps = selectionKey.interestOps();
    if ((interestOps & readInterestOp) == 0) {
        // 增加感兴趣的事件,readInterestOp 代表的是 accept 事件,承受申请
        selectionKey.interestOps(interestOps | readInterestOp);
    }
}

NioEventLoop

NioEventLoop 创立

NioEventLoop 在 NioEventLoopGroup 构造函数中被创立,默认创立 2*CPU 核数个 NioEventLoop
每个 NioEventLoop 持有一个线程。

newChild()办法创立 NioEventLoop,次要做了三件事:

  • 保留线程执行器 ThreadPerTaskExecutor
  • 为 NioEventLoop 创立一个 Selector
  • 创立一个 MpscQueue 工作队列

ThreadPerTaskExecutor: 执行每个工作都去创立一个线程去执行

chooserFactory.newChooser()
chooser 的作用: 采纳循环的形式为新连贯绑定一个 NioEventLoop

NioEventLoop 启动

NioEventLoop 启动触发条件

  • 服务端启动绑定端口

  • 新连贯接入通过 chooser 绑定一个 NioEventLoop

NioEventLoop 执行

protected void run() {for (;;) {
        try {switch (selectStrategy.calculateStrategy(selectNowSupplier, hasTasks())) {
                case SelectStrategy.CONTINUE:
                    continue;
                case SelectStrategy.SELECT:
                    // 轮询 I / O 事件
                    select(wakenUp.getAndSet(false));

                    if (wakenUp.get()) {selector.wakeup();
                    }
                default:
                    // fallthrough
            }

            cancelledKeys = 0;
            needsToSelectAgain = false;
            final int ioRatio = this.ioRatio;
            if (ioRatio == 100) {
                try {processSelectedKeys();
                } finally {
                    // Ensure we always run tasks.
                    runAllTasks();}
            } else {final long ioStartTime = System.nanoTime();
                try {processSelectedKeys();
                } finally {
                    // Ensure we always run tasks.
                    final long ioTime = System.nanoTime() - ioStartTime;
                    runAllTasks(ioTime * (100 - ioRatio) / ioRatio);
                }
            }
        } catch (Throwable t) {handleLoopException(t);
        }
        // Always handle shutdown even if the loop processing threw an exception.
        try {if (isShuttingDown()) {closeAll();
                if (confirmShutdown()) {return;}
            }
        } catch (Throwable t) {handleLoopException(t);
        }
    }
}

整个执行过程次要做三件事:

  • select(): 查看是否有 I / O 事件
  • processSelectKeys(): 解决 I / O 事件
  • runAllTasks: 解决异步工作队列
  1. select():

select()办法也是在一个 for(;;) 循环里执行

  • 判断 select 操作是否超过截止工夫
  • 执行阻塞式的 select
  • 为防止 jkd 空轮训的 bug,查看空轮训次数;如果超过 512 就创立一个新的 selector,并将原有 selector 的 selectionKey 赋给新的 selector
  1. processSelectKeys()

对 NioEventLoop 读取到的 selectionKey 对应的事件进行解决

  1. runAllTasks()

工作有两种,一种是一般的工作,另一种是定时工作。在解决之前,工作被增加到了不同的队列里,所以要先把两种工作合并到一个工作队列,而后顺次执行。
此外,还要查看以后工夫是否超过了容许执行工作的工夫,如果超时就间接中断,进行下一次 NioEventLoop 的执行循环。

新连贯接入

检测新连贯

总体流程

在服务端的 NioEventLoop 执行的第二个过程 processSelectKeys 中检测出 accept 事件后,通过 JDK 的 accept 办法创立 JDK 的一个 channel

创立 NioSocketChannel

逐层调用父类构造函数,将服务端 NioServerSocketChannel 和 accept()创立的 channel 作为参数;设置阻塞模式为 false,保留读事件,创立 unsafe、pipline 组件,禁止 Nagel 算法

服务端 channel 在检测到新连贯并创立完 NioSocketChannel 后,会通过 ServerBootStrapAcceptor 对 NioSocketChannel 做一些解决:

  • 增加 childHandler
  • 设置 options 和 attrs
  • 通过 chooser 在 childGroup 中抉择 NioEventLoop 并注册 selector

创立实现之后,调用 pipeline.fireChannelActive()向 selector 注册读事件,开始筹备承受 I / O 数据。

在执行工作的时候,常常会用到 inEventLoop()办法,这个办法是用来判断以后线程是否是 EventLoop 中的线程。
如果是间接执行,不是就把工作放到 MpscQueue(Multi producer single consumer)工作队列中,这样工作交给指定的 NioEventLoop 执行,不用加锁。

pipeline

初始化

pipeline 在创立 channel 的时候被创立

protected AbstractChannel(Channel parent) {
    this.parent = parent;
    id = newId();
    unsafe = newUnsafe();
    pipeline = newChannelPipeline();}

pipeline 中的节点是 ChannelHandlerContext 类型的,双向链表构造;在初始化时就会创立 head、tail 两个节点

protected DefaultChannelPipeline(Channel channel) {this.channel = ObjectUtil.checkNotNull(channel, "channel");
    succeededFuture = new SucceededChannelFuture(channel, null);
    voidPromise =  new VoidChannelPromise(channel, true);

    tail = new TailContext(this);
    head = new HeadContext(this);

    head.next = tail;
    tail.prev = head;
}

tail 和 head 别离是 inbound 和 outbound 类型的。

channelHandler 的增加在用户代码配置 childHandler 时实现

childHandler(new ChannelInitializer<SocketChannel>() {
    @Override
    public void initChannel(SocketChannel ch) {ch.pipeline().addLast(new AuthHandler());
    }
})

增加时首先判断是否反复增加,而后创立节点 (HandlerContext) 增加至链表,最初回调增加实现的事件。

channelHandler 的删除操作与增加相似,首先找到节点将它从链表上删除,最初回调删除 handler 事件。

事件流传

事件流传有两种形式:

  • pipeline 流传: 这种形式会从 pipeline 的 head 节点开始流传
  • channelContext 流传: 从以后节点开始流传

inbound 事件

inBound 事件包含 register、active、active 事件

每次流传都会找到下一个 inbound 类型的节点,最初流传给 tail 节点(tail 也是 inbound 类型)

outBound 事件

outBound 事件流传方向与 inBound 相同,是从 tail 向 head 流传

异样事件流传与以上两种不同,它不关怀是 inBoundhandler 还是 outboundhandler,并且只会从产生异样的节点开始向后流传,达到 tail 节点。
所以,最好在 tail 节点前创立一个专门用于解决异样的节点,重写 exceptionCaught 办法,这样所有异样都能被捕捉。

正文完
 0