共计 4859 个字符,预计需要花费 13 分钟才能阅读完成。
Go+Redis 实现简略的音讯队列
前言
假如咱们有这样一个场景,零碎会在某个特定的状况下给用户推送一条音讯,可能是短信、邮件或站内信,因为该场景音讯可能会在某一个时刻同时推送大量的音讯,且主过程不心愿会阻塞。该场景对实时性要求不高,容许音讯被延时送达。在零碎的构建初期,应用业余的音讯队列中间件 Rabbitmq 和 Kafka 来实现音讯的异步推送就显得不是很不便,此时咱们能够思考应用 Redis 来实现简略的音讯队列。当然,如果咱们对音讯的实时性以及可靠性要求十分高,可能就须要应用 MQ 或 kafka 来实现了。
音讯队列
实现的原理是生产者将数据 push 到 Redis 的 list 里,消费者轮训去 pop 数据,如果能取到数据,则间接生产数据,如果等不到则持续循环 pop 数据。
以上示意图中的红色箭头就是模仿的一个生产者从左部增加数据,消费者在右侧获取数据的场景。反之一样。
然而这是就有个问题, 如果队列空了怎么办? 当列表为空时,消费者就会一直的轮训来获取数据,然而每次都获取不到数据,就会陷入一个取不到数据的死循环里,这不仅拉高了客户端的 CPU,还拉高了 Redis 的 QPS,并且这些拜访都是有效的。
这时咱们能够应用 sleep(1) 的形式去延时 1 秒,也能够应用 Redis 提供的阻塞式拜访,BRPP 和 BLPOP 命令,消费者能够在获取不到数据的时候指定一个如果数据不存在的阻塞的超时工夫,如果在这个工夫内能取到数据,则会立刻返回,否则会返回 null,当这个超时工夫设置为 0 的时候,示意会始终阻塞,但咱们通常并不倡议如此。如果都有多个客户端同时在阻塞期待音讯,则会依照先后顺序排序。
Redis client
首先持续先看一下 redis client。
import (
"log"
"github.com/go-redis/redis"
)
// 队列的 key
var queueKey = "queue:message"
var rdb *redis.Client
func NewRedis() {
rdb = redis.NewClient(&redis.Options{
Addr: "127.0.0.1:6379",
Password: "",
})
pong, err := rdb.Ping().Result()
if err != nil {log.Fatalln(err)
}
log.Println(pong,"redis success.")
}
生产者 Producer
生产者的具体实现代码,模仿一个随机生成音讯的生产者,应用 lpush 将数据增加到 list 里。
// 应用 list 生产音讯
func ProducerMessageList(){rand.Seed(time.Now().UnixNano())
log.Println("开启生产者。。。。")
for i := 0;i < 10;i++ {score := time.Now().Unix()
log.Println("正在生产一条音讯...", score, i)
_,err := rdb.LPush(queueListKey,i).Result()
if err != nil {log.Println(err)
}
time.Sleep(time.Duration(rand.Intn(3)) * time.Second)
}
}
消费者 Consumer
消费者则应用 rpop 来从队列里 pop 出一条音讯进行生产,但咱们后面讲过,如果队列空了,则会一直的轮训 pop 音讯,会造成大量的资源的节约,因而咱们此处应用 brpop 命令来实现阻塞的读,阻塞读在队列没有数据时会立刻进入休眠状态,一旦有数据了,则会立刻被唤醒并弹出音讯,提早能够忽略不计。
// 应用 list 格局生产音讯
func ConsumerMessageList() {
for {
// 设置一个 5 秒的超时工夫
value, err := rdb.BRPop(5 *time.Second,"queue:list").Result()
if err == redis.Nil{
// 查问不到数据
time.Sleep(1 * time.Second)
continue
}
if err != nil {
// 查问出错
time.Sleep(1 * time.Second)
continue
}
log.Println("生产到数据:", value, "以后工夫是:", time.Now().Unix())
time.Sleep(time.Second)
}
}
从代码里能够看到,咱们设置了一个 5 秒的阻塞读超时工夫,是因为阻塞读也不能始终阻塞,长时间的阻塞可能会被服务器端被动断开链接,而后会抛出异样,所以这里须要设置一个不是很长的阻塞超时工夫。
延时队列
实现的原理是将生产的数据应用 zadd 命令存入 Redis,将 field 的的 score 设置成须要延时的工夫戳,如果须要立刻执行,不须要提早的话,则将 score 设置成以后工夫戳即可。同时独自运行一个 goroutine,应用 zrangebyscore 命令去截取数据,数据的 score 为以后工夫戳,这样一来,达到执行工夫的数据将被取出,而后咱们取第一个数据,应用 zrem 命令将数据移除队列,移除胜利则示意该条音讯容许被发送,否则,从新执行以上流程。
Redis client
咱们先来看一下应用 go-redis 实现的 Redis 客户端链接。
import (
"log"
"github.com/go-redis/redis"
)
// 队列的 key
var queueKey = "queue:message"
var rdb *redis.Client
func NewRedis() {
rdb = redis.NewClient(&redis.Options{
Addr: "127.0.0.1:6379",
Password: "",
})
pong, err := rdb.Ping().Result()
if err != nil {log.Fatalln(err)
}
log.Println(pong,"redis success.")
}
留神:这里 demo 环境下,我应用了一个 var rdb *redis.Client,前面的代码均应用 rdb 间接来调用 Redis,理论我的项目中依据我的项目的状况来。
生产者 Producer
这里我模仿了一个随机产生音讯的状况,咱们应用随机产生一个数字来延时一段时间,用来察看消费者是否能实时对数据的生产。
// 生产音讯
func ProducerMessage() {rand.Seed(time.Now().UnixNano())
log.Println("开启生产者。。。。")
for i := 0;i < 5;i++ {score := time.Now().Unix()
log.Println("正在生产一条音讯...", score, i)
rdb.ZAdd("queue:message", redis.Z{Score: float64(score + 1),// 秒级工夫戳 +1,示意延时 1 秒
Member: i,
})
time.Sleep(time.Duration(rand.Intn(3)) * time.Second)
}
}
产生的数据将应用 zadd 命令存入 Redis,并增加 score 为以后的工夫戳,并给 score 增加一个延时工夫,因为示例中应用的秒级工夫戳作为 score,则咱们延时 1 秒就间接 score+ 1 即可。member 里能够存咱们要延时的数据,比方将要发送的音讯。
消费者 Consumer
应用 zrangebyscore 截取截止到以后工夫戳的音讯,如果音讯延时到以后工夫,则能够间接被截取进去,并应用 zrem 移除,目标是为了避免同一条数据被反复生产,移除胜利的音讯才可进行后续的生产过程。
func ConsumerMessage() {log.Println("正在启动消费者...")
for {// score := time.Now().UnixNano()
values, err := rdb.ZRangeByScore("queue:message", redis.ZRangeBy{
Min: "0",
Max: fmt.Sprint(time.Now().Unix()),
Offset: 0,
Count: 1,
}).Result()
if err != nil {log.Fatalln(err)
// Redis 查问出错,提早 1 秒持续
time.Sleep(time.Second)
continue
}
if len(values) == 0 {
// 没有数据,提早 1 秒持续
// time.Sleep(time.Second)
continue
}
// 因为应用 zrangebyscore 的时候指定了 count=1, 因而此处实践上只会有一条数据
value := values[0]
num, err := rdb.ZRem("queue:message", value).Result()
if err != redis.Nil && err != nil {log.Println(err)
time.Sleep(time.Second)
continue
}
if num == 1 {log.Println("生产到数据:", value, "以后工夫是:", time.Now().Unix())
// 模仿一个耗时的操作
time.Sleep(2 * time.Second)
}
}
}
因为咱们应用秒级工夫戳作为 score,因而咱们在查问无数据时能够采纳 sleep(1) 的操作缩小对 Redis 的反复申请。
发布者订阅者模型 PubSub
Redis 里有一个独自的模块来针对一对多的单向通信计划,这就是 PubSub 模块,PubSub 模块能够用作播送模式,即一个发布者多个订阅者。
发布者订阅者模式能够了解为:订阅者(Subscriber)会在 Redis 上订阅一个通道,生产者在该通道上公布一条音讯,订阅者就会立即收到该音讯,如果咱们有多个订阅者,那么发布者公布的音讯会被多个订阅者同时收到截然不同的音讯。
咱们能够基于此模型来实现一个简略的音讯队列,应用一个发布者和一个订阅者来实现音讯的公布和订阅。
实战代码里的 Redis client 和下面一样,就不再列出了。
发布者(生产者)
发布者应用 publish 命令想频道里公布数据,订阅者即能够订阅到音讯
func ProducerMessagePubSub() {rand.Seed(time.Now().UnixNano())
for i := 0; i < 10; i++ {log.Println("正在生产一条音讯...", i)
r, err := rdb.Publish("queue:pubsub", i).Result()
if err != nil {log.Println(err,r)
}
time.Sleep(time.Duration(rand.Intn(3)) * time.Second)
}
}
订阅者(消费者)
订阅者订阅指定的频道,而后应用管道来接管音讯,并解决音讯。
func ConsumerMessagePubSub(node int){
// 订阅频道
pubsub := rdb.Subscribe("queue:pubsub")
// 用管道来接管音讯
ch := pubsub.Channel()
// 解决音讯
for msg := range ch {log.Printf("以后节点:%d,生产到数据,channel:%s;message:%s\n", node, msg.Channel, msg.Payload)
}
}
当然如果咱们会公布大量的音讯,同时会有多个消费者去生产,也能够将通道分成多个,每个通道有本人的订阅者订阅,而后发布者在公布音讯的时候依据节点 ID 或随机调配的形式调配到每个通道上来实现。
不过咱们也须要留神,以上的示例代码里,如果同时开启两个 goroutine 的话,发布者立即 Publish 音讯,而订阅者不能在第一工夫订阅到音讯,因为绝对于用 go 这个关键字去开启两个 goroutine 的话,简直是霎时的,因而肯定要先使订阅者订阅到频道后,再有公布操作。
参考:
Redis 深度历险 Redis 深度历险: 外围原理和利用实际