乐趣区

关于java:多库多表场景下使用-Amazon-EMR-CDC-实时入湖最佳实践

一、前言

CDC(Change Data Capture) 从狭义上讲所有可能捕捉变更数据的技术都能够称为 CDC,但本篇文章中对 CDC 的定义限定为以非侵入的形式实时捕捉数据库的变更数据。例如:通过解析 MySQL 数据库的 Binlog 日志捕捉变更数据,而不是通过 SQL Query 源表捕捉变更数据。Hudi 作为最热的数据湖技术框架之一, 用于构建具备增量数据处理管道的流式数据湖。其外围的能力包含对象存储上数据行级别的疾速更新和删除,增量查问 (Incremental queries,Time Travel),小文件治理和查问优化(Clustering,Compactions,Built-in metadata),ACID 和并发写反对。Hudi 不是一个 Server,它自身不存储数据,也不是计算引擎,不提供计算能力。其数据存储在 S3(也反对其它对象存储和 HDFS),Hudi 来决定数据以什么格局存储在 S3(Parquet,Avro,…), 什么形式组织数据能让实时摄入的同时反对更新,删除,ACID 等个性。Hudi 通过 Spark,Flink 计算引擎提供数据写入, 计算能力,同时也提供与 OLAP 引擎集成的能力,使 OLAP 引擎可能查问 Hudi 表。从应用上看 Hudi 就是一个 JAR 包,启动 Spark, Flink 作业的时候带上这个 JAR 包即可。Amazon EMR 上的 Spark,Flink,Presto,Trino 原生集成 Hudi, 且 EMR 的 Runtime 在 Spark,Presto 引擎上相比开源有 2 倍以上的性能晋升。在多库多表的场景下(比方:百级别库表),当咱们须要将数据库(mysql,postgres,sqlserver,oracle,mongodb 等) 中的数据通过 CDC 的形式以分钟级别 (1minute+) 提早写入 Hudi,并以增量查问的形式构建数仓档次,对数据进行实时高效的查问剖析时。咱们要解决三个问题,第一,如何应用对立的代码实现百级别库表 CDC 数据并行写入 Hudi,升高开发保护老本。第二,源端 Schema 变更如何同步到 Hudi 表。第三,应用 Hudi 增量查问构建数仓档次比方 ODS->DWD->DWS (各层均是 Hudi 表),DWS 层的增量聚合如何实现。本篇文章举荐的计划是: 应用 Flink CDC DataStream API (非 SQL)先将 CDC 数据写入 Kafka,而不是间接通过 Flink SQL 写入到 Hudi 表,次要起因如下,第一,在多库表且 Schema 不同的场景下,应用 SQL 的形式会在源端建设多个 CDC 同步线程,对源端造成压力,影响同步性能。第二,没有 MSK 做 CDC 数据上下游的解耦和数据缓冲层,上游的多端生产和数据回溯比拟艰难。CDC 数据写入到 MSK 后,举荐应用 Spark Structured Streaming DataFrame API 或者 Flink StatementSet 封装多库表的写入逻辑,但如果须要源端 Schema 变更主动同步到 Hudi 表,应用 Spark Structured Streaming DataFrame API 实现更为简略,应用 Flink 则须要基于 HoodieFlinkStreamer 做额定的开发。Hudi 增量 ETL 在 DWS 层须要数据聚合的场景的下,能够通过 Flink Streaming Read 将 Hudi 作为一个无界流,通过 Flink 计算引擎实现数据实时聚合计算写入到 Hudi 表。

亚马逊云科技开发者社区为开发者们提供寰球的开发技术资源。这里有技术文档、开发案例、技术专栏、培训视频、流动与比赛等。帮忙中国开发者对接世界最前沿技术,观点,和我的项目,并将中国优良开发者或技术举荐给寰球云社区。如果你还没有关注 / 珍藏,看到这里请肯定不要匆匆划过,点这里让它成为你的技术宝库!

二、架构设计与解析

2.1 CDC 数据实时写入 MSK

图中标号 1,2 是将数据库中的数据通过 CDC 形式实时发送到 MSK (Amazon 托管的 Kafka 服务)。flink-cdc-connectors 是以后比拟风行的 CDC 开源工具。它内嵌 debezium 引擎,反对多种数据源,对于 MySQL 反对 Batch 阶段 (全量同步阶段) 并行,无锁,Checkpoint (能够从失败地位复原,无需从新读取,对大表敌对)。反对 Flink SQL API 和 DataStream API,这里须要留神的是如果应用 SQL API 对于库中的每张表都会独自创立一个链接,独立的线程去执行 binlog dump。如果须要同步的表比拟多,会对源端产生较大的压力。在须要整库同步表十分多的场景下,应该应用 DataStream API 写代码的形式只建一个 binlog dump 同步所有须要的库表。另一种场景是如果只同步分库分表的数据,比方 user 表做了分库,分表,其表 Schema 都是一样的,Flink CDC 的 SQL API 反对正则匹配多个库表,这时应用 SQL API 同步仍然只会建设一个 binlog dump 线程。须要阐明的是通过 Flink CDC 能够间接将数据 Sink 到 Hudi, 两头无需 MSK,但思考到上下游的解耦,数据的回溯,多业务端生产,多表治理保护,仍然倡议 CDC 数据先到 MSK,上游再从 MSK 接数据写入 Hudi。

2.2 CDC 工具比照

图中标号 3,除了 flink-cdc-connectors 之外,DMS (Amazon Database Migration Services) 是 Amazon 托管的数据迁徙服务,提供多种数据源 (mysql,oracle,sqlserver,postgres,mongodb,documentdb 等)的 CDC 反对,反对可视化的 CDC 工作配置,运行,治理,监控。因而能够抉择 DMS 作为 CDC 的解析工具,DMS 反对将 MSK 或者自建 Kafka 作为数据投递的指标,所以 CDC 实时同步到 MSK 通过 DMS 能够疾速可视化配置管理。当然除了 DMS 之外还有很多开源的 CDC 工具,也能够实现 CDC 的同步工作,但须要在 EC2 上搭建相干服务。下图列出了 CDC 工具的比照项,供大家参考

2.3 Spark Structured Streaming 多库表并行写 Hudi 及 Schema 变更
图中标号 4,CDC 数据到了 MSK 之后,能够通过 Spark/Flink 计算引擎生产数据写入到 Hudi 表,咱们把这一层咱们称之为 ODS 层。无论 Spark 还是 Flink 都能够做到数据 ODS 层的数据落地,应用哪一个咱们须要综合考量,这里论述一些绝对重要的点。首先对于 Spark 引擎,咱们肯定是应用 Spark Structured Streaming 生产 MSK 写入 Hudi,因为能够应用 DataFrame API 写 Hudi, 因而在 Spark 中能够不便的实现生产 CDC Topic 并依据其每条数据中的元信息字段 (数据库名称,表名称等) 在单作业内分流写入不同的 Hudi 表,封装多表并行写入逻辑,一个 Job 即可实现整库多表同步的逻辑。样例代码截图如下,残缺代码点击 Github 获取

咱们晓得 CDC 数据中是带着 I(insert)、U(update)、D(delete) 信息的, 不同的 CDC 工具数据格式不同,但要表白的含意是统一的。应用 Spark 写入 Hudi 咱们次要关注 U、D 信息,数据带着 U 信息示意该条数据是一个更新操作,对于 Hudi 而言只有设定源表的主键为 Hudi 的 recordKey,同时依据需要场景设定 precombineKey 即可。这里对 precombineKey 做一个阐明,它示意的是当数据须要更新时 (recordKey 雷同), 默认抉择两条数据中 precombineKey 的大保留在 Hudi 中。其实 Hudi 有非常灵活的 Payload 机制,通过参数 hoodie.datasource.write.payload.class 能够抉择不同的 Payload 实现,比方 Partial Update (局部字段更新) 的 Payload 实现 OverwriteNonDefaultsWithLatestAvroPayload,也能够自定义 Payload 实现类,它外围要做的就是如何依据 precombineKey 指定的字段更新数据。所以对于 CDC 数据 Sink Hudi 而言,咱们须要保障上游的音讯程序,只有咱们表中有能判断哪条数据是最新的数据的字段即可,那这个字段在 MySQL 中往往咱们设计成数据更新工夫 modify_time timestamp DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP。如果没有相似字段,倡议定义设计规范加上这个字段,否则就必须保证数据有序(这会给架构设计和性能带来更多的阻力),不然数据在 Hudi 中 Updata 的后果可能就是错的。对于带着 D 信息的数据,它示意这条数据在源端被删除,Hudi 是提供删除能力的,其中一种形式是当一条数据中蕴含 _hoodie_is_deleted 字段,且值为 true 是,Hudi 会主动删除此条数据,这在 Spark Structured Streaming 代码中很容易实现,只需在 map 操作实现增加一个字段且当数据中蕴含 D 信息设定字段值为 true 即可。

2.4 Flink StatementSet 多库表 CDC 并行写 Hudi

对于应用 Flink 引擎生产 MSK 中的 CDC 数据落地到 ODS 层 Hudi 表,如果想要在一个 JOB 实现整库多张表的同步,Flink StatementSet 来实现通过一个 Kafka 的 CDC Source 表,依据元信息抉择库表 Sink 到 Hudi 中。但这里须要留神的是因为 Flink 和 Hudi 集成,是以 SQL 形式先创立表,再执行 Insert 语句写入到该表中的,如果须要同步的表有上百之多,封装一个自动化的逻辑可能加重咱们的工作,你会发现 SQL 形式写入 Hudi 尽管对于单表写入应用上很不便,不必编程只须要写 SQL 即可,但也带来了一些限度,因为写入 Hudi 时是通过 SQL 先建表,Schema 在建表时已将定义,如果源端 Schema 变更,通过 SQL 形式是很难实现上游 Hudi 表 Schema 的主动变更的。尽管在 Hudi 的官网并未提供 Flink DataStream API 写入 Hudi 的例子,但 Flink 写入 Hudi 是能够通过 HoodieFlinkStreamer 以 DataStream API 的形式实现,在 Hudi 源码中能够找到。因而如果想要更加灵便简略的实现多表的同步,以及 Schema 的主动变更,须要自行参照 HoodieFlinkStreamer 代码以 DataStream API 的形式写 Hudi。对于 I,U,D 信息,Flink 的 debezium ,maxwell,canal format 会间接将音讯解析为 Flink 的 changelog 流,换句话说就是 Flink 会将 I,U,D 操作间接解析成 Flink 外部的数据结构 RowData,间接 Sink 到 Hudi 表即可,咱们同样须要在 SQL 中设定 recordKey,precombineKey,也能够设定 Payload class 的不同实现类。

2.5 Flink Streaming Read 模式读 Hudi 实现 ODS 层聚合

图中标号 5,数据通过 Spark/Flink 落地到 ODS 层后,咱们可能须要构建 DWD 和 DWS 层对数据做进一步的加工解决,(DWD 和 DWS 并非必须的,依据你的场景而定,你能够间接让 OLAP 引擎查问 ODS 层的 Hudi 表)咱们心愿可能应用到 Hudi 的增量查问能力,只查问变更的数据来做后续 DWD 和 DWS 的 ETL,这样可能减速构建同时缩小资源耗费。对于 Spark 引擎,在 DWD 层如果仅仅是对数据做 map,fliter 等相干类型操作,是能够应用增量查问的,但如果 DWD 层的构建有 Join 操作,是无奈通过增量查问实现的,只能全表 (或者分区) 扫描。DWS 层的构建如果聚合类型的操作没有去重,窗口类型的操作,只是 SUM, AVG,MIN, MAX 等类型的操作,能够通过增量查问之后和指标表做 Merge 实现,反之,只能全表 (或者分区) 扫描。对于 Flink 引擎来构建 DWD 和 DWS, 因为 Flink 反对 Hudi 表的 streaming read, 在 SQL 设定 read.streaming.enabled= true,changelog.enabled=true 等相干流式读取的参数即可。设定后 Flink 把 Hudi 表当做了一个无界的 changelog 流表,无论怎样做 ETL 都是反对的,Flink 会本身存储状态信息,整个 ETL 的链路是流式的。

2.6 OLAP 引擎查问 Hudi 表

图中标号 6, EMR Hive/Presto/Trino 都能够查问 Hudi 表,但须要留神的是不同引擎对于查问的反对是不同的, 参见官网,这些引擎对于 Hudi 表只能查问,不能写入。对于 Schema 的主动变更,首先 Hudi 本身是反对 Schema Evolution, 咱们想要做到源端 Schema 变更主动同步到 Hudi 表,通过上文的形容,能够晓得如果应用 Spark 引擎,能够通过 DataFrame API 操作数据,通过 from_json 动静生成 DataFrame,因而能够较为不便的实现主动增加列。如果应用 Flink 引擎上文曾经阐明想要主动实现 Schema 的变更,通过 HoodieFlinkStreamer 以 DataStream API 的形式实现 Hudi 写入的同时融入 Schema 变更的逻辑。

三、EMR CDC 整库同步 Demo

接下的 Demo 操作中会抉择 RDS MySQL 作为数据源,Flink CDC DataStream API 同步库中的所有表到 Kafka,应用 Spark 引擎生产 Kafka 中 binlog 数据实现多表写入 ODS 层 Hudi,应用 Flink 引擎以 streaming read 的模式做 DWD 和 DWS 层的 Hudi 表构建。

3.1 环境信息

EMR 6.6.0 
Hudi 0.10.0 
Spark 3.2.0 
Flink 1.14.2  
Presto 0.267
MySQL 5.7.34

3.2 创立源表

在 MySQL 中创立 test_db 库及 user,product,user_order 三张表,插入样例数据,后续 CDC 先加载表中已有的数据,之后源增加新数据并批改表构造增加新字段,验证 Schema 变更主动同步到 Hudi 表。

-- create databases
create database if not exists test_db default character set utf8mb4 collate utf8mb4_general_ci;
use test_db;

-- create  user table
drop table if exists user;
create table if not exists user
(
    id           int auto_increment primary key,
    name         varchar(155)                        null,
    device_model varchar(155)                        null,
    email        varchar(50)                         null,
    phone        varchar(50)                         null,
    create_time  timestamp default CURRENT_TIMESTAMP not null,
    modify_time  timestamp default CURRENT_TIMESTAMP null on update CURRENT_TIMESTAMP
)charset = utf8mb4;

-- insert data
insert into user(name,device_model,email,phone) values
('customer-01','dm-01','abc01@email.com','188776xxxxx'),
('customer-02','dm-02','abc02@email.com','166776xxxxx');

-- create product table
drop table if exists product;
create table if not exists product
(
    pid          int not null primary key,
    pname        varchar(155)                        null,
    pprice       decimal(10,2)                           ,
    create_time  timestamp default CURRENT_TIMESTAMP not null,
    modify_time  timestamp default CURRENT_TIMESTAMP null on update CURRENT_TIMESTAMP
)charset = utf8mb4;

-- insert data
insert into product(pid,pname,pprice) values
('1','prodcut-001',125.12),
('2','prodcut-002',225.31);

-- create order table
drop table if exists user_order;
create table if not exists user_order
(
    id           int auto_increment primary key,
    oid          varchar(155)                        not null,
    uid          int                                         ,
    pid          int                                         ,
    onum         int                                         ,
    create_time  timestamp default CURRENT_TIMESTAMP not null,
    modify_time  timestamp default CURRENT_TIMESTAMP null on update CURRENT_TIMESTAMP
)charset = utf8mb4;

-- insert data
insert into user_order(oid,uid,pid,onum) values 
('o10001',1,1,100),
('o10002',1,2,30),
('o10001',2,1,22),
('o10002',2,2,16);

-- select data
select * from user;
select * from product;
select * from user_order;
复制代码

3.3 Flink CDC 发送数据到 Kafka

应用 DataStream API 编写 CDC 同步程序。样例代码 Github

# 创立 topic
kafka-topics.sh --create --zookeeper ${zk}  --replication-factor 2 --partitions 8  --topic cdc_topic
# 下载代码,编译打包
mvn clean package  -Dscope.type=provided  -DskipTests
# 也能够应用曾经打好的包,进入 EMR 主节点,执行命令
wget https://dxs9dnjebzm6y.cloudfront.net/tmp/emr-flink-cdc-1.0-SNAPSHOT.jar
# disalbe check-leaked-classloader
sudo sed -i -e '$a\classloader.check-leaked-classloader: false' /etc/flink/conf/flink-conf.yaml
# 启动 flink cdc 发送数据到 Kafka
sudo flink run -m yarn-cluster \
-yjm 1024 -ytm 2048 -d \
-ys 4 -p 8 \
-c  com.aws.analytics.MySQLCDC  \
/home/hadoop/emr-flink-cdc-1.0-SNAPSHOT.jar \
-b xxxxx.amazonaws.com:9092 \
-t cdc_topic_001 \
-c s3://xxxxx/flink/checkpoint/ \
-l 30 -h xxxxx.rds.amazonaws.com:3306 -u admin \
-P admin123456 \
-d test_db -T test_db.* \
-p 4 \
-e 5400-5408
# 相干的参数阐明如下
MySQLCDC 1.0
Usage: MySQLCDC [options]

  -c, --checkpointDir <value>
                           checkpoint dir
  -l, --checkpointInterval <value>
                           checkpoint interval: default 60 seconds
  -b, --brokerList <value>
                           kafka broker list,sep comma
  -t, --sinkTopic <value>  kafka topic
  -h, --host <value>       mysql hostname, eg. localhost:3306
  -u, --username <value>   mysql username
  -P, --pwd <value>        mysql password
  -d, --dbList <value>     cdc database list: db1,db2,..,dbn
  -T, --tbList <value>     cdc table list: db1.*,db2.*,db3.tb*...,dbn.*
  -p, --parallel <value>   cdc source parallel
  -s, --position <value>   cdc start position: initial or latest,default: initial
  -e, --serverId <value>   cdc server id
  
# 生产 Kafka topic 察看数据
./kafka_2.12-2.6.2/bin/kafka-console-consumer.sh --bootstrap-server $brok --topic cdc_topic_001 --from-beginning |jq .

3.4 Spark 生产 CDC 数据整库同步

# 整库同步样例代码  https://github.com/yhyyz/emr-hudi-example/blob/main/src/main/scala/com/aws/analytics/Debezium2Hudi.scala

# 下载代码,编译打包
mvn clean package  -Dscope.type=provided  -DskipTests
# 也能够应用曾经打好的包,进入 EMR 主节点,执行命令
wget https://dxs9dnjebzm6y.cloudfront.net/tmp/emr-hudi-example-1.0-SNAPSHOT-jar-with-dependencies.jar 

# 执行如下命令提交作业,命令中设定 -s hms,hudi 表同步到 Glue Catalog
spark-submit  --master yarn \
--deploy-mode client \
--driver-memory 1g \
--executor-memory 1g \
--executor-cores 2 \
--num-executors  2 \
--conf "spark.dynamicAllocation.enabled=false" \
--conf "spark.serializer=org.apache.spark.serializer.KryoSerializer" \
--conf "spark.sql.hive.convertMetastoreParquet=false" \
--jars  /usr/lib/hudi/hudi-spark-bundle.jar,/usr/lib/spark/external/lib/spark-avro.jar \
--class com.aws.analytics.Debezium2Hudi /home/hadoop/emr-hudi-example-1.0-SNAPSHOT-jar-with-dependencies.jar \
-e prod -b xxxxx.amazonaws.com:9092 \
-t cdc_topic_001 -p emr-cdc-group-02 -s true \
-o earliest \
-i 60 -y cow -p 10 \
-c s3://xxxxx/spark-checkpoint/emr-hudi-cdc-005/ \
-g s3://xxxxx/emr-hudi-cdc-005/ \
-r jdbc:hive2://localhost:10000  \
-n hadoop -w upsert  \
-s hms \
--concurrent false \
-m "{\"tableInfo\":[{\"database\":\"test_db\",\"table\":\"user\",\"recordKey\":\"id\",\"precombineKey\":\"modify_time\",\"partitionTimeColumn\":\"create_time\",\"hudiPartitionField\":\"year_month\"},
{\"database\":\"test_db\",\"table\":\"user_order\",\"recordKey\":\"id\",\"precombineKey\":\"modify_time\",\"partitionTimeColumn\":\"create_time\",\"hudiPartitionField\":\"year_month\"},{\"database\":\"test_db\",\"table\":\"product\",\"recordKey\":\"pid\",\"precombineKey\":\"modify_time\",\"partitionTimeColumn\":\"create_time\",\"hudiPartitionField\":\"year_month\"}]}"

# 相干参数阐明如下:Debezium2Hudi 1.0
Usage: spark ss Debezium2Hudi [options]

  -e, --env <value>        env: dev or prod
  -b, --brokerList <value>
                           kafka broker list,sep comma
  -t, --sourceTopic <value>
                           kafka topic
  -p, --consumeGroup <value>
                           kafka consumer group
  -s, --syncHive <value>   whether sync hive,default:false
  -o, --startPos <value>   kafka start pos latest or earliest,default latest
  -m, --tableInfoJson <value>
                           table info json str
  -i, --trigger <value>    default 300 second,streaming trigger interval
  -c, --checkpointDir <value>
                           hdfs dir which used to save checkpoint
  -g, --hudiEventBasePath <value>
                           hudi event table hdfs base path
  -y, --tableType <value>  hudi table type MOR or COW. default COW
  -t, --morCompact <value>
                           mor inline compact,default:true
  -m, --inlineMax <value>  inline max compact,default:20
  -r, --syncJDBCUrl <value>
                           hive server2 jdbc, eg. jdbc:hive2://localhost:10000
  -n, --syncJDBCUsername <value>
                           hive server2 jdbc username, default: hive
  -p, --partitionNum <value>
                           repartition num,default 16
  -w, --hudiWriteOperation <value>
                           hudi write operation,default insert
  -u, --concurrent <value>
                           write multiple hudi table concurrent,default false
  -s, --syncMode <value>   sync mode,default jdbc, glue catalog set dms
  -z, --syncMetastore <value>
                           hive metastore uri,default thrift://localhost:9083
                           
# 下图能够看到表曾经同步到 Glue Catalog , 数据曾经写入到 S3
-- 向 MySQL 的 user 表中增加一列,并插入一条新数据, 查问 hudi 表,能够看到新列和数据曾经主动同步到 user 表,留神以下 SQL 在 MySQL 端执行
alter table user add column age int
insert into user(name,device_model,email,phone,age) values
('customer-03','dm-03','abc03@email.com','199776xxxxx',18);

3.5 Flink Streaming Read 实时聚合

# 留神最初一个参数,-t 是把 /etc/hive/conf/hive-site.xml 退出到 classpath,这样 hudi 执行表同步到 Glue 是就能够退出加载到这个配置,配置中的要害是 hive.metastore.client.factory.class = com.amazonaws.glue.catalog.metastore.AWSGlueDataCatalogHiveClientFactory,这样就能够加载用到 Glue 的 Catalog 实现. 如果 EMR 集群启动时就抉择了 Glue Metastore, 该文件中 /etc/hive/conf/hive-site.xml 曾经配置了 AWSGlueDataCatalogHiveClientFactory. 如果启动 EMR 没有抉择 Glue Metastore, 还须要同步数据到 Glue,须要手动加上。# 留神替换为你的 S3 Bucket
checkpoints=s3://xxxxx/flink/checkpoints/datagen/

flink-yarn-session -jm 1024 -tm 4096 -s 2  \
-D state.backend=rocksdb \
-D state.checkpoint-storage=filesystem \
-D state.checkpoints.dir=${checkpoints} \
-D execution.checkpointing.interval=5000 \
-D state.checkpoints.num-retained=5 \
-D execution.checkpointing.mode=EXACTLY_ONCE \
-D execution.checkpointing.externalized-checkpoint-retention=RETAIN_ON_CANCELLATION \
-D state.backend.incremental=true \
-D execution.checkpointing.max-concurrent-checkpoints=1 \
-D rest.flamegraph.enabled=true \
-d \
-t /etc/hive/conf/hive-site.xml 

# 启动 Flink sql client
/usr/lib/flink/bin/sql-client.sh embedded -j /usr/lib/hudi/hudi-flink-bundle.jar shell
-- user 表,开启 streaming read, changelog.enalbe=true
set sql-client.execution.result-mode=tableau;

CREATE TABLE `user`(
    id string,
    name STRING,
    device_model STRING,
    email STRING,
    phone STRING,
    age string,
    create_time STRING,
    modify_time STRING,
    year_month STRING
)
PARTITIONED BY (`year_month`)
WITH (
  'connector' = 'hudi',
  'path' = 's3://xxxxx/emr-hudi-cdc-005/test_db/user/',
  'hoodie.datasource.write.recordkey.field' = 'id',
  'table.type' = 'COPY_ON_WRITE',
  'index.bootstrap.enabled' = 'true',
  'read.streaming.enabled' = 'true',
  'read.start-commit' = '20220607014223',
  'changelog.enabled' = 'false',
  'read.streaming.check-interval' = '1'
);

# 实时查问数据
select * from `user`;

# 在 MySQL 中批改 user 表中 id= 3 的 name 为 new-customer-03,留神以下 SQL 在 MySQL 端执行
update  user set name="new-customer-03" where id=3;

# 在 Flink 端能够能够看到数据变更
-- Flink 聚合操作 Sink 到 Hudi 表

-- batch
CREATE TABLE  user_agg(
num BIGINT,
device_model STRING
)WITH(
  'connector' = 'hudi',
  'path' = 's3://xxxxx/emr-cdc-hudi/user_agg/',
  'table.type' = 'COPY_ON_WRITE',  
  'write.precombine.field' = 'device_model',
  'write.operation' = 'upsert',
  'hoodie.datasource.write.recordkey.field' = 'device_model',
  'hive_sync.database' = 'dws',
  'hive_sync.enable' = 'true',
  'hive_sync.table' = 'user_agg',
  'hive_sync.mode' = 'HMS',
  'hive_sync.use_jdbc' = 'false',
  'hive_sync.username' = 'hadoop'
);

insert into user_agg select count(1) as num, device_model from `user` group by device_model;

# 动静参数关上,对 user_agg 表进行 streaming 读取,查看实时变动后果
set table.dynamic-table-options.enabled=true;
select *  from user_agg/*+ OPTIONS('read.streaming.enabled'='true','read.start-commit' = '20220607014223')*/ 

# 能够在 MySQL 源端多增加几条数据,查看数据后果,留神以下 SQL 在 MySQL 端执行
insert into user(name,device_model,email,phone,age) values ('customer-03','dm-03','abc03@email.com','199776xxxxx',18);

四、总结

本篇文章解说了如何通过 EMR 实现 CDC 数据入湖及 Schema 的主动变更。通过 Flink CDC DataStream API 先将整库数据发送到 MSK,这时 CDC 在源端只有一个 binlog dump 线程,升高对源端的压力。应用 Spark Structured Streaming 动静解析数据写入到 Hudi 表来实现 Shema 的主动变更,实现单个 Job 治理多表 Sink, 多表状况下升高开发保护老本,能够并行或者串行写多张 Hudi 表,元数据同步 Glue Catalog。应用 Flink Hudi 的 Streaming Read 模式实现实时数据 ETL,满足 DWD 和 DWS 层的实时 Join 和聚合的需要。Amazon EMR 环境中原生集成 Hudi, 应用 Amazon EMR 轻松构建了整库同步的 Demo。

本篇作者

潘超
亚马逊云科技数据分析解决方案架构师。负责客户大数据解决方案的征询与架构设计,在开源大数据方面领有丰盛的教训。工作之外喜爱爬山。

文章链接:https://dev.amazoncloud.cn/column/article/6309e29be0f88a79bcf…

退出移动版