Kuiper-中使用-Golang-模版-template-定制分析结果

36次阅读

共计 4851 个字符,预计需要花费 13 分钟才能阅读完成。

简介

用户通过 Kuiper 进行数据分析解决后,应用各种 sink 能够往不同的零碎发送数据分析后果。针对同样的剖析后果,不同的 sink 须要的格局可能未必一样。比方,在某物联网场景中,当发现某设施温度过高的时候,须要向云端某 rest 服务发送一个申请,同时在本地须要通过 MQTT 协定 往设施发送一个管制命令,这两者须要的数据格式可能并不一样,因而,须要对来自于剖析的后果进行「二次解决」后,才能够往不同的指标发送针对数据。本文将介绍如何利用 sink 中的数据模版(data template)来实现对剖析后果的「二次解决」。

Golang 模版介绍

Golang 模版将一段逻辑利用到数据上,而后依照用户指定的逻辑对数据进行格式化输入,Golang 模版常见的应用场景为在网页开发中,比方将 Golang 中的某数据结构进行转换和管制后,将其转换为 HTML 标签输入到浏览器。在 Kuiper 应用了 Golang 的 template(模版)对剖析后果实现「二次解决」,请参考以下来自于 Golang 的官网介绍。

模版是通过将其利用到一个数据结构上来执行的。模版中的正文 (Annotations) 指的是数据结构中的元素(典型的为构造体中的一个字段,或者 map 中的一个 key),正文用于管制执行、并获取用于显示的值。模版的执行会迭代数据结构并设置游标,通过符号「.」来示意,称之为「dot」,在执行过程中指向数据结构中的以后地位。

模版的输出文本能够为 UTF-8 编码的任意文本。「动作 (Actions)」— 数据求值或者控制结构 – 是通过 “{{” 和 “}}” 来界定的;所有在 动作 之外的文本会被放弃原样到输入,除了 raw strings,动作 不可跨行(正文除外)。

动作 (Actions)

Golang 模版提供了一些内置的动作,能够让用户写各种管制语句,用于提取内容。比方,

  • 依据判断条件来输入不同的内容
{{if pipeline}} T1 {{else}} T0 {{end}}
  • 循环遍历数据,并进行解决
{{range pipeline}} T1 {{else}} T0 {{end}}

读者能够看到,动作是用 {{}} 界定的,在 Kuiper 的数据模版应用过程中,因为输入个别也是 JSON 格局,而 JSON 格局是用 {} 来界定,因而读者在不太熟悉应用的时候,在应用 Kuiper 的数据模版的性能会感觉比拟难以了解。比方以下的例子中,

{{if pipeline}} {"field1": true} {{else}}  {"field1": false} {{end}}

上述表达式的意思如下(请留神动作的界定符和 JSON 的界定符):

  • 如果满足了条件 pipeline,则输入 JSON 字符串 {"field1": true}
  • 否则输入 JSON 字符串 {"field1": false}

Kuiper sink 数据格式

Golang 的模版能够作用于各种数据结构,比方 map、切片 (slice),通道等,而 Kuiper 的 sink 中的数据模版失去的数据类型是固定的,是一个蕴含了 Golang map 切片的数据类型,如下所示。

[]map[string]interface{}

切片 (slice) 数据按条发送

流入 sink 的数据是一个 map[string]interface{} 切片的数据结构,然而用户往指标 sink 发送数据的时候,可能是须要单条的数据,而不是所有的数据。比方在这篇 Kuiper 与 AWS IoT Hub 集成 的文章中所介绍的,规定产生的样例数据如下所示。

[{"device_id":"1","t_av":36.25,"t_count":4,"t_max":80,"t_min":10},
  {"device_id":"2","t_av":27,"t_count":4,"t_max":45,"t_min":12}
]

在发送到 sink 的时候,心愿每条数据离开发送,首先须要将 sink 的 sendSingle 设置为 true,而后应用数据模版:{{json .}},残缺配置如下,用户能够将其拷贝到某 sink 配置的最初。

 ...
 "sendSingle": true,
 "dataTemplate": "{{json .}}"
  • sendSingle 设置为 true后,Kuiper 把传递给 sink 的 []map[string]interface{} 数据类型进行遍历解决,对于遍历过程中的每一条数据都会利用用户指定的数据模版
  • json 是 Kuiper 提供的函数(用户能够参考 Kuiper 扩大模版函数来理解更多的 Kuiper 扩大),能够将传入的参数转化为 JSON 字符串输入,对于遍历到的每一条数据,将 map 中的内容转换为 JSON 字符串

Golang 还内置提供了一些函数,用户能够参考更多 Golang 内置提供的函数来获取更多函数信息。

数据内容转换

还是针对上述例子,须要对返回的 t_av(平均温度)做一些转换,转换的根本要求就是依据不同的平均温度,退出不同的形容文字,用于指标 sink 中的解决。规定如下,

  • 当温度小于 30,形容字段为「Current temperature is$t_av, it’s normal.」
  • 当温度大于 30,形容字段为「Current temperature is$t_av, it’s high.」

假如指标 sink 还是须要 JSON 数据,该数据模版的内容如下,

...
"dataTemplate": "{\"device_id\": {{.device_id}}, \"description\": \"{{if lt .t_av 30.0}}Current temperature is {{.t_av}}, it's normal.\"{{else if ge .t_av 30.0}}Current temperature is {{.t_av}}, it's high.\"{{end}}}""sendSingle": true,

在上述的数据模版中,应用了 {{if pipeline}} T1 {{else if pipeline}} T0 {{end}} 的内置动作,看上去比较复杂,略微调整一下,去掉本义并退出缩进后排版如下(留神:在生成 Kuiper 规定的时候,不能传入以下优化后排版的规定)。

{"device_id": {{.device_id}}, "description": "
  {{if lt .t_av 30.0}}
    Current temperature is {{.t_av}}, it's normal."
  {{else if ge .t_av 30.0}}
    Current temperature is {{.t_av}}, it's high."
  {{end}}
}

应用了 Golang 内置的二元比拟函数,

  • lt:小于
  • ge:大于等于

值得注意的是,在 ltge 函数中,第二个参数值的类型应该与 map 中的数据理论的数据类型统一,否则会出错。如在上述的例子中,温度大于 30 的状况,因为 map 中理论平均数的类型为 float,因而第二个参数的值需传入 30.0,而不是 30

另外,模版还是利用到切片中每条记录上,所以还是须要将 sendSingle 属性设置为 true。最终该数据模版针对上述数据产生的内容如下,

{"device_id": 1, "description": "Current temperature is 36.25, it's high."}
{"device_id": 2, "description": "Current temperature is 27, it's normal."}

数据遍历

通过给 sink 的 sendSingle 属性设置为 true,能够实现把传递给 sink 的切片数据进行遍历。在此处,咱们将介绍一些更为简单的例子,比方在 sink 的后果中,蕴含了嵌套的数组类型的数据,如何通过在数据模版中提供的遍历性能,本人来实现遍历。

假如流入 sink 中的数据内容如下所示,

{"device_id":"1", 
 "values": [{"temperature": 10.5},
  {"temperature": 20.3},
  {"temperature": 30.3}
 ]
}

需要为,

  • 当发现 “values” 数组中某个 temperature 值小于等于 25 的时候,减少一个名为 description 的属性,将其值设置为 fine
  • 当发现 “values” 数组中某个 temperature 值大于 25 的时候,减少一个名为 description 的属性,将其值设置为 high
"sendSingle": true,
"dataTemplate": "{{$len := len .values}} {{$loopsize := add $len -1}} {\"device_id\": \"{{.device_id}}\", \"description\": [{{range $index, $ele := .values}} {{if le .temperature 25.0}}\"fine\"{{else if gt .temperature 25.0}}\"high\"{{end}} {{if eq $loopsize $index}}]{{else}},{{end}}{{end}}}"

该数据模板比较复杂,解释如下,

  • {{$len := len .values}} {{$loopsize := add $len -1}},这一段执行了两个表达式,第一个 len 函数获得数据中 values 的长度,第二个 add 将其值减 1 并赋值到变量 loopsize:因为 Golang 的表达式中目前还不反对间接将数值减 1 的操作,add 是 Kuiper 为实现该性能而扩大的函数。
  • {\"device_id\": \"{{.device_id}}\", \"description\": [ 这一段模版在作用到样例数据后,生成了 JSON 串 {"device_id": "1", "description": [
  • {{range $index, $ele := .values}} {{if le .temperature 25.0}}\"fine\"{{else if gt .temperature 25.0}}\"high\"{{end}} {{if eq $loopsize $index}}]{{else}},{{end}}{{end}},这一段模版看起来比较复杂,然而如果把它调整一下,去掉本义并退出缩进后排版如下,看起来可能会更加清晰(留神:在生成 Kuiper 规定的时候,不能传入以下优化后排版的规定)。

    {{range $index, $ele := .values}} 
      {{if le .temperature 25.0}}
        "fine"
      {{else if gt .temperature 25.0}}
        "high"
      {{end}} 
      {{if eq $loopsize $index}}
        ]
      {{else}}
        ,
      {{end}}
    {{end}}

    第一个条件判断生成是 fine 或者 high;第二个条件判断是生成分隔数组的 , 还是数组结尾的 ]

另外,模版还是利用到切片中每条记录上,所以还是须要将 sendSingle 属性设置为 true。最终该数据模版针对上述数据产生的内容如下,

  {"device_id": "1", "description": [ "fine" , "fine" , "high"]}

总结

通过 Kuiper 提供的数据模版性能能够实现对剖析后果的二次解决,以满足不同的 sink 指标的需要。然而读者也能够看到,因为 Golang 模版自身的限度,实现比较复杂的数据转换的时候会比拟蠢笨,心愿未来 Golang 模版的性能能够做得更加弱小和灵便,这样能够反对解决更加简单的需要。目前倡议用户能够通过数据模版来实现一些较为简单的数据的转换;如果用户须要对数据进行比较复杂的解决,并且本人扩大了 sink 的状况下,能够在 sink 的实现中间接进行解决。

另外,Kuiper 团队在布局未来反对自定义扩大 sink 中的模版函数,这样一些比较复杂的逻辑能够在函数外部实现,用户调用的时候只需一个简略的模版函数调用即可实现。

版权申明:本文为 EMQ 原创,转载请注明出处。

原文链接:https://www.emqx.io/cn/blog/k…

正文完
 0