一、基本概念
1、broker
broker
指的一个kafka
服务器,一个kafka集群是由多个 kafka broker 组成。
2、producer
producer
指的是音讯生产者,即发送音讯到 kafka broker
的客户端。
3、consumer
consumer
指的音讯消费者,即从 kafka broker
获取音讯的客户端。
4、cousumer group
consumer group
指的是消费者组,领有雷同的 group id
的消费者形成一个消费者组。
- 消费者组与消费者之间互不影响。
- 消费者组内的每个消费者负责生产不同的分区的数据。
一个分区只能有同一个分区中的一个消费者生产
。
5、topic
topic
指的是主题。生产者生产音讯、消费者生产音讯,都须要指定一个topic。
6、partition
partition
指的是分区。
- 一个
topic
能够存在多个分区。 - 一个分区,只能被同一个音讯者组中的某一个消费者生产。
每个分区上的音讯都是有序的
,然而主题(topic
)上的音讯不是有序的。- 每个分区可能存在多个
follower
,其中负责读/写
的是leader
。 - 每个
kafka broker
能够是以后分区的leader,也能够是其它分区的follower
。 - 多个分区能够进步程序的并发性,因为一个分区只能一个分区音讯,多个分区能够多个消费者同时生产。
7、replicas
replicas
指的是正本。数据冗余,保障集群的可用性。
leader
指的是topic
主题的分区中,每个分区的主
。生产者发送数据、消费者生产数据都是从leader分区
中操作的。follower
指的是topic
主题的分区中,每个分区的从
。负责从leader分区
同步数据,当 leader 宕机时,follower 可能成为新的 leader。默认是从ISR
中进行选举。设置unclean.leader.election.enable = true
能够从非ISR
节点中进行选举,这样能够导致失落数据,默认值是false
,倡议是false
。ISR(in-sync replicas)
- 与leader放弃同步的follower汇合
- 如果follower在超过
replica.lag.time.max.ms
毫秒,没有与leader进行同步,则踢出ISR。
OSR(out-sync replicas)
- 落后leader太多的正本
AR
- ISR + OSR
是否容许非ISR的正本参加选举。
二、生产者分区策略
即生产者,发送发送音讯后,该音讯保留到kafka broker
上的topic
上的那个partition
上。
1、默认分区策略
默认的分区策略应用的是 org.apache.kafka.clients.producer.internals.DefaultPartitioner
这个类。
即咱们代码中应用org.apache.kafka.clients.producer.ProducerRecord
发送音讯时。
- 在指定了
partition
的状况下,间接应用指定的 分区。 - 没有指定
partition
但指定了key
的状况下,将key的hash值与topic的partition
数进行取余失去partition值。
- 没有指
partition
和key
,在一个批次满的时候会发送到同一个分区,当一个新的批次创立时,会发送到另外的分区中。能够通过KIP-480
获取更多的粘性分区常识。
2、自定义分区策略
- 实现
org.apache.kafka.clients.producer.Partitioner
接口。 - 代码实现
Properties prop = new Properties();prop.put(ProducerConfig.PARTITIONER_CLASS_CONFIG,"自定义实现的分区器的全门路");KafkaProducer<String, String> producer = new KafkaProducer<>(prop);
三、生产者 acks 应答机制
1、acks = 0
producer
不期待broker
返回ack
信息,只是将音讯退出到socket的缓冲中,可能还未发送进来。
- 重试(
retries
)不会失效。 - 为每个记录返回的偏移量始终是
-1
。 - 可能会存在
失落数据
的状况。
2、acks = 1
producer
期待broker
返回ack
信息,只有这个partition
的leader
将音讯胜利保留后返回ack
。
- 如果
leader
返回ack
后,然而还未同步给follower
,此时leader
宕机了,可能会呈现失落数据
的状况。
3、acks = [all | -1]
producer
期待broker
返回ack
信息,partition
的leader
和follower
全副将音讯保留胜利后,才返回ack
信息。
- 如果在
leader
和follower
将音讯都保留结束,然而在返回ack
之前,leader
宕机了,此时可能导致音讯反复
。 此处指的是 所有的follower 都不同完,而不是半数以上同步完,才发送ack。
- 因为,当须要容忍 `n` 台节点故障时,`半数以上` 须要`2n+1`个正本,而`所有follower`都同步完,只须要`n+1`个正本。
四、日志文件的 HW 和 LEO
1、HW
HW
: 即 High Watermark,指的是该分区(partition
)中所有正本(leader+follower)中最小的LEO
。
- 下面中所有的正本:指的是 该分区的
ISR汇合。
HW 之前的音讯是可生产的
,比方 HW=5,那么能够生产的 offset 是 [0-4] 不包含5.
2、LEO
LEO
:即 Log End Offset,即以后日志文件中,下一条
音讯待写入的 offset。
3、举例说明
比方,一个分区下有3个ISR,一个leader和2个follower。
leader 的leo 是 6
follower01 的leo是 5
follower02 的 leo 是 4
那么该分区的 hw 是 4,即最小的leo。
五、消费者的分区策略
咱们晓得,咱们的消费者(consumer
)是隶属于消费者组(consumer group
)的,消费者组中能够存在多个消费者,每个消费者能够生产多个主题(topic
),每个主题又存在多个分区(partition
),每个分区只能由一个消费者来生产,每个消费者又能够生产多个分区。那么就必然波及到 如何将某个分区调配给那个消费者生产。
1、RangeAssignor - 基于订阅的 topic来调配
默认分区策略
2、RoundRobinAssignor - 基于consumer group来调配,可能呈现误生产别的主题的状况
eg: topicA 存在 0,1,2 三个分区 topicB 也存在 0,1,2三个分区consumerA 和 consumerB 同属于一个组 consumerA 订阅 topicA consumerB 订阅 tocpiA 和 topicB此时依照 RoundRobinAssignor 策略,会先将所有的分区进行排序,则会产生6个主题分区对象(`TopicPartition`),因为topicA和topicB 每个主题共有3个分区。此时就有可能将topicB的音讯发送给了cousumerA,导致生产谬误。
3、StickyAssignor -
4、CooperativeStickyAssignor -
六、消费者offset的保护
1、主动提交offset
Properties prop = new Properties();// 设置主动提交offsetprop.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,true);// 主动提交offset的距离,单位msprop.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG,1000);KafkaConsumer<String,String> consumer = new KafkaConsumer<>(prop);
2、手动提交offset
consumer.commitAsync()
: 异步提交consumer.commitSync()
: 同步提交
Properties prop = new Properties();// 设置手动提交offsetprop.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);KafkaConsumer<String, String> consumer = new KafkaConsumer<>(prop);consumer.subscribe(Arrays.asList("topicA", "topicB"));while (true) { ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(500)); for (ConsumerRecord<String, String> record : records) { System.out.println("offset: " + record.offset() + "value:" + record.value()); } // 异步提交offset consumer.commitAsync(); // 同步提交offset // consumer.commitSync();}
3、自定义提交offset
须要实现 org.apache.kafka.clients.consumer.ConsumerRebalanceListener
接口,在消费者产生 rebalance
时,保留或获取自定义的offset。
七、拦截器
1、生产者拦截器
实现org.apache.kafka.clients.producer.ProducerInterceptor
接口。能够实现音讯发送到kafka broker
之前的音讯拦挡。
1、onSend(ProducerRecord<K, V> record)办法
该办法运行在主线程中,咱们能够在该办法中对生产进行任何操作,然而最好`不要批改topic`和`partition`,否则可能影响音讯指标分区的计算。
2、onAcknowledgement(RecordMetadata metadata, Exception exception)
该办法运行在`producer 的 I/O`线程中,因而不要执行一些比拟耗时的操作,否则会拖慢producer的发送音讯的速度。该办法在音讯发送kafka broker之后返回ack之后执行或发送到kafka broker的过程中产生异样执行。
须要本人保障实现的拦截器的线程平安问题。
Properties prop = new Properties();prop.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, Collections.singletonList("实现ProducerInterceptor类的全类名"));KafkaProducer<String, String> producer = new KafkaProducer<>(prop);
2、消费者拦截器
实现org.apache.kafka.clients.consumer.ConsumerInterceptor
类,能够对从kafka broker
获取到的音讯进行拦挡。和consumer#poll
运行在同一个线程中。
Properties prop = new Properties();prop.put(ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG, Collections.singletonList("实现ConsumerInterceptor类的全类名"));KafkaConsumer<String, String> consumer = new KafkaConsumer<>(prop);
八、参考文档
1、http://kafka.apache.org/documentation/#gettingStarted