简介:本文次要介绍netflix conductor的基本概念和次要运行机制。
作者 | 夜阳
起源 | 阿里技术公众号
本文次要介绍netflix conductor的基本概念和次要运行机制。
一 简介
netflix conductor是基于JAVA语言编写的开源流程引擎,用于架构基于微服务的流程。它具备如下个性:
- 容许创立简单的业务流程,流程中每个独立的工作都是由一个微服务所实现。
- 基于JSON DSL 创立工作流,对工作的执行进行编排。
- 工作流在执行的过程中可见、可追溯。
- 提供暂停、复原、重启等多种管制模型。
- 提供一种简略的形式来最大限度重用微服务。
- 领有扩大到百万流程并发运行的服务能力。
- 通过队列服务实现客户端与服务端的拆散。
- 反对 HTTP 或其余RPC协定进行数据传送
二 基本概念
1 Task
Task是最小执行单元,承载了一段执行逻辑,如发送HTTP申请等。
- System Task:被conductor服务执行,这些工作的执行与引擎在同一个JVM中。
- Worker Task:被worker服务执行,执行与引擎隔离开,worker通过队列获取工作后,执行并更新后果状态到引擎。Worker的实现是跨语言的,其应用Http协定与Server通信。
conductor提供了若干内置SystemTask:
功能性Task:
- HTTP:发送http申请
- JSON_JQ_TRANSFORM:jq命令执行,个别用户json的转换,具体可见jq官网文档
- KAFKA_PUBLISH: 公布kafka音讯
流程管制Task:
- SWITCH(原Decision):条件判断分支,相似于代码中的switch case
- FORK:启动并行分支,用于调度并行任务
- JOIN:汇总并行分支,用于汇总并行任务
- DO_WHILE:循环,相似于代码中的do while
- WAIT:始终在运行中,直到内部工夫触发更新节点状态,可用于期待内部操作
- SUB_WORKFLOW:子流程,执行其余的流程
- TERMINATE:完结流程,以指定输入提前结束流程,能够与SWITCH节点配合应用,相似代码中的提前return语句
自定义Task:
- 对于System Task,Conductor提供了WorkflowSystemTask 抽象类,能够自定义扩大实现。
- 对于Worker Task,能够实现conductor的client Worker接口实现执行逻辑。
2 Workflow
- Workflow由一系列须要执行的Task组成,conductor采纳json来形容Task的流转关系。
- 除根本的程序流程外,借助内置的SWITCH、FORK、JOIN、DO_WIHLE、TERMINATE工作,还能实现分支、并行、循环、提前结束等流程管制。
3 Input&Output
Task的输出是一种映射,其作为工作流实例化的一部分或某些其余Task的输入。容许将来自工作流或其余Task的输出/输入作为随后执行的Task的输出。
- Task有本人的输出和输入,输入输出都是jsonobject类型。
- Task能够援用其余Task的输入输出,应用${taskxxx.output}的形式援用。援用语法为json-path,除最根底的${taskxxx.output}的值解析形式外,还反对其余简单操作,如过滤等,具体见json-path语法。
- 启动Workflow时能够传入流程的输出数据,Task能够通过${workflow.input}的形式援用。
Task实现原子操作的解决以及流程管制操作,Workflow定义形容Task的流转关系,Task援用Workflow或者其它Task的输入输出。通过这些机制,conductor实现了JSON DSL对流程的形容。
三 整体架构
次要分为几个局部:
- Orchestrator: 负责流程的流转调度工作;
- Management/Execution Service: 提供流程、工作的治理更新等操作;
- TaskQueues: 工作队列,Orchestrator解析进去的待执行Task会放到队列中;
- Worker: 工作执行worker,从TaskQueues中获取工作,通过Execution Service更新工作状态与后果数据;
- Database: 元数据&运行时数据库,用于保留运行时的Workflow、Task等状态信息,以及流程工作定义的等原信息;
- Index: 索引数据库,用于存储执行历史;
四 运行模型
1 Task状态转移
- SCHEDULED:待调度,task放到队列中还没有被poll进去执行时的状态
- IN_PROGRESS:执行中,被poll进去执行但还没有实现时的状态
- COMPLETED:执行实现
- FAILED:执行失败
- CANCELLED:被停止时为此状态,个别呈现在两种状况:
1.手动停止流程时,正在运行中的task会被置为此状态;
2.多个fork分支,当某个分支的task失败时,其它分支中正在运行的task会被置为此状态;
2 工作队列
工作的执行(同步的零碎工作除外)都会先增加到工作队列中,是典型的生产者消费者模式。
- 工作队列,是一个带有提早、优先级性能的队列;
- 每种类型的Task是一个独自的队列,此外,如果配置了domain、isolationGroup,还会拆分成多个队列实现执行隔离;
- decider service是生产者,其依据流程配置与以后执行状况,解析出可执行的task后,增加到队列;
- 工作执行器(SystemTaskWorker、Worker)是消费者,其长轮询对应的队列,从队列中获取工作执行;
队列接口可插拔,conductor提供了Dynomite 、MySQL、PostgreSQL的实现。
3 外围性能实现机制
conductor调度的外围是decider service,其依据以后流程运行的状态,解析出将要执行的工作列表,将工作入队交给worker执行。
decide次要流程简化如下,具体代码见WorkflowExecutor.java的decide办法:
其中,调度工作解决流程简化如下,具体代码见WorkflowExecutor.java的scheduleTask办法:
decide的触发机会
最次要的触发机会:
- 新启动执行时,会触发decide操作
- 零碎工作执行实现时,会触发decide操作
- Workder工作通过ExecutionService更新工作状态时,会触发decide操作
流程管制节点的实现机制
1)Task & TaskMapper
对于每一个Task来说,都有Task和TaskMapper两局部:
- Task:工作的执行逻辑代码,它的作用是Task的执行
- TaskMapper:工作的映射逻辑代码,它通过Task的定义配置、以后实例的执行状态等信息,返回理论须要执行的Task列表
对于个别的工作来说,TaskMapper返回的是就是Task自身,补充一些执行实例的状态信息。然而对于管制节点来说,会有不同的逻辑。
2)条件分支(SWITCH)的实现机制
SWITCH用于依据条件判断,执行不同的分支。
实际上,该节点的Task不做任何操作,TaskMapper依据分支条件,判断出要走的分之后,返回对应分支的第一个Task。
SwitchTaskMapper.java getMappedTasks办法要害代码:
// 待调度的Task list,最终返回后果List<Task> tasksToBeScheduled = new LinkedList<>();// evalResult是分支条件变量的值(case)// decisionCases是一个Map构造,key为分支的case值,value为对应分支的工作定义list(分支内的工作定义会有多个)// 依据分支变量的理论值,获取对应分支的工作定义listList<WorkflowTask> selectedTasks = taskToSchedule.getDecisionCases().get(evalResult);// default的逻辑:如果获取不到对应的分支或者分支为空,则用默认的分支if (selectedTasks == null || selectedTasks.isEmpty()) { selectedTasks = taskToSchedule.getDefaultCase();}if (selectedTasks != null && !selectedTasks.isEmpty()) { // 获取分支的第一个(下标0)task,返回给decider service去做调度(decider会把工作增加到队列里,交给worker去执行) WorkflowTask selectedTask = selectedTasks.get(0); // 调用了deciderService的getTasksToBeScheduled办法,此办法里又获取到TaskMapper调用了getMappedTasks。这里采纳了递归调用的形式,解析嵌套的Task List<Task> caseTasks = taskMapperContext.getDeciderService() .getTasksToBeScheduled(workflowInstance, selectedTask, retryCount, taskMapperContext.getRetryTaskId()); tasksToBeScheduled.addAll(caseTasks); switchTask.getInputData().put("hasChildren", "true");}return tasksToBeScheduled;
3)并行(FORK)的实现机制
FORK用于开启多个并行分支。
实际上,该节点的Task不做任何操作,TaskMapper返回所有并行分支的第一个Task。
ForkJoinTaskMapper.java getMappedTasks要害代码:
// 待调度的Task list,最终返回后果List<Task> tasksToBeScheduled = new LinkedList<>();// 配置中的所有fork分支List<List<WorkflowTask>> forkTasks = taskToSchedule.getForkTasks();for (List<WorkflowTask> wfts : forkTasks) { // 每个分支取第一个Task WorkflowTask wft = wfts.get(0); // 调用了deciderService的getTasksToBeScheduled办法,此办法里又获取到TaskMapper调用了getMappedTasks。这里采纳了递归调用的形式,解析嵌套的Task List<Task> tasks2 = taskMapperContext.getDeciderService() .getTasksToBeScheduled(workflowInstance, wft, retryCount); tasksToBeScheduled.addAll(tasks2);}return tasksToBeScheduled;
总的来说,分支(SWITCH)、并行(FORK)节点自身没有执行逻辑,其通过TaskMapper返回到理论要执行的Task,而后交给Decider Service解决。
重试的实现机制
重试和其延迟时间设置,都是借助工作队列的性能实现的。
重试:将工作从新增加到工作队列
重试的延迟时间:增加到工作队列时设置延迟时间,延迟时间过后,工作能力在队列中被poll进去执行
五 完整性保障机制
因为调度过程中可能会呈现因机器重启、网络异样、JVM解体等偶发状况,这些会导致的decide过程意外终止,流程执行不残缺,展现出如流程始终运行中(理论曾经没有在调度),或者其它状态谬误等异常现象。
1 WorkflowReconciler
针对这种状况,conductor有一个WorkflowReconciler,会定期尝试decide所有正在运行中的流程,修复流程执行的一致性。此外,它还有一个作用是校验流程超时工夫。
2 decideQueue
那么WorkflowReconciler是如何获取到以后运行中的流程呢,答案是decideQueue。
decideQueue和工作队列雷同,也是一个具备提早性能的队列,其寄存的是正在执行中的流程的实例id。在工作开始执行时(包含新启动执行、重试执行、复原执行、重跑执行等),会将实例id push到decideQueue中;在执行完结(胜利、失败)时,会从decideQueue中删除实例id。
3 ExecutionLockService
WorkflowReconciler会定期尝试decide所有正在运行中的流程用于超时判断、保护流程一致性。然而流程自身失常执行也会触发decide,如果同一个执行同时触发两个decide,可能会导致状态凌乱,执行卡住等问题。
conductor采纳了锁来解决这个问题,其提供了单机LocalOnlyLock(基于信号量实现)、redis分布式锁(基于redission实现)、zookeeper分布式锁三种实现。
decide办法中最开始会尝试获取锁,如果获取失败则间接返回。通过锁来保障不会对同一个流程实例并发执行decide。
if (!executionLockService.acquireLock(workflowId)) { return false;}
因为锁是可配置的,可能会导致一个误区:单台机器的话不必配置锁。其实单机也是须要配置锁的,因为WorkflowReconciler和流程失常执行会产生抵触,可能会导致偶发的流程状态凌乱问题。
原文链接
本文为阿里云原创内容,未经容许不得转载。