如何通过Logstash同步多表关联数据至Elasticsearch

42次阅读

共计 1838 个字符,预计需要花费 5 分钟才能阅读完成。

如果你对 使用 Logstash 保持 Elasticsearch 与数据库同步 方案还不是很熟悉,建议先花点时间精读它。
上面的文章以单表同步场景为例,清楚讲述了如何通过 JDBC 同步数据至 ES,而对于实际开发中经常出现的多表关联同步并未提及,以下是我针对多表关联同步的趟坑过程希望对你有所帮助。

数据库表的约定原则

同步单表时我们对于表字段的约定:

  • 表中要有主键字段(如 id),最近变更时间字段(如 modification_time),软删除标记字段(如 is_deleted),以便 jdbc-input 数据采集的轮询 Job 可以识别出增量变动的数据。
  • 提示:jdbc input 轮询需要基于 modification_time 条件查询,所以给该字段加上索引。

多表关联同步方案

多表关联的情况下我们需要 JOIN 其他表查询得到结果,这个结果就是 ES 需要的打平后的宽表。ES 新的版本中也增加了 join 操作,但这事不是 ES 擅长的,我们选择交给更擅长的数据库处理,让 ES 只存储打平后的单层索引。

如果你理解单表同步而困惑多表关联同步的话,试着 将关联查询的复杂 SQL 想象 (定义) 为视图,是不是后续操作就跟单表没区别了!

我们来逐个看下多表关联的同步问题 (假设表 a 多对多关联表 b):

  • 单表的 id 字段绑定到 ES document 的_id,可以实现 ES 索引幂等性,不会出现 job 原因导致索引文档重复。那对于多表关联的情况呢,可以使用各表 id 的组合作为 document 的_id。如 SELECT:

    concat(a.id, '_', b.id) AS docid

    (如果你不关注幂等,也可以用_id 默认生成策略。)

  • 单表基于 modification_time 就可以识别出自上次轮询后新的变化数据,多表关联的情况呢也类似:

    (CASE WHEN a.modification_time > b.modification_time THEN a.modification_time ELSE     b.modification_time END) AS modification_time
  • 同理软删除字段 is_deleted 的处理逻辑:

    (CASE WHEN a.is_deleted=0 AND b.is_deleted=0 THEN 0 ELSE 1 END) AS is_deleted

    这样无论表 a 还是表 b 发生变更,都可以被 logstash 识别出来采集到。

如此我们就可以写出多表关联同步的 SQL 了,为了方便更新维护 SQL 及保持 logstash-jdbc 端 conf 配置文件的简洁,你可以把 SQL 定义成一张视图,conf 文件中的 SQL statement 可以像写单表处理一样了。

示例 conf:

input {
  jdbc {
    jdbc_driver_library => "../drivers/mysql-connector-java-8.0.16.jar"
    jdbc_driver_class => "com.mysql.jdbc.Driver"
    jdbc_connection_string => "jdbc:mysql://localhost:3306/es_db?serverTimezone=UTC"
    jdbc_user => "usr"
    jdbc_password => "pwd"
    jdbc_paging_enabled => true
    tracking_column => "unix_ts_in_secs"
    use_column_value => true
    tracking_column_type => "numeric"
    schedule => "*/5 * * * * *"
    statement => "SELECT *, UNIX_TIMESTAMP(modification_time) AS unix_ts_in_secs FROM esview WHERE (UNIX_TIMESTAMP(modification_time) > :sql_last_value AND modification_time < NOW()) ORDER BY modification_time ASC"
  }
}
filter {
  mutate {copy => { "docid" => "[@metadata][_id]"}
    remove_field => ["docid", "@version", "unix_ts_in_secs"]
  }
}
output {
  elasticsearch {
      index => "test_idx"
      document_id => "%{[@metadata][_id]}"
  }
}

diboot 简单高效的轻代码开发框架 (求 star)

正文完
 0