前言
截止撰稿之时,ThinkPHP6.0 已经进入 RC3 阶段。按 ThinkPHP 作者流年的计划,ThinkPHP6.0 预计将会在今年秋季择机发布正式版,RC3 将可能是正式版之前的最后一个或者倒数第二个 RC 版本,这也就意味着,ThinkPHP6.0 已经日趋完善稳定,是一个值得尝试的候选版本了。
故此,随之有着重大变化的 think-queue 扩展也已经升级发布了接近正式版的 3.0.2。那么已经用 think-queue 上了生产环境的小伙伴们肯定很想知道,think-queue
3.0 是否已经可以尝鲜了?本期小编将带大家用 think-queue 3.0.2 的定时队列来打造一个定时扣费系统来告诉你答案。
PS:本人文中如有错误或者不失之处,还请海涵,欢迎各位大牛随时批评指导。
准备工作
使用 think-queue 队列,必须具备以下条件:
1:一个基于 liunx 系统的 server,windows 亦可但不推荐;
2:redis 服务端。建议 5.0 版本,
参考文章:https://www.jianshu.com/p/fe6…;
新手推荐使用宝塔面板,https://www.bt.cn/download/li… , 以省去精力和减少配置编译造成的服务端各种常见问题。
3:composer 包管理器
可参考:https://www.phpcomposer.com;
推荐镜像源:https://mirrors.aliyun.com/co…,在此感谢阿里云的贡献。
4:一个 redis 客户端,Windows 开发者推荐在以下项目里选择使用
https://github.com/uglide/Red…;
https://github.com/qishibo/An…;
https://github.com/cinience/R…;
以及 windows 自带的 Windows PowerShell。
对于 think-queue 不是很熟悉和了解的,请务必先阅读下面这边教程后再回来继续阅读:
https://github.com/coolseven/…
安装 thinkhp 和 think-queue
1: 首选创建 thinkphp6.0 新的项目,参考 https://www.kancloud.cn/manua…
composer create-project topthink/think=6.0.x-dev tp
2: 使用 think-queue 3.0.2
https://packagist.org/package…
composer require topthink/think-queue
也可以项目根目录下 composer.json 文件添加配置项
"require": {
"php": ">=7.1.0",
"topthink/framework": "6.0.*-dev",
"topthink/think-view": "^1.0",
"symfony/var-dumper":"^4.2",
"topthink/think-queue": "^3.0"
},
3: 检查是否安装成功
在项目根目录下运行
php think
看到
就表示 think-queue 已经安装成功。
接下来就要进行下一步:创建项目的数据库,结构我已经准备好了
用户会员表
DROP TABLE IF EXISTS `user`;
CREATE TABLE `user` (`id` int(10) UNSIGNED NOT NULL AUTO_INCREMENT,
`username` varchar(32) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NOT NULL DEFAULT ''COMMENT' 用户名 ',
`nickname` varchar(50) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NULL DEFAULT ''COMMENT' 昵称 ',
`realname` varchar(30) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NULL DEFAULT ''COMMENT' 姓名 ',
`password` varchar(32) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NOT NULL DEFAULT ''COMMENT' 密码 ',
`create_time` int(10) UNSIGNED NOT NULL DEFAULT 0 COMMENT '注册时间',
`update_time` int(10) UNSIGNED NULL DEFAULT 0 COMMENT '更新时间',
`login_time` int(10) UNSIGNED NULL DEFAULT 0 COMMENT '登陆时间',
`login_count` int(10) UNSIGNED NULL DEFAULT 0 COMMENT '登陆次数',
`login_ip` varchar(20) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NULL DEFAULT ''COMMENT' 登录 ip',
`vip` tinyint(2) UNSIGNED NULL DEFAULT 0 COMMENT 'vip 等级',
`vip_join` int(10) UNSIGNED NULL DEFAULT 0 COMMENT 'vip 加入时间',
`vip_time` int(10) UNSIGNED NULL DEFAULT 0 COMMENT 'vip 过期时间',
`ip` varchar(20) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NULL DEFAULT ''COMMENT' 注册 ip',
`status` tinyint(1) UNSIGNED NOT NULL DEFAULT 1 COMMENT '状态 1: 正常 2: 禁用 3: 临时',
`lock_uid` int(10) UNSIGNED NULL DEFAULT 0 COMMENT '封禁人',
`lock_time` int(10) UNSIGNED NULL DEFAULT 0 COMMENT '封禁时间',
`lock_tips` varchar(50) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NULL DEFAULT ''COMMENT' 封禁原因 ',
`back_time` int(10) UNSIGNED NULL DEFAULT 0 COMMENT '解封时间',
`group` tinyint(1) UNSIGNED NOT NULL DEFAULT 1 COMMENT '身份 1: 普通 2: 管理员 3: 代理 4: 合作方 5: 渠道商',
`group_time` int(10) UNSIGNED NULL DEFAULT 0 COMMENT '身份过期',
`safe_level` tinyint(1) UNSIGNED NOT NULL DEFAULT 1 COMMENT '安全等级',
`safe_code` char(8) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NULL DEFAULT ''COMMENT' 密保安全码 ',
`safe_token` tinyint(1) UNSIGNED NULL DEFAULT 0 COMMENT '密保令牌 1: 未 2: 是',
`safe_device` tinyint(1) UNSIGNED NOT NULL DEFAULT 1 COMMENT '密保设备 1: 未 2: 是',
`safe_phone` bigint(11) UNSIGNED NULL DEFAULT 0 COMMENT '密保手机',
`safe_email` varchar(50) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NULL DEFAULT ''COMMENT' 密保邮箱 ',
`verify_code` char(8) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NULL DEFAULT ''COMMENT' 找回校验码 ',
`verify_lock` int(10) UNSIGNED NULL DEFAULT 0 COMMENT '找回锁定',
`verify_time` int(10) UNSIGNED NULL DEFAULT 0 COMMENT '校验码过期',
`money` decimal(10, 3) UNSIGNED NOT NULL DEFAULT 0.000 COMMENT '余额',
`give` decimal(10, 3) UNSIGNED NOT NULL DEFAULT 0.000 COMMENT '增送',
`brokerage` decimal(10, 3) UNSIGNED NOT NULL DEFAULT 0.000 COMMENT '佣金',
`server` int(10) UNSIGNED NOT NULL DEFAULT 10 COMMENT '服务器',
`gold` tinyint(8) UNSIGNED NOT NULL DEFAULT 0 COMMENT '金币',
`credits` tinyint(8) UNSIGNED NOT NULL DEFAULT 0 COMMENT '积分',
`union_id` varchar(64) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NULL DEFAULT ''COMMENT' 微信 ',
`unionid` varchar(64) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NULL DEFAULT ''COMMENT'QQ',
`inviters` tinyint(5) UNSIGNED NOT NULL DEFAULT 0 COMMENT '邀请次数',
`inviter_id` int(10) UNSIGNED NULL DEFAULT 0 COMMENT '邀请人',
PRIMARY KEY (`id`) USING BTREE,
INDEX `id`(`id`, `username`, `realname`, `vip`, `group`, `money`, `create_time`) USING BTREE COMMENT '联合索引'
) ENGINE = InnoDB AUTO_INCREMENT = 1 CHARACTER SET = utf8 COLLATE = utf8_general_ci COMMENT = '用户表' ROW_FORMAT = Dynamic;
日志表
DROP TABLE IF EXISTS `logs`;
CREATE TABLE `logs` (`id` int(10) UNSIGNED NOT NULL AUTO_INCREMENT,
`uid` int(10) UNSIGNED NOT NULL DEFAULT 0 COMMENT '用户 id',
`subid` int(10) UNSIGNED NULL DEFAULT 0 COMMENT '主机 id',
`op_id` int(10) UNSIGNED NULL DEFAULT 0 COMMENT '操作人 id',
`type` tinyint(2) UNSIGNED NOT NULL DEFAULT 1 COMMENT '类型 1: 会员 2: 管理 3: 系统 5: 财务',
`time` int(10) UNSIGNED NOT NULL DEFAULT 0 COMMENT '时间',
`ip` varchar(20) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NOT NULL DEFAULT ''COMMENT' 登录 ip',
`content` varchar(50) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NOT NULL DEFAULT '0' COMMENT '日志内容',
PRIMARY KEY (`id`) USING BTREE,
INDEX `idx_uid_type_time`(`uid`, `type`, `time`, `op_id`, `subid`) USING BTREE COMMENT '联合索引'
) ENGINE = InnoDB AUTO_INCREMENT = 1 CHARACTER SET = utf8 COLLATE = utf8_general_ci COMMENT = '日志表' ROW_FORMAT = Dynamic;
用户主机表
DROP TABLE IF EXISTS `server`;
CREATE TABLE `server` (`id` int(10) UNSIGNED NOT NULL AUTO_INCREMENT,
`uid` int(10) UNSIGNED NOT NULL COMMENT '所属用户',
`status` int(2) UNSIGNED NOT NULL DEFAULT 2 COMMENT '状态 1: 已停止 2: 运行中 3: 已过期 4: 需续费 5: 已删除 6: 异常',
`time` int(10) UNSIGNED NOT NULL COMMENT '创建时间',
`op` int(4) UNSIGNED NULL DEFAULT 0 COMMENT '操作人',
`op_time` int(10) UNSIGNED NULL DEFAULT 0 COMMENT '操作时间',
`subid` bigint(11) UNSIGNED NULL DEFAULT 0 COMMENT '实例 id',
`ip_address` varchar(64) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NULL DEFAULT ''COMMENT'IPv4',
`password` varchar(50) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT ''COMMENT'root 密码 ',
`snapshotid` varchar(20) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT ''COMMENT' 快照 id',
`port` int(10) UNSIGNED NULL DEFAULT 22 COMMENT '端口',
`ips` varchar(32) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NULL DEFAULT ''COMMENT' 高防 IP',
`enable_ipv6` varchar(64) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NULL DEFAULT ''COMMENT'IPv6',
`ipv6` varchar(200) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NULL DEFAULT ''COMMENT'IPv6 地址 ',
`dcid` int(5) UNSIGNED NULL DEFAULT 0 COMMENT '位置',
`osid` varchar(32) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NULL DEFAULT ''COMMENT' 操作系统 ',
`arch` varchar(32) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NULL DEFAULT ''COMMENT' 系统类型 ',
`vpsplanid` double(32, 0) UNSIGNED NULL DEFAULT 0 COMMENT '配置规格',
`hostname` varchar(32) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NULL DEFAULT ''COMMENT' 自定义名称 ',
`ddos` varchar(32) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NULL DEFAULT ''COMMENT'DDOS',
`appid` int(6) UNSIGNED NULL DEFAULT 0 COMMENT '预装应用',
`destroy` int(10) UNSIGNED NULL DEFAULT 0 COMMENT '删除时间',
`month` int(3) UNSIGNED NULL DEFAULT 0 COMMENT '购买时长',
`deduction` int(10) UNSIGNED NULL DEFAULT 0 COMMENT '扣费次数',
`money` decimal(10, 3) UNSIGNED NULL DEFAULT 0.000 COMMENT '费用',
`deduction_time` int(10) UNSIGNED NULL DEFAULT 0 COMMENT '扣费时间',
PRIMARY KEY (`id`) USING BTREE
) ENGINE = InnoDB AUTO_INCREMENT = 1 CHARACTER SET = utf8 COLLATE = utf8_general_ci COMMENT = '主机表' ROW_FORMAT = Compact;
系统会员注册登录代码部分略过,请自己补充,也可以在文章末尾下载本教程对应实例代码。
开始使用队列
本教程 think-queue 队列使用的是 redis 驱动方式,请务必先配置好 redis,配置文件在 config 文件夹下面的 queue.php,别弄错了
return [
'default' => 'redis',
'connections' => [
'sync' => ['driver' => 'sync',],
'database' => [
'driver' => 'database',
'queue' => 'default',
'table' => 'jobs',
],
'redis' => [
'driver' => 'redis',
'queue' => 'default',
'host' => '127.0.0.1',
'port' => 6379,
'password' => 'Xun166123',
'select' => 1,
'timeout' => 0,
'persistent' => false,
],
],
'failed' => [
'type' => 'none',
'table' => 'failed_jobs',
],
];
接下来添加队列
/*
* 队列任务
* @author zakeear <zakeear@86dede.com>
* @version v0.1.5
* @time 2019-06-10
*/
namespace app\queue\controller;
use think\Exception;
use think\facade\Db;
use think\facade\Queue;
class Host{
/**
* 添加队列
* @access public
* @param int $subid 主机 id
* @param string $type 任务名
* @param int $times 延时秒数
* @throws \think\Exception
*/
public function addTask(int $subid=0,string $type='server',int $times=0){$server=Db::name('server')->where(['subid'=>$subid])->find();
if(!$server){exit;}
switch($type){
case 'server':
$jobHandlerClassName='app\queue\job\Money@fire';
$jobDataArr=['submit'=>time(),'doit'=>time()+$times,'subid'=>$server['subid'],'hostname'=>$server['hostname']];
$jobQueueName="Money";
break;
case 'destroy':
$jobHandlerClassName='app\queue\job\Destroy@fire';
$jobDataArr=['submit'=>time(),'doit'=>time()+$times,'subid'=>$server['subid'],'hostname'=>$server['hostname']];
$jobQueueName="Destroy";
break;
default:
break;
}
if($times==0){$isPushed=Queue::push($jobHandlerClassName,$jobDataArr,$jobQueueName);
}else{$isPushed=Queue::later($times,$jobHandlerClassName,$jobDataArr,$jobQueueName);
}
}
}
然后是消费者类
/*
* 主机扣费类
* @author zakeear <zakeear@86dede.com>
* @version v0.2.0
* @time 2019-06-13
*/
namespace app\queue\job;
use think\queue\Job;
use think\facade\Db;
class Money{public function fire(Job $job,$data){
//job
$isJobDone=$this->doJob($job,$data);
$attempts=$job->attempts()+1;
if($isJobDone){print('<info>['.date('Y-m-d H:i:s',time())."] 主机".$data['hostname']."扣费任务完成,任务销毁 </info>\n");
$job->delete();}else{$release=strtotime(date('Y-m-d H:',time()).'00')+3599+date('i',$data['submit'])*60+date('s',$data['submit'])-time();
print('<info>['.date('Y-m-d H:i:s',time())."]".$release."秒后执行主机".$data['hostname']."第".$attempts."次扣费任务 </info>\n");
$job->release($release);
}
}
private function doJob($job,$data){
//job
$attempts=$job->attempts();
print('<info>['.date('Y-m-d H:i:s',time())."] 主机".$data['hostname']."第".$attempts."次扣费 </info>\n");
// 主机
$server=Db::name('server')->field('id,uid,subid,month,money,hostname,deduction')->where(['subid'=>$data['subid'],'status'=>2])->find();
if(!$server){print('<info>['.date('Y-m-d H:i:s')."] 主机".$data['hostname']."已经不存在或者被删除!"."\n");
return true;
}
// 日志
$logs=new \app\common\logic\Logs();
// 配置
$this->config = Db::name('config')->field('rate,month,is_buy,vultr_api,vultr_keys,web_name,web_icon,time')->where(['status'=>1])->order('time','desc')->find();
if($server['deduction']==$this->config['month']){
// 扣费
$logs->database($server['uid'],5,'',' 主机【'.$server['hostname'].'】达到月付限额,本月不再扣费 ',1,$server['subid']);
// 计数
Db::name('server')->where(['subid'=>$data['subid']])->update(['deduction'=>0,'deduction_time'=>0]);
//job
$attempts=$attempts-1;
print('<info>['.date('Y-m-d H:i:s')."] 主机".$data['hostname']."已经达到月付上限".$attempts."次 \n");
// 队列
$job=new \app\queue\controller\Host();
// 删除
$job->addTask($data['subid'],'destroy',0);
// 创建
$release=\app\Timer::nextMonth()[0]+date('i',$data['submit'])*60+date('s',$data['submit'])-time();// 下月重新计费
$job->addTask($data['subid'],'server',$release);
// 返回
return true;
}
// 用户
$user=Db::name('user')->field('id,money')->where(['id'=>$server['uid']])->find();
if($user['money']<$server['money']){
// 日志
$logs->database($server['uid'],5,'',' 主机【'.$server['hostname'].'】不足于支付:【'.$server['money'].'】',1,$server['subid']);
// 删除
Db::name('server')->where(['subid'=>$data['subid']])->update(['status'=>5,'destroy'=>time()]);
// 日志
$logs->database($server['uid'],5,'',' 主机【'.$server['hostname'].'】删除 ',1,$server['subid']);
// 队列
$job=new \app\queue\controller\Host();
// 删除
$job->addTask($data['subid'],'destroy',0);
//job
print('<info>['.date('Y-m-d H:i:s')."] 用户余额不足于支付".$user['money']."元 \n");
// 返回
return true;
}
// 费用
$money=new \app\common\logic\Money();
// 扣费
$money->hostDec($server['uid'],1,$server['money'],1,$server['subid'],'主机【'.$server['hostname'].'】支付费用');
// 日志
$logs->database($server['uid'],5,'',' 主机【'.$server['hostname'].'】支付费用:【'.$server['money'].'】',1,$server['subid']);
// 计数
Db::name('server')->where(['subid'=>$data['subid']])->inc('deduction',1)->update(['deduction_time'=>time()]);
}
}
创建好后,文件目录对应结构如下:
守护消费者进程
参考此文章
https://segmentfault.com/a/11…
或者使用宝塔的计划任务
添加以下脚本
# 检查 php Money 队列脚本是否启动
php_count=`ps -ef | grep Money | grep -v "grep" | wc -l`
if [$php_count == 0];then
echo '----php Money queue start'
`sudo -H -u www bash -c 'nohup php /www/wwwroot/www.demo.com/think queue:listen --queue Money > /www/wwwroot/www.demo.com/logs/Money.txt 2>&1 &'`
else
echo '----php Money queue ok'
fi
#检查 php DestroyQueue 队列脚本是否启动
php_count=`ps -ef | grep Destroy | grep -v "grep" | wc -l`
if [$php_count == 0];then
echo '----php Destroy queue start'
`sudo -H -u www bash -c 'nohup php /www/wwwroot/www.demo.com/think queue:listen --queue Destroy > /www/wwwroot/www.demo.com/logs/Destroy.txt 2>&1 &'`
else
echo '----php Destroy queue ok'
fi
如图:
代码解读
think-queue 目前为止还未实现 subscribe 功能,这里利用了 think-queue 的延时队列来实现定时任务,当消费者类里的任务完成以后,不 return true,使用延时抛回给队列就好,该队列会一直存在不会被删除,也就变相的实现定时任务了。
不过到这里会有一个疑问,如何准确告诉队列需要延时多久?
代码如下:
$release=strtotime(date('Y-m-d H:',time()).'00')+3599+date('i',$data['submit'])*60+date('s',$data['submit'])-time();
print('<info>['.date('Y-m-d H:i:s',time())."]".$release."秒后执行主机".$data['hostname']."第".$attempts."次扣费任务 </info>\n");
$job->release($release);
这里就很关键了,需要你自己计算出来,假如一分钟一次,按常理 $job->release(60) 就搞定了,但是忘记了消费者类运行本身是需要时间的,起码几百毫秒是要的。遇到任务多的时候,可能 1 - 2 秒才能完成,加入第一次任务在 15:00:01 开始消费任务,消费完成又花了 1 秒,消费完成后你再 $job->release(60)那么下次执行消费队列就是 15:01:02,那么第三次执行就是 15:01:03,依次类推,60 次任务后,中间出现了有长达一分钟没有扣费的情况,这对需要定时扣费的项目来说就是 bug 或者灾难。这里我们通过动态计算来决定延时多少秒后,就解决了这个问题,经常生产环境长达 2 个月的观察,误差是前后 2 秒。这里留个引子,如果控制到前后不超过 1 秒呢?假如这时候任务队列太多,堆积了成千上百条队列再排队了以后,又该如何处理?下一期我们可能会为你们讲解如何使用 think-queue 实现任务调度来打造一个支持高并发的订单下单系统。
文章里缺失的代码部分,请前往 https://github.com/zakeear/man 查找或者自行结合业务进行修改。
think-queue 3.0 和 think-queue 2.0 变化
在 https://github.com/coolseven/…,thinkphp 的队列核心是自己编写的,laravel 的队列核心依赖于 symfony/process 这个 composer 包。翻阅 think-queue 3.0 的源码后发现
正好契合了 thinkphp6.0 的理念,全面拥抱 composer!由此也带来了 think-queue 3.0 和 think-queue 2.0 最大的一个区别,think-queue 3.0 需要注册服务,think-queue 2.0 不需要,这点差异会引发一个小编在使用过程中遇到的极端问题,windows 下安装好的 think-queue 3.0 到了 liunx 上,php think queue 无效。如果有同类问题,解决把办法是在 liunx 上安装 thinkphp6.0 和 think-queue 3.0 后下载到 windows 下使用即可,这点习惯在 windows 下开发的小伙伴要注意。
另外一点,使用过 think-queue 2.0 的同学会发现,think-queue 3.0 要想正常使用必须依赖于 php 的两个内置函数, 而这 2 个内置函数太对于敏感,运维一般会禁用,小编推荐的宝塔也会默认禁用掉它们,think-queue 3.0 更换到 symfony/process 这个包以后,就不再依赖这 2 个内置函数,从这点来说所以 3.0 要比 2.0 更加安全。
结语
https://www.kancloud.cn/think… ThinkPHP 开发者周刊是 Thinkphp 生态的重要一环,流年已经独自一人更新维护了 1 年多,为广大 phper 提供了一个学习和认识 php 圈子里优秀的项目、书籍、开发者的渠道,目前该周刊已经转由志愿者维护,不远的将来,将会交由社区维护。目前有好的文章、项目、书籍和案例,欢迎大家投稿。投稿地址:QQ 群:780179357
本教程实有仓促,文中如有遗漏和错误欢迎指正。本教程完整实例源码已经托管到 giehub:https://github.com/zakeear/man