乐趣区

关于后端:尘锋信息基于-Apache-Paimon-的流批一体湖仓实践

尘锋信息基于 Apache Paimon 构建流批一体湖仓,次要分享:

  1. 整库入湖,TB 级数据近实时入湖
  2. 基于 Flink + Paimon 的数仓 批 ETL 建设
  3. 基于 Flink + Paimon 的数仓 流 ETL 建设
  4. 数仓 OLAP 与数据地图

点击进入 Apache Paimon 官网

一、尘锋信息介绍

尘锋信息 (www.dustess.com) 是基于企业微信生态的一站式私域经营治理解决方案供应商,致力于成为全行业首席私域经营与治理专家,帮忙企业构建数字时代私域经营治理新模式,助力企业实现高质量倒退。

尘锋有着弱小的研发技术团队,企业外部有着浓重的学习气氛,尤其是研发团队的技术学习气氛。晚期为产研团队开设独有的【尘锋公开课与微课堂】学习体系,次要以技术分享,最佳实际研究为主。前期更是成立了尘锋学院笼罩全公司的员工,包含但不限于通用技术、治理技能、产研技术、解决方案、行业案例、市场拓展等方面的常识分享与内容积淀,为公司全员提供跨区域、跨岗位、跨专业的学习平台。

通过两年多的疾速倒退,尘锋曾经成长为领有近千名员工的高新技术企业。

目前已在全国领有 13 个城市核心,笼罩华北、华中、华东、华南、东北五大区域,造成了贯通南北,辐射全国的城市服务网络,累计服务 30+ 行业的 10000+ 企业。

二、选型背景

2.1 老架构

如上图尘锋信息在 Paimon 之前有以下两套数据仓库。

离线数仓:

TiDB + HDFS + Yarn + Apache Hive + Apache Spark + Apache Doris

离线数仓用于笼罩批处理场景,笼罩业务场景次要是 T+1 和 小时级 提早的报表需要

痛点:

  1. 离线数仓提早过高,且批量从业务库拉取数据同步容易影响业务
  2. 基于 Hive 的离线数仓对于 CDC 采集 和 更新场景 治理建模有较大的侵入性,开发成本较高
  3. HDFS 相比于云厂商提供的对象存储,老本仍旧很高
  4. 私有化艰难,须要部署 Hadoop 整套生态,对于私有化数据量较小的单租户,硬件及保护老本过高

实时数仓:

Apache Kafka + Apache Flink + StarRocks + K8S

实时数仓用于笼罩流(Flink)和 微批(StarRocks),笼罩业务场景是 秒级(流)和 分钟(微批)低提早的高价值报表需要

痛点:

  1. 实时链路 SR 尽管有较好的流写能力,但不反对流读,不便于数仓依赖复用,每层之间应用 Apache Kakfa 对接,又造成较大的 开发保护老本
  2. 实时链路应用 SR 微批调度解决 会导致十分高的资源占用导致 OLAP 慢查 甚至稳定性问题
  3. SR 不反对 Overwrite 等批处理能力
  4. 与离线数仓割裂,造成数据孤岛

2.2 新架构需要

联合以上的痛点,咱们决定 Q1 进行数仓架构调整,咱们的业务需要次要有以下几点:

  1. 反对 T+1、小时级的批处理离线统计
  2. 准实时需要,提早能够在分钟级(要求入湖端到端提早管制在 1 分钟左右)
  3. 秒级提早的 实时需要,提早要求在秒级
  4. 存储成本低,存大量埋点和历史数据不肉疼
  5. 兼容私有化(整个环境不依赖 Hadoop、Hive 等比拟重的组件,升高部署运维老本)
  6. 可能疾速查问湖仓中的数据(OLAP)

联合业务需要,所以咱们对存储和计算引擎的需要如下:

  1. 较高的 CDC 摄入 及 更新能力
  2. 反对 批写、批读
  3. 反对 流写、流读
  4. 端到端提早 可能 在秒级
  5. 反对 OSS、S3、COS 等文件系统
  6. 反对 OLAP 引擎
  7. 社区沉闷

2.3 为什么抉择 Paimon

对 Paimon 进行了深刻的调研和验证,发现 Paimon 十分满足咱们的需要:

  1. 基于 LSM,具备很高的更新能力,默认的 Changelog 模型能够解决 CDC 采集的变更数据(实测入湖端到端提早能管制在 1 分钟左右)。另外 Paimon 反对 Append Only 模型,能够笼罩没有更新的日志场景,该模型在写入和读取时不必消耗资源解决更新,能够带来更高的读写性能和更低的资源耗费。
  2. 反对 批写、批读,并且反对(Flink、Spark、Hive 等多种批处理引擎)
  3. 反对 流写、流读(联合 Flink 的批处理,咱们心愿前期可能建设流批一体的数据仓库)
  4. Paimon 反对将一张表同时写入 Log System(如 kafka) 和 Lake Store (如 OSS 对象存储),联合 Log System 能够笼罩秒级提早的业务场景,并且解决了 Kafka 不可查问剖析的问题
  5. 反对 OSS、S3、COS 等文件系统,且反对 FileSystem catalog,能够齐全与 Hadoop、Hive 解耦
  6. 反对 Trino OLAP 引擎,实测 分组剖析 5 亿 200GB 数据,30 个 Bucket,可能在 10 秒内出后果(和社区沟通,还有优化空间),但满足目前需要。另外,Apache Doris 曾经开始对接 Paimon 格局,置信不久之后 Paimon 的 OLAP 生态会更加丰盛。
  7. 社区沉闷,从 2022 年初开源 至 2022 下半年,短短几个月,就曾经公布几个大版本。0.3 的性能曾经十分足够落地去解决一些生产问题,0.4 近期也快公布,0.4(Master) 目前咱们曾经用于生产,十分稳固。

尽管起步晚,然而后发优势非常明显,且没有历史包袱,形象解耦十分正当。相比 Hudi 等设计之初就捆绑 Spark 的背景,Paimon 一开始就定位反对多引擎,所以将来的后劲和扩大空间是微小的。

另外,社区活跃度上 PPMC 在社区群里直面用户,热心解答疑难,任何问题都会失去及时的回复。目前退出社区群的同学越来越多,咱们也心愿可能积极参与社区,帮忙 PPMC 们缩小累赘。

联合 Paimon,咱们 Q1 落地的湖仓一体架构如下:

三、整库入湖

3.1 实现步骤

3.1.1 Unisync 采集平台

基于 GO 语言开发,自研 Unisync 采集平台, 性能如下:

  1. 反对 CDC 增量采集多业务数据库(MongoDB、TiDB、MySQL),将不同类型的数据库日志格局进行对立,便于上游应用
  2. 反对 Batch 并行全量读取,且反对故障复原,防止过程中失败而从新拉取浪费时间
  3. 反对全量 和 增量采集主动切换,反对动静加表,加表时可指定是否增量
  4. 反对间接 Sink StarRocks、Doris、TiDB 等数据库
  5. 反对嵌入 Lua 脚本,能够进行无状态的 Map、FlatMap、Filter 等

3.1.2 Flink 采样程序

基于 Flink DatasSream API 开发,并通过 StreamPark 部署,性能如下:

  1. 生产 Kafka,将 Kafka 中的半结构化数据(MongoDB),进行解析,并将字段 – 类型保留至 State
  2. 有新增的字段主动退出 State 中,并将该条音讯补齐字段和类型,发送至上游算子
  3. 主动生成 逻辑 Kafka Table(见上图详解)
  4. 主动生成 Paimon Table 及 入湖 Flink SQL(依赖 Kafka Table 元数据信息,见上图详解)
  5. 入湖 Flink SQL 会将 Kafka Table 中的所有字段列出造成别名,主动应用 UDF 解决 dt 分区字段等等
  6. 另外有业务非常复杂的场景,能够在治理页面中,编辑生成的 Flink SQL,加强性能等等

3.1.3 Flink + Paimon 入湖程序

基于 Flink DataStream API + Paimon 0.3 开发,并通过 StreamPark 部署,性能如下:

  1. 每个 Flink Job 能够配置读取多个 Kafka Topic,并设置起始工夫 或者 Offset
  2. 程序外部依据 Kafka Topic 查问 MySQL,获取 Kafka Table 元数据信息
  3. 通过 DataStream API 读取 Kafka 失去 DataStream<JSONObject> 类型,

    通过表名,分流造成每个表独自的 DataStream<Row>

    通过 fromChangelogStream 将 DataStream<Row> 转换为 Flink Table 并注册 TemporaryView

    通过 Flink sql 不仅能够在入湖时做 Map Flatmap 甚至能够多流 Join、State 计算等

  4. 启动时 应用 Paimon 的 Flink Catalog API 依据 MySQL 中的 Paimon 建表语句创立表
  5. TabEnv 提交采样程序生成的入湖 Flink SQL

因为当初开发这套入湖程序时 Paimon 0.3 还不反对 JAVA API,所以工作节点会比拟多,不过实测增量入湖 50 张表,2TB 左右数据,分配内存 6GB,并发 2 能够稳固运行(2 分钟左右 checkpoint 距离)

Paimon 0.4 曾经反对 JAVA API,入湖的灵活性和功能性都会更加弱小,我司也正在跟进优化。

3.2 入湖实际论断

3.2.1 性能

Paimon 基于 LSM tree,对于流写的场景,Writer 算子实时接管 CDC 流,达到肯定阈值之后才 Sink 写入磁盘,当执行 checkpoint 时,Writer 算子和 commit 会解决合并,如果 bucket 设置不合理,则可能导致 checkpoint 超时(倡议一个 bucket 存 1GB 左右数据量)

  1. 全量整库入湖 80+ 表,近 2TB,全量写入阶段不解决更新,能够将 checkpoint 设置 4 分钟左右
  2. 对于全量重刷一张大表的状况,须要更新十分多的 分区 和 bucket,倡议将表 Drop 后再全量写入
  3. (下图)增量更新 150 + 字段,1.3 亿条(300GB 存量)数据的大维度表,分 40 个 bucket。如图,曾经更新近 4 亿次,增量 800GB,目前 checkpoint 放弃在 10 秒内。

资源:(2 并发、TaskManger 4GB 内存 2 slot,JobManager 1GB 内存)Paimon 基于 LSM tree 主动合并文件,基于上表曾经更新近 4 亿次 800GB 的状况下,大部分 bucket 内的文件数可能管制在 80 个内,不必放心小文件过多问题。

大维度表增量更新:

依照批改工夫排序:

3.2.2 稳定性

别离对一张 Append Only 日志表 和 Change Log 维度表进行增量稳定性测试(数据量适中)

资源配比都是是 1 个 TM 4GB 内存 2 slot

从截图能够看出,Paimon 的流写稳固十分高

Append-only 模型:

四、流批一体的数仓 ETL Pipeline

4.1 需要

  1. 满足 T+1 / 小时级 的离线数据批处理需要
  2. 满足 分钟级 的 准实时需要
  3. 满足 秒级的 实时需要
  4. 以上三种状况,业务 SQL 不应该做过多侵入,而只须要批改参数和资源占用,就能够进行升降级
  5. 湖仓中治理后的局部高价值数据,须要反对 批 和 流两种 模式写入 StarRocks / Doris /TiDB 等数据库

4.2 批

尘锋批处理次要用于笼罩 T +1 和 小时级的业务需要:

  1. 存储侧抉择 Paimon,因为 Paimon 反对 Append-only 和 changelog 两种模式,反对 insert overwrite insert into 两种写入形式。
  2. 计算引擎侧咱们抉择 Apache Flink,并联合 flink sql gateway + flink sql + DBT 来进行批 ETL 的开发和提交部署。

4.2.1 Paimon 批处理场景

Paimon 反对 Append only 模型,配合批笼罩写、批读,性能体现体现不亚于 Iceberg。因为咱们的更新场景较多,所以咱们更加关注 Changelog 模型的读写:

  1. 如上图,通过 Flink + Paimon 测试批读 Changelog 模型(MOR)220GB、一亿左右数据、20 并发,须要 3 分钟左右,每个 TM 1 slot,内存调配 2GB 左右

(留神:因为咱们应用测试的服务器是内存型 8C 64GB,所以该项测试数据并不是 Paimon 的最佳性能,实践 CPU 计算型服务器会更加杰出,提供以上数据供大家参考)

  1. ChangeLog 写入性能能够参考入湖侧。另外对于 Append only 不必解决更新,体现会更加杰出,非常适合 insert overwrite 等批笼罩场景
  2. Paimon 反对批模式 Partial Update,能够笼罩批增量 Join 场景

4.2.2 Flink sql gateway

为了满足流批一体的指标,咱们的批处理引擎也抉择次要应用 Apache Flink(以下简称 Flink)

Flink 1.16 的批处理能力失去十分大的改良,并且提供了 flink sql gateway 用于提交批作业(反对 rest endpoint 和 hiveserver2 endpoint)

Flink 1.17 近期曾经公布,批处理能力 和 sql gateway 进一步失去了增强,咱们曾经在生产测试。

抉择应用 flink sql gateway 进行批处理工作提交和治理的起因如下:

  1. sql gateway 具备交互式开发的能力,能够利用 Flink 生态丰盛的 connector,十分不便的读取 和 写入

    Paimon、SR、Doris、MySQL、TiDB、Kafka 等,甚至能够笼罩局部 OLAP 场景。用于数据开发场景,能够极大的升高 Flink sql 的应用门槛,晋升开发调试效率 和 升高保护老本

  2. sql gateway 反对对接 remote、yarn session、yarn per job(尽管曾经过期,但可在反对 Application mode 前临时应用)等多种工作提交形式。并且 sql gateway 能够依据业务场景部署多个,别离对应不同的 session 或 standalone。对于在私有化部署等场景,湖仓计划能够依据私有化用户的需要进行灵便低成本的部署。

sql-gateway.sh start -Dexecution.target=yarn-per-job

以后咱们生产应用基于 Flink 1.16 版本的 sql gateway 还有一些有余,于是为了更好的和 dbt 数据构建工具整合,咱们基于官网 hiveserver2 endpoint 实现 了 dustess_hiveserver2 endpoint,加强性能如下:

  1. 反对配置式内嵌多种 Catalog,如 Paimon、TiDB、SR、Doris、MySQL 等
  2. 反对配置式内嵌多种 Module,次要是咱们外部实现的 UDF 和 UDTF
  3. 批改默认语法为 Default(Flink)
  4. 扩大反对 Application mode(进行中)

4.2.3 dbt

咱们选用 dbt 作为数据构建工具的起因如下:

  1. 能够齐全用编写工程代码(如 Java、Go 等语言)的形式去构建数据仓库,所有的模型对立在 git 仓库,能够 review、PR、公布等流程管制,极大的进步模型复用率和防止烟囱开发。
  2. 数据开发只须要开发 select 语句,dbt 能够主动生成后果表构造,以及基于 yml 的模型正文,极大的进步了开发效率。并且 dbt 反对十分多的 宏 语句,能够将十分多的反复工作复用,并且对立和收敛口径。
  3. dbt 能够依据 source 和 ref 语法主动生成数据血统,且也能够通过命令生成模型文档

4.3 流

之前满足近实时需要

Paimon 满足近实时需要

Paimon 反对 流写 流读(ODS 全副应用 Flink 增量写入)

因为咱们业务库以 MongoDB 为主,有十分多的 JSON 嵌套字段,所以咱们有较多的单表 Flatmap 需要,并且咱们有十分多大量的不适宜工夫分区的大维度表,列多,更新频繁,于是非常适合用 流模式 来增量进行 Map 和 Flatmap

在 Paimon 之前,咱们将打平好的表写入 dwd 提供服务之后,如果上游的 dws 须要应用 dwd 间接聚合剖析,咱们采纳双写 Kafka + 结构化表的形式,这样带来的毛病是,开发简单,保护艰难,并且 Kafka 中的数据不可剖析,上游的排查会比拟麻烦。并且对于一些时效性要求不高的(比方分钟级提早)场景,应用 Kafka + 结构化表的老本切实太高,不是一个长久的计划

Paimon 反对流读,对于上述 Flatmap 后的 dwd 表,上游间接应用流读即可获取 dwd 的 changelog 流,时效性能够达到分钟级的提早,这样 ODS->DWD-DWS 的变更数据就在每层之间流动起来,齐全笼罩大部分准实时需要。

对于极少数的秒级需要,Paimon 反对 Log system(如 Kafka)+ Lake Store 的混合存储形式,并且可能做到逻辑及应用层面的对立,HybridSource 和 HybirdSink 外部主动解决从 Kafka 或 Lake Store 读写,极大的缩小了开发保护老本。

4.4 成果

ODS 的数据是应用 Flink 流式准实时写入,湖仓中 DWD 和 DWS 次要的治理需要为:

  1. Map、flatmap 转换(对于此场景,流和批的 SQL 完全一致,只须要做提交 sql 的模式配置)
  2. join 造成宽表(join 在流场景下复杂度要高于批,Paimon 提供了带有雷同 key 的局部列更新,lookup join 等升高复杂度和老本,在 sql 层面和批是统一的)
  3. 分组聚合计算(流利用 State 计算,然而 sql 和 批也是统一,只须要做流的参数配置即可,如流的 state ttl 配置等)

因为 Paimon 在存储侧实现批及流的对立,困扰 Flink 用户许久的流批决裂问题,曾经失去了根本性的解决

五、OLAP

Paimon 官网反对多种引擎,目前咱们应用 Trino 部署在 K8S 中 OLAP 剖析 Paimon,前端应用 Superset 等 BI 工具,能够满足绝大多数的外部剖析需要。

通过 Trino 读取 Iceberg VS Trino 读取 Paimon(都是 Append Only 模型),5 亿 200GB 维度表分组聚合,Iceberg 是 7 秒,Paimon 10 秒,两者的差距次要在读取性能,Iceberg 读取 ORC 有优化,而目前咱们的 Paimon 基于 ORC,Paimon 读取 Parquet 有优化,最近会应用 Parquet 进行测试。

如果是千万 或者 百万级的小表或分区,两者简直没有差距,并且社区正在踊跃的优化中。Paimon 的劣势是既能高效的更新数据,又能高效读取,十分全面。

六、数据地图

后面有提到 Paimon 反对 FileSystem catalog,咱们在一个 Spring boot + Mybatis 的 JAVA WEB 我的项目中,嵌入 Paimon Catalog API,反对定时和手动同步元数据信息进 MySQL 中,配合前端页面进行数据备注、检索、指标治理等

七、将来布局

7.1 sql gateway 降级

  1. 反对 application mode

    目前应用批处理工作应用 dbt 通过 flink sql gateway 提交作业

    目前 Flink sql gateway 反对 yarn session 和 yarn per job 两种部署模式,目前有以下问题

    • yarn session 启动须要动态指定 JobManger 和 TaskManger 的内存,不能依据提交的 SQL 做针对性调优,存在稳定性不佳 或 资源利用率不高的问题
    • yarn per job 能够在向 sql gateway 提交时通过 set 语法设置各项内存值,然而 per job 曾经过期,且存在单点问题容易导致 sql gateway 不稳固。

如上:咱们前期会逐渐实现 sql gateway 的 Application mode,用于解决以上问题,目前正在进行中。

  1. 反对流工作生命周期保护和治理

    目前咱们的流工作,尽管能够通过 dbt 编写 sql,且通过 sql gateway 提交至集群运行(通过 set ‘execution.runtime-mode’=’streaming’)

    但流工作不同于执行实现即退出的批模式,须要在调度层,兼容流的监控和治理 , 也须要 sql gateway 具备工作查看,工作治理,异样报警等流工作生命周期治理能力

7.2 Log system 生产联合应用

Paimon 反对 Log system + Lake Store 混合存储,在元数据层面对立,能够笼罩数据新鲜度很高的业务场景。

目前咱们有大量基于 Kafka + Flink + StarRocks 的实时工作及报表,也存在离线和实时的两条开发链路。将来咱们筹备利用 Log system 进一步生产解决离线和实时割裂的问题。

八、总结

以上就是 Apache Paimon 在尘锋的批流一体湖仓实际分享的全部内容,感激大家浏览到这里。

从今年初开始调研湖存储(Paimon、Hudi、Iceberg),到抉择 Paimon,到现在咱们曾经生产入湖上百张表,笼罩了大量业务。非常感谢 Apache Paimon 社区给予的帮忙,并由衷感谢 PPMC 之信老师急躁、疾速、仔细的解答和领导,帮忙咱们疾速解决每次遇到的问题。

0.4 版本也行将公布,在这里心愿 Paimon 越来越好,也心愿之后可能多为 Paimon 奉献本人的一份力量。

九、Paimon 信息

Apache Paimon 官网:https://paimon.apache.org/

Apache Piamon Github:https://github.com/apache/incubator-paimon

Apache Paimon 钉钉交换群:10880001919

点击进入 Apache Paimon 官网


更多内容


流动举荐

阿里云基于 Apache Flink 构建的企业级产品 - 实时计算 Flink 版现开启流动:
0 元试用 实时计算 Flink 版(5000CU* 小时,3 个月内)
理解流动详情:https://free.aliyun.com/?pipCode=sc

退出移动版