1、概述
flink 中比拟重要的是工夫和状态,学习 flink 的过程中对水位线的了解始终含糊,通过一段时间的消化,在此总结总结。本文次要把水位线是什么,怎么来的,有什么用形容分明。
2、不太好了解的水位线
有些人喜爱把水位线叫成水印,不论是水印还是水位线,中文翻译过去一点都不贴切咱们的生存,比拟形象,让人难得了解。在咱们生存中水位线相似家中挂在墙上的一个挂钟,相似咱们的手表。上面来聊聊如下的话题:
1,到底是如何产生。
2,既然是一个挂钟,钟表有哪些特点呢,钟表每隔 1s 秒针往前走一小步,工夫是不是越来越大,这些特点水位线是不是也有呢。
3,挂钟有什么用途啊?早晨看看手表发现 12 点,咱们必定自我暗示:” 应该睡觉了 ”,通过工夫让咱们晓得什么工夫该干什么事件。
3、什么叫水位线
3.1、水位线的定义
水位线就是一个逻辑时钟,为什么叫逻辑时钟?失常工夫是有 cpu 产生的,周期而固定的往前走,然而咱们这个时钟的工夫是程序员计算出来,依据 ” 事件工夫 ” 动静计算出来 (至于什么是工夫事件,有什么应用场景这里就不讲了),如某一时刻计算的后果为 x,x 值为 2022-10-10 10:10:10 对应的工夫戳为 1665367810000,x 的值随着事件工夫的变大而变大,可能的后果为 x,x+1,x+2,x+3,x+4 … 间断的越来越大的工夫戳是不是相似钟表每隔 1s 往前走一步呢。
3.2、水位线(逻辑时钟)的组成
水位线由一串间断的工夫戳组成,越来越大,每个工夫戳都是依据事件工夫动静计算出来的。时钟也是由一间断的工夫组成,也是越来越大,如 2022-10-10 10:10:10,2022-10-10 10:10:11,2022-10-10 10:10:12,2022-10-10 10:10:13。。。等,水位线就是相似生存中的时钟,所以我把这个水位线称为逻辑时钟,逻辑时钟就是水位线,水印机制。
3.3、逻辑时钟以后工夫
相似时钟的以后工夫,此处此刻为几点几分几秒,这个以后工夫比拟重要,窗口的闭合,定时工作的触发都是依据以后工夫来判断的。
以后值特点:越来越大,流刚刚产生的时候插入负无穷大值,完结是插入正无穷大的值。
集体感觉这个以后值相似一个指针类型的变量,他的指向是不停的变动的(集体了解)。
3.4、以后工夫的计算公式
时钟的 ” 以后工夫 ” 对应一个具体的工夫戳。时钟的以后值 xxx = 事件工夫 – 最大延迟时间 – 1 毫秒。
3.5、来一个案例
案例形容:从 socket 读取数据,并打印以后水位的具体值。
package com.deepexi.sql;
import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;
import java.time.Duration;
public class ExampleTest {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
env
// 从 socket 读取数据
.socketTextStream("192.168.117.211", 9999)
.map(r -> Tuple2.of(r.split("")[0], Long.parseLong(r.split(" ")[1])))
.returns(Types.TUPLE(Types.STRING, Types.LONG))
.assignTimestampsAndWatermarks(
//5s 延迟时间
WatermarkStrategy.<Tuple2<String, Long>>forBoundedOutOfOrderness(Duration.ofSeconds(5))
.withTimestampAssigner(new SerializableTimestampAssigner<Tuple2<String, Long>>() {
@Override
public long extractTimestamp(Tuple2<String, Long> element, long recordTimestamp) {
// 提取事件工夫
return element.f1;
}
})
)
// 分流
.keyBy(r -> r.f0)
.process(new KeyedProcessFunction<String, Tuple2<String, Long>, String>() {
@Override
public void processElement(Tuple2<String, Long> value, Context ctx, Collector<String> out) throws Exception {
out.collect("以后的水位线是:" + ctx.timerService().currentWatermark());
}
})
.print();
env.execute();
}
}
nc -lk 9999 开启 socket 服务,监听 9999 端口
命令行输出:a 1000
[root@localhost ~]# nc -lk 9999 a 1000
idea 控制台打印
以后的水位线是:-9223372036854775808 //-9223372036854775808 是一个无穷大的数字
命令行输出:a 2000
idea 控制台打印:
以后的水位线是:-4001 // 以后水位线的值 = 事件工夫 – 最大延迟时间 -1 = 1000 – 5000 -1 = -4000
为什么用 1000- 5000 - 1 而用 2000 – 5000 -1?flink 会周期往流中插入水位线,水位线也是流中的一个元素,还是看下图了解吧。
命令行输出:a 3000
idea 控制台打印:以后的水位线是:-3001 //2000 – 5000 -1 = -2000
命令行输出:a 10000
idea 控制台打印:以后的水位线是:-2001 //3000 – 5000 -1 = -2000
命令行输出:a 1000
idea 控制台打印:以后的水位线是:4999 //10000 – 5000 -1 = 4999
命令行输出:a 1000
idea 控制台打印:以后的水位线是:4999 //10000 – 5000 -1 = 4999
命令行输出:a 2000
idea 控制台打印:以后的水位线是:4999 //10000 – 5000 -1 = 4999
通过控制台的打印后果发现水位线的和钟表一样,值总是越来越大的,随着事件工夫的变动而变动,然而不会变小,也可能会进行某一刻,如输出 a 1000 后在输出 a 1000,a 2000 水位线的值始终是 4999。
整个打印过程
命令行窗口:
[root@master ~]# nc -lk 9999
a 1000
a 2000
a 3000
a 10000
a 1000
a 1000
a 2000
idea 打印:
以后的水位线是:-9223372036854775808
以后的水位线是:-4001
以后的水位线是:-3001
以后的水位线是:-2001
以后的水位线是:4999
以后的水位线是:4999
以后的水位线是:4999
4、如何产生的
水位线实质就是一个工夫戳,这个工夫戳是程序员依据事件工夫动静计算出来,间接来一个案例吧。
案例 1
自定义水位线的产生逻辑,实现 WatermarkStrategy 接口,flink 会每隔 200 毫秒的调用 onPeriodicEmit 办法。
public class ExampleTest2 {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
// 设置每隔 1 分钟插入一次水位线
//env.getConfig().setAutoWatermarkInterval(6 * 1000L);
env
.socketTextStream("192.168.117.211", 9999)
.map(new MapFunction<String, Tuple2<String, Long>>() {
@Override
public Tuple2<String, Long> map(String value) throws Exception {String[] arr = value.split(" ");
return Tuple2.of(arr[0], Long.parseLong(arr[1]));
}
})
.assignTimestampsAndWatermarks(new CustomWatermarkGenerator())
.print();
env.execute();}
public static class CustomWatermarkGenerator implements WatermarkStrategy<Tuple2<String, Long>> {
@Override
public TimestampAssigner<Tuple2<String, Long>> createTimestampAssigner(TimestampAssignerSupplier.Context context) {return new SerializableTimestampAssigner<Tuple2<String, Long>>() {
@Override
public long extractTimestamp(Tuple2<String, Long> element, long recordTimestamp) {return element.f1;}
};
}
@Override
public WatermarkGenerator<Tuple2<String, Long>> createWatermarkGenerator(WatermarkGeneratorSupplier.Context context) {return new WatermarkGenerator<Tuple2<String, Long>>() {
// 最大延迟时间
private Long bound = 5000L;
private Long maxTs = -Long.MAX_VALUE + bound + 1L;
@Override
public void onEvent(Tuple2<String, Long> event, long eventTimestamp, WatermarkOutput output) {
// 更新察看到的最大事件工夫
maxTs = Math.max(maxTs, event.f1);
}
@Override
public void onPeriodicEmit(WatermarkOutput output) {System.out.println("水位线的值:" + (maxTs - bound - 1L));
// 发送水位线,计算公式:事件工夫 - 延迟时间 -1L
output.emitWatermark(new Watermark(maxTs - bound - 1L));
}
};
}
}
}
nc -lk 9999 开启 socket 服务,监听 9999 端口
启动 idea,控制台每隔 200 毫秒打印后果:水位线的值:xxxxx。如下:
水位线的值:-9223372036854775807
水位线的值:-9223372036854775807
水位线的值:-9223372036854775807
水位线的值:-9223372036854775807
命令行输出:a 1000
控制台每隔 200 毫秒打印后果:水位线的值:xxxxx。如下:
水位线的值:-4001
水位线的值:-4001
水位线的值:-4001
水位线的值:-4001
水位线的值:-4001
命令行输出:a 2000
控制台每隔 200 毫秒打印接口:水位线的值:xxxxx。如下:
水位线的值:-3001
水位线的值:-3001
水位线的值:-3001
水位线的值:-3001
水位线的值:-3001
// 默认 200 毫秒插入水位线到流,能够设置水位线的插入流的工夫距离
env.getConfig().setAutoWatermarkInterval(6 * 1000L);
整个打印过程
命令行窗口:
[root@master ~]# nc -lk 9999
a 1000
a 2000
idea 打印:
水位线的值:-9223372036854775807
水位线的值:-9223372036854775807
水位线的值:-9223372036854775807
水位线的值:-9223372036854775807
水位线的值:-9223372036854775807
(a,1000)
水位线的值:-4001
水位线的值:-4001
水位线的值:-4001
水位线的值:-4001
水位线的值:-4001
(a,2000)
水位线的值:-3001
水位线的值:-3001
水位线的值:-3001
水位线的值:-3001
水位线的值:-3001
水位线的值:-3001
水位线的值:-3001
水位线的值:-3001
水位线的值:-3001
水位线的值:-3001
Disconnected from the target VM, address: '127.0.0.1:58591', transport: 'socket'
水位线的值:-3001
Process finished with exit code 130
通过后果咱们能够晓得,水位线的值随着事件工夫 1000,2000 的变动而变动。如果输出 a 2000 后在输出 a 1000,控制台打印后果是怎么的?那必定打印的是:水位线的值:-3001,因为水位线的值和工夫一样永远只会越来越大。
案例 2
革新一下程序,新增如下代码,keyby 后,把命令行输出的元素打印进去。
nc -lk 9999 启动 socket 监听 9999 端口
启动 idea
命令行输出
[root@localhost ~]# nc -lk 9999
a 1000
a 2000
a 5000
a 6000
idea 控制台打印:
水位线的值:-9223372036854775807
水位线的值:-9223372036854775807
输出业务数据是:(a,1000)
水位线的值:-4001
水位线的值:-4001
水位线的值:-4001
水位线的值:-4001
水位线的值:-4001
水位线的值:-4001
水位线的值:-4001
输出业务数据是:(a,2000)
水位线的值:-3001
水位线的值:-3001
水位线的值:-3001
水位线的值:-3001
水位线的值:-3001
水位线的值:-3001
输出业务数据是:(a,5000)
水位线的值:-1
水位线的值:-1
水位线的值:-1
水位线的值:-1
水位线的值:-1
水位线的值:-1
水位线的值:-1
水位线的值:-1
输出业务数据是:(a,6000)
水位线的值:999
水位线的值:999
水位线的值:999
水位线的值:999
水位线的值:999
剖析计算结果后果:
-9223372036854775807,-9223372036854775807,(a,1000),-4001,-4001,-4001,-4001,-4001,-4001,-4001,-4001,(a,2000),-3001,-3001,-3001,-3001,-3001,(a,5000),-1,-1,-1,(a,6000),999,999,999,999
不晓得大家有没有一种感觉,水位线和业务数据什么关系?是不是相似生存中落花和流水的关系呢?业务数据就是河流中的水,水位线就像落在水中的花,他们两一起流向大海,水位线和业务数据一样都属于流中的一个元素。
5、有什么用
在流的世界逻辑时钟就是一个参照物。还是挂钟来举例吧,看看挂钟曾经 12 点了,咱们必定在会暗示本人该放下手机了要睡觉了。针对源源不断的数据流,把数据流拆分为多段进行解决,针对每段数据进行统计,那什么时候触发统计呢?这个时候就会用这个逻辑时钟,窗口看看逻辑工夫以后处于几点钟,发现窗口完结工夫小于时钟的工夫,窗口闭合进行统计。
案例 1,水位线触发定时工作的执行
性能形容:水位线的以后工夫戳大于定时工作的的触发工夫后 触发定时工作的执行。
public class ExampleTest3 {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
env
.socketTextStream("192.168.117.211", 9999)
.map(r -> Tuple2.of(r.split("")[0], Long.parseLong(r.split(" ")[1])))
.returns(Types.TUPLE(Types.STRING, Types.LONG))
.assignTimestampsAndWatermarks(WatermarkStrategy.<Tuple2<String, Long>>forBoundedOutOfOrderness(Duration.ofSeconds(5))
.withTimestampAssigner(new SerializableTimestampAssigner<Tuple2<String, Long>>() {
@Override
public long extractTimestamp(Tuple2<String, Long> element, long recordTimestamp) {return element.f1;}
})
)
.keyBy(r -> r.f0)
.process(new KeyedProcessFunction<String, Tuple2<String, Long>, String>() {
@Override
public void processElement(Tuple2<String, Long> value, Context ctx, Collector<String> out) throws Exception {// out.collect("以后的水位线是:" + ctx.timerService().currentWatermark());
ctx.timerService().registerEventTimeTimer(value.f1 + 5000L);
out.collect("注册了一个工夫戳是:" + new Timestamp(value.f1 + 5000L) + "的定时器");
}
@Override
public void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out) throws Exception {super.onTimer(timestamp, ctx, out);
out.collect("定时器触发了!");
}
})
.print();
env.execute();}
}
nc -lk 9999 开启 socket 服务,监听 9999 端口
命令行输出:a 1665367810000 //1665367810000 对应的工夫为 2022-10-10 10:10:10
控制台输入:注册了一个工夫戳是:2022-10-10 10:10:15.0 的定时器 //2022-10-10 10:10:15 转换为工夫戳为 1665367815000
解释一下控制台输入后果
以后水位线的值:2022-10-10 10:10:10 – 5s - 1 毫秒 = 1665367810000 – 5000 -1 = 1665367804999。当水位线的值大于 1665367815000 定时工作触发。
命令行输出:1665367821000 // 命令行输出 2022-10-10 10:10:21 对应的工夫戳 1665367821000 将会触发定时工作
控制台输入:定时器触发了!
命名行打印输出
[root@master ~]# nc -lk 9999
a 1665367810000
a 1665367821000
idea 打印输出
注册了一个工夫戳是:2022-10-10 10:10:15.0 的定时器
注册了一个工夫戳是:2022-10-10 10:10:26.0 的定时器
定时器触发了!
案例 2,水位线以后工夫戳大于窗口完结工夫触发窗口闭
案例 day3.Example4
public class ExampleTest4 {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
env
.socketTextStream("192.168.117.211", 9999)
.map(new MapFunction<String, Tuple2<String, Long>>() {
@Override
public Tuple2<String, Long> map(String value) throws Exception {String[] arr = value.split(" ");
return Tuple2.of(arr[0], Long.parseLong(arr[1]));
}
})
.assignTimestampsAndWatermarks(
// 最大延迟时间设置为 5 秒
WatermarkStrategy.<Tuple2<String, Long>>forBoundedOutOfOrderness(Duration.ofSeconds(5))
.withTimestampAssigner(new SerializableTimestampAssigner<Tuple2<String, Long>>() {
@Override
public long extractTimestamp(Tuple2<String, Long> element, long recordTimestamp) {return element.f1; // 通知 flink 事件工夫是哪一个字段}
})
)
.keyBy(r -> r.f0)
// 5 秒的事件工夫滚动窗口
.window(TumblingEventTimeWindows.of(Time.seconds(5)))
.process(new ProcessWindowFunction<Tuple2<String, Long>, String, String, TimeWindow>() {
@Override
public void process(String key, Context context, Iterable<Tuple2<String, Long>> elements, Collector<String> out) throws Exception {long windowStart = context.window().getStart();
long windowEnd = context.window().getEnd();
// System.out.println("以后窗口的完结值:" + context.currentWatermark());
// System.out.println("以后水位线的值:" + context.currentWatermark());
long count = elements.spliterator().getExactSizeIfKnown();
out.collect("用户" + key + "在窗口" +
""+ new Timestamp(windowStart) +"~"+ new Timestamp(windowEnd) +"" +
"中的 pv 次数是:" + count);
}
})
.print();
env.execute();}
}
命令行输出:a 1665367810000 //flink 将开启一个 2022-10-10 10:10:10.0~2022-10-10 10:10:15 的窗口,当水位线以后值 (以后值指下面的以后工夫) 大于窗口完结工夫对应的工夫戳会触发窗口闭合。
命令行输出:a 1665367821000 // 此时水位线以后值为:1665367821000 – 5000 -1 = 1665367815999,1665367815999 转换为工夫:2022-10-10 10:10:15,2022-10-10 10:10:15 等于窗口完结工夫,所以触发窗口闭合。
管制输入:用户 a 在窗口 2022-10-10 10:10:10.0~2022-10-10 10:10:15.0 中的 pv 次数是:1
命令行
[root@master ~]# nc -lk 9999
a 1665367810000
a 1665367821000
idea
以后窗口的完结值:1665367815999
以后水位线的值:1665367815999
用户 a 在窗口 2022-10-10 10:10:10.0~2022-10-10 10:10:15.0 中的 pv 次数是:1
如果依据 ” 解决工夫 ” 来进行统计分析,窗口要闭合进行统计,必定有一个参考的工夫,只是这个工夫是 cpu 帮忙产生的,窗口的闭合依据 cpu 产生的工夫进行闭合,但逻辑时钟的某霎时的值是程序计算出来的,这也是为什么把水位线称为逻辑时钟。
6、早退数据的解决
6.1、什么叫早退数据
事件工夫小于水位线以后工夫戳,比方以后数据流的数据 xxx 携带的事件工夫是 2022:20:50,逻辑时钟的此时的工夫为 2022:20:51,那么 flink 认为 xxx 就是一条早退数据。
案例形容:手动发送水位线,手动发送携带事件工夫的元素。
public class ExampleTest5 {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
SingleOutputStreamOperator<String> result = env
.addSource(new SourceFunction<String>() {
@Override
public void run(SourceContext<String> ctx) throws Exception {
// 发送数据携带事件工夫的数据 hello world
ctx.collectWithTimestamp("hello world", 1000L);
// 发送水位线
ctx.emitWatermark(new Watermark(999L));
// 发送数据携带事件工夫的数据 hello flink
ctx.collectWithTimestamp("hello flink", 2000L);
// 发送水位线
ctx.emitWatermark(new Watermark(1999L));
// 发送数据携带事件工夫的数据 hello late
ctx.collectWithTimestamp("hello late", 1000L);
}
@Override
public void cancel() {}
})
.process(new ProcessFunction<String, String>() {
@Override
public void processElement(String value, Context ctx, Collector<String> out) throws Exception {//System.out.println("以后水位线:" + ctx.timerService().currentWatermark());
// 判断事件工夫是否小于水位线
if (ctx.timestamp() < ctx.timerService().currentWatermark()) {System.out.println("早退元素:" + value);
} else {System.out.println("失常元素:" + value);
}
}
});
env.execute();}
}
控制台输入:
失常元素:hello world
失常元素:hello flink
早退元素:hello late
6.2、早退元素的解决
了解了什么叫早退元素,至于怎么解决,flink 提供了几种计划,如
案例:早退数据发送到 ” 侧输入流 ” 中
public class ExampleTest {
// 定义侧输入流
private static OutputTag<String> lateElement = new OutputTag<String>("late-element") { };
public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
SingleOutputStreamOperator<String> result = env
.addSource(new SourceFunction<String>() {
@Override
public void run(SourceContext<String> ctx) throws Exception {
// 发送数据携带事件工夫的数据 hello world
ctx.collectWithTimestamp("hello world", 1000L);
// 发送水位线
ctx.emitWatermark(new Watermark(999L));
// 发送数据携带事件工夫的数据 hello flink
ctx.collectWithTimestamp("hello flink", 2000L);
// 发送水位线
ctx.emitWatermark(new Watermark(1999L));
// 发送数据携带事件工夫的数据 hello late
ctx.collectWithTimestamp("hello late", 1000L);
}
@Override
public void cancel() {}
})
.process(new ProcessFunction<String, String>() {
@Override
public void processElement(String value, Context ctx, Collector<String> out) throws Exception {
// 判断事件工夫是否小于水位线
if (ctx.timestamp() < ctx.timerService().currentWatermark()) {ctx.output(lateElement, "早退元素发送到侧输入流:" + value);
} else {out.collect("失常达到的元素:" + value);
}
}
});
result.print("支流:");
result.getSideOutput(lateElement).print("侧输入流:");
env.execute();}
}
idea 控制台输入:
支流:> 失常达到的元素:hello world
支流:> 失常达到的元素:hello flink
侧输入流:> 早退元素发送到侧输入流:hello late
思考:窗口,早退元素,水位线之间有什么关联?
7、总结
水位线相似生存中的时钟,通过时钟咱们晓得以后工夫处于几点几分秒,这个 ” 以后工夫 ” 在 flink 外面对应一个工夫戳,通过工夫戳来触发窗口的闭合,触发定时工作的执行。也相似一个参照物的角色。