前言
咱们能够将设施上行数据存储到关系型数据库中,咱们须要两张带有工夫戳的表(最新数据表 和 历史数据表 ), 历史数据表 存储所有设施上报的数据,最新数据表 须要存储设备最新一条上报数据,这条最新数据相当于设施的以后状态。而后展现的时候只展现最新一条数据的状态,报表查问能够依照设施 id 和工夫从 历史数据表 查问汇总。
这样是能够的,然而咱们的 最新数据表 须要被频繁的更新,数据量少的时候没问题。但数据量大,并发高的时候就会呈现问题。
1、存储老本:数据不会被压缩,导致占用存储资源。
2、保护老本:单表数据量太大时,须要人工分库分表。
3、写入性能:单机写入吞吐量难以满足大量上行数据的写入需要,数据库存在性能瓶颈。
4、查问性能:数据量太大导致查问性能受到影响。
剖析
咱们能够采纳时序库来解决上述问题,首先来理解一下什么是时序数据。时序数据是依照工夫维度进行索引的数据,它记录了某个被测量实体在肯定工夫范畴内,每个工夫点上的一组测试值。传感器上传的室内 PM2.5 和甲醛数据、净水器传感器以后的 TDS 值、计算机系统的监控数据等,都属于时序数据,时序数据有如下特点:
1、数据量较大,写入操作是继续且安稳的,而且写多读少。
2、只有写入操作,简直没有更新操作,比方去批改传感器的历史数据,是没有意义的。
3、没有随机删除,即便删除也是依照工夫范畴进行删除。删除某一个工夫点的数据没有意义,然而删除 2 年前的数据是有意义的。
4、数据实时性和时效性强,数据随着工夫的推移一直追加,旧数据很快失去意义。
5、大部分以工夫和实体为维度进行查问,很少以测试值为维度查问,比方用户会查问某个时间段的温度数据,然而很少会去查问温度高于多少度的数据记录。
显然 IoT 的业务是合乎应用时序库的场景的。
序数据库就是用来存储时序数据的数据库,时序数据库相较于传统的关系型数据和非关系型数据库而言,专门优化了对时序数据的存储,开源的时序数据库有 InfluxDB OpenTSDB、TimeScaleDB 等。本文以 InfluxDB 数据库进行演示。
时序数据库有如下几个概念。
1.Metric: 度量,相当于关系型数据库中的表(table)。
2.Data Point: 数据点,相当于关系型数据库的中的行(row)。
3.Timestamp: 工夫戳,数据点生成时的工夫戳。
4.Field: 测量值,比方温度和湿度、PM2.5 等。
5.Tag: 标签,用于标识数据点,通常用来标识数据点的起源,比方温度和湿度数据来自哪个房间,哪个设施,能够当作关系型数据库表的主键。
如下图,度量为 Wind,每一个数据点都具备一个 timestamp,两个 field:direction 和 speed,两个 tag:sensor、city。它的第一行和第三行,寄存的都是 sensor 号码为 95D8-7913 的设施,属性城市是上海。随着工夫的变动,风向和风速都产生了扭转,风向从 23.4 变成 23.2;而风速从 3.4 变成了 3.3。
图片来自网络
施行步骤
时序库的装置
装置参考官网文档,为了不便,我这里采纳 docker 装置
docker run --name influxdb -p 8086:8086 influxdb:2.7.0
https://docs.influxdata.com/influxdb/v2.7/install/
咱们关上 服务器 ip:8086 能够看到它自带的治理界面,咱们首先创立用户名明码,组织、以及 Bucket 的名称。
这里的 bucket “IoTDemos” 相当于数据库的名称
咱们记录一下这个 Token,一会连贯 influxdb 须要,相当于账号密码
解决 playload 没有工夫戳问题
对于时序库来讲,工夫戳是十分重要的,然而咱们拿到的 playload 并没有工夫戳(MQTTNet 包我没有找到拿工夫戳的办法)。
所以咱们须要在 mqtt 上想方法,让设施上报数据的时候,mqtt 主动增加工夫戳到 playload 中。
1、咱们在数据集成 -> 规定中新建一条规定名称为 ”Add_Ts”。SQL 编写如下
SELECT
*,
now_timestamp('millisecond') as payload.Ts
FROM
"topic/#"
topic/# 代表音讯公布到 ”topic/#” 主题的事件
now_timestamp 函数返回以后工夫的 Unix 工夫戳,咱们将工夫戳写入到 payload 的 Ts 属性中,对于更多内置 SQL 函数,请参考官网文档
https://www.emqx.io/docs/zh/v5.0/data-integration/rule-sql-bu…
2、咱们关上上面的调试,模仿设施上报一条数据,能够看到这条规定帮咱们退出了工夫戳。
3、而后咱们还须要解决增加了工夫戳的处理结果,咱们在右侧增加一个动作,抉择 音讯重公布 ,将刚刚增加了工夫戳的音讯重发到一个新的 Topic 上,咱们应用topic/dp,并在 playload 中增加 ${payload},这样咱们就批改了 playload 中的信息,增加了咱们须要的工夫戳,当然,咱们 Hub 订阅的音讯也须要对应批改,增加 /dp 后缀。
[](https://img2023.cnblogs.com/blog/2849835/202305/2849835-20230…)
4、首先咱们先批改MASA.IoT.Hub 的配置文件,Topic 增加 ”/dp” 后缀
"MqttSetting": {
...
"Topic": "$share/IotHub/topic/+/dp"
},
5、CallbackAsync 中,因为咱们设施名称是从 Topic 截取的,也要对应批改一下。
private async Task CallbackAsync(MqttApplicationMessageReceivedEventArgs e)
{var deviceDataPointStr = System.Text.Encoding.Default.GetString(e.ApplicationMessage.PayloadSegment);
Console.WriteLine(deviceDataPointStr);
var pubSubOptions = new PubSubOptions
{
// 批改一下获取设施名称的形式
DeviceName = e.ApplicationMessage.Topic[6..^3],
Msg = deviceDataPointStr,
PubTime = new DateTimeOffset(DateTime.Now).ToUnixTimeMilliseconds(),
TrackId = Guid.NewGuid()};
...
}
代码编写
解决完工夫戳的问题,咱们就能够编写代码向 InfluxDB 中写入数据了,咱们首先在 Infrastructure 文件夹下创立 ITimeSeriesDbClient 接口和 TimeSeriesDbClient 类,应用接口也不便咱们日后更换其余的时序库。
这里应用了 InfluxDB.Client 包。
ITimeSeriesDbClient.cs
namespace MASA.IoT.Core.Infrastructure
{
public interface ITimeSeriesDbClient
{bool WriteMeasurement<T>(T measurement);
}
}
TimeSeriesDbClient.cs
using InfluxDB.Client;
using InfluxDB.Client.Api.Domain;
using MASA.IoT.WebApi;
using Microsoft.Extensions.Options;
namespace MASA.IoT.Core.Infrastructure
{
public class TimeSeriesDbClient : ITimeSeriesDbClient
{
private readonly InfluxDBClient _client;
private readonly string _bucket;
private readonly string _org;
private readonly AppSettings _appSettings;
public TimeSeriesDbClient(IOptions<AppSettings> settings)
{
_appSettings = settings.Value;
_org = _appSettings.InfluxDBSetting.Org;
_bucket = _appSettings.InfluxDBSetting.Bucket;
_client = new InfluxDBClient(_appSettings.InfluxDBSetting.Url, _appSettings.InfluxDBSetting.Token);
}
public bool WriteMeasurement<T>(T measurement)
{
try
{using var writeApi = _client.GetWriteApi();
writeApi.WriteMeasurement<T>(measurement, WritePrecision.Ms, _bucket, _org);
return true;
}
catch (Exception ex)
{Console.WriteLine(ex.Message);
return false;
}
}
}
}
这里应用 new InfluxDBClient(_appSettings.InfluxDBSetting.Url, _appSettings.InfluxDBSetting.Token)来结构 InfluxDBClient。
Token就是咱们创立 Bucket 过程中保留的 Token。
Url 是咱们 InfluxDB 的拜访地址:http://127.0.0.1:8086
写入的办法 WriteMeasurement 中咱们通过 _client.GetWriteApi 创立一个写入的 api 而后间接将咱们要写入的泛型实体写入,第二个可选参数代表写入精度,这里咱们应用 WritePrecision.Ms
咱们在 DeviceHandler.cs 中注入 ITimeSeriesDbClient 并增加一个WriteMeasurementAsync 办法,在办法中咱们先依据设施名称获取产品,如果辨认产品 ID 为 10001(空净产品),
那么咱们就写入数据到 Measurement:AirPurifierDataPoint
Measurement 相当于数据库的表。
Measurement 和Column个性都是 InfluxDB.Client.Core 提供的,能够用来标识 Tag、Timestamp 等
using InfluxDB.Client.Core;
using Newtonsoft.Json;
namespace MASA.IoT.Core.Contract
{[Measurement("AirPurifierDataPoint")]
public class AirPurifierDataPoint
{
/// <summary>
/// 设施名称
/// </summary>
[Column("DeviceName", IsTag = true)] public string DeviceName {get; set;}
/// <summary>
/// 产品 ID
/// </summary>
[Column("ProductId", IsTag = true)] public Guid ProductId {get; set;}
/// <summary>
/// Pm2.5
/// </summary>
[Column("PM_25")] public double? Pm_25 {get; set;}
/// <summary>
/// 温度
/// </summary>
[Column("Temperature")] public double? Temperature {get; set;}
/// <summary>
/// 湿度
/// </summary>
[Column("Humidity")] public double? Humidity {get; set;}
/// <summary>
/// 工夫戳
/// </summary>
[JsonProperty(propertyName: "Ts")]
[Column(IsTimestamp = true)] public long Timestamp {get; set;}
}
}
public class DeviceHandler : IDeviceHandler
{
private readonly MASAIoTContext _ioTDbContext;
private readonly IMqttHandler _mqttHandler;
private readonly ITimeSeriesDbClient _timeSeriesDbClient;
public DeviceHandler(MASAIoTContext ioTDbContext, IMqttHandler mqttHandler, ITimeSeriesDbClient timeSeriesDbClient)
{
_ioTDbContext = ioTDbContext;
_mqttHandler = mqttHandler;
_timeSeriesDbClient = timeSeriesDbClient;
}
/// <summary>
/// 写入数据
/// </summary>
/// <typeparam name="T"></typeparam>
/// <param name="pubSubOptions"></param>
/// <returns></returns>
public async Task<bool> WriteMeasurementAsync<T>(PubSubOptions pubSubOptions)
{var device = await _ioTDbContext.IoTDeviceInfo.Include(o => o.ProductInfo).AsNoTracking()
.FirstOrDefaultAsync(o => o.DeviceName == pubSubOptions.DeviceName);
if (device != null && device.ProductInfo.ProductCode == "10001") // 空气净化器产品
{var airPurifierDataPoint = JsonConvert.DeserializeObject<AirPurifierDataPoint>(pubSubOptions.Msg);
airPurifierDataPoint.ProductId = device.ProductInfoId;
return _timeSeriesDbClient.WriteMeasurement<AirPurifierDataPoint>(airPurifierDataPoint);
}
return false;
}
除了 WriteMeasurement 办法之外,还提供了很多其余办法,如 WritePoint,和批量写入的办法,可自行测试。
测试
咱们启动我的项目,通过 MQTTX 向 “topic/284202304230001” 上报一条数据
{
"DeviceName":"284202304230001",
"Pm_25":100,
"Temperature":25,
"Humidity":50
}
咱们在 influxDB 的管理工具中应用 Data Explorer,应用如下的flux query 查问语句,即可查出 5 分钟之内的数据,留神,这里的工夫是 UTC 工夫
如果想显示北京时区不便调试,能够在前面增加|> timeShift(duration: 8h)
from(bucket: "IoTDemos")
|> range(start:-5m)
对于 flux 查问语法
https://docs.influxdata.com/flux/v0.x/
总结
本节咱们简略介绍了开源时序数据库 influxDB 的装置。
咱们借助 InfluxDB.Client 库实现设施从上报到时序库数据存储的全过程,下一节咱们介绍从时序库查问数据。
残缺代码在这里:https://github.com/sunday866/MASA.IoT-Training-Demos