关于物联网:干货丨时序数据库DolphinDB流数据聚合引擎教程

41次阅读

共计 11426 个字符,预计需要花费 29 分钟才能阅读完成。


流数据是指随工夫持续增长的动态数据。互联网的经营数据和物联网的传感器数据都属于流数据的领域。流数据的个性决定了它的数据集是动态变化的,传统的面向静态数据表的计算引擎无奈胜任流数据畛域的剖析和计算工作,所以流数据场景须要专门的计算引擎来解决。

DolphinDB 提供了灵便的面向流数据的聚合引擎,通过 createStreamAggregator 函数创立流数据聚合引擎,可能继续一直地对已有的流数据做聚合计算,并且将计算结果继续输入到指定数据表中。

1. 聚合引擎利用框架

流聚合引擎自身是一个独立的计算引擎,只有向聚合引擎写入数据就能够触发计算,并将计算结果输入到指标表。而在流数据场景下,聚合引擎与流数据订阅性能 (subscribeTable) 配合,能够不便的将流数据继续的提供给聚合引擎。示例如下:

tradesAggregator = createStreamAggregator(5, 5, <[sum(qty)]>, trades, outputTable, `time)
subscribeTable(, "trades", "tradesAggregator", 0, append!{tradesAggregator}, true)

通过_subscribeTable_函数订阅流数据表,每次有新数据进入就会按指定规定触发 append!{tradesAggregators},把流数据继续输出到聚合引擎。

聚合引擎次要波及到以下概念:

  • 流数据表:DolphinDB 为流数据提供了一种特定的表对象——streamTable,它提供流数据的公布性能,其余节点或 APP 能够通过_subscribeTable_函数订阅或生产流数据。
  • 聚合引擎数据源:_createStreamAggregator_返回一个形象表,往这个形象表写入数据,意味着数据进入聚合引擎进行计算。
  • 聚合表达式:以元数据的格局提供一组解决流数据的聚合函数,相似如下格局 <[sum(qty)]>,<[sum(qty),max(qty),avg(price)]>。聚合引擎反对应用零碎内所有的聚合函数,也反对应用表达式来满足更简单的场景,比方 <[avg(price1)-avg(price2)]>,<[std(price1-price2)]> 这样的组合表达式。
  • 数据窗口(windowSize):指定每次计算时截取的流数据窗口长度。
  • 计算周期(step): 指定进行计算的距离。

2. 数据窗口

每次对流数据进行聚合计算,必须截取一段数据。截取的数据称为数据窗口,其长度由参数 windowSize 决定。计算距离由参数 step 决定。

数据窗口长度和计算距离的单位都是由参数 useSystemTime 决定。流数据聚合计算场景有两种工夫概念,第一种是数据的生成工夫,通常以工夫戳的格局记录于数据中,它可能采纳天、分钟、秒、毫秒、纳秒等不同的精度;第二种是数据进入聚合引擎的工夫,咱们也称为零碎工夫,这个工夫是由聚合引擎给数据打上的工夫戳,取自聚合引擎所在服务器的零碎工夫,精度为毫秒。零碎通过参数 useSystemTime 来确定数据窗口长度和计算距离是以哪一个工夫的精度为单位,当 useSystemTime=true 时以零碎工夫精度为单位,否则以数据生成工夫精度为单位。

如果依据第一条数据进入零碎的工夫来结构数据窗口的边界,那么它个别会是不规整的工夫。如果有很多组数据,并且每组都依据各自第一条数据进入零碎的工夫来结构数据窗口的边界,那么无奈将各组在雷同的数据窗口中进行比照。因而,零碎会依据 step 的值对第一个数据窗口的边界值进行规整解决,并确定一个整型的规整尺度 alignmentSize。具体的规整公式与工夫精度、step 无关:

当数据的工夫精度为秒时,如 DATETIME、SECOND 类型,alignmentSize 的取值如下:

step     alignmentSize
0~2      2
3~5      5
6~10     10
11~15    15
16~20    20
21~30    30
31~60    60

当数据工夫精度为毫秒时,如 TIMESTAMP、TIME 类型,alignmentSize 的取值如下:

step     alignmentSize
0~2      2
3~5      5
6~10     10
11~20    20
21~25    25
26~50    50
51~100   100
101~200  200
201~250  250
251~500  500
501~1000 1000

假如第一条数据工夫的最小精度值为 firstDataTime,那么第一个数据窗口的左边界最小精度通过规整后为 firstDataTime/alignmentSizealignmentSize,其中 / 代表相除后取整。例如,第一条数据工夫为 2018.10.08T01:01:01.365,则 firstDataTime=365。若 step=100,依据上表,alignmentSize=100,可得出规整后的第一个数据窗口左边界最小精度为 365100100=300,因而规整后的第一个数据窗口的左边界为 2018.10.08T01:01:01.300。

上面咱们通过一个例子来具体阐明零碎是如何进行流数据计算的。输出流数据表蕴含 time 和 qty 两列,time 精度为毫秒,依据设定的窗口对流数据进行继续 sum(qty)计算。本示例的流数据表中应用的工夫精度为毫秒,为了不便察看,模仿输出的数据流频率也设为每毫秒一条数据的频率。以下代码建设流数据表 trades,设定聚合计算参数,并定义函数 writeData 向流数据表 trades 中写入模仿数据。

share streamTable(1000:0, `time`qty, [TIMESTAMP, INT]) as trades
outputTable = table(10000:0, `time`sumQty, [TIMESTAMP, INT])
tradesAggregator = createStreamAggregator(5, 5, <[sum(qty)]>, trades, outputTable, `time)
subscribeTable(, "trades", "tradesAggregator", 0, append!{tradesAggregator}, true)    

def writeData(n){timev = 2018.10.08T01:01:01.001 + timestamp(1..n)
    qtyv = take(1, n)
    insert into trades values(timev, qtyv)
} 

第一次操作:向流数据表 trades 中写入 5 条数据。

writeData(5)

查看流数据表:

select * from trades
time                     qty
2018.10.08T01:01:01.002  1
2018.10.08T01:01:01.003  1
2018.10.08T01:01:01.004  1
2018.10.08T01:01:01.005  1
2018.10.08T01:01:01.006  1

查看输出表:

select * from outputTable
time                     sumQty
2018.10.08T01:01:01.000  3

产生计算的工夫是 2018.10.08T01:01:01.000。能够看出,系统对首个数据的工夫 2018.10.08T01:01:01.002 做了规整操作。

第二次操作:清空数据表,设置 windowSize=6,step=3,模仿写入 10 条数据:

share streamTable(1000:0, `time`qty, [TIMESTAMP, INT]) as trades
outputTable = table(10000:0, `time`sumQty, [TIMESTAMP, INT])
tradesAggregator = createStreamAggregator(6, 3, <[sum(qty)]>, trades, outputTable, `time)
subscribeTable(, "trades", "tradesAggregator", 0, append!{tradesAggregator}, true)    

def writeData(n){timev = 2018.10.08T01:01:01.001 + timestamp(1..n)
    qtyv = take(1, n)
    insert into trades values(timev, qtyv)
}
writeData(10)

查看流数据表:

select * from trades
time                     qty
2018.10.08T01:01:01.002  1
2018.10.08T01:01:01.003  1
2018.10.08T01:01:01.004  1
2018.10.08T01:01:01.005  1
2018.10.08T01:01:01.006  1
2018.10.08T01:01:01.007  1
2018.10.08T01:01:01.008  1
2018.10.08T01:01:01.009  1
2018.10.08T01:01:01.010  1
2018.10.08T01:01:01.011  1

查看输出表:

select * from outputTable
time                     qty
2018.10.08T01:01:00.997  1
2018.10.08T01:01:01.000  4
2018.10.08T01:01:01.003  6

从这个后果也能够发现聚合引擎窗口计算的规定:窗口起始工夫是以第一条数据工夫规整后为准,窗口是以 windowSize 为大小,step 为步长挪动的。

上面依据三次计算的过程来解释聚合引擎是如何进行窗口数据的确定的。为不便浏览,对工夫的形容中省略雷同的 2018.10.08T01:01:01 局部,只列出毫秒局部。窗口的起始是第一个数据的工夫 002 为根底进行对齐,工夫对齐后为 000,所以第一次触发计算的工夫是 000,依据 windowSize=6,所以实践上窗口边界是从上一秒的 997 到 002,最终第一次计算窗口中只蕴含了 002 一条记录,计算 sum(qty)的后果是 1;而第二次计算产生在 000,依据 windowSize=6, 那么理论窗口大小是 6 毫秒(从 000 到 005),理论窗口中蕴含了从 002 到 005 四个数据,计算结果为 4;以此类推,第三次的计算窗口是从 003 到 008, 理论蕴含了 6 个数据,计算结果为 6。

3. 聚合表达式

在理论的利用中,通常要对流数据进行比较复杂的聚合计算,这对聚合引擎的表达式灵活性提出了较高的要求。DolphinDB 聚合引擎反对应用简单的表达式进行实时计算。

  • 纵向聚合计算(按工夫序列聚合):
tradesAggregator = createStreamAggregator(6, 3, <sum(ofr)>, trades, outputTable, `time)
  • 横向聚合计算(按维度聚合):
tradesAggregator = createStreamAggregator(6, 3, <max(ofr)-min(ofr)>, trades, outputTable, `time)

tradesAggregator = createStreamAggregator(6, 3, <max(ofr-bid)>, trades, outputTable, `time)
  • 输入多个聚合后果:
tradesAggregator = createStreamAggregator(6, 3, <[max((ofr-bid)/(ofr+bid)*2), min((ofr-bid)/(ofr+bid)*2)]>, trades, outputTable, `time)
  • 多参数聚合函数的调用:

有些聚合函数会应用多个参数,例如 corr,percentile 等。

tradesAggregator = createStreamAggregator(6, 3, <corr(ofr,bid)>, trades, outputTable, `time)

tradesAggregator = createStreamAggregator(6, 3, <percentile(ofr-bid,99)/sum(ofr)>, trades, outputTable, `time)
  • 调用自定义函数:
def spread(x,y){return abs(x-y)/(x+y)*2
}
tradesAggregator = createStreamAggregator(6, 3, <spread(ofr, bid)>, trades, outputTable, `time)

留神:DolphinDB 不反对聚合函数嵌套调用,比方若要在流数据引擎中计算 sum(spread(ofr,bid)),零碎会给出异样提醒:Nested aggregated function is not allowed

4. 流数据源

DolphinDB 的聚合引擎应用流数据表 (streamTable) 来作为输出数据源,流数据表提供流式数据的公布性能,通过_subscribeTable_函数能够订阅流数据并触发数据处理流程,而聚合引擎就是解决数据的形式之一。

streamTable 作为聚合引擎的数据源,它并不仅仅是简略的将原始数据灌入聚合引擎,通过 subscribeTable 函数,能够在数据进入聚合引擎之前对数据做初步荡涤,上面的例子展现如何对流数据做初步过滤。

传感器采集电压和电流数据并实时上传作为流数据源,然而其中电压 voltage<=0.02 或电流 electric==NULL 的数据须要在进入聚合引擎之前过滤掉。

share streamTable(1000:0, `time`voltage`electric, [TIMESTAMP, DOUBLE, INT]) as trades
outputTable = table(10000:0, `time`avgElectric, [TIMESTAMP, DOUBLE])
// 模仿产生传感器数据
def writeData(blockNumber){timev = 2018.10.08T01:01:01.001 + timestamp(1..blockNumber)
        vt = 1..blockNumber * 0.01
        bidv = take([1,NULL,2], blockNumber)
        insert into trades values(timev, vt, bidv);
}
// 自定义数据处理过程,msg 即实时流入的数据
def dataPreHandle(aggrTable, msg){
    // 过滤 voltage<=0.02 或 electric==NULL 的有效数据
    t = select * from msg where voltage >0.02,not electric == NULL
    if(size(t)>0){insert into aggrTable values(t.time,t.voltage,t.electric)        
    }
}
tradesAggregator = createStreamAggregator(6, 3, <[avg(electric)]>, trades, outputTable, `time , false, , 2000)
// 订阅数据源时应用自定义的数据处理函数
subscribeTable(, "trades", "tradesAggregator", 0, dataPreHandle{tradesAggregator}, true)

writeData(10)

从流数据源中能够看到有两个 voltage<=0.02 和三个 electric==NULL 的数据:

select * from trades
time                      voltage    electric
2018.10.08T01:01:01.002   0.01       1
2018.10.08T01:01:01.003   0.02
2018.10.08T01:01:01.004   0.03       2
2018.10.08T01:01:01.005   0.04       1
2018.10.08T01:01:01.006   0.05
2018.10.08T01:01:01.007   0.06       2
2018.10.08T01:01:01.008   0.07       1
2018.10.08T01:01:01.009   0.08
2018.10.08T01:01:01.010   0.09       2
2018.10.08T01:01:01.011   0.1        1

查看输出表:

select * from outputTable
time                      avgElectric
2018.10.08T01:01:01.000   1.5
2018.10.08T01:01:01.003   1.5

从后果能够看到,voltage<=0.02 或 electric==NULL 的数据曾经被过滤了,所以第一个计算窗口没有数据,所以也没有聚合后果。

5. 聚合引擎输入

聚合后果能够输入到新建或已存在的内存表,也能够输入到流数据表。内存表对数据操作上较为灵便,能够进行更新或删除操作;输入到流数据表的数据无奈再做变动,然而能够通过流数据表将聚合后果再次公布。上面的例子展现如何将聚合后果表作为另一个聚合引擎的数据源。

本例从一个初始的流数据表 trades 里,通过聚合引擎 tradesAggregator 进行挪动均值计算,并将后果输入到流数据表 aggrOutput,再通过订阅 aggrOutput 表并关联聚合引擎 SecondAggregator 对计算结果求挪动峰值。

share streamTable(1000:0, `time`voltage`electric, [TIMESTAMP, DOUBLE, INT]) as trades
// 将输出表定义为流数据表,能够再次订阅
outputTable = streamTable(10000:0, `time`avgElectric, [TIMESTAMP, DOUBLE])
share outputTable as aggrOutput 

def writeData(blockNumber){timev = 2018.10.08T01:01:01.001 + timestamp(1..blockNumber)
        vt = 1..blockNumber * 0.01
        bidv = take([1,2], blockNumber)
        insert into trades values(timev, vt, bidv);
}

tradesAggregator = createStreamAggregator(6, 3, <[avg(electric)]>, trades, outputTable, `time , false, , 2000)
subscribeTable(, "trades", "tradesAggregator", 0, append!{tradesAggregator}, true)

// 对聚合后果进行订阅做二次聚合计算
outputTable2 =table(10000:0, `time`maxAggrElec, [TIMESTAMP, DOUBLE])
SecondAggregator = createStreamAggregator(6, 3, <[max(avgElectric)]>, aggrOutput, outputTable2, `time , false, , 2000)
subscribeTable(, "aggrOutput", "SecondAggregator", 0, append!{SecondAggregator}, true)

writeData(10)

查看输出表:

select * from outputTable2
time                      maxAggrElec
2018.10.08T01:01:00.992   1
2018.10.08T01:01:00.995   1.5

6.createAggregator 函数介绍及语法

createStreamAggregator 函数关联了流数据聚合利用的 3 个次要信息:

  • 输出数据源

输出数据源是流数据表,通过订阅的过后把数据源和聚合引擎分割起来。

  • 聚合表达式

定义聚合计算的逻辑,反对简单表达式。聚合引擎依据聚合表达式对流数据表做计算,并将后果输入到目标表中。

  • 输出表

聚合后果能够输入到新建或已存在的内存表或流数据表中。内存表在数据操作上更加灵便,能够做更新删除操作,而输入到流数据表的数据无奈再做改变,然而通过流数据表将聚合后果再次公布,能够满足更多利用场景。

6.1 语法

createStreamAggregator(windowTime, rollingTime, aggregators, dummyTable, outputTable, timeColumn[,useSystemTime, keyColumn, garbageSize])

6.2 返回对象

返回一个形象的表对象,作为聚合引擎的入口,向这个表写入数据,意味着数据进入聚合引擎进行计算。

6.3 参数

  • useSystemTime:布尔值,示意聚合引擎的驱动形式。当它为 true 时,示意工夫驱动,即当达到预约的工夫点,聚合引擎就会激活并以设定的窗口截取流数据进行计算。在这种模式下,工夫的精度为毫秒,零碎会给每一条进来的数据增加毫秒精度的工夫戳作为数据窗口的根据。但它为 false 时,示意数据驱动,只有当数据进入零碎时,聚合引擎才会被激活,零碎会抉择数据的工夫字段 timeColumn 作为数据窗口的根据。它是可选参数,默认值为 false。
  • windowSize:正整数,示意数据窗口的大小。数据窗口只蕴含下边界不蕴含上边界。
  • step:正整数,示意聚合计算的频率,即触发计算的工夫距离。

windowSize 和 step 的单位雷同,它们都取决于 useSystemTime。当 useSystemTime=true,它们的单位是毫秒,当 useSystemTime=false,它们的单位与数据中的工夫字段 timeColumn 雷同。

为了便于对计算结果的察看和比照,零碎会对窗口的起始工夫对立对齐。具体规定请查看2. 数据窗口

  • aggregators:元数据,示意聚合函数。支持系统内所有的聚合函数,如 <sum(qty)>,<sum(qty),max(qty),avg(price)>,也反对对聚合后果应用表达式来满足更简单的场景,如 <[avg(price1)-avg(price2)]>,<[std(price1-price2)]>。

为了晋升流数据聚合的性能,DolphinDB 对局部聚合函数进行了优化,每次计算时,充分利用上一个窗口的计算结果,最大水平地升高了反复计算。

以下是通过优化聚合函数:

corr:相关性
covar:协方差
first:第一个元素
last:最初一个元素
max:最大值
med:中位数
min:最小值
percentile:百分位数
std:标准差
sum:求和
sum2:平方和
var:方差
wavg:加权均匀
wsum:加权和
  • dummyTable:表,提供一个样本表对象,不须要有数据,然而表构造必须与输出的流数据表雷同。
  • outputTable:聚合后果的输出表。输出表的第一列是工夫类型,用于寄存产生计算的工夫点,如果 keyColumn 不为空,则第二列为 keyColumn(分组列),从第三列开始,用于寄存聚合计算的构造。最终输出表的构造如下:
工夫列   分组列   聚合后果列 1    聚合后果列 2
...
  • timeColumn:输出流数据表中的工夫列。
  • keyColumn:聚合计算的分组列。按 keyColumn 分组,对输出流数据进行分组聚合计算。它是可选参数。
  • garbageSize:正整数。当内存中缓存的历史数据记录条数超过 garbageSize 时,零碎将清理缓存。

当流数据聚合引擎在运行时,每次计算都会须要载入新的窗口数据到内存中进行计算,随着计算过程的继续,内存中缓存的数据会越来越多,这时候须要有一个机制来清理不再须要的历史数据。当内存中保留的历史数据行数超过 garbageSize 设定值时会引发清理内存。

当须要分组计算时,每个分组的历史数据记录数是别离统计的,所以内存清理的动作也是各分组独立进行的。当每个组的历史数据记录数超出 garbageSize 时都会引发清理内存。

6.4 示例

6.4.1 dummyTable 示例

本例展现 dummyTable 的作用。减少一个构造齐全与 trades 雷同的 modelTable 对象,将 modelTable 作为 dummyTable 参数,而理论的数据依然写入 trades。

share streamTable(1000:0, `time`qty, [TIMESTAMP, INT]) as trades
modelTable = table(1000:0, `time`qty, [TIMESTAMP, INT])
outputTable = table(10000:0, `time`sumQty, [TIMESTAMP, INT])
tradesAggregator = createStreamAggregator(5, 5, <[sum(qty)]>, modelTable, outputTable, `time)
subscribeTable(, "trades", "tradesAggregator", 0, append!{tradesAggregator}, true)    

def writeData(n){timev = 2018.10.08T01:01:01.001 + timestamp(1..n)
    qtyv = take(1, n)
    insert into trades values(timev, qtyv)
}

writeData(6)

最初依然输入了后果,阐明聚合引擎的 dummyTable 参数只是一个样本表,它是否蕴含数据对后果并没有影响。

6.4.2 分组聚合示例

输出的流数据表减少了分组列 sym,在聚合计算时设定 keyColumn 为 sym。

share streamTable(1000:0, `time`sym`qty, [TIMESTAMP, SYMBOL, INT]) as trades
outputTable = table(10000:0, `time`sym`sumQty, [TIMESTAMP, SYMBOL, INT])
tradesAggregator = createStreamAggregator(3, 3, <[sum(qty)]>, trades, outputTable, `time, false,`sym, 50)
subscribeTable(, "trades", "tradesAggregator", 0, append!{tradesAggregator}, true)    

def writeData(n){timev = 2018.10.08T01:01:01.001 + timestamp(1..n)
    symv =take(`A`B, n)
    qtyv = take(1, n)
    insert into trades values(timev, symv, qtyv)
}

writeData(6)

为了察看不便,对执行后果的 sym 列排序输入:

select * from trades order by sym
time                      sym   qty
2018.10.08T01:01:01.002   A     1
2018.10.08T01:01:01.004   A     1
2018.10.08T01:01:01.006   A     1
2018.10.08T01:01:01.003   B     1
2018.10.08T01:01:01.005   B     1
2018.10.08T01:01:01.007   B     1

outputTable 的后果是依据 sym 列的内容进行的分组计算。

select * from outputTable 
time                      sym   qty
2018.10.08T01:01:01.000   A     1
2018.10.08T01:01:01.003   A     1
2018.10.08T01:01:01.003   B     2

各组工夫规整后对立从 000 工夫点开始,依据 windowSize=3, step=3, 每个组的窗口会依照 000-003-006 划分,计算触发在 000,003 两个工夫点。须要留神的是窗口内若没有任何数据,零碎不会计算也不会产生后果,所以 B 组第一个窗口没有后果输入。

7. 总结

DolphinDB database 提供的 streamAggregator 是一个轻量、使用方便的流数据聚合引擎,它通过与 streamTable 流数据表单干来实现流数据的实时计算工作。它可能反对纵向聚合和横向聚合以及组合计算,反对自定义函数计算,分组聚合,有效数据预荡涤,多级计算等性能,能满足流数据实时计算各方面需要。

正文完
 0