共计 5955 个字符,预计需要花费 15 分钟才能阅读完成。
S3File 是一个用于治理 Amazon S3(Simple Storage Service)的 Python 模块。以后,Apache SeaTunnel 曾经反对 S3File Sink Connector,为了更好地应用这个 Connector,有必要看一下这篇应用文档指南。
形容
将数据输入到 AWS S3 文件系统。
提醒:
如果您应用的是 Spark/Flink,在应用此连接器之前,必须确保您的 Spark/Flink 集群曾经集成了 Hadoop。Hadoop 2.x 版本已通过测试。
如果您应用的是 SeaTunnel Engine,它会在您下载和装置 SeaTunnel Engine 时主动集成 Hadoop JAR 包。您能够在 ${SEATUNNEL_HOME}/lib
目录下确认这个 JAR 包是否存在。
次要个性
- [x] 仅一次语义
默认状况下,咱们应用 2PC 提交来确保 “ 仅一次语义 ”。
-
[x] 文件格式类型
- [x] 文本 (text)
- [x] CSV
- [x] Parquet
- [x] ORC
- [x] JSON
- [x] Excel
选项
名称 | 类型 | 必须 | 默认值 | 备注 |
---|---|---|---|---|
path | string | 是 | – | |
bucket | string | 是 | – | |
fs.s3a.endpoint | string | 是 | – | |
fs.s3a.aws.credentials.provider | string | 是 | com.amazonaws.auth.InstanceProfileCredentialsProvider | |
access_key | string | 否 | – | 仅在 fs.s3a.aws.credentials.provider = org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider 时应用 |
access_secret | string | 否 | – | 仅在 fs.s3a.aws.credentials.provider = org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider 时应用 |
custom_filename | boolean | 否 | false | 是否须要自定义文件名 |
file_name_expression | string | 否 | “${transactionId}” | 仅在 custom_filename 为 true 时应用 |
filename_time_format | string | 否 | “yyyy.MM.dd” | 仅在 custom_filename 为 true 时应用 |
file_format_type | string | 否 | “csv” | |
field_delimiter | string | 否 | ‘\001’ | 仅在 file_format 为 text 时应用 |
row_delimiter | string | 否 | “\n” | 仅在 file_format 为 text 时应用 |
have_partition | boolean | 否 | false | 是否须要解决分区 |
partition_by | array | 否 | – | 仅在 have_partition 为 true 时应用 |
partition_dir_expression | string | 否 | “${k0}=${v0}/${k1}=${v1}/…/${kn}=${vn}/” | 仅在 have_partition 为 true 时应用 |
is_partition_field_write_in_file | boolean | 否 | false | 仅在 have_partition 为 true 时应用 |
sink_columns | array | 否 | 当此参数为空时,将写入所有从 “Transform” 或 “Source” 获取的字段 | |
is_enable_transaction | boolean | 否 | true | |
batch_size | int | 否 | 1000000 | |
compress_codec | string | 否 | none | |
common-options | object | 否 | – | |
max_rows_in_memory | int | 否 | – | 仅在 file_format 为 Excel 时应用 |
sheet_name | string | 否 | Sheet${Random number} | 仅在 file_format 为 Excel 时应用 |
path [string]
目标目录门路是必须的。
bucket [string]
S3 文件系统的 bucket 地址,例如:s3n://seatunnel-test
,如果您应用的是 s3a
协定,此参数应为 s3a://seatunnel-test
。
fs.s3a.endpoint [string]
fs s3a 端点
fs.s3a.aws.credentials.provider [string]
认证 s3a 的形式。目前咱们仅反对 org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider
和 com.amazonaws.auth.InstanceProfileCredentialsProvider
。
对于凭证提供程序的更多信息,您能够参考 Hadoop AWS 文档
access_key [string]
S3 文件系统的拜访密钥。如果未设置此参数,请确认凭证提供程序链能够正确验证,可参考 hadoop-aws。
access_secret [string]
S3 文件系统的拜访密钥。如果未设置此参数,请确认凭证提供程序链能够正确验证,可参考 hadoop-aws。
hadoop_s3_properties [map]
如果须要增加其余选项,能够在这里增加并参考此 链接
hadoop_s3_properties {
"fs.s3a.buffer.dir" = "/data/st_test/s3a"
"fs.s3a.fast.upload.buffer" = "disk"
}
custom_filename [boolean]
是否自定义文件名。
file_name_expression [string]
仅在 custom_filename
为 true
时应用
file_name_expression
形容了将创立到 path
中的文件表达式。咱们能够在 file_name_expression
中增加变量 ${now}
或 ${uuid}
,例如 test_${uuid}_${now}
,${now}
代表以后工夫,其格局能够通过指定选项 filename_time_format
来定义。
请留神,如果 is_enable_transaction
为 true
,咱们将在文件名的结尾主动增加 ${transactionId}_
。
filename_time_format [string]
仅在 custom_filename
为 true
时应用
当 file_name_expression
参数中的格局为 xxxx-${now}
时,filename_time_format
能够指定门路的工夫格局,默认值为 yyyy.MM.dd
。罕用的工夫格局列于下表中:
符号 | 形容 |
---|---|
y | 年 |
M | 月 |
d | 月中的天数 |
H | 一天中的小时 (0-23) |
m | 小时中的分钟 |
s | 分钟中的秒数 |
file_format_type [string]
咱们反对以下文件类型:
- 文本 (text)
- JSON
- CSV
- ORC
- Parquet
- Excel
请留神,最终文件名将以文件格式的后缀结尾,文本文件的后缀是 txt
。
field_delimiter [string]
数据行中列之间的分隔符。仅在 file_format
为 text 时须要。
row_delimiter [string]
文件中行之间的分隔符。仅在 file_format
为 text 时须要。
have_partition [boolean]
是否须要解决分区。
partition_by [array]
仅在 have_partition
为 true
时应用。
基于选定字段对分区数据进行分区。
partition_dir_expression [string]
仅在 have_partition
为 true
时应用。
如果指定了 partition_by
,咱们将依据分区信息生成相应的分区目录,并将最终文件放在分区目录中。
默认的 partition_dir_expression
是 ${k0}=${v0}/${k1}=${v1}/.../${kn}=${vn}/
。k0
是第一个分区字段,v0
是第一个分区字段的值。
is_partition_field_write_in_file [boolean]
仅在 have_partition
为 true
时应用。
如果 is_partition_field_write_in_file
为 true
,分区字段及其值将写入数据文件中。
例如,如果您想要写入 Hive 数据文件,其值应为 false
。
sink_columns [array]
须要写入文件的哪些列,默认值为从 “Transform” 或 “Source” 获取的所有列。
字段的程序决定了理论写入文件的程序。
is_enable_transaction [boolean]
如果 is_enable_transaction
为 true,咱们将确保在写入目标目录时数据不会失落或反复。
请留神,如果 is_enable_transaction
为 true
,咱们将在文件头部主动增加 ${transactionId}_
。
目前仅反对 true
。
batch_size [int]
文件中的最大行数。对于 SeaTunnel Engine,文件中的行数由 batch_size
和 checkpoint.interval
独特决定。如果 checkpoint.interval
的值足够大,当文件中的行数大于 batch_size
时,写入器将写入文件。如果 checkpoint.interval
较小,则在新的检查点触发时,写入器将创立一个新文件。
compress_codec [string]
文件的压缩编解码器及其反对的详细信息如下:
- txt:
lzo
none
- JSON:
lzo
none
- CSV:
lzo
none
- ORC:
lzo
snappy
lz4
zlib
none
- Parquet:
lzo
snappy
lz4
gzip
brotli
zstd
none
提醒:Excel 类型不反对任何压缩格局。
常见选项
请参考 Sink Common Options 获取 Sink 插件的常见参数详细信息。
max_rows_in_memory [int]
当文件格式为 Excel 时,能够缓存在内存中的数据项的最大数量。
sheet_name [string]
工作簿的工作表名称。
示例
对于文本文件格局,具备 have_partition
、custom_filename
、sink_columns
和 com.amazonaws.auth.InstanceProfileCredentialsProvider
的配置示例:
S3File {
bucket = "s3a://seatunnel-test"
tmp_path = "/tmp/seatunnel"
path="/seatunnel/text"
fs.s3a.endpoint="s3.cn-north-1.amazonaws.com.cn"
fs.s3a.aws.credentials.provider="com.amazonaws.auth.InstanceProfileCredentialsProvider"
file_format_type = "text"
field_delimiter = "\t"
row_delimiter = "\n"
have_partition = true
partition_by = ["age"]
partition_dir_expression = "${k0}=${v0}"
is_partition_field_write_in_file = true
custom_filename = true
file_name_expression = "${transactionId}_${now}"
filename_time_format = "yyyy.MM.dd"
sink_columns = ["name","age"]
is_enable_transaction=true
hadoop_s3_properties {
"fs.s3a.buffer.dir" = "/data/st_test/s3a"
"fs.s3a.fast.upload.buffer" = "disk"
}
}
对于 Parquet 文件格式,仅需用 org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider
进行配置:
S3File {
bucket = "s3a://seatunnel-test"
tmp_path = "/tmp/seatunnel"
path="/seatunnel/parquet"
fs.s3a.endpoint="s3.cn-north-1.amazonaws.com.cn"
fs.s3a.aws.credentials.provider="org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider"
access_key = "xxxxxxxxxxxxxxxxx"
secret_key = "xxxxxxxxxxxxxxxxx"
file_format_type = "parquet"
hadoop_s3_properties {
"fs.s3a.buffer.dir" = "/data/st_test/s3a"
"fs.s3a.fast.upload.buffer" = "disk"
}
}
对于 orc 文件仅需配置 org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider
:
S3File {
bucket = "s3a://seatunnel-test"
tmp_path = "/tmp/seatunnel"
path="/seatunnel/orc"
fs.s3a.endpoint="s3.cn-north-1.amazonaws.com.cn"
fs.s3a.aws.credentials.provider="org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider"
access_key = "xxxxxxxxxxxxxxxxx"
secret_key = "xxxxxxxxxxxxxxxxx"
file_format_type = "orc"
}
更新日志
2.3.0-beta 2022-10-20
-
增加 S3File Sink 连接器
2.3.0 2022-12-30
-
Bug 修复
-
修复了以下导致数据写入文件失败的谬误:
- 当上游字段为空时会抛出 NullPointerException
- Sink 列映射失败
- 从状态中复原写入器时间接获取事务失败 (3258)
-
-
性能
-
反对 S3A 协定 (3632)
- 容许用户增加额定的 Hadoop-S3 参数
- 容许应用 S3A 协定
- 解耦 Hadoop-AWS 依赖
- 反对设置每个文件的批处理大小 (3625)
- 设置 S3 AK 为可选项 (3688)
-
下一版本
-
[优化] 反对文件压缩(3699)
本文由 白鲸开源科技 提供公布反对!