一、全量构建索引
ElasticSearch集群环境搭建好当前,首次须要全量地从关系型数据库中将指标待索引数据写入到ElasticSearch搜索引擎中,以下咱们将用到logstash的插件logstash-input-jdbc来全量同步数据。
- 参考ElasticSearch本地集群搭建
1、Logstash管道构建全量索引
Logstash好比一个数据管道,通常一端连贯着关系型数据库来往管道中输出数据,一端连贯着ElasticSearch将管道中的数据输入。
下载Logstash
- 返回官网下载Logstash,须要与ElasticSearch版本统一,兼容性更好。
- 将下载的Logstash压缩包上传到服务器上,示例将Logstash装置在了与ElasticSearch同一台服务器上。
装置Logstash
#进入logstash压缩包所在目录cd /usr/local#加压logstash压缩包unzip logstash-7.8.0.zip
- 装置logstash的插件logstash-input-jdbc
#进入logstash装置目录的bin目录下cd /usr/local/logstash-7.8.0/bin#装置插件,留神logstash依赖jdk,装置插件前请确保装置了jdk#以后logstash-7.8.0版本曾经默认装置了logstash-integration-jdbc,而此插件曾经蕴含logstash-input-jdbc./logstash-plugin install logstash-input-jdbc#查看logstash所有已装置的插件./logstash-plugin list
配置logstash
logstash须要与mysql通信并获取数据,于是须要筹备与mysql通信的驱动程序,须要筹备拉取数据所须要执行的sql语句,还须要logstash的输出、输入端点。
- 创立相干目录及文件
cd /usr/local/logstash-7.8.0mkdir mysqlcd mysqltouch jdbc.conftouch jdbc.sql
- mysql驱动程序,返回maven地方仓库获取
- 获取数据的sql脚本(jdbc.sql)
SELECT a.id, a.NAME, a.tags, concat( a.latitude, ',', a.longitude ) AS location, a.remark_score, a.price_per_man, a.category_id, b.NAME AS category_name, a.seller_id, c.remark_score AS seller_remark_score, c.disabled_flag AS seller_disabled_flag FROM shop a INNER JOIN category b ON a.category_id = b.id INNER JOIN seller c ON c.id = a.seller_id
- logstash输出、输入配置
input { jdbc { # mysql 数据库链接,dianping为数据库名 jdbc_connection_string => "jdbc:mysql://192.168.15.150:3306/dianping" # 用户名和明码 jdbc_user => "root" jdbc_password => "123456" # 驱动 jdbc_driver_library => "/usr/local/logstash-7.8.0/mysql/mysql-connector-java-5.1.47.jar" # 驱动类名 jdbc_driver_class => "com.mysql.jdbc.Driver" jdbc_paging_enabled => "true" jdbc_page_size => "50000" # 执行的sql 文件门路+名称 statement_filepath => "/usr/local/logstash-7.8.0/mysql/jdbc.sql" # 设置监听距离 各字段含意(由左至右)分、时、天、月、年,全副为*默认含意为每分钟都更新 schedule => "* * * * *" }}output { elasticsearch { # ES的IP地址及端口 hosts => ["192.168.15.151:9200"] # 索引名称 index => "shop" document_type => "_doc" # 自增ID 须要关联的数据库中有有一个id字段,对应索引的id号 document_id => "%{id}" } stdout { # JSON格局输入 codec => json_lines }}
启动logstash
#进入logstash装置目录cd /usr/local/logstash-7.8.0#后盾过程形式启动logstashnohup ./bin/logstash -f mysql/jdbc.conf &#查看logstash日志tail -f -n 100 ./nohup.out
阐明:从以上日志中看出logstash失常启动,并且每分钟会进行全量同步数据,日志中也打印出了执行的sql,首先会查个总数,为了后续的分页获取数据。
二、增量构建索引
1、Logstash管道构建增量索引
其实在以上Logstash管道构建全量索引的根底上,稍微批改即可实现基于时间轴的增量构建索引。仅仅须要批改jdbc.conf、新增一个记录时间的文件,批改jdbc.sql文件退出工夫条件即可。
获取数据的sql脚本(jdbc.sql)
SELECT a.id, a.NAME, a.tags, concat( a.latitude, ',', a.longitude ) AS location, a.remark_score, a.price_per_man, a.category_id, b.NAME AS category_name, a.seller_id, c.remark_score AS seller_remark_score, c.disabled_flag AS seller_disabled_flag FROM shop a INNER JOIN category b ON a.category_id = b.id INNER JOIN seller c ON c.id = a.seller_id WHERE a.update_at > : sql_last_value OR b.update_at > : sql_last_value OR c.update_at > : sql_last_value
阐明:退出了工夫条件,管制抓取工夫窗口内的数据,增量更新。
logstash输出、输入配置
input { jdbc { #设置时区timezone jdbc_default_timezone => "Asia/Shanghai" # mysql 数据库链接,dianping为数据库名 jdbc_connection_string => "jdbc:mysql://192.168.15.150:3306/dianping" # 用户名和明码 jdbc_user => "root" jdbc_password => "910625" # 驱动 jdbc_driver_library => "/usr/local/logstash-7.8.0/mysql/mysql-connector-java-5.1.47.jar" # 驱动类名 jdbc_driver_class => "com.mysql.jdbc.Driver" jdbc_paging_enabled => "true" jdbc_page_size => "50000" #记录上次运行元数据文件门路,如记录增量同步的工夫窗口 last_run_metadata_path => "/usr/local/logstash-7.8.0/mysql/last_value_meta" # 执行的sql 文件门路+名称 statement_filepath => "/usr/local/logstash-7.8.0/mysql/jdbc.sql" # 设置监听距离 各字段含意(由左至右)分、时、天、月、年,全副为*默认含意为每分钟都更新 schedule => "* * * * *" }}output { elasticsearch { # ES的IP地址及端口 hosts => ["192.168.15.151:9200"] # 索引名称 index => "shop" document_type => "_doc" # 自增ID 须要关联的数据库中有有一个id字段,对应索引的id号 document_id => "%{id}" } stdout { # JSON格局输入 codec => json_lines }}
阐明:减少了时区的配置,以及指定了记录上次运行元数据文件门路,如记录增量同步的工夫窗口。
新增last_value_meta文件,并写入一个初始工夫
2020-07-12 00:00:00
阐明:记录上次执行同步工作时的工夫,下次工作会持续更新这个工夫。
Logstash管道构建增量索引的毛病
这种以工夫窗口的形式进行数据同步,始终防止不了同步提早的问题,然而有时候业务上也是可能容许的,增量同步的工夫窗口须要更具业务来定。工夫窗口太短,数据更新太多,会呈现一次增量同步工作还没实现,下一次增量工作又开始了。