共计 1185 个字符,预计需要花费 3 分钟才能阅读完成。
es 实现聚合
es 通过 agg
实现聚合,详情可见 es 文档
有时候查询 es 数据的时候可能需要实现多字段 group by 的功能,例如:
SELECT sum(item_count) from A group by field1, field2, field3
要实现多个维度的聚合,需要嵌套的 agg
查询语句:
{"query": {}, | |
"aggs": { | |
"field1": { | |
"terms": { | |
"field": "field1", | |
"size": 2147483647 #设置一个大的分桶数,防止一次统计不完整 | |
}, | |
"aggs": { | |
"field2": { | |
"terms": { | |
"field": "field2", | |
"size": 2147483647 | |
}, | |
"aggs": { | |
"field3": { | |
"terms": { | |
"field": "field3", | |
"size": 2147483647 | |
}, | |
"aggs": { | |
"sum_field": { | |
"sum": {"field": "sum_field"} | |
} | |
} | |
} | |
} | |
} | |
} | |
} | |
}, | |
"size": 0 | |
} |
用函数构建聚合语句的 agg
部分:
def build_query_aggs(fields, sum_field): | |
agg_data = {} | |
curr_field = agg_data | |
for item in fields: | |
curr_field[item] = { | |
"terms": { | |
"field": item, | |
"size": 2147483647 | |
}, | |
"aggs": {}} | |
curr_field = curr_field[item]["aggs"] | |
curr_field[sum_field] = { | |
"sum": {"field": sum_field} | |
} | |
return agg_data |
处理得到的数据,将其组织成 list:
def build_es_aggs_data(data, fields, sum_field): | |
curr_field = None | |
res_data = [] | |
if len(fields) > 0: | |
curr_field = fields[0] | |
else: | |
return | |
curr_buckets = data[curr_field]['buckets'] | |
for item in curr_buckets: | |
if len(fields) == 1: | |
curr_data= {} | |
curr_data[curr_field] = item['key'] | |
curr_data[sum_field] = item[sum_field]["value"] | |
res_data.append(curr_data) | |
else: | |
pre_data = deepcopy(build_es_aggs_data(item, fields[1:], sum_field)) | |
for pre_item in pre_data: | |
pre_item[curr_field] = item['key'] | |
res_data.append(pre_item) | |
return res_data |
正文完
发表至:无分类
2019-07-16