目前在做的监控项目中有个对es的聚合查询的需求,需要用go语言实现,需求就是查询某个IP在一个时间范围内,各个监控指标取时间单位内的平均值。有点拗口,如下是es的查询语句,可以很明显的看到是要聚合cpu和mem两个field。另外,时区必须要加上,否则少8小时,你懂的。GET /monitor_hf/v1/_search{ “query”: { “bool”: { “must”: [ { “term”: { “ip”: { “value”: “192.168.1.100” } } }, { “range”: { “datatime”: { “gte”: 1545634340000, “lte”: “now” } } } ] } }, “size”: 0, “aggs”: { “AVG_Metric”: { “date_histogram”: { “field”: “datatime”, “interval”: “day”, “time_zone”: “Asia/Shanghai” , “format”: “yyyy-MM-dd HH:mm:ss” }, “aggs”: { “avg_mem”: { “avg”: { “field”: “mem” } },“avg_cpu”:{ “avg”: { “field”: “cpu” } } } } }}单条数据的内容大概是这样:datatime是毫秒级的时间戳,索引的mapping一定要是date类型,否则不能做聚合。{ “cpu”:5, “mem”:77088, “datatime”:1545702661000, “ip”:“192.168.1.100”}查询出来的结果呢,长这个样子:可以看到,在buckets中查询出了3条数据,几个字段的意思:key_as_string与key:format后的时间和毫秒时间戳doc_count:聚合了多少的docavg_mem和avg_cpu:查询语句中定义的别名。{ “took”: 4, “timed_out”: false, “_shards”: { “total”: 5, “successful”: 5, “skipped”: 0, “failed”: 0 }, “hits”: { “total”: 2477, “max_score”: 0, “hits”: [] }, “aggregations”: { “AVG_Metric”: { “buckets”: [ { “key_as_string”: “2018-12-24 00:00:00”, “key”: 1545580800000, “doc_count”: 402, “avg_mem”: { “value”: 71208.1592039801 }, “avg_cpu”: { “value”: 4.338308457711443 } }, { “key_as_string”: “2018-12-25 00:00:00”, “key”: 1545667200000, “doc_count”: 1258, “avg_mem”: { “value”: 77958.16852146263 }, “avg_cpu”: { “value”: 4.639904610492846 } }, { “key_as_string”: “2018-12-26 00:00:00”, “key”: 1545753600000, “doc_count”: 817, “avg_mem”: { “value”: 86570.06609547124 }, “avg_cpu”: { “value”: 4.975520195838433 } } ] } }}下面使用go语言的es客户端“github.com/olivere/elastic”来实现相同的查询需求,我用的是v6版本:package mainimport ( “github.com/olivere/elastic” “context” “encoding/json” “fmt”)type Aggregations struct { AVG_Metric AVG_Metric json:"AVG_Metric"}type AVG_Metric struct { Buckets []Metric json:"buckets"}type Metric struct { Key int64 json:"key" Doc_count int64 json:"doc_count" Avg_mem Value json:"avg_mem" Avg_cpu Value json:"avg_cpu"}type Value struct { Value float64 json:"value"}var client, _ = elastic.NewSimpleClient(elastic.SetURL(“http://127.0.0.1:9200”))func main() { //指定ip和时间范围 boolSearch := elastic.NewBoolQuery(). Filter(elastic.NewTermsQuery(“ip”, “192.168.1.100”),elastic.NewRangeQuery(“datatime”). Gte(1545634340000).Lte(“now”)) //需要聚合的指标 mem := elastic.NewAvgAggregation().Field(“mem”) cpu := elastic.NewAvgAggregation().Field(“cpu”) //单位时间和指定字段 aggs := elastic.NewDateHistogramAggregation(). Interval(“day”). Field(“datatime”). TimeZone(“Asia/Shanghai”). SubAggregation(“avg_mem”, mem). SubAggregation(“avg_cpu”, cpu) //查询语句 result, _ := client.Search(). Index(“monitor_hf”). Type(“v1”). Query(boolSearch). Size(0). Aggregation(“AVG_Metric”, aggs). Do(context.Background()) //结果输出: //第一种方式 var m *[]Metric term, _ := result.Aggregations.Terms(“AVG_Metric”) for _, bucket := range term.Aggregations { b, _ := bucket.MarshalJSON() //fmt.Println(string(b)) json.Unmarshal(b, &m) for _, v := range *m { fmt.Println(v) } } //第二种方式 var a *Aggregations b, _ := json.Marshal(result.Aggregations) //fmt.Println(string(b)) json.Unmarshal(b, &a) for _, v := range a.AVG_Metric.Buckets { fmt.Println(v) }}关于聚合结果的输出,遇到了些问题,网上也搜索不到,官方也没找到相应的例子。不过总算试出两个可行的,比较推荐第一种,可以少定义两个结构体。