关于flink:Flink之状态管理State

74次阅读

共计 5690 个字符,预计需要花费 15 分钟才能阅读完成。

Operator State

Operator State 能够用在所有算子上,每个算子子工作或者说每个算子实例共享一个状态,流入这个算子子工作的数据能够拜访和更新这个状态。下图展现了 Operator State,算子子工作 1 上的所有数据能够共享第一个 Operator State,以此类推,每个算子子工作上的数据共享本人的状态。

如何应用 Operator State 呢?咱们能够通过实现 CheckpointedFunction 接口来实现,或者实现 ListCheckpointed<T extends Serializable> 接口来实现,它们之间次要的区别是:实现 CheckpointedFunction 接口,有两种模式的 ListState API 能够应用,别离是getListState 以及getListUnionState,它们都会返回一个ListState,然而他们在从新分区的时候会有区别,前面会具体介绍。。。

Operator State 的理论利用场景不如 Keyed State 多,它常常被用在 Source 或 Sink 等算子上,用来保留流入数据的偏移量或对输入数据做缓存,以保障 Flink 利用的 Exactly-Once 语义。这里咱们来看一个 Flink 官网提供的 Sink 案例以理解 CheckpointedFunction 的工作原理。

import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import scala.Tuple2;

import java.util.ArrayList;
import java.util.List;

/**
 * @Author Natasha
 * @Description 输入到 Sink 之前,先将数据放在本地缓存中,并定期进行 snapshot。即便程序解体,状态中存储着还未输入的数据,下次启动后还会将这些未输入数据读取到内存,持续输入到内部零碎。* @Date 2020/10/19 16:30
 **/


public class BufferingSink implements SinkFunction<Tuple2<String, Integer>>, CheckpointedFunction {

    private final int threshold;
    // 托管状态
    private transient ListState<Tuple2<String, Integer>> checkpointedState;
    // 原始状态(本地缓存)private List<Tuple2<String, Integer>> bufferedElements;

    public BufferingSink(int threshold) {
        this.threshold = threshold;
        this.bufferedElements = new ArrayList();}

    @Override
    //Sink 的外围解决逻辑,将上游数据 value 输入到内部零碎:在 invoke 办法外头先将 value 缓存到 bufferedElements,缓存个数触发阈值时,执行 sink 操作,而后清空 bufferedElements
    public void invoke(Tuple2<String, Integer> value) throws Exception {
        // 先将上游数据缓存到本地的缓存
        bufferedElements.add(value);
        // 当本地缓存大小达到阈值时,将本地缓存输入到内部零碎
        if (bufferedElements.size() == threshold) {for (Tuple2<String, Integer> element: bufferedElements) {// send it to the sink}
            // 清空本地缓存
            bufferedElements.clear();}
    }

    @Override
    // Checkpoint 触发时会调用这个办法,对 bufferedElements 进行 snapshot 本地状态长久化
    public void snapshotState(FunctionSnapshotContext context) throws Exception {checkpointedState.clear();
        // 将最新的数据写到状态中
        for (Tuple2<String, Integer> element : bufferedElements) {checkpointedState.add(element);
        }
    }

    @Override
    // 第一次初始化时会调用这个办法,或者从之前的检查点复原时也会调用这个办法
    public void initializeState(FunctionInitializationContext context) throws Exception {
        ListStateDescriptor<Tuple2<String, Integer>> descriptor =
                new ListStateDescriptor(
                        "buffered-elements",
                        TypeInformation.of(new TypeHint<Tuple2<Long, Long>>() {}));

        checkpointedState = context.getOperatorStateStore().getListState(descriptor);
        // 如果是作业重启,读取存储中的状态数据并填充到本地缓存中
        if (context.isRestored()) {for (Tuple2<String, Integer> element : checkpointedState.get()) {
                // 从托管状态将数据到挪动到原始状态
                bufferedElements.add(element);
            }
        }
    }
}

Keyed State

Keyed State 是 KeyedStream 上的状态。如果输出流依照 id 为 Key 进行了 keyBy 分组,造成一个KeyedStream,数据流中所有 id 为 1 的数据共享一个状态,能够拜访和更新这个状态,以此类推,每个 Key 对应一个本人的状态。下图展现了 Keyed State,因为一个算子子工作能够解决一到多个 Key,算子子工作 1 解决了两种 Key,两种 Key 别离对应本人的状态。

它次要提供了以下的 state:

  • Value State:ValueState 分区的单值状态。
  • Map State:MapState<UK,UV> 分区的键值状态。
  • List State:ListState 分区的列表状态。
  • Reducing State:ReducingState 每次调用 add(T) 增加新元素,会调用 ReduceFunction 进行聚合。传入类型和返回类型雷同。
  • Aggregating State:AggregatingState<IN,OUT> 每次调用 add(T) 增加新元素,会调用 ReduceFunction 进行聚合。传入类型和返回类型能够不同。

上面是一个简略的示例,计算每一个 key 中均匀每 3 个数据的平均值,如:

import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.common.state.StateTtlConfig;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;


/**
 * @Author Natasha
 * @Description 计算不同 key 的均匀每三个之间的平均值
 * @Date 2020/10/19 17:46
 **/
public class KeyedStateDemo {public static void main(String[] args) throws Exception {final StreamExecutionEnvironment  env=StreamExecutionEnvironment.getExecutionEnvironment();
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

        DataStream<Tuple2<Long,Long>> input=env.fromElements(Tuple2.of(1L,4L),
                Tuple2.of(1L,2L),
                Tuple2.of(1L,6L),
                Tuple2.of(2L,4L),
                Tuple2.of(2L,4L),
                Tuple2.of(3L,5L),
                Tuple2.of(2L,3L),
                Tuple2.of(1L,4L)
        );

        input.keyBy(0)
                .flatMap(new KeyedStateAgvFlatMap())
                .setParallelism(10)
                .print();

        env.execute();}


    public static class KeyedStateAgvFlatMap extends RichFlatMapFunction<Tuple2<Long,Long>,Tuple2<Long,Long>> {

        private ValueState<Tuple2<Long,Long>> valueState;

        @Override
        public void flatMap(Tuple2<Long, Long> value, Collector<Tuple2<Long, Long>> collector) throws Exception {Tuple2<Long,Long> currentValue=valueState.value();
            if(currentValue==null){currentValue=Tuple2.of(0L,0L);
            }
            currentValue.f0+=1;
            currentValue.f1+=value.f1;
            valueState.update(currentValue);
            // 大于三个
            if(currentValue.f0>=3){collector.collect(Tuple2.of(value.f0,currentValue.f1/currentValue.f0));
                valueState.clear();}
        }

        @Override
        public void open(Configuration parameters) throws Exception {super.open(parameters);

            //keyedState 能够设置 TTL 过期工夫
            StateTtlConfig config=StateTtlConfig
                    .newBuilder(Time.seconds(30))
                    .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
                    .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
                    .build();

            ValueStateDescriptor valueStateDescriptor=new ValueStateDescriptor("agvKeyedState",
                    TypeInformation.of(new TypeHint<Tuple2<Long,Long>>() {}));

            // 设置反对 TTL 配置
            valueStateDescriptor.enableTimeToLive(config);

            valueState=getRuntimeContext().getState(valueStateDescriptor);
        }
    }
}

正文完
 0