1. 概述
物联网设施(如机床、锅炉、电梯、水表、气表等等)无时无刻不在产生海量的设施状态数据和业务音讯数据,这些数据的在采集、计算、剖析过程中又经常波及异样数据的检测。
DolphinDB 作为一个高性能的分布式时序数据库 (time series database),内置了一个流数据框架,既能实时处理剖析这些物联网数据,也能对历史数据进行计算剖析,帮忙用户利用、施展这些数据的价值。DolphinDB 内置的流数据框架反对流数据的公布、订阅、预处理、实时内存计算、简单指标的滚动窗口计算等,是一个运行高效,应用便捷的流数据处理框架。具体介绍详见 DolphinDB 流数据教程。
针对异样数据检测的需要,DolphinDB 提供基于流数据框架的异样检测引擎函数,用户只需指定异样指标,异样检测引擎就能够实时地进行异样数据检测。
2. 异样检测引擎框架
DolphinDB 的异样检测引擎建设在流数据的公布 - 订阅模型之上。下例中,通过 createAnomalyDetectionEngine 函数创立异样检测引擎,并通过 subscribeTable 函数订阅流数据,每次有新数据流入就会按指定规定触发 append!{engine},将流数据继续输出异样检测引擎中。异样检测引擎实时检测数据是否合乎用户自定义的警报指标 temp>65,如发现异常数据,将它们输入到表 outputTable 中。
share streamTable(1000:0, `time`device`temp, [TIMESTAMP, SYMBOL, DOUBLE]) as sensor
share streamTable(1000:0, `time`device`anomalyType`anomalyString, [TIMESTAMP, SYMBOL, INT, SYMBOL]) as outputTable
engine = createAnomalyDetectionEngine("engine1", <[temp > 65]>, sensor, outputTable, `time, `device, 10, 1)
subscribeTable(, "sensor", "sensorAnomalyDetection", 0, append!{engine}, true)
这里对异样解决引擎波及到的一些概念做简要介绍:
- 流数据表:DolphinDB 为流式数据提供的一种特定的表对象,提供流式数据的公布性能。通过 subscribeTable 函数,其余的节点或利用能够订阅和生产流数据。
- 异样解决引擎数据源:为异样解决引擎提供 ” 原料 ” 的通道。createAnomalyDetectionEngine 函数返回一个形象表,向这个形象表写入数据,就意味着数据进入异样解决引擎进行计算。
- 异样指标:以元代码的格局提供一组解决流数据的布尔表达式。其中能够蕴含聚合函数,以反对简单的场景。
- 数据窗口:每次计算时截取的流数据窗口长度。数据窗口仅在指标中蕴含聚合函数时有意义。
- 输出表:异样检测引擎的输出表第一列必须是工夫类型,用于寄存检测到异样的工夫戳,如果有指定分组列,那么第二列为分组列,之后的两列别离为 int 类型和 string 或 symbol 类型,用于记录异样的类型(异样指标的表达式在 metrics 中的下标)和异样的内容。
3. 异样指标
异样检测引擎中的指标均要求返回布尔值。个别是一个函数或一个表达式。当指标中蕴含聚合函数,必须指定窗口长度和计算的工夫距离,异样检测引擎每隔一段时间,在固定长度的挪动窗口中计算指标。异样指标个别有以下三种类型:
- 只蕴含列名或非聚合函数,例如 qty > 10, lt(qty, prev(qty))。对于这类指标,异样检测引擎会对每一条收到的数据进行计算,判断是否合乎指标并决定是否输入。
- 所有呈现的列名都在聚合函数的参数中,例如 avg(qty – price) > 10, percentile(qty, 90) < 100, sum(qty) > prev(sum(qty))。对于这类指标,异样检测引擎只会在窗口产生挪动时对数据进行聚合计算,和工夫序列聚合引擎(Time Series Aggregator)相似。
- 呈现的列名中,既有作为聚合函数的参数,也有不是聚合函数参数,例如 avg(qty) > qty, le(med(qty), price)。对于这类指标,异样检测引擎会在在窗口产生挪动时对聚合列进行聚合计算,并在有数据达到时对每一条数据进行计算,其中聚合函数的返回值应用最近一个窗口的计算值。
4. 数据窗口
当异样指标中蕴含聚合函数时,用户必须指定数据窗口。流数据聚合计算是每隔一段时间,在固定长度的挪动窗口中进行。窗口长度由参数 windowSize 设定;计算的工夫距离由参数 step 设定。
在有多组数据的状况下,若每组都依据各自第一条数据进入零碎的工夫来结构数据窗口的边界,则个别无奈将各组的计算结果在雷同数据窗口中进行比照。思考到这一点,零碎依照参数 step 值确定一个整型的规整尺度 alignmentSize,以对各组第一个数据窗口的边界值进行规整解决。
(1)当数据工夫类型为 MONTH 时,会以第一条数据对应年份的 1 月作为窗口的上边界。
(2)当数据的工夫类型为 DATE 时,不对第一个数据窗口的边界值进行规整。
(2)当数据工夫精度为秒或分钟时,如 MINUTE, DATETIME 或 SECOND 类型,alignmentSize 取值规定如下表:
step alignmentSize
0~2 2
3~5 5
6~10 10
11~15 15
16~20 20
21~30 30
31~60 60
(2)当数据工夫精度为毫秒时,如 TIMESTAMP 或 TIME 类型,alignmentSize 取值规定如下表:
step alignmentSize
0~2 2
3~5 5
6~10 10
11~20 20
21~25 25
26~50 50
51~100 100
101~200 200
201~250 250
251~500 500
501~1000 1000
假如第一条数据工夫的最小精度值为 x,那么第一个数据窗口的左边界最小精度通过规整后为 x /alignmentSizealignmentSize,其中 / 代表相除后取整。举例来说,若第一条数据工夫为 2018.10.08T01:01:01.365,则 x =365。若 step=100,依据上表,alignmentSize=100,可得出规整后的第一个数据窗口左边界最小精度为 365100100=300,因而规整后的第一个数据窗口范畴为 2018.10.08T01:01:01.300 至 2018.10.08T01:01:01.400。
5. 利用示例
5.1 利用场景
现模仿传感器设施采集温度。假如窗口长度为 4ms,每隔 2ms 挪动一次窗口,每隔 1ms 采集一次温度,规定以下异样指标:
- 单次采集的温度超过 65;
- 单次采集的温度超过上一个窗口中 75% 的值;
- 窗口内平均温度和上一个窗口的平均温度相对误差大于 1%。
5.2 零碎设计
采集的数据寄存到流数据表中,异样检测引擎通过订阅流数据表来获取实时数据,并进行异样检测,合乎异样指标的数据输入到另外一个表中。
5.3 实现步骤
(1) 定义流数据表 sensor 来寄存采集的数据:
share streamTable(1000:0, `time`temp, [TIMESTAMP, DOUBLE]) as sensor
(2) 定义异样检测引擎和输出表 outputTable,输出表也是流数据表:
share streamTable(1000:0, `time`anomalyType`anomalyString, [TIMESTAMP, INT, SYMBOL]) as outputTable
engine = createAnomalyDetectionEngine("engine1", <[temp > 65, temp > percentile(temp, 75), abs((avg(temp) - prev(avg(temp))) / avg(temp)) > 0.01]>, sensor, outputTable, `time, , 6, 3)
(3) 异样检测引擎 engine 订阅流数据表 sensor:
subscribeTable(, "sensor", "sensorAnomalyDetection", 0, append!{engine}, true)
(4) 向流数据表 sensor 中写入 10 次数据模仿采集温度:
timev = 2018.10.08T01:01:01.001 + 1..10
tempv = 59 66 57 60 63 51 53 52 56 55
insert into sensor values(timev, tempv)
查看流数据表 sensor 的内容:
time temp
2018.10.08T01:01:01.002 59
2018.10.08T01:01:01.003 66
2018.10.08T01:01:01.004 57
2018.10.08T01:01:01.005 60
2018.10.08T01:01:01.006 63
2018.10.08T01:01:01.007 51
2018.10.08T01:01:01.008 53
2018.10.08T01:01:01.009 52
2018.10.08T01:01:01.010 56
2018.10.08T01:01:01.011 55
再查看后果表 outputTable:
time anomalyType anomalyString
2018.10.08T01:01:01.003 0 temp > 65
2018.10.08T01:01:01.003 1 temp > percentile(temp, 75)
2018.10.08T01:01:01.005 1 temp > percentile(temp, 75)
2018.10.08T01:01:01.006 2 abs((avg(temp) - prev(avg(temp))) / avg(temp)) > 0.01
2018.10.08T01:01:01.006 1 temp > percentile(temp, 75)
2018.10.08T01:01:01.009 2 abs((avg(temp) - prev(avg(temp))) / avg(temp)) > 0.01
上面具体解释异样检测引擎的计算过程。为不便浏览,对工夫的形容中省略雷同的 2018.10.08T01:01:01 局部,只列出毫秒局部。
(1)指标 temp > 65 只蕴含不作为函数参数的列 temp,因而会在每条数据达到时计算。模仿数据中只有 003 时的温度满足检测异样的指标。
(2)指标 temp > percentile(temp, 75) 中,temp 列既作为聚合函数 percentile 的参数,又独自呈现,因而会在每条数据达到时,将其中的 temp 与上一个窗口计算失去的 percentile(temp, 75) 比拟。第一个窗口基于第一行数据的工夫 002 进行对齐,对齐后窗口起始边界为 000,第一个窗口是从 000 到 002,只蕴含 002 一条记录,计算 percentile(temp, 75) 的后果是 59,数据 003 到 005 与这个值比拟,满足条件的有 003 和 005。第二个窗口是从 002 到 005,计算 percentile(temp, 75) 的后果是 60,数据 006 到 008 与这个值比拟,满足条件的有 006。第三个窗口是从 003 到 008,计算 percentile(temp, 75) 的后果是 63,数据 009 到 011 与这个值比拟,其中没有满足条件的行。最初一条数据 011 达到后,尚未触发新的窗口计算。
(3)指标 abs((avg(temp) – prev(avg(temp))) / avg(temp)) > 0.01 中,temp 只作为聚合函数 avg 的参数呈现,因而只会在每次窗口计算时查看。相似上一个指标的剖析,前三个窗口计算失去的 avg(temp) 别离为 59, 60.5, 58.33,满足 abs((avg(temp) – prev(avg(temp))) / avg(temp)) > 0.01 的工夫为第二个窗口和第三个窗口的计算工夫 006 和 009。
5.4 监控异样检测引擎的状态
getAggregatorStat().AnomalDetectionAggregator
name user status lastErrMsg numGroups numRows numMetrics metrics
------- ----- ------ ---------- --------- ------- ---------- --------------------
engine1 guest OK 0 10 3 temp > 65, temp > percentile(temp, 75), abs((avg(temp) - prev(avg(temp))) / avg(temp)) > 0.01
5.5 删除异常检测引擎
removeAggregator("engine1")
- createAnomalyEngine 函数介绍
语法
createAnomalyDetectionEngine(name, metrics, dummyTable, outputTable, timeColumn, [keyColumn], [windowSize], [step], [garbageSize])
返回对象
createAnomalyDetectionEngine 函数的作用是返回一个表对象,向该表写入数据意味着这些数据进入异样检测引擎进行计算。
参数
- name: 一个字符串,示意异样检测引擎的名称,是异样检测引擎的惟一标识。它能够蕴含字母,数字和下划线,但必须以字母结尾。
- metrics: 元代码。它的返回值必须是 bool 类型。它能够是函数或表达式,如 <[qty > 5, eq(qty, price)]>。能够在其中应用零碎内置或用户自定义的聚合函数(应用 defg 关键字定义),如 <[sum(qty) > 5, lt(avg(price), price)]>。详情可参考元编程。
- dummyTable: 表对象,它能够不蕴含数据,但它的构造必须与订阅的流数据表构造雷同。
- outputTable: 表对象,用于保留计算结果。它的第一列必须是工夫类型,用于寄存检测到异样的工夫戳,并且该列的数据类型要与 dummyTable 的工夫列统一。如果 keyColumn 参数不为空,那么 outputTable 的第二列为 keyColumn。之后的两列别离为 int 类型和 string/symbol 类型,用于记录异样的类型(在 metrics 中的下标)和异样的内容
- timeColumn: 字符串标量,示意输出流数据表的工夫列名称。
- keyColumn: 字符串标量,示意分组列。异样检测引擎会依照 keyColumn 对输出数据分组,并在每组中进行聚合计算。它是可选参数。
- windowSize: 正整数。当 metrics 中蕴含聚合函数时,windowSize 必须指定,示意用于聚合计算的数据窗口的长度。如果 metrics 中没有聚合函数,这个参数不起作用。
- step: 正整数。当 metrics 中蕴含聚合函数时,step 必须指定,示意计算的工夫距离。windowSize 必须是 step 的整数倍,否则会抛出异样。如果 metrics 中没有聚合函数,这个参数不起作用。
- garbageSize: 正整数。它是可选参数,默认值是 50,000。如果没有指定 keyColumn,当内存中历史数据的数量超过 garbageSize 时,零碎会清理本次计算不须要的历史数据。如果指定了 keyColumn,意味着须要分组计算时,内存清理是各分组独立进行的。当一个组的历史数据记录数超出 garbageSize 时,会清理该组不再须要的历史数据。若一个组的历史数据记录数未超出 garbageSize,则该组数据不会被清理。如果 metrics 中没有聚合函数,这个参数不起作用。
- 总结
DolphinDB 提供的异样检测引擎是一个轻量、使用方便的流数据引擎,它通过与流数据表单干来实现流数据的实时检测工作,可能满足物联网实时监控和预警的需要。