The Question
Does the position where the WaterMark is assigned affect whether windows open and close normally?
Below I simulate two scenarios (source parallelism 1, map parallelism 2):
1. Assign the watermark after the source, then window after the map
2. Assign the watermark after the map, then window
PS: both versions use a monotonically increasing watermark and a 5-second window; the only difference is where the watermark is assigned.
The watermark timestamp is the ts field of the testWM object * 1000.
Code 1: Assign the WaterMark after the Source
import lombok.AllArgsConstructor;
import lombok.Data;
import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

public class WMTest {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStreamSource<String> source = env.socketTextStream("localhost", 9999);

        // Assign the watermark right after the source
        SingleOutputStreamOperator<String> sourceWithWM = source.assignTimestampsAndWatermarks(
                WatermarkStrategy.<String>forMonotonousTimestamps()
                        .withTimestampAssigner(new SerializableTimestampAssigner<String>() {
                            @Override
                            public long extractTimestamp(String element, long recordTimestamp) {
                                String[] split = element.split(",");
                                return Long.parseLong(split[2]) * 1000;
                            }
                        }));

        // Set map parallelism to 2
        SingleOutputStreamOperator<testWM> mapDS = sourceWithWM.map(r -> {
            String[] split = r.split(",");
            return new testWM(Integer.parseInt(split[0]), Integer.parseInt(split[1]), Long.parseLong(split[2]));
        }).setParallelism(2);

        SingleOutputStreamOperator<String> resultDS = mapDS.keyBy(r -> r.getId())
                .window(TumblingEventTimeWindows.of(Time.seconds(5)))
                .process(new ProcessWindowFunction<testWM, String, Integer, TimeWindow>() {
                    @Override
                    public void process(Integer key, Context context, Iterable<testWM> elements,
                                        Collector<String> out) throws Exception {
                        out.collect("window closed");
                    }
                });

        resultDS.print();
        env.execute();
    }
}

@Data
@AllArgsConstructor
class testWM {
    private int id;
    private int num;
    private long ts;
}
Code 2: Assign the WaterMark after the Map
// imports are the same as in code 1
public class WMTest {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStreamSource<String> source = env.socketTextStream("localhost", 9999);

        // Set map parallelism to 2
        SingleOutputStreamOperator<testWM> mapDS = source.map(r -> {
            String[] split = r.split(",");
            return new testWM(Integer.parseInt(split[0]), Integer.parseInt(split[1]), Long.parseLong(split[2]));
        }).setParallelism(2);

        // Assign the watermark after the map
        SingleOutputStreamOperator<testWM> mapWithWM = mapDS.assignTimestampsAndWatermarks(
                WatermarkStrategy.<testWM>forMonotonousTimestamps()
                        .withTimestampAssigner(new SerializableTimestampAssigner<testWM>() {
                            @Override
                            public long extractTimestamp(testWM element, long recordTimestamp) {
                                return element.getTs() * 1000;
                            }
                        }));

        SingleOutputStreamOperator<String> resultDS = mapWithWM.keyBy(r -> r.getId())
                .window(TumblingEventTimeWindows.of(Time.seconds(5)))
                .process(new ProcessWindowFunction<testWM, String, Integer, TimeWindow>() {
                    @Override
                    public void process(Integer key, Context context, Iterable<testWM> elements,
                                        Collector<String> out) throws Exception {
                        out.collect("window closed");
                    }
                });

        resultDS.print();
        env.execute();
    }
}

@Data
@AllArgsConstructor
class testWM {
    private int id;
    private int num;
    private long ts;
}
Results:
For the first version (watermark assigned after the source):
When record 1,1,1 arrives, the window [0,5) opens; when record 1,1,9 arrives, the watermark rises to 9000 (ignoring the watermark's minus-1 adjustment) and the window closes. No problem here.
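The window bounds and the close condition in this first scenario can be checked with a small Flink-independent sketch (a minimal illustration; the class and method names are made up, but the 5-second size and the minus-1 end-timestamp convention follow the example above):

```java
public class WindowMath {
    static final long WINDOW_SIZE_MS = 5_000L;

    // Start of the tumbling window a given event-time timestamp falls into
    static long windowStart(long timestampMs) {
        return timestampMs - (timestampMs % WINDOW_SIZE_MS);
    }

    // A window [start, start + size) fires once the watermark reaches its
    // end timestamp (end - 1, matching the minus-1 adjustment)
    static boolean windowCloses(long windowStartMs, long watermarkMs) {
        return watermarkMs >= windowStartMs + WINDOW_SIZE_MS - 1;
    }

    public static void main(String[] args) {
        long ts1 = 1 * 1000L; // record "1,1,1"
        long ts9 = 9 * 1000L; // record "1,1,9"
        System.out.println(windowStart(ts1));         // 0 -> record lands in [0,5)
        System.out.println(windowCloses(0, ts9 - 1)); // true: watermark 8999 >= 4999
    }
}
```

So a single 1,1,9 record is, by the window math alone, enough to close [0,5) — which makes the behavior of the second version below worth explaining.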
For the second version (watermark assigned after the map):
You can clearly see that when the first 1,1,9 arrives, the [0,5) window does not close. Only after a second 1,1,9 arrives does the window close. Why?
I drew the diagrams below to work through both situations.
Figures 1 and 2 depict assigning the watermark after the source; generally, this is what we want in production.
The WaterMark is broadcast to downstream operators, and when an operator receives WaterMarks from multiple upstream parallel subtasks, it takes the smallest one.
This is why in Figure 3 (WaterMark assigned after the Map) I have to send two records, each by itself large enough to close the [0,5) window, before it actually closes: records are distributed to the map subtasks round-robin, so each subtask's watermark only advances after it has seen one of them.
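The take-the-minimum rule can be simulated without Flink. This sketch (class and method names are my own, for illustration only) models a downstream window subtask that tracks the latest watermark per input channel and combines them:

```java
import java.util.Arrays;

public class WatermarkMerge {
    // Latest watermark seen on each upstream input channel
    private final long[] channelWatermarks;

    WatermarkMerge(int numChannels) {
        channelWatermarks = new long[numChannels];
        Arrays.fill(channelWatermarks, Long.MIN_VALUE);
    }

    // A watermark arrives on one channel; the operator's effective
    // watermark is the minimum over all channels
    long advance(int channel, long watermark) {
        channelWatermarks[channel] = Math.max(channelWatermarks[channel], watermark);
        return Arrays.stream(channelWatermarks).min().getAsLong();
    }

    public static void main(String[] args) {
        WatermarkMerge op = new WatermarkMerge(2); // two map subtasks feed the window
        // first "1,1,9" reaches map subtask 0 only (round-robin)
        System.out.println(op.advance(0, 9_000L)); // still Long.MIN_VALUE: channel 1 lags
        // second "1,1,9" reaches map subtask 1
        System.out.println(op.advance(1, 9_000L)); // now 9000 -> [0,5) can close
    }
}
```

After the first record only channel 0 has advanced, so the minimum is still Long.MIN_VALUE and the window stays open; the second record lifts the laggard channel and the combined watermark jumps to 9000.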
Extension:
The KafkaSource already contains a good optimization for this. In production we usually set the source parallelism equal to the number of topic partitions.
If the parallelism exceeds the number of partitions, some subtasks will inevitably consume no data, and their WaterMark stays at Long.MIN_VALUE.
Once such a WaterMark is broadcast downstream, none of the windows in the healthy subtasks can close, because the downstream WaterMark is the minimum over all parallel inputs.
However, after this state persists for a while, the framework automatically excludes the WaterMarks coming from subtasks that have long produced no data when computing the combined WaterMark. This is the KafkaSource optimization.
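Flink exposes this mechanism explicitly as "idleness" (e.g. `WatermarkStrategy.withIdleness(Duration)` for custom strategies): a channel marked idle is excluded from the minimum. The effect can be sketched by extending the per-channel merge with idle flags (class and method names are my own, for illustration only):

```java
import java.util.Arrays;

public class IdleAwareMerge {
    private final long[] watermarks;
    private final boolean[] idle;

    IdleAwareMerge(int numChannels) {
        watermarks = new long[numChannels];
        Arrays.fill(watermarks, Long.MIN_VALUE);
        idle = new boolean[numChannels];
    }

    // Declared idle after producing no data for some timeout
    void markIdle(int channel) { idle[channel] = true; }

    long advance(int channel, long watermark) {
        idle[channel] = false; // data arrived, channel is active again
        watermarks[channel] = Math.max(watermarks[channel], watermark);
        return combined();
    }

    // Minimum over active channels only; idle channels are ignored
    long combined() {
        long min = Long.MAX_VALUE;
        boolean anyActive = false;
        for (int i = 0; i < watermarks.length; i++) {
            if (!idle[i]) {
                min = Math.min(min, watermarks[i]);
                anyActive = true;
            }
        }
        return anyActive ? min : Long.MIN_VALUE;
    }

    public static void main(String[] args) {
        IdleAwareMerge op = new IdleAwareMerge(2);
        op.advance(0, 9_000L);             // only channel 0 ever gets data
        System.out.println(op.combined()); // Long.MIN_VALUE: empty channel 1 blocks it
        op.markIdle(1);                    // channel 1 declared idle (no partitions)
        System.out.println(op.combined()); // 9000: idle channel excluded, windows can close
    }
}
```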