Preface
This article takes a look at Flink's Table API and SQL programs.
Example
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.java.StreamTableEnvironment;

// for batch programs use ExecutionEnvironment instead of StreamExecutionEnvironment
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// create a TableEnvironment
// for batch programs use BatchTableEnvironment instead of StreamTableEnvironment
StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);
// register a Table
tableEnv.registerTable("table1", …); // or
tableEnv.registerTableSource("table2", …); // or
tableEnv.registerExternalCatalog("extCat", …);
// register an output Table
tableEnv.registerTableSink("outputTable", …);
// create a Table from a Table API query
Table tapiResult = tableEnv.scan("table1").select(…);
// create a Table from a SQL query
Table sqlResult = tableEnv.sqlQuery("SELECT … FROM table2 … ");
// emit a Table API result Table to a TableSink, same for SQL result
tapiResult.insertInto("outputTable");
// execute
env.execute();
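This example shows the basic structure of a Flink Table API / SQL program: create a TableEnvironment, register input and output tables, query them with the Table API or SQL, emit the result, and execute.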
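The skeleton above refers to every table by name: registerTable, registerTableSource and registerTableSink add entries to the TableEnvironment's catalog, and scan, sqlQuery and insertInto resolve those names again. As a rough mental model only, here is a plain-Java toy catalog. The class MiniCatalog and all of its methods are hypothetical and not part of Flink, and rows are modeled as simple lists of values.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical toy catalog that only illustrates name-based registration and lookup.
// It is NOT Flink's TableEnvironment; rows are plain lists of values.
public class MiniCatalog {
    private final Map<String, List<List<Object>>> inputs = new HashMap<>();  // cf. registerTable
    private final Map<String, List<List<Object>>> outputs = new HashMap<>(); // cf. registerTableSink

    // A name may be registered only once, whether as input or as output.
    private void checkFree(String name) {
        if (inputs.containsKey(name) || outputs.containsKey(name)) {
            throw new IllegalArgumentException("Table '" + name + "' already exists");
        }
    }

    public void registerTable(String name, List<List<Object>> rows) {
        checkFree(name);
        inputs.put(name, rows);
    }

    public void registerSink(String name) {
        checkFree(name);
        outputs.put(name, new ArrayList<>());
    }

    // Resolves a registered input table by name, like tableEnv.scan("table1").
    public List<List<Object>> scan(String name) {
        List<List<Object>> rows = inputs.get(name);
        if (rows == null) {
            throw new IllegalArgumentException("Table '" + name + "' was not found");
        }
        return rows;
    }

    // Appends rows to a registered output table, like result.insertInto("outputTable").
    public void insertInto(String name, List<List<Object>> rows) {
        List<List<Object>> target = outputs.get(name);
        if (target == null) {
            throw new IllegalArgumentException("No output table registered as '" + name + "'");
        }
        target.addAll(rows);
    }

    public List<List<Object>> contentsOf(String outputName) {
        return outputs.get(outputName);
    }

    public static void main(String[] args) {
        MiniCatalog catalog = new MiniCatalog();
        List<List<Object>> rows = new ArrayList<>();
        rows.add(Arrays.<Object>asList(1, "Alice"));
        catalog.registerTable("table1", rows);
        catalog.registerSink("outputTable");
        catalog.insertInto("outputTable", catalog.scan("table1"));
        System.out.println(catalog.contentsOf("outputTable")); // [[1, Alice]]
    }
}
```

The point of the model is ordering: a name must be registered before a query or an insert can refer to it, which is why the real program registers its tables and sinks first.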
Table API example
// get a StreamTableEnvironment, works for BatchTableEnvironment equivalently
StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);
// register Orders table
// scan registered Orders table
Table orders = tableEnv.scan("Orders");
// compute revenue for all customers from France
Table revenue = orders
.filter("cCountry === 'FRANCE'")
.groupBy("cID, cName")
.select("cID, cName, revenue.sum AS revSum");
// emit or convert Table
// execute query
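The Table is obtained with tableEnv.scan, which looks up the registered Orders table by name; after that, Table's query methods (filter, groupBy, select, and so on) can be chained on it.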
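To make the semantics of the filter / groupBy / select chain concrete, here is the same computation written with plain Java streams over an in-memory list. This is only an illustration, not how Flink executes the query; the Order class and its four fields are assumptions based on the columns the query references (cID, cName, cCountry, revenue).

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

// Plain-Java illustration of what the Table API query computes; it does not use Flink.
public class RevenueByCustomer {

    // Hypothetical row type mirroring the columns referenced by the query.
    static final class Order {
        final long cID;
        final String cName;
        final String cCountry;
        final long revenue;

        Order(long cID, String cName, String cCountry, long revenue) {
            this.cID = cID;
            this.cName = cName;
            this.cCountry = cCountry;
            this.revenue = revenue;
        }
    }

    // filter(cCountry === 'FRANCE') -> groupBy(cID, cName) -> select(revenue.sum AS revSum)
    static Map<List<Object>, Long> revenueFrance(List<Order> orders) {
        return orders.stream()
                .filter(o -> "FRANCE".equals(o.cCountry))
                .collect(Collectors.groupingBy(
                        o -> Arrays.<Object>asList(o.cID, o.cName), // composite key (cID, cName)
                        LinkedHashMap::new,                          // keeps first-seen key order
                        Collectors.summingLong(o -> o.revenue)));   // revSum per key
    }

    public static void main(String[] args) {
        List<Order> orders = Arrays.asList(
                new Order(1, "Alice", "FRANCE", 100),
                new Order(2, "Bob", "GERMANY", 70),
                new Order(1, "Alice", "FRANCE", 50),
                new Order(3, "Chloe", "FRANCE", 30));
        System.out.println(revenueFrance(orders)); // {[1, Alice]=150, [3, Chloe]=30}
    }
}
```

On a StreamTableEnvironment the same query is evaluated continuously, so each customer's revSum keeps changing as new orders arrive instead of being computed once over a finished list.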
SQL example
// get a StreamTableEnvironment, works for BatchTableEnvironment equivalently
StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);
// register Orders table
// compute revenue for all customers from France
Table revenue = tableEnv.sqlQuery(
"SELECT cID, cName, SUM(revenue) AS revSum " +
"FROM Orders " +
"WHERE cCountry = 'FRANCE' " +
"GROUP BY cID, cName"
);
// emit or convert Table
// execute query
Internally, sqlQuery is implemented on top of Apache Calcite, which parses and validates the SQL string.
TableSink example
// get a StreamTableEnvironment, works for BatchTableEnvironment equivalently
StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);
// register "Orders" table
// register "RevenueFrance" output table
// compute revenue for all customers from France and emit to "RevenueFrance"
tableEnv.sqlUpdate(
"INSERT INTO RevenueFrance " +
"SELECT cID, cName, SUM(revenue) AS revSum " +
"FROM Orders " +
"WHERE cCountry = 'FRANCE' " +
"GROUP BY cID, cName"
);
// execute query
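After the output table has been registered with a TableSink, query results can be written to it either with TableEnvironment's sqlUpdate (an INSERT INTO statement, as above) or with Table's insertInto.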
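Writing into RevenueFrance only works if the query's result columns line up with the fields the sink was registered with; Flink 1.x rejects the insert when the field types do not match. The sketch below imitates that check in plain Java. SinkSchemaCheck and validateInsert are hypothetical names, plain Class objects stand in for Flink's TypeInformation, and the RevenueFrance schema (Long, String, Long) is an assumption, since the article does not show the registration.

```java
import java.util.Arrays;

// Hypothetical sketch of the schema check done before writing into a registered output table.
// NOT Flink code: Flink compares TypeInformation, here plain Class objects stand in for it.
public class SinkSchemaCheck {

    // Throws if the query result's field types do not line up, position by position,
    // with the field types the sink was registered with.
    static void validateInsert(String sinkName, Class<?>[] queryTypes, Class<?>[] sinkTypes) {
        if (!Arrays.equals(queryTypes, sinkTypes)) {
            throw new IllegalArgumentException("Query result types "
                    + Arrays.toString(simpleNames(queryTypes)) + " do not match sink "
                    + sinkName + " " + Arrays.toString(simpleNames(sinkTypes)));
        }
    }

    private static String[] simpleNames(Class<?>[] types) {
        String[] names = new String[types.length];
        for (int i = 0; i < types.length; i++) {
            names[i] = types[i].getSimpleName();
        }
        return names;
    }

    public static void main(String[] args) {
        // Assumed registration of "RevenueFrance" as (cID: Long, cName: String, revSum: Long)
        Class<?>[] sink = {Long.class, String.class, Long.class};
        // SELECT cID, cName, SUM(revenue) AS revSum ... is assumed to yield the same three types
        validateInsert("RevenueFrance", new Class<?>[] {Long.class, String.class, Long.class}, sink);
        try {
            // a query that forgot the SUM column is rejected
            validateInsert("RevenueFrance", new Class<?>[] {Long.class, String.class}, sink);
        } catch (IllegalArgumentException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

The comparison is positional: column names in the query do not have to match the sink's field names, only the number and order of types.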
Summary
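The basic usage of Flink's Table API and SQL programs is as follows.
First create a TableEnvironment (BatchTableEnvironment or StreamTableEnvironment); then create Tables or TableSources and register them in a catalog (the internal catalog is used by default, but an external catalog can also be registered); then query the tables; finally, apply conversion or output operations to the results.
A Table can be created by converting a DataSet or DataStream. A Table can be queried with the Table API (starting from the scan method), with SQL (the sqlQuery method), or with a mix of both.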
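A query result Table can also be converted into a DataSet or DataStream for further processing; if the result should instead be written to a table, register a TableSink and emit the result into it.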
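When a streaming result is converted back into a DataStream, it matters whether the result is append-only. In the grouped revenue query, each new order for a customer changes that customer's revSum, so in Flink 1.x such a Table has to be converted with StreamTableEnvironment.toRetractStream rather than toAppendStream; each element is a Tuple2<Boolean, T> whose flag is true for an added row and false for a retracted one. The plain-Java simulation below shows the sequence of changes this produces. RetractSimulation, Change and process are hypothetical names, and the input is reduced to (customer, revenue) pairs for brevity.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Plain-Java simulation of the retract messages a continuously updated GROUP BY ... SUM produces.
// NOT Flink code: Flink would emit Tuple2<Boolean, Row>, here a small Change class stands in.
public class RetractSimulation {

    // add == true: emit this row; add == false: retract a previously emitted row
    static final class Change {
        final boolean add;
        final String customer;
        final long revSum;

        Change(boolean add, String customer, long revSum) {
            this.add = add;
            this.customer = customer;
            this.revSum = revSum;
        }

        @Override
        public String toString() {
            return "(" + add + ", " + customer + ", " + revSum + ")";
        }
    }

    // Processes one (customer, revenue) record at a time, keeping a running sum per customer.
    static List<Change> process(String[] customers, long[] revenues) {
        Map<String, Long> sums = new HashMap<>();
        List<Change> out = new ArrayList<>();
        for (int i = 0; i < customers.length; i++) {
            Long old = sums.get(customers[i]);
            long updated = (old == null ? 0L : old) + revenues[i];
            if (old != null) {
                out.add(new Change(false, customers[i], old)); // retract the stale result
            }
            out.add(new Change(true, customers[i], updated));  // emit the new result
            sums.put(customers[i], updated);
        }
        return out;
    }

    public static void main(String[] args) {
        List<Change> changes = process(
                new String[] {"Alice", "Chloe", "Alice"},
                new long[] {100, 30, 50});
        changes.forEach(System.out::println);
        // (true, Alice, 100)
        // (true, Chloe, 30)
        // (false, Alice, 100)
        // (true, Alice, 150)
    }
}
```

A downstream consumer that applies these changes in order ends up with the same final table (Alice = 150, Chloe = 30) as the batch computation over the full input.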
doc
Table API & SQL Concepts & Common API