实时同步是 ChunJun 的⼀个重要个性,指在数据同步过程中,数据源与⽬标零碎之间的数据传输和更新⼏乎在同⼀工夫进⾏。
在实时同步场景中咱们更加关注源端,当源零碎中的数据发⽣变动时,这些变动会⽴即传输并应⽤到⽬标零碎,以保障两个零碎中的数据放弃⼀致。这个个性须要作业运⾏过程中 source 插件不间断地频繁拜访源端。在⽣产场景下,对于这类⻓工夫运⾏、资源可预估、须要稳定性的作业,咱们举荐使⽤ perjob 模式部署。
插件⽀持 JSON 脚本和 SQL 脚本两种配置⽅式,具体的参数配置请参考「ChunJun 连接器文档」:https://sourl.cn/vxq6Zp
本文将为大家介绍如何应用 ChunJun 实时同步,以及 ChunJun ⽀持的 RDB 实时采集插件的个性、采集逻辑及其原理,帮忙大家更好地了解 ChunJun 与实时同步。
如何应用 ChunJun 实时同步
为了让⼤家能更深⼊理解如何使⽤ ChunJun 做实时同步,咱们假如有这样⼀个场景:⼀个电商⽹站心愿将其订单数据从 MySQL 数据库实时同步到 HBase 数据库,以便于后续的数据分析和解决。
在这个场景中,咱们将使⽤ Kafka 作为两头音讯队列,以实现 MySQL 和 HBase 之间的数据同步。这样做的益处是 MySQL 表中变更能够实时同步到 HBase 后果表中,⽽不⽤担⼼历史数据被批改后 HBase 表未被同步。
如果在⼤家的理论利用场景中,不关⼼历史数据是否变更(或者历史数据基本不会变更),且业务表有⼀个递增的主键,那么能够参考本⽂之后的 JDBC-Polling 模式⼀节的内容。
· 数据源组件的部署以及 ChunJun 的部署这⾥不做详细描述
· 案例中的脚本均以 SQL 脚本为例,JSON 脚本也能实现雷同性能,但在参数名上可能存在出⼊,使⽤ JSON 的同学能够参考上文「ChunJun 连接器」⽂档中的参数介绍
采集 MySQL 数据到 Kafka
● 数据筹备
⾸先,咱们在 Kafka 中创立⼀个名为 order_dml 的 topic,而后在 MySQL 中创立⼀个订单表,并插⼊⼀些测试数据。创立表的 SQL 语句如下:
-- 创立⼀个名为 ecommerce_db 的数据库,⽤于存储电商⽹站的数据
CREATE DATABASE IF NOT EXISTS ecommerce_db;
USE ecommerce_db;
-- 创立⼀个名为 orders 的表,⽤于存储订单信息
CREATE TABLE IF NOT EXISTS orders (
id INT AUTO_INCREMENT PRIMARY KEY, -- ⾃增主键
order_id VARCHAR(50) NOT NULL, -- 订单编号,不能为空
user_id INT NOT NULL, -- ⽤户 ID,不能为空
product_id INT NOT NULL, -- 产品 ID,不能为空
quantity INT NOT NULL, -- 订购数量,不能为空
order_date TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP -- 订单⽇期,默认值为以后工夫
戳,不能为空
);
-- 插⼊⼀些测试数据到 orders 表
INSERT INTO orders (order_id, user_id, product_id, quantity)
VALUES ('ORD123', 1, 101, 2),
('ORD124', 2, 102, 1),
('ORD125', 3, 103, 3),
('ORD126', 1, 104, 1),
('ORD127', 2, 105, 5);
● 应用 Binlog 插件采集数据到 Kafka
为了示意数据的变动类型和更好地解决数据变动,实时采集插件个别会用 RowData(Flink 外部数据结构)中的 RowKind 记录⽇志中的数据事件(insert、delete 等)类型,binlog 插件也⼀样。而当数据被打到 Kafka 中时,RowKind 信息应该怎么解决呢?
这⾥咱们就须要⽤到 upsert-kafka-x,upsert-kafka-x 会辨认 RowKind。对各类工夫的解决逻辑如下:
• insert 数据:序列化后间接打⼊
• delete 数据:只写 key,value 置为 null
• update 数据:分为⼀条 delete 数据和 insert 数据处理,即先依据主键删除本来的数据,再写⼊ update 后的数据
在下⼀步中咱们再解释如何将 Kafka 中的数据还原到 HBase 或者其余⽀持 upsert 语义的数据库中,接下来咱们来编写 SQL 脚本,实现 MySQL 数据实时采集到 Kafka 中的性能,示例如下:
CREATE TABLE binlog_source (
id int,
order_id STRING,
user_id INT,
product_id int,
quantity int,
order_date TIMESTAMP(3)
) WITH (
'connector' = 'binlog-x',
'username' = 'root',
'password' = 'root',
'cat' = 'insert,delete,update',
'url' = 'jdbc:mysql://localhost:3306/ecommerce_db?useSSL=false',
'host' = 'localhost',
'port' = '3306',
'table' = 'ecommerce_db.orders',
'timestamp-format.standard' = 'SQL',
'scan.parallelism' = '1'
);
CREATE TABLE kafka_sink (
id int,
order_id STRING,
user_id INT,
product_id int,
quantity int,
order_date TIMESTAMP(3),PRIMARY KEY (id) NOT ENFORCED
) WITH (
'connector' = 'upsert-kafka-x',
'topic' = 'orders',
'properties.bootstrap.servers' = 'localhost:9092',
'key.format' = 'json',
'value.format' = 'json',
'value.fields-include' = 'ALL',
'sink.parallelism' = '1'
);
insert into
kafka_sink
select
*
from
binlog_source u;
还原 Kafka 中的数据到 HBase
上述步骤中,咱们通过 binlog-x 和 upsert-kafka-x,将 MySQL 中的数据实时采集到了 Kafka 中。解铃还须系铃⼈,咱们能够通过 upsert-kafka-x 再去将 Kafka 中的数据解析成带有 upsert 语义的数据。
upsert-kafka-x 作为 source 插件时,会判断 Kafka 中数据的 value 是否为 null,如果 value 为 null 则标记这条数据的 RowKind 为 DELETE,否则将数据的 ROWKIND 标记为 INSERT。
ChunJun 的 hbase-x 插件⽬前曾经具备了 upsert 语句的能⼒,使⽤ hbase-x 即可将 Kafka 中的数据还原到 hbase 中。接下来是 SQL 脚本示例,为了⽅便在 HBase 中查看数据后果,咱们将 int 数据 cast 为 string 类型:
CREATE TABLE kafka_source (
id int,
order_id STRING,
user_id INT,
product_id INT,
quantity INT,
order_date TIMESTAMP(3),
PRIMARY KEY (id) NOT ENFORCED
) WITH (
'connector' = 'upsert-kafka-x',
'topic' = 'orders',
'properties.bootstrap.servers' = 'localhost:9092',
'properties.group.id' = 'test_group',
'key.format' = 'json',
'value.format' = 'json',
'scan.parallelism' = '1'
);
CREATE TABLE hbase_sink(
rowkey STRING, order_info ROW < order_id STRING,
user_id STRING,
product_id STRING,
quantity STRING,
order_date STRING >,
PRIMARY KEY (rowkey) NOT ENFORCED
) WITH(
-- 这⾥以 hbase14 为例,如果 hbase 版本是 2.x,咱们能够使⽤ hbase2- x 插件代替
'connector' = 'hbase14-x',
'zookeeper.quorum' = 'localhost:2181',
'zookeeper.znode.parent' = '/hbase',
'table-name' = 'ecommerce_db:orders',
'sink.parallelism' = '1'
);
INSERT INTO
hbase_sink
SELECT
cast(id as STRING),
ROW(cast(order_id as STRING),
cast(user_id as STRING),
cast(product_id as STRING),
cast(quantity as STRING),
cast(order_date as STRING)
)
FROM
kafka_source
Tips:如果咱们不须要 Kafka 中间件,也能够使⽤ binlog-x 插件间接对接 hbase-x 插件。
ChunJun 反对的 RDB 实时采集插件
本节次要介绍 ChunJun 的 RDB 实时采集插件的个性、采集逻辑及其原理。
ChunJun 的 RDB 实时采集能够实时监督数据库中的更改,并在发⽣更改时读取数据变动,例如插⼊、更新和删除操作。使⽤ ChunJun 实时采集,咱们能够实时获取无关数据库中更改的信息,从⽽可能及时响应这些更改,如此便能够帮忙咱们更好地治理和利⽤ RDB 数据库中的数据。
并且 ChunJun 提供了故障复原和断点续传性能来确保数据的完整性。ChunJun 实时采集类插件的⼤致实现步骤如下:
· 连贯数据库,确认读取点位,读取点位能够了解为⼀个 offset,如 Binlog 中,指⽇志的⽂件名和⽂件的 position 信息
· 依据读取点位开始读取 redolog,获取其中对于数据变更相干的操作记录
· 依据 tableName、操作事件(如 insert、delete、update)等过滤信息过滤出须要的 log ⽇志
· 解析 log ⽇志,解析后的事件信息包含表名、数据库名、操作类型(插⼊、更新或删除)和变更的数据⾏等
· 将解析进去的数据会加⼯为 ChunJun 外部统⼀的 DdlRowData 供上游使⽤
ChunJun ⽬前已⽀持的实时采集 Connector 有:binlog(mysql)、oceanbasecdc、oraclelogminer、sqlservercdc。
Binlog 简介
ChunJun binlog 插件的次要性能是读取 MySQL 的⼆进制⽇志(binlog)⽂件。这些⽂件记录了所有对数据的更改操作,如插⼊、更新和删除等。⽬前,该插件依赖 Canal 组件来读取 MySQL 的 binlog ⽂件。
核⼼操作步骤如下:
• 确认读取点位:在 binlog 插件中,咱们能够在脚本的 start 字段中间接指定 journal-name(binlog ⽂件名)和 position(⽂件的特定地位)
• 读取 binlog:binlog 插件将⾃身伪装成 MySQL 的 Slave 节点,向 MySQL Master 发送申请,要求将 binlog ⽂件的数据流发送给它
• 故障复原和断点续传:故障时,插件会记录以后的 binlog 地位信息,从 checkpoint/savepoint 复原后,咱们能够从上次记录的地位持续读取 binlog ⽂件,确保数据变动的完整性
使⽤ binlog 所需的权限在「binlog 插件使⽤⽂档」中有具体阐明,链接如下:
https://sourl.cn/mvae9m
OracleLogminer 简介
Logminer 插件借助 Oracle 提供的 Logminer ⼯具通过读取视图的⽅式获取 Oracle redolog 中的信息。
核⼼操作步骤如下:
01 定位需读取起始点位(start_scn)
⽬前 logminer ⽀持四种策略指定 StartScn:
· all:从 Oracle 数据库中最早的归档⽇志组开始采集 (不倡议使⽤)
· current:工作运⾏时的 SCN 号
· time:指定工夫点对应的 SCN 号
· scn:间接指定 SCN 号
02 定位须要读取的完结点位 (end_scn)
插件依据 start_scn 和 maxLogFileSize(默认 5G)获取可加载的 redolog ⽂件列表,end_scn 取这个⽂件列表中最⼤的 scn 值。
03 加载 redo ⽇志到 Logminer
通过⼀个存储过程,将 scn 区间范畴内的 redolog 加载到 Logminer ⾥。
04 从视图中读取数据
以 scn > ? 作为 where 条件间接查问 v$logmnr_contents 视图内的信息即可获取 redolog 中的数据。
05 反复 1 - 4 步骤,实现一直的读取
如题目。
06 故障复原和断点续传
在发⽣故障时,插件会保留以后生产的 scn 号,重启时从上次的 scn 号开始读取,确保数据完整性。
• 对于该插件原理的具体介绍请参⻅「Oracle Logminer 实现原理阐明⽂档」:
https://sourl.cn/6vqz4b
• 使⽤ lominer 插件的前提条件详⻅「Oracle 配置 LogMiner」:
https://sourl.cn/eteyZY
SqlServerCDC 简介
SqlServerCDC 插件依赖 SQL Server 的 CDC Agent 服务提供的视图获取 redolog 中的信息。
核⼼操作步骤如下:
01 定位需读取起始点位(from_lsn)
⽬前 SqlserverCDC 仅⽀持间接配置 lsn 号,如果 lsn 号未配置,则取数据库中以后最⼤的 lsn 号为 from_lsn。
02 定位须要读取的完结点位 (to_lsn)
SqlserverCDC 插件定期地(可通过 pollInterval 参数指定)获取数据库中的最⼤ lsn 为 end_lsn。
03 从视图中读取数据
查问 Agent 服务提供的视图中 lsn 区间范畴内的数据,过滤出须要监听的表及事件类型。
04 反复 1 - 3 步骤,实现一直的读取
如题目。
05 故障复原和断点续传
在发⽣故障时,插件会保留以后生产的 lsn 号。重启时从上次的 lsn 号开始读取,确保数据完整性。
• 对于该插件原理的具体介绍请参⻅「Sqlserver CDC 实现原理阐明⽂档」:
https://sourl.cn/5pQvEM
• 配置 SqlServer CDC Agent 服务详⻅「Sqlserver 配置 CDC ⽂档」:
https://sourl.cn/h5nd8j
OceanBaseCDC 简介
OceanBase 是蚂蚁团体开源的⼀款分布式关系型数据库,它使⽤⼆进制⽇志(binlog)记录数据变更。OceanBaseCDC 的实现依赖于 OceanBase 提供的 LogProxy 服务,LogProxy 提供了基于公布 - 订阅模型的服务,容许使⽤ OceanBase 的 logclient 订阅特定的 binlog 数据流。
OceanBaseCDC 启动⼀个 Listener 线程。当 logclient 连贯到 LogProxy 后,Listener 会订阅通过数据过滤的 binlog,而后将其增加到外部保护的列表中。当收到 COMMIT 信息后,Listener 会将⽇志变更信息传递给⼀个阻塞队列,由主线程生产并将其转换为 ChunJun 外部的 DdlRowData,最终发送到上游。
JDBC-Polling 模式读
JDBC 插件的 polling 读取模式是基于 SQL 语句做数据读取的,绝对于基于重做⽇志的实时采集老本更低,但 jdbc 插件做实时同步对业务场景有更⾼的要求:
· 有⼀个数值类型或者工夫类型的递增主键
· 不更新历史数据或者不关⼼历史数据是否更新,仅关⼼新数据的获取
实现原理简介
• 设置递增的业务主键作为 polling 模式依赖的增量键
• 在增量读取的过程中,实时记录 increColumn 对应的值(state),作为下⼀次数据读取的起始点位
• 当⼀批数据读取完后,距离⼀段时间之后根据 state 读取下⼀批数据
polling 依赖局部增量同步的逻辑,对于增量同步的更多介绍能够点击:
https://sourl.cn/UC8n6K
如何配置⼀个 jdbc-polling 作业
先介绍⼀下开启 polling 模式须要关注的配置项:
以 MySQL 为例,假如咱们有⼀个存储订单信息的历史表,且订单的 order_id 是递增的,咱们心愿定期地获取这张表的新增数据。
CREATE TABLE order.realtime_order_archive (
order_id INT PRIMARY KEY COMMENT '订单唯⼀标识',
customer_id INT COMMENT '客户唯⼀标识',
product_id INT COMMENT '产品唯⼀标识',
order_date TIMESTAMP COMMENT '订单⽇期和工夫',
payment_method VARCHAR(255) COMMENT '⽀付⽅式(信⽤卡、⽀付宝、微信⽀付等)',
shipping_method VARCHAR(255) COMMENT '配送⽅式(顺丰速运、圆通速递等)',
shipping_address VARCHAR(255) COMMENT '配送地址',
order_total DECIMAL(10,2) COMMENT '订单总⾦额',
discount DECIMAL(10,2) COMMENT '折扣⾦额',
order_status VARCHAR(255) COMMENT '订单状态(已实现、已勾销等)'
);
咱们能够这样配置 json 脚本的 reader 信息。
"name": "mysqlreader",
"parameter": {
"column" : ["*" // 这⾥假如咱们读取所有字段,能够填写‘*’],
"increColumn": "id",
"polling": true,
"pollingInterval": 3000,
"username": "username",
"password": "password",
"connection": [
{
"jdbcUrl": ["jdbc:mysql://ip:3306/liuliu?useSSL=false"],
"schema":"order",
"table": ["realtime_order_archive"]
}
]
}
}
《数栈产品白皮书》:https://fs80.cn/cw0iw1
《数据治理行业实际白皮书》下载地址:https://fs80.cn/380a4b
想理解或征询更多无关袋鼠云大数据产品、行业解决方案、客户案例的敌人,浏览袋鼠云官网:https://www.dtstack.com/?src=szsf
同时,欢送对大数据开源我的项目有趣味的同学退出「袋鼠云开源框架钉钉技术 qun」,交换最新开源技术信息,qun 号码:30537511,我的项目地址:https://github.com/DTStack