Stream Processing With Flink (7) 状态算子和用户函数

38次阅读

共计 3642 个字符,预计需要花费 10 分钟才能阅读完成。

1. 状态函数的实现

状态函数通过运行上下文存储和访问状态
键状态类似于分布式 Map 每个状态函数实例维护一段范围的键状态
使用键状态的状态函数必须应用于 KeyedStream(已按键分区后的流)
键状态类型 包括单值 列表 Map 和聚合类型

1.1 在 RuntimeContext 中定义键状态(keyed State)
static class StateMachineMapper extends RichFlatMapFunction<Event, Alert> {
/** The state for the current key. */
private ValueState<Integer> currentState;
@Override
public void open(Configuration conf) {
// get access to the state object
currentState = getRuntimeContext().getState(new ValueStateDescriptor<>(“state”, Integer.class));
}
@Override
public void flatMap(Event evt, Collector<Alert> out) throws Exception {
// get the current state for the key (source address)
// if no state exists, yet, the state must be the state machine’s initial state
Integer state = currentState.value();
if(state==null){
currentState.update(1);
}else {
System.out.println(“key: “+evt.sourceAddress()+” state:”+state);
currentState.update(state + 1);
}
}
}
1.2 在用户函数中实现算子状态

算子状态(operator state)维护在每个单独的算子实例中
算子状态包括 List State,List Union State 和 BroadCast State
用户函数通过实现 ListCheckpointed 接口来操作 List State 算子状态

static class StateMachineMapper extends RichFlatMapFunction<Event, Alert> implements ListCheckpointed<Integer> {
/** The state for the current key. */
private Integer currentState=0;

@Override
public void flatMap(Event evt, Collector<Alert> out) throws Exception {
// get the current state for the key (source address)
// if no state exists, yet, the state must be the state machine’s initial state
System.out.println(currentState);
currentState=currentState+1;
}
//Flink 运行检查点时会执行该方法 对状态进行存储
@Override
public List<Integer> snapshotState(long checkpointId, long timestamp) throws Exception {
return Lists.newArrayList(currentState);
}
// 当作业启动或失败时会执行该方法用于状态的初始化
@Override
public void restoreState(List<Integer> state) throws Exception {
currentState=state.get(0);
}
}

算子状态类型为 List 结构是用于应对状态算子并行度的改变 当增加或减少状态算子并行度时 那算子状态就需要在并行实例中进行重分配 这需要要求能够合并或分割算子状态
Broadcast State 算子状态是能够在所有状态算子间共享的状态
用户函数通过继承 CheckpointedFunction 接口可同时操作键状态和算子状态
用户函数通过继承 CheckpointListener 接口获取所有状态算子完成将其状态回写远程存储的通知

2. 状态应用的鲁棒和性能
状态后端和检查点算法的选择影响状态应用的鲁棒和性能
2.1 状态后端(state backend)

状态后端负责维护每个算子实例的状态 且当检查点运行时负责将状态发送给远程持久化存储设备
状态后端是插件化实现的 Flink 提供三种状态后端实现 包括基于内存 基于磁盘和基于 RocksDB
StateBackend 是用于实现用户自定义状态后端的接口

// 配置 RocksDBStateBackend 为 Flink 应用的状态后端
final String checkpointDir = params.get(“checkpoint-dir”);
boolean incrementalCheckpoints = params.getBoolean(“incremental-checkpoints”, false);
env.setStateBackend(new RocksDBStateBackend(checkpointDir, incrementalCheckpoints));
2.2 检查点(Checkpointing)开启

流应用失败不应该影响计算正确性
流应用失败不应该丢失状态 因为其可能是不可恢复的
检查点机制指的是在流应用运行的某个时间点 对应用中所有内置状态和状态函数进行快照
检查点机制和状态恢复机制保证对流应用的状态的有且仅有一次的一致性保证
检查点开启需要设置一个运行周期 决定正常流处理中检查点运行的开销和失败后恢复的时间

val env = StreamExecutionEnvironment.getExecutionEnvironment
// set checkpointing interval to 10 seconds (10000 milliseconds)
env.enableCheckpointing(10000L)
2.3 状态算子的更新
保存点(savepoint)机制保证不会因更新状态算子而停止的应用在重启时丢失状态
2.4 调节状态应用性能
2.5 避免状态泄漏
3. 可查询状态(Queryable State)
键状态可以以只读的键值形式暴露给外部系统
3.1 可查询状态服务构成

QueryableStateClient 供外部系统使用的访问键状态的客户端
QueryableStateClientProxy 接受和响应客户端请求 每个 TM 运行一个该实例 因为键状态分布于所有算子实例 代理需要实现键对应的状态状态维护于哪个 TM 中 该信息维护于 JM 中
QueryableStateServer 对 ClientProxy 请求发起响应 每个 TM 运行一个该实例用于访问本地状态后端的键状态

3.2 可查询状态的暴露
在 open 方法中为键状态设置可查询状态
override def open(parameters: Configuration): Unit = {
// create state descriptor
val lastTempDescriptor =
new ValueStateDescriptor[Double](“lastTemp”, classOf[Double])
// enable queryable state and set its external identifier
lastTempDescriptor.setQueryable(“lastTemperature”)
// obtain the state handle
lastTempState = getRuntimeContext
.getState[Double](lastTempDescriptor)
}
将流写入一个可查询状态的 sink
tenSecsMaxTemps
.keyBy(_._1)
.asQueryableState(“maxTemperature”)
3.3 从外部系统访问可查询状态
通过引入依赖来获取 QueryableStateClient 相关代码
<dependency>
<groupid>org.apache.flink</groupid>
<artifactid>flink-queryable-state-client-java_2.11</artifactid>
<version>1.5.0</version>
</dependency>
创建访问可查询状态的客户端
//tmHostName 是任意 TM 的 IP 地址
val client: QueryableStateClient = new QueryableStateClient(tmHostname, proxyPort)

正文完
 0