乐趣区

关于workflow:极速开发扩充-Apache-DolphinScheduler-Task-类型-实用教程

背景简介

目前在大数据生态中,调度零碎是不可或缺的一个重要组件。Apache DolphinScheduler 作为一个顶级的 Apache 我的项目,其稳定性和易用性也能够说是名落孙山的。而对于一个调度零碎来说,可能反对的可调度的工作类型同样是一个十分重要的因素,在调度、分布式、高可用、易用性解决了的状况下,随着业务的倒退或者各种需要应用到的组件增多,用户自然而然会心愿可能疾速、不便、简洁地对 Apache Dolphinscheduler 可调度的工作类型进行裁减。本文便带大家理解如何不便、极速裁减一个 Apache DolphinScheduler Task。

作者简介

张柏强,大数据开发工程师,次要钻研方向为实时计算、元数据治理、大数据根底组件。

01 什么是 SPI 服务发现 (What is SPI)?

SPI 全称为 (Service Provider Interface),是 JDK 内置的一种服务提供发现机制。大多数人可能会很少用到它,因为它的定位次要是面向开发厂商的,在 java.util.ServiceLoader 的文档里有比拟具体的介绍,其形象的概念是指动静加载某个服务实现。

02 为什么要引入 SPI(Why did we introduce SPI)?

不同的企业可能会有本人的组件须要通过 task 去执行,大数据生态中最为罕用数仓工具 Apache Hive 来举例,不同的企业应用 Hive 办法各有不同。有的企业通过 HiveServer2 执行工作,有的企业应用 HiveClient 执行工作,而 Apache DolphinScheduler 提供的开箱即用的 Task 中并没有反对 HiveClient 的 Task,所以大部分使用者都会通过 Shell 去执行。然而,Shell 哪有人造的 TaskTemplate 好用呢?所以,Apache DolphinScheduler 为了使用户可能更好地依据企业需要定制不同的 Task,便反对了 TaskSPI 化。

咱们首先要理解一下 Apache DolphinScheduler 的 Task 改版历程,在 DS 1.3.x 时,裁减一个 Task 须要从新编译整个 Apache DolphinScheduler,耦合重大,所以在 Apache DolphinScheduler 2.0.x 引入了 SPI。后面咱们提到了 SPI 的抽象概念是动静加载某个服务的实现,这里咱们具象一点,将 Apache DolphinScheduler 的 Task 看成一个执行服务,而咱们须要依据使用者的抉择去执行不同的服务,如果没有的服务,则须要咱们本人裁减,相比于 1.3.x 咱们只须要实现咱们的 Task 具体实现逻辑,而后恪守 SPI 的规定,编译成 Jar 并上传到指定目录,即可应用咱们本人编写的 Task。

03 谁在应用它 (Who is using it)?

1、Apache DolphinScheduler

  • task
  • datasource

2、Apache Flink

flink sql connector,用户实现了一个 flink-connector 后,Flink 也是通过 SPI 来动静加载的

3、Spring boot

spring boot spi

4、Jdbc

jdbc4。0 以前,开发人员还须要基于 Class。forName(“xxx”) 的形式来装载驱动,jdbc4 也基于 spi 的机制来发现驱动提供商了,能够通过 META-INF/services/java。sql。Driver 文件里指定实现类的形式来裸露驱动提供者

5、更多

dubbo

common-logging

04 Apache DolphinScheduler SPI Process?

分析一下下面这张图,我给 Apache DolphinScheduler 分为逻辑 Task 以及物理 Task,逻辑 Task 指 DependTask,SwitchTask 这种逻辑上的 Task;物理 Task 是指 ShellTask,SQLTask 这种执行工作的 Task。而在 Apache DolphinScheduler 中,咱们个别裁减的都是物理 Task,而物理 Task 都是交由 Worker 去执行,所以咱们要明确的是,当咱们在有多台 Worker 的状况下,要将自定义的 Task 散发到每一台有 Worker 的机器上,当咱们启动 Worker 服务时,worker 会去启动一个 ClassLoader 来加载相应的实现了规定的 Task lib,能够看到 HiveClient 和 SeatunnelTask 都是用户自定义的,然而只有 HiveTask 被 Apache DolphinScheduler TaskPluginManage 加载了,起因是 SeatunnelTask 并没有去恪守 SPI 的规定。SPI 的规定图上也有赘述,也能够参考 java.util.ServiceLoader 这个类,上面有一个简略的参考 (摘出的一部分代码,具体能够本人去看看)

public final class ServiceLoader<S> implements Iterable<S> {

//scanning dir prefix
private static final String PREFIX = "META-INF/services/";

//The class or interface representing the service being loaded
private final Class<S> service;

//The class loader used to locate, load, and instantiate providers
private final ClassLoader loader;

//Private inner class implementing fully-lazy provider lookup
private class LazyIterator implements Iterator<S> {
    Class<S> service;
    ClassLoader loader;
    Enumeration<URL> configs = null;
    String nextName = null;

    //......
    private boolean hasNextService() {if (configs == null) {
            try {
                //get dir all class
                String fullName = PREFIX + service.getName();
                if (loader == null)
                    configs = ClassLoader.getSystemResources(fullName);
                else
                    configs = loader.getResources(fullName);
            } catch (IOException x) {//......}
            //......
        }
    }
}

}

05 如何扩大一个 data sourceTask or DataSource (How to extend a task or datasource)?

5.1 创立 Maven 我的项目

mvn archetype:generate \

-DarchetypeGroupId=org.apache.dolphinscheduler \
-DarchetypeArtifactId=dolphinscheduler-hive-client-task \
-DarchetypeVersion=1.10.0 \
-DgroupId=org.apache.dolphinscheduler \
-DartifactId=dolphinscheduler-hive-client-task \
-Dversion=0.1 \
-Dpackage=org.apache.dolphinscheduler \
-DinteractiveMode=false 

5.2 Maven 依赖

<!–dolphinscheduler spi basic core denpendence–>
<dependency>

 <groupId>org.apache.dolphinscheduler</groupId>
 <artifactId>dolphinscheduler-spi</artifactId>
 <version>${dolphinscheduler.lib.version}</version>
 <scope>${common.lib.scope}</scope>

</dependency>
<dependency>

 <groupId>org.apache.dolphinscheduler</groupId>
 <artifactId>dolphinscheduler-task-api</artifactId>
 <version>${dolphinscheduler.lib.version}</version>
 <scope>${common.lib.scope}</scope>

</dependency>

5.3 创立 Task 通道工厂 (TaskChannelFactory)

首先咱们须要创立工作服务的工厂,其次要作用是帮忙构建 TaskChannel 以及 TaskPlugin 参数,同时给出该工作的惟一标识,ChannelFactory 在 Apache DolphinScheduler 的 Task 服务组中,其作用属于是在工作组中的承前启后,交互前后端以及帮忙 Worker 构建 TaskChannel。

package org.apache.dolphinscheduler.plugin.task.hive;

import org.apache.dolphinscheduler.spi.params.base.PluginParams;
import org.apache.dolphinscheduler.spi.task.TaskChannel;
import org.apache.dolphinscheduler.spi.task.TaskChannelFactory;

import java.util.List;

public class HiveClientTaskChannelFactory implements TaskChannelFactory {

/**
 *  创立工作通道, 基于该通道执行工作
 * @return 工作通道
 */
@Override
public TaskChannel create() {return new HiveClientTaskChannel();
}

/**
 *  返回当前任务的全局惟一标识
 * @return 工作类型名称
 */
@Override
public String getName() {return "HIVE CLIENT";}

/**
 * 前端页面须要用到的渲染, 次要分为
 
 * @return
 */
@Override
public List<PluginParams> getParams() {List<PluginParams> pluginParams = new ArrayList<>();
    InputParam nodeName = InputParam.newBuilder("name", "$t('Node name')")
            .addValidate(Validate.newBuilder()
                    .setRequired(true)
                    .build())
            .build();
    PluginParams runFlag = RadioParam.newBuilder("runFlag", "RUN_FLAG")
            .addParamsOptions(new ParamsOptions("NORMAL", "NORMAL", false))
            .addParamsOptions(new ParamsOptions("FORBIDDEN", "FORBIDDEN", false))
            .build();

    PluginParams build = CheckboxParam.newBuilder("Hive SQL", "Test HiveSQL")
            .setDisplay(true)
            .setValue("-- author: \n --desc:")
            .build();

    pluginParams.add(nodeName);
    pluginParams.add(runFlag);
    pluginParams.add(build);

    return pluginParams;
}

}

5.4 创立 TaskChannel

有了工厂之后,咱们会依据工厂创立出 TaskChannel,TaskChannel 蕴含如下两个办法,一个是勾销,一个是创立,目前不须要关注勾销,次要关注创立工作。

void cancelApplication(boolean status);

/**
 * 构建可执行工作
 */
AbstractTask createTask(TaskRequest taskRequest);

public class HiveClientTaskChannel implements TaskChannel {

@Override
public void cancelApplication(boolean b) {//do nothing}

@Override
public AbstractTask createTask(TaskRequest taskRequest) {return new HiveClientTask(taskRequest);
}

}

5.5 构建 Task 实现

通过 TaskChannel 咱们失去了可执行的物理 Task,然而咱们须要给以后 Task 增加相应的实现,才可能让 Apache DolphinScheduler 去执行你的工作,首先在编写 Task 之前咱们须要先理解一下 Task 之间的关系:

通过上图咱们能够看到,基于 Yarn 执行工作的 Task 都会去继承 AbstractYarnTask,不须要通过 Yarn 执行的都会去间接继承 AbstractTaskExecutor,次要是蕴含一个 AppID,以及 CanalApplication setMainJar 之类的办法,想晓得的小伙伴能够本人去深入研究一下,如上可知咱们实现的 HiveClient 就须要继承 AbstractYarnTask,在构建 Task 之前,咱们须要构建一下适配 HiveClient 的 Parameters 对象用来反序列化 JsonParam。

package com.jegger.dolphinscheduler.plugin.task.hive;

import org.apache.dolphinscheduler.spi.task.AbstractParameters;
import org.apache.dolphinscheduler.spi.task.ResourceInfo;

import java.util.List;

public class HiveClientParameters extends AbstractParameters {

/**
 * 用 HiveClient 执行, 最简略的形式就是将所有 SQL 全副贴进去即可, 所以咱们只须要一个 SQL 参数
 */
private String sql;

public String getSql() {return sql;}

public void setSql(String sql) {this.sql = sql;}

@Override
public boolean checkParameters() {return sql != null;}

@Override
public List<ResourceInfo> getResourceFilesList() {return null;}

}

实现了 Parameters 对象之后,咱们具体实现 Task,例子中的实现比较简单,就是将用户的参数写入到文件中,通过 Hive -f 去执行工作。

package org.apache.dolphinscheduler.plugin.task.hive;

import org.apache.dolphinscheduler.plugin.task.api.AbstractYarnTask;
import org.apache.dolphinscheduler.spi.task.AbstractParameters;
import org.apache.dolphinscheduler.spi.task.request.TaskRequest;
import org.apache.dolphinscheduler.spi.utils.JSONUtils;

import java.io.BufferedWriter;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;

public class HiveClientTask extends AbstractYarnTask {

/**
 * hive client parameters
 */
private HiveClientParameters hiveClientParameters;

/**
 * taskExecutionContext
 */
private final TaskRequest taskExecutionContext;


public HiveClientTask(TaskRequest taskRequest) {super(taskRequest);
    this.taskExecutionContext = taskRequest;
}

/**
 * task init method
 */
@Override
public void init() {logger.info("hive client task param is {}", JSONUtils.toJsonString(taskExecutionContext));
    this.hiveClientParameters = JSONUtils.parseObject(taskExecutionContext.getTaskParams(), HiveClientParameters.class);

    if (this.hiveClientParameters != null && !hiveClientParameters.checkParameters()) {throw new RuntimeException("hive client task params is not valid");
    }
}

/**
 * build task execution command
 *
 * @return task execution command or null
 */
@Override
protected String buildCommand() {String filePath = getFilePath();
    if (writeExecutionContentToFile(filePath)) {return "hive -f" + filePath;}
    return null;
}

/**
 * get hive sql write path
 *
 * @return file write path
 */
private String getFilePath() {return String.format("%s/hive-%s-%s.sql", this.taskExecutionContext.getExecutePath(), this.taskExecutionContext.getTaskName(), this.taskExecutionContext.getTaskInstanceId());
}

@Override
protected void setMainJarName() {//do nothing}

/**
 * write hive sql to filepath
 *
 * @param filePath file path
 * @return write success?
 */
private boolean writeExecutionContentToFile(String filePath) {Path path = Paths.get(filePath);
    try (BufferedWriter writer = Files.newBufferedWriter(path, StandardCharsets.UTF_8)) {writer.write(this.hiveClientParameters.getSql());
        logger.info("file:" + filePath + "write success.");
        return true;
    } catch (IOException e) {logger.error("file:" + filePath + "write failed.please path auth.");
        e.printStackTrace();
        return false;
    }

}

@Override
public AbstractParameters getParameters() {return this.hiveClientParameters;}

}

5.6 恪守 SPI 规定

# 1,Resource 下创立 META-INF/services 文件夹, 创立接口全类名雷同的文件
zhang@xiaozhang resources % tree ./
./
└── META-INF

└── services
    └── org.apache.dolphinscheduler.spi.task.TaskChannelFactory

2, 在文件中写入实现类的全限定类名

zhang@xiaozhang resources % more META-INF/services/org.apache.dolphinscheduler.spi.task.TaskChannelFactory
org.apache.dolphinscheduler.plugin.task.hive.HiveClientTaskChannelFactory

5.7 打包和部署

1、打包
mvn clean install
2、部署
cp ./target/dolphinscheduler-task-hiveclient-1.0.jar $DOLPHINSCHEDULER_HOME/lib/
3、restart dolphinscheduler server

以上操作实现后,咱们查看 worker 日志 tail -200f $Apache DolphinScheduler_HOME/log/Apache DolphinScheduler-worker.log

Apache DolphinScheduler 的插件开发就到此实现~ 波及到前端的批改能够参考:

Apache DolphinScheduler-ui/src/js/conf/home/pages/dag/_source/formModel/

参加奉献

随着国内开源的迅猛崛起,Apache DolphinScheduler 社区迎来蓬勃发展,为了做更好用、易用的调度,真挚欢送酷爱开源的搭档退出到开源社区中来,为中国开源崛起献上一份本人的力量,让外乡开源走向寰球。

参加 DolphinScheduler 社区有十分多的参加奉献的形式,包含:

奉献第一个 PR(文档、代码) 咱们也心愿是简略的,第一个 PR 用于相熟提交的流程和社区合作以及感触社区的友好度。

社区汇总了以下适宜老手的问题列表:https://github.com/apache/dol…

非老手问题列表:https://github.com/apache/dol…

如何参加奉献链接:https://dolphinscheduler.apac…

来吧,DolphinScheduler 开源社区须要您的参加,为中国开源崛起添砖加瓦吧,哪怕只是小小的一块瓦,汇聚起来的力量也是微小的。

参加开源能够近距离与各路高手切磋,迅速晋升本人的技能,如果您想参加奉献,咱们有个贡献者种子孵化群,能够增加社区小助手微信 (Leonard-ds),手把手教会您 (贡献者不分程度高下,有问必答,要害是有一颗违心奉献的心)。

增加小助手微信时请阐明想参加奉献。

来吧,开源社区十分期待您的参加。

退出移动版