乐趣区

关于云计算:如何构建智能湖仓架构亚马逊工程师的代码实践来了

数据仓库的数据体系严格、治理容易,业务规模越大,ROI 越高;数据湖的数据品种丰盛,治理艰难,业务规模越大,ROI 越低,但胜在灵便。

当初,鱼和熊掌我都想要,应该怎么办?湖仓一体架构就在这种状况下,疾速在产业内遍及。

要构建湖仓一体架构并不容易,须要解决十分多的数据问题。比方,计算层、存储层、异构集群层都要买通,对元数据要进行对立的治理和治理。对于很多业内技术团队而言,曾经是个比拟大的挑战。

可即便如此,在亚马逊云科技技术专家潘超看来,也未必最能贴合企业级大数据处理的最新理念。在 11 月 18 日早晨 20:00 的直播中,潘超具体分享了亚马逊云科技眼中的智能湖仓架构,以及以流式数据接入为主的最佳实际。

01 现代化数据平台架构的要害指标

传统湖仓一体架构的不足之处是,着重解决点的问题,也就是“湖”和“仓”的买通,而漠视了面的问题:数据在整个数据平台的自在流转。

潘超认为,古代数据平台架构应该具备几个要害特色:

  1. 以任何规模来存储数据;
  2. 在整套架构波及的所有产品体系中,获得最佳性价比;
  3. 实现无缝的数据拜访,实现数据的自在流动;
  4. 实现数据的对立治理;用 AI/ML 解决业务难题;

在构建企业级古代数据平台架构时,这五个要害特色,本质上笼罩了三方视角 ——

对于架构师而言,第一点和第二点值得引起留神。前者是迁徙上云的一大外围诉求,后者是架构评审肯定会过问的外围事项;

对于开发者而言,第三点和第四点尤为重要,对元数据的治理最重要实现的是数据在整个零碎内的自在流动和拜访,而不仅仅是买通数据湖和数据仓库;

对于产品经理而言,第五点点明了当下大数据平台的价值导向,即数据的收集和治理,应以解决业务问题为指标。

为了不便了解,也不便通过 Demo 演示,潘超将这套架构体系,等同替换为了亚马逊云科技现有产品体系,包含:Amazon Athena、Amazon Aurora、Amazon MSK、Amazon EMR 等,而流式数据入湖,重点波及 Amazon MSK、Amazon EMR,以及另一个外围服务:Apache Hudi。

02 Amazon MSK 的扩大能力与最佳实际

Amazon MSK 是亚马逊托管的高可用、强平安的 Kafka 服务,是数据分析畛域,负责消息传递的根底,也因而在流式数据入湖局部无足轻重。

之所以以 Amazon MSK 举例,而不是批改 Kafka 代码间接构建这套零碎,是为了最大水平将开发者的注意力聚焦于流式利用自身,而不是治理和保护基础设施。况且,一旦你决定从头构建 PaaS 层基础设施,波及到的工作就不仅仅是拉起一套 Kafka 集群了。一张图能够很形象地反映这个问题:

这张图从左至右,顺次为不应用任何云服务的工作列表,应用 Amazon EC2 的工作列表,以及应用 Amazon MSK 的工作列表,工作量和 ROI 高下立现。

而对于 Amazon MSK 来说,扩大能力是其重要个性。Amazon MSK 能够主动扩容,也能够手动 API 扩容。但如果对本人的“入手能力”没有短缺的信念,倡议抉择主动扩容。

Amazon MSK 的主动扩容能够依据存储利用率来设定阈值,倡议设定 50%-60%。主动扩容每次扩大 Max(10GB,10%* 集群存储空间),同时主动扩大每次有 6 个小时的冷却工夫。一次如果一次须要扩容更大的容量,能够应用手动扩容。

这种扩容既包含横向扩容 —— 通过 API 或者控制台向集群增加新的 Brokers,期间不会影响集群的可用性,也包含纵向扩容 —— 调整集群 Broker 节点的 Amazon EC2 实例类型。

但无论是主动还是手动,是横向还是纵向,前提都是你曾经做好了磁盘监控,能够应用 Amazon CloudWatch 云监控集成的监控服务,也能够在 Amazon MSK 里勾选其余的监控服务 (Prometheus),最终监控后果都能可视化显示。

须要留神的是,Amazon MSK 集群减少 Broker,每个旧 Topic 的分区如果想重调配,须要手动执行。重调配的时候,会带来额定的带宽,有可能会影响业务,所以能够通过一些参数管制 Broker 间流量带宽,避免过程当中对业务造成太大的影响。当然像 Cruise 一样的开源工具,也能够多多用起来。Cruise 是做大规模集群的治理的 Amazon MSK 工具,它能够帮你做 Broker 间负载的 Re-balance。

对于 Amazon MSK 集群的高可用,有三点须要留神:

  1. 对于两 AZ 部署的集群,正本因子至多保障为 3。如果只有 1,那么当集群滚动降级的时候,就不能对外提供服务了;
  2. 最小的 ISR(in-sync replicas)最多设置为 RF – 1,不然也会影响集群的滚动降级;
  3. 当客户端连贯 Broker 节点时,尽管配置一个 Broker 节点的连贯地址就能够,但还是倡议配置多个。Amazon MSK 故障节点主动替换以及在滚动降级的过程中,如果客户端只装备了一个 Broker 节点,可能会链接超时。如果配置了多个,还能够重试连贯。

在 CPU 层面,Amazon CloudWatch 里有两个对于 Amazon MSK 的指标值得注意,一个是 CpuSystem,另一个是 CpuUser,举荐放弃在 60% 以下,这样在 Amazon MSK 降级保护时,都有足够的 CPU 资源可用。

如果 CPU 利用率过高,触发报警,则能够通过以下几种形式来扩大 Amazon MSK 集群:

  1. 垂直扩大,通过滚动降级进行替换。每个 Broker 的替换大略须要 10-15 分钟的工夫。当然,是否替换集群内所有机器,要依据理论状况做抉择,免得造成资源节约;
  2. 横向拓展,Topic 减少分区数;
  3. 增加 Broker 到集群,之前创立的 Topic 进行 reassign Partitions,重调配会耗费集群资源,当然这是可控的。

最初,对于 ACK 参数的设置也值得注意, ACK = 2 意味着在生产者发送音讯后,等到所有正本都接管到音讯,才返回胜利。这尽管保障了音讯的可靠性,但吞吐率最低。比方日志类数据,参考业务具体情况,就能够酌情设置 ACK = 1,容忍数据失落的可能,但大幅提高了吞吐率。

03 Amazon EMR 存算拆散及资源动静扩缩

Amazon EMR 是托管的 Hadoop 生态,罕用的 Hadoop 组件在 Amazon EMR 上都会有,然而 Amazon EMR 外围特色有两点,一是存算拆散,二是资源动静扩缩

在大数据畛域,存算拆散概念的热度,不下于流批一体、湖仓一体。以亚马逊云科技产品栈为例,实现存算拆散后,数据是在 Amazon S3 上存储,Amazon EMR 只是一个计算集群,是一个无状态的数据。而数据与元数据都在内部,集群简化为无状态的计算资源,用的时候关上,不必的时候敞开就能够。

举个例子,凌晨 1 点到 5 点,少量 ETL 作业,开启集群。其余工夫则齐全不必开启集群。用时开启,不必敞开,对于上云企业而言,交服务费就像交电费,分外节俭。

而资源的动静扩缩次要是指依据不同的工作负载,动静裁减节点,按使用量计费。但如果数据是在 HDFS 上做存算拆散与动静扩缩,就不太容易操作了,扩缩容如果附带 DataNote 数据,就会引发数据的 Re-balance,十分影响效率。如果独自扩大 NodeManager,在云下的场景,资源不再是弹性的,集群也个别是预制好的,与云上有本质区别。

Amazon EMR 有三类节点,第一类是 Master 主节点,部署着 Resource Manager 等服务;Core 外围节点,有 DataNote,NodeManager, 仍然能够选用 HDFS;第三类是工作节点,运行着 Amazon EMR 的 NodeManager 服务,是一个计算节点。所以,Amazon EMR 的扩缩,在于外围节点与工作节点的扩缩,能够依据 YARN 上 Application 的个数、CPU 的利用率等指标配置扩缩策略。也能够应用 Amazon EMR 提供 Managed Scaling 策略其内置了智能算法来实现主动扩缩,也是举荐的形式,对开发者而言是无感的。

04 Amazon EMR Flink Hudi 构建数据湖及 CDC 同步计划

那么应该如何利用 MSK 和 EMR 做数据湖的入湖呢?其具体架构图如下,分作六步详解:

图中标号 1:日志数据和业务数据发送⾄ MSK (Kafka),通过 Flink (TableAPI) 建设 Kafka 表,生产 Kafka 数据,Hive Metastore 存储 Schema;

图中标号 2:RDS (MySQL) 中的数据通过 Flink CDC(flink-cdc-connector) 间接生产 Binlog 数据,⽆需搭建其余生产 Binlog 的服务 (⽐如 Canal,Debezium)。留神使⽤ flink-cdc-connector 的 2.x 版本,⽀持 parallel reading, lock-free and checkpoint feature;

图中标号 3:应用 Flink Hudi Connector, 将数据写⼊ Hudi (S3) 表, 对于⽆需 Update 的数据使⽤ Insert 模式写⼊,对于须要 Update 的 数据 (业务数据和 CDC 数据) 应用 Upsert 模式写⼊;

图中标号 4:应用 Presto 作为查问引擎,对外提供查问服务。此条数据链路的提早取决于入 Hudi 的提早及 Presto 查问的提早,总体在分钟级别;

图中标号 5:对于须要秒级别提早的指标,间接在 Flink 引擎中做计算,计算结果输入到 Amazon RDS 或者 KV 数据库,对外提供 API 查问服务;

图中标号 6:应用 QuickSight 做数据可视化,反对多种数据源接入。

当然,在具体的实际过程中,仍须要开发者对数据湖计划有足够的理解,能力切合场景抉择适合的调参配置。

Q/A 问答

  1. 如何从 Apache Kafka 迁徙至 Amazon MSK?

Amazon MSK 托管的是 Apache Kafka,其 API 是齐全兼容的,业务利用代码不须要调整,更换为 Amazon MSK 的链接地址即可。如果已有的 Kafka 集群数据要迁徙到 Amazon MSK,能够应用 MirrorMaker2 做数据同步,而后切换利用链接地址即可。

  1. Amazon MSK 反对 schema registry 吗?

Amazon MSK 反对 Schema Registry, 不仅反对应用 Amazon Glue 作为 Schema Registry, 还反对第三方的比方 confluent-schema-registry。

  1. MySQL cdc 到 hudi 的提早如何?

总体来讲是分钟级别提早。和数据量,抉择的 Hudi 表类型,计算资源都有关系。

  1. Amazon EMR 比规范 Apache Spark 快多少?

Amazon EMR 比规范 Apache Spark 快 3 倍以上。

  • Amazon EMR 在 Spark3.0 上比开源 Spark 快 1.7 倍,在 TPC-DS 3TB 数据的测试。

参见:
https://aws.amazon.com/cn/blo…

  • Amazon EMR 在 Spark 2.x 上比开源 Spark 快 2~3 倍以上。
  • Amazon Presto 比开源的 PrestoDB 快 2.6 倍。

参见:
https://aws.amazon.com/cn/blo…

  1. 智能湖仓和湖仓一体的区别是什么?

这在本次分享中的现代化数据平台建设和 Amazon 的智能湖仓架构图中都有所体现,Amazon 的智能湖仓架构灵便扩大,安全可靠 ; 专门构建,极致性能 ; 数据交融,对立治理 ; 麻利剖析,深度智能 ; 拥抱开源,开发共赢。湖仓一体只是开始,智能湖仓才是终极。

亚马逊云科技 re:Invent 大会正在拉斯维加斯同步举办,往年是 re:Invent 的第十年,明天 Keynote 上 Adam 为咱们提出的从未构想过的“数据之旅”,从数据湖扩大与平安登程,公布了 Row and cell-level security for Lake Formation,实现了海量数据的精细化治理。同时,适应云计算时代无服务器化的技术发展趋势,同时公布了四款以后云上专门数据分析工具的无服务器版本。成为了行业内全栈式无服务器数据分析的先行者。大数据分析的易用性再次晋升,体验在云上操作数据举重若轻的感觉。相干公布内容能够注册观看大会直播。扫描下方二维码或者点击“浏览原文”即刻注册。

05 附录:操作代码施行

01 创立 Amazon EMR 集群

log_uri="s3://*****/emr/log/"
key_name="****"
jdbc="jdbc:mysql:\/\/*****.ap-southeast-1.rds.amazonaws.com:3306\/hive_metadata_01?
createDatabaseIfNotExist=true"cluster_name="tech-talk-001"

aws emr create-cluster \
--termination-protected \
--region ap-southeast-1 \
--applications Name=Hadoop Name=Hive Name=Flink Name=Tez Name=Spark
Name=JupyterEnterpriseGateway Name=Presto Name=HCatalog \
--scale-down-behavior TERMINATE_AT_TASK_COMPLETION \
--release-label emr-6.4.0 \
--ebs-root-volume-size 50 \
--service-role EMR_DefaultRole \
--enable-debugging \
--instance-groups InstanceGroupType=MASTER,InstanceCount=1,InstanceType=m5.xlarge
InstanceGroupType=CORE,InstanceCount=2,InstanceType=m5.xlarge \
--managed-scaling-policy
ComputeLimits='{MinimumCapacityUnits=2,MaximumCapacityUnits=5,MaximumOnDemandCapacityUnits=2,Ma
ximumCoreCapacityUnits=2,UnitType=Instances}' \
--name "${cluster_name}" \
--log-uri "${log_uri}" \
--ec2-attributes '{"KeyName":"'${key_name}'","SubnetId":"subnet-
0f79e4471cfa74ced","InstanceProfile":"EMR_EC2_DefaultRole"}' \
--configurations '[{"Classification":"hive-site","Properties":
{"javax.jdo.option.ConnectionURL": "'${jdbc}'","javax.jdo.option.ConnectionDriverName":
"org.mariadb.jdbc.Driver","javax.jdo.option.ConnectionUserName":
"admin","javax.jdo.option.ConnectionPassword": "xxxxxx"}}]'

* 左滑查看更多

02 创立 Amazon MSK 集群

# MSK 集群创立能够通过 CLI, 也能够通过 Console 创立
# 下载 kafka, 创立 topic 写⼊数据
wget https://dlcdn.apache.org/kafka/2.6.2/kafka_2.12-2.6.2.tgz
# msk zk 地址,broker 地址
zk_servers=*****.c3.kafka.ap-southeast-1.amazonaws.com:2181
bootstrap_server=******.5ybaio.c3.kafka.ap-southeast-1.amazonaws.com:9092
topic=tech-talk-001
# 创立 tech-talk-001 topic
./bin/kafka-topics.sh --create --zookeeper ${zk_servers} --replication-factor 2 --partitions 4
--topic ${topic}
# 写⼊音讯
./bin/kafka-console-producer.sh --bootstrap-server ${bootstrap_server} --topic ${topic}
{"id":"1","name":"customer"}
{"id":"2","name":"aws"}
# 生产音讯
./bin/kafka-console-consumer.sh --bootstrap-server ${bootstrap_server}

* 左滑查看更多

03 Amazon EMR 启动 Flink

# 启动 flink on yarn session cluster
# 下载 kafka connector
sudo wget -P /usr/lib/flink/lib/ https://repo1.maven.org/maven2/org/apache/flink/flink-sql?
connector-kafka_2.12/1.13.1/flink-sql-connector-kafka_2.12-1.13.1.jar && sudo chown flink:flink
/usr/lib/flink/lib/flink-sql-connector-kafka_2.12-1.13.1.jar
# hudi-flink-bundle 0.10.0
sudo wget -P /usr/lib/flink/lib/ https://dxs9dnjebzm6y.cloudfront.net/tmp/hudi-flink?
bundle_2.12-0.10.0-SNAPSHOT.jar && sudo chown flink:flink /usr/lib/flink/lib/hudi-flink?
bundle_2.12-0.10.0-SNAPSHOT.jar
# 下载 cdc connector
sudo wget -P /usr/lib/flink/lib/ https://repo1.maven.org/maven2/com/ververica/flink-sql?
connector-mysql-cdc/2.0.0/flink-sql-connector-mysql-cdc-2.0.0.jar && sudo chown flink:flink
/usr/lib/flink/lib/flink-sql-connector-mysql-cdc-2.0.0.jar
# flink session
flink-yarn-session -jm 1024 -tm 4096 -s 2 \
-D state.checkpoints.dir=s3://*****/flink/checkpoints \
-D state.backend=rocksdb \
-D state.checkpoint-storage=filesystem \
-D execution.checkpointing.interval=60000 \
-D state.checkpoints.num-retained=5 \
-D execution.checkpointing.mode=EXACTLY_ONCE \
-D execution.checkpointing.externalized-checkpoint-retention=RETAIN_ON_CANCELLATION \
-D state.backend.incremental=true \
-D execution.checkpointing.max-concurrent-checkpoints=1 \
-D rest.flamegraph.enabled=true \
-d

* 左滑查看更多

04 Flink SQL 客户端

# 这是使⽤ flink sql client 写 SQL 提交作业
# 启动 client
/usr/lib/flink/bin/sql-client.sh -s application_*****
# result-mode
set sql-client.execution.result-mode=tableau;
# set default parallesim
set 'parallelism.default' = '1';

* 左滑查看更多

05 生产 Kafka 写⼊ Hudi

# 创立 kafka 表
CREATE TABLE kafka_tb_001 (
id string,
name string,
`ts` TIMESTAMP(3) METADATA FROM 'timestamp'
) WITH (
'connector' = 'kafka',
'topic' = 'tech-talk-001',
'properties.bootstrap.servers' = '****:9092',
'properties.group.id' = 'test-group-001',
'scan.startup.mode' = 'latest-offset',
'format' = 'json',
'json.ignore-parse-errors' = 'true',
'json.fail-on-missing-field' = 'false',
'sink.parallelism' = '2'
);
# 创立 flink hudi 表
CREATE TABLE flink_hudi_tb_106(
uuid string,
name string,
ts TIMESTAMP(3),
logday VARCHAR(255),
hh VARCHAR(255)
)PARTITIONED BY (`logday`,`hh`)
WITH (
'connector' = 'hudi',
'path' = 's3://*****/teck-talk/flink_hudi_tb_106/',
'table.type' = 'COPY_ON_WRITE',
'write.precombine.field' = 'ts',
'write.operation' = 'upsert',
'hoodie.datasource.write.recordkey.field' = 'uuid',
'hive_sync.enable' = 'true',
'hive_sync.metastore.uris' = 'thrift://******:9083',
'hive_sync.table' = 'flink_hudi_tb_106',
'hive_sync.mode' = 'HMS',
'hive_sync.username' = 'hadoop',
'hive_sync.partition_fields' = 'logday,hh',
'hive_sync.partition_extractor_class' = 'org.apache.hudi.hive.MultiPartKeysValueExtractor'
);
# 插⼊数据
insert into flink_hudi_tb_106 select id as uuid,name,ts,DATE_FORMAT(CURRENT_TIMESTAMP, 'yyyy?
MM-dd') as logday, DATE_FORMAT(CURRENT_TIMESTAMP,'hh') as hh from kafka_tb_001;
# 除了在创立表是指定同步数据的⽅式,也能够通过 cli 同步 hudi 表元数据到 hive, 但要留神分区格局
./run_sync_tool.sh --jdbc-url jdbc:hive2:\/\/*****:10000 --user hadop --pass hadoop --
partitioned-by logday --base-path s3://****/ --database default --table *****
# presto 查问数据
presto-cli --server *****:8889 --catalog hive --schema default

* 左滑查看更多

06 mysql cdc 同步到 hudi

# 创立 mysql CDC 表
CREATE TABLE mysql_cdc_002 (
id INT NOT NULL,
name STRING,
create_time TIMESTAMP(3),
modify_time TIMESTAMP(3),
PRIMARY KEY(id) NOT ENFORCED
) WITH (
'connector' = 'mysql-cdc',
'hostname' = '*******',
'port' = '3306',
'username' = 'admin',
'password' = '*****',
'database-name' = 'cdc_test_db',
'table-name' = 'test_tb_01',
'scan.startup.mode' = 'initial'
);
# 创立 hudi 表
CREATE TABLE hudi_cdc_002 (
id INT ,
name STRING,
create_time TIMESTAMP(3),
modify_time TIMESTAMP(3)
) WITH (
'connector' = 'hudi',
'path' = 's3://******/hudi_cdc_002/',
'table.type' = 'COPY_ON_WRITE',
'write.precombine.field' = 'modify_time',
'hoodie.datasource.write.recordkey.field' = 'id',
'write.operation' = 'upsert',
'write.tasks' = '2',
'hive_sync.enable' = 'true',
'hive_sync.metastore.uris' = 'thrift://*******:9083',
'hive_sync.table' = 'hudi_cdc_002',
'hive_sync.db' = 'default',
'hive_sync.mode' = 'HMS',
'hive_sync.username' = 'hadoop'
);
# 写⼊数据
insert into hudi_cdc_002 select * from mysql_cdc_002;

* 左滑查看更多

07 sysbench

# sysbench 写⼊ mysql 数据
# 下载 sysbench
curl -s https://packagecloud.io/install/repositories/akopytov/sysbench/script.rpm.sh | sudo
bash
sudo yum -y install sysbench
# 留神以后应用的“lua”并未提供构建,请依据本身状况定义, 上述⽤到表构造如下
CREATE TABLE if not exists `test_tb_01` (
`id` int NOT NULL AUTO_INCREMENT,
`name` varchar(155) DEFAULT NULL,
`create_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP,
`modify_time` timestamp NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE
CURRENT_TIMESTAMP,
PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;
# 创立表
sysbench creates.lua --mysql-user=admin --mysql-password=admin123456 --mysql?
host=****.rds.amazonaws.com --mysql-db=cdc_test_db --report-interval=1 --events=1 run
# 插⼊数据
sysbench insert.lua --mysql-user=admin --mysql-password=admin123456 --mysql?
host=****.rds.amazonaws.com --mysql-db=cdc_test_db --report-interval=1 --events=500 --
time=0 --threads=1 --skip_trx=true run
# 更新数据
sysbench update.lua --mysql-user=admin --mysql-password=admin123456 --mysql?
host=****.rds.amazonaws.com --mysql-db=cdc_test_db --report-interval=1 --events=1000 --
time=0 --threads=10 --skip_trx=true --update_id_min=3 --update_id_max=500 run
# 删除表
sysbench drop.lua --mysql-user=admin --mysql-password=admin123456 --mysql?
host=****.rds.amazonaws.com --mysql-db=cdc_test_db --report-interval=1 --events=1 run

* 左滑查看更多

退出移动版