咱们一起来学RabbitMQ 二:RabbiMQ 的 6 种模式的根本利用

嗨,大家好,我是小魔童哪吒,咱们从明天开始进入开源组件的学习,一边学习一边总结一边分享

文章提纲如下:

  • RabbitMQ 成员组成
  • RabbitMQ 的六种工作模式编码

RabbitMQ 成员组成

  • 生产者 producer
  • 消费者 consumer
  • 交换机 exchange

用于承受、调配音讯

  • 音讯 message
  • 队列 queue

用于存储生产者的音讯

  • 信道 channel AMQP

音讯推送应用的通道

  • 连贯 connections

生成者或者消费者与Rabbit 建设的TCP 连贯

  • 路由键 routingKey

用于把生成者的数据调配到交换器上

  • 绑定键 BindingKey

用于把交换器的音讯绑定到队列上

  • 连贯管理器 ConnectionFactory

应用程序与 Rabbit 之间建设连贯的管理器,程序代码中应用

RabbitMQ 的六种工作模式编码

single 模式

  • 音讯产生者将音讯放入队列
  • 音讯的消费者监听音讯队列,如果队列中有音讯就生产掉

目录如下:

.├── consumer.go├── go.mod├── go.sum├── main.go└── xmtmq    └── xmtmq.go

理论编码如下:

每种模式的编码思路如下:

生产者 / 消费者

  • 连贯 RabbitMQ 的 server
  • 初始化连贯 connection
  • 初始化通道 channel
  • 初始化交换机 exchange
  • 初始化队列 queue
  • 应用路由key,绑定队列 bind , key
  • 生产音讯 / 生产音讯 produce , consume

音讯xmtmq.go

package xmtmqimport (   "github.com/streadway/amqp"   "log")// single 模式// 定义 RabbitMQ 的数据结构// go get github.com/streadway/amqptype RabbitMQ struct {   conn      *amqp.Connection // 连贯   channel   *amqp.Channel    // 通道   QueueName string           // 队列名   Exchange  string           // 交换机   Key       string           // 路由键   MQUrl     string           // MQ的虚拟机地址}// New 一个 RabbitMQfunc NewRabbitMQ(rbt *RabbitMQ) {   if rbt == nil || rbt.QueueName == ""  || rbt.MQUrl == "" {      log.Panic("please check QueueName,Exchange,MQUrl ...")   }   conn, err := amqp.Dial(rbt.MQUrl)   if err != nil {      log.Panicf("amqp.Dial error : %v", err)   }   rbt.conn = conn   channel, err := rbt.conn.Channel()   if err != nil {      log.Panicf("rbt.conn.Channel error : %v", err)   }   rbt.channel = channel}func RabbitMQFree(rbt *RabbitMQ){   if rbt == nil{      log.Printf("rbt is nil,free failed")      return   }   rbt.channel.Close()   rbt.conn.Close()}func (rbt *RabbitMQ) Init() {   // 申请队列   _, err := rbt.channel.QueueDeclare(      rbt.QueueName, // 队列名      true,          // 是否长久化      false,         // 是否主动删除      false,         // 是否排他      false,         // 是否阻塞      nil,           // 其余参数   )   if err != nil {      log.Printf("rbt.channel.QueueDeclare error : %v", err)      return   }}// 生产音讯func (rbt *RabbitMQ) Produce(data []byte) {   // 向队列中退出数据   err := rbt.channel.Publish(      rbt.Exchange,        // 交换机      rbt.QueueName,       // 队列名      false,    // 若为true,依据本身exchange类型和routekey规定无奈找到符合条件的队列会把音讯返还给发送者      false,    // 若为true,当exchange发送音讯到队列后发现队列上没有消费者,则会把音讯返还给发送者      amqp.Publishing{         ContentType: "text/plain",         Body:        data,      },   )   if err != nil {      log.Printf("rbt.channel.Publish error : %v", err)      return   }   return}// 生产音讯func (rbt *RabbitMQ) Consume() {   // 生产数据   msg, err := rbt.channel.Consume(      rbt.QueueName,    // 队列名      "xmt",    // 消费者的名字      true,     // 是否自动应答      false,    // 是否排他      false,    // 若为true,示意 不能将同一个Conenction中生产者发送的消息传递给这个Connection中 的消费者      false,    // 是否阻塞      nil,         // 其余属性   )   if err != nil {      log.Printf("rbt.channel.Consume error : %v", err)      return   }   for data := range msg {      log.Printf("received data is %v", string(data.Body))   }}

main.go

package mainimport (   "fmt"   "log"   "time"   "xmt/xmtmq")/*RabbimtMQ single 模式 案例利用场景:简略音讯队列的应用,一个生产者一个消费者生产音讯*/func main() {    // 设置日志   log.SetFlags(log.Llongfile | log.Ltime | log.Ldate)   rbt := &xmtmq.RabbitMQ{      QueueName: "xmtqueue",      MQUrl:     "amqp://guest:guest@127.0.0.1:5672/xmtmq",   }       xmtmq.NewRabbitMQ(rbt)   var index = 0   for {       // 生产音讯      rbt.Produce([]byte(fmt.Sprintf("hello wolrd %d ", index)))      log.Println("发送胜利 ", index)      index++      time.Sleep(1 * time.Second)   }}

consumer.go

package mainimport (   "log"   "xmt/xmtmq")func main() {   log.SetFlags(log.Llongfile | log.Ltime | log.Ldate)   rbt := &xmtmq.RabbitMQ{      QueueName: "xmtqueue",      MQUrl:     "amqp://guest:guest@127.0.0.1:5672/xmtmq",   }   xmtmq.NewRabbitMQ(rbt)   rbt.Consume()}

运行的时候,关上2个终端

终端1:go run main.go

终端2:go run consumer.go

work 模式

多个生产端生产同一个队列中的音讯,队列采纳轮询的形式将音讯是均匀发送给消费者,此处的资源是竞争关系

当生产者生产音讯的速度大于消费者生产的速度,就要思考用 work 工作模式,这样能进步处理速度进步负载

work 模式与 single 模式相似, 只是work 模式比 single 模式多了一些消费者

基于single 模式,开一个终端3 : go run consumer.go

publish / subscribe 模式

publish / subscribe 公布订阅模式 , 绝对于Work queues模式多了一个交换机,此处的资源是共享的

用于场景

  • 邮件群发
  • 群聊天
  • 播送(广告等)

目录和上述编码保持一致:

xmtmq.go

开始用到交换机 exchange ,fanout 类型

生产端先把音讯发送到交换机,再由交换机把音讯发送到绑定的队列中,每个绑定的队列都能收到由生产端发送的音讯

package xmtmqimport (   "github.com/streadway/amqp"   "log")// publish 模式// 定义 RabbitMQ 的数据结构// go get github.com/streadway/amqptype RabbitMQ struct {   conn      *amqp.Connection // 连贯   channel   *amqp.Channel    // 通道   QueueName string           // 队列名   Exchange  string           // 交换机   Key       string           // 路由键   MQUrl     string           // MQ的虚拟机地址}// New 一个 RabbitMQfunc NewRabbitMQ(rbt *RabbitMQ) {   if rbt == nil || rbt.Exchange == "" || rbt.MQUrl == "" {      log.Panic("please check Exchange,MQUrl ...")   }   conn, err := amqp.Dial(rbt.MQUrl)   if err != nil {      log.Panicf("amqp.Dial error : %v", err)   }   rbt.conn = conn   channel, err := rbt.conn.Channel()   if err != nil {      log.Panicf("rbt.conn.Channel error : %v", err)   }   rbt.channel = channel}func RabbitMQFree(rbt *RabbitMQ) {   if rbt == nil {      log.Printf("rbt is nil,free failed")      return   }   rbt.channel.Close()   rbt.conn.Close()}func (rbt *RabbitMQ) Init() {   // 1、创立交换机   err := rbt.channel.ExchangeDeclare(      rbt.Exchange,        // 交换机      amqp.ExchangeFanout, // 交换机类型      true,                // 是否长久化      false,               //是否主动删除      false,               //true示意这个exchange不能够被client用来推送音讯,仅用来进行exchange和exchange之间的绑定      false,               // 是否阻塞      nil,                 // 其余属性   )   if err != nil {      log.Printf("rbt.channel.ExchangeDeclare error : %v", err)      return   }}// 生产音讯 publishfunc (rbt *RabbitMQ) PublishMsg(data []byte) {   // 1、向队列中退出数据   err := rbt.channel.Publish(      rbt.Exchange, // 交换机      "",           // 队列名      false,        // 若为true,依据本身exchange类型和routekey规定无奈找到符合条件的队列会把音讯返还给发送者      false,        // 若为true,当exchange发送音讯到队列后发现队列上没有消费者,则会把音讯返还给发送者      amqp.Publishing{         ContentType: "text/plain",         Body:        data,      },   )   if err != nil {      log.Printf("rbt.channel.Publish error : %v", err)      return   }   return}// 生产音讯func (rbt *RabbitMQ) SubscribeMsg() {   // 1、创立队列   q, err := rbt.channel.QueueDeclare(      "", // 此处咱们传入的是空,则是随机产生队列的名称      true,      false,      false,      false,      nil,   )   if err != nil {      log.Printf("rbt.channel.QueueDeclare error : %v", err)      return   }   // 2、绑定队列   err = rbt.channel.QueueBind(      q.Name,       // 队列名字      "",           // 在publish模式下,这里key 为空      rbt.Exchange, // 交换机名称      false,        // 是否阻塞      nil,          // 其余属性   )   if err != nil {      log.Printf("rbt.channel.QueueBind error : %v", err)      return   }   // 3、生产数据   msg, err := rbt.channel.Consume(      q.Name, // 队列名      "xmt",  // 消费者的名字      true,   // 是否自动应答      false,  // 是否排他      false,  // 若为true,示意 不能将同一个Conenction中生产者发送的消息传递给这个Connection中 的消费者      false,  // 是否阻塞      nil,    // 其余属性   )   if err != nil {      log.Printf("rbt.channel.Consume error : %v", err)      return   }   for data := range msg {      log.Printf("received data is %v", string(data.Body))   }}

main.go

package mainimport (   "fmt"   "log"   "time"   "xmt/xmtmq")/*RabbimtMQ publish 模式 案例利用场景:邮件群发,群聊天,播送(广告)生产音讯*/func main() {   log.SetFlags(log.Llongfile | log.Ltime | log.Ldate)   rbt := &xmtmq.RabbitMQ{      Exchange:  "xmtPubEx",      MQUrl:     "amqp://guest:guest@127.0.0.1:5672/xmtmq",   }   xmtmq.NewRabbitMQ(rbt)   rbt.Init()   var index = 0   for {      rbt.PublishMsg([]byte(fmt.Sprintf("hello wolrd %d ", index)))      log.Println("发送胜利 ", index)      index++      time.Sleep(1 * time.Second)   }   xmtmq.RabbitMQFree(rbt)}

consumer.go

package mainimport (   "log"   "xmt/xmtmq")func main() {   log.SetFlags(log.Llongfile | log.Ltime | log.Ldate)   rbt := &xmtmq.RabbitMQ{      Exchange: "xmtPubEx",      MQUrl:     "amqp://guest:guest@127.0.0.1:5672/xmtmq",   }   xmtmq.NewRabbitMQ(rbt)   rbt.SubscribeMsg()   xmtmq.RabbitMQFree(rbt)}

执行的操作和上述保持一致

终端1:go run main.go

终端2:go run consumer.go

终端3:go run consumer.go

成果和上述single 模式和 work模式的显著区别是:公布订阅模式的案例,生产者生产的音讯,对应的消费者生产其生产的内容

routing 模式

音讯生产者将音讯发送给交换机依照路由判断,路由是字符串 以后产生的音讯携带路由字符(对象的办法),交换机依据路由的key,只能匹配上路由key对应的音讯队列,对应的消费者能力生产音讯

利用场景:从零碎的代码逻辑中获取对应的性能字符串,将音讯工作扔到对应的队列中业务场景,例如处理错误,解决特定音讯等

生产者解决流程:

申明队列并申明交换机 -> 创立连贯 -> 创立通道 -> 通道申明交换机 -> 通道申明队列 -> 通过通道使队列绑定到交换机并指定该队列的routingkey(通配符) -> 制订音讯 -> 发送音讯并指定routingkey(通配符)

消费者解决流程:

申明队列并申明交换机 -> 创立连贯 -> 创立通道 -> 通道申明交换机 -> 通道申明队列 -> 通过通道使队列绑定到交换机并指定routingkey(通配符) -> 重写音讯生产办法 -> 执行音讯办法

目录构造如下:

.├── consumer2.go├── consumer.go├── go.mod├── go.sum├── main.go└── xmtmq    └── xmtmq.go

xmtmq.go

  • 用到交换机 为 direct 类型
  • 用到路由键
package xmtmqimport (   "github.com/streadway/amqp"   "log")// routing 模式// 定义 RabbitMQ 的数据结构// go get github.com/streadway/amqptype RabbitMQ struct {   conn      *amqp.Connection // 连贯   channel   *amqp.Channel    // 通道   QueueName string           // 队列名   Exchange  string           // 交换机   Key       string           // 路由键   MQUrl     string           // MQ的虚拟机地址}// New 一个 RabbitMQfunc NewRabbitMQ(rbt *RabbitMQ) {   if rbt == nil || rbt.Exchange == "" || rbt.QueueName == "" || rbt.Key == "" || rbt.MQUrl == "" {      log.Panic("please check Exchange,,QueueName,Key,MQUrl ...")   }   conn, err := amqp.Dial(rbt.MQUrl)   if err != nil {      log.Panicf("amqp.Dial error : %v", err)   }   rbt.conn = conn   channel, err := rbt.conn.Channel()   if err != nil {      log.Panicf("rbt.conn.Channel error : %v", err)   }   rbt.channel = channel}func RabbitMQFree(rbt *RabbitMQ) {   if rbt == nil {      log.Printf("rbt is nil,free failed")      return   }   rbt.channel.Close()   rbt.conn.Close()}func (rbt *RabbitMQ) Init() {   // 1、创立交换机   err := rbt.channel.ExchangeDeclare(      rbt.Exchange, // 交换机      amqp.ExchangeDirect,     // 交换机类型      true,         // 是否长久化      false,        //是否主动删除      false,        //true示意这个exchange不能够被client用来推送音讯,仅用来进行exchange和exchange之间的绑定      false,        // 是否阻塞      nil,          // 其余属性   )   if err != nil {      log.Printf("rbt.channel.ExchangeDeclare error : %v", err)      return   }   // 2、创立队列   _, err = rbt.channel.QueueDeclare(      rbt.QueueName, // 此处咱们传入的是空,则是随机产生队列的名称      true,      false,      false,      false,      nil,   )   if err != nil {      log.Printf("rbt.channel.QueueDeclare error : %v", err)      return   }   // 3、绑定队列   err = rbt.channel.QueueBind(      rbt.QueueName, // 队列名字      rbt.Key,       // routing,这里key 须要填      rbt.Exchange,  // 交换机名称      false,         // 是否阻塞      nil,           // 其余属性   )   if err != nil {      log.Printf("rbt.channel.QueueBind error : %v", err)      return   }}// 生产音讯 publishfunc (rbt *RabbitMQ) ProduceRouting(data []byte) {   // 1、向队列中退出数据   err := rbt.channel.Publish(      rbt.Exchange, // 交换机      rbt.Key,      // key      false,        // 若为true,依据本身exchange类型和routekey规定无奈找到符合条件的队列会把音讯返还给发送者      false,        // 若为true,当exchange发送音讯到队列后发现队列上没有消费者,则会把音讯返还给发送者      amqp.Publishing{         ContentType: "text/plain",         Body:        data,      },   )   if err != nil {      log.Printf("rbt.channel.Publish error : %v", err)      return   }   return}// 生产音讯func (rbt *RabbitMQ) ConsumeRoutingMsg() {   // 4、生产数据   msg, err := rbt.channel.Consume(      rbt.QueueName, // 队列名      "",     // 消费者的名字      true,   // 是否自动应答      false,  // 是否排他      false,  // 若为true,示意 不能将同一个Conenction中生产者发送的消息传递给这个Connection中 的消费者      false,  // 是否阻塞      nil,    // 其余属性   )   if err != nil {      log.Printf("rbt.channel.Consume error : %v", err)      return   }   for data := range msg {      log.Printf("received data is %v", string(data.Body))   }}

main.go

package mainimport (   "fmt"   "log"   "time"   "xmt/xmtmq")/*RabbimtMQ routing 模式 案例利用场景:从零碎的代码逻辑中获取对应的性能字符串,将音讯工作扔到对应的队列中业务场景,例如处理错误,解决特定音讯等生产音讯*/func main() {   log.SetFlags(log.Llongfile | log.Ltime | log.Ldate)   rbt1 := &xmtmq.RabbitMQ{      Exchange: "xmtPubEx2",      Key: "xmt1",      QueueName: "Routingqueuexmt1",      MQUrl:     "amqp://guest:guest@127.0.0.1:5672/xmtmq",   }   xmtmq.NewRabbitMQ(rbt1)   rbt1.Init()   rbt2 := &xmtmq.RabbitMQ{      Exchange: "xmtPubEx2",      Key: "xmt2",      QueueName: "Routingqueuexmt2",      MQUrl:     "amqp://guest:guest@127.0.0.1:5672/xmtmq",   }   xmtmq.NewRabbitMQ(rbt2)   rbt2.Init()   var index = 0   for {      rbt1.ProduceRouting([]byte(fmt.Sprintf("hello wolrd xmt1  %d ", index)))      log.Println("发送胜利xmt1  ", index)      rbt2.ProduceRouting([]byte(fmt.Sprintf("hello wolrd xmt2  %d ", index)))      log.Println("发送胜利xmt2  ", index)      index++      time.Sleep(1 * time.Second)   }   xmtmq.RabbitMQFree(rbt1)   xmtmq.RabbitMQFree(rbt2)}

consumer.go

package mainimport (   "log"   "xmt/xmtmq")func main() {   log.SetFlags(log.Llongfile | log.Ltime | log.Ldate)   rbt := &xmtmq.RabbitMQ{      Exchange: "xmtPubEx2",      Key: "xmt1",      QueueName: "Routingqueuexmt1",      MQUrl:     "amqp://guest:guest@127.0.0.1:5672/xmtmq",   }   xmtmq.NewRabbitMQ(rbt)   rbt.ConsumeRoutingMsg()   xmtmq.RabbitMQFree(rbt)}

consumer2.go

package mainimport (   "log"   "xmt/xmtmq")func main() {   log.SetFlags(log.Llongfile | log.Ltime | log.Ldate)   rbt := &xmtmq.RabbitMQ{      Exchange: "xmtPubEx2",      Key:      "xmt2",      QueueName: "Routingqueuexmt2",      MQUrl:    "amqp://guest:guest@127.0.0.1:5672/xmtmq",   }   xmtmq.NewRabbitMQ(rbt)   rbt.ConsumeRoutingMsg()   xmtmq.RabbitMQFree(rbt)}

topic 模式

话题模式,一个音讯被多个消费者获取,音讯的指标 queue 可用 BindingKey 的通配符

Topics 模式实际上是路由模式的一种

他俩的最大的区别是 : Topics 模式发送音讯和生产音讯的时候是通过通配符去进行匹配的

  • *号代表能够同通配一个单词
  • 号代表能够通配零个或多个单词

编码的案例与上述 routing 模式放弃始终,只是 exchange 为 topic类型

如下是上述几种模式波及到的交换机队列

rpc 模式

RPC 近程过程调用,客户端近程调用服务端的办法 ,应用 MQ 能够实现 RPC 的异步调用

目录构造为:

.├── consumer.go├── go.mod├── go.sum├── main.go└── xmtmq    └── xmtmq.go
  • 客户端即是生产者也是消费者,向 RPC 申请队列发送 RPC 调用音讯,同时监听RPC响应队列
  • 服务端监听RPC申请队列的音讯,收到音讯后执行服务端的办法,失去办法返回的后果
  • 服务端将RPC办法 的后果发送到RPC响应队列。
  • 客户端监听RPC响应队列,接管到RPC调用后果

xmtmq.go

package xmtmqimport (   "github.com/streadway/amqp"   "log"   "math/rand")// rpc 模式// 定义 RabbitMQ 的数据结构// go get github.com/streadway/amqptype RabbitMQ struct {   conn      *amqp.Connection // 连贯   channel   *amqp.Channel    // 通道   QueueName string           // 队列名   Exchange  string           // 交换机   Key       string           // 路由键   MQUrl     string           // MQ的虚拟机地址}// New 一个 RabbitMQfunc NewRabbitMQ(rbt *RabbitMQ) {   if rbt == nil || rbt.QueueName == "" || rbt.MQUrl == "" {      log.Panic("please check QueueName,Exchange,MQUrl ...")   }   conn, err := amqp.Dial(rbt.MQUrl)   if err != nil {      log.Panicf("amqp.Dial error : %v", err)   }   rbt.conn = conn   channel, err := rbt.conn.Channel()   if err != nil {      log.Panicf("rbt.conn.Channel error : %v", err)   }   rbt.channel = channel}func RabbitMQFree(rbt *RabbitMQ) {   if rbt == nil {      log.Printf("rbt is nil,free failed")      return   }   rbt.channel.Close()   rbt.conn.Close()}// 生产音讯func (rbt *RabbitMQ) Produce(data []byte) {   // 申请队列   q, err := rbt.channel.QueueDeclare(      rbt.QueueName, // 队列名      true,          // 是否长久化      false,         // 是否主动删除      false,         // 是否排他      false,         // 是否阻塞      nil,           // 其余参数   )   if err != nil {      log.Printf("rbt.channel.QueueDeclare error : %v", err)      return   }   err = rbt.channel.Qos(1, 0, false)   if err != nil {      log.Printf("rbt.channel.Qos error : %v", err)      return   }   d, err := rbt.channel.Consume(      q.Name,      "",      false,      false,      false,      false,      nil)   if err != nil {      log.Printf("rbt.channel.Consume error : %v", err)      return   }   for msg := range d {      log.Println("received msg is  ", string(msg.Body))      err := rbt.channel.Publish(         "",         msg.ReplyTo,         false,         false,         amqp.Publishing{            ContentType:   "test/plain",            CorrelationId: msg.CorrelationId,            Body:          data,         })      if err != nil {         log.Printf("rbt.channel.Publish error : %v", err)         return      }      msg.Ack(false)      log.Println("svr response ok ")   }   return}func randomString(l int) string {   bytes := make([]byte, l)   for i := 0; i < l; i++ {      bytes[i] = byte(rand.Intn(l))   }   return string(bytes)}// 生产音讯func (rbt *RabbitMQ) Consume() {   // 申请队列   q, err := rbt.channel.QueueDeclare(      "",    // 队列名      true,  // 是否长久化      false, // 是否主动删除      false, // 是否排他      false, // 是否阻塞      nil,   // 其余参数   )   if err != nil {      log.Printf("rbt.channel.QueueDeclare error : %v", err)      return   }   // 生产数据   msg, err := rbt.channel.Consume(      q.Name, // 队列名      "xmt",  // 消费者的名字      true,   // 是否自动应答      false,  // 是否排他      false,  // 若为true,示意 不能将同一个Conenction中生产者发送的消息传递给这个Connection中 的消费者      false,  // 是否阻塞      nil,    // 其余属性   )   if err != nil {      log.Printf("rbt.channel.Consume error : %v", err)      return   }   id := randomString(32)   err = rbt.channel.Publish(      "",      rbt.QueueName,      false,      false,      amqp.Publishing{         ContentType:   "test/plain",         CorrelationId: id,         ReplyTo:       q.Name,         Body:          []byte("321"),      })   if err != nil {      log.Printf("rbt.channel.Publish error : %v", err)      return   }   for data := range msg {      log.Printf("received data is %v", string(data.Body))   }}

main.go

package mainimport (   "fmt"   "log"   "xmt/xmtmq")/*RabbimtMQ rpc 模式 案例利用场景:简略音讯队列的应用,一个生产者一个消费者生产音讯*/func main() {   log.SetFlags(log.Llongfile | log.Ltime | log.Ldate)   rbt := &xmtmq.RabbitMQ{      QueueName: "xmtqueue",      MQUrl:     "amqp://guest:guest@127.0.0.1:5672/xmtmq",   }   xmtmq.NewRabbitMQ(rbt)   rbt.Produce([]byte(fmt.Sprintf("hello wolrd")))}

consumer.go

package mainimport (   "log"   "math/rand"   "time"   "xmt/xmtmq")func main() {   log.SetFlags(log.Llongfile | log.Ltime | log.Ldate)   rand.Seed(time.Now().UTC().UnixNano())   rbt := &xmtmq.RabbitMQ{      QueueName: "xmtqueue",      MQUrl:     "amqp://guest:guest@127.0.0.1:5672/xmtmq",   }   xmtmq.NewRabbitMQ(rbt)   rbt.Consume()}

咱们先运行消费者,多运行几个,能够看到咱们的队列中曾经有数据了,咱们运行的是2个消费者,因而此处是 2

再运行生产者,就能看到生产者将消费者发送的音讯生产掉,并且通过 CorrelationId 找到对应消费者监听的队列,将数据发送到队列中

消费者监听的队列有数据了,消费者就取出来进行生产

总结

RabbitMQ 的六种工作模式:

  • single 模式
  • work 模式
  • publish / subscribe 模式
  • routing 模式
  • topic 模式
  • rpc 模式

参考资料:

RabbitMQ Tutorials

欢送点赞,关注,珍藏

敌人们,你的反对和激励,是我保持分享,提高质量的能源

好了,本次就到这里

技术是凋谢的,咱们的心态,更应是凋谢的。拥抱变动,背阴而生,致力向前行。

我是小魔童哪吒,欢送点赞关注珍藏,下次见~