始终以来,大数据量始终是爆炸性增长,每天几十 TB 的数据增量曾经十分常见,但云存储相对来说还是不便宜的。泛滥云上的大数据用户特地心愿能够非常简单疾速的将文件挪动到更实惠的 S3、OSS 上进行保留,这篇文章就来介绍如何应用 SeaTunnel 来进行到 OSS 的数据同步。

首先简要介绍一下 Apache SeaTunnel,SeaTunnel 专一于数据集成和数据同步,次要解决以下问题:

  • 数据源多样:罕用的数据源有数百种,版本不兼容。随着新技术的呈现,呈现了更多的数据源。用户很难找到可能全面疾速反对这些数据源的工具。
  • 简单同步场景:数据同步须要反对离线-全量同步、离线-增量同步、CDC、实时同步、全库同步等多种同步场景。
  • 资源需要高:现有的数据集成和数据同步工具往往须要大量的计算资源或 JDBC 连贯资源来实现海量小表的实时同步。这在肯定水平上减轻了企业的累赘。
  • 不足品质和监控:数据集成和同步过程常常会失落或反复数据。同步过程不足监控,无奈直观理解工作过程中数据的真实情况

SeaTunnel 反对海量数据的高效离线/实时同步, 每天可稳固高效同步数百亿级数据,曾经有 B 站,腾讯云,微博,360,Shopee 等数百家公司生产应用。

上面步入明天的正题,明天具体来说是讲 Apache SeaTunnel 产品与阿里云 OSS 的集成。

在阿里云 OSS 产品界面,开明 Bucket:

上面是 SeaTunnel 的部署, SeaTunnel 反对多种部署形式: 单机,集群,K8s 等形式。因为 SeaTunnel 不依赖 Zookeeper 等第三方组件,所以整体部署非常简单,具体请参考其官网:https://seatunnel.apache.org/docs/2.3.0/start-v2/locally/depl...

接下来是 SeaTunnel 应用过程,应用命令:

./bin/seatunnel.sh -m local -c ./config/localfile-oss.config

在 SeaTunnel 中,用户能够通过 config 文件定制本人的数据同步需要,最大限度地施展 SeaTunnel 的后劲。那么接下来就给大家介绍一下如何配置 Config 文件

能够看到,config 文件蕴含几个局部:env、source、transform、sink。不同的模块有不同的性能。理解这些模块后,您将理解 SeaTunnel 的工作原理。

用于增加一些引擎可选参数,无论是哪个引擎(Spark或Flink),这里都要填写相应的可选参数。

source 用于定义 SeaTunnel 须要从哪里获取数据,并将获取的数据用于下一步。能够同时定义多个源。当初反对的起源查看 SeaTunnel 的起源。每个 Source 都有本人特定的参数来定义如何取数据,SeaTunnel 也提取了每个 source 会用到的参数,比方parameter,用来指定 result_table_name 以后 source 产生的数据的名称,不便供其余模块后续应用。

本例中的 localfile-oss.config 配置文件内容介绍:

env {                                                                                                                                                                             # You can set SeaTunnel environment configuration here                                                                                                                         execution.parallelism = 10                                                                                                                                                     job.mode = "BATCH"                                                                                                                                                              checkpoint.interval = 10000                                                                                                                                                     #execution.checkpoint.interval = 10000                                                                                                                                         #execution.checkpoint.data-uri = "hdfs://localhost:9000/checkpoint"                                                                                                          }                                                                                                                                                                                                                                                                                                                                                               source {                                                                                                                                                                        LocalFile {                                                                                                                                                                     #本地待同步的数据文件夹, 本例子中只有一个 test0.csv 文件,具体内容参考下图  path = "/data/seatunnel-2.3.1/testfile/source"                                                                                                                                 type = "csv"                                                                                                                                                                                                                                                                                                                                     delimiter = "#"                                                                                                                                                                  schema {                                                                                                                                                                          fields {                                                                                                                                                                            name = string                                                                                                                                                                   age = int                                                                                                                                                                      gender = string                                                                                                                                                             }                                                                                                                                                                             }                                                                                                                                                                            }                                                                                                                                                                                                                                                                                      }                                                                                                                                                                                                                                                                                                                                                               sink {                                                                                                                                                                                                                                                                                                                                           OssJindoFile {                                                                                                                                                                                                                                                                                                                                                                                                 path="/seatunnel/oss03"                                                            bucket = "oss://bucket123456654321234.cn-hangzhou.oss-dls.aliyuncs.com"                                                                                                           access_key = "I5t7VZyZSmMNwKsNv1LTADxW"                                                                                                                                        access_secret = "BinZ9J0zYxRbvG9wQUi6LiUjZElLTA"                                                                                                                                                                                                                                                                endpoint = "cn-hangzhou.oss-dls.aliyuncs.com"                                                                                                                                }                                                                                                                                                                                                                                                                                  }

注:下图本地待同步的数据文件夹, 本例子中只有一个 test0.csv 文件,具体内容

特地留神:如果是开明了 HDFS 的 OSS,有 2 个中央是不一样的:1 是 bucket,1 是 endpoint 。如下红色局部是开明了 HDFS 后的,被 “#” 正文掉的是未开明 HDFS 的状况。

SeaTunnel 对这 2 种状况都是反对的,只是大家要留神一下配置 bucket 和 endpoint 时的不同!

执行运行命令后,咱们能够从 SeaTunnel 控制台看下以下 SeaTunnel 本次同步状况的数据:


       Job Statistic Information                                                                                                                                           

Start Time : 2023-02-22 17:12:19

End Time : 2023-02-22 17:12:37

Total Time(s) : 18

Total Read Count : 10000000

Total Write Count : 10000000

Total Failed Count : 0


从阿里云界面上能够看到 OSS 端的监控数据:



能够看进去 SeaTunnel 疾速高效地同步了 1000万数据量的本地文件!

最初,Apache SeaTunnel 目前曾经反对了过百种数据源,并公布了 SeaTunnel Zeta 同步引擎,性能巨佳,还有群进行技术支持,欢送比照,欢送一试!感兴趣的搭档欢送分割社区志愿者微信: seatunnel1

参考:

1、https://seatunnel.apache.org/docs/2.3.0/start-v2/locally/depl...

2、https://seatunnel.apache.org/docs/2.3.0/start-v2/locally/quic...

3、https://seatunnel.apache.org

本文由 白鲸开源科技 提供公布反对!