Kafka-多线程代码赏析

在2020年8月 13号 IGOR BUZATOVIĆ 这个人在

https://www.confluent.io/blog/kafka-consumer-multi-threaded-messaging/写下了这篇博客。

以下内容,纯属学习。

源代码门路

https://github.com/inovatrend/mtc-demo

MultithreadedKafkaConsumer

package com.inovatrend.mtcdemo;import org.apache.kafka.clients.consumer.*;import org.apache.kafka.common.TopicPartition;import org.apache.kafka.common.errors.WakeupException;import org.apache.kafka.common.serialization.StringDeserializer;import org.slf4j.Logger;import org.slf4j.LoggerFactory;import java.time.Duration;import java.time.temporal.ChronoUnit;import java.util.*;import java.util.concurrent.ExecutorService;import java.util.concurrent.Executors;import java.util.concurrent.atomic.AtomicBoolean;public class MultithreadedKafkaConsumer implements Runnable, ConsumerRebalanceListener {    private final KafkaConsumer<String, String> consumer;    private final ExecutorService executor = Executors.newFixedThreadPool(8);    private final Map<TopicPartition, Task> activeTasks = new HashMap<>();    private final Map<TopicPartition, OffsetAndMetadata> offsetsToCommit = new HashMap<>();    private final AtomicBoolean stopped = new AtomicBoolean(false);    private long lastCommitTime = System.currentTimeMillis();    private final Logger log = LoggerFactory.getLogger(MultithreadedKafkaConsumer.class);    public MultithreadedKafkaConsumer(String topic) {        Properties config = new Properties();        config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");        config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);        config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);        config.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);        config.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");        config.put(ConsumerConfig.GROUP_ID_CONFIG, "multithreaded-consumer-demo");        consumer = new KafkaConsumer<>(config);        new Thread(this).start();    }    @Override    public void run() {        try {            consumer.subscribe(Collections.singleton("topic-name"), this);            while (!stopped.get()) {                ConsumerRecords<String, String> records = consumer.poll(Duration.of(100, ChronoUnit.MILLIS));                handleFetchedRecords(records);                checkActiveTasks();                commitOffsets();            }        } catch (WakeupException we) {            if (!stopped.get())                throw we;        } finally {            consumer.close();        }    }    private void handleFetchedRecords(ConsumerRecords<String, String> records) {        if (records.count() > 0) {            List<TopicPartition> partitionsToPause = new ArrayList<>();             records.partitions().forEach(partition -> {                 List<ConsumerRecord<String, String>> partitionRecords = records.records(partition);                 Task task = new Task(partitionRecords);                 partitionsToPause.add(partition);                 executor.submit(task);                 activeTasks.put(partition, task);             });            consumer.pause(partitionsToPause);        }    }    private void commitOffsets() {        try {            long currentTimeMillis = System.currentTimeMillis();            if (currentTimeMillis - lastCommitTime > 5000) {                if(!offsetsToCommit.isEmpty()) {                    consumer.commitSync(offsetsToCommit);                    offsetsToCommit.clear();                }                lastCommitTime = currentTimeMillis;            }        } catch (Exception e) {            log.error("Failed to commit offsets!", e);        }    }    private void checkActiveTasks() {        List<TopicPartition> finishedTasksPartitions = new ArrayList<>();        activeTasks.forEach((partition, task) -> {            if (task.isFinished())                finishedTasksPartitions.add(partition);            long offset = task.getCurrentOffset();            if (offset > 0)                offsetsToCommit.put(partition, new OffsetAndMetadata(offset));        });        finishedTasksPartitions.forEach(partition -> activeTasks.remove(partition));        consumer.resume(finishedTasksPartitions);    }    @Override    public void onPartitionsRevoked(Collection<TopicPartition> partitions) {        // 1. Stop all tasks handling records from revoked partitions        Map<TopicPartition, Task> stoppedTask = new HashMap<>();        for (TopicPartition partition : partitions) {            Task task = activeTasks.remove(partition);            if (task != null) {                task.stop();                stoppedTask.put(partition, task);            }        }        // 2. Wait for stopped tasks to complete processing of current record        stoppedTask.forEach((partition, task) -> {            long offset = task.waitForCompletion();            if (offset > 0)                offsetsToCommit.put(partition, new OffsetAndMetadata(offset));        });        // 3. collect offsets for revoked partitions        Map<TopicPartition, OffsetAndMetadata> revokedPartitionOffsets = new HashMap<>();        partitions.forEach( partition -> {            OffsetAndMetadata offset = offsetsToCommit.remove(partition);            if (offset != null)                revokedPartitionOffsets.put(partition, offset);        });        // 4. commit offsets for revoked partitions        try {            consumer.commitSync(revokedPartitionOffsets);        } catch (Exception e) {            log.warn("Failed to commit offsets for revoked partitions!");        }    }    @Override    public void onPartitionsAssigned(Collection<TopicPartition> partitions) {        consumer.resume(partitions);    }    public void stopConsuming() {        stopped.set(true);        consumer.wakeup();    }}

Task

上面赏析线程代码

package com.inovatrend.mtcdemo;import org.apache.kafka.clients.consumer.ConsumerRecord;import org.slf4j.Logger;import org.slf4j.LoggerFactory;import java.util.List;import java.util.concurrent.CompletableFuture;import java.util.concurrent.ExecutionException;import java.util.concurrent.atomic.AtomicLong;import java.util.concurrent.locks.ReentrantLock;public class Task implements Runnable {    private final List<ConsumerRecord<String, String>> records;    private volatile boolean stopped = false;    private volatile boolean started = false;    private volatile boolean finished = false;    private final CompletableFuture<Long> completion = new CompletableFuture<>();    private final ReentrantLock startStopLock = new ReentrantLock();    private final AtomicLong currentOffset = new AtomicLong();    private Logger log = LoggerFactory.getLogger(Task.class);    public Task(List<ConsumerRecord<String, String>> records) {        this.records = records;    }    public void run() {        startStopLock.lock();        if (stopped){            return;        }        started = true;        startStopLock.unlock();        for (ConsumerRecord<String, String> record : records) {            if (stopped)                break;            // process record here and make sure you catch all exceptions;            currentOffset.set(record.offset() + 1);        }        finished = true;        completion.complete(currentOffset.get());    }    public long getCurrentOffset() {        return currentOffset.get();    }    public void stop() {        startStopLock.lock();        this.stopped = true;        if (!started) {            finished = true;            completion.complete(currentOffset.get());        }        startStopLock.unlock();    }    public long waitForCompletion() {        try {            return completion.get();        } catch (InterruptedException | ExecutionException e) {            return -1;        }    }    public boolean isFinished() {        return finished;    }}

剖析

1.手动提交偏移量

属性:enable.auto.commit 设置为true ;则在轮询办法之后主动提交数据偏移量。

若设置为false ;则须要上面两种:

  • commitSync() 在记录解决实现且下一个轮询办法调用前
  • 实现 ConsumerRebalanceListener 这个接口,重写其中的办法,比方当分区被撤销,此时提交偏移量

2.处理速度慢的问题

当轮询获取的音讯,之后解决逻辑简单,如果消费者未能再此工夫距离内调用轮询办法,那此消费者会被移除监听。

kafka的max.poll.interval.ms配置,默认是5分钟,当应用线程生产模型时,,你能够依据上面两个配置解决这个问题。

  • max.poll.recoreds 设置更小的值
  • max.poll.interval.ms 设置更高的值
  • 执行两者的组合;看逻辑执行工夫,若轮询记录大小为50,每次逻辑解决为6秒,则是300秒(5分钟);这能够减小50,和进步工夫距离300秒以上

3.解决音讯异样

对程序中的异样解决,如下三种选项:

  • 进行解决且敞开生产(在此操作之前,能够抉择重试几次)
  • 将记录发送到死信队列且持续下个记录(在此操作之前,能够抉择重试几次)
  • 重试,直到胜利解决记录(这个可能破费很长时间)

第三种抉择,有限重试,在某些场景中是可取的。列如,如果一个内部零碎脱机了,且波及到写操作,你可能想放弃重试,直到内部零碎可用,无论它破费多久。

当然,在kafka中,因为有max.poll.interval.ms,所以在每一个线程执行生产模型时,当个记录解决必须在一个工夫限度内实现。否正会超出规定工夫,被生产组移除。

对于此起因,必须为重试实现相当简单的逻辑。

4.多线程下的不良影响

  • 1.在一个记录解决之前偏移量可能被提交
  • 2.从雷同分区获取的音讯可能被平行解决(呈现屡次雷同记录解决),音讯解决的程序不能保障

咱们当然心愿多线程像单线程一样放弃执行程序,且不反复获取雷同分区的雷同记录。对于此篇文章中的taskconsumer也只是,提供了解决问题的思路,不是适宜所有场景。

实现线程池、且配置好分区轮询获取的记录大小及数据量。

5.保障解决程序

既然轮询是以多线程形式,去解决逻辑,那能够在线程模型中,分区解决实现后,对消费者暂停此记录分区汇合。待所有主线程执行实现后,消费者再放开分区限度。

大体思路即如此。这里用到了KafkaConsumer两个API:

  • pause(Collection<TopicPartition> partitions)
  • resume(Collection<TopicPartition> partitions)

这里对于放开,也不是放开所有的task的分区。而是放开实现子线程的工作的分区。

6.解决组再均衡

因为是多线程,那消费者可能再均衡,且一些分区能够再调配给另外的消费者,此时仍旧有一些线程再执行那些分区的记录,这样,一些记录就可能被多个消费者解决。呈现反复

数据等。

当然,通过解决撤销分区的记录实现,且再分区被重写调配之前提交绝对应的偏移量,能够最小化因为组重均衡引起的反复解决。

ConsumerRebalanceListener 的实例作为参数设置KafkaConsumer.subscribe()办法,这样重写onPartiionsRevoke()办法。因为此被调用来自生产的轮询办法,产生再主线程。

所以consumer.commitSync()同步提交,不必放心报ConcurretnModificationException

如果有些线程工作以后正在解决来自撤销分区的记录。有两种选项能够解决这状况。

  • 1.期待所有线程工作实现。
  • 2.进行这些线程工作,且直期待以后被解决实现的记录。

在上述实现之后,这些分区的偏移量能够被提交。

onPartitionsRevoked() 办法期待后果正阻塞这个生产主过程,因而要意识到等太长时间会超出max.poll.interval.ms工夫距离,导致此生产被组移除。所以,

这第二种略微好些,因为它破费较少的工夫。

所以,在和其余零碎交互时,应该抉择比max.poll.interval.ms工夫距离更小,以防呈现下面的状况。

如果一个会话超时产生,这对应的偏移量不应该被提交,因为这个申请将不被作为胜利提交。这意味着这被解决的记录在分区被重新分配之后将会再次解决,那将会产生反复操作在零碎中,除非写操作是等幂的(任何几次操作和一次操作的影响一样)。

总结

实现一个多线程生产模型比每个消费者线程模型提供了更重要的长处,在这些用例中。只管有很多办法去实现,但对应的关键点总是雷同的:

  • 确保从分区来的记录通过一个线程被解决仅仅执行一次
  • 在记录被解决之后,主线程提交偏移量
  • 妥善处理组在均衡

CONFLUENT的博客 中还有一些二值得举荐。比方:

  • GitHub repository.源代码
  • watch my Kafka Summit talk
  • read about the Confluent Parallel Consumer