关于消息中间件:RabbitMQ-消息丢失也就这么回事

45次阅读

共计 4412 个字符,预计需要花费 12 分钟才能阅读完成。

大家好,我是小菜。
一个心愿可能成为 吹着牛 X 谈架构 的男人!如果你也想成为我想成为的人,不然点个关注做个伴,让小菜不再孤独!

本文次要介绍 RabbitMQ 的音讯失落问题

如有须要,能够参考

如有帮忙,不忘 点赞

微信公众号已开启,小菜良记,没关注的同学们记得关注哦!

是的,最终是对 RabbitMQ 下手了!

面试中常见的 RabbitMQ 面试题也是多了去了,常见的如下:

  • 音讯可靠性问题:如何确保发送的音讯至多被生产一次?
  • 提早音讯问题:如何实现音讯的提早投递?
  • 高可用问题:如何防止单点的 MQ 故障而导致的不可用问题?
  • 音讯沉积问题:如何解决数百万级以上音讯沉积,无奈及时生产问题?

这几个问题又得让你脑壳疼一阵子,是不是也在网上看了挺多博文介绍这方面的解决方案,然而却看了又忘,理论便是因为短少实操,这篇小菜便重点讲述下 RabbitMQ 如何解决音讯失落问题~

一、音讯可靠性问题

音讯可靠性问题咱们又可能将其了解为如何避免音讯失落?那为什么音讯会失落呢?咱们能够先看看音讯投递的整个过程:

咱们从图中能够从三个阶段剖析可能造成音讯失落:

  • publisher 发送音讯到 exchange
  • exchange 散发到 queue
  • queue 投递到 customer

既然咱们晓得了哪些阶段可能造成数据失落,那咱们就能够从源头防备于未然~!

工程构造

工程构造很简略,就是一个简略的 Spring Boot 我的项目,外面有个 消费者 生产者 两个模块

1、生产者发送失落

RabbitMQ 中提供了 publisher confirm 机制来防止音讯发送到 MQ 的过程中失落的问题。音讯发送到 MQ 当前,会返回一个确认后果给生产者,用于示意音讯是否确认胜利。该确认后果存在两种申请:

  • publisher-confirm

该类型是 发送者确认,存在两种状况

  1. 音讯胜利投递到交换机,返回 ack
  2. 音讯未投递到交换机,返回 nack
  • publisher-return

该类型是 发送者回执,存在两种状况

  1. 音讯投递到交换机,且胜利散发到队列,返回 ack
  2. 音讯投递到交换机,但未胜利散发到队列,返回 nack

留神:确认机制发送音讯时,须要给每个音讯设置一个全局惟一 ID,以辨别不同音讯,防止 ack 抵触

接下来咱们用代码来阐明具体的操作形式

1)配置文件

咱们首先看下 生产者 的配置文件

后面几个配置 RabbitMQ 的连贯信息没啥好讲的,咱们来看几个比拟生疏的配置

  • publisher-confirm-type

开启发送确认,这里能够反对两种类型

  1. simple:同步期待 confirm 后果,直到超时
  2. correlated:异步回调,定义 ConfirmCallback,MQ 返回后果时会回调这个 ConfirmCallback
  • publisher-returns

开启 public-return,同样是基于 CallBack 机制,不过是定义 ReturnCallback

  • template.mandatory

定义路由失败时的策略。

  • true:调用 ReturnCallback
  • false:间接抛弃音讯
2)定义回调事件

每个 RabbitTemplate 只能配置一个 ReturnCallback

3)发送音讯

执行发送代码之前,咱们确保曾经创立了(一个直连交换机direct-exchange,一个队列direct-queue,且绑定的 key 为direct

失常状况下,咱们执行代码必定是发送胜利的,能够看到控制台 绿色 输入

且咱们在音讯队列中也胜利接管到了音讯:

到这步是没有任何问题的,那咱们就须要手动给它制作点问题~ 咱们能够批改 交换机名称,这个时候发送音讯的时候找不到交换机,那么交换机必定就会返回 nack,再看是否能够进入到咱们代码中的判断:

代码执行尽管是绿色的,但因为 rabbitMQ 找不到正确的交换机,而导致音讯发送失败,也就是下图的这个过程:

这一个是 publish -> exchange 失败咱们顺利的捕捉到了,那么 exchange -> queue 这步的失败是咱们是否可能失常捕捉?咱们能够通过批改 路由 key 使交换机路由不到对应的 queue

能够发现当交换机没有路由到绝对应的 queue 时,也胜利触发了咱们自定义的回调函数,而后看 rabbitMQ 控制台是能够发现音讯曾经胜利投递到交换机

到这里,咱们通过两种简略的谬误模仿,使程序都能顺利的进入到咱们事后定义的回调中,如果遇到发送失败的状况,咱们能够在失败的回调中自定义 音讯重发 机制,最大水平上防止音讯失落的问题

4)总结

咱们能够通过 publisher-confirmpublisher-return 两种谬误捕捉机制,来防止 生产者 -> exchange -> queue 这条链路的音讯失落

  • publisher-confirm

    1. 音讯胜利发送到 exchange,返回 ack
    2. 音讯未能胜利发送到 exchange,返回 nack
    3. 音讯发送过程中出现异常,没有收到回执,则进入 failureCallback 回调
  • publisher-return

    1. 音讯胜利发送到 exchange,但没有路由到 queue,调用自定义回调函数 returnCallback

2、音讯存储失落

音讯存储失落是啥意思?其实就是 长久化 的概念,当音讯曾经胜利发送到 queue 时,这个时候如果消费者没有及时进行生产,rabbitMQ 又刚好宕机重启了,那么这个时候就会发现音讯失落了。

这是因为 MQ 默认是内存存储音讯,咱们能够通过开启长久化的性能来确保在 MQ 中的音讯不失落

其实咱们通过 RabbitMQ 提供的 GUI 创立交换机或队列的时候就能够发现有长久化的这个选项

如果将 durability 设为 durable 后,咱们能够发现无论如何重启 MQ,重启后交换机和队列仍然存在。

然而很多时候咱们 交换机 队列 的创立并非在 GUI 上创立,而是通过利用代码的形式创立

  • 交换机长久化

  • 队列长久化

  • 音讯长久化

默认状况下,AMQP 收回的音讯都是长久化的,不必特意指定

3、消费者生产失落

RabbitMQ 采取的机制是 当确认音讯被消费者生产后就会立刻删除

那么如何确认音讯已被消费者生产?那就还得依附回执来确认,消费者获取音讯后,须要向 RabbitMQ 发送 ack 回执,表明本人曾经解决音讯。其中 ack 在 AMQP 中有三种确认模式:

  • manual:手动 ack,须要在业务代码完结后,调用 api 发送 ack
  • auto:主动 ack,由 spring 监测 listener 代码是否出现异常,没有异样则返回 ack,反之返回 nack
  • none:敞开 ack,MQ 在音讯投递后会立刻删除音讯

上述三种形式都是通过批改配置文件:

1)manual

该形式须要用户本人手动确认,灵活性较好

这个时候如果执行逻辑是失常的,那么在 RabbitMQ 上就会将该音讯删除,然而如果执行的逻辑抛出了异样,没有进入到手动确认的环节,RabbitMQ 将会把该音讯保留:

2)auto

该形式在没有异样产生时会主动进行音讯确认

咱们在配置文件中将确认形式改为 auto 进行测试:

失常状况下接管音讯是没有任何问题的,那咱们同样制作些非正常状况:

咱们手动制作了点异样,发现音讯没有被 RabbitMQ 删除的同时,而且控制台始终在报错,无止境的在尝试从新生产,这如果放在线上环境不免有些令人解体。

当消费者出现异常后,音讯会一直 requeue(从新入队)到队列,再从新发送给消费者,而后再次异样,再次 requeue,有限循环,就会导致 MQ 的音讯解决飙升

而产生这种状况的起因所在便是因为 RabbitMQ 的 音讯失败重试机制,但很多时候咱们可能不想始终重试,只须要通过几次尝试,如果失败就放弃解决,这个时候咱们就须要在配置文件中配置失败重试机制:

开启该配置后,咱们重启我的项目进行察看

通过控制台能够看到在重试 3 次后,SpringAMQP 会抛出异样AmqpRejectAndDontRequeueException,阐明本地重试机制失效了。而且咱们回到 RabbitMQ 控制台能够看到对应音讯被删除了,阐明最初 SpringAMQP 返回的是 ack,导致音讯被 MQ 删除

然而这种解决形式并不 优雅 ,重试后间接删除音讯过于 暴力,那么有没有更好的解决形式?答案是有的!

咱们能够利用 AMQP 提供的 MessageRecovery 接口来实现,该接口有三种不同的实现形式:

  • RejectAndDontRequeueRecoverer:重试耗尽后,间接 reject,失落音讯。默认形式,以上就是采纳这种形式
  • ImmediateRequeueMessageRecoverer:重试耗尽后,返回 nack,音讯从新入队
  • RepublishMessageRecoverer:重试耗尽后,将失败音讯投递到指定的交换机

三种形式能够依据不同场景进行采纳,剖析一下,不难发现第三种 RepublishMessageRecoverer 是比拟优雅的~ 当重试失败后会将音讯投递到一个指定专门寄存异样音讯的队列,后续由人工集中进行解决!具体应用形式如下:

通过自定义异样解决后,咱们重启我的项目查看控制台:

能够发现重试 3 次后,咱们的异样音讯进入到了咱们自定义的异样队列中

3)none

该形式没啥好讲的~ 无论音讯异样与否 MQ 都会进行删除!

4、总结

如果这个时候面试再问你,如何确保 RabbitMQ 音讯的可靠性?那你可得好好唠嗑唠嗑

如何保障音讯不失落?

1)首先剖析失落的场景有哪些?

音讯失落可能产生在 发送时失落(未送达 exchange / 未路由到 queue)音讯未长久化而 MQ 宕机 消费者接管音讯未能正确生产

2)而后如何预防
  • 开启生产者确认机制,确保生产者的音讯能达到队列

确认机制包含 publisher-confirmpublisher-return

当未送达到 交换机 咱们能够通过 publisher-confirm 返回的 acknack 来确认

交换机 未胜利路由到 队列,咱们能够通过 publisher-return 自定义的回调函数来确认,每个 RabbitTemplate 只能配置一个 ReturnCallback

  • 开启长久化性能,确保音讯未生产前在队列中不会失落

长久化性能分为 交换机长久化 队列长久化 音讯长久化,咱们都须要将 durable 设置为 true

  • 开启消费者确认机制最低为 auto 级别

消费者确认机制有三种类型:manual (手动确认)auto (主动确认)none (敞开 ack)

  • 失败重试机制

咱们手动设置 MessageResovererRepublishMessageRecoverer 形式,将投递失败的音讯转到异样队列中,交由人工解决


这一套组合拳答复下来,面试官还不得默默抵赖你有点货色?

当然这只是 RabbitMQ 的问题之一,咱们下篇持续其余几个问题的解决形式~

不要空谈,不要贪懒,和小菜一起做个 吹着牛 X 做架构 的程序猿吧~ 点个关注做个伴,让小菜不再孤独。咱们下文见!

明天的你多致力一点,今天的你就能少说一句求人的话!
我是小菜,一个和你一起变强的男人。 💋
微信公众号已开启,小菜良记,没关注的同学们记得关注哦!

正文完
 0