RabbitMQ入门5消息确认模式和幂等性

56次阅读

共计 2533 个字符,预计需要花费 7 分钟才能阅读完成。

1. 消息确认模式

在 RabbitMQ 中, 消息确认主要有生产者发生确认和消费者接收确认

1.1 生产者发送确认

生产者发送消息到 RabbitMQ 服务器, 如果 RabbitMQ 服务器收到消息, 则会给生产者一个应答, 用于告诉生产者该消息已经成功到达 RabbitMQ 服务器中

1.2 消费者接收确认

用于确认消费者是否成功消费了该条消息
消息确认实现方式有两种

  1. 通过事务的方式
  2. confirm 确认机制, 因为事务模式比较消耗性能, 在实际工作中用的也不多

2. 生产者发送确认

2.1 开启 confirm 模式

当 Channel.Confirm(noWait bool)参数设置为 false 时,broker 会返回一个 confirm.ok 表示同意发送者将当前 channel 信道设置为 confirm 模式。
其他代码和 transaction 模式类似,只是没有 Channel.TxCommit()和 Channel.TxRollback()。

err = channel.Confirm(false)

2.2 以 confirm 模式发送消息

package main

import (
    "fmt"
    "github.com/streadway/amqp"
    "rmq/db/rmq"
)

var (
    channel *amqp.Channel
    err     error
    queue   amqp.Queue
    conn    *amqp.Connection
)

func main() {conn, err = rmq.GetConn()

    defer conn.Close()

    channel, err = conn.Channel()

    if err != nil {fmt.Printf("error: %s \n", err.Error())
        return
    }

    defer channel.Close()

    err = channel.Confirm(false)

    if err != nil {fmt.Printf("error: %s \n", err.Error())
        return
    }

    queue, err = channel.QueueDeclare("confirm:message", false, false, false, false, nil)
    if err != nil {fmt.Printf("error: %s \n", err.Error())
        return
    }

    confirms := channel.NotifyPublish(make(chan amqp.Confirmation, 1))

    defer confirmOne(confirms)
    err = channel.Publish("", queue.Name, false, false, amqp.Publishing{
        ContentType: "text/plain",
        Body:        []byte("confirm message"),
    })

    if err != nil {fmt.Printf("error: %s \n", err.Error())
        return
    }

    fmt.Println("消息发送成功")

}

func confirmOne(confirms <-chan amqp.Confirmation) {
    if confirmed := <-confirms; confirmed.Ack {fmt.Printf("confirmed delivery with delivery tag: %d", confirmed.DeliveryTag)
    } else {fmt.Printf("confirmed delivery of delivery tag: %d", confirmed.DeliveryTag)
    }
}

消息拒绝

_ = d.Nack(false, false) // 手动拒绝消息 可以拒绝多条消息 第二个参数设置为 true 将再次放入队列中
_ = d.Reject(true) // 手动拒绝消息 只能拒绝一条消息 为 true 将再次放入队列中
_ = d.Ack(false) // 手动确认

1. 简介

消息幂等性其实就是保证同一个消息不被消费者重复消费两次
当消费者消费完之后, 通常会发送一个 ack 应答确认消息给生产者
但是这中间有可能因为网络中断等原因, 导致生产者未能收到确认消息, 有此这条消息将被重复发送给消费者消费, 实际上这条消息已经被消费过了, 这就是重复消费的问题!!!

1.1 如何避免重复消费

  • 消息全局 ID 或者写个唯一标识 (时间戳,uuid 等), 每次消费消息之前根据消息 id 去判断该消息是否已被消费过, 如果已经消费国, 则不处理该消息, 否则正常消费, 并且进行入库操作(消息全局 ID 作为数据库表的主键, 防止重复)
  • 利用 redis 的 setnx 命令, 给消息分配一个全局 ID, 只要消费过该消息, 将 id message k:v 形式写入 redis 消费者开始消费前 先去 redis 查询有没有消费记录

1.2 代码演示

生产者
channel.Publish("", queue.Name, false, false,
            amqp.Publishing{MessageId:   uuid.NewV4().String(),
                Timestamp: time.Now(),
                ContentType: "text/plain",
                Body:        []byte(fmt.Sprintf("hello---%d", i)),
            })
消费者
go func() {
        for d := range megs {err = db.GetRedis().Get(d.MessageId).Err()
            if err != redis.Nil {
                // 消息已被消费 忽略
                logger.Warn("消息已被消费 忽略 %s", d.MessageId)
                _ = d.Reject(false)
                continue
            }

            logger.Info("messageBody: %s", d.Body)
            logger.Info("messageID: %s", d.MessageId)
            logger.Info("messageID: %s", d.Timestamp.Format("2006-01-02 15:04:05"))

            if err := d.Ack(false); err != nil {logger.Error("消息确认失败")
            } else {db.GetRedis().SetNX(d.MessageId, d.Body, time.Hour*2)
                logger.Warn("设置消息 id")
            }
        }
    }()

正文完
 0