乐趣区

关于数据同步:SeaTunnel-在天翼云数据集成平台的探索实践

点亮 ⭐️ Star · 照亮开源之路

https://github.com/apache/inc…

讲师简介

周利旺 天翼云大数据开发工程师

在 11 月 26 日,Apache SeaTunnel& APISIX 联结 Meetup 期间,天翼云科技大数据开发工程师周利旺给大家分享了天翼云数据集成平台 引入 SeaTunnel 过程中的一些摸索实际,心愿对大家有所帮忙!

天翼云数据集成平台基于 Apache Nifi 二次封装而成,然而对于一些特定的需要 Apache Nifi 不可能很好地满足,因而须要引入第三方的数据集成工具进行能力上的补足。而 SeaTunnel 恰是能用 Nifi 互补的好工具。本次讲座介绍了 SeaTunnel 整合到天翼云数据平台在架构层面的设计与思考。

本文次要包含四个局部:

● Apache SeaTunnel 简介

● Apache Nifi 简介

● SeaTunnel 与 Nifi 整合计划

● 天翼云数据集成平台

● 参加开源教训与心得

01 SeaTunnel 简介

咱们平台次要是面向政企客户,目前是以 Apache Nifi 作为外围,辅之以原生的 Flink 利用,在此基础上进行封装和二次开发出各种各样的数据集成利用,提供面向不同行业的解决方案,目前 Seatunnel 在咱们外部尽管没上生产实践,然而也做了不少的摸索,所以本次议题次要讲下 SeaTunnel 和 Nifi 联合应用的一些教训。因为两者各有优劣,然而在能力上能够舍短取长,从而能够对标更多的性能,满足客户的更多需要。

SeaTunnel 实质上是对 Spark 和 Flink 进行了一层封装,当初新版本又退出了自研的 SeaTunnel 引擎,用户能够通过编辑配置文件来疾速构建工作流。

配置文件包含四个局部

  • env 是配置整体的环境
  • source 配置数据源的信息
  • transform 是配置数据处理相干的
  • sink 配置数据去向相干的。

与 kettle,Nifi 这种重量级的平台相比,SeaTunnel 更像 Datax 轻量级的数据传输工具,用户能够依据须要来装置 Source、Sink、Transform 插件。

就数据源而言,SeaTunnel 反对关系型数据库像 mysql、oracle,非关系型数据库 mongodb、redis 等等,文件类型的 ftp、hdfs、ossFile,网络通信的 socket、http,音讯队列、kafka、pulsar,数据湖等等,其中的品种十分多。

Transform 转换插件反对常见的大小写转换,替换、宰割、sql、取 uid 这些,用户也能够依据本人的需要自定义插件。

装置和启动 SeaTunnel 也很不便,下载压缩包解压后,依据你想运行的工作类型,运行对应的启动脚本。

Spark 和 Flink 都有 v1 和 v2 两个版本,当初还反对新的 SeaTunnel 引擎,启动形式都比拟相似,这里反对提交工作到 local、yarn、k8s 三种环境上。

02 Nifi 简介

Apache Nifi 是一个基于 Web 图形界面,通过拖拽、连贯、配置实现基于流程的编程,它既能够单节点运行,也能够集群模式运行,集群的节点采纳 Zookeeper 进行协调。

图上所示就是一个 Nifi 集群,而后每一个长方形相当于一个 Nifi 节点,在它最上层是以 web server 接管这种用户申请,每个节点的服务运行在一个 jvm 之上,由 processor 和 flow controller 撑持起业务逻辑的构建,flowfile 是底层的数据结构,它会存储在 flowfile repository,content repository。

Nifi 罕用的组件次要包含 五个局部

  • FlowFile
  • Processor
  • Connection
  • Process Group
  • ControllerService

2.1 FlowFile

Flowfile 是 Nifi 底层的数据结构,它由属性和内容组成,它的属性能够用于形容数据,比方一个提供输入数据到本地的 putFile 组件,它输入的 FlowFile 的属性就能够是文件名,这个 Flowfile 是一个抽象概念,能够通过配置把它们放在内存里或者硬盘上,用户能够通过查看队列(可视化的模式)来看到 Flowfile(它的属性以及相干内容)。

2.2 Processor

它是 mapper 外面最外围的一个局部。Processor 能够通过编排连贯来构建工作流。罕用的 Processor 它也会蕴含这个读取数据转化数据、输出数据这几大类的。

Processor 是 Nifi 外面最外围的局部, Processor 能够通过编排连贯来构建工作流。罕用的 Processor 蕴含读取数据, 转换数据、输入数据这几大类的,总的反对的组件数量大概有三四百个。

每个组件都反对配置独自的调度策略,反对定时调度和 cron 表达式调度。Nifi 尽管有这泛滥的组件,然而像 cdc 组件是比拟薄弱的,只有 mysqlcdc,而后像一些新型的数据源读写组件目前开源版本的 Nifi 都没有反对,比方数据湖 hudi、iceberg 等等;

2.3 Connection

Connection 用于连贯不同处理器,它相当于一个队列,从上游传下来的数据来会进入这个队列,直到被上游组件把数据生产,Connection 能够配置不同的优先级策略,以及数据过期工夫,存储容量等;

2.4 Processor group

用于将一组处理器组织在一起,内部数据能够通过 Nifi 端口组件进行输入输出,remote processor group 是用户接管近程过程的数据(Nifi 实例)。

2.5 Controller Service

它相当于是运行在后盾的服务,我了解它的作用是把处理器要用到的一些公共属性抽取进去,比方连贯数据库的 jdbc 连接池,kerberos 认证服务,达到复用的目标。

03 SeaTunnel 与 Nifi 整合计划

3.1 Nifi Execute Process 组件封装 SeaTunnel

第一种联合形式是能够用 Nifi 的 Execute Process 组件封装 SeaTunnel,利用 Nifi 的调度能力,使其能够定时运行 SeaTunnel 工作。通常想先要调度 SeaTunnel 工作,须要依赖像 Apache DolphinScheduler 或者其余的调度工具,当初 SeaTunnel 有个 web 平台(ui 还在开发)也是把工作提交到 DolphinScheduler 去调度,应用 Nifi 在这里相当于提供了一种新的思路。

构建 Nifi 工作流

将 SeaTunnel 和 Nifi 部署到一个节点上,能够通过 executeProcess 这个组件来运行 SeaTunnel 启动脚本,SeaTunnel 的运行日志会通过头节点传到上游组件,它能够通过这个投机节点,上游流传的日志外面会蕴含工作相干的一些信息。

如果这个工作启动胜利的话,咱们能够过滤出它的工作 id,拿到工作 id 之后就能够进一步地对工作进行管制,比方封装进行工作,勾销一些工作的接口性能。

上图是在队列中查看到的 SeaTunnel 运行日志,SeaTunnel 启动胜利之后能够在其中看到工作 id,像 Flink job ID、applicationID 都能够获取。

3.2 基于 Nifi Site-To-Site 协定构建

SeaTunnel -Nifi -Connector

基于Nifi Site-To-Site 协定构建 SeaTunnel -Nifi -Connector

Nifi 实例之间的首选通信协议是 Nifi 站点到站点(S2S)协定。S2S 能够轻松,高效,平安地将数据从一个 Nifi 实例传输到另一个实例,或者其余应用程序或设施中,通过 S2S 协定与 Nifi 进行通信。S2S 中反对以 socket 的协定和 HTTP(S)协定作为底层传输协定.

Nifi 能够基于 Site-To-Site 协定与内部的 Nifi 节点或集群通信,也能够与内部的利用进行通信。该协定底层基于 socket 和 http 协定实现。能够利用 Nifi 的端口组件,从内部接收数据或者发送数据给内部零碎。

 

如图所示,右边这个流程是内部数据通过输出端口传到 Nifi,左边这个是 Nifi 数据通过端口传到内部。

3.2.1 SeaTunnel Nifi Connector Sink

这个是我做的 SeaTunnel connector sink demo,基于 SeaTunnel flink v1 connector,目前还比拟毛糙,所以还没提交到社区,前面欠缺了会提交。

因为 Nifi 的数据是流式的,首先咱们实现 FlinkStreamSink 接口,重写 outputStream 办法来输入数据到 Nifi,须要在配置中指定 Nifi 的 url 以及指标端口,而后利用 Flink 内置的 Nifi Sink 组件来输入数据,Nifi 的数据结构实际上是由 content 和 attribute 组成, 这里的 attribute 设成了空, 实际上咱们是能够按需将一些附加信息放到 attribute 中。这里 Nifi 用到了 Nifi data packet 数据结构,实际上底层是基于 Site To Site 协定进行一个实现。

3.2.2 SeaTunnel Nifi Connector Source

Nifi connector source 实现形式其实和 sink 差不多, 是基于 Flink 的 Nifi Source 组成,通过 FlinkStreamSource 接口重写 getData 办法来实现。

对于基于 Spark 引擎的 Connector,也能够用相似的实现。

通过 SeaTunnel 的 Nifi Connector,Nifi 和 SeaTunnel 之间的数据就能够不便地传输,能够在一些场景下代替用 kafka 作为音讯传输中间件的计划

03 天翼云数据集成平台

SeaTunnel V1 与 V2 Api 解决流程比照

外围的 Connector 是基于 Apache Nifi 封装而成,原生的 Nifi 对于用户而言很不敌对,用户的应用老本很高。

组件尽管多然而非专业人士 也很难将其玩转 ,然而它的能力还很弱小的, 比方工作流编排,调度,统计监控,版本治理,血统剖析 等能力都有,甚至 mini Nifi 还反对从边缘端采集物联网数据,因而咱们就在原生的 Nifi 之上,做了大量的封装,开发了很多常见的数据集成利用。

咱们的数据总线局部,底层采纳 Flink。Flink 生产 kafka 的数据来写入 hive 或者其余指标。而下层的利用中,会封装 Connector 发送数据到 kafka,再由数据总线写入数据中台的数仓之中的工作,接管 Kafka 数据,写入到数据中台中。

✦ ✦✦ ✦✦ ✦✦ ✦

在摸索过程中,咱们也思考引入 SeaTunnel,引入 SeaTunnel 有什么益处呢?首先 SeaTunnel,它是能够补足原有零碎的一些 cdc 能力的。

1、因为 Nifi 的 cdc 组件仅反对 mysql 数据源,cloudera 官网也不举荐用 Nifi 做 cdc,即使是 Nifi 提供了二次开发能力,用 Nifi 定制 cdc 组件从开发成本上而言也不够经济。

2、正好 SeaTunnel 是一个轻量级的工具,反对的组件类型丰盛,且依靠于 Flink 生态,适宜 cdc 的场景。另外,咱们的数据总线局部,设计之初只是想服务于中台产品的外部数仓 hive,在新版本中前面也心愿可能反对写入更多的指标,同时还心愿可能做些简略的 ETL。

当然,咱们能够本人通过开发更多的 Flink 程序来满足这种需要,然而发现 SeaTunnel 只有进行简略地对解析配置的局部革新,就能够整合到咱们原有的体系之中。

✦ ✦ ✦ ✦ ✦ ✦ ✦ ✦

基于这两点,我来说说咱们是如何做的。

这是 整个数据平台的业务调用逻辑,用户在前端填写好配置参数之后,参数被发送到 web 服务上,web 服务用 Flink Yarn API 起一个 Flink 利用。

当然在这之前,咱们须要部署好 Hadoop 等相干环境,以及 Flink 须要依赖相干的包传到 HDFS 下面。

那咱们怎么把最开始的 Flink 程序革新成 SeaTunnel 内核的 呢?总共有 6 个步骤!

1.main 函数将 args 参数组织成 map;

2.FlinkCommandArgs 减少 map 类型的成员变量;

3.FlinkApiTaskExecuteCommand 接管 FlinkCmmandArgs,在 execute()中,设法传入 map 给 ConfigBuilder;

4. 批改 SeaTunnel -core-base 中的 ConfigBuilder , 减少 loadByMap 办法;

5.loadByMap 办法外部调用 ConfigFactory.parseMap 办法,返回 Config;

6. 从新打包 SeaTunnel-core-flink 模块;

这个图是一个 简略的 demo,首先能够配置 SeaTunnel -core-flink.jar,把它放在 HDFS 上的一个门路替换原有的 app,替换相干的 lib 将用到的 SeaTunnel 插件也放到 lib 目录下。

我这里改的是程序运行的一个主类,把它改成 org.apache.seatunnel.core.flink.SeatunnelFlink,这样就能够替换掉原生数据总线局部 Flink 的局部。

另外一种模式是 Nifi 能够二次开发来封装 SeaTunnel 组件,最初后果是生成 nar 包(相似于 jar 包),搁置于 Nifi 的 lib 目录下即可失效。生成的组件能够于 Nifi 其余组件进行连贯。这种形式与之前提到的用 executeProcess 组件来启动 SeaTunnel 不同,它能够达到更深层次的定制。比方从新 Nifi 处理器的 onStop 办法,来管制 Spark 或者 Flink 工作的进行,或者重写 OnTrigger 办法来解决数据,也能够定义新的连贯关系来解决异样数据。

这个组件展现进去成果如图所示。它能够把 SeaTunnel 所须要的环境信息,传到组件外面去,同时能够定自定义组件上的 SeaTunnel 的参数,包含 env,source,transform,sink,也能够针对不同的数据源类型再对 参数细分,而这些参数通过 Nifi api 是很容易从前端传给 Nifi 的,因而采纳这种模式可能很容易地将 SeaTunnel 置于 Nifi 的体系内,使其两者能力联合。

04 参加开源教训与心得

在工具调研到参加到开源我的项目奉献的过程中,有过很多小故事,当然在这里要非常感谢 SeaTunnel 社区的同学们,他们十分激情,而且社区十分沉闷,最开始也从公司的一些大佬那里听到他们对 SeaTunnel 的评估很不错。

咱们团队看了官网的介绍之后,发现引入 SeaTunnel 能够为后续平台 开发节俭不少工作量,于是本人开始弄 demo 进行验证,也尝试开发了插件。正好碰上社区出了 “Connector”开发者激励打算,于是我就尝试把本人开发出的插件提交给社区。

第一次提交 PR 遇到的坑比拟多,因为你想在开源社区提交代码,不仅须要在 代码格调单元测试,还要恪守相干的 license,这个过程中所有的代码测验花了大略四天的工夫左右。

在社区的交换过程中,我开始加了社区经营人员的微信,经营小助手十分激情 ,有问题就间接会踊跃 帮我协调,一起解决,我感觉这个气氛不论是对于用户还是贡献者,都是特地敌对,体验也特地好的。

周老师的文章到这里完结了,感激大家的浏览,也欢送来社区跟技术同学获得交换,最初心愿更多的同学 退出到 SeaTunnel 社区,在这里不仅能够深切感触到 Apache 的开源精力和文化,还能理解 Apache 我的项目的治理流程,学习到优良的代码设计思维。

心愿通过大家的致力,独特成长,将 SeaTunnel 打造成为顶级的数据集成平台。

Apache SeaTunnel

Apache SeaTunnel(Incubating) 是一个分布式、高性能、易扩大、用于海量数据(离线 & 实时)同步和转化的数据集成平台

仓库地址: https://github.com/apache/inc…

网址:https://seatunnel.apache.org/

Proposal:https://cwiki.apache.org/conf…

Apache SeaTunnel(Incubating) 下载地址:https://seatunnel.apache.org/…

衷心欢送更多人退出!

咱们置信,在 「Community Over Code」(社区大于代码)、「Open and Cooperation」(凋谢合作)、「Meritocracy」(精英治理)、以及「 多样性与共识决策」等 The Apache Way 的指引下,咱们将迎来更加多元化和容纳的社区生态,共建开源精力带来的技术提高!

咱们诚邀各位有志于让外乡开源立足寰球的搭档退出 SeaTunnel 贡献者小家庭,一起共建开源!

提交问题和倡议:https://github.com/apache/inc…

奉献代码:https://github.com/apache/inc…

订阅社区开发邮件列表 : dev-subscribe@seatunnel.apach…

开发邮件列表:dev@seatunnel.apache.org

退出 Slack:https://join.slack.com/t/apac…

关注 Twitter: https://twitter.com/ASFSeaTunnel

退出移动版