1. Flink Table API的整体实现流程
次要操作流程如下:
// 创立表的执行环境
val tableEnv = ...
// 创立一张表,用于读取数据
tableEnv.connect(...).createTemporaryTable("inputTable")
// 注册一张表,用于把计算结果输入
tableEnv.connect(...).createTemporaryTable("outputTable")
// 通过 Table API 查问算子,失去一张后果表
val result = tableEnv.from("inputTable").select(...)
// 通过 SQL查问语句,失去一张后果表
val sqlResult = tableEnv.sqlQuery("SELECT ... FROM inputTable ...")
// 将后果表写入输出表中
result.insertInto("outputTable")
2. 流解决执行环境的创立配置
-
创立表环境
基于流解决执行环境,调create办法间接创立:
val tableEnv = StreamTableEnvironment.create(env)
表环境(TableEnvironment)是flink中集成Table API & SQL的外围概念。它次要负责:
- 注册catalog
- 在外部 catalog 中注册表
- 执行 SQL 查问
- 注册用户自定义函数
- 将 DataStream 或 DataSet 转换为表
- 保留对 ExecutionEnvironment 或 StreamExecutionEnvironment 的援用
-
环境配置
创立TableEnv的时候,能够通过一些参数来配置 TableEnvironment的个性。
-
老版本的流式查问配置
EnvironmentSettings settings = EnvironmentSettings.newInstance() .useOldPlanner() // 应用老版本planner .inStreamingMode() // 流解决模式 .build() StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, settings)
-
-
老版本的批处理配置
EnvironmentSettings settings = EnvironmentSettings.newInstance() .useOldPlanner() // 应用老版本planner .inBatchMode() // 应用老版本的流解决模式 .build() StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, settings)
-
blink版本的流解决配置
EnvironmentSettings settings = EnvironmentSettings.newInstance() .useBlinkPlanner() .inStreamingMode().build() StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, settings)
-
blink版本的批处理配置
EnvironmentSettings bbSettings = EnvironmentSettings.newInstance() .useBlinkPlanner() .inBatchMode().build(); TableEnvironment bbTableEnv = TableEnvironment.create(bbSettings);
3. Catalog的操作应用
1)Catalog的类型:
- GenericInMemoryCatalog: 内置Catalog。名为default_catalog,默认数据库名为default_database。默认,如用TableEnvironment#registerTable注册的表,均会注册到这个Catalog中。
- User-Defined Catalog: 用户自定义Catalog。如flink-connector-hive中的HiveCatalog。
留神:
GenericInMemoryCatalog 中的元数据对象名辨别大小写。HiveCatalog以小写存储所有元数据对象名。
默认应用的Catalog: default_catalog;Database: default_database。
2)Catalog的用法:
-
获取以后应用的Catalog
tableEnv.getCurrentCatalog()
-
获取以后应用的Database
tableEnv.getCurrentDatabase()
-
注册自定义Catalog
tableEnv.registerCatalog("custom-catalog",new CustomCatalog("customCatalog"))
-
列出所有Catalog
tableEnv.listCatalogs()
-
列出所有Database
tableEnv.listDatabases()
-
切换Catalog
tableEnv.useCatalog("catalogName")
-
切换Database
tableEnv.useDatabase("databaseName")
4. 文件系统的读取操作实现(csv)
-
POM依赖
<!-- 导入csv格局形容器的依赖包--> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-csv</artifactId> <version>${flink.version}</version> </dependency>
-
代码实现
public static void main(String[] args) throws Exception { //1. 创立流式程序运行环境 StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); //2. 没有指定EnviromentSettings,默认应用的是老版本的Planner的流式查问 StreamTableEnvironment tabEnv = StreamTableEnvironment.create(env); //3. 指定读取csv文件的门路 String filePath = "./data/user.csv"; //4. 读取csv的文件,配置属性类型 tabEnv.connect(new FileSystem().path(filePath))//读取指定文件目录的文件数据,该对象肯定是实现了ConnectorDescriptor的实现类 .withFormat(new Csv()) //定义从内部文件读取数据的格式化办法,须要传入继承自FormatDescriptor抽象类的实现类 .withSchema(new Schema() .field("id", DataTypes.INT()) .field("name", DataTypes.STRING()) )//定义表的构造 .createTemporaryTable("inputTable"); System.out.println(tabEnv.getCurrentCatalog()); System.out.println(tabEnv.getCurrentDatabase()); //5. 将table表的数据转换成table对象 Table inputTable = tabEnv.from("inputTable"); //6. 打印测试 tabEnv.toAppendStream(inputTable, TypeInformation.of(new TypeHint<Tuple2<Integer, String>>() {})).print().setParallelism(1); env.execute(); }
5. 音讯队列的读取操作实现(kafka)
-
POM依赖
<!-- 导入kafka连接器jar包--> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-connector-kafka_${scala.version}</artifactId> <version>${flink.version}</version> </dependency> <!-- flink json序列化jar包--> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-json</artifactId> <version>${flink.version}</version> </dependency>
-
代码实现
public static void main(String[] args) throws Exception { //创立流式程序运行环境 StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); //没有指定EnviromentSettings,默认应用的是老版本的Planner的流式查问 StreamTableEnvironment tabEnv = StreamTableEnvironment.create(env); //接入kafka的connect生产数据 tabEnv.connect(new Kafka() //从kafka中读取数据 .version("universal") //指定以后环境采纳的kafka的版本号:"0.8", "0.9", "0.10", "0.11", and "universal" .topic("rate") //指定生产的topic名字 .property("zookeeper.connect", "10.10.20.15:2181") //指定zookeeper的集群地址 .property("bootstrap.servers", "10.10.20.15:9092") //指定生产kafka的集群地址 ).withFormat(new Csv()) .withSchema(new Schema() .field("timestamp", DataTypes.BIGINT()) .field("type", DataTypes.STRING()) .field("rate", DataTypes.INT()) ).createTemporaryTable("RateInputTable"); Table rateInputTable = tabEnv.from("RateInputTable"); tabEnv.toAppendStream(rateInputTable, Rate.class).print(); env.execute(); }
-
生产测试
开启kafka生产端:
bin/kafka-console-producer.sh --broker-list 10.10.20.15:9092 --topic rate
发送数据:
1618388392479, 'REF', 9
1618388392480, 'USD', 4
1618388392580, 'HKD', 9
6. 如何进行条件查问操作
6.1 Table API的实现形式
Table API是集成在Scala和Java语言内的查问API。与SQL不同,Table API的查问不会用字符串示意,而是在宿主语言中一步一步调用实现的。
Table API基于代表一张“表”的Table类,并提供一整套操作解决的办法API。这些办法会返回一个新的Table对象,这个对象就示意对输出表利用转换操作的后果。有些关系型转换操作,能够由多个办法调用组成,形成链式调用构造。例如table.select(…).filter(…),其中select(…)示意抉择表中指定的字段,filter(…)示意筛选条件。
代码实现:
...
//基于TableAPI进行数据的查问转换操作,所以要求注册的长期表须要读取进去,赋值给一个Table对象实例才能够操作
Table resultTable = inputTable.filter("id == 1").select("id,name");
//应用TableAPI对Table对象进行聚合计算
Table aggResultTable = inputTable.groupBy("id").select("id,id.count as count");
//打印测试
tabEnv.toAppendStream(resultTable, TypeInformation.of(new TypeHint<Tuple2<Integer, String>>() {})).print("resultTable>>>").setParallelism(1) ;
tabEnv.toRetractStream(aggResultTable, TypeInformation.of(new TypeHint<Tuple2<Integer, Long>>() {})).print("aggResultTable>>>").setParallelism(1) ;
...
输入后果:
resultTable>>>> (1,zhangsan)
aggResultTable>>>> (true,(2,1))
aggResultTable>>>> (false,(2,1))
aggResultTable>>>> (true,(2,2))
aggResultTable>>>> (true,(1,1))
true代表新的数据, false代表已存在历史数据,而后再次打印“true,(2,2)“ 进行累积统计。
6.2 SQL的实现形式
Flink的SQL集成,基于的是ApacheCalcite,它实现了SQL规范。在Flink中,用惯例字符串来定义SQL查问语句。SQL 查问的后果,是一个新的 Table。
// 应用SQL对表的数据进行操作
Table resultTableBySQL = tabEnv.sqlQuery("select id,count(id) as cnt from inputTable group by id");
tabEnv.toRetractStream(resultTableBySQL, TypeInformation.of(new TypeHint<Tuple2<Integer, Long>>() {})).print("sql result>>>").setParallelism(1) ;
7. 实现数据的输入操作
表的输入,是通过将数据写入 TableSink 来实现的。TableSink 是一个通用接口,能够反对不同的文件格式、存储数据库和音讯队列。
具体实现,输出表最间接的办法,就是通过 Table.executeInsert() 办法将一个 Table 写入注册过的 TableSink 中。
7.1 输入到文件
代码实现:
//1. 创立流式程序运行环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
//2. 没有指定EnviromentSettings,默认应用的是老版本的Planner的流式查问
StreamTableEnvironment tabEnv = StreamTableEnvironment.create(env);
env.setParallelism(1);
//3. 指定读取csv文件的门路
String filePath = "./data/user.csv";
//4. 读取csv的文件,配置属性类型
tabEnv.connect(new FileSystem().path(filePath))//读取指定文件目录的文件数据,该对象肯定是实现了ConnectorDescriptor的实现类
.withFormat(new Csv()) //定义从内部文件读取数据的格式化办法,须要传入继承自FormatDescriptor抽象类的实现类
.withSchema(new Schema()
.field("id", DataTypes.INT())
.field("name", DataTypes.STRING())
)//定义表的构造
.createTemporaryTable("inputTable");
//5. 将table表的数据转换成table对象
Table inputTable = tabEnv.from("inputTable");
Table resultTable = inputTable.select("id,name");
//定义后果表,将文件数据写入到后果文件中
tabEnv.connect(new FileSystem().path("./data/user.log"))
.withFormat(new Csv())
.withSchema(new Schema() //这个办法肯定要指定
.field("id", DataTypes.INT())
.field("name", DataTypes.STRING())
)
.createTemporaryTable("outputTable");
resultTable.executeInsert("outputTable");
//6. 打印测试
tabEnv.toAppendStream(inputTable, TypeInformation.of(new TypeHint<Tuple2<Integer, String>>() {})).print().setParallelism(1);
env.execute();
7.2 输入到KAFKA
解决的数据能够反对输入到Kafka,联合后面的Kafka作为输出数据, 创立数据管道,再输入至Kafka音讯队列:
String kafkaNode = "10.10.20.15:2181";
String kafkaNodeServer = "10.10.20.15:9092";
//创立流式程序运行环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
//没有指定EnviromentSettings,默认应用的是老版本的Planner的流式查问
StreamTableEnvironment tabEnv = StreamTableEnvironment.create(env);
//接入kafka的connect生产数据
tabEnv.connect(new Kafka() //从kafka中读取数据
.version("universal") //指定以后环境采纳的kafka的版本号:"0.8", "0.9", "0.10", "0.11", and "universal"
.startFromEarliest()
.topic("rate") //指定生产的topic名字
.property("zookeeper.connect", kafkaNode) //指定zookeeper的集群地址
.property("bootstrap.servers", kafkaNodeServer) //指定生产kafka的集群地址
).withFormat(new Csv())
.withSchema(new Schema()
.field("timestamp", DataTypes.BIGINT())
.field("type", DataTypes.STRING())
.field("rate", DataTypes.INT())
).createTemporaryTable("RateInputTable");
Table rateInputTable = tabEnv.from("RateInputTable");
//接入kafka的connect生产数据
tabEnv.connect(new Kafka() //从kafka中读取数据
.version("universal") //指定以后环境采纳的kafka的版本号:"0.8", "0.9", "0.10", "0.11", and "universal"
.topic("output_rate") //指定生产的topic名字
.property("zookeeper.connect", kafkaNode) //指定zookeeper的集群地址
.property("bootstrap.servers", kafkaNodeServer) //指定生产kafka的集群地址
).withFormat(new Csv())
.withSchema(new Schema()
.field("timestamp", DataTypes.BIGINT())
.field("type", DataTypes.STRING())
.field("rate", DataTypes.INT())
).createTemporaryTable("RateOutputTable");
// 将table数据写入kafka音讯队列
rateInputTable.executeInsert("RateOutputTable");
// 打印数据信息
tabEnv.toAppendStream(rateInputTable, StreamOutputKafkaApplication.Rate.class).print();
env.execute();
本文由mirson创作分享,如需进一步交换,请加QQ群:19310171或拜访www.softart.cn
发表回复