摘要:本文整顿自阿里巴巴高级技术专家付典,在 FFA 核心技术专场的分享。本篇内容次要分为四个局部:
- PyFlink 倒退现状介绍
- PyFlink 最新性能解读
- PyFlink 典型利用场景介绍
- PyFlink 下一步的倒退布局
点击查看直播回放 & 演讲 PPT
一、PyFlink 倒退现状介绍
很多 PyFlink 的新用户都会问这样一些问题,PyFlink 是否成熟?性能是否齐全?性能怎么样?在这里,咱们针对用户的这样一些问题,进行一个具体的解读。
首先,在性能层面,PyFlink 曾经对齐了 Flink Java API 中的绝大多数性能。用户应用 Java API 能够实现的性能,基本上都能够用 Python API 实现得进去。
同时,PyFlink 还面向 Python 用户提供了很多特有的能力,比如说 Python UDF、Pandas UDF 等,容许用户在 PyFlink 作业中应用各种 Python 三方库。
在部署模式上,PyFlink 反对各种常见的部署模式,比如说 YARN、kubernetes、Standalone 等,这意味着用户能够依据须要,灵便地抉择作业的部署模式。
除了性能层面之外,性能也是很多用户十分关怀的。在性能上,PyFlink 也做了很多优化,首先,在执行打算层面,PyFlink 做了一系列的优化,尽可能优化作业的物理执行打算,比方算子交融。
当作业的物理执行打算确定之后,在 Python 运行时,PyFlink 通过 Cython 实现了 Python 运行时中外围链路的代码,尽量升高 Python 运行时中框架局部的开销。对于 Cython 有所理解的同学应该晓得,Cython 在执行时会被编译成 native 代码来执行,性能十分高。
同时,PyFlink 还在现有的过程模式的根底之上,引入了线程执行模式,以进一步晋升 Python 运行时的性能。线程执行模式在 JVM 中执行用户的 Python 代码,通过这种形式,在一些典型利用场景中,性能甚至能够追平 Java,这一块前面咱们还会具体介绍。
通过这一系列的优化之后,目前 PyFlink 无论在性能上还是在性能上,都曾经根本齐备,达到了生产可用的状态。
PyFlink 达到目前这样一个状态,并不是欲速不达的,从 Flink 1.9 开始引入 PyFlink,到目前为止,PyFlink 曾经累计公布了 8 个大版本,20 多个小版本。
从 Flink 1.9 到 Flink 1.11 这几个版本中,咱们重点在欠缺 Python Table API,基本上对齐了 Java Table API 中的绝大多数性能,同时也反对了 Python UDF、Pandas UDF 等性能。
在 Flink 1.12 至 Flink 1.14 版本中,社区次要是在欠缺 Python DataStream API,目前曾经基本上对齐了 Python DataStream API 上的绝大部分罕用性能。
在 Flink 1.15 至 Flink 1.16 版本中,PyFlink 的重点是在性能优化上,在原有的过程执行模式的根底之上,为 Python 运行时引入了线程执行模式,以进一步地晋升 Python 运行时的性能。
随着 PyFlink 性能的逐步欠缺,咱们也看到 PyFlink 的用户数也在逐步增长,PyPI 的日均下载量在过来一年也有了显著的增长,从最开始的日均 400 屡次,曾经增长到日均 2000 屡次。
二、PyFlink 最新性能解读
接下来,咱们看一下 PyFlink 在 Flink 1.16 中的性能。PyFlink 在 Flink 1.16 中反对的性能次要是围绕使 PyFlink 在性能及性能上全面生产可用这样一个目标。为此,咱们重点补齐了 PyFlink 在性能以及性能上的最初几处短板。
如上图所示,PyFlink 在 Flink 1.16 中反对了 side output 性能。用户能够把一条数据流,切分成多条数据流。以机器学习为例,用户能够通过该性能,把一份数据集给切分成多份,别离用于模型训练和模型验证。
除此之外,用户也能够通过 side output 解决早退数据或者脏数据。将早退数据或者脏数据通过 side output 拆分进去,独自进行解决。用户也能够通过 side output 把早退数据或脏数据,写入内部存储,进行离线剖析。
PyFlink 在 Flink 1.16 中,还反对了 broadcast state。通过该性能,用户能够将一条数据流中的数据,播送发送到另一条数据流算子中的多个并发实例上,并通过 broadcast state 保留播送流的状态,以确保作业在 failover 时,所有算子复原的状态是统一的。
比方咱们用 PyFlink 做近线预测,当模型更新后,能够将最新的模型文件地址,播送发送到所有的预测算子,来实现模型的热更新,并通过 broadcast state 确保在作业 failover 时,所有算子加载的模型文件是统一的。
PyFlink 在 Flink 1.16 中,对于 DataStream API 上 Window 的反对也做了很多欠缺,原生反对了各种窗口,比方滚动窗口、滑动窗口、会话窗口等等。Window 能够将有限流中的数据,划分成不同的工夫窗口进行计算,在流计算中是十分重要的性能,有着十分丰盛的利用场景。
比方机器学习用户能够应用 Window 来计算实时特色。在短视频利用中,能够通过 Window,计算用户最近五分钟的无效视频观看列表,也能够通过 Window,来计算最近 30 分钟,某个视频在各个人群中的点击散布等。
除此之外,在 Flink 1.16 中,PyFlink 还新增了对于很多 DataStream API 上 connector 的反对,包含 Elasticsearch、Kinesis、Pulsar、Hybrid source 等。与此同时,也反对 Orc、Parquet 等 format。
有了这些 connector 以及 format 的反对,PyFlink 基本上曾经对齐了所有 Table API 以及 DataStream API 上 Flink 官网所反对的 connector。
须要阐明的是,对于 PyFlink 中没有原生提供反对的 connector,如果有对应的 Java 实现,也是能够在 PyFlink 作业中应用的,其中 Table API 以及 SQL 上的 connector,能够间接在 PyFlink 作业中应用,不须要任何开发。
对于 DataStream API connector,用户只须要十分大量的开发即可在 PyFlink 作业中应用。如果用户有需要的话,能够参考一下 PyFlink 中现有的 DataStream API connector 是如何反对的,基本上只须要一两个小时即可实现一个 connector 的反对。
除了后面介绍的这些性能层面的加强之外,在性能层面,Flink 1.16 也做了很多工作,根本实现了 Python 运行时线程执行模式的反对。相比于过程执行模式,线程执行模式的性能更好。
线程执行模式通过 JNI 调用的形式,执行 Python 代码,节俭序列化 / 反序列化开销及通信开销。特地是当单条数据比拟大时,成果更加显著。
因为不波及跨过程通信,线程执行模式目前采纳同步执行的形式,不须要在算子中进行攒批操作,没有攒批提早,实用于对提早敏感的场景,比方量化交易。
与此同时,与其余 Java/Python 互调用计划相比,PyFlink 所采纳的计划兼容性更好。很多 Java/Python 互调用计划对于所能反对的 Python 库,都有肯定水平的限度。PyFlink 所采纳的计划,对于用户在作业中所应用的 Python 库没有任何限度。
如上图左侧所示,展现了过程模式和线程模式架构的区别。过程模式须要启动一个独立的 Python 过程,用于执行用户的 Python 代码。线程执行模式在 JVM 中,通过 JNI 调用的形式执行 Python 代码。PEMJA 是 PyFlink 中 Java 代码和 Python 代码之间互调用的库。
如上图右侧所示,在解决时延上,相比于过程模式,线程模式有显著的升高。因为过程模式不须要攒数据,来一条解决一条。与此同时,在解决性能上,线程模式相比过程模式也有较大的晋升,在某些状况下,性能甚至能够追平 Java。
这里须要阐明的是,Python UDF 的执行性能既取决于 PyFlink 执行框架的性能,也跟 Python UDF 的实现是否高效非亲非故。通过各种优化伎俩,目前 PyFlink 执行框架的性能曾经十分高效,开销十分小。用户的 PyFlink 作业的执行性能很大水平取决于,用户作业中的 Python UDF 实现得是否高效。
如果用户的 Python UDF 实现得足够高效,比如说实现的过程中针对一些耗时操作,有针对性地进行来一些优化或者利用一些高性能的 Python 三方库,那么 PyFlink 作业的性能其实是能够实现的十分好的。
三、PyFlink 典型利用场景介绍
接下来,讲一讲 PyFlink 的利用场景。目前,实时机器学习是 PyFlink 用户的重点利用场景。以举荐零碎为例,上图是实时举荐零碎的一个典型架构。用户的行为日志,通过 APP 埋点等伎俩,实时采集到音讯队列中,通过实时数据荡涤,归一化解决之后,在特色生成、样本拼接等模块应用。实时的用户行为日志,能够用来计算实时特色。
首先,实时用户行为日志能够被用来计算实时特色。实时特色是 Flink 十分重要的利用场景。实时特色对于举荐成果的晋升非常明显,建设难度相对来说比拟小,是以后很多公司投入的重点。
比方在短视频利用中,用户最近 N 分钟的无效视频观看列表就是短视频利用中,十分重要的用户实时特色。这个特色能够通过一个 Flink 作业,实时剖析用户的行为日志失去。
一般来说,用户的行为日志还会同步一份到离线存储中,用于生成离线特色。这块次要是用于计算一些简单特色或者是说长周期特色。不论是离线特色还是实时特色,最终都会存储到特色库中供在线举荐零碎应用。
实时的用户行为日志岂但能够用来结构实时特色,而且能够用来结构实时样本,用于模型训练。
通过剖析用户的行为日志,能够主动实现对样本打标签。比方在举荐零碎中,给用户举荐了 10 个 item,如果用户点击了某个 item,那么在行为日志中就会呈现这个 item 的点击事件。有了这个点击事件,咱们就能够失去一条正样本。同理,如果对于某个 item,只有曝光事件,没有点击事件,咱们就能够将其看成是一条负样本。
除了辨别正负样本之外,还须要拼接上用户的特色以及 item 的特色之后,能力失去一条残缺的样本。这里须要留神的是,做样本拼接时所用的特色不是来自于实时特色库,而是来自于历史特色库。
因为实时特色库中的特色是不断更新的,比方在短视频利用中,用户最近 N 分钟的视频点击列表特色,随着工夫的推移,在一直发生变化。因而在样本拼接时,咱们心愿拼接举荐产生时所用到的特色,而不是以后时刻的特色。样本拼接可能产生在举荐事件过来一段时间之后,此时在实时特色库中存储的特色可能曾经产生了变动,因而这里拼接的是历史特色库。因为历史特色库中的数据来自于举荐产生时所用到的特色。
样本经过训练之后,最终生成模型。通过验证,如果没有问题,就能够把模型部署到线上,供在线推理服务应用。
在举荐零碎中,在线推理服务包含召回、排序等多个环节。其中,在召回环节应用比拟宽泛的一种伎俩是多路召回技术,每一路召回应用不同的策略。比如说能够依据用户画像、以后的热点内容、经营策略等,别离生成不同的召回后果。对于这些召回后果,合并之后,再通过排序等环节之后展现给用户。
由此可见,多路召回的益处是不言而喻的。通过多路召回,能够加强举荐后果的多样性。这里须要指出的是,因为举荐零碎对于延时比拟敏感,对于召回策略或者模型的性能要求十分高。
因而,召回中应用的模型或者策略个别都比较简单。目前一些公司也在摸索,在多路召回零碎中引入近线召回。近线召回能够事后计算召回后果,并将召回后果缓存,作为多路召回中的一路,供在线推理服务间接应用。因而近线召回没有时延束缚,用户能够在近线召回中应用一些比较复杂的模型或者策略。
接下来,介绍一下在上述步骤中,如何应用 PyFlink 实现各项性能的开发。在实时数据荡涤局部,机器学习利用中,输出数据中往往蕴含很多列。
Flink 的其余性能也能够用于数据荡涤,比方 SQL。SQL 自身也是十分不便的,那么和 SQL 相比,PyFlink 能够提供哪些附加价值呢?
首先,在机器学习场景中,广泛有一个共性的特点,数据中的列十分多,可能有几十列甚至上百列。在这种状况下 SQL 语句可能写起来十分长,比方在这个例子中,用户可能心愿对第 9 列和第 10 列进行一个合并操作,其余列保留。
然而在 SELECT 语句中,须要把所有的其余的无关列都写进去,如果数据中的列十分多,写起来十分繁琐,同时可读性、可维护性也会变得比拟差,PyFlink 对于这块提供了欠缺的反对。
另外,机器学习用户通常对于 Pandas 比拟相熟,习惯于应用 Pandas 进行数据处理,很多机器学习相干的库的数据结构都是采纳 Pandas 或者 Numpy 的数据结构,PyFlink 在这块也提供了很好的反对,反对用户在 Python UDF 的实现中应用 Pandas 库。
接下来,咱们通过几个具体的例子看一下如何在 PyFlink 中应用上述性能。
首先,PyFlink 提供了行操作和列操作的 API,从而简化用户的代码逻辑。通过列操作,用户能够十分不便的减少列、删除列或替换列。列操作实用于输出数据的列很多,且只有个别列发生变化的场景。
比方在上述例子中,咱们通过 add_or_replace_columns 操作,对数据中的 item_id 一列进行归一化后,替换原有的 item_id 列,数据中的其余列不须要再显式列出来。
除了列操作之外,PyFlink 还反对行操作,能够以行为单位对数据进行变换。在行操作的 UDF 中,能够间接通过列名援用对应列,应用起来十分不便,实用于输出数据中的列很多,且须要对多个列进行解决的场景。
在行操作中,不须要在 UDF 的输出参数中,把所有用到的列都显式列出来,而是把一行数据都作为输出传进来,供 UDF 应用。
在上述例子中,咱们通过 map 操作对数据进行变换,map 的输出是一个 Python 函数,Python 函数的输入输出类型都是 Row 类型,Row 是 PyFlink 中定义的一个数据结构,在 Python 函数的实现中能够通过列名援用输出数据中对应列的值,应用起来十分不便。
除了 map 之外,PyFlink 中还提供了多个行操作相干的 API,如果有需要的话,大家能够从 PyFlink 的官网文档中理解详细信息。
如果用户相熟 Pandas 库,也能够在 Python 函数中应用 Pandas 库。用户只须要将 Python 函数的类型,标记成 Pandas 即可。
在这种状况下,Python 函数的输出类型是 Pandas 的 DataFrame。PyFlink 运行时框架会在调用用户的 Python 函数之前,将输出数据转换成 Pandas 的 DataFrame 构造,不便用户应用。除此之外,Python 函数的输入类型也须要是 Pandas DataFrame。
接下来,咱们看一下实时特色计算局部。
以后有很多公司,开发一个实时特色工作的流程通常是这样的:
首先,算法团队的同学通过数据挖掘等伎俩,发现某个特色会比拟有用。而后,找到数据开发团队的同学,进行需要沟通。将特色的详细描述信息,甚至 Python 代码参考实现,提给数据开发团队的同学。
而后,数据开发团队的同学进行需要排期并实现。在实现的过程中,算法团队的同学和数据开发团队的同学可能还须要进行多轮沟通,确保数据开发团队的同学的了解和实现没有问题。
另外,算法团队同学提供的 Python 参考实现,有可能不太容易翻译成 Java 代码。比方外面用到了一些 Python 三方库,找不到适合的 Java 实现等等。
最初,当特色工作开发好之后,算法同学通过一系列的验证,很有可能发现这个特色可能并没有预期中的成果那么好,这样的一个特色工作很可能就废除了。数据开发团队的同学也白忙活了。
从这个过程中,咱们看到特色的开发成本是十分高的,波及到跨团队的沟通、开发语言的转换,特色上线的周期也十分长,通常以周甚至月为单位。
而 PyFlink 能够显著升高实时特色工作的开发门槛、缩短实时特色的上线周期。有了 PyFlink,算法团队的同学齐全能够本人来开发实时特色工作。同时,在特色工作开发的过程中,能够应用各种 Python 库,没有任何限度。
在举荐场景中,可能会用到这样一个特色,计算用户最近 5 分钟的拜访物品列表。
为此,PyFlink 提供了多种实现伎俩。
首先,用户能够通过 SQL+Pandas UDAF 的形式来实现上述性能。
上述 SQL 语句定义了一个长度为 5 分钟、步长为 30 秒的滑动窗口,针对窗口中的数据,定义了一个 Pandas UDAF 来计算用户在这个窗口中的拜访序列。Pandas UDAF 的次要逻辑是对窗口中用户拜访的 item 进行排序,并应用|作为分隔符,生成拜访序列字符串。
除此之外,用户能够通过 DataStream API 计算序列特色,实现上述性能。通过应用 DataStream API,定义了一个窗口大小为 5 分钟、步长为 30 秒的滑动窗口,并定义了一个聚合函数来解决每一个窗口中的数据。聚合函数须要实现 create_accumulator、add、get_result、merge,定义如何针对窗口中的数据进行聚合运算。
针对窗口中的每一条数据,框架会顺次调用聚合函数的 add 办法,当窗口中所有的数据都解决完后,框架会调用聚合函数的 get_result 办法来取得聚合值,因而用户只须要依据业务逻辑的须要实现这几个办法即可。
在 add 办法中,咱们将数据缓存起来,在 get_result 中对于所有数据进行排序,并以|作为分隔符,生成拜访序列字符串。
接下来,咱们来看一下实时样本生成局部。在实时样本生成局部,次要有正负样本判断和特色拼接。
首先,咱们看一下正负样本的结构。在举荐场景中,当给用户举荐了一批 item 之后,如果用户点击了某个 item,就会成为一个正样本,而如果用户没有点击,则成为一个负样本。
在离线场景中,判断正负样本是非常容易的,而在实时场景中,就不那么容易了。给用户展示了某个 item 之后,用户有可能不点击,也有可能隔了很久之后才点击。
在 Flink 中,用户能够通过定时器解决正负样本问题。针对每条曝光事件,用户能够注册一个定时器。定时器的工夫距离,能够依据业务的须要确定。
在这个例子中,咱们定义了一个 10 分钟的定时器。在 10 分钟内,如果收到了这条曝光事件对应的点击事件,则能够将其看成是一个正样本,否则,如果在 10 分钟内还没有收到对应的点击事件,则能够将其看成一个负样本。
正负样本的问题解决了之后,样本的标签也就确定了,接下来,还须要拼接上举荐产生时所用到的特色,能力成为一条残缺的样本。
这里,咱们能够应用 Flink 中的维表 Join 性能,来进行特色的拼接。为了解决特色穿梭问题,也就是说在拼接用户的实时特色时,用户的实时特色相比举荐产生时可能曾经产生了变动,后面咱们提到,在线推理服务在举荐时,能够将所用到的实时特色保留到历史特色库中。为了便于辨别特色的版本,能够给特色加一个惟一的标识,比方 trace_id,而后在做特色拼接时,通过 trace_id 来定位举荐产生时所应用的特色,解决特色穿梭的问题。
接下来,咱们来看一下近线推理。近线推理是十分典型的利用场景。目前,很多用户在用 PyFlink 做近线推理。
首先,用户能够通过 Table API 做近线推理。在 Table API 里,用户能够通过 Select 语句做近线推理。推理逻辑能够封装在用户的自定义函数中。在自定义函数里,用户能够通过 open 办法,加载机器学习模型。
open 办法只会在作业启动阶段调用一次,因而能够确保机器学习模型只 load 一次。理论的预测逻辑能够定义在 eval 办法中。
除了 Table API 之外,用户也能够通过 DataStream API 做近线推理。跟 Table API 相似,DataStream API 中的自定义函数中也提供了一个 open 办法。用户能够在 open 办法里,加载机器学习模型。应用形式跟 Table API 比拟像,用户能够依据本人的需要,抉择应用 Table API,还是 DataStream API。
除此之外,用户能够通过 timer 晋升推理的时效性。在某些场景中,为了进步时效性,能够通过定时器来做周期性推理。该办法实用于沉闷用户的范畴比拟确定,且用户拜访比拟频繁的场景。
在这些场景中,能够针对沉闷用户,或者圈选一批重点用户,周期性地进行近线推理,以进一步晋升举荐成果。在这里,咱们每 5 分钟对于沉闷用户进行一次近线推理。
某些公司可能是 Java 技术栈。算法团队训练出模型之后,由开发团队再去负责部署应用。在这种状况下,用户可能会偏向于应用 Flink 的 Java API 进行预测。
在 PyFlink 反对线程模式的过程中,形象出了一个 library PEMJA,反对 Java 和 Python 之间的互调用,跟其余的 Java/Python 互调用库相比,PEMJA 的性能更好,并且对于各种 Python 库的反对也比拟好,兼容所有的 Python 库。
这个例子展现了如何利用 PEMJA 提供的 API,在 Flink Java 作业中加载机器学习模型、并进行预测。从这个例子能够看出,通过 PEMJA,用户能够在 JAVA 代码中调用并执行 Python 代码。
四、PyFlink 下一步倒退布局
接下来,PyFlink 的建设重点,会逐渐从性能以及性能,转向易用性、稳定性以及文档,帮忙用户更好的应用 PyFlink。咱们接下来会重点欠缺以下几个方面。
首先,因为以后 PyFlink 的端到端示例相对来说还比拟少,不利于新用户疾速上手。接下来,咱们会建设一个独立的 PyFlink 网站,联合具体场景,展现更多的端到端应用示例。
其次,在易用性方面,接下来会重点优化作业执行过程中的报错提醒,让报错信息更敌对,使用户在开发作业的过程中更容易定位问题。
与此同时,咱们也在重构以后 Python API 的文档。这块次要是参考一些其余成熟的 Python 我的项目的教训,比方 Pandas。使得用户在 Python API 文档中,更容易找到 PyFlink 中各个 API 的应用形式。
最初,作业运行的稳定性也十分重要。咱们也会继续改良并增强 PyFlink 作业运行的稳定性,比方升高 PyFlink 作业在过程模式下 checkpoint 的耗时等。
最初,也欢送大家退出【PyFlink 交换群】交换和反馈 PyFlink 相干的问题和想法。
点击查看直播回放 & 演讲 PPT
更多内容
流动举荐
阿里云基于 Apache Flink 构建的企业级产品 - 实时计算 Flink 版现开启流动:
99 元试用 实时计算 Flink 版(包年包月、10CU)即有机会取得 Flink 独家定制卫衣;另包 3 个月及以上还有 85 折优惠!
理解流动详情:https://www.aliyun.com/produc…