关于javascript:从libuv源码看清nodejs事件循环几个钻牛角尖的问题

35次阅读

共计 5461 个字符,预计需要花费 14 分钟才能阅读完成。

前言

本文是在开始学习 nodejs 事件循环时,联合官网文档和其余材料来解答本人了解不够清晰的问题

1.poll 阶段不阻塞(阻塞工夫 timeout 为 0)、有限阻塞(阻塞工夫 timout 为 -1),到底会不会执行 poll 回调队列的回调函数

1.1 I/ O 是什么,文件描述符又是什么?

I/O,就是输出(input)和输入(output)的简写。Linux 零碎中,把所有都看做是文件。文件(惯例文件、socket、FIFO、管道、终端……)就是一串二进制流,当信息替换中,咱们对这些流进行数据的收发操作,就是 I / O 操作。当过程关上现有的文件或者创立新文件时,内核向过程返回一个文件操作符。文件操作符是一个索引,它就是一个整数,指向零碎级文件形容表,它蕴含了文件操作、文件类型、拜访权限等等信息。所有执行 I / O 操作的零碎调用都会通过文件描述符。

1.2 epoll 是什么

epoll 是 Linux 内核的可扩大I/ O 事件告诉机制。在浏览器环境,当想监听鼠标事件时,咱们会element.addEventListener('click', cbFn),这就是浏览器的事件告诉机制。而相似地,libuv 调用 epoll 相干的 api 来实现 I / O 事件告诉。Observer(观察者)注册到被观察者(Subject),当被观察者(Subject)产生某种变动,会告诉已注册的 Observer(观察者)执行回调。

1.3 epoll 工作流程

epoll 有三个步骤:

  1. epoll_create,在 epoll 文件系统建设了个 file 节点,并开拓 epoll 本人的内核高速 cache 区,建设红黑树,调配好想要的 size 的内存对象,建设一个 list 链表,用于存储准备就绪的事件。
  2. epoll_ctl,把要监听的文件放到对应的红黑树上,给内核中断处理程序注册一个回调函数,告诉内核,如果这个句柄的数据到了,就把它放到 就绪列表
  3. epoll_wait,察看就绪列表外面有没有数据,并进行提取和清空就绪列表。

1.4 libuv 对 poll 阶段的实现

void uv__io_poll(uv_loop_t* loop, int timeout) {
  // ...
  // 如果没有任何观察者,间接返回
  if (loop->nfds == 0) {assert(QUEUE_EMPTY(&loop->watcher_queue));
    return;
  }
  memset(&e, 0, sizeof(e));
  // 向 epoll 零碎注册所有 I / O 观察者
  while (!QUEUE_EMPTY(&loop->watcher_queue)) {
    // 获取队列头部,并将队列从 loop->watcher_queue 移除
    q = QUEUE_HEAD(&loop->watcher_queue);
    QUEUE_REMOVE(q);
    QUEUE_INIT(q);
    // 获取 I / O 观察者构造
    w = QUEUE_DATA(q, uv__io_t, watcher_queue);
    assert(w->pevents != 0);
    assert(w->fd >= 0);
    assert(w->fd < (int) loop->nwatchers);
    e.events = w->pevents;
    e.data.fd = w->fd;
    if (w->events == 0)
      op = EPOLL_CTL_ADD;
    else
      op = EPOLL_CTL_MOD;
    // epoll_ctl 操作,向 epoll 注册文件描述符及须要监控的 I / O 事件
    if (epoll_ctl(loop->backend_fd, op, w->fd, &e)) {if (errno != EEXIST)
        abort();
      assert(op == EPOLL_CTL_ADD);
      // loop->backend_fd 通过 epoll_create 创立
      if (epoll_ctl(loop->backend_fd, EPOLL_CTL_MOD, w->fd, &e))
        abort();}
    w->events = w->pevents;
  }
  // ...
  // 记录以后工夫,以便计算达到工夫之后跳出上面的循环
  base = loop->time;
  // count 缩小到 0,上面的循环跳出
  count = 48; /* Benchmarks suggest this gives the best throughput. */
  real_timeout = timeout;
  /* 
    进入 epoll_pwait 轮询 I / O 事件
    以下循环次要由 timeout 和 count 管制是否跳出,合乎整个事件循环
  */
  for (;;) {
    // ...
    // nfds 示意产生 I / O 事件的文件描述符的数量,0 为没有事件产生,可能因为超时工夫到了,或者 timeout=0
    // events 保留了从内核失去的事件汇合
    nfds = epoll_pwait(loop->backend_fd,
                       events,
                       ARRAY_SIZE(events),
                       timeout,
                       psigset);
    // ...
    // 没有 I / O 事件
    if (nfds == 0) {
      // ...
      // 如果 timeout 为 - 1 则持续循环
      if (timeout == -1)
        continue;
      // 如果 timeout 为 0 函数间接返回
      if (timeout == 0)
        return;
      // 更新下次 epoll_pwait 的 timeout 工夫
      goto update_timeout;
    }
    // epoll_pwait 返回谬误
    if (nfds == -1) {if (errno != EINTR)
        abort();
      // 如果 timeout 为 - 1 则持续循环
      if (timeout == -1)
        continue;
      // 如果 timeout 为 0 函数间接返回
      if (timeout == 0)
        return;
      // 更新下次 epoll_pwait 的 timeout 工夫
      goto update_timeout;
    }
    // ...
    // 获取 I / O 观察者,调用关联的回调函数
    for (i = 0; i < nfds; i++) {
      pe = events + i;
      fd = pe->data.fd;
      // ...
      // 如果存在无效事件
      if (pe->events != 0) {if (w == &loop->signal_IOWatcher)
          have_signals = 1;
        else
          // 执行回调
          w->cb(loop, w, pe->events);
        nevents++;
      }
    }
    // ...
    if (nevents != 0) {
      // 如果所有文件描述符上都有事件产生,且 count 不为 0,再循环一次
      if (nfds == ARRAY_SIZE(events) && --count != 0) {
        /* Poll for more events but don't block this time. */
        timeout = 0;
        continue;
      }
      return;
    }
    // 如果 timeout 为 0 函数间接返回
    if (timeout == 0)
      return;
    // 如果 timeout 为 - 1 则持续循环
    if (timeout == -1)
      continue;

// 从新计算 timeout
update_timeout:
    assert(timeout > 0);

    real_timeout -= (loop->time - base);
    if (real_timeout <= 0)
      return;
    // 残余 timeout
    timeout = real_timeout;
  }
}
  1. epoll 注册 I / O 观察者
  2. 调用 epoll_ctl,注册文件描述符以及须要监控的 I / O 事件
  3. 进入循环,调用 epoll_pwait 轮询 I / O 事件
    3.1 如果没有 I / O 事件,timeout 为 0,则间接退出轮询,timeout 为 -1,则持续轮询
    3.2 如果 epoll_pwait 返回谬误,timeout 为 0,则间接退出轮询,timeout 为 -1,则持续轮询
    3.3 有 I / O 事件,则调用关联的回调函数
    3.4 如果 timeout 为 0,则间接退出轮询
    3.5 如果 timeout 为 -1,则持续轮询

所以,当阻塞工夫 timeout 为 0,有 I / O 事件则执行回调,而后进入下个阶段,如果没有 I / O 事件,则间接进入下个阶段;当阻塞工夫 timeout 为 -1,则始终轮询 I / O 事件,有回调就执行。

2.pending callbacks 在什么时候注册回调队列的

static int uv__run_pending(uv_loop_t* loop) {
  QUEUE* q;
  QUEUE pq;
  uv__io_t* w;

  if (QUEUE_EMPTY(&loop->pending_queue))
    return 0;

  QUEUE_MOVE(&loop->pending_queue, &pq);

  while (!QUEUE_EMPTY(&pq)) {q = QUEUE_HEAD(&pq);
    QUEUE_REMOVE(q);
    QUEUE_INIT(q);
    w = QUEUE_DATA(q, uv__io_t, pending_queue);
    w->cb(loop, w, POLLOUT);
  }

  return 1;
}

该函数遍历 loop->pending_queue 队列节点,获得 I / O 观察者后调用 cb。经搜寻,只有 uv__io_feed 中存在向 loop->pending_queue 队列插入节点的代码,如下

void uv__io_feed(uv_loop_t* loop, uv__io_t* w) {if (QUEUE_EMPTY(&w->pending_queue))
    QUEUE_INSERT_TAIL(&loop->pending_queue, &w->pending_queue);
}

持续搜寻uv__io_feed,调用的中央如下

// src/unix/pipe.c
void uv_pipe_connect(uv_connect_t* req,
                    uv_pipe_t* handle,
                    const char* name,
                    uv_connect_cb cb) {
  // ...
  if (err)
    uv__io_feed(handle->loop, &handle->io_watcher);
}
// src/unix/stream.c
static void uv__write_req_finish(uv_write_t* req) {
  // ...
  uv__io_feed(stream->loop, &stream->io_watcher);
}
// src/unix/tpc.c
int uv__tcp_connect(uv_connect_t* req,
                    uv_tcp_t* handle,
                    const struct sockaddr* addr,
                    unsigned int addrlen,
                    uv_connect_cb cb) {
  // ...
  if (handle->delayed_error)
    uv__io_feed(handle->loop, &handle->io_watcher);
  // ...
}

还有三处在 src/unix/udp.c 处调用

所以,pending callbacks 阶段,别离在以下场景注册回调:

  1. pipe 连贯出错时
  2. stream 流写申请实现时
  3. tcp 连贯有提早谬误时
  4. udp 的几个场景

3.timer 阈值达到之后尽快执行,可能会提早它们的操作系统调度或其它正在运行的回调具体是指什么?

cpp 源码解读能够看这里:传送门
TL;DR; 流程如下:

  1. setTimeout/setInterval 是通过内置类 Timeout 实现的,它的工夫阈值为 1 ~ 231-1 ms,且为整数。所以 setTimeout(callback, 0) 会转换为setTimeout(callback, 1)
  2. 进入 tick 之后,会获取这一 tick 开始的工夫,通过 uv__hrtime 函数调用零碎工夫,过程中可能会受到其余利用的影响
  3. libuv 所有计时器都是以执行工夫节点形成的二叉最小堆构造来存储。二叉最小堆,特点是父节点始终比子节点小,所以根节点是最小的
  4. 计时器回调的执行工夫节点 = 注册回调时的 tick 开始工夫 time+ 计时器阈值 timeout
  5. 二叉最小堆的根节点计时器回调的执行工夫节点 <= 以后工夫循环 tick 的开始工夫,示意至多有一个过期的定时器,循环迭代二叉最小堆的根节点,并调用该计时器所应的回调函数。
  6. 二叉最小堆的根节点计时器回调的执行工夫节点 > 以后工夫循环 tick 的开始工夫,示意还没有到执行机会,依据二叉最小堆的特点,根节点的工夫都不能满足执行机会的话,那么前面的节点也没有过期。此时,退出 timer 阶段的回调函数执行,进入下一个阶段
  7. 执行 pending callbacks、idel、prepare 的回调函数
  8. 计算 poll 阻塞以后 tick 的工夫 p,如果 pending callbacks、idel、close callbacks 回调队列非空,则为 0,尽快进入下个 tick 执行对应的回调;如果有超时的计时器,则为 0,尽快进入下个 tick 执行超时计时器的回调;如果有未超时的计时器,则 阻塞工夫 = 二叉最小堆的根节点计时器回调的执行工夫节点 - 以后工夫循环 tick 的开始工夫;如果没有计时器,则为 -1,有限阻塞
  9. 执行 check、close callbacks 的回调函数

综上,操作系统调度或其它正在运行的回调,是指:

  1. 零碎工夫调用,过程中可能会受到其余利用的影响
  2. poll 阻塞的时候线程会挂起,CPU 会调度去做其余事,CPU 接回来解决的工夫不可管制
  3. 各阶段回调执行工夫不可管制

所以,才会呈现超过阈值尽快执行的成果,而不是到了工夫点马上执行。

Reference

epoll 的作用和原理介绍
从 libuv 看 nodejs 事件循环
libuv 源码剖析(五)IO 观察者(io_watcher)
文件描述符(File Descriptor)简介
I/ O 的内核原理与 5 种 I / O 模型

正文完
 0