写在后面
又到了年底跳槽顶峰季,很多小伙伴进来面试时,不少面试官都会问到音讯队列的问题,不少小伙伴答复的不是很完满,有些小伙伴是心里晓得答案,嘴上却没有很好的表达出来,究其根本原因,还是对相干的知识点了解的不够透彻。明天,咱们就一起来探讨下这个话题。注:文章有点长,你说你能一鼓作气看完,我有点不信!!
文章已收录到:
https://github.com/sunshinelyz/technology-binghe
https://gitee.com/binghe001/technology-binghe
什么是音讯队列?
音讯队列(Message Queue)是在音讯的传输过程中保留音讯的容器,是利用间的通信形式。音讯发送后能够立刻返回,由音讯零碎保障音讯的牢靠传输,音讯发布者只管把音讯写到队列外面而不必思考谁须要音讯,而音讯的使用者也不须要晓得谁公布的音讯,只管到音讯队列外面取,这样生产和生产便能够做到拆散。
为什么要应用音讯队列?
长处:
- 异步解决:例如短信告诉、终端状态推送、App 推送、用户注册等
- 数据同步:业务数据推送同步
- 重试弥补:记账失败重试
- 零碎解耦:通信上下行、终端异样监控、分布式事件核心
- 流量消峰:秒杀场景下的下单解决
- 公布订阅:HSF 的服务状态变动告诉、分布式事件核心
- 高并发缓冲:日志服务、监控上报
应用音讯队列比拟外围的作用就是:解耦 、 异步 、 削峰。
毛病:
- 零碎可用性升高 零碎引入的内部依赖越多,越容易挂掉?如何保障音讯队列的高可用?
- 零碎复杂度进步 怎么保障音讯没有反复生产?怎么解决音讯失落的状况?怎么保障消息传递的程序性?
- 一致性问题 A 零碎解决完了间接返回胜利了,人都认为你这个申请就胜利了;然而问题是,要是 BCD 三个零碎那里,BD 两个零碎写库胜利了,后果 C 零碎写库失败了,咋整?你这数据就不统一了。
以下次要探讨的 RabbitMQ 和 Kafka 两种音讯队列。
如何保障音讯队列的高可用?
RabbitMQ 的高可用
RabbitMQ 的高可用是 基于主从(非分布式)做高可用性。RabbitMQ 有三种模式:单机模式(Demo 级别)、一般集群模式(无高可用性)、镜像集群模式(高可用性)。
- 一般集群模式
一般集群模式,意思就是在多台机器上启动多个 RabbitMQ 实例,每个机器启动一个。你 创立的 queue,只会放在一个 RabbitMQ 实例上,然而每个实例都同步 queue 的元数据(元数据能够认为是 queue 的一些配置信息,通过元数据,能够找到 queue 所在实例)。你生产的时候,实际上如果连贯到了另外一个实例,那么那个实例会从 queue 所在实例上拉取数据过去。
这种形式的确很麻烦,也不怎么好,没做到所谓的分布式 ,就是个一般集群。因为这导致你要么消费者每次随机连贯一个实例而后拉取数据,要么固定连贯那个 queue 所在实例生产数据,前者有 数据拉取的开销 ,后者导致 单实例性能瓶颈。
而且如果那个放 queue 的实例宕机了,会导致接下来其余实例就无奈从那个实例拉取,如果你 开启了音讯长久化 ,让 RabbitMQ 落地存储音讯的话, 音讯不肯定会丢,得等这个实例复原了,而后才能够持续从这个 queue 拉取数据。
所以这个事儿就比拟难堪了,这就 没有什么所谓的高可用性 , 这计划次要是进步吞吐量的,就是说让集群中多个节点来服务某个 queue 的读写操作。
- 镜像集群模式
这种模式,才是所谓的 RabbitMQ 的高可用模式。跟一般集群模式不一样的是,在镜像集群模式下,你创立的 queue,无论元数据还是 queue 里的音讯都会 存在于多个实例上 ,就是说,每个 RabbitMQ 节点都有这个 queue 的一个 残缺镜像 ,蕴含 queue 的全副数据的意思。而后每次你写音讯到 queue 的时候,都会主动把 音讯同步 到多个实例的 queue 上。
那么 如何开启这个镜像集群模式 呢?其实很简略,RabbitMQ 有很好的治理控制台,就是在后盾新增一个策略,这个策略是 镜像集群模式的策略,指定的时候是能够要求数据同步到所有节点的,也能够要求同步到指定数量的节点,再次创立 queue 的时候,利用这个策略,就会主动将数据同步到其余的节点下来了。
这样的话,益处在于,你任何一个机器宕机了,没事儿,其它机器(节点)还蕴含了这个 queue 的残缺数据,别的 consumer 都能够到其它节点下来生产数据。害处在于,第一,这个性能开销也太大了吧,音讯须要同步到所有机器上,导致网络带宽压力和耗费很重!第二,这么玩儿,不是分布式的,就 没有扩展性可言 了,如果某个 queue 负载很重,你加机器,新增的机器也蕴含了这个 queue 的所有数据,并 没有方法线性扩大 你的 queue。你想,如果这个 queue 的数据量很大,大到这个机器上的容量无奈包容了,此时该怎么办呢?
Kafka 的高可用
Kafka 一个最根本的架构意识:由多个 broker 组成,每个 broker 是一个节点;你创立一个 topic,这个 topic 能够划分为多个 partition,每个 partition 能够存在于不同的 broker 上,每个 partition 就放一部分数据。
这就是 人造的分布式音讯队列 ,就是说一个 topic 的数据,是 扩散放在多个机器上的,每个机器就放一部分数据。
实际上 RabbmitMQ 之类的,并不是分布式音讯队列,它就是传统的音讯队列,只不过提供了一些集群、HA(High Availability, 高可用性) 的机制而已,因为无论怎么玩儿,RabbitMQ 一个 queue 的数据都是放在一个节点里的,镜像集群下,也是每个节点都放这个 queue 的残缺数据。
Kafka 0.8 以前,是没有 HA 机制的,就是任何一个 broker 宕机了,那个 broker 上的 partition 就废了,没法写也没法读,没有什么高可用性可言。
比如说,咱们假如创立了一个 topic,指定其 partition 数量是 3 个,别离在三台机器上。然而,如果第二台机器宕机了,会导致这个 topic 的 1/3 的数据就丢了,因而这个是做不到高可用的。
Kafka 0.8 当前,提供了 HA 机制,就是 replica(复制品)正本机制。每个 partition 的数据都会同步到其它机器上,造成本人的多个 replica 正本。所有 replica 会选举一个 leader 进去,那么生产和生产都跟这个 leader 打交道,而后其余 replica 就是 follower。写的时候,leader 会负责把数据同步到所有 follower 下来,读的时候就间接读 leader 上的数据即可。只能读写 leader?很简略,要是你能够随便读写每个 follower,那么就要 care 数据一致性的问题,零碎复杂度太高,很容易出问题。Kafka 会平均地将一个 partition 的所有 replica 散布在不同的机器上,这样才能够进步容错性。
这么搞,就有所谓的 高可用性 了,因为如果某个 broker 宕机了,没事儿,那个 broker 下面的 partition 在其余机器上都有正本的。如果这个宕机的 broker 下面有某个 partition 的 leader,那么此时会从 follower 中 从新选举 一个新的 leader 进去,大家持续读写那个新的 leader 即可。这就有所谓的高可用性了。
写数据 的时候,生产者就写 leader,而后 leader 将数据落地写本地磁盘,接着其余 follower 本人被动从 leader 来 pull 数据。一旦所有 follower 同步好数据了,就会发送 ack 给 leader,leader 收到所有 follower 的 ack 之后,就会返回写胜利的音讯给生产者。(当然,这只是其中一种模式,还能够适当调整这个行为)
生产 的时候,只会从 leader 去读,然而只有当一个音讯曾经被所有 follower 都同步胜利返回 ack 的时候,这个音讯才会被消费者读到。
如何保障音讯不反复生产(幂等性)?
首先,所有的音讯队列都会有这样反复生产的问题,因为这是不 MQ 来保障,而是咱们本人开发保障的,咱们应用 Kakfa 来探讨是如何实现的。
Kakfa 有个 offset 的概念,就是每个音讯写进去都会有一个 offset 值,代表生产的序号,而后 consumer 生产了数据之后,默认每隔一段时间会把本人生产过的音讯的 offset 值提交,示意我曾经生产过了,下次要是我重启啥的,就让我从以后提交的 offset 处来持续生产。
然而凡事总有意外,比方咱们之前生产常常遇到的,就是你有时候重启零碎,看你怎么重启了,如果碰到点焦急的,间接 kill 过程了,再重启。这会导致 consumer 有些音讯解决了,然而没来得及提交 offset,难堪了。重启之后,多数音讯会再次生产一次。
其实反复生产不可怕,可怕的是你没思考到反复生产之后,怎么保障幂等性。
举个例子吧。假如你有个零碎,生产一条音讯就往数据库里插入一条数据,要是你一个音讯反复两次,你不就插入了两条,这数据不就错了?然而你要是生产到第二次的时候,本人判断一下是否曾经生产过了,若是就间接扔了,这样不就保留了一条数据,从而保障了数据的正确性。一条数据反复呈现两次,数据库里就只有一条数据,这就保障了零碎的幂等性。幂等性,艰深点说,就一个数据,或者一个申请,给你反复来屡次,你得确保对应的数据是不会扭转的,不能出错。
所以第二个问题来了,怎么保障音讯队列生产的幂等性?
其实还是得联合业务来思考,我这里给几个思路:
- 比方你拿个数据要写库,你先依据主键查一下,如果这数据都有了,你就别插入了,update 一下好吧。
- 比方你是写 Redis,那没问题了,反正每次都是 set,人造幂等性。
- 比方你不是下面两个场景,那做的略微简单一点,你须要让生产者发送每条数据的时候,外面加一个全局惟一的 id,相似订单 id 之类的货色,而后你这里生产到了之后,先依据这个 id 去比方 Redis 里查一下,之前生产过吗?如果没有生产过,你就解决,而后这个 id 写 Redis。如果生产过了,那你就别解决了,保障别反复解决雷同的音讯即可。
- 比方基于数据库的惟一键来保障反复数据不会反复插入多条。因为有惟一键束缚了,反复数据插入只会报错,不会导致数据库中呈现脏数据。
当然,如何保障 MQ 的生产是幂等性的,须要联合具体的业务来看。
如何保障音讯的牢靠传输(不失落)?
这个是必定的,MQ 的根本准则就是数据不能多一条,也不能少一条,不能多其实就是咱们后面反复生产的问题。不能少,就是数据不能丢,像计费,扣费的一些信息,是必定不能失落的。
数据的失落问题,可能呈现在生产者、MQ、消费者中,咱们从 RabbitMQ 和 Kafka 别离来剖析一下吧。
RabbitMQ 如何保障音讯的牢靠
生产者丢数据
生产者将数据发送到 RabbitMQ 的时候,可能数据就在半路给搞丢了,因为网络问题啥的,都有可能。
此时能够抉择用 RabbitMQ 提供的事务性能,就是生产者 发送数据之前 开启 RabbitMQ 事务channel.txSelect
,而后发送音讯,如果音讯没有胜利被 RabbitMQ 接管到,那么生产者会收到异样报错,此时就能够回滚事务channel.txRollback
,而后重试发送音讯;如果收到了音讯,那么能够提交事务channel.txCommit
。
// 开启事务
channel.txSelect
try {// 这里发送音讯} catch (Exception e) {
channel.txRollback
// 这里再次重发这条音讯
}
// 提交事务
channel.txCommit
然而问题是,RabbitMQ 事务机制(同步)一搞,基本上 吞吐量会下来,因为太耗性能。
所以一般来说,如果你要确保说写 RabbitMQ 的音讯别丢,能够开启 confirm
模式,在生产者那里设置开启 confirm
模式之后,你每次写的音讯都会调配一个惟一的 id,而后如果写入了 RabbitMQ 中,RabbitMQ 会给你回传一个 ack
音讯,通知你说这个音讯 ok 了。如果 RabbitMQ 没能解决这个音讯,会回调你的一个 nack
接口,通知你这个音讯接管失败,你能够重试。而且你能够联合这个机制本人在内存里保护每个音讯 id 的状态,如果超过肯定工夫还没接管到这个音讯的回调,那么你能够重发。
事务机制和 confirm
机制最大的不同在于,事务机制是同步的 ,你提交一个事务之后会 阻塞 在那儿,然而 confirm
机制是 异步 的,你发送个音讯之后就能够发送下一个音讯,而后那个音讯 RabbitMQ 接管了之后会异步回调你的一个接口告诉你这个音讯接管到了。
所以个别在生产者这块 防止数据失落,都是用 confirm
机制的。
RabbitMQ 丢数据
就是 RabbitMQ 本人弄丢了数据,这个你必须 开启 RabbitMQ 的长久化 ,就是音讯写入之后会长久化到磁盘,哪怕是 RabbitMQ 本人挂了, 复原之后会主动读取之前存储的数据 ,个别数据不会丢。除非极其常见的是,RabbitMQ 还没长久化,本人就挂了, 可能导致大量数据失落,然而这个概率较小。
设置长久化有 两个步骤:
- 创立 queue 的时候将其设置为长久化 这样就能够保障 RabbitMQ 长久化 queue 的元数据,然而它是不会长久化 queue 里的数据的。
- 第二个是发送音讯的时候将音讯的
deliveryMode
设置为 2 就是将音讯设置为长久化的,此时 RabbitMQ 就会将音讯长久化到磁盘下来。
必须要同时设置这两个长久化才行,RabbitMQ 哪怕是挂了,再次重启,也会从磁盘上重启复原 queue,复原这个 queue 里的数据。
留神,哪怕是你给 RabbitMQ 开启了长久化机制,也有一种可能,就是这个音讯写到了 RabbitMQ 中,然而还没来得及长久化到磁盘上,后果不巧,此时 RabbitMQ 挂了,就会导致内存里的一点点数据失落。
所以,长久化能够跟生产者那边的 confirm
机制配合起来,只有音讯被长久化到磁盘之后,才会告诉生产者 ack
了,所以哪怕是在长久化到磁盘之前,RabbitMQ 挂了,数据丢了,生产者收不到 ack
,你也是能够本人重发的。
消费者丢数据
RabbitMQ 如果失落了数据,次要是因为你生产的时候,刚生产到,还没解决,后果过程挂了,比方重启了,那么就难堪了,RabbitMQ 认为你都生产了,这数据就丢了。
这个时候得用 RabbitMQ 提供的 ack
机制,简略来说,就是你必须敞开 RabbitMQ 的主动 ack
,能够通过一个 api 来调用就行,而后每次你本人代码里确保解决完的时候,再在程序里 ack
一把。这样的话,如果你还没解决完,不就没有 ack
了?那 RabbitMQ 就认为你还没解决完,这个时候 RabbitMQ 会把这个生产调配给别的 consumer 去解决,音讯是不会丢的。
Kakfa 如何保障音讯的牢靠
- 消费者丢数据
惟一可能导致消费者弄丢数据的状况,就是说,你生产到了这个音讯,而后消费者那边 主动提交了 offset,让 Kafka 认为你曾经生产好了这个音讯,但其实你才刚筹备解决这个音讯,你还没解决,你本人就挂了,此时这条音讯就丢咯。
这不是跟 RabbitMQ 差不多吗,大家都晓得 Kafka 会主动提交 offset,那么只有 敞开主动提交 offset,在解决完之后本人手动提交 offset,就能够保证数据不会丢。然而此时的确还是 可能会有反复生产,比方你刚解决完,还没提交 offset,后果本人挂了,此时必定会反复生产一次,本人保障幂等性就好了。
生产环境碰到的一个问题,就是说咱们的 Kafka 消费者生产到了数据之后是写到一个内存的 queue 里先缓冲一下,后果有的时候,你刚把音讯写入内存 queue,而后消费者会主动提交 offset。而后此时咱们重启了零碎,就会导致内存 queue 里还没来得及解决的数据就失落了。
-
Kafka 丢数据
这块比拟常见的一个场景,就是 Kafka 某个 broker 宕机,而后从新选举 partition 的 leader。大家想想,要是此时其余的 follower 刚好还有些数据没有同步,后果此时 leader 挂了,而后选举某个 follower 成 leader 之后,不就少了一些数据?这就丢了一些数据啊。
生产环境也遇到过,咱们也是,之前 Kafka 的 leader 机器宕机了,将 follower 切换为 leader 之后,就会发现说这个数据就丢了。
所以此时个别是要求起码设置如下 4 个参数:
- 给 topic 设置
replication.factor
参数:这个值必须大于 1,要求每个 partition 必须有至多 2 个正本。 - 在 Kafka 服务端设置
min.insync.replicas
参数:这个值必须大于 1,这个是要求一个 leader 至多感知到有至多一个 follower 还跟本人保持联系,没落伍,这样能力确保 leader 挂了还有一个 follower 吧。 - 在 producer 端设置
acks=all
:这个是要求每条数据,必须是 写入所有 replica 之后,能力认为是写胜利了。 - 在 producer 端设置
retries=MAX
(很大很大很大的一个值,有限次重试的意思):这个是 要求一旦写入失败,就有限重试,卡在这里了。
咱们生产环境就是依照上述要求配置的,这样配置之后,至多在 Kafka broker 端就能够保障在 leader 所在 broker 产生故障,进行 leader 切换时,数据不会失落。
- 给 topic 设置
- 生产者丢数据
如果依照上述的思路设置了
acks=all
,肯定不会丢,要求是,你的 leader 接管到音讯,所有的 follower 都同步到了音讯之后,才认为本次写胜利了。如果没满足这个条件,生产者会主动一直的重试,重试有限次。
如何保障音讯的程序性?
我举个例子,咱们以前做过一个 mysql binlog
同步的零碎,压力还是十分大的,日同步数据要达到上亿,就是说数据从一个 mysql 库一成不变地同步到另一个 mysql 库外面去(mysql -> mysql)。常见的一点在于说比方大数据 team,就须要同步一个 mysql 库过去,对公司的业务零碎的数据做各种简单的操作。
你在 mysql 里增删改一条数据,对应进去了增删改 3 条 binlog
日志,接着这三条 binlog
发送到 MQ 外面,再生产进去顺次执行,起码得保障人家是依照程序来的吧?不然原本是:减少、批改、删除;你楞是换了程序给执行成删除、批改、减少,不全错了么。
原本这个数据同步过去,应该最初这个数据被删除了;后果你搞错了这个程序,最初这个数据保留下来了,数据同步就出错了。
先看看程序会错乱的俩场景:
- RabbitMQ:一个 queue,多个 consumer。比方,生产者向 RabbitMQ 里发送了三条数据,程序顺次是 data1/data2/data3,压入的是 RabbitMQ 的一个内存队列。有三个消费者别离从 MQ 中生产这三条数据中的一条,后果消费者 2 先执行完操作,把 data2 存入数据库,而后是 data1/data3。这不显著乱了。
- Kafka:比如说咱们建了一个 topic,有三个 partition。生产者在写的时候,其实能够指定一个 key,比如说咱们指定了某个订单 id 作为 key,那么这个订单相干的数据,肯定会被散发到同一个 partition 中去,而且这个 partition 中的数据肯定是有程序的。消费者从 partition 中取出来数据的时候,也肯定是有程序的。到这里,程序还是 ok 的,没有错乱。接着,咱们在消费者里可能会搞 多个线程来并发解决音讯。因为如果消费者是单线程生产解决,而解决比拟耗时的话,比方解决一条音讯耗时几十 ms,那么 1 秒钟只能解决几十条音讯,这吞吐量太低了。而多个线程并发跑的话,程序可能就乱掉了。
RabbitMQ 解决方案
拆分多个 queue,每个 queue 一个 consumer,就是多一些 queue 而已,的确是麻烦点;或者就一个 queue 然而对应一个 consumer,而后这个 consumer 外部用内存队列做排队,而后分发给底层不同的 worker 来解决。
Kafka 解决方案
- 一个 topic,一个 partition,一个 consumer,外部单线程生产,单线程吞吐量太低,个别不会用这个。
- 写 N 个内存 queue,具备雷同 key 的数据都到同一个内存 queue;而后对于 N 个线程,每个线程别离生产一个内存 queue 即可,这样就能保障程序性。
如何解决音讯推积?
大量音讯在 mq 里积压了几个小时了还没解决
一个消费者一秒是 1000 条,一秒 3 个消费者是 3000 条,一分钟就是 18 万条。所以如果你积压了几百万到上千万的数据,即便消费者复原了,也须要大略 1 小时的工夫能力恢复过来。
个别这个时候,只能长期紧急扩容了,具体操作步骤和思路如下:
- 先修复 consumer 的问题,确保其复原生产速度,而后将现有 consumer 都停掉。
- 新建一个 topic,partition 是原来的 10 倍,长期建设好原先 10 倍的 queue 数量。
- 而后写一个长期的散发数据的 consumer 程序,这个程序部署下来生产积压的数据,生产之后不做耗时的解决,间接平均轮询写入长期建设好的 10 倍数量的 queue。
- 接着长期征用 10 倍的机器来部署 consumer,每一批 consumer 生产一个长期 queue 的数据。这种做法相当于是长期将 queue 资源和 consumer 资源扩充 10 倍,以失常的 10 倍速度来生产数据。
- 等疾速生产完积压数据之后,得复原原先部署的架构 , 从新 用原先的 consumer 机器来生产音讯。
mq 中的音讯过期生效了
假如你用的是 RabbitMQ,RabbtiMQ 是能够设置过期工夫的,也就是 TTL。如果音讯在 queue 中积压超过肯定的工夫就会被 RabbitMQ 给清理掉,这个数据就没了。那这就是第二个坑了。这就不是说数据会大量积压在 mq 里,而是 大量的数据会间接搞丢。
这个状况下,就不是说要减少 consumer 生产积压的音讯,因为实际上没啥积压,而是丢了大量的音讯。咱们能够采取一个计划,就是 批量重导,这个咱们之火线上也有相似的场景干过。就是大量积压的时候,咱们过后就间接抛弃数据了,而后等过了高峰期当前,比方大家一起喝咖啡熬夜到早晨 12 点当前,用户都睡觉了。这个时候咱们就开始写程序,将失落的那批数据,写个长期程序,一点一点的查出来,而后从新灌入 mq 外面去,把白天丢的数据给他补回来。也只能是这样了。
假如 1 万个订单积压在 mq 外面,没有解决,其中 1000 个订单都丢了,你只能手动写程序把那 1000 个订单给查出来,手动发到 mq 里去再补一次。
mq 都快写满了
如果音讯积压在 mq 里,你很长时间都没有解决掉,此时导致 mq 都快写满了,咋办?这个还有别的方法吗?没有,谁让你第一个计划执行的太慢了,你长期写程序,接入数据来生产,生产一个抛弃一个,都不要了,疾速生产掉所有的音讯。而后走第二个计划,到了早晨再补数据吧。
参考资料:
- Kafa 深度解析
- RabbitMQ 源码解析
好了,明天就到这儿吧,我是冰河,大家有啥问题能够在下方留言,也能够加我微信:sun_shine_lyz,我拉你进群,一起交换技术,一起进阶,一起牛逼~~