关于spring-cloud:使用DebeziumPostgres和Kafka进行数据实时采集CDC

33次阅读

共计 13354 个字符,预计需要花费 34 分钟才能阅读完成。

1. 背景

始终在欠缺本人的微服务架构,其中蕴含分布式工作流服务的建设,目前采纳的是 Camunda 工作流引擎。应用 Camunda 工作流,就会波及到工作流引擎的用户体系如何与现有用户体系集成的问题(Flowable、Activity 也相似)。现有设计中,工作流定位偏重于企业外部流程的流转,因而零碎中设计了单位、部门、人员以及人事归属与 Camunda 工作流用户体系对应。

功能设计实现,就面临另外一个问题,如何解决现有人事体系数据如何【实时】同步至 Camunda 工作流引擎中。如果现有体系数据与工作流数据在同一个库中,绝对比拟好解决。而微服务架构中,不同服务的数据通常寄存在不同数据库中,那么就须要进行数据的同步。采纳的形式不同,能够获得的成果也雷同。

最后思考如下两种计划,然而都略感有余:

  • ETL:应用 ETL 工具进行数据同步是典型的形式,能够抉择工具也比拟多。开源的 ETL 工具增量同步问题解决的并不现实,不应用增量同步数那么数据同步始终存在时间差;商业的 ETL 工具增量同步解决的比拟好,然而宏大且低廉。
  • 音讯队列:音讯队列是多系统集成广泛采纳的形式,能够很好的解决数据同步的实时问题。然而数据同步的两端都须要本人编写代码,一端写生产代码一端写生产代码,生产端代码还要捆绑现有体系数据所有操作,须要的编写量比拟大。

查问比照的大量的材料,最终抉择了 Debezimu 来解决以上问题以及将来更多数据同步的问题。

2. Debezium 介绍

RedHat 开源的 Debezium 是一个将多种数据源实时变更数据捕捉,造成数据流输入的开源工具。
它是一种 CDC(Change Data Capture)工具,工作原理相似大家所熟知的 Canal, DataBus, Maxwell 等,是通过抽取数据库日志来获取变更的。
官网介绍为:

Debezium is an open source distributed platform for change data capture. Start it up, point it at your databases, and your apps can start responding to all of the inserts, updates, and deletes that other apps commit to your databases. Debezium is durable and fast, so your apps can respond quickly and never miss an event, even when things go wrong

Debezium 是一个分布式平台,它将您现有的数据库转换为事件流,因而应用程序能够看到数据库中的每一个行级更改并立刻做出响应。Debezium 构建在 Apache Kafka 之上,并提供 Kafka 连贯兼容的连接器来监督特定的数据库管理系统。

Debezium 当初已反对以下数据库:

  • MySQL
  • MongoDB
  • PostgreSQL
  • Oracle
  • SQL Server
  • Db2
  • Cassandra
  • Vitess

与 ETL 不同,Debezimu 只反对在生产端连贯数据库,生产端不反对连贯数据库,而是须要本人编写代码接管 Kafka 音讯数据。剖析下来这种形式更加灵便,还能够很好利用现有微服务架构中的 Kafka。

3. 疾速搭建 Debezimu 测试环境。

目前,Debezium 最新的 Stable 版本是 1.6。Debezium 曾经把要用到的 Component 打包成了 Docker 的 Image,因而,咱们只须要装置并启动 Docker 后就能够按上面的步骤疾速搭建测试环境了。

如何在 Windows 下搭建 Docker 环境,能够参考集体的相干文章:

(1)Windows 10 2004 (20H1) 装置 Docker Desktop for Windows (2.3.0.2) 以 WSL 2 形式运行容器

(2)Windows 10 将 Docker Desktop for Windows(WSL 2 形式)文件存储移出 C 盘搁置到其它目录

3.1 运行 Zookeeper

docker run -it --name zookeeper -p 2181:2181 -p 2888:2888 -p 3888:3888 debezium/zookeeper:1.6

3.2 运行 Kafka

docker run -it --name kafka -p 9092:9092 --link zookeeper:zookeeper debezium/kafka:1.6

3.3 运行 PostgreSQL

docker run -it --rm --name postgres -p 5432:5432 -e POSTGRES_USER=postgres -e POSTGRES_PASSWORD=postgres debezium/example-postgres:1.6

下面代码中应用的是:debezium/example-postgres:1.6,查看 Debezimu 官网文档以及其它示例都是这个。实际上 Debezimu 对 PostgreSQL 9~13 都进行了 Docker 封装,能够依据本人的须要在 Docker Hub 中选择响应的 PostgreSQL 版本。

debezium/postgres 很小,应用也比拟不便,而且也进行了必要的设置,无须再进行额定的配置就能够间接应用。

3.4 运行 Debezimu Connect

docker run -it --rm --name connect -p 8083:8083 -e GROUP_ID=1 -e CONFIG_STORAGE_TOPIC=my_connect_configs -e OFFSET_STORAGE_TOPIC=my_connect_offsets -e STATUS_STORAGE_TOPIC=my_connect_statuses --link zookeeper:zookeeper --link kafka:kafka --link postgres:postgres debezium/connect:1.6

Debezium 的 container 启动时须要传入如下环境变量:

  • GROUP_ID: 分组 ID,若须要启动多个 Debezium 的实例组成集群,那么它们的 GROUP_ID 必须被设置为一样
  • CONFIG_STORAGE_TOPIC:上面须要调用 Debezium 提供的 RestFUL API 治理 connector,connector 的信息就是保留在 CONFIG_STORAGE_TOPIC 指定的 kafka topic 下。
  • OFFSET_STORAGE_TOPIC: connector 监控数据流的 offset,若咱们应用的是 PostgreSQL Connector,那么 OFFSET_STORAGE_TOPIC 指定的 topic 中存的就是 PostgreSQL 的 lsn。

3.5 创立 Connector

通过下面 4 个步骤后,Debezium 的测试环境就搭建好了,当初须要调用 Debezium 提供的 API 创立 connector,使 Debezium 与数据库之间建设关系。咱们把上面的 payload POST 到http://<ip addr of debezium>:8083/connectors/

{
  "name": "fulfillment-connector",  
  "config": {
    "connector.class": "io.debezium.connector.postgresql.PostgresConnector", 
    "database.hostname": "192.168.99.100", 
    "database.port": "5432", 
    "database.user": "postgres", 
    "database.password": "postgres", 
    "database.dbname" : "postgres", 
    "database.server.name": "fulfillment", 
    "table.include.list": "public.inventory"
  }
}
  1. “name”:注册到 Kafka Connect 服务的 Connector 名称
  2. “connector.class”:PostgreSQL connector class 名称
  3. “database.hostname”:PostgreSQL 数据库地址
  4. “database.port”:PostgreSQL 数据库端口
  5. “database.user”:PostgreSQL 数据库用户名
  6. “database.password”:PostgreSQL 数据明码
  7. “database.dbname”:连贯的 PostgreSQL 数据库
  8. “database.server.name”:虚构的数据库 Server 名称,能够依据理论需要定义,生产 Kafka 数据时要应用该值
  9. “table.include.list”:监听的数据表列表,以 ”,” 宰割。PostgreSQL 要将表名写全,格局 ”<schema-name>.<table-name>”。如果没有特定的 Schema,那么就是默认的public

上面为实现的 curl 命令:

curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" localhost:8083/connectors/ -d '{"name":"fulfillment-connector","config": {"connector.class":"io.debezium.connector.postgresql.PostgresConnector","database.hostname":"192.168.99.100","database.port":"5432","database.user":"postgres","database.password":"postgres","database.dbname":"postgres","database.server.name":"fulfillment","table.include.list":"public.inventory"}}'

下面是示例,因为应用的是 Windows,集体感觉 curl 不不便,换用 postman:

3.6 Docker Compose 配置

为了方便使用,将以上 Docker 命令整合为 Docker Compose 配置,具体如下:

version: "3"
services:
  postgres:
    image: debezium/postgres:13
    container_name: postgres
    hostname: postgres
    environment:
      POSTGRES_USER: herodotus
      POSTGRES_PASSWORD: herodotus
    ports:
      - 5432:5432

  zookeeper:
    image: debezium/zookeeper:1.6
    container_name: zookeeper
    restart: always
    ports:
      - 2181:2181
      - 2888:2888
      - 3888:3888

  kafka:
    image: debezium/kafka:1.6
    container_name: kafka
    restart: always
    ports:
      - 9092:9092
    environment:
      ZOOKEEPER_CONNECT: zookeeper:2181
      BOOTSTRAP_SERVERS: kafka:9092
    depends_on:
      - zookeeper

  connect:
    image: debezium/connect:1.6
    container_name: connect
    restart: always
    ports:
      - 8083:8083
    environment:
      GROUP_ID: 1
      CONFIG_STORAGE_TOPIC: herodotus_connect_configs
      OFFSET_STORAGE_TOPIC: herodotus_connect_offsets
      STATUS_STORAGE_TOPIC: herodotus_connect_statuses
      BOOTSTRAP_SERVERS: kafka:9092
    depends_on:
      - kafka

4. 内部数据库配置

上一章节,介绍了 Debezimu 测试环境的形式,其中应用的 debezium/postgres 是曾经进行过配置的,所以应用起来比拟不便。在理论应用过程中,很多时候是应用独立搭建 PostgreSQL,那么就须要对 PostgreSQL 进行配置。

4.1 以 Docker 的形式运行根底组件

本章节次要介绍 Debezimu 与独立的 PostgreSQL 数据库连贯,因而除了 PostgreSQL 以外,Zookeeper、Kafka、Debezimu Connect 仍旧应用 Docker 形式部署。具体部署的 Docker Compose 配置如下:

version: "3"
services:
  zookeeper:
    image: debezium/zookeeper:1.6
    container_name: zookeeper
    hostname: zookeeper
    environment:
      ZOOKEEPER_SERVER_ID: 1
    ports:
      - 2181:2181
      - 2888:2888
      - 3888:3888

  kafka:
    image: debezium/kafka:1.6
    container_name: kafka
    hostname: kafka
    ports:
      - 9092:9092
    environment:
      BROKER_ID: 1
      ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_LISTENERS: LISTENER_INNER://kafka:29092,LISTENER_OUTER://0.0.0.0:9092
      KAFKA_ADVERTISED_LISTENERS: LISTENER_INNER://kafka:29092,LISTENER_OUTER://192.168.101.10:9092
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: LISTENER_INNER:PLAINTEXT,LISTENER_OUTER:PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: LISTENER_INNER
      KAFKA_ALLOW_PLAINTEXT_LISTENER: 'yes'
      KAFKA_AUTO_CREATE_TOPICS_ENABLE: 'true'
    depends_on:
      - zookeeper

  connect:
    image: debezium/connect:1.6
    container_name: connect
    hostname: connect
    ports:
      - 8083:8083
    environment:
      GROUP_ID: 1
      CONFIG_STORAGE_TOPIC: herodotus_connect_configs
      OFFSET_STORAGE_TOPIC: herodotus_connect_offsets
      STATUS_STORAGE_TOPIC: herodotus_connect_statuses
      BOOTSTRAP_SERVERS: kafka:9092
    depends_on:
      - kafka

其中 Kafka Listener 相干的配置,是为了解决 Spring Kafka 连贯 Kafka 会呈现:Connection to node -1 could not be established. Broker may not be available.问题。

4.2 批改 PostgreSQL 配置

Logical Decoding性能是 PostgreSQL 在 9.4 退出的,它是一种机制,容许提取提交到事务日志的更改,并在输入插件的帮忙下以用户敌对的形式解决这些更改。输入插件使客户机可能应用更改。

PostgreSQL connector 读取和解决数据库变动次要蕴含两个局部:

  • Logical Decoding 输入插件:依据抉择可能须要装置输入插件。运行 PostgreSQL 服务之前,必须配置 replication slot 来启用你所抉择的输入插件,有以下几个输入插件供选择:

    • decoderbufs 是基于 Protobuf 的,目前由 Debezimu 社区保护
    • wal2json 是基于 JSON 的,目前由 wal2json 社区保护
    • pgoutput在 PostgreSQL 10 及以上版本中是规范的 Logical Decoding 输入插件。是由 PostgreSQL 社区保护,由 PostgreSQL 本人用于 Logical Replication。这个插件是内置装置的,所以不须要额定装置。
  • Java 代码(就是连贯 Kafka Connect 的代码):负责读取由 Logical Decoding 输入插件产生的数据。
  • Logical Decoding 输入插件不反对 DDL 变更,这意味着 Connector 不能把 DDL 变更事件发送给消费者
  • Logical Decoding Replicaiton Slots 反对数据库的 primary 服务器。因而如果是 PostgreSQL 服务的集群,Connector 只能在 primary 服务器激活。如果 primary 服务器呈现问题,那么 connector 就会停掉。

4.2.1 批改 PostgreSQL 配置

在 ${PostgreSQL_HOME}/13/data 目录下,找到postgresql.conf

批改以下配置:

wal_level=logical
max_wal_senders=1
max_replication_slots=1
  • wal_level 告诉数据库应用 logical decoding 读取预写日志
  • max_wal_senders 告诉数据库独立解决 WAL 变更的独立过程数量
  • max_replication_slots 告诉数据库解决 WAL 变更流所容许最大 replication slots 数目

配置实现后记得重启数据库

4.2.2 设置数据库权限

须要给 PostgreSQL 用户调配 replication 权限。定义一个 PostgreSQL role,至多 调配 REPLICATIONLOGION两项权限,示例代码如下:

CREATE ROLE <name> REPLICATION LOGIN;

具体操作能够参考一下脚本:

-- pg 新建用户
CREATE USER user WITH PASSWORD 'pwd';

-- 给用户复制流权限
ALTER ROLE user replication;

-- 给用户登录数据库权限
grant CONNECT ON DATABASE test to user;

-- 把以后库 public 下所有表查问权限赋给用户
GRANT SELECT ON ALL TABLES IN SCHEMA public TO user;

4.3 创立 Connector

把上面的 payload POST 到http://<ip addr of debezium>:8083/connectors/

{
    "name": "herodotus-connector",
    "config": {
        "connector.class": "io.debezium.connector.postgresql.PostgresConnector", 
        "database.hostname": "192.168.101.10", 
        "database.port": "15432", 
        "database.user": "athena", 
        "database.password": "athena", 
        "database.dbname" : "athena", 
        "database.server.name": "herodotus",
        "slot.name": "herodotus_slot",
        "table.include.list": "public.sys_organization",
        "publication.name": "herodotus_public_connector",
        "publication.autocreate.mode": "filtered",
        "plugin.name": "pgoutput"
    }
}

postman 界面操作如下图:

payload 有两个字段,name 是 connector 的名字,config 是 connector 的配置信息,下表为 config 中的字段的解释:

字段名称 阐明
connector.class connector 的实现类,本文应用的是 io.debezium.connector.postgresql.PostgresConnector,因为咱们的数据库是 PostgreSQL
database.hostname 数据库服务的 IP 或域名
database.port 数据库服务的端口
database.user 连贯数据库的用户
database.password 连贯数据库的明码
database.dbname 数据库名
database.server.name 每个被监控的表在 Kafka 都会对应一个 topic,topic 的命名标准是 <database.server.name>.<schema>.<table>
slot.name PostgreSQL 的复制槽 (Replication Slot) 名称
table.include.list 如果设置了 table.include.list,即在该 list 中的表才会被 Debezium 监控
plugin.name PostgreSQL 服务端装置的解码插件名称,能够是 decoderbufs, wal2json, wal2json_rds, wal2json_streaming, wal2json_rds_streaming 和 pgoutput。如果不指定该值,则默认应用 decoderbufs。<br/><br/> 本例子中应用了 pgoutput,因为它是 PostgreSQL 10+ 自带的解码器,而其余解码器都必须在 PostgreSQL 服务器装置插件。
publication.name PostgreSQL 端的 WAL 公布 (publication) 名称,每个 Connector 都应该在 PostgreSQL 有本人对应的 publication,如果不指定该参数,那么 publication 的名称为 dbz_publication
publication.autocreate.mode 该值在 plugin.name 设置为 pgoutput 才会无效。有以下三个值:<br/><br/>all_tables – debezium 会查看 publication 是否存在,如果 publication 不存在,connector 则应用脚本 CREATE PUBLICATION <publication_name> FOR ALL TABLES 创立 publication,即该发布者会监控所有表的变更状况。<br/><br/>disabled – connector 不会查看有无 publication 存在,如果 publication 不存在,则在创立 connector 会报错.<br/><br/>filtered – 与 all_tables 不同的是,debezium 会依据 connector 的配置中的 table.include.list 生成生成创立 publication 的脚本:CREATE PUBLICATION <publication_name> FOR TABLE <tbl1, tbl2, tbl3>。例如,本例子中,“table.include.list” 值为 ”public.sys_organization”,则 publication 只会监控这个表的变更状况。

上面联合本例子中 connector 的配置信息对几个重点属性进行进一步阐明:

Slot.name 重点阐明

依照上例 Debezium 会在 PostgreSQL 创立一个名为 herodotus_slot 的复制槽,本例中创立的 connector 须要通过该复制槽获取数据变更的信息。

能够通过以下 sql 查看复制槽的信息:

select * from pg_replication_slots;

上图中,active_pid 为 14200,即过程 ID 为 14200 的 wal_sender 过程曾经在应用该复制槽与 Debezium 交互了

database.server.name 和 table.include.list

当 connector 获取到数据变更的信息后,会把该信息转化为对立的数据格式,并公布到 Kafka 的 topic 中。Debezium 规定一个表对应一个 topic,topic 的名字的格局为 <database.server.name>.<schema name>.<table name>,本例中的表的数据变更音讯将保留到 Kafka 的 topic herodotus.public.sys_organization 中。

能够通过以下代码查看接管到的信息

 @KafkaListener(topics = {"herodotus.public.sys_organization"}, groupId = "herodotus.debezium")
 public void received(String message) {log.info("[Herodotus] |- Recived message from Debezium : [{}]", message);
 }

5. 运行测试

当初,能够基于以上环境的配置,进行 Debezium 捕捉数据成果的测试。能够进入到 Kafka 容器中,应用应用 Kafka 提供的 kafka-console-consumer.sh 查看 Topic 接管到的数据。具体命令如下:

bin/kafka-console-consumer.sh --bootstrap-server 192.168.101.10:9092 --topic herodotus.public.sys_organization

5.1 Insert 测试

在数据库 sys_organization 中插入一条数据

Kafka 的消费者命令行工具收到了来自 Debezium 公布的数据变更音讯:

格式化后的音讯体如下,其中 schema 字段在此先疏忽,重点放 payload.before,payload.after 及 payload.op 字段上:

{
    "schema": {...},
    "payload": {
        "before": null,
        "after": {
            "organization_id": "4",
            "create_time": null,
            "ranking": null,
            "update_time": null,
            "description": null,
            "is_reserved": null,
            "reversion": null,
            "status": 1,
            "a4_biz_org_id": null,
            "biz_org_code": null,
            "biz_org_desc": null,
            "biz_org_id": null,
            "biz_org_name": null,
            "biz_org_type": null,
            "organization_name": "AAAAA",
            "parent_id": null,
            "partition_code": null,
            "short_name": null
        },
        "source": {
            "version": "1.6.0.Final",
            "connector": "postgresql",
            "name": "herodotus",
            "ts_ms": 1626594964405,
            "snapshot": "false",
            "db": "athena",
            "sequence": "[\"63461608\",\"63461608\"]",
            "schema": "public",
            "table": "sys_organization",
            "txId": 2460,
            "lsn": 63461896,
            "xmin": null
        },
        "op": "c",
        "ts_ms": 1626594964846,
        "transaction": null
    }
}

因为是 insert 操作,所以 op 为 c (create),before 为 null,after 为咱们插入的数据。

5.2 Update 测试

在数据库 sys_organization 中批改一条数据

Kafka 的消费者命令行工具收到了来自 Debezium 公布的数据变更音讯:

格式化后的音讯体如下:

{
    "schema": {...},
    "payload": {
        "before": null,
        "after": {
            "organization_id": "4",
            "create_time": null,
            "ranking": null,
            "update_time": null,
            "description": null,
            "is_reserved": null,
            "reversion": null,
            "status": 1,
            "a4_biz_org_id": null,
            "biz_org_code": null,
            "biz_org_desc": null,
            "biz_org_id": null,
            "biz_org_name": null,
            "biz_org_type": null,
            "organization_name": "BBBBB",
            "parent_id": null,
            "partition_code": null,
            "short_name": null
        },
        "source": {
            "version": "1.6.0.Final",
            "connector": "postgresql",
            "name": "herodotus",
            "ts_ms": 1626595173601,
            "snapshot": "false",
            "db": "athena",
            "sequence": "[\"63466888\",\"63466888\"]",
            "schema": "public",
            "table": "sys_organization",
            "txId": 2461,
            "lsn": 63467176,
            "xmin": null
        },
        "op": "u",
        "ts_ms": 1626595173825,
        "transaction": null
    }
}

进行更新产品信息的操作后,consumer 将收到一条 op 为 u (update)的信息,after 为批改后的数据。

5.3 Delete 测试

在数据库 sys_organization 中删除一条数据

Kafka 的消费者命令行工具收到了来自 Debezium 公布的数据变更音讯:

格式化后的音讯体如下:

{
    "schema": {...},
    "payload": {
        "before": {
            "organization_id": "3",
            "create_time": null,
            "ranking": null,
            "update_time": null,
            "description": null,
            "is_reserved": null,
            "reversion": null,
            "status": null,
            "a4_biz_org_id": null,
            "biz_org_code": null,
            "biz_org_desc": null,
            "biz_org_id": null,
            "biz_org_name": null,
            "biz_org_type": null,
            "organization_name": null,
            "parent_id": null,
            "partition_code": null,
            "short_name": null
        },
        "after": null,
        "source": {
            "version": "1.6.0.Final",
            "connector": "postgresql",
            "name": "herodotus",
            "ts_ms": 1626594566933,
            "snapshot": "false",
            "db": "athena",
            "sequence": "[\"63461120\",\"63461120\"]",
            "schema": "public",
            "table": "sys_organization",
            "txId": 2458,
            "lsn": 63461176,
            "xmin": null
        },
        "op": "d",
        "ts_ms": 1626594567136,
        "transaction": null
    }
}

进行删除产品信息的操作后,consumer 将收到一条 op 为 d (delete)的信息,before 为刪除前的数据,after 为 null。

6. 总结

通过 Debezimu 进行数据同步,不仅解决了传统 ETL 时效性不高的问题,还解决了基于音讯队列须要两端编写代码的工程量,而且基于容器的形式更适宜微服务架构的应用,应用 Kafka 进行生产端的整合,使得整合形式更加灵便便捷、终端类型更加丰盛。

示例代码地址:

  • Gitee
  • Github

参考资料

[1] :https://blog.csdn.net/foshans…

[2] :https://access.redhat.com/doc…

[3] :https://debezium.io/documenta…

正文完
 0