乐趣区

关于后端:Spark离线开发框架设计与实现

导读 :本文介绍了开发框架的整体设计,随后对各模块进行了拆解,重点介绍了如何疾速实现应用程序的开发,并从设计思路、实现形式、性能介绍及创立形式等角度对通用的数据回溯利用进行了全面介绍,实现了一次环境筹备,多数据回溯工作的启动计划。总之,框架对开发效率、回溯工作的效率与保护老本及代码治理便捷性都会有显著的成果。

全文 3308 字,预计浏览工夫 10 分钟。

一、背景

随着 Spark 以及其社区的一直倒退,Spark 自身技术也在一直成熟,Spark 在技术架构和性能上的劣势越来越显著,目前大多数公司在大数据处理中都偏向应用 Spark。Spark 反对多种语言的开发,如 Scala、Java、Sql、Python 等。

Spark SQL 应用规范的数据连贯,与 Hive 兼容,易与其它语言 API 整合,表达清晰、简略易上手、学习成本低,是开发者开发简略数据处理的首选语言,但对于简单的数据处理、数据分析的开发,应用 SQL 开发显得力不从心,保护老本也十分高,应用高级语言解决会更高效。

在日常的数据仓库开发工作中,咱们除了开发工作外,也波及大量的数据回溯工作。对于创新型业务来说,口径变动频繁、业务迅速迭代,数据仓库的回溯十分常见,通过回溯几个月甚至一年是十分广泛的,但传统的回溯工作形式效率极低,而且须要人力亲密关注各工作状态。

针对目前现状,咱们开发了一套 Spark 离线开发框架,如下表所示,咱们例举了目前存在的问题及解决方案。框架的实现不仅让开发变得简略高效,而且对于数据的回溯工作在不须要任何开发的状况下,疾速高效地实现大量的回溯工作。

二、框架设计

框架旨在封装反复的工作,让开发变得简略。框架如图 2 - 1 所示,次要分为三个局部,根底框架、可扩大工具及应用程序,开发者只需关注应用程序即可简略疾速实现代码开发。


2.1 根底框架

根底框架中,咱们对于所有类型的利用实现代码与配置拆散机制,资源配置对立以 XML 文件模式保留并由框架解析解决。框架会依据开发者配置的工作应用资源大小,实现了 SparkSession、SparkContext、SparkConf 的创立,同时加载了罕用环境变量,开发了通用的 UDF 函数(如罕用的 url 参数解析等)。其中 Application 为所有利用的父类,解决流程如图所示,开发者只需编写关注绿色局部即可。

目前,离线框架所反对的罕用环境变量如下表所示。


2.2 可扩大工具

可扩大工具中蕴含了大量的工具类,服务于应用程序及根底框架,罕用有,配置文件解析类,如解析工作资源参数等;数据库工具类,用于读写数据库;日期工具类,用于日期加减、转换、辨认并解析环境变量等。服务于应用程序的通用工具模块可统称为可扩大工具,这里不再赘述。

2.3 应用程序

2.3.1 SQL 利用

对于 SQL 利用,只须要创立 SQL 代码及资源配置即可,利用类为惟一类(已实现),有且只有一个,供所有 SQL 利用应用,开发者无需关怀。如下配置所示,class 为所有利用的惟一类名,开发者要关怀的是 path 中的 sql 代码及 conf 中该 sql 所应用的资源大小。

<?xml version="1.0" encoding="UTF-8"?>
<project name="test">
    <class>com.way.app.instance.SqlExecutor</class>
    <path>sql 文件门路 </path>
  <!--    sparksession conf   -->
    <conf>
        <spark.executor.memory>1G</spark.executor.memory>
        <spark.executor.cores>2</spark.executor.cores>
        <spark.driver.memory>1G</spark.driver.memory>
        <spark.executor.instances>20</spark.executor.instances>
    </conf>
</project>

2.3.2 Java 利用

对于简单的数据处理,SQL 代码不能满足需要时,咱们也反对 Java 程序的编写,与 SQL 不同的是,开发者须要创立新的利用类,继承 Application 父类并实现 run 办法即可,run 办法中开发者只须要关注数据的解决逻辑,对于通用的 SparkSession、SparkContext 等创立及敞开无需关注,框架还帮忙开发者封装了代码的输出、输入逻辑,对于输出类型,框架反对 HDFS 文件输出、SQL 输出等多种输出类型,开发者只需调用相干处理函数即可。

如下为一个简略的 Java 数据处理利用,从配置文件能够看出,仍需配置资源大小,但与 SQL 不同的是,开发者须要定制化编写对应的 Java 类(class 参数),以及利用的输出(input 参数)和输入参数(output 参数),此利用中输出为 SQL 代码,输入为 HDFS 文件。从 Test 类实现能够看出,开发者只需三步走:获取输出数据、逻辑解决、后果输入,即可实现代码编写。

<?xml version="1.0" encoding="UTF-8"?>
<project name="ecommerce_dwd_hanwuji_click_incr_day_domain">
    <class>com.way.app.instance.ecommerce.Test</class>
    <input>
        <type>table</type>
        <sql>select
            clk_url,
            clk_num
            from test_table
            where event_day='{DATE}'
            and click_pv > 0
            and is_ubs_spam=0
        </sql>
    </input>
    <output>
        <type>afs_kp</type>
        <path>test/event_day={DATE}</path>
    </output>
    <conf>
        <spark.executor.memory>2G</spark.executor.memory>
        <spark.executor.cores>2</spark.executor.cores>
        <spark.driver.memory>2G</spark.driver.memory>
        <spark.executor.instances>10</spark.executor.instances>
    </conf>
</project>
package com.way.app.instance.ecommerce;import com.way.app.Application;import org.apache.spark.api.java.JavaRDD;import org.apache.spark.sql.Row;import java.util.Map;import org.apache.spark.api.java.function.FilterFunction;import org.apache.spark.sql.Dataset;public class Test extends Application {@Override    public void run() {// 输出        Map<String, String> input = (Map<String, String>) property.get("input");        Dataset<Row> ds = sparkSession.sql(getInput(input)).toDF("url", "num");        // 逻辑解决(简略的筛选出 url 带有局部站点的日志)JavaRDD<String> outRdd = ds.filter((FilterFunction<Row>) row -> {String url = row.getAs("url").toString();            return url.contains(".jd.com")                    || url.contains(".suning.com")                    || url.contains("pin.suning.com")                      || url.contains(".taobao.com")                    || url.contains("detail.tmall.hk")                    || url.contains(".amazon.cn")                    || url.contains(".kongfz.com")                    || url.contains(".gome.com.cn")                    || url.contains(".kaola.com")                    || url.contains(".dangdang.com")                    || url.contains("aisite.wejianzhan.com")                    || url.contains("w.weipaitang.com");        })                .toJavaRDD()                .map(row -> row.mkString("\001"));        // 输入        Map<String, String> output = (Map<String, String>) property.get("output");        outRdd.saveAsTextFile(getOutPut(output));    }}

2.3.3 数据回溯利用

数据回溯利用是为解决疾速回溯、开释人力而研发的,应用十分便捷,开发者无需重构工作代码,与 SQL 利用雷同,回溯利用类为惟一类(已实现),有且只有一个,供所有回溯工作应用,且反对多种回溯计划。

2.3.3.1 方案设计

在日常回溯过程中发现,一次回溯工作存在重大的工夫节约,无论以何种形式提交工作,都须要经验以下执行环境申请及筹备的过程:

  1. 在 client 提交 application,首先 client 向 RS 申请启动 ApplicationMaster
  2. RS 先随机找到一台 NodeManager 启动 ApplicationMaster
  3. ApplicationMaster 向 RS 申请启动 Executor 的资源
  4. RS 返回一批资源给 ApplicationMaster
  5. ApplicationMaster 连贯 Executor
  6. 各个 Executor 反向注册给 ApplicationMaster
  7. ApplicationMaster 发送 task、监控 task 执行,回收后果

这个过程占用的工夫咱们统称为执行环境筹备,咱们提交工作后,经验如下三个过程:

  1. 执行环境筹备
  2. 开始执行代码
  3. 开释资源

执行环境筹备通常会有 5 -20 分钟的等待时间,以队列过后的资源状况高低稳定,失败率为 10% 左右,失败起因因为队列、网络、资源有余等造成的不可抗力因素;代码执行过程通常失败率在 5% 左右,通常因为节点不稳固、网络等因素导致。离线开发框架回溯利用从节省时间和人力两个方面思考,设计方案图 2 - 3 所示。

从回溯工夫方面来看:将所有回溯子工作的第一、第三步的工夫压缩为一次,即环境筹备及开释各一次,执行屡次回溯代码。若开发者回溯工作为 30 个子工作,则节俭的工夫为 5 -20 分钟乘 29,可见,回溯子工作越多,回溯提效越显著。

从人工染指方面来看,第一,开发者无需额定开发、增加回溯配置即可。第二,离线框架回溯利用启动的工作数量远远小于传统回溯计划,以图 2 - 3 为例,该回溯工作为串行回溯形式,应用框架后只需关注一个工作的执行状态,而传统形式则需人工保护 N 个工作的执行状态。

最初,咱们在应用离线开发框架回溯一个一年的串行工作中,代码的执行只须要 5 分钟左右,咱们发现,不应用离线开发框架回溯的工作在最现实的状况下(即最短时间调配到资源、所有子工作均无失败状况、一次能够串行启动 365 天),须要的工夫为 2.5 天,但应用离线开发框架回溯的工作,在最坏的状况下(即最长工夫调配到资源,工作失败状况呈现 10%),只须要 6 个小时就可实现,提效 90% 以上,且根本无需人力关注。

2.3.3.2 性能介绍
断点续回

应用 Spark 计算,咱们在享受其计算带来的飞快速度时,难免会遭逢其中的不稳定性,节点宕机、网络连接失败、资源问题带来的工作失败不足为奇,回溯工作动辄几个月、甚至一年,任务量微小,失败后能够持续从断点处回溯显得尤为重要。在离线框架设计中,记录了工作回溯过程中已胜利的局部,工作失败重启后会进行断点续回。

回溯程序

在回溯工作中,通常咱们会依据业务须要确定回溯程序,如对于有新老用户的增量数据,因为以后的日期数据依赖历史数据,所以咱们通常会从历史到当初开始回溯。但没有这种须要时,一般来说,先回溯当初能够疾速满足业务方对当初数据指标的理解,咱们通常会从当初到历史回溯。在离线框架设计中,开发者可依据业务须要抉择回溯程序。

并行回溯

通常,回溯工作优先级低于例行工作,在资源无限的状况下,回溯过程中不能一次性全副开启,免得占用大量资源影响例行工作,所以离线框架默认为串行回溯。当然在资源充沛的时间段,咱们能够抉择适当的并行回溯。离线开发框架反对肯定的并发度,开发者在回溯工作时熟能生巧。

2.3.3.3 创立一个回溯工作

回溯利用的应用十分不便,开发者无需新开发代码,应用例行的代码,配置回溯计划即可,如下代码所示,

  • class 参数为回溯利用的惟一类,必填参数,所有回溯工作无需变动。
  • type 参数为回溯利用类型,默认为 sql,若利用类型为 java,则 type 值应为 java 类名。
  • path 参数为回溯代码门路,必填参数,无默认值,通常与例行工作代码雷同,无需批改。
  • limitdate 参数为回溯的截止日期,必填参数,无默认值。
  • startdate 参数为回溯开始日期,必填参数,无默认值,若工作进入断点续回或开启并行回溯时,则该参数有效。
  • order 参数为回溯程序,默认为倒序。当值为 1 时为正序,为值为 - 1 时为倒序。
  • distance 参数为回溯步长,框架默认为串行回溯,但也反对并行回溯,该参数次要用于反对并行回溯,当该参数存在且值不为 - 1 时,回溯开始日期取值为基准日期。如启动两个并行任务,工作的执行范畴为基准日期至基准日期加步长或 limitdate,若基准日期加步长后日期大于 limitdate,则是取 limitdate,否则反之。
  • file 参数为回溯日志文件,必填参数,无默认值,用于记录已回溯胜利的日期,当失败再次重启工作时,startdate 会以日志文件中日期的下一个日期为准。
  • conf 参数与其余利用雷同,为本次回溯工作的资源占用配置。
<?xml version="1.0" encoding="UTF-8"?><project name="ecommerce_ads_others_order_retain_incr_day">    <class>com.way.app.instance.ecommerce.Huisu</class>    <type>sql</type>    <path>/sql/ecommerce/ecommerce_ads_others_order_retain_incr_day.sql</path>    <limitdate>20220404</limitdate>    <startdate>20210101</startdate>    <order>1</order>    <distance>-1</distance>    <file>/user/ecommerce_ads_others_order_retain_incr_day/process</file>    <conf>        <spark.executor.memory>1G</spark.executor.memory>        <spark.executor.cores>2</spark.executor.cores>        <spark.executor.instances>30</spark.executor.instances>        <spark.yarn.maxAppAttempts>1</spark.yarn.maxAppAttempts>    </conf></project>
‍

三、应用形式

3.1 应用介绍

应用离线框架形式开发时,开发者只需重点关注数据逻辑解决局部,开发实现打包后,提交执行,对于每一个利用主类雷同,如前文所述为 Application 父类,不随利用变动,惟一变动的是父类须要接管的参数,该参数为利用的配置文件的相对路径。

3.2 应用比照

应用离线框架前后比照图如下所示。


四、瞻望

目前,离线开发框架仅反对 SQL、Java 语言代码的开发,但 Spark 反对的语言远不止这两种,咱们须要持续对框架降级反对多语言开发等,让开发者更不便、疾速地进行大数据开发。

———-  END  ———-

举荐浏览

云原生时代的搜寻服务算力治理

浅谈小程序开源业务架构建设之路

百度小程序包流式下载安装优化

前端工程化之 FaaS SSR 计划

日志中台不重不丢实现浅谈

百度 ToB 垂类账号权限平台的设计与实际

退出移动版