基于Pub/Sub模式的同步调用实战

1.同步调用场景

1.1 背景

MQTT协定是基于PUB/SUB的异步通信模式,无奈实现服务端下发指令给设施端,同时须要设施端返回响应后果的场景。IoT物联网平台基于MQTT协定制订了一套申请和响应的同步机制,无需改变MQTT协定即可实现同步通信。应用服务器通过POP API发动Rrpc调用,IoT设施端只须要在Timeout内,依照固定的格局回复Pub音讯,服务端即可同步获取IoT设施端的响应后果。

具体流程如下

1.2 Topic格局约定

申请:/sys/${productKey}/${deviceName}/rrpc/request/${messageId}**<br />**响应:**/sys/${productKey}/${deviceName}/rrpc/**response**/**${messageId}

$示意变量,每个设施不同
messageId为IoT平台生成的音讯ID,
设施端回复responseTopic里的messageId要与requestTopic统一

示例
设施端须要订阅:
/sys/${productKey}/${deviceName}/rrpc/request/+
运行中设施收到Topic:
/sys/PK100101/DN213452/rrpc/request/443859344534
收到音讯后,在timeout工夫内回复Topic:
/sys/PK100101/DN213452/rrpc/response/443859344534

2.同步调用RRPC示例

2.1 设施端代码

const mqtt = require('aliyun-iot-mqtt');//设施属性const options = require("./iot-device-config.json");//建设连贯const client = mqtt.getAliyunIotMqttClient(options);client.subscribe(`/sys/${options.productKey}/${options.deviceName}/rrpc/request/+`)client.on('message', function(topic, message) {        if(topic.indexOf(`/sys/${options.productKey}/${options.deviceName}/rrpc/request/`)>-1){        handleRrpc(topic, message)    }})function handleRrpc(topic, message){    topic = topic.replace('/request/','/response/');    console.log("topic=" + topic)    //一般Rrpc,响应payload自定义    const payloadJson = {code:200,msg:"handle ok"};    client.publish(topic, JSON.stringify(payloadJson));}

2.2 服务端POP调用Rrpc

const co = require('co');const RPCClient = require('@alicloud/pop-core').RPCClient;const options = require("./iot-ak-config.json");//1.初始化clientconst client = new RPCClient({    accessKeyId: options.accessKey,    secretAccessKey: options.accessKeySecret,    endpoint: 'https://iot.cn-shanghai.aliyuncs.com',    apiVersion: '2018-01-20'});const payload = {  "msg": "hello Rrpc"};//2.构建requestconst params = {    ProductKey:"a1gMu82K4m2",    DeviceName:"h5@nuwr5r9hf6l@1532088166923",    RequestBase64Byte:new Buffer(JSON.stringify(payload)).toString("base64"),    Timeout:3000};co(function*() {    //3.发动API调用    const response = yield client.request('Rrpc', params);    console.log(JSON.stringify(response));});

rrpc响应:

{    "MessageId": "1037292594536681472",    "RequestId": "D2150496-2A61-4499-8B2A-4B3EC4B2A432",    "PayloadBase64Byte": "eyJjb2RlIjoyMDAsIm1zZyI6ImhhbmRsZSBvayJ9",    "Success": true,    "RrpcCode": "SUCCESS"}// PayloadBase64Byte 解码: {"code":200,"msg":"handle ok"}

3.物模型-服务同步调用InvokeThingService示例

留神:物模型 服务调用 接口InvokeThingService,不是Rrpc

设施订阅subTopic
留神:服务同步调用API是InvokeThingService
/sys/${productKey}/${deviceName}/rrpc/request/+

IoT云端上行的payload格局

"id": 3536123,
 "version": "1.0", 
"params": {
   "入参key1": "入参value1",
   "入参key2": "入参value2"
 },
 "method": "thing.service.{tsl.service.identifier
}

设施响应replyTopic
/sys/${productKey}/${deviceName}/rrpc/response/request的音讯Id

设施响应payload格局

"id": 3536123,
"code": 200, 
"data": { 
 "出参key1": "出参value1", 
 "出参key2": "出参value2"
 }
}

3.1 物模型-同步服务定义

3.2 设施端实现

const mqtt = require('aliyun-iot-mqtt');//设施属性const options = require("./iot-device-config.json");//建设连贯const client = mqtt.getAliyunIotMqttClient(options);client.subscribe(`/sys/${options.productKey}/${options.deviceName}/rrpc/request/+`)client.on('message', function(topic, message) {        if(topic.indexOf(`/sys/${options.productKey}/${options.deviceName}/rrpc/request/`)>-1){        handleRrpc(topic, message)    }})/** 如果存在多个同步调用服务,须要通过payload里的method辨别*/function handleRrpc(topic, message){    topic = topic.replace('/request/','/response/');    console.log("topic=" + topic)    //物模型 同步服务调用,响应payload构造:    const payloadJson = {        id: Date.now(),        code:200,        data: {            currentMode: Math.floor((Math.random() * 20) + 10)        }    }    client.publish(topic, JSON.stringify(payloadJson));}

留神:设施端响应的payload要满足物模型定义的出参构造

3.3 服务端POP 接口InvokeThingService

const co = require('co');const RPCClient = require('@alicloud/pop-core').RPCClient;const options = require("./iot-ak-config.json");//1.初始化clientconst client = new RPCClient({    accessKeyId: options.accessKey,    secretAccessKey: options.accessKeySecret,    endpoint: 'https://iot.cn-shanghai.aliyuncs.com',    apiVersion: '2018-01-20'});const params = {    ProductKey: "a1gMu82K4m2",    DeviceName: "h5@nuwr5r9hf6l@1532088166923",    Args: JSON.stringify({ "mode": "1" }),    Identifier: "thing.service.setMode"};co(function*() {    try {        //3.发动API调用        const response = yield client.request('InvokeThingService', params);        console.log(JSON.stringify(response));    } catch (err) {        console.log(err);    }});

调用后果:

{    "Data":{        "Result": "{\"currentMode\":12}",        "MessageId": "1536145625658"    },    "RequestId": "29FD78CE-D1FF-48F7-B0A7-BD52C142DD7F",    "Success": true}

物联网平台产品介绍详情:https://www.aliyun.com/product/iot/iot_instc_public_cn

            阿里云物联网平台客户交换群