关于数据库:一文教会你用Apache-SeaTunnel-Zeta离线把数据从MySQL同步到StarRocks

0次阅读

共计 4718 个字符,预计需要花费 12 分钟才能阅读完成。

在上一篇文章中,咱们介绍了如何下载安装部署 SeaTunnel Zeta 服务(3 分钟部署 SeaTunnel Zeta 单节点 Standalone 模式环境),接下来咱们介绍一下 SeaTunnel 反对的第一个同步场景:离线批量同步。顾名思意,离线批量同步须要用户定义好 SeaTunnel JobConfig,抉择批处理模式,作业启动后开始同步数据,当数据同步实现后作业实现退出。

上面以 MySQL 离线同步到 StarRocks 为例,介绍如何应用 SeaTunnel 进行离线同步作业的定义和运行。

1. 定义作业配置文件

SeaTunnel 应用配置文件来定义作业,在这个示例中,作业的配置文件如下,文件保留门路~/seatunnel/apache-seatunnel-incubating-2.3.1/config/mysql_to_sr.config

# 定义一些作业的运行参数, 具体能够参考 https://seatunnel.apache.org/docs/2.3.1/concept/JobEnvConfig
env {
    job.mode="BATCH"  #作业的运行模式,BATCH= 离线批同步,STREAMING= 实时同步
    job.name="SeaTunnel_Job"
    checkpoint.interval=10000 #每 10000ms 进行一次 checkpoint,前面会具体介绍 checkpoint 对 JDBC Source 和 StarRocks Sink 这两个连接器的影响
}
source {
    Jdbc {
        parallelism=5 # 并行度,这里是启动 5 个 Source Task 来并行的读取数据
        partition_column="id" # 应用 id 字段来进行 split 的拆分,目前只反对数字类型的主键列,而且该列的值最好是离线的,自增 id 最佳
        partition_num="20" # 拆分成 20 个 split,这 20 个 split 会被调配给 5 个 Source Task 来解决
        result_table_name="Table9210050164000"
        query="SELECT `id`, `f_binary`, `f_blob`, `f_long_varbinary`, `f_longblob`, `f_tinyblob`, `f_varbinary`, `f_smallint`, `f_smallint_unsigned`, `f_mediumint`, `f_mediumint_unsigned`, `f_int`, `f_int_unsigned`, `f_integer`, `f_integer_unsigned`, `f_bigint`, `f_bigint_unsigned`, `f_numeric`, `f_decimal`, `f_float`, `f_double`, `f_double_precision`, `f_longtext`, `f_mediumtext`, `f_text`, `f_tinytext`, `f_varchar`, `f_date`, `f_datetime`, `f_timestamp` FROM `sr_test`.`test1`"
        password="root@123"
        driver="com.mysql.cj.jdbc.Driver"
        user=root
        url="jdbc:mysql://st01:3306/sr_test?enabledTLSProtocols=TLSv1.2&rewriteBatchedStatements=true"
    }
}
transform {# 在本次示例中咱们不须要做工作的 Transform 操作,所以这里为空,也能够将 transform 整个元素删除}
sink {
    StarRocks {
        batch_max_rows=10240 # 
        source_table_name="Table9210050164000"
        table="test2"
        database="sr_test"
        base-url="jdbc:mysql://datasource01:9030"
        password="root"
        username="root"
        nodeUrls=["datasource01:8030" #写入数据是通过 StarRocks 的 Http 接口]
    }
}

2. 作业配置阐明

在这个作业定义文件中,咱们通过 env 定义了作业的运行模式是 BATCH 离线批处理模式,同时定义了作业的名称是 ”SeaTunnel_Job”。checkpoint.interval 参数用来定义该作业过程中多久进行一次 checkpoint,那什么是 checkpoint,以及 checkpoint 在 Apache SeaTunnel 中的作用是什么呢?

2.1 checkpoint

查看官网文档中对 Apache SeaTunnel Zeta 引擎 checkpoint 的介绍:https://seatunnel.apache.org/docs/2.3.1/seatunnel-engine/chec… 发现 checkpoint 是用来使运行在 Apache SeaTunnel Zeta 中的作业能定期的将本人的状态以快照的模式保留下来,当工作意外失败时,能够从最近一次保留的快照中复原作业,以实现工作的失败复原,断点续传等性能。其实 checkpoint 的外围是分布式快照算法:Chandy-Lamport 算法,是广泛应用在分布式系统,更多是分布式计算零碎中的一种容错解决实践根底。这里不具体介绍 Chandy-Lamport 算法,接下来咱们重点阐明在本示例中 checkpoint 对这个同步工作的影响。

Apache SeaTunnel Zeta 引擎在作业启动时会启动一个叫 CheckpointManager 的线程,用来治理这个作业的 checkpoint。SeaTunnel Connector API 提供了一套 checkpoint 的 API,用于在引擎触发 checkpoint 时告诉具体的 Connector 进行相应的解决。SeaTunnel 的 Source 和 Sink 连接器都是基于 SeaTunnel Connector API 开发的,只是不同的连接器对 checkpoint API 的实现细节不同,所以能实现的性能也不同。

2.1.1 checkpoint 对 JDBC Source 的影响

在本示例中咱们通过 JDBC Source 连接器的官网文档 https://seatunnel.apache.org/docs/2.3.1/connector-v2/source/Jdbc 能够发现如下内容:

这阐明 JDBC Source 连接器实现了 checkpoint 相干的接口,通过源码咱们能够得悉,当 checkpoint 产生时,JDBC Source 会将本人还未解决的 split 做为状态的快照发送给 CheckpointManager 进行长久化保留。这样当作业失败并复原时,JDBC Source 会从最近一次保留的快照中读取哪些 split 还未解决,而后接着解决这些 split。

在该作业中通过 partition_num=20,会将 query 参数中指定的 sql 语句的后果分成 20 个 split 进行解决,每个 split 会生成读取它负责的数据的 sql,这个 sql 是由 query 中指定的 sql 再加上一些 where 过滤条件组成的。这 20 个 split 会被调配给 5 个 Source Task 进行解决,现实状况下,每个 Source Task 会调配到 4 个 split。假如在一次 checkpoint 时每个 Source Task 都只剩下一个 split 没有解决,这个 split 的信息会被保留下来,如果这之后作业挂掉了,作业会主动进行复原,复原时每个 Source Task 都会获取到那个还未解决的 split,并接着进行解决。如果作业不再报错,这些 split 都解决实现后,作业运行实现。如果作业还是报错(比方指标端 StarRocks 挂了,无奈写入数据),最终作业会以失败状态完结。

断点续传:

如果在作业失败后,咱们修复了问题,并且心愿该作业接着之前的进度运行,只解决那些之前没有被解决过的 split,能够应用 sh seatunnel.sh -r jobId 来让作业 ID 为 jobId 的作业从断点中复原。

回到主题,checkpoint.interval=10000 对于从 Mysql 中读取数据意味着每过 10s,SeaTunnel Zeta 引擎就会触发一次 checkpoint 操作,而后 JDBC Source Task 会被要求将本人还未解决的 split 信息保留下来,这里需要留神的是,JDBC Source Task 读取数据是以 split 为单位的,如果 checkpoint 触发时一个 split 中的数据正在被读取还未齐全发送给上游的 StarRocks,它会等到这个 split 的数据处理实现之后才会响应这次 checkpoint 操作。这里肯定要留神,如果 MySQL 中的数据量比拟大,一个 split 的数据须要很长的时候能力解决实现,可能会导致 checkpoint 超时。对于 checkpoint 的超时时长能够参数 https://seatunnel.apache.org/docs/2.3.1/seatunnel-engine/chec…,默认是 1 分钟。

2.1.2 checkpoint 对 StarRocks Sink 的影响

在 Sink 连接器的文档上,咱们也能看到如下图中的标识:

这个标识代表该 Sink 连接器是否实现了准确解决一次的语义,如果该标识被选中,阐明这个 Sink 连接器能保障发给它的数据它只会往指标端写入一次,不会漏掉导致指标端数据失落,也不会反复往指标端写入。这一性能常见的实现形式是两阶段提交,反对事务的连接器个别会先开启事务进行数据的写入。当 checkpoint 产生时,将事务 ID 返回给 CheckManager 进行长久化,当作业中的所有 Task 都响应了 CheckManager 的 checkpoint 申请后,第一阶段实现。而后 Apache SeaTunnel Zeta 引擎会调用 AggregateCommit 的办法让 Sink 对其事务进行提交,这个过程被称为第二阶段,第二阶段实现后该次 checkpoint 实现。如果第二阶段提交失败,作业会失败,而后主动复原,复原后会再次从第二阶段开始,要求对事务进行提交,直到该事务提交实现,如果事务始终失败,作业也将失败。

并不是只有实现了 exactly-once 个性的 Sink 连接器能力保障指标端的数据不失落不反复,如果指标端的数据库反对以主键去重,那只有 Sink 连接器保障发送给它的数据至多往指标端写入一次,无论反复写入多少次,最终都不会导致指标端数据失落或反复。在该示例中 StarRocks Sink 连接器即是应用了这种形式,StarRocks Sink 连接器会将收到的数据先缓存在内存中,当缓存的行数达到 batch_max_rows 设置的 10240 行,就会发动一次写入申请,将数据写入到 StarRocks 中。如果 MySQL 中的数据量很小,达不到 10240 行,那就会在 checkpoint 触发时进行 StarRocks 的写入。

3. 运行作业

咱们应用 Apache SeaTunnel Zeta 引擎来运行该作业

cd ~/seatunnel/apache-seatunnel-incubating-2.3.1
sh bin/seatunnel.sh --config config/mysql_to_sr.config

作业运行实现后能够看到如下信息,阐明作业状态为 FINISHED,读取 20w 行数据,写入 StarRocks 也是 20w 行数据,用时 6s。

本文由 白鲸开源科技 提供公布反对!

正文完
 0