共计 3560 个字符,预计需要花费 9 分钟才能阅读完成。
之前我们在《如何用 gpss 实现 MySQL 到 Greenplum 的增量同步》中详细介绍了 MySQL 到 Greenplum 增量同步的实现步骤。今天将给大家讲一讲 Oracle 到 Greenplum 又是如何实现的。
Oracle 数据库虽然在 OLTP 领域仍有着毋庸置疑的优势地位,但在 OLAP 领域与 Greenplum 则是差距显著。如今已经有越来越多的分析型业务从 Oracle 迁移到 Greenplum,在《如何从 Oracle 迁移到 Greenplum》系列文章中,详细介绍了业务的迁移的最佳实践;而数据迁移中最核心的就是如何实现数据的实时增量同步。
对增量同步而言,gpss 作为一个流计算框架,与源端是解耦的,因此只要 Kafka topic 中的消息,包含足够的信息,gpss 都可以提取变化的数据并重放到 gp 中。之前介绍了如何利用 gpss 同步来自 Maxwell 和 Mysql 的增量数据,这里再以 Oracle Golden Gate 为例,介绍如何实时同步来自 Oracle 的增量数据。
1 测试环境
- Oracle
- Oracle Golden Gate
- Kafka 2.2.2
- Greenplum 6.4.0
- GPSS 1.3.6
我们要完成的工作是:
- 通过 GoldenGate 将 Oracle 中的增量数据以 json 格式发送到 Kafka(略)
- 利用 gpss 解析 kafka 中的 json 消息
- 将变化的数据更新到 Greenplum 的目标表中
2 测试数据简介
测试使用的表在 Oracle 中定义如下:
CREATE TABLE SIEBEL_TEST.TEST_POC(
ID numeric,
NAME varchar2 (50),
BIRTHDAY date
)
其中 ID 列为键,用来唯一标识一条记录,NAME 和 BIRTHDAY 为更新字段。
在源端分别对这个表进行了 insert,update 和 delete 操作。
Insert 语句为:
insert into test_poc values (1, 'Igor', '01-JAN-2000');
Update 语句为:
update test_poc set birthday=add_months(birthday,1) where id <3;
Delete 语句为:
delete from test_poc where id=3;
3 Kafka 的消息格式
接下来我们对 Golden Gate 的这三种类型的消息进行简单的分析。
Insert 时生成的消息示例如下:
{
"table": "SIEBEL_TEST.TEST_POC",
"op_type": "I",
"op_ts": "2019-11-21 10:05:34.000000",
"current_ts": "2019-11-21T11:05:37.823000",
"pos": "00000000250000058833",
"tokens": {
"TK_OPTYPE": "INSERT",
"SCN": ""},"after": {"ID": 1,"NAME":"Igor","BIRTHDAY":"2000-01-01 00:00:00"}
}
Table 表示源表的表名,current_ts 表示操作发生的时间,这里我们用它做排序;op_type 和 after 表示执行的操作及对应的数据。
Delete 生成的消息如下,op_type 为 ”D”,同时 before 中包含了完整的内容。
{
"table": "SIEBEL_TEST.TEST_POC",
"op_type": "D",
"op_ts": "2019-11-21 10:13:19.000000",
"current_ts": "2019-11-21T11:13:23.060002",
"pos": "00000000250000059999",
"tokens": {
"TK_OPTYPE": "DELETE",
"SCN": ""},"before": {"ID": 3,"NAME":"Gianluca","BIRTHDAY":"2002-01-01 00:00:00"}
}
Update 除了包含新数据 (after) 外,还包含了更新之前的数据(before), op_type 类型为 ’U’。
{
"table": "SIEBEL_TEST.TEST_POC",
"op_type": "U",
"op_ts": "2019-11-21 10:13:19.000000",
"current_ts": "2019-11-21T11:13:23.060000",
"pos": "00000000250000059561",
"tokens": {
"TK_OPTYPE": "SQL COMPUPDATE",
"SCN": ""},"before": {"ID": 1,"NAME":"Igor","BIRTHDAY":"2000-01-01 00:00:00"},"after": {"ID": 1,"NAME":"Igor","BIRTHDAY":"2000-02-01 00:00:00"}
}
根据生成的消息,我们需要执行如下操作:
- 根据 id 对消息去重
- 根据 ts 对消息排序
- 对 op_type 为 D 的列执行删除操作
- 对其它 type 类型执行 Merge(upsert)操作
4 执行 gpss 的 Kafka JOB
Greenplum 中的定义包含了用于排序的字段 ts,用来区分消息更新的先后顺序,定义如下:
CREATE TABLE test_poc(
id numeric,
name varchar (50),
birthday date,
ts timestamp
);
根据数据同步的需求,gpss 需要的 yaml 配置文件如下:
DATABASE: test
USER: gpadmin
HOST: mdw
PORT: 5432
VERSION: 2
KAFKA:
INPUT:
SOURCE:
BROKERS: kafkahost:9092
TOPIC: oggpoc
VALUE:
COLUMNS:
- NAME: c1
TYPE: json
FORMAT: json
ERROR_LIMIT: 100
OUTPUT:
MODE: MERGE
MATCH_COLUMNS:
- id
UPDATE_COLUMNS:
- name
- birthday
ORDER_COLUMNS:
- ts
DELETE_CONDITION: c1->>'op_type' = 'D'
TABLE: test_poc
MAPPING:
- NAME: id
EXPRESSION: |
CASE WHEN ((c1->'after')::json is not null) THEN (c1->'after'->>'ID')::integer
ELSE (c1->'before'->>'ID')::integer end
- NAME: name
EXPRESSION: |
CASE WHEN ((c1->'after')::json is not null) THEN (c1->'after'->>'NAME')::text
ELSE null end
- NAME: birthday
EXPRESSION: |
CASE WHEN ((c1->'after')::json is not null) THEN (c1->'after'->>'BIRTHDAY')::date
ELSE null end
- NAME: ts
EXPRESSION: (c1->>'current_ts')::timestamp
COMMIT:
MINIMAL_INTERVAL: 2000
相关字段的含义和 gpss 实际执行的操作可参见参考文献[2],这里着重介绍下有区别的的地方,也就是由于 insert 和 update 操作的实际内容包含在 after 中,而 delete 的内容包含在 before 中,每个字段的内容需要额外的判断逻辑:有 after 时读取 after 中的内容,否则读取 before 中的内容。
此外需要注意的是,当新消息的 ORDER_COLUMNS 的内容有重复时,gpss 会把所有的包含重复内容的行,都记录到目标表中;这样的主要目的是为了避免数据丢失。因此在实际使用时一定要确保排序地段的唯一性。
配置文件准备好后,我们通过 gpkafka 来执行加载:
gpkafka load ogg.yaml
gpkafka 便会从 kafka 中拉取对应的消息,按照设定的操作将 Kafka 中的增量数据同步到目标表中。
5 小结
这里简单介绍了如何用 gpss 从 Kafka 消费 GoldenGate 生成的 Oracle 增量数据进行同步,其它数据库及 CDC 工具(例如 Informatica,StreamSet,NIFI 等)也都可以利用类似的方案实现同步。今后会做更多的相关介绍,欢迎大家试用,反馈,指导。
6 参考文献
- https://github.com/pdeemea/ka…
- 如何用 gpss 实现 MySQL 到 Greenplum 的增量同步
- https://gpdb.docs.pivotal.io/…