共计 2917 个字符,预计需要花费 8 分钟才能阅读完成。
目前在做的监控项目中有个对 es 的聚合查询的需求, 需要用 go 语言实现,
需求就是查询某个 IP 在一个时间范围内,各个监控指标取时间单位内的平均值。有点拗口,如下是 es 的查询语句,可以很明显的看到是要聚合 cpu 和 mem 两个 field。另外,时区必须要加上,否则少 8 小时,你懂的。
GET /monitor_hf/v1/_search
{
“query”: {
“bool”: {
“must”: [
{
“term”: {
“ip”: {
“value”: “192.168.1.100”
}
}
},
{
“range”: {
“datatime”: {
“gte”: 1545634340000,
“lte”: “now”
}
}
}
]
}
},
“size”: 0,
“aggs”: {
“AVG_Metric”: {
“date_histogram”: {
“field”: “datatime”,
“interval”: “day”,
“time_zone”: “Asia/Shanghai” ,
“format”: “yyyy-MM-dd HH:mm:ss”
},
“aggs”: {
“avg_mem”: {
“avg”: {
“field”: “mem”
}
},”avg_cpu”:{
“avg”: {
“field”: “cpu”
}
}
}
}
}
}
单条数据的内容大概是这样:datatime 是毫秒级的时间戳,索引的 mapping 一定要是 date 类型,否则不能做聚合。
{
“cpu”:5,
“mem”:77088,
“datatime”:1545702661000,
“ip”:”192.168.1.100″
}
查询出来的结果呢,长这个样子:可以看到,在 buckets 中查询出了 3 条数据,几个字段的意思:key_as_string 与 key:format 后的时间和毫秒时间戳 doc_count:聚合了多少的 docavg_mem 和 avg_cpu:查询语句中定义的别名。
{
“took”: 4,
“timed_out”: false,
“_shards”: {
“total”: 5,
“successful”: 5,
“skipped”: 0,
“failed”: 0
},
“hits”: {
“total”: 2477,
“max_score”: 0,
“hits”: []
},
“aggregations”: {
“AVG_Metric”: {
“buckets”: [
{
“key_as_string”: “2018-12-24 00:00:00”,
“key”: 1545580800000,
“doc_count”: 402,
“avg_mem”: {
“value”: 71208.1592039801
},
“avg_cpu”: {
“value”: 4.338308457711443
}
},
{
“key_as_string”: “2018-12-25 00:00:00”,
“key”: 1545667200000,
“doc_count”: 1258,
“avg_mem”: {
“value”: 77958.16852146263
},
“avg_cpu”: {
“value”: 4.639904610492846
}
},
{
“key_as_string”: “2018-12-26 00:00:00”,
“key”: 1545753600000,
“doc_count”: 817,
“avg_mem”: {
“value”: 86570.06609547124
},
“avg_cpu”: {
“value”: 4.975520195838433
}
}
]
}
}
}
下面使用 go 语言的 es 客户端“github.com/olivere/elastic”来实现相同的查询需求,我用的是 v6 版本:
package main
import (
“github.com/olivere/elastic”
“context”
“encoding/json”
“fmt”
)
type Aggregations struct {
AVG_Metric AVG_Metric `json:”AVG_Metric”`
}
type AVG_Metric struct {
Buckets []Metric `json:”buckets”`
}
type Metric struct {
Key int64 `json:”key”`
Doc_count int64 `json:”doc_count”`
Avg_mem Value `json:”avg_mem”`
Avg_cpu Value `json:”avg_cpu”`
}
type Value struct {
Value float64 `json:”value”`
}
var client, _ = elastic.NewSimpleClient(elastic.SetURL(“http://127.0.0.1:9200”))
func main() {
// 指定 ip 和时间范围
boolSearch := elastic.NewBoolQuery().
Filter(elastic.NewTermsQuery(“ip”, “192.168.1.100”),elastic.NewRangeQuery(“datatime”).
Gte(1545634340000).Lte(“now”))
// 需要聚合的指标
mem := elastic.NewAvgAggregation().Field(“mem”)
cpu := elastic.NewAvgAggregation().Field(“cpu”)
// 单位时间和指定字段
aggs := elastic.NewDateHistogramAggregation().
Interval(“day”).
Field(“datatime”).
TimeZone(“Asia/Shanghai”).
SubAggregation(“avg_mem”, mem).
SubAggregation(“avg_cpu”, cpu)
// 查询语句
result, _ := client.Search().
Index(“monitor_hf”).
Type(“v1”).
Query(boolSearch).
Size(0).
Aggregation(“AVG_Metric”, aggs).
Do(context.Background())
// 结果输出:
// 第一种方式
var m *[]Metric
term, _ := result.Aggregations.Terms(“AVG_Metric”)
for _, bucket := range term.Aggregations {
b, _ := bucket.MarshalJSON()
//fmt.Println(string(b))
json.Unmarshal(b, &m)
for _, v := range *m {
fmt.Println(v)
}
}
// 第二种方式
var a *Aggregations
b, _ := json.Marshal(result.Aggregations)
//fmt.Println(string(b))
json.Unmarshal(b, &a)
for _, v := range a.AVG_Metric.Buckets {
fmt.Println(v)
}
}
关于聚合结果的输出,遇到了些问题,网上也搜索不到,官方也没找到相应的例子。不过总算试出两个可行的,比较推荐第一种,可以少定义两个结构体。