作者:王东阳
前言
ANSI-SQL 2011 中提出了Temporal 的概念,Oracle,SQLServer,DB2等大的数据库厂商也先后实现了这个规范。Temporal Table记录了历史上任何工夫点所有的数据改变,Temporal Table具备一般table的个性,有具体独特的DDL/DML/QUERY语法,工夫是其外围属性。历史意味着工夫,意味着快照Snapshot。
Apache Flink遵循ANSI-SQL规范,Apache Flink中Temporal Table的概念也源于ANSI-2011的规范语义,但目前的实现在语法层面和ANSI-SQL略有差异,下面看到ANSI-2011中应用FOR SYSTEM_TIME AS OF的语法,Apache Flink在晚期版本中仅仅反对 LATERAL TABLE(TemporalTableFunction)的语法,以后flinkv14版本中曾经反对FOR SYSTEM_TIME AS OF语法。
因为Flink中基于eventtime 的 temporal table join 基于flink 的watermark机制实现,为了更好的让读者了解,本文首先介绍flink中的 动静表和时序表,工夫概念,Watermark等相干常识,最初通过具体的代码用例介绍Flink中基于eventtime 的 temporal table join用法。
动静表和时序表
动静表
什么是动静表
动静表 是 Flink 的反对流数据的 Table API 和 SQL 的外围概念。与示意批处理数据的动态表不同,动静表是随工夫变动的。能够像查问动态批处理表一样查问它们。查问动静表将生成一个 间断查问 。一个间断查问永远不会终止,后果会生成一个动静表。查问不断更新其(动静)后果表,以反映其(动静)输出表上的更改。实质上,动静表上的间断查问十分相似于定义物化视图的查问。
动静表能够像一般数据库表一样通过 INSERT、UPDATE 和 DELETE 来一直批改。它可能是一个只有一行、不断更新的表,也可能是一个 insert-only 的表,没有 UPDATE 和 DELETE 批改,或者介于两者之间的其余表。
动静表转换
在将动静表转换为流或将其写入内部零碎时,须要对这些更改进行编码。Flink的 Table API 和 SQL 反对三种形式来编码一个动静表的变动:
Append-only 流: 仅通过 INSERT 操作批改的动静表能够通过输入插入的行转换为流。
Retract 流: retract 流蕴含两种类型的 message: add messages 和 retract messages 。通过将INSERT 操作编码为 add message、将 DELETE 操作编码为 retract message、将 UPDATE 操作编码为更新(先前)行的 retract message 和更新(新)行的 add message,将动静表转换为 retract 流。下图显示了将动静表转换为 retract 流的过程。
Upsert 流: upsert 流蕴含两种类型的 message: upsert messages 和delete messages。转换为 upsert 流的动静表须要(可能是组合的)惟一键。通过将 INSERT 和 UPDATE 操作编码为 upsert message,将 DELETE 操作编码为 delete message ,将具备惟一键的动静表转换为流。生产流的算子须要晓得惟一键的属性,以便正确地利用 message。与 retract 流的次要区别在于 UPDATE 操作是用单个 message 编码的,因而效率更高。下图显示了将动静表转换为 upsert 流的过程。
Flink在将动静表转换为 DataStream 时,只反对 append 流和 retract 流。前面的样例代码中会展现转换的API以及Retract 流和Upsert 流的不同。
时态表(Temporal Tables)
时态表(Temporal Table)是一张随工夫变动的表, 在 Flink 中称为动静表,时态表中的每条记录都关联了一个或多个时间段,所有的 Flink 表都是时态的(动静的)。也就是说时态表是动静表的特例,时态表肯定是动静表,动静表不肯定是时态表。
时态表蕴含表的一个或多个有版本的表快照,时态表能够是一张跟踪所有变更记录的表(例如数据库表的 changelog,蕴含多个表快照),也能够是物化所有变更之后的表(例如数据库表,只有最新表快照)。
Flink 应用主键束缚和事件工夫来定义一张版本表和版本视图,在前面介绍temporal join的相干样例中会展现这两种。
样例代码
- 环境初始化
首先初始化StreamExecutionEnvironment env 和 StreamTableEnvironment tEnv, 如下:
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setRuntimeMode(RuntimeExecutionMode.STREAMING);env.setParallelism(1);env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
env.setRuntimeMode(RuntimeExecutionMode.STREAMING); 设置以后运行模式为 STREAMING模式
env.setParallelism(1); 设置并行度是1次要是为测试的目标,便于察看join的后果
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime); 设置工夫属性是 EventTime
首先创立并注册一个一般表 RatesHistory
DataStream<Row> ratesStream = env.fromElements(Row.of(LocalDateTime.parse("2021-08-21T09:02:00"), "US Dollar", 102), Row.of(LocalDateTime.parse("2021-08-21T09:00:00"), "Euro", 114), Row.of(LocalDateTime.parse("2021-08-21T09:00:00"), "Yen", 1), Row.of(LocalDateTime.parse("2021-08-21T10:45:00"), "Euro", 116), Row.of(LocalDateTime.parse("2021-08-21T11:15:00"), "Euro", 119), Row.of(LocalDateTime.parse("2021-08-21T11:49:00"), "Pounds", 108)) .returns( Types.ROW_NAMED( new String[] {"currency_time", "currency", "rate"}, Types.LOCAL_DATE_TIME, Types.STRING, Types.INT));
Table rateTable = tEnv.fromDataStream(ratesStream, Schema.newBuilder().build());tEnv.registerTable("RatesHistory", rateTable);rateTable.printSchema();tEnv.from("RatesHistory").execute().print();
失去RatesHistory的schema信息以及表中内容:
(currency_time
TIMESTAMP(9),currency
STRING,rate
INT
)
op | currency_time | currency | rate |
---|---|---|---|
+I | 2021-08-21 09:02:00.000000000 | US Dollar | 102 |
+I | 2021-08-21 09:00:00.000000000 | Euro | 114 |
+I | 2021-08-21 09:00:00.000000000 | Yen | 1 |
+I | 2021-08-21 10:45:00.000000000 | Euro | 116 |
+I | 2021-08-21 11:15:00.000000000 | Euro | 119 |
+I | 2021-08-21 11:49:00.000000000 | Pounds | 108 |
6 rows in set
- 申明版本表
在 Flink 中,定义了主键束缚和事件工夫属性的表就是版本表。相比下面的代码,在应用fromDataStream的第二个参数Schema外面,通过columnByExpression 指定事件工夫的工夫戳(flink中要求必须是 TIMESTAMP(3) ), 通过 primaryKey("currency") 指定 currency 主键束缚。
// version table
Table versionedTable = tEnv.fromDataStream(ratesStream, Schema.newBuilder() .columnByExpression("rowtime", "CAST(currency_time AS TIMESTAMP(3))") .primaryKey("currency") .build());tEnv.registerTable("versionRate", versionedTable);System.out.println("versioned table get");versionedTable.printSchema();tEnv.from("versionRate").execute().print();
打印versionRate的schema信息以及表中内容:
(currency_time
TIMESTAMP(9),currency
STRING NOT NULL,rate
INT,rowtime
TIMESTAMP(3) AS CAST(currency_time AS TIMESTAMP(3)),
CONSTRAINT PK_currency
PRIMARY KEY (currency
) NOT ENFORCED
)
op | currency_time | currency | rate | rowtime |
---|---|---|---|---|
+I | 2021-08-21 09:02:00.000000000 | US Dollar | 102 | 2021-08-21 09:02:00.000 |
+I | 2021-08-21 09:00:00.000000000 | Euro | 114 | 2021-08-21 09:00:00.000 |
+I | 2021-08-21 09:00:00.000000000 | Yen | 1 | 2021-08-21 09:00:00.000 |
+I | 2021-08-21 10:45:00.000000000 | Euro | 116 | 2021-08-21 10:45:00.000 |
+I | 2021-08-21 11:15:00.000000000 | Euro | 119 | 2021-08-21 11:15:00.000 |
+I | 2021-08-21 11:49:00.000000000 | Pounds | 108 | 2021-08-21 11:49:00.000 |
- 申明版本视图
Flink 也反对定义版本视图只有一个视图蕴含主键和事件工夫便是一个版本视图。为了在 RatesHistory 上定义版本表,Flink 反对通过去重查问定义版本视图, 去重查问能够产出一个有序的 changelog 流,去重查问可能推断主键并保留原始数据流的事件工夫属性。
// https://nightlies.apache.org/...
Table versionedRateView = tEnv.sqlQuery( "select currency, rate, currency_time " + // (1) `currency_time` 保留了事件工夫 "from ( " + "select *, " + "ROW_NUMBER() OVER (PARTITION BY currency " + //(2) `currency` 是去重query的unique key,作为主键 " ORDER BY currency_time DESC) AS rowNum " + "FROM RatesHistory ) " + "WHERE rowNum = 1");tEnv.createTemporaryView("versioned_rates", versionedRateView);versionedRateView.printSchema();tEnv.from("versioned_rates").execute().print();
对于去重语法中的相干参数形容如下
Parameter Specification:
ROW_NUMBER(): 给每一行调配一个从1开始的递增的惟一的序号。
PARTITION BY col1[, col2...]: 指定分区列.
ORDER BY time_attr [asc|desc]:指定排序所基于的列, 必须是 time attribute. 以后Flink反对 processing time attribute 和event time attribute. Ordering by ASC 象征保留最老的那列, ordering by DESC 象征保留最新的那列.
WHERE rownum = 1: rownum = 1 用于 Flink 获取到去重后的数据。
行 (1) 保留了事件工夫作为视图 versioned_rates 的事件工夫,行 (2) 使得视图 versioned_rates 有了主键, 因而视图 versioned_rates 是一个版本视图。
视图中的去重 query 会被 Flink 优化并高效地产出 changelog stream, 产出的 changelog 保留了主键束缚和事件工夫。
打印schema和versioned_rates表中内容
(currency
STRING,rate
INT,currency_time
TIMESTAMP(9)
)
op | currency | rate | currency_time |
---|---|---|---|
+I | US Dollar | 102 | 2021-08-21 09:02:00.000000000 |
+I | Euro | 114 | 2021-08-21 09:00:00.000000000 |
+I | Yen | 1 | 2021-08-21 09:00:00.000000000 |
-U | Euro | 114 | 2021-08-21 09:00:00.000000000 |
+U | Euro | 116 | 2021-08-21 10:45:00.000000000 |
-U | Euro | 116 | 2021-08-21 10:45:00.000000000 |
+U | Euro | 119 | 2021-08-21 11:15:00.000000000 |
+I | Pounds | 108 | 2021-08-21 11:49:00.000000000 |
能够看到Euro中一共有5条记录,属于Retract 类型的changelogstream,蕴含UPDATE_BEFORE, UPDATE_AFTER两种
| +I | Euro | 114 | 2021-08-21 09:00:00.000000000 |
| -U | Euro | 114 | 2021-08-21 09:00:00.000000000 |
| +U | Euro | 116 | 2021-08-21 10:45:00.000000000 |
| -U | Euro | 116 | 2021-08-21 10:45:00.000000000 |
| +U | Euro | 119 | 2021-08-21 11:15:00.000000000 |
这5条记录的具体生成过程如下:
当第一条 Euro数据 LocalDateTime.parse("2021-08-21T09:00:00"), "Euro", 114 进入到versioned_rates 所对应的底层Flink算子中时,因为以后Euro只有这一条记录,执行去重语法后就失去 Euro 114 2021-08-21 09:00:00.000000000
当第二条 Euro数据 LocalDateTime.parse("2021-08-21T10:45:00"), "Euro", 116 进入到versioned_rates 所对应的底层Flink算子中时,执行去重语法获取最新的记录就失去 Euro 116 2021-08-21 10:45:00.000000000, 因为 currency是主键,所以执行更新操作,生成UPDATE_BEFORE,UPDATE_AFTER 两条changelogstream。
当第三条 Euro数据 LocalDateTime.parse("2021-08-21T11:15:00"), "Euro", 119 进入到versioned_rates 所对应的底层Flink算子中时,执行去重语法获取最新的记录就失去 Euro 119 2021-08-21 11:15:00.000000000, 因为 currency是主键,所以执行更新操作,生成UPDATE_BEFORE,UPDATE_AFTER 两条changelogstream。
- 通过table API显示转为datastream
flink API 中提供了 toChangelogStream 接口
// default retract mode
tEnv.toChangelogStream(versionedRateView) .executeAndCollect() .forEachRemaining(System.out::println);tEnv.toChangelogStream(versionedRateView, Schema.newBuilder().build(), ChangelogMode.upsert()) .executeAndCollect() .forEachRemaining(System.out::println);
+I[US Dollar, 102, 2021-08-21T09:02]
+I[Euro, 114, 2021-08-21T09:00]
+I[Yen, 1, 2021-08-21T09:00]
-U[Euro, 114, 2021-08-21T09:00]
+U[Euro, 116, 2021-08-21T10:45]
-U[Euro, 116, 2021-08-21T10:45]
+U[Euro, 119, 2021-08-21T11:15]
+I[Pounds, 108, 2021-08-21T11:49]
失去输入
+I[US Dollar, 102, 2021-08-21T09:02]
+I[Euro, 114, 2021-08-21T09:00]
+I[Yen, 1, 2021-08-21T09:00]
+U[Euro, 116, 2021-08-21T10:45]
+U[Euro, 119, 2021-08-21T11:15]
+I[Pounds, 108, 2021-08-21T11:49]
水印 Watermark
此章节次要参考
《flink内核原理与实现》 4.4章节 水印
《Flink原理、实战与性能优化》4.2章节 工夫概念与Watermark
工夫概念
对于流式数据处理,最大的特点是数据上具备工夫的属性特色,Flimk依据工夫产生的地位不同,将工夫辨别为三种工夫概念,别离为 事件生成工夫(Event Time)、事件接入工夫(Ingestion Time)和事件处理工夫(Processing Time)
1.事件工夫(Event Time)
事件工夫(Event Time)是每个独立事件在产生它的设施上产生的工夫,这个工夫通常在事件进入Flink之前就曾经嵌入到事件中,时 间程序取决于事件产生的中央,和上游数据处理系统的工夫无关。事件数据具备不变的事件工夫属性,该工夫自事件元素产生就不会改 变。通常状况下能够在Flink零碎中指定事件工夫属性或者设定工夫提取器来提取事件工夫。
所有进入到Flink流式零碎解决的事件,其工夫都是在内部零碎中产生,通过网络进入到Flink零碎内解决的,在实践状况下(所有零碎 都具备雷同零碎时钟),事件工夫对应的工夫戳肯定会早于在Flink零碎中解决的工夫戳,但在理论状况中往往会呈现数据记录乱序、提早 达到等问题。基于EventTime的工夫概念,数据处理过程依赖于数据自身产生的工夫,而不是Flink零碎中Operator所在主机节点的零碎工夫,这样可能借助于事件产生时的工夫信息来还原事件的先后关系。
2.接入工夫(Ingestion Time)
接入工夫(Ingestion Time)是数据进入Flink零碎的工夫,Ingestion Time依赖于Source Operator所在主机的零碎时钟。Ingestion Time介于Event Time和Process Time之间,绝对于ProcessTime,Ingestion Time生成的代价绝对较高,Ingestion Time具备一 定的可预见性,次要因为Ingestion Time在数据接入过程生成后,工夫戳就不再发生变化,和后续数据处理Operator所在机器的时钟没有 关系,从而不会因为某台机器时钟不同步或网络时延而导致计算结果不精确的问题。然而须要留神的是相比于Event Time,Ingestion Time不能解决乱序事件,所以也就不必生成对应的Watermarks
3.解决工夫(Processing Time)
解决工夫(Processing Time)是指数据在操作算子计算过程中获取到的所在主机工夫。当用户抉择应用Processing Time时,所有和时 间相干的计算算子,例如Windows计算,在以后的工作中所有的算子将间接应用其所在主机的零碎工夫。Processing Time是Flink零碎中最简略的一种工夫概念,基于Processing Time工夫概念,Flink的程序性能绝对较高,延时也绝对较低,对接入到零碎中的数据工夫相干的计算齐全交给算子外部决定,工夫窗口计算依赖的工夫都是在具体算子运行的过程中产生,不须要做任何工夫上的比照和协调。但 Processing Time工夫概念尽管在性能和易用性的角度上具备劣势,但思考到对数据乱序解决的状况,Processing Time就不是最优的抉择。同时在分布式系统中,数据自身不乱序,但每台机器的工夫如果不同步,也可能导致数据处理过程中数据乱序的问题,从而影响计算结果。总之,Processing Time概念实用于工夫计算精度要求不是特地高的计算场景,例如统计某些延时十分高的日志数据等。
4.工夫概念指定
在Flink中默认状况下应用是Process Time工夫概念,如果用户抉择应用Event Time或者Ingestion Time概念,则须要在创立的 StreamExecutionEnvironment中调用setStreamTimeCharacteristic()办法设定零碎的工夫概念,如下代码应用 TimeCharacteristic.EventTime作为零碎的工夫概念,这样对以后的StreamExecutionEnvironment会全局失效
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
setStreamTimeCharacteristic 在Flink1.14中曾经不倡议应用了,进入到setStreamTimeCharacteristic底层源码,能够看到底层是调用了getConfig().setAutoWatermarkInterval
@Deprecated
@PublicEvolving
public void setStreamTimeCharacteristic(TimeCharacteristic characteristic) {
this.timeCharacteristic = (TimeCharacteristic)Preconditions.checkNotNull(characteristic);if (characteristic == TimeCharacteristic.ProcessingTime) { this.getConfig().setAutoWatermarkInterval(0L);} else { this.getConfig().setAutoWatermarkInterval(200L);}
}
5.如何解决乱序问题
这里援用下 Apache Flink 漫谈系列 - Watermark是个啥? 这篇文章中的实例图片
当Watermark的工夫戳等于Event中携带的EventTime时候,也就是(Watermark=EventTime) 的计算形式如下
因为watermark = eventtime, 所以 在Eventtime等于15s的时候, windows2[10,15) 这个窗口就触发了进行了计算,而后在16s的时候乱序的11号数据达到,被抛弃。
如果想正确处理迟来的数据能够定义Watermark生成策略为 Watermark = EventTime -5s, 如下
因为 Watermark = EventTime -5s,在Eventtime为15s的时候,Watermark等于15-5=10s, 并不会触发Window2窗口计算, 只有在Eventtime为20s的时候, Watermark等于20-5 = 15s ,触发Window2[10, 15) 的计算,因为乱序的11号记录在之前的16s时候达到了,所以Window2的计算蕴含了11号记录。
Watermark生成
从流解决原始设施产生事件,到Flink读取到数据,再到Flink多个算子解决数据,在这个过程中,会受到网络提早、数据乱序、背压、Failover等多种状况的影响,导致数据是乱序的。尽管大部分状况下没有问题,然而不得不在设计上思考此类异常情况,为了保障计算结果的正确性,须要期待数据,这带来了计算的提早。对于提早太久的数据,不能无限期地等上来,所以必须有一个机制,来保障特定的工夫后肯定会触发窗口进行计算。
比方基于事件工夫的Window创立后,具体该如何确定属于该Window的数据元素曾经全副达到。如果确定全副达到,就能够对Window的所有数据做窗口计算操作(如汇总、分组等),如果数据没有全副达到,则持续期待该窗口中的数据全副达到才开始解决。这种状况下就须要用到水位线(WaterMarks)机制,它可能掂量数据处理进度(表白数据达到的完整性),保障事件数据(全副)达到Flink零碎,或者在乱序及提早达到时,也可能像预期一样计算出正确并且间断的后果。Flink会将用读取进入零碎的最新事件工夫减去固定的工夫距离作为Watermark,该工夫距离为用户内部配置的反对最大提早达到的工夫长度,也就是说实践上认为不会有事件超过该距离达到,否则就认为是早退事件或异样事件。
通常Watermark在Source Function中生成,如果是并行计算的工作,在多个并行执行的Source Function中,互相独立产生各自的 Watermark。而Flink提供了额定的机制,容许在调用DataStream API操作(如map、filter等)之后,依据业务逻辑的须要,应用工夫戳和Watermark生成器批改数据记录的工夫戳和Watermark。
- Source Function
Source Function能够间接为数据元素调配工夫戳,同时也会向上游 发 送 Watermark 。 在 Source Function 中 为 数 据 分 配 了 时 间 戳 和Watermark就不用在DataStream API中应用了。须要留神的是:如果一个 timestamp 分 配 器 被 使 用 的 话 , 由 源 提 供 的 任 何 Timestamp 和Watermark都会被重写。
public class ExampleCountSource implements SourceFunction<Long>, CheckpointedFunction {
public void run(SourceContext<T> ctx) { ... T next = getNext(); // 为元素赋予工夫戳 ctx.collectWithTimestamp(next, next.getEventTimestamp()); // 生成Watermark发送到上游 if (next.hasWatermarkTime()){ ctx.emitWatermark(new Watermark(next.getWatermarkTime())); } ...}
}
另外须要留神下官网源码中的一段正文:
Sources may assign timestamps to elements and may manually emit watermarks.
- However, these are only interpreted if the streaming program runs on
- {@link TimeCharacteristic#EventTime}. On other time characteristics
- ({@link TimeCharacteristic#IngestionTime} and {@link TimeCharacteristic#ProcessingTime}),
- the watermarks from the source function are ignored.
如果以后flink环境中TimeCharacteristic 设置的工夫概念是EventTime, Source function会为元素调配 timestamp 和 触发watermark
如果以后flink环境中TimeCharacteristic 设置的工夫概念是IngestionTime 或 ProcessingTime, 那么source function中产生的watermark会疏忽掉。- DataStream API
DataStream API中应用的TimestampAssigner接口定义了工夫戳的提 取 行 为 , 其 有 两 个 不 同 接 AssignerWithPeriodicWatermarks 和AssignerWithPunctuatedWatermarks,别离代表了不同的Watermark生成策略
周期性Watermark策略 : 在 Flink 中 叫 作PeriodicWatermarkAssigner,周期性(肯定工夫距离或者达到肯定的记 录 条 数 ) 地 产 生 一 个 Watermark 。 在 实 际 的 生 产 中 使 用 周 期 性Watermark策略的时候,必须留神工夫和数据量,联合工夫和积攒条数两个维度持续周期性产生Watermark,否则在极其状况下会有很大的延时。这个策略又能够分为两种:
AscendingTimestamps:递增Watermark,作用在Flink SQL中的Rowtime属性上,Watermark=以后收到的数据元素的最大工夫戳-1,此处减1的目标是确保有最大工夫戳的事件不会被当做早退数据抛弃。
BoundedOutOfOrderTimestamps:固定提早Watermark,作用在Flink SQL的Rowtime属性上,Watermark=以后收到的数据元素的最大工夫戳-固定提早 。
每事件Watermark策略 :在 Flink 中 叫 作PuntuatedWatamarkAssigner ,对每一个事件都会尝试进行Watermark的生成,然而如果生成的Watermark是null或者Watermark小于之前的Watermark,则该Watermark不会发往上游,因为发往上游也不会有任何成果,不会触发任何窗口的执行。 在理论的生产中Punctuated形式在TPS很高的场景下会产生大量的Watermark,在肯定水平上会对上游算子造成压力,所以只有在实时性要求十分高的场景下才会抉择Punctuated的形式进行Watermark的生成。
- 多流的Watermark
Watermark 在 作 业 的 DAG 从 上 游 向 下 游 传 递 , 算 子 收 到 上 游Watermark后会更新其Watermark。如果新的Watermark大于算子的以后Watermark,则更新算子的Watermark为新Watermark,并发送给上游算子 。Watermark是在Source Function中生成或者在后续的DataStreamAPI中生成的。
Flink作业个别是并行执行的,作业蕴含多个Task,每个Task运行一个或一组算子(OperatorChain)实例,Task在生成Watermark的时候是互相独立的,也就是说在作业中存在多个并行的Watermark。 某些算子会有多个上游输出,如Union或keyBy、partition之后的算子。在Flink的底层执行模型上,多流输出会被合成为多个双流输出,所以对于多流Watermark的解决也就是双流Watermark的解决,无论是哪一个流的Watermark进入算子,都须要跟另一个流的以后算子进行 比 较 ,选 择 较 小 的 Watermark , 即Min ( input1Watermark,intput2Watermark ,与 算 子 当 前 的Watermark 比 较 , 如 果 大 于 算 子 当 前 的 Watermark , 则 更 新 算 子 的Watermark为新的Watermark,并发送给上游
如上图,Source算子产生各自的Watermark,并随着数据流流向上游的map算子,map算子是无状态计算,所以会将Watermark向下透 传。window算子收到上游两个输出的Watermark后,抉择其中较小的一个发送给上游,window(1)算子比拟Watermark 29和Watermark 14,抉择Watermark 14作为算子以后Watermark,并将Watermark 14发往上游,window(2)算子也采纳雷同的逻辑。
- 新的水印策略
在flink 1.11之前的版本中,提供了两种生成水印(Watermark)的策略,别离是AssignerWithPunctuatedWatermarks和AssignerWithPeriodicWatermarks,这两个接口都继承自TimestampAssigner接口。
用户想应用不同的水印生成形式,则须要实现不同的接口,然而这样引发了一个问题,对于想给水印增加一些通用的、公共的性能则变得复杂,因为咱们须要给这两个接口都同时增加新的性能,这样还造成了代码的反复。
所以为了防止代码的反复,在flink 1.11 中对flink的水印生成接口进行了重构,
4.1 新的水印生成接口
当咱们构建了一个DataStream之后,应用assignTimestampsAndWatermarks办法来结构水印,新的接口须要传入一个WatermarkStrategy对象。
DataStream#assignTimestampsAndWatermarks(WatermarkStrategy<T>)
WatermarkStrategy 这个接口是做什么的呢?这外面提供了很多动态的办法和带有缺省实现的办法,只有一个办法是非default和没有缺省实现的,就是上面的这个办法。
/**
- Instantiates a WatermarkGenerator that generates watermarks according to this strategy.
*/
@Override
WatermarkGenerator<T> createWatermarkGenerator(WatermarkGeneratorSupplier.Context context);
所以默认状况下,咱们只须要实现这个办法就行了,这个办法次要是返回一个 WatermarkGenerator,咱们在进入这里边看看。
@Public
public interface WatermarkGenerator<T> {
/**
- Called for every event, allows the watermark generator to examine and remember the
- event timestamps, or to emit a watermark based on the event itself.
*/
void onEvent(T event, long eventTimestamp, WatermarkOutput output);
/**
- Called periodically, and might emit a new watermark, or not.
* - <p>The interval in which this method is called and Watermarks are generated
- depends on {@link ExecutionConfig#getAutoWatermarkInterval()}.
*/
void onPeriodicEmit(WatermarkOutput output);
}
这个办法简单明了,次要是有两个办法:
onEvent :每个元素都会调用这个办法,如果咱们想依赖每个元素生成一个水印,而后发射到上游(可选,就是看是否用output来收集水印),咱们能够实现这个办法.
onPeriodicEmit : 如果数据量比拟大的时候,咱们每条数据都生成一个水印的话,会影响性能,所以这里还有一个周期性生成水印的办法。这个水印的生成周期能够这样设置:env.getConfig().setAutoWatermarkInterval(5000L);
咱们本人实现一个简略的周期性的发射水印的例子:
在这个onEvent办法里,咱们从每个元素里抽取了一个工夫字段,然而咱们并没有生成水印发射给上游,而是本人保留了在一个变量里,在onPeriodicEmit办法里,应用最大的日志工夫减去咱们想要的延迟时间作为水印发射给上游。
DataStream<Tuple2<String,Long>> withTimestampsAndWatermarks = dataStream.assignTimestampsAndWatermarks(
new WatermarkStrategy<Tuple2<String,Long>>(){ @Override public WatermarkGenerator<Tuple2<String,Long>> createWatermarkGenerator( WatermarkGeneratorSupplier.Context context){ return new WatermarkGenerator<Tuple2<String,Long>>(){ private long maxTimestamp; private long delay = 3000; @Override public void onEvent( Tuple2<String,Long> event, long eventTimestamp, WatermarkOutput output){ maxTimestamp = Math.max(maxTimestamp, event.f1); } @Override public void onPeriodicEmit(WatermarkOutput output){ output.emitWatermark(new Watermark(maxTimestamp - delay)); } }; }});
4.2 内置水印生成策略
为了不便开发,flink提供了一些内置的水印生成办法供咱们应用。
4.2.1 固定提早生成水印
通过静态方法forBoundedOutOfOrderness提供,入参接管一个Duration类型的工夫距离,也就是咱们能够承受的最大的延迟时间.应用这种提早策略的时候须要咱们对数据的延迟时间有一个大略的预估判断。
WatermarkStrategy#forBoundedOutOfOrderness(Duration maxOutOfOrderness)
咱们实现一个提早3秒的固定提早水印,能够这样做:
DataStream dataStream = ...... ;
dataStream.assignTimestampsAndWatermarks(WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(3)));
他的底层应用的WatermarkGenerator接口的一个实现类BoundedOutOfOrdernessWatermarks。
@Override
public void onEvent(T event, long eventTimestamp, WatermarkOutput output) {
maxTimestamp = Math.max(maxTimestamp, eventTimestamp);
}
@Override
public void onPeriodicEmit(WatermarkOutput output) {
output.emitWatermark(new Watermark(maxTimestamp - outOfOrdernessMillis - 1));
}
4.2.2 枯燥递增生成水印
通过静态方法forMonotonousTimestamps来提供.
WatermarkStrategy.forMonotonousTimestamps()
这个也就是相当于上述的提早策略去掉了延迟时间,以event中的工夫戳充当了水印。
在程序中能够这样应用:
DataStream dataStream = ...... ;
dataStream.assignTimestampsAndWatermarks(WatermarkStrategy.forMonotonousTimestamps());
它的底层实现是AscendingTimestampsWatermarks,其实它就是BoundedOutOfOrdernessWatermarks类的一个子类,没有了延迟时间,咱们来看看具体源码的实现.
@Public
public class AscendingTimestampsWatermarks<T> extends BoundedOutOfOrdernessWatermarks<T> {
/**
- Creates a new watermark generator with for ascending timestamps.
*/
public AscendingTimestampsWatermarks() {
super(Duration.ofMillis(0));
}
}
4.2.3 eventtime的获取
上述咱们讲了flink自带的两种水印生成策略,然而对于咱们应用eventtime语义的时候,咱们想从咱们的本人的数据中抽取eventtime,这个就须要TimestampAssigner了.
@Public
@FunctionalInterface
public interface TimestampAssigner<T> {
............
long extractTimestamp(T element, long recordTimestamp);
}
应用的时候咱们次要就是从咱们本人的元素element中提取咱们想要的eventtime。
应用flink自带的水印策略和eventtime抽取类,能够这样用:
List<Tuple3<String, Long, Timestamp>> rateHistoryData = Lists.newArrayList();
rateHistoryData.add(new Tuple3<>("US Dollar", 102L, new Timestamp(1L)));rateHistoryData.add(new Tuple3<>("Euro", 114L, new Timestamp(1L)));rateHistoryData.add(new Tuple3<>("Yen", 1L, new Timestamp(1L)));rateHistoryData.add(new Tuple3<>("Euro", 116L, new Timestamp(5L)));rateHistoryData.add(new Tuple3<>("Euro", 119L, new Timestamp(7L)));DataStream<Tuple3<String, Long, Timestamp>> rateStream = env.fromCollection(rateHistoryData) .assignTimestampsAndWatermarks( WatermarkStrategy // here <Tuple3< String, Long, Timestamp> is needed for using withTimestampAssigner .<Tuple3<String, Long, Timestamp>>forBoundedOutOfOrderness(Duration.ofSeconds(10)) .withTimestampAssigner((event, timestamp) -> event.f2.getTime()) );
其中.withTimestampAssigner((event, timestamp) -> event.f2.getTime()) 中的(event, timestamp) -> event.f2.getTime() 是一个闭包,用来标识如何从元素中失去eventtime. 在这个例子中,元素类型是元组 Tuple3, 该元组中第三个元素也就是对应的f2就是事件的产生工夫。
Event Time Temporal Join
概念介绍
Event Time Temporal joins 能够利用 versioned table执行join操作,也就是说一个表能够应用变动中的元素进行加宽,通过查问在特定工夫点上值。
Temporal joins 承受一个左表,会把左表中每一行数据在右表( versioned table )中找到对应的行数据进行关联。 Flink应用SQL:2011规范中的 FOR SYSTEM_TIME AS OF SQL 语法执行这个操作。 Temporal joins的语法如下
SELECT [column_list]
FROM table1 [AS <alias1>]
[LEFT] JOIN table2 FOR SYSTEM_TIME AS OF table1.{ proctime | rowtime } [AS <alias2>]
ON table1.column-name1 = table2.column-name1
借助 event-time 属性(也称为 rowtime 属性), 就能够检索失去特定key在过来某个工夫点上的值,从而能够把两个表中在雷同工夫点上的数据进行join。 versioned table保留了不同版本也就是在不同工夫点的数据。
举例,假如咱们有一个订单的表,不同的订单中价格应用的货币不一样,而货币在不同时刻的汇率也不一样,为了应用对立的货币例如人民币计算各个订单的消费额,每一个订单须要和订单产生时的货币汇率进行join。
外部实现原理
能够去 孙金城 大佬的文章 Apache Flink 漫谈系列(11) - Temporal Table JOIN ,不过这篇文章比拟老了,外面的代码还是应用 LATERAL TABLE 语法进行join,不过外面的外部实现原理的示意图倒是对钻研有肯定的参考意义。
注意事项
event-time temporal join 是通过左右输出流中的watermark触发的,所以要确保join两边的watermark设置正确;
The event-time temporal join 要求 temporal join 条件等式中必须蕴含主键;
样例代码
- Table function join
Flink晚期是应用 LATERAL TABLE 语法联合 Table function 实现的 Temporal table join , 这种形式没有FOR SYSTEM_TIME AS OF语法中对于watermark和主键的强制要求,具体应用形式能够看上面的代码:
1.0 环境初始化
首先初始化StreamExecutionEnvironment env 和 StreamTableEnvironment tEnv, 如下:
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setRuntimeMode(RuntimeExecutionMode.STREAMING);env.setParallelism(1);env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
env.setRuntimeMode(RuntimeExecutionMode.STREAMING); 设置以后运行模式为 STREAMING模式
env.setParallelism(1); 设置并行度是1次要是为测试的目标,便于察看join的后果
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime); 设置工夫属性是 EventTime,
1.1 创立和注册左表 Orders
构建填充到Orders中的数据
List<Tuple3<Long, String, Timestamp>> orderData = Lists.newArrayList();
orderData.add(new Tuple3<>(2L, "Euro", new Timestamp(2L)));orderData.add(new Tuple3<>(1L, "US Dollar", new Timestamp(3L)));orderData.add(new Tuple3<>(50L, "Yen", new Timestamp(4L)));orderData.add(new Tuple3<>(3L, "Euro", new Timestamp(5L)));orderData.add(new Tuple3<>(5L, "Euro", new Timestamp(9L)));
基于List构建DataStream,并通过assignTimestampsAndWatermarks 设置工夫戳和watermark,而后基于DataStream中构建Table并注册为表名 Orders。因为Tuple3构建的DataStream中的列名默认别离是f0,f1,f2, 这里 $("amount"), $("currency"), $("eventtime").rowtime() 的作用就是把列名改为了 amount, currency, eventtime,同时将eventtime标记为rowtime()。
DataStream<Tuple3<Long, String, Timestamp>> orderStream = env.fromCollection(orderData)
.assignTimestampsAndWatermarks( WatermarkStrategy .<Tuple3<Long, String, Timestamp>>forBoundedOutOfOrderness(Duration.ofSeconds(5)) .withTimestampAssigner((event, timestamp) -> event.f2.getTime()) );Table orderTable = tEnv.fromDataStream(orderStream, $("amount"), $("currency"), $("eventtime").rowtime());// here we use rowtiime()tEnv.registerTable("Orders", orderTable);
留神这里尽管应用了assignTimestampsAndWatermarks也设置了Watermark,然而在本例中并没有用到,因为 LATERAL TABLE 语法只须要Timestamp提取eventtime。
打印Schema和表外面的内容
orderTable.printSchema();//only for debug
tEnv.from("Orders").execute().print(); // only for debug
(amount
BIGINT,currency
STRING,eventtime
TIMESTAMP(3) ROWTIME
)
op | amount | currency | eventtime |
---|---|---|---|
+I | 2 | Euro | 1970-01-01 00:00:00.002 |
+I | 1 | US Dollar | 1970-01-01 00:00:00.003 |
+I | 50 | Yen | 1970-01-01 00:00:00.004 |
+I | 3 | Euro | 1970-01-01 00:00:00.005 |
+I | 5 | Euro | 1970-01-01 00:00:00.009 |
1.2 创立和注册右表 RatesHistory
构建数据
List<Tuple3<String, Long, Timestamp>> rateHistoryData = Lists.newArrayList();
rateHistoryData.add(new Tuple3<>("US Dollar", 102L, new Timestamp(1L)));rateHistoryData.add(new Tuple3<>("Euro", 114L, new Timestamp(1L)));rateHistoryData.add(new Tuple3<>("Yen", 1L, new Timestamp(1L)));rateHistoryData.add(new Tuple3<>("Euro", 116L, new Timestamp(5L)));rateHistoryData.add(new Tuple3<>("Euro", 119L, new Timestamp(7L)));
基于List构建DataStream,并通过assignTimestampsAndWatermarks 设置工夫戳和watermark,而后基于DataStream中构建Table并注册为表名 RatesHistory。因为Tuple3构建的DataStream中的列名默认别离是f0,f1,f2, 这里 $("amount"), $("currency"), $("eventtime").rowtime() 的作用就是把列名改为了 currency, rate, eventtime,同时将eventtime标记为rowtime()。
DataStream<Tuple3<String, Long, Timestamp>> rateStream = env.fromCollection(rateHistoryData)
.assignTimestampsAndWatermarks( WatermarkStrategy // here <Tuple3< String, Long, Timestamp> is needed for using withTimestampAssigner .<Tuple3<String, Long, Timestamp>>forBoundedOutOfOrderness(Duration.ofSeconds(10)) .withTimestampAssigner((event, timestamp) -> event.f2.getTime()) );Table rateTable = tEnv.fromDataStream(rateStream, $("currency"), $("rate"), $("eventime").rowtime());tEnv.registerTable("RatesHistory", rateTable);
打印Schema和表外面的内容
rateTable.printSchema();//only for debug
tEnv.from("RatesHistory").execute().print(); // only for debug
(currency
STRING,rate
BIGINT,eventime
TIMESTAMP(3) ROWTIME
)
op | currency | rate | eventime |
---|---|---|---|
+I | US Dollar | 102 | 1970-01-01 00:00:00.001 |
+I | Euro | 114 | 1970-01-01 00:00:00.001 |
+I | Yen | 1 | 1970-01-01 00:00:00.001 |
+I | Euro | 116 | 1970-01-01 00:00:00.005 |
+I | Euro | 119 | 1970-01-01 00:00:00.007 |
1.3 注册表函数
基于rateTable创立和注册为表函数 Rates,如下:
tEnv.registerFunction(
"Rates", rateTable.createTemporalTableFunction("eventime", "currency"));
须要留神createTemporalTableFunction的第一个参数传入工夫戳所在列,第二个参数传入两表联结时进行匹配的列,例如在咱们的例子中,Orders 表和 RatesHistory 表基于currency进行匹配。
1.4 执行联结查问
执行上面的联结查问语句:
String sqlQuery =
"SELECT o.eventtime, o.currency, o.amount, r.rate, " + " o.amount * r.rate as amount_sum " + "from " + " Orders AS o, " + " LATERAL TABLE (Rates(o.eventtime)) AS r " + "WHERE r.currency = o.currency";tEnv.sqlQuery(sqlQuery).execute().print();
查问后果如下:
op | eventtime | currency | amount | rate | amount_sum |
---|---|---|---|---|---|
+I | 1970-01-01 00:00:00.003 | US Dollar | 1 | 102 | 102 |
+I | 1970-01-01 00:00:00.004 | Yen | 50 | 1 | 50 |
+I | 1970-01-01 00:00:00.002 | Euro | 2 | 114 | 228 |
+I | 1970-01-01 00:00:00.005 | Euro | 3 | 116 | 348 |
+I | 1970-01-01 00:00:00.009 | Euro | 5 | 119 | 595 |
能够看到在不同的eventtime上,Euro 对应的rate不一样,最终计算总和的时候采纳两边相应工夫点的数据进行计算。
- FOR SYSTEM_TIME AS OF
2.0 环境初始化
首先初始化StreamExecutionEnvironment env 和 StreamTableEnvironment tEnv, 如下:
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setRuntimeMode(RuntimeExecutionMode.STREAMING);env.setParallelism(1);env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
env.setRuntimeMode(RuntimeExecutionMode.STREAMING); 设置以后运行模式为 STREAMING模式
env.setParallelism(1); 设置并行度是1次要是为测试的目标,便于察看join的后果
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime); 设置工夫属性是 EventTime,
2.1 创立和注册左表 Orders
构建填充到Orders中的数据
List<Tuple3<Long, String, Timestamp>> orderData = Lists.newArrayList();
orderData.add(new Tuple3<>(2L, "Euro", new Timestamp(2000L)));orderData.add(new Tuple3<>(1L, "US Dollar", new Timestamp(3000L)));orderData.add(new Tuple3<>(50L, "Yen", new Timestamp(4000L)));orderData.add(new Tuple3<>(3L, "Euro", new Timestamp(5000L)));orderData.add(new Tuple3<>(5L, "Euro", new Timestamp(9000L)));
基于List构建DataStream,并通过assignTimestampsAndWatermarks 设置工夫戳和watermark.
DataStream<Tuple3<Long, String, Timestamp>> orderStream = env.fromCollection(orderData)
.assignTimestampsAndWatermarks( WatermarkStrategy .<Tuple3<Long, String, Timestamp>>forBoundedOutOfOrderness(Duration.ofSeconds(10000)) .withTimestampAssigner((event, timestamp) -> event.f2.getTime()) );
而后基于DataStream构建Table并注册为表名 Orders。
Table orderTable = tEnv.fromDataStream(orderStream, Schema.newBuilder()
.columnByExpression("rowtime", "CAST(f2 AS TIMESTAMP(3))") .watermark("rowtime", "SOURCE_WATERMARK()") .build());tEnv.registerTable("Orders", orderTable);
留神这里创立 Table的API与后面Table function join的不同,fromDataStream的第二个参数是Schema 类型,这段代码达到的性能是:
主动从DataStream中继承所有的物理列
通过创立一个计算列来拜访流记录中的工夫戳,对应代码中columnByExpression (这里是创立了一个rowtime属性列)
继承DataStream中已有的watermark, 对应代码中("rowtime", "SOURCE_WATERMARK()") ,这种用法的前提是DataStream中曾经通过定义了相干的watermark 策略,在本例中咱们应用 assignTimestampsAndWatermarks 对orderStream增加了watermark。
打印Schema和表外面的内容
orderTable.printSchema();//only for debug
tEnv.from("Orders").execute().print(); // only for debug
(f0
BIGINT NOT NULL,f1
STRING,f2
TIMESTAMP(9),rowtime
TIMESTAMP(3) ROWTIME AS CAST(f2 AS TIMESTAMP(3)),
WATERMARK FOR rowtime
: TIMESTAMP(3) AS SOURCE_WATERMARK()
)
op | f0 | f1 | f2 | rowtime |
---|---|---|---|---|
+I | 2 | Euro | 1970-01-01 08:00:02.000000000 | 1970-01-01 08:00:02.000 |
+I | 1 | US Dollar | 1970-01-01 08:00:03.000000000 | 1970-01-01 08:00:03.000 |
+I | 50 | Yen | 1970-01-01 08:00:04.000000000 | 1970-01-01 08:00:04.000 |
+I | 3 | Euro | 1970-01-01 08:00:05.000000000 | 1970-01-01 08:00:05.000 |
+I | 5 | Euro | 1970-01-01 08:00:09.000000000 | 1970-01-01 08:00:09.000 |
能够看到orderTable的Schema中多了 上面两条属性信息,别离对应代码中设置的rowtime 和 watermark
rowtime
TIMESTAMP(3) ROWTIME AS CAST(f2 AS TIMESTAMP(3)),
WATERMARK FOR rowtime
: TIMESTAMP(3) AS SOURCE_WATERMARK()
2.2 创立和注册右表 RatesHistory
构建填充到 RatesHistory 中的数据
List<Tuple3<String, Long, Timestamp>> rateHistoryData = Lists.newArrayList();
rateHistoryData.add(new Tuple3<>("US Dollar", 102L, new Timestamp(1000L)));rateHistoryData.add(new Tuple3<>("Euro", 114L, new Timestamp(1000L)));rateHistoryData.add(new Tuple3<>("Yen", 1L, new Timestamp(1000L)));rateHistoryData.add(new Tuple3<>("Euro", 116L, new Timestamp(5000L)));rateHistoryData.add(new Tuple3<>("Euro", 119L, new Timestamp(7000L)));
基于List构建DataStream,并通过assignTimestampsAndWatermarks 设置工夫戳和watermark.
DataStream<Tuple3<String, Long, Timestamp>> rateStream = env.fromCollection(rateHistoryData)
.assignTimestampsAndWatermarks( WatermarkStrategy // here <Tuple3< String, Long, Timestamp> is need for using withTimestampAssigner .<Tuple3<String, Long, Timestamp>>forBoundedOutOfOrderness(Duration.ofSeconds(1000)) .withTimestampAssigner((event, timestamp) -> event.f2.getTime()) );
而后基于DataStream构建Table并注册为表名 Orders。
Table rateTable = tEnv.fromDataStream(
rateStream, Schema.newBuilder() .columnByExpression("rowtime", "CAST(f2 AS TIMESTAMP(3))") .watermark("rowtime", "SOURCE_WATERMARK()") .primaryKey("f0") .build());tEnv.createTemporaryView("RatesHistory", rateTable);
留神这里RatesHistory的Schema中多了一个 primaryKey("f0"), 否则在执行join的时候会报错
Invalid primary key 'PK_f0'. Column 'f0' is nullable.
因为在Flink中,如果要反对 FOR SYSTEM_TIME AS OF, 右表中必须要有主键,而且这个主键对应join操作中on等值匹配的列。
打印Schema和表外面的内容
rateTable.printSchema();//only for debug
tEnv.from("RatesHistory").execute().print();
(f0
STRING NOT NULL,f1
BIGINT NOT NULL,f2
TIMESTAMP(9),rowtime
TIMESTAMP(3) ROWTIME AS CAST(f2 AS TIMESTAMP(3)),
WATERMARK FOR rowtime
: TIMESTAMP(3) AS SOURCE_WATERMARK(),
CONSTRAINT PK_f0
PRIMARY KEY (f0
) NOT ENFORCED
)
op | f0 | f1 | f2 | rowtime |
---|---|---|---|---|
+I | US Dollar | 102 | 1970-01-01 08:00:01.000000000 | 1970-01-01 08:00:01.000 |
+I | Euro | 114 | 1970-01-01 08:00:01.000000000 | 1970-01-01 08:00:01.000 |
+I | Yen | 1 | 1970-01-01 08:00:01.000000000 | 1970-01-01 08:00:01.000 |
+I | Euro | 116 | 1970-01-01 08:00:05.000000000 | 1970-01-01 08:00:05.000 |
+I | Euro | 119 | 1970-01-01 08:00:07.000000000 | 1970-01-01 08:00:07.000 |
能够看到rateTable的Schema中多了 上面三条属性信息,别离对应代码中设置的rowtime(计算列) , watermark和 primaryKey
rowtime
TIMESTAMP(3) ROWTIME AS CAST(f2 AS TIMESTAMP(3)),
WATERMARK FOR rowtime
: TIMESTAMP(3) AS SOURCE_WATERMARK(),
CONSTRAINT PK_f0
PRIMARY KEY (f0
) NOT ENFORCED
2.3 执行join
String sqlQuery2 =
"SELECT o.f1 as currency, o.f0 as amount, o.rowtime, r.f1 as rate, " + " o.f0 * r.f1 as amount_sum " + "from " + " Orders AS o " + " JOIN RatesHistory FOR SYSTEM_TIME AS OF o.rowtime AS r " + "ON o.f1 = r.f0";tEnv.sqlQuery(sqlQuery2).execute().print();
失去
op | currency | amount | rowtime | rate | amount_sum |
---|---|---|---|---|---|
+I | US Dollar | 1 | 1970-01-01 08:00:03.000 | 102 | 102 |
+I | Yen | 50 | 1970-01-01 08:00:04.000 | 1 | 50 |
+I | Euro | 2 | 1970-01-01 08:00:02.000 | 114 | 228 |
+I | Euro | 3 | 1970-01-01 08:00:05.000 | 116 | 348 |
+I | Euro | 5 | 1970-01-01 08:00:09.000 | 119 | 595 |
- 其余
在Flink v14.3中registerTable这个API曾经不倡议应用了,能够改为 tEnv.createTemporaryView
参考资料
Apache Flink 漫谈系列(11) - Temporal Table JOIN https://developer.aliyun.com/...
动静表 https://nightlies.apache.org/...动静表-dynamic-table
Apache Flink 漫谈系列 - Watermark是个啥? https://mp.weixin.qq.com/s?__...
Apache Flink 漫谈系列(09) - JOIN 算子 https://developer.aliyun.com/...
flink教程-聊聊 flink 1.11 中新的水印策略 https://cloud.tencent.com/dev...
聊聊flink的TableFunction https://segmentfault.com/a/11...
Event Time Temporal Join https://nightlies.apache.org/...
DataStream API https://nightlies.apache.org/...
《flink内核原理与实现》 4.4章节 水印
《Flink原理、实战与性能优化》4.2章节 工夫概念与Watermark
Watermark 机制 https://www.bilibili.com/vide...