共计 5099 个字符,预计需要花费 13 分钟才能阅读完成。
Flink 总共有三种工夫语义:Processing time(解决工夫)、Event time(事件工夫) 以及 Ingestion time(摄入工夫)。对于这些工夫语义的具体解释,能够参考另一篇文章 Flink 的工夫与 watermarks 详解。本文次要解说 Flink Table API & SQL 中基于工夫的算子如何定义工夫语义。通过本文你能够理解到:
- 工夫属性的简介
- 解决工夫
- 事件工夫
工夫属性简介
Flink TableAPI&SQL 中的基于工夫的操作 (如 window),须要指定工夫语义,表能够依据指定的工夫戳提供一个逻辑工夫属性。
工夫属性是表 schama 的一部分,当应用 DDL 创立表时、DataStream 转为表时或者应用 TableSource 时,会定义工夫属性。一旦工夫属性被定义实现,该工夫属性能够看做是一个字段的援用,从而在基于工夫的操作中应用该字段。
工夫属性像一个工夫戳,能够被拜访并参加计算,如果一个工夫属性参加计算,那么该工夫属性会被雾化成一个惯例的工夫戳,惯例的工夫戳不能与 Flink 的工夫与水位线兼容,不能被基于工夫的操作所应用。
Flink TableAPI & SQL 所须要的工夫属性能够通过 Datastream 程序中指定,如下:
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime); // 默认
// 能够抉择:
// env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
// env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
解决工夫
基于本地的机器工夫,是一种最简略的工夫语义,然而不能保障后果一致性,应用该工夫语义不须要提取工夫戳和生成水位线。总共有三种形式定义解决工夫属性,具体如下
DDL 语句创立表时定义解决工夫
解决工夫的属性能够在 DDL 语句中被定义为一个计算列,须要应用 PROCTIME() 函数,如下所示:
CREATE TABLE user_actions (
user_name STRING,
data STRING,
user_action_time AS PROCTIME() -- 申明一个额定字段,作为解决工夫属性) WITH (...);
SELECT TUMBLE_START(user_action_time, INTERVAL '10' MINUTE), COUNT(DISTINCT user_name)
FROM user_actions
GROUP BY TUMBLE(user_action_time, INTERVAL '10' MINUTE); -- 10 分钟的滚动窗口
DataStream 转为 Table 的过程中定义解决工夫
在将 DataStream 转为表时,在 schema 定义中能够通过.proctime 属性指定工夫属性,并将其放在其余 schema 字段的最初面,具体如下:
DataStream<Tuple2<String, String>> stream = ...;
// 申明一个额定逻辑字段作为解决工夫属性
Table table = tEnv.fromDataStream(stream, "user_name, data, user_action_time.proctime");
WindowedTable windowedTable = table.window(Tumble.over("10.minutes").on("user_action_time").as("userActionWindow"));
应用 TableSource
自定义 TableSource 并实现 DefinedProctimeAttribute
接口,如下:
// 定义个带有解决工夫属性的 table source
public class UserActionSource implements StreamTableSource<Row>, DefinedProctimeAttribute {
@Override
public TypeInformation<Row> getReturnType() {String[] names = new String[] {"user_name" , "data"};
TypeInformation[] types = new TypeInformation[] {Types.STRING(), Types.STRING()};
return Types.ROW(names, types);
}
@Override
public DataStream<Row> getDataStream(StreamExecutionEnvironment execEnv) {
// 创立 stream
DataStream<Row> stream = ...;
return stream;
}
@Override
public String getProctimeAttribute() {
// 该字段会追加到 schema 中,作为第三个字段
return "user_action_time";
}
}
// 注册 table source
tEnv.registerTableSource("user_actions", new UserActionSource());
WindowedTable windowedTable = tEnv
.from("user_actions")
.window(Tumble.over("10.minutes").on("user_action_time").as("userActionWindow"));
事件工夫
基于记录的具体工夫戳,即使是存在乱序或者早退数据也会保障后果的一致性。总共有三种形式定义解决工夫属性,具体如下
DDL 语句创立表时定事件工夫
事件工夫属性能够通过 WATERMARK 语句进行定义,如下:
CREATE TABLE user_actions (
user_name STRING,
data STRING,
user_action_time TIMESTAMP(3),
-- 申明 user_action_time 作为事件工夫属性,并容许 5S 的提早
WATERMARK FOR user_action_time AS user_action_time - INTERVAL '5' SECOND
) WITH (...);
SELECT TUMBLE_START(user_action_time, INTERVAL '10' MINUTE), COUNT(DISTINCT user_name)
FROM user_actions
GROUP BY TUMBLE(user_action_time, INTERVAL '10' MINUTE);
DataStream 转为 Table 的过程中定义事件工夫
当定义 Schema 时通过.rowtime 属性指定事件工夫属性,必须在 DataStream 中指定工夫戳与水位线。例如在数据集中,事件工夫属性为 event_time,此时 Table 中的事件工夫字段中能够通过’event_time. rowtime‘来指定。
目前 Flink 反对两种形式定义 EventTime 字段,如下:
// 形式 1:
// 提取 timestamp 并调配 watermarks
DataStream<Tuple2<String, String>> stream = inputStream.assignTimestampsAndWatermarks(...);
// 申明一个额定逻辑字段作为事件工夫属性
// 在 table schema 的开端应用 user_action_time.rowtime 定义事件工夫属性
// 零碎会在 TableEnvironment 中获取事件工夫属性
Table table = tEnv.fromDataStream(stream, "user_name, data, user_action_time.rowtime");
// 形式 2:
// 从第一个字段提取 timestamp 并调配 watermarks
DataStream<Tuple3<Long, String, String>> stream = inputStream.assignTimestampsAndWatermarks(...);
// 第一个字段曾经用来提取工夫戳,能够间接应用对应的字段作为事件工夫属性
Table table = tEnv.fromDataStream(stream, "user_action_time.rowtime, user_name, data");
// 应用:
WindowedTable windowedTable = table.window(Tumble.over("10.minutes").on("user_action_time").as("userActionWindow"));
应用 TableSource
另外也能够在创立 TableSource 的时候,实现 DefinedRowtimeAttributes 接口来定义 EventTime 字段,在接口中须要实现 getRowtimeAttributeDescriptors 办法,创立基于 EventTime 的工夫属性信息。
// 定义带有 rowtime 属性的 table source
public class UserActionSource implements StreamTableSource<Row>, DefinedRowtimeAttributes {
@Override
public TypeInformation<Row> getReturnType() {String[] names = new String[] {"user_name", "data", "user_action_time"};
TypeInformation[] types =
new TypeInformation[] {Types.STRING(), Types.STRING(), Types.LONG()};
return Types.ROW(names, types);
}
@Override
public DataStream<Row> getDataStream(StreamExecutionEnvironment execEnv) {
// 创立流,基于 user_action_time 属性调配水位线
DataStream<Row> stream = inputStream.assignTimestampsAndWatermarks(...);
return stream;
}
@Override
public List<RowtimeAttributeDescriptor> getRowtimeAttributeDescriptors() {
// 标记 user_action_time 字段作为事件工夫属性
// 创立 user_action_time 描述符,用来标识工夫属性字段
RowtimeAttributeDescriptor rowtimeAttrDescr = new RowtimeAttributeDescriptor(
"user_action_time",
new ExistingField("user_action_time"),
new AscendingTimestamps());
List<RowtimeAttributeDescriptor> listRowtimeAttrDescr = Collections.singletonList(rowtimeAttrDescr);
return listRowtimeAttrDescr;
}
}
// register 表
tEnv.registerTableSource("user_actions", new UserActionSource());
WindowedTable windowedTable = tEnv
.from("user_actions")
.window(Tumble.over("10.minutes").on("user_action_time").as("userActionWindow"));
小结
本文次要介绍了如何在 Flink Table API 和 SQL 中应用工夫语义,能够应用两种工夫语义:解决工夫和事件工夫。别离对每种的工夫语义的应用形式进行了具体解释。
公众号『大数据技术与数仓』,回复『材料』支付大数据资料包