关于dolphinscheduler:dolphinscheduler-136源码解析

56次阅读

共计 9524 个字符,预计需要花费 24 分钟才能阅读完成。

基于 1.3.6, 仅集体了解, 欢送斧正.

架构

master

启动类为 org.apache.dolphinscheduler.server.master.MasterServer, 通过 spring 注解@PostConstruct 启动 run 办法.
master 节点在启动时, 次要做了以下 4 个事:

  1. 通过 netty 监听端口, 与 worker 节点通信
  2. 在注册核心 (zk) 上注册本人
  3. 启动任务调度线程
  4. 启动 quartz

其中 quartz 是一个定时工作的组件, 能够通过数据库做集群.

worker

worker 节点在启动时, 次要做了以下:

  1. 监听端口, 和 master 通信
  2. 通过注册核心注册
  3. 启动工作执行线程
  4. 启动工作 ack 和后果上报重试线程RetryReportTaskStatusThread

master 和 worker 节点都会监听端口, 是因为单方都会作为客户端被动发送音讯给对方.worker 节点可查看 org.apache.dolphinscheduler.server.worker.processor.TaskCallbackService#getRemoteChannel(int) 代码, 在原有连贯不可用时会通过注册核心, 找到原有的 master 的 ip 和监听端口, 而后发动新的连贯申请.master 节点可查看 NettyExecutorManager, 作为客户端向 worker 发动申请.

logger

Logger 节点目前仅仅是通过 netty 监听了一个端口, 承受对日志文件的读取申请, 次要逻辑都在 org.apache.dolphinscheduler.server.log.LoggerRequestProcessor 中. 目前日志文件都是写到 worker 节点的本地文件, 因而 logger 节点必须和 worker 节点一对一部署在一起. 日志写到本地文件, 对容器部署不是很敌对, 如果要长久化, 势必须要通过长久化存储. 个人感觉没什么必要特地的抽出这么一个节点, 性能特地简略, 又必须和 worker 节点一一部署在一起, 齐全能够合并入 worker 节点中, 或者是为了后续扩大吧.

api

api 节点是个 web 利用, 提供 controller 接口为前端提供服务, 次要就是 crud.

alter

定时拉取数据库, 发送告警信息, 该节点利用只有部署一个就能够了.

服务注册与发现

master 和 worker 节点在启动时都会作为服务端通过 netty 监听端口, 只有客户端晓得服务端 ip 和该端口, 即可通过向其发动连贯进行通信.

master 注册

org.apache.dolphinscheduler.server.master.registry.MasterRegistry#registry

public void registry() {
     // 1. 获取本机地址
     String address = NetUtils.getAddr(masterConfig.getListenPort());
     // 2.master 注册门路
     String localNodePath = getMasterPath();
     // 3. 创立长期节点
     zookeeperRegistryCenter.getRegisterOperator().persistEphemeral(localNodePath, "");
     // 4. 注册连贯状态监听器
     zookeeperRegistryCenter.getRegisterOperator().getZkClient().getConnectionStateListenable().addListener((client, newState) -> {if (newState == ConnectionState.LOST) {logger.error("master : {} connection lost from zookeeper", address);
             } else if (newState == ConnectionState.RECONNECTED) {logger.info("master : {} reconnected to zookeeper", address);
                 zookeeperRegistryCenter.getRegisterOperator().persistEphemeral(localNodePath, "");
             } else if (newState == ConnectionState.SUSPENDED) {logger.warn("master : {} connection SUSPENDED", address);
                 zookeeperRegistryCenter.getRegisterOperator().persistEphemeral(localNodePath, "");
             }
         });
     // 5. 定时上报 zk 状态
     int masterHeartbeatInterval = masterConfig.getMasterHeartbeatInterval();
     HeartBeatTask heartBeatTask = new HeartBeatTask(startTime,
                                                     masterConfig.getMasterMaxCpuloadAvg(),
                                                     masterConfig.getMasterReservedMemory(),
                                                     Sets.newHashSet(getMasterPath()),
                                                     Constants.MASTER_TYPE,
                                                     zookeeperRegistryCenter);
 
     this.heartBeatExecutor.scheduleAtFixedRate(heartBeatTask, masterHeartbeatInterval, masterHeartbeatInterval, TimeUnit.SECONDS);
     logger.info("master node : {} registry to ZK successfully with heartBeatInterval : {}s", address, masterHeartbeatInterval);
 }

在状态上报实现后,zk 节点状态如下图,data 为本机状态等信息, 用逗号隔开.

worker 注册

org.apache.dolphinscheduler.server.worker.registry.WorkerRegistry#registry

 public void registry() {
     // 1. 获取本机地址
     String address = NetUtils.getAddr(workerConfig.getListenPort());
     // 2. 获取 zk 门路, 须要依据配置文件的 worker group 注册
     Set<String> workerZkPaths = getWorkerZkPaths();
     int workerHeartbeatInterval = workerConfig.getWorkerHeartbeatInterval();
     
     for (String workerZKPath : workerZkPaths) {
         // 3. 创立长期节点
         zookeeperRegistryCenter.getRegisterOperator().persistEphemeral(workerZKPath, "");
         // 4. 注册连贯状态监听器
         zookeeperRegistryCenter.getRegisterOperator().getZkClient().getConnectionStateListenable().addListener((client,newState) -> {if (newState == ConnectionState.LOST) {logger.error("worker : {} connection lost from zookeeper", address);
                 } else if (newState == ConnectionState.RECONNECTED) {logger.info("worker : {} reconnected to zookeeper", address);
                     zookeeperRegistryCenter.getRegisterOperator().persistEphemeral(workerZKPath, "");
                 } else if (newState == ConnectionState.SUSPENDED) {logger.warn("worker : {} connection SUSPENDED", address);
                     zookeeperRegistryCenter.getRegisterOperator().persistEphemeral(workerZKPath, "");
                 }
             });
         logger.info("worker node : {} registry to ZK {} successfully", address, workerZKPath);
     }
     // 5. 定时上报 zk 状态
     HeartBeatTask heartBeatTask = new HeartBeatTask(this.startTime,
                                                     this.workerConfig.getWorkerMaxCpuloadAvg(),
                                                     this.workerConfig.getWorkerReservedMemory(),
                                                     workerZkPaths,
                                                     Constants.WORKER_TYPE,
                                                     this.zookeeperRegistryCenter);
 
     this.heartBeatExecutor.scheduleAtFixedRate(heartBeatTask, workerHeartbeatInterval, workerHeartbeatInterval, TimeUnit.SECONDS);
     logger.info("worker node : {} heartbeat interval {} s", address, workerHeartbeatInterval);
 }

在状态上报实现后,zk 节点状态如下图:

可见在 default 组下注册胜利.

服务发现

服务注册实现之后, 即可通过 zk 注册核心做到服务发现. 通过查看 ServerNodeManager 代码能够得悉,ds 的服务发现是 master 负责的. 其通过继承 spring 的 InitializingBean 在启动时执行以下代码

 @Override
 public void afterPropertiesSet() throws Exception {
     /**
      * 从 zk 中拿到 master 和 worker 节点
      */
     load();
     /**
      * 定时同步表 t_ds_worker_group 的 work group
      */
     executorService = Executors.newSingleThreadScheduledExecutor(new NamedThreadFactory("ServerNodeManagerExecutor"));
     executorService.scheduleWithFixedDelay(new WorkerNodeInfoAndGroupDbSyncTask(), 0, 10, TimeUnit.SECONDS);
     /**
      * 注册 master 节点变更监听器
      */
     registryCenter.getRegisterOperator().addListener(new MasterNodeListener());
     /**
      * 注册 worker 节点变更监听器
      */
     registryCenter.getRegisterOperator().addListener(new WorkerGroupNodeListener());
 }

查看 WorkerNodeInfoAndGroupDbSyncTask 代码,t_ds_worker_group 表的工作组, 应该不能够和配置文件的工作组重名. 在有节点新增和删除时,zk 回调监听器, 而后同步批改本地缓存, 为 ExecutorDispatcher 散发工作时应用.

调度流程

MasterSchedulerService

该线程在 master 节点启动时启动, 循环调用 scheduleProcess 办法.

scheduleProcess

  1. 通过 zk 获取分布式锁
  2. 从 t_ds_command 表上拉取一条命令
  3. 解决该命令失去工作实例

    1. 结构工作实例
    2. 若工作实例为空, 插入谬误命令表 t_ds_error_command, 并删除该命令
    3. 若线程池数量有余, 设置为期待线程状态
    4. 保留工作实例并删除该命令
  4. 线程池执行工作实例

command 有很多种类型, 定时触发的起源能够查看 ProcessScheduleJob 类, 其继承 Job, 每次定时工作触发就会回调 execute 办法向 command 插入数据.

MasterExecThread

每个工作实例都会有一个该线程进行执行监控, 一般工作调用 executeProcess 办法执行

prepareProcess

  1. 构建 dag
  2. 初始化各个状态的工作节点队列

    1. readyToSubmitTaskQueue 筹备好去提交的工作节点队列
    2. activeTaskNode 运行中的工作节点
    3. dependFailedTask 依赖节点失败

    runProcess

  3. 提交没有依赖的工作节点到 readyToSubmitTaskQueue 队列
  4. 循环判断是否有实现的工作节点, 并提交后继的工作节点
  5. 从 readyToSubmitTaskQueue 提交工作节点执行

    endProcess

    保留工作实例, 如果是期待线程状态就创立一个复原期待线程命令, 最初发送告警

    MasterTaskExecThread

    在 MasterExecThread 循环中, 会调用 submitStandByTask 办法将 readyToSubmitTaskQueue 队列的工作节点提交到 activeTaskNode 中, 提交工作节点的办法为 submitTaskExec. 其中一般工作创立了一个MasterTaskExecThread 线程. 每个工作节点都会有一个该线程来负责执行和监控状态.

    submit

    首先提交到数据库, 再通过 dispatchTask 办法散发工作到 worker. 循环提交尽量确保两者都实现

    dispatchTask

    构建 TaskPriority 对象并放入 TaskPriorityQueueImpl 队列, 其中外部是一个线程平安的阻塞优先级队列 PriorityBlockingQueue

    waitTaskQuit

    循环判断工作节点是否勾销, 暂停, 实现, 超时. 其中勾销会向 worker 节点发送 TASK_KILL_REQUEST 命令

    TaskPriorityQueueConsumer

    在 dispatchTask 办法中, 工作被放入了一个工作优先级队列中, 之后由 TaskPriorityQueueConsumer 线程来生产, 该线程也是一直循环, 从队列中获取工作, 而后进行散发

    dispatch

    构建执行上下文, 而后由 ExecutorDispatcher 散发

    ExecutorDispatcherdispatch

    首先获取 ExecutorManager, 目前只有一个 NettyExecutorManager 实现类. 通过 HostManager 筛选出一个可执行的 worker, 默认会通过 ServerNodeManager 筛选出 worker group 下的 worker 节点

    NettyExecutorManager

    作为一个 netty 客户端向 worker 节点发送音讯, 其中对每个节点会重试 3 次, 若失败则会向其余同组节点重试

    工作执行的通信流程

    下面貌似梳理了很多, 但实际上才实现了工作开始执行时,master 节点向 worker 节点发送的第一条音讯 TASK_EXECUTE_REQUEST 的过程, 而且过程中也省略了很多看不懂的代码. 下图是工作节点执行过程中整个的 master 和 worker 的通信交互流程.

ds 的 netty 封装

因为工作发送给 worker 当前, 代码就执行到其余的节点上了, 为了串联代码的调用流程, 这里记录一下 dolphinscheduler 对 netty 的封装.

NettyRemotingServer

在构造方法中, 依据入参配置, 初始化了 boss 和 work 两个 EventLoopGroup, 而后在 start 办法中, 设置 tcp 参数和 ChannelPipeline, 最初阻塞监听端口. 其中业务逻辑都放在 ChannelPipeline 中.

initNettyChannel1

private void initNettyChannel(SocketChannel ch) {ChannelPipeline pipeline = ch.pipeline();
    pipeline.addLast("encoder", encoder);
    pipeline.addLast("decoder", new NettyDecoder());
    pipeline.addLast("handler", serverHandler);
}

首先增加的是编解码器, 因为 tcp 传输的对象都是字节流, 须要编解码器来负责字节流和 java 对象的互相转换. 其中能够看到, 编码器是共享的, 解码器是每个管道都从新 new 的, 这是因为在解码时因为 tcp 的粘包和拆包, 须要缓存没有被解码的字节.

NettyServerHandler

首先先看一下 userEventTriggered 办法, 这里解决了 IdleStateEvent 事件, 然而方才在 initNettyChannel 办法中是没有看到增加了 IdleStateHandler 的. 在 dev 的最新代码上曾经修复了.

processReceived

在承受并解码实现时, 会回调该办法. 音讯体解码成 Command 对象之后, 依据 type 找到通过 registerProcessor 办法注册的处理器 NettyRequestProcessor 对象和解决的线程池. 耗时申请是不容许在 io 线程上执行的, 会阻塞导致无奈解决新的音讯, 所以这里应用线程池异步解决.

NettyRemotingClient

同样在构造方法中依据入参配置初始化 boss 和 work 两个 EventLoopGroup, 在 start 中设置 tcp 配置和初始化 ChannelPipeline. 此外多了一个 responseFutureExecutor 线程池解决超时的 ResponseFuture.

sendAsync

异步发送音讯, 应用 asyncSemaphore 信号量管制了并发申请数量,opaque 作为音讯 id 对应申请和响应. 在 netty 发送音讯的回调中判断发送是否胜利.

sendSync

同步发送音讯, 返回参为近程服务返回的响应.

ResponseFuture

因为 worker 和 master 之间的通信不是同步申请, 因而 ds 设计了这个类用来异步转同步, 和申请超时管制. 其在构造方法中会放入全局的 FUTURE_TABLE 中被 responseFutureExecutor 线程监控是否超时.

waitResponse

阻塞期待响应, 或者超时.latch 在 putResponse 中被开释.

putResponse

放入响应, 并去除全局的缓存.

NettyClientHandler

NettyRequestProcessor

这是具体业务逻辑的接口, 负责解决不同的 Command 类型.

工作执行

在 worker 节点启动的时候能够看到, 接管到 TASK_EXECUTE_REQUEST 音讯时, 注册的处理器是 TaskExecuteProcessor

TaskExecuteProcessor

  1. 创立日志和执行文件夹
  2. 发送 TASK_EXECUTE_ACK 给 master
  3. 交给线程池 workerExecService 执行

    TaskExecuteThread

    工作节点的执行线程

  4. 设置超时
  5. 通过 TaskManager 依据类型取得工作对象
  6. 顺次调用工作对象的 init,handle,after
  7. 发送 TASK_EXECUTE_RESPONSE 把执行后果告知 master

    TaskResponseService

    master 在承受到 TASK_EXECUTE_ACK 和 TASK_EXECUTE_RESPONSE 之后, 都会把音讯放到该类的 eventQueue 中, 外部启动了一个线程负责长久化到数据库. 而后发送 DB_TASK_ACK 和 DB_TASK_RESPONSE 音讯到 worker, 让 worker 去除重试的缓存.
    因为数据库中状态曾经批改, 那么 MasterTaskExecThread 的 waitTaskQuit 会退出循环. 而后 MasterExecThread 的 runProcess 在遍历 activeTaskNode 时,future.isDone()的判断会通过, 最终通过 submitPostNode 办法提交上面的工作节点.

容错

ZKMasterClient

dataChanged

通过 zk 的监听回调解决节点挂掉的容错. 在该办法内依据类型别离解决 master 和 worker 节点挂掉的容错.

removeZKNodePath

通过获取 zk 锁避免多个节点容错.

failoverMaster

master 容错. 通过查表找到挂掉的 master 节点在解决的 t_ds_process_instance. 对每个 ProcessInstance 去除 host, 并创立一个容错复原的 command. 最终调度线程会解决该命令.

failoverWorker

worker 容错. 找到所有挂掉的 worker 节点的 host 正在解决的 TaskInstance, 而后遍历批改状态为须要容错. 对照 MasterTaskExecThread 线程, 发现在 typeIsFinished 判断之后跳出循环, 最终 MasterExecThread 线程也会感知到并从新提交给其余 worker.

网络抖动

因为”网络抖动”可能会使得节点短时间内失去和 ZooKeeper 的心跳,从而产生节点的 remove 事件。对于这种状况,咱们应用最简略的形式,那就是节点一旦和 ZooKeeper 产生超时连贯,则间接将 Master 或 Worker 服务停掉。

这段官网阐明, 对应代码中很多循环都会判断 Stopper 是否在运行. 在 HeartBeatTask 中判断节点是否为挂掉的节点, 若是调用 stop.

改良

工作运行时须要很多线程配合, 每个工作实例须要一个 MasterExecThread 线程, 外部 taskExecService 线程池为 20 个线程(默认, 当然不肯定会启动这么多). 单机可调度的数量会受到线程数量的限度.amee 之前也是对每个工作都须要有一个线程来监控, 前面批改为应用工夫轮全局同一监控. 在 MasterSchedulerService 中 masterExecService 线程池数量默认为 100, 阐明默认状况下单机 master 只能同时调度 100 个工作实例.

很多状态是通过数据库来同步的, 因而须要一直的在各处查询数据库, 而全局保护的缓存此时曾经失去了意义. 不仅对数据库产生了压力, 代码个人感觉也比拟乱. 有一个全局对立的模型, 各处批改都对该模型进行批改或者更好.api 节点对数据库的批改 (例如暂停) 须要告诉到 master.

劣势

可视化, 这是 ds 相较而言最大的长处.dag 的创立, 各种工作参数的设置, 数据源, 文件资源, 运行时的状态监控等等都能够在页面上直观的看到. 不过如果是二次开发成咱们本人的产品, 页面设置和展现必定是本人做的.

反对程度扩大, 容错,HA, 非常适合于容器化部署(外部应用 ip 来标识, 这里在 k8s 环境会有问题)

深度反对大数据环境的各种工作, 对大数据场景来说能够做到真正意义上的开箱即用.

正文完
 0