关于云计算:Flink处理函数实战之一深入了解ProcessFunction的状态Flink110

3次阅读

共计 3809 个字符,预计需要花费 10 分钟才能阅读完成。

欢送拜访我的 GitHub

https://github.com/zq2599/blog_demos

内容:所有原创文章分类汇总及配套源码,波及 Java、Docker、Kubernetes、DevOPS 等;

欢送拜访我的 GitHub

这里分类和汇总了欣宸的全副原创(含配套源码):https://github.com/zq2599/blog_demos

Flink 处理函数实战系列链接

  1. 深刻理解 ProcessFunction 的状态操作(Flink-1.10);
  2. ProcessFunction;
  3. KeyedProcessFunction 类;
  4. ProcessAllWindowFunction(窗口解决);
  5. CoProcessFunction(双流解决);

对于 ProcessFunction 状态的纳闷

学习 Flink 的 ProcessFunction 过程中,官网文档中波及状态解决的时候,不止一次提到只实用于 keyed stream 的元素,如下图红框所示:

之前写过一些 flink 利用,keyed stream 罕用但不是必须用的,所以产生了疑难:

  1. 为何只有 <font color=”blue”>keyed stream</font> 的元素能读写状态?
  2. 每个 key 对应的状态是如何操作的?

Flink 的 ” 状态 ”

先去回顾 Flink” 状态 ” 的知识点:

  1. 官网文档说就两种状态:keyed state 和 operator state:

  1. 如上图,keyed stream 的元素是具备 key 的特色,与 ProcessFunction 的操作状态时要求匹配,其余 steam 的元素因为没有 key 的特色,所以也就没有 <font color=”blue”> 状态 </font> 一说了;
  2. 另一种状态是 <font color=”blue”>Operator State</font>,如下图,这是和多并行度计算时的算子实例绑定的,例如以后算子生产 kafka 的某个分区的最新 offset,而 ProcessFunction 是用来解决 stream 元素的,不会波及到 Operator State:

官网 demo

为了学习 ProcessFunction 就去看官网 demo,地址是:https://ci.apache.org/project…,简略说说这个 demo 的性能:

  1. 数据源在不间断的产生单词,每个单词对应一个 Tuple2<String,String> 的实例;
  2. 数据源被 <font color=”blue”>keyBy</font> 办法转成 KeyedStream,key 是 Tuple2 实例的 f0 字段;
  3. 一个 KeyedProcessFunction 的子类 <font color=”blue”>CountWithTimeoutFunction</font>,被用来解决 KeyedStream 的每个元素,解决的逻辑:为每个 key 保护一个状态,状态的内容是这个 key 的呈现次数和最初一次呈现工夫;
  4. 如果那个 key 间断一分钟没有呈现,KeyedProcessFunction 就向上游发送这个元素;

以上就是官网 demo 的性能,原本是想通过 demo 来加深意识,后果看完岂但没有明确,反而更晕了,下图是我对 demo 代码的纳闷:

从上图可见我的纳闷,这里再复述一下:

  1. 入参 value 是 Tuple2 类型,假如其 f0 字段等于 aaa,那么 processElement 办法的作用,就是取出 aaa 的状态,更新后保留;
  2. 从代码上看,state.value()返回了 aaa 的状态,这个 value 办法并没有将 aaa 作为入参,那怎么做到返回 aaa 的状态呢?如果下一个入参 value 的 f0 字段等于 bbb 了,这个 state.value()能返回 bbb 的状态吗?
  3. 对更新状态的代码 state.update(current)也是同样的纳闷;
  4. 而后又产生了新的纳闷:成员变量 state 难道是始终在变?每执行一次 processElement,都会变成该 key 对应的 state 实例?

先反思为何会有上述纳闷

  1. 上述纳闷产生的起因,应该是受到平时应用 HashMap 的影响,HashMap 获取值就是在调用 get 办法时指定 key,设置值也是在 put 时指定 key,所以看到 state.value()办法没有用 key 做入参就不习惯了
  2. 要打消这种不适应,要做的第一件事就是揭示本人:processElement 是在框架内运行的,很多数据在之前曾经由框架筹备好了;
  3. 接下来要做的,就是把 <font color=”blue”> 框架筹备数据 </font> 的逻辑看一遍,除了弄明确本人的问题,因为 ProcessFunction 属于最低阶形象(如下图的最下方地位),看懂了这些,其实也是在理解 DataStream/DataSet API 的设计思路:

跟踪源码

  1. 如下图,让咱们从一个断点的堆栈开始吧,这是在执行下面 demo 中 <font color=”blue”> 的 processElement</font> 办法之前的一个断点,可见本源是个线程的 run 办法,也就是 KeyedProcessFunction 对应的算子执行工作的线程:

  1. 下面的堆栈不用每一层都细看,只关注重要的局部,下图这段很重要:StreamTask.run 办法中,有个有限循环(猜想是每次执行 processInput 办法都解决 KeyedStream 的一个元素):

  1. 如下图,StreamOneInputProcessor.processInput 办法取出 KeyedStream 的一个元素,调用 processElement 办法,并将此元素作为入参,再联合上一幅图能够看出:在编写 <font color=”red”>KeyedProcessFunction 子类的时候,KeyedStream 的每个元素都会作为入参,在调用你重写的 processElement 办法时传进去;</font> 这一点,在做 ProcessFunction 和 KeyedProcessFunction 开发时都是要分外留神的:

  1. 接下来到了最要害的中央了,下图红框中的 streamOperator.setKeyContextElement1(record)会解答我后面的纳闷,肯定要进去看个分明,(前面的黄线上的代码,您应该猜到了,外面其实就是调用 demo 中的 processElement 办法)

  1. 下图中,AbstractStreamOperator.setKeyContextElement 给出了答案:<font color=”blue”> 对于 KeyedStream 的每个元素,都会在这里算出 key,再调用 setCurrentKey 保留这个 key</font>:

  1. 开展 <font color=”blue”>setCurrentKey</font>,如下图,发现 key 的保留和以后状态的存储策略 (StateBackend) 无关,我这里是默认策略 <font color=”blue”>HeapKeyedStateBackend</font>:

  1. 最终,依据以后元素失去的 key 会在 StateBackend 的 keyContext 对象中找中央保留,StateBackend 的具体实现和 Flink 设置无关,我这里是保留到了 InternalKeyContextImpl 实例的 currentKey 变量中:

  1. 代码读到这里,对我后面的纳闷,您应该能揣测出答案了:state.value()外面会通过 StateBackend 的 keyContext 取出方才保留的 key,接下来就能像 HashMap 那样依据 key 查出该 key 的状态了,接下来是欢快的印证咱们揣测的过程;
  2. 在 <font color=”blue”>state.value()</font> 代码地位打断点一次看个明确,如下图,果然,state 外面有 StateBackend 的 keyContext 对象的援用,拜访方才保留的 key 就不成问题了:

  1. 开展 state.value()办法如下,简单明了,间接拿 keyContext 保留的 key 作为入参去取对应的状态:

  1. 再开展下面的 get 办法,可见最终是从 stateMap 中获得的,而这个 stateMap 的具体实现是 CopyOnWriteStateMap 类型的实例:

  1. 代码读到这里,只剩最初一处须要印证了:更新状态的 state.update(current)办法,应该也是以 StateBackend 的 keyContext 中的 key 作为本人的 key,再将入参的 current 作为 value,更新到 stateMap 中,来吧,一起印证这个揣测;
  2. 开展办法,看到的是 stateTable.put 办法(后面刚看过 stateTable 的 get 办法,稳了):

  1. stateTable.put 办法外面和后面的 get 办法一样,间接拿 keyContext 保留的 key 作为本人的 key:

  1. 最终是调用了 stateMap.put 办法,将数据保留在 CopyOnWriteStateMap 实例中:

  1. 得益于 Flink 代码本身标准、清晰的设计和实现,再加上 IDEA 弱小的 debug 性能,整个浏览和剖析过程非常顺利,这其中的播种会逐步在今后深刻学习 DataStreamAPI 的过程中奏效;

最初,依据下面的剖析过程绘制了一幅简陋的流程图,心愿能帮忙您放慢了解:

欢送关注公众号:程序员欣宸

微信搜寻「程序员欣宸」,我是欣宸,期待与您一起畅游 Java 世界 …
https://github.com/zq2599/blog_demos

正文完
 0