乐趣区

关于Flink:37-手游基于-Flink-CDC-Hudi-湖仓一体方案实践

本文作者是 37 手游大数据开发徐润柏,介绍了 37 手游为何抉择 Flink 作为计算引擎,并如何基于 Flink CDC + Hudi 构建新的湖仓一体计划,次要内容包含:

  1. Flink CDC 基本知识介绍
  2. Hudi 基本知识介绍
  3. 37 手游的业务痛点和技术计划选型
  4. 37 手游湖仓一体介绍
  5. Flink CDC + Hudi 实际
  6. 总结

一、Flink-CDC 2.0

Flink CDC Connectors 是 Apache Flink 的一个 source 端的连接器,目前 2.0 版本反对从 MySQL 以及 Postgres 两种数据源中获取数据,2.1 版本社区确定会反对 Oracle,MongoDB 数据源。

Fink CDC 2.0 的外围 feature,次要体现为实现了以下三个十分重要的性能:

  • 全程无锁,不会对数据库产生须要加锁所带来的危险;
  • 多并行度,全量数据的读取阶段反对程度扩大,使亿级别的大表能够通过加大并行度来放慢读取速度;
  • 断点续传,全量阶段反对 checkpoint,即便工作因某种原因退出了,也可通过保留的 checkpoint 对工作进行复原实现数据的断点续传。

Flink CDC 2.0 详解外围改良

二、Hudi

Apache Hudi 目前被业内形容为围绕数据库内核构建的 流式数据湖平台 (Streaming Data Lake Platform)。

因为 Hudi 领有良好的 Upsert 能力,并且 0.10 Master 对 Flink 版本反对至 1.13.x,因而咱们抉择通过 Flink + Hudi 的形式为 37 手游的业务场景提供分钟级 Upsert 数据的剖析查问能力。

三、37 手游的业务痛点和技术计划选型

1. 旧架构与业务痛点

1.1 数据实时性不够

  • 日志类数据通过 sqoop 每 30min 同步前 60min 数据到 Hive;
  • 数据库类数据通过 sqoop 每 60min 同步当天全量数据到 Hive;
  • 数据库类数据通过 sqoop 每天同步前 60 天数据到 Hive。

1.2 业务代码逻辑简单且难保护

  • 目前 37 手游还有很多的业务开发沿用 MySQL + PHP 的开发模式,代码逻辑简单且很难保护;
  • 雷同的代码逻辑,往往流解决须要开发一份代码,批处理则须要另开发一份代码,不能复用。

1.3 频繁重刷历史数据

  • 频繁地重刷历史数据来保证数据统一。

1.4 Schema 变更频繁

  • 因为业务需要,常常须要增加表字段。

1.5 Hive 版本低

  • 目前 Hive 应用版本为 1.x 版本,并且降级版本比拟艰难;
  • 不反对 Upsert;
  • 不反对行级别的 delete。

因为 37 手游的业务场景,数据 upsert、delete 是个很常见的需要。所以基于 Hive 数仓的架构对业务需要的满足度不够。

2. 技术选型

在同步工具的选型上思考过 Canal 和 Maxwell。但 Canal 只适宜增量数据的同步并且须要部署,保护起来绝对较重。而 Maxwell 尽管比拟轻量,但与 Canal 一样须要配合 Kafka 等音讯队列应用。比照之下,Flink CDC 能够通过配置 Flink connector 的形式基于 Flink-SQL 进行应用,非常笨重,并且完满符合基于 Flink-SQL 的流批一体架构。

在存储引擎的选型上,目前最热门的数据湖产品当属:Apache Hudi,Apache Iceberg 和 DeltaLake,这些在咱们的场景下各有优劣。最终,基于 Hudi 对上下游生态的凋谢、对全局索引的反对、对 Flink 1.13 版本的反对,以及对 Hive 版本的兼容性 (Iceberg 不反对 Hive1.x 的版本) 等起因,抉择了 Hudi 作为湖仓一体和流批一体的存储引擎。

针对上述存在的业务痛点以及选型比照,咱们的最终计划为:以 Flink1.13.2 作为计算引擎,依附 Flink 提供的流批对立的 API,基于 Flink-SQL 实现流批一体,Flink-CDC 2.0 作为 ODS 层的数据同步工具以及 Hudi-0.10 Master 作为存储引擎的湖仓一体,解决保护两套代码的业务痛点。

四、新架构与湖仓一体

37 手游的湖仓一体计划,是 37 手游流批一体架构的一部分。通过湖仓一体、流批一体,准实时场景下做到了:数据同源、同计算引擎、同存储、同计算口径。数据的时效性能够到分钟级,能很好的满足业务准实时数仓的需要。上面是架构图:

MySQL 数据通过 Flink CDC 进入到 Kafka。之所以数据先入 Kafka 而不是间接入 Hudi,是为了实现多个实时工作复用 MySQL 过去的数据,防止多个工作通过 Flink CDC 接 MySQL 表以及 Binlog,对 MySQL 库的性能造成影响。

通过 CDC 进入到 Kafka 的数据除了落一份到离线数据仓库的 ODS 层之外,会同时依照实时数据仓库的链路,从 ODS->DWD->DWS->OLAP 数据库,最初供报表等数据服务应用。实时数仓的每一层后果数据会准实时的落一份到离线数仓,通过这种形式做到程序一次开发、指标口径对立,数据对立。

从架构图上,能够看到有一步数据修改 (重跑历史数据) 的动作,之所以有这一步是思考到:有可能存在因为口径调整或者前一天的实时工作计算结果谬误,导致重跑历史数据的状况。

而存储在 Kafka 的数据有生效工夫,不会存太久的历史数据,重跑很久的历史数据无奈从 Kafka 中获取历史源数据。再者,如果把大量的历史数据再一次推到 Kafka,走实时计算的链路来修改历史数据,可能会影响当天的实时作业。所以针对重跑历史数据,会通过数据修改这一步来解决。

总体上说,37 手游的数据仓库属于 Lambda 和 Kappa 混搭的架构。流批一体数据仓库的各个数据链路有数据品质校验的流程。第二天对前一天的数据进行对账,如果前一天实时计算的数据无异样,则不须要修改数据,Kappa 架构曾经足够。

五、Flink CDC 2.0 + Kafka + Hudi 0.10 实际

1. 环境筹备

  • Flink 1.13.2
  • …/lib/hudi-flink-bundle_2.11-0.10.0-SNAPSHOT.jar (批改 Master 分支的 Hudi Flink 版本为 1.13.2 而后构建)
  • …/lib/hadoop-mapreduce-client-core-2.7.3.jar (解决 Hudi ClassNotFoundException)
  • ../lib/flink-sql-connector-mysql-cdc-2.0.0.jar
  • ../lib/flink-format-changelog-json-2.0.0.jar
  • ../lib/flink-sql-connector-kafka_2.11-1.13.2.jar

source 端 MySQL-CDC 表定义:

create table sy_payment_cdc (
  ID BIGINT,
  ...
  PRIMARY KEY(ID) NOT ENFORCED
) with(
  'connector' = 'mysql-cdc',
  'hostname' = '','port'='',
  'username' = '','password'='',
  'database-name' = '','table-name'='',
  'connect.timeout' = '60s',
  'scan.incremental.snapshot.chunk.size' = '100000',
  'server-id'='5401-5416'
);

值得注意的是:scan.incremental.snapshot.chunk.size 参数须要依据理论状况来配置,如果表数据量不大,应用默认值即可。

Sink 端 Kafka+Hudi COW 表定义:

create table sy_payment_cdc2kafka (
  ID BIGINT,
  ...
  PRIMARY KEY(ID) NOT ENFORCED
) with (
  'connector' = 'kafka',
  'topic' = '','scan.startup.mode'='latest-offset','properties.bootstrap.servers'='',
  'properties.group.id' = '','key.format'='',
  'key.fields' = '','format'='changelog-json'
);

create table sy_payment2Hudi (
  ID BIGINT,
  ...
  PRIMARY KEY(ID) NOT ENFORCED
)
PARTITIONED BY (YMD)
WITH (
  'connector' = 'Hudi',
  'path' = 'hdfs:///data/Hudi/m37_mpay_tj/sy_payment',
  'table.type' = 'COPY_ON_WRITE',
  'partition.default_name' = 'YMD',
  'write.insert.drop.duplicates' = 'true',
  'write.bulk_insert.shuffle_by_partition' = 'false',
  'write.bulk_insert.sort_by_partition' = 'false',
  'write.precombine.field' = 'MTIME',
  'write.tasks' = '16',
  'write.bucket_assign.tasks' = '16',
  'write.task.max.size' = '','write.merge.max_memory'=''
);

针对历史数据入 Hudi,能够抉择离线 bulk_insert 的形式入湖,再通过 Load Index Bootstrap 加载数据后接回增量数据。bulk_insert 形式入湖数据的唯一性依附源端的数据自身,在接回增量数据时也须要做到保证数据不失落。

这里咱们抉择更为简略的调整工作资源的形式将历史数据入湖。依附 Flink 的 checkpoint 机制,不论是 CDC 2.0 入 Kafka 期间还是 Kafka 入 Hudi 期间,都能够通过指定 checkpoint 的形式对工作进行重启并且数据不会失落。

咱们能够在配置 CDC 2.0 入 Kafka,Kafka 入 Hudi 工作时调大内存并配置多个并行度,放慢历史数据入湖,等到所有历史数据入湖后,再相应的调小入湖工作的内存配置并且将 CDC 入 Kafka 的并行度设置为 1,因为增量阶段 CDC 是单并行度,而后指定 checkpoint 重启工作。

依照下面表定义的参数配置,配置 16 个并行度,Flink TaskManager 内存大小为 50G 的状况下,单表 15 亿历史数据入至 Hudi COW 表理论用时 10 小时,单表 9 亿数据入至 Hudi COW 表理论用时 6 小时。当然这个耗时很大一部分是 COW 写放大的个性,在大数据量的 upsert 模式下耗时较多。

目前咱们的集群由 200 多台机器组成,在线的流计算工作总数有 200 多,总数据量靠近 2PB。

如果集群资源很无限的状况下,能够依据理论状况调整 Hudi 表以及 Flink 工作的内存配置,还能够通过配置 Hudi 的限流参数 write.rate.limit 让历史数据迟缓入湖。

之前 Flink CDC 1.x 版本因为全量 snapshot 阶段单并行度读取的起因,过后亿级以上的表在全量 snapshot 读取阶段就须要消耗很长时间,并且 checkpoint 会失败无奈保证数据的断点续传。

所以过后入 Hudi 是采纳先启动一个 CDC 1.x 的程序将此刻开始的增量数据写入 Kafka,之后再启动另外一个 sqoop 程序拉取以后的所有数据至 Hive 后,通过 Flink 读取 Hive 的数据写 Hudi,最初再把 Kafka 的增量数据从头生产接回 Hudi。因为 Kafka 与 Hive 的数据存在交加,因而数据不会失落,加上 Hudi 的 upsert 能力保障了数据惟一。

然而,这种形式的链路太长操作艰难,现在通过 CDC 2.0 在全量 snapshot 阶段反对多并行度以及 checkpoint 的能力,的确大大降低了架构的复杂度。

2. 数据比对

  • 因为生产环境用的是 Hive1.x,Hudi 对于 1.x 还不反对数据同步,所以通过创立 Hive 内部表的形式进行查问,如果是 Hive2.x 以上版本,可参考 Hive 同步章节;
  • 创立 Hive 内部表 + 预创立分区;
  • auxlib 文件夹增加 Hudi-hadoop-mr-bundle-0.10.0-SNAPSHOT.jar。
CREATE EXTERNAL TABLE m37_mpay_tj.`ods_sy_payment_f_d_b_ext`(
  `_hoodie_commit_time` string,
  `_hoodie_commit_seqno` string,
  `_hoodie_record_key` string,
  `_hoodie_partition_path` string,
  `_hoodie_file_name` string,
  `ID` bigint,
  ...
  )
PARTITIONED BY (`dt` string)
ROW FORMAT SERDE
  'org.apache.hadoop.Hive.ql.io.parquet.serde.ParquetHiveSerDe'
STORED AS INPUTFORMAT
  'org.apache.Hudi.hadoop.HoodieParquetInputFormat'
OUTPUTFORMAT
  'org.apache.hadoop.Hive.ql.io.parquet.MapredParquetOutputFormat'
LOCATION
  'hdfs:///data/Hudi/m37_mpay_tj/sy_payment'

最终查问 Hudi 数据 (Hive 内部表的模式) 与原来 sqoop 同步的 Hive 数据做比对失去:

  1. 总数统一;
  2. 按天分组统计数量统一;
  3. 按天分组统计金额统一。

六、总结

湖仓一体以及流批一体架构比照传统数仓架构次要有以下几点益处:

  • Hudi 提供了 Upsert 能力,解决频繁 Upsert/Delete 的痛点;
  • 提供分钟级的数据,比传统数仓有更高的时效性;
  • 基于 Flink-SQL 实现了流批一体,代码保护成本低;
  • 数据同源、同计算引擎、同存储、同计算口径;
  • 选用 Flink CDC 作为数据同步工具,省掉 sqoop 的保护老本。

最初针对频繁减少表字段的痛点需要,并且心愿后续同步上游零碎的时候可能主动退出这个字段,目前还没有完满的解决方案,心愿 Flink CDC 社区能在后续的版本提供 Schema Evolution 的反对。

Reference

[1] MySQL CDC 文档:https://ververica.github.io/f…

[2] Hudi Flink 答疑解惑:https://www.yuque.com/docs/sh…

[3] Hudi 的一些设计:https://www.yuque.com/docs/sh…

退出移动版