关于flink:Flink-Table-APISQL编程指南之时间属性3

Flink总共有三种工夫语义:Processing time(解决工夫)、Event time(事件工夫)以及Ingestion time(摄入工夫)。对于这些工夫语义的具体解释,能够参考另一篇文章Flink的工夫与watermarks详解。本文次要解说Flink Table API & SQL中基于工夫的算子如何定义工夫语义。通过本文你能够理解到:

  • 工夫属性的简介
  • 解决工夫
  • 事件工夫

工夫属性简介

Flink TableAPI&SQL中的基于工夫的操作(如window),须要指定工夫语义,表能够依据指定的工夫戳提供一个逻辑工夫属性。

工夫属性是表schama的一部分,当应用DDL创立表时、DataStream转为表时或者应用TableSource时,会定义工夫属性。一旦工夫属性被定义实现,该工夫属性能够看做是一个字段的援用,从而在基于工夫的操作中应用该字段。

工夫属性像一个工夫戳,能够被拜访并参加计算,如果一个工夫属性参加计算,那么该工夫属性会被雾化成一个惯例的工夫戳,惯例的工夫戳不能与Flink的工夫与水位线兼容,不能被基于工夫的操作所应用。

Flink TableAPI & SQL所须要的工夫属性能够通过Datastream程序中指定,如下:

final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime); // 默认

// 能够抉择:
// env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
// env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

解决工夫

基于本地的机器工夫,是一种最简略的工夫语义,然而不能保障后果一致性,应用该工夫语义不须要提取工夫戳和生成水位线。总共有三种形式定义解决工夫属性,具体如下

DDL语句创立表时定义解决工夫

解决工夫的属性能够在DDL语句中被定义为一个计算列,须要应用PROCTIME()函数,如下所示:

CREATE TABLE user_actions (
  user_name STRING,
  data STRING,
  user_action_time AS PROCTIME() -- 申明一个额定字段,作为解决工夫属性
) WITH (
  ...
);

SELECT TUMBLE_START(user_action_time, INTERVAL '10' MINUTE), COUNT(DISTINCT user_name)
FROM user_actions
GROUP BY TUMBLE(user_action_time, INTERVAL '10' MINUTE); -- 10分钟的滚动窗口

DataStream转为Table的过程中定义解决工夫

在将DataStream转为表时,在schema定义中能够通过.proctime属性指定工夫属性,并将其放在其余schema字段的最初面,具体如下:

DataStream<Tuple2<String, String>> stream = ...;
// 申明一个额定逻辑字段作为解决工夫属性
Table table = tEnv.fromDataStream(stream, "user_name, data, user_action_time.proctime");

WindowedTable windowedTable = table.window(Tumble.over("10.minutes").on("user_action_time").as("userActionWindow"));

应用TableSource

自定义TableSource并实现DefinedProctimeAttribute 接口,如下:

// 定义个带有解决工夫属性的table source
public class UserActionSource implements StreamTableSource<Row>, DefinedProctimeAttribute {

    @Override
    public TypeInformation<Row> getReturnType() {
        String[] names = new String[] {"user_name" , "data"};
        TypeInformation[] types = new TypeInformation[] {Types.STRING(), Types.STRING()};
        return Types.ROW(names, types);
    }

    @Override
    public DataStream<Row> getDataStream(StreamExecutionEnvironment execEnv) {
        // 创立stream
        DataStream<Row> stream = ...;
        return stream;
    }

    @Override
    public String getProctimeAttribute() {
        // 该字段会追加到schema中,作为第三个字段
        return "user_action_time";
    }
}

// 注册table source
tEnv.registerTableSource("user_actions", new UserActionSource());

WindowedTable windowedTable = tEnv
    .from("user_actions")
    .window(Tumble.over("10.minutes").on("user_action_time").as("userActionWindow"));

事件工夫

基于记录的具体工夫戳,即使是存在乱序或者早退数据也会保障后果的一致性。总共有三种形式定义解决工夫属性,具体如下

DDL语句创立表时定事件工夫

事件工夫属性能够通过 WATERMARK语句进行定义,如下:

CREATE TABLE user_actions (
  user_name STRING,
  data STRING,
  user_action_time TIMESTAMP(3),
  -- 申明user_action_time作为事件工夫属性,并容许5S的提早  
  WATERMARK FOR user_action_time AS user_action_time - INTERVAL '5' SECOND
) WITH (
  ...
);

SELECT TUMBLE_START(user_action_time, INTERVAL '10' MINUTE), COUNT(DISTINCT user_name)
FROM user_actions
GROUP BY TUMBLE(user_action_time, INTERVAL '10' MINUTE);

DataStream转为Table的过程中定义事件工夫

当定义Schema时通过.rowtime属性指定事件工夫属性,必须在DataStream中指定工夫戳与水位线。例如在数据集中,事件工夫属性为event_time,此时Table中的事件工夫字段中能够通过’event_time. rowtime‘来指定。

目前Flink反对两种形式定义EventTime字段,如下:

// 形式1:
// 提取timestamp并调配watermarks
DataStream<Tuple2<String, String>> stream = inputStream.assignTimestampsAndWatermarks(...);

// 申明一个额定逻辑字段作为事件工夫属性
// 在table schema的开端应用user_action_time.rowtime定义事件工夫属性
// 零碎会在TableEnvironment中获取事件工夫属性
Table table = tEnv.fromDataStream(stream, "user_name, data, user_action_time.rowtime");

// 形式2:

// 从第一个字段提取timestamp并调配watermarks
DataStream<Tuple3<Long, String, String>> stream = inputStream.assignTimestampsAndWatermarks(...);

// 第一个字段曾经用来提取工夫戳,能够间接应用对应的字段作为事件工夫属性
Table table = tEnv.fromDataStream(stream, "user_action_time.rowtime, user_name, data");

// 应用:

WindowedTable windowedTable = table.window(Tumble.over("10.minutes").on("user_action_time").as("userActionWindow"));

应用TableSource

另外也能够在创立TableSource的时候,实现DefinedRowtimeAttributes接口来定义EventTime字段,在接口中须要实现getRowtimeAttributeDescriptors办法,创立基于EventTime的工夫属性信息。

// 定义带有rowtime属性的table source
public class UserActionSource implements StreamTableSource<Row>, DefinedRowtimeAttributes {

    @Override
    public TypeInformation<Row> getReturnType() {
        String[] names = new String[] {"user_name", "data", "user_action_time"};
        TypeInformation[] types =
            new TypeInformation[] {Types.STRING(), Types.STRING(), Types.LONG()};
        return Types.ROW(names, types);
    }

    @Override
    public DataStream<Row> getDataStream(StreamExecutionEnvironment execEnv) {

        // 创立流,基于user_action_time属性调配水位线
        DataStream<Row> stream = inputStream.assignTimestampsAndWatermarks(...);
        return stream;
    }

    @Override
    public List<RowtimeAttributeDescriptor> getRowtimeAttributeDescriptors() {
        // 标记user_action_time字段作为事件工夫属性
        // 创立user_action_time描述符,用来标识工夫属性字段
        RowtimeAttributeDescriptor rowtimeAttrDescr = new RowtimeAttributeDescriptor(
            "user_action_time",
            new ExistingField("user_action_time"),
            new AscendingTimestamps());
        List<RowtimeAttributeDescriptor> listRowtimeAttrDescr = Collections.singletonList(rowtimeAttrDescr);
        return listRowtimeAttrDescr;
    }
}

// register表
tEnv.registerTableSource("user_actions", new UserActionSource());

WindowedTable windowedTable = tEnv
    .from("user_actions")
    .window(Tumble.over("10.minutes").on("user_action_time").as("userActionWindow"));

小结

本文次要介绍了如何在Flink Table API和SQL中应用工夫语义,能够应用两种工夫语义:解决工夫和事件工夫。别离对每种的工夫语义的应用形式进行了具体解释。

公众号『大数据技术与数仓』,回复『材料』支付大数据资料包

评论

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注

这个站点使用 Akismet 来减少垃圾评论。了解你的评论数据如何被处理