乐趣区

Flink实战八-Streaming-Connectors-编程

1 概览

1.1 预定义的源和接收器

Flink 内置了一些基本数据源和接收器,并且始终可用。该预定义的数据源包括文件,目录和插 socket,并从集合和迭代器摄取数据。该预定义的数据接收器支持写入文件和标准输入输出及 socket。

1.2 绑定连接器

连接器提供用于与各种第三方系统连接的代码。目前支持这些系统:

  • Apache Kafka (source/sink)
  • Apache Cassandra (sink)
  • Amazon Kinesis Streams (source/sink)
  • Elasticsearch (sink)
  • Hadoop FileSystem (sink)
  • RabbitMQ (source/sink)
  • Apache NiFi (source/sink)
  • Twitter Streaming API (source)
  • Google PubSub (source/sink)

要在应用程序中使用其中一个连接器,通常需要其他第三方组件,例如数据存储或消息队列的服务器。

虽然本节中列出的流连接器是 Flink 项目的一部分,并且包含在源版本中,但它们不包含在二进制分发版中。

1.3 Apache Bahir 中的连接器

Flink 的其他流处理连接器正在通过 Apache Bahir 发布,包括:

  • Apache ActiveMQ (source/sink)
  • Apache Flume (sink)
  • Redis (sink)
  • Akka (sink)
  • Netty (source)

1.4 其他连接到 Flink 的方法

1.4.1 通过异步 I / O 进行数据渲染

使用连接器不是将数据输入和输出 Flink 的唯一方法。一种常见的模式是在一个 Map 或多个 FlatMap 中查询外部数据库或 Web 服务以渲染主数据流。

Flink 提供了一个用于异步 I / O 的 API,以便更有效,更稳健地进行这种渲染。

1.4.2 可查询状态

当 Flink 应用程序将大量数据推送到外部数据存储时,这可能会成为 I / O 瓶颈。如果所涉及的数据具有比写入更少的读取,则更好的方法可以是外部应用程序从 Flink 获取所需的数据。在可查询的状态界面,允许通过 Flink 被管理的状态,按需要查询支持这个。

2 HDFS 连接器

此连接器提供一个 Sink,可将分区文件写入任一 Hadoop 文件系统支持的文件系统。

  • 要使用此连接器,请将以下依赖项添加到项目中:


请注意,流连接器当前不是二进制发布的一部分

2.1 Bucketing File Sink

可以配置分段行为以及写入,但我们稍后会介绍。这是可以创建一个默认情况下汇总到按时间拆分的滚动文件的存储槽的方法

  • Java

  • Scala

唯一必需的参数是存储桶的基本路径。可以通过指定自定义 bucketer,写入器和批量大小来进一步配置接收器。

默认情况下,当数据元到达时,分段接收器将按当前系统时间拆分,并使用日期时间模式 ”yyyy-MM-dd–HH” 命名存储区。这种模式传递给 DateTimeFormatter 使用当前系统时间和 JVM 的默认时区来形成存储桶路径。用户还可以为 bucketer 指定时区以格式化存储桶路径。每当遇到新日期时,都会创建一个新存储桶。

例如,如果有一个包含分钟作为最精细粒度的模式,将每分钟获得一个新桶。每个存储桶本身都是一个包含多个部分文件的目录:接收器的每个并行实例将创建自己的部件文件,当部件文件变得太大时,接收器也会在其他文件旁边创建新的部件文件。当存储桶变为非活动状态时,将刷新并关闭打开的部件文件。如果存储桶最近未写入,则视为非活动状态。默认情况下,接收器每分钟检查一次非活动存储桶,并关闭任何超过一分钟未写入的存储桶。setInactiveBucketCheckInterval()并 setInactiveBucketThreshold()在一个 BucketingSink。

也可以通过指定自定义 bucketer setBucketer()上 BucketingSink。如果需要,bucketer 可以使用数据元或元组的属性来确定 bucket 目录。

默认编写器是 StringWriter。这将调用 toString()传入的数据元并将它们写入部分文件,由换行符分隔。在 a setWriter() 上指定自定义编写器使用 BucketingSink。如果要编写 Hadoop SequenceFiles,可以使用提供的 SequenceFileWriter,也可以配置为使用压缩。

有两个配置选项指定何时应关闭零件文件并启动新零件文件:

  • 通过设置批量大小(默认部件文件大小为 384 MB)
  • 通过设置批次滚动时间间隔(默认滚动间隔为Long.MAX_VALUE

当满足这两个条件中的任何一个时,将启动新的部分文件。看如下例子:

  • Java

  • Scala

  • 这将创建一个接收器,该接收器将写入遵循此模式的存储桶文件:

  • Java

  • 生成结果

  • date-time 是我们从日期 / 时间格式获取的字符串
  • parallel-task 是并行接收器实例的索引
  • count 是由于批处理大小或批处理翻转间隔而创建的部分文件的运行数

然而这种方式创建了太多小文件,不适合 HDFS!仅供娱乐!

3 Apache Kafka 连接器

3.1 简介

此连接器提供对 Apache Kafka 服务的事件流的访问。

Flink 提供特殊的 Kafka 连接器,用于从 / 向 Kafka 主题读取和写入数据。Flink Kafka Consumer 集成了 Flink 的检查点机制,可提供一次性处理语义。为实现这一目标,Flink 并不完全依赖 Kafka 的消费者群体偏移跟踪,而是在内部跟踪和检查这些偏移。

为用例和环境选择一个包(maven artifact id)和类名。对于大多数用户来说,FlinkKafkaConsumer08(部分 flink-connector-kafka)是合适的。

然后,导入 maven 项目中的连接器:

环境配置参考

3.2 ZooKeeper 安装及配置

  • 下载 zk

http://archive.cloudera.com/cdh5/cdh/5/zookeeper-3.4.5-cdh5.15.1.tar.gz

  • 配置系统环境

  • 修改配置数据存储路径

  • 启动3.3 Kafka 部署及测试假设你刚刚开始并且没有现有的 Kafka 或 ZooKeeper 数据

由于 Kafka 控制台脚本对于基于 Unix 和 Windows 的平台不同,因此在 Windows 平台上使用 bin windows 而不是 bin /,并将脚本扩展名更改为.bat。

Step 1: 下载代码

  • 下载

  • 解压

  • 配置环境变量



  • 配置服务器属性
  • 修改日志存储路径
  • 修改主机名

Step 2: 启动服务器

Kafka 使用 ZooKeeper,因此如果还没有 ZooKeeper 服务器,则需要先启动它。

  • 后台模式启动

Step 3: 创建一个主题

  • 创建 topic

Step 4: 发送一些消息

Kafka 附带一个命令行客户端,它将从文件或标准输入中获取输入,并将其作为消息发送到 Kafka 集群。默认情况下,每行将作为单独的消息发送。

运行生产者,然后在控制台中键入一些消息以发送到服务器。

  • 启动生产者

Step 5: 启动一个消费者

Kafka 还有一个命令行使用者,它会将消息转储到标准输出。

  • 分屏,新建消费端

  • 在不同的终端中运行上述每个命令,那么现在应该能够在生产者终端中键入消息并看到它们出现在消费者终端中

所有命令行工具都有其他选项; 运行不带参数的命令将显示更详细地记录它们的使用信息。

3.4 Kafka 1.0.0+ Connector

从 Flink 1.7 开始,有一个新的通用 Kafka 连接器,它不跟踪特定的 Kafka 主要版本。相反,它在 Flink 发布时跟踪最新版本的 Kafka。

如果您的 Kafka 代理版本是 1.0.0 或更高版本,则应使用此 Kafka 连接器。如果使用旧版本的 Kafka(0.11,0.10,0.9 或 0.8),则应使用与代理版本对应的连接器。

兼容性

通过 Kafka 客户端 API 和代理的兼容性保证,通用 Kafka 连接器与较旧和较新的 Kafka 代理兼容。它与版本 0.11.0 或更高版本兼容,具体取决于所使用的功能。

将 Kafka Connector 从 0.11 迁移到通用(V1.10 新增)

要执行迁移,请参阅升级作业和 Flink 版本指南和

  • 在整个过程中使用 Flink 1.9 或更新版本。
  • 不要同时升级 Flink 和操作符。
  • 确保您作业中使用的 Kafka Consumer 和 / 或 Kafka Producer 分配了唯一标识符(uid):
  • 使用 stop with savepoint 功能获取保存点(例如,使用 stop –withSavepoint)CLI 命令。

用法

  • 要使用通用 Kafka 连接器,请为其添加依赖关系:


然后实例化新源(FlinkKafkaConsumer)

Flink Kafka Consumer 是一个流数据源,可以从 Apache Kafka 中提取并行数据流。使用者可以在多个并行实例中运行,每个实例都将从一个或多个 Kafka 分区中提取数据。

Flink Kafka Consumer 参与了检查点,并保证在故障期间没有数据丢失,并且计算处理元素“恰好一次”。(注意:这些保证自然会假设 Kafka 本身不会丢失任何数据。)

请注意,Flink 在内部将偏移量作为其分布式检查点的一部分进行快照。承诺给 Kafka 的抵消只是为了使外部的进展观与 Flink 对进展的看法同步。这样,监控和其他工作可以了解 Flink Kafka 消费者在多大程度上消耗了一个主题。

和接收器(FlinkKafkaProducer)。

除了从模块和类名中删除特定的 Kafka 版本之外,API 向后兼容 Kafka 0.11 连接器。

3.5 Kafka 消费者

Flink 的 Kafka 消费者被称为 FlinkKafkaConsumer08(或 09Kafka 0.9.0.x 等)。它提供对一个或多个 Kafka 主题的访问。

构造函数接受以下参数:

  • 主题名称 / 主题名称列表
  • DeserializationSchema / KeyedDeserializationSchema 用于反序列化来自 Kafka 的数据
  • Kafka 消费者的属性。需要以下属性:

    • “bootstrap.servers”(以逗号分隔的 Kafka 经纪人名单)
    • “zookeeper.connect”(逗号分隔的 Zookeeper 服务器列表)(仅 Kafka 0.8 需要)
    • “group.id”消费者群组的 ID


  • 上述程序注意配置 ip 主机映射
  • 虚拟机 hosts

  • 本地机器 hosts
  • 发送消息

  • 运行程序消费消息


Example:

  • Java

  • Scala

The DeserializationSchema

Flink Kafka Consumer 需要知道如何将 Kafka 中的二进制数据转换为 Java / Scala 对象。

在 DeserializationSchema 允许用户指定这样的一个架构。T deserialize(byte[] message) 为每个 Kafka 消息调用该方法,从 Kafka 传递值。

从它开始通常很有帮助 AbstractDeserializationSchema,它负责将生成的 Java / Scala 类型描述为 Flink 的类型系统。实现 vanilla 的用户 DeserializationSchema 需要自己实现该 getProducedType(…)方法。

为了访问 Kafka 消息的键和值,KeyedDeserializationSchema 具有以下 deserialize 方法T deserialize(byte [] messageKey,byte [] message,String topic,int partition,long offset)

为方便起见,Flink 提供以下模式:

  • TypeInformationSerializationSchema(和 TypeInformationKeyValueSerializationSchema)创建基于 Flink 的模式 TypeInformation。如果 Flink 编写和读取数据,这将非常有用。此模式是其他通用序列化方法的高性能 Flink 替代方案。
  • JsonDeserializationSchema(和 JSONKeyValueDeserializationSchema)将序列化的 JSON 转换为 ObjectNode 对象,可以使用 objectNode.get(“field”)作为(Int / String / …)()从中访问字段。KeyValue objectNode 包含一个“key”和“value”字段,其中包含所有字段,以及一个可选的“元数据”字段,用于公开此消息的偏移量 / 分区 / 主题。
  • AvroDeserializationSchema 它使用静态提供的模式读取使用 Avro 格式序列化的数据。它可以从 Avro 生成的类(AvroDeserializationSchema.forSpecific(…))中推断出模式,也可以 GenericRecords 使用手动提供的模式(with AvroDeserializationSchema.forGeneric(…))。此反序列化架构要求序列化记录不包含嵌入式架构。

    • 还有一个可用的模式版本,可以在 Confluent Schema Registry 中查找编写器的模式(用于编写记录的 模式)。使用这些反序列化模式记录将使用从模式注册表中检索的模式进行读取,并转换为静态提供的模式(通过 ConfluentRegistryAvroDeserializationSchema.forGeneric(…)或 ConfluentRegistryAvroDeserializationSchema.forSpecific(…))。

要使用此反序列化模式,必须添加以下附加依赖项:

当遇到因任何原因无法反序列化的损坏消息时,有两个选项 – 从 deserialize(…)方法中抛出异常将导致作业失败并重新启动,或者返回 null 以允许 Flink Kafka 使用者以静默方式跳过损坏的消息。请注意,由于使用者的容错能力(请参阅下面的部分以获取更多详细信息),因此对损坏的消息执行失败将使消费者尝试再次反序列化消息。因此,如果反序列化仍然失败,则消费者将在该损坏的消息上进入不间断重启和失败循环。

3.6 Kafka 生产者

Flink 的 Kafka Producer 被称为 FlinkKafkaProducer011(或 010 对于 Kafka 0.10.0.x 版本。或者直接就是 FlinkKafkaProducer,对于 Kafka>=1.0.0 的版本来说)。

它允许将记录流写入一个或多个 Kafka 主题。

自应用

  • Pro
  • 确保启动端口

  • Pro 端生产消息

  • 消费端接收

Example

  • Java

  • Scala

上面的示例演示了创建 Flink Kafka Producer 以将流写入单个 Kafka 目标主题的基本用法。对于更高级的用法,还有其他构造函数变体允许提供以下内容:

  • 提供自定义属性

生产者允许为内部的 KafkaProducer 提供自定义属性配置。

  • 自定义分区程序

将记录分配给特定分区,可以为 FlinkKafkaPartitioner 构造函数提供实现。将为流中的每个记录调用此分区程序,以确定应将记录发送到的目标主题的确切分区。

  • 高级序列化模式

与消费者类似,生产者还允许使用调用的高级序列化模式 KeyedSerializationSchema,该模式允许单独序列化键和值。它还允许覆盖目标主题,以便一个生产者实例可以将数据发送到多个主题。

3.8 Kafka 消费者开始位置配置

Flink Kafka Consumer 允许配置如何确定 Kafka 分区的起始位置。

  • Java

  • Scala

Flink Kafka Consumer 的所有版本都具有上述明确的起始位置配置方法。

  • setStartFromGroupOffsets(默认行为)

从 group.idKafka 代理(或 Zookeeper for Kafka 0.8)中的消费者组(在消费者属性中设置)提交的偏移量开始读取分区。如果找不到分区的偏移量,auto.offset.reset 将使用属性中的设置。

  • setStartFromEarliest()/ setStartFromLatest()

从最早 / 最新记录开始。在这些模式下,Kafka 中的承诺偏移将被忽略,不会用作起始位置。

  • setStartFromTimestamp(long)

从指定的时间戳开始。对于每个分区,时间戳大于或等于指定时间戳的记录将用作起始位置。如果分区的最新记录早于时间戳,则只会从最新记录中读取分区。在此模式下,Kafka 中的已提交偏移将被忽略,不会用作起始位置。

还可以指定消费者应从每个分区开始的确切偏移量:

  • Java

  • Scala

上面的示例将使用者配置为从主题的分区 0,1 和 2 的指定偏移量开始 myTopic。偏移值应该是消费者应为每个分区读取的下一条记录。请注意,如果使用者需要读取在提供的偏移量映射中没有指定偏移量的分区,则它将回退到 setStartFromGroupOffsets()该特定分区的默认组偏移行为(即)。

请注意,当作业从故障中自动恢复或使用保存点手动恢复时,这些起始位置配置方法不会影响起始位置。在恢复时,每个 Kafka 分区的起始位置由存储在保存点或检查点中的偏移量确定。

3.9 Kafka 生产者和容错

Kafka 0.8

在 0.9 之前,Kafka 没有提供任何机制来保证至少一次或恰好一次的语义。

Kafka 0.9 和 0.10

启用 Flink 的检查点时,FlinkKafkaProducer09 和 FlinkKafkaProducer010 能提供 至少一次 传输保证。

除了开启 Flink 的检查点,还应该配置 setter 方法:

  • setLogFailuresOnly(boolean)

默认为 false。启用此选项将使生产者仅记录失败日志而不是捕获和重新抛出它们。这大体上就是计数已成功的记录,即使它从未写入目标 Kafka 主题。这必须设为 false 对于确保 至少一次

  • setFlushOnCheckpoint(boolean)

默认为 true。启用此函数后,Flink 的检查点将在检查点成功之前等待检查点时的任何动态记录被 Kafka 确认。这可确保检查点之前的所有记录都已写入 Kafka。必须开启,对于确保 至少一次

总之,默认情况下,Kafka 生成器对版本 0.9 和 0.10 具有 至少一次 保证,即

setLogFailureOnly 设置为 false 和 setFlushOnCheckpoint 设置为 true。

默认情况下,重试次数设置为“0”。这意味着当 setLogFailuresOnly 设置为时 false,生产者会立即失败,包括 Leader 更改。
默认情况下,该值设置为“0”,以避免重试导致目标主题中出现重复消息。对于经常更改代理的大多数生产环境,建议将重试次数设置为更高的值。
Kafka 目前没有生产者事务,因此 Flink 在 Kafka 主题里无法保证恰好一次交付

Kafka >= 0.11

启用 Flink 的检查点后,FlinkKafkaProducer011

对于 Kafka >= 1.0.0 版本是 FlinkKafkaProduce

可以提供准确的一次交付保证。

除了启用 Flink 的检查点,还可以通过将适当的语义参数传递给 FlinkKafkaProducer011, 选择三种不同的算子操作模式

  • Semantic.NONE

Flink 啥都不保证。生成的记录可能会丢失,也可能会重复。

  • Semantic.AT_LEAST_ONCE(默认设置)


类似于 setFlushOnCheckpoint(true)在 FlinkKafkaProducer010。这可以保证不会丢失任何记录(尽管它们可以重复)。

  • Semantic.EXACTLY_ONCE

使用 Kafka 事务提供恰好一次的语义。每当您使用事务写入 Kafka 时,不要忘记为任何从 Kafka 消费记录的应用程序设置所需的 isolation.level(read_committed 或 read_uncommitted- 后者为默认值)。

注意事项

Semantic.EXACTLY_ONCE 模式依赖于在从所述检查点恢复之后提交在获取检查点之前启动的事务的能力。如果 Flink 应用程序崩溃和完成重启之间的时间较长,那么 Kafka 的事务超时将导致数据丢失(Kafka 将自动中止超过超时时间的事务)。考虑到这一点,请根据预期的停机时间适当配置事务超时。

Kafka broker 默认 transaction.max.timeout.ms 设置为 15 分钟。此属性不允许为生产者设置大于其值的事务超时。

FlinkKafkaProducer011 默认情况下,将_transaction.timeout.msproducer config_中的属性设置为 1 小时,因此_transaction.max.timeout.ms_在使用 Semantic.EXACTLY_ONCE 模式之前应该增加 该属性。

在_read_committed_模式中 KafkaConsumer,任何未完成的事务(既不中止也不完成)将阻止来自给定 Kafka 主题的所有读取超过任何未完成的事务。换言之,遵循以下事件顺序:

  1. 用户事务 1 开启并写记录
  2. 用户事务 2 开启并写了一些其他记录
  3. 用户提交事务 2

即使事务 2 已经提交了记录,在事务 1 提交或中止之前,消费者也不会看到它们。这有两个含义:

  • 首先,在 Flink 应用程序的正常工作期间,用户可以预期 Kafka 主题中生成的记录的可见性会延迟,等于已完成检查点之间的平均时间。
  • 其次,在 Flink 应用程序失败的情况下,读者将阻止此应用程序编写的主题,直到应用程序重新启动或配置的事务超时时间过去为止。此注释仅适用于有多个代理 / 应用程序写入同一 Kafka 主题的情况。

Semantic.EXACTLY_ONCE 模式为每个 FlinkKafkaProducer011 实例使用固定大小的 KafkaProducers 池。每个检查点使用其中一个生产者。如果并发检查点的数量超过池大小,FlinkKafkaProducer011 将引发异常并将使整个应用程序失败。请相应地配置最大池大小和最大并发检查点数。
Semantic.EXACTLY_ONCE 采取所有可能的措施,不要留下任何阻碍消费者阅读 Kafka 主题的延迟事务,这是必要的。但是,如果 Flink 应用程序在第一个检查点之前失败,则在重新启动此类应用程序后,系统中没有关于先前池大小的信息。因此,在第一个检查点完成之前按比例缩小 Flink 应用程序是不安全的 _FlinkKafkaProducer011.SAFE_SCALE_DOWN_FACTOR_。

3.10 Kafka 消费者及其容错

启用 Flink 的检查点后,Flink Kafka Consumer 将使用主题中的记录,并以一致的方式定期检查其所有 Kafka 偏移以及其他 算子操作的状态。如果作业失败,Flink 会将流式程序恢复到最新检查点的状态,并从存储在检查点中的偏移量开始重新使用来自 Kafka 的记录。

因此,绘制检查点的间隔定义了程序在发生故障时最多可以返回多少。

检查点常用参数

enableCheckpointing

启用流式传输作业的检查点。将定期快照流式数据流的分布式状态。如果发生故障,流数据流将从最新完成的检查点重新启动。

该作业在给定的时间间隔内定期绘制检查点。状态将存储在配置的状态后端。

此刻未正确支持检查点迭代流数据流。如果“force”参数设置为 true,则系统仍将执行作业。

setCheckpointingMode

setCheckpointTimeout

setMaxConcurrentCheckpoints

要使用容错的 Kafka 使用者,需要在运行环境中启用拓扑的检查点:

  • Scala

  • Java

另请注意,如果有足够的处理插槽可用于重新启动拓扑,则 Flink 只能重新启动拓扑。因此,如果拓扑由于丢失了 TaskManager 而失败,那么之后仍然必须有足够的可用插槽。YARN 上的 Flink 支持自动重启丢失的 YARN 容器。

如果未启用检查点,Kafka 使用者将定期向 Zookeeper 提交偏移量。

参考

Streaming Connectors

Kafka 官方文档

退出移动版