乐趣区

关于java:Fllink实时计算运用四Flink-Table-API-SQL-深入详解

1. Flink Table API 的整体实现流程

次要操作流程如下:

// 创立表的执行环境
val tableEnv = ...     

// 创立一张表,用于读取数据
tableEnv.connect(...).createTemporaryTable("inputTable")

// 注册一张表,用于把计算结果输入
tableEnv.connect(...).createTemporaryTable("outputTable")

// 通过 Table API 查问算子,失去一张后果表
val result = tableEnv.from("inputTable").select(...)

// 通过 SQL 查问语句,失去一张后果表
val sqlResult  = tableEnv.sqlQuery("SELECT ... FROM inputTable ...")

// 将后果表写入输出表中
result.insertInto("outputTable")

2. 流解决执行环境的创立配置

  1. 创立表环境

    基于流解决执行环境,调 create 办法间接创立:

    val tableEnv = StreamTableEnvironment.create(env)

表环境(TableEnvironment)是 flink 中集成 Table API & SQL 的外围概念。它次要负责:

  • 注册 catalog
  • 在外部 catalog 中注册表
  • 执行 SQL 查问
  • 注册用户自定义函数
  • 将 DataStream 或 DataSet 转换为表
  • 保留对 ExecutionEnvironment 或 StreamExecutionEnvironment 的援用
  1. 环境配置

    创立 TableEnv 的时候,能够通过一些参数来配置 TableEnvironment 的个性。

    • 老版本的流式查问配置

      EnvironmentSettings settings = EnvironmentSettings.newInstance()
        .useOldPlanner()      // 应用老版本 planner
        .inStreamingMode()    // 流解决模式
        .build()
      StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, settings)
      
  • 老版本的批处理配置

    EnvironmentSettings settings = EnvironmentSettings.newInstance()
                    .useOldPlanner()      // 应用老版本 planner
                    .inBatchMode()    // 应用老版本的流解决模式
                    .build()
    StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, settings)    
  • blink 版本的流解决配置

    EnvironmentSettings settings = EnvironmentSettings.newInstance()
      .useBlinkPlanner()
    .inStreamingMode().build()
    StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, settings)
  • blink 版本的批处理配置

    EnvironmentSettings bbSettings = EnvironmentSettings.newInstance()
            .useBlinkPlanner()
            .inBatchMode().build();
    TableEnvironment bbTableEnv = TableEnvironment.create(bbSettings);

3. Catalog 的操作应用

1)Catalog 的类型:

  1. GenericInMemoryCatalog: 内置 Catalog。名为 default_catalog,默认数据库名为 default_database。默认,如用 TableEnvironment#registerTable 注册的表,均会注册到这个 Catalog 中。
  2. User-Defined Catalog: 用户自定义 Catalog。如 flink-connector-hive 中的 HiveCatalog。

留神:

GenericInMemoryCatalog 中的元数据对象名辨别大小写。HiveCatalog 以小写存储所有元数据对象名。

默认应用的 Catalog: default_catalog;Database: default_database。

2)Catalog 的用法:

  1. 获取以后应用的 Catalog

    tableEnv.getCurrentCatalog()
  2. 获取以后应用的 Database

    tableEnv.getCurrentDatabase()
  3. 注册自定义 Catalog

    tableEnv.registerCatalog("custom-catalog",new CustomCatalog("customCatalog"))
  4. 列出所有 Catalog

    tableEnv.listCatalogs()
  5. 列出所有 Database

    tableEnv.listDatabases()
  6. 切换 Catalog

    tableEnv.useCatalog("catalogName")
  7. 切换 Database

    tableEnv.useDatabase("databaseName")

4. 文件系统的读取操作实现(csv)

  1. POM 依赖

    <!-- 导入 csv 格局形容器的依赖包 -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-csv</artifactId>
        <version>${flink.version}</version>
    </dependency>
  2. 代码实现

    public static void main(String[] args) throws Exception {
        //1. 创立流式程序运行环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        //2. 没有指定 EnviromentSettings,默认应用的是老版本的 Planner 的流式查问
        StreamTableEnvironment tabEnv = StreamTableEnvironment.create(env);
    
        //3. 指定读取 csv 文件的门路
        String filePath = "./data/user.csv";
    
        //4. 读取 csv 的文件,配置属性类型
        tabEnv.connect(new FileSystem().path(filePath))// 读取指定文件目录的文件数据,该对象肯定是实现了 ConnectorDescriptor 的实现类
        .withFormat(new Csv()) // 定义从内部文件读取数据的格式化办法,须要传入继承自 FormatDescriptor 抽象类的实现类
        .withSchema(new Schema()
                .field("id", DataTypes.INT())
                .field("name", DataTypes.STRING())
        )// 定义表的构造
        .createTemporaryTable("inputTable");
    
        System.out.println(tabEnv.getCurrentCatalog());
        System.out.println(tabEnv.getCurrentDatabase());
        //5. 将 table 表的数据转换成 table 对象
        Table inputTable = tabEnv.from("inputTable");
    
        //6. 打印测试
        tabEnv.toAppendStream(inputTable, TypeInformation.of(new TypeHint<Tuple2<Integer, String>>() {})).print().setParallelism(1);
    
        env.execute();}

5. 音讯队列的读取操作实现(kafka)

  1. POM 依赖

    <!-- 导入 kafka 连接器 jar 包 -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-kafka_${scala.version}</artifactId>
        <version>${flink.version}</version>
    </dependency>
    
    <!-- flink json 序列化 jar 包 -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-json</artifactId>
        <version>${flink.version}</version>
    </dependency>
  2. 代码实现

    public static void main(String[] args) throws Exception {
        // 创立流式程序运行环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // 没有指定 EnviromentSettings,默认应用的是老版本的 Planner 的流式查问
        StreamTableEnvironment tabEnv = StreamTableEnvironment.create(env);
    
        // 接入 kafka 的 connect 生产数据
        tabEnv.connect(new Kafka() // 从 kafka 中读取数据
                .version("universal") // 指定以后环境采纳的 kafka 的版本号:"0.8", "0.9", "0.10", "0.11", and "universal"
                .topic("rate")  // 指定生产的 topic 名字
                .property("zookeeper.connect", "10.10.20.15:2181") // 指定 zookeeper 的集群地址
                .property("bootstrap.servers", "10.10.20.15:9092") // 指定生产 kafka 的集群地址
        ).withFormat(new Csv())
                .withSchema(new Schema()
                        .field("timestamp", DataTypes.BIGINT())
                        .field("type", DataTypes.STRING())
                        .field("rate", DataTypes.INT())
                ).createTemporaryTable("RateInputTable");
    
        Table rateInputTable = tabEnv.from("RateInputTable");
    
        tabEnv.toAppendStream(rateInputTable, Rate.class).print();
    
        env.execute();}
  3. 生产测试

    开启 kafka 生产端:

    bin/kafka-console-producer.sh --broker-list 10.10.20.15:9092 --topic rate

发送数据:

1618388392479, 'REF', 9
1618388392480, 'USD', 4
1618388392580, 'HKD', 9

6. 如何进行条件查问操作

6.1 Table API 的实现形式

Table API 是集成在 Scala 和 Java 语言内的查问 API。与 SQL 不同,Table API 的查问不会用字符串示意,而是在宿主语言中一步一步调用实现的。

Table API 基于代表一张“表”的 Table 类,并提供一整套操作解决的办法 API。这些办法会返回一个新的 Table 对象,这个对象就示意对输出表利用转换操作的后果。有些关系型转换操作,能够由多个办法调用组成,形成链式调用构造。例如 table.select(…).filter(…),其中 select(…)示意抉择表中指定的字段,filter(…) 示意筛选条件。

代码实现:

...
// 基于 TableAPI 进行数据的查问转换操作,所以要求注册的长期表须要读取进去,赋值给一个 Table 对象实例才能够操作
Table resultTable = inputTable.filter("id == 1").select("id,name");

// 应用 TableAPI 对 Table 对象进行聚合计算
Table aggResultTable = inputTable.groupBy("id").select("id,id.count as count");

// 打印测试
tabEnv.toAppendStream(resultTable, TypeInformation.of(new TypeHint<Tuple2<Integer, String>>() {})).print("resultTable>>>").setParallelism(1) ;
tabEnv.toRetractStream(aggResultTable, TypeInformation.of(new TypeHint<Tuple2<Integer, Long>>() {})).print("aggResultTable>>>").setParallelism(1) ;
...

输入后果:

resultTable>>>> (1,zhangsan)
aggResultTable>>>> (true,(2,1))
aggResultTable>>>> (false,(2,1))
aggResultTable>>>> (true,(2,2))
aggResultTable>>>> (true,(1,1))

true 代表新的数据,false 代表已存在历史数据,而后再次打印“true,(2,2)“进行累积统计。

6.2 SQL 的实现形式

Flink 的 SQL 集成,基于的是 ApacheCalcite,它实现了 SQL 规范。在 Flink 中,用惯例字符串来定义 SQL 查问语句。SQL 查问的后果,是一个新的 Table。

// 应用 SQL 对表的数据进行操作
Table resultTableBySQL = tabEnv.sqlQuery("select id,count(id) as cnt from inputTable group by id");
tabEnv.toRetractStream(resultTableBySQL, TypeInformation.of(new TypeHint<Tuple2<Integer, Long>>() {})).print("sql result>>>").setParallelism(1) ;

7. 实现数据的输入操作

表的输入,是通过将数据写入 TableSink 来实现的。TableSink 是一个通用接口,能够反对不同的文件格式、存储数据库和音讯队列。

具体实现,输出表最间接的办法,就是通过 Table.executeInsert() 办法将一个 Table 写入注册过的 TableSink 中。

7.1 输入到文件

代码实现:

//1. 创立流式程序运行环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
//2. 没有指定 EnviromentSettings,默认应用的是老版本的 Planner 的流式查问
StreamTableEnvironment tabEnv = StreamTableEnvironment.create(env);

env.setParallelism(1);

//3. 指定读取 csv 文件的门路
String filePath = "./data/user.csv";

//4. 读取 csv 的文件,配置属性类型
tabEnv.connect(new FileSystem().path(filePath))// 读取指定文件目录的文件数据,该对象肯定是实现了 ConnectorDescriptor 的实现类
 .withFormat(new Csv()) // 定义从内部文件读取数据的格式化办法,须要传入继承自 FormatDescriptor 抽象类的实现类
 .withSchema(new Schema()
             .field("id", DataTypes.INT())
             .field("name", DataTypes.STRING())
            )// 定义表的构造
 .createTemporaryTable("inputTable");

//5. 将 table 表的数据转换成 table 对象
Table inputTable = tabEnv.from("inputTable");

Table resultTable = inputTable.select("id,name");

// 定义后果表,将文件数据写入到后果文件中
tabEnv.connect(new FileSystem().path("./data/user.log"))
 .withFormat(new Csv())
 .withSchema(new Schema() // 这个办法肯定要指定
             .field("id", DataTypes.INT())
             .field("name", DataTypes.STRING())
            )
 .createTemporaryTable("outputTable");

resultTable.executeInsert("outputTable");

//6. 打印测试
tabEnv.toAppendStream(inputTable, TypeInformation.of(new TypeHint<Tuple2<Integer, String>>() {})).print().setParallelism(1);

env.execute();

7.2 输入到 KAFKA

解决的数据能够反对输入到 Kafka,联合后面的 Kafka 作为输出数据,创立数据管道,再输入至 Kafka 音讯队列:

String kafkaNode = "10.10.20.15:2181";
String kafkaNodeServer = "10.10.20.15:9092";
// 创立流式程序运行环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// 没有指定 EnviromentSettings,默认应用的是老版本的 Planner 的流式查问
StreamTableEnvironment tabEnv = StreamTableEnvironment.create(env);

// 接入 kafka 的 connect 生产数据
tabEnv.connect(new Kafka() // 从 kafka 中读取数据
        .version("universal") // 指定以后环境采纳的 kafka 的版本号:"0.8", "0.9", "0.10", "0.11", and "universal"
        .startFromEarliest()
        .topic("rate")  // 指定生产的 topic 名字
        .property("zookeeper.connect", kafkaNode) // 指定 zookeeper 的集群地址
        .property("bootstrap.servers", kafkaNodeServer) // 指定生产 kafka 的集群地址
).withFormat(new Csv())
        .withSchema(new Schema()
                .field("timestamp", DataTypes.BIGINT())
                .field("type", DataTypes.STRING())
                .field("rate", DataTypes.INT())
        ).createTemporaryTable("RateInputTable");

Table rateInputTable = tabEnv.from("RateInputTable");

// 接入 kafka 的 connect 生产数据
tabEnv.connect(new Kafka() // 从 kafka 中读取数据
        .version("universal") // 指定以后环境采纳的 kafka 的版本号:"0.8", "0.9", "0.10", "0.11", and "universal"
        .topic("output_rate")  // 指定生产的 topic 名字
        .property("zookeeper.connect", kafkaNode) // 指定 zookeeper 的集群地址
        .property("bootstrap.servers", kafkaNodeServer) // 指定生产 kafka 的集群地址
).withFormat(new Csv())
        .withSchema(new Schema()
                .field("timestamp", DataTypes.BIGINT())
                .field("type", DataTypes.STRING())
                .field("rate", DataTypes.INT())
        ).createTemporaryTable("RateOutputTable");

// 将 table 数据写入 kafka 音讯队列
rateInputTable.executeInsert("RateOutputTable");

// 打印数据信息
tabEnv.toAppendStream(rateInputTable, StreamOutputKafkaApplication.Rate.class).print();

env.execute();

本文由 mirson 创作分享,如需进一步交换,请加 QQ 群:19310171 或拜访 www.softart.cn

退出移动版