尘锋信息基于 Apache Paimon 构建流批一体湖仓,次要分享:
- 整库入湖,TB 级数据近实时入湖
- 基于 Flink + Paimon 的数仓 批 ETL 建设
- 基于 Flink + Paimon 的数仓 流 ETL 建设
- 数仓 OLAP 与数据地图
点击进入 Apache Paimon 官网
一、尘锋信息介绍
尘锋信息 (www.dustess.com) 是基于企业微信生态的一站式私域经营治理解决方案供应商,致力于成为全行业首席私域经营与治理专家,帮忙企业构建数字时代私域经营治理新模式,助力企业实现高质量倒退。
尘锋有着弱小的研发技术团队,企业外部有着浓重的学习气氛,尤其是研发团队的技术学习气氛。晚期为产研团队开设独有的【尘锋公开课与微课堂】学习体系,次要以技术分享,最佳实际研究为主。前期更是成立了尘锋学院笼罩全公司的员工,包含但不限于通用技术、治理技能、产研技术、解决方案、行业案例、市场拓展等方面的常识分享与内容积淀,为公司全员提供跨区域、跨岗位、跨专业的学习平台。
通过两年多的疾速倒退,尘锋曾经成长为领有近千名员工的高新技术企业。
目前已在全国领有13个城市核心,笼罩华北、华中、华东、华南、东北五大区域,造成了贯通南北,辐射全国的城市服务网络,累计服务30+行业的10000+企业。
二、选型背景
2.1 老架构
如上图尘锋信息在Paimon之前有以下两套数据仓库。
离线数仓:
TiDB + HDFS + Yarn + Apache Hive + Apache Spark + Apache Doris
离线数仓用于笼罩批处理场景 ,笼罩业务场景次要是 T+1 和 小时级 提早的报表需要
痛点:
- 离线数仓提早过高,且批量从业务库拉取数据同步容易影响业务
- 基于 Hive 的离线数仓对于 CDC采集 和 更新场景 治理建模有较大的侵入性,开发成本较高
- HDFS 相比于云厂商提供的对象存储,老本仍旧很高
- 私有化艰难,须要部署 Hadoop 整套生态,对于私有化数据量较小的单租户,硬件及保护老本过高
实时数仓:
Apache Kafka + Apache Flink + StarRocks + K8S
实时数仓用于笼罩流(Flink) 和 微批(StarRocks),笼罩业务场景是 秒级 (流) 和 分钟(微批)低提早的高价值报表需要
痛点:
- 实时链路 SR 尽管有较好的流写能力,但不反对流读,不便于数仓依赖复用,每层之间应用Apache Kakfa对接,又造成较大的 开发保护老本
- 实时链路应用SR微批调度解决 会导致十分高的资源占用导致 OLAP 慢查 甚至稳定性问题
- SR 不反对Overwrite 等批处理能力
- 与离线数仓割裂,造成数据孤岛
2.2 新架构需要
联合以上的痛点,咱们决定Q1进行数仓架构调整,咱们的业务需要次要有以下几点:
- 反对 T+1 、小时级的批处理离线统计
- 准实时需要 ,提早能够在分钟级 (要求入湖端到端提早管制在 1分钟左右)
- 秒级提早的 实时需要 ,提早要求在秒级
- 存储成本低,存大量埋点和历史数据不肉疼
- 兼容私有化 (整个环境不依赖 Hadoop 、Hive 等比拟重的组件,升高部署运维老本)
- 可能疾速查问湖仓中的数据(OLAP)
联合业务需要,所以咱们对存储和计算引擎的需要如下:
- 较高的 CDC 摄入 及 更新能力
- 反对 批写 、批读
- 反对 流写 、流读
- 端到端提早 可能 在秒级
- 反对 OSS 、S3、COS 等文件系统
- 反对 OLAP 引擎
- 社区沉闷
2.3 为什么抉择 Paimon
对Paimon进行了深刻的调研和验证,发现Paimon 十分满足咱们的需要:
- 基于LSM ,具备很高的更新能力,默认的 Changelog 模型能够解决 CDC 采集的变更数据(实测入湖端到端提早能管制在 1分钟左右)。另外Paimon 反对 Append Only 模型,能够笼罩没有更新的日志场景,该模型在写入和读取时不必消耗资源解决更新,能够带来更高的读写性能和更低的资源耗费。
- 反对 批写 、批读 ,并且反对 (Flink、Spark、Hive 等多种批处理引擎)
- 反对 流写、流读 (联合Flink 的批处理,咱们心愿前期可能建设流批一体的数据仓库)
- Paimon 反对将一张表同时写入 Log System(如 kafka) 和 Lake Store (如 OSS 对象存储),联合 Log System 能够笼罩秒级提早的业务场景,并且解决了 Kafka 不可查问剖析的问题
- 反对 OSS 、S3、COS 等文件系统 ,且反对 FileSystem catalog ,能够齐全与 Hadoop 、Hive 解耦
- 反对 Trino OLAP 引擎,实测 分组剖析 5亿 200GB 数据,30 个Bucket,可能在10秒内出后果 (和社区沟通,还有优化空间),但满足目前需要。另外,Apache Doris 曾经开始对接 Paimon格局,置信不久之后Paimon的OLAP生态会更加丰盛。
- 社区沉闷,从2022年初开源 至 2022下半年,短短几个月,就曾经公布几个大版本。0.3 的性能曾经十分足够落地去解决一些生产问题,0.4 近期也快公布,0.4(Master) 目前咱们曾经用于生产,十分稳固。
尽管起步晚,然而后发优势非常明显,且没有历史包袱,形象解耦十分正当。相比 Hudi等设计之初就捆绑 Spark 的背景,Paimon 一开始就定位反对多引擎,所以将来的后劲和扩大空间是微小的。
另外 ,社区活跃度上 PPMC 在社区群里直面用户,热心解答疑难,任何问题都会失去及时的回复。目前退出社区群的同学越来越多,咱们也心愿可能积极参与社区,帮忙PPMC们缩小累赘。
联合 Paimon ,咱们Q1 落地的湖仓一体架构如下:
三、整库入湖
3.1 实现步骤
3.1.1 Unisync 采集平台
基于 GO 语言开发,自研 Unisync 采集平台, 性能如下:
- 反对 CDC 增量采集多业务数据库(MongoDB 、TiDB 、MySQL),将不同类型的数据库日志格局进行对立,便于上游应用
- 反对 Batch 并行全量读取,且反对故障复原,防止过程中失败而从新拉取浪费时间
- 反对全量 和 增量采集主动切换 ,反对动静加表,加表时可指定是否增量
- 反对间接 Sink StarRocks 、Doris 、TiDB 等数据库
- 反对嵌入Lua脚本,能够进行无状态的 Map 、FlatMap 、Filter 等
3.1.2 Flink 采样程序
基于 Flink DatasSream API 开发 ,并通过 StreamPark 部署,性能如下:
- 生产Kafka ,将Kafka 中的半结构化数据(MongoDB) ,进行解析,并将字段 - 类型保留至 State
- 有新增的字段主动退出State中,并将该条音讯补齐字段和类型,发送至上游算子
- 主动生成 逻辑 Kafka Table (见上图详解)
- 主动生成 Paimon Table 及 入湖 Flink SQL (依赖 Kafka Table 元数据信息,见上图详解)
- 入湖 Flink SQL 会将 Kafka Table 中的所有字段列出造成别名,主动应用UDF解决 dt 分区字段等等
- 另外有业务非常复杂的场景,能够在治理页面中,编辑生成的 Flink SQL,加强性能等等
3.1.3 Flink + Paimon 入湖程序
基于 Flink DataStream API + Paimon 0.3 开发,并通过 StreamPark 部署,性能如下:
- 每个Flink Job 能够配置读取多个 Kafka Topic ,并设置起始工夫 或者 Offset
- 程序外部依据 Kafka Topic 查问 MySQL ,获取 Kafka Table 元数据信息
通过 DataStream API 读取 Kafka 失去 DataStream<JSONObject> 类型,
通过表名,分流造成每个表独自的 DataStream<Row>
通过 fromChangelogStream 将 DataStream<Row> 转换为 Flink Table 并注册 TemporaryView
通过 Flink sql 不仅能够在入湖时做 Map Flatmap 甚至能够多流 Join 、State计算等
- 启动时 应用 Paimon 的 Flink Catalog API 依据MySQL 中的Paimon建表语句创立表
- TabEnv 提交采样程序生成的入湖 Flink SQL
因为当初开发这套入湖程序时Paimon 0.3 还不反对 JAVA API ,所以工作节点会比拟多,不过实测增量入湖50张表,2TB 左右数据,分配内存6GB ,并发 2 能够稳固运行 (2分钟左右checkpoint距离)
Paimon 0.4 曾经反对 JAVA API ,入湖的灵活性和功能性都会更加弱小,我司也正在跟进优化。
3.2 入湖实际论断
3.2.1 性能
Paimon 基于 LSM tree ,对于流写的场景,Writer 算子实时接管CDC 流,达到肯定阈值之后才Sink 写入磁盘,当执行checkpoint 时,Writer 算子和 commit 会解决合并,如果 bucket设置不合理,则可能导致checkpoint 超时 (倡议一个 bucket 存 1GB 左右数据量)
- 全量整库入湖 80+ 表,近 2TB ,全量写入阶段不解决更新,能够将checkpoint 设置4分钟左右
- 对于全量重刷一张大表的状况,须要更新十分多的 分区 和 bucket ,倡议将表Drop后再全量写入
- (下图)增量更新 150 + 字段 ,1.3 亿条(300GB存量)数据的大维度表 ,分40个 bucket 。如图,曾经更新近 4亿次,增量800GB,目前 checkpoint 放弃在10秒内。
资源:( 2 并发 、TaskManger 4GB 内存 2 slot ,JobManager 1GB 内存 ) Paimon 基于 LSM tree 主动合并文件,基于上表曾经更新近 4亿次 800GB 的状况下,大部分bucket 内的文件数可能管制在 80个内,不必放心小文件过多问题。
大维度表增量更新:
依照批改工夫排序:
3.2.2 稳定性
别离对一张 Append Only 日志表 和 Change Log 维度表进行增量稳定性测试 (数据量适中)
资源配比都是是 1个 TM 4GB 内存 2 slot
从截图能够看出,Paimon 的流写稳固十分高
Append-only 模型:
四、流批一体的数仓 ETL Pipeline
4.1 需要
- 满足 T+1 / 小时级 的离线数据批处理需要
- 满足 分钟级 的 准实时需要
- 满足 秒级的 实时需要
- 以上三种状况,业务SQL 不应该做过多侵入,而只须要批改参数和资源占用,就能够进行升降级
- 湖仓中治理后的局部高价值数据,须要反对 批 和 流两种 模式写入 StarRocks / Doris /TiDB 等数据库
4.2 批
尘锋批处理次要用于笼罩T+1 和 小时级的业务需要:
- 存储侧抉择 Paimon ,因为 Paimon 反对 Append-only 和 changelog 两种模式,反对 insert overwrite insert into 两种写入形式 。
- 计算引擎侧咱们抉择 Apache Flink ,并联合 flink sql gateway + flink sql + DBT 来进行批 ETL 的开发和提交部署。
4.2.1 Paimon 批处理场景
Paimon 反对 Append only 模型 ,配合批笼罩写、批读 ,性能体现体现不亚于 Iceberg 。因为咱们的更新场景较多,所以咱们更加关注 Changelog 模型的读写:
- 如上图,通过 Flink + Paimon 测试批读 Changelog模型(MOR) 220GB 、 一亿左右数据 、 20 并发 ,须要 3分钟左右,每个 TM 1 slot ,内存调配2GB 左右
(留神:因为咱们应用测试的服务器是内存型 8C 64GB,所以该项测试数据并不是 Paimon 的最佳性能,实践 CPU 计算型服务器会更加杰出,提供以上数据供大家参考)
- ChangeLog 写入性能能够参考入湖侧。另外对于 Append only 不必解决更新,体现会更加杰出,非常适合 insert overwrite 等批笼罩场景
- Paimon 反对批模式 Partial Update ,能够笼罩批增量 Join 场景
4.2.2 Flink sql gateway
为了满足流批一体的指标,咱们的批处理引擎也抉择次要应用 Apache Flink (以下简称 Flink )
Flink 1.16 的批处理能力失去十分大的改良 ,并且提供了 flink sql gateway 用于提交批作业(反对 rest endpoint 和 hiveserver2 endpoint )
Flink 1.17 近期曾经公布,批处理能力 和 sql gateway 进一步失去了增强,咱们曾经在生产测试。
抉择应用 flink sql gateway 进行批处理工作提交和治理的起因如下:
sql gateway 具备交互式开发的能力,能够利用Flink 生态丰盛的 connector,十分不便的读取 和 写入
Paimon 、SR、Doris、MySQL、TiDB 、Kafka 等, 甚至能够笼罩局部OLAP 场景。用于数据开发场景,能够极大的升高 Flink sql 的应用门槛 ,晋升开发调试效率 和 升高保护老本
- sql gateway 反对对接 remote 、yarn session、yarn per job(尽管曾经过期,但可在反对 Application mode前临时应用)等多种工作提交形式。并且 sql gateway 能够依据业务场景部署多个,别离对应不同的 session 或 standalone。对于在私有化部署等场景,湖仓计划能够依据私有化用户的需要进行灵便低成本的部署。
sql-gateway.sh start -Dexecution.target=yarn-per-job
以后咱们生产应用基于Flink 1.16版本的 sql gateway还有一些有余,于是为了更好的和 dbt 数据构建工具整合,咱们基于官网 hiveserver2 endpoint 实现 了 dustess_hiveserver2 endpoint ,加强性能如下:
- 反对配置式内嵌多种 Catalog ,如 Paimon 、TiDB、SR、Doris、MySQL 等
- 反对配置式内嵌多种 Module ,次要是咱们外部实现的 UDF 和 UDTF
- 批改默认语法为 Default (Flink)
- 扩大反对 Application mode (进行中)
4.2.3 dbt
咱们选用dbt 作为数据构建工具的起因如下:
- 能够齐全用编写工程代码 (如 Java 、Go等语言)的形式去构建数据仓库,所有的模型对立在 git 仓库,能够review 、PR 、公布等流程管制,极大的进步模型复用率和防止烟囱开发 。
- 数据开发只须要开发 select 语句,dbt 能够主动生成后果表构造,以及基于yml 的模型正文,极大的进步了开发效率 。并且dbt 反对十分多的 宏 语句,能够将十分多的反复工作复用,并且对立和收敛口径。
- dbt 能够依据 source 和 ref 语法主动生成数据血统,且也能够通过命令生成模型文档
4.3 流
之前满足近实时需要
Paimon 满足近实时需要
Paimon 反对 流写 流读 (ODS 全副应用Flink 增量写入)
因为咱们业务库以MongoDB 为主,有十分多的 JSON 嵌套字段,所以咱们有较多的单表 Flatmap 需要,并且咱们有十分多大量的不适宜工夫分区的大维度表,列多,更新频繁,于是非常适合用 流模式 来增量进行 Map 和 Flatmap
在Paimon之前,咱们将打平好的表写入 dwd 提供服务之后,如果上游的 dws 须要应用 dwd 间接聚合剖析,咱们采纳双写 Kafka + 结构化表的形式,这样带来的毛病是 ,开发简单,保护艰难,并且 Kafka 中的数据不可剖析,上游的排查会比拟麻烦。并且对于一些时效性要求不高的(比方分钟级提早)场景,应用Kafka + 结构化表的老本切实太高,不是一个长久的计划
Paimon 反对流读,对于上述Flatmap后的dwd 表,上游间接应用流读即可获取 dwd 的changelog 流,时效性能够达到分钟级的提早,这样 ODS->DWD-DWS 的变更数据就在每层之间流动起来,齐全笼罩大部分准实时需要。
对于极少数的秒级需要,Paimon 反对 Log system (如Kafka ) + Lake Store 的混合存储形式,并且可能做到逻辑及应用层面的对立,HybridSource 和 HybirdSink 外部主动解决从 Kafka 或 Lake Store 读写 ,极大的缩小了开发保护老本。
4.4 成果
ODS的数据是应用Flink流式准实时写入,湖仓中DWD和DWS次要的治理需要为:
- Map、flatmap转换(对于此场景,流和批的SQL完全一致,只须要做提交sql的模式配置)
- join 造成宽表 (join在流场景下复杂度要高于批,Paimon提供了带有雷同key的局部列更新,lookup join等升高复杂度和老本,在sql层面和批是统一的)
- 分组聚合计算 (流利用 State 计算,然而sql 和 批也是统一,只须要做流的参数配置即可,如流的state ttl 配置等)
因为 Paimon 在存储侧实现批及流的对立,困扰Flink用户许久的流批决裂问题,曾经失去了根本性的解决
五、OLAP
Paimon 官网反对多种引擎 ,目前咱们应用 Trino 部署在 K8S 中 OLAP 剖析Paimon ,前端应用Superset 等BI 工具,能够满足绝大多数的外部剖析需要。
通过Trino 读取 Iceberg VS Trino 读取Paimon(都是 Append Only模型) ,5亿 200GB 维度表分组聚合 ,Iceberg 是 7秒 ,Paimon 10秒,两者的差距次要在读取性能,Iceberg读取ORC 有优化,而目前咱们的Paimon 基于ORC ,Paimon读取 Parquet有优化,最近会应用Parquet进行测试。
如果是千万 或者 百万级的小表或分区,两者简直没有差距,并且社区正在踊跃的优化中。Paimon的劣势是既能高效的更新数据,又能高效读取,十分全面。
六、数据地图
后面有提到 Paimon 反对 FileSystem catalog ,咱们在一个 Spring boot + Mybatis 的JAVA WEB 我的项目中,嵌入 Paimon Catalog API ,反对定时和手动同步元数据信息进MySQL 中,配合前端页面进行数据备注、检索、指标治理等
七、将来布局
7.1 sql gateway 降级
反对 application mode
目前应用批处理工作应用 dbt 通过 flink sql gateway 提交作业
目前Flink sql gateway 反对 yarn session 和 yarn per job 两种部署模式,目前有以下问题
- yarn session 启动须要动态指定JobManger 和 TaskManger 的内存 ,不能依据提交的SQL 做针对性调优,存在稳定性不佳 或 资源利用率不高的问题
- yarn per job 能够在向 sql gateway 提交时通过 set 语法设置各项内存值,然而 per job 曾经过期,且存在单点问题容易导致 sql gateway 不稳固。
如上:咱们前期会逐渐实现 sql gateway 的 Application mode,用于解决以上问题,目前正在进行中。
反对流工作生命周期保护和治理
目前咱们的流工作,尽管能够通过 dbt 编写sql ,且通过 sql gateway 提交至集群运行 (通过 set 'execution.runtime-mode'='streaming' )
但流工作不同于执行实现即退出的批模式,须要在调度层,兼容流的监控和治理 , 也须要 sql gateway 具备工作查看,工作治理,异样报警等流工作生命周期治理能力
7.2 Log system 生产联合应用
Paimon 反对 Log system + Lake Store 混合存储,在元数据层面对立,能够笼罩数据新鲜度很高的业务场景。
目前咱们有大量基于 Kafka + Flink + StarRocks 的实时工作及报表,也存在离线和实时的两条开发链路。将来咱们筹备利用 Log system 进一步生产解决离线和实时割裂的问题。
八、总结
以上就是 Apache Paimon 在尘锋的批流一体湖仓实际分享的全部内容,感激大家浏览到这里。
从今年初开始调研湖存储 (Paimon 、Hudi 、Iceberg ),到抉择Paimon ,到现在咱们曾经生产入湖上百张表 ,笼罩了大量业务。非常感谢 Apache Paimon 社区给予的帮忙,并由衷感谢 PPMC 之信老师急躁、疾速、仔细的解答和领导,帮忙咱们疾速解决每次遇到的问题 。
0.4 版本也行将公布,在这里心愿 Paimon 越来越好,也心愿之后可能多为 Paimon 奉献本人的一份力量。
九、Paimon 信息
Apache Paimon 官网:https://paimon.apache.org/
Apache Piamon Github:https://github.com/apache/incubator-paimon
Apache Paimon 钉钉交换群:10880001919
点击进入 Apache Paimon 官网
更多内容
流动举荐
阿里云基于 Apache Flink 构建的企业级产品-实时计算 Flink 版现开启流动:
0 元试用 实时计算 Flink 版(5000CU*小时,3 个月内)
理解流动详情:https://free.aliyun.com/?pipCode=sc