About airflow: workflow - basic usage of Dagster (qbit)

Preface

Tech stack:

Windows 10
Python 3.8.10
poetry 1.2.0
dagster 1.0.16
dagit 1.0.16

poetry github: https://github.com/python-poe...
dagster github: https://github.com/dagster-io...
dagster docs: https://docs.dagster.io/

Installation

After initializing the project with poetry, add the following dependencies to pyproject.toml, then run poetry update:

```toml
# China mirror source (optional)
[[tool.poetry.source]]
name = "aliyun"
url = "https://mirrors.aliyun.com/pypi/simple/"
default = true

[tool.poetry.dependencies]
python = "^3.8.10"
dagster = "~1.0.16"
dagit = "~1.0.16"
```

Test code

test_dagster.py

```python
# encoding: utf-8
# author: qbit
# date: 2022-11-09
# summary: test dagster with add, subtract, multiply, divide
from dagster import get_dagster_logger, job, op, asset

loggerDag = get_dagster_logger()

@op
def OpSeed():
    r""" Seed function that generates the initial parameters """
    return (2, 1)

@op
def OpAdd(seed):
    r""" Read the parameters and add them """
    x, y = seed
    result = x + y
    loggerDag.info(f"{x} + {y} = {result}")
    return result

@op
def OpSubtract(x):
    r""" Subtract 1 from the input """
    result = x - 1
    loggerDag.info(f"{x} - 1 = {result}")
    return result

@op
def OpMultiply(x):
    r""" Multiply the input by 2 """
    result = x * 2
    loggerDag.info(f"{x} * 2 = {result}")
    # raise Exception('haha')
    return result

@op
def OpDivide(x, y):
    r""" Divide the inputs """
    result = y / x
    loggerDag.info(f"{y} / {x} = {result}")
    return result

@job
def arithmetic():
    r""" The four arithmetic operations """
    seed = OpSeed()
    addResult = OpAdd(seed)
    subResult = OpSubtract(addResult)
    mulResult = OpMultiply(addResult)
    OpDivide(subResult, mulResult)

if __name__ == "__main__":
    result = arithmetic.execute_in_process(
        run_config={"loggers": {"console": {"config": {"log_level": "info"}}}})
```

Running directly

Run it directly with python ...
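Besides execute_in_process, ops defined without a context argument can be unit-tested by invoking them directly, outside any job run. A minimal sketch, assuming the listing above is saved as test_dagster.py in the working directory; the test values are illustrative, not from the original post:

```python
# A minimal sketch of checking the ops above in isolation.
# Assumes the listing above is saved as test_dagster.py in the same directory.
from test_dagster import OpAdd, OpDivide

# An @op without a context argument can be invoked like a plain function:
# Dagster runs just the compute body, with no job machinery involved.
assert OpAdd((2, 1)) == 3      # 2 + 1
assert OpDivide(2, 6) == 3.0   # 6 / 2
print("op-level checks passed")
```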

November 9, 2022 · 3 min · jiezi

About airflow: Airflow from Beginner to Mastery 03 - a complete ETL example

This section walks through using Connection, MySqlOperator, and XComs to implement a complete Airflow ETL.

I. The raw way to save data into the database

1. Create the tables

```sql
CREATE database demodb;
use demodb;

create table stock_prices_stage(
    ticker varchar(30),
    as_of_date date,
    open_price double,
    high_price double,
    low_price double,
    close_price double
) COMMENT = 'stock price staging table';

create table stock_prices(
    id int not null AUTO_INCREMENT,
    ticker varchar(30),
    as_of_date date COMMENT 'current date',
    open_price double,
    high_price double,
    low_price double,
    close_price double,
    created_at timestamp default now(),
    updated_at timestamp default now(),
    primary key (id)
) COMMENT = 'stock price table';

create index ids_stockprices on stock_prices(ticker, as_of_date);
create index ids_stockpricestage on stock_prices_stage(ticker, as_of_date);
```

II. Using an airflow Connection to manage database connection info

Building on the previous section's code, the data saved to files is now loaded into the database. The V2 code, download_stock_price_v2.py, is as follows:

2.1 The traditional connection method

```python
"""Example DAG demonstrating the usage of the BashOperator."""
from datetime import timedelta
from textwrap import dedent

import yfinance as yf
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.utils.dates import days_ago
from airflow.models import Variable
import mysql.connector


def download_price(*args, **context):
    stock_list = get_tickers(context)
    for ticker in stock_list:
        dat = yf.Ticker(ticker)
        hist = dat.history(period="1mo")
        # print(type(hist))
        # print(hist.shape)
        # print(os.getcwd())
        with open(get_file_path(ticker), 'w') as writer:
            hist.to_csv(writer, index=True)
        print("Finished downloading price data for " + ticker)


def get_file_path(ticker):
    # NOT SAFE in a distributed system
    return f'./{ticker}.csv'


def load_price_data(ticker):
    with open(get_file_path(ticker), 'r') as reader:
        lines = reader.readlines()
        return [[ticker] + line.split(',')[:5]
                for line in lines if line[:4] != 'Date']


def get_tickers(context):
    # Read the configured Variable
    stock_list = Variable.get("stock_list_json", deserialize_json=True)
    # If the run was triggered with parameters, use those instead
    # (Trigger DAG with parameters)
    stocks = context["dag_run"].conf.get("stocks")
    if stocks:
        stock_list = stocks
    return stock_list


def save_to_mysql_stage(*args, **context):
    tickers = get_tickers(context)
    # Connect to the database
    mydb = mysql.connector.connect(
        host="98.14.13.15",
        user="root",
        password="Quant888",
        database="demodb",
        port=3307
    )
    mycursor = mydb.cursor()
    for ticker in tickers:
        val = load_price_data(ticker)
        print(f"{ticker} length={len(val)} {val[1]}")
        sql = """INSERT INTO stock_prices_stage
                 (ticker, as_of_date, open_price, high_price, low_price, close_price)
                 VALUES (%s,%s,%s,%s,%s,%s)"""
        mycursor.executemany(sql, val)
        mydb.commit()
        print(mycursor.rowcount, "record inserted.")


default_args = {
    'owner': 'airflow'
}

# [START instantiate_dag]
with DAG(
    dag_id='download_stock_price_v2',
    default_args=default_args,
    description='download stock price and save to local csv files and save to database',
    schedule_interval=None,
    start_date=days_ago(2),
    tags=['quantdata'],
) as dag:
    # [END instantiate_dag]
    dag.doc_md = """
    This DAG download stock price
    """

    download_task = PythonOperator(
        task_id="download_prices",
        python_callable=download_price,
        provide_context=True
    )

    save_to_mysql_task = PythonOperator(
        task_id="save_to_database",
        python_callable=save_to_mysql_stage,
        provide_context=True
    )

    download_task >> save_to_mysql_task
```

Then trigger the DAG manually from the airflow UI. The first two runs failed; after some debugging it ran successfully ...
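The save_to_mysql_stage above hard-codes host, user, and password in the DAG file. The Connection-based variant that this section's title points to would move those into Admin -> Connections and resolve them through a hook. A sketch of what that could look like, assuming a MySQL Connection with conn id demodb has been created and apache-airflow-providers-mysql is installed; the conn id and function name are my placeholders, not from the excerpt:

```python
# Sketch of a Connection-based replacement for save_to_mysql_stage.
# Assumes an Airflow Connection of type MySQL named "demodb" exists and the
# apache-airflow-providers-mysql package is installed; get_tickers and
# load_price_data are the helpers defined in the listing above.
from airflow.providers.mysql.hooks.mysql import MySqlHook

def save_to_mysql_stage_v2(*args, **context):
    tickers = get_tickers(context)
    # The hook reads host/user/password/port from the stored Connection,
    # so no credentials appear in the DAG file.
    hook = MySqlHook(mysql_conn_id="demodb")
    conn = hook.get_conn()
    cursor = conn.cursor()
    sql = """INSERT INTO stock_prices_stage
             (ticker, as_of_date, open_price, high_price, low_price, close_price)
             VALUES (%s,%s,%s,%s,%s,%s)"""
    for ticker in tickers:
        val = load_price_data(ticker)
        cursor.executemany(sql, val)
        conn.commit()
        print(cursor.rowcount, "record inserted.")
```

Swapping this function into save_to_mysql_task's python_callable would leave the rest of the DAG unchanged.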

September 7, 2021 · 7 min · jiezi

About airflow: airflow 2.x installation notes (qbit)

Preface

airflow is a task management system built around DAGs (directed acyclic graphs); put simply, it is an advanced version of crontab. airflow solves the task-dependency problem that crontab cannot handle.

Environment and components

Ubuntu 20.04
Python 3.8 (Anaconda3-2020.11-Linux-x86_64)
MySQL 8.0
apache-airflow 2.0.2

Installation steps

Create an account:

```bash
sudo useradd airflow -m -s /bin/bash
sudo passwd airflow
```

Switch to the account:

```bash
su airflow
```

Add Anaconda to the PATH:

```bash
# ~/.bashrc
export PATH=/anaconda/anaconda3/bin:$PATH
```

Configure a China pip mirror:

```bash
pip3 config set global.index-url https://mirrors.aliyun.com/pypi/simple/
```

Install airflow:

```bash
# Everything bundle (master)
pip3 install "apache-airflow[all]~=2.0.2"
# OR install selectively
pip3 install "apache-airflow[async,mysql,rabbitmq,celery,dask]~=2.0.2"
```

Add airflow to the PATH:

```bash
# Append the following to /home/airflow/.bashrc:
export PATH=/home/airflow/.local/bin:$PATH
```

Check the airflow version, which also creates airflow's HOME directory:

```bash
# defaults to ~/airflow
airflow version
```

Set the Ubuntu system timezone:

```bash
timedatectl set-timezone Asia/Shanghai
```

At this point, installation is complete.

MySQL configuration

Create the database and user:

```sql
CREATE DATABASE airflow CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci;
```

Create the user ...
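The excerpt cuts off during the MySQL setup; the step that normally follows is pointing airflow's metadata database at the new schema via sql_alchemy_conn in ~/airflow/airflow.cfg and then running airflow db init. A sketch for sanity-checking the connection URL first, assuming pymysql is installed; the credentials below are placeholders, not values from the post:

```python
# Sanity-check the metadata-DB URL before writing it into
# [core] sql_alchemy_conn in ~/airflow/airflow.cfg.
# Assumes pymysql and SQLAlchemy 1.x (a dependency of airflow 2.0.x) are
# installed; the user and password here are placeholders.
from sqlalchemy import create_engine

url = "mysql+pymysql://airflow:your_password@localhost:3306/airflow"
engine = create_engine(url)
with engine.connect() as conn:
    print(conn.execute("SELECT VERSION()").scalar())
```

If this prints the server version, `airflow db init` against the same URL will create the metadata tables.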

May 11, 2021 · 1 min · jiezi