The question
Does the position where the WaterMark is assigned affect whether windows open and close normally?
Below I simulate two scenarios (source parallelism = 1, map parallelism = 2):
1. Assign the watermark after the source, pass through the map, then open the window
2. Assign the watermark after the map, then open the window
PS: both code samples use a monotonously increasing watermark and a 5-second event-time window; the only difference is where the watermark is assigned.
The watermark is derived from the testWM object's ts field * 1000.
Code 1: assign the WaterMark after the Source
import lombok.AllArgsConstructor;
import lombok.Data;
import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

public class WMTest {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStreamSource<String> source = env.socketTextStream("localhost", 9999);
        // Assign the watermark right after the source (source parallelism is 1)
        SingleOutputStreamOperator<String> sourceWithWM = source.assignTimestampsAndWatermarks(WatermarkStrategy
                .<String>forMonotonousTimestamps()
                .withTimestampAssigner(new SerializableTimestampAssigner<String>() {
                    @Override
                    public long extractTimestamp(String element, long recordTimestamp) {
                        String[] split = element.split(",");
                        return Long.parseLong(split[2]) * 1000;
                    }
                }));
        // Set map parallelism to 2
        SingleOutputStreamOperator<testWM> mapDS = sourceWithWM.map(r -> {
            String[] split = r.split(",");
            return new testWM(Integer.parseInt(split[0]), Integer.parseInt(split[1]), Long.parseLong(split[2]));
        }).setParallelism(2);
        SingleOutputStreamOperator<String> resultDS = mapDS.keyBy(r -> r.getId())
                .window(TumblingEventTimeWindows.of(Time.seconds(5)))
                .process(new ProcessWindowFunction<testWM, String, Integer, TimeWindow>() {
                    @Override
                    public void process(Integer integer, Context context, Iterable<testWM> elements, Collector<String> out) throws Exception {
                        out.collect("window closed");
                    }
                });
        resultDS.print();
        env.execute();
    }
}

@Data
@AllArgsConstructor
class testWM {
    private int id;
    private int num;
    private long ts;
}
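To reproduce either run (assuming netcat is available as the socket server), start the listener before submitting the job, then type one comma-separated record per line in the id,num,ts format the code parses:

nc -lk 9999
1,1,1
1,1,9

These are exactly the records used in the results below.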
Code 2: assign the WaterMark after the Map
// imports are the same as in Code 1
public class WMTest {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStreamSource<String> source = env.socketTextStream("localhost", 9999);
        // Set map parallelism to 2
        SingleOutputStreamOperator<testWM> mapDS = source.map(r -> {
            String[] split = r.split(",");
            return new testWM(Integer.parseInt(split[0]), Integer.parseInt(split[1]), Long.parseLong(split[2]));
        }).setParallelism(2);
        // Assign the watermark after the map, i.e. in 2 parallel subtasks
        SingleOutputStreamOperator<testWM> mapWithWM = mapDS.assignTimestampsAndWatermarks(WatermarkStrategy
                .<testWM>forMonotonousTimestamps()
                .withTimestampAssigner(new SerializableTimestampAssigner<testWM>() {
                    @Override
                    public long extractTimestamp(testWM element, long recordTimestamp) {
                        return element.getTs() * 1000;
                    }
                }));
        SingleOutputStreamOperator<String> resultDS = mapWithWM.keyBy(r -> r.getId())
                .window(TumblingEventTimeWindows.of(Time.seconds(5)))
                .process(new ProcessWindowFunction<testWM, String, Integer, TimeWindow>() {
                    @Override
                    public void process(Integer integer, Context context, Iterable<testWM> elements, Collector<String> out) throws Exception {
                        out.collect("window closed");
                    }
                });
        resultDS.print();
        env.execute();
    }
}

// testWM is the same Lombok POJO as in Code 1
Results:
For the first case, with the watermark assigned after the source:
When record 1,1,1 arrives, the [0,5) window opens; when record 1,1,9 arrives, the watermark rises to 9000 (ignoring the 1 ms that the watermark subtracts), which is past the window end of 5000, so the window closes. No problem here.
For the second case, with the watermark assigned after the map:
You can clearly see that when the first 1,1,9 arrives, the [0,5) window does not close. Only when a second 1,1,9 is sent does the window close. Why is that?
I drew the following diagrams to work through the two cases.
Figures 1 and 2 depict the scenario where the watermark is assigned right after the source; generally speaking, this is what we want in production.
The WaterMark is broadcast to downstream subtasks, and when a subtask receives WaterMarks from multiple upstream parallel subtasks, it takes the smallest one.
That is why, in Figure 3 (WaterMark assigned after the map), I had to send two records large enough to close the [0,5) window before it actually closed: records are distributed to the two map subtasks round-robin, so each subtask's watermark only advances once a qualifying record has reached it. A minimal sketch of the minimum rule follows.
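Here is that sketch in plain Java (an illustration of the idea only, not Flink's internal implementation; the two-channel bookkeeping is simplified):

public class MinWatermarkSketch {
    // last watermark seen from each upstream map subtask; Long.MIN_VALUE means "no data yet"
    static long[] channelWatermarks = {Long.MIN_VALUE, Long.MIN_VALUE};

    // called when upstream channel i emits a new watermark; returns the combined watermark
    static long onWatermark(int i, long watermark) {
        channelWatermarks[i] = Math.max(channelWatermarks[i], watermark);
        return Math.min(channelWatermarks[0], channelWatermarks[1]);
    }

    public static void main(String[] args) {
        // round-robin delivery: the first 1,1,9 only advances map subtask 0
        System.out.println(onWatermark(0, 9000)); // prints Long.MIN_VALUE -> [0,5) stays open
        // the second 1,1,9 advances map subtask 1 as well
        System.out.println(onWatermark(1, 9000)); // prints 9000 -> [0,5) can now close
    }
}

The window operator behaves like the combined value here: it fires only once every upstream channel's watermark has passed the window end.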
Extension:
The KafkaSource already handles this well. In production we generally set the source parallelism equal to the number of topic partitions.
If the parallelism is set higher than the number of partitions, some subtasks will inevitably consume no data, and their WaterMark stays at Long.MIN_VALUE.
Once such a WaterMark is broadcast downstream, none of the windows fed by the healthy subtasks can ever close, because the downstream WaterMark is the minimum across all upstream subtasks.
However, after this state has persisted for a while, the program automatically filters out the WaterMarks coming from subtasks that have long had no data when computing the downstream WaterMark. That is the KafkaSource optimization.
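If you hit the same problem with a source that lacks such an optimization, Flink (1.11+) lets you declare idleness yourself on the WatermarkStrategy. A sketch against Code 2's mapDS, assuming a one-minute timeout fits your latency budget (requires import java.time.Duration):

SingleOutputStreamOperator<testWM> mapWithWM = mapDS.assignTimestampsAndWatermarks(
        WatermarkStrategy
                .<testWM>forMonotonousTimestamps()
                // mark a subtask idle after 1 minute without records, so its stale
                // watermark is ignored when downstream tasks take the minimum
                .withIdleness(Duration.ofMinutes(1))
                .withTimestampAssigner((element, recordTimestamp) -> element.getTs() * 1000));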