共计 16500 个字符,预计需要花费 42 分钟才能阅读完成。
运营研发团队 谭淼
一、nginx 模块介绍
高并发是 nginx 最大的优势之一,而高并发的原因就是 nginx 强大的事件模块。本文将重点介绍 nginx 是如果利用 Linux 系统的 epoll 来完成高并发的。
首先介绍 nginx 的模块,nginx1.15.5 源码中,自带的模块主要分为 core 模块、conf 模块、event 模块、http 模块和 mail 模块五大类。其中 mail 模块比较特殊,本文暂不讨论。
查看 nginx 模块属于哪一类也很简单,对于每一个模块,都有一个 ngx_module_t 类型的结构体,该结构体的 type 字段就是标明该模块是属于哪一类模块的。以 ngx_http_module 为例:
ngx_module_t ngx_http_module = {
NGX_MODULE_V1,
&ngx_http_module_ctx, /* module context */
ngx_http_commands, /* module directives */
NGX_CORE_MODULE, /* module type */
NULL, /* init master */
NULL, /* init module */
NULL, /* init process */
NULL, /* init thread */
NULL, /* exit thread */
NULL, /* exit process */
NULL, /* exit master */
NGX_MODULE_V1_PADDING
};
可以 ngx_core_module 是属于 NGX_CORE_MODULE 类型的模块。
由于本文主要介绍使用 epoll 来完成 nginx 的事件驱动,故主要介绍 core 模块的 ngx_events_module 与 event 模块的 ngx_event_core_module、ngx_epoll_module。
二、epoll 介绍
2.1、epoll 原理
关于 epoll 的实现原理,本文不会具体介绍,这里只是介绍 epoll 的工作流程。具体的实现参考:https://titenwang.github.io/2…
epoll 的使用是三个函数:
int epoll_create(int size);
int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event);
int epoll_wait(int epfd, struct epoll_event *events, int maxevents, int timeout);
首先 epoll_create 函数会在内核中创建一块独立的内存存储一个 eventpoll 结构体,该结构体包括一颗红黑树和一个链表,如下图所示:
然后通过 epoll_ctl 函数,可以完成两件事。
(1)将事件添加到红黑树中,这样可以防止重复添加事件;
(2)将事件与网卡建立回调关系,当事件发生时,网卡驱动会回调 ep_poll_callback 函数,将事件添加到 epoll_create 创建的链表中。
最后,通过 epoll_wait 函数,检查并返回链表中是否有事件。该函数是阻塞函数,阻塞时间为 timeout,当双向链表有事件或者超时的时候就会返回链表长度(发生事件的数量)。
2.2、epoll 相关函数的参数
(1)epoll_create 函数的参数 size 表示该红黑树的大致数量,实际上很多操作系统没有使用这个参数。
(2)epoll_ctl 函数的参数为 epfd,op,fd 和 event。
(3)epoll_wait 函数的参数为 epfd,events,maxevents 和 timeout
三、事件模块的初始化
众所周知,nginx 是 master/worker 框架,在 nginx 启动时是一个进程,在启动的过程中 master 会 fork 出了多个子进程作为 worker。master 主要是管理 worker,本身并不处理请求。而 worker 负责处理请求。因此,事件模块的初始化也是分成两部分。一部分发生在 fork 出 worker 前,主要是配置文件解析等操作,另外一部分发生在 fork 之后,主要是向 epoll 中添加监听事件。
3.1 启动进程对事件模块的初始化
启动进程对事件模块的初始化分为配置文件解析、开始监听端口和 ngx_event_core_module 模块的初始化。这三个步骤均在 ngx_init_cycle 函数进行。
调用关系:main() —> ngx_init_cycle()
下图是 ngx_init_cycle 函数的流程,红框是本节将要介绍的三部分内容。
3.1.1 配置文件解析
启动进程的一个主要工作是解析配置文件。在 nginx 中,用户主要通过 nginx 配置文件 nginx.conf 的 event 块来控制和调节事件模块的参数。下面是一个 event 块配置的示例:
user nobody;
worker_processes 1;
error_log logs/error.log;
pid logs/nginx.pid;
……
events {
use epoll;
worker_connections 1024;
accept_mutex on;
}
http {
……
}
首先我们先看看 nginx 是如何解析 event 块,并将 event 块存储在什么地方。
在 nginx 中,解析配置文件的工作是调用 ngx_init_cycle 函数完成的。下图是该函数在解析配置文件部分的一个流程:
(1)ngx_init_cycle 函数首先会进行一些初始化工作,包括更新时间,创建内存池和创建并更新 ngx_cycle_t 结构体 cycle;
(2)调用各个 core 模块的 create_conf 方法,可以创建 cycle 的 conf_ctx 数组,该阶段完成后 cycle->conf_ctx 如下图所示:
(3)初始化 ngx_conf_t 类型的结构体 conf,将 cycle->conf_ctx 结构体赋值给 conf 的 ctx 字段
(4)解析配置文件
解析配置文件会调用 ngx_conf_parse 函数,该函数会解析一行命令,当遇到块时会递归调用自身。解析的方法也很简单,就是读取一个命令,然后在所有模块的 cmd 数组中寻找该命令,若找到则调用该命令的 cmd->set(),完成参数的解析。下面介绍 event 块的解析。
event 命令是在 event/ngx_event.c 文件中定义的,代码如下。
static ngx_command_t ngx_events_commands[] = {
{ngx_string(“events”),
NGX_MAIN_CONF|NGX_CONF_BLOCK|NGX_CONF_NOARGS,
ngx_events_block,
0,
0,
NULL },
ngx_null_command
};
在从配置文件中读取到 event 后,会调用 ngx_events_block 函数。下面是 ngx_events_block 函数的主要工作:
解析完配置文件中的 event 块后,cycle->conf_ctx 如下图所示:
(5)解析完整个配置文件后,调用各个 core 类型模块的 init_conf 方法。ngx_event_module 的 ctx 的 init_conf 方法为 ngx_event_init_conf。该方法并没有实际的用途,暂不详述。
3.1.2 监听 socket
虽然监听 socket 和事件模块并没有太多的关系,但是为了使得整个流程完整,此处会简单介绍一下启动进程是如何监听端口的。
该过程首先检查 old_cycle,如果 old_cycle 中有和 cycle 中相同的 socket,就直接把 old_cycle 中的 fd 赋值给 cycle。之后会调用 ngx_open_listening_socket 函数,监听端口。
下面是 ngx_open_listening_sockets 函数,该函数的作用是遍历所有需要监听的端口,然后调用 socket(),bind()和 listen()函数,该函数会重试 5 次。
ngx_int_t
ngx_open_listening_sockets(ngx_cycle_t *cycle)
{
……
/* 重试 5 次 */
for (tries = 5; tries; tries–) {
failed = 0;
/* 遍历需要监听的端口 */
ls = cycle->listening.elts;
for (i = 0; i < cycle->listening.nelts; i++) {
……
/* ngx_socket 函数就是 socket 函数 */
s = ngx_socket(ls[i].sockaddr->sa_family, ls[i].type, 0);
……
/* 设置 socket 属性 */
if (setsockopt(s, SOL_SOCKET, SO_REUSEADDR,
(const void *) &reuseaddr, sizeof(int))
== -1)
{
……
}
……
/* IOCP 事件操作 */
if (!(ngx_event_flags & NGX_USE_IOCP_EVENT)) {
if (ngx_nonblocking(s) == -1) {
……
}
}
……
/* 绑定 socket 和地址 */
if (bind(s, ls[i].sockaddr, ls[i].socklen) == -1) {
……
}
……
/* 开始监听 */
if (listen(s, ls[i].backlog) == -1) {
……
}
ls[i].listen = 1;
ls[i].fd = s;
}
……
/* 两次重试间隔 500ms */
ngx_msleep(500);
}
……
return NGX_OK;
}
3.1.3 ngx_event_core_module 模块的初始化
在 ngx_init_cycle 函数监听完端口,并提交新的 cycle 后,便会调用 ngx_init_modules 函数,该方法会遍历所有模块并调用其 init_module 方法。对于该阶段,和事件驱动模块有关系的只有 ngx_event_core_module 的 ngx_event_module_init 方法。该方法主要做了下面三个工作:
(1)获取 core 模块配置结构体中的时间精度 timer_resolution,用在 epoll 里更新缓存时间
(2)调用 getrlimit 方法,检查连接数是否超过系统的资源限制
(3)利用 mmap 分配一块共享内存,存储负载均衡锁(ngx_accept_mutex)、连接计数器(ngx_connection_counter)
3.2 worker 进程对事件模块的初始化
启动进程在完成一系列操作后,会 fork 出 master 进程,并自我关闭,让 master 进程继续完成初始化工作。master 进程会在 ngx_spawn_process 函数中 fork 出 worker 进程,并让 worker 进程调用 ngx_worker_process_cycle 函数。ngx_worker_process_cycle 函数是 worker 进程的主循环函数,该函数首先会调用 ngx_worker_process_init 函数完成 worker 的初始化,然后就会进入到一个循环中,持续监听处理请求。
事件模块的初始化就发生在 ngx_worker_process_init 函数中。
其调用关系:main() —> ngx_master_process_cycle() —> ngx_start_worker_processes() —> ngx_spawn_process() —> ngx_worker_process_cycle() —> ngx_worker_process_init()。
对于 ngx_worker_process_init 函数,会调用各个模块的 init_process 方法:
static void
ngx_worker_process_init(ngx_cycle_t *cycle, ngx_int_t worker)
{
……
for (i = 0; cycle->modules[i]; i++) {
if (cycle->modules[i]->init_process) {
if (cycle->modules[i]->init_process(cycle) == NGX_ERROR) {
/* fatal */
exit(2);
}
}
}
……
}
在此处,会调用 ngx_event_core_module 的 ngx_event_process_init 函数。该函数较为关键,将会重点解析。在介绍 ngx_event_process_init 函数前,先介绍两个终于的结构体,由于这两个结构体较为复杂,故只介绍部分字段:
(1)ngx_event_s 结构体。nginx 中,事件会使用 ngx_event_s 结构体来表示。
ngx_event_s
struct ngx_event_s {
/* 通常指向 ngx_connection_t 结构体 */
void *data;
/* 事件可写 */
unsigned write:1;
/* 事件可建立新连接 */
unsigned accept:1;
/* 检测事件是否过期 */
unsigned instance:1;
/* 通常将事件加入到 epoll 中会将该字段置为 1 */
unsigned active:1;
……
/* 事件超时 */
unsigned timedout:1;
/* 事件是否在定时器中 */
unsigned timer_set:1;
……
/* 事件是否在延迟处理队列中 */
unsigned posted:1;
……
/* 事件的处理函数 */
ngx_event_handler_pt handler;
……
/* 定时器红黑树节点 */
ngx_rbtree_node_t timer;
/* 延迟处理队列节点 */
ngx_queue_t queue;
……
};
(2)ngx_connection_s 结构体代表一个 nginx 连接
struct ngx_connection_s {
/* 若该结构体未使用,则指向下一个为使用的 ngx_connection_s,若已使用,则指向 ngx_http_request_t */
void *data;
/* 指向一个读事件结构体,这个读事件结构体表示该连接的读事件 */
ngx_event_t *read;
/* 指向一个写事件结构体,这个写事件结构体表示该连接的写事件 */
ngx_event_t *write;
/* 连接的套接字 */
ngx_socket_t fd;
……
/* 该连接对应的监听端口,表示是由该端口建立的连接 */
ngx_listening_t *listening;
……
};
下面介绍 ngx_event_process_init 函数的实现,代码如下:
/* 此方法在 worker 进程初始化时调用 */
static ngx_int_t
ngx_event_process_init(ngx_cycle_t *cycle)
{
……
/* 打开 accept_mutex 负载均衡锁,用于防止惊群 */
if (ccf->master && ccf->worker_processes > 1 && ecf->accept_mutex) {
ngx_use_accept_mutex = 1;
ngx_accept_mutex_held = 0;
ngx_accept_mutex_delay = ecf->accept_mutex_delay;
} else {
ngx_use_accept_mutex = 0;
}
/* 初始化两个队列,一个用于存放不能及时处理的建立连接事件,一个用于存储不能及时处理的读写事件 */
ngx_queue_init(&ngx_posted_accept_events);
ngx_queue_init(&ngx_posted_events);
/* 初始化定时器 */
if (ngx_event_timer_init(cycle->log) == NGX_ERROR) {
return NGX_ERROR;
}
/**
* 调用使用的 ngx_epoll_module 的 ctx 的 actions 的 init 方法,即 ngx_epoll_init 函数
* 该函数主要的作用是调用 epoll_create()和创建用于 epoll_wait()返回事件链表的 event_list
**/
for (m = 0; cycle->modules[m]; m++) {
……
if (module->actions.init(cycle, ngx_timer_resolution) != NGX_OK) {
exit(2);
}
break;
}
/* 如果在配置中设置了 timer_resolution,则要设置控制时间精度。通过 setitimer 方法会设置一个定时器,每隔 timer_resolution 的时间会发送一个 SIGALRM 信号 */
if (ngx_timer_resolution && !(ngx_event_flags & NGX_USE_TIMER_EVENT)) {
……
sa.sa_handler = ngx_timer_signal_handler;
sigemptyset(&sa.sa_mask);
if (sigaction(SIGALRM, &sa, NULL) == -1) {
……
}
itv.it_interval.tv_sec = ngx_timer_resolution / 1000;
……
if (setitimer(ITIMER_REAL, &itv, NULL) == -1) {
……
}
}
……
/* 分配连接池空间 */
cycle->connections =
ngx_alloc(sizeof(ngx_connection_t) * cycle->connection_n, cycle->log);
……
c = cycle->connections;
/* 分配读事件结构体数组空间,并初始化读事件的 closed 和 instance */
cycle->read_events = ngx_alloc(sizeof(ngx_event_t) * cycle->connection_n,
cycle->log);
……
rev = cycle->read_events;
for (i = 0; i < cycle->connection_n; i++) {
rev[i].closed = 1;
rev[i].instance = 1;
}
/* 分配写事件结构体数组空间,并初始化写事件的 closed */
cycle->write_events = ngx_alloc(sizeof(ngx_event_t) * cycle->connection_n,
cycle->log);
……
wev = cycle->write_events;
for (i = 0; i < cycle->connection_n; i++) {
wev[i].closed = 1;
}
/* 将序号为 i 的读事件结构体和写事件结构体赋值给序号为 i 的 connections 结构体的元素 */
i = cycle->connection_n;
next = NULL;
do {
i–;
/* 将 connection 的 data 字段设置为下一个 connection */
c[i].data = next;
c[i].read = &cycle->read_events[i];
c[i].write = &cycle->write_events[i];
c[i].fd = (ngx_socket_t) -1;
next = &c[i];
} while (i);
/* 初始化 cycle->free_connections */
cycle->free_connections = next;
cycle->free_connection_n = cycle->connection_n;
/* 为每个监听端口分配连接 */
ls = cycle->listening.elts;
for (i = 0; i < cycle->listening.nelts; i++) {
……
c = ngx_get_connection(ls[i].fd, cycle->log);
……
rev = c->read;
……
/* 为监听的端口的 connection 结构体的 read 事件设置回调函数 */
rev->handler = (c->type == SOCK_STREAM) ? ngx_event_accept
: ngx_event_recvmsg;
/* 将监听的 connection 的 read 事件添加到事件驱动模块(epoll)*/
……
if (ngx_add_event(rev, NGX_READ_EVENT, 0) == NGX_ERROR) {
return NGX_ERROR;
}
}
return NGX_OK;
}
该方法主要做了下面几件事:
(1)打开 accept_mutex 负载均衡锁,用于防止惊群。惊群是指当多个 worker 都处于等待事件状态,如果突然来了一个请求,就会同时唤醒多个 worker,但是只有一个 worker 会处理该请求,这就造成系统资源浪费。为了解决这个问题,nginx 使用了 accept_mutex 负载均衡锁。各个 worker 首先会抢锁,抢到锁的 worker 才会监听各个端口。
(2)初始化两个队列,一个用于存放不能及时处理的建立连接事件,一个用于存储不能及时处理的读写事件。
(3)初始化定时器,该定时器就是一颗红黑树,根据时间对事件进行排序。
(4)调用使用的 ngx_epoll_module 的 ctx 的 actions 的 init 方法,即 ngx_epoll_init 函数。该函数较为简单,主要的作用是调用 epoll_create()和创建用于存储 epoll_wait()返回事件的链表 event_list。
(5)如果再配置中设置了 timer_resolution,则要设置控制时间精度,用于控制 nginx 时间。这部分在第五部分重点讲解。
(6)分配连接池空间、读事件结构体数组、写事件结构体数组。
上文介绍了 ngx_connection_s 和 ngx_event_s 结构体,我们了解到每一个 ngx_connection_s 结构体都有两个 ngx_event_s 结构体,一个读事件,一个写事件。在这个阶段,会向内存池中申请三个数组:cycle->connections、cycle->read_events 和 cycle->write_events,并将序号为 i 的读事件结构体和写事件结构体赋值给序号为 i 的 connections 结构体的元素。并将 cycle->free_connections 指向第一个未使用的 ngx_connections 结构体。
(7)为每个监听端口分配连接
在此阶段,会获取 cycle->listening 数组中的 ngx_listening_s 结构体元素。在 3.1.2 小节中,我们已经讲了 nginx 启动进程会监听端口,并将 socket 连接的 fd 存储在 cycle->listening 数组中。在这里,会获取到 3.1.2 小节中监听的端口,并为每个监听分配连接结构体。
(8)为每个监听端口的连接的读事件设置 handler
在为 cycle->listening 的元素分配完 ngx_connection_s 类型的连接后,会为连接的读事件设置回调方法 handler。这里 handler 为 ngx_event_accept 函数,对于该函数,将在后文讲解。
(9)将每个监听端口的连接的读事件添加到 epoll 中
在此处,会调用 ngx_epoll_module 的 ngx_epoll_add_event 函数,将监听端口的连接的读事件 (ls[i].connection->read) 添加到 epoll 中。ngx_epoll_add_event 函数的流程如下:
在向 epoll 中添加事件前,需要判断之前是否添加过该连接的事件。
至此,ngx_event_process_init 的工作完成,事件模块的初始化也完成了。后面 worker 开始进入循环监听阶段。
四、事件处理
4.1 worker 的主循环函数 ngx_worker_process_cycle
worker 在初始化完成之后,开始循环监听端口,并处理请求。下面开始我们开始讲解 worker 是如何处理事件的。worker 的循环代码如下:
static void
ngx_worker_process_cycle(ngx_cycle_t *cycle, void *data)
{
ngx_int_t worker = (intptr_t) data;
ngx_process = NGX_PROCESS_WORKER;
ngx_worker = worker;
/* 初始化 worker */
ngx_worker_process_init(cycle, worker);
ngx_setproctitle(“worker process”);
for (;;) {
if (ngx_exiting) {
……
}
ngx_log_debug0(NGX_LOG_DEBUG_EVENT, cycle->log, 0, “worker cycle”);
/* 处理 IO 事件和时间事件 */
ngx_process_events_and_timers(cycle);
if (ngx_terminate) {
……
}
if (ngx_quit) {
……
}
if (ngx_reopen) {
……
}
}
}
可以看到,在 worker 初始化后进入一个 for 循环,所有的 IO 事件和时间事件都是在函数 ngx_process_events_and_timers 中处理的。
4.2 worker 的事件处理函数 ngx_process_events_and_timers
在 worker 的主循环中,所有的事件都是通过函数 ngx_process_events_and_timers 处理的,该函数的代码如下:
/* 事件处理函数和定时器处理函数 */
void
ngx_process_events_and_timers(ngx_cycle_t *cycle)
{
ngx_uint_t flags;
ngx_msec_t timer, delta;
/* timer_resolution 模式,设置 epoll_wait 函数阻塞 ngx_timer_resolution 的时间 */
if (ngx_timer_resolution) {
/* timer_resolution 模式 */
timer = NGX_TIMER_INFINITE;
flags = 0;
} else {
/* 非 timer_resolution 模式,epoll_wait 函数等待至下一个定时器事件到来时返回 */
timer = ngx_event_find_timer();
flags = NGX_UPDATE_TIME;
}
/* 是否使用 accept_mutex */
if (ngx_use_accept_mutex) {
/**
* 该 worker 是否负载过高,若负载过高则不抢锁
* 判断负载过高是判断该 worker 建立的连接数是否大于该 worker 可以建立的最大连接数的 7 /8
**/
if (ngx_accept_disabled > 0) {
ngx_accept_disabled–;
} else {
/* 抢锁 */
if (ngx_trylock_accept_mutex(cycle) == NGX_ERROR) {
return;
}
if (ngx_accept_mutex_held) {
/* 抢到锁,则收到事件后暂不处理,先扔到事件队列中 */
flags |= NGX_POST_EVENTS;
} else {
/* 未抢到锁,要修改 worker 在 epoll_wait 函数等待的时间,使其不要过大 */
if (timer == NGX_TIMER_INFINITE
|| timer > ngx_accept_mutex_delay)
{
timer = ngx_accept_mutex_delay;
}
}
}
}
/* delta 用于计算 ngx_process_events 的耗时 */
delta = ngx_current_msec;
/* 事件处理函数,epoll 使用的是 ngx_epoll_process_events 函数 */
(void) ngx_process_events(cycle, timer, flags);
delta = ngx_current_msec – delta;
ngx_log_debug1(NGX_LOG_DEBUG_EVENT, cycle->log, 0,
“timer delta: %M”, delta);
/* 处理 ngx_posted_accept_events 队列的连接事件 */
ngx_event_process_posted(cycle, &ngx_posted_accept_events);
/* 若持有 accept_mutex,则释放锁 */
if (ngx_accept_mutex_held) {
ngx_shmtx_unlock(&ngx_accept_mutex);
}
/* 若事件处理函数的执行时间不为 0,则要处理定时器事件 */
if (delta) {
ngx_event_expire_timers();
}
/* 处理 ngx_posted_events 队列的读写事件 */
ngx_event_process_posted(cycle, &ngx_posted_events);
}
ngx_process_events_and_timers 函数是 nginx 处理事件的核心函数,主要的工作可以分为下面几部分:
(1)设置 nginx 更新时间的方式。
nginx 会将时间存储在内存中,每隔一段时间调用 ngx_time_update 函数更新时间。那么多久更新一次呢?nginx 提供两种方式:
方式一:timer_resolution 模式。在 nginx 配置文件中,可以使用 timer_resolution 之类来选择此方式。如果使用此方式,会将 epoll_wait 的阻塞时间设置为无穷大,即一直阻塞。那么如果 nginx 一直都没有收到事件,会一直阻塞吗?答案是不会的。在本文 3.2 节中讲解的 ngx_event_process_init 函数 (第 5 步) 将会设置一个时间定时器和一个信号处理函数,其中时间定时器会每隔 timer_resolution 的时间发送一个 SIGALRM 信号,而当 worker 收到时间定时器发送的信号,会将 epoll_wait 函数终端,同时调用 SIGALRM 信号的中断处理函数,将全局变量 ngx_event_timer_alarm 置为 1。后面会检查该变量,调用 ngx_time_update 函数来更新 nginx 的时间。
方式二:如果不在配置文件中设置 timer_resolution,nginx 默认会使用方式二来更新 nginx 的时间。首先会调用 ngx_event_find_timer 函数来设置 epoll_wait 的阻塞时间,ngx_event_find_timer 函数返回的是下一个时间事件发生的时间与当前时间的差值,即让 epoll_wait 阻塞到下一个时间事件发生为止。当使用这种模式,每当 epoll_wait 返回,都会调用 ngx_time_update 函数更新时间。
(2)使用负载均衡锁 ngx_use_accept_mutex。
上文曾经提过一个问题,当多个 worker 都处于等待事件状态,如果突然来了一个请求,就会同时唤醒多个 worker,但是只有一个 worker 会处理该请求,这就造成系统资源浪费。nginx 如果解决这个问题呢?答案就是使用一个锁来解决。在监听事件前,各个 worker 会进行一次抢锁行为,只有抢到锁的 worker 才会监听端口,而其他 worker 值处理已经建立连接的事件。
首先函数会通过 ngx_accept_disabled 是否大于 0 来判断是否过载,过载的 worker 是不允许抢锁的。ngx_accept_disabled 的计算方式如下。
/**
* ngx_cycle->connection_n 是每个进程最大连接数,也是连接池的总连接数,ngx_cycle->free_connection_n 是连接池中未使用的连接数量。
* 当未使用的数量小于总数量的 1 / 8 时,会使 ngx_accept_disabled 大于 0。这时认为该 worker 过载。
**/
ngx_accept_disabled = ngx_cycle->connection_n / 8 – ngx_cycle->free_connection_n;
若 ngx_accept_disabled 小于 0,worker 可以抢锁。这时会通过 ngx_trylock_accept_mutex 函数抢锁。该函数的流程如下图所示:
在抢锁结束后,若 worker 抢到锁,设置该 worker 的 flag 为 NGX_POST_EVENTS,表示抢到锁的这个 worker 在收到事件后并不会立即调用事件的处理函数,而是会把事件放到一个队列里,后期处理。
(3)调用事件处理函数 ngx_process_events,epoll 使用的是 ngx_epoll_process_events 函数。此代码较为重要,下面是代码:
static ngx_int_t
ngx_epoll_process_events(ngx_cycle_t *cycle, ngx_msec_t timer, ngx_uint_t flags)
{
int events;
uint32_t revents;
ngx_int_t instance, i;
ngx_uint_t level;
ngx_err_t err;
ngx_event_t *rev, *wev;
ngx_queue_t *queue;
ngx_connection_t *c;
ngx_log_debug1(NGX_LOG_DEBUG_EVENT, cycle->log, 0,
“epoll timer: %M”, timer);
/* 调用 epoll_wait,从 epoll 中获取发生的事件 */
events = epoll_wait(ep, event_list, (int) nevents, timer);
err = (events == -1) ? ngx_errno : 0;
/* 两种方式更新 nginx 时间,timer_resolution 模式 ngx_event_timer_alarm 为 1,非 timer_resolution 模式 flags & NGX_UPDATE_TIME 不为 0,均会进入 if 条件 */
if (flags & NGX_UPDATE_TIME || ngx_event_timer_alarm) {
ngx_time_update();
}
/* 处理 epoll_wait 返回为 - 1 的情况 */
if (err) {
/**
* 对于 timer_resolution 模式,如果 worker 接收到 SIGALRM 信号,会调用该信号的处理函数,将 ngx_event_timer_alarm 置为 1,从而更新时间。
* 同时如果在 epoll_wait 阻塞的过程中接收到 SIGALRM 信号,会中断 epoll_wait,使其返回 NGX_EINTR。由于上一步已经更新了时间,这里要把 ngx_event_timer_alarm 置为 0。
**/
if (err == NGX_EINTR) {
if (ngx_event_timer_alarm) {
ngx_event_timer_alarm = 0;
return NGX_OK;
}
level = NGX_LOG_INFO;
} else {
level = NGX_LOG_ALERT;
}
ngx_log_error(level, cycle->log, err, “epoll_wait() failed”);
return NGX_ERROR;
}
/* 若 events 返回为 0,判断是因为 epoll_wait 超时还是其他原因 */
if (events == 0) {
if (timer != NGX_TIMER_INFINITE) {
return NGX_OK;
}
ngx_log_error(NGX_LOG_ALERT, cycle->log, 0,
“epoll_wait() returned no events without timeout”);
return NGX_ERROR;
}
/* 对 epoll_wait 返回的链表进行遍历 */
for (i = 0; i < events; i++) {
c = event_list[i].data.ptr;
/* 从 data 中获取 connection & instance 的值,并解析出 instance 和 connection */
instance = (uintptr_t) c & 1;
c = (ngx_connection_t *) ((uintptr_t) c & (uintptr_t) ~1);
/* 取出 connection 的 read 事件 */
rev = c->read;
/* 判断读事件是否过期 */
if (c->fd == -1 || rev->instance != instance) {
ngx_log_debug1(NGX_LOG_DEBUG_EVENT, cycle->log, 0,
“epoll: stale event %p”, c);
continue;
}
/* 取出事件的类型 */
revents = event_list[i].events;
ngx_log_debug3(NGX_LOG_DEBUG_EVENT, cycle->log, 0,
“epoll: fd:%d ev:%04XD d:%p”,
c->fd, revents, event_list[i].data.ptr);
/* 若连接发生错误,则将 EPOLLIN、EPOLLOUT 添加到 revents 中,在调用读写事件时能够处理连接的错误 */
if (revents & (EPOLLERR|EPOLLHUP)) {
ngx_log_debug2(NGX_LOG_DEBUG_EVENT, cycle->log, 0,
“epoll_wait() error on fd:%d ev:%04XD”,
c->fd, revents);
revents |= EPOLLIN|EPOLLOUT;
}
/* 事件为读事件且读事件在 epoll 中 */
if ((revents & EPOLLIN) && rev->active) {
#if (NGX_HAVE_EPOLLRDHUP)
if (revents & EPOLLRDHUP) {
rev->pending_eof = 1;
}
rev->available = 1;
#endif
rev->ready = 1;
/* 事件是否需要延迟处理?对于抢到锁监听端口的 worker,会将事件延迟处理 */
if (flags & NGX_POST_EVENTS) {
/* 根据事件的是否是 accept 事件,加到不同的队列中 */
queue = rev->accept ? &ngx_posted_accept_events
: &ngx_posted_events;
ngx_post_event(rev, queue);
} else {
/* 若不需要延迟处理,直接调用 read 事件的 handler */
rev->handler(rev);
}
}
/* 取出 connection 的 write 事件 */
wev = c->write;
/* 事件为写事件且写事件在 epoll 中 */
if ((revents & EPOLLOUT) && wev->active) {
/* 判断写事件是否过期 */
if (c->fd == -1 || wev->instance != instance) {
ngx_log_debug1(NGX_LOG_DEBUG_EVENT, cycle->log, 0,
“epoll: stale event %p”, c);
continue;
}
wev->ready = 1;
#if (NGX_THREADS)
wev->complete = 1;
#endif
/* 事件是否需要延迟处理?对于抢到锁监听端口的 worker,会将事件延迟处理 */
if (flags & NGX_POST_EVENTS) {
ngx_post_event(wev, &ngx_posted_events);
} else {
/* 若不需要延迟处理,直接调用 write 事件的 handler */
wev->handler(wev);
}
}
}
return NGX_OK;
}
该函数的流程图如下:
(4)计算 ngx_process_events 函数的调用时间。
(5)处理 ngx_posted_accept_events 队列的连接事件。这里就是遍历 ngx_posted_accept_events 队列,调用事件的 handler 方法,这里 accept 事件的 handler 为 ngx_event_accept。
(6)释放负载均衡锁。
(7)处理定时器事件,具体操作是在定时器红黑树中查找过期的事件,调用其 handler 方法。
(8)处理 ngx_posted_events 队列的读写事件,即遍历 ngx_posted_events 队列,调用事件的 handler 方法。
结束
至此,我们介绍完了 nginx 事件模块的事件处理函数 ngx_process_events_and_timers。nginx 事件模块的相关知识也初步介绍完了。