乐趣区

关于spark:KubeEdge和Kuiper双剑合并轻松解决边缘流式数据处理

摘要:本篇文章次要分享基于 KubeEdge 和 Kuiper 实现边缘流式数据处理的实践经验。

引言:KubeEdge 是一个开源的边缘计算平台,它在 Kubernetes 原生的容器编排和调度能力之上,扩大实现了 云边协同、计算下沉、海量边缘设施治理、边缘自治等能力。KubeEdge 还将通过插件的模式反对 5G MEC、AI 云边协同等场景,目前在很多畛域都已落地利用。

在边缘的散失解决产品 Kuiper

Kuiper 是从 2019 年初开始做的,在 2019 年 10 月份,公布了第一个版本,始终继续迭代到当初,它的整个架构是一个比拟经典的流式解决架构。

产品设计指标:在云端运行的流式解决,像 Spark 与 Flink 能够运行在边缘端

Kuiper 架构图

整体架构可分为 3 局部,左侧为 sources, 代表数据起源的地位,数据起源可能是 KubeEdge 外面有个边缘端的 MQTT macOS broker,也可能是文件、窗口、数据库;
右侧为 Sinks,代表数据处理实现后所要存储的地位,也就是指标零碎,指标能够是 MQTT,能够将其存到文件、数据库外面,也能够调用 HTTP service;

两头局部分成了这几层,最上层为数据业务逻辑解决,这个层面提供了 SQL statement、Rule Parser,SQL processors 进行解决后并将其转化成 SQL plan;上面层为 Streaming runtime 和 SQL runtime, 运行最终执行进去的 plan;最底层为 storage,用来存储有些音讯流出。

Kuiper 应用场景

流式解决:实现在边缘端的实时流式解决

规定引擎:灵便定义规定引擎,实现告警和音讯转发

数据格式与协定转换:实现边缘与云端不同类型的数据格式与异构协定之间灵便转换,实现 IT&OT 交融

KubeEdge 与 Kuiper 集成

局部架构图

Kuiper是装在 KubeEdge MQTT Broker 前面,整个都运行在边缘端,底下为不同的 Mapper,也就是接入各种各样不同的协定。边缘 MQTT Broker 用来替换音讯。

数据处理的类型:

从设施模型文件定义中获取类型定义

将数据转换为 Kuiper 的数据类型

创立流时,可应用 schema-less 流定义

反对的数据类型有 int、string、bool、float

KubeEdge 模型文件和配置

下图为局部配置文件,包含设施的名称、属性、name、data type、Description 等。

局部配置文件

保留设施模型文件

在 ect/mqtt_source.yaml 中配置模型文件信息

1)KubeEdgeVersion: 目前未应用,为适配未来不同的版本模型文件预留

2)KubeEdgeModelFile:模型文件门路

通过 config-map 下发配置,保留到相干目录下

Kuiper 应用过程

1)定义流:相似余数据库中表格的定义

DATASOURCE=”$hw/events/device/+/twin/update”为 KubeEdge 里定义好的 topic

2)定义并提交规定

用 SQL 实现业务逻辑,并将运行后果发送到指定指标

反对的 SQL

SELECT/FROM/WHERE/ORDER

JOIN/GROUP/HAVING

4 类工夫窗口 + 1 个计数窗口

60+SQL 函数

3)运行

KubeEdge 中部署 Kuiper 规定

1)使用 Kuiper-Kubernetes-tool

2)该程序为一个工具类,独自运行在容器中,执行通过 config-map 下发的命令配置文件

配置文件中用于指定 kuiper 服务所在的地址和端口等信息

命令文件所在的目录

3)通过 config-map 下发命令执行文件,该工具定期主动扫描文件,而后执行命令

Kuiper manager- 云边协同治理控制台

另外一种形式是通过治理控制台来治理很多Kuiper 节点,因为 Kuiper 能够运行在很多节点上。

比方 Kuiper 能够运行在车联网的盒子外面,车联网有很多车,能够通过 Kuiper-manager 把 所有的实例都接入进来,对立对其进行规定更新。

第一步是装置插件,咱们提供了一些插件的常识,比方要接入不同的源,如果咱们这边的源不反对,则能够本人写个插件,将插件进行装置,装置下来之后咱们提供安卓插件界面,就能够应用了。

接下来为创立流定义

下图为数据存储的地位,下图所示为将数据保留到文件系统,进行门路的指定。

下图为可视化的编辑界面,能够进行规定的编写。

利用案例:国家工业互联网大数据中心

该案例是一个十分典型的应用场景。K8s+CloudCore 部署在云端,将规定通过治理通道下放到 Kuiper,Kuiper 的地位是放在 MQTT broker,会将数据定义,实现数据的荡涤。目前通道有两条,第一条是将解决完的音讯发往 Cloud MQTT broker,第二条通道比方本地要做数据长久化,可将其存到 Influxdb 这个继续数据库,咱们在边缘产生的一些第三方利用能够间接去调 Influxdb 外面的数据,做一些展现可视化等。底层是通过 Mapper 把不同的数据给接上来。

Kuiper 里规定引擎的应用场景

LF EdgeX Foundry 内置规定引擎,于 2020 年 4 月 Geneva 版本中曾经正式公布。

利用案例:异构零碎对接数据格式转换

实现与 ERP、MES 等 IT 零碎数据交换,咱们提供了一个非常灵活的扩大能力,包含异构数据通过扩大插件采集后,能够利用 SQL 内置函数或者扩大函数进行疾速、灵活处理;第二点是拿到数据处理后果后,通过 sink 的数据模板能够对剖析后果进行转换,灵便适配各类指标零碎所需的数据格式和协定,比方同样一条温度大于 30 度的规定,如果要去发送管制设施的指令,并且要发到微信上。这两个不同的指标零碎,它所须要的接口和数据是不一样的,但对于这个规定是一样的,那么能够在 data 外面,依据同一条规定触发两个不同的操作,你能够指定不同的 topic,数据即可发送,不需再进行简单的编程;第三点是利用 SAP NetWeaver RFC SDK, 实现从 SAP 中读取数据,解决并转换后发送到别的异构零碎。

性能数据

Kuiper 反对并发运行数千条规定

8000 规定 *0.1 音讯 / 秒 / 规定,共计的 TPS 为 800 条 / 秒

规定定义

源:MQTT

SQL:select temperature from source where temperature>20(90% 数据被过滤)

指标:日志

配置

AWS:2core*4GB

Ubuntu

资源应用

Memory:89%~72%;0.4MB/rule

GPU:25%

AWS t2.micro 配置 10k+/ s 音讯吞吐

点击关注,第一工夫理解华为云陈腐技术~

退出移动版