共计 8506 个字符,预计需要花费 22 分钟才能阅读完成。
一、背景
Flink 1.13 已于近期正式公布,超过 200 名贡献者参加了 Flink 1.13 的开发,提交了超过 1000 个 commits,实现了若干重要性能。其中,PyFlink 模块在该版本中也新增了若干重要性能,比方反对了 state、自定义 window、row-based operation 等。随着这些性能的引入,PyFlink 性能曾经日趋完善,用户能够应用 Python 语言实现绝大多数类型 Flink 作业的开发。接下来,咱们具体介绍如何在 Python DataStream API 中应用 state & timer 性能。
二、state 性能介绍
作为流计算引擎,state 是 Flink 中最外围的性能之一。
- 在 1.12 中,Python DataStream API 尚不反对 state,用户应用 Python DataStream API 只能实现一些简略的、不须要应用 state 的利用;
- 而在 1.13 中,Python DataStream API 反对了此项重要性能。
state 应用示例
如下是一个简略的示例,阐明如何在 Python DataStream API 作业中应用 state:
from pyflink.common import WatermarkStrategy, Row
from pyflink.common.typeinfo import Types
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.datastream.connectors import NumberSequenceSource
from pyflink.datastream.functions import RuntimeContext, MapFunction
from pyflink.datastream.state import ValueStateDescriptor
class MyMapFunction(MapFunction):
def open(self, runtime_context: RuntimeContext):
state_desc = ValueStateDescriptor('cnt', Types.LONG())
# 定义 value state
self.cnt_state = runtime_context.get_state(state_desc)
def map(self, value):
cnt = self.cnt_state.value()
if cnt is None:
cnt = 0
new_cnt = cnt + 1
self.cnt_state.update(new_cnt)
return value[0], new_cnt
def state_access_demo():
# 1. 创立 StreamExecutionEnvironment
env = StreamExecutionEnvironment.get_execution_environment()
# 2. 创立数据源
seq_num_source = NumberSequenceSource(1, 100)
ds = env.from_source(
source=seq_num_source,
watermark_strategy=WatermarkStrategy.for_monotonous_timestamps(),
source_name='seq_num_source',
type_info=Types.LONG())
# 3. 定义执行逻辑
ds = ds.map(lambda a: Row(a % 4, 1), output_type=Types.ROW([Types.LONG(), Types.LONG()])) \
.key_by(lambda a: a[0]) \
.map(MyMapFunction(), output_type=Types.TUPLE([Types.LONG(), Types.LONG()]))
# 4. 将打印后果数据
ds.print()
# 5. 执行作业
env.execute()
if __name__ == '__main__':
state_access_demo()
在下面的例子中,咱们定义了一个 MapFunction,该 MapFunction 中定义了一个名字为“cnt_state”的 ValueState,用于记录每一个 key 呈现的次数。
阐明:
- 除了 ValueState 之外,Python DataStream API 还反对 ListState、MapState、ReducingState,以及 AggregatingState;
- 定义 state 的 StateDescriptor 时,须要申明 state 中所存储的数据的类型(TypeInformation)。另外须要留神的是,以后 TypeInformation 字段并未被应用,默认应用 pickle 进行序列化,因而倡议将 TypeInformation 字段定义为 Types.PICKLED_BYTE_ARRAY() 类型,与理论所应用的序列化器相匹配。这样的话,当后续版本反对应用 TypeInformation 之后,能够放弃后向兼容性;
- state 除了能够在 KeyedStream 的 map 操作中应用,还能够在其它操作中应用;除此之外,还能够在连贯流中应用 state,比方:
ds1 = ... # type DataStream
ds2 = ... # type DataStream
ds1.connect(ds2) \
.key_by(key_selector1=lambda a: a[0], key_selector2=lambda a: a[0]) \
.map(MyCoMapFunction()) # 能够在 MyCoMapFunction 中应用 state
能够应用 state 的 API 列表如下:
操作 | 自定义函数 | |
---|---|---|
KeyedStream | map | MapFunction |
flat_map | FlatMapFunction | |
reduce | ReduceFunction | |
filter | FilterFunction | |
process | KeyedProcessFunction | |
ConnectedStreams | map | CoMapFunction |
flat_map | CoFlatMapFunction | |
process | KeyedCoProcessFunction | |
WindowedStream | apply | WindowFunction |
process | ProcessWindowFunction |
state 工作原理
上图是 PyFlink 中,state 工作原理的架构图。从图中咱们能够看出,Python 自定义函数运行在 Python worker 过程中,而 state backend 运行在 JVM 过程中(由 Java 算子来治理)。当 Python 自定义函数须要拜访 state 时,会通过近程调用的形式,拜访 state backend。
咱们晓得,近程调用的开销是十分大的,为了晋升 state 读写的性能,PyFlink 针对 state 读写做了以下几个方面的优化工作:
-
Lazy Read:
对于蕴含多个 entry 的 state,比方 MapState,当遍历 state 时,state 数据并不会一次性全副读取到 Python worker 中,只有当真正须要拜访时,才从 state backend 读取。
-
Async Write:
当更新 state 时,更新后的 state,会先存储在 LRU cache 中,并不会同步地更新到远端的 state backend,这样做能够防止每次 state 更新操作都拜访远端的 state backend;同时,针对同一个 key 的屡次更新操作,能够合并执行,尽量避免有效的 state 更新。
-
LRU cache:
在 Python worker 过程中保护了 state 读写的 cache。当读取某个 key 时,会先查看其是否曾经被加载到读 cache 中;当更新某个 key 时,会先将其寄存到写 cache 中。针对频繁读写的 key,LRU cache 能够防止每次读写操作,都拜访远端的 state backend,对于有热点 key 的场景,能够极大晋升 state 读写性能。
-
Flush on Checkpoint:
为了保障 checkpoint 语义的正确性,当 Java 算子须要执行 checkpoint 时,会将 Python worker 中的写 cache 都 flush 回 state backend。
其中 LRU cache 能够细分为二级,如下图所示:
阐明:
- 二级 cache 为 global cache,二级 cache 中的读 cache 中存储着以后 Python worker 过程中所有缓存的原始 state 数据(未反序列化);二级 cache 中的写 cache 中存储着以后 Python worker 过程中所有创立的 state 对象。
- 一级 cache 位于每一个 state 对象内,在 state 对象中缓存着该 state 对象曾经从远端的 state backend 读取的 state 数据以及待更新回远端的 state backend 的 state 数据。
工作流程:
- 当在 Python UDF 中,创立一个 state 对象时,首先会查看以后 key 所对应的 state 对象是否曾经存在(在二级 cache 中的“Global Write Cache”中查找),如果存在,则返回对应的 state 对象;如果不存在,则创立新的 state 对象,并存入“Global Write Cache”;
- state 读取:当在 Python UDF 中,读取 state 对象时,如果待读取的 state 数据曾经存在(一级 cache),比方对于 MapState,待读取的 map key/map value 曾经存在,则间接返回对应的 map key/map value;否则,拜访二级 cache,如果二级 cache 中也不存在待读取的 state 数据,则从远端的 state backend 读取;
- state 写入:当在 Python UDF 中,更新 state 对象时,先写到 state 对象外部的写 cache 中(一级 cache);当 state 对象中待写回 state backend 的 state 数据的大小超过指定阈值或者当遇到 checkpoint 时,将待写回的 state 数据写回远端的 state backend。
state 性能调优
通过前一节的介绍,咱们晓得 PyFlink 应用了多种优化伎俩,用于晋升 state 读写的性能,这些优化行为能够通过以下参数配置:
配置 | 阐明 |
---|---|
python.state.cache-size | Python worker 中读 cache 以及写 cache 的大小。(二级 cache)须要留神的是:读 cache、写 cache 是独立的,以后不反对别离配置读 cache 以及写 cache 的大小。 |
python.map-state.iterate-response-batch-size | 当遍历 MapState 时,每次从 state backend 读取并返回给 Python worker 的 entry 的最大个数。 |
python.map-state.read-cache-size | 一个 MapState 的读 cache 中最大容许的 entry 个数(一级 cache)。当一个 MapState 中,读 cache 中的 entry 个数超过该阈值时,会通过 LRU 策略从读 cache 中删除最近起码拜访过的 entry。 |
python.map-state.write-cache-size | 一个 MapState 的写 cache 中最大容许的待更新 entry 的个数(一级 cache)。当一个 MapState 中,写 cache 中待更新的 entry 的个数超过该阈值时,会将该 MapState 下所有待更新 state 数据写回远端的 state backend。 |
须要留神的是,state 读写的性能不仅取决于以上参数,还受其它因素的影响,比方:
-
输出数据中 key 的散布:
输出数据的 key 越扩散,读 cache 命中的概率越低,则性能越差。
-
Python UDF 中 state 读写次数:
state 读写可能波及到读写远端的 state backend,应该尽量优化 Python UDF 的实现,缩小不必要的 state 读写。
-
checkpoint interval:
为了保障 checkpoint 语义的正确性,当遇到 checkpoint 时,Python worker 会将所有缓存的待更新 state 数据,写回 state backend。如果配置的 checkpoint interval 过小,则可能并不能无效缩小 Python worker 写回 state backend 的数据量。
-
bundle size / bundle time:
以后 Python 算子会将输出数据划分成多个批次,发送给 Python worker 执行。当一个批次的数据处理完之后,会强制将 Python worker 过程中的待更新 state 写回 state backend。与 checkpoint interval 相似,该行为也可能会影响 state 写性能。批次的大小能够通过 python.fn-execution.bundle.size 和 python.fn-execution.bundle.time 参数管制。
三、timer 性能介绍
timer 应用示例
除了 state 之外,用户还能够在 Python DataStream API 中应用定时器 timer。
import datetime
from pyflink.common import Row, WatermarkStrategy
from pyflink.common.typeinfo import Types
from pyflink.common.watermark_strategy import TimestampAssigner
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.datastream.functions import KeyedProcessFunction, RuntimeContext
from pyflink.datastream.state import ValueStateDescriptor
from pyflink.table import StreamTableEnvironment
class CountWithTimeoutFunction(KeyedProcessFunction):
def __init__(self):
self.state = None
def open(self, runtime_context: RuntimeContext):
self.state = runtime_context.get_state(ValueStateDescriptor("my_state", Types.ROW([Types.STRING(), Types.LONG(), Types.LONG()])))
def process_element(self, value, ctx: 'KeyedProcessFunction.Context'):
# retrieve the current count
current = self.state.value()
if current is None:
current = Row(value.f1, 0, 0)
# update the state's count
current[1] += 1
# set the state's timestamp to the record's assigned event time timestamp
current[2] = ctx.timestamp()
# write the state back
self.state.update(current)
# schedule the next timer 60 seconds from the current event time
ctx.timer_service().register_event_time_timer(current[2] + 60000)
def on_timer(self, timestamp: int, ctx: 'KeyedProcessFunction.OnTimerContext'):
# get the state for the key that scheduled the timer
result = self.state.value()
# check if this is an outdated timer or the latest timer
if timestamp == result[2] + 60000:
# emit the state on timeout
yield result[0], result[1]
class MyTimestampAssigner(TimestampAssigner):
def __init__(self):
self.epoch = datetime.datetime.utcfromtimestamp(0)
def extract_timestamp(self, value, record_timestamp) -> int:
return int((value[0] - self.epoch).total_seconds() * 1000)
if __name__ == '__main__':
env = StreamExecutionEnvironment.get_execution_environment()
t_env = StreamTableEnvironment.create(stream_execution_environment=env)
t_env.execute_sql("""
CREATE TABLE my_source (a TIMESTAMP(3),
b VARCHAR,
c VARCHAR
) WITH (
'connector' = 'datagen',
'rows-per-second' = '10'
)
""")
stream = t_env.to_append_stream(t_env.from_path('my_source'),
Types.ROW([Types.SQL_TIMESTAMP(), Types.STRING(), Types.STRING()]))
watermarked_stream = stream.assign_timestamps_and_watermarks(WatermarkStrategy.for_monotonous_timestamps()
.with_timestamp_assigner(MyTimestampAssigner()))
# apply the process function onto a keyed stream
watermarked_stream.key_by(lambda value: value[1])\
.process(CountWithTimeoutFunction()) \
.print()
env.execute()
在上述示例中,咱们定义了一个 KeyedProcessFunction,该 KeyedProcessFunction 记录每一个 key 呈现的次数,当一个 key 超过 60 秒没有更新时,会将该 key 以及其呈现次数,发送到上游节点。
除了 event time timer 之外,用户还能够应用 processing time timer。
timer 工作原理
timer 的工作流程是这样的:
- 与 state 拜访应用独自的通信信道不同,当用户注册 timer 之后,注册音讯通过数据通道发送到 Java 算子;
- Java 算子收到 timer 注册音讯之后,首先查看待注册 timer 的触发工夫,如果曾经超过以后工夫,则间接触发;否则的话,将 timer 注册到 Java 算子的 timer service 中;
- 当 timer 触发之后,触发音讯通过数据通道发送到 Python worker,Python worker 回调用户 Python UDF 中的的 on_timer 办法。
须要留神的是: 因为 timer 注册音讯以及触发音讯通过数据通道异步地在 Java 算子以及 Python worker 之间传输,这会造成在某些场景下,timer 的触发可能没有那么及时。比方当用户注册了一个 processing time timer,当 timer 触发之后,触发音讯通过数据通道传输到 Python UDF 时,可能曾经是几秒中之后了。
四、总结
在这篇文章中,咱们次要介绍了如何在 Python DataStream API 作业中应用 state & timer,state & timer 的工作原理以及如何进行性能调优。接下来,咱们会持续推出 PyFlink 系列文章,帮忙 PyFlink 用户深刻理解 PyFlink 中各种性能、利用场景以及最佳实际等。
另外,阿里云实时计算生态团队长期招聘优良大数据人才(包含实习 + 社招),咱们的工作包含:
- 实时机器学习:反对机器学习场景下实时特色工程和 AI 引擎配合,基于 Apache Flink 及其生态打造实时机器学习的规范,推动例如搜寻、举荐、广告、风控等场景的全面实时化;
- 大数据 + AI 一体化:包含编程语言一体化 (PyFlink 相干工作),执行引擎集成化 (TF on Flink),工作流及治理一体化(Flink AI Flow)。