引言
在探索 Kafka 外围常识之前,咱们先思考一个问题:什么场景会促使咱们应用 Kafka? 说到这里,咱们头脑中或多或少会蹦出 异步解耦 和削峰填谷 等字样,是的,这就是 Kafka 最重要的落地场景。
- 异步解耦:同步调用转换成异步音讯告诉,实现生产者和消费者的解耦。设想一个场景,在商品交易时,在订单创立实现之后,须要触发一系列其余的操作,比方进行用户订单数据的统计、给用户发送短信、给用户发送邮件等等。如果所有操作都采纳同步形式实现,将重大影响零碎性能。针对此场景,咱们能够利用消息中间件解耦订单创立操作和其余后续行为。
- 削峰填谷:利用 Broker 缓冲上游生产者刹时突发的流量,使消费者生产流量整体平滑。对于发送能力很强的上游零碎,如果没有消息中间件的爱护,上游零碎可能会间接被压垮导致全链路服务雪崩。设想秒杀业务场景,上游业务发动下单申请,上游业务执行秒杀业务(库存查看,库存解冻,余额解冻,生成订单等等),上游业务解决的逻辑是相当简单的,并发能力无限,如果上游服务不做限流策略,刹时可能把上游服务压垮。针对此场景,咱们能够利用 MQ 来做削峰填谷,让顶峰流量填充低谷闲暇资源,达到系统资源的正当利用。
通过上述例子能够发现交易、领取等场景常须要 异步解耦 和削峰填谷 性能解决问题,而交易、领取等场景对性能、可靠性要求特地高。那么,咱们本文的配角 Kafka 是否满足相应要求呢?上面咱们来探讨下。
Kafka 宏观认知
在探索 Kafka 的高性能、高可靠性之前,咱们从宏观上来看下 Kafka 的零碎架构
如上图所示,Kafka 由 Producer、Broker、Consumer 以及负责集群治理的 ZooKeeper 组成,各局部性能如下:
- Producer:生产者,负责音讯的创立并通过肯定的路由策略发送音讯到适合的 Broker;
- Broker:服务实例,负责音讯的长久化、直达等性能;
- Consumer:消费者,负责从 Broker 中拉取(Pull)订阅的音讯并进行生产,通常多个消费者形成一个分组,音讯只能被同组中的一个消费者生产;
- ZooKeeper:负责 Broker、Consumer 集群元数据的治理等;(留神:Producer 端间接连贯 Broker,不在 ZK 上存任何数据,只是通过 ZK 监听 Broker 和 Topic 等信息)
上图音讯流转过程中,还有几个特地重要的概念—主题(Topic)、分区(Partition)、分段(Segment)、位移(Offset)。
- topic:音讯主题。Kafka 按 Topic 对音讯进行分类,咱们在收发音讯时只需指定 Topic。
- partition:分区。为了晋升零碎的吞吐,一个 Topic 下通常有多个 Partition,Partition 散布在不同的 Broker 上,用于存储 Topic 的音讯,这使 Kafka 能够在多台机器上解决、存储音讯,给 Kafka 提供给了并行的音讯解决能力和横向扩容能力。另外,为了晋升零碎的可靠性,Partition 通常会分组,且每组有一个主 Partition、多个正本 Partition,且散布在不同的 Broker 上,从而起到 容灾 的作用。
- Segment:分段。宏观上看,一个 Partition 对应一个日志(Log)。因为生产者生产的音讯会一直 追加到 Log 文件开端 ,为避免 Log 文件过大导致数据检索效率低下,Kafka 采取了分段和索引机制,将每个 Partition 分为多个 Segment,同时也便于音讯的保护和清理。每个 Segment 蕴含一个.log 日志文件、两个索引(.index、timeindex) 文件以及其余可能的文件。每个 Segment 的数据文件以该段中最小的 Offset 为文件名,当查找 Offset 的 Message 的时候,通过二分查找快找到 Message 所处于的 Segment 中。
- Offset:音讯在日志中的地位,音讯在被追加到分区日志文件的时候都会调配一个特定的偏移量。Offset 是音讯在分区中的惟一标识,是一个枯燥递增且不变的值。Kafka 通过它来保障音讯在分区内的程序性,不过 Offset 并不逾越分区,也就是说,Kafka 保障的是分区有序而不是主题有序。
Kafka 高可靠性、高性能探索
在对 Kafka 的整体零碎框架及相干概念简略理解后,上面咱们来进一步深入探讨下高可靠性、高性能实现原理。
Kafka 高可靠性探索
Kafka 高可靠性的外围是保障音讯在传递过程中不失落,波及如下外围环节:
- 音讯从生产者牢靠地发送至 Broker;– 网络、本地丢数据
- 发送到 Broker 的音讯牢靠长久化;— Pagecache 缓存落盘、单点解体、主从同步跨网络
- 消费者从 Broker 生产到音讯且最好只生产一次 — 跨网络音讯传输
音讯从生产者牢靠地发送至 Broker
为了保障音讯从生产者牢靠地发送至 Broker,咱们须要确保两点
1. Producer 发送音讯后,可能收到来自 Broker 的音讯保留胜利 Ack;
2. Producer 发送音讯后,可能捕捉超时、失败 Ack 等异样 Ack 并做解决;
Ack 策略
针对问题 1,Kafka 为咱们提供了三种 Ack 策略,
- Request.required.acks = 0:申请发送即认为胜利,不关怀有没有写胜利,罕用于日志进行剖析场景。
- Request.required.acks = 1:当 Leader partition 写入胜利当前,才算写入胜利,有丢数据的可能。
- Request.required.acks= -1:ISR 列表外面的所有正本都写完当前,这条音讯才算写入胜利,强可靠性保障。
为了实现强牢靠的 Kafka 零碎,咱们须要设置 Request.required.acks= -1,同时还会设置集群中处于失常同步状态的正本 Follower 数量 min.insync.replicas>2,另外,设置 unclean.leader.election.enable=false 使得集群中 ISR 的 Follower 才可变成新的 Leader,防止非凡状况下音讯截断的呈现。
音讯发送策略
针对问题 2,Kafka 提供两类音讯发送形式:同步(Sync)发送和异步(Async)发送,相干参数如下:
| 参数 | 含意 | 默认值 | 推荐值 |
| —— | —— | —— | —— |
|async | 是否进行异步生产,可选值:
0:同步生产,默认值
1:异步生产 |0 | 个别状况下应用 0,如果应用异步生产,须要通过 channel 捕获音讯生产失败的状况,并进行异步修复解决,逻辑会绝对简单。
对于延时比拟敏感,防止生产音讯导致耗时过高的场景能够思考异步。|
以 Sarama 实现为例,在音讯发送的过程中,无论是同步发送还是异步发送都会波及到两个协程 – 负责音讯发送的主协程和负责音讯散发的 Dispatcher 协程。
异步发送
对于异步发送 (Ack != 0 场景,等于 0 时不关怀写 Kafka 后果,后文具体解说) 而言,其流程大略如下
1. 在 主协程 中调用异步发送 Kafka 音讯的时候,其本质是将音讯体放进了一个 Input 的 Channel,只有入 Channel 胜利,则这个函数间接返回,不会产生任何阻塞。相同,如果入 Channel 失败,则会返回错误信息。因而调用 Async 写入的时候返回的错误信息是入 Channel 的错误信息,至于具体最终音讯有没有发送到 Kafka 的 Broker,咱们无奈从返回值得悉。
2. 当音讯进入 Input 的 Channel 后,会有另一个 Dispatcher 的协程 负责遍历 Input,来真正发送音讯到特定 Broker 上的主 Partition 上。发送后果通过一个 异步协程 进行监听,循环解决 Err Channel 和 Success Channel,呈现了 Error 就记一个日志。因而异步写入场景时,写 Kafka 的错误信息,咱们临时仅可能从这个谬误日志来得悉具体产生了什么错,并且也不反对咱们自建函数进行兜底解决,这一点在 trpc-go 的官网也失去了抵赖。
同步发送
同步发送 (Ack != 0 场景) 是在异步发送的根底上加以条件限度实现的。同步音讯发送在 newSyncProducerFromAsyncProducer 中开启两个 异步协程 解决音讯胜利与失败的“回调”,并应用 WaitGroup 进行期待,从而将异步操作转变为同步操作。其流程大略如下
通过上述剖析能够发现,Kafka 音讯发送实质上都是异步的,不过同步发送通过 WaitGroup 将异步操作转变为同步操作。同步发送在肯定水平上确保了咱们在跨网络向 Broker 传输音讯时,音讯肯定能够牢靠地传输到 Broker。因为在同步发送场景咱们能够明确感知音讯是否发送至 Broker,若因网络抖动、机器宕机等故障导致音讯发送失败或后果不明,可通过重试等伎俩确保音讯 至多一次(at least once) 发送到 Broker。另外,Kafka(0.11.0.0 版本后)还为 Producer 提供两种机制来实现 准确一次(exactly once) 音讯发送:幂等性(Idempotence)和事务(Transaction)。
| 类别 | 开启 | 特色 | 实现原理 | 注意事项 |
| :— | :— | :—|:—|:—|
| 幂等性 Producer | enable.idempotence=true | 1. 单分区幂等性:只能保障单分区上幂等性,无奈实现多个分区的幂等。2. 单会话幂等性:只能实现单会话上的冥等性,当 Producer 重启后,这种幂等性保障就生效了。| 为每个 Producer 设置惟一的 PID,Broker 发送音讯时会带上 PID 并为每个音讯生产一个 Seq number,Broker 端依据 PID 和 Seq number 去重 |Broker 端不会反复写入同一 PID 的 Producer 发送的雷同的音讯,底层日志中只会长久化一次。|
| 事务型 Producer |enable.idempotence=true,生产者设置 Transcational.id;若要实现 consumer-transform-producer,还需设置 Consumer 端参数 Isolation.level=read_committed| 1. 跨分区事务:可能保障将音讯原子性地写入到多个分区中。2. 跨会话的事务复原:如果一个利用实例挂了,启动的下一个实例仍然能够保障上一个事务实现(Commit 或者 Abort)。|1. Kafka 引入了一个协调者组件 TransactionCoordinator 来治理 Transaction。Producer 在开始一个事务时,通知协调者事务开始,而后开始向多个 Topic-Partition 写数据,只有这批数据全副写完(两头没有出现异常),Producer 会调用 Commit 接口进行 Commit,而后事务真正提交,否则如果两头出现异常,那么事务将会被 Abort(Producer 通过 Abort 接口通知协调者执行 Abort 操作)。2. 引入一个全局惟一的 Transaction ID,并将 Producer 取得的 PID 和 Transaction ID 绑定。这样当 Producer 重启后就能够通过正在进行的 Transaction ID 取得原来的 PID。3. TransactionCoordinator 还负责将事务状态写入 Kafka 的一个外部 Topic:__transaction_state 中,这样即便整个服务重启,因为事务状态失去保留,进行中的事务状态能够失去复原,从而持续进行。|1. 开启事务对性能影响很大,在应用时要充分考虑。2. 当具备雷同 Transaction ID 的新的 Producer 实例被创立且工作时,旧的且领有雷同 Transaction ID 的 Producer 将不再工作。|
小结
通过 Ack 策略配置、同步发送、事务音讯组合能力,咱们能够实现 exactly once 语意 跨网络向 Broker 传输音讯。然而,Producer 收到 Broker 的胜利 Ack,音讯肯定不会失落吗?为了搞清这个问题,咱们首先要搞明确 Broker 在接管到音讯后做了哪些解决。
发送到 Broker 的音讯牢靠长久化
为了确保 Producer 收到 Broker 的胜利 Ack 后,音讯肯定不在 Broker 环节失落,咱们外围要关注以下几点:
– Broker 返回 Producer 胜利 Ack 时,音讯是否曾经落盘;
– Broker 宕机是否会导致数据失落,容灾机制是什么;
– Replica 正本机制带来的多正本间数据同步一致性问题如何解决;
Broker 异步刷盘机制
Kafka 为了取得更高吞吐,Broker 接管到音讯后只是将数据写入 PageCache 后便认为音讯已写入胜利,而 PageCache 中的数据通过 Linux 的 Flusher 程序进行异步刷盘(刷盘触发条:被动调用 Sync 或 Fsync 函数、可用内存低于阀值、Dirty Data 工夫达到阀值),将数据程序写到磁盘。音讯解决示意图如下:
因为音讯是写入到 PageCache,单机场景,如果还没刷盘 Broker 就宕机了,那么 Producer 产生的这部分数据就可能失落。为了解决单机故障可能带来的数据失落问题,Kafka 为分区引入了正本机制。
Replica 正本机制
Kafka 每组分区通常有多个正本,同组分区的不同正本散布在不同的 Broker 上,保留雷同的音讯(可能有滞后)。正本之间是“一主多从”的关系,其中 Leader 正本负责解决读写申请,Follower 正本负责从 Leader 拉取音讯进行同步。分区的所有正本统称为 AR(Assigned Replicas),其中所有与 Leader 正本放弃肯定同步的正本(包含 Leader 正本在内)组成 ISR(In-Sync Replicas),与 Leader 同步滞后过多的正本组成 OSR(Out-of-Sync Replicas),由此可见,AR=ISR+OSR。Follower 正本是否与 Leader 同步的判断规范取决于 Broker 端参数 replica.lag.time.max.ms(默认为 10 秒),Follower 默认每隔 500ms 向 Leader Fetch 一次数据,只有一个 Follower 正本落后 Leader 正本的工夫不间断超过 10 秒,那么 Kafka 就认为该 Follower 正本与 Leader 是同步的。在失常状况下,所有的 Follower 正本都应该与 Leader 正本放弃肯定水平的同步,即 AR=ISR,OSR 汇合为空。
当 Leader 正本所在 Broker 宕机时,Kafka 会借助 ZK 从 Follower 正本中选举新的 Leader 持续对外提供服务,实现故障的主动转移,保障服务可用。为了使选举的新 Leader 和旧 Leader 数据尽可能统一,当 Leader 正本产生故障时,默认状况下只有在 ISR 汇合中的正本才有资格被选举为新的 Leader,而在 OSR 汇合中的正本则没有任何机会(可通过设置 unclean.leader.election.enable 扭转)。
当 Kafka 通过多正本机制解决单机故障问题时,同时也带来了多正本间数据同步一致性问题。Kafka 通过高水位更新机制、正本同步机制、Leader Epoch 等多种措施解决了多正本间数据同步一致性问题,上面咱们来顺次看下这几大措施。
HW 和 LEO
首先,咱们来看下两个和 Kafka 中日志相干的重要概念 HW 和 LEO:
- HW: High Watermark,高水位,示意曾经提交 (Commit) 的最大日志偏移量,Kafka 中某条日志“已提交”的意思是 ISR 中所有节点都蕴含了此条日志,并且消费者只能生产 HW 之前的数据;
- LEO: Log End Offset,示意以后 Log 文件中下一条待写入音讯的 Offset;
如上图所示,它代表一个日志文件,这个日志文件中有 8 条音讯,0 至 5 之间的音讯为已提交音讯,5 至 7 的音讯为未提交音讯。日志文件的 HW 为 6,示意消费者只能拉取到 5 之前的音讯,而 Offset 为 5 的音讯对消费者而言是不可见的。日志文件的 LEO 为 8,下一条音讯将在此处写入。
留神:所有正本都有对应的 HW 和 LEO,只不过 Leader 正本比拟非凡,Kafka 应用 Leader 正本的高水位来定义所在分区的高水位。换句话说,分区的高水位就是其 Leader 正本的高水位。Leader 正本和 Follower 正本的 HW 有如下特点:
- Leader HW:min(所有正本 LEO),为此 Leader 正本不仅要保留本人的 HW 和 LEO,还要保留 Follower 正本的 HW 和 LEO,而 Follower 正本只需保留本人的 HW 和 LEO;
- Follower HW:min(Follower 本身 LEO,Leader HW)。
留神:为不便形容,上面 Leader HW 简记为 HW L,Follower HW 简记为 HW F,Leader LEO 简记为 LEOL,Follower LEO 简记为 LEO F。
上面咱们演示一次残缺的 HW / LEO 更新流程:
1. 初始状态
HW L=0,LEO L=0,HW F=0,LEO F=0。
2. Follower 第一次 Fetch
- Leader 收到 Producer 发来的一条音讯实现存储, 更新 LEO L=1;
- Follower 从 Leader Fetch 数据, Leader 收到申请,记录 Follower 的 LEO F =0,并且尝试更新 HW L =min(全副正本 LEO)=0;
- Leader 返回 HW L=0 和 LEO L=1 给 Follower,Follower 存储音讯并更新 LEO F =1, HW=min(LEO F,HW L)=0。
3. Follower 第二次 Fetch
- Follower 再次从 Leader Fetch 数据, Leader 收到申请,记录 Follower 的 LEO F =1,并且尝试更新 HW L =min(全副正本 LEO)=1;
- leader 返回 HW L= 1 和 LEO L=1 给 Follower,Leader 收到申请,更新本人的 HW=min(LEO F,HW L)=1。
上述更新流程中 Follower 和 Leader 的 HW 更新有工夫 GAP。如果 Leader 节点在此期间产生故障,则 Follower 的 HW 和 Leader 的 HW 可能会处于不统一状态,如果 Follower 被选为新的 Leader 并且以本人的 HW 为准对外提供服务,则可能带来数据失落或数据错乱问题。
KIP-101 问题:数据失落 & 数据错乱
数据失落
第 1 步:
1. 正本 B 作为 Leader 收到 Producer 的 m2 音讯并写入本地文件,期待正本 A 拉取。
2. 正本 A 发动音讯拉取申请,申请中携带本人的最新的日志 Offset(LEO=1),B 收到后更新本人的 HW 为 1,并将 HW=1 的信息以及音讯 m2 返回给 A。
3. A 收到拉取后果后更新本地的 HW 为 1,并将 m2 写入本地文件。发动新一轮拉取申请(LEO=2),B 收到 A 拉取申请后更新本人的 HW 为 2,没有新数据只将 HW=2 的信息返回给 A,并且回复给 Producer 写入胜利。此处的状态就是图中第一步的状态。
第 2 步:
此时,如果没有异样,A 会收到 B 的回复,得悉目前的 HW 为 2,而后更新本身的 HW 为 2。但在此时 A 重启了,没有来得及收到 B 的回复,此时 B 依然是 Leader。A 重启之后会以 HW 为规范截断本人的日志,因为 A 作为 Follower 不晓得多出的日志是否是被提交过的,避免数据不统一从而截断多余的数据并尝试从 Leader 那里从新同步
第 3 步:
B 解体了,min.isr 设置的是 1,所以 Zookeeper 会从 ISR 中再抉择一个作为 Leader,也就是 A,然而 A 的数据不是残缺的,从而呈现了数据失落景象。
问题在哪里?在于 A 重启之后以 HW 为规范截断了多余的日志。不截断行不行?不行,因为这个日志可能没被提交过(也就是没有被 ISR 中的所有节点写入过),如果保留会导致日志错乱。
数据错乱
在剖析日志错乱的问题之前,咱们须要理解到 Kafka 的正本可靠性保障有一个前提:在 ISR 中至多有一个节点。如果节点均宕机的状况下,是不保障可靠性的,在这种状况会呈现数据失落,数据失落是可承受的。这里咱们剖析的问题比数据失落更加槽糕,会引发日志错乱甚至导致整个零碎异样,而这是不可承受的。
第 1 步:
1. A 和 B 均为 ISR 中的节点。正本 A 作为 Leader,收到 Producer 的音讯 m2 的申请后写入 PageCache 并在某个时刻刷新到本地磁盘。
2. 正本 B 拉取到 m2 后写入 PageCage 后(尚未刷盘) 再次去 A 中拉取新音讯并告知 A 本人的 LEO=2,A 收到更新本人的 HW 为 1 并回复给 Producer 胜利。
3. 此时 A 和 B 同时宕机,B 的 m2 因为尚未刷盘,所以 m2 音讯失落。此时的状态就是第 1 步的状态。
第 2 步:
因为 A 和 B 均宕机,而 min.isr=1 并且 unclean.leader.election.enable=true(敞开 Unclean 抉择策略),所以 Kafka 会等到第一个 ISR 中的节点复原并选为 Leader,这里可怜的是 B 被选为 Leader,而且还接管到 Producer 发来的新音讯 m3。留神,这里失落 m2 音讯是可承受的,毕竟所有节点都宕机了。
第 3 步:
A 复原重启后发现自己是 Follower,而且 HW 为 2,并没有多余的数据须要截断,所以开始和 B 进行新一轮的同步。但此时 A 和 B 均没有意识到,Offset 为 1 的音讯不统一了。
问题在哪里?在于日志的写入是异步的,下面也提到 Kafka 的正本策略的一个设计是音讯的长久化是异步的,这就会导致在场景二的状况下被选出的 Leader 不肯定蕴含所有数据,从而引发日志错乱的问题。
Leader Epoch
为了解决上述缺点,Kafka 引入了 Leader Epoch 的概念。Leader Epoch 和 Raft 中的任期号的概念很相似,每次从新抉择 Leader 的时候,用一个严格枯燥递增的 ID 来标记,能够让所有 Follower 意识到 Leader 的变动。而 Follower 也不再以 HW 为准,每次奔溃重启后都须要去 Leader 那边确认下以后 Leader 的日志是从哪个 Offset 开始的。上面看下 Leader Epoch 是如何解决下面两个问题的。
数据失落解决
这里的关键点在于正本 A 重启后作为 Follower,不是忙着以 HW 为准截断本人的日志,而是先发动 LeaderEpochRequest 询问正本 B 第 0 代的最新的偏移量是多少,正本 B 会返回本人的 LEO 为 2 给正本 A,A 此时就晓得音讯 m2 不能被截断,所以 m2 失去了保留。当 A 选为 Leader 的时候就保留了所有已提交的日志,日志失落的问题失去解决。
如果发动 LeaderEpochRequest 的时候就曾经挂了怎么办?这种场景下,不会呈现日志失落,因为正本 A 被选为 Leader 后不会截断本人的日志,日志截断只会产生在 Follower 身上。
数据错乱解决
这里的关键点还是在第 3 步,正本 A 重启作为 Follower 的第一步还是须要发动 LeaderEpochRequest 询问 Leader 以后第 0 代最新的偏移量是多少,因为正本 B 曾经通过换代,所以会返回给 A 第 1 代的起始偏移(也就是 1),A 发现抵触后会截断本人偏移量为 1 的日志,并从新开始和 Leader 同步。正本 A 和正本 B 的日志达到了统一,解决了日志错乱。
小结
Broker 接管到音讯后只是将数据写入 PageCache 后便认为音讯已写入胜利,然而,通过正本机制并联合 ACK 策略能够大概率躲避单机宕机带来的数据失落问题,并通过 HW、正本同步机制、Leader Epoch 等多种措施解决了多正本间数据同步一致性问题,最终实现了 Broker 数据的牢靠长久化。
消费者从 Broker 生产到音讯且最好只生产一次
Consumer 在生产音讯的过程中须要向 Kafka 汇报本人的位移数据,只有当 Consumer 向 Kafka 汇报了音讯位移,该条音讯才会被 Broker 认为曾经被生产。因而,Consumer 端音讯的可靠性次要和 Offset 提交形式无关,Kafka 生产端提供了两种音讯提交形式:
| 提交形式 | 参数设置 | 性能特点 | 注意事项 |
| :— | :— | :—| :—|
| 主动提交 | enable.auto.commit = true,auto.commit.interval.ms(提交距离)| Kafka Consumer 在后盾默默地为你提交位移,开发者无需在代码中显示提交 | Consumer 收到音讯就返回正确给 Brocker,如果业务逻辑没有走完中断了,即音讯没有生产胜利,则相干 音讯可能失落 。这种场景实用于可靠性要求不高的业务。|
| 手动提交 | enable.auto.commit = false | 开发者须要在代码中本人提交位移,Kafka Consumer 压根不论 | 该模式下,业务开发者须要在音讯被解决实现后进行手动提交。如果音讯解决实现后还没来得及提价位移,零碎产生重启,则之前生产到未提交的音讯会从新生产到,即 音讯会投递屡次 。因而, 生产侧须要做幂等保障。|
失常状况下咱们很难实现 exactly once 语意的音讯,通常是通过手动提交 + 幂等实现音讯的牢靠生产。
Kafka 高性能探索
Kafka 高性能的外围是保障系统低提早、高吞吐地解决音讯,为此,Kafaka 采纳了许多精妙的设计:
- 异步发送
- 批量发送
- 压缩技术
- Pagecach 机制 & 程序追加落盘
- 零拷贝
- 稠密索引
- Broker & 数据分区
- 多 Reactor 多线程网络模型
异步发送
如上文所述,Kafka 提供了异步和同步两种音讯发送形式。在异步发送中,整个流程都是异步的。调用异步发送办法后,音讯会被写入 Channel,而后立刻返回胜利。Dispatcher 协程会从 Channel 轮询音讯,将其发送到 Broker,同时会有另一个异步协程负责解决 Broker 返回的后果。同步发送实质上也是异步的,然而在处理结果时,同步发送通过 WaitGroup 将异步操作转换为同步。应用异步发送能够最大化进步音讯发送的吞吐能力。
批量发送
Kafka 反对批量发送音讯,将多个音讯打包成一个批次进行发送,从而缩小网络传输的开销,进步网络传输的效率和吞吐量。
Kafka 的批量发送音讯是通过以下两个参数来管制的:
1. Batch.size:管制批量发送音讯的大小,默认值为 16KB,可适当减少 Batch.size 参数值晋升吞吐。然而,须要留神的是,如果批量发送的大小设置得过大,可能会导致音讯发送的提早减少,因而须要依据理论状况进行调整。
2. linger.ms:管制音讯在批量发送前的等待时间,默认值为 0。当 Linger.ms 大于 0 时,如果有音讯发送,Kafka 会期待指定的工夫,如果等待时间达到或者批量大小达到 Batch.size,就会将音讯打包成一个批次进行发送。可适当减少 Linger.ms 参数值晋升吞吐,比方 10~100。
在 Kafka 的生产者客户端中,当发送音讯时,如果启用了批量发送,Kafka 会将音讯缓存到缓冲区中。当缓冲区中的音讯大小达到 Batch.size 或者等待时间达到 Linger.ms 时,Kafka 会将缓冲区中的音讯打包成一个批次进行发送。如果在等待时间内没有达到 Batch.size,Kafka 也会将缓冲区中的音讯发送进来,从而防止音讯积压。
压缩技术
Kafka 反对压缩技术,通过将音讯进行压缩后再进行传输,从而缩小网络传输的开销(压缩和解压缩的过程会耗费肯定的 CPU 资源,因而须要依据理论状况进行调整。),进步网络传输的效率和吞吐量。Kafka 反对多种压缩算法,在 Kafka2.1.0 版本之前,仅反对 GZIP,Snappy 和 LZ4,2.1.0 后还反对 Zstandard 算法(Facebook 开源,可能提供超高压缩比)。这些压缩算法性能比照(两指标都是越高越好)如下:
- 吞吐量:LZ4>Snappy>zstd 和 GZIP,压缩比:zstd>LZ4>GZIP>Snappy。
在 Kafka 中,压缩技术是通过以下两个参数来管制的:
1. Compression.type:管制压缩算法的类型,默认值为 None,示意不进行压缩。
2. Compression.level:管制压缩的级别,取值范畴为 0 -9,默认值为 -1。当值为 - 1 时,示意应用默认的压缩级别。
在 Kafka 的生产者客户端中,当发送音讯时,如果启用了压缩技术,Kafka 会将音讯进行压缩后再进行传输。在消费者客户端中,如果音讯进行了压缩,Kafka 会在生产音讯时将其解压缩。留神:Broker 如果设置了和生产者不通的压缩算法,接管音讯后会解压后从新压缩保留。Broker 如果存在音讯版本兼容也会触发解压后再压缩。
Pagecache 机制 & 程序追加落盘
Kafka 为了晋升零碎吞吐、升高时延,Broker 接管到音讯后只是将数据写入 PageCache 后便认为音讯已写入胜利,而 PageCache 中的数据通过 Linux 的 Flusher 程序进行异步刷盘(防止了同步刷盘的微小零碎开销),将数据 程序追加写 到磁盘日志文件中。因为 Pagecache 是在内存中进行缓存,因而读写速度十分快,能够大大提高读写效率。程序追加写充分利用程序 I/O 写操作,防止了迟缓的随机 I/O 操作,可无效晋升 Kafka 吞吐。
如上图所示,音讯被程序追加到每个分区日志文件的尾部。
零拷贝
Kafka 中存在大量的网络数据长久化到磁盘(Producer 到 Broker)和磁盘文件通过网络发送(Broker 到 Consumer)的过程,这一过程的性能间接影响 Kafka 的整体吞吐量。传统的 IO 操作存在屡次数据拷贝和上下文切换,性能比拟低。Kafka 利用零拷贝技术晋升上述过程性能,其中网络数据长久化磁盘次要用 mmap 技术,网络数据传输环节次要应用 Sendfile 技术。
网络数据长久化磁盘之 mmap
传统模式下,数据从网络传输到文件须要 4 次数据拷贝、4 次上下文切换和两次零碎调用。如下图所示
为了缩小上下文切换以及数据拷贝带来的性能开销,Broker 在对 Producer 传来的网络数据进行长久化时应用了 mmap 技术。通过这种技术手段,Broker 读取到 Socket Buffer 的网络数据,能够间接在内核空间实现落盘,没有必要将 Socket Buffer 的网络数据读取到利用过程缓冲区。
网络数据传输之 Sendfile
传统形式实现:先读取磁盘、再用 Socket 发送,理论也是进过四次 Copy。如下图所示:
为了缩小上下文切换以及数据拷贝带来的性能开销,Kafka 在 Consumer 从 Broker 读数据过程中应用了 Sendfile 技术。具体在这里采纳的计划是通过 NIO 的 transferTo/transferFrom
调用操作系统的 Sendfile 实现零拷贝。总共产生 2 次内核数据拷贝、2 次上下文切换和一次零碎调用,打消了 CPU 数据拷贝,如下:
稠密索引
为了不便对日志进行检索和过期清理,Kafka 日志文件除了有用于存储日志的.log 文件,还有一个 位移索引文件.index 和一个 工夫戳索引文件.timeindex 文件,并且三文件的名字完全相同,如下
Kafka 的索引文件是依照稠密索引的思维进行设计的。稠密索引的外围是不会为每个记录都保留索引,而是写入肯定的记录之后才会减少一个索引值,具体这个距离有多大则通过 log.index.interval.bytes 参数进行管制,默认大小为 4 KB,意味着 Kafka 至多写入 4KB 音讯数据之后,才会在索引文件中减少一个索引项。可见,单条音讯大小会影响 Kakfa 索引的插入频率,因而 log.index.interval.bytes 也是 Kafka 调优一个重要参数值。因为索引文件也是依照音讯的程序性进行减少索引项的,因而 Kafka 能够利用二分查找算法来搜寻指标索引项,把工夫复杂度降到了 O(lgN),大大减少了查找的工夫。
位移索引文件.index
位移索引文件的索引项构造如下:
绝对位移 :保留于索引文件名字下面的起始位移的差值,假如一个索引文件为:00000000000000000100.index,那么起始位移值即 100,当存储位移为 150 的音讯索引时,在索引文件中的绝对位移则为 150 – 100 = 50,这么做的益处是应用 4 字节保留位移即可,可 以节俭十分多的磁盘空间。
文件物理地位 :音讯在 log 文件中保留的地位,也就是说 Kafka 可依据音讯位移,通过位移索引文件疾速找到音讯在 Log 文件中的物理地位,有了该物理地位的值,咱们就能够疾速地从 log 文件中找到对应的音讯了。
上面我用图来示意 Kafka 是如何疾速检索音讯:
假如 Kafka 须要找出位移为 3550 的音讯,那么 Kafka 首先会应用二分查找算法找到小于 3550 的最大索引项:[3528, 2310272],失去索引项之后,Kafka 会依据该索引项的文件物理地位在 Log 文件中从地位 2310272 开始程序查找,直至找到位移为 3550 的音讯记录为止。
工夫戳索引文件.timeindex
Kafka 在 0.10.0.0 当前的版本当中,音讯中减少了工夫戳信息,为了满足用户须要依据工夫戳查问音讯记录,Kafka 减少了工夫戳索引文件,工夫戳索引文件的索引项构造如下:
工夫戳索引文件的检索与位移索引文件相似,如下疾速检索音讯示意图:
Broker & 数据分区
Kafka 集群蕴含多个 Broker。一个 Topic 下通常有多个 Partition,Partition 散布在不同的 Broker 上,用于存储 Topic 的音讯,这使 Kafka 能够在多台机器上解决、存储音讯,给 Kafka 提供给了并行的音讯解决能力和横向扩容能力。
多 Reactor 多线程网络模型
多 Reactor 多线程网络模型 是一种高效的网络通信模型,能够充分利用多核 CPU 的性能,进步零碎的吞吐量和响应速度。Kafka 为了晋升零碎的吞吐,在 Broker 端解决音讯时采纳了该模型,示意如下:
SocketServer 和 KafkaRequestHandlerPool 是其中最重要的两个组件:
- SocketServer:实现 Reactor 模式,用于解决多个 Client(包含客户端和其余 Broker 节点)的并发申请,并将处理结果返回给 Client。
- KafkaRequestHandlerPool:Reactor 模式中的 Worker 线程池,外面定义了多个工作线程,用于解决理论的 I/O 申请逻辑。
整个服务端解决申请的流程大抵分为以下几个步骤:
1. Acceptor 接管客户端发来的申请
2. 轮询分发给 Processor 线程解决
3. Processor 将申请封装成 Request 对象,放到 RequestQueue 队列
4. KafkaRequestHandlerPool 调配工作线程,解决 RequestQueue 中的申请
5. KafkaRequestHandler 线程解决完申请后,将响应 Response 返回给 Processor 线程
6. Processor 线程将响应返回给客户端
其余常识探索
负载平衡
生产者负载平衡
Kafka 生产端的负载平衡次要指如何将音讯发送到适合的分区。Kafka 生产者生产音讯时,依据分区器将音讯投递到指定的分区中,所以 Kafka 的负载平衡很大水平上依赖于分区器。Kafka 默认的分区器是 Kafka 提供的 DefaultPartitioner。它的分区策略是依据 Key 值进行分区调配的:
- 如果 key 不为 null:对 Key 值进行 Hash 计算,从 所有分区 中依据 Key 的 Hash 值计算出一个分区号;领有雷同 Key 值的音讯被写入同一个分区,程序音讯实现的要害;
- 如果 key 为 null:音讯将以 轮询 的形式,在 所有可用分区 中别离写入音讯。
- 如果不想应用 Kafka 默认的分区器,用户能够实现 Partitioner 接口,自行实现分区办法。
消费者负载平衡
在 Kafka 中,每个分区(Partition)只能由一个消费者组中的一个消费者生产。当消费者组中有多个消费者时,Kafka 会主动进行负载平衡,将分区平均地调配给每个消费者。在 Kafka 中,消费者负载平衡算法能够通过设置消费者组的 Partition.assignment.strategy 参数来抉择。目前支流的分区调配策略以下几种:
- Range: 在保障平衡的前提下,将间断的分区调配给消费者,对应的实现是 RangeAssignor;
- Round-robin:在保障平衡的前提下,轮询调配,对应的实现是 RoundRobinAssignor;
-
0.11.0.0 版本引入了一种新的分区调配策略 StickyAssignor,其劣势在于可能保障分区平衡的前提下尽量放弃原有的分区调配后果,从而防止许多冗余的分区调配操作,缩小分区再调配的执行工夫。
集群治理
Kafka 借助 ZooKeeper 进行集群治理。Kafka 中很多信息都在 ZK 中保护,如 Broker 集群信息、Consumer 集群信息、Topic 相干信息、Partition 信息等。Kafka 的很多性能也是基于 ZK 实现的,如 Partition 选主、Broker 集群治理、Consumer 负载平衡等,限于篇幅本文将不开展陈说,这里先附一张网上截图大家感触下:
参考文献
1. https://www.cnblogs.com/arvinhuang/p/16437948.html
2. https://segmentfault.com/a/1190000039133960
3. http://matt33.com/2018/11/04/kafka-transaction/
4. https://blog.51cto.com/u_14020077/5836698
5. https://t1mek1ller.github.io/2020/02/15/kafka-leader-epoch/
6. https://cwiki.apache.org/confluence/display/KAFKA/KIP-101+-+A…
7. https://xie.infoq.cn/article/c06fea629926e2b6a8073e2f0
8. https://xie.infoq.cn/article/8191412c8da131e78cbfa6600
9. https://mp.weixin.qq.com/s/iEk0loXsKsMO_OCVlUsk2Q
10. https://cloud.tencent.com/developer/article/1657649
11. https://www.cnblogs.com/vivotech/p/16347074.html