关于.net:使用MASA-StackNet-从零开始搭建IoT平台-第五章-使用时序库存储上行数据

106次阅读

共计 6635 个字符,预计需要花费 17 分钟才能阅读完成。

前言

咱们能够将设施上行数据存储到关系型数据库中,咱们须要两张带有工夫戳的表(最新数据表 历史数据表 ), 历史数据表 存储所有设施上报的数据,最新数据表 须要存储设备最新一条上报数据,这条最新数据相当于设施的以后状态。而后展现的时候只展现最新一条数据的状态,报表查问能够依照设施 id 和工夫从 历史数据表 查问汇总。
这样是能够的,然而咱们的 最新数据表 须要被频繁的更新,数据量少的时候没问题。但数据量大,并发高的时候就会呈现问题。
1、存储老本:数据不会被压缩,导致占用存储资源。
2、保护老本:单表数据量太大时,须要人工分库分表。
3、写入性能:单机写入吞吐量难以满足大量上行数据的写入需要,数据库存在性能瓶颈。
4、查问性能:数据量太大导致查问性能受到影响。

剖析

咱们能够采纳时序库来解决上述问题,首先来理解一下什么是时序数据。时序数据是依照工夫维度进行索引的数据,它记录了某个被测量实体在肯定工夫范畴内,每个工夫点上的一组测试值。传感器上传的室内 PM2.5 和甲醛数据、净水器传感器以后的 TDS 值、计算机系统的监控数据等,都属于时序数据,时序数据有如下特点:
1、数据量较大,写入操作是继续且安稳的,而且写多读少。
2、只有写入操作,简直没有更新操作,比方去批改传感器的历史数据,是没有意义的。
3、没有随机删除,即便删除也是依照工夫范畴进行删除。删除某一个工夫点的数据没有意义,然而删除 2 年前的数据是有意义的。
4、数据实时性和时效性强,数据随着工夫的推移一直追加,旧数据很快失去意义。
5、大部分以工夫和实体为维度进行查问,很少以测试值为维度查问,比方用户会查问某个时间段的温度数据,然而很少会去查问温度高于多少度的数据记录。
显然 IoT 的业务是合乎应用时序库的场景的。
序数据库就是用来存储时序数据的数据库,时序数据库相较于传统的关系型数据和非关系型数据库而言,专门优化了对时序数据的存储,开源的时序数据库有 InfluxDB OpenTSDB、TimeScaleDB 等。本文以 InfluxDB 数据库进行演示。
时序数据库有如下几个概念。
1.Metric: 度量,相当于关系型数据库中的表(table)。
2.Data Point: 数据点,相当于关系型数据库的中的行(row)。
3.Timestamp: 工夫戳,数据点生成时的工夫戳。
4.Field: 测量值,比方温度和湿度、PM2.5 等。
5.Tag: 标签,用于标识数据点,通常用来标识数据点的起源,比方温度和湿度数据来自哪个房间,哪个设施,能够当作关系型数据库表的主键。

如下图,度量为 Wind,每一个数据点都具备一个 timestamp,两个 field:direction 和 speed,两个 tag:sensor、city。它的第一行和第三行,寄存的都是 sensor 号码为 95D8-7913 的设施,属性城市是上海。随着工夫的变动,风向和风速都产生了扭转,风向从 23.4 变成 23.2;而风速从 3.4 变成了 3.3。

图片来自网络

施行步骤

时序库的装置

装置参考官网文档,为了不便,我这里采纳 docker 装置

docker run --name influxdb -p 8086:8086 influxdb:2.7.0

https://docs.influxdata.com/influxdb/v2.7/install/

咱们关上 服务器 ip:8086 能够看到它自带的治理界面,咱们首先创立用户名明码,组织、以及 Bucket 的名称。
这里的 bucket “IoTDemos” 相当于数据库的名称

咱们记录一下这个 Token,一会连贯 influxdb 须要,相当于账号密码

解决 playload 没有工夫戳问题

对于时序库来讲,工夫戳是十分重要的,然而咱们拿到的 playload 并没有工夫戳(MQTTNet 包我没有找到拿工夫戳的办法)。
所以咱们须要在 mqtt 上想方法,让设施上报数据的时候,mqtt 主动增加工夫戳到 playload 中。
1、咱们在数据集成 -> 规定中新建一条规定名称为 ”Add_Ts”。SQL 编写如下

SELECT
  *,
  now_timestamp('millisecond') as payload.Ts
FROM
  "topic/#"

topic/# 代表音讯公布到 ”topic/#” 主题的事件
now_timestamp 函数返回以后工夫的 Unix 工夫戳,咱们将工夫戳写入到 payload 的 Ts 属性中,对于更多内置 SQL 函数,请参考官网文档

https://www.emqx.io/docs/zh/v5.0/data-integration/rule-sql-bu…

2、咱们关上上面的调试,模仿设施上报一条数据,能够看到这条规定帮咱们退出了工夫戳。

3、而后咱们还须要解决增加了工夫戳的处理结果,咱们在右侧增加一个动作,抉择 音讯重公布 ,将刚刚增加了工夫戳的音讯重发到一个新的 Topic 上,咱们应用topic/dp,并在 playload 中增加 ${payload},这样咱们就批改了 playload 中的信息,增加了咱们须要的工夫戳,当然,咱们 Hub 订阅的音讯也须要对应批改,增加 /dp 后缀。
[](https://img2023.cnblogs.com/blog/2849835/202305/2849835-20230…)
4、首先咱们先批改MASA.IoT.Hub 的配置文件,Topic 增加 ”/dp” 后缀

  "MqttSetting": {
...
    "Topic": "$share/IotHub/topic/+/dp"
  },

5、CallbackAsync 中,因为咱们设施名称是从 Topic 截取的,也要对应批改一下。

    private async Task CallbackAsync(MqttApplicationMessageReceivedEventArgs e)
    {var deviceDataPointStr = System.Text.Encoding.Default.GetString(e.ApplicationMessage.PayloadSegment);

        Console.WriteLine(deviceDataPointStr);
        var pubSubOptions = new PubSubOptions
        {
            // 批改一下获取设施名称的形式
            DeviceName = e.ApplicationMessage.Topic[6..^3],
            Msg = deviceDataPointStr,
            PubTime = new DateTimeOffset(DateTime.Now).ToUnixTimeMilliseconds(),
            TrackId = Guid.NewGuid()};                            
...
    }

代码编写

解决完工夫戳的问题,咱们就能够编写代码向 InfluxDB 中写入数据了,咱们首先在 Infrastructure 文件夹下创立 ITimeSeriesDbClient 接口和 TimeSeriesDbClient 类,应用接口也不便咱们日后更换其余的时序库。
这里应用了 InfluxDB.Client 包。
ITimeSeriesDbClient.cs

namespace MASA.IoT.Core.Infrastructure
{
    public interface ITimeSeriesDbClient
    {bool WriteMeasurement<T>(T measurement);
    }
}

TimeSeriesDbClient.cs

using InfluxDB.Client;
using InfluxDB.Client.Api.Domain;
using MASA.IoT.WebApi;
using Microsoft.Extensions.Options;

namespace MASA.IoT.Core.Infrastructure
{
    public class TimeSeriesDbClient : ITimeSeriesDbClient
    {
        private readonly InfluxDBClient _client;
        private readonly string _bucket;
        private readonly string _org;
        private readonly AppSettings _appSettings;
        
        public TimeSeriesDbClient(IOptions<AppSettings> settings)
        {
            _appSettings = settings.Value;
            _org = _appSettings.InfluxDBSetting.Org;
            _bucket = _appSettings.InfluxDBSetting.Bucket;
            _client = new InfluxDBClient(_appSettings.InfluxDBSetting.Url, _appSettings.InfluxDBSetting.Token);
        }

        public bool WriteMeasurement<T>(T measurement)
        {
            try
            {using var writeApi = _client.GetWriteApi();
                writeApi.WriteMeasurement<T>(measurement, WritePrecision.Ms, _bucket, _org);
                return true;
            }
            catch (Exception ex)
            {Console.WriteLine(ex.Message);
                return false;
            }
        }
    }
}

这里应用 new InfluxDBClient(_appSettings.InfluxDBSetting.Url, _appSettings.InfluxDBSetting.Token)来结构 InfluxDBClient。
Token就是咱们创立 Bucket 过程中保留的 Token
Url 是咱们 InfluxDB 的拜访地址:http://127.0.0.1:8086
写入的办法 WriteMeasurement 中咱们通过 _client.GetWriteApi 创立一个写入的 api 而后间接将咱们要写入的泛型实体写入,第二个可选参数代表写入精度,这里咱们应用 WritePrecision.Ms
咱们在 DeviceHandler.cs 中注入 ITimeSeriesDbClient 并增加一个WriteMeasurementAsync 办法,在办法中咱们先依据设施名称获取产品,如果辨认产品 ID 为 10001(空净产品),
那么咱们就写入数据到 Measurement:AirPurifierDataPoint
Measurement 相当于数据库的表。
MeasurementColumn个性都是 InfluxDB.Client.Core 提供的,能够用来标识 TagTimestamp

using InfluxDB.Client.Core;
using Newtonsoft.Json;

namespace MASA.IoT.Core.Contract
{[Measurement("AirPurifierDataPoint")]
    public class AirPurifierDataPoint
    {
        /// <summary>
        /// 设施名称
        /// </summary>
        [Column("DeviceName", IsTag = true)] public string DeviceName {get; set;}

        /// <summary>
        /// 产品 ID
        /// </summary>
        [Column("ProductId", IsTag = true)] public Guid ProductId {get; set;}

        /// <summary>
        /// Pm2.5
        /// </summary>
        [Column("PM_25")] public double? Pm_25 {get; set;}
        /// <summary>
        /// 温度
        /// </summary>
        [Column("Temperature")] public double? Temperature {get; set;}
        /// <summary>
        /// 湿度
        /// </summary>
        [Column("Humidity")] public double? Humidity {get; set;}
        /// <summary>
        /// 工夫戳
        /// </summary>
        [JsonProperty(propertyName: "Ts")]
        [Column(IsTimestamp = true)] public long Timestamp {get; set;}
    }
}
    public class DeviceHandler : IDeviceHandler
    {
        private readonly MASAIoTContext _ioTDbContext;
        private readonly IMqttHandler _mqttHandler;
        private readonly ITimeSeriesDbClient _timeSeriesDbClient;

        public DeviceHandler(MASAIoTContext ioTDbContext, IMqttHandler mqttHandler, ITimeSeriesDbClient timeSeriesDbClient)
        {
            _ioTDbContext = ioTDbContext;
            _mqttHandler = mqttHandler;
            _timeSeriesDbClient = timeSeriesDbClient;
        }

        /// <summary>
        /// 写入数据
        /// </summary>
        /// <typeparam name="T"></typeparam>
        /// <param name="pubSubOptions"></param>
        /// <returns></returns>
        public async Task<bool> WriteMeasurementAsync<T>(PubSubOptions pubSubOptions)
        {var device = await _ioTDbContext.IoTDeviceInfo.Include(o => o.ProductInfo).AsNoTracking()
                .FirstOrDefaultAsync(o => o.DeviceName == pubSubOptions.DeviceName);

            if (device != null && device.ProductInfo.ProductCode == "10001")  // 空气净化器产品
            {var airPurifierDataPoint = JsonConvert.DeserializeObject<AirPurifierDataPoint>(pubSubOptions.Msg);

                airPurifierDataPoint.ProductId = device.ProductInfoId;
  
                return _timeSeriesDbClient.WriteMeasurement<AirPurifierDataPoint>(airPurifierDataPoint);

            }
            return false;
        }

除了 WriteMeasurement 办法之外,还提供了很多其余办法,如 WritePoint,和批量写入的办法,可自行测试。

测试

咱们启动我的项目,通过 MQTTX 向 “topic/284202304230001” 上报一条数据

{
  "DeviceName":"284202304230001",
  "Pm_25":100,
  "Temperature":25,
  "Humidity":50
}

咱们在 influxDB 的管理工具中应用 Data Explorer,应用如下的flux query 查问语句,即可查出 5 分钟之内的数据,留神,这里的工夫是 UTC 工夫

如果想显示北京时区不便调试,能够在前面增加|> timeShift(duration: 8h)

from(bucket: "IoTDemos") 
|> range(start:-5m)

对于 flux 查问语法

https://docs.influxdata.com/flux/v0.x/

总结

本节咱们简略介绍了开源时序数据库 influxDB 的装置。
咱们借助 InfluxDB.Client 库实现设施从上报到时序库数据存储的全过程,下一节咱们介绍从时序库查问数据。

残缺代码在这里:https://github.com/sunday866/MASA.IoT-Training-Demos

正文完
 0