PHP-FPM 源码剖析
一个申请从浏览器达到 PHP 脚本执行两头有个必要模块是网络解决模块,FPM 是这个模块的一部分,配合 fastcgi 协定实现对申请的从监听到转发到 PHP 解决,并将后果返回这条流程。
FPM 采纳多过程模型,就是创立一个 master 过程,在 master 过程中创立并监听 socket,而后 fork 多个子过程,而后子过程各自 accept 申请,子过程在启动后阻塞在 accept 上,有申请达到后开始读取申请 数据,读取实现后开始解决而后再返回,在这期间是不会接管其它申请的,也就是说 fpm 的子过程同时只能响应 一个申请,只有把这个申请解决实现后才会 accept 下一个申请,这是一种同步阻塞的模型。master 过程负责管理子过程,监听子过程的状态,管制子过程的数量。master 过程与 worker 过程之间通过共享变量同步信息。
从 main 函数开始
int main(int argc, char *argv[])
{zend_signal_startup();
// 将全局变量 sapi_module 设置为 cgi_sapi_module
sapi_startup(&cgi_sapi_module);
fcgi_init();
// 获取命令行参数,其中 php-fpm -D、- i 等参数都是在这里被解析进去的
// ...
cgi_sapi_module.startup(&cgi_sapi_module);
fpm_init(argc, argv, fpm_config ? fpm_config : CGIG(fpm_config), fpm_prefix, fpm_pid, test_conf, php_allow_to_run_as_root, force_daemon, force_stderr);
// master 过程会在这一步死循环, 前面的流程都是子过程在执行。fcgi_fd = fpm_run(&max_requests);
fcgi_fd = fpm_run(&max_requests);
request = fpm_init_request(fcgi_fd);
// accept 申请
// ....
}
main()函数展示了这个 fpm 运行残缺的框架, 可见整个 fpm 次要分为三个局部:1、运行前的 fpm_init();2、运行函数 fpm_run();3、子过程 accept 申请解决。
FPM 中的事件监听机制
在具体理解 fpm 工作过程前,咱们要先理解 fpm 中的事件机制。在 fpm 中事件的监听默认应用 kqueue 来实现,对于 kqueue 的介绍能够看看我之前整顿的这篇文章 kqueue 用法简介。
// fpm 中的事件构造体
struct fpm_event_s {
// 事件的句柄
int fd;
// 下一次触发的事件
struct timeval timeout;
// 频率: 多久执行一次
struct timeval frequency;
// 事件触发时调用的函数
void (*callback)(struct fpm_event_s *, short, void *);
void *arg; // 调用 callback 时的参数
// FPM_EV_READ: 读;FPM_EV_TIMEOUT:;FPM_EV_PERSIST:;FPM_EV_EDGE:;
int flags;
int index; // 在 fd 句柄数组中的索引
// 事件的类型 FPM_EV_READ: 读;FPM_EV_TIMEOUT: 计时器;FPM_EV_PERSIST:;FPM_EV_EDGE:;
short which;
};
// 事件队列
typedef struct fpm_event_queue_s {
struct fpm_event_queue_s *prev;
struct fpm_event_queue_s *next;
struct fpm_event_s *ev;
} fpm_event_queue;
以 fpm_run()中 master 过程注册的一个 sp[0]的可读事件为例:
void fpm_event_loop(int err)
{
static struct fpm_event_s signal_fd_event;
// 创立一个事件: 管道 sp[0]可读时触发
fpm_event_set(&signal_fd_event, fpm_signals_get_fd(), FPM_EV_READ, &fpm_got_signal, NULL);
// 将事件增加进 queue
fpm_event_add(&signal_fd_event, 0);
// 解决定时器等逻辑
// 以阻塞的形式获取事件
// module->wait()是一个接口定义的办法签名,上面展现 kqueue 的实现
ret = module->wait(fpm_event_queue_fd, timeout);
}
int fpm_event_add(struct fpm_event_s *ev, unsigned long int frequency)
{
// ...
// 如果事件是触发事件则之间增加进 queue 中
// 对于定时器事件先依据事件的 frequency 设置事件的触发频率和下一次触发的事件
if (fpm_event_queue_add(&fpm_event_queue_timer, ev) != 0) {return -1;}
return 0;
}
static int fpm_event_queue_add(struct fpm_event_queue_s **queue, struct fpm_event_s *ev)
{
// ...
// 构建并将以后事件插入事件队列 queue 中
if (*queue == fpm_event_queue_fd && module->add) {// module->add(ev)是一个接口定义的办法签名,上面展现 kqueue 的实现
module->add(ev);
}
return 0;
}
// kqueue 对于增加事件到 kqueue 的实现
static int fpm_event_kqueue_add(struct fpm_event_s *ev) /* {{{ */
{
struct kevent k;
int flags = EV_ADD;
if (ev->flags & FPM_EV_EDGE) {flags = flags | EV_CLEAR;}
EV_SET(&k, ev->fd, EVFILT_READ, flags, 0, 0, (void *)ev);
if (kevent(kfd, &k, 1, NULL, 0, NULL) < 0) {zlog(ZLOG_ERROR, "kevent: unable to add event");
return -1;
}
/* mark the event as registered */
ev->index = ev->fd;
return 0;
}
FPM 中对于 kqueue 的实现
// kqueue 对于从 kqueue 中监听事件的实现
static int fpm_event_kqueue_wait(struct fpm_event_queue_s *queue, unsigned long int timeout) /* {{{ */
{
struct timespec t;
int ret, i;
/* ensure we have a clean kevents before calling kevent() */
memset(kevents, 0, sizeof(struct kevent) * nkevents);
/* convert ms to timespec struct */
t.tv_sec = timeout / 1000;
t.tv_nsec = (timeout % 1000) * 1000 * 1000;
/* wait for incoming event or timeout */
ret = kevent(kfd, NULL, 0, kevents, nkevents, &t);
if (ret == -1) {
/* trigger error unless signal interrupt */
if (errno != EINTR) {zlog(ZLOG_WARNING, "epoll_wait() returns %d", errno);
return -1;
}
}
/* fire triggered events */
for (i = 0; i < ret; i++) {if (kevents[i].udata) {struct fpm_event_s *ev = (struct fpm_event_s *)kevents[i].udata;
fpm_event_fire(ev);
/* sanity check */
if (fpm_globals.parent_pid != getpid()) {return -2;}
}
}
return ret;
}
fpm_init
fpm_init()负责启动前的初始化工作,包含注册各个模块的销毁时用于清理变量的 callback。上面只介绍几个重要的 init。
fpm_conf_init_main
负责解析 php-fpm.conf 配置文件,调配 worker pool 内存构造并保留到全局变量 fpm_worker_all_pools 中,各 worker pool 配置解析到 fpm_worker_pool_s->config 中。
所谓 worker pool 是 fpm 能够同时监听多个端口,每个端口对应一个 worker pool。
fpm_scoreboard_init_main
为每个 worker pool 调配一个 fpm_scoreboard_s 构造的内存空间 scoreboard,用于记录 worker 过程运行信息。
// fpm_scoreboard_s 构造
struct fpm_scoreboard_s {
union {
atomic_t lock;
char dummy[16];
};
char pool[32];
int pm; // 过程的治理形式 static、dynamic、ondemand
time_t start_epoch;
int idle; // 闲暇的 worker 过程数
int active; // 忙碌的 worker 过程数
int active_max; // 最大忙碌过程数
unsigned long int requests;
unsigned int max_children_reached;
int lq;
int lq_max;
unsigned int lq_len;
unsigned int nprocs;
int free_proc;
unsigned long int slow_rq;
struct fpm_scoreboard_proc_s *procs[];};
fpm_signals_init_main
fpm 注册本人的信号量,并设置监听函数的解决逻辑。
int fpm_signals_init_main() /* {{{ */
{
struct sigaction act;
// 创立一个全双工套接字
// 全双工的套接字是一个能够读、写的 socket 通道 [0] 和[1],每个过程固定一个管道。// 写数据时:管道不满不会被阻塞;读数据时:管道里没有数据会阻塞(可设置)
// 向 sp[0]写入数据时,sp[0]的读取将会被阻塞,sp[1]的写管道会被阻塞,sp[1]中此时读取 sp[0]的数据
if (0 > socketpair(AF_UNIX, SOCK_STREAM, 0, sp)) {zlog(ZLOG_SYSERROR, "failed to init signals: socketpair()");
return -1;
}
if (0 > fd_set_blocked(sp[0], 0) || 0 > fd_set_blocked(sp[1], 0)) {zlog(ZLOG_SYSERROR, "failed to init signals: fd_set_blocked()");
return -1;
}
if (0 > fcntl(sp[0], F_SETFD, FD_CLOEXEC) || 0 > fcntl(sp[1], F_SETFD, FD_CLOEXEC)) {zlog(ZLOG_SYSERROR, "falied to init signals: fcntl(F_SETFD, FD_CLOEXEC)");
return -1;
}
memset(&act, 0, sizeof(act));
act.sa_handler = sig_handler; // 监听到信号调用这个函数
sigfillset(&act.sa_mask);
if (0 > sigaction(SIGTERM, &act, 0) ||
0 > sigaction(SIGINT, &act, 0) ||
0 > sigaction(SIGUSR1, &act, 0) ||
0 > sigaction(SIGUSR2, &act, 0) ||
0 > sigaction(SIGCHLD, &act, 0) ||
0 > sigaction(SIGQUIT, &act, 0)) {zlog(ZLOG_SYSERROR, "failed to init signals: sigaction()");
return -1;
}
return 0;
}
// 所有信号共用同一个处理函数
static void sig_handler(int signo) /* {{{ */
{static const char sig_chars[NSIG + 1] = {[SIGTERM] = 'T',
[SIGINT] = 'I',
[SIGUSR1] = '1',
[SIGUSR2] = '2',
[SIGQUIT] = 'Q',
[SIGCHLD] = 'C'
};
char s;
int saved_errno;
if (fpm_globals.parent_pid != getpid()) {return;}
saved_errno = errno;
s = sig_chars[signo];
zend_quiet_write(sp[1], &s, sizeof(s)); // 将信息对应的字节写进管道 sp[1]端,此时 sp[1]端的读数据会阻塞; 数据能够从 sp[0]端读取
errno = saved_errno;
}
fpm_sockets_init_main
每个 worker pool 开启一个 socket 套接字。
fpm_event_init_main
这里启动 master 的事件管理器。用于治理 IO、定时事件,其中 IO 事件通过 kqueue、epoll、poll、select 等治理,定时事件就是定时器,肯定工夫后触发某个事件。同样,咱们以 kqueue 的实现为例看下源码。
int fpm_event_init_main()
{
// ...
if (module->init(max) < 0) {zlog(ZLOG_ERROR, "Unable to initialize the event module %s", module->name);
return -1;
}
// ...
}
// max 用于指定 kqueue 事件数组的大小
static int fpm_event_kqueue_init(int max) /* {{{ */
{if (max < 1) {return 0;}
kfd = kqueue();
if (kfd < 0) {zlog(ZLOG_ERROR, "kqueue: unable to initialize");
return -1;
}
kevents = malloc(sizeof(struct kevent) * max);
if (!kevents) {zlog(ZLOG_ERROR, "epoll: unable to allocate %d events", max);
return -1;
}
memset(kevents, 0, sizeof(struct kevent) * max);
nkevents = max;
return 0;
}
fpm_run
fpm_init 到此结束,上面进入 fpm_run 阶段,在这个阶段 master 过程会依据配置 fork 出多个子过程而后 master 过程会进入 fpm_event_loop(0)函数,并在这个函数外部死循环,也就是说 master 过程将不再执行前面的代码,前面的逻辑全副是子过程执行的操作。
master 过程在 fpm_event_loop 里通过管道 sp 来监听子过程的各个事件,同时也要解决本身产生的一些事件、定时器等工作,来响应的治理子过程。外部的逻辑在介绍事件监听机制时曾经具体说过。
int fpm_run(int *max_requests) /* {{{ */
{
struct fpm_worker_pool_s *wp;
/* create initial children in all pools */
for (wp = fpm_worker_all_pools; wp; wp = wp->next) {
int is_parent;
is_parent = fpm_children_create_initial(wp);
if (!is_parent) {goto run_child;}
}
/* run event loop forever */
fpm_event_loop(0);
run_child: /* only workers reach this point */
fpm_cleanups_run(FPM_CLEANUP_CHILD);
*max_requests = fpm_globals.max_requests;
return fpm_globals.listening_socket;
}
子过程解决申请
回到 main 函数,fpm_run 前面的逻辑都是子过程在运行。首先会初始化一个 fpm 的 request 构造的变量,而后子过程会阻塞在 fcgi_accept_request(request)函数上期待申请。对于 fcgi_accept_request 函数就是死循环一个 socket 编程的 accept 函数来接管申请,并将申请数据全副取出。
...
// 初始化 request
request = fpm_init_request(fcgi_fd);
zend_first_try {
// accept 接管申请
while (EXPECTED(fcgi_accept_request(request) >= 0)) {init_request_info();
fpm_request_info();
if (UNEXPECTED(php_request_startup() == FAILURE)) {// ...}
if (UNEXPECTED(fpm_status_handle_request())) {goto fastcgi_request_done;}
...
// 关上配置文件中 DOCUMENT_ROOT 设置的脚本
if (UNEXPECTED(php_fopen_primary_script(&file_handle) == FAILURE)) {...}
fpm_request_executing();
// 执行脚本
php_execute_script(&file_handle);
...
}
// 销毁申请 request
fcgi_destroy_request(request);
// fcgi 退出
fcgi_shutdown();
if (cgi_sapi_module.php_ini_path_override) {free(cgi_sapi_module.php_ini_path_override);
}
if (cgi_sapi_module.ini_entries) {free(cgi_sapi_module.ini_entries);
}
} zend_catch {...} zend_end_try();