关于kafka:kafka数据如何被重复消费

75次阅读

共计 2925 个字符,预计需要花费 8 分钟才能阅读完成。

近段时间学习极客工夫李玥老师的后端存储实战课时,看到一个很多意思的货色:用 kafka 存储点击流的数据,并反复解决。在以往的应用中,kafka 只是一个音讯传输的载体,音讯被生产后就不能再次生产。新常识与印象相冲突,于是就有了本篇文章:kafka 数据如何被反复生产。

后期实践理解

首先我先去官网纠正了我对 kafka 的整体理解。

官网对 kafka 的形容是:一个分布式流平台。怪本人的学艺不精。

其次,我从新看了一下 kafka 消费者的生产过程:kafka 首先通过 push/poll(默认为 poll)获取音讯,接管音讯解决实现后手动 / 主动提交生产胜利,kafka 服务器则依据提交状况决定是否挪动以后偏移量。

计划确定

kafka 消费者读取数据的地位是通过偏移量判断,那如果我能将偏移量手动设置为起始地位,就能实现反复生产?这个有搞头。

如何手动设置偏移量是要害。

show me the code

代码的要害次要在于偏移量设置 api 的调用,其余没什么特地。

要留神的是,代码中我别离调用了作用不同的设置偏移量,仅作为展现,可按需取用。

最初消费者音讯音讯时,我只应用默认的拉取条数设置生产一次,可按需进行批改。

 /**  
  * repeat kafka message  
  * @param host kafka host  
  * @param groupId kafka consumer group id  
  * @param autoCommit whether auto commit consume  
  * @param topic consume topic  
  * @param consumeTimeOut consume time out  
 */  
  private void textResetOffset(String host, String groupId, Boolean autoCommit, String topic, Long consumeTimeOut){  
  //form a properties to new consumer  
  Properties properties = new Properties();  
  properties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, host);  
  properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, groupId);  
  properties.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, autoCommit.toString());  
  properties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());  
  properties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());  
  KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);  
  //subscribe incoming topic  
  consumer.subscribe(Collections.singletonList(topic));  
  //get consumer consume partitions  
  List<PartitionInfo> partitionInfos = consumer.partitionsFor(topic);  
  List<TopicPartition> topicPartitions = new ArrayList<>();  
  for(PartitionInfo partitionInfo : partitionInfos){TopicPartition topicPartition = new TopicPartition(partitionInfo.topic(), partitionInfo.partition());  
  topicPartitions.add(topicPartition);  
  }  
  // poll data from kafka server to prevent lazy operation  
  consumer.poll(Duration.ofSeconds(consumeTimeOut));  
  //reset offset from beginning  
  consumer.seekToBeginning(topicPartitions);  
  //reset designated partition offset by designated spot  
  int offset = 20;  
  consumer.seek(topicPartitions.get(0), offset);  
  //reset offset to end  
  consumer.seekToEnd(topicPartitions);  
  //consume message as usual  
  ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));  
  Iterator<ConsumerRecord<String, String>> iterator = records.iterator();  
  while (iterator.hasNext()){ConsumerRecord<String, String> record = iterator.next();  
  log.info("consume data: {}", record.value());  
  }  
  }
运行后果

需注意的点

在手动设置偏移量时,遇到了一个 exception

 java.lang.IllegalStateException: No current assignment for partition test-0

翻了一下 stackoverflow 以及官网文档后,才理解到设置偏移量是一个 lazy operation,官网的解释如下。

Seek to the first offset for each of the given partitions. This function evaluates lazily, seeking to the first offset in all partitions only when poll(long)) or position(TopicPartition)) are called. If no partition is provided, seek to the first offset for all of the currently assigned partitions.

于是我先进行一次 poll 操作后再设置偏移量。

本文首发于 cartoon 的博客
转载请注明出处:https://cartoonyu.github.io/cartoon-blog/post/message-queue/kafka 数据如何被反复生产 /

正文完
 0