This section describes how to use Connections, the MySqlOperator, and XComs to implement a complete Airflow ETL pipeline.

1. The raw way of saving data into the database

1.1 Create the tables

```sql
CREATE database demodb;
use demodb;

-- Staging table: downloaded rows land here first.
create table stock_prices_stage(
    ticker      varchar(30),
    as_of_date  date,
    open_price  double,
    high_price  double,
    low_price   double,
    close_price double
) COMMENT = 'stock price staging table';

-- Final table: adds a surrogate key and audit timestamps.
create table stock_prices(
    id          int not null AUTO_INCREMENT,
    ticker      varchar(30),
    as_of_date  date COMMENT 'as-of date',
    open_price  double,
    high_price  double,
    low_price   double,
    close_price double,
    created_at  timestamp default now(),
    updated_at  timestamp default now(),
    primary key (id)
) COMMENT = 'stock price table';

create index ids_stockprices on stock_prices(ticker, as_of_date);
create index ids_stockpricestage on stock_prices_stage(ticker, as_of_date);
```
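The two-table layout anticipates a later merge step: each run lands rows in stock_prices_stage, which are then folded into stock_prices. The merge statement itself is not shown at this point in the original, so the following is only a minimal sketch of what such a merge could look like (the UPDATE/INSERT pair and the final TRUNCATE are assumptions, not the article's code):

```sql
-- Sketch only: update rows that already exist in stock_prices...
UPDATE stock_prices p
JOIN stock_prices_stage s
  ON p.ticker = s.ticker AND p.as_of_date = s.as_of_date
SET p.open_price  = s.open_price,
    p.high_price  = s.high_price,
    p.low_price   = s.low_price,
    p.close_price = s.close_price,
    p.updated_at  = NOW();

-- ...then insert the rows that are new.
INSERT INTO stock_prices
  (ticker, as_of_date, open_price, high_price, low_price, close_price)
SELECT s.ticker, s.as_of_date, s.open_price, s.high_price, s.low_price, s.close_price
FROM stock_prices_stage s
LEFT JOIN stock_prices p
  ON p.ticker = s.ticker AND p.as_of_date = s.as_of_date
WHERE p.id IS NULL;

-- Clear the staging table for the next run.
TRUNCATE TABLE stock_prices_stage;
```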
2. Using an Airflow Connection to manage the database connection

Building on the code from the previous section, the data that was saved to CSV files is now loaded into the database as well. The V2 code is as follows:

download_stock_price_v2.py
2.1 The traditional connection method

```python
"""Example DAG: download stock prices and save them into MySQL."""
from datetime import timedelta
from textwrap import dedent

import yfinance as yf
import mysql.connector

from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.utils.dates import days_ago
from airflow.models import Variable


def download_price(*args, **context):
    stock_list = get_tickers(context)
    for ticker in stock_list:
        dat = yf.Ticker(ticker)
        hist = dat.history(period="1mo")
        # print(type(hist))
        # print(hist.shape)
        # print(os.getcwd())
        with open(get_file_path(ticker), 'w') as writer:
            hist.to_csv(writer, index=True)
        print("Finished downloading price data for " + ticker)


def get_file_path(ticker):
    # NOT SAFE in a distributed system: workers may not share a filesystem.
    return f'./{ticker}.csv'


def load_price_data(ticker):
    with open(get_file_path(ticker), 'r') as reader:
        lines = reader.readlines()
        return [[ticker] + line.split(',')[:5]
                for line in lines if line[:4] != 'Date']


def get_tickers(context):
    # Read the configured Variable.
    stock_list = Variable.get("stock_list_json", deserialize_json=True)

    # If parameters were supplied (Trigger DAG with parameters),
    # they take precedence over the Variable.
    stocks = context["dag_run"].conf.get("stocks")
    if stocks:
        stock_list = stocks
    return stock_list


def save_to_mysql_stage(*args, **context):
    tickers = get_tickers(context)

    # Connect to the database (credentials hard-coded for now).
    mydb = mysql.connector.connect(
        host="98.14.13.15",
        user="root",
        password="Quant888",
        database="demodb",
        port=3307
    )
    mycursor = mydb.cursor()

    for ticker in tickers:
        val = load_price_data(ticker)
        print(f"{ticker} length={len(val)} {val[1]}")

        sql = """INSERT INTO stock_prices_stage
                 (ticker, as_of_date, open_price, high_price, low_price, close_price)
                 VALUES (%s,%s,%s,%s,%s,%s)"""
        mycursor.executemany(sql, val)
        mydb.commit()
        print(mycursor.rowcount, "record inserted.")


default_args = {
    'owner': 'airflow'
}

# [START instantiate_dag]
with DAG(
    dag_id='download_stock_price_v2',
    default_args=default_args,
    description='download stock price and save to local csv files and save to database',
    schedule_interval=None,
    start_date=days_ago(2),
    tags=['quantdata'],
) as dag:
    # [END instantiate_dag]
    dag.doc_md = """
    This DAG download stock price
    """

    download_task = PythonOperator(
        task_id="download_prices",
        python_callable=download_price,
        provide_context=True
    )

    save_to_mysql_task = PythonOperator(
        task_id="save_to_database",
        python_callable=save_to_mysql_stage,
        provide_context=True
    )

    download_task >> save_to_mysql_task
```

Then trigger the DAG manually from the Airflow web UI. The first two runs failed; after some debugging, the run succeeded.
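Hard-coding the host, user, and password inside the DAG file is exactly what the Airflow Connection mechanism is meant to replace. Below is a minimal sketch of the Connection-based variant, assuming a connection with conn_id "demodb" has been created under Admin → Connections in the Airflow UI; the conn_id and the XCom-based ticker hand-off are illustrative assumptions, not the article's final code:

```python
# Sketch of the Connection-based variant (not the article's final code).
# Assumes an Airflow Connection with conn_id "demodb" holding the
# host/port/login/password/schema shown above.
from airflow.hooks.base import BaseHook
import mysql.connector


def save_to_mysql_stage(**context):
    # If the upstream task simply returns its stock_list, PythonOperator
    # pushes it to XCom automatically; pull it here by task_id instead of
    # re-resolving Variables and dag_run conf.
    tickers = context["ti"].xcom_pull(task_ids="download_prices")

    # Resolve credentials from the Connection store, not from source code.
    conn = BaseHook.get_connection("demodb")  # "demodb" is an assumed conn_id
    mydb = mysql.connector.connect(
        host=conn.host,
        port=conn.port,
        user=conn.login,
        password=conn.password,
        database=conn.schema,
    )
    mycursor = mydb.cursor()
    sql = """INSERT INTO stock_prices_stage
             (ticker, as_of_date, open_price, high_price, low_price, close_price)
             VALUES (%s,%s,%s,%s,%s,%s)"""
    for ticker in tickers:
        # load_price_data() is the CSV-reading helper defined above.
        mycursor.executemany(sql, load_price_data(ticker))
    mydb.commit()
    mydb.close()
```

The same conn_id is also what a MySqlHook (hook.get_conn(), hook.insert_rows(...)) or a MySqlOperator task would reference via its mysql_conn_id parameter, for example when running the stage-to-final merge SQL.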
...