共计 2533 个字符,预计需要花费 7 分钟才能阅读完成。
1. 消息确认模式
在 RabbitMQ 中, 消息确认主要有生产者发生确认和消费者接收确认
1.1 生产者发送确认
生产者发送消息到 RabbitMQ 服务器, 如果 RabbitMQ 服务器收到消息, 则会给生产者一个应答, 用于告诉生产者该消息已经成功到达 RabbitMQ 服务器中
1.2 消费者接收确认
用于确认消费者是否成功消费了该条消息
消息确认实现方式有两种
- 通过事务的方式
- confirm 确认机制, 因为事务模式比较消耗性能, 在实际工作中用的也不多
2. 生产者发送确认
2.1 开启 confirm 模式
当 Channel.Confirm(noWait bool)参数设置为 false 时,broker 会返回一个 confirm.ok 表示同意发送者将当前 channel 信道设置为 confirm 模式。
其他代码和 transaction 模式类似,只是没有 Channel.TxCommit()和 Channel.TxRollback()。
err = channel.Confirm(false)
2.2 以 confirm 模式发送消息
package main
import (
"fmt"
"github.com/streadway/amqp"
"rmq/db/rmq"
)
var (
channel *amqp.Channel
err error
queue amqp.Queue
conn *amqp.Connection
)
func main() {conn, err = rmq.GetConn()
defer conn.Close()
channel, err = conn.Channel()
if err != nil {fmt.Printf("error: %s \n", err.Error())
return
}
defer channel.Close()
err = channel.Confirm(false)
if err != nil {fmt.Printf("error: %s \n", err.Error())
return
}
queue, err = channel.QueueDeclare("confirm:message", false, false, false, false, nil)
if err != nil {fmt.Printf("error: %s \n", err.Error())
return
}
confirms := channel.NotifyPublish(make(chan amqp.Confirmation, 1))
defer confirmOne(confirms)
err = channel.Publish("", queue.Name, false, false, amqp.Publishing{
ContentType: "text/plain",
Body: []byte("confirm message"),
})
if err != nil {fmt.Printf("error: %s \n", err.Error())
return
}
fmt.Println("消息发送成功")
}
func confirmOne(confirms <-chan amqp.Confirmation) {
if confirmed := <-confirms; confirmed.Ack {fmt.Printf("confirmed delivery with delivery tag: %d", confirmed.DeliveryTag)
} else {fmt.Printf("confirmed delivery of delivery tag: %d", confirmed.DeliveryTag)
}
}
消息拒绝
_ = d.Nack(false, false) // 手动拒绝消息 可以拒绝多条消息 第二个参数设置为 true 将再次放入队列中
_ = d.Reject(true) // 手动拒绝消息 只能拒绝一条消息 为 true 将再次放入队列中
_ = d.Ack(false) // 手动确认
1. 简介
消息幂等性其实就是保证同一个消息不被消费者重复消费两次
当消费者消费完之后, 通常会发送一个 ack 应答确认消息给生产者
但是这中间有可能因为网络中断等原因, 导致生产者未能收到确认消息, 有此这条消息将被重复发送给消费者消费, 实际上这条消息已经被消费过了, 这就是重复消费的问题!!!
1.1 如何避免重复消费
- 消息全局 ID 或者写个唯一标识 (时间戳,uuid 等), 每次消费消息之前根据消息 id 去判断该消息是否已被消费过, 如果已经消费国, 则不处理该消息, 否则正常消费, 并且进行入库操作(消息全局 ID 作为数据库表的主键, 防止重复)
- 利用 redis 的 setnx 命令, 给消息分配一个全局 ID, 只要消费过该消息, 将 id message k:v 形式写入 redis 消费者开始消费前 先去 redis 查询有没有消费记录
1.2 代码演示
生产者
channel.Publish("", queue.Name, false, false,
amqp.Publishing{MessageId: uuid.NewV4().String(),
Timestamp: time.Now(),
ContentType: "text/plain",
Body: []byte(fmt.Sprintf("hello---%d", i)),
})
消费者
go func() {
for d := range megs {err = db.GetRedis().Get(d.MessageId).Err()
if err != redis.Nil {
// 消息已被消费 忽略
logger.Warn("消息已被消费 忽略 %s", d.MessageId)
_ = d.Reject(false)
continue
}
logger.Info("messageBody: %s", d.Body)
logger.Info("messageID: %s", d.MessageId)
logger.Info("messageID: %s", d.Timestamp.Format("2006-01-02 15:04:05"))
if err := d.Ack(false); err != nil {logger.Error("消息确认失败")
} else {db.GetRedis().SetNX(d.MessageId, d.Body, time.Hour*2)
logger.Warn("设置消息 id")
}
}
}()
正文完