1. 背景

始终在欠缺本人的微服务架构,其中蕴含分布式工作流服务的建设,目前采纳的是Camunda工作流引擎。应用Camunda工作流,就会波及到工作流引擎的用户体系如何与现有用户体系集成的问题(Flowable、Activity也相似)。现有设计中,工作流定位偏重于企业外部流程的流转,因而零碎中设计了单位、部门、人员以及人事归属与Camunda工作流用户体系对应。

功能设计实现,就面临另外一个问题,如何解决现有人事体系数据如何【实时】同步至Camunda工作流引擎中。如果现有体系数据与工作流数据在同一个库中,绝对比拟好解决。而微服务架构中,不同服务的数据通常寄存在不同数据库中,那么就须要进行数据的同步。采纳的形式不同,能够获得的成果也雷同。

最后思考如下两种计划,然而都略感有余:

  • ETL:应用ETL工具进行数据同步是典型的形式,能够抉择工具也比拟多。开源的ETL工具增量同步问题解决的并不现实,不应用增量同步数那么数据同步始终存在时间差;商业的ETL工具增量同步解决的比拟好,然而宏大且低廉。
  • 音讯队列:音讯队列是多系统集成广泛采纳的形式,能够很好的解决数据同步的实时问题。然而数据同步的两端都须要本人编写代码,一端写生产代码一端写生产代码,生产端代码还要捆绑现有体系数据所有操作,须要的编写量比拟大。

查问比照的大量的材料,最终抉择了Debezimu来解决以上问题以及将来更多数据同步的问题。

2. Debezium介绍

RedHat开源的Debezium是一个将多种数据源实时变更数据捕捉,造成数据流输入的开源工具。
它是一种CDC(Change Data Capture)工具,工作原理相似大家所熟知的Canal, DataBus, Maxwell等,是通过抽取数据库日志来获取变更的。
官网介绍为:

Debezium is an open source distributed platform for change data capture. Start it up, point it at your databases, and your apps can start responding to all of the inserts, updates, and deletes that other apps commit to your databases. Debezium is durable and fast, so your apps can respond quickly and never miss an event, even when things go wrong

Debezium是一个分布式平台,它将您现有的数据库转换为事件流,因而应用程序能够看到数据库中的每一个行级更改并立刻做出响应。Debezium构建在Apache Kafka之上,并提供Kafka连贯兼容的连接器来监督特定的数据库管理系统。

Debezium当初已反对以下数据库:

  • MySQL
  • MongoDB
  • PostgreSQL
  • Oracle
  • SQL Server
  • Db2
  • Cassandra
  • Vitess

与ETL不同,Debezimu只反对在生产端连贯数据库,生产端不反对连贯数据库,而是须要本人编写代码接管Kafka音讯数据。剖析下来这种形式更加灵便,还能够很好利用现有微服务架构中的Kafka。

3. 疾速搭建Debezimu测试环境。

目前,Debezium最新的Stable版本是1.6。 Debezium曾经把要用到的Component打包成了Docker的Image,因而,咱们只须要装置并启动Docker后就能够按上面的步骤疾速搭建测试环境了。

如何在Windows下搭建Docker环境,能够参考集体的相干文章:

(1)Windows 10 2004 (20H1) 装置 Docker Desktop for Windows (2.3.0.2) 以 WSL 2 形式运行容器

(2)Windows 10 将 Docker Desktop for Windows(WSL 2 形式)文件存储移出C盘搁置到其它目录

3.1 运行Zookeeper

docker run -it --name zookeeper -p 2181:2181 -p 2888:2888 -p 3888:3888 debezium/zookeeper:1.6

3.2 运行Kafka

docker run -it --name kafka -p 9092:9092 --link zookeeper:zookeeper debezium/kafka:1.6

3.3 运行PostgreSQL

docker run -it --rm --name postgres -p 5432:5432 -e POSTGRES_USER=postgres -e POSTGRES_PASSWORD=postgres debezium/example-postgres:1.6
下面代码中应用的是:debezium/example-postgres:1.6,查看Debezimu官网文档以及其它示例都是这个。实际上Debezimu对PostgreSQL 9~13都进行了Docker封装,能够依据本人的须要在Docker Hub中选择响应的PostgreSQL版本。
debezium/postgres很小,应用也比拟不便,而且也进行了必要的设置,无须再进行额定的配置就能够间接应用。

3.4 运行Debezimu Connect

docker run -it --rm --name connect -p 8083:8083 -e GROUP_ID=1 -e CONFIG_STORAGE_TOPIC=my_connect_configs -e OFFSET_STORAGE_TOPIC=my_connect_offsets -e STATUS_STORAGE_TOPIC=my_connect_statuses --link zookeeper:zookeeper --link kafka:kafka --link postgres:postgres debezium/connect:1.6

Debezium的container启动时须要传入如下环境变量:

  • GROUP_ID: 分组ID,若须要启动多个Debezium的实例组成集群,那么它们的GROUP_ID必须被设置为一样
  • CONFIG_STORAGE_TOPIC:上面须要调用Debezium提供的RestFUL API治理connector,connector的信息就是保留在CONFIG_STORAGE_TOPIC指定的kafka topic下。
  • OFFSET_STORAGE_TOPIC: connector监控数据流的offset,若咱们应用的是PostgreSQL Connector,那么OFFSET_STORAGE_TOPIC指定的topic中存的就是PostgreSQL的lsn。

3.5 创立Connector

通过下面4个步骤后,Debezium的测试环境就搭建好了,当初须要调用Debezium提供的API创立connector,使Debezium与数据库之间建设关系。咱们把上面的payload POST到http://<ip addr of debezium>:8083/connectors/

{  "name": "fulfillment-connector",    "config": {    "connector.class": "io.debezium.connector.postgresql.PostgresConnector",     "database.hostname": "192.168.99.100",     "database.port": "5432",     "database.user": "postgres",     "database.password": "postgres",     "database.dbname" : "postgres",     "database.server.name": "fulfillment",     "table.include.list": "public.inventory"  }}
  1. "name":注册到Kafka Connect服务的Connector名称
  2. "connector.class":PostgreSQL connector class名称
  3. "database.hostname":PostgreSQL 数据库地址
  4. "database.port":PostgreSQL 数据库端口
  5. "database.user":PostgreSQL 数据库用户名
  6. "database.password":PostgreSQL数据明码
  7. "database.dbname":连贯的PostgreSQL数据库
  8. "database.server.name":虚构的数据库Server名称,能够依据理论需要定义,生产Kafka数据时要应用该值
  9. "table.include.list":监听的数据表列表,以","宰割。PostgreSQL要将表名写全,格局"<schema-name>.<table-name>"。如果没有特定的Schema,那么就是默认的public

上面为实现的curl命令:

curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" localhost:8083/connectors/ -d '{"name": "fulfillment-connector", "config": {"connector.class": "io.debezium.connector.postgresql.PostgresConnector", "database.hostname": "192.168.99.100", "database.port": "5432", "database.user": "postgres", "database.password": "postgres", "database.dbname" : "postgres",    "database.server.name": "fulfillment", "table.include.list": "public.inventory" }}'

下面是示例,因为应用的是Windows,集体感觉curl不不便,换用postman:

3.6 Docker Compose 配置

为了方便使用,将以上Docker命令整合为Docker Compose配置,具体如下:

version: "3"services:  postgres:    image: debezium/postgres:13    container_name: postgres    hostname: postgres    environment:      POSTGRES_USER: herodotus      POSTGRES_PASSWORD: herodotus    ports:      - 5432:5432  zookeeper:    image: debezium/zookeeper:1.6    container_name: zookeeper    restart: always    ports:      - 2181:2181      - 2888:2888      - 3888:3888  kafka:    image: debezium/kafka:1.6    container_name: kafka    restart: always    ports:      - 9092:9092    environment:      ZOOKEEPER_CONNECT: zookeeper:2181      BOOTSTRAP_SERVERS: kafka:9092    depends_on:      - zookeeper  connect:    image: debezium/connect:1.6    container_name: connect    restart: always    ports:      - 8083:8083    environment:      GROUP_ID: 1      CONFIG_STORAGE_TOPIC: herodotus_connect_configs      OFFSET_STORAGE_TOPIC: herodotus_connect_offsets      STATUS_STORAGE_TOPIC: herodotus_connect_statuses      BOOTSTRAP_SERVERS: kafka:9092    depends_on:      - kafka

4. 内部数据库配置

上一章节,介绍了Debezimu测试环境的形式,其中应用的debezium/postgres是曾经进行过配置的,所以应用起来比拟不便。在理论应用过程中,很多时候是应用独立搭建PostgreSQL,那么就须要对PostgreSQL进行配置。

4.1 以Docker的形式运行根底组件

本章节次要介绍Debezimu与独立的PostgreSQL数据库连贯,因而除了PostgreSQL以外,Zookeeper、Kafka、Debezimu Connect仍旧应用Docker形式部署。具体部署的Docker Compose配置如下:

version: "3"services:  zookeeper:    image: debezium/zookeeper:1.6    container_name: zookeeper    hostname: zookeeper    environment:      ZOOKEEPER_SERVER_ID: 1    ports:      - 2181:2181      - 2888:2888      - 3888:3888  kafka:    image: debezium/kafka:1.6    container_name: kafka    hostname: kafka    ports:      - 9092:9092    environment:      BROKER_ID: 1      ZOOKEEPER_CONNECT: zookeeper:2181      KAFKA_LISTENERS: LISTENER_INNER://kafka:29092,LISTENER_OUTER://0.0.0.0:9092      KAFKA_ADVERTISED_LISTENERS: LISTENER_INNER://kafka:29092,LISTENER_OUTER://192.168.101.10:9092      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: LISTENER_INNER:PLAINTEXT,LISTENER_OUTER:PLAINTEXT      KAFKA_INTER_BROKER_LISTENER_NAME: LISTENER_INNER      KAFKA_ALLOW_PLAINTEXT_LISTENER: 'yes'      KAFKA_AUTO_CREATE_TOPICS_ENABLE: 'true'    depends_on:      - zookeeper  connect:    image: debezium/connect:1.6    container_name: connect    hostname: connect    ports:      - 8083:8083    environment:      GROUP_ID: 1      CONFIG_STORAGE_TOPIC: herodotus_connect_configs      OFFSET_STORAGE_TOPIC: herodotus_connect_offsets      STATUS_STORAGE_TOPIC: herodotus_connect_statuses      BOOTSTRAP_SERVERS: kafka:9092    depends_on:      - kafka

其中Kafka Listener相干的配置,是为了解决Spring Kafka连贯Kafka会呈现:Connection to node -1 could not be established. Broker may not be available.问题。

4.2 批改PostgreSQL配置

Logical Decoding性能是PostgreSQL在9.4退出的,它是一种机制,容许提取提交到事务日志的更改,并在输入插件的帮忙下以用户敌对的形式解决这些更改。输入插件使客户机可能应用更改。

PostgreSQL connector 读取和解决数据库变动次要蕴含两个局部:

  • Logical Decoding 输入插件:依据抉择可能须要装置输入插件。运行PostgreSQL服务之前,必须配置replication slot 来启用你所抉择的输入插件,有以下几个输入插件供选择:

    • decoderbufs 是基于Protobuf的,目前由Debezimu社区保护
    • wal2json 是基于JSON的,目前由wal2json社区保护
    • pgoutput在PostgreSQL 10及以上版本中是规范的Logical Decoding 输入插件。是由PostgreSQL社区保护,由PostgreSQL本人用于Logical Replication。这个插件是内置装置的,所以不须要额定装置。
  • Java代码(就是连贯Kafka Connect的代码):负责读取由Logical Decoding 输入插件产生的数据。
  • Logical Decoding 输入插件不反对DDL变更,这意味着Connector不能把DDL变更事件发送给消费者
  • Logical Decoding Replicaiton Slots反对数据库的primary服务器。因而如果是PostgreSQL服务的集群,Connector只能在primary服务器激活。如果primary服务器呈现问题,那么connector就会停掉。

4.2.1 批改PostgreSQL配置

在${PostgreSQL_HOME}/13/data目录下,找到postgresql.conf

批改以下配置:

wal_level=logicalmax_wal_senders=1max_replication_slots=1
  • wal_level 告诉数据库应用 logical decoding 读取预写日志
  • max_wal_senders 告诉数据库独立解决WAL变更的独立过程数量
  • max_replication_slots 告诉数据库解决WAL变更流所容许最大replication slots数目
配置实现后记得重启数据库

4.2.2 设置数据库权限

须要给PostgreSQL 用户调配replication权限。定义一个PostgreSQL role,至多调配REPLICATIONLOGION两项权限,示例代码如下:

CREATE ROLE <name> REPLICATION LOGIN;

具体操作能够参考一下脚本:

-- pg新建用户CREATE USER user WITH PASSWORD 'pwd';-- 给用户复制流权限ALTER ROLE user replication;-- 给用户登录数据库权限grant CONNECT ON DATABASE test to user;-- 把以后库public下所有表查问权限赋给用户GRANT SELECT ON ALL TABLES IN SCHEMA public TO user;

4.3 创立Connector

把上面的payload POST到http://<ip addr of debezium>:8083/connectors/

{    "name": "herodotus-connector",    "config": {        "connector.class": "io.debezium.connector.postgresql.PostgresConnector",         "database.hostname": "192.168.101.10",         "database.port": "15432",         "database.user": "athena",         "database.password": "athena",         "database.dbname" : "athena",         "database.server.name": "herodotus",        "slot.name": "herodotus_slot",        "table.include.list": "public.sys_organization",        "publication.name": "herodotus_public_connector",        "publication.autocreate.mode": "filtered",        "plugin.name": "pgoutput"    }}

postman 界面操作如下图:

payload有两个字段,name是connector的名字,config是connector的配置信息,下表为config中的字段的解释:

字段名称阐明
connector.classconnector的实现类,本文应用的是io.debezium.connector.postgresql.PostgresConnector,因为咱们的数据库是PostgreSQL
database.hostname数据库服务的IP或域名
database.port数据库服务的端口
database.user连贯数据库的用户
database.password连贯数据库的明码
database.dbname数据库名
database.server.name每个被监控的表在Kafka都会对应一个topic,topic的命名标准是<database.server.name>.<schema>.<table>
slot.namePostgreSQL的复制槽(Replication Slot)名称
table.include.list如果设置了table.include.list,即在该list中的表才会被Debezium监控
plugin.namePostgreSQL服务端装置的解码插件名称,能够是decoderbufs, wal2json, wal2json_rds, wal2json_streaming, wal2json_rds_streaming 和 pgoutput。如果不指定该值,则默认应用decoderbufs。<br/><br/>本例子中应用了pgoutput,因为它是PostgreSQL 10+自带的解码器,而其余解码器都必须在PostgreSQL服务器装置插件。
publication.namePostgreSQL端的WAL公布(publication)名称,每个Connector都应该在PostgreSQL有本人对应的publication,如果不指定该参数,那么publication的名称为dbz_publication
publication.autocreate.mode该值在plugin.name设置为pgoutput才会无效。有以下三个值:<br/><br/>all_tables - debezium会查看publication是否存在,如果publication不存在,connector则应用脚本CREATE PUBLICATION <publication_name> FOR ALL TABLES创立publication,即该发布者会监控所有表的变更状况。<br/><br/>disabled - connector不会查看有无publication存在,如果publication不存在,则在创立connector会报错.<br/><br/>filtered - 与all_tables不同的是,debezium会依据connector的配置中的table.include.list生成生成创立publication的脚本: CREATE PUBLICATION <publication_name> FOR TABLE <tbl1, tbl2, tbl3>。例如,本例子中,“table.include.list"值为"public.sys_organization”,则publication只会监控这个表的变更状况。

上面联合本例子中connector的配置信息对几个重点属性进行进一步阐明:

Slot.name 重点阐明

依照上例Debezium会在PostgreSQL创立一个名为herodotus_slot的复制槽,本例中创立的connector须要通过该复制槽获取数据变更的信息。

能够通过以下sql查看复制槽的信息:

select * from pg_replication_slots;

上图中,active_pid为14200,即过程ID为14200的wal_sender过程曾经在应用该复制槽与Debezium交互了

database.server.name和table.include.list

当connector获取到数据变更的信息后,会把该信息转化为对立的数据格式,并公布到Kafka的topic中。Debezium规定一个表对应一个topic,topic的名字的格局为 <database.server.name>.<schema name>.<table name>,本例中的表的数据变更音讯将保留到Kafka的topic herodotus.public.sys_organization中。

能够通过以下代码查看接管到的信息

 @KafkaListener(topics = {"herodotus.public.sys_organization"}, groupId = "herodotus.debezium") public void received(String message) {     log.info("[Herodotus] |- Recived message from Debezium : [{}]", message); }

5. 运行测试

当初,能够基于以上环境的配置,进行Debezium捕捉数据成果的测试。能够进入到Kafka容器中,应用应用Kafka提供的kafka-console-consumer.sh查看Topic接管到的数据。具体命令如下:

bin/kafka-console-consumer.sh --bootstrap-server 192.168.101.10:9092 --topic herodotus.public.sys_organization

5.1 Insert 测试

在数据库sys_organization中插入一条数据

Kafka的消费者命令行工具收到了来自Debezium公布的数据变更音讯:

格式化后的音讯体如下,其中schema字段在此先疏忽,重点放payload.before,payload.after及payload.op字段上:

{    "schema": {        ...    },    "payload": {        "before": null,        "after": {            "organization_id": "4",            "create_time": null,            "ranking": null,            "update_time": null,            "description": null,            "is_reserved": null,            "reversion": null,            "status": 1,            "a4_biz_org_id": null,            "biz_org_code": null,            "biz_org_desc": null,            "biz_org_id": null,            "biz_org_name": null,            "biz_org_type": null,            "organization_name": "AAAAA",            "parent_id": null,            "partition_code": null,            "short_name": null        },        "source": {            "version": "1.6.0.Final",            "connector": "postgresql",            "name": "herodotus",            "ts_ms": 1626594964405,            "snapshot": "false",            "db": "athena",            "sequence": "[\"63461608\",\"63461608\"]",            "schema": "public",            "table": "sys_organization",            "txId": 2460,            "lsn": 63461896,            "xmin": null        },        "op": "c",        "ts_ms": 1626594964846,        "transaction": null    }}

因为是insert操作,所以op为c (create),before为null,after为咱们插入的数据。

5.2 Update 测试

在数据库sys_organization中批改一条数据

Kafka的消费者命令行工具收到了来自Debezium公布的数据变更音讯:

格式化后的音讯体如下:

{    "schema": {        ...    },    "payload": {        "before": null,        "after": {            "organization_id": "4",            "create_time": null,            "ranking": null,            "update_time": null,            "description": null,            "is_reserved": null,            "reversion": null,            "status": 1,            "a4_biz_org_id": null,            "biz_org_code": null,            "biz_org_desc": null,            "biz_org_id": null,            "biz_org_name": null,            "biz_org_type": null,            "organization_name": "BBBBB",            "parent_id": null,            "partition_code": null,            "short_name": null        },        "source": {            "version": "1.6.0.Final",            "connector": "postgresql",            "name": "herodotus",            "ts_ms": 1626595173601,            "snapshot": "false",            "db": "athena",            "sequence": "[\"63466888\",\"63466888\"]",            "schema": "public",            "table": "sys_organization",            "txId": 2461,            "lsn": 63467176,            "xmin": null        },        "op": "u",        "ts_ms": 1626595173825,        "transaction": null    }}

进行更新产品信息的操作后,consumer将收到一条op为u (update)的信息,after为批改后的数据。

5.3 Delete测试

在数据库sys_organization中删除一条数据

Kafka的消费者命令行工具收到了来自Debezium公布的数据变更音讯:

格式化后的音讯体如下:

{    "schema": {        ...    },    "payload": {        "before": {            "organization_id": "3",            "create_time": null,            "ranking": null,            "update_time": null,            "description": null,            "is_reserved": null,            "reversion": null,            "status": null,            "a4_biz_org_id": null,            "biz_org_code": null,            "biz_org_desc": null,            "biz_org_id": null,            "biz_org_name": null,            "biz_org_type": null,            "organization_name": null,            "parent_id": null,            "partition_code": null,            "short_name": null        },        "after": null,        "source": {            "version": "1.6.0.Final",            "connector": "postgresql",            "name": "herodotus",            "ts_ms": 1626594566933,            "snapshot": "false",            "db": "athena",            "sequence": "[\"63461120\",\"63461120\"]",            "schema": "public",            "table": "sys_organization",            "txId": 2458,            "lsn": 63461176,            "xmin": null        },        "op": "d",        "ts_ms": 1626594567136,        "transaction": null    }}

进行删除产品信息的操作后,consumer将收到一条op为d (delete)的信息,before为刪除前的数据,after为null。

6.总结

通过Debezimu进行数据同步,不仅解决了传统ETL时效性不高的问题,还解决了基于音讯队列须要两端编写代码的工程量,而且基于容器的形式更适宜微服务架构的应用,应用Kafka进行生产端的整合,使得整合形式更加灵便便捷、终端类型更加丰盛。

示例代码地址:

  • Gitee
  • Github

参考资料

[1] :https://blog.csdn.net/foshans...

[2] :https://access.redhat.com/doc...

[3] :https://debezium.io/documenta...