共计 7065 个字符,预计需要花费 18 分钟才能阅读完成。
Apache SeaTunnel 现已反对 StarRocks Connector,使其“Connector 方阵”进一步扩充。StarRocks 是一个用户根底宽泛的 MPP 数据库,SeaTunnel 将 StarRocks 纳入反对列表,将不便用户更好地解决数据同步问题。
在 2023 年 3 月 30 日 的 SeaTunnel 线上流动上,贡献者 毕博 为咱们分享了《SeaTunnel StarRocks 连接器的å应用及原理介绍》,内容精髓整顿如å下。
贡献者简介
毕博,马蜂窝数据平台负责人,Apache SeaTunnel 贡献者
分享纲要:
● Seatunnel StarRocks 连接器简介
● StarRocks Connector 性能个性
● StarRocks Connector 数据读取解析
● StarRocks Connector 数据写入解析
● StarRocks Connector 的应用示例
● StarRocks Connector 的后续布局
SeaTunnel StarRocks 连接器简介
首先介绍一下数据同步平台 Apache SeaTunnel 的根本架构。
上图为 SeaTunnel 架构图,它提供了一套形象的 API,包含 Source、Transform、Sink API 等。基于这些形象 API 可扩大出各种各样 Connector,其中基于 Source API 实现的 Source Connector 能够从左侧泛滥的数据源中读取数据,Transform Connector 用于实现数据 Pineline 中的数据转换解决,而 Sink Connector 能够将数据写入到右侧多种异构的数据源中。
同时,在运行时,SeaTunnel 提供了还一个翻译层,会将 Connector 实现的 Source 和 Sink API 翻译成引擎外部可运行的 API,使连接器的能够在不同引擎上运行。目前 SeaTunnel 反对三种执行引擎,Spark,Flink,以及 SeaTunnel 自研执行引擎 Zeta Engine。
以上就是 SeaTunnel 的整体架构,能够看出,SeaTunnel 通过 Souce Connector 和 Sink Connector 实现与不同数据源进行链接。据理解,目前 SeaTunnel 曾经反对 100+ 种丰盛的连接器,其中包含 StarRocks Connector。
上图形容了 Seatunnel 和 StarRocks 联合的整体状况,SeaTunnel 提供了 Source Connector 和 Sink Connector。在 Source 局部 StarRocks 的表作为数据源,通过 Seatunel Source Connector 分布式地去提取 StarRocks 的数据;两头通过 SeaTunnel 提供的 Transform Connector 做一些分布式的数据处理和转换;前面的 StarRocks Sink 连接器则次要是把 SeaTunnel 内存里的数据,通过 StarRocks 提供的 Stream Load API,将数据导入到 StarRocks。
StarRocks Connector 性能个性
Source 性能个性
这张表格展现了目前 SeaTunnel Source 连接器以后反对的外围性能和个性,包含:
● 字段投影: 假如待读取表有多个字段,然而整个的数据处理的 Pipline 中 只用到局部字段,那么针对这些局部列做数据同步就是字段投影的应用;
● 谓词下推: 谓词下推能够在数据扫描的时候过滤大量用不到的数据,通过下推到引擎,能够缩小数据传输的数据量;
● 数据类型的主动映射: 是对于从 StarRocks 读取的数据类型与 Seatunnel 外部数据类型的映射,前面会介绍目前反对的数据类型;
● 用户自定义分片: 是通过将待读取数据源的整个数据集拆分多个分片,每一个分片能够独自查问,并且在分片生成的阶段用户能够通过配置参数去管制分片生成的数量;
● 并行读取:首先 StarRocks 是反对并行的读取数据源,同时基于下面的数据源分片的切分,在读取时多个分片同时独立进行,最终通过并行读取放慢读取的速率;
● 状态的复原:Source 连接器读取阶段切分多个分片之后,连接器在读取过程中会定期将未进行读取分片信息保留在 State 中;这样在故障复原的时候,联合 state 中分片的地位信息进行从新读取;
● 至多一次:得益于状态复原,所以在读取端提供至多一次的语义;
● batch 模式:目前 StarRocks 连接器 Source 局部只反对批模式。
Sink 性能个性
Sink 的数据导入是基于 StarRocks 的 Stream Load 实现,因为 Stream Load 反对 Csv 和 Json 两种文件格式,所以连接器在 sink 端能够指定 csv 和 json 两种文件格式进行导入。
写入时,思考到写入的效率,所以会波及数据赞批,进行批量数据写入,而不是单条提交。
如果写入呈现了异样,程序会主动判断是不是可复原的异样,再基于肯定的策略进行重试。
对于 CDC, 目前 Seatunnel 反对数据库的 changelog 捕获,再联合 StarRocks 的 Stream Load 接口,能够对 StarRocks 的主键模型表进行数据变更,包含插入、更新和删除数据,所以连接器以后反对将 Seatunnel 获取的 cdc 数据导入到 StarRocks 中来。
上述列表展现了目前 Source 和 Sink 连接器曾经反对的性能和个性,心愿在理论利用中能够给大家提供一些参考。
StarRocks Connector 读取原理
接下来我会着重介绍 StarRocks Connector 的读取原理,帮忙大家更好地应用连接器性能。
字段投影
咱们在读 StarRocks 表的时候,是能够抉择局部字段读取的,比方这里咱们有一个 StarRocks 表,有 4 个字段。然而理论同步应用到的字段只有 lo_orderkey、lo_number 两个字段,对于指定局部列的提取数据场景,能够在配置 Source 连接器的时候,通过 fields 参数来指定要查问的字段和数据读到 Seatunnel 下面的字段数据类型。
这样,在 Seatunnel 真正执行的时候,就能只同步指定的字段,最终同步到 StarRocks 数据如下图。
通过缩小投影字段能够升高同步过程网络、内存资源耗费,晋升同步性能。
谓词下推
在理论应用中,咱们可能须要过滤掉局部行的数据,如获取表中 linenum< 3 的局部数据。这时,咱们能够在配置 Source 连接器的时候,通过配置 scan-filter 参数来过滤指定的局部行。
在理论执行中会将条件下推到 StarRocks,在 StarRocks 引擎内进行分区剪裁或分桶剪裁等优化解决。
这样,在读取数据阶段跳过全表扫描,能够大大减少数据处理的数据量,晋升读取数据的效率。
字段投影 & 谓词下推实现
在具体实现上,通过用户在连接器中配置中指定的 fields 和 scan-filter 参数,连接器在程序中会主动生成实用于 StarRocks 的查问语句。如图,通过程序转换,最终生成 SQL。
并行读取:实现计划
并行读取 StarRocks 数据次要有两种计划,以 Flink 引擎读取 StarRocks 为例。
第一个计划是间接通过 JDBC 协定读取数据,数据最终须要通过 fe 单节点将数据抽取上来,读取效率较低;
第二个计划是进行分布式的设计,先通过 fe 查问对应 StarRocks 表的分片的元数据信息,获取待读取数据的数据分布状况,再用分布式并行的形式间接从多个 be 节点读取数据。
这样做让整体的吞吐能力失去很大的晋升,目前 StarRocks Connector 基于第二种计划。
并行读取:获取 SR 的数据分布
Source 连接器实现并行读取,首先要晓得 StarRocks 表的存储的数据分布状况。以后 StarRocks 的 FE 提供了获取单表查问打算接口,通过指定要查问的表及 sql 进行 api 接口的调用。
右侧是 fe 接口返回的后果通过序列化后对应的数据结构,query plane 为查问打算的字符串。
parrtions 是一个 map, key 是 starrocks tablet id,value 为分桶的散布在 be 节点的地址,因为 starrocks 表的数据是多正本治理,所以会有多个 be 地址。
通过以上信息信息,就能够晓得表中要查问数据的数据分布状况。
并行读取:split 切分 (基于数据分布)
要实现并行读取,就须要要看待查问的指标表的数据范畴划分,再进行分片切分,让并行的线程读取特定分片的数据。
在 Souce Connector 实现中,分片切分是基于 StarRocks 表的数据分布进行数据范畴划分。
左侧则是形容了 StarRocks 数据分布。StarRocks 应用列式存储,采纳分辨别桶机制进行数据管理。对应图中表 A 依照日期“月”划分分区,进一步的 2023-01 月份的分区切分为 5 个 分桶 (A、B、C、D、E.)。
分桶是 StarRocks 中最小的数据管理单元,每个分桶应用多正本进行组织, 对应图中别离为分桶 A,有 A-1、A-2、A-3 个正本;分桶 B 有 B-1,B-2,B-3 等,这些分桶正本最终会存储在不同的 BE 节点中。
如果咱们要同步表 A 中 2023 年 1 月份的数据,首先要晓得这部分数据的数据分布状况。之前介绍了通过 Fe Api 能够获取 StarRocks 表的数据分布状况,对应图中,分桶 A 数据保留在 BE-1、BE-2、BE-3 上。
下一步,通过 肯定策略,为每一个 tablet 抉择最优的 be 查问节点,准则是最终后果中每一个 be 节点有绝对均等数量的分桶期待被查问,这样能够保障在并行查问时,每一个 be 节查问负载绝对平衡。
最初,依据后面为每一个分桶抉择查问 be 节点信息生成的 split 分片。
并行读取:用户自定义分片
Source Connector 反对自定义分片,也就是用户能够管制分片生成, 通过 request_tablet-size 这个配置参数制。
刚刚咱们介绍了生成 split 分片切分的过程,StarRocks 表 A 的 5 个分桶 A\B\C\D\E,最终生成了 3 个分片对图中上半局部。假加咱们想让查问数据的并发度更高,就须要生成更多的分片。这时,咱们能够设置 request_tablet-size,限度每个分片中 tablet 的数量。比方咱们配置 request_tablet-siz=1,示意每个分片的分桶最多为 1,那么最终将会生成 5 个分片 成果如上图。
并行读取:调配 split 到 reader
Split 切分好了,须要调配给每一个并行的 Reader。
Reader 数量的指定是通过在工作的 env 配置并行度(图左侧),配置好就会有几个并行的 Reader 去读取数据源。
图中右侧是具体分片调配给 Reader 的过程:Split 通过 split 中的属性 id 向 Reader 数取模,使每一个 Reader 上调配的分片数绝对统一。
并行读取:reader 读取数据
将 split 分片配给 Reader 之后,每一个 Reader 就开始理论的数据读取,该过程是每个 Reader 通过 be 提供的一组 thrift 协定向 be 节点扫描。分桶对应的数据如图中所示,每个分片蕴含了须要向哪个 be 节点查问及须要扫描 be 上的哪些分桶数据。
下图是 be 提供 thrift 协定具体接口。
有三个重要的办法,首先创立一个 scanner,通过相似游标的形式,屡次调用 getnext 获取全副数据,最终数据都实现返回后,通过 close scaner 开释资源。
并行读取:arrow -> seatunnel row 的数据转换
Reader 通过 thrift 协定向 be 节点扫描数据,最终从 be 获取到的数据是 apache arrrow 的数据格式。
因为 StarRocks 表的数据通过 Seatunnel 读取进去之后首先要转换为 seatunnel 本人的数据结构 SeatunnelRow,之后才能够在 Seatunnel 外部进行数据转换及写出,因而须要将 apache arrow 的数据类型转换为 Seatunnel 的数据类型。
整个转换过程如图所示:
其中 apache arrow 的 vachar 能够依据用户在 souece 连接器配置数据投影的数据类型转换为 Date、Timestamp 和 String。
数据类型映射
最终 从 StarRocks 读取的数据类型、从 be 节点获取的 apache-arrrow 格局的数据类型 以及 转换后 Seatunnel 的数据类型之间的映射关系如上图,也是目前 StarRocks 连接器反对的数据类型映射,基本上笼罩了所有的数据类型,但 ARRAY、HLL、BITMAP 等临时还不反对。
在应用中咱们只须要关怀 StarRocks 的 Datatype 和 Seatunnel Datatype 的映射就能够,apache-arrrow 局部的转换是程序主动实现的。
并行读取: 状态复原
在读取的时候还会波及到状态复原,因为如果工作读取的数据量比拟大,读取的工夫会较长,两头如果呈现谬误或者异样,须要从出错的地位从新读取,相似于断点续传。
这外面有两个比拟重要的过程:
● 状态的保留: 通过 Reader 把 未读取的 split 信息存到 state 里,引擎在读取过程会定期对 state 做快照,如 snapshotState 办法的逻辑;
● 状态复原:Reader 的状态复原次要是通过最初一次快照,进行复原后持续读取。在开始读取数据的时候,从未读取的分片汇合中外面去生产,之后开始理论读取,对应 pollNext 办法逻辑。
StarRocks Connector 写入原理
介绍完 Source Connector 的写入原理,咱们再来看 Sink 连接器的写入原理。
StarRocks Sink 写入是基于 Stream Load 接口,在写入时须要做解决批量和重试。对于批量,数据是在写入之后,先缓存在内存中,达到肯定阈值之后再进行批量数据的提交。
阈值目前包含批数据的大小、数据条数限度,同时连接器也反对定时提交,肯定工夫距离下提交一次。
📢留神,在 sink 的时候,须要注意 ”too many tablet versions” 报错,呈现问题是因为导入频率太快,数据没能及时合并 (Compaction),从而导致版本数超过反对的最大未合并版本数。
除了优化 BE 的配置,调整合并策略,如 cumulative_compaction_num_threads_per_disk、base_compaction_num_threads_per_disk 等来放慢合并,也能够在 sink 端管制批量的提交的阈值,增大单次导入的数据量,升高导入频率。
对于重试,SeaTunnel 反对配置重试策略,如重试次数,期待距离与最大重试次数等。
CDC 数据写入反对
目前,SeaTunnel 已反对数据库变更数据捕捉(CDC https://github.com/apache/incubator-seatunnel/issues/3175),以将数据更改实时传送到上游零碎。SeaTunnel 将捕捉到的数据更改分为以下 4 种类型:iNSERT(数据插入)、UPDATE_BEFORE(数据更改前的旧值)、UPDATE_AFTER(数据更改后的新值)、DELETE (数据删除)。
在写入的指标数据下面,StarRocks 数据源的主键模型反对通过 Stream Load 导入作业,同时对 StarRocks 表进行数据变更,包含插入、更新和删除数据。
因而,将 Seatunnel changelog 数据的变更类型转换成 StarRocks 反对的变更类型,使 Seatunnel Connector 能够反对 cdc 写入 StarRocks。
如果上图所示,在 Seatunnel 中如果 cdc 数据如图所示,别离插入主键为 1\2\ 3 的数据,对主键 1 进行 UPDATE, 会生成 update_before、update_after、dedete 的 cdc changelog event, 通过 sink 连接器配置中 enable_upsert_delete = true,开启将 cdc 数据写入 StarRocks 的反对。
StarRocks Connector 应用示例
以在 StarRocks 与 StarRocks 之间同步数据这个应用场景为例,介绍如何配置应用连接器。假如在 StarRocks 有一张数据表 customer_1,有四个数据列,咱们指标将数据同步到一个张表 customer_2 , 首先在 seatunnel 工作配置文件中配置 Source Connector,数据表有 4 个字段列,咱们只须要 2 个字段,所以配置数据投影。
在 Transform Connector 配置中咱们进一步进行数据处理,心愿将 c_name 字段中 customer 前缀去除,保留数字局部同时导入数据字段名称跟指标表名称表不统一,须要通过 sql 重命名。
最初配置 Sink Connector,配置指标数据源的链接信息,指定 Stream Load 数据导入的文件格式为 Json。
在 env 外面对工作参数进行指定,如工作的整体并行度,当然也能够在 connector 的配置外面独自指定并行度。
最终导入到指标表的 customer_2 如下图:
连接器后续布局
至此,咱们能够看到,SeaTunnel 的根本数据同步性能曾经十分欠缺了,但一些数据同步场景对数据可靠性有着更高的要求,在 sink 侧须要有仅一次和至多一次的语义反对,这两点曾经在社区的反对打算中了。
其中对于 EOS 语义,StarRocks 2.4 版本提供了 Stream Load 事务接口,为实现高效导入同时兼顾 EOS 提供了实现的根底;
另外,社区还打算在 Source 和 Sink 连接器中反对更多的数据类型映射,如 bitmap、hill、array 等,丰盛连接器的性能。
本文由博客一文多发平台 OpenWrite 公布!