在Flink的工夫与watermarks详解这篇文章中,论述了Flink的工夫与水位线的相干内容。你可能不禁要提问,该如何拜访工夫戳和水位线呢?首先通过一般的DataStream API是无法访问的,须要借助Flink提供的一个底层的API——Process Function。Process Function不仅可能拜访工夫戳与水位线,而且还能够注册在未来的某个特定工夫触发的计时器(timers)。除此之外,还能够将数据通过Side Outputs发送到多个输入流中。这样以来,能够实现数据分流的性能,同时也是解决早退数据的一种形式。上面咱们将从源码动手,联合具体的应用案例来阐明该如何应用Process Function。
<!-- more -->
简介
Flink提供了很多Process Function,每种Process Function都有各自的性能,这些Process Function次要包含:
- ProcessFunction
- KeyedProcessFunction
- CoProcessFunction
- ProcessJoinFunction
- ProcessWindowFunction
- ProcessAllWindowFunction
BaseBroadcastProcessFunction
- KeyedBroadcastProcessFunction
- BroadcastProcessFunction
继承关系图如下:
从下面的继承关系中能够看出,都实现了RichFunction接口,所以反对应用open()
、close()
、getRuntimeContext()
等办法的调用。从名字上能够看出,这些函数都有不同的实用场景,然而根本的性能是相似的,上面会以KeyedProcessFunction为例来探讨这些函数的通用性能。
源码
KeyedProcessFunction
/** * 解决KeyedStream流的低级API函数 * 对于输出流中的每个元素都会触发调用processElement办法.该办法会产生0个或多个输入. * 其实现类能够通过Context拜访数据的工夫戳和计时器(timers).当计时器(timers)触发时,会回调onTimer办法. * onTimer办法会产生0个或者多个输入,并且会注册一个将来的计时器. * * 留神:如果要拜访keyed state和计时器(timers),必须在KeyedStream上应用KeyedProcessFunction. * 另外,KeyedProcessFunction的父类AbstractRichFunction实现了RichFunction接口,所以,能够应用 * open(),close()及getRuntimeContext()办法. * * @param <K> key的类型 * @param <I> 输出元素的数据类型 * @param <O> 输入元素的数据类型 */@PublicEvolvingpublic abstract class KeyedProcessFunction<K, I, O> extends AbstractRichFunction { private static final long serialVersionUID = 1L; /** * 解决输出流中的每个元素 * 该办法会输入0个或者多个输入,相似于FlatMap的性能 * 除此之外,该办法还能够更新外部状态或者设置计时器(timer) * @param value 输出元素 * @param ctx Context,能够拜访输出元素的工夫戳,并其能够获取一个工夫服务器(TimerService),用于注册计时器(timers)并查问工夫 * Context只有在processElement被调用期间无效. * @param out 返回的后果值 * @throws Exception */ public abstract void processElement(I value, Context ctx, Collector<O> out) throws Exception; /** * 是一个回调函数,当在TimerService中注册的计时器(timers)被触发时,会回调该函数 * @param timestamp 触发计时器(timers)的工夫戳 * @param ctx OnTimerContext,容许拜访工夫戳,TimeDomain枚举类提供了两种工夫类型: * EVENT_TIME与PROCESSING_TIME * 并其能够获取一个工夫服务器(TimerService),用于注册计时器(timers)并查问工夫 * OnTimerContext只有在onTimer办法被调用期间无效 * @param out 后果输入 * @throws Exception */ public void onTimer(long timestamp, OnTimerContext ctx, Collector<O> out) throws Exception {} /** * 仅仅在processElement()办法或者onTimer办法被调用期间无效 */ public abstract class Context { /** * 以后被解决元素的工夫戳,或者是触发计时器(timers)时的工夫戳 * 该值可能为null,比方当程序中设置的工夫语义为:TimeCharacteristic#ProcessingTime * @return */ public abstract Long timestamp(); /** * 拜访工夫和注册的计时器(timers) * @return */ public abstract TimerService timerService(); /** * 将元素输入到side output (侧输入) * @param outputTag 侧输入的标记 * @param value 输入的记录 * @param <X> */ public abstract <X> void output(OutputTag<X> outputTag, X value); /** * 获取被解决元素的key * @return */ public abstract K getCurrentKey(); } /** * 当onTimer办法被调用时,才能够应用OnTimerContext */ public abstract class OnTimerContext extends Context { /** * 触发计时器(timers)的工夫类型,包含两种:EVENT_TIME与PROCESSING_TIME * @return */ public abstract TimeDomain timeDomain(); /** * 获取触发计时器(timer)元素的key * @return */ @Override public abstract K getCurrentKey(); }}
下面的源码中,次要有两个办法,剖析如下:
- processElement(I value, Context ctx, Collector<O> out)
该办法会对流中的每条记录都调用一次,输入0个或者多个元素,相似于FlatMap的性能,通过Collector将后果收回。除此之外,该函数有一个Context 参数,用户能够通过Context 拜访工夫戳、以后记录的key值以及TimerService(对于TimerService,上面会具体解释)。另外还能够应用output办法将数据发送到side output,实现分流或者解决早退数据的性能。
- onTimer(long timestamp, OnTimerContext ctx, Collector<O> out)
该办法是一个回调函数,当在TimerService中注册的计时器(timers)被触发时,会回调该函数。其中@param timestamp
参数示意触发计时器(timers)的工夫戳,Collector能够将记录收回。仔细的你可能会发现,这两个办法都有一个上下文参数,下面的办法传递的是Context 参数,onTimer办法传递的是OnTimerContext参数,这两个参数对象能够实现类似的性能。OnTimerContext还能够返回触发计时器的工夫域(EVENT_TIME与PROCESSING_TIME)。
TimerService
在KeyedProcessFunction源码中,应用TimerService来拜访工夫和计时器,上面来看一下源码:
@PublicEvolvingpublic interface TimerService { String UNSUPPORTED_REGISTER_TIMER_MSG = "Setting timers is only supported on a keyed streams."; String UNSUPPORTED_DELETE_TIMER_MSG = "Deleting timers is only supported on a keyed streams."; // 返回以后的解决工夫 long currentProcessingTime(); // 返回以后event-time水位线(watermark) long currentWatermark(); /** * 注册一个计时器(timers),当processing time的工夫等于该计时器时钟时会被调用 * @param time */ void registerProcessingTimeTimer(long time); /** * 注册一个计时器(timers),当event time的水位线(watermark)达到该工夫时会被触发 * @param time */ void registerEventTimeTimer(long time); /** * 依据给定的触发工夫(trigger time)来删除processing-time计时器 * 如果这个timer不存在,那么该办法不会起作用, * 即该计时器(timer)之前曾经被注册了,并且没有过期 * * @param time */ void deleteProcessingTimeTimer(long time); /** * 依据给定的触发工夫(trigger time)来删除event-time 计时器 * 如果这个timer不存在,那么该办法不会起作用, * 即该计时器(timer)之前曾经被注册了,并且没有过期 * @param time */ void deleteEventTimeTimer(long time);}
TimerService提供了以下几种办法:
- currentProcessingTime()
返回以后的解决工夫
- currentWatermark()
返回以后event-time水位线(watermark)工夫戳
- registerProcessingTimeTimer(long time)
针对以后key,注册一个processing time计时器(timers),当processing time的工夫等于该计时器时钟时会被调用
- registerEventTimeTimer(long time)
针对以后key,注册一个event time计时器(timers),当水位线工夫戳大于等于该计时器时钟时会被调用
- deleteProcessingTimeTimer(long time)
针对以后key,删除一个之前注册过的processing time计时器(timers),如果这个timer不存在,那么该办法不会起作用
- deleteEventTimeTimer(long time)
针对以后key,删除一个之前注册过的event time计时器(timers),如果这个timer不存在,那么该办法不会起作用
当计时器触发时,会回调onTimer()函数,零碎对于ProcessElement()办法和onTimer()办法的调用是同步的
留神:下面的源码中有两个Error 信息,这就阐明计时器只能在keyed streams上应用,常见的用处是在某些key值不在应用后革除keyed state,或者实现一些基于工夫的自定义窗口逻辑。如果要在一个非KeyedStream上应用计时器,能够应用KeySelector返回一个固定的分区值(比方返回一个常数),这样所有的数据只会发送到一个分区。
应用案例
上面将应用Process Function的side output性能进行分流解决,具体代码如下:
public class ProcessFunctionExample { // 定义side output标签 static final OutputTag<UserBehaviors> buyTags = new OutputTag<UserBehaviors>("buy") { }; static final OutputTag<UserBehaviors> cartTags = new OutputTag<UserBehaviors>("cart") { }; static final OutputTag<UserBehaviors> favTags = new OutputTag<UserBehaviors>("fav") { }; static class SplitStreamFunction extends ProcessFunction<UserBehaviors, UserBehaviors> { @Override public void processElement(UserBehaviors value, Context ctx, Collector<UserBehaviors> out) throws Exception { switch (value.behavior) { case "buy": ctx.output(buyTags, value); break; case "cart": ctx.output(cartTags, value); break; case "fav": ctx.output(favTags, value); break; default: out.collect(value); } } } public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment().setParallelism(1); // 模仿数据源[userId,behavior,product] SingleOutputStreamOperator<UserBehaviors> splitStream = env.fromElements( new UserBehaviors(1L, "buy", "iphone"), new UserBehaviors(1L, "cart", "huawei"), new UserBehaviors(1L, "buy", "logi"), new UserBehaviors(1L, "fav", "oppo"), new UserBehaviors(2L, "buy", "huawei"), new UserBehaviors(2L, "buy", "onemore"), new UserBehaviors(2L, "fav", "iphone")).process(new SplitStreamFunction()); //获取分流之后购买行为的数据 splitStream.getSideOutput(buyTags).print("data_buy"); //获取分流之后加购行为的数据 splitStream.getSideOutput(cartTags).print("data_cart"); //获取分流之后珍藏行为的数据 splitStream.getSideOutput(favTags).print("data_fav"); env.execute("ProcessFunctionExample"); }}
总结
本文首先介绍了Flink提供的几种底层Process Function API,这些API能够拜访工夫戳和水位线,同时反对注册一个计时器,进行调用回调函数onTimer()。接着从源码的角度解读了这些API的独特局部,具体解释了每个办法的具体含意和应用形式。最初,给出了一个Process Function常见应用场景案例,应用其实现分流解决。除此之外,用户还能够应用这些函数,通过注册计时器,在回调函数中定义解决逻辑,应用十分的灵便。
- 关注公众号:大数据技术与数仓
收费支付百G大数据资料