Flink窗口背景

Flink认为Batch是Streaming的一个特例,因而Flink底层引擎是一个流式引擎,在下面实现了流解决和批处理。而Window就是从Streaming到Batch的桥梁。艰深讲,Window是用来对一个有限的流设置一个无限的汇合,从而在有界的数据集上进行操作的一种机制。流上的汇合由Window来划定范畴,比方“计算过来10分钟”或者“最初50个元素的和”。Window能够由工夫(Time Window)(比方每30s)或者数据(Count Window)(如每100个元素)驱动。DataStream API提供了Time和Count的Window。

一个Flink窗口利用的大抵骨架构造如下所示:

// Keyed Windowstream  .keyBy(...)               <-  依照一个Key进行分组  .window(...)              <-  将数据流中的元素调配到相应的窗口中  [.trigger(...)]            <-  指定触发器Trigger(可选)  [.evictor(...)]            <-  指定清除器Evictor(可选)    .reduce/aggregate/process()      <-  窗口处理函数Window Function// Non-Keyed Windowstream  .windowAll(...)           <-  不分组,将数据流中的所有元素调配到相应的窗口中  [.trigger(...)]            <-  指定触发器Trigger(可选)  [.evictor(...)]            <-  指定清除器Evictor(可选)    .reduce/aggregate/process()      <-  窗口处理函数Window Function

Flink窗口的骨架构造中有两个必须的两个操作:

  • 应用窗口分配器(WindowAssigner)将数据流中的元素调配到对应的窗口。
  • 当满足窗口触发条件后,对窗口内的数据应用窗口处理函数(Window Function)进行解决,罕用的Window Function有reduceaggregateprocess

滚动窗口

基于工夫驱动

将数据根据固定的窗口长度对数据进行切分,滚动窗口下窗口之间之间不重叠,且窗口长度是固定的。咱们能够用TumblingEventTimeWindowsTumblingProcessingTimeWindows创立一个基于Event Time或Processing Time的滚动工夫窗口。窗口的长度能够用org.apache.flink.streaming.api.windowing.time.Time中的secondsminuteshoursdays来设置。

//要害解决案例KeyedStream<Tuple2<String, Integer>, Tuple> keyedStream = mapStream.keyBy(0);// 基于工夫驱动,每隔10s划分一个窗口WindowedStream<Tuple2<String, Integer>, Tuple, TimeWindow> timeWindow =keyedStream.timeWindow(Time.seconds(10));// 基于事件驱动, 每相隔3个事件(即三个雷同key的数据), 划分一个窗口进行计算// WindowedStream<Tuple2<String, Integer>, Tuple, GlobalWindow> countWindow =keyedStream.countWindow(3);// apply是窗口的利用函数,即apply里的函数将利用在此窗口的数据上。timeWindow.apply(new MyTimeWindowFunction()).print();// countWindow.apply(new MyCountWindowFunction()).print();

基于事件驱动

当咱们想要每100个用户的购买行为作为驱动,那么每当窗口中填满100个”雷同”元素了,就会对窗口进行计算,很好了解,上面是一个实现案例

public class MyCountWindowFunction implements WindowFunction<Tuple2<String, Integer>,  String, Tuple, GlobalWindow> {    @Override    public void apply(Tuple tuple, GlobalWindow window, Iterable<Tuple2<String, Integer>>      input, Collector<String> out) throws Exception {      SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");      int sum = 0;      for (Tuple2<String, Integer> tuple2 : input){        sum += tuple2.f1;      }            //无用的工夫戳,默认值为: Long.MAX_VALUE,因为基于事件计数的状况下,不关怀工夫。      long maxTimestamp = window.maxTimestamp();      out.collect("key:" + tuple.getField(0) + " value: " + sum + "| maxTimeStamp :"+ maxTimestamp + "," + format.format(maxTimestamp)      );  }}

滑动工夫窗口

动窗口是固定窗口的更狭义的一种模式,滑动窗口由固定的窗口长度和滑动距离组成,特点:窗口长度固定,能够有重叠,滑动窗口以一个步长(Slide)一直向前滑动,窗口的长度固定。应用时,咱们要设置Slide和Size。Slide的大小决定了Flink以多大的频率来创立新的窗口,Slide较小,窗口的个数会很多。Slide小于窗口的Size时,相邻窗口会重叠,一个事件会被调配到多个窗口;Slide大于Size,有些事件可能被丢掉

基于工夫的滚动窗口

//基于工夫驱动,每隔5s计算一下最近10s的数据// WindowedStream<Tuple2<String, Integer>, Tuple, TimeWindow> timeWindow =keyedStream.timeWindow(Time.seconds(10), Time.seconds(5));SingleOutputStreamOperator<String> applyed = countWindow.apply(new WindowFunction<Tuple3<String, String, String>, String, String, GlobalWindow>() {    @Override    public void apply(String s, GlobalWindow window, Iterable<Tuple3<String, String, String>> input, Collector<String> out) throws Exception {        Iterator<Tuple3<String, String, String>> iterator = input.iterator();        StringBuilder sb = new StringBuilder();        while (iterator.hasNext()) {            Tuple3<String, String, String> next = iterator.next();            sb.append(next.f0 + ".." + next.f1 + ".." + next.f2);        }//                window.        out.collect(sb.toString());    }}); 

基于事件的滚动窗口

/*** 滑动窗口:窗口可重叠* 1、基于工夫驱动* 2、基于事件驱动*/WindowedStream<Tuple3<String, String, String>, String, GlobalWindow> countWindow = keybyed.countWindow(3,2);SingleOutputStreamOperator<String> applyed = countWindow.apply(new WindowFunction<Tuple3<String, String, String>, String, String, GlobalWindow>() {    @Override    public void apply(String s, GlobalWindow window, Iterable<Tuple3<String, String, String>> input, Collector<String> out) throws Exception {        Iterator<Tuple3<String, String, String>> iterator = input.iterator();        StringBuilder sb = new StringBuilder();        while (iterator.hasNext()) {            Tuple3<String, String, String> next = iterator.next();            sb.append(next.f0 + ".." + next.f1 + ".." + next.f2);        }//                window.        out.collect(sb.toString());    }});

会话工夫窗口

由一系列事件组合一个指定工夫长度的timeout间隙组成,相似于web利用的session,也就是一段时间没有接管到新数据就会生成新的窗口,在这种模式下,窗口的长度是可变的,每个窗口的开始和完结工夫并不是确定的。咱们能够设置定长的Session gap,也能够应用SessionWindowTimeGapExtractor动静地确定Session gap的长度。

val input: DataStream[T] = ...// event-time session windows with static gapinput    .keyBy(...)    .window(EventTimeSessionWindows.withGap(Time.minutes(10)))    .<window function>(...)// event-time session windows with dynamic gapinput    .keyBy(...)    .window(EventTimeSessionWindows.withDynamicGap(new SessionWindowTimeGapExtractor[T] {      override def extract(element: T): Long = {        // determine and return session gap      }    }))    .<window function>(...)// processing-time session windows with static gapinput    .keyBy(...)    .window(ProcessingTimeSessionWindows.withGap(Time.minutes(10)))    .<window function>(...)// processing-time session windows with dynamic gapinput    .keyBy(...)    .window(DynamicProcessingTimeSessionWindows.withDynamicGap(new SessionWindowTimeGapExtractor[T] {      override def extract(element: T): Long = {        // determine and return session gap      }    }))    .<window function>(...)

窗口函数

在窗口划分结束后,就是要对窗口内的数据进行解决,一是增量计算对应reduceaggregate,二是全量计算对应process ,增量计算指的是窗口保留一份两头数据,每流入一个新元素,新元素与两头数据两两合一,生成新的两头数据,再保留到窗口中。全量计算指的是窗口先缓存该窗口所有元素,等到触发条件后对窗口内的全量元素执行计算

参考

https://cloud.tencent.com/developer/article/1584926

吴邪,小三爷,混迹于后盾,大数据,人工智能畛域的小菜鸟。
更多请关注