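Summary: CDL is a simple, efficient real-time data integration service. It captures data change events from various OLTP databases and pushes them to Kafka. A sink connector then consumes the data in the topic and imports it into applications in the big data ecosystem, so that data enters the data lake in real time.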

This article is shared from the Huawei Cloud community post "Huawei FusionInsight MRS CDL User Guide", by 晋红轻.

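Overview

CDL is a simple, efficient real-time data integration service. It captures data change events from various OLTP databases and pushes them to Kafka. A sink connector then consumes the data in the topic and imports it into applications in the big data ecosystem, so that data enters the data lake in real time.

The CDL service includes two key roles: CDLConnector and CDLService. CDLConnector is the instance that actually runs data capture jobs, and CDLService is the instance that creates and manages jobs.

This walkthrough uses MySQL as the data source for data capture.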

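Prerequisites

  • The CDL service is installed in the MRS cluster.
  • The binlog feature must be enabled on the MySQL database (it is enabled by default as of MySQL 8.0; earlier versions need it turned on explicitly).

Check whether MySQL has the binlog enabled:

Connect to the MySQL database with a tool or from the command line (this example uses Navicat) and run the show variables like 'log_%' command.

For example, in Navicat choose "File > New Query" to create a query, enter the following SQL command, and click "Run". If "log_bin" shows "ON" in the result, the binlog is enabled.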

show variables like 'log_%'
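The same check can be scripted. The sketch below is only an illustration: it assumes the rows returned by show variables like 'log_%' have already been fetched as (name, value) pairs with whatever MySQL client you use, and `binlog_enabled` is a hypothetical helper name, not part of MySQL or CDL.

```python
from typing import Iterable, Tuple


def binlog_enabled(rows: Iterable[Tuple[str, str]]) -> bool:
    """Return True if the log_bin variable is reported as ON.

    `rows` is assumed to hold the (Variable_name, Value) pairs
    returned by: show variables like 'log_%'
    """
    for name, value in rows:
        if name.lower() == "log_bin":
            return value.strip().upper() == "ON"
    # log_bin is missing from the result: treat the binlog as not enabled.
    return False


# Illustrative rows only; a real result contains more log_* variables.
sample_rows = [("log_bin", "ON"), ("log_error", "stderr")]
print(binlog_enabled(sample_rows))
```

Matching on the exact variable name matters because the same query also returns related variables such as log_bin_basename, whose values are paths rather than ON/OFF.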

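Tool preparation

Currently CDL only accepts commands through its REST API, so a tool for sending and debugging requests needs to be installed in advance. This article uses VS Code.

After installing VS Code, install the REST Client extension:

Then create a file named cdl.http for editing: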

Creating a CDL job

The flow for creating a CDL job is shown below:

Note: first create a MySQL link, then create a Kafka link, and then create a CDL synchronization job and start it.
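The order of these steps can be written down as a small plan. This is a minimal sketch rather than a client: it only lists the REST calls in sequence, using the endpoint paths that appear in the requests in this article. `creation_plan` and `API_PREFIX` are names introduced here for illustration.

```python
from typing import List, Tuple

# Path prefix used by the CDL requests shown in this article.
API_PREFIX = "/api/v1/cdl"


def creation_plan(source_link: str, target_link: str, job: str) -> List[Tuple[str, str, str]]:
    """List (method, path, purpose) for each step, in the required order."""
    return [
        ("POST", f"{API_PREFIX}/link", f"create source link {source_link}"),
        ("POST", f"{API_PREFIX}/link", f"create target link {target_link}"),
        ("POST", f"{API_PREFIX}/job", f"create job {job}"),
        ("PUT", f"{API_PREFIX}/job/{job}/start", f"start job {job}"),
    ]


for method, path, purpose in creation_plan("MySQL_link", "kafka_link", "mysql_to_kafka"):
    print(f"{method:4} {path:40} # {purpose}")
```

Both links have to exist before the job is created, because the job refers to them by name through from-link-name and to-link-name.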

REST request code for the MySQL link

@hostname = 172.16.9.113
@port = 21495
@host = {{hostname}}:{{port}}
@bootstrap = "172.16.9.113:21007"
@bootstrap_normal = "172.16.9.113:21005"
@mysql_host = "172.16.2.118"
@mysql_port = "3306"
@mysql_database = "hudi"
@mysql_user = "root"
@mysql_password = "Huawei@123"

### get links
GET https://{{host}}/api/v1/cdl/link

### mysql link validate
# Fields in the request body (comments are kept out of the JSON so the body stays valid):
#   name: link name, globally unique, must not be duplicated
#   description: link description
#   link-type: type of the link
#   host: IP of the node where the database is installed
#   port: port the database listens on
#   database.name: name of the database to connect to
#   user / password: database user and password
#   schema: same as the database name
POST https://{{host}}/api/v1/cdl/link?validate=true
Content-Type: application/json

{
  "name": "MySQL_link",
  "description": "MySQL connection",
  "link-type": "mysql",
  "enabled": "true",
  "link-config-values": {
    "inputs": [
      { "name": "host", "value": {{mysql_host}} },
      { "name": "port", "value": {{mysql_port}} },
      { "name": "database.name", "value": {{mysql_database}} },
      { "name": "user", "value": {{mysql_user}} },
      { "name": "password", "value": {{mysql_password}} },
      { "name": "schema", "value": {{mysql_database}} }
    ]
  }
}

### mysql link create
# Same request body as the validate request.
POST https://{{host}}/api/v1/cdl/link
Content-Type: application/json

{
  "name": "MySQL_link",
  "description": "MySQL connection",
  "link-type": "mysql",
  "enabled": "true",
  "link-config-values": {
    "inputs": [
      { "name": "host", "value": {{mysql_host}} },
      { "name": "port", "value": {{mysql_port}} },
      { "name": "database.name", "value": {{mysql_database}} },
      { "name": "user", "value": {{mysql_user}} },
      { "name": "password", "value": {{mysql_password}} },
      { "name": "schema", "value": {{mysql_database}} }
    ]
  }
}

### mysql link update
# Same request body as the validate request.
PUT https://{{host}}/api/v1/cdl/link/MySQL_link
Content-Type: application/json

{
  "name": "MySQL_link",
  "description": "MySQL connection",
  "link-type": "mysql",
  "enabled": "true",
  "link-config-values": {
    "inputs": [
      { "name": "host", "value": {{mysql_host}} },
      { "name": "port", "value": {{mysql_port}} },
      { "name": "database.name", "value": {{mysql_database}} },
      { "name": "user", "value": {{mysql_user}} },
      { "name": "password", "value": {{mysql_password}} },
      { "name": "schema", "value": {{mysql_database}} }
    ]
  }
}
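The validate, create, and update requests all carry the same body, so it can help to generate it once. The sketch below builds that body as a Python dictionary and serializes it with the standard json module, using only the field names shown in the requests above. `mysql_link_payload` is a hypothetical helper, and the password is a placeholder.

```python
import json
from typing import Any, Dict, List


def mysql_link_payload(host: str, port: str, database: str,
                       user: str, password: str) -> Dict[str, Any]:
    """Build the request body for a MySQL link."""
    inputs: List[Dict[str, str]] = [
        {"name": "host", "value": host},              # IP of the database node
        {"name": "port", "value": port},              # port the database listens on
        {"name": "database.name", "value": database}, # database to connect to
        {"name": "user", "value": user},
        {"name": "password", "value": password},
        {"name": "schema", "value": database},        # same as the database name
    ]
    return {
        "name": "MySQL_link",  # link name, must be globally unique
        "description": "MySQL connection",
        "link-type": "mysql",
        "enabled": "true",
        "link-config-values": {"inputs": inputs},
    }


payload = mysql_link_payload("172.16.2.118", "3306", "hudi", "root", "<password>")
print(json.dumps(payload, indent=2))
```

Serializing with json.dumps guarantees the quoting is valid JSON, which is the part most easily broken when the values are substituted by hand.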

REST request code for the Kafka link

### get links
GET https://{{host}}/api/v1/cdl/link

### kafka link validate
# security.protocol: SASL_PLAINTEXT in security mode, PLAINTEXT in normal mode
POST https://{{host}}/api/v1/cdl/link?validate=true
Content-Type: application/json

{
  "name": "kafka_link",
  "description": "test kafka link",
  "link-type": "kafka",
  "enabled": "true",
  "link-config-values": {
    "inputs": [
      { "name": "bootstrap.servers", "value": "172.16.9.113:21007" },
      { "name": "sasl.kerberos.service.name", "value": "kafka" },
      { "name": "security.protocol", "value": "SASL_PLAINTEXT" }
    ]
  }
}

### kafka link create
# security.protocol: SASL_PLAINTEXT in security mode, PLAINTEXT in normal mode
POST https://{{host}}/api/v1/cdl/link
Content-Type: application/json

{
  "name": "kafka_link",
  "description": "test kafka link",
  "link-type": "kafka",
  "enabled": "true",
  "link-config-values": {
    "inputs": [
      { "name": "bootstrap.servers", "value": "172.16.9.113:21007" },
      { "name": "sasl.kerberos.service.name", "value": "kafka" },
      { "name": "security.protocol", "value": "SASL_PLAINTEXT" }
    ]
  }
}

### kafka link update
# security.protocol: SASL_PLAINTEXT in security mode, PLAINTEXT in normal mode
PUT https://{{host}}/api/v1/cdl/link/kafka_link
Content-Type: application/json

{
  "name": "kafka_link",
  "description": "test kafka link",
  "link-type": "kafka",
  "enabled": "true",
  "link-config-values": {
    "inputs": [
      { "name": "bootstrap.servers", "value": "172.16.9.113:21007" },
      { "name": "sasl.kerberos.service.name", "value": "kafka" },
      { "name": "security.protocol", "value": "SASL_PLAINTEXT" }
    ]
  }
}
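The only setting the article calls out as mode-dependent is security.protocol. A minimal sketch of that switch follows. Two things in it are assumptions rather than documented CDL behavior: the port pairing (21007 for security mode, 21005 for normal mode) is read off the @bootstrap and @bootstrap_normal variables defined earlier in this article, and dropping sasl.kerberos.service.name in normal mode is a guess. `kafka_link_inputs` is a hypothetical helper name.

```python
from typing import Dict, List


def kafka_link_inputs(broker_host: str, secure: bool) -> List[Dict[str, str]]:
    """Build the "inputs" list of a Kafka link for security or normal mode."""
    # Assumption: ports follow @bootstrap (21007) and @bootstrap_normal (21005).
    port = 21007 if secure else 21005
    protocol = "SASL_PLAINTEXT" if secure else "PLAINTEXT"

    inputs = [{"name": "bootstrap.servers", "value": f"{broker_host}:{port}"}]
    if secure:
        # Assumption: the Kerberos service name is only needed in security mode.
        inputs.append({"name": "sasl.kerberos.service.name", "value": "kafka"})
    inputs.append({"name": "security.protocol", "value": protocol})
    return inputs


for item in kafka_link_inputs("172.16.9.113", secure=True):
    print(item)
```

With secure=True the result matches the three inputs used in the Kafka link requests above.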

REST request code for CDL job commands

@hostname = 172.16.9.113
@port = 21495
@host = {{hostname}}:{{port}}
@bootstrap = "172.16.9.113:21007"
@bootstrap_normal = "172.16.9.113:21005"
@mysql_host = "172.16.2.118"
@mysql_port = "3306"
@mysql_database = "hudi"
@mysql_user = "root"
@mysql_password = "Huawei@123"

### create job
# Fields in the request body (comments are kept out of the JSON so the body stays valid):
#   job_type: job type; currently only CDL_JOB is supported
#   name: job name
#   description: job description
#   from-link-name: source link
#   to-link-name: target link
#   producer.override.security.protocol, consumer.override.security.protocol:
#     SASL_PLAINTEXT in security mode, PLAINTEXT in normal mode
POST https://{{host}}/api/v1/cdl/job
Content-Type: application/json

{
    "job_type": "CDL_JOB",
    "name": "mysql_to_kafka",
    "description": "mysql_to_kafka",
    "from-link-name": "MySQL_link",
    "to-link-name": "kafka_link",
    "from-config-values": {
        "inputs": [
            {"name": "connector.class", "value": "com.huawei.cdc.connect.mysql.MysqlSourceConnector"},
            {"name": "schema", "value": "hudi"},
            {"name": "db.name.alias", "value": "hudi"},
            {"name": "whitelist", "value": "hudisource"},
            {"name": "tables", "value": "hudisource"},
            {"name": "tasks.max", "value": "10"},
            {"name": "mode", "value": "insert,update,delete"},
            {"name": "parse.dml.data", "value": "true"},
            {"name": "schema.auto.creation", "value": "false"},
            {"name": "errors.tolerance", "value": "all"},
            {"name": "multiple.topic.partitions.enable", "value": "false"},
            {"name": "topic.table.mapping", "value": "[{\"topicName\":\"huditableout\", \"tableName\":\"hudisource\"}]"},
            {"name": "producer.override.security.protocol", "value": "SASL_PLAINTEXT"},
            {"name": "consumer.override.security.protocol", "value": "SASL_PLAINTEXT"}
        ]
    },
    "to-config-values": {"inputs": []},
    "job-config-values": {
        "inputs": [
            {"name": "global.topic", "value": "demo"}
        ]
    }
}

### get all job
GET https://{{host}}/api/v1/cdl/job

### submit job
PUT https://{{host}}/api/v1/cdl/job/mysql_to_kafka/start

### get job status
GET https://{{host}}/api/v1/cdl/submissions?jobName=mysql_to_kafka

### stop job
# 13 is the submission ID used in this example.
PUT https://{{host}}/api/v1/cdl/job/mysql_to_kafka/submissions/13/stop

### delete job
DELETE https://{{host}}/api/v1/cdl/job/mysql_to_kafka
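The topic.table.mapping value in the create-job request is a JSON array embedded as a string inside the JSON body, which is easy to mis-escape by hand. A small sketch of generating it with the standard json module is shown below. `topic_table_mapping` is a hypothetical helper, and the topicName/tableName keys are taken from the request in this article.

```python
import json
from typing import Dict


def topic_table_mapping(table_to_topic: Dict[str, str]) -> Dict[str, str]:
    """Build the topic.table.mapping input from a {table: topic} dictionary."""
    entries = [
        {"topicName": topic, "tableName": table}
        for table, topic in table_to_topic.items()
    ]
    # The array is serialized first, so that it becomes a plain string value;
    # serializing the enclosing body later adds the backslash escapes.
    return {"name": "topic.table.mapping", "value": json.dumps(entries)}


mapping_input = topic_table_mapping({"hudisource": "huditableout"})
print(json.dumps(mapping_input))
```

Because the inner array is serialized to a single-line string, the outer body contains no raw line breaks inside a string value, which strict JSON parsers reject.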

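Scenario verification

The original data in the production MySQL database is as follows:

After the CDL job is submitted:

Insert operation: insert into hudi.hudisource values (11,'蒋语堂',38,'女','图','播放器',28732);

Corresponding Kafka message body:

Update operation: UPDATE hudi.hudisource SET uname='Anne Marie333' WHERE uid=11;

Corresponding Kafka message body:

Delete operation: delete from hudi.hudisource where uid=11;

Corresponding Kafka message body: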
