背景:为接下来的实战我的项目,本文先介绍一个简略实例。指标:将本地csv数据文件同步到Databricks表中

创立表

在DataBricks中先创立一个表

create table stg.stg_text (    indes stirng,    edw_created_on_dt timestamp,    edw_changed_on_dt timestamp,    edw_etl_insert_dt timestamp,    edw_etl_update_dt timestamp,    etl_insert_dt timestamp,    etl_update_dt timestamp)using deltalocation '/mnt/data_warehouse/az_kpi/stg.db/stg_text'; -- 构造存储地位

上传文件到Blob

关上Azure首页进入存储账户中,点入进入Blob容器,点击上传csv文件

转换CSV文件为Parquet文件

Parquet是列式存储格局的文件,parquet文件压缩比高更节俭空间,且读写更高效。

关上DataFactory 【创作】新建一个管道,拖【复制数据】组件到面板.点击源。

新建一个数据源Blob数据源。抉择数据类型CSV

抉择链接服务,即创立的Blob容器,填入门路,确定创立实现。测试链接胜利。

而后在源中抉择方才创立的源。

接收器与源的创立过程相似,门路要抉择存储Paruet文件的门路。创立实现如下图

其余设置临时不须要设置,点击【调试】测试一下是否转换胜利。

同步数据到DataBricks表中

在这一步须要在DataBricks应用脚本实现。而后再用DataFactory的Job调用DataBricks脚本。这里应用的是Python

全局配置文件,次要是Blob的链接信息。创立一个notebook Default Language:Python 抉择集群。

if getArgument("schema") == 'stg': # 以后数据库  storage_account_name = "存储服务账号名"  storage_account_access_key = "存储服务的拜访key"  pre_str="wasbs://Blob仓库名@databricks账号名.blob.core.chinacloudapi.cn/"  path=getArgument("file_name")  file_location = pre_str + path  file_type = "parquet"  spark.conf.set(    "fs.azure.account.key."+storage_account_name+".blob.core.chinacloudapi.cn",    storage_account_access_key)# 其它数据库elif getArgument("schema") in ['dw','dm']:  sqlContext.setConf("hive.exec.dynamic.partition.mode","nonstrict")

创立数据库目录stg,在目录下创立notebook Default Language:python 。内容如图。 分为三局部,第一局部援用配置文件,承受文件名参数,确定文件;第二局部将Parquet文件转换成长期视图;第三局部将视图数据插入到表中。

回到DataFactory中在【流动】拖拽一个笔记本到面板。抉择笔记本的DataBricks的来链接服务,在设置中抉择笔记本门路,并填入库名参数和文件名参数。

调试运行,检查数据是否曾经同步到表中。