前言:

上一篇实际了通过Logstash同步MySQL的几张关联表到Elasticsearch中。为了实现同一种业务需要,嵌套文档在资源开销和查问速度上要优于父子文档(针对大量数据的状况)。所以以下就实际一下嵌套文档的根本应用和,以及Logstash如何同步一对多关系表到ElasticSearch的嵌套文档中。 

RESTful模仿:

以下以博客内容和博客评论为例,从映射创立,到增,删,改,查,聚合演示嵌套文档的应用办法,索引名 “blog_new”。

1. 创立映射PUT blog_new

{ "mappings": {    "properties": {        "title": {          "type": "text"        },        "body": {          "type": "text"        },        "tags": {          "type": "keyword"        },        "published_on": {          "type": "keyword"        },        "comments": {          "type": "nested",          "properties": {            "name": {              "type": "text"            },            "comment": {              "type": "text"            },            "age": {              "type": "short"            },            "rating": {              "type": "short"            },            "commented_on": {              "type": "text"            }          }        }      }  }}

2. 增加POST blog_new/blog/2

{  "title": "Hero",  "body": "Hero test body...",  "tags": ["Heros", "happy"],  "published_on": "6 Oct 2018",  "comments": [    {      "name": "steve",      "age": 24,      "rating": 18,      "comment": "Nice article..",      "commented_on": "3 Nov 2018"    }  ]}

3.  删除POST blog_new/blog/1/_update

{ "script": {    "lang": "painless",    "source": "ctx._source.comments.removeIf(it -> it.name == 'John');" }}

4. 批改POST blog_new/blog/2/_update

{  "script": {    "source": "for(e in ctx._source.comments){if (e.name == 'steve') {e.age = 25; e.comment= 'very very good article...';}}"   }}

5. 查问GET /blog_new/_search?pretty

{  "query": {    "bool": {      "must": [        {          "nested": {            "path": "comments",            "query": {              "bool": {                "must": [                  {                    "match": {                      "comments.name": "William"                    }                  },                  {                    "match": {                      "comments.age": 34                    }                  }                ]              }            }          }        }      ]    }  }}

6. 聚合GET blog_new/_search

{  "size": 0,  "aggs": {    "comm_aggs": {      "nested": {        "path": "comments"      },      "aggs": {        "min_age": {          "min": {            "field": "comments.age"          }        }      }    }  }}

Logstash同步:

同步到ES的嵌套文档和后面的父子文档就有点不一样了,这里只须要一个jdbc。合并次要是通过关联查问出后果,而后聚合导入到ElasticSearch中。以下还是以博客和评论为例,创立索引映射和其余MySQL表之类的就省略,间接看运行命令。

  1. 创立嵌套文档索引和映射能够用下面RESTful形式的映射创立进行批改,次要的是嵌套的类型是nested,执行配置前运行SQL查问成果如下。
  2. 配置同步代码
input {    stdin {}        jdbc {        jdbc_driver_library => "E:/2setsoft/1dev/logstash-7.8.0/mysqletc/mysql-connector-java-5.1.7-bin.jar"                jdbc_driver_class => "com.mysql.jdbc.Driver"                jdbc_connection_string => "jdbc:mysql://127.0.0.1:3306/community?characterEncoding=UTF-8&useSSL=false"                jdbc_user => root                jdbc_password => "root"                schedule => "*/1 * * * *"               statement => "SELECT community.id AS community_id, community.content, community.location, community.images, comment.content AS comment_content , comment.id AS comment_id FROM yiqi_comment comment LEFT JOIN yiqi_community community ON community.id = comment.community_id"   } } filter {        aggregate {        task_id => "%{community_id}"        code => "            map['id'] = event.get('community_id')            map['content'] = event.get('content')            map['location'] = event.get('location')            map['images'] = event.get('images')            map['comment_list'] ||=[]            map['comment'] ||=[]            if (event.get('comment_id') != nil)                if !(map['comment_list'].include? event.get('comment_id'))                      map['comment_list'] << event.get('comment_id')                            map['comment'] << {                        'comment_id' => event.get('comment_id'),                        'content' => event.get('comment_content')                    }                end            end            event.cancel()        "                push_previous_map_as_event => true        timeout => 5    }        json {        source => "message"        remove_field => ["message"]        #remove_field => ["message", "type", "@timestamp", "@version"]    }        mutate  {        #将不须要的JSON字段过滤,且不会被存入 ES 中        remove_field => ["tags", "@timestamp", "@version"]    }    } output {    stdout {        #codec => json_lines    }    elasticsearch {                hosts => ["127.0.0.1:9200"]        index => "test_nested_community_content"        document_id => "%{id}"   }}
  1. 运行命令开始同步
    bin\logstash -f mysql\mysql.conf

  1. 查问