Operator State
Operator State能够用在所有算子上,每个算子子工作或者说每个算子实例共享一个状态,流入这个算子子工作的数据能够拜访和更新这个状态。下图展现了Operator State,算子子工作1上的所有数据能够共享第一个Operator State,以此类推,每个算子子工作上的数据共享本人的状态。
如何应用Operator State
呢?咱们能够通过实现CheckpointedFunction
接口来实现,或者实现ListCheckpointed<T extends Serializable>
接口来实现,它们之间次要的区别是:实现CheckpointedFunction
接口,有两种模式的ListState
API能够应用,别离是getListState
以及getListUnionState
,它们都会返回一个ListState
,然而他们在从新分区的时候会有区别,前面会具体介绍。。。
Operator State的理论利用场景不如Keyed State多,它常常被用在Source或Sink等算子上,用来保留流入数据的偏移量或对输入数据做缓存,以保障Flink利用的Exactly-Once语义。这里咱们来看一个Flink官网提供的Sink案例以理解CheckpointedFunction
的工作原理。
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import scala.Tuple2;
import java.util.ArrayList;
import java.util.List;
/**
* @Author Natasha
* @Description 输入到Sink之前,先将数据放在本地缓存中,并定期进行snapshot。即便程序解体,状态中存储着还未输入的数据,下次启动后还会将这些未输入数据读取到内存,持续输入到内部零碎。
* @Date 2020/10/19 16:30
**/
public class BufferingSink implements SinkFunction<Tuple2<String, Integer>>, CheckpointedFunction {
private final int threshold;
//托管状态
private transient ListState<Tuple2<String, Integer>> checkpointedState;
//原始状态(本地缓存)
private List<Tuple2<String, Integer>> bufferedElements;
public BufferingSink(int threshold) {
this.threshold = threshold;
this.bufferedElements = new ArrayList();
}
@Override
//Sink的外围解决逻辑,将上游数据value输入到内部零碎:在invoke办法外头先将value缓存到bufferedElements,缓存个数触发阈值时,执行sink操作,而后清空bufferedElements
public void invoke(Tuple2<String, Integer> value) throws Exception {
// 先将上游数据缓存到本地的缓存
bufferedElements.add(value);
// 当本地缓存大小达到阈值时,将本地缓存输入到内部零碎
if (bufferedElements.size() == threshold) {
for (Tuple2<String, Integer> element: bufferedElements) {
// send it to the sink
}
// 清空本地缓存
bufferedElements.clear();
}
}
@Override
// Checkpoint触发时会调用这个办法,对bufferedElements进行snapshot本地状态长久化
public void snapshotState(FunctionSnapshotContext context) throws Exception {
checkpointedState.clear();
//将最新的数据写到状态中
for (Tuple2<String, Integer> element : bufferedElements) {
checkpointedState.add(element);
}
}
@Override
// 第一次初始化时会调用这个办法,或者从之前的检查点复原时也会调用这个办法
public void initializeState(FunctionInitializationContext context) throws Exception {
ListStateDescriptor<Tuple2<String, Integer>> descriptor =
new ListStateDescriptor(
"buffered-elements",
TypeInformation.of(new TypeHint<Tuple2<Long, Long>>() {}));
checkpointedState = context.getOperatorStateStore().getListState(descriptor);
// 如果是作业重启,读取存储中的状态数据并填充到本地缓存中
if (context.isRestored()) {
for (Tuple2<String, Integer> element : checkpointedState.get()) {
//从托管状态将数据到挪动到原始状态
bufferedElements.add(element);
}
}
}
}
Keyed State
Keyed State是KeyedStream
上的状态。如果输出流依照id为Key进行了keyBy
分组,造成一个KeyedStream
,数据流中所有id为1的数据共享一个状态,能够拜访和更新这个状态,以此类推,每个Key对应一个本人的状态。下图展现了Keyed State,因为一个算子子工作能够解决一到多个Key,算子子工作1解决了两种Key,两种Key别离对应本人的状态。
它次要提供了以下的state:
- Value State:ValueState 分区的单值状态。
- Map State:MapState<UK,UV> 分区的键值状态。
- List State:ListState 分区的列表状态。
- Reducing State:ReducingState 每次调用 add(T) 增加新元素,会调用 ReduceFunction 进行聚合。传入类型和返回类型雷同。
- Aggregating State:AggregatingState<IN,OUT> 每次调用 add(T) 增加新元素,会调用ReduceFunction 进行聚合。传入类型和返回类型能够不同。
上面是一个简略的示例,计算每一个key中均匀每3个数据的平均值,如:
import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.common.state.StateTtlConfig;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;
/**
* @Author Natasha
* @Description 计算不同key的均匀每三个之间的平均值
* @Date 2020/10/19 17:46
**/
public class KeyedStateDemo {
public static void main(String[] args) throws Exception {
final StreamExecutionEnvironment env=StreamExecutionEnvironment.getExecutionEnvironment();
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
DataStream<Tuple2<Long,Long>> input=env.fromElements(
Tuple2.of(1L,4L),
Tuple2.of(1L,2L),
Tuple2.of(1L,6L),
Tuple2.of(2L,4L),
Tuple2.of(2L,4L),
Tuple2.of(3L,5L),
Tuple2.of(2L,3L),
Tuple2.of(1L,4L)
);
input.keyBy(0)
.flatMap(new KeyedStateAgvFlatMap())
.setParallelism(10)
.print();
env.execute();
}
public static class KeyedStateAgvFlatMap extends RichFlatMapFunction<Tuple2<Long,Long>,Tuple2<Long,Long>> {
private ValueState<Tuple2<Long,Long>> valueState;
@Override
public void flatMap(Tuple2<Long, Long> value, Collector<Tuple2<Long, Long>> collector) throws Exception {
Tuple2<Long,Long> currentValue=valueState.value();
if(currentValue==null){
currentValue=Tuple2.of(0L,0L);
}
currentValue.f0+=1;
currentValue.f1+=value.f1;
valueState.update(currentValue);
//大于三个
if(currentValue.f0>=3){
collector.collect(Tuple2.of(value.f0,currentValue.f1/currentValue.f0));
valueState.clear();
}
}
@Override
public void open(Configuration parameters) throws Exception {
super.open(parameters);
//keyedState能够设置TTL过期工夫
StateTtlConfig config=StateTtlConfig
.newBuilder(Time.seconds(30))
.setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
.setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
.build();
ValueStateDescriptor valueStateDescriptor=new ValueStateDescriptor("agvKeyedState",
TypeInformation.of(new TypeHint<Tuple2<Long,Long>>() {}));
//设置反对TTL配置
valueStateDescriptor.enableTimeToLive(config);
valueState=getRuntimeContext().getState(valueStateDescriptor);
}
}
}
发表回复