PHP-FPM源码剖析

一个申请从浏览器达到PHP脚本执行两头有个必要模块是网络解决模块,FPM是这个模块的一部分,配合fastcgi协定实现对申请的从监听到转发到PHP解决,并将后果返回这条流程。
FPM采纳多过程模型,就是创立一个master过程,在master过程中创立并监听socket,而后fork多个子过程,而后子过程各自accept申请,子过程在启动后阻塞在accept上,有申请达到后开始读取申请 数据,读取实现后开始解决而后再返回,在这期间是不会接管其它申请的,也就是说fpm的子过程同时只能响应 一个申请,只有把这个申请解决实现后才会accept下一个申请,这是一种同步阻塞的模型。master过程负责管理子过程,监听子过程的状态,管制子过程的数量。master过程与worker过程之间通过共享变量同步信息。

从main函数开始

int main(int argc, char *argv[]){    zend_signal_startup();    // 将全局变量sapi_module设置为cgi_sapi_module    sapi_startup(&cgi_sapi_module);    fcgi_init();    // 获取命令行参数,其中php-fpm -D、-i等参数都是在这里被解析进去的    // ...        cgi_sapi_module.startup(&cgi_sapi_module);        fpm_init(argc, argv, fpm_config ? fpm_config : CGIG(fpm_config), fpm_prefix, fpm_pid, test_conf, php_allow_to_run_as_root, force_daemon, force_stderr);        // master过程会在这一步死循环,前面的流程都是子过程在执行。    fcgi_fd = fpm_run(&max_requests);            fcgi_fd = fpm_run(&max_requests);    request = fpm_init_request(fcgi_fd);        // accept申请    // ....}

main()函数展示了这个fpm运行残缺的框架,可见整个fpm次要分为三个局部:1、运行前的fpm_init();2、运行函数fpm_run();3、子过程accept申请解决。

FPM中的事件监听机制

在具体理解fpm工作过程前,咱们要先理解fpm中的事件机制。在fpm中事件的监听默认应用kqueue来实现,对于kqueue的介绍能够看看我之前整顿的这篇文章kqueue用法简介。

// fpm中的事件构造体struct fpm_event_s {    // 事件的句柄    int fd;    // 下一次触发的事件    struct timeval timeout;    // 频率:多久执行一次    struct timeval frequency;    // 事件触发时调用的函数    void (*callback)(struct fpm_event_s *, short, void *);    void *arg;                // 调用callback时的参数    // FPM_EV_READ:读;FPM_EV_TIMEOUT:;FPM_EV_PERSIST:;FPM_EV_EDGE:;    int flags;    int index;                // 在fd句柄数组中的索引    // 事件的类型 FPM_EV_READ:读;FPM_EV_TIMEOUT:计时器;FPM_EV_PERSIST:;FPM_EV_EDGE:;    short which;};// 事件队列typedef struct fpm_event_queue_s {    struct fpm_event_queue_s *prev;    struct fpm_event_queue_s *next;    struct fpm_event_s *ev;} fpm_event_queue;

以fpm_run()中master过程注册的一个sp[0]的可读事件为例:

void fpm_event_loop(int err){    static struct fpm_event_s signal_fd_event;        // 创立一个事件:管道sp[0]可读时触发    fpm_event_set(&signal_fd_event, fpm_signals_get_fd(), FPM_EV_READ, &fpm_got_signal, NULL);    // 将事件增加进queue    fpm_event_add(&signal_fd_event, 0);        // 解决定时器等逻辑        // 以阻塞的形式获取事件    // module->wait()是一个接口定义的办法签名,上面展现kqueue的实现    ret = module->wait(fpm_event_queue_fd, timeout);}int fpm_event_add(struct fpm_event_s *ev, unsigned long int frequency){    // ...    // 如果事件是触发事件则之间增加进queue中    // 对于定时器事件先依据事件的frequency设置事件的触发频率和下一次触发的事件    if (fpm_event_queue_add(&fpm_event_queue_timer, ev) != 0) {        return -1;    }    return 0;}static int fpm_event_queue_add(struct fpm_event_queue_s **queue, struct fpm_event_s *ev){    // ...    // 构建并将以后事件插入事件队列queue中    if (*queue == fpm_event_queue_fd && module->add) {        // module->add(ev)是一个接口定义的办法签名,上面展现kqueue的实现        module->add(ev);    }    return 0;}// kqueue对于增加事件到kqueue的实现static int fpm_event_kqueue_add(struct fpm_event_s *ev) /* {{{ */{    struct kevent k;    int flags = EV_ADD;    if (ev->flags & FPM_EV_EDGE) {            flags = flags | EV_CLEAR;    }    EV_SET(&k, ev->fd, EVFILT_READ, flags, 0, 0, (void *)ev);    if (kevent(kfd, &k, 1, NULL, 0, NULL) < 0) {        zlog(ZLOG_ERROR, "kevent: unable to add event");        return -1;    }    /* mark the event as registered */    ev->index = ev->fd;    return 0;}

FPM中对于kqueue的实现

// kqueue对于从kqueue中监听事件的实现static int fpm_event_kqueue_wait(struct fpm_event_queue_s *queue, unsigned long int timeout) /* {{{ */{    struct timespec t;    int ret, i;    /* ensure we have a clean kevents before calling kevent() */    memset(kevents, 0, sizeof(struct kevent) * nkevents);    /* convert ms to timespec struct */    t.tv_sec = timeout / 1000;    t.tv_nsec = (timeout % 1000) * 1000 * 1000;    /* wait for incoming event or timeout */    ret = kevent(kfd, NULL, 0, kevents, nkevents, &t);    if (ret == -1) {        /* trigger error unless signal interrupt */        if (errno != EINTR) {            zlog(ZLOG_WARNING, "epoll_wait() returns %d", errno);            return -1;        }    }    /* fire triggered events */    for (i = 0; i < ret; i++) {        if (kevents[i].udata) {            struct fpm_event_s *ev = (struct fpm_event_s *)kevents[i].udata;            fpm_event_fire(ev);            /* sanity check */            if (fpm_globals.parent_pid != getpid()) {                return -2;            }        }    }    return ret;}

fpm_init

fpm_init()负责启动前的初始化工作,包含注册各个模块的销毁时用于清理变量的callback。上面只介绍几个重要的init。

fpm_conf_init_main

负责解析php-fpm.conf配置文件,调配worker pool内存构造并保留到全局变量fpm_worker_all_pools中,各worker pool配置解析到 fpm_worker_pool_s->config 中。
所谓worker pool 是fpm能够同时监听多个端口,每个端口对应一个worker pool。

fpm_scoreboard_init_main

为每个worker pool调配一个fpm_scoreboard_s构造的内存空间scoreboard,用于记录worker过程运行信息。

// fpm_scoreboard_s 构造struct fpm_scoreboard_s {    union {        atomic_t lock;        char dummy[16];    };    char pool[32];    int pm;                    // 过程的治理形式 static、dynamic、ondemand    time_t start_epoch;    int idle;                // 闲暇的worker过程数    int active;                // 忙碌的worker过程数    int active_max;            // 最大忙碌过程数    unsigned long int requests;    unsigned int max_children_reached;    int lq;    int lq_max;    unsigned int lq_len;    unsigned int nprocs;    int free_proc;    unsigned long int slow_rq;    struct fpm_scoreboard_proc_s *procs[];};

fpm_signals_init_main

fpm注册本人的信号量,并设置监听函数的解决逻辑。

int fpm_signals_init_main() /* {{{ */{    struct sigaction act;    // 创立一个全双工套接字    // 全双工的套接字是一个能够读、写的socket通道[0]和[1],每个过程固定一个管道。    // 写数据时:管道不满不会被阻塞;读数据时:管道里没有数据会阻塞(可设置)    // 向sp[0]写入数据时,sp[0]的读取将会被阻塞,sp[1]的写管道会被阻塞,sp[1]中此时读取sp[0]的数据    if (0 > socketpair(AF_UNIX, SOCK_STREAM, 0, sp)) {        zlog(ZLOG_SYSERROR, "failed to init signals: socketpair()");        return -1;    }    if (0 > fd_set_blocked(sp[0], 0) || 0 > fd_set_blocked(sp[1], 0)) {        zlog(ZLOG_SYSERROR, "failed to init signals: fd_set_blocked()");        return -1;    }    if (0 > fcntl(sp[0], F_SETFD, FD_CLOEXEC) || 0 > fcntl(sp[1], F_SETFD, FD_CLOEXEC)) {        zlog(ZLOG_SYSERROR, "falied to init signals: fcntl(F_SETFD, FD_CLOEXEC)");        return -1;    }    memset(&act, 0, sizeof(act));    act.sa_handler = sig_handler;       // 监听到信号调用这个函数    sigfillset(&act.sa_mask);    if (0 > sigaction(SIGTERM,  &act, 0) ||        0 > sigaction(SIGINT,   &act, 0) ||        0 > sigaction(SIGUSR1,  &act, 0) ||        0 > sigaction(SIGUSR2,  &act, 0) ||        0 > sigaction(SIGCHLD,  &act, 0) ||        0 > sigaction(SIGQUIT,  &act, 0)) {        zlog(ZLOG_SYSERROR, "failed to init signals: sigaction()");        return -1;    }    return 0;}// 所有信号共用同一个处理函数static void sig_handler(int signo) /* {{{ */{    static const char sig_chars[NSIG + 1] = {        [SIGTERM] = 'T',        [SIGINT]  = 'I',        [SIGUSR1] = '1',        [SIGUSR2] = '2',        [SIGQUIT] = 'Q',        [SIGCHLD] = 'C'    };    char s;    int saved_errno;    if (fpm_globals.parent_pid != getpid()) {        return;    }    saved_errno = errno;    s = sig_chars[signo];    zend_quiet_write(sp[1], &s, sizeof(s)); // 将信息对应的字节写进管道sp[1]端,此时sp[1]端的读数据会阻塞;数据能够从sp[0]端读取    errno = saved_errno;}

fpm_sockets_init_main

每个worker pool 开启一个socket套接字。

fpm_event_init_main

这里启动master的事件管理器。用于治理IO、定时事件,其中IO事件通过kqueue、epoll、 poll、select等治理,定时事件就是定时器,肯定工夫后触发某个事件。同样,咱们以kqueue的实现为例看下源码。

int fpm_event_init_main(){    // ...    if (module->init(max) < 0) {        zlog(ZLOG_ERROR, "Unable to initialize the event module %s", module->name);        return -1;    }    // ...}// max用于指定kqueue事件数组的大小static int fpm_event_kqueue_init(int max) /* {{{ */{    if (max < 1) {        return 0;    }    kfd = kqueue();    if (kfd < 0) {        zlog(ZLOG_ERROR, "kqueue: unable to initialize");        return -1;    }    kevents = malloc(sizeof(struct kevent) * max);    if (!kevents) {        zlog(ZLOG_ERROR, "epoll: unable to allocate %d events", max);        return -1;    }    memset(kevents, 0, sizeof(struct kevent) * max);    nkevents = max;    return 0;}

fpm_run

fpm_init到此结束,上面进入fpm_run阶段,在这个阶段master过程会依据配置fork出多个子过程而后master过程会进入fpm_event_loop(0)函数,并在这个函数外部死循环,也就是说master过程将不再执行前面的代码,前面的逻辑全副是子过程执行的操作。
master过程在fpm_event_loop里通过管道sp来监听子过程的各个事件,同时也要解决本身产生的一些事件、定时器等工作,来响应的治理子过程。外部的逻辑在介绍事件监听机制时曾经具体说过。

int fpm_run(int *max_requests) /* {{{ */{    struct fpm_worker_pool_s *wp;    /* create initial children in all pools */    for (wp = fpm_worker_all_pools; wp; wp = wp->next) {        int is_parent;        is_parent = fpm_children_create_initial(wp);        if (!is_parent) {            goto run_child;        }    }    /* run event loop forever */    fpm_event_loop(0);run_child: /* only workers reach this point */    fpm_cleanups_run(FPM_CLEANUP_CHILD);    *max_requests = fpm_globals.max_requests;    return fpm_globals.listening_socket;}

子过程解决申请

回到main函数,fpm_run前面的逻辑都是子过程在运行。首先会初始化一个fpm的request构造的变量,而后子过程会阻塞在fcgi_accept_request(request)函数上期待申请。对于fcgi_accept_request函数就是死循环一个socket编程的accept函数来接管申请,并将申请数据全副取出。

...    // 初始化requestrequest = fpm_init_request(fcgi_fd);zend_first_try {    // accept接管申请    while (EXPECTED(fcgi_accept_request(request) >= 0)) {        init_request_info();        fpm_request_info();                if (UNEXPECTED(php_request_startup() == FAILURE)) {            // ...        }        if (UNEXPECTED(fpm_status_handle_request())) {            goto fastcgi_request_done;        }        ...        // 关上配置文件中DOCUMENT_ROOT设置的脚本        if (UNEXPECTED(php_fopen_primary_script(&file_handle) == FAILURE)) {            ...        }                fpm_request_executing();        // 执行脚本        php_execute_script(&file_handle);        ...    }    // 销毁申请request    fcgi_destroy_request(request);    // fcgi退出    fcgi_shutdown();    if (cgi_sapi_module.php_ini_path_override) {        free(cgi_sapi_module.php_ini_path_override);    }    if (cgi_sapi_module.ini_entries) {        free(cgi_sapi_module.ini_entries);    }} zend_catch {    ...} zend_end_try();