关于前端:DGIOT平台实时展示OPC上报数据全流程代码剖析

50次阅读

共计 3615 个字符,预计需要花费 10 分钟才能阅读完成。

[小 迪 导读]:OPC 软件作为工业自动化畛域利用最宽泛的软件,深受工业管制人员的青睐。但也有许多状况下,OPC 软件并不能满足理论的应用需要:
应用场景
1.OPC 只在内网运行,心愿能够将数据传递至外网,随时随地查看
2.OPC 数据难以存库
3. 心愿能够更好展现数据,进行数据分析

整体交互图

1. dgiot_dtu 从 kepserver 获取数据

在 opc 与 kepserver 实现连贯之后,dgiot_dtu 通过调用 GetOpcDaService 函数连贯 kepserver 实现数据回调

 public OpcDaService GetOpcDaService(string host, string serviceProgId)
        {var service = hostCollection.Where(a => a.ServiceIds.Contains(serviceProgId) && a.Host == host)
                      .FirstOrDefault();

            if (service == null)
            {return null;}

            OpcDaService service1 = null;
            if (CheckServiceExisted(service, serviceProgId))
            {service1 = opcDaServices.Find(item => { return item.Host == service.Host && item.ServiceId == serviceProgId;});
            }
            else
            {OpcDaServer daService = new OpcDaServer(serviceProgId, service.Host);

                service1 = new OpcDaService()
                {
                    Host = service.Host,

                    ServiceId = serviceProgId,

                    Service = daService,

                    OpcDaGroupS = new Dictionary<string, OpcDaGroup>()};
                opcDaServices.Add(service1);
            }

            if (service1.Service.IsConnected == false)
            {
                try
                {service1.Service.ConnectionStateChanged += new EventHandler<OpcDaServerConnectionStateChangedEventArgs>(ConnectionStateChanged);

                    service1.Service.Connect();}
                catch (Exception e)
                {LogHelper.Log("Connect" + service1.Host + ", ServiceId" + service1.ServiceId + "error!!" + e.Message);
                }
            }

            return service1;
        }

2. dgiot_dtu 数据公布

dgiot_dtu 在实现数据收集后,和 dgiot 平台连贯是通过 mqtt 来进行连贯,dgiot_dtu 作为设施端进行公布、dgiot 平台作为服务端进行订阅


        public void ValueChangedCallBack(OpcDaGroup group, OpcDaItemValue[] values)
        {

            string groupKey = "";

            JsonObject properties = new JsonObject();
            values.ToList().ForEach(v =>
            {if (v.Item != null && v.Value != null)
                {properties.Add(v.Item.ItemId, v.Value);
                    groupKey = v.Item.UserData as string;
                    OpcDa.setItems(groupKey, v.Item.ItemId, properties);
                }
            });
            string topic = "$dg/thing/" + productId + "/" + devAddr + "/properties/report";
            MqttClientHelper.Publish(topic, Encoding.UTF8.GetBytes(properties.ToString()));
            return;
            }

3. 平台通过 dlink 进行数据点位转换

在 dgiot 收到订阅信息时,在 dink 中调用 on_message_publish 来匹配 topic 类型进行点位转换

on_message_publish(Message = #message{topic = <<"$dg/thing/", Topic/binary>>, payload = Payload, from = _ClientId, headers = _Headers}, _State) ->
    case re:split(Topic, <<"/">>) of
        [ProductId, DevAddr, <<"properties">>, <<"report">>] ->
            dgiot_dlink_proctol:properties_report(ProductId, DevAddr, get_payload(Payload));
        [ProductId, DevAddr, <<"firmware">>, <<"report">>] ->
            dgiot_dlink_proctol:firmware_report(ProductId, DevAddr, get_payload(Payload));
        _ ->
            pass
    end,
    {ok, Message};

4. 平台通过 task 将数据存入 TD, 并应用 mqtt 上报给物模型

实现点位转换之后,将数据存入 TD 数据库、同时也将数据上报给物模型展现

save_td(ProductId, DevAddr, Ack, AppData) ->
    case length(maps:to_list(Ack)) of
        0 ->
            #{};
        _ ->
            NewAck = dgiot_task:get_collection(ProductId, [], Ack, Ack),
            NewData = dgiot_task:get_calculated(ProductId, NewAck),
            Keys = dgiot_product:get_keys(ProductId),
            DeviceId = dgiot_parse_id:get_deviceid(ProductId, DevAddr),
            Interval = maps:get(<<"interval">>, AppData, 3),
            AllData = merge_cache_data(DeviceId, NewData, Interval),
            AllDataKey = maps:keys(AllData),
            case Keys -- AllDataKey of
                List when length(List) == 0 andalso length(AllDataKey) =/= 0 ->
                    ChannelId = dgiot_parse_id:get_channelid(dgiot_utils:to_binary(?BRIDGE_CHL), <<"DGIOTTOPO">>, <<"TOPO 组态通道"/utf8>>),
                    dgiot_channelx:do_message(ChannelId, {topo_thing, ProductId, DeviceId, AllData}),
                    dgiot_tdengine_adapter:save(ProductId, DevAddr, AllData),
                    Channel = dgiot_product:get_taskchannel(ProductId),
                    dgiot_bridge:send_log(Channel, ProductId, DevAddr, "~s ~p save td => ProductId ~p DevAddr ~p ~ts", [?FILE, ?LINE, ProductId, DevAddr, unicode:characters_to_list(jsx:encode(AllData))]),
                    dgiot_metrics:inc(dgiot_task, <<"task_save">>, 1),
                    NotificationTopic = <<"$dg/user/alarm/", ProductId/binary, "/", DeviceId/binary, "/properties/report">>,
                    dgiot_mqtt:publish(DeviceId, NotificationTopic, jsx:encode(AllData)),
                    AllData;
                _ ->
                    save_cache_data(DeviceId, AllData),
                    AllData
            end
    end.

5. 平台配置物模型并数据展现

[小 迪 点评]

  • 鉴此,dgiot 专门提供了基于 OPC 通信的 OPC 接口,以实现 OPC 数据的简略传输,解决行业痛点。

想理解更多 dgiot 的具体细节,欢送大家在 GitHub 上查看相干源代码。

正文完
 0