关于rabbitmq:RabbitMQ进阶高级用法

52次阅读

共计 8034 个字符,预计需要花费 21 分钟才能阅读完成。

音讯的去处

mandatory 和 immediate 是发送音讯办法中的两个参数,它们都有当消息传递过程中目的地不可达时将音讯返回给生产者的性能。RabbitMQ 提供的备份交换器能够将未能被交换器路由的音讯存储起来,而不必返回给客户端。

mandatory 参数

当 mandatory 参数设为 true 时,交换器无奈依据本身的类型和路由键找到一个符合条件的队列,那么 RabbitMQ 会将音讯返回给生产者。当 mandatory 参数设置为 false 时,则音讯间接被抛弃。

那么生产者如何获取到没有被正确路由到适合队列的音讯呢?这时候能够通过注册音讯返回监听器来实现:

func (ch *Channel) NotifyReturn(c chan Return) chan Return

immediate 参数

当 immediate 参数设为 true 时,如果交换器在将音讯路由到队列时发现队列上并不存在任何消费者,那么这条音讯将不会存入队列中。当与路由键匹配的所有队列都没有消费者时,该音讯会返回至生产者。

概括来说,mandatory 参数通知服务器至多将该音讯路由到一个队列中,否则将音讯返回给生产者。immediate 参数通知服务器,如果该音讯关联的队列上有消费者,则立即生产;如果所有匹配的队列上都没有消费者,则间接将音讯返还给生产看,不必将音讯存入队列而消费者了

备份交换器

生产者在发送音讯的时候如果不设置 mandatory 参数,那么音讯在未被路由的状况下将会失落。如果设置了 mandatory 参数,那么须要增加 ReturnListener 的编程逻辑,生产者的代码将变得复杂。如果既不想复杂化生产者的编程逻辑,又不想音讯失落,那么能够应用备份交换器,这样能够将未被路由的音讯存储在 RabbitMQ 中,再在须要的时候去解决这些音讯。

能够通过在申明交换器的时候设置 args 参数中的 alternate-exchange 选项来实现,也能够通过策略的形式实现。如果两者同时应用,则前者的优先级更高,会笼罩掉 Policy 的设置。

备份交换器其实和一般的交换器没有太大的区别,须要留神的是,音讯被从新发送到备份交换器时的路由键和从生产者收回的路由键是一样的。

对于备份交换器,总结了以下几种非凡状况:

  • 如果设置的备份交换器不存在,客户端和 RabbitMQ 服务端都不会有异样呈现,此时音讯会失落。
  • 如果备份交换器没有绑定任何队列,客户端和 RabbitMQ 服务端都不会有异样呈现,此时音讯会失落。
  • 如果备份交换器没有任何匹配的队列,客户端和 RabbitMQ 服务端都不会有异样呈现,此时音讯会失落。
  • 如果备份交换器和 mandatory 参数一起应用,那么 mandatory 参数有效。

过期工夫(TTL)

设置音讯的 TTL

目前有两种办法能够设置音讯的 TTL。 第一种办法是通过队列属性设置,队列中所有音讯都有雷同的过期工夫。第二种办法是对音讯自身进行独自设置,每条音讯的 TTL 能够不同。如果两种办法一起应用,则音讯的 TTL 以两者之间较小的那个数值为准。 音讯在队列中的生存工夫一旦超过设置的 TTL 值时,就会变成“死信”。

通过队列属性设置音讯 TTL 的办法是在 QueueDeclare 办法的 args 参数中退出 x -message-ttl 选项实现的,这个参数的单位是毫秒。同时也能够通过 Policy 或 HTTP API 接口设置。

如果不设置 TTL,则示意此音讯不会过期。如果将 TTL 设置为 0,则示意除非此时能够间接将音讯投递到消费者,否则该音讯会被立刻抛弃。

针对每条音讯设置 TTL 的办法是在 Publish 办法的音讯 Publishing 构造体中设置 Expiration 属性,单位为毫秒。也能够通过 HTTP API 接口设置。

对于第一种设置队列 TTL 属性的办法,一旦音讯过期,就会从队列中删除,而在第二种办法中,即便音讯过期,也不会马上从队列中删除,因为每条音讯是否过期是在行将投递到消费者之前断定的。

设置队列的 TTL

通过 QueueDeclare 办法的 args 参数中 x -expires 选项能够管制队列被主动删除前处于未应用状态的最大工夫,单位是毫秒,不能设置为 0。未应用的意思是队列上没有任何的消费者,队列也没有被从新申明,并且在过期时间段内也未调用过 Get 办法。

设置队列里的 TTL 能够利用于相似 RPC 形式的回复队列,在 RPC 中,会创立很多未被应用的队列。

RabbitMQ 会确保在过期工夫达到后将队列删除,然而不保障删除的动作有多及时。在 RabbitMQ 重启后,长久化的队列的过期工夫会被从新计算。

死信队列

DLX,全称为 Dead-Letter-Exchange,能够称之为死信交换器。当音讯在一个队列中变成死信之后,它能被从新被发送到另一个交换器中,这个交换器就是 DLX,绑定 DLX 的队列就称之为死信队列。

音讯变成死信个别是因为以下几种状况:

  • 音讯被回绝(Reject/Nack),并且设置 requeue 参数为 false。
  • 音讯过期。
  • 队列达到最大长度。

DLX 也是一个失常的交换器,和个别的交换器没有区别,它能在任何的队列上被指定(实际上就是设置某个队列的属性)。当这个队列中存在死信时,RabbitMQ 就会主动地将这个音讯从新公布到设置的 DLX 下来,进而被路由到死信队列。能够监听这个队列中的音讯以进行相应的解决,这个个性与将音讯的 TTL 设置为 0 配合应用能够补救 immeaiate 参数的性能。

通过在 QueueDeclare 办法的 args 参数中设置 x -dead-letter-exchange 选项来为这个队列增加 DLX:

err = ch.ExchangeDeclare(
        "dlx_exchange",
        "direct",
        true,
        false,
        false,
        false,
        nil,
    )
    if err != nil {log.Fatalf("%s: %s", err, "Failed to declare an exchange")
        return
    }
    args := make(map[string]interface{})
    args["x-dead-letter-exchange"] = "dlx_exchange"
    // 为队列 my_queue 增加 DLX
    q, err := ch.QueueDeclare(
        "my_queue",
        true,
        false,
        false,
        false,
        args,
    )
    if err != nil {log.Fatalf("%s: %s", err, "Failed to declare a queue")
        return
    }

也能够为这个 DLX 指定路由键,如果没有非凡指定,则应用原队列的路由键:

args["x-dead-letter-routing-key"] = "dlx_routing_key"

对于 RabbitMQ 来说,DLX 是一个十分有用的个性。它能够解决异常情况下,音讯不可能被消费者正确生产(消费者调用了 Nack 或者 Reject)而被置入死信队列中的状况,后续分析程序能够通过生产这个死信队列中的内容来剖析过后所遇到的异常情况,进而能够改善和优化零碎。

提早队列

提早队列存储的对象是对应的提早音讯,所谓“提早音讯”是指当音讯被发送当前,并不想让消费者立即拿到音讯,而是期待特定工夫后,消费者能力拿到这个音讯进行生产。

提早队列的应用场景有很多,比方:

  • 在订单零碎中,一个用户下单之后通常有 30 分钟的工夫进行领取,如果 30 分钟之内没有领取胜利,那么这个订单将进行异样解决,这时就能够应用提早队列来解决这些订单了。
  • 用户心愿通过手机近程遥控家里的智能设施在指定的工夫进行工作。这时候就能够将用户指令发送到提早队列,当指令设定的工夫到了再将指令推送到智能设施。

RabbitMQ 自身没有间接反对提早队列的性能,然而能够通过后面所介绍的 DLX 和 TTL 模拟出提早队列的性能。

假如一个利用中须要将每条音讯都设置为 10 秒的提早,能够创立两组交换器和队列:惯例交换器 exchange.normal、惯例队列 queue.normal 和死信交换器 exchange.dlx、死信队列 queue.dlx,而后为 queue.normal 增加死信交换器 exchange.dlx。生产者将音讯发送到 exchange.normal 并设置 TTL 为 10 秒,同时消费者订阅 queue.dlx 而非 queue.normal。当音讯从 queue.normal 中过期被存入 queue.dlx 中,消费者就凑巧生产到了提早 10 秒的这条音讯。

优先级队列

优先级队列,顾名思义,具备高优先级的队列具备高的优先权,优先级高的音讯具备优先被生产的特权。

优先级队列能够通过在 QueueDeclare 办法的 args 参数中设置 x -max-priority 选项来实现。不过这只是配置一个队列的最大优先级,在此之后须要在发送时设置音讯的优先级,设置形式是 Publishing 构造体中的 Priority 属性。

音讯的优先级默认最低为 0,最高为队列设置的最大优先级。优先级高的音讯能够被优先生产,不过如果在消费者的生产速度大于生产者的速度且 Broker 中没有音讯沉积的状况下,对发送的音讯设置优先级也就没有什么实际意义。

RPC 实现

RPC,是 Remote Procedure Call 的简称,即近程过程调用。它是一种通过网络从近程计算机上申请服务,而不须要理解底层网络的技术。RPC 的次要功能是让构建分布式计算更容易,在提供弱小的近程调用能力时不损失本地调用的语义简洁性。

在 RabbitMQ 中实现 RPC 也很简略。 客户端发送申请音讯,为了接管响应音讯,咱们须要在申请音讯中发送一个回调队列。 能够应用默认的队列:

err = ch.Publish(
    "","rpc_queue",
    false,
    false,
    amqp.Publishing{
        ContentType:   "text/plain",
        CorrelationId: "",        
        ReplyTo:       "",
        Body:          []byte("rpc"),
    },
)

对于代码中波及的 Publishing 构造体,这里须要用到它的两个属性:

  • ReplyTo:通常用来设置一个回调队列。
  • CorrelationId:用来关联申请和其调用 RPC 之后的回复。

如果像下面的代码中一样,为每个 RPC 申请创立一个回调队列,则是十分低效的。然而侥幸的是这里有一个通用的解决方案——能够为每个客户端创立一个繁多的回调队列。

咱们应该为每一个申请设置一个惟一的 correlationId,对于回调队列而言,在其接管到一条回复的音讯之后,它能够依据这个属性匹配到相应的申请。如果回调队列接管到一条未知 correlationId 的回复音讯,能够简略地将其抛弃。

如图所示,RPC 的解决流程如下:

  1. 当客户端启动时,创立一个匿名的回调队列。
  2. 客户端为 RPC 申请设置 replyTo 和 correlationId。
  3. 申请被发送到 rpc_queue 队列中。
  4. RPC 服务端监听 rpc_queue 队列中的申请,当申请到来时,服务端会解决并且把带有后果的音讯发送给客户端。接管的队列就是 replyTo 设定的回调队列。
  5. 客户端监听回调队列,当有音讯时,查看 correlationId 属性,如果与申请匹配,进行其余回调解决。

能够参考 RabbitMQ 官网的示例,RPC 客户端通过 RPC 来调用服务端的办法以便失去相应的斐波那契值:
rpc_server.go
rpc_client.go

生产者确认

在应用 RabbitMQ 的时候,当音讯的生产者将音讯发送进来之后,音讯到底有没有正确地达到服务器呢?RabbitMQ 针对这个问题,提供了两种解决形式:

  • 事务机制
  • 发送方确认(publisher confirm)机制

事务机制

RabbitMQ 客户端中与事务机制相干的办法有三个:

func (ch *Channel) Tx() error
func (ch *Channel) TxCommit() error
func (ch *Channel) TxRollback() error

channel.Tx 用于将以后的信道设置成事务模式,channel.TxCommit 用于提交事务,channel.TxRollback 用于事务回滚。

在通过 channel.Tx 办法开启事务之后,咱们便能够公布音讯给 RabbitMQ 了,如果事务提交胜利,则音讯肯定达到了 RabbitMQ,如果在事务提交执行之前因为 RabbitMQ 异样解体或者其余起因抛出了异样,这个时候咱们便能够将其捕捉,进而通过执行 channel.txRollback 办法来实现事务回滚。

如果要发送多条音讯,则将 channel.Publish 和 channel.TxCommit 等办法包裹进循环内即可。

事务的确可能解决音讯发送方和 RabbitMQ 之间音讯确认的问题,只有音讯胜利被 RabbitMQ 接管,事务能力提交胜利,否则便可在捕捉异样之后进行事务回滚,与此同时能够进行音讯重发。然而应用事务机制会重大影响 RabbitMQ 的性能,那么有没有更好的办法呢?RabbitMQ 提供了一个改良计划,即发送方确认机制。

发送方确认机制

生产者能够将信道设置成 confirm 模式,一旦信道进入 confirm 模式,所有在该信道下面公布的音讯都会被指派一个惟一的 ID(从 1 开始),当音讯被投递到所有匹配的队列之后,RabbitMQ 就会发送一个确认(Ack)给生产者(蕴含音讯 ID),这就使得生产者通晓音讯曾经正确达到了目的地了。如果音讯和队列是可长久化的,那么确认音讯会在音讯写入磁盘之后收回。RabbitMQ 回传给生产者的确认音讯中的 deliveryTag 蕴含了确认音讯的序号,此外 RabbitMQ 也能够设置 Ack 办法中的 multiple 参数,示意在这个序号之前的所有音讯都曾经失去了解决。

事务机制在一条音讯发送之后会使发送端阻塞,以期待 RabbitMQ 的回应,之后能力持续发送下一条音讯。相比之下, 发送方确认机制最大的益处在于它是异步的 ,一旦公布一条音讯,生产者应用程序就能够在期待信道返回确认的同时持续发送下一条音讯,当音讯最终失去确认之后,生产者应用程序便能够通过回调办法来解决该确认音讯,如果 RabbitMQ 因为本身外部谬误导致音讯失落,就会发送一条 Nack 命令,生产者应用程序同样能够在回调办法中解决该 Nack 命令。

在 confirm 模式中,所有发送的音讯都会被 Ack 或者 Nack 一次,不会呈现一条音讯既被 Ack 又被 Nack 的状况 ,不过 RabbitMQ 也并没有对音讯被 confirm 的快慢做任何保障。

上面是一个官网上的代码示例:

package main

import (
    "log"

    amqp "github.com/rabbitmq/amqp091-go"
)

func failOnError(err error, msg string) {
    if err != nil {log.Fatalf("%s: %s", msg, err)
    }
}

func main() {conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
    failOnError(err, "Failed to connect to RabbitMQ")
    defer conn.Close()

    ch, err := conn.Channel()
    failOnError(err, "Failed to open a channel")
    defer ch.Close()
    
    // 初始化接管确认音讯的通道
    confirms := make(chan amqp.Confirmation)
    // 注册通道使之监听公布的确认音讯
    ch.NotifyPublish(confirms)
    go func() {
        for confirm := range confirms {
            // 返回确认音讯
            if confirm.Ack {
                // code when messages is confirmed
                log.Printf("Confirmed")
            } else {
                // code when messages is nack-ed
                log.Printf("Nacked")
            }
        }
    }()

    // 将信道设置成 confirm 模式
    err = ch.Confirm(false)
    failOnError(err, "Failed to confirm")

    q, err := ch.QueueDeclare(
        "",    // name
        false, // durable
        false, // delete when unused
        false, // exclusive
        false, // no-wait
        nil,   // arguments
    )
    failOnError(err, "Failed to declare a queue")

    consume(ch, q.Name)
    publish(ch, q.Name, "hello")

    log.Printf("[*] Waiting for messages. To exit press CTRL+C")
    forever := make(chan bool)
    <-forever
}

func consume(ch *amqp.Channel, qName string) {
    msgs, err := ch.Consume(
        qName, // queue
        "",    // consumer
        true,  // auto-ack
        false, // exclusive
        false, // no-local
        false, // no-wait
        nil,   // args
    )
    failOnError(err, "Failed to register a consumer")

    go func() {
        for d := range msgs {log.Printf("Received a message: %s", d.Body)
        }
    }()}

func publish(ch *amqp.Channel, qName, text string) {
    err := ch.Publish("", qName, false, false, amqp.Publishing{
        ContentType: "text/plain",
        Body:        []byte(text),
    })
    failOnError(err, "Failed to publish a message")
}

注:事务机制和 publisher confirm 机制两者是互斥的,不能共存。

生产端要点

音讯散发

当 RabbitMQ 队列领有多个消费者时,队列收到的音讯将以轮询的形式分发给消费者。每条音讯只会发送给订阅列表里的一个消费者。如果当初负载减轻,那么只须要创立更多的消费者来生产解决音讯即可。

不过很多时候轮询的散发机制不那么优雅。如果某些消费者工作沉重,来不及生产那么多的音讯,而某些其余消费者因为某些起因(比方业务逻辑简略、机器性能卓越等)很快地解决完了所调配到的音讯,进而过程闲暇,这样就会造成整体利用吞吐量的降落。

这里就要用到 Qos 这个办法,它能够限度信道上的消费者所能放弃的最大未确认音讯的数量:

func (ch *Channel) Qos(prefetchCount, prefetchSize int, global bool) error
  • prefetchCount:消费者未确认音讯的个数下限。设置为 0 示意没有下限。
  • prefetchSize:消费者未确认音讯的大小下限,单位为 B。设置为 0 示意没有下限。
  • global:是否全局失效,true 示意是。 全局失效指的是信道上所有的消费者都受到 prefetchCount 和 prefetchSize 的限度(否则只有新消费者才会受到限制)。

注:Qos 办法对于拉模式有效。

音讯程序性

RabbitMQ 在不应用任何高级个性,也没有音讯失落、网络故障之类异样的状况产生,并且只有一个消费者的状况下,最好也只有一个生产者的状况下能够保障音讯的程序性。如果有多个生产者同时发送音讯,无奈确定音讯达到 Broker 的前后程序,也就无奈验证音讯的程序性。

在很多情景下,都会导致 RabbitMQ 音讯错序。如果要保障音讯的程序性,须要业务方应用 RabbitMQ 之后做进一步的解决,比方在音讯体内增加全局有序标识来实现。

正文完
 0