关于消息队列:消息队列RabbitMQ简单使用

7次阅读

共计 2815 个字符,预计需要花费 8 分钟才能阅读完成。

装置应用

  • 地址:http://www.rabbitmq.com/
  • 须要依据不同的版本抉择不同的 erlang

装置 erlang

装置 RabbitMQ 首先须要装置 erlang 环境,依据 GitHub 上 erlang 中 README 配置 yum 源

将下面的内容写入红线标注的文件中保留退出,而后装置 erlang

yum update -y
yum install -y erlang-23.3.4

装置 RabbitMQ

下载实现 RabbitMQ 的安装包后执行上面的命令装置

yum install rabbitmq-server-3.8.19-1.el7.noarch.rpm

装置实现后能够看到有 6 个相干的可执行程序

其中外围是 rabbitmq-server

RabbitMQ 的应用

启动

rabbitmq-server start & # 后盾启动 

启动 RabbitMQ 图形化治理界面

RabbitMQ 的图形化治理界面模式是没有关上的,须要咱们手动去关上,rabbitmq-plugins 就是治理各种插件的工具

rabbitmq-plugins list # 查看所有插件
rabbitmq-plugins enable rabbitmq_management # 关上图形化界面
rabbitmq-plugins disable rabbitmq_management # 敞开图形化界面 

拜访图形化治理界面

关上图形化治理界面插件后就能够通过本机浏览器拜访 RabbitMQ 了:http://127.0.0.1:15672/ 用户名和明码都是 guest

如果要在其余机器上拜访该界面则须要配置用户名和明码,权限等

rabbitmqctl add_user Guest Guest # 增加用户
rabbitmqctl set_user_tags Guest administrator # 赋予该用户管理员角色
rabbitmqctl set_permissions -p / Guest ".*" ".*" ".*" # 给该用户赋予权限 

这样就能够在其余机器上拜访 RabbitMQ 的治理界面了

上面通过 Go 语言实现一个简略的音讯队列的利用

consumer.go

package main

import (
   "fmt"
   "github.com/streadway/amqp"
)

// 消费者

func main() {
   //1. 建设连贯
   connection, err := amqp.Dial("amqp://Guest:Guest@192.168.27.139:5672")
   if err != nil {panic(err)
   }
   defer connection.Close()
   //2. 设置通道
   ch, err := connection.Channel()
   if err != nil {panic(err)
   }
   //3. 定义 exchange
   //err = ch.ExchangeDeclare("exchange-name", "direct", false, false, false, false, nil)
   err = ch.ExchangeDeclare("exchange-name-topic", "topic", false, false, false, false, nil)
   if err != nil {panic(err)
   }
   //4. 定义队列 queue
   queue, err := ch.QueueDeclare("", false, false,false,false, nil)
   if err != nil {panic(err)
   }
   //5. 绑定 bind
   //err = ch.QueueBind(queue.Name, "direct_key", "exchange-name", false, nil)
   //topic 含糊匹配
   err = ch.QueueBind(queue.Name, "topic.#", "exchange-name-topic", false, nil)
   if err != nil {panic(err)
   }
   //6. 接管音讯
   consume_msg, err := ch.Consume(queue.Name, "", false, false, false, false, nil)
   if err != nil {panic(err)
   }
   //7. 打印消息
   //msg := <-consume_msg
   //fmt.Printf("received msg: %s\n", msg.Body)
   for msg := range consume_msg {fmt.Printf("received msg: %s\n", msg.Body)
   }
}

producer.go

package main

import "github.com/streadway/amqp"

// 生产者

func main() {
   //1. 建设连贯
   connection, err := amqp.Dial("amqp://Guest:Guest@192.168.27.139:5672")
   if err != nil {panic(err)
   }
   defer connection.Close()
   //2. 设置通道
   ch, err := connection.Channel()
   if err != nil {panic(err)
   }
   //3. 定义 exchange
   //err = ch.ExchangeDeclare("exchange-name", "direct", false, false, false, false, nil)
   err = ch.ExchangeDeclare("exchange-name-topic", "topic", false, false, false, false, nil)
   if err != nil {panic(err)
   }
   //4. 定义队列 queue
   //queue, err := ch.QueueDeclare("", false, false,false,false, nil)
   //if err != nil {// panic(err)
   //}
   //5. 绑定 bind
   //6. 定义公布的音讯
   //msg := "hello"
   //7. 公布音讯
   //err = ch.Publish("exchange-name", "direct_key", false, false, amqp.Publishing{Body: []byte(msg)})
   err = ch.Publish("exchange-name-topic", "topic.first", false, false, amqp.Publishing{Body: []byte("topic1")})
   err = ch.Publish("exchange-name-topic", "TOPIC.eg", false, false, amqp.Publishing{Body: []byte("topic-eg")})
   err = ch.Publish("exchange-name-topic", "topic.last", false, false, amqp.Publishing{Body: []byte("topic2")})
   if err != nil {panic(err)
   }
}
正文完
 0