本文介绍了百信银行实时计算平台的建设状况,实时数据湖构建在 Hudi 上的计划和实际办法,以及实时计算平台集成 Hudi 和应用 Hudi 的形式。内容包含:
- 背景
- 百信银行基于 Flink 的实时计算平台设计与实际
- 百信银行实时计算平台与实时数据湖的集成实际
- 百信银行实时数据湖的将来
- 总结
一、背景
百信银行,全称为“中信百信银行股份有限公司”,是首家获批独立法人模式的直销银行。作为首家国有控股的互联网银行,相比于传统金融行业,百信银行对数据敏捷性有更高的要求。
数据麻利,不仅要求数据的准确性,还要求数据达到的实时性,和数据传输的安全性。为了满足我行数据敏捷性的需要,百信银行大数据部承当起了建设实时计算平台的职责,保障了数据疾速,平安且规范得在线送达。
受害于大数据技术的倒退和更新迭代,目前广为人知的批流一体的两大支柱别离是:“对立计算引擎”与“对立存储引擎”。
- Flink,作为大数据实时计算畛域的佼佼者,1.12 版本的公布让它进一步晋升了 对立计算引擎 的能力;
- 同时随着数据湖技术 Hudi 的倒退,对立存储引擎 也迎来了新一代技术改革。
在 Flink 和 Hudi 社区倒退的根底上,百信银行构建了实时计算平台,同时将实时数据湖 Hudi 集成到实时计算平台之上。联合行内数据治理的思路,实现了数据实时在线、安全可靠、规范对立,且有麻利数据湖的指标。
二、百信银行基于 Flink 的实时计算平台设计与实际
1. 实时计算平台的定位
实时计算平台作为行级实时计算平台,由大数据 IaaS 团队自主研发,是一款实现了实时数据”端到端“的在线数据加工解决的企业级产品。
- 其外围性能具备了实时采集、实时计算、实时入库、简单工夫解决、规定引擎、可视化治理、一键配置、自主上线,和实时监控预警等。
- 目前其反对的场景有实时数仓、断点召回、智能风控、对立资产视图、反欺诈,和实时特色变量加工等。
- 并且,它服务着行内小微、信贷、反欺诈、消金、财务,和危险等泛滥业务线。
截止目前,在线稳固运行的有 320+ 的实时工作,且在线运行的工作 QPS 日均达到 170W 左右。
2. 实时计算平台的架构
依照性能来划分的话,实时计算平台的架构次要分为三层:
■ 1)数据采集层
采集层目前次要分为两个场景:
-
第一个场景是采集 MySQL 备库的 Binlog 日志到 Kafka 中。我行所应用的数据采集计划并没有采纳业界广泛用的如 Canal,Debezium 等现有的 CDC 计划。
1、因为咱们的 MySQL 版本为百信银行外部的版本,Binlog 协定有所不同,所以现有的技术计划不能很好的反对兼容咱们获取 Binlog 日志。2、同时,为了解决咱们数据源 MySQL 的备库随时可能因为多机房切换,而造成采集数据失落的状况。咱们自研了读取 MySQL Binlog 的 Databus 我的项目,咱们也将 Databus 逻辑转化成了 Flink 应用程序,并将其部署到了 Yarn 资源框架中,使 Databus 数据抽取能够做到高可用,且资源可控。
-
第二个场景是,咱们对接了第三方的利用,这个第三方利用会将数据写入 Kafka,而写入 Kafka 有两种形式:
1、一种形式是根据咱们定义的 Json shcema 协定。
(UMF 协定:{col_name:””,umf_id”:””,”umf_ts”:,”umf_op_”:”i/u/d”})
协定定义了”惟一 id”,”工夫戳“和”操作类型“。依据此协定,用户能够指定对该音讯的操作类型,别离是 “insert”,”update” 和 “delete”,以便上游对音讯进行针对性解决。2、另外一种形式,用户间接把 JSON 类型的数据写到 kafka 中,不辨别操作类型。
■ 2)数据计算转换层
生产 Kafka 数据进行一层转换逻辑,反对用户自定义函数,将数据标准化,做敏感数据的脱敏加密等。
■ 3)数据存储层
数据存储到 HDFS,Kudu,TiDB,Kafka,Hudi,MySQL 等贮存介质中。
在上图所示的架构图中,咱们能够看到整体实时计算平台反对的次要性能有:
-
开发层面:
1、反对标准化的 DataBus 采集性能,该性能对于反对 MySQL Binglog 同步到 Kafka 做了同步适配,不须要用户干涉配置过多。用户只须要指定数据源 MySQL 的实例就能够实现到 Kafka 的标准化同步。2、反对用户可视化编辑 FlinkSQL。3、反对用户自定义 Flink UDF 函数。4、反对简单事件处理(CEP)。5、反对用户上传打包编译好 Flink 应用程序。
-
运维层面:
1、反对不同类型工作的状态治理,反对 savepoint。2、反对端到端的提早监控,告警。
在实时计算平台降级迭代的过程中,社区 Flink 版本之间存在一些向下不兼容的状况。为了平滑的降级 Flink 版本,咱们对计算引擎的多版本模块进行对立的形象,将多版本之间做了严格的 JVM 级别隔离,使版本之间不会产生 Jar 包抵触,Flink Api 不兼容的状况。
如上图所示,咱们将不同的 Flink 版本封装到一个独立的虚拟机中,应用 Thrift Server 启动一个独立的 JVM 虚拟机,每个版本的 Flink 都会有一个独立的 Thrift Server。在应用的过程中,只有用户显示指定的 Flink 版本,Flink 应用程序就会被指定的 Thrift Server 启动。同时,咱们也将实时计算的后端服务嵌入一个罕用的 Flink 版本,防止因为启动 Thrift Server 而占用过多的启动工夫。
同时为了满足金融零碎高可用和多备的需要,实时计算平台也开发了多 Hadoop 集群的反对,反对实时计算工作在失败后能够迁徙到备集群下来。整体的计划是,反对多集群 checkpoint,savepoint,反对工作失败后,能够在备机房重启实时工作。
三、百信银行实时计算平台与实时数据湖集成实际
在介绍本内容之前,咱们先来理解一些我行目前在数据湖的现状。目前的实时数据湖,我行仍然采纳支流的 Lambda 架构来构建数据仓库。
1. Lambda
Lambda 架构下,数仓的毛病:
- 同样的需要,开发和保护两套代码逻辑:批和流两套逻辑代码都须要开发和保护,并且须要保护合并的逻辑,且需同时上线;
- 计算和存储资源占用多:同样的计算逻辑计算两次,整体资源占用会增多;
- 数据具备二义性:两套计算逻辑,实时数据和批量数据常常对不上,准确性难以分辨;
- 重用 Kafka 音讯队列:Kafka 保留往往依照天或者月保留,不能全量保留数据,无奈应用现有的 adhoc 查问引擎剖析。
2. Hudi
为了解决 Lambda 架构的痛点,我行筹备了新一代的数据湖技术架构,同时咱们也花大量的工夫调研了现有的数据湖技术,最终抉择 Hudi 作为咱们的存储引擎。
- Update / Delete 记录:Hudi 应用细粒度的文件 / 记录级别索引,来反对 Update / Delete 记录,同时还提供写操作的事务保障,反对 ACID 语义。查问会解决最初一个提交的快照,并基于此输入后果;
- 变更流:Hudi 对获取数据变更提供了流的反对,能够从给定的工夫点获取给定表中已 updated / inserted / deleted 的所有记录的增量流,能够查问不同工夫的状态数据;
- 技术栈对立:能够兼容咱们现有的 adhoc 查问引擎 presto,spark。
- 社区更新迭代速度快:曾经反对 Flink 两种不同形式的的读写操作,如 COW 和 MOR。
在新的架构中能够看到,咱们将实时和批处理贴源层的数据全副写到 Hudi 存储中,并从新写入到新的数据湖层 datalake(Hive 的数据库)。出于历史的起因,为了兼容之前的数据仓库的模型,咱们仍然保留之前的 ODS 层,历史的数仓模型放弃不变,只不过 ODS 贴源层的数据须要从 datalake 层获取。
-
首先,咱们能够看到,对于新的表的入仓逻辑,咱们通过实时计算平台应用 Flink 写入到 datalake 中(新的贴源层,Hudi 格局存储),数据分析师和数据科学家,能够间接应用 datalake 层的数据进行数据分析和机器学习建模。如果数据仓库的模型须要应用 datalake 的数据源,须要一层转换 ODS 的逻辑,这里的转换逻辑分为两种状况:
1、第一种,对于增量模型,用户只须要将最新 datalake 的分区应用快照查问放到 ODS 中即可。2、第二种,对于全量模型,用户须要把 ODS 前一天的快照和 datalake 最新的快照查问的后果进行一次合并,造成最新的快照再放到 ODS 以后的分区中,以此类推。
咱们这么做的起因是,对于现有的数仓模型不必革新,只是把 ODS 的数据起源换成 datalake,时效性强。同时满足了数据分析和数据科学家准实时获取数据的诉求。
-
另外,对于原始的 ODS 存在的数据,咱们开发了将 ODS 层的数据进行了一次初始化入 datalake 的脚本。
1、如果 ODS 层数据每天是全量的快照,咱们只将最新的一次快照数据初始化到 datalake 的雷同分区,而后实时入 datalake 的链路接入;2、如果 ODS 层的数据是增量的,咱们临时不做初始化,只在 datalake 中从新建一个实时入湖的链路,而后每天做一次增量日切到 ODS 中。
- 最初,如果是一次性入湖的数据,咱们应用批量入湖的工具导入到 datalake 中即可。
整体湖仓转换的逻辑如图:
3. 技术挑战
-
在咱们调研的初期,Hudi 对 Flink 的反对不是很成熟,咱们对 Spark – StrunctStreaming 做了大量的开发和测试。从咱们 PoC 测试后果上看,
1、如果应用无分区的 COW 写入的形式,在千万级写入量的时候会发现写入越来越慢;2、起初咱们将无分区的改为增量分区的形式写入,速度晋升了很多。
之所以会产生这个问题,是因为 spark 在写入时会读取 basefile 文件索引,文件越大越多,读取文件索引就会越慢,因而会产生写入越来越慢的状况。
-
同时,随着 Flink 对 hudi 反对越来越好,咱们的指标是打算将 Hudi 入湖的性能集成到实时计算平台。因而,咱们把实时计算平台对 Hudi 做了集成和测试,期间也遇到一些问题,典型的问题有:
1、类抵触 2、不能找到 class 文件 3、rocksdb 抵触
为了解决这些不兼容的问题,咱们将对 Hudi 的依赖,从新结构了一个独立的模块,这个工程只是把 Hudi 的依赖打包成一个 shade package。
4、当有依赖抵触时,咱们会把 Flink 模块相干或者 Hudi 模块相干的抵触依赖 exclude 掉。5、而如果有其余依赖包找不到的状况,咱们会把相干的依赖通过 pom 文件引入进来。
- 在应用 Hudi on Flink 的计划中,也遇到了相干的问题,比方,checkpoint 太大导致 checkpoint 工夫过长而引起的失败。这个问题,咱们设置状态的 TTL 工夫,把全量 checkpoint 改为增量 checkpoint,且进步并行度来解决。
-
COW 和 MOR 的抉择。目前咱们应用的 Hudi 表以 COW 居多,之所以抉择 COW,
1、第一是因为咱们目前历史存量 ODS 的数据都是一次性导入到 datalake 数据表中,不存在写放大的状况。
2、另外一个起因是,COW 的工作流比较简单,不会波及到 compaction 这样的额定操作。
如果是新增的 datalake 数据,并且存在大量的 update,并且实时性要求较高的状况下,咱们更多的抉择 MOR 格局来写,尤其写 QPS 比拟大的状况下,咱们会采纳异步 compaction 的操作,防止写放大。除了这种状况外,咱们还是会更偏向以 COW 的格局来写。
四、百信银行实时数据湖的将来
在我行实时数据湖的架构中,咱们的指标是将实时数仓的整个链路构建在 Hudi 之上,架构体系如图:
咱们整体的指标布局是代替 kafka,把 Hudi 作为两头存储,将数仓建设在 Hudi 之上,并以 Flink 作为流批一体计算引擎。这样做的益处有:
- MQ 不再负责实时数据仓库存储的两头存储介质,而 Hudi 存储在 HDFS 上,能够存储海量数据集;
- 实时数据仓库中间层能够应用 OLAP 剖析引擎查问两头后果数据;
- 真正意义上的批流一体,数据 T+1 提早的问题失去解决;
- 读时 Schema 不再须要严格定义 Schema 类型,反对 schema evolution;
- 反对主键索引,数据查问效率数倍减少,并且反对 ACID 语义,保证数据不反复不失落;
- Hudi 具备 Timeline 的性能,能够更多存储数据两头的状态数据,数据齐备性更强。
五、总结
本文介绍了百信银行实时计算平台的建设状况,实时数据湖构建在 Hudi 上的计划和实际办法,以及实时计算平台集成 Hudi 和应用 Hudi 的形式。
在应用 Hudi 的过程中,也遇到一些问题,由衷感谢社区同学的帮忙。特别感谢社区 Danny chan,leesf 解疑答惑。在实时数据湖架构体系下,构建咱们实时数仓,流批一体计划还是在摸索中。