乐趣区

关于大数据:Apache-Hudi使用简介

Apache Hudi 应用简介

[TOC]

数据实时处理和实时的数据

实时分为解决的实时和数据的实时
即席剖析是要求对数据实时的解决,马上要失去对应的后果
Flink、Spark Streaming 是用来对实时数据的实时处理,数据要求实时,解决也要迅速
数据不实时,解决也不及时的场景则是咱们的数仓 T + 1 数据

而本文探讨的 Apache Hudi,对应的场景是数据的实时,而非解决的实时。它旨在将 Mysql 中的时候以近实时的形式映射到大数据平台,比方 Hive 中。

业务场景和技术选型

传统的离线数仓,通常数据是 T + 1 的,不能满足对当日数据分析的需要
而流式计算个别是基于窗口,并且窗口逻辑绝对比拟固定。
而笔者所在的公司有一类非凡的需要,业务剖析比拟相熟现有事务数据库的数据结构,并且心愿有很多即席剖析,这些剖析蕴含当日比拟实时的数据。惯常他们是基于 Mysql 从库,间接通过 Sql 做相应的剖析计算。但很多时候会遇到如下阻碍

  • 数据量较大、剖析逻辑较为简单时,Mysql 从库耗时较长
  • 一些跨库的剖析无奈实现

因而,一些弥合在 OLTP 和 OLAP 之间的技术框架呈现,典型有 TiDB。它能同时反对 OLTP 和 OLAP。而诸如 Apache Hudi 和 Apache Kudu 则相当于现有 OLTP 和 OLAP 技术的桥梁。他们可能以现有 OLTP 中的数据结构存储数据,反对 CRUD,同时提供跟现有 OLAP 框架的整合(如 Hive,Impala),以实现 OLAP 剖析

Apache Kudu,须要独自部署集群。而 Apache Hudi 则不须要,它能够利用现有的大数据集群比方 HDFS 做数据文件存储,而后通过 Hive 做数据分析,相对来说更适宜资源受限的环境

Apache hudi 简介

应用 Aapche Hudi 整体思路

Hudi 提供了 Hudi 表的概念,这些表反对 CRUD 操作。咱们能够基于这个特点,将 Mysql Binlog 的数据重放至 Hudi 表,而后基于 Hive 对 Hudi 表进行查问剖析。数据流向架构如下

Hudi 表数据结构

Hudi 表的数据文件,能够应用操作系统的文件系统存储,也能够应用 HDFS 这种分布式的文件系统存储。为了后续剖析性能和数据的可靠性,个别应用 HDFS 进行存储。以 HDFS 存储来看,一个 Hudi 表的存储文件分为两类。

  • 蕴含 _partition_key 相干的门路是理论的数据文件,按分区存储,当然分区的门路 key 是能够指定的,我这里应用的是_partition_key
  • .hoodie 因为 CRUD 的零散性,每一次的操作都会生成一个文件,这些小文件越来越多后,会重大影响 HDFS 的性能,Hudi 设计了一套文件合并机制。.hoodie 文件夹中寄存了对应的文件合并操作相干的日志文件。
数据文件

Hudi 实在的数据文件应用 Parquet 文件格式存储

.hoodie 文件

Hudi 把随着工夫流逝,对表的一系列 CRUD 操作叫做 Timeline。Timeline 中某一次的操作,叫做 Instant。Instant 蕴含以下信息

  • Instant Action 记录本次操作是一次数据提交(COMMITS),还是文件合并(COMPACTION),或者是文件清理(CLEANS)
  • Instant Time 本次操作产生的工夫
  • state 操作的状态,发动(REQUESTED),进行中(INFLIGHT),还是已实现(COMPLETED)

.hoodie 文件夹中寄存对应操作的状态记录

Hudi 记录 Id

hudi 为了实现数据的 CRUD,须要可能惟一标识一条记录。hudi 将把数据集中的惟一字段(record key) + 数据所在分区 (partitionPath) 联结起来当做数据的惟一键

COW 和 MOR

基于上述根底概念之上,Hudi 提供了两类表格局 COW 和 MOR。他们会在数据的写入和查问性能上有一些不同

Copy On Write Table

简称 COW。顾名思义,他是在数据写入的时候,复制一份原来的拷贝,在其根底上增加新数据。正在读数据的申请,读取的是是近的残缺正本,这相似 Mysql 的 MVCC 的思维。

上图中,每一个色彩都蕴含了截至到其所在工夫的所有数据。老的数据正本在超过肯定的个数限度后,将被删除。这种类型的表,没有 compact instant,因为写入时相当于曾经 compact 了。

  • 长处 读取时,只读取对应分区的一个数据文件即可,较为高效
  • 毛病 数据写入的时候,须要复制一个先前的正本再在其根底上生成新的数据文件,这个过程比拟耗时。且因为耗时,读申请读取到的数据绝对就会滞后
Merge On Read Table

简称 MOR。新插入的数据存储在 delta log 中。定期再将 delta log 合并进行 parquet 数据文件。读取数据时,会将 delta log 跟老的数据文件做 merge,失去残缺的数据返回。当然,MOR 表也能够像 COW 表一样,疏忽 delta log,只读取最近的残缺数据文件。下图演示了 MOR 的两种数据读写形式

  • 长处 因为写入数据先写 delta log,且 delta log 较小,所以写入老本较低
  • 毛病 须要定期合并整顿 compact,否则碎片文件较多。读取性能较差,因为须要将 delta log 和 老数据文件合并

基于 hudi 的代码实现

我在 github 上搁置了基于 Hudi 的封装实现,对应的源码地址为 https://github.com/wanqiufeng…。

binlog 数据写入 Hudi 表

  • binlog-consumer 分支应用 Spark streaming 生产 kafka 中的 Binlog 数据,并写入 Hudi 表。Kafka 中的 binlog 是通过阿里的 Canal 工具同步拉取的。程序入口是 CanalKafkaImport2Hudi,它提供了一系列参数,配置程序的执行行为
参数名 含意 是否必填 默认值
--base-save-path hudi 表寄存在 HDFS 的根底门路,比方 hdfs://192.168.16.181:8020/hudi_data/
--mapping-mysql-db-name 指定解决的 Mysql 库名
--mapping-mysql-table-name 指定解决的 Mysql 表名
--store-table-name 指定 Hudi 的表名 默认会依据 –mapping-mysql-db-name 和 –mapping-mysql-table-name 主动生成。假如 –mapping-mysql-db-name 为 crm,–mapping-mysql-table-name 为 order。那么最终的 hudi 表名为 crm__order
--real-save-path 指定 hudi 表最终存储的 hdfs 门路 默认依据 –base-save-path 和 –store-table-name 主动生成,生成格局为 ’–base-save-path’+’/’+’–store-table-name’,举荐默认
--primary-key 指定同步的 mysql 表中能惟一标识记录的字段名 默认 id
--partition-key 指定 mysql 表中能够用于分区的工夫字段,字段必须是 timestamp 或 dateime 类型
--precombine-key 最终用于配置 hudi 的hoodie.datasource.write.precombine.field 默认 id
--kafka-server 指定 Kafka 集群地址
--kafka-topic 指定生产 kafka 的队列
--kafka-group 指定生产 kafka 的 group 默认在存储表名前加 ’hudi’ 前缀,比方 ’hudi_crm__order’
--duration-seconds 因为本程序应用 Spark streaming 开发,这里指定 Spark streaming 微批的时长 默认 10 秒

一个应用的 demo 如下

/data/opt/spark-2.4.4-bin-hadoop2.6/bin/spark-submit --class com.niceshot.hudi.CanalKafkaImport2Hudi \
    --name hudi__goods \
    --master yarn \
    --deploy-mode cluster \
    --driver-memory 512m \
    --executor-memory 512m \
    --executor-cores 1 \
    --num-executors 1 \
    --queue hudi \
    --conf spark.executor.memoryOverhead=2048 \
    --conf "spark.executor.extraJavaOptions=-XX:+HeapDumpOnOutOfMemoryError -XX:HeapDumpPath=\tmp\hudi-debug" \
    --conf spark.core.connection.ack.wait.timeout=300 \
    --conf spark.locality.wait=100 \
    --conf spark.streaming.backpressure.enabled=true \
    --conf spark.streaming.receiver.maxRate=500 \
    --conf spark.streaming.kafka.maxRatePerPartition=200 \
    --conf spark.ui.retainedJobs=10 \
    --conf spark.ui.retainedStages=10 \
    --conf spark.ui.retainedTasks=10 \
    --conf spark.worker.ui.retainedExecutors=10 \
    --conf spark.worker.ui.retainedDrivers=10 \
    --conf spark.sql.ui.retainedExecutions=10 \
    --conf spark.yarn.submit.waitAppCompletion=false \
    --conf spark.yarn.maxAppAttempts=4 \
    --conf spark.yarn.am.attemptFailuresValidityInterval=1h \
    --conf spark.yarn.max.executor.failures=20 \
    --conf spark.yarn.executor.failuresValidityInterval=1h \
    --conf spark.task.maxFailures=8 \
    /data/opt/spark-applications/hudi_canal_consumer/hudi-canal-import-1.0-SNAPSHOT-jar-with-dependencies.jar  --kafka-server local:9092 --kafka-topic dt_streaming_canal_xxx --base-save-path hdfs://192.168.2.1:8020/hudi_table/ --mapping-mysql-db-name crm --mapping-mysql-table-name order --primary-key id --partition-key createDate --duration-seconds 1200

历史数据同步以及表元数据同步至 hive

history_import_and_meta_sync 分支提供了将历史数据同步至 hudi 表,以及将 hudi 表数据结构同步至 hive meta 的操作

同步历史数据至 hudi 表

这里采纳的思路是

  • 将 mysql 全量数据通过注入 sqoop 等工具,导入到 hive 表。
  • 而后采纳分支代码中的工具 HiveImport2HudiConfig,将数据导入 Hudi 表

HiveImport2HudiConfig 提供了如下一些参数,用于配置程序执行行为

参数名 含意 是否必填 默认值
--base-save-path hudi 表寄存在 HDFS 的根底门路,比方 hdfs://192.168.16.181:8020/hudi_data/
--mapping-mysql-db-name 指定解决的 Mysql 库名
--mapping-mysql-table-name 指定解决的 Mysql 表名
--store-table-name 指定 Hudi 的表名 默认会依据 –mapping-mysql-db-name 和 –mapping-mysql-table-name 主动生成。假如 –mapping-mysql-db-name 为 crm,–mapping-mysql-table-name 为 order。那么最终的 hudi 表名为 crm__order
--real-save-path 指定 hudi 表最终存储的 hdfs 门路 默认依据 –base-save-path 和 –store-table-name 主动生成,生成格局为 ’–base-save-path’+’/’+’–store-table-name’,举荐默认
--primary-key 指定同步的 hive 历史表中能惟一标识记录的字段名 默认 id
--partition-key 指定 hive 历史表中能够用于分区的工夫字段,字段必须是 timestamp 或 dateime 类型
--precombine-key 最终用于配置 hudi 的hoodie.datasource.write.precombine.field 默认 id
--sync-hive-db-name 全量历史数据所在 hive 的库名
--sync-hive-table-name 全量历史数据所在 hive 的表名
--hive-base-path hive 的所有数据文件寄存地址,须要参看具体的 hive 配置 /user/hive/warehouse
--hive-site-path hive-site.xml 配置文件所在的地址
--tmp-data-path 程序执行过程中临时文件寄存门路。个别默认门路是 /tmp。有可能呈现 /tmp 所在磁盘太小,而导致历史程序执行失败的状况。当呈现该状况时,能够通过该参数自定义执行门路 默认操作系统长期目录

一个程序执行 demo

nohup java -jar hudi-learn-1.0-SNAPSHOT.jar --sync-hive-db-name hudi_temp --sync-hive-table-name crm__wx_user_info --base-save-path hdfs://192.168.2.2:8020/hudi_table/ --mapping-mysql-db-name crm --mapping-mysql-table-name "order" --primary-key "id" --partition-key created_date --hive-site-path /etc/lib/hive/conf/hive-site.xml --tmp-data-path /data/tmp > order.log &
同步 hudi 表构造至 hive meta

须要将 hudi 的数据结构和分区,以 hive 表面的模式同步至 Hive meta,能力是 Hive 感知到 hudi 数据,并通过 sql 进行查问剖析。Hudi 自身在生产 Binlog 进行存储时,能够顺带将相干表元数据信息同步至 hive。但思考到每条写入 Apache Hudi 表的数据,都要读写 Hive Meta,对 Hive 的性能可能影响很大。所以我独自开发了 HiveMetaSyncConfig 工具,用于同步 hudi 表元数据至 Hive。思考到目前程序只反对按天分区,所以同步工具能够一天执行一次即可。参数配置如下

参数名 含意 是否必填 默认值
--hive-db-name 指定 hudi 表同步至哪个 hive 数据库
--hive-table-name 指定 hudi 表同步至哪个 hive 表
--hive-jdbc-url 指定 hive meta 的 jdbc 链接地址,例如 jdbc:hive2://192.168.16.181:10000
--hive-user-name 指定 hive meta 的链接用户名 默认 hive
--hive-pwd 指定 hive meta 的链接明码 默认 hive
--hudi-table-path 指定 hudi 表所在 hdfs 的文件门路
--hive-site-path 指定 hive 的 hive-site.xml 门路

一个程序执行 demo

java -jar hudi-learn-1.0-SNAPSHOT.jar --hive-db-name streaming --hive-table-name crm__order --hive-user-name hive --hive-pwd hive --hive-jdbc-url jdbc:hive2://192.168.16.181:10000 --hudi-table-path hdfs://192.168.16.181:8020/hudi_table/crm__order --hive-site-path /lib/hive/conf/hive-site.xml

一些踩坑

hive 相干配置

有些 hive 集群的 hive.input.format 配置,默认是 org.apache.hadoop.hive.ql.io.CombineHiveInputFormat,这会导致挂载 Hudi 数据的 Hive 表面读取到所有 Hudi 的 Parquet 数据,从而导致最终的读取后果反复。须要将 hive 的 format 改为org.apache.hadoop.hive.ql.io.HiveInputFormat,为了防止在整个集群层面上更改对其余离线 Hive Sql 造成不必要的影响,倡议只对以后 hive session 设置set hive.input.format=org.apache.hadoop.hive.ql.io.HiveInputFormat;

spark streaming 的一些调优

因为 binlog 写入 Hudi 表的是基于 Spark streaming 实现的,这里给出了一些 spark 和 spark streaming 层面的配置,它能使整个程序工作更稳固

| 配置 | 含意 |
| :——– | ——–:|
| spark.streaming.backpressure.enabled=true| 启动背压,该配置能使 Spark Streaming 生产速率,基于上一次的生产状况,进行调整,防止程序解体 |
| spark.ui.retainedJobs=10
spark.ui.retainedStages=10
spark.ui.retainedTasks=10
spark.worker.ui.retainedExecutors=10
spark.worker.ui.retainedDrivers=10
spark.sql.ui.retainedExecutions=10 | 默认状况下,spark 会在 driver 中存储一些 spark 程序执行过程中各 stage 和 task 的历史信息,当 driver 内存过小时,可能使 driver 解体,通过上述参数,调节这些历史数据存储的条数,从而减小对内层应用 |
|spark.yarn.maxAppAttempts=4| 配置当 driver 解体后,尝试重启的次数 |
|spark.yarn.am.attemptFailuresValidityInterval=1h| 假若 driver 执行一周才解体一次,那咱们更心愿每次都能重启,而上述配置在累计到重启 4 次后,driver 就再也不会被重启,该配置则用于重置 maxAppAttempts 的工夫距离 |
|spark.yarn.max.executor.failures=20|executor 执行也可能失败,失败后集群会主动调配新的 executor, 该配置用于配置容许 executor 失败的次数,超过次数后程序会报(reason: Max number of executor failures (400) reached),并退出 |
|spark.yarn.executor.failuresValidityInterval=1h| 指定 executor 失败重调配次数重置的工夫距离 |
|spark.task.maxFailures=8| 容许工作执行失败的次数 |

将来改良

  • 反对无分区,或非日期分区表。目前只反对日期分区表
  • 多数据类型反对,目前为了程序的稳定性,会将 Mysql 中的字段全副以 String 类型存储至 Hudi

参考资料

https://hudi.apache.org/

欢送关注我的集体公众号 ” 东南偏北 UP”,记录代码人生,行业思考,科技评论

退出移动版