关于iot:EMQ-X-IoTDB存储-MQTT-消息到时序数据库

12次阅读

共计 5202 个字符,预计需要花费 14 分钟才能阅读完成。

IoTDB 是最早由清华大学发动的开源时序数据库我的项目,现曾经是 Apache 的顶级我的项目。IoTDB 能够为用户提供数据收集、存储和剖析等服务。因为其轻量级架构、高性能和高可用的个性,以及与 Hadoop 和 Spark 生态的无缝集成,满足了工业 IoT 畛域中海量数据存储、高吞吐量数据写入和简单数据查问剖析的需要。

EMQ X 是一个大规模扩大、可弹性伸缩的开源云原生分布式物联网消息中间件,由开源物联网数据基础设施软件供应商 EMQ 映云科技公布。EMQ X 能够高效牢靠地解决海量物联网设施的并发连贯,并且内置了弱小的规定引擎性能,用以对事件和音讯流数据进行高性能地实时处理。规定引擎通过 SQL 语句提供了灵便的 “ 配置式 ” 的业务集成计划,简化了业务开发流程,晋升了易用性,升高了用户的业务逻辑与 EMQ X 的耦合度。

本文将介绍如何应用 EMQ X 规定引擎的 MQTT 数据桥接性能,接管 MQTT 客户端发送的数据,并实时插入到时序数据库 IoTDB。

筹备工作

本文示例中用到的软件和环境:

  • 操作系统: Mac OSX
  • IoTDB: Binary 包(Server),版本 0.12.4
  • MQTT 服务器: EMQ X 开源版 v4.3.11
  • MQTT 客户端软件:MQTTX v1.6.0

IoTDB 装置

首先咱们须要从 IoTDB 官网页面下载 IoTDB Server(单机版)的二进制包。

下载实现之后解压,进入解压后的目录:

% ls
LICENSE         README.md       RELEASE_NOTES.md data             ext             licenses         sbin
NOTICE           README_ZH.md     conf             docs             lib             logs             tools

要启用 IoTDB 的 MQTT 协定反对,须要改变 IoTDB 的配置文件 conf/iotdb-engine.properties

* 后续建模应用了一个存储组 root.sg,为了减少写入并行度,须要同时将 iotdb-engine.properties 中的 virtual_storage_group_num 设置为机器核数。

####################
### MQTT Broker Configuration
####################

# whether to enable the mqtt service.
enable_mqtt_service=true

# the mqtt service binding host.
mqtt_host=0.0.0.0

# the mqtt service binding port.
mqtt_port=2883

# the handler pool size for handing the mqtt messages.
mqtt_handler_pool_size=1

# the mqtt message payload formatter.
mqtt_payload_formatter=json

# max length of mqtt message in byte
mqtt_max_message_size=1048576

其中 enable_mqtt_service 默认为 false,须要改成 truemqtt_port 默认值是 1883,为了防止与 emqx 的端口号抵触,须要改为 2883。

而后应用 ./sbin/start-server.sh 启动 IoTDB 服务端:

% ./sbin/start-server.sh
---------------------
Starting IoTDB
---------------------
Maximum memory allocation pool = 2048MB, initial memory allocation pool = 512MB
If you want to change this configuration, please check conf/iotdb-env.sh(Unix or OS X, if you use Windows, check conf/iotdb-env.bat).
2022-01-10 14:15:31,914 [main] INFO o.a.i.d.c.IoTDBDescriptor:121 - Start to read config file file:./sbin/../conf/iotdb-engine.properties
...
2022-01-10 14:14:28,690 [main] INFO o.a.i.d.s.UpgradeSevice:73 - Upgrade service stopped
2022-01-10 14:14:28,690 [main] INFO o.a.i.db.service.IoTDB:153 - Congratulation, IoTDB is set up successfully. Now, enjoy yourself!
2022-01-10 14:14:28,690 [main] INFO o.a.i.db.service.IoTDB:101 - IoTDB has started

咱们放弃这个终端窗口不动,另外关上一个新的命令行终端窗口,启动 IoTDB 的 shell 工具:

% ./sbin/start-cli.sh
---------------------
Starting IoTDB Cli
---------------------
_____       _________ ______   ______
|_   _|     | _   _ ||_   _ `.|_   _ \
| |   .--.|_/ | | \_| | | `. \ | |_) |
| | / .'`\ \ | |     | | | | | __'.
_| |_| \__. | _| |_   _| |_.' /_| |__) |
|_____|'.__.' |_____| |______.'|_______/ version 0.12.4


IoTDB> login successfully
IoTDB>

至此 IoTDB 环境就筹备好了。如要理解 IoTDB 的根本应用办法,能够参考官网的疾速上手页面。

装置、配置 EMQ X

下载和启动 EMQ X

咱们间接应用命令行下载 macOS 版本的 EMQ X 开源版,更多安装包请拜访 EMQ X 开源版下载页面。

% wget https://www.emqx.com/en/downloads/broker/4.3.11/emqx-macos-4.3.11-amd64.zip

而后解压并启动 EMQ X:

% unzip -q emqx-macos-4.3.11-amd64.zip
% cd emqx
% ./bin/emqx console

log.to = "console"
Erlang/OTP 23 [erts-11.1.8] [emqx] [64-bit] [smp:8:8] [ds:8:8:8] [async-threads:4] [hipe]
Starting emqx on node emqx@127.0.0.1
Start mqtt:tcp:internal listener on 127.0.0.1:11883 successfully.
Start mqtt:tcp:external listener on 0.0.0.0:1883 successfully.
Start mqtt:ws:external listener on 0.0.0.0:8083 successfully.
Start mqtt:ssl:external listener on 0.0.0.0:8883 successfully.
Start mqtt:wss:external listener on 0.0.0.0:8084 successfully.
Start http:management listener on 8081 successfully.
Start http:dashboard listener on 18083 successfully.
EMQ X Broker 4.3.11 is running now!
Eshell V11.1.8 (abort with ^G)
(emqx@127.0.0.1)1>

配置规定

应用浏览器关上 EMQ X Dashboard,在规定引擎页面创立一条规定:

SQL 语句为:

SELECT
    clientid,
    now_timestamp('millisecond') as now_ts_ms,
    payload.bar as bar
FROM
    "t/#"

而后咱们在页面的底部,给规定加一个 “ 桥接数据到 MQTT Broker” 动作:

这个动作须要关联一个资源,咱们点击右上角的“新建资源”来创立一个 MQTT Bridge 资源:

近程 Broker 地址要填写 IoTDB 的 MQTT 服务地址,即 “127.0.0.1:2883″。客户端 Id、用户名、明码都填写 root,因为 root 是 IoTDB 默认的用户名和明码。

其余选项放弃默认值不变,点击”测试连贯“按钮确保配置无误,而后再点击右下角的”新建“按钮创立资源。

当初返回到动作创立页面,关联资源的下拉框里主动填充了咱们方才创立的资源。

当初咱们持续填写更多的动作参数:

IoTDB 不关怀音讯主题,咱们填一个任意的主题:foo

IoTDB 要求音讯内容是一个 JSON 格局,音讯内容模板能够依照上图中款式填写。详情请参见 IoTDB 的通信服务协定文档。

{"device": "root.sg.${clientid}",
 "timestamp": ${now_ts_ms},
 "measurements": ["bar"],
 "values": [${bar}
 ]
}

留神其中的 “${clientid}”, “${now_ts_ms}” 以及 “${bar}” 都是从规定的 SQL 语句的输入中提取的变量,所以必须保障这些变量跟 SQL 语句的 SELECT 字句对应上。

当初能够点击”确认“保留动作配置,而后再次点击”新建“实现规定的创立。

应用 MQTT Client 发送音讯

接下来咱们应用 MQTT 客户端工具 – MQTT X,来发送一条音讯给 EMQ X:

MQTT X 是 EMQ 公布的一款齐全开源的 MQTT 5.0 跨平台桌面客户端。反对疾速创立多个同时在线的 MQTT 客户端连贯,不便测试 MQTT/TCP、MQTT/TLS、MQTT/WebSocket 的连贯、公布、订阅性能及其他 MQTT 协定个性。

MQTT 客户端的连贯参数外面,咱们只须要填一个参数,Client ID:”abc”,其余的放弃默认值不变。

连贯胜利之后,咱们发送 2 条主题为:”t/1″ 的音讯,音讯内容格局为:

{"bar": 0.2}

而后回到 EMQ X Dashboard 的规定引擎页面,察看规定的命中次数,确认规定被触发了 2 次:

最初咱们回到命令行终端的 IoTDB 客户端窗口,应用上面的 SQL 语句查问数据:

IoTDB> SHOW TIMESERIES root.sg.abc
+---------------+-----+-------------+--------+--------+-----------+----+----------+
|     timeseries|alias|storage group|dataType|encoding|compression|tags|attributes|
+---------------+-----+-------------+--------+--------+-----------+----+----------+
|root.sg.abc.bar| null|     root.sg|   FLOAT| GORILLA|     SNAPPY|null|      null|
+---------------+-----+-------------+--------+--------+-----------+----+----------+
Total line number = 1
It costs 0.006s

IoTDB> SELECT * FROM root.sg.abc
+-----------------------------+---------------+
|                         Time|root.sg.abc.bar|
+-----------------------------+---------------+
|2022-01-10T17:39:41.724+08:00|            0.3|
|2022-01-10T17:40:32.805+08:00|            0.2|
+-----------------------------+---------------+
Total line number = 2
It costs 0.007s
IoTDB>

数据插入胜利!

结语

至此,咱们实现了通过 EMQ X 规定引擎性能将音讯长久化到 IoTDB 时序数据库。

在理论生产场景中,咱们能够应用 EMQ X 解决海量的物联网设施并发连贯,并通过规定引擎灵便地解决业务性能,而后将设施发送的音讯长久化到 IoTDB 数据库,最初应用 Hadoop/Spark、Flink 或 Grafana 等对接 IoTDB 实现大数据分析、可视化展现等。

EMQ X + IoTDB 的组合是一个简洁、高效且易扩大、高可用的服务端集成计划,对于物联网设施治理和数据处理场景来说,是一个不错的抉择。

正文完
 0