前言

获取一个设施的在线和离线状态,是一个很要害的性能。咱们对设施下发的控制指令,设施处于在线状态能力及时给咱们反馈。这里的在线和离线,咱们能够简略的了解为设施与MQTT的连贯状态。

剖析

咱们打电话的时候常常能听到:"您拨打的用户已关机“和”用户不在服务区或临时无奈接通“,这两种的区别是什么?


1、当用户开机时,会主动向最近的挪动基站注册,基站标记该用户为"attach"(在线)状态。
2、当用户关机时,手机会发动datach流程,告知基站本人关机了,基站标记该用户为"detach"(离线)状态。这样再次拨打就能够节俭寻呼资源,间接提醒用户关机。
3、当用户突然进入无网络的环境,或者手机故障,导致来不及发动datach流程,基站还认为用户"在线",当有人拨打用户号码时,基站测会对用户进行寻呼,然而超时得不到回应后,就会提醒"不在服务区"或者"临时无奈接通" 的语音。

其实这个计划在IoT上也是可行的,咱们能够让设施在线和离线的过程中向特定Topic发送状态音讯,然而存在问题,咱们须要一个独自的Broker去订阅这个Topic,然而这个独自的Broker很容易成为单点故障点。而且如果设施数量很大,这种意外离线的设施也很难及时发现,须要下发指令后期待设施响应超时能力发现。

计划1:遗嘱音讯

MQTT 遗嘱音讯能够在客户端意外断线时将“遗嘱”优雅地发送给第三方订阅者,以实现离线告诉、设施状态更新等业务。其中意外断线指客户端断开前未向服务器发送 DISCONNECT 音讯,比方:

因网络故障或网络稳定,设施在放弃连贯周期内未能通信,连贯被服务端敞开
设施意外掉电
设施尝试进行不被容许的操作而被服务端敞开连贯,例如订阅本身权限以外的主题等
遗嘱音讯在 MQTT 客户端向服务器端 CONNECT 申请时设置,可选属性包含是否发送遗嘱音讯 (Will Message)标记,和遗嘱音讯主题 (Topic) 与内容(Payload) 以及 Properties。

值得一提的,遗嘱音讯公布的工夫可能会有提早:通常意外断线时,服务器无奈立刻检测到断线行为,须要通过连贯保活心跳机制并通过肯定周期后才会触发;MQTT 5.0 提供的遗嘱提早距离(Will Delay Interval)属性也会影响公布工夫。

演示遗嘱音讯的应用

咱们应用A、B两台电脑应用MQTT X来演示。
咱们在A电脑的 MQTT X 中新建一个名为 Test 的连贯,Host 批改为 批改为咱们的MQTT地址(192.120.5.204),并输出账号密码,在 Advanced 局部抉择 MQTT Version 为 5.0,并且将 Session Expiry Interval 设置为 10,确保会话不会在遗嘱音讯公布前过期。

而后在 Lass Will and Testament 局部将 Last-Will Topic 设置为 offline,Last-Will Payload 设置为 I'm offline,Will Delay Interval (s) 设置为 5。


实现以上设置后,咱们点击右上角的 Connect 按钮以建设连贯。

咱们在B电脑的MQTTX中新建一个连贯Sub,mqtt地址同样指向咱们的mqtt服务器(192.120.5.204)

并订阅offline主题

咱们用工作管理器间接完结A电脑的MQTTX过程,这是连贯会被间接断开,模仿了设施断电的场景,在5s之后,在B电脑的MQTTX订阅中收到了一条内容为 I‘m offline 的遗嘱音讯。

施行流程

1、设施遗嘱音讯内容设置为offline,该遗嘱主题与一个一般发送状态的主题设定成同一个 {设施名称}/status。例如 284202304230001/status
2、当设施连贯时,向主题 {设施名称}/status 发送内容为 online 的Retained音讯,其它客户端订阅主题 {设施名称}/status 的时候,将获取到 Retained 音讯为 online。

保留音讯(Retain )
MQTT 服务端收到 Retain 标记为 1 的 PUBLISH 报文时,会将该报文视为保留音讯,除了被失常转发以外, 保留音讯会被存储在服务端,每个主题下只能存在一份保留音讯,因而如果曾经存在雷同主题的保留音讯,则该保留音讯被替换。
当客户端建设订阅时,如果服务端存在主题匹配的保留音讯,则这些保留音讯将被立刻发送给该客户端。 借助保留音讯,新的订阅者可能立刻获取最近的状态,而不须要期待无奈预期的工夫,这在很多场景下是十分重要的。
EMQX 默认开启保留音讯的能力和服务,能够在 etc/emqx.conf 中批改 mqtt.retain_available 为 false 来敞开保留音讯的能力, 这样客户端将被禁止发送 Retain 标记为 1 的 PUBLISH 报文,否则,客户端将会收到起因码为 0x9A(不反对保留音讯)的 DISCONNECT 报文。
保留音讯的服务会存储和治理客户端发送的保留音讯,并发送给相应的订阅者。

3、当客户端异样断开时,零碎主动向主题 {设施名称}/status 发送内容为 offline 的音讯,其它订阅了此主题的客户端会马上收到 offline 音讯;如果遗嘱音讯设置了 Will Retain,那么此时如果有新的订阅 A/status 主题的客户端上线,也将获取到内容为 offline 的遗嘱音讯。

计划2:应用WebHook

计划1须要设施被动设置遗嘱音讯能力实现,那么有没有更简略的形式,间接通过设施与Mqtt的连贯事件来获取连贯状态呢。
EMQX 设计了一套WebHook零碎,能够通过这个自带的WebHook零碎获取外部的事件并进行解决。WebHook的原理很简略,当设施与mqtt建设连贯或者断开连接时,EMQX会把事件的信息通过咱们的配置调用特定的URL上的接口,实现告诉。
应用WebHook还能够无限防止单点故障。所以本我的项目会采纳WebHook的形式来实现对设施在线和离线的治理。

开启WebHook

数据集成 -> 数据桥接 中创立一个Webhook

名称设置为ConnectedEvent,URL 中填写咱们的Webhook地址,也就是触发事件之后的调用接口地址,这里咱们填:

http://192.120.5.204:5000/api/Device/ConnectedEvent

申请形式为Post,其余内容放弃默认不变

这里留神URL能够通过${field}的形式拼接,申请体也能够本人指定,如果留空会原样转发音讯,咱们这里申请体留空
设施在线和离线的事件转发的音讯格局如下

{    "username": "284202304230001",    "timestamp": 1682652598840,    "sockname": "172.17.0.5:1883",    "receive_maximum": 32,    "proto_ver": 5,    "proto_name": "MQTT",    "peername": "172.17.0.1:48524",    "node": "emqx@172.17.0.5",    "mountpoint": "undefined",    "metadata": {        "rule_id": "rule_3hsx"    },    "keepalive": 60,    "is_bridge": false,    "expiry_interval": 10,    "event": "client.connected",    "connected_at": 1682652598840,    "conn_props": {        "User-Property": {},        "Session-Expiry-Interval": 10    },    "clientid": "mqttx_c4491df0",    "clean_start": false}

咱们点击 创立 ,并持续点击 创立规定

咱们在创立规定中指定新的规定名称 rule_client_connected,并在SQL编辑器复制以下内容

SELECT  *FROM  "$events/client_connected",  "$events/client_disconnected"

在右侧的事件中,咱们能够看到所有可用的事件,咱们抉择了连贯和断开两个事件,在这两个事件触发时会通过Webhook调用咱们配置的接口,这样咱们就能获取到设施的在线、离线状态了。

咱们点击 创立按钮 实现规定的创立
咱们能够看见咱们创立好的规定

点击规定ID,还能够看到统计数据

在FLows中还能够看到整个工作流程

演示Webhook

咱们应用MQTTX模仿一次设施连贯和断开动作,能够在规定统计界面看到咱们的操作曾经被记录。

编写代码

咱们这里采纳计划2。
咱们须要实现之前配置的ConnectedEvent接口

    /// <summary>    /// 连贯事件申请    /// </summary>    public class ConnectedEventRequest    {        /// <summary>        /// 设施名称        /// </summary>        public string Username { get; set; }        /// <summary>        /// 工夫戳        /// </summary>        public long Timestamp { get; set; }        /// <summary>        /// 事件(连贯/断开)        /// </summary>        public string Event { get; set; }        /// <summary>        /// 连接时间(断开事件中为0)        /// </summary>        public long Connected_at { get; set; }        /// <summary>        /// Client ID        /// </summary>        public string Clientid { get; set; }    }
        /// <summary>        /// 更新设施在线状态        /// </summary>        /// <param name="deviceName"></param>        /// <param name="onlineStatus"></param>        /// <returns></returns>        public async Task UpdateDeviceOnlineStatusAsync(string deviceName, OnLineStates onlineStatus)        {            var device = await _ioTDbContext.IoTDeviceInfo.Include(o => o.IoTDeviceExtend).AsNoTracking()                .FirstOrDefaultAsync(o => o.DeviceName == deviceName);            if (device == null)            {                return;            }            else            {                if (device.IoTDeviceExtend == null) //扩大表为空                {                    device.IoTDeviceExtend = new IoTDeviceExtend                    {                        DeviceInfoId = device.Id,                        OnLineStates = (int)onlineStatus,                    };                    _ioTDbContext.Attach(device.IoTDeviceExtend);                                        _ioTDbContext.Entry(device.IoTDeviceExtend).State = EntityState.Added;                    _ioTDbContext.Entry(device.IoTDeviceExtend).Property(o => o.OnLineStates).IsModified = true;                    await _ioTDbContext.SaveChangesAsync();                }                if (device.IoTDeviceExtend.OnLineStates != (int)onlineStatus)         //在线状态不统一                {                    device.IoTDeviceExtend.OnLineStates = (int)onlineStatus;                    _ioTDbContext.Attach(device.IoTDeviceExtend);                    //避免更新其余字段                    _ioTDbContext.Entry(device.IoTDeviceExtend).State = EntityState.Unchanged;                    _ioTDbContext.Entry(device.IoTDeviceExtend).Property(o => o.OnLineStates).IsModified = true;                    await _ioTDbContext.SaveChangesAsync();                }            }        }

咱们依据Event中的内容来判断是 连贯(client.connected)/断开(client.disconnected) 的事件

        /// <summary>        /// 连贯、断开事件        /// </summary>        /// <param name="request"></param>        /// <returns></returns>        [HttpPost]        public async Task ConnectedEventAsync([FromBody] ConnectedEventRequest request)        {            var onlineStatus = request.Event switch            {                "client.connected" => OnLineStates.OnLine,                _ => OnLineStates.OffLine            };            await _deviceHandler.UpdateDeviceOnlineStatusAsync(request.Username, onlineStatus);        }

总结

以上就是本文要讲的内容,咱们能够通过MQTTX来测试咱们的代码有效性。
该计划还存在局部毛病,例如:
1、每次设施高低线会导致频繁的申请接口,在大量设施接入的场景中须要思考接口性能。
2、因为网络等问题,Web调用程序可能不能齐全保障,兴许离线会比在线事件更早解决,从而导致状态不统一。咱们前面会尝试用其余计划来代替WebHook,尝试解决上述问题,在此之前咱们都会持续应用WebHook进行性能演示。

残缺代码在这里:https://github.com/sunday866/MASA.IoT-Training-Demos


如果你对咱们的 MASA 感兴趣,无论是代码奉献、应用、提 Issue,欢送分割咱们

WeChat:MasaStackTechOps
QQ:7424099