关于大数据处理:一文读懂硬核-Apache-DolphinScheduler30-源码解析

42次阅读

共计 59276 个字符,预计需要花费 149 分钟才能阅读完成。

点亮 ⭐️ Star · 照亮开源之路

https://github.com/apache/dol…

本文目录

  • 1 DolphinScheduler 的设计与策略
  • 1.1 分布式设计
  • 1.1.1 中心化
  • 1.1.2 去中心化
  • 1.2 DophinScheduler 架构设计
  • 1.3 容错问题
  • 1.3.1 宕机容错
  • 1.3.2 失败重试
  • 1.4 近程日志拜访
  • 2 DolphinScheduler 源码剖析
  • 2.1 工程模块介绍与配置文件
  • 2.1.1 工程模块介绍
  • 2.1.2 配置文件
  • 2.2 Api 次要工作操作接口
  • 2.3 Quaterz 架构与运行流程
  • 2.3.1 概念与架构
  • 2.3.2 初始化与执行流程
  • 2.3.3 集群运行
  • 2.4 Master 启动与执行流程
  • 2.4.1 概念与执行逻辑
  • 2.4.2 集群与槽(slot)
  • 2.4.3 代码执行流程
  • 2.5 Work 启动与执行流程
  • 2.5.1 概念与执行逻辑
  • 2.5.2 代码执行流程
  • 2.6 rpc 交互
  • 2.6.1 Master 与 Worker 交互
  • 2.6.2 其余服务与 Master 交互
  • 2.7 负载平衡算法
  • 2.7.1 加权随机
  • 2.7.2 线性负载
  • 2.7.3 平滑轮询
  • 2.8 日志服务
  • 2.9 报警
  • 3 后记
  • 3.1 Make friends
  • 3.2 参考文献

前言

钻研 Apache Dolphinscheduler 也是机缘巧合,平时负责基于 xxl-job 二次开发进去的调度平台,因为遇到了并发性能瓶颈,到了不得不优化重构的境地,所以搜寻市面上利用较广的调度平台以借鉴优化思路。

在浏览完 DolphinScheduler 代码之后,便生出了将其设计与思考记录下来的念头,这便是此篇文章的起源。因为没有正式生产应用,业务了解不肯定透彻,了解可能有偏差,欢送大家交换探讨。

1 DolphinScheduler 的设计与策略

大家能关注 DolphinScheduler 那么肯定对调度零碎有了肯定的理解,对于调度所波及的到一些专有名词在这里就不做过多的介绍,重点介绍一下流程定义,流程实例,工作定义,工作实例。(没有作业这个概念的确也很离奇,可能是不想和 Quartz 的 JobDetail 重叠)。

  • 工作定义:各种类型的工作,是流程定义的要害组成,如 sql,shell,spark,mr,python 等;
  • 工作实例:工作的实例化,标识着具体的工作执行状态;
  • 流程定义:一组工作节点通过依赖关系建设的起来的有向无环图(DAG);
  • 流程实例:通过手动或者定时调度生成的流程实例;
  • 定时调度:零碎采纳 Quartz 散布式调度器,并同时反对 cron 表达式可视化的生成;

1.1 分布式设计

分布式系统的架构设计根本分为中心化和去中心化两种,各有优劣,凭借各自的业务抉择。

1.1.1 中心化

中心化设计比较简单,集群中的节点装置角色能够分为 Master 和 Slave 两种,如下图:

Master: Master 的角色次要负责工作散发并监督 Slave 的衰弱状态,能够动静的将工作平衡到 Slave 上,以至 Slave 节点不至于“忙死”或”闲死”的状态。

中心化设计存在一些问题。

第一点,一旦 Master 呈现了问题,则群龙无首,整个集群就会解体。

为了解决这个问题,大多数 Master/Slave 架构模式都采纳了主备 Master 的设计方案,能够是热备或者冷备,也能够是主动切换或手动切换,而且越来越多的新零碎都开始具备主动选举切换 Master 的能力, 以晋升零碎的可用性。

第二点,如果 Scheduler 在 Master 上,尽管能够反对一个 DAG 中不同的工作运行在不同的机器上,然而会产生 Master 的过负载。如果 Scheduler 在 Slave 上,一个 DAG 中所有的工作都只能在某一台机器上进行作业提交,在并行任务比拟多的时候,Slave 的压力可能会比拟大。

xxl-job 就是采纳这种设计形式,然而存在相应的问题。管理器 (admin) 宕机集群会解体,Scheduler 在管理器上,管理器负责所有工作的校验和散发,管理器存在过载的危险, 须要开发者想计划解决。

1.1.2 去中心化

在去中心化设计里,通常没有 Master/Slave 的概念,所有的角色都是一样的,位置是平等的,去中心化设计的外围设计在于整个分布式系统中不存在一个区别于其余节点的“管理者”,因而不存在单点故障问题。

但因为不存在“管理者”节点所以每个节点都须要跟其余节点通信才失去必须要的机器信息,而分布式系统通信的不可靠性,则大大增加了上述性能的实现难度。实际上,真正去中心化的分布式系统并不多见。

反而动静中心化分布式系统正在一直涌出。在这种架构下,集群中的管理者是被动静抉择进去的,而不是预置的,并且集群在产生故障的时候,集群的节点会自发的举办会议来选举新的管理者去主持工作。

个别都是基于 Raft 算法实现的选举策略。Raft 算法,目前社区也有相应的 PR,还没合并。

  • PR 链接:https://github.com/apache/dol…
  • 动静展现见链接:http://thesecretlivesofdata.com/

DolphinScheduler 的去中心化是 Master/Worker 注册到注册核心,实现 Master 集群和 Worker 集群无核心。

1.2 DophinScheduler 架构设计

顺手盗用一张官网的零碎架构图,能够看到调度零碎采纳去中心化设计,由 UI,API,MasterServer,Zookeeper,WorkServer,Alert 等几局部组成。

API: API 接口层,次要负责解决前端 UI 层的申请。该服务对立提供 RESTful api 向内部提供申请服务。接口包含工作流的创立、定义、查问、批改、公布、下线、手工启动、进行、暂停、复原、从该节点开始执行等等。

MasterServer: MasterServer 采纳分布式无核心设计理念,MasterServer 集成了 Quartz,次要负责 DAG 工作切分、工作提交监控,并同时监听其它 MasterServer 和 WorkerServer 的衰弱状态。MasterServer 服务启动时向 Zookeeper 注册长期节点,通过监听 Zookeeper 长期节点变动来进行容错解决。WorkServer:WorkerServer 也采纳分布式无核心设计理念,WorkerServer 次要负责工作的执行和提供日志服务。WorkerServer 服务启动时向 Zookeeper 注册长期节点,并维持心跳。

ZooKeeper: ZooKeeper 服务,零碎中的 MasterServer 和 WorkerServer 节点都通过 ZooKeeper 来进行集群治理和容错。另外零碎还基于 ZooKeeper 进行事件监听和分布式锁。

Alert:提供告警相干接口,接口次要包含两种类型的告警数据的存储、查问和告诉性能,反对丰盛的告警插件自在拓展配置。

1.3 容错问题

容错分为服务宕机容错和工作重试,服务宕机容错又分为 Master 容错和 Worker 容错两种状况;

1.3.1 宕机容错

服务容错设计依赖于 ZooKeeper 的 Watcher 机制,实现原理如图:

其中 Master 监控其余 Master 和 Worker 的目录,如果监听到 remove 事件,则会依据具体的业务逻辑进行流程实例容错或者工作实例容错,容错流程图绝对官网文档外面的流程图,人性化了些,大家能够参考一下,具体如下所示。

ZooKeeper Master 容错实现之后则从新由 DolphinScheduler 中 Scheduler 线程调度,遍历 DAG 找到“正在运行”和“提交胜利”的工作,对“正在运行”的工作监控其工作实例的状态,对“提交胜利”的工作须要判断 Task Queue 中是否曾经存在,如果存在则同样监控工作实例的状态,如果不存在则从新提交工作实例。

Master Scheduler 线程一旦发现工作实例为”须要容错”状态,则接管工作并进行从新提交。留神因为”网络抖动”可能会使得节点短时间内失去和 ZooKeeper 的心跳,从而产生节点的 remove 事件。

对于这种状况,咱们应用最简略的形式,那就是节点一旦和 ZooKeeper 产生超时连贯,则间接将 Master 或 Worker 服务停掉。

1.3.2 失败重试

这里首先要辨别工作失败重试、流程失败复原、流程失败重跑的概念:

  1. 工作失败重试是工作级别的,是调度零碎主动进行的,比方一个 Shell 工作设置重试次数为 3 次,那么在 Shell 工作运行失败后会本人再最多尝试运行 3 次。
  2. 流程失败复原是流程级别的,是手动进行的,复原是从只能从失败的节点开始执行或从以后节点开始执行。流程失败重跑也是流程级别的,是手动进行的,重跑是从开始节点进行。

接下来说正题,咱们将工作流中的工作节点分了两种类型。

  1. 一种是业务节点,这种节点都对应一个理论的脚本或者解决语句,比方 Shell 节点、MR 节点、Spark 节点、依赖节点等。
  2. 还有一种是逻辑节点,这种节点不做理论的脚本或语句解决,只是整个流程流转的逻辑解决,比方子流程节等。

每一个业务节点都能够配置失败重试的次数,当该工作节点失败,会主动重试,直到胜利或者超过配置的重试次数。逻辑节点不反对失败重试。然而逻辑节点里的工作反对重试。

如果工作流中有工作失败达到最大重试次数,工作流就会失败进行,失败的工作流能够手动进行重跑操作或者流程复原操作。

1.4 近程日志拜访

因为 Web(UI)和 Worker 不肯定在同一台机器上,所以查看日志不能像查问本地文件那样。

有两种计划:

  1. 将日志放到 ES 搜索引擎上;
  2. 通过 netty 通信获取近程日志信息;

介于思考到尽可能的 DolphinScheduler 的轻量级性,所以抉择了 RPC 实现近程拜访日志信息,具体代码的实际见 2.8 章节。

2 DolphinScheduler 源码剖析

上一章的解说可能初步看起来还不是很清晰,本章的次要目标是从代码层面一一介绍第一张解说的性能。对于零碎的装置在这里并不会波及,装置运行请大家自行摸索。

2.1 工程模块介绍与配置文件

2.1.1 工程模块介绍

  • dolphinscheduler-alert 告警模块,提供告警服务;
  • dolphinscheduler-api web 利用模块,提供 Rest Api 服务,供 UI 进行调用;
  • dolphinscheduler-common 通用的常量枚举、工具类、数据结构或者基类 dolphinscheduler-dao 提供数据库拜访等操作;
  • dolphinscheduler-remote 基于 netty 的客户端、服务端 ;
  • dolphinscheduler-server 日志与心跳服务 ;
  • dolphinscheduler-log-server LoggerServer 用于 Rest Api 通过 RPC 查看日志;
  • dolphinscheduler-master MasterServer 服务,次要负责 DAG 的切分和工作状态的监控 ;
  • dolphinscheduler-worker WorkerServer 服务,次要负责工作的提交、执行和工作状态的更新;
  • dolphinscheduler-service service 模块,蕴含 Quartz、Zookeeper、日志客户端拜访服务,便于 server 模块和 api 模块调用 ;
  • dolphinscheduler-ui 前端模块;

2.1.2 配置文件

dolphinscheduler-common common.properties

# 本地工作目录, 用于寄存临时文件
data.basedir.path=/tmp/dolphinscheduler
#资源文件存储类型: HDFS,S3,NONE
resource.storage.type=NONE
#资源文件存储门路
resource.upload.path=/dolphinscheduler
#hadoop 是否开启 kerberos 权限
hadoop.security.authentication.startup.state=false
#kerberos 配置目录
java.security.krb5.conf.path=/opt/krb5.conf
#kerberos 登录用户
login.user.keytab.username=hdfs-mycluster@ESZ.COM

#kerberos 登录用户 keytab
login.user.keytab.path=/opt/hdfs.headless.keytab

#kerberos 过期工夫, 整数, 单位为小时
kerberos.expire.time=2
#    如果存储类型为 HDFS, 须要配置领有对应操作权限的用户
hdfs.root.user=hdfs
#申请地址如果 resource.storage.type=S3, 该值相似为: s3a://dolphinscheduler. 如果 resource.storage.type=HDFS, 如果 hadoop 配置了 HA, 须要复制 core-site.xml 和 hdfs-site.xml 文件到 conf 目录
fs.defaultFS=hdfs://mycluster:8020
aws.access.key.id=minioadmin
aws.secret.access.key=minioadmin
aws.region=us-east-1
aws.endpoint=http://localhost:9000
# resourcemanager port, the default value is 8088 if not specified
resource.manager.httpaddress.port=8088
#yarn resourcemanager 地址, 如果 resourcemanager 开启了 HA, 输出 HA 的 IP 地址(以逗号分隔), 如果 resourcemanager 为单节点, 该值为空即可
yarn.resourcemanager.ha.rm.ids=192.168.xx.xx,192.168.xx.xx
#如果 resourcemanager 开启了 HA 或者没有应用 resourcemanager, 放弃默认值即可. 如果 resourcemanager 为单节点, 你须要将 ds1 配置为 resourcemanager 对应的 hostname
yarn.application.status.address=http://ds1:%s/ws/v1/cluster/apps/%s
# job history status url when application number threshold is reached(default 10000, maybe it was set to 1000)
yarn.job.history.status.address=http://ds1:19888/ws/v1/history/mapreduce/jobs/%s

# datasource encryption enable
datasource.encryption.enable=false

# datasource encryption salt
datasource.encryption.salt=!@#$%^&*

# data quality option
data-quality.jar.name=dolphinscheduler-data-quality-dev-SNAPSHOT.jar

#data-quality.error.output.path=/tmp/data-quality-error-data

# Network IP gets priority, default inner outer

# Whether hive SQL is executed in the same session
support.hive.oneSession=false

# use sudo or not, if set true, executing user is tenant user and deploy user needs sudo permissions; if set false, executing user is the deploy user and doesn't need sudo permissions
sudo.enable=true

# network interface preferred like eth0, default: empty
#dolphin.scheduler.network.interface.preferred=

# network IP gets priority, default: inner outer
#dolphin.scheduler.network.priority.strategy=default

# system env path
#dolphinscheduler.env.path=dolphinscheduler_env.sh

#是否处于开发模式
development.state=false

# rpc port
alert.rpc.port=50052

# Url endpoint for zeppelin RESTful API
zeppelin.rest.url=http://localhost:8080

dolphinscheduler-api application.yaml

server:
  port: 12345
  servlet:
    session:
      timeout: 120m
    context-path: /dolphinscheduler/
  compression:
    enabled: true
    mime-types: text/html,text/xml,text/plain,text/css,text/javascript,application/javascript,application/json,application/xml
  jetty:
    max-http-form-post-size: 5000000

spring:
  application:
    name: api-server
  banner:
    charset: UTF-8
  jackson:
    time-zone: UTC
    date-format: "yyyy-MM-dd HH:mm:ss"
  servlet:
    multipart:
      max-file-size: 1024MB
      max-request-size: 1024MB
  messages:
    basename: i18n/messages
  datasource:
#    driver-class-name: org.postgresql.Driver
#    url: jdbc:postgresql://127.0.0.1:5432/dolphinscheduler
    driver-class-name: com.mysql.jdbc.Driver
    url: jdbc:mysql://127.0.0.1:3306/dolphinscheduler?useUnicode=true&serverTimezone=UTC&characterEncoding=UTF-8&autoReconnect=true&useSSL=false&zeroDateTimeBehavior=convertToNull
    username: root
    password: root
    hikari:
      connection-test-query: select 1
      minimum-idle: 5
      auto-commit: true
      validation-timeout: 3000
      pool-name: DolphinScheduler
      maximum-pool-size: 50
      connection-timeout: 30000
      idle-timeout: 600000
      leak-detection-threshold: 0
      initialization-fail-timeout: 1
  quartz:
    auto-startup: false
    job-store-type: jdbc
    jdbc:
      initialize-schema: never
    properties:
      org.quartz.threadPool:threadPriority: 5
      org.quartz.jobStore.isClustered: true
      org.quartz.jobStore.class: org.quartz.impl.jdbcjobstore.JobStoreTX
      org.quartz.scheduler.instanceId: AUTO
      org.quartz.jobStore.tablePrefix: QRTZ_
      org.quartz.jobStore.acquireTriggersWithinLock: true
      org.quartz.scheduler.instanceName: DolphinScheduler
      org.quartz.threadPool.class: org.quartz.simpl.SimpleThreadPool
      org.quartz.jobStore.useProperties: false
      org.quartz.threadPool.makeThreadsDaemons: true
      org.quartz.threadPool.threadCount: 25
      org.quartz.jobStore.misfireThreshold: 60000
      org.quartz.scheduler.makeSchedulerThreadDaemon: true
#      org.quartz.jobStore.driverDelegateClass: org.quartz.impl.jdbcjobstore.PostgreSQLDelegate
      org.quartz.jobStore.driverDelegateClass: org.quartz.impl.jdbcjobstore.StdJDBCDelegate
      org.quartz.jobStore.clusterCheckinInterval: 5000

management:
  endpoints:
    web:
      exposure:
        include: '*'
  metrics:
    tags:
      application: ${spring.application.name}

registry:
  type: zookeeper
  zookeeper:
    namespace: dolphinscheduler
#    connect-string: localhost:2181
    connect-string: 10.255.158.70:2181
    retry-policy:
      base-sleep-time: 60ms
      max-sleep: 300ms
      max-retries: 5
    session-timeout: 30s
    connection-timeout: 9s
    block-until-connected: 600ms
    digest: ~

audit:
  enabled: false

metrics:
  enabled: true

python-gateway:
  # Weather enable python gateway server or not. The default value is true.
  enabled: true
  # The address of Python gateway server start. Set its value to `0.0.0.0` if your Python API run in different
  # between Python gateway server. It could be be specific to other address like `127.0.0.1` or `localhost`
  gateway-server-address: 0.0.0.0
  # The port of Python gateway server start. Define which port you could connect to Python gateway server from
  # Python API side.
  gateway-server-port: 25333
  # The address of Python callback client.
  python-address: 127.0.0.1
  # The port of Python callback client.
  python-port: 25334
  # Close connection of socket server if no other request accept after x milliseconds. Define value is (0 = infinite),
  # and socket server would never close even though no requests accept
  connect-timeout: 0
  # Close each active connection of socket server if python program not active after x milliseconds. Define value is
  # (0 = infinite), and socket server would never close even though no requests accept
  read-timeout: 0

# Override by profile

---
spring:
  config:
    activate:
      on-profile: mysql
  datasource:
    driver-class-name: com.mysql.jdbc.Driver
    url: jdbc:mysql://127.0.0.1:3306/dolphinscheduler?useUnicode=true&characterEncoding=UTF-8
  quartz:
    properties:
      org.quartz.jobStore.driverDelegateClass: org.quartz.impl.jdbcjobstore.StdJDBCDelegate

dolphinscheduler-master application.yaml

spring:
  banner:
    charset: UTF-8
  application:
    name: master-server
  jackson:
    time-zone: UTC
    date-format: "yyyy-MM-dd HH:mm:ss"
  cache:
    # default enable cache, you can disable by `type: none`
    type: none
    cache-names:
      - tenant
      - user
      - processDefinition
      - processTaskRelation
      - taskDefinition
    caffeine:
      spec: maximumSize=100,expireAfterWrite=300s,recordStats
  datasource:
    #driver-class-name: org.postgresql.Driver
    #url: jdbc:postgresql://127.0.0.1:5432/dolphinscheduler
    driver-class-name: com.mysql.jdbc.Driver
    url: jdbc:mysql://127.0.0.1:3306/dolphinscheduler?useUnicode=true&serverTimezone=UTC&characterEncoding=UTF-8&autoReconnect=true&useSSL=false&zeroDateTimeBehavior=convertToNull
    username: root
    password:
    hikari:
      connection-test-query: select 1
      minimum-idle: 5
      auto-commit: true
      validation-timeout: 3000
      pool-name: DolphinScheduler
      maximum-pool-size: 50
      connection-timeout: 30000
      idle-timeout: 600000
      leak-detection-threshold: 0
      initialization-fail-timeout: 1
  quartz:
    job-store-type: jdbc
    jdbc:
      initialize-schema: never
    properties:
      org.quartz.threadPool:threadPriority: 5
      org.quartz.jobStore.isClustered: true
      org.quartz.jobStore.class: org.quartz.impl.jdbcjobstore.JobStoreTX
      org.quartz.scheduler.instanceId: AUTO
      org.quartz.jobStore.tablePrefix: QRTZ_
      org.quartz.jobStore.acquireTriggersWithinLock: true
      org.quartz.scheduler.instanceName: DolphinScheduler
      org.quartz.threadPool.class: org.quartz.simpl.SimpleThreadPool
      org.quartz.jobStore.useProperties: false
      org.quartz.threadPool.makeThreadsDaemons: true
      org.quartz.threadPool.threadCount: 25
      org.quartz.jobStore.misfireThreshold: 60000
      org.quartz.scheduler.makeSchedulerThreadDaemon: true
#      org.quartz.jobStore.driverDelegateClass: org.quartz.impl.jdbcjobstore.PostgreSQLDelegate
      org.quartz.jobStore.driverDelegateClass: org.quartz.impl.jdbcjobstore.StdJDBCDelegate
      org.quartz.jobStore.clusterCheckinInterval: 5000

registry:
  type: zookeeper
  zookeeper:
    namespace: dolphinscheduler
#    connect-string: localhost:2181
    connect-string: 10.255.158.70:2181
    retry-policy:
      base-sleep-time: 60ms
      max-sleep: 300ms
      max-retries: 5
    session-timeout: 30s
    connection-timeout: 9s
    block-until-connected: 600ms
    digest: ~

master:
  listen-port: 5678
  # master fetch command num
  fetch-command-num: 10
  # master prepare execute thread number to limit handle commands in parallel
  pre-exec-threads: 10
  # master execute thread number to limit process instances in parallel
  exec-threads: 100
  # master dispatch task number per batch
  dispatch-task-number: 3
  # master host selector to select a suitable worker, default value: LowerWeight. Optional values include random, round_robin, lower_weight
  host-selector: lower_weight
  # master heartbeat interval, the unit is second
  heartbeat-interval: 10
  # master commit task retry times
  task-commit-retry-times: 5
  # master commit task interval, the unit is millisecond
  task-commit-interval: 1000
  state-wheel-interval: 5
  # master max cpuload avg, only higher than the system cpu load average, master server can schedule. default value -1: the number of cpu cores * 2
  max-cpu-load-avg: -1
  # master reserved memory, only lower than system available memory, master server can schedule. default value 0.3, the unit is G
  reserved-memory: 0.3
  # failover interval, the unit is minute
  failover-interval: 10
  # kill yarn jon when failover taskInstance, default true
  kill-yarn-job-when-task-failover: true

server:
  port: 5679

management:
  endpoints:
    web:
      exposure:
        include: '*'
  metrics:
    tags:
      application: ${spring.application.name}

metrics:
  enabled: true

# Override by profile

---
spring:
  config:
    activate:
      on-profile: mysql
  datasource:
    driver-class-name: com.mysql.jdbc.Driver
    url: jdbc:mysql://127.0.0.1:3306/dolphinscheduler?useUnicode=true&serverTimezone=UTC&characterEncoding=UTF-8&autoReconnect=true&useSSL=false&zeroDateTimeBehavior=convertToNull
  quartz:
    properties:
      org.quartz.jobStore.driverDelegateClass: org.quartz.impl.jdbcjobstore.StdJDBCDelegate

dolphinscheduler-worker application.yaml

spring:
  banner:
    charset: UTF-8
  application:
    name: worker-server
  jackson:
    time-zone: UTC
    date-format: "yyyy-MM-dd HH:mm:ss"
  datasource:
    #driver-class-name: org.postgresql.Driver
    #url: jdbc:postgresql://127.0.0.1:5432/dolphinscheduler
    driver-class-name: com.mysql.jdbc.Driver
    url: jdbc:mysql://127.0.0.1:3306/dolphinscheduler?useUnicode=true&serverTimezone=UTC&characterEncoding=UTF-8&autoReconnect=true&useSSL=false&zeroDateTimeBehavior=convertToNull
    username: root
    #password: root
    password:
    hikari:
      connection-test-query: select 1
      minimum-idle: 5
      auto-commit: true
      validation-timeout: 3000
      pool-name: DolphinScheduler
      maximum-pool-size: 50
      connection-timeout: 30000
      idle-timeout: 600000
      leak-detection-threshold: 0
      initialization-fail-timeout: 1

registry:
  type: zookeeper
  zookeeper:
    namespace: dolphinscheduler
#    connect-string: localhost:2181
    connect-string: 10.255.158.70:2181
    retry-policy:
      base-sleep-time: 60ms
      max-sleep: 300ms
      max-retries: 5
    session-timeout: 30s
    connection-timeout: 9s
    block-until-connected: 600ms
    digest: ~

worker:
  # worker listener port
  listen-port: 1234
  # worker execute thread number to limit task instances in parallel
  exec-threads: 100
  # worker heartbeat interval, the unit is second
  heartbeat-interval: 10
  # worker host weight to dispatch tasks, default value 100
  host-weight: 100
  # worker tenant auto create
  tenant-auto-create: true
  # worker max cpuload avg, only higher than the system cpu load average, worker server can be dispatched tasks. default value -1: the number of cpu cores * 2
  max-cpu-load-avg: -1
  # worker reserved memory, only lower than system available memory, worker server can be dispatched tasks. default value 0.3, the unit is G
  reserved-memory: 0.3
  # default worker groups separated by comma, like 'worker.groups=default,test'
  groups:
    - default
  # alert server listen host
  alert-listen-host: localhost
  alert-listen-port: 50052

server:
  port: 1235

management:
  endpoints:
    web:
      exposure:
        include: '*'
  metrics:
    tags:
      application: ${spring.application.name}

metrics:
  enabled: true

次要关注数据库,quartz, zookeeper, masker, worker 配置。

2.2 API 次要工作操作接口

其余业务接口能够不必关注,只须要关注最最次要的流程上线性能接口,此接口能够发散出所有的任务调度相干的代码。

接口:/dolphinscheduler/projects/{projectCode}/schedules/{id}/online;此接口会将定义的流程提交到 Quartz 调度框架;代码如下:

public Map<String, Object> setScheduleState(User loginUser,
                                                long projectCode,
                                                Integer id,
                                                ReleaseState scheduleStatus) {
        Map<String, Object> result = new HashMap<>();

        Project project = projectMapper.queryByCode(projectCode);
        // check project auth
        boolean hasProjectAndPerm = projectService.hasProjectAndPerm(loginUser, project, result);
        if (!hasProjectAndPerm) {
            return result;
        }

        // check schedule exists
        Schedule scheduleObj = scheduleMapper.selectById(id);

        if (scheduleObj == null) {
            putMsg(result, Status.SCHEDULE\_CRON\_NOT_EXISTS, id);
            return result;
        }
        // check schedule release state
        if (scheduleObj.getReleaseState() == scheduleStatus) {
            logger.info(“schedule release is already {},needn’t to change schedule id: {} from {} to {}”,
                    scheduleObj.getReleaseState(), scheduleObj.getId(), scheduleObj.getReleaseState(), scheduleStatus);
            putMsg(result, Status.SCHEDULE\_CRON\_REALEASE\_NEED\_NOT_CHANGE, scheduleStatus);
            return result;
        }
        ProcessDefinition processDefinition = processDefinitionMapper.queryByCode(scheduleObj.getProcessDefinitionCode());
        if (processDefinition == null || projectCode != processDefinition.getProjectCode()) {
            putMsg(result, Status.PROCESS\_DEFINE\_NOT_EXIST, String.valueOf(scheduleObj.getProcessDefinitionCode()));
            return result;
        }
        List<ProcessTaskRelation> processTaskRelations = processTaskRelationMapper.queryByProcessCode(projectCode, scheduleObj.getProcessDefinitionCode());
        if (processTaskRelations.isEmpty()) {
            putMsg(result, Status.PROCESS\_DAG\_IS_EMPTY);
            return result;
        }
        if (scheduleStatus == ReleaseState.ONLINE) {
            // check process definition release state
            if (processDefinition.getReleaseState() != ReleaseState.ONLINE) {
                logger.info(“not release process definition id: {} , name : {}”,
                        processDefinition.getId(), processDefinition.getName());
                putMsg(result, Status.PROCESS\_DEFINE\_NOT_RELEASE, processDefinition.getName());
                return result;
            }
            // check sub process definition release state
            List<Long> subProcessDefineCodes = new ArrayList<>();
            processService.recurseFindSubProcess(processDefinition.getCode(), subProcessDefineCodes);
            if (!subProcessDefineCodes.isEmpty()) {
                List<ProcessDefinition> subProcessDefinitionList =
                        processDefinitionMapper.queryByCodes(subProcessDefineCodes);
                if (subProcessDefinitionList != null && !subProcessDefinitionList.isEmpty()) {
                    for (ProcessDefinition subProcessDefinition : subProcessDefinitionList) {
                        /**
                         * if there is no online process, exit directly
                         */
                        if (subProcessDefinition.getReleaseState() != ReleaseState.ONLINE) {
                            logger.info(“not release process definition id: {} , name : {}”,
                                    subProcessDefinition.getId(), subProcessDefinition.getName());
                            putMsg(result, Status.PROCESS\_DEFINE\_NOT_RELEASE, String.valueOf(subProcessDefinition.getId()));
                            return result;
                        }
                    }
                }
            }
        }

        // check master server exists
        List<Server> masterServers = monitorService.getServerListFromRegistry(true);

        if (masterServers.isEmpty()) {
            putMsg(result, Status.MASTER\_NOT\_EXISTS);
            return result;
        }

        // set status
        scheduleObj.setReleaseState(scheduleStatus);

        scheduleMapper.updateById(scheduleObj);

        try {
            switch (scheduleStatus) {
                case ONLINE:
                    logger.info(“Call master client set schedule online, project id: {}, flow id: {},host: {}”, project.getId(), processDefinition.getId(), masterServers);
                    setSchedule(project.getId(), scheduleObj);
                    break;
                case OFFLINE:
                    logger.info(“Call master client set schedule offline, project id: {}, flow id: {},host: {}”, project.getId(), processDefinition.getId(), masterServers);
                    deleteSchedule(project.getId(), id);
                    break;
                default:
                    putMsg(result, Status.SCHEDULE\_STATUS\_UNKNOWN, scheduleStatus.toString());
                    return result;
            }
        } catch (Exception e) {
            result.put(Constants.MSG, scheduleStatus == ReleaseState.ONLINE ? “set online failure” : “set offline failure”);
            throw new ServiceException(result.get(Constants.MSG).toString(), e);
        }

        putMsg(result, Status.SUCCESS);
        return result;
    }

public void setSchedule(int projectId, Schedule schedule) {logger.info("set schedule, project id: {}, scheduleId: {}", projectId, schedule.getId());

        quartzExecutor.addJob(ProcessScheduleJob.class, projectId, schedule);
    }
public void addJob(Class<? extends Job> clazz, int projectId, final Schedule schedule) {String jobName = this.buildJobName(schedule.getId());
        String jobGroupName = this.buildJobGroupName(projectId);

        Map<String, Object> jobDataMap = this.buildDataMap(projectId, schedule);
        String cronExpression = schedule.getCrontab();
        String timezoneId = schedule.getTimezoneId();

        /**
         * transform from server default timezone to schedule timezone
         * e.g. server default timezone is `UTC`
         * user set a schedule with startTime `2022-04-28 10:00:00`, timezone is `Asia/Shanghai`,
         * api skip to transform it and save into databases directly, startTime `2022-04-28 10:00:00`, timezone is `UTC`, which actually added 8 hours,
         * so when add job to quartz, it should recover by transform timezone
         */
        Date startDate = DateUtils.transformTimezoneDate(schedule.getStartTime(), timezoneId);
        Date endDate = DateUtils.transformTimezoneDate(schedule.getEndTime(), timezoneId);

        lock.writeLock().lock();
        try {JobKey jobKey = new JobKey(jobName, jobGroupName);
            JobDetail jobDetail;
            //add a task (if this task already exists, return this task directly)
            if (scheduler.checkExists(jobKey)) {jobDetail = scheduler.getJobDetail(jobKey);
                jobDetail.getJobDataMap().putAll(jobDataMap);
            } else {jobDetail = newJob(clazz).withIdentity(jobKey).build();

                jobDetail.getJobDataMap().putAll(jobDataMap);

                scheduler.addJob(jobDetail, false, true);

                logger.info("Add job, job name: {}, group name: {}",
                        jobName, jobGroupName);
            }

            TriggerKey triggerKey = new TriggerKey(jobName, jobGroupName);
            /*
             * Instructs the Scheduler that upon a mis-fire
             * situation, the CronTrigger wants to have it's
             * next-fire-time updated to the next time in the schedule after the
             * current time (taking into account any associated Calendar),
             * but it does not want to be fired now.
             */
            CronTrigger cronTrigger = newTrigger()
                    .withIdentity(triggerKey)
                    .startAt(startDate)
                    .endAt(endDate)
                    .withSchedule(cronSchedule(cronExpression)
                                    .withMisfireHandlingInstructionDoNothing()
                                    .inTimeZone(DateUtils.getTimezone(timezoneId))
                    )
                    .forJob(jobDetail).build();

            if (scheduler.checkExists(triggerKey)) {
                // updateProcessInstance scheduler trigger when scheduler cycle changes
                CronTrigger oldCronTrigger = (CronTrigger) scheduler.getTrigger(triggerKey);
                String oldCronExpression = oldCronTrigger.getCronExpression();

                if (!StringUtils.equalsIgnoreCase(cronExpression, oldCronExpression)) {
                    // reschedule job trigger
                    scheduler.rescheduleJob(triggerKey, cronTrigger);
                    logger.info("reschedule job trigger, triggerName: {}, triggerGroupName: {}, cronExpression: {}, startDate: {}, endDate: {}",
                            jobName, jobGroupName, cronExpression, startDate, endDate);
                }
            } else {scheduler.scheduleJob(cronTrigger);
                logger.info("schedule job trigger, triggerName: {}, triggerGroupName: {}, cronExpression: {}, startDate: {}, endDate: {}",
                        jobName, jobGroupName, cronExpression, startDate, endDate);
            }

        } catch (Exception e) {throw new ServiceException("add job failed", e);
        } finally {lock.writeLock().unlock();}
    }

2.3 Quaterz 架构与运行流程

2.3.1 概念与架构

Quartz 框架次要包含如下几个局部:

  • SchedulerFactory:任务调度工厂,次要负责管理任务调度器;
  • Scheduler:任务调度器,次要负责任务调度,以及操作工作的相干接口;
  • Job:工作接口,实现类蕴含具体任务业务代码;
  • JobDetail:用于定义作业的实例;
  • Trigger:工作触发器,次要寄存 Job 执行的工夫策略。例如多久执行一次,什么时候执行,以什么频率执行等等;
  • JobBuilder:用于定义 / 构建 JobDetail 实例,用于定义作业的实例。
  • TriggerBuilder:用于定义 / 构建触发器实例;
  • Calendar:Trigger 扩大对象,能够排除或者蕴含某个指定的工夫点(如排除法定节假日);
  • JobStore:存储作业和任务调度期间的状态 Scheduler 的生命期,从 SchedulerFactory 创立它时开始,到 Scheduler 调用 Shutdown() 办法时完结;

Scheduler 被创立后,能够减少、删除和列举 Job 和 Trigger,以及执行其它与调度相干的操作(如暂停 Trigger)。但 Scheduler 只有在调用 start() 办法后,才会真正地触发 trigger(即执行 job)

2.3.2 初始化与执行流程

Quartz 的基本原理就是通过 Scheduler 来调度被 JobDetail 和 Trigger 定义的装置 Job 接口标准实现的自定义工作业务对象,来实现工作的调度。根本逻辑如下图:

代码时序图如下:

根本内容就是初始化任务调度容器 Scheduler,以及容器所需的线程池,数据交互对象 JobStore,工作解决线程 QuartzSchedulerThread 用来解决 Job 接口的具体业务实现类。

DolphinScheduler 的业务类是 ProcessScheduleJob,次要性能就是依据调度信息往 commond 表中写数据。

2.3.3 集群运行

须要留神的事:

  1. 当 Quartz 采纳集群模式部署的时候,存储介质不能应用内存的模式,也就是不能应用 JobStoreRAM。
  2. Quartz 集群对于对于须要被调度的 Triggers 实例的扫描是应用数据库锁 TRIGGER_ACCESS 来实现的,保障此扫描过程只能被一个 Quartz 实例获取到。代码如下:

  public List<OperableTrigger> acquireNextTriggers(final long noLaterThan, final int maxCount, final long timeWindow)
        throws JobPersistenceException {
        
        String lockName;
        if(isAcquireTriggersWithinLock() || maxCount > 1) {
            lockName = LOCK\_TRIGGER\_ACCESS;
        } else {
            lockName = null;
        }
        return executeInNonManagedTXLock(lockName, 
                new TransactionCallback<List<OperableTrigger>>() {
                    public List<OperableTrigger> execute(Connection conn) throws JobPersistenceException {
                        return acquireNextTrigger(conn, noLaterThan, maxCount, timeWindow);
                    }
                },
                new TransactionValidator<List<OperableTrigger>>() {
                    public Boolean validate(Connection conn, List<OperableTrigger> result) throws JobPersistenceException {
                        try {
                            List<FiredTriggerRecord> acquired = getDelegate().selectInstancesFiredTriggerRecords(conn, getInstanceId());
                            Set<String> fireInstanceIds = new HashSet<String>();
                            for (FiredTriggerRecord ft : acquired) {
                                fireInstanceIds.add(ft.getFireInstanceId());
                            }
                            for (OperableTrigger tr : result) {
                                if (fireInstanceIds.contains(tr.getFireInstanceId())) {
                                    return true;
                                }
                            }
                            return false;
                        } catch (SQLException e) {
                            throw new JobPersistenceException(“error validating trigger acquisition”, e);
                        }
                    }
                });
    }

3. 集群失败实例复原须要留神的是各个实例复原各自实例对应的异样实例,因为数据库有调度容器的 instanceId 信息。代码如下:

 protected void clusterRecover(Connection conn, List<SchedulerStateRecord> failedInstances)
        throws JobPersistenceException {if (failedInstances.size() > 0) {long recoverIds = System.currentTimeMillis();

            logWarnIfNonZero(failedInstances.size(),
                    "ClusterManager: detected" + failedInstances.size()
                            + "failed or restarted instances.");
            try {for (SchedulerStateRecord rec : failedInstances) {getLog().info(
                            "ClusterManager: Scanning for instance \""
                                    + rec.getSchedulerInstanceId()
                                    + "\"'s failed in-progress jobs.");

                    List<FiredTriggerRecord> firedTriggerRecs = getDelegate()
                            .selectInstancesFiredTriggerRecords(conn,
                                    rec.getSchedulerInstanceId());

                    int acquiredCount = 0;
                    int recoveredCount = 0;
                    int otherCount = 0;

                    Set<TriggerKey> triggerKeys = new HashSet<TriggerKey>();

                    for (FiredTriggerRecord ftRec : firedTriggerRecs) {TriggerKey tKey = ftRec.getTriggerKey();
                        JobKey jKey = ftRec.getJobKey();

                        triggerKeys.add(tKey);

                        // release blocked triggers..
                        if (ftRec.getFireInstanceState().equals(STATE_BLOCKED)) {getDelegate()
                                    .updateTriggerStatesForJobFromOtherState(
                                            conn, jKey,
                                            STATE_WAITING, STATE_BLOCKED);
                        } else if (ftRec.getFireInstanceState().equals(STATE_PAUSED_BLOCKED)) {getDelegate()
                                    .updateTriggerStatesForJobFromOtherState(
                                            conn, jKey,
                                            STATE_PAUSED, STATE_PAUSED_BLOCKED);
                        }

                        // release acquired triggers..
                        if (ftRec.getFireInstanceState().equals(STATE_ACQUIRED)) {getDelegate().updateTriggerStateFromOtherState(
                                    conn, tKey, STATE_WAITING,
                                    STATE_ACQUIRED);
                            acquiredCount++;
                        } else if (ftRec.isJobRequestsRecovery()) {
                            // handle jobs marked for recovery that were not fully
                            // executed..
                            if (jobExists(conn, jKey)) {@SuppressWarnings("deprecation")
                                SimpleTriggerImpl rcvryTrig = new SimpleTriggerImpl(
                                        "recover_"
                                                + rec.getSchedulerInstanceId()
                                                + "_"
                                                + String.valueOf(recoverIds++),
                                        Scheduler.DEFAULT_RECOVERY_GROUP,
                                        new Date(ftRec.getScheduleTimestamp()));
                                rcvryTrig.setJobName(jKey.getName());
                                rcvryTrig.setJobGroup(jKey.getGroup());
                                rcvryTrig.setMisfireInstruction(SimpleTrigger.MISFIRE_INSTRUCTION_IGNORE_MISFIRE_POLICY);
                                rcvryTrig.setPriority(ftRec.getPriority());
                                JobDataMap jd = getDelegate().selectTriggerJobDataMap(conn, tKey.getName(), tKey.getGroup());
                                jd.put(Scheduler.FAILED_JOB_ORIGINAL_TRIGGER_NAME, tKey.getName());
                                jd.put(Scheduler.FAILED_JOB_ORIGINAL_TRIGGER_GROUP, tKey.getGroup());
                                jd.put(Scheduler.FAILED_JOB_ORIGINAL_TRIGGER_FIRETIME_IN_MILLISECONDS, String.valueOf(ftRec.getFireTimestamp()));
                                jd.put(Scheduler.FAILED_JOB_ORIGINAL_TRIGGER_SCHEDULED_FIRETIME_IN_MILLISECONDS, String.valueOf(ftRec.getScheduleTimestamp()));
                                rcvryTrig.setJobDataMap(jd);

                                rcvryTrig.computeFirstFireTime(null);
                                storeTrigger(conn, rcvryTrig, null, false,
                                        STATE_WAITING, false, true);
                                recoveredCount++;
                            } else {getLog()
                                        .warn(
                                                "ClusterManager: failed job'"
                                                        + jKey
                                                        + "'no longer exists, cannot schedule recovery.");
                                otherCount++;
                            }
                        } else {otherCount++;}

                        // free up stateful job's triggers
                        if (ftRec.isJobDisallowsConcurrentExecution()) {getDelegate()
                                    .updateTriggerStatesForJobFromOtherState(
                                            conn, jKey,
                                            STATE_WAITING, STATE_BLOCKED);
                            getDelegate()
                                    .updateTriggerStatesForJobFromOtherState(
                                            conn, jKey,
                                            STATE_PAUSED, STATE_PAUSED_BLOCKED);
                        }
                    }

                    getDelegate().deleteFiredTriggers(conn,
                            rec.getSchedulerInstanceId());

                    // Check if any of the fired triggers we just deleted were the last fired trigger
                    // records of a COMPLETE trigger.
                    int completeCount = 0;
                    for (TriggerKey triggerKey : triggerKeys) {if (getDelegate().selectTriggerState(conn, triggerKey).
                                equals(STATE_COMPLETE)) {
                            List<FiredTriggerRecord> firedTriggers =
                                    getDelegate().selectFiredTriggerRecords(conn, triggerKey.getName(), triggerKey.getGroup());
                            if (firedTriggers.isEmpty()) {if (removeTrigger(conn, triggerKey)) {completeCount++;}
                            }
                        }
                    }

                    logWarnIfNonZero(acquiredCount,
                            "ClusterManager: ......Freed" + acquiredCount
                                    + "acquired trigger(s).");
                    logWarnIfNonZero(completeCount,
                            "ClusterManager: ......Deleted" + completeCount
                                    + "complete triggers(s).");
                    logWarnIfNonZero(recoveredCount,
                            "ClusterManager: ......Scheduled" + recoveredCount
                                    + "recoverable job(s) for recovery.");
                    logWarnIfNonZero(otherCount,
                            "ClusterManager: ......Cleaned-up" + otherCount
                                    + "other failed job(s).");

                    if (!rec.getSchedulerInstanceId().equals(getInstanceId())) {getDelegate().deleteSchedulerState(conn,
                                rec.getSchedulerInstanceId());
                    }
                }
            } catch (Throwable e) {
                throw new JobPersistenceException("Failure recovering jobs:"
                        + e.getMessage(), e);
            }
        }
    }

2.4 Master 启动与执行流程

2.4.1 概念与执行逻辑

要害概念:

Quartz 相干:

  • Scheduler(任务调度容器,个别都是 StdScheduler 实例)。
  • ProcessScheduleJob:(实现 Quarts 调度框架的 Job 接口的业务类,专门生成 DolphinScheduler 数据库业务表 t\_ds\_commond 数据);

DolphinScheduler 相干:

  • NettyRemotingServer(netty 服务端,蕴含 netty 服务端 serverBootstrap 对象与 netty 服务端业务解决对象 serverHandler),NettyServerHandler:(netty 服务端业务解决类:蕴含各类处理器以及处理器对应的执行线程池);
  • TaskPluginManager(工作插件管理器,不同类型的工作以插件的模式治理,在应用服务启动的时候,通过 @AutoService 加载实现了 TaskChannelFactory 接口的工厂信息到数据库,通过工厂对象来加载各类 TaskChannel 实现类到缓存);
  • MasterRegistryClient(master 操作 zk 的客户端,封装了 master 对于 zk 的所有操作,注册,查问,删除等);
  • MasterSchedulerService(扫描服务,蕴含业务执行线程和 work 蕴含的 nettyhe 护短,负责任务调度业务,slot 来管制集群模式下工作不被反复调度,底层实现是 zookeeper 分布式锁);
  • WorkflowExecuteThread(真正的业务解决线程,通过插槽获取命令 commond,执行之前会校验 slot 的变动,如果变动不执行,要害性能就是构建工作相干的参数,定义,优先级等,而后发送到队列,供队列解决线程生产);
  • CommonTaskProcessor(一般工作处理器,实现 ITaskProcessor 接口,依据业务分为一般,依赖,子工作,阻塞,条件工作类型,蕴含了工作的提交,运行,散发,杀死等业务,通过 @AutoService 加载的类,根本就是封装了对);
  • TaskPriorityQueueImpl(工作队列,负责工作队列的存储管制);
  • TaskPriorityQueueConsumer(工作队列生产线程,负责工作的依据负载平衡策略在 worker 之间散发与执行);
  • ServerNodeManager(节点信息控制器,负责节点注册信息更新与槽位(slot)变更,底层实现是 zookeeper 分布式锁的利用);
  • EventExecuteService(事件处理线程,通过缓存起来的工作解决线程,解决每个工作在处理过程中注册在线程事件队列中的事件);
  • FailoverExecuteThread(故障转移线程,蕴含 Master 和 worker 的);
  • MasterRegistryDataListener(托管在 zk 治理框架 cautor 的故障监听器,负责对 worker 和 master 注册在 zk 上的节点的新增和删除)。

主节点容错代码如下,业务解释见 1.5.1Master 容错解释:

 private void failoverMasterWithLock(String masterHost) {String failoverPath = getFailoverLockPath(NodeType.MASTER, masterHost);
        try {registryClient.getLock(failoverPath);
            this.failoverMaster(masterHost);
        } catch (Exception e) {LOGGER.error("{} server failover failed, host:{}", NodeType.MASTER, masterHost, e);
        } finally {registryClient.releaseLock(failoverPath);
        }
    }
 /**
     * failover master
     * <p>
     * failover process instance and associated task instance
     * 故障转移流程实例和关联的工作实例
     * @param masterHost master host
     */
    private void failoverMaster(String masterHost) {if (StringUtils.isEmpty(masterHost)) {return;}
        Date serverStartupTime = getServerStartupTime(NodeType.MASTER, masterHost);
        long startTime = System.currentTimeMillis();
        List<ProcessInstance> needFailoverProcessInstanceList = processService.queryNeedFailoverProcessInstances(masterHost);
        LOGGER.info("start master[{}] failover, process list size:{}", masterHost, needFailoverProcessInstanceList.size());
        List<Server> workerServers = registryClient.getServerList(NodeType.WORKER);
        for (ProcessInstance processInstance : needFailoverProcessInstanceList) {if (Constants.NULL.equals(processInstance.getHost())) {continue;}

            List<TaskInstance> validTaskInstanceList = processService.findValidTaskListByProcessId(processInstance.getId());
            for (TaskInstance taskInstance : validTaskInstanceList) {LOGGER.info("failover task instance id: {}, process instance id: {}", taskInstance.getId(), taskInstance.getProcessInstanceId());
                failoverTaskInstance(processInstance, taskInstance, workerServers);
            }

            if (serverStartupTime != null && processInstance.getRestartTime() != null
                && processInstance.getRestartTime().after(serverStartupTime)) {continue;}

            LOGGER.info("failover process instance id: {}", processInstance.getId());
            //updateProcessInstance host is null and insert into command
            processInstance.setHost(Constants.NULL);
            processService.processNeedFailoverProcessInstances(processInstance);
        }

        LOGGER.info("master[{}] failover end, useTime:{}ms", masterHost, System.currentTimeMillis() - startTime);
    }

2.4.2 集群与槽(slot)

其实这里的采纳 Zookeer 分布式锁精确也不精确,为什么这么说,因为 Slot 是 CommondId 对 Master 列表长度取模来计算的,而 Master 列表长度的刷新是 Zookeeper 分布式锁来管制,Master 节点的调度数据扫描是通过 Slot 来管制的。

具体代码如下:

Slot 刷新

private void updateMasterNodes() {
        MASTER_SLOT = 0;
        MASTER_SIZE = 0;
        this.masterNodes.clear();
        String nodeLock = Constants.REGISTRY_DOLPHINSCHEDULER_LOCK_MASTERS;
        try {registryClient.getLock(nodeLock);
            Collection<String> currentNodes = registryClient.getMasterNodesDirectly();
            List<Server> masterNodes = registryClient.getServerList(NodeType.MASTER);
            syncMasterNodes(currentNodes, masterNodes);
        } catch (Exception e) {logger.error("update master nodes error", e);
        } finally {registryClient.releaseLock(nodeLock);
        }

    }
/**
     * sync master nodes
     *
     * @param nodes master nodes
     */
    private void syncMasterNodes(Collection<String> nodes, List<Server> masterNodes) {masterLock.lock();
        try {String addr = NetUtils.getAddr(NetUtils.getHost(), masterConfig.getListenPort());
            this.masterNodes.addAll(nodes);
            this.masterPriorityQueue.clear();
            this.masterPriorityQueue.putList(masterNodes);
            int index = masterPriorityQueue.getIndex(addr);
            if (index >= 0) {MASTER_SIZE = nodes.size();
                MASTER_SLOT = index;
            } else {logger.warn("current addr:{} is not in active master list", addr);
            }
            logger.info("update master nodes, master size: {}, slot: {}, addr: {}", MASTER_SIZE, MASTER_SLOT, addr);
        } finally {masterLock.unlock();
        }
    }

Slot 利用

/**
     * 1. get command by slot
     * 2. donot handle command if slot is empty
     */
    /** * 1. 通过插槽获取命令 * 2. 如果插槽为空,则不解决命令 */
    private void scheduleProcess() throws Exception {List<Command> commands = findCommands();
        if (CollectionUtils.isEmpty(commands)) {
            //indicate that no command ,sleep for 1s
            Thread.sleep(Constants.SLEEP_TIME_MILLIS);
            return;
        }

        List<ProcessInstance> processInstances = command2ProcessInstance(commands);
        if (CollectionUtils.isEmpty(processInstances)) {return;}

        for (ProcessInstance processInstance : processInstances) {if (processInstance == null) {continue;}

            WorkflowExecuteThread workflowExecuteThread = new WorkflowExecuteThread(
                    processInstance
                    , processService
                    , nettyExecutorManager
                    , processAlertManager
                    , masterConfig
                    , stateWheelExecuteThread);

            this.processInstanceExecCacheManager.cache(processInstance.getId(), workflowExecuteThread);
            if (processInstance.getTimeout() > 0) {stateWheelExecuteThread.addProcess4TimeoutCheck(processInstance);
            }
            workflowExecuteThreadPool.startWorkflow(workflowExecuteThread);
        }
    }
private List<Command> findCommands() {
        int pageNumber = 0;
        int pageSize = masterConfig.getFetchCommandNum();
        List<Command> result = new ArrayList<>();
        if (Stopper.isRunning()) {int thisMasterSlot = ServerNodeManager.getSlot();
            int masterCount = ServerNodeManager.getMasterSize();
            if (masterCount > 0) {result = processService.findCommandPageBySlot(pageSize, pageNumber, masterCount, thisMasterSlot);
            }
        }
        return result;
    }
@Override
    public List<Command> findCommandPageBySlot(int pageSize, int pageNumber, int masterCount, int thisMasterSlot) {if (masterCount <= 0) {return Lists.newArrayList();
        }
        return commandMapper.queryCommandPageBySlot(pageSize, pageNumber * pageSize, masterCount, thisMasterSlot);
    }
    
 <select id="queryCommandPageBySlot" resultType="org.apache.dolphinscheduler.dao.entity.Command">
        select *
        from t_ds_command
        where id % #{masterCount} = #{thisMasterSlot}
        order by process_instance_priority, id asc
            limit #{limit} offset #{offset}
    </select>

## 槽位查看
 private List<ProcessInstance> command2ProcessInstance(List<Command> commands) {List<ProcessInstance> processInstances = Collections.synchronizedList(new ArrayList<>(commands.size()));
        CountDownLatch latch = new CountDownLatch(commands.size());
        for (final Command command : commands) {masterPrepareExecService.execute(() -> {
                try {
                    // slot check again
                    SlotCheckState slotCheckState = slotCheck(command);
                    if (slotCheckState.equals(SlotCheckState.CHANGE) || slotCheckState.equals(SlotCheckState.INJECT)) {logger.info("handle command {} skip, slot check state: {}", command.getId(), slotCheckState);
                        return;
                    }
                    ProcessInstance processInstance = processService.handleCommand(logger,
                            getLocalAddress(),
                            command);
                    if (processInstance != null) {processInstances.add(processInstance);
                        logger.info("handle command {} end, create process instance {}", command.getId(), processInstance.getId());
                    }
                } catch (Exception e) {logger.error("handle command error", e);
                    processService.moveToErrorCommand(command, e.toString());
                } finally {latch.countDown();
                }
            });
        }

        try {
            // make sure to finish handling command each time before next scan
            latch.await();} catch (InterruptedException e) {logger.error("countDownLatch await error", e);
        }

        return processInstances;
    }

private SlotCheckState slotCheck(Command command) {int slot = ServerNodeManager.getSlot();
        int masterSize = ServerNodeManager.getMasterSize();
        SlotCheckState state;
        if (masterSize <= 0) {state = SlotCheckState.CHANGE;} else if (command.getId() % masterSize == slot) {state = SlotCheckState.PASS;} else {state = SlotCheckState.INJECT;}
        return state;
    }

2.4.3 代码执行流程

代码过于繁琐,此处不再一一粘贴代码解释各个类的性能,自行看代码更加清晰。

2.5Worker 启动与执行流程

2.5.1 概念与执行逻辑

  • NettyRemotingServer(worker 蕴含的 netty 服务端)WorkerRegistryClient(zk 客户端,封装了 worker 与 zk 相干的操作,注册,查问,删除等);
  • TaskPluginManager(工作插件管理器,封装了插件加载逻辑和工作理论执行业务的形象);
  • WorkerManagerThread(工作工作线程生成器,生产 netty 处理器推动队列的工作信息,并生成工作执行线程提交线程池治理);
  • TaskExecuteProcessor(Netty 工作执行处理器,生成 master 散发到 work 的工作信息,并推送到队列);
  • TaskExecuteThread(工作执行线程);
  • TaskCallbackService(工作回调线程,与 master 蕴含的 netty client 通信);
  • AbstractTask(工作理论业务的抽象类,子类蕴含理论的工作执行业务,SqlTask,DataXTask 等);
  • RetryReportTaskStatusThread(不关注)

2.5.2 代码执行流程

Worker 节点代码时序图如下:

代码过于繁琐,此处不再一一粘贴代码解释各个类的性能,自行看代码更加清晰。

2.6 RPC 交互

因为节点和应用服务之间的 RPC 通信都是基于 Netty 实现的,Netty 相干常识不在这里过多的解说,以后章节只波及 Master 与 Worker 之间的交互模式的设计与实现。

整体设计如下

2.6.1 Master 与 Worker 交互

Master 与 worker 之间的业务逻辑的交互是基于 Netty 服务端与客户端来实现 Rpc 通信的,Master 和 Worker 启动的时候会将本人的 Netty 服务端信息注册到 ZK 相应的节点上,Master 的工作散发线程和工作杀死等业务运行时,拉取 ZK 上的 Worker 节点信息,依据负载平衡策略抉择一个节点(下章介绍负载平衡),构建 Netty 客户端与 Worker 的 Netty 服务端通信,Worker 收到 Master 的 RPC 申请之后会缓存 Channel 信息并解决对应业务,同时 Callback 回调线程会获取缓存的通道来执行回调操作,这样就造成的闭环。

工作的执行杀死,以及回调状态解决等操作都是通过 Netty 客户端与服务端绑定的 Processer 处理器来进行的。

Master 局部具体代码如下:

Master 启动的时候会初始化 Nettyserver,注册对应的申请处理器到 NettyHandler 并启动:

 @PostConstruct
    public void run() throws SchedulerException {
        // init remoting server
        NettyServerConfig serverConfig = new NettyServerConfig();
        serverConfig.setListenPort(masterConfig.getListenPort());
        this.nettyRemotingServer = new NettyRemotingServer(serverConfig);
        this.nettyRemotingServer.registerProcessor(CommandType.TASK_EXECUTE_RESPONSE, taskExecuteResponseProcessor);
        this.nettyRemotingServer.registerProcessor(CommandType.TASK_EXECUTE_RUNNING, taskExecuteRunningProcessor);
        this.nettyRemotingServer.registerProcessor(CommandType.TASK_KILL_RESPONSE, taskKillResponseProcessor);
        this.nettyRemotingServer.registerProcessor(CommandType.STATE_EVENT_REQUEST, stateEventProcessor);
        this.nettyRemotingServer.registerProcessor(CommandType.TASK_FORCE_STATE_EVENT_REQUEST, taskEventProcessor);
        this.nettyRemotingServer.registerProcessor(CommandType.TASK_WAKEUP_EVENT_REQUEST, taskEventProcessor);
        this.nettyRemotingServer.registerProcessor(CommandType.CACHE_EXPIRE, cacheProcessor);

        // logger server
        this.nettyRemotingServer.registerProcessor(CommandType.GET_LOG_BYTES_REQUEST, loggerRequestProcessor);
        this.nettyRemotingServer.registerProcessor(CommandType.ROLL_VIEW_LOG_REQUEST, loggerRequestProcessor);
        this.nettyRemotingServer.registerProcessor(CommandType.VIEW_WHOLE_LOG_REQUEST, loggerRequestProcessor);
        this.nettyRemotingServer.registerProcessor(CommandType.REMOVE_TAK_LOG_REQUEST, loggerRequestProcessor);

        this.nettyRemotingServer.start();

        // install task plugin
        this.taskPluginManager.installPlugin();

        // self tolerant
        this.masterRegistryClient.init();
        this.masterRegistryClient.start();
        this.masterRegistryClient.setRegistryStoppable(this);

        this.masterSchedulerService.init();
        this.masterSchedulerService.start();

        this.eventExecuteService.start();
        this.failoverExecuteThread.start();

        this.scheduler.start();

        Runtime.getRuntime().addShutdownHook(new Thread(() -> {if (Stopper.isRunning()) {close("shutdownHook");
            }
        }));
    }
 /**
     * server start
     */
    public void start() {if (isStarted.compareAndSet(false, true)) {
            this.serverBootstrap
                    .group(this.bossGroup, this.workGroup)
                    .channel(NettyUtils.getServerSocketChannelClass())
                    .option(ChannelOption.SO_REUSEADDR, true)
                    .option(ChannelOption.SO_BACKLOG, serverConfig.getSoBacklog())
                    .childOption(ChannelOption.SO_KEEPALIVE, serverConfig.isSoKeepalive())
                    .childOption(ChannelOption.TCP_NODELAY, serverConfig.isTcpNoDelay())
                    .childOption(ChannelOption.SO_SNDBUF, serverConfig.getSendBufferSize())
                    .childOption(ChannelOption.SO_RCVBUF, serverConfig.getReceiveBufferSize())
                    .childHandler(new ChannelInitializer<SocketChannel>() {

                        @Override
                        protected void initChannel(SocketChannel ch) {initNettyChannel(ch);
                        }
                    });

            ChannelFuture future;
            try {future = serverBootstrap.bind(serverConfig.getListenPort()).sync();} catch (Exception e) {logger.error("NettyRemotingServer bind fail {}, exit", e.getMessage(), e);
                throw new RemoteException(String.format(NETTY_BIND_FAILURE_MSG, serverConfig.getListenPort()));
            }
            if (future.isSuccess()) {logger.info("NettyRemotingServer bind success at port : {}", serverConfig.getListenPort());
            } else if (future.cause() != null) {throw new RemoteException(String.format(NETTY_BIND_FAILURE_MSG, serverConfig.getListenPort()), future.cause());
            } else {throw new RemoteException(String.format(NETTY_BIND_FAILURE_MSG, serverConfig.getListenPort()));
            }
        }
    }

‍Master 的 NettyExecutorManager 初始化的时候会将 NettyRemotingClient 也初始化,并且会注册解决 Worker 回调申请的处理器,真正的端口绑定是在获取到执行器端口之后:

 /**
     * constructor
     */
    public NettyExecutorManager() {final NettyClientConfig clientConfig = new NettyClientConfig();
        this.nettyRemotingClient = new NettyRemotingClient(clientConfig);
    }
## 注册解决 worker 回调的处理器
    @PostConstruct
    public void init() {this.nettyRemotingClient.registerProcessor(CommandType.TASK_EXECUTE_RESPONSE, taskExecuteResponseProcessor);
        this.nettyRemotingClient.registerProcessor(CommandType.TASK_EXECUTE_RUNNING, taskExecuteRunningProcessor);
        this.nettyRemotingClient.registerProcessor(CommandType.TASK_KILL_RESPONSE, taskKillResponseProcessor);
    }
    
 public NettyRemotingClient(final NettyClientConfig clientConfig) {
        this.clientConfig = clientConfig;
        if (NettyUtils.useEpoll()) {this.workerGroup = new EpollEventLoopGroup(clientConfig.getWorkerThreads(), new ThreadFactory() {private final AtomicInteger threadIndex = new AtomicInteger(0);

                @Override
                public Thread newThread(Runnable r) {return new Thread(r, String.format("NettyClient_%d", this.threadIndex.incrementAndGet()));
                }
            });
        } else {this.workerGroup = new NioEventLoopGroup(clientConfig.getWorkerThreads(), new ThreadFactory() {private final AtomicInteger threadIndex = new AtomicInteger(0);

                @Override
                public Thread newThread(Runnable r) {return new Thread(r, String.format("NettyClient_%d", this.threadIndex.incrementAndGet()));
                }
            });
        }
        this.callbackExecutor = new ThreadPoolExecutor(5, 10, 1, TimeUnit.MINUTES,
                new LinkedBlockingQueue<>(1000), new NamedThreadFactory("CallbackExecutor", 10),
                new CallerThreadExecutePolicy());
        this.clientHandler = new NettyClientHandler(this, callbackExecutor);

        this.responseFutureExecutor = Executors.newSingleThreadScheduledExecutor(new NamedThreadFactory("ResponseFutureExecutor"));

        this.start();}
 /**
     * start
     */
 private void start() {

        this.bootstrap
                .group(this.workerGroup)
                .channel(NettyUtils.getSocketChannelClass())
                .option(ChannelOption.SO_KEEPALIVE, clientConfig.isSoKeepalive())
                .option(ChannelOption.TCP_NODELAY, clientConfig.isTcpNoDelay())
                .option(ChannelOption.SO_SNDBUF, clientConfig.getSendBufferSize())
                .option(ChannelOption.SO_RCVBUF, clientConfig.getReceiveBufferSize())
                .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, clientConfig.getConnectTimeoutMillis())
                .handler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    public void initChannel(SocketChannel ch) {ch.pipeline()
                                .addLast("client-idle-handler", new IdleStateHandler(Constants.NETTY_CLIENT_HEART_BEAT_TIME, 0, 0, TimeUnit.MILLISECONDS))
                                .addLast(new NettyDecoder(), clientHandler, encoder);
                    }
                });
        this.responseFutureExecutor.scheduleAtFixedRate(ResponseFuture::scanFutureTable, 5000, 1000, TimeUnit.MILLISECONDS);
        isStarted.compareAndSet(false, true);
    }

工作散发代码如下:

/**
     * task dispatch
     *
     * @param context context
     * @return result
     * @throws ExecuteException if error throws ExecuteException
     */
    public Boolean dispatch(final ExecutionContext context) throws ExecuteException {
        /**
         * get executor manager
         */
        ExecutorManager<Boolean> executorManager = this.executorManagers.get(context.getExecutorType());
        if (executorManager == null) {throw new ExecuteException("no ExecutorManager for type :" + context.getExecutorType());
        }

        /**
         * host select
         */

        Host host = hostManager.select(context);
        if (StringUtils.isEmpty(host.getAddress())) {
            throw new ExecuteException(String.format("fail to execute : %s due to no suitable worker,"
                            + "current task needs worker group %s to execute",
                    context.getCommand(),context.getWorkerGroup()));
        }
        context.setHost(host);
        executorManager.beforeExecute(context);
        try {
            /**
             * task execute
             */
            return executorManager.execute(context);
        } finally {executorManager.afterExecute(context);
        }
    }


/**
     * execute logic
     *
     * @param context context
     * @return result
     * @throws ExecuteException if error throws ExecuteException
     */
    @Override
    public Boolean execute(ExecutionContext context) throws ExecuteException {

        /**
         *  all nodes
         */
        Set<String> allNodes = getAllNodes(context);

        /**
         * fail nodes
         */
        Set<String> failNodeSet = new HashSet<>();

        /**
         *  build command accord executeContext
         */
        Command command = context.getCommand();

        /**
         * execute task host
         */
        Host host = context.getHost();
        boolean success = false;
        while (!success) {
            try {doExecute(host, command);
                success = true;
                context.setHost(host);
            } catch (ExecuteException ex) {logger.error(String.format("execute command : %s error", command), ex);
                try {failNodeSet.add(host.getAddress());
                    Set<String> tmpAllIps = new HashSet<>(allNodes);
                    Collection<String> remained = CollectionUtils.subtract(tmpAllIps, failNodeSet);
                    if (remained != null && remained.size() > 0) {host = Host.of(remained.iterator().next());
                        logger.error("retry execute command : {} host : {}", command, host);
                    } else {throw new ExecuteException("fail after try all nodes");
                    }
                } catch (Throwable t) {throw new ExecuteException("fail after try all nodes");
                }
            }
        }

        return success;
    }


/**
     * execute logic
     *
     * @param host host
     * @param command command
     * @throws ExecuteException if error throws ExecuteException
     */
    public void doExecute(final Host host, final Command command) throws ExecuteException {
        /**
         * retry count,default retry 3
         */
        int retryCount = 3;
        boolean success = false;
        do {
            try {nettyRemotingClient.send(host, command);
                success = true;
            } catch (Exception ex) {logger.error(String.format("send command : %s to %s error", command, host), ex);
                retryCount--;
                ThreadUtils.sleep(100);
            }
        } while (retryCount >= 0 && !success);

        if (!success) {throw new ExecuteException(String.format("send command : %s to %s error", command, host));
        }
    }

  /**
     * send task
     *
     * @param host host
     * @param command command
     */
    public void send(final Host host, final Command command) throws RemotingException {Channel channel = getChannel(host);
        if (channel == null) {throw new RemotingException(String.format("connect to : %s fail", host));
        }
        try {ChannelFuture future = channel.writeAndFlush(command).await();
            if (future.isSuccess()) {logger.debug("send command : {} , to : {} successfully.", command, host.getAddress());
            } else {String msg = String.format("send command : %s , to :%s failed", command, host.getAddress());
                logger.error(msg, future.cause());
                throw new RemotingException(msg);
            }
        } catch (Exception e) {logger.error("Send command {} to address {} encounter error.", command, host.getAddress());
            throw new RemotingException(String.format("Send command : %s , to :%s encounter error", command, host.getAddress()), e);
        }
    }

Worker 局部具体代码如下:

同理 Woker 在启动的时候会初始化 NettyServer,注册对应处理器并启动:

/**
     * worker server run
     */
    @PostConstruct
    public void run() {
        // init remoting server
        NettyServerConfig serverConfig = new NettyServerConfig();
        serverConfig.setListenPort(workerConfig.getListenPort());
        this.nettyRemotingServer = new NettyRemotingServer(serverConfig);
        this.nettyRemotingServer.registerProcessor(CommandType.TASK_EXECUTE_REQUEST, taskExecuteProcessor);
        this.nettyRemotingServer.registerProcessor(CommandType.TASK_KILL_REQUEST, taskKillProcessor);
        this.nettyRemotingServer.registerProcessor(CommandType.TASK_EXECUTE_RUNNING_ACK, taskExecuteRunningAckProcessor);
        this.nettyRemotingServer.registerProcessor(CommandType.TASK_EXECUTE_RESPONSE_ACK, taskExecuteResponseAckProcessor);
        this.nettyRemotingServer.registerProcessor(CommandType.PROCESS_HOST_UPDATE_REQUEST, hostUpdateProcessor);

        // logger server
        this.nettyRemotingServer.registerProcessor(CommandType.GET_LOG_BYTES_REQUEST, loggerRequestProcessor);
        this.nettyRemotingServer.registerProcessor(CommandType.ROLL_VIEW_LOG_REQUEST, loggerRequestProcessor);
        this.nettyRemotingServer.registerProcessor(CommandType.VIEW_WHOLE_LOG_REQUEST, loggerRequestProcessor);
        this.nettyRemotingServer.registerProcessor(CommandType.REMOVE_TAK_LOG_REQUEST, loggerRequestProcessor);

        this.nettyRemotingServer.start();

        // install task plugin
        this.taskPluginManager.installPlugin();

        // worker registry
        try {this.workerRegistryClient.registry();
            this.workerRegistryClient.setRegistryStoppable(this);
            Set<String> workerZkPaths = this.workerRegistryClient.getWorkerZkPaths();

            this.workerRegistryClient.handleDeadServer(workerZkPaths, NodeType.WORKER, Constants.DELETE_OP);
        } catch (Exception e) {logger.error(e.getMessage(), e);
            throw new RuntimeException(e);
        }

        // task execute manager
        this.workerManagerThread.start();

        // retry report task status
        this.retryReportTaskStatusThread.start();

        /*
         * registry hooks, which are called before the process exits
         */
        Runtime.getRuntime().addShutdownHook(new Thread(() -> {if (Stopper.isRunning()) {close("shutdownHook");
            }
        }));
    }

回调线程对象初始化的时候,会将蕴含的 Nettyremotingclient 一起初始化,并注册好对应的业务处理器:

 public TaskCallbackService() {final NettyClientConfig clientConfig = new NettyClientConfig();
        this.nettyRemotingClient = new NettyRemotingClient(clientConfig);
        this.nettyRemotingClient.registerProcessor(CommandType.TASK_EXECUTE_RUNNING_ACK, taskExecuteRunningProcessor);
        this.nettyRemotingClient.registerProcessor(CommandType.TASK_EXECUTE_RESPONSE_ACK, taskExecuteResponseAckProcessor);
    }

回调线程会通过其余执行器中缓存下来的 Chanel 与 Master 的客户端进行通信:

/**
     * send result
     *
     * @param taskInstanceId taskInstanceId
     * @param command command
     */
    public void send(int taskInstanceId, Command command) {NettyRemoteChannel nettyRemoteChannel = getRemoteChannel(taskInstanceId);
        if (nettyRemoteChannel != null) {nettyRemoteChannel.writeAndFlush(command).addListener(new ChannelFutureListener() {

                @Override
                public void operationComplete(ChannelFuture future) throws Exception {if (future.isSuccess()) {// remove(taskInstanceId);
                        return;
                    }
                }
            });
        }
    }

2.6.2 其余服务与 Master 交互

以日志服务为例,前端触发申请日志的接口,通过参数与数据库交互获取到 Master 的 NettyServer 信息,而后构建 Netty 客户端与 Master 进行通信获取日志并返回。具体代码如下

 public Result<String> queryLog(@ApiIgnore @RequestAttribute(value = Constants.SESSION_USER) User loginUser,
                                   @RequestParam(value = "taskInstanceId") int taskInstanceId,
                                   @RequestParam(value = "skipLineNum") int skipNum,
                                   @RequestParam(value = "limit") int limit) {return loggerService.queryLog(taskInstanceId, skipNum, limit);
    }
 /**
     * view log
     *
     * @param taskInstId task instance id
     * @param skipLineNum skip line number
     * @param limit limit
     * @return log string data
     */
    @Override
    @SuppressWarnings("unchecked")
    public Result<String> queryLog(int taskInstId, int skipLineNum, int limit) {TaskInstance taskInstance = processService.findTaskInstanceById(taskInstId);

        if (taskInstance == null) {return Result.error(Status.TASK_INSTANCE_NOT_FOUND);
        }
        if (StringUtils.isBlank(taskInstance.getHost())) {return Result.error(Status.TASK_INSTANCE_HOST_IS_NULL);
        }
        Result<String> result = new Result<>(Status.SUCCESS.getCode(), Status.SUCCESS.getMsg());
        String log = queryLog(taskInstance,skipLineNum,limit);
        result.setData(log);
        return result;
    }
/**
     * query log
     *
     * @param taskInstance  task instance
     * @param skipLineNum skip line number
     * @param limit       limit
     * @return log string data
     */
    private String queryLog(TaskInstance taskInstance, int skipLineNum, int limit) {Host host = Host.of(taskInstance.getHost());

        logger.info("log host : {} , logPath : {} , port : {}", host.getIp(), taskInstance.getLogPath(),
                host.getPort());

        StringBuilder log = new StringBuilder();
        if (skipLineNum == 0) {
            String head = String.format(LOG_HEAD_FORMAT,
                    taskInstance.getLogPath(),
                    host,
                    Constants.SYSTEM_LINE_SEPARATOR);
            log.append(head);
        }

        log.append(logClient
                .rollViewLog(host.getIp(), host.getPort(), taskInstance.getLogPath(), skipLineNum, limit));

        return log.toString();}
 /**
     * roll view log
     *
     * @param host host
     * @param port port
     * @param path path
     * @param skipLineNum skip line number
     * @param limit limit
     * @return log content
     */
    public String rollViewLog(String host, int port, String path, int skipLineNum, int limit) {logger.info("roll view log, host : {}, port : {}, path {}, skipLineNum {} ,limit {}", host, port, path, skipLineNum, limit);
        RollViewLogRequestCommand request = new RollViewLogRequestCommand(path, skipLineNum, limit);
        String result = "";
        final Host address = new Host(host, port);
        try {Command command = request.convert2Command();
            Command response = this.client.sendSync(address, command, LOG_REQUEST_TIMEOUT);
            if (response != null) {
                RollViewLogResponseCommand rollReviewLog = JSONUtils.parseObject(response.getBody(), RollViewLogResponseCommand.class);
                return rollReviewLog.getMsg();}
        } catch (Exception e) {logger.error("roll view log error", e);
        } finally {this.client.closeChannel(address);
        }
        return result;
    }
 /**
     * sync send
     *
     * @param host host
     * @param command command
     * @param timeoutMillis timeoutMillis
     * @return command
     */
    public Command sendSync(final Host host, final Command command, final long timeoutMillis) throws InterruptedException, RemotingException {final Channel channel = getChannel(host);
        if (channel == null) {throw new RemotingException(String.format("connect to : %s fail", host));
        }
        final long opaque = command.getOpaque();
        final ResponseFuture responseFuture = new ResponseFuture(opaque, timeoutMillis, null, null);
        channel.writeAndFlush(command).addListener(future -> {if (future.isSuccess()) {responseFuture.setSendOk(true);
                return;
            } else {responseFuture.setSendOk(false);
            }
            responseFuture.setCause(future.cause());
            responseFuture.putResponse(null);
            logger.error("send command {} to host {} failed", command, host);
        });
        /*
         * sync wait for result
         */
        Command result = responseFuture.waitResponse();
        if (result == null) {if (responseFuture.isSendOK()) {throw new RemotingTimeoutException(host.toString(), timeoutMillis, responseFuture.getCause());
            } else {throw new RemotingException(host.toString(), responseFuture.getCause());
            }
        }
        return result;
    }

Nettyclient 随着日志业务对象初始化而初始化:

 /**
     * construct client
     */
    public LogClientService() {this.clientConfig = new NettyClientConfig();
        this.clientConfig.setWorkerThreads(4);
        this.client = new NettyRemotingClient(clientConfig);
        this.isRunning = true;
    }

2.7 负载平衡算法

Master 在抉择执行器的时候 DolphinScheduler 提供了三种负载平衡算法,且所有的算法都用到了节点权重:加权随机(random),平滑轮询(roundrobin),线性负载(lowerweight)。通过配置文件来管制到底应用哪一个负载平衡策略,默认配置是权重策略:host-selector: lower_weight。

@Bean
    public HostManager hostManager() {HostSelector selector = masterConfig.getHostSelector();
        HostManager hostManager;
        switch (selector) {
            case RANDOM:
                hostManager = new RandomHostManager();
                break;
            case ROUND_ROBIN:
                hostManager = new RoundRobinHostManager();
                break;
            case LOWER_WEIGHT:
                hostManager = new LowerWeightHostManager();
                break;
            default:
                throw new IllegalArgumentException("unSupport selector" + selector);
        }
        beanFactory.autowireBean(hostManager);
        return hostManager;
    }

2.7.1 加权随机

看代码更好了解:依照全副权重值求和,而后取汇总后果的随机整数,随机整数对原先所有 host 的权重累差,返回小于零的时候的 host,没有就随机返回一个。

  @Override
    public HostWorker doSelect(final Collection<HostWorker> source) {List<HostWorker> hosts = new ArrayList<>(source);
        int size = hosts.size();
        int[] weights = new int[size];
        int totalWeight = 0;
        int index = 0;

        for (HostWorker host : hosts) {totalWeight += host.getHostWeight();
            weights[index] = host.getHostWeight();
            index++;
        }

        if (totalWeight > 0) {int offset = ThreadLocalRandom.current().nextInt(totalWeight);

            for (int i = 0; i < size; i++) {offset -= weights[i];
                if (offset < 0) {return hosts.get(i);
                }
            }
        }
        return hosts.get(ThreadLocalRandom.current().nextInt(size));
    }

2.7.2 线性负载

权重计算逻辑:利用注册的 Cpu 占用、内存占用以及加载因子还有启动工夫耗费做计算。

private double calculateWeight(double cpu, double memory, double loadAverage, long startTime) {
        double calculatedWeight = cpu * CPU_FACTOR + memory * MEMORY_FACTOR + loadAverage * LOAD_AVERAGE_FACTOR;
        long uptime = System.currentTimeMillis() - startTime;
        if (uptime > 0 && uptime < Constants.WARM_UP_TIME) {
            // If the warm-up is not over, add the weight
            return calculatedWeight * Constants.WARM_UP_TIME / uptime;
        }
        return calculatedWeight;
    }

获取权重最小的节点,并把节点权重置为最大。

/**
     * select
     *
     * @param sources sources
     * @return HostWeight
     */
    @Override
    public HostWeight doSelect(Collection<HostWeight> sources) {
        double totalWeight = 0;
        double lowWeight = 0;
        HostWeight lowerNode = null;
        for (HostWeight hostWeight : sources) {totalWeight += hostWeight.getWeight();
            hostWeight.setCurrentWeight(hostWeight.getCurrentWeight() + hostWeight.getWeight());
            if (lowerNode == null || lowWeight > hostWeight.getCurrentWeight()) {
                lowerNode = hostWeight;
                lowWeight = hostWeight.getCurrentWeight();}
        }
        lowerNode.setCurrentWeight(lowerNode.getCurrentWeight() + totalWeight);
        return lowerNode;

    }

2.7.3 平滑轮询

这个算法不是很好的可能了解,所以我不晓得我的了解是否正确,它有一个预热的过程,之前都是取第一个,等到累计的权重超过最大就整数就开始按权重轮询。

 @Override
    public HostWorker doSelect(Collection<HostWorker> source) {List<HostWorker> hosts = new ArrayList<>(source);
        String key = hosts.get(0).getWorkerGroup();
        ConcurrentMap<String, WeightedRoundRobin> map = workGroupWeightMap.get(key);
        if (map == null) {workGroupWeightMap.putIfAbsent(key, new ConcurrentHashMap<>());
            map = workGroupWeightMap.get(key);
        }

        int totalWeight = 0;
        long maxCurrent = Long.MIN_VALUE;
        long now = System.currentTimeMillis();
        HostWorker selectedHost = null;
        WeightedRoundRobin selectWeightRoundRobin = null;

        for (HostWorker host : hosts) {String workGroupHost = host.getWorkerGroup() + host.getAddress();
            WeightedRoundRobin weightedRoundRobin = map.get(workGroupHost);
            int weight = host.getHostWeight();
            if (weight < 0) {weight = 0;}

            if (weightedRoundRobin == null) {weightedRoundRobin = new WeightedRoundRobin();
                // set weight
                weightedRoundRobin.setWeight(weight);
                map.putIfAbsent(workGroupHost, weightedRoundRobin);
                weightedRoundRobin = map.get(workGroupHost);
            }
            if (weight != weightedRoundRobin.getWeight()) {weightedRoundRobin.setWeight(weight);
            }

            long cur = weightedRoundRobin.increaseCurrent();
            weightedRoundRobin.setLastUpdate(now);
            if (cur > maxCurrent) {
                maxCurrent = cur;
                selectedHost = host;
                selectWeightRoundRobin = weightedRoundRobin;
            }

            totalWeight += weight;
        }

        if (!updateLock.get() && hosts.size() != map.size() && updateLock.compareAndSet(false, true)) {
            try {ConcurrentMap<String, WeightedRoundRobin> newMap = new ConcurrentHashMap<>(map);
                newMap.entrySet().removeIf(item -> now - item.getValue().getLastUpdate() > RECYCLE_PERIOD);
                workGroupWeightMap.put(key, newMap);
            } finally {updateLock.set(false);
            }
        }

        if (selectedHost != null) {selectWeightRoundRobin.sel(totalWeight);
            return selectedHost;
        }

        return hosts.get(0);
    }

2.8 日志服务

2.6.2 曾经介绍不在做过多的阐明。

2.9 报警

暂未钻研,目测根本就是依据规定筛选数据,而后调用指定类型的报警服务接口做报警操作,比方邮件,微信,短信告诉等。

3 后记

3.1 Make friends

因为没有正式生产应用,业务了解不肯定透彻,了解可能有偏差,欢送大家一起进入社区交换探讨。

Apache DolphinScheduler Slack 群链接:https://join.slack.com/t/asf-…\_invite/zt-1e36toy4n-5n9U2R\_\_FDM05R~MJFFVBg

3.2 参考文献

  1. https://dolphinscheduler.apac…;
  2. https://juejin.cn/post/684490…;
  3. https://www.w3cschool.cn/quartz\_doc/quartz\_doc-1xbu2clr.html.

最初,感激社区蔡顺峰、钟嘉杰和阮文俊对本文整顿和批改提出建设性意见,以及对本文公布提供的帮忙。

十分欢送大家退出 DolphinScheduler 小家庭,融入开源世界!

咱们激励任何模式的参加社区,最终成为 Committer 或 PPMC,如:

  • 将遇到的问题通过 GitHub 上 issue 的模式反馈进去。
  • 答复他人遇到的 issue 问题。
  • 帮忙欠缺文档。
  • 帮忙我的项目减少测试用例。
  • 为代码增加正文。
  • 提交修复 Bug 或者 Feature 的 PR。
  • 发表利用案例实际、调度流程剖析或者与调度相干的技术文章。
  • 帮忙推广 DolphinScheduler,参加技术大会或者 meetup 的分享等。

欢送退出奉献的队伍,退出开源从提交第一个 PR 开始。

  • 比方增加代码正文或找到带有”easy to fix”标记或一些非常简单的 issue(拼写错误等) 等等,先通过第一个简略的 PR 相熟提交流程。

注:奉献不仅仅限于 PR 哈,对促成我的项目倒退的都是奉献。

置信参加 DolphinScheduler,肯定会让您从开源中受害!

参加奉献

随着国内开源的迅猛崛起,Apache DolphinScheduler 社区迎来蓬勃发展,为了做更好用、易用的调度,真挚欢送酷爱开源的搭档退出到开源社区中来,为中国开源崛起献上一份本人的力量,让外乡开源走向寰球。

参加 DolphinScheduler 社区有十分多的参加奉献的形式,包含:

奉献第一个 PR(文档、代码) 咱们也心愿是简略的,第一个 PR 用于相熟提交的流程和社区合作以及感触社区的友好度。

社区汇总了以下适宜老手的问题列表:https://github.com/apache/dol…

非老手问题列表:https://github.com/apache/dol…

如何参加奉献链接:https://dolphinscheduler.apac…

来吧,DolphinScheduler 开源社区须要您的参加,为中国开源崛起添砖加瓦吧,哪怕只是小小的一块瓦,汇聚起来的力量也是微小的。

参加开源能够近距离与各路高手切磋,迅速晋升本人的技能,如果您想参加奉献,咱们有个贡献者种子孵化群,能够增加社区小助手,手把手教会您(贡献者不分程度高下,有问必答,要害是有一颗违心奉献的心)。

增加小助手微信时请阐明想参加奉献。来吧,开源社区十分期待您的参加。

正文完
 0