关于flink:Flink的时间与watermarks详解

11次阅读

共计 10585 个字符,预计需要花费 27 分钟才能阅读完成。

当咱们在应用 Flink 的时候,防止不了要和工夫 (time)、水位线(watermarks) 打交道,了解这些概念是开发分布式流解决利用的根底。那么 Flink 反对哪些工夫语义?Flink 是如何解决乱序事件的?什么是水位线?水位线是如何生成的?水位线的传播方式是什么?让咱们带着这些问题来开始本文的内容。

<!– more –>

工夫语义

基本概念

工夫是 Flink 等流解决中最重要的概念之一,在 Flink 中 Time 能够分为三种:Event-Time,Processing-Time 以及 Ingestion-Time,如下图所示:

  • Event Time

事件工夫 ,事件(Event) 自身的工夫,即数据流中事件理论产生的工夫,通常应用事件产生时的工夫戳来形容,这些事件的工夫戳通常在进入流解决利用之前就曾经存在了,事件工夫反映了事件实在的产生工夫。所以,基于事件工夫的计算操作,其后果是具备确定性的,无论数据流的处理速度如何、事件达到算子的程序是否会乱,最终生成的后果都是一样的。

  • Ingestion Time

摄入工夫,事件进入 Flink 的工夫,行将每一个事件在数据源算子的解决工夫作为事件工夫的工夫戳,并主动生成水位线(watermarks, 对于 watermarks 下文会详细分析)。

Ingestion Time 从概念上讲介于 Event Time 和 Processing Time 之间。与 Processing Time 相比,它的性能耗费更多一些,但后果却更可预测。因为 Ingestion Time 应用稳固的工夫戳(在数据源处调配了一次),因而对记录的不同窗口操作将援用雷同的工夫戳,而在 Processing Time 中每个窗口算子都能够将记录调配给不同的窗口。

与 Event Time 相比,Ingestion Time 无奈解决任何乱序事件或早退的数据,即无奈提供确定的后果,然而程序不用指定如何生成水位线。在外部,Ingestion Time 与 Event Time 十分类似,然而能够实现主动调配工夫戳和主动生成水位线的性能。

  • Processing Time

解决工夫,依据解决机器的零碎时钟决定数据流以后的工夫,即事件被解决时以后零碎的工夫。还以窗口算子为例(对于 window,下文会详细分析),基于解决工夫的窗口操作是以机器工夫来进行触发的,因为数据达到窗口的速率不同,所以窗口算子中应用解决工夫会导致不确定的后果。在应用解决工夫时,无需期待水位线的到来后进行触发窗口,所以能够提供较低的提早。

比照

通过下面的剖析,应该对 Flink 的工夫语义有了大抵的理解。不晓得你会不会有这样一个疑难:既然事件工夫曾经可能解决所有的问题了,那为何还要用解决工夫呢?其实解决工夫有其特定的应用场景,解决工夫因为不必思考事件的提早与乱序,所以其解决数据的提早较低。因而如果一些利用比拟器重处理速度而非准确性,那么就能够应用解决工夫,比方要实时监控仪表盘。总之,尽管解决工夫的提早较低,然而其后果具备不确定性,事件工夫尽管有提早,然而可能保障解决的后果具备准确性,并且能够解决提早甚至无序的数据。

应用

上一小结讲述了三种工夫语义的基本概念,接下来将从代码层面解说在程序中该如何配置这三种工夫语义。首先来看一段代码:

/** The time characteristic that is used if none other is set. */
 private static final TimeCharacteristic DEFAULT_TIME_CHARACTERISTIC = TimeCharacteristic.ProcessingTime;
// 省略的代码
/** The time characteristic used by the data streams. */
 private TimeCharacteristic timeCharacteristic = DEFAULT_TIME_CHARACTERISTIC;

上述两行代码摘自 StreamExecutionEnvironment 类,能够看出,Flink 在流处理程序中默认的工夫语义是 Processing Time,那么该如何批改默认的工夫语义呢?很简略,再来看一段代码,上面的代码片段同样来自于 StreamExecutionEnvironment 类:

 /**
 * 如果应用 Processing Time 或者 Event Time,默认的水位线间隔时间是 200 毫秒
 * 能够通过 ExecutionConfig#setAutoWatermarkInterval(long)设置
 * @param characteristic The time characteristic.
 */
 @PublicEvolving
 public void setStreamTimeCharacteristic(TimeCharacteristic characteristic) {this.timeCharacteristic = Preconditions.checkNotNull(characteristic);
 if (characteristic == TimeCharacteristic.ProcessingTime) {getConfig().setAutoWatermarkInterval(0);
 } else {getConfig().setAutoWatermarkInterval(200);
 }
 }

上述的办法能够配置不同的工夫语义,参数 TimeCharacteristic 是一个枚举类,包含 ProcessingTime,IngestionTime,EventTime 三个元素。具体应用形式如下:

//env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
//env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

watermarks

在解释 watermarks(水位线)之前,先看一个咱们身边产生的实在案例。高考,是大家十分相熟的场景。如果把高考的考试安顿简略地看作是一个流解决利用,那么,每一个考试科目的开始工夫到完结工夫就是一个窗口,每个考生能够了解成一条记录,考生达到考场的工夫能够了解成记录的工夫戳,而考试能够了解成某种算子操作。大家都晓得,高考考试在开考后 15 分钟是不容许进场的,这个规定能够了解成一个水位线,比方,上午第一场语文考试,开考工夫是 9:30,容许在 9:45 之前进入考场,那么 9:45 这个工夫能够了解成一个水位线。在开考之前,有的同学喜爱提前到考场,有的同学喜爱卡点到考场。假如有个同学叫 考必胜,ta 是卡着工夫点到的考场,然而早上因为吃了不洁净的货色,忽然感觉肚子不适,无奈之下在厕所里耽搁了 16 分钟,那么依照规定,此时考必胜是不可能进入考场的,因为此时曾经默认所有考生都曾经在考场了,此时考试也曾经触发,那么卡必胜就能够了解为早退的事件。以上就是对窗口、事件工夫以及水位线的简略了解,上面开始具体解释什么水位线。

基本概念

在上一节中,具体解说了 Flink 提供的三种工夫语义,在解说这三种工夫语义的时候,提到了一个名词 — 水位线,那么到底什么是水位线呢?先来看一个例子,如果要每 5 分钟统计一次过来 1 个小时内的热门商品的 topN,这是一个典型的滑动窗口操作,那么基于事件工夫的窗口该在什么时候登程计算呢?换句话说,咱们要等多久才可能确定曾经接管到了特定工夫点之前的所有事件,另一方面,因为网络提早等起因,会产生乱序的数据,在进行窗口操作时,不可能无限期的期待上来,须要一个机制来通知窗口在某个特定工夫来触发 window 计算,即认为小于等于该工夫点的数据都曾经到来了。这个机制就是 watermark(水位线),能够用来解决乱序事件。

水位线是一个全局的进度指标,示意能够确定不会再有提早的事件到来的某个工夫点。从实质上讲,水位线提供了一个逻辑时钟,用来告诉零碎以后的事件工夫。比方,当一个算子接管到了 W(T)时刻的水位线,就能够大胆的认为不会再接管到任何工夫戳小于或等于 W(T)的事件了。水位线对于基于事件工夫的窗口和解决乱序数据是十分要害的,算子一旦接管到了某个水位线,就相当于接到一支穿云箭的信号:所有特定工夫区间的数据都已集结结束,能够进行窗口触发计算。

既然曾经说了,事件是会存在乱序的,那这个乱序的水平到底有多大呢,这个就不太好确定了,总之总会有些早退的事件慢慢悠悠的到来。所以,水位线其实是一种在 准确性 提早 之间的衡量,如果水位线设置的十分刻薄,即不容许有落伍的数据呈现,尽管准确性进步了,但这在无形之中减少了数据处理的提早。反之,如果水位线设置的十分激进,即容许有早退的数据产生,那么尽管升高了数据处理的提早,但数据的准确性会较低。

所以,水位线是中庸之道,过犹不及。在很多事实利用中,零碎无奈获取足够多的信息来确定完满的水位线,那么该怎么办呢?Flink 提供了某些机制来解决那些可能晚于水位线的早退工夫,用户能够依据利用的需要不同,能够将这些漏网之鱼 (早退的数据) 舍弃掉,或者写入日志,或者利用他们修改之前的后果。

下面说到没有完满的水位线,可能还是很形象。接下来,咱们再看一幅图,从图中能够很直观地察看实在的水位线与现实中的完满水位线之间的关系,如下图:

上图的浅灰色直虚线示意现实的水位线,深灰色的蜿蜒虚线示意事实中的水位线,彩色直线示意两者之间的偏差。在现实状态下,这种偏差为 0,因为总是在工夫产生时就会立刻解决,即事件的实在工夫与处理事件的工夫是统一的。比方,12:01 产生的事件刚好在 12:01 时被解决,12:02 产生的事件刚好在 12:02 时被解决。然而事实总会有早退的数据产生,比方网络提早的起因,所以实在的状况会像深灰色的蜿蜒虚线示意的那样,即 12:01 产生的数据可能会在 12:01 之后被解决,12:02 产生的数据在 12:02 时被解决,12:03 时产生的数据会被在 12:03 之后解决。这种动静的偏差在分布式解决零碎中是十分常见的。

水位线图解

在上一大节,通过语言形容对水位线的概念进行了具体解读,在本大节会通过图解的形式解析水位线的含意,这样更能加深对水位线的了解。如下图所示:

如上图,矩形示意一条记录,三角示意该条记录的工夫戳(实在产生工夫),圆圈示意水位线。能够看到下面的数据是乱序的,比方当算子接管到为 2 的水位线时,就能够认为工夫戳小于等于 2 的数据都曾经到来了,此时能够触发计算。同理,接管到为 5 的水位线时,就能够认为工夫戳小于或等于 5 的数据都曾经到来了,此时能够触发计算。

能够看出水位线是枯燥递增的,并且和记录的工夫戳存在分割,一个工夫戳为 T 的水位线示意接下来所有记录的工夫戳肯定都会大于 T。

水位线的流传

当初,或者你曾经对水位线是什么有了一个初步的意识,接下来将会介绍水位线是怎么在 Flink 外部流传的。对于水位线的流传策略能够演绎为 3 点:

  • 首先,水位线是以播送的模式在算子之间进行流传
  • Long.MAX_VALUE 示意事件工夫的完结,即将来不会有数据到来了
  • 单个分区的输出取最大值,多个分区的输出取最小值

对于 Long.MAX_VALUE 的解释,先看一段代码,如下:

/** 
 * 当一个 source 敞开时,会输入一个 Long.MAX_VALUE 的水位线,当一个算子接管到该水位线时,* 相当于接管到一个信号:将来不会再有数据输出了
 */
@PublicEvolving
public final class Watermark extends StreamElement {
​
 // 示意事件工夫的完结
 public static final Watermark MAX_WATERMARK = new Watermark(Long.MAX_VALUE);
 // 省略的代码
}

对于另外两条策略的解释,能够从下图中失去:

如上图,一个工作会为它的每个分区都保护一个分区水位线(partition watermark),当收到每个分区传来的水位线时,工作首先会让以后分区水位线的值与接管的水位线值相比拟,如果新接管的水位线值大于以后分区水位线值,则会将对应的分区水位线值更新为较大的水位线值(如上图中的 2 步骤),接着,工作会把事件时钟调整为以后分区水位线值的最小值,如上图步骤 2,因为以后分区水位线的最小值为 3,所以将事件工夫时钟更新为 3,而后将值为 3 的水位线播送到上游工作。步骤 3 与步骤 4 的解决逻辑同上。

同时咱们能够留神到这种设计其实有一个局限,具体体现在没有对分区 (partition) 是否来自于不同的流进行辨别,比方对于两条流或多条流的 Union 或 Connect 操作,同样是依照全副分区水位线中最小值来更新事件工夫时钟,这就导致所有的输出记录都会依照基于同一个事件工夫时钟来解决,这种一刀切的做法对于同一个流的不同分区而言是无可非议的,然而对于多条流而言,强制应用一个时钟进行同步会对整个集群带来较大的性能开销,比方当两个流的水位线相差很大是,其中的一个流要期待最慢的那条流,而较快的流的记录会在状态中缓存,直到事件工夫时钟达到容许解决它们的那个工夫点。

水位线的生成形式

通常状况下,在接管到数据源之后应该马上为其生成水位线,即越凑近数据源越好。Flink 提供两种形式生成水位线,其中一种形式为在数据源实现的,即利用 SourceFunction 在利用读入数据流的时候调配工夫戳与水位线。另一种形式是通过实现接口的自定义函数,该形式又包含两种实现形式:一种为周期性生成水位线,即实现 AssignerWithPeriodicWatermarks 接口,另一种为定点生成水位线,即实 AssignerWithPunctuatedWatermarks 接口。具体如下图所示:

数据源形式

该形式次要是实现自定义数据源,数据源调配工夫戳和水位线次要是通过外部的 SourceContext 对象实现的,先看一下 SourceFunction 的源码,如下:

public interface SourceFunction<T> extends Function, Serializable {
​
 void cancel();
​
 interface SourceContext<T> {
​
 void collect(T element);
 /**
 * 用于输入记录并从属一个与之关联的工夫戳
 */
 @PublicEvolving
 void collectWithTimestamp(T element, long timestamp);
 /**
 * 用于输入传入的水位线
 */
 @PublicEvolving
 void emitWatermark(Watermark mark);
 /**
 * 将本身标记为闲暇状态
 * 某个某个分区不在产生数据,会妨碍全局水位线后退,* 因为收不到新的记录,意味着不会收回新的水位线,* 依据水位线的流传策略,会导致整个利用都进行工作
 * Flink 提供一种机制,将数据源函数临时标记为闲暇,* 在闲暇状态下,Flink 水位线的流传机制会疏忽掉闲暇的数据流分区
 */
 @PublicEvolving
 void markAsTemporarilyIdle();
​
 Object getCheckpointLock();
​
 void close();}
}

从上面对的代码能够看出,通过 SourceContext 对象的办法能够实现工夫戳与水位线的调配。

自定义函数的形式

应用自定义函数的形式调配工夫戳,只须要调用 assignTimestampsAndWatermarks()办法,传入一个实现 AssignerWithPeriodicWatermarks 或者 AssignerWithPunctuatedWatermarks 接口的分配器即可,如下代码所示:

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment()
 env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
 SingleOutputStreamOperator<UserBehavior> userBehavior = env
 .addSource(new MysqlSource())
 .assignTimestampsAndWatermarks(new MyTimestampsAndWatermarks());
  • 周期分配器(AssignerWithPeriodicWatermarks)

该分配器是实现了一个 AssignerWithPeriodicWatermarks 的用户自定义函数,通过重写 extractTimestamp()办法来提取工夫戳,提取进去的工夫戳会附加在各自的记录上,查问失去的水位线会注入到数据流中。

周期性的生成水位线是指以固定的工夫距离来收回水位线并推动事件工夫的后退,对于默认的工夫距离在上文中也有提到,依据抉择的工夫语义确定默认的工夫距离,如果应用 Processing Time 或者 Event Time,默认的水位线间隔时间是 200 毫秒,当然用户也能够本人设定工夫距离,对于如何设定,先看一段代码,代码来自于 ExecutionConfig 类:

 /**
 * 设置生成水位线的工夫距离
 * 注:主动生成 watermarks 的工夫距离不能是正数
 */
 @PublicEvolving
 public ExecutionConfig setAutoWatermarkInterval(long interval) {Preconditions.checkArgument(interval >= 0, "Auto watermark interval must not be negative.");
 this.autoWatermarkInterval = interval;
 return this;
 }

所以,如果要调整默认的 200 毫秒的距离,能够调用 setAutoWatermarkInterval()办法,具体应用如下:

 // 每 3 秒生成一次水位线
env.getConfig().setAutoWatermarkInterval(3000);

下面指定了每隔 3 秒生成一次水位线,即每隔 3 秒会主动向流里注入一个水位线,在代码层面,Flink 会每隔 3 秒钟调用一次 AssignerWithPeriodicWatermarks 的 getCurrentWatermark()办法,每次调用该办法时,如果失去的值不为空并且大于上一个水位线的工夫戳,那么就会向流中注入一个新的水位线。这项查看能够无效地保障了事件工夫的递增的个性,一旦查看失败也就不会生成水位线。上面给出一个实现周期调配水位线的例子:
“java
public class MyTimestampsAndWatermarks implements AssignerWithPeriodicWatermarks<UserBehavior> {
// 定义 1 分钟的容忍间隔时间,即容许数据的最大乱序工夫
private long maxOutofOrderness = 60 * 1000;
// 察看到的最大工夫戳
private long currentMaxTs = Long.MIN_VALUE;

@Nullable
@Override
public Watermark getCurrentWatermark() {
// 生成具备 1 分钟容忍度的水位线
return new Watermark(currentMaxTs – maxOutofOrderness);
}

@Override
public long extractTimestamp(UserBehavior element, long previousElementTimestamp) {
// 获取以后记录的工夫戳
long currentTs = element.timestamp;
// 更新最大的工夫戳
currentMaxTs = Math.max(currentMaxTs, currentTs);
// 返回记录的工夫戳
return currentTs;
}
}


通过查看 TimestampAssignerd 继承关系能够发现(继承关系如下图),除此之外,Flink 还提供了两种内置的水位线分配器,别离为:AscendingTimestampExtractor 和 BoundedOutOfOrdernessTimestampExtractor 两个抽象类。![](https://upload-images.jianshu.io/upload_images/22116987-83e4971769f856e5.png?imageMogr2/auto-orient/strip%7CimageView2/2/w/1240)


对于 **AscendingTimestampExtractor**,个别是在数据集的工夫戳是枯燥递增的且没有乱序时应用,该办法应用以后的工夫戳生成水位线,应用形式如下:

SingleOutputStreamOperator<UserBehavior> userBehavior = env
.addSource(new MysqlSource())
.assignTimestampsAndWatermarks(new AscendingTimestampExtractor<UserBehavior>() {
@Override
public long extractAscendingTimestamp(UserBehavior element) {
return element.timestamp*1000;
}
});


对于 **BoundedOutOfOrdernessTimestampExtractor**,是在数据集中存在乱序数据的状况下应用,即数据有提早(任意新到来的元素与曾经到来的工夫戳最大的元素之间的时间差),这种形式能够接管一个示意最大预期提早参数,具体如下:

SingleOutputStreamOperator<UserBehavior> userBehavior = env
.addSource(new MysqlSource())
.assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<UserBehavior>(Time.seconds(10)) {
@Override
public long extractTimestamp(UserBehavior element) {
return element.timestamp*1000;
}
} );


上述的代码接管了一个 10 秒钟提早的参数,这 10 秒钟意味着如果以后元素的事件工夫与达到的元素的最大工夫戳的差值在 10 秒之内,那么该元素会被解决,如果差值超过 10 秒,示意其本应该参加的计算,曾经实现了,Flink 称之为早退的数据,Flink 提供了不同的策略来解决这些早退的数据。*   ** 定点水位线分配器(AssignerWithPunctuatedWatermarks)**

该形式是基于某些事件 (批示零碎进度的非凡元祖或标记) 触发水位线的生成与发送,基于特定的事件向流中注入一个水位线,流中的每一个元素都有机会判断是否生成一个水位线,如果失去的水位线不为空并且大于之前的水位线,就生成水位线并注入流中。实现 AssignerWithPunctuatedWatermarks 接口,重写 checkAndGetNextWatermark()办法,该办法会在针对每个事件的 extractTimestamp()办法后立刻调用,以此来决定是否生成一个新的水位线,如果该办法返回一个非空并且大于之前值的水位线,就会将这个新的水位线收回。上面将会实现一个简略的定点水位线分配器

public class MyPunctuatedAssigner implements AssignerWithPunctuatedWatermarks<UserBehavior> {
// 定义 1 分钟的容忍间隔时间,即容许数据的最大乱序工夫
private long maxOutofOrderness = 60 * 1000;
@Nullable
@Override
public Watermark checkAndGetNextWatermark(UserBehavior element, long extractedTimestamp) {
// 如果读取数据的用户行为是购买,就生成水位线
if(element.action.equals(“buy”)){
return new Watermark(extractedTimestamp – maxOutofOrderness);
}else{
// 不收回水位线
return null;
}
}
@Override
public long extractTimestamp(UserBehavior element, long previousElementTimestamp) {
return element.timestamp;
}
}


### 早退的数据

上文曾经说过,事实中很难生成一个完满的水位线,水位线就是在提早与准确性之前做的一种衡量。那么,如果生成的水位线过于紧迫,即水位线可能会大于起初数据的工夫戳,这就意味着数据有提早,对于提早数据的解决,Flink 提供了一些机制,具体如下:*   间接将早退的数据抛弃

*   将早退的数据输入到独自的数据流中,即应用 sideOutputLateData(new OutputTag<>())实现侧输入

*   依据早退的事件更新并收回后果

因为篇幅限度,对于早退数据的具体解决在本文先不做太多的探讨,在后续的文章中会对其具体进行阐明。## 总结

本文从 Flink 的工夫语义开始说起,具体介绍了三种工夫语义的概念、特点及应用形式,接着对 Flink 解决乱序数据的一种机制 --- 水位线进行具体阐明,次要形容了水位线的基本概念,传播方式、生成形式,并对其中的细节局部进行了图解,能够加深对水位线的了解。最初,简略阐明了一下 Flink 对于早退数据的解决形式。> * 关注公众号: 大数据技术与数仓
> ** 收费支付百 G 大数据资料 **

正文完
 0