乐趣区

关于微服务:开源微服务编排框架Netflix-Conductor

简介:本文次要介绍 netflix conductor 的基本概念和次要运行机制。

作者 | 夜阳
起源 | 阿里技术公众号

本文次要介绍 netflix conductor 的基本概念和次要运行机制。

一 简介

netflix conductor 是基于 JAVA 语言编写的开源流程引擎,用于架构基于微服务的流程。它具备如下个性:

  • 容许创立简单的业务流程,流程中每个独立的工作都是由一个微服务所实现。
  • 基于 JSON DSL 创立工作流,对工作的执行进行编排。
  • 工作流在执行的过程中可见、可追溯。
  • 提供暂停、复原、重启等多种管制模型。
  • 提供一种简略的形式来最大限度重用微服务。
  • 领有扩大到百万流程并发运行的服务能力。
  • 通过队列服务实现客户端与服务端的拆散。
  • 反对 HTTP 或其余 RPC 协定进行数据传送

二 基本概念

1 Task

Task 是最小执行单元,承载了一段执行逻辑,如发送 HTTP 申请等。

  • System Task:被 conductor 服务执行,这些工作的执行与引擎在同一个 JVM 中。
  • Worker Task:被 worker 服务执行,执行与引擎隔离开,worker 通过队列获取工作后,执行并更新后果状态到引擎。Worker 的实现是跨语言的,其应用 Http 协定与 Server 通信。

conductor 提供了若干内置 SystemTask:

功能性 Task:

  • HTTP:发送 http 申请
  • JSON_JQ_TRANSFORM:jq 命令执行,个别用户 json 的转换,具体可见 jq 官网文档
  • KAFKA_PUBLISH: 公布 kafka 音讯

流程管制 Task:

  • SWITCH(原 Decision):条件判断分支,相似于代码中的 switch case
  • FORK:启动并行分支,用于调度并行任务
  • JOIN:汇总并行分支,用于汇总并行任务
  • DO_WHILE:循环,相似于代码中的 do while
  • WAIT:始终在运行中,直到内部工夫触发更新节点状态,可用于期待内部操作
  • SUB_WORKFLOW:子流程,执行其余的流程
  • TERMINATE:完结流程,以指定输入提前结束流程,能够与 SWITCH 节点配合应用,相似代码中的提前 return 语句

自定义 Task:

  • 对于 System Task,Conductor 提供了 WorkflowSystemTask 抽象类,能够自定义扩大实现。
  • 对于 Worker Task,能够实现 conductor 的 client Worker 接口实现执行逻辑。

2 Workflow

  • Workflow 由一系列须要执行的 Task 组成,conductor 采纳 json 来形容 Task 的流转关系。
  • 除根本的程序流程外,借助内置的 SWITCH、FORK、JOIN、DO_WIHLE、TERMINATE 工作,还能实现分支、并行、循环、提前结束等流程管制。

3 Input&Output

Task 的输出是一种映射,其作为工作流实例化的一部分或某些其余 Task 的输入。容许将来自工作流或其余 Task 的输出 / 输入作为随后执行的 Task 的输出。

  • Task 有本人的输出和输入,输入输出都是 jsonobject 类型。
  • Task 能够援用其余 Task 的输入输出,应用 ${taskxxx.output}的形式援用。援用语法为 json-path,除最根底的 ${taskxxx.output}的值解析形式外,还反对其余简单操作,如过滤等,具体见 json-path 语法。
  • 启动 Workflow 时能够传入流程的输出数据,Task 能够通过 ${workflow.input}的形式援用。

Task 实现原子操作的解决以及流程管制操作,Workflow 定义形容 Task 的流转关系,Task 援用 Workflow 或者其它 Task 的输入输出。通过这些机制,conductor 实现了 JSON DSL 对流程的形容。

三 整体架构

次要分为几个局部:

  • Orchestrator: 负责流程的流转调度工作;
  • Management/Execution Service: 提供流程、工作的治理更新等操作;
  • TaskQueues: 工作队列,Orchestrator 解析进去的待执行 Task 会放到队列中;
  • Worker: 工作执行 worker,从 TaskQueues 中获取工作,通过 Execution Service 更新工作状态与后果数据;
  • Database: 元数据 & 运行时数据库,用于保留运行时的 Workflow、Task 等状态信息,以及流程工作定义的等原信息;
  • Index: 索引数据库,用于存储执行历史;

四 运行模型

1 Task 状态转移

  • SCHEDULED:待调度,task 放到队列中还没有被 poll 进去执行时的状态
  • IN_PROGRESS:执行中,被 poll 进去执行但还没有实现时的状态
  • COMPLETED:执行实现
  • FAILED:执行失败
  • CANCELLED:被停止时为此状态,个别呈现在两种状况:

1. 手动停止流程时,正在运行中的 task 会被置为此状态;
2. 多个 fork 分支,当某个分支的 task 失败时,其它分支中正在运行的 task 会被置为此状态;

2 工作队列

工作的执行(同步的零碎工作除外)都会先增加到工作队列中,是典型的生产者消费者模式。

  • 工作队列,是一个带有提早、优先级性能的队列;
  • 每种类型的 Task 是一个独自的队列,此外,如果配置了 domain、isolationGroup,还会拆分成多个队列实现执行隔离;
  • decider service 是生产者,其依据流程配置与以后执行状况,解析出可执行的 task 后,增加到队列;
  • 工作执行器 (SystemTaskWorker、Worker) 是消费者,其长轮询对应的队列,从队列中获取工作执行;

队列接口可插拔,conductor 提供了 Dynomite、MySQL、PostgreSQL 的实现。

3 外围性能实现机制

conductor 调度的外围是 decider service,其依据以后流程运行的状态,解析出将要执行的工作列表,将工作入队交给 worker 执行。

decide 次要流程简化如下,具体代码见 WorkflowExecutor.java 的 decide 办法:

其中,调度工作解决流程简化如下,具体代码见 WorkflowExecutor.java 的 scheduleTask 办法:

decide 的触发机会

最次要的触发机会:

  • 新启动执行时,会触发 decide 操作
  • 零碎工作执行实现时,会触发 decide 操作
  • Workder 工作通过 ExecutionService 更新工作状态时,会触发 decide 操作

流程管制节点的实现机制

1)Task & TaskMapper

对于每一个 Task 来说,都有 Task 和 TaskMapper 两局部:

  • Task:工作的执行逻辑代码,它的作用是 Task 的执行
  • TaskMapper:工作的映射逻辑代码,它通过 Task 的定义配置、以后实例的执行状态等信息,返回理论须要执行的 Task 列表

对于个别的工作来说,TaskMapper 返回的是就是 Task 自身,补充一些执行实例的状态信息。然而对于管制节点来说,会有不同的逻辑。

2)条件分支 (SWITCH) 的实现机制

SWITCH 用于依据条件判断,执行不同的分支。

实际上,该节点的 Task 不做任何操作,TaskMapper 依据分支条件,判断出要走的分之后,返回对应分支的第一个 Task。

SwitchTaskMapper.java getMappedTasks 办法要害代码:

// 待调度的 Task list,最终返回后果
List<Task> tasksToBeScheduled = new LinkedList<>();
// evalResult 是分支条件变量的值(case)
// decisionCases 是一个 Map 构造,key 为分支的 case 值,value 为对应分支的工作定义 list(分支内的工作定义会有多个)// 依据分支变量的理论值,获取对应分支的工作定义 list
List<WorkflowTask> selectedTasks = taskToSchedule.getDecisionCases().get(evalResult);
// default 的逻辑:如果获取不到对应的分支或者分支为空,则用默认的分支
if (selectedTasks == null || selectedTasks.isEmpty()) {selectedTasks = taskToSchedule.getDefaultCase();
}
if (selectedTasks != null && !selectedTasks.isEmpty()) {// 获取分支的第一个(下标 0)task,返回给 decider service 去做调度(decider 会把工作增加到队列里,交给 worker 去执行)WorkflowTask selectedTask = selectedTasks.get(0);
  // 调用了 deciderService 的 getTasksToBeScheduled 办法,此办法里又获取到 TaskMapper 调用了 getMappedTasks。这里采纳了递归调用的形式,解析嵌套的 Task
  List<Task> caseTasks = taskMapperContext.getDeciderService()
    .getTasksToBeScheduled(workflowInstance, selectedTask, retryCount, taskMapperContext.getRetryTaskId());
  tasksToBeScheduled.addAll(caseTasks);
  switchTask.getInputData().put("hasChildren", "true");
}
return tasksToBeScheduled;

3)并行 (FORK) 的实现机制

FORK 用于开启多个并行分支。

实际上,该节点的 Task 不做任何操作,TaskMapper 返回所有并行分支的第一个 Task。
ForkJoinTaskMapper.java getMappedTasks 要害代码:

// 待调度的 Task list,最终返回后果
List<Task> tasksToBeScheduled = new LinkedList<>();
// 配置中的所有 fork 分支
List<List<WorkflowTask>> forkTasks = taskToSchedule.getForkTasks();
for (List<WorkflowTask> wfts : forkTasks) {
  // 每个分支取第一个 Task
  WorkflowTask wft = wfts.get(0);
  // 调用了 deciderService 的 getTasksToBeScheduled 办法,此办法里又获取到 TaskMapper 调用了 getMappedTasks。这里采纳了递归调用的形式,解析嵌套的 Task
  List<Task> tasks2 = taskMapperContext.getDeciderService()
    .getTasksToBeScheduled(workflowInstance, wft, retryCount);
  tasksToBeScheduled.addAll(tasks2);
}
return tasksToBeScheduled;

总的来说,分支 (SWITCH)、并行(FORK) 节点自身没有执行逻辑,其通过 TaskMapper 返回到理论要执行的 Task,而后交给 Decider Service 解决。

重试的实现机制

重试和其延迟时间设置,都是借助工作队列的性能实现的。

重试:将工作从新增加到工作队列

重试的延迟时间:增加到工作队列时设置延迟时间,延迟时间过后,工作能力在队列中被 poll 进去执行

五 完整性保障机制

因为调度过程中可能会呈现因机器重启、网络异样、JVM 解体等偶发状况,这些会导致的 decide 过程意外终止,流程执行不残缺,展现出如流程始终运行中(理论曾经没有在调度),或者其它状态谬误等异常现象。

1 WorkflowReconciler

针对这种状况,conductor 有一个 WorkflowReconciler,会定期尝试 decide 所有正在运行中的流程,修复流程执行的一致性。此外,它还有一个作用是校验流程超时工夫。

2 decideQueue

那么 WorkflowReconciler 是如何获取到以后运行中的流程呢,答案是 decideQueue。

decideQueue 和工作队列雷同,也是一个具备提早性能的队列,其寄存的是正在执行中的流程的实例 id。在工作开始执行时(包含新启动执行、重试执行、复原执行、重跑执行等),会将实例 id push 到 decideQueue 中;在执行完结(胜利、失败)时,会从 decideQueue 中删除实例 id。

3 ExecutionLockService

WorkflowReconciler 会定期尝试 decide 所有正在运行中的流程用于超时判断、保护流程一致性。然而流程自身失常执行也会触发 decide,如果同一个执行同时触发两个 decide,可能会导致状态凌乱,执行卡住等问题。

conductor 采纳了锁来解决这个问题,其提供了单机 LocalOnlyLock(基于信号量实现)、redis 分布式锁(基于 redission 实现)、zookeeper 分布式锁三种实现。

decide 办法中最开始会尝试获取锁,如果获取失败则间接返回。通过锁来保障不会对同一个流程实例并发执行 decide。

if (!executionLockService.acquireLock(workflowId)) {return false;}

因为锁是可配置的,可能会导致一个误区:单台机器的话不必配置锁。其实单机也是须要配置锁的,因为 WorkflowReconciler 和流程失常执行会产生抵触,可能会导致偶发的流程状态凌乱问题。

原文链接
本文为阿里云原创内容,未经容许不得转载。

退出移动版