## 序

本文主要研究一下flink的ProcessFunction

## 实例

```java
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.streaming.api.functions.ProcessFunction.Context;
import org.apache.flink.streaming.api.functions.ProcessFunction.OnTimerContext;
import org.apache.flink.util.Collector;

// the source data stream
DataStream<Tuple2<String, String>> stream = ...;

// apply the process function onto a keyed stream
DataStream<Tuple2<String, Long>> result = stream
    .keyBy(0)
    .process(new CountWithTimeoutFunction());

/**
 * The data type stored in the state
 */
public class CountWithTimestamp {

    public String key;
    public long count;
    public long lastModified;
}

/**
 * The implementation of the ProcessFunction that maintains the count and timeouts
 */
public class CountWithTimeoutFunction extends ProcessFunction<Tuple2<String, String>, Tuple2<String, Long>> {

    /** The state that is maintained by this process function */
    private ValueState<CountWithTimestamp> state;

    @Override
    public void open(Configuration parameters) throws Exception {
        state = getRuntimeContext().getState(new ValueStateDescriptor<>("myState", CountWithTimestamp.class));
    }

    @Override
    public void processElement(Tuple2<String, String> value, Context ctx, Collector<Tuple2<String, Long>> out)
            throws Exception {

        // retrieve the current count
        CountWithTimestamp current = state.value();
        if (current == null) {
            current = new CountWithTimestamp();
            current.key = value.f0;
        }

        // update the state's count
        current.count++;

        // set the state's timestamp to the record's assigned event time timestamp
        current.lastModified = ctx.timestamp();

        // write the state back
        state.update(current);

        // schedule the next timer 60 seconds from the current event time
        ctx.timerService().registerEventTimeTimer(current.lastModified + 60000);
    }

    @Override
    public void onTimer(long timestamp, OnTimerContext ctx, Collector<Tuple2<String, Long>> out)
            throws Exception {

        // get the state for the key that scheduled the timer
        CountWithTimestamp result = state.value();

        // check if this is an outdated timer or the latest timer
        if (timestamp == result.lastModified + 60000) {
            // emit the state on timeout
            out.collect(new Tuple2<String, Long>(result.key, result.count));
        }
    }
}
```

- 本实例展示了如何在ProcessFunction里头使用keyed state以及timer;
- process方法使用的ProcessFunction是CountWithTimeoutFunction;
- CountWithTimeoutFunction的open方法创建了CountWithTimestamp类型的ValueState;
  - processElement方法里头会更新该ValueState,用于记录每个key的element个数以及最后更新时间(lastModified)。
  - 然后它会注册一个EventTimeTimer,在当前元素的event time之后60秒触发。
  - 注意ctx.timestamp()返回的是Long,元素没有分配时间戳时(例如使用processing time)会返回null。此时赋值给long类型的lastModified会抛出NullPointerException,因此该示例要求数据已分配event time时间戳。
- onTimer用于响应timer。它比较触发的timestamp与lastModified + 60000,判断该timer是否为该key最新注册的那个,即该key在60秒内是否没有再被update;如果是,则emit (key, count)

## ProcessFunction

flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/functions/ProcessFunction.java

```java
@PublicEvolving
public abstract class ProcessFunction<I, O> extends AbstractRichFunction {

    private static final long serialVersionUID = 1L;

    public abstract void processElement(I value, Context ctx, Collector<O> out) throws Exception;

    public void onTimer(long timestamp, OnTimerContext ctx, Collector<O> out) throws Exception {}

    public abstract class Context {

        public abstract Long timestamp();

        public abstract TimerService timerService();

        public abstract <X> void output(OutputTag<X> outputTag, X value);
    }

    public abstract class OnTimerContext extends Context {

        public abstract TimeDomain timeDomain();
    }
}
```

- ProcessFunction继承了AbstractRichFunction(可以通过RuntimeContext获取keyed state),它定义了抽象方法processElement以及抽象类Context、OnTimerContext
- Context里头有三个抽象方法,分别是timestamp、timerService、output;OnTimerContext继承了Context,它定义了timeDomain方法
- ProcessFunction还定义了onTimer方法,用于响应TimerService触发的timer

## 小结

- ProcessFunction是low-level的stream处理操作,它相当于可以访问keyed state及timer的FlatMapFunction。当要使用keyed state或者timer的时候,可以使用ProcessFunction(注意keyed state和timer只能在keyed stream上使用,即需要先keyBy)
- ProcessFunction继承了AbstractRichFunction(可以通过RuntimeContext获取keyed
state),它定义了抽象方法processElement以及抽象类Context、OnTimerContext
- Context里头有三个抽象方法,分别是timestamp、timerService、output;OnTimerContext继承了Context,它定义了timeDomain方法;
- ProcessFunction还定义了onTimer方法,用于响应TimerService触发的timer

## doc

- Process Function (Low-level Operations)