Preface:
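The previous post put into practice syncing several related MySQL tables into Elasticsearch with Logstash. For the same business requirement, nested documents outperform parent-child documents in resource overhead and query speed (when dealing with large amounts of data). So this post walks through the basic usage of nested documents, and then shows how Logstash can sync a one-to-many relationship from MySQL into nested documents in Elasticsearch.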
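Hands-on with the REST API:
The examples below use blog posts and their comments to demonstrate nested documents end to end: creating the mapping, then adding, deleting, updating, querying, and aggregating. The index name is “blog_new”.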
1. Create the mapping
PUT blog_new
{
  "mappings": {
    "properties": {
      "title": { "type": "text" },
      "body": { "type": "text" },
      "tags": { "type": "keyword" },
      "published_on": { "type": "keyword" },
      "comments": {
        "type": "nested",
        "properties": {
          "name": { "type": "text" },
          "comment": { "type": "text" },
          "age": { "type": "short" },
          "rating": { "type": "short" },
          "commented_on": { "type": "text" }
        }
      }
    }
  }
}
2. Add a document
POST blog_new/_doc/2
{
  "title": "Hero",
  "body": "Hero test body...",
  "tags": ["Heros", "happy"],
  "published_on": "6 Oct 2018",
  "comments": [
    {
      "name": "steve",
      "age": 24,
      "rating": 18,
      "comment": "Nice article..",
      "commented_on": "3 Nov 2018"
    }
  ]
}
3. Delete (remove John's comments from document 1)
POST blog_new/_update/1
{
  "script": {
    "lang": "painless",
    "source": "ctx._source.comments.removeIf(it -> it.name == 'John');"
  }
}
4. Update (edit steve's comment in document 2)
POST blog_new/_update/2
{
  "script": {
    "source": "for (e in ctx._source.comments) { if (e.name == 'steve') { e.age = 25; e.comment = 'very very good article...'; } }"
  }
}
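Both scripts rewrite the comments array inside _source, and Elasticsearch then reindexes the whole top-level document together with all of its nested comments. The Python sketch below mirrors the two Painless scripts on a plain dict so their effect can be checked locally before sending the request. The sample source data (including the extra comment by John) and the function names are hypothetical.

```python
from copy import deepcopy


def remove_comments_by(source: dict, name: str) -> dict:
    """Mirror of ctx._source.comments.removeIf(it -> it.name == name)."""
    updated = deepcopy(source)
    updated["comments"] = [c for c in updated["comments"] if c.get("name") != name]
    return updated


def edit_comments_by(source: dict, name: str, age: int, text: str) -> dict:
    """Mirror of the for-in loop that rewrites age and comment for matching entries."""
    updated = deepcopy(source)
    for c in updated["comments"]:
        if c.get("name") == name:
            c["age"] = age
            c["comment"] = text
    return updated


# Hypothetical _source: the document from step 2 plus one extra comment by John
source = {
    "title": "Hero",
    "comments": [
        {"name": "steve", "age": 24, "comment": "Nice article.."},
        {"name": "John", "age": 30, "comment": "Thanks"},
    ],
}

after_delete = remove_comments_by(source, "John")
after_update = edit_comments_by(after_delete, "steve", 25, "very very good article...")
print(after_update["comments"])
# [{'name': 'steve', 'age': 25, 'comment': 'very very good article...'}]
```

Working on copies keeps the original sample intact, which makes it easy to compare before and after.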
5. Query (posts that have a comment by William aged 34)
GET /blog_new/_search?pretty
{
  "query": {
    "bool": {
      "must": [
        {
          "nested": {
            "path": "comments",
            "query": {
              "bool": {
                "must": [
                  { "match": { "comments.name": "William" } },
                  { "match": { "comments.age": 34 } }
                ]
              }
            }
          }
        }
      ]
    }
  }
}
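The nested query requires both clauses to match within the same comment. The Python sketch below illustrates why the nested type matters by contrasting a simplified model of a plain object array (each field is flattened into one multi-valued list, so the link between a comment's name and its age is lost) with nested matching, where each comment is checked as its own unit. It models the matching semantics only, not how Elasticsearch evaluates queries internally; matching is simplified to exact equality and the sample document is hypothetical.

```python
def flattened_match(doc: dict, name: str, age: int) -> bool:
    """Model of an object array: fields become independent value lists."""
    names = [c["name"] for c in doc["comments"]]
    ages = [c["age"] for c in doc["comments"]]
    return name in names and age in ages


def nested_match(doc: dict, name: str, age: int) -> bool:
    """Model of a nested array: both conditions must hold for the same comment."""
    return any(c["name"] == name and c["age"] == age for c in doc["comments"])


# Hypothetical document: William is 28, a different commenter is 34
doc = {
    "title": "Hero",
    "comments": [
        {"name": "William", "age": 28},
        {"name": "steve", "age": 34},
    ],
}

print(flattened_match(doc, "William", 34))  # True: a cross-comment false positive
print(nested_match(doc, "William", 34))     # False: no single comment matches both
```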
6. Aggregate (minimum age across all comments)
GET blog_new/_search
{
  "size": 0,
  "aggs": {
    "comm_aggs": {
      "nested": { "path": "comments" },
      "aggs": {
        "min_age": { "min": { "field": "comments.age" } }
      }
    }
  }
}
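Logstash sync:
Syncing into nested documents differs from the parent-child setup in the previous post: only one jdbc input is needed here. The merge is done by a join query whose rows are then aggregated per post before being imported into Elasticsearch. Again using blogs and comments as the example, creating the index mapping and the MySQL tables is skipped; let's go straight to the configuration and the commands.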
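- The nested index and its mapping can be created by adapting the RESTful mapping above; the main point is that the nested field's type is "nested". Before running the config, run the SQL query on its own to check the joined result.
- Sync configuration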
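As a sketch of that adaptation, the Python snippet below builds a mapping body for the test_nested_community_content index used in the output section. The field names come from the aggregate code (id, content, location, images, comment_list, and the nested comment with comment_id and content); all field types are assumptions to adjust to the real MySQL columns. The printed JSON is the body for PUT test_nested_community_content.

```python
import json


def community_mapping() -> dict:
    """Assumed mapping: posts with their comments stored as nested documents."""
    return {
        "mappings": {
            "properties": {
                "id": {"type": "long"},            # assumed numeric primary key
                "content": {"type": "text"},
                "location": {"type": "keyword"},   # assumed exact-match field
                "images": {"type": "keyword"},     # assumed URL or path string
                "comment_list": {"type": "long"},  # comment ids used for de-duplication
                "comment": {
                    "type": "nested",              # the key point: nested, not object
                    "properties": {
                        "comment_id": {"type": "long"},
                        "content": {"type": "text"},
                    },
                },
            }
        }
    }


print(json.dumps(community_mapping(), indent=2))
```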
input {
  stdin {}
  jdbc {
    jdbc_driver_library => "E:/2setsoft/1dev/logstash-7.8.0/mysqletc/mysql-connector-java-5.1.7-bin.jar"
    jdbc_driver_class => "com.mysql.jdbc.Driver"
    jdbc_connection_string => "jdbc:mysql://127.0.0.1:3306/community?characterEncoding=UTF-8&useSSL=false"
    jdbc_user => "root"
    jdbc_password => "root"
    schedule => "*/1 * * * *"
    # Posts LEFT JOIN comments so posts without comments are kept; ORDER BY keeps each post's rows together for the aggregate filter
    statement => "SELECT community.id AS community_id, community.content, community.location, community.images, comment.content AS comment_content, comment.id AS comment_id FROM yiqi_community community LEFT JOIN yiqi_comment comment ON community.id = comment.community_id ORDER BY community.id"
  }
}
filter {
  # aggregate needs rows sorted by task_id and a single pipeline worker (-w 1)
  aggregate {
    task_id => "%{community_id}"
    code => "
      map['id'] = event.get('community_id')
      map['content'] = event.get('content')
      map['location'] = event.get('location')
      map['images'] = event.get('images')
      map['comment_list'] ||= []
      map['comment'] ||= []
      if (event.get('comment_id') != nil)
        if !(map['comment_list'].include? event.get('comment_id'))
          map['comment_list'] << event.get('comment_id')
          map['comment'] << {
            'comment_id' => event.get('comment_id'),
            'content' => event.get('comment_content')
          }
        end
      end
      event.cancel()
    "
    push_previous_map_as_event => true
    timeout => 5
  }
  json {
    source => "message"
    remove_field => ["message"]
    #remove_field => ["message", "type", "@timestamp", "@version"]
  }
  mutate {
    # Drop JSON fields we don't need so they are not stored in ES
    remove_field => ["tags", "@timestamp", "@version"]
  }
}
output {
  stdout {
    #codec => json_lines
  }
  elasticsearch {
    hosts => ["127.0.0.1:9200"]
    index => "test_nested_community_content"
    document_id => "%{id}"
  }
}
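The aggregate filter turns the flat join result into one document per post. The Python sketch below models that logic under the settings used above (push_previous_map_as_event => true): a new community_id flushes the previous map as an event, and repeated comment_id values are skipped via comment_list. The final flush stands in for the timeout that pushes the last map. The row dicts are hypothetical sample data shaped like the SQL result.

```python
from typing import Any, Dict, Iterable, List, Optional

Row = Dict[str, Any]


def aggregate_rows(rows: Iterable[Row]) -> List[Row]:
    """Group joined rows into one document per community, like the aggregate filter."""
    docs: List[Row] = []
    current_id: Optional[Any] = None
    current: Optional[Row] = None
    for row in rows:
        task_id = row["community_id"]
        if current is None or task_id != current_id:
            # A new task_id pushes the previous map as an event
            if current is not None:
                docs.append(current)
            current_id = task_id
            current = {"id": task_id, "comment_list": [], "comment": []}
        current["content"] = row["content"]
        current["location"] = row["location"]
        current["images"] = row["images"]
        comment_id = row["comment_id"]
        # LEFT JOIN rows for a post without comments carry comment_id = None
        if comment_id is not None and comment_id not in current["comment_list"]:
            current["comment_list"].append(comment_id)
            current["comment"].append(
                {"comment_id": comment_id, "content": row["comment_content"]}
            )
    if current is not None:
        # Stands in for the timeout that flushes the last map
        docs.append(current)
    return docs


# Hypothetical rows: post 1 has two comments, post 2 has none
rows = [
    {"community_id": 1, "content": "post 1", "location": "A", "images": None,
     "comment_id": 10, "comment_content": "c10"},
    {"community_id": 1, "content": "post 1", "location": "A", "images": None,
     "comment_id": 11, "comment_content": "c11"},
    {"community_id": 2, "content": "post 2", "location": "B", "images": None,
     "comment_id": None, "comment_content": None},
]

docs = aggregate_rows(rows)
print([(d["id"], d["comment_list"]) for d in docs])
# [(1, [10, 11]), (2, [])]

# Unsorted input splits post 1 into two events; with document_id => "%{id}"
# the second write overwrites the first and comment 10 is lost.
unsorted_docs = aggregate_rows([rows[0], rows[2], rows[1]])
print([(d["id"], d["comment_list"]) for d in unsorted_docs])
# [(1, [10]), (2, []), (1, [11])]
```

This is why the rows must arrive sorted by the task id and processed by a single worker.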
- Run the command to start syncing (the aggregate filter needs a single pipeline worker, hence -w 1)
bin\logstash -f mysql\mysql.conf -w 1
- Query
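For the synced index, a query on the nested comment field has the same shape as step 5 of the REST examples, with path set to comment. The Python helper below is a hypothetical convenience that builds such a request body (posts having a comment whose content matches some text); the field names follow the aggregate code, and the body would be sent with GET test_nested_community_content/_search.

```python
import json


def nested_comment_query(text: str) -> dict:
    """Build a nested query body matching posts with a comment whose content matches text."""
    return {
        "query": {
            "nested": {
                "path": "comment",
                "query": {"match": {"comment.content": text}},
                # Optional: also return the matching comments for each post
                "inner_hits": {},
            }
        }
    }


print(json.dumps(nested_comment_query("good article"), indent=2))
```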