相干视频举荐
面试中正经“八股文”网络原理tcp/udp,网络编程epoll/reactor
epoll 原理分析 以及 reactor 模型利用
epoll原理分析以及三握四挥的解决
LinuxC++后盾服务器开发架构师收费学习地址
通过上一剖析,poll运行效率的两个瓶颈曾经找出,当初的问题是怎么改良。首先,如果要监听1000个fd,每次poll都要把1000个fd 拷入内核,太不迷信了,内核干嘛不本人保留曾经拷入的fd呢?答对了,epoll就是本人保留拷入的fd,它的API就曾经阐明了这一点——不是 epoll_wait的时候才传入fd,而是通过epoll_ctl把所有fd传入内核再一起"wait",这就省掉了不必要的反复拷贝。其次,在 epoll_wait时,也不是把current轮流的退出fd对应的设施期待队列,而是在设施期待队列醒来时调用一个回调函数(当然,这就须要“唤醒回调”机制),把产生事件的fd纳入一个链表,而后返回这个链表上的fd。
另外,epoll机制实现了本人特有的文件系统eventpoll filesystem
1. 内核数据结构
(1) struct eventpoll { spinlock_t lock; struct mutex mtx; wait_queue_head_t wq; /* Wait queue used by sys_epoll_wait() ,调用epoll_wait()时, 咱们就是"睡"在了这个期待队列上*/ wait_queue_head_t poll_wait; /* Wait queue used by file->poll() , 这个用于epollfd本事被poll的时候*/ struct list_head rdllist; /* List of ready file descriptors, 所有曾经ready的epitem都在这个链表外面*/ structrb_root rbr; /* RB tree root used to store monitored fd structs, 所有要监听的epitem都在这里*/ epitem *ovflist; /*寄存的epitem都是咱们在传递数据给用户空间时监听到了事件*/. struct user_struct *user; /*这里保留了一些用户变量,比方fd监听数量的最大值等*/ };
通过epoll_ctl接口退出该epoll描述符监听的套接字则属于socket filesystem,这点肯定要留神。每个增加的待监听(这里监听和listen调用不同)都对应于一个epitem构造体,该构造体已红黑树的构造组织,eventpoll构造中保留了树的根节点(rbr成员)。同时有监听事件到来的套接字的该构造以双向链表组织起来,链表头保留在eventpoll中(rdllist成员)。
/* * Each file descriptor added to the eventpoll interface will have an entry of this type linked to the "rbr" RB tree. */ (2) struct epitem { struct rb_node rbn; /* RB tree node used to link this structure to the eventpoll RB tree */ struct list_head rdllink; /* 链表节点, 所有曾经ready的epitem都会被链到eventpoll的rdllist中 */ struct epitem *next; struct epoll_filefd ffd; /* The file descriptor information this item refers to */ int nwait; /* Number of active wait queue attached to poll operations */ struct list_head pwqlist; /* List containing poll wait queues */ struct eventpoll *ep; /* The "container" of this item */ struct list_head fllink; /* List header used to link this item to the "struct file" items list */ struct epoll_event event; /*以后的epitem关系哪些events, 这个数据是调用epoll_ctl时从用户态传递过去 */ }; (3) struct epoll_filefd {struct file *file;int fd;};(4) struct eppoll_entry { /* Wait structure used by the poll hooks */struct list_head llink; /* List header used to link this structure to the "struct epitem" */struct epitem *base; /* The "base" pointer is set to the container "struct epitem" */wait_queue_t wait; / Wait queue item that will be linked to the target file wait queue head. /wait_queue_head_t *whead;/The wait queue head that linked the "wait" wait queue item */};//注:后两项相当于期待队列(5) struct ep_pqueue {/* Wrapper struct used by poll queueing */poll_table pt; // struct poll_table是一个函数指针的包裹struct epitem *epi;};(6) struct ep_send_events_data { /* Used by the ep_send_events() function as callback private data */int maxevents;struct epoll_event __user *events;};
各个数据结构的关系如下图:
2. 函数调用剖析
epoll函数调用关系全局图:
【文章福利】:小编整顿了一些集体感觉比拟好的学习书籍、视频材料共享在群文件外面,有须要的能够自行添加哦!(须要自取)
3. 函数实现剖析
3.1 eventpoll_init
epoll是个module,所以先看看module的入口eventpoll_init
[fs/eventpoll.c-->evetpoll_init()](简化后)
static int __init eventpoll_init(void)
{ epi_cache = kmem_cache_create("eventpoll_epi", sizeof(struct epitem), 0, SLAB_HWCACHE_ALIGN|EPI_SLAB_DEBUG|SLAB_PANIC, NULL, NULL); pwq_cache = kmem_cache_create("eventpoll_pwq", sizeof(struct eppoll_entry), 0, EPI_SLAB_DEBUG|SLAB_PANIC, NULL, NULL); //注册了一个新的文件系统,叫"eventpollfs" error = register_filesystem(&eventpoll_fs_type);eventpoll_mnt = kern_mount(&eventpoll_fs_type);; }
很乏味,这个module在初始化时注册了一个新的文件系统,叫"eventpollfs"(在eventpoll_fs_type构造里),而后挂载此文件系统。另外创立两个内核cache(在内核编程中,如果须要频繁调配小块内存,应该创立kmem_cahe来做“内存池”),别离用于寄存struct epitem和eppoll_entry。
当初想想epoll_create为什么会返回一个新的fd?因为它就是在这个叫做"eventpollfs"的文件系统里创立了一个新文件!如下:
3.2 sys_epoll_create
[fs/eventpoll.c-->sys_epoll_create()] asmlinkage long sys_epoll_create(int size) { int error, fd; struct inode *inode; struct file *file; error = ep_getfd(&fd, &inode, &file); /* Setup the file internal data structure ( "struct eventpoll" ) */ error = ep_file_init(file);}
函数很简略,其中ep_getfd看上去是“get”,其实在第一次调用epoll_create时,它是要创立新inode、新的file、新的fd。而ep_file_init则要创立一个struct eventpoll构造,并把它放入file->private_data,留神,这个private_data前面还要用到的。
3.3 epoll_ctl
epoll_create好了,该epoll_ctl了,咱们略去判断性的代码:[fs/eventpoll.c-->sys_epoll_ctl()] asmlinkage long sys_epoll_ctl(int epfd, int op, int fd, struct epoll_event __user *event){ struct file *file, *tfile; struct eventpoll *ep; struct epitem *epi; struct epoll_event epds;.... epi = ep_find(ep, tfile, fd);//tfile寄存要监听的fd对应在rb-tree中的epitem switch (op) {//省略了判空解决 case EPOLL_CTL_ADD: epds.events |= POLLERR | POLLHUP; error = ep_insert(ep, &epds, tfile, fd); break; case EPOLL_CTL_DEL: error = ep_remove(ep, epi); break; case EPOLL_CTL_MOD: epds.events |= POLLERR | POLLHUP; error = ep_modify(ep, epi, &epds); break; }
原来就是在一个“大的构造”(struct eventpoll)里先ep_find,如果找到了struct epitem,而依据用户操作是ADD、DEL、MOD调用相应的函数,这些函数在epitem组成红黑树中减少、删除、批改相应节点(每一个监听fd对应一个节点)。很直白。那这个“大构造”是什么呢?看ep_find的调用形式,ep参数应该是指向这个“大构造”的指针,再看ep = file->private_data,咱们才明确,原来这个“大构造”就是那个在epoll_create时创立的struct eventpoll,具体再看看ep_find的实现,发现原来是struct eventpoll的rbr成员(struct rb_root),原来这是一个红黑树的根!而红黑树上挂的都是struct epitem。
当初分明了,一个新创建的epoll文件带有一个struct eventpoll构造,这个构造上再挂一个红黑树,而这个红黑树就是每次epoll_ctl时fd寄存的中央!
3.4 sys_epoll_wait
当初数据结构都曾经分明了,咱们来看最外围的:
[fs/eventpoll.c-->sys_epoll_wait()] asmlinkage long sys_epoll_wait(int epfd, struct epoll_event __user *events, int maxevents, int timeout) { struct file *file; struct eventpoll *ep; /* Get the "struct file *" for the eventpoll file */ file = fget(epfd);/* * We have to check that the file structure underneath the fd * the user passed to us _is_ an eventpoll file.(所以如果这里是一般的文件fd会出错) */ if (!IS_FILE_EPOLL(file)) goto eexit_2; ep = file->private_data; error = ep_poll(ep, events, maxevents, timeout);…… }
故伎重演,从file->private_data中拿到struct eventpoll,再调用ep_poll
3.5 ep_poll()
[fs/eventpoll.c-->sys_epoll_wait()->ep_poll()] static int ep_poll(struct eventpoll *ep, struct epoll_event __user *events, int maxevents, long timeout) { int res; wait_queue_t wait;//期待队列项 if (list_empty(&ep->rdllist)) { //ep->rdllist寄存的是已就绪(read)的fd,为空时阐明以后没有就绪的fd,所以须要将以后 init_waitqueue_entry(&wait, current);//创立一个期待队列项,并应用以后过程(current)初始化 add_wait_queue(&ep->wq, &wait);//将刚创立的期待队列项退出到ep中的期待队列(行将以后过程增加到期待队列) for (;;) { /*将过程状态设置为TASK_INTERRUPTIBLE,因为咱们不心愿这期间ep_poll_callback()发信号唤醒过程的时候,过程还在sleep */ set_current_state(TASK_INTERRUPTIBLE); if (!list_empty(&ep->rdllist) || !jtimeout)//如果ep->rdllist非空(即有就绪的fd)或工夫到则跳 出循环 break; if (signal_pending(current)) { res = -EINTR; break; } } remove_wait_queue(&ep->wq, &wait);//将期待队列项移出期待队列(将以后过程移出) set_current_state(TASK_RUNNING); }....
又是一个大循环,不过这个大循环比poll的那个好,因为认真一看——它竟然除了睡觉和判断ep->rdllist是否为空以外,啥也没做!什么也没做当然效率高了,但到底是谁来让ep->rdllist不为空呢?答案是ep_insert时设下的回调函数.
3.6 ep_insert()
[fs/eventpoll.c-->sys_epoll_ctl()-->ep_insert()]static int ep_insert(struct eventpoll *ep, struct epoll_event *event, struct file *tfile, int fd){struct epitem *epi; struct ep_pqueue epq;// 创立ep_pqueue对象epi = EPI_MEM_ALLOC();//调配一个epitem/* 初始化这个epitem ... */ epi->ep = ep;//将创立的epitem增加到传进来的struct eventpoll/*后几行是设置epitem的相应字段*/ EP_SET_FFD(&epi->ffd, tfile, fd);//将要监听的fd退出到刚创立的epitem epi->event = *event; epi->nwait = 0;/* Initialize the poll table using the queue callback */epq.epi = epi; //将一个epq和新插入的epitem(epi)关联//上面一句等价于&(epq.pt)->qproc = ep_ptable_queue_proc;init_poll_funcptr(&epq.pt, ep_ptable_queue_proc);revents = tfile->f_op->poll(tfile, &epq.pt); //tfile代表target file,即被监听的文件,poll()返回就绪事件的掩码,赋给revents.list_add_tail(&epi->fllink, &tfile->f_ep_links);// 每个文件会将所有监听本人的epitem链起来ep_rbtree_insert(ep, epi);// 都搞定后, 将epitem插入到对应的eventpoll中去……}
紧接着 tfile->f_op->poll(tfile, &epq.pt)其实就是调用被监控文件(epoll里叫“target file”)的poll办法,而这个poll其实就是调用poll_wait(还记得poll_wait吗?每个反对poll的设施驱动程序都要调用的),最初就是调用ep_ptable_queue_proc。(注:f_op->poll()一般来说只是个wrapper, 它会调用真正的poll实现, 拿UDP的socket来举例, 这里就是这样的调用流程: f_op->poll(), sock_poll(), udp_poll(), datagram_poll(), sock_poll_wait()。)这是比拟难解的一个调用关系,因为不是语言级的间接调用。ep_insert还把struct epitem放到struct file里的f_ep_links连表里,以不便查找,struct epitem里的fllink就是负担这个使命的。
3.7 ep_ptable_queue_proc
[fs/eventpoll.c-->ep_ptable_queue_proc()]static void ep_ptable_queue_proc(struct file *file, wait_queue_head_t *whead, poll_table *pt){ struct epitem *epi = EP_ITEM_FROM_EPQUEUE(pt); struct eppoll_entry *pwq;if (epi->nwait >= 0 && (pwq = PWQ_MEM_ALLOC())) { init_waitqueue_func_entry(&pwq->wait, ep_poll_callback); pwq->whead = whead; pwq->base = epi; add_wait_queue(whead, &pwq->wait); list_add_tail(&pwq->llink, &epi->pwqlist); epi->nwait++; } else { /* We have to signal that an error occurred */ epi->nwait = -1; } }
下面的代码就是ep_insert中要做的最重要的事:创立struct eppoll_entry,设置其唤醒回调函数为ep_poll_callback,而后退出设施期待队列(留神这里的whead就是上一章所说的每个设施驱动都要带的期待队列)。只有这样,当设施就绪,唤醒期待队列上的期待过程时,ep_poll_callback就会被调用。每次调用poll零碎调用,操作系统都要把current(以后过程)挂到fd对应的所有设施的期待队列上,能够设想,fd多到上千的时候,这样“挂”法很麻烦;而每次调用epoll_wait则没有这么罗嗦,epoll只在epoll_ctl时把current挂一遍(这第一遍是免不了的)并给每个fd一个命令“好了就调回调函数”,如果设施有事件了,通过回调函数,会把fd放入rdllist,而每次调用epoll_wait就只是收集rdllist里的fd就能够了——epoll奇妙的利用回调函数,实现了更高效的事件驱动模型。
当初咱们猜也能猜出来ep_poll_callback会干什么了——必定是把红黑树(ep->rbr)上的收到event的epitem(代表每个fd)插入ep->rdllist中,这样,当epoll_wait返回时,rdllist里就都是就绪的fd了!
3.8 ep_poll_callback
[fs/eventpoll.c-->ep_poll_callback()]static int ep_poll_callback(wait_queue_t *wait, unsigned mode, int sync, void *key){ int pwake = 0; struct epitem *epi = EP_ITEM_FROM_WAIT(wait); struct eventpoll *ep = epi->ep; /* If this file is already in the ready list we exit soon */ if (EP_IS_LINKED(&epi->rdllink)) goto is_linked; list_add_tail(&epi->rdllink, &ep->rdllist); is_linked: /* * Wake up ( if active ) both the eventpoll wait list and the ->poll() * wait list. */ if (waitqueue_active(&ep->wq)) wake_up(&ep->wq); if (waitqueue_active(&ep->poll_wait)) pwake++; }
4. epoll独有的EPOLLET
EPOLLET是epoll零碎调用独有的flag,ET就是Edge Trigger(边缘触发)的意思,具体含意和利用大家可google之。有了EPOLLET,反复的事件就不会总是进去打搅程序的判断,故而常被应用。那EPOLLET的原理是什么呢?
上篇咱们讲到epoll把fd都挂上一个回调函数,当fd对应的设施有音讯时,回调函数就把fd放入rdllist链表,这样epoll_wait只有查看这个rdllist链表就能够晓得哪些fd有事件了。咱们看看ep_poll的最初几行代码:
4.1 ep_poll() (接3.5)
[fs/eventpoll.c->ep_poll()] /* Try to transfer events to user space. */ ep_events_transfer(ep, events, maxevents) ......
把rdllist里的fd拷到用户空间,这个工作是ep_events_transfer做的.
4.2 ep_events_transfer
[fs/eventpoll.c->ep_events_transfer()] static int ep_events_transfer(struct eventpoll *ep, struct epoll_event __user *events, int maxevents){int eventcnt = 0;struct list_head txlist;INIT_LIST_HEAD(&txlist);/* Collect/extract ready items */if (ep_collect_ready_items(ep, &txlist, maxevents) > 0) {/* Build result set in userspace */eventcnt = ep_send_events(ep, &txlist, events);/* Reinject ready items into the ready list */ep_reinject_items(ep, &txlist);}up_read(&ep->sem);return eventcnt;}
代码很少,其中ep_collect_ready_items把rdllist里的fd挪到txlist里(挪完后rdllist就空了),接着ep_send_events把txlist里的fd拷给用户空间,而后ep_reinject_items把一部分fd从txlist里“返还”给rdllist以便下次还能从rdllist里发现它。
其中ep_send_events的实现:
4.3 ep_send_events()
[fs/eventpoll.c->ep_send_events()]static int ep_send_events(struct eventpoll *ep, struct list_head *txlist,struct epoll_event __user *events){int eventcnt = 0;unsigned int revents;struct list_head *lnk;struct epitem *epi;list_for_each(lnk, txlist) {epi = list_entry(lnk, struct epitem, txlink);revents = epi->ffd.file->f_op->poll(epi->ffd.file, NULL);//调用每个监听文件的poll办法获取就绪事件(掩码),并赋值给reventsepi->revents = revents & epi->event.events;if (epi->revents) { if (__put_user(epi->revents, &events[eventcnt].events) || __put_user(epi->event.data, &events[eventcnt].data))//将event从内核空间发送到用户空间 return -EFAULT; if (epi->event.events & EPOLLONESHOT) epi->event.events &= EP_PRIVATE_BITS; eventcnt++; } } return eventcnt; }
这个拷贝实现其实没什么可看的,然而请留神红色的一行,这个poll很狡猾,它把第二个参数置为NULL来调用。咱们先看一下设施驱动通常是怎么实现poll的:
static unsigned int scull_p_poll(struct file *filp, poll_table *wait){struct scull_pipe *dev = filp->private_data;unsigned int mask = 0;poll_wait(filp, &dev->inq, wait);poll_wait(filp, &dev->outq, wait);if (dev->rp != dev->wp)mask |= POLLIN | POLLRDNORM; /* readable */if (spacefree(dev))mask |= POLLOUT | POLLWRNORM; /* writable */return mask;}
下面这段代码摘自《linux设施驱动程序(第三版)》,相对经典,设施先要把current(以后过程)挂在inq和outq两个队列上(这个“挂”操作是wait回调函数指针做的),而后等设施来唤醒,唤醒后就能通过mask拿到事件掩码了(留神那个mask参数,它就是负责拿事件掩码的)。那如果wait为NULL,poll_wait会做些什么呢?
4.4 poll_wait
[include/linux/poll.h->poll_wait] static inline void poll_wait(struct file * filp, wait_queue_head_t * wait_address,poll_table *p) { if (p && wait_address) p->qproc(filp, wait_address, p); }
喏,看见了,如果poll_table为空,什么也不做。咱们倒回ep_send_events,那句标红的poll,实际上就是“我不想休眠,我只想拿到事件掩码”的意思。而后再把拿到的事件掩码拷给用户空间。ep_send_events实现后,就轮到ep_reinject_items了。
4.5 p_reinject_items
[fs/eventpoll.c->ep_reinject_items]static void ep_reinject_items(struct eventpoll *ep, struct list_head *txlist){ int ricnt = 0, pwake = 0; unsigned long flags; struct epitem *epi; while (!list_empty(txlist)) {//遍历txlist(此时txlist寄存的是已就绪的epitem) epi = list_entry(txlist->next, struct epitem, txlink); EP_LIST_DEL(&epi->txlink);//将以后的epitem从txlist中删除 if (EP_RB_LINKED(&epi->rbn) && !(epi->event.events & EPOLLET) && (epi->revents & epi->event.events) && !EP_IS_LINKED(&epi->rdllink)) { list_add_tail(&epi->rdllink, &ep->rdllist);//将以后epitem重新加入ep->rdllist ricnt++;// ep->rdllist中epitem的个数(即从新退出就绪的epitem的个数) } } if (ricnt) {//如果ep->rdllist不空,从新唤醒等、期待队列的过程(current) if (waitqueue_active(&ep->wq)) wake_up(&ep->wq); if (waitqueue_active(&ep->poll_wait)) pwake++; } ……}
ep_reinject_items把txlist里的一部分fd又放回rdllist,那么,是把哪一部分fd放回去呢?看下面那个判断——是那些“没有标上EPOLLET(即默认的LT)”(标红代码)且“事件被关注”(标蓝代码)的fd被从新放回了rdllist。那么下次epoll_wait当然会又把rdllist里的fd拿来拷给用户了。举个例子。假如一个socket,只是connect,还没有收发数据,那么它的poll事件掩码总是有POLLOUT的(参见下面的驱动示例),每次调用epoll_wait总是返回POLLOUT事件(比较烦),因为它的fd就总是被放回rdllist;如果此时有人往这个socket里写了一大堆数据,造成socket塞住(不可写了),那么标蓝色的判断就不成立了(没有POLLOUT了),fd不会放回rdllist,epoll_wait将不会再返回用户POLLOUT事件。当初咱们给这个socket加上EPOLLET,而后connect,没有收发数据,此时,标红的判断又不成立了,所以epoll_wait只会返回一次POLLOUT告诉给用户(因为此fd不会再回到rdllist了),接下来的epoll_wait都不会有任何事件告诉了。
总结:
epoll函数调用关系全局图:
注:上述函数关系图中有个问题,当ep_reinject_items()将LT的上次就绪的eptiem从新放回就绪链表,下次ep_poll()间接返回,这不就造成了一个循环了吗?什么时候这些LT的epitem才不再退出就绪链表呢?这个问题的解决在4.3——ep_send_events()中,留神这个函数中标红的那个poll调用,咱们剖析过当传入NULL时,poll仅仅是拿到事件掩码,所以如果之前用户对事件的解决导致的文件的revents(状态)扭转,那么这里就会失去更新。例如:用户以可读监听,当读完数据后文件的会变为不可读,这时ep_send_events()中获取的revents中将不再有可读事件,也就不满足ep_reinject_items()中的蓝色判断,所以epitem不再被退出就绪链表(ep->rdllist)。然而如果只读局部数据,并不会引起文件状态扭转(文件仍可读),所以仍会退出就绪链表告诉用户空间,这也就是如果是TL,就会始终告诉用户读事件,直到某些操作导致那个文件描述符不再为就绪状态了(比方,你在发送,接管或者接管申请,或者发送接管的数据少于一定量时导致了一个EWOULDBLOCK 谬误)。
将上述调用增加到函数调用关系图后,如下(增加的为蓝线):
epoll实现数据结构全局关系图: