共计 1657 个字符,预计需要花费 5 分钟才能阅读完成。
背景: 客户须要为业务做一些数据展现. 客户会通过 s3 每天给到咱们增量数据. 咱们每天通过 DataFactory 的 job 抽取 s3 的数据, 抽取后的原始数据存储到 Blob 容器中, 再通过 job 抽取数据到 DataBricks 表中, 同时通过 spark sql 解决数据, 造成后果表, 最初提供给 BI 共事, 制作前端报表. 其中根底或要害的步骤曾经在后面介绍。
抽取原始数据
第一步抽取原始数据, 我的项目的根底是如何稳固地从 s3 抽取数据, 并插入到 DataBrikcs 表中. 大抵流程如下。若不应用 Blob 容器,可能会简略点。
遍历 s3 容器,获取数据到 Blob
-
首先咱们要明确不是所有的 s3 文件都会取进来, 要依据以后理论须要取数据, 所有咱们须要一个配置表以来限度获取的 s3 文件. 如 FILE_LIST.csv 文件放入到 BLOB 中,文件有两列,一列为表名也对于数据文件名,另一列为状态值判断是否无效。将 FILE_LIST 先应用【简略实例】的办法,放入 DataBricks 表中作为配置表。如
FILE_NAME IS_ACTIVE DW_TEST 1 - 依据【04 DataBricks 遍历 S3 容器】咱们能够在 s3 中取到存在 FILE_LIST 表中的数据文件门路,并将门路存储在一表中,如 flag_file_info 表
-
接下来,通过 DataFactory 中 lookup 流动,获取到 flag_file_info 的记录。配置如下。
-- 拼接数据文件的门路,留神放入 lookup 流动中时不要有回车 select 'EDW_SHARE/Request/DataFile'||replace(substr(flag_file,instr(flag_file,'FLAGFILE_')+8),'.csv','')||'/'||content||'.csv'as file_name,flag_file,content||'.csv' as content from cfg.flag_file_info where status=1 and date_id = date_format(from_utc_timestamp(current_timestamp(),'UTC+8'),'yyyyMMdd')
- 迭代。ForEach 逐条获取查问的后果,并设置变量
- ForEach 中放入两个流动,一获取 s3 数据到 Blob,二更新记录的状态。同样是复制数据,但这里复制数据的源是 s3, 指标是 Blob
源 S3 数据集配置如下。接收器 Blob 设置相似
批量转换 CSV 文件为 Parquet 文件
将所有同步到的 s3 文件放在 Blob 一个文件中,同样利用迭代把每个 CSV 文件转换成 Parquet 文件。管道流动
获取元数据的数据集配置如下:
留神迭代设置的变量
@activity('get_file_name').output.childItems
迭代中的流动放入【复制数据】源和接收器的配置如下:
将同步数据到 DataBricks 表中
将 Parquet 文件生产长期视图插入数据到 DataBricks 表中。因为这一步易出错,且每个表出错的状况不同,所以这一步没有应用变量批量解决,每个表独自解决。参考【05 简略实例】
解决数据到 dw
每天的增量文件时同步到 stg 库中,然而为了防止出现谬误,stg 只保留每天增量文件。stg 再依据每个表的不同更新策略同步数据到 DW 层,DW 层保留全量数据。JOB 时间接调用 sp 所在的笔记本就能够了。
解决数据到 dm
底层有表了,剩下就能够解决数据,解决完的数据放在另一层。对外提供服务。JOB 调用和 dw 一样。
整体 JOB 如下:
触发器、监控和警报
触发器
每个管道都能够设置触发器,可定时运行 JOB。绝对简略就不再介绍了
监控和警报
在监视器 -> 监控和警报中创立新的预警规定。次要是配置条件抉择指标和增加操作组、告诉
配置条件
抉择指标时能够抉择管道级别,也能够抉择流动级别。其余的看本人的需要设置就能够了。
增加操作组和告诉人
一个操作组里能够放多个告诉,创立操作组后,其余的监控预警也能够援用。
告诉能够邮件、短信、电话语音。填入对于信息即可(产生对应费用)