乐趣区

关于flink:Flink-Table-APISQL编程指南之时间属性3

Flink 总共有三种工夫语义:Processing time(解决工夫)、Event time(事件工夫) 以及 Ingestion time(摄入工夫)。对于这些工夫语义的具体解释,能够参考另一篇文章 Flink 的工夫与 watermarks 详解。本文次要解说 Flink Table API & SQL 中基于工夫的算子如何定义工夫语义。通过本文你能够理解到:

  • 工夫属性的简介
  • 解决工夫
  • 事件工夫

工夫属性简介

Flink TableAPI&SQL 中的基于工夫的操作 (如 window),须要指定工夫语义,表能够依据指定的工夫戳提供一个逻辑工夫属性。

工夫属性是表 schama 的一部分,当应用 DDL 创立表时、DataStream 转为表时或者应用 TableSource 时,会定义工夫属性。一旦工夫属性被定义实现,该工夫属性能够看做是一个字段的援用,从而在基于工夫的操作中应用该字段。

工夫属性像一个工夫戳,能够被拜访并参加计算,如果一个工夫属性参加计算,那么该工夫属性会被雾化成一个惯例的工夫戳,惯例的工夫戳不能与 Flink 的工夫与水位线兼容,不能被基于工夫的操作所应用。

Flink TableAPI & SQL 所须要的工夫属性能够通过 Datastream 程序中指定,如下:

final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime); // 默认

// 能够抉择:
// env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
// env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

解决工夫

基于本地的机器工夫,是一种最简略的工夫语义,然而不能保障后果一致性,应用该工夫语义不须要提取工夫戳和生成水位线。总共有三种形式定义解决工夫属性,具体如下

DDL 语句创立表时定义解决工夫

解决工夫的属性能够在 DDL 语句中被定义为一个计算列,须要应用 PROCTIME() 函数,如下所示:

CREATE TABLE user_actions (
  user_name STRING,
  data STRING,
  user_action_time AS PROCTIME() -- 申明一个额定字段,作为解决工夫属性) WITH (...);

SELECT TUMBLE_START(user_action_time, INTERVAL '10' MINUTE), COUNT(DISTINCT user_name)
FROM user_actions
GROUP BY TUMBLE(user_action_time, INTERVAL '10' MINUTE); -- 10 分钟的滚动窗口 

DataStream 转为 Table 的过程中定义解决工夫

在将 DataStream 转为表时,在 schema 定义中能够通过.proctime 属性指定工夫属性,并将其放在其余 schema 字段的最初面,具体如下:

DataStream<Tuple2<String, String>> stream = ...;
// 申明一个额定逻辑字段作为解决工夫属性
Table table = tEnv.fromDataStream(stream, "user_name, data, user_action_time.proctime");

WindowedTable windowedTable = table.window(Tumble.over("10.minutes").on("user_action_time").as("userActionWindow"));

应用 TableSource

自定义 TableSource 并实现 DefinedProctimeAttribute 接口,如下:

// 定义个带有解决工夫属性的 table source
public class UserActionSource implements StreamTableSource<Row>, DefinedProctimeAttribute {

    @Override
    public TypeInformation<Row> getReturnType() {String[] names = new String[] {"user_name" , "data"};
        TypeInformation[] types = new TypeInformation[] {Types.STRING(), Types.STRING()};
        return Types.ROW(names, types);
    }

    @Override
    public DataStream<Row> getDataStream(StreamExecutionEnvironment execEnv) {
        // 创立 stream
        DataStream<Row> stream = ...;
        return stream;
    }

    @Override
    public String getProctimeAttribute() {
        // 该字段会追加到 schema 中,作为第三个字段
        return "user_action_time";
    }
}

// 注册 table source
tEnv.registerTableSource("user_actions", new UserActionSource());

WindowedTable windowedTable = tEnv
    .from("user_actions")
    .window(Tumble.over("10.minutes").on("user_action_time").as("userActionWindow"));

事件工夫

基于记录的具体工夫戳,即使是存在乱序或者早退数据也会保障后果的一致性。总共有三种形式定义解决工夫属性,具体如下

DDL 语句创立表时定事件工夫

事件工夫属性能够通过 WATERMARK 语句进行定义,如下:

CREATE TABLE user_actions (
  user_name STRING,
  data STRING,
  user_action_time TIMESTAMP(3),
  -- 申明 user_action_time 作为事件工夫属性,并容许 5S 的提早  
  WATERMARK FOR user_action_time AS user_action_time - INTERVAL '5' SECOND
) WITH (...);

SELECT TUMBLE_START(user_action_time, INTERVAL '10' MINUTE), COUNT(DISTINCT user_name)
FROM user_actions
GROUP BY TUMBLE(user_action_time, INTERVAL '10' MINUTE);

DataStream 转为 Table 的过程中定义事件工夫

当定义 Schema 时通过.rowtime 属性指定事件工夫属性,必须在 DataStream 中指定工夫戳与水位线。例如在数据集中,事件工夫属性为 event_time,此时 Table 中的事件工夫字段中能够通过’event_time. rowtime‘来指定。

目前 Flink 反对两种形式定义 EventTime 字段,如下:

// 形式 1:
// 提取 timestamp 并调配 watermarks
DataStream<Tuple2<String, String>> stream = inputStream.assignTimestampsAndWatermarks(...);

// 申明一个额定逻辑字段作为事件工夫属性
// 在 table schema 的开端应用 user_action_time.rowtime 定义事件工夫属性
// 零碎会在 TableEnvironment 中获取事件工夫属性
Table table = tEnv.fromDataStream(stream, "user_name, data, user_action_time.rowtime");

// 形式 2:

// 从第一个字段提取 timestamp 并调配 watermarks
DataStream<Tuple3<Long, String, String>> stream = inputStream.assignTimestampsAndWatermarks(...);

// 第一个字段曾经用来提取工夫戳,能够间接应用对应的字段作为事件工夫属性
Table table = tEnv.fromDataStream(stream, "user_action_time.rowtime, user_name, data");

// 应用:

WindowedTable windowedTable = table.window(Tumble.over("10.minutes").on("user_action_time").as("userActionWindow"));

应用 TableSource

另外也能够在创立 TableSource 的时候,实现 DefinedRowtimeAttributes 接口来定义 EventTime 字段,在接口中须要实现 getRowtimeAttributeDescriptors 办法,创立基于 EventTime 的工夫属性信息。

// 定义带有 rowtime 属性的 table source
public class UserActionSource implements StreamTableSource<Row>, DefinedRowtimeAttributes {

    @Override
    public TypeInformation<Row> getReturnType() {String[] names = new String[] {"user_name", "data", "user_action_time"};
        TypeInformation[] types =
            new TypeInformation[] {Types.STRING(), Types.STRING(), Types.LONG()};
        return Types.ROW(names, types);
    }

    @Override
    public DataStream<Row> getDataStream(StreamExecutionEnvironment execEnv) {

        // 创立流,基于 user_action_time 属性调配水位线
        DataStream<Row> stream = inputStream.assignTimestampsAndWatermarks(...);
        return stream;
    }

    @Override
    public List<RowtimeAttributeDescriptor> getRowtimeAttributeDescriptors() {
        // 标记 user_action_time 字段作为事件工夫属性
        // 创立 user_action_time 描述符,用来标识工夫属性字段
        RowtimeAttributeDescriptor rowtimeAttrDescr = new RowtimeAttributeDescriptor(
            "user_action_time",
            new ExistingField("user_action_time"),
            new AscendingTimestamps());
        List<RowtimeAttributeDescriptor> listRowtimeAttrDescr = Collections.singletonList(rowtimeAttrDescr);
        return listRowtimeAttrDescr;
    }
}

// register 表
tEnv.registerTableSource("user_actions", new UserActionSource());

WindowedTable windowedTable = tEnv
    .from("user_actions")
    .window(Tumble.over("10.minutes").on("user_action_time").as("userActionWindow"));

小结

本文次要介绍了如何在 Flink Table API 和 SQL 中应用工夫语义,能够应用两种工夫语义:解决工夫和事件工夫。别离对每种的工夫语义的应用形式进行了具体解释。

公众号『大数据技术与数仓』,回复『材料』支付大数据资料包

退出移动版