关于mysql:Debezium的基本使用以MySQL为例

3次阅读

共计 14410 个字符,预计需要花费 37 分钟才能阅读完成。

  • GreatSQL 社区原创内容未经受权不得随便应用,转载请分割小编并注明起源。
  • GreatSQL 是 MySQL 的国产分支版本,应用上与 MySQL 统一。

    一、Debezium 介绍

    摘自官网:

    Debezium is a set of distributed services to capture changes in your databases so that your applications can see those changes and respond to them. Debezium records all row-level changes within each database table in a _change event stream_, and applications simply read these streams to see the change events in the same order in which they occurred.

简略了解就是 Debezium 能够捕捉数据库中所有行级的数据变动并包装成事件流程序输入。

二、根本应用

上面以 MySQL 为例介绍 Debezium 的根本应用。

1. MySQL 的筹备工作

  1. 筹备一个 MySQL 用户并且领有相应权限,像这样:

    CREATE USER 'dbz'@'%' IDENTIFIED BY 'dbzpwd';
    
    GRANT SELECT, RELOAD, SHOW DATABASES, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'dbz' IDENTIFIED BY 'dbzpwd';
  2. 查看 MySQL 是否开启log-bin

    SELECT variable_value as "BINARY LOGGING STATUS (log-bin) ::" FROM information_schema.global_variables WHERE variable_name='log_bin';
    
    -- If the following error occurs: The 'INFORMATION_SCHEMA.GLOBAL_VARIABLES' feature is disabled...
    -- please execute the given SQL again after execute this SQL: set global show_compatibility_56=on;

    如果是 OFF 则须要批改 MySQL 配置文件,相似上面这样:

    server-id         = 223344        #必须有
    log_bin           = mysql-bin    #log_bin 的值是 binlog 文件序列的根本名称
    binlog_format     = ROW                #必须是 ROW
    binlog_row_image  = FULL            #必须是 FULL
    expire_logs_days  = 10                #根据理论状况而定
  3. 筹备数据库 & 表

    create database inventory;
    create table inventory.a (id bigint primary key auto_increment, name varchar(32));
    insert into inventory.a values (null, 'n1'),(null, 'n2'),(null, 'n3');

2. 编写程序

2.1. 工程依赖(Maven)

pom.xml

<dependency>
    <groupId>io.debezium</groupId>
    <artifactId>debezium-api</artifactId>
    <version>${version.debezium}</version>
</dependency>
<dependency>
    <groupId>io.debezium</groupId>
    <artifactId>debezium-embedded</artifactId>
    <version>${version.debezium}</version>
</dependency>

<dependency>
    <groupId>io.debezium</groupId>
    <artifactId>debezium-connector-mysql</artifactId>
    <version>${version.debezium}</version>
</dependency>

目前 Debezium 最新稳固版本为:1.9.5.Final

2.2. 筹备数据库 & 表

create database inventory;
create table inventory.a (id bigint primary key, name varchar(32));
insert into inventory.a values (1, 'n1'),(2, 'n2'),(3, 'n3');

2.3. 代码编写

package com.greatdb.dbzdemo;

import java.io.IOException;
import java.util.Properties;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

import io.debezium.engine.ChangeEvent;
import io.debezium.engine.DebeziumEngine;
import io.debezium.engine.format.Json;

/**
 * @author wang.jianwen
 * @version 1.0
 * @date 2022/07/29
 */
public class DebeziumTest {

    private static DebeziumEngine<ChangeEvent<String, String>> engine;

    public static void main(String[] args) throws Exception {final Properties props = new Properties();
        props.setProperty("name", "dbz-engine");
        props.setProperty("connector.class", "io.debezium.connector.mysql.MySqlConnector");

        //offset config begin - 应用文件来存储已解决的 binlog 偏移量
        props.setProperty("offset.storage", "org.apache.kafka.connect.storage.FileOffsetBackingStore");
        props.setProperty("offset.storage.file.filename", "/tmp/dbz/storage/mysql_offsets.dat");
        props.setProperty("offset.flush.interval.ms", "0");
        //offset config end

        props.setProperty("database.server.name", "mysql-connector");
        props.setProperty("database.history", "io.debezium.relational.history.FileDatabaseHistory");
        props.setProperty("database.history.file.filename", "/tmp/dbz/storage/mysql_dbhistory.txt");

        props.setProperty("database.server.id", "122112");    // 须要与 MySQL 的 server-id 不同
        props.setProperty("database.hostname", "tmg");
        props.setProperty("database.port", "3306");
        props.setProperty("database.user", "mysqluser");
        props.setProperty("database.password", "mysqlpw");
        props.setProperty("database.include.list", "inventory");// 要捕捉的数据库名
        props.setProperty("table.include.list", "inventory.a");// 要捕捉的数据表

        props.setProperty("snapshot.mode", "initial");// 全量 + 增量

        // 应用上述配置创立 Debezium 引擎,输入款式为 Json 字符串格局
        engine = DebeziumEngine.create(Json.class)
                .using(props)
                .notifying(record -> {System.out.println(record);// 输入到控制台
                })
                .using((success, message, error) -> {if (error != null) {
                        // 报错回调
                        System.out.println("------------error, message:" + message + "exception:" + error);
                    }
                    closeEngine(engine);
                })
                .build();

        ExecutorService executor = Executors.newSingleThreadExecutor();
        executor.execute(engine);
        addShutdownHook(engine);
        awaitTermination(executor);

        System.out.println("------------main finished.");
    }

    private static void closeEngine(DebeziumEngine<ChangeEvent<String, String>> engine) {
        try {engine.close();
        } catch (IOException ignored) {}}

    private static void addShutdownHook(DebeziumEngine<ChangeEvent<String, String>> engine) {Runtime.getRuntime().addShutdownHook(new Thread(() -> closeEngine(engine)));
    }

    private static void awaitTermination(ExecutorService executor) {if (executor != null) {
            try {executor.shutdown();
                while (!executor.awaitTermination(5, TimeUnit.SECONDS)) {}} catch (InterruptedException e) {Thread.currentThread().interrupt();}
        }
    }
}

3. 测试

程序跑起来后,能够看到控制台输入:

...(省略)EmbeddedEngineChangeEvent [key={"schema":{"type":"struct","fields":[{"type":"int64","optional":false,"field":"id"}],"optional":false,"name":"mysql_connector.inventory.a.Key"},"payload":{"id":1}}, value={"schema":{"type":"struct","fields":[{"type":"struct","fields":[{"type":"int64","optional":false,"field":"id"},{"type":"string","optional":true,"field":"name"}],"optional":true,"name":"mysql_connector.inventory.a.Value","field":"before"},{"type":"struct","fields":[{"type":"int64","optional":false,"field":"id"},{"type":"string","optional":true,"field":"name"}],"optional":true,"name":"mysql_connector.inventory.a.Value","field":"after"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"version"},{"type":"string","optional":false,"field":"connector"},{"type":"string","optional":false,"field":"name"},{"type":"int64","optional":false,"field":"ts_ms"},{"type":"string","optional":true,"name":"io.debezium.data.Enum","version":1,"parameters":{"allowed":"true,last,false,incremental"},"default":"false","field":"snapshot"},{"type":"string","optional":false,"field":"db"},{"type":"string","optional":true,"field":"sequence"},{"type":"string","optional":true,"field":"table"},{"type":"int64","optional":false,"field":"server_id"},{"type":"string","optional":true,"field":"gtid"},{"type":"string","optional":false,"field":"file"},{"type":"int64","optional":false,"field":"pos"},{"type":"int32","optional":false,"field":"row"},{"type":"int64","optional":true,"field":"thread"},{"type":"string","optional":true,"field":"query"}],"optional":false,"name":"io.debezium.connector.mysql.Source","field":"source"},{"type":"string","optional":false,"field":"op"},{"type":"int64","optional":true,"field":"ts_ms"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"id"},{"type":"int64","optional":false,"field":"total_order"},{"type":"int64","optional":false,"field":"data_collection_order"}],"optional":true,"field":"transaction"}],"optional":false,"name":"mysql_connector.inventory.a.Envelope"},"payload":{"before":null,"after":{"id":1,"name":"n1"},"source":{"version":"1.8.1.Final","connector":"mysql","name":"mysql-connector","ts_ms":1659064005186,"snapshot":"true","db":"inventory","sequence":null,"table":"a","server_id":0,"gtid":null,"file":"mysql-bin.000001","pos":154,"row":0,"thread":null,"query":null},"op":"r","ts_ms":1659064005191,"transaction":null}}, sourceRecord=SourceRecord{sourcePartition={server=mysql-connector}, sourceOffset={ts_sec=1659064005, file=mysql-bin.000001, pos=154, snapshot=true}} ConnectRecord{topic='mysql-connector.inventory.a', kafkaPartition=null, key=Struct{id=1}, keySchema=Schema{mysql_connector.inventory.a.Key:STRUCT}, value=Struct{after=Struct{id=1,name=n1},source=Struct{version=1.8.1.Final,connector=mysql,name=mysql-connector,ts_ms=1659064005186,snapshot=true,db=inventory,table=a,server_id=0,file=mysql-bin.000001,pos=154,row=0},op=r,ts_ms=1659064005191}, valueSchema=Schema{mysql_connector.inventory.a.Envelope:STRUCT}, timestamp=null, headers=ConnectHeaders(headers=)}]
EmbeddedEngineChangeEvent [key={"schema":{"type":"struct","fields":[{"type":"int64","optional":false,"field":"id"}],"optional":false,"name":"mysql_connector.inventory.a.Key"},"payload":{"id":2}}, value={"schema":{"type":"struct","fields":[{"type":"struct","fields":[{"type":"int64","optional":false,"field":"id"},{"type":"string","optional":true,"field":"name"}],"optional":true,"name":"mysql_connector.inventory.a.Value","field":"before"},{"type":"struct","fields":[{"type":"int64","optional":false,"field":"id"},{"type":"string","optional":true,"field":"name"}],"optional":true,"name":"mysql_connector.inventory.a.Value","field":"after"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"version"},{"type":"string","optional":false,"field":"connector"},{"type":"string","optional":false,"field":"name"},{"type":"int64","optional":false,"field":"ts_ms"},{"type":"string","optional":true,"name":"io.debezium.data.Enum","version":1,"parameters":{"allowed":"true,last,false,incremental"},"default":"false","field":"snapshot"},{"type":"string","optional":false,"field":"db"},{"type":"string","optional":true,"field":"sequence"},{"type":"string","optional":true,"field":"table"},{"type":"int64","optional":false,"field":"server_id"},{"type":"string","optional":true,"field":"gtid"},{"type":"string","optional":false,"field":"file"},{"type":"int64","optional":false,"field":"pos"},{"type":"int32","optional":false,"field":"row"},{"type":"int64","optional":true,"field":"thread"},{"type":"string","optional":true,"field":"query"}],"optional":false,"name":"io.debezium.connector.mysql.Source","field":"source"},{"type":"string","optional":false,"field":"op"},{"type":"int64","optional":true,"field":"ts_ms"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"id"},{"type":"int64","optional":false,"field":"total_order"},{"type":"int64","optional":false,"field":"data_collection_order"}],"optional":true,"field":"transaction"}],"optional":false,"name":"mysql_connector.inventory.a.Envelope"},"payload":{"before":null,"after":{"id":2,"name":"n2"},"source":{"version":"1.8.1.Final","connector":"mysql","name":"mysql-connector","ts_ms":1659064005195,"snapshot":"true","db":"inventory","sequence":null,"table":"a","server_id":0,"gtid":null,"file":"mysql-bin.000001","pos":154,"row":0,"thread":null,"query":null},"op":"r","ts_ms":1659064005196,"transaction":null}}, sourceRecord=SourceRecord{sourcePartition={server=mysql-connector}, sourceOffset={ts_sec=1659064005, file=mysql-bin.000001, pos=154, snapshot=true}} ConnectRecord{topic='mysql-connector.inventory.a', kafkaPartition=null, key=Struct{id=2}, keySchema=Schema{mysql_connector.inventory.a.Key:STRUCT}, value=Struct{after=Struct{id=2,name=n2},source=Struct{version=1.8.1.Final,connector=mysql,name=mysql-connector,ts_ms=1659064005195,snapshot=true,db=inventory,table=a,server_id=0,file=mysql-bin.000001,pos=154,row=0},op=r,ts_ms=1659064005196}, valueSchema=Schema{mysql_connector.inventory.a.Envelope:STRUCT}, timestamp=null, headers=ConnectHeaders(headers=)}]
EmbeddedEngineChangeEvent [key={"schema":{"type":"struct","fields":[{"type":"int64","optional":false,"field":"id"}],"optional":false,"name":"mysql_connector.inventory.a.Key"},"payload":{"id":3}}, value={"schema":{"type":"struct","fields":[{"type":"struct","fields":[{"type":"int64","optional":false,"field":"id"},{"type":"string","optional":true,"field":"name"}],"optional":true,"name":"mysql_connector.inventory.a.Value","field":"before"},{"type":"struct","fields":[{"type":"int64","optional":false,"field":"id"},{"type":"string","optional":true,"field":"name"}],"optional":true,"name":"mysql_connector.inventory.a.Value","field":"after"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"version"},{"type":"string","optional":false,"field":"connector"},{"type":"string","optional":false,"field":"name"},{"type":"int64","optional":false,"field":"ts_ms"},{"type":"string","optional":true,"name":"io.debezium.data.Enum","version":1,"parameters":{"allowed":"true,last,false,incremental"},"default":"false","field":"snapshot"},{"type":"string","optional":false,"field":"db"},{"type":"string","optional":true,"field":"sequence"},{"type":"string","optional":true,"field":"table"},{"type":"int64","optional":false,"field":"server_id"},{"type":"string","optional":true,"field":"gtid"},{"type":"string","optional":false,"field":"file"},{"type":"int64","optional":false,"field":"pos"},{"type":"int32","optional":false,"field":"row"},{"type":"int64","optional":true,"field":"thread"},{"type":"string","optional":true,"field":"query"}],"optional":false,"name":"io.debezium.connector.mysql.Source","field":"source"},{"type":"string","optional":false,"field":"op"},{"type":"int64","optional":true,"field":"ts_ms"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"id"},{"type":"int64","optional":false,"field":"total_order"},{"type":"int64","optional":false,"field":"data_collection_order"}],"optional":true,"field":"transaction"}],"optional":false,"name":"mysql_connector.inventory.a.Envelope"},"payload":{"before":null,"after":{"id":3,"name":"n3"},"source":{"version":"1.8.1.Final","connector":"mysql","name":"mysql-connector","ts_ms":1659064005196,"snapshot":"last","db":"inventory","sequence":null,"table":"a","server_id":0,"gtid":null,"file":"mysql-bin.000001","pos":154,"row":0,"thread":null,"query":null},"op":"r","ts_ms":1659064005196,"transaction":null}}, sourceRecord=SourceRecord{sourcePartition={server=mysql-connector}, sourceOffset={ts_sec=1659064005, file=mysql-bin.000001, pos=154}} ConnectRecord{topic='mysql-connector.inventory.a', kafkaPartition=null, key=Struct{id=3}, keySchema=Schema{mysql_connector.inventory.a.Key:STRUCT}, value=Struct{after=Struct{id=3,name=n3},source=Struct{version=1.8.1.Final,connector=mysql,name=mysql-connector,ts_ms=1659064005196,snapshot=last,db=inventory,table=a,server_id=0,file=mysql-bin.000001,pos=154,row=0},op=r,ts_ms=1659064005196}, valueSchema=Schema{mysql_connector.inventory.a.Envelope:STRUCT}, timestamp=null, headers=ConnectHeaders(headers=)}]
...(省略)

能够看到全量的数据曾经输入,要害的数据如下:

..."payload":{"before":null,"after":{"id":1,"name":"n1"}..."op":"r"...
..."payload":{"before":null,"after":{"id":2,"name":"n2"}..."op":"r"...
..."payload":{"before":null,"after":{"id":3,"name":"n3"}..."op":"r"...
  • 接下来 新增 一条数据:

    insert into inventory.a values (4, 'n4');

    控制台输入:

    ..."payload":{"before":null,"after":{"id":4,"name":"n4"}..."op":"c"...
  • 批改 一条数据:

    update inventory.a set name = 'n4-upd' where id = 4;

    控制台输入:

    ..."payload":{"before":{"id":4,"name":"n4"},"after":{"id":4,"name":"n4-upd"}..."op":"u"...
  • 删除 一条数据:

    delete from inventory.a where id = 1;

    控制台输入:

    ..."payload":{"before":{"id":1,"name":"n1"},"after":null..."op":"d"...

    三、总结

    本文以 MySQL 为例介绍了 Debezium 在代码中根本应用流程,对 MySQL 的数据进行常见的增删改操作,Debezium 将捕捉这些数据行的变动,并记录了数据行变动前后的数据,并对外提供事件流,内部能够获取并对事件进行相应解决。

参考:https://debezium.io/documenta…

正文完
 0