关于物联网:NeuroneKuiper-实现工业物联网数据采集清理与反控

64次阅读

共计 6207 个字符,预计需要花费 16 分钟才能阅读完成。

Neuron 是运行在各类物联网边缘网关硬件上的工业协定网关软件,反对同时为多个不同通信协定设施、数十种工业协定进行一站式接入及 MQTT 协定转换,仅占用超低资源,即能够原生或容器的形式部署在 X86、ARM 等架构的各类边缘硬件中。同时,用户能够通过基于 Web 的治理控制台实现在线的网关配置管理。

在 eKuiper 1.5.0 之前的版本中,Neuron 与 eKuiper 之间须要采纳 MQTT 作为直达。二者协同时,须要额定部署 MQTT broker。同时,用户须要自行处理数据格式,包含读入和输入时的解码编码工作。

近日公布的 eKuiper 1.5.0 版本退出了 Neuron source 和 sink,使得用户无需配置即可在 eKuiper 中接入 Neuron 中采集到的数据进行计算;也能够不便地从 eKuiper 中通过 Neuron 管制设施。两个产品的整合,能够显著升高边缘计算解决方案对资源的应用要求,升高应用门槛。

本文将以工业物联网数据采集和荡涤的场景为例,介绍如何应用 eKuiper 和 Neuron 进行云边协同的数据采集、数据清理和数据反控。

Neuron 与 eKuiper 的集成

Neuron 2.0 中,北向利用减少了 eKuiper 反对。当 Neuron 开启北向 eKuiper 利用之后,二者之间通过 NNG 协定的过程间通信形式进行连贯,从而显著升高网络通信耗费,进步性能。

eKuiper 与 Neuron 之间的集成是双向的,其实现次要蕴含两个局部:

  • 提供了一个 Neuron 源,反对从 Neuron 订阅数据。
  • 提供了一个 Neuron sink,反对通过 Neuron 管制设施。

典型的工业物联网边缘数据处理场景中,Neuron 和 eKuiper 部署在同一台边缘机器上。这也是目前二者集成所反对的场景。若须要通过网络进行通信,则依然能够通过之前 MQTT 的形式进行协同。

筹备工作

Neuron 和 eKuiper 部署在凑近设施的边缘端网关或者工控机上。Neuron 采集的数据通过 eKuiper 解决后发送到云端的 MQTT broker 以便于云端的利用进行下一步的解决。同时,eKuiper 能够接管云端 MQTT 的指令,通过 Neuron 管制本地的设施。

开始入手操作之前,须要筹备以下环境:

  • MQTT 服务器:能够应用 EMQ 提供的公共 MQTT 服务器,或参考 EMQX 文档疾速部署一个本地 MQTT broker。假如 MQTT broker 地址为 tcp://broker.emqx.io:1883,以下教程将以此地址为例。
  • 为了不便察看运行后果,咱们须要装置一个 MQTT 客户端,例如 MQTT X。

疾速部署

Neuron 和 eKuiper 都反对二进制安装包以及 Docker 容器化部署计划。本文将以 Docker 计划为例,采纳 docker compose 形式,一键实现边缘端两个组件的疾速部署。

  1. 复制 docker-compose.yml 文件到部署的机器上。其内容如下,蕴含了 Neuron、eKuiper 以及 eKuiper 的治理界面 eKuiper manager(可选)。其中,eKuiper 和 Neuron 共享了名为 nng-ipc 的 volume,用于二者通信。

    version: '3.4'
    
    services:
      manager:
        image: emqx/ekuiper-manager:1.5
        container_name: ekuiper-manager
        ports:
          - "9082:9082"
      ekuiper:
        image: lfedge/ekuiper:1.5-slim
        ports:
          - "9081:9081"
          - "127.0.0.1:20498:20498"
        container_name: manager-ekuiper
        hostname: manager-ekuiper
        environment:
          MQTT_SOURCE__DEFAULT__SERVER: "tcp://broker.emqx.io:1883"
          KUIPER__BASIC__CONSOLELOG: "true"
          KUIPER__BASIC__IGNORECASE: "false"
        volumes:
          - nng-ipc:/tmp
      neuron:
        image: neugates/neuron:2.0.1
        ports:
          - "127.0.0.1:7001:7001"
          - "127.0.0.1:7000:7000"
        container_name: manager-neuron
        hostname: manager-neuron
        volumes:
          - nng-ipc:/tmp
    
    volumes:
      nng-ipc:
  2. 在该文件所在目录,运行:

    # docker compose up -d
  3. 所有的容器启动结束之后,请应用 docker ps 命令确定所有的容器曾经失常启动。

    CONTAINER ID   IMAGE                        COMMAND                  CREATED        STATUS          PORTS                                                NAMES
    3d61c1b166e5   neugates/neuron:2.0.1        "/usr/bin/entrypoint…"   18 hours ago   Up 11 seconds   127.0.0.1:7000-7001->7000-7001/tcp                   manager-neuron
    62a74d0be2ea   lfedge/ekuiper:1.5-slim    "/usr/bin/docker-ent…"   18 hours ago   Up 11 seconds   0.0.0.0:9081->9081/tcp, 127.0.0.1:20498->20498/tcp   manager-ekuiper
    7deffb470c1a   emqx/ekuiper-manager:1.5   "/usr/bin/docker-ent…"   18 hours ago   Up 11 seconds   0.0.0.0:9082->9082/tcp                               ekuiper-manager

配置 Neuron 和 eKuiper

Neuron 启动之后,咱们须要配置 Neuron 的南向设施和北向 eKuiper 利用通道,而后启动模拟器进行模仿数据采集。

南向设施和模拟器配置,请参考 Neuron 疾速教程,实现到运行和应用的 3. 南向配置局部。该教程中的北向配置局部为 MQTT 利用,本教程须要采纳 eKuiper 作为北向利用。

Neuron 北向 eKuiper 利用配置

在配置菜单中选择北向利用治理,进入到北向利用治理界面,此时未增加任何利用,须要手动增加利用,在本例中,咱们将创立一个 eKuiper 利用。

第一步,增加北向利用:

  1. 点击右上角的增加配置按键;
  2. 填写利用名称,例如,ekuiper-1;
  3. 下拉框中显示在该软件版本中,咱们可用的北向利用,此次咱们抉择 ekuiper 的插件,如下图所示。
  4. 创立利用胜利之后,会在北向利用治理界面呈现一个刚刚创立的利用的卡片,此时利用的工作状态在初始化,连贯状态在断开连接状态中,如下图所示。

第二步,订阅 Group:

点击第一步利用卡片 ekuiper-1 中任意空白处,进入到订阅 Group 界面,如下图所示。

  1. 点击右上角的增加订阅按键增加订阅;
  2. 下拉框抉择南向设施,这里咱们抉择下面建好的 modbus-plus-tcp-1 的设施;
  3. 下拉框抉择所要订阅的 Group,这里咱们抉择下面建好的 group-1;
  4. 点击提交,实现订阅。
  5. 点击北向利用治理,点开利用卡片中的工作状态开关,使利用进入运行中的状态。

至此,Neuron 已配置好数据采集,并将采集到的数据发送到北向的 eKuiper 通道中。

eKuiper manager 配置

eKuiper manager 是一个 Web 治理界面,可治理多个 eKuiper 实例。因而,咱们须要设置 manager 治理的 eKuiper 实例。具体设置请参考 eKuiper 治理控制台的应用。

eKuiper 治理可应用 REST API,命令行以及治理控制台。以下教程中,咱们次要应用 REST API 进行治理,包含流和规定的创立。

创立流

应用如下命令创立名为 neuronStream 的流。其中,type 属性设置为 neuron,示意该流会连贯到 neuron 中。neuron 中采集到的数据会全副发送过去,从而在 eKuiper 中多条规定都会针对同一份数据进行解决,因而流属性 shared 设置为 true。

curl -X POST --location http://127.0.0.1:9081/streams \
    -H 'Content-Type: application/json' \
    -d '{"sql":"CREATE STREAM neuronStream() WITH (TYPE=\"neuron\",FORMAT=\"json\",SHARED=\"true\");"}'

采集规定

Neuron 流建设之后,咱们能够在 eKuiper 里创立任意多条规定,对采集的数据进行各种计算和解决。本文以两个采集规定为例,实现边缘采集到云端的场景。更多 eKuiper 的数据处理能力,请参考扩大浏览局部。

荡涤数据到云端

假如 Neuron 中设置的两个 tag 的实在含意为:

  • tag1: decimal 示意的温度数据,理论温度应该除以 10
  • tag2: 整型的湿度数据。

本规定将采集的 Neuron 数据换算为正确的精度,并重命名为有意义的名字。后果发送到云端的 MQTT 动静 topic ${nodeName}/${groupName} 中。创立规定的 REST 命令如下。其中,规定名为 ruleNAll, 规定的 SQL 中对采集的值进行计算,并选取了 node_name 和 group_name 这些元数据。在动作中,规定的后果发送到云端的 MQTT broker,而且 topic 为动静名字。依据前文配置,咱们采集的 node_name 为 modbus-plus-tcp-1,group_name 为 group-1。因而,在 MQTT X 中,订阅 modbus-plus-tcp-1/group-1 主题即可失去计算的后果。

curl -X POST --location http://127.0.0.1:9081/rules \
    -H 'Content-Type: application/json' \
    -d '{"id":"ruleNAll","sql":"SELECT node_name, group_name, values->tag1/10 as temperature, values->tag2 as humidity FROM neuronStream","actions": [{"mqtt": {"server":"tcp://cloud.host:1883","topic":"{{.node_name}}/{{.group_name}}","sendSingle": true
    }
  }]
}'

关上 MQTT X,连贯到云端 broker,订阅 modbus-plus-tcp-1/group-1 主题,则可失去如下后果。因为采集频率为 100ms 一次,此处收到的数据也是相似的频率。

在 Modbus TCP 模拟器批改数据,可失去变动的输入。

采集变动数据到云端

采集频率较高而数据变动频率较低时,用户通常会采集到大量的冗余反复数据,全副上传云端会占据大量带宽。eKuiper 提供了应用层的去重性能,能够创立规定采集变动数据。对下面的规定进行革新,减少过滤条件,仅当采集到的任一 tag 数据变动时才发送数据。新的规定变为:

curl -X POST --location http://127.0.0.1:9081/rules \
    -H 'Content-Type: application/json' \
    -d '{"id":"ruleChange","sql":"SELECT node_name, group_name, values->tag1/10 as temperature, values->tag2 as humidity FROM neuronStream WHERE HAD_CHANGED(true, values->tag1, values->tag2)","actions": [{"mqtt": {"server":"tcp://cloud.host:1883","topic":"changed/{{.node_name}}/{{.group_name}}","sendSingle": true
    }
  }]
}'

关上 MQTT X,连贯到云端 broker,订阅 changed/modbus-plus-tcp-1/group-1 主题,收到数据的频率大大降低。在 Modbus TCP 模拟器批改数据才可收到新的数据。

通过 Neuron 管制设施

得益于 Neuron sink 组件,eKuiper 能够在数据处理后通过 Neuron 管制设施。在上面的规定中,eKuiper 接管 MQTT 的指令,对 Neuron 进行动静的反控。

假如有个利用场景,用户通过往云端的 MQTT 服务器的某个主题发送控制指令来对部署在边缘端的设施进行管制操作,比方设定指标设施的冀望的温度。首先,咱们在 eKuiper 中须要创立一个 MQTT 流,用于接管从别的利用发到 command MQTT 主题的指令。

curl -X POST --location "http://127.0.0.1:9081/streams" \
    -H 'Content-Type: application/json' \
    -d '{"sql":"CREATE STREAM mqttCommand() WITH (TYPE=\"mqtt\",SHARED=\"TRUE\",DATASOURCE=\"command\");"}'

接着,咱们创立一个规定,读取来自该 MQTT 流的数据,并依据规定通过 Neuron 写入数据。与前文雷同,假如 tag1 为温度传感器的 decimal 类型的读数。该规定读取 MQTT payload 中的 temperature 值并乘 10 之后作为 tag1 的值;应用 payload 中的 nodeName、groupName 字段作为写到 Neuron 中的动静 node 和 group 名。

curl -X POST --location http://127.0.0.1:9081/rules \
    -H 'Content-Type: application/json' \
    -d '{"id":"ruleCommand","sql":"SELECT temperature * 10 as tag1, nodeName, groupName from mqttCommand","actions": [{"log": {},"neuron": {"nodeName":"{{.nodeName}}","groupName":"{{.groupName}}","tags": ["tag1"]
    }
  }]
}'

规定运行之后,关上 MQTT X,向 command 主题写入如下格局的 JSON 串。须要留神的是,该当确保 node 和 group 在 Neuron 中已创立。在本教程的配置中,只创立了 modbus-plus-tcp-1 和 group-1。

{
  "nodeName": "modbus-plus-tcp-1",
  "temperature": 24,
  "groupName": "group-1"
}

关上 Neuron 的数据监控,可见 tag1 数据更改为 240,表明反控胜利。

同时,前文创立的两个规定应该采集到新的数值。

版权申明:本文为 EMQ 原创,转载请注明出处。

原文链接:https://www.emqx.com/zh/blog/industrial-iot-data-collection-cleaning-and-control

正文完
 0