
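Contents

  • 1 Design and Strategy of DolphinScheduler
  • 1.1 Distributed Design
  • 1.1.1 Centralized Design
  • 1.1.2 Decentralized Design
  • 1.2 DolphinScheduler Architecture
  • 1.3 Fault Tolerance
  • 1.3.1 Crash Fault Tolerance
  • 1.3.2 Failure Retry
  • 1.4 Remote Log Access
  • 2 DolphinScheduler Source Code Analysis
  • 2.1 Project Modules and Configuration Files
  • 2.1.1 Project Modules
  • 2.1.2 Configuration Files
  • 2.2 Main API Task Operation Interface
  • 2.3 Quartz Architecture and Execution Flow
  • 2.3.1 Concepts and Architecture
  • 2.3.2 Initialization and Execution Flow
  • 2.3.3 Cluster Operation
  • 2.4 Master Startup and Execution Flow
  • 2.4.1 Concepts and Execution Logic
  • 2.4.2 Cluster and Slots
  • 2.4.3 Code Execution Flow
  • 2.5 Worker Startup and Execution Flow
  • 2.5.1 Concepts and Execution Logic
  • 2.5.2 Code Execution Flow
  • 2.6 RPC Interaction
  • 2.6.1 Master and Worker Interaction
  • 2.6.2 Interaction Between Other Services and the Master
  • 2.7 Load Balancing Algorithms
  • 2.7.1 Weighted Random
  • 2.7.2 Linear Load
  • 2.7.3 Smooth Round Robin
  • 2.8 Log Service
  • 2.9 Alerting
  • 3 Afterword
  • 3.1 Make friends
  • 3.2 References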

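Preface

I came to study Apache DolphinScheduler by chance. My day job is maintaining a scheduling platform built on top of xxl-job. It ran into a concurrency bottleneck severe enough that optimization and refactoring became unavoidable, so I surveyed the most widely used scheduling platforms for ideas.

After reading through the DolphinScheduler code, I decided to write down its design and the thinking behind it, which is how this article came about. Since I have not run it in production, my understanding of the business side may be incomplete and may contain mistakes; discussion is welcome.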

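1 Design and Strategy of DolphinScheduler

Readers interested in DolphinScheduler probably already know scheduling systems well, so I will not introduce general scheduling terminology here. Instead I focus on four concepts: process definition, process instance, task definition, and task instance. (Oddly, there is no concept of a "job"; perhaps this is to avoid overlapping with Quartz's JobDetail.)

  • Task definition: a task of some type, such as SQL, Shell, Spark, MR, or Python; tasks are the key building blocks of a process definition;
  • Task instance: an instantiation of a task, which records the concrete execution state of that task;
  • Process definition: a directed acyclic graph (DAG) formed by task nodes connected through dependencies;
  • Process instance: an instance of a process definition, generated either manually or by a scheduled trigger;
  • Scheduling: the system uses the Quartz distributed scheduler and supports visual generation of cron expressions;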
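Because a process definition is a DAG of tasks, a valid execution order exists exactly when the graph has no cycle. The following Java sketch is purely illustrative: the `ProcessDag` class and its methods are hypothetical and not part of DolphinScheduler. It stores task dependencies and uses Kahn's topological sort both to derive one valid execution order and to reject definitions that contain a cycle.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical model of a process definition: task codes plus "upstream -> downstream" edges.
class ProcessDag {
    private final Map<String, List<String>> downstream = new LinkedHashMap<>();
    private final Map<String, Integer> inDegree = new LinkedHashMap<>();

    void addTask(String task) {
        downstream.putIfAbsent(task, new ArrayList<>());
        inDegree.putIfAbsent(task, 0);
    }

    // Declare that `to` depends on `from`, i.e. `from` must finish first.
    void addEdge(String from, String to) {
        addTask(from);
        addTask(to);
        downstream.get(from).add(to);
        inDegree.merge(to, 1, Integer::sum);
    }

    // Kahn's algorithm: repeatedly emit tasks whose upstream tasks have all been emitted.
    List<String> topologicalOrder() {
        Map<String, Integer> remaining = new LinkedHashMap<>(inDegree);
        Deque<String> ready = new ArrayDeque<>();
        remaining.forEach((task, deg) -> { if (deg == 0) ready.add(task); });
        List<String> order = new ArrayList<>();
        while (!ready.isEmpty()) {
            String task = ready.poll();
            order.add(task);
            for (String next : downstream.get(task)) {
                if (remaining.merge(next, -1, Integer::sum) == 0) {
                    ready.add(next);
                }
            }
        }
        if (order.size() != inDegree.size()) {
            throw new IllegalStateException("process definition contains a cycle, not a DAG");
        }
        return order;
    }
}
```

With edges extract -> transform, transform -> load, and extract -> report, the sketch yields extract, transform, report, load. Tasks that become ready together (transform and report here) do not depend on each other and could run in parallel.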

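1.1 Distributed Design

Distributed system architectures fall broadly into two kinds, centralized and decentralized. Each has its own strengths and weaknesses, and the choice depends on the business.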

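1.1.1 Centralized Design

A centralized design is relatively simple: the nodes in the cluster are divided by role into Master and Slave, as shown in the figure below:

Master: The Master is mainly responsible for distributing tasks and monitoring the health of the Slaves. It can dynamically balance tasks across Slaves so that no Slave ends up either overloaded or idle.

The centralized design has some problems.

First, once the Master fails, the cluster is leaderless and the whole cluster collapses.

To address this, most Master/Slave architectures adopt a primary/standby Master design, using hot or cold standby and automatic or manual switchover. More and more new systems can also elect and switch to a new Master automatically, which improves availability.

Second, if the Scheduler runs on the Master, different tasks in one DAG can run on different machines, but the Master may become overloaded. If the Scheduler runs on a Slave, all tasks in a DAG can only be submitted from that one machine, and when there are many parallel tasks the pressure on that Slave can be high.

xxl-job uses this design and suffers from the corresponding problems. If the admin goes down, the cluster collapses. And because the Scheduler lives on the admin, which validates and dispatches every task, the admin risks being overloaded, leaving developers to find their own workarounds.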

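1.1.2 Decentralized Design

In a decentralized design there is usually no notion of Master and Slave: all roles are the same and have equal status. The core idea is that no node in the distributed system acts as a distinct "manager", so there is no single point of failure.

However, because there is no manager node, every node has to communicate with the others to obtain the machine information it needs, and the unreliability of network communication makes this much harder to implement. In practice, truly decentralized distributed systems are rare.

Instead, dynamically centralized distributed systems keep emerging. In this architecture the cluster's manager is elected dynamically rather than preassigned, and when a failure occurs the nodes spontaneously hold an election to choose a new manager.

Such elections are usually implemented with the Raft algorithm. The community also has a PR that implements Raft, which has not yet been merged.

  • PR link: https://github.com/apache/dol...
  • Animated visualization: http://thesecretlivesofdata.com/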
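At the heart of a Raft election are a vote-granting rule and a majority quorum. The Java sketch below follows the RequestVote rules from the Raft paper: a node grants at most one vote per term, and only to a candidate whose log is at least as up-to-date as its own. It is a hypothetical single-process model that omits persistence, RPC, election timeouts, and log replication, and it is not the code from the PR above.

```java
// Simplified model of Raft's RequestVote handling (no networking, timers, or persistence).
class RaftNode {
    long currentTerm = 0;
    String votedFor = null;     // candidate voted for in currentTerm, or null
    long lastLogTerm = 0;       // term of this node's last log entry
    long lastLogIndex = 0;      // index of this node's last log entry

    boolean handleRequestVote(long term, String candidateId, long candLastLogTerm, long candLastLogIndex) {
        if (term < currentTerm) {
            return false;                       // stale candidate from an older term
        }
        if (term > currentTerm) {               // newer term: adopt it and forget any earlier vote
            currentTerm = term;
            votedFor = null;
        }
        // Candidate's log must be at least as up-to-date: compare last term first, then last index.
        boolean logUpToDate = candLastLogTerm > lastLogTerm
                || (candLastLogTerm == lastLogTerm && candLastLogIndex >= lastLogIndex);
        if ((votedFor == null || votedFor.equals(candidateId)) && logUpToDate) {
            votedFor = candidateId;
            return true;
        }
        return false;
    }

    // A candidate wins once it holds votes from a strict majority of the cluster (its own vote included).
    static boolean hasMajority(int votes, int clusterSize) {
        return votes > clusterSize / 2;
    }
}
```

The "one vote per term" rule together with the strict majority is what guarantees at most one leader per term: two candidates cannot both collect more than half of the votes.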

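DolphinScheduler achieves decentralization by having Masters and Workers register themselves with the registry center, so that neither the Master cluster nor the Worker cluster has a central node.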

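1.2 DolphinScheduler Architecture

Borrowing the system architecture diagram from the official website, we can see that the scheduling system uses a decentralized design made up of the UI, API, MasterServer, ZooKeeper, WorkerServer, Alert, and other parts.

API: The API layer mainly handles requests from the front-end UI. It exposes a unified RESTful API to external callers. Its interfaces cover creating, defining, querying, modifying, releasing, taking offline, manually starting, stopping, pausing, and resuming workflows, running from a given node, and so on.

MasterServer: MasterServer follows a distributed, decentralized design. It integrates Quartz and is mainly responsible for splitting DAGs into tasks and for submitting and monitoring tasks, while also monitoring the health of the other MasterServers and the WorkerServers. On startup, MasterServer registers an ephemeral node in ZooKeeper and performs fault tolerance by watching changes to ephemeral nodes.

WorkerServer: WorkerServer also follows a distributed, decentralized design and is mainly responsible for executing tasks and providing the log service. On startup, WorkerServer registers an ephemeral node in ZooKeeper and maintains a heartbeat.

ZooKeeper: Both the MasterServer and WorkerServer nodes rely on ZooKeeper for cluster management and fault tolerance. The system also uses ZooKeeper for event listening and distributed locks.

Alert: Provides alert-related interfaces, mainly covering storage, querying, and notification for two types of alert data, and supports flexible extension through a rich set of alert plugins.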

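1.3 Fault Tolerance

Fault tolerance covers service crash tolerance and task retry. Service crash tolerance is further divided into Master fault tolerance and Worker fault tolerance.

1.3.1 Crash Fault Tolerance

Service fault tolerance relies on ZooKeeper's Watcher mechanism; the principle is shown in the figure:

Each Master watches the directories of the other Masters and of the Workers. When it receives a remove event, it performs process-instance or task-instance fault tolerance according to the specific business logic. The fault-tolerance flowchart below is somewhat more readable than the one in the official documentation and can serve as a reference:

After ZooKeeper Master fault tolerance completes, the Scheduler thread in DolphinScheduler takes over scheduling again. It traverses the DAG to find tasks in the "running" and "submitted successfully" states. For "running" tasks, it monitors the status of their task instances. For "submitted successfully" tasks, it checks whether they already exist in the Task Queue: if they do, it likewise monitors the task instance status; if not, it resubmits the task instance.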
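The recovery rule above boils down to a small decision per task. A hedged Java sketch follows; the enums and the `taskQueue` set are hypothetical simplifications of the states named in the text, not DolphinScheduler's real types.

```java
import java.util.Set;

// Hypothetical task states, reduced to the two that the recovery rule cares about.
enum TaskState { RUNNING, SUBMITTED_SUCCESS, OTHER }

enum RecoveryAction { MONITOR, RESUBMIT, IGNORE }

class MasterRecovery {
    // Decide what the Scheduler thread does with one task after Master failover.
    static RecoveryAction decide(String taskInstanceId, TaskState state, Set<String> taskQueue) {
        switch (state) {
            case RUNNING:
                return RecoveryAction.MONITOR;               // keep watching the running instance
            case SUBMITTED_SUCCESS:
                return taskQueue.contains(taskInstanceId)
                        ? RecoveryAction.MONITOR              // already queued: just monitor it
                        : RecoveryAction.RESUBMIT;            // lost before queuing: submit again
            default:
                return RecoveryAction.IGNORE;                 // other states need no recovery here
        }
    }
}
```

Checking the queue before resubmitting is what keeps recovery idempotent: a task that was queued before the Master died is not dispatched a second time.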

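Once the Master Scheduler thread finds a task instance in the "needs fault tolerance" state, it takes over the task and resubmits it. Note that "network jitter" can make a node briefly lose its heartbeat with ZooKeeper, which triggers a remove event for that node.

We handle this case in the simplest way: as soon as a node's connection to ZooKeeper times out, the Master or Worker service is stopped outright.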
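The watch-and-failover mechanism in this section can be illustrated without a real ZooKeeper. The sketch below is a hypothetical in-memory stand-in, not the ZooKeeper or Curator API and not DolphinScheduler's classes. Each server registers an "ephemeral" entry under a directory (the path `/nodes/worker` is an assumption for illustration), and when the entry disappears because its session expired, every listener receives a remove event and can start failover for that host.

```java
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.BiConsumer;

// Hypothetical in-memory registry mimicking ephemeral nodes and child-removal watches.
class InMemoryRegistry {
    private final Map<String, Set<String>> children = new ConcurrentHashMap<>();
    private final List<BiConsumer<String, String>> removeListeners = new CopyOnWriteArrayList<>();

    // Register an "ephemeral" child such as a host under a directory like "/nodes/worker".
    void registerEphemeral(String dir, String host) {
        children.computeIfAbsent(dir, d -> ConcurrentHashMap.newKeySet()).add(host);
    }

    // Each listener receives (directory, removedHost).
    void onRemove(BiConsumer<String, String> listener) {
        removeListeners.add(listener);
    }

    // Simulates session expiry: the ephemeral entry vanishes and watchers are notified once.
    void expireSession(String dir, String host) {
        Set<String> hosts = children.get(dir);
        if (hosts != null && hosts.remove(host)) {
            removeListeners.forEach(l -> l.accept(dir, host));
        }
    }

    Set<String> list(String dir) {
        return children.getOrDefault(dir, Collections.emptySet());
    }
}
```

In the real system the remove handler decides between process-instance failover (a Master left) and task-instance failover (a Worker left), as described above.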

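1.3.2 Failure Retry

First, we need to distinguish three concepts: task failure retry, process failure recovery, and process failure rerun.

  1. Task failure retry is task-level and happens automatically in the scheduling system. For example, if a Shell task is configured with 3 retries, then after it fails it will be run again at most 3 more times.
  2. Process failure recovery is process-level and manual; recovery can only resume from the failed node or from the current node.
  3. Process failure rerun is also process-level and manual; a rerun starts again from the beginning node.

Now to the main topic. We divide the task nodes in a workflow into two types.

  1. Business nodes, each of which corresponds to an actual script or processing statement, such as Shell, MR, Spark, and dependent nodes.
  2. Logical nodes, which run no script or statement themselves but only handle the flow logic of the process, such as sub-process nodes.

Every business node can be configured with a number of failure retries. When the task node fails, it is retried automatically until it succeeds or exceeds the configured retry count. Logical nodes do not support failure retry, but the tasks inside a logical node do.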
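The retry semantics amount to a bounded loop: with a retry count of 3, a task runs at most 4 times (one initial attempt plus 3 retries). This is a hedged illustration; `RetryRunner` and `runWithRetry` are hypothetical names, and the wait interval between attempts that a real scheduler would apply is omitted.

```java
import java.util.function.Supplier;

class RetryRunner {
    // Runs the task once, then retries up to maxRetries more times.
    // Returns the attempt number that succeeded, or -1 if every attempt failed
    // (at which point the workflow would be marked as failed).
    static int runWithRetry(Supplier<Boolean> task, int maxRetries) {
        for (int attempt = 1; attempt <= maxRetries + 1; attempt++) {
            if (task.get()) {
                return attempt;
            }
        }
        return -1;
    }
}
```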

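If a task in the workflow still fails after reaching its maximum number of retries, the workflow fails and stops. A failed workflow can then be rerun or recovered manually.

1.4 Remote Log Access

Because the Web UI and the Worker are not necessarily on the same machine, logs cannot be viewed the way one reads a local file.

There are two options:

  1. Ship the logs to the Elasticsearch search engine;
  2. Fetch remote log content through Netty-based communication.

To keep DolphinScheduler as lightweight as possible, RPC was chosen for remote log access; the concrete code is covered in Section 2.8.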
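On the Worker side, serving a log over RPC boils down to reading a slice of a local file, so that the UI can page through a large log. A minimal sketch, assuming the request carries a log path, a number of lines to skip, and a maximum number of lines to return; `LogSlicer.readLines` is a hypothetical name and the Netty transport is omitted.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

class LogSlicer {
    // Return at most `limit` lines of the log, starting after the first `skipLines` lines.
    static List<String> readLines(Path logFile, int skipLines, int limit) throws IOException {
        try (Stream<String> lines = Files.lines(logFile)) {   // read lazily; closed by try-with-resources
            return lines.skip(skipLines).limit(limit).collect(Collectors.toList());
        }
    }
}
```

Reading lazily means a request for one page does not load the whole log file into memory on the Worker.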

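2 DolphinScheduler Source Code Analysis

The explanation in the previous chapter may not be entirely clear at first glance. The main goal of this chapter is to walk through the features from Chapter 1 at the code level. Installation is not covered here; please explore installation and deployment on your own.

2.1 Project Modules and Configuration Files

2.1.1 Project Modules

  • dolphinscheduler-alert: the alert module, which provides the alert service;
  • dolphinscheduler-api: the web application module, which provides the REST API called by the UI;
  • dolphinscheduler-common: common constants, enums, utility classes, data structures, and base classes;
  • dolphinscheduler-dao: database access and related operations;
  • dolphinscheduler-remote: Netty-based client and server;
  • dolphinscheduler-server: log and heartbeat services;
  • dolphinscheduler-log-server: LoggerServer, used by the REST API to view logs via RPC;
  • dolphinscheduler-master: the MasterServer service, mainly responsible for splitting DAGs and monitoring task status;
  • dolphinscheduler-worker: the WorkerServer service, mainly responsible for submitting and executing tasks and updating task status;
  • dolphinscheduler-service: the service module, containing Quartz, ZooKeeper, and log-client access services used by the server and api modules;
  • dolphinscheduler-ui: the front-end module;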

2.1.2 Configuration Files

dolphinscheduler-common common.properties

# Local working directory for temporary files
data.basedir.path=/tmp/dolphinscheduler
# Resource storage type: HDFS, S3, NONE
resource.storage.type=NONE
# Resource storage path
resource.upload.path=/dolphinscheduler
# Whether Kerberos authentication is enabled for Hadoop
hadoop.security.authentication.startup.state=false
# Kerberos configuration path
java.security.krb5.conf.path=/opt/krb5.conf
# Kerberos login user
login.user.keytab.username=hdfs-mycluster@ESZ.COM
# Keytab of the Kerberos login user
login.user.keytab.path=/opt/hdfs.headless.keytab
# Kerberos expiration time, an integer in hours
kerberos.expire.time=2
# If the storage type is HDFS, configure a user with the required operation permissions
hdfs.root.user=hdfs
# Request address. If resource.storage.type=S3, the value looks like s3a://dolphinscheduler.
# If resource.storage.type=HDFS and Hadoop is configured with HA, copy core-site.xml and hdfs-site.xml into the conf directory
fs.defaultFS=hdfs://mycluster:8020
aws.access.key.id=minioadmin
aws.secret.access.key=minioadmin
aws.region=us-east-1
aws.endpoint=http://localhost:9000
# resourcemanager port, the default value is 8088 if not specified
resource.manager.httpaddress.port=8088
# YARN ResourceManager addresses. If ResourceManager HA is enabled, enter the HA IP addresses (comma-separated);
# if ResourceManager is a single node, leave this empty
yarn.resourcemanager.ha.rm.ids=192.168.xx.xx,192.168.xx.xx
# If ResourceManager HA is enabled or ResourceManager is not used, keep the default value.
# If ResourceManager is a single node, replace ds1 with the ResourceManager's hostname
yarn.application.status.address=http://ds1:%s/ws/v1/cluster/apps/%s
# job history status url when application number threshold is reached(default 10000, maybe it was set to 1000)
yarn.job.history.status.address=http://ds1:19888/ws/v1/history/mapreduce/jobs/%s
# datasource encryption enable
datasource.encryption.enable=false
# datasource encryption salt
datasource.encryption.salt=!@#$%^&*
# data quality option
data-quality.jar.name=dolphinscheduler-data-quality-dev-SNAPSHOT.jar
#data-quality.error.output.path=/tmp/data-quality-error-data
# Network IP gets priority, default inner outer
# Whether hive SQL is executed in the same session
support.hive.oneSession=false
# use sudo or not, if set true, executing user is tenant user and deploy user needs sudo permissions; if set false, executing user is the deploy user and doesn't need sudo permissions
sudo.enable=true
# network interface preferred like eth0, default: empty
#dolphin.scheduler.network.interface.preferred=
# network IP gets priority, default: inner outer
#dolphin.scheduler.network.priority.strategy=default
# system env path
#dolphinscheduler.env.path=dolphinscheduler_env.sh
# Whether development mode is enabled
development.state=false
# rpc port
alert.rpc.port=50052
# Url endpoint for zeppelin RESTful API
zeppelin.rest.url=http://localhost:8080

dolphinscheduler-api application.yaml

server:
  port: 12345
  servlet:
    session:
      timeout: 120m
    context-path: /dolphinscheduler/
  compression:
    enabled: true
    mime-types: text/html,text/xml,text/plain,text/css,text/javascript,application/javascript,application/json,application/xml
  jetty:
    max-http-form-post-size: 5000000

spring:
  application:
    name: api-server
  banner:
    charset: UTF-8
  jackson:
    time-zone: UTC
    date-format: "yyyy-MM-dd HH:mm:ss"
  servlet:
    multipart:
      max-file-size: 1024MB
      max-request-size: 1024MB
  messages:
    basename: i18n/messages
  datasource:
#    driver-class-name: org.postgresql.Driver
#    url: jdbc:postgresql://127.0.0.1:5432/dolphinscheduler
    driver-class-name: com.mysql.jdbc.Driver
    url: jdbc:mysql://127.0.0.1:3306/dolphinscheduler?useUnicode=true&serverTimezone=UTC&characterEncoding=UTF-8&autoReconnect=true&useSSL=false&zeroDateTimeBehavior=convertToNull
    username: root
    password: root
    hikari:
      connection-test-query: select 1
      minimum-idle: 5
      auto-commit: true
      validation-timeout: 3000
      pool-name: DolphinScheduler
      maximum-pool-size: 50
      connection-timeout: 30000
      idle-timeout: 600000
      leak-detection-threshold: 0
      initialization-fail-timeout: 1
  quartz:
    auto-startup: false
    job-store-type: jdbc
    jdbc:
      initialize-schema: never
    properties:
      org.quartz.threadPool:threadPriority: 5
      org.quartz.jobStore.isClustered: true
      org.quartz.jobStore.class: org.quartz.impl.jdbcjobstore.JobStoreTX
      org.quartz.scheduler.instanceId: AUTO
      org.quartz.jobStore.tablePrefix: QRTZ_
      org.quartz.jobStore.acquireTriggersWithinLock: true
      org.quartz.scheduler.instanceName: DolphinScheduler
      org.quartz.threadPool.class: org.quartz.simpl.SimpleThreadPool
      org.quartz.jobStore.useProperties: false
      org.quartz.threadPool.makeThreadsDaemons: true
      org.quartz.threadPool.threadCount: 25
      org.quartz.jobStore.misfireThreshold: 60000
      org.quartz.scheduler.makeSchedulerThreadDaemon: true
#      org.quartz.jobStore.driverDelegateClass: org.quartz.impl.jdbcjobstore.PostgreSQLDelegate
      org.quartz.jobStore.driverDelegateClass: org.quartz.impl.jdbcjobstore.StdJDBCDelegate
      org.quartz.jobStore.clusterCheckinInterval: 5000

management:
  endpoints:
    web:
      exposure:
        include: '*'
  metrics:
    tags:
      application: ${spring.application.name}

registry:
  type: zookeeper
  zookeeper:
    namespace: dolphinscheduler
#    connect-string: localhost:2181
    connect-string: 10.255.158.70:2181
    retry-policy:
      base-sleep-time: 60ms
      max-sleep: 300ms
      max-retries: 5
    session-timeout: 30s
    connection-timeout: 9s
    block-until-connected: 600ms
    digest: ~

audit:
  enabled: false

metrics:
  enabled: true

python-gateway:
  # Whether to enable the python gateway server or not. The default value is true.
  enabled: true
  # The address of Python gateway server start. Set its value to `0.0.0.0` if your Python API run in different
  # between Python gateway server. It could be specific to other address like `127.0.0.1` or `localhost`
  gateway-server-address: 0.0.0.0
  # The port of Python gateway server start. Define which port you could connect to Python gateway server from
  # Python API side.
  gateway-server-port: 25333
  # The address of Python callback client.
  python-address: 127.0.0.1
  # The port of Python callback client.
  python-port: 25334
  # Close connection of socket server if no other request accept after x milliseconds. Define value is (0 = infinite),
  # and socket server would never close even though no requests accept
  connect-timeout: 0
  # Close each active connection of socket server if python program not active after x milliseconds. Define value is
  # (0 = infinite), and socket server would never close even though no requests accept
  read-timeout: 0

# Override by profile
---
spring:
  config:
    activate:
      on-profile: mysql
  datasource:
    driver-class-name: com.mysql.jdbc.Driver
    url: jdbc:mysql://127.0.0.1:3306/dolphinscheduler?useUnicode=true&characterEncoding=UTF-8
  quartz:
    properties:
      org.quartz.jobStore.driverDelegateClass: org.quartz.impl.jdbcjobstore.StdJDBCDelegate

dolphinscheduler-master application.yaml

spring:
  banner:
    charset: UTF-8
  application:
    name: master-server
  jackson:
    time-zone: UTC
    date-format: "yyyy-MM-dd HH:mm:ss"
  cache:
    # default enable cache, you can disable by `type: none`
    type: none
    cache-names:
      - tenant
      - user
      - processDefinition
      - processTaskRelation
      - taskDefinition
    caffeine:
      spec: maximumSize=100,expireAfterWrite=300s,recordStats
  datasource:
    #driver-class-name: org.postgresql.Driver
    #url: jdbc:postgresql://127.0.0.1:5432/dolphinscheduler
    driver-class-name: com.mysql.jdbc.Driver
    url: jdbc:mysql://127.0.0.1:3306/dolphinscheduler?useUnicode=true&serverTimezone=UTC&characterEncoding=UTF-8&autoReconnect=true&useSSL=false&zeroDateTimeBehavior=convertToNull
    username: root
    password:
    hikari:
      connection-test-query: select 1
      minimum-idle: 5
      auto-commit: true
      validation-timeout: 3000
      pool-name: DolphinScheduler
      maximum-pool-size: 50
      connection-timeout: 30000
      idle-timeout: 600000
      leak-detection-threshold: 0
      initialization-fail-timeout: 1
  quartz:
    job-store-type: jdbc
    jdbc:
      initialize-schema: never
    properties:
      org.quartz.threadPool:threadPriority: 5
      org.quartz.jobStore.isClustered: true
      org.quartz.jobStore.class: org.quartz.impl.jdbcjobstore.JobStoreTX
      org.quartz.scheduler.instanceId: AUTO
      org.quartz.jobStore.tablePrefix: QRTZ_
      org.quartz.jobStore.acquireTriggersWithinLock: true
      org.quartz.scheduler.instanceName: DolphinScheduler
      org.quartz.threadPool.class: org.quartz.simpl.SimpleThreadPool
      org.quartz.jobStore.useProperties: false
      org.quartz.threadPool.makeThreadsDaemons: true
      org.quartz.threadPool.threadCount: 25
      org.quartz.jobStore.misfireThreshold: 60000
      org.quartz.scheduler.makeSchedulerThreadDaemon: true
#      org.quartz.jobStore.driverDelegateClass: org.quartz.impl.jdbcjobstore.PostgreSQLDelegate
      org.quartz.jobStore.driverDelegateClass: org.quartz.impl.jdbcjobstore.StdJDBCDelegate
      org.quartz.jobStore.clusterCheckinInterval: 5000

registry:
  type: zookeeper
  zookeeper:
    namespace: dolphinscheduler
#    connect-string: localhost:2181
    connect-string: 10.255.158.70:2181
    retry-policy:
      base-sleep-time: 60ms
      max-sleep: 300ms
      max-retries: 5
    session-timeout: 30s
    connection-timeout: 9s
    block-until-connected: 600ms
    digest: ~

master:
  listen-port: 5678
  # master fetch command num
  fetch-command-num: 10
  # master prepare execute thread number to limit handle commands in parallel
  pre-exec-threads: 10
  # master execute thread number to limit process instances in parallel
  exec-threads: 100
  # master dispatch task number per batch
  dispatch-task-number: 3
  # master host selector to select a suitable worker, default value: LowerWeight. Optional values include random, round_robin, lower_weight
  host-selector: lower_weight
  # master heartbeat interval, the unit is second
  heartbeat-interval: 10
  # master commit task retry times
  task-commit-retry-times: 5
  # master commit task interval, the unit is millisecond
  task-commit-interval: 1000
  state-wheel-interval: 5
  # master max cpuload avg, only higher than the system cpu load average, master server can schedule. default value -1: the number of cpu cores * 2
  max-cpu-load-avg: -1
  # master reserved memory, only lower than system available memory, master server can schedule. default value 0.3, the unit is G
  reserved-memory: 0.3
  # failover interval, the unit is minute
  failover-interval: 10
  # kill yarn job when failover taskInstance, default true
  kill-yarn-job-when-task-failover: true

server:
  port: 5679

management:
  endpoints:
    web:
      exposure:
        include: '*'
  metrics:
    tags:
      application: ${spring.application.name}

metrics:
  enabled: true

# Override by profile
---
spring:
  config:
    activate:
      on-profile: mysql
  datasource:
    driver-class-name: com.mysql.jdbc.Driver
    url: jdbc:mysql://127.0.0.1:3306/dolphinscheduler?useUnicode=true&serverTimezone=UTC&characterEncoding=UTF-8&autoReconnect=true&useSSL=false&zeroDateTimeBehavior=convertToNull
  quartz:
    properties:
      org.quartz.jobStore.driverDelegateClass: org.quartz.impl.jdbcjobstore.StdJDBCDelegate

dolphinscheduler-worker application.yaml

spring:
  banner:
    charset: UTF-8
  application:
    name: worker-server
  jackson:
    time-zone: UTC
    date-format: "yyyy-MM-dd HH:mm:ss"
  datasource:
    #driver-class-name: org.postgresql.Driver
    #url: jdbc:postgresql://127.0.0.1:5432/dolphinscheduler
    driver-class-name: com.mysql.jdbc.Driver
    url: jdbc:mysql://127.0.0.1:3306/dolphinscheduler?useUnicode=true&serverTimezone=UTC&characterEncoding=UTF-8&autoReconnect=true&useSSL=false&zeroDateTimeBehavior=convertToNull
    username: root
    #password: root
    password:
    hikari:
      connection-test-query: select 1
      minimum-idle: 5
      auto-commit: true
      validation-timeout: 3000
      pool-name: DolphinScheduler
      maximum-pool-size: 50
      connection-timeout: 30000
      idle-timeout: 600000
      leak-detection-threshold: 0
      initialization-fail-timeout: 1

registry:
  type: zookeeper
  zookeeper:
    namespace: dolphinscheduler
#    connect-string: localhost:2181
    connect-string: 10.255.158.70:2181
    retry-policy:
      base-sleep-time: 60ms
      max-sleep: 300ms
      max-retries: 5
    session-timeout: 30s
    connection-timeout: 9s
    block-until-connected: 600ms
    digest: ~

worker:
  # worker listener port
  listen-port: 1234
  # worker execute thread number to limit task instances in parallel
  exec-threads: 100
  # worker heartbeat interval, the unit is second
  heartbeat-interval: 10
  # worker host weight to dispatch tasks, default value 100
  host-weight: 100
  # worker tenant auto create
  tenant-auto-create: true
  # worker max cpuload avg, only higher than the system cpu load average, worker server can be dispatched tasks. default value -1: the number of cpu cores * 2
  max-cpu-load-avg: -1
  # worker reserved memory, only lower than system available memory, worker server can be dispatched tasks. default value 0.3, the unit is G
  reserved-memory: 0.3
  # default worker groups separated by comma, like 'worker.groups=default,test'
  groups:
    - default
  # alert server listen host
  alert-listen-host: localhost
  alert-listen-port: 50052

server:
  port: 1235

management:
  endpoints:
    web:
      exposure:
        include: '*'
  metrics:
    tags:
      application: ${spring.application.name}

metrics:
  enabled: true

The settings that matter most are those for the database, Quartz, ZooKeeper, the Master, and the Worker.
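The `max-cpu-load-avg` and `reserved-memory` options in the Master and Worker configs act as an overload guard. Going by their comments, a server only takes on work while the system load average is below the limit (where -1 means "number of CPU cores * 2") and available memory is above the reserved amount in GB. A minimal sketch of that check, with a hypothetical method name and with the load and memory figures passed in rather than read from the OS:

```java
class ResourceGuard {
    // maxCpuLoadAvg == -1 falls back to (cpu cores * 2), mirroring the config comment.
    static boolean canAcceptWork(double systemLoadAvg, double availableMemoryGb,
                                 double maxCpuLoadAvg, double reservedMemoryGb, int cpuCores) {
        double loadLimit = maxCpuLoadAvg == -1 ? cpuCores * 2 : maxCpuLoadAvg;
        boolean cpuOk = systemLoadAvg < loadLimit;              // load must stay under the limit
        boolean memoryOk = availableMemoryGb > reservedMemoryGb; // keep the reserved memory free
        return cpuOk && memoryOk;
    }
}
```

In a real JVM the load average could come from `ManagementFactory.getOperatingSystemMXBean().getSystemLoadAverage()`, which returns a negative value when the platform does not provide it.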

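2.2 Main API Task Operation Interface

The other business interfaces can be ignored. Focus on the single most important one, the interface that brings a process schedule online; from it you can trace out all of the task-scheduling code.

Interface: /dolphinscheduler/projects/{projectCode}/schedules/{id}/online. This interface submits the defined process to the Quartz scheduling framework. The code is as follows: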

public Map<String, Object> setScheduleState(User loginUser,
                                                long projectCode,
                                                Integer id,
                                                ReleaseState scheduleStatus) {
        Map<String, Object> result = new HashMap<>();

        Project project = projectMapper.queryByCode(projectCode);
        // check project auth
        boolean hasProjectAndPerm = projectService.hasProjectAndPerm(loginUser, project, result);
        if (!hasProjectAndPerm) {
            return result;
        }

        // check schedule exists
        Schedule scheduleObj = scheduleMapper.selectById(id);

        if (scheduleObj == null) {
            putMsg(result, Status.SCHEDULE_CRON_NOT_EXISTS, id);
            return result;
        }
        // check schedule release state
        if (scheduleObj.getReleaseState() == scheduleStatus) {
            logger.info("schedule release is already {},needn't to change schedule id: {} from {} to {}",
                    scheduleObj.getReleaseState(), scheduleObj.getId(), scheduleObj.getReleaseState(), scheduleStatus);
            putMsg(result, Status.SCHEDULE_CRON_REALEASE_NEED_NOT_CHANGE, scheduleStatus);
            return result;
        }
        ProcessDefinition processDefinition = processDefinitionMapper.queryByCode(scheduleObj.getProcessDefinitionCode());
        if (processDefinition == null || projectCode != processDefinition.getProjectCode()) {
            putMsg(result, Status.PROCESS_DEFINE_NOT_EXIST, String.valueOf(scheduleObj.getProcessDefinitionCode()));
            return result;
        }
        List<ProcessTaskRelation> processTaskRelations = processTaskRelationMapper.queryByProcessCode(projectCode, scheduleObj.getProcessDefinitionCode());
        if (processTaskRelations.isEmpty()) {
            putMsg(result, Status.PROCESS_DAG_IS_EMPTY);
            return result;
        }
        if (scheduleStatus == ReleaseState.ONLINE) {
            // check process definition release state
            if (processDefinition.getReleaseState() != ReleaseState.ONLINE) {
                logger.info("not release process definition id: {} , name : {}",
                        processDefinition.getId(), processDefinition.getName());
                putMsg(result, Status.PROCESS_DEFINE_NOT_RELEASE, processDefinition.getName());
                return result;
            }
            // check sub process definition release state
            List<Long> subProcessDefineCodes = new ArrayList<>();
            processService.recurseFindSubProcess(processDefinition.getCode(), subProcessDefineCodes);
            if (!subProcessDefineCodes.isEmpty()) {
                List<ProcessDefinition> subProcessDefinitionList =
                        processDefinitionMapper.queryByCodes(subProcessDefineCodes);
                if (subProcessDefinitionList != null && !subProcessDefinitionList.isEmpty()) {
                    for (ProcessDefinition subProcessDefinition : subProcessDefinitionList) {
                        /**
                         * if there is no online process, exit directly
                         */
                        if (subProcessDefinition.getReleaseState() != ReleaseState.ONLINE) {
                            logger.info("not release process definition id: {} , name : {}",
                                    subProcessDefinition.getId(), subProcessDefinition.getName());
                            putMsg(result, Status.PROCESS_DEFINE_NOT_RELEASE, String.valueOf(subProcessDefinition.getId()));
                            return result;
                        }
                    }
                }
            }
        }

        // check master server exists
        List<Server> masterServers = monitorService.getServerListFromRegistry(true);

        if (masterServers.isEmpty()) {
            putMsg(result, Status.MASTER_NOT_EXISTS);
            return result;
        }

        // set status
        scheduleObj.setReleaseState(scheduleStatus);

        scheduleMapper.updateById(scheduleObj);

        try {
            switch (scheduleStatus) {
                case ONLINE:
                    logger.info("Call master client set schedule online, project id: {}, flow id: {},host: {}", project.getId(), processDefinition.getId(), masterServers);
                    setSchedule(project.getId(), scheduleObj);
                    break;
                case OFFLINE:
                    logger.info("Call master client set schedule offline, project id: {}, flow id: {},host: {}", project.getId(), processDefinition.getId(), masterServers);
                    deleteSchedule(project.getId(), id);
                    break;
                default:
                    putMsg(result, Status.SCHEDULE_STATUS_UNKNOWN, scheduleStatus.toString());
                    return result;
            }
        } catch (Exception e) {
            result.put(Constants.MSG, scheduleStatus == ReleaseState.ONLINE ? "set online failure" : "set offline failure");
            throw new ServiceException(result.get(Constants.MSG).toString(), e);
        }

        putMsg(result, Status.SUCCESS);
        return result;
    }
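One rule in `setScheduleState` deserves restating: a schedule can only go online if its process definition and every sub-process it references, directly or transitively, are online. The sketch below captures that rule with a hypothetical in-memory map from process code to its sub-process codes. It is not the real `processService.recurseFindSubProcess`, and it adds a visited set so that a malformed, self-referencing definition cannot cause infinite recursion.

```java
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

class SubProcessCheck {
    // Collect every sub-process code reachable from `code`, depth first.
    static void collectSubProcesses(long code, Map<Long, List<Long>> subProcesses, Set<Long> found) {
        for (long child : subProcesses.getOrDefault(code, Collections.emptyList())) {
            if (found.add(child)) {                 // skip codes that were already visited
                collectSubProcesses(child, subProcesses, found);
            }
        }
    }

    // The process itself and all nested sub-processes must be online.
    static boolean canGoOnline(long code, Map<Long, List<Long>> subProcesses, Set<Long> onlineCodes) {
        if (!onlineCodes.contains(code)) {
            return false;
        }
        Set<Long> found = new HashSet<>();
        collectSubProcesses(code, subProcesses, found);
        return onlineCodes.containsAll(found);
    }
}
```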

public void setSchedule(int projectId, Schedule schedule) {
    logger.info("set schedule, project id: {}, scheduleId: {}", projectId, schedule.getId());
    quartzExecutor.addJob(ProcessScheduleJob.class, projectId, schedule);
}

public void addJob(Class<? extends Job> clazz, int projectId, final Schedule schedule) {
    String jobName = this.buildJobName(schedule.getId());
    String jobGroupName = this.buildJobGroupName(projectId);
    Map<String, Object> jobDataMap = this.buildDataMap(projectId, schedule);
    String cronExpression = schedule.getCrontab();
    String timezoneId = schedule.getTimezoneId();

    /**
     * transform from server default timezone to schedule timezone
     * e.g. server default timezone is `UTC`
     * user set a schedule with startTime `2022-04-28 10:00:00`, timezone is `Asia/Shanghai`,
     * api skip to transform it and save into databases directly, startTime `2022-04-28 10:00:00`, timezone is `UTC`, which actually added 8 hours,
     * so when add job to quartz, it should recover by transform timezone
     */
    Date startDate = DateUtils.transformTimezoneDate(schedule.getStartTime(), timezoneId);
    Date endDate = DateUtils.transformTimezoneDate(schedule.getEndTime(), timezoneId);

    lock.writeLock().lock();
    try {
        JobKey jobKey = new JobKey(jobName, jobGroupName);
        JobDetail jobDetail;
        //add a task (if this task already exists, return this task directly)
        if (scheduler.checkExists(jobKey)) {
            jobDetail = scheduler.getJobDetail(jobKey);
            jobDetail.getJobDataMap().putAll(jobDataMap);
        } else {
            jobDetail = newJob(clazz).withIdentity(jobKey).build();
            jobDetail.getJobDataMap().putAll(jobDataMap);
            scheduler.addJob(jobDetail, false, true);
            logger.info("Add job, job name: {}, group name: {}",
                    jobName, jobGroupName);
        }

        TriggerKey triggerKey = new TriggerKey(jobName, jobGroupName);
        /*
         * Instructs the Scheduler that upon a mis-fire
         * situation, the CronTrigger wants to have it's
         * next-fire-time updated to the next time in the schedule after the
         * current time (taking into account any associated Calendar),
         * but it does not want to be fired now.
         */
        CronTrigger cronTrigger = newTrigger()
                .withIdentity(triggerKey)
                .startAt(startDate)
                .endAt(endDate)
                .withSchedule(
                        cronSchedule(cronExpression)
                                .withMisfireHandlingInstructionDoNothing()
                                .inTimeZone(DateUtils.getTimezone(timezoneId))
                )
                .forJob(jobDetail).build();

        if (scheduler.checkExists(triggerKey)) {
            // updateProcessInstance scheduler trigger when scheduler cycle changes
            CronTrigger oldCronTrigger = (CronTrigger) scheduler.getTrigger(triggerKey);
            String oldCronExpression = oldCronTrigger.getCronExpression();

            if (!StringUtils.equalsIgnoreCase(cronExpression, oldCronExpression)) {
                // reschedule job trigger
                scheduler.rescheduleJob(triggerKey, cronTrigger);
                logger.info("reschedule job trigger, triggerName: {}, triggerGroupName: {}, cronExpression: {}, startDate: {}, endDate: {}",
                        jobName, jobGroupName, cronExpression, startDate, endDate);
            }
        } else {
            scheduler.scheduleJob(cronTrigger);
            logger.info("schedule job trigger, triggerName: {}, triggerGroupName: {}, cronExpression: {}, startDate: {}, endDate: {}",
                    jobName, jobGroupName, cronExpression, startDate, endDate);
        }

    } catch (Exception e) {
        throw new ServiceException("add job failed", e);
    } finally {
        lock.writeLock().unlock();
    }
}
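The timezone comment in `addJob` is easier to follow with concrete values. The sketch below uses `java.time` to show the intended effect: the stored wall-clock time `2022-04-28 10:00:00` must be interpreted in the schedule's own timezone (`Asia/Shanghai`, UTC+8) before it reaches Quartz. `ScheduleTimezone` is a hypothetical helper illustrating the idea, not the implementation of `DateUtils.transformTimezoneDate`.

```java
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneId;

class ScheduleTimezone {
    // Interpret a stored wall-clock time in the schedule's timezone and return the absolute instant.
    static Instant toInstant(LocalDateTime storedWallClock, String scheduleTimezoneId) {
        return storedWallClock.atZone(ZoneId.of(scheduleTimezoneId)).toInstant();
    }
}
```

Interpreted in `Asia/Shanghai`, the stored value is 02:00 UTC; interpreting the same value as UTC would yield an instant 8 hours later, which is exactly the shift the comment warns about.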

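2.3 Quartz Architecture and Execution Flow

2.3.1 Concepts and Architecture

The Quartz framework mainly consists of the following parts:

  • SchedulerFactory: the scheduler factory, responsible for creating and managing schedulers;
  • Scheduler: the scheduler, responsible for task scheduling and for exposing the operations on jobs;
  • Job: the job interface; implementing classes contain the concrete business code;
  • JobDetail: defines an instance of a job;
  • Trigger: the trigger, which holds the timing policy for running a Job, e.g. how often, when, and at what frequency it runs;
  • JobBuilder: used to define/build JobDetail instances;
  • TriggerBuilder: used to define/build Trigger instances;
  • Calendar: an extension object for Trigger that can exclude or include specific points in time (for example, excluding public holidays);
  • JobStore: stores jobs and the state used during scheduling;

A Scheduler's lifecycle starts when the SchedulerFactory creates it and ends when its shutdown() method is called.

Once created, a Scheduler can add, remove, and list Jobs and Triggers, and perform other scheduling operations (such as pausing a Trigger). However, the Scheduler only actually fires triggers (that is, runs jobs) after its start() method has been called.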
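To make this lifecycle concrete, here is a deliberately tiny, hypothetical scheduler written with only the JDK. It is not Quartz and uses none of its classes; the names merely echo Quartz concepts. Jobs with a fixed-interval trigger can be registered at any time, nothing fires until `start()` is called, and `shutdown()` ends the lifecycle.

```java
import java.util.ArrayList;
import java.util.List;

// Toy model of the Quartz lifecycle; this is not the Quartz API.
class ToyScheduler {
    interface Job { void execute(long fireTime); }

    private static final class Entry {
        final Job job;
        final long intervalMillis;
        long nextFireTime;
        Entry(Job job, long firstFireTime, long intervalMillis) {
            this.job = job;
            this.nextFireTime = firstFireTime;
            this.intervalMillis = intervalMillis;
        }
    }

    private final List<Entry> entries = new ArrayList<>();
    private boolean started = false;
    private boolean shutdown = false;

    // Registering a job plus a fixed-interval trigger is allowed before start().
    void scheduleJob(Job job, long firstFireTime, long intervalMillis) {
        if (shutdown) throw new IllegalStateException("scheduler has been shut down");
        if (intervalMillis <= 0) throw new IllegalArgumentException("interval must be positive");
        entries.add(new Entry(job, firstFireTime, intervalMillis));
    }

    void start() { started = true; }

    void shutdown() { shutdown = true; started = false; }

    // Called with the current time (e.g. by a scheduler thread); fires every due occurrence.
    void tick(long now) {
        if (!started) return;                      // triggers never fire before start()
        for (Entry e : entries) {
            while (e.nextFireTime <= now) {
                e.job.execute(e.nextFireTime);
                e.nextFireTime += e.intervalMillis;
            }
        }
    }
}
```

Real Quartz layers persistence, misfire policies, thread pools, and calendars on top of this basic contract.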

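2.3.2 Initialization and Execution Flow

The basic principle of Quartz is that the Scheduler schedules custom business objects that implement the Job interface and are described by a JobDetail and a Trigger. The basic logic is shown in the figure below:

The code sequence diagram is as follows:

In short, initialization creates the scheduling container (the Scheduler) together with the components it needs: a thread pool, the JobStore used for data access, and the processing thread QuartzSchedulerThread, which handles the concrete business implementations of the Job interface.

DolphinScheduler's business class is ProcessScheduleJob, whose main job is to write records into the command table according to the schedule information.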
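The key design point is that the Quartz job does not run the workflow itself: it only records a command, and a Master later fetches commands and turns them into process instances. A hedged in-memory sketch of that hand-off follows. The `Command` fields, the `"SCHEDULER"` type string, and the queue standing in for the command table are assumptions for illustration; the `fetchNum` parameter loosely corresponds to the `fetch-command-num` Master setting shown earlier.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ConcurrentLinkedQueue;

// Hypothetical stand-in for one row of the command table.
final class Command {
    final String commandType;
    final long processDefinitionCode;
    final long scheduleTime;

    Command(String commandType, long processDefinitionCode, long scheduleTime) {
        this.commandType = commandType;
        this.processDefinitionCode = processDefinitionCode;
        this.scheduleTime = scheduleTime;
    }
}

class CommandHandOff {
    // The queue plays the role of the database table shared by the Quartz side and the Masters.
    final ConcurrentLinkedQueue<Command> commandTable = new ConcurrentLinkedQueue<>();

    // What the Quartz job does on each fire: record the intent to run, nothing more.
    void onTriggerFired(long processDefinitionCode, long scheduledFireTime) {
        commandTable.add(new Command("SCHEDULER", processDefinitionCode, scheduledFireTime));
    }

    // What a Master does periodically: take up to fetchNum commands to turn into process instances.
    List<Command> fetchCommands(int fetchNum) {
        List<Command> batch = new ArrayList<>();
        Command c;
        while (batch.size() < fetchNum && (c = commandTable.poll()) != null) {
            batch.add(c);
        }
        return batch;
    }
}
```

Decoupling the trigger from execution this way keeps the Quartz threads short-lived, and lets any live Master pick up the work.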

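2.3.3 Cluster Operation

Points to note:

  1. When Quartz is deployed in cluster mode, the job store cannot be in memory, i.e. RAMJobStore cannot be used.
  2. In a Quartz cluster, the scan for Trigger instances to be scheduled is guarded by the database lock TRIGGER_ACCESS, which ensures that only one Quartz instance performs the scan at a time. As the code shows, the lock is taken when acquireTriggersWithinLock is true (as in the configuration above) or when more than one trigger is acquired per batch: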

  public List<OperableTrigger> acquireNextTriggers(final long noLaterThan, final int maxCount, final long timeWindow)
        throws JobPersistenceException {
        
        String lockName;
        if(isAcquireTriggersWithinLock() || maxCount > 1) { 
            lockName = LOCK_TRIGGER_ACCESS;
        } else {
            lockName = null;
        }
        return executeInNonManagedTXLock(lockName, 
                new TransactionCallback<List<OperableTrigger>>() {
                    public List<OperableTrigger> execute(Connection conn) throws JobPersistenceException {
                        return acquireNextTrigger(conn, noLaterThan, maxCount, timeWindow);
                    }
                },
                new TransactionValidator<List<OperableTrigger>>() {
                    public Boolean validate(Connection conn, List<OperableTrigger> result) throws JobPersistenceException {
                        try {
                            List<FiredTriggerRecord> acquired = getDelegate().selectInstancesFiredTriggerRecords(conn, getInstanceId());
                            Set<String> fireInstanceIds = new HashSet<String>();
                            for (FiredTriggerRecord ft : acquired) {
                                fireInstanceIds.add(ft.getFireInstanceId());
                            }
                            for (OperableTrigger tr : result) {
                                if (fireInstanceIds.contains(tr.getFireInstanceId())) {
                                    return true;
                                }
                            }
                            return false;
                        } catch (SQLException e) {
                            throw new JobPersistenceException("error validating trigger acquisition", e);
                        }
                    }
                });
    }

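  3. When failed instances in the cluster are recovered, recovery is scoped per scheduler instance: the database records each scheduler's instanceId, so the fired-trigger records left behind by each failed instance can be located and recovered. The code is as follows: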

 protected void clusterRecover(Connection conn, List<SchedulerStateRecord> failedInstances)
         throws JobPersistenceException {
     if (failedInstances.size() > 0) {
         long recoverIds = System.currentTimeMillis();
         logWarnIfNonZero(failedInstances.size(),
                 "ClusterManager: detected " + failedInstances.size()
                         + " failed or restarted instances.");
         try {
             for (SchedulerStateRecord rec : failedInstances) {
                 getLog().info(
                         "ClusterManager: Scanning for instance \""
                                 + rec.getSchedulerInstanceId()
                                 + "\"'s failed in-progress jobs.");
                 List<FiredTriggerRecord> firedTriggerRecs = getDelegate()
                         .selectInstancesFiredTriggerRecords(conn, rec.getSchedulerInstanceId());
                 int acquiredCount = 0;
                 int recoveredCount = 0;
                 int otherCount = 0;
                 Set<TriggerKey> triggerKeys = new HashSet<TriggerKey>();
                 for (FiredTriggerRecord ftRec : firedTriggerRecs) {
                     TriggerKey tKey = ftRec.getTriggerKey();
                     JobKey jKey = ftRec.getJobKey();
                     triggerKeys.add(tKey);
                     // release blocked triggers..
                     if (ftRec.getFireInstanceState().equals(STATE_BLOCKED)) {
                         getDelegate().updateTriggerStatesForJobFromOtherState(
                                 conn, jKey, STATE_WAITING, STATE_BLOCKED);
                     } else if (ftRec.getFireInstanceState().equals(STATE_PAUSED_BLOCKED)) {
                         getDelegate().updateTriggerStatesForJobFromOtherState(
                                 conn, jKey, STATE_PAUSED, STATE_PAUSED_BLOCKED);
                     }
                     // release acquired triggers..
                     if (ftRec.getFireInstanceState().equals(STATE_ACQUIRED)) {
                         getDelegate().updateTriggerStateFromOtherState(
                                 conn, tKey, STATE_WAITING, STATE_ACQUIRED);
                         acquiredCount++;
                     } else if (ftRec.isJobRequestsRecovery()) {
                         // handle jobs marked for recovery that were not fully
                         // executed..
                         if (jobExists(conn, jKey)) {
                             @SuppressWarnings("deprecation")
                             SimpleTriggerImpl rcvryTrig = new SimpleTriggerImpl(
                                     "recover_"
                                             + rec.getSchedulerInstanceId()
                                             + "_"
                                             + String.valueOf(recoverIds++),
                                     Scheduler.DEFAULT_RECOVERY_GROUP,
                                     new Date(ftRec.getScheduleTimestamp()));
                             rcvryTrig.setJobName(jKey.getName());
                             rcvryTrig.setJobGroup(jKey.getGroup());
                             rcvryTrig.setMisfireInstruction(SimpleTrigger.MISFIRE_INSTRUCTION_IGNORE_MISFIRE_POLICY);
                             rcvryTrig.setPriority(ftRec.getPriority());
                             JobDataMap jd = getDelegate().selectTriggerJobDataMap(conn, tKey.getName(), tKey.getGroup());
                             jd.put(Scheduler.FAILED_JOB_ORIGINAL_TRIGGER_NAME, tKey.getName());
                             jd.put(Scheduler.FAILED_JOB_ORIGINAL_TRIGGER_GROUP, tKey.getGroup());
                             jd.put(Scheduler.FAILED_JOB_ORIGINAL_TRIGGER_FIRETIME_IN_MILLISECONDS, String.valueOf(ftRec.getFireTimestamp()));
                             jd.put(Scheduler.FAILED_JOB_ORIGINAL_TRIGGER_SCHEDULED_FIRETIME_IN_MILLISECONDS, String.valueOf(ftRec.getScheduleTimestamp()));
                             rcvryTrig.setJobDataMap(jd);
                             rcvryTrig.computeFirstFireTime(null);
                             storeTrigger(conn, rcvryTrig, null, false,
                                     STATE_WAITING, false, true);
                             recoveredCount++;
                         } else {
                             getLog().warn(
                                     "ClusterManager: failed job '"
                                             + jKey
                                             + "' no longer exists, cannot schedule recovery.");
                             otherCount++;
                         }
                     } else {
                         otherCount++;
                     }
                     // free up stateful job's triggers
                     if (ftRec.isJobDisallowsConcurrentExecution()) {
                         getDelegate().updateTriggerStatesForJobFromOtherState(
                                 conn, jKey, STATE_WAITING, STATE_BLOCKED);
                         getDelegate().updateTriggerStatesForJobFromOtherState(
                                 conn, jKey, STATE_PAUSED, STATE_PAUSED_BLOCKED);
                     }
                 }
                 getDelegate().deleteFiredTriggers(conn, rec.getSchedulerInstanceId());
                 // Check if any of the fired triggers we just deleted were the last fired trigger
                 // records of a COMPLETE trigger.
                 int completeCount = 0;
                 for (TriggerKey triggerKey : triggerKeys) {
                     if (getDelegate().selectTriggerState(conn, triggerKey).
                             equals(STATE_COMPLETE)) {
                         List<FiredTriggerRecord> firedTriggers =
                                 getDelegate().selectFiredTriggerRecords(conn, triggerKey.getName(), triggerKey.getGroup());
                         if (firedTriggers.isEmpty()) {
                             if (removeTrigger(conn, triggerKey)) {
                                 completeCount++;
                             }
                         }
                     }
                 }
                 logWarnIfNonZero(acquiredCount,
                         "ClusterManager: ......Freed " + acquiredCount
                                 + " acquired trigger(s).");
                 logWarnIfNonZero(completeCount,
                         "ClusterManager: ......Deleted " + completeCount
                                 + " complete triggers(s).");
                 logWarnIfNonZero(recoveredCount,
                         "ClusterManager: ......Scheduled " + recoveredCount
                                 + " recoverable job(s) for recovery.");
                 logWarnIfNonZero(otherCount,
                         "ClusterManager: ......Cleaned-up " + otherCount
                                 + " other failed job(s).");
                 if (!rec.getSchedulerInstanceId().equals(getInstanceId())) {
                     getDelegate().deleteSchedulerState(conn, rec.getSchedulerInstanceId());
                 }
             }
         } catch (Throwable e) {
             throw new JobPersistenceException("Failure recovering jobs: "
                     + e.getMessage(), e);
         }
     }
 }
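To make the recovery rules above easier to follow, here is a heavily simplified Java sketch of the same per-record decisions, run against in-memory maps instead of the Quartz tables. The `State` enum, the `FiredRecord` record and the `ClusterRecoverySketch` class are hypothetical illustrations, not Quartz API. Database access, job data maps, the COMPLETE-trigger cleanup and the `isJobDisallowsConcurrentExecution()` branch are left out.

```java
import java.util.*;

// Hypothetical subset of Quartz's trigger states.
enum State { WAITING, ACQUIRED, EXECUTING, BLOCKED, PAUSED, PAUSED_BLOCKED }

// Hypothetical stand-in for one row of the fired-triggers table.
record FiredRecord(String instanceId, String trigger, String job,
                   State fireState, boolean requestsRecovery) {}

class ClusterRecoverySketch {
    final Map<String, State> triggerStates = new HashMap<>(); // trigger -> state
    final Map<String, String> triggerJob = new HashMap<>();   // trigger -> job
    final Set<String> jobs = new HashSet<>();                 // jobs that still exist
    final List<FiredRecord> firedTriggers = new ArrayList<>();

    // Move every trigger of `job` that is in state `from` to state `to`.
    void updateJobTriggers(String job, State from, State to) {
        triggerStates.replaceAll((t, s) -> job.equals(triggerJob.get(t)) && s == from ? to : s);
    }

    // Recover one failed scheduler instance; returns {acquired, recovered, other}.
    int[] recover(String failedInstanceId) {
        int acquired = 0, recovered = 0, other = 0;
        long recoverId = System.currentTimeMillis();
        for (FiredRecord r : firedTriggers) {
            if (!r.instanceId().equals(failedInstanceId)) continue; // only this instance's rows
            // release triggers that were blocked behind the failed execution
            if (r.fireState() == State.BLOCKED) {
                updateJobTriggers(r.job(), State.BLOCKED, State.WAITING);
            } else if (r.fireState() == State.PAUSED_BLOCKED) {
                updateJobTriggers(r.job(), State.PAUSED_BLOCKED, State.PAUSED);
            }
            if (r.fireState() == State.ACQUIRED) {
                // acquired but never fired: ACQUIRED -> WAITING so another node can pick it up
                triggerStates.replace(r.trigger(), State.ACQUIRED, State.WAITING);
                acquired++;
            } else if (r.requestsRecovery()) {
                if (jobs.contains(r.job())) {
                    // schedule a one-shot recovery trigger for the interrupted job
                    String name = "recover_" + failedInstanceId + "_" + recoverId++;
                    triggerStates.put(name, State.WAITING);
                    triggerJob.put(name, r.job());
                    recovered++;
                } else {
                    other++; // job no longer exists, nothing to recover
                }
            } else {
                other++;
            }
        }
        // delete the failed instance's fired-trigger rows
        firedTriggers.removeIf(r -> r.instanceId().equals(failedInstanceId));
        return new int[] {acquired, recovered, other};
    }
}
```

Only rows whose `instanceId` matches the failed instance are touched, which is why every instance can run this independently.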

2.4 Master Startup and Execution Flow

2.4.1 Concepts and Execution Logic

Key concepts:

Quartz-related:

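  • Scheduler (the task scheduling container, usually a StdScheduler instance);
  • ProcessScheduleJob (the business class that implements the Job interface of the Quartz scheduling framework; its sole purpose is to generate rows in DolphinScheduler's business table t_ds_command);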

DolphinScheduler-related:

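  • NettyRemotingServer (the Netty server, containing the Netty serverBootstrap object and the server-side business handler serverHandler); NettyServerHandler (the Netty server-side business handler class, containing the various processors and the thread pool each processor runs on);
  • TaskPluginManager (task plugin manager. Different task types are managed as plugins: when the service starts, the factories implementing the TaskChannelFactory interface are loaded via @AutoService and their information is written to the database, and the factory objects are then used to load each TaskChannel implementation into a cache);
  • MasterRegistryClient (the Master's ZooKeeper client, encapsulating all of the Master's ZooKeeper operations: registration, queries, deletion, etc.);
  • MasterSchedulerService (the scanning service, containing the business execution threads and the Netty client used to talk to Workers; it is responsible for task scheduling. Slots ensure that in cluster mode a task is not scheduled more than once; the underlying mechanism is a ZooKeeper distributed lock);
  • WorkflowExecuteThread (the thread that does the actual business processing. It obtains commands through the slot and re-checks the slot before executing, skipping the command if the slot has changed. Its key job is to build the task's parameters, definition, priority, etc., and then send them to a queue for the queue-processing thread to consume);
  • CommonTaskProcessor (common task processor implementing the ITaskProcessor interface. By business type, processors are divided into common, dependent, sub-process, blocking, and conditional tasks, and they cover task submission, running, dispatch, killing, etc. They are loaded via @AutoService and essentially encapsulate …);
  • TaskPriorityQueueImpl (the task queue, responsible for storing and controlling queued tasks);
  • TaskPriorityQueueConsumer (the task queue consumer thread, responsible for dispatching tasks to Workers according to the load-balancing strategy and having them executed);
  • ServerNodeManager (node information controller, responsible for updating node registration information and for slot changes; it is built on a ZooKeeper distributed lock);
  • EventExecuteService (event processing thread: through the cached workflow processing threads, it handles the events that each workflow registers in its thread's event queue while it runs);
  • FailoverExecuteThread (failover thread, covering both Master and Worker failover);
  • MasterRegistryDataListener (a failure listener hosted on the ZooKeeper client framework Curator, handling the addition and deletion of the nodes that Workers and Masters register in ZooKeeper).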

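The Master fault-tolerance code is shown below; see section 1.3.1 on fault tolerance for node failure for the business explanation: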

 private void failoverMasterWithLock(String masterHost) {
     String failoverPath = getFailoverLockPath(NodeType.MASTER, masterHost);
     try {
         registryClient.getLock(failoverPath);
         this.failoverMaster(masterHost);
     } catch (Exception e) {
         LOGGER.error("{} server failover failed, host:{}", NodeType.MASTER, masterHost, e);
     } finally {
         registryClient.releaseLock(failoverPath);
     }
 }

 /**
  * failover master
  * <p>
  * failover process instance and associated task instance
  *
  * @param masterHost master host
  */
 private void failoverMaster(String masterHost) {
     if (StringUtils.isEmpty(masterHost)) {
         return;
     }
     Date serverStartupTime = getServerStartupTime(NodeType.MASTER, masterHost);
     long startTime = System.currentTimeMillis();
     List<ProcessInstance> needFailoverProcessInstanceList = processService.queryNeedFailoverProcessInstances(masterHost);
     LOGGER.info("start master[{}] failover, process list size:{}", masterHost, needFailoverProcessInstanceList.size());
     List<Server> workerServers = registryClient.getServerList(NodeType.WORKER);
     for (ProcessInstance processInstance : needFailoverProcessInstanceList) {
         if (Constants.NULL.equals(processInstance.getHost())) {
             continue;
         }
         List<TaskInstance> validTaskInstanceList = processService.findValidTaskListByProcessId(processInstance.getId());
         for (TaskInstance taskInstance : validTaskInstanceList) {
             LOGGER.info("failover task instance id: {}, process instance id: {}", taskInstance.getId(), taskInstance.getProcessInstanceId());
             failoverTaskInstance(processInstance, taskInstance, workerServers);
         }
         // process instances restarted after the master's recorded startup time are not re-queued
         if (serverStartupTime != null && processInstance.getRestartTime() != null
             && processInstance.getRestartTime().after(serverStartupTime)) {
             continue;
         }
         LOGGER.info("failover process instance id: {}", processInstance.getId());
         //updateProcessInstance host is null and insert into command
         processInstance.setHost(Constants.NULL);
         processService.processNeedFailoverProcessInstances(processInstance);
     }
     LOGGER.info("master[{}] failover end, useTime:{}ms", masterHost, System.currentTimeMillis() - startTime);
 }
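The failover logic above has a simple shape: take a per-host lock, walk the dead Master's process instances, skip those restarted after the Master's recorded startup time, and detach and re-queue the rest as commands. The Java sketch below shows that shape. A local `ReentrantLock` stands in for the ZooKeeper lock. `ProcInst`, `FailoverSketch`, the lock path string and `commandQueue` are hypothetical, and task-instance failover is omitted.

```java
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical process-instance row: id, owning master host, last restart time (may be null).
class ProcInst {
    final int id;
    String host;
    final Long restartTime;
    ProcInst(int id, String host, Long restartTime) { this.id = id; this.host = host; this.restartTime = restartTime; }
}

class FailoverSketch {
    // Local locks standing in for ZooKeeper locks, one per failover path.
    private final Map<String, ReentrantLock> locks = new ConcurrentHashMap<>();
    final List<ProcInst> instances = new ArrayList<>();
    final List<Integer> commandQueue = new ArrayList<>(); // stands in for inserts into the command table

    void failoverMasterWithLock(String masterHost, Long serverStartupTime) {
        // hypothetical lock path, one lock per failed master host
        ReentrantLock lock = locks.computeIfAbsent("/lock/failover/master/" + masterHost, k -> new ReentrantLock());
        lock.lock();
        try {
            failoverMaster(masterHost, serverStartupTime);
        } finally {
            lock.unlock(); // always release, as the original does in its finally block
        }
    }

    private void failoverMaster(String masterHost, Long serverStartupTime) {
        for (ProcInst p : instances) {
            if (!masterHost.equals(p.host)) continue; // only instances owned by the dead master
            if (serverStartupTime != null && p.restartTime != null && p.restartTime > serverStartupTime) {
                continue; // restarted after the master's startup time: leave it alone
            }
            p.host = null;          // detach from the dead master
            commandQueue.add(p.id); // re-queue as a failover command
        }
    }
}
```

With a real registry, the lock is what stops two surviving Masters from failing over the same host at the same time.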

2.4.2 Cluster and Slots

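Saying that this relies on a ZooKeeper distributed lock is only half right. Each command is assigned to a Master by taking the command ID modulo the number of Masters and comparing the result with that Master's slot. The ZooKeeper distributed lock only guards the refresh of the Master list, and therefore of the list length and each Master's slot. The slot, in turn, determines which scheduling data each Master scans.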

The code is as follows:

Slot refresh

private void updateMasterNodes() {
    MASTER_SLOT = 0;
    MASTER_SIZE = 0;
    this.masterNodes.clear();
    String nodeLock = Constants.REGISTRY_DOLPHINSCHEDULER_LOCK_MASTERS;
    try {
        registryClient.getLock(nodeLock);
        Collection<String> currentNodes = registryClient.getMasterNodesDirectly();
        List<Server> masterNodes = registryClient.getServerList(NodeType.MASTER);
        syncMasterNodes(currentNodes, masterNodes);
    } catch (Exception e) {
        logger.error("update master nodes error", e);
    } finally {
        registryClient.releaseLock(nodeLock);
    }
}

/**
 * sync master nodes
 *
 * @param nodes master nodes
 */
private void syncMasterNodes(Collection<String> nodes, List<Server> masterNodes) {
    masterLock.lock();
    try {
        String addr = NetUtils.getAddr(NetUtils.getHost(), masterConfig.getListenPort());
        this.masterNodes.addAll(nodes);
        this.masterPriorityQueue.clear();
        this.masterPriorityQueue.putList(masterNodes);
        int index = masterPriorityQueue.getIndex(addr);
        if (index >= 0) {
            MASTER_SIZE = nodes.size();
            MASTER_SLOT = index;
        } else {
            logger.warn("current addr:{} is not in active master list", addr);
        }
        logger.info("update master nodes, master size: {}, slot: {}, addr: {}", MASTER_SIZE, MASTER_SLOT, addr);
    } finally {
        masterLock.unlock();
    }
}

Slot usage

/**
 * 1. get command by slot
 * 2. donot handle command if slot is empty
 */
private void scheduleProcess() throws Exception {
    List<Command> commands = findCommands();
    if (CollectionUtils.isEmpty(commands)) {
        //indicate that no command ,sleep for 1s
        Thread.sleep(Constants.SLEEP_TIME_MILLIS);
        return;
    }
    List<ProcessInstance> processInstances = command2ProcessInstance(commands);
    if (CollectionUtils.isEmpty(processInstances)) {
        return;
    }
    for (ProcessInstance processInstance : processInstances) {
        if (processInstance == null) {
            continue;
        }
        WorkflowExecuteThread workflowExecuteThread = new WorkflowExecuteThread(
                processInstance
                , processService
                , nettyExecutorManager
                , processAlertManager
                , masterConfig
                , stateWheelExecuteThread);
        this.processInstanceExecCacheManager.cache(processInstance.getId(), workflowExecuteThread);
        if (processInstance.getTimeout() > 0) {
            stateWheelExecuteThread.addProcess4TimeoutCheck(processInstance);
        }
        workflowExecuteThreadPool.startWorkflow(workflowExecuteThread);
    }
}

private List<Command> findCommands() {
    int pageNumber = 0;
    int pageSize = masterConfig.getFetchCommandNum();
    List<Command> result = new ArrayList<>();
    if (Stopper.isRunning()) {
        int thisMasterSlot = ServerNodeManager.getSlot();
        int masterCount = ServerNodeManager.getMasterSize();
        if (masterCount > 0) {
            result = processService.findCommandPageBySlot(pageSize, pageNumber, masterCount, thisMasterSlot);
        }
    }
    return result;
}

@Override
public List<Command> findCommandPageBySlot(int pageSize, int pageNumber, int masterCount, int thisMasterSlot) {
    if (masterCount <= 0) {
        return Lists.newArrayList();
    }
    return commandMapper.queryCommandPageBySlot(pageSize, pageNumber * pageSize, masterCount, thisMasterSlot);
}

<!-- MyBatis mapper: select only the commands that belong to this Master's slot -->
<select id="queryCommandPageBySlot" resultType="org.apache.dolphinscheduler.dao.entity.Command">
    select *
    from t_ds_command
    where id % #{masterCount} = #{thisMasterSlot}
    order by process_instance_priority, id asc
        limit #{limit} offset #{offset}
</select>

// slot check
private List<ProcessInstance> command2ProcessInstance(List<Command> commands) {
    List<ProcessInstance> processInstances = Collections.synchronizedList(new ArrayList<>(commands.size()));
    CountDownLatch latch = new CountDownLatch(commands.size());
    for (final Command command : commands) {
        masterPrepareExecService.execute(() -> {
            try {
                // slot check again
                SlotCheckState slotCheckState = slotCheck(command);
                if (slotCheckState.equals(SlotCheckState.CHANGE) || slotCheckState.equals(SlotCheckState.INJECT)) {
                    logger.info("handle command {} skip, slot check state: {}", command.getId(), slotCheckState);
                    return;
                }
                ProcessInstance processInstance = processService.handleCommand(logger,
                        getLocalAddress(),
                        command);
                if (processInstance != null) {
                    processInstances.add(processInstance);
                    logger.info("handle command {} end, create process instance {}", command.getId(), processInstance.getId());
                }
            } catch (Exception e) {
                logger.error("handle command error ", e);
                processService.moveToErrorCommand(command, e.toString());
            } finally {
                latch.countDown();
            }
        });
    }
    try {
        // make sure to finish handling command each time before next scan
        latch.await();
    } catch (InterruptedException e) {
        logger.error("countDownLatch await error ", e);
    }
    return processInstances;
}

private SlotCheckState slotCheck(Command command) {
    int slot = ServerNodeManager.getSlot();
    int masterSize = ServerNodeManager.getMasterSize();
    SlotCheckState state;
    if (masterSize <= 0) {
        state = SlotCheckState.CHANGE;
    } else if (command.getId() % masterSize == slot) {
        state = SlotCheckState.PASS;
    } else {
        state = SlotCheckState.INJECT;
    }
    return state;
}
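The slot scheme boils down to three small rules:

  • a Master's slot is its position in an ordering of the active Masters that all of them agree on;
  • a command belongs to that Master when `id % masterCount == slot`;
  • the rule is re-checked just before the command is handled.

Below is a self-contained Java sketch of those rules. `SlotSketch` and its methods are hypothetical. Plain lexicographic sorting of addresses stands in for whatever ordering `masterPriorityQueue` actually uses.

```java
import java.util.*;
import java.util.stream.Collectors;

class SlotSketch {
    enum SlotCheck { PASS, CHANGE, INJECT }

    // Slot = position of this master in the agreed ordering of active masters, or -1 if absent.
    static int slotOf(List<String> activeMasters, String self) {
        List<String> sorted = new ArrayList<>(activeMasters);
        Collections.sort(sorted); // assumption: lexicographic order as the shared ordering
        return sorted.indexOf(self);
    }

    // In-memory equivalent of: select ... where id % masterCount = thisMasterSlot
    static List<Integer> commandsForSlot(List<Integer> commandIds, int masterCount, int slot) {
        if (masterCount <= 0) return List.of();
        return commandIds.stream().filter(id -> id % masterCount == slot).collect(Collectors.toList());
    }

    // Mirrors slotCheck(): re-validate ownership right before handling a command.
    static SlotCheck check(int commandId, int masterCount, int slot) {
        if (masterCount <= 0) return SlotCheck.CHANGE;
        return commandId % masterCount == slot ? SlotCheck.PASS : SlotCheck.INJECT;
    }
}
```

Because every Master gets a distinct slot, the `id % masterCount` buckets are disjoint. The second check exists because Masters can briefly disagree about membership while the list is being refreshed.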

2.4.3 Code Execution Flow

The code is too lengthy to paste here class by class; reading it directly is clearer.

2.5 Worker Startup and Execution Flow

2.5.1 Concepts and Execution Logic

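  • NettyRemotingServer (the Netty server embedded in the Worker); WorkerRegistryClient (ZooKeeper client encapsulating the Worker's ZooKeeper operations: registration, queries, deletion, etc.);
  • TaskPluginManager (task plugin manager, encapsulating the plugin loading logic and the abstraction over actual task execution);
  • WorkerManagerThread (task execution thread generator: it consumes the task information that the Netty processor pushes into the queue, creates task execution threads, and submits them to a thread pool);
  • TaskExecuteProcessor (Netty task execution processor: it turns the task information dispatched by the Master into a Worker task and pushes it onto the queue);
  • TaskExecuteThread (task execution thread);
  • TaskCallbackService (task callback thread, which communicates with the Netty client held by the Master);
  • AbstractTask (abstract class for the actual task logic; its subclasses, such as SqlTask and DataXTask, contain the concrete execution logic);
  • RetryReportTaskStatusThread (not covered here)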
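The Worker-side hand-off in the list above is a classic producer/consumer pipeline: a processor enqueues incoming tasks, and a manager thread drains the queue and submits execution threads to a pool. Below is a minimal Java sketch of that pattern using `LinkedBlockingQueue` and a fixed thread pool. `TaskRequest`, `WorkerPipelineSketch` and the pool size of 4 are assumptions for illustration, and the "execution" only records the task instance ID.

```java
import java.util.*;
import java.util.concurrent.*;

// Hypothetical task payload sent by the Master.
record TaskRequest(int taskInstanceId, String type) {}

class WorkerPipelineSketch {
    private final BlockingQueue<TaskRequest> queue = new LinkedBlockingQueue<>();
    private final ExecutorService pool = Executors.newFixedThreadPool(4);
    final List<Integer> finished = Collections.synchronizedList(new ArrayList<>());

    // Role of TaskExecuteProcessor: turn an incoming request into a queued task.
    void onExecuteRequest(TaskRequest req) { queue.offer(req); }

    // Role of WorkerManagerThread: take tasks off the queue and hand them to the pool.
    Thread startManager() {
        Thread manager = new Thread(() -> {
            try {
                while (!Thread.currentThread().isInterrupted()) {
                    TaskRequest req = queue.take(); // blocks until a task arrives
                    // role of TaskExecuteThread: here it just records completion
                    pool.execute(() -> finished.add(req.taskInstanceId()));
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // leave the loop on shutdown
            }
        }, "worker-manager");
        manager.setDaemon(true);
        manager.start();
        return manager;
    }

    // Poll until n tasks have finished or the timeout expires.
    boolean awaitFinished(int n, long timeoutMs) {
        long deadline = System.currentTimeMillis() + timeoutMs;
        while (finished.size() < n && System.currentTimeMillis() < deadline) {
            try { Thread.sleep(10); } catch (InterruptedException e) { Thread.currentThread().interrupt(); return false; }
        }
        return finished.size() >= n;
    }

    void shutdown(Thread manager) {
        manager.interrupt();
        pool.shutdown();
    }
}
```

Decoupling the Netty I/O thread from task execution this way means a slow task never blocks the processing of new requests.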

2.5.2 Code Execution Flow

The sequence diagram for the Worker node code is as follows:

The code is too lengthy to paste here class by class; reading it directly is clearer.

2.6 RPC Interaction

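RPC communication between nodes and services is built on Netty. Netty itself is not covered in depth here; this section only covers the design and implementation of the interaction between Master and Worker.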

The overall design is as follows:

2.6.1 Master and Worker Interaction

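Business interaction between Master and Worker is RPC built on a Netty server and client.

  • At startup, Master and Worker each register their Netty server information on the corresponding ZooKeeper node.
  • When the Master's task dispatch thread or its task-kill logic runs, it pulls the Worker node information from ZooKeeper, selects a node according to the load-balancing strategy (covered in the next section), and builds a Netty client to talk to that Worker's Netty server.
  • On receiving the Master's RPC request, the Worker caches the Channel and handles the corresponding business.
  • The callback thread then fetches the cached channel to perform the callback, which closes the loop.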

Task execution, task killing, and callback status handling are all carried out by the Processor handlers bound to the Netty client and server.
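Binding processors to command types amounts to a lookup table from `CommandType` to handler. The Java sketch below shows that dispatch idea without Netty. The enum lists only a few of the command types named in this article, and `Command`, `ProcessorRegistrySketch` and `dispatch` are hypothetical. In DolphinScheduler each processor is also paired with a thread pool (see NettyServerHandler above); here the handler runs inline for simplicity.

```java
import java.util.*;
import java.util.function.Consumer;

// A few of the command types named in this article.
enum CommandType { TASK_EXECUTE_REQUEST, TASK_KILL_REQUEST, TASK_EXECUTE_RESPONSE }

// Hypothetical decoded message: a type tag plus a payload.
record Command(CommandType type, String body) {}

class ProcessorRegistrySketch {
    private final Map<CommandType, Consumer<Command>> processors = new EnumMap<>(CommandType.class);

    // Analogous to nettyRemotingServer.registerProcessor(type, processor).
    void registerProcessor(CommandType type, Consumer<Command> processor) {
        processors.put(type, processor);
    }

    // Analogous to the handler's read path: look up by type and hand the command over.
    boolean dispatch(Command cmd) {
        Consumer<Command> p = processors.get(cmd.type());
        if (p == null) return false; // no processor registered for this type
        p.accept(cmd);
        return true;
    }
}
```

This is why the same `NettyRemotingServer` class can serve both Master and Worker: each side differs only in which processors it registers.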

Master-side code:

When the Master starts, it initializes its NettyServer, registers the corresponding request processors with the NettyHandler, and starts the server:

 @PostConstruct
 public void run() throws SchedulerException {
     // init remoting server
     NettyServerConfig serverConfig = new NettyServerConfig();
     serverConfig.setListenPort(masterConfig.getListenPort());
     this.nettyRemotingServer = new NettyRemotingServer(serverConfig);
     this.nettyRemotingServer.registerProcessor(CommandType.TASK_EXECUTE_RESPONSE, taskExecuteResponseProcessor);
     this.nettyRemotingServer.registerProcessor(CommandType.TASK_EXECUTE_RUNNING, taskExecuteRunningProcessor);
     this.nettyRemotingServer.registerProcessor(CommandType.TASK_KILL_RESPONSE, taskKillResponseProcessor);
     this.nettyRemotingServer.registerProcessor(CommandType.STATE_EVENT_REQUEST, stateEventProcessor);
     this.nettyRemotingServer.registerProcessor(CommandType.TASK_FORCE_STATE_EVENT_REQUEST, taskEventProcessor);
     this.nettyRemotingServer.registerProcessor(CommandType.TASK_WAKEUP_EVENT_REQUEST, taskEventProcessor);
     this.nettyRemotingServer.registerProcessor(CommandType.CACHE_EXPIRE, cacheProcessor);
     // logger server
     this.nettyRemotingServer.registerProcessor(CommandType.GET_LOG_BYTES_REQUEST, loggerRequestProcessor);
     this.nettyRemotingServer.registerProcessor(CommandType.ROLL_VIEW_LOG_REQUEST, loggerRequestProcessor);
     this.nettyRemotingServer.registerProcessor(CommandType.VIEW_WHOLE_LOG_REQUEST, loggerRequestProcessor);
     this.nettyRemotingServer.registerProcessor(CommandType.REMOVE_TAK_LOG_REQUEST, loggerRequestProcessor);
     this.nettyRemotingServer.start();
     // install task plugin
     this.taskPluginManager.installPlugin();
     // self tolerant
     this.masterRegistryClient.init();
     this.masterRegistryClient.start();
     this.masterRegistryClient.setRegistryStoppable(this);
     this.masterSchedulerService.init();
     this.masterSchedulerService.start();
     this.eventExecuteService.start();
     this.failoverExecuteThread.start();
     this.scheduler.start();
     Runtime.getRuntime().addShutdownHook(new Thread(() -> {
         if (Stopper.isRunning()) {
             close("shutdownHook");
         }
     }));
 }

 /**
  * server start
  */
 public void start() {
     if (isStarted.compareAndSet(false, true)) {
         this.serverBootstrap
                 .group(this.bossGroup, this.workGroup)
                 .channel(NettyUtils.getServerSocketChannelClass())
                 .option(ChannelOption.SO_REUSEADDR, true)
                 .option(ChannelOption.SO_BACKLOG, serverConfig.getSoBacklog())
                 .childOption(ChannelOption.SO_KEEPALIVE, serverConfig.isSoKeepalive())
                 .childOption(ChannelOption.TCP_NODELAY, serverConfig.isTcpNoDelay())
                 .childOption(ChannelOption.SO_SNDBUF, serverConfig.getSendBufferSize())
                 .childOption(ChannelOption.SO_RCVBUF, serverConfig.getReceiveBufferSize())
                 .childHandler(new ChannelInitializer<SocketChannel>() {
                     @Override
                     protected void initChannel(SocketChannel ch) {
                         initNettyChannel(ch);
                     }
                 });
         ChannelFuture future;
         try {
             future = serverBootstrap.bind(serverConfig.getListenPort()).sync();
         } catch (Exception e) {
             logger.error("NettyRemotingServer bind fail {}, exit", e.getMessage(), e);
             throw new RemoteException(String.format(NETTY_BIND_FAILURE_MSG, serverConfig.getListenPort()));
         }
         if (future.isSuccess()) {
             logger.info("NettyRemotingServer bind success at port : {}", serverConfig.getListenPort());
         } else if (future.cause() != null) {
             throw new RemoteException(String.format(NETTY_BIND_FAILURE_MSG, serverConfig.getListenPort()), future.cause());
         } else {
             throw new RemoteException(String.format(NETTY_BIND_FAILURE_MSG, serverConfig.getListenPort()));
         }
     }
 }

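When the Master's NettyExecutorManager is initialized, it also initializes a NettyRemotingClient and registers the processors that handle Worker callback requests. The actual connection to a port is made only after the executor's (Worker's) address has been obtained: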

 /**
  * constructor
  */
 public NettyExecutorManager() {
     final NettyClientConfig clientConfig = new NettyClientConfig();
     this.nettyRemotingClient = new NettyRemotingClient(clientConfig);
 }

 // register the processors that handle Worker callbacks
 @PostConstruct
 public void init() {
     this.nettyRemotingClient.registerProcessor(CommandType.TASK_EXECUTE_RESPONSE, taskExecuteResponseProcessor);
     this.nettyRemotingClient.registerProcessor(CommandType.TASK_EXECUTE_RUNNING, taskExecuteRunningProcessor);
     this.nettyRemotingClient.registerProcessor(CommandType.TASK_KILL_RESPONSE, taskKillResponseProcessor);
 }

 public NettyRemotingClient(final NettyClientConfig clientConfig) {
     this.clientConfig = clientConfig;
     if (NettyUtils.useEpoll()) {
         this.workerGroup = new EpollEventLoopGroup(clientConfig.getWorkerThreads(), new ThreadFactory() {
             private final AtomicInteger threadIndex = new AtomicInteger(0);

             @Override
             public Thread newThread(Runnable r) {
                 return new Thread(r, String.format("NettyClient_%d", this.threadIndex.incrementAndGet()));
             }
         });
     } else {
         this.workerGroup = new NioEventLoopGroup(clientConfig.getWorkerThreads(), new ThreadFactory() {
             private final AtomicInteger threadIndex = new AtomicInteger(0);

             @Override
             public Thread newThread(Runnable r) {
                 return new Thread(r, String.format("NettyClient_%d", this.threadIndex.incrementAndGet()));
             }
         });
     }
     this.callbackExecutor = new ThreadPoolExecutor(5, 10, 1, TimeUnit.MINUTES,
             new LinkedBlockingQueue<>(1000), new NamedThreadFactory("CallbackExecutor", 10),
             new CallerThreadExecutePolicy());
     this.clientHandler = new NettyClientHandler(this, callbackExecutor);
     this.responseFutureExecutor = Executors.newSingleThreadScheduledExecutor(new NamedThreadFactory("ResponseFutureExecutor"));
     this.start();
 }

 /**
  * start
  */
 private void start() {
     this.bootstrap
             .group(this.workerGroup)
             .channel(NettyUtils.getSocketChannelClass())
             .option(ChannelOption.SO_KEEPALIVE, clientConfig.isSoKeepalive())
             .option(ChannelOption.TCP_NODELAY, clientConfig.isTcpNoDelay())
             .option(ChannelOption.SO_SNDBUF, clientConfig.getSendBufferSize())
             .option(ChannelOption.SO_RCVBUF, clientConfig.getReceiveBufferSize())
             .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, clientConfig.getConnectTimeoutMillis())
             .handler(new ChannelInitializer<SocketChannel>() {
                 @Override
                 public void initChannel(SocketChannel ch) {
                     ch.pipeline()
                             .addLast("client-idle-handler", new IdleStateHandler(Constants.NETTY_CLIENT_HEART_BEAT_TIME, 0, 0, TimeUnit.MILLISECONDS))
                             .addLast(new NettyDecoder(), clientHandler, encoder);
                 }
             });
     this.responseFutureExecutor.scheduleAtFixedRate(ResponseFuture::scanFutureTable, 5000, 1000, TimeUnit.MILLISECONDS);
     isStarted.compareAndSet(false, true);
 }

The task dispatch code is as follows:

/**
 * task dispatch
 *
 * @param context context
 * @return result
 * @throws ExecuteException if error throws ExecuteException
 */
public Boolean dispatch(final ExecutionContext context) throws ExecuteException {
    // get executor manager
    ExecutorManager<Boolean> executorManager = this.executorManagers.get(context.getExecutorType());
    if (executorManager == null) {
        throw new ExecuteException("no ExecutorManager for type : " + context.getExecutorType());
    }
    // host select
    Host host = hostManager.select(context);
    if (StringUtils.isEmpty(host.getAddress())) {
        throw new ExecuteException(String.format("fail to execute : %s due to no suitable worker, "
                        + "current task needs worker group %s to execute",
                context.getCommand(), context.getWorkerGroup()));
    }
    context.setHost(host);
    executorManager.beforeExecute(context);
    try {
        // task execute
        return executorManager.execute(context);
    } finally {
        executorManager.afterExecute(context);
    }
}

/**
 * execute logic
 *
 * @param context context
 * @return result
 * @throws ExecuteException if error throws ExecuteException
 */
@Override
public Boolean execute(ExecutionContext context) throws ExecuteException {
    // all nodes
    Set<String> allNodes = getAllNodes(context);
    // fail nodes
    Set<String> failNodeSet = new HashSet<>();
    // build command accord executeContext
    Command command = context.getCommand();
    // execute task host
    Host host = context.getHost();
    boolean success = false;
    while (!success) {
        try {
            doExecute(host, command);
            success = true;
            context.setHost(host);
        } catch (ExecuteException ex) {
            logger.error(String.format("execute command : %s error", command), ex);
            try {
                failNodeSet.add(host.getAddress());
                Set<String> tmpAllIps = new HashSet<>(allNodes);
                Collection<String> remained = CollectionUtils.subtract(tmpAllIps, failNodeSet);
                if (remained != null && remained.size() > 0) {
                    host = Host.of(remained.iterator().next());
                    logger.error("retry execute command : {} host : {}", command, host);
                } else {
                    throw new ExecuteException("fail after try all nodes");
                }
            } catch (Throwable t) {
                throw new ExecuteException("fail after try all nodes");
            }
        }
    }
    return success;
}

/**
 * execute logic
 *
 * @param host host
 * @param command command
 * @throws ExecuteException if error throws ExecuteException
 */
public void doExecute(final Host host, final Command command) throws ExecuteException {
    // retry count,default retry 3
    int retryCount = 3;
    boolean success = false;
    do {
        try {
            nettyRemotingClient.send(host, command);
            success = true;
        } catch (Exception ex) {
            logger.error(String.format("send command : %s to %s error", command, host), ex);
            retryCount--;
            ThreadUtils.sleep(100);
        }
    } while (retryCount >= 0 && !success);
    if (!success) {
        throw new ExecuteException(String.format("send command : %s to %s error", command, host));
    }
}

/**
 * send task
 *
 * @param host host
 * @param command command
 */
public void send(final Host host, final Command command) throws RemotingException {
    Channel channel = getChannel(host);
    if (channel == null) {
        throw new RemotingException(String.format("connect to : %s fail", host));
    }
    try {
        ChannelFuture future = channel.writeAndFlush(command).await();
        if (future.isSuccess()) {
            logger.debug("send command : {} , to : {} successfully.", command, host.getAddress());
        } else {
            String msg = String.format("send command : %s , to :%s failed", command, host.getAddress());
            logger.error(msg, future.cause());
            throw new RemotingException(msg);
        }
    } catch (Exception e) {
        logger.error("Send command {} to address {} encounter error.", command, host.getAddress());
        throw new RemotingException(String.format("Send command : %s , to :%s encounter error", command, host.getAddress()), e);
    }
}
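The dispatch path above layers two kinds of retry. `doExecute` resends to the same host (up to 4 attempts: the first try plus 3 retries), and `execute` then falls back to another Worker that has not failed yet. The Java sketch below reproduces that control flow with a pluggable `send` predicate in place of `NettyRemotingClient.send`. `DispatchSketch` is hypothetical and the 100 ms back-off is omitted. Remaining hosts are tried in sorted order for determinism, whereas the original simply takes the first element of a `HashSet`.

```java
import java.util.*;
import java.util.function.Predicate;

class DispatchSketch {
    // Same-host retry: one first attempt plus `retries` more (the original uses retries = 3).
    static boolean sendWithRetry(String host, Predicate<String> send, int retries) {
        for (int attempt = 0; attempt <= retries; attempt++) {
            if (send.test(host)) return true;
        }
        return false;
    }

    // Try the selected host first; on failure move on to a host not yet marked as failed.
    static String dispatch(String selected, Set<String> allNodes, Predicate<String> send) {
        Set<String> failed = new HashSet<>();
        String host = selected;
        while (true) {
            if (sendWithRetry(host, send, 3)) return host;
            failed.add(host);
            Optional<String> next = allNodes.stream()
                    .filter(h -> !failed.contains(h))
                    .sorted() // deterministic order for the sketch only
                    .findFirst();
            if (next.isEmpty()) throw new IllegalStateException("fail after try all nodes");
            host = next.get();
        }
    }
}
```

The practical consequence is that a dead Worker costs several send attempts before the Master moves on, which is why the host selection strategy in the next section matters.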

Worker-side code:

Likewise, the Worker initializes its NettyServer at startup, registers the corresponding processors, and starts the server:

/**
 * worker server run
 */
@PostConstruct
public void run() {
    // init remoting server
    NettyServerConfig serverConfig = new NettyServerConfig();
    serverConfig.setListenPort(workerConfig.getListenPort());
    this.nettyRemotingServer = new NettyRemotingServer(serverConfig);
    this.nettyRemotingServer.registerProcessor(CommandType.TASK_EXECUTE_REQUEST, taskExecuteProcessor);
    this.nettyRemotingServer.registerProcessor(CommandType.TASK_KILL_REQUEST, taskKillProcessor);
    this.nettyRemotingServer.registerProcessor(CommandType.TASK_EXECUTE_RUNNING_ACK, taskExecuteRunningAckProcessor);
    this.nettyRemotingServer.registerProcessor(CommandType.TASK_EXECUTE_RESPONSE_ACK, taskExecuteResponseAckProcessor);
    this.nettyRemotingServer.registerProcessor(CommandType.PROCESS_HOST_UPDATE_REQUEST, hostUpdateProcessor);

    // logger server
    this.nettyRemotingServer.registerProcessor(CommandType.GET_LOG_BYTES_REQUEST, loggerRequestProcessor);
    this.nettyRemotingServer.registerProcessor(CommandType.ROLL_VIEW_LOG_REQUEST, loggerRequestProcessor);
    this.nettyRemotingServer.registerProcessor(CommandType.VIEW_WHOLE_LOG_REQUEST, loggerRequestProcessor);
    this.nettyRemotingServer.registerProcessor(CommandType.REMOVE_TAK_LOG_REQUEST, loggerRequestProcessor);
    this.nettyRemotingServer.start();

    // install task plugin
    this.taskPluginManager.installPlugin();

    // worker registry
    try {
        this.workerRegistryClient.registry();
        this.workerRegistryClient.setRegistryStoppable(this);
        Set<String> workerZkPaths = this.workerRegistryClient.getWorkerZkPaths();
        this.workerRegistryClient.handleDeadServer(workerZkPaths, NodeType.WORKER, Constants.DELETE_OP);
    } catch (Exception e) {
        logger.error(e.getMessage(), e);
        throw new RuntimeException(e);
    }

    // task execute manager
    this.workerManagerThread.start();

    // retry report task status
    this.retryReportTaskStatusThread.start();

    /*
     * registry hooks, which are called before the process exits
     */
    Runtime.getRuntime().addShutdownHook(new Thread(() -> {
        if (Stopper.isRunning()) {
            close("shutdownHook");
        }
    }));
}
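The run() method above amounts to a lookup table from command type to processor. When a request arrives, the server reads its type and hands the request to the processor registered for that type. Below is a minimal sketch of that dispatch idea. It assumes hypothetical `CommandKind`, `CommandProcessor` and `ProcessorRegistry` types, which are not DolphinScheduler classes, and it leaves out Netty, threading and serialization entirely:

```java
import java.util.EnumMap;
import java.util.Map;

// Hypothetical command types; the real CommandType enum has many more values.
enum CommandKind { TASK_EXECUTE_REQUEST, TASK_KILL_REQUEST, ROLL_VIEW_LOG_REQUEST }

// Hypothetical processor contract: handles one already-decoded command body.
interface CommandProcessor {
    String process(String body);
}

class ProcessorRegistry {
    private final Map<CommandKind, CommandProcessor> processors = new EnumMap<>(CommandKind.class);

    // Register (or replace) the processor for a command type, in the spirit of registerProcessor(...).
    void register(CommandKind kind, CommandProcessor processor) {
        processors.put(kind, processor);
    }

    // Look up the processor for an incoming command and invoke it.
    String dispatch(CommandKind kind, String body) {
        CommandProcessor processor = processors.get(kind);
        if (processor == null) {
            throw new IllegalStateException("no processor registered for " + kind);
        }
        return processor.process(body);
    }
}
```

A single map per server keeps the lookup cheap. It also lets one processor instance serve several types, just as `loggerRequestProcessor` is registered for all four log commands above.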

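When the callback object (TaskCallbackService) is initialized, it also initializes the NettyRemotingClient it holds and registers the corresponding business processors: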

public TaskCallbackService() {
    final NettyClientConfig clientConfig = new NettyClientConfig();
    this.nettyRemotingClient = new NettyRemotingClient(clientConfig);
    this.nettyRemotingClient.registerProcessor(CommandType.TASK_EXECUTE_RUNNING_ACK, taskExecuteRunningProcessor);
    this.nettyRemotingClient.registerProcessor(CommandType.TASK_EXECUTE_RESPONSE_ACK, taskExecuteResponseAckProcessor);
}

The callback thread then talks to the Master over the Channel that was cached earlier by another executor component:

/**
 * send result
 *
 * @param taskInstanceId taskInstanceId
 * @param command command
 */
public void send(int taskInstanceId, Command command) {
    NettyRemoteChannel nettyRemoteChannel = getRemoteChannel(taskInstanceId);
    if (nettyRemoteChannel != null) {
        nettyRemoteChannel.writeAndFlush(command).addListener(new ChannelFutureListener() {
            @Override
            public void operationComplete(ChannelFuture future) throws Exception {
                if (future.isSuccess()) {
                    // remove(taskInstanceId);
                    return;
                }
            }
        });
    }
}
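send() only works because a channel for each task instance was cached beforehand, presumably when the Master's request arrived. Below is a rough sketch of such a cache. It assumes a hypothetical `FakeChannel` in place of Netty's channel and uses hypothetical method names. It illustrates only the lookup-by-taskInstanceId idea, not the real TaskCallbackService:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical stand-in for a Netty Channel: it only records what was written (not thread-safe).
class FakeChannel {
    final List<String> written = new ArrayList<>();

    void writeAndFlush(String command) {
        written.add(command);
    }
}

class CallbackChannelCache {
    // taskInstanceId -> channel back to the Master that dispatched the task (assumed keying)
    private final Map<Integer, FakeChannel> channels = new ConcurrentHashMap<>();

    void put(int taskInstanceId, FakeChannel channel) {
        channels.put(taskInstanceId, channel);
    }

    // Reply over the cached channel; returns false when no channel is known,
    // mirroring the null check in send() above.
    boolean send(int taskInstanceId, String command) {
        FakeChannel channel = channels.get(taskInstanceId);
        if (channel == null) {
            return false;
        }
        channel.writeAndFlush(command);
        return true;
    }

    void remove(int taskInstanceId) {
        channels.remove(taskInstanceId);
    }
}
```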

2.6.2 Interaction between other services and the Master

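Take the log service as an example. The frontend calls the log query API. The backend uses the request parameters to look up the task instance in the database and reads the host it ran on (`taskInstance.getHost()`). It then builds a Netty client, fetches the log from that host's NettyServer and returns it. The code is as follows: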

public Result<String> queryLog(@ApiIgnore @RequestAttribute(value = Constants.SESSION_USER) User loginUser,
                               @RequestParam(value = "taskInstanceId") int taskInstanceId,
                               @RequestParam(value = "skipLineNum") int skipNum,
                               @RequestParam(value = "limit") int limit) {
    return loggerService.queryLog(taskInstanceId, skipNum, limit);
}
/**
 * view log
 *
 * @param taskInstId task instance id
 * @param skipLineNum skip line number
 * @param limit limit
 * @return log string data
 */
@Override
@SuppressWarnings("unchecked")
public Result<String> queryLog(int taskInstId, int skipLineNum, int limit) {
    TaskInstance taskInstance = processService.findTaskInstanceById(taskInstId);

    if (taskInstance == null) {
        return Result.error(Status.TASK_INSTANCE_NOT_FOUND);
    }
    if (StringUtils.isBlank(taskInstance.getHost())) {
        return Result.error(Status.TASK_INSTANCE_HOST_IS_NULL);
    }
    Result<String> result = new Result<>(Status.SUCCESS.getCode(), Status.SUCCESS.getMsg());
    String log = queryLog(taskInstance, skipLineNum, limit);
    result.setData(log);
    return result;
}
/**
 * query log
 *
 * @param taskInstance  task instance
 * @param skipLineNum skip line number
 * @param limit       limit
 * @return log string data
 */
private String queryLog(TaskInstance taskInstance, int skipLineNum, int limit) {
    Host host = Host.of(taskInstance.getHost());

    logger.info("log host : {} , logPath : {} , port : {}", host.getIp(), taskInstance.getLogPath(),
            host.getPort());

    StringBuilder log = new StringBuilder();
    if (skipLineNum == 0) {
        String head = String.format(LOG_HEAD_FORMAT,
                taskInstance.getLogPath(),
                host,
                Constants.SYSTEM_LINE_SEPARATOR);
        log.append(head);
    }

    log.append(logClient
            .rollViewLog(host.getIp(), host.getPort(), taskInstance.getLogPath(), skipLineNum, limit));

    return log.toString();
}
/**
 * roll view log
 *
 * @param host host
 * @param port port
 * @param path path
 * @param skipLineNum skip line number
 * @param limit limit
 * @return log content
 */
public String rollViewLog(String host, int port, String path, int skipLineNum, int limit) {
    logger.info("roll view log, host : {}, port : {}, path {}, skipLineNum {} ,limit {}", host, port, path, skipLineNum, limit);
    RollViewLogRequestCommand request = new RollViewLogRequestCommand(path, skipLineNum, limit);
    String result = "";
    final Host address = new Host(host, port);
    try {
        Command command = request.convert2Command();
        Command response = this.client.sendSync(address, command, LOG_REQUEST_TIMEOUT);
        if (response != null) {
            RollViewLogResponseCommand rollReviewLog = JSONUtils.parseObject(
                    response.getBody(), RollViewLogResponseCommand.class);
            return rollReviewLog.getMsg();
        }
    } catch (Exception e) {
        logger.error("roll view log error", e);
    } finally {
        this.client.closeChannel(address);
    }
    return result;
}
/**
 * sync send
 *
 * @param host host
 * @param command command
 * @param timeoutMillis timeoutMillis
 * @return command
 */
public Command sendSync(final Host host, final Command command, final long timeoutMillis) throws InterruptedException, RemotingException {
    final Channel channel = getChannel(host);
    if (channel == null) {
        throw new RemotingException(String.format("connect to : %s fail", host));
    }
    final long opaque = command.getOpaque();
    final ResponseFuture responseFuture = new ResponseFuture(opaque, timeoutMillis, null, null);
    channel.writeAndFlush(command).addListener(future -> {
        if (future.isSuccess()) {
            responseFuture.setSendOk(true);
            return;
        } else {
            responseFuture.setSendOk(false);
        }
        responseFuture.setCause(future.cause());
        responseFuture.putResponse(null);
        logger.error("send command {} to host {} failed", command, host);
    });
    /*
     * sync wait for result
     */
    Command result = responseFuture.waitResponse();
    if (result == null) {
        if (responseFuture.isSendOK()) {
            throw new RemotingTimeoutException(host.toString(), timeoutMillis, responseFuture.getCause());
        } else {
            throw new RemotingException(host.toString(), responseFuture.getCause());
        }
    }
    return result;
}
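sendSync() turns an asynchronous write into a blocking call. It pairs each request with a future keyed by its opaque id and then waits on that future with a timeout. The sketch below reproduces only that correlation idea, using JDK classes. `PendingRequests`, its nested `ResponseFuture` and the `transport` callback are hypothetical. A real client would presumably complete the future from the Netty I/O thread when a response carrying the same opaque arrives:

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.LongConsumer;

class PendingRequests {
    // Completed at most once by onResponse; stays empty on timeout.
    static final class ResponseFuture {
        private final CountDownLatch latch = new CountDownLatch(1);
        private volatile String response;

        void complete(String body) {
            response = body;
            latch.countDown();
        }

        // Returns the response, or null if none arrived within the timeout.
        String await(long timeoutMillis) throws InterruptedException {
            latch.await(timeoutMillis, TimeUnit.MILLISECONDS);
            return response;
        }
    }

    private final AtomicLong nextOpaque = new AtomicLong();
    private final Map<Long, ResponseFuture> pending = new ConcurrentHashMap<>();

    // Register a future under a fresh opaque id, let the transport "write" the request,
    // then block until the matching response arrives or the timeout expires.
    String sendSync(LongConsumer transport, long timeoutMillis) {
        long opaque = nextOpaque.incrementAndGet();
        ResponseFuture future = new ResponseFuture();
        pending.put(opaque, future);
        try {
            transport.accept(opaque);
            return future.await(timeoutMillis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return null;
        } finally {
            // Drop the entry on success or timeout so late responses are ignored.
            pending.remove(opaque);
        }
    }

    // Called when a response carrying `opaque` arrives (e.g. from an I/O thread).
    void onResponse(long opaque, String body) {
        ResponseFuture future = pending.get(opaque);
        if (future != null) {
            future.complete(body);
        }
    }

    int pendingCount() {
        return pending.size();
    }
}
```

A `CountDownLatch` per request lets the caller block without polling. The real `ResponseFuture` in sendSync() above also records whether the write itself succeeded (`isSendOK()`). That flag is how sendSync() tells a timeout apart from a failed send.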

The NettyRemotingClient is created when the log service object (LogClientService) is constructed:

/**
 * construct client
 */
public LogClientService() {
    this.clientConfig = new NettyClientConfig();
    this.clientConfig.setWorkerThreads(4);
    this.client = new NettyRemotingClient(clientConfig);
    this.isRunning = true;
}

2.7 Load-balancing algorithms

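When the Master chooses a Worker to execute a task, DolphinScheduler offers three load-balancing algorithms, and all of them use node weights: weighted random (random), smooth round robin (roundrobin) and linear load (lowerweight). The configuration file controls which strategy is used. The default is the lower-weight strategy: host-selector: lower_weight.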

@Bean
public HostManager hostManager() {
    HostSelector selector = masterConfig.getHostSelector();
    HostManager hostManager;
    switch (selector) {
        case RANDOM:
            hostManager = new RandomHostManager();
            break;
        case ROUND_ROBIN:
            hostManager = new RoundRobinHostManager();
            break;
        case LOWER_WEIGHT:
            hostManager = new LowerWeightHostManager();
            break;
        default:
            throw new IllegalArgumentException("unSupport selector " + selector);
    }
    beanFactory.autowireBean(hostManager);
    return hostManager;
}

2.7.1 Weighted random

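The code makes this easier to follow. First sum all the weights, then draw a random integer offset in [0, totalWeight). Next, subtract each host's weight from the offset in turn and return the first host at which the offset drops below zero. If the total weight is not positive, no host can be picked this way, so a host is returned uniformly at random.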

@Override
public HostWorker doSelect(final Collection<HostWorker> source) {
    List<HostWorker> hosts = new ArrayList<>(source);
    int size = hosts.size();
    int[] weights = new int[size];
    int totalWeight = 0;
    int index = 0;

    for (HostWorker host : hosts) {
        totalWeight += host.getHostWeight();
        weights[index] = host.getHostWeight();
        index++;
    }

    if (totalWeight > 0) {
        int offset = ThreadLocalRandom.current().nextInt(totalWeight);

        for (int i = 0; i < size; i++) {
            offset -= weights[i];
            if (offset < 0) {
                return hosts.get(i);
            }
        }
    }
    return hosts.get(ThreadLocalRandom.current().nextInt(size));
}
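Here is a concrete case. With weights {1, 3, 6} the total is 10. Offset 0 maps to the first host, offsets 1 to 3 map to the second, and offsets 4 to 9 map to the third, so the hosts receive roughly 10%, 30% and 60% of the picks. Below is a standalone sketch of the same logic. The class and method names are hypothetical, and the random offset is split out so it can be checked deterministically:

```java
import java.util.List;
import java.util.concurrent.ThreadLocalRandom;

class WeightedRandom {
    // Walk the weights, subtracting each from the offset; the first index where the
    // offset drops below zero owns that slice of [0, totalWeight).
    static int indexForOffset(int[] weights, int offset) {
        for (int i = 0; i < weights.length; i++) {
            offset -= weights[i];
            if (offset < 0) {
                return i;
            }
        }
        throw new IllegalArgumentException("offset must be smaller than the total weight");
    }

    static <T> T select(List<T> hosts, int[] weights) {
        int totalWeight = 0;
        for (int w : weights) {
            totalWeight += w;
        }
        if (totalWeight <= 0) {
            // No usable weights: fall back to a uniform pick, like the last line of doSelect.
            return hosts.get(ThreadLocalRandom.current().nextInt(hosts.size()));
        }
        int offset = ThreadLocalRandom.current().nextInt(totalWeight);
        return hosts.get(indexForOffset(weights, offset));
    }
}
```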

2.7.2 Linear load (lower weight)

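Weight calculation: the weight is computed from the CPU usage, memory usage and load average reported in the node's registration data. It is then adjusted by how long the node has been running, which acts as a warm-up period.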

private double calculateWeight(double cpu, double memory, double loadAverage, long startTime) {
    double calculatedWeight = cpu * CPU_FACTOR + memory * MEMORY_FACTOR + loadAverage * LOAD_AVERAGE_FACTOR;
    long uptime = System.currentTimeMillis() - startTime;
    if (uptime > 0 && uptime < Constants.WARM_UP_TIME) {
        // If the warm-up is not over, add the weight
        return calculatedWeight * Constants.WARM_UP_TIME / uptime;
    }
    return calculatedWeight;
}
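Here is a worked example of the formula. The factor values and warm-up length below are assumptions chosen for illustration, not necessarily DolphinScheduler's constants. The uptime is passed in rather than read from the clock, so the result is reproducible. With cpu = 0.5, memory = 0.5 and loadAverage = 1.0, the base weight is 0.5 × 10 + 0.5 × 20 + 1.0 × 70 = 85. Halfway through the warm-up it doubles to 170. A freshly started node therefore looks busier and is picked less often by the lowest-weight rule:

```java
class LoadWeight {
    // Hypothetical factors and warm-up length, for illustration only.
    static final double CPU_FACTOR = 10;
    static final double MEMORY_FACTOR = 20;
    static final double LOAD_AVERAGE_FACTOR = 70;
    static final long WARM_UP_TIME = 10 * 60 * 1000L; // assumed 10-minute warm-up

    // Same shape as calculateWeight above, but with uptime passed in for determinism.
    static double calculateWeight(double cpu, double memory, double loadAverage, long uptimeMillis) {
        double weight = cpu * CPU_FACTOR + memory * MEMORY_FACTOR + loadAverage * LOAD_AVERAGE_FACTOR;
        if (uptimeMillis > 0 && uptimeMillis < WARM_UP_TIME) {
            // Still warming up: inflate the weight in inverse proportion to uptime.
            return weight * WARM_UP_TIME / uptimeMillis;
        }
        return weight;
    }
}
```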

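Selection then works as follows. Each node's current weight is increased by its own weight, and the node with the lowest current weight is chosen. The sum of all nodes' weights is then added to the winner's current weight, which pushes it back so that it is unlikely to be picked again right away.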

/**
 * select
 *
 * @param sources sources
 * @return HostWeight
 */
@Override
public HostWeight doSelect(Collection<HostWeight> sources) {
    double totalWeight = 0;
    double lowWeight = 0;
    HostWeight lowerNode = null;
    for (HostWeight hostWeight : sources) {
        totalWeight += hostWeight.getWeight();
        hostWeight.setCurrentWeight(hostWeight.getCurrentWeight() + hostWeight.getWeight());
        if (lowerNode == null || lowWeight > hostWeight.getCurrentWeight()) {
            lowerNode = hostWeight;
            lowWeight = hostWeight.getCurrentWeight();
        }
    }
    lowerNode.setCurrentWeight(lowerNode.getCurrentWeight() + totalWeight);
    return lowerNode;
}
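The accumulation trick is easier to see in a small simulation. This sketch copies the selection loop above into a hypothetical `LowerWeightDemo` class. It uses two nodes: A with weight 1 (lightly loaded) and B with weight 2. Six selections come out as A, B, A, A, B, A, so the lighter node receives about two thirds of the picks rather than all of them:

```java
import java.util.List;

class LowerWeightDemo {
    static final class Node {
        final String name;
        final double weight;   // load-derived weight: lower means less loaded
        double currentWeight;  // accumulated value used for selection

        Node(String name, double weight) {
            this.name = name;
            this.weight = weight;
        }
    }

    // Mirrors doSelect above: add each node's weight, pick the lowest accumulated
    // value (the first one wins ties), then push the winner up by the total weight.
    static Node select(List<Node> nodes) {
        double total = 0;
        Node lowest = null;
        for (Node n : nodes) {
            total += n.weight;
            n.currentWeight += n.weight;
            if (lowest == null || n.currentWeight < lowest.currentWeight) {
                lowest = n;
            }
        }
        lowest.currentWeight += total;
        return lowest;
    }
}
```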

2.7.3 Smooth round robin

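This algorithm is harder to follow, so my reading may not be entirely correct. As far as I can tell from the code, it is a smooth weighted round robin. On each selection, every host's current value is increased by its weight (`increaseCurrent()`), and the host with the largest current value is picked. `Long.MIN_VALUE` is only the starting value for that maximum search. The picked host's current value is then pulled back down by the total weight via `sel(totalWeight)`, whose body is not shown here. Over a full cycle, each host is chosen in proportion to its weight, and the picks are interleaved rather than bunched together. The final `return hosts.get(0)` is only a fallback for when nothing was selected. In addition, when the number of hosts differs from the number of cached entries, entries that have not been updated within RECYCLE_PERIOD are removed.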

@Override
public HostWorker doSelect(Collection<HostWorker> source) {

    List<HostWorker> hosts = new ArrayList<>(source);
    String key = hosts.get(0).getWorkerGroup();
    ConcurrentMap<String, WeightedRoundRobin> map = workGroupWeightMap.get(key);
    if (map == null) {
        workGroupWeightMap.putIfAbsent(key, new ConcurrentHashMap<>());
        map = workGroupWeightMap.get(key);
    }

    int totalWeight = 0;
    long maxCurrent = Long.MIN_VALUE;
    long now = System.currentTimeMillis();
    HostWorker selectedHost = null;
    WeightedRoundRobin selectWeightRoundRobin = null;

    for (HostWorker host : hosts) {
        String workGroupHost = host.getWorkerGroup() + host.getAddress();
        WeightedRoundRobin weightedRoundRobin = map.get(workGroupHost);
        int weight = host.getHostWeight();
        if (weight < 0) {
            weight = 0;
        }

        if (weightedRoundRobin == null) {
            weightedRoundRobin = new WeightedRoundRobin();
            // set weight
            weightedRoundRobin.setWeight(weight);
            map.putIfAbsent(workGroupHost, weightedRoundRobin);
            weightedRoundRobin = map.get(workGroupHost);
        }
        if (weight != weightedRoundRobin.getWeight()) {
            weightedRoundRobin.setWeight(weight);
        }

        long cur = weightedRoundRobin.increaseCurrent();
        weightedRoundRobin.setLastUpdate(now);
        if (cur > maxCurrent) {
            maxCurrent = cur;
            selectedHost = host;
            selectWeightRoundRobin = weightedRoundRobin;
        }

        totalWeight += weight;
    }

    if (!updateLock.get() && hosts.size() != map.size() && updateLock.compareAndSet(false, true)) {
        try {
            ConcurrentMap<String, WeightedRoundRobin> newMap = new ConcurrentHashMap<>(map);
            newMap.entrySet().removeIf(item -> now - item.getValue().getLastUpdate() > RECYCLE_PERIOD);
            workGroupWeightMap.put(key, newMap);
        } finally {
            updateLock.set(false);
        }
    }

    if (selectedHost != null) {
        selectWeightRoundRobin.sel(totalWeight);
        return selectedHost;
    }

    return hosts.get(0);
}
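Smooth weighted round robin is easiest to check on the classic weights A = 5, B = 1, C = 1. Seven selections yield A, A, B, A, C, A, A. After that, every current value is back to 0 and the cycle repeats. Below is a minimal single-threaded sketch of the same three steps: add weight, pick the maximum, subtract the total. The class is hypothetical and omits the real selector's per-worker-group map, the lastUpdate timestamps and the recycling logic:

```java
import java.util.LinkedHashMap;
import java.util.Map;

class SmoothRoundRobin {
    // Insertion order matters: on ties the earlier host wins, like `cur > maxCurrent` in doSelect.
    private final Map<String, Integer> weights = new LinkedHashMap<>();
    private final Map<String, Long> current = new LinkedHashMap<>();

    void add(String host, int weight) {
        weights.put(host, Math.max(weight, 0)); // negative weights treated as 0, as in doSelect
        current.put(host, 0L);
    }

    String select() {
        if (weights.isEmpty()) {
            throw new IllegalStateException("no hosts");
        }
        long total = 0;
        long maxCurrent = Long.MIN_VALUE;
        String selected = null;
        for (Map.Entry<String, Integer> e : weights.entrySet()) {
            // Step 1: every host gains its own weight (cf. increaseCurrent()).
            long cur = current.get(e.getKey()) + e.getValue();
            current.put(e.getKey(), cur);
            // Step 2: remember the host with the largest current value.
            if (cur > maxCurrent) {
                maxCurrent = cur;
                selected = e.getKey();
            }
            total += e.getValue();
        }
        // Step 3: the winner gives back the total weight (cf. sel(totalWeight)).
        current.put(selected, current.get(selected) - total);
        return selected;
    }
}
```

Subtracting the total from the winner is what spreads the heavy host's picks out. Without that step, A would receive five requests in a row.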

2.8 Log service

This was already covered in 2.6.2, so it is not repeated here.

2.9 Alerting

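I have not studied this part yet. At a glance, it essentially filters data according to rules and then calls the alert service interface of the configured type, such as email, WeChat or SMS notification.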

3 Afterword

3.1 Make friends

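Since I have not used DolphinScheduler in production, my understanding of the business side may be incomplete and may contain mistakes. Everyone is welcome to join the community and discuss.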

Apache DolphinScheduler Slack group link: https://join.slack.com/t/asf-..._invite/zt-1e36toy4n-5n9U2R__FDM05R~MJFFVBg

3.2 References

  1. https://dolphinscheduler.apac...
  2. https://juejin.cn/post/684490...
  3. https://www.w3cschool.cn/quartz_doc/quartz_doc-1xbu2clr.html

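Finally, thanks to community members Cai Shunfeng (蔡顺峰), Zhong Jiajie (钟嘉杰) and Ruan Wenjun (阮文俊) for their constructive suggestions on organizing and revising this article, and for their help in publishing it.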

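Everyone is very welcome to join the DolphinScheduler family and embrace the open-source world!

We encourage every form of participation in the community, with the goal of eventually becoming a Committer or PPMC member, for example: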

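  • Report problems you run into as GitHub issues.
  • Answer issues raised by others.
  • Help improve the documentation.
  • Help add test cases to the project.
  • Add comments to the code.
  • Submit PRs that fix bugs or add features.
  • Publish application case studies, scheduling workflow analyses or other scheduling-related technical articles.
  • Help promote DolphinScheduler, for example by giving talks at technical conferences or meetups.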

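Welcome to join the contributors; your open-source journey starts with your first PR.

  • For example, add code comments, or pick an issue labeled "easy to fix" or another very simple issue (typos and the like), and use that first simple PR to get familiar with the submission process.

Note: contributions are not limited to PRs; anything that helps the project grow counts as a contribution.

We believe that taking part in DolphinScheduler will surely benefit you!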

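Contribute

With the rapid rise of open source in China, the Apache DolphinScheduler community is thriving. To build a better, easier-to-use scheduler, we sincerely welcome everyone who loves open source to join the community. Help drive the rise of open source in China and help homegrown open source go global.

There are many ways to contribute to the DolphinScheduler community, including:

Contributing your first PR (docs or code). We hope it is a simple one: the first PR is for getting familiar with the submission process and community collaboration, and for experiencing how friendly the community is.

The community has compiled the following list of issues suitable for newcomers: https://github.com/apache/dol...

List of issues for non-newcomers: https://github.com/apache/dol...

How to contribute: https://dolphinscheduler.apac...

Come on, the DolphinScheduler community needs you. Add your brick to the rise of open source in China; even a small tile, pooled with others, becomes a great force.

Taking part in open source lets you learn from experts up close and quickly improve your skills. If you would like to contribute, we have a contributor seed incubation group. Add the community assistant on WeChat and they will walk you through it step by step. Contributors of every level are welcome and every question gets answered; what matters is the willingness to contribute.

When adding the assistant on WeChat, please mention that you want to contribute. Come on, the open-source community is looking forward to your participation.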