A Look at Flink's Table API and SQL Programs


This post takes a look at Flink's Table API and SQL programs.
Example
// for batch programs use ExecutionEnvironment instead of StreamExecutionEnvironment
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// create a TableEnvironment
// for batch programs use BatchTableEnvironment instead of StreamTableEnvironment
StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);

// register a Table
tableEnv.registerTable("table1", ...);            // or
tableEnv.registerTableSource("table2", ...);      // or
tableEnv.registerExternalCatalog("extCat", ...);
// register an output Table
tableEnv.registerTableSink("outputTable", ...);

// create a Table from a Table API query
Table tapiResult = tableEnv.scan("table1").select(...);
// create a Table from a SQL query
Table sqlResult = tableEnv.sqlQuery("SELECT ... FROM table2 ...");

// emit a Table API result Table to a TableSink, same for SQL result
tapiResult.insertInto("outputTable");

// execute
env.execute();
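This example shows the basic structure of a Flink Table API and SQL program: create the environment, register input and output tables, run a query, emit the result, and execute.
Table API example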
// get a StreamTableEnvironment, works for BatchTableEnvironment equivalently
StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);

// register Orders table

// scan registered Orders table
Table orders = tableEnv.scan("Orders");
// compute revenue for all customers from France
Table revenue = orders
    .filter("cCountry === 'FRANCE'")
    .groupBy("cID, cName")
    .select("cID, cName, revenue.sum AS revSum");

// emit or convert Table
// execute query
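tableEnv.scan creates a Table from a registered table; the query is then built with the Table's query methods (filter, groupBy, select, and so on).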
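To make concrete what the query above computes, here is a minimal sketch in plain Java with no Flink dependency. It mirrors the filter, groupBy, and sum steps with java.util.stream. The Order class, the revenueForFrance helper, and the sample rows are all hypothetical and exist only for illustration; this is not how Flink evaluates the query.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.stream.Collectors;

public class RevenueSketch {
    // Hypothetical row type mirroring the columns used in the Orders query.
    static final class Order {
        final long cID;
        final String cName;
        final String cCountry;
        final double revenue;

        Order(long cID, String cName, String cCountry, double revenue) {
            this.cID = cID;
            this.cName = cName;
            this.cCountry = cCountry;
            this.revenue = revenue;
        }
    }

    // Plain-Java analogue of:
    //   filter(cCountry === 'FRANCE') -> groupBy(cID, cName) -> revenue.sum AS revSum
    // The group key is "cID,cName"; a TreeMap keeps the output order stable.
    static Map<String, Double> revenueForFrance(List<Order> orders) {
        return orders.stream()
            .filter(o -> "FRANCE".equals(o.cCountry))
            .collect(Collectors.groupingBy(
                o -> o.cID + "," + o.cName,
                TreeMap::new,
                Collectors.summingDouble(o -> o.revenue)));
    }

    public static void main(String[] args) {
        // Made-up sample data.
        List<Order> orders = Arrays.asList(
            new Order(1L, "Alice", "FRANCE", 10.0),
            new Order(1L, "Alice", "FRANCE", 5.0),
            new Order(2L, "Bob", "GERMANY", 7.0),
            new Order(3L, "Chloe", "FRANCE", 2.5));

        // Prints:
        //   1,Alice -> 15.0
        //   3,Chloe -> 2.5
        revenueForFrance(orders)
            .forEach((key, revSum) -> System.out.println(key + " -> " + revSum));
    }
}
```

The two orders for Alice collapse into one row with the summed revenue, and Bob's order is dropped by the country filter, which is the same shape of result the Table API query describes.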
SQL example
// get a StreamTableEnvironment, works for BatchTableEnvironment equivalently
StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);

// register Orders table

// compute revenue for all customers from France
Table revenue = tableEnv.sqlQuery(
    "SELECT cID, cName, SUM(revenue) AS revSum " +
    "FROM Orders " +
    "WHERE cCountry = 'FRANCE' " +
    "GROUP BY cID, cName"
);

// emit or convert Table
// execute query
Internally, sqlQuery is implemented on top of Apache Calcite.
TableSink example
// get a StreamTableEnvironment, works for BatchTableEnvironment equivalently
StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);

// register "Orders" table
// register "RevenueFrance" output table

// compute revenue for all customers from France and emit to "RevenueFrance"
tableEnv.sqlUpdate(
    "INSERT INTO RevenueFrance " +
    "SELECT cID, cName, SUM(revenue) AS revSum " +
    "FROM Orders " +
    "WHERE cCountry = 'FRANCE' " +
    "GROUP BY cID, cName"
);

// execute query
Once an output table has been registered with a TableSink, results can be written to it either with TableEnvironment.sqlUpdate (an INSERT INTO statement) or with Table.insertInto.
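The register-then-insert-by-name pattern can be illustrated with a toy model in plain Java. This is only a sketch of the idea that an output table must be registered under a name before anything can be inserted into it; SinkRegistrySketch and its methods are hypothetical and do not reflect how Flink implements TableSink or its catalog.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class SinkRegistrySketch {
    // Hypothetical stand-in for registered output tables: name -> rows written so far.
    private final Map<String, List<String>> sinks = new HashMap<>();

    // Register an output table under a name (loosely analogous to registerTableSink).
    void registerSink(String name) {
        if (sinks.containsKey(name)) {
            throw new IllegalArgumentException("sink already registered: " + name);
        }
        sinks.put(name, new ArrayList<>());
    }

    // Write rows to a previously registered output table, looked up by name
    // (loosely analogous to insertInto / INSERT INTO).
    void insertInto(String name, List<String> rows) {
        List<String> sink = sinks.get(name);
        if (sink == null) {
            throw new IllegalStateException("no sink registered under: " + name);
        }
        sink.addAll(rows);
    }

    // Read back what has been written to a sink; null if the name is unknown.
    List<String> contents(String name) {
        return sinks.get(name);
    }

    public static void main(String[] args) {
        SinkRegistrySketch env = new SinkRegistrySketch();
        env.registerSink("RevenueFrance");
        // Made-up result rows in "cID,cName,revSum" form.
        env.insertInto("RevenueFrance", Arrays.asList("1,Alice,15.0", "3,Chloe,2.5"));
        for (String row : env.contents("RevenueFrance")) {
            System.out.println(row);
        }
    }
}
```

In this model, inserting into a name that was never registered fails immediately, which is why the registration step comes first in the examples above.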
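Summary
Basic usage of Flink's Table API and SQL programs:

First create a TableEnvironment (a BatchTableEnvironment or a StreamTableEnvironment), then create a Table or TableSource and register it in the catalog (the default catalog is internal; an external catalog can also be registered). After that, query the table and apply any conversions.
A Table can be created by converting a DataSet or a DataStream. Queries can be written with the Table API (starting from scan), with SQL (sqlQuery), or with a mix of both.
A query result Table can also be converted back into a DataSet or DataStream for further processing. If the result should be written to a table, register a TableSink and emit the result to it.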

doc
Table API & SQL Concepts & Common API
