简介:本文次要介绍 netflix conductor 的基本概念和次要运行机制。
作者 | 夜阳
起源 | 阿里技术公众号
本文次要介绍 netflix conductor 的基本概念和次要运行机制。
一 简介
netflix conductor 是基于 JAVA 语言编写的开源流程引擎,用于架构基于微服务的流程。它具备如下个性:
- 容许创立简单的业务流程,流程中每个独立的工作都是由一个微服务所实现。
- 基于 JSON DSL 创立工作流,对工作的执行进行编排。
- 工作流在执行的过程中可见、可追溯。
- 提供暂停、复原、重启等多种管制模型。
- 提供一种简略的形式来最大限度重用微服务。
- 领有扩大到百万流程并发运行的服务能力。
- 通过队列服务实现客户端与服务端的拆散。
- 反对 HTTP 或其余 RPC 协定进行数据传送
二 基本概念
1 Task
Task 是最小执行单元,承载了一段执行逻辑,如发送 HTTP 申请等。
- System Task:被 conductor 服务执行,这些工作的执行与引擎在同一个 JVM 中。
- Worker Task:被 worker 服务执行,执行与引擎隔离开,worker 通过队列获取工作后,执行并更新后果状态到引擎。Worker 的实现是跨语言的,其应用 Http 协定与 Server 通信。
conductor 提供了若干内置 SystemTask:
功能性 Task:
- HTTP:发送 http 申请
- JSON_JQ_TRANSFORM:jq 命令执行,个别用户 json 的转换,具体可见 jq 官网文档
- KAFKA_PUBLISH: 公布 kafka 音讯
流程管制 Task:
- SWITCH(原 Decision):条件判断分支,相似于代码中的 switch case
- FORK:启动并行分支,用于调度并行任务
- JOIN:汇总并行分支,用于汇总并行任务
- DO_WHILE:循环,相似于代码中的 do while
- WAIT:始终在运行中,直到内部工夫触发更新节点状态,可用于期待内部操作
- SUB_WORKFLOW:子流程,执行其余的流程
- TERMINATE:完结流程,以指定输入提前结束流程,能够与 SWITCH 节点配合应用,相似代码中的提前 return 语句
自定义 Task:
- 对于 System Task,Conductor 提供了 WorkflowSystemTask 抽象类,能够自定义扩大实现。
- 对于 Worker Task,能够实现 conductor 的 client Worker 接口实现执行逻辑。
2 Workflow
- Workflow 由一系列须要执行的 Task 组成,conductor 采纳 json 来形容 Task 的流转关系。
- 除根本的程序流程外,借助内置的 SWITCH、FORK、JOIN、DO_WIHLE、TERMINATE 工作,还能实现分支、并行、循环、提前结束等流程管制。
3 Input&Output
Task 的输出是一种映射,其作为工作流实例化的一部分或某些其余 Task 的输入。容许将来自工作流或其余 Task 的输出 / 输入作为随后执行的 Task 的输出。
- Task 有本人的输出和输入,输入输出都是 jsonobject 类型。
- Task 能够援用其余 Task 的输入输出,应用 ${taskxxx.output}的形式援用。援用语法为 json-path,除最根底的 ${taskxxx.output}的值解析形式外,还反对其余简单操作,如过滤等,具体见 json-path 语法。
- 启动 Workflow 时能够传入流程的输出数据,Task 能够通过 ${workflow.input}的形式援用。
Task 实现原子操作的解决以及流程管制操作,Workflow 定义形容 Task 的流转关系,Task 援用 Workflow 或者其它 Task 的输入输出。通过这些机制,conductor 实现了 JSON DSL 对流程的形容。
三 整体架构
次要分为几个局部:
- Orchestrator: 负责流程的流转调度工作;
- Management/Execution Service: 提供流程、工作的治理更新等操作;
- TaskQueues: 工作队列,Orchestrator 解析进去的待执行 Task 会放到队列中;
- Worker: 工作执行 worker,从 TaskQueues 中获取工作,通过 Execution Service 更新工作状态与后果数据;
- Database: 元数据 & 运行时数据库,用于保留运行时的 Workflow、Task 等状态信息,以及流程工作定义的等原信息;
- Index: 索引数据库,用于存储执行历史;
四 运行模型
1 Task 状态转移
- SCHEDULED:待调度,task 放到队列中还没有被 poll 进去执行时的状态
- IN_PROGRESS:执行中,被 poll 进去执行但还没有实现时的状态
- COMPLETED:执行实现
- FAILED:执行失败
- CANCELLED:被停止时为此状态,个别呈现在两种状况:
1. 手动停止流程时,正在运行中的 task 会被置为此状态;
2. 多个 fork 分支,当某个分支的 task 失败时,其它分支中正在运行的 task 会被置为此状态;
2 工作队列
工作的执行(同步的零碎工作除外)都会先增加到工作队列中,是典型的生产者消费者模式。
- 工作队列,是一个带有提早、优先级性能的队列;
- 每种类型的 Task 是一个独自的队列,此外,如果配置了 domain、isolationGroup,还会拆分成多个队列实现执行隔离;
- decider service 是生产者,其依据流程配置与以后执行状况,解析出可执行的 task 后,增加到队列;
- 工作执行器 (SystemTaskWorker、Worker) 是消费者,其长轮询对应的队列,从队列中获取工作执行;
队列接口可插拔,conductor 提供了 Dynomite、MySQL、PostgreSQL 的实现。
3 外围性能实现机制
conductor 调度的外围是 decider service,其依据以后流程运行的状态,解析出将要执行的工作列表,将工作入队交给 worker 执行。
decide 次要流程简化如下,具体代码见 WorkflowExecutor.java 的 decide 办法:
其中,调度工作解决流程简化如下,具体代码见 WorkflowExecutor.java 的 scheduleTask 办法:
decide 的触发机会
最次要的触发机会:
- 新启动执行时,会触发 decide 操作
- 零碎工作执行实现时,会触发 decide 操作
- Workder 工作通过 ExecutionService 更新工作状态时,会触发 decide 操作
流程管制节点的实现机制
1)Task & TaskMapper
对于每一个 Task 来说,都有 Task 和 TaskMapper 两局部:
- Task:工作的执行逻辑代码,它的作用是 Task 的执行
- TaskMapper:工作的映射逻辑代码,它通过 Task 的定义配置、以后实例的执行状态等信息,返回理论须要执行的 Task 列表
对于个别的工作来说,TaskMapper 返回的是就是 Task 自身,补充一些执行实例的状态信息。然而对于管制节点来说,会有不同的逻辑。
2)条件分支 (SWITCH) 的实现机制
SWITCH 用于依据条件判断,执行不同的分支。
实际上,该节点的 Task 不做任何操作,TaskMapper 依据分支条件,判断出要走的分之后,返回对应分支的第一个 Task。
SwitchTaskMapper.java getMappedTasks 办法要害代码:
// 待调度的 Task list,最终返回后果
List<Task> tasksToBeScheduled = new LinkedList<>();
// evalResult 是分支条件变量的值(case)
// decisionCases 是一个 Map 构造,key 为分支的 case 值,value 为对应分支的工作定义 list(分支内的工作定义会有多个)// 依据分支变量的理论值,获取对应分支的工作定义 list
List<WorkflowTask> selectedTasks = taskToSchedule.getDecisionCases().get(evalResult);
// default 的逻辑:如果获取不到对应的分支或者分支为空,则用默认的分支
if (selectedTasks == null || selectedTasks.isEmpty()) {selectedTasks = taskToSchedule.getDefaultCase();
}
if (selectedTasks != null && !selectedTasks.isEmpty()) {// 获取分支的第一个(下标 0)task,返回给 decider service 去做调度(decider 会把工作增加到队列里,交给 worker 去执行)WorkflowTask selectedTask = selectedTasks.get(0);
// 调用了 deciderService 的 getTasksToBeScheduled 办法,此办法里又获取到 TaskMapper 调用了 getMappedTasks。这里采纳了递归调用的形式,解析嵌套的 Task
List<Task> caseTasks = taskMapperContext.getDeciderService()
.getTasksToBeScheduled(workflowInstance, selectedTask, retryCount, taskMapperContext.getRetryTaskId());
tasksToBeScheduled.addAll(caseTasks);
switchTask.getInputData().put("hasChildren", "true");
}
return tasksToBeScheduled;
3)并行 (FORK) 的实现机制
FORK 用于开启多个并行分支。
实际上,该节点的 Task 不做任何操作,TaskMapper 返回所有并行分支的第一个 Task。
ForkJoinTaskMapper.java getMappedTasks 要害代码:
// 待调度的 Task list,最终返回后果
List<Task> tasksToBeScheduled = new LinkedList<>();
// 配置中的所有 fork 分支
List<List<WorkflowTask>> forkTasks = taskToSchedule.getForkTasks();
for (List<WorkflowTask> wfts : forkTasks) {
// 每个分支取第一个 Task
WorkflowTask wft = wfts.get(0);
// 调用了 deciderService 的 getTasksToBeScheduled 办法,此办法里又获取到 TaskMapper 调用了 getMappedTasks。这里采纳了递归调用的形式,解析嵌套的 Task
List<Task> tasks2 = taskMapperContext.getDeciderService()
.getTasksToBeScheduled(workflowInstance, wft, retryCount);
tasksToBeScheduled.addAll(tasks2);
}
return tasksToBeScheduled;
总的来说,分支 (SWITCH)、并行(FORK) 节点自身没有执行逻辑,其通过 TaskMapper 返回到理论要执行的 Task,而后交给 Decider Service 解决。
重试的实现机制
重试和其延迟时间设置,都是借助工作队列的性能实现的。
重试:将工作从新增加到工作队列
重试的延迟时间:增加到工作队列时设置延迟时间,延迟时间过后,工作能力在队列中被 poll 进去执行
五 完整性保障机制
因为调度过程中可能会呈现因机器重启、网络异样、JVM 解体等偶发状况,这些会导致的 decide 过程意外终止,流程执行不残缺,展现出如流程始终运行中(理论曾经没有在调度),或者其它状态谬误等异常现象。
1 WorkflowReconciler
针对这种状况,conductor 有一个 WorkflowReconciler,会定期尝试 decide 所有正在运行中的流程,修复流程执行的一致性。此外,它还有一个作用是校验流程超时工夫。
2 decideQueue
那么 WorkflowReconciler 是如何获取到以后运行中的流程呢,答案是 decideQueue。
decideQueue 和工作队列雷同,也是一个具备提早性能的队列,其寄存的是正在执行中的流程的实例 id。在工作开始执行时(包含新启动执行、重试执行、复原执行、重跑执行等),会将实例 id push 到 decideQueue 中;在执行完结(胜利、失败)时,会从 decideQueue 中删除实例 id。
3 ExecutionLockService
WorkflowReconciler 会定期尝试 decide 所有正在运行中的流程用于超时判断、保护流程一致性。然而流程自身失常执行也会触发 decide,如果同一个执行同时触发两个 decide,可能会导致状态凌乱,执行卡住等问题。
conductor 采纳了锁来解决这个问题,其提供了单机 LocalOnlyLock(基于信号量实现)、redis 分布式锁(基于 redission 实现)、zookeeper 分布式锁三种实现。
decide 办法中最开始会尝试获取锁,如果获取失败则间接返回。通过锁来保障不会对同一个流程实例并发执行 decide。
if (!executionLockService.acquireLock(workflowId)) {return false;}
因为锁是可配置的,可能会导致一个误区:单台机器的话不必配置锁。其实单机也是须要配置锁的,因为 WorkflowReconciler 和流程失常执行会产生抵触,可能会导致偶发的流程状态凌乱问题。
原文链接
本文为阿里云原创内容,未经容许不得转载。