前言
Flink 是流式的、实时的 计算引擎
下面一句话就有两个概念,一个是流式,一个是实时。
流式:就是数据源源不断的流进来,也就是数据没有边界,然而咱们计算的时候必须在一个有边界的范畴内进行,所以这外面就有一个问题,边界怎么确定? 无非就两种形式,依据时间段或者数据量进行确定,依据时间段就是每隔多长时间就划分一个边界,依据数据量就是每来多少条数据划分一个边界,Flink 中就是这么划分边界的,本文会具体解说。
实时:就是数据发送过去之后立马就进行相干的计算,而后将后果输入。这里的计算有两种:
- 一种是只有边界内的数据进行计算,这种好了解,比方统计每个用户最近五分钟内浏览的新闻数量,就能够取最近五分钟内的所有数据,而后依据每个用户分组,统计新闻的总数。
- 另一种是边界内数据与内部数据进行关联计算,比方:统计最近五分钟内浏览新闻的用户都是来自哪些地区,这种就须要将五分钟内浏览新闻的用户信息与 hive 中的地区维表进行关联,而后在进行相干计算。
本篇文章所讲的 Flink 的内容就是围绕以上概念进行具体分析的!
Time与Window
Time
在Flink中,如果以时间段划分边界的话,那么工夫就是一个极其重要的字段。
Flink中的工夫有三种类型,如下图所示:
- Event Time:是事件创立的工夫。它通常由事件中的工夫戳形容,例如采集的日志数据中,每一条日志都会记录本人的生成工夫,Flink通过工夫戳分配器拜访事件工夫戳。
- Ingestion Time:是数据进入Flink的工夫。
- Processing Time:是每一个执行基于工夫操作的算子的本地零碎工夫,与机器相干,默认的工夫属性就是Processing Time。
例如,一条日志进入Flink的工夫为2021-01-22 10:00:00.123,达到Window的零碎工夫为2021-01-22 10:00:01.234,日志的内容如下:
2021-01-06 18:37:15.624 INFO Fail over to rm2
对于业务来说,要统计1min内的故障日志个数,哪个工夫是最有意义的?—— eventTime,因为咱们要依据日志的生成工夫进行统计。
Window
Window,即窗口,咱们后面始终提到的边界就是这里的Window(窗口)。
官网解释:流式计算是一种被设计用于解决有限数据集的数据处理引擎,而有限数据集是指一种一直增长的实质上有限的数据集,而window是一种切割有限数据为无限块进行解决的伎俩。
所以Window是有限数据流解决的外围,Window将一个有限的stream拆分成无限大小的”buckets”桶,咱们能够在这些桶上做计算操作。
Window类型
本文刚开始提到,划分窗口就两种形式:
- 依据工夫进行截取(time-driven-window),比方每1分钟统计一次或每10分钟统计一次。
- 依据数据进行截取(data-driven-window),比方每5个数据统计一次或每50个数据统计一次。
对于TimeWindow(依据工夫划分窗口), 能够依据窗口实现原理的不同分成三类:滚动窗口(Tumbling Window)、滑动窗口(Sliding Window)和会话窗口(Session Window)。
- 滚动窗口(Tumbling Windows)
将数据根据固定的窗口长度对数据进行切片。
特点:工夫对齐,窗口长度固定,没有重叠。
滚动窗口分配器将每个元素调配到一个指定窗口大小的窗口中,滚动窗口有一个固定的大小,并且不会呈现重叠。
例如:如果你指定了一个5分钟大小的滚动窗口,窗口的创立如下图所示:
实用场景:适宜做BI统计等(做每个时间段的聚合计算)。
- 滑动窗口(Sliding Windows)
滑动窗口是固定窗口的更狭义的一种模式,滑动窗口由固定的窗口长度和滑动距离组成。
特点:工夫对齐,窗口长度固定,有重叠。
滑动窗口分配器将元素调配到固定长度的窗口中,与滚动窗口相似,窗口的大小由窗口大小参数来配置,另一个窗口滑动参数管制滑动窗口开始的频率。因而,滑动窗口如果滑动参数小于窗口大小的话,窗口是能够重叠的,在这种状况下元素会被调配到多个窗口中。
例如,你有10分钟的窗口和5分钟的滑动,那么每个窗口中5分钟的窗口里蕴含着上个10分钟产生的数据,如下图所示:
实用场景:对最近一个时间段内的统计(求某接口最近5min的失败率来决定是否要报警)。
- 会话窗口(Session Windows)
由一系列事件组合一个指定工夫长度的timeout间隙组成,相似于web利用的session,也就是一段时间没有接管到新数据就会生成新的窗口。
特点:工夫无对齐。
session窗口分配器通过session流动来对元素进行分组,session窗口跟滚动窗口和滑动窗口相比,不会有重叠和固定的开始工夫和完结工夫的状况,相同,当它在一个固定的工夫周期内不再收到元素,即非流动距离产生,那个这个窗口就会敞开。一个session窗口通过一个session距离来配置,这个session距离定义了非沉闷周期的长度,当这个非沉闷周期产生,那么以后的session将敞开并且后续的元素将被调配到新的session窗口中去。
Window API
TimeWindow
TimeWindow是将指定工夫范畴内的所有数据组成一个window,一次对一个window外面的所有数据进行计算(就是本文结尾说的对一个边界内的数据进行计算)。
咱们以 红绿灯路口通过的汽车数量 为例子:
红绿灯路口会有汽车通过,一共会有多少汽车通过,无奈计算。因为车流源源不断,计算没有边界。
所以咱们统计每15秒钟通过红路灯的汽车数量,如第一个15秒为2辆,第二个15秒为3辆,第三个15秒为1辆 ...
- tumbling-time-window (无重叠数据)
咱们应用 Linux 中的 nc 命令模仿数据的发送方
1.开启发送端口,端口号为9999nc -lk 99992.发送内容(key 代表不同的路口,value 代表每次通过的车辆)一次发送一行,发送的工夫距离代表汽车通过的工夫距离9,39,29,74,92,61,52,35,75,4
Flink 进行采集数据并计算:
object Window { def main(args: Array[String]): Unit = { //TODO time-window //1.创立运行环境 val env = StreamExecutionEnvironment.getExecutionEnvironment //2.定义数据流起源 val text = env.socketTextStream("localhost", 9999) //3.转换数据格式,text->CarWc case class CarWc(sensorId: Int, carCnt: Int) val ds1: DataStream[CarWc] = text.map { line => { val tokens = line.split(",") CarWc(tokens(0).trim.toInt, tokens(1).trim.toInt) } } //4.执行统计操作,每个sensorId一个tumbling窗口,窗口的大小为5秒 //也就是说,每5秒钟统计一次,在这过来的5秒钟内,各个路口通过红绿灯汽车的数量。 val ds2: DataStream[CarWc] = ds1 .keyBy("sensorId") .timeWindow(Time.seconds(5)) .sum("carCnt") //5.显示统计后果 ds2.print() //6.触发流计算 env.execute(this.getClass.getName) }}
咱们发送的数据并没有指定工夫字段,所以Flink应用的是默认的 Processing Time,也就是Flink零碎解决数据时的工夫。
- sliding-time-window (有重叠数据)
//1.创立运行环境val env = StreamExecutionEnvironment.getExecutionEnvironment//2.定义数据流起源val text = env.socketTextStream("localhost", 9999)//3.转换数据格式,text->CarWccase class CarWc(sensorId: Int, carCnt: Int)val ds1: DataStream[CarWc] = text.map { line => { val tokens = line.split(",") CarWc(tokens(0).trim.toInt, tokens(1).trim.toInt) }}//4.执行统计操作,每个sensorId一个sliding窗口,窗口工夫10秒,滑动工夫5秒//也就是说,每5秒钟统计一次,在这过来的10秒钟内,各个路口通过红绿灯汽车的数量。val ds2: DataStream[CarWc] = ds1 .keyBy("sensorId") .timeWindow(Time.seconds(10), Time.seconds(5)) .sum("carCnt")//5.显示统计后果ds2.print()//6.触发流计算env.execute(this.getClass.getName)
CountWindow
CountWindow依据窗口中雷同key元素的数量来触发执行,执行时只计算元素数量达到窗口大小的key对应的后果。
留神:CountWindow的window_size指的是雷同Key的元素的个数,不是输出的所有元素的总数。
- tumbling-count-window (无重叠数据)
//1.创立运行环境val env = StreamExecutionEnvironment.getExecutionEnvironment//2.定义数据流起源val text = env.socketTextStream("localhost", 9999)//3.转换数据格式,text->CarWccase class CarWc(sensorId: Int, carCnt: Int)val ds1: DataStream[CarWc] = text.map { (f) => { val tokens = f.split(",") CarWc(tokens(0).trim.toInt, tokens(1).trim.toInt) }}//4.执行统计操作,每个sensorId一个tumbling窗口,窗口的大小为5//依照key进行收集,对应的key呈现的次数达到5次作为一个后果val ds2: DataStream[CarWc] = ds1 .keyBy("sensorId") .countWindow(5) .sum("carCnt")//5.显示统计后果ds2.print()//6.触发流计算env.execute(this.getClass.getName)
- sliding-count-window (有重叠数据)
同样也是窗口长度和滑动窗口的操作:窗口长度是5,滑动长度是3
//1.创立运行环境val env = StreamExecutionEnvironment.getExecutionEnvironment//2.定义数据流起源val text = env.socketTextStream("localhost", 9999)//3.转换数据格式,text->CarWccase class CarWc(sensorId: Int, carCnt: Int)val ds1: DataStream[CarWc] = text.map { (f) => { val tokens = f.split(",") CarWc(tokens(0).trim.toInt, tokens(1).trim.toInt) }}//4.执行统计操作,每个sensorId一个sliding窗口,窗口大小3条数据,窗口滑动为3条数据//也就是说,每个路口别离统计,收到对于它的3条音讯时统计在最近5条音讯中,各自路口通过的汽车数量val ds2: DataStream[CarWc] = ds1 .keyBy("sensorId") .countWindow(5, 3) .sum("carCnt")//5.显示统计后果ds2.print()//6.触发流计算env.execute(this.getClass.getName)
- Window 总结
flink反对两种划分窗口的形式(time和count)
- 如果依据工夫划分窗口,那么它就是一个time-window
- 如果依据数据划分窗口,那么它就是一个count-window
flink反对窗口的两个重要属性(size和interval)
- 如果size=interval,那么就会造成tumbling-window(无重叠数据)
- 如果size>interval,那么就会造成sliding-window(有重叠数据)
- 如果size<interval,那么这种窗口将会失落数据。比方每5秒钟,统计过来3秒的通过路口汽车的数据,将会漏掉2秒钟的数据。
通过组合能够得出四种根本窗口
- time-tumbling-window 无重叠数据的工夫窗口,设置形式举例:timeWindow(Time.seconds(5))
- time-sliding-window 有重叠数据的工夫窗口,设置形式举例:timeWindow(Time.seconds(5), Time.seconds(3))
- count-tumbling-window无重叠数据的数量窗口,设置形式举例:countWindow(5)
- count-sliding-window 有重叠数据的数量窗口,设置形式举例:countWindow(5,3)
Window Reduce
WindowedStream → DataStream:给window赋一个reduce性能的函数,并返回一个聚合的后果。
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironmentimport org.apache.flink.api.scala._import org.apache.flink.streaming.api.windowing.time.Timeobject StreamWindowReduce { def main(args: Array[String]): Unit = { // 获取执行环境 val env = StreamExecutionEnvironment.getExecutionEnvironment // 创立SocketSource val stream = env.socketTextStream("node01", 9999) // 对stream进行解决并按key聚合 val streamKeyBy = stream.map(item => (item, 1)).keyBy(0) // 引入工夫窗口 val streamWindow = streamKeyBy.timeWindow(Time.seconds(5)) // 执行聚合操作 val streamReduce = streamWindow.reduce( (item1, item2) => (item1._1, item1._2 + item2._2) ) // 将聚合数据写入文件 streamReduce.print() // 执行程序 env.execute("TumblingWindow") }}
Window Apply
apply办法能够进行一些自定义解决,通过匿名外部类的办法来实现。当有一些简单计算时应用。
用法
- 实现一个 WindowFunction 类
- 指定该类的泛型为 [输出数据类型, 输入数据类型, keyBy中应用分组字段的类型, 窗口类型]
示例:应用apply办法来实现单词统计
步骤:
- 获取流解决运行环境
- 构建socket流数据源,并指定IP地址和端口号
- 对接管到的数据转换成单词元组
- 应用 keyBy 进行分流(分组)
- 应用 timeWinodw 指定窗口的长度(每3秒计算一次)
实现一个WindowFunction匿名外部类
- apply办法中实现聚合计算
- 应用Collector.collect收集数据
外围代码如下:
//1. 获取流解决运行环境 val env = StreamExecutionEnvironment.getExecutionEnvironment //2. 构建socket流数据源,并指定IP地址和端口号 val textDataStream = env.socketTextStream("node01", 9999).flatMap(_.split(" ")) //3. 对接管到的数据转换成单词元组 val wordDataStream = textDataStream.map(_->1) //4. 应用 keyBy 进行分流(分组) val groupedDataStream: KeyedStream[(String, Int), String] = wordDataStream.keyBy(_._1) //5. 应用 timeWinodw 指定窗口的长度(每3秒计算一次) val windowDataStream: WindowedStream[(String, Int), String, TimeWindow] = groupedDataStream.timeWindow(Time.seconds(3)) //6. 实现一个WindowFunction匿名外部类 val reduceDatStream: DataStream[(String, Int)] = windowDataStream.apply(new RichWindowFunction[(String, Int), (String, Int), String, TimeWindow] { //在apply办法中实现数据的聚合 override def apply(key: String, window: TimeWindow, input: Iterable[(String, Int)], out: Collector[(String, Int)]): Unit = { println("hello world") val tuple = input.reduce((t1, t2) => { (t1._1, t1._2 + t2._2) }) //将要返回的数据收集起来,发送回去 out.collect(tuple) } }) reduceDatStream.print() env.execute()
Window Fold
WindowedStream → DataStream:给窗口赋一个fold性能的函数,并返回一个fold后的后果。
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironmentimport org.apache.flink.api.scala._import org.apache.flink.streaming.api.windowing.time.Timeobject StreamWindowFold { def main(args: Array[String]): Unit = { // 获取执行环境 val env = StreamExecutionEnvironment.getExecutionEnvironment // 创立SocketSource val stream = env.socketTextStream("node01", 9999,'\n',3) // 对stream进行解决并按key聚合 val streamKeyBy = stream.map(item => (item, 1)).keyBy(0) // 引入滚动窗口 val streamWindow = streamKeyBy.timeWindow(Time.seconds(5)) // 执行fold操作 val streamFold = streamWindow.fold(100){ (begin, item) => begin + item._2 } // 将聚合数据写入文件 streamFold.print() // 执行程序 env.execute("TumblingWindow") }}
Aggregation on Window
WindowedStream → DataStream:对一个window内的所有元素做聚合操作。min和 minBy的区别是min返回的是最小值,而minBy返回的是蕴含最小值字段的元素(同样的原理实用于 max 和 maxBy)。
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironmentimport org.apache.flink.streaming.api.windowing.time.Timeimport org.apache.flink.api.scala._object StreamWindowAggregation { def main(args: Array[String]): Unit = { // 获取执行环境 val env = StreamExecutionEnvironment.getExecutionEnvironment // 创立SocketSource val stream = env.socketTextStream("node01", 9999) // 对stream进行解决并按key聚合 val streamKeyBy = stream.map(item => (item.split(" ")(0), item.split(" ")(1))).keyBy(0) // 引入滚动窗口 val streamWindow = streamKeyBy.timeWindow(Time.seconds(5)) // 执行聚合操作 val streamMax = streamWindow.max(1) // 将聚合数据写入文件 streamMax.print() // 执行程序 env.execute("TumblingWindow") }}
EventTime与Window
EventTime的引入
- 与事实世界中的工夫是不统一的,在flink中被划分为事件工夫,提取工夫,解决工夫三种。
- 如果以EventTime为基准来定义工夫窗口那将造成EventTimeWindow,要求音讯自身就应该携带EventTime
- 如果以IngesingtTime为基准来定义工夫窗口那将造成IngestingTimeWindow,以source的systemTime为准。
- 如果以ProcessingTime基准来定义工夫窗口那将造成ProcessingTimeWindow,以operator的systemTime为准。
在Flink的流式解决中,绝大部分的业务都会应用eventTime,个别只在eventTime无奈应用时,才会被迫应用ProcessingTime或者IngestionTime。
如果要应用EventTime,那么须要引入EventTime的工夫属性,引入形式如下所示:
val env = StreamExecutionEnvironment.getExecutionEnvironment// 从调用时刻开始给env创立的每一个stream追加工夫特色env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
Watermark
引入
咱们晓得,流解决从事件产生,到流经 source,再到 operator,两头是有一个过程和工夫的,尽管大部分状况下,流到 operator 的数据都是依照事件产生的工夫程序来的,然而也不排除因为网络、背压等起因,导致乱序的产生,所谓乱序,就是指 Flink 接管到的事件的先后顺序不是严格依照事件的 Event Time 顺序排列的,所以 Flink 最后设计的时候,就思考到了网络提早,网络乱序等问题,所以提出了一个抽象概念:水印(WaterMark);
如上图所示,就呈现一个问题,一旦呈现乱序,如果只依据 EventTime 决定 Window 的运行,咱们不能明确数据是否全副到位,但又不能无限期的等上来,此时必须要有个机制来保障一个特定的工夫后,必须触发 Window 去进行计算了,这个特地的机制,就是 Watermark。
Watermark 是用于解决乱序事件的,而正确的解决乱序事件,通常用 Watermark 机制联合 Window 来实现。
数据流中的 Watermark 用于示意 timestamp 小于 Watermark 的数据,都曾经达到了,因而,Window 的执行也是由 Watermark 触发的。
Watermark 能够了解成一个提早触发机制,咱们能够设置 Watermark 的延时时长 t,每次零碎会校验曾经达到的数据中最大的 maxEventTime,而后认定 EventTime 小于 maxEventTime - t 的所有数据都曾经达到,如果有窗口的进行工夫等于 maxEventTime – t,那么这个窗口被触发执行。
有序流的Watermarker如下图所示:(Watermark设置为0)
乱序流的Watermarker如下图所示:(Watermark设置为2)
当 Flink 接管到每一条数据时,都会产生一条 Watermark,这条 Watermark 就等于以后所有达到数据中的 maxEventTime - 提早时长,也就是说,Watermark 是由数据携带的,一旦数据携带的 Watermark 比以后未触发的窗口的进行工夫要晚,那么就会触发相应窗口的执行。因为 Watermark 是由数据携带的,因而,如果运行过程中无奈获取新的数据,那么没有被触发的窗口将永远都不被触发。
上图中,咱们设置的容许最大提早达到工夫为2s,所以工夫戳为7s的事件对应的Watermark是5s,工夫戳为12s的事件的Watermark是10s,如果咱们的窗口1是1s~5s,窗口2是6s~10s,那么工夫戳为7s的事件达到时的Watermarker恰好触发窗口1,工夫戳为12s的事件达到时的Watermark恰好触发窗口2。
Flink对于早退数据的解决
waterMark和Window机制解决了流式数据的乱序问题,对于因为提早而程序有误的数据,能够依据eventTime进行业务解决,于提早的数据Flink也有本人的解决办法,次要的方法是给定一个容许提早的工夫,在该工夫范畴内仍能够承受解决提早数据。
设置容许提早的工夫是通过 allowedLateness(lateness: Time) 设置
保留提早数据则是通过 sideOutputLateData(outputTag: OutputTag[T]) 保留
获取提早数据是通过 DataStream.getSideOutput(tag: OutputTag[X]) 获取
具体的用法如下:
allowedLateness(lateness: Time)
def allowedLateness(lateness: Time): WindowedStream[T, K, W] = { javaStream.allowedLateness(lateness) this}
该办法传入一个Time值,设置容许数据早退的工夫,这个工夫和 WaterMark 中的工夫概念不同。再来回顾一下:
WaterMark=数据的事件工夫-容许乱序工夫值
随着新数据的到来,waterMark的值会更新为最新数据事件工夫-容许乱序工夫值,然而如果这时候来了一条历史数据,waterMark值则不会更新。总的来说,waterMark是为了能接管到尽可能多的乱序数据。
那这里的Time值,次要是为了期待早退的数据,在肯定工夫范畴内,如果属于该窗口的数据到来,仍会进行计算,前面会对计算形式认真阐明
留神:该办法只针对于基于event-time的窗口,如果是基于processing-time,并且指定了非零的time值则会抛出异样。
sideOutputLateData(outputTag: OutputTag[T])
def sideOutputLateData(outputTag: OutputTag[T]): WindowedStream[T, K, W] = { javaStream.sideOutputLateData(outputTag) this}
该办法是将迟来的数据保留至给定的outputTag参数,而OutputTag则是用来标记提早数据的一个对象。
DataStream.getSideOutput(tag: OutputTag[X])
通过window等操作返回的DataStream调用该办法,传入标记提早数据的对象来获取提早的数据。
对提早数据的了解
提早数据是指:
在以后窗口【假如窗口范畴为10-15】曾经计算之后,又来了一个属于该窗口的数据【假如事件工夫为13】,这时候仍会触发 Window 操作,这种数据就称为提早数据。
那么问题来了,延迟时间怎么计算呢?
假如窗口范畴为10-15,延迟时间为2s,则只有 WaterMark<15+2,并且属于该窗口,就能触发 Window 操作。而如果来了一条数据使得 WaterMark>=15+2,10-15这个窗口就不能再触发 Window 操作,即便新来的数据的 Event Time 属于这个窗口工夫内 。
Flink 关联 Hive 分区表
Flink 1.12 反对了 Hive 最新的分区作为时态表的性能,能够通过 SQL 的形式间接关联 Hive 分区表的最新分区,并且会主动监听最新的 Hive 分区,当监控到新的分区后,会主动地做维表数据的全量替换。通过这种形式,用户无需编写 DataStream 程序即可实现 Kafka 流实时关联最新的 Hive 分区实现数据打宽。
具体用法:
在 Sql Client 中注册 HiveCatalog:
vim conf/sql-client-defaults.yaml catalogs: - name: hive_catalog type: hive hive-conf-dir: /disk0/soft/hive-conf/ #该目录须要包hive-site.xml文件
创立 Kafka 表
CREATE TABLE hive_catalog.flink_db.kfk_fact_bill_master_12 ( master Row<reportDate String, groupID int, shopID int, shopName String, action int, orderStatus int, orderKey String, actionTime bigint, areaName String, paidAmount double, foodAmount double, startTime String, person double, orderSubType int, checkoutTime String>, proctime as PROCTIME() -- PROCTIME用来和Hive时态表关联 ) WITH ( 'connector' = 'kafka', 'topic' = 'topic_name', 'format' = 'json', 'properties.bootstrap.servers' = 'host:9092', 'properties.group.id' = 'flinkTestGroup', 'scan.startup.mode' = 'timestamp', 'scan.startup.timestamp-millis' = '1607844694000' );
Flink 事实表与 Hive 最新分区数据关联
dim_extend_shop_info 是 Hive 中已存在的表,所以咱们用 table hint 动静地开启维表参数。
CREATE VIEW IF NOT EXISTS hive_catalog.flink_db.view_fact_bill_master as SELECT * FROM (select t1.*, t2.group_id, t2.shop_id, t2.group_name, t2.shop_name, t2.brand_id, ROW_NUMBER() OVER (PARTITION BY groupID, shopID, orderKey ORDER BY actionTime desc) rn from hive_catalog.flink_db.kfk_fact_bill_master_12 t1 JOIN hive_catalog.flink_db.dim_extend_shop_info /*+ OPTIONS('streaming-source.enable'='true', 'streaming-source.partition.include' = 'latest', 'streaming-source.monitor-interval' = '1 h', 'streaming-source.partition-order' = 'partition-name') */ FOR SYSTEM_TIME AS OF t1.proctime AS t2 --时态表 ON t1.groupID = t2.group_id and t1.shopID = t2.shop_id where groupID in (202042)) t where t.rn = 1
参数解释:
- streaming-source.enable 开启流式读取 Hive 数据。
streaming-source.partition.include 有以下两个值:
- latest 属性: 只读取最新分区数据。
- all: 读取全量分区数据 ,默认值为 all,示意读所有分区,latest 只能用在 temporal join 中,用于读取最新分区作为维表,不能间接读取最新分区数据。
- streaming-source.monitor-interval 监听新分区生成的工夫、不宜过短 、最短是1 个小时,因为目前的实现是每个 task 都会查问 metastore,高频的查可能会对metastore 产生过大的压力。须要留神的是,1.12.1 放开了这个限度,但仍倡议依照理论业务不要配个太短的 interval。
streaming-source.partition-order 分区策略,次要有以下 3 种,其中最为举荐的是 partition-name:
- partition-name 应用默认分区名称程序加载最新分区
- create-time 应用分区文件创建工夫程序
- partition-time 应用分区工夫程序