S3File 是一个用于治理 Amazon S3(Simple Storage Service)的 Python 模块。以后,Apache SeaTunnel 曾经反对 S3File Sink Connector,为了更好地应用这个 Connector,有必要看一下这篇应用文档指南。

形容

将数据输入到 AWS S3 文件系统。

提醒:

如果您应用的是 Spark/Flink,在应用此连接器之前,必须确保您的 Spark/Flink 集群曾经集成了 Hadoop。Hadoop 2.x 版本已通过测试。

如果您应用的是 SeaTunnel Engine,它会在您下载和装置 SeaTunnel Engine 时主动集成 Hadoop JAR 包。您能够在 ${SEATUNNEL_HOME}/lib 目录下确认这个 JAR 包是否存在。

次要个性

  • [x] 仅一次语义

默认状况下,咱们应用 2PC 提交来确保 "仅一次语义"。

  • [x] 文件格式类型

    • [x] 文本 (text)
    • [x] CSV
    • [x] Parquet
    • [x] ORC
    • [x] JSON
    • [x] Excel

选项

名称类型必须默认值备注
pathstring-
bucketstring-
fs.s3a.endpointstring-
fs.s3a.aws.credentials.providerstringcom.amazonaws.auth.InstanceProfileCredentialsProvider
access_keystring-仅在 fs.s3a.aws.credentials.provider = org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider 时应用
access_secretstring-仅在 fs.s3a.aws.credentials.provider = org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider 时应用
custom_filenamebooleanfalse是否须要自定义文件名
file_name_expressionstring"${transactionId}"仅在 custom_filename 为 true 时应用
filename_time_formatstring"yyyy.MM.dd"仅在 custom_filename 为 true 时应用
file_format_typestring"csv"
field_delimiterstring'\001'仅在 file_format 为 text 时应用
row_delimiterstring"\n"仅在 file_format 为 text 时应用
have_partitionbooleanfalse是否须要解决分区
partition_byarray-仅在 have_partition 为 true 时应用
partition_dir_expressionstring"${k0}=${v0}/${k1}=${v1}/.../${kn}=${vn}/"仅在 have_partition 为 true 时应用
is_partition_field_write_in_filebooleanfalse仅在 have_partition 为 true 时应用
sink_columnsarray当此参数为空时,将写入所有从 "Transform" 或 "Source" 获取的字段
is_enable_transactionbooleantrue
batch_sizeint1000000
compress_codecstringnone
common-optionsobject-
max_rows_in_memoryint-仅在 file_format 为 Excel 时应用
sheet_namestringSheet${Random number}仅在 file_format 为 Excel 时应用

path [string]

目标目录门路是必须的。

bucket [string]

S3 文件系统的bucket地址,例如:s3n://seatunnel-test,如果您应用的是 s3a 协定,此参数应为 s3a://seatunnel-test

fs.s3a.endpoint [string]

fs s3a 端点

fs.s3a.aws.credentials.provider [string]

认证 s3a 的形式。目前咱们仅反对 org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvidercom.amazonaws.auth.InstanceProfileCredentialsProvider

对于凭证提供程序的更多信息,您能够参考 Hadoop AWS 文档

access_key [string]

S3 文件系统的拜访密钥。如果未设置此参数,请确认凭证提供程序链能够正确验证,可参考 hadoop-aws。

access_secret [string]

S3 文件系统的拜访密钥。如果未设置此参数,请确认凭证提供程序链能够正确验证,可参考 hadoop-aws。

hadoop_s3_properties [map]

如果须要增加其余选项,能够在这里增加并参考此 链接

hadoop_s3_properties {      "fs.s3a.buffer.dir" = "/data/st_test/s3a"      "fs.s3a.fast.upload.buffer" = "disk"   }

custom_filename [boolean]

是否自定义文件名。

file_name_expression [string]

仅在 custom_filenametrue 时应用

file_name_expression 形容了将创立到 path 中的文件表达式。咱们能够在 file_name_expression 中增加变量 ${now} ${uuid},例如 test_${uuid}_${now}
${now} 代表以后工夫,其格局能够通过指定选项 filename_time_format 来定义。

请留神,如果 is_enable_transactiontrue,咱们将在文件名的结尾主动增加${transactionId}_

filename_time_format [string]

仅在 custom_filenametrue 时应用

file_name_expression 参数中的格局为 xxxx-${now} 时,filename_time_format 能够指定门路的工夫格局,默认值为 yyyy.MM.dd。罕用的工夫格局列于下表中:

符号形容
y
M
d月中的天数
H一天中的小时 (0-23)
m小时中的分钟
s分钟中的秒数

file_format_type [string]

咱们反对以下文件类型:

  • 文本 (text)
  • JSON
  • CSV
  • ORC
  • Parquet
  • Excel

请留神,最终文件名将以文件格式的后缀结尾,文本文件的后缀是 txt

field_delimiter [string]

数据行中列之间的分隔符。仅在 file_format 为 text 时须要。

row_delimiter [string]

文件中行之间的分隔符。仅在 file_format 为 text 时须要。

have_partition [boolean]

是否须要解决分区。

partition_by [array]

仅在 have_partitiontrue 时应用。

基于选定字段对分区数据进行分区。

partition_dir_expression [string]

仅在 have_partitiontrue 时应用。

如果指定了 partition_by,咱们将依据分区信息生成相应的分区目录,并将最终文件放在分区目录中。

默认的 partition_dir_expression${k0}=${v0}/${k1}=${v1}/.../${kn}=${vn}/k0 是第一个分区字段,v0 是第一个分区字段的值。

is_partition_field_write_in_file [boolean]

仅在 have_partitiontrue 时应用。

如果 is_partition_field_write_in_filetrue,分区字段及其值将写入数据文件中。

例如,如果您想要写入 Hive 数据文件,其值应为 false

sink_columns [array]

须要写入文件的哪些列,默认值为从 "Transform" 或 "Source" 获取的所有列。
字段的程序决定了理论写入文件的程序。

is_enable_transaction [boolean]

如果 is_enable_transaction 为 true,咱们将确保在写入目标目录时数据不会失落或反复。

请留神,如果 is_enable_transactiontrue,咱们将在文件头部主动增加 ${transactionId}_

目前仅反对 true

batch_size [int]

文件中的最大行数。对于 SeaTunnel Engine,文件中的行数由 batch_sizecheckpoint.interval 独特决定。如果 checkpoint.interval 的值足够大,当文件中的行数大于 batch_size 时,写入器将写入文件。如果 checkpoint.interval 较小,则在新的检查点触发时,写入器将创立一个新文件。

compress_codec [string]

文件的压缩编解码器及其反对的详细信息如下:

  • txt: lzo none
  • JSON: lzo none
  • CSV: lzo none
  • ORC: lzo snappy lz4 zlib none
  • Parquet: lzo snappy lz4 gzip brotli zstd none

提醒:Excel 类型不反对任何压缩格局。

常见选项

请参考 Sink Common Options 获取 Sink 插件的常见参数详细信息。

max_rows_in_memory [int]

当文件格式为 Excel 时,能够缓存在内存中的数据项的最大数量。

sheet_name [string]

工作簿的工作表名称。

示例

对于文本文件格局,具备 have_partitioncustom_filenamesink_columnscom.amazonaws.auth.InstanceProfileCredentialsProvider 的配置示例:

  S3File {    bucket = "s3a://seatunnel-test"    tmp_path = "/tmp/seatunnel"    path="/seatunnel/text"    fs.s3a.endpoint="s3.cn-north-1.amazonaws.com.cn"    fs.s3a.aws.credentials.provider="com.amazonaws.auth.InstanceProfileCredentialsProvider"    file_format_type = "text"    field_delimiter = "\t"    row_delimiter = "\n"    have_partition = true    partition_by = ["age"]    partition_dir_expression = "${k0}=${v0}"    is_partition_field_write_in_file = true    custom_filename = true    file_name_expression = "${transactionId}_${now}"    filename_time_format = "yyyy.MM.dd"    sink_columns = ["name","age"]    is_enable_transaction=true    hadoop_s3_properties {      "fs.s3a.buffer.dir" = "/data/st_test/s3a"      "fs.s3a.fast.upload.buffer" = "disk"    }  }

对于 Parquet 文件格式,仅需用 org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider进行配置:

  S3File {    bucket = "s3a://seatunnel-test"    tmp_path = "/tmp/seatunnel"    path="/seatunnel/parquet"    fs.s3a.endpoint="s3.cn-north-1.amazonaws.com.cn"    fs.s3a.aws.credentials.provider="org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider"    access_key = "xxxxxxxxxxxxxxxxx"    secret_key = "xxxxxxxxxxxxxxxxx"    file_format_type = "parquet"    hadoop_s3_properties {      "fs.s3a.buffer.dir" = "/data/st_test/s3a"      "fs.s3a.fast.upload.buffer" = "disk"    }  }

对于 orc 文件仅需配置 org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider

  S3File {    bucket = "s3a://seatunnel-test"    tmp_path = "/tmp/seatunnel"    path="/seatunnel/orc"    fs.s3a.endpoint="s3.cn-north-1.amazonaws.com.cn"    fs.s3a.aws.credentials.provider="org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider"    access_key = "xxxxxxxxxxxxxxxxx"    secret_key = "xxxxxxxxxxxxxxxxx"    file_format_type = "orc"  }

更新日志

2.3.0-beta 2022-10-20

  • 增加 S3File Sink 连接器

    2.3.0 2022-12-30

  • Bug修复

    • 修复了以下导致数据写入文件失败的谬误:

      • 当上游字段为空时会抛出 NullPointerException
      • Sink 列映射失败
      • 从状态中复原写入器时间接获取事务失败 (3258)
  • 性能

    • 反对 S3A 协定 (3632)

      • 容许用户增加额定的 Hadoop-S3 参数
      • 容许应用 S3A 协定
      • 解耦 Hadoop-AWS 依赖
    • 反对设置每个文件的批处理大小 (3625)
    • 设置 S3 AK 为可选项 (3688)

下一版本

  • [优化]反对文件压缩(3699)

    本文由 白鲸开源科技 提供公布反对!