简介:以 Flink 1.12 为例,介绍如何应用 Python 语言,通过 PyFlink API 来开发 Flink 作业。
Apache Flink 作为以后最风行的流批对立的计算引擎,在实时 ETL、事件处理、数据分析、CEP、实时机器学习等畛域都有着宽泛的利用。从 Flink 1.9 开始,Apache Flink 社区开始在原有的 Java、Scala、SQL 等编程语言的根底之上,提供对于 Python 语言的反对。通过 Flink 1.9 ~ 1.12 以及行将公布的 1.13 版本的多个版本的开发,目前 PyFlink API 的性能曾经日趋完善,能够满足绝大多数状况下 Python 用户的需要。接下来,咱们以 Flink 1.12 为例,介绍如何应用 Python 语言,通过 PyFlink API 来开发 Flink 作业。内容包含:
- 环境筹备
- 作业开发
- 作业提交
- 问题排查
- 总结
GitHub 地址
https://github.com/apache/flink
欢送大家给 Flink 点赞送 star~
环境筹备
第一步:装置 Python
PyFlink 仅反对 Python 3.5+,您首先须要确认您的开发环境是否已装置了 Python 3.5+,如果没有的话,首先须要装置 Python 3.5+。
第二步:装置 JDK
咱们晓得 Flink 的运行时是应用 Java 语言开发的,所以为了执行 Flink 作业,您还须要装置 JDK。Flink 提供了对于 JDK 8 以及 JDK 11 的全面反对,您须要确认您的开发环境中是否曾经装置了上述版本的 JDK,如果没有的话,首先须要装置 JDK。
第三步:装置 PyFlink
接下来须要装置 PyFlink,能够通过以下命令进行装置:
# 创立 Python 虚拟环境python3 -m pip install virtualenvvirtualenv -p `which python3` venv# 应用上述创立的 Python 虚拟环境./venv/bin/activate# 装置 PyFlink 1.12python3 -m pip install apache-flink==1.12.2
作业开发
PyFlink Table API 作业
咱们首先介绍一下如何开发 PyFlink Table API 作业。
■ 1)创立 TableEnvironment 对象
对于 Table API 作业来说,用户首先须要创立一个 TableEnvironment 对象。以下示例定义了一个 TableEnvironment 对象,应用该对象的定义的作业,运行在流模式,且应用 blink planner 执行。
env_settings = EnvironmentSettings.new_instance().in_streaming_mode().use_blink_planner().build()t_env = StreamTableEnvironment.create(environment_settings=env_settings)
■ 2)配置作业的执行参数
能够通过以下形式,配置作业的执行参数。以下示例将作业的默认并发度设置为4。
t_env.get_config().get_configuration().set_string('parallelism.default', '4')
■ 3)创立数据源表
接下来,须要为作业创立一个数据源表。PyFlink 中提供了多种形式来定义数据源表。
形式一:from\_elements
PyFlink 反对用户从一个给定列表,创立源表。以下示例定义了蕴含了 3 行数据的表:[("hello", 1), ("world", 2), ("flink", 3)],该表有 2 列,列名别离为 a 和 b,类型别离为 VARCHAR 和 BIGINT。
tab = t_env.from_elements([("hello", 1), ("world", 2), ("flink", 3)], ['a', 'b'])
阐明:
- 这种形式通常用于测试阶段,能够疾速地创立一个数据源表,验证作业逻辑
- from\_elements 办法能够接管多个参数,其中第一个参数用于指定数据列表,列表中的每一个元素必须为 tuple 类型;第二个参数用于指定表的 schema
形式二:DDL
除此之外,数据也能够来自于一个内部的数据源。以下示例定义了一个名字为my\_source,类型为 datagen 的表,表中有两个类型为 VARCHAR 的字段。
t_env.execute_sql(""" CREATE TABLE my_source ( a VARCHAR, b VARCHAR ) WITH ( 'connector' = 'datagen', 'number-of-rows' = '10' ) """)tab = t_env.from_path('my_source')
阐明:
- 通过 DDL 的形式来定义数据源表是目前最举荐的形式,且所有 Java Table API & SQL 中反对的 connector,都能够通过 DDL 的形式,在 PyFlink Table API 作业中应用,具体的 connector 列表请参见 Flink 官网文档 [1]。
- 以后仅有局部 connector 的实现蕴含在 Flink 官网提供的发行包中,比方 FileSystem,DataGen、Print、BlackHole 等,大部分 connector 的实现以后没有蕴含在 Flink 官网提供的发行包中,比方 Kafka、ES 等。针对没有蕴含在 Flink 官网提供的发行包中的 connector,如果须要在 PyFlink 作业中应用,用户须要显式地指定相应 FAT JAR,比方针对 Kafka,须要应用 JAR 包 [2],JAR 包能够通过如下形式指定:
# 留神:file:///前缀不能省略t_env.get_config().get_configuration().set_string("pipeline.jars", "file:///my/jar/path/flink-sql-connector-kafka_2.11-1.12.0.jar")
形式三:catalog
hive_catalog = HiveCatalog("hive_catalog")t_env.register_catalog("hive_catalog", hive_catalog)t_env.use_catalog("hive_catalog")# 假如hive catalog中曾经定义了一个名字为source_table的表tab = t_env.from_path('source_table')
这种形式和 DDL 的形式相似,只不过表的定义当时曾经注册到了 catalog 中了,不须要在作业中从新再定义一遍了。
■ 4)定义作业的计算逻辑
形式一:通过 Table API
失去 source 表之后,接下来就能够应用 Table API 中提供的各种操作,定义作业的计算逻辑,对表进行各种变换了,比方:
@udf(result_type=DataTypes.STRING())def sub_string(s: str, begin: int, end: int): return s[begin:end]transformed_tab = tab.select(sub_string(col('a'), 2, 4))
形式二:通过 SQL 语句
除了能够应用 Table API 中提供的各种操作之外,也能够间接通过 SQL 语句来对表进行变换,比方上述逻辑,也能够通过 SQL 语句来实现:
t_env.create_temporary_function("sub_string", sub_string)transformed_tab = t_env.sql_query("SELECT sub_string(a, 2, 4) FROM %s" % tab)
阐明:
- TableEnvironment 中提供了多种形式用于执行 SQL 语句,其用处略有不同:
■ 5)查看执行打算
用户在开发或者调试作业的过程中,可能须要查看作业的执行打算,能够通过如下形式。
形式一:Table.explain
比方,当咱们须要晓得 transformed\_tab 以后的执行打算时,能够执行:print(transformed\_tab.explain()),能够失去如下输入:
== Abstract Syntax Tree ==LogicalProject(EXPR$0=[sub_string($0, 2, 4)])+- LogicalTableScan(table=[[default_catalog, default_database, Unregistered_TableSource_582508460, source: [PythonInputFormatTableSource(a)]]])== Optimized Logical Plan ==PythonCalc(select=[sub_string(a, 2, 4) AS EXPR$0])+- LegacyTableSourceScan(table=[[default_catalog, default_database, Unregistered_TableSource_582508460, source: [PythonInputFormatTableSource(a)]]], fields=[a])== Physical Execution Plan ==Stage 1 : Data Source content : Source: PythonInputFormatTableSource(a) Stage 2 : Operator content : SourceConversion(table=[default_catalog.default_database.Unregistered_TableSource_582508460, source: [PythonInputFormatTableSource(a)]], fields=[a]) ship_strategy : FORWARD Stage 3 : Operator content : StreamExecPythonCalc ship_strategy : FORWARD
形式二:TableEnvironment.explain\_sql
形式一实用于查看某一个 table 的执行打算,有时候并没有一个现成的 table 对象可用,比方:
print(t_env.explain_sql("INSERT INTO my_sink SELECT * FROM %s " % transformed_tab))
其执行打算如下所示:
== Abstract Syntax Tree ==LogicalSink(table=[default_catalog.default_database.my_sink], fields=[EXPR$0])+- LogicalProject(EXPR$0=[sub_string($0, 2, 4)]) +- LogicalTableScan(table=[[default_catalog, default_database, Unregistered_TableSource_1143388267, source: [PythonInputFormatTableSource(a)]]])== Optimized Logical Plan ==Sink(table=[default_catalog.default_database.my_sink], fields=[EXPR$0])+- PythonCalc(select=[sub_string(a, 2, 4) AS EXPR$0]) +- LegacyTableSourceScan(table=[[default_catalog, default_database, Unregistered_TableSource_1143388267, source: [PythonInputFormatTableSource(a)]]], fields=[a])== Physical Execution Plan ==Stage 1 : Data Source content : Source: PythonInputFormatTableSource(a) Stage 2 : Operator content : SourceConversion(table=[default_catalog.default_database.Unregistered_TableSource_1143388267, source: [PythonInputFormatTableSource(a)]], fields=[a]) ship_strategy : FORWARD Stage 3 : Operator content : StreamExecPythonCalc ship_strategy : FORWARD Stage 4 : Data Sink content : Sink: Sink(table=[default_catalog.default_database.my_sink], fields=[EXPR$0]) ship_strategy : FORWARD
■ 6)写出后果数据
形式一:通过 DDL
和创立数据源表相似,也能够通过 DDL 的形式来创立后果表。
t_env.execute_sql(""" CREATE TABLE my_sink ( `sum` VARCHAR ) WITH ( 'connector' = 'print' ) """)table_result = transformed_tab.execute_insert('my_sink')
阐明:
- 当应用 print 作为 sink 时,作业后果会打印到规范输入中。如果不须要查看输入,也能够应用 blackhole 作为 sink。
形式二:collect
也能够通过 collect 办法,将 table 的后果收集到客户端,并逐条查看。
table_result = transformed_tab.execute()with table_result.collect() as results: for result in results: print(result)
阐明:
- 该形式能够不便地将 table 的后果收集到客户端并查看
- 因为数据最终会收集到客户端,所以最好限度一下数据条数,比方:
transformed\_tab.limit(10).execute(),限度只收集 10 条数据到客户端
形式三:to\_pandas
也能够通过 to\_pandas 办法,将 table 的后果转换成 pandas.DataFrame 并查看。
result = transformed_tab.to_pandas()print(result)
能够看到如下输入:
_c00 321 e62 8b3 be4 4f5 b46 a67 498 359 6b
阐明:
- 该形式与 collect 相似,也会将 table 的后果收集到客户端,所以最好限度一下后果数据的条数
■ 7)总结
残缺的作业示例如下:
from pyflink.table import DataTypes, EnvironmentSettings, StreamTableEnvironmentfrom pyflink.table.expressions import colfrom pyflink.table.udf import udfdef table_api_demo(): env_settings = EnvironmentSettings.new_instance().in_streaming_mode().use_blink_planner().build() t_env = StreamTableEnvironment.create(environment_settings=env_settings) t_env.get_config().get_configuration().set_string('parallelism.default', '4') t_env.execute_sql(""" CREATE TABLE my_source ( a VARCHAR, b VARCHAR ) WITH ( 'connector' = 'datagen', 'number-of-rows' = '10' ) """) tab = t_env.from_path('my_source') @udf(result_type=DataTypes.STRING()) def sub_string(s: str, begin: int, end: int): return s[begin:end] transformed_tab = tab.select(sub_string(col('a'), 2, 4)) t_env.execute_sql(""" CREATE TABLE my_sink ( `sum` VARCHAR ) WITH ( 'connector' = 'print' ) """) table_result = transformed_tab.execute_insert('my_sink') # 1)期待作业执行完结,用于local执行,否则可能作业尚未执行完结,该脚本已退出,会导致minicluster过早退出 # 2)当作业通过detach模式往remote集群提交时,比方YARN/Standalone/K8s等,须要移除该办法 table_result.wait()if __name__ == '__main__': table_api_demo()
执行后果如下:
4> +I(a1)3> +I(b0)2> +I(b1)1> +I(37)3> +I(74)4> +I(3d)1> +I(07)2> +I(f4)1> +I(7f)2> +I(da)
PyFlink DataStream API 作业
■ 1)创立 StreamExecutionEnvironment 对象
对于 DataStream API 作业来说,用户首先须要定义一个 StreamExecutionEnvironment 对象。
env = StreamExecutionEnvironment.get_execution_environment()
■ 2)配置作业的执行参数
能够通过以下形式,配置作业的执行参数。以下示例将作业的默认并发度设置为4。
env.set_parallelism(4)
■ 3)创立数据源
接下来,须要为作业创立一个数据源。PyFlink 中提供了多种形式来定义数据源。
形式一:from\_collection
PyFlink 反对用户从一个列表创立源表。以下示例定义了蕴含了 3 行数据的表:[(1, 'aaa|bb'), (2, 'bb|a'), (3, 'aaa|a')],该表有 2 列,列名别离为 a 和 b,类型别离为 VARCHAR 和 BIGINT。
ds = env.from_collection( collection=[(1, 'aaa|bb'), (2, 'bb|a'), (3, 'aaa|a')], type_info=Types.ROW([Types.INT(), Types.STRING()]))
阐明:
- 这种形式通常用于测试阶段,能够不便地创立一个数据源
- from\_collection 办法能够接管两个参数,其中第一个参数用于指定数据列表;第二个参数用于指定数据的类型
形式二:应用 PyFlink DataStream API 中定义的 connector
此外,也能够应用 PyFlink DataStream API 中曾经反对的 connector,须要留神的是,1.12 中仅提供了 Kafka connector 的反对。
deserialization_schema = JsonRowDeserializationSchema.builder() \ .type_info(type_info=Types.ROW([Types.INT(), Types.STRING()])).build()kafka_consumer = FlinkKafkaConsumer( topics='test_source_topic', deserialization_schema=deserialization_schema, properties={'bootstrap.servers': 'localhost:9092', 'group.id': 'test_group'})ds = env.add_source(kafka_consumer)
阐明:
- Kafka connector 以后没有蕴含在 Flink 官网提供的发行包中,如果须要在PyFlink 作业中应用,用户须要显式地指定相应 FAT JAR [2],JAR 包能够通过如下形式指定:
# 留神:file:///前缀不能省略env.add_jars("file:///my/jar/path/flink-sql-connector-kafka_2.11-1.12.0.jar")
- 即便是 PyFlink DataStream API 作业,也举荐应用 Table & SQL connector 中打包进去的 FAT JAR,能够防止递归依赖的问题。
形式三:应用 PyFlink Table API 中定义的 connector
以下示例定义了如何将 Table & SQL 中反对的 connector 用于 PyFlink DataStream API 作业。
t_env = StreamTableEnvironment.create(stream_execution_environment=env)t_env.execute_sql(""" CREATE TABLE my_source ( a INT, b VARCHAR ) WITH ( 'connector' = 'datagen', 'number-of-rows' = '10' ) """)ds = t_env.to_append_stream( t_env.from_path('my_source'), Types.ROW([Types.INT(), Types.STRING()]))
阐明:
- 因为以后 PyFlink DataStream API 中 built-in 反对的 connector 品种还比拟少,举荐通过这种形式来创立 PyFlink DataStream API 作业中应用的数据源表,这样的话,所有 PyFlink Table API 中能够应用的 connector,都能够在 PyFlink DataStream API 作业中应用。
- 须要留神的是,TableEnvironment 须要通过以下形式创立 StreamTableEnvironment.create(stream\_execution\_environment=env),以使得 PyFlink DataStream API 与 PyFlink Table API 共享同一个 StreamExecutionEnvironment 对象。
■ 4)定义计算逻辑
生成数据源对应的 DataStream 对象之后,接下来就能够应用 PyFlink DataStream API 中定义的各种操作,定义计算逻辑,对 DataStream 对象进行变换了,比方:
def split(s): splits = s[1].split("|") for sp in splits: yield s[0], spds = ds.map(lambda i: (i[0] + 1, i[1])) \ .flat_map(split) \ .key_by(lambda i: i[1]) \ .reduce(lambda i, j: (i[0] + j[0], i[1]))
■ 5)写出后果数据
形式一:print
能够调用 DataStream 对象上的 print 办法,将 DataStream 的后果打印到规范输入中,比方:
ds.print()
形式二:应用 PyFlink DataStream API 中定义的 connector
能够间接应用 PyFlink DataStream API 中曾经反对的 connector,须要留神的是,1.12 中提供了对于 FileSystem、JDBC、Kafka connector 的反对,以 Kafka 为例:
serialization_schema = JsonRowSerializationSchema.builder() \ .with_type_info(type_info=Types.ROW([Types.INT(), Types.STRING()])).build()kafka_producer = FlinkKafkaProducer( topic='test_sink_topic', serialization_schema=serialization_schema, producer_config={'bootstrap.servers': 'localhost:9092', 'group.id': 'test_group'})ds.add_sink(kafka_producer)
阐明:
- JDBC、Kafka connector 以后没有蕴含在 Flink 官网提供的发行包中,如果须要在 PyFlink 作业中应用,用户须要显式地指定相应 FAT JAR,比方 Kafka connector 能够应用 JAR 包 [2],JAR 包能够通过如下形式指定:
# 留神:file:///前缀不能省略env.add_jars("file:///my/jar/path/flink-sql-connector-kafka_2.11-1.12.0.jar")
- 举荐应用 Table & SQL connector 中打包进去的 FAT JAR,能够防止递归依赖的问题。
形式三:应用 PyFlink Table API 中定义的 connector
以下示例展现了如何将 Table & SQL 中反对的 connector,用作 PyFlink DataStream API 作业的 sink。
# 写法一:ds类型为Types.ROWdef split(s): splits = s[1].split("|") for sp in splits: yield Row(s[0], sp)ds = ds.map(lambda i: (i[0] + 1, i[1])) \ .flat_map(split, Types.ROW([Types.INT(), Types.STRING()])) \ .key_by(lambda i: i[1]) \ .reduce(lambda i, j: Row(i[0] + j[0], i[1]))# 写法二:ds类型为Types.TUPLEdef split(s): splits = s[1].split("|") for sp in splits: yield s[0], spds = ds.map(lambda i: (i[0] + 1, i[1])) \ .flat_map(split, Types.TUPLE([Types.INT(), Types.STRING()])) \ .key_by(lambda i: i[1]) \ .reduce(lambda i, j: (i[0] + j[0], i[1]))# 将ds写出到sinkt_env.execute_sql(""" CREATE TABLE my_sink ( a INT, b VARCHAR ) WITH ( 'connector' = 'print' ) """)table = t_env.from_data_stream(ds)table_result = table.execute_insert("my_sink")
阐明:
- 须要留神的是,t\_env.from\_data\_stream(ds) 中的 ds 对象的 result type 类型必须是复合类型 Types.ROW 或者 Types.TUPLE,这也就是为什么须要显式申明作业计算逻辑中 flat\_map 操作的 result 类型
- 作业的提交,须要通过 PyFlink Table API 中提供的作业提交形式进行提交
- 因为以后 PyFlink DataStream API 中反对的 connector 品种还比拟少,举荐通过这种形式来定义 PyFlink DataStream API 作业中应用的数据源表,这样的话,所有 PyFlink Table API 中能够应用的 connector,都能够作为 PyFlink DataStream API 作业的 sink。
■ 7)总结
残缺的作业示例如下:
形式一(适宜调试):
from pyflink.common.typeinfo import Typesfrom pyflink.datastream import StreamExecutionEnvironmentdef data_stream_api_demo(): env = StreamExecutionEnvironment.get_execution_environment() env.set_parallelism(4) ds = env.from_collection( collection=[(1, 'aaa|bb'), (2, 'bb|a'), (3, 'aaa|a')], type_info=Types.ROW([Types.INT(), Types.STRING()])) def split(s): splits = s[1].split("|") for sp in splits: yield s[0], sp ds = ds.map(lambda i: (i[0] + 1, i[1])) \ .flat_map(split) \ .key_by(lambda i: i[1]) \ .reduce(lambda i, j: (i[0] + j[0], i[1])) ds.print() env.execute()if __name__ == '__main__': data_stream_api_demo()
执行后果如下:
3> (2, 'aaa')3> (2, 'bb')3> (6, 'aaa')3> (4, 'a')3> (5, 'bb')3> (7, 'a')
形式二(适宜线上作业):
from pyflink.common.typeinfo import Typesfrom pyflink.datastream import StreamExecutionEnvironmentfrom pyflink.table import StreamTableEnvironmentdef data_stream_api_demo(): env = StreamExecutionEnvironment.get_execution_environment() t_env = StreamTableEnvironment.create(stream_execution_environment=env) env.set_parallelism(4) t_env.execute_sql(""" CREATE TABLE my_source ( a INT, b VARCHAR ) WITH ( 'connector' = 'datagen', 'number-of-rows' = '10' ) """) ds = t_env.to_append_stream( t_env.from_path('my_source'), Types.ROW([Types.INT(), Types.STRING()])) def split(s): splits = s[1].split("|") for sp in splits: yield s[0], sp ds = ds.map(lambda i: (i[0] + 1, i[1])) \ .flat_map(split, Types.TUPLE([Types.INT(), Types.STRING()])) \ .key_by(lambda i: i[1]) \ .reduce(lambda i, j: (i[0] + j[0], i[1])) t_env.execute_sql(""" CREATE TABLE my_sink ( a INT, b VARCHAR ) WITH ( 'connector' = 'print' ) """) table = t_env.from_data_stream(ds) table_result = table.execute_insert("my_sink") # 1)期待作业执行完结,用于local执行,否则可能作业尚未执行完结,该脚本已退出,会导致minicluster过早退出 # 2)当作业通过detach模式往remote集群提交时,比方YARN/Standalone/K8s等,须要移除该办法 table_result.wait()if __name__ == '__main__': data_stream_api_demo()
作业提交
Flink 提供了多种作业部署形式,比方 local、standalone、YARN、K8s 等,PyFlink 也反对上述作业部署形式,请参考 Flink 官网文档 [3],理解更多详细信息。
local
阐明:应用该形式执行作业时,会启动一个 minicluster,作业会提交到minicluster 中执行,该形式适宜作业开发阶段。
示例:python3 table\_api\_demo.py
standalone
阐明:应用该形式执行作业时,作业会提交到一个远端的 standalone 集群。
示例:
./bin/flink run --jobmanager localhost:8081 --python table\_api\_demo.py
YARN Per-Job
阐明:应用该形式执行作业时,作业会提交到一个远端的 YARN 集群。
示例:
./bin/flink run --target yarn-per-job --python table\_api\_demo.py
K8s application mode
阐明:应用该形式执行作业时,作业会提交到 K8s 集群,以 application mode 的形式执行。
示例:
./bin/flink run-application \
--target kubernetes-application \--parallelism 8 \-Dkubernetes.cluster-id**=**<ClusterId> \-Dtaskmanager.memory.process.size**=**4096m \-Dkubernetes.taskmanager.cpu**=**2 \-Dtaskmanager.numberOfTaskSlots**=**4 \-Dkubernetes.container.image**=**<PyFlinkImageName> \
--pyModule table\_api\_demo \
--pyFiles file:///path/to/table_api_demo.py
参数阐明
除了下面提到的参数之外,通过 flink run 提交的时候,还有其它一些和 PyFlink 作业相干的参数。
参数名 | 用处形容 | 示例 |
---|---|---|
-py / --python | 指定作业的入口文件 | -py file:///path/to/table\_api\_demo.py |
-pym / --pyModule | 指定作业的 entry module,性能和--python相似,可用于当作业的 Python 文件为 zip 包,无奈通过--python 指定时,相比--python 来说,更通用 | -pym table\_api\_demo -pyfs file:///path/to/table\_api\_demo.py |
-pyfs / --pyFiles | 指定一个到多个 Python 文件(.py/.zip等,逗号宰割),这些 Python 文件在作业执行的时候,会放到 Python 过程的 PYTHONPATH 中,能够在 Python 自定义函数中拜访到 | -pyfs file:///path/to/table\_api\_demo.py,file:///path/to/deps.zip |
-pyarch / --pyArchives | 指定一个到多个存档文件(逗号宰割),这些存档文件,在作业执行的时候,会被解压之后,放到 Python 过程的 workspace 目录,能够通过相对路径的形式进行拜访 | -pyarch file:///path/to/venv.zip |
-pyexec / --pyExecutable | 指定作业执行的时候,Python 过程的门路 | -pyarch file:///path/to/venv.zip -pyexec venv.zip/venv/bin/python3 |
-pyreq / --pyRequirements | 指定 requirements 文件,requirements 文件中定义了作业的依赖 | -pyreq requirements.txt |
问题排查
当咱们刚刚上手 PyFlink 作业开发的时候,难免会遇到各种各样的问题,学会如何排查问题是十分重要的。接下来,咱们介绍一些常见的问题排查伎俩。
client 端异样输入
PyFlink 作业也遵循 Flink 作业的提交形式,作业首先会在 client 端编译成 JobGraph,而后提交到 Flink 集群执行。如果作业编译有问题,会导致在 client 端提交作业的时候就抛出异样,此时能够在 client 端看到相似这样的输入:
Traceback (most recent call last): File "/Users/dianfu/code/src/github/pyflink-usecases/datastream_api_demo.py", line 50, in <module> data_stream_api_demo() File "/Users/dianfu/code/src/github/pyflink-usecases/datastream_api_demo.py", line 45, in data_stream_api_demo table_result = table.execute_insert("my_") File "/Users/dianfu/venv/pyflink-usecases/lib/python3.8/site-packages/pyflink/table/table.py", line 864, in execute_insert return TableResult(self._j_table.executeInsert(table_path, overwrite)) File "/Users/dianfu/venv/pyflink-usecases/lib/python3.8/site-packages/py4j/java_gateway.py", line 1285, in __call__ return_value = get_return_value( File "/Users/dianfu/venv/pyflink-usecases/lib/python3.8/site-packages/pyflink/util/exceptions.py", line 162, in deco raise java_exceptionpyflink.util.exceptions.TableException: Sink `default_catalog`.`default_database`.`my_` does not exists at org.apache.flink.table.planner.delegation.PlannerBase.translateToRel(PlannerBase.scala:247) at org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$1.apply(PlannerBase.scala:159) at org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$1.apply(PlannerBase.scala:159) at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) at scala.collection.Iterator$class.foreach(Iterator.scala:891) at scala.collection.AbstractIterator.foreach(Iterator.scala:1334) at scala.collection.IterableLike$class.foreach(IterableLike.scala:72) at scala.collection.AbstractIterable.foreach(Iterable.scala:54) at scala.collection.TraversableLike$class.map(TraversableLike.scala:234) at scala.collection.AbstractTraversable.map(Traversable.scala:104) at org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:159) at org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:1329) at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:676) at org.apache.flink.table.api.internal.TableImpl.executeInsert(TableImpl.java:572) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:498) at org.apache.flink.api.python.shaded.py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244) at org.apache.flink.api.python.shaded.py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357) at org.apache.flink.api.python.shaded.py4j.Gateway.invoke(Gateway.java:282) at org.apache.flink.api.python.shaded.py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132) at org.apache.flink.api.python.shaded.py4j.commands.CallCommand.execute(CallCommand.java:79) at org.apache.flink.api.python.shaded.py4j.GatewayConnection.run(GatewayConnection.java:238) at java.lang.Thread.run(Thread.java:748)Process finished with exit code 1
比方上述报错阐明作业中应用的名字为"my\_"的表不存在。
TaskManager 日志文件
有些谬误直到作业运行的过程中才会产生,比方脏数据或者 Python 自定义函数的实现问题等,针对这种谬误,通常须要查看 TaskManager 的日志文件,比方以下谬误反映用户在 Python 自定义函数中拜访的 opencv 库不存在。
Caused by: java.lang.RuntimeException: Error received from SDK harness for instruction 2: Traceback (most recent call last): File "/Users/dianfu/venv/pyflink-usecases/lib/python3.8/site-packages/apache_beam/runners/worker/sdk_worker.py", line 253, in _execute response = task() File "/Users/dianfu/venv/pyflink-usecases/lib/python3.8/site-packages/apache_beam/runners/worker/sdk_worker.py", line 310, in <lambda> lambda: self.create_worker().do_instruction(request), request) File "/Users/dianfu/venv/pyflink-usecases/lib/python3.8/site-packages/apache_beam/runners/worker/sdk_worker.py", line 479, in do_instruction return getattr(self, request_type)( File "/Users/dianfu/venv/pyflink-usecases/lib/python3.8/site-packages/apache_beam/runners/worker/sdk_worker.py", line 515, in process_bundle bundle_processor.process_bundle(instruction_id)) File "/Users/dianfu/venv/pyflink-usecases/lib/python3.8/site-packages/apache_beam/runners/worker/bundle_processor.py", line 977, in process_bundle input_op_by_transform_id[element.transform_id].process_encoded( File "/Users/dianfu/venv/pyflink-usecases/lib/python3.8/site-packages/apache_beam/runners/worker/bundle_processor.py", line 218, in process_encoded self.output(decoded_value) File "apache_beam/runners/worker/operations.py", line 330, in apache_beam.runners.worker.operations.Operation.output File "apache_beam/runners/worker/operations.py", line 332, in apache_beam.runners.worker.operations.Operation.output File "apache_beam/runners/worker/operations.py", line 195, in apache_beam.runners.worker.operations.SingletonConsumerSet.receive File "pyflink/fn_execution/beam/beam_operations_fast.pyx", line 71, in pyflink.fn_execution.beam.beam_operations_fast.FunctionOperation.process File "pyflink/fn_execution/beam/beam_operations_fast.pyx", line 85, in pyflink.fn_execution.beam.beam_operations_fast.FunctionOperation.process File "pyflink/fn_execution/coder_impl_fast.pyx", line 83, in pyflink.fn_execution.coder_impl_fast.DataStreamFlatMapCoderImpl.encode_to_stream File "/Users/dianfu/code/src/github/pyflink-usecases/datastream_api_demo.py", line 26, in split import cv2ModuleNotFoundError: No module named 'cv2' at org.apache.beam.runners.fnexecution.control.FnApiControlClient$ResponseStreamObserver.onNext(FnApiControlClient.java:177) at org.apache.beam.runners.fnexecution.control.FnApiControlClient$ResponseStreamObserver.onNext(FnApiControlClient.java:157) at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.stub.ServerCalls$StreamingServerCallHandler$StreamingServerCallListener.onMessage(ServerCalls.java:251) at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.ForwardingServerCallListener.onMessage(ForwardingServerCallListener.java:33) at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.Contexts$ContextualizedServerCallListener.onMessage(Contexts.java:76) at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.internal.ServerCallImpl$ServerStreamListenerImpl.messagesAvailableInternal(ServerCallImpl.java:309) at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.internal.ServerCallImpl$ServerStreamListenerImpl.messagesAvailable(ServerCallImpl.java:292) at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.internal.ServerImpl$JumpToApplicationThreadServerStreamListener$1MessagesAvailable.runInContext(ServerImpl.java:782) at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.internal.ContextRunnable.run(ContextRunnable.java:37) at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.internal.SerializingExecutor.run(SerializingExecutor.java:123) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) ... 1 more
阐明:
- local 模式下,TaskManager 的 log 位于 PyFlink 的装置目录下:site-packages/pyflink/log/,也能够通过如下命令找到:
\>>> import pyflink
\>>> print(pyflink.\_\_path\_\_)
['/Users/dianfu/venv/pyflink-usecases/lib/python3.8/site-packages/pyflink'],则log文件位于/Users/dianfu/venv/pyflink-usecases/lib/python3.8/site-packages/pyflink/log目录下
自定义日志
有时候,异样日志的内容并不足以帮忙咱们定位问题,此时能够思考在 Python 自定义函数中打印一些日志信息。PyFlink 反对用户在 Python 自定义函数中通过 logging 的形式输入 log,比方:
def split(s): import logging logging.info("s: " + str(s)) splits = s[1].split("|") for sp in splits: yield s[0], sp
通过上述形式,split 函数的输出参数,会打印到 TaskManager 的日志文件中。
近程调试
PyFlink 作业,在运行过程中,会启动一个独立的 Python 过程执行 Python 自定义函数,所以如果须要调试 Python 自定义函数,须要通过近程调试的形式进行,能够参见[4],理解如何在 Pycharm 中进行 Python 近程调试。
1)在 Python 环境中装置 pydevd-pycharm:
pip install pydevd-pycharm~=203.7717.65
2)在 Python 自定义函数中设置近程调试参数:
def split(s): import pydevd_pycharm pydevd_pycharm.settrace('localhost', port=6789, stdoutToServer=True, stderrToServer=True) splits = s[1].split("|") for sp in splits: yield s[0], sp
3)依照 Pycharm 中近程调试的步骤,进行操作即可,能够参见[4],也能够参考博客[5]中“代码调试”局部的介绍。
阐明:Python 近程调试性能只在 Pycharm 的 professional 版才反对。
社区用户邮件列表
如果通过以上步骤之后,问题还未解决,也能够订阅 Flink 用户邮件列表 [6],将问题发送到 Flink 用户邮件列表。须要留神的是,将问题发送到邮件列表时,尽量将问题形容分明,最好有可复现的代码及数据,能够参考一下这个邮件[7]。
总结
在这篇文章中,咱们次要介绍了 PyFlink API 作业的环境筹备、作业开发、作业提交、问题排查等方面的信息,心愿能够帮忙用户应用 Python 语言疾速构建一个 Flink 作业,心愿对大家有所帮忙。接下来,咱们会持续推出 PyFlink 系列文章,帮忙 PyFlink 用户深刻理解 PyFlink 中各种性能、利用场景、最佳实际等。
为此咱们推出一个考察问卷,心愿大家积极参与这个问卷,帮忙咱们更好的去整顿 PyFlink 相干学习材料。填完问卷后即可参加抽奖,Flink 定制款 Polo 衫送送送!4月30日中午12:00准时开奖哦 ~
援用链接
[1] https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/connectors/
[2] https://repo.maven.apache.org/maven2/org/apache/flink/flink-sql-connector-kafka\_2.11/1.12.0/flink-sql-connector-kafka\_2.11-1.12.0.jar
[3] https://ci.apache.org/projects/flink/flink-docs-release-1.12/deployment/cli.html#submitting-pyflink-jobs
[4] https://www.jetbrains.com/help/pycharm/remote-debugging-with-product.html#remote-debug-config
[5] https://mp.weixin.qq.com/s?\_\_biz=MzIzMDMwNTg3MA==&mid=2247485386&idx=1&sn=da24e5200d72e0627717494c22d0372e&chksm=e8b43eebdfc3b7fdbd10b49e6749cb761b7aa5f8ddc90b34eb3170119a8bbb3ddd7327acb712&scene=178&cur\_album\_id=1386152464113811456#rd
[6] https://flink.apache.org/community.html#mailing-lists
[7] http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/PyFlink-called-already-closed-and-NullPointerException-td42997.html
流动举荐:
仅需99元即可体验阿里云基于 Apache Flink 构建的企业级产品-实时计算 Flink 版!点击下方链接理解流动详情:https://www.aliyun.com/product/bigdata/sc?utm\_content=g\_1000250506
版权申明:本文内容由阿里云实名注册用户自发奉献,版权归原作者所有,阿里云开发者社区不领有其著作权,亦不承当相应法律责任。具体规定请查看《阿里云开发者社区用户服务协定》和《阿里云开发者社区知识产权爱护指引》。如果您发现本社区中有涉嫌剽窃的内容,填写侵权投诉表单进行举报,一经查实,本社区将立即删除涉嫌侵权内容。