前言
Flink 是流式的、实时的 计算引擎
下面一句话就有两个概念,一个是流式,一个是实时。
流式 :就是数据源源不断的流进来,也就是数据没有边界,然而咱们计算的时候必须在一个有边界的范畴内进行,所以这外面就有一个问题,边界怎么确定?无非就两种形式, 依据时间段或者数据量进行确定,依据时间段就是每隔多长时间就划分一个边界,依据数据量就是每来多少条数据划分一个边界,Flink 中就是这么划分边界的,本文会具体解说。
实时:就是数据发送过去之后立马就进行相干的计算,而后将后果输入。这里的计算有两种:
- 一种是只有边界内的数据进行计算,这种好了解,比方统计每个用户最近五分钟内浏览的新闻数量,就能够取最近五分钟内的所有数据,而后依据每个用户分组,统计新闻的总数。
- 另一种是边界内数据与内部数据进行关联计算,比方:统计最近五分钟内浏览新闻的用户都是来自哪些地区,这种就须要将五分钟内浏览新闻的用户信息与 hive 中的地区维表进行关联,而后在进行相干计算。
本篇文章所讲的 Flink 的内容就是围绕以上概念进行具体分析的!
Time 与 Window
Time
在 Flink 中,如果以时间段划分边界的话,那么工夫就是一个极其重要的字段。
Flink 中的工夫有三种类型,如下图所示:
- Event Time:是事件创立的工夫。它通常由事件中的工夫戳形容,例如采集的日志数据中,每一条日志都会记录本人的生成工夫,Flink 通过工夫戳分配器拜访事件工夫戳。
- Ingestion Time:是数据进入 Flink 的工夫。
- Processing Time:是每一个执行基于工夫操作的算子的本地零碎工夫,与机器相干,默认的工夫属性就是 Processing Time。
例如,一条日志进入 Flink 的工夫为 2021-01-22 10:00:00.123,达到 Window 的零碎工夫为 2021-01-22 10:00:01.234,日志的内容如下:
2021-01-06 18:37:15.624 INFO Fail over to rm2
对于业务来说,要统计 1min 内的故障日志个数,哪个工夫是最有意义的?—— eventTime,因为咱们要依据日志的生成工夫进行统计。
Window
Window,即窗口,咱们后面始终提到的边界就是这里的 Window(窗口)。
官网解释:流式计算是一种被设计用于解决有限数据集的数据处理引擎,而有限数据集是指一种一直增长的实质上有限的数据集,而 window 是一种切割有限数据为无限块进行解决的伎俩。
所以Window 是有限数据流解决的外围,Window 将一个有限的 stream 拆分成无限大小的”buckets”桶,咱们能够在这些桶上做计算操作。
Window 类型
本文刚开始提到,划分窗口就两种形式:
- 依据工夫进行截取(time-driven-window),比方每 1 分钟统计一次或每 10 分钟统计一次。
- 依据数据进行截取(data-driven-window),比方每 5 个数据统计一次或每 50 个数据统计一次。
对于 TimeWindow(依据工夫划分窗口),能够依据窗口实现原理的不同分成三类:滚动窗口(Tumbling Window)、滑动窗口(Sliding Window)和会话窗口(Session Window)。
- 滚动窗口(Tumbling Windows)
将数据根据固定的窗口长度对数据进行切片。
特点:工夫对齐,窗口长度固定,没有重叠。
滚动窗口分配器将每个元素调配到一个指定窗口大小的窗口中,滚动窗口有一个固定的大小,并且不会呈现重叠。
例如:如果你指定了一个 5 分钟大小的滚动窗口,窗口的创立如下图所示:
实用场景:适宜做 BI 统计等(做每个时间段的聚合计算)。
- 滑动窗口(Sliding Windows)
滑动窗口是固定窗口的更狭义的一种模式,滑动窗口由固定的窗口长度和滑动距离组成。
特点:工夫对齐,窗口长度固定,有重叠。
滑动窗口分配器将元素调配到固定长度的窗口中,与滚动窗口相似,窗口的大小由窗口大小参数来配置,另一个窗口滑动参数管制滑动窗口开始的频率。因而,滑动窗口如果滑动参数小于窗口大小的话,窗口是能够重叠的,在这种状况下元素会被调配到多个窗口中。
例如,你有 10 分钟的窗口和 5 分钟的滑动,那么每个窗口中 5 分钟的窗口里蕴含着上个 10 分钟产生的数据,如下图所示:
实用场景:对最近一个时间段内的统计(求某接口最近 5min 的失败率来决定是否要报警)。
- 会话窗口(Session Windows)
由一系列事件组合一个指定工夫长度的 timeout 间隙组成,相似于 web 利用的 session,也就是一段时间没有接管到新数据就会生成新的窗口。
特点:工夫无对齐。
session 窗口分配器通过 session 流动来对元素进行分组,session 窗口跟滚动窗口和滑动窗口相比,不会有重叠和固定的开始工夫和完结工夫的状况,相同,当它在一个固定的工夫周期内不再收到元素,即非流动距离产生,那个这个窗口就会敞开。一个 session 窗口通过一个 session 距离来配置,这个 session 距离定义了非沉闷周期的长度,当这个非沉闷周期产生,那么以后的 session 将敞开并且后续的元素将被调配到新的 session 窗口中去。
Window API
TimeWindow
TimeWindow 是将指定工夫范畴内的所有数据组成一个 window,一次对一个 window 外面的所有数据进行计算(就是本文结尾说的对一个边界内的数据进行计算)。
咱们以 红绿灯路口通过的汽车数量 为例子:
红绿灯路口会有汽车通过,一共会有多少汽车通过,无奈计算。因为车流源源不断,计算没有边界。
所以咱们统计每 15 秒钟通过红路灯的汽车数量,如第一个 15 秒为 2 辆,第二个 15 秒为 3 辆,第三个 15 秒为 1 辆 …
- tumbling-time-window (无重叠数据)
咱们应用 Linux 中的 nc 命令模仿数据的发送方
1. 开启发送端口,端口号为 9999
nc -lk 9999
2. 发送内容(key 代表不同的路口,value 代表每次通过的车辆)一次发送一行,发送的工夫距离代表汽车通过的工夫距离
9,3
9,2
9,7
4,9
2,6
1,5
2,3
5,7
5,4
Flink 进行采集数据并计算:
object Window {def main(args: Array[String]): Unit = {
//TODO time-window
//1. 创立运行环境
val env = StreamExecutionEnvironment.getExecutionEnvironment
//2. 定义数据流起源
val text = env.socketTextStream("localhost", 9999)
//3. 转换数据格式,text->CarWc
case class CarWc(sensorId: Int, carCnt: Int)
val ds1: DataStream[CarWc] = text.map {
line => {val tokens = line.split(",")
CarWc(tokens(0).trim.toInt, tokens(1).trim.toInt)
}
}
//4. 执行统计操作,每个 sensorId 一个 tumbling 窗口,窗口的大小为 5 秒
// 也就是说,每 5 秒钟统计一次,在这过来的 5 秒钟内,各个路口通过红绿灯汽车的数量。val ds2: DataStream[CarWc] = ds1
.keyBy("sensorId")
.timeWindow(Time.seconds(5))
.sum("carCnt")
//5. 显示统计后果
ds2.print()
//6. 触发流计算
env.execute(this.getClass.getName)
}
}
咱们发送的数据并没有指定工夫字段,所以 Flink 应用的是默认的 Processing Time,也就是 Flink 零碎解决数据时的工夫。
- sliding-time-window (有重叠数据)
//1. 创立运行环境
val env = StreamExecutionEnvironment.getExecutionEnvironment
//2. 定义数据流起源
val text = env.socketTextStream("localhost", 9999)
//3. 转换数据格式,text->CarWc
case class CarWc(sensorId: Int, carCnt: Int)
val ds1: DataStream[CarWc] = text.map {
line => {val tokens = line.split(",")
CarWc(tokens(0).trim.toInt, tokens(1).trim.toInt)
}
}
//4. 执行统计操作,每个 sensorId 一个 sliding 窗口,窗口工夫 10 秒, 滑动工夫 5 秒
// 也就是说,每 5 秒钟统计一次,在这过来的 10 秒钟内,各个路口通过红绿灯汽车的数量。val ds2: DataStream[CarWc] = ds1
.keyBy("sensorId")
.timeWindow(Time.seconds(10), Time.seconds(5))
.sum("carCnt")
//5. 显示统计后果
ds2.print()
//6. 触发流计算
env.execute(this.getClass.getName)
CountWindow
CountWindow 依据窗口中雷同 key 元素的数量来触发执行,执行时只计算元素数量达到窗口大小的 key 对应的后果。
留神:CountWindow 的 window_size 指的是雷同 Key 的元素的个数,不是输出的所有元素的总数。
- tumbling-count-window (无重叠数据)
//1. 创立运行环境
val env = StreamExecutionEnvironment.getExecutionEnvironment
//2. 定义数据流起源
val text = env.socketTextStream("localhost", 9999)
//3. 转换数据格式,text->CarWc
case class CarWc(sensorId: Int, carCnt: Int)
val ds1: DataStream[CarWc] = text.map {(f) => {val tokens = f.split(",")
CarWc(tokens(0).trim.toInt, tokens(1).trim.toInt)
}
}
//4. 执行统计操作,每个 sensorId 一个 tumbling 窗口,窗口的大小为 5
// 依照 key 进行收集,对应的 key 呈现的次数达到 5 次作为一个后果
val ds2: DataStream[CarWc] = ds1
.keyBy("sensorId")
.countWindow(5)
.sum("carCnt")
//5. 显示统计后果
ds2.print()
//6. 触发流计算
env.execute(this.getClass.getName)
- sliding-count-window (有重叠数据)
同样也是窗口长度和滑动窗口的操作:窗口长度是 5,滑动长度是 3
//1. 创立运行环境
val env = StreamExecutionEnvironment.getExecutionEnvironment
//2. 定义数据流起源
val text = env.socketTextStream("localhost", 9999)
//3. 转换数据格式,text->CarWc
case class CarWc(sensorId: Int, carCnt: Int)
val ds1: DataStream[CarWc] = text.map {(f) => {val tokens = f.split(",")
CarWc(tokens(0).trim.toInt, tokens(1).trim.toInt)
}
}
//4. 执行统计操作,每个 sensorId 一个 sliding 窗口,窗口大小 3 条数据, 窗口滑动为 3 条数据
// 也就是说,每个路口别离统计,收到对于它的 3 条音讯时统计在最近 5 条音讯中,各自路口通过的汽车数量
val ds2: DataStream[CarWc] = ds1
.keyBy("sensorId")
.countWindow(5, 3)
.sum("carCnt")
//5. 显示统计后果
ds2.print()
//6. 触发流计算
env.execute(this.getClass.getName)
- Window 总结
-
flink 反对两种划分窗口的形式(time 和 count)
- 如果依据工夫划分窗口,那么它就是一个 time-window
- 如果依据数据划分窗口,那么它就是一个 count-window
-
flink 反对窗口的两个重要属性(size 和 interval)
- 如果 size=interval, 那么就会造成 tumbling-window(无重叠数据)
- 如果 size>interval, 那么就会造成 sliding-window(有重叠数据)
- 如果 size<interval, 那么这种窗口将会失落数据。比方每 5 秒钟,统计过来 3 秒的通过路口汽车的数据,将会漏掉 2 秒钟的数据。
-
通过组合能够得出四种根本窗口
- time-tumbling-window 无重叠数据的工夫窗口,设置形式举例:timeWindow(Time.seconds(5))
- time-sliding-window 有重叠数据的工夫窗口,设置形式举例:timeWindow(Time.seconds(5), Time.seconds(3))
- count-tumbling-window 无重叠数据的数量窗口,设置形式举例:countWindow(5)
- count-sliding-window 有重叠数据的数量窗口,设置形式举例:countWindow(5,3)
Window Reduce
WindowedStream → DataStream:给 window 赋一个 reduce 性能的函数,并返回一个聚合的后果。
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.api.scala._
import org.apache.flink.streaming.api.windowing.time.Time
object StreamWindowReduce {def main(args: Array[String]): Unit = {
// 获取执行环境
val env = StreamExecutionEnvironment.getExecutionEnvironment
// 创立 SocketSource
val stream = env.socketTextStream("node01", 9999)
// 对 stream 进行解决并按 key 聚合
val streamKeyBy = stream.map(item => (item, 1)).keyBy(0)
// 引入工夫窗口
val streamWindow = streamKeyBy.timeWindow(Time.seconds(5))
// 执行聚合操作
val streamReduce = streamWindow.reduce((item1, item2) => (item1._1, item1._2 + item2._2)
)
// 将聚合数据写入文件
streamReduce.print()
// 执行程序
env.execute("TumblingWindow")
}
}
Window Apply
apply 办法能够进行一些自定义解决,通过匿名外部类的办法来实现。当有一些简单计算时应用。
用法
- 实现一个 WindowFunction 类
- 指定该类的泛型为 [输出数据类型, 输入数据类型, keyBy 中应用分组字段的类型, 窗口类型]
示例:应用 apply 办法来实现单词统计
步骤:
- 获取流解决运行环境
- 构建 socket 流数据源,并指定 IP 地址和端口号
- 对接管到的数据转换成单词元组
- 应用 keyBy 进行分流(分组)
- 应用 timeWinodw 指定窗口的长度(每 3 秒计算一次)
-
实现一个 WindowFunction 匿名外部类
- apply 办法中实现聚合计算
- 应用 Collector.collect 收集数据
外围代码如下:
//1. 获取流解决运行环境
val env = StreamExecutionEnvironment.getExecutionEnvironment
//2. 构建 socket 流数据源,并指定 IP 地址和端口号
val textDataStream = env.socketTextStream("node01", 9999).flatMap(_.split(" "))
//3. 对接管到的数据转换成单词元组
val wordDataStream = textDataStream.map(_->1)
//4. 应用 keyBy 进行分流(分组)val groupedDataStream: KeyedStream[(String, Int), String] = wordDataStream.keyBy(_._1)
//5. 应用 timeWinodw 指定窗口的长度(每 3 秒计算一次)val windowDataStream: WindowedStream[(String, Int), String, TimeWindow] = groupedDataStream.timeWindow(Time.seconds(3))
//6. 实现一个 WindowFunction 匿名外部类
val reduceDatStream: DataStream[(String, Int)] = windowDataStream.apply(new RichWindowFunction[(String, Int), (String, Int), String, TimeWindow] {
// 在 apply 办法中实现数据的聚合
override def apply(key: String, window: TimeWindow, input: Iterable[(String, Int)], out: Collector[(String, Int)]): Unit = {println("hello world")
val tuple = input.reduce((t1, t2) => {(t1._1, t1._2 + t2._2)
})
// 将要返回的数据收集起来,发送回去
out.collect(tuple)
}
})
reduceDatStream.print()
env.execute()
Window Fold
WindowedStream → DataStream:给窗口赋一个 fold 性能的函数,并返回一个 fold 后的后果。
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.api.scala._
import org.apache.flink.streaming.api.windowing.time.Time
object StreamWindowFold {def main(args: Array[String]): Unit = {
// 获取执行环境
val env = StreamExecutionEnvironment.getExecutionEnvironment
// 创立 SocketSource
val stream = env.socketTextStream("node01", 9999,'\n',3)
// 对 stream 进行解决并按 key 聚合
val streamKeyBy = stream.map(item => (item, 1)).keyBy(0)
// 引入滚动窗口
val streamWindow = streamKeyBy.timeWindow(Time.seconds(5))
// 执行 fold 操作
val streamFold = streamWindow.fold(100){(begin, item) =>
begin + item._2
}
// 将聚合数据写入文件
streamFold.print()
// 执行程序
env.execute("TumblingWindow")
}
}
Aggregation on Window
WindowedStream → DataStream:对一个 window 内的所有元素做聚合操作。min 和 minBy 的区别是 min 返回的是最小值,而 minBy 返回的是蕴含最小值字段的元素(同样的原理实用于 max 和 maxBy)。
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.api.scala._
object StreamWindowAggregation {def main(args: Array[String]): Unit = {
// 获取执行环境
val env = StreamExecutionEnvironment.getExecutionEnvironment
// 创立 SocketSource
val stream = env.socketTextStream("node01", 9999)
// 对 stream 进行解决并按 key 聚合
val streamKeyBy = stream.map(item => (item.split("")(0), item.split(" ")(1))).keyBy(0)
// 引入滚动窗口
val streamWindow = streamKeyBy.timeWindow(Time.seconds(5))
// 执行聚合操作
val streamMax = streamWindow.max(1)
// 将聚合数据写入文件
streamMax.print()
// 执行程序
env.execute("TumblingWindow")
}
}
EventTime 与 Window
EventTime 的引入
- 与事实世界中的工夫是不统一的,在 flink 中被划分为事件工夫,提取工夫,解决工夫三种。
- 如果以 EventTime 为基准来定义工夫窗口那将造成 EventTimeWindow, 要求音讯自身就应该携带 EventTime
- 如果以 IngesingtTime 为基准来定义工夫窗口那将造成 IngestingTimeWindow, 以 source 的 systemTime 为准。
- 如果以 ProcessingTime 基准来定义工夫窗口那将造成 ProcessingTimeWindow,以 operator 的 systemTime 为准。
在 Flink 的流式解决中,绝大部分的业务都会应用 eventTime,个别只在 eventTime 无奈应用时,才会被迫应用 ProcessingTime 或者 IngestionTime。
如果要应用 EventTime,那么须要引入 EventTime 的工夫属性,引入形式如下所示:
val env = StreamExecutionEnvironment.getExecutionEnvironment
// 从调用时刻开始给 env 创立的每一个 stream 追加工夫特色
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
Watermark
引入
咱们晓得,流解决从事件产生,到流经 source,再到 operator,两头是有一个过程和工夫的,尽管大部分状况下,流到 operator 的数据都是依照事件产生的工夫程序来的,然而也不排除因为网络、背压等起因,导致乱序的产生,所谓乱序,就是指 Flink 接管到的事件的先后顺序不是严格依照事件的 Event Time 顺序排列的,所以 Flink 最后设计的时候,就思考到了网络提早,网络乱序等问题,所以提出了一个抽象概念:水印(WaterMark);
如上图所示,就呈现一个问题,一旦呈现乱序,如果只依据 EventTime 决定 Window 的运行,咱们不能明确数据是否全副到位,但又不能无限期的等上来,此时必须要有个机制来保障一个特定的工夫后,必须触发 Window 去进行计算了,这个特地的机制,就是 Watermark。
Watermark 是用于解决乱序事件的,而正确的解决乱序事件,通常用 Watermark 机制联合 Window 来实现。
数据流中的 Watermark 用于示意 timestamp 小于 Watermark 的数据,都曾经达到了,因而,Window 的执行也是由 Watermark 触发的。
Watermark 能够了解成一个提早触发机制,咱们能够设置 Watermark 的延时时长 t,每次零碎会校验曾经达到的数据中最大的 maxEventTime,而后认定 EventTime 小于 maxEventTime – t 的所有数据都曾经达到,如果有窗口的进行工夫等于 maxEventTime – t,那么这个窗口被触发执行。
有序流的 Watermarker 如下图所示:(Watermark 设置为 0)
乱序流的 Watermarker 如下图所示:(Watermark 设置为 2)
当 Flink 接管到每一条数据时,都会产生一条 Watermark,这条 Watermark 就等于以后所有达到数据中的 maxEventTime – 提早时长,也就是说,Watermark 是由数据携带的,一旦数据携带的 Watermark 比以后未触发的窗口的进行工夫要晚,那么就会触发相应窗口的执行。因为 Watermark 是由数据携带的,因而,如果运行过程中无奈获取新的数据,那么没有被触发的窗口将永远都不被触发。
上图中,咱们设置的容许最大提早达到工夫为 2s,所以工夫戳为 7s 的事件对应的 Watermark 是 5s,工夫戳为 12s 的事件的 Watermark 是 10s,如果咱们的窗口 1 是 1s~5s,窗口 2 是 6s~10s,那么工夫戳为 7s 的事件达到时的 Watermarker 恰好触发窗口 1,工夫戳为 12s 的事件达到时的 Watermark 恰好触发窗口 2。
Flink 对于早退数据的解决
waterMark 和 Window 机制解决了流式数据的乱序问题,对于因为提早而程序有误的数据,能够依据 eventTime 进行业务解决,于提早的数据 Flink 也有本人的解决办法,次要的方法是给定一个容许提早的工夫,在该工夫范畴内仍能够承受解决提早数据。
设置容许提早的工夫是通过 allowedLateness(lateness: Time) 设置
保留提早数据则是通过 sideOutputLateData(outputTag: OutputTag[T]) 保留
获取提早数据是通过 DataStream.getSideOutput(tag: OutputTag[X]) 获取
具体的用法如下:
allowedLateness(lateness: Time)
def allowedLateness(lateness: Time): WindowedStream[T, K, W] = {javaStream.allowedLateness(lateness)
this
}
该办法传入一个 Time 值,设置容许数据早退的工夫,这个工夫和 WaterMark 中的工夫概念不同。再来回顾一下:
WaterMark= 数据的事件工夫 - 容许乱序工夫值
随着新数据的到来,waterMark 的值会更新为最新数据事件工夫 - 容许乱序工夫值,然而如果这时候来了一条历史数据,waterMark 值则不会更新。总的来说,waterMark 是为了能接管到尽可能多的乱序数据。
那这里的 Time 值,次要是为了期待早退的数据,在肯定工夫范畴内,如果属于该窗口的数据到来,仍会进行计算,前面会对计算形式认真阐明
留神:该办法只针对于基于 event-time 的窗口,如果是基于 processing-time,并且指定了非零的 time 值则会抛出异样。
sideOutputLateData(outputTag: OutputTag[T])
def sideOutputLateData(outputTag: OutputTag[T]): WindowedStream[T, K, W] = {javaStream.sideOutputLateData(outputTag)
this
}
该办法是将迟来的数据保留至给定的 outputTag 参数,而 OutputTag 则是用来标记提早数据的一个对象。
DataStream.getSideOutput(tag: OutputTag[X])
通过 window 等操作返回的 DataStream 调用该办法,传入标记提早数据的对象来获取提早的数据。
对提早数据的了解
提早数据是指:
在以后窗口【假如窗口范畴为 10-15】曾经计算之后,又来了一个属于该窗口的数据【假如事件工夫为 13】,这时候仍会触发 Window 操作,这种数据就称为提早数据。
那么问题来了,延迟时间怎么计算呢?
假如窗口范畴为 10-15,延迟时间为 2s,则只有 WaterMark<15+2,并且属于该窗口,就能触发 Window 操作。而如果来了一条数据使得 WaterMark>=15+2,10-15 这个窗口就不能再触发 Window 操作,即便新来的数据的 Event Time 属于这个窗口工夫内。
Flink 关联 Hive 分区表
Flink 1.12 反对了 Hive 最新的分区作为时态表的性能,能够通过 SQL 的形式间接关联 Hive 分区表的最新分区,并且会主动监听最新的 Hive 分区,当监控到新的分区后,会主动地做维表数据的全量替换。通过这种形式,用户无需编写 DataStream 程序即可实现 Kafka 流实时关联最新的 Hive 分区实现数据打宽。
具体用法:
在 Sql Client 中注册 HiveCatalog:
vim conf/sql-client-defaults.yaml
catalogs:
- name: hive_catalog
type: hive
hive-conf-dir: /disk0/soft/hive-conf/ #该目录须要包 hive-site.xml 文件
创立 Kafka 表
CREATE TABLE hive_catalog.flink_db.kfk_fact_bill_master_12 (
master Row<reportDate String, groupID int, shopID int, shopName String, action int, orderStatus int, orderKey String, actionTime bigint, areaName String, paidAmount double, foodAmount double, startTime String, person double, orderSubType int, checkoutTime String>,
proctime as PROCTIME() -- PROCTIME 用来和 Hive 时态表关联) WITH (
'connector' = 'kafka',
'topic' = 'topic_name',
'format' = 'json',
'properties.bootstrap.servers' = 'host:9092',
'properties.group.id' = 'flinkTestGroup',
'scan.startup.mode' = 'timestamp',
'scan.startup.timestamp-millis' = '1607844694000'
);
Flink 事实表与 Hive 最新分区数据关联
dim_extend_shop_info 是 Hive 中已存在的表,所以咱们用 table hint 动静地开启维表参数。
CREATE VIEW IF NOT EXISTS hive_catalog.flink_db.view_fact_bill_master as
SELECT * FROM
(select t1.*, t2.group_id, t2.shop_id, t2.group_name, t2.shop_name, t2.brand_id,
ROW_NUMBER() OVER (PARTITION BY groupID, shopID, orderKey ORDER BY actionTime desc) rn
from hive_catalog.flink_db.kfk_fact_bill_master_12 t1
JOIN hive_catalog.flink_db.dim_extend_shop_info
/*+ OPTIONS('streaming-source.enable'='true',
'streaming-source.partition.include' = 'latest',
'streaming-source.monitor-interval' = '1 h',
'streaming-source.partition-order' = 'partition-name') */
FOR SYSTEM_TIME AS OF t1.proctime AS t2 -- 时态表
ON t1.groupID = t2.group_id and t1.shopID = t2.shop_id
where groupID in (202042)) t where t.rn = 1
参数解释:
- streaming-source.enable 开启流式读取 Hive 数据。
-
streaming-source.partition.include 有以下两个值:
- latest 属性: 只读取最新分区数据。
- all: 读取全量分区数据,默认值为 all,示意读所有分区,latest 只能用在 temporal join 中,用于读取最新分区作为维表,不能间接读取最新分区数据。
- streaming-source.monitor-interval 监听新分区生成的工夫、不宜过短、最短是 1 个小时,因为目前的实现是每个 task 都会查问 metastore,高频的查可能会对 metastore 产生过大的压力。须要留神的是,1.12.1 放开了这个限度,但仍倡议依照理论业务不要配个太短的 interval。
-
streaming-source.partition-order 分区策略,次要有以下 3 种,其中最为举荐的是 partition-name:
- partition-name 应用默认分区名称程序加载最新分区
- create-time 应用分区文件创建工夫程序
- partition-time 应用分区工夫程序