4种不同的交换机类型
直连交换机:Direct exchange
扇形交换机:Fanout exchange
主题交换机:Topic exchange
首部交换机:Headers exchange
交换机具体含意参考 https://www.jianshu.com/p/469...
Fanout 交换机
以下代码参数具体含意能够参考 https://segmentfault.com/a/11...
Demo
生产者示例 producer.go
package mainimport ( "fmt" "github.com/streadway/amqp" "time")//因:疾速实现逻辑,故:不处理错误逻辑func main() { conn, _ := amqp.Dial("amqp://user:password@host:ip/vhost") ch, _ := conn.Channel() body := "Hello World! " + time.Now().Format("2006-01-02 15:04:05") fmt.Println(body) //申明交换器 ch.ExchangeDeclare("j_exch_fanout", amqp.ExchangeFanout, true, false, false, false, nil) // fanout ch.Publish( "j_exch_fanout", // exchange 这里为空则不抉择 exchange "j_exch_fanout_key", // routing key false, // mandatory false, // immediate amqp.Publishing{ ContentType: "text/plain", Body: []byte(body), //Expiration: "3000", // 设置过期工夫 }) // defer 关键字 defer conn.Close() // 压栈 后进先出 defer ch.Close() // 压栈 后进先出}
消费者 consumer.go
package mainimport ( "github.com/streadway/amqp" "log")func main() { conn, _ := amqp.Dial("amqp://user:password@host:ip/vhost") ch, _ := conn.Channel() var exchange_name string = "j_exch_fanout" var routing_key string = "j_exch_fanout_key" var queue_name string = "j_queue" ch.QueueDeclare(queue_name, true, false, true, false, nil) //申明交换器 amqp.ExchangeFanout // 扇形交换机 ch.ExchangeDeclare(exchange_name, amqp.ExchangeFanout, true, false, false, false, nil) ch.QueueBind( queue_name, // queue name routing_key, // routing key: fanout 模式不失效 exchange_name, // exchange false, nil) //监听 msgs, _ := ch.Consume( queue_name, // queue name "", // consumer true, // auto-ack false, // exclusive false, // no-local false, // no-wait nil, // args ) forever := make(chan bool) go func() { for d := range msgs { //println("tset") log.Printf(" [x] %s", d.Body) } }() log.Printf(" [*] Waiting for logs. To exit press CTRL+C") <-forever}