关于数据库:CloudCanal20-自定义代码实时加工能力自定义实时ETL说明与介绍

65次阅读

共计 7988 个字符,预计需要花费 20 分钟才能阅读完成。

简介

CloudCanal 2.0 中咱们将迎来一项重磅更新——自定义代码实时加工能力。自定义代码实时加工容许用户应用 Java 语言编写自定义的数据行解决逻辑,而后将代码 jar 包上传 CloudCanal 平台后,数据同步工作在执行全量、增量时会主动利用用户的自定义解决逻辑,而后再写入对端数据源。

利用场景

自定义代码实时加工是一种非常灵活的实时数据加工伎俩,在自定义代码中用户能够进行跨实例查问、微服务调用、缓存查问等各种操作,而后对实时接管到的数据行进行编辑。数据编辑反对用户自定义新增行、批改行、删除行。其中批改行反对用户新增列、批改列、删除列。自定义代码实时加工可用于以下场景:

数据荡涤

构建实时数仓、数据湖以及进行数据治理时都须要对数据进行荡涤,波及数据过滤、加工、标准化。在这个过程中,用户能够上传自定义代码,引入本人的一些企业外部数据标准化解决的二方包或者调用一些微服务或者反查数据库对收到的实时数据进行编辑。这样同步到对端后,就是间接曾经荡涤好的数据。

宽表构建

数据同步到数仓、数据湖中时,必然面临的一个问题就是解决多个源端关联表的关系,引入自定义代码,能够很容易的来解决这些关系,生成对端数据源须要的宽表行。在 CloudCanal 平台上,咱们会订阅一张主表,也就是多个关系中的主体,作为驱动表。这张表内的数据行在同步的时候是不残缺的。构建一个写入到对端的宽表行,须要补足其中缺失的列,这些列也就是其余表中的数据。在自定义代码中,用户能够反查本人的数据库取到这些数据,组装成一个残缺的宽表行,返回给 CloudCanal,而后再写入对端。上面咱们以一个具体的 MySQL->ElasticSearch 的同步来解释如何采纳 CloudCanal 自定义代码性能实现宽表行构建。

源端表构造和数据

对端 ES 索引构造以及预期的宽表行数据

其中 Field product_detail 的类型为 nested,保留订单和商品的一对多关系

具体的 JSON Array 信息如下,保留一个订单关联的所有商品的明细信息

{
    "detail": [{
            "product_id": 1,
            "product_name": "洗发液",
            "expire_data": "2025 - 01 - 01"
        },
        {
            "product_id": 2,
            "product_name": "沐浴露",
            "expire_data": "2025 - 01 - 01"
        }
    ]
}

宽表行构建操作步骤

  • ES 索引创立一个订单商品索引,其中商品这个 field 采纳 NESTED 类型
  • CloudCanal 订阅订单表(关系主体)
  • 编写自定义代码,对于收到的订单表的行,取到其中的 order_id 信息,用于反查源端商品表(能够跨实例反查)
  • 对收到的订单表的行,新增一个自定义列,保留关联的所有商品信息(以 json array 存储),生成宽表行
  • 将宽表行写入到 ES,实现数据写入
  • 用户能够从 ES 订单商品索引中检索残缺的蕴含关联商品明细的订单信息。

宽表行的构建过程能够参考下图:

自定义_id 列

CloudCanal 写入 ES 的时候容许用户抉择源端某些列作为对端自定义_id 列。在创立工作数据处理阶段,在上传自定义代码包之后,在对端映射列的下拉选项中能够抉择。对于 ES 而言,雷同_id 上的 doc,再次写入的时候会以 upsert 的形式写入。例如_id= 1 的文档在 ES 中曾经存在,则当一个新的文档 (id=1) 再次写入时,会将对应 field 执行 update 操作。这个个性次要用于确保:从表的更新也能及时反馈到对端的索引上。用户创立 MySQL->ES 的工作时,只须要设置_id 为 join 的关联列。这样后续从表中的字段有更新时,能够依据 join 列在对端索引上进行更新。

只订阅不映射

自定义代码有时候须要依赖源端的列来进行计算,然而实际上不须要同步到对端。这时候能够在创立工作数据处理阶段,

常见问题 1:主表发生变化,被动反查宽表,如果只是宽表字段发生变化,这种状况呢?怎么保护到 es 中对应的索引中?

答:CloudCanal 宽表构建是基于主表触发的,一般而言,关联表也须要及时反馈的指标端有如下形式:新建一个工作,把原来工作反查的表 (从表) 作为主表,全副订阅。创立工作的时候指定自定义_id 列为参加宽表构建的关联列,这样从表的更新都会在对端索引对应的文档上进行 update。

数据汇聚

数据汇聚是以后用户构建数据中台、实时数仓等都会面临的问题。这外面次要波及数据标准化、数据清理等工作。联合自定义代码,用户能够联合本人的数据标准化要求,自在的加工、拼接来自多个源端表内的数据,实现数据会集的工作。

应用须知

  • 新增列的值类型必须能反对转换成 String: 当初不反对用户在自定义代码中指定列类型,以后 CloudCanal 将用户设置的 Object 值,都全副依照 String 解决,写入对端关系型数据库时指定的 java sql type 为 varchar
  • 全量、增量共享一个自定义代码包:自定义代码包是整个工作级别共享的,全量、增量全副会应用自定义的代码包
  • 源端不订阅的列,即便自定义代码中增加了也不会同步:源端订阅的表,如果某些字段没有被订阅,则会退出列黑名单。后续自定义代码中,如果本人再次新增了这个列,留神这个列也不会同步到对端,会被裁剪掉。
  • 用于全量和增量实现不同,全量和增量自定义代码收到的列信息有所区别:全量是在自定义代码之后才进行列过滤,所以在自定义代码中会收到理论没有订阅的列的信息;增量是在自定义代码之前进行列过滤,自定义代码中收到的音讯曾经裁减掉没有订阅的列。
  • 自定义代码解决的一批数据,如果须要写入关系型数据库,则列必须对齐:默认 CloudCanal 采纳 batch 写入,如果一批数据中,有的 x 列,有的 y 列,批量写入不能共用一个 SQL 模板
  • 自定义代码中如果新增行,则不反对 DDL 同步:例如源端执行新增列的 DDL 当前,源端过来的行全副会蕴含新增列的内容。然而自定义代码包中没有感知这个变动,所以新增的行依然应用原来的列数。这就导致一批数据的列不对其,会间接被 CloudCanal 拦挡

最佳实际

  • 数据编辑请应用 SDK 提供的 RecordBuilder 操作,不要本人批改外部对象,可能导致不合乎预期的后果
  • 数据处理的时候不要随便扭转 List 中 Record 的程序,免得乱序引发数据不统一

SDK 应用

顶级接口

SDK 提供的接口比拟简洁,用户在自定义代码中实现 CloudCanalProcessor 接口实现自定义逻辑即可。其中的 process 办法会将 CloudCanal 实时同步的一批数据吐给用户,由用户自定义的解决这些行。

public interface CloudCanalProcessor {List<CustomRecord> process(List<CustomRecord> customRecordList, CustomProcessorContext customProcessorContext);
}

用户自定义解决上下文 CustomProcessContext

顶级接口 process 蕴含一个入参 CustomProcessContext,这个对象中蕴含了一个 Map,保留 CloudCanal 传递个用户自定义处理器的上下文信息。其中的 Key 由 SDK 中的 ContextKey 的实现类指定。

以后反对的 Key 次要如下,容许用户从 context 中间接获取 CloudCanal 帮忙初始化好的源端和指标端的 DataSource。针对关系型数据库,这个对象实际上是一个 DruidDataSource,并且常驻在 CloudCanal 内存中,用户不须要本人 close 改数据源,改数据源会在运行时被重用。

public class RdbContextKey implements ContextKey {
    public static final String SOURCE_DATASOURCE = "srcDataSource";
    public static final String TARGET_DATASOURCE = "dstDataSource";

    public RdbContextKey() {}
}

SDK 反对的数据处理操作

  • 批改行

    • 新增列
    • 删除列
    • 批改已有列的列值
  • 新增行
  • 删除行

外围数据结构与元数据

用户须要解决的数据行对应的外围数据结构是 CustomRecord。其中蕴含的内容次要如下。

    private int opsFlag = 0; // 操作标记位 0 示意不做任何解决,- 1 删除行,1 新增行,2 批改行,用户无需感知
    private Set<String> customAddFields = new LinkedHashSet(); // 记录新增列的列名,外部元数据,用户无需感知
    private CustomRecord.Coordination coordination; // 记录 CustomRecord 关联的外部 Record 的坐标,用户无需感知
    private Map<String, String> recordMetaMap = new LinkedHashMap(); // 保留了数据行的元数据信息,例如音讯来自源端哪个库哪个表
    private Map<String, CustomField> fieldMapAfter = new LinkedHashMap();// 记录变更后的列值
    private Map<String, CustomField> fieldMapBefore = new LinkedHashMap();// 记录变更前的列值

用户在具体应用时只有关怀其中的 recordMetaMap 即可,其中的 key 值由 SDK 提供的类来给出。例如关系型数据库能够获取的 meta 信息均记录在 RdbMetaKeys

public class RdbMetaKeys {
    // 数据行起源的 db
    public static final String DB_NAME = "dbName";
    // 数据行起源的 schema
    public static final String SCHEMA_NAME = "schemaName";
     // 数据行起源的 tableName
    public static final String TABLE_NAME = "tableName";
     // 数据行起源的 action(全量固定为 INSERT, 增量为 INSERT/UPDATE/DELETE)
    public static final String ACTION_NAME = "actionName";

    public RdbMetaKeys() {}
}

数据行编辑器

用户不须要关怀 CustomRecord 外部元数据的治理,应用 SDK 提供的 RecordBuilder 能够实现对数据行的所有操作。RecordBuilder 采纳 Builder 模式实现,反对链式调用。相熟 Lombok 的敌人应该很容易上手。值得注意的是,只有批改行相干的操作(批改列、新增列、删除列)能力 反对链式调用。操作结束后间接返回 CustomRecord 对象。

他的接口定义如下:

    /**
     * 创立一个新的数据行,Map 的 key 为列名,value 为具体的值。*/
    RecordBuilder createRecord(Map<String, Object> fieldValueMap);

    /**
     * 删除以后 RecordBuilder 关联的数据行
     */
    RecordBuilder deleteRecord();


    /**
     * 新增一个列
     */
    RecordBuilder addField(String addFieldName, Object addFieldValue);

    /**
     * 新增多个列
     */
    RecordBuilder addField(Map<String, Object> fieldValueMap);

    /**
     * 删除一个列
     */
    RecordBuilder dropField(String dropFieldName);

    /**
     * 删除多个列
     */
    RecordBuilder dropField(List<String> dropFieldNames);

    /**
     * 更新已有的列的列值
     */
    RecordBuilder updateField(Map<String, Object> fieldValueMap);

新增列的映射关系

在自定义代码中,用户往往会新增列。CloudCanal 默认应用“同名映射规定”。因为新增的列是没有在创立工作的时候指定映射关系的,所以新增的列默认均应用同名映射规定。假如我新增的列为 name,则 CloudCanal 认为该列也应该被写到对端的 name 字段中。因而,在自定义代码解决中,为了确保能正确写入对端,新增列的列名能够应用对端用户本人想要映射的列名,这样会间接写入对端。

具体应用步骤

创立 RecordBuilder 实例

  • 新增 Record 须要应用 RecordBuilder.RecordBuilder.createRecordBuilder():createRecordBuilder 无参数
  • 其余 Record 操作应用 RecordBuilder.modifyRecordBuilder(customRecord): modifyRecordBuilder 办法接管一个参数,传递须要解决的 customRecord

调用 RecordBuilder 实例提供的解决办法

应用 RecordBuilder 提供的办法,能够实现列的增删改、行的新增与删除。

处理完毕后应用 build 办法生成最终的 CustomRecord

应用 RecordBuilder 实现一系列操作后,能够调用 build,生成最终的 CustomRecord 对象

具体的示例能够参考《应用案例》章节提供的示例代码

如何调试自定义代码

倡议部署测试环境和生产环境的 CloudCanal,仅在测试环境开启 debug 模式,调试自定义代码。

代码 debug

  • 先失常创立一个自定义代码解决的同步工作,并且设置不要主动启动
  • 点击性能列表抉择批改参数,设置 debugMode=true,并且设置调试端口 debugPort(端口只容许设置范畴为 8787-8797 之间的端口号)

  • 在 IDE 中以近程 debug 模式启动(咱们以 IDEA 为例)

    • 右上角新增启动配置
    • 抉择 Remote JVM Debug
      - 填 写 ip 地址 (容器所在宿主机 ip) 和端口,近程调试端口为 8787


  • 在 CloudCanal 平台上启动工作
  • 进入自定义代码断点后即可进行调试
  • 【重要!!】调试结束后请从新批改参数 debugMode=false 并重启工作,否则该工作会无奈失常执行

自定义解决代码日志

用户自定义代码中应用如下形式定义 logger 后,能够在指定门路下查看日志。日志门路为:/home/clougence/logs/cloudcanal/tasks/${taskName}/custom_processor.log

    private static final String LOG_NAME     = "custom_processor";
    private static final Logger customLogger = LoggerFactory.getLogger(LOG_NAME);

debug 日志

在工作详情页能够关上参数设置,开启自定义代码的 debug 日志,这样代码解决前后的数据内容会进行残缺的打印,这会占用较多磁盘空间和影响性能,线上环境慎用。日志会打印在工作日志门路下的 custom_process.log 中

如何自定义类名

样例工程中的 resource 目录下的内容是打包自定义解决插件必备的。如果须要批改类名,能够批改 resource/META-INF/cloudcanal/plugin.properties 中的类名,须要应用全限定名称。

创立自定义代码包工作

创立工作

在创立工作的第四步,数据处理,能够抉择配置数据处理插件

而后抉择上传代码包

代码包治理

工作实现创立后,能够在页面治理本人的代码包

Tips:

  • 全量和增量阶段共享一个代码包
  • 激活的代码包重启后才会失效

应用案例

案例一:数据根本解决

本次咱们以 MySQL->MySQL 的数据同步为例,蕴含构造迁徙、全量迁徙和增量实时同步。

筹备的表构造如下:

/* -- 学生表 --*/
CREATE TABLE `student` (`id` int(4) NOT NULL AUTO_INCREMENT COMMENT '学号',
  `name` varchar(20) DEFAULT NULL COMMENT '名字',
  `score` int(3) NOT NULL COMMENT '问题',
  PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=27 DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci COMMENT='学生信息表'

咱们的数据加工需要如下:

批改行

新增列

针对过去的所有数据,都进行新增列的操作。新增的列名字别离为 col_varchar 和 col_int,列值别离为 null 和 9999

批改列

针对来自源端 integration_src 库和源端 student 表的行,才进行批改列的操作。把分数为 0 的同学改为分数 99

删除列

针对来自源端 integration_src 库和源端 student 表的行,才进行删除列的操作。针对这一批的所有数据,删除 name 这一列

新增行

每一批数据过去时,满足业务判断条件,则新增两行新的数据,主键采纳自增 id。新增的两行同时蕴含两个自定义的新增列,然而不蕴含 name 列。

删除行

针对来自源端 integration_src 库和源端 student 表的行,如果分数小于 60 的行则会被删除

源表数据

id name score
1 万少 90
2 update99 0
7 张删 50
8 张删 30
9 万少 10
10 万少 10
11 布丁 0
12 布丁 0
16 万少 0
17 张删 0
25 万少 77
26 need99 0

预期对端同步后的数据

id name score col\_varchar col\_int
1 NULL 90 NULL 9999
2 NULL 99 NULL 9999
11 NULL 99 NULL 9999
12 NULL 99 NULL 9999
16 NULL 99 NULL 9999
17 NULL 99 NULL 9999
25 NULL 77 NULL 9999
26 NULL 99 NULL 9999
1637224973 NULL 100 new\_varchar\_value 9999
1637224975 NULL 100 new\_varchar\_value 9999

示例代码 &SDK 下载

参考示例代码:https://gitee.com/clougence/c…

TIPS: 以后 SDK 间接集成在 demo 工程内,后续会独立进去

用户落地案例

  • CloudCanal 自定义代码助力德勤乐融 MySQL 同步 ES 构建宽表场景
  • CloudCanal 助力万店掌 MySQL

FAQ

  • 传递给自定义代码解决的一批数据的程序怎么?是否来自多个表?答:传递给用户的一批数据会蕴含来自多个表的数据。自定义代码的解决当初是并行化的,也就是源端变更会变成多个批次,由多个并行的用户自定义 processor 解决。尽管是并行化解决的,然而整体上数据行的程序依然是保序的,只是针对每一批的数据处理是并行的。

社区快讯

咱们创立 CloudCanal 微信粉丝群啦,在外面,你能够失去最新版本公布信息和资源链接,你能看到其余用户一手评测、应用状况,你更能失去激情的问题解答,当然你还能够给咱们提需要和问题。快快退出吧。

扫描下方二维码,增加咱们小助手微信拉您进群,接头语(“加 CloudCanal 社区群”)

退出 CloudCanal 粉丝群把握一手音讯和获取更多福利,请增加咱们小助手微信:suhuayue001
CloudCanal- 收费好用的企业级数据同步工具,欢送品鉴。
理解更多产品能够查看官方网站:http://www.clougence.com
CloudCanal 社区:https://www.askcug.com/

正文完
 0