共计 13911 个字符,预计需要花费 35 分钟才能阅读完成。
咱们一起来学 RabbitMQ 二:RabbiMQ 的 6 种模式的根本利用
嗨,大家好,我是小魔童哪吒,咱们从明天开始进入开源组件的学习,一边学习一边总结一边分享
文章提纲如下:
- RabbitMQ 成员组成
- RabbitMQ 的六种工作模式编码
RabbitMQ 成员组成
- 生产者 producer
- 消费者 consumer
- 交换机 exchange
用于承受、调配音讯
- 音讯 message
- 队列 queue
用于存储生产者的音讯
- 信道 channel AMQP
音讯推送应用的通道
- 连贯 connections
生成者或者消费者与 Rabbit 建设的 TCP 连贯
- 路由键 routingKey
用于把生成者的数据调配到交换器上
- 绑定键 BindingKey
用于把交换器的音讯绑定到队列上
- 连贯管理器 ConnectionFactory
应用程序与 Rabbit 之间建设连贯的管理器,程序代码中应用
RabbitMQ 的六种工作模式编码
single 模式
- 音讯产生者将音讯放入队列
- 音讯的消费者监听音讯队列,如果队列中有音讯就生产掉
目录如下:
.
├── consumer.go
├── go.mod
├── go.sum
├── main.go
└── xmtmq
└── xmtmq.go
理论编码如下:
每种模式的编码思路如下:
生产者 / 消费者
- 连贯 RabbitMQ 的 server
- 初始化连贯 connection
- 初始化通道 channel
- 初始化交换机 exchange
- 初始化队列 queue
- 应用路由 key,绑定队列 bind , key
- 生产音讯 / 生产音讯 produce , consume
音讯 xmtmq.go
package xmtmq
import (
"github.com/streadway/amqp"
"log"
)
// single 模式
// 定义 RabbitMQ 的数据结构
// go get github.com/streadway/amqp
type RabbitMQ struct {
conn *amqp.Connection // 连贯
channel *amqp.Channel // 通道
QueueName string // 队列名
Exchange string // 交换机
Key string // 路由键
MQUrl string // MQ 的虚拟机地址
}
// New 一个 RabbitMQ
func NewRabbitMQ(rbt *RabbitMQ) {
if rbt == nil || rbt.QueueName == ""|| rbt.MQUrl =="" {log.Panic("please check QueueName,Exchange,MQUrl ...")
}
conn, err := amqp.Dial(rbt.MQUrl)
if err != nil {log.Panicf("amqp.Dial error : %v", err)
}
rbt.conn = conn
channel, err := rbt.conn.Channel()
if err != nil {log.Panicf("rbt.conn.Channel error : %v", err)
}
rbt.channel = channel
}
func RabbitMQFree(rbt *RabbitMQ){
if rbt == nil{log.Printf("rbt is nil,free failed")
return
}
rbt.channel.Close()
rbt.conn.Close()}
func (rbt *RabbitMQ) Init() {
// 申请队列
_, err := rbt.channel.QueueDeclare(
rbt.QueueName, // 队列名
true, // 是否长久化
false, // 是否主动删除
false, // 是否排他
false, // 是否阻塞
nil, // 其余参数
)
if err != nil {log.Printf("rbt.channel.QueueDeclare error : %v", err)
return
}
}
// 生产音讯
func (rbt *RabbitMQ) Produce(data []byte) {
// 向队列中退出数据
err := rbt.channel.Publish(
rbt.Exchange, // 交换机
rbt.QueueName, // 队列名
false, // 若为 true,依据本身 exchange 类型和 routekey 规定无奈找到符合条件的队列会把音讯返还给发送者
false, // 若为 true,当 exchange 发送音讯到队列后发现队列上没有消费者,则会把音讯返还给发送者
amqp.Publishing{
ContentType: "text/plain",
Body: data,
},
)
if err != nil {log.Printf("rbt.channel.Publish error : %v", err)
return
}
return
}
// 生产音讯
func (rbt *RabbitMQ) Consume() {
// 生产数据
msg, err := rbt.channel.Consume(
rbt.QueueName, // 队列名
"xmt", // 消费者的名字
true, // 是否自动应答
false, // 是否排他
false, // 若为 true,示意 不能将同一个 Conenction 中生产者发送的消息传递给这个 Connection 中 的消费者
false, // 是否阻塞
nil, // 其余属性
)
if err != nil {log.Printf("rbt.channel.Consume error : %v", err)
return
}
for data := range msg {log.Printf("received data is %v", string(data.Body))
}
}
main.go
package main
import (
"fmt"
"log"
"time"
"xmt/xmtmq"
)
/*
RabbimtMQ single 模式 案例
利用场景:简略音讯队列的应用,一个生产者一个消费者
生产音讯
*/
func main() {
// 设置日志
log.SetFlags(log.Llongfile | log.Ltime | log.Ldate)
rbt := &xmtmq.RabbitMQ{
QueueName: "xmtqueue",
MQUrl: "amqp://guest:guest@127.0.0.1:5672/xmtmq",
}
xmtmq.NewRabbitMQ(rbt)
var index = 0
for {
// 生产音讯
rbt.Produce([]byte(fmt.Sprintf("hello wolrd %d", index)))
log.Println("发送胜利", index)
index++
time.Sleep(1 * time.Second)
}
}
consumer.go
package main
import (
"log"
"xmt/xmtmq"
)
func main() {log.SetFlags(log.Llongfile | log.Ltime | log.Ldate)
rbt := &xmtmq.RabbitMQ{
QueueName: "xmtqueue",
MQUrl: "amqp://guest:guest@127.0.0.1:5672/xmtmq",
}
xmtmq.NewRabbitMQ(rbt)
rbt.Consume()}
运行的时候,关上 2 个终端
终端 1:go run main.go
终端 2:go run consumer.go
work 模式
多个生产端生产同一个队列中的音讯,队列采纳轮询的形式将音讯是均匀发送给消费者,此处的资源是竞争关系
当生产者生产音讯的速度大于消费者生产的速度,就要思考用 work 工作模式,这样能进步处理速度进步负载
work 模式与 single 模式相似,只是 work 模式比 single 模式多了一些消费者
基于single
模式,开一个终端 3:go run consumer.go
publish / subscribe 模式
publish / subscribe
公布订阅模式,绝对于 Work queues 模式 多了一个 交换机,此处的资源是共享的
用于场景
- 邮件群发
- 群聊天
- 播送(广告等)
目录和上述编码保持一致:
xmtmq.go
开始用到交换机 exchange,fanout 类型
生产端先把音讯发送到交换机,再由交换机把音讯发送到绑定的队列中,每个绑定的队列都能收到由生产端发送的音讯
package xmtmq
import (
"github.com/streadway/amqp"
"log"
)
// publish 模式
// 定义 RabbitMQ 的数据结构
// go get github.com/streadway/amqp
type RabbitMQ struct {
conn *amqp.Connection // 连贯
channel *amqp.Channel // 通道
QueueName string // 队列名
Exchange string // 交换机
Key string // 路由键
MQUrl string // MQ 的虚拟机地址
}
// New 一个 RabbitMQ
func NewRabbitMQ(rbt *RabbitMQ) {
if rbt == nil || rbt.Exchange == ""|| rbt.MQUrl =="" {log.Panic("please check Exchange,MQUrl ...")
}
conn, err := amqp.Dial(rbt.MQUrl)
if err != nil {log.Panicf("amqp.Dial error : %v", err)
}
rbt.conn = conn
channel, err := rbt.conn.Channel()
if err != nil {log.Panicf("rbt.conn.Channel error : %v", err)
}
rbt.channel = channel
}
func RabbitMQFree(rbt *RabbitMQ) {
if rbt == nil {log.Printf("rbt is nil,free failed")
return
}
rbt.channel.Close()
rbt.conn.Close()}
func (rbt *RabbitMQ) Init() {
// 1、创立交换机
err := rbt.channel.ExchangeDeclare(
rbt.Exchange, // 交换机
amqp.ExchangeFanout, // 交换机类型
true, // 是否长久化
false, // 是否主动删除
false, //true 示意这个 exchange 不能够被 client 用来推送音讯,仅用来进行 exchange 和 exchange 之间的绑定
false, // 是否阻塞
nil, // 其余属性
)
if err != nil {log.Printf("rbt.channel.ExchangeDeclare error : %v", err)
return
}
}
// 生产音讯 publish
func (rbt *RabbitMQ) PublishMsg(data []byte) {
// 1、向队列中退出数据
err := rbt.channel.Publish(
rbt.Exchange, // 交换机
"", // 队列名
false, // 若为 true,依据本身 exchange 类型和 routekey 规定无奈找到符合条件的队列会把音讯返还给发送者
false, // 若为 true,当 exchange 发送音讯到队列后发现队列上没有消费者,则会把音讯返还给发送者
amqp.Publishing{
ContentType: "text/plain",
Body: data,
},
)
if err != nil {log.Printf("rbt.channel.Publish error : %v", err)
return
}
return
}
// 生产音讯
func (rbt *RabbitMQ) SubscribeMsg() {
// 1、创立队列
q, err := rbt.channel.QueueDeclare(
"", // 此处咱们传入的是空,则是随机产生队列的名称
true,
false,
false,
false,
nil,
)
if err != nil {log.Printf("rbt.channel.QueueDeclare error : %v", err)
return
}
// 2、绑定队列
err = rbt.channel.QueueBind(
q.Name, // 队列名字
"", // 在 publish 模式下,这里 key 为空
rbt.Exchange, // 交换机名称
false, // 是否阻塞
nil, // 其余属性
)
if err != nil {log.Printf("rbt.channel.QueueBind error : %v", err)
return
}
// 3、生产数据
msg, err := rbt.channel.Consume(
q.Name, // 队列名
"xmt", // 消费者的名字
true, // 是否自动应答
false, // 是否排他
false, // 若为 true,示意 不能将同一个 Conenction 中生产者发送的消息传递给这个 Connection 中 的消费者
false, // 是否阻塞
nil, // 其余属性
)
if err != nil {log.Printf("rbt.channel.Consume error : %v", err)
return
}
for data := range msg {log.Printf("received data is %v", string(data.Body))
}
}
main.go
package main
import (
"fmt"
"log"
"time"
"xmt/xmtmq"
)
/*
RabbimtMQ publish 模式 案例
利用场景:邮件群发, 群聊天, 播送(广告)
生产音讯
*/
func main() {log.SetFlags(log.Llongfile | log.Ltime | log.Ldate)
rbt := &xmtmq.RabbitMQ{
Exchange: "xmtPubEx",
MQUrl: "amqp://guest:guest@127.0.0.1:5672/xmtmq",
}
xmtmq.NewRabbitMQ(rbt)
rbt.Init()
var index = 0
for {rbt.PublishMsg([]byte(fmt.Sprintf("hello wolrd %d", index)))
log.Println("发送胜利", index)
index++
time.Sleep(1 * time.Second)
}
xmtmq.RabbitMQFree(rbt)
}
consumer.go
package main
import (
"log"
"xmt/xmtmq"
)
func main() {log.SetFlags(log.Llongfile | log.Ltime | log.Ldate)
rbt := &xmtmq.RabbitMQ{
Exchange: "xmtPubEx",
MQUrl: "amqp://guest:guest@127.0.0.1:5672/xmtmq",
}
xmtmq.NewRabbitMQ(rbt)
rbt.SubscribeMsg()
xmtmq.RabbitMQFree(rbt)
}
执行的操作和上述保持一致
终端 1:go run main.go
终端 2:go run consumer.go
终端 3:go run consumer.go
成果和上述 single
模式和 work
模式的显著区别是:公布订阅模式的案例,生产者生产的音讯,对应的消费者生产其生产的内容
routing 模式
音讯生产者将音讯发送给交换机依照路由判断,路由是字符串 以后产生的音讯携带路由字符(对象的办法),交换机依据路由的 key, 只能匹配上路由 key 对应的音讯队列, 对应的消费者能力生产音讯
利用场景:从零碎的代码逻辑中获取对应的性能字符串, 将音讯工作扔到对应的队列中业务场景, 例如处理错误,解决特定音讯等
生产者解决流程:
申明队列并申明交换机 -> 创立连贯 -> 创立通道 -> 通道申明交换机 -> 通道申明队列 -> 通过通道使队列绑定到交换机并指定该队列的 routingkey(通配符)-> 制订音讯 -> 发送音讯并指定 routingkey(通配符)
消费者解决流程:
申明队列并申明交换机 -> 创立连贯 -> 创立通道 -> 通道申明交换机 -> 通道申明队列 -> 通过通道使队列绑定到交换机并指定 routingkey(通配符)-> 重写音讯生产办法 -> 执行音讯办法
目录构造如下:
.
├── consumer2.go
├── consumer.go
├── go.mod
├── go.sum
├── main.go
└── xmtmq
└── xmtmq.go
xmtmq.go
- 用到交换机 为
direct
类型 - 用到 路由键
package xmtmq
import (
"github.com/streadway/amqp"
"log"
)
// routing 模式
// 定义 RabbitMQ 的数据结构
// go get github.com/streadway/amqp
type RabbitMQ struct {
conn *amqp.Connection // 连贯
channel *amqp.Channel // 通道
QueueName string // 队列名
Exchange string // 交换机
Key string // 路由键
MQUrl string // MQ 的虚拟机地址
}
// New 一个 RabbitMQ
func NewRabbitMQ(rbt *RabbitMQ) {
if rbt == nil || rbt.Exchange == ""|| rbt.QueueName =="" || rbt.Key == ""|| rbt.MQUrl =="" {log.Panic("please check Exchange,,QueueName,Key,MQUrl ...")
}
conn, err := amqp.Dial(rbt.MQUrl)
if err != nil {log.Panicf("amqp.Dial error : %v", err)
}
rbt.conn = conn
channel, err := rbt.conn.Channel()
if err != nil {log.Panicf("rbt.conn.Channel error : %v", err)
}
rbt.channel = channel
}
func RabbitMQFree(rbt *RabbitMQ) {
if rbt == nil {log.Printf("rbt is nil,free failed")
return
}
rbt.channel.Close()
rbt.conn.Close()}
func (rbt *RabbitMQ) Init() {
// 1、创立交换机
err := rbt.channel.ExchangeDeclare(
rbt.Exchange, // 交换机
amqp.ExchangeDirect, // 交换机类型
true, // 是否长久化
false, // 是否主动删除
false, //true 示意这个 exchange 不能够被 client 用来推送音讯,仅用来进行 exchange 和 exchange 之间的绑定
false, // 是否阻塞
nil, // 其余属性
)
if err != nil {log.Printf("rbt.channel.ExchangeDeclare error : %v", err)
return
}
// 2、创立队列
_, err = rbt.channel.QueueDeclare(
rbt.QueueName, // 此处咱们传入的是空,则是随机产生队列的名称
true,
false,
false,
false,
nil,
)
if err != nil {log.Printf("rbt.channel.QueueDeclare error : %v", err)
return
}
// 3、绑定队列
err = rbt.channel.QueueBind(
rbt.QueueName, // 队列名字
rbt.Key, // routing,这里 key 须要填
rbt.Exchange, // 交换机名称
false, // 是否阻塞
nil, // 其余属性
)
if err != nil {log.Printf("rbt.channel.QueueBind error : %v", err)
return
}
}
// 生产音讯 publish
func (rbt *RabbitMQ) ProduceRouting(data []byte) {
// 1、向队列中退出数据
err := rbt.channel.Publish(
rbt.Exchange, // 交换机
rbt.Key, // key
false, // 若为 true,依据本身 exchange 类型和 routekey 规定无奈找到符合条件的队列会把音讯返还给发送者
false, // 若为 true,当 exchange 发送音讯到队列后发现队列上没有消费者,则会把音讯返还给发送者
amqp.Publishing{
ContentType: "text/plain",
Body: data,
},
)
if err != nil {log.Printf("rbt.channel.Publish error : %v", err)
return
}
return
}
// 生产音讯
func (rbt *RabbitMQ) ConsumeRoutingMsg() {
// 4、生产数据
msg, err := rbt.channel.Consume(
rbt.QueueName, // 队列名
"", // 消费者的名字
true, // 是否自动应答
false, // 是否排他
false, // 若为 true,示意 不能将同一个 Conenction 中生产者发送的消息传递给这个 Connection 中 的消费者
false, // 是否阻塞
nil, // 其余属性
)
if err != nil {log.Printf("rbt.channel.Consume error : %v", err)
return
}
for data := range msg {log.Printf("received data is %v", string(data.Body))
}
}
main.go
package main
import (
"fmt"
"log"
"time"
"xmt/xmtmq"
)
/*
RabbimtMQ routing 模式 案例
利用场景:从零碎的代码逻辑中获取对应的性能字符串, 将音讯工作扔到对应的队列中业务场景, 例如处理错误,解决特定音讯等
生产音讯
*/
func main() {log.SetFlags(log.Llongfile | log.Ltime | log.Ldate)
rbt1 := &xmtmq.RabbitMQ{
Exchange: "xmtPubEx2",
Key: "xmt1",
QueueName: "Routingqueuexmt1",
MQUrl: "amqp://guest:guest@127.0.0.1:5672/xmtmq",
}
xmtmq.NewRabbitMQ(rbt1)
rbt1.Init()
rbt2 := &xmtmq.RabbitMQ{
Exchange: "xmtPubEx2",
Key: "xmt2",
QueueName: "Routingqueuexmt2",
MQUrl: "amqp://guest:guest@127.0.0.1:5672/xmtmq",
}
xmtmq.NewRabbitMQ(rbt2)
rbt2.Init()
var index = 0
for {rbt1.ProduceRouting([]byte(fmt.Sprintf("hello wolrd xmt1 %d", index)))
log.Println("发送胜利 xmt1", index)
rbt2.ProduceRouting([]byte(fmt.Sprintf("hello wolrd xmt2 %d", index)))
log.Println("发送胜利 xmt2", index)
index++
time.Sleep(1 * time.Second)
}
xmtmq.RabbitMQFree(rbt1)
xmtmq.RabbitMQFree(rbt2)
}
consumer.go
package main
import (
"log"
"xmt/xmtmq"
)
func main() {log.SetFlags(log.Llongfile | log.Ltime | log.Ldate)
rbt := &xmtmq.RabbitMQ{
Exchange: "xmtPubEx2",
Key: "xmt1",
QueueName: "Routingqueuexmt1",
MQUrl: "amqp://guest:guest@127.0.0.1:5672/xmtmq",
}
xmtmq.NewRabbitMQ(rbt)
rbt.ConsumeRoutingMsg()
xmtmq.RabbitMQFree(rbt)
}
consumer2.go
package main
import (
"log"
"xmt/xmtmq"
)
func main() {log.SetFlags(log.Llongfile | log.Ltime | log.Ldate)
rbt := &xmtmq.RabbitMQ{
Exchange: "xmtPubEx2",
Key: "xmt2",
QueueName: "Routingqueuexmt2",
MQUrl: "amqp://guest:guest@127.0.0.1:5672/xmtmq",
}
xmtmq.NewRabbitMQ(rbt)
rbt.ConsumeRoutingMsg()
xmtmq.RabbitMQFree(rbt)
}
topic 模式
话题模式,一个音讯被多个消费者获取,音讯的指标 queue 可用 BindingKey
的通配符
Topics 模式实际上是路由模式的一种
他俩的最大的区别是:Topics 模式发送音讯和生产音讯的时候是通过 通配符 去进行匹配的
- * 号代表能够同通配一个单词
-
号代表能够通配零个或多个单词
编码的案例与上述 routing 模式放弃始终,只是 exchange 为 topic
类型
如下是上述几种模式波及到的 交换机
和队列
rpc 模式
RPC
近程过程调用,客户端近程调用服务端的办法,应用 MQ
能够实现 RPC
的异步调用
目录构造为:
.
├── consumer.go
├── go.mod
├── go.sum
├── main.go
└── xmtmq
└── xmtmq.go
- 客户端即是生产者也是消费者,向
RPC
申请队列发送RPC
调用音讯,同时监听RPC
响应队列 - 服务端监听
RPC
申请队列的音讯,收到音讯后执行服务端的办法,失去办法返回的后果 - 服务端将
RPC
办法 的后果发送到RPC
响应队列。 - 客户端监听
RPC
响应队列,接管到RPC
调用后果
xmtmq.go
package xmtmq
import (
"github.com/streadway/amqp"
"log"
"math/rand"
)
// rpc 模式
// 定义 RabbitMQ 的数据结构
// go get github.com/streadway/amqp
type RabbitMQ struct {
conn *amqp.Connection // 连贯
channel *amqp.Channel // 通道
QueueName string // 队列名
Exchange string // 交换机
Key string // 路由键
MQUrl string // MQ 的虚拟机地址
}
// New 一个 RabbitMQ
func NewRabbitMQ(rbt *RabbitMQ) {
if rbt == nil || rbt.QueueName == ""|| rbt.MQUrl =="" {log.Panic("please check QueueName,Exchange,MQUrl ...")
}
conn, err := amqp.Dial(rbt.MQUrl)
if err != nil {log.Panicf("amqp.Dial error : %v", err)
}
rbt.conn = conn
channel, err := rbt.conn.Channel()
if err != nil {log.Panicf("rbt.conn.Channel error : %v", err)
}
rbt.channel = channel
}
func RabbitMQFree(rbt *RabbitMQ) {
if rbt == nil {log.Printf("rbt is nil,free failed")
return
}
rbt.channel.Close()
rbt.conn.Close()}
// 生产音讯
func (rbt *RabbitMQ) Produce(data []byte) {
// 申请队列
q, err := rbt.channel.QueueDeclare(
rbt.QueueName, // 队列名
true, // 是否长久化
false, // 是否主动删除
false, // 是否排他
false, // 是否阻塞
nil, // 其余参数
)
if err != nil {log.Printf("rbt.channel.QueueDeclare error : %v", err)
return
}
err = rbt.channel.Qos(1, 0, false)
if err != nil {log.Printf("rbt.channel.Qos error : %v", err)
return
}
d, err := rbt.channel.Consume(
q.Name,
"",
false,
false,
false,
false,
nil)
if err != nil {log.Printf("rbt.channel.Consume error : %v", err)
return
}
for msg := range d {log.Println("received msg is", string(msg.Body))
err := rbt.channel.Publish(
"",
msg.ReplyTo,
false,
false,
amqp.Publishing{
ContentType: "test/plain",
CorrelationId: msg.CorrelationId,
Body: data,
})
if err != nil {log.Printf("rbt.channel.Publish error : %v", err)
return
}
msg.Ack(false)
log.Println("svr response ok")
}
return
}
func randomString(l int) string {bytes := make([]byte, l)
for i := 0; i < l; i++ {bytes[i] = byte(rand.Intn(l))
}
return string(bytes)
}
// 生产音讯
func (rbt *RabbitMQ) Consume() {
// 申请队列
q, err := rbt.channel.QueueDeclare(
"", // 队列名
true, // 是否长久化
false, // 是否主动删除
false, // 是否排他
false, // 是否阻塞
nil, // 其余参数
)
if err != nil {log.Printf("rbt.channel.QueueDeclare error : %v", err)
return
}
// 生产数据
msg, err := rbt.channel.Consume(
q.Name, // 队列名
"xmt", // 消费者的名字
true, // 是否自动应答
false, // 是否排他
false, // 若为 true,示意 不能将同一个 Conenction 中生产者发送的消息传递给这个 Connection 中 的消费者
false, // 是否阻塞
nil, // 其余属性
)
if err != nil {log.Printf("rbt.channel.Consume error : %v", err)
return
}
id := randomString(32)
err = rbt.channel.Publish(
"",
rbt.QueueName,
false,
false,
amqp.Publishing{
ContentType: "test/plain",
CorrelationId: id,
ReplyTo: q.Name,
Body: []byte("321"),
})
if err != nil {log.Printf("rbt.channel.Publish error : %v", err)
return
}
for data := range msg {log.Printf("received data is %v", string(data.Body))
}
}
main.go
package main
import (
"fmt"
"log"
"xmt/xmtmq"
)
/*
RabbimtMQ rpc 模式 案例
利用场景:简略音讯队列的应用,一个生产者一个消费者
生产音讯
*/
func main() {log.SetFlags(log.Llongfile | log.Ltime | log.Ldate)
rbt := &xmtmq.RabbitMQ{
QueueName: "xmtqueue",
MQUrl: "amqp://guest:guest@127.0.0.1:5672/xmtmq",
}
xmtmq.NewRabbitMQ(rbt)
rbt.Produce([]byte(fmt.Sprintf("hello wolrd")))
}
consumer.go
package main
import (
"log"
"math/rand"
"time"
"xmt/xmtmq"
)
func main() {log.SetFlags(log.Llongfile | log.Ltime | log.Ldate)
rand.Seed(time.Now().UTC().UnixNano())
rbt := &xmtmq.RabbitMQ{
QueueName: "xmtqueue",
MQUrl: "amqp://guest:guest@127.0.0.1:5672/xmtmq",
}
xmtmq.NewRabbitMQ(rbt)
rbt.Consume()}
咱们先运行消费者,多运行几个,能够看到咱们的队列中曾经有数据了,咱们运行的是 2 个消费者,因而此处是 2
再运行生产者,就能看到生产者将消费者发送的音讯生产掉,并且通过 CorrelationId
找到对应消费者监听的队列,将数据发送到队列中
消费者监听的队列有数据了,消费者就取出来进行生产
总结
RabbitMQ
的六种工作模式:
- single 模式
- work 模式
- publish / subscribe 模式
- routing 模式
- topic 模式
- rpc 模式
参考资料:
RabbitMQ Tutorials
欢送点赞,关注,珍藏
敌人们,你的反对和激励,是我保持分享,提高质量的能源
好了,本次就到这里
技术是凋谢的,咱们的心态,更应是凋谢的。拥抱变动,背阴而生,致力向前行。
我是 小魔童哪吒,欢送点赞关注珍藏,下次见~