乐趣区

关于dolphindb:时序数据库在工业物联网中的应用

  1. 工业物联网的数据特点和痛点

工业物联网的数据采集有着频率高、设施多、维度高的特点,数据量十分大,对系统的吞吐量有很高的要求。同时工业物联网往往须要零碎可能实时处理数据,对系统预警,监控,甚至反控。不少零碎还须要提供图形化终端供操作工人实时监控设施的运行,这给整个零碎带来了更大的压力。对于采集到的海量历史数据,通常还须要进行离线的建模和剖析。因而,工业物联网的数据平台有着十分刻薄的要求,既要有十分高的吞吐量,又要有较低的延时;既要可能实时处理流数据,又要可能解决海量的历史数据;既要满足简略的点查问的要求,又要满足批量数据简单剖析的要求。

传统的事务型数据库,比方 SQL Server、Oracle 和 MySQL,无奈满足高吞吐量的数据写入和海量数据的剖析。即便数据量较小,能满足数据写入的要求,也不能同时响应实时计算的申请。

Hadoop 生态提供了音讯引擎、实时数据写入、流数据计算、离线数据仓库、离线数据计算等多个部件。这些大数据系统组合起来,能够解决工业物联网的数据平台问题。但这样的计划过于宏大和臃肿,施行和运维的老本很高。

2. 时序数据库的工业物联网解决方案

以 DolphinDB 为例,DolphinDB database 作为一个高性能的分布式时序数据库,为工业物联网的数据存储和计算提供了一个弱小的根底平台。

  • DolphinDB 的分布式数据库能够不便的反对程度扩大和垂直扩大,零碎的吞吐量和反对的数据量能够近乎有限的扩大。
  • DolphinDB 的流计算引擎反对实时流计算解决。内置的聚合引擎能够按指定的工夫窗口大小和频率来计算各种聚合指标。聚合既能够是时间轴上(从高频到低频)的纵向聚合,也能够是多个维度的横向聚合。
  • DolphinDB 的内存数据库能够反对数据的疾速写入,查问和计算。例如聚合引擎的后果能够输入到一个内存表,承受前端 BI(如 Grafana)的的秒级轮询指令。
  • DolphinDB 集数据库、分布式计算和编程语言于一体,能够在库内疾速的实现简单的分布式计算,例如回归和分类。这大大放慢了海量历史数据的离线剖析和建模。
  • DolphinDB 也实现了与局部开源或商业化的 BI 工具的接口。不便用户可视化或监控设施数据。

3. 案例综述

企业的生产车间内总共有 1000 个传感设施,每个设施每 10ms 采集一次数据,为简化 demo 脚本,假如采集的数据仅有三个维度,均为温度。须要实现的工作包含:

  • 将采集到的原始数据存入数据库。离线的数据建模须要用到这些历史数据。
  • 实时计算每个设施过来一分钟的平均温度指标,计算的频率为每两秒钟要进行一次。
  • 因为设施的操作工须要在最快的工夫内把握温度变动,所以前端展现界面每秒查问一次实时运算的后果并刷新温度变化趋势图。

4. 案例施行

4.1 零碎的功能模块设计

针对上述的案例,咱们首先要启用 DolphinDB 的分布式数据库,创立一个命名为 iotDemoDB 的分布式数据库用于保留采集的实时数据。数据库按日期和设施两个维度进行数据分区。日期采纳值分区,设施采纳范畴分区。日后清理过期数据,只有简略的删除旧的日期分区就可实现。

启用流数据公布和订阅性能。订阅高频数据流做实时计算。createStreamingAggregator 函数能创立一个指标聚合引擎,用于实时计算。咱们在案例里指定计算窗口大小是 1 分钟,每 2 秒钟运算一次过往 1 分钟的温度均值,而后将运算后果保留到低频数据表中,供前端轮询。

部署前端 Grafana 平台展现运算后果的趋势图,设置每 1 秒钟轮询一次 DolphinDB Server,并刷新展现界面。

4.2 服务器部署

在本次 demo 里,为了应用分布式数据库,咱们须要应用一个单机多节点集群,能够参考单机多节点集群部署指南。这里咱们配置了 1 个 controller+ 1 个 agent+ 4 个 datanode 的集群,上面列出次要的配置文件内容供参考:

cluster.nodes:

localSite,mode
localhost:8701:agent1,agent
localhost:8081:node1,datanode
localhost:8083:node2,datanode
localhost:8082:node3,datanode
localhost:8084:node4,datanode

因为 DolphinDB 零碎默认是不启用 Streaming 模块性能的,所以咱们须要通过在 cluster.cfg 里做显式配置来启用它,因为本次 demo 里应用的数据量不大,为了防止 demo 复杂化,所以这里只启用了 node1 来做数据订阅。

cluster.cfg:

maxMemSize=2
workerNum=4
persistenceDir=dbcache
maxSubConnections=4
node1.subPort=8085
maxPubConnections=4

理论生产环境下,倡议应用多物理机集群,能够参考多物理机集群部署指南。

4.3 实现步骤

首先咱们定义一个 sensorTemp 流数据表用于接管实时采集的温度数据,咱们应用 enableTablePersistence 函数对 sensorTemp 表做长久化,内存中保留的最大数据量是 100 万行。

share streamTable(1000000:0,hardwareIdtstemp1temp2`temp3,[INT,TIMESTAMP,DOUBLE,DOUBLE,DOUBLE]) as sensorTemp
enableTablePersistence(sensorTemp, true, false, 1000000)

通过订阅流数据表 sensorTmp,把采集的数据准实时的批量保留到分布式数据库中。分布式表应用日期和设施编号两个分区维度。在物联网大数据场景下,常常要革除过期的数据,这样分区的模式能够简略的通过删除指定日期分区就能够疾速的清理过期数据。subscribeTable 函数最初两个参数控制数据保留的频率,只有订阅数据达到 100 万或工夫距离达到 10 秒才批量将数据写入分布式数据库。

db1 = database(“”,VALUE,2018.08.14..2018.12.20)
db2 = database(“”,RANGE,0..10*100)
db = database(“dfs://iotDemoDB”,COMPO,[db1,db2])
dfsTable = db.createPartitionedTable(sensorTemp,”sensorTemp”,tshardwareId)
subscribeTable(, “sensorTemp”, “save_to_db”, -1, append!{dfsTable}, true, 1000000, 10)

在对流数据做分布式保留数据库的同时,零碎应用 createStreamAggregator 函数创立一个指标聚合引擎,用于实时计算。函数第一个参数指定了窗口大小为 60 秒,第二个参数指定每 2 秒钟做一次求均值运算,第三个参数是运算的元代码,能够由用户本人指定计算函数,任何零碎反对的或用户自定义的聚合函数这里都能反对,通过指定分组字段 hardwareId,函数会将流数据按设施分成 1000 个队列进行均值运算,每个设施都会按各自的窗口计算失去对应的平均温度。最初通过 subscribeTable 订阅流数据,在有新数据进来时触发实时计算,并将运算后果保留到一个新的数据流表 sensorTempAvg 中。

createStreamAggregator 参数阐明:窗口工夫,运算间隔时间,聚合运算元代码,原始数据输出表,运算后果输出表,时序字段,分组字段,触发 GC 记录数阈值。

share streamTable(1000000:0, timehardwareIdtempavg1tempavg2`tempavg3, [TIMESTAMP,INT,DOUBLE,DOUBLE,DOUBLE]) as sensorTempAvg
metrics = createStreamAggregator(60000,2000,<[avg(temp1),avg(temp2),avg(temp3)]>,sensorTemp,sensorTempAvg,ts,hardwareId,2000)
subscribeTable(, “sensorTemp”, “metric_engine”, -1, append!{metrics},true)

在 DolphinDB Server 端在对高频数据流做保留、剖析的时候,Grafana 前端程序每秒钟会轮询实时运算的后果,并刷新平均温度的趋势图。DolphinDB 提供了 Grafana_DolphinDB 的 datasource 插件,对于 Grafana 的装置以及 DolphinDB 的插件配置请参考 Grafana 配置教程。

在实现 grafana 的根本配置之后,新增一个 Graph Panel, 在 Metrics tab 里输出:

select gmtime(time) as time, tempavg1, tempavg2, tempavg3 from sensorTempAvg where hardwareId = 1

这段脚本是选出 1 号设施实时运算失去的均匀温度表。

最初,启动数据模仿生成程序,生成模仿温度数据并写入流数据表。

数据规模: 1000 个设施,以每个点 3 个维度、10ms 的频率生成数据,以每个维度 8 个 Byte (Double 类型) 计算,数据流速是 24Mbps,继续 100 秒。

def writeData(){
hardwareNumber = 1000
for (i in 0:10000) {
data = table(take(1..hardwareNumber,hardwareNumber) as hardwareId ,take(now(),hardwareNumber) as ts,rand(20..41,hardwareNumber) as temp1,rand(30..71,hardwareNumber) as temp2,rand(70..151,hardwareNumber) as temp3)
sensorTemp.append!(data)
sleep(10)
}
}
submitJob(“simulateData”, “simulate sensor data”, writeData)

点击这里下载残缺的 demo 脚本。

退出移动版