关于消息中间件:智能手持测温枪接入阿里云IoT物联网平台实践实践类

36次阅读

共计 5623 个字符,预计需要花费 15 分钟才能阅读完成。

1. 概述

随着新型冠状病毒疫情倒退,社区居家隔离成为无效伎俩,而体温排查是社区工作的重中之重!借助 IoT 物联网技术能够不便的实现居民体温实时监控和历史数据的残缺追溯。

2. 技术架构计划

基于稳定性,高并发,低时延的考量咱们抉择阿里云 IoT 物联网平台搭建整套零碎。首先手持测温枪通过蓝牙连贯到 DTU 模块,DTU 模块以 MQTT 协定接入物联网平台。数据上云后,通过规定引擎流转服务端订阅的 AMQP 生产组,实时推送到咱们业务服务器。管理人员应用手机小程序即可实时看到出入人员的体温数据。

3. 云端开发

3.1 产品创立

进入物联网平台控制台,创立产品。

在产品详情 Topic 列表, 减少用于数据传输的 Topic,如下:

3.2 注册设施

产品定义好后,咱们基于这个产品创立一个具体设施,获取到设施身份三元组。

3.3 创立生产组

接下来,咱们要在服务端订阅创立用来接收数据的生产组,查看下图:

3.4 配置规定引擎

最初,咱们通过规定引,把设施上报的数据做业务解决后,流转到咱们服务器的生产组,从而实现企业本人的设施采集的业务数据达到企业本人的后盾服务器的流转过程。

4. 设施开发

在实现了云上控制台的配置工作后,咱们要做的就是设施端业务开发。这里咱们在 Mac 上用 nodejs 脚本模仿设施业务行为,设施 MQTT 连贯,数据上报。残缺代码如下:

// 引入依赖 mqtt 库,或本人实现
const mqtt = require('aliyun-iot-mqtt');
// 设施身份
var options = {
    productKey: "设施 pk",
    deviceName: "设施 dn",
    deviceSecret: "设施 ds",
    regionId: "cn-shanghai"
};

// 1. 建设连贯
const client = mqtt.getAliyunIotMqttClient(options);

// 2. 设施接管云端指令数据
client.on('message', function(topic, message) {console.log("topic" + topic)
    console.log("message" + message)
})

// 3. 模仿设施 上报数据(原始报文)setInterval(function() {client.publish(`/${options.productKey}/${options.deviceName}/user/data`, getPostData(),{qos:1});

}, 1000);


// 模仿 设施原有报文格式
function getPostData() {
    let payload = {temperature:Math.floor((Math.random() * 20) + 10)
    };

    console.log("payload=[" + payload+"]")
    return JSON.stringify(payload);
}

至此,咱们实现了设施端业务开发。

5. 服务端开发

服务端咱们以 Java 为例,演示如何接管 IoT 平台推送过去的设施上报数据。

5.1 业务服务器接管 IoT 数据

参考服务端订阅 AMQP 文档 https://help.aliyun.com/docum…
残缺代码如下:

package com.aliyun.iot;

import org.apache.commons.codec.binary.Base64;
import org.apache.qpid.jms.JmsConnection;
import org.apache.qpid.jms.JmsConnectionListener;
import org.apache.qpid.jms.message.JmsInboundMessageDispatch;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import javax.crypto.Mac;
import javax.crypto.spec.SecretKeySpec;
import javax.jms.*;
import javax.naming.Context;
import javax.naming.InitialContext;
import java.net.URI;
import java.util.Hashtable;

public class AMQPClient {private final static Logger logger = LoggerFactory.getLogger(AMQPClient.class);
    // 生产组配置参数
    private static String accessKey = "阿里云账号 ak";
    private static String accessSecret = "阿里云账号 as";
    private static String consumerGroupId = "服务端订阅生产组 ID";
    private static String aliUID = "替换你的阿里云账号 UID";

    public static void main(String[] args) throws Exception {long timeStamp = System.currentTimeMillis();
        // 签名办法
        String signMethod = "hmacsha1";
        // 控制台服务端订阅中生产组状态页客户端 ID 一栏将显示 clientId 参数。// 倡议应用机器 UUID、MAC 地址、IP 等惟一标识等作为 clientId。便于您辨别辨认不同的客户端。String clientId = "ecs_"+System.currentTimeMillis();

        //UserName 组装
        String userName = clientId + "|authMode=aksign"
                + ",signMethod=" + signMethod
                + ",timestamp=" + timeStamp
                + ",authId=" + accessKey
                + ",consumerGroupId=" + consumerGroupId
                + "|";
        //password 组装
        String signContent = "authId=" + accessKey + "&timestamp=" + timeStamp;
        String password = doSign(signContent,accessSecret, signMethod);
        // 依照 qpid-jms 的标准,组装连贯 URL。String connectionUrl = "failover:(amqps://"+aliUID+".iot-amqp.cn-shanghai.aliyuncs.com:5671?amqp.idleTimeout=80000)"
                + "?failover.reconnectDelay=30";

        Hashtable<String, String> hashtable = new Hashtable<>();
        hashtable.put("connectionfactory.SBCF",connectionUrl);
        hashtable.put("queue.QUEUE", "default");
        hashtable.put(Context.INITIAL_CONTEXT_FACTORY, "org.apache.qpid.jms.jndi.JmsInitialContextFactory");
        Context context = new InitialContext(hashtable);
        ConnectionFactory cf = (ConnectionFactory)context.lookup("SBCF");
        Destination queue = (Destination)context.lookup("QUEUE");
        // 创立和 IoT 平台的 AMQP 连贯
        Connection connection = cf.createConnection(userName, password);
        ((JmsConnection) connection).addConnectionListener(myJmsConnectionListener);
        // 创立 Session
        // Session.CLIENT_ACKNOWLEDGE: 收到音讯后,须要手动调用 message.acknowledge()
        // Session.AUTO_ACKNOWLEDGE: SDK 主动 ACK(举荐)Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        connection.start();
        // 创立消费者
        MessageConsumer consumer = session.createConsumer(queue);
        consumer.setMessageListener(messageListener);
    }

    private static MessageListener messageListener = new MessageListener() {
        @Override
        public void onMessage(Message message) {
            try {byte[] body = message.getBody(byte[].class);
                String content = new String(body);
                String topic = message.getStringProperty("topic");
                String messageId = message.getStringProperty("messageId");
                logger.info("receive message"
                        + ", topic =" + topic
                        + ", messageId =" + messageId
                        + ", content =" + content);
                System.out.println();
                // 如果创立 Session 抉择的是 Session.CLIENT_ACKNOWLEDGE,这里须要手动 ACK。//message.acknowledge();
                // 如果要对收到的音讯做耗时的解决,请异步解决,确保这里不要有耗时逻辑。} catch (Exception e) {e.printStackTrace();
            }
        }
    };

    private static JmsConnectionListener myJmsConnectionListener = new JmsConnectionListener() {
        /**
         * 连贯胜利建设。*/
        @Override
        public void onConnectionEstablished(URI remoteURI) {logger.info("onConnectionEstablished, remoteUri:{}", remoteURI);
        }

        /**
         * 尝试过最大重试次数之后,最终连贯失败。*/
        @Override
        public void onConnectionFailure(Throwable error) {logger.error("onConnectionFailure, {}", error.getMessage());
        }

        /**
         * 连贯中断。*/
        @Override
        public void onConnectionInterrupted(URI remoteURI) {logger.info("onConnectionInterrupted, remoteUri:{}", remoteURI);
        }

        /**
         * 连贯中断后又主动重连上。*/
        @Override
        public void onConnectionRestored(URI remoteURI) {logger.info("onConnectionRestored, remoteUri:{}", remoteURI);
        }

        @Override
        public void onInboundMessage(JmsInboundMessageDispatch envelope) {}

        @Override
        public void onSessionClosed(Session session, Throwable cause) {}

        @Override
        public void onConsumerClosed(MessageConsumer consumer, Throwable cause) {}

        @Override
        public void onProducerClosed(MessageProducer producer, Throwable cause) {}};

    /**
     * password 签名计算方法,请参见上一篇文档:AMQP 客户端接入阐明。*/
    private static String doSign(String toSignString, String secret, String signMethod) throws Exception {SecretKeySpec signingKey = new SecretKeySpec(secret.getBytes(), signMethod);
        Mac mac = Mac.getInstance(signMethod);
        mac.init(signingKey);
        byte[] rawHmac = mac.doFinal(toSignString.getBytes());
        return Base64.encodeBase64String(rawHmac);
    }
}

6. 设施运行日志

6.1 运行数据上报

6.2 数据流转日志

6.3 服务端订阅生产组状况

【往期回顾】

1. 自建 MQTT 集群迁徙阿里云 IoT 平台
2.IoT 时代:WiFi 配网技术分析
3. 微信小程序和 IoT 智能家居实际
4.IoT 云端通用数据解析脚本实际

物联网平台产品介绍详情:https://www.aliyun.com/produc…

             阿里云物联网平台客户交换群 

正文完
 0