Operator State
Operator State 能够用在所有算子上,每个算子子工作或者说每个算子实例共享一个状态,流入这个算子子工作的数据能够拜访和更新这个状态。下图展现了 Operator State,算子子工作 1 上的所有数据能够共享第一个 Operator State,以此类推,每个算子子工作上的数据共享本人的状态。
如何应用 Operator State
呢?咱们能够通过实现 CheckpointedFunction
接口来实现,或者实现 ListCheckpointed<T extends Serializable>
接口来实现,它们之间次要的区别是:实现 CheckpointedFunction
接口,有两种模式的 ListState
API 能够应用,别离是getListState
以及getListUnionState
,它们都会返回一个ListState
,然而他们在从新分区的时候会有区别,前面会具体介绍。。。
Operator State 的理论利用场景不如 Keyed State 多,它常常被用在 Source 或 Sink 等算子上,用来保留流入数据的偏移量或对输入数据做缓存,以保障 Flink 利用的 Exactly-Once 语义。这里咱们来看一个 Flink 官网提供的 Sink 案例以理解 CheckpointedFunction
的工作原理。
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import scala.Tuple2;
import java.util.ArrayList;
import java.util.List;
/**
* @Author Natasha
* @Description 输入到 Sink 之前,先将数据放在本地缓存中,并定期进行 snapshot。即便程序解体,状态中存储着还未输入的数据,下次启动后还会将这些未输入数据读取到内存,持续输入到内部零碎。* @Date 2020/10/19 16:30
**/
public class BufferingSink implements SinkFunction<Tuple2<String, Integer>>, CheckpointedFunction {
private final int threshold;
// 托管状态
private transient ListState<Tuple2<String, Integer>> checkpointedState;
// 原始状态(本地缓存)private List<Tuple2<String, Integer>> bufferedElements;
public BufferingSink(int threshold) {
this.threshold = threshold;
this.bufferedElements = new ArrayList();}
@Override
//Sink 的外围解决逻辑,将上游数据 value 输入到内部零碎:在 invoke 办法外头先将 value 缓存到 bufferedElements,缓存个数触发阈值时,执行 sink 操作,而后清空 bufferedElements
public void invoke(Tuple2<String, Integer> value) throws Exception {
// 先将上游数据缓存到本地的缓存
bufferedElements.add(value);
// 当本地缓存大小达到阈值时,将本地缓存输入到内部零碎
if (bufferedElements.size() == threshold) {for (Tuple2<String, Integer> element: bufferedElements) {// send it to the sink}
// 清空本地缓存
bufferedElements.clear();}
}
@Override
// Checkpoint 触发时会调用这个办法,对 bufferedElements 进行 snapshot 本地状态长久化
public void snapshotState(FunctionSnapshotContext context) throws Exception {checkpointedState.clear();
// 将最新的数据写到状态中
for (Tuple2<String, Integer> element : bufferedElements) {checkpointedState.add(element);
}
}
@Override
// 第一次初始化时会调用这个办法,或者从之前的检查点复原时也会调用这个办法
public void initializeState(FunctionInitializationContext context) throws Exception {
ListStateDescriptor<Tuple2<String, Integer>> descriptor =
new ListStateDescriptor(
"buffered-elements",
TypeInformation.of(new TypeHint<Tuple2<Long, Long>>() {}));
checkpointedState = context.getOperatorStateStore().getListState(descriptor);
// 如果是作业重启,读取存储中的状态数据并填充到本地缓存中
if (context.isRestored()) {for (Tuple2<String, Integer> element : checkpointedState.get()) {
// 从托管状态将数据到挪动到原始状态
bufferedElements.add(element);
}
}
}
}
Keyed State
Keyed State 是 KeyedStream
上的状态。如果输出流依照 id 为 Key 进行了 keyBy
分组,造成一个KeyedStream
,数据流中所有 id 为 1 的数据共享一个状态,能够拜访和更新这个状态,以此类推,每个 Key 对应一个本人的状态。下图展现了 Keyed State,因为一个算子子工作能够解决一到多个 Key,算子子工作 1 解决了两种 Key,两种 Key 别离对应本人的状态。
它次要提供了以下的 state:
- Value State:ValueState 分区的单值状态。
- Map State:MapState<UK,UV> 分区的键值状态。
- List State:ListState 分区的列表状态。
- Reducing State:ReducingState 每次调用 add(T) 增加新元素,会调用 ReduceFunction 进行聚合。传入类型和返回类型雷同。
- Aggregating State:AggregatingState<IN,OUT> 每次调用 add(T) 增加新元素,会调用 ReduceFunction 进行聚合。传入类型和返回类型能够不同。
上面是一个简略的示例,计算每一个 key 中均匀每 3 个数据的平均值,如:
import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.common.state.StateTtlConfig;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;
/**
* @Author Natasha
* @Description 计算不同 key 的均匀每三个之间的平均值
* @Date 2020/10/19 17:46
**/
public class KeyedStateDemo {public static void main(String[] args) throws Exception {final StreamExecutionEnvironment env=StreamExecutionEnvironment.getExecutionEnvironment();
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
DataStream<Tuple2<Long,Long>> input=env.fromElements(Tuple2.of(1L,4L),
Tuple2.of(1L,2L),
Tuple2.of(1L,6L),
Tuple2.of(2L,4L),
Tuple2.of(2L,4L),
Tuple2.of(3L,5L),
Tuple2.of(2L,3L),
Tuple2.of(1L,4L)
);
input.keyBy(0)
.flatMap(new KeyedStateAgvFlatMap())
.setParallelism(10)
.print();
env.execute();}
public static class KeyedStateAgvFlatMap extends RichFlatMapFunction<Tuple2<Long,Long>,Tuple2<Long,Long>> {
private ValueState<Tuple2<Long,Long>> valueState;
@Override
public void flatMap(Tuple2<Long, Long> value, Collector<Tuple2<Long, Long>> collector) throws Exception {Tuple2<Long,Long> currentValue=valueState.value();
if(currentValue==null){currentValue=Tuple2.of(0L,0L);
}
currentValue.f0+=1;
currentValue.f1+=value.f1;
valueState.update(currentValue);
// 大于三个
if(currentValue.f0>=3){collector.collect(Tuple2.of(value.f0,currentValue.f1/currentValue.f0));
valueState.clear();}
}
@Override
public void open(Configuration parameters) throws Exception {super.open(parameters);
//keyedState 能够设置 TTL 过期工夫
StateTtlConfig config=StateTtlConfig
.newBuilder(Time.seconds(30))
.setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
.setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
.build();
ValueStateDescriptor valueStateDescriptor=new ValueStateDescriptor("agvKeyedState",
TypeInformation.of(new TypeHint<Tuple2<Long,Long>>() {}));
// 设置反对 TTL 配置
valueStateDescriptor.enableTimeToLive(config);
valueState=getRuntimeContext().getState(valueStateDescriptor);
}
}
}