关于flink:从-19-到-111聊聊-PyFlink-的核心功能演进附-Demo-代码

31次阅读

共计 6838 个字符,预计需要花费 18 分钟才能阅读完成。

1.PyFlink 的发展史

1.1、v1.8.x

  1. Flink 在 1.8 版本的时候就曾经提供 Python API,只在 Datase/Stream 上提供反对。
  2. 存在一些问题,比方:
  • Table API 不反对 Python。
  • 两套各自独立实现的一个 Python API。
  • 底层实现是 JPython,JPython 无奈反对 Python3.x。

1.2、v1.9.x

  1. 2019 年 8 月公布。
  2. 反对 Python Table API。

1.3、v1.10.x

  1. 2020 年 2 月公布。
  2. 提供了 Python UDF 的反对。
  3. 提供 UDF 的依赖治理。

1.4、将来倒退

  1. 提供 Pandas UDF 的反对。
  2. 提供用户自定义的一些 UDF Metrics。
  3. ML API。
  4. 在易用性方面,提供 SQL DDL 反对 Python UDF。
  5. 在前面的一些版本中,咱们也心愿越来越多的人可能参加到 PyFlink 的奉献和开发中去。

2.PyFlink 外围性能及原理介绍

PyFlink 外围性能将次要从每个版本的划分来跟大家进行介绍,第 1 个 PyFlink 1.9 版本外面提供 Python Table API 的反对,而后是 PyFlink 1.10 外面提供了 Python UDF 还有相干依赖治理,最初 1.11 版本外面提供了 Pandas UDF 和用户自定义的 Metrics。

2.1、Python Table API (PyFlink 1.9)

■ 1.Python Table API

什么是 Python Table API 呢?咱们能够从编程的角度来介绍一下。Python Table API 大略提供了一些 Python 的 API,比方这里次要能够看一下 Table 的接口,Table 接口上有很多 Table 相干的算子,这些算子能够分为两类:

  • 1. 跟 sql 相干的算子。比方 select、filter、join、window 等;
  • 2. 在 sql 的根底上扩大的一些算子。比方 drop_columns(..),能够用来晋升 sql 的便利性,比方:

当有一个很大的表并且想删除某一列的时候,能够用 drop_columns 来删除某一列。

对于咱们来说,能够随便组合 Table 上的办法,而后编写不同的业务逻辑。咱们接下来看一下,如何用 Table API 来写一个 WordCount 的例子,能够让大家有一个比拟残缺的意识。

■ 2.WordCount

如下图所示,是一个残缺的 Python Table API 的 WordCount 的例子。次要能够蕴含 4 个局部。

  • 首先,咱们须要去初始化环境,比方第 6 行,咱们先拿到了一个 ExecutionEnvironment,而后第 7 行,去创立一个 TableEnvironment。
  • 创立 TableEnvironment 之后,须要去定义 source 跟 sink,这里 source 跟 sink 都是指定了输出和输入的文件门路,还指定了文件中 Table 对应的一些字段,以及字段对应的数据类型。而且能够定义输入分隔符。
  • 定义好 source 跟 sink 之后,再来看一下如何编写计算逻辑。能够用 from_path 算子来读取 source 表,读取完之后,就能够进行 group by 的一些聚合,做 group by 跟 wordcount。
  • 做完之后,能够把后果表用 insert_into 进行输入。最初调用 Environment 的 execute 来提交作业。

通过下面 4 步,咱们就残缺的写出了一个 Python Table API 的 WordCount。那么对于 WordCount 例子,它的底层实现逻辑是怎么样的呢?接下来看一下,Python Table API 的一个架构。

■ 3.Table API 架构

  • 通过这个架构图,能够看到,Python Table API 是建设在 Java Table API 的根底上的,咱们并没有独自的从上到下实现一套 Python Table API。
  • Python Table API 是一个特地的存在,它是在 Java Table API 的根底上加了一层薄薄的 API,这两层 API 是能够互相调用的。
  • 在 client 端的时候,会起一个 Python VM 而后也会起一个 Java VM,两个 VM 进行通信。通信的细节能够看上面这张图。

  • 咱们能够看到 Python 跟 Java VM 外面都会用 Py4J 各自起一个 Gateway。而后 Gateway 会保护一些对象。
  • 比方咱们在 Python 这边创立一个 table 对象的时候,它也会在相应的 Java 这边创立一个雷同 table 对象。如果创立一个 TableEnvironment 对象,在 Java 局部也会创立一个 TableEnvironment 对象。
  • 如果你调用 table 对象上的办法,那么也会映射到 Java 这边,所以是一个一一映射的关系。
  • 基于这一套架构,咱们能够得出一个论断:如果你用 Python Table API 写出了一个作业,这个作业没有 Python UDF 的时候,那么这个作业的性能跟你用 Java 写进去的作业性能是一样的。因为底层的架构都是同一套 Java 的架构。

刚刚咱们介绍了 PyFlink 1.9 版本外面的 Python Table API,也提到了 table 的接口下面提供了很多不同的算子,而且能够用这些算子去组合,实现不同的业务逻辑。然而对于这些算子来说,它的性能还无奈满足一些特定的状况,比方某些业务须要编写一些自定义的逻辑,此时就须要强依赖 Python UDF,所以在 PyFlink 1.10 版本外面,提供了 Python UDF 并且提供了相应的依赖治理。

2.2、Python UDF & 依赖治理 (PyFlink 1.10)

■ 1.Python UDF 架构

  • 如果你的作业是蕴含一个 Python UDF 的作业,那么从提交的时候,就是右边的架构图,而后 deploy 到 Remote 端的时候,能够看到 Remote 端的架构图分为两个局部。右边局部是 Java 的 Operator,左边局部是 Python 的 Operator。
  • 大体的流程咱们能够大略看一下:
  • 在 open 办法里进行 Java Operator 和 Python Operator 环境的初始化。
  • 环境初始化好之后,会进行数据处理。当 Java Operator 收到数据之后,先把数据放到一个 input buffer 缓冲区中,达到肯定的阈值后,才会 flash 到 Python 这边。Python 解决完之后,也会先将数据放到一个后果的缓冲区中,当达到肯定阈值,比方达到肯定的记录的行数,或者是达到肯定的工夫,才会把后果 flush 到这边。
  • state 拜访的链路。
  • logging 拜访的链路。
  • metrics 汇报的链路。

■ 2.Python UDF 的应用

  • PyFlink-1.9 版本中,Python API 中反对注册应用 Java UDF,应用办法如下:能够调 TableEnvironment 上的 register_java_function 这个办法,有两个参数,一个参数是给 UDF 的命名,第 2 个是 Java 类的门路。
table_env.register_java_function("func1", "java.user.defined.function.class.name")

上面是一个例子:

  • Python UDF 的应用:

能够调 TableEnvironment 上的 register_function 这个办法,有两个参数,一个参数是给 UDF 起的名字,第 2 个是 python_udf 的一个对象。

table_env.register_function("func1", python_udf)

上面是一个例子:

■ 3.Python UDF 的定义形式

PyFlink 外面也反对一些其余的形式去定义 UDF,咱们能够看一下,总共有 4 种形式:

  1. 能够继承 ScalaFunction 基类,并重写 eval 办法。
  2. 间接定义一个 Named Function,而后再用 UDF 的签名去申明 UDF 的输出类型和输入类型。
  3. 也能够用刚刚例子外面的 Lambda Function 的这种形式,来定义 Python UDF。
  4. 最初一种是 Callable Function 的形式。也是申明其输出和输入的类型。

■ 4. 依赖治理

写完 UDF 的时候,常常遇到一个问题,UDF 外面可能会有一些依赖,如何去解决这些依赖问题呢?PyFlink 提供了 4 种依赖的 API,如下所示。

  • 依赖文件

如果 UDF 外面依赖一个文件的话,能够用 add_python_file 加载依赖的文件的门路,指定完之后,作业提交的时候,就会把这个文件散发到集群,那么在近程执行的时候,你的 UDF 就能够去拜访这个文件。

table_env.add_python_file(file_path)
  • 依赖存档(打包)文件

可能会去依赖一个存档的文件,这个时候你能够用 add_python_archive 办法,传入两个参数。第 2 个参数是一个可选的参数。第 1 个参数示意对你存档文件的重命名。如果调用了 API,那么在 UDF 外面就能够去拜访存档文件外面的所有文件。

table_env.add_python_archive("py_env.zip", "myenv") 
# the files contained in the archive file can be accessed in UDF 
def my_udf(): 
    with open("myenv/py_env/data/data.txt") as f:
  • 依赖第三方我的项目

能够用 set_python_requirements 办法去指定你的第三方依赖。也是有两个参数,第 1 个参数是传一个文件,文件中写了所依赖的第三方我的项目,以及它对应的版本。第 2 个参数是一个可选的参数,如果集群是一个有网络的环境,那么第 2 个参数能够不填,当第 2 个参数不填的时候,作业提交开始初始化的时候,Python 就会去依据你的 requirements 文件外面配置的依赖,主动的去网络下载你的依赖,而后装置。如果集群是没有网络的,能够事后把这些依赖下载好,下载到 cached 的目录外面去。而后把目录也一起提交到集群,集群拿到这个目录会去装置这些依赖。

# commands executed in shell 
echo numpy==1.16.5 > requirements.txt 
pip download -d cached_dir -r requirements.txt --no-binary :all: 
# python code 
table_env.set_python_requirements("requirements.txt", "cached_dir")
  • 指定 Python Interpreter 门路

假如你的 Python UDF 运行的时候,会依赖某一个版本的 Python 解释器。那么这个时候能够去指定你所心愿 Python UDF 运行的一个解释器的门路。

table_env.get_config().set_python_executable("py_env.zip/py_env/bin/python")

2.3、Pandas UDF & User-defined Metrics (PyFlink 1.11)

咱们在 Pyflink 1.11 的版本外面提供了 Pandas UDF,还有用户自定义的 Metrics。当然 Pyflink 1.11 版本外面,不光是这两个性能,我这里次要是介绍一下这两个性能。Pyflink 1.11 版本也会行将在 2020 年的 6 月份进行公布。

接下来会从性能和性能两个角度来介绍一下 Pandas UDF。

■ 1.Pandas UDF – 性能

咱们先来看一下性能方面,如果你要编写一个 Pandas UDF,那么跟方才定义一般 UDF 的模式基本上是统一的。这里只须要去申明一个 udf_type,指定为 Pandas 就行了。

指定之后,UDF 运行起来的时候零碎传的 i 跟 j 就变成一个 pandas.Series 的数据结构。这个时候能够间接用 series 来进行操作。与此同时会有一个益处,就是咱们拿到的是一个 pandas 的数据结构,能够调用 pandas 相干的一些库函数,并且能够调用一些数值计算相干的库函数,这样能够极大的扩大性能。不须要再去实现一套逻辑。

■ 2.Pandas UDF – 性能

那么性能上 Pandas UDF 的益处,次要有两点。

  1. 缩小了调用的开销,因为刚刚说到了零碎传给 UDF 的是一个 pandas.series,它相当于是将多行的数据一次性的传给了 UDF。而一般 UDF 解决多行时,每行都须要调用一次 UDF。所以比照能够发现,Pandas UDF 能够显著缩小 UDF 的调用开销。
  2. 能够缩小 UDF 的序列化和反序列化开销。这里具体介绍一下,Pandas UDF 是如何缩小了序列化和反序列化的。

咱们能够看一下左边这个图,右边是 Java Operator,左边是 Python Operator。假如 Operator 收到了一个 X,而后 X 在这里会进行一个序列化,变成 arrow 的内存数据格式,这个时候用 X’来示意。那么这个时候 Java 这边会把 X’传给 Python,Python 就能够间接来拜访 arrow 数据结构,因为 pandas 底层的数据结构就是用 arrow 来示意的,所以这个时候不须要在 Python 进行反序列化,能够间接来操作 X’。而后在 X’加一之后,失去 Y’,Y’也是间接生成的 arrow 内存数据格式,这里也不须要反序列化。那么把 Y’传到 Java 时,就须要进行一个反序列化。

咱们能够发现,只须要在 Java 进行一个序列化和反序列化。Python 这边能够省去了序列化和反序列化开销。

而且这里须要提出的一点是,如果你的输出 X 也是一个 arrow 的内存数据格式,那么 Java 这边的序列化跟反序列化也是能够防止的。比方你的 source 是一个 Parquet Source,那么它输入的数据格式也是 arrow 数据格式,这个时候就能够防止掉 Java 的序列化和反序列化。所以,Pandas UDF 也是能够缩小序列化反序列化的开销。

■ 3.User-defined Metrics

咱们再来看一下用户自定义 Metrics。

  • Metric 注册

先来看一下 Metric 的注册,Metric 注册能够是在 metric_group 上调用对应的 Metric 办法来注册。

  • Metric Scope

metric_group 还能够调用他的 add_group 办法去定义你的 Metric 的一个域,能够对 Metric 进行分类。

  • Metric 类型

目前 PyFlink 外面提供的 Metric 类型有以下 4 种:

  • Counter

相似累加器。一开始须要在 open 办法外面进行 Counter 的注册,而后调用 match_group 上 Counter 办法,这里咱们给了一个 Metric 的名字叫 my_counter。定义完之后,就能够在 Eval 办法外面进行应用。而后 Counter 能够提供 Inc 办法,你能够调用 Inc 进行相应的减少。

  • Gauge

它是用来反映一个瞬时值。假如咱们须要在 Metric 上显示 length 值的变动状况。那么咱们须要用 Gauge 办法来注册,名字是 my_gauge。第 2 个参数这里须要留神它是一个 UDF,咱们须要返回要监控数值的值是什么,返回这个值。而后在 Eval 办法里或者其余 UDF 的调用里能够扭转这个值。框架底层就会一直去汇报这个值以后值是多少。

  • Meter

Meter 这种 Metric 是示意以后这一秒往前一个工夫区间内所有数值相加的一个均值。咱们看能够调用 Meter 办法来注册。第 2 个参数是一个默认的参数,默认是 60 秒,示意 60 秒内所有值的一个均值。这里须要留神的是,Meter 每一秒都会去汇报以后这一秒往前 60 秒工夫区间内,所有值的均值。能够用 Meter 的 mark_event 办法来汇报。

  • Distribution (sum/count/min/max/mean)

最初一种是 Distribution 的一个 Metric 类型,它对你的值能提供一些 sum/count/min/max/mean 等统计信息。能够调用 metric_group 上的 Distribution 这个办法。更新上能够调用 distribution.update。

3. PyFlink 的 Demo 演示

接下来对这些外围性能做一些 Demo 的演示跟解说。此处咱们提供了一个 playgrounds 的 git。次要是心愿帮忙大家更疾速地相熟 PyFlink 所有的性能及应用,并附上了相干代码示例。具体参考信息请见下方链接:

https://github.com/pyflink/pl…

4.PyFlink 社区搀扶打算

  • 为什么要发动 PyFlink 社区搀扶打算?

        用户逐步变多、有教训用户少

  • 社区指标:并肩作战,营造双赢
  • 如何参加 PyFlink 打算?

        https://survey.aliyun.com/app…

        初步审核符合条件后咱们会在收到问卷的 10 个工作日内与您分割。

  • 搀扶指标

        面向所有 PyFlink 社区企业用户

  • PyFlink 问题反对 & 共享

        如果你有一些相干的问题或者是其余的一些意见,能够发到社区的邮件列表外面去。

点击「下方链接」可回顾作者分享视频~

https://ververica.cn/develope…

正文完
 0