关于flink:1通过FlinkSQL将数据写入mysql-demo

36次阅读

共计 3392 个字符,预计需要花费 9 分钟才能阅读完成。

FlinkSQL 的出现,在很大程度上降低了 Flink 的编程门槛,使其更加容易理解和掌握使用。今天将自己的笔记分享出来,希望能帮助在这方面有需要的朋友。(1)首先引入 POM 依赖:
<properties>
    <!-- Version applied to every org.apache.flink artifact via ${flink.version}. -->
    <flink.version>1.13.1</flink.version>
    <!-- Scala binary suffix appended to the Scala-dependent Flink artifact ids. -->
    <scala.binary.version>2.12</scala.binary.version>
    <!-- SLF4J version; not referenced by the dependency list shown in this article. -->
    <slf4j.version>1.7.30</slf4j.version>
</properties>

<dependencies>

    <!-- Flink core, streaming runtime and client (needed to run the job from the IDE). -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-java</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
        <!-- The closing tag below was malformed ("</version") in the original listing. -->
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-clients_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
    </dependency>

    <!-- Table API bridge that provides StreamTableEnvironment. -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-api-java-bridge_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
    </dependency>

    <!-- JDBC connector backing the sink table declared in the DDL. -->
    <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-connector-jdbc -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-jdbc_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
        <!--<scope>provided</scope>-->
    </dependency>

    <!-- Blink planner selected in the code through useBlinkPlanner(). -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-planner-blink_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-scala_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-common</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-json</artifactId>
        <version>${flink.version}</version>
    </dependency>

    <!-- JSON libraries; not referenced by the demo code shown in this article. -->
    <!-- https://mvnrepository.com/artifact/com.fasterxml.jackson.core/jackson-databind -->
    <dependency>
        <groupId>com.fasterxml.jackson.core</groupId>
        <artifactId>jackson-databind</artifactId>
        <version>2.12.0</version>
    </dependency>

    <!-- MySQL JDBC driver; the DDL references com.mysql.cj.jdbc.Driver. -->
    <!-- https://mvnrepository.com/artifact/mysql/mysql-connector-java -->
    <dependency>
        <groupId>mysql</groupId>
        <artifactId>mysql-connector-java</artifactId>
        <version>8.0.16</version>
    </dependency>

    <dependency>
        <groupId>com.alibaba</groupId>
        <artifactId>fastjson</artifactId>
        <version>1.2.66</version>
    </dependency>

</dependencies>

(2)编写代码
/**
 * Registers a JDBC-backed sink table through Flink SQL and inserts one row into it.
 *
 * <p>The DDL maps the Flink table {@code flinksinksds} onto the MySQL table
 * {@code flinksink} in database {@code testdb} on {@code localhost:3306}; the
 * INSERT statement then writes the row {@code ('1024', 1, 2)}.
 *
 * @param args command-line arguments; not used
 * @throws Exception if the DDL or INSERT statement fails, or if waiting for the
 *     INSERT job is interrupted or the job itself fails
 */
public static void main(String[] args) throws Exception {

    final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    final EnvironmentSettings settings = EnvironmentSettings.newInstance()
            .inStreamingMode()
            //.useOldPlanner() // flink
            .useBlinkPlanner() // blink
            .build();
    final StreamTableEnvironment ste = StreamTableEnvironment.create(env, settings);

    // Sink table definition. 'connector.write.flush.max-rows'='1' is set so that
    // a single buffered row is enough to trigger a flush to MySQL.
    String ddl = "CREATE TABLE flinksinksds(\r\n" +
            "componentname STRING,\r\n" +
            "componentcount INT,\r\n" +
            "componentsum INT\r\n" +
            ") WITH(\r\n" +
            "'connector.type'='jdbc',\r\n"+"'connector.driver'='com.mysql.cj.jdbc.Driver'," +
            "'connector.url'='jdbc:mysql://localhost:3306/testdb?characterEncoding=UTF-8&useUnicode=true&useSSL=false&tinyInt1isBit=false&allowPublicKeyRetrieval=true&serverTimezone=Asia/Shanghai',\r\n"+"'connector.table'='flinksink',\r\n" +
            "'connector.username'='root',\r\n"+"'connector.password'='root',\r\n" +
            "'connector.write.flush.max-rows'='1'\r\n"+")";
    System.err.println(ddl);
    ste.executeSql(ddl);

    String insert = "insert into flinksinksds(componentname,componentcount,componentsum)" +
            "values('1024', 1 , 2)";

    // executeSql() submits the INSERT as its own job and returns immediately.
    // Calling env.execute() afterwards fails because no DataStream operators were
    // registered on the environment, so block on the TableResult instead: await()
    // returns once the job has finished, i.e. after the row has been written.
    ste.executeSql(insert).await();

}

(3)执行结果:

正文完
 0