DolphinDB提供了功能强大的内存计算引擎,内置工夫序列函数,分布式计算以及流数据处理引擎,在泛滥场景下均可高效的计算K线。本教程将介绍DolphinDB如何通过批量解决和流式解决计算K线。

  • 历史数据批量计算K线

其中能够指定K线窗口的起始工夫;一天中能够存在多个交易时段,包含隔夜时段;K线窗口可重叠;应用交易量作为划分K线窗口的维度。须要读取的数据量特地大并且须要将后果写入数据库时,可应用DolphinDB内置的Map-Reduce函数并行计算。

  • 流式计算K线

应用API实时接管市场数据,并应用DolphinDB内置的流数据时序计算引擎(TimeSeriesAggregator)进行实时计算失去K线数据。

1. 历史数据K线计算

应用历史数据计算K线,可应用DolphinDB的内置函数bar,dailyAlignedBar,或wj。

1.1 不指定K线窗口的起始时刻,依据数据主动生成K线后果

bar(X,Y)返回X减去X除以Y的余数,个别用于将数据分组。

date = 09:32m 09:33m 09:45m 09:49m 09:56m 09:56m;bar(date, 5);

返回以下后果:

[09:30m,09:30m,09:45m,09:45m,09:55m,09:55m]

例子1:应用以下数据模仿美国股票市场:

n = 1000000date = take(2019.11.07 2019.11.08, n)time = (09:30:00.000 + rand(int(6.5*60*60*1000), n)).sort!()timestamp = concatDateTime(date, time)price = 100+cumsum(rand(0.02, n)-0.01)volume = rand(1000, n)symbol = rand(`AAPL`FB`AMZN`MSFT, n)trade = table(symbol, date, time, timestamp, price, volume).sortBy!(`symbol`timestamp)undef(`date`time`timestamp`price`volume`symbol)

计算5分钟K线:

barMinutes = 5OHLC = select first(price) as open, max(price) as high, min(price) as low, last(price) as close, sum(volume) as volume from trade group by symbol, date, bar(time, barMinutes*60*1000) as barStart

请留神,以上数据中,time列的精度为毫秒。若time列精度不是毫秒,则该当将 barMinutes601000 中的数字做相应调整。

1.2 须要指定K线窗口的起始时刻

须要指定K线窗口的起始时刻,可应用dailyAlignedBar函数。该函数可解决每日多个交易时段,亦可解决隔夜时段。

请留神,应用dailyAlignedBar函数时,工夫列必须含有日期信息,包含 DATETIME, TIMESTAMP 或 NANOTIMESTAMP 这三种类型的数据。指定每个交易时段窗口起始时刻的参数 timeOffset 必须应用相应的去除日期信息之后的 SECOND,TIME 或 NANOTIME 类型的数据。

例子2(每日一个交易时段):计算美国股票市场7分钟K线。数据沿用例子1中的trade表。

barMinutes = 7OHLC = select first(price) as open, max(price) as high, min(price) as low, last(price) as close, sum(volume) as volume from trade group by symbol, dailyAlignedBar(timestamp, 09:30:00.000, barMinutes*60*1000) as barStart

例子3(每日两个交易时段):中国股票市场每日有两个交易时段,上午时段为9:30至11:30,下午时段为13:00至15:00。

应用以下数据模仿:

n = 1000000date = take(2019.11.07 2019.11.08, n)time = (09:30:00.000 + rand(2*60*60*1000, n/2)).sort!() join (13:00:00.000 + rand(2*60*60*1000, n/2)).sort!()timestamp = concatDateTime(date, time)price = 100+cumsum(rand(0.02, n)-0.01)volume = rand(1000, n)symbol = rand(`600519`000001`600000`601766, n)trade = table(symbol, timestamp, price, volume).sortBy!(`symbol`timestamp)undef(`date`time`timestamp`price`volume`symbol)

计算7分钟K线:

barMinutes = 7sessionsStart=09:30:00.000 13:00:00.000OHLC = select first(price) as open, max(price) as high, min(price) as low, last(price) as close, sum(volume) as volume from trade group by symbol, dailyAlignedBar(timestamp, sessionsStart, barMinutes*60*1000) as barStart

例子4(每日两个交易时段,蕴含隔夜时段):某些期货每日有多个交易时段,且包含隔夜时段。本例中,第一个交易时段为8:45到下午13:45,另一个时段为隔夜时段,从下午15:00到第二天05:00。

应用以下数据模仿:

daySession =  08:45:00.000 : 13:45:00.000nightSession = 15:00:00.000 : 05:00:00.000n = 1000000timestamp = rand(concatDateTime(2019.11.06, daySession[0]) .. concatDateTime(2019.11.08, nightSession[1]), n).sort!()price = 100+cumsum(rand(0.02, n)-0.01)volume = rand(1000, n)symbol = rand(`A120001`A120002`A120003`A120004, n)trade = select * from table(symbol, timestamp, price, volume) where timestamp.time() between daySession or timestamp.time()>=nightSession[0] or timestamp.time()<nightSession[1] order by symbol, timestampundef(`timestamp`price`volume`symbol)

计算7分钟K线:

barMinutes = 7sessionsStart = [daySession[0], nightSession[0]]OHLC = select first(price) as open, max(price) as high, min(price) as low, last(price) as close, sum(volume) as volume from trade group by symbol, dailyAlignedBar(timestamp, sessionsStart, barMinutes*60*1000) as barStart

1.3 重叠K线窗口:应用wj函数

以上例子中,K线窗口均不重叠。若要计算重叠K线窗口,能够应用wj函数。应用wj函数,可对左表中的工夫列,指定绝对工夫范畴,在右表中进行计算。

例子5 (每日两个交易时段,重叠的K线窗口):模仿中国股票市场数据,每5分钟计算30分钟K线。

n = 1000000sampleDate = 2019.11.07symbols = `600519`000001`600000`601766trade = table(take(sampleDate, n) as date,     (09:30:00.000 + rand(7200000, n/2)).sort!() join (13:00:00.000 + rand(7200000, n/2)).sort!() as time,     rand(symbols, n) as symbol,     100+cumsum(rand(0.02, n)-0.01) as price,     rand(1000, n) as volume)

首先依据工夫来生成窗口,并且用cross join来生成股票和交易窗口的组合。

barWindows = table(symbols as symbol).cj(table((09:30:00.000 + 0..23 * 300000).join(13:00:00.000 + 0..23 * 300000) as time))

而后应用wj函数计算重叠窗口的K线数据:

OHLC = wj(barWindows, trade, 0:(30*60*1000),         <[first(price) as open, max(price) as high, min(price) as low, last(price) as close, sum(volume) as volume]>, `symbol`time)

1.4 应用交易量划分K线窗口

下面的例子咱们均应用工夫作为划分K线窗口的维度。在实践中,也能够应用其余维度作为划分K线窗口的根据。譬如用累计的交易量来计算K线。

例子6 (每日两个交易时段,应用累计的交易量计算K线):模仿中国股票市场数据,交易量每减少10000计算K线。

n = 1000000sampleDate = 2019.11.07symbols = `600519`000001`600000`601766trade = table(take(sampleDate, n) as date,     (09:30:00.000 + rand(7200000, n/2)).sort!() join (13:00:00.000 + rand(7200000, n/2)).sort!() as time,     rand(symbols, n) as symbol,     100+cumsum(rand(0.02, n)-0.01) as price,     rand(1000, n) as volume)    volThreshold = 10000select first(time) as barStart, first(price) as open, max(price) as high, min(price) as low, last(price) as close from (select symbol, price, cumsum(volume) as cumvol from trade context by symbol)group by symbol, bar(cumvol, volThreshold) as volBar

代码采纳了嵌套查问的办法。子查问为每个股票生成累计的交易量cumvol,而后在主查问中依据累计的交易量用bar函数生成窗口。

1.5 应用MapReduce函数减速

若需从数据库中提取较大量级的历史数据,计算K线,而后存入数据库,可应用DolphinDB内置的Map-Reduce函数mr进行数据的并行读取与计算。这种办法能够显著进步速度。

本例应用美国股票市场准确到纳秒的交易数据。原始数据存于"dfs://TAQ"数据库的"trades"表中。"dfs://TAQ"数据库采纳复合分区:基于交易日期Date的值分区与基于股票代码Symbol的范畴分区。

(1) 将存于磁盘的原始数据表的元数据载入内存:

login(`admin, `123456)db = database("dfs://TAQ")trades = db.loadTable("trades")

(2) 在磁盘上创立一个空的数据表,以寄存计算结果。以下代码建设一个模板表(model),并依据此模板表的schema在数据库"dfs://TAQ"中创立一个空的 OHLC 表以寄存K线计算结果:

model=select top 1 Symbol, Date, Time.second() as bar, PRICE as open, PRICE as high, PRICE as low, PRICE as close, SIZE as volume from trades where Date=2007.08.01, Symbol=`EBAYif(existsTable("dfs://TAQ", "OHLC"))    db.dropTable("OHLC")db.createPartitionedTable(model, `OHLC, `Date`Symbol)

(3) 应用mr函数计算K线数据,并将后果写入 OHLC 表中:

def calcOHLC(inputTable){    tmp=select first(PRICE) as open, max(PRICE) as high, min(PRICE) as low, last(PRICE) as close, sum(SIZE) as volume from inputTable where Time.second() between 09:30:00 : 15:59:59 group by Symbol, Date, 09:30:00+bar(Time.second()-09:30:00, 5*60) as bar    loadTable("dfs://TAQ", `OHLC).append!(tmp)    return tmp.size()}ds = sqlDS(<select Symbol, Date, Time, PRICE, SIZE from trades where Date between 2007.08.01 : 2019.08.01>)mr(ds, calcOHLC, +)

在以上代码中,ds是函数sqlDS生成的一系列数据源,每个数据源代表从一个数据分区中提取的数据;自定义函数calcOHLC为Map-Reduce算法中的map函数,对每个数据源计算K线数据,并将后果写入数据库,返回写入数据库的K线数据的行数;"+"是Map-Reduce算法中的reduce函数,将所有map函数的后果,亦即写入数据库的K线数据的行数相加,返回写入数据库的K线数据总数。

2. 实时K线计算

DolphinDB database 中计算实时K线的流程如下图所示:

DolphinDB中计算实时K线流程图

实时数据供应商个别会提供基于Python、Java或其余罕用语言的API的数据订阅服务。本例中应用Python来模仿接管市场数据,通过DolphinDB Python API写入流数据表中。DolphinDB的流数据时序聚合引擎(TimeSeriesAggregator)能够对实时数据依照指定的频率与挪动窗口计算K线。

本例应用的模仿实时数据源为文本文件trades.csv。该文件蕴含以下4列(一起给出一行样本数据):

以下三大节介绍实时K线计算的三个步骤:

2.1 应用 Python 接管实时数据,并写入DolphinDB流数据表

  • DolphinDB 中建设流数据表
share streamTable(100:0, `Symbol`Datetime`Price`Volume,[SYMBOL,DATETIME,DOUBLE,INT]) as Trade
  • Python程序从数据源 trades.csv 文件中读取数据写入DolphinDB。

实时数据中Datetime的数据精度是秒,因为pandas DataFrame中仅能应用DateTime[64]即nanatimestamp类型,所以下列代码在写入前有一个数据类型转换的过程。这个过程也实用于大多数数据须要荡涤和转换的场景。

import dolphindb as ddbimport pandas as pdimport numpy as npcsv_file = "trades.csv"csv_data = pd.read_csv(csv_file, dtype={'Symbol':str} )csv_df = pd.DataFrame(csv_data)s = ddb.session();s.connect("127.0.0.1",8848,"admin","123456")#上传DataFrame到DolphinDB,并对Datetime字段做类型转换s.upload({"tmpData":csv_df})s.run("data = select Symbol, datetime(Datetime) as Datetime, Price, Volume from tmpData")s.run("tableInsert(Trade,data)")

2.2 实时计算K线

本例中应用时序聚合引擎createTimeSeriesAggregator函数实时计算K线数据,并将计算结果输入到流数据表OHLC中。

实时计算K线数据,依据利用场景不同,能够分为以下2种状况:

  • 仅在每次工夫窗口完结时触发计算
  • 工夫窗口齐全不重合,例如每隔5分钟计算过来5分钟的K线数据
  • 工夫窗口局部重合,例如每隔1分钟计算过来5分钟的K线数据
  • 在每个工夫窗口完结时触发计算,同时在每个工夫窗口内数据也会依照肯定频率更新
    例如每隔1分钟计算过来1分钟的K线数据,但最近1分钟的K线不心愿等到窗口完结后再计算。心愿每隔1秒钟更新一次

上面针对上述的几种状况别离介绍如何应用createTimeSeriesAggregator函数实时计算K线数据。请依据理论须要抉择相应场景创立工夫序列聚合引擎。

2.2.1 仅在每次工夫窗口完结时触发计算

仅在每次工夫窗口完结时触发计算的状况下,又能够分为工夫窗口齐全不重合和局部重合两种场景。这两种状况可通过设定createTimeSeriesAggregator函数的windowSize参数和step参数以实现。上面具体阐明。

首先定义输出表:

share streamTable(100:0, `datetime`symbol`open`high`low`close`volume,[DATETIME, SYMBOL, DOUBLE,DOUBLE,DOUBLE,DOUBLE,LONG]) as OHLC

而后依据应用场景不同,抉择以下任意一种场景创立工夫序列聚合引擎。

场景一:每隔5分钟计算一次过来5分钟的K线数据,应用以下脚本定义时序聚合引擎,其中,windowSize参数取值与step参数取值相等

tsAggrKline = createTimeSeriesAggregator(name="aggr_kline", windowSize=300, step=300, metrics=<[first(Price),max(Price),min(Price),last(Price),sum(volume)]>, dummyTable=Trade, outputTable=OHLC, timeColumn=`Datetime, keyColumn=`Symbol)

场景二:每隔1分钟计算过来5分钟的K线数据,能够应用以下脚本定义时序聚合引擎。其中,windowSize参数取值是step参数取值的倍数

tsAggrKline = createTimeSeriesAggregator(name="aggr_kline", windowSize=300, step=60, metrics=<[first(Price),max(Price),min(Price),last(Price),sum(volume)]>, dummyTable=Trade, outputTable=OHLC, timeColumn=`Datetime, keyColumn=`Symbol)

最初,定义流数据订阅。若此时流数据表Trade中曾经有实时数据写入,那么实时数据会马上被订阅并注入聚合引擎:

subscribeTable(tableName="Trade", actionName="act_tsaggr", offset=0, handler=append!{tsAggrKline}, msgAsTable=true)

场景一的输出表前5行数据:

2.2.2 在每个工夫窗口完结触发计算,同时依照肯定频率更新计算结果

以窗口工夫1分钟计算vwap价格为例,10:00更新了聚合后果当前,那么下一次更新至多要等到10:01。依照计算规定,这一分钟内即便产生了很多交易,也不会触发任何计算。这在很多金融交易场景中是无奈承受的,心愿以更高的频率更新信息,为此引入了时序聚合引擎的updateTime参数。

updateTime参数示意计算的工夫距离,如果没有指定updateTime,只有在每个工夫窗口完结时,工夫序列聚合引擎才会触发一次计算。但如果指定了updateTime,在以下3种状况下都会触发计算:

  • 在每个工夫窗口完结时,工夫序列聚合引擎会触发一次计算
  • 每过updateTime个工夫单位,工夫序列聚合引擎都会触发一次计算
  • 如果数据进入后超过2updateTime个工夫单位(如果2updateTime有余2秒,则设置为2秒),以后窗口中仍有未计算的数据,工夫序列聚合引擎会触发一次计算

这样就能保障时序聚合引擎能在每个工夫窗口完结触发计算,同时在每个工夫窗口外部也会依照肯定频率触发计算。

须要阐明的是,时序聚合引擎要求在应用updateTime参数时,必须应用keyedTable作为输出表。具体起因如下:

  • 若将一般的table或streamTable作为输出表
    table与streamTable不会对反复的数据进行写入限度,因而在数据满足触发updateTime的条件而还未满足触发step的条件时,时序聚合引擎会一直向输出表增加同一个time的计算结果,最终失去的输出表就会有大量工夫雷同的记录,这个后果就没有意义。
  • 若将keyedStreamTable作为输出表
    keyedStreamTable不容许更新历史记录,也不容许往表中增加key值雷同的记录。往表中增加新记录时,零碎会主动查看新记录的主键值,如果新纪录的主键值与已有记录的主键值反复时,新纪录不会被写入。在本场景下体现的后果是,在数据还没有满足触发step的条件,但满足触发updateTime的条件时,时序聚合引擎将最近窗口的计算结果写入到输出表,却因为工夫雷同而被禁止写入,updateTIme参数同样失去了意义。
  • 应用keyedTable作为输出表
    keyedTable容许更新,往表中增加新记录时,零碎会主动查看新记录的主键值,如果新纪录的主键值与已有记录的主键值反复时,会更新表中对应的记录。在本场景下体现的后果是,同一个工夫计算结果可能会产生更新。在数据还没有满足触发step的条件,但满足触发updateTime的条件时,计算结果会被批改为依据最近窗口内的数据进行计算的后果,而不是向输出表中增加一条新的记录。直到数据满足触发step的条件时,才会向输出表中增加新的记录。而这个后果才是咱们预期想要达到的成果,因而时序聚合引擎要求在应用updateTime参数时,必须应用keyedTable作为输出表。

例如,要计算窗口为1分钟的K线,但最近1分钟的K线不心愿等到窗口完结后再计算。心愿每隔1秒钟都更新一次近1分钟的K线数据。咱们能够通过如下步骤实现这个场景。

首先,咱们须要创立一个keyedTable作为输出表,并将工夫列和股票代码列作为主键。当有新的数据注入输出表时,如果新纪录的工夫在表中已存在,会更新表中对应工夫的记录。这样就能保障每次查问时每个时刻的数据是最新的。

share keyedTable(`datetime`Symbol, 100:0, `datetime`Symbol`open`high`low`close`volume,[DATETIME,SYMBOL,DOUBLE,DOUBLE,DOUBLE,DOUBLE,LONG]) as OHLC
请留神:在应用时序聚合引擎时将keyedTable作为输出表,若时序聚合引擎指定了keyColumn参数,那么kyedTable须要同时将工夫相干列和keyColumn列作为主键。

每隔1分钟计算一次过来1分钟的K线数据,并且每隔1秒钟都更新一次近1分钟的K线数据,能够应用以下脚本定义时序聚合引擎。其中,windowSize参数取值与step参数取值相等,并指定updateTime参数取值为1秒钟,即每隔1秒种更新最近1分钟的数据。下例中的useWindowStartTime参数则用于指定输出表中的工夫为数据窗口的起始工夫。

tsAggrKline = createTimeSeriesAggregator(name="aggr_kline", windowSize=60, step=60, metrics=<[first(Price),max(Price),min(Price),last(Price),sum(volume)]>, dummyTable=Trade, outputTable=OHLC, timeColumn=`Datetime, keyColumn=`Symbol,updateTime=1, useWindowStartTime=true)
请留神,在应用工夫序列聚合引擎时,windowSize必须是step的整数倍,并且step必须是updateTime的整数倍。

最初,定义流数据订阅。若此时流数据表Trade中曾经有实时数据写入,那么实时数据会马上被订阅并注入聚合引擎:

subscribeTable(tableName="Trade", actionName="act_tsaggr", offset=0, handler=append!{tsAggrKline}, msgAsTable=true)

输出表的前5行数据:

2.3 在Python中展现K线数据

在本例中,聚合引擎的输出表也定义为流数据表,客户端能够通过Python API订阅输出表,并将计算结果展示到Python终端。

以下代码应用Python API订阅实时聚合计算的输入后果表OHLC,并将后果通过print函数打印进去。

import dolphindb as ddbimport pandas as pdimport numpy as np#设定本地端口20001用于订阅流数据s.enableStreaming(20001)def handler(lst):             print(lst)# 订阅DolphinDB(本机8848端口)上的OHLC流数据表s.subscribe("127.0.0.1", 8848, handler, "OHLC")

也可通过Grafana等可视化零碎来连贯DolphinDB database,对输出表进行查问并将后果以图表形式展示。