先说一个实际的业务场景:
Client 端有一个请求需要进行耗时处理或者查询,这个处理在 Server 端做。Server 端处理完后通知给请求的 Client 端。
这种场景可以称之为 RPC(Remote Procedure Call)
有两个点说明一下:
- <1>Client 端发送请求给 Server 端可以简单定义一个 Queue。Client 作为 Producer 发布消息,Server 端作为 Consumer 消费消息
- <2>Server 端完成耗时处理后,需要将处理结果返回给发出请求的 Client 端。
- 可以在 Client 端声明一个不指定名称的 Queue,系统会自动生成一个随机名称的 Queue。Client 在 publish 时通过 ReplyTo 将该 Queue 的名称发送给 Server 端
- 因为 Server 端要将处理结果返回给对应的请求,所以在 Client 端需要生成一个 CorrelationId 发送给 Server 端
处理流程
Client 端
(1) 声明从 Server 返回消息用的 queue
// Declare the queue on which the Server's reply will be received.
respQueue, err := ch.QueueDeclare(
"", // name: left empty so that a randomly named queue is generated
false, // durable
false, // delete when unused
true, // exclusive
false, // noWait
nil, // arguments
)
(2)发送请求消息到 rpc_queue
// Publish the request to the RPC queue through the default ("") exchange.
err = ch.Publish(
"", //exchange
config.QUEUENAME, //routing key
false, // mandatory
false, // immediate
amqp.Publishing{
ContentType: "text/plain",
CorrelationId: correlationID, // lets the Client match the reply to this request
ReplyTo: respQueue.Name, // queue the Server should publish the reply to
Body: []byte(msgBody),
})
correlationID 为 Client 端自己随机生成的 Id
Server 端
(3) 声明 rpc_queue, 从 rpc_queue 中消费消息
// Declare the request queue and register a consumer on it.
q, err := ch.QueueDeclare(
config.QUEUENAME, // name
false, // durable
false, // delete when unused
false, // exclusive
false, // noWait
nil, // arguments
)
msgs, err := ch.Consume(
q.Name, // queue
"", // consumer
false, // auto ack: disabled, each delivery is acknowledged explicitly after the reply is sent
false, // exclusive
false, // noLocal
false, // noWait
nil, // arguments
)
(4)执行处理后使用 msg 中的 ReplyTo 返回处理结果给 Client
// Send the result to the queue named in the request's ReplyTo field,
// echoing the request's CorrelationId, then acknowledge the request.
err = ch.Publish(
"", // exchange
msg.ReplyTo, // routing key
false, // mandatory
false, // immediate
amqp.Publishing{
ContentType: "text/plain",
CorrelationId: msg.CorrelationId,
Body: []byte(bookName),
})
msg.Ack(false) // false: acknowledge only this delivery
(5)Client 端从 reply queue 中接收从 Server 端来的 response
// Consume replies from the Client's own reply queue.
respMsgs, err := ch.Consume(
respQueue.Name, // queue
"", // consumer
true, // auto-ack
true, // exclusive
false, // noLocal
false, // nowait
nil, // arguments
)
详细代码如下
conf.go
package config
// Settings shared by client.go and server.go.
const (
// RMQADDR is the AMQP URL that both the client and the server pass to amqp.Dial.
// NOTE(review): the broker address and the guest credentials are hard-coded;
// consider reading them from the environment instead.
RMQADDR = "amqp://guest:guest@172.17.84.205:5672/"
// QUEUENAME is the request queue: the client publishes to it as the
// routing key and the server declares and consumes it.
QUEUENAME = "rpc_queue"
// SERVERINSTANCESCNT is used by the server both as the Qos prefetch count
// and as the number of consumer goroutines it starts.
SERVERINSTANCESCNT = 5
)
client.go
package main
import (
config "binTest/rabbitmqTest/t1/l6/conf"
"fmt"
"log"
"math/rand"
"os"
"github.com/streadway/amqp"
)
// main sends the first command-line argument as an RPC request and prints
// the reply.
//
// It declares an exclusive, server-named reply queue, publishes the request
// to config.QUEUENAME with ReplyTo set to that queue and a fresh
// CorrelationId, and then consumes the reply queue until a delivery carrying
// the same CorrelationId arrives.
func main() {
	if len(os.Args) < 2 {
		log.Println("Arguments error")
		return
	}

	conn, err := amqp.Dial(config.RMQADDR)
	failOnError(err, "Failed to connect to RabbitMQ")
	defer conn.Close()

	ch, err := conn.Channel()
	failOnError(err, "Failed to open a channel")
	defer ch.Close()

	msgBody := os.Args[1]

	respQueue, err := ch.QueueDeclare(
		"",    // name: empty, so a random name is generated
		false, // durable
		false, // delete when unused
		true,  // exclusive
		false, // noWait
		nil,   // arguments
	)
	failOnError(err, "Failed to declare a response queue")

	correlationID := randomID(32)

	err = ch.Publish(
		"",               // exchange
		config.QUEUENAME, // routing key
		false,            // mandatory
		false,            // immediate
		amqp.Publishing{
			ContentType:   "text/plain",
			CorrelationId: correlationID,
			ReplyTo:       respQueue.Name,
			Body:          []byte(msgBody),
		})
	// Check the publish result first so that "Sent" is only reported after
	// the error check.
	failOnError(err, "Failed to publish a message")
	log.Printf("[x] Sent %s", msgBody)

	respMsgs, err := ch.Consume(
		respQueue.Name, // queue
		"",             // consumer
		true,           // auto-ack
		true,           // exclusive
		false,          // noLocal
		false,          // noWait
		nil,            // arguments
	)
	failOnError(err, "Failed to register a consumer")

	for item := range respMsgs {
		// Skip deliveries that do not belong to this request.
		if item.CorrelationId != correlationID {
			continue
		}
		fmt.Println("response:", string(item.Body))
		return
	}

	// The delivery channel was closed without a matching reply.
	log.Println("Response channel closed before a reply arrived")
}
// failOnError aborts the program when err is non-nil, logging msg followed
// by the error text. It does nothing when err is nil.
//
// It panics (via log.Panicf) instead of merely printing, so that callers do
// not continue with an unusable connection, channel or queue.
func failOnError(err error, msg string) {
	if err != nil {
		log.Panicf("%s: %s", msg, err)
	}
}
// randomID returns a pseudo-random identifier of the given length made up of
// ASCII digits and letters. It returns "" when length is zero or negative.
//
// The characters are drawn with math/rand, so the result is not suitable as
// a secret.
func randomID(length int) string {
	if length <= 0 {
		return ""
	}

	// Printable alphabet: the identifier is used as a message correlation ID,
	// so it should not contain control bytes.
	const alphabet = "0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz"

	id := make([]byte, length)
	for i := range id {
		id[i] = alphabet[rand.Intn(len(alphabet))]
	}
	return string(id)
}
server.go
package main
import (
config "binTest/rabbitmqTest/t1/l6/conf"
"fmt"
"log"
"math/rand"
"time"
"github.com/streadway/amqp"
)
// main runs the RPC server: it consumes requests from config.QUEUENAME with
// config.SERVERINSTANCESCNT concurrent consumers and publishes each result
// to the queue named in the request's ReplyTo field, echoing the request's
// CorrelationId.
//
// main returns once every consumer's delivery channel has been closed.
func main() {
	conn, err := amqp.Dial(config.RMQADDR)
	failOnError(err, "Failed to connect to RabbitMQ")
	defer conn.Close()

	ch, err := conn.Channel()
	failOnError(err, "Failed to open a channel")
	defer ch.Close()

	err = ch.Qos(
		config.SERVERINSTANCESCNT, // prefetch count
		0,                         // prefetch size
		false,                     // global
	)
	failOnError(err, "Failed to set QoS")

	// Declare the request queue once; every consumer below reads from it.
	q, err := ch.QueueDeclare(
		config.QUEUENAME, // name
		false,            // durable
		false,            // delete when unused
		false,            // exclusive
		false,            // noWait
		nil,              // arguments
	)
	failOnError(err, "Failed to declare a queue")

	// Each worker signals on done when it stops, so main can return instead
	// of blocking forever after all consumers are gone.
	done := make(chan struct{})
	for routine := 0; routine < config.SERVERINSTANCESCNT; routine++ {
		go func(routineNum int) {
			defer func() { done <- struct{}{} }()

			msgs, err := ch.Consume(
				q.Name, // queue
				"",     // consumer
				false,  // auto ack: disabled, deliveries are acked after the reply is published
				false,  // exclusive
				false,  // noLocal
				false,  // noWait
				nil,    // arguments
			)
			if err != nil {
				// Without this check the loop below would range over a nil
				// channel and block forever.
				log.Printf("In %d failed to register a consumer: %s", routineNum, err)
				return
			}

			for msg := range msgs {
				log.Printf("In %d start consuming message: %s\n", routineNum, msg.Body)
				bookName := queryBookID(string(msg.Body))

				err = ch.Publish(
					"",          // exchange
					msg.ReplyTo, // routing key
					false,       // mandatory
					false,       // immediate
					amqp.Publishing{
						ContentType:   "text/plain",
						CorrelationId: msg.CorrelationId,
						Body:          []byte(bookName),
					})
				if err != nil {
					fmt.Println("Failed to reply msg to client:", err)
				} else {
					fmt.Println("Response to client:", bookName)
				}

				if err := msg.Ack(false); err != nil {
					log.Printf("In %d failed to ack message: %s", routineNum, err)
				}
			}
		}(routine)
	}

	for i := 0; i < config.SERVERINSTANCESCNT; i++ {
		<-done
	}
}
// failOnError aborts the program when err is non-nil, logging msg followed
// by the error text. It does nothing when err is nil.
//
// It panics (via log.Panicf) instead of merely printing, so that callers do
// not continue with an unusable connection, channel or queue.
func failOnError(err error, msg string) {
	if err != nil {
		log.Panicf("%s: %s", msg, err)
	}
}
// queryBookID stands in for a slow lookup: it sleeps for a random whole
// number of seconds in [0, 9) and then returns bookID prefixed with
// "QUERIED_".
func queryBookID(bookID string) string {
	delay := time.Duration(rand.Intn(9)) * time.Second
	time.Sleep(delay)
	return "QUERIED_" + bookID
}
执行效果
Client 端
Server 端
全部代码可以在如下处取得
https://github.com/BinWang-sh…