摘要:本文整顿自阿里巴巴高级开发工程师黄兴勃 (断尘) 在 Flink Forward Aisa 2021 核心技术专场的演讲。次要内容包含:
- PyFlink 最新性能
- PyFlink Runtime
- 基于 FFI 的 PEMJA
- PyFlink Runtime 2.0
- Future Work
点击查看直播回放 & 演讲 PDF
原 Flink Forward Asia 2021 演讲中的 JCP 我的项目已改名为 PEMJA,并且于 2022 年 1 月 14 日正式开源,开源地址为:
https://github.com/alibaba/pemja
Ps:JCP 已在本文替换为 PEMJA。
一、PyFlink 新性能
PyFlink 1.14 新增了很多性能,次要分为性能、易用性和性能三个方面。
性能方面,新增了 State TTL config。在 1.14 以前曾经实现了 Python Datastream API 以及一些操作 State 上的性能,然而并没有提供 State TTL config 的配置,这也意味着用户写 Python Datastream API 的自定义函数时无奈主动把 State 的值清掉,而是须要手动的操作,对用户不够敌对。
易用性方面,次要新增了以下几项性能:
- 在依赖治理局部反对了 tar.gz 格局。
- Profile 性能。用户写 PyFlink 会用到一些 Python 的自定义函数,但并不分明这部分函数的性能瓶颈在哪里。而有了 profile 性能之后,Python 函数呈现性能瓶颈时,便能够通过 profile 剖析它的瓶颈具体是由起因什么引起,从而能够针对这部分进行一些优化。
- Print 性能。在 1.14 以前,打印自定义的 log 信息必须应用 Python 自定义的 logging 模块。但对于 Python 用户来说,print 是他们比拟习惯应用的一种输入日志信息的形式。所以在 1.14 增加上了这部分性能。
- Local Debug 模式。在 1.14 以前,用户如果应用 Python 自定义的函数在本地开发 PyFlink 作业,必须应用 remote debug 形式调试自定义逻辑,但它应用起来绝对比拟繁琐,而且应用门槛较高。在 1.14 扭转了这种模式,如果在本地编写一个 PyFlink 作业应用了 Python 自定义函数,能够主动切到 local debug 模式,能够在 ide 外面间接 debug 自定义 Python 函数。
性能方面次要新增了以下性能:
- Operator Fusion。这个性能次要针对在 Python Datastream API 的作业中做间断几个算子操作的场景。比方两次 .map 操作,在 1.14 以前,这两个 .map 会别离运行在两个 Python worker 中,而实现了 Operator Fusion 后,它们会被 merge 并运行在同一个 operator 中,而后由 Python worker 执行总的后果,达到了很好的性能优化。
- State 序列化 / 反序列化优化。在 1.14 以前,State 序列化 / 反序列化优化是应用 Python 内置的序列化器 pickle,它能序列化各种 Python 自定义的数据结构,但须要把 State type 的信息序列化到数据结构中,这会导致序列化的构造领会更大。1.14 中对其进行了优化,应用了自定义的序列化器,一个 type 对应一个序列化器来做优化,使得序列化信息更小。
- Finish Bundle 优化。在 1.14 以前 Finish Bundle 是同步的操作,现在把它改成了异步的操作,进步了它的性能,而且能解决一些 Checkpoint 无奈实现的场景。
二、PyFlink Runtime
上图是 PyFlink 现有的框架图。
图左侧的最上方的 Python Table API & SQL 和 Datastream API 是提供给用户的 Python API。用户通过这两个 Python API 编写 PyFlink 作业,再通过一个 py4j 的三方库把 Python API 转换成 Java API,即可对应到 Flink Java API 来形容这个作业。
针对 Table 和 SQL 的作业有个额定的 optimizer,它有两种 rule,一种是常见的 common rules,另一种是 Python rules。这里为什么会有 Python rules?家喻户晓,common rules 针对各种 Table 和 SQL 现有的作业都是无效的,而 Python rules 做的优化是针对 PyFlink 作业中应用了自定义的 Python 函数的场景,可能把对应的 operator 抽取进去。
形容完了作业之后,它会被翻译成一个 jobgraph,外面有对应的 Python operators。Python operators 形容的 jobgraph 会提交到 TM (Runtime) 下来运行,Runtime 中也有个 Python operators。
图右侧是 Python operators 的各种组件,形容了 PyFlink Runtime 最外围的局部。次要分为两个局部:Java operator 和 Python worker。
Java operator 中它有很多个组件,包含 data service 和 State service,以及针对 checkpoint、watermark 和 State request 的一些解决。因为自定义 Python 函数无奈间接运行在 Flink 现有的架构之上,Flink 现有的架构是基于 JVM 的,然而编写 Python 函数须要一个 Python Runtime,所以用 operator worker 来解决这个问题。
解决方案如下:发动一个 Python 过程运行 Python 自定义的函数,同时应用 Java operator 解决上游来的数据,再通过非凡解决之后发送给对应的 Python worker。这里应用的是过程间通信的计划,也就是图中的 data service。State service 针对 Python Datastream API 对 State 的操作,通过在 Python 里操作 State,数据会从 Python worker 返回到 Java operator,Java operator 再通过拜访 State backend 拿到对应的 State 数据,并回传给 Python worker,最初用户就能够操作 State 的后果了。
上图是 PyFlink Runtime Workflow。外面的角色别离是 Python operator、Python runner、bundle processor、coder、Python operation,这几个不同的角色运行在不同的中央。其中 Python operator 和 Python runner 是运行在 Java JVM 里,负责对接上游和上游的 Java operator,而 bundle processor、coder 以及 Python operation 运行在 PVM 里,bundle processor 利用了现有的 Apache Bean 框架,可能接管来自于 Java Python 的数据,它们之间应用了过程间通信。coder 是在 Python 端的一个自定义的序列化器,Java 端发送了一条数据,通过 Python operator 发送给 Python runner,由 Python runner 进行序列化后,再通过过程间的通信发送给 bundle processor。bundle processor 再把序列化后的二进制数组通过 coder 将它反序列化并失去一个 Python 对象。最初通过 Python operation 把反序列化之后的 Python 参数作为一个函数体的入参,而后调用自定义的 Python 函数,失去自定义的后果。
上述流程的瓶颈次要存在以下几个方面:首先是计算端调用用户自定义函数以及在调用之前,存在框架层 Python 写的开销;其次是自定义序列化局部,在 Java 端和 Python 端都须要序列化和反序列化数据;第三局部是过程间的通信。
针对上述瓶颈,进行了一些列优化:
- 计算方面,利用 codegen 将现有的 Python 调函数的变量全都改为常量,函数的执行效率会更高;另外,将现有的 Python operation 的实现全都改为 cython,相当于将 Python 转化为 .c 的实现形式,性能失去了大幅晋升;
- 序列化方面,提供了自定义序列化器,全都是纯 c 的实现,比 Python 更高效。
- 通信方面,目前暂未实现优化。
- 序列化和通信的问题,实质上就是 Java 和 Python 相互调用的问题,也就是如何优化 PyFlink 的 Runtime 架构的问题。
三、基于 FFI 的 PEMJA
Java 和 Python 相互调用曾经是一个比拟通用的问题,目前也曾经有很多种实现计划。
第一种是过程间相互调用的计划,即网络通信的计划,包含以下几种:
- socket 计划,它所有的通信协定都是通过本人实现,能够很灵便,然而比拟繁琐;
- py4j 计划,即 PyFlink 和 PySpark 在客户端编写作业时都应用 py4j;
- Alink 计划,它是在 Runtime 运行时应用 py4j,也有自定义的 Python 函数;grpc 计划,它利用现有的 grp service,不须要自定义的协定,有自定义的 service 和 message;
- 此外,共享内存的计划也是另一种过程间通信的计划,比方 Tensorflow on Flink,它是通过共享内存的形式实现的。还有 PyArrow Plasma,也是一种对象式的共享内存存储。
上述计划都是针对过程间通信,那么是否让 Python 和 Java 运行在同一个过程里,从而齐全打消过程间通信带来的困扰?
的确有一些现有的库在这方面做了尝试,第一种计划是将 Python 转成 Java。比方 p2j 是把 Python 的 source code 转成 Java 的 source code,voc 是把 Python 代码间接转成 Java 的 bytecode,这种计划的实质就是将 Python 转成一套能够间接运行在 JVM 之上的代码。但这套计划也存在不小的缺点,因为 Python 是在一直地倒退,它有各种语法,而将 Python 语法映射到 Java 中对应的对象是很艰难的,它们毕竟是不同的语言。
第二种计划是基于 Java 实现的 Python 解释器。首先是 Jython 计划,Python 其实是用 c 语言写的一套 Python 解释器,c 写的 Python 解释器能够运行在 c 之上,那么 Java 实现的 Python 解释器也就能够间接运行在 JVM 之上。另外一种计划是 Graalvm,它提供了一种 truffle framework 的形式,能够反对各种编程语言应用独特的构造,这种构造能运行在 JVM 之上,也就能够让各种语言运行在同一个过程里。
上述计划实现的前提是可能辨认 Python code,也就意味着要能兼容现有的各种 Python code,然而目前来看,兼容是一个难以解决的问题,因而也就阻止了这套 Python 转成 Java 计划持续推广的可能性。
第三种是基于 FFI 的一套计划。
FFI 的实质就是 host language 如何调用一个 guest language,即 Java 与 Python 之间的相互调用,对应的具体实现计划有很多种。
Java 提供了 JNI (Java native interface),让 Java 用户可能通过 JNI 的接口调用 c 实现的一些 lib,反过来也同样实用。有了这套接口之后,JVM 的厂商就会依据这套接口去实现 JNI,从而实现 Java 与 c 之间的相互调用。
Python/C API 也是相似的,Python 是一套 c 实现的解释器,因而能很好地反对 Python 代码调用 c 的三方库,反之也同样实用。
Cython 提供了一个工具,可能将 source code 转换成另一种语言能辨认的代码。比方将 Python 代码转换成一套十分高效的 c 语言代码,再嵌入到 cPython 解释器中即可间接运行,十分高效。
Ctypes 是通过将 c 的 library 封装起来,使得 Python 能高效地调用 c 的 library。
上述提到基于 FFI 的计划的外围就是 c。有了 c 这个桥梁之后,一个 Java 写成的代码,通过 JNI 接口就能调用到 c,而后由 c 去调用 cPython API 的接口,最终实现 Java 和 Python 运行在同一个线程里,这就是 PEMJA 的整体思路。解决了过程间通信的问题,以及因为它自身是应用的是本人提供的 Python/C API,也就不存在兼容性的问题,克服了 Java 实现解释器的缺点。
上图展现了基于这套思维的几种实现,但这几种实现都或多或少存在一些问题。
JPype 解决的问题是 Python 调用 Java 的问题,不反对 Java 调用 Python,所以它并不实用这个场景。
JEP 实现了 Java 调用 Python,但它的具体实现存在很多限度,一是只能用源码装置,对环境的要求十分高,以及它须要依赖 cPython 三方的一些 .source 文件,十分不利于跨平台的装置应用。JEP 的启动入口必须是 JEP 的程序,须要动静加载类库,必须提前在环境变量中设置好,十分不利于它作为一个第三方的中间件插件运行在另一个架构上。此外还有性能上问题,它没有很好地克服现有的 Python GIL 的问题,所以导致它的性能并不是那么高效。
而 PEMJA 根本克服了上述问题,更好的实现了 Java 和 Python 相互调用。
上图是几种框架的性能比照。这里应用了一个比拟规范简略的 String upper 函数。这里次要比拟的是框架层的开销,并不是自定义函数的性能,所以应用了一个最简略的函数。同时,思考到现有的各种函数最罕用的数据结构是 String,所以这里应用了 String。
这里别离比照的是 100 个 bytes 和 1000 个 bytes 在这 4 种解释器下的性能,能够看到 Jython 并没有像设想中那么高效,反而是这 4 种实现计划中性能最低的。JEP 的性能也远远比不上 PEMJA,PEMJA 在 100 bytes 的时候大略是纯 Java 实现的 40%,1000 bytes 的状况下性能竟然超过了纯 Java 的实现。
如何解释这个景象呢?String upper 自身是一套 Java 的实现,而在 Python 中它是 .c 的实现,函数自身的执行效率比 Java 高,再联合框架开销足够小的状况,整体的性能反而比 Java 更高,也就意味着在某些场景下,Python UDF 的性能是有可能超过 Java UDF 的。
当初很多用户应用 Java UDF 而不应用 Python UDF 的一个关键点是 Python UDF 性能远远比不上 Java。然而如果 Java 的性能并没有比 Python 更好的话,Python 反而就有了劣势,因为它毕竟是一种脚本语言,写起来是更不便。
上图展现了 PEMJA 的架构。
Java 中的 damond thread 负责初始化以及最初的销毁以及在 PEMJA 和对应的 Python PVM 里创立及开释资源。用户应用的是 Java 中的 PEMJA 实例,实例映射到 PEMJA 中对应 PEMJA 的 instance,instant 会创立每一个 Python 的 sub interpreter。Python double interpreter 绝对于全局 Python interpreter,是一个更小的可能掌控 GIL 的概念,它有本人独立的 hip 空间,所以可能实现命名空间的隔离。这里的每一个 thread 都会对应一个 Python sub interpret,能够在对应的 PVM 里执行本人的 Python function。
四、PyFlink Runtime 2.0
PyFlink Runtime 2.0 就是基于 PEMJA 做的。
上图右边是 PyFlink 1.0 的架构。外面有两个过程,一个是 Java 过程,一个是 Python 过程。它们之间的数据交互是通过 data service 和 State service 实现,应用了过程 IPC 通信。
有了 PEMJA 之后,就能够把 data service 和 State service 替换成 PEMJA Lib,随即能够把右边原来的 JVM 和左边的 PVM 运行在同一个过程里,从而彻底解掉的 IPC 过程通信的问题。
上图将现有的 PyFlink UDF、PyFlink 基于 PEMJA 的一套 UDF 以及 Java UDF 做了性能比照。也是应用 String upper 函数,比拟 100 bytes 和 1000 bytes 的性能。能够看到,在 100 bytes 的状况下,UDF on PEMJA 的实现曾经根本达到 Java UDF 的 50% 的性能。在 1000 bytes 的状况下,UDF on PEMJA 的性能曾经超过了 Java UDF。尽管这和实现了自定义的函数无关,但也能阐明这套 PEMJA 框架的性能之高效。
五、Future Work
将来,会开源 PEMJA 框架 (已于 2022 年 1 月 14 日正式开源),因为它波及到通用的解决方案,不仅仅是使用在 PyFlink 之上,各种 Java 和 Python 相互调用的计划也都能够利用这套框架,所以会对 PEMJA 框架做一个独立的开源。它的第一个版本临时只反对 Java 调用 Python 性能,后续会反对 Python 调用 Java 的性能,因为 Python Datastream API 用 Python 写的函数调用 State 是依赖于 Python 调用 Java 的性能。此外,将实现 PEMJA 反对 Numpy 原生数据结构,实现了这个反对之后,pandas UDF 也就得以使用,性能将会失去质的飞跃。
欢送大家退出“PyFlink 交换群”,交换 PyFlink 相干的问题。
Flink CDC Meetup · Online
工夫:5 月 21 日 9:00-12:25
PC 端直播观看:https://developer.aliyun.com/…
挪动端 倡议微信扫一扫关注 ApacheFlink 视频号预约观看:
更多 Flink 相干技术问题,可扫码退出社区钉钉交换群
第一工夫获取最新技术文章和社区动静,请关注公众号~
流动举荐
阿里云基于 Apache Flink 构建的企业级产品 - 实时计算 Flink 版现开启流动:
99 元试用 实时计算 Flink 版(包年包月、10CU)即有机会取得 Flink 独家定制卫衣;另包 3 个月及以上还有 85 折优惠!
理解流动详情:https://www.aliyun.com/produc…