一、全量构建索引

ElasticSearch集群环境搭建好当前,首次须要全量地从关系型数据库中将指标待索引数据写入到ElasticSearch搜索引擎中,以下咱们将用到logstash的插件logstash-input-jdbc来全量同步数据。

  • 参考ElasticSearch本地集群搭建

1、Logstash管道构建全量索引

Logstash好比一个数据管道,通常一端连贯着关系型数据库来往管道中输出数据,一端连贯着ElasticSearch将管道中的数据输入。

下载Logstash

  • 返回官网下载Logstash,须要与ElasticSearch版本统一,兼容性更好。
  • 将下载的Logstash压缩包上传到服务器上,示例将Logstash装置在了与ElasticSearch同一台服务器上。

装置Logstash

#进入logstash压缩包所在目录cd /usr/local#加压logstash压缩包unzip logstash-7.8.0.zip

  • 装置logstash的插件logstash-input-jdbc
#进入logstash装置目录的bin目录下cd /usr/local/logstash-7.8.0/bin#装置插件,留神logstash依赖jdk,装置插件前请确保装置了jdk#以后logstash-7.8.0版本曾经默认装置了logstash-integration-jdbc,而此插件曾经蕴含logstash-input-jdbc./logstash-plugin install logstash-input-jdbc#查看logstash所有已装置的插件./logstash-plugin list

配置logstash

logstash须要与mysql通信并获取数据,于是须要筹备与mysql通信的驱动程序,须要筹备拉取数据所须要执行的sql语句,还须要logstash的输出、输入端点。

  • 创立相干目录及文件
cd /usr/local/logstash-7.8.0mkdir mysqlcd mysqltouch jdbc.conftouch jdbc.sql

  • mysql驱动程序,返回maven地方仓库获取
  • 获取数据的sql脚本(jdbc.sql)
SELECT    a.id,    a.NAME,    a.tags,    concat( a.latitude, ',', a.longitude ) AS location,    a.remark_score,    a.price_per_man,    a.category_id,    b.NAME AS category_name,    a.seller_id,    c.remark_score AS seller_remark_score,    c.disabled_flag AS seller_disabled_flag FROM    shop a    INNER JOIN category b ON a.category_id = b.id    INNER JOIN seller c ON c.id = a.seller_id
  • logstash输出、输入配置
input {    jdbc {      # mysql 数据库链接,dianping为数据库名      jdbc_connection_string => "jdbc:mysql://192.168.15.150:3306/dianping"      # 用户名和明码      jdbc_user => "root"      jdbc_password => "123456"      # 驱动      jdbc_driver_library => "/usr/local/logstash-7.8.0/mysql/mysql-connector-java-5.1.47.jar"      # 驱动类名      jdbc_driver_class => "com.mysql.jdbc.Driver"      jdbc_paging_enabled => "true"      jdbc_page_size => "50000"    # 执行的sql 文件门路+名称      statement_filepath => "/usr/local/logstash-7.8.0/mysql/jdbc.sql"      # 设置监听距离  各字段含意(由左至右)分、时、天、月、年,全副为*默认含意为每分钟都更新      schedule => "* * * * *"    }}output {    elasticsearch {      # ES的IP地址及端口        hosts => ["192.168.15.151:9200"]      # 索引名称        index => "shop"        document_type => "_doc"      # 自增ID 须要关联的数据库中有有一个id字段,对应索引的id号        document_id => "%{id}"    }    stdout {     # JSON格局输入        codec => json_lines    }}

启动logstash

#进入logstash装置目录cd /usr/local/logstash-7.8.0#后盾过程形式启动logstashnohup ./bin/logstash -f mysql/jdbc.conf &#查看logstash日志tail -f -n 100 ./nohup.out

阐明:从以上日志中看出logstash失常启动,并且每分钟会进行全量同步数据,日志中也打印出了执行的sql,首先会查个总数,为了后续的分页获取数据。

二、增量构建索引

1、Logstash管道构建增量索引

其实在以上Logstash管道构建全量索引的根底上,稍微批改即可实现基于时间轴的增量构建索引。仅仅须要批改jdbc.conf、新增一个记录时间的文件,批改jdbc.sql文件退出工夫条件即可。

获取数据的sql脚本(jdbc.sql)

SELECT    a.id,    a.NAME,    a.tags,    concat( a.latitude, ',', a.longitude ) AS location,    a.remark_score,    a.price_per_man,    a.category_id,    b.NAME AS category_name,    a.seller_id,    c.remark_score AS seller_remark_score,    c.disabled_flag AS seller_disabled_flag FROM    shop a    INNER JOIN category b ON a.category_id = b.id    INNER JOIN seller c ON c.id = a.seller_id WHERE    a.update_at > : sql_last_value     OR b.update_at > : sql_last_value     OR c.update_at > : sql_last_value

阐明:退出了工夫条件,管制抓取工夫窗口内的数据,增量更新。

logstash输出、输入配置

input {    jdbc {      #设置时区timezone      jdbc_default_timezone => "Asia/Shanghai"      # mysql 数据库链接,dianping为数据库名      jdbc_connection_string => "jdbc:mysql://192.168.15.150:3306/dianping"      # 用户名和明码      jdbc_user => "root"      jdbc_password => "910625"      # 驱动      jdbc_driver_library => "/usr/local/logstash-7.8.0/mysql/mysql-connector-java-5.1.47.jar"      # 驱动类名      jdbc_driver_class => "com.mysql.jdbc.Driver"      jdbc_paging_enabled => "true"      jdbc_page_size => "50000"      #记录上次运行元数据文件门路,如记录增量同步的工夫窗口      last_run_metadata_path => "/usr/local/logstash-7.8.0/mysql/last_value_meta"    # 执行的sql 文件门路+名称      statement_filepath => "/usr/local/logstash-7.8.0/mysql/jdbc.sql"      # 设置监听距离  各字段含意(由左至右)分、时、天、月、年,全副为*默认含意为每分钟都更新      schedule => "* * * * *"    }}output {    elasticsearch {      # ES的IP地址及端口        hosts => ["192.168.15.151:9200"]      # 索引名称        index => "shop"                document_type => "_doc"      # 自增ID 须要关联的数据库中有有一个id字段,对应索引的id号        document_id => "%{id}"    }    stdout {     # JSON格局输入        codec => json_lines    }}

阐明:减少了时区的配置,以及指定了记录上次运行元数据文件门路,如记录增量同步的工夫窗口。

新增last_value_meta文件,并写入一个初始工夫

2020-07-12 00:00:00

阐明:记录上次执行同步工作时的工夫,下次工作会持续更新这个工夫。

Logstash管道构建增量索引的毛病

这种以工夫窗口的形式进行数据同步,始终防止不了同步提早的问题,然而有时候业务上也是可能容许的,增量同步的工夫窗口须要更具业务来定。工夫窗口太短,数据更新太多,会呈现一次增量同步工作还没实现,下一次增量工作又开始了。