共计 1740 个字符,预计需要花费 5 分钟才能阅读完成。
前言
技术点:restful 微信开发,支付,订阅发布,rpc。
不知道自己会在什么时候中断,先继续。
1. 订阅推送
在高并发探测系列已经实战过消息队列。与发布订阅原理相同,实时功能需要 cli 在线服务,否则就要在页面做定时刷新。已知有 redis 发布订阅、rabbitmq 发布订阅、swoole 发布订阅,先看下 swoole 的订阅。
a.swoole 发布订阅
参考官方《订阅模式》,swoole 的订阅是基于 redis 的:
发布者 pub.php:
go(function (){$redis = new Swoole\Coroutine\Redis(); | |
$redis->connect('172.1.13.11', 6379); | |
$redis->auth('123456'); | |
$channels = ['channel1', 'channel2', 'channel3']; | |
swoole_timer_tick(1000, function () use ($redis, $channels) {$channel = $channels[array_rand($channels)]; | |
$news = ['time'=>date("Y-m-d H:i:s"), | |
'document'=> "这是".date("Y-m-d", strtotime('+ 1 days'))."的新闻内容" | |
]; | |
$redis->publish($channel, json_encode($news, JSON_UNESCAPED_UNICODE)); | |
print_r("-- 发布成功:".$channel .PHP_EOL); | |
}); | |
}); |
订阅者 sub.php
go(function (){$redis = new Swoole\Coroutine\Redis(); | |
$redis->connect('172.1.13.11', 6379); | |
$redis->auth('123456'); | |
function counter (){ | |
$c = 1; // 准备静态化 | |
return function () use (&$c) {return $c++;}; | |
} | |
$counter = counter(); // 实例计数器 | |
if ($redis->subscribe(['channel1', 'channel2', 'channel3'])) { // 或者使用 psubscribe | |
while ($msg = $redis->recv()) { | |
// msg 是一个数组, 包含以下信息 | |
// $type # 返回值的类型:显示订阅成功 | |
// $name # 订阅的频道名字 或 来源频道名字 | |
// $info # 目前已订阅的频道数量 或 信息内容 | |
list($type, $name, $info) = $msg; | |
if ($type == 'subscribe'){ // 或 psubscribe | |
// 频道订阅成功消息,订阅几个频道就有几条:首次连接 | |
var_dump("频道订阅成功消息", $msg); | |
} else if ($type == 'unsubscribe' && $info == 0) { // 或 punsubscribe | |
break; // 收到取消订阅消息,并且剩余订阅的频道数为 0,不再接收,结束循环 | |
} else if ($type == 'message') { // 若为 psubscribe,此处为 pmessage | |
$rev_times = $counter(); | |
// 标记退订 | |
// 打印来源频道名字、消息 | |
print_r(['rev_times'=>$rev_times, 'name'=>$name, 'info'=>$info] ); | |
// 处理消息 | |
// balabalaba.... | |
if ($rev_times%10==0){ // 测试退订,每 10 个退订一个 channel | |
$status = $redis->unsubscribe($msg); // 继续 recv 等待退订完成 | |
var_dump("准备退订 $name", $status); | |
} | |
} | |
} | |
} | |
}); |
cli 界面运行。
在 swoole 里的协程操作,使用 CoroutineHttpClient、CoroutineHttpServer、CoroutineHttp2Client、CoroutineRedis、CoroutineSocket、CoroutineMySQL、CoroutinePostgreSQL,二次封装的类操作。
正文完