关于程序员:量化投资系列之股票日行情数据本地化

71次阅读

共计 7730 个字符,预计需要花费 20 分钟才能阅读完成。

量化投资系列之《股票日行情数据本地化》

上篇介绍了如何搭建可编程量化环境,所以曾经具备了本地数据存储的环境了,这篇将持续介绍通过 akShare 获取股票日行情数据,并存储到 clickhouse 数据库中,并且本地化查问某股票某时间段内的日行情数据。

数据本地化设计

1、流程设计

数据本地化的设计,流程应该是首次更新时,是工夫 t1 到当初 t2 的时间段内所有沪深 A 股的股票数据,随时间推移,只需增量更新 t2 到最新的工夫的日行情数据,所以须要一份本地配置文件记录每个股票更新的工夫节点。

动静增量更新日行情数据本地化流程图如下:

2、代码设计

因为刚接触 python,语法不是很相熟,不懂间接看 python 语法文档的,所以当初的代码设计是面向过程的,前面相熟 python 了再重构面向对象。

尽管不是面向对象,然而文件所负责的性能要清晰,便于前面保护和扩大,模块划分次要有:配置信息模块、网络数据获取模块、数据库治理模块、可视化模块。当初根底版是这几个模块,后续会增加策略模块、机器辨认模块、机器学习模块等等扩大模块。

模块如表所示:

py 文件 形容
ak_stock_data.py 通过 akShare 获取接口数据
stock_config.py 配置日行情数据增量更新的信息
stock_constant.py 放一些常量,例如:门路
stock_db.py 负责于 clickhouse 数据库进行交互,存储和本地获取
stock_plot.py 负责股票数据可视化

数据表

数据表的设计,只须要股票的必要字段,以及能够覆盖性更新。何为覆盖性更新,例如:股票 A 工夫 t1 的日行情数据,屡次更新数据库,不会产生多条数据,而是笼罩原来的 t1 数据。

表字段阐明如下:

名称 类型 形容
date Date 交易日
code String 股票代码
open Float32 开盘价
high Float32 最高价
low Float32 最低价
close Float32 收盘价
volume Float64 成交量,留神单位: 手
amount Float64 成交额,留神单位: 元

表创立的 sql 语句如下:

CREATE TABLE stock.stock_daily_price
(
    `date`   Date,
    `code`   String,
    `open`   Float32,
    `high`   Float32,
    `low`    Float32,
    `close`  Float32,
    `volume` Float64,
    `amount` Float64
) ENGINE = ReplacingMergeTree()
      ORDER BY (javaHash(code), date)

阐明: ENGINE 引擎,采纳表 ReplacingMergeTree,它会删除排序键值雷同的反复项,从而达到去除反复值。
排序键值 ORDER BY (javaHash(code), date):javaHash 函数对股票代码 code 进行 hash 值提取。ORDER BY (javaHash(code), date) 的意思是,按股票代码 code 的 hash 值和工夫 date 进行排序。

按上所述创立数据表,就具备了一个能够通过股票代码和日期作为惟一排序值,去除反复插入的数值,保障了股票在某天日行情数据的唯一性。

增量更新

想设计一个更新过的数据不再更新,只更新没更新的数据,很天然要一份配置表,在日行情数据本地化之前,只须要读取某只股票更新到那里了,而后接最初更新的日期到当初的工夫节点为所需更新时间段。所以咱们须要一个字段:股票最初更新日期。当然,网络数据更新不牢靠,会有失败状况,所以须要一个字段记录更新失败的次数。高级版大略是这样子

配置表如下:

名称 类型 形容
code String 股票代码
name String 股票名称
daily_update_time Date 最初一次更新的日期 <br/> 格局:20220903
error_daily_update_count int 更新谬误次数 <br/> success:0 fail:+1

阐明:error_daily_update_count 是记录失败次数,如果更新很屡次,还是失败的,可能这股票退市了或怎么了,能够定期革除这些无用股票。

stock_config.py

import os

import pandas as pd

import stock_constant as sc


def create_config():
    """
    配置文件参数
    code:股票代码,name: 股票名称
    last_update_time: 最初一次更新的日期
    error_update_count:申请更新谬误次数统计,定期清理屡次更新谬误的股票代码
    :return: DataFrame
    """columns = ['code','name','daily_update_time','error_daily_update_count']
    df = pd.DataFrame(columns=columns)
    save_config(df)
    return df


def read_config(is_new=False):
    """
    is_new if true del config else if part update
    如果是新则删除配置信息,重置,从新更新,否则增量更新
    读取配置信息,并对返回的 dataFrame 数据类型进行预处理
    :return:
    """
    if is_new:
        os.remove(sc.config_path)
        print('删除配置胜利,重置配置...')

    try:
        df = pd.read_csv(sc.config_path)
    except Exception as error:
        print("创立配置信息:", error)
        df = create_config()
    # code 强制转 str 类型,并补全股票代码为 6 位
    df['code'] = df['code'].astype(str)
    df['code'] = df['code'].str.zfill(6)
    df['error_daily_update_count'] = df['error_daily_update_count'].astype(int)
    return df


def save_config(df):
    """
    保留配置信息,疏忽列
    :param df:
    :return:
    """df.to_csv(sc.config_path, encoding='utf-8', index=False)

该模块,提供配置信息保留和获取
留神:获取配置时 code 强制转 str 类型,并补全股票代码为 6 位。保留配置时,疏忽列的保留。

效果图:

日行情数据本地化

akShare 三个接口即可实现
获取沪深 A 股所有股票,通过接口 ak.stock_sh_a_spot_em() ,ak.stock_sz_a_spot_em(),通过这两个接口,即可获取沪深 A 股所有股票代码,而后荡涤数据保留必要的数据到配置文件中 config.csv。
通过接口 ak.stock_zh_a_hist() 获取某股票某时间段的日行情数据,而后保留到数据库中,更新配置信息。

留神:须要批改网络 dataFrame 的列名,再保留到数据库中,列如:接口的列名是,[‘ 日期 ’, ‘ 收盘 ’, ‘ 最高 ’, ‘ 最低 ’, ‘ 开盘 ’, ‘ 成交量 ’, ‘ 成交额 ’],数据库的列名为:[‘date’, ‘open’, ‘high’, ‘low’, ‘close’, ‘volume’, ‘amount’],所以须要批改。

插入数据库是,须要重设 dataFrame index 为数据库列名的 date ,df.set_index([‘date’], inplace=True)。

话不多说,间接打码

ak_stock_data.py

import time

import akshare as ak

import stock_config as scg
import stock_constant as sct
import stock_db as sdb


def stock_code_net_to_csv():
    """
    获取 A 股股票代码,并保留到 data/xxx.csv 文件中
    新浪日行情数据须要:沪交所股票代码,代码增加前缀 sh,深交所股票代码,代码增加前缀 sz
    :return: [上交所 DataFrame, 深交所 DataFrame]
    """
    stock_sh_a_spot_em_df = ak.stock_sh_a_spot_em()
    # 批改股票代码前缀
    stock_sh_a_spot_em_df['代码'] = \
        stock_sh_a_spot_em_df['代码'].apply(lambda _: str(_))
    # stock_sh_a_spot_em_df['代码'].apply(lambda x: "{}{}".format('sh', x))
    # 保留
    stock_sh_a_spot_em_df.to_csv(sct.sh_code_path)

    stock_sz_a_spot_em_df = ak.stock_sz_a_spot_em()
    # 批改股票代码前缀
    stock_sz_a_spot_em_df['代码'] = \
        stock_sz_a_spot_em_df['代码'].apply(lambda _: str(_))
    # stock_sz_a_spot_em_df['代码'].apply(lambda x: "{}{}".format('sz', x))
    # 保留
    stock_sz_a_spot_em_df.to_csv(sct.sz_code_path)
    return stock_sh_a_spot_em_df, stock_sz_a_spot_em_df


def start():
    """
    量化投资程序入口
    :return:
    """
    config_df = scg.read_config()
    # 如果配置是空,则获取沪深 A 股信息,获取 code,保留到配置
    if config_df.empty:
        sh_df, sz_df = stock_code_net_to_csv()
        # 补全本地配置信息
        for index, row in sh_df.iterrows():
            config_df.loc[len(config_df), config_df.columns] = (row['代码'], row['名称'], sct.start_date, 0)
        for index, row in sz_df.iterrows():
            config_df.loc[len(config_df), config_df.columns] = (row['代码'], row['名称'], sct.start_date, 0)
        # 保留到本地
        scg.save_config(config_df)
        print('初始化配置信息,并保留到本地胜利...')
    else:
        print('曾经初始化过本地配置...')
    # 开始更新日行情数据
    update_stock_zh_a_daily_eastmoney()


def update_stock_zh_a_daily_eastmoney():
    """
    西方财产日行情数据,沪深 A 股,先从本地配置获取股票代码,再获取日行情数据
    获取胜利或失败,记录到本地数据,以便股票数据更新残缺
    :return:
    """
    success_code_list = []
    except_code_list = []
    # 读取配置信息
    config_df = scg.read_config()
    if config_df.empty:
        print('配置信息谬误,请查看...')
        return

    for index, row in config_df.iterrows():
        code = row['code']
        start_time = row['daily_update_time']
        end_time = time.strftime('%Y%m%d', time.localtime())
        try:
            except_code = str(code)
            df = ak.stock_zh_a_hist(symbol=str(code),
                start_date=start_time,
                end_date=end_time,
                adjust="qfq")
        except:
            except_code_list.append(except_code)
            # 更新配置信息 config_df
            config_df.loc[config_df['code'] == code, 'error_daily_update_count'] \
                = row['error_daily_update_count'] + 1

            print("产生异样 code", except_code)
            continue

        print('胜利获取股票: index->{} {}日行情数据'.format(index, code), '开始工夫: {} 完结工夫: {}'.format(start_time, end_time))
        if df.empty:
            continue

        # 获取对应的子列集
        sub_df = df[['日期', '收盘', '最高', '最低', '开盘', '成交量', '成交额']]
        # net_df 的列名可能和数据库列名不一样,批改列名对应数据库的列名
        sub_df.columns = ['date', 'open', 'high', 'low', 'close', 'volume', 'amount']
        # 批改 index 为 date 去掉默认的 index 便于直接插入数据库
        sub_df.set_index(['date'], inplace=True)
        sub_df.insert(sub_df.shape[1], 'code', str(code))
        sdb.to_table(sub_df, "stock_daily_price")
        # 更新配置信息 config_df
        config_df.loc[config_df['code'] == code, 'daily_update_time'] = end_time
        config_df.loc[config_df['code'] == code, 'error_daily_update_count'] = 0
        # 距离更新到本地配置
        if index % 100 == 0:
            scg.save_config(config_df)
            print('index: {} 更新本地配置一次...'.format(index))

        success_code_list.append(code)
        print(sub_df)
    # 同步配置到本地
    scg.save_config(config_df)

    print('更新本地配置胜利...')
    print("胜利申请的 code:", success_code_list)
    print("谬误申请 code:", except_code_list)


if __name__ == '__main__':
    start()
    pass

数据库效果图:

日志效果图:
从最初打印进去的日志能够看出,4 千多个股票,绝大部分数据申请是胜利的,并保留到数据库中,也更新到本地股票配置信息中 config.csv,不便下次增量下载日行情数据。

因为有了增量更新性能,再跑的时候成果是这样子的

本地数据查问

至此,网络数据本地化曾经实现,俗话说:巧妇难为无米之炊,当初米曾经在本地了,能够磨刀霍霍搞事件了。
所以封装一下从本地获取数据的接口:stock_daily(code, start_time, end_time)。只须要传一个股票代码,开始工夫,完结工夫,就能够从本地获取某股票某时间段的股票日行情数据。

数据库封装代码:

stock_db.py

import pandahouse as ph
import time

'''
数据的存储和获取
CREATE TABLE stock.stock_daily_price
(
    `date`   Date,
    `code`   String,
    `open`   Float32,
    `high`   Float32,
    `low`    Float32,
    `close`  Float32,
    `volume` Float64,
    `amount` Float64
--     `adj_factor` Int32,
--     `st_status` Int16,
--     `trade_status` Int16
) ENGINE = ReplacingMergeTree()
      ORDER BY (javaHash(code), date)
'''connection = dict(database="stock",
                  host="http://localhost:8123",
                  user='default',
                  password='sykent')


def to_table(data, table):
    """
    插入数据到表
    :param data:
    :param table:
    :return:
    """
    affected_rows = ph.to_clickhouse(data, table=table, connection=connection)
    return affected_rows


def from_table(sql):
    """
    查问表
    :param sql:
    :return: dataframe
    """
    last_time = time.time()
    df = ph.read_clickhouse(sql, connection=connection)
    print("db-> 耗时: {}  sql: {}".format((time.time() - last_time) * 1000, sql))
    return df


def stock_daily(code, start_time, end_time):
    """
    获取某股票,某时间段的日行情数据
    select *
    from stock_daily_price
    where code == '000001' and date between '2022-03-30' and '2022-07-29'
    :param code:
    :param start_time:
    :param end_time:
    :return:
    """sql ="select * from stock.stock_daily_price where code == '{}' and date between '{}' and '{}'" \
        .format(code, start_time, end_time)
    return from_table(sql)


def all_stock_daily(start_time, end_time):
    """
    获取所有股票某时间段的日行情数据
    select *
    from stock.stock_daily_price
    where date between '2022-03-30' and '2022-07-29'
    :param start_time:
    :param end_time:
    :return:
    """sql ="select * from stock.stock_daily_price where date between '{}' and '{}'" \
        .format(start_time, end_time)
    return from_table(sql)

效果图所示,获取中国安全本地化 2022 年的全副日行情数据,只须要 35ms,是不是很快。

下篇预报

获取本地日行情数据,k 线可视化,曾经封装实现,只须要一句代码即可画出丑陋 k 线图。

效果图:

完稿于 2022 年 09 月 04 日 14:26:10

本文由 mdnice 多平台公布

正文完
 0