关于kafka:kafka-系列-41消费者基本介绍

37次阅读

共计 7287 个字符,预计需要花费 19 分钟才能阅读完成。

1、消费者食用 DEMO

Properties prop = new Properties();
prop.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
prop.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
prop.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka:9092");
prop.put(ConsumerConfig.GROUP_ID_CONFIG, "testConsumer");
prop.put(ConsumerConfig.CLIENT_ID_CONFIG, "consumerDemo");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(prop);
consumer.subscribe(Collections.singleton("test"));
while (true) {ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
    for (ConsumerRecord<String, String> record : records) {String key = record.key();
        String value = record.value();
        System.err.println(record.toString());
    }
}

2、消费者基本概念

kafka 消费者是以 组为根本单位 进行生产的。生产的模型如下

1 个 topic 容许被多个 生产组 生产。再次强调,kafka 生产是以组为单位。

prop.put(ConsumerConfig.GROUP_ID_CONFIG, "testConsumer");

以上这行代码设置了生产组。

2.1、partition 调配

topic 为逻辑上的概念,partition 才是物理上的概念。那么看完这个以上的生产模型图。你可能会很纳闷。当一个组下有多个消费者时,每个消费者是如何生产的?

先阐明:partition 的调配为平均分配

假如一:topic1 上面有 3 个分区。别离如下:p1 – p3。那么 groupA 下的三个消费者生产的对应 partition 为如下

instance1: p1
instance2: p2
instance3: p3

假如二:topic1 上面有 8 个分区。别离为 p1 – p8。那么 groupA 中每个消费者调配到的 partition 就如下

instance1: p1,p2,p3
instance2: p4,p5,p6
instance3: p7,p8

2.2、partition 重调配

假如三:topic1 上面有 8 个分区:P1 – P8。groupA 有三个消费者:c1,c2,c3。此时调配的 partition 如下

c1: p1,p2,p3
c2: p4,p5,p6
c3: p7,p8

如果此时,又有一个新的消费者退出到 groupA 会产生什么呢?partition 会被重新分配

c1: p1,p2
c2: p3,p4
c3: p5,p6
c4: p7,p8

3、消费者端 API 介绍

3.1、订阅主题

void subscribe(Collection<String> topics);
void subscribe(Collection<String> topics, ConsumerRebalanceListener callback);

从办法上看 kafka 容许一个消费者订阅多个 topic

void subscribe(Pattern pattern);
void subscribe(Pattern pattern, ConsumerRebalanceListener callback);

入参 Pattern 则示意,能够应用正则表达式匹配多个 topic. 实例代码如下

Pattern pattern = Pattern.compile("test?");
consumer.subscribe(pattern);

能够订阅主题,那么天然也能够勾销订阅主题

consumer.unsubscribe();

当然,也能够间接获取到生产组订阅的主题

Set<String> topics = consumer.subscription();

一个主题上面有多个 partition, 那么是否能够指定要生产的队列呢?答案是能够的

TopicPartition p1 = new TopicPartition("test1", 0);
TopicPartition p2 = new TopicPartition("test1", 1);
consumer.assign(Arrays.asList(p1, p2));

不过须要留神的是,如果指定了生产的分区,那么是消费者是无奈主动 rebanlance 的。

3.2、音讯生产

ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));

从消费者端的这行代码,咱们能够看出,kafka 音讯生产采纳的是拉取模式。当未拉取到音讯时,会阻塞线程。

poll 办法返回的 ConsumerRecords 实现 Iterable 接口,是 ConsumerRecord 的迭代器。ConsumerRecord 属性绝对简略

public class ConsumerRecord<K, V> { 
    private final String topic; // 主题
    private final int partition; // 分区
    private final long offset; // 音讯所属分区偏移量
    private final long timestamp; // 工夫戳
    private final TimestampType timestampType; // 两者类型,音讯创立工夫戳及音讯追加到日志的工夫戳 
    private final int serializedKeySize;
    private final int serializedValueSize; 
    private final Headers headers; // 发送的 header
    private final K key;   // 发送的 key
    private final V value; // 发送的内容
    private volatile Long checksum; // CRC32 校验值
} 

3.3、位移提交

对于分区而言,音讯会有一个惟一 offset, 示意音讯在分区中的地位,称之为 偏移量 。对于音讯生产而言,也有生产进度的 offset,称之为 位移
kafka 将音讯的生产进度存储在 kafka 外部主题 __onsumer_offset 中。
kafka 中默认每隔 5s 保留音讯的生产进度。可通过 auto.commit.interval.ms 进行配置。

kafka 提供手动提交的 API,上面演示一下。

Properties prop = new Properties();
prop.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
prop.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
prop.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
prop.put(ConsumerConfig.GROUP_ID_CONFIG, "testConsumer");
prop.put(ConsumerConfig.CLIENT_ID_CONFIG, "consumerDemo");
prop.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(prop);
consumer.subscribe(Collections.singleton("test"));
while (true) {ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
    for (ConsumerRecord<String, String> record : records) {System.out.println("生产:" + record.toString());
    }
    consumer.commitSync();}

须要留神的是,须要将 enable.auto.commit 设置为 true.

3.4、设置新生产组从哪个地位开始生产

kafka 设置 新生产组 从哪个地位开始生产的配置为:auto.offset.reset
该配置有以下 3 个配置项

  • latest(默认配置)

默认从最新的地位,开始生产。

  • earliest

从最早的地位开始生产。当配置为该参数时,kafka 会打印如下日志:Resetting offset for partition

  • none

当生产组,没有对应生产进度时,会间接抛 NoOffsetForPartitionException 异样

kafka 还提供了 seek(TopicPartition partition, long offset) 办法,容许新的消费者,设置从哪个地位开始生产。

// 因为调配 分区的动作,产生在 pool 中,因而在设置生产偏移量时,须要先拉取音讯
Set<TopicPartition> assignment = new HashSet<>();

while (assignment.size() == 0) {consumer.poll(Duration.ofMillis(100));
    assignment = consumer.assignment();}

for (TopicPartition tp : assignment) {consumer.seek(tp, 50);
}
while (true) {ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
    for (ConsumerRecord<String, String> record : records) {System.err.println("生产:" + record.toString());
    }
}

更多状况下,咱们可能会指定生产组从指定的工夫点开始生产

Map<TopicPartition, Long> timestampToSearch = new HashMap<>();
for (TopicPartition tp : assignment) {
    // 指定从一天前开始生产
    timestampToSearch.put(tp, System.currentTimeMillis() - 1 * 24 * 3600 * 1000);
}

Map<TopicPartition, OffsetAndTimestamp> offsets = consumer.offsetsForTimes(timestampToSearch);

for (TopicPartition tp : assignment) {OffsetAndTimestamp timestamp = offsets.get(tp);
    if (null != timestamp) {consumer.seek(tp, timestamp.offset());
    }
}

3.5、分区再平衡

在分区再平衡期间,生产组内的消费者是无奈读取音讯的。并且如果之前的消费者没有及时提交生产进度,那么会造成反复生产。

kafkasubscribe 的时候,提供了回调函数,容许咱们在触发再平衡时,做管制

void subscribe(Collection<String> topics, ConsumerRebalanceListener listener)

看一下 ConsumerRebalanceListener 定义的接口

// 再平衡开始之前和消费者进行读取音讯之前被调用,可利用该会掉,提交生产位移
void onPartitionsRevoked(Collection<TopicPartition> partitions);

// 从新分区后,消费者开始读取音讯之前被调用
void onPartitionsAssigned(Collection<TopicPartition> partitions);

上面演示,如何在再平衡之前,提交生产偏移

consumer.subscribe(Collections.singleton("test"), new ConsumerRebalanceListener() {

    // 在再平衡开始之前和消费者进行读取音讯之前被调用,可利用该会掉,提交生产位移
    @Override
    public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
        // 提交生产偏移
        consumer.commitSync();}
    
    // 从新分区后,消费者开始读取音讯之前被调用
    @Override
    public void onPartitionsAssigned(Collection<TopicPartition> partitions) {}});

3.6、消费者拦截器

消费者,容许在 生产之前 生产偏移提交之后 敞开之前 ,进行管制,多个拦截器则组成拦截器链, 且多个拦截器之前须要用 ‘,’ 号隔开。
先看拦截器定义的接口

public interface ConsumerInterceptor<K, V> extends Configurable {
    // 音讯生产之前
    ConsumerRecords<K, V> onConsume(ConsumerRecords<K, V> records);
    
    // 提交之后调用
    void onCommit(Map<TopicPartition, OffsetAndMetadata> offsets);
    
    // 敞开之前调用
    void close();}
Properties prop = new Properties();
prop.put(ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG, MyConsumerInterceptor.class.getName() + "," + MyConsumerInterceptor2.class.getName());

3.7、重要的消费者参数

  • fetch.min.bytes

默认 1Bpoll 时,拉取的最小数据量。

  • fetch.max.bytes

默认 5242880B,50MB,poll 时,拉取的最大数据量。

  • fetch.max.wait.ms

默认 500ms,如果 kafka 始终没有触发 poll 动作,那么最多期待 fetch.max.wait.ms

  • max.partition.fetch.bytes

默认 1048576B,1MB,分区拉取时的最大数据量

  • max.poll.records

默认 500 条,拉取的最大音讯条数

  • connections.max.idle.ms

默认 540000ms, 9 分钟,多久敞开闲置的连贯

  • receive.buffer.bytes

默认 65536B64KBSOCKET 承受音讯的缓冲区(SO_RECBUF

  • request.timeout.ms

默认 30000ms,配置 consumer 期待申请响应的最长工夫

  • metadata.max.age.ms

默认 300000ms,5 分钟,配置元数据过期工夫。元数据在限定的工夫内,没有更新,会被强制更新

  • reconnect.backoff.ms

默认 50ms,配置尝试连贯指定主机之前的等待时间,防止频繁连贯主机

  • retry.backoff.ms

默认 100ms,发送失败时,2 次的间隔时间

4、总结

  1. kafka 生产以组为单位,且容许一个生产组订阅多个 topic
  2. partition 重调配算法,为均匀算法
  3. KafkaConsumer 为线程不平安。因而 poll() 只有以后线程在拉取音讯。kafka 要实现多线程拉取绝对麻烦
  4. kafka 消费者端,提供的 API 非常灵活,容许从指定的地位生产,容许手动提交某个分区的生产偏移
  5. kafka 提供消费者拦截器链,容许在 生产之前,提交生产偏移之后 管制。

5、与 RocketMQ 异同

  1. RocketMQ 倡议 1 个生产组只生产一个 topic, 且在理论开发中,如果消费者订阅多个 topic 会无奈失常工作。kafka 中 1 个消费者能够订阅多个 topic
  2. RocketMQ 能够确保生产时,音讯不失落,kafka 无奈保障。
  3. RocketMQ 在消费者端,实现了多线程生产,kafka 则没有
  4. kafka 默认每 5s 长久化生产进度,RocketMQ 也是。不过 RocketMQ 会提交偏移量最小的音讯。比方,线程 A 生产了 20 的音讯。线程 B 生产了 10 的音讯。当线程 A 提交生产进度的时候,会提交 10,而不会提交 20。这也是 RocketMQ 能够确保音讯生产时不丢的起因。
  5. RocketMQ 产生 rebalance,即 kafka 的再调配。默认和 kafka 统一,采纳的是 平均分配算法。不过 RocketMQ 容许自定义再调配算法,且提供了丰盛算法反对。
  6. RocketMQkafka 统一,都存在反复生产问题。
  7. 从裸露进去的 API 来看,kafka 客户端会比 RocketMQ 更加灵便。
  8. kafka 设置 新的生产组 从哪个地位开始生产,没有额定的条件限度;RocketMQ 只有当旧音讯沉积十分多时,才会无效。

正文完
 0