Preface

This article takes a look at the structure of Table API and SQL programs in Flink.

Example

    // for batch programs use ExecutionEnvironment instead of StreamExecutionEnvironment
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

    // create a TableEnvironment
    // for batch programs use BatchTableEnvironment instead of StreamTableEnvironment
    StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);

    // register a Table
    tableEnv.registerTable("table1", ...)            // or
    tableEnv.registerTableSource("table2", ...);     // or
    tableEnv.registerExternalCatalog("extCat", ...);

    // register an output Table
    tableEnv.registerTableSink("outputTable", ...);

    // create a Table from a Table API query
    Table tapiResult = tableEnv.scan("table1").select(...);
    // create a Table from a SQL query
    Table sqlResult = tableEnv.sqlQuery("SELECT ... FROM table2 ... ");

    // emit a Table API result Table to a TableSink, same for SQL result
    tapiResult.insertInto("outputTable");

    // execute
    env.execute();

This example shows the basic structure of a Flink Table API and SQL program.

Table API example

    // get a StreamTableEnvironment, works for BatchTableEnvironment equivalently
    StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);

    // register Orders table

    // scan registered Orders table
    Table orders = tableEnv.scan("Orders");
    // compute revenue for all customers from France
    Table revenue = orders
      .filter("cCountry === 'FRANCE'")
      .groupBy("cID, cName")
      .select("cID, cName, revenue.sum AS revSum");

    // emit or convert Table
    // execute query

A Table is created with the tableEnv.scan method, after which the Table's query methods (filter, groupBy, select and so on) can be chained on it.

SQL example

    // get a StreamTableEnvironment, works for BatchTableEnvironment equivalently
    StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);

    // register Orders table

    // compute revenue for all customers from France
    Table revenue = tableEnv.sqlQuery(
        "SELECT cID, cName, SUM(revenue) AS revSum " +
        "FROM Orders " +
        "WHERE cCountry = 'FRANCE' " +
        "GROUP BY cID, cName"
      );

    // emit or convert Table
    // execute query

Internally, sqlQuery is implemented on top of Apache Calcite.

TableSink example

    // get a StreamTableEnvironment, works for BatchTableEnvironment equivalently
    StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);

    // register "Orders" table
    // register "RevenueFrance" output table

    // compute revenue for all customers from France and emit to "RevenueFrance"
    tableEnv.sqlUpdate(
        "INSERT INTO RevenueFrance " +
        "SELECT cID, cName, SUM(revenue) AS revSum " +
        "FROM Orders " +
        "WHERE cCountry = 'FRANCE' " +
        "GROUP BY cID, cName"
      );

    // execute query

Once an output table has been registered with a TableSink, results can be written to it either with TableEnvironment's sqlUpdate or with Table's insertInto.

Summary

The basic flow of a Flink Table API and SQL program is: first create a TableEnvironment (either a BatchTableEnvironment or a StreamTableEnvironment); then create a Table or TableSource and register it in a catalog (the default catalog is internal, but an external catalog can be registered instead); then query the table; and finally apply any conversions to the result.

A Table can be created by converting a DataSet or a DataStream. A Table can be queried with a Table API query (starting from the scan method), with a SQL query (the sqlQuery method), or with a mix of the two.

The Table produced by a query can be converted back into a DataSet or DataStream for further processing. If the output should also go to a table, register a TableSink and emit the result to it.

doc

Table API & SQL Concepts & Common API
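
As a supplement to the France revenue query used in the examples above, the following is a minimal sketch in plain Java (no Flink dependency) of what that query computes: keep the rows whose cCountry is FRANCE, group them by cID and cName, and sum revenue per group. The `RevenueSketch` class, the `Order` row type, the sample rows and the `revenueByCustomer` helper are all hypothetical names introduced for illustration. Java streams are swapped in here only to show the semantics; this is not how Flink plans or executes the query.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.stream.Collectors;

public class RevenueSketch {

    // Hypothetical row type mirroring the Orders columns used in the examples.
    public static final class Order {
        final long cID;
        final String cName;
        final String cCountry;
        final double revenue;

        public Order(long cID, String cName, String cCountry, double revenue) {
            this.cID = cID;
            this.cName = cName;
            this.cCountry = cCountry;
            this.revenue = revenue;
        }
    }

    // In-memory equivalent of:
    //   SELECT cID, cName, SUM(revenue) AS revSum
    //   FROM Orders WHERE cCountry = 'FRANCE' GROUP BY cID, cName
    // The map key is "cID,cName" and the value is revSum.
    public static Map<String, Double> revenueByCustomer(List<Order> orders) {
        return orders.stream()
                .filter(o -> "FRANCE".equals(o.cCountry))
                .collect(Collectors.groupingBy(
                        o -> o.cID + "," + o.cName,
                        TreeMap::new,
                        Collectors.summingDouble(o -> o.revenue)));
    }

    public static void main(String[] args) {
        // Made-up sample rows, for illustration only.
        List<Order> orders = Arrays.asList(
                new Order(1L, "Alice", "FRANCE", 10.0),
                new Order(1L, "Alice", "FRANCE", 20.0),
                new Order(2L, "Bob", "GERMANY", 5.0),
                new Order(3L, "Chloe", "FRANCE", 7.5));

        // Print one line per (cID, cName) group with its summed revenue.
        revenueByCustomer(orders)
                .forEach((key, revSum) -> System.out.println(key + " -> " + revSum));
    }
}
```

With the sample rows, the two Alice orders collapse into one group and the German order is filtered out, which mirrors what the filter/groupBy/select chain and the SQL statement express declaratively.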