项目地址Github: https://github.com/wuzhc/gmq
1. 概述gmq是基于redis提供的特性,使用go语言开发的一个简单易用的队列;关于redis使用特性可以参考之前本人写过一篇很简陋的文章Redis 实现队列; gmq的灵感和设计是基于有赞延迟队列设计,文章内容清晰而且很好理解,但是没有提供源码,在文章的最后也提到了一些未来架构方向; gmq不是简单按照有赞延迟队列的设计实现功能,在它的基础上,做了一些修改和优化,主要如下:
功能上
多种任务模式,不单单只是延迟队列;例如:延迟队列,普通队列,优先级队列架构上:
添加job由dispatcher调度分配各个bucket,而不是由timer每个bucket维护一个timer,而不是所有bucket一个timertimer每次扫描bucket到期job时,会一次性返回多个到期job,而不是每次只返回一个jobtimer的扫描时钟由bucket中下个job到期时间决定,而不是每秒扫描一次2. 应用场景延迟任务
延迟任务,例如用户下订单一直处于未支付状态,半个小时候自动关闭订单异步任务
异步任务,一般用于耗时操作,例如群发邮件等批量操作超时任务
规定时间内(TTR)没有执行完毕或程序被意外中断,则消息重新回到队列再次被消费,一般用于数据比较敏感,不容丢失的优先级任务
当多个任务同时产生时,按照任务设定等级优先被消费,例如a,b两种类型的job,优秀消费a,然后再消费b3. 安装3.1 源码运行配置文件位于gmq/conf.ini,可以根据自己项目需求修改配置
cd $GOPATH/src # 进入gopath/src目录git clone https://github.com/wuzhc/gmq.gitcd gmqgo get -u -v github.com/kardianos/govendor # 如果有就不需要安装了govendor sync -v # 如果很慢,可能需要翻墙go run main.go start3.2 执行文件运行cd $GOPATH/src/gmq# 编译成可执行文件go build # 启动./gmq start# 停止./gmq stop# 守护进程模式启动,不输出日志到consolenohup ./gmq start >/dev/null 2>&1 &# 守护进程模式下查看日志输出(配置文件conf.ini需要设置target_type=file,filename=gmq.log)tail -f gmq.log4. 客户端目前只实现python,go,php语言的客户端的demo,参考:https://github.com/wuzhc/demo/tree/master/mq
运行# php# 生产者php producer.php# 消费者php consumer.php# python# 生产者python producer.py# 消费者python consumer.py一条消息结构{ "id": "xxxx", # 任务id,这个必须是一个唯一值,将作为redis的缓存键 "topic": "xxx", # topic是一组job的分类名,消费者将订阅topic来消费该分类下的job "body": "xxx", # 消息内容 "delay": "111", # 延迟时间,单位秒 "TTR": "11111", # 执行超时时间,单位秒 "status": 1, # job执行状态,该字段由gmq生成 "consumeNum":1, # 被消费的次数,主要记录TTR>0时,被重复消费的次数,该字段由gmq生成}延迟任务 $data = [ 'id' => 'xxxx_id' . microtime(true) . rand(1,999999999), 'topic' => ["topic_xxx"], 'body' => 'this is a rpc test', 'delay' => '1800', // 单位秒,半个小时后执行 'TTR' => '0' ];超时任务 $data = [ 'id' => 'xxxx_id' . microtime(true) . rand(1,999999999), 'topic' => ["topic_xxx"], 'body' => 'this is a rpc test', 'delay' => '0', 'TTR' => '100' // 100秒后还未得到消费者ack确认,则再次添加到队列,将再次被被消费 ];异步任务$data = [ 'id' => 'xxxx_id' . microtime(true) . rand(1,999999999), 'topic' => ["topic_xxx"], 'body' => 'this is a rpc test', 'delay' => '0', 'TTR' => '0' ];优先级任务$data = [ 'id' => 'xxxx_id' . microtime(true) . rand(1,999999999), 'topic' => ["topic_A","topic_B","topic_C"], //优先消费topic_A,消费完后消费topic_B,最后再消费topic_C 'body' => 'this is a rpc test', 'delay' => '0', 'TTR' => '0' ];5. gmq流程图如下:
...