简介:本文次要介绍netflix conductor的基本概念和次要运行机制。

作者 | 夜阳
起源 | 阿里技术公众号

本文次要介绍netflix conductor的基本概念和次要运行机制。

一 简介

netflix conductor是基于JAVA语言编写的开源流程引擎,用于架构基于微服务的流程。它具备如下个性:

  • 容许创立简单的业务流程,流程中每个独立的工作都是由一个微服务所实现。
  • 基于JSON DSL 创立工作流,对工作的执行进行编排。
  • 工作流在执行的过程中可见、可追溯。
  • 提供暂停、复原、重启等多种管制模型。
  • 提供一种简略的形式来最大限度重用微服务。
  • 领有扩大到百万流程并发运行的服务能力。
  • 通过队列服务实现客户端与服务端的拆散。
  • 反对 HTTP 或其余RPC协定进行数据传送

二 基本概念

1 Task

Task是最小执行单元,承载了一段执行逻辑,如发送HTTP申请等。

  • System Task:被conductor服务执行,这些工作的执行与引擎在同一个JVM中。
  • Worker Task:被worker服务执行,执行与引擎隔离开,worker通过队列获取工作后,执行并更新后果状态到引擎。Worker的实现是跨语言的,其应用Http协定与Server通信。

conductor提供了若干内置SystemTask:

功能性Task:

  • HTTP:发送http申请
  • JSON_JQ_TRANSFORM:jq命令执行,个别用户json的转换,具体可见jq官网文档
  • KAFKA_PUBLISH: 公布kafka音讯

流程管制Task:

  • SWITCH(原Decision):条件判断分支,相似于代码中的switch case
  • FORK:启动并行分支,用于调度并行任务
  • JOIN:汇总并行分支,用于汇总并行任务
  • DO_WHILE:循环,相似于代码中的do while
  • WAIT:始终在运行中,直到内部工夫触发更新节点状态,可用于期待内部操作
  • SUB_WORKFLOW:子流程,执行其余的流程
  • TERMINATE:完结流程,以指定输入提前结束流程,能够与SWITCH节点配合应用,相似代码中的提前return语句

自定义Task:

  • 对于System Task,Conductor提供了WorkflowSystemTask 抽象类,能够自定义扩大实现。
  • 对于Worker Task,能够实现conductor的client Worker接口实现执行逻辑。

2 Workflow

  • Workflow由一系列须要执行的Task组成,conductor采纳json来形容Task的流转关系。
  • 除根本的程序流程外,借助内置的SWITCH、FORK、JOIN、DO_WIHLE、TERMINATE工作,还能实现分支、并行、循环、提前结束等流程管制。

3 Input&Output

Task的输出是一种映射,其作为工作流实例化的一部分或某些其余Task的输入。容许将来自工作流或其余Task的输出/输入作为随后执行的Task的输出。

  • Task有本人的输出和输入,输入输出都是jsonobject类型。
  • Task能够援用其余Task的输入输出,应用${taskxxx.output}的形式援用。援用语法为json-path,除最根底的${taskxxx.output}的值解析形式外,还反对其余简单操作,如过滤等,具体见json-path语法。
  • 启动Workflow时能够传入流程的输出数据,Task能够通过${workflow.input}的形式援用。

Task实现原子操作的解决以及流程管制操作,Workflow定义形容Task的流转关系,Task援用Workflow或者其它Task的输入输出。通过这些机制,conductor实现了JSON DSL对流程的形容。

三 整体架构

次要分为几个局部:

  • Orchestrator: 负责流程的流转调度工作;
  • Management/Execution Service: 提供流程、工作的治理更新等操作;
  • TaskQueues: 工作队列,Orchestrator解析进去的待执行Task会放到队列中;
  • Worker: 工作执行worker,从TaskQueues中获取工作,通过Execution Service更新工作状态与后果数据;
  • Database: 元数据&运行时数据库,用于保留运行时的Workflow、Task等状态信息,以及流程工作定义的等原信息;
  • Index: 索引数据库,用于存储执行历史;

四 运行模型

1 Task状态转移

  • SCHEDULED:待调度,task放到队列中还没有被poll进去执行时的状态
  • IN_PROGRESS:执行中,被poll进去执行但还没有实现时的状态
  • COMPLETED:执行实现
  • FAILED:执行失败
  • CANCELLED:被停止时为此状态,个别呈现在两种状况:

1.手动停止流程时,正在运行中的task会被置为此状态;
2.多个fork分支,当某个分支的task失败时,其它分支中正在运行的task会被置为此状态;

2 工作队列

工作的执行(同步的零碎工作除外)都会先增加到工作队列中,是典型的生产者消费者模式。

  • 工作队列,是一个带有提早、优先级性能的队列;
  • 每种类型的Task是一个独自的队列,此外,如果配置了domain、isolationGroup,还会拆分成多个队列实现执行隔离;
  • decider service是生产者,其依据流程配置与以后执行状况,解析出可执行的task后,增加到队列;
  • 工作执行器(SystemTaskWorker、Worker)是消费者,其长轮询对应的队列,从队列中获取工作执行;

队列接口可插拔,conductor提供了Dynomite 、MySQL、PostgreSQL的实现。

3 外围性能实现机制

conductor调度的外围是decider service,其依据以后流程运行的状态,解析出将要执行的工作列表,将工作入队交给worker执行。

decide次要流程简化如下,具体代码见WorkflowExecutor.java的decide办法:

其中,调度工作解决流程简化如下,具体代码见WorkflowExecutor.java的scheduleTask办法:

decide的触发机会

最次要的触发机会:

  • 新启动执行时,会触发decide操作
  • 零碎工作执行实现时,会触发decide操作
  • Workder工作通过ExecutionService更新工作状态时,会触发decide操作

流程管制节点的实现机制

1)Task & TaskMapper

对于每一个Task来说,都有Task和TaskMapper两局部:

  • Task:工作的执行逻辑代码,它的作用是Task的执行
  • TaskMapper:工作的映射逻辑代码,它通过Task的定义配置、以后实例的执行状态等信息,返回理论须要执行的Task列表

对于个别的工作来说,TaskMapper返回的是就是Task自身,补充一些执行实例的状态信息。然而对于管制节点来说,会有不同的逻辑。

2)条件分支(SWITCH)的实现机制

SWITCH用于依据条件判断,执行不同的分支。

实际上,该节点的Task不做任何操作,TaskMapper依据分支条件,判断出要走的分之后,返回对应分支的第一个Task。

SwitchTaskMapper.java getMappedTasks办法要害代码:

// 待调度的Task list,最终返回后果List<Task> tasksToBeScheduled = new LinkedList<>();// evalResult是分支条件变量的值(case)// decisionCases是一个Map构造,key为分支的case值,value为对应分支的工作定义list(分支内的工作定义会有多个)// 依据分支变量的理论值,获取对应分支的工作定义listList<WorkflowTask> selectedTasks = taskToSchedule.getDecisionCases().get(evalResult);// default的逻辑:如果获取不到对应的分支或者分支为空,则用默认的分支if (selectedTasks == null || selectedTasks.isEmpty()) {  selectedTasks = taskToSchedule.getDefaultCase();}if (selectedTasks != null && !selectedTasks.isEmpty()) {  // 获取分支的第一个(下标0)task,返回给decider service去做调度(decider会把工作增加到队列里,交给worker去执行)  WorkflowTask selectedTask = selectedTasks.get(0);  // 调用了deciderService的getTasksToBeScheduled办法,此办法里又获取到TaskMapper调用了getMappedTasks。这里采纳了递归调用的形式,解析嵌套的Task  List<Task> caseTasks = taskMapperContext.getDeciderService()    .getTasksToBeScheduled(workflowInstance, selectedTask, retryCount, taskMapperContext.getRetryTaskId());  tasksToBeScheduled.addAll(caseTasks);  switchTask.getInputData().put("hasChildren", "true");}return tasksToBeScheduled;

3)并行(FORK)的实现机制

FORK用于开启多个并行分支。

实际上,该节点的Task不做任何操作,TaskMapper返回所有并行分支的第一个Task。
ForkJoinTaskMapper.java getMappedTasks要害代码:

// 待调度的Task list,最终返回后果List<Task> tasksToBeScheduled = new LinkedList<>();// 配置中的所有fork分支List<List<WorkflowTask>> forkTasks = taskToSchedule.getForkTasks();for (List<WorkflowTask> wfts : forkTasks) {  // 每个分支取第一个Task  WorkflowTask wft = wfts.get(0);  // 调用了deciderService的getTasksToBeScheduled办法,此办法里又获取到TaskMapper调用了getMappedTasks。这里采纳了递归调用的形式,解析嵌套的Task  List<Task> tasks2 = taskMapperContext.getDeciderService()    .getTasksToBeScheduled(workflowInstance, wft, retryCount);  tasksToBeScheduled.addAll(tasks2);}return tasksToBeScheduled;

总的来说,分支(SWITCH)、并行(FORK)节点自身没有执行逻辑,其通过TaskMapper返回到理论要执行的Task,而后交给Decider Service解决。

重试的实现机制

重试和其延迟时间设置,都是借助工作队列的性能实现的。

重试:将工作从新增加到工作队列

重试的延迟时间:增加到工作队列时设置延迟时间,延迟时间过后,工作能力在队列中被poll进去执行

五 完整性保障机制

因为调度过程中可能会呈现因机器重启、网络异样、JVM解体等偶发状况,这些会导致的decide过程意外终止,流程执行不残缺,展现出如流程始终运行中(理论曾经没有在调度),或者其它状态谬误等异常现象。

1 WorkflowReconciler

针对这种状况,conductor有一个WorkflowReconciler,会定期尝试decide所有正在运行中的流程,修复流程执行的一致性。此外,它还有一个作用是校验流程超时工夫。

2 decideQueue

那么WorkflowReconciler是如何获取到以后运行中的流程呢,答案是decideQueue。

decideQueue和工作队列雷同,也是一个具备提早性能的队列,其寄存的是正在执行中的流程的实例id。在工作开始执行时(包含新启动执行、重试执行、复原执行、重跑执行等),会将实例id push到decideQueue中;在执行完结(胜利、失败)时,会从decideQueue中删除实例id。

3 ExecutionLockService

WorkflowReconciler会定期尝试decide所有正在运行中的流程用于超时判断、保护流程一致性。然而流程自身失常执行也会触发decide,如果同一个执行同时触发两个decide,可能会导致状态凌乱,执行卡住等问题。

conductor采纳了锁来解决这个问题,其提供了单机LocalOnlyLock(基于信号量实现)、redis分布式锁(基于redission实现)、zookeeper分布式锁三种实现。

decide办法中最开始会尝试获取锁,如果获取失败则间接返回。通过锁来保障不会对同一个流程实例并发执行decide。

if (!executionLockService.acquireLock(workflowId)) {  return false;}

因为锁是可配置的,可能会导致一个误区:单台机器的话不必配置锁。其实单机也是须要配置锁的,因为WorkflowReconciler和流程失常执行会产生抵触,可能会导致偶发的流程状态凌乱问题。

原文链接
本文为阿里云原创内容,未经容许不得转载。