明天咱们通过一篇文章来认识一下常见音讯队列 RabbitMQ、RocketMQ、Kafka。
RabbitMQ
RabbitMQ 各组件的性能
- Broker:一个 RabbitMQ 实例就是一个 Broker
- Virtual Host:虚拟主机。相当于 MySQL 的 DataBase,一个 Broker 上能够存在多个 vhost,vhost 之间互相隔离。每个 vhost 都领有本人的队列、交换机、绑定和权限机制。vhost 必须在连贯时指定,默认的 vhost 是 /。
- Exchange:交换机,用来接管生产者发送的音讯并将这些音讯路由给服务器中的队列。
- Queue:音讯队列,用来保留音讯直到发送给消费者。它是音讯的容器。一个音讯可投入一个或多个队列。
- Banding:绑定关系,用于 音讯队列和交换机之间的关联。通过路由键(Routing Key)将交换机和音讯队列关联起来。
- Channel:管道,一条双向数据流通道。不论是公布音讯、订阅队列还是接管音讯,这些动作都是通过管道实现。因为对于操作系统来说,建设和销毁 TCP 都是十分低廉的开销,所以引入了管道的概念,以复用一条 TCP 连贯。
- Connection:生产者 / 消费者 与 broker 之间的 TCP 连贯。
- Publisher:音讯的生产者。
- Consumer:音讯的消费者。
- Message:音讯,它是由音讯头和音讯体组成。音讯头则包含Routing-Key、Priority(优先级)等。
RabbitMQ 的多种交换机类型
Exchange 散发音讯给 Queue 时,Exchange 的类型对应不同的散发策略,有 3 种类型的 Exchange:Direct、Fanout、Topic。
- Direct:音讯中的 Routing Key 如果和 Binding 中的 Routing Key 完全一致,Exchange 就会将音讯散发到对应的队列中。
- Fanout:每个发到 Fanout 类型交换机的音讯都会散发到所有绑定的队列下来。Fanout 交换机没有 Routing Key。它在三种类型的交换机中转发音讯是最快的。
- Topic:Topic 交换机通过模式匹配调配音讯,将 Routing Key 和某个模式进行匹配。它只能辨认两个 通配符:”#” 和 ”“。# 匹配 0 个或多个单词, 匹配 1 个单词。
TTL
TTL(Time To Live):生存工夫。RabbitMQ 反对音讯的过期工夫,一共 2 种。
- 在音讯发送时进行指定。通过配置音讯体的 Properties,能够指定以后音讯的过期工夫。
- 在创立 Exchange 时指定。从进入音讯队列开始计算,只有超过了队列的超时工夫配置,那么音讯会主动革除。
生产者的音讯确认机制
Confirm 机制:
- 音讯的确认,是指生产者投递音讯后,如果 Broker 收到音讯,则会给咱们生产者一个应答。
- 生产者进行承受应答,用来确认这条音讯是否失常的发送到了 Broker,这种形式也是 音讯的可靠性投递的外围保障!
如何实现 Confirm 确认音讯?
- 在 channel 上开启确认模式:channel.confirmSelect()
- 在 channel 上开启监听:addConfirmListener,监听胜利和失败的处理结果,依据具体的后果对音讯进行从新发送或记录日志解决等后续操作。
Return 音讯机制:
Return Listener用于解决一些不可路由的音讯。
咱们的音讯生产者,通过指定一个 Exchange 和 Routing,把音讯送达到某一个队列中去,而后咱们的消费者监听队列进行音讯的生产解决操作。
然而在某些状况下,如果咱们在发送音讯的时候,以后的 exchange 不存在或者指定的路由 key 路由不到,这个时候咱们须要监听这种不可达音讯,就须要应用到 Returrn Listener。
根底 API 中有个要害的配置项 Mandatory:如果为 true,监听器会收到路由不可达的音讯,而后进行解决。如果为 false,broker 端会主动删除该音讯。
同样,通过监听的形式,chennel.addReturnListener(ReturnListener rl) 传入曾经重写过 handleReturn 办法的 ReturnListener。
生产端 ACK 与 NACK
生产端进行生产的时候,如果因为业务异样能够进行日志的记录,而后进行弥补。然而对于服务器宕机等重大问题,咱们须要 手动 ACK保障生产端生产胜利。
// deliveryTag:音讯在 mq 中的惟一标识 // multiple:是否批量(和 qos 设置相似的参数)// requeue:是否须要重回队列。或者抛弃或者重回队首再次生产。public void basicNack(long deliveryTag, boolean multiple, boolean requeue)
如上代码,音讯在 生产端重回队列 是为了对没有胜利解决音讯,把音讯从新返回到 Broker。一般来说,理论利用中都会敞开重回队列(防止进入死循环),也就是设置为 false。
死信队列 DLX
死信队列(DLX Dead-Letter-Exchange):当音讯在一个队列中变成死信之后,它会被从新推送到另一个队列,这个队列就是死信队列。
DLX 也是一个失常的 Exchange,和个别的 Exchange 没有区别,它能在任何的队列上被指定,实际上就是设置某个队列的属性。
当这个队列中有死信时,RabbitMQ 就会主动的将这个音讯从新公布到设置的 Exchange 下来,进而被路由到另一个队列。
RocketMQ
阿里巴巴双十一官网指定音讯产品,撑持阿里巴巴团体所有的音讯服务,历经十余年高可用与高牢靠的严苛考验,是阿里巴巴交易链路的外围产品。
Rocket:火箭的意思。
RocketMQ 的外围概念
他有以下外围概念:Broker、Topic、Tag、MessageQueue、NameServer、Group、Offset、Producer 以及 Consumer。
上面来具体介绍。
- Broker:音讯直达角色,负责 存储音讯 ,转发音讯。
Broker 是具体提供业务的服务器,单个 Broker 节点与所有的 NameServer 节点放弃长连贯及心跳,并会定时将 Topic 信息注册到 NameServer,顺带一提底层的通信和连贯都是 基于 Netty 实现 的。
Broker负责音讯存储,以 Topic 为纬度反对轻量级的队列,单机能够撑持上万队列规模,反对音讯推拉模型。官网上有数据显示:具备 上亿级音讯沉积能力 ,同时可 严格保障音讯的有序性。 - Topic:主题!它是音讯的第一级类型。
比方一个电商零碎能够分为:交易音讯、物流音讯等,一条音讯必须有一个 Topic。Topic与生产者和消费者的关系十分涣散,一个 Topic 能够有 0 个、1 个、多个生产者向其发送音讯,一个生产者也能够同时向不同的 Topic 发送音讯。一个 Topic 也能够被 0 个、1 个、多个消费者订阅。 - Tag:标签!能够看作子主题,它是音讯的第二级类型,用于为用户提供额定的灵活性。
应用标签,同一业务模块不同目标的音讯就能够用雷同 Topic 而不同的 Tag 来标识。比方交易音讯又能够分为:交易创立音讯、交易实现音讯等,一条音讯能够没有Tag。标签有助于放弃您的代码洁净和连贯,并且还能够为 RabbitMQ 提供的查问零碎提供帮忙。 - MessageQueue:一个 Topic 下能够设置多个音讯队列,发送音讯时执行该音讯的 Topic,RocketMQ 会轮询该 Topic 下的所有队列将音讯收回去。音讯的物理治理单位。一个 Topic 下能够有多个 Queue,Queue 的引入使得音讯的存储能够分布式集群化,具备了程度扩大能力。
- NameServer:相似 Kafka 中的 ZooKeeper,但 NameServer 集群之间是 没有通信 的,绝对 ZK 来说更加 轻量 。
它次要负责对于源数据的治理,包含了对于 Topic 和路由信息的治理。每个 Broker 在启动的时候会到 NameServer 注册,Producer 在发送音讯前会依据 Topic 去 NameServer获取对应 Broker 的路由信息,Consumer 也会定时获取 Topic 的路由信息。 - Producer:生产者,反对三种形式发送音讯:同步、异步和单向 。
单向发送:音讯收回去后,能够持续发送下一条音讯或执行业务代码,不期待服务器回应,且 没有回调函数 。
异步发送:音讯收回去后,能够持续发送下一条音讯或执行业务代码,不期待服务器回应,有回调函数 。
同步发送:音讯收回去后,期待服务器响应胜利或失败,能力持续前面的操作。 - Consumer:消费者,反对 PUSH 和 PULL 两种生产模式,反对 集群生产 和播送生产 集群生产:该模式下一个消费者集群独特生产一个主题的多个队列,一个队列只会被一个消费者生产,如果某个消费者挂掉,分组内其它消费者会接替挂掉的消费者持续生产。
播送生产:会发给消费者组中的每一个消费者进行生产。相当于 RabbitMQ 的公布订阅模式。 - Group:分组,一个组能够订阅多个 Topic。
分为 ProducerGroup,ConsumerGroup,代表某一类的生产者和消费者,一般来说同一个服务能够作为 Group,同一个 Group 一般来说发送和生产的音讯都是一样的 - Offset:在 RocketMQ 中,所有音讯队列都是长久化,长度有限的数据结构,所谓长度有限是指队列中的每个存储单元都是定长,拜访其中的存储单元应用 Offset 来拜访,Offset 为 Java Long 类型,64 位,实践上在 100 年内不会溢出,所以认为是长度有限。也能够认为 Message Queue 是一个长度有限的数组,Offset就是下标。
延时音讯
开源版的 RocketMQ 不反对任意工夫精度,仅反对特定的 level,例如定时 5s,10s,1min 等。其中,level= 0 级示意不延时,level= 1 示意 1 级延时,level= 2 示意 2 级延时,以此类推。
延时等级如下:
messageDelayLevel=1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h
程序音讯
音讯有序指的是能够依照音讯的发送程序来生产(FIFO)。RocketMQ 能够严格的保障音讯有序,能够分为 分区有序 或者 全局有序。
事务音讯
音讯队列 MQ 提供相似 X /Open XA 的分布式事务性能,通过音讯队列 MQ 事务音讯能达到分布式事务的最终统一。上图阐明了事务音讯的大抵流程:失常事务音讯的发送和提交、事务音讯的弥补流程。
事务音讯发送及提交:
- 发送 half 音讯
- 服务端响应音讯写入后果
- 依据发送后果执行本地事务(如果写入失败,此时 half 音讯对业务不可见,本地逻辑不执行);
- 依据本地事务状态执行 Commit 或 Rollback(Commit 操作生成音讯索引,音讯对消费者可见)。
事务音讯的弥补流程:
- 对没有 Commit/Rollback 的事务音讯(pending 状态的音讯),从服务端发动一次“回查”;
- Producer 收到回查音讯,查看回查音讯对应的本地事务的状态。
- 依据本地事务状态,从新 Commit 或 RollBack
其中,弥补阶段用于解决音讯 Commit 或 Rollback 产生超时或者失败的状况。
事务音讯状态:
事务音讯共有三种状态:提交状态、回滚状态、中间状态:
- TransactionStatus.CommitTransaction:提交事务,它容许消费者生产此音讯。
- TransactionStatus.RollbackTransaction:回滚事务,它代表该音讯将被删除,不容许被生产。
- TransactionStatus.Unkonwn:中间状态,它代表须要查看音讯队列来确定音讯状态。
RocketMQ 的高可用机制
RocketMQ 是天生反对分布式的,能够配置主从以及程度扩大。
Master 角色的 Broker 反对读和写,Slave 角色的 Broker 仅反对读,也就是 Producer 只能和 Master 角色的 Broker 连贯写入音讯;Consumer 能够连贯 Master 角色的 Broker,也能够连贯 Slave 角色的 Broker 来读取音讯。
音讯生产的高可用(主从):
在 Consumer 的配置文件中,并不需要设置是从 Master 读还是从 Slave 读,当 Master 不可用或者忙碌的时候,Consumer 会被主动切换到从 Slave 读。有了主动切换 Consumer 这种机制,当一个 Master 角色的机器呈现故障后,Consumer 依然能够从 Slave 读取音讯,不影响 Consumer 程序。
在 4.5 版本之前如果 Master 节点挂了,Slave 节点是不能主动切换成 master 节点的这个时候须要手动进行 Slave 角色的 Broker,更改配置文件,用新的配置文件启动 Broker。然而在 4.5 之后,RocketMQ 引入了 Dledger 同步机制,这个时候如果 Master 节点挂了,Dledger 会通过 Raft 协定选举出新的 master 节点,不须要手动批改配置。
音讯发送高可用(配置多个主节点):
在创立 Topic 的时候,把 Topic 的多个 Message Queue 创立在多个 Broker 组上(雷同 Broker 名称,不同 brokerId 的机器组成一个 Broker 组),这样当一个 Broker 组的 Master 不可用后,其余组的 Master 依然可用,Producer 依然能够发送音讯。
主从复制:
如果一个 Broker 组有 Master 和 Slave,音讯须要从 Master 复制到 Slave 上,有同步和异步两种复制形式。
- 同步复制:同步复制形式是等 Master 和 Slave 均写胜利后才反馈给客户端写胜利状态。如果 Master 出故障,Slave 上有全副的备份数据,容易复原同步复制会增大数据写入提早,升高零碎吞吐量。
- 异步复制:异步复制形式是只有 Master 写胜利 即可反馈给客户端写胜利状态。在异步复制形式下,零碎领有较低的提早和较高的吞吐量,然而如果 Master 出了故障,有些数据因为没有被写 入 Slave,有可能会失落
通常状况下,应该把 Master 和 Slave 配置成同步刷盘形式,主从之间配置成异步的复制形式,这样即便有一台机器出故障,依然能保证数据不丢,是个不错的抉择。
负载平衡
Producer 负载平衡:
Producer 端,每个实例在发消息的时候,默认会 轮询 所有的 Message Queue 发送,以达到让音讯均匀落在不同的 Queue 上。而因为 Queue 能够散落在不同的 Broker,所以音讯就发送到不同的 Broker 下,如下图:
Consumer 负载平衡:
如果 Consumer 实例的数量比 Message Queue 的总数量还多的话,多进去的 Consumer 实例将无奈分到 Queue,也就无奈生产到音讯,也就无奈起到摊派负载的作用了。所以须要管制让 Queue 的总数量大于等于 Consumer 的数量。
- 消费者的集群模式:启动多个消费者就能够保障消费者的负载平衡(均摊队列)
- 默认应用的是均摊队列:会依照 Queue 的数量和实例的数量平均分配 Queue 给每个实例,这样每个消费者能够均摊生产的队列,如下图所示 6 个队列和三个生产者。
- 另外一种均匀的算法 环状轮流分 Queue的模式,每个消费者,均摊不同主节点的一个音讯队列,如下图所示:
对于播送模式并不是负载平衡的,要求一条音讯须要投递到一个生产组上面所有的消费者实例,所以也就没有音讯被摊派生产的说法。
死信队列
当一条音讯生产失败,RocketMQ 就会主动进行音讯重试。而如果音讯超过最大重试次数,RocketMQ 就会认为这个音讯有问题。然而此时,RocketMQ 不会立即将这个有问题的音讯抛弃,而会将其发送到这个消费者组对应的一种非凡队列:死信队列。死信队列的名称是 %DLQ%+ConsumGroup。
死信队列具备以下个性:
- 一个死信队列对应一个 Group ID,而不是对应单个消费者实例。
- 如果一个 Group ID 未产生死信音讯,音讯队列 RocketMQ 不会为其创立相应的死信队列。
- 一个死信队列蕴含了对应 Group ID 产生的所有死信音讯,不管该音讯属于哪个 Topic。
Kafka
Kafka 是一个分布式、反对分区的、多正本的,基于 ZooKeeper协调的分布式音讯零碎。
新版 Kafka 曾经不再须要 ZooKeeper。
它最大的个性就是能够实时的解决大量数据以满足各种需要场景:比方基于 Hadoop 的批处理零碎、低提早的实时零碎、Storm/Spark 流式解决引擎,Web/Nginx 日志、拜访日志,音讯服务等等,用Scala 语言编写。属于 Apache 基金会的顶级开源我的项目。
先看一下 Kafka 的架构图:
Kafka 的外围概念
在 Kafka 中有几个外围概念:
- Broker:消息中间件解决节点,一个 Kafka 节点就是一个 Broker,一个或者多个 Broker 能够组成一个 Kafka 集群
- Topic:Kafka 依据 topic 对音讯进行归类,公布到 Kafka 集群的每条音讯都须要指定一个 topic
- Producer:音讯生产者,向 Broker 发送音讯的客户端
- Consumer:音讯消费者,从 Broker 读取音讯的客户端
- ConsumerGroup:每个 Consumer 属于一个特定的 ConsumerGroup,一条音讯能够被多个不同的 ConsumerGroup 生产,然而一个 ConsumerGroup 中只能有一个 Consumer 可能生产该音讯
- Partition:物理上的概念,一个 topic 能够分为多个 partition,每个 partition 内部消息是有序的
- Leader:每个 Partition 有多个正本,其中有且仅有一个作为 Leader,Leader 是负责数据读写的 Partition。
- Follower:Follower 追随 Leader,所有写申请都通过 Leader 路由,数据变更会播送给所有 Follower,Follower 与 Leader 保持数据同步。如果 Leader 生效,则从 Follower 中选举出一个新的 Leader。当 Follower 与 Leader 挂掉、卡住或者同步太慢,Leader 会把这个 Follower 从 ISR 列表 中删除,从新创立一个 Follower。
- Offset:偏移量。Kafka 的存储文件都是依照 offset.kafka 来命名,用 Offset 做名字的益处是不便查找。例如你想找位于 2049 的地位,只有找到 2048.kafka 的文件即可。
能够这么来了解 Topic,Partition 和 Broker:
一个 Topic,代表逻辑上的一个业务数据集,比方订单相干操作音讯放入订单 Topic,用户相干操作音讯放入用户 Topic,对于大型网站来说,后端数据都是海量的,订单音讯很可能是十分巨量的,比方有几百个 G 甚至达到 TB 级别,如果把这么多数据都放在一台机器上可定会有容量限度问题,那么就能够在 Topic 外部划分多个 Partition 来分片存储数据,不同的 Partition 能够位于不同的机器上,相当于 分布式存储。每台机器上都运行一个 Kafka 的过程 Broker。
Kafka 外围总控制器 Controller
在 Kafka 集群中会有一个或者多个 Broker,其中有一个 Broker 会被选举为控制器(Kafka Controller),能够了解为 Broker-Leader,它负责管理整个 集群中所有分区和正本的状态。
Partition-Leader
Controller 选举机制
在 Kafka 集群启动的时候,选举的过程是集群中每个 Broker 都会尝试在 ZooKeeper 上创立一个 /controller 长期节点,ZooKeeper 会保障有且仅有一个 Broker 能创立胜利,这个 Broker 就会成为集群的总控器 Controller。
当这个 Controller 角色的 Broker 宕机了,此时 ZooKeeper 长期节点会隐没,集群里其余 Broker 会始终监听这个长期节 点,发现长期节点隐没了,就竞争再次创立长期节点,就是咱们下面说的选举机制,ZooKeeper 又会保障有一个 Broker 成为新的 Controller。具备控制器身份的 Broker 须要比其余一般的 Broker 多一份职责,具体细节如下:
- 监听 Broker 相干的变动。为 ZooKeeper 中的 /brokers/ids/ 节点增加 BrokerChangeListener,用来解决 Broker 增减的变动。
- 监听 Topic 相干的变动。为 ZooKeeper 中的 /brokers/topics 节点增加 TopicChangeListener,用来解决 Topic 增减的变动;为 ZooKeeper 中的 /admin/delete_topics 节点增加 TopicDeletionListener,用来解决删除 Topic 的动作。
- 从 ZooKeeper 中读取获取以后所有与 Topic、Partition 以及 Broker 无关的信息并进行相应的治理。对于所有 Topic 所对应的 ZooKeeper 中的 /brokers/topics/ 节点增加 PartitionModificationsListener,用来监听 Topic 中的分区调配变动。
- 更新集群的元数据信息,同步到其余一般的 Broker 节点中
Partition 正本选举 Leader 机制
Controller 感知到分区 Leader 所在的 Broker 挂了,Controller 会从 ISR 列表(参数 unclean.leader.election.enable=false 的前提下)里挑第一个 Broker 作为 Leader(第一个 Broker 最先放进 ISR 列表,可能是同步数据最多的正本),如果参数 unclean.leader.election.enable 为 true,代表在 ISR 列表里所有正本都挂了的时候能够在 ISR 列表以外的正本当选 Leader,这种设置,能够进步可用性,然而选出的新 Leader 有可能数据少很多。正本进入 ISR 列表有两个条件:
- 正本节点不能产生分区,必须能与 ZooKeeper 放弃会话以及跟 Leader 正本网络连通
- 副本能复制 Leader 上的所有写操作,并且不能落后太多。(与 Leader 正本同步滞后的正本,是由 replica.lag.time.max.ms 配置决定的,超过这个工夫都没有跟 Leader 同步过的一次的正本会被移出 ISR 列表)
消费者生产音讯的 Offset 记录机制
每个 Consumer 会定期将本人生产分区的 Offset 提交给 Kafka 外部 Topic:consumer_offsets,提交过来的时候,key 是 consumerGroupId+topic+ 分区号,value 就是以后 Offset 的值,Kafka 会定期清理 Topic 里的音讯,最初就保留最新的那条数据。
因为__consumer_offsets 可能会接管高并发的申请,Kafka 默认给其调配 50 个分区(能够通过 offsets.topic.num.partitions 设置),这样能够通过加机器的形式抗大并发。
消费者 Rebalance 机制
Rebalance 就是说 如果生产组里的消费者数量有变动或生产的分区数有变动,Kafka 会重新分配消费者与生产分区的关系。比方 consumer group 中某个消费者挂了,此时会主动把调配给他的分区交给其余的消费者,如果他又重启了,那么又会把一些分区从新交还给他。
留神:Rebalance 只针对 subscribe 这种不指定分区生产的状况,如果通过 assign 这种生产形式指定了分区,Kafka 不会进行 Rebalance。
如下状况可能会触发消费者 Rebalance:
- 生产组里的 Consumer 减少或缩小了
- 动静给 Topic 减少了分区
- 生产组订阅了更多的 Topic
Rebalance 过程中,消费者无奈从 Kafka 生产音讯,这对 Kafka 的 TPS 会有影响,如果 Kafka 集群内节点较多,比方数百 个,那重均衡可能会耗时极多,所以应尽量避免在零碎高峰期的重均衡产生。
Rebalance 过程如下
当有消费者退出生产组时,消费者、生产组及组协调器之间会经验以下几个阶段:
第一阶段:抉择组协调器
组协调器 GroupCoordinator:每个 consumer group 都会抉择一个 Broker 作为本人的组协调器 coordinator,负责监控这个生产组里的所有消费者的心跳,以及判断是否宕机,而后开启消费者 Rebalance。consumer group 中的每个 consumer 启动时会向 Kafka 集群中的某个节点发送 FindCoordinatorRequest 申请来查找对应的组协调器 GroupCoordinator,并跟其建设网络连接。组协调器抉择形式:通过如下公式能够选出 consumer 生产的 Offset 要提交到__consumer_offsets 的哪个分区,这个分区 Leader 对应的 Broker 就是这个 consumer group 的 coordinator 公式:
hash(consumer group id) % 对应主题的分区数
第二阶段:退出生产组 JOIN GROUP
在胜利找到生产组所对应的 GroupCoordinator 之后就进入退出生产组的阶段,在此阶段的消费者会向 GroupCoordinator 发送 JoinGroupRequest 申请,并解决响应。而后 GroupCoordinator 从一个 consumer group 中抉择第一个退出 group 的 consumer 作为 Leader(生产组协调器),把 consumer group 状况发送给这个 Leader,接着这个 Leader 会负责制订分区计划。
第三阶段(SYNC GROUP)
consumer leader 通过给 GroupCoordinator 发送 SyncGroupRequest,接着 GroupCoordinator 就把分区计划下发给各个 consumer,他们会依据指定分区的 Leader Broker 进行网络连接以及音讯生产。
消费者 Rebalance 分区调配策略
次要有三种 Rebalance 的策略:range、round-robin、sticky。默认状况为 range 调配策略。
假如一个主题有 10 个分区(0-9),当初有三个 consumer 生产:
range 策略:依照分区序号排序调配,假如 n=分区数/消费者数量 = 3,m=分区数 % 消费者数量 = 1,那么前 m 个消 费者每个调配 n+1 个分区,前面的(消费者数量-m)个消费者每个调配 n 个分区。比方分区 0~ 3 给一个 consumer,分区 4~ 6 给一个 consumer,分区 7~9 给一个 consumer。
round-robin 策略:轮询调配,比方分区 0、3、6、9 给一个 consumer,分区 1、4、7 给一个 consumer,分区 2、5、8 给一个 consumer
sticky 策略:初始时调配策略与 round-robin 相似,然而在 rebalance 的时候,须要保障如下两个准则:
- 分区的调配要尽可能平均。
- 分区的调配尽可能与上次调配的放弃雷同。
当两者发生冲突时,第一个指标优先于第二个指标。这样能够最大水平维持原来的分区调配的策略。比方对于第一种 range 状况的调配,如果第三个 consumer 挂了,那么从新用 sticky 策略调配的后果如下:consumer1 除了原有的 0~ 3,会再调配一个 7 consumer2 除了原有的 4~ 6,会再调配 8 和 9。
Producer 公布音讯机制分析
1、写入形式
producer 采纳 push 模式将音讯公布到 broker,每条音讯都被 append 到 patition 中,属于程序写磁盘(程序写磁盘 比 随机写 效率要高,保障 kafka 吞吐率)。
2、音讯路由
producer 发送音讯到 broker 时,会依据分区算法抉择将其存储到哪一个 partition。其路由机制为:
hash(key)% 分区数
3、写入流程
- producer 先从 ZooKeeper 的 “/brokers/…/state” 节点找到该 partition 的 leader
- producer 将音讯发送给该 leader
- leader 将音讯写入本地 log
- followers 从 leader pull 音讯,写入本地 log 后向 leader 发送 ACK
- leader 收到所有 ISR 中的 replica 的 ACK 后,减少 HW(high watermark,最初 commit 的 offset)并向 producer 发送 ACK
HW 与 LEO
HW 俗称高水位,HighWatermark 的缩写,取一个 partition 对应的 ISR 中最小的 LEO(log-end-offset)作为 HW,consumer 最多只能生产到 HW 所在的地位。另外每个 replica 都有 HW,leader 和 follower 各自负责更新本人的 HW 的状 态。对于 leader 新写入的音讯,consumer 不能立即生产,leader 会期待该音讯被所有 ISR 中的 replicas 同步后更新 HW,此时音讯能力被 consumer 生产。这样就保障了如果 leader 所在的 broker 生效,该音讯依然能够从新选举的 leader 中获取。对于来自外部 broker 的读取申请,没有 HW 的限度。
日志分段存储
Kafka 一个分区的音讯数据对应存储在一个文件夹下,以 topic 名称 + 分区号命名,音讯在分区内是分段存储的,每个段的音讯都存储在不一样的 log 文件里,Kafka 规定了一个段位的 log 文件最大为 1G,做这个限度目标是为了不便把 log 文件加载到内存去操作:
1 ### 局部音讯的 offset 索引文件,kafka 每次往分区发 4K(可配置)音讯就会记录一条以后音讯的 offset 到 index 文件,2 ### 如果要定位音讯的 offset 会先在这个文件里疾速定位,再去 log 文件里找具体音讯 3 00000000000000000000.index 4 ### 音讯存储文件,次要存 offset 和音讯体 5 00000000000000000000.log 6 ### 音讯的发送工夫索引文件,kafka 每次往分区发 4K(可配置)音讯就会记录一条以后音讯的发送工夫戳与对应的 offset 到 timeindex 文件,7 ### 如果须要依照工夫来定位音讯的 offset,会先在这个文件里查找 8 00000000000000000000.timeindex 9 10 00000000000005367851.index 11 00000000000005367851.log 12 00000000000005367851.timeindex 13 14 00000000000009936472.index 15 00000000000009936472.log 16 00000000000009936472.timeindex
这个 9936472 之类的数字,就是代表了这个日志段文件里蕴含的起始 Offset,也就阐明这个分区里至多都写入了靠近 1000 万条数据了。Kafka Broker 有一个参数,log.segment.bytes,限定了每个日志段文件的大小,最大就是 1GB。一个日志段文件满了,就主动开一个新的日志段文件来写入,防止单个文件过大,影响文件的读写性能,这个过程叫做 log rolling,正在被写入的那个日志段文件,叫做 active log segment。
最初附一张 ZooKeeper 节点数据图
MQ 带来的一些问题、及解决方案
如何保障程序生产?
- RabbitMQ:一个 Queue 对应一个 Consumer 即可解决。
- RocketMQ:hash(key)% 队列数
- Kafka:hash(key)% 分区数
如何实现提早生产?
- RabbitMQ:两种计划 死信队列 + TTL 引入 RabbitMQ 的提早插件
- RocketMQ:天生反对延时音讯。
- Kafka:步骤如下 专门为要提早的音讯创立一个 Topic 新建一个消费者去生产这个 Topic 音讯长久化再开一个线程定时去拉取长久化的音讯,放入理论要生产的 Topic 理论生产的消费者从理论要生产的 Topic 拉取音讯。
如何保障音讯的可靠性投递
RabbitMQ:
- Broker–> 消费者:手动 ACK
- 生产者 –>Broker:两种计划
数据库长久化:
1. 将业务订单数据和生成的 Message 进行长久化操作(个别状况下插入数据库,这里如果分库的话可能波及到分布式事务)2. 将 Message 发送到 Broker 服务器中 3. 通过 RabbitMQ 的 Confirm 机制,在 producer 端,监听服务器是否 ACK。4. 如果 ACK 了,就将 Message 这条数据状态更新为已发送。如果失败,批改为失败状态。5. 分布式定时工作查询数据库 3 分钟(这个具体工夫应该依据的时效性来定)之前的发送失败的音讯 6. 从新发送音讯,记录发送次数 7. 如果发送次数过多依然失败,那么就须要人工排查之类的操作。
长处:可能保障音讯百分百不失落。
毛病:第一步会波及到分布式事务问题。
音讯的提早投递:
流程图中,色彩不同的代表不同的 message1. 将业务订单长久化 2. 发送一条 Message 到 broker(称之为主 Message),再发送雷同的一条到不同的队列或者交换机 (这条称为确认 Message) 中。3. 主 Message 由理论业务解决端生产后,生成一条响应 Message。之前的确认 Message 由 Message Service 利用解决入库。4~6. 理论业务解决端发送的确认 Message 由 Message Service 接管后,将原 Message 状态批改。7. 如果该条 Message 没有被确认,则通过 rpc 调用从新由 producer 进行全过程。
长处:绝对于长久化计划来说响应速度有所晋升
毛病:零碎复杂性有点高,万一两条音讯都失败了,音讯存在失落状况,仍需 Confirm 机制做弥补。
RocketMQ
生产者弄丢数据:
Producer 在把 Message 发送 Broker 的过程中,因为网络问题等产生失落,或者 Message 到了 Broker,然而出了问题,没有保留下来。针对这个问题,RocketMQ 对 Producer 发送音讯设置了 3 种形式:
同步发送异步发送单向发送
Broker 弄丢数据:
Broker 接管到 Message 暂存到内存,Consumer 还没来得及生产,Broker 挂掉了。
能够通过 长久化 设置去解决:
- 创立 Queue 的时候设置长久化,保障 Broker 长久化 Queue 的元数据,然而不会长久化 Queue 外面的音讯
- 将 Message 的 deliveryMode 设置为 2,能够将音讯长久化到磁盘,这样只有 Message 反对化到磁盘之后才会发送告诉 Producer ack
这两步过后,即便 Broker 挂了,Producer 必定收不到 ack 的,就能够进行重发。
消费者弄丢数据:
Consumer 有生产到 Message,然而外部呈现问题,Message 还没解决,Broker 认为 Consumer 解决完了,只会把后续的音讯发送。这时候,就要 敞开 autoack,音讯解决过后,进行手动 ack , 屡次生产失败的音讯,会进入 死信队列,这时候须要人工干预。
Kafka
生产者弄丢数据
设置了 acks=all,肯定不会丢,要求是,你的 leader 接管到音讯,所有的 follower 都同步到了音讯之后,才认为本次写胜利了。如果没满足这个条件,生产者会主动一直的重试,重试有限次。
Broker 弄丢数据
Kafka 某个 broker 宕机,而后从新选举 partition 的 leader。大家想想,要是此时其余的 follower 刚好还有些数据没有同步,后果此时 leader 挂了,而后选举某个 follower 成 leader 之后,不就少了一些数据?这就丢了一些数据啊。
此时个别是要求起码设置如下 4 个参数:
replication.factormin.insync.replicasacks=allretries=MAX
咱们生产环境就是依照上述要求配置的,这样配置之后,至多在 Kafka broker 端就能够保障在 leader 所在 broker 产生故障,进行 leader 切换时,数据不会失落。
消费者弄丢数据
你生产到了这个音讯,而后消费者那边主动提交了 offset,让 Kafka 认为你曾经生产好了这个音讯,但其实你才刚筹备解决这个音讯,你还没解决,你本人就挂了,此时这条音讯就丢咯。
这不是跟 RabbitMQ 差不多吗,大家都晓得 Kafka 会主动提交 offset,那么只有 敞开主动提交 offset,在解决完之后本人手动提交 offset,就能够保证数据不会丢。然而此时的确还是可能会有反复生产,比方你刚解决完,还没提交 offset,后果本人挂了,此时必定会反复生产一次,本人保障幂等性就好了。
如何保障音讯的幂等?
以 RocketMQ 为例,上面列出了音讯反复的场景:
发送时音讯反复
当一条音讯已被胜利发送到服务端并实现长久化,此时呈现了网络闪断或者客户端宕机,导致服务端对客户端应答失败。如果此时生产者意识到音讯发送失败并尝试再次发送音讯,消费者后续会收到两条内容雷同并且 Message ID 也雷同的音讯。
投递时音讯反复
音讯生产的场景下,音讯已投递到消费者并实现业务解决,当客户端给服务端反馈应答的时候网络闪断。为了保障音讯至多被生产一次,音讯队列 RocketMQ 版的服务端将在网络复原后再次尝试投递之前已被解决过的音讯,消费者后续会收到两条内容雷同并且 Message ID 也雷同的音讯。
负载平衡时音讯反复(包含但不限于网络抖动、Broker 重启以及消费者利用重启)
当音讯队列 RocketMQ 版的 Broker 或客户端重启、扩容或缩容时,会触发 Rebalance,此时消费者可能会收到反复音讯。
那么,有什么解决方案呢?间接上图。
如何解决音讯积压的问题?
对于这个问题,有几个点须要思考:
如何疾速让积压的音讯被生产掉?
长期写一个音讯散发的消费者,把积压队列里的音讯平均散发到 N 个队列中,同时一个队列对应一个消费者,相当于生产速度进步了 N 倍。
批改前:
批改后:
积压工夫太久,导致局部音讯过期,怎么解决?
批量重导。在业务不忙碌的时候,比方凌晨,提前准备好程序,把失落的那批音讯查出来,从新导入到 MQ 中。
音讯大量积压,MQ 磁盘被写满了,导致新音讯进不来了,丢掉了大量音讯,怎么解决?
这个没方法。谁让【音讯散发的消费者】写的太慢了,你长期写程序,接入数据来生产,生产一个抛弃一个,都不要了,疾速生产掉所有的音讯。而后走第二个计划,到了早晨再补数据吧。