乐趣区

关于大数据:量化交易干货丨如何使用DolphinDB计算K线

DolphinDB 提供了功能强大的内存计算引擎,内置工夫序列函数,分布式计算以及流数据处理引擎,在泛滥场景下均可高效的计算 K 线。本教程将介绍 DolphinDB 如何通过批量解决和流式解决计算 K 线。

  • 历史数据批量计算 K 线

其中能够指定 K 线窗口的起始工夫;一天中能够存在多个交易时段,包含隔夜时段;K 线窗口可重叠;应用交易量作为划分 K 线窗口的维度。须要读取的数据量特地大并且须要将后果写入数据库时,可应用 DolphinDB 内置的 Map-Reduce 函数并行计算。

  • 流式计算 K 线

应用 API 实时接管市场数据,并应用 DolphinDB 内置的流数据时序计算引擎 (TimeSeriesAggregator) 进行实时计算失去 K 线数据。

1. 历史数据 K 线计算

应用历史数据计算 K 线,可应用 DolphinDB 的内置函数 bar,dailyAlignedBar,或 wj。

1.1 不指定 K 线窗口的起始时刻,依据数据主动生成 K 线后果

bar(X,Y)返回 X 减去 X 除以 Y 的余数,个别用于将数据分组。

date = 09:32m 09:33m 09:45m 09:49m 09:56m 09:56m;
bar(date, 5);

返回以下后果:

[09:30m,09:30m,09:45m,09:45m,09:55m,09:55m]

例子 1:应用以下数据模仿美国股票市场:

n = 1000000
date = take(2019.11.07 2019.11.08, n)
time = (09:30:00.000 + rand(int(6.5*60*60*1000), n)).sort!()
timestamp = concatDateTime(date, time)
price = 100+cumsum(rand(0.02, n)-0.01)
volume = rand(1000, n)
symbol = rand(`AAPL`FB`AMZN`MSFT, n)
trade = table(symbol, date, time, timestamp, price, volume).sortBy!(`symbol`timestamp)
undef(`date`time`timestamp`price`volume`symbol)

计算 5 分钟 K 线:

barMinutes = 5
OHLC = select first(price) as open, max(price) as high, min(price) as low, last(price) as close, sum(volume) as volume from trade group by symbol, date, bar(time, barMinutes*60*1000) as barStart

请留神,以上数据中,time 列的精度为毫秒。若 time 列精度不是毫秒,则该当将 barMinutes601000 中的数字做相应调整。

1.2 须要指定 K 线窗口的起始时刻

须要指定 K 线窗口的起始时刻,可应用 dailyAlignedBar 函数。该函数可解决每日多个交易时段,亦可解决隔夜时段。

请留神,应用 dailyAlignedBar 函数时,工夫列必须含有日期信息,包含 DATETIME, TIMESTAMP 或 NANOTIMESTAMP 这三种类型的数据。指定每个交易时段窗口起始时刻的参数 timeOffset 必须应用相应的去除日期信息之后的 SECOND,TIME 或 NANOTIME 类型的数据。

例子 2 (每日一个交易时段):计算美国股票市场 7 分钟 K 线。数据沿用例子 1 中的 trade 表。

barMinutes = 7
OHLC = select first(price) as open, max(price) as high, min(price) as low, last(price) as close, sum(volume) as volume from trade group by symbol, dailyAlignedBar(timestamp, 09:30:00.000, barMinutes*60*1000) as barStart

例子 3 (每日两个交易时段):中国股票市场每日有两个交易时段,上午时段为 9:30 至 11:30,下午时段为 13:00 至 15:00。

应用以下数据模仿:

n = 1000000
date = take(2019.11.07 2019.11.08, n)
time = (09:30:00.000 + rand(2*60*60*1000, n/2)).sort!() join (13:00:00.000 + rand(2*60*60*1000, n/2)).sort!()
timestamp = concatDateTime(date, time)
price = 100+cumsum(rand(0.02, n)-0.01)
volume = rand(1000, n)
symbol = rand(`600519`000001`600000`601766, n)
trade = table(symbol, timestamp, price, volume).sortBy!(`symbol`timestamp)
undef(`date`time`timestamp`price`volume`symbol)

计算 7 分钟 K 线:

barMinutes = 7
sessionsStart=09:30:00.000 13:00:00.000
OHLC = select first(price) as open, max(price) as high, min(price) as low, last(price) as close, sum(volume) as volume from trade group by symbol, dailyAlignedBar(timestamp, sessionsStart, barMinutes*60*1000) as barStart

例子 4 (每日两个交易时段,蕴含隔夜时段):某些期货每日有多个交易时段,且包含隔夜时段。本例中,第一个交易时段为 8:45 到下午 13:45,另一个时段为隔夜时段,从下午 15:00 到第二天 05:00。

应用以下数据模仿:

daySession =  08:45:00.000 : 13:45:00.000
nightSession = 15:00:00.000 : 05:00:00.000
n = 1000000
timestamp = rand(concatDateTime(2019.11.06, daySession[0]) .. concatDateTime(2019.11.08, nightSession[1]), n).sort!()
price = 100+cumsum(rand(0.02, n)-0.01)
volume = rand(1000, n)
symbol = rand(`A120001`A120002`A120003`A120004, n)
trade = select * from table(symbol, timestamp, price, volume) where timestamp.time() between daySession or timestamp.time()>=nightSession[0] or timestamp.time()<nightSession[1] order by symbol, timestamp
undef(`timestamp`price`volume`symbol)

计算 7 分钟 K 线:

barMinutes = 7
sessionsStart = [daySession[0], nightSession[0]]
OHLC = select first(price) as open, max(price) as high, min(price) as low, last(price) as close, sum(volume) as volume from trade group by symbol, dailyAlignedBar(timestamp, sessionsStart, barMinutes*60*1000) as barStart

1.3 重叠 K 线窗口:应用 wj 函数

以上例子中,K 线窗口均不重叠。若要计算重叠 K 线窗口,能够应用 wj 函数。应用 wj 函数,可对左表中的工夫列,指定绝对工夫范畴,在右表中进行计算。

例子 5 (每日两个交易时段,重叠的 K 线窗口):模仿中国股票市场数据,每 5 分钟计算 30 分钟 K 线。

n = 1000000
sampleDate = 2019.11.07
symbols = `600519`000001`600000`601766
trade = table(take(sampleDate, n) as date, 
    (09:30:00.000 + rand(7200000, n/2)).sort!() join (13:00:00.000 + rand(7200000, n/2)).sort!() as time, 
    rand(symbols, n) as symbol, 
    100+cumsum(rand(0.02, n)-0.01) as price, 
    rand(1000, n) as volume)

首先依据工夫来生成窗口,并且用 cross join 来生成股票和交易窗口的组合。

barWindows = table(symbols as symbol).cj(table((09:30:00.000 + 0..23 * 300000).join(13:00:00.000 + 0..23 * 300000) as time))

而后应用 wj 函数计算重叠窗口的 K 线数据:

OHLC = wj(barWindows, trade, 0:(30*60*1000), 
        <[first(price) as open, max(price) as high, min(price) as low, last(price) as close, sum(volume) as volume]>, `symbol`time)

1.4 应用交易量划分 K 线窗口

下面的例子咱们均应用工夫作为划分 K 线窗口的维度。在实践中,也能够应用其余维度作为划分 K 线窗口的根据。譬如用累计的交易量来计算 K 线。

例子 6 (每日两个交易时段,应用累计的交易量计算 K 线):模仿中国股票市场数据,交易量每减少 10000 计算 K 线。

n = 1000000
sampleDate = 2019.11.07
symbols = `600519`000001`600000`601766
trade = table(take(sampleDate, n) as date, 
    (09:30:00.000 + rand(7200000, n/2)).sort!() join (13:00:00.000 + rand(7200000, n/2)).sort!() as time, 
    rand(symbols, n) as symbol, 
    100+cumsum(rand(0.02, n)-0.01) as price, 
    rand(1000, n) as volume)
    
volThreshold = 10000
select first(time) as barStart, first(price) as open, max(price) as high, min(price) as low, last(price) as close 
from (select symbol, price, cumsum(volume) as cumvol from trade context by symbol)
group by symbol, bar(cumvol, volThreshold) as volBar

代码采纳了嵌套查问的办法。子查问为每个股票生成累计的交易量 cumvol,而后在主查问中依据累计的交易量用 bar 函数生成窗口。

1.5 应用 MapReduce 函数减速

若需从数据库中提取较大量级的历史数据,计算 K 线,而后存入数据库,可应用 DolphinDB 内置的 Map-Reduce 函数 mr 进行数据的并行读取与计算。这种办法能够显著进步速度。

本例应用美国股票市场准确到纳秒的交易数据。原始数据存于 ”dfs://TAQ” 数据库的 ”trades” 表中。”dfs://TAQ” 数据库采纳复合分区:基于交易日期 Date 的值分区与基于股票代码 Symbol 的范畴分区。

(1) 将存于磁盘的原始数据表的元数据载入内存:

login(`admin, `123456)
db = database("dfs://TAQ")
trades = db.loadTable("trades")

(2) 在磁盘上创立一个空的数据表,以寄存计算结果。以下代码建设一个模板表(model),并依据此模板表的 schema 在数据库 ”dfs://TAQ” 中创立一个空的 OHLC 表以寄存 K 线计算结果:

model=select top 1 Symbol, Date, Time.second() as bar, PRICE as open, PRICE as high, PRICE as low, PRICE as close, SIZE as volume from trades where Date=2007.08.01, Symbol=`EBAY
if(existsTable("dfs://TAQ", "OHLC"))
    db.dropTable("OHLC")
db.createPartitionedTable(model, `OHLC, `Date`Symbol)

(3) 应用 mr 函数计算 K 线数据,并将后果写入 OHLC 表中:

def calcOHLC(inputTable){tmp=select first(PRICE) as open, max(PRICE) as high, min(PRICE) as low, last(PRICE) as close, sum(SIZE) as volume from inputTable where Time.second() between 09:30:00 : 15:59:59 group by Symbol, Date, 09:30:00+bar(Time.second()-09:30:00, 5*60) as bar
    loadTable("dfs://TAQ", `OHLC).append!(tmp)
    return tmp.size()}
ds = sqlDS(<select Symbol, Date, Time, PRICE, SIZE from trades where Date between 2007.08.01 : 2019.08.01>)
mr(ds, calcOHLC, +)

在以上代码中,ds 是函数 sqlDS 生成的一系列数据源,每个数据源代表从一个数据分区中提取的数据;自定义函数 calcOHLC 为 Map-Reduce 算法中的 map 函数,对每个数据源计算 K 线数据,并将后果写入数据库,返回写入数据库的 K 线数据的行数;”+” 是 Map-Reduce 算法中的 reduce 函数,将所有 map 函数的后果,亦即写入数据库的 K 线数据的行数相加,返回写入数据库的 K 线数据总数。

2. 实时 K 线计算

DolphinDB database 中计算实时 K 线的流程如下图所示:

DolphinDB 中计算实时 K 线流程图

实时数据供应商个别会提供基于 Python、Java 或其余罕用语言的 API 的数据订阅服务。本例中应用 Python 来模仿接管市场数据,通过 DolphinDB Python API 写入流数据表中。DolphinDB 的流数据时序聚合引擎 (TimeSeriesAggregator) 能够对实时数据依照指定的频率与挪动窗口计算 K 线。

本例应用的模仿实时数据源为文本文件 trades.csv。该文件蕴含以下 4 列(一起给出一行样本数据):

以下三大节介绍实时 K 线计算的三个步骤:

2.1 应用 Python 接管实时数据,并写入 DolphinDB 流数据表

  • DolphinDB 中建设流数据表
share streamTable(100:0, `Symbol`Datetime`Price`Volume,[SYMBOL,DATETIME,DOUBLE,INT]) as Trade
  • Python 程序从数据源 trades.csv 文件中读取数据写入 DolphinDB。

实时数据中 Datetime 的数据精度是秒,因为 pandas DataFrame 中仅能应用 DateTime[64]即 nanatimestamp 类型,所以下列代码在写入前有一个数据类型转换的过程。这个过程也实用于大多数数据须要荡涤和转换的场景。

import dolphindb as ddb
import pandas as pd
import numpy as np
csv_file = "trades.csv"
csv_data = pd.read_csv(csv_file, dtype={'Symbol':str} )
csv_df = pd.DataFrame(csv_data)
s = ddb.session();
s.connect("127.0.0.1",8848,"admin","123456")
#上传 DataFrame 到 DolphinDB,并对 Datetime 字段做类型转换
s.upload({"tmpData":csv_df})
s.run("data = select Symbol, datetime(Datetime) as Datetime, Price, Volume from tmpData")
s.run("tableInsert(Trade,data)")

2.2 实时计算 K 线

本例中应用时序聚合引擎 createTimeSeriesAggregator 函数实时计算 K 线数据,并将计算结果输入到流数据表 OHLC 中。

实时计算 K 线数据,依据利用场景不同,能够分为以下 2 种状况:

  • 仅在每次工夫窗口完结时触发计算
  • 工夫窗口齐全不重合,例如每隔 5 分钟计算过来 5 分钟的 K 线数据
  • 工夫窗口局部重合,例如每隔 1 分钟计算过来 5 分钟的 K 线数据
  • 在每个工夫窗口完结时触发计算,同时在每个工夫窗口内数据也会依照肯定频率更新
    例如每隔 1 分钟计算过来 1 分钟的 K 线数据,但最近 1 分钟的 K 线不心愿等到窗口完结后再计算。心愿每隔 1 秒钟更新一次

上面针对上述的几种状况别离介绍如何应用 createTimeSeriesAggregator 函数实时计算 K 线数据。请依据理论须要抉择相应场景创立工夫序列聚合引擎。

2.2.1 仅在每次工夫窗口完结时触发计算

仅在每次工夫窗口完结时触发计算的状况下,又能够分为工夫窗口齐全不重合和局部重合两种场景。这两种状况可通过设定 createTimeSeriesAggregator 函数的 windowSize 参数和 step 参数以实现。上面具体阐明。

首先定义输出表:

share streamTable(100:0, `datetime`symbol`open`high`low`close`volume,[DATETIME, SYMBOL, DOUBLE,DOUBLE,DOUBLE,DOUBLE,LONG]) as OHLC

而后依据应用场景不同,抉择以下任意一种场景创立工夫序列聚合引擎。

场景一:每隔 5 分钟计算一次过来 5 分钟的 K 线数据,应用以下脚本定义时序聚合引擎,其中,windowSize 参数取值与 step 参数取值相等

tsAggrKline = createTimeSeriesAggregator(name="aggr_kline", windowSize=300, step=300, metrics=<[first(Price),max(Price),min(Price),last(Price),sum(volume)]>, dummyTable=Trade, outputTable=OHLC, timeColumn=`Datetime, keyColumn=`Symbol)

场景二:每隔 1 分钟计算过来 5 分钟的 K 线数据,能够应用以下脚本定义时序聚合引擎。其中,windowSize 参数取值是 step 参数取值的倍数

tsAggrKline = createTimeSeriesAggregator(name="aggr_kline", windowSize=300, step=60, metrics=<[first(Price),max(Price),min(Price),last(Price),sum(volume)]>, dummyTable=Trade, outputTable=OHLC, timeColumn=`Datetime, keyColumn=`Symbol)

最初,定义流数据订阅。若此时流数据表 Trade 中曾经有实时数据写入,那么实时数据会马上被订阅并注入聚合引擎:

subscribeTable(tableName="Trade", actionName="act_tsaggr", offset=0, handler=append!{tsAggrKline}, msgAsTable=true)

场景一的输出表前 5 行数据:

2.2.2 在每个工夫窗口完结触发计算,同时依照肯定频率更新计算结果

以窗口工夫 1 分钟计算 vwap 价格为例,10:00 更新了聚合后果当前,那么下一次更新至多要等到 10:01。依照计算规定,这一分钟内即便产生了很多交易,也不会触发任何计算。这在很多金融交易场景中是无奈承受的,心愿以更高的频率更新信息,为此引入了时序聚合引擎的 updateTime 参数。

updateTime 参数示意计算的工夫距离,如果没有指定 updateTime,只有在每个工夫窗口完结时,工夫序列聚合引擎才会触发一次计算。但如果指定了 updateTime,在以下 3 种状况下都会触发计算:

  • 在每个工夫窗口完结时,工夫序列聚合引擎会触发一次计算
  • 每过 updateTime 个工夫单位,工夫序列聚合引擎都会触发一次计算
  • 如果数据进入后超过 2 updateTime 个工夫单位(如果 2 updateTime 有余 2 秒,则设置为 2 秒),以后窗口中仍有未计算的数据,工夫序列聚合引擎会触发一次计算

这样就能保障时序聚合引擎能在每个工夫窗口完结触发计算,同时在每个工夫窗口外部也会依照肯定频率触发计算。

须要阐明的是,时序聚合引擎要求在应用 updateTime 参数时,必须应用 keyedTable 作为输出表。具体起因如下:

  • 若将一般的 table 或 streamTable 作为输出表
    table 与 streamTable 不会对反复的数据进行写入限度,因而在数据满足触发 updateTime 的条件而还未满足触发 step 的条件时,时序聚合引擎会一直向输出表增加同一个 time 的计算结果,最终失去的输出表就会有大量工夫雷同的记录,这个后果就没有意义。
  • 若将 keyedStreamTable 作为输出表
    keyedStreamTable 不容许更新历史记录,也不容许往表中增加 key 值雷同的记录。往表中增加新记录时,零碎会主动查看新记录的主键值,如果新纪录的主键值与已有记录的主键值反复时,新纪录不会被写入。在本场景下体现的后果是,在数据还没有满足触发 step 的条件,但满足触发 updateTime 的条件时,时序聚合引擎将最近窗口的计算结果写入到输出表,却因为工夫雷同而被禁止写入,updateTIme 参数同样失去了意义。
  • 应用 keyedTable 作为输出表
    keyedTable 容许更新,往表中增加新记录时,零碎会主动查看新记录的主键值,如果新纪录的主键值与已有记录的主键值反复时,会更新表中对应的记录。在本场景下体现的后果是,同一个工夫计算结果可能会产生更新。在数据还没有满足触发 step 的条件,但满足触发 updateTime 的条件时,计算结果会被批改为依据最近窗口内的数据进行计算的后果,而不是向输出表中增加一条新的记录。直到数据满足触发 step 的条件时,才会向输出表中增加新的记录。而这个后果才是咱们预期想要达到的成果,因而时序聚合引擎要求在应用 updateTime 参数时,必须应用 keyedTable 作为输出表。

例如,要计算窗口为 1 分钟的 K 线,但最近 1 分钟的 K 线不心愿等到窗口完结后再计算。心愿每隔 1 秒钟都更新一次近 1 分钟的 K 线数据。咱们能够通过如下步骤实现这个场景。

首先,咱们须要创立一个 keyedTable 作为输出表,并将工夫列和股票代码列作为主键。当有新的数据注入输出表时,如果新纪录的工夫在表中已存在,会更新表中对应工夫的记录。这样就能保障每次查问时每个时刻的数据是最新的。

share keyedTable(`datetime`Symbol, 100:0, `datetime`Symbol`open`high`low`close`volume,[DATETIME,SYMBOL,DOUBLE,DOUBLE,DOUBLE,DOUBLE,LONG]) as OHLC

请留神:在应用时序聚合引擎时将 keyedTable 作为输出表,若时序聚合引擎指定了 keyColumn 参数,那么 kyedTable 须要同时将工夫相干列和 keyColumn 列作为主键。

每隔 1 分钟计算一次过来 1 分钟的 K 线数据,并且每隔 1 秒钟都更新一次近 1 分钟的 K 线数据,能够应用以下脚本定义时序聚合引擎。其中,windowSize 参数取值与 step 参数取值相等,并指定 updateTime 参数取值为 1 秒钟,即每隔 1 秒种更新最近 1 分钟的数据。下例中的 useWindowStartTime 参数则用于指定输出表中的工夫为数据窗口的起始工夫。

tsAggrKline = createTimeSeriesAggregator(name="aggr_kline", windowSize=60, step=60, metrics=<[first(Price),max(Price),min(Price),last(Price),sum(volume)]>, dummyTable=Trade, outputTable=OHLC, timeColumn=`Datetime, keyColumn=`Symbol,updateTime=1, useWindowStartTime=true)

请留神,在应用工夫序列聚合引擎时,windowSize 必须是 step 的整数倍,并且 step 必须是 updateTime 的整数倍。

最初,定义流数据订阅。若此时流数据表 Trade 中曾经有实时数据写入,那么实时数据会马上被订阅并注入聚合引擎:

subscribeTable(tableName="Trade", actionName="act_tsaggr", offset=0, handler=append!{tsAggrKline}, msgAsTable=true)

输出表的前 5 行数据:

2.3 在 Python 中展现 K 线数据

在本例中,聚合引擎的输出表也定义为流数据表,客户端能够通过 Python API 订阅输出表,并将计算结果展示到 Python 终端。

以下代码应用 Python API 订阅实时聚合计算的输入后果表 OHLC,并将后果通过 print 函数打印进去。

import dolphindb as ddb
import pandas as pd
import numpy as np
#设定本地端口 20001 用于订阅流数据
s.enableStreaming(20001)
def handler(lst):         
    print(lst)
# 订阅 DolphinDB(本机 8848 端口)上的 OHLC 流数据表
s.subscribe("127.0.0.1", 8848, handler, "OHLC")

也可通过 Grafana 等可视化零碎来连贯 DolphinDB database,对输出表进行查问并将后果以图表形式展示。

退出移动版