共计 3743 个字符,预计需要花费 10 分钟才能阅读完成。
序
本文主要研究一下 flink 的 Queryable State
实例
Job
@Test
public void testValueStateForQuery() throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment
.createRemoteEnvironment(“192.168.99.100”, 8081, SubmitTest.JAR_FILE);
env.addSource(new RandomTuple2Source())
.keyBy(0) //key by first value of tuple
.flatMap(new CountWindowAverage())
.print();
JobExecutionResult result = env.execute(“testQueryableState”);
LOGGER.info(“submit job result:{}”,result);
}
这里运行一个 job,它对 tuple 的第一个值作为 key,然后 flatMap 操作使用的是 CountWindowAverage
CountWindowAverage
public class CountWindowAverage extends RichFlatMapFunction<Tuple2<Long, Long>, Tuple2<Long, Long>> {
private transient ValueState<Tuple2<Long, Long>> sum; // a tuple containing the count and the sum
@Override
public void flatMap(Tuple2<Long, Long> input, Collector<Tuple2<Long, Long>> out) throws Exception {
Tuple2<Long, Long> currentSum = sum.value();
if(currentSum == null){
currentSum = Tuple2.of(1L,input.f1);
}else{
currentSum.f0 += 1;
currentSum.f1 += input.f1;
}
sum.update(currentSum);
if (currentSum.f0 >= 2) {
out.collect(new Tuple2<>(input.f0, currentSum.f1 / currentSum.f0));
sum.clear();
}
}
@Override
public void open(Configuration config) {
ValueStateDescriptor<Tuple2<Long, Long>> descriptor =
new ValueStateDescriptor<>(
“average”, // the state name
TypeInformation.of(new TypeHint<Tuple2<Long, Long>>() {})); // type information
descriptor.setQueryable(“query-name”);
sum = getRuntimeContext().getState(descriptor);
}
}
CountWindowAverage 通过 ValueStateDescriptor 的 setQueryable(“query-name”) 方法,将 state 声明为是 queryable 的
QueryableStateClient
@Test
public void testQueryStateByJobId() throws InterruptedException, IOException {
//get jobId from flink ui running job page
JobID jobId = JobID.fromHexString(“793edfa93f354aa0274f759cb13ce79e”);
long key = 1L;
//flink-core-1.7.0-sources.jar!/org/apache/flink/configuration/QueryableStateOptions.java
QueryableStateClient client = new QueryableStateClient(“192.168.99.100”, 9069);
// the state descriptor of the state to be fetched.
ValueStateDescriptor<Tuple2<Long, Long>> descriptor =
new ValueStateDescriptor<>(
“average”,
TypeInformation.of(new TypeHint<Tuple2<Long, Long>>() {}));
CompletableFuture<ValueState<Tuple2<Long, Long>>> resultFuture =
client.getKvState(jobId, “query-name”, key, BasicTypeInfo.LONG_TYPE_INFO, descriptor);
LOGGER.info(“get kv state return future, waiting……”);
// org.apache.flink.queryablestate.exceptions.UnknownKeyOrNamespaceException: Queryable State Server : No state for the specified key/namespace.
ValueState<Tuple2<Long, Long>> res = resultFuture.join();
LOGGER.info(“query result:{}”,res.value());
client.shutdownAndWait();
}
这里通过 QueryableStateClient 连接 QueryableStateClientProxy 进行 query state;这里的 jobId 可以在 job 提交之后,通过 ui 界面查询得到,然后使用 JobID.fromHexString 方法转为 JobID 对象
小结
Queryable State 的功能目前是 beta 版本,flink1.7 的发行版默认没有开启,要开启的话,需要将 flink-queryable-state-runtime_2.11-1.7.0.jar 拷贝到 /opt/flink/lib/ 目录下,这样子 task manager 启动的时候会打印诸如 Started Queryable State Proxy Server @ /172.20.0.3:9069 的日志,这样子就可以确认是启用了该功能
Queryable State 在架构上涉及三个组件,一个是 QueryableStateServer,它会在每个 task manager 上运行,负责本地 state 存储;一个是 QueryableStateClientProxy,它也在每个 task manager 上运行,负责接收 client 发来的查询请求,然后从对应的 task manager 上获取对应的 state,然后返回给 client;一个是 QueryableStateClient,它就是通常是运行在 flink cluster 之外,用于提交用户的 state query
QueryableStateServer 以及 QueryableStateClientProxy 均有 ports、network-threads、query-threads 的属性可以配置;QueryableStateServer 默认的 query.server.ports 值为 9097;QueryableStateClientProxy 默认的 query.proxy.ports 值为 9096,client 端需要使用这个端口来进行请求
声明 state 为 queryable 有两个方法,一个是通过 KeyedStream.asQueryableState 方法转为 QueryableStateStream;一个是调用 Managed keyed State 的 StateDescriptor 的 setQueryable 进行声明;这两个的区别在于 asQueryableState 必须是直接作用于 KeyedStream 对象,因此 KeyedStream 就不能做后续的 transform 操作,类似于 sink;而通过 StateDescriptor 的 setQueryable 进行声明则相对灵活一点;这里要注意没有 queryable ListState
Queryable State 目前有几点限制,一个是它生命周期跟 task 一样,在 task 运行完的时候就销毁了,没办法查询,后续可能支持在 task 完成之后查询;一个是目前的 KvState 的 Notifications 进行使用 tell 机制,后续可能改为 ack 模式;一个是目前 query 的 statistics 默认是禁用的,后续可能支持发布到 metrics system
doc
Queryable State Beta