关于iot:使用MASA全家桶从零开始搭建IoT平台三管理设备的连接状态

8次阅读

共计 5759 个字符,预计需要花费 15 分钟才能阅读完成。

前言

获取一个设施的在线和离线状态,是一个很要害的性能。咱们对设施下发的控制指令,设施处于在线状态能力及时给咱们反馈。这里的在线和离线,咱们能够简略的了解为设施与 MQTT 的连贯状态。

剖析

咱们打电话的时候常常能听到:” 您拨打的用户已关机“和”用户不在服务区或临时无奈接通“,这两种的区别是什么?

1、当用户开机时,会主动向最近的挪动基站注册,基站标记该用户为 ”attach”(在线)状态。
2、当用户关机时,手机会发动 datach 流程,告知基站本人关机了,基站标记该用户为 ”detach”(离线)状态。这样再次拨打就能够节俭寻呼资源,间接提醒用户关机。
3、当用户突然进入无网络的环境,或者手机故障,导致来不及发动 datach 流程,基站还认为用户 ” 在线 ”, 当有人拨打用户号码时,基站测会对用户进行寻呼,然而超时得不到回应后,就会提醒 ” 不在服务区 ” 或者 ” 临时无奈接通 ” 的语音。

其实这个计划在 IoT 上也是可行的,咱们能够让设施在线和离线的过程中向特定 Topic 发送状态音讯,然而存在问题,咱们须要一个独自的 Broker 去订阅这个 Topic,然而这个独自的 Broker 很容易成为单点故障点。而且如果设施数量很大,这种意外离线的设施也很难及时发现,须要下发指令后期待设施响应超时能力发现。

计划 1: 遗嘱音讯

MQTT 遗嘱音讯能够在客户端意外断线时将“遗嘱”优雅地发送给第三方订阅者,以实现离线告诉、设施状态更新等业务。其中意外断线指客户端断开前未向服务器发送 DISCONNECT 音讯,比方:

因网络故障或网络稳定,设施在放弃连贯周期内未能通信,连贯被服务端敞开
设施意外掉电
设施尝试进行不被容许的操作而被服务端敞开连贯,例如订阅本身权限以外的主题等
遗嘱音讯在 MQTT 客户端向服务器端 CONNECT 申请时设置,可选属性包含是否发送遗嘱音讯 (Will Message)标记,和遗嘱音讯主题 (Topic) 与内容(Payload) 以及 Properties。

值得一提的,遗嘱音讯公布的工夫可能会有提早:通常意外断线时,服务器无奈立刻检测到断线行为,须要通过连贯保活心跳机制并通过肯定周期后才会触发;MQTT 5.0 提供的遗嘱提早距离(Will Delay Interval)属性也会影响公布工夫。

演示遗嘱音讯的应用

咱们应用 A、B 两台电脑应用 MQTT X 来演示。
咱们在 A 电脑的 MQTT X 中新建一个名为 Test 的连贯,Host 批改为 批改为咱们的 MQTT 地址(192.120.5.204),并输出账号密码,在 Advanced 局部抉择 MQTT Version 为 5.0,并且将 Session Expiry Interval 设置为 10,确保会话不会在遗嘱音讯公布前过期。

而后在 Lass Will and Testament 局部将 Last-Will Topic 设置为 offline,Last-Will Payload 设置为 I’m offline,Will Delay Interval (s) 设置为 5。

实现以上设置后,咱们点击右上角的 Connect 按钮以建设连贯。

咱们在 B 电脑的 MQTTX 中新建一个连贯 Sub,mqtt 地址同样指向咱们的 mqtt 服务器(192.120.5.204)

并订阅 offline 主题

咱们用工作管理器间接完结 A 电脑的 MQTTX 过程,这是连贯会被间接断开,模仿了设施断电的场景,在 5s 之后,在 B 电脑的 MQTTX 订阅中收到了一条内容为 I‘m offline 的遗嘱音讯。

施行流程

1、设施遗嘱音讯内容设置为 offline,该遗嘱主题与一个一般发送状态的主题设定成同一个 {设施名称}/status。例如 284202304230001/status
2、当设施连贯时,向主题 {设施名称}/status 发送内容为 online 的 Retained 音讯,其它客户端订阅主题 {设施名称}/status 的时候,将获取到 Retained 音讯为 online。

保留音讯 (Retain)
MQTT 服务端收到 Retain 标记为 1 的 PUBLISH 报文时,会将该报文视为保留音讯,除了被失常转发以外,保留音讯会被存储在服务端,每个主题下只能存在一份保留音讯,因而如果曾经存在雷同主题的保留音讯,则该保留音讯被替换。
当客户端建设订阅时,如果服务端存在主题匹配的保留音讯,则这些保留音讯将被立刻发送给该客户端。借助保留音讯,新的订阅者可能立刻获取最近的状态,而不须要期待无奈预期的工夫,这在很多场景下是十分重要的。
EMQX 默认开启保留音讯的能力和服务,能够在 etc/emqx.conf 中批改 mqtt.retain_available 为 false 来敞开保留音讯的能力,这样客户端将被禁止发送 Retain 标记为 1 的 PUBLISH 报文,否则,客户端将会收到起因码为 0x9A(不反对保留音讯)的 DISCONNECT 报文。
保留音讯的服务会存储和治理客户端发送的保留音讯,并发送给相应的订阅者。

3、当客户端异样断开时,零碎主动向主题 {设施名称}/status 发送内容为 offline 的音讯,其它订阅了此主题的客户端会马上收到 offline 音讯;如果遗嘱音讯设置了 Will Retain,那么此时如果有新的订阅 A/status 主题的客户端上线,也将获取到内容为 offline 的遗嘱音讯。

计划 2: 应用 WebHook

计划 1 须要设施被动设置遗嘱音讯能力实现,那么有没有更简略的形式,间接通过设施与 Mqtt 的连贯事件来获取连贯状态呢。
EMQX 设计了一套 WebHook 零碎,能够通过这个自带的 WebHook 零碎获取外部的事件并进行解决。WebHook 的原理很简略,当设施与 mqtt 建设连贯或者断开连接时,EMQX 会把事件的信息通过咱们的配置调用特定的 URL 上的接口,实现告诉。
应用 WebHook 还能够无限防止单点故障。所以本我的项目会采纳 WebHook 的形式来实现对设施在线和离线的治理。

开启 WebHook

数据集成 -> 数据桥接 中创立一个 Webhook

名称设置为ConnectedEvent,URL 中填写咱们的 Webhook 地址,也就是触发事件之后的调用接口地址,这里咱们填:

http://192.120.5.204:5000/api/Device/ConnectedEvent

申请形式为 Post,其余内容放弃默认不变

这里留神 URL 能够通过 ${field}的形式拼接,申请体也能够本人指定,如果留空会原样转发音讯,咱们这里申请体留空
设施在线和离线的事件转发的音讯格局如下

{
    "username": "284202304230001",
    "timestamp": 1682652598840,
    "sockname": "172.17.0.5:1883",
    "receive_maximum": 32,
    "proto_ver": 5,
    "proto_name": "MQTT",
    "peername": "172.17.0.1:48524",
    "node": "emqx@172.17.0.5",
    "mountpoint": "undefined",
    "metadata": {"rule_id": "rule_3hsx"},
    "keepalive": 60,
    "is_bridge": false,
    "expiry_interval": 10,
    "event": "client.connected",
    "connected_at": 1682652598840,
    "conn_props": {"User-Property": {},
        "Session-Expiry-Interval": 10
    },
    "clientid": "mqttx_c4491df0",
    "clean_start": false
}

咱们点击 创立 ,并持续点击 创立规定

咱们在创立规定中指定新的规定名称 rule_client_connected, 并在 SQL 编辑器复制以下内容

SELECT
  *
FROM
  "$events/client_connected",
  "$events/client_disconnected"

在右侧的事件中,咱们能够看到所有可用的事件,咱们抉择了连贯和断开两个事件,在这两个事件触发时会通过 Webhook 调用咱们配置的接口,这样咱们就能获取到设施的在线、离线状态了。

咱们点击 创立 按钮 实现规定的创立
咱们能够看见咱们创立好的规定

点击规定 ID,还能够看到统计数据

在 FLows 中还能够看到整个工作流程

演示 Webhook

咱们应用 MQTTX 模仿一次设施连贯和断开动作,能够在规定统计界面看到咱们的操作曾经被记录。

编写代码

咱们这里采纳计划 2。
咱们须要实现之前配置的 ConnectedEvent 接口

    /// <summary>
    /// 连贯事件申请
    /// </summary>
    public class ConnectedEventRequest
    {
        /// <summary>
        /// 设施名称
        /// </summary>
        public string Username {get; set;}
        /// <summary>
        /// 工夫戳
        /// </summary>
        public long Timestamp {get; set;}

        /// <summary>
        /// 事件(连贯 / 断开)/// </summary>
        public string Event {get; set;}
        /// <summary>
        /// 连接时间(断开事件中为 0)/// </summary>
        public long Connected_at {get; set;}

        /// <summary>
        /// Client ID
        /// </summary>
        public string Clientid {get; set;}
    }
        /// <summary>
        /// 更新设施在线状态
        /// </summary>
        /// <param name="deviceName"></param>
        /// <param name="onlineStatus"></param>
        /// <returns></returns>
        public async Task UpdateDeviceOnlineStatusAsync(string deviceName, OnLineStates onlineStatus)
        {var device = await _ioTDbContext.IoTDeviceInfo.Include(o => o.IoTDeviceExtend).AsNoTracking()
                .FirstOrDefaultAsync(o => o.DeviceName == deviceName);
            if (device == null)
            {return;}
            else
            {if (device.IoTDeviceExtend == null) // 扩大表为空
                {
                    device.IoTDeviceExtend = new IoTDeviceExtend
                    {
                        DeviceInfoId = device.Id,
                        OnLineStates = (int)onlineStatus,
                    };
                    _ioTDbContext.Attach(device.IoTDeviceExtend);
                    
                    _ioTDbContext.Entry(device.IoTDeviceExtend).State = EntityState.Added;
                    _ioTDbContext.Entry(device.IoTDeviceExtend).Property(o => o.OnLineStates).IsModified = true;
                    await _ioTDbContext.SaveChangesAsync();}
                if (device.IoTDeviceExtend.OnLineStates != (int)onlineStatus)         // 在线状态不统一
                {device.IoTDeviceExtend.OnLineStates = (int)onlineStatus;

                    _ioTDbContext.Attach(device.IoTDeviceExtend);
                    // 避免更新其余字段
                    _ioTDbContext.Entry(device.IoTDeviceExtend).State = EntityState.Unchanged;
                    _ioTDbContext.Entry(device.IoTDeviceExtend).Property(o => o.OnLineStates).IsModified = true;
                    await _ioTDbContext.SaveChangesAsync();}
            }
        }

咱们依据 Event 中的内容来判断是 连贯(client.connected)/ 断开(client.disconnected) 的事件

        /// <summary>
        /// 连贯、断开事件
        /// </summary>
        /// <param name="request"></param>
        /// <returns></returns>
        [HttpPost]
        public async Task ConnectedEventAsync([FromBody] ConnectedEventRequest request)
        {
            var onlineStatus = request.Event switch
            {
                "client.connected" => OnLineStates.OnLine,
                _ => OnLineStates.OffLine
            };

            await _deviceHandler.UpdateDeviceOnlineStatusAsync(request.Username, onlineStatus);
        }

总结

以上就是本文要讲的内容,咱们能够通过 MQTTX 来测试咱们的代码有效性。
该计划还存在局部毛病,例如:
1、每次设施高低线会导致频繁的申请接口,在大量设施接入的场景中须要思考接口性能。
2、因为网络等问题,Web 调用程序可能不能齐全保障,兴许离线会比在线事件更早解决,从而导致状态不统一。咱们前面会尝试用其余计划来代替 WebHook,尝试解决上述问题,在此之前咱们都会持续应用 WebHook 进行性能演示。

残缺代码在这里:https://github.com/sunday866/MASA.IoT-Training-Demos


如果你对咱们的 MASA 感兴趣,无论是代码奉献、应用、提 Issue,欢送分割咱们

WeChat:MasaStackTechOps
QQ:7424099

正文完
 0