Go+Redis实现简略的音讯队列
前言
假如咱们有这样一个场景,零碎会在某个特定的状况下给用户推送一条音讯,可能是短信、邮件或站内信,因为该场景音讯可能会在某一个时刻同时推送大量的音讯,且主过程不心愿会阻塞。该场景对实时性要求不高,容许音讯被延时送达。在零碎的构建初期,应用业余的音讯队列中间件Rabbitmq和Kafka来实现音讯的异步推送就显得不是很不便,此时咱们能够思考应用Redis来实现简略的音讯队列。当然,如果咱们对音讯的实时性以及可靠性要求十分高,可能就须要应用MQ或kafka来实现了。
音讯队列
实现的原理是生产者将数据push到Redis的list里,消费者轮训去pop数据,如果能取到数据,则间接生产数据,如果等不到则持续循环pop数据。
以上示意图中的红色箭头就是模仿的一个生产者从左部增加数据,消费者在右侧获取数据的场景。反之一样。
然而这是就有个问题,如果队列空了怎么办?当列表为空时,消费者就会一直的轮训来获取数据,然而每次都获取不到数据,就会陷入一个取不到数据的死循环里,这不仅拉高了客户端的CPU,还拉高了Redis的QPS,并且这些拜访都是有效的。
这时咱们能够应用sleep(1)的形式去延时1秒,也能够应用Redis提供的阻塞式拜访,BRPP和BLPOP命令,消费者能够在获取不到数据的时候指定一个如果数据不存在的阻塞的超时工夫,如果在这个工夫内能取到数据,则会立刻返回,否则会返回null,当这个超时工夫设置为0的时候,示意会始终阻塞,但咱们通常并不倡议如此。如果都有多个客户端同时在阻塞期待音讯,则会依照先后顺序排序。
Redis client
首先持续先看一下redis client。
import ( "log" "github.com/go-redis/redis")// 队列的keyvar queueKey = "queue:message"var rdb *redis.Clientfunc NewRedis() { rdb = redis.NewClient(&redis.Options{ Addr: "127.0.0.1:6379", Password: "", }) pong, err := rdb.Ping().Result() if err != nil { log.Fatalln(err) } log.Println(pong,"redis success.")}
生产者Producer
生产者的具体实现代码,模仿一个随机生成音讯的生产者,应用lpush将数据增加到list里。
// 应用list生产音讯func ProducerMessageList(){ rand.Seed(time.Now().UnixNano()) log.Println("开启生产者。。。。") for i := 0;i < 10;i++ { score := time.Now().Unix() log.Println("正在生产一条音讯...", score, i) _,err := rdb.LPush(queueListKey,i).Result() if err != nil { log.Println(err) } time.Sleep(time.Duration(rand.Intn(3)) * time.Second) }}
消费者Consumer
消费者则应用rpop来从队列里pop出一条音讯进行生产,但咱们后面讲过,如果队列空了,则会一直的轮训pop音讯,会造成大量的资源的节约,因而咱们此处应用brpop命令来实现阻塞的读,阻塞读在队列没有数据时会立刻进入休眠状态,一旦有数据了,则会立刻被唤醒并弹出音讯,提早能够忽略不计。
// 应用list格局生产音讯func ConsumerMessageList() { for { // 设置一个5秒的超时工夫 value, err := rdb.BRPop(5 *time.Second,"queue:list").Result() if err == redis.Nil{ // 查问不到数据 time.Sleep(1 * time.Second) continue } if err != nil { // 查问出错 time.Sleep(1 * time.Second) continue } log.Println("生产到数据:", value, "以后工夫是:", time.Now().Unix()) time.Sleep(time.Second) }}
从代码里能够看到,咱们设置了一个5秒的阻塞读超时工夫,是因为阻塞读也不能始终阻塞,长时间的阻塞可能会被服务器端被动断开链接,而后会抛出异样,所以这里须要设置一个不是很长的阻塞超时工夫。
延时队列
实现的原理是将生产的数据应用zadd命令存入Redis,将field的的score设置成须要延时的工夫戳,如果须要立刻执行,不须要提早的话,则将score设置成以后工夫戳即可。同时独自运行一个goroutine,应用zrangebyscore 命令去截取数据,数据的score为以后工夫戳,这样一来,达到执行工夫的数据将被取出,而后咱们取第一个数据,应用zrem命令将数据移除队列,移除胜利则示意该条音讯容许被发送,否则,从新执行以上流程。
Redis client
咱们先来看一下应用go-redis实现的Redis客户端链接。
import ( "log" "github.com/go-redis/redis")// 队列的keyvar queueKey = "queue:message"var rdb *redis.Clientfunc NewRedis() { rdb = redis.NewClient(&redis.Options{ Addr: "127.0.0.1:6379", Password: "", }) pong, err := rdb.Ping().Result() if err != nil { log.Fatalln(err) } log.Println(pong,"redis success.")}
留神:这里demo环境下,我应用了一个var rdb *redis.Client,前面的代码均应用rdb间接来调用Redis,理论我的项目中依据我的项目的状况来。
生产者Producer
这里我模仿了一个随机产生音讯的状况, 咱们应用随机产生一个数字来延时一段时间,用来察看消费者是否能实时对数据的生产。
// 生产音讯func ProducerMessage() { rand.Seed(time.Now().UnixNano()) log.Println("开启生产者。。。。") for i := 0;i < 5;i++ { score := time.Now().Unix() log.Println("正在生产一条音讯...", score, i) rdb.ZAdd("queue:message", redis.Z{ Score: float64(score + 1),// 秒级工夫戳+1,示意延时1秒 Member: i, }) time.Sleep(time.Duration(rand.Intn(3)) * time.Second) }}
产生的数据将应用zadd命令存入Redis,并增加score为以后的工夫戳,并给score增加一个延时工夫,因为示例中应用的秒级工夫戳作为score,则咱们延时1秒就间接score+1即可。member里能够存咱们要延时的数据,比方将要发送的音讯。
消费者Consumer
应用zrangebyscore截取截止到以后工夫戳的音讯,如果音讯延时到以后工夫,则能够间接被截取进去,并应用zrem移除,目标是为了避免同一条数据被反复生产,移除胜利的音讯才可进行后续的生产过程。
func ConsumerMessage() { log.Println("正在启动消费者...") for { // score := time.Now().UnixNano() values, err := rdb.ZRangeByScore("queue:message", redis.ZRangeBy{ Min: "0", Max: fmt.Sprint(time.Now().Unix()), Offset: 0, Count: 1, }).Result() if err != nil { log.Fatalln(err) // Redis查问出错,提早1秒持续 time.Sleep(time.Second) continue } if len(values) == 0 { // 没有数据,提早1秒持续 // time.Sleep(time.Second) continue } // 因为应用zrangebyscore的时候指定了count=1,因而此处实践上只会有一条数据 value := values[0] num, err := rdb.ZRem("queue:message", value).Result() if err != redis.Nil && err != nil { log.Println(err) time.Sleep(time.Second) continue } if num == 1 { log.Println("生产到数据:", value, "以后工夫是:", time.Now().Unix()) // 模仿一个耗时的操作 time.Sleep(2 * time.Second) } }}
因为咱们应用秒级工夫戳作为score,因而咱们在查问无数据时能够采纳sleep(1)的操作缩小对Redis的反复申请。
发布者订阅者模型PubSub
Redis里有一个独自的模块来针对一对多的单向通信计划,这就是PubSub模块,PubSub模块能够用作播送模式,即一个发布者多个订阅者。
发布者订阅者模式能够了解为:订阅者(Subscriber)会在Redis上订阅一个通道,生产者在该通道上公布一条音讯,订阅者就会立即收到该音讯,如果咱们有多个订阅者,那么发布者公布的音讯会被多个订阅者同时收到截然不同的音讯。
咱们能够基于此模型来实现一个简略的音讯队列,应用一个发布者和一个订阅者来实现音讯的公布和订阅。
实战代码里的Redis client和下面一样,就不再列出了。
发布者(生产者)
发布者应用publish命令想频道里公布数据,订阅者即能够订阅到音讯
func ProducerMessagePubSub() { rand.Seed(time.Now().UnixNano()) for i := 0; i < 10; i++ { log.Println("正在生产一条音讯...", i) r, err := rdb.Publish("queue:pubsub", i).Result() if err != nil { log.Println(err,r) } time.Sleep(time.Duration(rand.Intn(3)) * time.Second) }}
订阅者(消费者)
订阅者订阅指定的频道,而后应用管道来接管音讯,并解决音讯。
func ConsumerMessagePubSub(node int){ //订阅频道 pubsub := rdb.Subscribe("queue:pubsub") // 用管道来接管音讯 ch := pubsub.Channel() // 解决音讯 for msg := range ch { log.Printf("以后节点:%d,生产到数据,channel:%s;message:%s\n", node, msg.Channel, msg.Payload) }}
当然如果咱们会公布大量的音讯, 同时会有多个消费者去生产,也能够将通道分成多个,每个通道有本人的订阅者订阅,而后发布者在公布音讯的时候依据节点ID或随机调配的形式调配到每个通道上来实现。
不过咱们也须要留神,以上的示例代码里,如果同时开启两个goroutine的话,发布者立即Publish音讯,而订阅者不能在第一工夫订阅到音讯,因为绝对于用go这个关键字去开启两个goroutine的话,简直是霎时的,因而肯定要先使订阅者订阅到频道后,再有公布操作。
参考:
Redis深度历险Redis 深度历险: 外围原理和利用实际