乐趣区

关于flink:字节跳动-Flink-状态查询实践与优化

摘要:本文整顿自字节跳动基础架构工程师,Apache Flink Contributor 马越在 Flink Forward Asia 2021 平台建设专场的演讲。次要内容包含:

  1. 背景
  2. State Processor API 介绍
  3. StateMeta Snapshot 机制
  4. State as Database
  5. 应用 Flink Batch SQL 查问工作状态
  6. 将来瞻望

点击查看直播回放 & 演讲 PDF

一、背景

家喻户晓,Flink 中的 State 保留了算子计算过程的两头后果。当工作出现异常时,能够通过查问工作快照中的 State 获取无效线索。

但目前对于 Flink SQL 工作来说,当咱们想要查问作业 State 时,通常会因为无奈获知 State 的定义形式和具体类型等信息,而导致查问 State 的老本过高。

为了解决这个问题,字节跳动流式计算团队在外部提出了 State Query on Flink SQL 的解决方案——用户通过写 SQL 的形式就能够简略地查问 State。本文将次要介绍字节跳动在 Flink 状态查问这方面所进行的相干工作。

二、State Processor API 介绍

提到状态查问,咱们天然会联想到 Flink 在 1.9 版本提出的个性 — State Processor API。应用 State Processor API,咱们能够将作业产生的 Savepoint 转换成 DataSet,而后应用 DataSet API 实现对 State 的查问、批改和初始化等操作。

上面简略介绍一下 如何应用 State Processor API 来实现 State 的查问

  • 首先创立 ExistingSavepoint 用来示意一个 Savepoint。初始化 ExistingSavepoint 时须要提供 Savepoint 门路和 StateBackend 等信息;
  • 而后实现 ReaderFunction 用于从新注册所须要查问的 State 以及定义解决 State 的形式。查问状态的过程中会遍历所有的 Key 并依照咱们定义的形式去操作 State;
  • 最初,调用 Savepoint.readKeyedState 并传入算子的 uid 和 ReaderFunction,就能够实现 State 的查问。

接下来为大家简述一下 State 查问背地的原理

在 Savepoint 目录中蕴含两种文件,一种是状态数据文件,比方上图中的 opA-1-state,这个文件外面保留着算子 A 在第一个 SubTask 状态的明细数据;还有一种元数据文件,对应上图中的 _metadata,元数据文件中保留了每个算子和状态文件的映射关系。

当咱们在进行状态查问的时候。首先在 Client 端会依据 Savepoint 门路去解析 metadata 文件。通过算子 ID,能够获取须要查问的状态所对应的文件的句柄。当状态查问真正执行时,负责读取状态的 Task 会创立一个新的 StateBackend,而后将状态文件中的数据恢复到 Statebackend 中。等到状态复原实现之后就会遍历全副的 Key 并把对应的状态交给 ReaderFunction 解决。

有些同学可能会问,既然社区曾经提供了查问 State 的性能,咱们为什么还要去做同样的工作呢?次要是因为 咱们在应用 State Processor API 的过程中发现一些问题

  1. 每次查问 State 咱们都须要独立开发一个 Flink Batch 工作,对用户来说具备肯定的开发成本;
  1. 实现 ReaderFunction 的时候须要比拟清晰地理解工作状态的定义形式,包含 State 的名称、类型以及 State Descriptor 等信息,对用户来说应用门槛高较高;
  1. 应用 State Processor API 时,只能查问单个算子状态,无奈同时查问多个算子的状态;
  1. 无奈间接查问工作状态的元信息,比方查问工作应用了哪些状态,或者查问某个状态的类型。

总体来说,咱们的指标有两个,一是 升高用户的应用老本 ;二是 加强状态查问的性能。咱们心愿用户在查问 State 时能用最简略的形式;同时也不须要晓得任何信息。

此外,咱们还心愿用户能同时查问多个算子的 State,也能够间接查问作业应用了哪些 State,每个 State 的类型是什么。

因而,咱们提出了 State Query on Flink SQL 的解决方案 。简略来说是 把 State 当成数据库一样,让用户通过写 SQL 的形式就能够很简略地查问 State。

在这个计划中,咱们须要解决两个问题

  • 如何对用户屏蔽 State 的信息:参考 State Processor API 咱们能够晓得,查问 State 须要提供十分多的信息,比方 Savepoint 门路、StateBacked 类型、算子 id、State Descriptor 等等。通过 SQL 语句显然难以残缺地表述这些简单的信息,那么查问状态到底须要哪些内容,咱们又如何对用户屏蔽 State 里简单的细节呢?这是咱们面对的第一个难点。
  • 如何用 SQL 表白 State:State 在 Flink 中的存储形式并不像 Database 一样,咱们如何去用 SQL 来表白状态的查问过程呢?这是咱们要解决的另一个难点。

三、StateMeta Snapshot 机制

首先咱们来答复第一个问题,查问一个 State 须要哪些信息呢

能够参考上文中 State Processor API 的示例,当咱们创立 ExistingSavepoint 和 ReaderFunction 的时候,咱们须要提供的信息有 Savepoint 门路、Backend 类型、OperatorID、算子 key 的类型、State 名称以及 Serializer 等等,咱们能够将这些对立称为状态的元信息。

对于 Flink SQL 工作来说,要分明地理解这些信息,对用户来说门槛是十分高的。咱们的想法是让用户只须要提供最简略的信息,即 Savepoint ID,而后由 Flink 框架把其余的元信息都存在 Savepoint 中,这样就能够对用户屏蔽 State 那些简单的细节,实现状态的查问。因而,咱们引入了 StateMeta Snapshot 机制。

StateMeta Snapshot 简略来说就是把状态的元信息增加到 Savepoint Metadata 的过程,具体步骤如下:

  1. 首先在 State 注册的时候,Task 会把 operatorName\ID\KeySerializer\StateDescriptors 等元信息都保留在 Task 的内存中;
  1. 触发 Savepoint 时,Task 会在制作快照的同时,对状态的元信息也同样进行快照。快照实现之后将状态的元信息 (StateMeta) 和状态文件的句柄 (StateHandle) 一起上报给 JobManager;
  1. JobManager 在收到所有 Task 上报的 StateMeta 信息之后,将这些状态元信息进行合并,最初会把合并之后的状态元信息保留到 Savepoint 目录里名为 stateInfo 的文件中。

之后在状态查问时就只需解析 Savepoint 中的 stateInfo 文件,而不再须要用户通过代码去输出这些 State 的元信息。通过这样的形式能够很大水平地升高用户查问状态的老本。

四、State as Database

接下来咱们来答复第二个问题,咱们如何用 SQL 来表白 State。其实社区在设计 State Processor API 的时候就提出了一些解决思路,也就是 State As Database。

在传统的数据库中,通常用 Catalog、Database、Table 这个三个元素来示意一个 Table,其实咱们也能够将用样的逻辑到映射到 Flink State 上。咱们能够把 Flink 的 State 当作一种非凡的数据源,作业每次产生的 Savepoint 都当作一个独立 DB。在这个 DB 中,咱们将 State 元信息、State 的明细数据,都形象成不同的 Table 裸露给用户,用户间接查问这些 Table 就能够获取工作的状态信息。

首先咱们来看如何把 State 示意为 Table。咱们都晓得 在 Flink 中,罕用的 State 有两种类型,别离是 KeyedState 和 OperatorState

  • 对于 OperatorState 来说,它只有 Value 这一个属性,用来示意这个 State 具体的值。因而咱们能够把 OperatorState 示意为只蕴含一个 Value 字段的表构造。
  • 对于 KeyedState 来说,每个 State 在不同的 Key 和 Namespace 下的值可能都不一样,因而咱们能够将 KeyedState 示意为一个蕴含 Key、Namespace、Value 这三个字段的表构造。

当咱们形象出了单个 State 之后,想要示意多个 State 就比拟容易了。能够看到在上图的例子中,这个算子蕴含 3 个 State,别离是两个 KeyedState 和一个 OperatorState,咱们只须要将这些 Table 简略的 union 起来,再通过 state_name 字段去辨别不同的 State,就能够示意这个算子中所有的 State。

最初还有一个问题,咱们 如何晓得一个工作到底用了哪些 State 或者这些 State 的具体类型呢

为了解决这个问题,咱们定义了一种非凡表 — StateMeta,用来示意一个 Flink 工作中所有 State 的元信息。StateMeta 中蕴含一个工作中每个 State 的名称、State 所在的算子 ID、算子名称、Key 的类型和 Value 的类型等等,这样用户间接查问 StateMeta 这个表就能获取工作中所有状态的元信息。

五、应用 Flink Batch SQL 查问工作状态

以上就是状态查问计划的整体介绍。那咱们到底 如何去查问一个 State 呢,咱们 以一个 Word Count 工作为例来阐明

首先,咱们须要创立一个 Flink SQL 工作并启动。通过 web-ui 能够看到这个工作中蕴含三个算子,别离是 Source,Aggregate 还有 Sink。而后,咱们能够触发 Savepoint,当 Savepoint 制作胜利之后获取对应的 SavepointID。咱们能够通过 SavepointID 去实现作业状态的查问。

如果咱们当初对 Flink SQL 工作中状态的应用无所不知,那么首先咱们须要查问的就是这个 Flink 工作中蕴含哪些 State 以及这些 State 的类型。咱们能够从 StateMeta 表获取这些信息。如上图中场景一所示,通过查问 StateMeta 表,能够看到这个工作蕴含一个 ListState 和一个 ValueState,别离存在于 Source 算子和 Aggregate 算子中。

此外,有些对 Flink 比拟理解的同学晓得,KafkaSource 中的 State 是用于记录以后生产的 Offset 信息。如场景二所示,咱们能够通过查问 Source 算子的状态,获取到工作中生产 Kafka Topic 的 Partition 和 Offset 信息。

还有一种比拟常见的场景,比方上游的业务同学发现某个 key(比方 key_662)的后果异样。咱们在定位问题的时候能够间接去查问作业中 aggregate 算子中的状态,同时去指定 key 等于 key_662 作为查问条件。如上图场景三所示,通过查问的后果能够看到,当 key 为 662 时对应的聚合后果是 11290。用户应用这样的形式就能够比拟不便地验证状态是否正确。

六、将来瞻望

将来,咱们打算 进一步丰盛 State 的性能,目前咱们反对了应用 SQL 查问 State 的性能,其实社区还提供了 State 批改和初始化的能力。在一些场景下,这些能力也比拟重要。比方,咱们已知状态中的局部 key 计算错误,心愿将状态中这部分的数据进行修改;或者工作逻辑产生变更当前和之前的状态不能齐全兼容,这个时候咱们心愿能够通过状态批改和初始化的能力去生成一个新的 Savepoint。同样,在应用形式上咱们也心愿用户能间接应用 SQL 中 insert 和 update 语法来实现状态的批改和初始化操作。

其次,咱们会 进一步增强 State 的可用性。咱们应用 DAG 编辑的计划解决了作业拓扑发生变化时产生的状态不兼容问题,然而当 Flink SQL 工作批改字段时 State Serializer 可能会变动,同样导致状态无奈兼容。针对这种状况咱们设计了残缺的 Flink SQL State Schema Evolution 计划,能够极大的加强 Flink SQL 工作发生变化之后状态的恢复能力,目前计划正在落地中。此外,咱们还提供了欠缺的状态复原事先查看能力,可能做到在工作上线之前就查看出状态是否兼容并告知用户,防止状态不兼容引起的作业启动失败对线上造成影响。

点击查看直播回放 & 演讲 PDF


2022 第四届 实时计算 FLINK 挑战赛

49 万奖金等你来拿!

连续“激励师打算”,赢取丰富礼品!

点击进入赛事官网理解报名参赛

更多 Flink 相干技术问题,可扫码退出社区钉钉交换群
第一工夫获取最新技术文章和社区动静,请关注公众号~

流动举荐

阿里云基于 Apache Flink 构建的企业级产品 - 实时计算 Flink 版现开启流动:
99 元试用 实时计算 Flink 版(包年包月、10CU)即有机会取得 Flink 独家定制卫衣;另包 3 个月及以上还有 85 折优惠!
理解流动详情:https://www.aliyun.com/produc…

退出移动版