背景:客户须要为业务做一些数据展现. 客户会通过s3 每天给到咱们增量数据.咱们每天通过DataFactory的job抽取s3的数据,抽取后的原始数据存储到Blob容器中,再通过job抽取数据到DataBricks表中,同时通过 spark sql 解决数据,造成后果表,最初提供给BI共事,制作前端报表.其中根底或要害的步骤曾经在后面介绍。

抽取原始数据

第一步抽取原始数据,我的项目的根底是如何稳固地从s3抽取数据,并插入到DataBrikcs表中.大抵流程如下。若不应用Blob容器,可能会简略点。

遍历s3容器,获取数据到Blob

  1. 首先咱们要明确不是所有的s3文件都会取进来,要依据以后理论须要取数据,所有咱们须要一个配置表以来限度获取的s3文件. 如 FILE_LIST.csv 文件放入到BLOB中,文件有两列,一列为表名也对于数据文件名,另一列为状态值判断是否无效。将FILE_LIST先应用【简略实例】的办法,放入DataBricks表中作为配置表。如

    FILE_NAMEIS_ACTIVE
    DW_TEST1
  2. 依据【04 DataBricks遍历S3容器】咱们能够在s3中取到存在FILE_LIST表中的数据文件门路,并将门路存储在一表中,如flag_file_info表
  3. 接下来,通过DataFactory中lookup流动,获取到flag_file_info的记录。配置如下。

    -- 拼接数据文件的门路 ,留神放入lookup流动中时不要有回车select 'EDW_SHARE/Request/DataFile'||replace(substr(flag_file,instr(flag_file,'FLAGFILE_')+8),'.csv','')||'/'||content||'.csv' as file_name,flag_file,content||'.csv' as content from cfg.flag_file_info where status=1 and date_id = date_format(from_utc_timestamp(current_timestamp(),'UTC+8'),'yyyyMMdd')

  4. 迭代。ForEach逐条获取查问的后果,并设置变量

  1. ForEach中放入两个流动,一获取s3数据到Blob,二更新记录的状态 。同样是复制数据,但这里复制数据的源是s3,指标是Blob


源S3数据集配置如下。接收器Blob设置相似

批量转换CSV文件为Parquet文件

将所有同步到的s3文件放在Blob一个文件中,同样利用迭代把每个CSV文件转换成Parquet文件。管道流动

获取元数据的数据集配置如下:

留神迭代设置的变量

@activity('get_file_name').output.childItems

迭代中的流动放入【复制数据】源和接收器的配置如下:

将同步数据到DataBricks表中

将Parquet文件生产长期视图插入数据到DataBricks表中。因为这一步易出错,且每个表出错的状况不同,所以这一步没有应用变量批量解决,每个表独自解决。参考【05 简略实例】

解决数据到dw

每天的增量文件时同步到stg库中,然而为了防止出现谬误,stg只保留每天增量文件。stg再依据每个表的不同更新策略同步数据到DW层,DW层保留全量数据。JOB时间接调用sp所在的笔记本就能够了。

解决数据到dm

底层有表了,剩下就能够解决数据,解决完的数据放在另一层。对外提供服务。JOB调用和dw一样。

整体JOB如下:

触发器、监控和警报

触发器

每个管道都能够设置触发器,可定时运行JOB。绝对简略就不再介绍了

监控和警报

在监视器->监控和警报中创立新的预警规定。次要是配置条件抉择指标和增加操作组、告诉

配置条件

抉择指标时能够抉择管道级别,也能够抉择流动级别。其余的看本人的需要设置就能够了。

增加操作组和告诉人

一个操作组里能够放多个告诉,创立操作组后,其余的监控预警也能够援用。

告诉能够邮件、短信、电话语音。填入对于信息即可(产生对应费用)