共计 4787 个字符,预计需要花费 12 分钟才能阅读完成。
0. 背景
真的是想不通,Airflow 不论社区活跃度还是 Github 的 star 数都是远胜于 Azkaban 还有 EasyScheduler 的,但是为何却连一个完备的安装教程都没有呢?是我的需求太高?真的是心累不已,整整把搜索引擎还有 youtube 翻来覆去也没让我感到满足……不过好在,一步一坑一脚印的最终搭建连通好了环境以及 Operator。好了,废话不多说,开始 Airflow 今日份安装教程。
1. 安装前准备工作
- 安装版本说明
安装工具 | 版本 | 用途 |
---|---|---|
Python | 3.6.5 | 安装 airflow 及其依赖包、开发 airflow 的 dag 使用 |
MySQL | 5.7 | 作为 airflow 的元数据库 |
Airflow | 1.10.0 | 任务调度平台 |
请选择一台干净的物理机或者云主机。不然,产生任何多余影响或者后果,本人概不负责!
- 请确保你熟悉 Linux 环境及基本操作命令,顺便会一些 Python 基础命令,如果不熟悉,请出门左转充完电再来
2. 安装 Python3
Python3 的安装可以参考我之前的文章,在此不再敖述
3. 安装 MySQL
3 年前也写过一个关于 Centos 安装 MySQL 的教程,但是虽然实用,但是内容太久,在此我们用最简方式快速安装 MySQL 并配置用户(当然,如果你用现成的 RDS
也可以,省去了安装过程,可直接跳转至为 Airflow 建库建用户步骤了)。
- 老规矩,卸载 mariadb
rpm -qa | grep mariadb
rpm -e --nodeps mariadb-libs-5.5.52-1.el7.x86_64
sudo rpm -e --nodeps mariadb-libs-5.5.52-1.el7.x86_64
rpm -qa | grep mariadb
- 下载 mysql 的 repo 源
wget http://repo.mysql.com/mysql-community-release-el7-5.noarch.rpm
- 通过 rpm 安装
sudo rpm -ivh mysql-community-release-el7-5.noarch.rpm
- 安装 mysql 并授权
sudo yum install mysql-server
sudo chown -R mysql:mysql /var/lib/mysql
- 启动 mysql
service mysqld start
__
以下操作均在 mysql 客户端上进行操作,首先需要连接并登录 mysql。
用 root 用户连接登录 mysql:
mysql -uroot
- 重置 mysql 密码
use mysql;
update user set password=password('root') where user='root';
flush privileges;
- 为 Airflow 建库、建用户
建库:
create database airflow;
建用户:
create user 'airflow'@'%' identified by 'airflow';
create user 'airflow'@'localhost' identified by 'airflow';
为用户授权:
grant all on airflow.* to 'airflow'@'%';
flush privileges;
exit;
__
4. 安装 Airflow
万事既已具备,让我们开始进入今天的主题!
4.1 基础篇
- 1)通过 pip 安装 airflow 脚手架
安装之前需要设置一下临时环境变量 SLUGIFY_USES_TEXT_UNIDECODE
,不然,会导致安装失败,命令如下:
export SLUGIFY_USES_TEXT_UNIDECODE=yes
安装 airflow 脚手架:
sudo pip install apache-airflow===1.10.0
airflow 会被安装到 python3 下的 site-packages 目录下,完整目录为: ${PYTHON_HOME}/lib/python3.6/site-packages/airflow
,我的 airflow 目录如下所示:
- 2) 正式安装 airflow
安装 airflow 前,我们需要先配置一下 airflow 的安装目录 AIRFLOW_HOME
,同时为了方便使用 airflow 的相关命令,我们也把 airflow 配置到环境变量中,一劳永逸。
编辑 /etc/profile
系统环境变量文件:
sudo vim /etc/profile
做如下修改(当然,具体目录需要修改成你自己对应的目录,不要照搬不动哦):
使修改后的环境变量立即生效:
sudo source /etc/profile
- 3)执行
airflow
命令做初始化操作
因为配置过 airflow 的环境变量 SITE_AIRFLOW_HOME
,我们在哪里执行如下命令都可:
airflow
到此,airflow 会在刚刚的 AIRFLOW_HOME
目录下生成一些文件。当然,执行该命令时可能会报一些错误,可以不用理会!生成的文件列表如下所示:
- 4) 为 airflow 安装 mysql 模块
sudo pip install 'apache-airflow[mysql]'
airflow 的包依赖安装均可采用该方式进行安装,具体可参考 airflow 官方文档
- 5) 采用 mysql 作为 airflow 的元数据库
修改 airflow.cfg 文件,配置 mysql 作为 airflow 元数据库:
_
这里就巨坑无比了,很多人的教程里面都是这么直接写的,然后就完蛋了!!!不信你试试看,等下初始化数据库必死无疑!而且还没有任何有效的解决方案可以供你搜索!!!其次也不要相信什么改成 <span style=”color:green;”> pymysql
</span> 配合 pymysql 包实现,这样更惨,会有数据类型解析问题,让你毫无头绪!!!切记切记!!!
sql_alchemy_conn = mysql://airflow:airflow@localhost:3306/airflow
或
sql_alchemy_conn = mysql+pymysql://airflow:airflow@localhost:3306/airflow
_
那既然这种方式不可行,该怎么办呢?办法总比困难多的!因为 Python3 不再支持 MySQLdb 了,只有 Python2 才支持。但是呢,也只有 MySQLdb 才是最佳结果。
首先,我们通过 pip 安装一下 mysqlclient:
sudo pip install mysqlclient
然后,再通过 pip 安装一下 MySQLdb:
sudo pip install MySQLdb
最后,我们修改 airflow.cfg 文件中的 sql_alchemy_conn 配置:
sql_alchemy_conn = mysql+mysqldb://airflow:airflow@localhost:3306/airflow
到此,我们已经为 airflow 配置好元数据库信息且准备好依赖包。
- 6) 初始化元数据库信息(其实也就是新建 airflow 依赖的表)
airflow initdb
此时,我们的 mysql 元数据库(库名为 airflow)中已经新建好了 airflow 的依赖表:
-
7) 应用的基础命令
- airflow 组件:webserver, scheduler, worker, flower
- 后台启动各组件命令:
airflow xxx -D
- 查看 dag 列表:
airflow list_dags
- 查看某个 dag 的任务列表:
airflow list_tasks dag_id
- 挂起 / 恢复某个 dag:
airflow pause/unpause dag_id
- 测试某个 dag 任务:
airflow test dag_id task_id execution_date
4.2 进阶篇
- 1) 初识 executor
这里为什么要修改呢?因为 SequentialExecutor 是单进程顺序执行任务,默认执行器,通常只用于测试,LocalExecutor 是多进程本地执行任务使用的,CeleryExecutor 是分布式调度使用(当然也可以单机),生产环境常用,DaskExecutor 则用于动态任务调度,常用于数据分析。
- 2)如何修改时区为东八区
为什么要修改时区呢?因为 Airflow 默认的时间是 GMT 时间,虽然可以在 Airflow 集群分布在不同时区时仍可保证时间相同,不会出现时间不同步的问题,但是这个时间比北京早 8 小时,不太符合我们的阅读习惯,也不够简洁直观。鉴于我们通常情况下,我们要么为单节点服务,要么即使扩展也是在同一个时区的,所以将时区修改为东八区,即北京时间,这样更便于我们使用。
Come on!
(1) 修改 airflow.cfg 文件:
default_timezone = Asia/Shanghai
这里修改的是 scheduler 的调度时间,也就是说在编写调度时间是可以直接写北京时间。
(2) 修改 webserver 页面上右上角展示的时间:
需要修改 ${PATHON_HOME}/lib/python3.6/site-packages/airflow/www/templates/admin/master.html
文件。
修改后效果如图所示:
(3) 修改 webserver lastRun 时间:
第 1 处修改 ${PATHON_HOME}/lib/python3.6/site-packages/airflow/models.py
文件。
def utc2local(self,utc):
import time
epoch = time.mktime(utc.timetuple())
offset = datetime.fromtimestamp(epoch) - datetime.utcfromtimestamp(epoch)
return utc + offset
效果如下:
第 2 处修改 ${PATHON_HOME}/lib/python3.6/site-packages/airflow/www/templates/airflow/dags.html
文件。
dag.utc2local(last_run.execution_date).strftime("%Y-%m-%d %H:%M")
dag.utc2local(last_run.start_date).strftime("%Y-%m-%d %H:%M")
效果如下:
修改完毕,此时可以通过重启 webserver 查看效果!
- 3) 添加用户认证
在这里我们采用简单的 password 认证方式足矣!
(1)安装 password 组件:
sudo pip install apache-airflow 此处含有隐藏内容,需要正确输入密码后可见!
(2)修改 airflow.cfg 配置文件:
[webserver]
authenticate = True
auth_backend = airflow.contrib.auth.backends.password_auth
(3)编写 python 脚本用于添加用户账号:
编写 add_account.py
文件:
import airflow
from airflow import models, settings
from airflow.contrib.auth.backends.password_auth import PasswordUser
user = PasswordUser(models.User())
user.username = 'airflow'
user.email = 'test_airflow@wps.cn'
user.password = 'airflow'
session = settings.Session()
session.add(user)
session.commit()
session.close()
exit()
执行 add_account.py
文件:
python add_account.py
你会发现 mysql 元数据库表 user 中会多出来一条记录的。
- 4) 修改 webserver 地址
4.3 高级篇
- 1) 配置 Airflow 分布式集群
(未完待续 …)