Apache Flink提供了两种顶层的关系型API,别离为Table API和SQL,Flink通过Table API&SQL实现了批流对立。其中Table API是用于Scala和Java的语言集成查问API,它容许以十分直观的形式组合关系运算符(例如select,where和join)的查问。Flink SQL基于Apache Calcite 实现了规范的SQL,用户能够应用规范的SQL解决数据集。Table API和SQL与Flink的DataStream和DataSet API严密集成在一起,用户能够实现互相转化,比方能够将DataStream或者DataSet注册为table进行操作数据。值得注意的是,Table API and SQL目前尚未齐全欠缺,还在踊跃的开发中,所以并不是所有的算子操作都能够通过其实现。
<!– more –>
依赖
从Flink1.9开始,Flink为Table & SQL API提供了两种planner,别离为Blink planner和old planner,其中old planner是在Flink1.9之前的版本应用。次要区别如下:
尖叫提醒:对于生产环境,目前举荐应用old planner.
flink-table-common
: 通用模块,蕴含 Flink Planner 和 Blink Planner 一些共用的代码flink-table-api-java
: java语言的Table & SQL API,仅针对table(处于晚期的开发阶段,不举荐应用)flink-table-api-scala
: scala语言的Table & SQL API,仅针对table(处于晚期的开发阶段,不举荐应用)flink-table-api-java-bridge
: java语言的Table & SQL API,反对DataStream/DataSet API(举荐应用)flink-table-api-scala-bridge
: scala语言的Table & SQL API,反对DataStream/DataSet API(举荐应用)flink-table-planner
:planner 和runtime. planner为Flink1,9之前的old planner(举荐应用)flink-table-planner-blink
: 新的Blink planner.flink-table-runtime-blink
: 新的Blink runtime.flink-table-uber
: 将上述的API模块及old planner打成一个jar包,形如flink-table-*.jar,位与/lib目录下flink-table-uber-blink
:将上述的API模块及Blink 模块打成一个jar包,形如fflink-table-blink-*.jar,位与/lib目录下
Blink planner & old planner
Blink planner和old planner有许多不同的特点,具体列举如下:
- Blink planner将批处理作业看做是流解决作业的特例。所以,不反对Table 与DataSet之间的转换,批处理的作业也不会被转成DataSet程序,而是被转为DataStream程序。
- Blink planner不反对
BatchTableSource
,应用的是有界的StreamTableSource。 - Blink planner仅反对新的
Catalog
,不反对ExternalCatalog
(已过期)。 - 对于FilterableTableSource的实现,两种Planner是不同的。old planner会谓词下推到
PlannerExpression
(将来会被移除),而Blink planner 会谓词下推到Expression
(示意一个产生计算结果的逻辑树)。 - 仅仅Blink planner反对key-value模式的配置,即通过Configuration进行参数设置。
- 对于PlannerConfig的实现,两种planner有所不同。
- Blink planner 会将多个sink优化成一个DAG(仅反对TableEnvironment,StreamTableEnvironment不反对),old planner总是将每一个sink优化成一个新的DAG,每一个DAG都是互相独立的。
- old planner不反对catalog统计,Blink planner反对catalog统计。
Flink Table & SQL程序的pom依赖
依据应用的语言不同,能够抉择上面的依赖,包含scala版和java版,如下:
<!-- java版 -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-api-java-bridge_2.11</artifactId>
<version>1.10.0</version>
<scope>provided</scope>
</dependency>
<!-- scala版 -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-api-scala-bridge_2.11</artifactId>
<version>1.10.0</version>
<scope>provided</scope>
</dependency>
除此之外,如果须要在本地的IDE中运行Table API & SQL的程序,则须要增加上面的pom依赖:
<!-- Flink 1.9之前的old planner -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-planner_2.11</artifactId>
<version>1.10.0</version>
<scope>provided</scope>
</dependency>
<!-- 新的Blink planner -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-planner-blink_2.11</artifactId>
<version>1.10.0</version>
<scope>provided</scope>
</dependency>
另外,如果须要实现自定义的格局(比方和kafka交互)或者用户自定义函数,须要增加如下依赖:
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-common</artifactId>
<version>1.10.0</version>
<scope>provided</scope>
</dependency>
Table API & SQL的编程模板
所有的Table API&SQL的程序(无论是批处理还是流解决)都有着雷同的模式,上面将给出通用的编程构造模式:
// 创立一个TableEnvironment对象,指定planner、解决模式(batch、streaming)
TableEnvironment tableEnv = ...;
// 创立一个表
tableEnv.connect(...).createTemporaryTable("table1");
// 注册一个内部的表
tableEnv.connect(...).createTemporaryTable("outputTable");
// 通过Table API的查问创立一个Table 对象
Table tapiResult = tableEnv.from("table1").select(...);
// 通过SQL查问的查问创立一个Table 对象
Table sqlResult = tableEnv.sqlQuery("SELECT ... FROM table1 ... ");
// 将后果写入TableSink
tapiResult.insertInto("outputTable");
// 执行
tableEnv.execute("java_job");
留神:Table API & SQL的查问能够互相集成,另外还能够在DataStream或者DataSet中应用Table API & SQL的API,实现DataStreams、 DataSet与Table之间的互相转换。
创立TableEnvironment
TableEnvironment是Table API & SQL程序的一个入口,次要包含如下的性能:
- 在外部的catalog中注册Table
- 注册catalog
- 加载可插拔模块
- 执行SQL查问
- 注册用户定义函数
DataStream
、DataSet
与Table之间的互相转换- 持有对
ExecutionEnvironment
、StreamExecutionEnvironment
的援用
一个Table必然属于一个具体的TableEnvironment,不能够将不同TableEnvironment的表放在一起应用(比方join,union等操作)。
TableEnvironment是通过调用 BatchTableEnvironment.create()
或者StreamTableEnvironment.create()的静态方法进行创立的。另外,默认两个planner的jar包都存在与classpath下,所有须要明确指定应用的planner。
// **********************
// FLINK 流解决查问
// **********************
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.java.StreamTableEnvironment;
EnvironmentSettings fsSettings = EnvironmentSettings.newInstance().useOldPlanner().inStreamingMode().build();
StreamExecutionEnvironment fsEnv = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment fsTableEnv = StreamTableEnvironment.create(fsEnv, fsSettings);
//或者TableEnvironment fsTableEnv = TableEnvironment.create(fsSettings);
// ******************
// FLINK 批处理查问
// ******************
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.table.api.java.BatchTableEnvironment;
ExecutionEnvironment fbEnv = ExecutionEnvironment.getExecutionEnvironment();
BatchTableEnvironment fbTableEnv = BatchTableEnvironment.create(fbEnv);
// **********************
// BLINK 流解决查问
// **********************
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.java.StreamTableEnvironment;
StreamExecutionEnvironment bsEnv = StreamExecutionEnvironment.getExecutionEnvironment();
EnvironmentSettings bsSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
StreamTableEnvironment bsTableEnv = StreamTableEnvironment.create(bsEnv, bsSettings);
// 或者 TableEnvironment bsTableEnv = TableEnvironment.create(bsSettings);
// ******************
// BLINK 批处理查问
// ******************
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;
EnvironmentSettings bbSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inBatchMode().build();
TableEnvironment bbTableEnv = TableEnvironment.create(bbSettings);
在catalog中创立表
长期表与永恒表
表能够分为长期表和永恒表两种,其中永恒表须要一个catalog(比方Hive的Metastore)俩保护表的元数据信息,一旦永恒表被创立,只有连贯到该catalog就能够拜访该表,只有显示删除永恒表,该表才能够被删除。长期表的生命周期是Flink Session,这些表不可能被其余的Flink Session拜访,这些表不属于任何的catalog或者数据库,如果与长期表绝对应的数据库被删除了,该长期表也不会被删除。
创立表
虚表(Virtual Tables)
一个Table对象相当于SQL中的视图(虚表),它封装了一个逻辑执行打算,能够通过一个catalog创立,具体如下:
// 获取一个TableEnvironment
TableEnvironment tableEnv = ...;
// table对象,查问的后果集
Table projTable = tableEnv.from("X").select(...);
// 注册一个表,名称为 "projectedTable"
tableEnv.createTemporaryView("projectedTable", projTable);
内部数据源表(Connector Tables)
能够把内部的数据源注册成表,比方能够读取MySQL数据库数据、Kafka数据等
tableEnvironment
.connect(...)
.withFormat(...)
.withSchema(...)
.inAppendMode()
.createTemporaryTable("MyTable")
扩大创立表的标识属性
表的注册总是蕴含三局部标识属性:catalog、数据库、表名。用户能够在外部设置一个catalog和一个数据库作为以后的catalog和数据库,所以对于catalog和数据库这两个标识属性是可选的,即如果不指定,默认应用的是“current catalog”和 “current database”。
TableEnvironment tEnv = ...;
tEnv.useCatalog("custom_catalog");//设置catalog
tEnv.useDatabase("custom_database");//设置数据库
Table table = ...;
// 注册一个名为exampleView的视图,catalog名为custom_catalog
// 数据库的名为custom_database
tableEnv.createTemporaryView("exampleView", table);
// 注册一个名为exampleView的视图,catalog的名为custom_catalog
// 数据库的名为other_database
tableEnv.createTemporaryView("other_database.exampleView", table);
// 注册一个名为'View'的视图,catalog的名称为custom_catalog
// 数据库的名为custom_database,'View'是保留关键字,须要应用``(反引号)
tableEnv.createTemporaryView("`View`", table);
// 注册一个名为example.View的视图,catalog的名为custom_catalog,
// 数据库名为custom_database
tableEnv.createTemporaryView("`example.View`", table);
// 注册一个名为'exampleView'的视图, catalog的名为'other_catalog'
// 数据库名为other_database'
tableEnv.createTemporaryView("other_catalog.other_database.exampleView", table);
查问表
Table API
Table API是一个集成Scala与Java语言的查问API,与SQL相比,它的查问不是一个规范的SQL语句,而是由一步一步的操作组成的。如下展现了一个应用Table API实现一个简略的聚合查问。
// 获取TableEnvironment
TableEnvironment tableEnv = ...;
//注册Orders表
// 查问注册的表
Table orders = tableEnv.from("Orders");
// 计算操作
Table revenue = orders
.filter("cCountry === 'FRANCE'")
.groupBy("cID, cName")
.select("cID, cName, revenue.sum AS revSum");
SQL
Flink SQL依赖于Apache Calcite,其实现了规范的SQL语法,如下案例:
// 获取TableEnvironment
TableEnvironment tableEnv = ...;
//注册Orders表
// 计算逻辑同下面的Table API
Table revenue = tableEnv.sqlQuery(
"SELECT cID, cName, SUM(revenue) AS revSum " +
"FROM Orders " +
"WHERE cCountry = 'FRANCE' " +
"GROUP BY cID, cName"
);
// 注册"RevenueFrance"内部输出表
// 计算结果插入"RevenueFrance"表
tableEnv.sqlUpdate(
"INSERT INTO RevenueFrance " +
"SELECT cID, cName, SUM(revenue) AS revSum " +
"FROM Orders " +
"WHERE cCountry = 'FRANCE' " +
"GROUP BY cID, cName"
);
输出表
一个表通过将其写入到TableSink,而后进行输入。TableSink是一个通用的反对多种文件格式(CSV、Parquet, Avro)和多种内部存储系统(JDBC, Apache HBase, Apache Cassandra, Elasticsearch)以及多种音讯对列(Apache Kafka, RabbitMQ)的接口。
批处理的表只能被写入到 BatchTableSink
,流解决的表须要指明AppendStreamTableSink、RetractStreamTableSink或者 UpsertStreamTableSink
// 获取TableEnvironment
TableEnvironment tableEnv = ...;
// 创立输出表
final Schema schema = new Schema()
.field("a", DataTypes.INT())
.field("b", DataTypes.STRING())
.field("c", DataTypes.LONG());
tableEnv.connect(new FileSystem("/path/to/file"))
.withFormat(new Csv().fieldDelimiter('|').deriveSchema())
.withSchema(schema)
.createTemporaryTable("CsvSinkTable");
// 计算结果表
Table result = ...
// 输入后果表到注册的TableSink
result.insertInto("CsvSinkTable");
Table API & SQL底层的转换与执行
上文提到了Flink提供了两种planner,别离为old planner和Blink planner,对于不同的planner而言,Table API & SQL底层的执行与转换是有所不同的。
Old planner
依据是流解决作业还是批处理作业,Table API &SQL会被转换成DataStream或者DataSet程序。一个查问在外部示意为一个逻辑查问打算,会被转换为两个阶段:
- 1.逻辑查问打算优化
- 2.转换成DataStream或者DataSet程序
下面的两个阶段只有上面的操作被执行时才会被执行:
- 当一个表被输入到TableSink时,比方调用了Table.insertInto()办法
- 当执行更新查问时,比方调用TableEnvironment.sqlUpdate()办法
- 当一个表被转换为DataStream或者DataSet时
一旦执行上述两个阶段,Table API & SQL的操作会被看做是一般的DataStream或者DataSet程序,所以当StreamExecutionEnvironment.execute()
或者ExecutionEnvironment.execute()
被调用时,会执行转换后的程序。
Blink planner
无论是批处理作业还是流解决作业,如果应用的是Blink planner,底层都会被转换为DataStream程序。在一个查问在外部示意为一个逻辑查问打算,会被转换成两个阶段:
- 1.逻辑查问打算优化
- 2.转换成DataStream程序
对于TableEnvironment
and StreamTableEnvironment
而言,一个查问的转换是不同的
首先对于TableEnvironment,当TableEnvironment.execute()办法执行时,Table API & SQL的查问才会被转换,因为TableEnvironment会将多个sink优化为一个DAG。
对于StreamTableEnvironment,转换产生的工夫与old planner雷同。
与DataStream & DataSet API集成
对于Old planner与Blink planner而言,只有是流解决的操作,都能够与DataStream API集成,仅仅只有Old planner才能够与DataSet API集成,因为Blink planner的批处理作业会被转换成DataStream程序,所以不可能与DataSet API集成。值得注意的是,上面提到的table与DataSet之间的转换仅实用于Old planner。
Table API & SQL的查问很容易与DataStream或者DataSet程序集成,并能够将Table API & SQL的查问嵌入DataStream或者DataSet程序中。DataStream或者DataSet能够转换成表,反之,表也能够被转换成DataStream或者DataSet。
从DataStream或者DataSet中注册长期表(视图)
尖叫提醒:只能将DataStream或者DataSet转换为长期表(视图)
上面演示DataStream的转换,对于DataSet的转换相似。
// 获取StreamTableEnvironment
StreamTableEnvironment tableEnv = ...;
DataStream<Tuple2<Long, String>> stream = ...
// 将DataStream注册为一个名为myTable的视图,其中字段别离为"f0", "f1"
tableEnv.createTemporaryView("myTable", stream);
// 将DataStream注册为一个名为myTable2的视图,其中字段别离为"myLong", "myString"
tableEnv.createTemporaryView("myTable2", stream, "myLong, myString");
将DataStream或者DataSet转化为Table对象
能够间接将DataStream或者DataSet转换为Table对象,之后能够应用Table API进行查问操作。上面演示DataStream的转换,对于DataSet的转换相似。
// 获取StreamTableEnvironment
StreamTableEnvironment tableEnv = ...;
DataStream<Tuple2<Long, String>> stream = ...
// 将DataStream转换为Table对象,默认的字段为"f0", "f1"
Table table1 = tableEnv.fromDataStream(stream);
// 将DataStream转换为Table对象,默认的字段为"myLong", "myString"
Table table2 = tableEnv.fromDataStream(stream, "myLong, myString");
将表转换为DataStream或者DataSet
当将Table转为DataStream或者DataSet时,须要指定DataStream或者DataSet的数据类型。通常最不便的数据类型是row类型,Flink提供了很多的数据类型供用户抉择,具体包含Row、POJO、样例类、Tuple和原子类型。
将表转换为DataStream
一个流解决查问的后果是动态变化的,所以将表转为DataStream时须要指定一个更新模式,共有两种模式:Append Mode和Retract Mode。
- Append Mode
如果动静表仅只有Insert操作,即之前输入的后果不会被更新,则应用该模式。如果更新或删除操作应用追加模式会失败报错
- Retract Mode
始终能够应用此模式。返回值是boolean类型。它用true或false来标记数据的插入和撤回,返回true代表数据插入,false代表数据的撤回。
// 获取StreamTableEnvironment.
StreamTableEnvironment tableEnv = ...;
// 蕴含两个字段的表(String name, Integer age)
Table table = ...
// 将表转为DataStream,应用Append Mode追加模式,数据类型为Row
DataStream<Row> dsRow = tableEnv.toAppendStream(table, Row.class);
// 将表转为DataStream,应用Append Mode追加模式,数据类型为定义好的TypeInformation
TupleTypeInfo<Tuple2<String, Integer>> tupleType = new TupleTypeInfo<>(
Types.STRING(),
Types.INT());
DataStream<Tuple2<String, Integer>> dsTuple =
tableEnv.toAppendStream(table, tupleType);
// 将表转为DataStream,应用的模式为Retract Mode撤回模式,类型为Row
// 对于转换后的DataStream<Tuple2<Boolean, X>>,X示意流的数据类型,
// boolean值示意数据扭转的类型,其中INSERT返回true,DELETE返回的是false
DataStream<Tuple2<Boolean, Row>> retractStream =
tableEnv.toRetractStream(table, Row.class);
将表转换为DataSet
// 获取BatchTableEnvironment
BatchTableEnvironment tableEnv = BatchTableEnvironment.create(env);
// 蕴含两个字段的表(String name, Integer age)
Table table = ...
// 将表转为DataSet数据类型为Row
DataSet<Row> dsRow = tableEnv.toDataSet(table, Row.class);
// 将表转为DataSet,通过TypeInformation定义Tuple2<String, Integer>数据类型
TupleTypeInfo<Tuple2<String, Integer>> tupleType = new TupleTypeInfo<>(
Types.STRING(),
Types.INT());
DataSet<Tuple2<String, Integer>> dsTuple =
tableEnv.toDataSet(table, tupleType);
表的Schema与数据类型之间的映射
表的Schema与数据类型之间的映射有两种形式:别离是基于字段下标地位的映射和基于字段名称的映射。
基于字段下标地位的映射
该形式是依照字段的程序进行一一映射,应用形式如下:
// 获取StreamTableEnvironment
StreamTableEnvironment tableEnv = ...;
DataStream<Tuple2<Long, Integer>> stream = ...
// 将DataStream转为表,默认的字段名为"f0"和"f1"
Table table = tableEnv.fromDataStream(stream);
// 将DataStream转为表,选取tuple的第一个元素,指定一个名为"myLong"的字段名
Table table = tableEnv.fromDataStream(stream, "myLong");
// 将DataStream转为表,为tuple的第一个元素指定名为"myLong",为第二个元素指定myInt的字段名
Table table = tableEnv.fromDataStream(stream, "myLong, myInt");
基于字段名称的映射
基于字段名称的映射形式反对任意的数据类型包含POJO类型,能够很灵便地定义表Schema映射,所有的字段被映射成一个具体的字段名称,同时也能够应用”as”为字段起一个别名。其中Tuple元素的第一个元素为f0,第二个元素为f1,以此类推。
// 获取StreamTableEnvironment
StreamTableEnvironment tableEnv = ...;
DataStream<Tuple2<Long, Integer>> stream = ...
// 将DataStream转为表,默认的字段名为"f0"和"f1"
Table table = tableEnv.fromDataStream(stream);
// 将DataStream转为表,抉择tuple的第二个元素,指定一个名为"f1"的字段名
Table table = tableEnv.fromDataStream(stream, "f1");
// 将DataStream转为表,替换字段的程序
Table table = tableEnv.fromDataStream(stream, "f1, f0");
// 将DataStream转为表,替换字段的程序,并为f1起别名为"myInt",为f0起别名为"myLong
Table table = tableEnv.fromDataStream(stream, "f1 as myInt, f0 as myLong");
原子类型
Flink将Integer
, Double
, String
或者一般的类型称之为原子类型,一个数据类型为原子类型的DataStream或者DataSet能够被转成单个字段属性的表,这个字段的类型与DataStream或者DataSet的数据类型统一,这个字段的名称能够进行指定。
//获取StreamTableEnvironment
StreamTableEnvironment tableEnv = ...;
// 数据类型为原子类型Long
DataStream<Long> stream = ...
// 将DataStream转为表,默认的字段名为"f0"
Table table = tableEnv.fromDataStream(stream);
// 将DataStream转为表,指定字段名为myLong"
Table table = tableEnv.fromDataStream(stream, "myLong");
Tuple类型
Tuple类型的DataStream或者DataSet都能够转为表,能够从新设定表的字段名(即依据tuple元素的地位进行一一映射,转为表之后,每个元素都有一个别名),如果不为字段指定名称,则应用默认的名称(java语言默认的是f0,f1,scala默认的是_1),用户也能够重新排列字段的程序,并为每个字段起一个别名。
// 获取StreamTableEnvironment
StreamTableEnvironment tableEnv = ...;
//Tuple2<Long, String>类型的DataStream
DataStream<Tuple2<Long, String>> stream = ...
// 将DataStream转为表,默认的字段名为 "f0", "f1"
Table table = tableEnv.fromDataStream(stream);
// 将DataStream转为表,指定字段名为 "myLong", "myString"(依照Tuple元素的程序地位)
Table table = tableEnv.fromDataStream(stream, "myLong, myString");
// 将DataStream转为表,指定字段名为 "f0", "f1",并且替换程序
Table table = tableEnv.fromDataStream(stream, "f1, f0");
// 将DataStream转为表,只抉择Tuple的第二个元素,指定字段名为"f1"
Table table = tableEnv.fromDataStream(stream, "f1");
// 将DataStream转为表,为Tuple的第二个元素指定别名为myString,为第一个元素指定字段名为myLong
Table table = tableEnv.fromDataStream(stream, "f1 as 'myString', f0 as 'myLong'");
POJO类型
当将POJO类型的DataStream或者DataSet转为表时,如果不指定表名,则默认应用的是POJO字段自身的名称,原始字段名称的映射须要指定原始字段的名称,能够为其起一个别名,也能够调换字段的程序,也能够只抉择局部的字段。
// 获取StreamTableEnvironment
StreamTableEnvironment tableEnv = ...;
//数据类型为Person的POJO类型,字段包含"name"和"age"
DataStream<Person> stream = ...
// 将DataStream转为表,默认的字段名称为"age", "name"
Table table = tableEnv.fromDataStream(stream);
// 将DataStream转为表,为"age"字段指定别名myAge, 为"name"字段指定别名myName
Table table = tableEnv.fromDataStream(stream, "age as myAge, name as myName");
// 将DataStream转为表,只抉择一个name字段
Table table = tableEnv.fromDataStream(stream, "name");
// 将DataStream转为表,只抉择一个name字段,并起一个别名myName
Table table = tableEnv.fromDataStream(stream, "name as myName");
Row类型
Row类型的DataStream或者DataSet转为表的过程中,能够依据字段的地位或者字段名称进行映射,同时也能够为字段起一个别名,或者只抉择局部字段。
// 获取StreamTableEnvironment
StreamTableEnvironment tableEnv = ...;
// Row类型的DataStream,通过RowTypeInfo指定两个字段"name"和"age"
DataStream<Row> stream = ...
// 将DataStream转为表,默认的字段名为原始字段名"name"和"age"
Table table = tableEnv.fromDataStream(stream);
// 将DataStream转为表,依据地位映射,为第一个字段指定myName别名,为第二个字段指定myAge别名
Table table = tableEnv.fromDataStream(stream, "myName, myAge");
// 将DataStream转为表,依据字段名映射,为name字段起别名myName,为age字段起别名myAge
Table table = tableEnv.fromDataStream(stream, "name as myName, age as myAge");
// 将DataStream转为表,依据字段名映射,只抉择name字段
Table table = tableEnv.fromDataStream(stream, "name");
// 将DataStream转为表,依据字段名映射,只抉择name字段,并起一个别名"myName"
Table table = tableEnv.fromDataStream(stream, "name as myName");
查问优化
Old planner
Apache Flink利用Apache Calcite来优化和转换查问。以后执行的优化包含投影和过滤器下推,去相干子查问以及其余类型的查问重写。Old Planner目前不反对优化JOIN的程序,而是依照查问中定义的程序执行它们。
通过提供一个CalciteConfig
对象,能够调整在不同阶段利用的优化规定集。这可通过调用CalciteConfig.createBuilder()
办法来进行创立,并通过调用tableEnv.getConfig.setPlannerConfig(calciteConfig)
办法将该对象传递给TableEnvironment。
Blink planner
Apache Flink利用并扩大了Apache Calcite来执行简单的查问优化。这包含一系列基于规定和基于老本的优化(cost_based),例如:
- 基于Apache Calcite的去相干子查问
- 投影裁剪
- 分区裁剪
- 过滤器谓词下推
- 过滤器下推
- 子打算反复数据删除以防止反复计算
-
非凡的子查问重写,包含两个局部:
- 将IN和EXISTS转换为左半联接( left semi-join)
- 将NOT IN和NOT EXISTS转换为left anti-join
- 调整join的程序,须要启用
table.optimizer.join-reorder-enabled
留神: IN / EXISTS / NOT IN / NOT EXISTS以后仅在子查问重写的联合条件下受反对。
查问优化器不仅基于打算,而且还能够基于数据源的统计信息以及每个操作的细粒度开销(例如io,cpu,网络和内存),从而做出更加理智且正当的优化决策。
高级用户能够通过CalciteConfig
对象提供自定义优化规定,通过调用tableEnv.getConfig.setPlannerConfig(calciteConfig),将参数传递给TableEnvironment。
查看执行打算
SQL语言反对通过explain来查看某条SQL的执行打算,Flink Table API也能够通过调用explain()办法来查看具体的执行打算。该办法返回一个字符串用来形容三个局部打算,别离为:
- 关系查问的形象语法树,即未优化的逻辑查问打算,
- 优化的逻辑查问打算
- 理论执行打算
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
DataStream<Tuple2<Integer, String>> stream1 = env.fromElements(new Tuple2<>(1, "hello"));
DataStream<Tuple2<Integer, String>> stream2 = env.fromElements(new Tuple2<>(1, "hello"));
Table table1 = tEnv.fromDataStream(stream1, "count, word");
Table table2 = tEnv.fromDataStream(stream2, "count, word");
Table table = table1
.where("LIKE(word, 'F%')")
.unionAll(table2);
// 查看执行打算
String explanation = tEnv.explain(table);
System.out.println(explanation);
执行打算的后果为:
== 形象语法树 ==
LogicalUnion(all=[true])
LogicalFilter(condition=[LIKE($1, _UTF-16LE'F%')])
FlinkLogicalDataStreamScan(id=[1], fields=[count, word])
FlinkLogicalDataStreamScan(id=[2], fields=[count, word])
== 优化的逻辑执行打算 ==
DataStreamUnion(all=[true], union all=[count, word])
DataStreamCalc(select=[count, word], where=[LIKE(word, _UTF-16LE'F%')])
DataStreamScan(id=[1], fields=[count, word])
DataStreamScan(id=[2], fields=[count, word])
== 物理执行打算 ==
Stage 1 : Data Source
content : collect elements with CollectionInputFormat
Stage 2 : Data Source
content : collect elements with CollectionInputFormat
Stage 3 : Operator
content : from: (count, word)
ship_strategy : REBALANCE
Stage 4 : Operator
content : where: (LIKE(word, _UTF-16LE'F%')), select: (count, word)
ship_strategy : FORWARD
Stage 5 : Operator
content : from: (count, word)
ship_strategy : REBALANCE
小结
本文次要介绍了Flink TableAPI &SQL,首先介绍了Flink Table API &SQL的基本概念 ,而后介绍了构建Flink Table API & SQL程序所须要的依赖,接着介绍了Flink的两种planner,还介绍了如何注册表以及DataStream、DataSet与表的互相转换,最初介绍了Flink的两种planner对应的查问优化并给出了一个查看执行打算的案例。
公众号『大数据技术与数仓』,回复『材料』支付大数据资料包
发表回复