es实现聚合

Elasticsearch(ES)通过 aggregations(aggs)实现聚合,详情可参见 Elasticsearch 官方文档中的 Aggregations 章节。


查询 ES 数据时,有时需要实现多字段 GROUP BY 的功能,例如:

SELECT field1, field2, field3, SUM(sum_field) FROM A GROUP BY field1, field2, field3

要实现多个维度的聚合,需要使用嵌套的 aggs 查询语句(每个分组字段对应一层 terms 聚合,最内层做 sum):

{
    "query": {},
    "aggs": {
        "field1": {
            "terms": {
                "field": "field1",
                "size": 2147483647  # 设置一个大的分桶数,防止一次统计不完整
            },
            "aggs": {
                "field2": {
                    "terms": {
                        "field": "field2",
                        "size": 2147483647
                    },
                    "aggs": {
                        "field3": {
                            "terms": {
                                "field": "field3",
                                "size": 2147483647
                            },
                            "aggs": {
                                "sum_field": {
                                    "sum": {
                                        "field": "sum_field"
                                    }
                                }
                            }
                        }
                    }
                }
            }
        }
    },
    "size": 0
}

用函数构建聚合语句的agg部分:

def build_query_aggs(fields, sum_field, size=2147483647):
    """Build the ``aggs`` part of an ES query that groups by several fields.

    Emulates ``SELECT SUM(sum_field) ... GROUP BY fields[0], fields[1], ...``
    by nesting one ``terms`` aggregation per field (outermost first) and
    putting a ``sum`` aggregation on ``sum_field`` at the innermost level.
    Every aggregation is named after the field it operates on.

    Args:
        fields: Group-by field names, outermost first. May be empty, in
            which case only the top-level ``sum`` aggregation is produced.
        sum_field: Field to sum; also used as the sum aggregation's name.
        size: Maximum number of buckets per ``terms`` level. The default
            (2**31 - 1) aims to keep every bucket. Newer ES versions limit
            the total bucket count (``search.max_buckets``), so a smaller
            value may be needed there.

    Returns:
        A dict suitable as the value of the query's ``"aggs"`` key.
    """
    agg_data = {}
    level = agg_data
    for field in fields:
        level[field] = {
            "terms": {"field": field, "size": size},
            "aggs": {},
        }
        # Descend so the next field's terms agg nests inside this one.
        level = level[field]["aggs"]
    level[sum_field] = {"sum": {"field": sum_field}}
    return agg_data

处理查询返回的聚合结果(response 中的 aggregations 部分),将嵌套的分桶展开为 list,每个元素对应一行 GROUP BY 结果:

def build_es_aggs_data(data, fields, sum_field):
    """Flatten nested ``terms`` aggregation results into a list of rows.

    ``data`` is expected to be the aggregation result for a query whose
    ``aggs`` were built by ``build_query_aggs(fields, sum_field)``. Each level
    is keyed by a field name and holds ``buckets``; each bucket has a ``key``
    and either the next level's result or, at the innermost level, a
    ``sum_field`` metric with a ``value``.

    Args:
        data: Aggregation result dict for the current nesting level.
        fields: Group-by field names, outermost first.
        sum_field: Name of the sum aggregation at the innermost level.

    Returns:
        A list of dicts, one per innermost bucket. Each dict maps every name
        in ``fields`` to its bucket key, in ``fields`` order, followed by
        ``sum_field`` mapped to the summed value. With no ``fields`` (no
        GROUP BY), the result is a single row holding the top-level total.
    """
    if not fields:
        # Base case: the sum metric sits directly in this level's result.
        return [{sum_field: data[sum_field]["value"]}]

    curr_field, rest = fields[0], fields[1:]
    # The recursive call builds brand-new dicts, so no copying is needed.
    return [
        {curr_field: bucket["key"], **sub_row}
        for bucket in data[curr_field]["buckets"]
        for sub_row in build_es_aggs_data(bucket, rest, sum_field)
    ]