关于数据库:如何快速在-Apache-DolphinScheduler-新扩展一个任务插件

41次阅读

共计 8382 个字符,预计需要花费 21 分钟才能阅读完成。

作者 | 代立冬

编辑 | Debra Chen

Apache DolphinScheduler 是古代数据工作流编排平台,具备十分弱小的可视化能力,DolphinScheduler 致力于使数据工程师、分析师、数据科学家等数据工作者都能够简略轻松地搭建各种数据工作流,让数据处理流程更简略牢靠。

DolphinScheduler 十分易于应用 (easy to use),目前有四种创立工作流的办法:

  • 在 UI 界面上间接通过拖放工作的形式来创立工作
  • PyDolphinScheduler,通过 Python API 创立工作流,也就是 workflow as code 的形式
  • 编写 yaml 文件,通过 yaml 创立工作流(目前必须装置 PyDolphinScheduler)
  • 通过 Open API 的形式来创立工作流

以上 4 种总有一种形式适宜您的场景!

得益于 DolphinScheduler 采纳无中心化的整体架构设计,使得 DolphinScheduler 调度性能也是同类开源数据工作流编排平台的 5 倍以上,如果您正有这样的性能问题或者调度延时问题,也无妨试试 DolphinScheduler。

DolphinScheduler 界面

好的,接下来言归正题,有不少用户想在 DolphinScheduler 扩大新的工作插件反对 (比方增加 Kettle),DolphinScheduler 的工作插件体系是基于 SPI 来进行工作插件扩大的。

什么是 SPI 服务发现?

SPI 是 Service Provider Interface 的缩写,是一种常见的服务提供发现机制,比方出名的 OLAP 引擎 Presto 也是应用 SPI 来扩大的。在 java.util.ServiceLoader 的文档里有比拟具体的介绍,其形象的概念是指动静加载某个服务实现。

比方 java.sql.Driver 接口,不同厂商能够针对同一接口做出不同的实现,比方 MySQL 和 PostgreSQL 都有不同的实现提供给用户,而 Java 的 SPI 机制能够为某个接口寻找服务实现。Java 中 SPI 机制次要思维是将拆卸的控制权移到程序之外,在模块化设计中这个机制尤其重要,其核心思想就是解耦。

SPI 整体机制图如下:

SPI 机制中有 4 个重要的组件:

  • 服务接口 Service Interface
  • 服务接口实现:不同的服务提供方能够提供一个或多个实现;框架或者零碎自身也能够提供默认的实现
  • 提供者注册 API(Provider Registration API),这是提供者用来注册实现的
  • 服务拜访 API (Service Access API),这是调用方用来获取服务的实例的接口

Apache DolphinScheduler 从 2.0 版本开始引入 SPI。将 Apache DolphinScheduler 的 Task 看成一个执行服务,而咱们须要依据使用者的抉择去执行不同的服务,如果没有的服务,则须要咱们本人裁减,咱们只须要实现咱们的 Task 具体实现逻辑,而后恪守 SPI 的规定,编译成 Jar 并上传到指定目录,就能够应用咱们本人编写的 Task 插件来执行具体的工作了。

谁在应用它?

除了后面提到的 Presto 外,还有以下技术都应用到 SPI 技术:

1、Apache DolphinScheduler

  • Task
  • Datasource

2、Apache Flink

  • Flink sql connector,用户实现了一个 Flink-connector 后,Flink 也是通过 SPI 来动静加载的

3、SpringBoot

  • Spring boot spi

4、JDBC

  • JDBC4 也基于 SPI 的机制来发现驱动提供商了,能够通过 META-INF/services/java.sql.Driver 文件里指定实现类的形式来裸露驱动提供者

5、更多

  • common-logging

DolphinScheduler SPI 工作流程

如上图,Apache DolphinScheduler 中有 2 种 Task : 逻辑 Task 和物理 Task,逻辑 Task 指 Dependent Task,Switch Task 这种管制工作流逻辑的工作插件;物理 Task 是指 Shell Task,SQL Task,Spark Task,Python Task 等这种执行具体任务的 Task。

在 Apache DolphinScheduler 中,咱们个别裁减的都是物理 Task,物理 Task 都是由 Worker 来调用并执行的,当启动 Worker 服务时,Worker 会来加载相应的实现了规定的 Task lib,HiveTask 被 Apache DolphinScheduler TaskPluginManage 加载了。SPI 的规定图上也有形容,也能够参考 java.util.ServiceLoader 类。

如何扩大一个工作插件?

创立 Maven 我的项目

mvn archetype:generate \
    -DarchetypeGroupId=org.apache.dolphinscheduler \
    -DarchetypeArtifactId=dolphinscheduler-hive-client-task \
    -DarchetypeVersion=1.10.0 \
    -DgroupId=org.apache.dolphinscheduler \
    -DartifactId=dolphinscheduler-hive-client-task \
    -Dversion=0.1 \
    -Dpackage=org.apache.dolphinscheduler \
    -DinteractiveMode=false 

Maven 依赖

org.apache.dolphinscheduler
     dolphinscheduler-spi
     ${dolphinscheduler.lib.version}
     ${common.lib.scope}




     org.apache.dolphinscheduler
     dolphinscheduler-task-api
     ${dolphinscheduler.lib.version}
     ${common.lib.scope}

创立 Task 通道工厂 (TaskChannelFactory)

org.apache.dolphinscheduler.spi.task.TaskChannel

插件实现以上接口即可。次要蕴含创立工作(工作初始化,工作运行等办法)、工作勾销,如果是 yarn 工作,则须要实现 org.apache.dolphinscheduler.plugin.task.api.AbstractYarnTask。

咱们在 dolphinscheduler-task-api 模块提供了所有工作对外拜访的 API,而 dolphinscheduler-spi 模块则是 spi 通用代码库,定义了所有的插件模块,比方告警模块,注册核心模块等,你能够具体浏览查看。

首先咱们须要创立工作服务的工厂,其次要作用是帮忙构建 TaskChannel 以及 TaskPlugin 参数,同时给出该工作的惟一标识,ChannelFactory 在 Apache DolphinScheduler 的 Task 服务组中,其作用属于是在工作组中的承前启后,交互前后端以及帮忙 Worker 构建 TaskChannel。

package org.apache.dolphinscheduler.plugin.task.hive;
import org.apache.dolphinscheduler.spi.params.base.PluginParams;
import org.apache.dolphinscheduler.spi.task.TaskChannel;
import org.apache.dolphinscheduler.spi.task.TaskChannelFactory;
import java.util.List;
public class HiveClientTaskChannelFactory implements TaskChannelFactory {
    /**
    * Create task channel, execute task through this channel
     * @return task channel
     */
    @Override
    public TaskChannel create() {return new HiveCliTaskChannel();
    }
    /**
    * Returns the global unique identifier of this task
     * @return task name
     */
    @Override
    public String getName() {return "HIVECLI";}
    /**
    * Parameters required for front-end pages
     * @return
     */
    @Override
    public List getParams() {return null;}
}

创立 TaskChannel

有了工厂之后,咱们会依据工厂创立出 TaskChannel,TaskChannel 蕴含如下两个办法,一个是勾销,一个是创立,目前不须要关注勾销,次要关注创立工作。

   void cancelApplication(boolean status);
    /**
     * 构建可执行工作
     */
    AbstractTask createTask(TaskRequest taskRequest);
    public class HiveClientTaskChannel implements TaskChannel {
    @Override
    public void cancelApplication(boolean b) {//do nothing}
    @Override
    public AbstractTask createTask(TaskRequest taskRequest) {return new HiveClientTask(taskRequest);
    }
}

构建 Task 实现

通过 TaskChannel 咱们失去了可执行的物理 Task,然而咱们须要给以后 Task 增加相应的实现,才可能让 Apache DolphinScheduler 去执行你的工作,首先在编写 Task 之前咱们须要先理解一下 Task 之间的关系:

通过上图咱们能够看到,基于 Yarn 执行工作的 Task 都会去继承 AbstractYarnTask,不须要通过 Yarn 执行的都会去间接继承 AbstractTaskExecutor,次要是蕴含一个 AppID,以及 CanalApplication setMainJar 之类的办法,想晓得的小伙伴能够本人去深入研究一下,如上可知咱们实现的 HiveClient 就须要继承 AbstractYarnTask,在构建 Task 之前,咱们须要构建一下适配 HiveClient 的 Parameters 对象用来反序列化 JsonParam。

package com.jegger.dolphinscheduler.plugin.task.hive;
import org.apache.dolphinscheduler.spi.task.AbstractParameters;
import org.apache.dolphinscheduler.spi.task.ResourceInfo;
import java.util.List;
public class HiveClientParameters extends AbstractParameters {
    /**
     * 用 HiveClient 执行, 最简略的形式就是将所有 SQL 全副贴进去即可, 所以咱们只须要一个 SQL 参数
     */
    private String sql;
    public String getSql() {return sql;}
    public void setSql(String sql) {this.sql = sql;}
    @Override
    public boolean checkParameters() {return sql != null;}
    @Override
    public List getResourceFilesList() {return null;}
}

实现了 Parameters 对象之后,咱们具体实现 Task,例子中的实现比较简单,就是将用户的参数写入到文件中,通过 Hive -f 去执行工作。

package org.apache.dolphinscheduler.plugin.task.hive;
import org.apache.dolphinscheduler.plugin.task.api.AbstractYarnTask;
import org.apache.dolphinscheduler.spi.task.AbstractParameters;
import org.apache.dolphinscheduler.spi.task.request.TaskRequest;
import org.apache.dolphinscheduler.spi.utils.JSONUtils;
import java.io.BufferedWriter;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
public class HiveClientTask extends AbstractYarnTask {
    /**
     * hive client parameters
     */
    private HiveClientParameters hiveClientParameters;
    /**
     * taskExecutionContext
     */
    private final TaskRequest taskExecutionContext;
    public HiveClientTask(TaskRequest taskRequest) {super(taskRequest);
        this.taskExecutionContext = taskRequest;
    }
    /**
     * task init method
     */
    @Override
    public void init() {logger.info("hive client task param is {}", JSONUtils.toJsonString(taskExecutionContext));
        this.hiveClientParameters = JSONUtils.parseObject(taskExecutionContext.getTaskParams(), HiveClientParameters.class);
        if (this.hiveClientParameters != null && !hiveClientParameters.checkParameters()) {throw new RuntimeException("hive client task params is not valid");
        }
    }
    /**
     * build task execution command
     *
     * @return task execution command or null
     */
    @Override
    protected String buildCommand() {String filePath = getFilePath();
        if (writeExecutionContentToFile(filePath)) {return "hive -f" + filePath;}
        return null;
    }
    /**
     * get hive sql write path
     *
     * @return file write path
     */
    private String getFilePath() {return String.format("%s/hive-%s-%s.sql", this.taskExecutionContext.getExecutePath(), this.taskExecutionContext.getTaskName(), this.taskExecutionContext.getTaskInstanceId());
    }
    @Override
    protected void setMainJarName() {//do nothing}
    /**
     * write hive sql to filepath
     *
     * @param filePath file path
     * @return write success?
     */
    private boolean writeExecutionContentToFile(String filePath) {Path path = Paths.get(filePath);
        try (BufferedWriter writer = Files.newBufferedWriter(path, StandardCharsets.UTF_8)) {writer.write(this.hiveClientParameters.getSql());
            logger.info("file:" + filePath + "write success.");
            return true;
        } catch (IOException e) {logger.error("file:" + filePath + "write failed.please path auth.");
            e.printStackTrace();
            return false;
        }
    }
    @Override
    public AbstractParameters getParameters() {return this.hiveClientParameters;}
}

恪守 SPI 规定

# 1,Resource 下创立 META-INF/services 文件夹, 创立接口全类名雷同的文件
zhang@xiaozhang resources % tree ./
./
└── META-INF
    └── services
        └── org.apache.dolphinscheduler.spi.task.TaskChannelFactory
# 2, 在文件中写入实现类的全限定类名
zhang@xiaozhang resources % more META-INF/services/org.apache.dolphinscheduler.spi.task.TaskChannelFactory 
org.apache.dolphinscheduler.plugin.task.hive.HiveClientTaskChannelFactory

打包和部署

## 1, 打包
mvn clean install
## 2, 部署
cp ./target/dolphinscheduler-task-hiveclient-1.0.jar $DOLPHINSCHEDULER_HOME/lib/
## 3,restart dolphinscheduler server

以上操作实现后,咱们查看 worker 日志 tail -200f $Apache DolphinScheduler_HOME/log/Apache DolphinScheduler-worker.log

Apache DolphinScheduler 的插件开发就到此实现~ 波及到前端的批改能够参考:
Apache DolphinScheduler-ui/src/js/conf/home/pages/dag/_source/formModel/

  • NOTICE:目前工作插件的前端还没有实现,因而你须要独自实现插件对应的前端页面。

TaskChannelFactory 继承自 PrioritySPI,这意味着你能够设置插件的优先级,当你有两个插件同名时,你能够通过重写 getIdentify 办法来自定义优先级。高优先级的插件会被加载,然而如果你有两个同名且优先级雷同的插件,加载插件时服务器会抛出 IllegalArgumentException。

如果工作插件存在类抵触,你能够采纳 Shade-Relocating Classes(https://maven.apache.org/plugins/maven-shade-plugin/) 来解决这种问题。

如果您有趣味试试 Apache DolphinScheduler,欢送微信增加小助手 Leonard-ds 或退出 DolphinScheduler Slack: https://s.apache.org/dolphinscheduler-slack, 我将收费全力支持您!

  • 参考:

    • 极速开发裁减 Apache DolphinScheduler Task 类型 | 实用教程
    • https://blog.csdn.net/s1293678392/article/details/120048318

    本文由 白鲸开源科技 提供公布反对!

正文完
 0