乐趣区

关于java:Fllink实时计算运用五Flink-Table-API-SQL-案例实战

1. Table API & SQL 实战使用

  1. 案例阐明

    • 性能阐明

      通过 socket 读取数据源,进行单词的统计解决。

    • 实现流程

      • 初始化 Table 运行环境
      • 转换操作解决:

        1)以空格进行宰割

        2)给每个单词计数累加 1

        3)依据单词进行分组解决

        4)求和统计

        5)输入打印数据

      • 执行工作
  2. FlinkTable API 形式实现

    StreamTableApiApplication,代码实现:

    // 获取流解决的运行环境
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    
    EnvironmentSettings environmentSettings = EnvironmentSettings.newInstance().useOldPlanner().inStreamingMode().build();
    
    // 获取 Table 的运行环境
    StreamTableEnvironment tabEnv = StreamTableEnvironment.create(env, environmentSettings);
    
    // 接入数据源
    DataStreamSource<String> lines = env.socketTextStream("10.10.20.15", 9922);
    
    // 对字符串进行分词压平
    SingleOutputStreamOperator<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
        @Override
        public void flatMap(String line, Collector<String> out) throws Exception {Arrays.stream(line.split(" ")).forEach(out::collect);
        }
    });
    
    // 将 DataStream 转换成 Table 对象,字段名默认的是 f0,给定字段名是 word
    Table table = tabEnv.fromDataStream(words, "word");
    
    // 依照单词进行分组聚合操作
    Table resultTable = table.groupBy("word").select("word, sum(1L) as counts");
    
    // 在流解决中,数据会源源不断的产生,须要累加解决,只能采纳用 toRestractStream
    //        DataStream<WordCount> wordCountDataStream = tabEnv.toAppendStream(resultTable, WordCount.class);
    //        wordCountDataStream.printToErr("toAppendStream>>>");
    
    DataStream<Tuple2<Boolean, WordCount>> wordCountDataStream = tabEnv.toRetractStream(resultTable, WordCount.class);
    wordCountDataStream.printToErr("toRetractStream>>>");
    
    env.execute();

测试验证:

开启 socket 输出,输出字符串:

[root@flink1 flink-1.11.2]# nc -lk 9922
  1. FlinkTable SQL 形式实现

    代码实现:

    StreamTableSqlApplication 实现类:

    // 获取流解决的运行环境
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    
    EnvironmentSettings environmentSettings = EnvironmentSettings.newInstance().useOldPlanner().inStreamingMode().build();
    
    // 获取 Table 的运行环境
    StreamTableEnvironment tabEnv = StreamTableEnvironment.create(env, environmentSettings);
    
    // 接入数据源
    DataStreamSource<String> lines = env.socketTextStream("10.10.20.15", 9922);
    
    // 对字符串进行分词压平
    SingleOutputStreamOperator<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
        @Override
        public void flatMap(String line, Collector<String> out) throws Exception {Arrays.stream(line.split(" ")).forEach(out::collect);
        }
    });
    
    // 将 DataStream 转换成 Table 对象,字段名默认的是 f0,给定字段名是 word
    tabEnv.registerDataStream("t_wordcount", words, "word");
    
    // 依照单词进行分组聚合操作
    Table resultTable = tabEnv.sqlQuery("select word,count(1) as counts from t_wordcount group by word");
    
    DataStream<Tuple2<Boolean, WordCount>> wordCountDataStream = tabEnv.toRetractStream(resultTable, WordCount.class);
    wordCountDataStream.printToErr("toRetractStream>>>");
    env.execute();

2. Flink SQL 滚动窗口实战

  1. Flink SQL 窗口阐明

    Flink SQL 反对的窗口聚合次要是两种:Window 聚合和 Over 聚合。这里次要介绍 Window 聚合。Window 聚合反对两种工夫属性定义窗口:Event Time 和 Processing Time。每种工夫属性类型反对三种窗口类型: 滚动窗口(TUMBLE)、滑动窗口(HOP)和会话窗口(SESSION)

  2. 案例阐明

    统计在过来的 1 分钟内有多少用户点击了某个的网页,能够通过定义一个窗口来收集最近 1 分钟内的数据,并对这个窗口内的数据进行计算。

    测试数据:

| 用户名 | 拜访地址 | 拜访工夫 |
| —— | ——————— | ——————– |
| 张三 | http://taobao.com/xxx | 2021-05-10 10:00:00 |
| 张三 | http://taobao.com/xxx | 2021-05-10 10:00:10 |
| 张三 | http://taobao.com/xxx | 2021-05-10 10:00:49 |
| 张三 | http://taobao.com/xxx | 2021-05-10 10:01:05 |
| 张三 | http://taobao.com/xxx | 2021-05-10 10:01:58 |
| 李四 | http://taobao.com/xxx | 2021-05-10 10:02:10 |

  1. 滚动窗口使用

    滚动窗口(Tumbling windows)要用 Tumble 类来定义,另外还有三个办法:

    • over:定义窗口长度
    • on:用来分组(按工夫距离)或者排序(按行数)的工夫字段
    • as:别名,必须呈现在前面的 groupBy 中

    实现步骤:

    • 初始化流运行环境
    • 在流模式下应用 blink planner
    • 创立用户点击事件数据
    • 将源数据写入临时文件并获取绝对路径
    • 创立表载入用户点击事件数据
    • 对表运行 SQL 查问,并将后果作为新表检索
    • Table 转换成 DataStream
    • 执行工作

    TumbleUserClickApplication,实现代码:

    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    
    EnvironmentSettings environmentSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
    StreamTableEnvironment tabEnv = StreamTableEnvironment.create(env, environmentSettings);
    
    // 将源数据写入临时文件并获取绝对路径
    String contents =
            "张三,http://taobao.com/xxx,2021-05-10 10:00:00\n" +
                    "张三,http://taobao.com/xxx,2021-05-10 10:00:10\n" +
                    "张三,http://taobao.com/xxx,2021-05-10 10:00:49\n" +
                    "张三,http://taobao.com/xxx,2021-05-10 10:01:05\n" +
                    "张三,http://taobao.com/xxx,2021-05-10 10:01:58\n" +
                    "张三,http://taobao.com/xxx,2021-05-10 10:02:10\n";
    String path = FileUtil.createTempFile(contents);
    
    String ddl = "CREATE TABLE user_clicks (\n" +
            "username varchar,\n" +
            "click_url varchar,\n" +
            "ts TIMESTAMP(3),\n" +
            "WATERMARK FOR ts AS ts - INTERVAL'2'SECOND\n" +
            ") WITH (\n" +
            "'connector.type' = 'filesystem',\n"+"  'connector.path' = '"+ path +"',\n"+"  'format.type' = 'csv'\n"+")";
    
    tabEnv.sqlUpdate(ddl);
    
    // 对表数据进行 sql 查问,并将后果作为新表进行查问
    String query = "SELECT\n" +
            "TUMBLE_START(ts, INTERVAL'1'MINUTE),\n" +
            "TUMBLE_END(ts, INTERVAL'1'MINUTE),\n" +
            "username,\n" +
            "COUNT(click_url)\n" +
            "FROM user_clicks\n" +
            "GROUP BY TUMBLE(ts, INTERVAL'1'MINUTE), username";
    
    Table result = tabEnv.sqlQuery(query);
    
    tabEnv.toAppendStream(result, Row.class).print();
    
    env.execute();

以 1 分钟作为工夫滚动窗口,水印提早 2 秒。

输入后果:

4> 2021-10-10T10:00,2021-10-10T10:01, 张三,3
4> 2021-10-10T10:01,2021-10-10T10:02, 张三,2
4> 2021-10-10T10:02,2021-10-10T10:03, 张三,1

3. Flink SQL 滑动窗口实战

  1. 实现步骤

    • 初始化流运行环境
    • 在流模式下应用 blink planner
    • 创立用户点击事件数据
    • 将源数据写入临时文件并获取绝对路径
    • 创立表载入用户点击事件数据
    • 对表运行 SQL 查问,并将后果作为新表检索
    • Table 转换成 DataStream
    • 执行工作
  2. 实现代码

    代码 HopUserClickApplication:

    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    
    EnvironmentSettings environmentSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
    StreamTableEnvironment tabEnv = StreamTableEnvironment.create(env, environmentSettings);
    
    // 将源数据写入临时文件并获取绝对路径
    String contents =
            "张三,http://taobao.com/xxx,2020-10-10 10:00:00\n" +
                    "张三,http://taobao.com/xxx,2020-10-10 10:00:10\n" +
                    "张三,http://taobao.com/xxx,2020-10-10 10:00:49\n" +
                    "张三,http://taobao.com/xxx,2020-10-10 10:01:05\n" +
                    "张三,http://taobao.com/xxx,2020-10-10 10:01:58\n" +
                    "张三,http://taobao.com/xxx,2020-10-10 10:02:10\n";
    String path = FileUtil.createTempFile(contents);
    
    String ddl = "CREATE TABLE user_clicks (\n" +
            "username varchar,\n" +
            "click_url varchar,\n" +
            "ts TIMESTAMP(3),\n" +
            "WATERMARK FOR ts AS ts - INTERVAL'2'SECOND\n" +
            ") WITH (\n" +
            "'connector.type' = 'filesystem',\n"+"  'connector.path' = '"+ path +"',\n"+"  'format.type' = 'csv'\n"+")";
    
    tabEnv.sqlUpdate(ddl);
    
    // 对表数据进行 sql 查问,并将后果作为新表进行查问,每隔 30 秒,统计一次过来 1 分钟的数据
    String query = "SELECT\n" +
            "HOP_START(ts, INTERVAL'30'SECOND, INTERVAL'1'MINUTE),\n" +
            "HOP_END(ts, INTERVAL'30'SECOND, INTERVAL'1'MINUTE),\n" +
            "username,\n" +
            "COUNT(click_url)\n" +
            "FROM user_clicks\n" +
            "GROUP BY HOP (ts, INTERVAL'30'SECOND, INTERVAL'1'MINUTE), username";
    
    Table result = tabEnv.sqlQuery(query);
    
    tabEnv.toAppendStream(result, Row.class).print();
    
    env.execute();

每隔 30 秒,统计一次过来 1 分钟的用户点击数量。

输入后果:

4> 2021-05-10T09:59:30,2021-05-10T10:00:30, 张三,2
4> 2021-05-10T10:00,2021-05-10T10:01, 张三,3
4> 2021-05-10T10:00:30,2021-05-10T10:01:30, 张三,2
4> 2021-05-10T10:01,2021-05-10T10:02, 张三,2
4> 2021-05-10T10:01:30,2021-05-10T10:02:30, 张三,2
4> 2021-05-10T10:02,2021-05-10T10:03, 张三,1

4. Flink SQL 会话窗口实战

  1. 实现步骤

    • 初始化流运行环境
    • 在流模式下应用 blink planner
    • 创立用户点击事件数据
    • 将源数据写入临时文件并获取绝对路径
    • 创立表载入用户点击事件数据
    • 对表运行 SQL 查问,并将后果作为新表检索
    • Table 转换成 DataStream
    • 执行工作
  2. 代码实现:

    代码:SessionUserClickApplication

    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    
    EnvironmentSettings environmentSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
    StreamTableEnvironment tabEnv = StreamTableEnvironment.create(env, environmentSettings);
    
    // 将源数据写入临时文件并获取绝对路径
    String contents =
            "张三,http://taobao.com/xxx,2021-05-10 10:00:00\n" +
                    "张三,http://taobao.com/xxx,2021-05-10 10:00:10\n" +
                    "张三,http://taobao.com/xxx,2021-05-10 10:00:49\n" +
                    "张三,http://taobao.com/xxx,2021-05-10 10:01:05\n" +
                    "张三,http://taobao.com/xxx,2021-05-10 10:01:58\n" +
                    "张三,http://taobao.com/xxx,2021-05-10 10:02:10\n";
    String path = FileUtil.createTempFile(contents);
    
    String ddl = "CREATE TABLE user_clicks (\n" +
            "username varchar,\n" +
            "click_url varchar,\n" +
            "ts TIMESTAMP(3),\n" +
            "WATERMARK FOR ts AS ts - INTERVAL'2'SECOND\n" +
            ") WITH (\n" +
            "'connector.type' = 'filesystem',\n"+"  'connector.path' = '"+ path +"',\n"+"  'format.type' = 'csv'\n"+")";
    
    tabEnv.sqlUpdate(ddl);
    
    // 对表数据进行 sql 查问,并将后果作为新表进行查问,每隔 30 秒统计一次数据
    String query = "SELECT\n" +
            "SESSION_START(ts, INTERVAL'30'SECOND),\n" +
            "SESSION_END(ts, INTERVAL'30'SECOND),\n" +
            "username,\n" +
            "COUNT(click_url)\n" +
            "FROM user_clicks\n" +
            "GROUP BY SESSION (ts, INTERVAL'30'SECOND), username";
    
    Table result = tabEnv.sqlQuery(query);
    
    tabEnv.toAppendStream(result, Row.class).print();
    
    env.execute();

每隔 30 秒统计一次用户点击数据.

输入后果:

4> 2021-05-10T10:00,2021-05-10T10:00:40, 张三,2
4> 2021-05-10T10:00:49,2021-05-10T10:01:35, 张三,2
4> 2021-05-10T10:01:58,2021-05-10T10:02:40, 张三,2

本文由 mirson 创作分享,如需进一步交换,请加 QQ 群:19310171 或拜访 www.softart.cn

退出移动版