共计 3907 个字符,预计需要花费 10 分钟才能阅读完成。
概述
这是关于 Swoole 学习的第二篇文章:Swoole Task 的应用。
- 第一篇:Swoole Timer 的应用
Swoole 异步 Task,主要实现调用异步任务的执行。
常用的场景:异步支付处理、异步订单处理、异步日志处理、异步发送邮件 / 短信等。
Swoole 的实现方式是 worker 进程处理数据请求,分配给 task 进程执行。
官方介绍:
task 底层使用 Unix Socket 管道通信,是全内存的,没有 IO 消耗。单进程读写性能可达 100 万 /s,不同的进程使用不同的管道通信,可以最大化利用多核。
本地版本:PHP 7.2.6、Swoole 4.3.1。
不多说,先看效果图:
代码
server.php
<?php
class Server
{
private $serv;
public function __construct() {$this->serv = new swoole_server('0.0.0.0', 9501);
$this->serv->set([
'worker_num' => 2, // 开启 2 个 worker 进程
'max_request' => 4, // 每个 worker 进程 max_request 设置为 4 次
'task_worker_num' => 4, // 开启 4 个 task 进程
'dispatch_mode' => 2, // 数据包分发策略 - 固定模式
]);
$this->serv->on('Start', [$this, 'onStart']);
$this->serv->on('Connect', [$this, 'onConnect']);
$this->serv->on("Receive", [$this, 'onReceive']);
$this->serv->on("Close", [$this, 'onClose']);
$this->serv->on("Task", [$this, 'onTask']);
$this->serv->on("Finish", [$this, 'onFinish']);
$this->serv->start();}
public function onStart($serv) {
echo "#### onStart ####".PHP_EOL;
echo "SWOOLE".SWOOLE_VERSION . "服务已启动".PHP_EOL;
echo "master_pid: {$serv->master_pid}".PHP_EOL;
echo "manager_pid: {$serv->manager_pid}".PHP_EOL;
echo "########".PHP_EOL.PHP_EOL;
}
public function onConnect($serv, $fd) {
echo "#### onConnect ####".PHP_EOL;
echo "客户端:".$fd."已连接".PHP_EOL;
echo "########".PHP_EOL.PHP_EOL;
}
public function onReceive($serv, $fd, $from_id, $data) {
echo "#### onReceive ####".PHP_EOL;
echo "worker_pid: {$serv->worker_pid}".PHP_EOL;
echo "客户端:{$fd} 发来的 Email:{$data}".PHP_EOL;
$param = [
'fd' => $fd,
'email' => $data
];
$rs = $serv->task(json_encode($param));
if ($rs === false) {echo "任务分配失败 Task".$rs.PHP_EOL;} else {echo "任务分配成功 Task".$rs.PHP_EOL;}
echo "########".PHP_EOL.PHP_EOL;
}
public function onTask($serv, $task_id, $from_id, $data) {
echo "#### onTask ####".PHP_EOL;
echo "#{$serv->worker_id} onTask: [PID={$serv->worker_pid}]: task_id={$task_id}".PHP_EOL;
// 业务代码
for($i = 1 ; $i <= 5 ; $i ++) {sleep(2);
echo "Task {$task_id} 已完成了 {$i}/5 的任务".PHP_EOL;
}
$data_arr = json_decode($data, true);
$serv->send($data_arr['fd'] , 'Email:'.$data_arr['email'].', 发送成功');
$serv->finish($data);
echo "########".PHP_EOL.PHP_EOL;
}
public function onFinish($serv,$task_id, $data) {
echo "#### onFinish ####".PHP_EOL;
echo "Task {$task_id} 已完成".PHP_EOL;
echo "########".PHP_EOL.PHP_EOL;
}
public function onClose($serv, $fd) {echo "Client Close.".PHP_EOL;}
}
$server = new Server();
client.php
<?php
class Client
{
private $client;
public function __construct() {$this->client = new swoole_client(SWOOLE_SOCK_TCP, SWOOLE_SOCK_ASYNC);
$this->client->on('Connect', [$this, 'onConnect']);
$this->client->on('Receive', [$this, 'onReceive']);
$this->client->on('Close', [$this, 'onClose']);
$this->client->on('Error', [$this, 'onError']);
}
public function connect() {if(!$fp = $this->client->connect("127.0.0.1", 9501 , 1)) {echo "Error: {$fp->errMsg}[{$fp->errCode}]".PHP_EOL;
return;
}
}
public function onConnect($cli) {fwrite(STDOUT, "输入 Email:");
swoole_event_add(STDIN, function() {fwrite(STDOUT, "输入 Email:");
$msg = trim(fgets(STDIN));
$this->send($msg);
});
}
public function onReceive($cli, $data) {echo PHP_EOL."Received:".$data.PHP_EOL;}
public function send($data) {$this->client->send($data);
}
public function onClose($cli) {echo "Client close connection".PHP_EOL;}
public function onError() {}
}
$client = new Client();
$client->connect();
小结
一、上面的配置总共开启了几个进程?
总共 8 个进程(1 个 master 进程、1 个 manager 进程、4 个 task 进程、2 个 worker 进程)
重新运行的可能与上图进程号不一致:
master 进程:22481
manager 进程:22485
task 进程:22488、22489、22490、22491
worker 进程:22492、22493
参考官方提供的进程图:
二、为什么执行了 5 次后,worker 进程号发生了改变?
因为我们设了置 worker 进程的 max_request=4,一个 worker 进程在完成最大请求次数任务后将自动退出,进程退出会释放所有的内存和资源,这样的机制主要是解决 PHP 进程内存溢出的问题。
三、当 task 执行任务异常,我们 kill 一个 task 进程,会再新增一个吗?
会。
四、如何设置 task_worker_num?
最大值不得超过 SWOOLE_CPU_NUM * 1000。
查看本机 CPU 核数:
echo "swoole_cpu_num:".swoole_cpu_num().PHP_EOL;
根据项目的任务量决定的,比如:1 秒会产生 200 个任务,执行每个任务需要 500ms。
想在 1s 中执行完成 200 个任务,需要 100 个 task 进程。
100 = 200/(1/0.5)
五、如何设置 worker_num?
默认设置为本机的 CPU 核数,最大不得超过 SWOOLE_CPU_NUM * 1000。
比如:1 个请求耗时 10ms,要提供 1000QPS 的处理能力,那就必须配置 10 个进程。
10 = 0.01*1000
假设每个进程占用 40M 内存,10 个进程就需要占用 400M 的内存。
扩展
- Server->taskwait
- Server->taskWaitMulti
- Server->taskCo
参考文档
- https://wiki.swoole.com/wiki/…
推荐阅读
- 系统的讲解 – SSO 单点登录
- 系统的讲解 – PHP WEB 安全防御
- 系统的讲解 – PHP 缓存技术
- 系统的讲解 – PHP 接口签名验证
- 系统的讲解 – PHP 浮点数高精度运算
本文欢迎转发,转发请注明作者和出处,谢谢!