一、全量构建索引
ElasticSearch 集群环境搭建好当前,首次须要全量地从关系型数据库中将指标待索引数据写入到 ElasticSearch 搜索引擎中,以下咱们将用到 logstash 的插件 logstash-input-jdbc 来全量同步数据。
- 参考 ElasticSearch 本地集群搭建
1、Logstash 管道构建全量索引
Logstash 好比一个数据管道,通常一端连贯着关系型数据库来往管道中输出数据,一端连贯着 ElasticSearch 将管道中的数据输入。
下载 Logstash
- 返回官网下载 Logstash,须要与 ElasticSearch 版本统一,兼容性更好。
- 将下载的 Logstash 压缩包上传到服务器上,示例将 Logstash 装置在了与 ElasticSearch 同一台服务器上。
装置 Logstash
# 进入 logstash 压缩包所在目录
cd /usr/local
#加压 logstash 压缩包
unzip logstash-7.8.0.zip
- 装置 logstash 的插件 logstash-input-jdbc
# 进入 logstash 装置目录的 bin 目录下
cd /usr/local/logstash-7.8.0/bin
#装置插件,留神 logstash 依赖 jdk,装置插件前请确保装置了 jdk
#以后 logstash-7.8.0 版本曾经默认装置了 logstash-integration-jdbc,而此插件曾经蕴含 logstash-input-jdbc
./logstash-plugin install logstash-input-jdbc
#查看 logstash 所有已装置的插件
./logstash-plugin list
配置 logstash
logstash 须要与 mysql 通信并获取数据,于是须要筹备与 mysql 通信的驱动程序,须要筹备拉取数据所须要执行的 sql 语句,还须要 logstash 的输出、输入端点。
- 创立相干目录及文件
cd /usr/local/logstash-7.8.0
mkdir mysql
cd mysql
touch jdbc.conf
touch jdbc.sql
- mysql 驱动程序,返回 maven 地方仓库获取
- 获取数据的 sql 脚本(jdbc.sql)
SELECT
a.id,
a.NAME,
a.tags,
concat(a.latitude, ',', a.longitude) AS location,
a.remark_score,
a.price_per_man,
a.category_id,
b.NAME AS category_name,
a.seller_id,
c.remark_score AS seller_remark_score,
c.disabled_flag AS seller_disabled_flag
FROM
shop a
INNER JOIN category b ON a.category_id = b.id
INNER JOIN seller c ON c.id = a.seller_id
- logstash 输出、输入配置
input {
jdbc {
# mysql 数据库链接,dianping 为数据库名
jdbc_connection_string => "jdbc:mysql://192.168.15.150:3306/dianping"
# 用户名和明码
jdbc_user => "root"
jdbc_password => "123456"
# 驱动
jdbc_driver_library => "/usr/local/logstash-7.8.0/mysql/mysql-connector-java-5.1.47.jar"
# 驱动类名
jdbc_driver_class => "com.mysql.jdbc.Driver"
jdbc_paging_enabled => "true"
jdbc_page_size => "50000"
# 执行的 sql 文件门路 + 名称
statement_filepath => "/usr/local/logstash-7.8.0/mysql/jdbc.sql"
# 设置监听距离 各字段含意(由左至右)分、时、天、月、年,全副为 * 默认含意为每分钟都更新
schedule => "* * * * *"
}
}
output {
elasticsearch {
# ES 的 IP 地址及端口
hosts => ["192.168.15.151:9200"]
# 索引名称
index => "shop"
document_type => "_doc"
# 自增 ID 须要关联的数据库中有有一个 id 字段,对应索引的 id 号
document_id => "%{id}"
}
stdout {
# JSON 格局输入
codec => json_lines
}
}
启动 logstash
# 进入 logstash 装置目录
cd /usr/local/logstash-7.8.0
#后盾过程形式启动 logstash
nohup ./bin/logstash -f mysql/jdbc.conf &
#查看 logstash 日志
tail -f -n 100 ./nohup.out
阐明:从以上日志中看出 logstash 失常启动,并且每分钟会进行全量同步数据,日志中也打印出了执行的 sql,首先会查个总数,为了后续的分页获取数据。
二、增量构建索引
1、Logstash 管道构建增量索引
其实在以上 Logstash 管道构建全量索引的根底上,稍微批改即可实现基于时间轴的增量构建索引。仅仅须要批改 jdbc.conf、新增一个记录时间的文件,批改 jdbc.sql 文件退出工夫条件即可。
获取数据的 sql 脚本(jdbc.sql)
SELECT
a.id,
a.NAME,
a.tags,
concat(a.latitude, ',', a.longitude) AS location,
a.remark_score,
a.price_per_man,
a.category_id,
b.NAME AS category_name,
a.seller_id,
c.remark_score AS seller_remark_score,
c.disabled_flag AS seller_disabled_flag
FROM
shop a
INNER JOIN category b ON a.category_id = b.id
INNER JOIN seller c ON c.id = a.seller_id
WHERE
a.update_at > : sql_last_value
OR b.update_at > : sql_last_value
OR c.update_at > : sql_last_value
阐明:退出了工夫条件,管制抓取工夫窗口内的数据,增量更新。
logstash 输出、输入配置
input {
jdbc {
#设置时区 timezone
jdbc_default_timezone => "Asia/Shanghai"
# mysql 数据库链接,dianping 为数据库名
jdbc_connection_string => "jdbc:mysql://192.168.15.150:3306/dianping"
# 用户名和明码
jdbc_user => "root"
jdbc_password => "910625"
# 驱动
jdbc_driver_library => "/usr/local/logstash-7.8.0/mysql/mysql-connector-java-5.1.47.jar"
# 驱动类名
jdbc_driver_class => "com.mysql.jdbc.Driver"
jdbc_paging_enabled => "true"
jdbc_page_size => "50000"
#记录上次运行元数据文件门路,如记录增量同步的工夫窗口
last_run_metadata_path => "/usr/local/logstash-7.8.0/mysql/last_value_meta"
# 执行的 sql 文件门路 + 名称
statement_filepath => "/usr/local/logstash-7.8.0/mysql/jdbc.sql"
# 设置监听距离 各字段含意(由左至右)分、时、天、月、年,全副为 * 默认含意为每分钟都更新
schedule => "* * * * *"
}
}
output {
elasticsearch {
# ES 的 IP 地址及端口
hosts => ["192.168.15.151:9200"]
# 索引名称
index => "shop"
document_type => "_doc"
# 自增 ID 须要关联的数据库中有有一个 id 字段,对应索引的 id 号
document_id => "%{id}"
}
stdout {
# JSON 格局输入
codec => json_lines
}
}
阐明:减少了时区的配置,以及指定了记录上次运行元数据文件门路,如记录增量同步的工夫窗口。
新增 last_value_meta 文件,并写入一个初始工夫
2020-07-12 00:00:00
阐明:记录上次执行同步工作时的工夫,下次工作会持续更新这个工夫。
Logstash 管道构建增量索引的毛病
这种以工夫窗口的形式进行数据同步,始终防止不了同步提早的问题,然而有时候业务上也是可能容许的,增量同步的工夫窗口须要更具业务来定。工夫窗口太短,数据更新太多,会呈现一次增量同步工作还没实现,下一次增量工作又开始了。