乐趣区

关于SegmentFault:阿里云-EMR-Delta-Lake-在流利说数据接入中的架构和实践

简介:为了毁灭数据孤岛,企业往往会把各个组织的数据都接入到数据湖以提供对立的查问或剖析。本文将介绍流畅说以后数据接入的整个过程,期间遇到的挑战,以及 delta 在数据接入中产生的价值。

背景

流畅说目前的离线计算工作中,大部分数据源都是来自于业务 DB,业务 DB 数据接入的准确性、稳定性和及时性,决定着上游整个离线计算 pipeline 的准确性和及时性。同时,咱们还有局部业务需要,须要对 DB 中的数据和 hive 中的数据做近实时的联结查问。
在引入阿里云 EMR Delta Lake 之前,咱们通过封装 DataX 来实现业务 DB 数据的接入,采纳 Master-Slave 架构,Master 保护着每日要执行的 DataX 工作的元数据信息,Worker 节点通过一直的以抢占的形式获取状态为 init 和 restryable 的 DataX 工作来执行,直到当天的所有的 DataX 工作全都执行结束为止。

架构图大抵如下:

Worker 解决的过程如下:

对于近实时需要,咱们是间接开一个从库,配置 presto connector 去连贯从库,来实现业务 BD 中的数据和 hive 中的数据做近实时的联结查问需要。

这种架构计划的长处是简略,易于实现。然而随着数据量也来越多,毛病也就逐步裸露进去了:
性能瓶颈: 随着业务的增长,这种通过 SELECT 的形式接入数据的性能会越来越差,受 DB 性能瓶颈影响,无奈通过减少 Worker 节点的形式来缓解。
规模大的表只能通过从库来拉取,造成数据接入的老本越来越高。
无奈业务满足近实时的查问需要,近实时查问只能通过从库的形式查问,进一步加大了接入的老本。
为了解决这些问题,咱们将眼光聚焦到了 CDC 实时接入的计划上。

技术计划选型

对于 CDC 实时接入的计划,目前业内次要有以下几种: CDC + Merge 计划、CDC + Hudi、CDC + Delta Lake 及 CDC + Iceberg 等几种计划。其中,CDC + Merge 计划是在是在数据湖计划呈现之前的做法,这种计划能节俭 DB 从库的老本,然而无奈满足业务近实时查问的需要等性能,所以最开始就 pass 掉了,而 Iceberg 在咱们选型之初,还不够成熟,业界也没有可参考的案列,所以也被 pass 掉了,最初咱们是在 CDC + Hudi 和 CDC + Delta Lake 之间抉择。
在选型时,Hudi 和 Delta Lake 两者的性能上都是大同小异的,所以咱们次要是从这几计划来思考的: 稳定性、小文件合并、是否反对 SQL、云厂商反对水平、语言反对水平等几个方面来思考。

基于以上指标,加上咱们整个数据平台都是基于阿里云 EMR 搭建的,抉择 Delta Lake 的话,会省掉大量的适配开发工作,所以咱们最终抉择了 CDC + Delta Lake 的计划。
整体架构

总体架构图

整体的架构如上图所示。咱们接入的数据会分为两局部,存量历史数据和新数据,存量历史数据应用 DataX 从 MySQL 中导出,存入 OSS 中,新数据应用 Binlog 采集存入 Delta Lake 表中。每日凌晨跑 ETL 工作前,先对历史数据和新数据做 Merge 操作,ETL 工作应用 Merge 之后的数据。

Delta Lake 数据接入
在 Binlog 实时采集方面,咱们采纳了开源的 Debezium,负责从 MySQL 实时拉取 Binlog 并实现适当解析,每张表对应一个 Topic,分库分表合并为一个 Topic 散发到 Kafka 上供上游生产。Binlog 数据接入到 Kafka 之后,咱们须要创立 Kafka Source 表指向对应的 Kafka Topic 中, 表的格局为:

CREATE TABLE kafka_{db_name}_{table_name} (key BINARY, value BINARY, topic STRING, partition INT, offset BIGINT, timestamp TIMESTAMP, timestampType INT)
USING kafka
OPTIONS (
kafka.sasl.mechanism 'PLAIN',
subscribe 'cdc-{db_name}-{table_name}',
serialization.format '1',
kafka.sasl.jaas.config '*****(redacted)',
kafka.bootstrap.servers '{bootstrap-servers}',
kafka.security.protocol 'SASL_PLAINTEXT'
)

咱们次要用到的字段是 value 和 offset,其中 value 的格局如下:

{
"payload": {
"before": {db 记录变更前的 schema 及内容,op= c 时,为 null},
"after": {db 记录变更后的 schema 及内容,op= d 时,为 null},
"source": {ebezium 配置信息},
"op": "c",
"ts_ms":
}
}

同时创立 Delta Lake 表,Location 指向 HDFS 或者 OSS,表构造为:

CREATE TABLE IF NOT EXISTS delta.delta_{dbname}{table_name}({row_key_info},
ts_ms bigint,
json_record string,
operation_type string,
offset bigint
)
USING delta
LOCATION '------/delta/{db_name}.db/{table_name}'

其中 row_key_info 为 Delta Lake 表的惟一索引字段,对于单库单表而言,row_key_info 为 mysql 表的 primary key 字段 eg: id long,对于分库分表及分实例分库分表而言,row_key_info 为分库分表的字段和单表里 primary key 字段组成,eg: 以 user_id 为分表字段,每张表里以 id 为 primary key , 那么对应的 row_key_info 为 id long, user_id long。
StreamingSQL 解决 Kafka 中的数据,咱们次要是提取 Kafka Source 表中的 offset、value 字段及 value 字段中的 CDC 信息如: op、ts_ms 及 payload 的 after 和 before 字段。StreamingSQL 中,咱们采纳 5min 一个 mini batch,次要是思考到 mini batch 太小会产生很多小文件,处理速度会越来越慢,也会影响读的性能,太大了又没法满足近实时查问的要求。而 Delta Lake 表,咱们不将 after 或者 before 字段解析进去,次要是思考到咱们业务表 的 schema 常常变更,业务表 schema 一变更就要去修复一遍数据,老本比拟大。在 StreamingSQL 处理过程中,对于 op=’c’的数据咱们会间接 insert 操作,json_record 取 after 字段。对于 op=’u’或者 op=’d’的数据,如果 Delta Lake 表中不存在,那么执行 insert 操作, 如果存在,那么执行 update 操作;json_record 的赋值值,op=’d’,json_record 取 before 字段,op=’u’,json_record 取 after 字段。保留 op=’d’的字段,次要是思考到删除的数据可能在存量历史表中,如果间接删除的话,凌晨 merge 的数据中,存在存量历史表中的数据就不会被删除。
整个 StreamingSQL 的解决大抵如下:_

 CREATE SCAN incremental{dbname}{tablename} on kafka{dbname}{table_name} USING STREAM
OPTIONS(
startingOffsets='earliest',
maxOffsetsPerTrigger='1000000',
failOnDataLoss=false
);
CREATE STREAM job
OPTIONS(checkpointLocation='------/delta/{db_name}.db/{table_name}checkpoint',
triggerIntervalMs='300000'
)
MERGE INTO delta.delta{dbname}{table_name} as target
USING (
SELECT * FROM (SELECT ts_ms, offset, operation_type, {key_column_sql}, coalesce(after_record, before_record) as after_record, row_number() OVER (PARTITION BY {key_column_partition_sql} ORDER BY ts_ms DESC, offset DESC) as rank
FROM (SELECT ts_ms, offset, operation_type, before_record, after_record, {key_column_include_sql}
FROM (SELECT get_json_object(string(value), '$.payload.op') as operation_type, get_json_object(string(value), '$.payload.before') as before_record,
get_json_object(string(value), '$.payload.after') as after_record, get_json_object(string(value), '$.payload.ts_ms') as tsms,
offset
FROM incremental{dbname}{table_name}
) binlog
) binlog_wo_init ) binlog_rank where rank = 1) as source
ON {key_column_condition_sql}
WHEN MATCHED AND (source.operation_type = 'u' or source.operation_type='d') THEN
UPDATE SET {set_key_column_sql}, ts_ms=source.ts_ms, json_record=source.after_record, operation_type=source.operation_type, offset=source.offset
WHEN NOT MATCHED AND (source.operation_type='c' or source.operation_type='u' or source.operation_type='d') THEN
INSERT ({inser_key_column_sql}, ts_ms, json_record, operation_type, offset) values ({insert_key_column_value_sql}, source.ts_ms, source.after_record, source.operation_type, source.offset);

执行完 StreamingSQL 之后,就会生成如下格局的数据:

其中 part-xxxx.snappy.parquet 保留的是 DeltaLake 表的数据文件,而 _delta_log 目录下保留的是 DeltaLake 表的元数据,包含如下:
其中 xxxxxxxx 示意的是版本信息,xxxxxxxx.json 文件里保留的是无效的 parquet 文件信息,其中 add 类型的为无效的 parquet 文件,remove 为有效的 parquet 文件。
Delta Lake 是反对 Time travel 的,然而咱们 CDC 数据接入的话,用不到数据回滚策略,如果多版本的数据始终保留会给咱们的存储带来肯定的影响,所以咱们要定期删除过期版本的数据,目前是仅保留 2 个小时内的版本数据。同时,Delta Lake 不反对主动合并小文件的性能,所以咱们还须要定期合并小文件。目前咱们的做法是,每小时通过 OPTIMIZE 和 VACCUM 来做一次合并小文件操作及清理过期数据文件操作:_

 optimize delta{dbname}{tablename};
set spark.databricks.delta.retentionDurationCheck.enabled = false;
VACUUM delta{dbname}{table_name} RETAIN 1 HOURS;

因为目前 Hive 和 Presto 无奈间接读取 Spark SQL 创立的 Delta Lake 表,然而监控及近实时查问需要,须要查问 Delta Lake 表,所以咱们还创立了用于 Hive 和 Presto 表查问的。
Delta Lake 数据与存量数据 Merge

因为 Delta Lake 的数据咱们仅接入新数据,对于存量历史数据咱们是通过 DataX 一次性导入的,加上 Delta Lake 表 Hive 无奈间接查问,所以每日凌晨咱们须要对这两局部数据做一次 merge 操作,写入到新的表中便于 Spark SQL 和 Hive 对立应用。这一模块的架构大抵如下:
图片

每日凌晨 0 点前,调用 DeltaService API,依据 Delta Lake 工作的配置主动生成 merge 工作 的 task 信息、spark-sql 脚本及 对应的 Airflow DAG 文件。
merge 工作的 task 信息次要包含如下信息:

主动生成 Merge 脚本,次要是从 Delta Lake 工作的配置中获取 mysql 表的 schema 信息,删掉历史的 Hive 表,再依据 schema 信息从新创立 Hive 内部表,再依据新的 schema 从 Delta Lake 表的 json_record 字段和历史存量数据表中获取对应的字段值做 union all 操作,缺失值采纳 mysql 的默认值, union 之后,再依据 row_key 进行分组,按 ts_ms 排序取第一条,同时取出 operation_type=’d’的数据。整体如下:

 CREATE DATABASE IF NOT EXISTS {db_name} LOCATION '------/delta/{db_name}.db';
DROP TABLE IF EXISTS {db_name}.{table_name};
CREATE TABLE IF NOT EXISTS {db_name}.{table_name}({table_column_infos}
)
STORED AS PARQUET
LOCATION '------/delta/{db_name}.db/{table_name}/data_date=${{data_date}}';
INSERT OVERWRITE TABLE {db_name}.{table_name}
SELECT {table_columns}
FROM (SELECT {table_columns}, _operation_type, row_number() OVER (PARTITION BY {row_keys} ORDER BY ts_ms DESC) as ranknum
FROM (SELECT {delta_columns}, operation_type as _operation_type, tsms
FROM delta{dbname}{table_name}
UNION ALL
SELECT {hive_columns}, 'c' as _operation_type, 0 as ts_ms
FROM {db_name}.{table_name}_delta_history
) union_rank
) ranked_data
WHERE ranknum=1
AND _operation_type <> 'd'

凌晨 0 点之后,Airflow 会依据 Airflow DAG 文件主动调度执行 merge 的 Spark SQL 脚本,脚本执行胜利后,更新 merge task 的状态为 succeed,Airflow 的 ETL DAG 会依据 merge task 的状态主动调度上游的 ETL 工作。

Delta Lake 数据监控对于 Delta Lake 数据的监控,咱们次要是为了两个目标:监控数据是否提早及监控数据是否失落,次要是在 MySQL 与 Delta Lake 表之间及 CDC 接入过去的 Kafka Topic 与 Delta Lake 表之间。

CDC 接入过去的 Kafka Topic 和 Delta Lake 表之间的提早监控:咱们是每 15 分钟从 Kafka 的 Topic 中获取每个 Partition 的最大 offset 对应的 mysql 的 row_key 字段内容,放入监控的 MySQL 表 delta_kafka_monitor_info 中,再从 delta_kafka_monitor_info 中获取上一周期的 row_key 字段内容,到 Delta Lake 表中查问,如果查问不到,阐明数据有提早或者失落,收回告警。

MySQL 与 Delta Lake 之间的监控:咱们有两种,一种是探针计划,每 15 分钟,从 MySQL 中获取最大的 id,对于分库分表,只监控一张表的,存入 delta_mysql_monitor_info 中,再从 delta_mysql_monitor_info 中获取上一周期的最大 id,到 Delta Lake 表中查问,如果查问不到,阐明数据有提早或者失落,收回告警。另一种是间接 count(id),这种计划又分为单库单表和分库分表两种,元数据保留在 mysql 表 id_based_mysql_delta_monitor_info 中,次要蕴含 min_id、max_id、mysql_count 三个字段,对于单库单表,也是每隔 5 分钟,从 Delta Lake 表中获取 min_id 和 max_id 之间的 count 值,跟 mysql_count 比照,如果小于 mysql_count 值阐明有数据失落或者提早,收回告警。再从 mysql 中获取 max(id) 和 max_id 与 max(id) 之间的 count 值,更新到 id_based_mysql_delta_monitor_info 表中。对于分库分表的状况,依据分库分表规定,生成每一张表对应的 id_based_mysql_delta_monitor_info 信息,每半小时执行一遍监控,规定同单库单表。

遇到的挑战

业务表 schema 变更频繁,Delta Lake 表如果间接解析 CDC 的字段信息的话,如果不能及时发现并修复数据的话,前期修复数据的老本会较大,目前咱们是不解析字段,等到凌晨 merge 的时候再解析。
随着数据量越来越大,StreamingSQL 工作的性能会越来越差。咱们目前是 StreamingSQL 解决提早,呈现大量提早告警后,将 Delta Lake 存量数据替换成昨日 merge 后的数据,再删掉 Delta Lake 表,删除 checkpoint 数据,从头开始生产 KafkaSource 表的数据。升高 Delta Lake 表数据,从而缓解 StreamingSQL 的压力。
Hive 和 Presto 不能间接查问 Spark SQL 创立的 Delta Lake 表,目前咱们是创立反对 Hive 和 Presto 查问的内部表来供 Hive 和 Presto 应用,然而这些表又无奈通过 Spark SQL 查问。所以下层 ETL 利用无奈在不更改代码的状况下,在 Hive 和 Spark SQL 及 Presto 引擎之间自在切换。

带来的收益

节俭了 DB 从库的老本,采纳 CDC + Delta Lake 之后,咱们的老本节俭了近 80%。
凌晨 DB 数据接入的工夫老本大大降低,可能确保所有非特殊要求的 DB 数据接入都能在 1 个小时内跑完。

后续布局

StreamingSQL 工作随着 Delta Lake 表数据量越来越大,性能越来越差问题跟进。
推动是否解决 Spark SQL 创立的 Delta Lake 表,无奈间接应用 Hive 和 Presto 查问的问题。

作者:张宽天
原文链接
本文为阿里云原创内容,未经容许不得转载

退出移动版