共计 4865 个字符,预计需要花费 13 分钟才能阅读完成。
一、基本概念
1、broker
broker
指的一个 kafka
服务器,一个 kafka 集群是由多个 kafka broker 组成。
2、producer
producer
指的是音讯生产者,即发送音讯到 kafka broker
的客户端。
3、consumer
consumer
指的音讯消费者,即从 kafka broker
获取音讯的客户端。
4、cousumer group
consumer group
指的是消费者组,领有雷同的 group id
的消费者形成一个消费者组。
- 消费者组与消费者之间互不影响。
- 消费者组内的每个消费者负责生产不同的分区的数据。
一个分区只能有同一个分区中的一个消费者生产
。
5、topic
topic
指的是主题。生产者生产音讯、消费者生产音讯,都须要指定一个 topic。
6、partition
partition
指的是分区。
- 一个
topic
能够存在多个分区。 - 一个分区,只能被同一个音讯者组中的某一个消费者生产。
每个分区上的音讯都是有序的
,然而主题(topic
) 上的音讯不是有序的。- 每个分区可能存在多个
follower
, 其中负责读 / 写
的是leader
。 - 每个
kafka broker
能够是以后分区的 leader,也能够是其它分区的follower
。 - 多个分区能够进步程序的并发性,因为一个分区只能一个分区音讯,多个分区能够多个消费者同时生产。
7、replicas
replicas
指的是正本。数据冗余,保障集群的可用性。
leader
指的是topic
主题的分区中,每个分区的主
。生产者发送数据、消费者生产数据都是从leader 分区
中操作的。follower
指的是topic
主题的分区中,每个分区的从
。负责从leader 分区
同步数据,当 leader 宕机时,follower 可能成为新的 leader。默认是从ISR
中进行选举。设置unclean.leader.election.enable = true
能够从非ISR
节点中进行选举,这样能够导致失落数据,默认值是false
,倡议是false
。-
ISR(in-sync replicas)
- 与 leader 放弃同步的 follower 汇合
- 如果 follower 在超过
replica.lag.time.max.ms
毫秒,没有与 leader 进行同步,则踢出 ISR。
-
OSR(out-sync replicas)
- 落后 leader 太多的正本
-
AR
- ISR + OSR
是否容许非 ISR 的正本参加选举。
二、生产者分区策略
即生产者,发送发送音讯后,该音讯保留到 kafka broker
上的 topic
上的那个 partition
上。
1、默认分区策略
默认的分区策略应用的是 org.apache.kafka.clients.producer.internals.DefaultPartitioner
这个类。
即咱们代码中应用 org.apache.kafka.clients.producer.ProducerRecord
发送音讯时。
- 在指定了
partition
的状况下,间接应用指定的 分区。 - 没有指定
partition
但指定了key
的状况下,将 key 的 hash 值与 topic 的 partition
数进行取余失去 partition 值。
- 没有指
partition
和key
, 在一个批次满的时候会发送到同一个分区,当一个新的批次创立时,会发送到另外的分区中。能够通过KIP-480
获取更多的粘性分区常识。
2、自定义分区策略
- 实现
org.apache.kafka.clients.producer.Partitioner
接口。 - 代码实现
Properties prop = new Properties();
prop.put(ProducerConfig.PARTITIONER_CLASS_CONFIG,"自定义实现的分区器的全门路");
KafkaProducer<String, String> producer = new KafkaProducer<>(prop);
三、生产者 acks 应答机制
1、acks = 0
producer
不期待 broker
返回 ack
信息,只是将音讯退出到 socket 的缓冲中,可能还未发送进来。
- 重试 (
retries
) 不会失效。 - 为每个记录返回的偏移量始终是
-1
。 - 可能会存在
失落数据
的状况。
2、acks = 1
producer
期待 broker
返回 ack
信息,只有这个 partition
的leader
将音讯胜利保留后返回ack
。
- 如果
leader
返回ack
后, 然而还未同步给follower
,此时leader
宕机了,可能会呈现失落数据
的状况。
3、acks = [all | -1]
producer
期待 broker
返回 ack
信息,partition
的 leader
和follower
全副将音讯保留胜利后,才返回 ack
信息。
- 如果在
leader
和follower
将音讯都保留结束,然而在返回ack
之前,leader
宕机了,此时可能导致音讯反复
。 -
此处指的是 所有的 follower 都不同完,而不是半数以上同步完,才发送 ack。
- 因为,当须要容忍 `n` 台节点故障时,` 半数以上 ` 须要 `2n+1` 个正本,而 ` 所有 follower` 都同步完,只须要 `n+1` 个正本。
四、日志文件的 HW 和 LEO
1、HW
HW
:即 High Watermark,指的是该分区 (partition
) 中所有正本(leader+follower)中 最小的 LEO
。
- 下面中所有的正本:指的是 该分区的
ISR 汇合。
HW 之前的音讯是可生产的
,比方 HW=5, 那么能够生产的 offset 是 [0-4] 不包含 5.
2、LEO
LEO
:即 Log End Offset,即以后日志文件中,下一条
音讯待写入的 offset。
3、举例说明
比方,一个分区下有 3 个 ISR,一个 leader 和 2 个 follower。
leader 的 leo 是 6
follower01 的 leo 是 5
follower02 的 leo 是 4
那么该分区的 hw 是 4,即最小的 leo。
五、消费者的分区策略
咱们晓得,咱们的消费者 (consumer
) 是隶属于消费者组 (consumer group
) 的,消费者组中能够存在多个消费者,每个消费者能够生产多个主题 (topic
),每个主题又存在多个分区(partition
),每个分区只能由一个消费者来生产,每个消费者又能够生产多个分区。那么就必然波及到 如何将某个分区调配给那个消费者生产。
1、RangeAssignor – 基于订阅的 topic 来调配
默认分区策略
2、RoundRobinAssignor – 基于 consumer group 来调配, 可能呈现误生产别的主题的状况
eg:
topicA 存在 0,1,2 三个分区
topicB 也存在 0,1,2 三个分区
consumerA 和 consumerB 同属于一个组
consumerA 订阅 topicA
consumerB 订阅 tocpiA 和 topicB
此时依照 RoundRobinAssignor 策略,会先将所有的分区进行排序,则会产生 6 个主题分区对象(`TopicPartition`),因为 topicA 和 topicB 每个主题共有 3 个分区。此时就有可能将 topicB 的音讯发送给了 cousumerA,导致生产谬误。
3、StickyAssignor –
4、CooperativeStickyAssignor –
六、消费者 offset 的保护
1、主动提交 offset
Properties prop = new Properties();
// 设置主动提交 offset
prop.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,true);
// 主动提交 offset 的距离,单位 ms
prop.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG,1000);
KafkaConsumer<String,String> consumer = new KafkaConsumer<>(prop);
2、手动提交 offset
consumer.commitAsync()
:异步提交 consumer.commitSync()
:同步提交
Properties prop = new Properties();
// 设置手动提交 offset
prop.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(prop);
consumer.subscribe(Arrays.asList("topicA", "topicB"));
while (true) {ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(500));
for (ConsumerRecord<String, String> record : records) {System.out.println("offset:" + record.offset() + "value:" + record.value());
}
// 异步提交 offset
consumer.commitAsync();
// 同步提交 offset
// consumer.commitSync();}
3、自定义提交 offset
须要实现 org.apache.kafka.clients.consumer.ConsumerRebalanceListener
接口,在消费者产生 rebalance
时,保留或获取自定义的 offset。
七、拦截器
1、生产者拦截器
实现 org.apache.kafka.clients.producer.ProducerInterceptor
接口。能够实现音讯发送到 kafka broker
之前的音讯拦挡。
1、onSend(ProducerRecord<K, V> record)办法
该办法运行在主线程中,咱们能够在该办法中对生产进行任何操作,然而最好 ` 不要批改 topic` 和 `partition`,否则可能影响音讯指标分区的计算。
2、onAcknowledgement(RecordMetadata metadata, Exception exception)
该办法运行在 `producer 的 I/O` 线程中,因而不要执行一些比拟耗时的操作,否则会拖慢 producer 的发送音讯的速度。该办法在音讯发送 kafka broker 之后返回 ack 之后执行或发送到 kafka broker 的过程中产生异样执行。
须要本人保障实现的拦截器的线程平安问题。
Properties prop = new Properties();
prop.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, Collections.singletonList("实现 ProducerInterceptor 类的全类名"));
KafkaProducer<String, String> producer = new KafkaProducer<>(prop);
2、消费者拦截器
实现 org.apache.kafka.clients.consumer.ConsumerInterceptor
类,能够对从 kafka broker
获取到的音讯进行拦挡。和 consumer#poll
运行在同一个线程中。
Properties prop = new Properties();
prop.put(ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG, Collections.singletonList("实现 ConsumerInterceptor 类的全类名"));
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(prop);
八、参考文档
1、http://kafka.apache.org/documentation/#gettingStarted