背景:为介绍实战我的项目,本文先介绍一个简略实例。指标:将csv数据文件同步到Databricks表中

连贯s3

以后s3目录

  • flag

    • 主题01_工夫戳1.csv
    • 主题02_工夫戳2.csv
  • request

    • file_工夫戳1

      • data_file.csv
    • file_工夫戳2

      • data_file.csv

falg文件中存储的是每个主题文件的csv,每个主题csv中有所需表的列表、以及以后增量行数。request中依据主题文件工夫戳能够找到对于工夫戳的文件夹。文件夹中蕴含真正的数据文件。(工夫戳准确到秒,所以须要依据flag文件的工夫戳确定文件夹的工夫戳)

import boto3import pandas as pd#s3存储桶访问信息key_id='key_id'secret_key='密钥'my_bucket='容器名'#建设s3长期客户端client = boto3.client(                       's3',                       aws_access_key_id=key_id,                       aws_secret_access_key=secret_key,                       region_name='cn-north-1'                      )    #调用函数列出文件信息paginator = client.get_paginator('list_objects')# print(paginator)page_iterator = paginator.paginate(  Bucket=my_bucket  , Delimiter='/',  Prefix='EDW_SHARE/Flag/')

遍历s3,并做限度

上传的文件并非都获取,咱们只获取以后有用到的。所以须要一个flag表做限度,在flag中的表获取进去。咱们先将限度的列表做好。

# #定义空dataframe用于寄存flag文件内容df=pd.DataFrame(data=None, index=None, columns=('flag_file','content'), dtype=None, copy=False)# #定义dataframe用于寄存已读入的flag文件名,用于过于曾经读入过的flag,并将sql查问后果从sparksql反对的dataframe转换为pandas反对的dataframedf1 = spark.sql('select distinct flag_file from cfg.flag_file_info').toPandas()# #将flag顺次插入列表file_list=[i for i in df1['flag_file']]# file_list就是限度获取到s3中的文件列表

开始遍历,并在遍历时过滤不在falg表中的记录。

# 从s3返回的json数组中拆分flag文件名for page in page_iterator:      for key in page['Contents']:        if key['Key'] not in file_list:            #             从返回的json数组中读取flag内容            response1 = client.select_object_content(                                                      Bucket=my_bucket,                                                      Key=key['Key'],                                                      Expression='SELECT SOURCE_NAME FROM S3Object',                                                      ExpressionType='SQL',                                                      InputSerialization={                                                          'CSV': {                                                                  'FileHeaderInfo': 'USE',                                                                  'QuoteCharacter': '"'                                                                  }                                                      },                                                      OutputSerialization={                                                                            'JSON': {}                                                                           }                                                    )                        for i in response1['Payload']:                 if 'Records' in i:                    # print(key['Key'])                    for content in i['Records']['Payload'].decode('utf-8').replace('{"SOURCE_NAME":','').replace('}','').replace('"','').split('\n'):                        if content  :                            df.loc[len(df)]=(key['Key'],content)# 将pandas的dataframe转换为sparksql反对的dataframedf=spark.createDataFrame(df)#将dataframe转换为长期表df.createOrReplaceTempView("file_flag_file_info")

file_flag_file_info获取到的就是须要的文件名。获取到两列,如下

flag_filecontent
EDW_SHARE/Flag/EDW2CUBE_ALLIANCE_FLAGFILE_20210220072705.csvDW_TEST01
EDW_SHARE/Flag/EDW2CUBE_ALLIANCE_FLAGFILE_20210220072705.csvDW_TEST02

获取到的记录保留在一个表中

spark.sql("insert into cfg.flag_file_info select date_id,flag_file,content,status,create_dt from (select date_format(from_utc_timestamp(current_timestamp(),'UTC+8'),'yyyyMMdd') as date_id,flag_file,content,1 as status,from_utc_timestamp(current_timestamp(),'UTC+8') as create_dt,rank() over(partition by content order by flag_file desc) as rk from file_flag_file_info where  content in (select file_name from cfg.file_config_list where is_active=1))t where t.rk=1")

遍历s3文件目标是为了正确取到数据文件在s3的门路,为之后DataFactory从S3中拿文件做根底。