乐趣区

epoll 是如何工作的

本文包含以下内容:

epoll 是如何工作的
本文不包含以下内容:

epoll 的用法

epoll 的缺陷

我实在非常喜欢像 epoll 这样使用方便、原理不深却有大用处的东西,即使它可能已经比较老了
select 和 poll 的缺点
epoll 对于动辄需要处理上万连接的网络服务应用的意义可以说是革命性的。对于普通的本地应用,select 和 poll 可能就很好用了,但对于像 C10K 这类高并发的网络场景,select 和 poll 就捉襟见肘了。
看看他们的 API
int select(int nfds, fd_set *readfds, fd_set *writefds,
fd_set *exceptfds, struct timeval *timeout);

int poll(struct pollfd *fds, nfds_t nfds, int timeout);
它们有一个共同点,用户需要将监控的文件描述符集合打包当做参数传入,每次调用时,这个集合都会从用户空间拷贝到内核空间,这么做的原因是内核对这个集合是无记忆的。对于绝大部分应用,这是一种十足的浪费,因为应用需要监控的描述符在大部分时间内基本都是不变的, 也许会有变化, 但都不大.
epoll 对此的改进
epoll 对此的改进也正是它的实现方式, 它需要完成以下两件事

描述符添加 — 内核可以记下用户关心哪些文件的哪些事件.
事件发生 — 内核可以记下哪些文件的哪些事件真正发生了, 当用户前来获取时, 能把结果提供给用户.

描述符添加
既然要有记忆, 那么理所当然的内核需要需要一个数据结构来记, 这个数据结构简单点就像下面这个图中的 epoll_instance, 它有一个链表头,链表上的元素 epoll_item 就是用户添加上去的,每一项都记录了描述符 fd 和感兴趣的事件组合 event
事件发生
事件有多种类型, 其中 POLLIN 表示的可读事件是用户使用的最多的。比如:

当一个 TCP 的 socket 收到报文,它会变得可读;
当一个 pipe 受到对端发送的数据,它会变得可读;
当一个 timerfd 对应的定时器超时,它会变得可读;

那么现在需要将这些可读事件和前面的 epoll_instance 关联起来。linux 中,每一个文件描述符在内核都有一个 struct file 结构对应, 这个 struct file 有一个 private_data 指针,根据文件的实际类型,它们指向不同的数据结构。

那么我能想到的最方便的做法就是 epoll_item 中增加一个指向 struct file 的指针,在 struct file 中增加一个指回 epoll item 的指针。
为了能记录有事件发生的文件,我们还需要在 epoll_instance 中增加一个就绪链表 readylist,在 private_data 指针指向的各种数据结构中增加一个指针回指到 struct file,在 epoll item 中增加一个挂接点字段,当一个文件可读时,就把它对应的 epoll item 挂接到 epoll_instance
在这之后,用户通过系统调用下来读取 readylist 就可以知道哪些文件就绪了。
好了,以上纯属我个人一拍脑袋想到的 epoll 大概的工作方式,其中一定包含不少缺陷。
不过真实的 epoll 的实现思想上与上面也差不多,下面来说一下
创建 epoll 实例
如同上面的 epoll_instance,内核需要一个数据结构保存记录用户的注册项,这个结构在内核中就是 struct eventpoll,当用户使用 epoll_create(2)或者 epoll_create1(2)时,内核 fs/eventpoll.c 实际就会创建一个这样的结构.
/*
* Create the internal data structure (“struct eventpoll”).
*/
error = ep_alloc(&ep);
这个结构中比较重要的部分就是几个链表了,不过实例刚创建时它们都是空的,后续可以看到它们的作用
epoll_create()最终会向用户返回一个文件描述符,用来方便用户之后操作该 epoll 实例,所以在创建 epoll 实例之后,内核就会分配一个文件描述符 fd 和对应的 struct file 结构
/*
* Creates all the items needed to setup an eventpoll file. That is,
* a file structure and a free file descriptor.
*/
fd = get_unused_fd_flags(O_RDWR | (flags & O_CLOEXEC));

file = anon_inode_getfile(“[eventpoll]”, &eventpoll_fops, ep,
O_RDWR | (flags & O_CLOEXEC));
最后就是把它们和刚才的 epoll 实例 关联起来,然后向用户返回 fd
ep->file = file;

fd_install(fd, file);

return fd;
完成后,epoll 实例 就成这样了。

向 epoll 实例添加一个文件描述符
用户可以通过 epoll_ctl(2)向 epoll 实例 添加要监控的描述符和感兴趣的事件。如同前面的 epoll item,内核实际创建的是一个叫 struct epitem 的结构作为注册表项。如下图所示
为了在描述符很多时的也能有较高的搜索效率, epoll 实例 以红黑树的形式来组织每个 struct epitem (取代上面例子中链表)。struct epitem 结构中 ffd 是用来记录关联文件的字段, 同时它也作为该表项添加到红黑树上的 Key;
rdllink 的作用是当 fd 对应的文件准备好 (关心的事件发生) 时,内核会将它作为挂载点挂接到 epoll 实例中 ep->rdllist 链表上 fllink 的作用是作为挂载点挂接到 fd 对应的文件的 file->f_tfile_llink 链表上,一般这个链表最多只有一个元素,除非发生了 dup。pwqlist 是一个链表头,用来连接 poll wait queue。虽然它是链表,但其实链表上最多只会再挂接一个元素。
创建 struct epitem 的代码在 fs/evnetpoll.c 的 ep_insert()中
if (!(epi = kmem_cache_alloc(epi_cache, GFP_KERNEL)))
return -ENOMEM;
之后会进行各个字段初始化
/* Item initialization follow here … */
INIT_LIST_HEAD(&epi->rdllink);
INIT_LIST_HEAD(&epi->fllink);
INIT_LIST_HEAD(&epi->pwqlist);
epi->ep = ep;
ep_set_ffd(&epi->ffd, tfile, fd);
epi->event = *event;
epi->nwait = 0;
epi->next = EP_UNACTIVE_PTR;
然后是设置局部变量 epq
struct ep_pqueue epq;

epq.epi = epi;
init_poll_funcptr(&epq.pt, ep_ptable_queue_proc);
epq 的数据结构是 struct ep_pqueue, 它是 poll table 的一层包装(加了一个 struct epitem* 的指针)
struct ep_pqueue{
poll_table pt;
struct epitem* epi;
}

poll table 包含一个函数和一个事件掩码
typedef void (*poll_queue_proc)(struct file *, wait_queue_head_t *, struct poll_table_struct *);

typedef struct poll_table_struct {
poll_queue_proc _qproc;
unsigned long _key; // store the interested event masks
}poll_table;
这个 poll table 用在哪里呢 ? 答案是, 用在了 struct file_operations 的 poll 操作 (这和本文开始说的 select`poll` 不是一个东西)
struct file_operations {
// code omitted…
unsigned int (*poll)(struct file*, struct poll_table_struct*);
// code omitted…
}
不同的文件有不同 poll 实现方式, 但一般它们的实现方式差不多是下面这种形式
static unsigned int XXXX_poll(struct file *file, poll_table *wait)
{
私有数据 = file->private_data;
unsigned int events = 0;

poll_wait(file, & 私有数据 ->wqh, wait);

if (文件可读了)
events |= POLLIN;

return events;
}
它们主要实现两个功能

将 XXX 放到文件私有数据的等待队列上 (一般 file->private_data 中都有一个等待队列头 wait_queue_head_t wqh), 至于 XXX 是啥, 各种类型文件实现各异, 取决于 poll_table 参数
查询是否真的有事件了, 若有则返回.

有兴趣的读者可以 timerfd_poll() 或者 pipe_poll() 它们的实现
poll_wait 的实现很简单, 就是调用 poll_table 中设置的函数, 将文件私有的等待队列当作了参数.
static inline void poll_wait(struct file * filp, wait_queue_head_t * wait_address, poll_table *p)
{
if (p && p->_qproc && wait_address)
p->_qproc(filp, wait_address, p);
}

回到 ep_insert()
所以这里设置的 poll_table 就是 ep_ptable_queue_proc().
然后
revents = ep_item_poll(epi, &epq.pt);
看其实现可以看到, 其实就是主动去调用文件的 poll 函数. 这里以 TCP socket 文件为例好了(毕竟网络应用是最广泛的)
/*
* ep_item_poll -> sock_poll -> tcp_poll
*/
unsigned int tcp_poll(struct file *file, struct socket *sock, poll_table *wait)
{
sock_poll_wait(file, sk_sleep(sk), wait); // will call poll_wait()
// code omitted…
}
可以看到, 最终还是调用到了 poll_wait(), 所以注册的 ep_ptable_queue_proc()会执行
struct epitem *epi = ep_item_from_epqueue(pt);
struct eppoll_entry *pwq;

pwq = kmem_cache_alloc(pwq_cache, GFP_KERNEL)
这里面, 又分配了一个 struct eppoll_entry 结构. 其实它和 struct epitem 结构是一一对应的.
随后就是一些初始化
init_waitqueue_func_entry(&pwq->wait, ep_poll_callback); // set func:ep_poll_callback
pwq->whead = whead;
pwq->base = epi;

add_wait_queue(whead, &pwq->wait)
list_add_tail(&pwq->llink, &epi->pwqlist);
epi->nwait++;
这其中比较重要的是设置 pwd->wait.func = ep_poll_callback。
现在, struct epitem 和 struct eppoll_entry 的关系就像下面这样
文件可读之后
对于 TCP socket, 当收到对端报文后, 最初设置的 sk->sk_data_ready 函数将被调用
void sock_init_data(struct socket *sock, struct sock *sk)
{
// code omitted…
sk->sk_data_ready = sock_def_readable;
// code omitted…
}
经过层层调用, 最终会调用到 __wake_up_common 这里面会遍历挂在 socket.wq 上的等待队列上的函数
static void __wake_up_common(wait_queue_head_t *q, unsigned int mode,
int nr_exclusive, int wake_flags, void *key)
{
wait_queue_t *curr, *next;

list_for_each_entry_safe(curr, next, &q->task_list, task_list) {
unsigned flags = curr->flags;

if (curr->func(curr, mode, wake_flags, key) &&
(flags & WQ_FLAG_EXCLUSIVE) && !–nr_exclusive)
break;
}
}
于是, 顺着图中的这条红色轨迹, 就会调用到我们设置的 ep_poll_callback, 那么接下来就是要让 epoll 实例能够知有文件已经可读了

先从入参中取出当前表项 epi 和 ep
struct epitem *epi = ep_item_from_wait(wait);
struct eventpoll *ep = epi->ep;
再把 epi 挂到 ep 的就绪队列
if (!ep_is_linked(&epi->rdllink)) {
list_add_tail(&epi->rdllink, &ep->rdllist)
}
接着唤醒阻塞在 (如果有) 该 epoll 实例的用户.
waitqueue_active(&ep->wq)
用户获取事件
谁有可能阻塞在 epoll 实例的等待队列上呢? 当然就是使用 epoll_wait 来从 epoll 实例获取发生了感兴趣事件的的描述符的用户.epoll_wait 会调用到 ep_poll()函数.
if (!ep_events_available(ep)) {
/*
* We don’t have any available event to return to the caller.
* We need to sleep here, and we will be wake up by
* ep_poll_callback() when events will become available.
*/
init_waitqueue_entry(&wait, current);
__add_wait_queue_exclusive(&ep->wq, &wait);
如果没有事件, 我们就将自己挂在 epoll 实例的等待队列上然后睡去 ….. 如果有事件, 那么我们就要将事件返回给用户
ep_send_events(ep, events, maxevents)
参考资料
the-implementation-of-epoll

退出移动版