共计 3425 个字符,预计需要花费 9 分钟才能阅读完成。
问题的提出
对于 WaterMark 设计的地位是否会影响窗口的失常开闭?
上面我模仿了两种情景(source 并行度为 1,map 并行度为 2), 别离是
1. 在 source 后设置 watermark, 通过 map 后开窗
2. 在 map 后设置 watermark, 而后开窗
ps: 上面的两种代码我都设置了天然增长的 watermark, 窗口工夫都是 5 秒, 只是设置 watermark 的地位不同
watermark 是 testWM 对象的 ts 字段 *1000
代码一: 在 Source 后增加 WaterMark
public class WMTest {
public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStreamSource<String> source = env.socketTextStream("localhost", 9999);
// TODO: 2021/12/1 在 source 后设置 watermark
SingleOutputStreamOperator<String> sourceWithWM = source.assignTimestampsAndWatermarks(WatermarkStrategy
.<String>forMonotonousTimestamps()
.withTimestampAssigner(new SerializableTimestampAssigner<String>() {
@Override
public long extractTimestamp(String element, long recordTimestamp) {String[] split = element.split(",");
return Long.parseLong(split[2]) * 1000;
}
}));
// TODO: 2021/12/1 设置 map 并行度为 2
SingleOutputStreamOperator<testWM> mapDS = sourceWithWM.map(r -> {String[] split = r.split(",");
return new testWM(Integer.parseInt(split[0]), Integer.parseInt(split[1]),Long.parseLong(split[2]));
}).setParallelism(2);
SingleOutputStreamOperator<String> resultDS = mapDS.keyBy(r -> r.getId())
.window(TumblingEventTimeWindows.of(Time.seconds(5)))
.process(new ProcessWindowFunction<testWM, String, Integer, TimeWindow>() {
@Override
public void process(Integer integer, Context context, Iterable<testWM> elements, Collector<String> out) throws Exception {out.collect("我关窗啦");
}
});
resultDS.print();
env.execute();}
}
@Data
@AllArgsConstructor
class testWM{
private int id;
private int num;
private long ts;
}
代码二: 在 Map 后设置 WaterMark
public class WMTest {
public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStreamSource<String> source = env.socketTextStream("localhost", 9999);
// TODO: 2021/12/1 设置 map 并行度为 2
SingleOutputStreamOperator<testWM> mapDS = source.map(r -> {String[] split = r.split(",");
return new testWM(Integer.parseInt(split[0]), Integer.parseInt(split[1]),Long.parseLong(split[2]));
}).setParallelism(2);
// TODO: 2021/12/1 在 map 后增加 watermark
SingleOutputStreamOperator<testWM> mapWithWM = mapDS.assignTimestampsAndWatermarks(WatermarkStrategy
.<testWM>forMonotonousTimestamps()
.withTimestampAssigner(new SerializableTimestampAssigner<testWM>() {
@Override
public long extractTimestamp(testWM element, long recordTimestamp) {return element.getTs() * 1000;
}
}));
SingleOutputStreamOperator<String> resultDS = mapWithWM.keyBy(r -> r.getId())
.window(TumblingEventTimeWindows.of(Time.seconds(5)))
.process(new ProcessWindowFunction<testWM, String, Integer, TimeWindow>() {
@Override
public void process(Integer integer, Context context, Iterable<testWM> elements, Collector<String> out) throws Exception {out.collect("我关窗啦");
}
});
resultDS.print();
env.execute();}
}
@Data
@AllArgsConstructor
class testWM{
private int id;
private int num;
private long ts;
}
运行后果:
对于第一种, 在 source 后增加 watermark 的后果如下:
当 1,1,1 这条数据进入时, 开启了 [0,5) 这个窗口, 当 1,1,9 这条数据进入时,watermark 升至 9000(疏忽 watermark 的减 1). 窗口敞开, 没有问题
对于第二种, 在 map 后增加 watermark 的后果如下:
能够很显著的发现, 当第一条 1,1,9 进入时,[0,5)这个窗口并没有敞开. 直到第二条 1,1,9 进入时, 窗口才被敞开, 这是为什么?
我针对以上两种状况画了下图来了解.
图一. 图二描述了 source 之后设置 watermark 的场景, 一般来说, 这是咱们生产中须要的
WaterMark 以播送的模式向上游发送, 大数据培训并且如果同时接管上游多条并行度的 WaterMark 时, 以小的为准
这就导致图三 (Map 后设置 WaterMark) 中, 我须要发送两条足够 [0,5) 这个窗口敞开的数据, 能力真正敞开窗口, 因为数据要通过轮询能力达到每个并行度。
拓展:
在 KafkaSource 中, 曾经做了很好得优化, 在生产中咱们个别设置并行度与 topic 分区数雷同
如果设置得并行度比 topic 分区数多, 那必然有并行度生产不到数据, 就会导致 WaterMark 始终放弃在 Long.min_value.
当这种 WaterMark 向上游播送之后, 会导致所有失常并行度的窗口全副无奈敞开, 因为 WaterMark 取了各个并行度的最小值
然而当这种状态放弃一段时间之后, 程序会在计算 WaterMark 的时候, 主动过滤掉迟迟没有数据的并行度传进来的 WaterMark, 这就是 KafkaSource 的优化.