前言
咱们能够将设施上行数据存储到关系型数据库中,咱们须要两张带有工夫戳的表(最新数据表 和 历史数据表),历史数据表存储所有设施上报的数据,最新数据表须要存储设备最新一条上报数据,这条最新数据相当于设施的以后状态。而后展现的时候只展现最新一条数据的状态,报表查问能够依照设施id和工夫从历史数据表查问汇总。
这样是能够的,然而咱们的最新数据表须要被频繁的更新,数据量少的时候没问题。但数据量大,并发高的时候就会呈现问题。
1、存储老本:数据不会被压缩,导致占用存储资源。
2、保护老本:单表数据量太大时,须要人工分库分表。
3、写入性能:单机写入吞吐量难以满足大量上行数据的写入需要,数据库存在性能瓶颈。
4、查问性能:数据量太大导致查问性能受到影响。
剖析
咱们能够采纳时序库来解决上述问题,首先来理解一下什么是时序数据。时序数据是依照工夫维度进行索引的数据,它记录了某个被测量实体在肯定工夫范畴内,每个工夫点上的一组测试值。传感器上传的室内PM2.5和甲醛数据、净水器传感器以后的TDS值、计算机系统的监控数据等,都属于时序数据,时序数据有如下特点:
1、数据量较大,写入操作是继续且安稳的,而且写多读少。
2、只有写入操作,简直没有更新操作,比方去批改传感器的历史数据,是没有意义的。
3、没有随机删除,即便删除也是依照工夫范畴进行删除。删除某一个工夫点的数据没有意义,然而删除2年前的数据是有意义的。
4、数据实时性和时效性强,数据随着工夫的推移一直追加,旧数据很快失去意义。
5、大部分以工夫和实体为维度进行查问,很少以测试值为维度查问,比方用户会查问某个时间段的温度数据,然而很少会去查问温度高于多少度的数据记录。
显然IoT的业务是合乎应用时序库的场景的。
序数据库就是用来存储时序数据的数据库,时序数据库相较于传统的关系型数据和非关系型数据库而言,专门优化了对时序数据的存储,开源的时序数据库有InfluxDB OpenTSDB、TimeScaleDB 等。本文以InfluxDB数据库进行演示。
时序数据库有如下几个概念。
1.Metric:度量,相当于关系型数据库中的表(table)。
2.Data Point:数据点,相当于关系型数据库的中的行(row)。
3.Timestamp:工夫戳,数据点生成时的工夫戳。
4.Field:测量值,比方温度和湿度、PM2.5等。
5.Tag:标签,用于标识数据点,通常用来标识数据点的起源,比方温度和湿度数据来自哪个房间,哪个设施,能够当作关系型数据库表的主键。
如下图,度量为 Wind,每一个数据点都具备一个 timestamp,两个 field:direction 和 speed,两个 tag:sensor、city。它的第一行和第三行,寄存的都是 sensor 号码为 95D8-7913 的设施,属性城市是上海。随着工夫的变动,风向和风速都产生了扭转,风向从 23.4 变成 23.2;而风速从 3.4 变成了 3.3。
图片来自网络
施行步骤
时序库的装置
装置参考官网文档,为了不便,我这里采纳docker装置
docker run --name influxdb -p 8086:8086 influxdb:2.7.0
https://docs.influxdata.com/influxdb/v2.7/install/
咱们关上 服务器ip:8086 能够看到它自带的治理界面,咱们首先创立用户名明码,组织、以及Bucket的名称。
这里的bucket "IoTDemos" 相当于数据库的名称
咱们记录一下这个Token,一会连贯influxdb须要,相当于账号密码
解决playload没有工夫戳问题
对于时序库来讲,工夫戳是十分重要的,然而咱们拿到的playload并没有工夫戳(MQTTNet包我没有找到拿工夫戳的办法)。
所以咱们须要在mqtt上想方法,让设施上报数据的时候,mqtt主动增加工夫戳到playload中。
1、咱们在数据集成->规定中新建一条规定名称为"Add_Ts"。SQL编写如下
SELECT *, now_timestamp('millisecond') as payload.TsFROM "topic/#"
topic/# 代表音讯公布到"topic/#"主题的事件
now_timestamp函数返回以后工夫的 Unix 工夫戳,咱们将工夫戳写入到payload的Ts属性中,对于更多内置SQL函数,请参考官网文档
https://www.emqx.io/docs/zh/v5.0/data-integration/rule-sql-bu...
2、咱们关上上面的调试,模仿设施上报一条数据,能够看到这条规定帮咱们退出了工夫戳。
3、而后咱们还须要解决增加了工夫戳的处理结果,咱们在右侧增加一个动作,抉择音讯重公布,将刚刚增加了工夫戳的音讯重发到一个新的Topic上,咱们应用topic/dp,并在playload中增加${payload},这样咱们就批改了playload中的信息,增加了咱们须要的工夫戳,当然,咱们Hub订阅的音讯也须要对应批改,增加/dp后缀。
[](https://img2023.cnblogs.com/blog/2849835/202305/2849835-20230...)
4、首先咱们先批改MASA.IoT.Hub的配置文件,Topic增加"/dp"后缀
"MqttSetting": {... "Topic": "$share/IotHub/topic/+/dp" },
5、CallbackAsync中,因为咱们设施名称是从Topic截取的,也要对应批改一下。
private async Task CallbackAsync(MqttApplicationMessageReceivedEventArgs e) { var deviceDataPointStr = System.Text.Encoding.Default.GetString(e.ApplicationMessage.PayloadSegment); Console.WriteLine(deviceDataPointStr); var pubSubOptions = new PubSubOptions { //批改一下获取设施名称的形式 DeviceName = e.ApplicationMessage.Topic[6..^3], Msg = deviceDataPointStr, PubTime = new DateTimeOffset(DateTime.Now).ToUnixTimeMilliseconds(), TrackId = Guid.NewGuid() }; ... }
代码编写
解决完工夫戳的问题,咱们就能够编写代码向InfluxDB中写入数据了,咱们首先在Infrastructure文件夹下创立ITimeSeriesDbClient接口和TimeSeriesDbClient类,应用接口也不便咱们日后更换其余的时序库。
这里应用了InfluxDB.Client包。
ITimeSeriesDbClient.cs
namespace MASA.IoT.Core.Infrastructure{ public interface ITimeSeriesDbClient { bool WriteMeasurement<T>(T measurement); }}
TimeSeriesDbClient.cs
using InfluxDB.Client;using InfluxDB.Client.Api.Domain;using MASA.IoT.WebApi;using Microsoft.Extensions.Options;namespace MASA.IoT.Core.Infrastructure{ public class TimeSeriesDbClient : ITimeSeriesDbClient { private readonly InfluxDBClient _client; private readonly string _bucket; private readonly string _org; private readonly AppSettings _appSettings; public TimeSeriesDbClient(IOptions<AppSettings> settings) { _appSettings = settings.Value; _org = _appSettings.InfluxDBSetting.Org; _bucket = _appSettings.InfluxDBSetting.Bucket; _client = new InfluxDBClient(_appSettings.InfluxDBSetting.Url, _appSettings.InfluxDBSetting.Token); } public bool WriteMeasurement<T>(T measurement) { try { using var writeApi = _client.GetWriteApi(); writeApi.WriteMeasurement<T>(measurement, WritePrecision.Ms, _bucket, _org); return true; } catch (Exception ex) { Console.WriteLine(ex.Message); return false; } } }}
这里应用new InfluxDBClient(_appSettings.InfluxDBSetting.Url, _appSettings.InfluxDBSetting.Token)来结构InfluxDBClient。
Token就是咱们创立Bucket过程中保留的Token。
Url是咱们InfluxDB的拜访地址:http://127.0.0.1:8086
写入的办法WriteMeasurement中咱们通过_client.GetWriteApi创立一个写入的api而后间接将咱们要写入的泛型实体写入,第二个可选参数代表写入精度,这里咱们应用WritePrecision.Ms
咱们在DeviceHandler.cs中注入ITimeSeriesDbClient 并增加一个WriteMeasurementAsync办法,在办法中咱们先依据设施名称获取产品,如果辨认产品ID为10001(空净产品),
那么咱们就写入数据到Measurement:AirPurifierDataPoint
Measurement相当于数据库的表。
Measurement和Column个性都是InfluxDB.Client.Core提供的,能够用来标识Tag、Timestamp等
using InfluxDB.Client.Core;using Newtonsoft.Json;namespace MASA.IoT.Core.Contract{ [Measurement("AirPurifierDataPoint")] public class AirPurifierDataPoint { /// <summary> /// 设施名称 /// </summary> [Column("DeviceName", IsTag = true)] public string DeviceName { get; set; } /// <summary> /// 产品ID /// </summary> [Column("ProductId", IsTag = true)] public Guid ProductId { get; set; } /// <summary> /// Pm2.5 /// </summary> [Column("PM_25")] public double? Pm_25 { get; set; } /// <summary> /// 温度 /// </summary> [Column("Temperature")] public double? Temperature { get; set; } /// <summary> /// 湿度 /// </summary> [Column("Humidity")] public double? Humidity { get; set; } /// <summary> /// 工夫戳 /// </summary> [JsonProperty(propertyName: "Ts")] [Column(IsTimestamp = true)] public long Timestamp { get; set; } }}
public class DeviceHandler : IDeviceHandler { private readonly MASAIoTContext _ioTDbContext; private readonly IMqttHandler _mqttHandler; private readonly ITimeSeriesDbClient _timeSeriesDbClient; public DeviceHandler(MASAIoTContext ioTDbContext, IMqttHandler mqttHandler, ITimeSeriesDbClient timeSeriesDbClient) { _ioTDbContext = ioTDbContext; _mqttHandler = mqttHandler; _timeSeriesDbClient = timeSeriesDbClient; } /// <summary> /// 写入数据 /// </summary> /// <typeparam name="T"></typeparam> /// <param name="pubSubOptions"></param> /// <returns></returns> public async Task<bool> WriteMeasurementAsync<T>(PubSubOptions pubSubOptions) { var device = await _ioTDbContext.IoTDeviceInfo.Include(o => o.ProductInfo).AsNoTracking() .FirstOrDefaultAsync(o => o.DeviceName == pubSubOptions.DeviceName); if (device != null && device.ProductInfo.ProductCode == "10001") //空气净化器产品 { var airPurifierDataPoint = JsonConvert.DeserializeObject<AirPurifierDataPoint>(pubSubOptions.Msg); airPurifierDataPoint.ProductId = device.ProductInfoId; return _timeSeriesDbClient.WriteMeasurement<AirPurifierDataPoint>(airPurifierDataPoint); } return false; }
除了WriteMeasurement办法之外,还提供了很多其余办法,如WritePoint,和批量写入的办法,可自行测试。
测试
咱们启动我的项目,通过MQTTX向"topic/284202304230001"上报一条数据
{ "DeviceName":"284202304230001", "Pm_25":100, "Temperature":25, "Humidity":50}
咱们在influxDB的管理工具中应用Data Explorer,应用如下的flux query查问语句,即可查出5分钟之内的数据,留神,这里的工夫是UTC工夫
如果想显示北京时区不便调试,能够在前面增加|> timeShift(duration: 8h)
from(bucket: "IoTDemos") |> range(start:-5m)
对于flux查问语法
https://docs.influxdata.com/flux/v0.x/
总结
本节咱们简略介绍了开源时序数据库influxDB的装置。
咱们借助InfluxDB.Client库实现设施从上报到时序库数据存储的全过程,下一节咱们介绍从时序库查问数据。
残缺代码在这里:https://github.com/sunday866/MASA.IoT-Training-Demos