摘要:本文整顿自字节跳动基础架构工程师李国君,在 Streaming Lakehouse Meetup 的分享。幸福里业务是一种典型的交易、事务类型的业务场景,这种业务场景在实时数仓建模中遇到了诸多挑战。本次分享次要介绍幸福里业务基于 Flink & Paimon 构建流式数仓的实践经验,从业务背景、流批一体数仓架构、实际中遇到的问题和解决方案,借助 Paimon 最终能拿到的收益,以及将来布局方面进行介绍。
点击查看原文视频 & 演讲 PPT
一、业务背景
幸福里业务是字节旗下对于房产的业务线,围绕这个业务有很多针对 BP 反对的方向,其中最重要的方向之一就是工单零碎。工单零碎面向的用户是幸福里业务线一线的经纪人和门店经理等。如下图所示,咱们能够看下通过工单零碎,数据是如何产生和流转的。
首先由经纪人将已实现的代看工作提交工单,后续相应的门店经理会对该工单进行审核,在这个过程中就产生了两条数据,并将其更新到业务库的 Binlog 数据,作为实时数仓的数据源进行计算后生成数据报表或间接用于一些考核零碎。其中数据报表用于展现评估一线经纪人的工作是否达标等;考核零碎则用于门店经理为一线经纪人设定考核任务量的工作零碎,通过任务量规范主动反馈处分等。因而在以上利用的实时数仓建模上,咱们发现房产类业务有两个典型的特点:
- 准确性要求 100%,不能有数据失落和反复的状况产生。
- 须要全量计算,增量数据在 MQ 留存工夫无限,须要拿到全量数据 View 进行计算。
实时数仓建模特点
在理论业务的实时数仓 Pipeline 中,进入实时数仓前有多个数据源,每个数据源的特点也都不同,所以实时增量局部会存在 MQ 中,全量数据则是存在 Hive 中。
上图实时数仓中的每一层都是由一个 Flink Streaming SQL 串联起来的,DW 层的次要性能是把多个数据源进行 Join 打宽,通过计算出来的宽表实现间接输入进 MQ 中。因为 MQ 的留存工夫无限会造成一个小时级或天级的周期性工作,在一个周期完结后 MQ 中的数据最终会落到 Hive 里。DWM 这一层次要的作用是聚合计算,聚合计算的后果也会间接输入到 MQ 中。每一层的计算模式都和上一层雷同,实时数仓的计算结果会通过 Service 层服务于在线的数据利用,比方下面提到的数据报表和考核零碎。每层输入的 Hive 离线数据能够用于 BP 同学做数据排查 / 验证等离线查问工作。
回顾实时数仓的两个特点,一是准确性要求 100%,也就是说要求整个数仓的实时工作状态算子都要保护全量数据;二是须要全量计算,是指因为异构存储,实时数据存在 MQ,历史数据存在 Hive,那么就使得每层生产的 MQ 都须要实时生产增量数据和 Hive 全量数据。从开发工程师的视角这套实时数仓模型存在如下痛点:
在开发过程中须要时刻关注业务逻辑之外的逻辑,比方在 SQL 中对数据的反复解决;在数据去重过程中,应用繁多字段解决不够精准,须要引入 Nanotime 做非确定性计算来解决问题等。之所以存在以上问题,次要是因为在整个链路中,实时数据和离线数据是离开存储的,这种存储异构使得两局部的数据人造很难对齐。
这里的数据运维蕴含三个局部:数据排查、数据验证和数据勘误。存在的问题是,在数据排查和数据验证的过程中,如果发现某条链路上的某个 SQL 作业须要勘误。勘误实现的 SQL 的后果输入到 MQ 中,须要再将 MQ 中的数据落盘到存储中的操作会产生 T+1 的代价。另外在勘误过程中的两头后果回退会间接裸露给用户。
第二个问题是如上图紫色局部是一个简化的链路,而在理论生产过程中的复杂度很高,体现在主链路上的是一些表和工作会被其余很多工作或表依赖,使得勘误过程会影响到很多不可预知的表或工作。造成以上问题的起因,次要有两点,一个是数据勘误产生后果回退裸露给用户,另外则是血缘关系简单且须要人为保护。
在以后的这条链路上,Flink 实时工作的状态保护是十分大的,这就造成存储和计算资源的耗费十分大,从这么大的状态中复原作业的过程也会很慢。产生状态大问题的两大起因次要是去重算子保护全量数据状态和级联 Join 状态反复。
为什么抉择 Paimon
基于以上存在的痛点,咱们思考心愿通过 Flink 生态搭建 Steaming Lakehouse 的组合来解决原始链路上的问题,如上图所示,原始链路存在的问题有:
- 存储异构,Base+Delta 数据难对齐;
- 去重引入非确定性计算和大状态;
- 血缘关系简单 & 数据勘误后果回退裸露给用户。
对应解决原始链路的问题,咱们抉择了 Paimon:
- 流批一体的存储能够以对立 Table 对外输入,实时和离线数据能够存储到一张 Paimon 表中,间接解决了对齐的问题;
- 不须要去重,Changelog Producer 代替状态算子,同时反对在存储上产生残缺的 Log,并将其长久化代替原有链路上的状态算子;
- 血统治理 & 数据一致性治理,反对无感知数据勘误。
二、流式数仓实际
首先介绍流式数仓实际过程中的架构设计,如下图所示:
- 存储层选用了 HDFS 或 S3 的对象存储作为存储底座,选用 Paimon 作为对立的 Table 形象;
- 计算层选用 Flink 同一的技术栈,对立了流批计算;
- 数据管理层实现了 Table 的血统治理和数据的血统治理,基于这样的血统治理能够做到数据一致性,血统治理能够用于数据溯源的需要,为数据品质提供保障。
- 数据一致性治理,流批一体 ETL 数据管理。在多表一致性联调的时候,能够主动对齐数据,不须要开发人员手动对齐。
如上图可见下层通过 Gateway 或 Service 层对外提供服务,最终用户通过 SQL Client 或是 Rest API 拜访整个零碎。
上图是流式数仓 Pipeline。数据源和后面提到的一样,离线数据存在 Hive 中,实时数据存在 MQ 中。不同的是在进入 Streaming Lakehouse 的时候,设置了一个 ODS 层,这层会通过 Flink Streaming SQL 把每一个数据源积淀到 Paimon Table 里。第二层是 DWD 层,通过对多个数据源进行 Join 打宽的操作,将输入的后果积淀到 Paimon Table 里。再通过最初一层 APP 层做指标聚合以及透出的工作。
因为两头数据都积淀在 Paimon Table 中,所以开发人员在做数据排查和验证的时候能够间接操作。通过上图实时数仓的 Pipeline 能够看到存储上是流批一体的,在计算上也是用 Flink 的技术栈对立了流批计算,这样能够缩小开发和运维的老本。而且两头数据积淀也是可间接查问的,不须要在运维的时候做更多繁琐的操作。
在实现上述 Streaming Lakehouse 实际落地后总结了如下收益:
- 简化开发流程
流批一体存储能够解决实时和离线存储异构的问题;
缩小业务入侵,移除去重算子,解决非确定性计算。
- 晋升运维体验
两头数据可查;数据可追溯;
血缘关系 & 多表一致性,加强了多表关联调试能力,并且能够做到数据勘误无感知。
- 缩小状态量
Changelog 长久化,能够缩小 30% 的状态量。
在实际过程中,除了取得了不少收益,也同样遇到了新的问题,次要包含两个:
- 数据新鲜度差:端到端的提早变动为分钟级,数据新鲜度升高;
- 小文件问题:一些小文件可能会影响读写性能。
三、流式数仓的调优
端到端提早调优
首先要剖析下整个链路数据的可见性与什么相干。如上图所示,Source 在收到数据之后,会把这些 Records 源源不断的发给 Bucket,而后 Bucket Writer 在收到数据后,先把这些数据缓存到一个基于内存的 Buffer,存满之后会触发一个 Flash 将这个 Buffer 里的数据全副都 Flash 到磁盘上。这个时候就生成了对外不可见的数据文件,只有当上游触发了一个 Checkpoint 的时候,整个链路中 Commit 算子生成一个 Snapshot 指向刚生成的数据文件能力对外可见。
剖析整个流程,能够得出两个论断:
- 数据可见性与 Checkpoint 绑定。更严格的说是一个周期的数据可见性与 Checkpoint 周期严格绑定。
- Checkpoint 周期 = Checkpoint interval + Checkpoint latency。Checkpoint interval 是 Checkpoint 触发的频率;Checkpoint latency 是整个实现一个 Checkpoint 所需的耗时。
因而在咱们在做端到端调优的时候,是否只须要针对 Checkpoint 周期做相干调整就能够呢?最简略的是不是将 Checkpoint interval 进行调小操作呢?
在得出结论前咱们先来看下写入流程。在 Paimon Sink 算子中,Bucket Writer 会源源不断的把数据凋谢到磁盘的数据文件里,另外 Paimon Sink 还蕴含另外一个组件 Compact Manager。这个组件次要是针对磁盘上的数据文件一直的做 Compaction。如上图右侧展现,Compaction 在逻辑上是个 Bucket,在存储上是一个目录,目录下会寄存很多数据文件,这些文件是由 LSM 树组织的,分为多个 Level。实际上 Compact Manager 在做 Compaction 的时候就是针对这些不同层的数据做的过程。
所以咱们推断,整个 Compaction 过程是一个 I/O 比拟多的操作过程,假如一味的调小 Checkpoint Interval,会导致频繁的 Checkpoint,比方原来 100 条数据原本是能分一个文件里的,然而当初 Checkpoint 频率过高后,这 100 条数据可能会被分到多个文件里,那么每个文件外面的数据都会很小。其次,小文件过多,会让 Compaction 的整体代价变得更高,也会影响写入的性能。其实这就是一个谋求数据新鲜度的过程,次要须要在数据的写入性能和数据新鲜度上做衡量。在通过多方实际验证后,举荐将 Checkpoint Interval 设置为 1-2 分钟为优。
Checkpoint Latency 优化能够分为几个方向进行:
- Log-Based 增量 Checkpoint
利用 Flink 高版本的一些个性,如 Log-based 增量 Checkpoint 的形式去优化上传阶段的耗时。
- 缩小状态量
比方缩小上传输数据量,那么上传耗时就会缩小。
- Checkpoint 继续上传
继续上传本地状态文件。
- 搭建独立 HDFS 集群
缩小遇到慢节点的概率。
通过以上四种方向的优化,咱们在实践中失去验证的后果是能够将端到端的提早做到分钟级。
小文件优化
字节外部的实际是基于 HDFS 为存储底座的,咱们将小文件定义为显著小于 HDFS 上一个 Block 大小的文件。小文件引出最间接的问题就是文件数量太多导致须要更多的 Block,比方 Create Block,Delete Block 等,间接的影响就是 I/O 操作频繁,会导致 HDFS 上的 NamaNode 压力过大对稳定性产生影响;另外,无论文件自身有多大,它的 Block 的元信息是固定的,而这些元信息都是存在 NameNode 内存里的,过多的 Block 元信息会造成内存 OOM 问题;当数据太扩散 / 文件数量太多时,数据就有可能被调配到更多的 HDFS 的 DataNode 里,就会造成 DataNode 的来回跳转,减少频繁的随机读写,使效率和性能变低;并且调配的 DataNode 变多遇到慢节点的概率也会变大。
在小文件相干的问题中,决定是否产生小文件的机会和因素有以下几点:
- 文件生成。数据文件在磁盘上生成是有两个触发机会的,一个是 Checkpoint 的时候,它会强制把以后的 WriteBuffer 里的数据刷到磁盘上;第二个是 WriteBuffer,当它满了也会把内存外面的数据刷到磁盘上。如果把 Checkpoint Interval 调的过小,或是把 WriteBuffer 容量设置的比拟小,那么数据就会更频繁的被刷到磁盘上,而产生适量的小文件。
- 文件划分。通过设置一些 Partition key 或 Bucket key,就决定了数据的走向,它会落在哪些文件里。比方,生产中理论数量十分小,同时又设置了过多的 Bucket,那么能够预感,一个 Bucket 能够分到的数据量肯定会比拟小。这个过程中也会遇到小文件问题。另外,如果设置 Partition key 或 Bucket key 不合理,可能会产生一些文件歪斜的状况,即热 Key 问题。
- 文件清理。Paimon 具备文件清理机制,在 Compaction 过程中会删除一些无用的文件。另外,数据由 Snapshot 治理,如果 Snapshot 过期,就会从磁盘上删除对应的数据文件。如果 Compaction 触发条件和 Snapshot 过期条件没有治理好,也会造成冗余的文件留在磁盘上。
基于以上的介绍,分享一下咱们在实际过程中积攒的一些小文件调优参数,见下表所示。
- Checkpoint interval::举荐在 1-2 min 比拟适合;
- WriteBuffer 大小:举荐应用默认值,除非遇到相干问题须要调整;
- 业务数据量:能够依据业务数据量调节 Bucket 数,调整根据为单个 Bucket 大小在 1G 左右比拟适合;
- Key 的设置:能够依据理论的业务 Key 特点设置适合的 Bucket-key、Partition,以防产生热 Key 歪斜的问题;
- Compaction 治理和 Snapshot 治理相干参数:举荐应用默认值,除非遇到相干问题须要调整。
经验了整个架构革新之后,咱们将原有实时数仓链路做了比照,如下图可见,在很多指标上都取得了肯定的收益。
- 端到端提早:在原始实时数仓开启 Mini-batch 的状况下,端到端提早没有显著进化,能够反对 1-2 min 的近实时可见;
- 数据排查时效性:能够从小时级晋升到分钟级;
- 状态量节俭了约 30%;
- 开发周期缩短约 50%。
四、将来布局
以后次要布局了以下四个方向:
- 首先,秒级端到端提早的尝试。可能会分几期来做,打算引入 Embeded Log System 来解决这个问题。长期来看,会把数据可见性与 Checkpoint 解绑;
- 其次,数据一致性治理。血缘关系治理和数据一致性治理这两个方面,在理论数据运维中是很有价值的;
- 第三,状态复用。状态复用次要是解决 Join 状态复用的问题。另外,咱们心愿能够做到中间状态可查;
- 第四,监控运维。将来当规模下来,心愿能够建设监控体系,并做到指标可观测。
Q&A
Q:请问在数据源异构的状况下,是否思考过其余入湖的技术选型?为何最终抉择了 Paimon?
A:在最后技术选型的时候,次要思考几个点,一个是跟 Flink 生态的联合,一个是 Streaming Warehouse 这种模型,过后与这两点联合最好的是 Paimon,另外在咱们 Steaming upsert 的支流场景下状况更优。
另外,对于两头存储层,是 Plugin 的模式,不是说肯定要和 Paimon 做很深的绑定。
Q:请问在做数据回溯、快照和回滚的时候,有做过哪些思考?可能给一些可供参考的倡议?
A:在这个方面咱们次要是基于 Paimon 做了血统治理的性能。血缘关系治理简略来讲分为两个局部:第一局部是表的血缘关系治理;第二局部是数据的血缘关系治理。
表的血缘关系治理,比方在提交作业的时候,通过工作能够提取出它上下游表的信息,而后插入到引入的 System Database 里。数据血缘关系治理,能够依据 Checkpoint 去划分数据版本,一个 Checkpoint 实现之后就意味着一个版本数据的产生。而后再把具体生产了哪个版本的数据,记录到 System Database 里。
基于这两种血缘关系治理,既能够放弃旧链路在线服务的状态,也能保障新链路回溯数据或勘误数据成为可能。在生产环境中,由零碎层面把表主动切换,就能够实现一次数据回溯。
Q:请问用流自身去解决数据,如果数据量过大,是否会造成获取数据源闭口的环节拥挤,以至于数据进不来?
A:这是一个写入性能优化的问题,在 Paimon 官网上有专门针对这块的具体领导,大家能够去理解下。
点击查看原文视频 & 演讲 PPT
Flink Forward Asia 2023 正式启动
点击查看流动详情
更多内容
流动举荐
阿里云基于 Apache Flink 构建的企业级产品 - 实时计算 Flink 版现开启流动:
首购 99 元包月试用,有机会赢取定制周边礼品!
产品官网:https://www.aliyun.com/product/bigdata/sc