Background
In the era of big data, a large share of business runs on data, and that data must flow between and be integrated across different systems. Typically, the core business data lives in an OLTP database, and other business systems need access to it. A traditional data warehouse extracts data from the OLTP system periodically through batch synchronization. As business requirements have evolved, however, batch synchronization can no longer keep up, either in timeliness or in the extraction load it places on the online OLTP system. Data changes need to be captured from the OLTP system and synchronized to downstream systems in real time.
This article describes how to synchronize data from an Oracle database to a Kafka message queue in real time using Oracle GoldenGate (OGG).
Kafka is an efficient message queue implementation. By subscribing to a Kafka topic, downstream systems can receive data changes from the online Oracle system in real time and build business applications on top of them.
Environment
Component Versions
Overall Architecture
Terminology
1. OGG Manager
The OGG Manager configures and manages the other OGG components: it configures data extraction, data pump, and replication processes, starts and stops the related components, and reports their running status.
2. Data Extraction (Extract)
Extract captures changes (DML, DDL) from the source database. There are several main types of extraction:
Local extract
Captures incremental change data from the local database and writes it to local trail files.
Data pump
Reads data from local trail files and pushes it to the target.
Initial-load extract
Exports the full data set from database tables for the initial data load.
3. Data Pump
A data pump is a special type of Extract that reads data from local trail files and sends it over the network to the target-side OGG.
4. Trail Files
The transaction changes that Extract captures from the source database are written to trail files.
5. Collector
The collector runs on the target machine; it receives the trail data sent by the data pump and writes it to local trail files.
6. Replicat
Replicat runs on the target machine; it reads data changes from trail files and applies them to the target data store. In this article, Replicat pushes the data to a Kafka message queue.
7. Checkpoint
Checkpoints record how far each process has read and applied database transaction changes, so that processes can resume from the correct position after an interruption.
Procedure
Source-Side Oracle Configuration
1. Check the archive log mode
OGG requires archive logging to be enabled on the source database:
SQL> archive log list;
Database log mode Archive Mode
Automatic archival Enabled
Archive destination /u01/app/oracle/product/12.2.0/db_1/dbs/arch
Oldest online log sequence 2576
Next log sequence to archive 2577
Current log sequence 2577
2. Check the database configuration
SQL> select force_logging, supplemental_log_data_min from v$database;
FORCE_LOGG SUPPLEMENTAL_LOG_DATA_MI
---------- ------------------------
YES YES
If force logging or supplemental logging is not enabled, enable it:
SQL> alter database force logging;
SQL> alter database add supplemental log data;
3. Enable the GoldenGate replication parameter
SQL> alter system set enable_goldengate_replication = true;
4. Create the source-side Oracle account
SQL> create tablespace tbs_ogg datafile '/oradata/dtstack/tbs_ogg.dbf' size 1024M autoextend on;
SQL> create user ggsadmin identified by oracle default tablespace tbs_ogg;
SQL> grant dba to ggsadmin;
5. Create a test table
SQL> create table baiyang.ora_to_kfk as select OWNER, OBJECT_NAME, SUBOBJECT_NAME, OBJECT_ID, DATA_OBJECT_ID, OBJECT_TYPE from all_objects where object_id < 500;
SQL> alter table baiyang.ora_to_kfk add constraint pk_kfk_obj primary key(object_id);
SQL> select count(*) from baiyang.ora_to_kfk;
COUNT(*)
----------
436
Source-Side OGG Configuration
1. Check the source-side OGG environment
cd /oradata/oggorcl/ogg
./ggsci
GGSCI (dtproxy) 1> info all
Program Status Group Lag at Chkpt Time Since Chkpt
MANAGER STOPPED
2. Create the OGG subdirectories
GGSCI (dtproxy) 2> create subdirs
Creating subdirectories under current directory /oradata/oggorcl/ogg
Parameter file /oradata/oggorcl/ogg/dirprm: created.
Report file /oradata/oggorcl/ogg/dirrpt: created.
Checkpoint file /oradata/oggorcl/ogg/dirchk: created.
Process status files /oradata/oggorcl/ogg/dirpcs: created.
SQL script files /oradata/oggorcl/ogg/dirsql: created.
Database definitions files /oradata/oggorcl/ogg/dirdef: created.
Extract data files /oradata/oggorcl/ogg/dirdat: created.
Temporary files /oradata/oggorcl/ogg/dirtmp: created.
Credential store files /oradata/oggorcl/ogg/dircrd: created.
Masterkey wallet files /oradata/oggorcl/ogg/dirwlt: created.
Dump files /oradata/oggorcl/ogg/dirdmp: created
3. Configure the source-side Manager
GGSCI (dtproxy) 4> dblogin userid ggsadmin password oracle
Successfully logged into database.
GGSCI (dtproxy as ggsadmin@dtstack) 5> edit param ./globals
Add:
oggschema ggsadmin
GGSCI (dtproxy as ggsadmin@dtstack) 6> edit param mgr
Add:
PORT 7810 -- default Manager listener port
DYNAMICPORTLIST 7811-7820 -- dynamic port list
AUTORESTART EXTRACT *, RETRIES 5, WAITMINUTES 3 -- restart a failed process every 3 minutes, up to 5 times
PURGEOLDEXTRACTS ./dirdat/*, USECHECKPOINTS, MINKEEPDAYS 7 -- purge trail files older than 7 days, honoring checkpoints
LAGREPORTHOURS 1 -- report lag once per hour
LAGINFOMINUTES 30 -- log an informational message when lag exceeds 30 minutes
LAGCRITICALMINUTES 45 -- log a critical message when lag exceeds 45 minutes
PURGEMARKERHISTORY MINKEEPDAYS 3, MAXKEEPDAYS 7 -- periodically purge marker history
ACCESSRULE, PROG *, IPADDR 172.*.*.*, ALLOW -- allow connections from the 172 network
Add the table to be synchronized:
GGSCI (dtproxy as ggsadmin@dtstack) 9> add trandata baiyang.ora_to_kfk
Oracle Goldengate marked following column as key columns on table BAIYANG.ORA_TO_KFK: OBJECT_ID.
GGSCI (dtproxy as ggsadmin@dtstack) 10> info trandata baiyang.ora_to_kfk
Prepared CSN for table BAIYANG.ORA_TO_KFK: 192881239
Target-Side OGG Configuration
1. Check the target-side environment
GGSCI (172-16-101-242) 1> info all
Program Status Group Lag at Chkpt Time Since Chkpt
MANAGER STOPPED
2. Create the subdirectories
GGSCI (172-16-101-242) 2> create subdirs
Creating subdirectories under current directory /app/ogg
Parameter file /app/ogg/dirprm: created.
Report file /app/ogg/dirrpt: created.
Checkpoint file /app/ogg/dirchk: created.
Process status files /app/ogg/dirpcs: created.
SQL script files /app/ogg/dirsql: created.
Database definitions files /app/ogg/dirdef: created.
Extract data files /app/ogg/dirdat: created.
Temporary files /app/ogg/dirtmp: created.
Credential store files /app/ogg/dircrd: created.
Masterkey wallet files /app/ogg/dirwlt: created.
Dump files /app/ogg/dirdmp: created.
3. Configure the target-side Manager
GGSCI (172-16-101-242) 3> edit params mgr
Add:
PORT 7810
DYNAMICPORTLIST 7811-7820
AUTORESTART EXTRACT *,RETRIES 5,WAITMINUTES 3
PURGEOLDEXTRACTS ./dirdat/*,usecheckpoints, minkeepdays 3
GGSCI (172-16-101-242) 4> edit param ./GLOBALS
CHECKPOINTTABLE ggsadmin.checkpoint
Full (Initial) Data Synchronization
1. Configure the source-side initial load
Create the source-side initial-load Extract:
GGSCI (dtproxy as ggsadmin@dtstack) 15> add extract initkfk,sourceistable
Configure its parameters:
GGSCI (dtproxy as ggsadmin@dtstack) 16> edit params initkfk
Add:
EXTRACT initkfk
SETENV (NLS_LANG=AMERICAN_AMERICA.AL32UTF8)
USERID ggsadmin,PASSWORD oracle
RMTHOST 172.16.101.242, MGRPORT 7810
RMTFILE ./dirdat/ekfk,maxfiles 999, megabytes 500
table baiyang.ora_to_kfk;
2. Generate the table-definition file on the source
GGSCI (dtproxy as ggsadmin@dtstack) 17> edit param define_kfk
Add:
defsfile /oradata/oggorcl/ogg/dirdef/define_kfk.txt
userid ggsadmin,password oracle
table baiyang.ora_to_kfk;
Run:
$./defgen paramfile dirprm/define_kfk.prm
-- Definitions generated for 1 table in /oradata/oggorcl/ogg/dirdef/define_kfk.txt
Copy the file to the dirdef directory on the target:
scp /oradata/oggorcl/ogg/dirdef/define_kfk.txt 172.16.101.242:/app/ogg/dirdef/define_kfk.txt
3. Configure the target-side initial load
Create the target-side initial-load Replicat:
GGSCI (172-16-101-242) 3> ADD replicat initkfk,specialrun
GGSCI (172-16-101-242) 6> edit params initkfk
Add:
SPECIALRUN
end runtime
setenv(NLS_LANG="AMERICAN_AMERICA.AL32UTF8")
targetdb libfile libggjava.so set property=./dirprm/kafka.props
SOURCEDEFS ./dirdef/define_kfk.txt
EXTFILE ./dirdat/ekfk000000
reportcount every 1 minutes, rate
grouptransops 10000
map baiyang.ora_to_kfk,target baiyang.ora_to_kfk;
4. Configure the Kafka parameters: vi ./dirprm/kafka.props
Add:
gg.handlerlist=kafkahandler
gg.handler.kafkahandler.type=kafka
gg.handler.kafkahandler.KafkaProducerConfigFile=custom_kafka_producer.properties
gg.handler.kafkahandler.topicMappingTemplate=test_ogg
gg.handler.kafkahandler.format=json
gg.handler.kafkahandler.mode=op
gg.classpath=dirprm/:/data/kafka_2.12-2.2.0/libs/*:/app/ogg/:/app/ogg/lib/*
vi custom_kafka_producer.properties
Add:
bootstrap.servers=172.16.101.242:9092
acks=1
compression.type=gzip
reconnect.backoff.ms=1000
value.serializer=org.apache.kafka.common.serialization.ByteArraySerializer
key.serializer=org.apache.kafka.common.serialization.ByteArraySerializer
batch.size=102400
linger.ms=10000
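Note that batch.size=102400 and linger.ms=10000 trade latency for throughput: the producer sends a batch either when it reaches roughly 100 KB or when it has waited 10 seconds, whichever comes first, so a change on a quiet table may reach consumers up to 10 seconds late. A minimal sketch (plain Python, not Kafka client code) of that flush rule:

```python
# Illustrative model of the producer batching settings above; not actual
# Kafka client internals. A batch is flushed when it reaches batch.size
# bytes or has waited linger.ms milliseconds.
BATCH_SIZE = 102400   # batch.size from custom_kafka_producer.properties
LINGER_MS = 10000     # linger.ms

def should_flush(batch_bytes: int, batch_age_ms: int) -> bool:
    """Return True when the producer would send the current batch."""
    return batch_bytes >= BATCH_SIZE or batch_age_ms >= LINGER_MS

print(should_flush(300, 500))     # small, young batch: not yet sent
print(should_flush(300, 10000))   # linger.ms reached: sent
print(should_flush(102400, 0))    # batch full: sent
```

Lowering linger.ms (or leaving it at its default of 0) reduces end-to-end delay at the cost of smaller, more frequent requests.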
5. Start the full data extraction on the source
On the source:
GGSCI (dtproxy) 20> start mgr
GGSCI (dtproxy) 21> start initkfk
6. Apply the full data set on the target
GGSCI (172-16-101-242) 13> start mgr
./replicat paramfile ./dirprm/initkfk.prm reportfile ./dirrpt/init01.rpt -p INITIALDATALOAD
7. Verify the data in Kafka
Use the Kafka console consumer to inspect the topic:
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test_ogg --from-beginning
{"table":"BAIYANG.ORA_TO_KFK","op_type":"I","op_ts":"2019-11-11 20:23:19.703779","current_ts":"2019-11-11T20:48:55.946000","pos":"-0000000000000000001","after":{"OWNER":"SYS","OBJECT_NAME":"C_OBJ#","SUBOBJECT_NAME":null,"OBJECT_ID":2,"DATA_OBJECT_ID":2,"OBJECT_TYPE":"CLUSTER"}}
{"table":"BAIYANG.ORA_TO_KFK","op_type":"I","op_ts":"2019-11-11 20:23:19.703779","current_ts":"2019-11-11T20:48:56.289000","pos":"-0000000000000000001","after":{"OWNER":"SYS","OBJECT_NAME":"I_OBJ#","SUBOBJECT_NAME":null,"OBJECT_ID":3,"DATA_OBJECT_ID":3,"OBJECT_TYPE":"INDEX"}}
The full data set has now been synchronized to the target Kafka topic.
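Each message the Kafka handler emits in json format / op mode is a self-contained JSON document. A minimal sketch of the parsing step a downstream consumer would perform, using one record copied from the console-consumer output above:

```python
import json

# One record as produced by the OGG Kafka handler (json format, op mode),
# copied from the console-consumer output above.
record = json.loads(
    '{"table":"BAIYANG.ORA_TO_KFK","op_type":"I",'
    '"op_ts":"2019-11-11 20:23:19.703779",'
    '"current_ts":"2019-11-11T20:48:55.946000",'
    '"pos":"-0000000000000000001",'
    '"after":{"OWNER":"SYS","OBJECT_NAME":"C_OBJ#","SUBOBJECT_NAME":null,'
    '"OBJECT_ID":2,"DATA_OBJECT_ID":2,"OBJECT_TYPE":"CLUSTER"}}'
)

schema, table = record["table"].split(".")  # source schema and table name
op = record["op_type"]                      # I = insert, U = update, D = delete
row = record.get("after") or record.get("before")  # row image

print(schema, table, op, row["OBJECT_ID"])  # BAIYANG ORA_TO_KFK I 2
```

In a real consumer the string would come from the Kafka record value rather than a literal, but the field layout is the same.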
Incremental Data Synchronization
1. Configure the source-side Extract
GGSCI (dtproxy) 9> edit param extkfk
Add:
dynamicresolution
SETENV (ORACLE_SID = "dtstack")
SETENV (NLS_LANG = "american_america.AL32UTF8")
userid ggsadmin,password oracle
exttrail ./dirdat/to
table baiyang.ora_to_kfk;
Add the Extract process:
GGSCI (dtproxy) 10> add extract extkfk,tranlog,begin now
Create the trail file definition and bind it to the Extract process:
GGSCI (dtproxy) 11> add exttrail ./dirdat/to,extract extkfk
2. Configure the source-side data pump
Configure the pump process:
GGSCI (dtproxy) 12> edit param pupkfk
Add:
extract pupkfk
passthru
dynamicresolution
userid ggsadmin,password oracle
rmthost 172.16.101.242 mgrport 7810
rmttrail ./dirdat/to
table baiyang.ora_to_kfk;
Add the pump (Extract) process:
GGSCI (dtproxy) 13> add extract pupkfk,exttrailsource /oradata/oggorcl/ogg/dirdat/to
Bind the remote trail file to the pump process:
GGSCI (dtproxy) 14> add rmttrail ./dirdat/to,extract pupkfk
3. Configure the target-side Replicat
Configure the Replicat process:
edit param repkfk
Add:
REPLICAT repkfk
SOURCEDEFS ./dirdef/define_kfk.txt
targetdb libfile libggjava.so set property=./dirprm/kafka.props
REPORTCOUNT EVERY 1 MINUTES, RATE
GROUPTRANSOPS 10000
MAP baiyang.ora_to_kfk, TARGET baiyang.ora_to_kfk;
Bind the trail file to the Replicat process:
add replicat repkfk exttrail ./dirdat/to,checkpointtable ggsadmin.checkpoint
4. Start real-time capture on the source
./ggsci
GGSCI (dtproxy) 5> start extkfk
Sending START request to MANAGER ...
EXTRACT EXTKFK starting
GGSCI (dtproxy) 6> start pupkfk
Sending START request to MANAGER ...
EXTRACT PUPKFK starting
GGSCI (dtproxy) 7> status all
Program Status Group Lag at Chkpt Time Since Chkpt
MANAGER RUNNING
EXTRACT RUNNING EXTKFK 00:00:00 00:00:10
EXTRACT RUNNING PUPKFK 00:00:00 00:00:00
5. Start real-time apply on the target
./ggsci
GGSCI (172-16-101-242) 7> start replicat repkfk
Sending START request to MANAGER ...
REPLICAT REPKFK starting
GGSCI (172-16-101-242) 8> info all
Program Status Group Lag at Chkpt Time Since Chkpt
MANAGER RUNNING
REPLICAT RUNNING REPKFK 00:00:00 00:00:00
6. Test incremental synchronization
Insert incremental data into Oracle:
SQL> insert into baiyang.ora_to_kfk select OWNER, OBJECT_NAME, SUBOBJECT_NAME, OBJECT_ID, DATA_OBJECT_ID, OBJECT_TYPE from all_objects where object_id >500 and object_id < 1000;
SQL> commit;
SQL> select count(*) from baiyang.ora_to_kfk;
COUNT(*)
----------
905
Check the data consumed from the Kafka topic:
{"table":"BAIYANG.ORA_TO_KFK","op_type":"I","op_ts":"2019-11-11 21:04:11.158786","current_ts":"2019-11-11T21:10:54.042000","pos":"00000000000000075298","after":{"OWNER":"SYS","OBJECT_NAME":"APPLY$_READER_STATS","SUBOBJECT_NAME":null,"OBJECT_ID":998,"DATA_OBJECT_ID":998,"OBJECT_TYPE":"TABLE"}}
{"table":"BAIYANG.ORA_TO_KFK","op_type":"I","op_ts":"2019-11-11 21:04:11.158786","current_ts":"2019-11-11T21:10:54.042001","pos":"00000000000000075459","after":{"OWNER":"SYS","OBJECT_NAME":"APPLY$_READER_STATS_I","SUBOBJECT_NAME":null,"OBJECT_ID":999,"DATA_OBJECT_ID":999,"OBJECT_TYPE":"INDEX"}}
Delete data on the source Oracle side:
SQL> delete from baiyang.ora_to_kfk ;
906 rows deleted.
SQL> commit;
Check the data consumed from the Kafka topic:
{"table":"BAIYANG.ORA_TO_KFK","op_type":"D","op_ts":"2019-11-11 21:13:11.166184","current_ts":"2019-11-11T21:13:17.449007","pos":"00000000000000216645","before":{"OWNER":"x1","OBJECT_NAME":"SSSSS","SUBOBJECT_NAME":"z1","OBJECT_ID":111000,"DATA_OBJECT_ID":2000,"OBJECT_TYPE":"x1"}}
Insert a row on the source:
SQL> insert into baiyang.ora_to_kfk values('汉字', 'y1', 'z1', 111000,2000,'x1');
1 row created.
SQL> commit;
Check the data consumed from the Kafka topic:
{"table":"BAIYANG.ORA_TO_KFK","op_type":"I","op_ts":"2019-11-11 21:14:21.167454","current_ts":"2019-11-11T21:14:26.497000","pos":"00000000000000216794","after":{"OWNER":"汉字","OBJECT_NAME":"y1","SUBOBJECT_NAME":"z1","OBJECT_ID":111000,"DATA_OBJECT_ID":2000,"OBJECT_TYPE":"x1"}}
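As the two records above show, an insert carries the new row in "after", while a delete carries the old row in "before". A hedged sketch of dispatching on op_type (the fallback branch for updates is an assumption; which images an update carries depends on handler settings not shown in this article):

```python
import json

def row_image(record: dict) -> dict:
    """Pick the relevant row image for each operation type.

    Inserts ('I') carry the new row in 'after'; deletes ('D') carry the
    old row in 'before'. The update fallback is an assumption: which
    image(s) an update carries depends on handler configuration.
    """
    op = record["op_type"]
    if op == "I":
        return record["after"]
    if op == "D":
        return record["before"]
    return record.get("after") or record.get("before")

# Trimmed versions of the delete and insert records shown above.
delete_msg = json.loads(
    '{"table":"BAIYANG.ORA_TO_KFK","op_type":"D",'
    '"before":{"OWNER":"x1","OBJECT_NAME":"SSSSS","OBJECT_ID":111000}}')
insert_msg = json.loads(
    '{"table":"BAIYANG.ORA_TO_KFK","op_type":"I",'
    '"after":{"OWNER":"汉字","OBJECT_NAME":"y1","OBJECT_ID":111000}}')

print(row_image(delete_msg)["OBJECT_NAME"])  # SSSSS
print(row_image(insert_msg)["OWNER"])        # 汉字
```

Note also that non-ASCII data (the 汉字 value inserted above) survives the round trip intact, thanks to the AL32UTF8 NLS_LANG setting on both sides.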
Summary
With OGG, Oracle data changes can be synchronized to a Kafka message queue in real time with relatively little effort. By subscribing to the Kafka topic, downstream business systems can easily build all kinds of real-time data applications.