乐趣区

关于apache:打造全新批流融合详解-Apache-Flink-1140-发布的-Pulsar-Flink-Connector

作者简介

 盛宇帆,StreamNative 开发工程师,Apache Pulsar 与 Apache Flink 贡献者。退出 StreamNative 之前,他曾就任于阿里大数据平台和腾讯云负责 Flink 开发工作。盛宇帆是腾讯云我的项目 Barad 的外围 committer 与我的项目落地负责人。他目前在 StreamNative 负责 Pulsar-Flink 和 Pulsar-Spark 相干的开发工作,他和他的团队曾经将 Pulsar Source Connector 奉献给 Flink 社区,并于 Apache Flink 1.14.0 公布,并将在后续的公布中残缺地将 Pulsar Connector 奉献给社区。
编辑:Jipei@StreamNative,Apache Pulsar 贡献者。

本文摘要

  • 批流一体是数据计算的将来趋势,Pulsar Flink Connector 为基于 Apache Pulsar 在 Apache Flink 上以批流一体的形式解决数据提供了现实的解决方案。
  • StreamNative 已将 Pulsar Source Connector 奉献至 Flink 1.14.0 版本。用户能够应用它从 Pulsar 读取数据,并保障每条数据只被解决一次。
  • 最新 Pulsar Flink Connector 基于 Pulsar 2.8.0 和 Flink 1.14 版本,反对 Pulsar 的事务处理,进一步交融了两者的个性。

背景

随着数据日益收缩,采纳事件流解决数据至关重要。Apache Flink 将批流解决对立到计算引擎中,提供了统一化的编程接口。Apache Pulsar(与 Apache BookKeeper 一起)以 “ 流 “ 的形式对立数据。在 Pulsar 中,数据存储成一个正本,以流(streaming)(通过 pub-sub 接口)和 segment(用于批处理)的形式进行拜访。Pulsar 解决了企业在应用不同的存储和音讯技术解决方案时遇到的数据孤岛问题。

Flink 能够间接与 Pulsar broker 进行实时的流式读写,同时 Flink 也能够批量读取 Pulsar 底层离线存储,与 BookKeeper 的内容进行批次读写。同时反对批流,使得 Pulsar 和 Flink 先天就是符合的搭档。把 Flink 和 Pulsar 联合应用,这两种开源技术能够创立一个对立的数据架构,为实时数据驱动企业提供最佳解决方案。

为了将 Pulsar 与 Flink 的性能进行整合,为用户提供更弱小的开发能力,StreamNative 开发并开源了 Pulsar Flink Connector。通过屡次的打磨,Pulsar Flink Connector 已合并进 Flink 代码仓库,并在 Flink 1.14.0 版本中公布!

Pulsar Flink Connector 基于 Apache Pulsar 和 Apache Flink 提供弹性数据处理,容许 Apache Flink 读写 Apache Pulsar 中的数据。应用 Pulsar Flink Connector,企业可能更专一于业务逻辑,无需关注存储问题。

打造全新的 Pulsar Flink Connector

在此版本之前,StreamNative 已公布 Pulsar Flink Connector 2.7 版本。为什么要颠覆之前的代码,从新打造批流交融呢?在新版本中进行了哪些重构呢?

新版本改变

拆分设计

所有的数据生产都是基于 split(分流)创立 Reader 去生产数据。如何将 Pulsar 音讯形象为 split?首先咱们对 topic 进行形象,针对每一个分区创立 Partition 示例。对有分区的 topic 就按数量创立,而对无分区的 topic 只有 1 个 partition,其值为 -1。

在 Pulsar 的 exclusive(独占)、shared(共享)和 failover(灾备)订阅模式中,咱们将 topic partition 包装为在 Flink 上生产的 split,其中蕴含生产节点、存储节点和两个非凡的状态,最初生产的音讯 ID 和以后解决的事务 ID 别离用于 Pulsar 的不同模式。在 Pulsar 的 key_shared(键共享)模式中,在 topic partition 和 split 间映射的时候减少了 range 层。

针对每个分区创立 split 的起因在于:

  • Pulsar 的分区理论也是 topic;
  • Topic 分区理论是子 topic;
  • 仅可在繁多 topic 上执行 Consumer.seek()。

枚举器(enumerator)设计

枚举器对应 split 散发和订阅的接口。这个设计留神分成两个局部,一部分是基于 TopicList,对于用户给定的一组 topic,从 Pulsar 进行信息查问;另一部分是 Topic Pattern,查问以后 topic、正则匹配并创立 split。

在 exclusive(独占)、key_shared(键共享)和 failover(灾备)模式中,一个 split 只会被以轮循的形式调配给一个 reader。

在 shared(共享)模式中,每个 split 会分给每个 reader,在此模式中,每个 reader 会生产 Pulsar 的每个 partition。

Reader 设计

在 exclusive(独占)和 failover(灾备)模式中,Reader 设计如下:

咱们能够看到这个 topic 以后有三个分区,在 enumerator 这一层依据分区创立 3 个 split,Flink 的并行度为 3,产生 Reader 0、1、2 三个 reader 别离生产 split,由此造成独占的生产模式。Failover 模式和独占模式是一样的生产模型,二者都是程序生产。

Unordered Reader
 
Unordered Split Reader
SortedMap<Long, Map<TopicPartition, MessageID>> cursorsToCommit
ConcurrentMap<TopicPartition,MessageID> cursorsOfFinishedSplits
ScheduledExecutorService cursorScheduler

在 Pulsar 的 Shared 和 Key_shared 模式下,生产是无序的。咱们既不心愿它程序生产,也不心愿一条条地 ACK。于是咱们在这里引入事务(transaction),每创立一条音讯就开启一个事务,在事务内进行 ACK,事务 ACK 会在 checkpoint 上进行提交。

Unordered Reader
 
Unordered Split Reader
TransactionCoordinatorClient coordinatorClient
SortedMap<Long, List<TxnID>> transactionsToCommit
List<TxnID> transactionsOfFinishedSplits

类型零碎

Pulsar 同 Flink 相似,都有类型零碎。

Flink 的类型零碎:

  • DeserializationSchema:对原始数据进行解码;
  • TypeInformation:Flink 每个 strength?之间基于 TypeInformation 进行数据序列化而传输
  • TypeSerializer:TypeInformation 创立的序列化实例。

在 Pulsar 中:

  • Schema:Pulsar Schema 是 Client 端数据序列化和反序列化的接口;
  • SchemaInfo:接口创立 SchemaInfo 传输给 Broker,broker 依据 SchemaInfo 进行 Schema 版本的兼容和 Schema 是否可能降级的校验。SchemaInfo 使 broker 不须要进行序列化和反序列化;
  • SchemaDefinition:给 Client 创立 Schema 所需的实例。

因而 Pulsar 和 Flink 在类型零碎上进行买通,就产生了以下两种模式:

  • 常见模式:Reader 以 Byte 数据的模式进行生产,用 Flink 的 DeserializationSchema 进行解析,DeserializationSchema 自带的 TypeInformation 向上游传递。Flink 和其余音讯零碎也是用这种模式。

  • Pulsar 独有的模式:Reader 以 Byte 数据的模式进行生产,在 Flink 上以 Pulsar Schema 将数据进行解码,并主动创立能在 Flink 上应用的 TypeInformation。

然而在第二种模式中没有用到 Pulsar 自带的 Schema 兼容和校验,在下个版本中咱们将用到这个个性。

版本要求

Flink 以后只提供 Pulsar Source connector,用户能够应用它从 Pulsar 读取数据,并保障每条数据只被解决一次。

连接器以后反对 Pulsar 2.7.0 之后的版本,然而连接器应用到了 Pulsar 的事务机制,倡议在 Pulsar 2.8.0 及其之后的版本上应用连接器进行数据读取。更多对于 Pulsar API 兼容性设计可浏览 PIP-72。

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-pulsar_2.11</artifactId>
    <version>1.14.0</version>
</dependency>

浏览文档,理解如何将连接器增加到 Flink 集群实例内。

应用 Flink 1.14.0 的 Pulsar Source Connector

新版本的 Pulsar Source Connector 已被合并进 Flink 最新公布的 1.14.0 版本。如果要想应用基于旧版的 SourceFunction 实现的 Pulsar Source Connector,或者是应用的 Flink 版本低于 1.14,能够应用 StreamNative 独自保护的 pulsar-flink。

结构 Pulsar Source Connector 实例

Pulsar Source Connector 提供了 builder 类来结构 Source Connector 实例。上面的代码实例应用 builder 类创立的 Source Connector 会从 topic“persistent://public/default/my-topic”的数据开始端进行生产。连接器应用了 Exclusive(独占)的订阅形式生产音讯,订阅名称为 my-subscription,并把音讯体的二进制字节流以 UTF-8 的形式编码为字符串。

PulsarSource<String> pulsarSource = PulsarSource.builder()
 .setServiceUrl(serviceUrl)
    .setAdminUrl(adminUrl)
    .setStartCursor(StartCursor.earliest())
    .setTopics("my-topic")
    .setDeserializationSchema(PulsarDeserializationSchema.flinkSchema(new SimpleStringSchema()))
    .setSubscriptionName("my-subscription")
    .setSubscriptionType(SubscriptionType.Exclusive)
    .build();
 
env.fromSource(source, WatermarkStrategy.noWatermarks(), "Pulsar Source");

如果应用结构类结构 Pulsar Source Connector,肯定要提供上面几个属性:

  • Pulsar 数据生产的地址,应用 setServiceUrl(String) 办法提供;
  • Pulsar HTTP 治理地址,应用 setAdminUrl(String) 办法提供;
  • Pulsar 订阅名称,应用 setSubscriptionName(String) 办法提供;
  • 须要生产的 topic 或者是 topic 上面的分区,详见指定生产的 Topic 或者 Topic 分区;
  • 解码 Pulsar 音讯的反序列化器,详见反序列化器。

指定生产的 Topic/Topic 分区

Pulsar Source Connector 提供了两种订阅 topic 或 topic 分区的形式:

  • Topic 列表,从这个 Topic 的所有分区上生产音讯,例如:
PulsarSource.builder().setTopics("some-topic1", "some-topic2")
 
// 从 topic "topic-a" 的 0 和 1 分区上生产
PulsarSource.builder().setTopics("topic-a-partition-0", "topic-a-partition-2")
  • Topic 正则,连接器应用给定的正则表达式匹配出所有合规的 topic,例如:
PulsarSource.builder().setTopicPattern("topic-*")

Topic 名称简写

从 Pulsar 2.0 之后,残缺的 topic 名称格局为 {persistent|non-persistent}:// 租户 / 命名空间 /topic。然而连接器不须要提供 topic 名称的残缺定义,因为 topic 类型、租户、命名空间都设置了默认值。

以后反对的简写形式:

⚠️留神:对于 non-persistent(非长久化)topic,连接器不反对简写名称,non-persistent://public/default/my-topic不可简写成 non-persistent://my-topic

订阅分区构造的 Topic

对于 Pulsar 而言,Topic 分区也是一种 Topic。Pulsar 会将一个有分区的 Topic 在外部依照分区的大小拆分成等量的无分区 Topic。例如,在 Pulsar 的 sample 租户上面的 flink 命名空间外面创立了一个有 3 个分区的 topic,给它起名为 simple-string。能够在 Pulsar 上看到如下的 topic 列表:

这意味着,用户能够用下面的子 topic 去间接生产分区外面的数据,不须要再去基于下层的父 topic 去生产全副分区的数据。例如:应用 PulsarSource.builder().setTopics(“sample/flink/simple-string-partition-1“, “sample/flink/simple-string-partition-2“) 将会只生产 topic sample/flink/simple-string 下面的分区 1 和 2 外面的音讯。

配置 Topic 正则表达式

后面提到了 Pulsar topic 有 persistentnon-persistent 两种类型,应用正则表达式生产数据的时候,连接器会尝试从正则表达式外面解析出音讯的类型。例如:PulsarSource.builder().setTopicPattern(“non-persistent://my-topic*“) 会解析出 non-persistent topic 类型。如果用户应用 topic 名称简写的形式,连接器会应用默认的音讯类型 persistent

如果想用正则去生产 persistent 和 non-persistent 类型的 topic,须要应用 RegexSubscriptionMode 定义 topic 类型,例如:setTopicPattern("topic-*",RegexSubscriptionMode.AllTopics)`。

解析音讯——反序列化器

反序列化器用于解析 Pulsar 音讯,连接器应用 PulsarDeserializationSchema来定义反序列化器。用户能够在 builder 类中应用 setDeserializationSchema(PulsarDeserializationSchema)办法配置反序列化器,它会解析 Pulsar 的 Message<byte[]> 实例。

如果用户只关怀音讯体的二进制字节流,并不需要其余属性来解析数据。能够间接应用预约义的 PulsarDeserializationSchema。Pulsar 连接器外面提供了 3 种预约义好的反序列化器:

  • 应用 Pulsar 的 Schema 解析音讯。
// 根底数据类型
PulsarDeserializationSchema.pulsarSchema(Schema)
 
// 构造类型 (JSON, Protobuf, Avro, etc.)
PulsarDeserializationSchema.pulsarSchema(Schema, Class)
 
// 键值对类型
PulsarDeserializationSchema.pulsarSchema(Schema, Class, Class)
  • 应用 Flink 的 DeserializationSchema 解析音讯。
PulsarDeserializationSchema.flinkSchema(DeserializationSchema)
  • 应用 Flink 的 TypeInformation 解析音讯。
PulsarDeserializationSchema.flinkTypeInfo(TypeInformation, ExecutionConfig)

Pulsar 的 Message<byte[]> 蕴含了很多额定的属性。例如,音讯的 key、音讯发送工夫、音讯生产工夫、用户在音讯上自定义的键值对属性等。能够应用 Message<byte[]> 接口来获取这些属性。

如果用户须要基于这些额定的属性来解析一条音讯,能够实现 PulsarDeserializationSchema 接口,并肯定要确保 PulsarDeserializationSchema.getProducedType()办法返回的 TypeInformation 是正确的后果。Flink 应用 TypeInformation 将解析进去的后果序列化传递到上游算子。

订阅模式

Pulsar 共反对四种订阅模式:exclusive(独占)、shared(共享)、failover(灾备)、key_shared(key 共享)。以后 Pulsar 连接器外面,独占  和  灾备 的实现没有区别,如果 Flink 的一个 reader 挂了,连接器会把所有未生产的数据交给其余的 reader 来生产数据。默认状况下,如果没有指定订阅类型,连接器应用共享订阅类型(SubscriptionType.Shared)。

// 名为 "my-shared" 的共享订阅
PulsarSource.builder().setSubscriptionName("my-shared")
 
// 名为 "my-exclusive" 的独占订阅
PulsarSource.builder().setSubscriptionName("my-exclusive").setSubscriptionType(SubscriptionType.Exclusive)

如果想在 Pulsar 连接器外面应用 key 共享 订阅,须要提供 RangeGenerator 实例。RangeGenerator 会生成一组音讯 key 的 hash 范畴,连接器会基于给定的范畴来生产数据。Pulsar 连接器也提供了一个名为 UniformRangeGenerator 的默认实现,它会基于 flink Source Connector 的并行度将 hash 范畴均分。

起始生产地位

连接器应用 setStartCursor(StartCursor) 办法给定开始生产的地位。内置的生产地位有:

  • 从 topic 外面最早的一条音讯开始生产。
StartCursor.earliest()
  • 从 topic 外面最新的一条音讯开始生产。
StartCursor.latest()
  • 从给定的音讯开始生产。
StartCursor.fromMessageId(MessageId)
  • 与前者不同的是,给定的音讯能够跳过,再进行生产。
StartCursor.fromMessageId(MessageId,boolean)

从给定的音讯工夫开始生产。

StartCursor.fromMessageTime(long)

每条音讯都有一个固定的序列号,这个序列号在 Pulsar 上有序排列,其蕴含了 ledger、entry、partition 等原始信息,用于在 Pulsar 底层存储上查找到具体的音讯。Pulsar 称这个序列号为 MessageId,用户能够应用 DefaultImplementation.newMessageId(long ledgerId, long entryId, int partitionIndex)创立它。

边界

Pulsar 连接器同时反对流式和批的生产形式,默认状况下,连接器应用流的形式生产数据。除非工作失败或者被勾销,否则连接器将继续生产数据。用户能够应用 setBoundedStopCursor(StopCursor)给定进行生产的地位,这种状况下连接器会应用批的形式进行生产。当所有 topic 分区都生产到了进行地位,Flink 工作就会完结。应用流的形式一样能够给定进行地位,应用 setUnboundedStopCursor(StopCursor)办法即可。内置的进行地位如下:

  • 永不进行。
StopCursor.never()
  • 进行于 Pulsar 启动时 topic 外面最新的那条数据。
StopCursor.latest()
  • 进行于某条音讯,后果里不蕴含此音讯。
StopCursor.atMessageId(MessageId)
  • 进行于某条音讯之后,后果里蕴含此音讯。
StopCursor.afterMessageId(MessageId)
  • 进行于某个给定的音讯工夫戳。
StopCursor.atEventTime(long)

其余配置项

除了后面提到的配置选项,连接器还提供了丰盛的选项供 Pulsar 专家应用,在 builder 类里通过 setConfig(ConfigOption<T>, T) 和 setConfig(Configuration) 办法给定 Pulsar 客户端、Pulsar API 的全副配置。具体参考其余配置项

动静分区发现

为了能在启动 Flink 工作之后还能发现在 Pulsar 上扩容的分区或者是新创建的 topic,连接器提供了动静分区发现机制。该机制不须要重启 Flink 工作。对选项 PulsarSourceOptions.PULSAR_PARTITION_DISCOVERY_INTERVAL_MS设置一个正整数即可启用。

// 10 秒查问一次分区信息
PulsarSource.builder()
        .setConfig(PulsarSourceOptions.PULSAR_PARTITION_DISCOVERY_INTERVAL_MS, 10000);

默认状况下,Pulsar 启用动静分区发现,查问距离为 30 秒。用户能够给定一个正数,将该性能禁用。如果应用批的形式生产数据,将无奈启用该性能。

事件工夫与 watermark

默认状况下,连接器应用 Pulsar Message<byte[]> 外面的工夫作为解析后果的工夫戳。用户能够应用 WatermarkStrategy 来自行解析出想要的音讯工夫,并向上游传递对应的水位线。

env.fromSource(pulsarSource, new CustomWatermarkStrategy(), "Pulsar Source With Custom Watermark Strategy")

定义 WatermarkStrategy 参考文档。

音讯确认

一旦在 topic 上创立了订阅,音讯便会存储在 Pulsar 里。即便没有消费者,音讯也不会被抛弃。只有当连接器同 Pulsar 确认此条音讯曾经被生产,该音讯才以某种机制会被移除。连接器反对四种订阅形式,它们的音讯确认形式也大不相同。

独占和灾备订阅

独占  和  灾备  订阅下,连接器应用累进式确认形式。确认某条音讯曾经被解决时,其后面被生产的音讯会主动被置为已读。Pulsar 连接器会在 Flink 实现检查点时将对应时刻生产的音讯置为已读,以此来保障 Pulsar 状态与 Flink 状态统一。如果用户没有在 Flink 上启用检查点,连接器能够应用周期性提交来将生产状态提交给 Pulsar,应用配置 PulsarSourceOptions.PULSAR_AUTO_COMMIT_CURSOR_INTERVAL 来进行定义。

须要留神的是,此种场景下,Pulsar 连接器并不依赖于提交到 Pulsar 的状态来做容错。音讯确认只是为了能在 Pulsar 端看到对应的生产解决状况。

共享和 key 共享订阅

共享 和 key 共享 须要顺次确认每一条音讯,所以连接器在 Pulsar 事务外面进行音讯确认,而后将事务提交到 Pulsar。首先须要在 Pulsar 的 borker.conf 文件外面启用事务:

transactionCoordinatorEnabled=true

连接器创立的事务的默认超时工夫为 3 小时,请确保这个工夫大于 Flink 检查点的距离。用户能够应用 PulsarSourceOptions.PULSAR_TRANSACTION_TIMEOUT_MILLIS来设置事务的超时工夫。

如果用户无奈启用 Pulsar 的事务,或者是因为我的项目禁用了检查点,须要将 PulsarSourceOptions.PULSAR_ENABLE_AUTO_ACKNOWLEDGE_MESSAGE选项设置为 true,音讯从 Pulsar 生产后会被立即置为已读。连接器无奈保障此种场景下的音讯一致性。连接器在 Pulsar 上应用日志的模式记录某个事务下的音讯确认,为了更好的性能,请缩短 Flink 做检查点的距离。

降级与问题诊断

降级步骤参阅降级应用程序和 Flink 版本。Pulsar 连接器没有在 Flink 端存储生产的状态,所有的生产信息都推送到了 Pulsar。

留神:

  • 不要同时降级 Pulsar 连接器和 Pulsar 服务端的版本。
  • 应用最新版本的 Pulsar 客户端来生产音讯。

Flink 只应用了 Pulsar 的 Java 客户端 和治理 API。应用 Flink 和 Pulsar 交互时如果遇到问题,很有可能与 Flink 无关,请先降级 Pulsar 的版本、Pulsar 客户端的版本,或者批改 Pulsar 的配置、Pulsar 连接器的配置来尝试解决问题。

分割咱们

欢送大家应用 Pulsar Flink Connector 并与咱们交换,独特优化这个批流一体的我的项目。目前社区已成立 Pulsar Flink Connector SIG(非凡兴趣小组),扫描下方 Bot 二维码,回复“Flink”退出 Pulsar Flink Connector SIG,与我的项目开发者交换。

关注公众号「Apache Pulsar」,获取干货与动静

退出移动版