关于大数据:达人专栏-还不会用-Apache-Dolphinscheduler大佬用时一个月写出的最全入门教程三

55次阅读

共计 6968 个字符,预计需要花费 18 分钟才能阅读完成。

作者 | 欧阳涛 招联金融大数据开发工程师

02 Master 启动流程

2.10 WorkFlowExecutorThread 里执行 Submit StandByTask 办法

SubmitStandByTask 干了 5 件事件:

  1. 从 ReadyToSubmitTaskQueue 中取出 TaskInstance。
  2. (这个 TaskInstance 是能够重试并且设定为强制胜利了的)把 task 放到 completeTaskMap 以及 taskInstanceMap,并从队列中移除。
  3. 如果这个 task 是首次执行的话,就会先从 task 和 ProcessInstance 中获取参数(varPool)【这一步的办法是 GetPreVarPool】
  4. 获取这个 task 依赖后果【这一步的办法是 GetDependResultForTask】
  5. 依据第 4 步获取的依赖后果,如果依赖后果为失败或者不执行,就从队列中移除,并且放到 FailedTaskMap 里的。如果依赖后果为胜利则将执行 SubmitTaskExec 办法,同时会放到 CompleteTaskMap。至于 SubmitTaskExec 做了哪些事件将在 2.11 中阐明。

2.11 WorkFlowExecutorThread 里执行 SubmitTaskExec 办法

SumbitTaskExec 干了 9 件事件:

  1. PackageTaskInstance 封装了 TaskInstance,就是将 TaskInstance 和 ProcessInstance 进行了绑定,并且获取到了 MainJar,ResourceList 这些信息。
  2. 依据 TaskType 获取 CommonTaskProcessor,这里采纳 SPI 机制获取。如果想具体理解 SPI 机制的,能够百度搜寻 AutoService 注解以及 ServiceLoad 进行具体理解。
  3. CommonTaskProcessor 初始化,也就是将 TaskInstance、ProcessInstance、ProcessService、MasterConfig 传递给 CommonTaskProcessor。
  4. 告诉流程所在的主机,通过 netty 发送 Host 和 HostUpdateCommand。
  5. 将 CommonTaskProcessor 的 Action 为 submit(提交)状态。(这步极为重要)
  6. 放入到 ValidMap,TaskInstanceMap,ActiveTaskProcessorMaps 里。
  7. 将 CommonTaskProcessor 的 Action 设置成 Run 状态的。
  8. 将 task 以及 ProcessInstance 放入到 StateWheelExecuteThread 进行 checkout。
  9. 如果这个 task 执行实现就增加到 StateEvents 队列中。

下一节讲述 commonTaskProcessor 的 submit 状态。

2.12 CommonTaskProcessor 里执行 Submit Task 办法

回顾一下上节的第 5 步,CommonTaskProcessor 的 Action 设置为 Submit 之后,

去 ComonTaskProcessor 的父类 BaseTaskProcessor 找 Action 办法,在 Action 办法中有个 Switch 构造,很显著会进入 Submit 办法,之后就进入本节所说的 SubmitTask 办法的了。

SubmitTask 在这里干了三件事件:

  1. ProcessService。SubmitTaskWithRetry 能够反复 5 次 (MasterConfig.GetTask CommitInterval) 提交 task 工作,最初在 ProcessServiceImpl 执行 submitTask。
  2. 将此 task 的信息插入到 TaskGroupQueue 数据表中。
  3. DispatchTask 下发工作,将 Task 工作下发到实现了 TaskPriorityQueue 接口的 TaskPriorityQueueImpl 中去。

在 ProcessServiceImpl 如何执行 submitTask 将在 2.13 中阐明,同时 DispatchTask 下发做了那些事件,将在 2.15 中阐明。

2.13 ProcessServiceImpl 里执行 SubmitTask 办法

ProcessServiceImpl 是属于 Service 模块的,SubmitTask 次要干了 2 件事件:

  1. SubmitTaskInstanceToDB 将工作实例保留到数据库中,当然这外面有数据结构 (TaskInstance) 的变动,纯属业务的扭转的。
  2. 如果此非完结状态,CreateSubWorkerProcess 创立子流程,如果没有子流程,间接跳过 2.14 的内容。进入 2.15。创立子流程做了哪些事件将在 2.14 中阐明。

2.14 ProcessServiceImpl 里执行 CreateSubWork Process 办法

创立子流程须要干 6 件事件:

  1. FindWorkProcessMapByParent 查找父流程与 task 绑定的 ProcessInstanceMap,是流程实例与 Task 关系的表。
  2. SetProcessInstanceMap。设置刚刚查找的 ProcessInstanceMap,如果能找到以前跑的 ProcessInstanceMap,更新这个 ProcessInstanceMap,如果没有找到就创立新的 ProcessInstanceMap,并插入到数据库中。
  3. CreateSubProcessCommand,依据参数,父流程等创立子流程命令的(SubProcessCommand)。
  4. UpdateSubProcessDefinitionByParent 依据父流程更新子流程的定义。
  5. InitSubInstanceState 初始化子实例状态。
  6. CreateCommand 将创立的子流程命令插入数据库中。

这里 ProcessInstanceMap 并不是 jdk 包下的 map,而是表 t\_ds\_relation\_process\_instance 的数据的。外面存储了父流程实例以及工作的关系的。3 到 6 这些步骤都是 crud 的业务,外面具体的细节就赘述了。

2.15 CommonTaskProcessor 里执行 Dispatch Task 办法

DispatchTask 办法干了三件事件:

  1. 获取 TaskPriorityQueueImpl 的 bean。
  2. 将 TaskInstance,ProcessInstance 封装成 TaskPriority。
  3. 将封装后的 TaskPriority 放到这 bean 下的 queue 中去,这个队列是 jdk 的 PriorityBlockingQueue,是一个具备优先级别的无界阻塞队列。

此时将 DispatchTask 放进 task,那如何生产队列中的 task 的呢?2.16 将阐明这个议题。

2.16 TaskPriorityQueueConsumer 执行 run 和 dispatchTask 办法

TaskPriorityQueueConsumer 是一个继承 Thread 的类。在 MasterServer 启动之后,依据 Spring 的个性,TaskPriorityQueueConsumer 会创立一个对象由 Spring 治理。TaskPriority 会执行 init 的办法。线程启动并且设置线程名字 Task UpdateConsumerThread。

Run 办法中以 3(MasterConfig.getMasterDispatch Task) 次拉取为循环,每次 1 秒从队列中 (BatchDispatch) 拉取 TaskPriority,如果失败就有从新丢回到这队列中去。

随后对拉取的数据进行 DispatchTask 办法。

DispatchTask 办法中做了三件事件:

  1. 从 TaskPriority 中取出 context,依据 Command,ExecutorType 和 Workergroup 封装成 Execution Context。
  2. 将 ExecutionContext 交给 Executor Dispatcher 进行 Dispatcher,这将在 2.17 中阐明。
  3. 如果发送胜利,返回 result 为 true。将 TaskEvent 增加到 TaskEventService (addEvents)中,由 TaskEventService 进行治理的。TaskEventService 的阐明将在 2.19 中介绍。

2.17 ExecutorDispatcher 里执行 Dispatch 办法

ExecutorDispatcher 这个类就干了三件事件:

  1. ExecutorDispatcher 此类实现了 InitializingBean。也就是创立过程中执行了 AfterPropertiesSet 办法,ExecutorManagers 注册了 Worker 和 Client 的 ExecutorType。
  2. Dispatch 办法中获取到了 Worker 的 ExecutorType,而后进行 HostManagar.select。在 Select 办法中会依据 MasterConfig 中的 Host-selector 策略抉择机器,默认是 Lower-weight。如果读者有自定义的需要,则能够实现 HostManager 接口的。(Lower-weight 如何抉择的,就不具体介绍了。因为难度并不大,也就是纯属业务的变动的,有趣味就能够自行浏览的。)
  3. 抉择完了 Host 之后,调用 ExecutorManager 进行 execute。这里的 EeforeExecute 和 AfterExecute 是没有内容的,如果读者有需要,同样能够在此增加内容的。在 2.18 中会阐明 execute 的内容。

2.18 NettyExecutorManager 执行 execute 和 doExecute 办法

ExecutorManager 目前就一个实现类,就是 NettyExecutorManger。

在 init 办法中 NettyRemotingClient 注册了 TaskExecuteResponse、TaskExecuteAck 和

TaskKillResponse 的 Processor。这些 Processor 是用来让 Master 和 Worker 进行交互的。

在 Executor 办法中最外围的办法就是 DoExecute。

在 DoExecute 中 NettyRemotingClient 依据无效的 Host 发送 Command。如果发送失败了,剔除失败节点,将 task 从新增加到队列中。

至此,Master 就以 Command 模式发送 task 信息给 Worker,阐明一下,此时的 Command 是 Remote 包下的 Command,与后面的 Command 没有任何关系的,不要混同了。Master 和 Worker 的交互过程会在第四章节中讲述。

2.19 TaskEventService 执行 addEvents 办法

先说说 TaskEventService 创立过程。这是由 Spring 治理的,而后执行 Start 办法之后,有两个线程创立进去,一个是 TaskEventThread,另外一个是 TaskEventHandlerThread。在 TaskEventThread 会从 EventQueue 中取出 TaskEvent 事件进行提交(submitTaskEvent)。而 TaskEventHandlerThread 会执行 EventHandler 办法。EventHandler 中会从 TaskExecuteThreadMap 中取出数据来执行 executeEvent 办法。

那么 TaskExecuteThreadMap 如何插入数据的呢? 答案就是本节所说的 addEvents 办法。

addEvents 办法中会调用 TaskExecuteThreadPool 中的 SubmitTaskEvent 办法。而在 SubmitTaskEvent 办法中最外围的性能就是往 TaskExecuteThreadMap 放入数据,也就是以 ProcessInstanceId 为 key,TaskExecuteThread 为 value 的 map,并且会调用 TaskExecuteThrad 的 addEvent 办法,将 event 放入到 events 队列中。

至于 TaskExecuteThread 做了哪些事件将在 2.20 中阐明。

2.20 TaskExecuteThread 执行 Persist 办法

接上文 2.19 的在 TaskExecuteThreadPool 中 ExecuteEvent 办法。

执行 TaskExecuteThread 中的 run 办法。在 run 办法中从 events 队列中取出 TaskEvent,并执行 Persist 长久化操作的,将 task 信息保留到数据库中的。

在 Persist 办法中,重点是 Switch 构造下的内容。依据 DISPATCH,RUNNING,RESULT,执行不同的办法,封装不同的 TaskInstance 内容保留到数据库中,并发送申请给 Worker。

另外构建 StateEvent 对象, 交给 WorkerflowExecuteThreadPool 进行解决长久化后的 StateEvent 对象。stateEvent 应该如何解决呢?请参考 2.22 的内容。

2.21 MasterSchedulerService 总结

MasterServer 的 MasterSchedulerService 曾经根本讲完。回到最开始的 MasterServer 这部分,发现 MasterSchedulerService 前面的两个 bean 没有讲,也就是 EventExecuteService 以及 FailoverExecute Thread. 这两个都是线程的,将在 2.22 和 2.23 中阐明这最初两个 bean。

2.22 EventExecuteService 线程的 run 办法

在 MasterServer 调用 Start 办法后,EventExeuctor Service 的 run 办法执行过程如下:

  1. 每 100 毫秒执行 EventHandler 办法。
  2. 每次执行 EventHandler 办法时,从 2.5 章节的第 3 步 ProcessInstance ExecCacheManager 中取出 WorkFlowExecutorThread,通过 WorkflowExecuteThreadPool 执行 ExecuteEvent 办法。
  3. 在 ExecuteEvent 办法中,能够发现最外围的办法就是 HandlerEvents 办法。
  4. 在 HandlerEvents 中能够发现,从 2.11 章节的第 9 步的 StateEvents 队列取出 StateEvent,而后在通过 StateEventHandler 办法进行判断的。
  5. 在 WorkflowExecutorThread 的 stateEventHandler 办法中,依据 StateEventType 的不同,以有 6 种不同类型的办法去调用,别离为 PROCESS\_STATE \_CHANGE、TASK\_STATE\_CHANGE、PROCESS\_TIMEOUT、TASK\_TIMEOUT、TASK\_RETRY、PROCESS\_BLOCKED。通过不同的 type 调用不同的办法,如 PROCESS\_STATE\_CHANGE 调用 ProcessStateChangeHandler 办法,这里就不具体讲述各个办法的内容了,其本质上也都是内存数据结构的变动。

P.S.:

  1. 如果 StateEventHandler 办法中某一类型胜利执行,则从 StateEvents 队列中移除它了。
  2. 返回到 WorkflowExecuteThreadPool 类的 ExecuteEvent 办法中,执行完第 3 步之后,会有个回调函数,失败就执行 OnFailure 办法。胜利就执行 OnSuccess 办法,NotifyProcessChanged 告诉流程扭转中,要么 NotifyMyself,要么告诉其余流程 NotifyProcessChanged 的。

2.23 FailoverExecutorThread 线程的 Run 办法

此节为机器故障切换执行的线程,次要干了 5 件事件。具体执行流程如下:

  1. Run 办法中 FailoverService.checkMaster Failover 查看是否须要切换的 host。
  2. 如果有 host 的话,就进入 FailoveMaster WithLock 办法。在此办法中,从 zk 中通过分布式锁来进行切换机器,也就是进入 FailoverMaster 办法。
  3. 在 FailoverMaster 中,从 ProcessSerivce 里 (QueryNeedFailover ProcessInstance) 查问所须要切换的流程实例(NeedFailover ProcessInstanceList)。
  4. 接下来,就是通过 zk 获取无效的 WorkerServers.failoverTaskInstance 来切换 task。在切换 task 时有三个步骤,别离是: 当是 Yarnjobs 时,则间接杀掉 ; 扭转 task 的状态,也就是从 Running 到 Needfailover ; WorkflowExecutor ThreadPool 提交 StateEvent。
  5. 在 ProcessService 中解决该切换的流程,减少切换流程实例的 Command,插入数据库中。

下两章将持续讲述 Worker 和 Master 与 Worker 的交互。

正文完
 0