基于1.3.6, 仅集体了解,欢送斧正.

架构

master

启动类为org.apache.dolphinscheduler.server.master.MasterServer,通过spring注解@PostConstruct启动run办法.
master节点在启动时,次要做了以下4个事:

  1. 通过netty监听端口,与worker节点通信
  2. 在注册核心(zk)上注册本人
  3. 启动任务调度线程
  4. 启动quartz

其中quartz是一个定时工作的组件,能够通过数据库做集群.

worker

worker节点在启动时,次要做了以下:

  1. 监听端口,和master通信
  2. 通过注册核心注册
  3. 启动工作执行线程
  4. 启动工作ack和后果上报重试线程RetryReportTaskStatusThread

master和worker节点都会监听端口,是因为单方都会作为客户端被动发送音讯给对方.worker节点可查看org.apache.dolphinscheduler.server.worker.processor.TaskCallbackService#getRemoteChannel(int)代码,在原有连贯不可用时会通过注册核心,找到原有的master的ip和监听端口,而后发动新的连贯申请.master节点可查看NettyExecutorManager,作为客户端向worker发动申请.

logger

Logger节点目前仅仅是通过netty监听了一个端口,承受对日志文件的读取申请,次要逻辑都在org.apache.dolphinscheduler.server.log.LoggerRequestProcessor中.目前日志文件都是写到worker节点的本地文件,因而logger节点必须和worker节点一对一部署在一起.日志写到本地文件,对容器部署不是很敌对,如果要长久化,势必须要通过长久化存储.个人感觉没什么必要特地的抽出这么一个节点,性能特地简略,又必须和worker节点一一部署在一起,齐全能够合并入worker节点中,或者是为了后续扩大吧.

api

api节点是个web利用,提供controller接口为前端提供服务,次要就是crud.

alter

定时拉取数据库,发送告警信息,该节点利用只有部署一个就能够了.

服务注册与发现

master和worker节点在启动时都会作为服务端通过netty监听端口,只有客户端晓得服务端ip和该端口,即可通过向其发动连贯进行通信.

master注册

org.apache.dolphinscheduler.server.master.registry.MasterRegistry#registry

public void registry() {     // 1.获取本机地址     String address = NetUtils.getAddr(masterConfig.getListenPort());     // 2.master注册门路     String localNodePath = getMasterPath();     // 3.创立长期节点     zookeeperRegistryCenter.getRegisterOperator().persistEphemeral(localNodePath, "");     // 4.注册连贯状态监听器     zookeeperRegistryCenter.getRegisterOperator().getZkClient().getConnectionStateListenable().addListener(         (client, newState) -> {             if (newState == ConnectionState.LOST) {                 logger.error("master : {} connection lost from zookeeper", address);             } else if (newState == ConnectionState.RECONNECTED) {                 logger.info("master : {} reconnected to zookeeper", address);                 zookeeperRegistryCenter.getRegisterOperator().persistEphemeral(localNodePath, "");             } else if (newState == ConnectionState.SUSPENDED) {                 logger.warn("master : {} connection SUSPENDED ", address);                 zookeeperRegistryCenter.getRegisterOperator().persistEphemeral(localNodePath, "");             }         });     // 5.定时上报zk状态     int masterHeartbeatInterval = masterConfig.getMasterHeartbeatInterval();     HeartBeatTask heartBeatTask = new HeartBeatTask(startTime,                                                     masterConfig.getMasterMaxCpuloadAvg(),                                                     masterConfig.getMasterReservedMemory(),                                                     Sets.newHashSet(getMasterPath()),                                                     Constants.MASTER_TYPE,                                                     zookeeperRegistryCenter);      this.heartBeatExecutor.scheduleAtFixedRate(heartBeatTask, masterHeartbeatInterval, masterHeartbeatInterval, TimeUnit.SECONDS);     logger.info("master node : {} registry to ZK successfully with heartBeatInterval : {}s", address, masterHeartbeatInterval); }

在状态上报实现后,zk节点状态如下图,data为本机状态等信息,用逗号隔开.

worker注册

org.apache.dolphinscheduler.server.worker.registry.WorkerRegistry#registry

 public void registry() {     // 1.获取本机地址     String address = NetUtils.getAddr(workerConfig.getListenPort());     // 2. 获取zk门路,须要依据配置文件的worker group注册     Set<String> workerZkPaths = getWorkerZkPaths();     int workerHeartbeatInterval = workerConfig.getWorkerHeartbeatInterval();          for (String workerZKPath : workerZkPaths) {         // 3.创立长期节点         zookeeperRegistryCenter.getRegisterOperator().persistEphemeral(workerZKPath, "");         // 4.注册连贯状态监听器         zookeeperRegistryCenter.getRegisterOperator().getZkClient().getConnectionStateListenable().addListener(             (client,newState) -> {                 if (newState == ConnectionState.LOST) {                     logger.error("worker : {} connection lost from zookeeper", address);                 } else if (newState == ConnectionState.RECONNECTED) {                     logger.info("worker : {} reconnected to zookeeper", address);                     zookeeperRegistryCenter.getRegisterOperator().persistEphemeral(workerZKPath, "");                 } else if (newState == ConnectionState.SUSPENDED) {                     logger.warn("worker : {} connection SUSPENDED ", address);                     zookeeperRegistryCenter.getRegisterOperator().persistEphemeral(workerZKPath, "");                 }             });         logger.info("worker node : {} registry to ZK {} successfully", address, workerZKPath);     }     // 5.定时上报zk状态     HeartBeatTask heartBeatTask = new HeartBeatTask(this.startTime,                                                     this.workerConfig.getWorkerMaxCpuloadAvg(),                                                     this.workerConfig.getWorkerReservedMemory(),                                                     workerZkPaths,                                                     Constants.WORKER_TYPE,                                                     this.zookeeperRegistryCenter);      this.heartBeatExecutor.scheduleAtFixedRate(heartBeatTask, workerHeartbeatInterval, workerHeartbeatInterval, TimeUnit.SECONDS);     logger.info("worker node : {} heartbeat interval {} s", address, workerHeartbeatInterval); }

在状态上报实现后,zk节点状态如下图:

可见在default组下注册胜利.

服务发现

服务注册实现之后,即可通过zk注册核心做到服务发现.通过查看ServerNodeManager代码能够得悉,ds的服务发现是master负责的.其通过继承spring的InitializingBean在启动时执行以下代码

 @Override public void afterPropertiesSet() throws Exception {     /**      * 从zk中拿到master和worker节点      */     load();     /**      * 定时同步表t_ds_worker_group的work group      */     executorService = Executors.newSingleThreadScheduledExecutor(new NamedThreadFactory("ServerNodeManagerExecutor"));     executorService.scheduleWithFixedDelay(new WorkerNodeInfoAndGroupDbSyncTask(), 0, 10, TimeUnit.SECONDS);     /**      * 注册master节点变更监听器      */     registryCenter.getRegisterOperator().addListener(new MasterNodeListener());     /**      * 注册worker节点变更监听器      */     registryCenter.getRegisterOperator().addListener(new WorkerGroupNodeListener()); }

查看WorkerNodeInfoAndGroupDbSyncTask代码,t_ds_worker_group表的工作组,应该不能够和配置文件的工作组重名.在有节点新增和删除时,zk回调监听器,而后同步批改本地缓存,为ExecutorDispatcher散发工作时应用.

调度流程

MasterSchedulerService

该线程在master节点启动时启动,循环调用scheduleProcess办法.

scheduleProcess

  1. 通过zk获取分布式锁
  2. 从t_ds_command表上拉取一条命令
  3. 解决该命令失去工作实例

    1. 结构工作实例
    2. 若工作实例为空,插入谬误命令表t_ds_error_command,并删除该命令
    3. 若线程池数量有余,设置为期待线程状态
    4. 保留工作实例并删除该命令
  4. 线程池执行工作实例

command有很多种类型,定时触发的起源能够查看ProcessScheduleJob类,其继承Job,每次定时工作触发就会回调execute办法向command插入数据.

MasterExecThread

每个工作实例都会有一个该线程进行执行监控,一般工作调用executeProcess办法执行

prepareProcess

  1. 构建dag
  2. 初始化各个状态的工作节点队列

    1. readyToSubmitTaskQueue 筹备好去提交的工作节点队列
    2. activeTaskNode 运行中的工作节点
    3. dependFailedTask 依赖节点失败

    runProcess

  3. 提交没有依赖的工作节点到readyToSubmitTaskQueue队列
  4. 循环判断是否有实现的工作节点,并提交后继的工作节点
  5. 从readyToSubmitTaskQueue提交工作节点执行

    endProcess

    保留工作实例,如果是期待线程状态就创立一个复原期待线程命令,最初发送告警

    MasterTaskExecThread

    在MasterExecThread循环中,会调用submitStandByTask办法将readyToSubmitTaskQueue队列的工作节点提交到activeTaskNode中,提交工作节点的办法为submitTaskExec.其中一般工作创立了一个MasterTaskExecThread线程.每个工作节点都会有一个该线程来负责执行和监控状态.

    submit

    首先提交到数据库,再通过dispatchTask办法散发工作到worker.循环提交尽量确保两者都实现

    dispatchTask

    构建TaskPriority对象并放入TaskPriorityQueueImpl队列,其中外部是一个线程平安的阻塞优先级队列PriorityBlockingQueue

    waitTaskQuit

    循环判断工作节点是否勾销,暂停,实现,超时.其中勾销会向worker节点发送TASK_KILL_REQUEST命令

    TaskPriorityQueueConsumer

    在dispatchTask办法中,工作被放入了一个工作优先级队列中,之后由TaskPriorityQueueConsumer线程来生产,该线程也是一直循环,从队列中获取工作,而后进行散发

    dispatch

    构建执行上下文,而后由ExecutorDispatcher散发

    ExecutorDispatcherdispatch

    首先获取ExecutorManager,目前只有一个NettyExecutorManager实现类.通过HostManager筛选出一个可执行的worker,默认会通过ServerNodeManager筛选出worker group下的worker节点

    NettyExecutorManager

    作为一个netty客户端向worker节点发送音讯,其中对每个节点会重试3次,若失败则会向其余同组节点重试

    工作执行的通信流程

    下面貌似梳理了很多,但实际上才实现了工作开始执行时,master节点向worker节点发送的第一条音讯TASK_EXECUTE_REQUEST的过程,而且过程中也省略了很多看不懂的代码.下图是工作节点执行过程中整个的master和worker的通信交互流程.

ds的netty封装

因为工作发送给worker当前,代码就执行到其余的节点上了,为了串联代码的调用流程,这里记录一下dolphinscheduler对netty的封装.

NettyRemotingServer

在构造方法中,依据入参配置,初始化了boss和work两个EventLoopGroup,而后在start办法中,设置tcp参数和ChannelPipeline,最初阻塞监听端口.其中业务逻辑都放在ChannelPipeline中.

initNettyChannel1

private void initNettyChannel(SocketChannel ch) {    ChannelPipeline pipeline = ch.pipeline();    pipeline.addLast("encoder", encoder);    pipeline.addLast("decoder", new NettyDecoder());    pipeline.addLast("handler", serverHandler);}

首先增加的是编解码器,因为tcp传输的对象都是字节流,须要编解码器来负责字节流和java对象的互相转换.其中能够看到,编码器是共享的,解码器是每个管道都从新new的,这是因为在解码时因为tcp的粘包和拆包,须要缓存没有被解码的字节.

NettyServerHandler

首先先看一下userEventTriggered办法,这里解决了IdleStateEvent事件,然而方才在initNettyChannel办法中是没有看到增加了IdleStateHandler的.在dev的最新代码上曾经修复了.

processReceived

在承受并解码实现时,会回调该办法.音讯体解码成Command对象之后,依据type找到通过registerProcessor办法注册的处理器NettyRequestProcessor对象和解决的线程池.耗时申请是不容许在io线程上执行的,会阻塞导致无奈解决新的音讯,所以这里应用线程池异步解决.

NettyRemotingClient

同样在构造方法中依据入参配置初始化boss和work两个EventLoopGroup,在start中设置tcp配置和初始化ChannelPipeline.此外多了一个responseFutureExecutor线程池解决超时的ResponseFuture.

sendAsync

异步发送音讯,应用asyncSemaphore信号量管制了并发申请数量,opaque作为音讯id对应申请和响应.在netty发送音讯的回调中判断发送是否胜利.

sendSync

同步发送音讯,返回参为近程服务返回的响应.

ResponseFuture

因为worker和master之间的通信不是同步申请,因而ds设计了这个类用来异步转同步,和申请超时管制.其在构造方法中会放入全局的FUTURE_TABLE中被responseFutureExecutor线程监控是否超时.

waitResponse

阻塞期待响应,或者超时.latch在putResponse中被开释.

putResponse

放入响应,并去除全局的缓存.

NettyClientHandler

NettyRequestProcessor

这是具体业务逻辑的接口,负责解决不同的Command类型.

工作执行

在worker节点启动的时候能够看到,接管到TASK_EXECUTE_REQUEST音讯时,注册的处理器是TaskExecuteProcessor

TaskExecuteProcessor

  1. 创立日志和执行文件夹
  2. 发送TASK_EXECUTE_ACK给master
  3. 交给线程池workerExecService执行

    TaskExecuteThread

    工作节点的执行线程

  4. 设置超时
  5. 通过TaskManager依据类型取得工作对象
  6. 顺次调用工作对象的init,handle,after
  7. 发送TASK_EXECUTE_RESPONSE把执行后果告知master

    TaskResponseService

    master在承受到TASK_EXECUTE_ACK和TASK_EXECUTE_RESPONSE之后,都会把音讯放到该类的eventQueue中,外部启动了一个线程负责长久化到数据库.而后发送DB_TASK_ACK和DB_TASK_RESPONSE音讯到worker,让worker去除重试的缓存.
    因为数据库中状态曾经批改,那么MasterTaskExecThread的waitTaskQuit会退出循环.而后MasterExecThread的runProcess在遍历activeTaskNode时,future.isDone()的判断会通过,最终通过submitPostNode办法提交上面的工作节点.

容错

ZKMasterClient

dataChanged

通过zk的监听回调解决节点挂掉的容错.在该办法内依据类型别离解决master和worker节点挂掉的容错.

removeZKNodePath

通过获取zk锁避免多个节点容错.

failoverMaster

master容错.通过查表找到挂掉的master节点在解决的t_ds_process_instance.对每个ProcessInstance去除host,并创立一个容错复原的command.最终调度线程会解决该命令.

failoverWorker

worker容错.找到所有挂掉的worker节点的host正在解决的TaskInstance,而后遍历批改状态为须要容错.对照MasterTaskExecThread线程,发现在typeIsFinished判断之后跳出循环,最终MasterExecThread线程也会感知到并从新提交给其余worker.

网络抖动

因为” 网络抖动”可能会使得节点短时间内失去和ZooKeeper的心跳,从而产生节点的remove事件。对于这种状况,咱们应用最简略的形式,那就是节点一旦和ZooKeeper产生超时连贯,则间接将Master或Worker服务停掉。

这段官网阐明,对应代码中很多循环都会判断Stopper是否在运行.在HeartBeatTask中判断节点是否为挂掉的节点,若是调用stop.

改良

工作运行时须要很多线程配合,每个工作实例须要一个MasterExecThread线程,外部taskExecService线程池为20个线程(默认,当然不肯定会启动这么多).单机可调度的数量会受到线程数量的限度.amee之前也是对每个工作都须要有一个线程来监控,前面批改为应用工夫轮全局同一监控.在MasterSchedulerService中masterExecService线程池数量默认为100,阐明默认状况下单机master只能同时调度100个工作实例.

很多状态是通过数据库来同步的,因而须要一直的在各处查询数据库,而全局保护的缓存此时曾经失去了意义.不仅对数据库产生了压力,代码个人感觉也比拟乱.有一个全局对立的模型,各处批改都对该模型进行批改或者更好.api节点对数据库的批改(例如暂停)须要告诉到master.

劣势

可视化,这是ds相较而言最大的长处.dag的创立,各种工作参数的设置,数据源,文件资源,运行时的状态监控等等都能够在页面上直观的看到.不过如果是二次开发成咱们本人的产品,页面设置和展现必定是本人做的.

反对程度扩大,容错,HA,非常适合于容器化部署(外部应用ip来标识,这里在k8s环境会有问题)

深度反对大数据环境的各种工作,对大数据场景来说能够做到真正意义上的开箱即用.