基于golang和redis实现队列功能

28次阅读

共计 5794 个字符,预计需要花费 15 分钟才能阅读完成。

项目地址

Github: https://github.com/wuzhc/gmq

1. 概述

gmq是基于 redis 提供的特性, 使用 go 语言开发的一个简单易用的队列; 关于 redis 使用特性可以参考之前本人写过一篇很简陋的文章 Redis 实现队列;
gmq的灵感和设计是基于有赞延迟队列设计, 文章内容清晰而且很好理解, 但是没有提供源码, 在文章的最后也提到了一些未来架构方向; gmq不是简单按照有赞延迟队列的设计实现功能, 在它的基础上, 做了一些修改和优化, 主要如下:

  • 功能上

    • 多种任务模式, 不单单只是延迟队列; 例如: 延迟队列, 普通队列, 优先级队列
  • 架构上:

    • 添加 job 由 dispatcher 调度分配各个bucket, 而不是由timer
    • 每个 bucket 维护一个timer, 而不是所有 bucket 一个timer
    • timer每次扫描 bucket 到期 job 时, 会一次性返回多个到期job, 而不是每次只返回一个job
    • timer的扫描时钟由 bucket 中下个 job 到期时间决定, 而不是每秒扫描一次

2. 应用场景

  • 延迟任务

    • 延迟任务, 例如用户下订单一直处于未支付状态,半个小时候自动关闭订单
  • 异步任务

    • 异步任务, 一般用于耗时操作, 例如群发邮件等批量操作
  • 超时任务

    • 规定时间内 (TTR) 没有执行完毕或程序被意外中断, 则消息重新回到队列再次被消费, 一般用于数据比较敏感, 不容丢失的
  • 优先级任务

    • 当多个任务同时产生时, 按照任务设定等级优先被消费, 例如 a,b 两种类型的 job, 优秀消费 a, 然后再消费 b

3. 安装

3.1 源码运行

配置文件位于gmq/conf.ini, 可以根据自己项目需求修改配置

cd $GOPATH/src # 进入 gopath/src 目录
git clone https://github.com/wuzhc/gmq.git
cd gmq
go get -u -v github.com/kardianos/govendor # 如果有就不需要安装了
govendor sync -v # 如果很慢, 可能需要翻墙
go run main.go start

3.2 执行文件运行

cd $GOPATH/src/gmq
# 编译成可执行文件
go build 
# 启动
./gmq start
# 停止
./gmq stop

# 守护进程模式启动, 不输出日志到 console
nohup ./gmq start >/dev/null 2>&1  &
# 守护进程模式下查看日志输出(配置文件 conf.ini 需要设置 target_type=file,filename=gmq.log)
tail -f gmq.log

4. 客户端

目前只实现 python,go,php 语言的客户端的 demo, 参考:https://github.com/wuzhc/demo/tree/master/mq

运行

# php
# 生产者
php producer.php
# 消费者
php consumer.php

# python
# 生产者
python producer.py
# 消费者
python consumer.py

一条消息结构

{
    "id": "xxxx",    # 任务 id, 这个必须是一个唯一值, 将作为 redis 的缓存键
    "topic": "xxx",  # topic 是一组 job 的分类名, 消费者将订阅 topic 来消费该分类下的 job
    "body": "xxx",   # 消息内容
    "delay": "111",  # 延迟时间, 单位秒
    "TTR": "11111",  # 执行超时时间, 单位秒
    "status": 1,     # job 执行状态, 该字段由 gmq 生成
    "consumeNum":1,  # 被消费的次数, 主要记录 TTR>0 时, 被重复消费的次数, 该字段由 gmq 生成
}

延迟任务

 $data = ['id'    => 'xxxx_id' . microtime(true) . rand(1,999999999),
        'topic' => ["topic_xxx"],
        'body'  => 'this is a rpc test',
        'delay' => '1800', // 单位秒, 半个小时后执行
        'TTR'   => '0'
    ];

超时任务

 $data = ['id'    => 'xxxx_id' . microtime(true) . rand(1,999999999),
        'topic' => ["topic_xxx"],
        'body'  => 'this is a rpc test',
        'delay' => '0', 
        'TTR'   => '100' // 100 秒后还未得到消费者 ack 确认, 则再次添加到队列, 将再次被被消费
    ];

异步任务

$data = ['id'    => 'xxxx_id' . microtime(true) . rand(1,999999999),
        'topic' => ["topic_xxx"],
        'body'  => 'this is a rpc test',
        'delay' => '0', 
        'TTR'   => '0' 
    ];

优先级任务

$data = ['id'    => 'xxxx_id' . microtime(true) . rand(1,999999999),
        'topic' => ["topic_A","topic_B","topic_C"], // 优先消费 topic_A, 消费完后消费 topic_B, 最后再消费 topic_C
        'body'  => 'this is a rpc test',
        'delay' => '0', 
        'TTR'   => '0' 
    ];

5. gmq 流程图如下:

5.1 延迟时间 delay

  • 当 job.delay>0 时,job 会被分配到 bucket 中,bucket 会有周期性扫描到期 job, 如果到期,job 会被 bucket 移到ready queue, 等待被消费
  • 当 job.delay= 0 时,job 会直接加到ready queue, 等待被消费

5.2 执行超时时间 TTR

参考第一个图的流程, 当 job 被消费者读取后, 如果job.TTR>0, 即 job 设置了执行超时时间, 那么 job 会在读取后会被添加到 TTRBucket(专门存放设置了超时时间的 job), 并且设置job.delay = job.TTR, 如果在 TTR 时间内没有得到消费者 ack 确认然后删除 job,job 将在 TTR 时间之后添加到ready queue, 然后再次被消费(如果消费者在 TTR 时间之后才请求 ack, 会得到失败的响应)

5.3 确认机制

主要和 TTR 的设置有关系, 确认机制可以分为两种:

  • 当 job.TTR= 0 时, 消费者 pop 出 job 时, 即会自动删除 job pool 中的 job 元数据
  • 当 job.TTR>0 时, 即 job 执行超时时间, 这个时间是指用户 pop 出 job 时开始到用户 ack 确认删除结束这段时间, 如果在这段时间没有 ACK,job 会被再次加入到ready queue, 然后再次被消费, 只有用户调用了ACK, 才会去删除job pool 中 job 元数据

6. web 监控

gmq提供了一个简单 web 监控平台 (后期会提供根据 job.Id 追踪消息的功能), 方便查看当前堆积任务数, 默认监听端口为8000, 例如:http://127.0.0.1:8000, 界面如下:

后台模板来源于 https://github.com/george518/…

7. 遇到问题

以下是开发遇到的问题, 以及一些粗糙的解决方案

7.1 安全退出

如果强行中止 gmq 的运行, 可能会导致一些数据丢失, 例如下面一个例子:

如果发生上面的情况, 就会出现 job 不在 bucket 中, 也不在 ready queue, 这就出现了 job 丢失的情况, 而且将没有任何机会去删除job pool 中已丢失的 job, 长久之后 job pool 可能会堆积很多的已丢失 job 的元数据; 所以安全退出需要在接收到退出信号时, 应该等待所有 goroutine 处理完手中的事情, 然后再退出

7.1.1 gmq退出流程


首先 gmq 通过 context 传递关闭信号给 dispatcher,dispatcher 接收到信号会关闭 dispatcher.closed, 每个bucket 会收到 close 信号, 然后先退出 timer 检索, 再退出 bucket,dispatcher 等待所有 bucket 退出后, 然后退出

dispatcher退出顺序流程: timer -> bucket -> dispatcher

7.1.2 注意

不要使用 kill -9 pid 来强制杀死进程, 因为系统无法捕获 SIGKILL 信号, 导致 gmq 可能执行到一半就被强制中止, 应该使用 kill -15 pid,kill -1 pidkill -2 pid, 各个数字对应信号如下:

  • 9 对应 SIGKILL
  • 15 对应 SIGTERM
  • 1 对应 SIGHUP
  • 2 对应 SIGINT
  • 信号参考 https://www.jianshu.com/p/5729fc095b2a

7.2 智能定时器

  • 每一个 bucket 都会维护一个 timer, 不同于有赞设计,timer 不是每秒轮询一次, 而是根据 bucket 下一个 job 到期时间来设置 timer 的定时时间 , 这样的目的在于如果 bucket 没有 job 或 job 到期时间要很久才会发生, 就可以减少不必要的轮询;
  • timer只有处理完一次业务后才会重置定时器;, 这样的目的在于可能出现上一个时间周期还没执行完毕, 下一个定时事件又发生了
  • 如果到期的时间很相近,timer就会频繁重置定时器时间, 就目前使用来说, 还没出现什么性能上的问题

7.3 原子性问题

我们知道 redis 的命令是排队执行, 在一个复杂的业务中可能会多次执行 redis 命令, 如果在大并发的场景下, 这个业务有可能中间插入了其他业务的命令, 导致出现各种各样的问题;
redis 保证整个事务原子性和一致性问题一般用 multi/execlua 脚本 ,gmq 在操作涉及复杂业务时使用的是 lua 脚本, 因为lua 脚本 除了有 multi/exec 的功能外, 还有 Pipepining 功能 (主要打包命令, 减少和redis server 通信次数), 下面是一个 gmq 定时器扫描 bucket 集合到期 job 的 lua 脚本:

-- 获取到期的 50 个 job
local jobIds = redis.call('zrangebyscore',KEYS[1], 0, ARGV[4], 'withscores', 'limit', 0, 50)
local res = {}
for k,jobId in ipairs(jobIds) do 
    if k%2~=0 then
        local jobKey = string.format('%s:%s', ARGV[3], jobId)
        local status = redis.call('hget', jobKey, 'status')
        -- 检验 job 状态
        if tonumber(status) == tonumber(ARGV[1]) or tonumber(status) == tonumber(ARGV[2]) then
            -- 先移除集合中到期的 job, 然后到期的 job 返回给 timer
            local isDel = redis.call('zrem', KEYS[1], jobId)
            if isDel == 1 then
                table.insert(res, jobId)
            end
        end
    end
end

local nextTime
-- 计算下一个 job 执行时间, 用于设置 timer 下一个时钟周期
local nextJob = redis.call('zrange', KEYS[1], 0, 0, 'withscores')
if next(nextJob) == nil then
    nextTime = -1
else
    nextTime = tonumber(nextJob[2]) - tonumber(ARGV[4])
    if nextTime < 0 then
        nextTime = 1
    end
end

table.insert(res,1,nextTime)
return res

7.4 redis 连接池

可能一般 phper 写业务很少会接触到连接池, 其实这是由 php 本身所决定他应用不大, 当然在 php 的扩展 swoole 还是很有用处的
gmq的 redis 连接池是使用 gomodule/redigo/redis 自带连接池, 它带来的好处是限制 redis 连接数, 通过复用 redis 连接来减少开销, 另外可以防止 tcp 被消耗完, 这在生产者大量生成数据时会很有用

// gmq/mq/redis.go
Redis = &RedisDB{
    Pool: &redis.Pool{
        MaxIdle:     30,    // 最大空闲链接
        MaxActive:   10000, // 最大链接
        IdleTimeout: 240 * time.Second, // 空闲链接超时
        Wait:        true, // 当连接池耗尽时, 是否阻塞等待
        Dial: func() (redis.Conn, error) {c, err := redis.Dial("tcp", "127.0.0.1:6379", redis.DialPassword(""))
            if err != nil {return nil, err}
            return c, nil
        },
        TestOnBorrow: func(c redis.Conn, t time.Time) error {if time.Since(t) < time.Minute {return nil}
            _, err := c.Do("PING")
            return err
        },
    },
}

8. 注意问题

  • job.id 在 job pool 是唯一的, 它将作为 redis 的缓存键;gmq不自动为 job 生成唯一 id 值是为了用户可以根据自己生成的 job.id 来追踪 job 情况, 如果 job.id 是重复的,push 时会报重复 id 的错误
  • bucket 数量不是越多越好, 一般来说, 添加到 ready queue 的速度取决与 redis 性能, 而不是 bucket 数量

9. 使用中可能出现的问题

9.1 客户端出现大量的 TIME_WAIT 状态, 并且新的连接被拒绝

netstat -anp | grep 9503 | wc -l
tcp        0      0 10.8.8.188:41482        10.8.8.185:9503         TIME_WAIT   -                   

这个是正常现象, 由 tcp 四次挥手可以知道, 当接收到 LAST_ACK 发出的 FIN 后会处于 TIME_WAIT 状态, 主动关闭方 (客户端) 为了确保被动关闭方 (服务端) 收到 ACK,会等待 2MSL 时间,这个时间是为了再次发送 ACK, 例如被动关闭方可能因为接收不到 ACK 而重传 FIN; 另外也是为了旧数据过期, 不影响到下一个链接,; 如果要避免大量 TIME_WAIT 的连接导致 tcp 被耗尽; 一般方法如下:

  • 使用长连接, 目的是避免重复建立连接
  • 配置文件, 网上很多教程, 就是让系统尽快的回收 TIME_WAIT 状态的连接
  • 使用连接池, 目的是限制连接数, 当连接池耗尽时, 阻塞等待, 直到回收再利用

10. 相关链接

  • 有赞延迟队列设计
  • Redis 实现队列

正文完
 0