关于SegmentFault:时序数据库DolphinDB历史数据回放教程

52次阅读

共计 6123 个字符,预计需要花费 16 分钟才能阅读完成。

一个量化策略在用于理论交易时,解决实时数据的程序通常为事件驱动。而研发量化策略时,须要应用历史数据进行回测,这时的程序通常不是事件驱动。因而同一个策略须要编写两套代码,不仅耗时而且容易出错。在 DolphinDB database 中,用户可将历史数据依照工夫程序以“实时数据”的形式导入流数据表中,这样就能够应用同一套代码进行回测和实盘交易。

DolphinDB 的流数据处理框架采纳公布 - 订阅 - 生产的模式。数据生产者将实时数据持续地以流的模式公布给所有数据订阅者。订阅者收到音讯当前,可应用自定义函数或者 DolphinDB 内置的聚合引擎来解决音讯。DolphinDB 流数据接口反对多种语言的 API,包含 C ++, C#, Java, 和 Python 等。用户能够应用这些 API 来编写更加简单的解决逻辑,更好地与理论生产环境相结合。详情请参考 DolphinDB 流数据教程。

本文介绍 replay 和 replayDS 函数,而后应用金融数据展现数据回放的过程与利用场景。

1. 函数介绍

replay

replay(inputTables, outputTables, [dateColumn], [timeColumn], [replayRate], [parallelLevel=1])

replay 函数的作用是将若干表或数据源同时回放到相应的输出表中。用户须要指定输出的数据表或数据源、输出表、日期列、工夫列、回放速度以及并行度。

replay 函数参数概念如下:

  • inputTables: 单个表或蕴含若干表或数据源(见 replayDS 介绍)的元组。
  • outputTables: 单个表或蕴含若干个表的元组,这些表通常为流数据表。输出表和输出表的个数统一,且一一对应,每对输出、输出表的构造雷同。
  • dateColumn, timeColumn: string, 示意输出表的日期和工夫列,若不指定则默认第一列为日期列。若输出表中工夫列同时蕴含日期和工夫,须要将 dateColumn 和 timeColumn 设为同一列。回放时,零碎将依据 dateColumn 和 timeColumn 的设定,决定回放的最小工夫精度。在此工夫精度下,同一时刻的数据将在雷同批次输入。比方一张表同时有日期列和工夫列,然而 replay 函数只设置了 dateColumn,那么同一天的所有数据会在一个批次输入。
  • replayRate: 整数, 示意每秒钟回放的数据条数。因为回放时同一个时刻数据在同一批次输入,因而当 replayRate 小于一个批次的行数时,理论输入的速率会大于 replayRate。
  • parallelLevel: 整数, 示意读取数据的并行度。当源数据大小超过内存大小的时候,须要应用 replayDS 函数将源数据划分为若干个小的数据源,顺次从磁盘中读取数据并回放。指定多个读取数据的线程数可晋升数据读取速度。

replayDS

replayDS(sqlObj, [dateColumn], [timeColumn], [timeRepartitionSchema])

replayDS 函数能够将输出的 SQL 查问转化为数据源,联合 replay 函数应用。其作用是依据输出表的分区以及 timeRepartitionSchema,将原始的 SQL 查问依照工夫程序拆分成若干小的 SQL 查问。

replayDS 函数参数概念如下:

  • sqlObj: SQL 元代码,示意回放的数据,如 <select * from sourceTable>。
  • dateColumn: 字符串, 示意日期列。若不指定,默认第一列为日期列。replayDS 函数默认日期列是数据源的一个分区列,并依据分区信息将原始 SQL 查问拆分为多个查问。
  • timeColumn: 字符串, 示意工夫列,配合 timeRepartitionSchema 应用。
  • timeRepartitionSchema: 工夫类型向量,如 08:00:00 .. 18:00:00。若同时指定了 timeColumn, 则对 SQL 查问在工夫维度上进一步拆分。

单个内存表回放

单内存表回放只须要设置输出表、输出表、日期列、工夫列和回放速度即可。

replay(inputTable, outputTable, `date, `time, 10)

应用 data source 的单表回放

当单表行数过多时,能够配合应用 replayDS 进行回放。首先应用 replayDS 生成 data source,本例中指定了日期列和 timeRepartitionColumn。回放调用与单个内存表回放类似,然而能够指定回放的并行度。replay 外部实现应用了 pipeline 框架,取数据和输入离开执行。当输出为 data source 时,多块数据能够并行读取,以防止输入线程期待的状况。此例中并行度设置为 2,示意有两个线程同时执行取数据的操作。

inputDS = replayDS(<select * from inputTable>, `date, `time, 08:00:00.000 + (1..10) * 3600000)
replay(inputDS, outputTable, `date, `time, 1000, 2)

应用 data source 的多表回放

replay 也反对多张表的同时回放,只须要将多张输出表以元组的形式传给 replay, 并且别离指定输出表即可。这里输出表和输出表应该一一对应,每一对都必须有雷同的表构造。如果指定了日期列或工夫列,那么所有表中都该当有存在相应的列。

ds1 = replayDS(<select * from input1>, `date, `time, 08:00:00.000 + (1..10) * 3600000)
ds2 = replayDS(<select * from input2>, `date, `time, 08:00:00.000 + (1..10) * 3600000)
ds3 = replayDS(<select * from input3>, `date, `time, 08:00:00.000 + (1..10) * 3600000)
replay([ds1, ds2, ds3], [out1, out2, out3], `date, `time, 1000, 2)

勾销回放

如果 replay 函数是通过 submitJob 调用,能够应用 getRecentJob 获取 jobId,而后用 cancelJob 勾销回放。

getRecentJobs()
cancelJob(jobid)

如果是间接调用,可在另外一个 GUI session 中应用 getConsoleJobs 获取 jobId,而后应用 cancelConsoleJob 勾销回放工作。

getConsoleJobs()
cancelConsoleJob(jobId)

2.如何应用回放数据

回放的数据以流数据模式存在,咱们能够应用以下三种形式来订阅生产这些数据:

  • 在 DolphinDB 中订阅,应用 DolphinDB 脚本自定义回调函数来生产流数据。
  • 在 DolphinDB 中订阅,应用内置的流计算引擎来解决流数据,譬如工夫序列聚合引擎、横截面聚合引擎、异样检测引擎等。DolphinDB 内置的聚合引擎能够对流数据进行实时聚合计算,应用简便且性能优异。在 3.2 中,咱们应用横截面聚合引擎来解决回放的数据,并计算 ETF 的外在价值。横截面聚合引擎的具体用法参见 DolphinDB 用户手册。
  • 第三方客户端通过 DolphinDB 的流数据 API 来订阅和生产数据。

3. 金融示例

回放美国股市一天的 level1 交易数据,并计算 ETF 价值

本例中应用美国股市 2007 年 8 月 17 日的 level1 交易数据,利用 replayDS 进行数据回放,并通过 DolphinDB 内置的横截面聚合引擎计算 ETF 价值。数据寄存在分布式数据库 dfs://TAQ 的 quotes 表,上面是 quotes 表的构造以及数据预览。

// 加载数据库中 quotes 表的数据,查看表构造
quotes = database("dfs://TAQ").loadTable("quotes");
quotes.schema().colDefs;

name    typeString    typeInt
time    SECOND        10
symbol  SYMBOL        17
ofrsiz  INT           4
ofr     DOUBLE        16
mode    INT           4
mmid    SYMBOL        17
ex      CHAR          2
date    DATE          6
bidsize INT           4
bid     DOUBLE        16

// 查看 quotes 表内前十行的数据
select top 10 * from quotes;

symbol    date         time       bid     ofr     bidsiz    ofrsiz    mode    ex    mmid
A         2007.08.17   04:15:06   0.01    0       10        0         12      80
A         2007.08.17   06:21:16   1       0       1         0         12      80
A         2007.08.17   06:21:44   0.01    0       10        0         12      80
A         2007.08.17   06:49:02   32.03   0       1         0         12      80
A         2007.08.17   06:49:02   32.03   32.78   1         1         12      80
A         2007.08.17   07:02:01   18.5    0       1         0         12      84
A         2007.08.17   07:02:01   18.5    45.25   1         1         12      84
A         2007.08.17   07:54:55   31.9    45.25   3         1         12      84
A         2007.08.17   08:00:00   31.9    40      3         2         12      84
A         2007.08.17   08:00:00   31.9    35.5    3         2         12      84

(1)对要进行回放的数据进行划分。因为一天的数据共有 336,305,414 条,一次性导入内存再回放会有较长提早,还有可能导致内存溢出,因而先应用 replayDS 函数并指定参数 timeRepartitionSchema,将数据依照工夫戳分为 62 个局部。

sch = select name,typeString as type from quotes.schema().colDefs
trs = cutPoints(09:30:00.001..18:00:00.001, 60)
rds = replayDS(<select * from quotes>, `date, `time,  trs);

(2)定义输出表 outQuotes,个别为流数据表。

share streamTable(100:0, sch.name,sch.type) as outQuotes

(3)定义股票权重字典 weights 以及聚合函数 etfVal,用于计算 ETF 价值。在本例中,咱们仅计算 AAPL、IBM、MSFT、NTES、AMZN、GOOG 这几只股票的 ETF 价值。

defg etfVal(weights,sym, price) {return wsum(price, weights[sym])
}
weights = dict(STRING, DOUBLE)
weights[`AAPL] = 0.1
weights[`IBM] = 0.1
weights[`MSFT] = 0.1
weights[`NTES] = 0.1
weights[`AMZN] = 0.1
weights[`GOOG] = 0.5

(4)创立流聚合引擎,并订阅数据回放的输出表 outQuotes。订阅 outQuotes 表时,咱们指定了公布表的过滤条件,只有 symbol 为 AAPL、IBM、MSFT、NTES、AMZN、GOOG 的数据才会公布到横截面聚合引擎,缩小不必要的网络开销和数据传输。

setStreamTableFilterColumn(outQuotes, `symbol)
outputTable = table(1:0, `time`etf, [TIMESTAMP,DOUBLE])
tradesCrossAggregator=createCrossSectionalAggregator("etfvalue", <[etfVal{weights}(symbol, ofr)]>, quotes, outputTable, `symbol, `perBatch)
subscribeTable(,"outQuotes","tradesCrossAggregator",-1,append!{tradesCrossAggregator},true,,,,,`AAPL`IBM`MSFT`NTES`AMZN`GOOG) 

(5)开始回放,设定每秒回放 10 万条数据,聚合引擎则会实时地对回放的数据进行生产。

submitJob("replay_quotes", "replay_quotes_stream",  replay,  [rds],  [`outQuotes], `date, `time,100000,4)

(6)查看不同工夫点下咱们抉择的股票的 ETF 价值。

// 查看 outputTable 表内前 15 行的数据, 其中第一列工夫为聚合计算产生的工夫
>select top 15 * from outputTable;

time                    etf
2019.06.04T16:40:18.476   14.749
2019.06.04T16:40:19.476      14.749
2019.06.04T16:40:20.477      14.749
2019.06.04T16:40:21.477      22.059
2019.06.04T16:40:22.477      22.059
2019.06.04T16:40:23.477      34.049
2019.06.04T16:40:24.477      34.049
2019.06.04T16:40:25.477      284.214
2019.06.04T16:40:26.477      284.214
2019.06.04T16:40:27.477      285.68
2019.06.04T16:40:28.477      285.68
2019.06.04T16:40:29.478      285.51
2019.06.04T16:40:30.478      285.51
2019.06.04T16:40:31.478      285.51
2019.06.04T16:40:32.478      285.51

4. 性能测试

咱们在服务器上对 DolphinDB 的数据回放性能进行了性能测试。服务器配置如下:

主机:DELL PowerEdge R730xd

CPU:Intel Xeon(R) CPU E5-2650 v4(24 核 48 线程 2.20GHz)

内存:512 GB(32GB × 16, 2666 MHz)

硬盘:17T HDD(1.7T × 10, 读取速度 222 MB/s,写入速度 210 MB/s)

网络:万兆以太网

测试脚本如下:

sch = select name,typeString as type from  quotes.schema().colDefs
trs = cutPoints(09:30:00.001..18:00:00.001,60)
rds = replayDS(<select * from quotes>, `date, `time,  trs);
share streamTable(100:0, sch.name,sch.type) as outQuotes
jobid = submitJob("replay_quotes","replay_quotes_stream",  replay,  [rds],  [`outQuotes], `date, `time, , 4)

在不设定回放速率(即以最快的速率回放),并且输出表没有任何订阅时,回放 336,305,414 条数据耗时仅须要 90~110 秒。

正文完
 0