关于人工智能:使用-Alluxio-优化-EMR-上-Flink-Join

1次阅读

共计 9166 个字符,预计需要花费 23 分钟才能阅读完成。

业务背景 & 痛点

  • 流式解决的业务场景,常常会遇到实时音讯数据须要与历史存量数据关联查问或者聚合,比方电商常见的订单场景,订单表做为实时事实表,是典型的流式音讯数据,通常会在 kafka 中,而客户信息,商品 SKU 表是维度表,通常存在业务数据库或者数仓中,是典型的离线数据。实时订单数据在实时处理时通常须要事实表与维度表 join 做 reference 补全,以便拿到订单详情并实时统计当天或截至当天的所有订单的商品散布详情。
亚马逊云科技开发者社区为开发者们提供寰球的开发技术资源。这里有技术文档、开发案例、技术专栏、培训视频、流动与比赛等。帮忙中国开发者对接世界最前沿技术,观点,和我的项目,并将中国优良开发者或技术举荐给寰球云社区。如果你还没有关注 / 珍藏,看到这里请肯定不要匆匆划过,点这里让它成为你的技术宝库!
  • 流式计算通常采纳 Flink 做为数据处理平台,上文中提到的实时和离线数据 join 的场景,Flink 提供了 Hive/ jdbc/Hudi/ filesystem 各种 connector 实现与离线数据的提取和读写,这样一来在 Flink 应用程序中,即可应用 Table,Sql API 来 join 关联流态表和离线表数据,实现聚合计算等操作

应用 Flink Sql 离线表 Join 流态表的惯例 lookup join,是通过 Flink hive sql connector 或者 filesystem connector,对离线 hive 库表或者 S3 上离线数据建 Flink Table,而后对 kafka 音讯流中的数据建流态表,而后间接做量表做 join 操作

该形式架构如下图所示:

该形式次要面临的问题是:

  • lookup 维度表数据只会在首次拉起 Flink 利用的时候,保留在 task manager state 中,后续继续查问或者开窗聚合等操作时,是不会再次拉取维度表数据,业务须要定期重启 Flink 利用,或者刷新维度表数据到长期表,以便 join 聚合时和最新的维度数据关联:
  • 每次须要从新全量拉取维度表数据,存在冷启动问题,且维度表数据量大的时候(如上千万注册用户信息表,上万的商品 SKU 属性字段),造成很大 IO 开销,存在性能瓶颈
  • Flink 的 checkpoint 机制在继续查问或者开窗聚合时,须要保留 state 状态及解决数据到检查点快照中,造成 state 快照数据收缩

    解决方案思路

    基于以上业务难点,本文提出一种解决方案思路,即通过 Alluxio 缓存层,将 hive 维度表数据主动加载至 Alluxio UFS 缓存中,同时通过 Flink 时态表 join,把维度表数据做成继续变动表上某一时刻的视图

同时应用 Flink 的 Temporal table function 表函数,传递一个工夫参数,返回 Temporal table 这一指定时刻的视图,这样实时动静表主表与这个 Temporal table 表关联的时候,能够关联到某一个版本(历史上某一个时刻)的维度数据

优化后的整体架构如下图所示:

计划施行落地 Detail

本文以 Kafka 中用户行为日志数据做为实时流态的事实表数据,hive 上用户信息数据做为离线维度表数据,采纳 Alluxio+Flink temproal 的 demo,来验证其 flink join 优化的解决方案

实时事实表

本实例中咱们应用 json-data-generator 开源组件模仿的用户行为 json 数据,实时写入 kafka 中,通过 Flink kafka connector 转换为继续查问的 Flink 流态表,从而做为实时 join 的时候的 Fact 事实表数据

用户行为 json 模仿数据如下格局:

[{"timestamp": "nowTimestamp()",
                    "system": "BADGE",
                    "actor": "Agnew",
                    "action": "EXIT",
                    "objects": ["Building 1"],
                    "location": "45.5,44.3",
                    "message": "Exited Building 1"
                }]

蕴含用户行为的业务工夫,登录零碎,用户署名,行为 activity 动作,操作波及对象,地位信息,及相干文本音讯字段。咱们在

Flink Sql 中建抉择次要字段建事实表如下

CREATE TABLE logevent_source (`timestamp`  string, 
`system` string,
 actor STRING,
 action STRING
) WITH (
'connector' = 'kafka',
'topic' = 'logevent',
'properties.bootstrap.servers' = 'b-6.msk06.dr04w4.c3.kafka.ap-southeast-1.amazonaws.com:9092,b-5.msk06.dr04w4.c3.kafka.ap-southeast-1.amazonaws.com:9092,b-1.msk06.dr04w4.c3.kafka.ap-southeast-1.amazonaws.com:9092 (http://b-6.msk06.dr04w4.c3.kafka.ap-southeast-1.amazonaws.com:9092%2Cb-5.msk06.dr04w4.c3.kafka.ap-southeast-1.amazonaws.com:9092%2Cb-1.msk06.dr04w4.c3.kafka.ap-southeast-1.amazonaws.com:9092/)',
'properties.group.id' = 'testGroup6',
'scan.startup.mode'='latest-offset',
'format' = 'json'
);

Alluxio 缓存维度表

Alluxio 是大数据技术堆栈的分布式缓存,它提供了一个对立的 UFS 文件系统能够对接底层 S3,hdfs 数据,在读写 Alluxio UFS 的时候,能够针对 S3,HDFS 分布式存储层实现 warm up,显著晋升吞吐量和缩小网络开销,且与下层计算引擎如 Hive,spark,Trino 都有深度的集成,很适宜做为离线维度数据的缓存加速器

Amazon EMR 对 Alluxio 提供了良好的集成,能够通过 boostrap 启动脚本形式,在 EMR 创立时主动部署 Alluxio 组件并启动 Alluxio master、worker 过程,具体 EMR 装置和部署 Alluxio 步骤能够参考另一篇文章 Alluxio EMR 集成实际

在集成 Alluxio 的 Amazon EMR 集群中,应用 Alluxio 中创立 hive 离线维表数据的缓存表办法如下:

hive-env.sh 中设置设置 client jar 包:$ export HIVE_AUX_JARS_PATH=/<PATH_TO_ALLUXIO>/client/alluxio-2.2.0-client.jar:${HIVE_AU

确保装置部署 alluxio 的 EMR 集群上 ufs 已配置,并且表或者 db 门路已创立
alluxio fs mkdir alluxio://ip-xxx-xxx-xxx-xxx.ap-southeast-1.compute.internal:19998/s3/customer
alluxio fs chown hadoop:hadoop alluxio://ip-xxx-xxx-xxx-xxx.ap-southeast-1.compute.internal:19998/s3/customer

在 AWS EMR 集群上,创立 hive 表门路指向 alluxio namespace uri:!connect jdbc:hive2://xxx.xxx.xxx.xxx:10000/default;
hive> CREATE TABLE customer(
    c_customer_sk             bigint,
    c_customer_id             string,
    c_current_cdemo_sk        bigint,
    c_current_hdemo_sk        bigint,
    c_current_addr_sk         bigint,
    c_first_shipto_date_sk    bigint,
    c_first_sales_date_sk     bigint,
    c_salutation              string,
    c_first_name              string,
    c_last_name               string,
    c_preferred_cust_flag     string,
    c_birth_day               int,
    c_birth_month             int,
    c_birth_year              int,
    c_birth_country           string,
    c_login                   string,
    c_email_address           string
)
    ROW FORMAT DELIMITED
    FIELDS TERMINATED BY '|'
    STORED AS TEXTFILE
    LOCATION 'alluxio://ip-xxx-xxx-xxx-xxx.ap-southeast-1.compute.internal:19998/s3/customer';
OK
Time taken: 3.485 seconds

如上所示,该 Alluxio 表 location 指向的门路即为 hive 维度表所在 S3 门路,因而对 Customer 用户维度信息表的写入操作会主动同步到 alluxio 缓存中。

创立好 Alluxio hive 离线维度表后,在 flink sql 中,能够通过 hive 的 catalog,连贯到 hive 元数据,即能够查看到 alluxio 缓存表的详细信息:

CREATE CATALOG hiveCatalog WITH (  'type' = 'hive',
    'default-database' = 'default',
    'hive-conf-dir' = '/etc/hive/conf/',
    'hive-version' = '3.1.2',
    'hadoop-conf-dir'='/etc/hadoop/conf/'
);
-- set the HiveCatalog as the current catalog of the session
USE CATALOG hiveCatalog;
show create table customer;
create external table customer(
    c_customer_sk             bigint,
    c_customer_id             string,
    c_current_cdemo_sk        bigint,
    c_current_hdemo_sk        bigint,
    c_current_addr_sk         bigint,
    c_first_shipto_date_sk    bigint,
    c_first_sales_date_sk     bigint,
    c_salutation              string,
    c_first_name              string,
    c_last_name               string,
    c_preferred_cust_flag     string,
    c_birth_day               int,
    c_birth_month             int,
    c_birth_year              int,
    c_birth_country           string,
    c_login                   string,
    c_email_address           string
) 
row format delimited fields terminated by '|'
location 'alluxio://ip-xxx-xxx-xxx-xxx.ap-southeast-1.compute.internal:19998/s3/30/customer' 
TBLPROPERTIES (
  'streaming-source.enable' = 'false',  
  'lookup.join.cache.ttl' = '12 h'
)

如上图所示,能够看到该维度表 location 门路是 alluxio 缓存 ufs 门路的 uri,业务程序读写该维度表时,alluxio 会自动更新缓存中的 customer 维度表数据,并异步写入到 alluxio 的 backend storage 的 S3 表门路,实现数据湖的表数据同步更新。

Flink Temporal 时态表 join

Flink 时态表 (Temporal table) 也是动静表的一种,时态表的每条记录都会有一个或多个工夫字段相关联,当咱们事实表 join 维度表的时候,通常须要获取实时的维度表数据做 lookup,所以通常须要在事实表 create table 或者 join 时,通过 proctime()函数指定事实表的工夫字段,同时在 join 时,通过 FOR SYSTEM_TIME AS OF 语法,指定维度表 lookup 时对应的事实表工夫版本的数据

在本 Demo 示例中,客户信息在 hive 离线表作为一个变动的维度表的角色,客户行为在 kafka 中作为事实表的角色,因而在 flink kafka source table 中,通过 proctime()指定工夫字段,而后在 flink hive table 做 join 时,应用 FOR SYSTEM_TIME AS OF 指定 lookup 的 kafka source table 的工夫字段,从而实现 Flink temporal 时态表 join 业务解决

如下所示,Flink Sql 中通过 Kafka connector 创立用户行为的事实表,其中 ts 字段即为时态表 join 时的工夫戳:

CREATE TABLE logevent_source (`timestamp`  string, 
`system` string,
 actor STRING,
 action STRING,
 ts as PROCTIME()) WITH (
'connector' = 'kafka',
'topic' = 'logevent',
'properties.bootstrap.servers' = 'b-6.msk06.dr04w4.c3.kafka.ap-southeast-1.amazonaws.com:9092,b-5.msk06.dr04w4.c3.kafka.ap-southeast-1.amazonaws.com:9092,b-1.msk06.dr04w4.c3.kafka.ap-southeast-1.amazonaws.com:9092 (http://b-6.msk06.dr04w4.c3.kafka.ap-southeast-1.amazonaws.com:9092%2Cb-5.msk06.dr04w4.c3.kafka.ap-southeast-1.amazonaws.com:9092%2Cb-1.msk06.dr04w4.c3.kafka.ap-southeast-1.amazonaws.com:9092/)',
'properties.group.id' = 'testGroup-01',
'scan.startup.mode'='latest-offset',
'format' = 'json'
);

Flink 离线维度表与流式实时表具体 join 办法如下:

select a.`timestamp`,a.`system`,a.actor,a.action,b.c_login from 
      (select *, proctime() as proctime from user_logevent_source) as a 
left join customer  FOR SYSTEM_TIME AS OF a.proctime as b on a.actor=b.c_last_name;

如上代码示例,在事实表 logevent_source join lookup 维度表时,通过 proctime 函数获取到维度表的刹时最新的版本数据,保障 join 时的一致性和实时性

同时,该维度表数据曾经在 alluxio cache,因而读取时性能远高于离线读取 s3 上的表数据

通过 hive 切换 S3 和 alluxio 门路的 customer 信息 维度表,比照测试 flink join 能够看出 alluxio 缓存后性能显著劣势

通过 alter table 不便切换本地和 cache 的 location 门路:

alter table customer set location "s3://xxxxxx/data/s3/30/customer";
alter table customer  set location "alluxio://ip-xxx-xxx-xxx-xxx.ap-southeast-1.compute.internal:19998/s3/30/customer";

选取某一 split 数据分片的 TaskManager 日志:

  • cache 前(S3 门路读取): 5s 加载
2022-06-29 02:54:34,791 INFO  com.amazon.ws.emr.hadoop.fs.s3n.S3NativeFileSystem           [] - Opening 's3://salunchbucket/data/s3/30/customer/data-m-00029' for reading
2022-06-29 02:54:39,971 INFO  org.apache.flink.table.filesystem.FileSystemLookupFunction   [] - Loaded 433000 row(s) into lookup join cache
  • cache 后(alluxio 读取): 2s 加载
2022-06-29 03:25:14,476 INFO  com.amazon.ws.emr.hadoop.fs.s3n.S3NativeFileSystem           [] - Opening 's3://salunchbucket/data/s3/30/customer/data-m-00029' for reading
2022-06-29 03:25:16,397 INFO  org.apache.flink.table.filesystem.FileSystemLookupFunction   [] - Loaded 433000 row(s) into lookup join cache

在 JobManager 上查看 Timeline,比照 alluxio 和 s3 门路下 job 的执行工夫能够看到更加分明

能够看到,单个 task 查问晋升 1 倍以上,整体 job 性能晋升更加显著

其余须要思考的问题

继续 Join 每次都须要拉取维度数据做 join,Flink 的 checkpoint state 是否始终收缩导致 TM 的 RockDB 撑爆或者内存溢出?

state 自带有 ttl 机制,能够设置 ttl 过期策略,触发 Flink 清理过期 state 数据,Flink Sql 能够通过 Hint 形式设置

insert into logevent_sink
select a.`timestamp`,a.`system`,a.actor,a.action,b.c_login from 
(select *, proctime() as proctime from logevent_source) as a 
  left join 
customer/*+ OPTIONS('lookup.join.cache.ttl' = '5 min')*/  FOR SYSTEM_TIME AS OF a.proctime as b 
on a.actor=b.c_last_name;

Flink Table/Streaming API 相似:

StateTtlConfig ttlConfig = StateTtlConfig
    .newBuilder(Time.days(7))
    .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
    .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
    .cleanupInRocksdbCompactFilter() 
    .build();
ValueStateDescriptor<Long> lastUserLogin = 
    new ValueStateDescriptor<>("lastUserLogin", Long.class);
lastUserLogin.enableTimeToLive(ttlConfig);
StreamTableEnvironment.getConfig().setIdleStateRetentionTime(min, max);

设置后重新启动 lookup join,从 Flink TM 日志中能够看到,ttl 到期后,会触发清理并从新拉取 hive 维表数据:

2022-06-29 04:17:09,161 INFO  org.apache.flink.table.filesystem.FileSystemLookupFunction   
[] - Lookup join cache has expired after 5 minute(s), reloading

此外,能够通过配置 flink state retain,缩小 checkpoint 时候快照数量,从而缩小快照时候 state 的占用空间

Flink job 中配置:-D state.checkpoints.num-retained=5

设置后,能够看到 s3 checkpoint 门路上,Flink Job 会主动清理历史快照,只保留最近的 5 次快照数据,从而确保 checkpoint 快照数据不会沉积

[hadoop@ip-172-31-41-131 ~]$ aws s3 ls s3://salunchbucket/data/checkpoints/7b9f2f9becbf3c879cd1e5f38c6239f8/
                           PRE chk-3/
                           PRE chk-4/
                           PRE chk-5/
                           PRE chk-6/
                           PRE chk-7/

附录

Alluxio 整体架构

Alluxio on EMR 疾速部署

在 Amazon EMR 中利用 Alluxio 的分层存储架构

EMR Alluxio 集成 detail

Flink Temporal Join 具体

本篇作者

唐清原

Amazon 数据分析解决方案架构师,负责 Amazon Data Analytic 服务计划架构设计以及性能优化,迁徙,治理等 Deep Dive 反对。10+ 数据畛域研发及架构设计教训,历任 Oracle 高级征询参谋,咪咕文化数据集市高级架构师,澳新银行数据分析畛域架构师职务。在大数据,数据湖,智能湖仓,及相干举荐零碎 /MLOps 平台等我的项目有丰盛实战经验

陈昊

Amazon 合作伙伴解决方案架构师,有将近 20 年的 IT 从业教训,在企业应用开发、架构设计及建设方面具备丰盛的实践经验。目前次要负责 Amazon (中国)合作伙伴的计划架构征询和设计工作,致力于 Amazon 云服务在国内的利用推广以及帮忙合作伙伴构建更高效的 Amazon 云服务解决方案。
文章起源:https://dev.amazoncloud.cn/column/article/6309af45d4155422a46…

正文完
 0