Airflowv110任务调度平台的安装教程

10次阅读

共计 4787 个字符,预计需要花费 12 分钟才能阅读完成。

0. 背景

真的是想不通,Airflow 不论社区活跃度还是 Github 的 star 数都是远胜于 Azkaban 还有 EasyScheduler 的,但是为何却连一个完备的安装教程都没有呢?是我的需求太高?真的是心累不已,整整把搜索引擎还有 youtube 翻来覆去也没让我感到满足……不过好在,一步一坑一脚印的最终搭建连通好了环境以及 Operator。好了,废话不多说,开始 Airflow 今日份安装教程。

1. 安装前准备工作

  • 安装版本说明
安装工具 版本 用途
Python 3.6.5 安装 airflow 及其依赖包、开发 airflow 的 dag 使用
MySQL 5.7 作为 airflow 的元数据库
Airflow 1.10.0 任务调度平台

请选择一台干净的物理机或者云主机。不然,产生任何多余影响或者后果,本人概不负责!

  • 请确保你熟悉 Linux 环境及基本操作命令,顺便会一些 Python 基础命令,如果不熟悉,请出门左转充完电再来

2. 安装 Python3

Python3 的安装可以参考我之前的文章,在此不再敖述

3. 安装 MySQL

3 年前也写过一个关于 Centos 安装 MySQL 的教程,但是虽然实用,但是内容太久,在此我们用最简方式快速安装 MySQL 并配置用户(当然,如果你用现成的 RDS 也可以,省去了安装过程,可直接跳转至为 Airflow 建库建用户步骤了)。

  • 老规矩,卸载 mariadb
rpm -qa | grep mariadb

rpm -e --nodeps mariadb-libs-5.5.52-1.el7.x86_64

sudo rpm -e --nodeps mariadb-libs-5.5.52-1.el7.x86_64

rpm -qa | grep mariadb
  • 下载 mysql 的 repo 源
wget http://repo.mysql.com/mysql-community-release-el7-5.noarch.rpm
  • 通过 rpm 安装
sudo rpm -ivh mysql-community-release-el7-5.noarch.rpm
  • 安装 mysql 并授权
sudo yum install mysql-server
sudo chown -R mysql:mysql /var/lib/mysql
  • 启动 mysql
service mysqld start

__

以下操作均在 mysql 客户端上进行操作,首先需要连接并登录 mysql。

用 root 用户连接登录 mysql:

mysql -uroot
  • 重置 mysql 密码
use mysql;

update user set password=password('root') where user='root';

flush privileges;
  • 为 Airflow 建库、建用户

建库:

create database airflow;

建用户:

create user 'airflow'@'%' identified by 'airflow';

create user 'airflow'@'localhost' identified by 'airflow';

为用户授权:

grant all on airflow.* to 'airflow'@'%';

flush privileges;

exit;

__

4. 安装 Airflow

万事既已具备,让我们开始进入今天的主题!

4.1 基础篇

  • 1)通过 pip 安装 airflow 脚手架

安装之前需要设置一下临时环境变量 SLUGIFY_USES_TEXT_UNIDECODE ,不然,会导致安装失败,命令如下:

export SLUGIFY_USES_TEXT_UNIDECODE=yes

安装 airflow 脚手架:

sudo pip install apache-airflow===1.10.0

airflow 会被安装到 python3 下的 site-packages 目录下,完整目录为: ${PYTHON_HOME}/lib/python3.6/site-packages/airflow ,我的 airflow 目录如下所示:

  • 2) 正式安装 airflow

安装 airflow 前,我们需要先配置一下 airflow 的安装目录 AIRFLOW_HOME ,同时为了方便使用 airflow 的相关命令,我们也把 airflow 配置到环境变量中,一劳永逸。

编辑 /etc/profile 系统环境变量文件:

sudo vim /etc/profile

做如下修改(当然,具体目录需要修改成你自己对应的目录,不要照搬不动哦):

使修改后的环境变量立即生效:

sudo source /etc/profile
  • 3)执行 airflow 命令做初始化操作

因为配置过 airflow 的环境变量 SITE_AIRFLOW_HOME ,我们在哪里执行如下命令都可:

airflow

到此,airflow 会在刚刚的 AIRFLOW_HOME 目录下生成一些文件。当然,执行该命令时可能会报一些错误,可以不用理会!生成的文件列表如下所示:

  • 4) 为 airflow 安装 mysql 模块
sudo pip install 'apache-airflow[mysql]'

airflow 的包依赖安装均可采用该方式进行安装,具体可参考 airflow 官方文档

  • 5) 采用 mysql 作为 airflow 的元数据库

修改 airflow.cfg 文件,配置 mysql 作为 airflow 元数据库:

_
这里就巨坑无比了,很多人的教程里面都是这么直接写的,然后就完蛋了!!!不信你试试看,等下初始化数据库必死无疑!而且还没有任何有效的解决方案可以供你搜索!!!其次也不要相信什么改成 <span style=”color:green;”> pymysql </span> 配合 pymysql 包实现,这样更惨,会有数据类型解析问题,让你毫无头绪!!!切记切记!!!

sql_alchemy_conn = mysql://airflow:airflow@localhost:3306/airflow

sql_alchemy_conn = mysql+pymysql://airflow:airflow@localhost:3306/airflow

_
那既然这种方式不可行,该怎么办呢?办法总比困难多的!因为 Python3 不再支持 MySQLdb 了,只有 Python2 才支持。但是呢,也只有 MySQLdb 才是最佳结果。

首先,我们通过 pip 安装一下 mysqlclient:

sudo pip install mysqlclient

然后,再通过 pip 安装一下 MySQLdb:

sudo pip install MySQLdb

最后,我们修改 airflow.cfg 文件中的 sql_alchemy_conn 配置:

sql_alchemy_conn = mysql+mysqldb://airflow:airflow@localhost:3306/airflow

到此,我们已经为 airflow 配置好元数据库信息且准备好依赖包。

  • 6) 初始化元数据库信息(其实也就是新建 airflow 依赖的表)
airflow initdb

此时,我们的 mysql 元数据库(库名为 airflow)中已经新建好了 airflow 的依赖表:

  • 7) 应用的基础命令

    • airflow 组件:webserver, scheduler, worker, flower
    • 后台启动各组件命令: airflow xxx -D
    • 查看 dag 列表: airflow list_dags
    • 查看某个 dag 的任务列表: airflow list_tasks dag_id
    • 挂起 / 恢复某个 dag: airflow pause/unpause dag_id
    • 测试某个 dag 任务: airflow test dag_id task_id execution_date

4.2 进阶篇

  • 1) 初识 executor

这里为什么要修改呢?因为 SequentialExecutor 是单进程顺序执行任务,默认执行器,通常只用于测试,LocalExecutor 是多进程本地执行任务使用的,CeleryExecutor 是分布式调度使用(当然也可以单机),生产环境常用,DaskExecutor 则用于动态任务调度,常用于数据分析。

  • 2)如何修改时区为东八区

为什么要修改时区呢?因为 Airflow 默认的时间是 GMT 时间,虽然可以在 Airflow 集群分布在不同时区时仍可保证时间相同,不会出现时间不同步的问题,但是这个时间比北京早 8 小时,不太符合我们的阅读习惯,也不够简洁直观。鉴于我们通常情况下,我们要么为单节点服务,要么即使扩展也是在同一个时区的,所以将时区修改为东八区,即北京时间,这样更便于我们使用。

Come on!

(1) 修改 airflow.cfg 文件:

default_timezone = Asia/Shanghai

这里修改的是 scheduler 的调度时间,也就是说在编写调度时间是可以直接写北京时间。

(2) 修改 webserver 页面上右上角展示的时间:

需要修改 ${PATHON_HOME}/lib/python3.6/site-packages/airflow/www/templates/admin/master.html 文件。

修改后效果如图所示:

(3) 修改 webserver lastRun 时间:

第 1 处修改 ${PATHON_HOME}/lib/python3.6/site-packages/airflow/models.py 文件。

def utc2local(self,utc):
       import time
       epoch = time.mktime(utc.timetuple())
       offset = datetime.fromtimestamp(epoch) - datetime.utcfromtimestamp(epoch)
       return utc + offset

效果如下:

第 2 处修改 ${PATHON_HOME}/lib/python3.6/site-packages/airflow/www/templates/airflow/dags.html 文件。

dag.utc2local(last_run.execution_date).strftime("%Y-%m-%d %H:%M")
dag.utc2local(last_run.start_date).strftime("%Y-%m-%d %H:%M")

效果如下:

修改完毕,此时可以通过重启 webserver 查看效果!

  • 3) 添加用户认证

在这里我们采用简单的 password 认证方式足矣!

(1)安装 password 组件:

sudo pip install apache-airflow

  此处含有隐藏内容,需要正确输入密码后可见!

(2)修改 airflow.cfg 配置文件:

[webserver]
authenticate = True
auth_backend = airflow.contrib.auth.backends.password_auth

(3)编写 python 脚本用于添加用户账号:

编写 add_account.py 文件:

import airflow
from airflow import models, settings
from airflow.contrib.auth.backends.password_auth import PasswordUser

user = PasswordUser(models.User())
user.username = 'airflow'
user.email = 'test_airflow@wps.cn'
user.password = 'airflow'

session = settings.Session()
session.add(user)
session.commit()
session.close()
exit()

执行 add_account.py 文件:

python add_account.py

你会发现 mysql 元数据库表 user 中会多出来一条记录的。

  • 4) 修改 webserver 地址

4.3 高级篇

  • 1) 配置 Airflow 分布式集群

(未完待续 …)

正文完
 0