While recently working through Li Yue's backend storage course on GeekTime, I came across something interesting: using Kafka to store clickstream data and processing it repeatedly. In all my previous usage, Kafka was just a message transport, and a message could not be consumed again once it had been consumed. The new knowledge clashed with that impression, hence this article: how Kafka data can be consumed repeatedly.
Some theory first
First, I went to the official site to correct my overall picture of Kafka. Kafka describes itself as a distributed streaming platform; I have only my own shallow study to blame for missing that.
Second, I revisited how a Kafka consumer actually consumes: the consumer fetches messages by polling the broker (the consumer API is pull-based), the successful consumption is committed manually or automatically after the message is processed, and the broker then decides from that commit whether to advance the current offset.
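To make the commit/offset mechanics concrete, here is a tiny plain-Java simulation, with no Kafka involved: `OffsetDemo`, its fields, and the data are all made up for illustration. The broker-side state is reduced to a single committed offset per partition, and rewinding that offset is what replays old records.

```java
import java.util.ArrayList;
import java.util.List;

// Minimal simulation of Kafka's committed-offset bookkeeping for one
// partition: the broker stores a committed offset per consumer group,
// consumption resumes from it, so rewinding it replays old records.
public class OffsetDemo {
    static List<String> log = new ArrayList<>(); // the partition's append-only log
    static long committedOffset = 0;             // broker-side committed offset

    // consume everything from the committed offset onward, then commit
    static List<String> pollAndCommit() {
        List<String> batch =
                new ArrayList<>(log.subList((int) committedOffset, log.size()));
        committedOffset = log.size();            // commit: next poll starts here
        return batch;
    }

    public static void main(String[] args) {
        log.add("click-1");
        log.add("click-2");
        System.out.println(pollAndCommit()); // [click-1, click-2]
        System.out.println(pollAndCommit()); // [] - everything already committed
        committedOffset = 0;                 // "seekToBeginning": rewind the offset
        System.out.println(pollAndCommit()); // [click-1, click-2] again
    }
}
```

Once the offset is seen as plain mutable state, "re-consuming" stops looking special: it is just moving that state backwards.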
Settling on a plan
A Kafka consumer decides where to read from by its offset, so if I can manually reset the offset back to the start, I should be able to re-consume. Worth a try. The key question is how to set the offset manually.
show me the code
The crux of the code is the calls to the offset-seeking APIs; the rest is unremarkable. Note that I call several offset-resetting methods with different effects one after another, purely for demonstration; pick whichever you need. Finally, when consuming, I poll only once with the default fetch size; adjust as needed.
```java
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;

/**
 * repeat kafka message
 * @param host kafka host
 * @param groupId kafka consumer group id
 * @param autoCommit whether auto commit consume
 * @param topic consume topic
 * @param consumeTimeOut consume time out
 */
private void textResetOffset(String host, String groupId, Boolean autoCommit,
                             String topic, Long consumeTimeOut) {
    // form a properties to new consumer
    Properties properties = new Properties();
    properties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, host);
    properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, groupId);
    properties.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, autoCommit.toString());
    properties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
            StringDeserializer.class.getName());
    properties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
            StringDeserializer.class.getName());
    KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);

    // subscribe to the incoming topic
    consumer.subscribe(Collections.singletonList(topic));

    // collect the topic's partitions
    List<PartitionInfo> partitionInfos = consumer.partitionsFor(topic);
    List<TopicPartition> topicPartitions = new ArrayList<>();
    for (PartitionInfo partitionInfo : partitionInfos) {
        topicPartitions.add(
                new TopicPartition(partitionInfo.topic(), partitionInfo.partition()));
    }

    // poll once first so partitions get assigned (seek is a lazy operation)
    consumer.poll(Duration.ofSeconds(consumeTimeOut));

    // reset offset to the beginning
    consumer.seekToBeginning(topicPartitions);

    // reset a designated partition to a designated offset
    int offset = 20;
    consumer.seek(topicPartitions.get(0), offset);

    // reset offset to the end
    consumer.seekToEnd(topicPartitions);

    // consume messages as usual
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
    for (ConsumerRecord<String, String> record : records) {
        log.info("consume data: {}", record.value());
    }
}
```
Results
Points to note
While setting the offset manually, I ran into an exception:
```
java.lang.IllegalStateException: No current assignment for partition test-0
```
After digging through Stack Overflow and the official docs, I learned that seeking is a lazy operation; the official explanation reads:
Seek to the first offset for each of the given partitions. This function evaluates lazily, seeking to the first offset in all partitions only when poll(long) or position(TopicPartition) are called. If no partitions are provided, seek to the first offset for all of the currently assigned partitions.
So the fix was to perform one poll first, so that the consumer actually receives its partition assignment, and only then seek.
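A side note: this lazy-assignment trap only exists with subscribe(), because partitions are assigned during the group rebalance that poll() triggers. If the consumer is instead given its partitions manually via assign() (the two modes are mutually exclusive, so assign() replaces the subscribe() call), the assignment takes effect immediately and seek works without a warm-up poll. A sketch, assuming a consumer and topicPartitions list built the same way as in the code above:

```java
// assign() takes effect immediately, unlike subscribe(),
// so no preliminary poll() is needed before seeking
consumer.assign(topicPartitions);
consumer.seekToBeginning(topicPartitions);
ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
```

The trade-off is that assign() opts out of consumer-group rebalancing, so it fits one-off replay jobs better than long-running group consumers.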
This article first appeared on cartoon's blog.
Please credit the source when reposting: https://cartoonyu.github.io/cartoon-blog/post/message-queue/kafka数据如何被反复生产/