前言

Flink 是流式的、实时的 计算引擎

下面一句话就有两个概念,一个是流式,一个是实时。

流式:就是数据源源不断的流进来,也就是数据没有边界,然而咱们计算的时候必须在一个有边界的范畴内进行,所以这外面就有一个问题,边界怎么确定? 无非就两种形式,依据时间段或者数据量进行确定,依据时间段就是每隔多长时间就划分一个边界,依据数据量就是每来多少条数据划分一个边界,Flink 中就是这么划分边界的,本文会具体解说。

实时:就是数据发送过去之后立马就进行相干的计算,而后将后果输入。这里的计算有两种:

  • 一种是只有边界内的数据进行计算,这种好了解,比方统计每个用户最近五分钟内浏览的新闻数量,就能够取最近五分钟内的所有数据,而后依据每个用户分组,统计新闻的总数。
  • 另一种是边界内数据与内部数据进行关联计算,比方:统计最近五分钟内浏览新闻的用户都是来自哪些地区,这种就须要将五分钟内浏览新闻的用户信息与 hive 中的地区维表进行关联,而后在进行相干计算。

本篇文章所讲的 Flink 的内容就是围绕以上概念进行具体分析的!

Time与Window

Time

在Flink中,如果以时间段划分边界的话,那么工夫就是一个极其重要的字段。

Flink中的工夫有三种类型,如下图所示:

  • Event Time:是事件创立的工夫。它通常由事件中的工夫戳形容,例如采集的日志数据中,每一条日志都会记录本人的生成工夫,Flink通过工夫戳分配器拜访事件工夫戳。
  • Ingestion Time:是数据进入Flink的工夫。
  • Processing Time:是每一个执行基于工夫操作的算子的本地零碎工夫,与机器相干,默认的工夫属性就是Processing Time。

例如,一条日志进入Flink的工夫为2021-01-22 10:00:00.123,达到Window的零碎工夫为2021-01-22 10:00:01.234,日志的内容如下:
2021-01-06 18:37:15.624 INFO Fail over to rm2

对于业务来说,要统计1min内的故障日志个数,哪个工夫是最有意义的?—— eventTime,因为咱们要依据日志的生成工夫进行统计。

Window

Window,即窗口,咱们后面始终提到的边界就是这里的Window(窗口)。

官网解释:流式计算是一种被设计用于解决有限数据集的数据处理引擎,而有限数据集是指一种一直增长的实质上有限的数据集,而window是一种切割有限数据为无限块进行解决的伎俩

所以Window是有限数据流解决的外围,Window将一个有限的stream拆分成无限大小的”buckets”桶,咱们能够在这些桶上做计算操作

Window类型

本文刚开始提到,划分窗口就两种形式:

  1. 依据工夫进行截取(time-driven-window),比方每1分钟统计一次或每10分钟统计一次。
  2. 依据数据进行截取(data-driven-window),比方每5个数据统计一次或每50个数据统计一次。

对于TimeWindow(依据工夫划分窗口), 能够依据窗口实现原理的不同分成三类:滚动窗口(Tumbling Window)、滑动窗口(Sliding Window)和会话窗口(Session Window)

  1. 滚动窗口(Tumbling Windows)

将数据根据固定的窗口长度对数据进行切片。

特点:工夫对齐,窗口长度固定,没有重叠

滚动窗口分配器将每个元素调配到一个指定窗口大小的窗口中,滚动窗口有一个固定的大小,并且不会呈现重叠。

例如:如果你指定了一个5分钟大小的滚动窗口,窗口的创立如下图所示:

实用场景:适宜做BI统计等(做每个时间段的聚合计算)。

  1. 滑动窗口(Sliding Windows)

滑动窗口是固定窗口的更狭义的一种模式,滑动窗口由固定的窗口长度和滑动距离组成。

特点:工夫对齐,窗口长度固定,有重叠

滑动窗口分配器将元素调配到固定长度的窗口中,与滚动窗口相似,窗口的大小由窗口大小参数来配置,另一个窗口滑动参数管制滑动窗口开始的频率。因而,滑动窗口如果滑动参数小于窗口大小的话,窗口是能够重叠的,在这种状况下元素会被调配到多个窗口中。

例如,你有10分钟的窗口和5分钟的滑动,那么每个窗口中5分钟的窗口里蕴含着上个10分钟产生的数据,如下图所示:

实用场景:对最近一个时间段内的统计(求某接口最近5min的失败率来决定是否要报警)。

  1. 会话窗口(Session Windows)

由一系列事件组合一个指定工夫长度的timeout间隙组成,相似于web利用的session,也就是一段时间没有接管到新数据就会生成新的窗口。

特点:工夫无对齐

session窗口分配器通过session流动来对元素进行分组,session窗口跟滚动窗口和滑动窗口相比,不会有重叠和固定的开始工夫和完结工夫的状况,相同,当它在一个固定的工夫周期内不再收到元素,即非流动距离产生,那个这个窗口就会敞开。一个session窗口通过一个session距离来配置,这个session距离定义了非沉闷周期的长度,当这个非沉闷周期产生,那么以后的session将敞开并且后续的元素将被调配到新的session窗口中去。

Window API

TimeWindow

TimeWindow是将指定工夫范畴内的所有数据组成一个window,一次对一个window外面的所有数据进行计算(就是本文结尾说的对一个边界内的数据进行计算)。

咱们以 红绿灯路口通过的汽车数量 为例子:

红绿灯路口会有汽车通过,一共会有多少汽车通过,无奈计算。因为车流源源不断,计算没有边界。

所以咱们统计每15秒钟通过红路灯的汽车数量,如第一个15秒为2辆,第二个15秒为3辆,第三个15秒为1辆 ...

  • tumbling-time-window (无重叠数据)

咱们应用 Linux 中的 nc 命令模仿数据的发送方

1.开启发送端口,端口号为9999nc -lk 99992.发送内容(key 代表不同的路口,value 代表每次通过的车辆)一次发送一行,发送的工夫距离代表汽车通过的工夫距离9,39,29,74,92,61,52,35,75,4

Flink 进行采集数据并计算:

object Window {  def main(args: Array[String]): Unit = {    //TODO time-window    //1.创立运行环境    val env = StreamExecutionEnvironment.getExecutionEnvironment    //2.定义数据流起源    val text = env.socketTextStream("localhost", 9999)    //3.转换数据格式,text->CarWc    case class CarWc(sensorId: Int, carCnt: Int)    val ds1: DataStream[CarWc] = text.map {      line => {        val tokens = line.split(",")        CarWc(tokens(0).trim.toInt, tokens(1).trim.toInt)      }    }    //4.执行统计操作,每个sensorId一个tumbling窗口,窗口的大小为5秒    //也就是说,每5秒钟统计一次,在这过来的5秒钟内,各个路口通过红绿灯汽车的数量。    val ds2: DataStream[CarWc] = ds1      .keyBy("sensorId")      .timeWindow(Time.seconds(5))      .sum("carCnt")    //5.显示统计后果    ds2.print()    //6.触发流计算    env.execute(this.getClass.getName)  }}

咱们发送的数据并没有指定工夫字段,所以Flink应用的是默认的 Processing Time,也就是Flink零碎解决数据时的工夫。

  • sliding-time-window (有重叠数据)
//1.创立运行环境val env = StreamExecutionEnvironment.getExecutionEnvironment//2.定义数据流起源val text = env.socketTextStream("localhost", 9999)//3.转换数据格式,text->CarWccase class CarWc(sensorId: Int, carCnt: Int)val ds1: DataStream[CarWc] = text.map {  line => {    val tokens = line.split(",")    CarWc(tokens(0).trim.toInt, tokens(1).trim.toInt)  }}//4.执行统计操作,每个sensorId一个sliding窗口,窗口工夫10秒,滑动工夫5秒//也就是说,每5秒钟统计一次,在这过来的10秒钟内,各个路口通过红绿灯汽车的数量。val ds2: DataStream[CarWc] = ds1  .keyBy("sensorId")  .timeWindow(Time.seconds(10), Time.seconds(5))  .sum("carCnt")//5.显示统计后果ds2.print()//6.触发流计算env.execute(this.getClass.getName)

CountWindow

CountWindow依据窗口中雷同key元素的数量来触发执行,执行时只计算元素数量达到窗口大小的key对应的后果。

留神:CountWindow的window_size指的是雷同Key的元素的个数,不是输出的所有元素的总数

  • tumbling-count-window (无重叠数据)
//1.创立运行环境val env = StreamExecutionEnvironment.getExecutionEnvironment//2.定义数据流起源val text = env.socketTextStream("localhost", 9999)//3.转换数据格式,text->CarWccase class CarWc(sensorId: Int, carCnt: Int)val ds1: DataStream[CarWc] = text.map {  (f) => {    val tokens = f.split(",")    CarWc(tokens(0).trim.toInt, tokens(1).trim.toInt)  }}//4.执行统计操作,每个sensorId一个tumbling窗口,窗口的大小为5//依照key进行收集,对应的key呈现的次数达到5次作为一个后果val ds2: DataStream[CarWc] = ds1  .keyBy("sensorId")  .countWindow(5)  .sum("carCnt")//5.显示统计后果ds2.print()//6.触发流计算env.execute(this.getClass.getName)

  • sliding-count-window (有重叠数据)

同样也是窗口长度和滑动窗口的操作:窗口长度是5,滑动长度是3

//1.创立运行环境val env = StreamExecutionEnvironment.getExecutionEnvironment//2.定义数据流起源val text = env.socketTextStream("localhost", 9999)//3.转换数据格式,text->CarWccase class CarWc(sensorId: Int, carCnt: Int)val ds1: DataStream[CarWc] = text.map {  (f) => {    val tokens = f.split(",")    CarWc(tokens(0).trim.toInt, tokens(1).trim.toInt)  }}//4.执行统计操作,每个sensorId一个sliding窗口,窗口大小3条数据,窗口滑动为3条数据//也就是说,每个路口别离统计,收到对于它的3条音讯时统计在最近5条音讯中,各自路口通过的汽车数量val ds2: DataStream[CarWc] = ds1  .keyBy("sensorId")  .countWindow(5, 3)  .sum("carCnt")//5.显示统计后果ds2.print()//6.触发流计算env.execute(this.getClass.getName)

  • Window 总结
  1. flink反对两种划分窗口的形式(time和count)

    • 如果依据工夫划分窗口,那么它就是一个time-window
    • 如果依据数据划分窗口,那么它就是一个count-window
  2. flink反对窗口的两个重要属性(size和interval)

    • 如果size=interval,那么就会造成tumbling-window(无重叠数据)
    • 如果size>interval,那么就会造成sliding-window(有重叠数据)
    • 如果size<interval,那么这种窗口将会失落数据。比方每5秒钟,统计过来3秒的通过路口汽车的数据,将会漏掉2秒钟的数据。
  3. 通过组合能够得出四种根本窗口

    • time-tumbling-window 无重叠数据的工夫窗口,设置形式举例:timeWindow(Time.seconds(5))
    • time-sliding-window 有重叠数据的工夫窗口,设置形式举例:timeWindow(Time.seconds(5), Time.seconds(3))
    • count-tumbling-window无重叠数据的数量窗口,设置形式举例:countWindow(5)
    • count-sliding-window 有重叠数据的数量窗口,设置形式举例:countWindow(5,3)

Window Reduce

WindowedStream → DataStream:给window赋一个reduce性能的函数,并返回一个聚合的后果。

import org.apache.flink.streaming.api.scala.StreamExecutionEnvironmentimport org.apache.flink.api.scala._import org.apache.flink.streaming.api.windowing.time.Timeobject StreamWindowReduce {  def main(args: Array[String]): Unit = {    // 获取执行环境    val env = StreamExecutionEnvironment.getExecutionEnvironment    // 创立SocketSource    val stream = env.socketTextStream("node01", 9999)    // 对stream进行解决并按key聚合    val streamKeyBy = stream.map(item => (item, 1)).keyBy(0)    // 引入工夫窗口    val streamWindow = streamKeyBy.timeWindow(Time.seconds(5))    // 执行聚合操作    val streamReduce = streamWindow.reduce(      (item1, item2) => (item1._1, item1._2 + item2._2)    )    // 将聚合数据写入文件    streamReduce.print()    // 执行程序    env.execute("TumblingWindow")  }}

Window Apply

apply办法能够进行一些自定义解决,通过匿名外部类的办法来实现。当有一些简单计算时应用。

用法

  1. 实现一个 WindowFunction 类
  2. 指定该类的泛型为 [输出数据类型, 输入数据类型, keyBy中应用分组字段的类型, 窗口类型]

示例:应用apply办法来实现单词统计

步骤:

  1. 获取流解决运行环境
  2. 构建socket流数据源,并指定IP地址和端口号
  3. 对接管到的数据转换成单词元组
  4. 应用 keyBy 进行分流(分组)
  5. 应用 timeWinodw 指定窗口的长度(每3秒计算一次)
  6. 实现一个WindowFunction匿名外部类

    • apply办法中实现聚合计算
    • 应用Collector.collect收集数据

外围代码如下:

    //1. 获取流解决运行环境    val env = StreamExecutionEnvironment.getExecutionEnvironment    //2. 构建socket流数据源,并指定IP地址和端口号    val textDataStream = env.socketTextStream("node01", 9999).flatMap(_.split(" "))    //3. 对接管到的数据转换成单词元组    val wordDataStream = textDataStream.map(_->1)    //4. 应用 keyBy 进行分流(分组)    val groupedDataStream: KeyedStream[(String, Int), String] = wordDataStream.keyBy(_._1)    //5. 应用 timeWinodw 指定窗口的长度(每3秒计算一次)    val windowDataStream: WindowedStream[(String, Int), String, TimeWindow] = groupedDataStream.timeWindow(Time.seconds(3))    //6. 实现一个WindowFunction匿名外部类    val reduceDatStream: DataStream[(String, Int)] = windowDataStream.apply(new RichWindowFunction[(String, Int), (String, Int), String, TimeWindow] {      //在apply办法中实现数据的聚合      override def apply(key: String, window: TimeWindow, input: Iterable[(String, Int)], out: Collector[(String, Int)]): Unit = {        println("hello world")        val tuple = input.reduce((t1, t2) => {          (t1._1, t1._2 + t2._2)        })        //将要返回的数据收集起来,发送回去        out.collect(tuple)      }    })    reduceDatStream.print()    env.execute()

Window Fold

WindowedStream → DataStream:给窗口赋一个fold性能的函数,并返回一个fold后的后果。

import org.apache.flink.streaming.api.scala.StreamExecutionEnvironmentimport org.apache.flink.api.scala._import org.apache.flink.streaming.api.windowing.time.Timeobject StreamWindowFold {  def main(args: Array[String]): Unit = {    // 获取执行环境    val env = StreamExecutionEnvironment.getExecutionEnvironment    // 创立SocketSource    val stream = env.socketTextStream("node01", 9999,'\n',3)    // 对stream进行解决并按key聚合    val streamKeyBy = stream.map(item => (item, 1)).keyBy(0)    // 引入滚动窗口    val streamWindow = streamKeyBy.timeWindow(Time.seconds(5))    // 执行fold操作    val streamFold = streamWindow.fold(100){      (begin, item) =>        begin + item._2    }    // 将聚合数据写入文件    streamFold.print()    // 执行程序    env.execute("TumblingWindow")  }}

Aggregation on Window

WindowedStream → DataStream:对一个window内的所有元素做聚合操作。min和 minBy的区别是min返回的是最小值,而minBy返回的是蕴含最小值字段的元素(同样的原理实用于 max 和 maxBy)。

import org.apache.flink.streaming.api.scala.StreamExecutionEnvironmentimport org.apache.flink.streaming.api.windowing.time.Timeimport org.apache.flink.api.scala._object StreamWindowAggregation {  def main(args: Array[String]): Unit = {    // 获取执行环境    val env = StreamExecutionEnvironment.getExecutionEnvironment    // 创立SocketSource    val stream = env.socketTextStream("node01", 9999)    // 对stream进行解决并按key聚合    val streamKeyBy = stream.map(item => (item.split(" ")(0), item.split(" ")(1))).keyBy(0)    // 引入滚动窗口    val streamWindow = streamKeyBy.timeWindow(Time.seconds(5))    // 执行聚合操作    val streamMax = streamWindow.max(1)    // 将聚合数据写入文件    streamMax.print()    // 执行程序    env.execute("TumblingWindow")  }}

EventTime与Window

EventTime的引入

  1. 与事实世界中的工夫是不统一的,在flink中被划分为事件工夫,提取工夫,解决工夫三种。
  2. 如果以EventTime为基准来定义工夫窗口那将造成EventTimeWindow,要求音讯自身就应该携带EventTime
  3. 如果以IngesingtTime为基准来定义工夫窗口那将造成IngestingTimeWindow,以source的systemTime为准。
  4. 如果以ProcessingTime基准来定义工夫窗口那将造成ProcessingTimeWindow,以operator的systemTime为准。

在Flink的流式解决中,绝大部分的业务都会应用eventTime,个别只在eventTime无奈应用时,才会被迫应用ProcessingTime或者IngestionTime。

如果要应用EventTime,那么须要引入EventTime的工夫属性,引入形式如下所示:

val env = StreamExecutionEnvironment.getExecutionEnvironment// 从调用时刻开始给env创立的每一个stream追加工夫特色env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

Watermark

引入

咱们晓得,流解决从事件产生,到流经 source,再到 operator,两头是有一个过程和工夫的,尽管大部分状况下,流到 operator 的数据都是依照事件产生的工夫程序来的,然而也不排除因为网络、背压等起因,导致乱序的产生,所谓乱序,就是指 Flink 接管到的事件的先后顺序不是严格依照事件的 Event Time 顺序排列的,所以 Flink 最后设计的时候,就思考到了网络提早,网络乱序等问题,所以提出了一个抽象概念:水印(WaterMark);

如上图所示,就呈现一个问题,一旦呈现乱序,如果只依据 EventTime 决定 Window 的运行,咱们不能明确数据是否全副到位,但又不能无限期的等上来,此时必须要有个机制来保障一个特定的工夫后,必须触发 Window 去进行计算了,这个特地的机制,就是 Watermark。

Watermark 是用于解决乱序事件的,而正确的解决乱序事件,通常用 Watermark 机制联合 Window 来实现

数据流中的 Watermark 用于示意 timestamp 小于 Watermark 的数据,都曾经达到了,因而,Window 的执行也是由 Watermark 触发的。

Watermark 能够了解成一个提早触发机制,咱们能够设置 Watermark 的延时时长 t,每次零碎会校验曾经达到的数据中最大的 maxEventTime,而后认定 EventTime 小于 maxEventTime - t 的所有数据都曾经达到,如果有窗口的进行工夫等于 maxEventTime – t,那么这个窗口被触发执行

有序流的Watermarker如下图所示:(Watermark设置为0)

乱序流的Watermarker如下图所示:(Watermark设置为2)

当 Flink 接管到每一条数据时,都会产生一条 Watermark,这条 Watermark 就等于以后所有达到数据中的 maxEventTime - 提早时长,也就是说,Watermark 是由数据携带的,一旦数据携带的 Watermark 比以后未触发的窗口的进行工夫要晚,那么就会触发相应窗口的执行。因为 Watermark 是由数据携带的,因而,如果运行过程中无奈获取新的数据,那么没有被触发的窗口将永远都不被触发

上图中,咱们设置的容许最大提早达到工夫为2s,所以工夫戳为7s的事件对应的Watermark是5s,工夫戳为12s的事件的Watermark是10s,如果咱们的窗口1是1s~5s,窗口2是6s~10s,那么工夫戳为7s的事件达到时的Watermarker恰好触发窗口1,工夫戳为12s的事件达到时的Watermark恰好触发窗口2。

Flink对于早退数据的解决

waterMark和Window机制解决了流式数据的乱序问题,对于因为提早而程序有误的数据,能够依据eventTime进行业务解决,于提早的数据Flink也有本人的解决办法,次要的方法是给定一个容许提早的工夫,在该工夫范畴内仍能够承受解决提早数据。

设置容许提早的工夫是通过 allowedLateness(lateness: Time) 设置

保留提早数据则是通过 sideOutputLateData(outputTag: OutputTag[T]) 保留

获取提早数据是通过 DataStream.getSideOutput(tag: OutputTag[X]) 获取

具体的用法如下:

allowedLateness(lateness: Time)

def allowedLateness(lateness: Time): WindowedStream[T, K, W] = {  javaStream.allowedLateness(lateness)  this}

该办法传入一个Time值,设置容许数据早退的工夫,这个工夫和 WaterMark 中的工夫概念不同。再来回顾一下:

WaterMark=数据的事件工夫-容许乱序工夫值

随着新数据的到来,waterMark的值会更新为最新数据事件工夫-容许乱序工夫值,然而如果这时候来了一条历史数据,waterMark值则不会更新。总的来说,waterMark是为了能接管到尽可能多的乱序数据。

那这里的Time值,次要是为了期待早退的数据,在肯定工夫范畴内,如果属于该窗口的数据到来,仍会进行计算,前面会对计算形式认真阐明

留神:该办法只针对于基于event-time的窗口,如果是基于processing-time,并且指定了非零的time值则会抛出异样。

sideOutputLateData(outputTag: OutputTag[T])

def sideOutputLateData(outputTag: OutputTag[T]): WindowedStream[T, K, W] = {  javaStream.sideOutputLateData(outputTag)  this}

该办法是将迟来的数据保留至给定的outputTag参数,而OutputTag则是用来标记提早数据的一个对象。

DataStream.getSideOutput(tag: OutputTag[X])

通过window等操作返回的DataStream调用该办法,传入标记提早数据的对象来获取提早的数据。

对提早数据的了解

提早数据是指:

在以后窗口【假如窗口范畴为10-15】曾经计算之后,又来了一个属于该窗口的数据【假如事件工夫为13】,这时候仍会触发 Window 操作,这种数据就称为提早数据。

那么问题来了,延迟时间怎么计算呢?

假如窗口范畴为10-15,延迟时间为2s,则只有 WaterMark<15+2,并且属于该窗口,就能触发 Window 操作。而如果来了一条数据使得 WaterMark>=15+2,10-15这个窗口就不能再触发 Window 操作,即便新来的数据的 Event Time 属于这个窗口工夫内 。

Flink 关联 Hive 分区表

Flink 1.12 反对了 Hive 最新的分区作为时态表的性能,能够通过 SQL 的形式间接关联 Hive 分区表的最新分区,并且会主动监听最新的 Hive 分区,当监控到新的分区后,会主动地做维表数据的全量替换。通过这种形式,用户无需编写 DataStream 程序即可实现 Kafka 流实时关联最新的 Hive 分区实现数据打宽

具体用法:

在 Sql Client 中注册 HiveCatalog:

vim conf/sql-client-defaults.yaml catalogs:   - name: hive_catalog     type: hive     hive-conf-dir: /disk0/soft/hive-conf/ #该目录须要包hive-site.xml文件 

创立 Kafka 表

CREATE TABLE hive_catalog.flink_db.kfk_fact_bill_master_12 (      master Row<reportDate String, groupID int, shopID int, shopName String, action int, orderStatus int, orderKey String, actionTime bigint, areaName String, paidAmount double, foodAmount double, startTime String, person double, orderSubType int, checkoutTime String>,  proctime as PROCTIME()  -- PROCTIME用来和Hive时态表关联  ) WITH (   'connector' = 'kafka',   'topic' = 'topic_name',   'format' = 'json',   'properties.bootstrap.servers' = 'host:9092',   'properties.group.id' = 'flinkTestGroup',   'scan.startup.mode' = 'timestamp',   'scan.startup.timestamp-millis' = '1607844694000'  ); 

Flink 事实表与 Hive 最新分区数据关联

dim_extend_shop_info 是 Hive 中已存在的表,所以咱们用 table hint 动静地开启维表参数。

CREATE VIEW IF NOT EXISTS hive_catalog.flink_db.view_fact_bill_master as  SELECT * FROM   (select t1.*, t2.group_id, t2.shop_id, t2.group_name, t2.shop_name, t2.brand_id,        ROW_NUMBER() OVER (PARTITION BY groupID, shopID, orderKey ORDER BY actionTime desc) rn      from hive_catalog.flink_db.kfk_fact_bill_master_12 t1         JOIN hive_catalog.flink_db.dim_extend_shop_info     /*+ OPTIONS('streaming-source.enable'='true',       'streaming-source.partition.include' = 'latest',       'streaming-source.monitor-interval' = '1 h',     'streaming-source.partition-order' = 'partition-name') */    FOR SYSTEM_TIME AS OF t1.proctime AS t2 --时态表      ON t1.groupID = t2.group_id and t1.shopID = t2.shop_id      where groupID in (202042)) t  where t.rn = 1 

参数解释:

  • streaming-source.enable 开启流式读取 Hive 数据。
  • streaming-source.partition.include 有以下两个值:

    1. latest 属性: 只读取最新分区数据。
    2. all: 读取全量分区数据 ,默认值为 all,示意读所有分区,latest 只能用在 temporal join 中,用于读取最新分区作为维表,不能间接读取最新分区数据。
  • streaming-source.monitor-interval 监听新分区生成的工夫、不宜过短 、最短是1 个小时,因为目前的实现是每个 task 都会查问 metastore,高频的查可能会对metastore 产生过大的压力。须要留神的是,1.12.1 放开了这个限度,但仍倡议依照理论业务不要配个太短的 interval。
  • streaming-source.partition-order 分区策略,次要有以下 3 种,其中最为举荐的是 partition-name

    1. partition-name 应用默认分区名称程序加载最新分区
    2. create-time 应用分区文件创建工夫程序
    3. partition-time 应用分区工夫程序

搜寻公众号:五分钟学大数据,获取大数据学习秘籍,深刻钻研大数据技术!