Flink 提供了专门的 Kafka 连接器,向 Kafka topic 中读取或者写入数据。Flink Kafka Consumer 集成了 Flink 的 Checkpoint 机制,可提供 exactly-once 的解决语义。为此,Flink 并不齐全依赖于跟踪 Kafka 生产组的偏移量,而是在外部跟踪和查看偏移量。
引言
当咱们在应用Spark Streaming、Flink等计算框架进行数据实时处理时,应用Kafka作为一款公布与订阅的音讯零碎成为了标配。Spark Streaming与Flink都提供了绝对应的Kafka Consumer,应用起来十分的不便,只须要设置一下Kafka的参数,而后增加kafka的source就高枕无忧了。如果你真的感觉事件就是如此的so easy,感觉妈妈再也不必放心你的学习了,那就真的是too young too simple sometimes naive了。本文以Flink 的Kafka Source为探讨对象,首先从根本的应用动手,而后深刻源码逐个分析,一并为你拨开Flink Kafka connector的神秘面纱。值得注意的是,本文假设读者具备了Kafka的相干常识,对于Kafka的相干细节问题,不在本文的探讨范畴之内。
Flink Kafka Consumer介绍
Flink Kafka Connector有很多个版本,能够依据你的kafka和Flink的版本抉择相应的包(maven artifact id)和类名。本文所波及的Flink版本为1.10,Kafka的版本为2.3.4。Flink所提供的Maven依赖于类名如下表所示:
Demo示例
增加Maven依赖
<!--本文应用的是通用型的connector-->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka_2.11</artifactId>
<version>1.10.0</version>
</dependency>
简略代码案例
public class KafkaConnector {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment senv = StreamExecutionEnvironment.getExecutionEnvironment();
// 开启checkpoint,工夫距离为毫秒
senv.enableCheckpointing(5000L);
// 抉择状态后端
senv.setStateBackend((StateBackend) new FsStateBackend("file:///E://checkpoint"));
//senv.setStateBackend((StateBackend) new FsStateBackend("hdfs://kms-1:8020/checkpoint"));
Properties props = new Properties();
// kafka broker地址
props.put("bootstrap.servers", "kms-2:9092,kms-3:9092,kms-4:9092");
// 仅kafka0.8版本须要配置
props.put("zookeeper.connect", "kms-2:2181,kms-3:2181,kms-4:2181");
// 消费者组
props.put("group.id", "test");
// 主动偏移量提交
props.put("enable.auto.commit", true);
// 偏移量提交的工夫距离,毫秒
props.put("auto.commit.interval.ms", 5000);
// kafka 音讯的key序列化器
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
// kafka 音讯的value序列化器
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
// 指定kafka的消费者从哪里开始生产数据
// 共有三种形式,
// #earliest
// 当各分区下有已提交的offset时,从提交的offset开始生产;
// 无提交的offset时,从头开始生产
// #latest
// 当各分区下有已提交的offset时,从提交的offset开始生产;
// 无提交的offset时,生产新产生的该分区下的数据
// #none
// topic各分区都存在已提交的offset时,
// 从offset后开始生产;
// 只有有一个分区不存在已提交的offset,则抛出异样
props.put("auto.offset.reset", "latest");
FlinkKafkaConsumer<String> consumer = new FlinkKafkaConsumer<>(
"qfbap_ods.code_city",
new SimpleStringSchema(),
props);
//设置checkpoint后在提交offset,即oncheckpoint模式
// 该值默认为true,
consumer.setCommitOffsetsOnCheckpoints(true);
// 最早的数据开始生产
// 该模式下,Kafka 中的 committed offset 将被疏忽,不会用作起始地位。
//consumer.setStartFromEarliest();
// 消费者组最近一次提交的偏移量,默认。
// 如果找不到分区的偏移量,那么将会应用配置中的 auto.offset.reset 设置
//consumer.setStartFromGroupOffsets();
// 最新的数据开始生产
// 该模式下,Kafka 中的 committed offset 将被疏忽,不会用作起始地位。
//consumer.setStartFromLatest();
// 指定具体的偏移量工夫戳,毫秒
// 对于每个分区,其工夫戳大于或等于指定工夫戳的记录将用作起始地位。
// 如果一个分区的最新记录早于指定的工夫戳,则只从最新记录读取该分区数据。
// 在这种模式下,Kafka 中的已提交 offset 将被疏忽,不会用作起始地位。
//consumer.setStartFromTimestamp(1585047859000L);
// 为每个分区指定偏移量
/*Map<KafkaTopicPartition, Long> specificStartOffsets = new HashMap<>();
specificStartOffsets.put(new KafkaTopicPartition("qfbap_ods.code_city", 0), 23L);
specificStartOffsets.put(new KafkaTopicPartition("qfbap_ods.code_city", 1), 31L);
specificStartOffsets.put(new KafkaTopicPartition("qfbap_ods.code_city", 2), 43L);
consumer1.setStartFromSpecificOffsets(specificStartOffsets);*/
/**
*
* 请留神:当 Job 从故障中主动复原或应用 savepoint 手动复原时,
* 这些起始地位配置办法不会影响生产的起始地位。
* 在复原时,每个 Kafka 分区的起始地位由存储在 savepoint 或 checkpoint 中的 offset 确定
*
*/
DataStreamSource<String> source = senv.addSource(consumer);
// TODO
source.print();
senv.execute("test kafka connector");
}
}
参数配置解读
在Demo示例中,给出了具体的配置信息,上面将对下面的参数配置进行逐个剖析。
kakfa的properties参数配置
- bootstrap.servers:kafka broker地址
- zookeeper.connect:仅kafka0.8版本须要配置
- group.id:消费者组
- enable.auto.commit:
主动偏移量提交,该值的配置不是最终的偏移量提交模式,须要思考用户是否开启了checkpoint,
在上面的源码剖析中会进行解读
- auto.commit.interval.ms:偏移量提交的工夫距离,毫秒
- key.deserializer:
kafka 音讯的key序列化器,如果不指定会应用ByteArrayDeserializer序列化器
- value.deserializer:
kafka 音讯的value序列化器,如果不指定会应用ByteArrayDeserializer序列化器
-
auto.offset.reset:
指定kafka的消费者从哪里开始生产数据,共有三种形式,
- 第一种:earliest
当各分区下有已提交的offset时,从提交的offset开始生产; 无提交的offset时,从头开始生产 - 第二种:latest
当各分区下有已提交的offset时,从提交的offset开始生产;无提交的offset时,生产新产生的该分区下的数据 - 第三种:none
topic各分区都存在已提交的offset时,从offset后开始生产;只有有一个分区不存在已提交的offset,则抛出异样
- 第一种:earliest
留神:下面的指定生产模式并不是最终的生产模式,取决于用户在Flink程序中配置的生产模式
Flink程序用户配置的参数
- consumer.setCommitOffsetsOnCheckpoints(true)
解释:设置checkpoint后在提交offset,即oncheckpoint模式,该值默认为true,该参数会影响偏移量的提交形式,上面的源码中会进行剖析
- consumer.setStartFromEarliest()
解释: 最早的数据开始生产 ,该模式下,Kafka 中的 committed offset 将被疏忽,不会用作起始地位。该办法为继承父类FlinkKafkaConsumerBase的办法。
- consumer.setStartFromGroupOffsets()
解释:消费者组最近一次提交的偏移量,默认。 如果找不到分区的偏移量,那么将会应用配置中的 auto.offset.reset 设置,该办法为继承父类FlinkKafkaConsumerBase的办法。
- consumer.setStartFromLatest()
解释:最新的数据开始生产,该模式下,Kafka 中的 committed offset 将被疏忽,不会用作起始地位。该办法为继承父类FlinkKafkaConsumerBase的办法。
- consumer.setStartFromTimestamp(1585047859000L)
解释:指定具体的偏移量工夫戳,毫秒。对于每个分区,其工夫戳大于或等于指定工夫戳的记录将用作起始地位。 如果一个分区的最新记录早于指定的工夫戳,则只从最新记录读取该分区数据。在这种模式下,Kafka 中的已提交 offset 将被疏忽,不会用作起始地位。
- consumer.setStartFromSpecificOffsets(specificStartOffsets)
解释:为每个分区指定偏移量,该办法为继承父类FlinkKafkaConsumerBase的办法。
请留神:当 Job 从故障中主动复原或应用 savepoint 手动复原时,这些起始地位配置办法不会影响生产的起始地位。在复原时,每个 Kafka 分区的起始地位由存储在 savepoint 或 checkpoint 中的 offset 确定。
Flink Kafka Consumer源码解读
继承关系
Flink Kafka Consumer继承了FlinkKafkaConsumerBase抽象类,而FlinkKafkaConsumerBase抽象类又继承了RichParallelSourceFunction,所以要实现一个自定义的source时,有两种实现形式:一种是通过实现SourceFunction接口来自定义并行度为1的数据源;另一种是通过实现ParallelSourceFunction接口或者继承RichParallelSourceFunction来自定义具备并行度的数据源。FlinkKafkaConsumer的继承关系如下图所示。
源码解读
FlinkKafkaConsumer源码
先看一下FlinkKafkaConsumer的源码,为了方面浏览,本文将尽量给出本比拟残缺的源代码片段,具体如下所示:代码较长,在这里能够先有有一个总体的印象,上面会对重要的代码片段具体进行剖析。
public class FlinkKafkaConsumer<T> extends FlinkKafkaConsumerBase<T> {
// 配置轮询超时超时工夫,应用flink.poll-timeout参数在properties进行配置
public static final String KEY_POLL_TIMEOUT = "flink.poll-timeout";
// 如果没有可用数据,则期待轮询所需的工夫(以毫秒为单位)。 如果为0,则立刻返回所有可用的记录
//默认轮询超时工夫
public static final long DEFAULT_POLL_TIMEOUT = 100L;
// 用户提供的kafka 参数配置
protected final Properties properties;
// 如果没有可用数据,则期待轮询所需的工夫(以毫秒为单位)。 如果为0,则立刻返回所有可用的记录
protected final long pollTimeout;
/**
* 创立一个kafka的consumer source
* @param topic 生产的主题名称
* @param valueDeserializer 反序列化类型,用于将kafka的字节音讯转换为Flink的对象
* @param props 用户传入的kafka参数
*/
public FlinkKafkaConsumer(String topic, DeserializationSchema<T> valueDeserializer, Properties props) {
this(Collections.singletonList(topic), valueDeserializer, props);
}
/**
* 创立一个kafka的consumer source
* 该构造方法容许传入KafkaDeserializationSchema,该反序列化类反对拜访kafka生产的额定信息
* 比方:key/value对,offsets(偏移量),topic(主题名称)
* @param topic 生产的主题名称
* @param deserializer 反序列化类型,用于将kafka的字节音讯转换为Flink的对象
* @param props 用户传入的kafka参数
*/
public FlinkKafkaConsumer(String topic, KafkaDeserializationSchema<T> deserializer, Properties props) {
this(Collections.singletonList(topic), deserializer, props);
}
/**
* 创立一个kafka的consumer source
* 该构造方法容许传入多个topic(主题),反对生产多个主题
* @param topics 生产的主题名称,多个主题为List汇合
* @param deserializer 反序列化类型,用于将kafka的字节音讯转换为Flink的对象
* @param props 用户传入的kafka参数
*/
public FlinkKafkaConsumer(List<String> topics, DeserializationSchema<T> deserializer, Properties props) {
this(topics, new KafkaDeserializationSchemaWrapper<>(deserializer), props);
}
/**
* 创立一个kafka的consumer source
* 该构造方法容许传入多个topic(主题),反对生产多个主题,
* @param topics 生产的主题名称,多个主题为List汇合
* @param deserializer 反序列化类型,用于将kafka的字节音讯转换为Flink的对象,反对获取额定信息
* @param props 用户传入的kafka参数
*/
public FlinkKafkaConsumer(List<String> topics, KafkaDeserializationSchema<T> deserializer, Properties props) {
this(topics, null, deserializer, props);
}
/**
* 基于正则表达式订阅多个topic
* 如果开启了分区发现,即FlinkKafkaConsumer.KEY_PARTITION_DISCOVERY_INTERVAL_MILLIS值为非正数
* 只有是可能正则匹配上,主题一旦被创立就会立刻被订阅
* @param subscriptionPattern 主题的正则表达式
* @param valueDeserializer 反序列化类型,用于将kafka的字节音讯转换为Flink的对象,反对获取额定信息
* @param props 用户传入的kafka参数
*/
public FlinkKafkaConsumer(Pattern subscriptionPattern, DeserializationSchema<T> valueDeserializer, Properties props) {
this(null, subscriptionPattern, new KafkaDeserializationSchemaWrapper<>(valueDeserializer), props);
}
/**
* 基于正则表达式订阅多个topic
* 如果开启了分区发现,即FlinkKafkaConsumer.KEY_PARTITION_DISCOVERY_INTERVAL_MILLIS值为非正数
* 只有是可能正则匹配上,主题一旦被创立就会立刻被订阅
* @param subscriptionPattern 主题的正则表达式
* @param deserializer 该反序列化类反对拜访kafka生产的额定信息,比方:key/value对,offsets(偏移量),topic(主题名称)
* @param props 用户传入的kafka参数
*/
public FlinkKafkaConsumer(Pattern subscriptionPattern, KafkaDeserializationSchema<T> deserializer, Properties props) {
this(null, subscriptionPattern, deserializer, props);
}
private FlinkKafkaConsumer(
List<String> topics,
Pattern subscriptionPattern,
KafkaDeserializationSchema<T> deserializer,
Properties props) {
// 调用父类(FlinkKafkaConsumerBase)构造方法,PropertiesUtil.getLong办法第一个参数为Properties,第二个参数为key,第三个参数为value默认值
super(
topics,
subscriptionPattern,
deserializer,
getLong(
checkNotNull(props, "props"),
KEY_PARTITION_DISCOVERY_INTERVAL_MILLIS, PARTITION_DISCOVERY_DISABLED),
!getBoolean(props, KEY_DISABLE_METRICS, false));
this.properties = props;
setDeserializer(this.properties);
// 配置轮询超时工夫,如果在properties中配置了KEY_POLL_TIMEOUT参数,则返回具体的配置值,否则返回默认值DEFAULT_POLL_TIMEOUT
try {
if (properties.containsKey(KEY_POLL_TIMEOUT)) {
this.pollTimeout = Long.parseLong(properties.getProperty(KEY_POLL_TIMEOUT));
} else {
this.pollTimeout = DEFAULT_POLL_TIMEOUT;
}
}
catch (Exception e) {
throw new IllegalArgumentException("Cannot parse poll timeout for '" + KEY_POLL_TIMEOUT + '\'', e);
}
}
// 父类(FlinkKafkaConsumerBase)办法重写,该办法的作用是返回一个fetcher实例,
// fetcher的作用是连贯kafka的broker,拉去数据并进行反序列化,而后将数据输入为数据流(data stream)
@Override
protected AbstractFetcher<T, ?> createFetcher(
SourceContext<T> sourceContext,
Map<KafkaTopicPartition, Long> assignedPartitionsWithInitialOffsets,
SerializedValue<AssignerWithPeriodicWatermarks<T>> watermarksPeriodic,
SerializedValue<AssignerWithPunctuatedWatermarks<T>> watermarksPunctuated,
StreamingRuntimeContext runtimeContext,
OffsetCommitMode offsetCommitMode,
MetricGroup consumerMetricGroup,
boolean useMetrics) throws Exception {
// 确保当偏移量的提交模式为ON_CHECKPOINTS(条件1:开启checkpoint,条件2:consumer.setCommitOffsetsOnCheckpoints(true))时,禁用主动提交
// 该办法为父类(FlinkKafkaConsumerBase)的静态方法
// 这将笼罩用户在properties中配置的任何设置
// 当offset的模式为ON_CHECKPOINTS,或者为DISABLED时,会将用户配置的properties属性进行笼罩
// 具体是将ENABLE_AUTO_COMMIT_CONFIG = "enable.auto.commit"的值重置为"false
// 能够了解为:如果开启了checkpoint,并且设置了consumer.setCommitOffsetsOnCheckpoints(true),默认为true,
// 就会将kafka properties的enable.auto.commit强制置为false
adjustAutoCommitConfig(properties, offsetCommitMode);
return new KafkaFetcher<>(
sourceContext,
assignedPartitionsWithInitialOffsets,
watermarksPeriodic,
watermarksPunctuated,
runtimeContext.getProcessingTimeService(),
runtimeContext.getExecutionConfig().getAutoWatermarkInterval(),
runtimeContext.getUserCodeClassLoader(),
runtimeContext.getTaskNameWithSubtasks(),
deserializer,
properties,
pollTimeout,
runtimeContext.getMetricGroup(),
consumerMetricGroup,
useMetrics);
}
//父类(FlinkKafkaConsumerBase)办法重写
// 返回一个分区发现类,分区发现能够应用kafka broker的高级consumer API发现topic和partition的元数据
@Override
protected AbstractPartitionDiscoverer createPartitionDiscoverer(
KafkaTopicsDescriptor topicsDescriptor,
int indexOfThisSubtask,
int numParallelSubtasks) {
return new KafkaPartitionDiscoverer(topicsDescriptor, indexOfThisSubtask, numParallelSubtasks, properties);
}
/**
*判断是否在kafka的参数开启了主动提交,即enable.auto.commit=true,
* 并且auto.commit.interval.ms>0,
* 留神:如果没有没有设置enable.auto.commit的参数,则默认为true
* 如果没有设置auto.commit.interval.ms的参数,则默认为5000毫秒
* @return
*/
@Override
protected boolean getIsAutoCommitEnabled() {
//
return getBoolean(properties, ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true) &&
PropertiesUtil.getLong(properties, ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 5000) > 0;
}
/**
* 确保配置了kafka音讯的key与value的反序列化形式,
* 如果没有配置,则应用ByteArrayDeserializer序列化器,
* 该类的deserialize办法是间接将数据进行return,未做任何解决
* @param props
*/
private static void setDeserializer(Properties props) {
final String deSerName = ByteArrayDeserializer.class.getName();
Object keyDeSer = props.get(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG);
Object valDeSer = props.get(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG);
if (keyDeSer != null && !keyDeSer.equals(deSerName)) {
LOG.warn("Ignoring configured key DeSerializer ({})", ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG);
}
if (valDeSer != null && !valDeSer.equals(deSerName)) {
LOG.warn("Ignoring configured value DeSerializer ({})", ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG);
}
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, deSerName);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, deSerName);
}
}
剖析
下面的代码曾经给出了十分具体的正文,上面将对比拟要害的局部进行剖析。
- 构造方法剖析
FlinkKakfaConsumer提供了7种构造方法,如上图所示。不同的构造方法别离具备不同的性能,通过传递的参数也能够大抵剖析出每种构造方法特有的性能,为了不便了解,本文将对其进行分组讨论,具体如下:
单topic
/**
* 创立一个kafka的consumer source
* @param topic 生产的主题名称
* @param valueDeserializer 反序列化类型,用于将kafka的字节音讯转换为Flink的对象
* @param props 用户传入的kafka参数
*/
public FlinkKafkaConsumer(String topic, DeserializationSchema<T> valueDeserializer, Properties props) {
this(Collections.singletonList(topic), valueDeserializer, props);
}
/**
* 创立一个kafka的consumer source
* 该构造方法容许传入KafkaDeserializationSchema,该反序列化类反对拜访kafka生产的额定信息
* 比方:key/value对,offsets(偏移量),topic(主题名称)
* @param topic 生产的主题名称
* @param deserializer 反序列化类型,用于将kafka的字节音讯转换为Flink的对象
* @param props 用户传入的kafka参数
*/
public FlinkKafkaConsumer(String topic, KafkaDeserializationSchema<T> deserializer, Properties props) {
this(Collections.singletonList(topic), deserializer, props);
}
下面两种构造方法只反对单个topic,区别在于反序列化的形式不一样。第一种应用的是DeserializationSchema,第二种应用的是KafkaDeserializationSchema,其中应用带有KafkaDeserializationSchema参数的构造方法能够获取更多的从属信息,比方在某些场景下须要获取key/value对,offsets(偏移量),topic(主题名称)等信息,能够抉择应用此形式的构造方法。以上两种办法都调用了公有的构造方法,公有构造方法的剖析见上面。
多topic
/**
* 创立一个kafka的consumer source
* 该构造方法容许传入多个topic(主题),反对生产多个主题
* @param topics 生产的主题名称,多个主题为List汇合
* @param deserializer 反序列化类型,用于将kafka的字节音讯转换为Flink的对象
* @param props 用户传入的kafka参数
*/
public FlinkKafkaConsumer(List<String> topics, DeserializationSchema<T> deserializer, Properties props) {
this(topics, new KafkaDeserializationSchemaWrapper<>(deserializer), props);
}
/**
* 创立一个kafka的consumer source
* 该构造方法容许传入多个topic(主题),反对生产多个主题,
* @param topics 生产的主题名称,多个主题为List汇合
* @param deserializer 反序列化类型,用于将kafka的字节音讯转换为Flink的对象,反对获取额定信息
* @param props 用户传入的kafka参数
*/
public FlinkKafkaConsumer(List<String> topics, KafkaDeserializationSchema<T> deserializer, Properties props) {
this(topics, null, deserializer, props);
}
下面的两种多topic的构造方法,能够应用一个list汇合接管多个topic进行生产,区别在于反序列化的形式不一样。第一种应用的是DeserializationSchema,第二种应用的是KafkaDeserializationSchema,其中应用带有KafkaDeserializationSchema参数的构造方法能够获取更多的从属信息,比方在某些场景下须要获取key/value对,offsets(偏移量),topic(主题名称)等信息,能够抉择应用此形式的构造方法。以上两种办法都调用了公有的构造方法,公有构造方法的剖析见上面。
正则匹配topic
/**
* 基于正则表达式订阅多个topic
* 如果开启了分区发现,即FlinkKafkaConsumer.KEY_PARTITION_DISCOVERY_INTERVAL_MILLIS值为非正数
* 只有是可能正则匹配上,主题一旦被创立就会立刻被订阅
* @param subscriptionPattern 主题的正则表达式
* @param valueDeserializer 反序列化类型,用于将kafka的字节音讯转换为Flink的对象,反对获取额定信息
* @param props 用户传入的kafka参数
*/
public FlinkKafkaConsumer(Pattern subscriptionPattern, DeserializationSchema<T> valueDeserializer, Properties props) {
this(null, subscriptionPattern, new KafkaDeserializationSchemaWrapper<>(valueDeserializer), props);
}
/**
* 基于正则表达式订阅多个topic
* 如果开启了分区发现,即FlinkKafkaConsumer.KEY_PARTITION_DISCOVERY_INTERVAL_MILLIS值为非正数
* 只有是可能正则匹配上,主题一旦被创立就会立刻被订阅
* @param subscriptionPattern 主题的正则表达式
* @param deserializer 该反序列化类反对拜访kafka生产的额定信息,比方:key/value对,offsets(偏移量),topic(主题名称)
* @param props 用户传入的kafka参数
*/
public FlinkKafkaConsumer(Pattern subscriptionPattern, KafkaDeserializationSchema<T> deserializer, Properties props) {
this(null, subscriptionPattern, deserializer, props);
}
理论的生产环境中可能有这样一些需要,比方有一个flink作业须要将多种不同的数据聚合到一起,而这些数据对应着不同的kafka topic,随着业务增长,新增一类数据,同时新增了一个kafka topic,如何在不重启作业的状况下作业主动感知新的topic。首先须要在构建FlinkKafkaConsumer时的properties中设置flink.partition-discovery.interval-millis参数为非负值,示意开启动静发现的开关,以及设置的工夫距离。此时FLinkKafkaConsumer外部会启动一个独自的线程定期去kafka获取最新的meta信息。具体的调用执行信息,参见上面的公有构造方法
公有构造方法
private FlinkKafkaConsumer(
List<String> topics,
Pattern subscriptionPattern,
KafkaDeserializationSchema<T> deserializer,
Properties props) {
// 调用父类(FlinkKafkaConsumerBase)构造方法,PropertiesUtil.getLong办法第一个参数为Properties,第二个参数为key,第三个参数为value默认值。KEY_PARTITION_DISCOVERY_INTERVAL_MILLIS值是开启分区发现的配置参数,在properties外面配置flink.partition-discovery.interval-millis=5000(大于0的数),如果没有配置则应用PARTITION_DISCOVERY_DISABLED=Long.MIN_VALUE(示意禁用分区发现)
super(
topics,
subscriptionPattern,
deserializer,
getLong(
checkNotNull(props, "props"),
KEY_PARTITION_DISCOVERY_INTERVAL_MILLIS, PARTITION_DISCOVERY_DISABLED),
!getBoolean(props, KEY_DISABLE_METRICS, false));
this.properties = props;
setDeserializer(this.properties);
// 配置轮询超时工夫,如果在properties中配置了KEY_POLL_TIMEOUT参数,则返回具体的配置值,否则返回默认值DEFAULT_POLL_TIMEOUT
try {
if (properties.containsKey(KEY_POLL_TIMEOUT)) {
this.pollTimeout = Long.parseLong(properties.getProperty(KEY_POLL_TIMEOUT));
} else {
this.pollTimeout = DEFAULT_POLL_TIMEOUT;
}
}
catch (Exception e) {
throw new IllegalArgumentException("Cannot parse poll timeout for '" + KEY_POLL_TIMEOUT + '\'', e);
}
}
- 其余办法剖析
KafkaFetcher对象创立
// 父类(FlinkKafkaConsumerBase)办法重写,该办法的作用是返回一个fetcher实例,
// fetcher的作用是连贯kafka的broker,拉去数据并进行反序列化,而后将数据输入为数据流(data stream)
@Override
protected AbstractFetcher<T, ?> createFetcher(
SourceContext<T> sourceContext,
Map<KafkaTopicPartition, Long> assignedPartitionsWithInitialOffsets,
SerializedValue<AssignerWithPeriodicWatermarks<T>> watermarksPeriodic,
SerializedValue<AssignerWithPunctuatedWatermarks<T>> watermarksPunctuated,
StreamingRuntimeContext runtimeContext,
OffsetCommitMode offsetCommitMode,
MetricGroup consumerMetricGroup,
boolean useMetrics) throws Exception {
// 确保当偏移量的提交模式为ON_CHECKPOINTS(条件1:开启checkpoint,条件2:consumer.setCommitOffsetsOnCheckpoints(true))时,禁用主动提交
// 该办法为父类(FlinkKafkaConsumerBase)的静态方法
// 这将笼罩用户在properties中配置的任何设置
// 当offset的模式为ON_CHECKPOINTS,或者为DISABLED时,会将用户配置的properties属性进行笼罩
// 具体是将ENABLE_AUTO_COMMIT_CONFIG = "enable.auto.commit"的值重置为"false
// 能够了解为:如果开启了checkpoint,并且设置了consumer.setCommitOffsetsOnCheckpoints(true),默认为true,
// 就会将kafka properties的enable.auto.commit强制置为false
adjustAutoCommitConfig(properties, offsetCommitMode);
return new KafkaFetcher<>(
sourceContext,
assignedPartitionsWithInitialOffsets,
watermarksPeriodic,
watermarksPunctuated,
runtimeContext.getProcessingTimeService(),
runtimeContext.getExecutionConfig().getAutoWatermarkInterval(),
runtimeContext.getUserCodeClassLoader(),
runtimeContext.getTaskNameWithSubtasks(),
deserializer,
properties,
pollTimeout,
runtimeContext.getMetricGroup(),
consumerMetricGroup,
useMetrics);
}
该办法的作用是返回一个fetcher实例,fetcher的作用是连贯kafka的broker,拉去数据并进行反序列化,而后将数据输入为数据流(data stream),在这里对主动偏移量提交模式进行了强制调整,即确保当偏移量的提交模式为ON_CHECKPOINTS(条件1:开启checkpoint,条件2:consumer.setCommitOffsetsOnCheckpoints(true))时,禁用主动提交。这将笼罩用户在properties中配置的任何设置,简略能够了解为:如果开启了checkpoint,并且设置了consumer.setCommitOffsetsOnCheckpoints(true),默认为true,就会将kafka properties的enable.auto.commit强制置为false。对于offset的提交模式,见下文的偏移量提交模式分析。
判断是否设置了主动提交
@Override
protected boolean getIsAutoCommitEnabled() {
//
return getBoolean(properties, ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true) &&
PropertiesUtil.getLong(properties, ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 5000) > 0;
}
判断是否在kafka的参数开启了主动提交,即enable.auto.commit=true,并且auto.commit.interval.ms>0, 留神:如果没有没有设置enable.auto.commit的参数,则默认为true, 如果没有设置auto.commit.interval.ms的参数,则默认为5000毫秒。该办法会在FlinkKafkaConsumerBase的open办法进行初始化的时候调用。
反序列化
private static void setDeserializer(Properties props) {
// 默认的反序列化形式
final String deSerName = ByteArrayDeserializer.class.getName();
//获取用户配置的properties对于key与value的反序列化模式
Object keyDeSer = props.get(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG);
Object valDeSer = props.get(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG);
// 如果配置了,则应用用户配置的值
if (keyDeSer != null && !keyDeSer.equals(deSerName)) {
LOG.warn("Ignoring configured key DeSerializer ({})", ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG);
}
if (valDeSer != null && !valDeSer.equals(deSerName)) {
LOG.warn("Ignoring configured value DeSerializer ({})", ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG);
}
// 没有配置,则应用ByteArrayDeserializer进行反序列化
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, deSerName);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, deSerName);
}
确保配置了kafka音讯的key与value的反序列化形式,如果没有配置,则应用ByteArrayDeserializer序列化器,
ByteArrayDeserializer类的deserialize办法是间接将数据进行return,未做任何解决。
FlinkKafkaConsumerBase源码
@Internal
public abstract class FlinkKafkaConsumerBase<T> extends RichParallelSourceFunction<T> implements
CheckpointListener,
ResultTypeQueryable<T>,
CheckpointedFunction {
public static final int MAX_NUM_PENDING_CHECKPOINTS = 100;
public static final long PARTITION_DISCOVERY_DISABLED = Long.MIN_VALUE;
public static final String KEY_DISABLE_METRICS = "flink.disable-metrics";
public static final String KEY_PARTITION_DISCOVERY_INTERVAL_MILLIS = "flink.partition-discovery.interval-millis";
private static final String OFFSETS_STATE_NAME = "topic-partition-offset-states";
private boolean enableCommitOnCheckpoints = true;
/**
* 偏移量的提交模式,仅能通过在FlinkKafkaConsumerBase#open(Configuration)进行配置
* 该值取决于用户是否开启了checkpoint
*/
private OffsetCommitMode offsetCommitMode;
/**
* 配置从哪个地位开始生产kafka的音讯,
* 默认为StartupMode#GROUP_OFFSETS,即从以后提交的偏移量开始生产
*/
private StartupMode startupMode = StartupMode.GROUP_OFFSETS;
private Map<KafkaTopicPartition, Long> specificStartupOffsets;
private Long startupOffsetsTimestamp;
/**
* 确保当偏移量的提交模式为ON_CHECKPOINTS时,禁用主动提交,
* 这将笼罩用户在properties中配置的任何设置。
* 当offset的模式为ON_CHECKPOINTS,或者为DISABLED时,会将用户配置的properties属性进行笼罩
* 具体是将ENABLE_AUTO_COMMIT_CONFIG = "enable.auto.commit"的值重置为"false,即禁用主动提交
* @param properties kafka配置的properties,会通过该办法进行笼罩
* @param offsetCommitMode offset提交模式
*/
static void adjustAutoCommitConfig(Properties properties, OffsetCommitMode offsetCommitMode) {
if (offsetCommitMode == OffsetCommitMode.ON_CHECKPOINTS || offsetCommitMode == OffsetCommitMode.DISABLED) {
properties.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
}
}
/**
* 决定是否在开启checkpoint时,在checkpoin之后提交偏移量,
* 只有用户配置了启用checkpoint,该参数才会其作用
* 如果没有开启checkpoint,则应用kafka的配置参数:enable.auto.commit
* @param commitOnCheckpoints
* @return
*/
public FlinkKafkaConsumerBase<T> setCommitOffsetsOnCheckpoints(boolean commitOnCheckpoints) {
this.enableCommitOnCheckpoints = commitOnCheckpoints;
return this;
}
/**
* 从最早的偏移量开始生产,
*该模式下,Kafka 中的曾经提交的偏移量将被疏忽,不会用作起始地位。
*能够通过consumer1.setStartFromEarliest()进行设置
*/
public FlinkKafkaConsumerBase<T> setStartFromEarliest() {
this.startupMode = StartupMode.EARLIEST;
this.startupOffsetsTimestamp = null;
this.specificStartupOffsets = null;
return this;
}
/**
* 从最新的数据开始生产,
* 该模式下,Kafka 中的 已提交的偏移量将被疏忽,不会用作起始地位。
*
*/
public FlinkKafkaConsumerBase<T> setStartFromLatest() {
this.startupMode = StartupMode.LATEST;
this.startupOffsetsTimestamp = null;
this.specificStartupOffsets = null;
return this;
}
/**
*指定具体的偏移量工夫戳,毫秒
*对于每个分区,其工夫戳大于或等于指定工夫戳的记录将用作起始地位。
* 如果一个分区的最新记录早于指定的工夫戳,则只从最新记录读取该分区数据。
* 在这种模式下,Kafka 中的已提交 offset 将被疏忽,不会用作起始地位。
*/
protected FlinkKafkaConsumerBase<T> setStartFromTimestamp(long startupOffsetsTimestamp) {
checkArgument(startupOffsetsTimestamp >= 0, "The provided value for the startup offsets timestamp is invalid.");
long currentTimestamp = System.currentTimeMillis();
checkArgument(startupOffsetsTimestamp <= currentTimestamp,
"Startup time[%s] must be before current time[%s].", startupOffsetsTimestamp, currentTimestamp);
this.startupMode = StartupMode.TIMESTAMP;
this.startupOffsetsTimestamp = startupOffsetsTimestamp;
this.specificStartupOffsets = null;
return this;
}
/**
*
* 从具体的消费者组最近提交的偏移量开始生产,为默认形式
* 如果没有发现分区的偏移量,应用auto.offset.reset参数配置的值
* @return
*/
public FlinkKafkaConsumerBase<T> setStartFromGroupOffsets() {
this.startupMode = StartupMode.GROUP_OFFSETS;
this.startupOffsetsTimestamp = null;
this.specificStartupOffsets = null;
return this;
}
/**
*为每个分区指定偏移量进行生产
*/
public FlinkKafkaConsumerBase<T> setStartFromSpecificOffsets(Map<KafkaTopicPartition, Long> specificStartupOffsets) {
this.startupMode = StartupMode.SPECIFIC_OFFSETS;
this.startupOffsetsTimestamp = null;
this.specificStartupOffsets = checkNotNull(specificStartupOffsets);
return this;
}
@Override
public void open(Configuration configuration) throws Exception {
// determine the offset commit mode
// 决定偏移量的提交模式,
// 第一个参数为是否开启了主动提交,
// 第二个参数为是否开启了CommitOnCheckpoint模式
// 第三个参数为是否开启了checkpoint
this.offsetCommitMode = OffsetCommitModes.fromConfiguration(
getIsAutoCommitEnabled(),
enableCommitOnCheckpoints,
((StreamingRuntimeContext) getRuntimeContext()).isCheckpointingEnabled());
// 省略的代码
}
// 省略的代码
/**
* 创立一个fetcher用于连贯kafka的broker,拉去数据并进行反序列化,而后将数据输入为数据流(data stream)
* @param sourceContext 数据输入的上下文
* @param subscribedPartitionsToStartOffsets 以后sub task须要解决的topic分区汇合,即topic的partition与offset的Map汇合
* @param watermarksPeriodic 可选,一个序列化的工夫戳提取器,生成periodic类型的 watermark
* @param watermarksPunctuated 可选,一个序列化的工夫戳提取器,生成punctuated类型的 watermark
* @param runtimeContext task的runtime context上下文
* @param offsetCommitMode offset的提交模式,有三种,别离为:DISABLED(禁用偏移量主动提交),ON_CHECKPOINTS(仅仅当checkpoints实现之后,才提交偏移量给kafka)
* KAFKA_PERIODIC(应用kafka主动提交函数,周期性主动提交偏移量)
* @param kafkaMetricGroup Flink的Metric
* @param useMetrics 是否应用Metric
* @return 返回一个fetcher实例
* @throws Exception
*/
protected abstract AbstractFetcher<T, ?> createFetcher(
SourceContext<T> sourceContext,
Map<KafkaTopicPartition, Long> subscribedPartitionsToStartOffsets,
SerializedValue<AssignerWithPeriodicWatermarks<T>> watermarksPeriodic,
SerializedValue<AssignerWithPunctuatedWatermarks<T>> watermarksPunctuated,
StreamingRuntimeContext runtimeContext,
OffsetCommitMode offsetCommitMode,
MetricGroup kafkaMetricGroup,
boolean useMetrics) throws Exception;
protected abstract boolean getIsAutoCommitEnabled();
// 省略的代码
}
上述代码是FlinkKafkaConsumerBase的局部代码片段,基本上对其做了具体正文,外面的有些办法是FlinkKafkaConsumer继承的,有些是重写的。之所以在这里给出,能够对照FlinkKafkaConsumer的源码,从而不便了解。
偏移量提交模式分析
Flink Kafka Consumer 容许有配置如何将 offset 提交回 Kafka broker(或 0.8 版本的 Zookeeper)的行为。请留神:Flink Kafka Consumer 不依赖于提交的 offset 来实现容错保障。提交的 offset 只是一种办法,用于公开 consumer 的进度以便进行监控。
配置 offset 提交行为的办法是否雷同,取决于是否为 job 启用了 checkpointing。在这里先给出提交模式的具体论断,上面会对两种形式进行具体的剖析。根本的论断为:
-
开启checkpoint
- 状况1:用户通过调用 consumer 上的 setCommitOffsetsOnCheckpoints(true) 办法来启用 offset 的提交(默认状况下为 true )
那么当 checkpointing 实现时,Flink Kafka Consumer 将提交的 offset 存储在 checkpoint 状态中。
这确保 Kafka broker 中提交的 offset 与 checkpoint 状态中的 offset 统一。
留神,在这个场景中,Properties 中的主动定期 offset 提交设置会被齐全疏忽。
此状况应用的是ON_CHECKPOINTS
- 状况2:用户通过调用 consumer 上的 setCommitOffsetsOnCheckpoints(“false”) 办法来禁用 offset 的提交,则应用DISABLED模式提交offset
- 未开启checkpoint
Flink Kafka Consumer 依赖于外部应用的 Kafka client 主动定期 offset 提交性能,因而,要禁用或启用 offset 的提交 -
状况1:配置了Kafka properties的参数配置了”enable.auto.commit” = “true”或者 Kafka 0.8 的 auto.commit.enable=true,应用KAFKA_PERIODIC模式提交offset,即主动提交offset
- 状况2:没有配置enable.auto.commit参数,应用DISABLED模式提交offset,这意味着kafka不晓得以后的消费者组的消费者每次生产的偏移量。
提交模式源码剖析
- offset的提交模式
public enum OffsetCommitMode {
// 禁用偏移量主动提交
DISABLED,
// 仅仅当checkpoints实现之后,才提交偏移量给kafka
ON_CHECKPOINTS,
// 应用kafka主动提交函数,周期性主动提交偏移量
KAFKA_PERIODIC;
}
- 提交模式的调用
public class OffsetCommitModes {
public static OffsetCommitMode fromConfiguration(
boolean enableAutoCommit,
boolean enableCommitOnCheckpoint,
boolean enableCheckpointing) {
// 如果开启了checkinpoint,执行上面判断
if (enableCheckpointing) {
// 如果开启了checkpoint,进一步判断是否在checkpoin启用时提交(setCommitOffsetsOnCheckpoints(true)),如果是则应用ON_CHECKPOINTS模式
// 否则应用DISABLED模式
return (enableCommitOnCheckpoint) ? OffsetCommitMode.ON_CHECKPOINTS : OffsetCommitMode.DISABLED;
} else {
// 若Kafka properties的参数配置了"enable.auto.commit" = "true",则应用KAFKA_PERIODIC模式提交offset
// 否则应用DISABLED模式
return (enableAutoCommit) ? OffsetCommitMode.KAFKA_PERIODIC : OffsetCommitMode.DISABLED;
}
}
}
小结
本文次要介绍了Flink Kafka Consumer,首先对FlinkKafkaConsumer的不同版本进行了比照,而后给出了一个残缺的Demo案例,并对案例的配置参数进行了具体解释,接着剖析了FlinkKafkaConsumer的继承关系,并别离对FlinkKafkaConsumer以及其父类FlinkKafkaConsumerBase的源码进行了解读,最初从源码层面剖析了Flink Kafka Consumer的偏移量提交模式,并对每一种提交模式进行了梳理。
公众号『大数据技术与数仓』,回复『材料』支付大数据资料包
发表回复