作者:京东科技 徐拥

入门

1、什么是kafka?

apache Kafka is a distributed streaming platform. What exactly dose that mean?

Apache Kafka 是音讯引擎零碎,也是一个分布式流解决平台(Distributed Streaming Platform)

2、kafka全景图:

3、Kafka的版本演进:

4、kafka选型:

Apache Kafka:也称社区版 Kafka。劣势在于迭代速度快,社区响应度高,应用它能够让你有更高的把控度;缺点在于仅提供根底外围组件,缺失一些高级的个性。(如果你仅仅须要一个音讯引擎零碎亦或是简略的流解决利用场景,同时须要对系统有较大把控度,那么我举荐你应用 Apache Kafka)

Confluent Kafka :Confluent 公司提供的 Kafka。劣势在于集成了很多高级个性且由 Kafka 原班人马打造,品质上有保障;缺点在于相干文档资料不全,普及率较低,没有太多可供参考的范例。(如果你须要用到 Kafka 的一些高级个性,那么举荐你应用 Confluent Kafka。)

CDH/HDP Kafka:大数据云公司提供的 Kafka,内嵌 Apache Kafka。劣势在于操作简略,节俭运维老本;缺点在于把控度低,演进速度较慢。(如果你须要疾速地搭建音讯引擎零碎,或者你须要搭建的是多框架形成的数据平台且 Kafka 只是其中一个组件,那么我举荐你应用这些大数据云公司提供的 Kafka)

5、Kafka的基本概念:

6、Kafka的根本构造:

7、Kafka的集群构造:

8、kafka的利用场景(用户注册/异步):

9、kafka队列模式---点对点:

10、kafka队列模式---公布/订阅:

11、kafka形成角色:

1、broker:

音讯格局: 主题 - 分区 - 音讯 、主题下的每条音讯只会保留在某一个分区中,而不会在多个分区中被保留多份

这样设计的起因是:不应用多topic做负载平衡,意义在于对业务屏蔽该逻辑。业务只须要对topic进行发送,指定负载平衡策略即可 同时 topic分区是实现负载平衡以及高吞吐量的要害

Topic的创立流程

2、Producer:

发送音讯流程

3、Consumer:

Kafka消费者对象订阅主题并接管Kafka的音讯,而后验证音讯并保留后果。Kafka消费者是消费者组的一部分。一个消费者组里的消费者订阅的是同一个主题,每个消费者接管主题一部分分区的音讯。消费者组的设计是对消费者进行的一个横向伸缩,用于解决消费者生产数据的速度跟不上生产者生产数据的速度的问题,通过减少消费者,让它们分担负载,别离解决局部分区的音讯

4、Consumer Group:

它是kafka提供的具备可扩大且可容错的消费者机制

个性:

1、 Consumer Group下能够有一个或多个 Consumer实例;

2、在一个Katka集群中,Group ID标识惟一的一个Consumer Group;

3、 Consumer Group 下所有实例订阅的主题的单个分区,只能调配给组内的 某个Consumer实例生产。

Consumer Group 两大模型:

1、如果所有实例都属于同一个Group,那么它实现的是音讯队列模型;

2、如果所有实例别离属于不同的GrouD,那么它实现的就是公布/订阅模型。

12、Kafka的工作流程:

13、Kafka常用命令:

进阶

14、Kafka的文件存储机制—log:

15、Kafka的文件存储机制—分片/索引:

16、Kafka的文件存储机制—index/log:

17、kafka 如何反对百万QPS?

程序读写 :

生产者写入数据和消费者读取数据都是程序读写的

Batch Data(数据批量解决):

当消费者(consumer)须要生产数据时,首先想到的是消费者须要一条,kafka发送一条,消费者再要一条kafka再发送一条。但实际上 Kafka 不是这样做的,Kafka 耍小聪明了。Kafka 把所有的音讯都寄存在一个一个的文件中,当消费者须要数据的时候 Kafka 间接把文件发送给消费者。比如说100万条音讯放在一个文件中可能是10M的数据量,如果消费者和Kafka之间网络良好,10MB大略1秒就能发送完,既100万TPS,Kafka每秒解决了10万条音讯。

MMAP(内存映射文件):

MMAP也就是内存映射文件,在64位操作系统中个别能够示意 20G 的数据文件,它的工作原理是间接利用操作系统的 Page 来实现文件到物理内存的间接映射,实现映射之后对物理内存的操作会被同步到硬盘上。

通过MMAP技术过程能够像读写硬盘一样读写内存(逻辑内存),不用关怀内存的大小,因为有虚拟内存兜底。这种形式能够获取很大的I/O晋升,省去了用户空间到内核空间复制的开销。也有一个很显著的缺点,写到MMAP中的数据并没有被真正的写到硬盘,操作系统会在程序被动调用 flush 的时候才把数据真正的写到硬盘。

Zero Copy(零拷贝):

如果不应用零拷贝技术,消费者(consumer)从Kafka生产数据,Kafka从磁盘读数据而后发送到网络下来,数据一共产生了四次传输的过程。其中两次是 DMA 的传输,另外两次,则是通过 CPU 管制的传输。

第一次传输:从硬盘上将数据读到操作系统内核的缓冲区里,这个传输是通过 DMA 搬运的。

第二次传输:从内核缓冲区外面的数据复制到调配的内存外面,这个传输是通过 CPU 搬运的。

第三次传输:从调配的内存外面再写到操作系统的 Socket 的缓冲区外面去,这个传输是由 CPU 搬运的。

第四次传输:从 Socket 的缓冲区外面写到网卡的缓冲区外面去,这个传输是通过 DMA 搬运的。

实际上在kafka中只进行了两次数据传输,如下图:

第一次传输:通过 DMA从硬盘间接读到操作系统内核的读缓冲区外面。

第二次传输:依据 Socket 的描述符信息间接从读缓冲区外面写入到网卡的缓冲区外面。

咱们能够看到同一份数据的传输次数从四次变成了两次,并且没有通过 CPU 来进行数据搬运,所有的数据都是通过 DMA 来进行传输的。没有在内存层面去复制(Copy)数据,这个办法称之为零拷贝(Zero-Copy)。

无论传输数据量的大小,传输同样的数据应用了零拷贝可能缩短 65%的工夫,大幅度晋升了机器传输数据的吞吐量,这也是Kafka可能反对百万TPS的一个重要起因

18、压缩:

个性:

节俭网络传输带宽以及 Kafka Broker 端的磁盘占用。

生产者配置 :

compression.type

Properties props = new Properties();

props.put("bootstrap.servers", "localhost:9092");

props.put("acks", "all");

props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");

props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

// 开启GZIP压缩

props.put("compression.type", "gzip");

Producerproducer = new KafkaProducer<>(props)

broker开启压缩:

Broker 端也有一个参数叫 compression.type 默认值为none,这意味着发送的音讯是未压缩的。否则,您指定反对的类型:gzip、snappy、lz4或zstd。 Producer 端压缩、Broker 端放弃、Consumer 端解压缩。

broker何时压缩:

状况一:Broker 端指定了和 Producer 端不同的压缩算法。(危险:可能会产生意料之外的压缩 / 解压缩操作,体现为 Broker 端 CPU 使用率飙升)

设想一个对话:

Producer 说:“我要应用 GZIP 进行压缩。

Broker 说:“不要,我这边接管的音讯必须应用配置的 lz4 进行压缩

状况二: Broker 端产生了音讯格局转换 (危险:波及额定压缩/解压缩,且 Kafka 丢失 Zero Copy 个性)

Kafka 共有两大类音讯格局,社区别离称之为 V1 版本和 V2 版本

为了兼容老版本的格局,Broker 端会对新版本音讯执行向老版本格局的转换。这个过程中会波及音讯的解压缩和从新压缩

音讯何时解压缩:

Consumer:收到到压缩过的音讯会解压缩还原成之前的音讯。

broker:收到producer的音讯 压缩算法和本人的不统一/兼容新老版本的音讯格局

压缩算法比照:

以 Kafka 为例,吞吐量方面:LZ4 > Snappy > zstd 和 GZIP;压缩比方面,zstd > LZ4 > GZIP > Snappy;

具体到物理资源,应用 Snappy 算法占用的网络带宽最多,zstd 起码,这是正当的,毕竟 zstd 就是要提供超高的压缩比;

在 CPU 使用率方面,各个算法体现得差不多,只是在压缩时 Snappy 算法应用的 CPU 较多一些,而在解压缩时 GZIP 算法则可能应用更多的 CPU;

19、Exactly-Once(ACK应答机制):

1、At Least Once

起码发送一次,Ack级别为-1,保证数据不失落

2、At Most Once

最多发送一次,Ack级别为1,保证数据不反复

3、幂等性

保障producer发送的数据在broker只长久化一条

4、Exactly Once(0.11版本)

At Least Once + 幂等性 = Exactly Once

要启用幂等性,只须要将Producer的参数中 enable.idompotence设置为 true即可。 Kafka的幂等性实现其实就是将原来上游须要做的去重放在了数据上游。

20、producer如何获取metadata:

1:在创立KafkaProducer实例时 第一步:生产者利用会在后盾创立并启动一个名为Sender的线程,

2:该Sender线程开始运行时,首先会创立与Broker的连贯。 第二步:此时不晓得要连贯哪个Broker,kafka会通过METADATA申请获取集群的元数据,连贯所有的Broker。

3:Producer 通过 metadata.max.age.ms定期更新元数据,在连贯多个broker的状况下,producer的InFlightsRequests中保护着每个broker的期待回复音讯的队列,期待数量越少阐明broker处理速度越快,负载越小,就会发到哪个broker上

21、kafka真的会丢音讯吗?

kafka最优配置:

Producer:

如果是Java客户端 倡议应用 producer.send(msg, callback) ,callback(回调)它能精确地通知你音讯是否真的提交胜利了。

设置 acks = all。acks 是 Producer 的参数,如果设置成 all,须要所有正本 Broker 都要接管到音讯,该音讯才算是“已提交”。这是最高等级的“已提交”定义。

设置 retries 为一个较大的值。当呈现网络的刹时抖动时,音讯发送可能会失败,此时配置了 retries > 0 的 Producer 可能主动重试音讯发送,防止音讯失落。

Consumer:

音讯生产实现再提交。Consumer 端有个参数 enable.auto.commit,最好把它设置成 false,并采纳手动提交位移的形式。

broker :

设置 unclean.leader.election.enable = false。它管制的是哪些 Broker 有资格竞选分区的 Leader。如果一个 Broker 落后原先的 Leader 太多,那么它一旦成为新的 Leader,必然会造成音讯的失落。故个别都要将该参数设置成 false,即不容许这种状况的产生。

设置 replication.factor >= 3,目前避免音讯失落的次要机制就是冗余。

设置 min.insync.replicas > 1,管制的是音讯至多要被写入到多少个正本才算是“已提交”。设置成大于 1 能够晋升音讯持久性。在理论环境中千万不要应用默认值 1。 确保 replication.factor > min.insync.replicas。如果两者相等,那么只有有一个正本挂机,整个分区就无奈失常工作了。咱们不仅要改善音讯的持久性,避免数据失落,还要在不升高可用性的根底上实现。举荐设置成 replication.factor = min.insync.replicas + 1。

22、kafka Replica:

实质就是一个只能追加写音讯的提交日志。依据 Kafka 正本机制的定义,同一个分区下的所有正本保留有雷同的音讯序列,这些正本扩散保留在不同的 Broker 上,从而可能反抗局部 Broker 宕机带来的数据不可用

3个个性:

第一,在 Kafka 中,正本分成两类:领导者正本(Leader Replica)和追随者正本(Follower Replica)。每个分区在创立时都要选举一个正本,称为领导者正本,其余的正本主动称为追随者正本。

第二,Kafka 的正本机制比其余分布式系统要更严格一些。在 Kafka 中,追随者正本是不对外提供服务的。这就是说,任何一个追随者正本都不能响应消费者和生产者的读写申请。所有的申请都必须由领导者副原本解决,或者说,所有的读写申请都必须发往领导者正本所在的 Broker,由该 Broker 负责解决。追随者正本不解决客户端申请,它惟一的工作就是从领导者正本异步拉取音讯,并写入到本人的提交日志中,从而实现与领导者正本的同步。

第三,当领导者正本挂掉了,或者说领导者正本所在的 Broker 宕机时,Kafka 依靠于监控性能可能实时感知到,并立刻开启新一轮的领导者选举,从追随者正本当选一个作为新的领导者。老 Leader 正本重启回来后,只能作为追随者正本退出到集群中。

意义: 不便实现“Read-your-writes”

(1)含意:当应用生产者API向Kafka胜利写入音讯后,马上应用音讯者API去读取方才生产的音讯。 (2)如果容许追随者正本对外提供服务,因为正本同步是异步的,就可能因为数据同步时间差,从而使客户端看不到最新写入的音讯。 B :不便实现枯燥读(Monotonic Reads) (1)枯燥读:对于一个消费者用户而言,在多处音讯音讯时,他不会看到某条音讯一会存在,一会不存在。 (2)如果容许追随者正本提供读服务,因为音讯是异步的,则多个追随者正本的状态可能不统一。若客户端每次命中的正本不同,就可能呈现一条音讯一会看到,一会看不到

23、ISR(In-Sync Replica Set)LEO&HW 机制:

HW(High Watermark)是所有正本中最小的LEO。

比方: 一个分区有3个正本,一个leader,2个follower。producer向leader写了10条音讯,follower1从leader处拷贝了5条音讯,follower2从leader处拷贝了3条音讯,那么leader正本的LEO就是10,HW=3;follower1正本的LEO就是5

HW作用:保障生产数据的一致性和正本数据的一致性 通过HW机制。leader处的HW要等所有follower LEO都越过了才会前移

ISR: 所有与leader正本放弃肯定水平同步的正本(包含leader正本在内)组成ISR(In-Sync Replicas)

1、Follower故障:

当follower挂掉之后,会被踢出ISR;

当follower复原后,会读取本地磁盘记录的HW,而后截掉HW之后的局部,从HW开始从leader持续同步数据,当该follower的LEO大于等于该partition的HW的时候,就是它追上leader的时候,会被重新加入到ISR中

2、Leader故障:

当leader故障之后,会从follower中选出新的leader,为保障多个正本之间的数据一致性,其余的follower会将各自HW之后的局部截掉(新leader如果没有那局部数据 follower就会截掉造成数据失落),从新从leader开始同步数据,然而只能保障正本之间的数据一致性,并不能保证数据不反复或失落。

24、Consumer分区调配策略:

自定义分区策略:

你须要显式地配置生产者端的参数partitioner.class。这个参数该怎么设定呢?办法很简略,在编写生产者程序时,你能够编写一个具体的类实现org.apache.kafka.clients.producer.Partitioner接口。这个接口也很简略,只定义了两个办法:partition()和close(),通常你只须要实现最重要的 partition 办法

int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster){

List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);

//随机

//return ThreadLocalRandom.current().nextInt(partitions.size());

//按音讯键保序策略

//return Math.abs(key.hashCode()) % partitions.size();

//指定条件

return partitions.stream().filter(Predicate(指定条件))).map(PartitionInfo::partition).findAny().get();

}

25、kafka中一个鲜为人知的topic:

consumer_offsets:

老版本的Kafka会把位移信息保留在Zk中 ,但zk不适用于高频的写操作,这令zk集群性能重大降落,在新版本中将位移数据作为一条条一般的Kafka音讯,提交至外部主题(\_consumer\_offsets)中保留,实现高持久性和高频写操作。

位移主题每条音讯内容格局:Group ID,主题名,分区号

当Kafka集群中的第一个Consumer程序启动时,Kafka会主动创立位移主题。也能够手动创立 分区数依赖于Broker端的offsets.topic.num.partitions的取值,默认为50 正本数依赖于Broker端的offsets.topic.replication.factor的取值,默认为3

思考:

只有 Consumer 始终启动着,它就会无限期地向位移主题写入音讯,就算没有新音讯进来 也会通过定时工作反复写雷同位移 最终撑爆磁盘?

Kafka 提供了专门的后盾线程定期地巡检待 Compact 的主题,看看是否存在满足条件的可删除数据,这个后盾线程叫 Log Cleaner,对雷同的key只保留最新的一条音讯。

26、Consumer Group Rebalance:

术语简介:

Rebalance :就是让一个 Consumer Group 下所有的 Consumer 实例就如何生产订阅主题的所有分区达成共识的过程。

Coordinator:它专门为 Consumer Group 服务,负责为 Group 执行 Rebalance 以及提供位移治理和组成员治理等。

Consumer 端应用程序在提交位移时,其实是向 Coordinator 所在的 Broker 提交位移。同样地,当 Consumer 利用启动时,也是向 Coordinator 所在的 Broker 发送各种申请,而后由 Coordinator 负责执行消费者组的注册、成员治理记录等元数据管理操作。

如何确定Coordinator地位 :partitionId=Math.abs(groupId.hashCode() % offsetsTopicPartitionCount) 比方(abs(627841412 % 50)=12 Coordinator就在 partitionId=12的Leader 正本所在的 Broker)。

Rebalance的危害:

Rebalance 影响 Consumer 端 TPS 这期间不会工作

Rebalance 很慢 Consumer越多 Rebalance工夫越长

Rebalance 效率不高 须要所有成员参加

触发 Rebalance场景:

组成员数量发生变化

订阅主题数量发生变化

订阅主题的分区数发生变化

如何防止 Rebalance:

设置 session.timeout.ms = 15s (session连接时间 默认10)

设置 heartbeat.interval.ms = 2s(心跳工夫)

max.poll.interval.ms (取决你一批音讯解决时长 默认5分钟)

要保障 Consumer 实例在被断定为“dead”之前,可能发送至多 3 轮的心跳申请,即 session.timeout.ms >= 3 * heartbeat.interval.ms。

27、Kafka 拦截器:

Kafka 拦截器分为生产者拦截器和消费者拦截器,能够利用于包含客户端监控、端到端系统性能检测、音讯审计等多种性能在内的场景。

例:生产者Interceptor

局部图文材料地址

-- 极客工夫 Kafka 核心技术与实战

--google