关于数据库:三步玩转如何通过Flink-OceanBase-CDC连接器快速查询数据

24次阅读

共计 7844 个字符,预计需要花费 20 分钟才能阅读完成。

CDC(Change Data Capture,即变更数据捕捉)可能帮忙您监测并捕捉数据库的变动。包含数据或数据表的插入、更新和删除等。CDC 将这些变更按产生的程序残缺记录下来,写入到消息中间件中,以供其余服务进行订阅及生产。您可用 CDC 提供的数据做历史库、近实时缓存、提供给音讯队列(MQ),用户生产 MQ 做剖析和审计等。

Flink CDC (CDC Connectors for Apache Flink) 是 Apache Flink 的一组 Source 连接器,它反对从大多数据库中实时地读取存量历史数据和增量变更数据。Flink CDC 可能将数据库的全量和增量数据同步到音讯队列和数据仓库中。Flink CDC 也能够用于实时数据集成,您能够应用它将数据库数据实时导入数据湖或者数据仓库。同时,Flink CDC 还反对数据加工,您能够通过它的 SQL Client 对数据库数据做实时关联、打宽、聚合,并将后果写入到各种存储中。借助 Flink OceanBase CDC 连接器,您能够应用 SQL DDL 创立一个 CDC 源来监控单个表的变动。您也能够在没有部署 Debezium 和 Apache Kafka 的状况下,在一个作业中生产多个数据库和表的变动。在没有 Flink OceanBase CDC 连接器之前,您不能做实时数据的多流 JOIN。

OceanBase 数据库的 CDC 组件次要有:obcdc(原 liboblog)、oblogmsg、oblogproxy 和 oblogclient。各组件性能见下表:

组件名 类型 依赖组件 作用
obcdc(原 liboblog) C++ 依赖库 程序拉取增量日志
oblogmsg C++ 依赖库 解析增量日志格局
oblogproxy C++ 服务 obcdc 和 oblogmsg 拉取增量日志
oblogclient Java 依赖库 连贯 oblogproxy 获取增量日志

本文介绍如何应用 Flink OceanBase CDC 连接器。本文将以 Elasticsearch 为指标数据源,演示如何将数据从 OceanBase 数据库迁徙至 Elasticsearch。

个性

At-Least-Once 解决

OceanBase CDC 连接器是一个 Flink Source 连接器。它将首先读取数据库快照,而后再读取变动事件,并进行 At-Least-Once 解决

OceanBase 数据库是一个分布式数据库,它的日志也扩散在不同的服务器上。因为没有相似 MySQL binlog 偏移量的地位信息,OceanBase 数据库用工夫戳作为地位标记。为确保读取残缺的数据,liboblog(读取 OceanBase 日志记录的 C++ 库)可能会在给定的工夫戳之前读取一些日志数据。因而,OceanBase 数据库可能会读到起始点左近工夫戳的反复数据,从而保障了 At-Least-Once 解决

启动模式

配置选项 scan.startup.mode 指定 OceanBase CDC 连接器的启动模式。可用取值包含:

  • initial(默认):在首次启动时对受监督的数据库表执行初始快照,并持续读取最新的事务日志。
  • latest-offset:首次启动时,不对受监督的数据库表执行快照,仅从连接器启动时读取事务日志。
  • timestamp:在首次启动时不对受监督的数据库表执行初始快照,仅从指定的 scan.startup.timestamp 读取事务日志。

生产事务日志

OceanBase CDC 连接器应用 oblogclient 生产 oblogproxy 中的事务日志。

DataStream Source

OceanBase CDC 连接器也能够作为 DataStream Source 应用。您能够创立一个 SourceFunction,例如:

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import com.ververica.cdc.connectors.oceanbase.OceanBaseSource;
import com.ververica.cdc.connectors.oceanbase.table.OceanBaseTableSourceFactory;
import com.ververica.cdc.connectors.oceanbase.table.StartupMode;
import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;

public class OceanBaseSourceExample {public static void main(String[] args) throws Exception {
    SourceFunction<String> oceanBaseSource =
        OceanBaseSource.<String>builder()
            .rsList("127.0.0.1:2882:2881")  // set root server list
            .startupMode(StartupMode.INITIAL) // set startup mode
            .username("[email protected]_tenant")  // set cluster username
            .password("pswd")  // set cluster password
            .tenantName("test_tenant")  // set captured tenant name, do not support regex
            .databaseName("test_db")  // set captured database, support regex
            .tableName("test_table")  // set captured table, support regex
            .hostname("127.0.0.1")  // set hostname of OceanBase server or proxy
            .port(2881)  // set the sql port for OceanBase server or proxy
            .logProxyHost("127.0.0.1")  // set the hostname of log proxy
            .logProxyPort(2983)  // set the port of log proxy
            .deserializer(new JsonDebeziumDeserializationSchema())  // converts SourceRecord to JSON String
            .build();

    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

    // enable checkpoint
    env.enableCheckpointing(3000);
    
    env.addSource(oceanBaseSource).print().setParallelism(1);

    env.execute("Print OceanBase Snapshot + Commit Log");
  }
}

数据类型映射

当启动模式不是 INITIAL 时,连接器无奈取得一个列的精度和比例。为兼容不同的启动模式,连接器不会将一个不同精度的 OceanBase 类型映射到不同的 FLink 类型。例如,BOOLEANTINYINT(1)BIT(1) 均会转换成 BOOLEAN。在 OceanBase 数据库中,BOOLEAN 等同于 TINYINT(1),所以 BOOLEANTINYINT 类型的列在 Flink 中会被映射为 TINYINT,而 BIT(1) 在 Flink 中会被映射为 BINARY(1)

OceanBase 数据类型 Flink SQL 类型 形容
BOOLEAN
TINYINT
TINYINT
SMALLINT
TINYINT
UNSIGNED
SMALLINT
INT
MEDIUMINT
SMALLINT
UNSIGNED
INT
BIGINT
INT UNSIGNED
BIGINT
BIGINT UNSIGNED DECIMAL(20, 0)
REAL FLOAT FLOAT
DOUBLE DOUBLE
NUMERIC(p, s)
DECIMAL(p, s)
where p <= 38
DECIMAL(p, s)
NUMERIC(p, s)
DECIMAL(p, s)
where 38 < p <=65
STRING DECIMAL 等同于 NUMERIC。在 OceanBase 数据库中,DECIMAL 数据类型的精度最高为 65。但在 Flink 中,DECIMAL 的精度为 38。因而,如果你定义了一个精度大于 38 的 DECIMAL 列,你要把它映射为 STRING,以防止精度损失。
DATE DATE
TIME [(p)] TIME [(p)]
TIMESTAMP [(p)]
DATETIME [(p)]
TIMESTAMP [(p)]
CHAR(n) CHAR(n)
VARCHAR(n) VARCHAR(n)
BIT(n) BINARY(⌈n/8⌉)
BINARY(n) BINARY(n)
VARBINARY(N) VARBINARY(N)
TINYTEXT
TEXT
MEDIUMTEXT
LONGTEXT
STRING
TINYBLOB
BLOB
MEDIUMBLOB
LONGBLOB
BYTES
YEAR INT
ENUM STRING
SET STRING

体验 Flink OceanBase CDC 连接器

前提条件

在迁徙 OceanBase 的数据之前,您须要确认以下信息:

  • 您已装置 OceanBase 数据库。更多信息,参考 OceanBase 数据库文档。
  • 您已装置 Elasticsearch。更多信息,参考 Elasticsearch 文档。
  • 您已装置 OceanBase 增量日志拉取组件 oblogproxy。更多信息,参考 oblogproxy 文档(社区版)或 oblogproxy 文档(企业版)。
  • 您已装置 Flink。更多信息,参考 Flink 文档。

OceanBase CDC 连接器反对从 OceanBase 数据库读取快照数据和增量数据。本节介绍如何设置 OceanBase CDC 连接器,以在 OceanBase 数据库中查问数据。

依赖

要应用 OceanBase CDC 连接器,您必须提供相干的依赖信息。以下依赖信息实用于应用主动构建工具(如 Maven 或 SBT)构建的我的项目和带有 SQL JAR 包的 SQL 客户端。

<dependency>
  <groupId>com.ververica</groupId>
  <artifactId>flink-connector-oceanbase-cdc</artifactId>
  <!-- The dependency is available only for stable releases, SNAPSHOT dependency need build by yourself. -->
  <version>2.2.1</version>
</dependency>

下载 SQL 客户端 JAR 包

点击 flink-sql-connector-oceanbase-cdc-2.2.1.jar 下载 JAR 包至 <FLINK_HOME>/lib/.

阐明:

下载链接仅实用于稳固发行版本。

flink-sql-connector-oceanbase-cdc-XXX-SNAPSHOT 快照版本与开发分支的版本对应。要应用快照版本,您必须自行下载并编译源代码。举荐应用稳固发行版本,例如 flink-sql-connector-oceanbase-cdc-2.2.1.jar。您能够在 Maven 地方仓库中找到应用稳固发行版本。

配置 OceanBase 数据库和 oblogproxy 服务

  1. 依照 部署文档 配置 OceanBase 集群。
  2. 在 sys 租户中,为 oblogproxy 创立一个带明码的用户。更多信息,参考 用户治理文档。

    mysql -h${host} -P${port} -uroot
    mysql> SHOW TENANT;
    mysql> CREATE USER ${sys_username} IDENTIFIED BY '${sys_password}';
    mysql> GRANT ALL PRIVILEGES ON *.* TO ${sys_username} WITH GRANT OPTION;
  3. 为你想要监控的租户创立一个用户,这个用户用来读取快照数据和变动事件数据。
  4. 获取 rootservice_listconfig-url 的值。
    如果您是社区版用户,应用以下命令:

    mysql> SHOW PARAMETERS LIKE 'rootservice_list';

    如果您是企业版用户,应用以下命令:

    mysql> SHOW PARAMETERS LIKE 'obconfig_url';
  5. 依照 oblogproxy 文档 配置 oblogproxy。

创立 OceanBase CDC 表

应用以下命令,创立 OceanBase CDC 表:

-- 每 3000 毫秒做一次 checkpoint                   
Flink SQL> SET 'execution.checkpointing.interval' = '3s';

-- 在 Flink SQL 中创立 OceanBase 表 `orders`
Flink SQL> CREATE TABLE orders (
    order_id     INT,
    order_date   TIMESTAMP(0),
    customer_name STRING,
    price        DECIMAL(10, 5),
    product_id   INT,
    order_status BOOLEAN,
    PRIMARY KEY (order_id) NOT ENFORCED
) WITH (
    'connector' = 'oceanbase-cdc',
    'scan.startup.mode' = 'initial',
    'username' = '[email protected]_tenant',
    'password' = 'pswd',
    'tenant-name' = 'test_tenant',
    'database-name' = 'test_db',
    'table-name' = 'orders',
    'hostname' = '127.0.0.1',
    'port' = '2881',
    'rootserver-list' = '127.0.0.1:2882:2881',
    'logproxy.host' = '127.0.0.1',
    'logproxy.port' = '2983');

-- 从表 orders 中读取快照数据和 binlog 数据
Flink SQL> SELECT * FROM orders;

您也能够拜访 Flink 官网文档,疾速体验将数据从 OceanBase 导入到 Elasticsearch。更多信息,参考 Flink 官网文档。

Flink OceanBase CDC 连接器配置项

配置项 是否必选 默认值 类型 形容
connector String 指定要应用的连接器。此处为 oceanbase-cdc
scan.startup.mode String 指定 OceanBase CDC 消费者的启动模式。可取值为 initiallatest-offsettimestamp
scan.startup.timestamp Long 起始点的工夫戳,单位为秒。仅在 scan.startup.mode 的值为 timestamp 时实用。
username String 连贯 OceanBase 数据库的用户的名称。
password String 连贯 OceanBase 数据库时应用的明码。
tenant-name String 待监控 OceanBase 数据库的租户名,填入准确值。
database-name String 待监控 OceanBase 数据库的数据库名。
table-name String 待监控 OceanBase 数据库的表名。
hostname String OceanBase 数据库或 OceanBbase 代理 ODP 的 IP 地址或主机名。
port String Integer OceanBase 数据库服务器的整数端口号。能够是 OceanBase 服务器的 SQL 端口号,默认值为 2881。或 ODP 的端口号默认值为 2883
connect.timeout 30s Duration 连接器在尝试连贯到 OceanBase 数据库服务器后的超时工夫。
server-time-zone UTC String 数据库服务器中的会话时区,例如 "Asia/Shanghai"。此选项管制 OceanBase 数据库中的 TIMESTAMP 类型在快照读取时如何转换为 STRING。确保此选项与 oblogproxy 的时区设置雷同。
rootserver-list String 格局为 ip:rpc_port:sql_port。多个服务器地址应用英文分号 ; 隔开。
logproxy.host String oblogproxy 的 IP 地址或主机名。
logproxy.port Integer oblogproxy 的端口号。

反对的元数据

在创立表时,您能够应用以下格局的元数据作为只读列(VIRTUAL)。

Key 数据类型 形容
tenant_name STRING NOT NULL 以后记录所属的租户名称。
database_name STRING NOT NULL 以后记录所属的数据库名称。
table_name STRING NOT NULL 以后记录所属的表名称。
op_ts TIMESTAMP_LTZ(3) NOT NULL 该值示意此批改在数据库中产生的工夫。如果这条记录是该表在快照阶段读取的记录,则该值返回 0。

如下 SQL 展现了如何在表中应用这些元数据列:

CREATE TABLE products (
    tenant_name STRING METADATA FROM 'tenant_name' VIRTUAL,
    db_name STRING METADATA FROM 'database_name' VIRTUAL,
    table_name STRING METADATA  FROM 'table_name' VIRTUAL,
    operation_ts TIMESTAMP_LTZ(3) METADATA FROM 'op_ts' VIRTUAL,
    order_id INT,
    order_date TIMESTAMP(0),
    customer_name STRING,
    price DECIMAL(10, 5),
    product_id INT,
    order_status BOOLEAN,
    PRIMARY KEY(order_id) NOT ENFORCED
) WITH (
   'connector' = 'oceanbase-cdc',
   'scan.startup.mode' = 'initial',
   'username' = '[email protected]_tenant',
   'password' = 'pswd',
   'tenant-name' = 'test_tenant',
   'database-name' = 'test_db',
   'table-name' = 'orders',
   'hostname' = '127.0.0.1',
   'port' = '2881',
   'rootserver-list' = '127.0.0.1:2882:2881',
   'logproxy.host' = '127.0.0.1',
   'logproxy.port' = '2983');

正文完
 0