乐趣区

关于python:PyFlink-教程PyFlink-DataStream-API-state-timer

简介: 介绍如何在 Python DataStream API 中应用 state & timer 性能。

一、背景

Flink 1.13 已于近期正式公布,超过 200 名贡献者参加了 Flink 1.13 的开发,提交了超过 1000 个 commits,实现了若干重要性能。其中,PyFlink 模块在该版本中也新增了若干重要性能,比方反对了 state、自定义 window、row-based operation 等。随着这些性能的引入,PyFlink 性能曾经日趋完善,用户能够应用 Python 语言实现绝大多数类型 Flink 作业的开发。接下来,咱们具体介绍如何在 Python DataStream API 中应用 state & timer 性能。

二、state 性能介绍

作为流计算引擎,state 是 Flink 中最外围的性能之一。

  • 在 1.12 中,Python DataStream API 尚不反对 state,用户应用 Python DataStream API 只能实现一些简略的、不须要应用 state 的利用;
  • 而在 1.13 中,Python DataStream API 反对了此项重要性能。

state 应用示例

如下是一个简略的示例,阐明如何在 Python DataStream API 作业中应用 state:

from pyflink.common import WatermarkStrategy, Row
from pyflink.common.typeinfo import Types
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.datastream.connectors import NumberSequenceSource
from pyflink.datastream.functions import RuntimeContext, MapFunction
from pyflink.datastream.state import ValueStateDescriptor


class MyMapFunction(MapFunction):

    def open(self, runtime_context: RuntimeContext):
        state_desc = ValueStateDescriptor('cnt', Types.LONG())
        # 定义 value state
        self.cnt_state = runtime_context.get_state(state_desc)

    def map(self, value):
        cnt = self.cnt_state.value()
        if cnt is None:
            cnt = 0

        new_cnt = cnt + 1
        self.cnt_state.update(new_cnt)
        return value[0], new_cnt


def state_access_demo():
    # 1. 创立 StreamExecutionEnvironment
    env = StreamExecutionEnvironment.get_execution_environment()

    # 2. 创立数据源
    seq_num_source = NumberSequenceSource(1, 100)
    ds = env.from_source(
        source=seq_num_source,
        watermark_strategy=WatermarkStrategy.for_monotonous_timestamps(),
        source_name='seq_num_source',
        type_info=Types.LONG())

    # 3. 定义执行逻辑
    ds = ds.map(lambda a: Row(a % 4, 1), output_type=Types.ROW([Types.LONG(), Types.LONG()])) \
           .key_by(lambda a: a[0]) \
           .map(MyMapFunction(), output_type=Types.TUPLE([Types.LONG(), Types.LONG()]))

    # 4. 将打印后果数据
    ds.print()

    # 5. 执行作业
    env.execute()


if __name__ == '__main__':
    state_access_demo()

在下面的例子中,咱们定义了一个 MapFunction,该 MapFunction 中定义了一个名字为“cnt\_state”的 ValueState,用于记录每一个 key 呈现的次数。

阐明:

  • 除了 ValueState 之外,Python DataStream API 还反对 ListState、MapState、ReducingState,以及 AggregatingState;
  • 定义 state 的 StateDescriptor 时,须要申明 state 中所存储的数据的类型(TypeInformation)。另外须要留神的是,以后 TypeInformation 字段并未被应用,默认应用 pickle 进行序列化,因而倡议将 TypeInformation 字段定义为 Types.PICKLED\_BYTE\_ARRAY() 类型,与理论所应用的序列化器相匹配。这样的话,当后续版本反对应用 TypeInformation 之后,能够放弃后向兼容性;
  • state 除了能够在 KeyedStream 的 map 操作中应用,还能够在其它操作中应用;除此之外,还能够在连贯流中应用 state,比方:
ds1 = ...  # type DataStream
ds2 = ...  # type DataStream
ds1.connect(ds2) \
    .key_by(key_selector1=lambda a: a[0], key_selector2=lambda a: a[0]) \
    .map(MyCoMapFunction())  # 能够在 MyCoMapFunction 中应用 state

能够应用 state 的 API 列表如下:

操作 自定义函数
KeyedStream map MapFunction
flat\_map FlatMapFunction
reduce ReduceFunction
filter FilterFunction
process KeyedProcessFunction
ConnectedStreams map CoMapFunction
flat\_map CoFlatMapFunction
process KeyedCoProcessFunction
WindowedStream apply WindowFunction
process ProcessWindowFunction

state 工作原理

上图是 PyFlink 中,state 工作原理的架构图。从图中咱们能够看出,Python 自定义函数运行在 Python worker 过程中,而 state backend 运行在 JVM 过程中(由 Java 算子来治理)。当 Python 自定义函数须要拜访 state 时,会通过近程调用的形式,拜访 state backend。

咱们晓得,近程调用的开销是十分大的,为了晋升 state 读写的性能,PyFlink 针对 state 读写做了以下几个方面的优化工作:

  • Lazy Read:

    对于蕴含多个 entry 的 state,比方 MapState,当遍历 state 时,state 数据并不会一次性全副读取到 Python worker 中,只有当真正须要拜访时,才从 state backend 读取。

  • Async Write:

    当更新 state 时,更新后的 state,会先存储在 LRU cache 中,并不会同步地更新到远端的 state backend,这样做能够防止每次 state 更新操作都拜访远端的 state backend;同时,针对同一个 key 的屡次更新操作,能够合并执行,尽量避免有效的 state 更新。

  • LRU cache:

    在 Python worker 过程中保护了 state 读写的 cache。当读取某个 key 时,会先查看其是否曾经被加载到读 cache 中;当更新某个 key 时,会先将其寄存到写 cache 中。针对频繁读写的 key,LRU cache 能够防止每次读写操作,都拜访远端的 state backend,对于有热点 key 的场景,能够极大晋升 state 读写性能。

  • Flush on Checkpoint:

    为了保障 checkpoint 语义的正确性,当 Java 算子须要执行 checkpoint 时,会将 Python worker 中的写 cache 都 flush 回 state backend。

其中 LRU cache 能够细分为二级,如下图所示:

阐明:

  • 二级 cache 为 global cache,二级 cache 中的读 cache 中存储着以后 Python worker 过程中所有缓存的原始 state 数据(未反序列化);二级 cache 中的写 cache 中存储着以后 Python worker 过程中所有创立的 state 对象。
  • 一级 cache 位于每一个 state 对象内,在 state 对象中缓存着该 state 对象曾经从远端的 state backend 读取的 state 数据以及待更新回远端的 state backend 的 state 数据。

工作流程:

  • 当在 Python UDF 中,创立一个 state 对象时,首先会查看以后 key 所对应的 state 对象是否曾经存在(在二级 cache 中的“Global Write Cache”中查找),如果存在,则返回对应的 state 对象;如果不存在,则创立新的 state 对象,并存入“Global Write Cache”;
  • state 读取:当在 Python UDF 中,读取 state 对象时,如果待读取的 state 数据曾经存在(一级 cache),比方对于 MapState,待读取的 map key/map value 曾经存在,则间接返回对应的 map key/map value;否则,拜访二级 cache,如果二级 cache 中也不存在待读取的 state 数据,则从远端的 state backend 读取;
  • state 写入:当在 Python UDF 中,更新 state 对象时,先写到 state 对象外部的写 cache 中(一级 cache);当 state 对象中待写回 state backend 的 state 数据的大小超过指定阈值或者当遇到 checkpoint 时,将待写回的 state 数据写回远端的 state backend。

state 性能调优

通过前一节的介绍,咱们晓得 PyFlink 应用了多种优化伎俩,用于晋升 state 读写的性能,这些优化行为能够通过以下参数配置:

配置 阐明
python.state.cache-size Python worker 中读 cache 以及写 cache 的大小。(二级 cache)须要留神的是:读 cache、写 cache 是独立的,以后不反对别离配置读 cache 以及写 cache 的大小。
python.map-state.iterate-response-batch-size 当遍历 MapState 时,每次从 state backend 读取并返回给 Python worker 的 entry 的最大个数。
python.map-state.read-cache-size 一个 MapState 的读 cache 中最大容许的 entry 个数(一级 cache)。当一个 MapState 中,读 cache 中的 entry 个数超过该阈值时,会通过 LRU 策略从读 cache 中删除最近起码拜访过的 entry。
python.map-state.write-cache-size 一个 MapState 的写 cache 中最大容许的待更新 entry 的个数(一级 cache)。当一个 MapState 中,写 cache 中待更新的 entry 的个数超过该阈值时,会将该 MapState 下所有待更新 state 数据写回远端的 state backend。

须要留神的是,state 读写的性能不仅取决于以上参数,还受其它因素的影响,比方:

  • 输出数据中 key 的散布:

    输出数据的 key 越扩散,读 cache 命中的概率越低,则性能越差。

  • Python UDF 中 state 读写次数:

    state 读写可能波及到读写远端的 state backend,应该尽量优化 Python UDF 的实现,缩小不必要的 state 读写。

  • checkpoint interval:

    为了保障 checkpoint 语义的正确性,当遇到 checkpoint 时,Python worker 会将所有缓存的待更新 state 数据,写回 state backend。如果配置的 checkpoint interval 过小,则可能并不能无效缩小 Python worker 写回 state backend 的数据量。

  • bundle size / bundle time:

    以后 Python 算子会将输出数据划分成多个批次,发送给 Python worker 执行。当一个批次的数据处理完之后,会强制将 Python worker 过程中的待更新 state 写回 state backend。与 checkpoint interval 相似,该行为也可能会影响 state 写性能。批次的大小能够通过 python.fn-execution.bundle.size 和 python.fn-execution.bundle.time 参数管制。

三、timer 性能介绍

timer 应用示例

除了 state 之外,用户还能够在 Python DataStream API 中应用定时器 timer。

import datetime

from pyflink.common import Row, WatermarkStrategy
from pyflink.common.typeinfo import Types
from pyflink.common.watermark_strategy import TimestampAssigner
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.datastream.functions import KeyedProcessFunction, RuntimeContext
from pyflink.datastream.state import ValueStateDescriptor
from pyflink.table import StreamTableEnvironment


class CountWithTimeoutFunction(KeyedProcessFunction):

    def __init__(self):
        self.state = None

    def open(self, runtime_context: RuntimeContext):
        self.state = runtime_context.get_state(ValueStateDescriptor("my_state", Types.ROW([Types.STRING(), Types.LONG(), Types.LONG()])))

    def process_element(self, value, ctx: 'KeyedProcessFunction.Context'):
        # retrieve the current count
        current = self.state.value()
        if current is None:
            current = Row(value.f1, 0, 0)

        # update the state's count
        current[1] += 1

        # set the state's timestamp to the record's assigned event time timestamp
        current[2] = ctx.timestamp()

        # write the state back
        self.state.update(current)

        # schedule the next timer 60 seconds from the current event time
        ctx.timer_service().register_event_time_timer(current[2] + 60000)

    def on_timer(self, timestamp: int, ctx: 'KeyedProcessFunction.OnTimerContext'):
        # get the state for the key that scheduled the timer
        result = self.state.value()

        # check if this is an outdated timer or the latest timer
        if timestamp == result[2] + 60000:
            # emit the state on timeout
            yield result[0], result[1]


class MyTimestampAssigner(TimestampAssigner):

    def __init__(self):
        self.epoch = datetime.datetime.utcfromtimestamp(0)

    def extract_timestamp(self, value, record_timestamp) -> int:
        return int((value[0] - self.epoch).total_seconds() * 1000)


if __name__ == '__main__':
    env = StreamExecutionEnvironment.get_execution_environment()
    t_env = StreamTableEnvironment.create(stream_execution_environment=env)

    t_env.execute_sql("""
            CREATE TABLE my_source (a TIMESTAMP(3),
              b VARCHAR,
              c VARCHAR
            ) WITH (
              'connector' = 'datagen',
              'rows-per-second' = '10'
            )
        """)

    stream = t_env.to_append_stream(t_env.from_path('my_source'),
        Types.ROW([Types.SQL_TIMESTAMP(), Types.STRING(), Types.STRING()]))
    watermarked_stream = stream.assign_timestamps_and_watermarks(WatermarkStrategy.for_monotonous_timestamps()
                         .with_timestamp_assigner(MyTimestampAssigner()))

    # apply the process function onto a keyed stream
    watermarked_stream.key_by(lambda value: value[1])\
        .process(CountWithTimeoutFunction()) \
        .print()

    env.execute()

在上述示例中,咱们定义了一个 KeyedProcessFunction,该 KeyedProcessFunction 记录每一个 key 呈现的次数,当一个 key 超过 60 秒没有更新时,会将该 key 以及其呈现次数,发送到上游节点。

除了 event time timer 之外,用户还能够应用 processing time timer。

timer 工作原理

timer 的工作流程是这样的:

  • 与 state 拜访应用独自的通信信道不同,当用户注册 timer 之后,注册音讯通过数据通道发送到 Java 算子;
  • Java 算子收到 timer 注册音讯之后,首先查看待注册 timer 的触发工夫,如果曾经超过以后工夫,则间接触发;否则的话,将 timer 注册到 Java 算子的 timer service 中;
  • 当 timer 触发之后,触发音讯通过数据通道发送到 Python worker,Python worker 回调用户 Python UDF 中的的 on\_timer 办法。

须要留神的是: 因为 timer 注册音讯以及触发音讯通过数据通道异步地在 Java 算子以及 Python worker 之间传输,这会造成在某些场景下,timer 的触发可能没有那么及时。比方当用户注册了一个 processing time timer,当 timer 触发之后,触发音讯通过数据通道传输到 Python UDF 时,可能曾经是几秒中之后了。

四、总结

在这篇文章中,咱们次要介绍了如何在 Python DataStream API 作业中应用 state & timer,state & timer 的工作原理以及如何进行性能调优。接下来,咱们会持续推出 PyFlink 系列文章,帮忙 PyFlink 用户深刻理解 PyFlink 中各种性能、利用场景以及最佳实际等。

另外,阿里云实时计算生态团队长期招聘优良大数据人才(包含实习 + 社招),咱们的工作包含:

  • 实时机器学习:反对机器学习场景下实时特色工程和 AI 引擎配合,基于 Apache Flink 及其生态打造实时机器学习的规范,推动例如搜寻、举荐、广告、风控等场景的全面实时化;
  • 大数据 + AI 一体化:包含编程语言一体化 (PyFlink 相干工作),执行引擎集成化 (TF on Flink),工作流及治理一体化(Flink AI Flow)。

如果你对开源、大数据或者 AI 感兴趣,请发简历到:fudian.fd@alibaba-inc.com

此外,也欢送大家退出“PyFlink 交换群”,交换 PyFlink 相干的问题。

版权申明: 本文内容由阿里云实名注册用户自发奉献,版权归原作者所有,阿里云开发者社区不领有其著作权,亦不承当相应法律责任。具体规定请查看《阿里云开发者社区用户服务协定》和《阿里云开发者社区知识产权爱护指引》。如果您发现本社区中有涉嫌剽窃的内容,填写侵权投诉表单进行举报,一经查实,本社区将立即删除涉嫌侵权内容。

退出移动版