1. 状态函数的实现状态函数通过运行上下文存储和访问状态键状态类似于分布式Map 每个状态函数实例维护一段范围的键状态使用键状态的状态函数必须应用于KeyedStream(已按键分区后的流)键状态类型 包括单值 列表 Map和聚合类型1.1 在RuntimeContext中定义键状态(keyed State)static class StateMachineMapper extends RichFlatMapFunction<Event, Alert> { /** The state for the current key. / private ValueState<Integer> currentState; @Override public void open(Configuration conf) { // get access to the state object currentState = getRuntimeContext().getState(new ValueStateDescriptor<>(“state”, Integer.class)); } @Override public void flatMap(Event evt, Collector<Alert> out) throws Exception { // get the current state for the key (source address) // if no state exists, yet, the state must be the state machine’s initial state Integer state = currentState.value(); if(state==null){ currentState.update(1); }else { System.out.println(“key: “+evt.sourceAddress()+” state:"+state); currentState.update(state + 1); } }}1.2 在用户函数中实现算子状态算子状态(operator state)维护在每个单独的算子实例中算子状态包括List State,List Union State和BroadCast State用户函数通过实现ListCheckpointed接口来操作List State算子状态static class StateMachineMapper extends RichFlatMapFunction<Event, Alert> implements ListCheckpointed<Integer> { /* The state for the current key. */ private Integer currentState=0; @Override public void flatMap(Event evt, Collector<Alert> out) throws Exception { // get the current state for the key (source address) // if no state exists, yet, the state must be the state machine’s initial state System.out.println(currentState); currentState=currentState+1; }//Flink运行检查点时会执行该方法 对状态进行存储 @Override public List<Integer> snapshotState(long checkpointId, long timestamp) throws Exception { return Lists.newArrayList(currentState); }//当作业启动或失败时会执行该方法用于状态的初始化 @Override public void restoreState(List<Integer> state) throws Exception { currentState=state.get(0); }}算子状态类型为List结构是用于应对状态算子并行度的改变 当增加或减少状态算子并行度时 那算子状态就需要在并行实例中进行重分配 这需要要求能够合并或分割算子状态Broadcast State算子状态是能够在所有状态算子间共享的状态用户函数通过继承CheckpointedFunction接口可同时操作键状态和算子状态用户函数通过继承CheckpointListener接口获取所有状态算子完成将其状态回写远程存储的通知2.状态应用的鲁棒和性能状态后端和检查点算法的选择影响状态应用的鲁棒和性能2.1 状态后端(state backend)状态后端负责维护每个算子实例的状态 且当检查点运行时负责将状态发送给远程持久化存储设备状态后端是插件化实现的 Flink提供三种状态后端实现 包括基于内存 基于磁盘和基于RocksDBStateBackend是用于实现用户自定义状态后端的接口//配置RocksDBStateBackend为Flink应用的状态后端final String checkpointDir = params.get(“checkpoint-dir”);boolean incrementalCheckpoints = params.getBoolean(“incremental-checkpoints”, false);env.setStateBackend(new RocksDBStateBackend(checkpointDir, incrementalCheckpoints));2.2 检查点(Checkpointing)开启流应用失败不应该影响计算正确性流应用失败不应该丢失状态 因为其可能是不可恢复的检查点机制指的是在流应用运行的某个时间点 对应用中所有内置状态和状态函数进行快照检查点机制和状态恢复机制保证对流应用的状态的有且仅有一次的一致性保证检查点开启需要设置一个运行周期 决定正常流处理中检查点运行的开销和失败后恢复的时间val env = StreamExecutionEnvironment.getExecutionEnvironment// set checkpointing interval to 10 seconds (10000 milliseconds)env.enableCheckpointing(10000L)2.3 状态算子的更新保存点(savepoint)机制保证不会因更新状态算子而停止的应用在重启时丢失状态2.4 调节状态应用性能2.5 避免状态泄漏3. 可查询状态(Queryable State)键状态可以以只读的键值形式暴露给外部系统3.1 可查询状态服务构成QueryableStateClient 供外部系统使用的访问键状态的客户端QueryableStateClientProxy 接受和响应客户端请求 每个TM运行一个该实例 因为键状态分布于所有算子实例 代理需要实现键对应的状态状态维护于哪个TM中 该信息维护于JM中QueryableStateServer 对ClientProxy请求发起响应 每个TM运行一个该实例用于访问本地状态后端的键状态3.2 可查询状态的暴露在open方法中为键状态设置可查询状态override def open(parameters: Configuration): Unit = { // create state descriptor val lastTempDescriptor = new ValueStateDescriptor[Double](“lastTemp”, classOf[Double]) // enable queryable state and set its external identifier lastTempDescriptor.setQueryable(“lastTemperature”) // obtain the state handle lastTempState = getRuntimeContext .getStateDouble}将流写入一个可查询状态的sinktenSecsMaxTemps .keyBy(_._1) .asQueryableState(“maxTemperature”)3.3 从外部系统访问可查询状态通过引入依赖来获取QueryableStateClient相关代码<dependency> <groupid>org.apache.flink</groupid> <artifactid>flink-queryable-state-client-java_2.11</artifactid> <version>1.5.0</version></dependency>创建访问可查询状态的客户端//tmHostName是任意TM的IP地址 val client: QueryableStateClient = new QueryableStateClient(tmHostname, proxyPort)