乐趣区

ElasticSearch全量或增量构建索引

一、全量构建索引

ElasticSearch 集群环境搭建好当前,首次须要全量地从关系型数据库中将指标待索引数据写入到 ElasticSearch 搜索引擎中,以下咱们将用到 logstash 的插件 logstash-input-jdbc 来全量同步数据。

  • 参考 ElasticSearch 本地集群搭建

1、Logstash 管道构建全量索引

Logstash 好比一个数据管道,通常一端连贯着关系型数据库来往管道中输出数据,一端连贯着 ElasticSearch 将管道中的数据输入。

下载 Logstash

  • 返回官网下载 Logstash,须要与 ElasticSearch 版本统一,兼容性更好。
  • 将下载的 Logstash 压缩包上传到服务器上,示例将 Logstash 装置在了与 ElasticSearch 同一台服务器上。

装置 Logstash

# 进入 logstash 压缩包所在目录
cd /usr/local

#加压 logstash 压缩包
unzip logstash-7.8.0.zip

  • 装置 logstash 的插件 logstash-input-jdbc
# 进入 logstash 装置目录的 bin 目录下
cd /usr/local/logstash-7.8.0/bin

#装置插件,留神 logstash 依赖 jdk,装置插件前请确保装置了 jdk
#以后 logstash-7.8.0 版本曾经默认装置了 logstash-integration-jdbc,而此插件曾经蕴含 logstash-input-jdbc
./logstash-plugin install logstash-input-jdbc

#查看 logstash 所有已装置的插件
./logstash-plugin list

配置 logstash

logstash 须要与 mysql 通信并获取数据,于是须要筹备与 mysql 通信的驱动程序,须要筹备拉取数据所须要执行的 sql 语句,还须要 logstash 的输出、输入端点。

  • 创立相干目录及文件
cd /usr/local/logstash-7.8.0
mkdir mysql
cd mysql
touch jdbc.conf
touch jdbc.sql

  • mysql 驱动程序,返回 maven 地方仓库获取
  • 获取数据的 sql 脚本(jdbc.sql)
SELECT
    a.id,
    a.NAME,
    a.tags,
    concat(a.latitude, ',', a.longitude) AS location,
    a.remark_score,
    a.price_per_man,
    a.category_id,
    b.NAME AS category_name,
    a.seller_id,
    c.remark_score AS seller_remark_score,
    c.disabled_flag AS seller_disabled_flag 
FROM
    shop a
    INNER JOIN category b ON a.category_id = b.id
    INNER JOIN seller c ON c.id = a.seller_id
  • logstash 输出、输入配置
input {
    jdbc {
      # mysql 数据库链接,dianping 为数据库名
      jdbc_connection_string => "jdbc:mysql://192.168.15.150:3306/dianping"
      # 用户名和明码
      jdbc_user => "root"
      jdbc_password => "123456"
      # 驱动
      jdbc_driver_library => "/usr/local/logstash-7.8.0/mysql/mysql-connector-java-5.1.47.jar"
      # 驱动类名
      jdbc_driver_class => "com.mysql.jdbc.Driver"
      jdbc_paging_enabled => "true"
      jdbc_page_size => "50000"
    # 执行的 sql 文件门路 + 名称
      statement_filepath => "/usr/local/logstash-7.8.0/mysql/jdbc.sql"
      # 设置监听距离  各字段含意(由左至右)分、时、天、月、年,全副为 * 默认含意为每分钟都更新
      schedule => "* * * * *"
    }
}


output {
    elasticsearch {
      # ES 的 IP 地址及端口
        hosts => ["192.168.15.151:9200"]
      # 索引名称
        index => "shop"
        document_type => "_doc"
      # 自增 ID 须要关联的数据库中有有一个 id 字段,对应索引的 id 号
        document_id => "%{id}"
    }
    stdout {
     # JSON 格局输入
        codec => json_lines
    }
}

启动 logstash

# 进入 logstash 装置目录
cd /usr/local/logstash-7.8.0

#后盾过程形式启动 logstash
nohup ./bin/logstash -f mysql/jdbc.conf &

#查看 logstash 日志
tail -f -n 100 ./nohup.out

阐明:从以上日志中看出 logstash 失常启动,并且每分钟会进行全量同步数据,日志中也打印出了执行的 sql,首先会查个总数,为了后续的分页获取数据。

二、增量构建索引

1、Logstash 管道构建增量索引

其实在以上 Logstash 管道构建全量索引的根底上,稍微批改即可实现基于时间轴的增量构建索引。仅仅须要批改 jdbc.conf、新增一个记录时间的文件,批改 jdbc.sql 文件退出工夫条件即可。

获取数据的 sql 脚本(jdbc.sql)

SELECT
    a.id,
    a.NAME,
    a.tags,
    concat(a.latitude, ',', a.longitude) AS location,
    a.remark_score,
    a.price_per_man,
    a.category_id,
    b.NAME AS category_name,
    a.seller_id,
    c.remark_score AS seller_remark_score,
    c.disabled_flag AS seller_disabled_flag 
FROM
    shop a
    INNER JOIN category b ON a.category_id = b.id
    INNER JOIN seller c ON c.id = a.seller_id 
WHERE
    a.update_at > : sql_last_value 
    OR b.update_at > : sql_last_value 
    OR c.update_at > : sql_last_value

阐明:退出了工夫条件,管制抓取工夫窗口内的数据,增量更新。

logstash 输出、输入配置

input {
    jdbc {
      #设置时区 timezone
      jdbc_default_timezone => "Asia/Shanghai"
      # mysql 数据库链接,dianping 为数据库名
      jdbc_connection_string => "jdbc:mysql://192.168.15.150:3306/dianping"
      # 用户名和明码
      jdbc_user => "root"
      jdbc_password => "910625"
      # 驱动
      jdbc_driver_library => "/usr/local/logstash-7.8.0/mysql/mysql-connector-java-5.1.47.jar"
      # 驱动类名
      jdbc_driver_class => "com.mysql.jdbc.Driver"
      jdbc_paging_enabled => "true"
      jdbc_page_size => "50000"
      #记录上次运行元数据文件门路,如记录增量同步的工夫窗口
      last_run_metadata_path => "/usr/local/logstash-7.8.0/mysql/last_value_meta"
    # 执行的 sql 文件门路 + 名称
      statement_filepath => "/usr/local/logstash-7.8.0/mysql/jdbc.sql"
      # 设置监听距离  各字段含意(由左至右)分、时、天、月、年,全副为 * 默认含意为每分钟都更新
      schedule => "* * * * *"
    }
}

output {
    elasticsearch {
      # ES 的 IP 地址及端口
        hosts => ["192.168.15.151:9200"]
      # 索引名称
        index => "shop"
                document_type => "_doc"
      # 自增 ID 须要关联的数据库中有有一个 id 字段,对应索引的 id 号
        document_id => "%{id}"
    }
    stdout {
     # JSON 格局输入
        codec => json_lines
    }
}

阐明:减少了时区的配置,以及指定了记录上次运行元数据文件门路,如记录增量同步的工夫窗口。

新增 last_value_meta 文件,并写入一个初始工夫

2020-07-12 00:00:00

阐明:记录上次执行同步工作时的工夫,下次工作会持续更新这个工夫。

Logstash 管道构建增量索引的毛病

这种以工夫窗口的形式进行数据同步,始终防止不了同步提早的问题,然而有时候业务上也是可能容许的,增量同步的工夫窗口须要更具业务来定。工夫窗口太短,数据更新太多,会呈现一次增量同步工作还没实现,下一次增量工作又开始了。

退出移动版