关于sap:SAP-RFC-接口基于-SeaTunnel-开发实践打通企业内部数据采集的最后一个壁垒

点亮 ⭐️ Star · 照亮开源之路

GitHub:https://github.com/apache/inc…

无论是甲方还是乙方,咱们在采集数据进行数仓模型建设时,企业的ERP一旦切换到SAP零碎中,就会遇到较高的平安挑战、技术门槛和产品壁垒。

平安挑战问题在于:传统数仓模式离线接入SAP HANA,对于多集团公司又波及到数据权限和隔离等平安问题,个别团体大型企业不太会凋谢HANA数据库进行接入,同时SAP的业务表逻辑也比较复杂;

技术门槛在于:咱们要有对应的java开发工程师,每一个数据表就要开发一个接口,而且接口的传输速度也很慢,只能适宜小批量的数据接入;

产品壁垒在于:SAP的闭环治理只能购入SAP的BW产品实现整体数据的疾速接入和模型建设,这种模式就比拟适宜“ALL IN SAP”的企业外面,所有的数据处理和剖析都是基于SAP产品进行闭环的开发治理,然而弊病仍然显著,一旦有局部产品脱离SAP,那数据团队以及运维的老本都是翻倍减少的,也无奈实现企业降本增效的目标;

理论状况是在企业外部的各种业务零碎异样简单,尤其是各种各样的ERP零碎,业务中台零碎,线上平台零碎,私有化部署的,SAAS模式的,要一个通用的工具去实现各种数据源的采集接入,前几年国内比拟支流的就是Kettle,再起初是DATAX;但他们都侧重于离线解决,对于实时数据接入也是费时费力,或者根本不能实现;

基于以上简单场景,在试用了市场上支流的开源的产品之后,咱们锁定了SeaTunnel,依照从简略到简单的接入,分步骤实现了离线数据接入,实时数据(Kafka)接入,数据在Hadoop生态和Clickhouse之间的连接买通,在验证了上述的稳定性和高速度之后,咱们外部决定开发基于SeaTunnel的SAP RFC接口,完全彻底的买通企业外部数据采集的最初一个壁垒;

首先开发BaseStaticInput插件。BaseStaticInput是个abstract class,咱们只有继承并实现它就能够。

class SapRfcInput extends BaseStaticInput{
  var config = ConfigFactory.empty()

  override def setConfig(config: Config): Unit = {
    this.config = config
  }

  override def getConfig(): Config = config

  override def checkConfig(): (Boolean, String) = {
  }

  override def getDataset(spark: SparkSession): Dataset[Row] = {
  }
}

其中的关键点就是要实现getDataSet函数,这个函数的返回值是Dataset[Row]

怎么能力失去Dataset[Row]?要么间接通过seq或者list相似的数据结构,要么通过RDD结构。

如果间接通过数据结构,在数据量过大时会产生内存溢出,这种办法在数据量很小的时候是能够的。在数据量大的时候,须要一种惰性的形式获取数据,得实现本人的RDD。


class SapRfcRDD(sc: SparkContext, config: Config) extends RDD[Row](sc, Nil) with Logging{

  override def compute(split: Partition, context: TaskContext): Iterator[Row] = new Iterator[Row] {
    override def hasNext: Boolean = {
    }

    override def next(): Row = {

    }
  }

  override protected def getPartitions: Array[Partition] = {

  }
}

SapRfcRDD构造函数咱们本人增加了一个参数config,为什么增加它?下文会阐明。

compute顾名思义就是用于计算出数据的,返回值是迭代器,阐明它是一种惰性的获取数据的形式。

实现一个迭代器在此之前首先须要实现hasNext与next办法,hasNext用于分别是否还有数据, next用于产生数据。

getPartitions用于取得分区信息。

如何实现这两个函数呢? 这个实现就得和如何取得SAP RFC接口的数据联合起来看。

咱们取得SAP RFC接口数据个别有以下关键步骤:


# 依据相干RFC接口信息取得JcoTable
getJcoTable(config: Config): JCoTable

# 取得数据行数
table.getNumRows

# 设置行
table.setRow(curIndex)

# 依据字段名取数据
val data = columns.map(column => {
   table.getString(column)
})

compute中的hasNext办法是必定和table.getNumRows相干的, next办法是必定和table.setRow办法相干的, 那咱们得取得JcoTable对象,这就和下面提到SapRfcRDD的构造函数的第二个参数config分割起来了,通过config,咱们能力取得JcoTable对象。

那为什么不间接通过结构函数参数将JcoTable注入呢?这波及到RDD是分布式数据集,它会被序列化之后在各个节点之间传递,SapRfcRDD构造函数的参数是必须可能平安序列化的,但JcoTable序列化会产生内存溢出,当然是否溢出是和JcoTable关联的数据大小无关。

那getPartitions是干嘛的呢?看下来如同不须要它也是能够的。如果你仅仅想把数据分成一个分区的话,getPartitons的确是没什么用的。

然而如果你要把数据分成多个分区,放慢它的处理速度,getPartitions的实现就很重要了。

而且要特地留神compute的split参数,它其实就是getPartitions返回的其中一个分区,compute的hasNext与next的实现和它是非亲非故的。


trait Partition extends scala.AnyRef with scala.Serializable {
  def index : scala.Int
override def hashCode() : scala.Int = { /* compiled code */ }
override def equals(other : scala.Any) : scala.Boolean = { /* compiled code */ }
}

class RowPartition(idx: Int, val start: Int, val end: Int) extends Partition {
override def index: Int = idx

override def toString: String = s"RowPartition{index: ${idx}, start: ${start}, end: ${end}}"
}

getPartitions的返回值是Array[Partitions],Partition是一个接口,实现它是非常简单的。

咱们给RowPartiton构造函数增加了两个参数start与end,即JcoTable的开始行数与完结行数,左闭又开。

比如说整个接口的数据是2000行,咱们给它分成两个分区,就相似于 RowPartition{index: 0, start: 0, end: 1000}, RowPartition{index: 0, start: 1000, end: 2000}。

在compute中split就是RowPartiton的实例, 通过split的start与end咱们能够很容易的实现hasNext, next。

override def compute(split: Partition, context: TaskContext): Iterator[Row] = new Iterator[Row] {
val columns = config.getStringList(config.getString("table")).asScala
val rowPartition: RowPartition = split.asInstanceOf[RowPartition]
val table = SapRfc.getJcoTable(config)
val tableRowData = new TableRowData(columns, table, rowPartition.index, rowPartition.start, rowPartition.end)
    println(tableRowData)


override def hasNext: Boolean = {
      tableRowData.hasNext()
    }


override def next(): Row = tableRowData.next()
  }


class TableRowData(val columns: Seq[String], val table: JCoTable, val partitionId: Int, val start: Int, val end: Int) {
var curIndex = start


  def hasNext(): Boolean = {
    curIndex < end
  }


  def next(): Row = {
    table.setRow(curIndex)
val data = columns.map(column => {
      table.getString(column)
    })
    curIndex += 1
    Row.fromSeq(data)
  }


override def toString: String = s"TableRowData{partitionId: ${partitionId}, start: ${start}, end: ${end}, columns: ${columns} }"
}

实现完SapRfcRDD, 实现getDataSet就非常容易了。

最终咱们实现了SAP RFC接口的数据接入,蕴含了2种模式ASHOST 和MSHOST (注:_The MSHOST string is useful since it will give you failover capabilities in the process server connection. Also it can load balance the CPS connections (not the jobs, they are load balanced based on other metrics) to your remote system_),极大的简化了SAP数据的采集工夫,由原来java模式的一接口一开发实现了当初的一接口一配置,附input示例:


input {
    org.interestinglab.waterdrop.input.SapRfc {
        jco.client.mshost = "XXXXXX"
        jco.client.r3name = "XXX"
        jco.client.client = "XXX"
        jco.client.user = "XXX"
        jco.client.passwd = "XXX"
        jco.client.lang = "ZH"
        jco.client.group="PUBLIC"

        function = "FUNXXX"
        params = ["IV_DDATE", ""${rfc_date}""]
        table = "TTXXX"
        TTXXX= ["col1","col2","col3"]

        partition = 4
        result_table_name = "res_tt"
      }
}

input {
    org.interestinglab.waterdrop.input.SapRfc {
        jco.client.ashost = "XXXX"
        jco.client.sysnr = "XX"
        jco.client.client = "XX"
        jco.client.user = "XX"
        jco.client.passwd = "XXX"
        jco.client.lang = "ZH"

        function = "FUNXXX"
        params = ["DDATE", ""${rfc_date}""]
        table  = "TABLEXXXX"
        TABLEXXXX = ["col1","col2","col3"]

        partition = 4
        result_table_name = "res_tt"

      }
}

参数配置蕴含三局部,第一局部端口的访问信息,第二局部是sap外部的函数以及传递参数、表名称以及表字段,第三局部是partition 是spark的分区数配置;

通过上述配置,咱们获取60万条左右sap数据(受限sap管制条件只能按天查问),从启动job到数据插入hive只须要2分钟即可,整个SAP数据的接入开发工夫由原来的天缩短到小时级别(蕴含参数配置,根本校验)。

作者:韩山峰/皇甫新义金红叶纸业团体大数据开发工程师

专一于大数据平台建设、数据仓库、数据模型建设、数据可视化方向,对市场上常见的数据集成框架以及引擎有肯定的理解。

Apache SeaTunnel

// 放弃联系 //

来,和社区一起成长!

Apache SeaTunnel(Incubating) 是一个分布式、高性能、易扩大、用于海量数据(离线&实时)同步和转化的数据集成平台。

仓库地址: https://github.com/apache/inc…

网址:https://seatunnel.apache.org/

Proposal:https://cwiki.apache.org/conf…

Apache SeaTunnel(Incubating) 2.1.0 下载地址:https://seatunnel.apache.org/…

衷心欢送更多人退出!

可能进入 Apache 孵化器,SeaTunnel(原 Waterdrop) 新的途程才刚刚开始,但社区的发展壮大须要更多人的退出。咱们置信,在「Community Over Code」(社区大于代码)、「Open and Cooperation」(凋谢合作)、「Meritocracy」(精英治理)、以及「多样性与共识决策」等 The Apache Way 的指引下,咱们将迎来更加多元化和容纳的社区生态,共建开源精力带来的技术提高!

咱们诚邀各位有志于让外乡开源立足寰球的搭档退出 SeaTunnel 贡献者小家庭,一起共建开源!

提交问题和倡议:https://github.com/apache/inc…

奉献代码:https://github.com/apache/inc…

订阅社区开发邮件列表 : dev-subscribe@seatunnel.apach…

开发邮件列表:dev@seatunnel.apache.org

退出 Slack:https://join.slack.com/t/apac…

关注 Twitter: https://twitter.com/ASFSeaTunnel

评论

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注

这个站点使用 Akismet 来减少垃圾评论。了解你的评论数据如何被处理