共计 6545 个字符,预计需要花费 17 分钟才能阅读完成。
0.1、索引
https://blog.waterflow.link/articles/1663772504649
RabbitMQ 是一个轻量级且易于部署的音讯队列。它反对开箱即用的多种消息传递协定。咱们将应用 AMQP(高级音讯队列协定)
1、概念
既然是音讯队列,顾名思义,必定会有 生产者 生产音讯,消费者 生产音讯,还会有 队列 用来保留音讯,等等。
咱们先来看下这些概念:
- Producer: 将音讯推送到 rabbitmq 交换机的利用
- Consumer: 从队列读取音讯并解决他们的利用
- Exchange: 交换机负责在 Binding 和 Routing key 的帮忙下,将音讯路由到不同的队列。从上图能够看出 rabbitmq 有多种类型的交换机
- Binding: Binding 是队列和交换机之间的链接
- Routing key: 交换机用来决定如何将音讯路由到队列的键。能够看做是音讯的地址
- Queue: 存储音讯的缓冲区
- Connection:生产者到 Broker(rabbitmq 服务),消费者到 Broker 的连贯
- Channel:为了复用一个连贯,一个 connection 下能够有多个 channel,能够把 connection 了解成电线,channel 就是电线外面的铜丝。
消息传递的残缺流程是这样的:
- 生产者初始化一个到 rabbitmq 服务的连贯
- 获取连贯的管道,通过管道申明一个交换机
- 通过管道申明一个队列,通过绑定的路由键将队列和交换机绑定(发送音讯的时候申明一个队列并绑定交换机,音讯会进到队列里。如果不申明也能够放到消费者去申明队列和绑定交换机。须要留神的是生产者没有申明队列的话,此时曾经生产多条音讯,而后去开启消费者生产,是不会生产到之前的音讯的)
- 通过管道发送音讯到指定的交换机
- 消费者初始化一个到 rabbitmq 服务的连贯
- 获取连贯的管道,通过管道申明一个队列
- 通过绑定的路由键将队列和交换机绑定
- 从队列中生产音讯
交换机类型:
- direct:间接指定到某个队列
- topic:公布订阅模式,一个交换机能够对应多个队列,通过路由规定匹配
- fanout:顾名思义,无脑播送模式
2、示例
生产者:
package main
import (
"fmt"
"time"
"github.com/streadway/amqp"
)
var (
conn *amqp.Connection
channel *amqp.Channel
queue amqp.Queue
mymsg = "Hello HaiCoder"
err error
confirms chan amqp.Confirmation
)
func main() {
// 建设连贯
conn, err = amqp.Dial("amqp://guest:[email protected]:5672/")
if err != nil {fmt.Println(err)
return
}
defer conn.Close()
// 创立 channel
if channel, err = conn.Channel(); err != nil {fmt.Println(err)
return
}
// 申明交换机
err = channel.ExchangeDeclare("liutest", amqp.ExchangeDirect, false, false, false, false, nil)
if err != nil {fmt.Println("ExchangeDeclare Err =", err)
return
}
// 创立队列
if queue, err = channel.QueueDeclare("liutest", false, false, false, false, nil); err != nil {fmt.Println("QueueDeclare Err =", err)
return
}
// 队列和交换机绑定
err = channel.QueueBind(queue.Name, "queueroutekey", "liutest", false, nil)
if err != nil {fmt.Println("QueueBind Err =", err)
return
}
channel.Confirm(false)
confirms = channel.NotifyPublish(make(chan amqp.Confirmation, 1))
// 发送数据
go func() {
for {
if err = channel.Publish("liutest", "queueroutekey", false, false, amqp.Publishing{
ContentType: "text/plain",
Body: []byte(mymsg),
}); err != nil {fmt.Println("Publish Err =", err)
return
}
fmt.Println("Send msg ok, msg =", mymsg)
time.Sleep(time.Second * 5)
}
}()
go func() {
for confirm := range confirms {
if confirm.Ack {fmt.Printf("confirmed delivery with delivery tag: %d \n", confirm.DeliveryTag)
} else {fmt.Printf("confirmed delivery of delivery tag: %d \n", confirm.DeliveryTag)
}
}
}()
select {}}
消费者:
package main
import (
"fmt"
"github.com/streadway/amqp"
)
var (
conn *amqp.Connection
channel *amqp.Channel
queue amqp.Queue
err error
msgs <-chan amqp.Delivery
)
func main() {
// 建设连贯
conn, err = amqp.Dial("amqp://guest:[email protected]:5672/")
if err != nil {fmt.Println(err)
return
}
defer conn.Close()
// 创立 channel
if channel, err = conn.Channel(); err != nil {fmt.Println(err)
return
}
// 创立队列
if queue, err = channel.QueueDeclare("liutest", false, false, false, false, nil); err != nil {fmt.Println("QueueDeclare Err =", err)
return
}
err = channel.QueueBind("liutest", "queueroutekey", "liutest", false, nil)
if err != nil {fmt.Println("QueueBind Err =", err)
return
}
// 读取数据
if msgs, err = channel.Consume(queue.Name, "", false, false, false, false, nil); err != nil {fmt.Println("Consume Err =", err)
return
}
go func() {
for msg := range msgs {fmt.Println("Receive Msg =", string(msg.Body))
msg.Ack(false)
}
}()
select {}}
3、音讯可靠性
生产者可靠性
// 将通道设置为确认模式
func (ch *Channel) Confirm(noWait bool) error {
if err := ch.call(&confirmSelect{Nowait: noWait},
&confirmSelectOk{},); err != nil {return err}
ch.confirmM.Lock()
ch.confirming = true
ch.confirmM.Unlock()
return nil
}
// 用于承受服务端的确认响应
func (ch *Channel) NotifyPublish(confirm chan Confirmation) chan Confirmation {ch.notifyM.Lock()
defer ch.notifyM.Unlock()
if ch.noNotify {close(confirm)
} else {ch.confirms.Listen(confirm)
}
return confirm
}
Confirm 将此通道置为确认模式,以便生产者能够确保服务端已胜利接管所有音讯。进入该模式后,服务端将发送一个 basic.ack 或 basic.nack 音讯,其中 deliver tag 设置为一个基于 1 的增量索引(用来标识音讯的唯一性),对应于该办法返回后收到的每次 ack。
在 Channel.NotifyPublish 上监听以响应 ack。如果未调用 Channel.NotifyPublish,则 ack 将被疏忽。
ack 的程序不受投递音讯程序的束缚。
Ack 和 Nack 确认将在将来的某个工夫达到。
在告诉任何 Channel.NotifyReturn 侦听器后,立刻确认不可路由的 mandatory 或 immediate 音讯。当所有应该将音讯路由到它们的队列都已收到传递确认或已将音讯退出队列时,其余音讯将被确认,必要时将音讯长久化。
注:当 mandatory 标记位设置为 true 时,如果 exchange 依据本身类型和音讯 routingKey 无奈找到一个适合的 queue 存储音讯,那么 broker 会调用 basic.return 办法将音讯返还给生产者; 当 mandatory 设置为 false 时,呈现上述情况 broker 会间接将音讯抛弃; 艰深的讲,mandatory 标记通知 broker 代理服务器至多将音讯 route 到一个队列中,否则就将音讯 return 给发送者;
当 noWait 为真时,客户端不会期待响应。如果服务端不反对此办法,则可能会产生通道异样。
具体代码实现如下:
...
// 设置音讯确认
channel.Confirm(false)
confirms = channel.NotifyPublish(make(chan amqp.Confirmation, 1))
...
go func() {
for confirm := range confirms {
if confirm.Ack { // 音讯已确认
fmt.Printf("confirmed delivery with delivery tag: %d \n", confirm.DeliveryTag)
} else { // 未确认的音讯能够从新发送
fmt.Printf("failed confirmed delivery of delivery tag: %d \n", confirm.DeliveryTag)
}
}
}()
...
消费者可靠性
// 将 autoAck 设置为 false
func (ch *Channel) Consume(queue, consumer string, autoAck, exclusive, noLocal, noWait bool, args Table) (<-chan Delivery, error) {
// When we return from ch.call, there may be a delivery already for the
// consumer that hasn't been added to the consumer hash yet. Because of
// this, we never rely on the server picking a consumer tag for us.
if err := args.Validate(); err != nil {return nil, err}
if consumer == "" {consumer = uniqueConsumerTag()
}
req := &basicConsume{
Queue: queue,
ConsumerTag: consumer,
NoLocal: noLocal,
NoAck: autoAck,
Exclusive: exclusive,
NoWait: noWait,
Arguments: args,
}
res := &basicConsumeOk{}
deliveries := make(chan Delivery)
ch.consumers.add(consumer, deliveries)
if err := ch.call(req, res); err != nil {ch.consumers.cancel(consumer)
return nil, err
}
return (<-chan Delivery)(deliveries), nil
}
立刻开始生产排队的音讯。
在 Connection 或 Channel 上的任何其余操作之前开始接管返回的 chan Delivery。
音讯会持续往返回的 chan Delivery 传递,直到产生 Channel.Cancel、Connection.Close、Channel.Close 或 AMQP 异样。消费者必须在 chan 范畴内确保收到所有音讯。未收到的音讯将阻塞同一连贯上的所有办法。
AMQP 中的所有音讯都必须失去确认。消费者在胜利解决音讯后最好手动调用 Delivery.Ack。如果消费者被勾销或通道或连贯被敞开,任何未确认的音讯将在同一队列的开端从新入队。
消费者由一个字符串标识,该字符串是惟一的,实用于该 channal 上的所有消费者。如果心愿最终勾销消费者,请在 Channel.Cancel 中应用雷同的非空标识符。空字符串将导致从新成惟一标识。消费者身份将蕴含在 ConsumerTag 字段中的每个音讯中
当 autoAck(也称为 noAck)为真时,服务器将在将音讯写入网络之前向该消费者确认确认。当 autoAck 为真时,消费者不应调用 Delivery.Ack。<u> 主动确认音讯意味着如果服务器投递音讯后消费者无奈解决某些音讯,则可能会失落某些音讯 </u>。
当 exclusive 为 true 时,服务器将确保这是该队列中的惟一消费者。当 exclusive 为 false 时,服务器将在多个消费者之间偏心地散发音讯。RabbitMQ 不反对 noLocal 标记。倡议对 Channel.Publish 和 Channel.Consume 应用独自的连贯,免得在公布时 TCP 回推影响生产音讯的能力,因而这里次要是为了完整性。当 noWait 为 true 时,不要期待服务器确认申请并立刻开始生产。如果无奈生产,则会引发通道异样并敞开通道。
生产音讯时,将 autoAck 设置为 false
func (d Delivery) Ack(multiple bool) error {
if d.Acknowledger == nil {return errDeliveryNotInitialized}
return d.Acknowledger.Ack(d.DeliveryTag, multiple)
}
客户端生产到音讯后,须要调用 ack 确认接管到音讯
AMQP 中的所有音讯的投递都必须失去确认。如果应用 autoAck true 调用 Channel.Consume,那么服务端将主动确认每条音讯,然而不应该调用此办法,因为这个不能保障生产端业务解决胜利。所以,必须在胜利解决音讯后调用 Delivery.Ack。当 multiple 为真时,此音讯和同一通道上所有先前未确认的音讯将被确认,这对于音讯的批处理很有用(然而有个弊病就是,如果有一个出错了,所有批处理的数据都须要重发)。对于每个未主动确认的音讯,都必须调用 Delivery.Ack、Delivery.Reject 或 Delivery.Nack。
生产端的确认机制的实现:
...
// 读取数据
if msgs, err = channel.Consume(queue.Name, "", false, false, false, false, nil); err != nil {fmt.Println("Consume Err =", err)
return
}
go func() {
for msg := range msgs {fmt.Println("Receive Msg =", string(msg.Body))
// 确认音讯
msg.Ack(false)
}
}()
...