读本文之前,你应该曾经理解 RabbitMQ 的一些概念,如队列、交换机之类。

死信概念

艰深来讲,无奈被失常生产的音讯,咱们能够称之为死信。咱们将其放入死信队列,独自解决这部分“异样”音讯。

当音讯合乎以下的一个条件时,将会称为死信。

  • 音讯被回绝,不从新放回队列(应用 basic.reject / basic.nack 办法回绝音讯,并且这两个办法的参数 requeue = false)
  • 音讯TTL过期
  • 队列达到最大长度

利用

利用场景:当消费者无奈失常生产音讯、音讯产生异样时,为了保证数据不失落,将异样的音讯置为死信,放入死信队列。在死信队列中的音讯,将启动独自的生产程序非凡解决。

架构图:

上面跟着架构图来实现代码。

生产者

一个生产者一般来说只须要做两件事,一是创立链接,二是发送音讯。

RabbitMQ 中波及的队列、交换机、routing-key,这些都须要在代码中实现创立。这些操作既能够由生产者创立,也能够由消费者创立。对于谁来创立的探讨,见RabbitMq:谁来创立队列和交换机?此文。

本文中队列、交换机、routing-key 放到生产者一方来实现。所以生产者一共须要做这几件事。

  1. 创立连贯
  2. 设置队列(队列、交换机、绑定)
  3. 设置死信队列(队列、交换机、绑定)
  4. 公布音讯

创立连贯

利用streadway/amqp包,与RabbitMQ 建设连贯。

func main() {    mq := util.NewRabbitMQ()    defer mq.Close()    mqCh := mq.Channel    ……}……// util.NewRabbitMQ()func NewRabbitMQ() *RabbitMQ {    conn, err := amqp.Dial(constant.MqUrl)    FailOnError(err, "Failed to connect to RabbitMQ")    ch, err := conn.Channel()    FailOnError(err, "Failed to open a channel")    return &RabbitMQ{        Conn: conn,        Channel: ch,    }}

设置队列(队列、交换机、绑定)

外围操作就是设置队列阶段。

申明一般队列,并指定死信交换机、指定死信routing-key。后续死信队列创立后会与死信交换机、指定死信routing-key进行绑定

var err error_, err = mqCh.QueueDeclare(constant.NormalQueue, true, false, false, false, amqp.Table{    "x-message-ttl": 5000, // 音讯过期工夫,毫秒    "x-dead-letter-exchange": constant.DeadExchange, // 指定死信交换机    "x-dead-letter-routing-key": constant.DeadRoutingKey, // 指定死信routing-key})util.FailOnError(err, "创立normal队列失败")

申明交换机

err = mqCh.ExchangeDeclare(constant.NormalExchange, amqp.ExchangeDirect, true, false, false, false, nil)util.FailOnError(err, "创立normal交换机失败")

目前,一般队列和交换机都曾经创立,但它们都是独立存在,没有关联。

通过 QueueBind 将队列、routing-key、交换机三者绑定到一起。

err = mqCh.QueueBind(constant.NormalQueue, constant.NormalRoutingKey, constant.NormalExchange, false, nil)util.FailOnError(err, "normal:队列、交换机、routing-key 绑定失败")

设置死信队列(队列、交换机、绑定)

同样死信队列,也须要创立队列、创立交换机和绑定。

// 申明死信队列// args 为 nil。切记不要给死信队列设置音讯过期工夫,否则生效的音讯进入死信队列后会再次过期。_, err = mqCh.QueueDeclare(constant.DeadQueue, true, false, false, false, nil)util.FailOnError(err, "创立dead队列失败")// 申明交换机err = mqCh.ExchangeDeclare(constant.DeadExchange, amqp.ExchangeDirect, true, false, false, false, nil)util.FailOnError(err, "创立dead队列失败")// 队列绑定(将队列、routing-key、交换机三者绑定到一起)err = mqCh.QueueBind(constant.DeadQueue, constant.DeadRoutingKey, constant.DeadExchange, false, nil)util.FailOnError(err, "dead:队列、交换机、routing-key 绑定失败")

当死信队列建设结束,一般队列通过 x-dead-letter-exchangex-dead-letter-routing-key 参数的指定,便可失效,死信队列便与一般队列连通。

公布音讯

message := "msg" + strconv.Itoa(int(time.Now().Unix()))fmt.Println(message)// 公布音讯err = mqCh.Publish(constant.NormalExchange, constant.NormalRoutingKey, false, false, amqp.Publishing{    ContentType: "text/plain",    Body:        []byte(message),})util.FailOnError(err, "音讯公布失败")

生产者残缺代码

package mainimport (    "fmt"    "github.com/streadway/amqp"    "learn_gin/go/rabbitmq/deadletter/constant"    "learn_gin/go/rabbitmq/deadletter/util"    "strconv"    "time")func main() {    // # ========== 1.创立连贯 ==========    mq := util.NewRabbitMQ()    defer mq.Close()    mqCh := mq.Channel    // # ========== 2.设置队列(队列、交换机、绑定) ==========    // 申明队列    var err error    _, err = mqCh.QueueDeclare(constant.NormalQueue, true, false, false, false, amqp.Table{        "x-message-ttl": 5000, // 音讯过期工夫,毫秒        "x-dead-letter-exchange": constant.DeadExchange, // 指定死信交换机        "x-dead-letter-routing-key": constant.DeadRoutingKey, // 指定死信routing-key    })    util.FailOnError(err, "创立normal队列失败")    // 申明交换机    err = mqCh.ExchangeDeclare(constant.NormalExchange, amqp.ExchangeDirect, true, false, false, false, nil)    util.FailOnError(err, "创立normal交换机失败")    // 队列绑定(将队列、routing-key、交换机三者绑定到一起)    err = mqCh.QueueBind(constant.NormalQueue, constant.NormalRoutingKey, constant.NormalExchange, false, nil)    util.FailOnError(err, "normal:队列、交换机、routing-key 绑定失败")    // # ========== 3.设置死信队列(队列、交换机、绑定) ==========    // 申明死信队列    // args 为 nil。切记不要给死信队列设置音讯过期工夫,否则生效的音讯进入死信队列后会再次过期。    _, err = mqCh.QueueDeclare(constant.DeadQueue, true, false, false, false, nil)    util.FailOnError(err, "创立dead队列失败")    // 申明交换机    err = mqCh.ExchangeDeclare(constant.DeadExchange, amqp.ExchangeDirect, true, false, false, false, nil)    util.FailOnError(err, "创立dead队列失败")    // 队列绑定(将队列、routing-key、交换机三者绑定到一起)    err = mqCh.QueueBind(constant.DeadQueue, constant.DeadRoutingKey, constant.DeadExchange, false, nil)    util.FailOnError(err, "dead:队列、交换机、routing-key 绑定失败")    // # ========== 4.公布音讯 ==========    message := "msg" + strconv.Itoa(int(time.Now().Unix()))    fmt.Println(message)    // 公布音讯    err = mqCh.Publish(constant.NormalExchange, constant.NormalRoutingKey, false, false, amqp.Publishing{        ContentType: "text/plain",        Body:        []byte(message),    })    util.FailOnError(err, "音讯公布失败")}

消费者

因为队列、交换机都交由生产者来创立,消费者只需做两件,一是建设连贯、二是生产音讯。

也因为这个起因,消费者要晚于生产者启动,能够保障生产的时候,队列是存在的。

package mainimport (    "learn_gin/go/rabbitmq/deadletter/constant"    "learn_gin/go/rabbitmq/deadletter/util"    "log")func main() {    // # ========== 1.创立连贯 ==========    mq := util.NewRabbitMQ()    defer mq.Close()    mqCh := mq.Channel    // # ========== 2.生产音讯 ==========    msgsCh, err := mqCh.Consume(constant.NormalQueue, "", false, false, false, false, nil)    util.FailOnError(err, "生产normal队列失败")    forever := make(chan bool)    go func() {        for d := range msgsCh {            // 要实现的逻辑            log.Printf("接管的音讯: %s", d.Body)            // 手动应答            d.Ack(false)            //d.Reject(true)        }    }()    log.Printf("[*] Waiting for message, To exit press CTRL+C")    <-forever}

死信消费者

死信队列、交换机都也交由生产者来创立了,死信消费者也只需做两件,建设连贯和生产音讯。

package mainimport (    "learn_gin/go/rabbitmq/deadletter/constant"    "learn_gin/go/rabbitmq/deadletter/util"    "log")func main() {    // # ========== 1.创立连贯 ==========    mq := util.NewRabbitMQ()    defer mq.Close()    mqCh := mq.Channel    // # ========== 2.生产死信音讯 ==========    msgsCh, err := mqCh.Consume(constant.DeadQueue, "", false, false, false, false, nil)    util.FailOnError(err, "生产dead队列失败")    forever := make(chan bool)    go func() {        for d := range msgsCh {            // 要实现的逻辑            log.Printf("接管的音讯: %s", d.Body)            // 手动应答            d.Ack(false)            //d.Reject(true)        }    }()    log.Printf("[*] Waiting for message, To exit press CTRL+C")    <-forever}

源码Mr-houzi/go-demo

end!

集体博客同步文章 Golang 实现 RabbitMQ 的死信队列