乐趣区

关于集成:一文讲清楚FusionInsight-MRS-CDL如何使用

摘要:CDL 是一种简略、高效的数据实时集成服务,可能从各种 OLTP 数据库中抓取 Data Change 事件,而后推送至 Kafka 中,最初由 Sink Connector 生产 Topic 中的数据并导入到大数据生态软件应用中,从而实现数据的实时入湖。

本文分享自华为云社区《华为 FusionInsight MRS CDL 使用指南》,作者:晋红轻。

阐明

CDL 是一种简略、高效的数据实时集成服务,可能从各种 OLTP 数据库中抓取 Data Change 事件,而后推送至 Kafka 中,最初由 Sink Connector 生产 Topic 中的数据并导入到大数据生态软件应用中,从而实现数据的实时入湖。

CDL 服务蕴含了两个重要的角色:CDLConnector 和 CDLService。CDLConnector 是具体执行数据抓取工作的实例,CDLService 是负责管理和创立工作的实例。

本此实际介绍以 mysql 作为数据源进行数据抓取

前提条件

  • MRS 集群已装置 CDL 服务。
  • MySQL 数据库须要开启 mysql 的 bin log 性能(默认状况下是开启的)。

查看 MySQL 是否开启 bin log:

应用工具或者命令行连贯 MySQL 数据库(本示例应用 navicat 工具连贯),执行 show variables like ‘log_%’ 命令查看。

例如在 navicat 工具抉择 ”File > New Query” 新建查问,输出如下 SQL 命令,单击 ”Run” 在后果中 ”log_bin” 显示为 ”ON” 则示意开启胜利。

show variables like 'log_%'

工具筹备

当初 cdl 只能应用 rest api 的形式进行命令提交,所以须要提前装置工具进行调试。本文应用 VSCode 工具。

实现之后装置 rest client 插件:

实现之后创立一个 cdl.http 的文件进行编辑:

创立 CDL 工作

CDL 工作创立的流程图如下所示:

阐明:须要先创立一个 MySQL link, 在创立一个 Kafka link, 而后再创立一个 CDL 同步工作并启动。

MySQL link 局部 rest 申请代码

@hostname = 172.16.9.113
@port = 21495
@host = {{hostname}}:{{port}}
@bootstrap = "172.16.9.113:21007"
@bootstrap_normal = "172.16.9.113:21005"
@mysql_host = "172.16.2.118"
@mysql_port = "3306"
@mysql_database = "hudi"
@mysql_user = "root"
@mysql_password = "Huawei@123"

### get links
get https://{{host}}/api/v1/cdl/link

### mysql link validate

post https://{{host}}/api/v1/cdl/link?validate=true
content-type: application/json

{
"name": "MySQL_link", //link 名,全局惟一,不能反复
"description":"MySQL connection", //link 形容
"link-type":"mysql", //link 的类型
"enabled":"true",
"link-config-values":  {
"inputs": [{ "name": "host", "value": {{mysql_host}} }, // 数据库装置节点的 ip
        {"name": "port", "value": {{mysql_port}} },// 数据库监听的端口
        {"name": "database.name", "value": {{mysql_database}} }, // 连贯的数据库名
        {"name": "user", "value": {{mysql_user}} }, // 用户
        {"name": "password","value": {{mysql_password}} } ,// 明码
        {"name":"schema", "value": {{mysql_database}}}// 同数据库名
        ]
    }
}

### mysql link create

post https://{{host}}/api/v1/cdl/link
content-type: application/json

{
"name": "MySQL_link", //link 名,全局惟一,不能反复
"description":"MySQL connection", //link 形容
"link-type":"mysql", //link 的类型
"enabled":"true",
"link-config-values":  {
"inputs": [{ "name": "host", "value": {{mysql_host}} }, // 数据库装置节点的 ip
        {"name": "port", "value": {{mysql_port}} },// 数据库监听的端口
        {"name": "database.name", "value": {{mysql_database}} }, // 连贯的数据库名
        {"name": "user", "value": {{mysql_user}} }, // 用户
        {"name": "password","value": {{mysql_password}} } ,// 明码
        {"name":"schema", "value": {{mysql_database}}}// 同数据库名
        ]
    }
}

### mysql link update

put https://{{host}}/api/v1/cdl/link/MySQL_link
content-type: application/json

{
"name": "MySQL_link", //link 名,全局惟一,不能反复
"description":"MySQL connection", //link 形容
"link-type":"mysql", //link 的类型
"enabled":"true",
"link-config-values":  {
"inputs": [{ "name": "host", "value": {{mysql_host}} }, // 数据库装置节点的 ip
        {"name": "port", "value": {{mysql_port}} },// 数据库监听的端口
        {"name": "database.name", "value": {{mysql_database}} }, // 连贯的数据库名
        {"name": "user", "value": {{mysql_user}} }, // 用户
        {"name": "password","value": {{mysql_password}} } ,// 明码
        {"name":"schema", "value": {{mysql_database}}}// 同数据库名
        ]
    }
}

Kafka link 局部 rest 申请代码

### get links
get https://{{host}}/api/v1/cdl/link

### kafka link validate

post https://{{host}}/api/v1/cdl/link?validate=true
content-type: application/json

{
"name": "kafka_link",
"description":"test kafka link",
"link-type":"kafka",
"enabled":"true",
"link-config-values":  {
"inputs": [{ "name": "bootstrap.servers", "value": "172.16.9.113:21007"},
        {"name": "sasl.kerberos.service.name", "value": "kafka"},
        {"name": "security.protocol","value": "SASL_PLAINTEXT"}// 平安模式为 SASL_PLAINTEXT,一般模式为 PLAINTEXT
        ]
    }
}

### kafka link create

post https://{{host}}/api/v1/cdl/link
content-type: application/json

{
"name": "kafka_link",
"description":"test kafka link",
"link-type":"kafka",
"enabled":"true",
"link-config-values":  {
"inputs": [{ "name": "bootstrap.servers", "value": "172.16.9.113:21007"},
        {"name": "sasl.kerberos.service.name", "value": "kafka"},
        {"name": "security.protocol","value": "SASL_PLAINTEXT"}// 平安模式为 SASL_PLAINTEXT,一般模式为 PLAINTEXT
        ]
    }
}

### kafka link update

put https://{{host}}/api/v1/cdl/link/kafka_link
content-type: application/json

{
"name": "kafka_link",
"description":"test kafka link",
"link-type":"kafka",
"enabled":"true",
"link-config-values":  {
"inputs": [{ "name": "bootstrap.servers", "value": "172.16.9.113:21007"},
        {"name": "sasl.kerberos.service.name", "value": "kafka"},
        {"name": "security.protocol","value": "SASL_PLAINTEXT"}// 平安模式为 SASL_PLAINTEXT,一般模式为 PLAINTEXT
        ]
    }
}

CDL 工作命令局部 rest 申请代码

@hostname = 172.16.9.113
@port = 21495
@host = {{hostname}}:{{port}}
@bootstrap = "172.16.9.113:21007"
@bootstrap_normal = "172.16.9.113:21005"
@mysql_host = "172.16.2.118"
@mysql_port = "3306"
@mysql_database = "hudi"
@mysql_user = "root"
@mysql_password = "Huawei@123"

### create job
post https://{{host}}/api/v1/cdl/job
content-type: application/json

{
    "job_type": "CDL_JOB", //job 类型, 目前只反对 CDL_JOB 这一种
    "name": "mysql_to_kafka", //job 名称
    "description":"mysql_to_kafka", //job 形容
    "from-link-name": "MySQL_link",  // 数据源 Link
    "to-link-name": "kafka_link", // 指标源 Link
    "from-config-values": {
        "inputs": [{"name" : "connector.class", "value" : "com.huawei.cdc.connect.mysql.MysqlSourceConnector"},
            {"name" : "schema", "value" : "hudi"},
            {"name" : "db.name.alias", "value" : "hudi"},
            {"name" : "whitelist", "value" : "hudisource"},
            {"name" : "tables", "value" : "hudisource"},
            {"name" : "tasks.max", "value" : "10"},
            {"name" : "mode", "value" : "insert,update,delete"},
            {"name" : "parse.dml.data", "value" : "true"},
            {"name" : "schema.auto.creation", "value" : "false"},
            {"name" : "errors.tolerance", "value" : "all"},
            {"name" : "multiple.topic.partitions.enable", "value" : "false"},
            {"name" : "topic.table.mapping", "value" : "[{\"topicName\":\"huditableout\", \"tableName\":\"hudisource\"}
                ]"
            },
              {"name" : "producer.override.security.protocol", "value" : "SASL_PLAINTEXT"},// 平安模式为 SASL_PLAINTEXT,一般模式为 PLAINTEXT
            {"name" : "consumer.override.security.protocol", "value" : "SASL_PLAINTEXT"}// 平安模式为 SASL_PLAINTEXT,一般模式为 PLAINTEXT
        ]
    },
    "to-config-values": {"inputs": []},
    "job-config-values": {
        "inputs": [{"name" : "global.topic", "value" : "demo"}
        ]
    }
}

### get all job
get https://{{host}}/api/v1/cdl/job
### submit job
put https://{{host}}/api/v1/cdl/job/mysql_to_kafka/start
### get job status
get https://{{host}}/api/v1/cdl/submissions?jobName=mysql_to_kafka
### stop job
put https://{{host}}/api/v1/cdl/job/mysql_to_kafka/submissions/13/stop
### delete job
DELETE https://{{host}}/api/v1/cdl/job/mysql_to_kafka

场景验证

生产库 MySQL 原始数据如下:

提交 CDL 工作之后

减少操作: insert into hudi.hudisource values (11,“蒋语堂”,38,“女”,“图”,“播放器”,28732);

对应 kafka 音讯体:

更改操作: UPDATE hudi.hudisource SET uname=‘Anne Marie333’WHERE uid=11;

对应 kafka 音讯体:

删除操作:delete from hudi.hudisource where uid=11;

对应 kafka 音讯体:

点击关注,第一工夫理解华为云陈腐技术~

退出移动版