
Databases: Real-Time Incremental Synchronization from Oracle to Kafka with OGG

Summary: In the big-data era, a large share of business is built on data, and that data has to flow between, and be integrated across, different systems. Typically, core business data lives in an OLTP database, and other systems need to consume it. Traditional data warehouses pull data from the OLTP system periodically via batch synchronization.

Background

In the big-data era, a large share of business is built on data, and that data has to flow between, and be integrated across, different systems. Typically, core business data lives in an OLTP database, and other systems need to consume it. Traditional data warehouses pull data from the OLTP system periodically via batch synchronization. As business requirements grow, however, batch synchronization can no longer keep up, either in latency or in the extraction load it places on the online OLTP system. Changes must be captured from the OLTP system and synchronized to downstream systems in real time.

This article describes a way to use Oracle GoldenGate (OGG) to synchronize Oracle database data to a Kafka message queue in real time.

Kafka is an efficient message-queue implementation. By subscribing to a Kafka topic, downstream systems can observe data changes in the online Oracle system in real time and build business applications on top of them.

Environment

Component versions


Overall architecture

Terminology

1. OGG Manager

The Manager configures and controls the other OGG components: it defines data extraction, data pumping, and data replication, starts and stops the related processes, and reports their status.

2. Extract

The extract captures changes (DML, DDL) from the source database. There are three main types:

Local extract
Captures incremental change data from the local database and writes it to local trail files.

Data pump
Reads the local trail files and pushes the data to the target.

Initial-load extract
Exports the full contents of the source tables, used for the first data load.

3. Data Pump

A data pump is a special type of extract: it reads data from the local trail files and sends it over the network to the target OGG instance.

4. Trail files

Transaction changes captured from the source database by the extract are written to trail files.

5. Collector

The collector runs on the target machine. It receives the trail data sent by the data pump and writes it to local trail files.

6. Replicat

The replicat runs on the target machine. It reads change data from the trail files and applies it to the target data store. In this case, the replicat pushes the data to a Kafka message queue.

7. Checkpoint

Checkpoints record the current read and write position of each OGG process, so that after a failure or restart processing can resume without losing or duplicating data.
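Conceptually, the checkpoint is what makes restarts safe. The toy sketch below (plain Python, not OGG internals; the file path and record format are invented purely for illustration) shows the resume-from-checkpoint idea: a reader persists the last fully processed position, so a second run skips everything it already handled.

```python
import json
import os
import tempfile

# Hypothetical checkpoint file for this demo only.
CKPT = os.path.join(tempfile.gettempdir(), "ogg_checkpoint_demo.json")
if os.path.exists(CKPT):
    os.remove(CKPT)  # start the demo from a clean state

def load_checkpoint():
    """Return the last fully processed position, or 0 on a fresh start."""
    if os.path.exists(CKPT):
        with open(CKPT) as f:
            return json.load(f)["pos"]
    return 0

def save_checkpoint(pos):
    """Persist the position after each record is handled."""
    with open(CKPT, "w") as f:
        json.dump({"pos": pos}, f)

def process(records):
    """Process only records beyond the checkpoint; return what was handled."""
    start = load_checkpoint()
    handled = []
    for pos, rec in enumerate(records, 1):
        if pos <= start:
            continue  # already applied before the restart
        handled.append(rec)
        save_checkpoint(pos)
    return handled

trail = ["change-1", "change-2", "change-3"]
first = process(trail)   # fresh start: all three records are handled
second = process(trail)  # simulated restart: the checkpoint skips them all
```

The same principle is why OGG can stop and restart any of its processes without replaying or dropping transactions.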

Procedure

Source Oracle configuration

1. Check archive logging

OGG requires archive log mode to be enabled on the source database:

SQL> archive log list;
 
    Database log mode              Archive Mode
 
    Automatic archival             Enabled
 
    Archive destination            /u01/app/oracle/product/12.2.0/db_1/dbs/arch
 
    Oldest online log sequence     2576
 
    Next log sequence to archive   2577
 
    Current log sequence           2577

2. Check the database logging settings

SQL> select force_logging, supplemental_log_data_min from v$database;
 
FORCE_LOGG SUPPLEMENTAL_LOG_DATA_MI
 
---------- ------------------------
 
YES        YES

If force logging and supplemental logging are not yet enabled, turn them on:

SQL> alter database force logging;
 
SQL> alter database add supplemental log data;

3. Enable the GoldenGate replication parameter

SQL> alter system set enable_goldengate_replication = true;

4. Create the source Oracle account

SQL> create tablespace tbs_ogg datafile '/oradata/dtstack/tbs_ogg.dbf' size 1024M autoextend on;
 
SQL> create user ggsadmin identified by oracle default tablespace tbs_ogg;
 
SQL> grant dba to ggsadmin;

5. Create a test table

SQL> create table baiyang.ora_to_kfk as select OWNER, OBJECT_NAME, SUBOBJECT_NAME, OBJECT_ID, DATA_OBJECT_ID, OBJECT_TYPE from all_objects where object_id < 500;
 
SQL> alter table baiyang.ora_to_kfk add constraint pk_kfk_obj primary key(object_id);
 
SQL> select count(*) from baiyang.ora_to_kfk;
    COUNT(*)
 
----------
 
        436

Source OGG configuration

1. Check the source OGG environment

cd /oradata/oggorcl/ogg
 
./ggsci
 
GGSCI (dtproxy) 1> info all
 
Program     Status      Group       Lag at Chkpt  Time Since Chkpt

MANAGER     STOPPED

2. Create the working directories

GGSCI (dtproxy) 2> create subdirs
 
    Creating subdirectories under current directory /oradata/oggorcl/ogg
 
   
    Parameter file                 /oradata/oggorcl/ogg/dirprm: created.
 
    Report file                    /oradata/oggorcl/ogg/dirrpt: created.
 
    Checkpoint file                /oradata/oggorcl/ogg/dirchk: created.
 
    Process status files           /oradata/oggorcl/ogg/dirpcs: created.
 
    SQL script files               /oradata/oggorcl/ogg/dirsql: created.
 
    Database definitions files     /oradata/oggorcl/ogg/dirdef: created.
 
    Extract data files             /oradata/oggorcl/ogg/dirdat: created.
 
    Temporary files                /oradata/oggorcl/ogg/dirtmp: created.
 
    Credential store files         /oradata/oggorcl/ogg/dircrd: created.
 
    Masterkey wallet files         /oradata/oggorcl/ogg/dirwlt: created.
 
    Dump files                     /oradata/oggorcl/ogg/dirdmp: created

3. Configure the source Manager

GGSCI (dtproxy) 4> dblogin userid ggsadmin password oracle
 
    Successfully logged into database.
 
GGSCI (dtproxy as ggsadmin@dtstack) 5> edit param ./globals

Add:

oggschema ggsadmin
 
GGSCI (dtproxy as ggsadmin@dtstack) 6> edit param mgr

Add:

PORT 7810 -- default Manager listening port

DYNAMICPORTLIST 7811-7820 -- range of dynamic ports

AUTORESTART EXTRACT *, RETRIES 5, WAITMINUTES 3 -- restart failed processes every 3 minutes, up to 5 times

PURGEOLDEXTRACTS ./dirdat/*, USECHECKPOINTS, MINKEEPDAYS 7 -- purge consumed trail files after 7 days

LAGREPORTHOURS 1 -- report lag once an hour

LAGINFOMINUTES 30 -- log an informational message when lag exceeds 30 minutes

LAGCRITICALMINUTES 45 -- log a critical message when lag exceeds 45 minutes

PURGEMARKERHISTORY MINKEEPDAYS 3, MAXKEEPDAYS 7 -- periodically purge marker history

ACCESSRULE, PROG *, IPADDR 172.*.*.*, ALLOW -- allow connections from the 172 network

Enable supplemental logging for the table to be synchronized:

GGSCI (dtproxy as ggsadmin@dtstack) 9> add trandata baiyang.ora_to_kfk
 
Oracle Goldengate marked following column as key columns on table BAIYANG.ORA_TO_KFK: OBJECT_ID.
 
GGSCI (dtproxy as ggsadmin@dtstack) 10> info trandata baiyang.ora_to_kfk
 
Prepared CSN for table BAIYANG.ORA_TO_KFK: 192881239

Target OGG configuration

1. Check the target environment

GGSCI (172-16-101-242) 1> info all
 
    Program     Status      Group       Lag at Chkpt  Time Since Chkpt
 
    MANAGER     STOPPED 

2. Create the working directories

GGSCI (172-16-101-242) 2> create subdirs
 
    Creating subdirectories under current directory /app/ogg
 
    Parameter file                 /app/ogg/dirprm: created.
 
    Report file                    /app/ogg/dirrpt: created.
 
    Checkpoint file                /app/ogg/dirchk: created.
 
    Process status files           /app/ogg/dirpcs: created.
 
    SQL script files               /app/ogg/dirsql: created.
 
    Database definitions files     /app/ogg/dirdef: created.
 
    Extract data files             /app/ogg/dirdat: created.
 
    Temporary files                /app/ogg/dirtmp: created.
 
    Credential store files         /app/ogg/dircrd: created.
 
    Masterkey wallet files         /app/ogg/dirwlt: created.
 
    Dump files                     /app/ogg/dirdmp: created.

3. Configure the target Manager

GGSCI (172-16-101-242) 3> edit params mgr

Add:

PORT 7810

DYNAMICPORTLIST 7811-7820

AUTORESTART EXTRACT *, RETRIES 5, WAITMINUTES 3

PURGEOLDEXTRACTS ./dirdat/*, USECHECKPOINTS, MINKEEPDAYS 3

GGSCI (172-16-101-242) 4> edit param ./GLOBALS

Add:

CHECKPOINTTABLE ggsadmin.checkpoint

    
Initial full-data synchronization

1. Configure the source initial load

Register the initial-load extract:

GGSCI (dtproxy as ggsadmin@dtstack) 15> add extract initkfk,sourceistable
 

Configure the initial-load parameters:

GGSCI (dtproxy as ggsadmin@dtstack) 16> edit params initkfk

Add:

EXTRACT initkfk

SETENV (NLS_LANG=AMERICAN_AMERICA.AL32UTF8)

USERID ggsadmin, PASSWORD oracle

RMTHOST 172.16.101.242, MGRPORT 7810

RMTFILE ./dirdat/ekfk, MAXFILES 999, MEGABYTES 500

TABLE baiyang.ora_to_kfk;

  
2. Generate the table-definition file on the source

GGSCI (dtproxy as ggsadmin@dtstack) 17> edit param define_kfk

Add:

defsfile /oradata/oggorcl/ogg/dirdef/define_kfk.txt

userid ggsadmin,password oracle

table baiyang.ora_to_kfk;

Run:

$./defgen paramfile dirprm/define_kfk.prm
 
-- Definitions generated for 1 table in /oradata/oggorcl/ogg/dirdef/define_kfk.txt

Copy this file to the target's dirdef directory:

scp /oradata/oggorcl/ogg/dirdef/define_kfk.txt 172.16.101.242:/app/ogg/dirdef/define_kfk.txt

3. Configure the target initial load

Register the initial-load replicat:

GGSCI (172-16-101-242) 3> ADD replicat initkfk,specialrun
 
GGSCI (172-16-101-242) 6> edit params initkfk

Add:

SPECIALRUN

END RUNTIME

SETENV(NLS_LANG="AMERICAN_AMERICA.AL32UTF8")

TARGETDB LIBFILE libggjava.so SET property=./dirprm/kafka.props

SOURCEDEFS ./dirdef/define_kfk.txt

EXTFILE ./dirdat/ekfk000000

REPORTCOUNT EVERY 1 MINUTES, RATE

GROUPTRANSOPS 10000

MAP baiyang.ora_to_kfk, TARGET baiyang.ora_to_kfk;

    
4. Configure the Kafka handler

vi ./dirprm/kafka.props

Add:

gg.handlerlist=kafkahandler
gg.handler.kafkahandler.type=kafka
# Kafka producer settings file, resolved on the handler classpath
gg.handler.kafkahandler.KafkaProducerConfigFile=custom_kafka_producer.properties
# target topic name
gg.handler.kafkahandler.topicMappingTemplate=test_ogg
# message payload format
gg.handler.kafkahandler.format=json
# one message per operation (rather than one per transaction)
gg.handler.kafkahandler.mode=op
gg.classpath=dirprm/:/data/kafka_2.12-2.2.0/libs/*:/app/ogg/:/app/ogg/lib/*

vi custom_kafka_producer.properties

Add:

bootstrap.servers=172.16.101.242:9092
 
acks=1
 
compression.type=gzip
 
reconnect.backoff.ms=1000
 
value.serializer=org.apache.kafka.common.serialization.ByteArraySerializer
 
key.serializer=org.apache.kafka.common.serialization.ByteArraySerializer
 
batch.size=102400
 
linger.ms=10000
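Properties files fail silently when a key is mistyped. As a pre-flight sanity check before starting anything, a small hypothetical helper (plain Python, not part of OGG or Kafka) can confirm the file parses and flag keys that are still missing:

```python
def load_properties(text):
    """Parse Java-style key=value properties; skip blanks and # comments."""
    props = {}
    for line in text.splitlines():
        line = line.strip()
        if not line or line.startswith("#"):
            continue
        key, _, value = line.partition("=")
        props[key.strip()] = value.strip()
    return props

# A deliberately incomplete sample, to show the check firing.
sample = """
bootstrap.servers=172.16.101.242:9092
acks=1
compression.type=gzip
"""

props = load_properties(sample)
required = {"bootstrap.servers", "key.serializer", "value.serializer"}
missing = sorted(required - props.keys())  # here: the two serializer keys
```

Run against the real custom_kafka_producer.properties above, `missing` should come back empty.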

    
5. Start the initial-load extract on the source

GGSCI (dtproxy) 20>  start mgr
 
GGSCI (dtproxy) 21>  start initkfk

6. Apply the initial load on the target

GGSCI (172-16-101-242) 13> start mgr
 
./replicat paramfile ./dirprm/initkfk.prm reportfile ./dirrpt/init01.rpt -p INITIALDATALOAD

7. Verify the data in Kafka

Inspect the topic with the Kafka console consumer:

bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test_ogg --from-beginning
 
{"table":"BAIYANG.ORA_TO_KFK","op_type":"I","op_ts":"2019-11-11 20:23:19.703779","current_ts":"2019-11-11T20:48:55.946000","pos":"-0000000000000000001","after":{"OWNER":"SYS","OBJECT_NAME":"C_OBJ#","SUBOBJECT_NAME":null,"OBJECT_ID":2,"DATA_OBJECT_ID":2,"OBJECT_TYPE":"CLUSTER"}}
 
{"table":"BAIYANG.ORA_TO_KFK","op_type":"I","op_ts":"2019-11-11 20:23:19.703779","current_ts":"2019-11-11T20:48:56.289000","pos":"-0000000000000000001","after":{"OWNER":"SYS","OBJECT_NAME":"I_OBJ#","SUBOBJECT_NAME":null,"OBJECT_ID":3,"DATA_OBJECT_ID":3,"OBJECT_TYPE":"INDEX"}}
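Each message is self-describing JSON. Decoding one of the records above takes only a few lines; the sketch below uses Python, but any JSON-capable consumer works the same way:

```python
import json

# One of the records from the console consumer above, verbatim.
msg = ('{"table":"BAIYANG.ORA_TO_KFK","op_type":"I",'
       '"op_ts":"2019-11-11 20:23:19.703779",'
       '"current_ts":"2019-11-11T20:48:55.946000","pos":"-0000000000000000001",'
       '"after":{"OWNER":"SYS","OBJECT_NAME":"C_OBJ#","SUBOBJECT_NAME":null,'
       '"OBJECT_ID":2,"DATA_OBJECT_ID":2,"OBJECT_TYPE":"CLUSTER"}}')

rec = json.loads(msg)
schema, table = rec["table"].split(".")      # "BAIYANG", "ORA_TO_KFK"
op = rec["op_type"]                          # "I" insert, "U" update, "D" delete
row = rec.get("after") or rec.get("before")  # the row image carried by the event
object_id = row["OBJECT_ID"]                 # the test table's primary key
```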

The full data set has now been synchronized to the target Kafka topic.

Incremental data synchronization

1. Configure the source extract process

GGSCI (dtproxy) 9> edit param extkfk

Add:

extract extkfk

dynamicresolution

SETENV (ORACLE_SID = "dtstack")

SETENV (NLS_LANG = "american_america.AL32UTF8")

userid ggsadmin,password oracle

exttrail ./dirdat/to

table baiyang.ora_to_kfk;

Register the extract process:

GGSCI (dtproxy) 10> add extract extkfk,tranlog,begin now

Create the local trail and bind it to the extract:

GGSCI (dtproxy) 11> add exttrail ./dirdat/to,extract extkfk

2. Configure the source data pump

Configure the pump parameters:

GGSCI (dtproxy) 12> edit param pupkfk

Add:

extract pupkfk
 
passthru
 
dynamicresolution
 
userid ggsadmin,password oracle
 
rmthost 172.16.101.242, mgrport 7810
 
rmttrail ./dirdat/to
 
table baiyang.ora_to_kfk;

Register the pump process:

GGSCI (dtproxy) 13>  add extract pupkfk,exttrailsource /oradata/oggorcl/ogg/dirdat/to

Define the remote trail and bind it to the pump:

GGSCI (dtproxy) 14>  add rmttrail ./dirdat/to,extract pupkfk

3. Configure the target replicat

Configure the replicat parameters:

edit param repkfk

Add:

REPLICAT repkfk
 
SOURCEDEFS ./dirdef/define_kfk.txt
 
targetdb libfile libggjava.so set property=./dirprm/kafka.props
 
REPORTCOUNT EVERY 1 MINUTES, RATE
 
GROUPTRANSOPS 10000
 
MAP baiyang.ora_to_kfk, TARGET baiyang.ora_to_kfk;

Register the replicat, binding it to the trail and the checkpoint table:

add replicat repkfk, exttrail ./dirdat/to, checkpointtable ggsadmin.checkpoint

4. Start real-time capture on the source

./ggsci
 
GGSCI (dtproxy) 5> start extkfk
 
Sending START request to MANAGER ...
 
EXTRACT EXTKFK starting

GGSCI (dtproxy) 6> start pupkfk
 
Sending START request to MANAGER ...
 
EXTRACT PUPKFK starting
  
GGSCI (dtproxy) 7> info all
 
Program     Status      Group       Lag at Chkpt  Time Since Chkpt
 
MANAGER     RUNNING
 
EXTRACT     RUNNING     EXTKFK      00:00:00      00:00:10
 
EXTRACT     RUNNING     PUPKFK      00:00:00      00:00:00

5. Start real-time apply on the target

./ggsci
 
GGSCI (172-16-101-242) 7> start replicat repkfk
 
Sending START request to MANAGER ...
 
REPLICAT REPKFK starting
  
GGSCI (172-16-101-242) 8> info all
 
Program     Status      Group       Lag at Chkpt  Time Since Chkpt
  
MANAGER     RUNNING
 
REPLICAT    RUNNING     REPKFK      00:00:00      00:00:00

6. Test incremental synchronization

Insert rows into the source table:

SQL> insert into baiyang.ora_to_kfk  select OWNER, OBJECT_NAME, SUBOBJECT_NAME, OBJECT_ID, DATA_OBJECT_ID, OBJECT_TYPE from all_objects where object_id >500 and  object_id < 1000;
 
SQL> commit;
 
SQL> select count(*) from baiyang.ora_to_kfk;
COUNT(*)
 
----------
 
    905

Check the messages consumed from the Kafka topic:

{"table":"BAIYANG.ORA_TO_KFK","op_type":"I","op_ts":"2019-11-11 21:04:11.158786","current_ts":"2019-11-11T21:10:54.042000","pos":"00000000000000075298","after":{"OWNER":"SYS","OBJECT_NAME":"APPLY$_READER_STATS","SUBOBJECT_NAME":null,"OBJECT_ID":998,"DATA_OBJECT_ID":998,"OBJECT_TYPE":"TABLE"}}
 
{"table":"BAIYANG.ORA_TO_KFK","op_type":"I","op_ts":"2019-11-11 21:04:11.158786","current_ts":"2019-11-11T21:10:54.042001","pos":"00000000000000075459","after":{"OWNER":"SYS","OBJECT_NAME":"APPLY$_READER_STATS_I","SUBOBJECT_NAME":null,"OBJECT_ID":999,"DATA_OBJECT_ID":999,"OBJECT_TYPE":"INDEX"}}
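Applying these change events downstream is mechanical: inserts and updates upsert the after-image by primary key, and deletes remove the key from the before-image. A minimal in-memory sketch (the simplified event strings below mirror the fields of the records above; a real target could be any data store):

```python
import json

def apply_event(store, event):
    """Apply one OGG-style JSON change event to a dict keyed by OBJECT_ID."""
    rec = json.loads(event)
    if rec["op_type"] in ("I", "U"):          # insert/update: upsert the after-image
        row = rec["after"]
        store[row["OBJECT_ID"]] = row
    elif rec["op_type"] == "D":               # delete: drop by the before-image key
        store.pop(rec["before"]["OBJECT_ID"], None)
    return store

store = {}
ins = '{"table":"BAIYANG.ORA_TO_KFK","op_type":"I","after":{"OBJECT_ID":998,"OWNER":"SYS"}}'
dele = '{"table":"BAIYANG.ORA_TO_KFK","op_type":"D","before":{"OBJECT_ID":998,"OWNER":"SYS"}}'
apply_event(store, ins)
applied = dict(store)     # snapshot after the insert: one row, keyed by 998
apply_event(store, dele)  # the delete removes that row again
```

Because the handler runs in mode=op, each Kafka message carries exactly one such operation, so the consumer can apply them one at a time in order.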

Delete all rows from the source table:

SQL> delete from baiyang.ora_to_kfk ;
 
906 rows deleted.
 
SQL> commit;

Check the messages consumed from the Kafka topic:

{"table":"BAIYANG.ORA_TO_KFK","op_type":"D","op_ts":"2019-11-11 21:13:11.166184","current_ts":"2019-11-11T21:13:17.449007","pos":"00000000000000216645","before":{"OWNER":"x1","OBJECT_NAME":"SSSSS","SUBOBJECT_NAME":"z1","OBJECT_ID":111000,"DATA_OBJECT_ID":2000,"OBJECT_TYPE":"x1"}}

Insert a row containing multibyte characters on the source:

SQL> insert into  baiyang.ora_to_kfk values('汉字', 'y1', 'z1', 111000,2000,'x1');
 
1 row created.
 
SQL> commit;

Check the messages consumed from the Kafka topic:

{"table":"BAIYANG.ORA_TO_KFK","op_type":"I","op_ts":"2019-11-11 21:14:21.167454","current_ts":"2019-11-11T21:14:26.497000","pos":"00000000000000216794","after":{"OWNER":"汉字","OBJECT_NAME":"y1","SUBOBJECT_NAME":"z1","OBJECT_ID":111000,"DATA_OBJECT_ID":2000,"OBJECT_TYPE":"x1"}}

Summary

OGG makes it straightforward to synchronize Oracle data changes to a Kafka message queue in real time. By subscribing to the Kafka topic, downstream business systems can easily build all kinds of real-time data applications.

