关于html5:Python基础实战之猜年龄游戏

50次阅读

共计 4528 个字符,预计需要花费 12 分钟才能阅读完成。

Flink 窗口背景

Flink 认为 Batch 是 Streaming 的一个特例,因而 Flink 底层引擎是一个流式引擎,在下面实现了流解决和批处理。而 Window 就是从 Streaming 到 Batch 的桥梁。艰深讲,Window 是用来对一个有限的流设置一个无限的汇合,从而在有界的数据集上进行操作的一种机制。流上的汇合由 Window 来划定范畴,比方“计算过来 10 分钟”或者“最初 50 个元素的和”。Window 能够由工夫(Time Window)(比方每 30s)或者数据(Count Window)(如每 100 个元素)驱动。DataStream API 提供了 Time 和 Count 的 Window。

一个 Flink 窗口利用的大抵骨架构造如下所示:

// Keyed Window
stream
.keyBy(…) <- 依照一个 Key 进行分组
.window(…) <- 将数据流中的元素调配到相应的窗口中
[.trigger(…)] <- 指定触发器 Trigger(可选)
[.evictor(…)] <- 指定清除器 Evictor(可选)

.reduce/aggregate/process()      <-  窗口处理函数 Window Function

// Non-Keyed Window
stream
.windowAll(…) <- 不分组,将数据流中的所有元素调配到相应的窗口中
[.trigger(…)] <- 指定触发器 Trigger(可选)
[.evictor(…)] <- 指定清除器 Evictor(可选)

.reduce/aggregate/process()      <-  窗口处理函数 Window Function

Flink 窗口的骨架构造中有两个必须的两个操作:

应用窗口分配器(WindowAssigner)将数据流中的元素调配到对应的窗口。
当满足窗口触发条件后,对窗口内的数据应用窗口处理函数(Window Function)进行解决,罕用的 Window Function 有 reduce、aggregate、process
滚动窗口

基于工夫驱动
将数据根据固定的窗口长度对数据进行切分,滚动窗口下窗口之间之间不重叠,且窗口长度是固定的。咱们能够用 TumblingEventTimeWindows 和 TumblingProcessingTimeWindows 创立一个基于 Event Time 或 Processing Time 的滚动工夫窗口。窗口的长度能够用 org.apache.flink.streaming.api.windowing.time.Time 中的 seconds、minutes、hours 和 days 来设置。

// 要害解决案例
KeyedStream<Tuple2, Tuple> keyedStream = mapStream.keyBy(0);
// 基于工夫驱动,每隔 10s 划分一个窗口
WindowedStream<Tuple2, Tuple, TimeWindow> timeWindow =
keyedStream.timeWindow(Time.seconds(10));
// 基于事件驱动, 每相隔 3 个事件(即三个雷同 key 的数据), 划分一个窗口进行计算
// WindowedStream<Tuple2, Tuple, GlobalWindow> countWindow =
keyedStream.countWindow(3);
// apply 是窗口的利用函数,即 apply 里的函数将利用在此窗口的数据上。
timeWindow.apply(new MyTimeWindowFunction()).print();
// countWindow.apply(new MyCountWindowFunction()).print();
基于事件驱动
当咱们想要每 100 个用户的购买行为作为驱动,那么每当窗口中填满 100 个”雷同”游戏元素了,就会对窗口进行计算,很好了解,上面是一个实现案例

public class MyCountWindowFunction implements WindowFunction<Tuple2,
String, Tuple, GlobalWindow> {

@Override
public void apply(Tuple tuple, GlobalWindow window, Iterable<Tuple2>
  input, Collectorout) throws Exception {SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");
  int sum = 0;
  for (Tuple2tuple2 : input){sum += tuple2.f1;}
  
  // 无用的工夫戳,默认值为:Long.MAX_VALUE, 因为基于事件计数的状况下,不关怀工夫。long maxTimestamp = window.maxTimestamp();
  out.collect("key:" + tuple.getField(0) + "value:" + sum + "| maxTimeStamp :"+ maxTimestamp + "," + format.format(maxTimestamp)
  );

}
}
滑动工夫窗口

动窗口是固定窗口的更狭义的一种模式,滑动窗口由固定的窗口长度和滑动距离组成,特点:窗口长度固定,能够有重叠,滑动窗口以一个步长(Slide)一直向前滑动,窗口的长度固定。应用时,咱们要设置 Slide 和 Size。Slide 的大小决定了 Flink 以多大的频率来创立新的窗口,Slide 较小,窗口的个数会很多。Slide 小于窗口的 Size 时,相邻窗口会重叠,一个事件会被调配到多个窗口;Slide 大于 Size,有些事件可能被丢掉

基于工夫的滚动窗口
// 基于工夫驱动,每隔 5s 计算一下最近 10s 的数据
// WindowedStream<Tuple2, Tuple, TimeWindow> timeWindow =
keyedStream.timeWindow(Time.seconds(10), Time.seconds(5));
SingleOutputStreamOperatorapplyed = countWindow.apply(new WindowFunction<Tuple3, String, String, GlobalWindow>() {

@Override
public void apply(String s, GlobalWindow window, Iterable<Tuple3> input, Collectorout) throws Exception {Iterator<Tuple3> iterator = input.iterator();
    StringBuilder sb = new StringBuilder();
    while (iterator.hasNext()) {Tuple3next = iterator.next();
        sb.append(next.f0 + ".." + next.f1 + ".." + next.f2);
    }

// window.

    out.collect(sb.toString());
}

});
基于事件的滚动窗口
/**

  • 滑动窗口:窗口可重叠
  • 1、基于工夫驱动
  • 2、基于事件驱动
    */
    WindowedStream<Tuple3, String, GlobalWindow> countWindow = keybyed.countWindow(3,2);

SingleOutputStreamOperatorapplyed = countWindow.apply(new WindowFunction<Tuple3, String, String, GlobalWindow>() {

@Override
public void apply(String s, GlobalWindow window, Iterable<Tuple3> input, Collectorout) throws Exception {Iterator<Tuple3> iterator = input.iterator();
    StringBuilder sb = new StringBuilder();
    while (iterator.hasNext()) {Tuple3next = iterator.next();
        sb.append(next.f0 + ".." + next.f1 + ".." + next.f2);
    }

// window.

    out.collect(sb.toString());
}

});
会话工夫窗口

由一系列事件组合一个指定工夫长度的 timeout 间隙组成,相似于 web 利用的 session,而后在 www.cungun,com 也就是一段时间没有接管到新数据就会生成新的窗口,在这种模式下,窗口的长度是可变的,每个窗口的开始和完结工夫并不是确定的。咱们能够设置定长的 Session gap,也能够应用 SessionWindowTimeGapExtractor 动静地确定 Session gap 的长度。

val input: DataStream[T] = …
// event-time session windows with static gap
input

.keyBy(...)
.window(EventTimeSessionWindows.withGap(Time.minutes(10)))
.(...)

// event-time session windows with dynamic gap
input

.keyBy(...)
.window(EventTimeSessionWindows.withDynamicGap(new SessionWindowTimeGapExtractor[T] {override def extract(element: T): Long = {// determine and return session gap}
}))
.(...)

// processing-time session windows with static gap
input

.keyBy(...)
.window(ProcessingTimeSessionWindows.withGap(Time.minutes(10)))
.(...)

// processing-time session windows with dynamic gap
input

.keyBy(...)
.window(DynamicProcessingTimeSessionWindows.withDynamicGap(new SessionWindowTimeGapExtractor[T] {override def extract(element: T): Long = {// determine and return session gap}
}))
.(...)

窗口函数

在窗口划分结束后,就是要对窗口内的数据进行解决,一是增量计算对应 reduce 和 aggregate,二是全量计算对应 process , 增量计算指的是窗口保留一份两头数据,每流入一个新元素,新元素与两头数据两两合一,生成新的两头数据,再保留到窗口中。全量计算指的是窗口先缓存该窗口所有元素,等到触发条件后对窗口内的全量元素执行计算

正文完
 0