欢送拜访我的GitHub
https://github.com/zq2599/blog_demos
内容:所有原创文章分类汇总及配套源码,波及Java、Docker、Kubernetes、DevOPS等;
本文是《Flink的DataSource三部曲》系列的第一篇,该系列旨在通过实战学习和理解Flink的DataSource,为当前的深刻学习打好根底,由以下三局部组成:
- 间接API:即本篇,除了筹备环境和工程,还学习了StreamExecutionEnvironment提供的用来创立数据来的API;
- 内置connector:StreamExecutionEnvironment的addSource办法,入参能够是flink内置的connector,例如kafka、RabbitMQ等;
- 自定义:StreamExecutionEnvironment的addSource办法,入参能够是自定义的SourceFunction实现类;
Flink的DataSource三部曲文章链接
- 《Flink的DataSource三部曲之一:间接API》
- 《Flink的DataSource三部曲之二:内置connector》
- 《Flink的DataSource三部曲之三:自定义》
对于Flink的DataSource
官网对DataSource的解释:Sources are where your program reads its input from,即DataSource是利用的数据起源,如下图的两个红框所示:
DataSource类型
对于常见的文本读入、kafka、RabbitMQ等数据起源,能够间接应用Flink提供的API或者connector,如果这些满足不了需要,还能够本人开发,下图是我依照本人的了解梳理的:
环境和版本
熟练掌握内置DataSource的最好方法就是实战,本次实战的环境和版本如下:
- JDK:1.8.0_211
- Flink:1.9.2
- Maven:3.6.0
- 操作系统:macOS Catalina 10.15.3 (MacBook Pro 13-inch, 2018)
- IDEA:2018.3.5 (Ultimate Edition)
源码下载
如果您不想写代码,整个系列的源码可在GitHub下载到,地址和链接信息如下表所示(https://github.com/zq2599/blo…:
名称 | 链接 | 备注 |
---|---|---|
我的项目主页 | https://github.com/zq2599/blo… | 该我的项目在GitHub上的主页 |
git仓库地址(https) | https://github.com/zq2599/blo… | 该我的项目源码的仓库地址,https协定 |
git仓库地址(ssh) | git@github.com:zq2599/blog_demos.git | 该我的项目源码的仓库地址,ssh协定 |
这个git我的项目中有多个文件夹,本章的利用在<font color=”blue”>flinkdatasourcedemo</font>文件夹下,如下图红框所示:
环境和版本
本次实战的环境和版本如下:
- JDK:1.8.0_211
- Flink:1.9.2
- Maven:3.6.0
- 操作系统:macOS Catalina 10.15.3 (MacBook Pro 13-inch, 2018)
- IDEA:2018.3.5 (Ultimate Edition)
创立工程
- 在控制台执行以下命令就会进入创立flink利用的交互模式,按提醒输出gourpId和artifactId,就会创立一个flink利用(我输出的groupId是<font color=”blue”>com.bolingcavalry</font>,artifactId是<font color=”blue”>flinkdatasourcedemo</font>):
mvn \
archetype:generate \
-DarchetypeGroupId=org.apache.flink \
-DarchetypeArtifactId=flink-quickstart-java \
-DarchetypeVersion=1.9.2
- 当初maven工程已生成,用IDEA导入这个工程,如下图:
- 以maven的类型导入:
- 导入胜利的样子:
- 我的项目创立胜利,能够开始写代码实战了;
辅助类Splitter
实战中有个性能罕用到:将字符串用空格宰割,转成Tuple2类型的汇合,这里将此算子做成一个公共类Splitter.java,代码如下:
package com.bolingcavalry;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;
import org.apache.flink.util.StringUtils;
public class Splitter implements FlatMapFunction<String, Tuple2<String, Integer>> {
@Override
public void flatMap(String s, Collector<Tuple2<String, Integer>> collector) throws Exception {
if(StringUtils.isNullOrWhitespaceOnly(s)) {
System.out.println("invalid line");
return;
}
for(String word : s.split(" ")) {
collector.collect(new Tuple2<String, Integer>(word, 1));
}
}
}
筹备结束,能够开始实战了,先从最简略的Socket开始。
Socket DataSource
Socket DataSource的性能是监听指定IP的指定端口,读取网络数据;
- 在方才新建的工程中创立一个类Socket.java:
package com.bolingcavalry.api;
import com.bolingcavalry.Splitter;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;
public class Socket {
public static void main(String[] args) throws Exception {
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
//监听本地9999端口,读取字符串
DataStream<String> socketDataStream = env.socketTextStream("localhost", 9999);
//每五秒钟一次,将以后五秒内所有字符串以空格宰割,而后统计单词数量,打印进去
socketDataStream
.flatMap(new Splitter())
.keyBy(0)
.timeWindow(Time.seconds(5))
.sum(1)
.print();
env.execute("API DataSource demo : socket");
}
}
从上述代码可见,StreamExecutionEnvironment.socketTextStream就能够创立Socket类型的DataSource,在控制台执行命令<font color=”blue”>nc -lk 9999</font>,即可进入交互模式,此时输入任何字符串再回车,都会将字符串传输到本机9999端口;
- 在IDEA上运行Socket类,启动胜利后再回到方才执行<font color=”blue”>nc -lk 9999</font>的控制台,输出一些字符串再回车,可见Socket的性能曾经失效:
汇合DataSource(generateSequence)
- 基于汇合的DataSource,API如下图所示:
- 先试试最简略的generateSequence,创立指定范畴内的数字型的DataSource:
package com.bolingcavalry.api;
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
public class GenerateSequence {
public static void main(String[] args) throws Exception {
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
//并行度为1
env.setParallelism(1);
//通过generateSequence失去Long类型的DataSource
DataStream<Long> dataStream = env.generateSequence(1, 10);
//做一次过滤,只保留偶数,而后打印
dataStream.filter(new FilterFunction<Long>() {
@Override
public boolean filter(Long aLong) throws Exception {
return 0L==aLong.longValue()%2L;
}
}).print();
env.execute("API DataSource demo : collection");
}
}
- 运行时会打印偶数:
汇合DataSource(fromElements+fromCollection)
- fromElements和fromCollection就在一个类中试了吧,创立<font color=”blue”>FromCollection</font>类,外面是这两个API的用法:
package com.bolingcavalry.api;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import java.util.ArrayList;
import java.util.List;
public class FromCollection {
public static void main(String[] args) throws Exception {
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
//并行度为1
env.setParallelism(1);
//创立一个List,外面有两个Tuple2元素
List<Tuple2<String, Integer>> list = new ArrayList<>();
list.add(new Tuple2("aaa", 1));
list.add(new Tuple2("bbb", 1));
//通过List创立DataStream
DataStream<Tuple2<String, Integer>> fromCollectionDataStream = env.fromCollection(list);
//通过多个Tuple2元素创立DataStream
DataStream<Tuple2<String, Integer>> fromElementDataStream = env.fromElements(
new Tuple2("ccc", 1),
new Tuple2("ddd", 1),
new Tuple2("aaa", 1)
);
//通过union将两个DataStream合成一个
DataStream<Tuple2<String, Integer>> unionDataStream = fromCollectionDataStream.union(fromElementDataStream);
//统计每个单词的数量
unionDataStream
.keyBy(0)
.sum(1)
.print();
env.execute("API DataSource demo : collection");
}
}
- 运行后果如下:
文件DataSource
- 上面的ReadTextFile类会读取绝对路径的文本文件,并对内容做单词统计:
package com.bolingcavalry.api;
import com.bolingcavalry.Splitter;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
public class ReadTextFile {
public static void main(String[] args) throws Exception {
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
//设置并行度为1
env.setParallelism(1);
//用txt文件作为数据源
DataStream<String> textDataStream = env.readTextFile("file:///Users/zhaoqin/temp/202003/14/README.txt", "UTF-8");
//统计单词数量并打印进去
textDataStream
.flatMap(new Splitter())
.keyBy(0)
.sum(1)
.print();
env.execute("API DataSource demo : readTextFile");
}
}
- 请确保代码中的绝对路径下存在名为README.txt文件,运行后果如下:
- 关上StreamExecutionEnvironment.java源码,看一下方才应用的readTextFile办法实现如下,原来是调用了另一个同名办法,该办法的第三个参数确定了文本文件是一次性读取结束,还是周期性扫描内容变更,而第四个参数就是周期性扫描的间隔时间:
public DataStreamSource<String> readTextFile(String filePath, String charsetName) {
Preconditions.checkArgument(!StringUtils.isNullOrWhitespaceOnly(filePath), "The file path must not be null or blank.");
TextInputFormat format = new TextInputFormat(new Path(filePath));
format.setFilesFilter(FilePathFilter.createDefaultFilter());
TypeInformation<String> typeInfo = BasicTypeInfo.STRING_TYPE_INFO;
format.setCharsetName(charsetName);
return readFile(format, filePath, FileProcessingMode.PROCESS_ONCE, -1, typeInfo);
}
- 下面的FileProcessingMode是个枚举,源码如下:
@PublicEvolving
public enum FileProcessingMode {
/** Processes the current contents of the path and exits. */
PROCESS_ONCE,
/** Periodically scans the path for new data. */
PROCESS_CONTINUOUSLY
}
- 另外请关注<font color=”blue”>readTextFile</font>办法的<font color=”red”>filePath</font>参数,这是个URI类型的字符串,除了本地文件门路,还能够是HDFS的地址:<font color=”blue”>hdfs://host:port/file/path</font>
至此,通过间接API创立DataSource的实战就实现了,前面的章节咱们持续学习内置connector形式的DataSource;
欢送关注公众号:程序员欣宸
微信搜寻「程序员欣宸」,我是欣宸,期待与您一起畅游Java世界…
https://github.com/zq2599/blog_demos
发表回复