关于flink:Flink-DataStream-API-中的多面手Process-Function详解

9次阅读

共计 6688 个字符,预计需要花费 17 分钟才能阅读完成。

在 Flink 的工夫与 watermarks 详解这篇文章中,论述了 Flink 的工夫与水位线的相干内容。你可能不禁要提问,该如何拜访工夫戳和水位线呢?首先通过一般的 DataStream API 是无法访问的,须要借助 Flink 提供的一个底层的 API——Process Function。Process Function 不仅可能拜访工夫戳与水位线,而且还能够注册在未来的某个特定工夫触发的计时器(timers)。除此之外,还能够将数据通过 Side Outputs 发送到多个输入流中。这样以来,能够实现数据分流的性能,同时也是解决早退数据的一种形式。上面咱们将从源码动手,联合具体的应用案例来阐明该如何应用 Process Function。

<!– more –>

简介

Flink 提供了很多 Process Function,每种 Process Function 都有各自的性能,这些 Process Function 次要包含:

  • ProcessFunction
  • KeyedProcessFunction
  • CoProcessFunction
  • ProcessJoinFunction
  • ProcessWindowFunction
  • ProcessAllWindowFunction
  • BaseBroadcastProcessFunction

    • KeyedBroadcastProcessFunction
  • BroadcastProcessFunction

继承关系图如下:

从下面的继承关系中能够看出,都实现了 RichFunction 接口,所以反对应用 open()close()getRuntimeContext() 等办法的调用。从名字上能够看出,这些函数都有不同的实用场景,然而根本的性能是相似的,上面会以 KeyedProcessFunction 为例来探讨这些函数的通用性能。

源码

KeyedProcessFunction

/**
 * 解决 KeyedStream 流的低级 API 函数
 * 对于输出流中的每个元素都会触发调用 processElement 办法. 该办法会产生 0 个或多个输入.
 * 其实现类能够通过 Context 拜访数据的工夫戳和计时器 (timers). 当计时器(timers) 触发时,会回调 onTimer 办法.
 * onTimer 办法会产生 0 个或者多个输入,并且会注册一个将来的计时器.
 *
 * 留神:如果要拜访 keyed state 和计时器(timers),必须在 KeyedStream 上应用 KeyedProcessFunction.
 * 另外,KeyedProcessFunction 的父类 AbstractRichFunction 实现了 RichFunction 接口,所以,能够应用
 * open(),close()及 getRuntimeContext()办法.
 *
 * @param <K> key 的类型
 * @param <I> 输出元素的数据类型
 * @param <O> 输入元素的数据类型
 */
@PublicEvolving
public abstract class KeyedProcessFunction<K, I, O> extends AbstractRichFunction {
​
 private static final long serialVersionUID = 1L;
 /**
 * 解决输出流中的每个元素
 * 该办法会输入 0 个或者多个输入,相似于 FlatMap 的性能
 * 除此之外,该办法还能够更新外部状态或者设置计时器(timer)
 * @param value 输出元素
 * @param ctx  Context,能够拜访输出元素的工夫戳,并其能够获取一个工夫服务器 (TimerService),用于注册计时器(timers) 并查问工夫
 *  Context 只有在 processElement 被调用期间无效.
 * @param out  返回的后果值
 * @throws Exception
 */
 public abstract void processElement(I value, Context ctx, Collector<O> out) throws Exception;
​
 /**
 * 是一个回调函数,当在 TimerService 中注册的计时器 (timers) 被触发时,会回调该函数
 * @param timestamp 触发计时器 (timers) 的工夫戳
 * @param ctx  OnTimerContext,容许拜访工夫戳,TimeDomain 枚举类提供了两种工夫类型:* EVENT_TIME 与 PROCESSING_TIME
 * 并其能够获取一个工夫服务器 (TimerService),用于注册计时器(timers) 并查问工夫
 * OnTimerContext 只有在 onTimer 办法被调用期间无效
 * @param out 后果输入
 * @throws Exception
 */
 public void onTimer(long timestamp, OnTimerContext ctx, Collector<O> out) throws Exception {}
 /**
 * 仅仅在 processElement()办法或者 onTimer 办法被调用期间无效
 */
 public abstract class Context {
​
 /**
 * 以后被解决元素的工夫戳,或者是触发计时器 (timers) 时的工夫戳
 * 该值可能为 null,比方当程序中设置的工夫语义为:TimeCharacteristic#ProcessingTime
 * @return
 */
 public abstract Long timestamp();
​
 /**
 * 拜访工夫和注册的计时器(timers)
 * @return
 */
 public abstract TimerService timerService();
​
 /**
 * 将元素输入到 side output (侧输入)
 * @param outputTag 侧输入的标记
 * @param value 输入的记录
 * @param <X>
 */
 public abstract <X> void output(OutputTag<X> outputTag, X value);
 /**
 * 获取被解决元素的 key
 * @return
 */
 public abstract K getCurrentKey();}
 /**
 * 当 onTimer 办法被调用时,才能够应用 OnTimerContext
 */
 public abstract class OnTimerContext extends Context {
 /**
 * 触发计时器 (timers) 的工夫类型,包含两种:EVENT_TIME 与 PROCESSING_TIME
 * @return
 */
 public abstract TimeDomain timeDomain();
 /**
 * 获取触发计时器 (timer) 元素的 key
 * @return
 */
 @Override
 public abstract K getCurrentKey();}
}

下面的源码中,次要有两个办法,剖析如下:

  • processElement(I value, Context ctx, Collector<O> out)

该办法会对流中的每条记录都调用一次,输入 0 个或者多个元素,相似于 FlatMap 的性能,通过 Collector 将后果收回。除此之外,该函数有一个 Context 参数,用户能够通过 Context 拜访工夫戳、以后记录的 key 值以及 TimerService(对于 TimerService,上面会具体解释)。另外还能够应用 output 办法将数据发送到 side output,实现分流或者解决早退数据的性能。

  • onTimer(long timestamp, OnTimerContext ctx, Collector<O> out)

该办法是一个回调函数,当在 TimerService 中注册的计时器 (timers) 被触发时,会回调该函数。其中 @param timestamp 参数示意触发计时器 (timers) 的工夫戳,Collector 能够将记录收回。仔细的你可能会发现,这两个办法都有一个上下文参数,下面的办法传递的是 Context 参数,onTimer 办法传递的是 OnTimerContext 参数,这两个参数对象能够实现类似的性能。OnTimerContext 还能够返回触发计时器的工夫域(EVENT_TIME 与 PROCESSING_TIME)。

TimerService

在 KeyedProcessFunction 源码中,应用 TimerService 来拜访工夫和计时器,上面来看一下源码:

@PublicEvolving
public interface TimerService {
 String UNSUPPORTED_REGISTER_TIMER_MSG = "Setting timers is only supported on a keyed streams.";
 String UNSUPPORTED_DELETE_TIMER_MSG = "Deleting timers is only supported on a keyed streams.";
 // 返回以后的解决工夫
 long currentProcessingTime();
 // 返回以后 event-time 水位线(watermark)
 long currentWatermark();
​
 /**
 * 注册一个计时器(timers),当 processing time 的工夫等于该计时器时钟时会被调用
 * @param time
 */
 void registerProcessingTimeTimer(long time);
​
 /**
 * 注册一个计时器 (timers), 当 event time 的水位线(watermark) 达到该工夫时会被触发
 * @param time
 */
 void registerEventTimeTimer(long time);
​
 /**
 * 依据给定的触发工夫 (trigger time) 来删除 processing-time 计时器
 * 如果这个 timer 不存在,那么该办法不会起作用,* 即该计时器 (timer) 之前曾经被注册了,并且没有过期
 *
 * @param time
 */
 void deleteProcessingTimeTimer(long time);

 /**
 * 依据给定的触发工夫 (trigger time) 来删除 event-time 计时器
 * 如果这个 timer 不存在,那么该办法不会起作用,*  即该计时器 (timer) 之前曾经被注册了,并且没有过期
 * @param time
 */
 void deleteEventTimeTimer(long time);
}

TimerService 提供了以下几种办法:

  • currentProcessingTime()

返回以后的解决工夫

  • currentWatermark()

返回以后 event-time 水位线 (watermark) 工夫戳

  • registerProcessingTimeTimer(long time)

针对以后 key,注册一个 processing time 计时器(timers),当 processing time 的工夫等于该计时器时钟时会被调用

  • registerEventTimeTimer(long time)

针对以后 key,注册一个 event time 计时器(timers),当水位线工夫戳大于等于该计时器时钟时会被调用

  • deleteProcessingTimeTimer(long time)

针对以后 key,删除一个之前注册过的 processing time 计时器(timers),如果这个 timer 不存在,那么该办法不会起作用

  • deleteEventTimeTimer(long time)

针对以后 key,删除一个之前注册过的 event time 计时器(timers),如果这个 timer 不存在,那么该办法不会起作用

当计时器触发时,会回调 onTimer()函数,零碎对于 ProcessElement()办法和 onTimer()办法的调用是同步的

留神: 下面的源码中有两个 Error 信息, 这就阐明计时器只能在 keyed streams 上应用,常见的用处是在某些 key 值不在应用后革除 keyed state,或者实现一些基于工夫的自定义窗口逻辑。如果要在一个非 KeyedStream 上应用计时器,能够应用 KeySelector 返回一个固定的分区值(比方返回一个常数),这样所有的数据只会发送到一个分区。

应用案例

上面将应用 Process Function 的 side output 性能进行分流解决,具体代码如下:

public class ProcessFunctionExample {
​
 // 定义 side output 标签
 static final OutputTag<UserBehaviors> buyTags = new OutputTag<UserBehaviors>("buy") { };
 static final OutputTag<UserBehaviors> cartTags = new OutputTag<UserBehaviors>("cart") { };
 static final OutputTag<UserBehaviors> favTags = new OutputTag<UserBehaviors>("fav") { };
 static class SplitStreamFunction extends ProcessFunction<UserBehaviors, UserBehaviors> {
​
 @Override
 public void processElement(UserBehaviors value, Context ctx, Collector<UserBehaviors> out) throws Exception {switch (value.behavior) {
 case "buy":
 ctx.output(buyTags, value);
 break;
 case "cart":
 ctx.output(cartTags, value);
 break;
 case "fav":
 ctx.output(favTags, value);
 break;
 default:
 out.collect(value);
 }
 }
 }
 public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment().setParallelism(1);
​
 // 模仿数据源[userId,behavior,product]
 SingleOutputStreamOperator<UserBehaviors> splitStream = env.fromElements(new UserBehaviors(1L, "buy", "iphone"),
 new UserBehaviors(1L, "cart", "huawei"),
 new UserBehaviors(1L, "buy", "logi"),
 new UserBehaviors(1L, "fav", "oppo"),
 new UserBehaviors(2L, "buy", "huawei"),
 new UserBehaviors(2L, "buy", "onemore"),
 new UserBehaviors(2L, "fav", "iphone")).process(new SplitStreamFunction());
​
 // 获取分流之后购买行为的数据
 splitStream.getSideOutput(buyTags).print("data_buy");
 // 获取分流之后加购行为的数据
 splitStream.getSideOutput(cartTags).print("data_cart");
 // 获取分流之后珍藏行为的数据
 splitStream.getSideOutput(favTags).print("data_fav");
​
 env.execute("ProcessFunctionExample");
 }
}

总结

本文首先介绍了 Flink 提供的几种底层 Process Function API,这些 API 能够拜访工夫戳和水位线,同时反对注册一个计时器,进行调用回调函数 onTimer()。接着从源码的角度解读了这些 API 的独特局部,具体解释了每个办法的具体含意和应用形式。最初,给出了一个 Process Function 常见应用场景案例,应用其实现分流解决。除此之外,用户还能够应用这些函数,通过注册计时器,在回调函数中定义解决逻辑,应用十分的灵便。

  • 关注公众号:大数据技术与数仓

收费支付百 G 大数据资料

正文完
 0