共计 2705 个字符,预计需要花费 7 分钟才能阅读完成。
简介:airflow 是 Airbnb 开源的一个用 python 编写的调度工具,基于有向无环图(DAG),airflow 能够定义一组有依赖的工作,依照依赖顺次执行,通过 python 代码定义子工作,并反对各种 Operate 操作器,灵活性大,能满足用户的各种需要。本文次要介绍应用 Airflow 的 python Operator 调度 MaxCompute 工作。
背景
airflow 是 Airbnb 开源的一个用 python 编写的调度工具,基于有向无环图(DAG),airflow 能够定义一组有依赖的工作,依照依赖顺次执行,通过 python 代码定义子工作,并反对各种 Operate 操作器,灵活性大,能满足用户的各种需要。本文次要介绍应用 Airflow 的 python Operator 调度 MaxCompute 工作。
一、环境筹备
- Python 2.7.5 PyODPS 反对 Python2.6 以上版本
- Airflow apache-airflow-1.10.7
1. 装置 MaxCompute 须要的包
pip install setuptools>=3.0
pip install requests>=2.4.0
pip install greenlet>=0.4.10 # 可选,装置后能减速 Tunnel 上传。
pip install cython>=0.19.0 # 可选,不倡议 Windows 用户装置。
pip install pyodps
留神:如果 requests 包抵触,先卸载再装置对应的版本
2. 执行如下命令查看装置是否胜利
python -c “from odps import ODPS”
二、开发步骤
1. 在 Airflow 家目录编写 python 调度脚本 Airiflow_MC.py
# -*- coding: UTF-8 -*-
import sys
import os
from odps import ODPS
from odps import options
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from datetime import datetime, timedelta
from configparser import ConfigParser
import time
reload(sys)
sys.setdefaultencoding('utf8')
#批改零碎默认编码。# MaxCompute 参数设置
options.sql.settings = {'options.tunnel.limit_instance_tunnel': False, 'odps.sql.allow.fullscan': True}
cfg = ConfigParser()
cfg.read("odps.ini")
print(cfg.items())
odps = ODPS(cfg.get("odps","access_id"),cfg.get("odps","secret_access_key"),cfg.get("odps","project"),cfg.get("odps","endpoint"))
default_args = {
'owner': 'airflow',
'depends_on_past': False,
'retry_delay': timedelta(minutes=5),
'start_date':datetime(2020,1,15)
# 'email': ['airflow@example.com'],
# 'email_on_failure': False,
# 'email_on_retry': False,
# 'retries': 1,
# 'queue': 'bash_queue',
# 'pool': 'backfill',
# 'priority_weight': 10,
# 'end_date': datetime(2016, 1, 1),
}
dag = DAG('Airiflow_MC', default_args=default_args, schedule_interval=timedelta(seconds=30))
def read_sql(sqlfile):
with io.open(sqlfile, encoding='utf-8', mode='r') as f:
sql=f.read()
f.closed
return sql
def get_time():
print '以后工夫是{}'.format(time.time())
return time.time()
def mc_job ():
project = odps.get_project() # 取到默认我的项目。instance=odps.run_sql("select * from long_chinese;")
print(instance.get_logview_address())
instance.wait_for_success()
with instance.open_reader() as reader:
count = reader.count
print("查问表数据条数:{}".format(count))
for record in reader:
print record
return count
t1 = PythonOperator (
task_id = 'get_time' ,
provide_context = False ,
python_callable = get_time,
dag = dag )
t2 = PythonOperator (
task_id = 'mc_job' ,
provide_context = False ,
python_callable = mc_job ,
dag = dag )
t2.set_upstream(t1)
2. 提交
python Airiflow_MC.py
3. 进行测试
# print the list of active DAGs
airflow list_dags
# prints the list of tasks the "tutorial" dag_id
airflow list_tasks Airiflow_MC
# prints the hierarchy of tasks in the tutorial DAG
airflow list_tasks Airiflow_MC --tree
#测试 task
airflow test Airiflow_MC get_time 2010-01-16
airflow test Airiflow_MC mc_job 2010-01-16
4. 运行调度工作
登录到 web 界面点击按钮运行
5. 查看工作运行后果
(1)点击 view log
(2)查看后果
原文链接
本文为阿里云原创内容,未经容许不得转载。