一、全量构建索引
ElasticSearch集群环境搭建好当前,首次须要全量地从关系型数据库中将指标待索引数据写入到ElasticSearch搜索引擎中,以下咱们将用到logstash的插件logstash-input-jdbc来全量同步数据。
- 参考ElasticSearch本地集群搭建
1、Logstash管道构建全量索引
Logstash好比一个数据管道,通常一端连贯着关系型数据库来往管道中输出数据,一端连贯着ElasticSearch将管道中的数据输入。
下载Logstash
- 返回官网下载Logstash,须要与ElasticSearch版本统一,兼容性更好。
- 将下载的Logstash压缩包上传到服务器上,示例将Logstash装置在了与ElasticSearch同一台服务器上。
装置Logstash
#进入logstash压缩包所在目录
cd /usr/local
#加压logstash压缩包
unzip logstash-7.8.0.zip
- 装置logstash的插件logstash-input-jdbc
#进入logstash装置目录的bin目录下
cd /usr/local/logstash-7.8.0/bin
#装置插件,留神logstash依赖jdk,装置插件前请确保装置了jdk
#以后logstash-7.8.0版本曾经默认装置了logstash-integration-jdbc,而此插件曾经蕴含logstash-input-jdbc
./logstash-plugin install logstash-input-jdbc
#查看logstash所有已装置的插件
./logstash-plugin list
配置logstash
logstash须要与mysql通信并获取数据,于是须要筹备与mysql通信的驱动程序,须要筹备拉取数据所须要执行的sql语句,还须要logstash的输出、输入端点。
- 创立相干目录及文件
cd /usr/local/logstash-7.8.0
mkdir mysql
cd mysql
touch jdbc.conf
touch jdbc.sql
- mysql驱动程序,返回maven地方仓库获取
- 获取数据的sql脚本(jdbc.sql)
SELECT
a.id,
a.NAME,
a.tags,
concat( a.latitude, ',', a.longitude ) AS location,
a.remark_score,
a.price_per_man,
a.category_id,
b.NAME AS category_name,
a.seller_id,
c.remark_score AS seller_remark_score,
c.disabled_flag AS seller_disabled_flag
FROM
shop a
INNER JOIN category b ON a.category_id = b.id
INNER JOIN seller c ON c.id = a.seller_id
- logstash输出、输入配置
input {
jdbc {
# mysql 数据库链接,dianping为数据库名
jdbc_connection_string => "jdbc:mysql://192.168.15.150:3306/dianping"
# 用户名和明码
jdbc_user => "root"
jdbc_password => "123456"
# 驱动
jdbc_driver_library => "/usr/local/logstash-7.8.0/mysql/mysql-connector-java-5.1.47.jar"
# 驱动类名
jdbc_driver_class => "com.mysql.jdbc.Driver"
jdbc_paging_enabled => "true"
jdbc_page_size => "50000"
# 执行的sql 文件门路+名称
statement_filepath => "/usr/local/logstash-7.8.0/mysql/jdbc.sql"
# 设置监听距离 各字段含意(由左至右)分、时、天、月、年,全副为*默认含意为每分钟都更新
schedule => "* * * * *"
}
}
output {
elasticsearch {
# ES的IP地址及端口
hosts => ["192.168.15.151:9200"]
# 索引名称
index => "shop"
document_type => "_doc"
# 自增ID 须要关联的数据库中有有一个id字段,对应索引的id号
document_id => "%{id}"
}
stdout {
# JSON格局输入
codec => json_lines
}
}
启动logstash
#进入logstash装置目录
cd /usr/local/logstash-7.8.0
#后盾过程形式启动logstash
nohup ./bin/logstash -f mysql/jdbc.conf &
#查看logstash日志
tail -f -n 100 ./nohup.out
阐明:从以上日志中看出logstash失常启动,并且每分钟会进行全量同步数据,日志中也打印出了执行的sql,首先会查个总数,为了后续的分页获取数据。
二、增量构建索引
1、Logstash管道构建增量索引
其实在以上Logstash管道构建全量索引的根底上,稍微批改即可实现基于时间轴的增量构建索引。仅仅须要批改jdbc.conf、新增一个记录时间的文件,批改jdbc.sql文件退出工夫条件即可。
获取数据的sql脚本(jdbc.sql)
SELECT
a.id,
a.NAME,
a.tags,
concat( a.latitude, ',', a.longitude ) AS location,
a.remark_score,
a.price_per_man,
a.category_id,
b.NAME AS category_name,
a.seller_id,
c.remark_score AS seller_remark_score,
c.disabled_flag AS seller_disabled_flag
FROM
shop a
INNER JOIN category b ON a.category_id = b.id
INNER JOIN seller c ON c.id = a.seller_id
WHERE
a.update_at > : sql_last_value
OR b.update_at > : sql_last_value
OR c.update_at > : sql_last_value
阐明:退出了工夫条件,管制抓取工夫窗口内的数据,增量更新。
logstash输出、输入配置
input {
jdbc {
#设置时区timezone
jdbc_default_timezone => "Asia/Shanghai"
# mysql 数据库链接,dianping为数据库名
jdbc_connection_string => "jdbc:mysql://192.168.15.150:3306/dianping"
# 用户名和明码
jdbc_user => "root"
jdbc_password => "910625"
# 驱动
jdbc_driver_library => "/usr/local/logstash-7.8.0/mysql/mysql-connector-java-5.1.47.jar"
# 驱动类名
jdbc_driver_class => "com.mysql.jdbc.Driver"
jdbc_paging_enabled => "true"
jdbc_page_size => "50000"
#记录上次运行元数据文件门路,如记录增量同步的工夫窗口
last_run_metadata_path => "/usr/local/logstash-7.8.0/mysql/last_value_meta"
# 执行的sql 文件门路+名称
statement_filepath => "/usr/local/logstash-7.8.0/mysql/jdbc.sql"
# 设置监听距离 各字段含意(由左至右)分、时、天、月、年,全副为*默认含意为每分钟都更新
schedule => "* * * * *"
}
}
output {
elasticsearch {
# ES的IP地址及端口
hosts => ["192.168.15.151:9200"]
# 索引名称
index => "shop"
document_type => "_doc"
# 自增ID 须要关联的数据库中有有一个id字段,对应索引的id号
document_id => "%{id}"
}
stdout {
# JSON格局输入
codec => json_lines
}
}
阐明:减少了时区的配置,以及指定了记录上次运行元数据文件门路,如记录增量同步的工夫窗口。
新增last_value_meta文件,并写入一个初始工夫
2020-07-12 00:00:00
阐明:记录上次执行同步工作时的工夫,下次工作会持续更新这个工夫。
Logstash管道构建增量索引的毛病
这种以工夫窗口的形式进行数据同步,始终防止不了同步提早的问题,然而有时候业务上也是可能容许的,增量同步的工夫窗口须要更具业务来定。工夫窗口太短,数据更新太多,会呈现一次增量同步工作还没实现,下一次增量工作又开始了。
发表回复