A Look at Flink's ProcessFunction

This article takes a look at Flink's ProcessFunction.
Example
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.streaming.api.functions.ProcessFunction.Context;
import org.apache.flink.streaming.api.functions.ProcessFunction.OnTimerContext;
import org.apache.flink.util.Collector;

// the source data stream
DataStream<Tuple2<String, String>> stream = …;

// apply the process function onto a keyed stream
DataStream<Tuple2<String, Long>> result = stream
    .keyBy(0)
    .process(new CountWithTimeoutFunction());

/**
 * The data type stored in the state
 */
public class CountWithTimestamp {

    public String key;
    public long count;
    public long lastModified;
}

/**
 * The implementation of the ProcessFunction that maintains the count and timeouts
 */
public class CountWithTimeoutFunction extends ProcessFunction<Tuple2<String, String>, Tuple2<String, Long>> {

    /** The state that is maintained by this process function */
    private ValueState<CountWithTimestamp> state;

    @Override
    public void open(Configuration parameters) throws Exception {
        state = getRuntimeContext().getState(new ValueStateDescriptor<>("myState", CountWithTimestamp.class));
    }

    @Override
    public void processElement(Tuple2<String, String> value, Context ctx, Collector<Tuple2<String, Long>> out)
            throws Exception {

        // retrieve the current count
        CountWithTimestamp current = state.value();
        if (current == null) {
            current = new CountWithTimestamp();
            current.key = value.f0;
        }

        // update the state's count
        current.count++;

        // set the state's timestamp to the record's assigned event time timestamp
        // (ctx.timestamp() returns null if no timestamp was assigned upstream, which would fail here)
        current.lastModified = ctx.timestamp();

        // write the state back
        state.update(current);

        // schedule the next timer 60 seconds from the current event time
        ctx.timerService().registerEventTimeTimer(current.lastModified + 60000);
    }

    @Override
    public void onTimer(long timestamp, OnTimerContext ctx, Collector<Tuple2<String, Long>> out)
            throws Exception {

        // get the state for the key that scheduled the timer
        CountWithTimestamp result = state.value();

        // check if this is an outdated timer or the latest timer
        if (timestamp == result.lastModified + 60000) {
            // emit the state on timeout
            out.collect(new Tuple2<String, Long>(result.key, result.count));
        }
    }
}

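This example shows how to use keyed state and timers inside a ProcessFunction; the ProcessFunction passed to the process method is CountWithTimeoutFunction.
The open method of CountWithTimeoutFunction creates a ValueState of type CountWithTimestamp. processElement updates this ValueState to record, for each key, the number of elements seen and the time of the last update (the element's event-time timestamp), and then registers an event-time timer that fires 60 seconds after that timestamp.
onTimer responds to the timer: it emits the key and its count only if the key has not been updated for 60 seconds, that is, only when the firing timer is the latest one registered for that key (timestamp == lastModified + 60000). Timers made stale by later updates are ignored.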
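To make the stale-timer check concrete, here is a dependency-free Java sketch that replays the same logic outside Flink. It is a simulation under stated assumptions, not Flink code: a HashMap stands in for the per-key ValueState, a PriorityQueue stands in for the event-time TimerService, and the hypothetical advanceWatermark method models the rule that an event-time timer fires once the watermark reaches its timestamp. All class and method names below are illustrative.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.PriorityQueue;

// Hypothetical plain-Java simulation of CountWithTimeoutFunction (no Flink dependency).
public class CountWithTimeoutSimulation {

    static final long TIMEOUT = 60_000L;

    // same fields as CountWithTimestamp in the Flink example
    static final class CountWithTimestamp {
        String key;
        long count;
        long lastModified;
    }

    // a registered event-time timer: the key it belongs to and when it fires
    static final class Timer {
        final String key;
        final long timestamp;

        Timer(String key, long timestamp) {
            this.key = key;
            this.timestamp = timestamp;
        }
    }

    // stand-in for the per-key ValueState
    final Map<String, CountWithTimestamp> state = new HashMap<>();
    // stand-in for the event-time TimerService, ordered by firing time
    final PriorityQueue<Timer> timers =
            new PriorityQueue<Timer>((a, b) -> Long.compare(a.timestamp, b.timestamp));
    // stand-in for the Collector
    final List<String> output = new ArrayList<>();

    // mirrors processElement: bump the count, record the event time, register a timer
    void processElement(String key, long eventTime) {
        CountWithTimestamp current = state.get(key);
        if (current == null) {
            current = new CountWithTimestamp();
            current.key = key;
        }
        current.count++;
        current.lastModified = eventTime;
        state.put(key, current);
        timers.add(new Timer(key, current.lastModified + TIMEOUT));
    }

    // models event time progressing: every timer with timestamp <= watermark fires, in order
    void advanceWatermark(long watermark) {
        while (!timers.isEmpty() && timers.peek().timestamp <= watermark) {
            Timer t = timers.poll();
            onTimer(t.key, t.timestamp);
        }
    }

    // mirrors onTimer: only the latest timer for a key emits; stale timers are ignored
    void onTimer(String key, long timestamp) {
        CountWithTimestamp result = state.get(key);
        if (timestamp == result.lastModified + TIMEOUT) {
            output.add("(" + result.key + "," + result.count + ")");
        }
    }

    static CountWithTimeoutSimulation runDemo() {
        CountWithTimeoutSimulation sim = new CountWithTimeoutSimulation();
        sim.processElement("a", 1_000L);   // timer for a at 61_000
        sim.processElement("a", 30_000L);  // new timer for a at 90_000; the 61_000 one is now stale
        sim.processElement("b", 5_000L);   // timer for b at 65_000
        sim.advanceWatermark(70_000L);     // fires a@61_000 (ignored) and b@65_000 (emits)
        sim.advanceWatermark(100_000L);    // fires a@90_000 (emits)
        return sim;
    }

    public static void main(String[] args) {
        System.out.println(runDemo().output); // prints [(b,1), (a,2)]
    }
}
```

Key a is updated at 1 s and again at 30 s, so when its first timer fires at 61 s, lastModified + 60000 is 90000 and nothing is emitted; only the 90 s timer emits (a,2). Key b receives no further updates, so it is emitted when its 65 s timer fires.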

ProcessFunction
flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/functions/ProcessFunction.java
@PublicEvolving
public abstract class ProcessFunction<I, O> extends AbstractRichFunction {

    private static final long serialVersionUID = 1L;

    public abstract void processElement(I value, Context ctx, Collector<O> out) throws Exception;

    public void onTimer(long timestamp, OnTimerContext ctx, Collector<O> out) throws Exception {}

    public abstract class Context {

        public abstract Long timestamp();

        public abstract TimerService timerService();

        public abstract <X> void output(OutputTag<X> outputTag, X value);
    }

    public abstract class OnTimerContext extends Context {

        public abstract TimeDomain timeDomain();
    }

}

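ProcessFunction extends AbstractRichFunction, so it can obtain keyed state through its RuntimeContext. It declares the abstract method processElement and the abstract inner classes Context and OnTimerContext.
Context has three abstract methods: timestamp, timerService and output. OnTimerContext extends Context and adds the timeDomain method.
ProcessFunction also defines an onTimer method (a no-op by default), which is called when a timer registered through the TimerService fires.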
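The division of labour here (the user implements processElement and optionally onTimer, while the runtime creates the concrete Context/OnTimerContext and drives the timers) can be sketched without Flink. The following is a hypothetical, simplified model, not Flink's API: MiniProcessFunction, MiniRuntime, the String tag used in place of OutputTag, and registerEventTimeTimer placed directly on the context (in Flink it lives on the TimerService returned by timerService()) are all assumptions for illustration. The nested classes are static here for simplicity, whereas Flink's are inner classes, and only event-time timers are modelled, so timeDomain() always returns EVENT_TIME.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeSet;

// Hypothetical, dependency-free model of ProcessFunction's shape; not Flink's API.
public class MiniProcessFunctionDemo {

    enum TimeDomain { EVENT_TIME, PROCESSING_TIME }

    // loosely mirrors ProcessFunction<I, O>: one required method, one optional callback
    static abstract class MiniProcessFunction<I, O> {
        abstract void processElement(I value, Context ctx, List<O> out);

        // like ProcessFunction.onTimer: does nothing unless overridden
        void onTimer(long timestamp, OnTimerContext ctx, List<O> out) {}

        static abstract class Context {
            abstract Long timestamp();                        // element or timer timestamp
            abstract void registerEventTimeTimer(long time);  // stand-in for timerService()
            abstract void output(String tag, Object value);   // stand-in for output(OutputTag, X)
        }

        static abstract class OnTimerContext extends Context {
            abstract TimeDomain timeDomain();                 // which clock fired the timer
        }
    }

    // hypothetical runtime: it, not the user, supplies the concrete context
    static final class MiniRuntime<I, O> {
        final MiniProcessFunction<I, O> fn;
        final List<O> mainOutput = new ArrayList<>();
        final Map<String, List<Object>> sideOutputs = new HashMap<>();
        final TreeSet<Long> timers = new TreeSet<>();  // a set, so duplicate timestamps collapse

        MiniRuntime(MiniProcessFunction<I, O> fn) {
            this.fn = fn;
        }

        private final class Ctx extends MiniProcessFunction.OnTimerContext {
            private final Long ts;
            Ctx(Long ts) { this.ts = ts; }
            @Override Long timestamp() { return ts; }
            @Override void registerEventTimeTimer(long time) { timers.add(time); }
            @Override void output(String tag, Object value) {
                sideOutputs.computeIfAbsent(tag, k -> new ArrayList<>()).add(value);
            }
            @Override TimeDomain timeDomain() { return TimeDomain.EVENT_TIME; }
        }

        void processElement(I value, long ts) {
            fn.processElement(value, new Ctx(ts), mainOutput);
        }

        // fire every event-time timer whose timestamp is <= the watermark, in order
        void advanceWatermark(long watermark) {
            while (!timers.isEmpty() && timers.first() <= watermark) {
                long t = timers.pollFirst();
                fn.onTimer(t, new Ctx(t), mainOutput);
            }
        }
    }

    static MiniRuntime<Integer, String> runDemo() {
        MiniProcessFunction<Integer, String> fn = new MiniProcessFunction<Integer, String>() {
            @Override
            void processElement(Integer value, Context ctx, List<String> out) {
                if (value < 0) {
                    ctx.output("negatives", value);  // route to a side output
                    return;
                }
                out.add("value=" + value + "@" + ctx.timestamp());
                ctx.registerEventTimeTimer(ctx.timestamp() + 10);
            }

            @Override
            void onTimer(long timestamp, OnTimerContext ctx, List<String> out) {
                out.add("timer@" + timestamp + " " + ctx.timeDomain());
            }
        };
        MiniRuntime<Integer, String> rt = new MiniRuntime<>(fn);
        rt.processElement(5, 100L);   // emits value=5@100, registers a timer at 110
        rt.processElement(-3, 105L);  // goes to the "negatives" side output
        rt.advanceWatermark(150L);    // the timer at 110 fires
        rt.processElement(7, 200L);   // emits value=7@200, registers a timer at 210
        return rt;
    }

    public static void main(String[] args) {
        MiniRuntime<Integer, String> rt = runDemo();
        System.out.println(rt.mainOutput);   // [value=5@100, timer@110 EVENT_TIME, value=7@200]
        System.out.println(rt.sideOutputs);  // {negatives=[-3]}
    }
}
```

In Flink itself the concrete contexts come from the operator that wraps the function, and values sent through Context.output are read back with getSideOutput(OutputTag) on the SingleOutputStreamOperator returned by process().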

Summary

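ProcessFunction is a low-level stream processing operation. It can be thought of as a FlatMapFunction with access to keyed state and timers, so it is the tool to reach for when you need keyed state or timers (both are available only when the function is applied to a keyed stream).
ProcessFunction extends AbstractRichFunction, so it can obtain keyed state through its RuntimeContext. It declares the abstract method processElement and the abstract inner classes Context and OnTimerContext.
Context has three abstract methods: timestamp, timerService and output. OnTimerContext extends Context and adds the timeDomain method. ProcessFunction also defines an onTimer method, which is called when a timer registered through the TimerService fires.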

doc
Process Function (Low-level Operations)
