一、基本概念

1、broker

broker 指的一个kafka服务器,一个kafka集群是由多个 kafka broker 组成。

2、producer

producer 指的是音讯生产者,即发送音讯到 kafka broker 的客户端。

3、consumer

consumer 指的音讯消费者,即从 kafka broker 获取音讯的客户端。

4、cousumer group

consumer group 指的是消费者组,领有雷同的 group id 的消费者形成一个消费者组。

  1. 消费者组与消费者之间互不影响。
  2. 消费者组内的每个消费者负责生产不同的分区的数据。
  3. 一个分区只能有同一个分区中的一个消费者生产

5、topic

topic 指的是主题。生产者生产音讯、消费者生产音讯,都须要指定一个topic。

6、partition

partition 指的是分区。

  1. 一个 topic 能够存在多个分区。
  2. 一个分区,只能被同一个音讯者组中的某一个消费者生产。
  3. 每个分区上的音讯都是有序的,然而主题(topic)上的音讯不是有序的。
  4. 每个分区可能存在多个follower,其中负责读/写的是leader
  5. 每个kafka broker能够是以后分区的leader,也能够是其它分区的follower
  6. 多个分区能够进步程序的并发性,因为一个分区只能一个分区音讯,多个分区能够多个消费者同时生产。

7、replicas

replicas 指的是正本。数据冗余,保障集群的可用性。

  1. leader 指的是topic主题的分区中,每个分区的 。生产者发送数据、消费者生产数据都是从 leader分区 中操作的。
  2. follower 指的是 topic主题的分区中,每个分区的 。负责从leader分区同步数据,当 leader 宕机时,follower 可能成为新的 leader。默认是从 ISR 中进行选举。设置unclean.leader.election.enable = true能够从非ISR节点中进行选举,这样能够导致失落数据,默认值是false,倡议是false
  3. ISR(in-sync replicas)

    1. 与leader放弃同步的follower汇合
    2. 如果follower在超过 replica.lag.time.max.ms 毫秒,没有与leader进行同步,则踢出ISR。

  4. OSR(out-sync replicas)

    1. 落后leader太多的正本
  5. AR

    1. ISR + OSR

是否容许非ISR的正本参加选举。

二、生产者分区策略

即生产者,发送发送音讯后,该音讯保留到kafka broker上的topic上的那个partition上。

1、默认分区策略

默认的分区策略应用的是 org.apache.kafka.clients.producer.internals.DefaultPartitioner 这个类。
即咱们代码中应用org.apache.kafka.clients.producer.ProducerRecord发送音讯时。

  1. 在指定了 partition 的状况下,间接应用指定的 分区。
  2. 没有指定 partition 但指定了key的状况下,将key的hash值与topic的partition

数进行取余失去partition值。

  1. 没有指partitionkey,在一个批次满的时候会发送到同一个分区,当一个新的批次创立时,会发送到另外的分区中。能够通过 KIP-480 获取更多的粘性分区常识。

2、自定义分区策略

  1. 实现 org.apache.kafka.clients.producer.Partitioner 接口。
  2. 代码实现
Properties prop = new Properties();prop.put(ProducerConfig.PARTITIONER_CLASS_CONFIG,"自定义实现的分区器的全门路");KafkaProducer<String, String> producer = new KafkaProducer<>(prop);

三、生产者 acks 应答机制

1、acks = 0

producer不期待broker返回ack信息,只是将音讯退出到socket的缓冲中,可能还未发送进来。

  1. 重试(retries)不会失效。
  2. 为每个记录返回的偏移量始终是-1
  3. 可能会存在失落数据的状况。

2、acks = 1

producer期待broker返回ack信息,只有这个partitionleader将音讯胜利保留后返回ack

  1. 如果leader返回ack后,然而还未同步给follower,此时leader宕机了,可能会呈现失落数据的状况。

3、acks = [all | -1]

producer期待broker返回ack信息,partitionleaderfollower全副将音讯保留胜利后,才返回ack信息。

  • 如果在leaderfollower将音讯都保留结束,然而在返回ack之前,leader宕机了,此时可能导致 音讯反复
  • 此处指的是 所有的follower 都不同完,而不是半数以上同步完,才发送ack。

        - 因为,当须要容忍 `n` 台节点故障时,`半数以上` 须要`2n+1`个正本,而`所有follower`都同步完,只须要`n+1`个正本。            

四、日志文件的 HW 和 LEO

1、HW

HW : 即 High Watermark,指的是该分区(partition)中所有正本(leader+follower)中最小的LEO

  • 下面中所有的正本:指的是 该分区的 ISR汇合。
  • HW 之前的音讯是可生产的,比方 HW=5,那么能够生产的 offset 是 [0-4] 不包含5.

2、LEO

LEO :即 Log End Offset,即以后日志文件中,下一条音讯待写入的 offset。

3、举例说明

比方,一个分区下有3个ISR,一个leader和2个follower。
leader 的leo 是 6
follower01 的leo是 5
follower02 的 leo 是 4
那么该分区的 hw 是 4,即最小的leo。

五、消费者的分区策略

咱们晓得,咱们的消费者(consumer)是隶属于消费者组(consumer group)的,消费者组中能够存在多个消费者,每个消费者能够生产多个主题(topic),每个主题又存在多个分区(partition),每个分区只能由一个消费者来生产,每个消费者又能够生产多个分区。那么就必然波及到 如何将某个分区调配给那个消费者生产

1、RangeAssignor - 基于订阅的 topic来调配
     默认分区策略
2、RoundRobinAssignor - 基于consumer group来调配,可能呈现误生产别的主题的状况

eg:     topicA 存在 0,1,2 三个分区     topicB 也存在 0,1,2三个分区consumerA 和 consumerB 同属于一个组    consumerA 订阅 topicA    consumerB 订阅 tocpiA 和 topicB此时依照 RoundRobinAssignor 策略,会先将所有的分区进行排序,则会产生6个主题分区对象(`TopicPartition`),因为topicA和topicB 每个主题共有3个分区。此时就有可能将topicB的音讯发送给了cousumerA,导致生产谬误。

3、StickyAssignor -
4、CooperativeStickyAssignor -

六、消费者offset的保护

1、主动提交offset

Properties prop = new Properties();// 设置主动提交offsetprop.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,true);// 主动提交offset的距离,单位msprop.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG,1000);KafkaConsumer<String,String> consumer = new KafkaConsumer<>(prop);

2、手动提交offset

consumer.commitAsync() : 异步提交
consumer.commitSync() : 同步提交

Properties prop = new Properties();// 设置手动提交offsetprop.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);KafkaConsumer<String, String> consumer = new KafkaConsumer<>(prop);consumer.subscribe(Arrays.asList("topicA", "topicB"));while (true) {    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(500));    for (ConsumerRecord<String, String> record : records) {        System.out.println("offset: " + record.offset() + "value:" + record.value());    }    // 异步提交offset    consumer.commitAsync();    // 同步提交offset    // consumer.commitSync();}

3、自定义提交offset

须要实现 org.apache.kafka.clients.consumer.ConsumerRebalanceListener 接口,在消费者产生 rebalance 时,保留或获取自定义的offset。

七、拦截器

1、生产者拦截器

实现org.apache.kafka.clients.producer.ProducerInterceptor接口。能够实现音讯发送到kafka broker之前的音讯拦挡。

1、onSend(ProducerRecord<K, V> record)办法

该办法运行在主线程中,咱们能够在该办法中对生产进行任何操作,然而最好`不要批改topic`和`partition`,否则可能影响音讯指标分区的计算。

2、onAcknowledgement(RecordMetadata metadata, Exception exception)

该办法运行在`producer 的 I/O`线程中,因而不要执行一些比拟耗时的操作,否则会拖慢producer的发送音讯的速度。该办法在音讯发送kafka broker之后返回ack之后执行或发送到kafka broker的过程中产生异样执行。

须要本人保障实现的拦截器的线程平安问题。

Properties prop = new Properties();prop.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, Collections.singletonList("实现ProducerInterceptor类的全类名"));KafkaProducer<String, String> producer = new KafkaProducer<>(prop);

2、消费者拦截器

实现org.apache.kafka.clients.consumer.ConsumerInterceptor类,能够对从kafka broker获取到的音讯进行拦挡。和consumer#poll运行在同一个线程中。

Properties prop = new Properties();prop.put(ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG, Collections.singletonList("实现ConsumerInterceptor类的全类名"));KafkaConsumer<String, String> consumer = new KafkaConsumer<>(prop);

八、参考文档

1、http://kafka.apache.org/documentation/#gettingStarted