乐趣区

关于后端:FeatHub流批一体的实时特征工程平台

摘要:本文整顿自阿里巴巴高级技术专家、Apache Flink/Kafka PMC 林东,在 FFA 2022 AI 特色工程专场的分享。本篇内容次要分为三个局部:

  1. 为什么须要 FeatHub
  2. FeatHub 架构和概念
  3. FeatHub API 展现

点击查看直播回放和演讲 PPT

一、为什么须要 FeatHub

1.1 指标场景

上图中展现的是 Feathub 须要反对的指标场景。

首先,思考到机器学习开发者通常是相熟 Python 环境的数据科学家,他们通常应用诸如 TensorFlow、PyTorch 和 Scikit-Learn 等 Python 算法库进行机器学习的训练和推理作业开发。因而,咱们心愿持续反对他们应用 Python 进行特色工程开发,以便他们的特色生成作业代码能够与现有的 Python 算法库进行无缝对接。

其次,思考到目前各个业务畛域(如举荐和风控)正在逐步向实时方向倒退,咱们心愿可能使这些业务的特色工程作业可能像生成离线特色一样轻松地生成实时特色。

第三,思考到越来越多的企业用户不想受限于特定的云厂商,他们对于多云部署有着强烈的需要。因而,咱们打算通过将我的项目的外围代码开源,容许用户在任意的私有云和公有云上部署 FeatHub。

1.2 实时特色工程的痛点

与离线特色相比,应用实时特色会波及到额定的痛点,包含实时特色工程的开发、部署、监控和分享四个阶段的笼罩。

在开发阶段,因为特色随着工夫而变动,可能会产生特色穿梭的问题。为了确保作业生成正确的特色,用户须要在 Flink 作业中编写代码解决工夫戳信息,以避免出现特色穿梭的状况。对于许多用户,特地是那些专一于算法开发的数据科学家,防止特色穿梭的开发成本绝对较高。

当数据科学家实现特色开发后,他们须要将特色作业部署到生产环境,并实现高吞吐和低提早的特色生产。通常状况下,平台方须要提供一些软件工程师来帮助数据科学家将试验程序转换为高性能、高可用的分布式程序进行执行,这些作业可能是 Flink 或 Spark。这个转换阶段引入了额定的人力和工夫老本,可能会引入更多的谬误,同时也可能导致开发周期缩短、节约人力资源等问题。

在监控阶段,用户须要监控曾经实现线上部署的实时特色工程作业。监控的难度较大,因为特色品质不仅取决于代码是否存在 bug,还取决于是否受到上游数据源数值散布变动的影响。

当整个作业的举荐成果下降时,通常须要手动查看每个阶段的特色散布变动,以定位问题产生的阶段并进一步进行调试。目前,这一过程须要投入大量人力,导致效率低下。为了减速实时特色工程的部署和运作,咱们心愿进一步升高监控的难度和人力耗费。

在分享阶段,不同组开发实时特色工程时经常须要定义类似甚至雷同的特色。不足元数据中心使得开发者难以注册、搜寻和复用特色定义和特色数据。因而,他们不得不反复进行开发工作和运行雷同的作业,导致资源节约。为了解决这个问题,咱们心愿反对用户在一个中心化的元数据中心中搜寻、分享和复用已有特色,以升高反复开发的老本。

1.3 point-in-time correct 语义

接下来将举例为大家介绍什么是特色穿梭。在上图的示例中,咱们有两个数据源,别离代表维表特色和样本数据。维表特征描述了用户在最近两分钟内点击网页的次数,而样本数据形容了用户在看了一个网页后是否有点击广告的行为。

咱们心愿将这两个数据源中的数据进行拼接,以产生训练样本,进而用于 TensorFlow、PyTorch 等机器学习程序的训练和推理。如果咱们在进行样本拼接时,未正确思考数据的工夫因素,可能会影响模型的推理成果。

在上图的例子中,咱们能够应用用户 ID 作为 Join Key,将这两个表进行拼接。然而,如果在拼接维表特色时未思考工夫戳,只应用表中最新的点击数,那么咱们失去的训练数据中,所有样本的“最近 2 分钟点击数”特征值都会是 6。这个后果与真实情况不符,会升高模型的推理成果。

具备“point-in-time correct”语义的拼接,会比对样本数据和维表特色的工夫戳,找到维表特色中工夫戳小于样本工夫戳、但最靠近样本工夫戳、并且具备对应 Join Key 的特色作为拼接后的特征值。

在上图的例子中,6:03 时刻的特征值应该来自于 6:00 时刻用户的点击数 10。因而,在生成的训练数据中,6:03 时刻的样本数据中的点击数是 10。

而在 7:05 时刻,因为特征值在 7:00 时刻变为 6,因而在生成的训练数据中,这个样本对应的特征值也是 6。这就是应用”point-in-time correct“语义进行拼接所失去的后果。这种形式能够防止特色穿梭问题。

1.4 Feature Store 的外围场景

上图展现了 Feature Store 的外围场景。Feature Store 是一个近两年衰亡的概念,旨在解决方才形容的特色工程场景中的外围痛点,包含特色的开发、部署、监控、共享等阶段。那么,Feature Store 是如何解决这些阶段的痛点的呢?

上面咱们将分阶段阐明 Feature Store 的外围价值。

在特色开发阶段,Feature Store 能够提供一个简略易用的 SDK,让用户专一于定义特色的计算逻辑,例如拼接、聚合、UDF 调用,而无需实现解决特色穿梭问题的逻辑。这将极大简化数据科学家定义和应用实时特色的工作。

在特色部署阶段,Feature Store 将应用内置的执行引擎来计算并生成用户定义好的特色,这样用户就不须要间接面对 Spark 或 Flink 等我的项目的编程 API 来开发分布式作业了。执行引擎须要反对高吞吐、低提早的特色计算。同时,Feature Store 能够提供丰盛的连接器抉择,反对用户从不同的数据源和数据存储中读取和写入数据。

在特色监控阶段,Feature Store 能够提供一些通用的标准化指标,让用户能够实时监控特色数值的散布变动,并反对告警引入人工干预来检查和保护机器学习链路的推理成果。

在特征分析阶段,Feature Store 将提供特色元数据中心,反对用户注册、搜寻和复用特色,激励大家单干分享,以缩小反复的开发工作。

1.5 为什么须要 FeatHub

尽管曾经有几个开源的 Feature Store,例如 Feast 和 Feather,但为什么咱们还要开发 FeatHub 呢?

在开发 FeatHub 之前,咱们调研了现有的开源 Feature Store 和一些云厂商的 Feature Store(例如 Google Vertex AI、Amazon SageMaker),发现它们提供的 SDK 在能力和易用性方面并不能满足咱们之前形容的需要(例如实时特色、Python SDK、开源)。因而,咱们从新设计了一套 Python SDK,进一步反对实时特色,并且让 SDK 更加简略易用。咱们将在介绍 FeatHub API 时进一步探讨其易用性。

除了反对实时特色和更加易用之外,FeatHub 的架构可能反对多种执行引擎,例如 Flink、Spark 以及基于 Pandas 库实现的单机执行引擎。这使得 FeatHub 可能在单机上疾速进行试验和开发,并随时切换到分布式的 Flink/Spark 集群中进行分布式高性能计算。用户能够依据需要抉择最合适的执行引擎。这也使得 FeatHub 具备十分好的可扩展性。这些个性是其余开源 Feature Store 所不具备的。

在生产部署阶段,咱们心愿 FeatHub 可能以最高效的形式计算实时特色。目前大部分开源 Feature Store 只反对应用 Spark 计算离线特色。而 FeatHub 反对应用 Flink,这是目前流计算畛域最好的执行引擎,来计算实时特色。这个能力是其余 Feature Store 所没有的。

二、FeatHub 架构和概念

2.1 架构

这张架构图展现了 FeatHub 平台的外围组件和架构。最上层是 SDK,目前基于 Python 语言开发,将来将反对 Java 或其余语言的 SDK。用户能够应用 SDK 编写申明式的特色定义,指定特色的数据源和指标存储地位,以及特色计算逻辑,例如基于滑动窗口的聚合和特色拼接。咱们预期该 SDK 可能表白所有已知的特色计算逻辑。

中间层是多种执行引擎。其中,Local Processor 反对用户在单机上利用 CPU、磁盘等资源计算特色,以不便用户在单机上实现试验。Flink Processor 能够将用户的特色定义翻译成 Flink 作业,在高可用性、分布式的集群环境中进行低提早、高吞吐的特色计算。Spark Processor 则能够将用户的特色定义翻译成 Spark 作业,反对用户应用 Spark 执行高吞吐的离线特色计算。

在执行引擎之下是特色的存储,包含离线存储(例如 HDFS)、流式存储(例如 Kafka)和在线存储(例如 Redis)。

上图展现了 FeatHub 平台如何与机器学习的训练和推理程序对接。

用户能够应用 FeatHub SDK 定义特色。一旦部署了 FeatHub 作业,FeatHub 将启动对应的 Flink/Spark ETL 作业,并从在线或离线存储中读写特色数据。如果用户须要进行离线训练,则训练程序(例如 TensorFlow)能够从离线存储(例如 HDFS)中批量读取相应的特色数据。如果用户须要进行在线推理,则在线推理作业能够从在线存储(例如 Redis)中读取特色数据。

此外,存在须要进行在线计算的场景,其中特色的值须要在接管到用户申请后进行在线计算。例如,在地图手机利用中,当服务器收到用户申请时,可能须要依据用户以后申请和上一次申请的地理位置绝对间隔计算用户的挪动速度特色。为满足这一场景需要,Feature Service 能够提供在线计算服务。

2.2 外围概念

上图中的 Table Descriptor 示意一个具备 Schema 的特色表,其概念相似于 Flink Table。咱们能够基于数据源(例如 Kafka Topic)定义一个 Table Descriptor,并对其利用计算逻辑(例如滑动窗口聚合),从而产生一个新的 Table Descriptor,并将 Table Descriptor 的数据输入到内部存储(例如 Redis)中。

Table Descriptor 能够分为 FeatureTable 和 FeatureView。FeatureTable 是指特色存储中的一个物理特色表。例如,用户能够将 FeatureTable 定义为 Kafka 中的一个 Topic。而 FeatureView 则是对 FeatureTable 利用一个或多个计算逻辑后失去的后果。例如,用户能够对来自 Kafka 的 FeatureTable 利用不同的计算逻辑,从而失去以下三类不同的 FeatureView:

  • DerivedFeatureView:它的输出行和输入行是一一对应的。用户能够应用这类 FeatureView 进行样本拼接,生成训练样本。DerivedFeatureView 能够蕴含基于单行计算和 Table Join 取得的特色。
  • SlidingFeatureView:这是一个输入随工夫变动的 FeatureView。它的输出行和输入行通常没有一一对应的关系。例如,咱们须要计算用户最近两分钟内点击某商品的次数,输出的数据是用户的点击行为。即便用户不再产生新的点击行为,随着工夫的推移,咱们晓得这个特色的数值也会逐步递加,直至降为零。SlidingFeatureView 能够蕴含基于单行计算和滑动窗口聚合取得的特色。
  • OnDemandFeatureView:它能够应用来自在线申请的特色作为输出来计算新的特色。因而,咱们须要 Feature Service 来在线计算这个 FeatureView 蕴含的特色。OnDemandFeatureView 能够蕴含基于单行计算取得的特色。

FeatHub 反对多种特色计算逻辑,包含:

  • ExpressionTransform:反对申明式的计算表达式,相似于 SQL Select 语句中的表达式。用户能够对特色进行加减乘除等操作,并调用内置的 UDF 函数。
  • JoinTransform:反对拼接不同 Table Descriptor 的特色,用户能够指定样本数据表和维表,以取得训练样本。
  • PythonUDFTransform:反对用户在 FeatHub SDK 中自定义和调用 Python 函数,不便相熟 Python 的数据科学家进行特色开发。
  • OverWindowTransform:反对基于 Over 窗口的聚合计算,相似于 SQL 中所反对的 Over 窗口聚合。例如,对于一个代表用户购买行为的输出表,应用 OverWindowTransform 找到这一时刻之前的 2 分钟内的该用户的行为数据,并统计购买总额度,作为该用户的特色。
  • SlidingWindowTransform:反对基于滑动窗口的汇合计算,能够随着工夫变动输入新的实时特色数据。该计算逻辑能够将后果输入到在线特色贮存 (e.g. Redis),不便上游机器学习推理程序实时查问和应用。与 OverWindowTransform 不同的是,SlidingWindowTransform 能够在没有新的输出时过期数据。

上图展现了 FeatHub 的执行引擎与特色贮存的工作流程。

首先,用户须要定义 Source 来表白数据源。Source 中的数据会被执行引擎依照所定义的各种 Transform 逻辑进行解决,再输入到内部特色贮存 Sink,以便上游调用。这个过程相似于传统的 ETL。

Source 能够对接常见的离线或在线贮存,例如 FileSystem、Kafka、Hive 等等。而 Sink 同样能够对接常见的离线或在线贮存,例如 FileSystem、Kafka、Redis 等等。Redis 是目前特色工程畛域应用比拟多的在线贮存。

FeatHub 反对多种执行引擎,包含 LocalProcessor、FlinkProcessor 和 SparkProcessor。用户能够依据本人生产环境的具体情况,抉择最合适的引擎来生成所需的特色。

  • LocalProcessor 会在本地机器上执行特色计算逻辑,基于 Pandas 库实现。用户能够在单机上开发特色定义和运行试验,无需部署和应用分布式集群(例如 Flink)。LocalProcessor 仅反对离线特色计算。
  • FlinkProcessor 会将特色定义翻译成能够分布式执行的 Flink 作业,能够基于流计算模式来生成实时特色。FeatHub 反对 Flink Session 模式和 Flink Application 模式。
  • SparkProcessor 会将特色定义翻译成能够分布式执行的 Spark 作业,能够基于批计算模式来生成离线特色。

三、FeatHub API 展现

3.1 特色计算性能

接下来,咱们将通过一些示例程序,展现如何应用 FeatHub 实现特色开发,并展现用户代码的简洁性和易读性。

左上角的图像显示了应用 JoinTransform 实现特色拼接的代码段。如果用户须要将来自维表的特定列连贯作为特色,则能够在特色上提供新的特色名称、特色数据类型、连贯键以及 JoinTransform 实例。在 JoinTransform 上,用户能够提供维表名称和列名称。须要留神的是,用户只需提供连贯所需的根本信息,这种申明式的定义十分简洁。

两头的图像展现了应用 OverWindowTransform 实现 Over 窗口聚合的代码段。如果咱们须要计算最近两分钟内用户购买商品的生产金额总数,则用户除了提供特色名称和数据类型外,还能够提供一个 OverWindowTransform 实例。在 OverWindowTransform 上,用户能够提供一个申明式的计算表达式 item_count * price,用于计算每个订单的生产金额,并设置 agg_function = SUM,以将所有订单的生产金额相加。此外,用户能够将 window_size 设置为 2 分钟,即每次特色计算都会聚合前 2 分钟内的原始数据输出。group_by_key = [“user_id”] 示意对每个用户 ID 进行聚合计算。将所有信息联合起来,即可残缺地表白所需的 Over 窗口聚合逻辑。这种申明式的表白十分简洁,相似于 SQL 的 select / from / where / groupBy。

上图右上角展现了应用 SlidingWindowTransform 实现滑动窗口聚合的代码片段,与 Over 窗口聚合十分类似。区别在于须要指定 step_size,这里设为 1 分钟,示意每隔 1 分钟窗口会滑动一次,移除过期数据,从新计算并输入新特征值。

上图左下角展现了应用 ExpressionTransform 调用内置函数的代码片段。FeatHub 提供罕用内置函数,相似于 Flink SQL 或 Spark SQL。代码片段中的 UNIX_TIMESTAMP 将输出的工夫戳字符串转换为整数类型的 Unix 工夫戳。例如,在这个例子中,用户能够将出租车的上车和下车工夫的字符串类型特色别离转换为整数类型,并相减失去代表本次行程总工夫的特色。

上图右下角展现了应用 PythonUdfTransform 调用用户自定义 Python 函数的代码片段。在此例中,用户应用 Python lambda 函数将输出字符串转换为小写字来取得新的特色。

3.2 样例场景

为了更清晰地展现如何应用 Feathub 实现端到端的作业开发,咱们将以生成机器学习训练数据集为例,并提供具体的解说。在本例中,咱们假如用户数据来自于两个表:Purchase Events 和 Item Price Events。

Purchase Events 中的每行数据代表一次购买商品的行为。其中,user_id 代表用户,item_id 代表被购买的商品,item_count 代表购买的商品个数,而 timestamp 代表购买商品的工夫。

Item Price Events 中的每行数据代表一次商品价格变动的事件。其中,item_id 代表价格发生变化的商品,price 代表最新的价格,timestamp 代表价格变动的工夫。

为了为机器学习训练生成一个数据集,咱们须要在这两个表格中的信息的根底上创立一个新的数据集。该数据集须要在每次购买商品的行为产生时,记录购买商品的用户在以后时刻之前的 30 分钟内的总生产金额作为新特色。

为了创立这个数据集,咱们能够应用 JoinTransform,应用 item_id 作为 Join Key,将 Item Price Events 中的价格以 point-in-time-correct 的形式拼接到每个 Purchase Events 行的数据中。而后,咱们能够应用 OverWindowTransform,以 user_id 为 group_by_key,并设置 window_size 为 30 分钟,计算每行的 item_count 乘以价格,并应用 SUM 函数进行聚合,以取得所需的新特色。

3.3 样例代码

上图展现了实现样例场景的代码片段。

首先,用户须要创立一个 FeatHubClient 实例。FeatHubClient 蕴含了外围 FeatHub 组件的配置。在这个例子中,咱们配置 FeatHubClient 应用 Flink 作为执行引擎。用户还能够进一步提供 Flink 相干的配置信息,例如 Flink 的端口和 IP 地址等。

接下来,用户须要创立 Source 来指定特色的数据源。在这个示例中,特色数据来自本地文件,因而能够应用 FileSystemSource 表白。在须要实时计算特色的场景中,用户能够应用 KafkaSource 从 Kafka 实时读取特色数据。为了可能在之后的特色计算中援用这些特色,用户须要指定 Source 的名称。

在 FileSystemSource 上,data_format 示意文件的数据格式,例如 JSON、Avro 等。timestamp_field 则指定了代表数据工夫戳的列名。有了工夫戳信息,FeatHub 就能够执行遵循 point-in-time-correct 语义的拼接和聚合计算,防止特色穿梭问题。

在创立了 Source 后,用户能够创立 FeatureView 来定义所需的拼接和聚合逻辑。在代码片段中,item_price_events.price 示意须要拼接来自 item_price_events 表上的 price 列。total_payment_last_two_minutes 示意 Over 窗口聚合失去的特色。对于来自 purchase_event_source 的每一行,它会找到之前 2 分钟内具备雷同 user_id 的所有行,计算 item_count * price,而后将计算结果相加,作为特色的数值。

定义 FeatureView 后,用户就能够从中获取特色数据,并将数据输入到特色存储中。用户能够调用 FeatHub 提供的 table#to_pandas 函数获取蕴含特色数据的 Pandas DataFrame,而后检查数据的正确性,或者将取得的数据传递给 Scikit-learn 等 Python 算法库进行训练。

实现特色作业开发后,用户须要将作业部署到生产环境中的分布式集群来解决大规模的特色数据。用户须要创立一个 Sink 实例来表白特色输入的存储地位和相干格局信息。在样例代码中,FileSystemSink 指定了 HDFS 门路以及 CSV 数据格式。

最初,用户能够调用 FeatHub 提供的 table#execute_insert 函数将作业部署到近程的 Flink 集群进行异步执行。

3.4 FeatHub 性能优化

除了提供 Python SDK 不便用户开发特色作业外,FeatHub 还为常见的特色作业提供多种内置性能优化,以升高特色作业的生产部署老本并提高效率。

在实时搜寻举荐等畛域,常常会用到多个滑动窗口聚合的特色,它们的定义简直雷同,只是窗口大小不同。例如,为了判断是否要向用户举荐某个商品,须要晓得该用户在最近 2 分钟、60 分钟、5 小时和 24 小时内点击某类商品的次数。如果间接应用 Flink API 来生成这些特色,每个特色都会有一个独立的 Window 算子来记录窗口范畴内的数据。在这个例子中,最近 2 分钟内的数据会被复制多份寄存在所有算子中,导致节约内存和磁盘空间。而 FeatHub 提供了优化,应用一个算子来记录最大窗口范畴内的数据,并复用这些数据来计算这些特色的数值,从而升高 CPU 和内存老本。

此外,针对前文提到的滑动窗口聚合的特色,咱们发现它们的输出数据通常具备稠密性。以计算用户最近 24 小时中点击商品的次数为例,大多数用户的点击事件可能只集中在其中某一小时,而其余时间段则没有点击行为,因而不会产生特色数值的变动。如果间接应用 Flink API 生成此特色并要求步长为 1 秒,则 Flink 的滑动窗口每秒都会输入数据。相比之下,FeatHub 会提供优化,仅在特色数值发生变化时才输入数据。这样能够显著缩小 Flink 输入的数据所占用的网络带宽,并升高上游计算和存储资源的耗费。

在特色工程方面,咱们打算在 FeatHub 中减少更多的优化措施,例如通过应用 SideOutput 获取早退的数据来更新滑动窗口特色,进步特色的在线和离线一致性。咱们还将提供机制来缩小爬虫造成的 hot-key 对 Flink 作业性能的影响。

3.5 FeatHub 将来工作

咱们正在踊跃开发 FeatHub,心愿尽早为用户提供生产可用的能力。以下是咱们打算实现的工作。

  • 欠缺基于 Pandas、Spark 和 Flink 的执行引擎实现,并提供尽可能多的内置性能优化,以减速特色计算的过程并进步生产环境中的性能体现。
  • 扩大反对更多的离线和在线贮存零碎,例如 Redis、Kafka、HBase 等,使得 FeatHub 可能笼罩更宽泛的场景和利用。
  • 提供可视化 UI,帮忙用户拜访特色元数据中心,轻松注册、查问和复用特色,从而进步开发和部署的效率。
  • 提供常见的指标和监控性能,例如特色的覆盖率和缺失率等,反对开箱即用的特色品质监控和告警机制,保障特色的稳定性和准确性。

欢送尝试应用 FeatHub 并提供您的开发倡议。FeatHub 是一个开源库,其地址为 https://github.com/alibaba/feathub。还能够拜访 https://github.com/flink-extended/feathub-examples 获取更多的 FeatHub 应用代码样例。

<u>**<p style=”text-align:center”><font color=FF6a00 size=4> 点击查看直播回放和演讲 PPT</font>
</p>**</u>


更多内容


流动举荐

阿里云基于 Apache Flink 构建的企业级产品 - 实时计算 Flink 版现开启流动:
99 元试用 实时计算 Flink 版(包年包月、10CU)即有机会取得 Flink 独家定制卫衣;另包 3 个月及以上还有 85 折优惠!
理解流动详情:https://www.aliyun.com/product/bigdata/sc

退出移动版