Java网络编程和NIO详解6Linux-epoll实现原理详解

27次阅读

共计 19677 个字符,预计需要花费 50 分钟才能阅读完成。

微信公众号【黄小斜】作者是蚂蚁金服 JAVA 工程师,目前在蚂蚁财富负责后端开发工作,专注于 JAVA 后端技术栈,同时也懂点投资理财,坚持学习和写作,用大厂程序员的视角解读技术与互联网,我的世界里不只有 coding!关注公众号后回复”架构师“即可领取 Java 基础、进阶、项目和架构师等免费学习资料,更有数据库、分布式、微服务等热门技术学习视频,内容丰富,兼顾原理和实践,另外也将赠送作者原创的 Java 学习指南、Java 程序员面试指南等干货资源

Linux epoll 实现原理详解

在 linux 没有实现 epoll 事件驱动机制之前,我们一般选择用 select 或者 poll 等 IO 多路复用的方法来实现并发服务程序。在大数据、高并发、集群等一些名词唱得火热之年代,select 和 poll 的用武之地越来越有限,风头已经被 epoll 占尽。

本文便来介绍 epoll 的实现机制,并附带讲解一下 select 和 poll。通过对比其不同的实现机制,真正理解为何 epoll 能实现高并发。

这部分转自 https://jeff.wtf/2017/02/IO-m…

为什么要 I/O 多路复用

当需要从一个叫 r_fd 的描述符不停地读取数据,并把读到的数据写入一个叫 w_fd 的描述符时,我们可以用循环使用阻塞 I/O:

<pre>123</pre><pre>while((n = read(r_fd, buf, BUF_SIZE)) > 0) if(write(w_fd, buf, n) != n) err_sys(“write error”);</pre>

但是,如果要从两个地方读取数据呢?这时,不能再使用会把程序阻塞住的 read 函数。因为可能在阻塞地等待 r_fd1 的数据时,来不及处理 r_fd2,已经到达的 r_fd2 的数据可能会丢失掉。

这个情况下需要使用非阻塞 I/O。

只要做个标记,把文件描述符标记为非阻塞的,以后再对它使用 read 函数:如果它还没有数据可读,函数会立即返回并把 errorno 这个变量的值设置为 35,于是我们知道它没有数据可读,然后可以立马去对其他描述符使用 read;如果它有数据可读,我们就读取它数据。对所有要读的描述符都调用了一遍 read 之后,我们可以等一个较长的时间(比如几秒),然后再从第一个文件描述符开始调用 read。这种循环就叫做轮询(polling)。

这样,不会像使用阻塞 I/O 时那样因为一个描述符 read 长时间处于等待数据而使程序阻塞。

轮询的缺点是浪费太多 CPU 时间。大多数时候我们没有数据可读,但是还是用了 read 这个系统调用,使用系统调用时会从用户态切换到内核态。而大多数情况下我们调用 read,然后陷入内核态,内核发现这个描述符没有准备好,然后切换回用户态并且只得到 EAGAIN(errorno 被设置为 35),做的是无用功。描述符非常多的时候,每次的切换过程就是巨大的浪费。

所以,需要 I/O 多路复用。I/O 多路复用通过使用一个系统函数,同时等待多个描述符的可读、可写状态。

为了达到这个目的,我们需要做的是:建立一个描述符列表,以及我们分别关心它们的什么事件(可读还是可写还是发生例外情况);调用一个系统函数,直到这个描述符列表里有至少一个描述符关联的事件发生时,这个函数才会返回。

select, poll, epoll 就是这样的系统函数。

select,poll,epoll 源码分析

[](https://jeff.wtf/2017/02/IO-m… “select”)select

我们可以在所有 POSIX 兼容的系统里使用 select 函数来进行 I/O 多路复用。我们需要通过 select 函数的参数传递给内核的信息有:

  • 我们关心哪些描述符
  • 我们关心它们的什么事件
  • 我们希望等待多长时间

select 的返回时,内核会告诉我们:

  • 可读的描述符的个数
  • 哪些描述符发生了哪些事件
<pre>123456</pre><pre>#include <sys/select.h>int select(int maxfdp1, fd_set readfds, fd_set writefds, fd_set exceptfds, struct timeval timeout);// 返回值: 已就绪的描述符的个数。超时时为 0,错误时为 -1</pre>

maxfdp1 意思是“max file descriptor plus 1”,就是把你要监视的所有文件描述符里最大的那个加上 1。(它实际上决定了内核要遍历文件描述符的次数,比如你监视了文件描述符 5 和 20 并把 maxfdp1 设置为 21,内核每次都会从描述符 0 依次检查到 20。)

中间的三个参数是你想监视的文件描述符的集合。可以把 fd_set 类型视为 1024 位的二进制数,这意味着 select 只能监视小于 1024 的文件描述符(1024 是由 Linux 的 sys/select.h 里 FD_SETSIZE 宏设置的值)。在 select 返回后我们通过 FD_ISSET 来判断代表该位的描述符是否是已准备好的状态。

最后一个参数是等待超时的时长:到达这个时长但是没有任一描述符可用时,函数会返回 0。

用一个代码片段来展示 select 的用法:

<pre>12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455</pre><pre>// 这个例子要监控文件描述符 3, 4 的可读状态,以及 4, 5 的可写状态 // 初始化两个 fd_set 以及 timevalfd_set read_set, write_set;FD_ZERO(read_set);FD_ZERO(write_set);timeval t;t.tv_sec = 5; // 超时为 5 秒 t.tv_usec = 0; // 加 0 微秒 // 设置好两个 fd_setint fd1 = 3;int fd2 = 4;int fd3 = 5;int maxfdp1 = 5 + 1;FD_SET(fd1, &read_set);FD_SET(fd2, &read_set);FD_SET(fd2, &write_set);FD_SET(fd3, &write_set);// 准备备用的 fd_setfd_set r_temp = read_set;fd_set w_temp = write_set;while(true){// 每次都要重新设置放入 select 的 fd_set read_set = r_temp; write_set = w_temp; // 使用 select int n = select(maxfdp1, &read_set, &write_set, NULL, &t); // 上面的 select 函数会一直阻塞,直到 // 3, 4 可读以及 4, 5 可写这四件事中至少一项发生 // 或者等待时间到达 5 秒,返回 0 for(int i=0; i<maxfdp1 && n>0; i++){if(FD_ISSET(i, &read_set)){n–; if(i==fd1) prinf(“ 描述符 3 可读 ”); if(i==fd2) prinf(“ 描述符 4 可读 ”); } if(FD_ISSET(i, &write_set)){n–; if(i==fd2) prinf(“ 描述符 3 可写 ”); if(i==fd3) prinf(“ 描述符 4 可写 ”); } } // 上面的 printf 语句换成对应的 read 或者 write 函数就 // 可以立即读取或者写入相应的描述符而不用等待}</pre>

可以看到,select 的缺点有:

  • 默认能监视的文件描述符不能大于 1024,也代表监视的总数不超过 1024。即使你因为需要监视的描述符大于 1024 而改动内核的 FD_SETSIZE 值,但由于 select 是每次都会线性扫描整个 fd_set,集合越大速度越慢,所以性能会比较差。
  • select 函数返回时只能看见已准备好的描述符数量,至于是哪个描述符准备好了需要循环用 FD_ISSET 来检查,当未准备好的描述符很多而准备好的很少时,效率比较低。
  • select 函数每次执行的时候,都把参数里传入的三个 fd_set 从用户空间复制到内核空间。而每次 fd_set 里要监视的描述符变化不大时,全部重新复制一遍并不划算。同样在每次都是未准备好的描述符很多而准备好的很少时,调用 select 会很频繁,用户 / 内核间的的数据复制就成了一个大的开销。

还有一个问题是在代码的写法上给我一些困扰的,就是每次调用 select 前必须重新设置三个 fd_set。fd_set 类型只是 1024 位的二进制数(实际上结构体里是几个 long 变量的数组;比如 64 位机器上 long 是 64 bit,那么 fd_set 里就是 16 个 long 变量的数组),由一位的 1 和 0 代表一个文件描述符的状态,但是其实调用 select 前后位的 1/0 状态意义是不一样的。

先讲一下几个对 fd_set 操作的函数的作用:FD_ZERO 把 fd_set 所有位设置为 0;FD_SET 把一个位设置为 1;FD_ISSET 判断一个位是否为 1。

调用 select 前:我们用 FD_ZERO 把 fd_set 先全部初始化,然后用 FD_SET 把我们关心的代表描述符的位设置为 1。我们这时可以用 FD_ISSET 判断这个位是否被我们设置,这时的含义是我们想要监视的描述符是否被设置为被监视的状态。

调用 select 时:内核判断 fd_set 里的位并把各个 fd_set 里所有值为 1 的位记录下来,然后把 fd_set 全部设置成 0;一个描述符上有对应的事件发生时,把对应 fd_set 里代表这个描述符的位设置为 1。

在 select 返回之后:我们同样用 FD_ISSET 判断各个我们关心的位是 0 还是 1,这时的含义是,这个位是否是发生了我们关心的事件。

所以,在下一次调用 select 前,我们不得不把已经被内核改掉的 fd_set 全部重新设置一下。

select 在监视大量描述符尤其是更多的描述符未准备好的情况时性能很差。《Unix 高级编程》里写,用 select 的程序通常只使用 3 到 10 个描述符。

[](https://jeff.wtf/2017/02/IO-m… “poll”)poll

poll 和 select 是相似的,只是给的接口不同。

<pre>1234</pre><pre>#include <poll.h>int poll(struct pollfd fdarray[], nfds_t nfds, int timeout);// 返回值: 已就绪的描述符的个数。超时时为 0,错误时为 -1</pre>

fdarray 是 pollfd 的数组。pollfd 结构体是这样的:

<pre>12345</pre><pre>struct pollfd {int fd; // 文件描述符 short events; // 我期待的事件 short revents; // 实际发生的事件:我期待的事件中发生的;或者异常情况};</pre>

nfds 是 fdarray 的长度,也就是 pollfd 的个数。

timeout 代表等待超时的毫秒数。

相比 select,poll 有这些优点:由于 poll 在 pollfd 里用 int fd 来表示文件描述符而不像 select 里用的 fd_set 来分别表示描述符,所以没有必须小于 1024 的限制,也没有数量限制;由于 poll 用 events 表示期待的事件,通过修改 revents 来表示发生的事件,所以不需要像 select 在每次调用前重新设置描述符和期待的事件。

除此之外,poll 和 select 几乎相同。在 poll 返回后,需要遍历 fdarray 来检查各个 pollfd 里的 revents 是否发生了期待的事件;每次调用 poll 时,把 fdarray 复制到内核空间。在描述符太多而每次准备好的较少时,poll 有同样的性能问题。

[](https://jeff.wtf/2017/02/IO-m… “epoll”)epoll

epoll 是在 Linux 2.5.44 中首度登场的。不像 select 和 poll,它提供了三个系统函数而不是一个。

[](https://jeff.wtf/2017/02/IO-m… “epoll_create 用来创建一个 epoll 描述符:”)epoll_create 用来创建一个 epoll 描述符:

<pre>1234</pre><pre>#include <sys/epoll.h>int epoll_create(int size);// 返回值:epoll 描述符 </pre>

size 用来告诉内核你想监视的文件描述符的数目,但是它并不是限制了能监视的描述符的最大个数,而是给内核最初分配的空间一个建议。然后系统会在内核中分配一个空间来存放事件表,并返回一个 epoll 描述符,用来操作这个事件表。

[](https://jeff.wtf/2017/02/IO-m… “epoll_ctl 用来增 / 删 / 改内核中的事件表:”)epoll_ctl 用来增 / 删 / 改内核中的事件表:

<pre>123</pre><pre>int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event);// 返回值:成功时返回 0,失败时返回 -1</pre>

epfd 是 epoll 描述符。

op 是操作类型(增加 / 删除 / 修改)。

fd 是希望监视的文件描述符。

event 是一个 epoll_event 结构体的指针。epoll_event 的定义是这样的:

<pre>1234567891011</pre><pre>typedef union epoll_data {void *ptr; int fd; uint32_t u32; uint64_t u64;} epoll_data_t;struct epoll_event {uint32_t events; // 我期待的事件 epoll_data_t data; // 用户数据变量};</pre>

这个结构体里,除了期待的事件外,还有一个 data,是一个 union,它是用来让我们在得到下面第三个函数的返回值以后方便的定位文件描述符的。

[](https://jeff.wtf/2017/02/IO-m… “epoll_wait 用来等待事件 ”)epoll_wait 用来等待事件

<pre>1234</pre><pre>int epoll_wait(int epfd, struct epoll_event *result_events, int maxevents, int timeout);// 返回值:已就绪的描述符个数。超时时为 0,错误时为 -1</pre>

epfd 是 epoll 描述符。

result_events 是 epoll_event 结构体的指针,它将指向的是所有已经准备好的事件描述符相关联的 epoll_event(在上个步骤里调用 epoll_ctl 时关联起来的)。下面的例子可以让你知道这个参数的意义。

maxevents 是返回的最大事件个数,也就是你能通过 result_events 指针遍历到的最大的次数。

timeout 是等待超时的毫秒数。

用一个代码片段来展示 epoll 的用法:

<pre>123456789101112131415161718192021222324252627282930313233343536373839404142434445</pre><pre>// 这个例子要监控文件描述符 3, 4 的可读状态,以及 4, 5 的可写状态 / 通过 epoll_create 创建 epoll 描述符 /int epfd = epoll_create(4);int fd1 = 3;int fd2 = 4;int fd3 = 5;/ 通过 epoll_ctl 注册好四个事件 /struct epoll_event ev1;ev1.events = EPOLLIN; // 期待它的可读事件发生 ev1.data = fd1; // 我们通常就把 data 设置为 fd,方便以后查看 epoll_ctl(epfd, EPOLL_CTL_ADD, fd1, &ev1); // 添加到事件表 struct epoll_event ev2;ev2.events = EPOLLIN;ev2.data = fd2;epoll_ctl(epfd, EPOLL_CTL_ADD, fd2, &ev2);struct epoll_event ev3;ev3.events = EPOLLOUT; // 期待它的可写事件发生 ev3.data = fd2;epoll_ctl(epfd, EPOLL_CTL_ADD, fd2, &ev3);struct epoll_event ev4;ev4.events = EPOLLOUT;ev4.data = fd3;epoll_ctl(epfd, EPOLL_CTL_ADD, fd3, &ev4);/ 通过 epoll_wait 等待事件 /# DEFINE MAXEVENTS 4struct epoll_event result_events[MAXEVENTS];while(true){int n = epoll_wait(epfd, &result_events, MAXEVENTS, 5000); for(int i=0; i<n; n–){// result_events[i] 一定是 ev1 到 ev4 中的一个 if(result_events[i].events&EPOLLIN) printf(“ 描述符 %d 可读 ”, result_events[i].fd); else if(result_events[i].events&EPOLLOUT) printf(“ 描述符 %d 可写 ”, result_events[i].fd) }}</pre>

所以 epoll 解决了 poll 和 select 的问题:

  • 只在 epoll_ctl 的时候把数据复制到内核空间,这保证了每个描述符和事件一定只会被复制到内核空间一次;每次调用 epoll_wait 都不会复制新数据到内核空间。相比之下,select 每次调用都会把三个 fd_set 复制一遍;poll 每次调用都会把 fdarray 复制一遍。
  • epoll_wait 返回 n,那么只需要做 n 次循环,可以保证遍历的每一次都是有意义的。相比之下,select 需要做至少 n 次至多 maxfdp1 次循环;poll 需要遍历完 fdarray 即做 nfds 次循环。
  • 在内部实现上,epoll 使用了回调的方法。调用 epoll_ctl 时,就是注册了一个事件:在集合中放入文件描述符以及事件数据,并且加上一个回调函数。一旦文件描述符上的对应事件发生,就会调用回调函数,这个函数会把这个文件描述符加入到就绪队列上。当你调用 epoll_wait 时,它只是在查看就绪队列上是否有内容,有的话就返回给你的程序。select() poll()`epoll_wait()` 三个函数在操作系统看来,都是睡眠一会儿然后判断一会儿的循环,但是 select 和 poll 在醒着的时候要遍历整个文件描述符集合,而 epoll_wait 只是看看就绪队列是否为空而已。这是 epoll 高性能的理由,使得其 I/O 的效率不会像使用轮询的 select/poll 随着描述符增加而大大降低。

注 1:select/poll/epoll_wait 三个函数的等待超时时间都有一样的特性:等待时间设置为 0 时函数不阻塞而是立即返回,不论是否有文件描述符已准备好;poll/epoll_wait 中的 timeout 为 -1,select 中的 timeout 为 NULL 时,则无限等待,直到有描述符已准备好才会返回。

注 2:有的新手会把文件描述符是否标记为阻塞 I/O 等同于 I/O 多路复用函数是否阻塞。其实文件描述符是否标记为阻塞,决定了你 read 或 write 它时如果它未准备好是阻塞等待,还是立即返回 EAGAIN;而 I/O 多路复用函数除非你把 timeout 设置为 0,否则它总是会阻塞住你的程序。

注 3:上面的例子只是入门,可能是不准确或不全面的:一是数据要立即处理防止丢失;二是 EPOLLIN/EPOLLOUT 不完全等同于可读可写事件,具体要去搜索 poll/epoll 的事件具体有哪些;三是大多数实际例子里,比如一个 tcp server,都会在运行中不断增加 / 删除的文件描述符而不是记住固定的 3 4 5 几个描述符(用这种例子更能看出 epoll 的优势);四是 epoll 的优势更多的体现在处理大量闲连接的情况,如果场景是处理少量短连接,用 select 反而更好,而且用 select 的代码能运行在所有平台上。

Epoll 数据结构:

select()和 poll() IO 多路复用模型

select 的缺点:

  1. 单个进程能够监视的文件描述符的数量存在最大限制,通常是 1024,当然可以更改数量,但由于 select 采用轮询的方式扫描文件描述符,文件描述符数量越多,性能越差;(在 linux 内核头文件中,有这样的定义:#define __FD_SETSIZE    1024)
  2. 内核 / 用户空间内存拷贝问题,select 需要复制大量的句柄数据结构,产生巨大的开销;
  3. select 返回的是含有整个句柄的数组,应用程序需要遍历整个数组才能发现哪些句柄发生了事件;
  4. select 的触发方式是水平触发,应用程序如果没有完成对一个已经就绪的文件描述符进行 IO 操作,那么之后每次 select 调用还是会将这些文件描述符通知进程。

相比 select 模型,poll 使用链表保存文件描述符,因此没有了监视文件数量的限制,但其他三个缺点依然存在。

拿 select 模型为例,假设我们的服务器需要支持 100 万的并发连接,则在__FD_SETSIZE 为 1024 的情况下,则我们至少需要开辟 1k 个进程才能实现 100 万的并发连接。除了进程间上下文切换的时间消耗外,从内核 / 用户空间大量的无脑内存拷贝、数组轮询等,是系统难以承受的。因此,基于 select 模型的服务器程序,要达到 10 万级别的并发访问,是一个很难完成的任务。

因此,该 epoll 上场了。

epoll IO 多路复用模型实现机制

由于 epoll 的实现机制与 select/poll 机制完全不同,上面所说的 select 的缺点在 epoll 上不复存在。

设想一下如下场景:有 100 万个客户端同时与一个服务器进程保持着 TCP 连接。而每一时刻,通常只有几百上千个 TCP 连接是活跃的(事实上大部分场景都是这种情况)。如何实现这样的高并发?

在 select/poll 时代,服务器进程每次都把这 100 万个连接告诉操作系统(从用户态复制句柄数据结构到内核态),让操作系统内核去查询这些套接字上是否有事件发生,轮询完后,再将句柄数据复制到用户态,让服务器应用程序轮询处理已发生的网络事件,这一过程资源消耗较大,因此,select/poll 一般只能处理几千的并发连接。

epoll 的设计和实现与 select 完全不同。epoll 通过在 Linux 内核中申请一个简易的文件系统(文件系统一般用什么数据结构实现?B+ 树)。把原先的 select/poll 调用分成了 3 个部分:

1)调用 epoll_create()建立一个 epoll 对象(在 epoll 文件系统中为这个句柄对象分配资源)

2)调用 epoll_ctl 向 epoll 对象中添加这 100 万个连接的套接字

3)调用 epoll_wait 收集发生的事件的连接

如此一来,要实现上面说是的场景,只需要在进程启动时建立一个 epoll 对象,然后在需要的时候向这个 epoll 对象中添加或者删除连接。同时,epoll_wait 的效率也非常高,因为调用 epoll_wait 时,并没有一股脑的向操作系统复制这 100 万个连接的句柄数据,内核也不需要去遍历全部的连接。

下面来看看 Linux 内核具体的 epoll 机制实现思路。

当某一进程调用 epoll_create 方法时,Linux 内核会创建一个 eventpoll 结构体,这个结构体中有两个成员与 epoll 的使用方式密切相关。eventpoll 结构体如下所示:

 view plain copy

  1. struct eventpoll{
  2.     ….  
  3.     /红黑树的根节点,这颗树中存储着所有添加到 epoll 中的需要监控的事件/  
  4.     struct rb_root  rbr;  
  5.     /双链表中则存放着将要通过 epoll_wait 返回给用户的满足条件的事件/  
  6.     struct list_head rdlist;  
  7.     ….  
  8. };  

每一个 epoll 对象都有一个独立的 eventpoll 结构体,用于存放通过 epoll_ctl 方法向 epoll 对象中添加进来的事件。这些事件都会挂载在红黑树中,如此,重复添加的事件就可以通过红黑树而高效的识别出来(红黑树的插入时间效率是 lgn,其中 n 为树的高度)。

而所有添加到 epoll 中的事件都会与设备 (网卡) 驱动程序建立回调关系,也就是说,当相应的事件发生时会调用这个回调方法。这个回调方法在内核中叫 ep_poll_callback, 它会将发生的事件添加到 rdlist 双链表中。

在 epoll 中,对于每一个事件,都会建立一个 epitem 结构体,如下所示:

 view plain copy

  1. struct epitem{
  2.     struct rb_node  rbn;// 红黑树节点  
  3.     struct list_head    rdllink;// 双向链表节点  
  4.     struct epoll_filefd  ffd;  // 事件句柄信息  
  5.     struct eventpoll *ep;    // 指向其所属的 eventpoll 对象  
  6.     struct epoll_event event; // 期待发生的事件类型  
  7. }  

当调用 epoll_wait 检查是否有事件发生时,只需要检查 eventpoll 对象中的 rdlist 双链表中是否有 epitem 元素即可。如果 rdlist 不为空,则把发生的事件复制到用户态,同时将事件数量返回给用户。

epoll 数据结构示意图

从上面的讲解可知:通过红黑树和双链表数据结构,并结合回调机制,造就了 epoll 的高效。

OK,讲解完了 Epoll 的机理,我们便能很容易掌握 epoll 的用法了。一句话描述就是:三步曲。

第一步:epoll_create()系统调用。此调用返回一个句柄,之后所有的使用都依靠这个句柄来标识。

第二步:epoll_ctl()系统调用。通过此调用向 epoll 对象中添加、删除、修改感兴趣的事件,返回 0 标识成功,返回 - 1 表示失败。

第三部:epoll_wait()系统调用。通过此调用收集收集在 epoll 监控中已经发生的事件。

epoll 实例

最后,附上一个 epoll 编程实例。

 

几乎所有的 epoll 程序都使用下面的框架:

 view plaincopyprint?

  1. for(; ;)  
  2.    {
  3.        nfds = epoll_wait(epfd,events,20,500);  
  4.        for(i=0;i<nfds;++i)  < span=”” >
  5.        {
  6.            if(events[i].data.fd==listenfd) // 有新的连接  
  7.            {
  8.                connfd = accept(listenfd,(sockaddr *)&clientaddr, &clilen); //accept 这个连接  
  9.                ev.data.fd=connfd;  
  10.                ev.events=EPOLLIN|EPOLLET;  
  11.                epoll_ctl(epfd,EPOLL_CTL_ADD,connfd,&ev); // 将新的 fd 添加到 epoll 的监听队列中  
  12.            }  
  13.   
  14.            else if(events[i].events&EPOLLIN ) // 接收到数据,读 socket  
  15.            {
  16.                n = read(sockfd, line, MAXLINE)) < 0    // 读  
  17.                ev.data.ptr = md;     //md 为自定义类型,添加数据  
  18.                ev.events=EPOLLOUT|EPOLLET;  
  19.                epoll_ctl(epfd,EPOLL_CTL_MOD,sockfd,&ev);// 修改标识符,等待下一个循环时发送数据,异步处理的精髓  
  20.            }  
  21.            else if(events[i].events&EPOLLOUT) // 有数据待发送,写 socket  
  22.            {
  23.                struct myepoll_data md = (myepoll_data)events[i].data.ptr;    // 取数据  
  24.                sockfd = md->fd;  
  25.                send(sockfd, md->ptr, strlen((char*)md->ptr), 0 );        // 发送数据  
  26.                ev.data.fd=sockfd;  
  27.                ev.events=EPOLLIN|EPOLLET;  
  28.                epoll_ctl(epfd,EPOLL_CTL_MOD,sockfd,&ev); // 修改标识符,等待下一个循环时接收数据  
  29.            }  
  30.            else  
  31.            {
  32.                // 其他的处理  
  33.            }  
  34.        }  
  35.    }  

epoll 的程序实例

 view plaincopyprint?

  1.  #include   
  2. include   

  3. include   

  4. include   

  5. include   

  6. include   

  7. include   

  8. include   

  9. include   

  10.   
  11. define MAXEVENTS 64  

  12.   
  13. // 函数:  
  14. // 功能: 创建和绑定一个 TCP socket  
  15. // 参数: 端口  
  16. // 返回值: 创建的 socket  
  17. static int  
  18. create_and_bind (char *port)  
  19. {
  20.   struct addrinfo hints;  
  21.   struct addrinfo result, rp;  
  22.   int s, sfd;  
  23.   
  24.   memset (&hints, 0, sizeof (struct addrinfo));  
  25.   hints.ai_family = AF_UNSPEC;     / Return IPv4 and IPv6 choices /  
  26.   hints.ai_socktype = SOCK_STREAM; / We want a TCP socket /  
  27.   hints.ai_flags = AI_PASSIVE;     / All interfaces /  
  28.   
  29.   s = getaddrinfo (NULL, port, &hints, &result);  
  30.   if (s != 0)  
  31.     {
  32.       fprintf (stderr, “getaddrinfo: %sn”, gai_strerror (s));  
  33.       return -1;  
  34.     }  
  35.   
  36.   for (rp = result; rp != NULL; rp = rp->ai_next)  
  37.     {
  38.       sfd = socket (rp->ai_family, rp->ai_socktype, rp->ai_protocol);  
  39.       if (sfd == -1)  
  40.         continue;  
  41.   
  42.       s = bind (sfd, rp->ai_addr, rp->ai_addrlen);  
  43.       if (s == 0)  
  44.         {
  45.           / We managed to bind successfully! /  
  46.           break;  
  47.         }  
  48.   
  49.       close (sfd);  
  50.     }  
  51.   
  52.   if (rp == NULL)  
  53.     {
  54.       fprintf (stderr, “Could not bindn”);  
  55.       return -1;  
  56.     }  
  57.   
  58.   freeaddrinfo (result);  
  59.   
  60.   return sfd;  
  61. }  
  62.   
  63.   
  64. // 函数  
  65. // 功能: 设置 socket 为非阻塞的  
  66. static int  
  67. make_socket_non_blocking (int sfd)  
  68. {
  69.   int flags, s;  
  70.   
  71.   // 得到文件状态标志  
  72.   flags = fcntl (sfd, F_GETFL, 0);  
  73.   if (flags == -1)  
  74.     {
  75.       perror (“fcntl”);  
  76.       return -1;  
  77.     }  
  78.   
  79.   // 设置文件状态标志  
  80.   flags |= O_NONBLOCK;  
  81.   s = fcntl (sfd, F_SETFL, flags);  
  82.   if (s == -1)  
  83.     {
  84.       perror (“fcntl”);  
  85.       return -1;  
  86.     }  
  87.   
  88.   return 0;  
  89. }  
  90.   
  91. // 端口由参数 argv[1]指定  
  92. int  
  93. main (int argc, char *argv[])  
  94. {
  95.   int sfd, s;  
  96.   int efd;  
  97.   struct epoll_event event;  
  98.   struct epoll_event *events;  
  99.   
  100.   if (argc != 2)  
  101.     {
  102.       fprintf (stderr, “Usage: %s [port]n”, argv[0]);  
  103.       exit (EXIT_FAILURE);  
  104.     }  
  105.   
  106.   sfd = create_and_bind (argv[1]);  
  107.   if (sfd == -1)  
  108.     abort ();  
  109.   
  110.   s = make_socket_non_blocking (sfd);  
  111.   if (s == -1)  
  112.     abort ();  
  113.   
  114.   s = listen (sfd, SOMAXCONN);  
  115.   if (s == -1)  
  116.     {
  117.       perror (“listen”);  
  118.       abort ();  
  119.     }  
  120.   
  121.   // 除了参数 size 被忽略外, 此函数和 epoll_create 完全相同  
  122.   efd = epoll_create1 (0);  
  123.   if (efd == -1)  
  124.     {
  125.       perror (“epoll_create”);  
  126.       abort ();  
  127.     }  
  128.   
  129.   event.data.fd = sfd;  
  130.   event.events = EPOLLIN | EPOLLET;// 读入, 边缘触发方式  
  131.   s = epoll_ctl (efd, EPOLL_CTL_ADD, sfd, &event);  
  132.   if (s == -1)  
  133.     {
  134.       perror (“epoll_ctl”);  
  135.       abort ();  
  136.     }  
  137.   
  138.   / Buffer where events are returned /  
  139.   events = calloc (MAXEVENTS, sizeof event);  
  140.   
  141.   / The event loop /  
  142.   while (1)  
  143.     {
  144.       int n, i;  
  145.   
  146.       n = epoll_wait (efd, events, MAXEVENTS, -1);  
  147.       for (i = 0; i < n; i++)  
  148.         {
  149.           if ((events[i].events & EPOLLERR) ||  
  150.               (events[i].events & EPOLLHUP) ||  
  151.               (!(events[i].events & EPOLLIN)))  
  152.             {
  153.               /* An error has occured on this fd, or the socket is not 
  154.                  ready for reading (why were we notified then?) */  
  155.               fprintf (stderr, “epoll errorn”);  
  156.               close (events[i].data.fd);  
  157.               continue;  
  158.             }  
  159.   
  160.           else if (sfd == events[i].data.fd)  
  161.             {
  162.               /* We have a notification on the listening socket, which 
  163.                  means one or more incoming connections. */  
  164.               while (1)  
  165.                 {
  166.                   struct sockaddr in_addr;  
  167.                   socklen_t in_len;  
  168.                   int infd;  
  169.                   char hbuf[NI_MAXHOST], sbuf[NI_MAXSERV];  
  170.   
  171.                   in_len = sizeof in_addr;  
  172.                   infd = accept (sfd, &in_addr, &in_len);  
  173.                   if (infd == -1)  
  174.                     {
  175.                       if ((errno == EAGAIN) ||  
  176.                           (errno == EWOULDBLOCK))  
  177.                         {
  178.                           /* We have processed all incoming 
  179.                              connections. */  
  180.                           break;  
  181.                         }  
  182.                       else  
  183.                         {
  184.                           perror (“accept”);  
  185.                           break;  
  186.                         }  
  187.                     }  
  188.   
  189.                                   // 将地址转化为主机名或者服务名  
  190.                   s = getnameinfo (&in_addr, in_len,  
  191.                                    hbuf, sizeof hbuf,  
  192.                                    sbuf, sizeof sbuf,  
  193.                                    NI_NUMERICHOST | NI_NUMERICSERV);//flag 参数: 以数字名返回  
  194.                                   // 主机地址和服务地址  
  195.   
  196.                   if (s == 0)  
  197.                     {
  198.                       printf(“Accepted connection on descriptor %d ”  
  199.                              “(host=%s, port=%s)n”, infd, hbuf, sbuf);  
  200.                     }  
  201.   
  202.                   /* Make the incoming socket non-blocking and add it to the 
  203.                      list of fds to monitor. */  
  204.                   s = make_socket_non_blocking (infd);  
  205.                   if (s == -1)  
  206.                     abort ();  
  207.   
  208.                   event.data.fd = infd;  
  209.                   event.events = EPOLLIN | EPOLLET;  
  210.                   s = epoll_ctl (efd, EPOLL_CTL_ADD, infd, &event);  
  211.                   if (s == -1)  
  212.                     {
  213.                       perror (“epoll_ctl”);  
  214.                       abort ();  
  215.                     }  
  216.                 }  
  217.               continue;  
  218.             }  
  219.           else  
  220.             {
  221.               /* We have data on the fd waiting to be read. Read and 
  222.                  display it. We must read whatever data is available 
  223.                  completely, as we are running in edge-triggered mode 
  224.                  and won’t get a notification again for the same 
  225.                  data. */  
  226.               int done = 0;  
  227.   
  228.               while (1)  
  229.                 {
  230.                   ssize_t count;  
  231.                   char buf[512];  
  232.   
  233.                   count = read (events[i].data.fd, buf, sizeof(buf));  
  234.                   if (count == -1)  
  235.                     {
  236.                       /* If errno == EAGAIN, that means we have read all 
  237.                          data. So go back to the main loop. */  
  238.                       if (errno != EAGAIN)  
  239.                         {
  240.                           perror (“read”);  
  241.                           done = 1;  
  242.                         }  
  243.                       break;  
  244.                     }  
  245.                   else if (count == 0)  
  246.                     {
  247.                       /* End of file. The remote has closed the 
  248.                          connection. */  
  249.                       done = 1;  
  250.                       break;  
  251.                     }  
  252.   
  253.                   / Write the buffer to standard output /  
  254.                   s = write (1, buf, count);  
  255.                   if (s == -1)  
  256.                     {
  257.                       perror (“write”);  
  258.                       abort ();  
  259.                     }  
  260.                 }  
  261.   
  262.               if (done)  
  263.                 {
  264.                   printf (“Closed connection on descriptor %dn”,  
  265.                           events[i].data.fd);  
  266.   
  267.                   /* Closing the descriptor will make epoll remove it 
  268.                      from the set of descriptors which are monitored. */  
  269.                   close (events[i].data.fd);  
  270.                 }  
  271.             }  
  272.         }  
  273.     }  
  274.   
  275.   free (events);  
  276.   
  277.   close (sfd);  
  278.   
  279.   return EXIT_SUCCESS;  
  280. }  

微信公众号【Java 技术江湖】一位阿里 Java 工程师的技术小站。(关注公众号后回复”Java“即可领取 Java 基础、进阶、项目和架构师等免费学习资料,更有数据库、分布式、微服务等热门技术学习视频,内容丰富,兼顾原理和实践,另外也将赠送作者原创的 Java 学习指南、Java 程序员面试指南等干货资源)

正文完
 0