共计 2498 个字符,预计需要花费 7 分钟才能阅读完成。
背景: 为介绍实战我的项目,本文先介绍一个简略实例。指标: 将 csv 数据文件同步到 Databricks 表中
连贯 s3
以后 s3 目录
-
flag
- 主题 01_工夫戳 1.csv
- 主题 02_工夫戳 2.csv
-
request
-
file_工夫戳 1
- data_file.csv
-
file_工夫戳 2
- data_file.csv
-
falg 文件中存储的是每个主题文件的 csv, 每个主题 csv 中有所需表的列表、以及以后增量行数。request 中依据主题文件工夫戳能够找到对于工夫戳的文件夹。文件夹中蕴含真正的数据文件。(工夫戳准确到秒,所以须要依据 flag 文件的工夫戳确定文件夹的工夫戳)
import boto3
import pandas as pd
#s3 存储桶访问信息
key_id='key_id'
secret_key='密钥'
my_bucket='容器名'
#建设 s3 长期客户端
client = boto3.client(
's3',
aws_access_key_id=key_id,
aws_secret_access_key=secret_key,
region_name='cn-north-1'
)
#调用函数列出文件信息
paginator = client.get_paginator('list_objects')
# print(paginator)
page_iterator = paginator.paginate(
Bucket=my_bucket
, Delimiter='/',
Prefix='EDW_SHARE/Flag/')
遍历 s3,并做限度
上传的文件并非都获取,咱们只获取以后有用到的。所以须要一个 flag 表做限度,在 flag 中的表获取进去。咱们先将限度的列表做好。
# #定义空 dataframe 用于寄存 flag 文件内容
df=pd.DataFrame(data=None, index=None, columns=('flag_file','content'), dtype=None, copy=False)
# #定义 dataframe 用于寄存已读入的 flag 文件名,用于过于曾经读入过的 flag,并将 sql 查问后果从 sparksql 反对的 dataframe 转换为 pandas 反对的 dataframe
df1 = spark.sql('select distinct flag_file from cfg.flag_file_info').toPandas()
# #将 flag 顺次插入列表
file_list=[i for i in df1['flag_file']]
# file_list 就是限度获取到 s3 中的文件列表
开始遍历,并在遍历时过滤不在 falg 表中的记录。
# 从 s3 返回的 json 数组中拆分 flag 文件名
for page in page_iterator:
for key in page['Contents']:
if key['Key'] not in file_list:
# 从返回的 json 数组中读取 flag 内容
response1 = client.select_object_content(
Bucket=my_bucket,
Key=key['Key'],
Expression='SELECT SOURCE_NAME FROM S3Object',
ExpressionType='SQL',
InputSerialization={
'CSV': {
'FileHeaderInfo': 'USE',
'QuoteCharacter': '"'
}
},
OutputSerialization={'JSON': {}
}
)
for i in response1['Payload']:
if 'Records' in i:
# print(key['Key'])
for content in i['Records']['Payload'].decode('utf-8').replace('{"SOURCE_NAME":','').replace('}','').replace('"','').split('\n'):
if content :
df.loc[len(df)]=(key['Key'],content)
# 将 pandas 的 dataframe 转换为 sparksql 反对的 dataframe
df=spark.createDataFrame(df)
#将 dataframe 转换为长期表
df.createOrReplaceTempView("file_flag_file_info")
file_flag_file_info 获取到的就是须要的文件名。获取到两列, 如下
flag_file | content |
---|---|
EDW_SHARE/Flag/EDW2CUBE_ALLIANCE_FLAGFILE_20210220072705.csv | DW_TEST01 |
EDW_SHARE/Flag/EDW2CUBE_ALLIANCE_FLAGFILE_20210220072705.csv | DW_TEST02 |
获取到的记录保留在一个表中
spark.sql("insert into cfg.flag_file_info select date_id,flag_file,content,status,create_dt from (select date_format(from_utc_timestamp(current_timestamp(),'UTC+8'),'yyyyMMdd') as date_id,flag_file,content,1 as status,from_utc_timestamp(current_timestamp(),'UTC+8') as create_dt,rank() over(partition by content order by flag_file desc) as rk from file_flag_file_info where content in (select file_name from cfg.file_config_list where is_active=1))t where t.rk=1")
遍历 s3 文件目标是为了正确取到数据文件在 s3 的门路,为之后 DataFactory 从 S3 中拿文件做根底。
正文完