共计 7093 个字符,预计需要花费 18 分钟才能阅读完成。
物联网数据采集波及到大量设施接入、海量的时序数据传输,EMQ X MQTT 服务器 与 TDengine 大数据平台的组合技术栈齐全可能胜任场景中的海量工夫序列监测数据的传输、存储和计算。
数据入库后,往往须要其余形式如数据可视化零碎将数据依照规定统计、展示进去,实现数据的监控、指标统计等业务需要,以便充分发挥数据的价值,TDengine 搭配开源软件 Grafana 能够疾速搭建物联网数据可视化平台。
上述整套计划无需代码开发,波及的产品均能提供开源软件、企业服务、云端 SaaS 服务不同档次的交付模式,可能依据我的项目需要实现免费版或企业版私有化落地以及云端部署。
计划介绍
EMQ X 简介
EMQ X 是基于高并发的 Erlang/OTP 语言平台开发,反对百万级连贯和分布式集群架构,公布订阅模式的开源 MQTT 音讯服务器。EMQ X 内置了大量开箱即用的性能,其 开源版 EMQ X Broker 及 企业版 EMQ X Enterprise 均反对通过规定引擎将设施音讯存储到 TDengine。
TDengine 是什么
TDengine 是涛思数据专为物联网、车联网、工业互联网、IT 运维等设计和优化的大数据平台。除外围的快 10 倍以上的时序数据库性能外,还提供缓存、数据订阅、流式计算等性能,最大水平缩小研发和运维的复杂度,且外围代码,包含集群性能全副开源。
TDengine 提供社区版、企业版和云服务版,装置 / 应用教程详见 TDengine 应用文档。
Grafana 简介
Grafana 是一个跨平台、开源的度量剖析和可视化工具,能够查询处理各类数据源中的数据,进行可视化的展现。它能够疾速灵便创立的客户端图表,面板插件有许多不同形式的可视化指标和日志,官网库中具备丰盛的仪表盘插件,比方热图、折线图、图表等多种展现形式;反对 Graphite,TDengine、InfluxDB,OpenTSDB,Prometheus,Elasticsearch,CloudWatch 和 KairosDB 等数据源,反对数据项独立 / 混合查问展现;能够创立自定义告警规定并告诉到其余音讯解决服务或组件中。
业务场景
本文模仿物联网环境数据采集场景,假如现有肯定数据的环境数据采集点,所有采集点数据均通过 MQTT 协定 传输至采集平台(MQTT Publish),主题设计如下:
sensor/data
传感器发送的数据格式为 JSON,数据包含传感器采集的温度、湿度、噪声音量、PM10、PM2.5、二氧化硫、二氧化氮、一氧化碳、传感器 ID、区域、采集工夫等数据。
{
"temperature": 30,
"humidity" : 20,
"volume": 44.5,
"PM10": 23,
"pm25": 61,
"SO2": 14,
"NO2": 4,
"CO": 5,
"id": "10-c6-1f-1a-1f-47",
"area": 1,
"ts": 1596157444170
}
当初须要实时存储以便在后续任意工夫查看数据,提出以下的需要:
- 每个设施依照每 5 秒钟一次的频率进行数据上报,数据库需存储每条数据以供后续回溯剖析;
- 通过可视化零碎查看 任意区域、任意工夫区间内 的指标数据,如平均值、最大值、最小值。
环境筹备
本文所用各个组件均有 Docker 镜像,除 EMQ X 须要批改多数配置为了便于操作应用下载安装外,TDengine 与 Grafana 均应用 Docker 搭建。
安装包资源与应用教程参照各自官网:
- EMQ X:EMQ 官网 https://www.emqx.io/cn/
- TDengine:涛思数据官网 https://www.taosdata.com/cn/
- Grafana:Grafana 官网 https://grafana.com/
装置 EMQ X
如果您是 EMQ X 老手用户,举荐通过 EMQ X 文档 疾速上手
拜访 EMQ X 下载 页面下载适宜您操作系统的安装包,本文截稿时 EMQ X 开源版最新版本为 v4.1.2,下载 zip 包的启动步骤如下:
## 解压下载好的安装包
unzip emqx-macosx-v4.1.1.zip
cd emqx
## 以 console 模式启动 EMQ X 不便调试
./bin/emqx console
启动胜利后浏览器拜访 http://127.0.0.1:18083 拜访 EMQ X 治理控制台 Dashboard,应用 admin
public
默认用户名明码实现首次登录。
装置 TDengine
为了不便测试应用通过 Docker 进行装置(需映射网络端口),也能够应用安装包的形式进行装置:
## 拉取并启动容器
docker run -d --name tdengine -p 6030-6041:6030-6041 tdengine/tdengine:latest
## 启动后查看容器运行状态
docker ps -a
装置 Grafana
应用以下命令通过 Docker 装置并启动 Grafana:
docker run -d --name=grafana -p 3000:3000 grafana/grafana
启动胜利后浏览器拜访 http://127.0.0.1:3000 拜访 Grafana 可视化面板,应用 admin
admin
默认用户名明码实现首次登录,登录后依照提醒批改明码应用新密码登录进入主界面:
配置 EMQ X 存储数据到 TDengine
TDengine 创立数据库与数据表
进入 TDengine Docker 容器:
docker exec -it tdengine bash
创立 test
数据库:
taos
create database test;
创立 sensor_data 表,对于 TDengine 数据结构以及 SQL 命令参见 TAOS SQL:
use test;
CREATE TABLE sensor_data (
ts timestamp,
temperature float,
humidity float,
volume float,
PM10 float,
pm25 float,
SO2 float,
NO2 float,
CO float,
sensor_id NCHAR(255),
area TINYINT,
coll_time timestamp
);
配置 EMQ X 规定引擎
关上 EMQ X Dashboared,进入 规定引擎 -> 规定 页面,点击 创立 按钮进入创立页面。
规定 SQL
规定 SQL 用于 EMQ X 音讯以及事件筛选,以下 SQL 示意从 sensor/data
主题筛选出 payload 数据:
SELECT
payload
FROM
"sensor/data"
应用 SQL 测试性能,输出测试数据进行筛选后果测试,测试有后果且输入内容如下,表明 SQL 编写正确:
{"payload": "{\"temperature\":30,\"humidity\":20,\"volume\":44.5,\"PM10\":23,\"pm2.5\":61,\"SO2\":14,\"NO2\":4,\"CO\":5,\"id\":\"10-c6-1f-1a-1f-47\",\"area\":1,\"ts\":1596157444170}"
}
响应动作
为反对各种不同类型平台的开发,TDengine 提供合乎 REST 设计标准的 API。通过 RESTful Connector 提供了最简略的连贯形式,即应用 HTTP 申请携带认证信息与要执行的 SQL 操作 TDengine。
应用 EMQ X 开源版中的 发送到 Web 服务 即可通过 RESTful Connector 写入数据到 TDengine。行将到来的 EMQ X 企业版 4.1.1 版本将提供原生更高性能的写入 Connector。
发送到 Web 服务须要两个数据,一个是关联资源,另一个是音讯内容模板。
- 关联资源:HTTP 服务器配置信息,此处为 TDengine 的 RESTful Connector
- 音讯内容模板:此处为携带数据的 INSERT SQL,留神咱们该当在 SQL 中指定数据库名,字符类型也要用单引号括起来,音讯内容模板为:
INSERT INTO test.sensor_data VALUES(
now,
${payload.temperature},
${payload.humidity},
${payload.volume},
${payload.PM10},
${payload.pm25},
${payload.SO2},
${payload.NO2},
${payload.CO},
'${payload.id}',
${payload.area},
${payload.ts}
)
创立过程
点击响应动作下的 增加 按钮,在弹出框内抉择 发送数据到 Web 服务 ,点击 新建资源 新建一个 WebHook 资源。
资源类型抉择 Webhook,申请 URL 填写 http://127.0.0.1:6041/rest/sql,申请办法抉择 POST,还需增加 Authorization 申请头作为认证信息。
Authorization 的值为 Basic + TDengine 的 {username}:{password}
通过 Base64 编码之后的字符串, 例如 root:taosdata
编码后为 cm9vdDp0YW9zZGF0YQ==
,理论填入的值为:Basic cm9vdDp0YW9zZGF0YQ==
在响应动作创立页面抉择新建的资源,并填入音讯模板内容即可。
生成模仿数据
以下脚本模仿了 10000 个设施在过来 24 小时内、每隔 5 秒钟上报一条模仿数据并发送到 EMQ X 的场景。
- 总数据量:24 3600 / 5 10000 = 1.72 亿条
- 音讯 TPS:2000
读者装置 Node.js,按需批改配置参数后能够通过以下命令启动:
npm install mqtt mockjs --save --registry=https://registry.npm.taobao.org
node mock.js
附:模仿生成数据并发送到 EMQ X 代码,请依据集群性能调整相干参数
// mock.js
const mqtt = require('mqtt')
const Mock = require('mockjs')
const EMQX_SERVER = 'mqtt://localhost:1883'
const CLIENT_NUM = 10000
const STEP = 5000 // 模仿采集工夫距离 ms
const AWAIT = 5000 // 每次发送完后休眠工夫,避免音讯速率过快 ms
const CLIENT_POOL = []
startMock()
function sleep(timer = 100) {
return new Promise(resolve => {setTimeout(resolve, timer)
})
}
async function startMock() {const now = Date.now()
for (let i = 0; i < CLIENT_NUM; i++) {const client = await createClient(`mock_client_${i}`)
CLIENT_POOL.push(client)
}
// last 24h every 5s
const last = 24 * 3600 * 1000
for (let ts = now - last; ts <= now; ts += STEP) {for (const client of CLIENT_POOL) {const mockData = generateMockData()
const data = {
...mockData,
id: client.clientId,
area: 0,
ts,
}
client.publish('sensor/data', JSON.stringify(data))
}
const dateStr = new Date(ts).toLocaleTimeString()
console.log(`${dateStr} send success.`)
await sleep(AWAIT)
}
console.log(`Done, use ${(Date.now() - now) / 1000}s`)
}
/**
* Init a virtual mqtt client
* @param {string} clientId ClientID
*/
function createClient(clientId) {return new Promise((resolve, reject) => {
const client = mqtt.connect(EMQX_SERVER, {clientId,})
client.on('connect', () => {console.log(`client ${clientId} connected`)
resolve(client)
})
client.on('reconnect', () => {console.log('reconnect')
})
client.on('error', (e) => {console.error(e)
reject(e)
})
})
}
/**
* Generate mock data
*/
function generateMockData() {
return {"temperature": parseFloat(Mock.Random.float(22, 100).toFixed(2)),
"humidity": parseFloat(Mock.Random.float(12, 86).toFixed(2)),
"volume": parseFloat(Mock.Random.float(20, 200).toFixed(2)),
"PM10": parseFloat(Mock.Random.float(0, 300).toFixed(2)),
"pm25": parseFloat(Mock.Random.float(0, 300).toFixed(2)),
"SO2": parseFloat(Mock.Random.float(0, 50).toFixed(2)),
"NO2": parseFloat(Mock.Random.float(0, 50).toFixed(2)),
"CO": parseFloat(Mock.Random.float(0, 50).toFixed(2)),
"area": Mock.Random.integer(0, 20),
"ts": 1596157444170,
}
}
可视化配置
组件装置实现,模仿数据写入胜利后,依照 Grafana 可视化界面的操作指引,实现业务所需数据可视化配置。
增加数据源(Add data source)
增加数据源,即显示的数据源信息。选取 TDengine 类型数据源,输出连贯参数进行配置,默认状况下,要害配置信息如下:
增加仪表盘(New Dashboard)
增加好数据源后,增加须要显示的数据仪表盘信息。仪表盘为多个可视化面板的汇合,点击 New Dashboard 后,抉择 + Query 通过查问来增加数据面板。
创立面板须要四个步骤,别离是 Queries(查问)、Visualization(可视化)、General(图表配置)、Alert(告警),创立工夫
平均值面板
应用 Grafana 的可视化查问构建工具,查问出所有设施的平均值。
以下 SQL 依照指定时间段($form $to)、指定工夫距离($interval),查问出数据中要害指标的平均值:
select avg(temperature), avg(humidity), avg(volume), avg(PM10), avg(pm25), avg(SO2), avg(NO2), avg(CO) from test.sensor_data where coll_time >= $from and coll_time < $to interval($interval)
Visualization 默认不做更改,General 外面批改面板名称为 历史平均值,如果须要对业务进行监控告警,能够在 Alert 里编排告警规定,此处仅做可视化展现,不应用此性能。
实现创立后,点击左上角返回按钮,该 Dashboard 里胜利增加一个数据面板。点击顶部导航栏 保留 图标,输出 Dashboard 名称实现 Dashboard 的创立。
最大值、最小值面板
持续点击 Dashboard 的 Add panel 按钮,增加最大值、最小值图表。操作步骤同增加平均值,仅对查问中 SELECT 统计办法字段做出调整,调整为 AVG 函数为 MAX 与 MIN:
select max(temperature), max(humidity), max(volume), max(PM10), max(pm25), max(SO2), max(NO2), max(CO), min(temperature), min(humidity), min(volume), min(PM10), min(pm25), min(SO2), min(NO2), min(CO) from test.sensor_data where coll_time >= $from and coll_time < $to interval($interval)
仪表盘成果
保留仪表盘,拖拽调整每个数据面板大小、地位,最终失去一个视觉效果较好的数据仪表盘。仪表盘右上角能够抉择工夫区间、主动刷新工夫,此时设施继续发送数据采集数据,仪表盘数据值会有所变动,实现了比拟好的可视化成果。
总结
至此咱们借助 EMQ X + TDengine 实现了物联网数据传输、存储、展示整个流程的零碎搭建,读者能够理解到 EMQ X 丰盛的拓展能力与 TDengine 齐备的大数据平台个性在物联网数据采集中的利用。深刻学习把握 Grafana 的其余性能后,用户能够定制出更欠缺的数据可视化乃至监控告警零碎。
版权申明:本文为 EMQ 原创,转载请注明出处。
原文链接:https://www.emqx.io/cn/blog/e…