前言
分析之前请大家务必了解消息队列的实现
如果不了解请先阅读下:
有赞消息队列设计
去哪儿网消息队列设计
tp5 的消息队列是基于 database redis 和 tp 官方自己实现的 Topthink
本章是围绕 redis 来做分析
存储 key:
key | 类型 | 描述 |
---|---|---|
queues:queueName |
list | 要执行的任务 |
think:queue:restart |
string | 重启队列时间戳 |
queues:queueName:delayed |
zSet | 延迟任务 |
queues:queueName:reserved |
zSet | 执行失败, 等待重新执行 |
执行命令
work 和 listen 的区别在下面会解释
| 命令 | 描述 |
php think queue:work |
监听队列 |
php think queue:listen |
监听队列 |
php think queue:restart |
重启队列 |
php think queue:subscribe |
暂无, 可能是保留的 官方有什么其他想法但是还没实现 |
行为标签
标签 | 描述 |
---|---|
worker_daemon_start |
守护进程开启 |
worker_memory_exceeded |
内存超出 |
worker_queue_restart |
重启守护进程 |
worker_before_process |
任务开始执行之前 |
worker_before_sleep |
任务延迟执行 |
queue_failed |
任务执行失败 |
命令参数
参数 | 默认值 | 可以使用的模式 | 描述 |
---|---|---|---|
queue |
null | work,listen | 要执行的任务名称 |
daemon |
null | work | 以守护进程执行任务 |
delay |
0 | work,listen | 失败后重新执行的时间 |
force |
null | work | 失败后重新执行的时间 |
memory |
128M | work,listen | 限制最大内存 |
sleep |
3 | work,listen | 没有任务的时候等待的时间 |
tries |
0 | work,listen | 任务失败后最大尝试次数 |
模式区别
1: 执行原理不同
work: 单进程的处理模式;
无 daemon 参数 work 进程在处理完下一个消息后直接结束当前进程。当不存在新消息时,会 sleep 一段时间然后退出;
有 daemon 参数 work 进程会循环地处理队列中的消息,直到内存超出参数配置才结束进程。当不存在新消息时,会在每次循环中 sleep 一段时间;
listen: 父进程 + 子进程 的处理模式;
会在所在的父进程会创建一个单次执行模式的 work 子进程, 并通过该 work 子进程来处理队列中的下一个消息, 当这个 work 子进程退出之后;
所在的父进程会监听到该子进程的退出信号, 并重新创建一个新的单次执行的 work 子进程;
2: 退出时机不同
work: 看上面
listen: 所在的父进程正常情况会一直运行, 除非遇到下面两种情况
01: 创建的某个 work 子进程的执行时间超过了 listen 命令行中的 –timeout 参数配置; 此时 work 子进程会被强制结束,listen 所在的父进程也会抛出一个 ProcessTimeoutException 异常并退出;
开发者可以选择捕获该异常, 让父进程继续执行;
02: 所在的父进程因某种原因存在内存泄露,则当父进程本身占用的内存超过了命令行中的 –memory 参数配置时, 父子进程均会退出。正常情况下,listen 进程本身占用的内存是稳定不变的。
3: 性能不同
work: 是在脚本内部做循环,框架脚本在命令执行的初期就已加载完毕;
listen: 是处理完一个任务之后新开一个 work 进程, 此时会重新加载框架脚本;
因此 work 模式的性能会比 listen 模式高。
注意: 当代码有更新时,work 模式下需要手动去执行 php think queue:restart 命令重启队列来使改动生效;而 listen 模式会自动生效, 无需其他操作。
4: 超时控制能力
work: 本质上既不能控制进程自身的运行时间, 也无法限制执行中的任务的执行时间;
listen: 可以限制其创建的 work 子进程的超时时间;
可通过 timeout 参数限制 work 子进程允许运行的最长时间, 超过该时间限制仍未结束的子进程会被强制结束;
expire 和 time 的区别
expire 在配置文件中设置, 指任务的过期时间 这个时间是全局的,影响到所有的 work 进程
timeout 在命令行参数中设置, 指 work 子进程的超时时间, 这个时间只对当前执行的 listen 命令有效,timeout 针对的对象是 work 子进程;
5: 使用场景不同
work 适用场景是:
01: 任务数量较多
02: 性能要求较高
03: 任务的执行时间较短
04: 消费者类中不存在死循环,sleep(),exit() ,die() 等容易导致 bug 的逻辑
listen 适用场景是:
01: 任务数量较少
02: 任务的执行时间较长
03: 任务的执行时间需要有严格限制
公有操作
由于我们是根据 redis 来做分析 所以只需要分析 src/queue/connector/redis.php
01: 首先调用src/Queue.php
中的魔术方法__callStatic
02: 在__callStatic 方法中调用了buildConnector
03: buildConnector 中首先加载配置文件 如果无将是同步执行
04: 根据配置文件去创建连接并且传入配置
在 redis.php 类的构造方法中的操作:
01: 检测 redis 扩展是否安装
02: 合并配置
03: 检测是 redis 扩展还是 pRedis
04: 创建连接对象
发布过程
发布参数
参数名 | 默认值 | 描述 | 可以使用的方法 |
---|---|---|---|
$job | 无 | 要执行任务的类 | push,later |
$data | 空 | 任务数据 | push,later |
$queue | default | 任务名称 | push,later |
$delay | null | 延迟时间 | later |
立即执行
push($job, $data, $queue)
Queue::push(Test::class, ['id' => 1], 'test');
一顿骚操作后返回一个数组 并且序列化后 rPush 到 redis 中 key 为 queue:queueName
数组结构:
[
'job' => $job, // 要执行任务的类
'data' => $data, // 任务数据
'id'=>'xxxxx' // 任务 id
]
写入 redis 并且返回队列 id
至于中间的那顿骚操作太长了就没写
延迟发布
later($delay, $job, $data, $queue)
Queue::later(100, Test::class, ['id' => 1], 'test');
跟上面的差不多
一顿骚操作后返回一个数组 并且序列化后 zAdd 到 redis 中 key 为 queue:queueName:delayed
score 为当前的时间戳 +$delay
执行过程
执行过程有 work 模式和 listen 模式 两种 区别上面已经说了 代码逻辑由于太多等下回分解;
最后讲一下标签的使用
// 守护进程开启
'worker_daemon_start' => [\app\index\behavior\WorkerDaemonStart::class],
// 内存超出
'worker_memory_exceeded' => [\app\index\behavior\WorkerMemoryExceeded::class],
// 重启守护进程
'worker_queue_restart' => [\app\index\behavior\WorkerQueueRestart::class],
// 任务开始执行之前
'worker_before_process' => [\app\index\behavior\WorkerBeforeProcess::class],
// 任务延迟执行
'worker_before_sleep' => [\app\index\behavior\WorkerBeforeSleep::class],
// 任务执行失败
'queue_failed' => [\app\index\behavior\QueueFailed::class]
public function run(Output $output)
{$output->write('<info> 任务执行失败 </info>', true);
}
控制台执行 php think queue:work --queue test --daemon
会在控制台一次输出
守护进程开启
任务延迟执行
失败的处理 如果有任务执行失败或者执行次数达到最大值
会触发 queue_failed
在 app\index\behavior@run
方法里面写失败的逻辑 比如邮件通知 写入日志等
最后我们来说一下如何在其他框架或者项目中给 tp 的项目推送消息队列, 例如两个项目是分开的 另一个使用的却不是 tp5 的框架
<?php
class Index
{
private $redis = null;
public function __construct()
{$this->redis = new Redis();
$this->redis->connect('127.0.0.1', 6379);
$this->redis->select(10);
}
public function push($job, $data, $queue)
{$payload = $this->createPayload($job, $data);
$this->redis->rPush('queues:' . $queue, $payload);
}
public function later($delay, $job, $data, $queue)
{$payload = $this->createPayload($job, $data);
$this->redis->zAdd('queues:' . $queue . ':delayed', time() + $delay, $payload);
}
private function createPayload($job, $data)
{$payload = $this->setMeta(json_encode(['job' => $job, 'data' => $data]), 'id', $this->random(32));
return $this->setMeta($payload, 'attempts', 1);
}
private function setMeta($payload, $key, $value)
{$payload = json_decode($payload, true);
$payload[$key] = $value;
$payload = json_encode($payload);
if (JSON_ERROR_NONE !== json_last_error()) {throw new InvalidArgumentException('Unable to create payload:' . json_last_error_msg());
}
return $payload;
}
private function random(int $length = 16): string
{
$str = '0123456789abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ';
$randomString = '';
for ($i = 0; $i < $length; $i++) {$randomString .= $str[rand(0, strlen($str) - 1)];
}
return $randomString;
}
}
(new Index())->later(10, 'app\index\jobs\Test', ['id' => 1], 'test');