关于 Flink：5 Flink SQL 将 socket 数据写入到 MySQL（方式二）

30次阅读

共计 1877 个字符,预计需要花费 5 分钟才能阅读完成。

/**
 * Reads comma-separated records from a text socket, aggregates them per id over 10-second
 * processing-time tumbling windows with Flink SQL, and inserts the aggregates into the
 * {@code flinksink} table through the JDBC connector.
 *
 * <p>Each socket line is split on {@code ','}; field 0 is passed through as a string, field 1 is
 * parsed as a {@code long} and field 2 as an {@code int} before being handed to the
 * {@code WaterSensor} constructor. A line with fewer than three fields or with non-numeric
 * values in fields 1 or 2 makes the map function throw.
 *
 * @param args command-line arguments; not used
 * @throws Exception if building or executing the pipeline fails
 */
public static void main(String[] args) throws Exception {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.setParallelism(1);
    StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

    // Source: newline-delimited text from 127.0.0.1:9999.
    DataStreamSource<String> streamSource = env.socketTextStream("127.0.0.1", 9999, "\n");
    SingleOutputStreamOperator<WaterSensor> waterDS = streamSource.map(new MapFunction<String, WaterSensor>() {
        @Override
        public WaterSensor map(String s) throws Exception {
            String[] split = s.split(",");
            return new WaterSensor(split[0], Long.parseLong(split[1]), Integer.parseInt(split[2]));
        }
    });

    // Convert the stream into a table and append a processing-time attribute "pt",
    // which the tumbling window below uses as its time column.
    Table table = tableEnv.fromDataStream(waterDS,
            $("id"),
            $("ts"),
            $("vc"),
            $("pt").proctime());

    tableEnv.createTemporaryView("EventTable", table);

    // Register the JDBC sink table. The option strings are kept exactly as in the original.
    // NOTE(review): URL and credentials are hard-coded (root/root on localhost) — presumably
    // tutorial values; externalize them before using this anywhere real.
    tableEnv.executeSql("CREATE TABLE flinksink (" +
            "componentname STRING," +
            "componentcount BIGINT NOT NULL," +
            "componentsum BIGINT" +
            ") WITH (" +
            "'connector.type' = 'jdbc'," +
            "'connector.url'='jdbc:mysql://localhost:3306/testdb?characterEncoding=UTF-8&useUnicode=true&useSSL=false&tinyInt1isBit=false&allowPublicKeyRetrieval=true&serverTimezone=Asia/Shanghai'," +
            "'connector.table' = 'flinksink'," +
            "'connector.driver'='com.mysql.cj.jdbc.Driver'," +
            "'connector.username' = 'root'," +
            "'connector.password'='root'," +
            "'connector.write.flush.max-rows'='3'\r\n" +
            ")"
    );
    Table sinkTable = tableEnv.from("flinksink");
    sinkTable.printSchema();

    // Every fragment ends with a space (or an opening parenthesis) so that the concatenated
    // statement keeps its keywords separated; without them the query reads
    // "SELECTid ... componentsumFROM ..." and cannot be parsed.
    // window_start / window_end are grouped on but not selected, because the sink table only
    // has three columns.
    Table result = tableEnv.sqlQuery(
            "SELECT " +
                    "id AS componentname, " +
                    "COUNT(ts) AS componentcount, SUM(ts) AS componentsum " +
                    "FROM TABLE(" +
                    "TUMBLE(TABLE EventTable, " +
                    "DESCRIPTOR(pt), " +
                    "INTERVAL '10' SECOND)) " +
                    "GROUP BY id, window_start, window_end"
    );

    // Option 1: write to the database directly from the Table object.
    // result.executeInsert("flinksink").print(); // or: .insertInto("flinksink");

    // Option 2: write to the database with an INSERT statement over a temporary view.
    tableEnv.createTemporaryView("ResultTable", result);
    tableEnv.executeSql("insert into flinksink SELECT * FROM ResultTable").print();

    // Alternative for debugging: print the result as an append-only stream.
    // tableEnv.toAppendStream(result, Row.class).print("toAppendStream");

    // NOTE(review): executeSql(...) above already submits the INSERT job on its own. This call
    // is kept from the original; confirm that executing the DataStream environment a second
    // time is intended (TableResult#await() is the documented way to wait for a SQL job).
    env.execute();
}

正文完
 0