乐趣区

关于golang:Golang-实现-RabbitMQ-的延迟队列

读本文之前,你应该曾经理解 RabbitMQ 的一些概念,如队列、交换机之类。

提早队列简介

一个队列中的音讯在提早一段时间后才被消费者生产,这样的队列能够称之为提早队列。

提早队列的利用场景非常宽泛,如:下单后 30 分钟内未付款则勾销订单;在某个工夫下发一条告诉等。

通过死信实现提早队列

通过 Golang 实现 RabbitMQ 的死信队列的介绍,咱们能够很容易的实现一个提早队列。

  1. 将失常队列的消费者勾销;
  2. 发消息时设置 TTL;

通过下面两点,失常队列的音讯始终不会被生产,而是期待音讯 TTL 到期,进入死信队列,让死信消费者进行生产,从而达到提早队列的成果。

下面看上去仿佛没什么问题,实测一下就会发现 音讯不会“如期死亡”

当先生产一个 TTL 为 60s 的音讯,再生产一个 TTL 为 5s 的音讯,第二个音讯并不会再 5s 后过期进入死信队列,而是须要等到第一个音讯 TTL 到期后,与第一个音讯一起进入死信队列。这是因为 RabbitMQ 只会判断队列中的第一个音讯是否过期。

通过插件实现提早队列

架构

对于上文的问题,天然有解决办法,那就是通过 RabbitMQ 的 rabbitmq_delayed_message_exchange 插件来解决。本文不赘述 RabbitMQ 和插件的装置,你能够参考此文装置或应用 Docker 来装置。

此插件的原理是将音讯在 交换机处 暂存储在 mnesia(一个分布式数据系统)表中,提早投递到队列中,等到音讯到期再投递到队列当中。

简略理解了插件的原理,咱们便能够如此设计提早队列。

实现

生产者实现的关键点:

1. 在申明交换机时不在是 direct 类型,而是 x-delayed-message 类型,这是由插件提供的类型;

2. 交换机要减少 "x-delayed-type": "direct" 参数设置;

3. 公布音讯时,要在 Headers 中设置 x-delay 参数,来管制音讯从交换机过期工夫;

err = mqCh.Publish(constant.Exchange1, constant.RoutingKey1, false, false, amqp.Publishing{
    ContentType: "text/plain",
    Body:        []byte(message),
    //Expiration: "10000", // 音讯过期工夫(音讯级别), 毫秒
    Headers: map[string]interface{}{"x-delay": "5000", // 音讯从交换机过期工夫, 毫秒(x-dead-message 插件提供)},
})

生产者残缺代码:

// producter.go
package main

import (
    "fmt"
    "github.com/streadway/amqp"
    "learn_gin/go/rabbitmq/delayletter/constant"
    "learn_gin/go/rabbitmq/util"
    "strconv"
    "time"
)

func main() {
    // # ========== 1. 创立连贯 ==========
    mq := util.NewRabbitMQ()
    defer mq.Close()
    mqCh := mq.Channel

    // # ========== 2. 设置队列(队列、交换机、绑定)==========
    // 申明队列
    var err error
    _, err = mqCh.QueueDeclare(constant.Queue1, true, false, false, false, amqp.Table{// "x-message-ttl": 60000, // 音讯过期工夫(队列级别), 毫秒})
    util.FailOnError(err, "创立队列失败")

    // 申明交换机
    //err = mqCh.ExchangeDeclare(Exchange1, amqp.ExchangeDirect, true, false, false, false, nil)
    err = mqCh.ExchangeDeclare(constant.Exchange1, "x-delayed-message", true, false, false, false, amqp.Table{"x-delayed-type": "direct",})
    util.FailOnError(err, "创立交换机失败")

    // 队列绑定(将队列、routing-key、交换机三者绑定到一起)err = mqCh.QueueBind(constant.Queue1, constant.RoutingKey1, constant.Exchange1, false, nil)
    util.FailOnError(err, "队列、交换机、routing-key 绑定失败")

    // # ========== 4. 公布音讯 ==========
    message := "msg" + strconv.Itoa(int(time.Now().Unix()))
    fmt.Println(message)
    // 公布音讯
    err = mqCh.Publish(constant.Exchange1, constant.RoutingKey1, false, false, amqp.Publishing{
        ContentType: "text/plain",
        Body:        []byte(message),
        //Expiration: "10000", // 音讯过期工夫(音讯级别), 毫秒
        Headers: map[string]interface{}{"x-delay": "5000", // 音讯从交换机过期工夫, 毫秒(x-dead-message 插件提供)},
    })
    util.FailOnError(err, "音讯公布失败")
}

因为在生产者端建设队列和交换机,所以消费者并不需要非凡的设置,间接附代码。

消费者残缺代码:

// consumer.go
package main

import (
    "learn_gin/go/rabbitmq/delayletter/constant"
    "learn_gin/go/rabbitmq/util"
    "log"
)

func main() {
    // # ========== 1. 创立连贯 ==========
    mq := util.NewRabbitMQ()
    defer mq.Close()
    mqCh := mq.Channel

    // # ========== 2. 生产音讯 ==========
    msgsCh, err := mqCh.Consume(constant.Queue1, "", false, false, false, false, nil)
    util.FailOnError(err, "生产队列失败")

    forever := make(chan bool)
    go func() {
        for d := range msgsCh {
            // 要实现的逻辑
            log.Printf("接管的音讯: %s", d.Body)

            // 手动应答
            d.Ack(false)
            //d.Reject(true)
        }
    }()
    log.Printf("[*] Waiting for message, To exit press CTRL+C")
    <-forever
}

end!

源码 Mr-houzi/go-demo

退出移动版