关于数据挖掘:一文教你快速上手PyFlink

47次阅读

共计 6400 个字符,预计需要花费 16 分钟才能阅读完成。

简介: 本文介绍了 PyFlink 我的项目的指标和倒退历程,以及 PyFlink 目前的外围性能,包含 Python Table API、Python UDF、向量化 Python UDF、Python UDF Metrics、PyFlink 依赖治理和 Python UDF 执行优化,同时也针对性能展现了相干 demo。

作者|付典

本文介绍了 PyFlink 我的项目的指标和倒退历程,以及 PyFlink 目前的外围性能,包含 Python Table API、Python UDF、向量化 Python UDF、Python UDF Metrics、PyFlink 依赖治理和 Python UDF 执行优化,同时也针对性能展现了相干 demo。本文次要分为 4 个局部:

  1. PyFlink 介绍
  2. PyFlink 相干性能
  3. PyFlink 性能演示
  4. PyFlink 下一步布局

PyFlink 介绍

PyFlink 是 Flink 的一个子模块,也是整个 Flink 我的项目的一部分,次要目标是提供 Flink 的 Python 语言反对。因为在机器学习和数据分析等畛域,Python 语言十分重要,甚至是最次要的开发语言。所以,为了满足更多用户需要,拓宽 Flink 的生态,咱们启动了 PyFlink 我的项目。

PyFlink 我的项目的指标次要有两点,第一点是将 Flink 的计算能力输入给 Python 用户,也就是咱们会在 Flink 中提供一系列的 Python API,不便对 Python 语言比拟相熟的用户开发 Flink 作业。

第二点,就是将 Python 生态基于 Flink 进行分布式化。尽管咱们会在 Flink 中提供一系列的 Python API 来给 Python 用户来应用,但这对用户来说是有学习老本的,因为用户要学习怎么应用 Flink 的 Python API,理解每一个 API 的用处。所以咱们心愿用户能在 API 层应用他们比拟相熟的 Python 库的 API,然而底层的计算引擎应用 Flink,从而升高他们的学习老本。这是咱们将来要做的事件,目前处于启动阶段。

下图是 PyFlink 我的项目的倒退状况,目前公布了 3 个版本,反对的内容也越来越丰盛。

PyFlink 相干性能介绍

咱们次要介绍 PyFlink 以下性能,Python Table API、Python UDF、向量化 Python UDF、Python UDF Metrics、PyFlink 依赖治理和 Python UDF 执行优化。

Python Table API

Python Table API 的目标是为了让用户能够应用 Python 语言来开发 Flink 作业。Flink 外面有三种类型的 API,Process、Function 和 Table API,前两者是较为底层的 API,基于 Process 和 Function 开发的作业,其逻辑会严格依照用户定义的行为进行执行,而 Table API 是较为高层的 API,基于 Table API 开发的作业,其逻辑会通过一系列的优化之后进行执行。

Python Table API,顾名思义就是提供 Table API 的 Python 语言反对。

以下是 Python Table API 开发的一个 Flink 作业,作业逻辑是读取文件,计算 word count,而后再把计算结果写到文件中去。这个例子尽管简略,但包含了开发一个 Python Table API 作业的所有根本流程。

首先咱们须要定义作业的执行模式,比如说是批模式还是流模式,作业的并发度是多少?作业的配置是什么。接下来咱们须要定义 source 表和 sink 表,source 表定义了作业的数据源来源于哪里,数据的格局是什么;sink 表定义了作业的执行后果写到哪里去,数据格式是什么。最初咱们须要定义作业的执行逻辑,在这个例子中是计算写过来的 count。

以下是 Python Table API 的局部截图,能够看到它的数量和性能都比拟齐全。

Python UDF

Python Table API 是一种关系型的 API,其性能能够类比成 SQL,而 SQL 里自定义函数是十分重要的性能,能够极大地扩大 SQL 的应用范畴。Python UDF 的次要目标就是容许用户应用 Python 语言来开发自定义函数,从而扩大 Python Table API 的应用场景。同时,Python UDF 除了能够用在 Python Table API 作业中之外,还能够用在 Java Table API 作业以及 SQL 作业中。

在 PyFlink 中咱们反对多种形式来定义 Python UDF。用户能够定义一个 Python 类,继承 ScalarFunction,也能够定义一个一般的 Python 函数或者 Lambda 函数,实现自定义函数的逻辑。除此之外,咱们还反对通过 Callable Function 和 Partial Function 定义 Python UDF。用户能够依据本人的须要抉择最适宜本人的形式。

PyFlink 外面提供了多种 Python UDF 的应用形式,包含 Python Table API、Java table API 和 SQL,咱们一一介绍。

在 Python Table API 中应用 Python UDF,在定义完 Python UDF 之后,用户首先须要注册 Python UDF,能够调用 table environment register 来注册,而后命名,而后就能够在作业中通过这个名字来应用 Python UDF 了。

在 Java Table API 中它的应用形式也比拟类似,然而注册形式不一样,Java Table API 作业中须要通过 DDL 语句来进行注册。

除此之外,用户也能够在 SQL 的作业中应用 Python UDF。与后面两种形式相似,用户首先须要注册 Python UDF,能够在 SQL 脚本中通过 DDL 语句来注册,也能够在 SQL Client 的环境配置文件外面注册。

Python UDF 架构

简略介绍下 Python UDF 的执行架构。Flink 是用 Java 语言编写的,运行在 Java 虚拟机中,而 Python UDF 运行在 Python 虚拟机中,所以 Java 过程和 Python 过程须要进行数据通信。除此之外,两者间还须要传输 state、log、metrics,它们的传输协定须要反对 4 种类型。

向量化 Python UDF

向量化 Python UDF 的次要目标是使 Python 用户能够利用 Pandas 或者 Numpy 等数据分析畛域罕用的 Python 库,开发高性能的 Python UDF。

向量化 Python UDF 是绝对于一般 Python UDF 而言的,咱们能够在下图看到两者的区别。

下图显示了向量化 Python UDF 的执行过程。首先在 Java 端,Java 在攒完多条数据之后会转换成 Arrow 格局,而后发送给 Python 过程。Python 过程在收到数据之后,将其转换成 Pandas 的数据结构,而后调用用户自定义的向量化 Python UDF。同时向量化 Python UDF 的执行后果会再转化成 Arrow 格局的数据,再发送给 Java 过程。

在应用形式上,向量化 Python UDF 与一般 Python UDF 是相似的,只有以下几个中央稍有不同。首先向量化 Python UDF 的申明形式须要加一个 UDF type,申明这是一个向量化 Python UDF,同时 UDF 的输入输出类型是 Pandas Series。

Python UDF Metrics

后面咱们提到 Python UDF 有多种定义形式,然而如果须要在 Python UDF 中应用 Metrics,那么 Python UDF 必须继承 ScalarFunction 来进行定义。在 Python UDF 的 open 办法外面提供了一个 Function Context 参数,用户能够通过 Function Context 参数来注册 Metrics,而后就能够通过注册的 Metrics 对象来汇报了。

PyFlink 依赖治理

从类型来说,PyFlink 依赖次要包含以下几种类型,一般的 PyFlink 文件、存档文件,第三方的库、PyFlink 解释器,或者 Java 的 Jar 包等等。从解决方案来看,针对每种类型的依赖,PyFlink 提供了两种解决方案,一种是 API 的解决方案,一种是命令行选项的形式,大家抉择其一即可。

Python UDF 执行优化

Python UDF 的执行优化次要包含两个方面,执行打算优化和运行时优化。它与 SQL 十分像,一个蕴含 Python UDF 的作业,首先会通过事后定义的规定,生成一个最优的执行打算。在执行打算曾经确定的状况下,在理论执行的时候,又能够使用一些其余的优化伎俩来达到尽可能高的执行效率。

Python UDF 执行打算优化

执行打算的优化次要有以下几个优化思路。一个是不同类型的 UDF 的拆分,因为在一个节点中可能同时蕴含多种类型的 UDF,而不同的类型的 UDF 是不能放在一块执行的;第二个方面是 Filter 下推,其次要目标是尽可能升高含有 Python UDF 节点的输出数据量,从而晋升整个作业的执行性能;第三个优化思路是 Python UDF Chaining,Java 过程与 Python 过程之间的通信开销以及序列化反序列化开销比拟大,而 Python UDF Chaining 能够尽量减少 Java 过程和 Python 过程之间的通信开销。

不同类型 UDF 的拆分

如果有这样一个作业,它蕴含了两个 UDF, 其中 add 是 Python UDF, subtract 是向量化 Python UDF。默认状况下,这个作业的执行打算会有一个 project 节点,这两个 UDF 同时位于这一 project 的节点外面。这个执行打算的次要问题是,一般 Python UDF 每次解决一条数据,而向量化 Python UDF,每次解决多条数据,所以这样的一个执行打算是没有方法执行的。

然而通过拆分,咱们能够把这一个 project 的节点拆分成了两个 project 的节点,其中第一个 project 的节点只蕴含一般 Python UDF,而第二个节点只蕴含向量化 Python UDF。不同类型的 Python UDF 拆分到不同的节点之后,每一个节点都只蕴含了一种类型的 UDF, 所以算子就能够依据它所蕴含的 UDF 的类型抉择最合适的执行形式。

Filter 下推到 Python UDF 之前

Filter 下推的次要目标是将过滤算子下推到 Python UDF 节点之前,尽量减少 Python UDF 节点的数据量。

如果咱们有这样一个作业,作业原始执行打算外面包含了两个 Project 的节点,一个是 add、subtract,同时还包含一个 Filter 节点。这个执行打算是能够运行的,但须要更优化。能够看到,因为 Python 的节点位于 Filter 节点之前,所以在 Filter 节点之前 Python UDF 曾经计算完了,然而如果把 Filter 过滤下,推到 Python UDF 之前,那么就能够大大降低 Python UDF 节点的输出数据量。

Python UDF Chaining

如果咱们有这样一个作业,外面蕴含两种类型的 UDF,一个是 add,一个是 subtract,它们都是一般的 Python UDF。在一个执行打算外面蕴含两个 project 的节点,其中第一个 project 的节点先算 subtract,而后再传输给第二个 project 节点进行执行。

它的次要问题是,因为 subtract 和 add 位于两个不同的节点,其计算结果须要从 Python 发送回 Java,而后再由 Java 过程发送给第二个节点的 Python 进行执行。相当于数据在 Java 过程和 Python 过程之间转了一圈,所以它带来了齐全没有必要的通信开销和序列化反序列化开销。因而,咱们能够将执行打算优化成右图,就是将 add 节点和 subtract 节点放在一个节点中运行,subtract 节点的后果计算出来之后间接去调用 add 节点。

Python UDF 运行时优化

目前进步 Python UDF 经营时的执行效率有三种:一是 Cython 优化,用它来进步 Python 代码的执行效率;二是自定义 Java 过程和 Python 过程之间的序列化器和反序列化器,进步序列化和反序列化效率;三是提供向量化 Python UDF 性能。

PyFlink 相干性能演示

首先大家关上这个页面,外面提供了 PyFlink 的一些 demo,这些 demo 是运行在 docker 外面的,所以大家如果要运行这些 demo 就须要在本机装置 docker 环境。

随后,咱们能够运行命令,命令会启动一个 PyFlink 的集群,前面咱们运行的 PyFlink 的例子都会提交到集群去执行。

第一个例子是 word count,咱们首先在外面定义了环境、source、sink 等,咱们能够运行一下这个作业。

这是作业的执行后果,能够看到 Flink 这个单词呈现了两次,PyFlink 这个单词呈现了一次。

接下来再运行一个 Python UDF 的例子。这个例子和后面有一些相似,首先咱们定义它应用 PyFlink,运行在批这种模式下,同时作业的并发度是 1。不一样的中央是咱们在作业里定义了一个 UDF,它的输出包含两个列,都是 Bigint 类型,而且它输入类型也是对应的。这个 UDF 的逻辑是把这两个列的相加作为一个后果输入。

咱们执行一下作业,执行后果是 3。

接下来咱们再运行一个带有依赖的 Python UDF。后面作业的 UDF 是不蕴含任何依赖的,间接就把两个输出列相加起来。而在这个例子里,UDF 援用了一个第三方的依赖,咱们能够通过 API set python requirement 来执行。

接下来咱们运行作业,它的执行后果和后面是一样的,因为这两个作业的逻辑是相似的。

接下来咱们再看一个向量化 Python UDF 的例子。在 UDF 定义的时候,咱们加了一个 UDF 的 type 字段,阐明说咱们是一个向量化的 Python UDF,其余的逻辑和一般 Python UDF 的逻辑相似。最初它的执行后果也是 3,因为它的逻辑和后面是一样的,计算两页的之和。

咱们再来看一个例子,在 Java 的 Table 作业外面应用 Python。在这个作业外面咱们又会用到一个 Python UDF,它通过 DDL 语句进行注册,而后在 execute SQL 语句外面进行应用。

接下来咱们再看在纯 SQL 作业中应用 Python UDF 的例子。在资源文件外面咱们申明了一个 UDF,名字叫 add1,它的类型是 Python,同时咱们也能看到它的 UDF 地位。

接下来咱们运行它,执行后果是 234。

PyFlink 下一步布局

目前 PyFlink 只反对了 Python Table API,咱们打算在下一个版本中反对 DataStream API,同时也会反对 Python UDAF 以及 Pandas UDAF,另外,在执行层也会继续优化 PyFlink 的执行效率。

这是一些资源的链接,包含 PyFlink 的文档地址。

  • Python Table API 文档

https://ci.apache.org/projects/flink/flink-docs-master/api/python/

  • PyFlink 文档

https://ci.apache.org/projects/flink/flink-docs-master/dev/table/python/

  • PyFlink playground

https://github.com/pyflink/playgrounds/tree/1.11

好的,咱们明天的分享就到这里了,欢送大家持续关注咱们的课程。

流动举荐:

仅需 99 元即可体验阿里云基于 Apache Flink 构建的企业级产品 - 实时计算 Flink 版!点击下方链接理解流动详情:https://www.aliyun.com/product/bigdata/sc?utm\_content=g\_1000250506

版权申明: 本文内容由阿里云实名注册用户自发奉献,版权归原作者所有,阿里云开发者社区不领有其著作权,亦不承当相应法律责任。具体规定请查看《阿里云开发者社区用户服务协定》和《阿里云开发者社区知识产权爱护指引》。如果您发现本社区中有涉嫌剽窃的内容,填写侵权投诉表单进行举报,一经查实,本社区将立即删除涉嫌侵权内容。

正文完
 0