关于Flink:Flink-CDC-系列-实时抽取-Oracle-数据排雷和调优实践

61次阅读

共计 6940 个字符,预计需要花费 18 分钟才能阅读完成。

本文作者为中国农业银行研发核心丁杨,在 Flink CDC 2.1 版本公布后第一工夫下载应用,并胜利实现了对 Oracle 的实时数据捕捉以及性能调优,现将试用过程中的一些要害细节进行分享。次要内容包含:

  1. 无奈连贯数据库
  2. 无奈找到 Oracle 表
  3. 数据提早较大
  4. 调节参数持续升高数据提早
  5. Debezium Oracle Connector 的暗藏参数

Flink CDC 于 2021 年 11 月 15 日公布了最新版本 2.1,该版本通过引入内置 Debezium 组件,减少了对 Oracle 的反对。笔者第一工夫下载了该版本进行试用并胜利实现了对 Oracle 的实时数据捕捉以及性能调优,现将试用过程中的一些要害细节进行分享。

阐明:本文力求依据理论的问题排查教训,以及外部执行原理分享一些“干货”,所以对 Flink CDC,以及其内置的 Debezium 模块的根底应用办法并未波及,对于根底的应用办法、参数等,读者可参考以下地址:

Flink CDC:

https://ververica.github.io/f…

Debezium:

https://debezium.io/documenta…

试用环境

Oracle:11.2.0.4.0(RAC 部署)

Flink:1.13.1

Hadoop:3.2.1

通过 Flink on Yarn 形式部署应用

一、无奈连贯数据库

依据官网文档阐明,在 Flink SQL CLI 中输出以下语句:

create table TEST (A string)
WITH ('connector'='oracle-cdc',
    'hostname'='10.230.179.125',
    'port'='1521',
    'username'='myname',
    'password'='***',
    'database-name'='MY_SERVICE_NAME',
    'schema-name'='MY_SCHEMA',
    'table-name'='TEST' );

之后尝试通过 select * from TEST 察看,发现无奈失常连贯 Oracle,报错如下:

[ERROR] Could not execute SQL statement. Reason:
oracle.net.ns.NetException: Listener refused the connection with the following error:
ORA-12505, TNS:listener does not currently know of SID given in connect descriptor

从报错信息来看,可能是因为 Flink CDC 误将连贯信息中提供的 MY_SERVICE_NAME (Oracle 的服务名) 错认为 SID。于是尝试浏览 Flink CDC 波及到 Oracle Connector 的源码,发现在 com.ververica.cdc.connectors.oracle.OracleValidator 中,对于 Oracle 连贯的代码如下:

public static Connection openConnection(Properties properties) throws SQLException {DriverManager.registerDriver(new oracle.jdbc.OracleDriver());
    String hostname = properties.getProperty("database.hostname");
    String port = properties.getProperty("database.port");
    String dbname = properties.getProperty("database.dbname");
    String userName = properties.getProperty("database.user");
    String userpwd = properties.getProperty("database.password");
    return DriverManager.getConnection("jdbc:oracle:thin:@" + hostname + ":" + port + ":" + dbname, userName, userpwd);
}

由上能够看出,在以后版本的 Flink CDC 中,对于 SID 和 Service Name 的连贯形式并未做辨别,而是间接在代码中写死了 SID 的连贯形式 (即 port 和 dbname 两头应用“:”分隔开)。

从 Oracle 8i 开始,Oracle 曾经引入了 Service Name 的概念以反对数据库的集群 (RAC) 部署,一个 Service Name 可作为一个数据库的逻辑概念,对立对该数据库不同的 SID 实例的连贯。据此,能够思考以下两种形式:

  1. 在 Flink CDC 的 create table 语句中,将 database-name 由 Service Name 替换成其中一个 SID。该形式能解决连贯问题,但无奈适应支流的 Oracle 集群部署的实在场景;
  2. 对该源码进行批改。具体可在新建工程中,重写 com.ververica.cdc.connectors.oracle.OracleValidator 办法,批改为 Service Name 的连贯形式 (即 port 和 dbname 两头应用“/”分隔开),即:

    “jdbc:oracle:thin:@” + hostname + “:” + port + “/” + dbname, userName, userpwd);

笔者采纳的就是第二种办法,实现了失常连贯数据库的同时,保留对 Oracle Service Name 个性的应用。

该问题已提交至 Flink CDC Issue 701:

https://github.com/ververica/…

二、无奈找到 Oracle 表

依照上述步骤,再次通过 select * from TEST 察看,发现仍然无奈失常获取数据,报错如下:

[ERROR] Could not execute SQL statement. Reason:
io.debezium.DebeziumException: Supplemental logging not configured for table MY_SERVICE_NAME.MY_SCHEMA.test.  Use command: ALTER TABLE MY_SCHEMA.test ADD SUPPLEMENTAL LOG DATA (ALL) COLUMNS

察看到谬误日志中提到的表是 MY_SERVICE_NAME.MY_SCHEMA.test,为什么数据库名、Schema 名都是大写,而表名是小写?

留神到该谬误由 io.debezium 包报出,通过剖析该包的源代码 (通过 Flink CDC 的 pom.xml 文件可知,目前应用的是 debezium 1.5.4 版本) 可知,在 io.debezium.relational.Tables 中有如下代码:

private TableId toLowerCaseIfNeeded(TableId tableId) {return tableIdCaseInsensitive ? tableId.toLowercase() : tableId;
}

可见,Debezium 的开发者将“大小写不敏感”对立定义为了“须要将表名转换为小写”。对于 Debezium 反对的 PostgreSQL、Mysql 等的确如此。然而对于 Oracle 数据库,“大小写不敏感”却意味着在外部元信息存储时,须要将表名转换为大写

见 https://docs.oracle.com/cd/E1…

“Nonquoted identifiers are not case sensitive. Oracle interprets them as uppercase”)。

因此 Debezium 在读取到“大小写不敏感”的配置后,依照上述代码逻辑,只会因为尝试去读取小写的表名而报错。

因为 Debezium 直到目前最新的稳固版本 1.7.1,以及最新的开发版本 1.8.0 都未修复该问题,咱们能够通过以下两种办法绕过该问题:

  1. 如需应用 Oracle“大小写不敏感”的个性,可间接批改源码,将上述 toLowercase 批改为 toUppercase (这也是笔者抉择的办法);
  2. 如果不违心批改源码,且无需应用 Oracle“大小写不敏感”的个性,能够在 create 语句中加上 ‘debezium.database.tablename.case.insensitive’=’false’,如下示例:

    create table TEST (A string)
    WITH ('connector'='oracle-cdc',
        'hostname'='10.230.179.125',
        'port'='1521',
        'username'='myname',
        'password'='***',
        'database-name'='MY_SERVICE_NAME',
    'schema-name'='MY_SCHEMA',
    'table-name'='TEST',
    'debezium.database.tablename.case.insensitive'='false' );

该办法的弊病是丢失了 Oracle“大小写不敏感”的个性,在 ‘table-name’ 中必须显式指定大写的表名。

须要注明的是,对于 database.tablename.case.insensitive 参数,Debezium 目前仅对 Oracle 11g 默认设置为 true,对其余 Oracle 版本均默认设置为 false。所以读者如果应用的不是 Oracle 11g 版本,可无需批改该参数,但仍需显式指定大写的表名。

该问题已提交至 Flink CDC Issue 702:

https://github.com/ververica/…

三、数据提早较大

数据提早较大,有时须要 3-5 分钟能力捕捉到数据变动。对于该问题,在 Flink CDC FAQ 中已给出了明确的解决方案:在 create 语句中加上如下两个配置项:

'debezium.log.mining.strategy'='online_catalog',
'debezium.log.mining.continuous.mine'='true'

那么为什么要这样做呢?咱们仍然能够通过剖析源码和日志,联合 Oracle Logminer 的工作原理来加深对工具的了解。

对 Logminer 的抽取工作,次要在 Debezium 的 io.debezium.connector.oracle.logminer.LogMinerStreamingChangeEventSource 中 execute 办法进行。为节约篇幅,本文不列出理论的源码,仅提炼出要害过程绘于上面的流程图,有趣味的读者能够对照该流程图,结合实际源码进行剖析:

采纳 redo_log_catalog 的形式,能够监控数据表的 DDL 信息,且因为 archive logs 被永恒保留到磁盘上,能够在数据库宕机后仍然失常获取到宕机前的所有 DDL 和 DML 操作。但因为波及到比 online catalog 更多的信息监控,以及由此带来的频繁的日志切换和日志转储操作,其代价也是惊人的。

依据笔者理论测试状况,如果 debezium.log.mining.strategy 为默认配置 redo_log_catalog,则不仅须要多执行第 ① 步操作 (该操作耗时约为半分钟到 1 分钟之间),在第 ④ 步,依据 archived logs 的数据量,耗时也会在 1 分钟到 4 分钟之间浮动;在第 ⑤ 步,理论查问 V$LOGMNR_CONTENTS 视图也经常须要十几秒能力实现。

此外,因为 archive logs 在理论零碎中增长速度较快,因而在理论应用中,常会配合进行定期删除或转储过期日志的操作。因为上述第 ④ 步的耗时较长,笔者察看到在第 ④ 步执行的过程中,在肯定概率下会产生第 ② 步退出的 a rchive logs 已过期而被删除转储的状况,于是在第 ⑤ 步查问的时候,会因为找不到第 ② 步退出的日志,而报上面的谬误:

ORA-00308: cannot open archive log '/path/to/archive/log/...'
ORA-27037: unable to obtain file status

一般来说,Flink CDC 所须要监控的表,特地是对于业务零碎有重大意义的表,个别不会进行 DDL 操作,仅须要捕获 DML 操作即可,且对于数据库宕机等极非凡状况,也可应用在数据库复原后进行全量数据更新的形式保障数据的一致性。因此,online_catalog 的形式足以满足咱们的须要。

另外,无论应用 online_catalog,还是默认的 redo_log_catalog,都会存在第 ② 步找到的日志和第 ⑤ 步理论须要的日志不同步的问题,因而,退出 ‘debezium.log.mining.continuous.mine’=’true’ 参数,将实时收集日志的工作交给 Oracle 主动实现,即可躲避这一问题。

笔者依照这两个参数配置后,数据提早个别能够从数分钟降至 5 秒钟左右。

四、调节参数持续升高数据提早

上述流程图的第 ③ 步和第 ⑦ 步,提到了依据配置项来确定 LogMiner 监控时序范畴,以及确定休眠工夫。上面对该过程进行进一步剖析,并对单个表的进一步调优给出一般性的方法论。

通过观察 io.debezium.connector.oracle.logminer.LogMinerHelper 类中的 getEndScn 办法,可理解到 debezium 对监控时序范畴和休眠工夫的调节原理。为便于读者了解,将该办法用流程图阐明如下:

从上述的流程图中能够看出,debezium 给出 log.mining.batch.size.* 和 log.mining.sleep.time.* 两组参数,就是为了让每一次 logMiner 运行的步长可能尽可能和数据库本身 SCN 减少的步长统一。由此可见:

  • log.mining.batch.size.* 和 log.mining.sleep.time.* 参数的设定,和数据库整体的体现无关,和单个表的数据变动状况无关;
  • log.mining.batch.size.default 不仅仅是监控时序范畴的起始值,还是监控时序范畴变动的阈值。所以如果要实现更灵便的监控时序范畴调整,可思考适当减小该参数;
  • 因为每一次确定监控时序范畴时,都会依据 topScn 和 currentScn 的大小来调整 sleepTime,所以为了实现休眠工夫更灵便的调整,可思考适当增大 log.mining.sleep.time.increment.ms;
  • log.mining.batch.size.max 不能过小,否则会有监控时序范畴永远无奈追上数据库以后 SCN 的危险。为此,debezium 在 io.debezium.connector.oracle.OracleStreamingChangeEventSourceMetrics 中存在以下逻辑:

    if (currentBatchSize == batchSizeMax) {LOGGER.info("LogMiner is now using the maximum batch size {}. This could be indicative of large SCN gaps", currentBatchSize);
    }

如果以后的监控时序范畴达到了 log.mining.batch.size.max,那么 debezium 会在日志中给出如上提醒。在理论利用中,察看 Flink CDC 产生的 log 是否蕴含该提醒,便可得悉 log.mining.batch.size.max 的值是否正当。

五、Debezium Oracle Connector 的暗藏参数

事实上从上文中咱们曾经理解到了两个暗藏参数:debezium.database.tablename.case.insensitive (见第二节内容) 和 debezium.log.mining.continuous.mine (见第三节内容),这两个参数在 Debezium 的官网文档中均未给出理论阐明,但实际上能够应用。通过剖析源码,现给出 Debezium Oracle Connector 的所有暗藏参数,以及其阐明如下:

笔者认为除了下面咱们曾经用到的两个参数以外,同样值得重点关注的是 log.mining.history.recorder.class 参数。因为该参数目前默认为 io.debezium.connector.oracle.logminer.NeverHistoryRecorder,是一个空类,所以咱们在剖析 Flink CDC 行为时,通过自定义实现 io.debezium.connector.oracle.logminer.HistoryRecorder 接口的类,可在不批改源码的状况下,实现对 Flink CDC 行为的个性化监控。

Flink-CDC 我的项目地址:
https://github.com/ververica/…


近期热点

Flink Forward Asia 2021 延期,线上相见


更多 Flink 相干技术问题,可扫码退出社区钉钉交换群
第一工夫获取最新技术文章和社区动静,请关注公众号~

正文完
 0