关于任务调度:任务调度系统系列之Airflow

69次阅读

共计 6069 个字符,预计需要花费 16 分钟才能阅读完成。

这是任务调度零碎调研系列文章的开篇,后续会陆续调研 Oozie,Azkaban,Dolphin Scheduler 等零碎。本文的次要内容是来自对官网文档和网上相干材料的调研,并非基于理论应用的经验总结,文章中难免会有一些不尽的细节或者对于 Airflow 谬误的观点,如有不当之处,欢送斧正交换。

Airflow 是一个基于 Python 开发的,通过编程的形式创立、调度和监控工作流 (workflow) 的平台。最早由 Maxime Beauchemin 于 2014 年 10 月在 Airbnb 创立,并且从创立之初就以开源的模式开发。2016 年 3 月进入 Apache 基金孵化器,2019 年 1 月正式成为 Apache 顶级我的项目。

官网文档中,特意强调了应用代码定义工作流的长处,使得工作流的保护、版本治理、测试和合作变得更加容易,间接复用代码开发过程中用到工具、零碎就能够了,无需再反复造轮子,能够像开发软件系统一样开发数据工作,继续集成也是开箱即用。然而数据工作的测试向来不是一件简略的事件,不晓得在理论应用中基于 Airflow 的数据开发 CI/CD 晦涩度如何。这种基于 Python 代码定义工作流的形式应用门槛略微高了一点。基于代码定义 flow 中节点的依赖关系,并不如通过界面拖拽那么直观,是不是也会使易用性大大折扣?

架构


如上图所示,Airflow 次要由以下几个局部组成:

DAG 目录(DAG Directory)

存储定义 DAG 的 Python 文件的目录,调度器、执行器和执行节点会读取该目录下的文件获取 DAG 相干信息,所以要确保所有节点上 DAG 目录的数据同步。如何确保文件同步到也是一项简单的工程。

数据库(Metadata Database)

数据库次要用于存储系统的配置信息(零碎变量,数据源链接信息,用户信息等)、解析 DAG 文件生成的 DAG 对象和工作执行的状态等信息。

调度器(Scheduler)

独立部署的过程,负责触发执行定时调度的工作流。调度器次要负责两件事:1)定时扫描 DAG 文件目录,解析变更或新增的 DAG 文件,并将解析后生成的 DAG 对象 (Serialized DAG) 存储到数据库;2)扫描数据库中的 DAG 对象,如果满足调度执行条件,则触发生成工作实例并提交给 Executor 执行。

调度器高可用

从 2.0 开始,Airflow 调度器反对高可用部署,采纳了我之前实现调度服务高可用时应用的策略,通过数据库行锁的机制,实现多主的高可用。这样实现的益处是缩小了 leader 选举、节点故障转移的复杂度。多个节点同时工作相较于主从模式也能获取较好的解决性能,能够通过横向扩大调度器晋升调度服务的解决能力,但究竟要受限于底层单点数据库的解决能力。如果执行事务的时长比拟久,特地是事务中存在校验并发限度、资源应用配额的操作时,就很容易造成死锁,所以在 Airflow 理论部署中,高可用对数据库有着非凡的要求,须要数据库反对 SKIP LOCKED 或者NOWAIT

执行器(Executor)

执行器负责执行或者提交执行工作实例(Task Instances)。执行器在理论部署中集成在调度器过程内,Airflow 提供了两种类型的执行器,1)本地执行器,工作间接在调度器所在节点上执行;2)近程执行器,依据各执行器的实现形式,将工作提交到近程执行节点上执行。如果零碎自带的执行器无奈满足你的业务需要,能够自行实现自定义执行器。

零碎自带本地执行器:

  • Debug Executor:次要用于在 IDE 中对工作进行测试执行。
  • Local Executor:在调度器本地新建过程执行工作实例,能够通过 parallelism 参数管制最大工作并发数。
  • Sequential Executor:能够了解为最大并发数 1 的 Local Executor。

零碎自带近程执行器:

  • Celery Executor:Celery 是一个基于 Python 开发的分布式异步音讯工作队列,通过它能够轻松的实现工作的异步解决。Celery Executor 将工作发送到音讯队列 (RabbitMQ, Redis 等),而后 Celery Worker 从音讯队列中生产执行工作,并将执行后果写入到 Celery 的 Backend 中。Celery Executor 通过队列(queues) 实现资源隔离,定义工作时指定应用的具体队列,则该工作只能由相应队列的 worker 执行。然而这个资源隔离的粒度有点粗,如果想实现更细粒度的资源,能够抉择 Kubernetes Executor。
  • Kubernetes Executor:通过 K8S 集群执行工作。Kubernetes Executor 调用 K8S API 申请 Worker Pod,而后由 Pod 负责工作的执行。
  • CeleryKubernetes Executor:是下面两个执行器的组合,因为 Airflow 部署时只能指定一种类型执行器,如果既须要通过 Celery 执行又想提交到 K8S 集群执行,则能够抉择该执行器。
  • Dask Executor: Dask 是基于 Python 实现的分布式计算框架,Dask Executor 次要是通过 Dask 分布式集群执行工作。

近程执行时所有执行节点都会间接数据库,鉴于弹性伸缩是 Airflow 的一大个性,如果执行节点规模太大对数据库造成的压力不可小觑,所以为什么要采纳执行节点直连数据的形式呢?

执行节点(worker)

负责具体任务的执行,依据执行器不同,可能是调度器所在节点,Celery Worker 节点,K8S Pod 等。

WebServer

WebServer 次要为用户提供了治理 DAG(启用、禁用,手动执行),查看和操作 DAG 的执行状态,管理系统权限,查看和批改系统配置,治理数据源等性能。前文提到的通过代码定义依赖关系不直观的问题,Airflow 在 WebServer 给了解决方案,运行 DAG,而后通过 WebServer 的 Graph 视图以可视化的形式展现 DAG。如果肯定要在执行前可视化的形式查看 DAG 也能够在命令行执行 airflow dags show 生成 Graph 视图的图片。兴许是我调研的还不够深刻,难道就没有实时可视化展现 DAG 的计划?

性能个性

工作流定义

Airflow 通过 Python 代码以 DAG 的模式定义工作流,以下代码片段定义了上图由 7 个工作节点组成的 DAG。

// 从 2021 年 1 月 1 日开始,每天零点调度
with DAG("daily_dag", schedule_interval="@daily", start_date=datetime(2021, 1, 1)
) as dag:
    ingest = DummyOperator(task_id="ingest")
    analyse = DummyOperator(task_id="analyze")
    check = DummyOperator(task_id="check_integrity")
    describe = DummyOperator(task_id="describe_integrity")
    error = DummyOperator(task_id="email_error")
    save = DummyOperator(task_id="save")
    report = DummyOperator(task_id="report")

    ingest >> analyse // 通过 `>>`,`<<` 定义节点依赖关系
    analyse.set_downstream(check) // 通过 `set_downstream`,`set_upstream` 定义节点依赖关系
    check >> Label("No errors") >> save >> report // 通过 `Label` 正文依赖关系
    check >> Label("Errors found") >> describe >> error >> report

DAG 由节点、节点间的依赖关系以及节点间的数据流组成。节点的类型次要有以下三种:

  • Operator:工作节点,负责执行某种类型的工作。Airflow 和社区曾经实现了大量的 Operator,根本笼罩了罕用数据库,Hadoop 生态沉闷的零碎和服务,以及 AWS、Google 和 Azure 三大海内云平台的零碎和服务。
  • Sensor: 一种非凡的 Operator,次要用来监听内部事件,可用作对外部零碎、数据的依赖。Airflow 通过external_task Sensor 实现了 DAG 工作间的依赖。
  • @task注解的 Python 函数,能够了解为基于 Python 装璜器定义的语法糖,能疾速简洁的定义PythonOperator

如以上 Python 代码所示,节点间的依赖关系能够通过位操作符 >>/<<set_upstream/set_downstream办法定义。

默认状况下上游节点要等上游所有节点执行胜利后才开始执行,Airflow 提供了多种形式来扭转这一默认行为。第一种形式就是通过自定义节点的触发规定 (Trigger Rules)。Airflow 提供了上游 所有节点都失败 所有节点执行实现 局部节点失败 局部节点胜利 等多种规定,详情参考上述链接。另一种形式就是通过 管制节点。目前有三种管制节点能够扭转默认行为:

  • 分支节点 (Branching): 通过python_callable 函数返回的 task_id 决定执行上游哪个节点。
  • 仅执行最新节点 (Latest Only): 如果 仅执行最新节点 以后所属的 DAG 执行实例,不是改 DAG 最新的执行实例,则改节点及其所有子节点都不会被执行。
  • 自依赖节点(Depends On Past): 依赖节点的上一次执行,只有上一次 DAG 调度中该节点执行胜利了,才触发这一次执行。

除了定义节点间的依赖关系,Airflow 还通过 XComs(cross-communications)实现了节点间的数据流。节点能够通过 xcom_push 办法输入数据,其余节点能够通过 xcom_pull 办法获取节点的输入数据。

调度

定时调度是通过 DAG 的 schedule_interval 参数定义,传参能够是 datetime.timedelta 对象、cron 表达式(Unix 格局)或者 @daily@monthly 等预设 cron 表达式。对于任务调度工夫的定义,Airflow 采纳了目前我所接触到的任务调度零碎中不同的视角,用数据工夫(Airflow 称它为 logical date,有的零碎称它为 etl_date)来定义任务调度工夫。举个例子,如果 DAG 配置每天调度一次,在 Airflow 中 2021-12-26 这次的调度实例,要在 2021-12-27 这天凌晨才会生成,解决的是 2021-12-26 的数据。而在其余零碎中 2021-12-26 这次的调度实例就是在 2021-12-26 生成,解决的是 2021-12-25 的数据。

针对 MissFire 策略(概念来自 quartz),Airflow 提供了 catchup 参数。如果 catchup 设置为 false,则未生成的调度时间段间接跳过,只生成最新的调度实例。另外在禁用和启用调度 DAG 后catchup 逻辑也会触发。

超时失败和报警

如果要限度节点最大执行工夫,能够设置 execution_timeout 参数,节点在 execution_timeout 配置工夫内未执行胜利则主动超时失败。工作执行超时报警是通过 sla 参数配置的,节点在 sla 指定的工夫内没有执行胜利,零碎主动发送 SLA 未满足邮件,也能够通过 sla_miss_callback 回调函数,自定义工作超时的逻辑。

对于报警,Airflow 提供了 email_on_failureemail_on_retry 参数管制节点在执行失败、重试时是否发送邮件报警。在理论生产环境中,邮件报警必定是不能满足需要的,其余报警形式能够通过自定义 on_failure_callback,on_retry_callback 回调函数实现。

并发限度

Airflow 提供了多种粒度的并发限度。

零碎级别
  • parallelism: Airflow 并发执行的工作数
  • max_active_runs_per_dag: 每个 DAG 可并产生成的 DAG 调度实例数
  • dag_concurrency: 每个 DAG 实例并发执行的工作数
  • worker_concurrency: 每个执行节点可并发执行的工作数,仅 Celery Executor 的执行节点反对该配置
DAG 级别
  • max_active_runs: 以后 DAG 可并产生成的 DAG 调度实例数,该配置会笼罩零碎级别的max_active_runs_per_dag
  • concurrency: 以后 DAG 实例并发执行的工作数,该配置会笼罩零碎级别的dag_concurrency
工作级别
  • pool:pool 是 Airflow 用于实现跨 DAG、跨工作的并发限度计划。定义工作时指定工作所属 pool、工作应用的 slot 数、工作优先级;pool 资源应用达到下限后,所有附属该 pool 的工作实例进入排队状态,有闲暇资源开释时,高优先工作优先获取资源。资源分配的具体策略这里就没有在深入研究。
  • task_concurrency:当前任务节点的最大并发执行实例个数,相似于max_active_runs,只是粒度更细

补数据和手动执行

补数据(backfill)能够通过以下命令触发:


airflow dags backfill --start-date START_DATE --end-date END_DATE dag_id

默认状况下,补数据只生成并执行指定工夫范畴内缺失的调度记录。再举个例子,DAG 每天调度一次,当初要补 2021-12-01 到 2021-12-03 之间的数据,其中 2021-12-02 这天曾经调度执行过,则补数据工作只会创立执行 2021-12-01 和 20210-12-03 的调度记录。backfill 命令提供了多种选项来笼罩这一默认策略。

手动执行能够通过命令 airflow dags trigger --exec-date logical_date run_id 或者通过 WebServer 触发。

数据血统

本着让业余的人干业余的事的理念,Airflow 依靠于第三方元数据管理系统实现数据血统治理,平台自身只实现血统的收集和上报。通过工作的 inletsoutlets属性定义工作的血统信息,血统信息在工作的 post_execute 办法中推送到 XCOM,而后再由 LineageBackend 把血统信息写到 Atlas、DataHub(WhereHows)或者自定义的元数据管理系统。

总结

本文通过官网文档和网上相干材料,“纸上”动态地调研了 Airflow 的零碎架构和性能个性。整体而言,Airflow 是一个调度功能完善、扩大伸缩性良好、文档详尽、社区弱小沉闷的工作流调度平台。个人感觉在任务调度零碎选型上,可能妨碍 Airflow 入选的最次要因素是基于 Python 技术栈实现的整个零碎和 DAG 定义。如果负责平台的同学和零碎面向的用户有 Python 相干技术背景,从纸面上看,Airflow 是个十分不错、甚至是第一优先级的抉择。

本文最早公布于集体博客

正文完
 0