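• Original content from the GreatSQL community may not be used without authorization. To repost it, please contact the editor and credit the source.
• GreatSQL is a Chinese-developed branch of MySQL and is used the same way as MySQL.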

I. Introduction to Debezium

From the official website:

    Debezium is a set of distributed services to capture changes in your databases so that your applications can see those changes and respond to them. Debezium records all row-level changes within each database table in a _change event stream_, and applications simply read these streams to see the change events in the same order in which they occurred.

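Put simply, Debezium captures every row-level data change in a database, wraps the changes as an event stream, and emits them in the order they occurred.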

II. Basic Usage

The following uses MySQL as an example to introduce basic Debezium usage.

1. Preparing MySQL

  1. Create a MySQL user with the required privileges, for example:

    -- The password is set once in CREATE USER; GRANT ... IDENTIFIED BY is rejected by MySQL 8.0
    CREATE USER 'dbz'@'%' IDENTIFIED BY 'dbzpwd';
    GRANT SELECT, RELOAD, SHOW DATABASES, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'dbz'@'%';
  2. Check whether log-bin is enabled in MySQL:

    SELECT variable_value as "BINARY LOGGING STATUS (log-bin) ::"
    FROM information_schema.global_variables
    WHERE variable_name='log_bin';
    -- If the following error occurs: The 'INFORMATION_SCHEMA.GLOBAL_VARIABLES' feature is disabled...
    -- please execute the given SQL again after executing this SQL:
    --   set global show_compatibility_56=on;

    If the result is OFF, edit the MySQL configuration file along these lines:

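    server-id         = 223344     # required
    log_bin           = mysql-bin  # the value of log_bin is the base name of the binlog file sequence
    binlog_format     = ROW        # must be ROW
    binlog_row_image  = FULL       # must be FULL
    expire_logs_days  = 10         # adjust to your own situation
  3. Prepare the database and table: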

    create database inventory;
    create table inventory.a (id bigint primary key auto_increment, name varchar(32));
    insert into inventory.a values (null, 'n1'),(null, 'n2'),(null, 'n3');
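
The binlog requirements from step 2 can also be checked in code before starting a connector. The following is a minimal sketch, not part of Debezium: the class `BinlogSettingsCheck` and its methods are hypothetical names, and the variable values are assumed to have been read from MySQL beforehand (for example with `SHOW GLOBAL VARIABLES`) and passed in as a map, so the sketch runs without a database.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical helper for illustration; not part of Debezium or MySQL.
class BinlogSettingsCheck {

    /** Returns a list of problems; an empty list means the settings match step 2. */
    static List<String> findProblems(Map<String, String> vars) {
        List<String> problems = new ArrayList<>();
        expect(vars, "log_bin", "ON", problems);
        expect(vars, "binlog_format", "ROW", problems);
        expect(vars, "binlog_row_image", "FULL", problems);
        return problems;
    }

    // Records a problem when the variable is missing or has a different value.
    private static void expect(Map<String, String> vars, String name,
                               String wanted, List<String> problems) {
        String actual = vars.get(name);
        if (actual == null || !actual.equalsIgnoreCase(wanted)) {
            problems.add(name + " should be " + wanted + " but is " + actual);
        }
    }

    public static void main(String[] args) {
        // Assumed sample values; in practice they come from the MySQL server.
        Map<String, String> vars = new LinkedHashMap<>();
        vars.put("log_bin", "OFF");
        vars.put("binlog_format", "ROW");
        vars.put("binlog_row_image", "FULL");
        findProblems(vars).forEach(System.out::println);
    }
}
```

Keeping the check as a pure function over a map makes it easy to test and leaves the choice of how to query MySQL open.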

2. Writing the program

2.1. Project dependencies (Maven)

pom.xml

    <dependency>
        <groupId>io.debezium</groupId>
        <artifactId>debezium-api</artifactId>
        <version>${version.debezium}</version>
    </dependency>
    <dependency>
        <groupId>io.debezium</groupId>
        <artifactId>debezium-embedded</artifactId>
        <version>${version.debezium}</version>
    </dependency>
    <dependency>
        <groupId>io.debezium</groupId>
        <artifactId>debezium-connector-mysql</artifactId>
        <version>${version.debezium}</version>
    </dependency>

At the time of writing, the latest stable Debezium release is 1.9.5.Final.

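2.2. Prepare the database and table

Skip this step if the database and table were already created in step 3 of section 1.

    create database inventory;
    create table inventory.a (id bigint primary key, name varchar(32));
    insert into inventory.a values (1, 'n1'),(2, 'n2'),(3, 'n3');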

2.3. Writing the code

    package com.greatdb.dbzdemo;

    import java.io.IOException;
    import java.util.Properties;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.TimeUnit;

    import io.debezium.engine.ChangeEvent;
    import io.debezium.engine.DebeziumEngine;
    import io.debezium.engine.format.Json;

    /**
     * @author wang.jianwen
     * @version 1.0
     * @date 2022/07/29
     */
    public class DebeziumTest {

        private static DebeziumEngine<ChangeEvent<String, String>> engine;

        public static void main(String[] args) throws Exception {
            final Properties props = new Properties();
            props.setProperty("name", "dbz-engine");
            props.setProperty("connector.class", "io.debezium.connector.mysql.MySqlConnector");

            // offset config begin - store the already-processed binlog offsets in a file
            props.setProperty("offset.storage", "org.apache.kafka.connect.storage.FileOffsetBackingStore");
            props.setProperty("offset.storage.file.filename", "/tmp/dbz/storage/mysql_offsets.dat");
            props.setProperty("offset.flush.interval.ms", "0");
            // offset config end

            props.setProperty("database.server.name", "mysql-connector");
            props.setProperty("database.history", "io.debezium.relational.history.FileDatabaseHistory");
            props.setProperty("database.history.file.filename", "/tmp/dbz/storage/mysql_dbhistory.txt");

            props.setProperty("database.server.id", "122112"); // must differ from MySQL's server-id
            props.setProperty("database.hostname", "tmg");
            props.setProperty("database.port", "3306");
            props.setProperty("database.user", "dbz");          // the user created in section 1, step 1
            props.setProperty("database.password", "dbzpwd");
            props.setProperty("database.include.list", "inventory"); // database to capture
            props.setProperty("table.include.list", "inventory.a");  // table to capture
            props.setProperty("snapshot.mode", "initial");           // full snapshot, then incremental changes

            // Create the Debezium engine from the configuration above; events are emitted as JSON strings
            engine = DebeziumEngine.create(Json.class)
                    .using(props)
                    .notifying(record -> {
                        System.out.println(record); // print each event to the console
                    })
                    .using((success, message, error) -> {
                        if (error != null) {
                            // completion callback: report the error
                            System.out.println("------------error, message:" + message + ", exception:" + error);
                        }
                        closeEngine(engine);
                    })
                    .build();

            ExecutorService executor = Executors.newSingleThreadExecutor();
            executor.execute(engine);

            addShutdownHook(engine);
            awaitTermination(executor);
            System.out.println("------------main finished.");
        }

        private static void closeEngine(DebeziumEngine<ChangeEvent<String, String>> engine) {
            try {
                engine.close();
            } catch (IOException ignored) {
            }
        }

        private static void addShutdownHook(DebeziumEngine<ChangeEvent<String, String>> engine) {
            Runtime.getRuntime().addShutdownHook(new Thread(() -> closeEngine(engine)));
        }

        private static void awaitTermination(ExecutorService executor) {
            if (executor != null) {
                try {
                    executor.shutdown();
                    // block until the engine task has finished
                    while (!executor.awaitTermination(5, TimeUnit.SECONDS)) {
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }
        }
    }
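
The program above stores its offsets and database history under /tmp/dbz/storage. Whether the engine creates that directory on its own may vary by version, so creating it up front is a cheap precaution. The following is a small sketch under that assumption; `StorageDirs` and `ensureParentExists` are hypothetical names, not Debezium API.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;

// Hypothetical helper for illustration; not part of Debezium.
class StorageDirs {

    /** Creates the parent directory of the given file if it is missing and returns it. */
    static Path ensureParentExists(String file) {
        Path parent = Paths.get(file).toAbsolutePath().getParent();
        if (parent == null) {
            return null; // the path is a filesystem root, nothing to create
        }
        try {
            // No-op when the directory already exists
            Files.createDirectories(parent);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return parent;
    }

    public static void main(String[] args) {
        // The same paths that the configuration above uses
        ensureParentExists("/tmp/dbz/storage/mysql_offsets.dat");
        ensureParentExists("/tmp/dbz/storage/mysql_dbhistory.txt");
    }
}
```

A call to `ensureParentExists` for each file could be placed before the engine is built.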

3. Testing

Once the program is running, the console shows:

    ...(omitted)
    EmbeddedEngineChangeEvent [key={"schema":{"type":"struct","fields":[{"type":"int64","optional":false,"field":"id"}],"optional":false,"name":"mysql_connector.inventory.a.Key"},"payload":{"id":1}}, value={"schema":{"type":"struct","fields":[{"type":"struct","fields":[{"type":"int64","optional":false,"field":"id"},{"type":"string","optional":true,"field":"name"}],"optional":true,"name":"mysql_connector.inventory.a.Value","field":"before"},{"type":"struct","fields":[{"type":"int64","optional":false,"field":"id"},{"type":"string","optional":true,"field":"name"}],"optional":true,"name":"mysql_connector.inventory.a.Value","field":"after"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"version"},{"type":"string","optional":false,"field":"connector"},{"type":"string","optional":false,"field":"name"},{"type":"int64","optional":false,"field":"ts_ms"},{"type":"string","optional":true,"name":"io.debezium.data.Enum","version":1,"parameters":{"allowed":"true,last,false,incremental"},"default":"false","field":"snapshot"},{"type":"string","optional":false,"field":"db"},{"type":"string","optional":true,"field":"sequence"},{"type":"string","optional":true,"field":"table"},{"type":"int64","optional":false,"field":"server_id"},{"type":"string","optional":true,"field":"gtid"},{"type":"string","optional":false,"field":"file"},{"type":"int64","optional":false,"field":"pos"},{"type":"int32","optional":false,"field":"row"},{"type":"int64","optional":true,"field":"thread"},{"type":"string","optional":true,"field":"query"}],"optional":false,"name":"io.debezium.connector.mysql.Source","field":"source"},{"type":"string","optional":false,"field":"op"},{"type":"int64","optional":true,"field":"ts_ms"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"id"},{"type":"int64","optional":false,"field":"total_order"},{"type":"int64","optional":false,"field":"data_collection_order"}],"optional":true,"field":"transaction"}],"optional":false,"name":"mysql_connector.inventory.a.Envelope"},"payload":{"before":null,"after":{"id":1,"name":"n1"},"source":{"version":"1.8.1.Final","connector":"mysql","name":"mysql-connector","ts_ms":1659064005186,"snapshot":"true","db":"inventory","sequence":null,"table":"a","server_id":0,"gtid":null,"file":"mysql-bin.000001","pos":154,"row":0,"thread":null,"query":null},"op":"r","ts_ms":1659064005191,"transaction":null}}, sourceRecord=SourceRecord{sourcePartition={server=mysql-connector}, sourceOffset={ts_sec=1659064005, file=mysql-bin.000001, pos=154, snapshot=true}} ConnectRecord{topic='mysql-connector.inventory.a', kafkaPartition=null, key=Struct{id=1}, keySchema=Schema{mysql_connector.inventory.a.Key:STRUCT}, value=Struct{after=Struct{id=1,name=n1},source=Struct{version=1.8.1.Final,connector=mysql,name=mysql-connector,ts_ms=1659064005186,snapshot=true,db=inventory,table=a,server_id=0,file=mysql-bin.000001,pos=154,row=0},op=r,ts_ms=1659064005191}, valueSchema=Schema{mysql_connector.inventory.a.Envelope:STRUCT}, timestamp=null, headers=ConnectHeaders(headers=)}]
    EmbeddedEngineChangeEvent [key={"schema":{"type":"struct","fields":[{"type":"int64","optional":false,"field":"id"}],"optional":false,"name":"mysql_connector.inventory.a.Key"},"payload":{"id":2}}, value={"schema":{"type":"struct","fields":[{"type":"struct","fields":[{"type":"int64","optional":false,"field":"id"},{"type":"string","optional":true,"field":"name"}],"optional":true,"name":"mysql_connector.inventory.a.Value","field":"before"},{"type":"struct","fields":[{"type":"int64","optional":false,"field":"id"},{"type":"string","optional":true,"field":"name"}],"optional":true,"name":"mysql_connector.inventory.a.Value","field":"after"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"version"},{"type":"string","optional":false,"field":"connector"},{"type":"string","optional":false,"field":"name"},{"type":"int64","optional":false,"field":"ts_ms"},{"type":"string","optional":true,"name":"io.debezium.data.Enum","version":1,"parameters":{"allowed":"true,last,false,incremental"},"default":"false","field":"snapshot"},{"type":"string","optional":false,"field":"db"},{"type":"string","optional":true,"field":"sequence"},{"type":"string","optional":true,"field":"table"},{"type":"int64","optional":false,"field":"server_id"},{"type":"string","optional":true,"field":"gtid"},{"type":"string","optional":false,"field":"file"},{"type":"int64","optional":false,"field":"pos"},{"type":"int32","optional":false,"field":"row"},{"type":"int64","optional":true,"field":"thread"},{"type":"string","optional":true,"field":"query"}],"optional":false,"name":"io.debezium.connector.mysql.Source","field":"source"},{"type":"string","optional":false,"field":"op"},{"type":"int64","optional":true,"field":"ts_ms"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"id"},{"type":"int64","optional":false,"field":"total_order"},{"type":"int64","optional":false,"field":"data_collection_order"}],"optional":true,"field":"transaction"}],"optional":false,"name":"mysql_connector.inventory.a.Envelope"},"payload":{"before":null,"after":{"id":2,"name":"n2"},"source":{"version":"1.8.1.Final","connector":"mysql","name":"mysql-connector","ts_ms":1659064005195,"snapshot":"true","db":"inventory","sequence":null,"table":"a","server_id":0,"gtid":null,"file":"mysql-bin.000001","pos":154,"row":0,"thread":null,"query":null},"op":"r","ts_ms":1659064005196,"transaction":null}}, sourceRecord=SourceRecord{sourcePartition={server=mysql-connector}, sourceOffset={ts_sec=1659064005, file=mysql-bin.000001, pos=154, snapshot=true}} ConnectRecord{topic='mysql-connector.inventory.a', kafkaPartition=null, key=Struct{id=2}, keySchema=Schema{mysql_connector.inventory.a.Key:STRUCT}, value=Struct{after=Struct{id=2,name=n2},source=Struct{version=1.8.1.Final,connector=mysql,name=mysql-connector,ts_ms=1659064005195,snapshot=true,db=inventory,table=a,server_id=0,file=mysql-bin.000001,pos=154,row=0},op=r,ts_ms=1659064005196}, valueSchema=Schema{mysql_connector.inventory.a.Envelope:STRUCT}, timestamp=null, headers=ConnectHeaders(headers=)}]
    EmbeddedEngineChangeEvent [key={"schema":{"type":"struct","fields":[{"type":"int64","optional":false,"field":"id"}],"optional":false,"name":"mysql_connector.inventory.a.Key"},"payload":{"id":3}}, value={"schema":{"type":"struct","fields":[{"type":"struct","fields":[{"type":"int64","optional":false,"field":"id"},{"type":"string","optional":true,"field":"name"}],"optional":true,"name":"mysql_connector.inventory.a.Value","field":"before"},{"type":"struct","fields":[{"type":"int64","optional":false,"field":"id"},{"type":"string","optional":true,"field":"name"}],"optional":true,"name":"mysql_connector.inventory.a.Value","field":"after"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"version"},{"type":"string","optional":false,"field":"connector"},{"type":"string","optional":false,"field":"name"},{"type":"int64","optional":false,"field":"ts_ms"},{"type":"string","optional":true,"name":"io.debezium.data.Enum","version":1,"parameters":{"allowed":"true,last,false,incremental"},"default":"false","field":"snapshot"},{"type":"string","optional":false,"field":"db"},{"type":"string","optional":true,"field":"sequence"},{"type":"string","optional":true,"field":"table"},{"type":"int64","optional":false,"field":"server_id"},{"type":"string","optional":true,"field":"gtid"},{"type":"string","optional":false,"field":"file"},{"type":"int64","optional":false,"field":"pos"},{"type":"int32","optional":false,"field":"row"},{"type":"int64","optional":true,"field":"thread"},{"type":"string","optional":true,"field":"query"}],"optional":false,"name":"io.debezium.connector.mysql.Source","field":"source"},{"type":"string","optional":false,"field":"op"},{"type":"int64","optional":true,"field":"ts_ms"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"id"},{"type":"int64","optional":false,"field":"total_order"},{"type":"int64","optional":false,"field":"data_collection_order"}],"optional":true,"field":"transaction"}],"optional":false,"name":"mysql_connector.inventory.a.Envelope"},"payload":{"before":null,"after":{"id":3,"name":"n3"},"source":{"version":"1.8.1.Final","connector":"mysql","name":"mysql-connector","ts_ms":1659064005196,"snapshot":"last","db":"inventory","sequence":null,"table":"a","server_id":0,"gtid":null,"file":"mysql-bin.000001","pos":154,"row":0,"thread":null,"query":null},"op":"r","ts_ms":1659064005196,"transaction":null}}, sourceRecord=SourceRecord{sourcePartition={server=mysql-connector}, sourceOffset={ts_sec=1659064005, file=mysql-bin.000001, pos=154}} ConnectRecord{topic='mysql-connector.inventory.a', kafkaPartition=null, key=Struct{id=3}, keySchema=Schema{mysql_connector.inventory.a.Key:STRUCT}, value=Struct{after=Struct{id=3,name=n3},source=Struct{version=1.8.1.Final,connector=mysql,name=mysql-connector,ts_ms=1659064005196,snapshot=last,db=inventory,table=a,server_id=0,file=mysql-bin.000001,pos=154,row=0},op=r,ts_ms=1659064005196}, valueSchema=Schema{mysql_connector.inventory.a.Envelope:STRUCT}, timestamp=null, headers=ConnectHeaders(headers=)}]
    ...(omitted)

The full snapshot of the existing data has been emitted. The key parts are:

    ..."payload":{"before":null,"after":{"id":1,"name":"n1"}..."op":"r"...
    ..."payload":{"before":null,"after":{"id":2,"name":"n2"}..."op":"r"...
    ..."payload":{"before":null,"after":{"id":3,"name":"n3"}..."op":"r"...
  • Next, insert a row:

    insert into inventory.a values (4, 'n4');

    Console output:

    ..."payload":{"before":null,"after":{"id":4,"name":"n4"}..."op":"c"...
  • Update a row:

    update inventory.a set name = 'n4-upd' where id = 4;

    Console output:

    ..."payload":{"before":{"id":4,"name":"n4"},"after":{"id":4,"name":"n4-upd"}..."op":"u"...
  • Delete a row:

    delete from inventory.a where id = 1;

    Console output:

    ..."payload":{"before":{"id":1,"name":"n1"},"after":null..."op":"d"...
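
The four "op" values seen in the output (r, c, u, d) are what a consumer typically branches on. The following is a minimal sketch of pulling that code out of an event's JSON text and naming it. `OpCodeDecoder` is a hypothetical helper, and the regular expression is an assumption that fits the output shown above; real code would more likely use a JSON parser.

```java
import java.util.Optional;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical helper for illustration; not part of the Debezium API.
class OpCodeDecoder {

    // Matches the payload field "op":"x". The schema section only contains
    // "field":"op", which this pattern does not match.
    private static final Pattern OP = Pattern.compile("\"op\":\"([a-z])\"");

    /** Extracts the op code from an event's JSON text, if present. */
    static Optional<String> opOf(String eventJson) {
        Matcher m = OP.matcher(eventJson);
        return m.find() ? Optional.of(m.group(1)) : Optional.empty();
    }

    /** Maps the op codes seen in the console output to a readable name. */
    static String describe(String op) {
        switch (op) {
            case "r": return "snapshot read";
            case "c": return "insert";
            case "u": return "update";
            case "d": return "delete";
            default:  return "unknown op: " + op;
        }
    }

    public static void main(String[] args) {
        // Abbreviated version of the insert event from the test above
        String event = "{\"payload\":{\"before\":null,"
                + "\"after\":{\"id\":4,\"name\":\"n4\"},\"op\":\"c\"}}";
        System.out.println(opOf(event).map(OpCodeDecoder::describe).orElse("no op field"));
    }
}
```

In the program from section 2.3, the string to inspect would be the event's value, which `ChangeEvent` exposes through `value()`.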

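III. Summary

Using MySQL as an example, this article walked through the basic flow of using Debezium from code. When rows in MySQL are inserted, updated, or deleted, Debezium captures each change, records the row data before and after it, and exposes the changes as an event stream that external consumers can read and process.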
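
As one illustration of what such processing could look like, the sketch below replays the events from the test section into an in-memory copy of inventory.a. It is a simplified model, not Debezium code: `ReplicaSketch` and `RowChange` are hypothetical names, and each event is assumed to have already been parsed into its op code, row id, and the name value after the change.

```java
import java.util.Map;
import java.util.TreeMap;

// Hypothetical consumer for illustration; not part of Debezium.
class ReplicaSketch {

    /** Simplified stand-in for a parsed change event on inventory.a. */
    static final class RowChange {
        final String op;        // "r", "c", "u" or "d"
        final long id;          // primary key of the affected row
        final String nameAfter; // value of "name" after the change, null for deletes

        RowChange(String op, long id, String nameAfter) {
            this.op = op;
            this.id = id;
            this.nameAfter = nameAfter;
        }
    }

    private final Map<Long, String> rows = new TreeMap<>();

    /** Applies one event: reads, inserts and updates upsert the row, deletes remove it. */
    void apply(RowChange e) {
        switch (e.op) {
            case "r":
            case "c":
            case "u":
                rows.put(e.id, e.nameAfter);
                break;
            case "d":
                rows.remove(e.id);
                break;
            default:
                throw new IllegalArgumentException("unexpected op: " + e.op);
        }
    }

    Map<Long, String> rows() {
        return rows;
    }

    public static void main(String[] args) {
        ReplicaSketch replica = new ReplicaSketch();
        // Snapshot rows, then the insert, update and delete from the test section
        replica.apply(new RowChange("r", 1, "n1"));
        replica.apply(new RowChange("r", 2, "n2"));
        replica.apply(new RowChange("r", 3, "n3"));
        replica.apply(new RowChange("c", 4, "n4"));
        replica.apply(new RowChange("u", 4, "n4-upd"));
        replica.apply(new RowChange("d", 1, null));
        System.out.println(replica.rows()); // prints {2=n2, 3=n3, 4=n4-upd}
    }
}
```

Because events arrive in the order the changes occurred, applying them one by one leaves the copy in the same state as the source table.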

Reference: https://debezium.io/documenta...