关于flink:透过窗口看无限数据流Flink的Window全面解析

39次阅读

共计 27500 个字符,预计需要花费 69 分钟才能阅读完成。

  • 欢送关注我的公众号:大数据技术与数仓
  • 收费支付百 G 大数据资料

窗口是流式计算中十分罕用的算子之一,通过窗口能够将有限流切分成无限流,而后在每个窗口之上应用计算函数,能够实现非常灵活的操作。Flink 提供了丰盛的窗口操作,除此之外,用户还能够依据本人的解决场景自定义窗口。通过本文,你能够理解到:

  • 窗口的基本概念和简略应用
  • 内置 Window Assigners 的分类、源码及应用
  • Window Function 的分类及应用
  • 窗口的组成部分及生命周期源码解读
  • 残缺的窗口应用 Demo 案例

Quick Start

是什么

Window(窗口)是解决无界流的外围算子,Window 能够将数据流分为固定大小的 ” 桶(buckets)”(即通过依照固定工夫或长度将数据流切分成不同的窗口),在每一个窗口上,用户能够应用一些计算函数对窗口内的数据进行解决,从而失去肯定工夫范畴内的统计后果。比方统计每隔 5 分钟输入最近一小时内点击量最多的前 N 个商品,这样就能够应用一个小时的工夫窗口将数据限定在固定工夫范畴内,而后能够对该范畴内的有界数据执行聚合解决。

依据作用的数据流 (DataStream、KeyedStream),Window 能够分为两种:Keyed WindowsNon-Keyed Windows。其中 Keyed Windows 是在 KeyedStream 上应用 window(…)操作,产生一个 WindowedStream。Non-Keyed Windows 是在 DataStream 上应用 windowAll(…)操作,产生一个 AllWindowedStream。具体的转换关系如下图所示。留神:个别不举荐应用AllWindowedStream,因为在一般流上进行窗口操作,会将所有分区的流都会集到单个的 Task 中,即并行度为 1,从而会影响性能。

如何用

下面咱们介绍了什么是窗口,那么该如何应用窗口呢? 具体如上面的代码片段:

Keyed Windows

stream
       .keyBy(...)               // keyedStream 上应用 window
       .window(...)              // 必选: 指定窗口分配器(window assigner)
      [.trigger(...)]            // 可选: 指定触发器(trigger), 如果不指定,则应用默认值
      [.evictor(...)]            // 可选: 指定清除器(evictor), 如果不指定,则没有
      [.allowedLateness(...)]    // 可选: 指定是否提早解决数据,如果不指定,默认应用 0 
      [.sideOutputLateData(...)] // 可选: 配置 side output,如果不指定,则没有
       .reduce/aggregate/fold/apply() // 必选: 指定窗口计算函数
      [.getSideOutput(...)]      // 可选: 从 side output 中获取数据

Non-Keyed Windows

stream
       .windowAll(...)           // 必选: 指定窗口分配器(window assigner)
      [.trigger(...)]            // 可选: 指定触发器(trigger), 如果不指定,则应用默认值
      [.evictor(...)]            // 可选: 指定清除器(evictor), 如果不指定,则没有
      [.allowedLateness(...)]    // 可选: 指定是否提早解决数据,如果不指定,默认应用 0
      [.sideOutputLateData(...)] // 可选: 配置 side output,如果不指定,则没有
       .reduce/aggregate/fold/apply() // 必选: 指定窗口计算函数
      [.getSideOutput(...)]      // 可选: 从 side output 中获取数据

简写 window 操作

下面的代码片段中,要在 keyedStream 上应用 window(…)或者在 DataStream 上应用 windowAll(…),须要传入一个 window assigner 的参数,对于 window assigner 下文会进行具体解释。如上面代码片段:

// -------------------------------------------
//  Keyed Windows
// -------------------------------------------
stream
       .keyBy(id)               
       .window(TumblingEventTimeWindows.of(Time.seconds(5))) // 5S 的滚动窗口
       .reduce(MyReduceFunction)
// -------------------------------------------
//  Non-Keyed Windows
// -------------------------------------------
stream               
       .windowAll(TumblingEventTimeWindows.of(Time.seconds(5))) // 5S 的滚动窗口
       .reduce(MyReduceFunction)
    

下面的代码能够简写为:

// -------------------------------------------
//  Keyed Windows
// -------------------------------------------
stream
       .keyBy(id)               
       .timeWindow(Time.seconds(5)) // 5S 的滚动窗口
       .reduce(MyReduceFunction)
// -------------------------------------------
//  Non-Keyed Windows
// -------------------------------------------
stream               
       .timeWindowAll(Time.seconds(5)) // 5S 的滚动窗口
       .reduce(MyReduceFunction)
    

对于下面的简写,以 KeyedStream 为例,对于看一下具体的 KeyedStream 源码片段,能够看出底层调用的还是非简写时的代码。对于 timeWindowAll()的代码也是一样的,能够参考 DataStream 源码,这里不再赘述。

// 会依据用户的应用的工夫类型,调用不同的内置 window Assigner
public WindowedStream<T, KEY, TimeWindow> timeWindow(Time size) {if (environment.getStreamTimeCharacteristic() == TimeCharacteristic.ProcessingTime) {return window(TumblingProcessingTimeWindows.of(size));
        } else {return window(TumblingEventTimeWindows.of(size));
        }
    }

Window Assigners

分类

WindowAssigner 负责将输出的数据调配到一个或多个窗口,Flink 内置了许多 WindowAssigner,这些 WindowAssigner 能够满足大部分的应用场景。比方tumbling windows, sliding windows, session windows , global windows。如果这些内置的 WindowAssigner 不能满足你的需要,能够通过继承 WindowAssigner 类实现自定义的 WindowAssigner。

下面的 WindowAssigner 是基于工夫的(time-based windows),除此之外,Flink 还提供了基于数量的窗口(count-based windows), 即依据窗口的元素数量定义窗口大小,这种状况下,如果数据存在乱序,将导致窗口计算结果不确定。本文重点介绍基于工夫的窗口应用,因为篇幅无限,对于基于数量的窗口将不做探讨。

应用介绍

上面将会对 Flink 内置的四种基于工夫的 windowassigner,进行一一剖析。

Tumbling Windows

  • 图解

Tumbling Windows(滚动窗口)是将数据调配到确定的窗口中,依据固定工夫或大小进行切分,每个窗口有固定的大小且窗口之间不存在重叠(如下图所示)。这种比较简单,实用于依照周期统计某一指标的场景。

对于工夫的抉择,能够应用 Event Time 或者 Processing Time,别离对应的 window assigner 为:TumblingEventTimeWindows、TumblingProcessingTimeWindows。用户能够应用 window assigner 的 of(size)办法指定工夫距离,其中工夫单位能够是 Time.milliseconds(x)、Time.seconds(x)或 Time.minutes(x)等。

  • 应用
// 应用 EventTime
datastream
           .keyBy(id)
           .window(TumblingEventTimeWindows.of(Time.seconds(10)))
           .process(new MyProcessFunction())
// 应用 processing-time
datastream
           .keyBy(id)
           .window(TumblingProcessingTimeWindows.of(Time.seconds(10)))
           .process(new MyProcessFunction())

Sliding Windows

  • 图解

Sliding Windows(滑动窗口)在滚动窗口之上加了一个滑动窗口的工夫,这种类型的窗口是会存在窗口重叠的(如下图所示)。滚动窗口是依照窗口固定的工夫大小向前滚动,而滑动窗口是依据设定的滑动工夫向前滑动。窗口之间的重叠局部的大小取决于窗口大小与滑动的工夫大小,当滑动工夫小于窗口工夫大小时便会呈现重叠。当滑动工夫大于窗口工夫大小时,会呈现窗口不间断的状况,导致数据可能不属于任何一个窗口。当两者相等时,其性能就和滚动窗口雷同了。滑动窗口的应用场景是:用户依据设定的统计周期来计算指定窗口工夫大小的指标,比方每隔 5 分钟输入最近一小时内点击量最多的前 N 个商品。

对于工夫的抉择,能够应用 Event Time 或者 Processing Time,别离对应的 window assigner 为:SlidingEventTimeWindows、SlidingProcessingTimeWindows。用户能够应用 window assigner 的 of(size)办法指定工夫距离,其中工夫单位能够是 Time.milliseconds(x)、Time.seconds(x)或 Time.minutes(x)等。

  • 应用
// 应用 EventTime
datastream
           .keyBy(id)
           .window(SlidingEventTimeWindows.of(Time.seconds(10), Time.seconds(5)))
           .process(new MyProcessFunction())
// 应用 processing-time
datastream
           .keyBy(id)
           .window(SlidingProcessingTimeWindows.of(Time.seconds(10), Time.seconds(5)))
           .process(new MyProcessFunction())

Session Windows

  • 图解

Session Windows(会话窗口)次要是将某段时间内活跃度较高的数据聚合成一个窗口进行计算,窗口的触发的条件是 Session Gap,是指在规定的工夫内如果没有数据沉闷接入,则认为窗口完结,而后触发窗口计算结果。须要留神的是如果数据始终不间断地进入窗口,也会导致窗口始终不触发的状况。与滑动窗口、滚动窗口不同的是,Session Windows 不须要有固定窗口大小 (window size) 和滑动工夫(slide time),只须要定义 session gap,来规定不沉闷数据的工夫下限即可。如下图所示。Session Windows 窗口类型比拟适宜非连续型数据处理或周期性产生数据的场景,依据用户在线上某段时间内的活跃度对用户行为数据进行统计。

对于工夫的抉择,能够应用 Event Time 或者 Processing Time,别离对应的 window assigner 为:EventTimeSessionWindows 和 ProcessTimeSessionWindows。用户能够应用 window assigner 的 withGap()办法指定工夫距离,其中工夫单位能够是 Time.milliseconds(x)、Time.seconds(x)或 Time.minutes(x)等。

  • 应用
// 应用 EventTime
datastream
           .keyBy(id)
           .window((EventTimeSessionWindows.withGap(Time.minutes(15)))
           .process(new MyProcessFunction())
// 应用 processing-time
datastream
           .keyBy(id)
           .window(ProcessingTimeSessionWindows.withGap(Time.minutes(15)))
           .process(new MyProcessFunction())

留神:因为 session window 的开始工夫与完结工夫取决于接管的数据。windowassigner 不会立刻调配所有的元素到正确的窗口,SessionWindow 会为每个接管的元素初始化一个以该元素的工夫戳为开始工夫的窗口,应用 session gap 作为窗口大小,而后再合并重叠局部的窗口。所以,session window 操作须要指定用于合并的 Trigger 和 Window Function,比方ReduceFunction, AggregateFunction, or ProcessWindowFunction

Global Windows

  • 图解

Global Windows(全局窗口)将所有雷同的 key 的数据调配到单个窗口中计算结果,窗口没有起始和完结工夫,窗口须要借助于 Triger 来触发计算,如果不对 Global Windows 指定 Triger,窗口是不会触发计算的。因而,应用 Global Windows 须要十分谨慎,用户须要十分明确本人在整个窗口中统计出的后果是什么,并指定对应的触发器,同时还须要有指定相应的数据清理机制,否则数据将始终留在内存中。

  • 应用
datastream
    .keyBy(id)
    .window(GlobalWindows.create())
    .process(new MyProcessFunction())

Window Functions

分类

Flink 提供了两大类窗口函数,别离为增量聚合函数和全量窗口函数。其中增量聚合函数的性能要比全量窗口函数高,因为增量聚合窗口是基于两头后果状态计算最终后果的,即窗口中只保护一个两头后果状态,不要缓存所有的窗口数据。相同,对于全量窗口函数而言,须要对所以进入该窗口的数据进行缓存,等到窗口触发时才会遍历窗口内所有数据,进行后果计算。如果窗口数据量比拟大或者窗口工夫较长,就会消耗很多的资源缓存数据,从而导致性能降落。

  • 增量聚合函数

    包含:ReduceFunction、AggregateFunction 和 FoldFunction

  • 全量窗口函数

    包含:ProcessWindowFunction

应用介绍

ReduceFunction

输出两个雷同类型的数据元素依照指定的计算方法进行聚合,而后输入类型雷同的一个后果元素。要求输出元素的数据类型与输入元素的数据类型必须统一。实现的成果是应用上一次的后果值与以后值进行聚合。具体应用案例如下:

public class ReduceFunctionExample {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment().setParallelism(1);
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

        // 模仿数据源
        SingleOutputStreamOperator<Tuple3<Long, Integer, Long>> input = env.fromElements(Tuple3.of(1L, 10, 1588491228L),
                Tuple3.of(1L, 15, 1588491229L),
                Tuple3.of(1L, 20, 1588491238L),
                Tuple3.of(1L, 25, 1588491248L),
                Tuple3.of(2L, 10, 1588491258L),
                Tuple3.of(2L, 30, 1588491268L),
                Tuple3.of(2L, 20, 1588491278L)).assignTimestampsAndWatermarks(new AscendingTimestampExtractor<Tuple3<Long, Integer, Long>>() {
            @Override
            public long extractAscendingTimestamp(Tuple3<Long, Integer, Long> element) {return element.f2 * 1000;}
        });

        input
                .map(new MapFunction<Tuple3<Long, Integer, Long>, Tuple2<Long, Integer>>() {
                    @Override
                    public Tuple2<Long, Integer> map(Tuple3<Long, Integer, Long> value) {return Tuple2.of(value.f0, value.f1);
                    }
                })
                .keyBy(0)
                .window(TumblingEventTimeWindows.of(Time.seconds(10)))
                .reduce(new ReduceFunction<Tuple2<Long, Integer>>() {
                    @Override
                    public Tuple2<Long, Integer> reduce(Tuple2<Long, Integer> value1, Tuple2<Long, Integer> value2) throws Exception {
                        // 依据第一个元素分组,求第二个元素的累计和
                        return Tuple2.of(value1.f0, value1.f1 + value2.f1);
                    }
                }).print();

        env.execute("ReduceFunctionExample");
    }
}

AggregateFunction

与 ReduceFunction 类似,AggregateFunction 也是基于中间状态计算结果的增量计算函数,相比 ReduceFunction,AggregateFunction 在窗口计算上更加灵便,然而实现略微简单,须要实现 AggregateFunction 接口,重写四个办法。其最大的劣势就是两头后果的数据类型和最终的后果类型不依赖于输出的数据类型。对于 AggregateFunction 的源码,如下所示:

/** 
*  @param <IN>  输出元素的数据类型
 * @param <ACC> 两头聚合后果的数据类型
 * @param <OUT> 最终聚合后果的数据类型
 */
@PublicEvolving
public interface AggregateFunction<IN, ACC, OUT> extends Function, Serializable {

    /**
     * 创立一个新的累加器
     */
    ACC createAccumulator();

    /**
     * 将新的数据与累加器进行聚合,返回一个新的累加器
     */
    ACC add(IN value, ACC accumulator);

    /**
     从累加器中计算最终后果并返回
     */
    OUT getResult(ACC accumulator);

    /**
     * 合并两个累加器并返回后果
     */
    ACC merge(ACC a, ACC b);
}

具体应用代码案例如下:

public class AggregateFunctionExample {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment().setParallelism(1);
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

        // 模仿数据源
        SingleOutputStreamOperator<Tuple3<Long, Integer, Long>> input = env.fromElements(Tuple3.of(1L, 10, 1588491228L),
                Tuple3.of(1L, 15, 1588491229L),
                Tuple3.of(1L, 20, 1588491238L),
                Tuple3.of(1L, 25, 1588491248L),
                Tuple3.of(2L, 10, 1588491258L),
                Tuple3.of(2L, 30, 1588491268L),
                Tuple3.of(2L, 20, 1588491278L)).assignTimestampsAndWatermarks(new AscendingTimestampExtractor<Tuple3<Long, Integer, Long>>() {
            @Override
            public long extractAscendingTimestamp(Tuple3<Long, Integer, Long> element) {return element.f2 * 1000;}
        });

        input.keyBy(0)
             .window(TumblingEventTimeWindows.of(Time.seconds(10)))
             .aggregate(new MyAggregateFunction()).print();
        env.execute("AggregateFunctionExample");

    }

    private static class MyAggregateFunction implements AggregateFunction<Tuple3<Long, Integer, Long>,Tuple2<Long,Integer>,Tuple2<Long,Integer>> {
        /**
         * 创立一个累加器, 初始化值
         * @return
         */
        @Override
        public Tuple2<Long, Integer> createAccumulator() {return Tuple2.of(0L,0);
        }

        /**
         *
         * @param value 输出的元素值
         * @param accumulator 两头后果值
         * @return
         */
        @Override
        public Tuple2<Long, Integer> add(Tuple3<Long, Integer, Long> value, Tuple2<Long, Integer> accumulator) {return Tuple2.of(value.f0,value.f1 + accumulator.f1);
        }

        /**
         * 获取计算结果值
         * @param accumulator
         * @return
         */
        @Override
        public Tuple2<Long, Integer> getResult(Tuple2<Long, Integer> accumulator) {return Tuple2.of(accumulator.f0,accumulator.f1);
        }

        /**
         * 合并两头后果值
         * @param a 两头后果值 a
         * @param b 两头后果值 b
         * @return
         */
        @Override
        public Tuple2<Long, Integer> merge(Tuple2<Long, Integer> a, Tuple2<Long, Integer> b) {return Tuple2.of(a.f0,a.f1 + b.f1);
        }
    }
}

FoldFunction

FoldFunction 定义了如何将窗口中的输出元素与内部的元素合并的逻辑, 该接口已标记过期,倡议用户应用 AggregateFunction 来替换应用 FoldFunction。

public class FoldFunctionExample {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment().setParallelism(1);
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

        // 模仿数据源
        SingleOutputStreamOperator<Tuple3<Long, Integer, Long>> input = env.fromElements(Tuple3.of(1L, 10, 1588491228L),
                Tuple3.of(1L, 15, 1588491229L),
                Tuple3.of(1L, 20, 1588491238L),
                Tuple3.of(1L, 25, 1588491248L),
                Tuple3.of(2L, 10, 1588491258L),
                Tuple3.of(2L, 30, 1588491268L),
                Tuple3.of(2L, 20, 1588491278L)).assignTimestampsAndWatermarks(new AscendingTimestampExtractor<Tuple3<Long, Integer, Long>>() {
            @Override
            public long extractAscendingTimestamp(Tuple3<Long, Integer, Long> element) {return element.f2 * 1000;}
        });

        input.keyBy(0)
             .window(TumblingEventTimeWindows.of(Time.seconds(10)))
             .fold("用户",new FoldFunction<Tuple3<Long, Integer, Long>,String>() {
                 @Override
                 public String fold(String accumulator, Tuple3<Long, Integer, Long> value) throws Exception {
                    // 为第一个元素的值拼接一个 "用户" 字符串, 进行输入
                     return accumulator + value.f0 ;
                 }
             }).print();

        env.execute("FoldFunctionExample");

    }
}

ProcessWindowFunction

后面提到的 ReduceFunction 和 AggregateFunction 都是基于中间状态实现增量计算的窗口函数。有些时候须要应用整个窗口的所有数据进行计算,比方求中位数和众数。另外,ProcessWindowFunction 的 Context 对象能够拜访窗口的一些元数据信息,比方窗口完结工夫、水位线等。ProcessWindowsFunction 可能更加灵便地反对基于窗口全副数据元素的后果计算。

在零碎外部,由 ProcessWindowFunction 解决的窗口会将所有已调配的数据存储到 ListState 中,通过将数据收集起来且提供对于窗口的元数据及其他一些个性的拜访和应用,利用场景比 ReduceFunction 和 AggregateFunction 更加宽泛。对于 ProcessWindowFunction 抽象类的源码,如下所示:

/**
 * @param <IN> 输出的数据类型
 * @param <OUT> 输入的数据类型
 * @param <KEY> key 的数据类型
 * @param <W> window 的类型
 */
@PublicEvolving
public abstract class ProcessWindowFunction<IN, OUT, KEY, W extends Window> extends AbstractRichFunction {
    private static final long serialVersionUID = 1L;
    /**
     * 计算窗口数据,输入 0 个或多个元素
     * @param key 窗口的 key
     * @param context 窗口的上下文
     * @param elements 窗口内的所有元素
     * @param out 输入元素的 collector 对象
     * @throws Exception
     */
    public abstract void process(KEY key, Context context, Iterable<IN> elements, Collector<OUT> out) throws Exception;
    /**
     * 当窗口被销毁时,删除状态
     * @param context
     * @throws Exception
     */
    public void clear(Context context) throws Exception {}
    //context 能够拜访窗口的元数据信息.
    public abstract class Context implements java.io.Serializable {
    // 返回以后被计算的窗口
        public abstract W window();
    // 返回以后 processing time. 
        public abstract long currentProcessingTime();
    // 返回以后 event-time 水位线.
        public abstract long currentWatermark();
    // 每个 key 和每个 window 的状态拜访器
        public abstract KeyedStateStore windowState();
    // 每个 key 的 global state 的状态拜访器.
        public abstract KeyedStateStore globalState();
        /**
         * 向 side output 输入数据
         * @param outputTag the {@code OutputTag}  side output 输入的标识.
         * @param value 输入的数据.
         */
        public abstract <X> void output(OutputTag<X> outputTag, X value);
    }
}

具体的应用案例如下:

public class ProcessWindowFunctionExample {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment().setParallelism(1);
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

        // 模仿数据源
        SingleOutputStreamOperator<Tuple3<Long, Integer, Long>> input = env.fromElements(Tuple3.of(1L, 10, 1588491228L),
                Tuple3.of(1L, 15, 1588491229L),
                Tuple3.of(1L, 20, 1588491238L),
                Tuple3.of(1L, 25, 1588491248L),
                Tuple3.of(2L, 10, 1588491258L),
                Tuple3.of(2L, 30, 1588491268L),
                Tuple3.of(2L, 20, 1588491278L)).assignTimestampsAndWatermarks(new AscendingTimestampExtractor<Tuple3<Long, Integer, Long>>() {
            @Override
            public long extractAscendingTimestamp(Tuple3<Long, Integer, Long> element) {return element.f2 * 1000;}
        });

        input.keyBy(t -> t.f0)
             .window(TumblingEventTimeWindows.of(Time.seconds(10)))
             .process(new MyProcessWindowFunction())
             .print();}

    private static class MyProcessWindowFunction extends ProcessWindowFunction<Tuple3<Long, Integer, Long>,Tuple3<Long,String,Integer>,Long,TimeWindow> {
        @Override
        public void process(
                Long aLong,
                Context context,
                Iterable<Tuple3<Long, Integer, Long>> elements,
                Collector<Tuple3<Long, String, Integer>> out) throws Exception {
            int count = 0;
            for (Tuple3<Long, Integer, Long> in: elements) {count++;}
            // 统计每个窗口数据个数,加上窗口输入
            out.collect(Tuple3.of(aLong,"" + context.window(),count));
        }
    }
}

增量聚合函数和 ProcessWindowFunction 整合

ProcessWindowFunction 提供了很弱小的性能,然而惟一的毛病就是须要更大的状态存储数据。在很多时候,增量聚合的应用是十分频繁的,那么如何实现既反对增量聚合又反对拜访窗口元数据的操作呢?能够将 ReduceFunction 和 AggregateFunction 与 ProcessWindowFunction 整合在一起应用。通过这种组合形式,调配给窗口的元素会立刻被执行计算,当窗口触发时,会把聚合的后果传给 ProcessWindowFunction,这样 ProcessWindowFunction 的 process 办法的 Iterable 参数被就只有一个值,即增量聚合的后果。

  • ReduceFunction 与 ProcessWindowFunction 组合
public class ReduceProcessWindowFunction {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment().setParallelism(1);
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

        // 模仿数据源
        SingleOutputStreamOperator<Tuple3<Long, Integer, Long>> input = env.fromElements(Tuple3.of(1L, 10, 1588491228L),
                Tuple3.of(1L, 15, 1588491229L),
                Tuple3.of(1L, 20, 1588491238L),
                Tuple3.of(1L, 25, 1588491248L),
                Tuple3.of(2L, 10, 1588491258L),
                Tuple3.of(2L, 30, 1588491268L),
                Tuple3.of(2L, 20, 1588491278L)).assignTimestampsAndWatermarks(new AscendingTimestampExtractor<Tuple3<Long, Integer, Long>>() {
            @Override
            public long extractAscendingTimestamp(Tuple3<Long, Integer, Long> element) {return element.f2 * 1000;}
        });

        input.map(new MapFunction<Tuple3<Long, Integer, Long>, Tuple2<Long, Integer>>() {
            @Override
            public Tuple2<Long, Integer> map(Tuple3<Long, Integer, Long> value) {return Tuple2.of(value.f0, value.f1);
            }
        })
             .keyBy(t -> t.f0)
             .window(TumblingEventTimeWindows.of(Time.seconds(10)))
             .reduce(new MyReduceFunction(),new MyProcessWindowFunction())
             .print();

        env.execute("ProcessWindowFunctionExample");
    }

    private static class MyReduceFunction implements ReduceFunction<Tuple2<Long, Integer>> {
        @Override
        public Tuple2<Long, Integer> reduce(Tuple2<Long, Integer> value1, Tuple2<Long, Integer> value2) throws Exception {
            // 增量求和
            return Tuple2.of(value1.f0,value1.f1 + value2.f1);
        }
    }

    private static class MyProcessWindowFunction extends ProcessWindowFunction<Tuple2<Long,Integer>,Tuple3<Long,Integer,String>,Long,TimeWindow> {
        @Override
        public void process(Long aLong, Context ctx, Iterable<Tuple2<Long, Integer>> elements, Collector<Tuple3<Long, Integer, String>> out) throws Exception {
            // 将求和之后的后果附带窗口完结工夫一起输入
            out.collect(Tuple3.of(aLong,elements.iterator().next().f1,"window_end" + ctx.window().getEnd()));
        }
    }
}
  • AggregateFunction 与 ProcessWindowFunction 组合
public class AggregateProcessWindowFunction {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment().setParallelism(1);
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

        // 模仿数据源
        SingleOutputStreamOperator<Tuple3<Long, Integer, Long>> input = env.fromElements(Tuple3.of(1L, 10, 1588491228L),
                Tuple3.of(1L, 15, 1588491229L),
                Tuple3.of(1L, 20, 1588491238L),
                Tuple3.of(1L, 25, 1588491248L),
                Tuple3.of(2L, 10, 1588491258L),
                Tuple3.of(2L, 30, 1588491268L),
                Tuple3.of(2L, 20, 1588491278L))
                .assignTimestampsAndWatermarks(new AscendingTimestampExtractor<Tuple3<Long, Integer, Long>>() {
                    @Override
                    public long extractAscendingTimestamp(Tuple3<Long, Integer, Long> element) {return element.f2 * 1000;}
                });

        input.keyBy(t -> t.f0)
                .window(TumblingEventTimeWindows.of(Time.seconds(10)))
                .aggregate(new MyAggregateFunction(),new MyProcessWindowFunction())
                .print();

        env.execute("AggregateFunctionExample");

    }

    private static class MyAggregateFunction implements AggregateFunction<Tuple3<Long, Integer, Long>, Tuple2<Long, Integer>, Tuple2<Long, Integer>> {
        /**
         * 创立一个累加器, 初始化值
         *
         * @return
         */
        @Override
        public Tuple2<Long, Integer> createAccumulator() {return Tuple2.of(0L, 0);
        }

        /**
         * @param value       输出的元素值
         * @param accumulator 两头后果值
         * @return
         */
        @Override
        public Tuple2<Long, Integer> add(Tuple3<Long, Integer, Long> value, Tuple2<Long, Integer> accumulator) {return Tuple2.of(value.f0, value.f1 + accumulator.f1);
        }

        /**
         * 获取计算结果值
         *
         * @param accumulator
         * @return
         */
        @Override
        public Tuple2<Long, Integer> getResult(Tuple2<Long, Integer> accumulator) {return Tuple2.of(accumulator.f0, accumulator.f1);
        }

        /**
         * 合并两头后果值
         *
         * @param a 两头后果值 a
         * @param b 两头后果值 b
         * @return
         */
        @Override
        public Tuple2<Long, Integer> merge(Tuple2<Long, Integer> a, Tuple2<Long, Integer> b) {return Tuple2.of(a.f0, a.f1 + b.f1);
        }
    }

    private static class MyProcessWindowFunction extends ProcessWindowFunction<Tuple2<Long,Integer>,Tuple3<Long,Integer,String>,Long,TimeWindow> {
        @Override
        public void process(Long aLong, Context ctx, Iterable<Tuple2<Long, Integer>> elements, Collector<Tuple3<Long, Integer, String>> out) throws Exception {
            // 将求和之后的后果附带窗口完结工夫一起输入
            out.collect(Tuple3.of(aLong,elements.iterator().next().f1,"window_end" + ctx.window().getEnd()));
        }
    }
}

window 生命周期解读

生命周期图解

窗口从创立到执行窗口计算再到被革除,须要通过一系列的过程,这个过程就是窗口的生命周期。

首先,当一个元素进入窗口算子之前,会由 WindowAssigner 调配该元素进入哪个或哪几个窗口,如果窗口不存在,则创立窗口。

其次,数据进入了窗口,这时要看有没有应用增量聚合函数,如果应用了增量聚合函数 ReduceFunction 或 AggregateFunction,新退出窗口的元素会立刻触发增量计算,计算的后果作为窗口的内容。如果没有应用增量聚合函数,则会将进入窗口的数据存储到 ListState 状态中,进一步期待窗口触发时,遍历窗口元素进行聚合计算。

而后,每个元素在进入窗口之后会传递至该窗口的触发器,触发器决定了窗口何时被执行计算及何时须要革除本身和保留的内容。触发器能够依据已调配的元素或注册的计时器来决定某些特定时刻执行窗口计算或革除窗口内容。

最初,触发器胜利触发之后的操作取决于应用的窗口函数,如果应用的是增量聚合函数,如 ReduceFunction 或 AggregateFunction,则会间接输入聚合的后果。如果只蕴含一个全量窗口函数,如 ProcessWindowFunction,则会作用窗口的所有元素,执行计算,输入后果。如果组合应用了 ReduceFunction 和 ProcessWindowFunction,即组合应用了增量聚合窗口函数和全量窗口函数,全量窗口函数会作用于增量聚合函数的聚合值,而后再输入最终的后果。

  • 状况 1:仅应用增量聚合窗口函数

  • 状况 2:仅应用全量窗口函数

  • 状况 3:组合应用增量聚合窗口函数与全量窗口函数

分配器(Window Assigners)

WindowAssigner 的作用是将输出的元素调配到一个或多个窗口,当 WindowAssigner 将第一个元素调配到窗口时,就会创立该窗口,所以一个窗口一旦被创立,窗口中必然至多有一个元素。Flink 内置了很多 WindowAssigners, 本文次要探讨基于工夫的 WindowAssigners,这些分配器都继承了 WindowAssigner 抽象类。对于罕用的分配器,上文曾经做了具体解释。上面先来看一下继承关系图:

接下来,将会对 WindowAssigner 抽象类的源码进行剖析,具体代码如下:

/**

  • WindowAssigner 调配一个元素到 0 个或多个窗口
  • 在一个窗口算子外部,元素是依照 key 进行分组的(应用 KeyedStream),
  • 雷同 key 和 window 的元素汇合称之为一个 pane(格子)
  • @param <T> 要调配元素的数据类型
  • @param <W> window 的类型:TimeWindow、GlobalWindow

*/
@PublicEvolving
public abstract class WindowAssigner<T, W extends Window> implements Serializable {

private static final long serialVersionUID = 1L;
/**
 * 返回一个向其调配元素的窗口汇合
 * @param element 待调配的元素
 * @param timestamp 元素的工夫戳
 * @param context WindowAssignerContext 对象
 * @return
 */
public abstract Collection<W> assignWindows(T element, long timestamp, WindowAssignerContext context);
/**
 * 返回一个与该 WindowAssigner 相干的默认 trigger(触发器)
 * @param env 执行环境
 * @return
 */
public abstract Trigger<T, W> getDefaultTrigger(StreamExecutionEnvironment env);

/**
 * 返回一个窗口序列化器
 * @param executionConfig
 * @return
 */
public abstract TypeSerializer<W> getWindowSerializer(ExecutionConfig executionConfig);
/**
 * 如果元素是基于 event time 调配到窗口的,则返回 true
 * @return
 */
public abstract boolean isEventTime();
/**
 * 该 Context 容许拜访以后的解决工夫 processing time
 */
public abstract static class WindowAssignerContext {

    /**
     * 返回以后的解决工夫
     */
    public abstract long getCurrentProcessingTime();}

}

### 触发器(Triggers)

数据接入窗口后,窗口是否触发 WindowFunciton 计算,取决于窗口是否满足触发条件。Triggers 就是决定窗口何时触发计算并输入后果的条件,Triggers 能够依据工夫或者具体的数据条件进行触发,比方进入窗口元素的个数或者进入窗口的某些特定的元素值等。后面探讨的内置 WindowAssigner 都有各自默认的触发器,当应用的是 Processing Time 时,则当解决工夫超过窗口完结工夫时会被触发。当应用 Event Time 时,当水位线超过窗口完结工夫时会被触发。Flink 在外部提供很多内置的触发器,罕用的次要有 EventTimeTrigger、ProcessTimeTrigger 以及 CountTrigger 等。每种每种触发器都对应于不同的 Window Assigner,例如 Event Time 类型的 Windows 对应的触发器是 EventTimeTrigger,其基本原理是判断以后的 Watermark 是否超过窗口的 EndTime,如果超过则触发对窗口内数据的计算,反之不触发计算。对于下面剖析的内置 WindowAssigner 的默认 trigger,能够从各自的源码中看到,具体列举如下:| 分配器 | 对应的源码 | 默认触发器 |
| --- | --- | --- |
| TumblingEventTimeWindows | public Trigger <object, timewindow=""style="font-size: inherit; color: inherit; line-height: inherit; margin: 0px; padding: 0px;">getDefaultTrigger(StreamExecutionEnvironment env) {return EventTimeTrigger.create(); }</object,> | EventTimeTrigger |
| TumblingProcessingTimeWindows | public Trigger <object, timewindow=""style="font-size: inherit; color: inherit; line-height: inherit; margin: 0px; padding: 0px;">getDefaultTrigger(StreamExecutionEnvironment env) {return ProcessingTimeTrigger.create(); }</object,> | ProcessingTimeTrigger |
| SlidingEventTimeWindows | public Trigger <object, timewindow=""style="font-size: inherit; color: inherit; line-height: inherit; margin: 0px; padding: 0px;">getDefaultTrigger(StreamExecutionEnvironment env) {return EventTimeTrigger.create(); }</object,> | EventTimeTrigger |
| SlidingProcessingTimeWindows | public Trigger <object, timewindow=""style="font-size: inherit; color: inherit; line-height: inherit; margin: 0px; padding: 0px;">getDefaultTrigger(StreamExecutionEnvironment env) {return ProcessingTimeTrigger.create(); }</object,> | ProcessingTimeTrigger |
| EventTimeSessionWindows | public Trigger <object, timewindow=""style="font-size: inherit; color: inherit; line-height: inherit; margin: 0px; padding: 0px;">getDefaultTrigger(StreamExecutionEnvironment env) {return EventTimeTrigger.create(); }</object,> | EventTimeTrigger |
| ProcessingTimeSessionWindows | public Trigger <object, timewindow=""style="font-size: inherit; color: inherit; line-height: inherit; margin: 0px; padding: 0px;">getDefaultTrigger(StreamExecutionEnvironment env) {return ProcessingTimeTrigger.create(); }</object,> | ProcessingTimeTrigger |
| GlobalWindows | public Trigger <object, globalwindow=""style="font-size: inherit; color: inherit; line-height: inherit; margin: 0px; padding: 0px;">getDefaultTrigger(StreamExecutionEnvironment env) {return new NeverTrigger(); }</object,> | NeverTrigger |

这些 Trigger 都继承了 Trigger 抽象类,具体的继承关系,如下图:![](https://upload-images.jianshu.io/upload_images/22116987-5c716e831febced3.png?imageMogr2/auto-orient/strip%7CimageView2/2/w/1240)


对于这些内置的 Trigger 的具体解释如下:| Trigger | 解释 |
| --- | --- |
| EventTimeTrigger | 以后的 Watermark 是否超过窗口的 EndTime,如果超过则触发对窗口内数据的计算,反之不触发计算;|
| ProcessTimeTrigger | 以后的 Processing Time 是否超过窗口的 EndTime,如果超过则触发对窗口内数据的计算,反之不触发计算;|
| ContinuousEventTimeTrigger | 依据间隔时间周期性触发窗口或者 Window 的完结工夫小于以后 EventTime,触发窗口计算;|
| ContinuousProcessingTimeTrigger | 依据间隔时间周期性触发窗口或者 Window 的完结工夫小于以后 ProcessTime,触发窗口计算;|
| CountTrigger | 依据窗口的数据条数是否超过设定的阈值确定是否触发窗口计算;|
| DeltaTrigger | 依据窗口的数据计算出来的 Delta 指标是否超过指定的阈值,判断是否触发窗口计算 |
| PurgingTrigger | 能够将任意触发器作为参数转换为 Purge 类型触发器,计算实现后数据将被清理。|

对于抽象类 Trigger 的源码解释如下:

/**

  • @param <T> 元素的数据类型
  • @param <W> Window 的类型

*/
@PublicEvolving
public abstract class Trigger<T, W extends Window> implements Serializable {

private static final long serialVersionUID = -4104633972991191369L;
/**
 * 每个元素被调配到窗口时都会调用该办法,返回一个 TriggerResult 枚举
 * 该枚举蕴含很多触发的类型:CONTINUE、FIRE_AND_PURGE、FIRE、PURGE
 *
 * @param element   进入窗口的元素
 * @param timestamp 进入窗口元素的工夫戳
 * @param window    窗口
 * @param ctx       上下文对象,能够注册计时器 (timer) 回调函数
 * @return
 * @throws Exception
 */
public abstract TriggerResult onElement(T element, long timestamp, W window, TriggerContext ctx) throws Exception;
/**
 * 当应用 TriggerContext 注册的 processing-time 计时器被触发时, 会调用该办法
 *
 * @param time   触发计时器的工夫戳
 * @param window 计时器触发的 window
 * @param ctx    上下文对象,能够注册计时器 (timer) 回调函数
 * @return
 * @throws Exception
 */
public abstract TriggerResult onProcessingTime(long time, W window, TriggerContext ctx) throws Exception;
/**
 * 当应用 TriggerContext 注册的 event-time 计时器被触发时, 会调用该办法
 *
 * @param time   触发计时器的工夫戳
 * @param window 计时器触发的 window
 * @param ctx    上下文对象,能够注册计时器 (timer) 回调函数
 * @return
 * @throws Exception
 */
public abstract TriggerResult onEventTime(long time, W window, TriggerContext ctx) throws Exception;
/**
 * 如果触发器反对合并触发器状态,将返回 true
 *
 * @return
 */
public boolean canMerge() {return false;}

/**
 * 当多个窗口被合并成一个窗口时,会调用该办法
 *
 * @param window 合并之后的 window
 * @param ctx    上下文对象,能够注册计时器回调函数,也能够拜访状态
 * @throws Exception
 */
public void onMerge(W window, OnMergeContext ctx) throws Exception {throw new UnsupportedOperationException("This trigger does not support merging.");
}
/**
 * 革除所有 Trigger 持有的窗口状态
 * 当窗口被销毁时,调用该办法
 *
 * @param window
 * @param ctx
 * @throws Exception
 */
public abstract void clear(W window, TriggerContext ctx) throws Exception;
/**
 * Context 对象,传给 Trigger 的办法参数中,用于注册计时器回调函数和解决状态
 */
public interface TriggerContext {
    // 返回以后解决工夫
    long getCurrentProcessingTime();
    MetricGroup getMetricGroup();
    // 返回以后水位线工夫戳
    long getCurrentWatermark();
    // 注册一个 processing-time 的计时器
    void registerProcessingTimeTimer(long time);
    // 注册一个 EventTime 计时器
    void registerEventTimeTimer(long time);
    //  删除一个 processing-time 的计时器
    void deleteProcessingTimeTimer(long time);
    // 删除一个 EventTime 计时器
    void deleteEventTimeTimer(long time);
    /**
     * 提取状态以后 Trigger 的窗口和 Key 的状态
     */
    <S extends State> S getPartitionedState(StateDescriptor<S, ?> stateDescriptor);

    // 与 getPartitionedState 性能雷同,该办法已被标记过期
    @Deprecated
    <S extends Serializable> ValueState<S> getKeyValueState(String name, Class<S> stateType, S defaultState);
    // 同 getPartitionedState 性能,该办法已被标记过期
    @Deprecated
    <S extends Serializable> ValueState<S> getKeyValueState(String name, TypeInformation<S> stateType, S defaultState);
}
// TriggerContext 的扩大
public interface OnMergeContext extends TriggerContext {
    // 合并每个 window 的状态,状态必须反对合并
    <S extends MergingState<?, ?>> void mergePartitionedState(StateDescriptor<S, ?> stateDescriptor);
}

}


下面的源码能够看出, 每当触发器调用时,会产生一个 TriggerResult 对象,该对象是一个枚举类,其包含的属性决定了作用在窗口上的操作是什么。总共有四种行为:CONTINUE、FIRE_AND_PURGE、FIRE、PURGE,对于每种类型的具体含意,咱们先看一下 TriggerResult 源码:

/**

  • 触发器办法的后果类型,决定在窗口上执行什么操作,比方是否调用 window function
  • 或者是否须要销毁窗口
  • 留神:如果一个 Trigger 返回的是 FIRE 或者 FIRE_AND_PURGE,然而窗口中没有任何元素,则窗口函数不会被调用

*/
public enum TriggerResult {

// 什么都不做,以后不触发计算,持续期待
CONTINUE(false, false),

// 执行 window function,输入后果,之后革除所有状态
FIRE_AND_PURGE(true, true),

// 执行 window function,输入后果,窗口不会被革除,数据持续保留
FIRE(true, false),

// 革除窗口外部数据,但不触发计算
PURGE(false, true);

}


### 清除器(Evictors)

Evictors 是一个可选的组件,其次要作用是对进入 WindowFuction 前后的数据进行革除解决。Flink 内置了三种 Evictors:别离为 CountEvictor、DeltaEvictor、TimeEvitor。如果用户不指定 Evictors,也不会有默认值。*   **CountEvictor**:放弃在窗口中具备固定数量的元素,将超过指定窗口元素数量的数据在窗口计算前剔除;*   **DeltaEvictor**:通过定义 DeltaFunction 和指定 threshold,并计算 Windows 中的元素与最新元素之间的 Delta 大小,如果超过 threshold 则将以后数据元素剔除;*   **TimeEvictor**:通过指定工夫距离,将以后窗口中最新元素的工夫减去 Interval,而后将小于该后果的数据全副剔除,其本质是将具备最新工夫的数据抉择进去,删除过期的数据。Evictors 继承关系图如下:![](https://upload-images.jianshu.io/upload_images/22116987-82253d38edd55255.png?imageMogr2/auto-orient/strip%7CimageView2/2/w/1240)


对于 Evictors 接口的源码,如下:

/**

  • 在 WindowFunction 计算之前或者之后进行革除窗口元素
  • @param <T> 元素的数据类型
  • @param <W> 窗口类型

*/
@PublicEvolving
public interface Evictor<T, W extends Window> extends Serializable {

/**
 * 选择性剔除元素,在 windowing function 之前调用
 * @param elements 窗口中的元素
 * @param size  窗口中元素个数
 * @param window 窗口
 * @param evictorContext
 */
void evictBefore(Iterable<TimestampedValue<T>> elements, int size, W window, EvictorContext evictorContext);
/**
 * 选择性剔除元素,在 windowing function 之后调用
 * @param elements 窗口中的元素.
 * @param size 窗口中元素个数.
 * @param window 窗口
 * @param evictorContext
 */
void evictAfter(Iterable<TimestampedValue<T>> elements, int size, W window, EvictorContext evictorContext);
// 传递给 Evictor 办法参数的值
interface EvictorContext {
    // 返回以后 processing time
    long getCurrentProcessingTime();
    MetricGroup getMetricGroup();
    // 返回以后的水位线工夫戳
    long getCurrentWatermark();}

}


## 小结

本文首先给出了窗口应用的疾速入门,介绍了窗口的基本概念、分类及简略应用。而后对 Flink 内置的 Window Assigner 进行了一一解读,并给出了图解与应用的代码片段。接着对 Flink 的 Window Function 进行介绍,包含窗口函数的分类及具体应用案例。最初剖析了 Window 生命周期所波及的组件,并对每个组件的源码进行剖析。

正文完
 0