共计 6688 个字符,预计需要花费 17 分钟才能阅读完成。
在 Flink 的工夫与 watermarks 详解这篇文章中,论述了 Flink 的工夫与水位线的相干内容。你可能不禁要提问,该如何拜访工夫戳和水位线呢?首先通过一般的 DataStream API 是无法访问的,须要借助 Flink 提供的一个底层的 API——Process Function。Process Function 不仅可能拜访工夫戳与水位线,而且还能够注册在未来的某个特定工夫触发的计时器(timers)。除此之外,还能够将数据通过 Side Outputs 发送到多个输入流中。这样以来,能够实现数据分流的性能,同时也是解决早退数据的一种形式。上面咱们将从源码动手,联合具体的应用案例来阐明该如何应用 Process Function。
<!– more –>
简介
Flink 提供了很多 Process Function,每种 Process Function 都有各自的性能,这些 Process Function 次要包含:
- ProcessFunction
- KeyedProcessFunction
- CoProcessFunction
- ProcessJoinFunction
- ProcessWindowFunction
- ProcessAllWindowFunction
BaseBroadcastProcessFunction
- KeyedBroadcastProcessFunction
- BroadcastProcessFunction
继承关系图如下:
从下面的继承关系中能够看出,都实现了 RichFunction 接口,所以反对应用 open()
、close()
、getRuntimeContext()
等办法的调用。从名字上能够看出,这些函数都有不同的实用场景,然而根本的性能是相似的,上面会以 KeyedProcessFunction 为例来探讨这些函数的通用性能。
源码
KeyedProcessFunction
/**
* 解决 KeyedStream 流的低级 API 函数
* 对于输出流中的每个元素都会触发调用 processElement 办法. 该办法会产生 0 个或多个输入.
* 其实现类能够通过 Context 拜访数据的工夫戳和计时器 (timers). 当计时器(timers) 触发时,会回调 onTimer 办法.
* onTimer 办法会产生 0 个或者多个输入,并且会注册一个将来的计时器.
*
* 留神:如果要拜访 keyed state 和计时器(timers),必须在 KeyedStream 上应用 KeyedProcessFunction.
* 另外,KeyedProcessFunction 的父类 AbstractRichFunction 实现了 RichFunction 接口,所以,能够应用
* open(),close()及 getRuntimeContext()办法.
*
* @param <K> key 的类型
* @param <I> 输出元素的数据类型
* @param <O> 输入元素的数据类型
*/
@PublicEvolving
public abstract class KeyedProcessFunction<K, I, O> extends AbstractRichFunction {
private static final long serialVersionUID = 1L;
/**
* 解决输出流中的每个元素
* 该办法会输入 0 个或者多个输入,相似于 FlatMap 的性能
* 除此之外,该办法还能够更新外部状态或者设置计时器(timer)
* @param value 输出元素
* @param ctx Context,能够拜访输出元素的工夫戳,并其能够获取一个工夫服务器 (TimerService),用于注册计时器(timers) 并查问工夫
* Context 只有在 processElement 被调用期间无效.
* @param out 返回的后果值
* @throws Exception
*/
public abstract void processElement(I value, Context ctx, Collector<O> out) throws Exception;
/**
* 是一个回调函数,当在 TimerService 中注册的计时器 (timers) 被触发时,会回调该函数
* @param timestamp 触发计时器 (timers) 的工夫戳
* @param ctx OnTimerContext,容许拜访工夫戳,TimeDomain 枚举类提供了两种工夫类型:* EVENT_TIME 与 PROCESSING_TIME
* 并其能够获取一个工夫服务器 (TimerService),用于注册计时器(timers) 并查问工夫
* OnTimerContext 只有在 onTimer 办法被调用期间无效
* @param out 后果输入
* @throws Exception
*/
public void onTimer(long timestamp, OnTimerContext ctx, Collector<O> out) throws Exception {}
/**
* 仅仅在 processElement()办法或者 onTimer 办法被调用期间无效
*/
public abstract class Context {
/**
* 以后被解决元素的工夫戳,或者是触发计时器 (timers) 时的工夫戳
* 该值可能为 null,比方当程序中设置的工夫语义为:TimeCharacteristic#ProcessingTime
* @return
*/
public abstract Long timestamp();
/**
* 拜访工夫和注册的计时器(timers)
* @return
*/
public abstract TimerService timerService();
/**
* 将元素输入到 side output (侧输入)
* @param outputTag 侧输入的标记
* @param value 输入的记录
* @param <X>
*/
public abstract <X> void output(OutputTag<X> outputTag, X value);
/**
* 获取被解决元素的 key
* @return
*/
public abstract K getCurrentKey();}
/**
* 当 onTimer 办法被调用时,才能够应用 OnTimerContext
*/
public abstract class OnTimerContext extends Context {
/**
* 触发计时器 (timers) 的工夫类型,包含两种:EVENT_TIME 与 PROCESSING_TIME
* @return
*/
public abstract TimeDomain timeDomain();
/**
* 获取触发计时器 (timer) 元素的 key
* @return
*/
@Override
public abstract K getCurrentKey();}
}
下面的源码中,次要有两个办法,剖析如下:
- processElement(I value, Context ctx, Collector<O> out)
该办法会对流中的每条记录都调用一次,输入 0 个或者多个元素,相似于 FlatMap 的性能,通过 Collector 将后果收回。除此之外,该函数有一个 Context 参数,用户能够通过 Context 拜访工夫戳、以后记录的 key 值以及 TimerService(对于 TimerService,上面会具体解释)。另外还能够应用 output 办法将数据发送到 side output,实现分流或者解决早退数据的性能。
- onTimer(long timestamp, OnTimerContext ctx, Collector<O> out)
该办法是一个回调函数,当在 TimerService 中注册的计时器 (timers) 被触发时,会回调该函数。其中 @param timestamp
参数示意触发计时器 (timers) 的工夫戳,Collector 能够将记录收回。仔细的你可能会发现,这两个办法都有一个上下文参数,下面的办法传递的是 Context 参数,onTimer 办法传递的是 OnTimerContext 参数,这两个参数对象能够实现类似的性能。OnTimerContext 还能够返回触发计时器的工夫域(EVENT_TIME 与 PROCESSING_TIME)。
TimerService
在 KeyedProcessFunction 源码中,应用 TimerService 来拜访工夫和计时器,上面来看一下源码:
@PublicEvolving
public interface TimerService {
String UNSUPPORTED_REGISTER_TIMER_MSG = "Setting timers is only supported on a keyed streams.";
String UNSUPPORTED_DELETE_TIMER_MSG = "Deleting timers is only supported on a keyed streams.";
// 返回以后的解决工夫
long currentProcessingTime();
// 返回以后 event-time 水位线(watermark)
long currentWatermark();
/**
* 注册一个计时器(timers),当 processing time 的工夫等于该计时器时钟时会被调用
* @param time
*/
void registerProcessingTimeTimer(long time);
/**
* 注册一个计时器 (timers), 当 event time 的水位线(watermark) 达到该工夫时会被触发
* @param time
*/
void registerEventTimeTimer(long time);
/**
* 依据给定的触发工夫 (trigger time) 来删除 processing-time 计时器
* 如果这个 timer 不存在,那么该办法不会起作用,* 即该计时器 (timer) 之前曾经被注册了,并且没有过期
*
* @param time
*/
void deleteProcessingTimeTimer(long time);
/**
* 依据给定的触发工夫 (trigger time) 来删除 event-time 计时器
* 如果这个 timer 不存在,那么该办法不会起作用,* 即该计时器 (timer) 之前曾经被注册了,并且没有过期
* @param time
*/
void deleteEventTimeTimer(long time);
}
TimerService 提供了以下几种办法:
- currentProcessingTime()
返回以后的解决工夫
- currentWatermark()
返回以后 event-time 水位线 (watermark) 工夫戳
- registerProcessingTimeTimer(long time)
针对以后 key,注册一个 processing time 计时器(timers),当 processing time 的工夫等于该计时器时钟时会被调用
- registerEventTimeTimer(long time)
针对以后 key,注册一个 event time 计时器(timers),当水位线工夫戳大于等于该计时器时钟时会被调用
- deleteProcessingTimeTimer(long time)
针对以后 key,删除一个之前注册过的 processing time 计时器(timers),如果这个 timer 不存在,那么该办法不会起作用
- deleteEventTimeTimer(long time)
针对以后 key,删除一个之前注册过的 event time 计时器(timers),如果这个 timer 不存在,那么该办法不会起作用
当计时器触发时,会回调 onTimer()函数,零碎对于 ProcessElement()办法和 onTimer()办法的调用是同步的
留神: 下面的源码中有两个 Error 信息, 这就阐明计时器只能在 keyed streams 上应用,常见的用处是在某些 key 值不在应用后革除 keyed state,或者实现一些基于工夫的自定义窗口逻辑。如果要在一个非 KeyedStream 上应用计时器,能够应用 KeySelector 返回一个固定的分区值(比方返回一个常数),这样所有的数据只会发送到一个分区。
应用案例
上面将应用 Process Function 的 side output 性能进行分流解决,具体代码如下:
public class ProcessFunctionExample {
// 定义 side output 标签
static final OutputTag<UserBehaviors> buyTags = new OutputTag<UserBehaviors>("buy") { };
static final OutputTag<UserBehaviors> cartTags = new OutputTag<UserBehaviors>("cart") { };
static final OutputTag<UserBehaviors> favTags = new OutputTag<UserBehaviors>("fav") { };
static class SplitStreamFunction extends ProcessFunction<UserBehaviors, UserBehaviors> {
@Override
public void processElement(UserBehaviors value, Context ctx, Collector<UserBehaviors> out) throws Exception {switch (value.behavior) {
case "buy":
ctx.output(buyTags, value);
break;
case "cart":
ctx.output(cartTags, value);
break;
case "fav":
ctx.output(favTags, value);
break;
default:
out.collect(value);
}
}
}
public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment().setParallelism(1);
// 模仿数据源[userId,behavior,product]
SingleOutputStreamOperator<UserBehaviors> splitStream = env.fromElements(new UserBehaviors(1L, "buy", "iphone"),
new UserBehaviors(1L, "cart", "huawei"),
new UserBehaviors(1L, "buy", "logi"),
new UserBehaviors(1L, "fav", "oppo"),
new UserBehaviors(2L, "buy", "huawei"),
new UserBehaviors(2L, "buy", "onemore"),
new UserBehaviors(2L, "fav", "iphone")).process(new SplitStreamFunction());
// 获取分流之后购买行为的数据
splitStream.getSideOutput(buyTags).print("data_buy");
// 获取分流之后加购行为的数据
splitStream.getSideOutput(cartTags).print("data_cart");
// 获取分流之后珍藏行为的数据
splitStream.getSideOutput(favTags).print("data_fav");
env.execute("ProcessFunctionExample");
}
}
总结
本文首先介绍了 Flink 提供的几种底层 Process Function API,这些 API 能够拜访工夫戳和水位线,同时反对注册一个计时器,进行调用回调函数 onTimer()。接着从源码的角度解读了这些 API 的独特局部,具体解释了每个办法的具体含意和应用形式。最初,给出了一个 Process Function 常见应用场景案例,应用其实现分流解决。除此之外,用户还能够应用这些函数,通过注册计时器,在回调函数中定义解决逻辑,应用十分的灵便。
- 关注公众号:大数据技术与数仓
收费支付百 G 大数据资料