给定高频交易数据以及报价数据,如何判断每笔交易是由买方驱动或是卖方驱动,是进行高频交易数据分析常常须要解决的问题。本文将介绍如何应用DolphinDB疾速计算每笔交易的驱动方,只需不到2秒钟即可对美国一天的level 1的高频交易数据进行计算并存入数据库。本文应用了非同时连贯(asof join)以及map-reduce。
本文用到的数据是含有逐笔交易的交易表trade和交易报价表nbbo。它们别离蕴含以下字段:
trade
Symbol:股票代码
Time:工夫
Trade_Volume:交易量
Trade_Price:交易价格
nbbo
Symbol:股票代码
Time:工夫
Bid_Price:买方报价
Offer_Price:卖方报价
本文用到的数据都是从纽约证券交易所网站获取,能够从NYSE的ftp下载。下载EQY_US_ALL_TRADE_20161024.gz和EQY_US_ALL_NBBO_20161024.gz两个文件,而后把它们解压,保留在/home/DolphinDB/Data目录下,把两个文件的最初一行删除,因为最初一行是用来标记文件结尾的。
sed -i '$ d' EQY_US_ALL_TRADE_20161024sed -i '$ d' EQY_US_ALL_NBBO_20161024复制代码
在DolphinDB中执行以下脚本,把数据导入到 DolphinDB database 中。本教程应用的是分布式数据库,如果想应用内存数据库,只需把dbPath批改为"",若要应用本地磁盘数据库,只需把dbPath批改为磁盘目录,比方“/home/DolphinDB/Data/EQY”。
DATA_DIR = "/home/DolphinDB/Data"login("admin","123456")dbPath= "dfs://EQY"db = database(dbPath, SEQ, 16)trade = loadTextEx(db, `trade, DATA_DIR + "/EQY_US_ALL_TRADE_20161024",'|')nbbo = loadTextEx(db, `nbbo, DATA_DIR + "/EQY_US_ALL_NBBO_20161024",'|')复制代码
把分布式表加载到内存中:
db=database(dbPath);trade = db.loadTable("trade")nbbo = db.loadTable("nbbo")复制代码
通过map-reduce分布式计算框架,把后果保留至分布式表中。分布式表的数据在物理上散布在不同的节点,通过DolphinDB的分布式引擎,能够做对立查问。
创立分布式表trade_side,用于保留计算结果。用于保留后果的表除了蕴含trade表中的字段,还蕴含Bid_Price、Offer_Price和Side字段。
model=select top 1 * from trademodel[`Bid_Price]=0.0model[`Offer_Price]=0.0model[`Side]='B'if(existsTable(dbPath, "trade_side")) db.dropTable("trade_side")db.createPartitionedTable(model, "trade_side", "Symbol")复制代码
判断每笔交易由买方或卖方驱动,咱们定义的算法如下:如果交易价格小于交易报价的平均价格,交易为卖方驱动,把Side设置为'S';如果交易价格大于交易报价的平均价格,交易为买方驱动,把Side设置为'B'。如果买方报价等于交易报价的平均价格,则把Side设置为NULL。
def saveTradeSide(t){ update t set Side = iif(Trade_Price<(Bid_Price + Offer_Price)*0.5, 'S',iif(Trade_Price>(Bid_Price + Offer_Price)*0.5, 'B',char())) update t set Side = NULL where Bid_Price >= Offer_Price or Bid_Price <= 0 loadTable("dfs://EQY", "trade_side").append!(t) return t.size()}复制代码
iif(condition, x, y)
:iif是条件运算符。condition是条件向量,如果condition[i]为true,则返回x[i],否则返回y[i]。
以下代码连贯交易表trades和交易报价表nbbo,sqlDS函数会依据输出的SQL元代码创立数据源。通过map-reduce函数mr把saveTradeSide利用到各个数据源。
ds = sqlDS(<select trade.*, Bid_Price, Offer_Price from aj(trade,nbbo,`Symbol`Time) where Time between 09:30:00.000000000 : 15:59:59.999999999>)mr(ds,saveTradeSide,+)复制代码
aj(asof join)是DolphinDB专门为时序数据设计的连贯形式。因为成交和交易报价的产生工夫不可能完全一致,因而不能应用等值连贯(equal join)。在下面的代码中,如果对同一支股票,表nbbo中没有与表trade中Time匹配的行,asof join会在右表中取同一支股票该时刻之前最近的工夫以匹配。
DolphinDB提供了基于map-reduce和迭代的分布式算法。用户只须要指定分布式数据源和外围函数,如map函数、reduce函数、final函数等,十分不便。DolphinDB的分布式应用无需编译、打包或者部署,能够在线应用,大大提高了数据分析师的工作效率。trade表有8023只股票共2700万条交易记录,nbbo表有7800万条记录。如此宏大的数据量,应用分布式计算,仅需1秒多,性能极佳。
查看IBM的前100条后果:
select top 100 Time, Exchange, Symbol, Trade_Volume, Trade_Price, Bid_Price, Offer_Price, Side from db.loadTable("trade_side") where Symbol=`IBMTime Exchange Symbol Trade_Volume Trade_Price Bid_Price Offer_Price Side09:30:00.105112000 80 IBM 900 150.4 150.12 150.97 'S'09:30:00.105201000 80 IBM 900 150.4 150.12 150.97 'S'09:30:00.105293000 80 IBM 400 150.4 150.12 150.97 'S'09:30:00.105398000 80 IBM 119 150.4 150.12 150.97 'S'09:30:00.105498000 80 IBM 81 150.4 150.12 150.97 'S'09:30:00.432775000 80 IBM 100 150.49 150.49 150.97 'S'09:30:00.452763000 90 IBM 200 150.49 150.49 150.97 'S'09:30:00.480602000 84 IBM 100 150.49 150.49 150.73 'S'09:30:00.480698000 84 IBM 100 150.49 150.49 150.73 'S'09:30:00.563528000 78 IBM 55,940 150.58 150.49 150.73 'S'09:30:00.577708000 90 IBM 100 150.59 150.49 150.95 'S'09:30:00.578129000 78 IBM 40 150.65 150.49 150.95 'S'09:30:00.578235000 78 IBM 60 150.69 150.2 150.9 'B'09:30:00.584212000 80 IBM 89 150.5 150.2 150.9 'S'09:30:00.600259000 80 IBM 1 150.5 150.2 150.9 'S'...复制代码
如果数据量不大,能够通过SQL语句进行计算,间接在线应用,十分不便。
select trade.*, Bid_Price, Offer_Price,iif(Trade_Price<(Bid_Price + Offer_Price)*0.5, 'S',i