乐趣区

关于大数据:Flink多并行度下WaterMark的设计区别

问题的提出
对于 WaterMark 设计的地位是否会影响窗口的失常开闭?

上面我模仿了两种情景(source 并行度为 1,map 并行度为 2), 别离是
1. 在 source 后设置 watermark, 通过 map 后开窗
2. 在 map 后设置 watermark, 而后开窗

ps: 上面的两种代码我都设置了天然增长的 watermark, 窗口工夫都是 5 秒, 只是设置 watermark 的地位不同

            watermark 是 testWM 对象的 ts 字段 *1000

代码一: 在 Source 后增加 WaterMark
public class WMTest {

public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    DataStreamSource<String> source = env.socketTextStream("localhost", 9999);
    // TODO: 2021/12/1 在 source 后设置 watermark 
    SingleOutputStreamOperator<String> sourceWithWM = source.assignTimestampsAndWatermarks(WatermarkStrategy
            .<String>forMonotonousTimestamps()
            .withTimestampAssigner(new SerializableTimestampAssigner<String>() {
                @Override
                public long extractTimestamp(String element, long recordTimestamp) {String[] split = element.split(",");
                    return Long.parseLong(split[2]) * 1000;
                }
            }));
    // TODO: 2021/12/1 设置 map 并行度为 2 
    SingleOutputStreamOperator<testWM> mapDS = sourceWithWM.map(r -> {String[] split = r.split(",");
        return new testWM(Integer.parseInt(split[0]), Integer.parseInt(split[1]),Long.parseLong(split[2]));
    }).setParallelism(2);
    SingleOutputStreamOperator<String> resultDS = mapDS.keyBy(r -> r.getId())
            .window(TumblingEventTimeWindows.of(Time.seconds(5)))
            .process(new ProcessWindowFunction<testWM, String, Integer, TimeWindow>() {
                @Override
                public void process(Integer integer, Context context, Iterable<testWM> elements, Collector<String> out) throws Exception {out.collect("我关窗啦");
                }
            });
    resultDS.print();
    env.execute();}

}
@Data
@AllArgsConstructor
class testWM{

private int id;
private int num;
private long ts;

}
代码二: 在 Map 后设置 WaterMark
public class WMTest {

public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    DataStreamSource<String> source = env.socketTextStream("localhost", 9999);
    // TODO: 2021/12/1 设置 map 并行度为 2
    SingleOutputStreamOperator<testWM> mapDS = source.map(r -> {String[] split = r.split(",");
        return new testWM(Integer.parseInt(split[0]), Integer.parseInt(split[1]),Long.parseLong(split[2]));
    }).setParallelism(2);
    // TODO: 2021/12/1  在 map 后增加 watermark
    SingleOutputStreamOperator<testWM> mapWithWM = mapDS.assignTimestampsAndWatermarks(WatermarkStrategy
            .<testWM>forMonotonousTimestamps()
            .withTimestampAssigner(new SerializableTimestampAssigner<testWM>() {
                @Override
                public long extractTimestamp(testWM element, long recordTimestamp) {return element.getTs() * 1000;
                }
            }));
    SingleOutputStreamOperator<String> resultDS = mapWithWM.keyBy(r -> r.getId())
            .window(TumblingEventTimeWindows.of(Time.seconds(5)))
            .process(new ProcessWindowFunction<testWM, String, Integer, TimeWindow>() {
                @Override
                public void process(Integer integer, Context context, Iterable<testWM> elements, Collector<String> out) throws Exception {out.collect("我关窗啦");
                }
            });
    resultDS.print();
    env.execute();}

}
@Data
@AllArgsConstructor
class testWM{

private int id;
private int num;
private long ts;

}
运行后果:
对于第一种, 在 source 后增加 watermark 的后果如下:

当 1,1,1 这条数据进入时, 开启了 [0,5) 这个窗口, 当 1,1,9 这条数据进入时,watermark 升至 9000(疏忽 watermark 的减 1). 窗口敞开, 没有问题

对于第二种, 在 map 后增加 watermark 的后果如下:

能够很显著的发现, 当第一条 1,1,9 进入时,[0,5)这个窗口并没有敞开. 直到第二条 1,1,9 进入时, 窗口才被敞开, 这是为什么?

我针对以上两种状况画了下图来了解.

图一. 图二描述了 source 之后设置 watermark 的场景, 一般来说, 这是咱们生产中须要的

WaterMark 以播送的模式向上游发送, 大数据培训并且如果同时接管上游多条并行度的 WaterMark 时, 以小的为准

这就导致图三 (Map 后设置 WaterMark) 中, 我须要发送两条足够 [0,5) 这个窗口敞开的数据, 能力真正敞开窗口, 因为数据要通过轮询能力达到每个并行度。

拓展:
在 KafkaSource 中, 曾经做了很好得优化, 在生产中咱们个别设置并行度与 topic 分区数雷同

如果设置得并行度比 topic 分区数多, 那必然有并行度生产不到数据, 就会导致 WaterMark 始终放弃在 Long.min_value.

当这种 WaterMark 向上游播送之后, 会导致所有失常并行度的窗口全副无奈敞开, 因为 WaterMark 取了各个并行度的最小值

然而当这种状态放弃一段时间之后, 程序会在计算 WaterMark 的时候, 主动过滤掉迟迟没有数据的并行度传进来的 WaterMark, 这就是 KafkaSource 的优化.

退出移动版