poll 和 epoll 的应用应该不必再多说了。当 fd 很多时,应用 epoll 比 poll 效率更高。咱们通过内核源码剖析来看看到底是为什么。
poll 分析 poll 零碎调用:
int poll(struct pollfd *fds, nfds_t nfds, int timeout);
对应的实现代码为:
[fs/select.c -->sys_poll] asmlinkage long sys_poll(struct pollfd __user * ufds, unsigned int nfds, long timeout){struct poll_wqueues table;int fdcount, err; unsigned int i;struct poll_list *head;struct poll_list *walk;/* Do a sanity check on nfds ... */ /* 用户给的 nfds 数不能够超过一个 struct file 构造反对的最大 fd 数(默认是 256)*/ if (nfds > current->files->max_fdset && nfds > OPEN_MAX) return -EINVAL; if (timeout) {/* Careful about overflow in the intermediate values */ if ((unsigned long) timeout < MAX_SCHEDULE_TIMEOUT / HZ)timeout = (unsigned long)(timeout*HZ+999)/1000+1; else /* Negative or overflow */ timeout = MAX_SCHEDULE_TIMEOUT; } poll_initwait(&table);
其中 poll_initwait 较为要害,从字面上看,应该是初始化变量 table,留神此处 table 在整个执行 poll 的过程中是很要害的变量。而 struct poll_table 其实就只蕴含了一个函数指针:
[include/linux/poll.h] /* * structures and helpers for f_op->poll implementations */ typedef void (*poll_queue_proc)(struct file *, wait_queue_head_t *, struct poll_table_struct *); typedef struct poll_table_struct {poll_queue_proc qproc;} poll_table;
当初咱们来看看 poll_initwait 到底在做些什么
[fs/select.c] void __pollwait(struct file *filp, wait_queue_head_t *wait_address, poll_table *p); void poll_initwait(struct poll_wqueues *pwq) {pwq->pt.qproc = __pollwait; /* 此行曾经被我“翻译”了(原文为 init_poll_funcptr(&pwq->pt, __pollwait);),不便观看 */pwq->error = 0; pwq->table = NULL; }
须要 C /C++ Linux 服务器架构师学习材料加 q 裙 3223296726(材料包含 C /C++,Linux,golang 技术,Nginx,ZeroMQ,MySQL,Redis,fastdfs,MongoDB,ZK,流媒体,CDN,P2P,K8S,Docker,TCP/IP,协程,DPDK,ffmpeg 等),收费分享
很显著,poll_initwait 的次要动作就是把 table 变量的成员 poll_table 对应的回调函数置__pollwait。这个__pollwait 不仅是 poll 零碎调用须要,select 零碎调用也一样是用这个__pollwait,说白了,这是个操作系统的异步操作的“御用”回调函数。当然了,epoll 没有用这个,它另外新增了一个回调函数,以达到其高效运行的目标,这是后话,暂且不表。咱们先不探讨__pollwait 的具体实现,还是持续看 sys_poll:
[fs/select.c -->sys_poll] head = NULL; walk = NULL; i = nfds; err = -ENOMEM;while(i!=0) {struct poll_list *pp; pp = kmalloc(sizeof(struct poll_list)+ sizeof(struct pollfd)* (i>POLLFD_PER_PAGE?POLLFD_PER_PAGE:i), GFP_KERNEL); if(pp==NULL) goto out_fds; pp->next=NULL;pp->len = (i>POLLFD_PER_PAGE?POLLFD_PER_PAGE:i); if (head == NULL) head = pp; else walk->next = pp;walk = pp;if (copy_from_user(pp->entries, ufds + nfds-i, sizeof(struct pollfd)*pp->len)) {err = -EFAULT; goto out_fds;} i -= pp->len; }fdcount = do_poll(nfds, head, &table, timeout);
这一大堆代码就是建设一个链表,每个链表的节点是一个 page 大小(通常是 4k),这链表节点由一个指向 struct poll_list 的指针掌控,而泛滥的 struct pollfd 就通过 struct poll_list 的 entries 成员拜访。下面的循环就是把用户态的 struct pollfd 拷进这些 entries 里。通常用户程序的 poll 调用就监控几个 fd,所以下面这个链表通常也就只须要一个节点,即操作系统的一页。然而,当用户传入的 fd 很多时,因为 poll 零碎调用每次都要把所有 struct pollfd 拷进内核,所以参数传递和页调配此时就成了 poll 零碎调用的性能瓶颈。最初一句 do_poll,咱们跟进去:
[fs/select.c-->sys_poll()-->do_poll()] static void do_pollfd(unsigned int num, struct pollfd * fdpage, poll_table ** pwait, int *count) {int i; for (i = 0; i < num; i++) {int fd; unsigned int mask; struct pollfd *fdp; mask = 0; fdp = fdpage+i; fd = fdp->fd; if (fd >= 0) {struct file * file = fget(fd); mask = POLLNVAL; if (file != NULL) {mask = DEFAULT_POLLMASK; if (file->f_op && file->f_op->poll) mask = file->f_op->poll(file, *pwait); mask &= fdp->events | POLLERR | POLLHUP;fput(file); } if (mask) {*pwait = NULL; (*count)++; }} fdp->revents = mask; } } static int do_poll(unsigned int nfds, struct poll_list *list, struct poll_wqueues *wait, long timeout) {int count = 0; poll_table* pt = &wait->pt; if (!timeout) pt = NULL; for (;;) {struct poll_list *walk;set_current_state(TASK_INTERRUPTIBLE); walk = list; while(walk != NULL) {do_pollfd( walk->len, walk->entries, &pt, &count);walk = walk->next;} pt = NULL;if (count || !timeout || signal_pending(current))break;count = wait->error; if (count) break;timeout = schedule_timeout(timeout); /* 让 current 挂起,别的过程跑,timeout 到了当前再回来运行 current*/} __set_current_state(TASK_RUNNING);return count; }
留神 set_current_state 和 signal_pending,它们两句保障了当用户程序在调用 poll 后挂起时,发信号能够让程序迅速退出 poll 调用,而通常的零碎调用是不会被信号打断的。
纵览 do_poll 函数,次要是在循环内期待,直到 count 大于 0 才跳出循环,而 count 次要是靠 do_pollfd 函数解决。留神这段代码:
while(walk != NULL) {do_pollfd( walk->len, walk->entries, &pt, &count); walk = walk->next; }
当用户传入的 fd 很多时(比方 1000 个),对 do_pollfd 就会调用很屡次,poll 效率瓶颈的另一起因就在这里。do_pollfd 就是针对每个传进来的 fd,调用它们各自对应的 poll 函数,简化一下调用过程,如下:
struct file* file = fget(fd);file->f_op->poll(file, &(table->pt));
如果 fd 对应的是某个 socket,do_pollfd 调用的就是网络设备驱动实现的 poll;如果 fd 对应的是某个 ext3 文件系统上的一个关上文件,那 do_pollfd 调用的就是 ext3 文件系统驱动实现的 poll。一句话,这个 file->f_op->poll 是设施驱动程序实现的,那设施驱动程序的 poll 实现通常又是什么样子呢?其实,设施驱动程序的规范实现是:调用 poll_wait,即以设施本人的期待队列为参数(通常设施都有本人的期待队列,不然一个不反对异步操作的设施会让人很郁闷)调用 struct poll_table 的回调函数。作为驱动程序的代表,咱们看看 socket 在应用 tcp 时的代码:
[net/ipv4/tcp.c-->tcp_poll]unsigned int tcp_poll(struct file *file, struct socket *sock, poll_table *wait){unsigned int mask; struct sock *sk = sock->sk;struct tcp_opt *tp = tcp_sk(sk); poll_wait(file, sk->sk_sleep, wait);
代码就看这些,剩下的无非就是判断状态、返回状态值,tcp_poll 的外围实现就是 poll_wait,而
poll_wait 就是调用 struct poll_table 对应的回调函数,那 poll 零碎调用对应的回调函数就是__pollwait,所以这里简直就能够把 tcp_poll 了解为一个语句:
__pollwait(file, sk->sk_sleep, wait);
由此也能够看出,每个 socket 本人都带有一个期待队列 sk_sleep,所以下面咱们所说的“设施的期待队列”其实不止一个。这时候咱们再看看__pollwait 的实现:
[fs/select.c-->__pollwait()] void __pollwait(struct file *filp, wait_queue_head_t *wait_address, poll_table *_p){struct poll_wqueues *p = container_of(_p, struct poll_wqueues, pt); struct poll_table_page *table = p->table; if (!table || POLL_TABLE_FULL(table)) {struct poll_table_page *new_table; new_table = (struct poll_table_page *) __get_free_page(GFP_KERNEL); if (!new_table) {p->error = -ENOMEM; __set_current_state(TASK_RUNNING); return;} new_table->entry = new_table->entries; new_table->next = table; p->table = new_table; table = new_table; } /* Add a new entry */ {struct poll_table_entry * entry = table->entry; table->entry = entry+1;get_file(filp); entry->filp = filp;entry->wait_address = wait_address; init_waitqueue_entry(&entry->wait, current); add_wait_queue(wait_address,&entry->wait); }}
__pollwait 的作用就是创立了上图所示的数据结构(一次__pollwait 即一次设施 poll 调用只创立一个 poll_table_entry),并通过 struct poll_table_entry 的 wait 成员,把 current 挂在了设施的期待队列
上,此处的期待队列是 wait_address,对应 tcp_poll 里的 sk->sk_sleep。当初咱们能够回顾一下 poll 零碎调用的原理了:先注册回调函数__poll_wait,再初始化 table 变量(类型为 struct poll_wqueues),接着拷贝用户传入的 struct pollfd(其实次要是 fd),而后轮流调用所有 fd 对应的 poll(把 current 挂到各个 fd 对应的设施期待队列上)。在设施收到一条音讯(网络设备)或填写完文件数据(磁盘设施)后,会唤醒设施期待队列上的过程,这时 current 便被唤醒了。current 醒来后来到 sys_poll 的操作绝对简略,这里就不逐行剖析了。
epoll
通过下面的剖析,poll 运行效率的两个瓶颈曾经找出,当初的问题是怎么改良。首先,每次 poll 都要把 1000 个 fd 拷入内核,太不迷信了,内核干嘛不本人保留曾经拷入的 fd 呢?答对了,epoll 就是本人保留拷入的 fd,它的 API 就曾经阐明了这一点——不是 epoll_wait 的时候才传入 fd,而是通过 epoll_ctl 把所有 fd 传入内核再一起 ”wait”,这就省掉了不必要的反复拷贝。其次,在 epoll_wait 时,也不是把 current 轮流的退出 fd 对应的设施期待队列,而是在设施期待队列醒来时调用一个回调函数(当然,这就须要“唤醒回调”机制),把产生事件的 fd 纳入一个链表,而后返回这个链表上的 fd。
epoll 分析
epoll 是个 module,所以先看看 module 的入口 eventpoll_init
[fs/eventpoll.c-->eventpoll_init()] static int __init eventpoll_init(void) {int error; init_MUTEX(&epsem); /* Initialize the structure used to perform safe poll wait head wake ups */ ep_poll_safewake_init(&psw); /* Allocates slab cache used to allocate "struct epitem" items */ epi_cache = kmem_cache_create("eventpoll_epi", sizeof(struct epitem),0, SLAB_HWCACHE_ALIGN|EPI_SLAB_DEBUG|SLAB_PANIC, NULL, NULL); /* Allocates slab cache used to allocate "struct eppoll_entry" */ pwq_cache = kmem_cache_create("eventpoll_pwq", sizeof(struct eppoll_entry), 0, EPI_SLAB_DEBUG|SLAB_PANIC, NULL, NULL); /* * Register the virtual file system that will be the source of inodes * for the eventpoll files */ error = register_filesystem(&eventpoll_fs_type); if (error)goto epanic;/* Mount the above commented virtual file system */ eventpoll_mnt = kern_mount(&eventpoll_fs_type); error = PTR_ERR(eventpoll_mnt); if (IS_ERR(eventpoll_mnt))goto epanic;DNPRINTK(3, (KERN_INFO "[%p] eventpoll: successfully initialized.\n", current));return 0; epanic: panic("eventpoll_init() failed\n"); }
很乏味,这个 module 在初始化时注册了一个新的文件系统,叫 ”eventpollfs”(在 eventpoll_fs_type 构造里),而后挂载此文件系统。另外创立两个内核 cache(在内核编程中,如果须要频繁调配小块内存,应该创立 kmem_cahe 来做“内存池”), 别离用于寄存 struct epitem 和 eppoll_entry。如果当前要开发新的文件系统,能够参考这段代码。当初想想 epoll_create 为什么会返回一个新的 fd?因为它就是在这个叫做 ”eventpollfs” 的文件系统里创立了一个新文件!如下:
[fs/eventpoll.c-->sys_epoll_create()] asmlinkage long sys_epoll_create(int size) {int error, fd; struct inode *inode; struct file *file; DNPRINTK(3, (KERN_INFO "[%p] eventpoll: sys_epoll_create(%d)\n", current, size)); /* Sanity check on the size parameter */ error = -EINVAL; if (size <= 0) goto eexit_1; /* * Creates all the items needed to setup an eventpoll file. That is,* a file structure, and inode and a free file descriptor. */ error = ep_getfd(&fd, &inode, &file); if (error)goto eexit_1;/* Setup the file internal data structure ("struct eventpoll") */ error = ep_file_init(file); if (error) goto eexit_2;
函数很简略,其中 ep_getfd 看上去是“get”,其实在第一次调用 epoll_create 时,它是要创立新 inode、新的 file、新的 fd。而 ep_file_init 则要创立一个 struct eventpoll 构造,并把它放入 file-
>private_data,留神,这个 private_data 前面还要用到的。看到这里,兴许有人要问了,为什么 epoll 的开发者不做一个内核的超级大 map 把用户要创立的 epoll 句柄存起来,在 epoll_create 时返回一个指针?那仿佛很直观呀。然而,认真看看,linux 的零碎调用有多少是返回指针的?你会发现简直没有!(特此强调,malloc 不是零碎调用,malloc 调用的 brk 才是)因为 linux 做为 unix 的最卓越的继承人,它遵循了 unix 的一个微小长处——所有皆文件,输入输出是文件、socket 也
是文件,所有皆文件意味着应用这个操作系统的程序能够非常简单,因为一切都是文件操作而已!(unix 还不是齐全做到,plan 9 才算)。而且应用文件系统有个益处:epoll_create 返回的是一个 fd,而不是该死的指针,指针如果指错了,你几乎没方法判断,而 fd 则能够通过 current->files->fd_array[] 找到其真伪。epoll_create 好了,该 epoll_ctl 了,咱们略去判断性的代码:
[fs/eventpoll.c-->sys_epoll_ctl()] asmlinkage long sys_epoll_ctl(int epfd, int op, int fd, struct epoll_event __user *event) {int error; struct file *file, *tfile; struct eventpoll *ep; struct epitem *epi; struct epoll_event epds;.... epi = ep_find(ep, tfile, fd);error = -EINVAL;switch (op) {case EPOLL_CTL_ADD:if (!epi) {epds.events |= POLLERR | POLLHUP; error = ep_insert(ep, &epds, tfile, fd); } else error = -EEXIST; break; case EPOLL_CTL_DEL: if (epi) error = ep_remove(ep, epi);elseerror = -ENOENT; break;case EPOLL_CTL_MOD: if (epi) {epds.events |= POLLERR | POLLHUP; error = ep_modify(ep, epi, &epds); } elseerror = -ENOENT; break;}
原来就是在一个大的构造(当初先不论是什么大构造)里先 ep_find,如果找到了 struct epitem 而用户操作是 ADD,那么返回 -EEXIST;如果是 DEL,则 ep_remove。如果找不到 struct epitem 而用户操作是 ADD,就 ep_insert 创立并插入一个。很直白。那这个“大构造”是什么呢?看 ep_find 的调用形式,ep 参数应该是指向这个“大构造”的指针,再看 ep = file->private_data,咱们才明确,原来这个“大构造”就是那个在 epoll_create 时创立的 struct eventpoll,具体再看看 ep_find 的实现,发现原来是 struct eventpoll 的 rbr 成员(struct rb_root),原来这是一个红黑树的根!而红黑树上挂的都是 struct epitem。当初分明了,一个新创建的 epoll 文件带有一个 struct eventpoll 构造,这个构造上再挂一个红黑树,而这个红黑树就是每次 epoll_ctl 时 fd 寄存的中央!当初数据结构都曾经分明了,咱们来看最外围的:
[fs/eventpoll.c-->sys_epoll_wait()] asmlinkage long sys_epoll_wait(int epfd, struct epoll_event __user *events, int maxevents, int timeout) {int error; struct file *file; struct eventpoll *ep; DNPRINTK(3, (KERN_INFO "[%p] eventpoll: sys_epoll_wait(%d, %p, %d, %d)\n",current, epfd, events, maxevents, timeout)); /* The maximum number of event must be greater than zero */ if (maxevents <= 0) return -EINVAL;/* Verify that the area passed by the user is writeable */ if ((error = verify_area(VERIFY_WRITE, events, maxevents * sizeof(structepoll_event))))goto eexit_1; /* Get the "struct file *" for the eventpoll file */ error = -EBADF; file = fget(epfd); if (!file) goto eexit_1; /* * We have to check that the file structure underneath the fd * the user passed to us _is_ an eventpoll file. */error = -EINVAL; if (!IS_FILE_EPOLL(file)) goto eexit_2; /* * At this point it is safe to assume that the "private_data" contains * our own data structure. */ ep = file->private_data;/* Time to fish for events ... */ error = ep_poll(ep, events, maxevents, timeout); eexit_2: fput(file);eexit_1:DNPRINTK(3, (KERN_INFO "[%p] eventpoll: sys_epoll_wait(%d, %p, %d, %d) =%d\n", current, epfd, events, maxevents, timeout, error)); return error; }
故伎重演,从 file->private_data 中拿到 struct eventpoll,再调用 ep_poll
[fs/eventpoll.c-->sys_epoll_wait()->ep_poll()] static int ep_poll(struct eventpoll *ep, struct epoll_event __user *events, int maxevents, long timeout) {int res, eavail;unsigned long flags; long jtimeout; wait_queue_t wait; /* * Calculate the timeout by checking for the "infinite" value ( -1) * and the overflow condition. The passed timeout is in milliseconds, * that why (t * HZ) / 1000. */ jtimeout = timeout == -1 || timeout > (MAX_SCHEDULE_TIMEOUT - 1000) / HZ ? MAX_SCHEDULE_TIMEOUT: (timeout * HZ + 999) / 1000;retry: write_lock_irqsave(&ep->lock, flags); res = 0; if (list_empty(&ep->rdllist)) {/* * We don't have any available event to return to the caller. * We need to sleep here, and we will be wake up by * ep_poll_callback() when events will become available.*/ init_waitqueue_entry(&wait, current); add_wait_queue(&ep->wq, &wait); for (;;) {/* * We don't want to sleep if the ep_poll_callback() sends us * a wakeup in between. That's why we set the task state * to TASK_INTERRUPTIBLE before doing the checks. */ set_current_state(TASK_INTERRUPTIBLE); if (!list_empty(&ep->rdllist) || !jtimeout) break; if (signal_pending(current)) {res = -EINTR; break;} write_unlock_irqrestore(&ep->lock, flags); jtimeout = schedule_timeout(jtimeout); write_lock_irqsave(&ep->lock, flags);} remove_wait_queue(&ep->wq, &wait); set_current_state(TASK_RUNNING);}
又是一个大循环,不过这个大循环比 poll 的那个好,因为认真一看——它竟然除了睡觉和判断 ep->rdllist 是否为空以外,啥也没做!什么也没做当然效率高了,但到底是谁来让 ep->rdllist 不为空呢?答案是 ep_insert 时设下的回调函数
[fs/eventpoll.c-->sys_epoll_ctl()-->ep_insert()] static int ep_insert(struct eventpoll *ep, struct epoll_event *event, struct file *tfile, int fd) {int error, revents, pwake = 0; unsigned long flags; struct epitem *epi;struct ep_pqueue epq; error = -ENOMEM; if (!(epi = EPI_MEM_ALLOC())) goto eexit_1; /* Item initialization follow here ... */EP_RB_INITNODE(&epi->rbn);INIT_LIST_HEAD(&epi->rdllink); INIT_LIST_HEAD(&epi->fllink); INIT_LIST_HEAD(&epi->txlink); INIT_LIST_HEAD(&epi->pwqlist); epi->ep = ep; EP_SET_FFD(&epi->ffd, tfile, fd); epi->event = *event; atomic_set(&epi->usecnt, 1);epi->nwait = 0; /* Initialize the poll table using the queue callback */ epq.epi = epi; init_poll_funcptr(&epq.pt, ep_ptable_queue_proc);/** Attach the item to the poll hooks and get current event bits. * We can safely use the file* here because its usage count has * been increased by the caller of this function. */ revents = tfile->f_op->poll(tfile, &epq.pt);
咱们留神 init_poll_funcptr(&epq.pt, ep_ptable_queue_proc); 这一行,其实就是 &(epq.pt)->qproc = ep_ptable_queue_proc; 紧接着 tfile->f_op->poll(tfile, &epq.pt) 其实就是调用被监控文件(epoll 里叫“target file”) 的 poll 办法,而这个 poll 其实就是调用 poll_wait(还记得 poll_wait 吗?每个反对 poll 的设施驱动程序都要调用的),最初就是调用 ep_ptable_queue_proc。这是比拟难解的一个调用关系,因为不是语言级的间接调用。ep_insert 还把 struct epitem 放到 struct file 里的 f_ep_links 连表里,以不便查找,struct epitem 里的 fllink 就是负担这个使命的。
[fs/eventpoll.c-->ep_ptable_queue_proc()] static void ep_ptable_queue_proc(struct file *file, wait_queue_head_t *whead,poll_table *pt) {struct epitem *epi = EP_ITEM_FROM_EPQUEUE(pt); struct eppoll_entry *pwq; if (epi->nwait >= 0 && (pwq = PWQ_MEM_ALLOC())) {init_waitqueue_func_entry(&pwq->wait, ep_poll_callback); pwq->whead = whead; pwq->base = epi; add_wait_queue(whead, &pwq->wait);list_add_tail(&pwq->llink, &epi->pwqlist); epi->nwait++; } else {/* We have to signal that an error occurred */epi->nwait = -1;} }
下面的代码就是 ep_insert 中要做的最重要的事:创立 struct eppoll_entry,设置其唤醒回调函数为
ep_poll_callback,而后退出设施期待队列(留神这里的 whead 就是上一章所说的每个设施驱动都要带的期待队列)。只有这样,当设施就绪,唤醒期待队列上的期待者时,ep_poll_callback 就会被调用。每次调用 poll 零碎调用,操作系统都要把 current(以后过程)挂到 fd 对应的所有设施的期待队列上,能够设想,fd 多到上千的时候,这样“挂”法很麻烦;而每次调用 epoll_wait 则没有这么罗嗦,epoll 只在 epoll_ctl 时把 current 挂一遍(这第一遍是免不了的)并给每个 fd 一个命令“好了就调回调函数”,如果设施有事件了,通过回调函数,会把 fd 放入 rdllist,而每次调用 epoll_wait 就只是收集 rdllist 里的 fd 就能够了——epoll 奇妙的利用回调函数,实现了更高效的事件驱动模型。当初咱们猜也能猜出来 ep_poll_callback 会干什么了——必定是把红黑树上的收到 event 的 epitem(代表每个 fd)插入 ep->rdllist 中,这样,当 epoll_wait 返回时,rdllist 里就都是就绪的 fd 了!
[fs/eventpoll.c-->ep_poll_callback()] static int ep_poll_callback(wait_queue_t *wait, unsigned mode, int sync, void *key) {int pwake = 0; unsigned long flags; struct epitem *epi = EP_ITEM_FROM_WAIT(wait); struct eventpoll *ep = epi->ep; DNPRINTK(3, (KERN_INFO "[%p] eventpoll: poll_callback(%p) epi=%pep=%p\n", current, epi->file, epi, ep)); write_lock_irqsave(&ep->lock, flags); /* * If the event mask does not contain any poll(2) event, we consider the * descriptor to be disabled. This condition is likely the effect of the * EPOLLONESHOT bit that disables the descriptor when an event is received, * until the next EPOLL_CTL_MOD will be issued.*/ if (!(epi->event.events & ~EP_PRIVATE_BITS)) goto is_disabled; /* If this file is already in the ready list we exit soon */ if (EP_IS_LINKED(&epi->rdllink)) goto is_linked; list_add_tail(&epi->rdllink, &ep->rdllist);is_linked: /* * Wake up (if active) both the eventpoll wait list and the ->poll() * wait list. */ if (waitqueue_active(&ep->wq)) wake_up(&ep->wq); if (waitqueue_active(&ep->poll_wait)) pwake++; is_disabled: write_unlock_irqrestore(&ep->lock, flags); /* We have to call this outside the lock */if (pwake) ep_poll_safewake(&psw, &ep->poll_wait); return 1; }
真正重要的只有 list_add_tail(&epi->rdllink, &ep->rdllist); 一句,就是把 struct epitem 放到 struct eventpoll 的 rdllist 中去。当初咱们能够画出 epoll 的外围数据结构图了:
epoll 独有的 EPOLLET
EPOLLET 是 epoll 零碎调用独有的 flag,ET 就是 Edge Trigger(边缘触发)的意思,具体含意和利用大家可 google 之。有了 EPOLLET,反复的事件就不会总是进去打搅程序的判断,故而常被应用。那 EPOLLET 的原理是什么呢?epoll 把 fd 都挂上一个回调函数,当 fd 对应的设施有音讯时,就把 fd 放入 rdllist 链表,这样 epoll_wait 只有查看这个 rdllist 链表就能够晓得哪些 fd 有事件了。咱们看看 ep_poll 的最初几行代码:
[fs/eventpoll.c->ep_poll()] /* * Try to transfer events to user space. In case we get 0 events and * there's still timeout left over, we go trying again in search of * more luck. */if (!res && eavail && !(res = ep_events_transfer(ep, events, maxevents)) && jtimeout) goto retry; return res; }
把 rdllist 里的 fd 拷到用户空间,这个工作是 ep_events_transfer 做的:
[fs/eventpoll.c->ep_events_transfer()] static int ep_events_transfer(struct eventpoll *ep,struct epoll_event __user *events, int maxevents) {int eventcnt = 0; struct list_head txlist; INIT_LIST_HEAD(&txlist); /* * We need to lock this because we could be hit by * eventpoll_release_file() and epoll_ctl(EPOLL_CTL_DEL). */down_read(&ep->sem);/* Collect/extract ready items */ if (ep_collect_ready_items(ep, &txlist, maxevents) > 0) {/* Build result set in userspace */eventcnt = ep_send_events(ep, &txlist, events);/* Reinject ready items into the ready list */ep_reinject_items(ep, &txlist); }up_read(&ep->sem); return eventcnt;}
代码很少,其中 ep_collect_ready_items 把 rdllist 里的 fd 挪到 txlist 里(挪完后 rdllist 就空了),接着
ep_send_events 把 txlist 里的 fd 拷给用户空间,而后 ep_reinject_items 把一部分 fd 从 txlist 里“返还”给
rdllist 以便下次还能从 rdllist 里发现它。其中 ep_send_events 的实现:
[fs/eventpoll.c->ep_send_events()] static int ep_send_events(struct eventpoll *ep, struct list_head *txlist, struct epoll_event __user *events) {int eventcnt = 0; unsigned int revents; struct list_head *lnk; struct epitem *epi; /* * We can loop without lock because this is a task private list. * The test done during the collection loop will guarantee us that * another task will not try to collect this file. Also, items * cannot vanish during the loop because we are holding "sem". */ list_for_each(lnk, txlist) {epi = list_entry(lnk, struct epitem, txlink); /* * Get the ready file event set. We can safely use the file * because we are holding the "sem" in read and this will * guarantee that both the file and the item will not vanish. */ revents = epi->ffd.file->f_op->poll(epi->ffd.file, NULL); /* * Set the return event set for the current file descriptor. * Note that only the task task was successfully able to link * the item to its "txlist" will write this field. */ epi->revents = revents & epi->event.events; if (epi->revents) {if (__put_user(epi->revents, &events[eventcnt].events) || __put_user(epi->event.data, &events[eventcnt].data)) return -EFAULT; if (epi->event.events & EPOLLONESHOT) epi->event.events &= EP_PRIVATE_BITS; eventcnt++; } } return eventcnt;}
这个拷贝实现其实没什么可看的,然而请留神 revents = epi->ffd.file->f_op->poll(epi->ffd.file, NULL); 这一行,这个 poll 很狡猾,它把第二个参数置为 NULL 来调用。咱们先看一下设施驱动通常是怎么实现 poll 的:
static unsigned int scull_p_poll(struct file *filp, poll_table *wait){struct scull_pipe *dev = filp->private_data;unsigned int mask = 0;/** The buffer is circular; it is considered full* if "wp" is right behind "rp" and empty if the* two are equal.*/down(&dev->sem);poll_wait(filp, &dev->inq, wait);poll_wait(filp, &dev->outq, wait);if (dev->rp != dev->wp)mask |= POLLIN | POLLRDNORM; /* readable */if (spacefree(dev))mask |= POLLOUT | POLLWRNORM; /* writable */up(&dev->sem);return mask;}
下面这段代码摘自《linux 设施驱动程序(第三版)》,相对经典,设施先要把 current(以后过程)挂在 inq 和 outq 两个队列上(这个“挂”操作是 wait 回调函数指针做的),而后等设施来唤醒,唤醒后就能通过 mask 拿到事件掩码了(留神那个 mask 参数,它就是负责拿事件掩码的)。那如果 wait 为 NULL,poll_wait 会做些什么呢?
[include/linux/poll.h->poll_wait] static inline void poll_wait(struct file * filp, wait_queue_head_t * wait_address,poll_table *p) {if (p && wait_address) p->qproc(filp, wait_address, p); }
如果 poll_table 为空,什么也不做。咱们倒回 ep_send_events,那句标红的 poll,实际上就是“我不想休眠,我只想拿到事件掩码”的意思。而后再把拿到的事件掩码拷给用户空间。ep_send_events 实现后,就轮到 ep_reinject_items 了:
[fs/eventpoll.c->ep_reinject_items] static void ep_reinject_items(struct eventpoll *ep, struct list_head *txlist) {int ricnt = 0, pwake = 0; unsigned long flags; struct epitem *epi; write_lock_irqsave(&ep->lock, flags); while (!list_empty(txlist)) {epi = list_entry(txlist->next, struct epitem, txlink);/* Unlink the current item from the transfer list */ EP_LIST_DEL(&epi->txlink);/** If the item is no more linked to the interest set, we don't * have to push it inside the ready list because the following * ep_release_epitem() is going to drop it. Also, if the current * item is set to have an Edge Triggered behaviour, we don't have * to push it back either. */ if (EP_RB_LINKED(&epi->rbn) && !(epi->event.events & EPOLLET) && (epi->revents & epi->event.events) && !EP_IS_LINKED(&epi->rdllink)) {list_add_tail(&epi->rdllink, &ep->rdllist);ricnt++; } } if (ricnt) {/** Wake up ( if active) both the eventpoll wait list and the ->poll() * wait list. */ if (waitqueue_active(&ep->wq)) wake_up(&ep->wq); if (waitqueue_active(&ep->poll_wait)) pwake++; }write_unlock_irqrestore(&ep->lock, flags); /* We have to call this outside the lock */ if (pwake) ep_poll_safewake(&psw, &ep->poll_wait); }
ep_reinject_items 把 txlist 里的一部分 fd 又放回 rdllist,那么,是把哪一部分 fd 放回去呢?看下面 if (EP_RB_LINKED(&epi->rbn) && !(epi->event.events & EPOLLET) && 这个判断——是哪些“没有标上 EPOLLET”(标红代码)且“事件被关注”(标蓝代码)的 fd 被从新放回了 rdllist。那么下次 epoll_wait 当然会又把 rdllist 里的 fd 拿来拷给用户了。举个例子。假如一个 socket,只是 connect,还没有收发数据,那么它的 poll 事件掩码总是有 POLLOUT 的(参见下面的驱动示例),每次调用 epoll_wait 总是返回 POLLOUT 事件(比较烦),因为它的 fd 就总是被放回 rdllist;如果此时有人往这个 socket 里写了一大堆数据,造成 socket 塞住(不可写了),那么 (epi->revents & epi->event.events) && !EP_IS_LINKED(&epi->rdllink)) {里的判断就不成立了(没有 POLLOUT 了),fd 不会放回 rdllist,epoll_wait 将不会再返回用户 POLLOUT 事件。当初咱们给这个 socket 加上 EPOLLET,而后 connect,没有收发数据,此时,if (EP_RB_LINKED(&epi->rbn) && !(epi->event.events & EPOLLET) && 判断又不成立了,所以 epoll_wait 只会返回一次 POLLOUT 告诉给用户(因为此 fd 不会再回到 rdllist 了),接下来的 epoll_wait 都不会有任何事件告诉了。