乐趣区

关于flink:Flink-Table-API-SQL编程指南1

Apache Flink 提供了两种顶层的关系型 API,别离为 Table API 和 SQL,Flink 通过 Table API&SQL 实现了批流对立。其中 Table API 是用于 Scala 和 Java 的语言集成查问 API,它容许以十分直观的形式组合关系运算符(例如 select,where 和 join)的查问。Flink SQL 基于 Apache Calcite 实现了规范的 SQL,用户能够应用规范的 SQL 解决数据集。Table API 和 SQL 与 Flink 的 DataStream 和 DataSet API 严密集成在一起,用户能够实现互相转化,比方能够将 DataStream 或者 DataSet 注册为 table 进行操作数据。值得注意的是,Table API and SQL目前尚未齐全欠缺,还在踊跃的开发中,所以并不是所有的算子操作都能够通过其实现。

<!– more –>

依赖

从 Flink1.9 开始,Flink 为 Table & SQL API 提供了两种 planner, 别离为 Blink planner 和 old planner,其中 old planner 是在 Flink1.9 之前的版本应用。次要区别如下:

尖叫提醒:对于生产环境,目前举荐应用 old planner.

  • flink-table-common: 通用模块,蕴含 Flink Planner 和 Blink Planner 一些共用的代码
  • flink-table-api-java: java 语言的 Table & SQL API,仅针对 table(处于晚期的开发阶段,不举荐应用)
  • flink-table-api-scala: scala 语言的 Table & SQL API,仅针对 table(处于晚期的开发阶段,不举荐应用)
  • flink-table-api-java-bridge: java 语言的 Table & SQL API,反对 DataStream/DataSet API(举荐应用)
  • flink-table-api-scala-bridge: scala 语言的 Table & SQL API,反对 DataStream/DataSet API(举荐应用)
  • flink-table-planner:planner 和 runtime. planner 为 Flink1,9 之前的 old planner(举荐应用)
  • flink-table-planner-blink: 新的 Blink planner.
  • flink-table-runtime-blink: 新的 Blink runtime.
  • flink-table-uber: 将上述的 API 模块及 old planner 打成一个 jar 包,形如 flink-table-*.jar,位与 /lib 目录下
  • flink-table-uber-blink: 将上述的 API 模块及 Blink 模块打成一个 jar 包,形如 fflink-table-blink-*.jar,位与 /lib 目录下

Blink planner & old planner

Blink planner 和 old planner 有许多不同的特点,具体列举如下:

  • Blink planner 将批处理作业看做是流解决作业的特例。所以,不反对 Table 与 DataSet 之间的转换,批处理的作业也不会被转成 DataSet 程序,而是被转为 DataStream 程序。
  • Blink planner 不反对 BatchTableSource,应用的是有界的 StreamTableSource。
  • Blink planner 仅反对新的 Catalog,不反对ExternalCatalog (已过期)。
  • 对于 FilterableTableSource 的实现,两种 Planner 是不同的。old planner 会谓词下推到PlannerExpression(将来会被移除),而 Blink planner 会谓词下推到 Expression(示意一个产生计算结果的逻辑树)。
  • 仅仅 Blink planner 反对 key-value 模式的配置,即通过 Configuration 进行参数设置。
  • 对于 PlannerConfig 的实现,两种 planner 有所不同。
  • Blink planner 会将多个 sink 优化成一个 DAG(仅反对 TableEnvironment,StreamTableEnvironment 不反对),old planner 总是将每一个 sink 优化成一个新的 DAG,每一个 DAG 都是互相独立的。
  • old planner 不反对 catalog 统计,Blink planner 反对 catalog 统计。

Flink Table & SQL 程序的 pom 依赖

依据应用的语言不同,能够抉择上面的依赖,包含 scala 版和 java 版,如下:

<!-- java 版 -->
<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-table-api-java-bridge_2.11</artifactId>
  <version>1.10.0</version>
  <scope>provided</scope>
</dependency>
<!-- scala 版 -->
<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-table-api-scala-bridge_2.11</artifactId>
  <version>1.10.0</version>
  <scope>provided</scope>
</dependency>

除此之外,如果须要在本地的 IDE 中运行 Table API & SQL 的程序,则须要增加上面的 pom 依赖:

<!-- Flink 1.9 之前的 old planner -->
<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-table-planner_2.11</artifactId>
  <version>1.10.0</version>
  <scope>provided</scope>
</dependency>
<!-- 新的 Blink planner -->
<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-table-planner-blink_2.11</artifactId>
  <version>1.10.0</version>
  <scope>provided</scope>
</dependency>

另外,如果须要实现自定义的格局 (比方和 kafka 交互) 或者用户自定义函数,须要增加如下依赖:

<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-table-common</artifactId>
  <version>1.10.0</version>
  <scope>provided</scope>
</dependency>

Table API & SQL 的编程模板

所有的 Table API&SQL 的程序 (无论是批处理还是流解决) 都有着雷同的模式,上面将给出通用的编程构造模式:

// 创立一个 TableEnvironment 对象,指定 planner、解决模式(batch、streaming)
TableEnvironment tableEnv = ...; 
// 创立一个表
tableEnv.connect(...).createTemporaryTable("table1");
// 注册一个内部的表
tableEnv.connect(...).createTemporaryTable("outputTable");
// 通过 Table API 的查问创立一个 Table 对象
Table tapiResult = tableEnv.from("table1").select(...);
// 通过 SQL 查问的查问创立一个 Table 对象
Table sqlResult  = tableEnv.sqlQuery("SELECT ... FROM table1 ...");
// 将后果写入 TableSink
tapiResult.insertInto("outputTable");
// 执行
tableEnv.execute("java_job");

留神:Table API & SQL 的查问能够互相集成,另外还能够在 DataStream 或者 DataSet 中应用 Table API & SQL 的 API,实现 DataStreams、DataSet 与 Table 之间的互相转换。

创立 TableEnvironment

TableEnvironment 是 Table API & SQL 程序的一个入口,次要包含如下的性能:

  • 在外部的 catalog 中注册 Table
  • 注册 catalog
  • 加载可插拔模块
  • 执行 SQL 查问
  • 注册用户定义函数
  • DataStreamDataSet与 Table 之间的互相转换
  • 持有对 ExecutionEnvironmentStreamExecutionEnvironment 的援用

一个 Table 必然属于一个具体的 TableEnvironment,不能够将不同 TableEnvironment 的表放在一起应用(比方 join,union 等操作)。

TableEnvironment 是通过调用 BatchTableEnvironment.create() 或者 StreamTableEnvironment.create()的静态方法进行创立的。另外,默认两个 planner 的 jar 包都存在与 classpath 下,所有须要明确指定应用的 planner。

// **********************
// FLINK 流解决查问
// **********************
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.java.StreamTableEnvironment;

EnvironmentSettings fsSettings = EnvironmentSettings.newInstance().useOldPlanner().inStreamingMode().build();
StreamExecutionEnvironment fsEnv = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment fsTableEnv = StreamTableEnvironment.create(fsEnv, fsSettings);
// 或者 TableEnvironment fsTableEnv = TableEnvironment.create(fsSettings);

// ******************
// FLINK 批处理查问
// ******************
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.table.api.java.BatchTableEnvironment;

ExecutionEnvironment fbEnv = ExecutionEnvironment.getExecutionEnvironment();
BatchTableEnvironment fbTableEnv = BatchTableEnvironment.create(fbEnv);

// **********************
// BLINK 流解决查问
// **********************
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.java.StreamTableEnvironment;

StreamExecutionEnvironment bsEnv = StreamExecutionEnvironment.getExecutionEnvironment();
EnvironmentSettings bsSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
StreamTableEnvironment bsTableEnv = StreamTableEnvironment.create(bsEnv, bsSettings);
// 或者 TableEnvironment bsTableEnv = TableEnvironment.create(bsSettings);

// ******************
// BLINK 批处理查问
// ******************
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

EnvironmentSettings bbSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inBatchMode().build();
TableEnvironment bbTableEnv = TableEnvironment.create(bbSettings);

在 catalog 中创立表

长期表与永恒表

表能够分为长期表和永恒表两种,其中永恒表须要一个 catalog(比方 Hive 的 Metastore)俩保护表的元数据信息,一旦永恒表被创立,只有连贯到该 catalog 就能够拜访该表,只有显示删除永恒表,该表才能够被删除。长期表的生命周期是 Flink Session,这些表不可能被其余的 Flink Session 拜访,这些表不属于任何的 catalog 或者数据库,如果与长期表绝对应的数据库被删除了,该长期表也不会被删除。

创立表

虚表(Virtual Tables)

一个 Table 对象相当于 SQL 中的视图(虚表),它封装了一个逻辑执行打算,能够通过一个 catalog 创立,具体如下:

// 获取一个 TableEnvironment
TableEnvironment tableEnv = ...; 
// table 对象,查问的后果集
Table projTable = tableEnv.from("X").select(...);
// 注册一个表,名称为 "projectedTable"
tableEnv.createTemporaryView("projectedTable", projTable);

内部数据源表(Connector Tables)

能够把内部的数据源注册成表,比方能够读取 MySQL 数据库数据、Kafka 数据等

tableEnvironment
  .connect(...)
  .withFormat(...)
  .withSchema(...)
  .inAppendMode()
  .createTemporaryTable("MyTable")

扩大创立表的标识属性

表的注册总是蕴含三局部标识属性:catalog、数据库、表名。用户能够在外部设置一个 catalog 和一个数据库作为以后的 catalog 和数据库,所以对于 catalog 和数据库这两个标识属性是可选的,即如果不指定,默认应用的是“current catalog”和“current database”。

TableEnvironment tEnv = ...;
tEnv.useCatalog("custom_catalog");// 设置 catalog
tEnv.useDatabase("custom_database");// 设置数据库
Table table = ...;
// 注册一个名为 exampleView 的视图,catalog 名为 custom_catalog
// 数据库的名为 custom_database
tableEnv.createTemporaryView("exampleView", table);

// 注册一个名为 exampleView 的视图,catalog 的名为 custom_catalog
// 数据库的名为 other_database
tableEnv.createTemporaryView("other_database.exampleView", table);
 
// 注册一个名为 'View' 的视图,catalog 的名称为 custom_catalog
// 数据库的名为 custom_database,'View' 是保留关键字,须要应用 ``(反引号)
tableEnv.createTemporaryView("`View`", table);

// 注册一个名为 example.View 的视图,catalog 的名为 custom_catalog,// 数据库名为 custom_database
tableEnv.createTemporaryView("`example.View`", table);

// 注册一个名为 'exampleView' 的视图,catalog 的名为 'other_catalog'
// 数据库名为 other_database'tableEnv.createTemporaryView("other_catalog.other_database.exampleView", table);

查问表

Table API

Table API 是一个集成 Scala 与 Java 语言的查问 API,与 SQL 相比,它的查问不是一个规范的 SQL 语句,而是由一步一步的操作组成的。如下展现了一个应用 Table API 实现一个简略的聚合查问。

// 获取 TableEnvironment
TableEnvironment tableEnv = ...;
// 注册 Orders 表

// 查问注册的表
Table orders = tableEnv.from("Orders");
// 计算操作
Table revenue = orders
  .filter("cCountry ==='FRANCE'")
  .groupBy("cID, cName")
  .select("cID, cName, revenue.sum AS revSum");

SQL

Flink SQL 依赖于 Apache Calcite,其实现了规范的 SQL 语法,如下案例:

// 获取 TableEnvironment
TableEnvironment tableEnv = ...;

// 注册 Orders 表

// 计算逻辑同下面的 Table API
Table revenue = tableEnv.sqlQuery("SELECT cID, cName, SUM(revenue) AS revSum" +
    "FROM Orders" +
    "WHERE cCountry ='FRANCE'" +"GROUP BY cID, cName"
  );

// 注册 "RevenueFrance" 内部输出表
// 计算结果插入 "RevenueFrance" 表
tableEnv.sqlUpdate(
    "INSERT INTO RevenueFrance" +
    "SELECT cID, cName, SUM(revenue) AS revSum" +
    "FROM Orders" +
    "WHERE cCountry ='FRANCE'" +"GROUP BY cID, cName"
  );

输出表

一个表通过将其写入到 TableSink,而后进行输入。TableSink 是一个通用的反对多种文件格式 (CSV、Parquet, Avro) 和多种内部存储系统 (JDBC, Apache HBase, Apache Cassandra, Elasticsearch) 以及多种音讯对列 (Apache Kafka, RabbitMQ) 的接口。

批处理的表只能被写入到 BatchTableSink, 流解决的表须要指明 AppendStreamTableSink、RetractStreamTableSink 或者 UpsertStreamTableSink

// 获取 TableEnvironment
TableEnvironment tableEnv = ...;

// 创立输出表
final Schema schema = new Schema()
    .field("a", DataTypes.INT())
    .field("b", DataTypes.STRING())
    .field("c", DataTypes.LONG());

tableEnv.connect(new FileSystem("/path/to/file"))
    .withFormat(new Csv().fieldDelimiter('|').deriveSchema())
    .withSchema(schema)
    .createTemporaryTable("CsvSinkTable");

// 计算结果表
Table result = ...
// 输入后果表到注册的 TableSink
result.insertInto("CsvSinkTable");

Table API & SQL 底层的转换与执行

上文提到了 Flink 提供了两种 planner,别离为 old planner 和 Blink planner,对于不同的 planner 而言,Table API & SQL 底层的执行与转换是有所不同的。

Old planner

依据是流解决作业还是批处理作业,Table API &SQL 会被转换成 DataStream 或者 DataSet 程序。一个查问在外部示意为一个逻辑查问打算,会被转换为两个阶段:

  • 1. 逻辑查问打算优化
  • 2. 转换成 DataStream 或者 DataSet 程序

下面的两个阶段只有上面的操作被执行时才会被执行:

  • 当一个表被输入到 TableSink 时,比方调用了 Table.insertInto()办法
  • 当执行更新查问时,比方调用 TableEnvironment.sqlUpdate()办法
  • 当一个表被转换为 DataStream 或者 DataSet 时

一旦执行上述两个阶段,Table API & SQL 的操作会被看做是一般的 DataStream 或者 DataSet 程序,所以当 StreamExecutionEnvironment.execute() 或者ExecutionEnvironment.execute() 被调用时,会执行转换后的程序。

Blink planner

无论是批处理作业还是流解决作业,如果应用的是 Blink planner,底层都会被转换为 DataStream 程序。在一个查问在外部示意为一个逻辑查问打算,会被转换成两个阶段:

  • 1. 逻辑查问打算优化
  • 2. 转换成 DataStream 程序

对于 TableEnvironment and StreamTableEnvironment 而言,一个查问的转换是不同的

首先对于 TableEnvironment,当 TableEnvironment.execute()办法执行时,Table API & SQL 的查问才会被转换,因为 TableEnvironment 会将多个 sink 优化为一个 DAG。

对于 StreamTableEnvironment,转换产生的工夫与 old planner 雷同。

与 DataStream & DataSet API 集成

对于 Old planner 与 Blink planner 而言,只有是流解决的操作,都能够与 DataStream API 集成,仅仅只有 Old planner 才能够与 DataSet API 集成,因为 Blink planner 的批处理作业会被转换成 DataStream 程序,所以不可能与 DataSet API 集成。值得注意的是,上面提到的 table 与 DataSet 之间的转换仅实用于 Old planner。

Table API & SQL 的查问很容易与 DataStream 或者 DataSet 程序集成,并能够将 Table API & SQL 的查问嵌入 DataStream 或者 DataSet 程序中。DataStream 或者 DataSet 能够转换成表,反之,表也能够被转换成 DataStream 或者 DataSet。

从 DataStream 或者 DataSet 中注册长期表(视图)

尖叫提醒:只能将 DataStream 或者 DataSet 转换为长期表(视图)

上面演示 DataStream 的转换,对于 DataSet 的转换相似。

// 获取 StreamTableEnvironment
StreamTableEnvironment tableEnv = ...; 
DataStream<Tuple2<Long, String>> stream = ...
// 将 DataStream 注册为一个名为 myTable 的视图,其中字段别离为 "f0", "f1"
tableEnv.createTemporaryView("myTable", stream);
// 将 DataStream 注册为一个名为 myTable2 的视图, 其中字段别离为 "myLong", "myString"
tableEnv.createTemporaryView("myTable2", stream, "myLong, myString");

将 DataStream 或者 DataSet 转化为 Table 对象

能够间接将 DataStream 或者 DataSet 转换为 Table 对象,之后能够应用 Table API 进行查问操作。上面演示 DataStream 的转换,对于 DataSet 的转换相似。

// 获取 StreamTableEnvironment
StreamTableEnvironment tableEnv = ...; 
DataStream<Tuple2<Long, String>> stream = ...
// 将 DataStream 转换为 Table 对象,默认的字段为 "f0", "f1"
Table table1 = tableEnv.fromDataStream(stream);
// 将 DataStream 转换为 Table 对象,默认的字段为 "myLong", "myString"
Table table2 = tableEnv.fromDataStream(stream, "myLong, myString");

将表转换为 DataStream 或者 DataSet

当将 Table 转为 DataStream 或者 DataSet 时,须要指定 DataStream 或者 DataSet 的数据类型。通常最不便的数据类型是 row 类型,Flink 提供了很多的数据类型供用户抉择,具体包含 Row、POJO、样例类、Tuple 和原子类型。

将表转换为 DataStream

一个流解决查问的后果是动态变化的,所以将表转为 DataStream 时须要指定一个更新模式,共有两种模式:Append ModeRetract Mode

  • Append Mode

如果动静表仅只有 Insert 操作,即之前输入的后果不会被更新,则应用该模式。如果更新或删除操作应用追加模式会失败报错

  • Retract Mode

始终能够应用此模式。返回值是 boolean 类型。它用 true 或 false 来标记数据的插入和撤回,返回 true 代表数据插入,false 代表数据的撤回。

// 获取 StreamTableEnvironment. 
StreamTableEnvironment tableEnv = ...; 
// 蕴含两个字段的表(String name, Integer age)
Table table = ...
// 将表转为 DataStream,应用 Append Mode 追加模式,数据类型为 Row
DataStream<Row> dsRow = tableEnv.toAppendStream(table, Row.class);
// 将表转为 DataStream,应用 Append Mode 追加模式,数据类型为定义好的 TypeInformation
TupleTypeInfo<Tuple2<String, Integer>> tupleType = new TupleTypeInfo<>(Types.STRING(),
  Types.INT());
DataStream<Tuple2<String, Integer>> dsTuple = 
  tableEnv.toAppendStream(table, tupleType);
// 将表转为 DataStream,应用的模式为 Retract Mode 撤回模式,类型为 Row
// 对于转换后的 DataStream<Tuple2<Boolean, X>>,X 示意流的数据类型,// boolean 值示意数据扭转的类型,其中 INSERT 返回 true,DELETE 返回的是 false
DataStream<Tuple2<Boolean, Row>> retractStream = 
  tableEnv.toRetractStream(table, Row.class);

将表转换为 DataSet

// 获取 BatchTableEnvironment
BatchTableEnvironment tableEnv = BatchTableEnvironment.create(env);
// 蕴含两个字段的表(String name, Integer age)
Table table = ...
// 将表转为 DataSet 数据类型为 Row
DataSet<Row> dsRow = tableEnv.toDataSet(table, Row.class);
// 将表转为 DataSet,通过 TypeInformation 定义 Tuple2<String, Integer> 数据类型
TupleTypeInfo<Tuple2<String, Integer>> tupleType = new TupleTypeInfo<>(Types.STRING(),
  Types.INT());
DataSet<Tuple2<String, Integer>> dsTuple = 
  tableEnv.toDataSet(table, tupleType);

表的 Schema 与数据类型之间的映射

表的 Schema 与数据类型之间的映射有两种形式:别离是基于字段下标地位的映射和基于字段名称的映射。

基于字段下标地位的映射

该形式是依照字段的程序进行一一映射,应用形式如下:

// 获取 StreamTableEnvironment
StreamTableEnvironment tableEnv = ...; 
DataStream<Tuple2<Long, Integer>> stream = ...
// 将 DataStream 转为表,默认的字段名为 "f0" 和 "f1"
Table table = tableEnv.fromDataStream(stream);
// 将 DataStream 转为表,选取 tuple 的第一个元素,指定一个名为 "myLong" 的字段名
Table table = tableEnv.fromDataStream(stream, "myLong");
// 将 DataStream 转为表,为 tuple 的第一个元素指定名为 "myLong",为第二个元素指定 myInt 的字段名
Table table = tableEnv.fromDataStream(stream, "myLong, myInt");

基于字段名称的映射

基于字段名称的映射形式反对任意的数据类型包含 POJO 类型,能够很灵便地定义表 Schema 映射,所有的字段被映射成一个具体的字段名称,同时也能够应用 ”as” 为字段起一个别名。其中 Tuple 元素的第一个元素为 f0, 第二个元素为 f1,以此类推。

// 获取 StreamTableEnvironment
StreamTableEnvironment tableEnv = ...; 
DataStream<Tuple2<Long, Integer>> stream = ...
// 将 DataStream 转为表,默认的字段名为 "f0" 和 "f1"
Table table = tableEnv.fromDataStream(stream);
// 将 DataStream 转为表,抉择 tuple 的第二个元素,指定一个名为 "f1" 的字段名
Table table = tableEnv.fromDataStream(stream, "f1");
// 将 DataStream 转为表,替换字段的程序
Table table = tableEnv.fromDataStream(stream, "f1, f0");
// 将 DataStream 转为表,替换字段的程序,并为 f1 起别名为 "myInt",为 f0 起别名为 "myLong
Table table = tableEnv.fromDataStream(stream, "f1 as myInt, f0 as myLong");

原子类型

Flink 将 Integer, Double, String 或者一般的类型称之为原子类型,一个数据类型为原子类型的 DataStream 或者 DataSet 能够被转成单个字段属性的表,这个字段的类型与 DataStream 或者 DataSet 的数据类型统一,这个字段的名称能够进行指定。

// 获取 StreamTableEnvironment
StreamTableEnvironment tableEnv = ...; 
// 数据类型为原子类型 Long
DataStream<Long> stream = ...
// 将 DataStream 转为表,默认的字段名为 "f0"
Table table = tableEnv.fromDataStream(stream);
// 将 DataStream 转为表,指定字段名为 myLong"Table table = tableEnv.fromDataStream(stream,"myLong");

Tuple 类型

Tuple 类型的 DataStream 或者 DataSet 都能够转为表,能够从新设定表的字段名(即依据 tuple 元素的地位进行一一映射,转为表之后,每个元素都有一个别名),如果不为字段指定名称,则应用默认的名称(java 语言默认的是 f0,f1,scala 默认的是_1), 用户也能够重新排列字段的程序,并为每个字段起一个别名。

// 获取 StreamTableEnvironment
StreamTableEnvironment tableEnv = ...; 
//Tuple2<Long, String> 类型的 DataStream
DataStream<Tuple2<Long, String>> stream = ...
// 将 DataStream 转为表,默认的字段名为 "f0", "f1"
Table table = tableEnv.fromDataStream(stream);
// 将 DataStream 转为表,指定字段名为 "myLong", "myString"(依照 Tuple 元素的程序地位)
Table table = tableEnv.fromDataStream(stream, "myLong, myString");
// 将 DataStream 转为表,指定字段名为 "f0", "f1",并且替换程序
Table table = tableEnv.fromDataStream(stream, "f1, f0");
// 将 DataStream 转为表,只抉择 Tuple 的第二个元素,指定字段名为 "f1"
Table table = tableEnv.fromDataStream(stream, "f1");
// 将 DataStream 转为表,为 Tuple 的第二个元素指定别名为 myString,为第一个元素指定字段名为 myLong
Table table = tableEnv.fromDataStream(stream, "f1 as'myString', f0 as'myLong'");

POJO 类型

当将 POJO 类型的 DataStream 或者 DataSet 转为表时,如果不指定表名,则默认应用的是 POJO 字段自身的名称,原始字段名称的映射须要指定原始字段的名称,能够为其起一个别名,也能够调换字段的程序,也能够只抉择局部的字段。

// 获取 StreamTableEnvironment
StreamTableEnvironment tableEnv = ...; 
// 数据类型为 Person 的 POJO 类型,字段包含 "name" 和 "age"
DataStream<Person> stream = ...
// 将 DataStream 转为表,默认的字段名称为 "age", "name"
Table table = tableEnv.fromDataStream(stream);
//  将 DataStream 转为表,为 "age" 字段指定别名 myAge, 为 "name" 字段指定别名 myName
Table table = tableEnv.fromDataStream(stream, "age as myAge, name as myName");
//  将 DataStream 转为表,只抉择一个 name 字段
Table table = tableEnv.fromDataStream(stream, "name");
//  将 DataStream 转为表,只抉择一个 name 字段,并起一个别名 myName
Table table = tableEnv.fromDataStream(stream, "name as myName");

Row 类型

Row 类型的 DataStream 或者 DataSet 转为表的过程中,能够依据字段的地位或者字段名称进行映射,同时也能够为字段起一个别名,或者只抉择局部字段。

// 获取 StreamTableEnvironment
StreamTableEnvironment tableEnv = ...; 
// Row 类型的 DataStream,通过 RowTypeInfo 指定两个字段 "name" 和 "age"
DataStream<Row> stream = ...
// 将 DataStream 转为表,默认的字段名为原始字段名 "name" 和 "age"
Table table = tableEnv.fromDataStream(stream);
// 将 DataStream 转为表,依据地位映射,为第一个字段指定 myName 别名,为第二个字段指定 myAge 别名
Table table = tableEnv.fromDataStream(stream, "myName, myAge");
// 将 DataStream 转为表,依据字段名映射,为 name 字段起别名 myName,为 age 字段起别名 myAge
Table table = tableEnv.fromDataStream(stream, "name as myName, age as myAge");
// 将 DataStream 转为表,依据字段名映射,只抉择 name 字段
Table table = tableEnv.fromDataStream(stream, "name");
// 将 DataStream 转为表,依据字段名映射,只抉择 name 字段,并起一个别名 "myName"
Table table = tableEnv.fromDataStream(stream, "name as myName");

查问优化

Old planner

Apache Flink 利用 Apache Calcite 来优化和转换查问。以后执行的优化包含投影和过滤器下推,去相干子查问以及其余类型的查问重写。Old Planner 目前不反对优化 JOIN 的程序,而是依照查问中定义的程序执行它们。

通过提供一个 CalciteConfig 对象,能够调整在不同阶段利用的优化规定集。这可通过调用 CalciteConfig.createBuilder() 办法来进行创立,并通过调用 tableEnv.getConfig.setPlannerConfig(calciteConfig) 办法将该对象传递给 TableEnvironment。

Blink planner

Apache Flink 利用并扩大了 Apache Calcite 来执行简单的查问优化。这包含一系列基于规定和基于老本的优化(cost_based),例如:

  • 基于 Apache Calcite 的去相干子查问
  • 投影裁剪
  • 分区裁剪
  • 过滤器谓词下推
  • 过滤器下推
  • 子打算反复数据删除以防止反复计算
  • 非凡的子查问重写,包含两个局部:

    • 将 IN 和 EXISTS 转换为左半联接(left semi-join)
    • 将 NOT IN 和 NOT EXISTS 转换为 left anti-join
  • 调整 join 的程序,须要启用 table.optimizer.join-reorder-enabled

留神: IN / EXISTS / NOT IN / NOT EXISTS 以后仅在子查问重写的联合条件下受反对。

查问优化器不仅基于打算,而且还能够基于数据源的统计信息以及每个操作的细粒度开销(例如 io,cpu,网络和内存), 从而做出更加理智且正当的优化决策。

高级用户能够通过 CalciteConfig 对象提供自定义优化规定,通过调用 tableEnv.getConfig.setPlannerConfig(calciteConfig),将参数传递给 TableEnvironment。

查看执行打算

SQL 语言反对通过 explain 来查看某条 SQL 的执行打算,Flink Table API 也能够通过调用 explain()办法来查看具体的执行打算。该办法返回一个字符串用来形容三个局部打算,别离为:

  1. 关系查问的形象语法树,即未优化的逻辑查问打算,
  2. 优化的逻辑查问打算
  3. 理论执行打算
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
DataStream<Tuple2<Integer, String>> stream1 = env.fromElements(new Tuple2<>(1, "hello"));
DataStream<Tuple2<Integer, String>> stream2 = env.fromElements(new Tuple2<>(1, "hello"));
Table table1 = tEnv.fromDataStream(stream1, "count, word");
Table table2 = tEnv.fromDataStream(stream2, "count, word");
Table table = table1
  .where("LIKE(word,'F%')")
  .unionAll(table2);
// 查看执行打算
String explanation = tEnv.explain(table);
System.out.println(explanation);

执行打算的后果为:

== 形象语法树 ==
LogicalUnion(all=[true])
  LogicalFilter(condition=[LIKE($1, _UTF-16LE'F%')])
    FlinkLogicalDataStreamScan(id=[1], fields=[count, word])
  FlinkLogicalDataStreamScan(id=[2], fields=[count, word])

== 优化的逻辑执行打算 ==
DataStreamUnion(all=[true], union all=[count, word])
  DataStreamCalc(select=[count, word], where=[LIKE(word, _UTF-16LE'F%')])
    DataStreamScan(id=[1], fields=[count, word])
  DataStreamScan(id=[2], fields=[count, word])

== 物理执行打算 ==
Stage 1 : Data Source
    content : collect elements with CollectionInputFormat

Stage 2 : Data Source
    content : collect elements with CollectionInputFormat

    Stage 3 : Operator
        content : from: (count, word)
        ship_strategy : REBALANCE

        Stage 4 : Operator
            content : where: (LIKE(word, _UTF-16LE'F%')), select: (count, word)
            ship_strategy : FORWARD

            Stage 5 : Operator
                content : from: (count, word)
                ship_strategy : REBALANCE

小结

本文次要介绍了 Flink TableAPI &SQL,首先介绍了 Flink Table API &SQL 的基本概念,而后介绍了构建 Flink Table API & SQL 程序所须要的依赖,接着介绍了 Flink 的两种 planner,还介绍了如何注册表以及 DataStream、DataSet 与表的互相转换,最初介绍了 Flink 的两种 planner 对应的查问优化并给出了一个查看执行打算的案例。

公众号『大数据技术与数仓』,回复『材料』支付大数据资料包

退出移动版