关于后端:基于RabbitMQ构建延迟队列

30次阅读

共计 2195 个字符,预计需要花费 6 分钟才能阅读完成。

提早工作在业务中是一个很常见的需要,比方:

订单下单 15 分钟之后,用户没有领取,则主动勾销订单
用户做了某些操作,5 分钟之后发短信揭示用户
诸如此类的场景亘古未有,一种最常见的实现形式,就是开启一个定时工作,而后始终轮询数据库,这种实现形式在数据量小的时候还好,然而数据量一旦过大,这轮询数据库就会给数据库造成很大的压力,此时全面扫表的实现形式就显得不牢靠了。

另外一种实现形式,就是用提早队列的形式来实现,然而 RabbitMQ 自身是没有实现提早队列的,不过能够应用 TTL+ 死信队列的形式来实现提早队列。

音讯的 TTL

TTL 全称 Time To Live,即生存工夫。音讯的 TTL 也就是音讯的生存工夫。在 RabbitMQ 中设置 TTL 有两种

第一种是申明队列的时候,在队列的属性中设置 TTL,这样该队列中的音讯都会有雷同的有效期
第二种是发送音讯时给音讯设置属性,能够为每条音讯都设置不同的 TTL
如果两者都设置,生存工夫取两者最小的那一个。这里咱们采纳第二种,即为每条音讯设置 TTL

死信交换机 / 死信队列

一个音讯在满足如下的条件的时候,就会变成“死信”,并且能被投递到死信交换机(Dead-Letter-Exchange),最初进入到死信交换机绑定的队列,也称死信队列(Dead-Letter-Queue)

  • 音讯被回绝而且 requeue=false
  • 音讯的 TTL 到了,即音讯过期
  • 队列排满了,排在后面的音讯会被抛弃或者扔到死信路由上

死信交换机和一般的交换机是没有区别的,只是某一个设置死信交换机的队列中有音讯过期了,会主动触发音讯的转发,发送到死信交换机中去,再由死信交换机转发到死信队列中。死信队列也是一个一般的队列,并没有什么其它非凡的。

提早队列的实现

接着来看看 TTL+ 死信交换机是如何实现提早队列的

下面的流程就是实现提早队列的思路,比方说 15 分钟勾销订单,那么用户下单之后,音讯的 TTL 设置为 15 分钟,当音讯在 Queue1 待的工夫到了 15 分钟,那么就会被转发到 Dead-Letter-Exchange,从而转发到 Dead-Letter-Queue,最初被消费者生产,实现提早工作。

先在 RabbitMQ 控制台创立一个名为 dlx 的交换机,作为死信交换机,并绑定上一个 dlxQueue 队列,作为 Dead-Letter-Queue

// 生产者.go
package main

import (
    "github.com/streadway/amqp"
    "mq/fail"
)

func main() {conn, err := amqp.Dial("amqp://123:123@localhost:5672")
    fail.OnError(err)
    defer conn.Close()

    ch, err := conn.Channel()
    fail.OnError(err)
    defer ch.Close()

    args := amqp.Table{"x-dead-letter-exchange": "dlx"} 
    q, err := ch.QueueDeclare("test", true, false, false, false, args) // 申明一个 test 队列,并设置队列的死信交换机为 "dlx"

    body := "hello world1"
    for i := 0; i < 10; i++ {
        err = ch.Publish("", q.Name, false, false, amqp.Publishing{Body:       []byte(body),
            Expiration: "5000", // 设置 TTL 为 5 秒
        })
        fail.OnError(err)
    }
}

启动生产者,能够看到音讯被投递到 test 队列中

5 秒之后,音讯被转发到 dlxQueue 队列中

之后有一个消费者,专门解决这个 dlxQueu 队列中的音讯

// 消费者.go
package main

import (
    "fmt"
    "github.com/streadway/amqp"
    "mq/fail"
)

func main() {conn, err := amqp.Dial("amqp://123:123@localhost:5672")
    fail.OnError(err)

    c, err := conn.Channel()
    fail.OnError(err)

    msgs, err := c.Consume("dlxQueue", "", true, false, false, false, nil) // 监听 dlxQueue 队列
    fail.OnError(err)

    for d := range msgs {fmt.Printf("收到信息: %s\n", d.Body) // 收到音讯,业务解决
    }
}
// 5 秒之后,打印
// 收到信息: hello world1
// 收到信息: hello world1
// 收到信息: hello world1
// 收到信息: hello world1
// 收到信息: hello world1
// 收到信息: hello world1
// 收到信息: hello world1
// 收到信息: hello world1
// 收到信息: hello world1
// 收到信息: hello world1

总结
应用 TTL+ 死信交换机实现提早工作还是十分不便的,除此之外还能够应用相干的插件 abbitmq-delayed-message-exchange,来实现提早队列,也是十分的不便。

转自:

juejin.cn/post/6844904142155022344

本文由 mdnice 多平台公布

正文完
 0