1. 概述

物联网设施(如机床、锅炉、电梯、水表、气表等等)无时无刻不在产生海量的设施状态数据和业务音讯数据,这些数据的在采集、计算、剖析过程中又经常波及异样数据的检测。

DolphinDB作为一个高性能的分布式时序数据库 (time series database),内置了一个流数据框架,既能实时处理剖析这些物联网数据,也能对历史数据进行计算剖析,帮忙用户利用、施展这些数据的价值。DolphinDB内置的流数据框架反对流数据的公布、订阅、预处理、实时内存计算、简单指标的滚动窗口计算等,是一个运行高效,应用便捷的流数据处理框架。具体介绍详见DolphinDB流数据教程。

针对异样数据检测的需要,DolphinDB提供基于流数据框架的异样检测引擎函数,用户只需指定异样指标,异样检测引擎就能够实时地进行异样数据检测。

2. 异样检测引擎框架

DolphinDB的异样检测引擎建设在流数据的公布-订阅模型之上。下例中,通过createAnomalyDetectionEngine函数创立异样检测引擎,并通过subscribeTable函数订阅流数据,每次有新数据流入就会按指定规定触发append!{engine},将流数据继续输出异样检测引擎中。异样检测引擎实时检测数据是否合乎用户自定义的警报指标temp>65,如发现异常数据,将它们输入到表outputTable中。

share streamTable(1000:0, `time`device`temp, [TIMESTAMP, SYMBOL, DOUBLE]) as sensorshare streamTable(1000:0, `time`device`anomalyType`anomalyString, [TIMESTAMP, SYMBOL, INT, SYMBOL]) as outputTableengine = createAnomalyDetectionEngine("engine1", <[temp > 65]>, sensor, outputTable, `time, `device, 10, 1)subscribeTable(, "sensor", "sensorAnomalyDetection", 0, append!{engine}, true)

这里对异样解决引擎波及到的一些概念做简要介绍:

  • 流数据表:DolphinDB为流式数据提供的一种特定的表对象,提供流式数据的公布性能。通过subscribeTable函数,其余的节点或利用能够订阅和生产流数据。
  • 异样解决引擎数据源:为异样解决引擎提供"原料"的通道。createAnomalyDetectionEngine函数返回一个形象表,向这个形象表写入数据,就意味着数据进入异样解决引擎进行计算。
  • 异样指标:以元代码的格局提供一组解决流数据的布尔表达式。其中能够蕴含聚合函数,以反对简单的场景。
  • 数据窗口:每次计算时截取的流数据窗口长度。数据窗口仅在指标中蕴含聚合函数时有意义。
  • 输出表:异样检测引擎的输出表第一列必须是工夫类型,用于寄存检测到异样的工夫戳,如果有指定分组列,那么第二列为分组列,之后的两列别离为int类型和string或symbol类型,用于记录异样的类型(异样指标的表达式在metrics中的下标)和异样的内容。

3. 异样指标

异样检测引擎中的指标均要求返回布尔值。个别是一个函数或一个表达式。当指标中蕴含聚合函数,必须指定窗口长度和计算的工夫距离,异样检测引擎每隔一段时间,在固定长度的挪动窗口中计算指标。异样指标个别有以下三种类型:

  • 只蕴含列名或非聚合函数,例如qty > 10, lt(qty, prev(qty))。对于这类指标,异样检测引擎会对每一条收到的数据进行计算,判断是否合乎指标并决定是否输入。
  • 所有呈现的列名都在聚合函数的参数中,例如avg(qty - price) > 10, percentile(qty, 90) < 100, sum(qty) > prev(sum(qty))。对于这类指标,异样检测引擎只会在窗口产生挪动时对数据进行聚合计算,和工夫序列聚合引擎(Time Series Aggregator)相似。
  • 呈现的列名中,既有作为聚合函数的参数,也有不是聚合函数参数,例如avg(qty) > qty, le(med(qty), price)。对于这类指标,异样检测引擎会在在窗口产生挪动时对聚合列进行聚合计算,并在有数据达到时对每一条数据进行计算,其中聚合函数的返回值应用最近一个窗口的计算值。

4. 数据窗口

当异样指标中蕴含聚合函数时,用户必须指定数据窗口。流数据聚合计算是每隔一段时间,在固定长度的挪动窗口中进行。窗口长度由参数windowSize设定;计算的工夫距离由参数step设定。

在有多组数据的状况下,若每组都依据各自第一条数据进入零碎的工夫来结构数据窗口的边界,则个别无奈将各组的计算结果在雷同数据窗口中进行比照。思考到这一点,零碎依照参数step值确定一个整型的规整尺度alignmentSize,以对各组第一个数据窗口的边界值进行规整解决。

(1)当数据工夫类型为MONTH时,会以第一条数据对应年份的1月作为窗口的上边界。

(2)当数据的工夫类型为DATE时,不对第一个数据窗口的边界值进行规整。

(2)当数据工夫精度为秒或分钟时,如MINUTE, DATETIME或SECOND类型,alignmentSize取值规定如下表:

step     alignmentSize0~2      23~5      56~10     1011~15    1516~20    2021~30    3031~60    60

(2)当数据工夫精度为毫秒时,如TIMESTAMP或TIME类型,alignmentSize取值规定如下表:

step       alignmentSize0~2        23~5        56~10       1011~20      2021~25      2526~50      5051~100     100101~200    200201~250    250251~500    500501~1000   1000

假如第一条数据工夫的最小精度值为x,那么第一个数据窗口的左边界最小精度通过规整后为x/alignmentSizealignmentSize,其中/代表相除后取整。举例来说,若第一条数据工夫为 2018.10.08T01:01:01.365,则x=365。若step=100,依据上表,alignmentSize=100,可得出规整后的第一个数据窗口左边界最小精度为365100100=300,因而规整后的第一个数据窗口范畴为2018.10.08T01:01:01.300至 2018.10.08T01:01:01.400。

5. 利用示例

5.1 利用场景

现模仿传感器设施采集温度。假如窗口长度为4ms,每隔2ms挪动一次窗口,每隔1ms采集一次温度,规定以下异样指标:

  • 单次采集的温度超过65;
  • 单次采集的温度超过上一个窗口中75%的值;
  • 窗口内平均温度和上一个窗口的平均温度相对误差大于1%。

5.2 零碎设计

采集的数据寄存到流数据表中,异样检测引擎通过订阅流数据表来获取实时数据,并进行异样检测,合乎异样指标的数据输入到另外一个表中。

5.3 实现步骤

(1) 定义流数据表sensor来寄存采集的数据:

share streamTable(1000:0, `time`temp, [TIMESTAMP, DOUBLE]) as sensor

(2) 定义异样检测引擎和输出表outputTable,输出表也是流数据表:

share streamTable(1000:0, `time`anomalyType`anomalyString, [TIMESTAMP, INT, SYMBOL]) as outputTableengine = createAnomalyDetectionEngine("engine1", <[temp > 65, temp > percentile(temp, 75), abs((avg(temp) - prev(avg(temp))) / avg(temp)) > 0.01]>, sensor, outputTable, `time, , 6, 3)

(3) 异样检测引擎engine订阅流数据表sensor:

subscribeTable(, "sensor", "sensorAnomalyDetection", 0, append!{engine}, true)

(4) 向流数据表sensor中写入10次数据模仿采集温度:

timev = 2018.10.08T01:01:01.001 + 1..10tempv = 59 66 57 60 63 51 53 52 56 55insert into sensor values(timev, tempv)

查看流数据表sensor的内容:

time                       temp2018.10.08T01:01:01.002    592018.10.08T01:01:01.003    662018.10.08T01:01:01.004    572018.10.08T01:01:01.005    602018.10.08T01:01:01.006    632018.10.08T01:01:01.007    512018.10.08T01:01:01.008    532018.10.08T01:01:01.009    522018.10.08T01:01:01.010    562018.10.08T01:01:01.011    55

再查看后果表outputTable:

time                      anomalyType    anomalyString2018.10.08T01:01:01.003    0             temp > 652018.10.08T01:01:01.003    1             temp > percentile(temp, 75)2018.10.08T01:01:01.005    1             temp > percentile(temp, 75)2018.10.08T01:01:01.006    2             abs((avg(temp) - prev(avg(temp))) / avg(temp)) > 0.012018.10.08T01:01:01.006    1             temp > percentile(temp, 75)2018.10.08T01:01:01.009    2             abs((avg(temp) - prev(avg(temp))) / avg(temp)) > 0.01

上面具体解释异样检测引擎的计算过程。为不便浏览,对工夫的形容中省略雷同的2018.10.08T01:01:01局部,只列出毫秒局部。

(1)指标temp > 65只蕴含不作为函数参数的列temp,因而会在每条数据达到时计算。模仿数据中只有003时的温度满足检测异样的指标。

(2)指标temp > percentile(temp, 75)中,temp列既作为聚合函数percentile的参数,又独自呈现,因而会在每条数据达到时,将其中的temp与上一个窗口计算失去的percentile(temp, 75)比拟。第一个窗口基于第一行数据的工夫002进行对齐,对齐后窗口起始边界为000,第一个窗口是从000到002,只蕴含002一条记录,计算percentile(temp, 75)的后果是59,数据003到005与这个值比拟,满足条件的有003和005。第二个窗口是从002到005,计算percentile(temp, 75)的后果是60,数据006到008与这个值比拟,满足条件的有006。第三个窗口是从003到008,计算percentile(temp, 75)的后果是63,数据009到011与这个值比拟,其中没有满足条件的行。最初一条数据011达到后,尚未触发新的窗口计算。

(3)指标abs((avg(temp) - prev(avg(temp))) / avg(temp)) > 0.01中,temp只作为聚合函数avg的参数呈现,因而只会在每次窗口计算时查看。相似上一个指标的剖析,前三个窗口计算失去的avg(temp)别离为59, 60.5, 58.33,满足abs((avg(temp) - prev(avg(temp))) / avg(temp)) > 0.01的工夫为第二个窗口和第三个窗口的计算工夫006和009。

5.4 监控异样检测引擎的状态

getAggregatorStat().AnomalDetectionAggregatorname    user  status lastErrMsg numGroups numRows numMetrics metrics             ------- ----- ------ ---------- --------- ------- ---------- --------------------engine1 guest OK                0         10      3          temp > 65, temp > percentile(temp, 75), abs((avg(temp) - prev(avg(temp))) / avg(temp)) > 0.01

5.5 删除异常检测引擎

removeAggregator("engine1")
  1. createAnomalyEngine函数介绍

语法

createAnomalyDetectionEngine(name, metrics, dummyTable, outputTable, timeColumn, [keyColumn], [windowSize], [step], [garbageSize]) 

返回对象

createAnomalyDetectionEngine函数的作用是返回一个表对象,向该表写入数据意味着这些数据进入异样检测引擎进行计算。

参数

  • name: 一个字符串,示意异样检测引擎的名称,是异样检测引擎的惟一标识。它能够蕴含字母,数字和下划线,但必须以字母结尾。
  • metrics: 元代码。它的返回值必须是bool类型。它能够是函数或表达式,如<[qty > 5, eq(qty, price)]>。能够在其中应用零碎内置或用户自定义的聚合函数(应用defg关键字定义),如<[sum(qty) > 5, lt(avg(price), price)]>。详情可参考元编程。
  • dummyTable: 表对象,它能够不蕴含数据,但它的构造必须与订阅的流数据表构造雷同。
  • outputTable: 表对象,用于保留计算结果。它的第一列必须是工夫类型,用于寄存检测到异样的工夫戳,并且该列的数据类型要与dummyTable的工夫列统一。如果keyColumn参数不为空,那么outputTable的第二列为keyColumn。之后的两列别离为int类型和string/symbol类型,用于记录异样的类型(在metrics中的下标)和异样的内容
  • timeColumn: 字符串标量,示意输出流数据表的工夫列名称。
  • keyColumn: 字符串标量,示意分组列。异样检测引擎会依照keyColumn对输出数据分组,并在每组中进行聚合计算。它是可选参数。
  • windowSize: 正整数。当metrics中蕴含聚合函数时,windowSize必须指定,示意用于聚合计算的数据窗口的长度。如果metrics中没有聚合函数,这个参数不起作用。
  • step: 正整数。当metrics中蕴含聚合函数时,step必须指定,示意计算的工夫距离。windowSize必须是step的整数倍,否则会抛出异样。如果metrics中没有聚合函数,这个参数不起作用。
  • garbageSize: 正整数。它是可选参数,默认值是50,000。如果没有指定keyColumn,当内存中历史数据的数量超过garbageSize时,零碎会清理本次计算不须要的历史数据。如果指定了keyColumn,意味着须要分组计算时,内存清理是各分组独立进行的。当一个组的历史数据记录数超出garbageSize时,会清理该组不再须要的历史数据。若一个组的历史数据记录数未超出garbageSize,则该组数据不会被清理。如果metrics中没有聚合函数,这个参数不起作用。
  1. 总结

DolphinDB提供的异样检测引擎是一个轻量、使用方便的流数据引擎,它通过与流数据表单干来实现流数据的实时检测工作,可能满足物联网实时监控和预警的需要。