关于物联网:EMQ-X-与-HStreamDB-集成实践通过规则引擎实现数据存储

35次阅读

共计 2931 个字符,预计需要花费 8 分钟才能阅读完成。

面对物联网时代海量设施连贯及其实时产生的大规模数据流,EMQ 提供从边缘到云的古代数据基础设施,助力云边端物联网数据的对立「连贯、挪动、解决、剖析」。

现在,可「随处运行、有限连贯、任意集成」的云原生分布式消息中间件 EMQ X 已解决了海量连贯的挑战,流数据库 HStreamDB 则正试图解决海量物联网数据的存储、解决与实时剖析。

作为首个专为流数据设计的云原生流数据库,HStreamDB 致力于高效的大规模数据流存储和治理。EMQ X 与 HStreamDB 的组合,将使海量数据接入、存储、实时处理与剖析的一站式治理变得不再艰难。

最近公布的 HStreamDB v0.6 新增了数据写入 Rest API,能够应用任何语言通过 Rest API 向 HStreamDB 写入数据,不便开源用户围绕 HStreamDB 进行二次开发。咱们也通过这一性能与 EMQ X 开源版的 Webhook 性能联合,实现了 EMQ X 和 HStreamDB 的疾速集成。

本文就将具体介绍应用 HStreamDB 对 EMQ X 的接入数据进行长久化存储的具体操作。

注:本文介绍基于 EMQ X 4.3 和 hstreamdb/hstream:v0.6.1 镜像。

启动 EMQ X 和 HStreamDB

首先咱们须要一个运行中的 EMQ X,如何装置、部署并启动请参考:EMQ X 文档。

同时,咱们须要一个运行中的 HStreamDB,更具体的如何装置、部署与启动教程请参考:HStreamDB Docs。

对于不相熟 HStreamDB 的用户,能够先通过 docker-compose 疾速启动一个单机的 HStreamDB 集群。

启动 HStreamDB

先间接通过链接下载 docker-compose.yaml 文件。

创立一个用来存储数据库数据的文件:

mkdir /data/store

在后盾启动 HStreamDB:

docker-compose -f quick-start.yaml up -d

通过:

docker-compose -f quick-start.yaml logs hstream-http-server

将会看到以下 log:

Server is configured with:
     gRPCServerHost: hserver
     gRPCServerPort: 6570
     httpServerPort: 6580
 Setting gRPC connection
 Setting HTTP server
 Server started on port 6580 

通过 HStreamDB CLI 创立所须要的 Stream

Stream 是 HStreamDB 中用来存储流式数据的对象,能够看作是一些数据的汇合。

启动 HStreamDB CLI

用 docker 启动一个 HStreamDB 的命令行界面:

docker run -it --rm --name some-hstream-cli --network host hstreamdb/hstream hstream-client --port 6570 --client-id 1

你将会进入到以下界面:

      __  _________________  _________    __  ___
     / / / / ___/_  __/ __ \/ ____/   |  /  |/  /
    / /_/ /\__ \ / / / /_/ / __/ / /| | / /|_/ /
   / __  /___/ // / / _  _/ /___/ ___ |/ /  / /
  /_/ /_//____//_/ /_/ |_/_____/_/  |_/_/  /_/

>

创立 HStreamDB Stream,用来保留桥接过来的数据:

> CREATE STREAM emqx_rule_engine_output ;
emqx_rule_engine_output

当然咱们也能够通过 SHOW 失去曾经创立好的 Stream:

> SHOW STREAMS;
emqx_rule_engine_output

配置 EMQ X

而后,咱们关上 EMQ X 的 Dashboard,点击规定引擎(Rule Engine),进入资源(Resource)界面。

咱们能够先创立一个 WebHook 资源,如下图:

Request URL 一栏中填入 hstream-http-server 的监听地址,<host>:6580/streams/emqx_rule_engine_output:publish,而后点击 test connection 测试链接。

接着,咱们来创立所须要的规定引擎规定:

SELECT 
  payload,                 -- 在 HStreamDB 的 http 协定中,咱们须要一个 payload 项
  str(payload) as payload, -- HStreamDB 要求 payload 是一个 JSON String
  0 as flag                -- HStreamDB 中 flag 为 0 示意 payload 是一个 JSON String
FROM 
  "#"                      -- 这个符号会匹配所有的 topic

咱们须要减少一个 Action Handler,抉择 ActionData to Web Server

Method 设置为 POSTHeader 退出 content-type application/json

这个时候,咱们曾经实现了最根本的桥接的设置,接下来让咱们通过 websocket 和 hstreamdb-cli 来测试一下吧。

通过 HStreamDB CLI 察看数据的长久化存储是否实现

首先咱们在刚刚启动的 HStreamDB CLI 中创立一个 Query:

> SELECT * FROM emqx_rule_engine_output EMIT CHANGES;

在 HStreamDB 中,每一个 Stream 都示意一串动态变化的数据流,因而一个 Query 并不是简略地读取数据,而是会继续读取并输入被写入 Stream 中的数据。在 CLI 中,读取和输入数据的终点是就是胜利创立 Query 的这一刻。以后,咱们能够察看到的是,CLI 中并没有任何输入。

此时咱们能够通过 EMQ X DashBoard 的 WebSocket 或者其余 MQTT 客户端(例如跨平台 MQTT 5.0 桌面客户端工具 – MQTT X)向 EMQ X 写入数据。

以下用 WebSocket 举例,咱们能够先连贯上咱们启动的 EMQ X 集群:

再向指定的 topic 发送数据:

如果一切正常的话,咱们就能够实时地在 HStreamDB CLI 看到咱们发到 EMQ X 的数据。

> SELECT * FROM emqx_rule_engine_output EMIT CHANGES;
{"location":{"lng":116.296011,"lat":40.005091},"speed":32.12,"tachometer":9001.0,"ts":1563268202,"direction":198.33212,"id":"NXP-058659730253-963945118132721-22","dynamical":8.93}

至此,咱们实现了 EMQ X 接入的数据在 HStreamDB 的长久化存储。

通过将 EMQ X 与 HStreamDB 集成,咱们不仅能够实现对发送到 EMQ X 的数据的长久化存储,还能对这些数据进行实时处理剖析,取得进一步的数据洞察。随着两个产品的不断完善,咱们置信在将来,EMQ X + HStreamDB 的高效组合将在 IoT 畛域实时流数据的剖析和解决场景施展重要作用,成为数据转化与变现过程中的重要一环,为企业数据资产的价值发明提供能源。

正文完
 0