共计 1467 个字符,预计需要花费 4 分钟才能阅读完成。
背景: 为接下来的实战我的项目,本文先介绍一个简略实例。指标: 将本地 csv 数据文件同步到 Databricks 表中
创立表
在 DataBricks 中先创立一个表
create table stg.stg_text (
indes stirng,
edw_created_on_dt timestamp,
edw_changed_on_dt timestamp,
edw_etl_insert_dt timestamp,
edw_etl_update_dt timestamp,
etl_insert_dt timestamp,
etl_update_dt timestamp
)using delta
location '/mnt/data_warehouse/az_kpi/stg.db/stg_text'; -- 构造存储地位
上传文件到 Blob
关上 Azure 首页进入存储账户中,点入进入 Blob 容器,点击上传 csv 文件
转换 CSV 文件为 Parquet 文件
Parquet 是列式存储格局的文件,parquet 文件压缩比高更节俭空间,且读写更高效。
关上 DataFactory【创作】新建一个管道,拖【复制数据】组件到面板. 点击源。
新建一个数据源 Blob 数据源。抉择数据类型 CSV
抉择链接服务,即创立的 Blob 容器,填入门路,确定创立实现。测试链接胜利。
而后在源中抉择方才创立的源。
接收器与源的创立过程相似,门路要抉择存储 Paruet 文件的门路。创立实现如下图
其余设置临时不须要设置,点击【调试】测试一下是否转换胜利。
同步数据到 DataBricks 表中
在这一步须要在 DataBricks 应用脚本实现。而后再用 DataFactory 的 Job 调用 DataBricks 脚本。这里应用的是 Python
全局配置文件,次要是 Blob 的链接信息。创立一个 notebook Default Language:Python 抉择集群。
if getArgument("schema") == 'stg': # 以后数据库
storage_account_name = "存储服务账号名"
storage_account_access_key = "存储服务的拜访 key"
pre_str="wasbs://Blob 仓库名 @databricks 账号名.blob.core.chinacloudapi.cn/"
path=getArgument("file_name")
file_location = pre_str + path
file_type = "parquet"
spark.conf.set(
"fs.azure.account.key."+storage_account_name+".blob.core.chinacloudapi.cn",
storage_account_access_key)
# 其它数据库
elif getArgument("schema") in ['dw','dm']:
sqlContext.setConf("hive.exec.dynamic.partition.mode","nonstrict")
创立数据库目录 stg, 在目录下创立 notebook Default Language:python。内容如图。分为三局部,第一局部援用配置文件,承受文件名参数,确定文件;第二局部将 Parquet 文件转换成长期视图;第三局部将视图数据插入到表中。
回到 DataFactory 中在【流动】拖拽一个笔记本到面板。抉择笔记本的 DataBricks 的来链接服务,在设置中抉择笔记本门路,并填入库名参数和文件名参数。
调试运行,检查数据是否曾经同步到表中。