The question

Does the position at which the watermark is assigned affect whether windows open and close normally?

Below I simulate two scenarios (source parallelism 1, map parallelism 2):
1. Assign the watermark after the source, then apply the window after the map
2. Assign the watermark after the map, then apply the window

Note: both versions use a monotonically increasing watermark and a 5-second window; only the position of the watermark assignment differs. The event timestamp is the testWM object's ts field * 1000 (seconds converted to milliseconds).

Code 1: assign the watermark after the source
import lombok.AllArgsConstructor;
import lombok.Data;
import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

public class WMTest {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStreamSource<String> source = env.socketTextStream("localhost", 9999);
        // Assign the watermark right after the source
        SingleOutputStreamOperator<String> sourceWithWM = source.assignTimestampsAndWatermarks(
                WatermarkStrategy
                        .<String>forMonotonousTimestamps()
                        .withTimestampAssigner(new SerializableTimestampAssigner<String>() {
                            @Override
                            public long extractTimestamp(String element, long recordTimestamp) {
                                String[] split = element.split(",");
                                return Long.parseLong(split[2]) * 1000;
                            }
                        }));
        // Set map parallelism to 2
        SingleOutputStreamOperator<testWM> mapDS = sourceWithWM.map(r -> {
            String[] split = r.split(",");
            return new testWM(Integer.parseInt(split[0]), Integer.parseInt(split[1]), Long.parseLong(split[2]));
        }).setParallelism(2);
        SingleOutputStreamOperator<String> resultDS = mapDS.keyBy(r -> r.getId())
                .window(TumblingEventTimeWindows.of(Time.seconds(5)))
                .process(new ProcessWindowFunction<testWM, String, Integer, TimeWindow>() {
                    @Override
                    public void process(Integer integer, Context context, Iterable<testWM> elements, Collector<String> out) throws Exception {
                        out.collect("window closed!");
                    }
                });
        resultDS.print();
        env.execute();
    }
}

@Data
@AllArgsConstructor
class testWM {
    private int id;
    private int num;
    private long ts;
}
Code 2: assign the watermark after the map
import lombok.AllArgsConstructor;
import lombok.Data;
import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

public class WMTest {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStreamSource<String> source = env.socketTextStream("localhost", 9999);
        // Set map parallelism to 2
        SingleOutputStreamOperator<testWM> mapDS = source.map(r -> {
            String[] split = r.split(",");
            return new testWM(Integer.parseInt(split[0]), Integer.parseInt(split[1]), Long.parseLong(split[2]));
        }).setParallelism(2);
        // Assign the watermark after the map
        SingleOutputStreamOperator<testWM> mapWithWM = mapDS.assignTimestampsAndWatermarks(
                WatermarkStrategy
                        .<testWM>forMonotonousTimestamps()
                        .withTimestampAssigner(new SerializableTimestampAssigner<testWM>() {
                            @Override
                            public long extractTimestamp(testWM element, long recordTimestamp) {
                                return element.getTs() * 1000;
                            }
                        }));
        SingleOutputStreamOperator<String> resultDS = mapWithWM.keyBy(r -> r.getId())
                .window(TumblingEventTimeWindows.of(Time.seconds(5)))
                .process(new ProcessWindowFunction<testWM, String, Integer, TimeWindow>() {
                    @Override
                    public void process(Integer integer, Context context, Iterable<testWM> elements, Collector<String> out) throws Exception {
                        out.collect("window closed!");
                    }
                });
        resultDS.print();
        env.execute();
    }
}

@Data
@AllArgsConstructor
class testWM {
    private int id;
    private int num;
    private long ts;
}
Results:

For the first version (watermark assigned after the source):

When the record 1,1,1 arrives, the [0,5) window opens. When the record 1,1,9 arrives, the watermark rises to 9000 (ignoring the watermark's 1 ms subtraction) and the window closes. No problem.
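The window boundaries and firing condition above can be checked by hand. A tumbling window assigns a timestamp to the window starting at timestamp - (timestamp mod size), and an event-time window fires once the watermark reaches its end minus 1 ms. A minimal sketch (the class and method names here are mine, for illustration only):

```java
public class WindowMath {
    // Same start formula a 5-second tumbling window uses (offset 0):
    // start = timestamp - (timestamp % windowSize)
    static long windowStart(long timestampMs, long sizeMs) {
        return timestampMs - (timestampMs % sizeMs);
    }

    // An event-time window [start, start + size) fires when the
    // watermark reaches its maximum timestamp: end - 1 ms.
    static boolean fires(long watermarkMs, long windowEndMs) {
        return watermarkMs >= windowEndMs - 1;
    }

    public static void main(String[] args) {
        long size = 5_000L;                        // 5-second tumbling window
        long start = windowStart(1 * 1000L, size); // record "1,1,1" -> ts 1000 ms
        System.out.println("[" + start + ", " + (start + size) + ")"); // [0, 5000)
        // Record "1,1,9" pushes the monotonous watermark to 9000 - 1 = 8999,
        // which is past 4999, so the [0, 5000) window fires.
        System.out.println(fires(9_000L - 1, start + size)); // true
    }
}
```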

For the second version (watermark assigned after the map):

It is easy to see that when the first 1,1,9 record arrives, the [0,5) window does not close. Only when a second 1,1,9 arrives does the window close. Why?

I drew the diagrams below to work through the two cases.

Figures 1 and 2 depict assigning the watermark after the source; this is generally what we want in production.

Watermarks are broadcast to downstream subtasks, and when a subtask receives watermarks from multiple upstream parallel instances, it takes the smallest of them.

This is why, in Figure 3 (watermark assigned after the map), I had to send two records high enough to close the [0,5) window before it actually closed: records reach the two map subtasks round-robin, so each subtask's watermark only advances once it has received such a record.
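The two-record behavior can be reproduced without Flink at all: model the window operator's watermark as the minimum over its input channels, each initialized to Long.MIN_VALUE, and feed records to the two map subtasks round-robin. This is a toy simulation (class and method names are mine):

```java
import java.util.Arrays;

public class MinWatermarkDemo {
    // The downstream operator's watermark is the minimum across its input channels.
    static long combined(long[] channelWM) {
        return Arrays.stream(channelWM).min().getAsLong();
    }

    public static void main(String[] args) {
        // One channel per map subtask; Long.MIN_VALUE until a watermark arrives.
        long[] channelWM = {Long.MIN_VALUE, Long.MIN_VALUE};
        long[] eventTs = {1_000L, 9_000L, 9_000L}; // records "1,1,1", "1,1,9", "1,1,9"
        for (int i = 0; i < eventTs.length; i++) {
            int subtask = i % 2;                 // round-robin to the two map subtasks
            channelWM[subtask] = eventTs[i] - 1; // monotonous strategy: WM = maxTs - 1
            System.out.println("after record " + (i + 1) + ": combined WM = " + combined(channelWM));
        }
        // The [0,5000) window needs a combined WM >= 4999; that happens only
        // after record 3, when BOTH channels have seen a 9-second record.
    }
}
```

After record 2, one channel sits at 8999 but the other is still at 999, so the combined watermark is 999 and the window stays open, exactly as observed.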

Extension:

The KafkaSource already handles this situation well. In production we usually set the source parallelism equal to the number of topic partitions.

If the parallelism is larger than the partition count, some subtasks will never receive data, and their watermark stays at Long.MIN_VALUE.

Once such a watermark is broadcast downstream, none of the windows on the healthy subtasks can close, because the downstream watermark is the minimum across all upstream parallel instances.

However, after this state persists for a while, the watermark calculation automatically filters out the watermarks from subtasks that have gone without data for a long time. This is the KafkaSource's idleness optimization.
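For other sources you can opt into the same behavior explicitly with WatermarkStrategy.withIdleness, which marks a subtask idle after the given timeout so it no longer drags down the downstream minimum watermark. A sketch against the testWM class from the code above (the wrapper class name and the 10-second timeout are my choices, not from the original job):

```java
import java.time.Duration;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;

public class IdleAwareStrategy {
    // Builds the same strategy as Code 2, plus an idleness timeout.
    public static WatermarkStrategy<testWM> build() {
        return WatermarkStrategy
                .<testWM>forMonotonousTimestamps()
                .withTimestampAssigner((element, recordTs) -> element.getTs() * 1000)
                // A subtask that emits no records for 10 s is flagged idle and is
                // excluded from the downstream minimum-watermark calculation
                // until it starts emitting again.
                .withIdleness(Duration.ofSeconds(10));
    }
}
```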