共计 4273 个字符,预计需要花费 11 分钟才能阅读完成。
基于 Pub/Sub 模式的同步调用实战
1. 同步调用场景
1.1 背景
MQTT 协定是基于 PUB/SUB 的异步通信模式,无奈实现服务端下发指令给设施端,同时须要设施端返回响应后果的场景。IoT 物联网平台基于 MQTT 协定制订了一套申请和响应的同步机制,无需改变 MQTT 协定即可实现同步通信。应用服务器通过 POP API 发动 Rrpc 调用,IoT 设施端只须要在 Timeout 内,依照固定的格局回复 Pub 音讯,服务端即可同步获取 IoT 设施端的响应后果。
具体流程如下:
1.2 Topic 格局约定
申请:/sys/${productKey}/${deviceName}/rrpc/request/${messageId}**<br />** 响应:**/sys/${productKey}/${deviceName}/rrpc/**response**/**${messageId}
$ 示意变量,每个设施不同
messageId 为 IoT 平台生成的音讯 ID,
设施端回复 responseTopic 里的 messageId 要与 requestTopic 统一
示例 :
设施端须要订阅:
/sys/${productKey}/${deviceName}/rrpc/request/+
运行中设施收到 Topic:
/sys/PK100101/DN213452/rrpc/request/443859344534
收到音讯后,在 timeout 工夫内回复 Topic:
/sys/PK100101/DN213452/rrpc/response/443859344534
2. 同步调用 RRPC 示例
2.1 设施端代码
const mqtt = require('aliyun-iot-mqtt');
// 设施属性
const options = require("./iot-device-config.json");
// 建设连贯
const client = mqtt.getAliyunIotMqttClient(options);
client.subscribe(`/sys/${options.productKey}/${options.deviceName}/rrpc/request/+`)
client.on('message', function(topic, message) {if(topic.indexOf(`/sys/${options.productKey}/${options.deviceName}/rrpc/request/`)>-1){handleRrpc(topic, message)
}
})
function handleRrpc(topic, message){topic = topic.replace('/request/','/response/');
console.log("topic=" + topic)
// 一般 Rrpc,响应 payload 自定义
const payloadJson = {code:200,msg:"handle ok"};
client.publish(topic, JSON.stringify(payloadJson));
}
2.2 服务端 POP 调用 Rrpc
const co = require('co');
const RPCClient = require('@alicloud/pop-core').RPCClient;
const options = require("./iot-ak-config.json");
//1. 初始化 client
const client = new RPCClient({
accessKeyId: options.accessKey,
secretAccessKey: options.accessKeySecret,
endpoint: 'https://iot.cn-shanghai.aliyuncs.com',
apiVersion: '2018-01-20'
});
const payload = {"msg": "hello Rrpc"};
//2. 构建 request
const params = {
ProductKey:"a1gMu82K4m2",
DeviceName:"h5@nuwr5r9hf6l@1532088166923",
RequestBase64Byte:new Buffer(JSON.stringify(payload)).toString("base64"),
Timeout:3000
};
co(function*() {
//3. 发动 API 调用
const response = yield client.request('Rrpc', params);
console.log(JSON.stringify(response));
});
rrpc 响应:
{
"MessageId": "1037292594536681472",
"RequestId": "D2150496-2A61-4499-8B2A-4B3EC4B2A432",
"PayloadBase64Byte": "eyJjb2RlIjoyMDAsIm1zZyI6ImhhbmRsZSBvayJ9",
"Success": true,
"RrpcCode": "SUCCESS"
}
// PayloadBase64Byte 解码: {"code":200,"msg":"handle ok"}
3. 物模型 - 服务同步调用 InvokeThingService 示例
留神:物模型 服务调用 接口 InvokeThingService,不是 Rrpc
设施订阅 subTopic
留神:服务同步调用 API 是 InvokeThingService
/sys/${productKey}/${deviceName}/rrpc/request/+
IoT 云端上行的 payload 格局
{
“id”: 3536123,
“version”: “1.0”,
“params”: {
“ 入参 key1”: “ 入参 value1”,
“ 入参 key2”: “ 入参 value2”
},
“method”: “thing.service.{tsl.service.identifier
}
设施响应 replyTopic
/sys/${productKey}/${deviceName}/rrpc/response/request 的音讯 Id
设施响应 payload 格局
{
“id”: 3536123,
“code”: 200,
“data”: {
“ 出参 key1”: “ 出参 value1”,
“ 出参 key2”: “ 出参 value2”
}
}
3.1 物模型 - 同步服务定义
3.2 设施端实现
const mqtt = require('aliyun-iot-mqtt');
// 设施属性
const options = require("./iot-device-config.json");
// 建设连贯
const client = mqtt.getAliyunIotMqttClient(options);
client.subscribe(`/sys/${options.productKey}/${options.deviceName}/rrpc/request/+`)
client.on('message', function(topic, message) {if(topic.indexOf(`/sys/${options.productKey}/${options.deviceName}/rrpc/request/`)>-1){handleRrpc(topic, message)
}
})
/*
* 如果存在多个同步调用服务,须要通过 payload 里的 method 辨别
*/
function handleRrpc(topic, message){topic = topic.replace('/request/','/response/');
console.log("topic=" + topic)
// 物模型 同步服务调用,响应 payload 构造:const payloadJson = {id: Date.now(),
code:200,
data: {currentMode: Math.floor((Math.random() * 20) + 10)
}
}
client.publish(topic, JSON.stringify(payloadJson));
}
留神:设施端响应的 payload 要满足物模型定义的出参构造
3.3 服务端 POP 接口 InvokeThingService
const co = require('co');
const RPCClient = require('@alicloud/pop-core').RPCClient;
const options = require("./iot-ak-config.json");
//1. 初始化 client
const client = new RPCClient({
accessKeyId: options.accessKey,
secretAccessKey: options.accessKeySecret,
endpoint: 'https://iot.cn-shanghai.aliyuncs.com',
apiVersion: '2018-01-20'
});
const params = {
ProductKey: "a1gMu82K4m2",
DeviceName: "h5@nuwr5r9hf6l@1532088166923",
Args: JSON.stringify({"mode": "1"}),
Identifier: "thing.service.setMode"
};
co(function*() {
try {
//3. 发动 API 调用
const response = yield client.request('InvokeThingService', params);
console.log(JSON.stringify(response));
} catch (err) {console.log(err);
}
});
调用后果:
{
"Data":{"Result": "{\"currentMode\":12}",
"MessageId": "1536145625658"
},
"RequestId": "29FD78CE-D1FF-48F7-B0A7-BD52C142DD7F",
"Success": true
}
物联网平台产品介绍详情:https://www.aliyun.com/product/iot/iot_instc_public_cn
阿里云物联网平台客户交换群