简介

CloudCanal 2.0中咱们将迎来一项重磅更新——自定义代码实时加工能力。自定义代码实时加工容许用户应用Java语言编写自定义的数据行解决逻辑,而后将代码jar包上传CloudCanal平台后,数据同步工作在执行全量、增量时会主动利用用户的自定义解决逻辑,而后再写入对端数据源。

利用场景

自定义代码实时加工是一种非常灵活的实时数据加工伎俩,在自定义代码中用户能够进行跨实例查问、微服务调用、缓存查问等各种操作,而后对实时接管到的数据行进行编辑。数据编辑反对用户自定义新增行、批改行、删除行。其中批改行反对用户新增列、批改列、删除列。自定义代码实时加工可用于以下场景:

数据荡涤

构建实时数仓、数据湖以及进行数据治理时都须要对数据进行荡涤,波及数据过滤、加工、标准化。在这个过程中,用户能够上传自定义代码,引入本人的一些企业外部数据标准化解决的二方包或者调用一些微服务或者反查数据库对收到的实时数据进行编辑。这样同步到对端后,就是间接曾经荡涤好的数据。

宽表构建

数据同步到数仓、数据湖中时,必然面临的一个问题就是解决多个源端关联表的关系,引入自定义代码,能够很容易的来解决这些关系,生成对端数据源须要的宽表行。在CloudCanal平台上,咱们会订阅一张主表,也就是多个关系中的主体,作为驱动表。这张表内的数据行在同步的时候是不残缺的。构建一个写入到对端的宽表行,须要补足其中缺失的列,这些列也就是其余表中的数据。在自定义代码中,用户能够反查本人的数据库取到这些数据,组装成一个残缺的宽表行,返回给CloudCanal,而后再写入对端。上面咱们以一个具体的MySQL->ElasticSearch的同步来解释如何采纳CloudCanal自定义代码性能实现宽表行构建。

源端表构造和数据

对端ES索引构造以及预期的宽表行数据

其中Field product_detail的类型为nested,保留订单和商品的一对多关系

具体的JSON Array信息如下,保留一个订单关联的所有商品的明细信息

{    "detail": [{            "product_id": 1,            "product_name": "洗发液",            "expire_data": "2025 - 01 - 01"        },        {            "product_id": 2,            "product_name": "沐浴露",            "expire_data": "2025 - 01 - 01"        }    ]}

宽表行构建操作步骤

  • ES索引创立一个订单商品索引,其中商品这个field采纳NESTED类型
  • CloudCanal订阅订单表(关系主体)
  • 编写自定义代码,对于收到的订单表的行,取到其中的order_id信息,用于反查源端商品表(能够跨实例反查)
  • 对收到的订单表的行,新增一个自定义列,保留关联的所有商品信息(以json array存储),生成宽表行
  • 将宽表行写入到ES,实现数据写入
  • 用户能够从ES订单商品索引中检索残缺的蕴含关联商品明细的订单信息。

宽表行的构建过程能够参考下图:

自定义_id列

CloudCanal写入ES的时候容许用户抉择源端某些列作为对端自定义_id列。在创立工作数据处理阶段,在上传自定义代码包之后,在对端映射列的下拉选项中能够抉择。对于ES而言,雷同_id上的doc,再次写入的时候会以upsert的形式写入。例如_id=1的文档在ES中曾经存在,则当一个新的文档(id=1)再次写入时,会将对应field执行update操作。这个个性次要用于确保:从表的更新也能及时反馈到对端的索引上。用户创立MySQL->ES的工作时,只须要设置_id为join的关联列。这样后续从表中的字段有更新时,能够依据join列在对端索引上进行更新。

只订阅不映射

自定义代码有时候须要依赖源端的列来进行计算,然而实际上不须要同步到对端。这时候能够在创立工作数据处理阶段,

常见问题1:主表发生变化,被动反查宽表,如果只是宽表字段发生变化,这种状况呢?怎么保护到es中对应的索引中?

答:CloudCanal宽表构建是基于主表触发的,一般而言,关联表也须要及时反馈的指标端有如下形式:新建一个工作,把原来工作反查的表(从表)作为主表,全副订阅。创立工作的时候指定自定义_id列为参加宽表构建的关联列,这样从表的更新都会在对端索引对应的文档上进行update。

数据汇聚

数据汇聚是以后用户构建数据中台、实时数仓等都会面临的问题。这外面次要波及数据标准化、数据清理等工作。联合自定义代码,用户能够联合本人的数据标准化要求,自在的加工、拼接来自多个源端表内的数据,实现数据会集的工作。

应用须知

  • 新增列的值类型必须能反对转换成String: 当初不反对用户在自定义代码中指定列类型,以后CloudCanal将用户设置的Object值,都全副依照String解决,写入对端关系型数据库时指定的java sql type为varchar
  • 全量、增量共享一个自定义代码包: 自定义代码包是整个工作级别共享的,全量、增量全副会应用自定义的代码包
  • 源端不订阅的列,即便自定义代码中增加了也不会同步:源端订阅的表,如果某些字段没有被订阅,则会退出列黑名单。后续自定义代码中,如果本人再次新增了这个列,留神这个列也不会同步到对端,会被裁剪掉。
  • 用于全量和增量实现不同,全量和增量自定义代码收到的列信息有所区别:全量是在自定义代码之后才进行列过滤,所以在自定义代码中会收到理论没有订阅的列的信息;增量是在自定义代码之前进行列过滤,自定义代码中收到的音讯曾经裁减掉没有订阅的列。
  • 自定义代码解决的一批数据,如果须要写入关系型数据库,则列必须对齐:默认CloudCanal采纳batch写入,如果一批数据中,有的x列,有的y列,批量写入不能共用一个SQL模板
  • 自定义代码中如果新增行,则不反对DDL同步:例如源端执行新增列的DDL当前,源端过来的行全副会蕴含新增列的内容。然而自定义代码包中没有感知这个变动,所以新增的行依然应用原来的列数。这就导致一批数据的列不对其,会间接被CloudCanal拦挡

最佳实际

  • 数据编辑请应用SDK提供的RecordBuilder操作,不要本人批改外部对象,可能导致不合乎预期的后果
  • 数据处理的时候不要随便扭转List中Record的程序,免得乱序引发数据不统一

SDK应用

顶级接口

SDK提供的接口比拟简洁,用户在自定义代码中实现CloudCanalProcessor接口实现自定义逻辑即可。其中的process办法会将CloudCanal实时同步的一批数据吐给用户,由用户自定义的解决这些行。

public interface CloudCanalProcessor {    List<CustomRecord> process(List<CustomRecord> customRecordList, CustomProcessorContext customProcessorContext);}

用户自定义解决上下文CustomProcessContext

顶级接口process蕴含一个入参CustomProcessContext,这个对象中蕴含了一个Map,保留CloudCanal传递个用户自定义处理器的上下文信息。其中的Key由SDK中的ContextKey的实现类指定。

以后反对的Key次要如下,容许用户从context中间接获取CloudCanal帮忙初始化好的源端和指标端的DataSource。针对关系型数据库,这个对象实际上是一个DruidDataSource,并且常驻在CloudCanal内存中,用户不须要本人close改数据源,改数据源会在运行时被重用。

public class RdbContextKey implements ContextKey {    public static final String SOURCE_DATASOURCE = "srcDataSource";    public static final String TARGET_DATASOURCE = "dstDataSource";    public RdbContextKey() {    }}

SDK反对的数据处理操作

  • 批改行

    • 新增列
    • 删除列
    • 批改已有列的列值
  • 新增行
  • 删除行

外围数据结构与元数据

用户须要解决的数据行对应的外围数据结构是CustomRecord。其中蕴含的内容次要如下。

    private int opsFlag = 0; //操作标记位0示意不做任何解决,-1删除行,1新增行,2批改行,用户无需感知    private Set<String> customAddFields = new LinkedHashSet(); //记录新增列的列名,外部元数据,用户无需感知    private CustomRecord.Coordination coordination; //记录CustomRecord关联的外部Record的坐标,用户无需感知    private Map<String, String> recordMetaMap = new LinkedHashMap(); // 保留了数据行的元数据信息,例如音讯来自源端哪个库哪个表    private Map<String, CustomField> fieldMapAfter = new LinkedHashMap();//记录变更后的列值    private Map<String, CustomField> fieldMapBefore = new LinkedHashMap();//记录变更前的列值

用户在具体应用时只有关怀其中的recordMetaMap即可,其中的key值由SDK提供的类来给出。例如关系型数据库能够获取的meta信息均记录在RdbMetaKeys

public class RdbMetaKeys {    //数据行起源的db    public static final String DB_NAME = "dbName";    //数据行起源的schema    public static final String SCHEMA_NAME = "schemaName";     //数据行起源的tableName    public static final String TABLE_NAME = "tableName";     //数据行起源的action(全量固定为INSERT,增量为INSERT/UPDATE/DELETE)    public static final String ACTION_NAME = "actionName";    public RdbMetaKeys() {    }}

数据行编辑器

用户不须要关怀CustomRecord外部元数据的治理,应用SDK提供的RecordBuilder能够实现对数据行的所有操作。RecordBuilder采纳Builder模式实现,反对链式调用。相熟Lombok的敌人应该很容易上手。值得注意的是,只有批改行相干的操作(批改列、新增列、删除列)能力 反对链式调用。操作结束后间接返回CustomRecord对象。

他的接口定义如下:

    /**     * 创立一个新的数据行,Map的key为列名,value为具体的值。     */    RecordBuilder createRecord(Map<String, Object> fieldValueMap);    /**     * 删除以后RecordBuilder关联的数据行     */    RecordBuilder deleteRecord();    /**     * 新增一个列     */    RecordBuilder addField(String addFieldName, Object addFieldValue);    /**     * 新增多个列     */    RecordBuilder addField( Map<String, Object> fieldValueMap);    /**     * 删除一个列     */    RecordBuilder dropField(String dropFieldName);    /**     * 删除多个列     */    RecordBuilder dropField( List<String> dropFieldNames);    /**     * 更新已有的列的列值     */    RecordBuilder updateField(Map<String, Object> fieldValueMap);

新增列的映射关系

在自定义代码中,用户往往会新增列。CloudCanal默认应用“同名映射规定”。因为新增的列是没有在创立工作的时候指定映射关系的,所以新增的列默认均应用同名映射规定。假如我新增的列为name,则CloudCanal认为该列也应该被写到对端的name字段中。因而,在自定义代码解决中,为了确保能正确写入对端,新增列的列名能够应用对端用户本人想要映射的列名,这样会间接写入对端。

具体应用步骤

创立RecordBuilder实例

  • 新增Record须要应用RecordBuilder.RecordBuilder.createRecordBuilder(): createRecordBuilder 无参数
  • 其余Record操作应用RecordBuilder.modifyRecordBuilder(customRecord): modifyRecordBuilder 办法接管一个参数,传递须要解决的customRecord

调用RecordBuilder实例提供的解决办法

应用RecordBuilder提供的办法,能够实现列的增删改、行的新增与删除。

处理完毕后应用build办法生成最终的CustomRecord

应用RecordBuilder实现一系列操作后,能够调用build,生成最终的CustomRecord对象

具体的示例能够参考《应用案例》章节提供的示例代码

如何调试自定义代码

倡议部署测试环境和生产环境的CloudCanal,仅在测试环境开启debug模式,调试自定义代码。

代码debug

  • 先失常创立一个自定义代码解决的同步工作,并且设置不要主动启动
  • 点击性能列表抉择批改参数,设置debugMode=true,并且设置调试端口debugPort(端口只容许设置范畴为8787-8797之间的端口号)

  • 在IDE中以近程debug模式启动(咱们以IDEA为例)

    • 右上角新增启动配置
    • 抉择Remote JVM Debug
      -填 写ip地址(容器所在宿主机ip)和端口,近程调试端口为8787


  • 在CloudCanal平台上启动工作
  • 进入自定义代码断点后即可进行调试
  • 【重要!!】调试结束后请从新批改参数debugMode=false并重启工作,否则该工作会无奈失常执行

自定义解决代码日志

用户自定义代码中应用如下形式定义logger后,能够在指定门路下查看日志。日志门路为:/home/clougence/logs/cloudcanal/tasks/${taskName}/custom_processor.log

    private static final String LOG_NAME     = "custom_processor";    private static final Logger customLogger = LoggerFactory.getLogger(LOG_NAME);

debug日志

在工作详情页能够关上参数设置,开启自定义代码的debug日志,这样代码解决前后的数据内容会进行残缺的打印,这会占用较多磁盘空间和影响性能,线上环境慎用。日志会打印在工作日志门路下的custom_process.log中

如何自定义类名

样例工程中的resource目录下的内容是打包自定义解决插件必备的。如果须要批改类名,能够批改resource/META-INF/cloudcanal/plugin.properties中的类名,须要应用全限定名称。

创立自定义代码包工作

创立工作

在创立工作的第四步,数据处理,能够抉择配置数据处理插件

而后抉择上传代码包

代码包治理

工作实现创立后,能够在页面治理本人的代码包

Tips:

  • 全量和增量阶段共享一个代码包
  • 激活的代码包重启后才会失效

应用案例

案例一:数据根本解决

本次咱们以MySQL->MySQL的数据同步为例,蕴含构造迁徙、全量迁徙和增量实时同步。

筹备的表构造如下:

/* -- 学生表 --*/CREATE TABLE `student` (  `id` int(4) NOT NULL AUTO_INCREMENT COMMENT '学号',  `name` varchar(20) DEFAULT NULL COMMENT '名字',  `score` int(3) NOT NULL COMMENT '问题',  PRIMARY KEY (`id`)) ENGINE=InnoDB AUTO_INCREMENT=27 DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci COMMENT='学生信息表'

咱们的数据加工需要如下:

批改行

新增列

针对过去的所有数据,都进行新增列的操作。新增的列名字别离为col_varchar和col_int,列值别离为null和9999

批改列

针对来自源端integration_src库和源端student表的行,才进行批改列的操作。把分数为0的同学改为分数99

删除列

针对来自源端integration_src库和源端student表的行,才进行删除列的操作。针对这一批的所有数据,删除name这一列

新增行

每一批数据过去时,满足业务判断条件,则新增两行新的数据,主键采纳自增id。新增的两行同时蕴含两个自定义的新增列,然而不蕴含name列。

删除行

针对来自源端integration_src库和源端student表的行,如果分数小于60的行则会被删除

源表数据

idnamescore
1万少90
2update990
7张删50
8张删30
9万少10
10万少10
11布丁0
12布丁0
16万少0
17张删0
25万少77
26need990

预期对端同步后的数据

idnamescorecol\_varcharcol\_int
1NULL90NULL9999
2NULL99NULL9999
11NULL99NULL9999
12NULL99NULL9999
16NULL99NULL9999
17NULL99NULL9999
25NULL77NULL9999
26NULL99NULL9999
1637224973NULL100new\_varchar\_value9999
1637224975NULL100new\_varchar\_value9999

示例代码&SDK下载

参考示例代码:https://gitee.com/clougence/c...

TIPS: 以后SDK间接集成在demo工程内,后续会独立进去

用户落地案例

  • CloudCanal自定义代码助力德勤乐融MySQL同步ES构建宽表场景
  • CloudCanal助力万店掌MySQL

FAQ

  • 传递给自定义代码解决的一批数据的程序怎么?是否来自多个表?答:传递给用户的一批数据会蕴含来自多个表的数据。自定义代码的解决当初是并行化的,也就是源端变更会变成多个批次,由多个并行的用户自定义processor解决。尽管是并行化解决的,然而整体上数据行的程序依然是保序的,只是针对每一批的数据处理是并行的。

社区快讯

咱们创立 CloudCanal 微信粉丝群啦,在外面,你能够失去最新版本公布信息和资源链接,你能看到其余用户一手评测、应用状况,你更能失去激情的问题解答,当然你还能够给咱们提需要和问题。快快退出吧。

扫描下方二维码,增加咱们小助手微信拉您进群,接头语(“加 CloudCanal 社区群”)

退出CloudCanal粉丝群把握一手音讯和获取更多福利,请增加咱们小助手微信:suhuayue001
CloudCanal-收费好用的企业级数据同步工具,欢送品鉴。
理解更多产品能够查看官方网站: http://www.clougence.com
CloudCanal社区:https://www.askcug.com/