乐趣区

关于物联网:使用流式计算引擎-eKuiper-处理-Protocol-Buffers-数据

Protocol Buffers (Protobuf) 是一种语言中立、平台中立的可扩大机制,用于序列化结构化数据的二进制传输格局。相比惯例数据传输格局(如 JSON 或 XML),Protobuf 更加高效和疾速并节俭传输带宽,因而失去了宽泛的利用。

在云边协同架构中,往往既须要发送数据到云端,同时也须要接管云端发送过去的数据,进行云边协同计算。大规模的云边协同计算传输的数据总量微小,在公网带宽资源无限而且低廉的状况下,采纳更紧凑的数据传输格局显得尤为重要。

LF Edge eKuiper 是适宜部署于资源受限的边缘端的超轻量物联网边缘数据流式剖析引擎,可通过 source 和 sink 连贯 MQTT、HTTP 等各种通信协议的内部零碎。eKuiper 反对配置 source/sink 的传输数据的编解码格局,目前可反对 JSON、ProtoBuf 和 Binary 格局。

本文将以 Protobuf 格局为例,解说如何在 eKuiper 中设置编解码格局,通过 source 读入并解析该格局的数据以及在 sink 中应用该格局编码写入,从而实现高效的云边协同数据传输,缓解云边传输带宽缓和问题。

本教程采纳 eKuiper Manager 进行规定的创立和治理,请参考 UI 教程。您也能够采纳 REST API 或者在 eKuiper 运行的边端运行命令行工具来实现雷同的规定治理操作。

环境筹备

开始入手操作之前,须要筹备以下环境:

  • MQTT 服务器用于数据传输。本教程应用位于 tcp://broker.emqx.io:1883 的 MQTT 服务器,broker.emqx.io 是一个由 EMQX Cloud 提供的公共 MQTT 服务器。若本地运行 eKuiper,须要更改 etc/mqtt_source.yaml,配置项 server 改为 ”tcp://broker.emqx.io:1883″;若应用 docker 启动,应设置环境变量 MQTT_SOURCEDEFAULTSERVER=”tcp://broker.emqx.io:1883″。
  • 为了不便察看运行后果,咱们须要装置一个 MQTT 客户端,例如 MQTT X。

模式注册(Schema Registry)

相比于无模式的 JSON 格局,Protobuf 须要提前定义数据结构,即模式。在 proto 文件中,能够蕴含多个 message 以及其余实体的定义,然而在编解码格局的配置中,只有 message 的定义能够被应用。本教程中,咱们应用以下模式进行数据结构的定义。该文件定义了一个名为 Book 的 message 构造,其中蕴含字符串类型的 title 和整型的 price。传输的数据将根据此构造对书籍数据进行二进制数据的编解码。

message Book {
  required string title = 1; 
  required int32 price = 2;
}
  1. 注册模式。在治理控制台中,关上配置 -> 模式,点击创立模式。

  2. 在模式创立窗口中,如下图所示填写。其中,模式类型抉择protobuf;模式名称可输出自定义的不反复的名称作为后续规定创立中模式的标识 id;模式内容可采纳文件或者文本内容填写。抉择 file 的状况下,须要填写文件所在的 url;本教程应用的模式较为简单,因而可抉择 content,而后在内容框中填入 proto 文件的文本。

  3. 点击提交。在模式列表中该当可能看到新创建的模式。后续可应用操作栏中的按钮进行批改或删除的操作。

至此,咱们曾经注册了名为 schema1 的模式,其中定义了 Book 这种类型,在规定的 source 和 sink 中能够应用该注册的模式。用户也能够持续在此界面进行更多的模式注册和管理工作。

读取 Protobuf 数据

本节中,咱们以 MQTT source 为例,介绍如何接入并解析基于 Protobuf 编码传输的数据,使之能够在 eKuiper 中进行规定的计算。须要留神的是,在 Source 中,编码格局与传输协定并不是绑定的。任何的 source 类型如 MQTT,httpPull 等都能够搭配不同的编码格局,例如 ProtoBuf 和 JSON 等。

假如咱们有一个 MQTT 主题 demo,出于节俭传输带宽的目标,外面传输的数据为 Protobuf 编码的二进制数据。接下来,咱们将配置 eKuiper 数据源,接入这个主题的数据并进行解决。

  1. 创立数据流:在治理控制台中,抉择源治理 -> 流治理,点击创立流。
  2. 配置数据流及其格局:流名称可设置为自定义的不反复的名称;数据源为要监听的 MQTT 主题;流类型设置为 mqtt;流格局抉择 protobuf;模式名称抉择上一步注册的 schema1;模式音讯设置为 proto 文件里定义的 message Book。该配置示意数据流 protoDemo 将监听 MQTT 主题 protoDemo,收到二进制数据后将采纳 schema1 中的 Book 的格局进行 protobuf 解码。点击提交,在流列表中该当列出新创建的流。

  3. 创立规定:抉择规定,点击新建规定,进入规定创立界面。如下图所示,右上角点击进入文本模式,输出自定义的规定 ID,规定名字,在文本内容中输出规定的 JSON 文本。该规定示意抉择流 protoDemo 中的内容,发送到 MQTT 主题 result/protobuf 中。

    {
       "id": "ruleDecode",
       "sql": "SELECT * FROM protoDemo",
       "actions": [{
         "mqtt": {
           "server": "tcp://broker.emqx.io:1883",
           "topic": "result/protobuf",
           "sendSingle": true
         }
       }]
    }
  4. 发送数据并查看后果:咱们将应用 MQTTX 发送 Protobuf 编码后的二进制数据到 protoDemo 主题中,察看收到的后果是否是解码后的正确数据。

    1. 关上 MQTT X,连贯到云端 tcp://broker.emqx.io:1883
    2. 订阅主题上文规定发送后果的主题 result/protobuf,便于察看后果。
    3. 在音讯发送窗格中,设置主题为 protoDemo,Payload 格局为 Hex, 发送依据 schema1 中 Book 格局编码的二进制数据,例如 0a1073747265616d696e672073797374656d107b

    4. 确保接管窗口收到正确的 JSON 数据,如下图所示。

至此,咱们实现了 Protobuf 数据的读取和解码并用简略的规定进行解决输入。用户像解决一般 JSON 格局数据一样创立各种各样的规定。若未失去预期后果,可在治理控制台的规定列表页面,查看规定状态,确保规定数据入出的指标合乎预期。

写入 Protobuf 数据

本节中,咱们将展现读取 JSON 格局数据进行解决后采纳 Protobuf 格局发送到云端 MQTT broker 的用法。在物联网边云协同的场景中,该用法可节俭边云传输的带宽开销。部署在边缘端的 eKuiper 接入本地的 MQTT broker 无需耗费带宽,可通过解决较快的 JSON 格局接入。规定运算之后,计算结果须要发送到云端 MQTT broker 时,可应用 Protobuf 编码节俭带宽。

  1. 创立数据流:在治理控制台中,抉择源治理 -> 流治理,点击创立流。如下图所示,创立一个连入 demo 主题,JSON 格局数据的流。

  2. 创立规定,应用 Protobuf 格局发送到云端。

    1. 点击新建规定,输出自定义的 Rule ID 和名称,输出 SQL SELECT * FROM demo
    2. 点击动作左边的新建按钮,配置 MQTT 动作。其中,MQTT 服务器地址配置为云端 broker 地址,MQTT 主题为 result/protobufOut;数据按条发送配置为 true,确保收到的为单条数据以匹配格局配置;流格局配置为 protobuf,模式名称为第一节注册的 schema1,模式音讯为 Book。该规定将读取 JSON 数据,而后依照 Book 的格局编码成二进制数据发往 result/protobufOut 主题。点击提交,实现动作配置。

    3. 每个规定能够有多个动作,每个动作应用的编码格局是独立的。用户能够持续配置其余动作。全副配置实现后,点击提交,实现规定的创立。
  3. 发送数据并查看后果,该流程与上一节相似。本次咱们将向 demo 主题发送 JSON 数据,并冀望在订阅的 result/protobufOut 主题中查看到 protobuf 编码的二进制数据。如下图所示,留神数据格式的配置免得显示乱码。

总结

本教程介绍了如何在 eKuiper 中进行 Protobuf 数据的读取和写入。ProtoBuf 格局是 eKuiper 对外连贯的格局的一种,各种格局之间能够任意组合,接入零碎后应用的都是外部的格局示意。首先,用户须要先定义 Protobuf 的模式;之后在流的创立和动作的创立中可配置 Protobuf 格局,并抉择已定义的模式进行数据的编解码。

版权申明:本文为 EMQ 原创,转载请注明出处。

原文链接:https://www.emqx.com/zh/blog/using-ekuiper-to-process-protocol-buffers-data

退出移动版