关于flink:Flink-Hudi-在-Linkflow-构建实时数据湖的生产实践

48次阅读

共计 10224 个字符,预计需要花费 26 分钟才能阅读完成。

可变数据的解决始终以来都是大数据系统,尤其是实时零碎的一大难点。在调研多种计划后,咱们抉择了 CDC to Hudi 的数据摄入计划,目前在生产环境可实现分钟级的数据实时性,心愿本文所述对大家的生产实践有所启发。内容包含:

  1. 背景
  2. CDC 和数据湖
  3. 技术挑战
  4. 成果
  5. 将来打算
  6. 总结

一、背景

Linkflow 作为客户数据平台(CDP),为企业提供从客户数据采集、剖析到执行的经营闭环。每天都会通过一方数据采集端点(SDK)和三方数据源,如微信,微博等,收集大量的数据。这些数据都会通过荡涤,计算,整合后写入存储。使用者能够通过灵便的报表或标签对长久化的数据进行剖析和计算,后果又会作为 MA (Marketing Automation) 零碎的数据源,从而实现对特定人群的精准营销。

在 Linkflow 中,数据分为不可变数据(Immutable Data)和可变数据(Mutable Data),这些数据都会参加剖析,波及到的表大略有十几张,其中不可变数据的数据量较大,能够达到数十亿级。如果放到传统大数据系统,不可变数据即为事实数据,可变数据为维度数据。但在真正的业务实际里,用户的天然属性,订单的金额和状态等都是可更新的,这些数据的数据量往往也十分可观,在咱们的零碎里此类数据也会达到亿级。对于可变数据之前始终都是通过关系型数据库 MySQL 进行治理,一来数据保护不便,二来业务对接容易。

但问题也不言而喻:

  • 数据碎片化,因为 MySQL 大表 online DDL 危险较大,随着业务复杂度的晋升,往往须要减少新的子表来扩大业务属性,也就是说一个残缺的用户数据会散落在多张表中,这对查问非常不敌对。
  • 多维度查问无奈实现,因为关系型数据库的劣势不是多维度查问,并且给所有字段都加索引也并不事实,所以须要一款可反对 OLAP 查问引擎的数据组件来撑持多维分析的业务场景。并且思考到将来可分别独立扩大的可能,咱们也优先思考计算和存储拆散的架构。

二、CDC 和数据湖

CDC(CHANGE DATA CAPTURE)是一种软件设计模式,用于确定和跟踪已变更的数据,以便能够对更改后的数据采取措施。其实早在两年前咱们就有应用 canal 冗余 MySQL 数据到异构存储的教训,只是过后没有意识到能够通过这种形式与大数据存储进行集成。在应用 canal 的过程中咱们发现了一些性能的问题,并且开源社区根本无人保护,所以在新架构启动前又调研了 Maxwell 和 Debezium,恰好关注到 Flink 母公司 Ververica 开源的我的项目 flink-cdc-connectors[1],该我的项目将 Debezium 作为 binlog 的同步引擎嵌入到 Flink 工作中,能够不便地在流工作中对 binlog 的音讯进行筛选、校验、数据整合和格局转换,并且性能优异。思考到将来又能够间接与行为数据进行双流 join,甚至通过 CEP 进行简略的风控,咱们最终抉择了 Debezium in Flink 的 CDC 计划。

因为 MySQL 中的数据主题很多,在流工作中咱们同时也做了数据路由,即不同主题的变动数据会路由到不同的 Kafka Topic 中,行将 Kafka 作为 ODS。这样做的益处很多,首先对于可变数据咱们能够清晰的察看到每次变动的过程,其次能够对数据进行回放,逐次变动的叠加后果便是最终的状态。

接下来要思考的就是数据存在哪里,联合上文提到的“计算存储拆散”准则,这也是数据湖提供的一个劣势,数据湖个别应用相似文件系统存储(对象存储或传统的 HDFS)来构建,恰好合乎咱们的预期。在比照了几种数据湖计划后,咱们抉择了 Apache Hudi,理由如下:

  • Hudi 提供了一个在 HDFS 中 upsert 的解决方案,即相似关系型数据库的应用体验,对于可更新数据十分敌对,并且也合乎 MySQL binlog 的语义。
  • 增量查问,能够很不便的获取最近 30 分钟,或者 1 天内发生变化的数据,这对于一些可叠加的离线计算工作十分敌对,不再须要针对全量数据进行计算,只须要针对变动数据进行计算,大大节俭了机器资源和工夫。
  • 能够实时同步元数据到 Hive,为“入湖即可查”发明了条件。
  • 对 COW 和 MOR 两种不同应用场景别离进行了优化。
  • Hudi 社区凋谢且迭代速度快,在其孵化阶段就被 AWS EMR 集成,而后被阿里云 DLA 数据湖剖析 [2]、阿里云 EMR[3] 以及腾讯云 EMR[4]集成,前景不错,同时 ApacheHudi 国内技术交换群探讨十分热烈,国内基于 Hudi 构建数据湖的公司越来越多。

在集成了 Hudi 后,咱们的架构演化成这样:

数据表都抉择了 COW(写时复制)模式,次要是思考到读多写少的特点,并且咱们须要查问过程尽可能地快,MOR(读时合并)的策略在查问端的性能还是要稍强劲一些,再加上对于数据时延并没有到亚秒级的要求,所以最终抉择了 COW。

最上层咱们应用了 Presto 作为剖析引擎,提供数据即席查问的能力。因为咱们应用的 Hudi 版本是 0.6.0,与 Flink 的集成还没有公布,所以咱们不得不采纳 Flink + Spark 双擎的策略,应用 Spark Streaming 将 Kafka 中的数据写入 Hudi。

三、技术挑战

在进行了 PoC 后咱们确定了上图所示的架构设计,但在真正的实现过程中,也遇到了不小的挑战。

3.1 CDC 运行模式定制

■ 全量模式

Debezium 的一大劣势就是“批流一体”,snapshot 阶段就是通过扫描全表将数据回放成与 binlog 增量日志内容统一的音讯,这样使用者就能够应用雷同的代码同时解决全量和增量数据。然而在咱们的业务实际中,如果历史表的个数和表内的数据都很多,就会造成 snapshot 阶段继续的工夫十分长,一旦这个过程出现意外中断,那么下次须要从第一张表开始从新扫描。假如残缺的 snapshot 过程须要数天,那么这种规模的“重试”咱们是无奈承受的,所以须要有相似断点续传的机制,在查问了 Debezuim 官网文档后咱们发现了 snapshot.include.collection.list 参数。

An optional, comma-separated list of regular expressions that match names of schemas specified
 in table.include.list for which you want to take the snapshot.

所以能够在 snapshot 中断后,通过该参数传入残余待扫描的表,从而实现“接力”的能力。但这里须要留神的一点是,无论 snapshot 阶段重试几次,增量的 binlog 位点都必须是首次 snapshot 时的位点,否则就会丢数据。这也带来了另一个问题,如果中断后再接力直到 snapshot 实现,Debezuim 是会主动开始从本次(而不是首次)snapshot 时的 binlog 位点间接开始增量同步数据,这不是咱们须要的后果,咱们须要 snapshot 完结后工作间接终止。

翻了很多 Debezuim 的文档并没有发现这样的性能,然而在翻阅源码的过程中看到其实是有方法的。

/**
*Perform a snapshot andthen stop before attempting to read the binlog.
*/
INITIAL_ONLY("initial_only",true);
// MySqlConnectorTask.java
if(taskContext.isInitialSnapshotOnly()){logger.warn("This connector will only perform a snapshot, and will stop after that completes.");
    chainedReaderBuilder.addReader(newBlockingReader("blocker",
"Connector has completed all of its work but will continue in the running state. It can be shut down at any time."));
    chainedReaderBuilder
.completionMessage("Connector configured to only perform snapshot, and snapshot completed successfully. Connector will terminate.");
}

即在 initial_only 的模式下 Debezuim 会应用 BlockingReader 代替 BinlogReader 将线程阻塞,不再进行增量生产。

■ 增量模式

如果 snapshot 完结后工作主动进行,那么就须要手动重启工作持续增量同步,同时增量模式须要反对指定 MySQL 的 binlog 文件和具体的位点(position)。Debezuim 自带 schema_only_recovery 模式,能够手动设置参数。

DebeziumOffset specificOffset =newDebeziumOffset();
Map<String,Object> sourceOffset =newHashMap<>();
sourceOffset.put("file", startupOptions.specificOffsetFile);
sourceOffset.put("pos", startupOptions.specificOffsetPos);
specificOffset.setSourceOffset(sourceOffset);

因为咱们之前应用的 ververica / flink-cdc-connectors 版本是 1.2.0,没有凋谢 Debezuim 的 schema_only_recovery 模式,所以批改了相干源码。目前 1.3.0 版本已反对,在 MySQLSourceBuilder 中作为启动参数传入即可。

3.2 局部更新(Patch Update)

这里有必要解释一下什么是笼罩更新什么是局部更新,这其实也对应于 RESTful 的语义,put 就是笼罩更新,要求调用方提供的肯定是一个残缺的资源对象,实践上说,如果用了 put,但却没有提供残缺的资源对象,那么缺了的那些字段应该被清空。patch 对应局部更新,或部分更新,调用方只提供须要更新的字段,而不提供残缺的资源对象,益处是能够节俭带宽。

在 Hudi 中默认只反对笼罩更新,但对于咱们业务而言,采集端点上报的数据不可能蕴含残缺的业务对象,如用户年龄的增长,在上报时只会蕴含一个字段的信息。

{
  "id": 123,
  "ts": 1435290195610,
  "data": {"age": 25}
}

这就须要先找出 rowkey=123 的数据内容,并与待更新内容进行合并后再写入。合并时如果待写入数据的字段不为空,那么进行归并。Hudi 默认采纳 OverwriteWithLatestAvroPayload 的 combineAndGetUpdateValue 办法。

Simply overwrites storage with latest delta record

为了向前兼容,数据开发共事 Karl 新增了 OverwriteNonDefaultsWithLatestAvroPayload 类,覆写了 combineAndGetUpdateValue 来解决上述问题,并已反馈给社区 [HUDI-1255] Add new Payload (OverwriteNonDefaultsWithLatestAvroPayload) for updating specified fields in storage[5] , 其实社区内相似需要还有很多,如 [HUDI-1160] Support update partial fields for CoW table[6],咱们也期待有更多的开发者能够将这个性能做的更加欠缺。

当然这里也存在限度,如果真的心愿将某个字段更新为空值,那么应用 OverwriteNonDefaultsWithLatestAvroPayload 是无奈实现的。

同时咱们也对社区的 Compaction 策略了补充,增加了基于工夫的 Compaction 调度策略,即不仅仅能够基于增量提交数进行 Compaction,还能够基于工夫做 Compaction,该工作也曾经反馈给社区,参见[HUDI-1381] Schedule compaction based on time elapsed[7],这对于想要在指定工夫内进行 Compaction 提供了更高的灵活性。

3.3 一批次内雷同 rowkey 数据的归并

因为 CDC 的一个特色就是实时监听数据的变动,例如一个订单的状态在几分钟内可能就会产生若干次扭转,再加上 Spark Streaming 微批处理的特点,有较大的概率会在一个工夫窗口获取大量雷同 rowkey 的数据,不同 rowkey 对应局部数据,因而咱们在 Streaming 工作中对一批次雷同 rowkey 的数据进行了归并,整体相似 Hudi 应用 Bloom 判断 rowkey 是否存在的逻辑。特地须要留神的是时序问题,数据的叠加必须严格依照 ts 工夫,否则就会呈现旧版本的数据笼罩新版本的状况。

3.4 Schema evolution

因为业务的倒退以及灵活性的要求,表字段扩大(Schema evolution)肯定是刚需。Hudi 恰好也思考到了这一点,咱们从 Hudi 的 wiki[8] 上理解到:

What's Hudi's schema evolution story 

Hudi uses Avro as the internal canonical representation for records, primarily due to its nice schema compatibility & evolution[9] properties. This is a key aspect of having reliability in your ingestion or ETL pipelines. As long as the schema passed to Hudi (either explicitly in DeltaStreamer schema provider configs or implicitly by Spark Datasource's Dataset schemas) is backwards compatible (e.g no field deletes, only appending new fields to schema), Hudi will seamlessly handle read/write of old and new data and also keep the Hive schema up-to date.

既然 Avro 格局自身就反对 Schema evolution,天然地 Hudi 也反对。

  1. Schema evolution 大抵能够分为 4 种:
  2. Backwards compatible: 向后兼容,用新的 schema 能够读取旧数据,如果字段没值,就用 default 值,这也是 Hudi 提供的兼容形式。
  3. Forwards compatible: 向前兼容,用旧 schema 能够读取新数据,Avro 将疏忽新加的字段,如果要向前兼容,删掉的字段必须要有默认值。
  4. Full compatible: 反对向前兼容,向后兼容,如果要全兼容,那么就须要只增加有默认值的字段,并且只移除有默认值的字段。
  5. No Compatibility Checking:这种状况一般来说就是须要强制扭转某个字段的类型,此时就须要做全量的数据迁徙,不举荐。

在生产实践中,咱们通过批改 schema 就能够实现字段扩大的需要。但随之而来也会发现一些问题,比方字段过多会造成单个文件很大(冲破 128mb),写入很慢,极其状况下 1000 多列的文件写入会达到小时级别。后续咱们也在寻找一些优化计划,例如字段回收或者垂直分表,将单文件内的字段数量升高。

3.5 同时查问和写入导致异样

这是呈现在查问端的问题,咱们应用 Presto 查问 Hive 表时会呈现 Hudi 元数据文件找不到的异样,进而导致 Hudi 外部的 NPE。

Error checking path :hdfs://hudipath/.hoodie_partition_metadata, under folder: hdfs://hudipath/event/202102; nested exception is java.sql.SQLException: Query failed (#20210309_031334_04606_fipir)

基于上述信息,狐疑是在查问的同时,元数据信息被批改而导致的问题。在求助社区后,咱们将 HoodieROTablePathFilter 中的 hoodiePathCache 改为线程平安的 ConcurrentHashMap,从新打包失去 hudi-hadoop-mr.jar 和 hudi-common.jar,替换到 presto/plugin/hive-hadoop2 的目录下,重启 Presto。后续没有发现 NPE 的状况。

四、成果

再来回顾一下咱们在架构之初对于数据湖的构想:

  • 反对可变数据。
  • 反对 schema evolution。
  • 计算存储拆散,反对多种查问引擎。
  • 反对增量视图和工夫旅行。

这些个性 Hudi 根本都实现了,新架构实现后比照之前的零碎,数据时延和离线解决性能都有了显著晋升,具体表现在:

  1. 实时数据写入过程简化,之前的更新操作实现繁琐,当初开发过程中根本不必关怀是新增还是更新操作,大大降低了开发人员的心智累赘。
  2. 实时数据入湖到可查问的工夫缩短,尽管咱们的采纳的是 COW 的表模式,但理论测试发现入湖到可查问的时效性并不低,根本都在分钟级。
  3. 离线解决性能晋升,基于 Hudi 的增量视图个性,每天的离线工作能够很容易的获取过来 24h 变动的数据,解决的数据量级变小,进而带来更短的解决工夫。

五、将来打算

5.1 Flink 集成

之前提到“无可奈何”的双擎策略,事实上是十分苦恼的,运维和开发方式都无奈对立,所以咱们对 Hudi 官网集成 Flink 的停顿十分关注,并且近期也有了新的 RFC – 24: Hoodie Flink Writer Proposal[10],同时也曾经在 Hudi 0.8.0 版本深度集成了 Flink 能力,期待将来的 Flink 集成版本在性能上能够有很大的晋升,同时也能够将解决引擎对立成 Flink,不再采纳双引擎模式。

5.2 并发写

因为 Hudi 文件为了保障元数据的一致性,在 0.8.0 版本之前不反对并发写。但在理论利用中,数据湖中的很多数据不光是实时的数据,还有很多是须要通过离线计算取得的,如果某张表的一部分字段是 CDC 的间接反映,另一部分字段是离线工作的计算结果,这就会带来并发写的需要。

咱们目前采纳两种形式来躲避:

垂直分表,行将两局部文件离开,CDC 数据通过 Spark Streaming 写入,离线计算结果写入另一个文件,防止并发写。

模仿成 CDC 音讯回写 Kafka,为了查问性能不能分表的状况下,离线计算结果会模仿成 CDC 音讯写入 Kafka,再通过 Spark Streaming 写入 Hudi。但毛病也是很显著的,就是离线工作的后果反映到最终存储的时延较长。

最近 Hudi 公布的 0.8.0 版本曾经反对了并发写模式,其基于乐观锁,文件级别的冲突检测可很好的满足并发写需要,后续会测试看下成果。

5.3 性能优化

上文也提到了诸如大文件,GC 频繁的一些问题,综合来看咱们发现写入的瓶颈次要产生在两个中央。

■ 索引

因为目前咱们采纳 HoodieGlobalBloomIndex,导致建设索引和查问索引的工夫都较长,官网提供了 3 种索引实现:

How does the Hudi indexing work 
& what are its benefits? 

The indexing component is a key part of the Hudi writing and it maps a given recordKey to a fileGroup inside Hudi consistently. This enables faster identification of the file groups that are affected/dirtied by a given write operation.
Hudi supports a few options for indexing as below

• HoodieBloomIndex (default)
 : Uses a bloom filter and ranges information placed in the footer of parquet/base files (and soon log files as well)
•HoodieGlobalBloomIndex : The default indexing only enforces uniqueness of a key inside a single partition i.e the user is expected to know the partition under which a given record key is stored. This helps the indexing scale very well for even very large datasets[11]. However, in some cases, it might be necessary instead to do the de-duping/enforce uniqueness across all partitions and the global bloom index does exactly that. If this is used, incoming records are compared to files across the entire dataset and ensure a recordKey is only present in one partition.
•HBaseIndex : Apache HBase is a key value store, typically found in close proximity to HDFS. You can also store the index inside HBase, which could be handy if you are already operating HBase.

You can implement your own index if you’d like, by subclassing the HoodieIndex class and configuring the index class name in configs.
在与社区的探讨后,咱们更偏向于应用 HBaseIndex 或相似的 k-v store 来治理索引。

■ 更新

upsert 慢除了某些文件较大的问题,另一方面也与 CDC 的特点无关。可变数据的更新范畴其实是不可预测的,极其状况下待更新的 1000 条数据属于 1000 个不同的文件时,更新的性能很难通过代码优化的形式晋升,只能减少 cpu 资源进步解决并行度。咱们会从几个方面着手:

  1. 参数调整,要是否有方法均衡文件的数量和大小。
  2. 尝试局部业务表应用 MOR 模式,MOR 在更新时会先将数据写入日志文件,之后再合并到 Parquet,实践上能够升高覆写 Parquet 文件的频率。
  3. 探讨业务上的 trade-off 来换取更好的写入速度。

六、总结

将来有待优化的工作还有很多,咱们也会积极参与社区建设,尝试新的个性,为用户带来更好的数据服务体验,最初感激 Flink CDC Connectors 和 Apache Hudi 的开发者和社区维护者。

作者|Dean,Linkflow 首席架构师

援用链接

[1] flink-cdc-connectors: https://github.com/ververica/…
[2] 阿里云 DLA 数据湖剖析: https://help.aliyun.com/docum…
[3] 阿里云 EMR: https://help.aliyun.com/docum…
[4] 腾讯云 EMR: https://cloud.tencent.com/doc…
[5] [HUDI-1255] Add new Payload(OverwriteNonDefaultsWithLatestAvroPayload) for updating specified fields in storage: https://github.com/apache/hud…
[6] [HUDI-1160] Support update partial fields for CoW table: https://github.com/apache/hud…
[7] [HUDI-1381] Schedule compaction based on time elapsed: https://github.com/apache/hud…
[8] wiki: https://cwiki.apache.org/conf…
[9] schema compatibility & evolution: https://docs.confluent.io/cur…
[10] RFC – 24: Hoodie Flink Writer Proposal: https://cwiki.apache.org/conf…
[11] very large datasets: https://eng.uber.com/uber-big…
[12] ding.wang@linkflowtech.com: mailto:ding.wang@linkflowtech.com

正文完
 0