乐趣区

关于elasticsearch:Elasticsearch数据同步嵌套文档关联

前言:

上一篇实际了通过 Logstash 同步 MySQL 的几张关联表到 Elasticsearch 中。为了实现同一种业务需要,嵌套文档在资源开销和查问速度上要优于父子文档(针对大量数据的状况)。所以以下就实际一下嵌套文档的根本应用和,以及 Logstash 如何同步一对多关系表到 ElasticSearch 的嵌套文档中。

RESTful 模仿:

以下以博客内容和博客评论为例,从映射创立,到增,删,改,查,聚合演示嵌套文档的应用办法,索引名“blog_new”。

1. 创立映射 PUT blog_new

{
 "mappings": {
    "properties": {
        "title": {"type": "text"},
        "body": {"type": "text"},
        "tags": {"type": "keyword"},
        "published_on": {"type": "keyword"},
        "comments": {
          "type": "nested",
          "properties": {
            "name": {"type": "text"},
            "comment": {"type": "text"},
            "age": {"type": "short"},
            "rating": {"type": "short"},
            "commented_on": {"type": "text"}
          }
        }
      }
  }
}

2. 增加 POST blog_new/blog/2

{
  "title": "Hero",
  "body": "Hero test body...",
  "tags": ["Heros", "happy"],
  "published_on": "6 Oct 2018",
  "comments": [
    {
      "name": "steve",
      "age": 24,
      "rating": 18,
      "comment": "Nice article..",
      "commented_on": "3 Nov 2018"
    }
  ]
}

3.  删除 POST blog_new/blog/1/_update

{
 "script": {
    "lang": "painless",
    "source": "ctx._source.comments.removeIf(it -> it.name =='John');"
 }
}

4. 批改 POST blog_new/blog/2/_update

{
  "script": {"source": "for(e in ctx._source.comments){if (e.name =='steve') {e.age = 25; e.comment='very very good article...';}}" 
  }
}

5. 查问 GET /blog_new/_search?pretty

{
  "query": {
    "bool": {
      "must": [
        {
          "nested": {
            "path": "comments",
            "query": {
              "bool": {
                "must": [
                  {
                    "match": {"comments.name": "William"}
                  },
                  {
                    "match": {"comments.age": 34}
                  }
                ]
              }
            }
          }
        }
      ]
    }
  }
}

6. 聚合 GET blog_new/_search

{
  "size": 0,
  "aggs": {
    "comm_aggs": {
      "nested": {"path": "comments"},
      "aggs": {
        "min_age": {
          "min": {"field": "comments.age"}
        }
      }
    }
  }
}

Logstash 同步:

同步到 ES 的嵌套文档和后面的父子文档就有点不一样了,这里只须要一个 jdbc。合并次要是通过关联查问出后果,而后聚合导入到 ElasticSearch 中。以下还是以博客和评论为例,创立索引映射和其余 MySQL 表之类的就省略,间接看运行命令。

  1. 创立嵌套文档索引和映射能够用下面 RESTful 形式的映射创立进行批改,次要的是嵌套的类型是 nested,执行配置前运行 SQL 查问成果如下。
  2. 配置同步代码
input {stdin {}
    
    jdbc {
        jdbc_driver_library => "E:/2setsoft/1dev/logstash-7.8.0/mysqletc/mysql-connector-java-5.1.7-bin.jar"
        
        jdbc_driver_class => "com.mysql.jdbc.Driver"
        
        jdbc_connection_string => "jdbc:mysql://127.0.0.1:3306/community?characterEncoding=UTF-8&useSSL=false"
        
        jdbc_user => root
        
        jdbc_password => "root"
        
        schedule => "*/1 * * * *"
       
        statement => "SELECT community.id AS community_id, community.content, community.location, community.images, comment.content AS comment_content , comment.id AS comment_id FROM yiqi_comment comment LEFT JOIN yiqi_community community ON community.id = comment.community_id"
   }
 
}
 
filter {
    
    aggregate {task_id => "%{community_id}"
        code => "map['id'] = event.get('community_id')
            map['content'] = event.get('content')
            map['location'] = event.get('location')
            map['images'] = event.get('images')
            map['comment_list'] ||=[]
            map['comment'] ||=[]
            if (event.get('comment_id') != nil)
                if !(map['comment_list'].include? event.get('comment_id'))  
                    map['comment_list'] << event.get('comment_id')        
                    map['comment'] << {'comment_id' => event.get('comment_id'),
                        'content' => event.get('comment_content')
                    }
                end
            end
            event.cancel()
        "
        
        push_previous_map_as_event => true
        timeout => 5
    }
    
    json {
        source => "message"
        remove_field => ["message"]
        #remove_field => ["message", "type", "@timestamp", "@version"]
    }
    
    mutate  {
        #将不须要的 JSON 字段过滤,且不会被存入 ES 中
        remove_field => ["tags", "@timestamp", "@version"]
    }
    
}
 
output {
    stdout {#codec => json_lines}
    elasticsearch {hosts => ["127.0.0.1:9200"]
        index => "test_nested_community_content"
        document_id => "%{id}"
   }
}
  1. 运行命令开始同步
    bin\logstash -f mysql\mysql.conf
  1. 查问
退出移动版