关于java:Flink-基础知识-时间窗口水位线

4次阅读

共计 8971 个字符,预计需要花费 23 分钟才能阅读完成。

大家好,我是大圣。

最近在整顿 Flink 方面的知识点,明天更新一遍 Flink 的根底的知识点,让大家对 Flink 的 工夫、窗口、水位线 有一个根本的意识,而后下一篇文章会从源码的角度来解析 Flink 在 工夫、窗口、水位线 底层是怎么实现的。

话不多说,间接上明天的纲要主题:

图片

工夫

Flink 中 定义了 3 种 工夫类型:事件工夫(Event Time)、解决工夫(Processing Time)和摄取工夫(Ingestion Time)。

上面画张图来解释一下 3 种 工夫类型:

(1)事件工夫

事件工夫指的是这条数据产生的工夫,比方我用手机在某团上 2022-01-01:13:14 这个一时刻下单点了一个外卖,那么下单点外卖的这个工夫 2022-01-01:13:14 就是我点外卖这个事件产生的工夫,这个工夫确定之后就不会扭转。

还例如,咱们用 Flink 生产 Kafka 外面的日志数据的时候,每条日志外面所蕴含的工夫字段 的工夫戳就是事件工夫。通过事件工夫能够实现工夫旅行。

应用事件工夫的益处是不依赖服务器的时钟,这一批数据无论你重复执行多少次,失去的后果都是一样的。然而咱们应用事件工夫的时候,须要从每一条日志数据外面把含有工夫戳字段的提取进去。

(2)解决工夫

解决工夫是指数据被计算引擎(Flink)解决的工夫,这个工夫以你 Flink 程序在哪台服务器执行,就以那台服务器的时钟为准。

应用解决工夫依赖于操作系统的时钟,然而每台机器的时钟是变动的。比方我开了 2s 的解决工夫的窗口去解决数据,然而可能每台服务器的解决数据的能力不一样,可能第一次跑这一批数据的时候,2s 的工夫解决了 100 条数据,第二次跑这一批数据的时候,2s 的工夫解决了 500 条数据。

这样就会导致两次计算的后果不一样,就是可能是同一台机器也会呈现这种状况,因为 解决能力可能受过后 CPU 的影响,在 2s 内解决的数据条数不一样。然而解决工夫计算逻辑简略,提早性低性能好于事件工夫。

(3)摄取工夫

摄取工夫是指咱们的数据进入 Flink source 算子的工夫,摄取工夫个别用的比拟少,它也会代带来解决数据的后果不正确的状况。

在代码层面怎么应用这三个工夫:

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// 应用事件工夫
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
// 应用解决工夫
env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);
// 应用摄取工夫
env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);

窗口

批处理实质上是解决无限不变的数据集,流解决的实质是解决有限继续产生的数据集,所以批实质上来说是流的一种特例,那么窗口就是流和批对立的桥梁,对流上的数据进行窗口切分。

每一个窗口一旦到了要计算的时候,就能够被看成一个不可变的数据集,在触发计算之前,窗口外面的数据可能会继续的扭转,因而对窗口的数据进行计算就是批处理。

Flink 中窗口依照是否并发执行,分为 Keyed Window 和 Non-Keyed Window,它们次要的区别是有没有 keyBy 操作。

Keyed Window 能够依照指定的分区形式并发执行,所有雷同的键会被调配到雷同的工作上执行。Non-Keyed Window 会把所有数据放到一个工作上执行,并发度为 1。上面是窗口相干的 API。

  1. Keyed Wind
  2. Keyed Wind

stream

   .keyBy(...)                
   .window(...)              // 承受 WindowAssigner 参数,用来调配窗口
  [.trigger(...)]            // 可选的,承受 Trigger 类型参数,用来触发窗口
  [.evictor(...)]            // 可选的,承受 Evictor 类型参数,用来驱赶窗口中的数据
  // 可选的,承受 Time 类型参数,示意窗口容许的最大提早,超过该提早,数据会被抛弃
  [.allowedLateness(...)]   
  // 可选的,承受 OutputTag 类型参数,用来定义摈弃数据的输入
  [.sideOutputLateData(...)] 
   .reduce/aggregate/apply()      // 窗口函数
  [.getSideOutput(...)]      // 可选的,获取指定的 DataStream

  1. Non-Keyed Windows

stream

   .windowAll(...)           // 承受 WindowAssigner 参数,用来调配窗口
  [.trigger(...)]            // 可选的,承受 Trigger 类型参数,用来触发窗口
  [.evictor(...)]            // 可选的,承受 Evictor 类型参数,用来驱赶窗口中的数据
   // 可选的,承受 Time 类型参数,示意窗口容许的最大提早,超过该提早,数据会被抛弃
  [.allowedLateness(...)]    
  // 可选的,承受 OutputTag 类型参数,用来定义摈弃数据的输入
  [.sideOutputLateData(...)]
   .reduce/aggregate/apply()        // 窗口函数
  [.getSideOutput(...)]      // 可选的,获取指定的 DataStream


  1. Non-Keyed Windows

stream

   .windowAll(...)           // 承受 WindowAssigner 参数,用来调配窗口
  [.trigger(...)]            // 可选的,承受 Trigger 类型参数,用来触发窗口
  [.evictor(...)]            // 可选的,承受 Evictor 类型参数,用来驱赶窗口中的数据
   // 可选的,承受 Time 类型参数,示意窗口容许的最大提早,超过该提早,数据会被抛弃
  [.allowedLateness(...)]    
  // 可选的,承受 OutputTag 类型参数,用来定义摈弃数据的输入
  [.sideOutputLateData(...)]
   .reduce/aggregate/apply()        // 窗口函数
  [.getSideOutput(...)]      // 可选的,获取指定的 DataStream

上面咱们来看看下面提到的几个概念。

WindowAssiner:窗口分配器。咱们罕用的滚动窗口、滑动窗口、会话窗口等就是由 WindowAssiner 决定的,比方 TumblingEventTimeWindows 能够基于事件工夫的滚动窗口。

Trigger:触发器。Flink 依据 WindowAssigner 把数据调配到不同的窗口,还须要晓得什么时候触发窗口,Trigger 就是用来判断什么时候触发窗口的计算。Trigger 类中定义了一些返回值类型,依据返回值类型来决定是否触发窗口的计算。

Evictor:驱赶器。在窗口触发之后,在调用用户本人写的窗口函数之前或者之后,Flink 容许咱们定制要解决的数据汇合,Evictor 就是用来驱赶或者过滤不须要的数据集的。

Allowed Lateness:最大容许提早。次要用在基于事件工夫的窗口,示意水位线触发窗口计算之后还容许数据早退多久,在最长容许的延迟时间内,窗口是不会销毁的。

Window Function:窗口函数。用户代码执行函数,就是用户本人写的业务逻辑,用来做真正计算的。

Side Output:抛弃数据的汇合。通过 getSideOutput 办法能够获取抛弃的数据,而后用户本人灵便去解决抛弃的数据。

上面咱们来看看 Flink 外面的 计数窗口,工夫窗口,会话窗口

(1)计数窗口

这个用到的不多,这里就不做介绍了。

(2)工夫窗口

Tumble Time Window(滚动窗口)

图片

示意在工夫上依照当时约定的窗口大小(就是你在代码外面指定的窗口大小)进行窗口的切分,窗口之间不会互相重叠。

基于工夫的 滚动窗口 又分为 基于 解决工夫的 滚动窗口 和 基于事件工夫的 滚动窗口。具体如下:

解决工夫的 滚动窗口:

.window(TumblingProcessingTimeWindows.of(Time.seconds(10)))

事件工夫的 滚动窗口:

.window(TumblingEventTimeWindows.of(Time.seconds(10)))

Sliding Time Window(滑动窗口)

图片

与滚动窗口相似,滑动窗口的 assigner 散发元素到指定大小的窗口,窗口大小通过 window size 参数设置。滑动窗口须要一个额定的滑动间隔(window slide)参数来管制生成新窗口的频率。因而,如果 slide 小于窗口大小,滑动窗口能够容许窗口重叠。这种状况下,一个元素可能会被散发到多个窗口。

比如说,你设置了大小为 10 分钟,滑动间隔 5 分钟的窗口,你会在每 5 分钟失去一个新的窗口,外面蕴含之前 10 分钟达到的数据(如上图所示)。

基于工夫的 滑动窗口 又分为 基于 解决工夫的 滑动窗口 和 基于事件工夫的 滑动窗口。具体如下:

解决工夫的 滑动窗口:

.window(SlidingProcessingTimeWindows.of(Time.minutes(10), Time.minutes(5)))

事件工夫的 滑动窗口:

.window(SlidingEventTimeWindows.of(Time.minutes(10), Time.minutes(5)));

(4)会话窗口

Session Window 是一种非凡的窗口,当超过一段时间,该窗口没有收到新的数据元素,则视为这个窗口就完结了,所有无奈当时确认窗口的长度等。

会话窗口平时工作中,我用到的也比拟少,所以这外面不做详解。

水位线

水位线(Watermark)用于解决乱序事件,而正确的解决乱序事件,通常用 Watermark 机制联合窗口来实现。

从流解决原始设施产生事件,到 Flink 读取到数据,再到 Flink 多个算子解决数据,在这个过程中,会受到网络提早、数据乱序、背压、Failover 等多种状况的影响,导致数据是乱序的。

尽管大部分状况下是没有问题,然而不得不在设计上思考此类异常情况,为了保障计算结果的正确性,须要期待数据,这带来了计算的提早。对于提早太久的数据,不能有限的等上来,所以必须有一个机制,来保障特定的工夫后肯定会触发窗口的来进行计算,这个触发机制就是水位线(Watermark)。

上面咱们来看一组代码,怎么去应用 Watermark,并且怎么去触发窗口计算:

public class EventTimeExample {

 public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.setParallelism(1);
    env
            .socketTextStream("192.168.1.141", 9998, '\n')
            .map(new MapFunction<String, TumblingBean>() {
                @Override
                public TumblingBean map(String value) throws Exception {String[] s = value.split("\\s");
                    TumblingBean bean = new TumblingBean();
                    bean.setUserId(s[0]);
                    bean.setTime(Long.parseLong(s[1]) * 1000L);
                    return bean;
                }
            })
            .assignTimestampsAndWatermarks(
                    WatermarkStrategy
                            .<TumblingBean>forBoundedOutOfOrderness(Duration.ofSeconds(2))
                            .withTimestampAssigner(new SerializableTimestampAssigner<TumblingBean>() {
                                @Override
                                public long extractTimestamp(TumblingBean element, long recordTimestamp) {System.out.println("水位线的工夫戳:" + element.getTime());
                                    return element.getTime();}
                            })
            )
            .keyBy(TumblingBean::getUserId)
            .window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
            .process(new EventTimeWindow())
            .print();

    env.execute();}

}

class EventTimeWindow extends ProcessWindowFunction<TumblingBean, String, String, TimeWindow> {

@Override
public void process(String s, ProcessWindowFunction<TumblingBean,
        String, String, TimeWindow>.Context context, Iterable<TumblingBean> elements, Collector<String> out) throws Exception {
    int i = 0;
    for (TumblingBean bean : elements) {i++;}
    out.collect("总共有:" + i + "条数据");
}

}

一看这很多代码,第一反馈就是不想看。别急,我这里给你整体说一下这个代码你就懂了,次要逻辑 就是:

socketTextStream 读取数据

把读取到的数据利用 map 算子进行 split 切分

而后提取数据中的含有工夫的字段,并设置最大延迟时间是 2s

接着进行 key 操作

最初开了一个 10s 的窗口,等触发了窗口就进行计算

先补充几个重要的概念:

水位线(Watermark)就是一个 毫秒的工夫戳

Flink 默认每隔 200ms(机器工夫)向数据流中插入一个 水位线

Watermark 是⼀种掂量 Event Time 停顿的机制(逻辑时钟),能够设定提早触发

水位线必须枯燥递增,以确保工作的事件工夫在向前推动,而不是在后退

只有事件工夫须要水位线

水位线产生的公式:水位线 = 零碎察看到的最大工夫(就是 数据中携带的最大工夫戳)– 最大延迟时间

最大延迟时间由程序员本人设定

数据流中的水位线 用于示意 如果程序曾经解决的数据中含有的工夫戳都小于 水位线 的话,那么这些数据都曾经达到了,所以窗口的触发条件就是

水位线 >= 窗口完结工夫

那 这个窗口怎么触发呢?这是个好问题 水位线 >= 窗口完结工夫

咱们的程序里 设置的延迟时间是 2s,窗口完结工夫是 10s 然而不包含 10s 因为窗口是左闭又开的

水位线 = 零碎察看到的最大工夫(就是 数据中携带的最大工夫戳)– 最大延迟时间

如果 咱们 造的测试数据是 每条数据第一个字母是代表 key,第二个数字代表这条数据产生的工夫

a 1

a 2

b 5

b 4

b 12

阐明:1

当 a 1 这条数据过去时,咱们套用下面的公式

水位线 = 零碎察看到的最大工夫(就是 数据中携带的最大工夫戳)– 最大延迟时间 = 1 – 2 = -1

窗口触发的条件:

水位线 >= 窗口完结工夫 —–> -1 < 10 窗口不触发,数据被保留到状态外面

当 a 2 这条数据过去时,咱们套用下面的公式

水位线 = 零碎察看到的最大工夫(就是 数据中携带的最大工夫戳)– 最大延迟时间 = 2 – 2 = 0

窗口触发的条件:

水位线 >= 窗口完结工夫 —–>0 < 10 窗口不触发,数据被保留到状态外面

当 b 5 这条数据过去时,咱们套用下面的公式

水位线 = 零碎察看到的最大工夫(就是 数据中携带的最大工夫戳)– 最大延迟时间 = 5 – 2 = 3

窗口触发的条件:

水位线 >= 窗口完结工夫 —–> 3 < 10 窗口不触发,数据被保留到状态外面

当 b 4 这条数据过去时,咱们套用下面的公式

水位线 = 零碎察看到的最大工夫(就是 数据中携带的最大工夫戳)– 最大延迟时间 = 4 – 2 = 2

窗口触发的条件:

水位线 >= 窗口完结工夫 —–> 2 < 10 窗口不触发,数据被保留到状态外面

当 b 12 这条数据过去时,咱们套用下面的公式

水位线 = 零碎察看到的最大工夫(就是 数据中携带的最大工夫戳)– 最大延迟时间 = 12 – 2 = 10

窗口触发的条件:

水位线 >= 窗口完结工夫 —–> 10 < 10 窗口触发,计算窗口外面的逻辑

然而 b 12 这条数据 是不在 这次窗口外面的,因为 b 12 不属于 这个窗口。

阐明:其实

a 1

a 2

b 5

b 4

b 12

这几条数据中,b 4 是属于乱序数据,因为 先来了一条工夫戳是 5 的数据过后了,又来了一条工夫戳是 4 的数据 如果不必水位线去期待一下的,默认咱们就会把 b 4 这条数据给抛弃了,这就可能导致最初计算的后果不正确。

public class GenWatermark {

public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

    env.setParallelism(1);

    //TODO 零碎默认每隔 200ms 机器工夫插入一次水位线
    //TODO 上面的语句设置为每隔 1 分钟插入一次水位线
    env.getConfig().setAutoWatermarkInterval(60000L);
    env
            .socketTextStream("192.168.1.141", 9999, '\n')
            .map(new MapFunction<String, TumblingBean>() {
                @Override
                public TumblingBean map(String value) throws Exception {String[] s = value.split("\\s");
                    TumblingBean bean = new TumblingBean();
                    bean.setUserId(s[0]);
                    bean.setTime(Long.parseLong(s[1]) * 1000L);

                    return bean;
                }
            })
            .assignTimestampsAndWatermarks(new MyAssigner()
            )
            .keyBy(TumblingBean::getUserId)
            .window(TumblingEventTimeWindows.of(Time.minutes(10)))
            .process(new WaterMarkWindow())
            .print();

    env.execute();}

}

class WaterMarkWindow extends ProcessWindowFunction<TumblingBean, String, String, TimeWindow> {

@Override
public void process(String s, ProcessWindowFunction<TumblingBean,
        String, String, TimeWindow>.Context context, Iterable<TumblingBean> elements, Collector<String> out) throws Exception {System.out.println("以后水位线的值为:" + context.currentWatermark() + "\t" + "窗口开始工夫:" + context.window().getStart() + "\t" + "窗口完结工夫:" + context.window().getEnd());

    int i = 0;
    for (TumblingBean bean : elements) {i++;}
    out.collect("总共有:" + i + "条数据");
}

}

class MyAssigner implements AssignerWithPeriodicWatermarks<TumblingBean> {

//TODO 设置最大延迟时间为 10s
Long bound = 10 * 1000L;

//TODO 零碎察看到的元素蕴含的最大工夫戳
Long maxTs = Long.MIN_VALUE + bound;

@Nullable
@Override
public Watermark getCurrentWatermark() {System.out.println("察看到的最大工夫戳是:" + maxTs);
    return new Watermark(maxTs - bound);
}

@Override
public long extractTimestamp(TumblingBean element, long recordTimestamp) {System.out.println("察看到的数据:" + element.getTime());
    maxTs = Math.max(maxTs, element.getTime());
    return maxTs;

}

}

上面有一个我本人自定义水位线的逻辑代码,大家感兴趣能够看看

总结

工夫,窗口,水位线 这三个是 Flink 中最根底的概念了,同时也是开发过程中用到的最多的货色,只有很好的把握了 工夫,窗口,水位线这些概念,能力依照需要去写出正确的代码,同时在呈现窗口不触发的状况下 疾速排查到 问题所在。

其实 对于窗口和水位线这块还有很多货色,比方多并行度下 水位线是怎么传递的,容许窗口的最大提早等,不过小伙伴们释怀,接下来这些货色咱们都会讲到的。

下一篇文章会从源码的角度来解读,当一条数据来了,咱们怎么提前数据外面的工夫字段生成水位线的,这条数据怎么调配到对应的窗口的,窗口的数据到底存在哪里了,这些窗口怎么触发的,触发完了怎么进行咱们本人写的逻辑运算的,怎么销毁窗口的等。

我会把这部分源码从头到尾讲一遍,让你对这部分的内容的了解更加粗浅,另外文章外面波及到的代码,如果大家想下载下来本人去测试的话,间接加我微信分割我就行了。

本文由博客一文多发平台 OpenWrite 公布!

正文完
 0