咱们一起来学RabbitMQ 二:RabbiMQ 的 6 种模式的根本利用
嗨,大家好,我是小魔童哪吒,咱们从明天开始进入开源组件的学习,一边学习一边总结一边分享
文章提纲如下:
- RabbitMQ 成员组成
- RabbitMQ 的六种工作模式编码
RabbitMQ 成员组成
- 生产者 producer
- 消费者 consumer
- 交换机 exchange
用于承受、调配音讯
- 音讯 message
- 队列 queue
用于存储生产者的音讯
- 信道 channel AMQP
音讯推送应用的通道
- 连贯 connections
生成者或者消费者与Rabbit 建设的TCP 连贯
- 路由键 routingKey
用于把生成者的数据调配到交换器上
- 绑定键 BindingKey
用于把交换器的音讯绑定到队列上
- 连贯管理器 ConnectionFactory
应用程序与 Rabbit 之间建设连贯的管理器,程序代码中应用
RabbitMQ 的六种工作模式编码
single 模式
- 音讯产生者将音讯放入队列
- 音讯的消费者监听音讯队列,如果队列中有音讯就生产掉
目录如下:
.├── consumer.go├── go.mod├── go.sum├── main.go└── xmtmq └── xmtmq.go
理论编码如下:
每种模式的编码思路如下:
生产者 / 消费者
- 连贯 RabbitMQ 的 server
- 初始化连贯 connection
- 初始化通道 channel
- 初始化交换机 exchange
- 初始化队列 queue
- 应用路由key,绑定队列 bind , key
- 生产音讯 / 生产音讯 produce , consume
音讯xmtmq.go
package xmtmqimport ( "github.com/streadway/amqp" "log")// single 模式// 定义 RabbitMQ 的数据结构// go get github.com/streadway/amqptype RabbitMQ struct { conn *amqp.Connection // 连贯 channel *amqp.Channel // 通道 QueueName string // 队列名 Exchange string // 交换机 Key string // 路由键 MQUrl string // MQ的虚拟机地址}// New 一个 RabbitMQfunc NewRabbitMQ(rbt *RabbitMQ) { if rbt == nil || rbt.QueueName == "" || rbt.MQUrl == "" { log.Panic("please check QueueName,Exchange,MQUrl ...") } conn, err := amqp.Dial(rbt.MQUrl) if err != nil { log.Panicf("amqp.Dial error : %v", err) } rbt.conn = conn channel, err := rbt.conn.Channel() if err != nil { log.Panicf("rbt.conn.Channel error : %v", err) } rbt.channel = channel}func RabbitMQFree(rbt *RabbitMQ){ if rbt == nil{ log.Printf("rbt is nil,free failed") return } rbt.channel.Close() rbt.conn.Close()}func (rbt *RabbitMQ) Init() { // 申请队列 _, err := rbt.channel.QueueDeclare( rbt.QueueName, // 队列名 true, // 是否长久化 false, // 是否主动删除 false, // 是否排他 false, // 是否阻塞 nil, // 其余参数 ) if err != nil { log.Printf("rbt.channel.QueueDeclare error : %v", err) return }}// 生产音讯func (rbt *RabbitMQ) Produce(data []byte) { // 向队列中退出数据 err := rbt.channel.Publish( rbt.Exchange, // 交换机 rbt.QueueName, // 队列名 false, // 若为true,依据本身exchange类型和routekey规定无奈找到符合条件的队列会把音讯返还给发送者 false, // 若为true,当exchange发送音讯到队列后发现队列上没有消费者,则会把音讯返还给发送者 amqp.Publishing{ ContentType: "text/plain", Body: data, }, ) if err != nil { log.Printf("rbt.channel.Publish error : %v", err) return } return}// 生产音讯func (rbt *RabbitMQ) Consume() { // 生产数据 msg, err := rbt.channel.Consume( rbt.QueueName, // 队列名 "xmt", // 消费者的名字 true, // 是否自动应答 false, // 是否排他 false, // 若为true,示意 不能将同一个Conenction中生产者发送的消息传递给这个Connection中 的消费者 false, // 是否阻塞 nil, // 其余属性 ) if err != nil { log.Printf("rbt.channel.Consume error : %v", err) return } for data := range msg { log.Printf("received data is %v", string(data.Body)) }}
main.go
package mainimport ( "fmt" "log" "time" "xmt/xmtmq")/*RabbimtMQ single 模式 案例利用场景:简略音讯队列的应用,一个生产者一个消费者生产音讯*/func main() { // 设置日志 log.SetFlags(log.Llongfile | log.Ltime | log.Ldate) rbt := &xmtmq.RabbitMQ{ QueueName: "xmtqueue", MQUrl: "amqp://guest:guest@127.0.0.1:5672/xmtmq", } xmtmq.NewRabbitMQ(rbt) var index = 0 for { // 生产音讯 rbt.Produce([]byte(fmt.Sprintf("hello wolrd %d ", index))) log.Println("发送胜利 ", index) index++ time.Sleep(1 * time.Second) }}
consumer.go
package mainimport ( "log" "xmt/xmtmq")func main() { log.SetFlags(log.Llongfile | log.Ltime | log.Ldate) rbt := &xmtmq.RabbitMQ{ QueueName: "xmtqueue", MQUrl: "amqp://guest:guest@127.0.0.1:5672/xmtmq", } xmtmq.NewRabbitMQ(rbt) rbt.Consume()}
运行的时候,关上2个终端
终端1:go run main.go
终端2:go run consumer.go
work 模式
多个生产端生产同一个队列中的音讯,队列采纳轮询的形式将音讯是均匀发送给消费者,此处的资源是竞争关系
当生产者生产音讯的速度大于消费者生产的速度,就要思考用 work 工作模式,这样能进步处理速度进步负载
work 模式与 single 模式相似, 只是work 模式比 single 模式多了一些消费者
基于single
模式,开一个终端3 : go run consumer.go
publish / subscribe 模式
publish / subscribe
公布订阅模式 , 绝对于Work queues模式多了一个交换机,此处的资源是共享的
用于场景
- 邮件群发
- 群聊天
- 播送(广告等)
目录和上述编码保持一致:
xmtmq.go
开始用到交换机 exchange ,fanout 类型
生产端先把音讯发送到交换机,再由交换机把音讯发送到绑定的队列中,每个绑定的队列都能收到由生产端发送的音讯
package xmtmqimport ( "github.com/streadway/amqp" "log")// publish 模式// 定义 RabbitMQ 的数据结构// go get github.com/streadway/amqptype RabbitMQ struct { conn *amqp.Connection // 连贯 channel *amqp.Channel // 通道 QueueName string // 队列名 Exchange string // 交换机 Key string // 路由键 MQUrl string // MQ的虚拟机地址}// New 一个 RabbitMQfunc NewRabbitMQ(rbt *RabbitMQ) { if rbt == nil || rbt.Exchange == "" || rbt.MQUrl == "" { log.Panic("please check Exchange,MQUrl ...") } conn, err := amqp.Dial(rbt.MQUrl) if err != nil { log.Panicf("amqp.Dial error : %v", err) } rbt.conn = conn channel, err := rbt.conn.Channel() if err != nil { log.Panicf("rbt.conn.Channel error : %v", err) } rbt.channel = channel}func RabbitMQFree(rbt *RabbitMQ) { if rbt == nil { log.Printf("rbt is nil,free failed") return } rbt.channel.Close() rbt.conn.Close()}func (rbt *RabbitMQ) Init() { // 1、创立交换机 err := rbt.channel.ExchangeDeclare( rbt.Exchange, // 交换机 amqp.ExchangeFanout, // 交换机类型 true, // 是否长久化 false, //是否主动删除 false, //true示意这个exchange不能够被client用来推送音讯,仅用来进行exchange和exchange之间的绑定 false, // 是否阻塞 nil, // 其余属性 ) if err != nil { log.Printf("rbt.channel.ExchangeDeclare error : %v", err) return }}// 生产音讯 publishfunc (rbt *RabbitMQ) PublishMsg(data []byte) { // 1、向队列中退出数据 err := rbt.channel.Publish( rbt.Exchange, // 交换机 "", // 队列名 false, // 若为true,依据本身exchange类型和routekey规定无奈找到符合条件的队列会把音讯返还给发送者 false, // 若为true,当exchange发送音讯到队列后发现队列上没有消费者,则会把音讯返还给发送者 amqp.Publishing{ ContentType: "text/plain", Body: data, }, ) if err != nil { log.Printf("rbt.channel.Publish error : %v", err) return } return}// 生产音讯func (rbt *RabbitMQ) SubscribeMsg() { // 1、创立队列 q, err := rbt.channel.QueueDeclare( "", // 此处咱们传入的是空,则是随机产生队列的名称 true, false, false, false, nil, ) if err != nil { log.Printf("rbt.channel.QueueDeclare error : %v", err) return } // 2、绑定队列 err = rbt.channel.QueueBind( q.Name, // 队列名字 "", // 在publish模式下,这里key 为空 rbt.Exchange, // 交换机名称 false, // 是否阻塞 nil, // 其余属性 ) if err != nil { log.Printf("rbt.channel.QueueBind error : %v", err) return } // 3、生产数据 msg, err := rbt.channel.Consume( q.Name, // 队列名 "xmt", // 消费者的名字 true, // 是否自动应答 false, // 是否排他 false, // 若为true,示意 不能将同一个Conenction中生产者发送的消息传递给这个Connection中 的消费者 false, // 是否阻塞 nil, // 其余属性 ) if err != nil { log.Printf("rbt.channel.Consume error : %v", err) return } for data := range msg { log.Printf("received data is %v", string(data.Body)) }}
main.go
package mainimport ( "fmt" "log" "time" "xmt/xmtmq")/*RabbimtMQ publish 模式 案例利用场景:邮件群发,群聊天,播送(广告)生产音讯*/func main() { log.SetFlags(log.Llongfile | log.Ltime | log.Ldate) rbt := &xmtmq.RabbitMQ{ Exchange: "xmtPubEx", MQUrl: "amqp://guest:guest@127.0.0.1:5672/xmtmq", } xmtmq.NewRabbitMQ(rbt) rbt.Init() var index = 0 for { rbt.PublishMsg([]byte(fmt.Sprintf("hello wolrd %d ", index))) log.Println("发送胜利 ", index) index++ time.Sleep(1 * time.Second) } xmtmq.RabbitMQFree(rbt)}
consumer.go
package mainimport ( "log" "xmt/xmtmq")func main() { log.SetFlags(log.Llongfile | log.Ltime | log.Ldate) rbt := &xmtmq.RabbitMQ{ Exchange: "xmtPubEx", MQUrl: "amqp://guest:guest@127.0.0.1:5672/xmtmq", } xmtmq.NewRabbitMQ(rbt) rbt.SubscribeMsg() xmtmq.RabbitMQFree(rbt)}
执行的操作和上述保持一致
终端1:go run main.go
终端2:go run consumer.go
终端3:go run consumer.go
成果和上述single
模式和 work
模式的显著区别是:公布订阅模式的案例,生产者生产的音讯,对应的消费者生产其生产的内容
routing 模式
音讯生产者将音讯发送给交换机依照路由判断,路由是字符串 以后产生的音讯携带路由字符(对象的办法),交换机依据路由的key,只能匹配上路由key对应的音讯队列,对应的消费者能力生产音讯
利用场景:从零碎的代码逻辑中获取对应的性能字符串,将音讯工作扔到对应的队列中业务场景,例如处理错误,解决特定音讯等
生产者解决流程:
申明队列并申明交换机 -> 创立连贯 -> 创立通道 -> 通道申明交换机 -> 通道申明队列 -> 通过通道使队列绑定到交换机并指定该队列的routingkey(通配符) -> 制订音讯 -> 发送音讯并指定routingkey(通配符)
消费者解决流程:
申明队列并申明交换机 -> 创立连贯 -> 创立通道 -> 通道申明交换机 -> 通道申明队列 -> 通过通道使队列绑定到交换机并指定routingkey(通配符) -> 重写音讯生产办法 -> 执行音讯办法
目录构造如下:
.├── consumer2.go├── consumer.go├── go.mod├── go.sum├── main.go└── xmtmq └── xmtmq.go
xmtmq.go
- 用到交换机 为
direct
类型 - 用到路由键
package xmtmqimport ( "github.com/streadway/amqp" "log")// routing 模式// 定义 RabbitMQ 的数据结构// go get github.com/streadway/amqptype RabbitMQ struct { conn *amqp.Connection // 连贯 channel *amqp.Channel // 通道 QueueName string // 队列名 Exchange string // 交换机 Key string // 路由键 MQUrl string // MQ的虚拟机地址}// New 一个 RabbitMQfunc NewRabbitMQ(rbt *RabbitMQ) { if rbt == nil || rbt.Exchange == "" || rbt.QueueName == "" || rbt.Key == "" || rbt.MQUrl == "" { log.Panic("please check Exchange,,QueueName,Key,MQUrl ...") } conn, err := amqp.Dial(rbt.MQUrl) if err != nil { log.Panicf("amqp.Dial error : %v", err) } rbt.conn = conn channel, err := rbt.conn.Channel() if err != nil { log.Panicf("rbt.conn.Channel error : %v", err) } rbt.channel = channel}func RabbitMQFree(rbt *RabbitMQ) { if rbt == nil { log.Printf("rbt is nil,free failed") return } rbt.channel.Close() rbt.conn.Close()}func (rbt *RabbitMQ) Init() { // 1、创立交换机 err := rbt.channel.ExchangeDeclare( rbt.Exchange, // 交换机 amqp.ExchangeDirect, // 交换机类型 true, // 是否长久化 false, //是否主动删除 false, //true示意这个exchange不能够被client用来推送音讯,仅用来进行exchange和exchange之间的绑定 false, // 是否阻塞 nil, // 其余属性 ) if err != nil { log.Printf("rbt.channel.ExchangeDeclare error : %v", err) return } // 2、创立队列 _, err = rbt.channel.QueueDeclare( rbt.QueueName, // 此处咱们传入的是空,则是随机产生队列的名称 true, false, false, false, nil, ) if err != nil { log.Printf("rbt.channel.QueueDeclare error : %v", err) return } // 3、绑定队列 err = rbt.channel.QueueBind( rbt.QueueName, // 队列名字 rbt.Key, // routing,这里key 须要填 rbt.Exchange, // 交换机名称 false, // 是否阻塞 nil, // 其余属性 ) if err != nil { log.Printf("rbt.channel.QueueBind error : %v", err) return }}// 生产音讯 publishfunc (rbt *RabbitMQ) ProduceRouting(data []byte) { // 1、向队列中退出数据 err := rbt.channel.Publish( rbt.Exchange, // 交换机 rbt.Key, // key false, // 若为true,依据本身exchange类型和routekey规定无奈找到符合条件的队列会把音讯返还给发送者 false, // 若为true,当exchange发送音讯到队列后发现队列上没有消费者,则会把音讯返还给发送者 amqp.Publishing{ ContentType: "text/plain", Body: data, }, ) if err != nil { log.Printf("rbt.channel.Publish error : %v", err) return } return}// 生产音讯func (rbt *RabbitMQ) ConsumeRoutingMsg() { // 4、生产数据 msg, err := rbt.channel.Consume( rbt.QueueName, // 队列名 "", // 消费者的名字 true, // 是否自动应答 false, // 是否排他 false, // 若为true,示意 不能将同一个Conenction中生产者发送的消息传递给这个Connection中 的消费者 false, // 是否阻塞 nil, // 其余属性 ) if err != nil { log.Printf("rbt.channel.Consume error : %v", err) return } for data := range msg { log.Printf("received data is %v", string(data.Body)) }}
main.go
package mainimport ( "fmt" "log" "time" "xmt/xmtmq")/*RabbimtMQ routing 模式 案例利用场景:从零碎的代码逻辑中获取对应的性能字符串,将音讯工作扔到对应的队列中业务场景,例如处理错误,解决特定音讯等生产音讯*/func main() { log.SetFlags(log.Llongfile | log.Ltime | log.Ldate) rbt1 := &xmtmq.RabbitMQ{ Exchange: "xmtPubEx2", Key: "xmt1", QueueName: "Routingqueuexmt1", MQUrl: "amqp://guest:guest@127.0.0.1:5672/xmtmq", } xmtmq.NewRabbitMQ(rbt1) rbt1.Init() rbt2 := &xmtmq.RabbitMQ{ Exchange: "xmtPubEx2", Key: "xmt2", QueueName: "Routingqueuexmt2", MQUrl: "amqp://guest:guest@127.0.0.1:5672/xmtmq", } xmtmq.NewRabbitMQ(rbt2) rbt2.Init() var index = 0 for { rbt1.ProduceRouting([]byte(fmt.Sprintf("hello wolrd xmt1 %d ", index))) log.Println("发送胜利xmt1 ", index) rbt2.ProduceRouting([]byte(fmt.Sprintf("hello wolrd xmt2 %d ", index))) log.Println("发送胜利xmt2 ", index) index++ time.Sleep(1 * time.Second) } xmtmq.RabbitMQFree(rbt1) xmtmq.RabbitMQFree(rbt2)}
consumer.go
package mainimport ( "log" "xmt/xmtmq")func main() { log.SetFlags(log.Llongfile | log.Ltime | log.Ldate) rbt := &xmtmq.RabbitMQ{ Exchange: "xmtPubEx2", Key: "xmt1", QueueName: "Routingqueuexmt1", MQUrl: "amqp://guest:guest@127.0.0.1:5672/xmtmq", } xmtmq.NewRabbitMQ(rbt) rbt.ConsumeRoutingMsg() xmtmq.RabbitMQFree(rbt)}
consumer2.go
package mainimport ( "log" "xmt/xmtmq")func main() { log.SetFlags(log.Llongfile | log.Ltime | log.Ldate) rbt := &xmtmq.RabbitMQ{ Exchange: "xmtPubEx2", Key: "xmt2", QueueName: "Routingqueuexmt2", MQUrl: "amqp://guest:guest@127.0.0.1:5672/xmtmq", } xmtmq.NewRabbitMQ(rbt) rbt.ConsumeRoutingMsg() xmtmq.RabbitMQFree(rbt)}
topic 模式
话题模式,一个音讯被多个消费者获取,音讯的指标 queue 可用 BindingKey
的通配符
Topics 模式实际上是路由模式的一种
他俩的最大的区别是 : Topics 模式发送音讯和生产音讯的时候是通过通配符去进行匹配的
- *号代表能够同通配一个单词
号代表能够通配零个或多个单词
编码的案例与上述 routing 模式放弃始终,只是 exchange 为 topic
类型
如下是上述几种模式波及到的交换机
和队列
rpc 模式
RPC
近程过程调用,客户端近程调用服务端的办法 ,应用 MQ
能够实现 RPC
的异步调用
目录构造为:
.├── consumer.go├── go.mod├── go.sum├── main.go└── xmtmq └── xmtmq.go
- 客户端即是生产者也是消费者,向
RPC
申请队列发送RPC
调用音讯,同时监听RPC
响应队列 - 服务端监听
RPC
申请队列的音讯,收到音讯后执行服务端的办法,失去办法返回的后果 - 服务端将
RPC
办法 的后果发送到RPC
响应队列。 - 客户端监听
RPC
响应队列,接管到RPC
调用后果
xmtmq.go
package xmtmqimport ( "github.com/streadway/amqp" "log" "math/rand")// rpc 模式// 定义 RabbitMQ 的数据结构// go get github.com/streadway/amqptype RabbitMQ struct { conn *amqp.Connection // 连贯 channel *amqp.Channel // 通道 QueueName string // 队列名 Exchange string // 交换机 Key string // 路由键 MQUrl string // MQ的虚拟机地址}// New 一个 RabbitMQfunc NewRabbitMQ(rbt *RabbitMQ) { if rbt == nil || rbt.QueueName == "" || rbt.MQUrl == "" { log.Panic("please check QueueName,Exchange,MQUrl ...") } conn, err := amqp.Dial(rbt.MQUrl) if err != nil { log.Panicf("amqp.Dial error : %v", err) } rbt.conn = conn channel, err := rbt.conn.Channel() if err != nil { log.Panicf("rbt.conn.Channel error : %v", err) } rbt.channel = channel}func RabbitMQFree(rbt *RabbitMQ) { if rbt == nil { log.Printf("rbt is nil,free failed") return } rbt.channel.Close() rbt.conn.Close()}// 生产音讯func (rbt *RabbitMQ) Produce(data []byte) { // 申请队列 q, err := rbt.channel.QueueDeclare( rbt.QueueName, // 队列名 true, // 是否长久化 false, // 是否主动删除 false, // 是否排他 false, // 是否阻塞 nil, // 其余参数 ) if err != nil { log.Printf("rbt.channel.QueueDeclare error : %v", err) return } err = rbt.channel.Qos(1, 0, false) if err != nil { log.Printf("rbt.channel.Qos error : %v", err) return } d, err := rbt.channel.Consume( q.Name, "", false, false, false, false, nil) if err != nil { log.Printf("rbt.channel.Consume error : %v", err) return } for msg := range d { log.Println("received msg is ", string(msg.Body)) err := rbt.channel.Publish( "", msg.ReplyTo, false, false, amqp.Publishing{ ContentType: "test/plain", CorrelationId: msg.CorrelationId, Body: data, }) if err != nil { log.Printf("rbt.channel.Publish error : %v", err) return } msg.Ack(false) log.Println("svr response ok ") } return}func randomString(l int) string { bytes := make([]byte, l) for i := 0; i < l; i++ { bytes[i] = byte(rand.Intn(l)) } return string(bytes)}// 生产音讯func (rbt *RabbitMQ) Consume() { // 申请队列 q, err := rbt.channel.QueueDeclare( "", // 队列名 true, // 是否长久化 false, // 是否主动删除 false, // 是否排他 false, // 是否阻塞 nil, // 其余参数 ) if err != nil { log.Printf("rbt.channel.QueueDeclare error : %v", err) return } // 生产数据 msg, err := rbt.channel.Consume( q.Name, // 队列名 "xmt", // 消费者的名字 true, // 是否自动应答 false, // 是否排他 false, // 若为true,示意 不能将同一个Conenction中生产者发送的消息传递给这个Connection中 的消费者 false, // 是否阻塞 nil, // 其余属性 ) if err != nil { log.Printf("rbt.channel.Consume error : %v", err) return } id := randomString(32) err = rbt.channel.Publish( "", rbt.QueueName, false, false, amqp.Publishing{ ContentType: "test/plain", CorrelationId: id, ReplyTo: q.Name, Body: []byte("321"), }) if err != nil { log.Printf("rbt.channel.Publish error : %v", err) return } for data := range msg { log.Printf("received data is %v", string(data.Body)) }}
main.go
package mainimport ( "fmt" "log" "xmt/xmtmq")/*RabbimtMQ rpc 模式 案例利用场景:简略音讯队列的应用,一个生产者一个消费者生产音讯*/func main() { log.SetFlags(log.Llongfile | log.Ltime | log.Ldate) rbt := &xmtmq.RabbitMQ{ QueueName: "xmtqueue", MQUrl: "amqp://guest:guest@127.0.0.1:5672/xmtmq", } xmtmq.NewRabbitMQ(rbt) rbt.Produce([]byte(fmt.Sprintf("hello wolrd")))}
consumer.go
package mainimport ( "log" "math/rand" "time" "xmt/xmtmq")func main() { log.SetFlags(log.Llongfile | log.Ltime | log.Ldate) rand.Seed(time.Now().UTC().UnixNano()) rbt := &xmtmq.RabbitMQ{ QueueName: "xmtqueue", MQUrl: "amqp://guest:guest@127.0.0.1:5672/xmtmq", } xmtmq.NewRabbitMQ(rbt) rbt.Consume()}
咱们先运行消费者,多运行几个,能够看到咱们的队列中曾经有数据了,咱们运行的是2个消费者,因而此处是 2
再运行生产者,就能看到生产者将消费者发送的音讯生产掉,并且通过 CorrelationId
找到对应消费者监听的队列,将数据发送到队列中
消费者监听的队列有数据了,消费者就取出来进行生产
总结
RabbitMQ
的六种工作模式:
- single 模式
- work 模式
- publish / subscribe 模式
- routing 模式
- topic 模式
- rpc 模式
参考资料:
RabbitMQ Tutorials
欢送点赞,关注,珍藏
敌人们,你的反对和激励,是我保持分享,提高质量的能源
好了,本次就到这里
技术是凋谢的,咱们的心态,更应是凋谢的。拥抱变动,背阴而生,致力向前行。
我是小魔童哪吒,欢送点赞关注珍藏,下次见~