1.PyFlink 的发展史
1.1、v1.8.x
- Flink 在 1.8 版本的时候就曾经提供 Python API,只在 Datase/Stream 上提供反对。
- 存在一些问题,比方:
- Table API 不反对 Python。
- 两套各自独立实现的一个 Python API。
- 底层实现是 JPython,JPython 无奈反对 Python3.x。
1.2、v1.9.x
- 2019 年 8 月公布。
- 反对 Python Table API。
1.3、v1.10.x
- 2020 年 2 月公布。
- 提供了 Python UDF 的反对。
- 提供 UDF 的依赖治理。
1.4、将来倒退
- 提供 Pandas UDF 的反对。
- 提供用户自定义的一些 UDF Metrics。
- ML API。
- 在易用性方面,提供 SQL DDL 反对 Python UDF。
- 在前面的一些版本中,咱们也心愿越来越多的人可能参加到 PyFlink 的奉献和开发中去。
2.PyFlink 外围性能及原理介绍
PyFlink 外围性能将次要从每个版本的划分来跟大家进行介绍,第 1 个 PyFlink 1.9 版本外面提供 Python Table API 的反对,而后是 PyFlink 1.10 外面提供了 Python UDF 还有相干依赖治理,最初 1.11 版本外面提供了 Pandas UDF 和用户自定义的 Metrics。
2.1、Python Table API (PyFlink 1.9)
■ 1.Python Table API
什么是 Python Table API 呢?咱们能够从编程的角度来介绍一下。Python Table API 大略提供了一些 Python 的 API,比方这里次要能够看一下 Table 的接口,Table 接口上有很多 Table 相干的算子,这些算子能够分为两类:
- 1. 跟 sql 相干的算子。比方 select、filter、join、window 等;
- 2. 在 sql 的根底上扩大的一些算子。比方 drop_columns(..),能够用来晋升 sql 的便利性,比方:
当有一个很大的表并且想删除某一列的时候,能够用 drop_columns 来删除某一列。
对于咱们来说,能够随便组合 Table 上的办法,而后编写不同的业务逻辑。咱们接下来看一下,如何用 Table API 来写一个 WordCount 的例子,能够让大家有一个比拟残缺的意识。
■ 2.WordCount
如下图所示,是一个残缺的 Python Table API 的 WordCount 的例子。次要能够蕴含 4 个局部。
- 首先,咱们须要去初始化环境,比方第 6 行,咱们先拿到了一个 ExecutionEnvironment,而后第 7 行,去创立一个 TableEnvironment。
- 创立 TableEnvironment 之后,须要去定义 source 跟 sink,这里 source 跟 sink 都是指定了输出和输入的文件门路,还指定了文件中 Table 对应的一些字段,以及字段对应的数据类型。而且能够定义输入分隔符。
- 定义好 source 跟 sink 之后,再来看一下如何编写计算逻辑。能够用 from_path 算子来读取 source 表,读取完之后,就能够进行 group by 的一些聚合,做 group by 跟 wordcount。
- 做完之后,能够把后果表用 insert_into 进行输入。最初调用 Environment 的 execute 来提交作业。
通过下面 4 步,咱们就残缺的写出了一个 Python Table API 的 WordCount。那么对于 WordCount 例子,它的底层实现逻辑是怎么样的呢?接下来看一下,Python Table API 的一个架构。
■ 3.Table API 架构
- 通过这个架构图,能够看到,Python Table API 是建设在 Java Table API 的根底上的,咱们并没有独自的从上到下实现一套 Python Table API。
- Python Table API 是一个特地的存在,它是在 Java Table API 的根底上加了一层薄薄的 API,这两层 API 是能够互相调用的。
- 在 client 端的时候,会起一个 Python VM 而后也会起一个 Java VM,两个 VM 进行通信。通信的细节能够看上面这张图。
- 咱们能够看到 Python 跟 Java VM 外面都会用 Py4J 各自起一个 Gateway。而后 Gateway 会保护一些对象。
- 比方咱们在 Python 这边创立一个 table 对象的时候,它也会在相应的 Java 这边创立一个雷同 table 对象。如果创立一个 TableEnvironment 对象,在 Java 局部也会创立一个 TableEnvironment 对象。
- 如果你调用 table 对象上的办法,那么也会映射到 Java 这边,所以是一个一一映射的关系。
- 基于这一套架构,咱们能够得出一个论断:如果你用 Python Table API 写出了一个作业,这个作业没有 Python UDF 的时候,那么这个作业的性能跟你用 Java 写进去的作业性能是一样的。因为底层的架构都是同一套 Java 的架构。
刚刚咱们介绍了 PyFlink 1.9 版本外面的 Python Table API,也提到了 table 的接口下面提供了很多不同的算子,而且能够用这些算子去组合,实现不同的业务逻辑。然而对于这些算子来说,它的性能还无奈满足一些特定的状况,比方某些业务须要编写一些自定义的逻辑,此时就须要强依赖 Python UDF,所以在 PyFlink 1.10 版本外面,提供了 Python UDF 并且提供了相应的依赖治理。
2.2、Python UDF & 依赖治理 (PyFlink 1.10)
■ 1.Python UDF 架构
- 如果你的作业是蕴含一个 Python UDF 的作业,那么从提交的时候,就是右边的架构图,而后 deploy 到 Remote 端的时候,能够看到 Remote 端的架构图分为两个局部。右边局部是 Java 的 Operator,左边局部是 Python 的 Operator。
- 大体的流程咱们能够大略看一下:
- 在 open 办法里进行 Java Operator 和 Python Operator 环境的初始化。
- 环境初始化好之后,会进行数据处理。当 Java Operator 收到数据之后,先把数据放到一个 input buffer 缓冲区中,达到肯定的阈值后,才会 flash 到 Python 这边。Python 解决完之后,也会先将数据放到一个后果的缓冲区中,当达到肯定阈值,比方达到肯定的记录的行数,或者是达到肯定的工夫,才会把后果 flush 到这边。
- state 拜访的链路。
- logging 拜访的链路。
- metrics 汇报的链路。
■ 2.Python UDF 的应用
- PyFlink-1.9 版本中,Python API 中反对注册应用 Java UDF,应用办法如下:能够调 TableEnvironment 上的 register_java_function 这个办法,有两个参数,一个参数是给 UDF 的命名,第 2 个是 Java 类的门路。
table_env.register_java_function("func1", "java.user.defined.function.class.name")
上面是一个例子:
- Python UDF 的应用:
能够调 TableEnvironment 上的 register_function 这个办法,有两个参数,一个参数是给 UDF 起的名字,第 2 个是 python_udf 的一个对象。
table_env.register_function("func1", python_udf)
上面是一个例子:
■ 3.Python UDF 的定义形式
PyFlink 外面也反对一些其余的形式去定义 UDF,咱们能够看一下,总共有 4 种形式:
- 能够继承 ScalaFunction 基类,并重写 eval 办法。
- 间接定义一个 Named Function,而后再用 UDF 的签名去申明 UDF 的输出类型和输入类型。
- 也能够用刚刚例子外面的 Lambda Function 的这种形式,来定义 Python UDF。
- 最初一种是 Callable Function 的形式。也是申明其输出和输入的类型。
■ 4. 依赖治理
写完 UDF 的时候,常常遇到一个问题,UDF 外面可能会有一些依赖,如何去解决这些依赖问题呢?PyFlink 提供了 4 种依赖的 API,如下所示。
- 依赖文件
如果 UDF 外面依赖一个文件的话,能够用 add_python_file 加载依赖的文件的门路,指定完之后,作业提交的时候,就会把这个文件散发到集群,那么在近程执行的时候,你的 UDF 就能够去拜访这个文件。
table_env.add_python_file(file_path)
- 依赖存档(打包)文件
可能会去依赖一个存档的文件,这个时候你能够用 add_python_archive 办法,传入两个参数。第 2 个参数是一个可选的参数。第 1 个参数示意对你存档文件的重命名。如果调用了 API,那么在 UDF 外面就能够去拜访存档文件外面的所有文件。
table_env.add_python_archive("py_env.zip", "myenv")
# the files contained in the archive file can be accessed in UDF
def my_udf():
with open("myenv/py_env/data/data.txt") as f:
- 依赖第三方我的项目
能够用 set_python_requirements 办法去指定你的第三方依赖。也是有两个参数,第 1 个参数是传一个文件,文件中写了所依赖的第三方我的项目,以及它对应的版本。第 2 个参数是一个可选的参数,如果集群是一个有网络的环境,那么第 2 个参数能够不填,当第 2 个参数不填的时候,作业提交开始初始化的时候,Python 就会去依据你的 requirements 文件外面配置的依赖,主动的去网络下载你的依赖,而后装置。如果集群是没有网络的,能够事后把这些依赖下载好,下载到 cached 的目录外面去。而后把目录也一起提交到集群,集群拿到这个目录会去装置这些依赖。
# commands executed in shell
echo numpy==1.16.5 > requirements.txt
pip download -d cached_dir -r requirements.txt --no-binary :all:
# python code
table_env.set_python_requirements("requirements.txt", "cached_dir")
- 指定 Python Interpreter 门路
假如你的 Python UDF 运行的时候,会依赖某一个版本的 Python 解释器。那么这个时候能够去指定你所心愿 Python UDF 运行的一个解释器的门路。
table_env.get_config().set_python_executable("py_env.zip/py_env/bin/python")
2.3、Pandas UDF & User-defined Metrics (PyFlink 1.11)
咱们在 Pyflink 1.11 的版本外面提供了 Pandas UDF,还有用户自定义的 Metrics。当然 Pyflink 1.11 版本外面,不光是这两个性能,我这里次要是介绍一下这两个性能。Pyflink 1.11 版本也会行将在 2020 年的 6 月份进行公布。
接下来会从性能和性能两个角度来介绍一下 Pandas UDF。
■ 1.Pandas UDF – 性能
咱们先来看一下性能方面,如果你要编写一个 Pandas UDF,那么跟方才定义一般 UDF 的模式基本上是统一的。这里只须要去申明一个 udf_type,指定为 Pandas 就行了。
指定之后,UDF 运行起来的时候零碎传的 i 跟 j 就变成一个 pandas.Series 的数据结构。这个时候能够间接用 series 来进行操作。与此同时会有一个益处,就是咱们拿到的是一个 pandas 的数据结构,能够调用 pandas 相干的一些库函数,并且能够调用一些数值计算相干的库函数,这样能够极大的扩大性能。不须要再去实现一套逻辑。
■ 2.Pandas UDF – 性能
那么性能上 Pandas UDF 的益处,次要有两点。
- 缩小了调用的开销,因为刚刚说到了零碎传给 UDF 的是一个 pandas.series,它相当于是将多行的数据一次性的传给了 UDF。而一般 UDF 解决多行时,每行都须要调用一次 UDF。所以比照能够发现,Pandas UDF 能够显著缩小 UDF 的调用开销。
- 能够缩小 UDF 的序列化和反序列化开销。这里具体介绍一下,Pandas UDF 是如何缩小了序列化和反序列化的。
咱们能够看一下左边这个图,右边是 Java Operator,左边是 Python Operator。假如 Operator 收到了一个 X,而后 X 在这里会进行一个序列化,变成 arrow 的内存数据格式,这个时候用 X’来示意。那么这个时候 Java 这边会把 X’传给 Python,Python 就能够间接来拜访 arrow 数据结构,因为 pandas 底层的数据结构就是用 arrow 来示意的,所以这个时候不须要在 Python 进行反序列化,能够间接来操作 X’。而后在 X’加一之后,失去 Y’,Y’也是间接生成的 arrow 内存数据格式,这里也不须要反序列化。那么把 Y’传到 Java 时,就须要进行一个反序列化。
咱们能够发现,只须要在 Java 进行一个序列化和反序列化。Python 这边能够省去了序列化和反序列化开销。
而且这里须要提出的一点是,如果你的输出 X 也是一个 arrow 的内存数据格式,那么 Java 这边的序列化跟反序列化也是能够防止的。比方你的 source 是一个 Parquet Source,那么它输入的数据格式也是 arrow 数据格式,这个时候就能够防止掉 Java 的序列化和反序列化。所以,Pandas UDF 也是能够缩小序列化反序列化的开销。
■ 3.User-defined Metrics
咱们再来看一下用户自定义 Metrics。
- Metric 注册
先来看一下 Metric 的注册,Metric 注册能够是在 metric_group 上调用对应的 Metric 办法来注册。
- Metric Scope
metric_group 还能够调用他的 add_group 办法去定义你的 Metric 的一个域,能够对 Metric 进行分类。
- Metric 类型
目前 PyFlink 外面提供的 Metric 类型有以下 4 种:
- Counter
相似累加器。一开始须要在 open 办法外面进行 Counter 的注册,而后调用 match_group 上 Counter 办法,这里咱们给了一个 Metric 的名字叫 my_counter。定义完之后,就能够在 Eval 办法外面进行应用。而后 Counter 能够提供 Inc 办法,你能够调用 Inc 进行相应的减少。
- Gauge
它是用来反映一个瞬时值。假如咱们须要在 Metric 上显示 length 值的变动状况。那么咱们须要用 Gauge 办法来注册,名字是 my_gauge。第 2 个参数这里须要留神它是一个 UDF,咱们须要返回要监控数值的值是什么,返回这个值。而后在 Eval 办法里或者其余 UDF 的调用里能够扭转这个值。框架底层就会一直去汇报这个值以后值是多少。
- Meter
Meter 这种 Metric 是示意以后这一秒往前一个工夫区间内所有数值相加的一个均值。咱们看能够调用 Meter 办法来注册。第 2 个参数是一个默认的参数,默认是 60 秒,示意 60 秒内所有值的一个均值。这里须要留神的是,Meter 每一秒都会去汇报以后这一秒往前 60 秒工夫区间内,所有值的均值。能够用 Meter 的 mark_event 办法来汇报。
- Distribution (sum/count/min/max/mean)
最初一种是 Distribution 的一个 Metric 类型,它对你的值能提供一些 sum/count/min/max/mean 等统计信息。能够调用 metric_group 上的 Distribution 这个办法。更新上能够调用 distribution.update。
3. PyFlink 的 Demo 演示
接下来对这些外围性能做一些 Demo 的演示跟解说。此处咱们提供了一个 playgrounds 的 git。次要是心愿帮忙大家更疾速地相熟 PyFlink 所有的性能及应用,并附上了相干代码示例。具体参考信息请见下方链接:
https://github.com/pyflink/pl…
4.PyFlink 社区搀扶打算
- 为什么要发动 PyFlink 社区搀扶打算?
用户逐步变多、有教训用户少
- 社区指标:并肩作战,营造双赢
- 如何参加 PyFlink 打算?
https://survey.aliyun.com/app…
初步审核符合条件后咱们会在收到问卷的 10 个工作日内与您分割。
- 搀扶指标
面向所有 PyFlink 社区企业用户
- PyFlink 问题反对 & 共享
如果你有一些相干的问题或者是其余的一些意见,能够发到社区的邮件列表外面去。
点击「下方链接」可回顾作者分享视频~
https://ververica.cn/develope…