乐趣区

关于物联网:EMQ-X-CNN-在-AIoT-中的融合应用

提起物联网(IoT)和人工智能(AI),人们并不生疏。作为当今时代非常热门的科技概念,它们其实都与「数据」无关:IoT 解决了数据从哪里来,AI 则解决了数据去往何方、用于何处。 一个将两者联合的新概念「AIoT」也应运而生:IoT 通过万物连贯与通信为 AI 提供海量数据,AI 则通过对数据的一直学习与剖析,将其转化为无效信息,为理论畛域提供效用

在本文中,咱们将提出 AIoT 的一个简略交融利用:利用 物联网消息中间件 EMQ X Broker 收集液压零碎温度传感器数据,并将其转发到一维 卷积神经网络 (1D CNN),利用这一 AI 深度学习的代表算法预测液压零碎冷却器状态。

在一维卷积神经网络上,工夫将被看做一个空间纬度,每个输入工夫步都是利用输出序列在工夫维度上的一小段失去的,为此咱们能够利用该个性实现时序数据的预测。咱们将应用 Python 代码模仿温度传感器时序数据,通过 MQTT 协定 传输到 EMQ X Broker,并利用其灵便的规定引擎将数据转发到 webhook,根据输出的温度传感器时序数据实现以后液压零碎冷却器的状态预测。

数据集筹备

在本文中咱们将应用 UCI 机器学习与智能零碎核心提供的 液压零碎状态监测数据集,该数据集是在液压试验台上试验取得。该试验台由一级工作回路和二级冷却过滤回路组成,二级冷却过滤回路通过油箱相连。该零碎周期性地反复负载循环 (60 秒),通过扭转四个液压元件(冷却器、阀门、泵和蓄能器)的状态,获取压力、体积流量和温度等过程值。

  • 在该数据集中,TS1.txt, TS2.txt, TS3.txt, TS4.txt 别离为 4 个液压零碎的冷却器温度传感器以 60 秒一个周期所获取到的温度数据,第一个周期传感器温度数据如下图:

  • profile.txt 第一列示意以后周期内液压零碎冷却器状态

    • 3:靠近故障 (close to total failure)
    • 20:低效率 (reduced efficiency)
    • 100:全效率 (full efficiency)

模型训练

咱们将应用一维卷积神经网络 (1D CNN) 来实现模型训练,1D CNN 能够很好地利用于温度传感器数据的工夫序列剖析。在本文中咱们应用 这篇文章 中形容的程序模型来构建一维卷积神经网络,并适当调整数据集以进步预测准确度。

  • 一维卷积神经网络模型构建

    num_sensors = 4
    TIME_PERIODS = 60
    BATCH_SIZE = 16
    EPOCHS = 10
    model_m = Sequential()
    model_m.add(Conv1D(100, 6, activation='relu', input_shape=(TIME_PERIODS, num_sensors)))
    model_m.add(Conv1D(100, 6, activation='relu'))
    model_m.add(MaxPooling1D(3))
    model_m.add(Conv1D(160, 6, activation='relu'))
    model_m.add(Conv1D(160, 6, activation='relu'))
    model_m.add(GlobalAveragePooling1D(name='G_A_P_1D'))
    model_m.add(Dropout(0.5))
    model_m.add(Dense(3, activation='softmax'))
    print(model_m.summary())
    model_m.compile(loss='categorical_crossentropy', optimizer='adam', metrics=['accuracy'])
    history = model_m.fit(X_train, y_train, batch_size=BATCH_SIZE, epochs=EPOCHS, validation_split=0.2, verbose=2)
  • 模型分类指标的报告

    从报告中能够看出通过温度数据预测冷却器状态 3 (靠近故障),20 (低效率),100 (全效率) 准确率别离为 95%,80%,89%。

模仿数据输出

在本文中咱们将模仿生产环境下冷却器温度传感器数据上报,为此咱们将应用 Python 代码读取数据集中温度数据,并通过 MQTT 协定上报到 EMQ X Broker。

在上面代码中咱们首先应用 pandas 读取数据集中温度数据 (‘TS1.txt’, ‘TS2.txt’, ‘TS3.txt’, ‘TS4.txt’),并对数据做简略解决,而后将数据每秒上报到 EMQ X Broker。

import json
import time

import pandas as pd
from paho.mqtt import client as mqtt_client


broker = '127.0.0.1'
port = 1883
topic = "/1dcnn"
client_id = f'1dcnn-client'


def connect_mqtt():
    def on_connect(client, userdata, flags, rc):
        if rc == 0:
            print("Connected to MQTT Broker!")
        else:
            print("Failed to connect, return code %d\n", rc)

    client = mqtt_client.Client(client_id)
    client.on_connect = on_connect
    client.connect(broker, port)
    return client


def load_data():
    names = ['TS1.txt', 'TS2.txt', 'TS3.txt', 'TS4.txt']
    df = pd.DataFrame()
    for name in names:
        data_file = f'./dataset/{name}'
        read_df = pd.read_csv(data_file, sep='\t', header=None)
        df = df.append(read_df)
    df = df.sort_index()
    df_values = df.values
    df = df_values.reshape(-1, 4, len(df.columns))
    data = df.transpose(0, 2, 1)
    return data


def publish(client):
    data = load_data()
    for x_data in data[-10:]:
        for y_data in x_data:
            t_1, t_2, t_3, t_4 = tuple(y_data)
            msg = {'t1': round(t_1, 3),
                't2': round(t_2, 3),
                't3': round(t_3, 3),
                't4': round(t_4, 3)
            }
            time.sleep(1)
            result = client.publish(topic, json.dumps(msg))
            # result: [0, 1]
            status = result[0]
            if status == 0:
                print(f"Send `{msg}` to topic `{topic}`")
            else:
                print(f"Failed to send message to topic {topic}")


def run():
    client = connect_mqtt()
    client.loop_start()
    publish(client)


if __name__ == '__main__':
    run()

故障预测

在本文中咱们将应用 EMQ X Broker 规定引擎将温度传感器数据转发到 webhook,并通过温度传感器采集的数据,实现对冷却器状态预测。

  1. Webhook 代码编写

    import asyncio
    import json
    
    import numpy as np
    import uvicorn
    from keras.models import load_model
    from sklearn.preprocessing import StandardScaler
    from starlette.applications import Starlette
    from starlette.background import BackgroundTask
    from starlette.responses import JSONResponse
    
    
    app = Starlette()
    queue = asyncio.Queue()
    model = load_model('./1d-cnn.h5')
    
    
    @app.on_event('startup')
    async def on_startup():
        print('startup webhook')
    
    
    @app.route('/webhook', methods=['POST'])
    async def webhook(request):
        request_dict = await request.json()
        payload = request_dict['payload']
        data = json.loads(payload)
        values = list(data.values())
        if queue.qsize() == 60:
            items = clear_queue(queue)
            task = BackgroundTask(predictive, data=items)
        else:
            task = None
        queue.put_nowait(values)
        record = {'status': 'success'}
        return JSONResponse(record, status_code=201, background=task)
    
    
    async def predictive(data):
        y_label = {
            0: 3,
            1: 20,
            2: 100
        }
        y_status = {
            3: 'close to total failure',
            20: 'reduced efficiency',
            100: 'full efficiency'
        }
        x_test = np.array(data)
        scaler = StandardScaler()
        x_test = scaler.fit_transform(x_test.reshape(-1, x_test.shape[-1])).reshape(x_test.shape)
        x_test = x_test.reshape(-1, x_test.shape[0], x_test.shape[1])
        results = model.predict(x_test)
        msg = "Current cooler state probability:"
        for i, probability in enumerate(results[0]):
            status = y_status[y_label[i]]
            msg += f"{probability * 100:.2f}% {status}({y_label[i]}),"
        print(msg)
    
    
    def clear_queue(q):
        items = []
        while not q.empty():
            items.append(q.get_nowait())
        return items
    
    
    if __name__ == '__main__':
        uvicorn.run(
            app,
            host='127.0.0.1',
            port=8080,
            loop='uvloop',
            log_level='warning'
        )
    
  2. EMQ X Broker 资源创立

    拜访 EMQ X Dashboard,登录用户名和明码为 admin, public,点击左侧菜单栏规定 -> 资源,创立资源。

  3. EMQ X Broker 规定创立

测试

  1. 启动 Webhook

    python3 webhook.py

  2. 启动 EMQ X Broker

    ./bin/emqx start

  3. 模仿数据输出

    python publish.py

  4. 查看液压零碎冷却器状态预测后果

    从下图中咱们能够看出前五个周期内,通过输出传感器温度预测出以后冷却器状态为靠近故障 (close to total failure),这与数据集中给出的冷却器状态统一。

  5. 别离调整输出数据,查看不同温度下冷却器状态预测后果,并和数据集中试验后果做比照

    • 输出前十个周期内温度传感器数据,查看预测后果并与实验台收集的后果做比照

      批改 publish.py 文件中: for x_data in data: -> for x_data in data[:10]:

      从上图中咱们能够看到预测后果与试验台收集后果统一

    • 抉择数据集中状态为 3 靠近故障 (close to total failure),20 低效率 (reduced efficiency) 的数据作为输出,查看预测后果并与实验台收集的后果做比照

      批改 publish.py 文件中: for x_data in data: -> for x_data in data[728:737]:

      从上图中咱们能够看到预测后果与实验台收集后果有肯定误差,这也验证了模型分类指标的报告中预测准确性概率。

    • 输出后十个周期内温度传感器数据,查看预测后果并与实验台收集的后果做比照

      批改 publish.py 文件中: for x_data in data: -> for x_data in data[-10:]:

      从上图中咱们能够看到预测后果与试验台收集后果大抵统一,但还是存在肯定偏差

总结

至此咱们实现了传感器数据上报,利用 EMQ X 规定引擎实现数据转发,并应用一维卷积神经网络 (1D CNN) 实现了液压零碎冷却器故障预测。

在工业各个领域,不论是机械、电子、钢铁,还是制作、橡胶、纺织、化工、食品,液压传动技术都已成为一项根本利用技术。随着古代工业的一直倒退,液压零碎逐步向高性能、高精度演进,其可靠性就变得至关重要,液压系统故障的检测与诊断也因而越来越受到重视。利用 AI 与深度学习,通过 IoT 大数据采集与剖析对液压零碎的状态进行监控,从而实现故障预测,是 AIoT 为传统工业畛域带来的新的可能。

而在各畛域对液压系统故障预测的理论利用中,为了利用 AI 作出更加精准的预测,须要采集量级更高的时序数据加以分析训练。因而,须要选用性能指标突出且高度稳固牢靠的消息中间件以进行海量数据的接入与传输。EMQ X Broker 作为一款高并发低延时,反对分布式集群架构的开源 MQTT 音讯服务器,反对单机百万连贯,无疑可满足该利用场景以及其余更多物联网利用下的数据传输需要

版权申明:本文为 EMQ 原创,转载请注明出处。

原文链接:https://www.emqx.io/cn/blog/emqx-and-1d-cnn-in-aiot

退出移动版