关于kafka:从Kafka到NIO

1次阅读

共计 3953 个字符,预计需要花费 10 分钟才能阅读完成。

在谈 NIO 之前,简略回顾下内核态和用户态

内核空间是 Linux 内核运行的空间,而用户空间是用户程序的运行空间,为了保障内核平安,它们之间是隔离的,即便用户的程序解体了,内核也不受影响。
内核空间能够执行任意命令,调用零碎的所有资源,用户空间只能执行简略运算,不能间接调用系统资源(I/O, 过程资源, 内存调配,外设,计时器,网络通信等),必须通过零碎接口(又称 system call),能力向内核收回指令。

用户过程通过零碎调用拜访系统资源的时候,须要切换到内核态,而这对应一些非凡的堆栈和内存环境,必须在零碎调用前建设好。而在零碎调用完结后,cpu 会从内核态切回到用户态,而堆栈又必须复原成用户过程的上下文。而这种切换就会有大量的耗时。

过程缓冲区

个别程序在读取文件的时候先申请一块内存数组,称为 buffer,而后每次调用 read,读取设定字节长度的数据,写入 buffer。(用较小的次数填满 buffer)。之后的程序都是从 buffer 中获取数据,当 buffer 应用完后,在进行下一次调用,填充 buffer。这里的 buffer 咱们称为用户缓冲区,它的目标是为了缩小频繁 I / O 操作而引起频繁的零碎调用,从而升高操作系统在用户态与外围态切换所消耗的工夫。

内核缓冲区

除了在过程中设计缓冲区,内核也有本人的缓冲区。

当一个用户过程要从磁盘读取数据时,内核个别不间接读磁盘,而是将内核缓冲区中的数据复制到过程缓冲区中。

但若是内核缓冲区中没有数据,内核会把对数据块的申请,退出到申请队列,而后把过程挂起,为其它过程提供服务。

等到数据曾经读取到内核缓冲区时,把内核缓冲区中的数据读取到用户过程中,才会告诉过程,当然不同的 io 模型,在调度和应用内核缓冲区的形式上有所不同。

你能够认为,read 是把数据从内核缓冲区复制到过程缓冲区。write 是把过程缓冲区复制到内核缓冲区。

当然,write 并不一定导致内核的写动作,比方 os 可能会把内核缓冲区的数据积攒到一定量后,再一次写入。这也就是为什么断电有时会导致数据失落。

所以,咱们进行 IO 操作的申请过程如下: 用户过程发动申请(调用零碎函数),内核接管到申请后(过程会从用户态切换到内核态), 从 I / O 设施中获取数据到内核 buffer 中,再将内核 buffer 中的数据 copy 到用户过程的地址空间,该用户过程获取到数据后再响应客户端。

I/ O 复用模型

JavaNIO 应用了 I / O 复用模型

从图中能够看出,咱们阻塞在 select 调用,期待数据报套接字变为可读。当 select 返回套接字可读这一条件的时候,咱们调用 recvfrom 把所读数据从内核缓冲区复制到利用过程缓冲区。

那么内核态怎么判断 I / O 流可读可写?
内核针对读缓冲区和写缓冲区来判断是否可读可写

而 java 从 1.5 开始就应用 epoll 代替了之前的 select, 它对 select 有所加强,比拟有特点的是 epoll 反对程度触发 (epoll 默认) 和边缘触发两种形式。

epoll 比 select 高效次要几种在两点, 这个能够参考知乎的这个答复(https://www.zhihu.com/questio…

  1. 缩小用户态和内核态之间的文件句柄拷贝
  2. 缩小对可读可写文件句柄的遍历

epoll 和 NIO 的操作形式对应图如下:

  1. epoll_ctl 注册事件
  2. epoll_wait 轮询所有的 socket
  3. 解决对应的事件

epoll 中比拟乏味的是程度触发 (LT) 和边缘触发(ET)。

程度触发(条件触发):读缓冲区只有不为空,就始终会触发读事件;写缓冲区只有不满(发送得速度比写得速度快),就始终会触发写事件。这个比拟合乎编程习惯,也是 epoll 的缺省模式。

边缘触发(状态触发):读缓冲区的状态,从空转为非空的时候,触发 1 次;写缓冲区的状态,从满转为非满的时候,触发 1 次。比方你发送一个大文件,把写缓存区塞满了,之后缓存区能够写了,就会产生一次从满到不满的切换。

通过剖析,咱们能够看出:
对于 LT 模式,要防止 ” 写的死循环 ” 问题:写缓冲区为满的概率很小,也就是 ” 写的条件 ” 会始终满足,所以如果你注册了写事件,没有数据要写,但它会始终触发,所以在 LT 模式下,写完数据,肯定要勾销写事件。

对应 ET 模式,要防止 ”short read” 问题: 比方你收到 100 个字节,它触发 1 次,但你只读到了 50 个字节,剩下的 50 个字节不读,它也不会再次触发,此时这个 socket 就废了。因而在 ET 模式,肯定要把 ” 读缓冲区 ” 的数据读完。

验证

代码太长了,我就只列出一段服务器端的次要代码,client 端的比较简单,写法和 server 端也相似就不列出来了

Selector selector = Selector.open();

// 创立通道 ServerSocketChannel
ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
// 将通道设置为非阻塞
serverSocketChannel.configureBlocking(false);

ServerSocket serverSocket = serverSocketChannel.socket();
serverSocket.bind(new InetSocketAddress(8989));

/**
* 将通道 (Channel) 注册到通道管理器(Selector),并为该通道注册 selectionKey.OP_ACCEPT 事件
* 注册该事件后,当事件达到的时候,selector.select()会返回,* 如果事件没有达到 selector.select()会始终阻塞。*/
serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);

// 循环解决
while (true) {
    // 当注册事件达到时,办法返回,否则该办法会始终阻塞
    selector.select();

    // 获取监听事件
    Set<SelectionKey> selectionKeys = selector.selectedKeys();
    Iterator<SelectionKey> iterator = selectionKeys.iterator();

    // 迭代解决
    while (iterator.hasNext()) {
        // 获取事件
        SelectionKey key = iterator.next();

        // 移除事件,防止反复解决
        iterator.remove();

        // 客户端申请连贯事件,承受客户端连贯就绪
       if (key.isAcceptable()) {ServerSocketChannel server = (ServerSocketChannel) key.channel();
           SocketChannel socketChannel = server.accept();
           socketChannel.configureBlocking(false);
           // 给通道设置写事件,客户端监听到写事件后,进行读取操作
           socketChannel.register(selector, SelectionKey.OP_WRITE);
        } else if(key.isWritable()) {System.out.println("write");
            handleWrite(key);
        }
    }
}

当 client 连贯上 server 的时候就会发现 server 始终收到写事件,write 会始终打印。所以应用条件触发的 API 时,如果应用程序不须要写就不要关注 socket 可写的事件,否则就会有限次的立刻返回一个 write ready 告诉。大家罕用的 select 就是属于条件触发这一类,长期关注 socket 写事件会呈现 CPU 100% 的故障。所以在应用 Java 的 NIO 编程的时候,在没有数据能够往外写的时候要勾销写事件,在有数据往外写的时候再注册写事件。

勾销写事件能够这样写 selectionKey.interestOps(key.interestOps() & ~SelectionKey.OP_WRITE);

Kafka 中如何解决的

在上一篇对 Kafka 网络层的剖析中, 咱们晓得了它是通过 NIO 和服务端进行通信的。其中在 KafkaChannel 的 send()办法外面有这样一段代码:

private boolean send(Send send) throws IOException {send.writeTo(transportLayer);
  if (send.completed())
    transportLayer.removeInterestOps(SelectionKey.OP_WRITE);

  return send.completed();}

请留神这里的transportLayer.removeInterestOps(SelectionKey.OP_WRITE),它移除了注册的 OP_WRITE 事件。

既然勾销了,必定会增加。在发送数据之前 KafkaChannel 的 setSend()办法外面又注册了 OP_WRITE 事件

public void setSend(Send send) {if (this.send != null)
      throw new IllegalStateException("Attempt to begin a send operation with prior send operation still in progress, connection id is" + id);
  this.send = send;
  this.transportLayer.addInterestOps(SelectionKey.OP_WRITE);
}

所以还是那句话: 在没有数据能够往外写的时候要勾销写事件,在有数据往外写的时候再注册写事件。

正文完
 0