在上一篇文章中,咱们介绍了如何下载安装部署SeaTunnel Zeta服务(3分钟部署SeaTunnel Zeta单节点Standalone模式环境),接下来咱们介绍一下SeaTunnel反对的第一个同步场景:离线批量同步。顾名思意,离线批量同步须要用户定义好SeaTunnel JobConfig,抉择批处理模式,作业启动后开始同步数据,当数据同步实现后作业实现退出。
上面以MySQL离线同步到StarRocks为例,介绍如何应用SeaTunnel进行离线同步作业的定义和运行。
1. 定义作业配置文件
SeaTunnel应用配置文件来定义作业,在这个示例中,作业的配置文件如下,文件保留门路~/seatunnel/apache-seatunnel-incubating-2.3.1/config/mysql_to_sr.config
#定义一些作业的运行参数,具体能够参考 https://seatunnel.apache.org/docs/2.3.1/concept/JobEnvConfig
env {
job.mode="BATCH" #作业的运行模式,BATCH=离线批同步,STREAMING=实时同步
job.name="SeaTunnel_Job"
checkpoint.interval=10000 #每10000ms进行一次checkpoint,前面会具体介绍checkpoint对JDBC Source和StarRocks Sink这两个连接器的影响
}
source {
Jdbc {
parallelism=5 # 并行度,这里是启动5个Source Task来并行的读取数据
partition_column="id" # 应用id字段来进行split的拆分,目前只反对数字类型的主键列,而且该列的值最好是离线的,自增id最佳
partition_num="20" # 拆分成20个split,这20个split会被调配给5个Source Task来解决
result_table_name="Table9210050164000"
query="SELECT `id`, `f_binary`, `f_blob`, `f_long_varbinary`, `f_longblob`, `f_tinyblob`, `f_varbinary`, `f_smallint`, `f_smallint_unsigned`, `f_mediumint`, `f_mediumint_unsigned`, `f_int`, `f_int_unsigned`, `f_integer`, `f_integer_unsigned`, `f_bigint`, `f_bigint_unsigned`, `f_numeric`, `f_decimal`, `f_float`, `f_double`, `f_double_precision`, `f_longtext`, `f_mediumtext`, `f_text`, `f_tinytext`, `f_varchar`, `f_date`, `f_datetime`, `f_timestamp` FROM `sr_test`.`test1`"
password="root@123"
driver="com.mysql.cj.jdbc.Driver"
user=root
url="jdbc:mysql://st01:3306/sr_test?enabledTLSProtocols=TLSv1.2&rewriteBatchedStatements=true"
}
}
transform {
# 在本次示例中咱们不须要做工作的Transform操作,所以这里为空,也能够将transform整个元素删除
}
sink {
StarRocks {
batch_max_rows=10240 #
source_table_name="Table9210050164000"
table="test2"
database="sr_test"
base-url="jdbc:mysql://datasource01:9030"
password="root"
username="root"
nodeUrls=[
"datasource01:8030" #写入数据是通过StarRocks的Http接口
]
}
}
2. 作业配置阐明
在这个作业定义文件中,咱们通过env定义了作业的运行模式是BATCH离线批处理模式,同时定义了作业的名称是”SeaTunnel_Job”。checkpoint.interval参数用来定义该作业过程中多久进行一次checkpoint,那什么是checkpoint,以及checkpoint在Apache SeaTunnel中的作用是什么呢?
2.1 checkpoint
查看官网文档中对Apache SeaTunnel Zeta引擎checkpoint的介绍: https://seatunnel.apache.org/docs/2.3.1/seatunnel-engine/chec… 发现checkpoint是用来使运行在Apache SeaTunnel Zeta中的作业能定期的将本人的状态以快照的模式保留下来,当工作意外失败时,能够从最近一次保留的快照中复原作业,以实现工作的失败复原,断点续传等性能。其实checkpoint的外围是分布式快照算法:Chandy-Lamport 算法,是广泛应用在分布式系统,更多是分布式计算零碎中的一种容错解决实践根底。这里不具体介绍Chandy-Lamport 算法,接下来咱们重点阐明在本示例中checkpoint对这个同步工作的影响。
Apache SeaTunnel Zeta引擎在作业启动时会启动一个叫CheckpointManager的线程,用来治理这个作业的checkpoint。SeaTunnel Connector API提供了一套checkpoint的API,用于在引擎触发checkpoint时告诉具体的Connector进行相应的解决。SeaTunnel的Source和Sink连接器都是基于SeaTunnel Connector API开发的,只是不同的连接器对checkpoint API的实现细节不同,所以能实现的性能也不同。
2.1.1 checkpoint对JDBC Source的影响
在本示例中咱们通过JDBC Source连接器的官网文档https://seatunnel.apache.org/docs/2.3.1/connector-v2/source/Jdbc 能够发现如下内容:
这阐明JDBC Source连接器实现了checkpoint相干的接口,通过源码咱们能够得悉,当checkpoint产生时,JDBC Source会将本人还未解决的split做为状态的快照发送给CheckpointManager进行长久化保留。这样当作业失败并复原时,JDBC Source会从最近一次保留的快照中读取哪些split还未解决,而后接着解决这些split。
在该作业中通过partition_num=20,会将query参数中指定的sql语句的后果分成20个split进行解决,每个split会生成读取它负责的数据的sql,这个sql是由query中指定的sql再加上一些where过滤条件组成的。这20个split会被调配给5个Source Task进行解决,现实状况下,每个Source Task会调配到4个split。假如在一次checkpoint时每个Source Task都只剩下一个split没有解决,这个split的信息会被保留下来,如果这之后作业挂掉了,作业会主动进行复原,复原时每个Source Task都会获取到那个还未解决的split,并接着进行解决。如果作业不再报错,这些split都解决实现后,作业运行实现。如果作业还是报错(比方指标端StarRocks挂了,无奈写入数据),最终作业会以失败状态完结。
断点续传:
如果在作业失败后,咱们修复了问题,并且心愿该作业接着之前的进度运行,只解决那些之前没有被解决过的split,能够应用 sh seatunnel.sh -r jobId来让作业ID为jobId的作业从断点中复原。
回到主题,checkpoint.interval=10000对于从Mysql中读取数据意味着每过10s,SeaTunnel Zeta引擎就会触发一次checkpoint操作,而后JDBC Source Task会被要求将本人还未解决的split信息保留下来,这里需要留神的是,JDBC Source Task读取数据是以split为单位的,如果checkpoint触发时一个split中的数据正在被读取还未齐全发送给上游的StarRocks,它会等到这个split的数据处理实现之后才会响应这次checkpoint操作。这里肯定要留神,如果MySQL中的数据量比拟大,一个split的数据须要很长的时候能力解决实现,可能会导致checkpoint超时。对于checkpoint的超时时长能够参数https://seatunnel.apache.org/docs/2.3.1/seatunnel-engine/chec…, 默认是1分钟。
2.1.2 checkpoint对StarRocks Sink的影响
在Sink连接器的文档上,咱们也能看到如下图中的标识:
这个标识代表该Sink连接器是否实现了准确解决一次的语义,如果该标识被选中,阐明这个Sink连接器能保障发给它的数据它只会往指标端写入一次,不会漏掉导致指标端数据失落 ,也不会反复往指标端写入。这一性能常见的实现形式是两阶段提交,反对事务的连接器个别会先开启事务进行数据的写入。当checkpoint产生时,将事务ID返回给CheckManager进行长久化,当作业中的所有Task都响应了CheckManager的checkpoint申请后,第一阶段实现。而后Apache SeaTunnel Zeta引擎会调用AggregateCommit的办法让Sink对其事务进行提交,这个过程被称为第二阶段,第二阶段实现后该次checkpoint实现。如果第二阶段提交失败,作业会失败,而后主动复原,复原后会再次从第二阶段开始,要求对事务进行提交,直到该事务提交实现,如果事务始终失败,作业也将失败。
并不是只有实现了exactly-once个性的Sink连接器能力保障指标端的数据不失落不反复,如果指标端的数据库反对以主键去重,那只有Sink连接器保障发送给它的数据至多往指标端写入一次,无论反复写入多少次,最终都不会导致指标端数据失落或反复。在该示例中StarRocks Sink连接器即是应用了这种形式,StarRocks Sink连接器会将收到的数据先缓存在内存中,当缓存的行数达到batch_max_rows设置的10240行,就会发动一次写入申请,将数据写入到StarRocks中。如果MySQL中的数据量很小,达不到10240行,那就会在checkpoint触发时进行StarRocks的写入。
3. 运行作业
咱们应用Apache SeaTunnel Zeta引擎来运行该作业
cd ~/seatunnel/apache-seatunnel-incubating-2.3.1
sh bin/seatunnel.sh --config config/mysql_to_sr.config
作业运行实现后能够看到如下信息,阐明作业状态为FINISHED,读取20w行数据,写入StarRocks也是20w行数据,用时6s。
本文由 白鲸开源科技 提供公布反对!
发表回复