共计 8971 个字符,预计需要花费 23 分钟才能阅读完成。
大家好,我是大圣。
最近在整顿 Flink 方面的知识点,明天更新一遍 Flink 的根底的知识点,让大家对 Flink 的 工夫、窗口、水位线 有一个根本的意识,而后下一篇文章会从源码的角度来解析 Flink 在 工夫、窗口、水位线 底层是怎么实现的。
话不多说,间接上明天的纲要主题:
图片
工夫
Flink 中 定义了 3 种 工夫类型:事件工夫(Event Time)、解决工夫(Processing Time)和摄取工夫(Ingestion Time)。
上面画张图来解释一下 3 种 工夫类型:
(1)事件工夫
事件工夫指的是这条数据产生的工夫,比方我用手机在某团上 2022-01-01:13:14 这个一时刻下单点了一个外卖,那么下单点外卖的这个工夫 2022-01-01:13:14 就是我点外卖这个事件产生的工夫,这个工夫确定之后就不会扭转。
还例如,咱们用 Flink 生产 Kafka 外面的日志数据的时候,每条日志外面所蕴含的工夫字段 的工夫戳就是事件工夫。通过事件工夫能够实现工夫旅行。
应用事件工夫的益处是不依赖服务器的时钟,这一批数据无论你重复执行多少次,失去的后果都是一样的。然而咱们应用事件工夫的时候,须要从每一条日志数据外面把含有工夫戳字段的提取进去。
(2)解决工夫
解决工夫是指数据被计算引擎(Flink)解决的工夫,这个工夫以你 Flink 程序在哪台服务器执行,就以那台服务器的时钟为准。
应用解决工夫依赖于操作系统的时钟,然而每台机器的时钟是变动的。比方我开了 2s 的解决工夫的窗口去解决数据,然而可能每台服务器的解决数据的能力不一样,可能第一次跑这一批数据的时候,2s 的工夫解决了 100 条数据,第二次跑这一批数据的时候,2s 的工夫解决了 500 条数据。
这样就会导致两次计算的后果不一样,就是可能是同一台机器也会呈现这种状况,因为 解决能力可能受过后 CPU 的影响,在 2s 内解决的数据条数不一样。然而解决工夫计算逻辑简略,提早性低性能好于事件工夫。
(3)摄取工夫
摄取工夫是指咱们的数据进入 Flink source 算子的工夫,摄取工夫个别用的比拟少,它也会代带来解决数据的后果不正确的状况。
在代码层面怎么应用这三个工夫:
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// 应用事件工夫
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
// 应用解决工夫
env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);
// 应用摄取工夫
env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
窗口
批处理实质上是解决无限不变的数据集,流解决的实质是解决有限继续产生的数据集,所以批实质上来说是流的一种特例,那么窗口就是流和批对立的桥梁,对流上的数据进行窗口切分。
每一个窗口一旦到了要计算的时候,就能够被看成一个不可变的数据集,在触发计算之前,窗口外面的数据可能会继续的扭转,因而对窗口的数据进行计算就是批处理。
Flink 中窗口依照是否并发执行,分为 Keyed Window 和 Non-Keyed Window,它们次要的区别是有没有 keyBy 操作。
Keyed Window 能够依照指定的分区形式并发执行,所有雷同的键会被调配到雷同的工作上执行。Non-Keyed Window 会把所有数据放到一个工作上执行,并发度为 1。上面是窗口相干的 API。
- Keyed Wind
- Keyed Wind
stream
.keyBy(...) | |
.window(...) // 承受 WindowAssigner 参数,用来调配窗口 | |
[.trigger(...)] // 可选的,承受 Trigger 类型参数,用来触发窗口 | |
[.evictor(...)] // 可选的,承受 Evictor 类型参数,用来驱赶窗口中的数据 | |
// 可选的,承受 Time 类型参数,示意窗口容许的最大提早,超过该提早,数据会被抛弃 | |
[.allowedLateness(...)] | |
// 可选的,承受 OutputTag 类型参数,用来定义摈弃数据的输入 | |
[.sideOutputLateData(...)] | |
.reduce/aggregate/apply() // 窗口函数 | |
[.getSideOutput(...)] // 可选的,获取指定的 DataStream | |
- Non-Keyed Windows
stream
.windowAll(...) // 承受 WindowAssigner 参数,用来调配窗口 | |
[.trigger(...)] // 可选的,承受 Trigger 类型参数,用来触发窗口 | |
[.evictor(...)] // 可选的,承受 Evictor 类型参数,用来驱赶窗口中的数据 | |
// 可选的,承受 Time 类型参数,示意窗口容许的最大提早,超过该提早,数据会被抛弃 | |
[.allowedLateness(...)] | |
// 可选的,承受 OutputTag 类型参数,用来定义摈弃数据的输入 | |
[.sideOutputLateData(...)] | |
.reduce/aggregate/apply() // 窗口函数 | |
[.getSideOutput(...)] // 可选的,获取指定的 DataStream | |
- Non-Keyed Windows
stream
.windowAll(...) // 承受 WindowAssigner 参数,用来调配窗口 | |
[.trigger(...)] // 可选的,承受 Trigger 类型参数,用来触发窗口 | |
[.evictor(...)] // 可选的,承受 Evictor 类型参数,用来驱赶窗口中的数据 | |
// 可选的,承受 Time 类型参数,示意窗口容许的最大提早,超过该提早,数据会被抛弃 | |
[.allowedLateness(...)] | |
// 可选的,承受 OutputTag 类型参数,用来定义摈弃数据的输入 | |
[.sideOutputLateData(...)] | |
.reduce/aggregate/apply() // 窗口函数 | |
[.getSideOutput(...)] // 可选的,获取指定的 DataStream | |
上面咱们来看看下面提到的几个概念。
WindowAssiner:窗口分配器。咱们罕用的滚动窗口、滑动窗口、会话窗口等就是由 WindowAssiner 决定的,比方 TumblingEventTimeWindows 能够基于事件工夫的滚动窗口。
Trigger:触发器。Flink 依据 WindowAssigner 把数据调配到不同的窗口,还须要晓得什么时候触发窗口,Trigger 就是用来判断什么时候触发窗口的计算。Trigger 类中定义了一些返回值类型,依据返回值类型来决定是否触发窗口的计算。
Evictor:驱赶器。在窗口触发之后,在调用用户本人写的窗口函数之前或者之后,Flink 容许咱们定制要解决的数据汇合,Evictor 就是用来驱赶或者过滤不须要的数据集的。
Allowed Lateness:最大容许提早。次要用在基于事件工夫的窗口,示意水位线触发窗口计算之后还容许数据早退多久,在最长容许的延迟时间内,窗口是不会销毁的。
Window Function:窗口函数。用户代码执行函数,就是用户本人写的业务逻辑,用来做真正计算的。
Side Output:抛弃数据的汇合。通过 getSideOutput 办法能够获取抛弃的数据,而后用户本人灵便去解决抛弃的数据。
上面咱们来看看 Flink 外面的 计数窗口,工夫窗口,会话窗口
(1)计数窗口
这个用到的不多,这里就不做介绍了。
(2)工夫窗口
Tumble Time Window(滚动窗口)
图片
示意在工夫上依照当时约定的窗口大小(就是你在代码外面指定的窗口大小)进行窗口的切分,窗口之间不会互相重叠。
基于工夫的 滚动窗口 又分为 基于 解决工夫的 滚动窗口 和 基于事件工夫的 滚动窗口。具体如下:
解决工夫的 滚动窗口:
.window(TumblingProcessingTimeWindows.of(Time.seconds(10)))
事件工夫的 滚动窗口:
.window(TumblingEventTimeWindows.of(Time.seconds(10)))
Sliding Time Window(滑动窗口)
图片
与滚动窗口相似,滑动窗口的 assigner 散发元素到指定大小的窗口,窗口大小通过 window size 参数设置。滑动窗口须要一个额定的滑动间隔(window slide)参数来管制生成新窗口的频率。因而,如果 slide 小于窗口大小,滑动窗口能够容许窗口重叠。这种状况下,一个元素可能会被散发到多个窗口。
比如说,你设置了大小为 10 分钟,滑动间隔 5 分钟的窗口,你会在每 5 分钟失去一个新的窗口,外面蕴含之前 10 分钟达到的数据(如上图所示)。
基于工夫的 滑动窗口 又分为 基于 解决工夫的 滑动窗口 和 基于事件工夫的 滑动窗口。具体如下:
解决工夫的 滑动窗口:
.window(SlidingProcessingTimeWindows.of(Time.minutes(10), Time.minutes(5)))
事件工夫的 滑动窗口:
.window(SlidingEventTimeWindows.of(Time.minutes(10), Time.minutes(5)));
(4)会话窗口
Session Window 是一种非凡的窗口,当超过一段时间,该窗口没有收到新的数据元素,则视为这个窗口就完结了,所有无奈当时确认窗口的长度等。
会话窗口平时工作中,我用到的也比拟少,所以这外面不做详解。
水位线
水位线(Watermark)用于解决乱序事件,而正确的解决乱序事件,通常用 Watermark 机制联合窗口来实现。
从流解决原始设施产生事件,到 Flink 读取到数据,再到 Flink 多个算子解决数据,在这个过程中,会受到网络提早、数据乱序、背压、Failover 等多种状况的影响,导致数据是乱序的。
尽管大部分状况下是没有问题,然而不得不在设计上思考此类异常情况,为了保障计算结果的正确性,须要期待数据,这带来了计算的提早。对于提早太久的数据,不能有限的等上来,所以必须有一个机制,来保障特定的工夫后肯定会触发窗口的来进行计算,这个触发机制就是水位线(Watermark)。
上面咱们来看一组代码,怎么去应用 Watermark,并且怎么去触发窗口计算:
public class EventTimeExample {
public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); | |
env.setParallelism(1); | |
env | |
.socketTextStream("192.168.1.141", 9998, '\n') | |
.map(new MapFunction<String, TumblingBean>() { | |
@Override | |
public TumblingBean map(String value) throws Exception {String[] s = value.split("\\s"); | |
TumblingBean bean = new TumblingBean(); | |
bean.setUserId(s[0]); | |
bean.setTime(Long.parseLong(s[1]) * 1000L); | |
return bean; | |
} | |
}) | |
.assignTimestampsAndWatermarks( | |
WatermarkStrategy | |
.<TumblingBean>forBoundedOutOfOrderness(Duration.ofSeconds(2)) | |
.withTimestampAssigner(new SerializableTimestampAssigner<TumblingBean>() { | |
@Override | |
public long extractTimestamp(TumblingBean element, long recordTimestamp) {System.out.println("水位线的工夫戳:" + element.getTime()); | |
return element.getTime();} | |
}) | |
) | |
.keyBy(TumblingBean::getUserId) | |
.window(TumblingProcessingTimeWindows.of(Time.seconds(5))) | |
.process(new EventTimeWindow()) | |
.print(); | |
env.execute();} |
}
class EventTimeWindow extends ProcessWindowFunction<TumblingBean, String, String, TimeWindow> {
@Override | |
public void process(String s, ProcessWindowFunction<TumblingBean, | |
String, String, TimeWindow>.Context context, Iterable<TumblingBean> elements, Collector<String> out) throws Exception { | |
int i = 0; | |
for (TumblingBean bean : elements) {i++;} | |
out.collect("总共有:" + i + "条数据"); | |
} |
}
一看这很多代码,第一反馈就是不想看。别急,我这里给你整体说一下这个代码你就懂了,次要逻辑 就是:
socketTextStream 读取数据
把读取到的数据利用 map 算子进行 split 切分
而后提取数据中的含有工夫的字段,并设置最大延迟时间是 2s
接着进行 key 操作
最初开了一个 10s 的窗口,等触发了窗口就进行计算
先补充几个重要的概念:
水位线(Watermark)就是一个 毫秒的工夫戳
Flink 默认每隔 200ms(机器工夫)向数据流中插入一个 水位线
Watermark 是⼀种掂量 Event Time 停顿的机制(逻辑时钟),能够设定提早触发
水位线必须枯燥递增,以确保工作的事件工夫在向前推动,而不是在后退
只有事件工夫须要水位线
水位线产生的公式:水位线 = 零碎察看到的最大工夫(就是 数据中携带的最大工夫戳)– 最大延迟时间
最大延迟时间由程序员本人设定
数据流中的水位线 用于示意 如果程序曾经解决的数据中含有的工夫戳都小于 水位线 的话,那么这些数据都曾经达到了,所以窗口的触发条件就是
水位线 >= 窗口完结工夫
那 这个窗口怎么触发呢?这是个好问题 水位线 >= 窗口完结工夫
咱们的程序里 设置的延迟时间是 2s,窗口完结工夫是 10s 然而不包含 10s 因为窗口是左闭又开的
水位线 = 零碎察看到的最大工夫(就是 数据中携带的最大工夫戳)– 最大延迟时间
如果 咱们 造的测试数据是 每条数据第一个字母是代表 key,第二个数字代表这条数据产生的工夫
a 1
a 2
b 5
b 4
b 12
阐明:1
当 a 1 这条数据过去时,咱们套用下面的公式
水位线 = 零碎察看到的最大工夫(就是 数据中携带的最大工夫戳)– 最大延迟时间 = 1 – 2 = -1
窗口触发的条件:
水位线 >= 窗口完结工夫 —–> -1 < 10 窗口不触发,数据被保留到状态外面
当 a 2 这条数据过去时,咱们套用下面的公式
水位线 = 零碎察看到的最大工夫(就是 数据中携带的最大工夫戳)– 最大延迟时间 = 2 – 2 = 0
窗口触发的条件:
水位线 >= 窗口完结工夫 —–>0 < 10 窗口不触发,数据被保留到状态外面
当 b 5 这条数据过去时,咱们套用下面的公式
水位线 = 零碎察看到的最大工夫(就是 数据中携带的最大工夫戳)– 最大延迟时间 = 5 – 2 = 3
窗口触发的条件:
水位线 >= 窗口完结工夫 —–> 3 < 10 窗口不触发,数据被保留到状态外面
当 b 4 这条数据过去时,咱们套用下面的公式
水位线 = 零碎察看到的最大工夫(就是 数据中携带的最大工夫戳)– 最大延迟时间 = 4 – 2 = 2
窗口触发的条件:
水位线 >= 窗口完结工夫 —–> 2 < 10 窗口不触发,数据被保留到状态外面
当 b 12 这条数据过去时,咱们套用下面的公式
水位线 = 零碎察看到的最大工夫(就是 数据中携带的最大工夫戳)– 最大延迟时间 = 12 – 2 = 10
窗口触发的条件:
水位线 >= 窗口完结工夫 —–> 10 < 10 窗口触发,计算窗口外面的逻辑
然而 b 12 这条数据 是不在 这次窗口外面的,因为 b 12 不属于 这个窗口。
阐明:其实
a 1
a 2
b 5
b 4
b 12
这几条数据中,b 4 是属于乱序数据,因为 先来了一条工夫戳是 5 的数据过后了,又来了一条工夫戳是 4 的数据 如果不必水位线去期待一下的,默认咱们就会把 b 4 这条数据给抛弃了,这就可能导致最初计算的后果不正确。
public class GenWatermark {
public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); | |
env.setParallelism(1); | |
//TODO 零碎默认每隔 200ms 机器工夫插入一次水位线 | |
//TODO 上面的语句设置为每隔 1 分钟插入一次水位线 | |
env.getConfig().setAutoWatermarkInterval(60000L); | |
env | |
.socketTextStream("192.168.1.141", 9999, '\n') | |
.map(new MapFunction<String, TumblingBean>() { | |
@Override | |
public TumblingBean map(String value) throws Exception {String[] s = value.split("\\s"); | |
TumblingBean bean = new TumblingBean(); | |
bean.setUserId(s[0]); | |
bean.setTime(Long.parseLong(s[1]) * 1000L); | |
return bean; | |
} | |
}) | |
.assignTimestampsAndWatermarks(new MyAssigner() | |
) | |
.keyBy(TumblingBean::getUserId) | |
.window(TumblingEventTimeWindows.of(Time.minutes(10))) | |
.process(new WaterMarkWindow()) | |
.print(); | |
env.execute();} |
}
class WaterMarkWindow extends ProcessWindowFunction<TumblingBean, String, String, TimeWindow> {
@Override | |
public void process(String s, ProcessWindowFunction<TumblingBean, | |
String, String, TimeWindow>.Context context, Iterable<TumblingBean> elements, Collector<String> out) throws Exception {System.out.println("以后水位线的值为:" + context.currentWatermark() + "\t" + "窗口开始工夫:" + context.window().getStart() + "\t" + "窗口完结工夫:" + context.window().getEnd()); | |
int i = 0; | |
for (TumblingBean bean : elements) {i++;} | |
out.collect("总共有:" + i + "条数据"); | |
} |
}
class MyAssigner implements AssignerWithPeriodicWatermarks<TumblingBean> {
//TODO 设置最大延迟时间为 10s | |
Long bound = 10 * 1000L; | |
//TODO 零碎察看到的元素蕴含的最大工夫戳 | |
Long maxTs = Long.MIN_VALUE + bound; | |
@Nullable | |
@Override | |
public Watermark getCurrentWatermark() {System.out.println("察看到的最大工夫戳是:" + maxTs); | |
return new Watermark(maxTs - bound); | |
} | |
@Override | |
public long extractTimestamp(TumblingBean element, long recordTimestamp) {System.out.println("察看到的数据:" + element.getTime()); | |
maxTs = Math.max(maxTs, element.getTime()); | |
return maxTs; | |
} |
}
上面有一个我本人自定义水位线的逻辑代码,大家感兴趣能够看看
总结
工夫,窗口,水位线 这三个是 Flink 中最根底的概念了,同时也是开发过程中用到的最多的货色,只有很好的把握了 工夫,窗口,水位线这些概念,能力依照需要去写出正确的代码,同时在呈现窗口不触发的状况下 疾速排查到 问题所在。
其实 对于窗口和水位线这块还有很多货色,比方多并行度下 水位线是怎么传递的,容许窗口的最大提早等,不过小伙伴们释怀,接下来这些货色咱们都会讲到的。
下一篇文章会从源码的角度来解读,当一条数据来了,咱们怎么提前数据外面的工夫字段生成水位线的,这条数据怎么调配到对应的窗口的,窗口的数据到底存在哪里了,这些窗口怎么触发的,触发完了怎么进行咱们本人写的逻辑运算的,怎么销毁窗口的等。
我会把这部分源码从头到尾讲一遍,让你对这部分的内容的了解更加粗浅,另外文章外面波及到的代码,如果大家想下载下来本人去测试的话,间接加我微信分割我就行了。
本文由博客一文多发平台 OpenWrite 公布!