关于kafka:kafka的基本概念

30次阅读

共计 4865 个字符,预计需要花费 13 分钟才能阅读完成。

一、基本概念

1、broker

broker 指的一个 kafka 服务器,一个 kafka 集群是由多个 kafka broker 组成。

2、producer

producer 指的是音讯生产者,即发送音讯到 kafka broker 的客户端。

3、consumer

consumer 指的音讯消费者,即从 kafka broker 获取音讯的客户端。

4、cousumer group

consumer group 指的是消费者组,领有雷同的 group id 的消费者形成一个消费者组。

  1. 消费者组与消费者之间互不影响。
  2. 消费者组内的每个消费者负责生产不同的分区的数据。
  3. 一个分区只能有同一个分区中的一个消费者生产

5、topic

topic 指的是主题。生产者生产音讯、消费者生产音讯,都须要指定一个 topic。

6、partition

partition 指的是分区。

  1. 一个 topic 能够存在多个分区。
  2. 一个分区,只能被同一个音讯者组中的某一个消费者生产。
  3. 每个分区上的音讯都是有序的 ,然而主题(topic) 上的音讯不是有序的。
  4. 每个分区可能存在多个 follower, 其中 负责读 / 写 的是leader
  5. 每个 kafka broker 能够是以后分区的 leader,也能够是其它分区的follower
  6. 多个分区能够进步程序的并发性,因为一个分区只能一个分区音讯,多个分区能够多个消费者同时生产。

7、replicas

replicas 指的是正本。数据冗余,保障集群的可用性。

  1. leader 指的是 topic 主题的分区中,每个分区的 。生产者发送数据、消费者生产数据都是从 leader 分区 中操作的。
  2. follower 指的是 topic主题的分区中,每个分区的 。负责从leader 分区 同步数据,当 leader 宕机时,follower 可能成为新的 leader。默认是从 ISR 中进行选举。设置 unclean.leader.election.enable = true 能够从非 ISR 节点中进行选举,这样能够导致失落数据,默认值是false,倡议是false
  3. ISR(in-sync replicas)

    1. 与 leader 放弃同步的 follower 汇合
    2. 如果 follower 在超过 replica.lag.time.max.ms 毫秒,没有与 leader 进行同步,则踢出 ISR。

  4. OSR(out-sync replicas)

    1. 落后 leader 太多的正本
  5. AR

    1. ISR + OSR

是否容许非 ISR 的正本参加选举。

二、生产者分区策略

即生产者,发送发送音讯后,该音讯保留到 kafka broker 上的 topic 上的那个 partition 上。

1、默认分区策略

默认的分区策略应用的是 org.apache.kafka.clients.producer.internals.DefaultPartitioner 这个类。
即咱们代码中应用 org.apache.kafka.clients.producer.ProducerRecord 发送音讯时。

  1. 在指定了 partition 的状况下,间接应用指定的 分区。
  2. 没有指定 partition 但指定了 key 的状况下,将 key 的 hash 值与 topic 的 partition

数进行取余失去 partition 值。

  1. 没有指 partitionkey, 在一个批次满的时候会发送到同一个分区,当一个新的批次创立时,会发送到另外的分区中。能够通过 KIP-480 获取更多的粘性分区常识。

2、自定义分区策略

  1. 实现 org.apache.kafka.clients.producer.Partitioner 接口。
  2. 代码实现
Properties prop = new Properties();
prop.put(ProducerConfig.PARTITIONER_CLASS_CONFIG,"自定义实现的分区器的全门路");
KafkaProducer<String, String> producer = new KafkaProducer<>(prop);

三、生产者 acks 应答机制

1、acks = 0

producer不期待 broker 返回 ack 信息,只是将音讯退出到 socket 的缓冲中,可能还未发送进来。

  1. 重试 (retries) 不会失效。
  2. 为每个记录返回的偏移量始终是-1
  3. 可能会存在 失落数据 的状况。

2、acks = 1

producer期待 broker 返回 ack 信息,只有这个 partitionleader将音讯胜利保留后返回ack

  1. 如果 leader 返回 ack 后, 然而还未同步给 follower,此时leader 宕机了,可能会呈现 失落数据 的状况。

3、acks = [all | -1]

producer期待 broker 返回 ack 信息,partitionleaderfollower全副将音讯保留胜利后,才返回 ack 信息。

  • 如果在 leaderfollower将音讯都保留结束,然而在返回 ack 之前,leader宕机了,此时可能导致 音讯反复
  • 此处指的是 所有的 follower 都不同完,而不是半数以上同步完,才发送 ack。

      
      - 因为,当须要容忍 `n` 台节点故障时,` 半数以上 ` 须要 `2n+1` 个正本,而 ` 所有 follower` 都同步完,只须要 `n+1` 个正本。

四、日志文件的 HW 和 LEO

1、HW

HW:即 High Watermark,指的是该分区 (partition) 中所有正本(leader+follower)中 最小的 LEO

  • 下面中所有的正本:指的是 该分区的 ISR 汇合。
  • HW 之前的音讯是可生产的,比方 HW=5, 那么能够生产的 offset 是 [0-4] 不包含 5.

2、LEO

LEO:即 Log End Offset,即以后日志文件中,下一条 音讯待写入的 offset。

3、举例说明

比方,一个分区下有 3 个 ISR,一个 leader 和 2 个 follower。
leader 的 leo 是 6
follower01 的 leo 是 5
follower02 的 leo 是 4
那么该分区的 hw 是 4,即最小的 leo。

五、消费者的分区策略

咱们晓得,咱们的消费者 (consumer) 是隶属于消费者组 (consumer group) 的,消费者组中能够存在多个消费者,每个消费者能够生产多个主题 (topic),每个主题又存在多个分区(partition),每个分区只能由一个消费者来生产,每个消费者又能够生产多个分区。那么就必然波及到 如何将某个分区调配给那个消费者生产

1、RangeAssignor – 基于订阅的 topic 来调配
      默认分区策略
2、RoundRobinAssignor – 基于 consumer group 来调配, 可能呈现误生产别的主题的状况

eg:
     topicA 存在 0,1,2 三个分区
     topicB 也存在 0,1,2 三个分区
consumerA 和 consumerB 同属于一个组
    consumerA 订阅 topicA
    consumerB 订阅 tocpiA 和 topicB
此时依照 RoundRobinAssignor 策略,会先将所有的分区进行排序,则会产生 6 个主题分区对象(`TopicPartition`),因为 topicA 和 topicB 每个主题共有 3 个分区。此时就有可能将 topicB 的音讯发送给了 cousumerA,导致生产谬误。

3、StickyAssignor –
4、CooperativeStickyAssignor –

六、消费者 offset 的保护

1、主动提交 offset

Properties prop = new Properties();
// 设置主动提交 offset
prop.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,true);
// 主动提交 offset 的距离,单位 ms
prop.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG,1000);
KafkaConsumer<String,String> consumer = new KafkaConsumer<>(prop);

2、手动提交 offset

consumer.commitAsync():异步提交
consumer.commitSync():同步提交

Properties prop = new Properties();
// 设置手动提交 offset
prop.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(prop);
consumer.subscribe(Arrays.asList("topicA", "topicB"));
while (true) {ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(500));
    for (ConsumerRecord<String, String> record : records) {System.out.println("offset:" + record.offset() + "value:" + record.value());
    }
    // 异步提交 offset
    consumer.commitAsync();
    // 同步提交 offset
    // consumer.commitSync();}

3、自定义提交 offset

须要实现 org.apache.kafka.clients.consumer.ConsumerRebalanceListener 接口,在消费者产生 rebalance 时,保留或获取自定义的 offset。

七、拦截器

1、生产者拦截器

实现 org.apache.kafka.clients.producer.ProducerInterceptor 接口。能够实现音讯发送到 kafka broker 之前的音讯拦挡。

1、onSend(ProducerRecord<K, V> record)办法

该办法运行在主线程中,咱们能够在该办法中对生产进行任何操作,然而最好 ` 不要批改 topic` 和 `partition`,否则可能影响音讯指标分区的计算。

2、onAcknowledgement(RecordMetadata metadata, Exception exception)

该办法运行在 `producer 的 I/O` 线程中,因而不要执行一些比拟耗时的操作,否则会拖慢 producer 的发送音讯的速度。该办法在音讯发送 kafka broker 之后返回 ack 之后执行或发送到 kafka broker 的过程中产生异样执行。

须要本人保障实现的拦截器的线程平安问题。

Properties prop = new Properties();
prop.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, Collections.singletonList("实现 ProducerInterceptor 类的全类名"));
KafkaProducer<String, String> producer = new KafkaProducer<>(prop);

2、消费者拦截器

实现 org.apache.kafka.clients.consumer.ConsumerInterceptor 类,能够对从 kafka broker 获取到的音讯进行拦挡。和 consumer#poll 运行在同一个线程中。

Properties prop = new Properties();
prop.put(ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG, Collections.singletonList("实现 ConsumerInterceptor 类的全类名"));
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(prop);

八、参考文档

1、http://kafka.apache.org/documentation/#gettingStarted

正文完
 0