乐趣区

thinkqueue-30实战教程打造一个定时扣费系统

前言

截止撰稿之时,ThinkPHP6.0 已经进入 RC3 阶段。按 ThinkPHP 作者流年的计划,ThinkPHP6.0 预计将会在今年秋季择机发布正式版,RC3 将可能是正式版之前的最后一个或者倒数第二个 RC 版本,这也就意味着,ThinkPHP6.0 已经日趋完善稳定,是一个值得尝试的候选版本了。

故此,随之有着重大变化的 think-queue 扩展也已经升级发布了接近正式版的 3.0.2。那么已经用 think-queue 上了生产环境的小伙伴们肯定很想知道,think-queue

3.0 是否已经可以尝鲜了?本期小编将带大家用 think-queue 3.0.2 的定时队列来打造一个定时扣费系统来告诉你答案。

PS:本人文中如有错误或者不失之处,还请海涵,欢迎各位大牛随时批评指导。

准备工作

使用 think-queue 队列,必须具备以下条件:

1:一个基于 liunx 系统的 server,windows 亦可但不推荐;

2:redis 服务端。建议 5.0 版本,

参考文章:https://www.jianshu.com/p/fe6…;

新手推荐使用宝塔面板,https://www.bt.cn/download/li… , 以省去精力和减少配置编译造成的服务端各种常见问题。

3:composer 包管理器

可参考:https://www.phpcomposer.com;

推荐镜像源:https://mirrors.aliyun.com/co…,在此感谢阿里云的贡献。

4:一个 redis 客户端,Windows 开发者推荐在以下项目里选择使用

https://github.com/uglide/Red…;

https://github.com/qishibo/An…;

https://github.com/cinience/R…;

以及 windows 自带的 Windows PowerShell。

对于 think-queue 不是很熟悉和了解的,请务必先阅读下面这边教程后再回来继续阅读:

https://github.com/coolseven/…

安装 thinkhp 和 think-queue

1: 首选创建 thinkphp6.0 新的项目,参考 https://www.kancloud.cn/manua…

composer create-project topthink/think=6.0.x-dev tp

2: 使用 think-queue 3.0.2

https://packagist.org/package…

composer require topthink/think-queue

也可以项目根目录下 composer.json 文件添加配置项

    "require": {
        "php": ">=7.1.0",
        "topthink/framework": "6.0.*-dev",
        "topthink/think-view": "^1.0",
        "symfony/var-dumper":"^4.2",
        "topthink/think-queue": "^3.0"
    },

3: 检查是否安装成功

在项目根目录下运行

php think

看到

就表示 think-queue 已经安装成功。

接下来就要进行下一步:创建项目的数据库,结构我已经准备好了

用户会员表

DROP TABLE IF EXISTS `user`;
CREATE TABLE `user`  (`id` int(10) UNSIGNED NOT NULL AUTO_INCREMENT,
  `username` varchar(32) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NOT NULL DEFAULT ''COMMENT' 用户名 ',
  `nickname` varchar(50) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NULL DEFAULT ''COMMENT' 昵称 ',
  `realname` varchar(30) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NULL DEFAULT ''COMMENT' 姓名 ',
  `password` varchar(32) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NOT NULL DEFAULT ''COMMENT' 密码 ',
  `create_time` int(10) UNSIGNED NOT NULL DEFAULT 0 COMMENT '注册时间',
  `update_time` int(10) UNSIGNED NULL DEFAULT 0 COMMENT '更新时间',
  `login_time` int(10) UNSIGNED NULL DEFAULT 0 COMMENT '登陆时间',
  `login_count` int(10) UNSIGNED NULL DEFAULT 0 COMMENT '登陆次数',
  `login_ip` varchar(20) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NULL DEFAULT ''COMMENT' 登录 ip',
  `vip` tinyint(2) UNSIGNED NULL DEFAULT 0 COMMENT 'vip 等级',
  `vip_join` int(10) UNSIGNED NULL DEFAULT 0 COMMENT 'vip 加入时间',
  `vip_time` int(10) UNSIGNED NULL DEFAULT 0 COMMENT 'vip 过期时间',
  `ip` varchar(20) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NULL DEFAULT ''COMMENT' 注册 ip',
  `status` tinyint(1) UNSIGNED NOT NULL DEFAULT 1 COMMENT '状态 1: 正常 2: 禁用 3: 临时',
  `lock_uid` int(10) UNSIGNED NULL DEFAULT 0 COMMENT '封禁人',
  `lock_time` int(10) UNSIGNED NULL DEFAULT 0 COMMENT '封禁时间',
  `lock_tips` varchar(50) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NULL DEFAULT ''COMMENT' 封禁原因 ',
  `back_time` int(10) UNSIGNED NULL DEFAULT 0 COMMENT '解封时间',
  `group` tinyint(1) UNSIGNED NOT NULL DEFAULT 1 COMMENT '身份 1: 普通 2: 管理员 3: 代理 4: 合作方 5: 渠道商',
  `group_time` int(10) UNSIGNED NULL DEFAULT 0 COMMENT '身份过期',
  `safe_level` tinyint(1) UNSIGNED NOT NULL DEFAULT 1 COMMENT '安全等级',
  `safe_code` char(8) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NULL DEFAULT ''COMMENT' 密保安全码 ',
  `safe_token` tinyint(1) UNSIGNED NULL DEFAULT 0 COMMENT '密保令牌 1: 未 2: 是',
  `safe_device` tinyint(1) UNSIGNED NOT NULL DEFAULT 1 COMMENT '密保设备 1: 未 2: 是',
  `safe_phone` bigint(11) UNSIGNED NULL DEFAULT 0 COMMENT '密保手机',
  `safe_email` varchar(50) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NULL DEFAULT ''COMMENT' 密保邮箱 ',
  `verify_code` char(8) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NULL DEFAULT ''COMMENT' 找回校验码 ',
  `verify_lock` int(10) UNSIGNED NULL DEFAULT 0 COMMENT '找回锁定',
  `verify_time` int(10) UNSIGNED NULL DEFAULT 0 COMMENT '校验码过期',
  `money` decimal(10, 3) UNSIGNED NOT NULL DEFAULT 0.000 COMMENT '余额',
  `give` decimal(10, 3) UNSIGNED NOT NULL DEFAULT 0.000 COMMENT '增送',
  `brokerage` decimal(10, 3) UNSIGNED NOT NULL DEFAULT 0.000 COMMENT '佣金',
  `server` int(10) UNSIGNED NOT NULL DEFAULT 10 COMMENT '服务器',
  `gold` tinyint(8) UNSIGNED NOT NULL DEFAULT 0 COMMENT '金币',
  `credits` tinyint(8) UNSIGNED NOT NULL DEFAULT 0 COMMENT '积分',
  `union_id` varchar(64) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NULL DEFAULT ''COMMENT' 微信 ',
  `unionid` varchar(64) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NULL DEFAULT ''COMMENT'QQ',
  `inviters` tinyint(5) UNSIGNED NOT NULL DEFAULT 0 COMMENT '邀请次数',
  `inviter_id` int(10) UNSIGNED NULL DEFAULT 0 COMMENT '邀请人',
  PRIMARY KEY (`id`) USING BTREE,
  INDEX `id`(`id`, `username`, `realname`, `vip`, `group`, `money`, `create_time`) USING BTREE COMMENT '联合索引'
) ENGINE = InnoDB AUTO_INCREMENT = 1 CHARACTER SET = utf8 COLLATE = utf8_general_ci COMMENT = '用户表' ROW_FORMAT = Dynamic;

日志表

DROP TABLE IF EXISTS `logs`;
CREATE TABLE `logs`  (`id` int(10) UNSIGNED NOT NULL AUTO_INCREMENT,
  `uid` int(10) UNSIGNED NOT NULL DEFAULT 0 COMMENT '用户 id',
  `subid` int(10) UNSIGNED NULL DEFAULT 0 COMMENT '主机 id',
  `op_id` int(10) UNSIGNED NULL DEFAULT 0 COMMENT '操作人 id',
  `type` tinyint(2) UNSIGNED NOT NULL DEFAULT 1 COMMENT '类型 1: 会员 2: 管理 3: 系统 5: 财务',
  `time` int(10) UNSIGNED NOT NULL DEFAULT 0 COMMENT '时间',
  `ip` varchar(20) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NOT NULL DEFAULT ''COMMENT' 登录 ip',
  `content` varchar(50) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NOT NULL DEFAULT '0' COMMENT '日志内容',
  PRIMARY KEY (`id`) USING BTREE,
  INDEX `idx_uid_type_time`(`uid`, `type`, `time`, `op_id`, `subid`) USING BTREE COMMENT '联合索引'
) ENGINE = InnoDB AUTO_INCREMENT = 1 CHARACTER SET = utf8 COLLATE = utf8_general_ci COMMENT = '日志表' ROW_FORMAT = Dynamic;

用户主机表

DROP TABLE IF EXISTS `server`;
CREATE TABLE `server`  (`id` int(10) UNSIGNED NOT NULL AUTO_INCREMENT,
  `uid` int(10) UNSIGNED NOT NULL COMMENT '所属用户',
  `status` int(2) UNSIGNED NOT NULL DEFAULT 2 COMMENT '状态 1: 已停止 2: 运行中 3: 已过期 4: 需续费 5: 已删除 6: 异常',
  `time` int(10) UNSIGNED NOT NULL COMMENT '创建时间',
  `op` int(4) UNSIGNED NULL DEFAULT 0 COMMENT '操作人',
  `op_time` int(10) UNSIGNED NULL DEFAULT 0 COMMENT '操作时间',
  `subid` bigint(11) UNSIGNED NULL DEFAULT 0 COMMENT '实例 id',
  `ip_address` varchar(64) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NULL DEFAULT ''COMMENT'IPv4',
  `password` varchar(50) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT ''COMMENT'root 密码 ',
  `snapshotid` varchar(20) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT ''COMMENT' 快照 id',
  `port` int(10) UNSIGNED NULL DEFAULT 22 COMMENT '端口',
  `ips` varchar(32) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NULL DEFAULT ''COMMENT' 高防 IP',
  `enable_ipv6` varchar(64) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NULL DEFAULT ''COMMENT'IPv6',
  `ipv6` varchar(200) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NULL DEFAULT ''COMMENT'IPv6 地址 ',
  `dcid` int(5) UNSIGNED NULL DEFAULT 0 COMMENT '位置',
  `osid` varchar(32) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NULL DEFAULT ''COMMENT' 操作系统 ',
  `arch` varchar(32) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NULL DEFAULT ''COMMENT' 系统类型 ',
  `vpsplanid` double(32, 0) UNSIGNED NULL DEFAULT 0 COMMENT '配置规格',
  `hostname` varchar(32) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NULL DEFAULT ''COMMENT' 自定义名称 ',
  `ddos` varchar(32) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NULL DEFAULT ''COMMENT'DDOS',
  `appid` int(6) UNSIGNED NULL DEFAULT 0 COMMENT '预装应用',
  `destroy` int(10) UNSIGNED NULL DEFAULT 0 COMMENT '删除时间',
  `month` int(3) UNSIGNED NULL DEFAULT 0 COMMENT '购买时长',
  `deduction` int(10) UNSIGNED NULL DEFAULT 0 COMMENT '扣费次数',
  `money` decimal(10, 3) UNSIGNED NULL DEFAULT 0.000 COMMENT '费用',
  `deduction_time` int(10) UNSIGNED NULL DEFAULT 0 COMMENT '扣费时间',
  PRIMARY KEY (`id`) USING BTREE
) ENGINE = InnoDB AUTO_INCREMENT = 1 CHARACTER SET = utf8 COLLATE = utf8_general_ci COMMENT = '主机表' ROW_FORMAT = Compact;

系统会员注册登录代码部分略过,请自己补充,也可以在文章末尾下载本教程对应实例代码。

开始使用队列

本教程 think-queue 队列使用的是 redis 驱动方式,请务必先配置好 redis,配置文件在 config 文件夹下面的 queue.php,别弄错了

return [
    'default' => 'redis',
    'connections' => [
        'sync' => ['driver' => 'sync',],
        'database' => [
            'driver' => 'database',
            'queue' => 'default',
            'table' => 'jobs',
        ],
        'redis' => [
            'driver' => 'redis',
            'queue' => 'default',
            'host' => '127.0.0.1',
            'port' => 6379,
            'password' => 'Xun166123',
            'select' => 1,
            'timeout' => 0,
            'persistent' => false,
        ],
    ],
    'failed' => [
        'type' => 'none',
        'table' => 'failed_jobs',
    ],
];

接下来添加队列

/*
* 队列任务
* @author zakeear <zakeear@86dede.com>
* @version v0.1.5
* @time 2019-06-10
*/
namespace app\queue\controller;
use think\Exception;
use think\facade\Db;
use think\facade\Queue;
class Host{
    /**
     * 添加队列
     * @access public
     * @param int $subid 主机 id
     * @param string $type 任务名
     * @param int $times 延时秒数
     * @throws \think\Exception
     */
    public function addTask(int $subid=0,string $type='server',int $times=0){$server=Db::name('server')->where(['subid'=>$subid])->find();
        if(!$server){exit;}
        switch($type){
            case 'server':
                $jobHandlerClassName='app\queue\job\Money@fire';
                $jobDataArr=['submit'=>time(),'doit'=>time()+$times,'subid'=>$server['subid'],'hostname'=>$server['hostname']];
                $jobQueueName="Money";
                break;
            case 'destroy':
                $jobHandlerClassName='app\queue\job\Destroy@fire';
                $jobDataArr=['submit'=>time(),'doit'=>time()+$times,'subid'=>$server['subid'],'hostname'=>$server['hostname']];
                $jobQueueName="Destroy";
                break;
            default:
                break;
        }
        if($times==0){$isPushed=Queue::push($jobHandlerClassName,$jobDataArr,$jobQueueName);
        }else{$isPushed=Queue::later($times,$jobHandlerClassName,$jobDataArr,$jobQueueName);
        }
    }
}

然后是消费者类

/*
* 主机扣费类
* @author zakeear <zakeear@86dede.com>
* @version v0.2.0
* @time 2019-06-13
*/
namespace app\queue\job;
use think\queue\Job;
use think\facade\Db;
class Money{public function fire(Job $job,$data){
        //job
        $isJobDone=$this->doJob($job,$data);
        $attempts=$job->attempts()+1;
        if($isJobDone){print('<info>['.date('Y-m-d H:i:s',time())."] 主机".$data['hostname']."扣费任务完成,任务销毁 </info>\n");
            $job->delete();}else{$release=strtotime(date('Y-m-d H:',time()).'00')+3599+date('i',$data['submit'])*60+date('s',$data['submit'])-time();
            print('<info>['.date('Y-m-d H:i:s',time())."]".$release."秒后执行主机".$data['hostname']."第".$attempts."次扣费任务 </info>\n");
            $job->release($release);
        }
    }
    private function doJob($job,$data){
        //job
        $attempts=$job->attempts();
        print('<info>['.date('Y-m-d H:i:s',time())."] 主机".$data['hostname']."第".$attempts."次扣费 </info>\n");
        // 主机
        $server=Db::name('server')->field('id,uid,subid,month,money,hostname,deduction')->where(['subid'=>$data['subid'],'status'=>2])->find();
        if(!$server){print('<info>['.date('Y-m-d H:i:s')."] 主机".$data['hostname']."已经不存在或者被删除!"."\n");
            return true;
        }
        // 日志
        $logs=new \app\common\logic\Logs();
        // 配置
        $this->config = Db::name('config')->field('rate,month,is_buy,vultr_api,vultr_keys,web_name,web_icon,time')->where(['status'=>1])->order('time','desc')->find();
        if($server['deduction']==$this->config['month']){
            // 扣费
            $logs->database($server['uid'],5,'',' 主机【'.$server['hostname'].'】达到月付限额,本月不再扣费 ',1,$server['subid']);
            // 计数
            Db::name('server')->where(['subid'=>$data['subid']])->update(['deduction'=>0,'deduction_time'=>0]);
            //job
            $attempts=$attempts-1;
            print('<info>['.date('Y-m-d H:i:s')."] 主机".$data['hostname']."已经达到月付上限".$attempts."次 \n");
            // 队列
            $job=new \app\queue\controller\Host();
            // 删除
            $job->addTask($data['subid'],'destroy',0);
            // 创建
            $release=\app\Timer::nextMonth()[0]+date('i',$data['submit'])*60+date('s',$data['submit'])-time();// 下月重新计费
            $job->addTask($data['subid'],'server',$release);
            // 返回
            return true;
        }
        // 用户
        $user=Db::name('user')->field('id,money')->where(['id'=>$server['uid']])->find();
        if($user['money']<$server['money']){
            // 日志
            $logs->database($server['uid'],5,'',' 主机【'.$server['hostname'].'】不足于支付:【'.$server['money'].'】',1,$server['subid']);
            // 删除
            Db::name('server')->where(['subid'=>$data['subid']])->update(['status'=>5,'destroy'=>time()]);
            // 日志
            $logs->database($server['uid'],5,'',' 主机【'.$server['hostname'].'】删除 ',1,$server['subid']);
            // 队列
            $job=new \app\queue\controller\Host();
            // 删除
            $job->addTask($data['subid'],'destroy',0);
            //job
            print('<info>['.date('Y-m-d H:i:s')."] 用户余额不足于支付".$user['money']."元 \n");
            // 返回
            return true;
        }
        // 费用
        $money=new \app\common\logic\Money();
        // 扣费
        $money->hostDec($server['uid'],1,$server['money'],1,$server['subid'],'主机【'.$server['hostname'].'】支付费用');
        // 日志
        $logs->database($server['uid'],5,'',' 主机【'.$server['hostname'].'】支付费用:【'.$server['money'].'】',1,$server['subid']);
        // 计数
        Db::name('server')->where(['subid'=>$data['subid']])->inc('deduction',1)->update(['deduction_time'=>time()]);
    }
}

创建好后,文件目录对应结构如下:

守护消费者进程

参考此文章

https://segmentfault.com/a/11…

或者使用宝塔的计划任务

添加以下脚本

# 检查 php Money 队列脚本是否启动
php_count=`ps -ef | grep Money | grep -v "grep" | wc -l`
if [$php_count == 0];then
    echo '----php Money queue start'
    `sudo -H -u www bash -c 'nohup php /www/wwwroot/www.demo.com/think queue:listen --queue Money > /www/wwwroot/www.demo.com/logs/Money.txt 2>&1 &'`
else
    echo '----php Money queue ok'
fi

#检查 php DestroyQueue 队列脚本是否启动
php_count=`ps -ef | grep Destroy | grep -v "grep" | wc -l`
if [$php_count == 0];then
    echo '----php Destroy queue start'
    `sudo -H -u www bash -c 'nohup php /www/wwwroot/www.demo.com/think queue:listen --queue Destroy > /www/wwwroot/www.demo.com/logs/Destroy.txt 2>&1 &'`
else
    echo '----php Destroy queue ok'
fi

如图:

代码解读

think-queue 目前为止还未实现 subscribe 功能,这里利用了 think-queue 的延时队列来实现定时任务,当消费者类里的任务完成以后,不 return true,使用延时抛回给队列就好,该队列会一直存在不会被删除,也就变相的实现定时任务了。

不过到这里会有一个疑问,如何准确告诉队列需要延时多久?

代码如下:

            $release=strtotime(date('Y-m-d H:',time()).'00')+3599+date('i',$data['submit'])*60+date('s',$data['submit'])-time();
            print('<info>['.date('Y-m-d H:i:s',time())."]".$release."秒后执行主机".$data['hostname']."第".$attempts."次扣费任务 </info>\n");
            $job->release($release);

这里就很关键了,需要你自己计算出来,假如一分钟一次,按常理 $job->release(60) 就搞定了,但是忘记了消费者类运行本身是需要时间的,起码几百毫秒是要的。遇到任务多的时候,可能 1 - 2 秒才能完成,加入第一次任务在 15:00:01 开始消费任务,消费完成又花了 1 秒,消费完成后你再 $job->release(60)那么下次执行消费队列就是 15:01:02,那么第三次执行就是 15:01:03,依次类推,60 次任务后,中间出现了有长达一分钟没有扣费的情况,这对需要定时扣费的项目来说就是 bug 或者灾难。这里我们通过动态计算来决定延时多少秒后,就解决了这个问题,经常生产环境长达 2 个月的观察,误差是前后 2 秒。这里留个引子,如果控制到前后不超过 1 秒呢?假如这时候任务队列太多,堆积了成千上百条队列再排队了以后,又该如何处理?下一期我们可能会为你们讲解如何使用 think-queue 实现任务调度来打造一个支持高并发的订单下单系统。

文章里缺失的代码部分,请前往 https://github.com/zakeear/man 查找或者自行结合业务进行修改。

think-queue 3.0 和 think-queue 2.0 变化

在 https://github.com/coolseven/…,thinkphp 的队列核心是自己编写的,laravel 的队列核心依赖于 symfony/process 这个 composer 包。翻阅 think-queue 3.0 的源码后发现

正好契合了 thinkphp6.0 的理念,全面拥抱 composer!由此也带来了 think-queue 3.0 和 think-queue 2.0 最大的一个区别,think-queue 3.0 需要注册服务,think-queue 2.0 不需要,这点差异会引发一个小编在使用过程中遇到的极端问题,windows 下安装好的 think-queue 3.0 到了 liunx 上,php think queue 无效。如果有同类问题,解决把办法是在 liunx 上安装 thinkphp6.0 和 think-queue 3.0 后下载到 windows 下使用即可,这点习惯在 windows 下开发的小伙伴要注意。

另外一点,使用过 think-queue 2.0 的同学会发现,think-queue 3.0 要想正常使用必须依赖于 php 的两个内置函数, 而这 2 个内置函数太对于敏感,运维一般会禁用,小编推荐的宝塔也会默认禁用掉它们,think-queue 3.0 更换到 symfony/process 这个包以后,就不再依赖这 2 个内置函数,从这点来说所以 3.0 要比 2.0 更加安全。

结语

https://www.kancloud.cn/think… ThinkPHP 开发者周刊是 Thinkphp 生态的重要一环,流年已经独自一人更新维护了 1 年多,为广大 phper 提供了一个学习和认识 php 圈子里优秀的项目、书籍、开发者的渠道,目前该周刊已经转由志愿者维护,不远的将来,将会交由社区维护。目前有好的文章、项目、书籍和案例,欢迎大家投稿。投稿地址:QQ 群:780179357

本教程实有仓促,文中如有遗漏和错误欢迎指正。本教程完整实例源码已经托管到 giehub:https://github.com/zakeear/man

退出移动版