在Flink的工夫与watermarks详解这篇文章中,论述了Flink的工夫与水位线的相干内容。你可能不禁要提问,该如何拜访工夫戳和水位线呢?首先通过一般的DataStream API是无法访问的,须要借助Flink提供的一个底层的API——Process Function。Process Function不仅可能拜访工夫戳与水位线,而且还能够注册在未来的某个特定工夫触发的计时器(timers)。除此之外,还能够将数据通过Side Outputs发送到多个输入流中。这样以来,能够实现数据分流的性能,同时也是解决早退数据的一种形式。上面咱们将从源码动手,联合具体的应用案例来阐明该如何应用Process Function。
<!– more –>
简介
Flink提供了很多Process Function,每种Process Function都有各自的性能,这些Process Function次要包含:
- ProcessFunction
- KeyedProcessFunction
- CoProcessFunction
- ProcessJoinFunction
- ProcessWindowFunction
- ProcessAllWindowFunction
BaseBroadcastProcessFunction
- KeyedBroadcastProcessFunction
- BroadcastProcessFunction
继承关系图如下:
从下面的继承关系中能够看出,都实现了RichFunction接口,所以反对应用open()
、close()
、getRuntimeContext()
等办法的调用。从名字上能够看出,这些函数都有不同的实用场景,然而根本的性能是相似的,上面会以KeyedProcessFunction为例来探讨这些函数的通用性能。
源码
KeyedProcessFunction
/**
* 解决KeyedStream流的低级API函数
* 对于输出流中的每个元素都会触发调用processElement办法.该办法会产生0个或多个输入.
* 其实现类能够通过Context拜访数据的工夫戳和计时器(timers).当计时器(timers)触发时,会回调onTimer办法.
* onTimer办法会产生0个或者多个输入,并且会注册一个将来的计时器.
*
* 留神:如果要拜访keyed state和计时器(timers),必须在KeyedStream上应用KeyedProcessFunction.
* 另外,KeyedProcessFunction的父类AbstractRichFunction实现了RichFunction接口,所以,能够应用
* open(),close()及getRuntimeContext()办法.
*
* @param <K> key的类型
* @param <I> 输出元素的数据类型
* @param <O> 输入元素的数据类型
*/
@PublicEvolving
public abstract class KeyedProcessFunction<K, I, O> extends AbstractRichFunction {
private static final long serialVersionUID = 1L;
/**
* 解决输出流中的每个元素
* 该办法会输入0个或者多个输入,相似于FlatMap的性能
* 除此之外,该办法还能够更新外部状态或者设置计时器(timer)
* @param value 输出元素
* @param ctx Context,能够拜访输出元素的工夫戳,并其能够获取一个工夫服务器(TimerService),用于注册计时器(timers)并查问工夫
* Context只有在processElement被调用期间无效.
* @param out 返回的后果值
* @throws Exception
*/
public abstract void processElement(I value, Context ctx, Collector<O> out) throws Exception;
/**
* 是一个回调函数,当在TimerService中注册的计时器(timers)被触发时,会回调该函数
* @param timestamp 触发计时器(timers)的工夫戳
* @param ctx OnTimerContext,容许拜访工夫戳,TimeDomain枚举类提供了两种工夫类型:
* EVENT_TIME与PROCESSING_TIME
* 并其能够获取一个工夫服务器(TimerService),用于注册计时器(timers)并查问工夫
* OnTimerContext只有在onTimer办法被调用期间无效
* @param out 后果输入
* @throws Exception
*/
public void onTimer(long timestamp, OnTimerContext ctx, Collector<O> out) throws Exception {}
/**
* 仅仅在processElement()办法或者onTimer办法被调用期间无效
*/
public abstract class Context {
/**
* 以后被解决元素的工夫戳,或者是触发计时器(timers)时的工夫戳
* 该值可能为null,比方当程序中设置的工夫语义为:TimeCharacteristic#ProcessingTime
* @return
*/
public abstract Long timestamp();
/**
* 拜访工夫和注册的计时器(timers)
* @return
*/
public abstract TimerService timerService();
/**
* 将元素输入到side output (侧输入)
* @param outputTag 侧输入的标记
* @param value 输入的记录
* @param <X>
*/
public abstract <X> void output(OutputTag<X> outputTag, X value);
/**
* 获取被解决元素的key
* @return
*/
public abstract K getCurrentKey();
}
/**
* 当onTimer办法被调用时,才能够应用OnTimerContext
*/
public abstract class OnTimerContext extends Context {
/**
* 触发计时器(timers)的工夫类型,包含两种:EVENT_TIME与PROCESSING_TIME
* @return
*/
public abstract TimeDomain timeDomain();
/**
* 获取触发计时器(timer)元素的key
* @return
*/
@Override
public abstract K getCurrentKey();
}
}
下面的源码中,次要有两个办法,剖析如下:
- processElement(I value, Context ctx, Collector<O> out)
该办法会对流中的每条记录都调用一次,输入0个或者多个元素,相似于FlatMap的性能,通过Collector将后果收回。除此之外,该函数有一个Context 参数,用户能够通过Context 拜访工夫戳、以后记录的key值以及TimerService(对于TimerService,上面会具体解释)。另外还能够应用output办法将数据发送到side output,实现分流或者解决早退数据的性能。
- onTimer(long timestamp, OnTimerContext ctx, Collector<O> out)
该办法是一个回调函数,当在TimerService中注册的计时器(timers)被触发时,会回调该函数。其中@param timestamp
参数示意触发计时器(timers)的工夫戳,Collector能够将记录收回。仔细的你可能会发现,这两个办法都有一个上下文参数,下面的办法传递的是Context 参数,onTimer办法传递的是OnTimerContext参数,这两个参数对象能够实现类似的性能。OnTimerContext还能够返回触发计时器的工夫域(EVENT_TIME与PROCESSING_TIME)。
TimerService
在KeyedProcessFunction源码中,应用TimerService来拜访工夫和计时器,上面来看一下源码:
@PublicEvolving
public interface TimerService {
String UNSUPPORTED_REGISTER_TIMER_MSG = "Setting timers is only supported on a keyed streams.";
String UNSUPPORTED_DELETE_TIMER_MSG = "Deleting timers is only supported on a keyed streams.";
// 返回以后的解决工夫
long currentProcessingTime();
// 返回以后event-time水位线(watermark)
long currentWatermark();
/**
* 注册一个计时器(timers),当processing time的工夫等于该计时器时钟时会被调用
* @param time
*/
void registerProcessingTimeTimer(long time);
/**
* 注册一个计时器(timers),当event time的水位线(watermark)达到该工夫时会被触发
* @param time
*/
void registerEventTimeTimer(long time);
/**
* 依据给定的触发工夫(trigger time)来删除processing-time计时器
* 如果这个timer不存在,那么该办法不会起作用,
* 即该计时器(timer)之前曾经被注册了,并且没有过期
*
* @param time
*/
void deleteProcessingTimeTimer(long time);
/**
* 依据给定的触发工夫(trigger time)来删除event-time 计时器
* 如果这个timer不存在,那么该办法不会起作用,
* 即该计时器(timer)之前曾经被注册了,并且没有过期
* @param time
*/
void deleteEventTimeTimer(long time);
}
TimerService提供了以下几种办法:
- currentProcessingTime()
返回以后的解决工夫
- currentWatermark()
返回以后event-time水位线(watermark)工夫戳
- registerProcessingTimeTimer(long time)
针对以后key,注册一个processing time计时器(timers),当processing time的工夫等于该计时器时钟时会被调用
- registerEventTimeTimer(long time)
针对以后key,注册一个event time计时器(timers),当水位线工夫戳大于等于该计时器时钟时会被调用
- deleteProcessingTimeTimer(long time)
针对以后key,删除一个之前注册过的processing time计时器(timers),如果这个timer不存在,那么该办法不会起作用
- deleteEventTimeTimer(long time)
针对以后key,删除一个之前注册过的event time计时器(timers),如果这个timer不存在,那么该办法不会起作用
当计时器触发时,会回调onTimer()函数,零碎对于ProcessElement()办法和onTimer()办法的调用是同步的
留神:下面的源码中有两个Error 信息,这就阐明计时器只能在keyed streams上应用,常见的用处是在某些key值不在应用后革除keyed state,或者实现一些基于工夫的自定义窗口逻辑。如果要在一个非KeyedStream上应用计时器,能够应用KeySelector返回一个固定的分区值(比方返回一个常数),这样所有的数据只会发送到一个分区。
应用案例
上面将应用Process Function的side output性能进行分流解决,具体代码如下:
public class ProcessFunctionExample {
// 定义side output标签
static final OutputTag<UserBehaviors> buyTags = new OutputTag<UserBehaviors>("buy") {
};
static final OutputTag<UserBehaviors> cartTags = new OutputTag<UserBehaviors>("cart") {
};
static final OutputTag<UserBehaviors> favTags = new OutputTag<UserBehaviors>("fav") {
};
static class SplitStreamFunction extends ProcessFunction<UserBehaviors, UserBehaviors> {
@Override
public void processElement(UserBehaviors value, Context ctx, Collector<UserBehaviors> out) throws Exception {
switch (value.behavior) {
case "buy":
ctx.output(buyTags, value);
break;
case "cart":
ctx.output(cartTags, value);
break;
case "fav":
ctx.output(favTags, value);
break;
default:
out.collect(value);
}
}
}
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment().setParallelism(1);
// 模仿数据源[userId,behavior,product]
SingleOutputStreamOperator<UserBehaviors> splitStream = env.fromElements(
new UserBehaviors(1L, "buy", "iphone"),
new UserBehaviors(1L, "cart", "huawei"),
new UserBehaviors(1L, "buy", "logi"),
new UserBehaviors(1L, "fav", "oppo"),
new UserBehaviors(2L, "buy", "huawei"),
new UserBehaviors(2L, "buy", "onemore"),
new UserBehaviors(2L, "fav", "iphone")).process(new SplitStreamFunction());
//获取分流之后购买行为的数据
splitStream.getSideOutput(buyTags).print("data_buy");
//获取分流之后加购行为的数据
splitStream.getSideOutput(cartTags).print("data_cart");
//获取分流之后珍藏行为的数据
splitStream.getSideOutput(favTags).print("data_fav");
env.execute("ProcessFunctionExample");
}
}
总结
本文首先介绍了Flink提供的几种底层Process Function API,这些API能够拜访工夫戳和水位线,同时反对注册一个计时器,进行调用回调函数onTimer()。接着从源码的角度解读了这些API的独特局部,具体解释了每个办法的具体含意和应用形式。最初,给出了一个Process Function常见应用场景案例,应用其实现分流解决。除此之外,用户还能够应用这些函数,通过注册计时器,在回调函数中定义解决逻辑,应用十分的灵便。
- 关注公众号:大数据技术与数仓
收费支付百G大数据资料
发表回复