关于mysql:logstash同步mysql数据到elasticsearch

34次阅读

共计 2017 个字符,预计需要花费 6 分钟才能阅读完成。

1. mysql 建表 test;

2. 装置 logstash(跟 es 版本统一)

# 下载
wget https://repo.huaweicloud.com/logstash/7.14.2/logstash-7.14.2-linux-x86_64.tar.gz
# 解压
tar -zxvf logstash-7.14.2-linux-x86_64.tar.gz
# 须要 mysql-connector-java-5.1.40.jar,轻易放到比方目录
# /var/lib/hadoop-hdfs/logstash-7.14.2/lib/mysql-connector-java-5.1.40.jar

3. 新建 es 索引test

curl -u elastic:changeme -X PUT  http://192.168.20.130:9200/test -H 'Content-Type: application/json' -d'{"settings": {"number_of_shards": 1,"number_of_replicas" : 1},
    "mappings" : {
        "properties": {
          "id": {"type" : "long"}, 
          "type": {"type": "keyword"}, 
          "keyword_1": {
            "type": "text",
            "analyzer" : "ik_smart"
          }, 
          "keyword_2": {
            "type": "text",
            "analyzer" : "ik_smart"
          },
          "keyword_3": {
            "type": "text",
            "analyzer" : "ik_smart"
          },
          "data": {"type": "keyword"},
          "created_at": {
            "type": "date",
            "format": "yyyy-MM-dd HH:mm:ss||yyyy-MM-dd"
          }, 
          "updated_at": {
            "type": "date",
            "format": "yyyy-MM-dd HH:mm:ss||yyyy-MM-dd"
          }
      }
    }
}'

4. 编辑配置文件,vim ~/script/logstash/logstash_mysql2es.conf

input {stdin{}
  jdbc{
    # 连贯的数据库地址和数据库,指定编码格局,禁用 ssl 协定,设定主动重连
    # 此处 10.112.103.2 为 MySQL 所在 IP 地址,也是 elastic search 所在 IP 地址
    jdbc_connection_string => "jdbc:mysql://192.168.13.28:3306/test?characterEncoding=UTF-8&useSSL=FALSE&autoReconnect=true"
    #数据库用户名
    jdbc_user => "root"
    # 数据库用户名对应的明码
    jdbc_password => "root"
    # jar 包寄存地位
    jdbc_driver_library => "/var/lib/hadoop-hdfs/logstash-7.14.2/lib/mysql-connector-java-5.1.40.jar"
    jdbc_driver_class => "com.mysql.jdbc.Driver"
    jdbc_default_timezone => "Asia/Shanghai"
    jdbc_paging_enabled => "true"
    jdbc_page_size => "320000"
    lowercase_column_names => false
    statement => "select id, type, tags, title from test"
  }
}
filter {
    # 移除无关的字段
    mutate {remove_field => ["@version", "@timestamp"]
    }
}
output {
  elasticsearch {hosts => ["http://192.168.20.130:9200"]
    user => "elastic"
    password => "changeme"
    index => "test"
    document_type => "_doc"
    # 将字段 type 和 id 作为文档 id
    document_id => "%{type}_%{id}"
  }
  stdout {codec => json_lines}
}

重要配置参数阐明

  1. remove_field => ["@version", "@timestamp"]: 默认 logstash 会增加这两个字段,这里去掉;
  2. document_id => "%{type}_%{id}": 将两个字段拼接作为 es 的文档 id;

5. 启动工作

./logstash-7.14.2/bin/logstash -f script/logstash/logstash_mysql2es.conf

参考资料

  1. Jdbc input plugin
  2. Elasticsearch output plugin
  3. 通过 logstash 将 mysql 数据同步到 elastic search

正文完
 0