简介:本文介绍了PyFlink我的项目的指标和倒退历程,以及PyFlink目前的外围性能,包含Python Table API、Python UDF、向量化Python UDF、Python UDF Metrics、PyFlink依赖治理和Python UDF执行优化,同时也针对性能展现了相干demo。

作者|付典

本文介绍了PyFlink我的项目的指标和倒退历程,以及PyFlink目前的外围性能,包含Python Table API、Python UDF、向量化Python UDF、Python UDF Metrics、PyFlink依赖治理和Python UDF执行优化,同时也针对性能展现了相干demo。本文次要分为4个局部:

  1. PyFlink介绍
  2. PyFlink相干性能
  3. PyFlink性能演示
  4. PyFlink下一步布局

PyFlink介绍

PyFlink是Flink的一个子模块,也是整个Flink我的项目的一部分,次要目标是提供Flink的Python语言反对。因为在机器学习和数据分析等畛域,Python语言十分重要,甚至是最次要的开发语言。所以,为了满足更多用户需要,拓宽Flink的生态,咱们启动了PyFlink我的项目。

PyFlink我的项目的指标次要有两点,第一点是将Flink的计算能力输入给Python用户,也就是咱们会在Flink中提供一系列的Python API,不便对Python语言比拟相熟的用户开发Flink作业。

第二点,就是将Python生态基于Flink进行分布式化。尽管咱们会在Flink中提供一系列的Python API来给Python用户来应用,但这对用户来说是有学习老本的,因为用户要学习怎么应用Flink的Python API,理解每一个API的用处。所以咱们心愿用户能在API层应用他们比拟相熟的 Python库的API,然而底层的计算引擎应用Flink,从而升高他们的学习老本。这是咱们将来要做的事件,目前处于启动阶段。

下图是PyFlink我的项目的倒退状况,目前公布了3个版本,反对的内容也越来越丰盛。

PyFlink相干性能介绍

咱们次要介绍PyFlink以下性能,Python Table API、Python UDF、向量化Python UDF、Python UDF Metrics、PyFlink依赖治理和Python UDF执行优化。

Python Table API

Python Table API的目标是为了让用户能够应用Python语言来开发Flink作业。Flink外面有三种类型的API,Process、Function和Table API,前两者是较为底层的API,基于Process和Function开发的作业,其逻辑会严格依照用户定义的行为进行执行,而Table API是较为高层的API,基于Table API开发的作业,其逻辑会通过一系列的优化之后进行执行。

Python Table API,顾名思义就是提供 Table API的Python语言反对。

以下是Python Table API开发的一个Flink作业,作业逻辑是读取文件,计算word count,而后再把计算结果写到文件中去。这个例子尽管简略,但包含了开发一个Python Table API作业的所有根本流程。

首先咱们须要定义作业的执行模式,比如说是批模式还是流模式,作业的并发度是多少?作业的配置是什么。接下来咱们须要定义source表和sink表,source表定义了作业的数据源来源于哪里,数据的格局是什么;sink表定义了作业的执行后果写到哪里去,数据格式是什么。最初咱们须要定义作业的执行逻辑,在这个例子中是计算写过来的count。

以下是Python Table API的局部截图,能够看到它的数量和性能都比拟齐全。

Python UDF

Python Table API是一种关系型的API,其性能能够类比成SQL,而SQL里自定义函数是十分重要的性能,能够极大地扩大SQL的应用范畴。Python UDF的次要目标就是容许用户应用Python语言来开发自定义函数,从而扩大Python Table API的应用场景。同时,Python UDF除了能够用在Python Table API作业中之外,还能够用在Java Table API作业以及SQL作业中。

在PyFlink中咱们反对多种形式来定义Python UDF。用户能够定义一个Python类,继承ScalarFunction,也能够定义一个一般的Python函数或者Lambda函数,实现自定义函数的逻辑。除此之外,咱们还反对通过Callable Function和Partial Function定义Python UDF。用户能够依据本人的须要抉择最适宜本人的形式。

PyFlink外面提供了多种Python UDF的应用形式,包含Python Table API、Java table API和SQL,咱们一一介绍。

在Python Table API中应用Python UDF,在定义完Python UDF之后,用户首先须要注册Python UDF,能够调用table environment register来注册,而后命名,而后就能够在作业中通过这个名字来应用 Python UDF了。

在Java Table API中它的应用形式也比拟类似,然而注册形式不一样,Java Table API作业中须要通过DDL语句来进行注册。

除此之外,用户也能够在SQL的作业中应用Python UDF。与后面两种形式相似,用户首先须要注册Python UDF,能够在SQL脚本中通过DDL语句来注册,也能够在SQL Client的环境配置文件外面注册。

Python UDF架构

简略介绍下Python UDF的执行架构。Flink是用Java语言编写的,运行在Java虚拟机中,而Python UDF运行在 Python虚拟机中,所以Java过程和Python过程须要进行数据通信。 除此之外,两者间还须要传输state、log、metrics,它们的传输协定须要反对4种类型。

向量化Python UDF

向量化Python UDF的次要目标是使 Python用户能够利用Pandas或者Numpy等数据分析畛域罕用的Python库,开发高性能的Python UDF。

向量化Python UDF是绝对于一般Python UDF而言的,咱们能够在下图看到两者的区别。

下图显示了向量化Python UDF的执行过程。首先在Java端,Java在攒完多条数据之后会转换成Arrow格局,而后发送给Python过程。Python过程在收到数据之后,将其转换成Pandas的数据结构,而后调用用户自定义的向量化Python UDF。同时向量化Python UDF的执行后果会再转化成Arrow格局的数据,再发送给 Java过程。

在应用形式上,向量化Python UDF与一般Python UDF是相似的,只有以下几个中央稍有不同。首先向量化Python UDF的申明形式须要加一个UDF type,申明这是一个向量化Python UDF,同时UDF的输入输出类型是Pandas Series。

Python UDF Metrics

后面咱们提到 Python UDF有多种定义形式,然而如果须要在Python UDF中应用Metrics,那么Python UDF必须继承ScalarFunction来进行定义。在Python UDF的 open办法外面提供了一个Function Context参数,用户能够通过Function Context参数来注册Metrics,而后就能够通过注册的 Metrics对象来汇报了。

PyFlink依赖治理

从类型来说,PyFlink依赖次要包含以下几种类型,一般的PyFlink文件、存档文件,第三方的库、PyFlink解释器,或者Java的Jar包等等。从解决方案来看,针对每种类型的依赖,PyFlink提供了两种解决方案,一种是API的解决方案,一种是命令行选项的形式,大家抉择其一即可。

Python UDF执行优化

Python UDF的执行优化次要包含两个方面,执行打算优化和运行时优化。它与SQL十分像,一个蕴含Python UDF的作业,首先会通过事后定义的规定,生成一个最优的执行打算。在执行打算曾经确定的状况下,在理论执行的时候,又能够使用一些其余的优化伎俩来达到尽可能高的执行效率。

Python UDF执行打算优化

执行打算的优化次要有以下几个优化思路。一个是不同类型的 UDF的拆分,因为在一个节点中可能同时蕴含多种类型的UDF,而不同的类型的UDF是不能放在一块执行的;第二个方面是Filter下推,其次要目标是尽可能升高含有Python UDF节点的输出数据量,从而晋升整个作业的执行性能;第三个优化思路是Python UDF Chaining,Java过程与Python过程之间的通信开销以及序列化反序列化开销比拟大,而Python UDF Chaining能够尽量减少Java过程和Python过程之间的通信开销。

不同类型UDF的拆分

如果有这样一个作业,它蕴含了两个UDF,其中add是Python UDF, subtract是向量化Python UDF。默认状况下,这个作业的执行打算会有一个project节点,这两个 UDF同时位于这一project的节点外面。这个执行打算的次要问题是,一般Python UDF每次解决一条数据,而向量化Python UDF,每次解决多条数据,所以这样的一个执行打算是没有方法执行的。

然而通过拆分,咱们能够把这一个project的节点拆分成了两个project的节点,其中第一个project的节点只蕴含一般Python UDF,而第二个节点只蕴含向量化Python UDF。不同类型的Python UDF拆分到不同的节点之后,每一个节点都只蕴含了一种类型的UDF,所以算子就能够依据它所蕴含的UDF的类型抉择最合适的执行形式。

Filter下推到Python UDF之前

Filter下推的次要目标是将过滤算子下推到Python UDF节点之前,尽量减少Python UDF节点的数据量。

如果咱们有这样一个作业,作业原始执行打算外面包含了两个Project的节点,一个是add、 subtract,同时还包含一个Filter节点。这个执行打算是能够运行的,但须要更优化。能够看到,因为Python的节点位于Filter节点之前,所以在Filter节点之前Python UDF曾经计算完了,然而如果把Filter过滤下,推到Python UDF之前,那么就能够大大降低Python UDF节点的输出数据量。

Python UDF Chaining

如果咱们有这样一个作业,外面蕴含两种类型的UDF,一个是add,一个是subtract,它们都是一般的Python UDF。在一个执行打算外面蕴含两个project的节点,其中第一个project的节点先算subtract,而后再传输给第二个project节点进行执行。

它的次要问题是,因为subtract和add位于两个不同的节点,其计算结果须要从Python发送回Java,而后再由Java过程发送给第二个节点的Python进行执行。相当于数据在Java过程和Python过程之间转了一圈,所以它带来了齐全没有必要的通信开销和序列化反序列化开销。因而,咱们能够将执行打算优化成右图,就是将add节点和subtract节点放在一个节点中运行,subtract节点的后果计算出来之后间接去调用add节点。

Python UDF运行时优化

目前进步Python UDF经营时的执行效率有三种:一是Cython优化,用它来进步Python代码的执行效率;二是自定义Java过程和Python过程之间的序列化器和反序列化器,进步序列化和反序列化效率;三是提供向量化Python UDF性能。

PyFlink相干性能演示

首先大家关上这个页面,外面提供了PyFlink的一些demo,这些demo是运行在docker外面的,所以大家如果要运行这些demo就须要在本机装置docker环境。

随后,咱们能够运行命令,命令会启动一个PyFlink的集群,前面咱们运行的PyFlink的例子都会提交到集群去执行。

第一个例子是word count,咱们首先在外面定义了环境、source、sink等,咱们能够运行一下这个作业。

这是作业的执行后果,能够看到Flink这个单词呈现了两次,PyFlink这个单词呈现了一次。

接下来再运行一个Python UDF的例子。这个例子和后面有一些相似,首先咱们定义它应用PyFlink,运行在批这种模式下,同时作业的并发度是1。不一样的中央是咱们在作业里定义了一个UDF,它的输出包含两个列,都是Bigint类型,而且它输入类型也是对应的。这个UDF的逻辑是把这两个列的相加作为一个后果输入。

咱们执行一下作业,执行后果是3。

接下来咱们再运行一个带有依赖的Python UDF。后面作业的UDF是不蕴含任何依赖的,间接就把两个输出列相加起来。而在这个例子里,UDF援用了一个第三方的依赖,咱们能够通过API set python requirement来执行。

接下来咱们运行作业,它的执行后果和后面是一样的,因为这两个作业的逻辑是相似的。

接下来咱们再看一个向量化Python UDF的例子。在 UDF定义的时候,咱们加了一个UDF的type字段,阐明说咱们是一个向量化的Python UDF,其余的逻辑和一般Python UDF的逻辑相似。最初它的执行后果也是3,因为它的逻辑和后面是一样的,计算两页的之和。

咱们再来看一个例子,在Java的Table作业外面应用Python。在这个作业外面咱们又会用到一个Python UDF,它通过DDL语句进行注册,而后在execute SQL语句外面进行应用。

接下来咱们再看在纯SQL作业中应用Python UDF的例子。在资源文件外面咱们申明了一个UDF,名字叫add1,它的类型是Python,同时咱们也能看到它的UDF地位。

接下来咱们运行它,执行后果是234。

PyFlink下一步布局

目前PyFlink只反对了Python Table API,咱们打算在下一个版本中反对DataStream API,同时也会反对Python UDAF以及Pandas UDAF,另外,在执行层也会继续优化PyFlink的执行效率。

这是一些资源的链接,包含PyFlink的文档地址。

  • Python Table API文档

https://ci.apache.org/projects/flink/flink-docs-master/api/python/

  • PyFlink文档

https://ci.apache.org/projects/flink/flink-docs-master/dev/table/python/

  • PyFlink playground

https://github.com/pyflink/playgrounds/tree/1.11

好的,咱们明天的分享就到这里了,欢送大家持续关注咱们的课程。

流动举荐:

仅需99元即可体验阿里云基于 Apache Flink 构建的企业级产品-实时计算 Flink 版!点击下方链接理解流动详情:https://www.aliyun.com/product/bigdata/sc?utm\_content=g\_1000250506

版权申明:本文内容由阿里云实名注册用户自发奉献,版权归原作者所有,阿里云开发者社区不领有其著作权,亦不承当相应法律责任。具体规定请查看《阿里云开发者社区用户服务协定》和《阿里云开发者社区知识产权爱护指引》。如果您发现本社区中有涉嫌剽窃的内容,填写侵权投诉表单进行举报,一经查实,本社区将立即删除涉嫌侵权内容。