共计 3922 个字符,预计需要花费 10 分钟才能阅读完成。
前言:
目前大部分业务开发中,ElasticSearch 次要还是用来做搜寻。而撑持搜寻性能的数据结构比拟繁多,不会有数据嵌套或者多种关联之类的。只管没有,然而有些小众需要可能还会有一对多查问的场景。为了实现和 MySQL 的 Join 相似的查问形式,以下以 ES 的父子文档形式贮存,并具体演示 Logstash 如何将 MySQL 的多张有关联的表同步到 ES 的父子文档。
手动演示:
以下以 restful 形式创立父子文档索引,并以简略的形式查问相似 join 的数据返回。上面所有演示的索引名称都为 “my_join_index”。
1. 创立父子关联索引 PUT my_join_index
{ | |
"mappings": { | |
"properties": { | |
"my_join_field": { | |
"type": "join", | |
"relations": {"question": "answer"} | |
} | |
} | |
} | |
} |
2. 创立父文档 PUT my_join_index/_doc/1?refresh
PUT my_join_index/_doc/1?refresh | |
{ | |
"text": "This is a question", | |
"my_join_field": "question" | |
} | |
PUT my_join_index/_doc/2?refresh | |
{ | |
"text": "This is another question", | |
"my_join_field": "question" | |
} |
3. 创立子文档 PUT my_join_index/_doc/3?routing=1&refresh
PUT my_join_index/_doc/3?routing=1&refresh | |
{ | |
"text": "This is an answer", | |
"my_join_field": { | |
"name": "answer", | |
"parent": "1" | |
} | |
} | |
PUT my_join_index/_doc/4?routing=1&refresh | |
{ | |
"text": "This is another answer2", | |
"my_join_field": { | |
"name": "answer", | |
"parent": "2" | |
} | |
} |
4. 全局检索 GET my_join_index/_search
GET my_join_index/_search | |
{ | |
"query": {"match_all": {} | |
}, | |
"sort": ["_id"] | |
} |
5. 依据父文档查找子文档 GET my_join_index/_search
GET my_join_index/_search | |
{ | |
"query": { | |
"has_parent" : { | |
"parent_type" : "question", | |
"query" : { | |
"match" : {"text" : "This is"} | |
} | |
} | |
} | |
} |
6. 依据子文档查找父文档 GET my_join_index/_search
GET my_join_index/_search | |
{ | |
"query": { | |
"has_child" : { | |
"type" : "answer", | |
"query" : { | |
"match" : {"text" : "This is question"} | |
} | |
} | |
} | |
} |
7. Join 聚合 GET my_join_index/_search
GET my_join_index/_search | |
{ | |
"query": { | |
"parent_id": { | |
"type": "answer", | |
"id": "1" | |
} | |
}, | |
"aggs": { | |
"parents": { | |
"terms": { | |
"field": "my_join_field#question", | |
"size": 10 | |
} | |
} | |
}, | |
"script_fields": { | |
"parent": { | |
"script": {"source": "doc['my_join_field#question']" | |
} | |
} | |
} | |
} |
8. 单条联结查问, 能够是一条父文档对应多个子文档 GET my_join_index/_search
GET my_join_index/_search | |
{ | |
"query": { | |
"bool": { | |
"must": [ | |
{ | |
"match": {"title": "历史圈"} | |
}, | |
{ | |
"has_child": { | |
"type": "answer", | |
"query": { | |
"match": {"text":"是的"} | |
}, | |
"inner_hits":{}} | |
} | |
] | |
} | |
} | |
} |
Logstash 同步:
以下以文章分类表和文章表为例,二者系一对多的关系。同步文档时,文章分类作为父文档,文章作为子文档,关联字段为“my_join_field”。
1. 创立有父子文档的索引 PUT hhyp_article
PUT hhyp_article | |
{ | |
"mappings": { | |
"properties": { | |
"my_join_field": { | |
"type": "join", | |
"relations": {"article_cate": "article"} | |
} | |
} | |
} | |
} |
2. 配置同步代码
input {stdin {} | |
jdbc { | |
# mysql 数据库链接,shop 为数据库名 | |
jdbc_connection_string => "jdbc:mysql://127.0.0.1:3306/rebuild?characterEncoding=UTF-8&useSSL=false" | |
# 用户名和明码 | |
jdbc_user => "root" | |
jdbc_password => "root" | |
# 驱动 | |
jdbc_driver_library => "E:/2setsoft/1dev/logstash-7.8.0/mysqletc/mysql-connector-java-5.1.7-bin.jar" | |
# 驱动类名 | |
jdbc_driver_class => "com.mysql.jdbc.Driver" | |
jdbc_paging_enabled => "true" | |
jdbc_page_size => "50000" | |
parameters => {"number" => "200"} | |
statement => "SELECT * FROM `hhyp_article` WHERE delete_time = 0" | |
# 是否将字段名转换为小写,默认 true(如果有数据序列化、反序列化需要,倡议改为 false);lowercase_column_names => false | |
# Value can be any of: fatal,error,warn,info,debug,默认 info;sql_log_level => warn | |
# 设置监听距离 各字段含意(由左至右)分、时、天、月、年,全副为 * 默认含意为每分钟都更新 | |
schedule => "* * * * *" | |
# 索引类型 | |
type => "article" | |
} | |
jdbc { | |
# mysql 数据库链接,shop 为数据库名 | |
jdbc_connection_string => "jdbc:mysql://127.0.0.1:3306/rebuild?characterEncoding=UTF-8&useSSL=false" | |
# 用户名和明码 | |
jdbc_user => "root" | |
jdbc_password => "root" | |
# 驱动 | |
jdbc_driver_library => "E:/2setsoft/1dev/logstash-7.8.0/mysqletc/mysql-connector-java-5.1.7-bin.jar" | |
# 驱动类名 | |
jdbc_driver_class => "com.mysql.jdbc.Driver" | |
jdbc_paging_enabled => "true" | |
jdbc_page_size => "50000" | |
parameters => {"number" => "200"} | |
statement => "SELECT * FROM `hhyp_article_cate` WHERE delete_time = 0" | |
# 是否将字段名转换为小写,默认 true(如果有数据序列化、反序列化需要,倡议改为 false);lowercase_column_names => false | |
# Value can be any of: fatal,error,warn,info,debug,默认 info;sql_log_level => warn | |
# 设置监听距离 各字段含意(由左至右)分、时、天、月、年,全副为 * 默认含意为每分钟都更新 | |
schedule => "* * * * *" | |
# 索引类型 | |
type => "article_cate" | |
} | |
} | |
filter {if [type]=="article_cate" { | |
mutate {add_field => { "my_join_field" => "article_cate"} | |
} | |
} | |
if [type]=="article" { | |
mutate {add_field => {"[my_join_field][name]" => "article"} | |
#catalog_id 子表的父 id | |
add_field => {"[my_join_field][parent]" => "%{cid}"} | |
} | |
} | |
} | |
output {if[type] == "article_cate" { | |
elasticsearch { | |
hosts => "localhost:9200" | |
index => "hhyp_article" | |
document_type => "_doc" | |
document_id => "%{id}" | |
} | |
} | |
if[type] == "article" { | |
elasticsearch { | |
hosts => "localhost:9200" | |
index => "hhyp_article" | |
document_type => "_doc" | |
document_id => "%{id}" | |
routing => "%{cid}" | |
} | |
} | |
stdout {codec => json_lines} | |
} |
3. 运行命令开始同步
bin\logstash -f mysql\mysql.conf
4. 通过搜寻父文档题目查问子文档数据
正文完
发表至: elasticsearch
2023-05-22