关于后端:小米基于-Flink-的实时数仓建设实践

42次阅读

共计 7611 个字符,预计需要花费 20 分钟才能阅读完成。

摘要:本文整顿自小米软件开发工程师周超,在 Flink Forward Asia 2022 平台建设专场的分享。本篇内容次要分为四个局部:

  1. 小米数仓架构演变
  2. Flink+Iceberg 架构降级实际
  3. 流批一体实时数仓摸索
  4. 将来瞻望

点击查看原文视频 & 演讲 PPT

一、小米数仓架构演变

1.1 数仓架构现状

在介绍演变前,咱们先来理解下小米以后的技术现状。

上图展现的是小米目前的技术架构,在存储侧咱们次要利用数据湖 Iceberg 和自研音讯队列 Talos,计算层次要利用 Flink 和 Spark,他们对立运行在 Yarn 上,对立通过 Metacat 获取元数据信息,并通过 Ranger 来进行对立的鉴权服务。咱们外部应用 Spark 和 Presto 来撑持 OLAP 查问场景,并通过 Kyuubi 来实现路由。

在实时数仓场景中,咱们抉择 Flink 作为计算底座,Hive、Talos、Iceberg 作为存储底座,其中,音讯队列 Talos 作为传统 Lambda 架构的通用抉择,在咱们外部占比拟大且很稳固,Iceberg 作为一款优良的湖存储,兼具时效性和低成本,其应用占比也在逐渐晋升,应用到 Iceberg 的 Flink 作业在总占比中曾经达到近 50%。

咱们对外部实时链路进行了统计,Iceberg 在大多数场景下曾经对 Hive 进行了替换,对分钟级的实时链路进行了较好的撑持;因为应用 Iceberg 搭建的实时链路目前仅能达到分钟级的时效,音讯队列仍有着较高占比。

1.2 数仓架构演变

接下来看下小米外部数仓架构的演变历程。

在引入数据湖前,针对日志埋点这样的聚合计算场景,业务会应用离线计算来搭建链路,采集模块会将日志或埋点数据对立收集到音讯队列中,Flink 生产音讯队列中的数据实时写入 ODS 层 Hive 表,上游的计算则采纳 Spark 或者 Hive 按小时或天进行荡涤、聚合。显然,这样的链路解决提早和老本都较高,这些离线作业往往都在凌晨进行调度,给整个集群带来较大压力。

针对 CDC 数据源,实时数据通常会通过音讯队列进行流转,保障解决的实时性,数据在音讯队列中以 Changelog-Json 的格局进行存储。但为了保障计算的准确性,业务链路通常会应用 Lambda 架构来搭建,会额定引入一条离线链路。离线链路基于 Hive 或 Kudu 构建,ODS 层应用 Spark Streaming 近实时导入,局部场景也会定期全量导入,上游计算依赖 Spark 做定时调度。显然,这样的架构开发和保护的老本都会很高。

带着下面的问题,咱们想要对批和流链路进行对立,并可能满足低成本和低提早,为此咱们引入了 Iceberg,在引入 Iceberg 初期,小米外部的应用以 v1 表为主 (v1 表是数据分析表,反对 Append 类型数据的增量读写)。因为 Flink 旧架构(1.12 版本) 读取 Iceberg 的数据时效性不高,所以在日志埋点场景的利用次要是替换了 Hive,应用 Iceberg 来存储 ODS、DWD 层数据,能够升高存储老本,同时配合 Spark、Presto 能够取得更快的查问速度。

针对 CDC 数据源的场景,在初期也同样以替换 Hive 为主以获取更低的老本。

在中期,咱们对 Iceberg v2 表不断完善,v2 表在 v1 表的根底上反对了行级别的更新和删除,同时也反对了 Merge on read 模式,并且有着不错的性能。业务的实时链路也能够齐全依赖 Flink 和 Iceberg 来进行搭建。之前的日志埋点链路通过 Iceberg v2 表的降级后,应用 Flink+Iceberg v2 替换了原先的 Spark + Iceberg v1,将链路时效性由小时级晋升至分钟级。

因为 v2 表可能反对行级别的更新,而且数据实时可查,本来针对 CDC 数据源的 Lambda 架构链路能够降级到 Kappa 架构,由 Flink 和 Iceberg v2 表来构建,兼顾时效性和老本,依赖 Parquet+ZSTD 压缩,存储老本相比于原先 Parquet+snappy 可能节俭 30%。

1.3 以后架构遇到的问题

通过咱们一段时间的应用,咱们发现目前 Iceberg 可能很好地兼顾老本、查问效率,社区的很多优化也以离线为主,但在实时中存在着时效性和稳定性方面的有余,间隔音讯队列仍有差距,同时,Iceberg 作为对立的存储 Format,在理论生产时须要读取底层文件,而 v2 表有着多种文件类型,读取时须要组织 DataFile 和两类 DleteFile(Equlity delete 和 Position delete)的关系,逻辑较为简单。

咱们在基于 Flink+Iceberg 的实时链路构建中,常常会遇到以下两类问题:

  • 链路实时性略差,相比于音讯队列仍有差距,目前仅可能稳固在 15 分钟左右。
  • 链路稳定性略差,常常因为链路中一张表的生产积压导致作业失败,从而使得整体链路不可用,重大依赖人工干预。

二、Flink+Iceberg 架构降级实际

2.1 基于 Flink1.12 的旧架构实现

针对上述的两个问题,咱们对 Flink+Iceberg 的架构进行了降级。

上图中的实时数仓链路由多张 Iceberg 表和多个 Flink 作业组成,其中 Iceberg 负责数据的存储,Flink 负责数据的荡涤、流转,显然对一条链路的实时性和稳定性撑持,Flink 起了关键作用。在一个 Flink 流式作业中,数据会通过读取、计算、写入,在理论场景中,咱们发现数据的读取效率低,重大影响了作业吞吐,后续的相干优化也次要围绕读取局部开展。

在介绍优化之前,先来理解下读取架构的现状。

在优化前,咱们的 Flink+Iceberg 实时链路次要依靠于 Flink 1.12 版本构建,在 1.12 版本中,读取逻辑被拆分为 Monitor 和 Reader 两个算子,在进行增量生产时,Monitor 算子扫描 Snapshot 中的文件,并组织成 Split 发往上游给 Reader 算子生产。这样的架构做到了很好的扫描和读取逻辑拆散,然而仍有几点重要缺点,例如:Split 信息在 Jobmanager 和 TaskManager 之间单向同步、繁多的工夫驱动扫描、以及咱们为了保障程序性而限度了读取并发为 1,这些点一起影响着生产速度。

2.2 旧架构遇到的次要问题

这样的缺点在理论作业中会有实时性和稳定性两大问题体现。在实时性方面,存在着生产速度慢、生产存在稳定;在稳定性方面,存在着 Task OOM,Checkpoint 容易超时。

为了保障拓扑的有向无环,数据在上下游算子之间只能单向流动,这导致扫描进度和读取进度只可能单向同步,Monitor 算子感知不到 Reader 算子的生产状况;在这种单向同步的机制下,Monitor 会在肯定周期内下发固定的 Split 数,如果在一个周期内发送 Split 的数量较少,那么 Reader 算子会有局部工夫处于闲暇状态,导致生产存在稳定,存在资源节约。而如果一个周期内下发的 Split 超过了 Reader 的生产能力,那么 Split 就会在 Reader 侧沉积,占用额定的堆内存。

同时固定的扫描距离也会导致生产的提早,新数据须要期待肯定扫描距离后才可能被生产到。而单并发生产又限度了作业的吞吐下限,以上这三点一起影响着实时性。

这样的机制不仅影响着实时性,对稳定性也有不小的影响。Monitor 和 Reader 的单向同步机制,使得生产须要指定距离和距离内下发的 Split 数,未生产完的 Split 会存储在堆内存中,积压较多会导致 OOM、Full gc 频繁,Task 吞吐升高。

同时,旧架构的 SourceFunction 在实现数据下发时须要持有 Checkpoint 锁从而保证数据下发和状态更新的统一,而 Reader 算子 Checkpoint 粒度仅细化到 Split 级别,所以 Reader 算子须要长时间去持有 Checkpoint 锁,只有生产完一个 Split 后才会开释,这在上游解决慢,反压状况下是致命的缺点,很容易导致 Checkpoint 超时。这些点一起促使着作业稳定性的升高。

2.3 基于 Flink1.14 的新架构实现

为了解决上述实时性和稳定性问题,咱们在社区基于 FLIP-27 的改变上改良了读取逻辑,次要涵盖了上图右侧的七点,其中双向通信,Monitor 逻辑移至 JobManager 是 FLIP-27 的要害优化点。咱们外部次要对前面的五点进行了优化,别离是 Snapshot 的顺次扫描、自适应的扫描模式、分区多并发生产等。

增量生产 Iceberg 存在着两种形式,别离为顺次扫描 Snapshot 和合并多个 Snapshot 扫描。在合并多个 Snapshot 的扫描模式中,须要依赖 Merge on read 模式,用后续 Snapshot 中的 Delete 文件对以后 Snapshot 中的 Data 文件数据进行过滤。如果合并多个 Snapshot 进行生产,那么一个 DataFile 可能会关联到很多后续 Snapshot 的 DeleteFile,使得 Split 的组织变得复杂,同时 Reader 算子在应用 DeleteFile 过滤 DataFile 时,须要将 Equlity delete file 全副读取到内存中,这也很容易导致 Task 产生内存问题。

咱们默认将扫描模式设置为了顺次扫描,该模式能够更好地追踪数据变动,并且升高文件组织复杂度,防止了在合并多个 Snapshot 模式中因为 Delete 文件较大而产生的内存问题,对稳定性更加敌对。

旧架构中,扫描逻辑次要由工夫驱动,定时触发,在新架构中,咱们引入了自适应的扫描模式,减少了事件驱动,解决了生产稳定和 Task 潜在的内存问题。在理论扫描过程中,动静 Enumerator 会依据内存中 Buffer 的反馈进行决策,小于阈值就立即执行扫描操作,保障 Reader 可能间断生产,大于阈值就阻塞扫描,防止将更多的 Split 缓存在内存中。

在新架构中,咱们针对 v2 表实现了并发生产,将本来的繁多队列 Buffer 依照上游 Task 拆分成多个队列 Buffer,Iceberg 表中不同分区的数据文件会依照写入排序,并被 Hash 到不同的队列,实现生产的分区有序。

同时为了保障各个 Task 生产数据的对齐,咱们应用 Snapshot 的提交工夫来生成 Watermark,引入 AlignedAssigner 来实现对立的 Split 调配,在调配端实现对齐,保障上游各个 Task 生产数据的对齐。

下面咱们讲到的自适应扫描只能解决单个 Source 实例的问题,在理论利用中,局部场景仍有潜在稳定性问题存在,例如集成场景中的指标拆分,将一张表的数据拆分至少张表;数仓场景中,对同一张表进行屡次援用,筛选不同局部的数据进行 Join。在这两个应用场景中,因为不满足 Source 复用规定,会有多个读取同一张 Source 表的实例存在。

在 Flink 中,Source 的复用受 Partition、Limit、Project、Filter 影响,以 Project 和 Filter 为例。上图右边的 SQL 形容了 Project 下推导致的复用生效,因一个字段的区别,同一份数据就会被读取三次;上图左边的 SQL 形容了 Filter 下推导致复用生效的场景,即便选取的范畴有很大反复,但 Source 仍不会失去复用。因为复用的生效,同一个表的雷同 Split 会在内存中存在多份,仍然有呈现内存问题的可能。

为了优化这种状况,咱们引入了两种形式。

  • 通过规定来主动对以后不合乎复用的 Source 算子执行复用。
  • 反对在 SQL 中手动指定表履行强制复用。这样的优化在缩小内存占用、缩小数据反复读取方面有着不错的成果,节俭作业资源的同时也减少了稳定性。

通过切换至新架构,生产 Iceberg 表的均匀扫描距离降至小于 1 秒,单个 Task 吞吐晋升至 70 万条每秒,实时数仓链路新鲜度晋升至 5 分钟内。

三、流批一体实时数仓摸索

上一章介绍了 Flink 读取 Iceberg 架构的优化,这一章将次要介绍小米在 Flink 流批一体实时数仓上遇到的问题以及相干摸索。

遇到的问题能够归结为三类。

第一类是数据稳定,实时数仓中数据是一直变动的,因为 Flink 回撤机制的存在,-U 和 +U 会拆分为两条数据写入,在 -U 写入,+U 未写入时执行查问,会查问到异样数据,而在 +U 写入后又能查问到失常后果。

第二类是计算不确定性,Flink 的状态过期会导致计算结果的不确。同时针对这部分异样数据,往往没有简略的比照、修复伎俩,这导致实时数据产出的数据修复难。

针对数据稳定问题,思考到上游绝大多数零碎都可能反对 Upsert 写入,咱们引入了写入前数据抛弃能力,用于抛弃无关紧要的数据,将其称为 Drop Operator。该算子作用在 Sink 节点前,可能依据配置抛弃指定类型的数据。

针对 Flink 聚合增量数据写入 ADS 层 MySQL 的场景,能够配置抛弃 -U,防止 ADS 层查问稳定。同样,该配置能够很不便的将 Changelog 流抛弃 -U 和 -D 转为 Append 流,满足一些非凡的业务场景。

在解决计算的不确定性前,咱们须要先理解其产生的起因。在 Flink SQL 中,状态起着重要作用,正确的中间状态是计算结果正确的必要条件。但显然,目前状态的放弃是低廉的,咱们须要一个状态过期策略来进行均衡。

在 Flink 外部,有着 Watermark 清理和 TTL 清理两类算子。Watermark 能够依据业务的须要去生成,清理的策略依据理论应用场景制订,所以对计算结果影响可控。而依赖 TTL 清理的这类算子,在 Flink SQL 中状态过期的策略无奈失去精确管制,只能设置一个对立的状态过期工夫,往往因为过期工夫设置不合理或者满足不了业务需要,从而产生意料之外的计算结果。

例如物流、服务单场景,订单从创立到敞开的时间跨度往往很长,很容易呈现在订单还没有完结前,状态就过期了。为了解决订单跨度工夫长导致状态失落的问题,业务会设置一个离线的 Topic,通过离线链路定期往离线 Topic 里补数据,补充的数据从新流入实时链路中,将过期状态从新补回。

针对由状态过期而导致的计算不确定问题,咱们有两种解决思路。

  • 针对定时批量补状态,咱们尝试从源头解决问题,让状态按需过期,缩小额定链路保护。
  • 针对定时批量修复异样数据,咱们想要提供更加简略不便的修复路径。

为了可能让状态按需过期,咱们引入了算子级的状态清理性能,将清理规定利用范畴从作业细化到各个算子,将清理规定从工夫规定拓展到业务规定,并通过 Query Hint 对算子提供灵便、不便的定义。

目前该性能反对两类算子,别离是 Group 聚合算子和 Regular Join 算子,上图表格为反对的参数,通过 TTL 的参数能够设置该算子状态的过期工夫,condition 参数能够填写清理规定,为了不便判断,清理规定须要是布尔表达式。

上图的 SQL 展现了求某类商品总销量的聚合计算逻辑,该聚合算子状态保留工夫为 30 天,笼罩了作业级的 1 天保留工夫,且当商品状态为售罄或下架,那么就革除该商品的状态,这意味着无关该商品的销售记录后续不会再呈现。

在聚合算子里,咱们退出了一个状态清理的查看器,将用户设置的清理规定通过 codegen 转换为 Java 代码,在聚合计算后进行规定查看,匹配胜利后执行清理。

同样针对 Join 算子,状态清理查看器的实现相似,只是在 Join 算子会对左右表的状态别离进行清理,清理完后会去对方状态中将援用计数 -1。

上图的 SQL 示例形容了一个物流表的 Join 场景,左表为物流订单表,保留着订单状态以及更新工夫,右表为维度表,保留着该订单的一些根底信息,包含创立工夫。在图中的例子中,Join 算子的状态清理不再依赖 Proctime,只依赖于运单状态和运单的持续时间。

尽管算子级状态清理可能解决一部分需要,但它的应用门槛较高,且并非所有业务都有明确的清理规定,一个简略不便的修复伎俩才实用于所有场景。如果想要用 Flink Batch 对数据进行修复,目前有 INSERT 和 OVERWRITE 两种形式。应用 INSERT 实现 SQL 逻辑较为简单,且只能对数据进行笼罩,不能删除;OVERWRITE 的修复形式粒度较粗,而且会使上游实时作业产生较大稳定。

在这样的场景下,咱们应用 Flink 实现了 Merge 语法。Merge 语法会对两个数据源做 Join,并能够针对不同的 Join 状况执行增、删、改操作,对上游影响小。

在具体的实现上,咱们在本来的 Calcite 语法上欠缺了 Merge 语法的解析逻辑,反对为每个 Action 设置独立的判断条件,在 Schema 匹配状况下反对 Insert 和 Update 语句,简化逻辑。

在 SQL 校验阶段,Merge 逻辑会被转为 Outer Join 和多个 Merge Action 的联合。在优化阶段,目前咱们会依据理论的 Merge Action 状况来优化 Join 形式,将默认的 Outer Join 改写为 Anti Join 或者 Inner Join,缩小解决的数据量。

最终,Merge 逻辑会生成 Join 和 MergeAction 两个算子,Merge Action 算子依据上游 Join 状况来生成增、删、改数据并发往上游。因为 Flink SQL 目前提供了优良的流批一体架构,能够复用以后的逻辑,将增删改数据写入上游数据系统。

在咱们外部,Spark 和 Flink 目前都反对 Merge 语法,但 Spark 在框架层只提供了语法侧的反对,Runtime 层的反对在 Iceberg 侧由插件实现。Flink 则在框架中实现了语法和 Runtime 层的反对,使得 Merge 的性能更加通用,也可能反对更多存储系统。目前,在咱们外部,Flink merge into 入湖和入库场景应用较多。

因为实现起因,Spark 在咱们外部目前仅能反对 Merge into 入湖,所以咱们在 Merge into 入湖场景下对 Spark 和 Flink 的处理速度做了测试,目前中小批量数据的 Merge 操作 Flink 执行速度会略快,大数据场景下 Flink 因为入湖速度较 Spark 慢,所以耗时稍多,但整体来看,Flink 曾经可能满足日常修复需要。

四、将来瞻望

咱们的将来瞻望次要包含以下三点:

  • 基于 Iceberg 的秒级湖仓建设,目前 Flink+Iceberg 在咱们外部实时链路中可能很好的反对分钟级的场景,咱们心愿将来在实时性上有所突破,将链路新鲜度晋升至秒级。
  • 基于 Iceberg 的残缺 CDC 的反对。目前,如果一张 Iceberg 被多个上游并行写入,或者单个作业回溯写入,咱们须要应用 Upsert 模式,写入 +I 或 +U 前默认写入一条 -D,但因为短少信息,写入的 Delete 可能是多余的且无奈获取到正确的非主键列值,咱们心愿在前期可能对其欠缺,使得上游可能读取到正确残缺的 CDC 数据。
  • 跟进基于 Flink SQL Gateway,欠缺动静查问的反对。

点击查看原文视频 & 演讲 PPT


更多内容


流动举荐

阿里云基于 Apache Flink 构建的企业级产品 - 实时计算 Flink 版现开启流动:
0 元试用 实时计算 Flink 版(5000CU* 小时,3 个月内)
理解流动详情:https://click.aliyun.com/m/1000372333/

正文完
 0