共计 7273 个字符,预计需要花费 19 分钟才能阅读完成。
对加密货币盘口与逐笔交易数据的回放展现,可帮忙量化钻研人员测验量化策略,也有助于交易员复盘,加深对市场的洞察。DolphinDB 可实现盘口和逐笔交易数据的高速回放,以及对回放后果逐点查问。
DolphinDB database 反对将多个分布式表同步回放并公布到流数据表,例如对盘口和交易这两个表进行同步回放。前端 JavaScript 应用 DolphinDB Web API 来轮询回放输入的流数据表,实现盘口和交易数据的可视化回放。DolphinDB 自带 Web 服务器,整个流程可在 DolphinDB 内实现,无内部依赖。
加密货币盘口与逐笔交易数据回放可通过以下 4 个步骤来实现。用户亦可应用 docker 疾速体验回放性能,具体请参考文末介绍。
- 部署 DolphinDB 节点
到官网下载 DolphinDB 最新版本,并部署集群。部署教程请参考单服务器集群部署教程。
- 下载盘口和逐笔交易数据
本文应用的是火币研究院提供的加密货币交易数据,能够通过火币数据 API 获取。获取数据的示例代码能够参考 python 示例代码或 java 示例代码。
- 导入数据到 DolphinDB
本文将获取的 orderBook 的 tick 级数据保留为 csv 文件,通过 loadTextEx 函数疾速地将文件导入到数据库。用户也能够通过 Python API 或 Java API 将数据导入到 DolphinDB 中。以下代码在 DolphinDB GUI 中执行。
(1)数据预处理
如果保留的 csv 文件中第一行是无关信息,能够采纳上面脚本进行数据预处理,解决好的文件保留到某个目录,本案例将两个文件别离保留到 /hdd/data/orderBook-processed 和 /hdd/data/tick-processes 目录。如果 csv 文件第一行没有无关信息,可疏忽这一步骤。
// 删除数据文件第一行无关信息
def dataPreProcess(DIR){if(!exists(DIR+ "-processed/"))
mkdir(DIR+ "-processed/")
fileList = exec filename from files(DIR) where isDir = false, filename like "%.csv"
for(filename in fileList){f = file(DIR + "/" + filename)
y = f.readLines(1000000).removeHead!(1)
saveText(y, DIR+ "-processed/" + filename)
}
}
dataPreProcess("/hdd/data/orderBook")
dataPreProcess("/hdd/data/tick")
(2)创立 DolphinDB 数据库
依据数据量以及查问字段,数据库可依照交易标的代码和业务工夫进行组合分区。本案例中,数据库的名称为 dfs://huobiDB。如果须要批改,必须同时批改 replay.html 中数据库的名称。
def createDB(){if(existsDatabase("dfs://huobiDB"))
dropDatabase("dfs://huobiDB")
// 依照数据集的时间跨度,请自行调整 VALUE 分区日期范畴
db1 = database(, VALUE, 2018.09.01..2018.09.30)
db2 = database(, HASH, [SYMBOL,20])
db = database("dfs://huobiDB", COMPO, [db1,db2])
}
def createTick(){tick = table(100:0, `aggregate_ID`server_time`price`amount`buy_or_sell`first_trade_ID`last_trade_ID`product , [INT,TIMESTAMP,DOUBLE,DOUBLE,CHAR,INT,INT,SYMBOL])
db = database("dfs://huobiDB")
return db.createPartitionedTable(tick, `tick, `server_time`product)
}
def createOrderBook(){orderData = table(100:0, `lastUpdateId`server_time`buy_1_price`buy_2_price`buy_3_price`buy_4_price`buy_5_price`buy_6_price`buy_7_price`buy_8_price`buy_9_price`buy_10_price`buy_11_price`buy_12_price`buy_13_price`buy_14_price`buy_15_price`buy_16_price`buy_17_price`buy_18_price`buy_19_price`buy_20_price`sell_1_price`sell_2_price`sell_3_price`sell_4_price`sell_5_price`sell_6_price`sell_7_price`sell_8_price`sell_9_price`sell_10_price`sell_11_price`sell_12_price`sell_13_price`sell_14_price`sell_15_price`sell_16_price`sell_17_price`sell_18_price`sell_19_price`sell_20_price`buy_1_amount`buy_2_amount`buy_3_amount`buy_4_amount`buy_5_amount`buy_6_amount`buy_7_amount`buy_8_amount`buy_9_amount`buy_10_amount`buy_11_amount`buy_12_amount`buy_13_amount`buy_14_amount`buy_15_amount`buy_16_amount`buy_17_amount`buy_18_amount`buy_19_amount`buy_20_amount`sell_1_amount`sell_2_amount`sell_3_amount`sell_4_amount`sell_5_amount`sell_6_amount`sell_7_amount`sell_8_amount`sell_9_amount`sell_10_amount`sell_11_amount`sell_12_amount`sell_13_amount`sell_14_amount`sell_15_amount`sell_16_amount`sell_17_amount`sell_18_amount`sell_19_amount`sell_20_amount`product,[INT,TIMESTAMP,DOUBLE,DOUBLE,DOUBLE,DOUBLE,DOUBLE,DOUBLE,DOUBLE,DOUBLE,DOUBLE,DOUBLE,DOUBLE,DOUBLE,DOUBLE,DOUBLE,DOUBLE,DOUBLE,DOUBLE,DOUBLE,DOUBLE,DOUBLE,DOUBLE,DOUBLE,DOUBLE,DOUBLE,DOUBLE,DOUBLE,DOUBLE,DOUBLE,DOUBLE,DOUBLE,DOUBLE,DOUBLE,DOUBLE,DOUBLE,DOUBLE,DOUBLE,DOUBLE,DOUBLE,DOUBLE,DOUBLE,DOUBLE,DOUBLE,DOUBLE,DOUBLE,DOUBLE,DOUBLE,DOUBLE,DOUBLE,DOUBLE,DOUBLE,DOUBLE,DOUBLE,DOUBLE,DOUBLE,DOUBLE,DOUBLE,DOUBLE,DOUBLE,DOUBLE,DOUBLE,DOUBLE,DOUBLE,DOUBLE,DOUBLE,DOUBLE,DOUBLE,DOUBLE,DOUBLE,DOUBLE,DOUBLE,DOUBLE,DOUBLE,DOUBLE,DOUBLE,DOUBLE,DOUBLE,DOUBLE,DOUBLE,DOUBLE,DOUBLE,SYMBOL])
db = database("dfs://huobiDB")
return db.createPartitionedTable(orderData, `orderBook, `server_time`product)
}
(3)将文本数据导入数据库
def loadTick(path, filename, mutable tb){tmp = filename.split("_")
product = tmp[1]
file = path + "/" + filename
t = loadText(file)
t[`product]=product
tb.append!(t)
}
def loopLoadTick(mutable tb, path){fileList = exec filename from files(path,"%.csv")
for(filename in fileList){
print filename
loadTick(path, filename, tb)
}
}
def loadOrderBook(path, filename, mutable tb){tmp = filename.split("_")
product = tmp[1]
file = path + "/" + filename
t = loadText(file)
t[`product] = product
tb.append!(t)
}
def loopLoadOrderBook(mutable tb, path){fileList = exec filename from files(path, "%.csv")
for(filename in fileList){
print filename
loadOrderBook(path, filename, tb)
}
}
login("admin","123456")
tb = createOrderBook()
loopLoadOrderBook(tb, "/hdd/data/orderBook-processed")
tb = createTick()
loopLoadTick(tb, "/hdd/data/tick-processed")
(4)定义数据回放函数
def replayData(productCode, startTime, length, rate){login('admin', '123456');
tick = loadTable('dfs://huobiDB', 'tick');
orderbook = loadTable('dfs://huobiDB', 'orderBook');
schTick = select name,typeString as type from tick.schema().colDefs;
schOrderBook = select name,typeString as type from orderbook.schema().colDefs;
share(streamTable(100:0, schOrderBook.name, schOrderBook.type), `outOrder);
share(streamTable(100:0, schTick.name, schTick.type), `outTick);
enableTablePersistence(objByName(`outOrder), true,true, 100000);
enableTablePersistence(objByName(`outTick), true,true, 100000);
clearTablePersistence(objByName(`outOrder));
clearTablePersistence(objByName(`outTick));
share(streamTable(100:0, schOrderBook.name, schOrderBook.type), `outOrder);
share(streamTable(100:0, schTick.name, schTick.type), `outTick);
enableTablePersistence(objByName(`outOrder), true,true, 100000);
enableTablePersistence(objByName(`outTick), true,true, 100000);
endTime = temporalAdd(startTime, length, "m")
sqlTick = sql(sqlCol("*"), tick, [<product=productCode>, <server_time between timestamp(pair(startTime, endTime))>]);
sqlOrder = sql(sqlCol("*"), orderbook, [<product=productCode>, <server_time between timestamp(pair(startTime, endTime))>]);
cutCount = length * 60 / 20
trs = cutPoints(timestamp(startTime..endTime), cutCount);
rds = replayDS(sqlTick, `server_time , , trs);
rds2 = replayDS(sqlOrder, `server_time , , trs);
return submitJob('replay_huobi','replay_huobi', replay, [rds,rds2], [`outTick,`outOrder],`server_time ,, rate);
}
addFunctionView(replayData);
- 数据回放
下载数据回放界面的 html 压缩包,下载地址:https://github.com/dolphindb/applications/raw/master/cryptocurr_replay/replay.zip。将 replay.zip 解压到 DolphinDB 程序包的 web 目录。
在浏览器地址栏中输出 http://[host]:[port]/replay.html 关上数据回放界面。这里的 host 和 port 是指数据节点的 IP 地址和端口号,如 http://192.168.1.135:8902/replay.html。
数据回放前,咱们能够设置以下参数:
- Product:加密货币代码
- Replay Rate:回放速度,即每秒钟回放的记录数。如果市场每秒钟产生 100 笔交易,Replay Rate 设置为 1000 就以 10 倍的速度回放。
- Start Time:数据的起始工夫
- Length:数据的时间跨度,单位是分钟。如果 Start Time 设置为 2018.09.17 00:00:00,Length 设置为 60,示意回放的数据是在 2018.09.17 00:00:00-2018.09.17 00:59:59 之间产生的。
回放完结后,点击左上角正方形图标按钮(“完结”按钮)。单击价格趋势图中的点,表格中会显示该工夫点之前的 10 笔数据。具体操作请查看图片 https://raw.githubusercontent.com/dolphindb/Tutorials_CN/master/images/replay/v.gif。
应用 docker 疾速体验回放性能
咱们提供了一个蕴含 DolphinDB Server 以及演示数据的 docker 容器,并打包成 tar 文件提供下载。用户仅须要装置 docker 环境,下载打包文件,运行上面的命令就能够疾速实现演示环境部署。
tar 文件下载地址:https://www.dolphindb.cn/downloads/cryptocurr_replay.tar.gz
gunzip cryptocurr_replay.tar.gz
##docker 若没有赋予非治理拜访权限,能够应用 sudo docker
docker import cryptocurr_replay.tar ddb/replay:v1
## 生成并启动容器
docker run -dt -p 8888:8848 --name replay1 ddb/replay:v1 /bin/bash /dolphindb/start.sh
启动容器后,docker 内 DolphinDB database 的拜访端口被映射到宿主机 8888 端口,关上浏览器拜访 http://[宿主机 ip]:8888/replay.html
,进入到回放演示界面。
为了管制 docker 容器大小,不便下载,演示数据仅蕴含 2018.09.17 一天的加密货币编号为 ETHUSDT,ETHBTC,BTCUSDT
的交易数据。
留神:因为内置的 license 文件会过期, 须要从官网下载最新的社区版 license 替换。下载社区版解压后进入 server 目录下,拷贝 dolphindb.lic 文件笼罩 docker 中的 /dolphindb/ 目录下同名文件。
sudo docker cp ./dolphindb.lic replay1:/dolphindb/dolphindb.lic
重启 docker:
sudo docker restart replay1
注意事项
- 本案例以后仅限单用户应用,不反对多用户同时回放。
- 为了简化操作,数据库名称和数据库用户信息均固化在网页中,若有须要请自行批改 replay.html 文件
应用中有任何问题,欢送退出智臾科技:DolphinDB 技术交换群,内含二维码!