关于后端:一键实现-Oracle-数据整库同步至-Apache-Doris

43次阅读

共计 7489 个字符,预计需要花费 19 分钟才能阅读完成。

在实时数据仓库建设或迁徙的过程中,用户必须思考如何高效便捷将关系数据库数据同步到实时数仓中来,Apache Doris 用户也面临这样的挑战。而对于从 Oracle 到 Doris 的数据同步,通常会用到以下两种常见的同步形式:

OGG/XStream/LogMiner 工具: 通过该形式先将数据同步到 Kafka 中,而后通过 Routine Load 生产 Kafka 中的数据进行实时同步。这种形式的同步链路绝对较长,特地是在上游数据表较多的状况下,须要手动创立大量的 Routine Load 作业,同步流程不仅繁琐,也给用户减少了较大的应用及保护压力。

FlinkCDC: 该形式尽管能够间接将上游数据同步到 Doris 中,并在肯定水平上缩短了同步链路,理论在应用过程中还会遇到以下问题:

  • 数据同步时,须要在 Flink 中对每张表手动配置参数及字段映射,尤其是在多表或整库同步场景中,不仅带来大量配置工作量,还减少了 FlinkSQL 脚本的保护老本。
  • 数据同步时,须要当时在 Doris 中手动一一创立表,而面对数量宏大的上游表时,手动创立表不仅消耗工夫,而且工作效率很低,间接影响数据同步的效率。
  • 因为每张 Source 表都会应用同一个链接,因而在整库同步时会给源端造成很大的链接压力。

为了解决上述问题, 在新版本的 Doris-Flink-Connector  中,咱们实现了 FlinkCDC 的 Datastream API 集成,无需提前在 Doris 中创立表以及映射关系,仅仅通过简略的参数配置就能一键实现从 Oracle 等关系型数据库到 Apache Doris 的整库数据同步。

此外,Doris-Flink-Connector 也能够一键实现万表 MySQL 整库同步至 Apache Doris 中来,具体应用可参考:一键实现万表 MySQL 整库同步至 Apache Doris

同步流程 & 实战演示


在进行整库同步前,咱们先理解一下具体同步流程:

  • 在启动 Flink 工作之前,Doris-Flink-Connector  会主动读取须要同步的 Oracle 表的元数据信息,并主动在 Doris 中创立相应的表。
  • 通过 FlinkCDC 提供的 OracleSource 性能,可能从 Oracle 数据库中读取数据,并将其传递到上游进行解决。
  • 通过 Flink 的侧输入流性能,依据自定义规定将数据分流到不同的 Doris Sink 中,并同步到 Doris 中来。

通过以上简略操作,即可实现上游 Oracle 数据库的整库数据实时数据接入到 Apache Doris 中。接下来咱们通过一个理论案例来具体阐明具体的操作步骤:

01  Oracle 环境筹备

# 拉取镜像
docker pull registry.cn-hangzhou.aliyuncs.com/helowin/oracle_11g

# 启动镜像
docker run -it -d \
--privileged \
-p 1521:1521 \
--name oracle11g \
-e ORACLE_ALLOW_REMOTE=true \
-v /mnt/disk1/oracle:/data/oracle \
registry.cn-hangzhou.aliyuncs.com/helowin/oracle_11g

# 进入容器
docker exec -it oracle11g bash

Oracle 归档日志(Binlog)配置:启动归档日志时,需对日志大小和寄存地址进行设置,设置实现需进行重启。该步骤实现后才可进行后续增量数据的同步。

# 进入 SQL 命令行
[oracle@ef6d9de18e59 ~]$ sqlplus /nolog
SQL> conn /as sysdba
Connected.

SQL> alter system set db_recovery_file_dest_size = 10G;
System altered.

SQL> alter system set db_recovery_file_dest = '/home/oracle/oracle-data' scope=spfile;
System altered.

SQL> shutdown immediate;
Database closed.
Database dismounted.
ORACLE instance shut down.

SQL> startup mount;
ORACLE instance started.
Total System Global Area 1603411968 bytes
Fixed Size                  2213776 bytes
Variable Size             402655344 bytes
Database Buffers         1174405120 bytes
Redo Buffers               24137728 bytes
Database mounted.

SQL> alter database archivelog;
Database altered.

SQL> alter database open;
Database altered.
# 查看日志归档是否开启
SQL> archive log list;
Database log mode              Archive Mode
Automatic archival             Enabled
Archive destination            USE_DB_RECOVERY_FILE_DEST
Oldest online log sequence     1
Next log sequence to archive   1
Current log sequence           1

# 启用补充日志记录
SQL> ALTER DATABASE ADD SUPPLEMENTAL LOG DATA;
Database altered.

#创立用户
CREATE USER admin IDENTIFIED BY admin123;
GRANT dba TO admin;

数据筹备

[oracle@ef6d9de18e59 ~]$ sqlplus admin/admin123 
SQL> CREATE TABLE PERSONS(ID NUMBER(10),
      NAME VARCHAR2(128) NOT NULL,
      PRIMARY KEY(ID)
   );
Table created.

SQL> INSERT INTO "PERSONS" VALUES (1, 'zhangsan');
SQL> INSERT INTO "PERSONS" VALUES (2, 'lisi');
SQL> INSERT INTO "PERSONS" VALUES (3, 'wangwu');

SQL> CREATE TABLE PERSONS_1(ID NUMBER(10),
      NAME VARCHAR2(128) NOT NULL,
      PRIMARY KEY(ID)
   );
Table created.

SQL> INSERT INTO "PERSONS_1" VALUES (1, 'zhangsan');
SQL> INSERT INTO "PERSONS_1" VALUES (2, 'lisi');
SQL> INSERT INTO "PERSONS_1" VALUES (3, 'wangwu');

02  Flink 环境配置

将 FlinkCDC-Oracle 的依赖和 Doris-Flink-Connector 包放到 Flink 的 lib 目录下,同时启动 Flink 集群。

# 下载相干依赖
wget https://repo.maven.apache.org/maven2/com/ververica/flink-sql-connector-oracle-cdc/2.3.0/flink-sql-connector-oracle-cdc-2.3.0.jar
wget https://repository.apache.org/content/repositories/snapshots/org/apache/doris/flink-doris-connector-1.16/1.5.0-SNAPSHOT/flink-doris-connector-1.16-1.5.0-20230811.065053-1.jar -O flink-doris-connector-1.16-1.5.0-SNAPSHOT.jar

# 启动 Flink 集群
bin/start-cluster.sh

03  一键提交整库同步作业

本次同步以 PERSON 结尾的所有的表。

<FLINK_HOME>/bin/flink run \
     -Dexecution.checkpointing.interval=10s \
     -Dparallelism.default=1 \
     -c org.apache.doris.flink.tools.cdc.CdcTools \
     ./lib/flink-doris-connector-1.16-1.5.0-SNAPSHOT.jar \
     oracle-sync-database \
     --database test_db \
     --oracle-conf hostname=127.0.0.1 \
     --oracle-conf port=1521 \
     --oracle-conf username=admin \
     --oracle-conf password=admin123 \
     --oracle-conf database-name=HELOWIN \
     --oracle-conf schema-name=ADMIN \
     --including-tables "PERSONS.*" \
     --sink-conf fenodes=127.0.0.1:8030 \
     --sink-conf username=root \
     --sink-conf password=\
     --sink-conf jdbc-url=jdbc:mysql://127.0.0.1:9030 \
     --sink-conf sink.label-prefix=label \
     --table-conf replication_num=1

具体参数可参考:https://doris.apache.org/zh-CN/docs/dev/ecosystem/flink-doris…

提交胜利后,能够在 FlinkWeb 上看到该同步工作的状态。

进入 Doris 能够查看主动创立的表以及同步胜利的全量数据。

mysql> use test_db;                                                                                                                                        
Reading table information for completion of table and column names                                                                                         
You can turn off this feature to get a quicker startup with -A                                                                                             
                                                                                                                                                           
Database changed                                                                                                                                           
mysql> show tables;                                                                                                                                        
+-------------------+                                                                                                                                      
| Tables_in_test_db |                                                                                                                                      
+-------------------+                                                                                                                                      
| PERSONS           |                                                                                                                                      
| PERSONS_1         |                                                                                                                                      
+-------------------+                                                                                                                                      
2 rows in set (0.00 sec)                                                                                                                                   
                                                                                                                                                           
mysql> select * from PERSONS;                                                                                                                              
+------+----------+                                                                                                                                        
| ID   | NAME     |                                                                                                                                        
+------+----------+                                                                                                                                        
|    2 | lisi     |                                                                                                                                        
|    3 | wangwu   |                                                                                                                                        
|    1 | zhangsan |                                                                                                                                        
+------+----------+                                                                                                                                        
3 rows in set (0.01 sec)                                                                                                                                   
                                                                                                                                                           
mysql> select * from PERSONS_1;                                                                                                                            
+------+----------+                                                                                                                                        
| ID   | NAME     |                                                                                                                                        
+------+----------+                                                                                                                                        
|    2 | lisi     |                                                                                                                                        
|    3 | wangwu   |                                                                                                                                        
|    1 | zhangsan |                                                                                                                                        
+------+----------+                                                                                                                                        
3 rows in set (0.01 sec)

在 Oracle 中模仿实时增删改数据

INSERT INTO PERSONS VALUES(4,'doris');
UPDATE PERSONS SET name = 'zhangsan-update' WHERE ID =1;
DELETE PERSONS WHERE ID =2; 

在 Doris 中进行验证,能够确认增量数据曾经胜利同步。

mysql> select * from PERSONS;                                                                                                                              
+------+-----------------+                                                                                                                                 
| ID   | NAME            |                                                                                                                                 
+------+-----------------+                                                                                                                                 
|    1 | zhangsan-update |                                                                                                                                 
|    4 | doris           |                                                                                                                                 
|    3 | wangwu          |                                                                                                                                 
+------+-----------------+                                                                                                                                 
3 rows in set (0.01 sec)  

通过以上操作,胜利实现将 Oracle 中数据整库同步到 Doris 中,同时也实现了上游全量与增量数据的主动接入。

理论应用反馈


原先将 Oracle 数据同步到 Doris 中时,须要手动创立 Source 和 Sink 表,而应用 Doris-Flink-Connector 后能够实现多表、整库数据一键同步,极大简化了开发流程,该工具还能实现字段类型主动转换,数据同步更加简略便捷。

—— 近景能源 资深大数据工程师 孙全隆

在应用 Doris-Flink-Connector 之前,咱们个别是通过 DataX 定时从业务零碎中抽取数据,当进行全量同步时,抽取数据会对业务零碎造成肯定的压力,且该形式只能做到小时级的同步。期间咱们也尝试了 FlinkCDC,该形式尽管能够实现数据实时写入 Doris,但每个表都须要手动创立新工作,配置工作量大且会节约服务器资源。而 Doris-Flink-Connector 能够实现一键化脚本操作,为咱们缩小了繁冗的手工配置流程,高效稳固的实现了整库数据疾速同步。

—— 郑煤机数耘科技 资深大数据工程师 杨开元

Doris-Flink-Connector  一键操作即可疾速实现 Oracle 数据整库同步到 Doris,节俭了手动配置以及编写简单同步代码的步骤,防止了手动同步中可能呈现数据不统一的问题。不仅能进步数据的准确性和可靠性,也极大晋升了工作的效率。

—— 海程邦达 资深大数据工程师 王新

在实时数仓的建设过程中,对于 ODS 贴源数据层的同步需要,Doris-Flink-Connector  可能很好的解决全量数据、增量数据、增量表、表构造变更主动监听。同时它也对 Stream Load 逻辑进行了优化,能够防止频繁对空数据进行 Load,加重了数据库压力。此外,Doris-Flink-Connector 可能帮忙咱们节俭大量 Flink 集群资源,特地是业务变更频繁期间,能很好及时的同步上游状态,确保上下游数据的一致性。

——旺小宝 数据架构师 米华军

咱们在 MySQL 和 Orcale 两个场景下均进行了全量 + 增量的尝试,Doris-Flink-Connector 是真正的拆箱即用,真正实现了一键式操作、无感知建表,这为开发人员节俭了不少工夫老本,同时在应用期间遇到问题,SelectDB 技术同学的响应速度十分给力,帮忙咱们疾速推动数据同步工作。

—— 博思软件 资深大数据开发工程师 刘工

总结


Doris-Flink-Connector 通过集成 FlinkCDC,可能将上游 Oracle 数据库中的数据疾速同步到 Doris 中。特地是在整库同步场景中,用户只需执行一键导入命令,即可疾速将整个数据库的全量和增量数据导入到 Doris 中。 这一性能的引入大大降低了数据同步的门槛,使数据同步变得更加简略高效。

最初,欢送有须要的小伙伴应用该工具,感兴趣的搭档能够在评论区留言或私信申请进入专项反对群,如果你在应用过程中遇到任何问题,均可向咱们反馈~

# 作者介绍: 吴迪,SelectDB 生态研发工程师。

正文完
 0