关于sap:SAP-RFC-接口基于-SeaTunnel-开发实践打通企业内部数据采集的最后一个壁垒

14次阅读

共计 6003 个字符,预计需要花费 16 分钟才能阅读完成。

点亮 ⭐️ Star · 照亮开源之路

GitHub:https://github.com/apache/inc…

无论是甲方还是乙方,咱们在采集数据进行数仓模型建设时,企业的 ERP 一旦切换到 SAP 零碎中,就会遇到较高的平安挑战、技术门槛和产品壁垒。

平安挑战问题在于:传统数仓模式离线接入 SAP HANA,对于多集团公司又波及到数据权限和隔离等平安问题,个别团体大型企业不太会凋谢 HANA 数据库进行接入,同时 SAP 的业务表逻辑也比较复杂;

技术门槛在于:咱们要有对应的 java 开发工程师,每一个数据表就要开发一个接口,而且接口的传输速度也很慢,只能适宜小批量的数据接入;

产品壁垒在于:SAP 的闭环治理只能购入 SAP 的 BW 产品实现整体数据的疾速接入和模型建设,这种模式就比拟适宜 “ALL IN SAP”的企业外面,所有的数据处理和剖析都是基于 SAP 产品进行闭环的开发治理,然而弊病仍然显著,一旦有局部产品脱离 SAP,那数据团队以及运维的老本都是 翻倍减少 的,也无奈实现企业降本增效的目标;

理论状况是在企业外部的各种业务零碎异样简单,尤其是各种各样的 ERP 零碎,业务中台零碎,线上平台零碎,私有化部署的,SAAS 模式的,要一个通用的工具 去实现各种数据源的采集接入,前几年国内比拟支流的就是 Kettle,再起初是 DATAX;但他们都侧重于离线解决,对于实时数据接入也是费时费力,或者根本不能实现;

基于以上简单场景,在试用了市场上支流的开源的产品之后,咱们锁定了 SeaTunnel,依照从简略到简单的接入,分步骤实现了离线数据接入,实时数据(Kafka)接入,数据在 Hadoop 生态和 Clickhouse 之间的连接买通,在验证了上述的 稳定性和高速度 之后,咱们外部决定开发基于 SeaTunnel 的 SAP RFC 接口,完全彻底的买通企业外部数据采集的最初一个壁垒;

首先开发BaseStaticInput 插件。BaseStaticInput 是个 abstract class,咱们只有继承并实现它就能够。

class SapRfcInput extends BaseStaticInput{var config = ConfigFactory.empty()

  override def setConfig(config: Config): Unit = {this.config = config}

  override def getConfig(): Config = config

  override def checkConfig(): (Boolean, String) = { }

  override def getDataset(spark: SparkSession): Dataset[Row] = {}}

其中的关键点就是要实现getDataSet 函数, 这个函数的返回值是Dataset[Row]

怎么能力失去 Dataset[Row]?要么间接通过 seq 或者 list 相似的数据结构,要么通过 RDD 结构。

如果间接通过数据结构,在数据量过大时会产生内存溢出,这种办法在数据量很小的时候是能够的。在数据量大的时候,须要一种惰性的形式获取数据,得实现本人的 RDD。


class SapRfcRDD(sc: SparkContext, config: Config) extends RDD[Row](sc, Nil) with Logging{override def compute(split: Partition, context: TaskContext): Iterator[Row] = new Iterator[Row] {override def hasNext: Boolean = {}

    override def next(): Row = {}
  }

  override protected def getPartitions: Array[Partition] = {}}

SapRfcRDD构造函数咱们本人增加了一个参数 config, 为什么增加它?下文会阐明。

compute顾名思义就是用于计算出数据的, 返回值是迭代器, 阐明它是一种惰性的获取数据的形式。

实现一个迭代器在此之前首先须要实现hasNext 与 next 办法,hasNext 用于分别是否还有数据, next 用于产生数据。

getPartitions 用于取得分区信息。

如何实现这两个函数呢? 这个实现就得和如何取得 SAP RFC 接口的数据联合起来看。

咱们取得 SAP RFC 接口数据个别有以下 关键步骤:


# 依据相干 RFC 接口信息取得 JcoTable
getJcoTable(config: Config): JCoTable

# 取得数据行数
table.getNumRows

# 设置行
table.setRow(curIndex)

# 依据字段名取数据
val data = columns.map(column => {table.getString(column)
})

compute 中的 hasNext 办法是必定和 table.getNumRows 相干的, next 办法是必定和 table.setRow 办法相干的, 那咱们得取得 JcoTable 对象, 这就和下面提到 SapRfcRDD 的构造函数的第二个参数 config 分割起来了,通过 config, 咱们能力取得 JcoTable 对象。

那为什么不间接通过结构函数参数将 JcoTable 注入呢?这波及到 RDD 是分布式数据集,它会被序列化之后在各个节点之间传递,SapRfcRDD 构造函数的参数是必须可能平安序列化的,但 JcoTable 序列化会产生内存溢出,当然是否溢出是和 JcoTable 关联的数据大小无关。

那 getPartitions 是干嘛的呢?看下来如同不须要它也是能够的。如果你仅仅想把数据分成一个分区的话,getPartitons 的确是没什么用的。

然而如果你要把数据分成多个分区,放慢它的处理速度,getPartitions 的实现就很重要了。

而且要特地留神 compute 的 split 参数,它其实就是 getPartitions 返回的其中一个分区,compute 的 hasNext 与 next 的实现 和它是非亲非故的。


trait Partition extends scala.AnyRef with scala.Serializable {
  def index : scala.Int
override def hashCode() : scala.Int = { /* compiled code */}
override def equals(other : scala.Any) : scala.Boolean = {/* compiled code */}
}

class RowPartition(idx: Int, val start: Int, val end: Int) extends Partition {
override def index: Int = idx

override def toString: String = s"RowPartition{index: ${idx}, start: ${start}, end: ${end}}"
}

getPartitions 的返回值是Array[Partitions],Partition 是一个接口,实现它是非常简单的。

咱们给 RowPartiton 构造函数增加了两个参数start 与 end,即 JcoTable 的开始行数与完结行数, 左闭又开。

比如说整个接口的数据是 2000 行, 咱们给它分成两个分区,就相似于 RowPartition{index: 0, start: 0, end: 1000}, RowPartition{index: 0, start: 1000, end: 2000}。

在 compute 中 split 就是 RowPartiton 的实例, 通过 split 的 start 与 end 咱们能够很容易的实现 hasNext, next。

override def compute(split: Partition, context: TaskContext): Iterator[Row] = new Iterator[Row] {val columns = config.getStringList(config.getString("table")).asScala
val rowPartition: RowPartition = split.asInstanceOf[RowPartition]
val table = SapRfc.getJcoTable(config)
val tableRowData = new TableRowData(columns, table, rowPartition.index, rowPartition.start, rowPartition.end)
    println(tableRowData)


override def hasNext: Boolean = {tableRowData.hasNext()
    }


override def next(): Row = tableRowData.next()
  }


class TableRowData(val columns: Seq[String], val table: JCoTable, val partitionId: Int, val start: Int, val end: Int) {
var curIndex = start


  def hasNext(): Boolean = {curIndex < end}


  def next(): Row = {table.setRow(curIndex)
val data = columns.map(column => {table.getString(column)
    })
    curIndex += 1
    Row.fromSeq(data)
  }


override def toString: String = s"TableRowData{partitionId: ${partitionId}, start: ${start}, end: ${end}, columns: ${columns} }"
}

实现完 SapRfcRDD, 实现 getDataSet 就非常容易了。

最终咱们实现了 SAP RFC 接口的数据接入,蕴含了 2 种模式 ASHOST 和 MSHOST(注:_The MSHOST string is useful since it will give you failover capabilities in the process server connection. Also it can load balance the CPS connections (not the jobs, they are load balanced based on other metrics) to your remote system_),极大的简化了 SAP 数据的采集工夫,由原来 java 模式的一接口一开发实现了当初的一接口一配置,附 input 示例:


input {
    org.interestinglab.waterdrop.input.SapRfc {
        jco.client.mshost = "XXXXXX"
        jco.client.r3name = "XXX"
        jco.client.client = "XXX"
        jco.client.user = "XXX"
        jco.client.passwd = "XXX"
        jco.client.lang = "ZH"
        jco.client.group="PUBLIC"

        function = "FUNXXX"
        params = ["IV_DDATE", ""${rfc_date}""]
        table = "TTXXX"
        TTXXX= ["col1","col2","col3"]

        partition = 4
        result_table_name = "res_tt"
      }
}

input {
    org.interestinglab.waterdrop.input.SapRfc {
        jco.client.ashost = "XXXX"
        jco.client.sysnr = "XX"
        jco.client.client = "XX"
        jco.client.user = "XX"
        jco.client.passwd = "XXX"
        jco.client.lang = "ZH"

        function = "FUNXXX"
        params = ["DDATE", ""${rfc_date}""]
        table  = "TABLEXXXX"
        TABLEXXXX = ["col1","col2","col3"]

        partition = 4
        result_table_name = "res_tt"

      }
}

参数配置蕴含三局部,第一局部端口的访问信息,第二局部是 sap 外部的函数以及传递参数、表名称以及表字段,第三局部是 partition 是 spark 的分区数配置;

通过上述配置,咱们获取60 万条左右 sap 数据(受限 sap 管制条件只能按天查问),从启动 job 到数据插入 hive 只须要 2 分钟即可,整个 SAP 数据的接入开发工夫由原来的天缩短到小时级别(蕴含参数配置,根本校验)。

作者:韩山峰 / 皇甫新义金红叶纸业团体大数据开发工程师

专一于大数据平台建设、数据仓库、数据模型建设、数据可视化方向,对市场上常见的数据集成框架以及引擎有肯定的理解。

Apache SeaTunnel

// 放弃联系 //

来,和社区一起成长!

Apache SeaTunnel(Incubating) 是一个分布式、高性能、易扩大、用于海量数据(离线 & 实时)同步和转化的数据集成平台。

仓库地址: https://github.com/apache/inc…

网址:https://seatunnel.apache.org/

Proposal:https://cwiki.apache.org/conf…

Apache SeaTunnel(Incubating) 2.1.0 下载地址:https://seatunnel.apache.org/…

衷心欢送更多人退出!

可能进入 Apache 孵化器,SeaTunnel(原 Waterdrop) 新的途程才刚刚开始,但社区的发展壮大须要更多人的退出。咱们置信,在 「Community Over Code」(社区大于代码)、「Open and Cooperation」(凋谢合作)、「Meritocracy」(精英治理)、以及「 多样性与共识决策」等 The Apache Way 的指引下,咱们将迎来更加多元化和容纳的社区生态,共建开源精力带来的技术提高!

咱们诚邀各位有志于让外乡开源立足寰球的搭档退出 SeaTunnel 贡献者小家庭,一起共建开源!

提交问题和倡议:https://github.com/apache/inc…

奉献代码:https://github.com/apache/inc…

订阅社区开发邮件列表 : dev-subscribe@seatunnel.apach…

开发邮件列表:dev@seatunnel.apache.org

退出 Slack:https://join.slack.com/t/apac…

关注 Twitter: https://twitter.com/ASFSeaTunnel

正文完
 0