乐趣区

关于flink:B-站构建实时数据湖的探索和实践

摘要:本文整顿自 bilibili 大数据实时团队资深开发工程师周晖栋,在 Flink Forward Asia 2022 实时湖仓专场的分享。本篇内容次要分为四个局部:

  1. 背景和痛点
  2. 场景摸索
  3. 基建优化
  4. 总结和瞻望

点击查看原文视频 & 演讲 PPT

一、背景和痛点

在大数据场景利用中,业务不仅要计算数据后果,而且要保障时效性。目前,我司演化出两条链路。时效性高的数据走 Kafka、Flink 实时链路;时效性要求低的数据走 Spark 离线链路。上图简略形容了 B 站数据上报、解决和应用的链路。数据采集次要通过 APP 端上报的行为事件数据。服务端上报的日志数据会通过网关以及散发层,流式散发到大数据数仓体系内。

MySQL 中存储的业务数据,通过 Datax 周期性的批式同步到数仓内。时效性高的数据会通过 Flink+Kafka 进行流式计算。时效性低的数据通过 Spark+HDFS 进行批计算最初出仓到 MySQL Redis Kafka 的介质中,为 AI、BI 的模型训练、报表剖析场景应用。

在应用的过程中,也发现了诸多问题。

  1. 离线数据的时效性有余,离线批计算以小时 / 天为单位。越来越多的业务方心愿时效性达到分钟级,离线的小时计算或天计算不能满足业务方的需要。为了达到更高的时效性,业务会再开发一条实时链路。
  2. 但实时链路的可观测性较弱。因为在 Kafka 里查看数据并不不便,所以须要将 Kafka 里的数据挪动到其余存储中,能力进行查看。实时数据链路广泛不容易和业务工夫对齐,难以精确定位到须要重跑的终点。如果数据出现异常,业务个别不会抉择在实时流上进行重跑,而是进行离线链路的 T-1 修复。
  3. 实时离线双链路会有双份的资源开销,开发以及运维老本。除此之外,口径不统一还会带来额定的解释老本。
  4. 全天计算资源顶峰集中在凌晨,整个大数据集群的应用峰值在凌晨 2 点~8 点。次要在跑天级任务,也存在工作排队的景象。其余时段的资源比拟闲暇,应用时有显著的峰谷景象。整体资源使用率有肯定的优化空间。
  5. 在数据出仓孤岛方面,对于用户来说还须要克隆一份数据到 HDFS 上,因而会存在数据一致性的问题。当数据出仓后,权限以及数据联邦查问都会存在有余。

咱们心愿通过实时数据湖计划,解决以上痛点。

  1. 咱们通过 Flink+Hudi 将数据增量,以及增量计算产出的后果存储在 Hudi 中,反对分钟级的数据计算,进一步加强了数据的时效性。
  2. 除此之外,Hudi 具备流表二象性,岂但能够进行实时的流式增量生产,而且能够作为表,间接进行查问。相比 Kafka 链路,进一步加强了可观测性。
  3. 实时数据湖同时满足实时和离线的需要,达到降本增效的成果。除此之外,它还反对离线数仓数据重跑的诉求。
  4. 通过增量计算,将本来在 0 点当前调配的数据资源进行细分,摊派到全天的每分钟,错峰的应用资源。
  5. 通过排序、索引、物化等形式,咱们能够间接查问 Hudi 表中的数据,从而达到数据不出仓的成果。

二、场景摸索

当业务零碎数据存储在 MySQL 中,须要将这些数据导入到大数据的数仓中,进行报表、剖析、计算等场景。目前,业务数据入仓不仅用于离线 ETL,还冀望具备时效性。比方在稿件内容审核场景中,工作人员心愿晓得近十分钟的稿件增长量是否匹配稿件审核人力,存在实时监控诉求,以及告警诉求。稿件的数据来源于业务数据库,它并不满足于以后天级、小时级数据同步的时效性,心愿能达到分钟级的时效性。

在降本增效的大背景下,咱们不仅要满足业务诉求,还要思考效力。实时链路用于实时场景,离线链路用于批量的 ETL 场景比拟节约。所以咱们心愿通过实时数据湖,构建一套流批对立的计划,同时满足实时和离线场景的诉求。通过调研发现,现有的计划中有以下几类:

第一,DataX 定期批量导出数据到 Hive。

Hive 自身并不具备更新能力,个别按天来导出全量,并不满足时效性诉求。除此之外,该计划还存在数据冗余的问题。Hive 表每天的分区,都是 MySQL 表当天的快照。比方一张用户信息表的信息变动很少,每天都须要存储一遍全量快照。即每条用户信息,每天都会被存储一次。如果 Hive 表的生命周期是 365 天,那这条数据会被反复存储 365 次。

第二,Canal/CDC to Hudi 计划。

DB 的数据通过 Canal 或 Flink CDC 写入 Hudi 中,从而满足时效性的诉求。因为 Hudi 的数据实时更新,不具备可反复读的能力。因而该计划并不满足 ETL 场景。即便应用 Hudi” 快照读 ” 的能力。尽管能够读取 Hudi 的历史的 Commit,获取一份某一时刻的快照数据。但如果长时间的保留 Commit 数据,会导致文件过多,会影响拜访 timeline 的性能,进而影响到 Hudi 的读写性能。

第三,Hudi export Hive 计划。

该计划是前两个计划的联合,将 DB 的数据通过 Canal/CDC,写入 Hudi 之后,进行周期性的导出到 Hive 表。Hudi 表用于实时场景,Hive 表用于离线的 ETL 场景。从而同时满足两方面的场景诉求。其毛病在于,用户在应用过程中用到了两张表,存在肯定的了解老本,以及数据冗余问题。

第四,Hudi Savepoint 计划。

次要解决数据冗余的问题。通过周期性的 Savepoint,能够存储 Hudi 过后的 timeline 元数据。拜访 Savepoint 时,会映射地拜访到 Hudi 的理论文件,防止冗余的存储数据文件。每天一个 Savepoint,相当于每天存储一份 MySQL 快照,从而满足了 ETL 场景复读的诉求。与此同时,还能够间接拜访 Hudi 的最新数据,满足用户的实时诉求。

但该计划仍有一些缺点,它无奈准确的切分跨天数据。通过 Flink 增量写 Hudi 时,会周期性的产生 Commit,无法控制业务工夫和 Commit 对齐。如果昨天和明天的数据落在同一个 Commit 里,Savepoint 会以 Commit 为最小力度。当拜访昨天的 Savepoint 时,它会蕴含明天的数据,与用户的预期不符。

为了解决上述问题,咱们提出的解决方案是 Hudi Snapshot View 快照视图。在 Hudi Savepoint 计划上做了改良,简略来说是一个带过滤的 Hudi Savepoint。

在导出 Hive 计划中是能够加过滤条件,将第二天的数据过滤进来。咱们把过滤逻辑纳入到 Hudi 快照视图中。在快照视图里将过滤逻辑,做在 Hudi 底层,存储在 Hudi Meta 中。在拜访快照视图时,咱们会吐出过滤后的数据,从而解决快照里存在跨天数据的问题。

如上图所示,Delta Commit T3 蕴含了 11 月 1 号和 11 月 2 号的数据。快照视图的源数据在存储时,将历史的 T1、T2、T3 的源数据全副进行存储。除此之外,还存储了 Delta<=11 月 1 号的过滤条件。在读取时将数据进行过滤,将仅蕴含 11 月 1 号及以前的数据给查问端,并不蕴含 11 月 2 号的数据。

快照视图同样是存储元数据,通过映射的形式,拜访理论的数据文件,不存在数据冗余存储的问题。也同时满足实时和离线场景,实现了流批对立。除此之外,快照视图是独立切出了一个 timeline,反对再做 Compaction、Clustering 等操作来减速查问。

接下来,讲一讲 Snapshot View 的生成机会。用户应该在哪次 Commit 后,做下这个快照视图?这里须要了解两个概念,一个是事件工夫,一个是解决工夫。

当数据呈现提早,尽管事实中的工夫达到了 0 点,但它可能还在解决 22 点的数据。此时,如果进行快照视图,用户读取的快照视图数据就会偏少。因为 Commit 是解决工夫,不是业务的事件工夫。这个 Snapshot View 要在事件工夫推动到 0 点后进行,才可能保证数据的残缺。为此,咱们在 Hudi 中减少了解决进度。这个概念相似于 Flink 中应用 Watermark 来标识解决进度。咱们扩大了 Hudi Meta 在 Commit 中存储了解决进度。当事件工夫推动到 0 点后,开始进行 Snapshot View 操作,告诉上游工作能够被调起。在 Meta 中有了事件的解决进度,在查问 Hudi 时也能获取解决进度,从而进行一些判断。

除此之外,咱们还做了引擎层的适配。在应用方面,用户写的 SQL 和原来的基本一致,通过 hint 或者 set 参数,指定是查问快照分区或者是查问实时分区。在 DB 入仓的场景上,既满足了实时场景的时效性,又满足了 ETL 的离线诉求,胜利实现了实时和离线的对立,达到降本增效的目标。

接下来,讲一讲埋点入仓场景。我司作为一家互联网公司,也会进行用户的行为事件定义,收集数据、上报入仓,而后进行剖析和应用。用数据驱动领导业务倒退。

我司的用户⾏为事件上报曾经颇具规模,行为事件十分多。当初曾经定义了上万个行为事件的 ID,每天日增千亿条数据,流量十分大。埋点入仓是公司级的我的项目,全站各个业务方都在上报埋点。在应用埋点时,我司存在大量部门业务线的穿插应用。比方广告 AI 须要应用其余业务线上报的数据,进行样本收集和训练。

在原有架构上,APP 端上报的数据通过传输和荡涤,落入数仓事后划分好的表分区中,供应业务方进行开发应用。这种业务划分是依据 BU、事件类型等业务信息,进行的粗粒度划分。上游的工作能够应用本人部门的表以及其余部门的表,只须要申请权限即可。

但该架构也存在一些痛点。一条流数据的隔离性不够,上万个埋点由同一个渠道传输,荡涤,隔离性有余。容易呈现流动期间某个行为事件猛增,影响整体的工作解决进度。除此之外,业务线应用须要过滤大量无用数据。在上游业务的工作中,可能仅用到本人的一个行为事件进行剖析。但此时行为事件与同部门其余的行为事件混在一起。在条件过滤时,引擎层只能做到分区级的过滤。将整个分区的文件加载进来,而后进行过滤,有较大文件读取的 IO 节约。与此同时,部门在穿插应用数据时,权限治理较难,应用到了其余 BU 的一个行为事件,须要申请整个 BU 表的权限。粒度粗,存在危险。上游有分钟级的诉求。目前,数据进行流式传输是小时级的荡涤。上游的时效性是小时级别,不满足用户的时效性的诉求。

为了解决上述问题,咱们做了一些架构上的优化。如上图所示,数据上报传输后,将数据落到户的 Hudi 表里。用户通过 View 来拜访或应用这些数据,能够用于离线 ETL、实时计算、BI 报表剖析等场景。

对于秒级时效性诉求的数据,会走高优 Kafka 链路,提供给在线服务应用,这种占比就比拟小了。北极星事件治理平台和元数据管理,负责管理整个行为事件埋点的生命周期、散发规定等等。

平台管制从边缘上报开始,进行规定分流,业务隔离,晋升隔离性。当数据落入业务 Hudi 表后,进行 Clustering,对业务数据进行排序和索引。通过引擎层,进行文件级别 / 数据块级别的 Dataskip,缩小理论读取数据量的 IO 开销。用户通过 Hive View 读取数据,平台通过给用户的 View 减少有权限的行为事件,达到行为事件级别权限治理。比方 a 部门的同学,在应用 b 部门的行为事件时,在 a 部门的 View 上减少一个 b 部门行为事件的 ID 即可。在提交 SQL 进行查看时,会进行行为事件级别的权限校验。增量传输荡涤 Hudi 表时,Hudi 表反对增量生产,能够达到分钟级的时效性。上游实时工作能够接在这个 View 前面进行应用,从而达到流批对立。

在 Hudi 侧的优化方面,因为流量数据入湖不更新,所以咱们采纳了 no index 模式,去掉 bucket assign 等过程,从而晋升写入速度。与此同时,Flink 增量 Clustering 上游的 ETL 提早,无明显增加。通过 Clustering 之后,数据开始变得有序。索引记录了行为事件的散布状况,能够通过条件查问,进行文件级别和数据块级别的过滤。除此之外,Flink、Spark 等引擎也反对 Hudi 表的谓词下推,进一步晋升了效率。在 Flink 对于 View 的反对方面,View 上游能够再去定义 Watermark,也能够在 View 上定义 with 属性等等。

通过架构调整和 Hudi 能力的联合,咱们加强了埋点治理的隔离性、时效性,也节约了资源。

接下来,讲一讲 BI 实时报表场景。在原先架构下,流量数据和 DB 数据导入数仓后,会进行 Join 打宽,聚合后将原来的计算结果输入到 MySQL 之类的存储。BI 报表会间接对接 MySQL,进行数据展现。另外一条离线链路,会进行 T-1 的修数兜底。

原先架构的痛点在于,实时和离线两条链路反复建设,计算存储老本高,开发运维老本高,口径解释老本高。Kafka 数据须要复制其余存储,能力进行查问,可观测性比拟弱。除此之外,Kafka 链路难做数据修复。Kafka 链路很难确定修复终点,通常应用 T-1 的形式进行修复。存在数据出仓孤岛等问题。

BI 实时报表场景个别没有秒级的时效性诉求,分钟级的时效性就能够满足诉求。咱们通过 Hudi 替换 Kafka,同时满足了实时和离线的诉求,实现流批对立,达到降本,数据口径失去对立。

Hudi 相比 Kafka,能够间接查问 Hudi 中的数据,比 Kafka 更容易、更不便的进行告警。

比方在 Kafka 上比照七天前的数据,做一个阈值告警。须要生产七天前的数据以及以后数据,进行计算当前,再进行告警。整个过程比较复杂。Hudi 的查问 SQL 和离线的查问 SQL 是统一的。对于 DQC 零碎来说,实时 DQC 和离线 DQC 的计划是对立的,开发成本较低。对于有秒级时效性要求的工作,还须要走 Kafka 链路。

除此之外,数据能够做到不出仓。BI 报表能够间接查问,对接查问的 Hudi 表,进行数据展现。

在理论应用过程中,也存在一些问题。间接对 Hudi 的明细表进行聚合查问时,查问工夫过长,存在读放大的问题。

假如实时 DQC 每五分钟统计近一个小时的数据,进行数据条数监控。五分钟会计算近一个小时的数据,下一个五分钟再计算近一个小时的数据。在滑动窗口的过程中,两头的数据会被计算好屡次,存在比较严重的 IO 放大。

除此之外,以 BI 报表场景为例。假如展现一个 DAU 曲线,每个点都是历史数据的累计值。1 点的数据就是 0 点~1 点数据的累计值。2 点的数据就是 0 点~2 点数据的累计值。在界面展现时,就须要计算 n 个点,每个点都会进行反复的计算,导致查问的工夫较长,存在读放大的问题。

除此之外,开发运维的老本较高。用户会在一个 BI 面板的界面,展现多个指标。可能是从同一张 Hudi 的明细表里,出的不同维度的数据。如果出十个指标,就须要开发和运维十个实时工作,开发和运维老本较高,可靠性较低。当一个实时工作出现异常,这个面板就会缺失一部分的指标。

咱们提出的优化计划是,通过 Flink+Hudi 构建 Projection 物化视图。通过 Flink State 状态,仅需摄取增量数据计算即可,防止读放大问题。将查问后果提前计算出来,间接查问后果数据,来达到减速查问的成果。

具体的流程是,用户提交一个查问 SQL 给 Excalibur Server,进行 SQL 解析。在解析过程中,会提交 Projection 创立,提交一个 Stream 工作,而后增量读取原始表中的数据,进行物化计算当前,再存储到 Projection 物化表中。当查问 SQL 命中物化规定时,就会改写查问,间接查问后果表,达到减速的成果。

通过扩大 Flink Batch SQL 的解析过程,查问时候会加载物化规定以及 Projection 的元数据。并且判断物化表以后 Watermark 物化进度。如果满足要求,则改写查问 Projection 物化表。

咱们参考 Calcite 的物化规定,减少了 TVF 的语法⽀持。

反对 Projection 的创立,用户提交批查问,能够通过在 select 语句上减少 hint,提醒查问引擎,该查问会进行复用。引擎会对该查问创立 Projection。

反对 Flink SQL 的 Projection DDL 语法以及 SQL 查问改写的规定。当用户提交批查问时,如果有对应的 Projection,就能进行改写。改写后能够间接应用 Projection 的后果,大大减速查问,可能做到秒级甚至毫秒级的响应。

Projection 的改写降级,是依据 Watermark 等指标,屏蔽掉 Projection 实时工作的提早和失败等问题,保障了查问后果的可靠性。咱们在 Hudi Meta 减少了 Watermark 数据处理进度信息。在数据写入的过程中,咱们会在 Commit Meta 中记录物化进度。在执行物化规定匹配时,如果落后以后工夫太多,就会回绝以后 Projection 改写,间接降级到原表,进行数据查问。

在 select 语句上减少 hint 创立,通过物化的能力达到减速查问的成果。岂但解决了读放大的问题,通过主动降级,也缩小了用户的开发和运维老本。

在将来,咱们会围绕 Projection 效率进行优化。回收长期无奈命中的 Projection。合并多个维度雷同的 Projection,升高 Projection 的计算成本。除此之外,咱们会和指标零碎对接,通过指标零碎缓存减速查问,满足一些高 QPS 场景的流计算。

之前在流式写入时,应用 Flink SQL,批量修数应用 Spark SQL,依然须要开发和运维两套 SQL。在 Hudi 实时数据库的计划下,咱们参考了离线修数计划。

历史分区重跑,应用 Flink Batch Overwrite,与离线修数形式是统一的。

以后分区重跑,咱们应用 Flink Stream Overwrite 的形式。比方须要将以后的分区数据进行清空删除,而后再进行写入。因为它是 no index 的形式去写入,所以它没有没有方法通过 update 的模式笼罩之前写入的数据。咱们通过扩大 Hudi Catalog,反对 Flink SQL 的形式,alter table drop partition 操作删除分区及数据。而后通过从新流式写入的形式,实现了 Flink Stream Overwrite。

当工具反对级联重跑工作后,咱们就能够从最源端的 ODS 层级,修复到最末端,不再须要开发运维 Spark T-1 修复工作。真正达到了流批一体的成果。

三、基建优化

在基建优化方面,咱们对 Table Service 进行优化。因为 Compaction、Clustering 等工作耗资源较多,和写入工作相互影响,导致写入的性能降落。咱们通过拆分 Table Service,通过独立资源运行,来解决这个问题。

咱们将 Compaction plan、Clustering plan 执行打算的生成过程,放在写入工作中,将真正执行 Compaction、Clustering 的 task 独立过程,进行执行。防止写入和 Table Service 相互影响,进步了写入性能。与此同时,反对了动静调整 Compaction plan 的策略,通过调整频次,缩小不必要的 IO。

Hudi Manager 用于规模化的治理托管,包含表服务托管,比方 Compaction、Clustering、Projection 工作托管独立运行,资源隔离,晋升写入稳定性。反对主动拉起,可批可流。

在表治理方面,是在建表时构建 Hudi 的源数据,取代第一次写入时构建的源数据,防止重要参数脱漏。比方将数据批量导入到 Hudi 时,不关怀 preCombine 比拟字段,初始化好了表的元数据。流式写入时,不会批改表的元数据。该匹配字段的缺失,会导致无奈失去正确的合并后果。

在策略配置方面,用户抉择 OLAP、ETL 场景时,能够主动配置不同的表服务的执行距离。比方上游的 ETL 场景是天级别调度。相比 OLAP 场景,咱们能够应用更低的 Compaction 频次。

如上图所示,咱们在理论应用的过程中,发现和解决了不少数据品质、稳定性、性能方面的问题,并做了功能性的加强,奉献给社区。涵盖了 Sink、Compaction、Clustering、Common 包、Source、Catalog 等若干方面。后面场景中提到的一些能力,咱们陆续也会以 PR 或者 RFC 的模式推给社区。

四、总结和瞻望

咱们在流量数据入湖、DB 数据入湖场景、报表场景以及流批一体,都做了一系列的实际。

接下来,咱们还会深刻数仓畛域,摸索通过物化工作缩小数仓分层,通过分层工作的剖析、诊断、优化,进行智能分层。使业务同学更加专一于数据的应用,加重数仓分层的工作量,向湖仓一体的方向演进。加强增量化计算,反对围绕 Hudi 的 Join ETL,在存储层优化 Join 的逻辑。摸索 Hudi 在 AI 畛域的使用。

在内核方面,咱们后续会加强 Hudi Meta Store,对立元数据管理;加强 Table Service;加强 Hudi Join 的列拼接能力。

点击查看原文视频 & 演讲 PPT


更多内容


流动举荐

阿里云基于 Apache Flink 构建的企业级产品 - 实时计算 Flink 版现开启流动:
0 元试用 实时计算 Flink 版(5000CU* 小时,3 个月内)
理解流动详情:https://free.aliyun.com/?pipCode=sc

退出移动版