在解决实时流数据时,不仅须要依照工夫做纵向聚合计算,还须要对最新的数据做横向比拟和计算,如金融里对所有股票的最新报价求百分位、工业物联网中计算一批设施的温度均值等。DolphinDB database 提供了横截面聚合引擎,能够对流数据中所有分组的最新数据做聚合运算。

横截面引擎的主体分为两局部:横截面数据表和计算引擎。横截面数据是横截面引擎的外部表,保留了所有分组最新的截面数据。计算引擎是一组聚合计算表达式以及触发器,零碎会依照指定的形式触发聚合运算,计算结果会输入到另外一个表中。

1. 根本用法

在DolphinDB database中,通过createCrossSectionalAggregator创立横截面聚合引擎。它返回一个横截面数据表,保留了所有分组最新的截面数据,往这个表写入数据意味着这些数据进入横截面聚合引擎进行计算。具体用法如下:

createCrossSectionalAggregator(name, [metrics], dummyTable, [outputTable], keyColumn, [triggeringPattern="perBatch"], [triggeringInterval=1000])

  • name是一个字符串,示意横截面聚合引擎的名称,是横截面聚合引擎的惟一标识。它能够蕴含字母,数字和下划线,但必须以字母结尾。
  • metrics是元代码。它能够是零碎内置或用户自定义的函数,如<[sum(qty), avg(price)]>,能够对聚合后果应用表达式,如<[avg(price1)-avg(price2)]>,也能够对计算列进行聚合运算,如<[std(price1-price2)]>。详情可参考元编程。
  • dummyTable是表对象,它能够不蕴含数据,但它的构造必须与订阅的流数据表雷同。
  • outputTable是表对象,用于保留计算结果。输出表的列数为metrics数量+1,第一列为TIMESTAMP类型,用于寄存产生计算的工夫戳,,其余列的数据类型必须与metrics返回后果的数据类型统一。
  • keyColumn是一个字符串,指定dummyTable的某列为横截面聚合引擎的key。keyColumn指定列中的每一个key对应表中的惟一一行。
  • triggeringPattern是一个字符串,示意触发计算的形式。它能够是以下取值:
  • "perRow": 每插入一行数据触发一次计算
  • "perBatch": 每插入一次数据触发一次计算
  • "interval": 按肯定的工夫距离触发计算
  • triggeringInterval是一个整数。只有当triggeringPattern的取值为interval时才失效,示意触发计算的工夫距离。默认值为1000毫秒。

2. 示例

上面通过一个例子阐明横截面聚合引擎的利用。在金融交易中,往往须要实时理解所有股票最新的报价均值、最近一次成交量总和以及最近一次交易的交易量。DolphinDB的横截面聚合引擎联合流数据订阅性能能够不便地实现这些工作。

(1)创立实时交易表

股票的实时交易表trades,蕴含以下次要字段:

sym:股票代码

time:工夫

price:成交价

qty:成交量

每当交易产生时,实时数据会写入trades表。创立trades表的脚本如下:

share streamTable(10:0,timesympriceqty,[TIMESTAMP,SYMBOL,DOUBLE,INT]) as trades

(2)创立横截面聚合引擎

tradesCrossAggregator=createCrossSectionalAggregator("CrossSectionalDemo", <[avg(price), sum(qty), sum(price*qty)]>, trades, outputTable, sym, perRow)

tradesCrossAggregator是横截面数据表,它按股票代码分组,每个股票有且仅有一行。当数据进入该表时,会计算每个股票的avg(price), sum(qty)和sum(price*qty)。每插入一条数据触发一次计算。

(3)横截面数据表订阅实时交易表

subscribeTable(,"trades","tradesCrossAggregator",-1,append!{tradesCrossAggregator},true)

通过流数据订阅性能,把实时数据写入横截面数据表。

(4)模仿数据产生

def writeData(n){

timev = 2000.10.08T01:01:01.001 + timestamp(1..n)

symv = take(AB, n)

pricev = take(102.1 33.4 73.6 223,n)

qtyv = take(60 74 82 59, n)

insert into trades values(timev, symv, pricev,qtyv)

}

writeData(4);

查看实时交易表,共有4条数据。

select * from trades

time sym price qty


2000.10.08T01:01:01.002 A 102.1 60

2000.10.08T01:01:01.003 B 33.4 74

2000.10.08T01:01:01.004 A 73.6 82

2000.10.08T01:01:01.005 B 223 59

查看横截面数据表,外面保留了A、B两只股票最近的两笔交易记录。

select * from tradesCrossAggregator

time sym price qty


2000.10.08T01:01:01.004 A 73.6 82

2000.10.08T01:01:01.005 B 223 59

查看横截面引擎的输出表,因为横截面引擎采纳了perRow每行触发计算的频率,所以每往横截面表写入一行数据,聚合引擎都会做一次计算,因而一共有4条记录。

select * from outputTable

time avgPrice sumqty Total


2019.07.08T10:04:41.731 102.1 60 6126

2019.07.08T10:04:41.732 67.75 134 8597.6

2019.07.08T10:04:41.732 53.5 156 8506.8

2019.07.08T10:04:41.732 148.3 141 19192.2

通过getAggregatorStat函数查看横截面引擎的状态。

getAggregatorStat().CrossSectionalAggregator

name user status lastErrMsg numRows numMetrics metrics triggeringPattern triggeringInterval


CrossSectionalDemo guest OK 2 3 [ avg(price), su...perRow 1000

通过removeAggregator函数删除横截面引擎。

removeAggregator("CrossSectionalDemo")

3. 触发计算的几种形式

横截面引擎一共有三种触发计算的形式:perRow、perBatch和interval。下面的例子中采纳的是每插入一行数据触发一次友链交易计算。上面介绍另外两种触发计算的形式。

  • perBatch

perBatch参数示意每追加一批数据就触发一次写入,下例按perBatch模式启用横截面引擎,脚本一共生成12条记录,分三批写入,输出表中预期有3条记录。

share streamTable(10:0,timesympriceqty,[TIMESTAMP,SYMBOL,DOUBLE,INT]) as trades

outputTable = table(1:0, timeavgPricesumqtyTotal, [TIMESTAMP,DOUBLE,INT,DOUBLE])

tradesCrossAggregator=createCrossSectionalAggregator("CrossSectionalDemo", <[avg(price), sum(qty), sum(price*qty)]>, trades, outputTable, sym, perBatch)

subscribeTable(,"trades","tradesCrossAggregator",-1,append!{tradesCrossAggregator},true)

def writeData(n){

timev = 2000.10.08T01:01:01.001 + timestamp(1..n)

symv = take(AB, n)

pricev = take(102.1 33.4 73.6 223,n)

qtyv = take(60 74 82 59, n)

insert into trades values(timev, symv, pricev,qtyv)

}

//写入三批数据,预期会触发三次计算,输入三次聚合后果。

writeData(4);

writeData(4);

writeData(4);

查看横截面数据表。

select * from tradesCrossAggregator

time sym price qty


2000.10.08T01:01:01.002 A 73.6 82

2000.10.08T01:01:01.003 B 33.4 59

查看输出表。插入了三批数据,因而输出表中有3条记录。

select * from outputTable

time avgPrice sumqty Total


2019.07.08T10:14:54.446 148.3 141 19192.2

2019.07.08T10:14:54.446 148.3 141 19192.2

2019.07.08T10:14:54.446 148.3 141 19192.2

  • interval

当触发计算的形式为interval时,须要指定triggeringInterval,示意每隔triggeringInterval毫秒触发一次计算。上面的例子中,分6次写入12条记录,每次距离500毫秒。设置横截面引擎每1000毫秒触发一次计算,预期最终输入3条记录。

share streamTable(10:0,timesympriceqty,[TIMESTAMP,SYMBOL,DOUBLE,INT]) as trades

outputTable = table(1:0, timeavgPricesumqtyTotal, [TIMESTAMP,DOUBLE,INT,DOUBLE])

tradesCrossAggregator=createCrossSectionalAggregator("CrossSectionalDemo", <[avg(price), sum(qty), sum(price*qty)]>, trades, outputTable, sym, interval,1000)

subscribeTable(,"trades","tradesCrossAggregator",-1,append!{tradesCrossAggregator},true)

def writeData(n){

timev = 2000.10.08T01:01:01.001 + timestamp(1..n)

symv = take(AB, n)

pricev = take(102.1 33.4 73.6 223,n)

qtyv = take(60 74 82 59, n)

insert into trades values(timev, symv, pricev,qtyv)

}

a = now()

writeData(2);

sleep(500)

writeData(2);

sleep(500)

writeData(2);

sleep(500)

writeData(2);

sleep(500)

writeData(2);

sleep(500)

writeData(2);

sleep(500)

b = now()

select count(*) from outputTable

3

如果再次执行select count(*) from outputTable,会发现随着工夫的推移,输出表的记录数会一直增长。这是因为在interval模式下,计算是依照事实工夫定时触发,并不依赖于是否有新的数据进来。

4. 横截面数据表的独立应用

从下面的例子中能够看出,横截面表尽管是为聚合计算提供的一个两头数据表,但其实在很多场合还是能独立发挥作用的。比方咱们须要定时刷新某只股票的最新交易价格,依照惯例思路是从实时交易表中按代码筛选股票并拿出最初一条记录,而交易表的数据量是随着工夫快速增长的,如果频繁做这样的查问,无论从零碎的资源耗费还是从查问的效力来看都不是很好的做法。而横截面表永远只保留所有股票的最近一次交易数据,数据量是稳固的,对于这种定时轮询的场景十分适合。

如果要独自应用横截面表,须要在创立横截面引擎时,把metrics,outputTable这两个参数设置为空。

tradesCrossAggregator=createCrossSectionalAggregator("CrossSectionalDemo", , trades,, sym, perRow)