Apache Hudi应用简介
[TOC]
数据实时处理和实时的数据
实时分为解决的实时和数据的实时
即席剖析是要求对数据实时的解决,马上要失去对应的后果
Flink、Spark Streaming是用来对实时数据的实时处理,数据要求实时,解决也要迅速
数据不实时,解决也不及时的场景则是咱们的数仓T+1数据
而本文探讨的Apache Hudi,对应的场景是数据的实时,而非解决的实时。它旨在将Mysql中的时候以近实时的形式映射到大数据平台,比方Hive中。
业务场景和技术选型
传统的离线数仓,通常数据是T+1的,不能满足对当日数据分析的需要
而流式计算个别是基于窗口,并且窗口逻辑绝对比拟固定。
而笔者所在的公司有一类非凡的需要,业务剖析比拟相熟现有事务数据库的数据结构,并且心愿有很多即席剖析,这些剖析蕴含当日比拟实时的数据。惯常他们是基于Mysql从库,间接通过Sql做相应的剖析计算。但很多时候会遇到如下阻碍
- 数据量较大、剖析逻辑较为简单时,Mysql从库耗时较长
- 一些跨库的剖析无奈实现
因而,一些弥合在OLTP和OLAP之间的技术框架呈现,典型有TiDB。它能同时反对OLTP和OLAP。而诸如Apache Hudi和Apache Kudu则相当于现有OLTP和OLAP技术的桥梁。他们可能以现有OLTP中的数据结构存储数据,反对CRUD,同时提供跟现有OLAP框架的整合(如Hive,Impala),以实现OLAP剖析
Apache Kudu,须要独自部署集群。而Apache Hudi则不须要,它能够利用现有的大数据集群比方HDFS做数据文件存储,而后通过Hive做数据分析,相对来说更适宜资源受限的环境
Apache hudi简介
应用Aapche Hudi整体思路
Hudi 提供了Hudi 表的概念,这些表反对CRUD操作。咱们能够基于这个特点,将Mysql Binlog的数据重放至Hudi表,而后基于Hive对Hudi表进行查问剖析。数据流向架构如下
Hudi表数据结构
Hudi表的数据文件,能够应用操作系统的文件系统存储,也能够应用HDFS这种分布式的文件系统存储。为了后续剖析性能和数据的可靠性,个别应用HDFS进行存储。以HDFS存储来看,一个Hudi表的存储文件分为两类。
- 蕴含
_partition_key
相干的门路是理论的数据文件,按分区存储,当然分区的门路key是能够指定的,我这里应用的是_partition_key - .hoodie 因为CRUD的零散性,每一次的操作都会生成一个文件,这些小文件越来越多后,会重大影响HDFS的性能,Hudi设计了一套文件合并机制。 .hoodie文件夹中寄存了对应的文件合并操作相干的日志文件。
数据文件
Hudi实在的数据文件应用Parquet文件格式存储
.hoodie文件
Hudi把随着工夫流逝,对表的一系列CRUD操作叫做Timeline。Timeline中某一次的操作,叫做Instant。Instant蕴含以下信息
- Instant Action 记录本次操作是一次数据提交(COMMITS),还是文件合并(COMPACTION),或者是文件清理(CLEANS)
- Instant Time 本次操作产生的工夫
- state 操作的状态,发动(REQUESTED),进行中(INFLIGHT),还是已实现(COMPLETED)
.hoodie文件夹中寄存对应操作的状态记录
Hudi记录Id
hudi为了实现数据的CRUD,须要可能惟一标识一条记录。hudi将把数据集中的惟一字段(record key ) + 数据所在分区 (partitionPath) 联结起来当做数据的惟一键
COW和MOR
基于上述根底概念之上,Hudi提供了两类表格局COW和MOR。他们会在数据的写入和查问性能上有一些不同
Copy On Write Table
简称COW。顾名思义,他是在数据写入的时候,复制一份原来的拷贝,在其根底上增加新数据。正在读数据的申请,读取的是是近的残缺正本,这相似Mysql 的MVCC的思维。
上图中,每一个色彩都蕴含了截至到其所在工夫的所有数据。老的数据正本在超过肯定的个数限度后,将被删除。这种类型的表,没有compact instant,因为写入时相当于曾经compact了。
- 长处 读取时,只读取对应分区的一个数据文件即可,较为高效
- 毛病 数据写入的时候,须要复制一个先前的正本再在其根底上生成新的数据文件,这个过程比拟耗时。且因为耗时,读申请读取到的数据绝对就会滞后
Merge On Read Table
简称MOR。新插入的数据存储在delta log 中。定期再将delta log合并进行parquet数据文件。读取数据时,会将delta log跟老的数据文件做merge,失去残缺的数据返回。当然,MOR表也能够像COW表一样,疏忽delta log,只读取最近的残缺数据文件。下图演示了MOR的两种数据读写形式
- 长处 因为写入数据先写delta log,且delta log较小,所以写入老本较低
- 毛病 须要定期合并整顿compact,否则碎片文件较多。读取性能较差,因为须要将delta log 和 老数据文件合并
基于hudi的代码实现
我在github上搁置了基于Hudi的封装实现,对应的源码地址为 https://github.com/wanqiufeng…。
binlog数据写入Hudi表
- binlog-consumer分支应用Spark streaming生产kafka中的Binlog数据,并写入Hudi表。Kafka中的binlog是通过阿里的Canal工具同步拉取的。程序入口是CanalKafkaImport2Hudi,它提供了一系列参数,配置程序的执行行为
参数名 | 含意 | 是否必填 | 默认值 |
---|---|---|---|
--base-save-path |
hudi表寄存在HDFS的根底门路,比方hdfs://192.168.16.181:8020/hudi_data/ | 是 | 无 |
--mapping-mysql-db-name |
指定解决的Mysql库名 | 是 | 无 |
--mapping-mysql-table-name |
指定解决的Mysql表名 | 是 | 无 |
--store-table-name |
指定Hudi的表名 | 否 | 默认会依据–mapping-mysql-db-name和–mapping-mysql-table-name主动生成。假如–mapping-mysql-db-name 为crm,–mapping-mysql-table-name为order。那么最终的hudi表名为crm__order |
--real-save-path |
指定hudi表最终存储的hdfs门路 | 否 | 默认依据–base-save-path和–store-table-name主动生成,生成格局为’–base-save-path’+’/’+’–store-table-name’ ,举荐默认 |
--primary-key |
指定同步的mysql表中能惟一标识记录的字段名 | 否 | 默认id |
--partition-key |
指定mysql表中能够用于分区的工夫字段,字段必须是timestamp 或dateime类型 | 是 | 无 |
--precombine-key |
最终用于配置hudi的hoodie.datasource.write.precombine.field |
否 | 默认id |
--kafka-server |
指定Kafka 集群地址 | 是 | 无 |
--kafka-topic |
指定生产kafka的队列 | 是 | 无 |
--kafka-group |
指定生产kafka的group | 否 | 默认在存储表名前加’hudi’前缀,比方’hudi_crm__order’ |
--duration-seconds |
因为本程序应用Spark streaming开发,这里指定Spark streaming微批的时长 | 否 | 默认10秒 |
一个应用的demo如下
/data/opt/spark-2.4.4-bin-hadoop2.6/bin/spark-submit --class com.niceshot.hudi.CanalKafkaImport2Hudi \
--name hudi__goods \
--master yarn \
--deploy-mode cluster \
--driver-memory 512m \
--executor-memory 512m \
--executor-cores 1 \
--num-executors 1 \
--queue hudi \
--conf spark.executor.memoryOverhead=2048 \
--conf "spark.executor.extraJavaOptions=-XX:+HeapDumpOnOutOfMemoryError -XX:HeapDumpPath=\tmp\hudi-debug" \
--conf spark.core.connection.ack.wait.timeout=300 \
--conf spark.locality.wait=100 \
--conf spark.streaming.backpressure.enabled=true \
--conf spark.streaming.receiver.maxRate=500 \
--conf spark.streaming.kafka.maxRatePerPartition=200 \
--conf spark.ui.retainedJobs=10 \
--conf spark.ui.retainedStages=10 \
--conf spark.ui.retainedTasks=10 \
--conf spark.worker.ui.retainedExecutors=10 \
--conf spark.worker.ui.retainedDrivers=10 \
--conf spark.sql.ui.retainedExecutions=10 \
--conf spark.yarn.submit.waitAppCompletion=false \
--conf spark.yarn.maxAppAttempts=4 \
--conf spark.yarn.am.attemptFailuresValidityInterval=1h \
--conf spark.yarn.max.executor.failures=20 \
--conf spark.yarn.executor.failuresValidityInterval=1h \
--conf spark.task.maxFailures=8 \
/data/opt/spark-applications/hudi_canal_consumer/hudi-canal-import-1.0-SNAPSHOT-jar-with-dependencies.jar --kafka-server local:9092 --kafka-topic dt_streaming_canal_xxx --base-save-path hdfs://192.168.2.1:8020/hudi_table/ --mapping-mysql-db-name crm --mapping-mysql-table-name order --primary-key id --partition-key createDate --duration-seconds 1200
历史数据同步以及表元数据同步至hive
history_import_and_meta_sync
分支提供了将历史数据同步至hudi表,以及将hudi表数据结构同步至hive meta的操作
同步历史数据至hudi表
这里采纳的思路是
- 将mysql全量数据通过注入sqoop等工具,导入到hive表。
- 而后采纳分支代码中的工具HiveImport2HudiConfig,将数据导入Hudi表
HiveImport2HudiConfig提供了如下一些参数,用于配置程序执行行为
参数名 | 含意 | 是否必填 | 默认值 |
---|---|---|---|
--base-save-path |
hudi表寄存在HDFS的根底门路,比方hdfs://192.168.16.181:8020/hudi_data/ | 是 | 无 |
--mapping-mysql-db-name |
指定解决的Mysql库名 | 是 | 无 |
--mapping-mysql-table-name |
指定解决的Mysql表名 | 是 | 无 |
--store-table-name |
指定Hudi的表名 | 否 | 默认会依据–mapping-mysql-db-name和–mapping-mysql-table-name主动生成。假如–mapping-mysql-db-name 为crm,–mapping-mysql-table-name为order。那么最终的hudi表名为crm__order |
--real-save-path |
指定hudi表最终存储的hdfs门路 | 否 | 默认依据–base-save-path和–store-table-name主动生成,生成格局为’–base-save-path’+’/’+’–store-table-name’ ,举荐默认 |
--primary-key |
指定同步的hive历史表中能惟一标识记录的字段名 | 否 | 默认id |
--partition-key |
指定hive历史表中能够用于分区的工夫字段,字段必须是timestamp 或dateime类型 | 是 | 无 |
--precombine-key |
最终用于配置hudi的hoodie.datasource.write.precombine.field |
否 | 默认id |
--sync-hive-db-name |
全量历史数据所在hive的库名 | 是 | 无 |
--sync-hive-table-name |
全量历史数据所在hive的表名 | 是 | 无 |
--hive-base-path |
hive的所有数据文件寄存地址,须要参看具体的hive配置 | 否 | /user/hive/warehouse |
--hive-site-path |
hive-site.xml配置文件所在的地址 | 是 | 无 |
--tmp-data-path |
程序执行过程中临时文件寄存门路。个别默认门路是/tmp。有可能呈现/tmp所在磁盘太小,而导致历史程序执行失败的状况。当呈现该状况时,能够通过该参数自定义执行门路 | 否 | 默认操作系统长期目录 |
一个程序执行demo
nohup java -jar hudi-learn-1.0-SNAPSHOT.jar --sync-hive-db-name hudi_temp --sync-hive-table-name crm__wx_user_info --base-save-path hdfs://192.168.2.2:8020/hudi_table/ --mapping-mysql-db-name crm --mapping-mysql-table-name "order" --primary-key "id" --partition-key created_date --hive-site-path /etc/lib/hive/conf/hive-site.xml --tmp-data-path /data/tmp > order.log &
同步hudi表构造至hive meta
须要将hudi的数据结构和分区,以hive表面的模式同步至Hive meta,能力是Hive感知到hudi数据,并通过sql进行查问剖析。Hudi自身在生产Binlog进行存储时,能够顺带将相干表元数据信息同步至hive。但思考到每条写入Apache Hudi表的数据,都要读写Hive Meta ,对Hive的性能可能影响很大。所以我独自开发了HiveMetaSyncConfig工具,用于同步hudi表元数据至Hive。思考到目前程序只反对按天分区,所以同步工具能够一天执行一次即可。参数配置如下
参数名 | 含意 | 是否必填 | 默认值 | |
---|---|---|---|---|
--hive-db-name |
指定hudi表同步至哪个hive数据库 | 是 | 无 | |
--hive-table-name |
指定hudi表同步至哪个hive表 | 是 | 无 | 、 |
--hive-jdbc-url |
指定hive meta的jdbc链接地址,例如jdbc:hive2://192.168.16.181:10000 | 是 | 无 | |
--hive-user-name |
指定hive meta的链接用户名 | 否 | 默认hive | |
--hive-pwd |
指定hive meta的链接明码 | 否 | 默认hive | |
--hudi-table-path |
指定hudi表所在hdfs的文件门路 | 是 | 无 | |
--hive-site-path |
指定hive的hive-site.xml门路 | 是 | 无 |
一个程序执行demo
java -jar hudi-learn-1.0-SNAPSHOT.jar --hive-db-name streaming --hive-table-name crm__order --hive-user-name hive --hive-pwd hive --hive-jdbc-url jdbc:hive2://192.168.16.181:10000 --hudi-table-path hdfs://192.168.16.181:8020/hudi_table/crm__order --hive-site-path /lib/hive/conf/hive-site.xml
一些踩坑
hive相干配置
有些hive集群的hive.input.format配置,默认是org.apache.hadoop.hive.ql.io.CombineHiveInputFormat,这会导致挂载Hudi数据的Hive表面读取到所有Hudi的Parquet数据,从而导致最终的读取后果反复。须要将hive的format改为org.apache.hadoop.hive.ql.io.HiveInputFormat
,为了防止在整个集群层面上更改对其余离线Hive Sql造成不必要的影响,倡议只对以后hive session设置set hive.input.format=org.apache.hadoop.hive.ql.io.HiveInputFormat;
spark streaming的一些调优
因为binlog写入Hudi表的是基于Spark streaming实现的,这里给出了一些spark 和spark streaming层面的配置,它能使整个程序工作更稳固
| 配置| 含意|
| :——– | ——–:|
| spark.streaming.backpressure.enabled=true| 启动背压,该配置能使Spark Streaming生产速率,基于上一次的生产状况,进行调整,防止程序解体|
| spark.ui.retainedJobs=10
spark.ui.retainedStages=10
spark.ui.retainedTasks=10
spark.worker.ui.retainedExecutors=10
spark.worker.ui.retainedDrivers=10
spark.sql.ui.retainedExecutions=10 | 默认状况下,spark 会在driver中存储一些spark 程序执行过程中各stage和task的历史信息,当driver内存过小时,可能使driver解体,通过上述参数,调节这些历史数据存储的条数,从而减小对内层应用|
|spark.yarn.maxAppAttempts=4|配置当driver解体后,尝试重启的次数|
|spark.yarn.am.attemptFailuresValidityInterval=1h|假若driver执行一周才解体一次,那咱们更心愿每次都能重启,而上述配置在累计到重启4次后,driver就再也不会被重启,该配置则用于重置maxAppAttempts的工夫距离|
|spark.yarn.max.executor.failures=20|executor执行也可能失败,失败后集群会主动调配新的executor, 该配置用于配置容许executor失败的次数,超过次数后程序会报(reason: Max number of executor failures (400) reached),并退出|
|spark.yarn.executor.failuresValidityInterval=1h|指定executor失败重调配次数重置的工夫距离|
|spark.task.maxFailures=8|容许工作执行失败的次数|
将来改良
- 反对无分区,或非日期分区表。目前只反对日期分区表
- 多数据类型反对,目前为了程序的稳定性,会将Mysql中的字段全副以String类型存储至Hudi
参考资料
https://hudi.apache.org/
欢送关注我的集体公众号”东南偏北UP”,记录代码人生,行业思考,科技评论
发表回复