关于云计算:Flink的sink实战之四自定义

40次阅读

共计 4895 个字符,预计需要花费 13 分钟才能阅读完成。

欢送拜访我的 GitHub

https://github.com/zq2599/blog_demos

内容:所有原创文章分类汇总及配套源码,波及 Java、Docker、Kubernetes、DevOPS 等;

本篇概览

Flink 官网提供的 sink 服务可能满足不了咱们的须要,此时能够开发自定义的 sink,文本就来一起实战;

全系列链接

  1. 《Flink 的 sink 实战之一:初探》
  2. 《Flink 的 sink 实战之二:kafka》
  3. 《Flink 的 sink 实战之三:cassandra3》
  4. 《Flink 的 sink 实战之四:自定义》

继承关系

  1. 在正式编码前,要先弄清楚对 sink 能力是如何实现的,后面咱们实战过的 print、kafka、cassandra 等 sink 操作,外围类的继承关系如下图所示:

  1. 可见实现 sink 能力的要害,是实现 <font color=”blue”>RichFunction 和 SinkFunction</font> 接口,前者用于资源管制(如 open、close 等操作),后者负责 sink 的具体操作,来看看最简略的 PrintSinkFunction 类是如何实现 SinkFunction 接口的 invoke 办法:
@Override
public void invoke(IN record) {writer.write(record);
}
  1. 当初对 sink 的根本逻辑曾经分明了,能够开始编码实战了;

内容和版本

本次实战很简略:自定义 sink,用于将数据写入 MySQL,波及的版本信息如下:

  1. jdk:1.8.0_191
  2. flink:1.9.2
  3. maven:3.6.0
  4. flink 所在操作系统:CentOS Linux release 7.7.1908
  5. MySQL:5.7.29
  6. IDEA:2018.3.5 (Ultimate Edition)

源码下载

如果您不想写代码,整个系列的源码可在 GitHub 下载到,地址和链接信息如下表所示 (https://github.com/zq2599/blo…:

名称 链接 备注
我的项目主页 https://github.com/zq2599/blo… 该我的项目在 GitHub 上的主页
git 仓库地址 (https) https://github.com/zq2599/blo… 该我的项目源码的仓库地址,https 协定
git 仓库地址 (ssh) git@github.com:zq2599/blog_demos.git 该我的项目源码的仓库地址,ssh 协定

这个 git 我的项目中有多个文件夹,本章的利用在 <font color=”blue”>flinksinkdemo</font> 文件夹下,如下图红框所示:

数据库筹备

请您将 MySQL 筹备好,并执行以下 sql,用于创立数据库 flinkdemo 和表 student:

create database if not exists flinkdemo;
USE flinkdemo;
DROP TABLE IF EXISTS `student`;
CREATE TABLE `student` (`id` int(11) unsigned NOT NULL AUTO_INCREMENT,
  `name` varchar(25) COLLATE utf8_bin DEFAULT NULL,
  `age` int(10) DEFAULT NULL,
  PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=5 DEFAULT CHARSET=utf8 COLLATE=utf8_bin;

编码

  1. 应用《Flink 的 sink 实战之二:kafka》中创立的 flinksinkdemo 工程;
  2. 在 pom.xml 中减少 mysql 的依赖:
<dependency>
  <groupId>mysql</groupId>
  <artifactId>mysql-connector-java</artifactId>
  <version>8.0.11</version>
</dependency>
  1. 创立和数据库的 student 表对应的实体类 Student.java:
package com.bolingcavalry.customize;

public class Student {
    private int id;
    private String name;
    private int age;

    public int getId() {return id;}

    public void setId(int id) {this.id = id;}

    public String getName() {return name;}

    public void setName(String name) {this.name = name;}

    public int getAge() {return age;}

    public void setAge(int age) {this.age = age;}

    public Student(String name, int age) {
        this.name = name;
        this.age = age;
    }
}
  1. 创立自定义 sink 类 MySQLSinkFunction.java,这是本文的外围,无关数据库的连贯、断开、写入数据都集中在此:
package com.bolingcavalry.customize;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;

public class MySQLSinkFunction extends RichSinkFunction<Student> {

    PreparedStatement preparedStatement;

    private Connection connection;

    private ReentrantLock reentrantLock = new ReentrantLock();

    @Override
    public void open(Configuration parameters) throws Exception {super.open(parameters);

        // 筹备数据库相干实例
        buildPreparedStatement();}

    @Override
    public void close() throws Exception {super.close();

        try{if(null!=preparedStatement) {preparedStatement.close();
                preparedStatement = null;
            }
        } catch(Exception e) {e.printStackTrace();
        }

        try{if(null!=connection) {connection.close();
                connection = null;
            }
        } catch(Exception e) {e.printStackTrace();
        }
    }

    @Override
    public void invoke(Student value, Context context) throws Exception {preparedStatement.setString(1, value.getName());
        preparedStatement.setInt(2, value.getAge());
        preparedStatement.executeUpdate();}

    /**
     * 筹备好 connection 和 preparedStatement
     * 获取 mysql 连贯实例,思考多线程同步,* 不必 synchronize 是因为获取数据库连贯是近程操作,耗时不确定
     * @return
     */
    private void buildPreparedStatement() {if(null==connection) {
            boolean hasLock = false;
            try {hasLock = reentrantLock.tryLock(10, TimeUnit.SECONDS);

                if(hasLock) {Class.forName("com.mysql.cj.jdbc.Driver");
                    connection = DriverManager.getConnection("jdbc:mysql://192.168.50.43:3306/flinkdemo?useUnicode=true&characterEncoding=UTF-8&useSSL=false&serverTimezone=UTC", "root", "123456");
                }

                if(null!=connection) {preparedStatement = connection.prepareStatement("insert into student (name, age) values (?, ?)");
                }
            } catch (Exception e) {
                // 生产环境慎用
                e.printStackTrace();} finally {if(hasLock) {reentrantLock.unlock();
                }
            }
        }
    }
}
  1. 上述代码很简略,只须要留神在创立连贯的时候用到了锁来管制多线程同步,以及高版本 mysql 驱动对应的 driver 和 uri 的写法与以前 5.x 版本的区别;
  2. 创立工作类 StudentSink.java,用来创立一个 flink 工作,外面通过 ArrayList 创立了一个数据集,而后间接 addSink,为了看清 DAG,调用 disableChaining 办法勾销了 operator chain:
package com.bolingcavalry.customize;

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import java.util.ArrayList;
import java.util.List;

public class StudentSink {public static void main(String[] args) throws Exception {final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // 并行度为 1
        env.setParallelism(1);

        List<Student> list = new ArrayList<>();
        list.add(new Student("aaa", 11));
        list.add(new Student("bbb", 12));
        list.add(new Student("ccc", 13));
        list.add(new Student("ddd", 14));
        list.add(new Student("eee", 15));
        list.add(new Student("fff", 16));

        env.fromCollection(list)
            .addSink(new MySQLSinkFunction())
            .disableChaining();

        env.execute("sink demo : customize mysql obj");
    }
}
  1. 在 flink web 页面提交工作,并设置工作类:

  1. 工作实现后,DAG 图显示工作和记录数都合乎预期:

  1. 去查看数据库,发现数据已写入:

至此,自定义 sink 的实战曾经实现,心愿本文能给您一些参考;

欢送关注公众号:程序员欣宸

微信搜寻「程序员欣宸」,我是欣宸,期待与您一起畅游 Java 世界 …
https://github.com/zq2599/blog_demos

正文完
 0