关于java:RabbitMQ和Kafka如何处理消息丢失问题

31次阅读

共计 2856 个字符,预计需要花费 8 分钟才能阅读完成。

这篇文章次要围绕下列问题进行探讨:

  • 如何解决音讯的失落问题

数据的失落问题,可能呈现在生产者、MQ、消费者中,咱们从 RabbitMQ 和 Kafka 别离来剖析一下吧。

RabbitMQ

生产者弄丢了数据

生产者将数据发送到 RabbitMQ 的时候,可能数据就在半路给搞丢了,因为网络问题啥的,都有可能。

此时能够抉择用 RabbitMQ 提供的事务性能,就是生产者发送数据之前开启 RabbitMQ 事务 channel.txSelect,而后发送音讯,如果音讯没有胜利被 RabbitMQ 接管到,那么生产者会收到异样报错,此时就能够回滚事务 channel.txRollback,而后重试发送音讯;如果收到了音讯,那么能够提交事务 channel.txCommit。

    // 开启事务
channel.txSelect
try {// 这里发送音讯} catch (Exception e) {
    channel.txRollback

    // 这里再次重发这条音讯
}

// 提交事务
channel.txCommit

然而问题是,RabbitMQ 事务机制(同步)一搞,基本上吞吐量会下来,因为太耗性能。

所以一般来说,如果你要确保说写 RabbitMQ 的音讯别丢,能够开启 confirm 模式,在生产者那里设置开启 confirm 模式之后,你每次写的音讯都会调配一个惟一的 id,而后如果写入了 RabbitMQ 中,RabbitMQ 会给你回传一个 ack 音讯,通知你说这个音讯 ok 了。如果 RabbitMQ 没能解决这个音讯,会回调你的一个 nack 接口,通知你这个音讯接管失败,你能够重试。而且你能够联合这个机制本人在内存里保护每个音讯 id 的状态,如果超过肯定工夫还没接管到这个音讯的回调,那么你能够重发。

事务机制和 confirm 机制最大的不同在于,事务机制是同步的,你提交一个事务之后会阻塞在那儿,然而 confirm 机制是异步的,你发送个音讯之后就能够发送下一个音讯,而后那个音讯 RabbitMQ 接管了之后会异步回调你的一个接口告诉你这个音讯接管到了。

所以个别在生产者这块防止数据失落,都是用 confirm 机制的。

RabbitMQ 弄丢了数据

就是 RabbitMQ 本人弄丢了数据,这个你必须开启 RabbitMQ 的长久化,就是音讯写入之后会长久化到磁盘,哪怕是 RabbitMQ 本人挂了,复原之后会主动读取之前存储的数据,个别数据不会丢。除非极其常见的是,RabbitMQ 还没长久化,本人就挂了,可能导致大量数据失落,然而这个概率较小。

设置长久化有两个步骤:

创立 queue 的时候将其设置为长久化
这样就能够保障 RabbitMQ 长久化 queue 的元数据,然而它是不会长久化 queue 里的数据的。
第二个是发送音讯的时候将音讯的 deliveryMode 设置为 2
就是将音讯设置为长久化的,此时 RabbitMQ 就会将音讯长久化到磁盘下来。
必须要同时设置这两个长久化才行,RabbitMQ 哪怕是挂了,再次重启,也会从磁盘上重启复原 queue,复原这个 queue 里的数据。

留神,哪怕是你给 RabbitMQ 开启了长久化机制,也有一种可能,就是这个音讯写到了 RabbitMQ 中,然而还没来得及长久化到磁盘上,后果不巧,此时 RabbitMQ 挂了,就会导致内存里的一点点数据失落。

所以,长久化能够跟生产者那边的 confirm 机制配合起来,只有音讯被长久化到磁盘之后,才会告诉生产者 ack 了,所以哪怕是在长久化到磁盘之前,RabbitMQ 挂了,数据丢了,生产者收不到 ack,你也是能够本人重发的。

生产端弄丢了数据

RabbitMQ 如果失落了数据,次要是因为你生产的时候,刚生产到,还没解决,后果过程挂了,比方重启了,那么就难堪了,RabbitMQ 认为你都生产了,这数据就丢了。

这个时候得用 RabbitMQ 提供的 ack 机制,简略来说,就是你必须敞开 RabbitMQ 的主动 ack,能够通过一个 api 来调用就行,而后每次你本人代码里确保解决完的时候,再在程序里 ack 一把。这样的话,如果你还没解决完,不就没有 ack 了?那 RabbitMQ 就认为你还没解决完,这个时候 RabbitMQ 会把这个生产调配给别的 consumer 去解决,音讯是不会丢的。

总结一下:

Kafka

生产端弄丢了数据

惟一可能导致消费者弄丢数据的状况,就是说,你生产到了这个音讯,而后消费者那边主动提交了 offset,让 Kafka 认为你曾经生产好了这个音讯,但其实你才刚筹备解决这个音讯,你还没解决,你本人就挂了,此时这条音讯就丢咯。

这不是跟 RabbitMQ 差不多吗,大家都晓得 Kafka 会主动提交 offset,那么只有敞开主动提交 offset,在解决完之后本人手动提交 offset,就能够保证数据不会丢。然而此时的确还是可能会有反复生产,比方你刚解决完,还没提交 offset,后果本人挂了,此时必定会反复生产一次,本人保障幂等性就好了。

生产环境碰到的一个问题,就是说咱们的 Kafka 消费者生产到了数据之后是写到一个内存的 queue 里先缓冲一下,后果有的时候,你刚把音讯写入内存 queue,而后消费者会主动提交 offset。而后此时咱们重启了零碎,就会导致内存 queue 里还没来得及解决的数据就失落了。

Kafka 弄丢了数据

这块比拟常见的一个场景,就是 Kafka 某个 broker 宕机,而后从新选举 partition 的 leader。大家想想,要是此时其余的 follower 刚好还有些数据没有同步,后果此时 leader 挂了,而后选举某个 follower 成 leader 之后,不就少了一些数据?这就丢了一些数据啊。

生产环境也遇到过,咱们也是,之前 Kafka 的 leader 机器宕机了,将 follower 切换为 leader 之后,就会发现说这个数据就丢了。

所以此时个别是要求起码设置如下 4 个参数:

  • 给 topic 设置 replication.factor 参数:这个值必须大于 1,要求每个 partition 必须有至多 2 个正本。
  • 在 Kafka 服务端设置 min.insync.replicas 参数:这个值必须大于 1,这个是要求一个 leader 至多感知到有至多一个 follower 还跟本人保持联系,没落伍,这样能力确保 leader 挂了还有一个 follower 吧。
  • 在 producer 端设置 acks=all:这个是要求每条数据,必须是写入所有 replica 之后,能力认为是写胜利了。
  • 在 producer 端设置 retries=MAX(很大很大很大的一个值,有限次重试的意思):这个是要求一旦写入失败,就有限重试,卡在这里了。

咱们生产环境就是依照上述要求配置的,这样配置之后,至多在 Kafka broker 端就能够保障在 leader 所在 broker 产生故障,进行 leader 切换时,数据不会失落。

生产者会不会弄丢数据?

如果依照上述的思路设置了 acks=all,肯定不会丢,要求是,你的 leader 接管到音讯,所有的 follower 都同步到了音讯之后,才认为本次写胜利了。如果没满足这个条件,生产者会主动一直的重试,重试有限次。

正文完
 0