关于数据同步:开源数据集成平台SeaTunnelMySQL实时同步到es

56次阅读

共计 2344 个字符,预计需要花费 6 分钟才能阅读完成。

一、前言

  • 最近,我的项目有几个表要从 MySQL 实时同步到 另一个 MySQL,也有同步到 ElasticSearch 的。
  • 目前,公司生产环境同步,用的是 阿里云的 DTS,每个同步工作每月 500 多元,有点小贵。
  • 其余环境:MySQL 同步到 ES,用的是 CloudCanal,不反对 数据转换,增加同步字段比拟麻烦,社区版限度 5 个工作,不够用;MySQL 同步到 MySQL,用的是 debezium,不反对写入 ES。
  • 恰好 3 年前用过 SeaTunnel 的 前身 WaterDrop,那就开始吧。本文以 2.3.1 版本,Ubuntu 零碎为例

二、开源数据集成平台 SeaTunnel

1. 简介

  • SeaTunnel 是 Apache 软件基金会下的一个高性能开源大数据集成工具,为数据集成场景提供灵便易用、易扩大并反对千亿级数据集成的解决方案。
  • Seaunnel 为实时 (CDC) 和批量数据提供高性能数据同步能力,反对十种以上数据源,曾经在 B 站、腾讯云、字节等数百家公司应用。
  • 能够抉择 SeaTunnel Zeta 引擎上运行,也能够在 Apache Flink 或 Spark 引擎上运行。

2. 装置

  • 下载,这里抉择 2.3.1 版本,执行 tar -xzvf apache-seatunnel-*.tar.gz 解压缩
  • 因为 2.3.2 版本,MySQL-CDC 找不到驱动,bug 修复详见

    Caused by: java.sql.SQLException: No suitable driver
          at java.sql/java.sql.DriverManager.getDriver(DriverManager.java:298)
          at com.zaxxer.hikari.util.DriverDataSource.<init>(DriverDataSource.java:106)
          ... 20 more
    
          ... 11 more
    
          at org.apache.seatunnel.engine.client.job.ClientJobProxy.waitForJobComplete(ClientJobProxy.java:122)
          at org.apache.seatunnel.core.starter.seatunnel.command.ClientExecuteCommand.execute(ClientExecuteCommand.java:181)

3. 装置 connectors 插件

  • 执行 bash bin/install-plugin.sh,国内倡议先配置 maven 镜像,不然容易失败 或者 慢
  • 官网文档写着执行 sh bin/install-plugin.sh,我在 Ubuntu 20.04.2 LTS 上执行报错(bin/install-plugin.sh: 54: Bad substitution),我提了 PR

4. 编写配置文件

  • config 目录下,新建配置文件:如 mysql-es-test.conf
  • 增加 env 配置
    因为是 实时同步,这里 job.mode = “STREAMING”,execution.parallelism 是 并发数

    env {
    # You can set flink configuration here
    execution.parallelism = 1
    job.mode = "STREAMING"
    checkpoint.interval = 2000
    #execution.checkpoint.interval = 10000
    #execution.checkpoint.data-uri = "hdfs://localhost:9000/checkpoint"
    }
  • MySQL 实时同步,需开启 binlog
  • 增加 数据源 配置
    result_table_name 取个 长期表名,便于后续应用。table-names 必须是 数据库. 表名,base-url 必须指定 数据库。
    startup.mode 默认是 INITIAL,先同步历史数据,后增量同步,详情点击

    source {
    MySQL-CDC {
      result_table_name = "t1"
      server-id = 5656
      username = "root"
      password = "pwd"
      table-names = ["db.t1"]
      base-url = "jdbc:mysql://host:3306/db"
    }
    }
  • 增加 转换 配置,sql 比拟灵便。
    函数列表请点击

    transform {
    Sql {
      source_table_name = "t1"
      query = "SELECT id, alias_name aliasName FROM t1 WHERE c1 ='1'"
    }
    }
  • 增加 输入 配置
    CDC 实时同步 es,必须配置 primary_keys

    sink {
      Elasticsearch {hosts = ["host:9200"]
          username = "elastic"
          password = "pwd"
    
          index = "index_t1"
          # cdc required options
          primary_keys = ["id"]
      }
    }
  • 最终配置截图

5. 启动工作

这里以 本地模式为例,另有 集群、spark、flink 模式。

./bin/seatunnel.sh -e local --config ./config/mysql-es-test.conf

三、总结

  • 开源数据集成平台 SeaTunnel 可能比拟不便的进行 MySQL 实时同步到 es 等,收费,还不便增加 同步字段。更多弱小性能,请看官网文档。
  • 新版本自带 同步引擎,不必依赖 spark、flink 等运行,升高了 小数据量同步场景 部署复杂度
  • 新版本开始提供 UI 界面,目前强依赖 调度平台 Apache DolphinScheduler

本文恪守【CC BY-NC】协定,转载请保留原文出处及本版权申明,否则将查究法律责任。
本文首先公布于 https://www.890808.xyz/,其余平台须要审核更新慢一些。

正文完
 0