乐趣区

关于kafka:你真的了解Flink-Kafka-source吗

Flink 提供了专门的 Kafka 连接器,向 Kafka topic 中读取或者写入数据。Flink Kafka Consumer 集成了 Flink 的 Checkpoint 机制,可提供 exactly-once 的解决语义。为此,Flink 并不齐全依赖于跟踪 Kafka 生产组的偏移量,而是在外部跟踪和查看偏移量。

引言

当咱们在应用 Spark Streaming、Flink 等计算框架进行数据实时处理时,应用 Kafka 作为一款公布与订阅的音讯零碎成为了标配。Spark Streaming 与 Flink 都提供了绝对应的 Kafka Consumer,应用起来十分的不便,只须要设置一下 Kafka 的参数,而后增加 kafka 的 source 就高枕无忧了。如果你真的感觉事件就是如此的 so easy,感觉妈妈再也不必放心你的学习了,那就真的是 too young too simple sometimes naive 了。本文以 Flink 的 Kafka Source 为探讨对象,首先从根本的应用动手,而后深刻源码逐个分析,一并为你拨开 Flink Kafka connector 的神秘面纱。值得注意的是,本文假设读者具备了 Kafka 的相干常识,对于 Kafka 的相干细节问题,不在本文的探讨范畴之内。

Flink Kafka Consumer 介绍

Flink Kafka Connector 有很多个版本,能够依据你的 kafka 和 Flink 的版本抉择相应的包(maven artifact id)和类名。本文所波及的 Flink 版本为 1.10,Kafka 的版本为 2.3.4。Flink 所提供的 Maven 依赖于类名如下表所示:

Demo 示例

增加 Maven 依赖

<!-- 本文应用的是通用型的 connector-->
<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-connector-kafka_2.11</artifactId>
  <version>1.10.0</version>
</dependency>

简略代码案例

public class KafkaConnector {public static void main(String[] args) throws Exception {StreamExecutionEnvironment senv = StreamExecutionEnvironment.getExecutionEnvironment();
        // 开启 checkpoint,工夫距离为毫秒
        senv.enableCheckpointing(5000L);
        // 抉择状态后端
        senv.setStateBackend((StateBackend) new FsStateBackend("file:///E://checkpoint"));
        //senv.setStateBackend((StateBackend) new FsStateBackend("hdfs://kms-1:8020/checkpoint"));
        Properties props = new Properties();
        // kafka broker 地址
        props.put("bootstrap.servers", "kms-2:9092,kms-3:9092,kms-4:9092");
        // 仅 kafka0.8 版本须要配置
        props.put("zookeeper.connect", "kms-2:2181,kms-3:2181,kms-4:2181");
        // 消费者组
        props.put("group.id", "test");
        // 主动偏移量提交
        props.put("enable.auto.commit", true);
        // 偏移量提交的工夫距离,毫秒
        props.put("auto.commit.interval.ms", 5000);
        // kafka 音讯的 key 序列化器
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        // kafka 音讯的 value 序列化器
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        // 指定 kafka 的消费者从哪里开始生产数据
        // 共有三种形式,// #earliest
        // 当各分区下有已提交的 offset 时,从提交的 offset 开始生产;// 无提交的 offset 时,从头开始生产
        // #latest
        // 当各分区下有已提交的 offset 时,从提交的 offset 开始生产;// 无提交的 offset 时,生产新产生的该分区下的数据
        // #none
        // topic 各分区都存在已提交的 offset 时,// 从 offset 后开始生产;// 只有有一个分区不存在已提交的 offset,则抛出异样
        props.put("auto.offset.reset", "latest");
        FlinkKafkaConsumer<String> consumer = new FlinkKafkaConsumer<>(
                "qfbap_ods.code_city",
                new SimpleStringSchema(),
                props);
        // 设置 checkpoint 后在提交 offset,即 oncheckpoint 模式
        // 该值默认为 true,consumer.setCommitOffsetsOnCheckpoints(true);
     
        // 最早的数据开始生产
        // 该模式下,Kafka 中的 committed offset 将被疏忽,不会用作起始地位。//consumer.setStartFromEarliest();

        // 消费者组最近一次提交的偏移量,默认。// 如果找不到分区的偏移量,那么将会应用配置中的 auto.offset.reset 设置
        //consumer.setStartFromGroupOffsets();

        // 最新的数据开始生产
        // 该模式下,Kafka 中的 committed offset 将被疏忽,不会用作起始地位。//consumer.setStartFromLatest();

        // 指定具体的偏移量工夫戳, 毫秒
        // 对于每个分区,其工夫戳大于或等于指定工夫戳的记录将用作起始地位。// 如果一个分区的最新记录早于指定的工夫戳,则只从最新记录读取该分区数据。// 在这种模式下,Kafka 中的已提交 offset 将被疏忽,不会用作起始地位。//consumer.setStartFromTimestamp(1585047859000L);

        // 为每个分区指定偏移量
        /*Map<KafkaTopicPartition, Long> specificStartOffsets = new HashMap<>();
        specificStartOffsets.put(new KafkaTopicPartition("qfbap_ods.code_city", 0), 23L);
        specificStartOffsets.put(new KafkaTopicPartition("qfbap_ods.code_city", 1), 31L);
        specificStartOffsets.put(new KafkaTopicPartition("qfbap_ods.code_city", 2), 43L);
        consumer1.setStartFromSpecificOffsets(specificStartOffsets);*/
        /**
         *
         * 请留神:当 Job 从故障中主动复原或应用 savepoint 手动复原时,* 这些起始地位配置办法不会影响生产的起始地位。* 在复原时,每个 Kafka 分区的起始地位由存储在 savepoint 或 checkpoint 中的 offset 确定
         *
         */

        DataStreamSource<String> source = senv.addSource(consumer);
        // TODO
        source.print();
        senv.execute("test kafka connector");
    }
}

参数配置解读

在 Demo 示例中,给出了具体的配置信息,上面将对下面的参数配置进行逐个剖析。

kakfa 的 properties 参数配置

  • bootstrap.servers:kafka broker 地址
  • zookeeper.connect:仅 kafka0.8 版本须要配置
  • group.id:消费者组
  • enable.auto.commit:

    主动偏移量提交,该值的配置不是最终的偏移量提交模式,须要思考用户是否开启了 checkpoint,

    在上面的源码剖析中会进行解读

  • auto.commit.interval.ms:偏移量提交的工夫距离,毫秒
  • key.deserializer:

    kafka 音讯的 key 序列化器,如果不指定会应用 ByteArrayDeserializer 序列化器

  • value.deserializer:

kafka 音讯的 value 序列化器,如果不指定会应用 ByteArrayDeserializer 序列化器

  • auto.offset.reset:

    指定 kafka 的消费者从哪里开始生产数据,共有三种形式,

    • 第一种:earliest
      当各分区下有已提交的 offset 时,从提交的 offset 开始生产;无提交的 offset 时,从头开始生产
    • 第二种:latest
      当各分区下有已提交的 offset 时,从提交的 offset 开始生产;无提交的 offset 时,生产新产生的该分区下的数据
    • 第三种:none
      topic 各分区都存在已提交的 offset 时,从 offset 后开始生产;只有有一个分区不存在已提交的 offset,则抛出异样

留神:下面的指定生产模式并不是最终的生产模式,取决于用户在 Flink 程序中配置的生产模式

Flink 程序用户配置的参数

  • consumer.setCommitOffsetsOnCheckpoints(true)

​ 解释:设置 checkpoint 后在提交 offset,即 oncheckpoint 模式,该值默认为 true,该参数会影响偏移量的提交形式,上面的源码中会进行剖析

  • consumer.setStartFromEarliest()

    解释:最早的数据开始生产,该模式下,Kafka 中的 committed offset 将被疏忽,不会用作起始地位。该办法为继承父类 FlinkKafkaConsumerBase 的办法。

  • consumer.setStartFromGroupOffsets()

    解释:消费者组最近一次提交的偏移量,默认。如果找不到分区的偏移量,那么将会应用配置中的 auto.offset.reset 设置,该办法为继承父类 FlinkKafkaConsumerBase 的办法。

  • consumer.setStartFromLatest()

    解释:最新的数据开始生产,该模式下,Kafka 中的 committed offset 将被疏忽,不会用作起始地位。该办法为继承父类 FlinkKafkaConsumerBase 的办法。

  • consumer.setStartFromTimestamp(1585047859000L)

    解释:指定具体的偏移量工夫戳, 毫秒。对于每个分区,其工夫戳大于或等于指定工夫戳的记录将用作起始地位。如果一个分区的最新记录早于指定的工夫戳,则只从最新记录读取该分区数据。在这种模式下,Kafka 中的已提交 offset 将被疏忽,不会用作起始地位。

  • consumer.setStartFromSpecificOffsets(specificStartOffsets)

解释:为每个分区指定偏移量,该办法为继承父类 FlinkKafkaConsumerBase 的办法。

请留神:当 Job 从故障中主动复原或应用 savepoint 手动复原时,这些起始地位配置办法不会影响生产的起始地位。在复原时,每个 Kafka 分区的起始地位由存储在 savepoint 或 checkpoint 中的 offset 确定。

Flink Kafka Consumer 源码解读

继承关系

Flink Kafka Consumer 继承了 FlinkKafkaConsumerBase 抽象类,而 FlinkKafkaConsumerBase 抽象类又继承了 RichParallelSourceFunction,所以要实现一个自定义的 source 时,有两种实现形式:一种是通过实现 SourceFunction 接口来自定义并行度为 1 的数据源;另一种是通过实现 ParallelSourceFunction 接口或者继承 RichParallelSourceFunction 来自定义具备并行度的数据源。FlinkKafkaConsumer 的继承关系如下图所示。

源码解读

FlinkKafkaConsumer 源码

先看一下 FlinkKafkaConsumer 的源码,为了方面浏览,本文将尽量给出本比拟残缺的源代码片段,具体如下所示:代码较长,在这里能够先有有一个总体的印象,上面会对重要的代码片段具体进行剖析。

public class FlinkKafkaConsumer<T> extends FlinkKafkaConsumerBase<T> {

    // 配置轮询超时超时工夫,应用 flink.poll-timeout 参数在 properties 进行配置
    public static final String KEY_POLL_TIMEOUT = "flink.poll-timeout";
    // 如果没有可用数据,则期待轮询所需的工夫(以毫秒为单位)。如果为 0,则立刻返回所有可用的记录
    // 默认轮询超时工夫
    public static final long DEFAULT_POLL_TIMEOUT = 100L;
    // 用户提供的 kafka 参数配置
    protected final Properties properties;
    // 如果没有可用数据,则期待轮询所需的工夫(以毫秒为单位)。如果为 0,则立刻返回所有可用的记录
    protected final long pollTimeout;
    /**
     * 创立一个 kafka 的 consumer source
     * @param topic                   生产的主题名称
     * @param valueDeserializer       反序列化类型,用于将 kafka 的字节音讯转换为 Flink 的对象
     * @param props                   用户传入的 kafka 参数
     */
    public FlinkKafkaConsumer(String topic, DeserializationSchema<T> valueDeserializer, Properties props) {this(Collections.singletonList(topic), valueDeserializer, props);
    }
    /**
     * 创立一个 kafka 的 consumer source
     * 该构造方法容许传入 KafkaDeserializationSchema,该反序列化类反对拜访 kafka 生产的额定信息
     * 比方:key/value 对,offsets(偏移量),topic(主题名称)
     * @param topic                生产的主题名称
     * @param deserializer         反序列化类型,用于将 kafka 的字节音讯转换为 Flink 的对象
     * @param props                用户传入的 kafka 参数
     */
    public FlinkKafkaConsumer(String topic, KafkaDeserializationSchema<T> deserializer, Properties props) {this(Collections.singletonList(topic), deserializer, props);
    }
    /**
     * 创立一个 kafka 的 consumer source
     * 该构造方法容许传入多个 topic(主题),反对生产多个主题
     * @param topics          生产的主题名称,多个主题为 List 汇合
     * @param deserializer    反序列化类型,用于将 kafka 的字节音讯转换为 Flink 的对象
     * @param props           用户传入的 kafka 参数
     */
    public FlinkKafkaConsumer(List<String> topics, DeserializationSchema<T> deserializer, Properties props) {this(topics, new KafkaDeserializationSchemaWrapper<>(deserializer), props);
    }
    /**
     * 创立一个 kafka 的 consumer source
     * 该构造方法容许传入多个 topic(主题),反对生产多个主题,
     * @param topics         生产的主题名称,多个主题为 List 汇合
     * @param deserializer   反序列化类型,用于将 kafka 的字节音讯转换为 Flink 的对象, 反对获取额定信息
     * @param props          用户传入的 kafka 参数
     */
    public FlinkKafkaConsumer(List<String> topics, KafkaDeserializationSchema<T> deserializer, Properties props) {this(topics, null, deserializer, props);
    }
    /**
     * 基于正则表达式订阅多个 topic
     * 如果开启了分区发现,即 FlinkKafkaConsumer.KEY_PARTITION_DISCOVERY_INTERVAL_MILLIS 值为非正数
     * 只有是可能正则匹配上,主题一旦被创立就会立刻被订阅
     * @param subscriptionPattern   主题的正则表达式
     * @param valueDeserializer   反序列化类型,用于将 kafka 的字节音讯转换为 Flink 的对象, 反对获取额定信息
     * @param props               用户传入的 kafka 参数
     */
    public FlinkKafkaConsumer(Pattern subscriptionPattern, DeserializationSchema<T> valueDeserializer, Properties props) {this(null, subscriptionPattern, new KafkaDeserializationSchemaWrapper<>(valueDeserializer), props);
    }
    /**
     * 基于正则表达式订阅多个 topic
     * 如果开启了分区发现,即 FlinkKafkaConsumer.KEY_PARTITION_DISCOVERY_INTERVAL_MILLIS 值为非正数
     * 只有是可能正则匹配上,主题一旦被创立就会立刻被订阅
     * @param subscriptionPattern   主题的正则表达式
     * @param deserializer          该反序列化类反对拜访 kafka 生产的额定信息, 比方:key/value 对,offsets(偏移量),topic(主题名称)
     * @param props                 用户传入的 kafka 参数
     */
    public FlinkKafkaConsumer(Pattern subscriptionPattern, KafkaDeserializationSchema<T> deserializer, Properties props) {this(null, subscriptionPattern, deserializer, props);
    }
    private FlinkKafkaConsumer(
        List<String> topics,
        Pattern subscriptionPattern,
        KafkaDeserializationSchema<T> deserializer,
        Properties props) {// 调用父类 (FlinkKafkaConsumerBase) 构造方法,PropertiesUtil.getLong 办法第一个参数为 Properties,第二个参数为 key,第三个参数为 value 默认值
        super(
            topics,
            subscriptionPattern,
            deserializer,
            getLong(checkNotNull(props, "props"),
                KEY_PARTITION_DISCOVERY_INTERVAL_MILLIS, PARTITION_DISCOVERY_DISABLED),
            !getBoolean(props, KEY_DISABLE_METRICS, false));

        this.properties = props;
        setDeserializer(this.properties);

        // 配置轮询超时工夫,如果在 properties 中配置了 KEY_POLL_TIMEOUT 参数,则返回具体的配置值,否则返回默认值 DEFAULT_POLL_TIMEOUT
        try {if (properties.containsKey(KEY_POLL_TIMEOUT)) {this.pollTimeout = Long.parseLong(properties.getProperty(KEY_POLL_TIMEOUT));
            } else {this.pollTimeout = DEFAULT_POLL_TIMEOUT;}
        }
        catch (Exception e) {throw new IllegalArgumentException("Cannot parse poll timeout for'" + KEY_POLL_TIMEOUT + '\'', e);
        }
    }
   // 父类 (FlinkKafkaConsumerBase) 办法重写,该办法的作用是返回一个 fetcher 实例,// fetcher 的作用是连贯 kafka 的 broker,拉去数据并进行反序列化,而后将数据输入为数据流(data stream)
    @Override
    protected AbstractFetcher<T, ?> createFetcher(
        SourceContext<T> sourceContext,
        Map<KafkaTopicPartition, Long> assignedPartitionsWithInitialOffsets,
        SerializedValue<AssignerWithPeriodicWatermarks<T>> watermarksPeriodic,
        SerializedValue<AssignerWithPunctuatedWatermarks<T>> watermarksPunctuated,
        StreamingRuntimeContext runtimeContext,
        OffsetCommitMode offsetCommitMode,
        MetricGroup consumerMetricGroup,
        boolean useMetrics) throws Exception {// 确保当偏移量的提交模式为 ON_CHECKPOINTS(条件 1:开启 checkpoint,条件 2:consumer.setCommitOffsetsOnCheckpoints(true))时,禁用主动提交
        // 该办法为父类 (FlinkKafkaConsumerBase) 的静态方法
        // 这将笼罩用户在 properties 中配置的任何设置
        // 当 offset 的模式为 ON_CHECKPOINTS,或者为 DISABLED 时,会将用户配置的 properties 属性进行笼罩
        // 具体是将 ENABLE_AUTO_COMMIT_CONFIG = "enable.auto.commit" 的值重置为 "false
        // 能够了解为:如果开启了 checkpoint,并且设置了 consumer.setCommitOffsetsOnCheckpoints(true),默认为 true,// 就会将 kafka properties 的 enable.auto.commit 强制置为 false
        adjustAutoCommitConfig(properties, offsetCommitMode);
        return new KafkaFetcher<>(
            sourceContext,
            assignedPartitionsWithInitialOffsets,
            watermarksPeriodic,
            watermarksPunctuated,
            runtimeContext.getProcessingTimeService(),
            runtimeContext.getExecutionConfig().getAutoWatermarkInterval(),
            runtimeContext.getUserCodeClassLoader(),
            runtimeContext.getTaskNameWithSubtasks(),
            deserializer,
            properties,
            pollTimeout,
            runtimeContext.getMetricGroup(),
            consumerMetricGroup,
            useMetrics);
    }
    // 父类 (FlinkKafkaConsumerBase) 办法重写
    // 返回一个分区发现类,分区发现能够应用 kafka broker 的高级 consumer API 发现 topic 和 partition 的元数据
    @Override
    protected AbstractPartitionDiscoverer createPartitionDiscoverer(
        KafkaTopicsDescriptor topicsDescriptor,
        int indexOfThisSubtask,
        int numParallelSubtasks) {return new KafkaPartitionDiscoverer(topicsDescriptor, indexOfThisSubtask, numParallelSubtasks, properties);
    }

    /**
     * 判断是否在 kafka 的参数开启了主动提交,即 enable.auto.commit=true,* 并且 auto.commit.interval.ms>0,
     * 留神:如果没有没有设置 enable.auto.commit 的参数,则默认为 true
     *       如果没有设置 auto.commit.interval.ms 的参数,则默认为 5000 毫秒
     * @return
     */
    @Override
    protected boolean getIsAutoCommitEnabled() {
        //
        return getBoolean(properties, ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true) &&
            PropertiesUtil.getLong(properties, ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 5000) > 0;
    }

    /**
     * 确保配置了 kafka 音讯的 key 与 value 的反序列化形式,* 如果没有配置,则应用 ByteArrayDeserializer 序列化器,* 该类的 deserialize 办法是间接将数据进行 return,未做任何解决
     * @param props
     */
    private static void setDeserializer(Properties props) {final String deSerName = ByteArrayDeserializer.class.getName();

        Object keyDeSer = props.get(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG);
        Object valDeSer = props.get(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG);

        if (keyDeSer != null && !keyDeSer.equals(deSerName)) {LOG.warn("Ignoring configured key DeSerializer ({})", ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG);
        }
        if (valDeSer != null && !valDeSer.equals(deSerName)) {LOG.warn("Ignoring configured value DeSerializer ({})", ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG);
        }
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, deSerName);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, deSerName);
    }
}

剖析

下面的代码曾经给出了十分具体的正文,上面将对比拟要害的局部进行剖析。

  • 构造方法剖析

FlinkKakfaConsumer 提供了 7 种构造方法,如上图所示。不同的构造方法别离具备不同的性能,通过传递的参数也能够大抵剖析出每种构造方法特有的性能,为了不便了解,本文将对其进行分组讨论,具体如下:

单 topic

/**
     * 创立一个 kafka 的 consumer source
     * @param topic                   生产的主题名称
     * @param valueDeserializer       反序列化类型,用于将 kafka 的字节音讯转换为 Flink 的对象
     * @param props                   用户传入的 kafka 参数
     */
    public FlinkKafkaConsumer(String topic, DeserializationSchema<T> valueDeserializer, Properties props) {this(Collections.singletonList(topic), valueDeserializer, props);
    }

/**
     * 创立一个 kafka 的 consumer source
     * 该构造方法容许传入 KafkaDeserializationSchema,该反序列化类反对拜访 kafka 生产的额定信息
     * 比方:key/value 对,offsets(偏移量),topic(主题名称)
     * @param topic                生产的主题名称
     * @param deserializer         反序列化类型,用于将 kafka 的字节音讯转换为 Flink 的对象
     * @param props                用户传入的 kafka 参数
     */
    public FlinkKafkaConsumer(String topic, KafkaDeserializationSchema<T> deserializer, Properties props) {this(Collections.singletonList(topic), deserializer, props);
    }

下面两种构造方法只反对单个 topic,区别在于反序列化的形式不一样。第一种应用的是 DeserializationSchema,第二种应用的是 KafkaDeserializationSchema,其中应用带有 KafkaDeserializationSchema 参数的构造方法能够获取更多的从属信息,比方在某些场景下须要获取 key/value 对,offsets(偏移量),topic(主题名称)等信息,能够抉择应用此形式的构造方法。以上两种办法都调用了公有的构造方法,公有构造方法的剖析见上面。

多 topic

/**
     * 创立一个 kafka 的 consumer source
     * 该构造方法容许传入多个 topic(主题),反对生产多个主题
     * @param topics          生产的主题名称,多个主题为 List 汇合
     * @param deserializer    反序列化类型,用于将 kafka 的字节音讯转换为 Flink 的对象
     * @param props           用户传入的 kafka 参数
     */
    public FlinkKafkaConsumer(List<String> topics, DeserializationSchema<T> deserializer, Properties props) {this(topics, new KafkaDeserializationSchemaWrapper<>(deserializer), props);
    }
    /**
     * 创立一个 kafka 的 consumer source
     * 该构造方法容许传入多个 topic(主题),反对生产多个主题,
     * @param topics         生产的主题名称,多个主题为 List 汇合
     * @param deserializer   反序列化类型,用于将 kafka 的字节音讯转换为 Flink 的对象, 反对获取额定信息
     * @param props          用户传入的 kafka 参数
     */
    public FlinkKafkaConsumer(List<String> topics, KafkaDeserializationSchema<T> deserializer, Properties props) {this(topics, null, deserializer, props);
    }

下面的两种多 topic 的构造方法,能够应用一个 list 汇合接管多个 topic 进行生产,区别在于反序列化的形式不一样。第一种应用的是 DeserializationSchema,第二种应用的是 KafkaDeserializationSchema,其中应用带有 KafkaDeserializationSchema 参数的构造方法能够获取更多的从属信息,比方在某些场景下须要获取 key/value 对,offsets(偏移量),topic(主题名称)等信息,能够抉择应用此形式的构造方法。以上两种办法都调用了公有的构造方法,公有构造方法的剖析见上面。

正则匹配 topic

/**
     * 基于正则表达式订阅多个 topic
     * 如果开启了分区发现,即 FlinkKafkaConsumer.KEY_PARTITION_DISCOVERY_INTERVAL_MILLIS 值为非正数
     * 只有是可能正则匹配上,主题一旦被创立就会立刻被订阅
     * @param subscriptionPattern   主题的正则表达式
     * @param valueDeserializer   反序列化类型,用于将 kafka 的字节音讯转换为 Flink 的对象, 反对获取额定信息
     * @param props               用户传入的 kafka 参数
     */
    public FlinkKafkaConsumer(Pattern subscriptionPattern, DeserializationSchema<T> valueDeserializer, Properties props) {this(null, subscriptionPattern, new KafkaDeserializationSchemaWrapper<>(valueDeserializer), props);
    }
    /**
     * 基于正则表达式订阅多个 topic
     * 如果开启了分区发现,即 FlinkKafkaConsumer.KEY_PARTITION_DISCOVERY_INTERVAL_MILLIS 值为非正数
     * 只有是可能正则匹配上,主题一旦被创立就会立刻被订阅
     * @param subscriptionPattern   主题的正则表达式
     * @param deserializer          该反序列化类反对拜访 kafka 生产的额定信息, 比方:key/value 对,offsets(偏移量),topic(主题名称)
     * @param props                 用户传入的 kafka 参数
     */
    public FlinkKafkaConsumer(Pattern subscriptionPattern, KafkaDeserializationSchema<T> deserializer, Properties props) {this(null, subscriptionPattern, deserializer, props);
    }

理论的生产环境中可能有这样一些需要,比方有一个 flink 作业须要将多种不同的数据聚合到一起,而这些数据对应着不同的 kafka topic,随着业务增长,新增一类数据,同时新增了一个 kafka topic,如何在不重启作业的状况下作业主动感知新的 topic。首先须要在构建 FlinkKafkaConsumer 时的 properties 中设置 flink.partition-discovery.interval-millis 参数为非负值,示意开启动静发现的开关,以及设置的工夫距离。此时 FLinkKafkaConsumer 外部会启动一个独自的线程定期去 kafka 获取最新的 meta 信息。具体的调用执行信息,参见上面的公有构造方法

公有构造方法


    private FlinkKafkaConsumer(
        List<String> topics,
        Pattern subscriptionPattern,
        KafkaDeserializationSchema<T> deserializer,
        Properties props) {// 调用父类 (FlinkKafkaConsumerBase) 构造方法,PropertiesUtil.getLong 办法第一个参数为 Properties,第二个参数为 key,第三个参数为 value 默认值。KEY_PARTITION_DISCOVERY_INTERVAL_MILLIS 值是开启分区发现的配置参数,在 properties 外面配置 flink.partition-discovery.interval-millis=5000(大于 0 的数), 如果没有配置则应用 PARTITION_DISCOVERY_DISABLED=Long.MIN_VALUE(示意禁用分区发现)
        super(
            topics,
            subscriptionPattern,
            deserializer,
            getLong(checkNotNull(props, "props"),
                KEY_PARTITION_DISCOVERY_INTERVAL_MILLIS, PARTITION_DISCOVERY_DISABLED),
            !getBoolean(props, KEY_DISABLE_METRICS, false));

        this.properties = props;
        setDeserializer(this.properties);

        // 配置轮询超时工夫,如果在 properties 中配置了 KEY_POLL_TIMEOUT 参数,则返回具体的配置值,否则返回默认值 DEFAULT_POLL_TIMEOUT
        try {if (properties.containsKey(KEY_POLL_TIMEOUT)) {this.pollTimeout = Long.parseLong(properties.getProperty(KEY_POLL_TIMEOUT));
            } else {this.pollTimeout = DEFAULT_POLL_TIMEOUT;}
        }
        catch (Exception e) {throw new IllegalArgumentException("Cannot parse poll timeout for'" + KEY_POLL_TIMEOUT + '\'', e);
        }
    }
  • 其余办法剖析

KafkaFetcher 对象创立

   // 父类 (FlinkKafkaConsumerBase) 办法重写,该办法的作用是返回一个 fetcher 实例,// fetcher 的作用是连贯 kafka 的 broker,拉去数据并进行反序列化,而后将数据输入为数据流(data stream)
    @Override
    protected AbstractFetcher<T, ?> createFetcher(
        SourceContext<T> sourceContext,
        Map<KafkaTopicPartition, Long> assignedPartitionsWithInitialOffsets,
        SerializedValue<AssignerWithPeriodicWatermarks<T>> watermarksPeriodic,
        SerializedValue<AssignerWithPunctuatedWatermarks<T>> watermarksPunctuated,
        StreamingRuntimeContext runtimeContext,
        OffsetCommitMode offsetCommitMode,
        MetricGroup consumerMetricGroup,
        boolean useMetrics) throws Exception {// 确保当偏移量的提交模式为 ON_CHECKPOINTS(条件 1:开启 checkpoint,条件 2:consumer.setCommitOffsetsOnCheckpoints(true))时,禁用主动提交
        // 该办法为父类 (FlinkKafkaConsumerBase) 的静态方法
        // 这将笼罩用户在 properties 中配置的任何设置
        // 当 offset 的模式为 ON_CHECKPOINTS,或者为 DISABLED 时,会将用户配置的 properties 属性进行笼罩
        // 具体是将 ENABLE_AUTO_COMMIT_CONFIG = "enable.auto.commit" 的值重置为 "false
        // 能够了解为:如果开启了 checkpoint,并且设置了 consumer.setCommitOffsetsOnCheckpoints(true),默认为 true,// 就会将 kafka properties 的 enable.auto.commit 强制置为 false
        adjustAutoCommitConfig(properties, offsetCommitMode);
        return new KafkaFetcher<>(
            sourceContext,
            assignedPartitionsWithInitialOffsets,
            watermarksPeriodic,
            watermarksPunctuated,
            runtimeContext.getProcessingTimeService(),
            runtimeContext.getExecutionConfig().getAutoWatermarkInterval(),
            runtimeContext.getUserCodeClassLoader(),
            runtimeContext.getTaskNameWithSubtasks(),
            deserializer,
            properties,
            pollTimeout,
            runtimeContext.getMetricGroup(),
            consumerMetricGroup,
            useMetrics);
    }

该办法的作用是返回一个 fetcher 实例,fetcher 的作用是连贯 kafka 的 broker,拉去数据并进行反序列化,而后将数据输入为数据流 (data stream),在这里对主动偏移量提交模式进行了强制调整,即确保当偏移量的提交模式为 ON_CHECKPOINTS(条件 1:开启 checkpoint,条件 2:consumer.setCommitOffsetsOnCheckpoints(true)) 时,禁用主动提交。这将笼罩用户在 properties 中配置的任何设置,简略能够了解为:如果开启了 checkpoint,并且设置了 consumer.setCommitOffsetsOnCheckpoints(true),默认为 true,就会将 kafka properties 的 enable.auto.commit 强制置为 false。对于 offset 的提交模式,见下文的偏移量提交模式分析。

判断是否设置了主动提交

   @Override
    protected boolean getIsAutoCommitEnabled() {
        //
        return getBoolean(properties, ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true) &&
            PropertiesUtil.getLong(properties, ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 5000) > 0;
    }

判断是否在 kafka 的参数开启了主动提交,即 enable.auto.commit=true,并且 auto.commit.interval.ms>0, 留神:如果没有没有设置 enable.auto.commit 的参数,则默认为 true, 如果没有设置 auto.commit.interval.ms 的参数,则默认为 5000 毫秒。该办法会在 FlinkKafkaConsumerBase 的 open 办法进行初始化的时候调用。

反序列化

private static void setDeserializer(Properties props) {
         // 默认的反序列化形式 
        final String deSerName = ByteArrayDeserializer.class.getName();
         // 获取用户配置的 properties 对于 key 与 value 的反序列化模式
        Object keyDeSer = props.get(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG);
        Object valDeSer = props.get(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG);
         // 如果配置了,则应用用户配置的值
        if (keyDeSer != null && !keyDeSer.equals(deSerName)) {LOG.warn("Ignoring configured key DeSerializer ({})", ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG);
        }
        if (valDeSer != null && !valDeSer.equals(deSerName)) {LOG.warn("Ignoring configured value DeSerializer ({})", ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG);
        }
        // 没有配置,则应用 ByteArrayDeserializer 进行反序列化
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, deSerName);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, deSerName);
    }

确保配置了 kafka 音讯的 key 与 value 的反序列化形式,如果没有配置,则应用 ByteArrayDeserializer 序列化器,
ByteArrayDeserializer 类的 deserialize 办法是间接将数据进行 return,未做任何解决。

FlinkKafkaConsumerBase 源码


@Internal
public abstract class FlinkKafkaConsumerBase<T> extends RichParallelSourceFunction<T> implements
        CheckpointListener,
        ResultTypeQueryable<T>,
        CheckpointedFunction {

    public static final int MAX_NUM_PENDING_CHECKPOINTS = 100;
    public static final long PARTITION_DISCOVERY_DISABLED = Long.MIN_VALUE;
    public static final String KEY_DISABLE_METRICS = "flink.disable-metrics";
    public static final String KEY_PARTITION_DISCOVERY_INTERVAL_MILLIS = "flink.partition-discovery.interval-millis";
    private static final String OFFSETS_STATE_NAME = "topic-partition-offset-states";
    private boolean enableCommitOnCheckpoints = true;
    /**
     * 偏移量的提交模式,仅能通过在 FlinkKafkaConsumerBase#open(Configuration)进行配置
     * 该值取决于用户是否开启了 checkpoint

     */
    private OffsetCommitMode offsetCommitMode;
    /**
     * 配置从哪个地位开始生产 kafka 的音讯,* 默认为 StartupMode#GROUP_OFFSETS,即从以后提交的偏移量开始生产
     */
    private StartupMode startupMode = StartupMode.GROUP_OFFSETS;
    private Map<KafkaTopicPartition, Long> specificStartupOffsets;
    private Long startupOffsetsTimestamp;

    /**
     * 确保当偏移量的提交模式为 ON_CHECKPOINTS 时,禁用主动提交,* 这将笼罩用户在 properties 中配置的任何设置。* 当 offset 的模式为 ON_CHECKPOINTS,或者为 DISABLED 时,会将用户配置的 properties 属性进行笼罩
     * 具体是将 ENABLE_AUTO_COMMIT_CONFIG = "enable.auto.commit" 的值重置为 "false,即禁用主动提交
     * @param properties       kafka 配置的 properties,会通过该办法进行笼罩
     * @param offsetCommitMode    offset 提交模式
     */
    static void adjustAutoCommitConfig(Properties properties, OffsetCommitMode offsetCommitMode) {if (offsetCommitMode == OffsetCommitMode.ON_CHECKPOINTS || offsetCommitMode == OffsetCommitMode.DISABLED) {properties.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
        }
    }

    /**
     * 决定是否在开启 checkpoint 时,在 checkpoin 之后提交偏移量,* 只有用户配置了启用 checkpoint,该参数才会其作用
     * 如果没有开启 checkpoint,则应用 kafka 的配置参数:enable.auto.commit
     * @param commitOnCheckpoints
     * @return
     */
    public FlinkKafkaConsumerBase<T> setCommitOffsetsOnCheckpoints(boolean commitOnCheckpoints) {
        this.enableCommitOnCheckpoints = commitOnCheckpoints;
        return this;
    }
    /**
     * 从最早的偏移量开始生产,* 该模式下,Kafka 中的曾经提交的偏移量将被疏忽,不会用作起始地位。* 能够通过 consumer1.setStartFromEarliest()进行设置
     */
    public FlinkKafkaConsumerBase<T> setStartFromEarliest() {
        this.startupMode = StartupMode.EARLIEST;
        this.startupOffsetsTimestamp = null;
        this.specificStartupOffsets = null;
        return this;
    }

    /**
     * 从最新的数据开始生产,
     *  该模式下,Kafka 中的 已提交的偏移量将被疏忽,不会用作起始地位。*
     */
    public FlinkKafkaConsumerBase<T> setStartFromLatest() {
        this.startupMode = StartupMode.LATEST;
        this.startupOffsetsTimestamp = null;
        this.specificStartupOffsets = null;
        return this;
    }
    
    /**
     * 指定具体的偏移量工夫戳, 毫秒
     * 对于每个分区,其工夫戳大于或等于指定工夫戳的记录将用作起始地位。* 如果一个分区的最新记录早于指定的工夫戳,则只从最新记录读取该分区数据。* 在这种模式下,Kafka 中的已提交 offset 将被疏忽,不会用作起始地位。*/
    protected FlinkKafkaConsumerBase<T> setStartFromTimestamp(long startupOffsetsTimestamp) {checkArgument(startupOffsetsTimestamp >= 0, "The provided value for the startup offsets timestamp is invalid.");

        long currentTimestamp = System.currentTimeMillis();
        checkArgument(startupOffsetsTimestamp <= currentTimestamp,
            "Startup time[%s] must be before current time[%s].", startupOffsetsTimestamp, currentTimestamp);

        this.startupMode = StartupMode.TIMESTAMP;
        this.startupOffsetsTimestamp = startupOffsetsTimestamp;
        this.specificStartupOffsets = null;
        return this;
    }

    /**
     *
     * 从具体的消费者组最近提交的偏移量开始生产,为默认形式
     * 如果没有发现分区的偏移量,应用 auto.offset.reset 参数配置的值
     * @return
     */
    public FlinkKafkaConsumerBase<T> setStartFromGroupOffsets() {
        this.startupMode = StartupMode.GROUP_OFFSETS;
        this.startupOffsetsTimestamp = null;
        this.specificStartupOffsets = null;
        return this;
    }

    /**
     * 为每个分区指定偏移量进行生产
     */
    public FlinkKafkaConsumerBase<T> setStartFromSpecificOffsets(Map<KafkaTopicPartition, Long> specificStartupOffsets) {
        this.startupMode = StartupMode.SPECIFIC_OFFSETS;
        this.startupOffsetsTimestamp = null;
        this.specificStartupOffsets = checkNotNull(specificStartupOffsets);
        return this;
    }
    @Override
    public void open(Configuration configuration) throws Exception {
        // determine the offset commit mode
        // 决定偏移量的提交模式,// 第一个参数为是否开启了主动提交,// 第二个参数为是否开启了 CommitOnCheckpoint 模式
        // 第三个参数为是否开启了 checkpoint
        this.offsetCommitMode = OffsetCommitModes.fromConfiguration(getIsAutoCommitEnabled(),
                enableCommitOnCheckpoints,
                ((StreamingRuntimeContext) getRuntimeContext()).isCheckpointingEnabled());
       
       // 省略的代码
    }

// 省略的代码
    /**
     * 创立一个 fetcher 用于连贯 kafka 的 broker,拉去数据并进行反序列化,而后将数据输入为数据流(data stream)
     * @param sourceContext   数据输入的上下文
     * @param subscribedPartitionsToStartOffsets  以后 sub task 须要解决的 topic 分区汇合,即 topic 的 partition 与 offset 的 Map 汇合
     * @param watermarksPeriodic    可选, 一个序列化的工夫戳提取器,生成 periodic 类型的 watermark
     * @param watermarksPunctuated  可选, 一个序列化的工夫戳提取器,生成 punctuated 类型的 watermark
     * @param runtimeContext        task 的 runtime context 上下文
     * @param offsetCommitMode      offset 的提交模式, 有三种,别离为:DISABLED(禁用偏移量主动提交),ON_CHECKPOINTS(仅仅当 checkpoints 实现之后,才提交偏移量给 kafka)
     * KAFKA_PERIODIC(应用 kafka 主动提交函数,周期性主动提交偏移量)
     * @param kafkaMetricGroup   Flink 的 Metric
     * @param useMetrics         是否应用 Metric
     * @return                   返回一个 fetcher 实例
     * @throws Exception
     */
    protected abstract AbstractFetcher<T, ?> createFetcher(
            SourceContext<T> sourceContext,
            Map<KafkaTopicPartition, Long> subscribedPartitionsToStartOffsets,
            SerializedValue<AssignerWithPeriodicWatermarks<T>> watermarksPeriodic,
            SerializedValue<AssignerWithPunctuatedWatermarks<T>> watermarksPunctuated,
            StreamingRuntimeContext runtimeContext,
            OffsetCommitMode offsetCommitMode,
            MetricGroup kafkaMetricGroup,
            boolean useMetrics) throws Exception;
    protected abstract boolean getIsAutoCommitEnabled();
    // 省略的代码
}

上述代码是 FlinkKafkaConsumerBase 的局部代码片段,基本上对其做了具体正文,外面的有些办法是 FlinkKafkaConsumer 继承的,有些是重写的。之所以在这里给出,能够对照 FlinkKafkaConsumer 的源码,从而不便了解。

偏移量提交模式分析

Flink Kafka Consumer 容许有配置如何将 offset 提交回 Kafka broker(或 0.8 版本的 Zookeeper)的行为。请留神:Flink Kafka Consumer 不依赖于提交的 offset 来实现容错保障。提交的 offset 只是一种办法,用于公开 consumer 的进度以便进行监控。

配置 offset 提交行为的办法是否雷同,取决于是否为 job 启用了 checkpointing。在这里先给出提交模式的具体论断,上面会对两种形式进行具体的剖析。根本的论断为:

  • 开启 checkpoint

    • 状况 1:用户通过调用 consumer 上的 setCommitOffsetsOnCheckpoints(true) 办法来启用 offset 的提交(默认状况下为 true)

那么当 checkpointing 实现时,Flink Kafka Consumer 将提交的 offset 存储在 checkpoint 状态中。
这确保 Kafka broker 中提交的 offset 与 checkpoint 状态中的 offset 统一。
留神,在这个场景中,Properties 中的主动定期 offset 提交设置会被齐全疏忽。
此状况应用的是 ON_CHECKPOINTS

    • 状况 2:用户通过调用 consumer 上的 setCommitOffsetsOnCheckpoints(“false”) 办法来禁用 offset 的提交,则应用 DISABLED 模式提交 offset
    • 未开启 checkpoint
      Flink Kafka Consumer 依赖于外部应用的 Kafka client 主动定期 offset 提交性能,因而,要禁用或启用 offset 的提交
    • 状况 1:配置了 Kafka properties 的参数配置了 ”enable.auto.commit” = “true” 或者 Kafka 0.8 的 auto.commit.enable=true,应用 KAFKA_PERIODIC 模式提交 offset,即主动提交 offset

      • 状况 2:没有配置 enable.auto.commit 参数,应用 DISABLED 模式提交 offset,这意味着 kafka 不晓得以后的消费者组的消费者每次生产的偏移量。

    提交模式源码剖析

    • offset 的提交模式
    public enum OffsetCommitMode {
        // 禁用偏移量主动提交
        DISABLED,
        // 仅仅当 checkpoints 实现之后,才提交偏移量给 kafka
        ON_CHECKPOINTS,
        // 应用 kafka 主动提交函数,周期性主动提交偏移量
        KAFKA_PERIODIC;
    }
    • 提交模式的调用
    public class OffsetCommitModes {
        public static OffsetCommitMode fromConfiguration(
                boolean enableAutoCommit,
                boolean enableCommitOnCheckpoint,
                boolean enableCheckpointing) {
            // 如果开启了 checkinpoint,执行上面判断
            if (enableCheckpointing) {// 如果开启了 checkpoint,进一步判断是否在 checkpoin 启用时提交(setCommitOffsetsOnCheckpoints(true)),如果是则应用 ON_CHECKPOINTS 模式
                // 否则应用 DISABLED 模式
                return (enableCommitOnCheckpoint) ? OffsetCommitMode.ON_CHECKPOINTS : OffsetCommitMode.DISABLED;
            } else {
                // 若 Kafka properties 的参数配置了 "enable.auto.commit" = "true",则应用 KAFKA_PERIODIC 模式提交 offset
                // 否则应用 DISABLED 模式
                return (enableAutoCommit) ? OffsetCommitMode.KAFKA_PERIODIC : OffsetCommitMode.DISABLED;
            }
        }
    }

    小结

    本文次要介绍了 Flink Kafka Consumer,首先对 FlinkKafkaConsumer 的不同版本进行了比照,而后给出了一个残缺的 Demo 案例,并对案例的配置参数进行了具体解释,接着剖析了 FlinkKafkaConsumer 的继承关系,并别离对 FlinkKafkaConsumer 以及其父类 FlinkKafkaConsumerBase 的源码进行了解读,最初从源码层面剖析了 Flink Kafka Consumer 的偏移量提交模式,并对每一种提交模式进行了梳理。

    公众号『大数据技术与数仓』,回复『材料』支付大数据资料包

    退出移动版