关于Flink:PyFlink-Table-API-Python-自定义函数

31次阅读

共计 13446 个字符,预计需要花费 34 分钟才能阅读完成。

作者:付典

背景

Python 自定义函数是 PyFlink Table API 中最重要的性能之一,其容许用户在 PyFlink Table API 中应用 Python 语言开发的自定义函数,极大地拓宽了 Python Table API 的应用范畴。

目前 Python 自定义函数的性能曾经十分欠缺,反对多种类型的自定义函数,比方 UDF(scalar function)、UDTF(table function)、UDAF(aggregate function),UDTAF(table aggregate function,1.13 反对)、Panda UDF、Pandas UDAF 等。接下来,咱们具体介绍一下如何在 PyFlink Table API 作业中应用 Python 自定义函数。

Python 自定义函数根底

依据输出 / 输入数据的行数,Flink Table API & SQL 中,自定义函数能够分为以下几类:

自定义函数 Single Row Input Multiple Row Input
Single Row Output ScalarFunction AggregateFunction
Multiple Row Output TableFunction TableAggregateFunction

PyFlink 针对以上四种类型的自定义函数都提供了反对,接下来,咱们别离看一下每种类型的自定义函数如何应用。

Python UDF

Python UDF,即 Python ScalarFunction,针对每一条输出数据,仅产生一条输入数据。比方以下示例,展现了通过多种形式,来定义名字为 “sub_string” 的 Python UDF:

from pyflink.table.udf import udf, FunctionContext, ScalarFunction
from pyflink.table import DataTypes

形式一:@udf(result_type=DataTypes.STRING())
def sub_string(s: str, begin: int, end: int):
    return s[begin:end]

形式二:
sub_string = udf(lambda s, begin, end: s[begin:end], result_type=DataTypes.STRING())

形式三:class SubString(object):
    def __call__(self, s: str, begin: int, end: int):
        return s[begin:end]

sub_string = udf(SubString(), result_type=DataTypes.STRING())

形式四:def sub_string(s: str, begin: int, end: int):
    return s[begin:end]

sub_string_begin_1 = udf(functools.partial(sub_string, begin=1), result_type=DataTypes.STRING())

形式五:class SubString(ScalarFunction):
    def open(self, function_context: FunctionContext):
        pass

    def eval(self, s: str, begin: int, end: int):
        return s[begin:end]

sub_string = udf(SubString(), result_type=DataTypes.STRING())

阐明:

  • 须要通过名字为“udf”的装璜器,申明这是一个 scalar function;
  • 须要通过装璜器中的 result_type 参数,申明 scalar function 的后果类型;
  • 上述形式五,通过继承 ScalarFunction 的形式来定义 Python UDF 有以下用途:

    • ScalarFunction 的基类 UserDefinedFunction 中定义了一个 open 办法,该办法只在作业初始化时执行一次,因而能够利用该办法,做一些初始化工作,比方加载机器学习模型、连贯内部服务等。
    • 此外,还能够通过 open 办法中的 function_context 参数,注册及应用 metrics。
env_settings = EnvironmentSettings.new_instance().in_streaming_mode().use_blink_planner().build()
t_env = StreamTableEnvironment.create(environment_settings=env_settings)

table = t_env.from_elements([("hello", 1), ("world", 2), ("flink", 3)], ['a', 'b'])
table.select(sub_string(table.a, 1, 3))

Python UDTF

Python UDTF,即 Python TableFunction,针对每一条输出数据,Python UDTF 能够产生 0 条、1 条或者多条输入数据,此外,一条输入数据能够蕴含多个列。比方以下示例,定义了一个名字为 split 的 Python UDF,以指定字符串为分隔符,将输出字符串切分成两个字符串:

from pyflink.table.udf import udtf
from pyflink.table import DataTypes

@udtf(result_types=[DataTypes.STRING(), DataTypes.STRING()])
def split(s: str, sep: str):
    splits = s.split(sep)
    yield splits[0], splits[1]

阐明:

  • 须要通过名字为“udtf”的装璜器,申明这是一个 table function;
  • 须要通过装璜器中的 result_types 参数,申明 table function 的后果类型。因为 table function 每条输入能够蕴含多个列,result_types 须要指定所有输入列的类型;
  • Python UDTF 的定义,也反对 Python UDF 章节中所列出的多种定义形式,这里只展现了其中一种。

定义完 Python UDTF 之后,能够间接在 Python Table API 中应用:

env_settings = EnvironmentSettings.new_instance().in_streaming_mode().use_blink_planner().build()
t_env = StreamTableEnvironment.create(environment_settings=env_settings)

table = t_env.from_elements([("hello|word", 1), ("abc|def", 2)], ['a', 'b'])

table.join_lateral(split(table.a, '|').alias("c1, c2"))
table.left_outer_join_lateral(split(table.a, '|').alias("c1, c2"))

Python UDAF

Python UDAF,即 Python AggregateFunction。Python UDAF 用来针对一组数据进行聚合运算,比方同一个 window 下的多条数据、或者同一个 key 下的多条数据等。针对同一组输出数据,Python AggregateFunction 产生一条输入数据。比方以下示例,定义了一个名字为 weighted_avg 的 Python UDAF:

from pyflink.common import Row
from pyflink.table import AggregateFunction, DataTypes
from pyflink.table.udf import udaf


class WeightedAvg(AggregateFunction):

    def create_accumulator(self):
        # Row(sum, count)
        return Row(0, 0)

    def get_value(self, accumulator: Row) -> float:
        if accumulator[1] == 0:
            return 0
        else:
            return accumulator[0] / accumulator[1]

    def accumulate(self, accumulator: Row, value, weight):
        accumulator[0] += value * weight
        accumulator[1] += weight

    def retract(self, accumulator: Row, value, weight):
        accumulator[0] -= value * weight
        accumulator[1] -= weight


weighted_avg = udaf(f=WeightedAvg(),
                    result_type=DataTypes.DOUBLE(),
                    accumulator_type=DataTypes.ROW([DataTypes.FIELD("f0", DataTypes.BIGINT()),
                        DataTypes.FIELD("f1", DataTypes.BIGINT())]))

阐明:

  • 须要通过名字为“udaf”的装璜器,申明这是一个 aggregate function,
  • 须要别离通过装璜器中的 result_type 及 accumulator_type 参数,申明 aggregate function 的后果类型及 accumulator 类型;
  • create_accumulator,get_value 和 accumulate 这 3 个办法必须要定义,retract 办法能够依据须要定义,详细信息能够参见 Flink 官网文档 [1];须要留神的是,因为必须定义 create_accumulator,get_value 和 accumulate 这 3 个办法,Python UDAF 只能通过继承 AggregateFunction 的形式进行定义(Pandas UDAF 没有这方面的限度)。

定义完 Python UDAF 之后,能够在 Python Table API 中这样应用:

env_settings = EnvironmentSettings.new_instance().in_streaming_mode().use_blink_planner().build()
t_env = StreamTableEnvironment.create(environment_settings=env_settings)

t = t_env.from_elements([(1, 2, "Lee"), (3, 4, "Jay"), (5, 6, "Jay"), (7, 8, "Lee")],
                        ["value", "count", "name"])

t.group_by(t.name).select(weighted_avg(t.value, t.count).alias("avg"))

Python UDTAF

Python UDTAF,即 Python TableAggregateFunction。Python UDTAF 用来针对一组数据进行聚合运算,比方同一个 window 下的多条数据、或者同一个 key 下的多条数据等,与 Python UDAF 不同的是,针对同一组输出数据,Python UDTAF 能够产生 0 条、1 条、甚至多条输入数据。

以下示例,定义了一个名字为 Top2 的 Python UDTAF:

from pyflink.common import Row
from pyflink.table import DataTypes
from pyflink.table.udf import udtaf, TableAggregateFunction

class Top2(TableAggregateFunction):

    def create_accumulator(self):
        # 存储以后最大的两个值
        return [None, None]

    def accumulate(self, accumulator, input_row):
        if input_row[0] is not None:
            # 新的输出值最大
            if accumulator[0] is None or input_row[0] > accumulator[0]:
                accumulator[1] = accumulator[0]
                accumulator[0] = input_row[0]
            # 新的输出值次大
            elif accumulator[1] is None or input_row[0] > accumulator[1]:
                accumulator[1] = input_row[0]

    def emit_value(self, accumulator): 
        yield Row(accumulator[0])
        if accumulator[1] is not None:
            yield Row(accumulator[1])

top2 = udtaf(f=Top2(),
             result_type=DataTypes.ROW([DataTypes.FIELD("a", DataTypes.BIGINT())]),
             accumulator_type=DataTypes.ARRAY(DataTypes.BIGINT()))

阐明:

  • Python UDTAF 性能是 Flink 1.13 之后反对的新性能;
  • create_accumulator,accumulate 和 emit_value 这 3 个办法必须定义,此外 TableAggregateFunction 中反对 retract、merge 等办法,能够依据须要抉择是否定义,详细信息能够参见 Flink 官网文档 [2]。

定义完 Python UDTAF 之后,能够在 Python Table API 中这样应用:

env_settings = EnvironmentSettings.new_instance().in_streaming_mode().use_blink_planner().build()
t_env = StreamTableEnvironment.create(environment_settings=env_settings)

t = t_env.from_elements([(1, 'Hi', 'Hello'),
                         (3, 'Hi', 'hi'),
                         (5, 'Hi2', 'hi'),
                         (2, 'Hi', 'Hello'),
                         (7, 'Hi', 'Hello')],
                        ['a', 'b', 'c'])

t_env.execute_sql("""
       CREATE TABLE my_sink (
         word VARCHAR,
         `sum` BIGINT
       ) WITH ('connector' = 'print')
    """)

result = t.group_by(t.b).flat_aggregate(top2).select("b, a").execute_insert("my_sink")

# 1)期待作业执行完结,用于 local 执行,否则可能作业尚未执行完结,该脚本已退出,会导致 minicluster 过早退出
# 2)当作业通过 detach 模式往 remote 集群提交时,比方 YARN/Standalone/K8s 等,须要移除该办法
result.wait()

当执行以上程序,能够看到相似如下输入:

11> +I[Hi, 7]
10> +I[Hi2, 5]
11> +I[Hi, 3]

阐明:

  • Python UDTAF 只能用于 Table API,不能用于 SQL 语句中;
  • flat_aggregate 的后果蕴含了原始的 grouping 列以及 UDTAF(top 2)的输入,因而,能够在 select 中拜访列“b”。

Python 自定义函数进阶

在纯 SQL 作业中应用 Python 自定义函数

Flink SQL 中的 CREATE FUNCTION 语句反对注册 Python 自定义函数,因而用户除了能够在 PyFlink Table API 作业中应用 Python 自定义函数之外,还能够在纯 SQL 作业中应用 Python 自定义函数。

CREATE TEMPORARY FUNCTION sub_string AS 'test_udf.sub_string' LANGUAGE PYTHON

CREATE TABLE source (a VARCHAR) WITH ('connector' = 'datagen');

CREATE TABLE sink (a VARCHAR) WITH ('connector' = 'print');

INSERT INTO sink
SELECT sub_string(a, 1, 3)
FROM source;

在 Java 作业中应用 Python 自定义函数

用户能够通过 DDL 的形式注册 Python 自定义函数,这意味着,用户也能够在 Java Table API 作业中应用 Python 自定义函数,比方:

TableEnvironment tEnv = TableEnvironment.create(EnvironmentSettings.newInstance().useBlinkPlanner().inBatchMode().build());
tEnv.executeSql("CREATE TEMPORARY FUNCTION sub_string AS'test_udf.sub_string'LANGUAGE PYTHON");
tEnv.createTemporaryView("source", tEnv.fromValues("hello", "world", "flink").as("a"));
tEnv.executeSql("SELECT sub_string(a) FROM source").collect();

具体示例能够参见 PyFlink Playground [3]。

该性能的一个重要用途是将 Java 算子与 Python 算子混用。用户能够应用 Java 语言来开发绝大部分的作业逻辑,当作业逻辑中的某些局部必须应用 Python 语言来编写时,能够通过如上形式来调用应用 Python 语言编写的自定义函数。

如果是 DataStream 作业,能够先将 DataStream 转换成 Table,而后再通过上述形式,调用 Python 语言编写的自定义函数。

依赖治理

在 Python 自定义函数中拜访第三方 Python 库是十分常见的需要,另外,在机器学习预测场景中,用户也可能须要在 Python 自定义函数中加载一个机器学习模型。当咱们通过 local 模式执行 PyFlink 作业时,能够将第三方 Python 库装置在本地 Python 环境中,或者将机器学习模型下载到本地;然而当咱们将 PyFlink 作业提交到近程执行的时候,这也可能会呈现一些问题:

  • 第三方 Python 库如何被 Python 自定义函数拜访。不同的作业,对于 Python 库的版本要求是不一样的,将第三方 Python 库预装置到集群的 Python 环境中,只实用于装置一些公共的依赖,不能解决不同作业对于 Python 依赖个性化的需要;
  • 机器学习模型或者数据文件,如何散发到集群节点,并最终被 Python 自定义函数拜访。

除此之外,依赖可能还包含 JAR 包等,PyFlink 中针对各种依赖提供了多种解决方案:

依赖类型 解决方案 用处形容 示例(flink run)
flink run 参数 配置项 API
作业入口文件 -py / –python 指定作业的入口文件,只能是.py 文件 -py file:///path/to/table_api_demo.py
作业入口 entry module -pym / –pyModule 指定作业的 entry module,性能和 –python 相似,可用于当作业的 Python 文件为 zip 包等状况,无奈通过 –python 指定的时候,相比 –python 来说,更通用 -pym table_api_demo-pyfs file:///path/to/table_api_demo.py
Python 三方库文件 -pyfs / –pyFiles python.files add_python_file 指定一个到多个 Python 文件(.py/.zip/.whl 等,逗号宰割),这些 Python 文件在作业执行时,会放到 Python 过程的 PYTHONPATH 中,能够在 Python 自定义函数中间接拜访 -pyfs file:///path/to/table_api_demo.py,file:///path/to/deps.zip
存档文件 -pyarch /–pyArchives python.archives add_python_archive 指定一个到多个存档文件(逗号宰割),这些存档文件,在作业执行的时候,会被解压,并放到 Python 过程的工作目录,能够通过相对路径的形式进行拜访 -pyarchfile:///path/to/venv.zip
Python 解释器门路 -pyexec / –pyExecutable python.executable set_python_executable 指定作业执行时,所应用的 Python 解释器门路 -pyarchfile:///path/to/venv.zip-pyexec venv.zip/venv/bin/python3
requirements 文件 -pyreq / –pyRequirements python.requirements set_python_requirements 指定 requirements 文件,requirements 文件中定义了作业的 Python 三方库依赖,作业执行时,会依据 requirements 的内容,通过 pip 装置相干依赖 -pyreq requirements.txt
JAR 包 pipeline.classpaths,pipeline.jars 没有专门的 API,能够通过 configuration 的 set_string 办法设置 指定作业依赖的 JAR 包,通常用于指定 connector JAR 包

阐明:

  • 须要留神的是,Python UDF 的实现所在的文件,也须要在作业执行的时候,作为依赖文件上传;
  • 能够通过合用“存档文件”与“Python 解释器门路”,指定作业应用上传的 Python 虚拟环境来执行,比方:
table_env.add_python_archive("/path/to/py_env.zip")

# 指定应用 py_env.zip 包中带的 python 来执行用户自定义函数,必须通过相对路径来指定
table_env.get_config().set_python_executable("py_env.zip/py_env/bin/python")
  • 举荐用户应用 conda 来构建 Python 虚拟环境,conda 构建的 Python 虚拟环境蕴含了执行 Python 所需的绝大多数底层库,能够极大地防止当本地环境与集群环境不一样时,所构建的 Python 虚拟环境在集群执行时,短少各种底层依赖库的问题。对于如何应用 conda 构建的 Python 虚拟环境,能够参考阿里云 VVP 文档中“应用 Python 三方包”章节的介绍 [4]
  • 有些 Python 三方库须要装置能力应用,即并非”将其下载下来就能够间接放到 PYTHONPATH 中援用“,针对这种类型的 Python 三方库,有两种解决方案:

    • 将其装置在 Python 虚拟环境之中,指定作业运行应用所构建的 Python 虚拟环境;
    • 找一台与集群环境雷同的机器(或 docker),装置所需的 Python 三方库,而后将安装文件打包。该形式绝对于 Python 虚拟环境来说,打包文件比拟小。详情能够参考阿里云 VVP 文档中“应用自定义的 Python 虚拟环境”章节的介绍 [5]。

调试

PyFlink 反对用户通过近程调试的形式,来调试 Python 自定义函数,具体方法能够参见文章“如何从 0 到 1 开发 PyFlink API 作业”[6] 中“近程调试”章节的介绍。

另外,用户还能够在 Python 自定义函数中,通过 logging 的形式,打印日志。须要留神的是,日志输入须要在 TaskManager 的日志文件中查看,而不是以后 console。具体应用形式,请参见“如何从 0 到 1 开发 PyFlink API 作业”[6] 中“自定义日志”章节的介绍。须要留神的是,当通过 local 形式运行作业的时候,TM 的日志位于 PyFlink 的装置目录,比方:

\>>> import pyflink

[‘/Users/dianfu/venv/pyflink-usecases/lib/python3.8/site-packages/pyflink’]

调优

Python 自定义函数的性能在很大水平上取决于 Python 自定义函数本身的实现,如果遇到性能问题,您首先须要想方法尽可能优化 Python 自定义函数的实现。

除此之外,Python 自定义函数的性能也受以下参数取值的影响。

参数 阐明
python.fn-execution.bundle.size Python 自定义函数的执行是异步的,在作业执行过程中,Java 算子将数据异步发送给 Python 过程进行解决。Java 算子在将数据发送给 Python 过程之前,会先将数据缓存起来,达到肯定阈值之后,再发送给 Python 过程。python.fn-execution.bundle.size 参数可用来管制可缓存的数据最大条数,默认值为 100000。
python.fn-execution.bundle.time 用来控制数据的最大缓存工夫。当缓存的数据条数达到 python.fn-execution.bundle.size 定义的阈值或缓存工夫达到 python.fn-execution.bundle.time 定义的阈值时,会触发缓存数据的计算。默认值为 1000,单位是毫秒。
python.fn-execution.arrow.batch.size 用来管制当应用 Pandas UDF 时,一个 arrow batch 可包容的数据最大条数,默认值为 10000。阐明 python.fn-execution.arrow.batch.size 参数值不能大于 python.fn-execution.bundle.size 参数值。

阐明:

  • checkpoint 时,会触发缓存数据的计算,因而当上述参数配置的值过大时,可能会导致 checkpoint 时须要解决过多的数据,从而导致 checkpoint 工夫过长,甚至会导致 checkpoint 失败。当遇到作业的 checkpoint 工夫比拟长的问题时,能够尝试缩小上述参数的取值。

常见问题

1)Python 自定义函数的理论返回值类型与 result_type 中申明的类型不统一,该问题会导致 Java 算子在收到 Python 自定义函数的执行后果,进行反序列化时报错,谬误堆栈相似:

Caused by: java.io.EOFException
    at java.io.DataInputStream.readInt(DataInputStream.java:392) ~[?:1.8.0_261]
    at org.apache.flink.table.runtime.typeutils.StringDataSerializer.deserializeInternal(StringDataSerializer.java:91) ~[flink-table-blink_2.11-1.12.0.jar:1.12.0]
    at org.apache.flink.table.runtime.typeutils.StringDataSerializer.deserialize(StringDataSerializer.java:87) ~[flink-table-blink_2.11-1.12.0.jar:1.12.0]
    at org.apache.flink.table.runtime.typeutils.StringDataSerializer.deserialize(StringDataSerializer.java:36) ~[flink-table-blink_2.11-1.12.0.jar:1.12.0]
    at org.apache.flink.table.runtime.typeutils.serializers.python.MapDataSerializer.deserializeInternal(MapDataSerializer.java:124) ~[flink-python_2.11-1.12.0.jar:1.12.0]
    at org.apache.flink.table.runtime.typeutils.serializers.python.MapDataSerializer.deserialize(MapDataSerializer.java:107) ~[flink-python_2.11-1.12.0.jar:1.12.0]
    at org.apache.flink.table.runtime.typeutils.serializers.python.MapDataSerializer.deserialize(MapDataSerializer.java:46) ~[flink-python_2.11-1.12.0.jar:1.12.0]
    at org.apache.flink.table.runtime.typeutils.serializers.python.RowDataSerializer.deserialize(RowDataSerializer.java:104) ~[flink-python_2.11-1.12.0.jar:1.12.0]
    at org.apache.flink.table.runtime.typeutils.serializers.python.RowDataSerializer.deserialize(RowDataSerializer.java:49) ~[flink-python_2.11-1.12.0.jar:1.12.0]
    at org.apache.flink.table.runtime.operators.python.scalar.RowDataPythonScalarFunctionOperator.emitResult(RowDataPythonScalarFunctionOperator.java:84) ~[flink-python_2.11-1.12.0.jar:1.12.0]

2)在 Python 自定义函数的 init 办法里实例化了一个不能被 cloudpickle 序列化的对象。

在提交作业的时候,PyFlink 会通过 cloudpickle 序列化 Python 自定义函数,若 Python 自定义函数蕴含不能被 cloudpickle 序列化的对象,则会遇到相似谬误:TypeError: can’t pickle xxx,能够将这种变量放在 open 办法里初始化。

3)在 Python 自定义函数的 init 办法里 load 一个十分大的数据文件。

因为在提交作业的时候,PyFlink 会通过 cloudpickle 序列化 Python 自定义函数,若在 init 办法里 load 一个十分大的数据文件,则整个数据文件都会被序列化并作为 Python 自定义函数实现的一部分,若数据文件十分大,可能会导致作业执行失败,能够将 load 数据文件的操作放在 open 办法里执行。

4)客户端 Python 环境与集群端 Python 环境不统一,比方 Python 版本不统一、PyFlink 版本不统一(大版本须要保持一致,比方都为 1.12.x)等。

总结

在这篇文章中,咱们次要介绍了各种 Python 自定义函数的定义及应用形式,以及 Python 依赖治理、Python 自定义函数调试及调优等方面的信息,心愿能够帮忙用户理解 Python 自定义函数。接下来,咱们会持续推出 PyFlink 系列文章,帮忙 PyFlink 用户深刻理解 PyFlink 中各种性能、利用场景、最佳实际等。

另外,阿里云实时计算生态团队长期招聘优良大数据人才(包含实习 + 社招),咱们的工作包含:

  • 实时机器学习:反对机器学习场景下实时特色工程和 AI 引擎配合,基于 Apache Flink 及其生态打造实时机器学习的规范,推动例如搜寻、举荐、广告、风控等场景的全面实时化;
  • 大数据 + AI 一体化:包含编程语言一体化 (PyFlink 相干工作),执行引擎集成化 (TF on Flink),工作流及治理一体化(Flink AI Flow)。

如果你对开源、大数据或者 AI 感兴趣,请发简历到:fudian.fd@alibaba-inc.com

援用链接

[1] https://ci.apache.org/project…

[2] https://ci.apache.org/project…

[3] https://github.com/pyflink/pl…

[4] https://help.aliyun.com/docum…

[5] https://help.aliyun.com/docum…

[6] https://mp.weixin.qq.com/s/Gy…

正文完
 0