关于数据库:拈花云科基于-Apache-DolphinScheduler-在文旅业态下的实践

7次阅读

共计 10834 个字符,预计需要花费 28 分钟才能阅读完成。

作者|云科 NearFar X Lab 团队 左益、周志银、洪守伟、陈超、武超

一、导读

无锡拈花云科技服务有限公司(以下简称:拈花云科)是由拈花湾文旅和北京滴普科技独特孵化的文旅目的地数智化服务商。2022 年底,拈花云科 NearFar X Lab 团队开始测试 DolphinScheduler 作为交付型我的项目和产品我的项目的任务调度工具。本文次要分享了拈花云科在任务调度工具的抉择、迭代和实际过程中的教训,心愿对大家有所启发。

二、业务背景

咱们的服务对象次要是国内各个景区、景点,业务范围涵盖文旅行业的多个板块,如票务、交通、批发、住宿、餐饮、演绎、游乐、影院、KTV、租赁、服务、会务、康乐、康养、电商、客服、营销、分销、安防等。因为业务零碎起源较多,多零碎下的数据源类型差异化较大,所以在施行数据我的项目时咱们须要可能反对多种数据起源(Mysql、Oracle、SqlServer、Hive、Excel……)的数据集成工作。同时依据大部分景区为国有化的特点,咱们也须要具备可能提供私有化交付部署及 SAAS 化数据中台产品解决方案的双重服务撑持能力。

三、DolphinScheduler 调度零碎选型过程

在团队成立之初为了疾速构建 MVP 业务版本,咱们沿用了团队共事之前用过的 Kettle 调度计划。该计划下通过 Kettle 实现可视化调度的配置及对于异构数据的集成工作,通过 Python 调用 HQL 脚本实现基于 Hive 的传参数据计算。

基于 MVP 的构建,咱们也开始思考,在咱们的整体中台架构下该须要一个什么样的调度零碎,以及除了调度这件事自身咱们还须要哪些性能和能力。带着这些问题咱们开始整顿本人的需要,以及每个需要下有什么样的产品能够适配。

调度零碎须要撑持的利用场景

文旅业态下的数据应用场景与其它业态下的应用场景大体雷同,次要分为以下四类:

调度零碎须要撑持的我的项目类型

咱们抉择的调度零碎须要同时具备施行类我的项目、SAAS 产品两种需要下的数据中台撑持能力

基于以上需要咱们进行了调度零碎的选型比照。网上有十分多对于 Oozie、Azkaban、Airflow、DolphinScheduler、Xxl-job、Kettle 等调度选型的文章及介绍,在此不过多的开展他们的优缺点。咱们感觉每个产品的设计都有它本身的考量,都有实用与不实用的场景。联合咱们本身的应用需要最终咱们抉择了应用 DolphinScheduler 作为数据中台的调度平台。

次要起因如下:

  1. High Reliability(高可靠性)
    高可靠性是咱们看重的第一要点,因为不论是施行我的项目还是 SAAS 产品,只有零碎稳固产品才能够失常运行。DolphinScheduler 通过去中心化设计、原生 HA 工作队列反对、过载容错能力反对提供了高度持重的环境。在咱们半年的应用过程中也验证了其十分稳固。
  2. High Scalability:(高扩展性)
    因为咱们要反对施行我的项目 /SAAS 产品两种场景下的应用,DolphinScheduler 的多租户反对很好的符合了 SAAS 场景下资源隔离的应用需要。同时其扩缩容能力、高度的调度工作下限(10 万 +)都能很好的撑持咱们业务的扩展性需要。
  3. 丰盛的数据集成能力
    DolphinScheduler 提供的工作类型曾经远远涵盖了咱们常常应用的工作类型(DataX、SeaTunnel 的反对自身就涵盖了较多的 Source2Target 同步 / 推送场景)。
  4. 反对 Kubernetes 部署
    上文提到在私有化的部署形式下客户的部署环境大不相同,对于施行团队来说,如果可能简略、高效、统一的实现部署则会极大的进步我的项目投递部署效率,同时也能缩小很多因环境起因而产生的问题。
  5. 不仅仅是调度
    在调研 DolphinScheduler 的过程中咱们发现,除了调度自身这个环节,联合 DCMM(数据管理能力成熟度评估模型)的国标 8 个能力域,DolphinScheduler 在数据品质模块也做了很多实现,这无疑又命中了咱们对于数据品质能力建设的需要。同时 Apache DolphinScheduler 的服务架构中还有 AlertServer 服务。作为整体数据中台计划来说前期齐全能够将所有报警需要集成在 Apache DolphinScheduler 的报警服务中,防止多零碎反复造轮子。从这些点思考 DolphinScheduler 它不仅仅是一个调度工具而更像是一个数据开发平台。(期待随着社区的迭代会有更残缺的生态实现)
  6. 问题解决难度
    DolphinScheduler 社区十分的沉闷,在退出 DolphinScheduler 社区群后每天都能够看到十分多的搭档在交换对于 Apache DolphinScheduler 应用过程中的问题,社区人员也都踊跃的予以回复。同时 Dolphinscheduler 又是咱们国产开源软件,所以齐全不用放心存在沟通上的阻碍。

四、基于 DolphinScheduler 的我的项目实际

1、DolphinScheduler ON Kubernetes

DolphinScheduler 反对多种部署形式: 单机部署(Standalone)、伪集群部署(Pseudo-Cluster)、集群部署(Cluster)、Kubernetes 部署(Kubernetes)。在我的项目施行的场景下因为客户提供的部署环境变幻无穷,咱们须要一种稳固、疾速、不挑环境的部署形式。Apache DolphinScheduler on K8S 的部署形式很好的满足了咱们的需要,此部署形式能极大的进步整体我的项目的部署效率及动静扩展性。

  • Kubernetes 是一个开源的容器编排平台,能够实现容器的自动化部署、扩缩容、服务发现、负载平衡,能够进步 DolphinScheduler 的可用性、可扩展性和可维护性
  • Kubernetes 能够反对多种存储类型,包含本地存储、网络存储和云,能够满足 DolphinScheduler 多节点共享长久化存储需要
  • Kubernetes 能够反对多种调度策略,包含亲和性、反亲和性、污点和容忍,能够优化 DolphinScheduler 的资源利用率,进步工作执行效率。
  • Kubernetes 能够反对多种监控和日志计划,包含 Prometheus、Grafana、ELK 等等,能够不便地对 DolphinScheduler 的运行状态和性能进行监控,剖析和告警

在部署 Apache DolphinScheduler on K8S 的过程中咱们也曾遇到过一些问题,上面是咱们总结的一些 Kubernetes 部署要点:

自定义镜像

FROM dolphinscheduler.docker.scarf.sh/apache/dolphinscheduler-alert-server: 版本号
# 如果你想反对 MySQL 数据源
COPY ./mysql-connector-java-8.0.16.jar /opt/dolphinscheduler/libs

dolphinscheduler-api

FROM dolphinscheduler.docker.scarf.sh/apache/dolphinscheduler-api: 版本号

# 如果你想反对 MySQL 数据源
COPY ./mysql-connector-java-8.0.16.jar /opt/dolphinscheduler/libs

# 如果你想反对 Oracle 数据源
COPY ./ojdbc8-19.9.0.0.jar /opt/dolphinscheduler/libs

dolphinscheduler-master

FROM dolphinscheduler.docker.scarf.sh/apache/dolphinscheduler-master: 版本号

# 如果你想反对 MySQL 数据源
COPY ./mysql-connector-java-8.0.16.jar /opt/dolphinscheduler/libs

dolphinscheduler-tools

FROM dolphinscheduler.docker.scarf.sh/apache/dolphinscheduler-tools: 版本号

# 如果你想反对 MySQL 数据源
COPY ./mysql-connector-java-8.0.16.jar /opt/dolphinscheduler/libs

dolphinscheduler-worker

FROM dolphinscheduler.docker.scarf.sh/apache/dolphinscheduler-worker: 版本号

# 如果你想反对 MySQL 数据源
COPY ./mysql-connector-java-8.0.16.jar /opt/dolphinscheduler/libs

# 如果你想反对 Oracle 数据源
COPY ./ojdbc8-19.9.0.0.jar /opt/dolphinscheduler/libs

# 如果你想反对 hadoop 数据源
COPY ./hadoop-common-2.7.3.jar /opt/dolphinscheduler/libs
COPY ./hadoop-core-1.2.1.jar /opt/dolphinscheduler/libs

# 如果你想反对 hive 数据源
COPY ./hive-common.jar /opt/dolphinscheduler/libs
COPY ./hive-jdbc.jar /opt/dolphinscheduler/libs
COPY ./hive-metastore.jar /opt/dolphinscheduler/libs
COPY ./hive-serde.jar /opt/dolphinscheduler/libs
COPY ./hive-service.jar /opt/dolphinscheduler/libs

# 装置 python3 环境
RUN apt-get update && \
    apt-get install -y --no-install-recommenApache DolphinScheduler curl && \
    rm -rf /var/lib/apt/lists/*

RUN apt-get update && \
    apt-get install -y --no-install-recommenApache DolphinScheduler libcurl4-openssl-dev libssl-dev && \
    rm -rf /var/lib/apt/lists/*

RUN apt-get update && \
    apt-get install -y --no-install-recommenApache DolphinScheduler python3 && \
    rm -rf /var/lib/apt/lists/*

RUN apt-get update && \
    apt-get install -y --no-install-recommenApache DolphinScheduler python3-pip && \
    rm -rf /var/lib/apt/lists/*
    
# 装置 dataX 并且解压缩
COPY ./datax.tar.gz /home
RUN tar -zxvf /home/datax.tar.gz -C /opt

配置文件批改

官网教程中的 helm 进行装置,在装置过程中须要批改源码中 “/deploy/kubernetes/dolphinscheduler/” 门路下的 “values.yaml,Chart.yaml” 里的相干镜像和版本 ( 注: 原配置没有指定长久化贮存类,会应用默认的存储类,倡议是批改并应用可多节点读写并且能够弹性扩大的,多节点读写便于 worker 节点共用同一套 lib)

生产配置

  • dolphinscheduler-api 3 正本(注:默认是 1 正本,然而理论应用中发现,平台页面在应用过程中会有卡顿,减少正本数之后,会有明显改善
  • dolphinscheduler-alert 1 正本
  • dolphinscheduler-zookeeper 1 正本
  • dolphinscheduler-worker 3 正本
  • dolphinscheduler-master 3 正本

k8s 部署总结

采纳 k8s 部署后,最大感触就是可排除环境烦扰,疾速扩大,迁徙,部署,帮忙我司实现了数据中台私有化中的调度标准化,该计划已在多个景区中进行疾速落地并利用。

2、基于 SQL 脚本血统的 DolphinScheduler 工作流自动化实现

我的项目背景

基于景区中各个业务零碎(票务、营销、安防、酒店、商业、能耗、停车等)在景区机房下建设数据中台,实现以下利用需要:

  • 满足各个业务部门日常的报表需要
  • 反对各类大屏看板展现
  • 为景区的管理层决策提供数据根据

    技术选型

    数据仓库:Doris
    调度工具:DolphinScheduler 应用版本:3.0.0
    版本治理:Gitlab
    容器编排:Kubernete

    解决流程

  • 业务剖析与指标确认:与业务部门沟通,理解业务需要和指标,明确数据指标的定义、计算逻辑和展现形式。
  • 数据仓库分层和设计:依据数据仓库的四层架构(ODS、DWD、DWS、ADS),设计数据模型和表构造,标准命名和正文。
  • 数据脚本开发:编写数据抽取、荡涤、转换、加载等脚本,实现数据从源零碎到指标表的流转和解决。
  • 数据任务调度:入仓后的执行脚本通过血统辨认依赖主动录入 DolphinScheduler 造成工作流任务调度(设置工作依赖、触发条件、重试策略等参数)监控工作运行状态和日志。
  • 数据输入和文档:输入规范指标库和相干文档,供 BI、可视化报表等应用,同时编写数据开发文档,记录数据开发过程中的要害信息和问题。

上面介绍下咱们依据 SQL 血统辨认主动生成 Apache DolphinScheduler 调度工作的实现过程:

在 DolphinScheduler 平台上开发数据调度工作流的过程中咱们遇到一个问题:如果一个工作流下的任务量太多,在原有的 UI 界面中想通过界面形式实现配置更改以及血缘关系的建设等操作会十分不便捷。即使是通过工作定义去配置,当上百个工作同时须要配置依赖关系时也是一个不小的工作量开销而且还容易出错,前期的更新迭代也较为不便。

咱们联合工作流下 SQL 工作自身的特点(数仓 SQL 工作是整体依照 Apache DolphinScheduler、DWD、DWS、Apache DolphinScheduler 的计算流程进行计算,每个表对应一个 Apache DolphinScheduler 的 Task 既每个 Task 都会蕴含 SourceTable 及 TargetTabe。通过这层关系咱们就能够构建起残缺的数仓工作血统依赖)。基于以上想法咱们实现了从数据脚本主动生成对应的工作流和工作的自动化计划:

  • 数据入仓后的开发脚本以每个表单为单位对应生成一个 TaskSQL 脚本提交到 git。
  • 自动化工具生成工作流及工作前主动从 git 库中获取最新的数据脚本。
  • 自动化工具拉取到最新代码后辨认和剖析所有数据脚本之间的血缘关系。
  • 自动化工具通过血缘关系主动将所有的工作关联并组装到一个工作流中:每个工作执行实现后,会立刻触发上游工作,最大化地利用服务器资源。

以下是该实现的外围代码:

sql 解析

SqlParse 是应用阿里的 druid 中的组件 MySqlStatementParser 
/**
 * sql 解析
 * @param sqlStr
 * @return
 */
public static Map<String, Set<String>> sqlParser(String sqlStr) {List<String> sqlList = StrUtil.split(sqlStr, ";");
    Map<String, Set<String>> map = new HashMap<>();
    for (String sql : sqlList) {if (StrUtil.isBlank(sql)) {continue;}
        // 这里应用的时 mysql 解析
        MySqlStatementParser parser = new MySqlStatementParser(sql);
        SQLStatement sqlStatement = parser.parseStatement();
        MySqlSchemaStatVisitor visitor = new MySqlSchemaStatVisitor();
        sqlStatement.accept(visitor);
        Map<TableStat.Name, TableStat> tableStatMap = visitor.getTables();
        for (Map.Entry<TableStat.Name, TableStat> tableStatEntry : tableStatMap.entrySet()) {String name = tableStatEntry.getKey().getName();
            // 这里的 value 有两种 Select(父级)、Insert(子级)String value = tableStatEntry.getValue().toString();
            if (map.containsKey(value)) {map.get(value).add(name);
            } else {Set<String> list = new HashSet<>();
                list.add(name);
                map.put(value, list);
            }
        }
    }
    return map;
}

节点对象定义

/**
 * 工作节点定义
 */
public class Apache DolphinSchedulerTaskNode implements Serializable {

    private static final long serialVersionUID = 1L;

    /**
     * 源表
     */
    private List<String> sourceTableName = new ArrayList<>();

    /**
     * 指标表
     */
    private String targetTableName;

    /**
     * 源 sql
     */
    private String sql;

    /**
     * 用 sql 做一个 MD5 签名
     */
    private String md5;

    /**
     * 用 sql 名称
     */
    private String name;

    /**
     * 工作 code
     */
    private Long taskCode;

    ...
}

/**
 * 树型节点定义
 */
public class MyTreeNode extenApache DolphinScheduler Apache DolphinSchedulerTaskNode {
    /**
     * 增加一个子节点列表属性
     */
    private List<MyTreeNode> children;

    ...
}

树型构造组装

/**
 * 解析 sql, 并组装 node
 *
 * @param files
 * @return
 */
private static List<MyTreeNode> treeNodeProcess(List<File> files) {List<MyTreeNode> sourceList = new ArrayList<>();
    for (File sqlFile : files) {
        // 1 取出外面的 sql 脚本内容
        String sql = FileUtil.readUtf8String(sqlFile);
        // 2 解析外面的脚本内容
        Map<String, Set<String>> map = null;
        try {
            // 血统解析
            map = AutoCreateTask.sqlParser(sql);
        } catch (Exception e) {log.error("table-parser error: {}", sqlFile.getPath());
        }
        // 3 将每一个 sql 的 source , target 解析进去
        if (ObjectUtil.isNotNull(map)) {MyTreeNode treeNode = new MyTreeNode();
            treeNode.setName(sqlFile.getName());
            if (map.containsKey("Select")) {Set<String> select = map.get("Select");
                treeNode.setSourceTableName(new ArrayList<>(select));
            }
            treeNode.setSql(sql);
            if (map.containsKey("Insert")) {Set<String> insert = map.get("Insert");
                treeNode.setTargetTableName(new ArrayList<>(insert).get(0));
            }
            sourceList.add(treeNode);
        }
    }
    // 将 sql 依照 source , target 组成 树状构造
    return TreeUtil.getTree(sourceList);
}

/**
 * 组成树状构造
 * @param sourceList
 * @return
 */
public static List<MyTreeNode> getTree(List<MyTreeNode> sourceList) {Map<String, Set<MyTreeNode>> sourceMap = new HashMap<>();
    Map<String, Set<MyTreeNode>> targetMap = new HashMap<>();
    for (MyTreeNode node : sourceList) {
        // 源表 Map
        List<String> subSourceTableList = node.getSourceTableName();
        if (IterUtil.isNotEmpty(subSourceTableList)) {for (String subSourceTable : subSourceTableList) {if (sourceMap.containsKey(subSourceTable)) {sourceMap.get(subSourceTable).add(node);
                } else {Set<MyTreeNode> nodeSet = new HashSet<>();
                    nodeSet.add(node);
                    sourceMap.put(subSourceTable, nodeSet);
                }
            }
        }

        // 指标表 Map
        String targetTableName = node.getTargetTableName();
        if (targetMap.containsKey(targetTableName)) {targetMap.get(targetTableName).add(node);
        } else {Set<MyTreeNode> nodeSet = new HashSet<>();
            nodeSet.add(node);
            targetMap.put(targetTableName, nodeSet);
        }
    }
    // 创立一个列表,用于存储根节点
    List<MyTreeNode> rootList = new ArrayList<>();
    for (MyTreeNode node : sourceList) {
        // 将子节点解决好
        String targetTableName = node.getTargetTableName();
        if (sourceMap.containsKey(targetTableName)) {List<MyTreeNode> childrenList = node.getChildren();
            if (IterUtil.isEmpty(childrenList)) {childrenList = new ArrayList<>();
                node.setChildren(childrenList);
            }
            childrenList.addAll(sourceMap.get(targetTableName));
        }

        List<String> subSourceTableList = node.getSourceTableName();
        boolean isRoot = true;
        for (String subSourceTable : subSourceTableList) {if (targetMap.containsKey(subSourceTable)) {
                isRoot = false;
                break;
            }
        }
        if (isRoot) {rootList.add(node);
        }
    }
    return rootList;
}

局部效果图展现:

自动化生成的工作定义

自动化生成的工作流定义图

增量运行后果

自动化实现总结

  1. 数仓调度工作的秒级上线与切换
    通过该形式将数仓开发与 DolphinScheduler 解耦,实现了整体数据调度工作的秒级上线与切换。这样,咱们能够疾速复制标品化数据建模,极大地简化了施行的难度。
  2. 血统进行的工作关联与生成
    通过血统进行的工作关联与生成,大大晋升了整体的资源利用率,也缩小了人工的投入和老本。
  3. 血统监控和治理
    通过血统监控和治理,能够帮忙咱们及时发现和纠正工作执行过程中的问题和谬误,保证数据品质和准确性。

五、将来布局

  • 离在线对立调度 : 实现基于 Apache SeaTunnel 的离线与实时数据同步调度,使得两个场景在一个平台实现。
  • 利用基线治理:依据各工作的上下游依赖构建数据利用基线,并监控各指标工作及其依赖工作是否按时胜利运行,以确保指标工作的准时产出。
  • 工作指标监控:反对实时查看每个组件的指标,例如输出记录数、输入记录数和执行工夫等。
  • 数据血缘关系:上报数据源、指标表、字段等信息,构建数据血缘关系图,不便追溯数据的起源和影响。
  • 资源文件版本治理:反对资源文件等的简略多版本治理,能够查看历史版本、回滚到指定版本等。

六、总结与致谢

  1. 通过应用 DolphinScheduler 替换原有的调度工具,使得数据开发运维实现了平台统一化。基于 Apache DolphinScheduler 提供的弱小集成扩大插件能力大大晋升了数据集成与数据开发的效率。
  2. DolphinScheduler 自带的告警治理性能十分全面。配合此性能咱们建设了运维值班制度以及微信告警告诉的性能。故障产生时,运维人员能够第一工夫收到报警告诉,无效进步了故障的感知能力。
  3. 基于 DolphinScheduler 调度技术计划在多个我的项目中的优异体现,使得咱们更好的推动了公司的数据驱动策略。从实际中积淀出公司的数据施行 SOP,为各个客户、业务部门提供了及时、精确、全面的数据分析和决策反对。
  4. 咱们第一次部署时应用的是 3.0.0 版本。目前社区曾经公布了 3.1.7 迭代速度十分快。如果你们的我的项目正在选型调度工具,咱们强烈建议应用 DolphinScheduler。退出社区进群会有十分多的前辈、大佬带你腾飞。DolphinScheduler 值得鼎力举荐,心愿大家都能从中受害,祝福 DolphinScheduler 生态越来越凋敝,越来越好!

    本文由 白鲸开源科技 提供公布反对!

正文完
 0