乐趣区

golang-rabbitmq的使用五

先说一个实际的业务场景:
Client 端有一个请求需要进行耗时处理或者查询,这个处理在 Server 端做。Server 端处理完后通知给请求的 Client 端。
这种场景可以称之为 RPC(Remote Procedure Call)

有两个点说明一下:

  • <1>Client 端发送请求给 Server 端可以简单定义一个 Queue。Client 作为 Producer 发布消息,Server 端作为 Consumer 消费消息
  • <2>Server 端处理完耗时处理后需要将处理结果返回给请求的客户端。

    • 可以在 Client 声明一个不指定名称的 Queue,系统会自动生成一个随机名称的 Queue。将 Queue 的名称在 publish 时发送给 Server 端
    • 因为 Server 端要将处理结果返回给对应的请求,所以在 Client 端需要生成一个 CorrelationId 发送给 Server 端

处理流程

Client 端
(1) 声明从 Server 返回消息用的 queue

// Declare the queue on which the client will receive the server's reply.
// The name is left empty; the name actually assigned is read back later
// from respQueue.Name.
respQueue, err := ch.QueueDeclare(
        "",    // name
        false, // durable
        false, // delete when unused
        true,  // exclusive
        false, // noWait
        nil,   // arguments
    )

(2)发送请求消息到 rpc_queue

    // Publish the request with config.QUEUENAME as the routing key.
    // ReplyTo carries the name of the client's reply queue and CorrelationId
    // carries the ID the client later uses to recognise the matching reply.
    err = ch.Publish(
        "",               // exchange
        config.QUEUENAME, // routing key
        false,            // mandatory
        false,            // immediate
        amqp.Publishing{
            ContentType:   "text/plain",
            CorrelationId: correlationID,
            ReplyTo:       respQueue.Name,
            Body:          []byte(msgBody),
        })

correlationID 为 Client 端自己随机生成的 Id

Server 端
(3) 声明 rpc_queue, 从 rpc_queue 中消费消息

// Declare the request queue named by config.QUEUENAME.
q, err := ch.QueueDeclare(
                config.QUEUENAME, // name
                false,            // durable
                false,            // delete when unused
                false,            // exclusive
                false,            // noWait
                nil,              // arguments
            )

            // Start consuming requests from that queue. Auto-ack is off, so
            // every delivery has to be acknowledged explicitly.
            msgs, err := ch.Consume(
                q.Name, // queue
                "",     // consumer tag
                false,  // auto ack
                false,  // exclusive
                false,  // noLocal
                false,  // noWait
                nil,    // arguments
            )

(4)执行处理后使用 msg 中的 ReplyTo 返回处理结果给 Client

    // Send the result to the queue named in the request's ReplyTo field and
    // copy the request's CorrelationId onto the reply.
    err = ch.Publish(
        "",          // exchange
        msg.ReplyTo, // routing key
        false,       // mandatory
        false,       // immediate
        amqp.Publishing{
            ContentType:   "text/plain",
            CorrelationId: msg.CorrelationId,
            Body:          []byte(bookName),
        })

    // Acknowledge the request; false is the "multiple" flag.
    msg.Ack(false)

(5)Client 端从 reply queue 中接收从 Server 端来的 response

// Consume replies from the client's own reply queue.
respMsgs, err := ch.Consume(
        respQueue.Name, // queue
        "",             // consumer tag
        true,  // auto-ack
        true,  // exclusive
        false, // noLocal
        false, // nowait
        nil,   // arguments
    )

详细代码如下
conf.go

package config

// Settings shared by the RPC client and the RPC server.
const (
    // SERVERINSTANCESCNT is the number of consumer goroutines the server
    // starts; the server also passes it to Qos as the prefetch count.
    SERVERINSTANCESCNT = 5

    // QUEUENAME is the queue the client publishes requests to and the
    // server consumes them from.
    QUEUENAME = "rpc_queue"

    // RMQADDR is the AMQP URL both programs hand to amqp.Dial.
    RMQADDR = "amqp://guest:guest@172.17.84.205:5672/"
)

client.go

package main

import (
    config "binTest/rabbitmqTest/t1/l6/conf"
    "fmt"
    "log"
    "math/rand"
    "os"

    "github.com/streadway/amqp"
)

// main sends the first command-line argument as an RPC request to
// config.QUEUENAME and prints the body of the reply whose CorrelationId
// matches the one attached to the request.
func main() {
    if len(os.Args) < 2 {
        log.Println("Arguments error")
        return
    }

    conn, err := amqp.Dial(config.RMQADDR)
    failOnError(err, "Failed to connect to RabbitMQ")
    defer conn.Close()

    ch, err := conn.Channel()
    failOnError(err, "Failed to open a channel")
    defer ch.Close()

    msgBody := os.Args[1]

    // Reply queue for this client. The name is left empty; the assigned
    // name is read back from respQueue.Name and sent along as ReplyTo.
    respQueue, err := ch.QueueDeclare(
        "",    // name
        false, // durable
        false, // delete when unused
        true,  // exclusive
        false, // noWait
        nil,   // arguments
    )
    failOnError(err, "Failed to declare a response queue")

    correlationID := randomID(32)

    err = ch.Publish(
        "",               // exchange
        config.QUEUENAME, // routing key
        false,            // mandatory
        false,            // immediate
        amqp.Publishing{
            ContentType:   "text/plain",
            CorrelationId: correlationID,
            ReplyTo:       respQueue.Name,
            Body:          []byte(msgBody),
        })
    // Check the publish result before announcing that the message was sent.
    failOnError(err, "Failed to publish a message")
    log.Printf("[x] Sent %s", msgBody)

    respMsgs, err := ch.Consume(
        respQueue.Name, // queue
        "",             // consumer tag
        true,           // auto-ack
        true,           // exclusive
        false,          // noLocal
        false,          // nowait
        nil,            // arguments
    )
    // The original dropped this error and went straight to ranging over
    // respMsgs.
    failOnError(err, "Failed to register a consumer")

    // Skip any delivery that does not carry this request's correlation ID
    // and stop at the first one that does.
    for item := range respMsgs {
        if item.CorrelationId == correlationID {
            fmt.Println("response:", string(item.Body))
            break
        }
    }
}

// failOnError aborts the program when err is non-nil.
//
// It logs "<msg>: <err>" and then panics (log.Panicf), so the caller never
// carries on with the value that accompanied the error. A nil err is a no-op.
func failOnError(err error, msg string) {
    if err != nil {
        log.Panicf("%s: %s", msg, err)
    }
}

// randomID returns an identifier of exactly length characters drawn from
// ASCII digits and letters, or "" when length is not positive.
//
// The characters come from math/rand's global source, so the result is not
// suitable as a secret.
// NOTE(review): on Go toolchains older than 1.20 the global source yields the
// same sequence on every run unless rand.Seed is called — confirm the
// toolchain or seed it at start-up.
func randomID(length int) string {
    if length <= 0 {
        return ""
    }

    // The original stored byte(rand.Intn(9)), i.e. the control bytes
    // 0x00-0x08, which are unprintable and give only 9 values per position.
    const alphabet = "0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz"

    id := make([]byte, length)
    for i := range id {
        id[i] = alphabet[rand.Intn(len(alphabet))]
    }

    return string(id)
}

server.go

package main

import (
    config "binTest/rabbitmqTest/t1/l6/conf"
    "fmt"
    "log"
    "math/rand"
    "time"

    "github.com/streadway/amqp"
)

// main starts config.SERVERINSTANCESCNT consumers on config.QUEUENAME. Each
// consumer answers a request by publishing the result of queryBookID to the
// queue named in the request's ReplyTo field, tagged with the request's
// CorrelationId, and then acknowledges the request.
func main() {
    conn, err := amqp.Dial(config.RMQADDR)
    failOnError(err, "Failed to connect to RabbitMQ")
    defer conn.Close()

    ch, err := conn.Channel()
    failOnError(err, "Failed to open a channel")
    defer ch.Close()

    err = ch.Qos(
        config.SERVERINSTANCESCNT, // prefetch count
        0,                         // prefetch size
        false,                     // global
    )
    // The original ignored this error.
    failOnError(err, "Failed to set QoS")

    // Declare the request queue once, before any worker starts, instead of
    // repeating the identical declaration inside every goroutine.
    q, err := ch.QueueDeclare(
        config.QUEUENAME, // name
        false,            // durable
        false,            // delete when unused
        false,            // exclusive
        false,            // noWait
        nil,              // arguments
    )
    failOnError(err, "Failed to declare a queue")

    forever := make(chan bool)

    for routine := 0; routine < config.SERVERINSTANCESCNT; routine++ {
        // Register each consumer here, one after another, so the workers
        // themselves only handle deliveries.
        msgs, err := ch.Consume(
            q.Name, // queue
            "",     // consumer tag
            false,  // auto ack
            false,  // exclusive
            false,  // noLocal
            false,  // noWait
            nil,    // arguments
        )
        // The original ignored this error.
        failOnError(err, "Failed to register a consumer")

        go func(routineNum int, msgs <-chan amqp.Delivery) {
            for msg := range msgs {
                log.Printf("In %d start consuming message: %s\n", routineNum, msg.Body)

                bookName := queryBookID(string(msg.Body))

                err := ch.Publish(
                    "",          // exchange
                    msg.ReplyTo, // routing key
                    false,       // mandatory
                    false,       // immediate
                    amqp.Publishing{
                        ContentType:   "text/plain",
                        CorrelationId: msg.CorrelationId,
                        Body:          []byte(bookName),
                    })

                if err != nil {
                    fmt.Println("Failed to reply msg to client:", err)
                } else {
                    fmt.Println("Response to client:", bookName)
                }

                // As in the original, the request is acknowledged whether
                // or not the reply could be published.
                if err := msg.Ack(false); err != nil {
                    log.Printf("Failed to ack message: %s", err)
                }
            }
        }(routine, msgs)
    }

    // Nothing ever sends on forever; this keeps main alive for the workers.
    <-forever
}

// failOnError aborts the program when err is non-nil.
//
// It logs "<msg>: <err>" and then panics (log.Panicf), so the caller never
// carries on with the value that accompanied the error. A nil err is a no-op.
func failOnError(err error, msg string) {
    if err != nil {
        log.Panicf("%s: %s", msg, err)
    }
}

// queryBookID stands in for a slow lookup: it blocks for a random whole
// number of seconds in [0, 9) and then returns bookID prefixed with
// "QUERIED_".
func queryBookID(bookID string) string {
    delay := time.Duration(rand.Intn(9)) * time.Second
    time.Sleep(delay)

    return "QUERIED_" + bookID
}

执行效果
Client 端

Server 端

全部代码可以在如下处取得
https://github.com/BinWang-sh…

退出移动版