乐趣区

关于大数据:Flink-中极其重要的-Time-与-Window-详细解析深度好文建议收藏

前言

Flink 是流式的、实时的 计算引擎

下面一句话就有两个概念,一个是流式,一个是实时。

流式 :就是数据源源不断的流进来,也就是数据没有边界,然而咱们计算的时候必须在一个有边界的范畴内进行,所以这外面就有一个问题,边界怎么确定?无非就两种形式, 依据时间段或者数据量进行确定,依据时间段就是每隔多长时间就划分一个边界,依据数据量就是每来多少条数据划分一个边界,Flink 中就是这么划分边界的,本文会具体解说。

实时:就是数据发送过去之后立马就进行相干的计算,而后将后果输入。这里的计算有两种:

  • 一种是只有边界内的数据进行计算,这种好了解,比方统计每个用户最近五分钟内浏览的新闻数量,就能够取最近五分钟内的所有数据,而后依据每个用户分组,统计新闻的总数。
  • 另一种是边界内数据与内部数据进行关联计算,比方:统计最近五分钟内浏览新闻的用户都是来自哪些地区,这种就须要将五分钟内浏览新闻的用户信息与 hive 中的地区维表进行关联,而后在进行相干计算。

本篇文章所讲的 Flink 的内容就是围绕以上概念进行具体分析的!

Time 与 Window

Time

在 Flink 中,如果以时间段划分边界的话,那么工夫就是一个极其重要的字段。

Flink 中的工夫有三种类型,如下图所示:

  • Event Time:是事件创立的工夫。它通常由事件中的工夫戳形容,例如采集的日志数据中,每一条日志都会记录本人的生成工夫,Flink 通过工夫戳分配器拜访事件工夫戳。
  • Ingestion Time:是数据进入 Flink 的工夫。
  • Processing Time:是每一个执行基于工夫操作的算子的本地零碎工夫,与机器相干,默认的工夫属性就是 Processing Time。

例如,一条日志进入 Flink 的工夫为 2021-01-22 10:00:00.123,达到 Window 的零碎工夫为 2021-01-22 10:00:01.234,日志的内容如下:
2021-01-06 18:37:15.624 INFO Fail over to rm2

对于业务来说,要统计 1min 内的故障日志个数,哪个工夫是最有意义的?—— eventTime,因为咱们要依据日志的生成工夫进行统计。

Window

Window,即窗口,咱们后面始终提到的边界就是这里的 Window(窗口)。

官网解释:流式计算是一种被设计用于解决有限数据集的数据处理引擎,而有限数据集是指一种一直增长的实质上有限的数据集,而 window 是一种切割有限数据为无限块进行解决的伎俩

所以Window 是有限数据流解决的外围,Window 将一个有限的 stream 拆分成无限大小的”buckets”桶,咱们能够在这些桶上做计算操作

Window 类型

本文刚开始提到,划分窗口就两种形式:

  1. 依据工夫进行截取(time-driven-window),比方每 1 分钟统计一次或每 10 分钟统计一次。
  2. 依据数据进行截取(data-driven-window),比方每 5 个数据统计一次或每 50 个数据统计一次。

对于 TimeWindow(依据工夫划分窗口),能够依据窗口实现原理的不同分成三类:滚动窗口(Tumbling Window)、滑动窗口(Sliding Window)和会话窗口(Session Window)

  1. 滚动窗口(Tumbling Windows)

将数据根据固定的窗口长度对数据进行切片。

特点:工夫对齐,窗口长度固定,没有重叠

滚动窗口分配器将每个元素调配到一个指定窗口大小的窗口中,滚动窗口有一个固定的大小,并且不会呈现重叠。

例如:如果你指定了一个 5 分钟大小的滚动窗口,窗口的创立如下图所示:

实用场景:适宜做 BI 统计等(做每个时间段的聚合计算)。

  1. 滑动窗口(Sliding Windows)

滑动窗口是固定窗口的更狭义的一种模式,滑动窗口由固定的窗口长度和滑动距离组成。

特点:工夫对齐,窗口长度固定,有重叠

滑动窗口分配器将元素调配到固定长度的窗口中,与滚动窗口相似,窗口的大小由窗口大小参数来配置,另一个窗口滑动参数管制滑动窗口开始的频率。因而,滑动窗口如果滑动参数小于窗口大小的话,窗口是能够重叠的,在这种状况下元素会被调配到多个窗口中。

例如,你有 10 分钟的窗口和 5 分钟的滑动,那么每个窗口中 5 分钟的窗口里蕴含着上个 10 分钟产生的数据,如下图所示:

实用场景:对最近一个时间段内的统计(求某接口最近 5min 的失败率来决定是否要报警)。

  1. 会话窗口(Session Windows)

由一系列事件组合一个指定工夫长度的 timeout 间隙组成,相似于 web 利用的 session,也就是一段时间没有接管到新数据就会生成新的窗口。

特点:工夫无对齐

session 窗口分配器通过 session 流动来对元素进行分组,session 窗口跟滚动窗口和滑动窗口相比,不会有重叠和固定的开始工夫和完结工夫的状况,相同,当它在一个固定的工夫周期内不再收到元素,即非流动距离产生,那个这个窗口就会敞开。一个 session 窗口通过一个 session 距离来配置,这个 session 距离定义了非沉闷周期的长度,当这个非沉闷周期产生,那么以后的 session 将敞开并且后续的元素将被调配到新的 session 窗口中去。

Window API

TimeWindow

TimeWindow 是将指定工夫范畴内的所有数据组成一个 window,一次对一个 window 外面的所有数据进行计算(就是本文结尾说的对一个边界内的数据进行计算)。

咱们以 红绿灯路口通过的汽车数量 为例子:

红绿灯路口会有汽车通过,一共会有多少汽车通过,无奈计算。因为车流源源不断,计算没有边界。

所以咱们统计每 15 秒钟通过红路灯的汽车数量,如第一个 15 秒为 2 辆,第二个 15 秒为 3 辆,第三个 15 秒为 1 辆 …

  • tumbling-time-window (无重叠数据)

咱们应用 Linux 中的 nc 命令模仿数据的发送方

1. 开启发送端口,端口号为 9999
nc -lk 9999

2. 发送内容(key 代表不同的路口,value 代表每次通过的车辆)一次发送一行,发送的工夫距离代表汽车通过的工夫距离
9,3
9,2
9,7
4,9
2,6
1,5
2,3
5,7
5,4

Flink 进行采集数据并计算:

object Window {def main(args: Array[String]): Unit = {
    //TODO time-window
    //1. 创立运行环境
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    //2. 定义数据流起源
    val text = env.socketTextStream("localhost", 9999)

    //3. 转换数据格式,text->CarWc
    case class CarWc(sensorId: Int, carCnt: Int)
    val ds1: DataStream[CarWc] = text.map {
      line => {val tokens = line.split(",")
        CarWc(tokens(0).trim.toInt, tokens(1).trim.toInt)
      }
    }

    //4. 执行统计操作,每个 sensorId 一个 tumbling 窗口,窗口的大小为 5 秒
    // 也就是说,每 5 秒钟统计一次,在这过来的 5 秒钟内,各个路口通过红绿灯汽车的数量。val ds2: DataStream[CarWc] = ds1
      .keyBy("sensorId")
      .timeWindow(Time.seconds(5))
      .sum("carCnt")

    //5. 显示统计后果
    ds2.print()

    //6. 触发流计算
    env.execute(this.getClass.getName)

  }
}

咱们发送的数据并没有指定工夫字段,所以 Flink 应用的是默认的 Processing Time,也就是 Flink 零碎解决数据时的工夫。

  • sliding-time-window (有重叠数据)
//1. 创立运行环境
val env = StreamExecutionEnvironment.getExecutionEnvironment

//2. 定义数据流起源
val text = env.socketTextStream("localhost", 9999)

//3. 转换数据格式,text->CarWc
case class CarWc(sensorId: Int, carCnt: Int)

val ds1: DataStream[CarWc] = text.map {
  line => {val tokens = line.split(",")
    CarWc(tokens(0).trim.toInt, tokens(1).trim.toInt)
  }
}
//4. 执行统计操作,每个 sensorId 一个 sliding 窗口,窗口工夫 10 秒, 滑动工夫 5 秒
// 也就是说,每 5 秒钟统计一次,在这过来的 10 秒钟内,各个路口通过红绿灯汽车的数量。val ds2: DataStream[CarWc] = ds1
  .keyBy("sensorId")
  .timeWindow(Time.seconds(10), Time.seconds(5))
  .sum("carCnt")

//5. 显示统计后果
ds2.print()

//6. 触发流计算
env.execute(this.getClass.getName)

CountWindow

CountWindow 依据窗口中雷同 key 元素的数量来触发执行,执行时只计算元素数量达到窗口大小的 key 对应的后果。

留神:CountWindow 的 window_size 指的是雷同 Key 的元素的个数,不是输出的所有元素的总数

  • tumbling-count-window (无重叠数据)
//1. 创立运行环境
val env = StreamExecutionEnvironment.getExecutionEnvironment

//2. 定义数据流起源
val text = env.socketTextStream("localhost", 9999)

//3. 转换数据格式,text->CarWc
case class CarWc(sensorId: Int, carCnt: Int)

val ds1: DataStream[CarWc] = text.map {(f) => {val tokens = f.split(",")
    CarWc(tokens(0).trim.toInt, tokens(1).trim.toInt)
  }
}
//4. 执行统计操作,每个 sensorId 一个 tumbling 窗口,窗口的大小为 5
// 依照 key 进行收集,对应的 key 呈现的次数达到 5 次作为一个后果
val ds2: DataStream[CarWc] = ds1
  .keyBy("sensorId")
  .countWindow(5)
  .sum("carCnt")

//5. 显示统计后果
ds2.print()

//6. 触发流计算
env.execute(this.getClass.getName)

  • sliding-count-window (有重叠数据)

同样也是窗口长度和滑动窗口的操作:窗口长度是 5,滑动长度是 3

//1. 创立运行环境
val env = StreamExecutionEnvironment.getExecutionEnvironment

//2. 定义数据流起源
val text = env.socketTextStream("localhost", 9999)

//3. 转换数据格式,text->CarWc
case class CarWc(sensorId: Int, carCnt: Int)

val ds1: DataStream[CarWc] = text.map {(f) => {val tokens = f.split(",")
    CarWc(tokens(0).trim.toInt, tokens(1).trim.toInt)
  }
}
//4. 执行统计操作,每个 sensorId 一个 sliding 窗口,窗口大小 3 条数据, 窗口滑动为 3 条数据
// 也就是说,每个路口别离统计,收到对于它的 3 条音讯时统计在最近 5 条音讯中,各自路口通过的汽车数量
val ds2: DataStream[CarWc] = ds1
  .keyBy("sensorId")
  .countWindow(5, 3)
  .sum("carCnt")

//5. 显示统计后果
ds2.print()

//6. 触发流计算
env.execute(this.getClass.getName)

  • Window 总结
  1. flink 反对两种划分窗口的形式(time 和 count)

    • 如果依据工夫划分窗口,那么它就是一个 time-window
    • 如果依据数据划分窗口,那么它就是一个 count-window
  2. flink 反对窗口的两个重要属性(size 和 interval)

    • 如果 size=interval, 那么就会造成 tumbling-window(无重叠数据)
    • 如果 size>interval, 那么就会造成 sliding-window(有重叠数据)
    • 如果 size<interval, 那么这种窗口将会失落数据。比方每 5 秒钟,统计过来 3 秒的通过路口汽车的数据,将会漏掉 2 秒钟的数据。
  3. 通过组合能够得出四种根本窗口

    • time-tumbling-window 无重叠数据的工夫窗口,设置形式举例:timeWindow(Time.seconds(5))
    • time-sliding-window 有重叠数据的工夫窗口,设置形式举例:timeWindow(Time.seconds(5), Time.seconds(3))
    • count-tumbling-window 无重叠数据的数量窗口,设置形式举例:countWindow(5)
    • count-sliding-window 有重叠数据的数量窗口,设置形式举例:countWindow(5,3)

Window Reduce

WindowedStream → DataStream:给 window 赋一个 reduce 性能的函数,并返回一个聚合的后果。

import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.api.scala._
import org.apache.flink.streaming.api.windowing.time.Time

object StreamWindowReduce {def main(args: Array[String]): Unit = {
    // 获取执行环境
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    // 创立 SocketSource
    val stream = env.socketTextStream("node01", 9999)

    // 对 stream 进行解决并按 key 聚合
    val streamKeyBy = stream.map(item => (item, 1)).keyBy(0)

    // 引入工夫窗口
    val streamWindow = streamKeyBy.timeWindow(Time.seconds(5))

    // 执行聚合操作
    val streamReduce = streamWindow.reduce((item1, item2) => (item1._1, item1._2 + item2._2)
    )

    // 将聚合数据写入文件
    streamReduce.print()

    // 执行程序
    env.execute("TumblingWindow")
  }
}

Window Apply

apply 办法能够进行一些自定义解决,通过匿名外部类的办法来实现。当有一些简单计算时应用。

用法

  1. 实现一个 WindowFunction 类
  2. 指定该类的泛型为 [输出数据类型, 输入数据类型, keyBy 中应用分组字段的类型, 窗口类型]

示例:应用 apply 办法来实现单词统计

步骤:

  1. 获取流解决运行环境
  2. 构建 socket 流数据源,并指定 IP 地址和端口号
  3. 对接管到的数据转换成单词元组
  4. 应用 keyBy 进行分流(分组)
  5. 应用 timeWinodw 指定窗口的长度(每 3 秒计算一次)
  6. 实现一个 WindowFunction 匿名外部类

    • apply 办法中实现聚合计算
    • 应用 Collector.collect 收集数据

外围代码如下:

    //1. 获取流解决运行环境
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    //2. 构建 socket 流数据源,并指定 IP 地址和端口号
    val textDataStream = env.socketTextStream("node01", 9999).flatMap(_.split(" "))

    //3. 对接管到的数据转换成单词元组
    val wordDataStream = textDataStream.map(_->1)

    //4. 应用 keyBy 进行分流(分组)val groupedDataStream: KeyedStream[(String, Int), String] = wordDataStream.keyBy(_._1)

    //5. 应用 timeWinodw 指定窗口的长度(每 3 秒计算一次)val windowDataStream: WindowedStream[(String, Int), String, TimeWindow] = groupedDataStream.timeWindow(Time.seconds(3))

    //6. 实现一个 WindowFunction 匿名外部类
    val reduceDatStream: DataStream[(String, Int)] = windowDataStream.apply(new RichWindowFunction[(String, Int), (String, Int), String, TimeWindow] {
      // 在 apply 办法中实现数据的聚合
      override def apply(key: String, window: TimeWindow, input: Iterable[(String, Int)], out: Collector[(String, Int)]): Unit = {println("hello world")
        val tuple = input.reduce((t1, t2) => {(t1._1, t1._2 + t2._2)
        })
        // 将要返回的数据收集起来,发送回去
        out.collect(tuple)
      }
    })
    reduceDatStream.print()
    env.execute()

Window Fold

WindowedStream → DataStream:给窗口赋一个 fold 性能的函数,并返回一个 fold 后的后果。

import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.api.scala._
import org.apache.flink.streaming.api.windowing.time.Time

object StreamWindowFold {def main(args: Array[String]): Unit = {
    // 获取执行环境
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    // 创立 SocketSource
    val stream = env.socketTextStream("node01", 9999,'\n',3)

    // 对 stream 进行解决并按 key 聚合
    val streamKeyBy = stream.map(item => (item, 1)).keyBy(0)

    // 引入滚动窗口
    val streamWindow = streamKeyBy.timeWindow(Time.seconds(5))

    // 执行 fold 操作
    val streamFold = streamWindow.fold(100){(begin, item) =>
        begin + item._2
    }

    // 将聚合数据写入文件
    streamFold.print()

    // 执行程序
    env.execute("TumblingWindow")
  }
}

Aggregation on Window

WindowedStream → DataStream:对一个 window 内的所有元素做聚合操作。min 和 minBy 的区别是 min 返回的是最小值,而 minBy 返回的是蕴含最小值字段的元素(同样的原理实用于 max 和 maxBy)。

import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.api.scala._

object StreamWindowAggregation {def main(args: Array[String]): Unit = {
    // 获取执行环境
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    // 创立 SocketSource
    val stream = env.socketTextStream("node01", 9999)

    // 对 stream 进行解决并按 key 聚合
    val streamKeyBy = stream.map(item => (item.split("")(0), item.split(" ")(1))).keyBy(0)

    // 引入滚动窗口
    val streamWindow = streamKeyBy.timeWindow(Time.seconds(5))

    // 执行聚合操作
    val streamMax = streamWindow.max(1)

    // 将聚合数据写入文件
    streamMax.print()

    // 执行程序
    env.execute("TumblingWindow")
  }
}

EventTime 与 Window

EventTime 的引入

  1. 与事实世界中的工夫是不统一的,在 flink 中被划分为事件工夫,提取工夫,解决工夫三种。
  2. 如果以 EventTime 为基准来定义工夫窗口那将造成 EventTimeWindow, 要求音讯自身就应该携带 EventTime
  3. 如果以 IngesingtTime 为基准来定义工夫窗口那将造成 IngestingTimeWindow, 以 source 的 systemTime 为准。
  4. 如果以 ProcessingTime 基准来定义工夫窗口那将造成 ProcessingTimeWindow,以 operator 的 systemTime 为准。

在 Flink 的流式解决中,绝大部分的业务都会应用 eventTime,个别只在 eventTime 无奈应用时,才会被迫应用 ProcessingTime 或者 IngestionTime。

如果要应用 EventTime,那么须要引入 EventTime 的工夫属性,引入形式如下所示:

val env = StreamExecutionEnvironment.getExecutionEnvironment

// 从调用时刻开始给 env 创立的每一个 stream 追加工夫特色
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

Watermark

引入

咱们晓得,流解决从事件产生,到流经 source,再到 operator,两头是有一个过程和工夫的,尽管大部分状况下,流到 operator 的数据都是依照事件产生的工夫程序来的,然而也不排除因为网络、背压等起因,导致乱序的产生,所谓乱序,就是指 Flink 接管到的事件的先后顺序不是严格依照事件的 Event Time 顺序排列的,所以 Flink 最后设计的时候,就思考到了网络提早,网络乱序等问题,所以提出了一个抽象概念:水印(WaterMark);

如上图所示,就呈现一个问题,一旦呈现乱序,如果只依据 EventTime 决定 Window 的运行,咱们不能明确数据是否全副到位,但又不能无限期的等上来,此时必须要有个机制来保障一个特定的工夫后,必须触发 Window 去进行计算了,这个特地的机制,就是 Watermark。

Watermark 是用于解决乱序事件的,而正确的解决乱序事件,通常用 Watermark 机制联合 Window 来实现

数据流中的 Watermark 用于示意 timestamp 小于 Watermark 的数据,都曾经达到了,因而,Window 的执行也是由 Watermark 触发的。

Watermark 能够了解成一个提早触发机制,咱们能够设置 Watermark 的延时时长 t,每次零碎会校验曾经达到的数据中最大的 maxEventTime,而后认定 EventTime 小于 maxEventTime – t 的所有数据都曾经达到,如果有窗口的进行工夫等于 maxEventTime – t,那么这个窗口被触发执行

有序流的 Watermarker 如下图所示:(Watermark 设置为 0)

乱序流的 Watermarker 如下图所示:(Watermark 设置为 2)

当 Flink 接管到每一条数据时,都会产生一条 Watermark,这条 Watermark 就等于以后所有达到数据中的 maxEventTime – 提早时长,也就是说,Watermark 是由数据携带的,一旦数据携带的 Watermark 比以后未触发的窗口的进行工夫要晚,那么就会触发相应窗口的执行。因为 Watermark 是由数据携带的,因而,如果运行过程中无奈获取新的数据,那么没有被触发的窗口将永远都不被触发

上图中,咱们设置的容许最大提早达到工夫为 2s,所以工夫戳为 7s 的事件对应的 Watermark 是 5s,工夫戳为 12s 的事件的 Watermark 是 10s,如果咱们的窗口 1 是 1s~5s,窗口 2 是 6s~10s,那么工夫戳为 7s 的事件达到时的 Watermarker 恰好触发窗口 1,工夫戳为 12s 的事件达到时的 Watermark 恰好触发窗口 2。

Flink 对于早退数据的解决

waterMark 和 Window 机制解决了流式数据的乱序问题,对于因为提早而程序有误的数据,能够依据 eventTime 进行业务解决,于提早的数据 Flink 也有本人的解决办法,次要的方法是给定一个容许提早的工夫,在该工夫范畴内仍能够承受解决提早数据。

设置容许提早的工夫是通过 allowedLateness(lateness: Time) 设置

保留提早数据则是通过 sideOutputLateData(outputTag: OutputTag[T]) 保留

获取提早数据是通过 DataStream.getSideOutput(tag: OutputTag[X]) 获取

具体的用法如下:

allowedLateness(lateness: Time)

def allowedLateness(lateness: Time): WindowedStream[T, K, W] = {javaStream.allowedLateness(lateness)
  this
}

该办法传入一个 Time 值,设置容许数据早退的工夫,这个工夫和 WaterMark 中的工夫概念不同。再来回顾一下:

WaterMark= 数据的事件工夫 - 容许乱序工夫值

随着新数据的到来,waterMark 的值会更新为最新数据事件工夫 - 容许乱序工夫值,然而如果这时候来了一条历史数据,waterMark 值则不会更新。总的来说,waterMark 是为了能接管到尽可能多的乱序数据。

那这里的 Time 值,次要是为了期待早退的数据,在肯定工夫范畴内,如果属于该窗口的数据到来,仍会进行计算,前面会对计算形式认真阐明

留神:该办法只针对于基于 event-time 的窗口,如果是基于 processing-time,并且指定了非零的 time 值则会抛出异样。

sideOutputLateData(outputTag: OutputTag[T])

def sideOutputLateData(outputTag: OutputTag[T]): WindowedStream[T, K, W] = {javaStream.sideOutputLateData(outputTag)
  this
}

该办法是将迟来的数据保留至给定的 outputTag 参数,而 OutputTag 则是用来标记提早数据的一个对象。

DataStream.getSideOutput(tag: OutputTag[X])

通过 window 等操作返回的 DataStream 调用该办法,传入标记提早数据的对象来获取提早的数据。

对提早数据的了解

提早数据是指:

在以后窗口【假如窗口范畴为 10-15】曾经计算之后,又来了一个属于该窗口的数据【假如事件工夫为 13】,这时候仍会触发 Window 操作,这种数据就称为提早数据。

那么问题来了,延迟时间怎么计算呢?

假如窗口范畴为 10-15,延迟时间为 2s,则只有 WaterMark<15+2,并且属于该窗口,就能触发 Window 操作。而如果来了一条数据使得 WaterMark>=15+2,10-15 这个窗口就不能再触发 Window 操作,即便新来的数据的 Event Time 属于这个窗口工夫内。

Flink 关联 Hive 分区表

Flink 1.12 反对了 Hive 最新的分区作为时态表的性能,能够通过 SQL 的形式间接关联 Hive 分区表的最新分区,并且会主动监听最新的 Hive 分区,当监控到新的分区后,会主动地做维表数据的全量替换。通过这种形式,用户无需编写 DataStream 程序即可实现 Kafka 流实时关联最新的 Hive 分区实现数据打宽

具体用法:

在 Sql Client 中注册 HiveCatalog:

vim conf/sql-client-defaults.yaml 
catalogs: 
  - name: hive_catalog 
    type: hive 
    hive-conf-dir: /disk0/soft/hive-conf/ #该目录须要包 hive-site.xml 文件 

创立 Kafka 表


CREATE TABLE hive_catalog.flink_db.kfk_fact_bill_master_12 (  
    master Row<reportDate String, groupID int, shopID int, shopName String, action int, orderStatus int, orderKey String, actionTime bigint, areaName String, paidAmount double, foodAmount double, startTime String, person double, orderSubType int, checkoutTime String>,  
proctime as PROCTIME()  -- PROCTIME 用来和 Hive 时态表关联) WITH (  
 'connector' = 'kafka',  
 'topic' = 'topic_name',  
 'format' = 'json',  
 'properties.bootstrap.servers' = 'host:9092',  
 'properties.group.id' = 'flinkTestGroup',  
 'scan.startup.mode' = 'timestamp',  
 'scan.startup.timestamp-millis' = '1607844694000'  
); 

Flink 事实表与 Hive 最新分区数据关联

dim_extend_shop_info 是 Hive 中已存在的表,所以咱们用 table hint 动静地开启维表参数。


CREATE VIEW IF NOT EXISTS hive_catalog.flink_db.view_fact_bill_master as  
SELECT * FROM  
 (select t1.*, t2.group_id, t2.shop_id, t2.group_name, t2.shop_name, t2.brand_id,   
     ROW_NUMBER() OVER (PARTITION BY groupID, shopID, orderKey ORDER BY actionTime desc) rn  
    from hive_catalog.flink_db.kfk_fact_bill_master_12 t1  
       JOIN hive_catalog.flink_db.dim_extend_shop_info   
  /*+ OPTIONS('streaming-source.enable'='true',  
     'streaming-source.partition.include' = 'latest',  
     'streaming-source.monitor-interval' = '1 h',
     'streaming-source.partition-order' = 'partition-name') */
    FOR SYSTEM_TIME AS OF t1.proctime AS t2 -- 时态表  
    ON t1.groupID = t2.group_id and t1.shopID = t2.shop_id  
    where groupID in (202042)) t  where t.rn = 1 

参数解释:

  • streaming-source.enable 开启流式读取 Hive 数据。
  • streaming-source.partition.include 有以下两个值:

    1. latest 属性: 只读取最新分区数据。
    2. all: 读取全量分区数据,默认值为 all,示意读所有分区,latest 只能用在 temporal join 中,用于读取最新分区作为维表,不能间接读取最新分区数据。
  • streaming-source.monitor-interval 监听新分区生成的工夫、不宜过短、最短是 1 个小时,因为目前的实现是每个 task 都会查问 metastore,高频的查可能会对 metastore 产生过大的压力。须要留神的是,1.12.1 放开了这个限度,但仍倡议依照理论业务不要配个太短的 interval。
  • streaming-source.partition-order 分区策略,次要有以下 3 种,其中最为举荐的是 partition-name

    1. partition-name 应用默认分区名称程序加载最新分区
    2. create-time 应用分区文件创建工夫程序
    3. partition-time 应用分区工夫程序

搜寻公众号:五分钟学大数据,获取大数据学习秘籍,深刻钻研大数据技术!

退出移动版