共计 14872 个字符,预计需要花费 38 分钟才能阅读完成。
本文介绍了 Flink Hudi 通过流计算对原有基于 mini-batch 的增量计算模型一直优化演进。用户能够通过 Flink SQL 将 CDC 数据实时写入 Hudi 存储,且在行将公布的 0.9 版本 Hudi 原生反对 CDC format。次要内容为:
- 背景
- 增量 ETL
- 演示
一、背景
近实时
从 2016 年开始,Apache Hudi 社区就开始通过 Hudi 的 UPSERT 能力摸索近实时场景的应用案例 [1]。通过 MR/Spark 的批处理模型,用户能够实现小时级别的数据注入 HDFS/OSS。在纯实时场景,用户通过流计算引擎 Flink + KV/OLAP 存储的架构能够实现端到端的秒级 (5 分钟级) 实时剖析。然而在秒级 (5 分钟级) 到小时级时的场景还存在大量的用例,咱们称之为 NEAR-REAL-TIME (近实时)。
在实践中有大量的案例都属于近实时的领域:
- 分钟级别的大屏;
- 各种 BI 剖析 (OLAP);
- 机器学习分钟级别的特征提取。
增量计算
解决近实时的计划以后是比拟凋谢的。
- 流解决的时延低,然而 SQL 的 pattern 比拟固定,查问端的能力(索引、ad hoc)欠缺;
- 批处理的数仓能力丰盛然而数据时延大。
于是 Hudi 社区提出基于 mini-batch 的增量计算模型:
增量 数据集 => 增量 计算结果 merge 已存后果 => 外存
这套模型通过湖存储的 snapshot 拉取增量的数据集 (两个 commits 之前的数据集),通过 Spark/Hive 等批处理框架计算增量的后果 (比方简略的 count) 再 merge 到已存后果中。
外围问题
增量模型须要解决的外围问题:
- UPSERT 能力:相似 KUDU 和 Hive ACID,Hudi 也提供了分钟级的更新能力;
- 增量生产:Hudi 通过湖存储的多 snapshots 提供增量拉取。
基于 mini-batch 的增量计算模型能够晋升局部场景的时延、节俭计算成本,但有一个很大的限度:对 SQL 的 pattern 有要求。因为计算走的是批,批计算自身不保护状态,这就要求计算的指标可能比拟不便地 merge,简略的 count、sum 能够做,然而 avg、count distinct 这些还是须要拉取全量数据重算。
随着流计算和实时数仓的遍及,Hudi 社区也在踊跃的拥抱变动,通过流计算对原有基于 mini-batch 的增量计算模型一直优化演进:在 0.7 版本引入了流式数据入湖,在 0.9 版本反对了原生的 CDC format。
二、增量 ETL
DB 数据入湖
随着 CDC 技术的成熟,debezium 这样的 CDC 工具越来越风行,Hudi 社区也先后集成了流写,流读的能力。用户能够通过 Flink SQL 将 CDC 数据实时写入 Hudi 存储:
- 用户既能够通过 Flink CDC connector 间接将 DB 数据导入 Hudi;
- 也能够先将 CDC 数据导入 Kafka,再通过 Kafka connector 导入 Hudi。
第二种计划的容错和扩展性会好一些。
数据湖 CDC
在行将公布的 0.9 版本,Hudi 原生反对 CDC format,一条 record 的所有变更记录都能够保留,基于此,Hudi 和流计算零碎联合的更加欠缺,能够流式读取 CDC 数据 [2]:
源头 CDC 流的所有音讯变更都在入湖之后保留下来,被用于流式生产。Flink 的有状态计算实时累加计算结果 (state),通过流式写 Hudi 将计算的变更同步到 Hudi 湖存储,之后持续对接 Flink 流式生产 Hudi 存储的 changelog,实现下一层级的有状态计算。近实时端到端 ETL pipeline:
这套架构将端到端的 ETL 时延缩短到分钟级,并且每一层的存储格局都能够通过 compaction 压缩成列存(Parquet、ORC)以提供 OLAP 剖析能力,因为数据湖的开放性,压缩后的格局能够对接各种查问引擎:Flink、Spark、Presto、Hive 等。
一张 Hudi 数据湖表具备两种状态:
- 表状态:查问最新的快照后果,同时提供高效的列存格局
- 流状态:流式生产变更,能够指定任意点位流读之后的 changelog
三、演示
咱们通过一段 Demo 演示 Hudi 表的两种状态。
环境筹备
- Flink SQL Client
- Hudi master 打包
hudi-flink-bundle
jar - Flink 1.13.1
这里提前准备一段 debezium-json 格局的 CDC 数据
{"before":null,"after":{"id":101,"ts":1000,"name":"scooter","description":"Small 2-wheel scooter","weight":3.140000104904175},"source":{"version":"1.1.1.Final","connector":"mysql","name":"dbserver1","ts_ms":0,"snapshot":"true","db":"inventory","table":"products","server_id":0,"gtid":null,"file":"mysql-bin.000003","pos":154,"row":0,"thread":null,"query":null},"op":"c","ts_ms":1589355606100,"transaction":null}
{"before":null,"after":{"id":102,"ts":2000,"name":"car battery","description":"12V car battery","weight":8.100000381469727},"source":{"version":"1.1.1.Final","connector":"mysql","name":"dbserver1","ts_ms":0,"snapshot":"true","db":"inventory","table":"products","server_id":0,"gtid":null,"file":"mysql-bin.000003","pos":154,"row":0,"thread":null,"query":null},"op":"c","ts_ms":1589355606101,"transaction":null}
{"before":null,"after":{"id":103,"ts":3000,"name":"12-pack drill bits","description":"12-pack of drill bits with sizes ranging from #40 to #3","weight":0.800000011920929},"source":{"version":"1.1.1.Final","connector":"mysql","name":"dbserver1","ts_ms":0,"snapshot":"true","db":"inventory","table":"products","server_id":0,"gtid":null,"file":"mysql-bin.000003","pos":154,"row":0,"thread":null,"query":null},"op":"c","ts_ms":1589355606101,"transaction":null}
{"before":null,"after":{"id":104,"ts":4000,"name":"hammer","description":"12oz carpenter's hammer","weight":0.75},"source":{"version":"1.1.1.Final","connector":"mysql","name":"dbserver1","ts_ms":0,"snapshot":"true","db":"inventory","table":"products","server_id":0,"gtid":null,"file":"mysql-bin.000003","pos":154,"row":0,"thread":null,"query":null},"op":"c","ts_ms":1589355606101,"transaction":null}
{"before":null,"after":{"id":105,"ts":5000,"name":"hammer","description":"14oz carpenter's hammer","weight":0.875},"source":{"version":"1.1.1.Final","connector":"mysql","name":"dbserver1","ts_ms":0,"snapshot":"true","db":"inventory","table":"products","server_id":0,"gtid":null,"file":"mysql-bin.000003","pos":154,"row":0,"thread":null,"query":null},"op":"c","ts_ms":1589355606101,"transaction":null}
{"before":null,"after":{"id":106,"ts":6000,"name":"hammer","description":"16oz carpenter's hammer","weight":1},"source":{"version":"1.1.1.Final","connector":"mysql","name":"dbserver1","ts_ms":0,"snapshot":"true","db":"inventory","table":"products","server_id":0,"gtid":null,"file":"mysql-bin.000003","pos":154,"row":0,"thread":null,"query":null},"op":"c","ts_ms":1589355606101,"transaction":null}
{"before":null,"after":{"id":107,"ts":7000,"name":"rocks","description":"box of assorted rocks","weight":5.300000190734863},"source":{"version":"1.1.1.Final","connector":"mysql","name":"dbserver1","ts_ms":0,"snapshot":"true","db":"inventory","table":"products","server_id":0,"gtid":null,"file":"mysql-bin.000003","pos":154,"row":0,"thread":null,"query":null},"op":"c","ts_ms":1589355606101,"transaction":null}
{"before":null,"after":{"id":108,"ts":8000,"name":"jacket","description":"water resistent black wind breaker","weight":0.10000000149011612},"source":{"version":"1.1.1.Final","connector":"mysql","name":"dbserver1","ts_ms":0,"snapshot":"true","db":"inventory","table":"products","server_id":0,"gtid":null,"file":"mysql-bin.000003","pos":154,"row":0,"thread":null,"query":null},"op":"c","ts_ms":1589355606101,"transaction":null}
{"before":null,"after":{"id":109,"ts":9000,"name":"spare tire","description":"24 inch spare tire","weight":22.200000762939453},"source":{"version":"1.1.1.Final","connector":"mysql","name":"dbserver1","ts_ms":0,"snapshot":"true","db":"inventory","table":"products","server_id":0,"gtid":null,"file":"mysql-bin.000003","pos":154,"row":0,"thread":null,"query":null},"op":"c","ts_ms":1589355606101,"transaction":null}
{"before":{"id":106,"ts":6000,"name":"hammer","description":"16oz carpenter's hammer","weight":1},"after":{"id":106,"ts":10000,"name":"hammer","description":"18oz carpenter hammer","weight":1},"source":{"version":"1.1.1.Final","connector":"mysql","name":"dbserver1","ts_ms":1589361987000,"snapshot":"false","db":"inventory","table":"products","server_id":223344,"gtid":null,"file":"mysql-bin.000003","pos":362,"row":0,"thread":2,"query":null},"op":"u","ts_ms":1589361987936,"transaction":null}
{"before":{"id":107,"ts":7000,"name":"rocks","description":"box of assorted rocks","weight":5.300000190734863},"after":{"id":107,"ts":11000,"name":"rocks","description":"box of assorted rocks","weight":5.099999904632568},"source":{"version":"1.1.1.Final","connector":"mysql","name":"dbserver1","ts_ms":1589362099000,"snapshot":"false","db":"inventory","table":"products","server_id":223344,"gtid":null,"file":"mysql-bin.000003","pos":717,"row":0,"thread":2,"query":null},"op":"u","ts_ms":1589362099505,"transaction":null}
{"before":null,"after":{"id":110,"ts":12000,"name":"jacket","description":"water resistent white wind breaker","weight":0.20000000298023224},"source":{"version":"1.1.1.Final","connector":"mysql","name":"dbserver1","ts_ms":1589362210000,"snapshot":"false","db":"inventory","table":"products","server_id":223344,"gtid":null,"file":"mysql-bin.000003","pos":1068,"row":0,"thread":2,"query":null},"op":"c","ts_ms":1589362210230,"transaction":null}
{"before":null,"after":{"id":111,"ts":13000,"name":"scooter","description":"Big 2-wheel scooter","weight":5.179999828338623},"source":{"version":"1.1.1.Final","connector":"mysql","name":"dbserver1","ts_ms":1589362243000,"snapshot":"false","db":"inventory","table":"products","server_id":223344,"gtid":null,"file":"mysql-bin.000003","pos":1394,"row":0,"thread":2,"query":null},"op":"c","ts_ms":1589362243428,"transaction":null}
{"before":{"id":110,"ts":12000,"name":"jacket","description":"water resistent white wind breaker","weight":0.20000000298023224},"after":{"id":110,"ts":14000,"name":"jacket","description":"new water resistent white wind breaker","weight":0.5},"source":{"version":"1.1.1.Final","connector":"mysql","name":"dbserver1","ts_ms":1589362293000,"snapshot":"false","db":"inventory","table":"products","server_id":223344,"gtid":null,"file":"mysql-bin.000003","pos":1707,"row":0,"thread":2,"query":null},"op":"u","ts_ms":1589362293539,"transaction":null}
{"before":{"id":111,"ts":13000,"name":"scooter","description":"Big 2-wheel scooter","weight":5.179999828338623},"after":{"id":111,"ts":15000,"name":"scooter","description":"Big 2-wheel scooter","weight":5.170000076293945},"source":{"version":"1.1.1.Final","connector":"mysql","name":"dbserver1","ts_ms":1589362330000,"snapshot":"false","db":"inventory","table":"products","server_id":223344,"gtid":null,"file":"mysql-bin.000003","pos":2090,"row":0,"thread":2,"query":null},"op":"u","ts_ms":1589362330904,"transaction":null}
{"before":{"id":111,"ts":16000,"name":"scooter","description":"Big 2-wheel scooter","weight":5.170000076293945},"after":null,"source":{"version":"1.1.1.Final","connector":"mysql","name":"dbserver1","ts_ms":1589362344000,"snapshot":"false","db":"inventory","table":"products","server_id":223344,"gtid":null,"file":"mysql-bin.000003","pos":2443,"row":0,"thread":2,"query":null},"op":"d","ts_ms":1589362344455,"transaction":null}
通过 Flink SQL Client 创立表用来读取 CDC 数据文件
Flink SQL> CREATE TABLE debezium_source(
> id INT NOT NULL,
> ts BIGINT,
> name STRING,
> description STRING,
> weight DOUBLE
> ) WITH (
> 'connector' = 'filesystem',
> 'path' = '/Users/chenyuzhao/workspace/hudi-demo/source.data',
> 'format' = 'debezium-json'
> );
[INFO] Execute statement succeed.
执行 SELECT 察看后果,能够看到一共有 20 条记录,两头有一些 UPDATE s,最初一条音讯是 DELETE
Flink SQL> select * from debezium_source;
+----+-------------+----------------------+--------------------------------+--------------------------------+--------------------------------+
| op | id | ts | name | description | weight |
+----+-------------+----------------------+--------------------------------+--------------------------------+--------------------------------+
| +I | 101 | 1000 | scooter | Small 2-wheel scooter | 3.140000104904175 |
| +I | 102 | 2000 | car battery | 12V car battery | 8.100000381469727 |
| +I | 103 | 3000 | 12-pack drill bits | 12-pack of drill bits with ... | 0.800000011920929 |
| +I | 104 | 4000 | hammer | 12oz carpenter's hammer | 0.75 |
| +I | 105 | 5000 | hammer | 14oz carpenter's hammer | 0.875 |
| +I | 106 | 6000 | hammer | 16oz carpenter's hammer | 1.0 |
| +I | 107 | 7000 | rocks | box of assorted rocks | 5.300000190734863 |
| +I | 108 | 8000 | jacket | water resistent black wind ... | 0.10000000149011612 |
| +I | 109 | 9000 | spare tire | 24 inch spare tire | 22.200000762939453 |
| -U | 106 | 6000 | hammer | 16oz carpenter's hammer | 1.0 |
| +U | 106 | 10000 | hammer | 18oz carpenter hammer | 1.0 |
| -U | 107 | 7000 | rocks | box of assorted rocks | 5.300000190734863 |
| +U | 107 | 11000 | rocks | box of assorted rocks | 5.099999904632568 |
| +I | 110 | 12000 | jacket | water resistent white wind ... | 0.20000000298023224 |
| +I | 111 | 13000 | scooter | Big 2-wheel scooter | 5.179999828338623 |
| -U | 110 | 12000 | jacket | water resistent white wind ... | 0.20000000298023224 |
| +U | 110 | 14000 | jacket | new water resistent white w... | 0.5 |
| -U | 111 | 13000 | scooter | Big 2-wheel scooter | 5.179999828338623 |
| +U | 111 | 15000 | scooter | Big 2-wheel scooter | 5.170000076293945 |
| -D | 111 | 16000 | scooter | Big 2-wheel scooter | 5.170000076293945 |
+----+-------------+----------------------+--------------------------------+--------------------------------+--------------------------------+
Received a total of 20 rows
创立 Hudi 表,这里设置表的状态为 MERGE_ON_READ
并且关上 changelog 模式属性 changelog.enabled
Flink SQL> CREATE TABLE hoodie_table(
> id INT NOT NULL PRIMARY KEY NOT ENFORCED,
> ts BIGINT,
> name STRING,
> description STRING,
> weight DOUBLE
> ) WITH (
> 'connector' = 'hudi',
> 'path' = '/Users/chenyuzhao/workspace/hudi-demo/t1',
> 'table.type' = 'MERGE_ON_READ',
> 'changelog.enabled' = 'true',
> 'compaction.async.enabled' = 'false'
> );
[INFO] Execute statement succeed.
查问
通过 INSERT 语句将数据导入 Hudi,开启流读模式,并执行查问察看后果
Flink SQL> select * from hoodie_table/*+ OPTIONS('read.streaming.enabled'='true')*/;
+----+-------------+----------------------+--------------------------------+--------------------------------+--------------------------------+
| op | id | ts | name | description | weight |
+----+-------------+----------------------+--------------------------------+--------------------------------+--------------------------------+
| +I | 101 | 1000 | scooter | Small 2-wheel scooter | 3.140000104904175 |
| +I | 102 | 2000 | car battery | 12V car battery | 8.100000381469727 |
| +I | 103 | 3000 | 12-pack drill bits | 12-pack of drill bits with ... | 0.800000011920929 |
| +I | 104 | 4000 | hammer | 12oz carpenter's hammer | 0.75 |
| +I | 105 | 5000 | hammer | 14oz carpenter's hammer | 0.875 |
| +I | 106 | 6000 | hammer | 16oz carpenter's hammer | 1.0 |
| +I | 107 | 7000 | rocks | box of assorted rocks | 5.300000190734863 |
| +I | 108 | 8000 | jacket | water resistent black wind ... | 0.10000000149011612 |
| +I | 109 | 9000 | spare tire | 24 inch spare tire | 22.200000762939453 |
| -U | 106 | 6000 | hammer | 16oz carpenter's hammer | 1.0 |
| +U | 106 | 10000 | hammer | 18oz carpenter hammer | 1.0 |
| -U | 107 | 7000 | rocks | box of assorted rocks | 5.300000190734863 |
| +U | 107 | 11000 | rocks | box of assorted rocks | 5.099999904632568 |
| +I | 110 | 12000 | jacket | water resistent white wind ... | 0.20000000298023224 |
| +I | 111 | 13000 | scooter | Big 2-wheel scooter | 5.179999828338623 |
| -U | 110 | 12000 | jacket | water resistent white wind ... | 0.20000000298023224 |
| +U | 110 | 14000 | jacket | new water resistent white w... | 0.5 |
| -U | 111 | 13000 | scooter | Big 2-wheel scooter | 5.179999828338623 |
| +U | 111 | 15000 | scooter | Big 2-wheel scooter | 5.170000076293945 |
| -D | 111 | 16000 | scooter | Big 2-wheel scooter | 5.170000076293945 |
能够看到 Hudi 保留了每行的变更记录,包含 change log 的 operation 类型,这里咱们关上 TABLE HINTS 性能,不便动静设置表参数。
持续应用 batch 读模式,执行查问察看输入后果,能够看到两头的变更被合并。
Flink SQL> select * from hoodie_table;
2021-08-20 20:51:25,052 INFO org.apache.hadoop.conf.Configuration.deprecation [] - mapred.job.map.memory.mb is deprecated. Instead, use mapreduce.map.memory.mb
+----+-------------+----------------------+--------------------------------+--------------------------------+--------------------------------+
| op | id | ts | name | description | weight |
+----+-------------+----------------------+--------------------------------+--------------------------------+--------------------------------+
| +U | 110 | 14000 | jacket | new water resistent white w... | 0.5 |
| +I | 101 | 1000 | scooter | Small 2-wheel scooter | 3.140000104904175 |
| +I | 102 | 2000 | car battery | 12V car battery | 8.100000381469727 |
| +I | 103 | 3000 | 12-pack drill bits | 12-pack of drill bits with ... | 0.800000011920929 |
| +I | 104 | 4000 | hammer | 12oz carpenter's hammer | 0.75 |
| +I | 105 | 5000 | hammer | 14oz carpenter's hammer | 0.875 |
| +U | 106 | 10000 | hammer | 18oz carpenter hammer | 1.0 |
| +U | 107 | 11000 | rocks | box of assorted rocks | 5.099999904632568 |
| +I | 108 | 8000 | jacket | water resistent black wind ... | 0.10000000149011612 |
| +I | 109 | 9000 | spare tire | 24 inch spare tire | 22.200000762939453 |
+----+-------------+----------------------+--------------------------------+--------------------------------+--------------------------------+
Received a total of 10 rows
聚合
Batch 读模式下计算 count(*)
Flink SQL> select count (*) from hoodie_table;
+----+----------------------+
| op | EXPR$0 |
+----+----------------------+
| +I | 1 |
| -U | 1 |
| +U | 2 |
| -U | 2 |
| +U | 3 |
| -U | 3 |
| +U | 4 |
| -U | 4 |
| +U | 5 |
| -U | 5 |
| +U | 6 |
| -U | 6 |
| +U | 7 |
| -U | 7 |
| +U | 8 |
| -U | 8 |
| +U | 9 |
| -U | 9 |
| +U | 10 |
+----+----------------------+
Received a total of 19 rows
Streaming 读模式下计算 count(*)
Flink SQL> select count (*) from hoodie_table/*+OPTIONS('read.streaming.enabled'='true')*/;
+----+----------------------+
| op | EXPR$0 |
+----+----------------------+
| +I | 1 |
| -U | 1 |
| +U | 2 |
| -U | 2 |
| +U | 3 |
| -U | 3 |
| +U | 4 |
| -U | 4 |
| +U | 5 |
| -U | 5 |
| +U | 6 |
| -U | 6 |
| +U | 7 |
| -U | 7 |
| +U | 8 |
| -U | 8 |
| +U | 9 |
| -U | 9 |
| +U | 8 |
| -U | 8 |
| +U | 9 |
| -U | 9 |
| +U | 8 |
| -U | 8 |
| +U | 9 |
| -U | 9 |
| +U | 10 |
| -U | 10 |
| +U | 11 |
| -U | 11 |
| +U | 10 |
| -U | 10 |
| +U | 11 |
| -U | 11 |
| +U | 10 |
| -U | 10 |
| +U | 11 |
| -U | 11 |
| +U | 10 |
能够看到 batch 和 streaming 模式下的计算结果是统一的。
Reference
[1] https://www.oreilly.com/conte…
[2] https://hudi.apache.org/blog/…