关于flink:Flink-temporal-table-join研究

9次阅读

共计 29694 个字符,预计需要花费 75 分钟才能阅读完成。

作者: 王东阳
前言
ANSI-SQL 2011 中提出了 Temporal 的概念,Oracle,SQLServer,DB2 等大的数据库厂商也先后实现了这个规范。Temporal Table 记录了历史上任何工夫点所有的数据改变,Temporal Table 具备一般 table 的个性,有具体独特的 DDL/DML/QUERY 语法,工夫是其外围属性。历史意味着工夫,意味着快照 Snapshot。

Apache Flink 遵循 ANSI-SQL 规范,Apache Flink 中 Temporal Table 的概念也源于 ANSI-2011 的规范语义,但目前的实现在语法层面和 ANSI-SQL 略有差异,下面看到 ANSI-2011 中应用 FOR SYSTEM_TIME AS OF 的语法,Apache Flink 在晚期版本中仅仅反对 LATERAL TABLE(TemporalTableFunction)的语法,以后 flinkv14 版本中曾经反对 FOR SYSTEM_TIME AS OF 语法。

因为 Flink 中基于 eventtime 的 temporal table join 基于 flink 的 watermark 机制实现,为了更好的让读者了解,本文首先介绍 flink 中的 动静表和时序表,工夫概念,Watermark 等相干常识,最初通过具体的代码用例介绍 Flink 中基于 eventtime 的 temporal table join 用法。

动静表和时序表
动静表
什么是动静表
动静表 是 Flink 的反对流数据的 Table API 和 SQL 的外围概念。与示意批处理数据的动态表不同,动静表是随工夫变动的。能够像查问动态批处理表一样查问它们。查问动静表将生成一个 间断查问。一个间断查问永远不会终止,后果会生成一个动静表。查问不断更新其 (动静) 后果表,以反映其 (动静) 输出表上的更改。实质上,动静表上的间断查问十分相似于定义物化视图的查问。

动静表能够像一般数据库表一样通过 INSERT、UPDATE 和 DELETE 来一直批改。它可能是一个只有一行、不断更新的表,也可能是一个 insert-only 的表,没有 UPDATE 和 DELETE 批改,或者介于两者之间的其余表。

动静表转换
在将动静表转换为流或将其写入内部零碎时,须要对这些更改进行编码。Flink 的 Table API 和 SQL 反对三种形式来编码一个动静表的变动:

Append-only 流:仅通过 INSERT 操作批改的动静表能够通过输入插入的行转换为流。
Retract 流:retract 流蕴含两种类型的 message:add messages 和 retract messages。通过将 INSERT 操作编码为 add message、将 DELETE 操作编码为 retract message、将 UPDATE 操作编码为更新 (先前) 行的 retract message 和更新 (新) 行的 add message,将动静表转换为 retract 流。下图显示了将动静表转换为 retract 流的过程。

Upsert 流: upsert 流蕴含两种类型的 message:upsert messages 和 delete messages。转换为 upsert 流的动静表须要 (可能是组合的) 惟一键。通过将 INSERT 和 UPDATE 操作编码为 upsert message,将 DELETE 操作编码为 delete message,将具备惟一键的动静表转换为流。生产流的算子须要晓得惟一键的属性,以便正确地利用 message。与 retract 流的次要区别在于 UPDATE 操作是用单个 message 编码的,因而效率更高。下图显示了将动静表转换为 upsert 流的过程。

Flink 在将动静表转换为 DataStream 时,只反对 append 流和 retract 流。前面的样例代码中会展现转换的 API 以及 Retract 流和 Upsert 流的不同。

时态表(Temporal Tables)
时态表(Temporal Table)是一张随工夫变动的表, 在 Flink 中称为动静表,时态表中的每条记录都关联了一个或多个时间段,所有的 Flink 表都是时态的(动静的)。也就是说时态表是动静表的特例,时态表肯定是动静表,动静表不肯定是时态表。
时态表蕴含表的一个或多个有版本的表快照,时态表能够是一张跟踪所有变更记录的表(例如数据库表的 changelog,蕴含多个表快照),也能够是物化所有变更之后的表(例如数据库表,只有最新表快照)。
Flink 应用主键束缚和事件工夫来定义一张版本表和版本视图,在前面介绍 temporal join 的相干样例中会展现这两种。
样例代码

  1. 环境初始化
    首先初始化 StreamExecutionEnvironment env 和 StreamTableEnvironment tEnv,如下:

final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

env.setRuntimeMode(RuntimeExecutionMode.STREAMING);
env.setParallelism(1);
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);

env.setRuntimeMode(RuntimeExecutionMode.STREAMING); 设置以后运行模式为 STREAMING 模式
env.setParallelism(1); 设置并行度是 1 次要是为测试的目标,便于察看 join 的后果
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime); 设置工夫属性是 EventTime

  1. 首先创立并注册一个一般表 RatesHistory
    DataStream<Row> ratesStream = env.fromElements(

         Row.of(LocalDateTime.parse("2021-08-21T09:02:00"), "US Dollar", 102),
         Row.of(LocalDateTime.parse("2021-08-21T09:00:00"), "Euro", 114),
         Row.of(LocalDateTime.parse("2021-08-21T09:00:00"), "Yen", 1),
         Row.of(LocalDateTime.parse("2021-08-21T10:45:00"), "Euro", 116),
         Row.of(LocalDateTime.parse("2021-08-21T11:15:00"), "Euro", 119),
         Row.of(LocalDateTime.parse("2021-08-21T11:49:00"), "Pounds", 108))
     .returns(
         Types.ROW_NAMED(new String[] {"currency_time", "currency", "rate"},
             Types.LOCAL_DATE_TIME, Types.STRING, Types.INT));
    
Table rateTable = tEnv.fromDataStream(ratesStream, Schema.newBuilder().build());
tEnv.registerTable("RatesHistory", rateTable);
rateTable.printSchema();
tEnv.from("RatesHistory").execute().print();

失去 RatesHistory 的 schema 信息以及表中内容:

(
currency_time TIMESTAMP(9),
currency STRING,
rate INT
)

op currency_time currency rate
+I 2021-08-21 09:02:00.000000000 US Dollar 102
+I 2021-08-21 09:00:00.000000000 Euro 114
+I 2021-08-21 09:00:00.000000000 Yen 1
+I 2021-08-21 10:45:00.000000000 Euro 116
+I 2021-08-21 11:15:00.000000000 Euro 119
+I 2021-08-21 11:49:00.000000000 Pounds 108

6 rows in set

  1. 申明版本表
    在 Flink 中,定义了主键束缚和事件工夫属性的表就是版本表。相比下面的代码,在应用 fromDataStream 的第二个参数 Schema 外面,通过 columnByExpression 指定事件工夫的工夫戳(flink 中要求必须是 TIMESTAMP(3)), 通过 primaryKey(“currency”) 指定 currency 主键束缚。

// version table

Table versionedTable = tEnv.fromDataStream(ratesStream, Schema.newBuilder()
        .columnByExpression("rowtime", "CAST(currency_time AS TIMESTAMP(3))")
        .primaryKey("currency")
    .build());
tEnv.registerTable("versionRate", versionedTable);
System.out.println("versioned table get");
versionedTable.printSchema();
tEnv.from("versionRate").execute().print();

打印 versionRate 的 schema 信息以及表中内容:

(
currency_time TIMESTAMP(9),
currency STRING NOT NULL,
rate INT,
rowtime TIMESTAMP(3) AS CAST(currency_time AS TIMESTAMP(3)),
CONSTRAINT PK_currency PRIMARY KEY (currency) NOT ENFORCED
)

op currency_time currency rate rowtime
+I 2021-08-21 09:02:00.000000000 US Dollar 102 2021-08-21 09:02:00.000
+I 2021-08-21 09:00:00.000000000 Euro 114 2021-08-21 09:00:00.000
+I 2021-08-21 09:00:00.000000000 Yen 1 2021-08-21 09:00:00.000
+I 2021-08-21 10:45:00.000000000 Euro 116 2021-08-21 10:45:00.000
+I 2021-08-21 11:15:00.000000000 Euro 119 2021-08-21 11:15:00.000
+I 2021-08-21 11:49:00.000000000 Pounds 108 2021-08-21 11:49:00.000
  1. 申明版本视图
    Flink 也反对定义版本视图只有一个视图蕴含主键和事件工夫便是一个版本视图。为了在 RatesHistory 上定义版本表,Flink 反对通过去重查问定义版本视图,去重查问能够产出一个有序的 changelog 流,去重查问可能推断主键并保留原始数据流的事件工夫属性。

// https://nightlies.apache.org/…

Table versionedRateView = tEnv.sqlQuery("select currency, rate, currency_time" + // (1) `currency_time` 保留了事件工夫
        "from (" +
        "select *," +
        "ROW_NUMBER() OVER (PARTITION BY currency" + //(2) `currency` 是去重 query 的 unique key,作为主键
        "ORDER BY currency_time DESC) AS rowNum" +
        "FROM RatesHistory )" +
        "WHERE rowNum = 1");
tEnv.createTemporaryView("versioned_rates", versionedRateView);
versionedRateView.printSchema();
tEnv.from("versioned_rates").execute().print();

对于去重语法中的相干参数形容如下

Parameter Specification:

ROW_NUMBER(): 给每一行调配一个从 1 开始的递增的惟一的序号。
PARTITION BY col1[, col2…]: 指定分区列.
ORDER BY time_attr [asc|desc]: 指定排序所基于的列, 必须是 time attribute. 以后 Flink 反对 processing time attribute 和 event time attribute. Ordering by ASC 象征保留最老的那列, ordering by DESC 象征保留最新的那列.
WHERE rownum = 1: rownum = 1 用于 Flink 获取到去重后的数据。
行 (1) 保留了事件工夫作为视图 versioned_rates 的事件工夫,行 (2) 使得视图 versioned_rates 有了主键, 因而视图 versioned_rates 是一个版本视图。

视图中的去重 query 会被 Flink 优化并高效地产出 changelog stream, 产出的 changelog 保留了主键束缚和事件工夫。

打印 schema 和 versioned_rates 表中内容

(
currency STRING,
rate INT,
currency_time TIMESTAMP(9)
)

op currency rate currency_time
+I US Dollar 102 2021-08-21 09:02:00.000000000
+I Euro 114 2021-08-21 09:00:00.000000000
+I Yen 1 2021-08-21 09:00:00.000000000
-U Euro 114 2021-08-21 09:00:00.000000000
+U Euro 116 2021-08-21 10:45:00.000000000
-U Euro 116 2021-08-21 10:45:00.000000000
+U Euro 119 2021-08-21 11:15:00.000000000
+I Pounds 108 2021-08-21 11:49:00.000000000

能够看到 Euro 中一共有 5 条记录,属于 Retract 类型的 changelogstream,蕴含 UPDATE_BEFORE, UPDATE_AFTER 两种

| +I | Euro | 114 | 2021-08-21 09:00:00.000000000 |
| -U | Euro | 114 | 2021-08-21 09:00:00.000000000 |
| +U | Euro | 116 | 2021-08-21 10:45:00.000000000 |
| -U | Euro | 116 | 2021-08-21 10:45:00.000000000 |
| +U | Euro | 119 | 2021-08-21 11:15:00.000000000 |
这 5 条记录的具体生成过程如下:

当第一条 Euro 数据 LocalDateTime.parse(“2021-08-21T09:00:00”), “Euro”, 114 进入到 versioned_rates 所对应的底层 Flink 算子中时,因为以后 Euro 只有这一条记录,执行去重语法后就失去 Euro 114 2021-08-21 09:00:00.000000000
当第二条 Euro 数据 LocalDateTime.parse(“2021-08-21T10:45:00”), “Euro”, 116 进入到 versioned_rates 所对应的底层 Flink 算子中时,执行去重语法获取最新的记录就失去 Euro 116 2021-08-21 10:45:00.000000000,因为 currency 是主键,所以执行更新操作,生成 UPDATE_BEFORE,UPDATE_AFTER 两条 changelogstream。
当第三条 Euro 数据 LocalDateTime.parse(“2021-08-21T11:15:00”), “Euro”, 119 进入到 versioned_rates 所对应的底层 Flink 算子中时,执行去重语法获取最新的记录就失去 Euro 119 2021-08-21 11:15:00.000000000,因为 currency 是主键,所以执行更新操作,生成 UPDATE_BEFORE,UPDATE_AFTER 两条 changelogstream。

  1. 通过 table API 显示转为 datastream
    flink API 中提供了 toChangelogStream 接口

// default retract mode

tEnv.toChangelogStream(versionedRateView)
    .executeAndCollect()
    .forEachRemaining(System.out::println);

tEnv.toChangelogStream(versionedRateView, Schema.newBuilder().build(), ChangelogMode.upsert())
    .executeAndCollect()
    .forEachRemaining(System.out::println);

+I[US Dollar, 102, 2021-08-21T09:02]
+I[Euro, 114, 2021-08-21T09:00]
+I[Yen, 1, 2021-08-21T09:00]
-U[Euro, 114, 2021-08-21T09:00]
+U[Euro, 116, 2021-08-21T10:45]
-U[Euro, 116, 2021-08-21T10:45]
+U[Euro, 119, 2021-08-21T11:15]
+I[Pounds, 108, 2021-08-21T11:49]
失去输入

+I[US Dollar, 102, 2021-08-21T09:02]
+I[Euro, 114, 2021-08-21T09:00]
+I[Yen, 1, 2021-08-21T09:00]
+U[Euro, 116, 2021-08-21T10:45]
+U[Euro, 119, 2021-08-21T11:15]
+I[Pounds, 108, 2021-08-21T11:49]
水印 Watermark
此章节次要参考

《flink 内核原理与实现》4.4 章节 水印
《Flink 原理、实战与性能优化》4.2 章节 工夫概念与 Watermark
工夫概念
对于流式数据处理,最大的特点是数据上具备工夫的属性特色,Flimk 依据工夫产生的地位不同,将工夫辨别为三种工夫概念,别离为 事件生成工夫(Event Time)、事件接入工夫(Ingestion Time)和事件处理工夫(Processing Time)

1. 事件工夫(Event Time)
事件工夫(Event Time)是每个独立事件在产生它的设施上产生的工夫,这个工夫通常在事件进入 Flink 之前就曾经嵌入到事件中,时 间程序取决于事件产生的中央,和上游数据处理系统的工夫无关。事件数据具备不变的事件工夫属性,该工夫自事件元素产生就不会改 变。通常状况下能够在 Flink 零碎中指定事件工夫属性或者设定工夫提取器来提取事件工夫。

所有进入到 Flink 流式零碎解决的事件,其工夫都是在内部零碎中产生,通过网络进入到 Flink 零碎内解决的,在实践状况下(所有零碎 都具备雷同零碎时钟),事件工夫对应的工夫戳肯定会早于在 Flink 零碎中解决的工夫戳,但在理论状况中往往会呈现数据记录乱序、提早 达到等问题。基于 EventTime 的工夫概念,数据处理过程依赖于数据自身产生的工夫,而不是 Flink 零碎中 Operator 所在主机节点的零碎工夫,这样可能借助于事件产生时的工夫信息来还原事件的先后关系。

2. 接入工夫(Ingestion Time)
接入工夫(Ingestion Time)是数据进入 Flink 零碎的工夫,Ingestion Time 依赖于 Source Operator 所在主机的零碎时钟。Ingestion Time 介于 Event Time 和 Process Time 之间,绝对于 ProcessTime,Ingestion Time 生成的代价绝对较高,Ingestion Time 具备一 定的可预见性,次要因为 Ingestion Time 在数据接入过程生成后,工夫戳就不再发生变化,和后续数据处理 Operator 所在机器的时钟没有 关系,从而不会因为某台机器时钟不同步或网络时延而导致计算结果不精确的问题。然而须要留神的是相比于 Event Time,Ingestion Time 不能解决乱序事件,所以也就不必生成对应的 Watermarks

3. 解决工夫(Processing Time)
解决工夫(Processing Time)是指数据在操作算子计算过程中获取到的所在主机工夫。当用户抉择应用 Processing Time 时,所有和时 间相干的计算算子,例如 Windows 计算,在以后的工作中所有的算子将间接应用其所在主机的零碎工夫。Processing Time 是 Flink 零碎中最简略的一种工夫概念,基于 Processing Time 工夫概念,Flink 的程序性能绝对较高,延时也绝对较低,对接入到零碎中的数据工夫相干的计算齐全交给算子外部决定,工夫窗口计算依赖的工夫都是在具体算子运行的过程中产生,不须要做任何工夫上的比照和协调。但 Processing Time 工夫概念尽管在性能和易用性的角度上具备劣势,但思考到对数据乱序解决的状况,Processing Time 就不是最优的抉择。同时在分布式系统中,数据自身不乱序,但每台机器的工夫如果不同步,也可能导致数据处理过程中数据乱序的问题,从而影响计算结果。总之,Processing Time 概念实用于工夫计算精度要求不是特地高的计算场景,例如统计某些延时十分高的日志数据等。

4. 工夫概念指定
在 Flink 中默认状况下应用是 Process Time 工夫概念,如果用户抉择应用 Event Time 或者 Ingestion Time 概念,则须要在创立的 StreamExecutionEnvironment 中调用 setStreamTimeCharacteristic()办法设定零碎的工夫概念,如下代码应用 TimeCharacteristic.EventTime 作为零碎的工夫概念,这样对以后的 StreamExecutionEnvironment 会全局失效

final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

setStreamTimeCharacteristic 在 Flink1.14 中曾经不倡议应用了,进入到 setStreamTimeCharacteristic 底层源码,能够看到底层是调用了 getConfig().setAutoWatermarkInterval

@Deprecated
@PublicEvolving
public void setStreamTimeCharacteristic(TimeCharacteristic characteristic) {

this.timeCharacteristic = (TimeCharacteristic)Preconditions.checkNotNull(characteristic);
if (characteristic == TimeCharacteristic.ProcessingTime) {this.getConfig().setAutoWatermarkInterval(0L);
} else {this.getConfig().setAutoWatermarkInterval(200L);
}

}
5. 如何解决乱序问题
这里援用下 Apache Flink 漫谈系列 – Watermark 是个啥?这篇文章中的实例图片

当 Watermark 的工夫戳等于 Event 中携带的 EventTime 时候,也就是(Watermark=EventTime) 的计算形式如下

因为 watermark = eventtime, 所以 在 Eventtime 等于 15s 的时候,windows2[10,15) 这个窗口就触发了进行了计算,而后在 16s 的时候乱序的 11 号数据达到,被抛弃。

如果想正确处理迟来的数据能够定义 Watermark 生成策略为 Watermark = EventTime -5s,如下

因为 Watermark = EventTime -5s,在 Eventtime 为 15s 的时候,Watermark 等于 15-5=10s, 并不会触发 Window2 窗口计算,只有在 Eventtime 为 20s 的时候,Watermark 等于 20-5 = 15s,触发 Window2[10, 15) 的计算,因为乱序的 11 号记录在之前的 16s 时候达到了,所以 Window2 的计算蕴含了 11 号记录。

Watermark 生成
从流解决原始设施产生事件,到 Flink 读取到数据,再到 Flink 多个算子解决数据,在这个过程中,会受到网络提早、数据乱序、背压、Failover 等多种状况的影响,导致数据是乱序的。尽管大部分状况下没有问题,然而不得不在设计上思考此类异常情况,为了保障计算结果的正确性,须要期待数据,这带来了计算的提早。对于提早太久的数据,不能无限期地等上来,所以必须有一个机制,来保障特定的工夫后肯定会触发窗口进行计算。

比方基于事件工夫的 Window 创立后,具体该如何确定属于该 Window 的数据元素曾经全副达到。如果确定全副达到,就能够对 Window 的所有数据做窗口计算操作(如汇总、分组等),如果数据没有全副达到,则持续期待该窗口中的数据全副达到才开始解决。这种状况下就须要用到水位线(WaterMarks)机制,它可能掂量数据处理进度(表白数据达到的完整性),保障事件数据(全副)达到 Flink 零碎,或者在乱序及提早达到时,也可能像预期一样计算出正确并且间断的后果。Flink 会将用读取进入零碎的最新事件工夫减去固定的工夫距离作为 Watermark,该工夫距离为用户内部配置的反对最大提早达到的工夫长度,也就是说实践上认为不会有事件超过该距离达到,否则就认为是早退事件或异样事件。

通常 Watermark 在 Source Function 中生成,如果是并行计算的工作,在多个并行执行的 Source Function 中,互相独立产生各自的 Watermark。而 Flink 提供了额定的机制,容许在调用 DataStream API 操作(如 map、filter 等)之后,依据业务逻辑的须要,应用工夫戳和 Watermark 生成器批改数据记录的工夫戳和 Watermark。

  1. Source Function
    Source Function 能够间接为数据元素调配工夫戳,同时也会向上游 发 送 Watermark。在 Source Function 中 为 数 据 分 配 了 时 间 戳 和 Watermark 就不用在 DataStream API 中应用了。须要留神的是: 如果一个 timestamp 分 配 器 被 使 用 的 话,由 源 提 供 的 任 何 Timestamp 和 Watermark 都会被重写。

public class ExampleCountSource implements SourceFunction<Long>, CheckpointedFunction {

public void run(SourceContext<T> ctx) {
    ...
    T next = getNext();
    // 为元素赋予工夫戳
    ctx.collectWithTimestamp(next, next.getEventTimestamp());
    // 生成 Watermark 发送到上游
    if (next.hasWatermarkTime()){ctx.emitWatermark(new Watermark(next.getWatermarkTime()));
    }
    ...

}

}
另外须要留神下官网源码中的一段正文:

  • Sources may assign timestamps to elements and may manually emit watermarks.

    • However, these are only interpreted if the streaming program runs on
    • {@link TimeCharacteristic#EventTime}. On other time characteristics
    • ({@link TimeCharacteristic#IngestionTime} and {@link TimeCharacteristic#ProcessingTime}),
    • the watermarks from the source function are ignored.

    如果以后 flink 环境中 TimeCharacteristic 设置的工夫概念是 EventTime, Source function 会为元素调配 timestamp 和 触发 watermark
    如果以后 flink 环境中 TimeCharacteristic 设置的工夫概念是 IngestionTime 或 ProcessingTime, 那么 source function 中产生的 watermark 会疏忽掉。

  • DataStream API
    DataStream API 中应用的 TimestampAssigner 接口定义了工夫戳的提 取 行 为,其 有 两 个 不 同 接 AssignerWithPeriodicWatermarks 和 AssignerWithPunctuatedWatermarks,别离代表了不同的 Watermark 生成策略

周期性 Watermark 策略 : 在 Flink 中 叫 作 PeriodicWatermarkAssigner,周期性(肯定工夫距离或者达到肯定的记 录 条 数)地 产 生 一 个 Watermark。在 实 际 的 生 产 中 使 用 周 期 性 Watermark 策略的时候,必须留神工夫和数据量,联合工夫和积攒条数两个维度持续周期性产生 Watermark,否则在极其状况下会有很大的延时。这个策略又能够分为两种:
AscendingTimestamps:递增 Watermark,作用在 Flink SQL 中的 Rowtime 属性上,Watermark= 以后收到的数据元素的最大工夫戳 -1,此处减 1 的目标是确保有最大工夫戳的事件不会被当做早退数据抛弃。
BoundedOutOfOrderTimestamps:固定提早 Watermark,作用在 Flink SQL 的 Rowtime 属性上,Watermark= 以后收到的数据元素的最大工夫戳 - 固定提早。
每事件 Watermark 策略 : 在 Flink 中 叫 作 PuntuatedWatamarkAssigner,对每一个事件都会尝试进行 Watermark 的生成,然而如果生成的 Watermark 是 null 或者 Watermark 小于之前的 Watermark,则该 Watermark 不会发往上游,因为发往上游也不会有任何成果,不会触发任何窗口的执行。在理论的生产中 Punctuated 形式在 TPS 很高的场景下会产生大量的 Watermark,在肯定水平上会对上游算子造成压力,所以只有在实时性要求十分高的场景下才会抉择 Punctuated 的形式进行 Watermark 的生成。

  1. 多流的 Watermark
    Watermark 在 作 业 的 DAG 从 上 游 向 下 游 传 递,算 子 收 到 上 游 Watermark 后会更新其 Watermark。如果新的 Watermark 大于算子的以后 Watermark,则更新算子的 Watermark 为新 Watermark,并发送给上游算子。Watermark 是在 Source Function 中生成或者在后续的 DataStreamAPI 中生成的。

Flink 作业个别是并行执行的,作业蕴含多个 Task,每个 Task 运行一个或一组算子(OperatorChain)实例,Task 在生成 Watermark 的时候是互相独立的,也就是说在作业中存在多个并行的 Watermark。某些算子会有多个上游输出,如 Union 或 keyBy、partition 之后的算子。在 Flink 的底层执行模型上,多流输出会被合成为多个双流输出,所以对于多流 Watermark 的解决也就是双流 Watermark 的解决,无论是哪一个流的 Watermark 进入算子,都须要跟另一个流的以后算子进行 比 较,选 择 较 小 的 Watermark,即 Min(input1Watermark,intput2Watermark,与 算 子 当 前 的 Watermark 比 较,如 果 大 于 算 子 当 前 的 Watermark,则 更 新 算 子 的 Watermark 为新的 Watermark,并发送给上游

如上图,Source 算子产生各自的 Watermark,并随着数据流流向上游的 map 算子,map 算子是无状态计算,所以会将 Watermark 向下透 传。window 算子收到上游两个输出的 Watermark 后,抉择其中较小的一个发送给上游,window(1)算子比拟 Watermark 29 和 Watermark 14,抉择 Watermark 14 作为算子以后 Watermark,并将 Watermark 14 发往上游,window(2)算子也采纳雷同的逻辑。

  1. 新的水印策略
    在 flink 1.11 之前的版本中,提供了两种生成水印(Watermark)的策略,别离是 AssignerWithPunctuatedWatermarks 和 AssignerWithPeriodicWatermarks,这两个接口都继承自 TimestampAssigner 接口。

用户想应用不同的水印生成形式,则须要实现不同的接口,然而这样引发了一个问题,对于想给水印增加一些通用的、公共的性能则变得复杂,因为咱们须要给这两个接口都同时增加新的性能,这样还造成了代码的反复。

所以为了防止代码的反复,在 flink 1.11 中对 flink 的水印生成接口进行了重构,

4.1 新的水印生成接口
当咱们构建了一个 DataStream 之后,应用 assignTimestampsAndWatermarks 办法来结构水印,新的接口须要传入一个 WatermarkStrategy 对象。

DataStream#assignTimestampsAndWatermarks(WatermarkStrategy<T>)
WatermarkStrategy 这个接口是做什么的呢?这外面提供了很多动态的办法和带有缺省实现的办法,只有一个办法是非 default 和没有缺省实现的,就是上面的这个办法。

/**

  • Instantiates a WatermarkGenerator that generates watermarks according to this strategy.
    */

@Override
WatermarkGenerator<T> createWatermarkGenerator(WatermarkGeneratorSupplier.Context context);
所以默认状况下,咱们只须要实现这个办法就行了,这个办法次要是返回一个 WatermarkGenerator,咱们在进入这里边看看。

@Public
public interface WatermarkGenerator<T> {

/**

  • Called for every event, allows the watermark generator to examine and remember the
  • event timestamps, or to emit a watermark based on the event itself.
    */

void onEvent(T event, long eventTimestamp, WatermarkOutput output);

/**

  • Called periodically, and might emit a new watermark, or not.
    *
  • <p>The interval in which this method is called and Watermarks are generated
  • depends on {@link ExecutionConfig#getAutoWatermarkInterval()}.
    */

void onPeriodicEmit(WatermarkOutput output);
}
这个办法简单明了,次要是有两个办法:

onEvent:每个元素都会调用这个办法,如果咱们想依赖每个元素生成一个水印,而后发射到上游 (可选,就是看是否用 output 来收集水印),咱们能够实现这个办法.
onPeriodicEmit : 如果数据量比拟大的时候,咱们每条数据都生成一个水印的话,会影响性能,所以这里还有一个周期性生成水印的办法。这个水印的生成周期能够这样设置:env.getConfig().setAutoWatermarkInterval(5000L);
咱们本人实现一个简略的周期性的发射水印的例子:

在这个 onEvent 办法里,咱们从每个元素里抽取了一个工夫字段,然而咱们并没有生成水印发射给上游,而是本人保留了在一个变量里,在 onPeriodicEmit 办法里,应用最大的日志工夫减去咱们想要的延迟时间作为水印发射给上游。

DataStream<Tuple2<String,Long>> withTimestampsAndWatermarks = dataStream.assignTimestampsAndWatermarks(

new WatermarkStrategy<Tuple2<String,Long>>(){
 @Override
 public WatermarkGenerator<Tuple2<String,Long>> createWatermarkGenerator(WatermarkGeneratorSupplier.Context context){return new WatermarkGenerator<Tuple2<String,Long>>(){
   private long maxTimestamp;
   private long delay = 3000;
   @Override
   public void onEvent(
     Tuple2<String,Long> event,
     long eventTimestamp,
     WatermarkOutput output){maxTimestamp = Math.max(maxTimestamp, event.f1);
   }
   @Override
   public void onPeriodicEmit(WatermarkOutput output){output.emitWatermark(new Watermark(maxTimestamp - delay));
   }
  };
 }
});

4.2 内置水印生成策略
为了不便开发,flink 提供了一些内置的水印生成办法供咱们应用。

4.2.1 固定提早生成水印
通过静态方法 forBoundedOutOfOrderness 提供, 入参接管一个 Duration 类型的工夫距离,也就是咱们能够承受的最大的延迟时间. 应用这种提早策略的时候须要咱们对数据的延迟时间有一个大略的预估判断。

WatermarkStrategy#forBoundedOutOfOrderness(Duration maxOutOfOrderness)
咱们实现一个提早 3 秒的固定提早水印,能够这样做:

DataStream dataStream = …… ;
dataStream.assignTimestampsAndWatermarks(WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(3)));
他的底层应用的 WatermarkGenerator 接口的一个实现类 BoundedOutOfOrdernessWatermarks。

@Override
public void onEvent(T event, long eventTimestamp, WatermarkOutput output) {
maxTimestamp = Math.max(maxTimestamp, eventTimestamp);
}

@Override
public void onPeriodicEmit(WatermarkOutput output) {
output.emitWatermark(new Watermark(maxTimestamp – outOfOrdernessMillis – 1));
}
4.2.2 枯燥递增生成水印
通过静态方法 forMonotonousTimestamps 来提供.

WatermarkStrategy.forMonotonousTimestamps()
这个也就是相当于上述的提早策略去掉了延迟时间,以 event 中的工夫戳充当了水印。

在程序中能够这样应用:

DataStream dataStream = …… ;
dataStream.assignTimestampsAndWatermarks(WatermarkStrategy.forMonotonousTimestamps());
它的底层实现是 AscendingTimestampsWatermarks,其实它就是 BoundedOutOfOrdernessWatermarks 类的一个子类,没有了延迟时间,咱们来看看具体源码的实现.

@Public
public class AscendingTimestampsWatermarks<T> extends BoundedOutOfOrdernessWatermarks<T> {

/**

  • Creates a new watermark generator with for ascending timestamps.
    */

public AscendingTimestampsWatermarks() {
super(Duration.ofMillis(0));
}
}
4.2.3 eventtime 的获取
上述咱们讲了 flink 自带的两种水印生成策略,然而对于咱们应用 eventtime 语义的时候,咱们想从咱们的本人的数据中抽取 eventtime,这个就须要 TimestampAssigner 了.

@Public
@FunctionalInterface
public interface TimestampAssigner<T> {

............

long extractTimestamp(T element, long recordTimestamp);
}
应用的时候咱们次要就是从咱们本人的元素 element 中提取咱们想要的 eventtime。

应用 flink 自带的水印策略和 eventtime 抽取类,能够这样用:

List<Tuple3<String, Long, Timestamp>> rateHistoryData = Lists.newArrayList();

rateHistoryData.add(new Tuple3<>("US Dollar", 102L, new Timestamp(1L)));
rateHistoryData.add(new Tuple3<>("Euro", 114L, new Timestamp(1L)));
rateHistoryData.add(new Tuple3<>("Yen", 1L, new Timestamp(1L)));
rateHistoryData.add(new Tuple3<>("Euro", 116L, new Timestamp(5L)));
rateHistoryData.add(new Tuple3<>("Euro", 119L, new Timestamp(7L)));

DataStream<Tuple3<String, Long, Timestamp>> rateStream = env.fromCollection(rateHistoryData)
    .assignTimestampsAndWatermarks(
        WatermarkStrategy
            // here  <Tuple3< String, Long, Timestamp> is needed for using withTimestampAssigner
            .<Tuple3<String, Long, Timestamp>>forBoundedOutOfOrderness(Duration.ofSeconds(10))
            .withTimestampAssigner((event, timestamp) -> event.f2.getTime())
    );

其中.withTimestampAssigner((event, timestamp) -> event.f2.getTime()) 中的(event, timestamp) -> event.f2.getTime() 是一个闭包,用来标识如何从元素中失去 eventtime. 在这个例子中,元素类型是元组 Tuple3, 该元组中第三个元素也就是对应的 f2 就是事件的产生工夫。

Event Time Temporal Join
概念介绍
Event Time Temporal joins 能够利用 versioned table 执行 join 操作,也就是说一个表能够应用变动中的元素进行加宽,通过查问在特定工夫点上值。

Temporal joins 承受一个左表,会把左表中每一行数据在右表(versioned table)中找到对应的行数据进行关联。Flink 应用 SQL:2011 规范中的 FOR SYSTEM_TIME AS OF SQL 语法执行这个操作。Temporal joins 的语法如下

SELECT [column_list]
FROM table1 [AS <alias1>]
[LEFT] JOIN table2 FOR SYSTEM_TIME AS OF table1.{proctime | rowtime} [AS <alias2>]
ON table1.column-name1 = table2.column-name1
借助 event-time 属性(也称为 rowtime 属性),就能够检索失去特定 key 在过来某个工夫点上的值,从而能够把两个表中在雷同工夫点上的数据进行 join。versioned table 保留了不同版本也就是在不同工夫点的数据。

举例,假如咱们有一个订单的表,不同的订单中价格应用的货币不一样,而货币在不同时刻的汇率也不一样,为了应用对立的货币例如人民币计算各个订单的消费额,每一个订单须要和订单产生时的货币汇率进行 join。

外部实现原理
能够去 孙金城 大佬的文章 Apache Flink 漫谈系列(11) – Temporal Table JOIN,不过这篇文章比拟老了,外面的代码还是应用 LATERAL TABLE 语法进行 join,不过外面的外部实现原理的示意图倒是对钻研有肯定的参考意义。

注意事项
event-time temporal join 是通过左右输出流中的 watermark 触发的,所以要确保 join 两边的 watermark 设置正确;
The event-time temporal join 要求 temporal join 条件等式中必须蕴含主键;
样例代码

  1. Table function join
    Flink 晚期是应用 LATERAL TABLE 语法联合 Table function 实现的 Temporal table join , 这种形式没有 FOR SYSTEM_TIME AS OF 语法中对于 watermark 和主键的强制要求,具体应用形式能够看上面的代码:

1.0 环境初始化
首先初始化 StreamExecutionEnvironment env 和 StreamTableEnvironment tEnv,如下:

final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

env.setRuntimeMode(RuntimeExecutionMode.STREAMING);
env.setParallelism(1);
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);

env.setRuntimeMode(RuntimeExecutionMode.STREAMING); 设置以后运行模式为 STREAMING 模式
env.setParallelism(1); 设置并行度是 1 次要是为测试的目标,便于察看 join 的后果
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime); 设置工夫属性是 EventTime,
1.1 创立和注册左表 Orders
构建填充到 Orders 中的数据

List<Tuple3<Long, String, Timestamp>> orderData = Lists.newArrayList();

orderData.add(new Tuple3<>(2L, "Euro", new Timestamp(2L)));
orderData.add(new Tuple3<>(1L, "US Dollar", new Timestamp(3L)));
orderData.add(new Tuple3<>(50L, "Yen", new Timestamp(4L)));
orderData.add(new Tuple3<>(3L, "Euro", new Timestamp(5L)));
orderData.add(new Tuple3<>(5L, "Euro", new Timestamp(9L)));

基于 List 构建 DataStream,并通过 assignTimestampsAndWatermarks 设置工夫戳和 watermark,而后基于 DataStream 中构建 Table 并注册为表名 Orders。因为 Tuple3 构建的 DataStream 中的列名默认别离是 f0,f1,f2, 这里 $(“amount”), $(“currency”), $(“eventtime”).rowtime() 的作用就是把列名改为了 amount, currency, eventtime,同时将 eventtime 标记为 rowtime()。

DataStream<Tuple3<Long, String, Timestamp>> orderStream = env.fromCollection(orderData)

    .assignTimestampsAndWatermarks(
        WatermarkStrategy
            .<Tuple3<Long, String, Timestamp>>forBoundedOutOfOrderness(Duration.ofSeconds(5))
            .withTimestampAssigner((event, timestamp) -> event.f2.getTime())
    );

Table orderTable = tEnv.fromDataStream(orderStream,
    $("amount"), $("currency"), $("eventtime").rowtime());// here we use rowtiime()
tEnv.registerTable("Orders", orderTable);

留神这里尽管应用了 assignTimestampsAndWatermarks 也设置了 Watermark,然而在本例中并没有用到,因为 LATERAL TABLE 语法只须要 Timestamp 提取 eventtime。

打印 Schema 和表外面的内容

orderTable.printSchema();//only for debug

tEnv.from("Orders").execute().print(); // only for debug

(
amount BIGINT,
currency STRING,
eventtime TIMESTAMP(3) ROWTIME
)

op amount currency eventtime
+I 2 Euro 1970-01-01 00:00:00.002
+I 1 US Dollar 1970-01-01 00:00:00.003
+I 50 Yen 1970-01-01 00:00:00.004
+I 3 Euro 1970-01-01 00:00:00.005
+I 5 Euro 1970-01-01 00:00:00.009

1.2 创立和注册右表 RatesHistory
构建数据

List<Tuple3<String, Long, Timestamp>> rateHistoryData = Lists.newArrayList();

rateHistoryData.add(new Tuple3<>("US Dollar", 102L, new Timestamp(1L)));
rateHistoryData.add(new Tuple3<>("Euro", 114L, new Timestamp(1L)));
rateHistoryData.add(new Tuple3<>("Yen", 1L, new Timestamp(1L)));
rateHistoryData.add(new Tuple3<>("Euro", 116L, new Timestamp(5L)));
rateHistoryData.add(new Tuple3<>("Euro", 119L, new Timestamp(7L)));

基于 List 构建 DataStream,并通过 assignTimestampsAndWatermarks 设置工夫戳和 watermark,而后基于 DataStream 中构建 Table 并注册为表名 RatesHistory。因为 Tuple3 构建的 DataStream 中的列名默认别离是 f0,f1,f2, 这里 $(“amount”), $(“currency”), $(“eventtime”).rowtime() 的作用就是把列名改为了 currency, rate, eventtime,同时将 eventtime 标记为 rowtime()。

DataStream<Tuple3<String, Long, Timestamp>> rateStream = env.fromCollection(rateHistoryData)

    .assignTimestampsAndWatermarks(
        WatermarkStrategy
            // here  <Tuple3< String, Long, Timestamp> is needed for using withTimestampAssigner
            .<Tuple3<String, Long, Timestamp>>forBoundedOutOfOrderness(Duration.ofSeconds(10))
            .withTimestampAssigner((event, timestamp) -> event.f2.getTime())
    );

Table rateTable = tEnv.fromDataStream(rateStream,
    $("currency"), $("rate"), $("eventime").rowtime());
tEnv.registerTable("RatesHistory", rateTable);

打印 Schema 和表外面的内容

rateTable.printSchema();//only for debug

tEnv.from("RatesHistory").execute().print(); // only for debug

(
currency STRING,
rate BIGINT,
eventime TIMESTAMP(3) ROWTIME
)

op currency rate eventime
+I US Dollar 102 1970-01-01 00:00:00.001
+I Euro 114 1970-01-01 00:00:00.001
+I Yen 1 1970-01-01 00:00:00.001
+I Euro 116 1970-01-01 00:00:00.005
+I Euro 119 1970-01-01 00:00:00.007

1.3 注册表函数
基于 rateTable 创立和注册为表函数 Rates,如下:

tEnv.registerFunction(

    "Rates",
    rateTable.createTemporalTableFunction("eventime", "currency"));

须要留神 createTemporalTableFunction 的第一个参数传入工夫戳所在列,第二个参数传入两表联结时进行匹配的列,例如在咱们的例子中,Orders 表和 RatesHistory 表基于 currency 进行匹配。

1.4 执行联结查问
执行上面的联结查问语句:

String sqlQuery =

    "SELECT o.eventtime, o.currency, o.amount, r.rate," +
        "o.amount * r.rate as amount_sum" +
        "from" +
        "Orders AS o," +
        "LATERAL TABLE (Rates(o.eventtime)) AS r" +
        "WHERE r.currency = o.currency";
tEnv.sqlQuery(sqlQuery).execute().print();

查问后果如下:

op eventtime currency amount rate amount_sum
+I 1970-01-01 00:00:00.003 US Dollar 1 102 102
+I 1970-01-01 00:00:00.004 Yen 50 1 50
+I 1970-01-01 00:00:00.002 Euro 2 114 228
+I 1970-01-01 00:00:00.005 Euro 3 116 348
+I 1970-01-01 00:00:00.009 Euro 5 119 595

能够看到在不同的 eventtime 上,Euro 对应的 rate 不一样,最终计算总和的时候采纳两边相应工夫点的数据进行计算。

  1. FOR SYSTEM_TIME AS OF
    2.0 环境初始化
    首先初始化 StreamExecutionEnvironment env 和 StreamTableEnvironment tEnv,如下:

final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

env.setRuntimeMode(RuntimeExecutionMode.STREAMING);
env.setParallelism(1);
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);

env.setRuntimeMode(RuntimeExecutionMode.STREAMING); 设置以后运行模式为 STREAMING 模式
env.setParallelism(1); 设置并行度是 1 次要是为测试的目标,便于察看 join 的后果
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime); 设置工夫属性是 EventTime,
2.1 创立和注册左表 Orders
构建填充到 Orders 中的数据

List<Tuple3<Long, String, Timestamp>> orderData = Lists.newArrayList();

orderData.add(new Tuple3<>(2L, "Euro", new Timestamp(2000L)));
orderData.add(new Tuple3<>(1L, "US Dollar", new Timestamp(3000L)));
orderData.add(new Tuple3<>(50L, "Yen", new Timestamp(4000L)));
orderData.add(new Tuple3<>(3L, "Euro", new Timestamp(5000L)));
orderData.add(new Tuple3<>(5L, "Euro", new Timestamp(9000L)));

基于 List 构建 DataStream,并通过 assignTimestampsAndWatermarks 设置工夫戳和 watermark.

DataStream<Tuple3<Long, String, Timestamp>> orderStream = env.fromCollection(orderData)

    .assignTimestampsAndWatermarks(
        WatermarkStrategy
            .<Tuple3<Long, String, Timestamp>>forBoundedOutOfOrderness(Duration.ofSeconds(10000))
            .withTimestampAssigner((event, timestamp) -> event.f2.getTime())
    );

而后基于 DataStream 构建 Table 并注册为表名 Orders。

Table orderTable = tEnv.fromDataStream(orderStream, Schema.newBuilder()

        .columnByExpression("rowtime", "CAST(f2 AS TIMESTAMP(3))")
        .watermark("rowtime", "SOURCE_WATERMARK()")
    .build());

tEnv.registerTable("Orders", orderTable);

留神这里创立 Table 的 API 与后面 Table function join 的不同,fromDataStream 的第二个参数是 Schema 类型,这段代码达到的性能是:

主动从 DataStream 中继承所有的物理列
通过创立一个计算列来拜访流记录中的工夫戳,对应代码中 columnByExpression(这里是创立了一个 rowtime 属性列)
继承 DataStream 中已有的 watermark,对应代码中 (“rowtime”, “SOURCE_WATERMARK()”),这种用法的前提是 DataStream 中曾经通过定义了相干的 watermark 策略,在本例中咱们应用 assignTimestampsAndWatermarks 对 orderStream 增加了 watermark。
打印 Schema 和表外面的内容

orderTable.printSchema();//only for debug

tEnv.from("Orders").execute().print(); // only for debug

(
f0 BIGINT NOT NULL,
f1 STRING,
f2 TIMESTAMP(9),
rowtime TIMESTAMP(3) ROWTIME AS CAST(f2 AS TIMESTAMP(3)),
WATERMARK FOR rowtime: TIMESTAMP(3) AS SOURCE_WATERMARK()
)

op f0 f1 f2 rowtime
+I 2 Euro 1970-01-01 08:00:02.000000000 1970-01-01 08:00:02.000
+I 1 US Dollar 1970-01-01 08:00:03.000000000 1970-01-01 08:00:03.000
+I 50 Yen 1970-01-01 08:00:04.000000000 1970-01-01 08:00:04.000
+I 3 Euro 1970-01-01 08:00:05.000000000 1970-01-01 08:00:05.000
+I 5 Euro 1970-01-01 08:00:09.000000000 1970-01-01 08:00:09.000

能够看到 orderTable 的 Schema 中多了 上面两条属性信息,别离对应代码中设置的 rowtime 和 watermark

rowtime TIMESTAMP(3) ROWTIME AS CAST(f2 AS TIMESTAMP(3)),
WATERMARK FOR rowtime: TIMESTAMP(3) AS SOURCE_WATERMARK()
2.2 创立和注册右表 RatesHistory
构建填充到 RatesHistory 中的数据

List<Tuple3<String, Long, Timestamp>> rateHistoryData = Lists.newArrayList();

rateHistoryData.add(new Tuple3<>("US Dollar", 102L, new Timestamp(1000L)));
rateHistoryData.add(new Tuple3<>("Euro", 114L, new Timestamp(1000L)));
rateHistoryData.add(new Tuple3<>("Yen", 1L, new Timestamp(1000L)));
rateHistoryData.add(new Tuple3<>("Euro", 116L, new Timestamp(5000L)));
rateHistoryData.add(new Tuple3<>("Euro", 119L, new Timestamp(7000L)));

基于 List 构建 DataStream,并通过 assignTimestampsAndWatermarks 设置工夫戳和 watermark.

DataStream<Tuple3<String, Long, Timestamp>> rateStream = env.fromCollection(rateHistoryData)

    .assignTimestampsAndWatermarks(
        WatermarkStrategy
            // here  <Tuple3< String, Long, Timestamp> is need for using withTimestampAssigner
            .<Tuple3<String, Long, Timestamp>>forBoundedOutOfOrderness(Duration.ofSeconds(1000))
            .withTimestampAssigner((event, timestamp) -> event.f2.getTime())
    );

而后基于 DataStream 构建 Table 并注册为表名 Orders。

Table rateTable = tEnv.fromDataStream(

    rateStream,
    Schema.newBuilder()
        .columnByExpression("rowtime", "CAST(f2 AS TIMESTAMP(3))")
        .watermark("rowtime", "SOURCE_WATERMARK()")
        .primaryKey("f0")  
        .build());

tEnv.createTemporaryView("RatesHistory", rateTable);

留神这里 RatesHistory 的 Schema 中多了一个 primaryKey(“f0”), 否则在执行 join 的时候会报错

Invalid primary key ‘PK_f0’. Column ‘f0’ is nullable.
因为在 Flink 中,如果要反对 FOR SYSTEM_TIME AS OF, 右表中必须要有主键,而且这个主键对应 join 操作中 on 等值匹配的列。

打印 Schema 和表外面的内容

rateTable.printSchema();//only for debug

tEnv.from("RatesHistory").execute().print();

(
f0 STRING NOT NULL,
f1 BIGINT NOT NULL,
f2 TIMESTAMP(9),
rowtime TIMESTAMP(3) ROWTIME AS CAST(f2 AS TIMESTAMP(3)),
WATERMARK FOR rowtime: TIMESTAMP(3) AS SOURCE_WATERMARK(),
CONSTRAINT PK_f0 PRIMARY KEY (f0) NOT ENFORCED
)

op f0 f1 f2 rowtime
+I US Dollar 102 1970-01-01 08:00:01.000000000 1970-01-01 08:00:01.000
+I Euro 114 1970-01-01 08:00:01.000000000 1970-01-01 08:00:01.000
+I Yen 1 1970-01-01 08:00:01.000000000 1970-01-01 08:00:01.000
+I Euro 116 1970-01-01 08:00:05.000000000 1970-01-01 08:00:05.000
+I Euro 119 1970-01-01 08:00:07.000000000 1970-01-01 08:00:07.000

能够看到 rateTable 的 Schema 中多了 上面三条属性信息,别离对应代码中设置的 rowtime(计算列), watermark 和 primaryKey

rowtime TIMESTAMP(3) ROWTIME AS CAST(f2 AS TIMESTAMP(3)),
WATERMARK FOR rowtime: TIMESTAMP(3) AS SOURCE_WATERMARK(),
CONSTRAINT PK_f0 PRIMARY KEY (f0) NOT ENFORCED
2.3 执行 join
String sqlQuery2 =

    "SELECT o.f1 as currency, o.f0 as amount, o.rowtime, r.f1 as rate," +
        "o.f0 * r.f1 as amount_sum" +
        "from" +
        "Orders AS o" +
        "JOIN RatesHistory FOR SYSTEM_TIME AS OF o.rowtime AS r" +
        "ON o.f1 = r.f0";
tEnv.sqlQuery(sqlQuery2).execute().print();

失去

op currency amount rowtime rate amount_sum
+I US Dollar 1 1970-01-01 08:00:03.000 102 102
+I Yen 50 1970-01-01 08:00:04.000 1 50
+I Euro 2 1970-01-01 08:00:02.000 114 228
+I Euro 3 1970-01-01 08:00:05.000 116 348
+I Euro 5 1970-01-01 08:00:09.000 119 595
  1. 其余
    在 Flink v14.3 中 registerTable 这个 API 曾经不倡议应用了,能够改为 tEnv.createTemporaryView

参考资料
Apache Flink 漫谈系列(11) – Temporal Table JOIN https://developer.aliyun.com/…
动静表 https://nightlies.apache.org/… 动静表 -dynamic-table
Apache Flink 漫谈系列 – Watermark 是个啥?https://mp.weixin.qq.com/s?__…
Apache Flink 漫谈系列 (09) – JOIN 算子 https://developer.aliyun.com/…
flink 教程 - 聊聊 flink 1.11 中新的水印策略 https://cloud.tencent.com/dev…
聊聊 flink 的 TableFunction https://segmentfault.com/a/11…
Event Time Temporal Join https://nightlies.apache.org/…
DataStream API https://nightlies.apache.org/…
《flink 内核原理与实现》4.4 章节 水印
《Flink 原理、实战与性能优化》4.2 章节 工夫概念与 Watermark
Watermark 机制 https://www.bilibili.com/vide…

正文完
 0