作者 | 欧阳涛 招联金融大数据开发工程师
02 Master启动流程
2.10 WorkFlowExecutorThread 里执行 Submit StandByTask 办法
SubmitStandByTask干了5件事件:
- 从ReadyToSubmitTaskQueue中取出TaskInstance。
- (这个TaskInstance是能够重试并且设定为强制胜利了的)把task放到completeTaskMap以及taskInstanceMap,并从队列中移除。
- 如果这个task是首次执行的话,就会先从task和ProcessInstance中获取参数(varPool)【这一步的办法是GetPreVarPool】
- 获取这个task依赖后果【这一步的办法是GetDependResultForTask】
- 依据第4步获取的依赖后果,如果依赖后果为失败或者不执行,就从队列中移除,并且放到FailedTaskMap里的。如果依赖后果为胜利则将执行SubmitTaskExec办法,同时会放到CompleteTaskMap。至于SubmitTaskExec做了哪些事件将在2.11中阐明。
2.11 WorkFlowExecutorThread里执行SubmitTaskExec办法
SumbitTaskExec干了9件事件:
- PackageTaskInstance封装了TaskInstance,就是将TaskInstance和ProcessInstance进行了绑定,并且获取到了MainJar,ResourceList这些信息。
- 依据TaskType获取CommonTaskProcessor,这里采纳SPI机制获取。如果想具体理解SPI机制的,能够百度搜寻AutoService注解以及ServiceLoad进行具体理解。
- CommonTaskProcessor初始化,也就是将TaskInstance、ProcessInstance、ProcessService、MasterConfig传递给CommonTaskProcessor。
- 告诉流程所在的主机,通过netty发送Host和HostUpdateCommand。
- 将CommonTaskProcessor的Action为submit(提交)状态。(这步极为重要)
- 放入到ValidMap,TaskInstanceMap,ActiveTaskProcessorMaps里。
- 将CommonTaskProcessor的Action设置成Run状态的。
- 将task以及ProcessInstance放入到StateWheelExecuteThread进行checkout。
- 如果这个task执行实现就增加到StateEvents队列中。
下一节讲述commonTaskProcessor的submit状态。
2.12 CommonTaskProcessor里执行Submit Task办法
回顾一下上节的第5步,CommonTaskProcessor的Action设置为Submit之后,
去ComonTaskProcessor的父类BaseTaskProcessor找Action办法,在Action办法中有个Switch构造,很显著会进入Submit办法,之后就进入本节所说的SubmitTask办法的了。
SubmitTask在这里干了三件事件:
- ProcessService。SubmitTaskWithRetry能够反复5次(MasterConfig.GetTask CommitInterval)提交task工作,最初在ProcessServiceImpl执行submitTask。
- 将此task的信息插入到TaskGroupQueue数据表中。
- DispatchTask下发工作,将Task工作下发到实现了TaskPriorityQueue接口的TaskPriorityQueueImpl中去。
在ProcessServiceImpl如何执行submitTask将在2.13中阐明,同时DispatchTask下发做了那些事件,将在2.15中阐明。
2.13 ProcessServiceImpl里执行SubmitTask办法
ProcessServiceImpl是属于Service模块的,SubmitTask次要干了2件事件:
- SubmitTaskInstanceToDB 将工作实例保留到数据库中,当然这外面有数据结构(TaskInstance)的变动,纯属业务的扭转的。
- 如果此非完结状态,CreateSubWorkerProcess创立子流程,如果没有子流程,间接跳过2.14的内容。进入2.15。创立子流程做了哪些事件将在2.14中阐明。
2.14 ProcessServiceImpl里执行CreateSubWork Process办法
创立子流程须要干6件事件:
- FindWorkProcessMapByParent查找父流程与task绑定的ProcessInstanceMap,是流程实例与Task关系的表。
- SetProcessInstanceMap。设置刚刚查找的ProcessInstanceMap,如果能找到以前跑的ProcessInstanceMap,更新这个ProcessInstanceMap,如果没有找到就创立新的ProcessInstanceMap,并插入到数据库中。
- CreateSubProcessCommand,依据参数,父流程等创立子流程命令的(SubProcessCommand)。
- UpdateSubProcessDefinitionByParent依据父流程更新子流程的定义。
- InitSubInstanceState初始化子实例状态。
- CreateCommand将创立的子流程命令插入数据库中。
这里ProcessInstanceMap并不是jdk包下的map,而是表t\_ds\_relation\_process\_instance的数据的。外面存储了父流程实例以及工作的关系的。3到6这些步骤都是crud的业务,外面具体的细节就赘述了。
2.15 CommonTaskProcessor里执行Dispatch Task办法
DispatchTask办法干了三件事件:
- 获取TaskPriorityQueueImpl的bean。
- 将TaskInstance,ProcessInstance封装成TaskPriority。
- 将封装后的TaskPriority放到这bean下的queue中去,这个队列是jdk的PriorityBlockingQueue,是一个具备优先级别的无界阻塞队列。
此时将DispatchTask放进task,那如何生产队列中的task的呢?2.16将阐明这个议题。
2.16 TaskPriorityQueueConsumer执行run和dispatchTask办法
TaskPriorityQueueConsumer是一个继承Thread的类。在MasterServer启动之后,依据Spring的个性,TaskPriorityQueueConsumer会创立一个对象由Spring治理。TaskPriority会执行init的办法。线程启动并且设置线程名字Task UpdateConsumerThread。
Run办法中以3(MasterConfig.getMasterDispatch Task ) 次拉取为循环,每次1秒从队列中(BatchDispatch)拉取TaskPriority,如果失败就有从新丢回到这队列中去。
随后对拉取的数据进行DispatchTask办法。
DispatchTask办法中做了三件事件:
- 从TaskPriority中取出context,依据Command,ExecutorType和Workergroup封装成Execution Context。
- 将ExecutionContext交给Executor Dispatcher进行Dispatcher,这将在2.17中阐明。
- 如果发送胜利,返回result为true。将TaskEvent增加到TaskEventService (addEvents)中,由TaskEventService进行治理的。TaskEventService的阐明将在2.19中介绍。
2.17 ExecutorDispatcher里执行Dispatch办法
ExecutorDispatcher这个类就干了三件事件:
- ExecutorDispatcher此类实现了InitializingBean。也就是创立过程中执行了AfterPropertiesSet办法,ExecutorManagers注册了Worker和Client的ExecutorType。
- Dispatch办法中获取到了Worker的ExecutorType,而后进行HostManagar.select。在Select办法中会依据MasterConfig中的Host-selector策略抉择机器,默认是Lower-weight。如果读者有自定义的需要,则能够实现HostManager接口的。(Lower-weight如何抉择的,就不具体介绍了。因为难度并不大,也就是纯属业务的变动的,有趣味就能够自行浏览的。)
- 抉择完了Host之后,调用ExecutorManager进行execute。这里的EeforeExecute和AfterExecute是没有内容的,如果读者有需要,同样能够在此增加内容的。在2.18中会阐明execute的内容。
2.18 NettyExecutorManager执行execute和doExecute办法
ExecutorManager目前就一个实现类,就是NettyExecutorManger。
在init办法中NettyRemotingClient注册了TaskExecuteResponse、TaskExecuteAck和
TaskKillResponse的Processor。这些Processor是用来让Master和Worker进行交互的。
在Executor办法中最外围的办法就是DoExecute。
在DoExecute中NettyRemotingClient依据无效的Host发送Command。如果发送失败了,剔除失败节点,将task从新增加到队列中。
至此,Master就以Command模式发送task信息给Worker,阐明一下,此时的Command是Remote包下的Command,与后面的Command没有任何关系的,不要混同了。Master和Worker的交互过程会在第四章节中讲述。
2.19 TaskEventService执行addEvents办法
先说说TaskEventService创立过程。这是由Spring治理的,而后执行Start办法之后,有两个线程创立进去,一个是TaskEventThread,另外一个是TaskEventHandlerThread。在TaskEventThread会从EventQueue中取出TaskEvent事件进行提交(submitTaskEvent)。而TaskEventHandlerThread会执行EventHandler办法。EventHandler中会从TaskExecuteThreadMap中取出数据来执行executeEvent办法。
那么TaskExecuteThreadMap如何插入数据的呢?答案就是本节所说的addEvents办法。
addEvents办法中会调用TaskExecuteThreadPool中的SubmitTaskEvent办法。而在SubmitTaskEvent办法中最外围的性能就是往TaskExecuteThreadMap放入数据,也就是以ProcessInstanceId为key,TaskExecuteThread为value的map,并且会调用TaskExecuteThrad的addEvent办法,将event放入到events队列中。
至于TaskExecuteThread做了哪些事件将在2.20中阐明。
2.20 TaskExecuteThread执行Persist办法
接上文2.19的在TaskExecuteThreadPool中ExecuteEvent办法。
执行TaskExecuteThread中的run办法。在run办法中从events队列中取出TaskEvent,并执行Persist长久化操作的,将task信息保留到数据库中的。
在Persist办法中,重点是Switch构造下的内容。依据DISPATCH,RUNNING,RESULT,执行不同的办法,封装不同的TaskInstance内容保留到数据库中,并发送申请给Worker。
另外构建StateEvent对象,交给WorkerflowExecuteThreadPool进行解决长久化后的StateEvent对象。stateEvent应该如何解决呢?请参考2.22的内容。
2.21 MasterSchedulerService总结
MasterServer的MasterSchedulerService曾经根本讲完。回到最开始的MasterServer这部分,发现MasterSchedulerService前面的两个bean没有讲,也就是EventExecuteService以及FailoverExecute Thread.这两个都是线程的,将在2.22和2.23中阐明这最初两个bean。
2.22 EventExecuteService线程的run办法
在MasterServer调用Start办法后,EventExeuctor Service的run办法执行过程如下:
- 每100毫秒执行EventHandler办法。
- 每次执行EventHandler办法时,从2.5章节的第3步ProcessInstance ExecCacheManager中取出WorkFlowExecutorThread,通过WorkflowExecuteThreadPool执行ExecuteEvent办法。
- 在ExecuteEvent办法中, 能够发现最外围的办法就是HandlerEvents办法。
- 在HandlerEvents中能够发现,从2.11章节的第9步的StateEvents队列取出StateEvent,而后在通过StateEventHandler办法进行判断的。
- 在WorkflowExecutorThread的stateEventHandler办法中,依据StateEventType的不同,以有6种不同类型的办法去调用,别离为PROCESS\_STATE \_CHANGE、TASK\_STATE\_CHANGE、PROCESS\_TIMEOUT、TASK\_TIMEOUT、TASK\_RETRY、PROCESS\_BLOCKED。通过不同的type调用不同的办法,如PROCESS\_STATE\_CHANGE调用ProcessStateChangeHandler办法,这里就不具体讲述各个办法的内容了,其本质上也都是内存数据结构的变动。
P.S.:
- 如果StateEventHandler办法中某一类型胜利执行,则从StateEvents队列中移除它了。
- 返回到WorkflowExecuteThreadPool类的ExecuteEvent办法中,执行完第3步之后,会有个回调函数,失败就执行OnFailure办法。胜利就执行OnSuccess办法,NotifyProcessChanged告诉流程扭转中,要么NotifyMyself,要么告诉其余流程NotifyProcessChanged的。
2.23 FailoverExecutorThread线程的Run办法
此节为机器故障切换执行的线程,次要干了5件事件。具体执行流程如下:
- Run办法中FailoverService.checkMaster Failover查看是否须要切换的host。
- 如果有host的话,就进入FailoveMaster WithLock办法。在此办法中,从zk中通过分布式锁来进行切换机器,也就是进入FailoverMaster办法。
- 在FailoverMaster中,从ProcessSerivce里(QueryNeedFailover ProcessInstance)查问所须要切换的流程实例(NeedFailover ProcessInstanceList)。
- 接下来,就是通过zk获取无效的WorkerServers.failoverTaskInstance来切换task。在切换task时有三个步骤,别离是:当是Yarnjobs时,则间接杀掉 ; 扭转task的状态,也就是从Running到Needfailover ; WorkflowExecutor ThreadPool提交StateEvent。
- 在ProcessService中解决该切换的流程,减少切换流程实例的Command,插入数据库中。
下两章将持续讲述Worker和Master与Worker的交互。