关于java:横贯八方揭秘RabbitMQRocketMQKafka-的核心原理建议收藏

明天咱们通过一篇文章来认识一下常见音讯队列RabbitMQ、RocketMQ、Kafka。

RabbitMQ

RabbitMQ各组件的性能

  • Broker :一个RabbitMQ实例就是一个Broker
  • Virtual Host :虚拟主机。相当于MySQL的DataBase,一个Broker上能够存在多个vhost,vhost之间互相隔离。每个vhost都领有本人的队列、交换机、绑定和权限机制。vhost必须在连贯时指定,默认的vhost是/。
  • Exchange :交换机,用来接管生产者发送的音讯并将这些音讯路由给服务器中的队列。
  • Queue :音讯队列,用来保留音讯直到发送给消费者。它是音讯的容器。一个音讯可投入一个或多个队列。
  • Banding :绑定关系,用于音讯队列和交换机之间的关联。通过路由键(Routing Key)将交换机和音讯队列关联起来。
  • Channel :管道,一条双向数据流通道。不论是公布音讯、订阅队列还是接管音讯,这些动作都是通过管道实现。因为对于操作系统来说,建设和销毁TCP都是十分低廉的开销,所以引入了管道的概念,以复用一条TCP连贯。
  • Connection :生产者/消费者 与broker之间的TCP连贯。
  • Publisher :音讯的生产者。
  • Consumer :音讯的消费者。
  • Message :音讯,它是由音讯头和音讯体组成。音讯头则包含Routing-KeyPriority(优先级)等。

RabbitMQ的多种交换机类型

Exchange 散发音讯给 Queue 时, Exchange 的类型对应不同的散发策略,有3种类型的 Exchange :DirectFanoutTopic

  • Direct:音讯中的 Routing Key 如果和 Binding 中的 Routing Key 完全一致, Exchange 就会将音讯散发到对应的队列中。
  • Fanout:每个发到 Fanout 类型交换机的音讯都会散发到所有绑定的队列下来。Fanout交换机没有 Routing Key 。它在三种类型的交换机中转发音讯是最快的
  • Topic:Topic交换机通过模式匹配调配音讯,将 Routing Key 和某个模式进行匹配。它只能辨认两个通配符:”#”和”“。# 匹配0个或多个单词, 匹配1个单词。

TTL

TTL(Time To Live):生存工夫。RabbitMQ反对音讯的过期工夫,一共2种。

  • 在音讯发送时进行指定。通过配置音讯体的 Properties ,能够指定以后音讯的过期工夫。
  • 在创立Exchange时指定。从进入音讯队列开始计算,只有超过了队列的超时工夫配置,那么音讯会主动革除。

生产者的音讯确认机制

Confirm机制:

  • 音讯的确认,是指生产者投递音讯后,如果Broker收到音讯,则会给咱们生产者一个应答。
  • 生产者进行承受应答,用来确认这条音讯是否失常的发送到了Broker,这种形式也是音讯的可靠性投递的外围保障!

如何实现Confirm确认音讯?

  1. 在channel上开启确认模式:channel.confirmSelect()
  2. 在channel上开启监听:addConfirmListener ,监听胜利和失败的处理结果,依据具体的后果对音讯进行从新发送或记录日志解决等后续操作。

Return音讯机制:

Return Listener用于解决一些不可路由的音讯

咱们的音讯生产者,通过指定一个Exchange和Routing,把音讯送达到某一个队列中去,而后咱们的消费者监听队列进行音讯的生产解决操作。

然而在某些状况下,如果咱们在发送音讯的时候,以后的exchange不存在或者指定的路由key路由不到,这个时候咱们须要监听这种不可达音讯,就须要应用到Returrn Listener。

根底API中有个要害的配置项 Mandatory :如果为true,监听器会收到路由不可达的音讯,而后进行解决。如果为false,broker端会主动删除该音讯。

同样,通过监听的形式, chennel.addReturnListener(ReturnListener rl) 传入曾经重写过handleReturn办法的ReturnListener。

生产端ACK与NACK

生产端进行生产的时候,如果因为业务异样能够进行日志的记录,而后进行弥补。然而对于服务器宕机等重大问题,咱们须要手动ACK保障生产端生产胜利。

// deliveryTag:音讯在mq中的惟一标识// multiple:是否批量(和qos设置相似的参数)// requeue:是否须要重回队列。或者抛弃或者重回队首再次生产。public void basicNack(long deliveryTag, boolean multiple, boolean requeue) 

如上代码,音讯在生产端重回队列是为了对没有胜利解决音讯,把音讯从新返回到Broker。一般来说,理论利用中都会敞开重回队列(防止进入死循环),也就是设置为false。

死信队列DLX

死信队列(DLX Dead-Letter-Exchange):当音讯在一个队列中变成死信之后,它会被从新推送到另一个队列,这个队列就是死信队列。

DLX也是一个失常的Exchange,和个别的Exchange没有区别,它能在任何的队列上被指定,实际上就是设置某个队列的属性。

当这个队列中有死信时,RabbitMQ就会主动的将这个音讯从新公布到设置的Exchange下来,进而被路由到另一个队列。

RocketMQ

阿里巴巴双十一官网指定音讯产品,撑持阿里巴巴团体所有的音讯服务,历经十余年高可用与高牢靠的严苛考验,是阿里巴巴交易链路的外围产品。

Rocket:火箭的意思。

RocketMQ的外围概念

他有以下外围概念:Broker 、 Topic 、 Tag 、 MessageQueue 、 NameServer 、 Group 、 Offset 、 Producer 以及 Consumer 。

上面来具体介绍。

  • Broker:音讯直达角色,负责存储音讯,转发音讯。
    Broker是具体提供业务的服务器,单个Broker节点与所有的NameServer节点放弃长连贯及心跳,并会定时将Topic信息注册到NameServer,顺带一提底层的通信和连贯都是基于Netty实现的。
    Broker负责音讯存储,以Topic为纬度反对轻量级的队列,单机能够撑持上万队列规模,反对音讯推拉模型。官网上有数据显示:具备上亿级音讯沉积能力,同时可严格保障音讯的有序性
  • Topic:主题!它是音讯的第一级类型。
    比方一个电商零碎能够分为:交易音讯、物流音讯等,一条音讯必须有一个 Topic 。Topic与生产者和消费者的关系十分涣散,一个 Topic 能够有0个、1个、多个生产者向其发送音讯,一个生产者也能够同时向不同的 Topic 发送音讯。一个 Topic 也能够被 0个、1个、多个消费者订阅。
  • Tag:标签!能够看作子主题,它是音讯的第二级类型,用于为用户提供额定的灵活性。
    应用标签,同一业务模块不同目标的音讯就能够用雷同Topic而不同的Tag来标识。比方交易音讯又能够分为:交易创立音讯、交易实现音讯等,一条音讯能够没有Tag。标签有助于放弃您的代码洁净和连贯,并且还能够为RabbitMQ提供的查问零碎提供帮忙。
  • MessageQueue:一个Topic下能够设置多个音讯队列,发送音讯时执行该音讯的Topic,RocketMQ会轮询该Topic下的所有队列将音讯收回去。音讯的物理治理单位。一个Topic下能够有多个Queue,Queue的引入使得音讯的存储能够分布式集群化,具备了程度扩大能力。
  • NameServer:相似Kafka中的ZooKeeper,但NameServer集群之间是没有通信的,绝对ZK来说更加轻量
    它次要负责对于源数据的治理,包含了对于Topic和路由信息的治理。每个Broker在启动的时候会到NameServer注册,Producer在发送音讯前会依据Topic去NameServer获取对应Broker的路由信息,Consumer也会定时获取 Topic 的路由信息。
  • Producer:生产者,反对三种形式发送音讯:同步、异步和单向
    单向发送 :音讯收回去后,能够持续发送下一条音讯或执行业务代码,不期待服务器回应,且没有回调函数
    异步发送 :音讯收回去后,能够持续发送下一条音讯或执行业务代码,不期待服务器回应,有回调函数
    同步发送 :音讯收回去后,期待服务器响应胜利或失败,能力持续前面的操作。
  • Consumer:消费者,反对 PUSH 和 PULL 两种生产模式,反对集群生产播送生产 集群生产 :该模式下一个消费者集群独特生产一个主题的多个队列,一个队列只会被一个消费者生产,如果某个消费者挂掉,分组内其它消费者会接替挂掉的消费者持续生产。
    播送生产 :会发给消费者组中的每一个消费者进行生产。相当于RabbitMQ的公布订阅模式。
  • Group:分组,一个组能够订阅多个Topic。
    分为ProducerGroup,ConsumerGroup,代表某一类的生产者和消费者,一般来说同一个服务能够作为Group,同一个Group一般来说发送和生产的音讯都是一样的
  • Offset:在RocketMQ中,所有音讯队列都是长久化,长度有限的数据结构,所谓长度有限是指队列中的每个存储单元都是定长,拜访其中的存储单元应用Offset来拜访,Offset为Java Long类型,64位,实践上在 100年内不会溢出,所以认为是长度有限。也能够认为Message Queue是一个长度有限的数组,Offset就是下标。

延时音讯

开源版的RocketMQ不反对任意工夫精度,仅反对特定的level,例如定时5s,10s,1min等。其中,level=0级示意不延时,level=1示意1级延时,level=2示意2级延时,以此类推。

延时等级如下:

messageDelayLevel=1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h

程序音讯

音讯有序指的是能够依照音讯的发送程序来生产(FIFO)。RocketMQ能够严格的保障音讯有序,能够分为 分区有序 或者 全局有序 。

事务音讯

音讯队列MQ提供相似X/Open XA的分布式事务性能,通过音讯队列MQ事务音讯能达到分布式事务的最终统一。上图阐明了事务音讯的大抵流程:失常事务音讯的发送和提交、事务音讯的弥补流程。

事务音讯发送及提交:

  1. 发送half音讯
  2. 服务端响应音讯写入后果
  3. 依据发送后果执行本地事务(如果写入失败,此时half音讯对业务不可见,本地逻辑不执行);
  4. 依据本地事务状态执行Commit或Rollback(Commit操作生成音讯索引,音讯对消费者可见)。

事务音讯的弥补流程:

  1. 对没有Commit/Rollback的事务音讯(pending状态的音讯),从服务端发动一次“回查”;
  2. Producer收到回查音讯,查看回查音讯对应的本地事务的状态。
  3. 依据本地事务状态,从新Commit或RollBack

其中,弥补阶段用于解决音讯Commit或Rollback产生超时或者失败的状况。

事务音讯状态:

事务音讯共有三种状态:提交状态、回滚状态、中间状态:

  1. TransactionStatus.CommitTransaction:提交事务,它容许消费者生产此音讯。
  2. TransactionStatus.RollbackTransaction:回滚事务,它代表该音讯将被删除,不容许被生产。
  3. TransactionStatus.Unkonwn:中间状态,它代表须要查看音讯队列来确定音讯状态。

RocketMQ的高可用机制

‍RocketMQ是天生反对分布式的,能够配置主从以及程度扩大。

Master角色的Broker反对读和写,Slave角色的Broker仅反对读,也就是 Producer只能和Master角色的Broker连贯写入音讯;Consumer能够连贯 Master角色的Broker,也能够连贯Slave角色的Broker来读取音讯。

音讯生产的高可用(主从):

在Consumer的配置文件中,并不需要设置是从Master读还是从Slave读,当Master不可用或者忙碌的时候,Consumer会被主动切换到从Slave读。有了主动切换Consumer这种机制,当一个Master角色的机器呈现故障后,Consumer依然能够从Slave读取音讯,不影响Consumer程序。

在4.5版本之前如果Master节点挂了,Slave节点是不能主动切换成master节点的这个时候须要手动进行Slave角色的Broker,更改配置文件,用新的配置文件启动Broker。然而在4.5之后,RocketMQ引入了Dledger同步机制,这个时候如果Master节点挂了,Dledger会通过Raft协定选举出新的master节点,不须要手动批改配置。

音讯发送高可用(配置多个主节点):

在创立Topic的时候,把Topic的多个Message Queue创立在多个Broker组上(雷同Broker名称,不同 brokerId的机器组成一个Broker组),这样当一个Broker组的Master不可用后,其余组的Master依然可用,Producer依然能够发送音讯。

主从复制:

如果一个Broker组有Master和Slave,音讯须要从Master复制到Slave 上,有同步和异步两种复制形式。

  • 同步复制:同步复制形式是等Master和Slave均写胜利后才反馈给客户端写胜利状态。如果Master出故障, Slave上有全副的备份数据,容易复原同步复制会增大数据写入提早,升高零碎吞吐量。
  • 异步复制:异步复制形式是只有Master写胜利 即可反馈给客户端写胜利状态。在异步复制形式下,零碎领有较低的提早和较高的吞吐量,然而如果Master出了故障,有些数据因为没有被写 入Slave,有可能会失落

通常状况下,应该把Master和Slave配置成同步刷盘形式,主从之间配置成异步的复制形式,这样即便有一台机器出故障,依然能保证数据不丢,是个不错的抉择。

负载平衡

Producer负载平衡:

Producer端,每个实例在发消息的时候,默认会轮询所有的Message Queue发送,以达到让音讯均匀落在不同的Queue上。而因为Queue能够散落在不同的Broker,所以音讯就发送到不同的Broker下,如下图:

Consumer负载平衡:

如果Consumer实例的数量比Message Queue的总数量还多的话,多进去的Consumer实例将无奈分到Queue,也就无奈生产到音讯,也就无奈起到摊派负载的作用了。所以须要管制让Queue的总数量大于等于Consumer的数量。

  • 消费者的集群模式:启动多个消费者就能够保障消费者的负载平衡(均摊队列)
  • 默认应用的是均摊队列:会依照Queue的数量和实例的数量平均分配Queue给每个实例,这样每个消费者能够均摊生产的队列,如下图所示6个队列和三个生产者。
  • 另外一种均匀的算法环状轮流分Queue的模式,每个消费者,均摊不同主节点的一个音讯队列,如下图所示:

对于播送模式并不是负载平衡的,要求一条音讯须要投递到一个生产组上面所有的消费者实例,所以也就没有音讯被摊派生产的说法。

死信队列

当一条音讯生产失败,RocketMQ就会主动进行音讯重试。而如果音讯超过最大重试次数,RocketMQ就会认为这个音讯有问题。然而此时,RocketMQ不会立即将这个有问题的音讯抛弃,而会将其发送到这个消费者组对应的一种非凡队列:死信队列。死信队列的名称是 %DLQ%+ConsumGroup 。

死信队列具备以下个性:

  1. 一个死信队列对应一个Group ID, 而不是对应单个消费者实例。
  2. 如果一个Group ID未产生死信音讯,音讯队列RocketMQ不会为其创立相应的死信队列。
  3. 一个死信队列蕴含了对应Group ID产生的所有死信音讯,不管该音讯属于哪个Topic。

Kafka

Kafka是一个分布式、反对分区的、多正本的,基于ZooKeeper协调的分布式音讯零碎。

新版Kafka曾经不再须要ZooKeeper。

它最大的个性就是能够实时的解决大量数据以满足各种需要场景:比方基于Hadoop的批处理零碎、低提早的实时零碎、Storm/Spark流式解决引擎,Web/Nginx日志、拜访日志,音讯服务等等,用Scala语言编写。属于Apache基金会的顶级开源我的项目。

先看一下Kafka的架构图 :

Kafka的外围概念

在Kafka中有几个外围概念:

  • Broker:消息中间件解决节点,一个Kafka节点就是一个Broker,一个或者多个Broker能够组成一个Kafka集群
  • Topic:Kafka依据topic对音讯进行归类,公布到Kafka集群的每条音讯都须要指定一个topic
  • Producer:音讯生产者,向Broker发送音讯的客户端
  • Consumer:音讯消费者,从Broker读取音讯的客户端
  • ConsumerGroup:每个Consumer属于一个特定的ConsumerGroup,一条音讯能够被多个不同的ConsumerGroup生产,然而一个ConsumerGroup中只能有一个Consumer可能生产该音讯
  • Partition:物理上的概念,一个topic能够分为多个partition,每个partition内部消息是有序的
  • Leader:每个Partition有多个正本,其中有且仅有一个作为Leader,Leader是负责数据读写的Partition。
  • Follower:Follower追随Leader,所有写申请都通过Leader路由,数据变更会播送给所有Follower,Follower与Leader保持数据同步。如果Leader生效,则从Follower中选举出一个新的Leader。当Follower与Leader挂掉、卡住或者同步太慢,Leader会把这个Follower从 ISR列表 中删除,从新创立一个Follower。
  • Offset:偏移量。Kafka的存储文件都是依照offset.kafka来命名,用Offset做名字的益处是不便查找。例如你想找位于2049的地位,只有找到2048.kafka的文件即可。

能够这么来了解Topic,Partition和Broker:

一个Topic,代表逻辑上的一个业务数据集,比方订单相干操作音讯放入订单Topic,用户相干操作音讯放入用户Topic,对于大型网站来说,后端数据都是海量的,订单音讯很可能是十分巨量的,比方有几百个G甚至达到TB级别,如果把这么多数据都放在一台机器上可定会有容量限度问题,那么就能够在Topic外部划分多个Partition来分片存储数据,不同的Partition能够位于不同的机器上,相当于分布式存储。每台机器上都运行一个Kafka的过程Broker。

Kafka外围总控制器Controller

在Kafka集群中会有一个或者多个Broker,其中有一个Broker会被选举为控制器(Kafka Controller),能够了解为 Broker-Leader ,它负责管理整个 集群中所有分区和正本的状态。

Partition-Leader

Controller选举机制

在Kafka集群启动的时候,选举的过程是集群中每个Broker都会尝试在ZooKeeper上创立一个 /controller长期节点,ZooKeeper会保障有且仅有一个Broker能创立胜利,这个Broker就会成为集群的总控器Controller。

当这个Controller角色的Broker宕机了,此时ZooKeeper长期节点会隐没,集群里其余Broker会始终监听这个长期节 点,发现长期节点隐没了,就竞争再次创立长期节点,就是咱们下面说的选举机制,ZooKeeper又会保障有一个Broker成为新的Controller。具备控制器身份的Broker须要比其余一般的Broker多一份职责,具体细节如下:

  1. 监听Broker相干的变动。为ZooKeeper中的/brokers/ids/节点增加BrokerChangeListener,用来解决Broker增减的变动。
  2. 监听Topic相干的变动。为ZooKeeper中的/brokers/topics节点增加TopicChangeListener,用来解决Topic增减的变动;为ZooKeeper中的/admin/delete_topics节点增加TopicDeletionListener,用来解决删除Topic的动作。
  3. 从ZooKeeper中读取获取以后所有与Topic、Partition以及Broker无关的信息并进行相应的治理 。对于所有Topic所对应的ZooKeeper中的/brokers/topics/节点增加PartitionModificationsListener,用来监听Topic中的分区调配变动。
  4. 更新集群的元数据信息,同步到其余一般的Broker节点中

Partition正本选举Leader机制

Controller感知到分区Leader所在的Broker挂了,Controller会从ISR列表(参数 unclean.leader.election.enable=false的前提下)里挑第一个Broker作为Leader(第一个Broker最先放进ISR列表,可能是同步数据最多的正本),如果参数unclean.leader.election.enable为true,代表在ISR列表里所有正本都挂了的时候能够在ISR列表以外的正本当选Leader,这种设置,能够进步可用性,然而选出的新Leader有可能数据少很多。正本进入ISR列表有两个条件:

  1. 正本节点不能产生分区,必须能与ZooKeeper放弃会话以及跟Leader正本网络连通
  2. 副本能复制Leader上的所有写操作,并且不能落后太多。(与Leader正本同步滞后的正本,是由replica.lag.time.max.ms配置决定的,超过这个工夫都没有跟Leader同步过的一次的正本会被移出ISR列表)

消费者生产音讯的Offset记录机制

每个Consumer会定期将本人生产分区的Offset提交给Kafka外部Topic:consumer_offsets,提交过来的时候,key是consumerGroupId+topic+分区号,value就是以后Offset的值,Kafka会定期清理Topic里的音讯,最初就保留最新的那条数据。

因为__consumer_offsets可能会接管高并发的申请,Kafka默认给其调配50个分区(能够通过 offsets.topic.num.partitions设置),这样能够通过加机器的形式抗大并发。

消费者Rebalance机制

Rebalance就是说 如果生产组里的消费者数量有变动或生产的分区数有变动,Kafka会重新分配消费者与生产分区的关系 。比方consumer group中某个消费者挂了,此时会主动把调配给他的分区交给其余的消费者,如果他又重启了,那么又会把一些分区从新交还给他。

留神:Rebalance只针对subscribe这种不指定分区生产的状况,如果通过assign这种生产形式指定了分区,Kafka不会进行Rebalance。

如下状况可能会触发消费者Rebalance:

  1. 生产组里的Consumer减少或缩小了
  2. 动静给Topic减少了分区
  3. 生产组订阅了更多的Topic

Rebalance过程中,消费者无奈从Kafka生产音讯,这对Kafka的TPS会有影响,如果Kafka集群内节点较多,比方数百 个,那重均衡可能会耗时极多,所以应尽量避免在零碎高峰期的重均衡产生。

Rebalance过程如下

当有消费者退出生产组时,消费者、生产组及组协调器之间会经验以下几个阶段:

第一阶段:抉择组协调器

组协调器GroupCoordinator:每个consumer group都会抉择一个Broker作为本人的组协调器coordinator,负责监控这个生产组里的所有消费者的心跳,以及判断是否宕机,而后开启消费者Rebalance。consumer group中的每个consumer启动时会向Kafka集群中的某个节点发送FindCoordinatorRequest申请来查找对应的组协调器GroupCoordinator,并跟其建设网络连接。组协调器抉择形式:通过如下公式能够选出consumer生产的Offset要提交到__consumer_offsets的哪个分区,这个分区Leader对应的Broker就是这个consumer group的coordinator公式:

hash(consumer group id) % 对应主题的分区数

第二阶段:退出生产组JOIN GROUP

在胜利找到生产组所对应的GroupCoordinator之后就进入退出生产组的阶段,在此阶段的消费者会向GroupCoordinator发送JoinGroupRequest申请,并解决响应。而后GroupCoordinator从一个consumer group中抉择第一个退出group的consumer作为Leader(生产组协调器),把consumer group状况发送给这个Leader,接着这个Leader会负责制订分区计划。

第三阶段(SYNC GROUP)

consumer leader通过给GroupCoordinator发送SyncGroupRequest,接着GroupCoordinator就把分区计划下发给各个consumer,他们会依据指定分区的Leader Broker进行网络连接以及音讯生产。

消费者Rebalance分区调配策略

次要有三种Rebalance的策略:range 、 round-robin 、 sticky 。默认状况为range调配策略

假如一个主题有10个分区(0-9),当初有三个consumer生产:

range策略:依照分区序号排序调配 ,假如n=分区数/消费者数量 = 3, m=分区数%消费者数量 = 1,那么前 m 个消 费者每个调配 n+1 个分区,前面的(消费者数量-m )个消费者每个调配 n 个分区。比方分区0~ 3给一个consumer,分区4~ 6给一个consumer,分区7~9给一个consumer。

round-robin策略:轮询调配 ,比方分区0、3、6、9给一个consumer,分区1、4、7给一个consumer,分区2、5、 8给一个consumer

sticky策略:初始时调配策略与round-robin相似,然而在rebalance的时候,须要保障如下两个准则:

  1. 分区的调配要尽可能平均 。
  2. 分区的调配尽可能与上次调配的放弃雷同。

当两者发生冲突时,第一个指标优先于第二个指标 。这样能够最大水平维持原来的分区调配的策略。比方对于第一种range状况的调配,如果第三个consumer挂了,那么从新用sticky策略调配的后果如下:consumer1除了原有的0~ 3,会再调配一个7 consumer2除了原有的4~ 6,会再调配8和9。

Producer公布音讯机制分析

1、写入形式

producer采纳push模式将音讯公布到broker,每条音讯都被append到patition中,属于程序写磁盘(程序写磁盘 比 随机写 效率要高,保障 kafka 吞吐率)。

2、音讯路由

producer发送音讯到broker时,会依据分区算法抉择将其存储到哪一个partition。其路由机制为:

hash(key)%分区数

3、写入流程

  1. producer先从ZooKeeper的 “/brokers/…/state” 节点找到该partition的leader
  2. producer将音讯发送给该leader
  3. leader将音讯写入本地log
  4. followers从leader pull音讯,写入本地log后向leader发送ACK
  5. leader收到所有ISR中的replica的ACK后,减少HW(high watermark,最初commit的offset)并向producer发送ACK

HW与LEO

HW俗称高水位 ,HighWatermark的缩写,取一个partition对应的ISR中最小的LEO(log-end-offset)作为HW, consumer最多只能生产到HW所在的地位。另外每个replica都有HW,leader和follower各自负责更新本人的HW的状 态。对于leader新写入的音讯,consumer不能立即生产,leader会期待该音讯被所有ISR中的replicas同步后更新HW, 此时音讯能力被consumer生产。这样就保障了如果leader所在的broker生效,该音讯依然能够从新选举的leader中获取。对于来自外部broker的读取申请,没有HW的限度。

日志分段存储

Kafka一个分区的音讯数据对应存储在一个文件夹下,以topic名称+分区号命名,音讯在分区内是分段存储的, 每个段的音讯都存储在不一样的log文件里,Kafka规定了一个段位的log文件最大为1G,做这个限度目标是为了不便把log文件加载到内存去操作:

1 ### 局部音讯的offset索引文件,kafka每次往分区发4K(可配置)音讯就会记录一条以后音讯的offset到index文件, 2 ### 如果要定位音讯的offset会先在这个文件里疾速定位,再去log文件里找具体音讯 3 00000000000000000000.index 4 ### 音讯存储文件,次要存offset和音讯体 5 00000000000000000000.log 6 ### 音讯的发送工夫索引文件,kafka每次往分区发4K(可配置)音讯就会记录一条以后音讯的发送工夫戳与对应的offset到timeindex文件, 7 ### 如果须要依照工夫来定位音讯的offset,会先在这个文件里查找 8 00000000000000000000.timeindex 9 10 00000000000005367851.index 11 00000000000005367851.log 12 00000000000005367851.timeindex 13 14 00000000000009936472.index 15 00000000000009936472.log 16 00000000000009936472.timeindex

这个9936472之类的数字,就是代表了这个日志段文件里蕴含的起始 Offset,也就阐明这个分区里至多都写入了靠近1000万条数据了。Kafka Broker有一个参数,log.segment.bytes,限定了每个日志段文件的大小,最大就是1GB。一个日志段文件满了,就主动开一个新的日志段文件来写入,防止单个文件过大,影响文件的读写性能,这个过程叫做log rolling,正在被写入的那个日志段文件,叫做active log segment。

最初附一张ZooKeeper节点数据图

MQ带来的一些问题、及解决方案

如何保障程序生产?

  • RabbitMQ:一个Queue对应一个Consumer即可解决。
  • RocketMQ:hash(key)%队列数
  • Kafkahash(key)%分区数

如何实现提早生产?

  • RabbitMQ:两种计划 死信队列 + TTL引入RabbitMQ的提早插件
  • RocketMQ:天生反对延时音讯。
  • Kafka:步骤如下 专门为要提早的音讯创立一个Topic新建一个消费者去生产这个Topic音讯长久化再开一个线程定时去拉取长久化的音讯,放入理论要生产的Topic理论生产的消费者从理论要生产的Topic拉取音讯。

如何保障音讯的可靠性投递

RabbitMQ:

  • Broker–>消费者:手动ACK
  • 生产者–>Broker:两种计划

数据库长久化:

1.将业务订单数据和生成的Message进行长久化操作(个别状况下插入数据库,这里如果分库的话可能波及到分布式事务)2.将Message发送到Broker服务器中3.通过RabbitMQ的Confirm机制,在producer端,监听服务器是否ACK。4.如果ACK了,就将Message这条数据状态更新为已发送。如果失败,批改为失败状态。5.分布式定时工作查询数据库3分钟(这个具体工夫应该依据的时效性来定)之前的发送失败的音讯6.从新发送音讯,记录发送次数7.如果发送次数过多依然失败,那么就须要人工排查之类的操作。

长处:可能保障音讯百分百不失落。

毛病:第一步会波及到分布式事务问题。

音讯的提早投递:

流程图中,色彩不同的代表不同的message1.将业务订单长久化2.发送一条Message到broker(称之为主Message),再发送雷同的一条到不同的队列或者交换机(这条称为确认Message)中。3.主Message由理论业务解决端生产后,生成一条响应Message。之前的确认Message由Message Service利用解决入库。4~6.理论业务解决端发送的确认Message由Message Service接管后,将原Message状态批改。7.如果该条Message没有被确认,则通过rpc调用从新由producer进行全过程。

长处:绝对于长久化计划来说响应速度有所晋升

毛病:零碎复杂性有点高,万一两条音讯都失败了,音讯存在失落状况,仍需Confirm机制做弥补。

RocketMQ

生产者弄丢数据:

Producer在把Message发送Broker的过程中,因为网络问题等产生失落,或者Message到了Broker,然而出了问题,没有保留下来。针对这个问题,RocketMQ对Producer发送音讯设置了3种形式:

同步发送异步发送单向发送

Broker弄丢数据:

Broker接管到Message暂存到内存,Consumer还没来得及生产,Broker挂掉了。

能够通过 长久化 设置去解决:

  1. 创立Queue的时候设置长久化,保障Broker长久化Queue的元数据,然而不会长久化Queue外面的音讯
  2. 将Message的deliveryMode设置为2,能够将音讯长久化到磁盘,这样只有Message反对化到磁盘之后才会发送告诉Producer ack

这两步过后,即便Broker挂了,Producer必定收不到ack的,就能够进行重发。

消费者弄丢数据:

Consumer有生产到Message,然而外部呈现问题,Message还没解决,Broker认为Consumer解决完了,只会把后续的音讯发送。这时候,就要 敞开autoack,音讯解决过后,进行手动ack , 屡次生产失败的音讯,会进入 死信队列 ,这时候须要人工干预。

Kafka

生产者弄丢数据

设置了 acks=all ,肯定不会丢,要求是,你的 leader 接管到音讯,所有的 follower 都同步到了音讯之后,才认为本次写胜利了。如果没满足这个条件,生产者会主动一直的重试,重试有限次。

Broker弄丢数据

Kafka 某个 broker 宕机,而后从新选举 partition 的 leader。大家想想,要是此时其余的 follower 刚好还有些数据没有同步,后果此时 leader 挂了,而后选举某个 follower 成 leader 之后,不就少了一些数据?这就丢了一些数据啊。

此时个别是要求起码设置如下 4 个参数:

replication.factormin.insync.replicasacks=allretries=MAX

咱们生产环境就是依照上述要求配置的,这样配置之后,至多在 Kafka broker 端就能够保障在 leader 所在 broker 产生故障,进行 leader 切换时,数据不会失落。

消费者弄丢数据

你生产到了这个音讯,而后消费者那边主动提交了 offset,让 Kafka 认为你曾经生产好了这个音讯,但其实你才刚筹备解决这个音讯,你还没解决,你本人就挂了,此时这条音讯就丢咯。

这不是跟 RabbitMQ 差不多吗,大家都晓得 Kafka 会主动提交 offset,那么只有 敞开主动提交 offset,在解决完之后本人手动提交 offset,就能够保证数据不会丢。然而此时的确还是可能会有反复生产,比方你刚解决完,还没提交 offset,后果本人挂了,此时必定会反复生产一次,本人保障幂等性就好了。

如何保障音讯的幂等?

以 RocketMQ 为例,上面列出了音讯反复的场景:

发送时音讯反复

当一条音讯已被胜利发送到服务端并实现长久化,此时呈现了网络闪断或者客户端宕机,导致服务端对客户端应答失败。如果此时生产者意识到音讯发送失败并尝试再次发送音讯,消费者后续会收到两条内容雷同并且Message ID也雷同的音讯。

投递时音讯反复

音讯生产的场景下,音讯已投递到消费者并实现业务解决,当客户端给服务端反馈应答的时候网络闪断。为了保障音讯至多被生产一次,音讯队列RocketMQ版的服务端将在网络复原后再次尝试投递之前已被解决过的音讯,消费者后续会收到两条内容雷同并且Message ID也雷同的音讯。

负载平衡时音讯反复(包含但不限于网络抖动、Broker重启以及消费者利用重启)

当音讯队列RocketMQ版的Broker或客户端重启、扩容或缩容时,会触发Rebalance,此时消费者可能会收到反复音讯。

那么,有什么解决方案呢?间接上图。

如何解决音讯积压的问题?

对于这个问题,有几个点须要思考:

如何疾速让积压的音讯被生产掉?

长期写一个音讯散发的消费者,把积压队列里的音讯平均散发到N个队列中,同时一个队列对应一个消费者,相当于生产速度进步了N倍。

批改前:

批改后:

积压工夫太久,导致局部音讯过期,怎么解决?

批量重导。在业务不忙碌的时候,比方凌晨,提前准备好程序,把失落的那批音讯查出来,从新导入到MQ中。

音讯大量积压,MQ磁盘被写满了,导致新音讯进不来了,丢掉了大量音讯,怎么解决?

这个没方法。谁让【音讯散发的消费者】写的太慢了,你长期写程序,接入数据来生产,生产一个抛弃一个,都不要了,疾速生产掉所有的音讯。而后走第二个计划,到了早晨再补数据吧。

评论

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注

这个站点使用 Akismet 来减少垃圾评论。了解你的评论数据如何被处理