乐趣区

关于物联网:基于PubSub模式的阿里云IoT同步调用详解设备管理运维类

1. 同步调用场景

1.1 背景

MQTT 协定是基于 PUB/SUB 的异步通信模式,无奈实现服务端下发指令给设施端,同时须要设施端返回响应后果的场景。
IoT 物联网平台基于 MQTT 协定制订了一套申请和响应的同步机制,无需改变 MQTT 协定即可实现同步通信。应用服务器通过 POP API 发动 Rrpc 调用,IoT 设施端只须要在 Timeout 内,依照固定的格局回复 Pub 音讯,服务端即可同步获取 IoT 设施端的响应后果。
具体流程如下

1.2 Topic 格局约定

申请 :/sys/${productKey}/${deviceName}/rrpc/request/${messageId}
响应:/sys/${productKey}/${deviceName}/rrpc/response/${messageId}
$ 示意变量,每个设施不同
messageId 为 IoT 平台生成的音讯 ID,设施端回复 responseTopic 里的 messageId 要与 requestTopic 统一

示例
设施端须要订阅 :
/sys/${productKey}/${deviceName}/rrpc/request/+
运行中设施收到 Topic:
/sys/PK100101/DN213452/rrpc/request/443859344534
收到音讯后,在 timeout 工夫内回复 Topic:
/sys/PK100101/DN213452/rrpc/response/443859344534  

2. 同步调用 RRPC 示例

2.1 设施端代码


const mqtt = require('aliyun-iot-mqtt');
// 设施属性
const options = require("./iot-device-config.json");
// 建设连贯
const client = mqtt.getAliyunIotMqttClient(options);

client.subscribe(`/sys/${options.productKey}/${options.deviceName}/rrpc/request/+`)
client.on('message', function(topic, message) {if(topic.indexOf(`/sys/${options.productKey}/${options.deviceName}/rrpc/request/`)>-1){handleRrpc(topic, message)
    }
})

function handleRrpc(topic, message){topic = topic.replace('/request/','/response/');
    console.log("topic=" + topic)
    // 一般 Rrpc,响应 payload 自定义
    const payloadJson = {code:200,msg:"handle ok"};
    client.publish(topic, JSON.stringify(payloadJson));
}

2.2 服务端 POP 调用 Rrpc

const co = require('co');
const RPCClient = require('@alicloud/pop-core').RPCClient;

const options = require("./iot-ak-config.json");

//1. 初始化 client
const client = new RPCClient({
    accessKeyId: options.accessKey,
    secretAccessKey: options.accessKeySecret,
    endpoint: 'https://iot.cn-shanghai.aliyuncs.com',
    apiVersion: '2017-04-20'
});

const payload = {"msg": "hello Rrpc"};

//2. 构建 request
const params = {
    ProductKey:"a1gMu82K4m2",
    DeviceName:"h5@nuwr5r9hf6l@1532088166923",
    RequestBase64Byte:new Buffer(JSON.stringify(payload)).toString("base64"),
    Timeout:3000
};

co(function*() {
    //3. 发动 API 调用
    const response = yield client.request('Rrpc', params);

    console.log(JSON.stringify(response));
});

rrpc 响应

{
    "MessageId": "1037292594536681472",
    "RequestId": "D2150496-2A61-4499-8B2A-4B3EC4B2A432",
    "PayloadBase64Byte": "eyJjb2RlIjoyMDAsIm1zZyI6ImhhbmRsZSBvayJ9",
    "Success": true,
    "RrpcCode": "SUCCESS"
}

// PayloadBase64Byte 解码: {"code":200,"msg":"handle ok"}

3. 物模型 - 服务同步调用 InvokeThingService 示例

留神:物模型 服务调用 接口 InvokeThingService,不是 Rrpc

3.1 物模型 - 同步服务定义

3.2 设施端实现

const mqtt = require('aliyun-iot-mqtt');
// 设施属性
const options = require("./iot-device-config.json");
// 建设连贯
const client = mqtt.getAliyunIotMqttClient(options);

client.subscribe(`/sys/${options.productKey}/${options.deviceName}/rrpc/request/+`)
client.on('message', function(topic, message) {if(topic.indexOf(`/sys/${options.productKey}/${options.deviceName}/rrpc/request/`)>-1){handleRrpc(topic, message)
    }
})
/*
* 如果存在多个同步调用服务,须要通过 payload 里的 method 辨别
*/
function handleRrpc(topic, message){topic = topic.replace('/request/','/response/');
    console.log("topic=" + topic)
    // 物模型 同步服务调用,响应 payload 构造:const payloadJson = {id: Date.now(),
        code:200,
        data: {currentMode: Math.floor((Math.random() * 20) + 10)
        }
    }

    client.publish(topic, JSON.stringify(payloadJson));
}

留神:设施端响应的 payload 要满足物模型定义的出参构造​

3.3 服务端 POP 接口 InvokeThingService

const co = require('co');
const RPCClient = require('@alicloud/pop-core').RPCClient;

const options = require("./iot-ak-config.json");

//1. 初始化 client
const client = new RPCClient({
    accessKeyId: options.accessKey,
    secretAccessKey: options.accessKeySecret,
    endpoint: 'https://iot.cn-shanghai.aliyuncs.com',
    apiVersion: '2018-01-20'
});

const params = {
    ProductKey: "a1gMu82K4m2",
    DeviceName: "h5@nuwr5r9hf6l@1532088166923",
    Args: JSON.stringify({"mode": "1"}),
    Identifier: "thing.service.setMode"
};

co(function*() {
    try {
        //3. 发动 API 调用
        const response = yield client.request('InvokeThingService', params);

        console.log(JSON.stringify(response));
    } catch (err) {console.log(err);
    }
});

 调用后果:

{
    "Data":{"Result": "{\"currentMode\":12}",
        "MessageId": "1536145625658"
    },
    "RequestId": "29FD78CE-D1FF-48F7-B0A7-BD52C142DD7F",
    "Success": true
}

物联网平台产品介绍详情:​​https://www.aliyun.com/product/iot/iot_instc_public_cn​​

             阿里云物联网平台客户交换群
退出移动版