Flink 为了能够处理有边界的数据集和无边界的数据集,提供了对应的 DataSet API 和 DataStream API。我们可以开发对应的 Java 程序或者 Scala 程序来实现相应的功能。下面列举了一些 DataSet API 中的基本算子。
下面我们通过具体的代码来为大家演示每个算子的作用。
1、Map、FlatMap 与 MapPartition
// 获取运行环境
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
ArrayList<String> data = new ArrayList<String>();
data.add("I love Beijing");
data.add("I love China");
data.add("Beijing is the capital of China");
DataSource<String> text = env.fromCollection(data);
DataSet<List<String>> mapData = text.map(new MapFunction<String, List<String>>() {public List<String> map(String data) throws Exception {String[] words = data.split(" ");
// 创立一个 List
List<String> result = new ArrayList<String>();
for(String w:words){result.add(w);
}
return result;
}
});
mapData.print();
System.out.println("*****************************************");
DataSet<String> flatMapData = text.flatMap(new FlatMapFunction<String, String>() {public void flatMap(String data, Collector<String> collection) throws Exception {String[] words = data.split(" ");
for(String w:words){collection.collect(w);
}
}
});
flatMapData.print();
System.out.println("*****************************************");
/* new MapPartitionFunction<String, String>
第一个 String:示意分区中的数据元素类型
第二个 String:示意解决后的数据元素类型 */
DataSet<String> mapPartitionData = text.mapPartition(new MapPartitionFunction<String, String>() {public void mapPartition(Iterable<String> values, Collector<String> out) throws Exception {
// 针对分区进行操作的益处是:比方要进行数据库的操作,一个分区只须要创立一个 Connection
//values 中保留了一个分区的数据
Iterator<String> it = values.iterator();
while (it.hasNext()) {String next = it.next();
String[] split = next.split(" ");
for (String word : split) {out.collect(word);
}
}
// 敞开链接
}
});
mapPartitionData.print();
2、Filter 与 Distinct
// 获取运行环境
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
ArrayList<String> data = new ArrayList<String>();
data.add("I love Beijing");
data.add("I love China");
data.add("Beijing is the capital of China");
DataSource<String> text = env.fromCollection(data);
DataSet<String> flatMapData = text.flatMap(new FlatMapFunction<String, String>() {public void flatMap(String data, Collector<String> collection) throws Exception {String[] words = data.split(" ");
for(String w:words){collection.collect(w);
}
}
});
// 去掉反复的单词
flatMapData.distinct().print();
System.out.println("*********************");
// 选出长度大于 3 的单词
flatMapData.filter(new FilterFunction<String>() {public boolean filter(String word) throws Exception {int length = word.length();
return length>3?true:false;
}
}).print();
3、Join 操作
// 获取运行的环境
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
// 创立第一张表:用户 ID 姓名
ArrayList<Tuple2<Integer, String>> data1 = new ArrayList<Tuple2<Integer,String>>();
data1.add(new Tuple2(1,"Tom"));
data1.add(new Tuple2(2,"Mike"));
data1.add(new Tuple2(3,"Mary"));
data1.add(new Tuple2(4,"Jone"));
// 创立第二张表:用户 ID 所在的城市
ArrayList<Tuple2<Integer, String>> data2 = new ArrayList<Tuple2<Integer,String>>();
data2.add(new Tuple2(1,"北京"));
data2.add(new Tuple2(2,"上海"));
data2.add(new Tuple2(3,"广州"));
data2.add(new Tuple2(4,"重庆"));
// 实现 join 的多表查问:用户 ID 姓名 所在的程序
DataSet<Tuple2<Integer, String>> table1 = env.fromCollection(data1);
DataSet<Tuple2<Integer, String>> table2 = env.fromCollection(data2);
table1.join(table2).where(0).equalTo(0)
/* 第一个 Tuple2<Integer,String>:示意第一张表
* 第二个 Tuple2<Integer,String>:示意第二张表
* Tuple3<Integer,String, String>:多表 join 连贯查问后的返回后果 */
.with(new JoinFunction<Tuple2<Integer,String>, Tuple2<Integer,String>, Tuple3<Integer,String, String>>() {
public Tuple3<Integer, String, String> join(Tuple2<Integer, String> table1,
Tuple2<Integer, String> table2) throws Exception {return new Tuple3<Integer, String, String>(table1.f0,table1.f1,table2.f1);
} }).print();
4、笛卡尔积
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
// 创立第一张表:用户 ID 姓名
ArrayList<Tuple2<Integer, String>> data1 = new ArrayList<Tuple2<Integer,String>>();
data1.add(new Tuple2(1,"Tom"));
data1.add(new Tuple2(2,"Mike"));
data1.add(new Tuple2(3,"Mary"));
data1.add(new Tuple2(4,"Jone"));
// 创立第二张表:用户 ID 所在的城市
ArrayList<Tuple2<Integer, String>> data2 = new ArrayList<Tuple2<Integer,String>>();
data2.add(new Tuple2(1,"北京"));
data2.add(new Tuple2(2,"上海"));
data2.add(new Tuple2(3,"广州"));
data2.add(new Tuple2(4,"重庆"));
// 实现 join 的多表查问:用户 ID 姓名 所在的程序
DataSet<Tuple2<Integer, String>> table1 = env.fromCollection(data1);
DataSet<Tuple2<Integer, String>> table2 = env.fromCollection(data2);
// 生成笛卡尔积
table1.cross(table2).print();
5、First-N
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
// 这里的数据是:员工姓名、薪水、部门号
DataSet<Tuple3<String, Integer,Integer>> grade =
env.fromElements(new Tuple3<String, Integer,Integer>("Tom",1000,10),
new Tuple3<String, Integer,Integer>("Mary",1500,20),
new Tuple3<String, Integer,Integer>("Mike",1200,30),
new Tuple3<String, Integer,Integer>("Jerry",2000,10));
// 依照插入程序取前三条记录
grade.first(3).print();
System.out.println("**********************");
// 先依照部门号排序,在依照薪水排序
grade.sortPartition(2, Order.ASCENDING).sortPartition(1, Order.ASCENDING).print();
System.out.println("**********************");
// 依照部门号分组,求每组的第一条记录
grade.groupBy(2).first(1).print();
6、外连接操作
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
// 创立第一张表:用户 ID 姓名
ArrayList<Tuple2<Integer, String>> data1 = new ArrayList<Tuple2<Integer,String>>();
data1.add(new Tuple2(1,"Tom"));
data1.add(new Tuple2(3,"Mary"));
data1.add(new Tuple2(4,"Jone"));
// 创立第二张表:用户 ID 所在的城市
ArrayList<Tuple2<Integer, String>> data2 = new ArrayList<Tuple2<Integer,String>>();
data2.add(new Tuple2(1,"北京"));
data2.add(new Tuple2(2,"上海"));
data2.add(new Tuple2(4,"重庆"));
// 实现 join 的多表查问:用户 ID 姓名 所在的程序
DataSet<Tuple2<Integer, String>> table1 = env.fromCollection(data1);
DataSet<Tuple2<Integer, String>> table2 = env.fromCollection(data2);
// 左外连贯
table1.leftOuterJoin(table2).where(0).equalTo(0)
.with(new JoinFunction<Tuple2<Integer,String>, Tuple2<Integer,String>, Tuple3<Integer,String,String>>() {
public Tuple3<Integer, String, String> join(Tuple2<Integer, String> table1,
Tuple2<Integer, String> table2) throws Exception {
// 左外连贯示意等号右边的信息会被蕴含
if(table2 == null){return new Tuple3<Integer, String, String>(table1.f0,table1.f1,null);
}else{return new Tuple3<Integer, String, String>(table1.f0,table1.f1,table2.f1);
}
}
}).print();
System.out.println("***********************************");
// 右外连贯
table1.rightOuterJoin(table2).where(0).equalTo(0)
.with(new JoinFunction<Tuple2<Integer,String>, Tuple2<Integer,String>, Tuple3<Integer,String,String>>() {
public Tuple3<Integer, String, String> join(Tuple2<Integer, String> table1,
Tuple2<Integer, String> table2) throws Exception {
// 右外链接示意等号左边的表的信息会被蕴含
if(table1 == null){return new Tuple3<Integer, String, String>(table2.f0,null,table2.f1);
}else{return new Tuple3<Integer, String, String>(table2.f0,table1.f1,table2.f1);
}
}
}).print();
System.out.println("***********************************");
// 全外连贯
table1.fullOuterJoin(table2).where(0).equalTo(0)
.with(new JoinFunction<Tuple2<Integer,String>, Tuple2<Integer,String>, Tuple3<Integer,String,String>>() {public Tuple3<Integer, String, String> join(Tuple2<Integer, String> table1, Tuple2<Integer, String> table2)
throws Exception {if(table1 == null){return new Tuple3<Integer, String, String>(table2.f0,null,table2.f1);
}else if(table2 == null){return new Tuple3<Integer, String, String>(table1.f0,table1.f1,null);
}else{return new Tuple3<Integer, String, String>(table1.f0,table1.f1,table2.f1);
}
}
}).print();