乐趣区

golang-rabbitmq的使用四

之前几篇说了类似广播的 fanout 类型的 Exchange,支持分类的 direct 类型的 Exchange。在使用 direct 类型的 Exchange 中使用了 log 的例子,我们可以区分 info, debug, warn, error 类型的 log。但是实际中可能还会有更进一步的需求类似,我希望看到系统内核的 error 日志信息,希望看到请求耗时最长的接口的 debug 日志。对于这样的需求就可以使用 topic 类型的 exchange。

使用 topic 类型的 exchange 方法如下

(1)在 producer 和 consumer 中都声明相同 topic 类型的 exchange

err = ch.ExchangeDeclare(
                "logs_topic", // name
                "topic",      // type
                true,         // durable
                false,        // auto-deleted
                false,        // internal
                false,        // no-wait
                nil,          // arguments
        )

(2)在 producer 中发布消息时指定 routing key

err = ch.Publish(
                "logs_topic",          // exchange
                "error.kernel", // routing key
                false, // mandatory
                false, // immediate
                amqp.Publishing{
                        ContentType: "text/plain",
                        Body:        []byte(body),
                })

(3)在 consumer 中绑定 queue 到 exchange

err = ch.QueueBind(
                        q.Name,         // queue name
                        "error.kernel", // routing key
                        "logs_topic",   // exchange
                        false,
                        nil)
routing key 定义

routing key 可以明确指定明确的匹配也可使用通配符进行匹配。
通配符 ”#” 代表零个或多个 words
通配符 ”*” 代表一个词 (word)
routing key 如果指定为 ”#” 则接受所有的消息,类似 fanout 类型的 exchange
不带通配符 ”#” 或 ”*” 的 routing key 则接受指定的消息,类似 direct 类型的 exchange
routing key 使用以 ”.” 分割的层级结构。

例如:
"info.payment.vip" 是只消费 vip 支付的 info 消息。"info.payment.*" 接受所有支付的 info 消息

具体代码如下
conf.go

consumer.go

package main

import (
    config "binTest/rabbitmqTest/t1/l5/conf"
    "fmt"
    "log"
    "os"

    "github.com/streadway/amqp"
)

/*


./consumer "#" info.payment.* *.log debug.payment.#

*/

func main() {conn, err := amqp.Dial(config.RMQADDR)
    failOnError(err, "Failed to connect to RabbitMQ")
    defer conn.Close()

    forever := make(chan bool)

    ch, err := conn.Channel()
    failOnError(err, "Failed to open a channel")
    defer ch.Close()

    err = ch.ExchangeDeclare(
        config.EXCHANGENAME, //exchange name
        "topic",             //exchange kind
        true,                //durable
        false,               //autodelete
        false,
        false,
        nil,
    )

    failOnError(err, "Failed to declare exchange")

    if len(os.Args) < 2 {log.Println(len(os.Args))
        log.Println(`"Arguments error(Example: ./consumer"#"info.payment.* *.log debug.payment.#"`)
        return
    }

    topics := os.Args[1:]
    topicsCnt := len(topics)

    for routing := 0; routing < topicsCnt; routing++ {go func(routingNum int) {

            q, err := ch.QueueDeclare(
                "",
                false, //durable
                false, //delete when unused
                true,  //exclusive
                false, //no-wait
                nil,   //arguments
            )

            failOnError(err, "Failed to declare a queue")

            err = ch.QueueBind(
                q.Name,
                topics[routingNum],
                config.EXCHANGENAME,
                false,
                nil,
            )
            failOnError(err, "Failed to bind exchange")

            msgs, err := ch.Consume(
                q.Name,
                "",
                true, //Auto Ack
                false,
                false,
                false,
                nil,
            )

            failOnError(err, "Failed to register a consumer")

            for msg := range msgs {log.Printf("In %s consume a message: %s\n", topics[routingNum], msg.Body)
            }

        }(routing)
    }

    <-forever
}

func failOnError(err error, msg string) {
    if err != nil {fmt.Printf("%s: %s\n", msg, err)
    }
}

producer.go

package main

import (
    config "binTest/rabbitmqTest/t1/l5/conf"
    "fmt"
    "log"
    "os"

    "github.com/streadway/amqp"
)

func main() {conn, err := amqp.Dial(config.RMQADDR)
    failOnError(err, "Failed to connect to RabbitMQ")
    defer conn.Close()

    ch, err := conn.Channel()
    failOnError(err, "Failed to open a channel")
    defer ch.Close()

    err = ch.ExchangeDeclare(
        config.EXCHANGENAME, //exchange name
        "topic",             //exchange kind
        true,                //durable
        false,               //autodelete
        false,
        false,
        nil,
    )

    failOnError(err, "Failed to declare exchange")

    if len(os.Args) < 3 {fmt.Println("Arguments error(ex:producer topic msg1 msg2 msg3")
        return
    }

    routingKey := os.Args[1]

    msgs := os.Args[2:]

    msgNum := len(msgs)

    for cnt := 0; cnt < msgNum; cnt++ {msgBody := msgs[cnt]
        err = ch.Publish(
            config.EXCHANGENAME, //exchange
            routingKey,          //routing key
            false,
            false,
            amqp.Publishing{
                ContentType: "text/plain",
                Body:        []byte(msgBody),
            })

        log.Printf("[x] Sent %s", msgBody)
    }
    failOnError(err, "Failed to publish a message")

}

func failOnError(err error, msg string) {
    if err != nil {fmt.Printf("%s: %s\n", msg, err)
    }
}

运行结果
consumer

producer

根据结果可以看出
consumer 指定了 4 个 routing key

  • “#” 接收所有消息。producer 的所有消息都会接收
  • “info.payment.*” 接受所有 info.payment 的消息。
    producer 发出的如下消息会被接收
    ./producer info.payment.googlepay “start payment at 16:00”
  • “*.log” 接受所有第二层级为 log 的消息
    producer 发出的如下消息会被接收
    ./producer info.log “start recording payment log”
  • “debug.payment.#” 接收所有发往 routing key 为 debug.payment 开头的消息
    producer 发出的如下消息会被接收
    ./producer debug.payment.Alipay “10 yuan payment at 15:25” “1000 dollors payment at 15:26” “10 pounds payment at 15:27”
    如下的消息也会被接收(因为 #代表零个或多个 words)
    ./producer debug.payment.payinfo.Alipay “test payment info”

详细代码在如下
https://github.com/BinWang-sh…

退出移动版