欢送拜访我的GitHub

https://github.com/zq2599/blog_demos

内容:所有原创文章分类汇总及配套源码,波及Java、Docker、Kubernetes、DevOPS等;

本篇概览

本文是《Flink的DataSource三部曲》的终篇,后面都是在学习Flink已有的数据源性能,但如果这些不能满足需要,就要自定义数据源(例如从数据库获取数据),也就是明天实战的内容,如下图红框所示:

Flink的DataSource三部曲文章链接

  1. 《Flink的DataSource三部曲之一:间接API》
  2. 《Flink的DataSource三部曲之二:内置connector》
  3. 《Flink的DataSource三部曲之三:自定义》

环境和版本

本次实战的环境和版本如下:

  1. JDK:1.8.0_211
  2. Flink:1.9.2
  3. Maven:3.6.0
  4. 操作系统:macOS Catalina 10.15.3 (MacBook Pro 13-inch, 2018)
  5. IDEA:2018.3.5 (Ultimate Edition)

在服务器上搭建Flink服务

  1. 后面两章的程序都是在IDEA上运行的,本章须要通过Flink的web ui察看运行后果,因而要独自部署Flink服务,我这里是在CentOS环境通过docker-compose部署的,以下是docker-compose.yml的内容,用于参考:
version: "2.1"services:  jobmanager:    image: flink:1.9.2-scala_2.12    expose:      - "6123"    ports:      - "8081:8081"    command: jobmanager    environment:      - JOB_MANAGER_RPC_ADDRESS=jobmanager  taskmanager1:    image: flink:1.9.2-scala_2.12    expose:      - "6121"      - "6122"    depends_on:      - jobmanager    command: taskmanager    links:      - "jobmanager:jobmanager"    environment:      - JOB_MANAGER_RPC_ADDRESS=jobmanager  taskmanager2:    image: flink:1.9.2-scala_2.12    expose:      - "6121"      - "6122"    depends_on:      - jobmanager    command: taskmanager    links:      - "jobmanager:jobmanager"    environment:      - JOB_MANAGER_RPC_ADDRESS=jobmanager
  1. 下图是我的Flink状况,有两个Task Maganer,共八个Slot全副可用:

源码下载

如果您不想写代码,整个系列的源码可在GitHub下载到,地址和链接信息如下表所示(https://github.com/zq2599/blo...:

名称链接备注
我的项目主页https://github.com/zq2599/blo...该我的项目在GitHub上的主页
git仓库地址(https)https://github.com/zq2599/blo...该我的项目源码的仓库地址,https协定
git仓库地址(ssh)git@github.com:zq2599/blog_demos.git该我的项目源码的仓库地址,ssh协定

这个git我的项目中有多个文件夹,本章的利用在<font color="blue">flinkdatasourcedemo</font>文件夹下,如下图红框所示:

筹备结束,开始开发;

实现SourceFunctionDemo接口的DataSource

  1. 从最简略的开始,开发一个不可并行的数据源并验证;
  2. 实现SourceFunction接口,在工程<font color="blue">flinkdatasourcedemo</font>中减少SourceFunctionDemo.java:
package com.bolingcavalry.customize;import org.apache.flink.api.java.tuple.Tuple2;import org.apache.flink.streaming.api.datastream.DataStream;import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;import org.apache.flink.streaming.api.functions.source.SourceFunction;import org.apache.flink.streaming.api.windowing.time.Time;public class SourceFunctionDemo {    public static void main(String[] args) throws Exception {        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();        //并行度为2        env.setParallelism(2);        DataStream<Tuple2<Integer,Integer>> dataStream = env.addSource(new SourceFunction<Tuple2<Integer, Integer>>() {            private volatile boolean isRunning = true;            @Override            public void run(SourceContext<Tuple2<Integer, Integer>> ctx) throws Exception {                int i = 0;                while (isRunning) {                    ctx.collect(new Tuple2<>(i++ % 5, 1));                    Thread.sleep(1000);                    if(i>9){                        break;                    }                }            }            @Override            public void cancel() {                isRunning = false;            }        });        dataStream                .keyBy(0)                .timeWindow(Time.seconds(2))                .sum(1)                .print();        env.execute("Customize DataSource demo : SourceFunction");    }}
  1. 从上述代码可见,给addSource办法传入一个匿名类实例,该匿名类实现了SourceFunction接口;
  2. 实现SourceFunction接口只需实现run和cancel办法;
  3. <font color="blue">run</font>办法产生数据,这里为了简答操作,每隔一秒产生一个Tuple2实例,因为接下来的算子中有keyBy操作,因而Tuple2的第一个字段始终保持着5的余数,这样能够多几个key,以便扩散到不同的slot中;
  4. 为了核查数据是否精确,这里并没有有限发送数据,而是仅发送了10个Tuple2实例;
  5. <font color="blue">cancel</font>是job被勾销时执行的办法;
  6. 整体并行度显式设置为2;
  7. 编码实现后,执行<font color="blue">mvn clean package -U -DskipTests</font>构建,在target目录失去文件<font color="blue">flinkdatasourcedemo-1.0-SNAPSHOT.jar</font>;
  8. 在Flink的web UI上传flinkdatasourcedemo-1.0-SNAPSHOT.jar,并指定执行类,如下图红框所示:

    1. 工作执行实现后,在<font color="blue">Completed Jobs</font>页面能够看到,DataSource的并行度是1(红框),对应的SubTask一共发送了10条记录(蓝框),这和咱们的代码是统一的;

  1. 再来看生产的子工作,如下图,红框显示并行度是2,这和后面代码中的设置是统一的,蓝框显示两个子工作一共收到10条数据记录,和上游收回的数量统一:

  1. 接下来尝试多并行度的DataSource;

实现ParallelSourceFunction接口的DataSource

  1. 如果自定义DataSource中有简单的或者耗时的操作,那么减少DataSource的并行度,让多个SubTask同时进行这些操作,能够无效晋升整体吞吐量(前提是硬件资源富余);
  2. 接下来实战能够并行执行的DataSource,原理是DataSoure实现ParallelSourceFunction接口,代码如下,可见和SourceFunctionDemo简直一样,只是addSource方发入参不同,该入参仍然是匿名类,不过实现的的接口变成了ParallelSourceFunction:
package com.bolingcavalry.customize;import org.apache.flink.api.java.tuple.Tuple2;import org.apache.flink.streaming.api.datastream.DataStream;import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;import org.apache.flink.streaming.api.functions.source.ParallelSourceFunction;import org.apache.flink.streaming.api.windowing.time.Time;public class ParrelSourceFunctionDemo {    public static void main(String[] args) throws Exception {        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();        //并行度为2        env.setParallelism(2);        DataStream<Tuple2<Integer,Integer>> dataStream = env.addSource(new ParallelSourceFunction<Tuple2<Integer, Integer>>() {            private volatile boolean isRunning = true;            @Override            public void run(SourceContext<Tuple2<Integer, Integer>> ctx) throws Exception {                int i = 0;                while (isRunning) {                    ctx.collect(new Tuple2<>(i++ % 5, 1));                    Thread.sleep(1000);                    if(i>9){                        break;                    }                }            }            @Override            public void cancel() {                isRunning = false;            }        });        dataStream                .keyBy(0)                .timeWindow(Time.seconds(2))                .sum(1)                .print();        env.execute("Customize DataSource demo : ParallelSourceFunction");    }}
  1. 编码实现后,执行<font color="blue">mvn clean package -U -DskipTests</font>构建,在target目录失去文件<font color="blue">flinkdatasourcedemo-1.0-SNAPSHOT.jar</font>;
  2. 在Flink的web UI上传flinkdatasourcedemo-1.0-SNAPSHOT.jar,并指定执行类,如下图红框所示:

  1. 工作执行实现后,在<font color="blue">Completed Jobs</font>页面能够看到,现在DataSource的并行度是2(红框),对应的SubTask一共发送了20条记录(蓝框),这和咱们的代码是统一的,绿框显示两个SubTask的Task Manager是同一个:

  1. 为什么DataSource一共发送了20条记录?因为每个SubTask中都有一份ParallelSourceFunction匿名类的实例,对应的run办法别离被执行,因而每个SubTask都发送了10条;
  2. 再来看生产数据的子工作,如下图,红框显示并行度与代码中设置的数量是统一的,蓝框显示两个SubTask一共生产了20条记录,和数据源收回的记录数统一,另外绿框显示两个SubTask的Task Manager是同一个,而且和DataSource的TaskManager是同一个,因而整个job都是在同一个TaskManager进行的,没有跨机器带来的额定代价:

  1. 接下来要实际的内容,和另一个重要的抽象类无关;

继承抽象类RichSourceFunction的DataSource

  1. 对RichSourceFunction的了解是从继承关系开始的,如下图,SourceFunction和RichFunction的个性最终都体现在RichSourceFunction上,SourceFunction的个性是数据的生成(run办法),RichFunction的个性是对资源的连贯和开释(open和close办法):

  1. 接下来开始实战,指标是从MySQL获取数据作为DataSource,而后生产这些数据;
  2. 请提前准备好可用的MySql数据库,而后执行以下SQL,创立库、表、记录:
DROP DATABASE IF EXISTS flinkdemo;CREATE DATABASE IF NOT EXISTS flinkdemo;USE flinkdemo;SELECT 'CREATING DATABASE STRUCTURE' as 'INFO';DROP TABLE IF EXISTS `student`;CREATE TABLE `student` (  `id` int(11) unsigned NOT NULL AUTO_INCREMENT,  `name` varchar(25) COLLATE utf8_bin DEFAULT NULL,  PRIMARY KEY (`id`)) ENGINE=InnoDB AUTO_INCREMENT=5 DEFAULT CHARSET=utf8 COLLATE=utf8_bin;INSERT INTO `student` VALUES ('1', 'student01'), ('2', 'student02'), ('3', 'student03'), ('4', 'student04'), ('5', 'student05'), ('6', 'student06');COMMIT;
  1. 在pom.xml中减少mysql依赖:
<dependency>  <groupId>mysql</groupId>  <artifactId>mysql-connector-java</artifactId>  <version>5.1.34</version></dependency>
  1. 新增MySQLDataSource.java,内容如下:
package com.bolingcavalry.customize;import com.bolingcavalry.Student;import org.apache.flink.configuration.Configuration;import org.apache.flink.streaming.api.functions.source.RichSourceFunction;import java.sql.Connection;import java.sql.DriverManager;import java.sql.PreparedStatement;import java.sql.ResultSet;public class MySQLDataSource extends RichSourceFunction<Student> {    private Connection connection = null;    private PreparedStatement preparedStatement = null;    private volatile boolean isRunning = true;    @Override    public void open(Configuration parameters) throws Exception {        super.open(parameters);        if(null==connection) {            Class.forName("com.mysql.jdbc.Driver");            connection = DriverManager.getConnection("jdbc:mysql://192.168.50.43:3306/flinkdemo?useUnicode=true&characterEncoding=UTF-8", "root", "123456");        }        if(null==preparedStatement) {            preparedStatement = connection.prepareStatement("select id, name from student");        }    }    /**     * 开释资源     * @throws Exception     */    @Override    public void close() throws Exception {        super.close();        if(null!=preparedStatement) {            try {                preparedStatement.close();            } catch (Exception exception) {                exception.printStackTrace();            }        }        if(null==connection) {            connection.close();        }    }    @Override    public void run(SourceContext<Student> ctx) throws Exception {        ResultSet resultSet = preparedStatement.executeQuery();        while (resultSet.next() && isRunning) {            Student student = new Student();            student.setId(resultSet.getInt("id"));            student.setName(resultSet.getString("name"));            ctx.collect(student);        }    }    @Override    public void cancel() {        isRunning = false;    }}
  1. 下面的代码中,MySQLDataSource继承了RichSourceFunction,作为一个DataSource,能够作为addSource办法的入参;
  2. open和close办法都会被数据源的SubTask调用,open负责创立数据库连贯对象,close负责开释资源;
  3. open办法中间接写死了数据库相干的配置(不可取);
  4. run办法在open之后被调用,作用和之前的DataSource例子一样,负责生产数据,这里是用后面筹备好的preparedStatement对象间接去数据库取数据;
  5. 接下来写个Demo类应用MySQLDataSource:
package com.bolingcavalry.customize;import com.bolingcavalry.Student;import org.apache.flink.streaming.api.datastream.DataStream;import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;public class RichSourceFunctionDemo {    public static void main(String[] args) throws Exception {        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();        //并行度为2        env.setParallelism(2);        DataStream<Student> dataStream = env.addSource(new MySQLDataSource());        dataStream.print();        env.execute("Customize DataSource demo : RichSourceFunction");    }}
  1. 从上述代码可见,MySQLDataSource实例传入addSource办法即可创立数据集;
  2. 像之前那样,编译构建、提交到Flink、指定工作类,即可开始执行此工作;
  3. 执行后果如下图,DataSource的并行度是1,一共发送六条记录,即student表的所有记录:

  1. 解决数据的SubTask一共两个,各解决三条音讯:

  1. 因为代码中对数据集执行了print(),因而在TaskManager控制台看到数据输入如下图红框所示:

对于RichParallelSourceFunction

  1. 实战到了这里,还剩RichParallelSourceFunction这个抽象类咱们还没有尝试过,但我感觉这个类能够不必在文中多说了,咱们把RichlSourceFunction和RichParallelSourceFunction的类图放在一起看看:

  1. 从上图可见,在RichFunction继承关系上,两者统一,在SourceFunction的继承关系上,RichlSourceFunction和RichParallelSourceFunction略有不同,RichParallelSourceFunction走的是ParallelSourceFunction这条线,而SourceFunction和ParallelSourceFunction的区别,后面曾经讲过了,因而,后果显而易见:<font color="blue">继承RichParallelSourceFunction的DataSource的并行度是能够大于1的</font>;
  2. 读者您如果有趣味,能够将后面的MySQLDataSource改成继承RichParallelSourceFunction再试试,DataSource的并行度会超过1,<font color="red">然而绝不是只有这一点变动</font>,DAG图显示Flink还会做一些Operator Chain解决,但这不是本章要关注的内容,只能说后果是正确的(两个DataSource的SubTask,一共发送12条记录),建议您试试;

至此,《Flink的DataSource三部曲》系列就全副实现了,好的开始是胜利的一半,在拿到数据后,前面还有很多知识点要学习和把握,接下来的文章会持续深刻Flink的微妙之旅;

欢送关注公众号:程序员欣宸

微信搜寻「程序员欣宸」,我是欣宸,期待与您一起畅游Java世界...
https://github.com/zq2599/blog_demos