乐趣区

关于flink:Flink中基于State的有状态计算开发方法

作者:王东阳

前言
状态在 Flink 中叫作 State,用来保留两头计算结果或者缓存数据。依据是否须要保留两头后果,分为无状态计算和有状态计算。对于流计算而言,事件继续一直地产生,如果每次计算都是互相独立的,不依赖于上下游的事件,则是无状态计算。如果计算须要依赖于之前或者后续的事件,则是有状态计算。

在 Flink 中依据数据集是否依据 Key 进行分区,将状态分为 Keyed State 和 Operator State(Non-keyed State)两种类型,本文次要介绍 Keyed State,后续文章会介绍 Operator State。

Keyed State 示意和 key 相干的一种 State,只能用于 KeydStream 类型数据集对应的 Functions 和 Operators 之上。Keyed State 是 Operator State 的特例,区别在于 Keyed State 当时依照 key 对数据集进行了分区,每个 Key State 仅对应一个 Operator 和 Key 的组合。Keyed State 能够通过 Key Groups 进行治理,次要用于当算子并行度发生变化时,主动从新散布 Keyed State 数据。在零碎运行过程中,一个 Keyed 算子实例可能运行一个或者多个 Key Groups 的 keys。

依照数据结构的不同,Flink 中定义了多种 Keyed State,具体如下:

ValueState<T> 即类型为 T 的单值状态。这个状态与对应的 Key 绑定,是最简略的状态。
ListState<T> 即 Key 上的状态值为一个列表。
MapState<UK,UV> 定义与 Key 对应键值对的状态,用于保护具备 key-value 构造类型的状态数据。
ReducingState<T> 这种 State 通过用户传入的 ReduceFucntion,每次调用 add(T)办法增加元素时,会调用 ReduceFucntion,最初合并到一个繁多的状态值。
AggregatingState<IN,OUT> 聚合 State 和 ReducingState<T> 十分相似,不同的是,这里聚合的类型能够是不同的元素类型,应用 add(IN)来退出元素,并应用 AggregateFunction 函数计算聚合后果。
State 开发实战
本章节将通过理论的我的项目代码演示不同数据结构在不同计算场景下的开发方法。

本文中我的项目 pom 文件
ValueState
ValueState<T> 即类型为 T 的单值状态。这个状态与对应的 Key 绑定,是最简略的状态。能够通过 update(T) 办法更新状态值,通过 T value()办法获取状态值。

因为 ValueState<T> 是与 Key 对应单个值的状态,利用场景能够是例如统计 user_id 对应的交易次数,每次用户交易都会在 count 状态值上进行更新。

接下来咱们将通过一个具体的实例代码展现如何利用 ValueState 去统计各个用户的交易次数。

ValueStateCountUser 实现
通过定义 ValueState<Integer> 类型的 countState 存储用户曾经拜访的次数,并在每次收到新的元素时,对 countState 中的数据加 1 更新,同时把以后用户名和曾经拜访的总次数返回。在本样例代码中将会通过 RichMapFunction 来对流元素进行解决。

在其中的 open(Configuration parameters) 办法中对 countState 进行初始化。Flink 提供了 RuntimeContext 用于获取状态数据,同时 RuntimeContext 提供了罕用的 Managed Keyd State 的获取形式,能够通过创立相应的 StateDescriptor 并调用 RuntimeContext 办法来获取状态数据。例如获取 ValueState 能够调用 ValueState[T] getState(ValueStateDescriptor[T])办法。
在 Tuple2<String, Integer> map(Tuple2<String, String> s)中,通过 countState.value() 获取用户之前的拜访次数,而后对 countState 中的数据加 1 更新,同时把以后用户名和曾经拜访的总次数返回。
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;

public class ValueStateCountUser extends RichMapFunction<Tuple2<String, String>, Tuple2<String, Integer>> {
private ValueState<Integer> countState;

public ValueStateCountUser() {}

@Override
public void open(Configuration parameters) throws Exception {

// super.open(parameters);
countState = getRuntimeContext().getState(new ValueStateDescriptor<Integer>("count", Integer.class));

}

@Override
public Tuple2<String, Integer> map(Tuple2<String, String> s) throws Exception {

Integer count = countState.value();
if (count == null) {countState.update(1);
  return Tuple2.of(s.f0, 1);
}

countState.update(count + 1);
return Tuple2.of(s.f0, count + 1);

}

@Override
public void close() throws Exception {

// super.close();
System.out.println(String.format("finally %d", countState.value()));

}
}
代码地址:ValueStateCountUser.java

验证代码
编写验证代码如下

private static void valueStateUserCount() throws Exception {

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
DataStream<Tuple2<String, String>> inputStream = env.fromElements(new Tuple2<>("zhangsan","aa"),
    new Tuple2<>("lisi","aa"),
    new Tuple2<>("zhangsan","aa"),
    new Tuple2<>("lisi","aa"),
    new Tuple2<>("wangwu","aa")
);

inputStream.keyBy(new KeySelector<Tuple2<String, String>, String>() {
      @Override
      public String getKey(Tuple2<String, String> integerLongTuple2) throws Exception {return integerLongTuple2.f0;}
    })
    .map(new ValueStateCountUser())
    .print();

env.execute("StateCompute");

}
代码地址:valueStateUserCount

程序运行失去输入如下

(zhangsan,1)
(lisi,1)
(zhangsan,2)
(lisi,2)
(wangwu,1)
finally 1
能够看到对于流数据中的每一个元素,依据用户名进行统计,实时显示该用户曾经拜访了多少次。

ListState
ListState<T> 即 Key 上的状态值为一个列表。能够通过 add(T)办法或者 addAll(List[T])往列表中附加值;也能够通过 Iterable get()办法返回一个 Iterable<T> 来遍历状态值;应用 update(List[T])来更新元素。

因为 ListState 保留的是与 Key 对应元素列表的状态,状态中寄存元素的 List 列表,利用场景能够是例如定义 ListState 存储用户常常拜访的 IP 地址。

接下来咱们将通过一个具体的实例代码展现如何利用 ListState 去统计各个用户拜访过的 IP 地址列表。为了更宽泛的展现 State 的利用场景,本样例代码中将会基于 KeyedProcessFunction 来解决流元素。

ListStateUserIP 实现
申明 ListState<String> ipState 用于记录用户拜访过的 ip 地址
在 open(Configuration parameters)中利用 getRuntimeContext().getListState 对 ipState 进行初始化,getListState 须要传入 ListStateDescriptor 类型的参数。
在 processElement 中,将以后记录中的 IP 地址查究到 ipState 中,而后从 ipState 获取曾经拜访过的所有 IP 地址,和用户名一起放入到 Collector
import com.google.common.collect.Lists;
import java.util.List;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

public class ListStateUserIpKeyedProcessFunction
extends KeyedProcessFunction<String, Tuple2<String,String>, Tuple2<String, List<String>>>{
private ListState<String> ipState;

@Override
public void open(Configuration parameters) throws Exception {

// super.open(parameters);
ipState = getRuntimeContext().getListState(new ListStateDescriptor<String>("ipstate", String.class));

}

@Override
public void processElement(

  Tuple2<String, String> value,
  KeyedProcessFunction<String, Tuple2<String, String>, Tuple2<String, List<String>>>.Context context,
  Collector<Tuple2<String, List<String>>> collector) throws Exception {ipState.add(value.f1);
List<String> ips = Lists.newArrayList(ipState.get());
collector.collect(Tuple2.of(value.f0, ips));

}
}
代码地址:ListStateUserIpKeyedProcessFunction.java

验证代码
编写验证程序如下

private static void listStateUserIP() throws Exception {

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
DataStream<Tuple2<String, String>> inputStream = env.fromElements(new Tuple2<>("zhangsan","192.168.0.1"),
    new Tuple2<>("lisi","192.168.0.1"),
    new Tuple2<>("zhangsan","192.168.0.2"),
    new Tuple2<>("lisi","192.168.0.3"),
    new Tuple2<>("wangwu","192.168.0.1")
);

inputStream.keyBy(new KeySelector<Tuple2<String, String>, String>() {
      @Override
      public String getKey(Tuple2<String, String> integerLongTuple2) throws Exception {return integerLongTuple2.f0;}
    })
    .process(new ListStateUserIpKeyedProcessFunction())
    .print();

env.execute("listStateUserIP");

}
}
代码地址:listStateUserIP

执行后果打印如下

(zhangsan,[192.168.0.1])
(lisi,[192.168.0.1])
(zhangsan,[192.168.0.1, 192.168.0.2])
(lisi,[192.168.0.1, 192.168.0.3])
(wangwu,[192.168.0.1])
达到预期成果

MapState
MapState<UK,UV> 定义与 Key 对应键值对的状态,用于保护具备 key-value 构造类型的状态数据。MapState<UK,UV> 使 用 Map 存 储 Key-Value 对,MapState 的办法和 Java 的 Map 的办法极为类似,所以上手绝对容易。罕用的有如下:

get()办法获取值
put(),putAll() 办法更新值
remove() 删除某个 key
contains()判断是否存在某个 key
isEmpty() 判断是否为空
和 HashMap 接口类似,MapState 也能够通过 entries()、keys()、values()获取对应的 keys 或 values 的汇合。

接下来咱们通过一个具体的实例介绍如何通过 ReducingState<T> 计算每个人的以后最高和最低体温。

实现 KeyedProcessFunction
定义 MapState<String, Integer> minMaxState 用于记录每个用户的最低高温和最高体温。
在 open 办法中通过 getRuntimeContext().getMapState 初始化 minMaxState,getMapState 须要传入 MapStateDescriptor 作为参数,构建 MapStateDescriptor 的第二个参数标识 Map 中 Key 的类型,第三个参数标识 Map 中 Value 的类型
在 processElement 办法中,将输出的流元素也就是用户以后的体温记录和 minMaxState 中的最低温和最低温进行比照,并更新到 minMaxState 中。
import org.apache.flink.api.common.state.MapState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

public class MapStateMinMaxTempKeyedProcessFunction

extends KeyedProcessFunction<String, Tuple2<String, Integer>, Tuple3<String, Integer, Integer>> {

private MapState<String, Integer> minMaxState;

private final String minKey = “min”;
private final String maxKey = “max”;

@Override
public void open(Configuration parameters) throws Exception {

// super.open(parameters);
minMaxState = getRuntimeContext().getMapState(
    new MapStateDescriptor<String, Integer>("minMaxState",
        String.class,
        Integer.class));

}

@Override
public void processElement(

  Tuple2<String, Integer> in,
  KeyedProcessFunction<String, Tuple2<String, Integer>, Tuple3<String, Integer, Integer>>.Context context,
  Collector<Tuple3<String, Integer, Integer>> collector) throws Exception {if (!minMaxState.contains(minKey)) {minMaxState.put(minKey, in.f1);
}

if (!minMaxState.contains(maxKey)) {minMaxState.put(maxKey, in.f1);
}

if (in.f1 > minMaxState.get(maxKey)) {minMaxState.put(maxKey, in.f1);
}

if (in.f1 < minMaxState.get(minKey)) {minMaxState.put(minKey, in.f1);
}

collector.collect(Tuple3.of(in.f0, minMaxState.get(minKey), minMaxState.get(maxKey)));

}
}
代码地址:MapStateMinMaxTempKeyedProcessFunction.java

验证代码
编写验证代码如下:

private static void mapStateMinMaxUserTemperature() throws Exception {

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
DataStream<Tuple2<String, Integer>> inputStream = env.fromElements(new Tuple2<>("zhangsan",36),
    new Tuple2<>("lisi",37),
    new Tuple2<>("zhangsan",35),
    new Tuple2<>("lisi",38),
    new Tuple2<>("wangwu",37)
);

inputStream.keyBy(new KeySelector<Tuple2<String, Integer>, String>() {
      @Override
      public String getKey(Tuple2<String, Integer> integerLongTuple2) throws Exception {return integerLongTuple2.f0;}
    })
    .process(new MapStateMinMaxTempKeyedProcessFunction())
    .print();

env.execute("mapStateMinMaxUserTemperature");

}
代码地址:mapStateMinMaxUserTemperature

程序运行输入

(zhangsan,36,36)
(lisi,37,37)
(zhangsan,35,36)
(lisi,37,38)
(wangwu,37,37)
能够看到对于每一条用户记录,正确计算输入了以后的最低和最高高温,达到了预期的计算需要。

ReducingState
ReducingState<T> 这种 State 通过用户传入的 ReduceFucntion,每次调用 add(T)办法增加元素时,会调用 ReduceFucntion,最初合并到一个繁多的状态值,因而,ReducingState 须要指定 ReduceFucntion 实现状态数据的聚合。

ReducingState 获取元素应用 T get()办法。

接下来咱们通过一个具体的实例介绍如何通过 ReducingState<T> 计算每个人的以后最高体温。

定义 ReduceFunction
实现 reduce 办法,求以后最大值

new ReduceFunction<Integer>() {

      @Override
      public Integer reduce(Integer integer, Integer t1) throws Exception {return integer > t1 ? integer : t1;}
    }

实现 KeyedProcessFunction
申明 ReducingState<Integer> tempState 用于记录用户以后的最高体温;
在 open(Configuration parameters)中利用 getRuntimeContext().getReducingState 对 tempState 进行初始化,getReducingState 须要传入 ReducingStateDescriptor,ReducingStateDescriptor 构建的时候,第二个参数要参入 ReduceFunction;
在 processElement 中,将以后用户的以后体温传入到 tempState 中(tempState 在外部会调用 ReduceFunction), 而后从 tempState 获取用户目前为止的最高发问,和用户名一起放入到 Collector
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.common.state.ReducingState;
import org.apache.flink.api.common.state.ReducingStateDescriptor;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

public class ReducingStateKeyedProcessFunction
extends KeyedProcessFunction<String, Tuple2<String, Integer> , Tuple2<String, Integer>> {

private ReducingState<Integer> tempState;

@Override
public void open(Configuration parameters) throws Exception {

// super.open(parameters);
tempState = getRuntimeContext().getReducingState(new ReducingStateDescriptor<Integer>("reduce",
    new ReduceFunction<Integer>() {
      @Override
      public Integer reduce(Integer integer, Integer t1) throws Exception {return integer > t1 ? integer : t1;}
    }, Integer.class));

}

@Override
public void processElement(

  Tuple2<String, Integer> stringIntegerTuple2,
  KeyedProcessFunction<String, Tuple2<String, Integer>, Tuple2<String, Integer>>.Context context,
  Collector<Tuple2<String, Integer>> collector) throws Exception {tempState.add(stringIntegerTuple2.f1);
collector.collect(Tuple2.of(stringIntegerTuple2.f0, tempState.get()));

}
}
代码地址:ReducingStateKeyedProcessFunction.java

验证代码
private static void reducingStateUserTemperature() throws Exception {

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
DataStream<Tuple2<String, Integer>> inputStream = env.fromElements(new Tuple2<>("zhangsan",36),
    new Tuple2<>("lisi",37),
    new Tuple2<>("zhangsan",35),
    new Tuple2<>("lisi",38),
    new Tuple2<>("wangwu",37)
);

inputStream.keyBy(new KeySelector<Tuple2<String, Integer>, String>() {
      @Override
      public String getKey(Tuple2<String, Integer> integerLongTuple2) throws Exception {return integerLongTuple2.f0;}
    })
    .process(new ReducingStateKeyedProcessFunction())
    .print();

env.execute("reducingStateUserTemperature");

}
代码地址:reducingStateUserTemperature

程序运行后果如下

(zhangsan,36)
(lisi,37)
(zhangsan,36)
(lisi,38)
(wangwu,37)
达到预期成果

AggregatingState
AggregatingState<IN,OUT> 聚合 State 和 ReducingState<T> 十分相似,不同的是,这里聚合的类型能够是不同的元素类型,应用 add(IN)来退出元素,并应用 AggregateFunction 函数计算聚合后果。

AggregatingState<IN,OUT> 这种 State 通过用户传入的 AggregateFunction,每次调用 add(IN)办法增加元素时,会调用 AggregateFunction,最初合并到一个繁多的状态值,因而,ReducingState 须要指定 AggregateFunction 实现状态数据的聚合。

定义 AggregateFunction
AggregateFunction<IN, ACC, OUT> 中的三个类型参数,别离对应
IN: 流元素的类型;
ACC: AggregateFunction 外部累加器的类型
OUT: 最终输入的聚合后果的类型
AggregateFunction<IN, ACC, OUT> 须要重写 createAccumulator, add, getResult, merge 这四个函数
createAccumulator 用于创立累加器 Accumulator
add 用于将流元素退出到累加器 Accumulator
getResult 用于从累加器 Accumulator 中计算获取聚合后果
merge 目前临时用不到
代码如下

import org.apache.flink.api.common.functions.AggregateFunction;
import org.apache.flink.api.java.tuple.Tuple2;

public class AvgTempAggregateFunction

implements AggregateFunction<Tuple2<String, Integer>, Tuple2<Integer, Integer>, Double> {

@Override
public Tuple2<Integer, Integer> createAccumulator() {

return Tuple2.of(0, 0);

}

@Override
public Tuple2<Integer, Integer> add(

  Tuple2<String, Integer> in, Tuple2<Integer, Integer> acc) {
  // in 流元素,acc 外部累加器
acc.f0 = acc.f0 + in.f1;
acc.f1 = acc.f1 + 1;
return acc;

}

@Override
public Double getResult(Tuple2<Integer, Integer> acc) {

return new Double(acc.f0)/ acc.f1;

}

@Override
public Tuple2<Integer, Integer> merge(

  Tuple2<Integer, Integer> a, Tuple2<Integer, Integer> b) {return new Tuple2<>(a.f0 + b.f0, a.f1 + b.f1);

}
}
代码地址:AvgTempAggregateFunction.java

实现 KeyedProcesssFunction
申明 AggregatingState<Tuple2<String, Integer>, Double> avgState 用于记录用户以后的均匀体温。其中第一个类型参数标识流元素类型,第二个类型参数标识聚合后果的类型;
在 open(Configuration parameters)中利用 getRuntimeContext().getAggregatingState 对 avgState 进行初始化,getAggregatingState 须要传入 AggregatingStateDescriptor,AggregatingStateDescriptor 构建的时候,第二个参数要参入 AggregateFunction,第三个参数要传入 AggregateFunction 中累加器的 TypeInformation;
在 processElement 中,将以后用户的以后体温传入到 avgState 中(avgState 外部会调用 add),而后从 avgState 获取用户体温的聚合后果(avgState 外部调用 getResult),和用户名一起放入到 Collector
import org.apache.flink.api.common.state.AggregatingState;
import org.apache.flink.api.common.state.AggregatingStateDescriptor;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

public class AggregatingStateKeyedProcessFunction
extends KeyedProcessFunction<String, Tuple2<String, Integer>, Tuple2<String, Double>> {
private AggregatingState<Tuple2<String, Integer>, Double> avgState;

@Override
public void open(Configuration parameters) throws Exception {

// super.open(parameters);
avgState = getRuntimeContext().getAggregatingState(new AggregatingStateDescriptor<Tuple2<String, Integer>,
    Tuple2<Integer, Integer>, Double>("aggregating", new AvgTempAggregateFunction(),
    TypeInformation.of(new TypeHint<Tuple2<Integer, Integer>>() {})));

}

@Override
public void processElement(

  Tuple2<String, Integer> in,
  KeyedProcessFunction<String, Tuple2<String, Integer>, Tuple2<String, Double>>.Context context,
  Collector<Tuple2<String, Double>> collector) throws Exception {avgState.add(in);
collector.collect(Tuple2.of(in.f0, avgState.get().doubleValue()));

}
}
代码地址:AggregatingStateKeyedProcessFunction.java

验证代码
private static void aggregatingStateAvgUserTemperature() throws Exception {

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
DataStream<Tuple2<String, Integer>> inputStream = env.fromElements(new Tuple2<>("zhangsan",36),
    new Tuple2<>("lisi",37),
    new Tuple2<>("zhangsan",35),
    new Tuple2<>("lisi",38),
    new Tuple2<>("wangwu",37)
);

inputStream.keyBy(new KeySelector<Tuple2<String, Integer>, String>() {
      @Override
      public String getKey(Tuple2<String, Integer> integerLongTuple2) throws Exception {return integerLongTuple2.f0;}
    })
    .process(new AggregatingStateKeyedProcessFunction())
    .print();

env.execute("aggregatingStateAvgUserTemperature");

}
代码地址:aggregatingStateAvgUserTemperature,程序运行后果如下

(zhangsan,36.0)
(lisi,37.0)
(zhangsan,35.5)
(lisi,37.5)
(wangwu,37.0)
能够看到对于每一条流元素,输入了用户以及以后的均匀体温,实现了预期性能需要。

总结
本文通过典型的场景用例,展现了不同数据结构的 State 在不同计算场景下的应用办法,帮忙 Flink 开发者相熟 State 相干 API 以及 State 在相应的计算场景中如何存储、查问相干的两头计算信息从而实现计算目标。

参考资料
《Flink 原理、实战与性能优化》5.1 有状态计算
《Flink 内核原理与实现》第 7 章 状态原理
Flink 教程(17) Keyed State 状态治理之 AggregatingState 应用案例 求平均值 https://blog.csdn.net/winterk…
公布于 2022-03-11 09:57

退出移动版