Flink总共有三种工夫语义:Processing time(解决工夫)、Event time(事件工夫)以及Ingestion time(摄入工夫)。对于这些工夫语义的具体解释,能够参考另一篇文章Flink的工夫与watermarks详解。本文次要解说Flink Table API & SQL中基于工夫的算子如何定义工夫语义。通过本文你能够理解到:
- 工夫属性的简介
- 解决工夫
- 事件工夫
工夫属性简介
Flink TableAPI&SQL中的基于工夫的操作(如window),须要指定工夫语义,表能够依据指定的工夫戳提供一个逻辑工夫属性。
工夫属性是表schama的一部分,当应用DDL创立表时、DataStream转为表时或者应用TableSource时,会定义工夫属性。一旦工夫属性被定义实现,该工夫属性能够看做是一个字段的援用,从而在基于工夫的操作中应用该字段。
工夫属性像一个工夫戳,能够被拜访并参加计算,如果一个工夫属性参加计算,那么该工夫属性会被雾化成一个惯例的工夫戳,惯例的工夫戳不能与Flink的工夫与水位线兼容,不能被基于工夫的操作所应用。
Flink TableAPI & SQL所须要的工夫属性能够通过Datastream程序中指定,如下:
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime); // 默认
// 能够抉择:
// env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
// env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
解决工夫
基于本地的机器工夫,是一种最简略的工夫语义,然而不能保障后果一致性,应用该工夫语义不须要提取工夫戳和生成水位线。总共有三种形式定义解决工夫属性,具体如下
DDL语句创立表时定义解决工夫
解决工夫的属性能够在DDL语句中被定义为一个计算列,须要应用PROCTIME()函数,如下所示:
CREATE TABLE user_actions (
user_name STRING,
data STRING,
user_action_time AS PROCTIME() -- 申明一个额定字段,作为解决工夫属性
) WITH (
...
);
SELECT TUMBLE_START(user_action_time, INTERVAL '10' MINUTE), COUNT(DISTINCT user_name)
FROM user_actions
GROUP BY TUMBLE(user_action_time, INTERVAL '10' MINUTE); -- 10分钟的滚动窗口
DataStream转为Table的过程中定义解决工夫
在将DataStream转为表时,在schema定义中能够通过.proctime属性指定工夫属性,并将其放在其余schema字段的最初面,具体如下:
DataStream<Tuple2<String, String>> stream = ...;
// 申明一个额定逻辑字段作为解决工夫属性
Table table = tEnv.fromDataStream(stream, "user_name, data, user_action_time.proctime");
WindowedTable windowedTable = table.window(Tumble.over("10.minutes").on("user_action_time").as("userActionWindow"));
应用TableSource
自定义TableSource并实现DefinedProctimeAttribute
接口,如下:
// 定义个带有解决工夫属性的table source
public class UserActionSource implements StreamTableSource<Row>, DefinedProctimeAttribute {
@Override
public TypeInformation<Row> getReturnType() {
String[] names = new String[] {"user_name" , "data"};
TypeInformation[] types = new TypeInformation[] {Types.STRING(), Types.STRING()};
return Types.ROW(names, types);
}
@Override
public DataStream<Row> getDataStream(StreamExecutionEnvironment execEnv) {
// 创立stream
DataStream<Row> stream = ...;
return stream;
}
@Override
public String getProctimeAttribute() {
// 该字段会追加到schema中,作为第三个字段
return "user_action_time";
}
}
// 注册table source
tEnv.registerTableSource("user_actions", new UserActionSource());
WindowedTable windowedTable = tEnv
.from("user_actions")
.window(Tumble.over("10.minutes").on("user_action_time").as("userActionWindow"));
事件工夫
基于记录的具体工夫戳,即使是存在乱序或者早退数据也会保障后果的一致性。总共有三种形式定义解决工夫属性,具体如下
DDL语句创立表时定事件工夫
事件工夫属性能够通过 WATERMARK语句进行定义,如下:
CREATE TABLE user_actions (
user_name STRING,
data STRING,
user_action_time TIMESTAMP(3),
-- 申明user_action_time作为事件工夫属性,并容许5S的提早
WATERMARK FOR user_action_time AS user_action_time - INTERVAL '5' SECOND
) WITH (
...
);
SELECT TUMBLE_START(user_action_time, INTERVAL '10' MINUTE), COUNT(DISTINCT user_name)
FROM user_actions
GROUP BY TUMBLE(user_action_time, INTERVAL '10' MINUTE);
DataStream转为Table的过程中定义事件工夫
当定义Schema时通过.rowtime属性指定事件工夫属性,必须在DataStream中指定工夫戳与水位线。例如在数据集中,事件工夫属性为event_time,此时Table中的事件工夫字段中能够通过’event_time. rowtime‘来指定。
目前Flink反对两种形式定义EventTime字段,如下:
// 形式1:
// 提取timestamp并调配watermarks
DataStream<Tuple2<String, String>> stream = inputStream.assignTimestampsAndWatermarks(...);
// 申明一个额定逻辑字段作为事件工夫属性
// 在table schema的开端应用user_action_time.rowtime定义事件工夫属性
// 零碎会在TableEnvironment中获取事件工夫属性
Table table = tEnv.fromDataStream(stream, "user_name, data, user_action_time.rowtime");
// 形式2:
// 从第一个字段提取timestamp并调配watermarks
DataStream<Tuple3<Long, String, String>> stream = inputStream.assignTimestampsAndWatermarks(...);
// 第一个字段曾经用来提取工夫戳,能够间接应用对应的字段作为事件工夫属性
Table table = tEnv.fromDataStream(stream, "user_action_time.rowtime, user_name, data");
// 应用:
WindowedTable windowedTable = table.window(Tumble.over("10.minutes").on("user_action_time").as("userActionWindow"));
应用TableSource
另外也能够在创立TableSource的时候,实现DefinedRowtimeAttributes接口来定义EventTime字段,在接口中须要实现getRowtimeAttributeDescriptors办法,创立基于EventTime的工夫属性信息。
// 定义带有rowtime属性的table source
public class UserActionSource implements StreamTableSource<Row>, DefinedRowtimeAttributes {
@Override
public TypeInformation<Row> getReturnType() {
String[] names = new String[] {"user_name", "data", "user_action_time"};
TypeInformation[] types =
new TypeInformation[] {Types.STRING(), Types.STRING(), Types.LONG()};
return Types.ROW(names, types);
}
@Override
public DataStream<Row> getDataStream(StreamExecutionEnvironment execEnv) {
// 创立流,基于user_action_time属性调配水位线
DataStream<Row> stream = inputStream.assignTimestampsAndWatermarks(...);
return stream;
}
@Override
public List<RowtimeAttributeDescriptor> getRowtimeAttributeDescriptors() {
// 标记user_action_time字段作为事件工夫属性
// 创立user_action_time描述符,用来标识工夫属性字段
RowtimeAttributeDescriptor rowtimeAttrDescr = new RowtimeAttributeDescriptor(
"user_action_time",
new ExistingField("user_action_time"),
new AscendingTimestamps());
List<RowtimeAttributeDescriptor> listRowtimeAttrDescr = Collections.singletonList(rowtimeAttrDescr);
return listRowtimeAttrDescr;
}
}
// register表
tEnv.registerTableSource("user_actions", new UserActionSource());
WindowedTable windowedTable = tEnv
.from("user_actions")
.window(Tumble.over("10.minutes").on("user_action_time").as("userActionWindow"));
小结
本文次要介绍了如何在Flink Table API和SQL中应用工夫语义,能够应用两种工夫语义:解决工夫和事件工夫。别离对每种的工夫语义的应用形式进行了具体解释。
公众号『大数据技术与数仓』,回复『材料』支付大数据资料包
发表回复