关于php:SWOOLE系列谈谈reactor

51次阅读

共计 14486 个字符,预计需要花费 37 分钟才能阅读完成。

前言

环境阐明

php --ri swoole

swoole

Swoole => enabled
Author => Swoole Team <team@swoole.com>
Version => 4.4.4
Built => Aug 27 2019 11:

php -version
PHP 7.3.7 (cli) (built: Jul  5 2019 12:44:05) (NTS)
Copyright (c) 1997-2018 The PHP Group
Zend Engine v3.3.7, Copyright (c) 1998-2018 Zend Technologies
    with Zend OPcache v7.3.7, Copyright (c) 1999-2018, by Zend Technologies

通过 swoole 如何不必 nginxphp-fpm疾速启动一个 http 服务

<?php
$http = new Swoole\Http\Server("0.0.0.0", 9501);

$http->on('request', function ($request, $response) {$response->header("Content-Type", "text/html; charset=utf-8");
    $response->end("立的 flag, 含着泪, 吃着屎也要做到");
});

$http->start();

cli 下用 php server.php 启动服务

curl http://localhost:9501
立的 flag, 含着泪, 吃着屎也要做到

服务是启动完了,性能如何?
那么问题来了!不卖关子了,间接给个论断吧 swoole 启动的 server 远大于 fpm 下的 http 服务。(能够自行本地 ab 看后果)

php 做 web 的瓶颈在哪?

大家在论坛、贴吧、社区常常看到以下对话
狗蛋 A:php 是世界最好的语言
狗蛋 B: 去你的,php 性能差
狗蛋 C: 哟!php 居然还活着。
狗蛋 D:php 的最终后果都是转向 java

。。。。。
这几年我也看淡了,这里我只想说 php 确实是世界最好的语言。

至于 php 有没有问题,我只能说确实有问题,每个语言都有本人的或多或少的问题,你让那些天天吹 go 好的人,你问问他们用 go CURD 的开心不开心!
php 被讥嘲最多的还是性能问题,你说的一点都没有错,谋求 极致 的性能 php 还真不行(除非无脑堆机器)

lnmp

在具体说 php 性能问题之前,咱们能够再来回顾下 lnmp
L:linux
N:nginx
M:mysql
P:php
让咱们来康康这四个货色,咱们发现不论啥语言基本上都依赖 LinuxNginxMysql。那么这仨必定没锅,问题就落到了php 上。
nginx是没法间接解析 php,那么借助是 php-fpm
整个工作流程如下:

php-fpm是一个多过程单线程的模型,如果一个申请卡了 60s 吗,那么这个 php-fpm 就属于占着茅坑不拉屎待岗状态。那么咱们得出结论,服务器承载的最高并发的短板是 fpm 的数量,那么有人会说无脑设置上线的 fpm 的数量不就好了吗?那么祝贺你能够间接找下一份工作了!
每个 fpm 均匀大略占用内存 20 到 30mb 那么机器反对最高的 fpm 数量公式如下:
fpm 数量 = 机器内存 * 1024M * 0.8 / 30(20)

乘 0.8 次要思考做人留一线,非要榨干服务器内存干啥,影响多不好啊

咱们试着想一个问题 nginx 的是怎么演变的? php 做 web 是否能够跟 nginx 一样
select -> poll -> epoll

没错 swoole 能够!php-fpm 做不到的,然而 swoole 做到了。

reactor

对于 reactor 的基本概念网络一大堆,能够总结以下几点
1.I/ O 对路复用
2. 事件注册、散发、调度
3. 异步非梗塞
基于 reactor 实现的大家做相熟 nginxredis 等(次要我也就晓得这两个)
咱们接下来来康康 swoole 怎么联合reactor

swoole 的 server 的运行流程图

浏览源码之前咱们先看看 server 的运行流程图



来自 https://wiki.swoole.com/wiki/…

摸索假相

根据上述的 demo 我能够看到 new Swoole\Http\Server("0.0.0.0", 9501) 创立一个server

咱们能够定位源码到 swoole_server.ccstatic PHP_METHOD(swoole_server, __construct)

static PHP_METHOD(swoole_server, __construct)
{
    ......
    // 初始化
    zval *zserv = ZEND_THIS;
    char *host;
    size_t host_len = 0;
    zend_long sock_type = SW_SOCK_TCP;
    zend_long serv_port = 0;
    zend_long serv_mode = SW_MODE_PROCESS;

    // 看到木有 只能 cli 执行
    if (!SWOOLE_G(cli))
    {zend_throw_exception_ex(swoole_exception_ce, -1, "%s can only be used in CLI mode", SW_Z_OBJCE_NAME_VAL_P(zserv));
        RETURN_FALSE;
    }

    if (sw_server() != NULL)
    {zend_throw_exception_ex(swoole_exception_ce, -3, "server is running. unable to create %s", SW_Z_OBJCE_NAME_VAL_P(zserv));
        RETURN_FALSE;
    }

    .....
    // serv_mode SWOOLE_BASE、SWOOLE_PROCESS 具体看官网 wiki 解释

    if (serv_mode != SW_MODE_BASE && serv_mode != SW_MODE_PROCESS)
    {php_swoole_fatal_error(E_ERROR, "invalid $mode parameters %d", (int) serv_mode);
        RETURN_FALSE;
    }

    // 申请内存
    serv = (swServer *) sw_malloc(sizeof(swServer));
    if (!serv)
    {zend_throw_exception_ex(swoole_exception_ce, errno, "malloc(%ld) failed", sizeof(swServer));
        RETURN_FALSE;
    }

    swServer_init(serv);
   
    ....
}

在构造方法里 咱们并没有看到 reactor 相干的代码,咱们持续往下追 $http->start() 定位到
static PHP_METHOD(swoole_server, start)

static PHP_METHOD(swoole_server, start)
{
    zval *zserv = ZEND_THIS;
    // 读一读什么叫规范的命名 获取 server 并且 检测这个服务 如许通熟易懂
    swServer *serv = php_swoole_server_get_and_check_server(zserv);

    if (serv->gs->start > 0)
    {php_swoole_fatal_error(E_WARNING, "server is running, unable to execute %s->start", SW_Z_OBJCE_NAME_VAL_P(zserv));
        RETURN_FALSE;
    }
    if (serv->gs->shutdown > 0)
    {php_swoole_fatal_error(E_WARNING, "server have been shutdown, unable to execute %s->start", SW_Z_OBJCE_NAME_VAL_P(zserv));
        RETURN_FALSE;
    }

    if (SwooleTG.reactor)
    {php_swoole_fatal_error(E_WARNING, "eventLoop has already been created, unable to start %s", SW_Z_OBJCE_NAME_VAL_P(zserv));
        RETURN_FALSE;
    }

    .....

    // swoole 服务启动前的前置工作
    php_swoole_server_before_start(serv, zserv);
    // 启动 sever
    if (swServer_start(serv) < 0)
    {php_swoole_fatal_error(E_ERROR, "failed to start server. Error: %s", sw_error);
    }

    RETURN_TRUE;
}

依照我多年的搬屎山的直觉来说,再启动之前必定须要做筹备工作,那么 reactor 的初始化必定在php_swoole_server_before_start

void php_swoole_server_before_start(swServer *serv, zval *zobject)
{
    /**
     * create swoole server
     */
    if (swServer_create(serv) < 0)
    {php_swoole_fatal_error(E_ERROR, "failed to create the server. Error: %s", sw_error);
        return;
    }
    .....
int swServer_create(swServer *serv)
{
    serv->factory.ptr = serv;

    serv->session_list = (swSession *) sw_shm_calloc(SW_SESSION_LIST_SIZE, sizeof(swSession));
    if (serv->session_list == NULL)
    {swError("sw_shm_calloc(%ld) for session_list failed", SW_SESSION_LIST_SIZE * sizeof(swSession));
        return SW_ERR;
    }

    if (serv->enable_static_handler && serv->locations == nullptr)
    {serv->locations = new std::unordered_set<std::string>;}

    if (serv->factory_mode == SW_MODE_BASE)
    {return swReactorProcess_create(serv);
    }
    else
    {return swReactorThread_create(serv);
    }
}

能够发现依据不同的执行模式 创立 reactor 也是不同,不过这次咱们只看 swReactorProcess_create 开始 剖析 瞎逼逼这块代码

reactor

int swReactorProcess_create(swServer *serv)
{
    serv->reactor_num = serv->worker_num;
    serv->connection_list = (swConnection *) sw_calloc(serv->max_connection, sizeof(swConnection));
    if (serv->connection_list == NULL)
    {swSysWarn("calloc[2](%d) failed", (int)(serv->max_connection * sizeof(swConnection)));
        return SW_ERR;
    }
    //create factry object
    if (swFactory_create(&(serv->factory)) < 0)
    {swError("create factory failed");
        return SW_ERR;
    }
    serv->factory.finish = swReactorProcess_send2client;
    return SW_OK;
}

其实这个函数只是将 swserver 这个构造体初始化对应的属性和回调函数
1.reactor_num
2.conection_list 初始化好空间
3.finish 的完结回调函数
4.factory

int swFactory_create(swFactory *factory)
{
    factory->dispatch = swFactory_dispatch;
    factory->finish = swFactory_finish;
    factory->start = swFactory_start;
    factory->shutdown = swFactory_shutdown;
    factory->end = swFactory_end;
    factory->notify = swFactory_notify;
    factory->free = swFactory_free;
    return SW_OK;
}

再初始化 swServer 后就可以看swServer_start 参数是之前初始化结构好的swServer

int swServer_start(swServer *serv)
{
    // factory 能够定位到 swFactory_create
    swFactory *factory = &serv->factory;
    int ret;

    // 启动前的检测 判断不同 mode 下的参数 和 php 上游回调函数的是否结构 比方 onTask 等。。ret = swServer_start_check(serv);
    if (ret < 0)
    {return SW_ERR;}
    // 检测钩子
    if (SwooleG.hooks[SW_GLOBAL_HOOK_BEFORE_SERVER_START])
    {swoole_call_hook(SW_GLOBAL_HOOK_BEFORE_SERVER_START, serv);
    }
    // sw_atomic_cmp_set 此处了解成一个锁 也就是同时工夫只能存在一个服务 
    //cannot start 2 servers at the same time, please use process->exec.
    if (!sw_atomic_cmp_set(&serv->gs->start, 0, 1))
    {swoole_error_log(SW_LOG_ERROR, SW_ERROR_SERVER_ONLY_START_ONE, "must only start one server");
        return SW_ERR;
    }
    // 这块规范输入 大家应该都懂 跳过跳过
    //run as daemon
    if (serv->daemonize > 0)
    {
        /**
         * redirect STDOUT to log file
         */
        if (SwooleG.log_fd > STDOUT_FILENO)
        {swoole_redirect_stdout(SwooleG.log_fd);
        }
        /**
         * redirect STDOUT_FILENO/STDERR_FILENO to /dev/null
         */
        else
        {serv->null_fd = open("/dev/null", O_WRONLY);
            if (serv->null_fd > 0)
            {swoole_redirect_stdout(serv->null_fd);
            }
            else
            {swSysWarn("open(/dev/null) failed");
            }
        }

        if (swoole_daemon(0, 1) < 0)
        {return SW_ERR;}
    }

    //master pid
    // 获取对应的 master 过程和启动工夫
    serv->gs->master_pid = getpid();
    serv->stats->start_time = time(NULL);

    /**
     * init method
     */
     // 持续初始化对于 tcp 相干函数
    serv->send = swServer_tcp_send;
    serv->sendwait = swServer_tcp_sendwait;
    serv->sendfile = swServer_tcp_sendfile;
    serv->close = swServer_tcp_close;
    serv->notify = swServer_tcp_notify;
    serv->feedback = swServer_tcp_feedback;
    // 申请 worker 的对应的内存空间
    serv->workers = (swWorker *) SwooleG.memory_pool->alloc(SwooleG.memory_pool, serv->worker_num * sizeof(swWorker));
    if (serv->workers == NULL)
    {swSysWarn("gmalloc[server->workers] failed");
        return SW_ERR;
    }

    if (swMutex_create(&serv->lock, 0) < 0)
    {return SW_ERR;}

    /**
     * store to swProcessPool object
     */
    
    serv->gs->event_workers.ptr = serv;
    serv->gs->event_workers.workers = serv->workers;
    serv->gs->event_workers.worker_num = serv->worker_num;
    serv->gs->event_workers.use_msgqueue = 0;

    uint32_t i;
    for (i = 0; i < serv->worker_num; i++)
    {serv->gs->event_workers.workers[i].pool = &serv->gs->event_workers;
        serv->gs->event_workers.workers[i].id = i;
        serv->gs->event_workers.workers[i].type = SW_PROCESS_WORKER;
    }

    /*
     * For swoole_server->taskwait, create notify pipe and result shared memory.
     */
    if (serv->task_worker_num > 0 && serv->worker_num > 0)
    {serv->task_result = (swEventData *) sw_shm_calloc(serv->worker_num, sizeof(swEventData));
        if (!serv->task_result)
        {swWarn("malloc[serv->task_result] failed");
            return SW_ERR;
        }
        serv->task_notify = (swPipe *) sw_calloc(serv->worker_num, sizeof(swPipe));
        if (!serv->task_notify)
        {swWarn("malloc[serv->task_notify] failed");
            sw_shm_free(serv->task_result);
            return SW_ERR;
        }
        for (i = 0; i < serv->worker_num; i++)
        {if (swPipeNotify_auto(&serv->task_notify[i], 1, 0))
            {sw_shm_free(serv->task_result);
                sw_free(serv->task_notify);
                return SW_ERR;
            }
        }
    }

    /**
     * user worker process
     */
    if (serv->user_worker_list)
    {
        i = 0;
        for (auto worker : *serv->user_worker_list)
        {
        // 此处能够看看 worker id 的生成机制 有没有课代表解释下 要两个 woker_num
            worker->id = serv->worker_num + serv->task_worker_num + i;
            i++;
        }
    }
    serv->running = 1;
    //factory start
    if (factory->start(factory) < 0)
    {return SW_ERR;}
    // 注册信号机制
    swServer_signal_init(serv);

    //write PID file
    if (serv->pid_file)
    {ret = sw_snprintf(SwooleTG.buffer_stack->str, SwooleTG.buffer_stack->size, "%d", getpid());
        swoole_file_put_contents(serv->pid_file, SwooleTG.buffer_stack->str, ret);
    }
    if (serv->factory_mode == SW_MODE_BASE)
    {ret = swReactorProcess_start(serv);
    }
    else
    {ret = swReactorThread_start(serv);
    }
    //failed to start
    if (ret < 0)
    {return SW_ERR;}
    swServer_destory(serv);
    //remove PID file
    if (serv->pid_file)
    {unlink(serv->pid_file);
    }
    return SW_OK;
}

代码很长,咱们概括为几件事件

  1. 启动检测
  2. 查看钩子 & 执行钩子
  3. 锁判断
  4. 如果 daemon 启动更改规范输入
  5. 初始化函数和初始化根本的信息(pid、工夫、woker 的内存等)
  6. 创立管道、共享内存
  7. 创立信号处理机制
  8. 创立 & 启动 woker、task、reactor
  9. 撒花完结

所以咱们就独自看看8

    if (serv->factory_mode == SW_MODE_BASE)
    {ret = swReactorProcess_start(serv);
    }
    else
    {ret = swReactorThread_start(serv);
    }

同样咱们还是只看SW_MODE_BASE

int swReactorProcess_start(swServer *serv)
{
    // 此处须要次要下 SW_MODE_BASE 下是单线程模式
    serv->single_thread = 1;

    // 监听 tcp
    if (serv->have_stream_sock == 1)
    {for (auto ls : *serv->listen_list)
        {
        // 过滤 udp
            if (swSocket_is_dgram(ls->type))
            {continue;}
            // 复用端口的解决 
#ifdef HAVE_REUSEPORT
            if (serv->enable_reuse_port)
            {if (close(ls->socket->fd) < 0)
                {swSysWarn("close(%d) failed", ls->socket->fd);
                }
                continue;
            }
            else
#endif
            {
                // 监听 socket
                if (swPort_listen(ls) < 0)
                {return SW_ERR;}
            }
        }
    }

    swProcessPool *pool = &serv->gs->event_workers;
    if (swProcessPool_create(pool, serv->worker_num, 0, SW_IPC_UNIXSOCK) < 0)
    {return SW_ERR;}
    swProcessPool_set_max_request(pool, serv->max_request, serv->max_request_grace);

    /**
     * store to swProcessPool object
     */
    serv->gs->event_workers.ptr = serv;
    serv->gs->event_workers.max_wait_time = serv->max_wait_time;
    serv->gs->event_workers.use_msgqueue = 0;
    serv->gs->event_workers.main_loop = swReactorProcess_loop;
    serv->gs->event_workers.onWorkerNotFound = swManager_wait_other_worker;

    uint32_t i;
    for (i = 0; i < serv->worker_num; i++)
    {serv->gs->event_workers.workers[i].pool = &serv->gs->event_workers;
        serv->gs->event_workers.workers[i].id = i;
        serv->gs->event_workers.workers[i].type = SW_PROCESS_WORKER;
    }

    //single worker
    if (swServer_is_single(serv))
    {return swReactorProcess_loop(&serv->gs->event_workers, &serv->gs->event_workers.workers[0]);
    }

    for (i = 0; i < serv->worker_num; i++)
    {if (swServer_worker_create(serv, &serv->gs->event_workers.workers[i]) < 0)
        {return SW_ERR;}
    }

    //task workers
    if (serv->task_worker_num > 0)
    {if (swServer_create_task_workers(serv) < 0)
        {return SW_ERR;}
        swTaskWorker_init(serv);
        if (swProcessPool_start(&serv->gs->task_workers) < 0)
        {return SW_ERR;}
    }

    /**
     * create user worker process
     */
    if (serv->user_worker_list)
    {serv->user_workers = (swWorker *) sw_malloc(serv->user_worker_num * sizeof(swWorker));
        if (serv->user_workers == NULL)
        {swSysWarn("gmalloc[server->user_workers] failed");
            return SW_ERR;
        }
        for (auto worker : *serv->user_worker_list)
        {
            /**
             * store the pipe object
             */
            if (worker->pipe_object)
            {swServer_store_pipe_fd(serv, worker->pipe_object);
            }
            swManager_spawn_user_worker(serv, worker);
        }
    }

    /**
     * manager process is the same as the master process
     */
    SwooleG.pid = serv->gs->manager_pid = getpid();
    SwooleG.process_type = SW_PROCESS_MANAGER;

    /**
     * manager process can not use signalfd
     */
    SwooleG.use_signalfd = 0;

    swProcessPool_start(&serv->gs->event_workers);
    swServer_signal_init(serv);

    if (serv->onStart)
    {swWarn("The onStart event with SWOOLE_BASE is deprecated");
        serv->onStart(serv);
    }

    if (serv->onManagerStart)
    {serv->onManagerStart(serv);
    }

    swProcessPool_wait(&serv->gs->event_workers);
    swProcessPool_shutdown(&serv->gs->event_workers);

    swManager_kill_user_workers(serv);

    if (serv->onManagerStop)
    {serv->onManagerStop(serv);
    }

    return SW_OK;
}

持续总结该函做的事件

  1. 监听 tcp 端口 & 端口复用
  2. 创立 woker 过程
  3. 创立 task 过程
  4. 信号处理
  5. 期待 wokers 过程的完结
  6. wokers 过程技术的后 shuntdown 解决
  7. onManagerStop 的解决
  8. 撒花完结

要害 server 的启动代码完了,必定也是雨里雾里的,别问我,我也是,咱们其实想看 reactorswoole的 server 服务承当什么角色 咱们看这些干嘛

其实只有看到完启动的代码能力晓得一些咱们想要的 如果依据 epoll 的模型外围就在notify,咱们能够回过头再看下总结下

1. 对于 reactor 的线程创立

int swReactor_create(swReactor *reactor, int max_event)
{
    int ret;
    bzero(reactor, sizeof(swReactor));

#ifdef HAVE_EPOLL
    ret = swReactorEpoll_create(reactor, max_event);
#elif defined(HAVE_KQUEUE)
    ret = swReactorKqueue_create(reactor, max_event);
#elif defined(HAVE_POLL)
    ret = swReactorPoll_create(reactor, max_event);
#else
    ret = swReactorSelect_create(reactor);
#endif

    reactor->running = 1;

    reactor->onFinish = reactor_finish;
    reactor->onTimeout = reactor_timeout;
    reactor->is_empty = swReactor_empty;
    reactor->can_exit = SwooleG.reactor_can_exit;

    reactor->write = swReactor_write;
    reactor->close = swReactor_close;

    reactor->defer = defer_task_add;
    reactor->defer_tasks = nullptr;

    reactor->default_write_handler = swReactor_onWrite;

    Socket::init_reactor(reactor);
    System::init_reactor(reactor);
    swClient_init_reactor(reactor);

    if (SwooleG.hooks[SW_GLOBAL_HOOK_ON_REACTOR_CREATE])
    {swoole_call_hook(SW_GLOBAL_HOOK_ON_REACTOR_CREATE, reactor);
    }

    return ret;
}

2. 对于 tcp 的函数

    serv->send = swServer_tcp_send;
    serv->sendwait = swServer_tcp_sendwait;
    serv->sendfile = swServer_tcp_sendfile;
    serv->close = swServer_tcp_close;
    serv->notify = swServer_tcp_notify;
    serv->feedback = swServer_tcp_feedback;

3. 对于swServer_tcp_notify

/**
 * use in master process
 */
static int swServer_tcp_notify(swServer *serv, swConnection *conn, int event)
{swDataHead notify_event = {};
    notify_event.type = event;
    notify_event.reactor_id = conn->reactor_id;
    notify_event.fd = conn->fd;
    notify_event.server_fd = conn->server_fd;
    return serv->factory.notify(&serv->factory, &notify_event);
}

4. 对于swFactory_create

int swFactory_create(swFactory *factory)
{
    factory->dispatch = swFactory_dispatch;
    factory->finish = swFactory_finish;
    factory->start = swFactory_start;
    factory->shutdown = swFactory_shutdown;
    factory->end = swFactory_end;
    factory->notify = swFactory_notify;
    factory->free = swFactory_free;
    return SW_OK;
}

5. 对于 notify

/**
 * only stream fd
 */
static int swFactory_notify(swFactory *factory, swDataHead *info)
{swServer *serv = (swServer *) factory->ptr;
    swConnection *conn = swServer_connection_get(serv, info->fd);
    if (conn == NULL || conn->active == 0)
    {swWarn("dispatch[type=%d] failed, connection#%d is not active", info->type, info->fd);
        return SW_ERR;
    }
    //server active close, discard data.
    if (conn->closed)
    {swWarn("dispatch[type=%d] failed, connection#%d is closed by server", info->type, info->fd);
        return SW_OK;
    }
    //converted fd to session_id
    info->fd = conn->session_id;
    info->server_fd = conn->server_fd;
    info->flags = SW_EVENT_DATA_NORMAL;

    return swWorker_onTask(factory, (swEventData *) info);
}

6. 对于onReceiveonTaskonFinish

能够开始记笔记了

Reactor

1.reactor 是线程状态,能够单线程也能够多线程 取决 woker 数量
2. 负责保护客户端TCP 连贯、解决网络IO、解决协定、收发数据
3. 不执行任何 PHP 代码

Worker

1.worker 是过程状态,能够单过程也能够多过程
2. 承受由Reactor 线程投递的申请数据包,并执行 PHP 回调函数解决数据
3. 生成响应数据并发给Reactor 线程,由 Reactor 线程发送给 TCP 客户端

总结就是再 swoolereactor就是 nginx, worker 就是 php-fpm,不同的有了协程后 woker 能够是 异步 的,那么实践上承载的并发是无下限的。(有人开喷了:你当文件句柄关上限度是吃素的吗!)

写给最初

swoole的源码的剖析的文章可能要先缓缓,因为越看发现越吃力,还是因为本人底子不够好。

BUT 立的 flag 还是要实现。接下来可能先从 redis 动手,因为书籍比拟全面(copy 更加的顺畅丝滑)。
看完上述给个大家一个小小的问题
写过 swoole 的小伙伴都晓得 为什么 mysqlredis 连贯须要在 onStart 的时候进行解决?

正文完
 0