共计 21309 个字符,预计需要花费 54 分钟才能阅读完成。
Flink 最大的亮点是实时处理局部,Flink 认为批处理是流解决的非凡状况,能够通过一套引擎解决批量和流式数据,而 Flink 在将来也会重点投入更多的资源到批流交融中。我在 Flink DataStream API 编程指南中介绍了 DataStream API 的应用,在本文中将介绍 Flink 批处理计算的 DataSet API 的应用。通过本文你能够理解:
- DataSet 转换操作(Transformation)
- Source 与 Sink 的应用
- 播送变量的基本概念与应用 Demo
- 分布式缓存的概念及应用 Demo
- DataSet API 的 Transformation 应用 Demo 案例
WordCount 示例
在开始解说 DataSet API 之前,先看一个 Word Count 的简略示例,来直观感受一下 DataSet API 的编程模型,具体代码如下:
public class WordCount {public static void main(String[] args) throws Exception {
// 用于批处理的执行环境
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
// 数据源
DataSource<String> stringDataSource = env.fromElements("hello Flink What is Apache Flink");
// 转换
AggregateOperator<Tuple2<String, Integer>> wordCnt = stringDataSource
.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
@Override
public void flatMap(String value, Collector<Tuple2<String, Integer>> out) throws Exception {String[] split = value.split(" ");
for (String word : split) {out.collect(Tuple2.of(word, 1));
}
}
})
.groupBy(0)
.sum(1);
// 输入
wordCnt.print();}
}
从下面的示例中能够看出,根本的编程模型是:
- 获取批处理的执行环境 ExecutionEnvironment
- 加载数据源
- 转换操作
- 数据输入
上面会对数据源、转换操作、数据输入进行一一解读。
Data Source
DataSet API 反对从多种数据源中将批量数据集读到 Flink 零碎中,并转换成 DataSet 数据集。次要包含三种类型:别离是基于文件的、基于汇合的及通用类数据源。同时在 DataSet API 中能够自定义实现 InputFormat/RichInputFormat 接口,以接入不同数据格式类型的数据源,比方 CsvInputFormat、TextInputFormat 等。从 ExecutionEnvironment 类提供的办法中能够看出反对的数据源办法,如下图所示:
基于文件的数据源
readTextFile(path) / TextInputFormat
- 解释
读取文本文件,传递文件门路参数,并将文件内容转换成 DataSet<String> 类型数据集。
- 应用
// 读取本地文件
DataSet<String> localLines = env.readTextFile("file:///path/to/my/textfile");
// 读取 HDSF 文件
DataSet<String> hdfsLines = env.readTextFile("hdfs://nnHost:nnPort/path/to/my/textfile");
readTextFileWithValue(path)/ TextValueInputFormat
- 解释
读取文本文件内容,将文件内容转换成 DataSet[StringValue]类型数据集。该办法与 readTextFile(String)不同的是,其泛型是 StringValue,是一种可变的 String 类型,通过 StringValue 存储文本数据能够无效升高 String 对象创立数量,减小垃圾回收的压力。
- 应用
// 读取本地文件
DataSet<StringValue> localLines = env.readTextFileWithValue("file:///some/local/file");
// 读取 HDSF 文件
DataSet<StringValue> hdfsLines = env.readTextFileWithValue("hdfs://host:port/file/path");
readCsvFile(path)/ CsvInputFormat
- 解释
创立一个 CSV 的 reader,读取逗号分隔 (或其余分隔符) 的文件。能够间接转换成 Tuple 类型、POJOs 类的 DataSet。在办法中能够指定行切割符、列切割符、字段等信息。
- 应用
// read a CSV file with five fields, taking only two of them
// 读取一个具备 5 个字段的 CSV 文件,只取第一个和第四个字段
DataSet<Tuple2<String, Double>> csvInput = env.readCsvFile("hdfs:///the/CSV/file")
.includeFields("10010")
.types(String.class, Double.class);
// 读取一个有三个字段的 CSV 文件,将其转为 POJO 类型
DataSet<Person>> csvInput = env.readCsvFile("hdfs:///the/CSV/file")
.pojoType(Person.class, "name", "age", "zipcode");
readFileOfPrimitives(path, Class) / PrimitiveInputFormat
- 解释
读取一个原始数据类型 (如 String,Integer) 的文件, 返回一个对应的原始类型的 DataSet 汇合
- 应用
DataSet<String> Data = env.readFileOfPrimitives("file:///some/local/file", String.class);
基于汇合的数据源
fromCollection(Collection)
- 解释
从 java 的汇合中创立 DataSet 数据集,汇合中的元素数据类型雷同
- 应用
DataSet<String> data= env.fromCollection(arrayList);
fromElements(T …)
- 解释
从给定数据元素序列中创立 DataSet 数据集,且所有的数据对象类型必须统一
- 应用
DataSet<String> stringDataSource = env.fromElements("hello Flink What is Apache Flink");
generateSequence(from, to)
- 解释
指定 from 到 to 范畴区间,而后在区间外部生成数字序列数据集, 因为是并行处理的,所以最终的程序不能保障统一。
- 应用
DataSet<Long> longDataSource = env.generateSequence(1, 20);
通用类型数据源
DataSet API 中提供了 Inputformat 通用的数据接口,以接入不同数据源和格局类型的数据。InputFormat 接口次要分为两种类型:一种是基于文件类型,在 DataSet API 对应 readFile()办法;另外一种是基于通用数据类型的接口,例如读取 RDBMS 或 NoSQL 数据库中等,在 DataSet API 中对应 createInput()办法。
readFile(inputFormat, path) / FileInputFormat
- 解释
自定义文件类型输出源,将指定格式文件读取并转成 DataSet 数据集
- 应用
env.readFile(new MyInputFormat(), "file:///some/local/file");
createInput(inputFormat) / InputFormat
- 解释
自定义通用型数据源,将读取的数据转换为 DataSet 数据集。如以下实例应用 Flink 内置的 JDBCInputFormat,创立读取 mysql 数据源的 JDBCInput Format,实现从 mysql 中读取 Person 表,并转换成 DataSet [Row]数据集
- 应用
DataSet<Tuple2<String, Integer> dbData =
env.createInput(JDBCInputFormat.buildJDBCInputFormat()
.setDrivername("com.mysql.jdbc.Driver")
.setDBUrl("jdbc:mysql://localhost/mydb")
.setQuery("select name, age from stu")
.setRowTypeInfo(new RowTypeInfo(BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.INT_TYPE_INFO))
.finish());
Data Sink
Flink 在 DataSet API 中的数据输入共分为三种类型。第一种是基于文件实现,对应 DataSet 的 write()办法,实现将 DataSet 数据输入到文件系统中。第二种是基于通用存储介质实现,对应 DataSet 的 output()办法,例如应用 JDBCOutputFormat 将数据输入到关系型数据库中。最初一种是客户端输入,间接将 DataSet 数据从不同的节点收集到 Client,并在客户端中输入,例如 DataSet 的 print()办法。
规范的数据输入办法
// 文本数据
DataSet<String> textData = // [...]
// 将数据写入本地文件
textData.writeAsText("file:///my/result/on/localFS");
// 将数据写入 HDFS 文件
textData.writeAsText("hdfs://nnHost:nnPort/my/result/on/localFS");
// 写数据到本地文件,如果文件存在则笼罩
textData.writeAsText("file:///my/result/on/localFS", WriteMode.OVERWRITE);
// 将数据输入到本地的 CSV 文件,指定分隔符为 "|"
DataSet<Tuple3<String, Integer, Double>> values = // [...]
values.writeAsCsv("file:///path/to/the/result/file", "\n", "|");
// 应用自定义的 TextFormatter 对象
values.writeAsFormattedText("file:///path/to/the/result/file",
new TextFormatter<Tuple2<Integer, Integer>>() {public String format (Tuple2<Integer, Integer> value) {return value.f1 + "-" + value.f0;}
});
应用自定义的输入类型
DataSet<Tuple3<String, Integer, Double>> myResult = [...]
// 将 tuple 类型的数据写入关系型数据库
myResult.output(
// 创立并配置 OutputFormat
JDBCOutputFormat.buildJDBCOutputFormat()
.setDrivername("com.mysql.jdbc.Driver")
.setDBUrl("jdbc:mysql://localhost/mydb")
.setQuery("insert into persons (name, age, height) values (?,?,?)")
.finish());
DataSet 转换
转换 (transformations) 将一个 DataSet 转成另外一个 DataSet,Flink 提供了十分丰盛的转换操作符。具体应用如下:
Map
一进一出
DataSource<String> source = env.fromElements("I", "like", "flink");
source.map(new MapFunction<String, String>() {
@Override
// 将数据转为大写
public String map(String value) throws Exception {return value.toUpperCase();
}
}).print();
FlatMap
输出一个元素,产生 0 个、1 个或多个元素
stringDataSource
.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
@Override
public void flatMap(String value, Collector<Tuple2<String, Integer>> out) throws Exception {String[] split = value.split(" ");
for (String word : split) {out.collect(Tuple2.of(word, 1));
}
}
})
.groupBy(0)
.sum(1);
MapPartition
性能和 Map 函数类似,只是 MapPartition 操作是在 DataSet 中基于分区对数据进行解决,函数调用中会依照分区将数据通过 Iteator 的模式传入,每个分区中的元素数与并行度无关,并返回任意数量的后果值。
source.mapPartition(new MapPartitionFunction<String, Long>() {
@Override
public void mapPartition(Iterable<String> values, Collector<Long> out) throws Exception {
long c = 0;
for (String value : values) {c++;}
// 输入每个分区元素个数
out.collect(c);
}
}).print();
Filter
过滤数据,如果返回 true 则保留数据,如果返回 false 则过滤掉
DataSource<Long> source = env.fromElements(1L, 2L, 3L,4L,5L);
source.filter(new FilterFunction<Long>() {
@Override
public boolean filter(Long value) throws Exception {return value % 2 == 0;}
}).print();
Project
仅能用在 Tuple 类型的数据集,投影操作,选取 Tuple 数据的字段的子集
DataSource<Tuple3<Long, Integer, String>> source = env.fromElements(Tuple3.of(1L, 20, "tom"),
Tuple3.of(2L, 25, "jack"),
Tuple3.of(3L, 22, "bob"));
// 去第一个和第三个元素
source.project(0, 2).print();
Reduce
通过两两合并,将数据集中的元素合并成一个元素,能够在整个数据集上应用,也能够在分组之后的数据集上应用。
DataSource<Tuple2<String, Integer>> source = env.fromElements(Tuple2.of("Flink", 1),
Tuple2.of("Flink", 1),
Tuple2.of("Hadoop", 1),
Tuple2.of("Spark", 1),
Tuple2.of("Flink", 1));
source
.groupBy(0)
.reduce(new ReduceFunction<Tuple2<String, Integer>>() {
@Override
public Tuple2<String, Integer> reduce(Tuple2<String, Integer> value1, Tuple2<String, Integer> value2) throws Exception {return Tuple2.of(value1.f0, value1.f1 + value2.f1);
}
}).print();
ReduceGroup
将数据集中的元素合并成一个元素,能够在整个数据集上应用,也能够在分组之后的数据集上应用。reduce 函数的输出值是一个分组元素的 Iterable。
DataSource<Tuple2<String, Long>> source = env.fromElements(Tuple2.of("Flink", 1L),
Tuple2.of("Flink", 1L),
Tuple2.of("Hadoop", 1L),
Tuple2.of("Spark", 1L),
Tuple2.of("Flink", 1L));
source
.groupBy(0)
.reduceGroup(new GroupReduceFunction<Tuple2<String,Long>, Tuple2<String,Long>>() {
@Override
public void reduce(Iterable<Tuple2<String, Long>> values, Collector<Tuple2<String, Long>> out) throws Exception {
Long sum = 0L;
String word = "";
for(Tuple2<String, Long> value:values){
sum += value.f1;
word = value.f0;
}
out.collect(Tuple2.of(word,sum));
}
}).print();
Aggregate
通过 Aggregate Function 将一组元素值合并成单个值,能够在整个 DataSet 数据集上应用,也能够在分组之后的数据集上应用。仅仅用在 Tuple 类型的数据集上,次要包含 Sum,Min,Max 函数
DataSource<Tuple2<String, Long>> source = env.fromElements(Tuple2.of("Flink", 1L),
Tuple2.of("Flink", 1L),
Tuple2.of("Hadoop", 1L),
Tuple2.of("Spark", 1L),
Tuple2.of("Flink", 1L));
source
.groupBy(0)
.aggregate(SUM,1)// 按第 2 个值求和
.print();
Distinct
DataSet 数据集元素去重
DataSource<Tuple> source = env.fromElements(Tuple1.of("Flink"),Tuple1.of("Flink"),Tuple1.of("hadoop"));
source.distinct(0).print();// 依照 tuple 的第一个字段去重
// 后果:(Flink)
(hadoop)
Join
默认的 join 是产生一个 Tuple2 数据类型的 DataSet,关联的 key 能够通过 key 表达式、Key-selector 函数、字段地位以及 CaseClass 字段指定。对于两个 Tuple 类型的数据集能够通过字段地位进行关联,右边数据集的字段通过 where 办法指定,左边数据集的字段通过 equalTo()办法指定。比方:
DataSource<Tuple2<Integer,String>> source1 = env.fromElements(Tuple2.of(1,"jack"),
Tuple2.of(2,"tom"),
Tuple2.of(3,"Bob"));
DataSource<Tuple2<String, Integer>> source2 = env.fromElements(Tuple2.of("order1", 1),
Tuple2.of("order2", 2),
Tuple2.of("order3", 3));
source1.join(source2).where(0).equalTo(1).print();
能够在关联的过程中指定自定义 Join Funciton, Funciton 的入参为右边数据集中的数据元素和左边数据集的中的数据元素所组成的元祖,并返回一个通过计算解决后的数据。如:
// 用户 id,购买商品名称,购买商品数量
DataSource<Tuple3<Integer,String,Integer>> source1 = env.fromElements(Tuple3.of(1,"item1",2),
Tuple3.of(2,"item2",3),
Tuple3.of(3,"item3",4));
// 商品名称与商品单价
DataSource<Tuple2<String, Integer>> source2 = env.fromElements(Tuple2.of("item1", 10),
Tuple2.of("item2", 20),
Tuple2.of("item3", 15));
source1.join(source2)
.where(1)
.equalTo(0)
.with(new JoinFunction<Tuple3<Integer,String,Integer>, Tuple2<String,Integer>, Tuple3<Integer,String,Double>>() {
// 用户每种商品购物总金额
@Override
public Tuple3<Integer, String, Double> join(Tuple3<Integer, String, Integer> first, Tuple2<String, Integer> second) throws Exception {return Tuple3.of(first.f0,first.f1,first.f2 * second.f1.doubleValue());
}
}).print();
为了可能更好地疏导 Flink 底层去正确地解决数据集,能够在 DataSet 数据集关联中,通过 Size Hint 标记数据集的大小,Flink 能够依据用户给定的 hint(提醒)调整计算策略,例如能够应用 joinWithTiny 或 joinWithHuge 提醒第二个数据集的大小。示例如下:
DataSet<Tuple2<Integer, String>> input1 = // [...]
DataSet<Tuple2<Integer, String>> input2 = // [...]
DataSet<Tuple2<Tuple2<Integer, String>, Tuple2<Integer, String>>>
result1 =
// 提醒第二个数据集为小数据集
input1.joinWithTiny(input2)
.where(0)
.equalTo(0);
DataSet<Tuple2<Tuple2<Integer, String>, Tuple2<Integer, String>>>
result2 =
// h 提醒第二个数据集为大数据集
input1.joinWithHuge(input2)
.where(0)
.equalTo(0);
Flink 的 runtime 能够应用多种形式执行 join。在不同的状况下,每种可能的形式都会胜过其余形式。零碎会尝试主动抉择一种正当的办法,然而容许用户手动抉择一种策略,能够让 Flink 更加灵便且高效地执行 Join 操作。
DataSet<SomeType> input1 = // [...]
DataSet<AnotherType> input2 = // [...]
// 播送第一个输出并从中构建一个哈希表,第二个输出将对其进行探测,实用于第一个数据集十分小的场景
DataSet<Tuple2<SomeType, AnotherType> result =
input1.join(input2, JoinHint.BROADCAST_HASH_FIRST)
.where("id").equalTo("key");
// 播送第二个输出并从中构建一个哈希表,第一个输出将对其进行探测,实用于第二个数据集十分小的场景
DataSet<Tuple2<SomeType, AnotherType> result =
input1.join(input2, JoinHint.BROADCAST_HASH_SECOND)
.where("id").equalTo("key");
// 将两个数据集从新分区,并将第一个数据集转换成哈希表,实用于第一个数据集比第二个数据集小,但两个数据集都比拟大的场景
DataSet<Tuple2<SomeType, AnotherType> result =
input1.join(input2, JoinHint.REPARTITION_HASH_FIRST)
.where("id").equalTo("key");
// 将两个数据集从新分区,并将第二个数据集转换成哈希表,实用于第二个数据集比第一个数据集小,但两个数据集都比拟大的场景
DataSet<Tuple2<SomeType, AnotherType> result =
input1.join(input2, JoinHint.REPARTITION_HASH_SECOND)
.where("id").equalTo("key");
// 将两个数据集从新分区,并将每个分区排序,实用于两个数据集都曾经排好序的场景
DataSet<Tuple2<SomeType, AnotherType> result =
input1.join(input2, JoinHint.REPARTITION_SORT_MERGE)
.where("id").equalTo("key");
// 相当于不指定,有零碎自行处理
DataSet<Tuple2<SomeType, AnotherType> result =
input1.join(input2, JoinHint.OPTIMIZER_CHOOSES)
.where("id").equalTo("key");
OuterJoin
OuterJoin 对两个数据集进行外关联,蕴含 left、right、full outer join 三种关联形式,别离对应 DataSet API 中的 leftOuterJoin、rightOuterJoin 以及 fullOuterJoin 办法。留神外连贯仅实用于 Java 和 Scala DataSet API.
应用形式简直和 join 相似:
// 左外连贯
source1.leftOuterJoin(source2).where(1).equalTo(0);
// 右外链接
source1.rightOuterJoin(source2).where(1).equalTo(0);
此外,外连贯也提供了相应的关联算法提醒,能够跟据左右数据集的散布状况抉择适合的优化策略,晋升数据处理的效率。上面代码能够参考下面 join 的解释。
DataSet<SomeType> input1 = // [...]
DataSet<AnotherType> input2 = // [...]
DataSet<Tuple2<SomeType, AnotherType> result1 =
input1.leftOuterJoin(input2, JoinHint.REPARTITION_SORT_MERGE)
.where("id").equalTo("key");
DataSet<Tuple2<SomeType, AnotherType> result2 =
input1.rightOuterJoin(input2, JoinHint.BROADCAST_HASH_FIRST)
.where("id").equalTo("key");
对于外连贯的关联算法,与 join 有所不同。每种外连贯只反对局部算法。如下:
-
LeftOuterJoin 反对:
- OPTIMIZER_CHOOSES
- BROADCAST_HASH_SECOND
- REPARTITION_HASH_SECOND
- REPARTITION_SORT_MERGE
-
RightOuterJoin 反对:
- OPTIMIZER_CHOOSES
- BROADCAST_HASH_FIRST
- REPARTITION_HASH_FIRST
- REPARTITION_SORT_MERGE
-
FullOuterJoin 反对:
- OPTIMIZER_CHOOSES
- REPARTITION_SORT_MERGE
CoGroup
CoGroup 是对分组之后的 DataSet 进行 join 操作,将两个 DataSet 数据汇合并在一起,会先各自对每个 DataSet 依照 key 进行分组,而后将分组之后的 DataSet 传输到用户定义的 CoGroupFunction,将两个数据集依据雷同的 Key 记录组合在一起,雷同 Key 的记录会寄存在一个 Group 中,如果指定 key 仅在一个数据集中有记录,则 co-groupFunction 会将这个 Group 与空的 Group 关联。
// 用户 id,购买商品名称,购买商品数量
DataSource<Tuple3<Integer,String,Integer>> source1 = env.fromElements(Tuple3.of(1,"item1",2),
Tuple3.of(2,"item2",3),
Tuple3.of(3,"item2",4));
// 商品名称与商品单价
DataSource<Tuple2<String, Integer>> source2 = env.fromElements(Tuple2.of("item1", 10),
Tuple2.of("item2", 20),
Tuple2.of("item3", 15));
source1.coGroup(source2)
.where(1)
.equalTo(0)
.with(new CoGroupFunction<Tuple3<Integer,String,Integer>, Tuple2<String,Integer>, Tuple2<String,Double>>() {
// 每个 Iterable 存储的是分好组的数据,即雷同 key 的数据组织在一起
@Override
public void coGroup(Iterable<Tuple3<Integer, String, Integer>> first, Iterable<Tuple2<String, Integer>> second, Collector<Tuple2<String, Double>> out) throws Exception {
// 存储每种商品购买数量
int sum = 0;
for(Tuple3<Integer, String, Integer> val1:first){sum += val1.f2;}
// 每种商品数量 * 商品单价
for(Tuple2<String, Integer> val2:second){out.collect(Tuple2.of(val2.f0,sum * val2.f1.doubleValue()));
}
}
}).print();
Cross
将两个数据汇合并成一个数据集,返回被连贯的两个数据集所有数据行的笛卡儿积,返回的数据行数等于第一个数据集中合乎查问条件的数据行数乘以第二个数据集中合乎查问条件的数据行数。Cross 操作能够通过利用 Cross Funciton 将关联的数据汇合并成指标格局的数据集,如果不指定 Cross Funciton 则返回 Tuple2 类型的数据集。Cross 操作是计算密集型的算子,倡议在应用时加上算法提醒,比方crossWithTiny() and crossWithHuge().
//[id,x,y], 坐标值
DataSet<Tuple3<Integer, Integer, Integer>> coords1 = env.fromElements(Tuple3.of(1, 20, 18),
Tuple3.of(2, 15, 20),
Tuple3.of(3, 25, 10));
DataSet<Tuple3<Integer, Integer, Integer>> coords2 = env.fromElements(Tuple3.of(1, 20, 18),
Tuple3.of(2, 15, 20),
Tuple3.of(3, 25, 10));
// 求任意两点之间的欧氏间隔
coords1.cross(coords2)
.with(new CrossFunction<Tuple3<Integer, Integer, Integer>, Tuple3<Integer, Integer, Integer>, Tuple3<Integer, Integer, Double>>() {
@Override
public Tuple3<Integer, Integer, Double> cross(Tuple3<Integer, Integer, Integer> val1, Tuple3<Integer, Integer, Integer> val2) throws Exception {
// 计算欧式间隔
double dist = sqrt(pow(val1.f1 - val2.f1, 2) + pow(val1.f2 - val2.f2, 2));
// 返回两点之间的欧式间隔
return Tuple3.of(val1.f0,val2.f0,dist);
}
}).print();
Union
合并两个 DataSet 数据集,两个数据集的数据元素格局必须雷同,多个数据集能够间断合并.
DataSet<Tuple2<String, Integer>> vals1 = env.fromElements(Tuple2.of("jack",20),
Tuple2.of("Tom",21));
DataSet<Tuple2<String, Integer>> vals2 = env.fromElements(Tuple2.of("Robin",25),
Tuple2.of("Bob",30));
DataSet<Tuple2<String, Integer>> vals3 = env.fromElements(Tuple2.of("Jasper",24),
Tuple2.of("jarry",21));
DataSet<Tuple2<String, Integer>> unioned = vals1
.union(vals2)
.union(vals3);
unioned.print();
Rebalance
对数据集中的数据进行均匀散布,使得每个分区上的数据量雷同, 加重数据歪斜造成的影响,留神仅仅是 Map-like
类型的算子 (比方 map,flatMap) 才能够用在 Rebalance 算子之后。
DataSet<String> in = // [...]
// rebalance DataSet, 而后应用 map 算子.
DataSet<Tuple2<String, String>> out = in.rebalance()
.map(new Mapper());
Hash-Partition
依据给定的 Key 进行 Hash 分区,key 雷同的数据会被放入同一个分区内。能够应用通过元素的地位、元素的名称或者 key selector 函数指定 key。
DataSet<Tuple2<String, Integer>> in = // [...]
// 依据第一个值进行 hash 分区,而后应用 MapPartition 转换操作.
DataSet<Tuple2<String, String>> out = in.partitionByHash(0)
.mapPartition(new PartitionMapper());
Range-Partition
依据给定的 Key 进行 Range 分区,key 雷同的数据会被放入同一个分区内。能够应用通过元素的地位、元素的名称或者 key selector 函数指定 key。
DataSet<Tuple2<String, Integer>> in = // [...]
// 依据第一个值进行 Range 分区,而后应用 MapPartition 转换操作.
DataSet<Tuple2<String, String>> out = in.partitionByRange(0)
.mapPartition(new PartitionMapper());
Custom Partitioning
除了下面的分区外,还反对自定义分区函数。
DataSet<Tuple2<String,Integer>> in = // [...]
DataSet<Integer> result = in.partitionCustom(partitioner, key)
.mapPartition(new PartitionMapper());
Sort Partition
在本地对 DataSet 数据集中的所有分区依据指定字段进行重排序,排序形式通过 Order.ASCENDING 以及 Order.DESCENDING 关键字指定。反对指定多个字段进行分区排序,如下:
DataSet<Tuple2<String, Integer>> in = // [...]
// 依照第一个字段升序排列,第二个字段降序排列.
DataSet<Tuple2<String, String>> out = in.sortPartition(1, Order.ASCENDING)
.sortPartition(0, Order.DESCENDING)
.mapPartition(new PartitionMapper());
First-n
返回数据集的 n 条随机后果,能够利用于惯例类型数据集、Grouped 类型数据集以及排序数据集上。
DataSet<Tuple2<String, Integer>> in = // [...]
// 返回数据集中的任意 5 个元素
DataSet<Tuple2<String, Integer>> out1 = in.first(5);
// 返回每个分组内的任意两个元素
DataSet<Tuple2<String, Integer>> out2 = in.groupBy(0)
.first(2);
// 返回每个分组内的前三个元素
// 分组后的数据集依照第二个字段进行升序排序
DataSet<Tuple2<String, Integer>> out3 = in.groupBy(0)
.sortGroup(1, Order.ASCENDING)
.first(3);
MinBy / MaxBy
从数据集中返回指定字段或组合对应最小或最大的记录,如果抉择的字段具备多个雷同值,则在汇合中随机抉择一条记录返回。
DataSet<Tuple2<String, Integer>> source = env.fromElements(Tuple2.of("jack",20),
Tuple2.of("Tom",21),
Tuple2.of("Robin",25),
Tuple2.of("Bob",30));
// 依照第 2 个元素比拟,找出第二个元素为最小值的那个 tuple
// 在整个 DataSet 上应用 minBy
ReduceOperator<Tuple2<String, Integer>> tuple2Reduce = source.minBy(1);
tuple2Reduce.print();// 返回(jack,20)
// 也能够在分组的 DataSet 上应用 minBy
source.groupBy(0) // 依照第一个字段进行分组
.minBy(1) // 找出每个分组内的依照第二个元素为最小值的那个 tuple
.print();
播送变量
基本概念
播送变量是分布式计算框架中常常会用到的一种数据共享形式。其次要作用是将小数据集采纳网络传输的形式,在每台机器上保护一个只读的缓存变量,所在的计算节点实例均能够在本地内存中间接读取被播送的数据集,这样可能防止在数据计算过程中屡次通过近程的形式从其余节点中读取小数据集,从而晋升整体工作的计算性能。
播送变量能够了解为一个公共的共享变量,能够把 DataSet 播送进来,这样不同的 task 都能够读取该数据,播送的数据只会在每个节点上存一份。如果不应用播送变量,则会在每个节点上的 task 中都要复制一份 dataset 数据集,导致节约内存。
应用播送变量的根本步骤如下:
// 第一步创立须要播送的数据集
DataSet<Integer> toBroadcast = env.fromElements(1, 2, 3);
DataSet<String> data = env.fromElements("a", "b");
data.map(new RichMapFunction<String, String>() {
@Override
public void open(Configuration parameters) throws Exception {
// 第三步拜访汇合模式的播送变量数据集
Collection<Integer> broadcastSet = getRuntimeContext().getBroadcastVariable("broadcastSetName");
}
@Override
public String map(String value) throws Exception {...}
}).withBroadcastSet(toBroadcast, "broadcastSetName"); // 第二步播送数据集
从下面的代码能够看出,DataSet API 反对在 RichFunction 接口中通过 RuntimeContext 读取到播送变量。
首先在 RichFunction 中实现 Open()办法,而后调用 getRuntimeContext()办法获取利用的 RuntimeContext,接着调用 getBroadcastVariable()办法通过播送名称获取播送变量。同时 Flink 间接通过 collect 操作将数据集转换为本地 Collection。须要留神的是,Collection 对象的数据类型必须和定义的数据集的类型保持一致,否则会呈现类型转换问题。
注意事项:
- 因为播送变量的内容是保留在每个节点的内存中,所以播送变量数据集不易过大。
- 播送变量初始化之后,不反对批改,这样方能保障每个节点的数据都是一样的。
- 如果多个算子都要应用一份数据集,那么须要在多个算子的前面别离注册播送变量。
- 只能在批处理中应用播送变量。
应用 Demo
public class BroadcastExample {public static void main(String[] args) throws Exception {ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
ArrayList<Tuple2<Integer,String>> RawBroadCastData = new ArrayList<>();
RawBroadCastData.add(new Tuple2<>(1,"jack"));
RawBroadCastData.add(new Tuple2<>(2,"tom"));
RawBroadCastData.add(new Tuple2<>(3,"Bob"));
// 模仿数据源,[userId,userName]
DataSource<Tuple2<Integer, String>> userInfoBroadCastData = env.fromCollection(RawBroadCastData);
ArrayList<Tuple2<Integer,Double>> rawUserAount = new ArrayList<>();
rawUserAount.add(new Tuple2<>(1,1000.00));
rawUserAount.add(new Tuple2<>(2,500.20));
rawUserAount.add(new Tuple2<>(3,800.50));
// 解决数据:用户 id,用户购买金额,[UserId,amount]
DataSet<Tuple2<Integer, Double>> userAmount = env.fromCollection(rawUserAount);
// 转换为 map 汇合类型的 DataSet
DataSet<HashMap<Integer, String>> userInfoBroadCast = userInfoBroadCastData.map(new MapFunction<Tuple2<Integer, String>, HashMap<Integer, String>>() {
@Override
public HashMap<Integer, String> map(Tuple2<Integer, String> value) throws Exception {HashMap<Integer, String> userInfo = new HashMap<>();
userInfo.put(value.f0, value.f1);
return userInfo;
}
});
DataSet<String> result = userAmount.map(new RichMapFunction<Tuple2<Integer, Double>, String>() {
// 寄存播送变量返回的 list 汇合数据
List<HashMap<String, String>> broadCastList = new ArrayList<>();
// 寄存播送变量的值
HashMap<String, String> allMap = new HashMap<>();
@Override
public void open(Configuration parameters) throws Exception {super.open(parameters);
// 获取播送数据, 返回的是一个 list 汇合
this.broadCastList = getRuntimeContext().getBroadcastVariable("userInfo");
for (HashMap<String, String> value : broadCastList) {allMap.putAll(value);
}
}
@Override
public String map(Tuple2<Integer, Double> value) throws Exception {String userName = allMap.get(value.f0);
return "用户 id:" + value.f0 + "|"+ "用户名:" + userName + "|" + "购买金额:" + value.f1;
}
}).withBroadcastSet(userInfoBroadCast, "userInfo");
result.print();}
}
分布式缓存
基本概念
Flink 提供了一个分布式缓存(distributed cache), 相似于 Hadoop,以使文件在本地可被用户函数的并行实例拜访。分布式缓存的工作机制是为程序注册一个文件或目录(本地或者近程文件系统,如 HDFS 等),通过 ExecutionEnvironment 注册一个缓存文件,并起一个别名。当程序执行的时候,Flink 会主动把注册的文件或目录复制到所有 TaskManager 节点的本地文件系统,用户能够通过注册是起的别名来查找文件或目录,而后在 TaskManager 节点的本地文件系统拜访该文件。
分布式缓存的应用步骤:
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
// 注册一个 HDFS 文件
env.registerCachedFile("hdfs:///path/to/your/file", "hdfsFile")
// 注册一个本地文件
env.registerCachedFile("file:///path/to/exec/file", "localExecFile", true)
// 拜访数据
getRuntimeContext().getDistributedCache().getFile("hdfsFile");
获取缓存文件的形式和播送变量类似,也是实现 RichFunction 接口,并通过 RichFunction 接口取得 RuntimeContext 对象,而后通过 RuntimeContext 提供的接口获取对应的本地缓存文件。
应用 Demo
public class DistributeCacheExample {public static void main(String[] args) throws Exception {
// 获取运行环境
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
/**
* 注册一个本地文件
* 文件内容为:* 1,"jack"
* 2,"tom"
* 3,"Bob"
*/
env.registerCachedFile("file:///E://userinfo.txt", "localFileUserInfo", true);
ArrayList<Tuple2<Integer,Double>> rawUserAount = new ArrayList<>();
rawUserAount.add(new Tuple2<>(1,1000.00));
rawUserAount.add(new Tuple2<>(2,500.20));
rawUserAount.add(new Tuple2<>(3,800.50));
// 解决数据:用户 id,用户购买金额,[UserId,amount]
DataSet<Tuple2<Integer, Double>> userAmount = env.fromCollection(rawUserAount);
DataSet<String> result= userAmount.map(new RichMapFunction<Tuple2<Integer, Double>, String>() {
// 保留缓存数据
HashMap<String, String> allMap = new HashMap<String, String>();
@Override
public void open(Configuration parameters) throws Exception {super.open(parameters);
// 获取分布式缓存的数据
File userInfoFile = getRuntimeContext().getDistributedCache().getFile("localFileUserInfo");
List<String> userInfo = FileUtils.readLines(userInfoFile);
for (String value : userInfo) {String[] split = value.split(",");
allMap.put(split[0], split[1]);
}
}
@Override
public String map(Tuple2<Integer, Double> value) throws Exception {String userName = allMap.get(value.f0);
return "用户 id:" + value.f0 + "|" + "用户名:" + userName + "|" + "购买金额:" + value.f1;
}
});
result.print();}
}
小结
本文次要解说了 Flink DataSet API 的根本应用。首先介绍了一个 DataSet API 的 WordCount 案例,接着介绍了 DataSet API 的数据源与 Sink 操作,以及根本的应用。而后对每一个转换操作进行了具体的解释,并给出了具体的应用案例。最初解说了播送变量和分布式缓存的概念,并就如何应用这两种高级性能,提供了残缺的 Demo 案例。
关注我的公众号:大数据技术与数仓,收费支付百 G 大数据视频材料与书籍