CDC(Change Data Capture,即变更数据捕捉)可能帮忙您监测并捕捉数据库的变动。包含数据或数据表的插入、更新和删除等。CDC 将这些变更按产生的程序残缺记录下来,写入到消息中间件中,以供其余服务进行订阅及生产。您可用 CDC 提供的数据做历史库、近实时缓存、提供给音讯队列(MQ),用户生产 MQ 做剖析和审计等。
Flink CDC (CDC Connectors for Apache Flink) 是 Apache Flink 的一组 Source 连接器,它反对从大多数据库中实时地读取存量历史数据和增量变更数据。Flink CDC 可能将数据库的全量和增量数据同步到音讯队列和数据仓库中。Flink CDC 也能够用于实时数据集成,您能够应用它将数据库数据实时导入数据湖或者数据仓库。同时,Flink CDC 还反对数据加工,您能够通过它的 SQL Client 对数据库数据做实时关联、打宽、聚合,并将后果写入到各种存储中。借助 Flink OceanBase CDC 连接器,您能够应用 SQL DDL 创立一个 CDC 源来监控单个表的变动。您也能够在没有部署 Debezium 和 Apache Kafka 的状况下,在一个作业中生产多个数据库和表的变动。在没有 Flink OceanBase CDC 连接器之前,您不能做实时数据的多流 JOIN。
OceanBase 数据库的 CDC 组件次要有:obcdc(原 liboblog)、oblogmsg、oblogproxy 和 oblogclient。各组件性能见下表:
组件名 | 类型 | 依赖组件 | 作用 |
---|---|---|---|
obcdc(原 liboblog) | C++ 依赖库 | 无 | 程序拉取增量日志 |
oblogmsg | C++ 依赖库 | 无 | 解析增量日志格局 |
oblogproxy | C++ 服务 | obcdc 和 oblogmsg | 拉取增量日志 |
oblogclient | Java 依赖库 | 连贯 oblogproxy 获取增量日志 |
本文介绍如何应用 Flink OceanBase CDC 连接器。本文将以 Elasticsearch 为指标数据源,演示如何将数据从 OceanBase 数据库迁徙至 Elasticsearch。
个性
At-Least-Once 解决
OceanBase CDC 连接器是一个 Flink Source 连接器。它将首先读取数据库快照,而后再读取变动事件,并进行 At-Least-Once 解决 。
OceanBase 数据库是一个分布式数据库,它的日志也扩散在不同的服务器上。因为没有相似 MySQL binlog 偏移量的地位信息,OceanBase 数据库用工夫戳作为地位标记。为确保读取残缺的数据,liboblog(读取 OceanBase 日志记录的 C++ 库)可能会在给定的工夫戳之前读取一些日志数据。因而,OceanBase 数据库可能会读到起始点左近工夫戳的反复数据,从而保障了 At-Least-Once 解决 。
启动模式
配置选项 scan.startup.mode
指定 OceanBase CDC 连接器的启动模式。可用取值包含:
- initial(默认):在首次启动时对受监督的数据库表执行初始快照,并持续读取最新的事务日志。
latest-offset
:首次启动时,不对受监督的数据库表执行快照,仅从连接器启动时读取事务日志。timestamp
:在首次启动时不对受监督的数据库表执行初始快照,仅从指定的scan.startup.timestamp
读取事务日志。
生产事务日志
OceanBase CDC 连接器应用 oblogclient 生产 oblogproxy 中的事务日志。
DataStream Source
OceanBase CDC 连接器也能够作为 DataStream Source 应用。您能够创立一个 SourceFunction,例如:
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import com.ververica.cdc.connectors.oceanbase.OceanBaseSource;
import com.ververica.cdc.connectors.oceanbase.table.OceanBaseTableSourceFactory;
import com.ververica.cdc.connectors.oceanbase.table.StartupMode;
import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;
public class OceanBaseSourceExample {public static void main(String[] args) throws Exception {
SourceFunction<String> oceanBaseSource =
OceanBaseSource.<String>builder()
.rsList("127.0.0.1:2882:2881") // set root server list
.startupMode(StartupMode.INITIAL) // set startup mode
.username("[email protected]_tenant") // set cluster username
.password("pswd") // set cluster password
.tenantName("test_tenant") // set captured tenant name, do not support regex
.databaseName("test_db") // set captured database, support regex
.tableName("test_table") // set captured table, support regex
.hostname("127.0.0.1") // set hostname of OceanBase server or proxy
.port(2881) // set the sql port for OceanBase server or proxy
.logProxyHost("127.0.0.1") // set the hostname of log proxy
.logProxyPort(2983) // set the port of log proxy
.deserializer(new JsonDebeziumDeserializationSchema()) // converts SourceRecord to JSON String
.build();
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// enable checkpoint
env.enableCheckpointing(3000);
env.addSource(oceanBaseSource).print().setParallelism(1);
env.execute("Print OceanBase Snapshot + Commit Log");
}
}
数据类型映射
当启动模式不是 INITIAL
时,连接器无奈取得一个列的精度和比例。为兼容不同的启动模式,连接器不会将一个不同精度的 OceanBase 类型映射到不同的 FLink 类型。例如,BOOLEAN
、TINYINT(1)
或 BIT(1)
均会转换成 BOOLEAN
。在 OceanBase 数据库中,BOOLEAN
等同于 TINYINT(1)
,所以 BOOLEAN
和 TINYINT
类型的列在 Flink 中会被映射为 TINYINT
,而 BIT(1)
在 Flink 中会被映射为 BINARY(1)
。
OceanBase 数据类型 | Flink SQL 类型 | 形容 | |
---|---|---|---|
BOOLEAN TINYINT |
TINYINT | ||
SMALLINT TINYINT UNSIGNED |
SMALLINT | ||
INT MEDIUMINT SMALLINT UNSIGNED |
INT | ||
BIGINT INT UNSIGNED |
BIGINT | ||
BIGINT UNSIGNED | DECIMAL(20, 0) | ||
REAL FLOAT | FLOAT | ||
DOUBLE | DOUBLE | ||
NUMERIC(p, s) DECIMAL(p, s) where p <= 38 |
DECIMAL(p, s) | ||
NUMERIC(p, s) DECIMAL(p, s) where 38 < p <=65 |
STRING | DECIMAL 等同于 NUMERIC。在 OceanBase 数据库中,DECIMAL 数据类型的精度最高为 65。但在 Flink 中,DECIMAL 的精度为 38。因而,如果你定义了一个精度大于 38 的 DECIMAL 列,你要把它映射为 STRING,以防止精度损失。 | |
DATE | DATE | ||
TIME [(p)] | TIME [(p)] | ||
TIMESTAMP [(p)] DATETIME [(p)] |
TIMESTAMP [(p)] | ||
CHAR(n) | CHAR(n) | ||
VARCHAR(n) | VARCHAR(n) | ||
BIT(n) | BINARY(⌈n/8⌉) | ||
BINARY(n) | BINARY(n) | ||
VARBINARY(N) | VARBINARY(N) | ||
TINYTEXT TEXT MEDIUMTEXT LONGTEXT |
STRING | ||
TINYBLOB BLOB MEDIUMBLOB LONGBLOB |
BYTES | ||
YEAR | INT | ||
ENUM | STRING | ||
SET | STRING |
体验 Flink OceanBase CDC 连接器
前提条件
在迁徙 OceanBase 的数据之前,您须要确认以下信息:
- 您已装置 OceanBase 数据库。更多信息,参考 OceanBase 数据库文档。
- 您已装置 Elasticsearch。更多信息,参考 Elasticsearch 文档。
- 您已装置 OceanBase 增量日志拉取组件 oblogproxy。更多信息,参考 oblogproxy 文档(社区版)或 oblogproxy 文档(企业版)。
- 您已装置 Flink。更多信息,参考 Flink 文档。
OceanBase CDC 连接器反对从 OceanBase 数据库读取快照数据和增量数据。本节介绍如何设置 OceanBase CDC 连接器,以在 OceanBase 数据库中查问数据。
依赖
要应用 OceanBase CDC 连接器,您必须提供相干的依赖信息。以下依赖信息实用于应用主动构建工具(如 Maven 或 SBT)构建的我的项目和带有 SQL JAR 包的 SQL 客户端。
<dependency>
<groupId>com.ververica</groupId>
<artifactId>flink-connector-oceanbase-cdc</artifactId>
<!-- The dependency is available only for stable releases, SNAPSHOT dependency need build by yourself. -->
<version>2.2.1</version>
</dependency>
下载 SQL 客户端 JAR 包
点击 flink-sql-connector-oceanbase-cdc-2.2.1.jar 下载 JAR 包至 <FLINK_HOME>/lib/
.
阐明:
下载链接仅实用于稳固发行版本。
flink-sql-connector-oceanbase-cdc-XXX-SNAPSHOT
快照版本与开发分支的版本对应。要应用快照版本,您必须自行下载并编译源代码。举荐应用稳固发行版本,例如 flink-sql-connector-oceanbase-cdc-2.2.1.jar
。您能够在 Maven 地方仓库中找到应用稳固发行版本。
配置 OceanBase 数据库和 oblogproxy 服务
- 依照 部署文档 配置 OceanBase 集群。
-
在 sys 租户中,为 oblogproxy 创立一个带明码的用户。更多信息,参考 用户治理文档。
mysql -h${host} -P${port} -uroot mysql> SHOW TENANT; mysql> CREATE USER ${sys_username} IDENTIFIED BY '${sys_password}'; mysql> GRANT ALL PRIVILEGES ON *.* TO ${sys_username} WITH GRANT OPTION;
- 为你想要监控的租户创立一个用户,这个用户用来读取快照数据和变动事件数据。
-
获取
rootservice_list
或config-url
的值。
如果您是社区版用户,应用以下命令:mysql> SHOW PARAMETERS LIKE 'rootservice_list';
如果您是企业版用户,应用以下命令:
mysql> SHOW PARAMETERS LIKE 'obconfig_url';
- 依照 oblogproxy 文档 配置 oblogproxy。
创立 OceanBase CDC 表
应用以下命令,创立 OceanBase CDC 表:
-- 每 3000 毫秒做一次 checkpoint
Flink SQL> SET 'execution.checkpointing.interval' = '3s';
-- 在 Flink SQL 中创立 OceanBase 表 `orders`
Flink SQL> CREATE TABLE orders (
order_id INT,
order_date TIMESTAMP(0),
customer_name STRING,
price DECIMAL(10, 5),
product_id INT,
order_status BOOLEAN,
PRIMARY KEY (order_id) NOT ENFORCED
) WITH (
'connector' = 'oceanbase-cdc',
'scan.startup.mode' = 'initial',
'username' = '[email protected]_tenant',
'password' = 'pswd',
'tenant-name' = 'test_tenant',
'database-name' = 'test_db',
'table-name' = 'orders',
'hostname' = '127.0.0.1',
'port' = '2881',
'rootserver-list' = '127.0.0.1:2882:2881',
'logproxy.host' = '127.0.0.1',
'logproxy.port' = '2983');
-- 从表 orders 中读取快照数据和 binlog 数据
Flink SQL> SELECT * FROM orders;
您也能够拜访 Flink 官网文档,疾速体验将数据从 OceanBase 导入到 Elasticsearch。更多信息,参考 Flink 官网文档。
Flink OceanBase CDC 连接器配置项
配置项 | 是否必选 | 默认值 | 类型 | 形容 | |
---|---|---|---|---|---|
connector | 是 | 无 | String | 指定要应用的连接器。此处为 oceanbase-cdc 。 |
|
scan.startup.mode | 是 | 无 | String | 指定 OceanBase CDC 消费者的启动模式。可取值为 initial 、latest-offset 或 timestamp 。 |
|
scan.startup.timestamp | 否 | 无 | Long | 起始点的工夫戳,单位为秒。仅在 scan.startup.mode 的值为 timestamp 时实用。 |
|
username | 是 | 无 | String | 连贯 OceanBase 数据库的用户的名称。 | |
password | 是 | 无 | String | 连贯 OceanBase 数据库时应用的明码。 | |
tenant-name | 是 | 无 | String | 待监控 OceanBase 数据库的租户名,填入准确值。 | |
database-name | 是 | 无 | String | 待监控 OceanBase 数据库的数据库名。 | |
table-name | 是 | 无 | String | 待监控 OceanBase 数据库的表名。 | |
hostname | 否 | 无 | String | OceanBase 数据库或 OceanBbase 代理 ODP 的 IP 地址或主机名。 | |
port | 否 | 无 | String | Integer | OceanBase 数据库服务器的整数端口号。能够是 OceanBase 服务器的 SQL 端口号,默认值为 2881 。或 ODP 的端口号默认值为 2883 。 |
connect.timeout | 否 | 30s | Duration | 连接器在尝试连贯到 OceanBase 数据库服务器后的超时工夫。 | |
server-time-zone | 否 | UTC | String | 数据库服务器中的会话时区,例如 "Asia/Shanghai" 。此选项管制 OceanBase 数据库中的 TIMESTAMP 类型在快照读取时如何转换为 STRING 。确保此选项与 oblogproxy 的时区设置雷同。 |
|
rootserver-list | 是 | 无 | String | 格局为 ip:rpc_port:sql_port 。多个服务器地址应用英文分号 ; 隔开。 |
|
logproxy.host | 是 | 无 | String | oblogproxy 的 IP 地址或主机名。 | |
logproxy.port | 是 | 无 | Integer | oblogproxy 的端口号。 |
反对的元数据
在创立表时,您能够应用以下格局的元数据作为只读列(VIRTUAL)。
Key | 数据类型 | 形容 |
---|---|---|
tenant_name | STRING NOT NULL | 以后记录所属的租户名称。 |
database_name | STRING NOT NULL | 以后记录所属的数据库名称。 |
table_name | STRING NOT NULL | 以后记录所属的表名称。 |
op_ts | TIMESTAMP_LTZ(3) NOT NULL | 该值示意此批改在数据库中产生的工夫。如果这条记录是该表在快照阶段读取的记录,则该值返回 0。 |
如下 SQL 展现了如何在表中应用这些元数据列:
CREATE TABLE products (
tenant_name STRING METADATA FROM 'tenant_name' VIRTUAL,
db_name STRING METADATA FROM 'database_name' VIRTUAL,
table_name STRING METADATA FROM 'table_name' VIRTUAL,
operation_ts TIMESTAMP_LTZ(3) METADATA FROM 'op_ts' VIRTUAL,
order_id INT,
order_date TIMESTAMP(0),
customer_name STRING,
price DECIMAL(10, 5),
product_id INT,
order_status BOOLEAN,
PRIMARY KEY(order_id) NOT ENFORCED
) WITH (
'connector' = 'oceanbase-cdc',
'scan.startup.mode' = 'initial',
'username' = '[email protected]_tenant',
'password' = 'pswd',
'tenant-name' = 'test_tenant',
'database-name' = 'test_db',
'table-name' = 'orders',
'hostname' = '127.0.0.1',
'port' = '2881',
'rootserver-list' = '127.0.0.1:2882:2881',
'logproxy.host' = '127.0.0.1',
'logproxy.port' = '2983');