摘要:CDL 是一种简略、高效的数据实时集成服务,可能从各种 OLTP 数据库中抓取 Data Change 事件,而后推送至 Kafka 中,最初由 Sink Connector 生产 Topic 中的数据并导入到大数据生态软件应用中,从而实现数据的实时入湖。
本文分享自华为云社区《华为 FusionInsight MRS CDL 使用指南》,作者:晋红轻。
阐明
CDL 是一种简略、高效的数据实时集成服务,可能从各种 OLTP 数据库中抓取 Data Change 事件,而后推送至 Kafka 中,最初由 Sink Connector 生产 Topic 中的数据并导入到大数据生态软件应用中,从而实现数据的实时入湖。
CDL 服务蕴含了两个重要的角色:CDLConnector 和 CDLService。CDLConnector 是具体执行数据抓取工作的实例,CDLService 是负责管理和创立工作的实例。
本此实际介绍以 mysql 作为数据源进行数据抓取
前提条件
- MRS 集群已装置 CDL 服务。
- MySQL 数据库须要开启 mysql 的 bin log 性能(默认状况下是开启的)。
查看 MySQL 是否开启 bin log:
应用工具或者命令行连贯 MySQL 数据库(本示例应用 navicat 工具连贯),执行 show variables like ‘log_%’ 命令查看。
例如在 navicat 工具抉择 ”File > New Query” 新建查问,输出如下 SQL 命令,单击 ”Run” 在后果中 ”log_bin” 显示为 ”ON” 则示意开启胜利。
show variables like 'log_%'
工具筹备
当初 cdl 只能应用 rest api 的形式进行命令提交,所以须要提前装置工具进行调试。本文应用 VSCode 工具。
实现之后装置 rest client 插件:
实现之后创立一个 cdl.http 的文件进行编辑:
创立 CDL 工作
CDL 工作创立的流程图如下所示:
阐明:须要先创立一个 MySQL link, 在创立一个 Kafka link, 而后再创立一个 CDL 同步工作并启动。
MySQL link 局部 rest 申请代码
@hostname = 172.16.9.113
@port = 21495
@host = {{hostname}}:{{port}}
@bootstrap = "172.16.9.113:21007"
@bootstrap_normal = "172.16.9.113:21005"
@mysql_host = "172.16.2.118"
@mysql_port = "3306"
@mysql_database = "hudi"
@mysql_user = "root"
@mysql_password = "Huawei@123"
### get links
get https://{{host}}/api/v1/cdl/link
### mysql link validate
post https://{{host}}/api/v1/cdl/link?validate=true
content-type: application/json
{
"name": "MySQL_link", //link 名,全局惟一,不能反复
"description":"MySQL connection", //link 形容
"link-type":"mysql", //link 的类型
"enabled":"true",
"link-config-values": {
"inputs": [{ "name": "host", "value": {{mysql_host}} }, // 数据库装置节点的 ip
{"name": "port", "value": {{mysql_port}} },// 数据库监听的端口
{"name": "database.name", "value": {{mysql_database}} }, // 连贯的数据库名
{"name": "user", "value": {{mysql_user}} }, // 用户
{"name": "password","value": {{mysql_password}} } ,// 明码
{"name":"schema", "value": {{mysql_database}}}// 同数据库名
]
}
}
### mysql link create
post https://{{host}}/api/v1/cdl/link
content-type: application/json
{
"name": "MySQL_link", //link 名,全局惟一,不能反复
"description":"MySQL connection", //link 形容
"link-type":"mysql", //link 的类型
"enabled":"true",
"link-config-values": {
"inputs": [{ "name": "host", "value": {{mysql_host}} }, // 数据库装置节点的 ip
{"name": "port", "value": {{mysql_port}} },// 数据库监听的端口
{"name": "database.name", "value": {{mysql_database}} }, // 连贯的数据库名
{"name": "user", "value": {{mysql_user}} }, // 用户
{"name": "password","value": {{mysql_password}} } ,// 明码
{"name":"schema", "value": {{mysql_database}}}// 同数据库名
]
}
}
### mysql link update
put https://{{host}}/api/v1/cdl/link/MySQL_link
content-type: application/json
{
"name": "MySQL_link", //link 名,全局惟一,不能反复
"description":"MySQL connection", //link 形容
"link-type":"mysql", //link 的类型
"enabled":"true",
"link-config-values": {
"inputs": [{ "name": "host", "value": {{mysql_host}} }, // 数据库装置节点的 ip
{"name": "port", "value": {{mysql_port}} },// 数据库监听的端口
{"name": "database.name", "value": {{mysql_database}} }, // 连贯的数据库名
{"name": "user", "value": {{mysql_user}} }, // 用户
{"name": "password","value": {{mysql_password}} } ,// 明码
{"name":"schema", "value": {{mysql_database}}}// 同数据库名
]
}
}
Kafka link 局部 rest 申请代码
### get links
get https://{{host}}/api/v1/cdl/link
### kafka link validate
post https://{{host}}/api/v1/cdl/link?validate=true
content-type: application/json
{
"name": "kafka_link",
"description":"test kafka link",
"link-type":"kafka",
"enabled":"true",
"link-config-values": {
"inputs": [{ "name": "bootstrap.servers", "value": "172.16.9.113:21007"},
{"name": "sasl.kerberos.service.name", "value": "kafka"},
{"name": "security.protocol","value": "SASL_PLAINTEXT"}// 平安模式为 SASL_PLAINTEXT,一般模式为 PLAINTEXT
]
}
}
### kafka link create
post https://{{host}}/api/v1/cdl/link
content-type: application/json
{
"name": "kafka_link",
"description":"test kafka link",
"link-type":"kafka",
"enabled":"true",
"link-config-values": {
"inputs": [{ "name": "bootstrap.servers", "value": "172.16.9.113:21007"},
{"name": "sasl.kerberos.service.name", "value": "kafka"},
{"name": "security.protocol","value": "SASL_PLAINTEXT"}// 平安模式为 SASL_PLAINTEXT,一般模式为 PLAINTEXT
]
}
}
### kafka link update
put https://{{host}}/api/v1/cdl/link/kafka_link
content-type: application/json
{
"name": "kafka_link",
"description":"test kafka link",
"link-type":"kafka",
"enabled":"true",
"link-config-values": {
"inputs": [{ "name": "bootstrap.servers", "value": "172.16.9.113:21007"},
{"name": "sasl.kerberos.service.name", "value": "kafka"},
{"name": "security.protocol","value": "SASL_PLAINTEXT"}// 平安模式为 SASL_PLAINTEXT,一般模式为 PLAINTEXT
]
}
}
CDL 工作命令局部 rest 申请代码
@hostname = 172.16.9.113
@port = 21495
@host = {{hostname}}:{{port}}
@bootstrap = "172.16.9.113:21007"
@bootstrap_normal = "172.16.9.113:21005"
@mysql_host = "172.16.2.118"
@mysql_port = "3306"
@mysql_database = "hudi"
@mysql_user = "root"
@mysql_password = "Huawei@123"
### create job
post https://{{host}}/api/v1/cdl/job
content-type: application/json
{
"job_type": "CDL_JOB", //job 类型, 目前只反对 CDL_JOB 这一种
"name": "mysql_to_kafka", //job 名称
"description":"mysql_to_kafka", //job 形容
"from-link-name": "MySQL_link", // 数据源 Link
"to-link-name": "kafka_link", // 指标源 Link
"from-config-values": {
"inputs": [{"name" : "connector.class", "value" : "com.huawei.cdc.connect.mysql.MysqlSourceConnector"},
{"name" : "schema", "value" : "hudi"},
{"name" : "db.name.alias", "value" : "hudi"},
{"name" : "whitelist", "value" : "hudisource"},
{"name" : "tables", "value" : "hudisource"},
{"name" : "tasks.max", "value" : "10"},
{"name" : "mode", "value" : "insert,update,delete"},
{"name" : "parse.dml.data", "value" : "true"},
{"name" : "schema.auto.creation", "value" : "false"},
{"name" : "errors.tolerance", "value" : "all"},
{"name" : "multiple.topic.partitions.enable", "value" : "false"},
{"name" : "topic.table.mapping", "value" : "[{\"topicName\":\"huditableout\", \"tableName\":\"hudisource\"}
]"
},
{"name" : "producer.override.security.protocol", "value" : "SASL_PLAINTEXT"},// 平安模式为 SASL_PLAINTEXT,一般模式为 PLAINTEXT
{"name" : "consumer.override.security.protocol", "value" : "SASL_PLAINTEXT"}// 平安模式为 SASL_PLAINTEXT,一般模式为 PLAINTEXT
]
},
"to-config-values": {"inputs": []},
"job-config-values": {
"inputs": [{"name" : "global.topic", "value" : "demo"}
]
}
}
### get all job
get https://{{host}}/api/v1/cdl/job
### submit job
put https://{{host}}/api/v1/cdl/job/mysql_to_kafka/start
### get job status
get https://{{host}}/api/v1/cdl/submissions?jobName=mysql_to_kafka
### stop job
put https://{{host}}/api/v1/cdl/job/mysql_to_kafka/submissions/13/stop
### delete job
DELETE https://{{host}}/api/v1/cdl/job/mysql_to_kafka
场景验证
生产库 MySQL 原始数据如下:
提交 CDL 工作之后
减少操作: insert into hudi.hudisource values (11,“蒋语堂”,38,“女”,“图”,“播放器”,28732);
对应 kafka 音讯体:
更改操作: UPDATE hudi.hudisource SET uname=‘Anne Marie333’WHERE uid=11;
对应 kafka 音讯体:
删除操作:delete from hudi.hudisource where uid=11;
对应 kafka 音讯体:
点击关注,第一工夫理解华为云陈腐技术~