共计 9608 个字符,预计需要花费 25 分钟才能阅读完成。
欢送拜访我的 GitHub
https://github.com/zq2599/blog_demos
内容:所有原创文章分类汇总及配套源码,波及 Java、Docker、Kubernetes、DevOPS 等;
本篇概览
本文是《Flink 的 DataSource 三部曲》的终篇,后面都是在学习 Flink 已有的数据源性能,但如果这些不能满足需要,就要自定义数据源(例如从数据库获取数据),也就是明天实战的内容,如下图红框所示:
Flink 的 DataSource 三部曲文章链接
- 《Flink 的 DataSource 三部曲之一:间接 API》
- 《Flink 的 DataSource 三部曲之二: 内置 connector》
- 《Flink 的 DataSource 三部曲之三: 自定义》
环境和版本
本次实战的环境和版本如下:
- JDK:1.8.0_211
- Flink:1.9.2
- Maven:3.6.0
- 操作系统:macOS Catalina 10.15.3(MacBook Pro 13-inch, 2018)
- IDEA:2018.3.5 (Ultimate Edition)
在服务器上搭建 Flink 服务
- 后面两章的程序都是在 IDEA 上运行的,本章须要通过 Flink 的 web ui 察看运行后果,因而要独自部署 Flink 服务,我这里是在 CentOS 环境通过 docker-compose 部署的,以下是 docker-compose.yml 的内容,用于参考:
version: "2.1" | |
services: | |
jobmanager: | |
image: flink:1.9.2-scala_2.12 | |
expose: | |
- "6123" | |
ports: | |
- "8081:8081" | |
command: jobmanager | |
environment: | |
- JOB_MANAGER_RPC_ADDRESS=jobmanager | |
taskmanager1: | |
image: flink:1.9.2-scala_2.12 | |
expose: | |
- "6121" | |
- "6122" | |
depends_on: | |
- jobmanager | |
command: taskmanager | |
links: | |
- "jobmanager:jobmanager" | |
environment: | |
- JOB_MANAGER_RPC_ADDRESS=jobmanager | |
taskmanager2: | |
image: flink:1.9.2-scala_2.12 | |
expose: | |
- "6121" | |
- "6122" | |
depends_on: | |
- jobmanager | |
command: taskmanager | |
links: | |
- "jobmanager:jobmanager" | |
environment: | |
- JOB_MANAGER_RPC_ADDRESS=jobmanager |
- 下图是我的 Flink 状况,有两个 Task Maganer,共八个 Slot 全副可用:
源码下载
如果您不想写代码,整个系列的源码可在 GitHub 下载到,地址和链接信息如下表所示 (https://github.com/zq2599/blo…:
名称 | 链接 | 备注 |
---|---|---|
我的项目主页 | https://github.com/zq2599/blo… | 该我的项目在 GitHub 上的主页 |
git 仓库地址 (https) | https://github.com/zq2599/blo… | 该我的项目源码的仓库地址,https 协定 |
git 仓库地址 (ssh) | git@github.com:zq2599/blog_demos.git | 该我的项目源码的仓库地址,ssh 协定 |
这个 git 我的项目中有多个文件夹,本章的利用在 <font color=”blue”>flinkdatasourcedemo</font> 文件夹下,如下图红框所示:
筹备结束,开始开发;
实现 SourceFunctionDemo 接口的 DataSource
- 从最简略的开始,开发一个不可并行的数据源并验证;
- 实现 SourceFunction 接口,在工程 <font color=”blue”>flinkdatasourcedemo</font> 中减少 SourceFunctionDemo.java:
package com.bolingcavalry.customize; | |
import org.apache.flink.api.java.tuple.Tuple2; | |
import org.apache.flink.streaming.api.datastream.DataStream; | |
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; | |
import org.apache.flink.streaming.api.functions.source.SourceFunction; | |
import org.apache.flink.streaming.api.windowing.time.Time; | |
public class SourceFunctionDemo {public static void main(String[] args) throws Exception {final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); | |
// 并行度为 2 | |
env.setParallelism(2); | |
DataStream<Tuple2<Integer,Integer>> dataStream = env.addSource(new SourceFunction<Tuple2<Integer, Integer>>() { | |
private volatile boolean isRunning = true; | |
@Override | |
public void run(SourceContext<Tuple2<Integer, Integer>> ctx) throws Exception { | |
int i = 0; | |
while (isRunning) {ctx.collect(new Tuple2<>(i++ % 5, 1)); | |
Thread.sleep(1000); | |
if(i>9){break;} | |
} | |
} | |
@Override | |
public void cancel() {isRunning = false;} | |
}); | |
dataStream | |
.keyBy(0) | |
.timeWindow(Time.seconds(2)) | |
.sum(1) | |
.print(); | |
env.execute("Customize DataSource demo : SourceFunction"); | |
} | |
} |
- 从上述代码可见,给 addSource 办法传入一个匿名类实例,该匿名类实现了 SourceFunction 接口;
- 实现 SourceFunction 接口只需实现 run 和 cancel 办法;
- <font color=”blue”>run</font> 办法产生数据,这里为了简答操作,每隔一秒产生一个 Tuple2 实例,因为接下来的算子中有 keyBy 操作,因而 Tuple2 的第一个字段始终保持着 5 的余数,这样能够多几个 key,以便扩散到不同的 slot 中;
- 为了核查数据是否精确,这里并没有有限发送数据,而是仅发送了 10 个 Tuple2 实例;
- <font color=”blue”>cancel</font> 是 job 被勾销时执行的办法;
- 整体并行度显式设置为 2;
- 编码实现后,执行 <font color=”blue”>mvn clean package -U -DskipTests</font> 构建,在 target 目录失去文件 <font color=”blue”>flinkdatasourcedemo-1.0-SNAPSHOT.jar</font>;
- 在 Flink 的 web UI 上传 flinkdatasourcedemo-1.0-SNAPSHOT.jar,并指定执行类,如下图红框所示:
-
- 工作执行实现后,在 <font color=”blue”>Completed Jobs</font> 页面能够看到,DataSource 的并行度是 1(红框),对应的 SubTask 一共发送了 10 条记录(蓝框),这和咱们的代码是统一的;
- 再来看生产的子工作,如下图,红框显示并行度是 2,这和后面代码中的设置是统一的,蓝框显示两个子工作一共收到 10 条数据记录,和上游收回的数量统一:
- 接下来尝试多并行度的 DataSource;
实现 ParallelSourceFunction 接口的 DataSource
- 如果自定义 DataSource 中有简单的或者耗时的操作,那么减少 DataSource 的并行度,让多个 SubTask 同时进行这些操作,能够无效晋升整体吞吐量(前提是硬件资源富余);
- 接下来实战能够并行执行的 DataSource,原理是 DataSoure 实现 ParallelSourceFunction 接口,代码如下,可见和 SourceFunctionDemo 简直一样,只是 addSource 方发入参不同,该入参仍然是匿名类,不过实现的的接口变成了 ParallelSourceFunction:
package com.bolingcavalry.customize; | |
import org.apache.flink.api.java.tuple.Tuple2; | |
import org.apache.flink.streaming.api.datastream.DataStream; | |
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; | |
import org.apache.flink.streaming.api.functions.source.ParallelSourceFunction; | |
import org.apache.flink.streaming.api.windowing.time.Time; | |
public class ParrelSourceFunctionDemo {public static void main(String[] args) throws Exception {final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); | |
// 并行度为 2 | |
env.setParallelism(2); | |
DataStream<Tuple2<Integer,Integer>> dataStream = env.addSource(new ParallelSourceFunction<Tuple2<Integer, Integer>>() { | |
private volatile boolean isRunning = true; | |
@Override | |
public void run(SourceContext<Tuple2<Integer, Integer>> ctx) throws Exception { | |
int i = 0; | |
while (isRunning) {ctx.collect(new Tuple2<>(i++ % 5, 1)); | |
Thread.sleep(1000); | |
if(i>9){break;} | |
} | |
} | |
@Override | |
public void cancel() {isRunning = false;} | |
}); | |
dataStream | |
.keyBy(0) | |
.timeWindow(Time.seconds(2)) | |
.sum(1) | |
.print(); | |
env.execute("Customize DataSource demo : ParallelSourceFunction"); | |
} | |
} |
- 编码实现后,执行 <font color=”blue”>mvn clean package -U -DskipTests</font> 构建,在 target 目录失去文件 <font color=”blue”>flinkdatasourcedemo-1.0-SNAPSHOT.jar</font>;
- 在 Flink 的 web UI 上传 flinkdatasourcedemo-1.0-SNAPSHOT.jar,并指定执行类,如下图红框所示:
- 工作执行实现后,在 <font color=”blue”>Completed Jobs</font> 页面能够看到,现在 DataSource 的并行度是 2(红框),对应的 SubTask 一共发送了 20 条记录(蓝框),这和咱们的代码是统一的,绿框显示两个 SubTask 的 Task Manager 是同一个:
- 为什么 DataSource 一共发送了 20 条记录?因为每个 SubTask 中都有一份 ParallelSourceFunction 匿名类的实例,对应的 run 办法别离被执行,因而每个 SubTask 都发送了 10 条;
- 再来看生产数据的子工作,如下图,红框显示并行度与代码中设置的数量是统一的,蓝框显示两个 SubTask 一共生产了 20 条记录,和数据源收回的记录数统一,另外绿框显示两个 SubTask 的 Task Manager 是同一个,而且和 DataSource 的 TaskManager 是同一个,因而整个 job 都是在同一个 TaskManager 进行的,没有跨机器带来的额定代价:
- 接下来要实际的内容,和另一个重要的抽象类无关;
继承抽象类 RichSourceFunction 的 DataSource
- 对 RichSourceFunction 的了解是从继承关系开始的,如下图,SourceFunction 和 RichFunction 的个性最终都体现在 RichSourceFunction 上,SourceFunction 的个性是数据的生成 (run 办法),RichFunction 的个性是对资源的连贯和开释 (open 和 close 办法):
- 接下来开始实战,指标是从 MySQL 获取数据作为 DataSource,而后生产这些数据;
- 请提前准备好可用的 MySql 数据库,而后执行以下 SQL,创立库、表、记录:
DROP DATABASE IF EXISTS flinkdemo; | |
CREATE DATABASE IF NOT EXISTS flinkdemo; | |
USE flinkdemo; | |
SELECT 'CREATING DATABASE STRUCTURE' as 'INFO'; | |
DROP TABLE IF EXISTS `student`; | |
CREATE TABLE `student` (`id` int(11) unsigned NOT NULL AUTO_INCREMENT, | |
`name` varchar(25) COLLATE utf8_bin DEFAULT NULL, | |
PRIMARY KEY (`id`) | |
) ENGINE=InnoDB AUTO_INCREMENT=5 DEFAULT CHARSET=utf8 COLLATE=utf8_bin; | |
INSERT INTO `student` VALUES ('1', 'student01'), ('2', 'student02'), ('3', 'student03'), ('4', 'student04'), ('5', 'student05'), ('6', 'student06'); | |
COMMIT; |
- 在 pom.xml 中减少 mysql 依赖:
<dependency> | |
<groupId>mysql</groupId> | |
<artifactId>mysql-connector-java</artifactId> | |
<version>5.1.34</version> | |
</dependency> |
- 新增 MySQLDataSource.java,内容如下:
package com.bolingcavalry.customize; | |
import com.bolingcavalry.Student; | |
import org.apache.flink.configuration.Configuration; | |
import org.apache.flink.streaming.api.functions.source.RichSourceFunction; | |
import java.sql.Connection; | |
import java.sql.DriverManager; | |
import java.sql.PreparedStatement; | |
import java.sql.ResultSet; | |
public class MySQLDataSource extends RichSourceFunction<Student> { | |
private Connection connection = null; | |
private PreparedStatement preparedStatement = null; | |
private volatile boolean isRunning = true; | |
@Override | |
public void open(Configuration parameters) throws Exception {super.open(parameters); | |
if(null==connection) {Class.forName("com.mysql.jdbc.Driver"); | |
connection = DriverManager.getConnection("jdbc:mysql://192.168.50.43:3306/flinkdemo?useUnicode=true&characterEncoding=UTF-8", "root", "123456"); | |
} | |
if(null==preparedStatement) {preparedStatement = connection.prepareStatement("select id, name from student"); | |
} | |
} | |
/** | |
* 开释资源 | |
* @throws Exception | |
*/ | |
@Override | |
public void close() throws Exception {super.close(); | |
if(null!=preparedStatement) { | |
try {preparedStatement.close(); | |
} catch (Exception exception) {exception.printStackTrace(); | |
} | |
} | |
if(null==connection) {connection.close(); | |
} | |
} | |
@Override | |
public void run(SourceContext<Student> ctx) throws Exception {ResultSet resultSet = preparedStatement.executeQuery(); | |
while (resultSet.next() && isRunning) {Student student = new Student(); | |
student.setId(resultSet.getInt("id")); | |
student.setName(resultSet.getString("name")); | |
ctx.collect(student); | |
} | |
} | |
@Override | |
public void cancel() {isRunning = false;} | |
} |
- 下面的代码中,MySQLDataSource 继承了 RichSourceFunction,作为一个 DataSource,能够作为 addSource 办法的入参;
- open 和 close 办法都会被数据源的 SubTask 调用,open 负责创立数据库连贯对象,close 负责开释资源;
- open 办法中间接写死了数据库相干的配置 (不可取);
- run 办法在 open 之后被调用,作用和之前的 DataSource 例子一样,负责生产数据,这里是用后面筹备好的 preparedStatement 对象间接去数据库取数据;
- 接下来写个 Demo 类应用 MySQLDataSource:
package com.bolingcavalry.customize; | |
import com.bolingcavalry.Student; | |
import org.apache.flink.streaming.api.datastream.DataStream; | |
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; | |
public class RichSourceFunctionDemo {public static void main(String[] args) throws Exception {final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); | |
// 并行度为 2 | |
env.setParallelism(2); | |
DataStream<Student> dataStream = env.addSource(new MySQLDataSource()); | |
dataStream.print(); | |
env.execute("Customize DataSource demo : RichSourceFunction"); | |
} | |
} |
- 从上述代码可见,MySQLDataSource 实例传入 addSource 办法即可创立数据集;
- 像之前那样,编译构建、提交到 Flink、指定工作类,即可开始执行此工作;
- 执行后果如下图,DataSource 的并行度是 1,一共发送六条记录,即 student 表的所有记录:
- 解决数据的 SubTask 一共两个,各解决三条音讯:
- 因为代码中对数据集执行了 print(),因而在 TaskManager 控制台看到数据输入如下图红框所示:
对于 RichParallelSourceFunction
- 实战到了这里,还剩 RichParallelSourceFunction 这个抽象类咱们还没有尝试过,但我感觉这个类能够不必在文中多说了,咱们把 RichlSourceFunction 和 RichParallelSourceFunction 的类图放在一起看看:
- 从上图可见,在 RichFunction 继承关系上,两者统一,在 SourceFunction 的继承关系上,RichlSourceFunction 和 RichParallelSourceFunction 略有不同,RichParallelSourceFunction 走的是 ParallelSourceFunction 这条线,而 SourceFunction 和 ParallelSourceFunction 的区别,后面曾经讲过了,因而,后果显而易见:<font color=”blue”> 继承 RichParallelSourceFunction 的 DataSource 的并行度是能够大于 1 的 </font>;
- 读者您如果有趣味,能够将后面的 MySQLDataSource 改成继承 RichParallelSourceFunction 再试试,DataSource 的并行度会超过 1,<font color=”red”> 然而绝不是只有这一点变动 </font>,DAG 图显示 Flink 还会做一些 Operator Chain 解决,但这不是本章要关注的内容,只能说后果是正确的(两个 DataSource 的 SubTask,一共发送 12 条记录),建议您试试;
至此,《Flink 的 DataSource 三部曲》系列就全副实现了,好的开始是胜利的一半,在拿到数据后,前面还有很多知识点要学习和把握,接下来的文章会持续深刻 Flink 的微妙之旅;
欢送关注公众号:程序员欣宸
微信搜寻「程序员欣宸」,我是欣宸,期待与您一起畅游 Java 世界 …
https://github.com/zq2599/blog_demos
正文完