乐趣区

关于kafka:Kafka知识点总结

1、什么是 Kafka?

Kafka 是一个 Scala 语言开发的多分区、多正本、分布式的基于公布 / 订阅模式的音讯队列。目前 Kafka 曾经定位为一个分布式流式解决平台,它以高吞吐、可长久化、可程度扩大、反对流数据处理等多种个性而被宽泛应用。

2、Kafka 架构

Kafak 总体架构图中蕴含多个概念:
(1)ZooKeeper:Zookeeper 负责保留 broker 集群元数据,并对控制器进行选举等操作。
(2)Producer:音讯生产者,就是向 kafka broker 发消息的客户端。
(3)Broker: 一个独立的 Kafka 服务器被称作 broker,一个集群由多个 broker 组成,一个 broker 能够包容多个 topic。broker 负责接管来自生产者的音讯,为音讯设置偏移量,并将音讯存储在磁盘。broker 为消费者提供服务,对读取分区的申请作出响应,返回曾经提交到磁盘上的音讯。
(4)Consumer:音讯消费者,向 kafka broker 取音讯的客户端。
(5)Consumer Group:消费者组,一个消费者组能够蕴含一个或多个 Consumer。消费者组内每个消费者负责生产不同分区的数据,一个分区只能由一个组内消费者生产。消费者组之间互不影响。所有的消费者都属于某个消费者组,即消费者组是逻辑上的一个订阅者。应用多分区 + 多消费者形式能够极大进步数据上游的处理速度,同一消费者组中的消费者不会反复生产音讯,同样的,不同生产组中的消费者生产音讯时互不影响。Kafka 就是通过消费者组的形式来实现音讯 P2P 模式和播送模式。
(6)Topic:Kafka 中的音讯以 Topic 为单位进行划分,能够了解为一个队列。生产者将音讯发送到特定的 Topic,而消费者负责订阅 Topic 的音讯并进行生产。
(7)Partition:为了实现扩展性,一个十分大的 topic 能够散布到多个 broker(服务器)上,一个 topic 能够分为多个 partition,每个 partition 是一个有序的队列。同一个主题下不同分区蕴含的音讯是不同的,分区在存储层面能够看作一个可追加的日志(Log)文件,音讯在被
追加到分区日志文件的时候都会调配一个特定的偏移量(offset)。
(8)Offset:分区中每条音讯都会调配一个有序的 id,即偏移量。offset 不逾越分区,也就是说 Kafka 保障的是分区有序性而不是主题有序性。
(9)Replica:正本,为保障集群中的某个节点产生故障时,该节点上的 partition 数据不失落,且 kafka 依然可能持续工作,kafka 提供了正本机制,一个 topic 的每个分区都有若干个正本,一个 leader 和若干个 follower。通常只有 leader 正本对外提供读写服务,当主正本所在 broker 解体或产生网络异样,Kafka 会在 Controller 的治理下会从新抉择新的 leader 正本对外提供读写服务。
(10)Record:理论写入 Kafka 中并能够被读取的音讯记录。每个 record 蕴含了 key、value 和 timestamp。
(11)Leader: 每个分区多个正本的 “ 主 ” 正本,生产者发送数据的对象,以及消费者生产数据的对象都是 leader。
(12)Follower: 每个分区多个正本中的 ” 从 ” 正本,实时从 Leader 中同步数据,放弃和 leader 数据的同步。Leader 产生故障时,某个 follow 会成为新的 leader。
(13)ISR(In-Sync Replicas):正本同步队列,示意和 leader 放弃同步的正本的汇合(包含 leader 自身)。如果 follower 长时间不与 leader 同步数据则将该正本踢出 ISR 队列。leader 产生故障会从 ISR 中选举新 leader。
(14)OSR(Out-of-Sync Replicas):因同步提早过高而被踢出 ISR 的正本存在 OSR。
(15)AR(Assigned Replicas):所有正本汇合,即 AR = ISR + OSR。

3、公布订阅的音讯零碎那么多,为啥抉择 Kafka?(Kafka 的特点)

(1)多个生产者
KafKa 能够无缝地反对多个生产者,不论客户端应用一个主题,还是多个主题。Kafka 适宜从多个前端零碎收集数据,并以对立的格局堆外提供数据。
(2)多个消费者
Kafka 反对多个消费者从一个独自的音讯流中读取数据,并且消费者之间互不影响。这与其余队列零碎不同,其余队列零碎一旦被客户端读取,其余客户端就不能再读取它。并且多个消费者能够组成一个消费者组,他们共享一个音讯流,并保障消费者组对每个给定的音讯只生产一次。
(3)基于磁盘的数据存储(持久性,可靠性)
Kafka 容许消费者非实时地读取音讯,起因在于 Kafka 将音讯提交到磁盘上,设置了保留规定进行保留,无需放心音讯失落等问题。
(4)伸缩性,可扩展性
可扩大多台 broker。用户能够先应用单个 broker,到前面能够扩大到多个 broker
(5)高性能(高吞吐,低提早)
Kafka 能够轻松解决百万千万级音讯流,同时还能保障亚秒级的音讯提早。

4、kafka 如何做到高吞吐量 / 高性能的?

Kafka 实现高吞吐量和性能,次要通过以下几点:

1、页缓存技术
Kafka 是基于 操作系统 的页缓存来实现文件写入的。操作系统自身有一层缓存,叫做 page cache,是在 内存里的缓存,咱们也能够称之为 os cache,意思就是操作系统本人治理的缓存。Kafka 在写入磁盘文件的时候,能够间接写入这个 os cache 里,也就是仅仅写入内存中,接下来由操作系统本人决定什么时候把 os cache 里的数据真的刷入磁盘文件中。通过这一个步骤,就能够将磁盘文件写性能晋升很多了,因为其实这里相当于是在写内存,不是在写磁盘。

2、磁盘程序写
另一个次要性能是 kafka 写数据的时候,是以磁盘程序写的形式来写的。也就是说,仅仅将数据追加到 log 文件的开端,不是在文件的随机地位来批改数据。同样的磁盘,程序写能到 600M/s,而随机写只有 100K/s。这与磁盘的机械机构无关,程序写之所以快,是因为其省去了大量磁头寻址的工夫。

基于下面两点,kafka 就实现了写入数据的超高性能。

3、零拷贝
大家应该都晓得,从 Kafka 里常常要生产数据,那么生产的时候实际上就是要从 Kafka 的磁盘文件里读取某条数据而后发送给上游的消费者,如下图所示:

那么这里如果频繁的从磁盘读数据而后发给消费者,会减少两次没必要的拷贝,如下图:

一次是从操作系统的 cache 里拷贝到利用过程的缓存里,接着又从应用程序缓存里拷贝回操作系统的 Socket 缓存里。而且为了进行这两次拷贝,两头还产生了好几次上下文切换,一会儿是应用程序在执行,一会儿上下文切换到操作系统来执行。所以这种形式来读取数据是比拟耗费性能的。
Kafka 为了解决这个问题,在读数据的时候是引入零拷贝技术。
也就是说,间接让操作系统的 cache 中的数据发送到网卡后传输给上游的消费者,两头跳过了两次拷贝数据的步骤,Socket 缓存中仅仅会拷贝一个描述符过来,不会拷贝数据到 Socket 缓存,如下图所示:

通过 零拷贝技术,就不须要把 os cache 里的数据拷贝到利用缓存,再从利用缓存拷贝到 Socket 缓存了,两次拷贝都省略了,所以叫做零拷贝。对 Socket 缓存仅仅就是拷贝数据的描述符过来,而后数据就间接从 os cache 中发送到网卡下来了,这个过程大大的晋升了数据生产时读取文件数据的性能。Kafka 从磁盘读数据的时候,会先看看 os cache 内存中是否有,如果有的话,其实读数据都是间接读内存的。Kafka 集群通过良好的调优,数据间接写入 os cache 中,而后读数据的时候也是从 os cache 中读。相当于 Kafka 齐全基于内存提供数据的写和读了,所以这个整体性能会极其的高。

5、Kafka 和 Zookeeper 之间的关系

Kafka 应用 Zookeeper 来保留集群的元数据信息和消费者信息(偏移量),没有 zookeeper,kafka 是工作不起来。在 zookeeper 上会有一个专门用来进行 Broker 服务器列表记录的点,节点门路为 /brokers/ids。

每个 Broker 服务器在启动时,都会到 Zookeeper 上进行注册,即创立 /brokers/ids/[0-N] 的节点,而后写入 IP,端口等信息,Broker 创立的是长期节点,所以一旦 Broker 上线或者下线,对应 Broker 节点也就被删除了,因而能够通过 zookeeper 上 Broker 节点的变动来动静表征 Broker 服务器的可用性。

6、生产者向 Kafka 发送音讯的执行流程

如下图所示:

(1)生产者要往 Kafka 发送音讯时,须要创立 ProducerRecoder, 代码如下:

ProducerRecord<String,String> record 
      = new ProducerRecoder<>("CostomerCountry","Precision Products","France");
      try{producer.send(record);
      }catch(Exception e){e.printStackTrace();
      }

(2)ProducerRecoder 对象会蕴含指标 topic,分区内容,以及指定的 key 和 value, 在发送 ProducerRecoder 时,生产者会先把键和值对象序列化成字节数组,而后在网络上传输。

(3)生产者在将音讯发送到某个 Topic,须要通过拦截器、序列化器和分区器(Partitioner)。

(4)如果音讯 ProducerRecord 没有指定 partition 字段,那么就须要依赖分区器,依据 key 这个字段来计算 partition 的值。分区器的作用就是为音讯调配分区。

若没有指定分区,且音讯的 key 不为空,则应用 murmur 的 Hash 算法(非加密型 Hash 函数,具备高运算性能及低碰撞率)来计算分区调配。
若没有指定分区,且音讯的 key 也是空,则用轮询的形式抉择一个分区。

(5)分区抉择好之后,会将音讯增加到一个记录批次中,这个批次的所有音讯都会被发送到雷同的 Topic 和 partition 上。而后会有一个独立的线程负责把这些记录批次发送到相应的 broker 中。

(6)broker 接管到 Msg 后,会作出一个响应。如果胜利写入 Kafka 中,就返回一个 RecordMetaData 对象,它蕴含 Topic 和 Partition 信息,以及记录在分区的 offset。

(7)若写入失败,就返回一个谬误异样,生产者在收到谬误之后尝试从新发送音讯,几次之后如果还失败,就返回错误信息。

7、kafka 如何保障对应类型的音讯被写到雷同的分区?

通过 音讯键 和 分区器 来实现,分区器为键生成一个 offset,而后应用 offset 对主题分区进行取模,为音讯选取分区,这样就能够保障蕴含同一个键的音讯会被写到同一个分区上。

如果 ProducerRecord 没有指定分区,且音讯的 key 不为空,则应用 Hash 算法(非加密型 Hash 函数,具备高运算性能及低碰撞率)来计算分区调配。

如果 ProducerRecord 没有指定分区,且音讯的 key 也是空,则用 轮询 的形式抉择一个分区。

8、kafka 文件存储机制

在 Kafka 中,一个 Topic 会被宰割成多个 Partition,而 Partition 由多个更小的 Segment 的元素组成。Partition 在服务器上的表现形式就是一个一个的文件夹,每个 partition 文件夹上面会有多组 segment(逻辑分组,并不是实在存在),每个 segment 对应三个文件:.log 文件、.index 文件、.timeindex 文件。topic 是逻辑上的概念,而 partition 是物理上的概念,每个 partition 对应于多个 log 文件,该 log 文件中存储的就是 producer 生产的数据。Producer 生产的数据会被一直追加到该 log 文件末端,且每条数据都有本人的 offset。消费者组中的每个消费者,都会实时记录本人生产到了哪个 offset,以便出错复原时,从上次的地位持续生产。

Kafka 会依据 log.segment.bytes 的配置来决定单个 Segment 文件(log)的大小,当写入数据达到这个大小时就会创立新的 Segment。

9、如何依据 offset 找到对应的 Message?

每个索引项占用 8 个字节,分为两个局部:
(1) relativeOffset: 绝对偏移量,示意音讯绝对于 baseOffset 的偏移量,占用 4 个字节(relativeOffset = offset – baseOffset),以后索引文件的文件名即为 baseOffset 的值。

例如:一个日志片段的 baseOffset 为 32,那么其文件名就是 00000000000000000032.log,offset=35 的音讯在索引文件中的 relativeOffset 的值为 35-32=3

(2) position: 物理地址,也就是音讯在日志分段文件中对应的物理地位,占用 4 个字节。

(1)先找到 offset=3 的 message 所在的 segment 文件(利用二分法查找),先判断.index 文件名称 offset(baseOffset)是否小于 3;
若小于,则持续二分与下一个.inde 文件名称 offset 比拟;
若大于,则返回上次小于 3 的.index 文件,这里找到的就是在第一个 segment 文件。

(2)找到的 segment 中的.index 文件,用查找的 offset 减去.index 文件名的 offset(relativeOffset = offset – baseOffset),也就是 00000.index 文件,咱们要查找的 offset 为 3 的 message 在该.index 文件内的索引为 3(index 采纳稠密存储的形式,它不会为每一条 message 都建设索引,而是每隔 4k 左右,建设一条索引,防止索引文件占用过多的空间。毛病是没有建设索引的 offset 不能一次定位到 message 的地位,须要做一次程序扫描,然而扫描的范畴很小)。

(3)依据找到的 relative offset 为 3 的索引,确定 message 存储的物理偏移地址为 756。

(4)依据物理偏移地址,去.log 文件找相应的 Message

同理,我如果想找 offset= 8 对应的 Message 数据呢?

(1)首先依据二分查找法找到 segment 的对应的 00000000000000000006.index 索引文件

(2)依据 offset= 8 找到对应的索引文件中的地位,该地位保留了一个偏移量 326,依据偏移量 326 在 00000000000000000006.log 文件中找到对应的音讯 Message-8。

Kafka 的 Message 存储采纳了分区,磁盘程序读写,分段和稠密索引等一些伎俩来达到高效性,在 0.9 版本之后,offset 曾经间接保护在 kafka 集群的__consumer_offsets 这个 topic 中。

10、Producer 发送的一条 message 中蕴含哪些信息?

音讯由 可变长度 的 报头、可变长度的 不通明密钥字节数组和 可变长度的 不通明值字节数组组成。

RecordBatch 是 Kafka 数据的存储单元,一个 RecordBatch 中蕴含多个 Record(即咱们通常说的一条音讯)。RecordBatch 中各个字段的含意如下:

一个 RecordBatch 中能够蕴含多条音讯,即上图中的 Record,而每条音讯又能够蕴含多个 Header 信息,Header 是 Key-Value 模式的。

11、kafka 如何实现音讯有序

生产者:通过分区的 leader 正本负责数据以先进先出的程序写入,来保障音讯程序性。

消费者:同一个分区内的音讯只能被一个 group 里的一个消费者生产,保障分区内生产有序。

kafka 每个 partition 中的音讯在写入时都是有序的,生产时,每个 partition 只能被每一个消费者组中的一个消费者生产,保障了生产时也是有序的。

整个 kafka 不保障有序。如果为了保障 kafka 全局有序,那么设置一个生产者,一个分区,一个消费者。

12、kafka 有哪些分区算法?

Kafka 蕴含三种分区算法:

(1)轮询策略

也称 Round-robin 策略,即程序调配。比方一个 topic 下有 3 个分区,那么第一条音讯被发送到分区 0,第二条被发送到分区 1,第三条被发送到分区 2,以此类推。当生产第四条音讯时又会从新开始。

轮询策略是 kafka java 生产者 API 默认提供的分区策略。轮询策略有十分优良的负载平衡体现,它总是能保障音讯最大限度地被平均分配到所有分区上,故默认状况下它是最正当的分区策略,也是平时最罕用的分区策略之一。

(2)随机策略

也称 Randomness 策略。所谓随机就是咱们随便地将音讯搁置在任意一个分区上,如下图:

(3)按 key 调配策略

kafka 容许为每条音讯定义音讯键,简称为 key。一旦音讯被定义了 key,那么你就能够保障同一个 key 的所有音讯都进入到雷同的分区外面,因为每个分区下的音讯解决都是有程序的,如下图所示:

13、Kafka 的默认音讯保留策略

broker 默认的音讯保留策略分为两种:
日志片段通过 log.segment.bytes 配置(默认是 1GB)
日志片段通过 log.segment.ms 配置(默认 7 天)

14、kafka 如何实现单个集群间的音讯复制?

Kafka 音讯负责机制只能在单个集群中进行复制,不能在多个集群之间进行。

kafka 提供了一个叫做 MirrorMaker 的外围组件,该组件蕴含一个生产者和一个消费者,两者之间通过一个队列进行相连,当消费者从一个集群读取音讯,生产者把音讯发送到另一个集群。

15、Kafka 音讯确认 (ack 应答) 机制

为保障 producer 发送的数据,能牢靠的达到指定的 topic ,Producer 提供了音讯确认机制。生产者往 Broker 的 topic 中发送音讯时,能够通过配置来决定有几个正本收到这条音讯才算音讯发送胜利。能够在定义 Producer 时通过 acks 参数指定,这个参数反对以下三种值:

(1)acks = 0:producer 不会期待任何来自 broker 的响应。

特点:低提早,高吞吐,数据可能会失落。

如果当中呈现问题,导致 broker 没有收到音讯,那么 producer 无从得悉,会造成音讯失落。

(2)acks = 1(默认值):只有集群中 partition 的 Leader 节点收到音讯,生产者就会收到一个来自服务器的胜利响应。

如果在 follower 同步之前,leader 呈现故障,将会失落数据。

此时的吞吐量次要取决于应用的是 同步发送 还是 异步发送,吞吐量还受到发送中音讯数量的限度,例如 producer 在收到 broker 响应之前能够发送多少个音讯。

(3)acks = -1:只有当所有参加复制的节点全副都收到音讯时,生产者才会收到一个来自服务器的胜利响应。

这种模式是最平安的,能够保障不止一个服务器收到音讯,就算有服务器产生解体,整个集群仍然能够运行。

依据理论的利用场景,抉择设置不同的 acks,以此保证数据的可靠性。

另外,Producer 发送音讯还能够抉择同步或异步模式, 如果设置成异步,尽管会极大的进步音讯发送的性能,然而这样会减少失落数据的危险。如果须要确保音讯的可靠性,必须将 producer.type 设置为 sync。

# 同步模式
producer.type=sync 
#异步模式
producer.type=async 

16、说一下什么是正本?

kafka 为了保证数据不失落,从 0.8.0 版本开始引入了分区正本机制。在创立 topic 的时候指定 replication-factor, 默认正本为 3。

正本是绝对 partition 而言的,一个分区中蕴含一个或多个正本,其中一个为 leader 正本,其余为 follower 正本,各个正本位于不同的 broker 节点中。

所有的读写操作都是通过 Leader 进行的,同时 follower 会定期地去 leader 上复制数据。当 Leader 挂掉之后,其中一个 follower 会从新成为新的 Leader。通过分区正本,引入了数据冗余,同时也提供了 Kafka 的数据可靠性。

Kafka 的分区多正本架构是 Kafka 可靠性保障的外围,把音讯写入多个正本能够使 Kafka 在产生解体时仍能保障音讯的持久性。

17、Kafka 的 ISR 机制

在分区中,所有正本统称为 AR,Leader 保护了一个动静的 in-sync replica(ISR),ISR 是指与 leader 正本放弃同步状态的正本汇合。当然 leader 正本自身也是这个汇合中的一员。

当 ISR 中的 follower 实现数据同步之后,leader 就会给 follower 发送 ack , 如果其中一个 follower 长时间未向 leader 同步数据,该 follower 将会被踢出 ISR 汇合,该工夫阈值由 replica.log.time.max.ms 参数设定。当 leader 产生故障后,就会从 ISR 汇合中从新选举出新的 leader。

18、LEO、HW、LSO、LW 别离代表什么?

LEO:是 LogEndOffset 的简称,代表以后日志文件中下一条。

HW:水位或水印一词,也可称为高水位(high watermark), 通常被用在流式解决畛域(flink、spark),以表征元素或事件在基于工夫层面上的停顿。在 kafka 中,水位的概念与工夫无关,而是与地位信息相干。严格来说,它示意的就是地位信息,即位移(offset)。取 partition 对应的 ISR 中最小的 LEO 作为 HW,consumer 最多只能生产到 HW 所在的上一条信息。

LSO: 是 LastStableOffset 的简称,对未实现的事务而言,LSO 的值等于事务中第一条音讯的地位(firstUnstableOffset),对已实现的事务而言,它的值同 HW 雷同。

LW: Low Watermark 低水位,代表 AR 汇合中最小的 logStartOffset 值。

19、如何进行 Leader 正本选举?

每个分区的 leader 会保护一个 ISR 汇合,ISR 列表外面就是 follower 正本的 Borker 编号,只有“跟得上”Leader 的 follower 正本能力退出到 ISR 外面,这个是通过 replica.lag.time.max.ms 参数配置的。只有 ISR 里的成员才有被选为 leader 的可能。

所以当 Leader 挂掉了,而且 unclean.leader.election.enable=false 的状况下,Kafka 会从 ISR 列表中抉择 第一个 follower 作为新的 Leader,因为这个分区领有最新的曾经 committed 的音讯。通过这个能够保障曾经 committed 的音讯的数据可靠性。

20、如何进行 broker Leader 选举?

(1) 在 kafka 集群中,会有多个 broker 节点,集群中第一个启动的 broker 会通过在 zookeeper 中创立长期节点 /controller 来让本人成为控制器,其余 broker 启动时也会在 zookeeper 中创立长期节点,然而发现节点曾经存在,所以它们会收到一个异样,意识到控制器曾经存在,那么就会在 zookeeper 中创立 watch 对象,便于它们收到控制器变更的告诉。

(2) 如果集群中有一个 broker 产生异样退出了,那么控制器就会查看这个 broker 是否有分区的正本 leader,如果有那么这个分区就须要一个新的 leader,此时控制器就会去遍历其余正本,决定哪一个成为新的 leader,同时更新分区的 ISR 汇合。

(3) 如果有一个 broker 退出集群中,那么控制器就会通过 Broker ID 去判断新退出的 broker 中是否含有现有分区的正本,如果有,就会从分区正本中去同步数据。

(4) 集群中每选举一次控制器,就会通过 zookeeper 创立一个 controller epoch,每一个选举都会创立一个更大,蕴含最新信息的 epoch,如果有 broker 收到比这个 epoch 旧的数据,就会疏忽它们,kafka 也通过这个 epoch 来避免集群产生“脑裂”。

21、Kafka 事务

Kafka 在 0.11 版本引入事务反对,事务能够保障 Kafka 在 Exactly Once 语义的根底上,生产和生产能够跨分区和会话,要么全副胜利,要么全副失败。

Producer 事务

为了实现跨分区跨会话事务,须要引入一个全局惟一的 Transaction ID, 并将 Producer 获取的 PID 和 Transaction ID 绑定。这样当 Producer 重启后就能够通过正在进行的 Transaction ID 获取原来的 PID。

为了治理 Transaction,Kafka 引入了一个新的组件 Transaction Coordinator。Producer 就是通过和 Transaction Coordinator 交互取得 Transaction ID 对应的工作状态。Transaction Coordinator 还负责将事务所有写入 Kafka 的一个外部 Topic,这样即便整个服务重启,因为事务状态失去保留,进行中的事务状态能够失去复原,从而持续进行。

Consumer 事务

上述事务机制次要是从 Producer 方面思考,对于 Consumer 而言,事务的保障就会绝对较弱,尤其是无奈保障 Commit 的信息被准确生产。这是因为 Consumer 能够通过 offset 拜访任意信息,而且不同的 Segment File 生命周期不同,同一事务的音讯可能会呈现重启后被删除的状况。

22、Kafka 的消费者组跟分区之间有什么关系?

(1)在 Kafka 中,通过消费者组治理消费者,假如一个主题中蕴含 4 个分区,在一个消费者组中只有一个消费者。那消费者将收到全副 4 个分区的音讯。

(2)如果存在两个消费者,那么四个分区将依据分区调配策略调配个两个消费者。

(3)如果存在四个消费者,将平均分配,每个消费者生产一个分区。

(4)如果存在 5 个消费者,就会呈现消费者数量多于分区数量,那么多余的消费者将会被闲置,不会接管到任何信息。

23、如何保障每个应用程序都能够获取到 Kafka 主题中的所有音讯,而不是局部音讯?

为每个应用程序创立一个消费者组,而后往组中增加消费者来伸缩读取能力和解决能力,每个群组生产主题中的音讯时,互不烦扰。

24、如何实现 kafka 消费者每次只生产指定数量的音讯?

写一个队列,把 consumer 作为队列类的一个属性,而后减少一个生产计数的计数器,当达到指定数量时,敞开 consumer。

25、Kafka 如何实现多线程的生产?

kafka 容许同组的多个 partition 被一个 consumer 生产,但不容许一个 partition 被同组的多个 consumer 生产。

实现多线程步骤如下:

生产者随机分区提交数据 (自定义随机分区)。
消费者批改单线程模式为多线程,在生产方面得留神,得遍历所有分区,否则还是只生产了一个区。

26、Kafka 生产反对几种生产模式?

kafka 生产音讯时反对三种模式:

at most once 模式 最多一次。保障每一条音讯 commit 胜利之后,再进行生产解决。音讯可能会失落,但不会反复。

at least once 模式 至多一次。保障每一条音讯解决胜利之后,再进行 commit。音讯不会失落,但可能会反复。

exactly once 模式 准确传递一次。将 offset 作为惟一 id 与音讯同时解决,并且保障解决的原子性。音讯只会解决一次,不失落也不会反复。但这种形式很难做到。

kafka 默认的模式是 at least once,但这种模式可能会产生反复生产的问题,所以在业务逻辑必须做幂等设计。

在业务场景保留数据时应用了 INSERT INTO …ON DUPLICATE KEY UPDATE 语法,不存在时插入,存在时更新,是人造反对幂等性的。

27、Kafka 如何保证数据的不反复和不失落?

1、Exactly once 模式 准确传递一次。将 offset 作为惟一 id 与音讯同时解决,并且保障解决的原子性。音讯只会解决一次,不失落也不会反复。但这种形式很难做到。
kafka 默认的模式是 at least once,但这种模式可能会产生反复生产的问题,所以在业务逻辑必须做幂等设计。

2、幂等性:Producer 在生产发送音讯时,难免会反复发送音讯。Producer 进行 retry 时会产生重试机制,产生音讯反复发送。而引入幂等性后,反复发送只会生成一条无效的音讯。

具体实现:每个 Producer 在初始化时都会被调配一个惟一的 PID,这个 PID 对利用是通明的,齐全没有裸露给用户。对于一个给定的 PID,sequence number 将会从 0 开始自增。Producer 在发送数据时,将会给每条 msg 标识一个 sequence number,broker 也就是通过这个来验证数据是否反复。这里的 PID 是全局惟一的,Producer 故障后重新启动后会被调配一个新的 PID,这也是幂等性无奈做到跨会话的一个起因。broker 上每个 Topic-Partition 也会保护 pid-seq 的映射,并且每次 Commit 都会更新 lastSeq。这样 Record Batch 到来时,broker 会先查看 Record Batch 再保留数据。如果 batch 中 baseSeq(第一条音讯的 seq)比 Broker 保护的序号 (lastSeq) 大 1,则保留数据,否则不保留。

3、应用 Exactly Once + 幂等,能够保证数据不反复,不失落。

28、Kafka 是如何清理过期数据的?

kafka 将数据长久化到了硬盘上,容许你配置肯定的策略对数据清理,清理的策略有两个,删除和压缩。

数据清理的形式

1、删除

log.cleanup.policy=delete 启用删除策略

间接删除,删除后的音讯不可复原。可配置以下两个策略:

# 清理超过指定工夫清理:log.retention.hours=16
#超过指定大小后,删除旧的音讯:log.retention.bytes=1073741824

为了防止在删除时阻塞读操作,采纳了 copy-on-write 模式的实现,删除操作进行时,读取操作的二分查找性能理论是在一个动态的快照正本上进行的,这相似于 Java 的 CopyOnWriteArrayList。

2、压缩

将数据压缩,只保留每个 key 最初一个版本的数据。

首先在 broker 的配置中设置 log.cleaner.enable=true 启用 cleaner,这个默认是敞开的。

在 topic 的配置中设置 log.cleanup.policy=compact 启用压缩策略。

退出移动版