如果你会任意一门语言的stream流,没道理不会大数据开发。
俗话说男追女隔座山,女追男隔层纱。
如果说零根底学大数据,感觉后面是一座山,那么只有你会java或者任意一门语言的stream流,那大数据就只隔了一层纱。
本文以java stream流计算为例,解说一些根底的spark操作。另一个风行的大数据框架flink同理。
筹备工作
测试数据,以下列别离示意姓名,年龄,部门,职位。
复制代码张三,20,研发部,普通员工
李四,31,研发部,普通员工
李丽,36,财务部,普通员工
张伟,38,研发部,经理
杜航,25,人事部,普通员工
周歌,28,研发部,普通员工
创立一个Employee类。
less复制代码 @Getter
@Setter@AllArgsConstructor@NoArgsConstructor@ToStringstaticclass Employee implements Serializable { private String name; private Integer age; private String department; private String level;}
}
版本:
jdk:1.8
spark:3.2.0
scala:2.12.15
下面的scala版本只是spark框架自身须要依赖到scala。
因为scala的确是比拟小众的语言,本文还是应用java演示spark代码。
1.map类
1.1 java stream map
map示意一对一操作。将上游数据的一行数据进行任意操作,最终失去操作后的一条数据。
这种思维,在java和spark,flink都是统一的。
咱们先用java stream演示读取文件,再应用map操作将每行数据映射为Employee对象。
ini复制代码List<String> list = FileUtils.readLines(new File("f:/test.txt"), "utf-8");
List<Employee> employeeList = list.stream().map(word -> { List<String> words = Arrays.stream(word.split(",")).collect(Collectors.toList()); Employee employee = new Employee(words.get(0), Integer.parseInt(words.get(1)), words.get(2), words.get(3)); return employee; }).collect(Collectors.toList()); employeeList.forEach(System.out::println);
转换后的数据:
ini复制代码JavaStreamDemo.Employee(name=张三, age=20, department=研发部, level=普通员工)
JavaStreamDemo.Employee(name=李四, age=31, department=研发部, level=普通员工)
JavaStreamDemo.Employee(name=李丽, age=36, department=财务部, level=普通员工)
JavaStreamDemo.Employee(name=张伟, age=38, department=研发部, level=经理)
JavaStreamDemo.Employee(name=杜航, age=25, department=人事部, level=普通员工)
JavaStreamDemo.Employee(name=周歌, age=28, department=研发部, level=普通员工)
1.2 spark map
首先失去一个SparkSession对象,读取文件,失去一个DataSet弹性数据集对象。
ini复制代码SparkSession session = SparkSession.builder().master("local[*]").getOrCreate();
Dataset<Row> reader = session.read().text("F:/test.txt");
reader.show();
这里的show()就是打印输出以后数据集,它是一个action类的算子。
失去后果:
diff复制代码+-----------------------+
value |
---|
张三,20,研发部,普通员工 |
李四,31,研发部,普通员工 |
李丽,36,财务部,普通员工 |
张伟,38,研发部,经理 |
杜航,25,人事部,普通员工 |
周歌,28,研发部,普通员工 |
当初咱们拿到了根底数据,咱们应用map一对一操作,将一行行数据转换为Employee对象。
咱们这里不应用lamda表达式,让大家看得更加清晰。
这里实现了MapFunction接口里的call办法,每次拿到一行数据,咱们这里进行切分,再转换为对象。
须要特地指出的一点是,与后端WEB利用有一个对立异样解决不同的是,大数据利用,特地是流式计算,要保障7*24在线,须要对每个算子进行异样捕捉。
因为你不晓得上游数据荡涤到底怎么样,很可能拿到一条脏数据,解决的时候抛出异样,如果没有捕捉解决,那么整个利用就会挂掉。
spark的算子分为Transformation和Action两种类型。Transformation会开成一个DAG图,具备lazy提早性,它只会从一个dataset(rdd/df)转换成另一个dataset(rdd/df),只有当遇到action类的算子才会真正执行。
咱们明天会演示的算子都是Transformation类的算子。
典型的Action算子包含show,collect,save之类的。比方在本地进行show查看后果,或者实现运行后save到数据库,或者HDFS。
spark执行时分为driver和executor。但不是本文的重点,不会开展讲。
只须要留神driver端会将代码散发到各个分布式系统的节点executor上,它自身不会参加计算。一般来说,算子内部,如以下示例代码的a处会在driver端执行,b处算子外部会不同服务器上的executor端执行。
所以在算子内部定义的变量,在算子外部应用的时候要特地留神!! 不要想当然地认为都是一个main办法里写的代码,就肯定会在同一个JVM里。
这里波及到序列化的问题,同时它们分处不同的JVM,应用"=="比拟的时候也可能会出问题!!
这是一个后端WEB开发转向大数据开发时,这个思维肯定要转变过去。
简言之,后端WEB服务的分布式是咱们本人实现的,大数据的分布式是框架天生帮咱们实现的。
1.2.1 MapFunction
scss复制代码// a 算子内部,driver端
Dataset<Employee> employeeDataset = reader.map(new MapFunction<Row, Employee>() {
@Override public Employee call(Row row) throws Exception { // b 算子外部,executor端 Employee employee = null; try { // gson.fromJson(); 这里应用gson波及到序列化问题 List<String> list = Arrays.stream(row.mkString().split(",")).collect(Collectors.toList()); employee = new Employee(list.get(0), Integer.parseInt(list.get(1)), list.get(2), list.get(3)); } catch (Exception exception) { // 日志记录 // 流式计算中要做到7*24小时不间断,任意一条上流脏数据都可能导致失败,从而导致工作退出,所以这里要做好异样的抓取 exception.printStackTrace(); } return employee; } }, Encoders.bean(Employee.class)); employeeDataset.show();
输入
diff复制代码+---+----------+--------+----+
age | department | level | name |
---|---|---|---|
20 | 研发部 | 普通员工 | 张三 |
31 | 研发部 | 普通员工 | 李四 |
36 | 财务部 | 普通员工 | 李丽 |
38 | 研发部 | 经理 | 张伟 |
25 | 人事部 | 普通员工 | 杜航 |
28 | 研发部 | 普通员工 | 周歌 |
1.2.2 MapPartitionsFunction
spark中 map和mapPartitions有啥区别?
map是1条1条解决数据
mapPartitions是一个分区一个分区解决数据
后者肯定比前者效率高吗?
不肯定,看具体情况。
这里应用后面 map 一样的逻辑解决。能够看到在call办法里失去的是一个Iterator迭代器,是一批数据。
失去一批数据,而后再一对一映射为对象,再以Iterator的模式返回这批数据。
vbnet复制代码Dataset<Employee> employeeDataset2 = reader.mapPartitions(new MapPartitionsFunction<Row, Employee>() {
@Override public Iterator<Employee> call(Iterator<Row> iterator) throws Exception { List<Employee> employeeList = new ArrayList<>(); while (iterator.hasNext()){ Row row = iterator.next(); try { List<String> list = Arrays.stream(row.mkString().split(",")).collect(Collectors.toList()); Employee employee = new Employee(list.get(0), Integer.parseInt(list.get(1)), list.get(2), list.get(3)); employeeList.add(employee); } catch (Exception exception) { // 日志记录 // 流式计算中要做到7*24小时不间断,任意一条上流脏数据都可能导致失败,从而导致工作退出,所以这里要做好异样的抓取 exception.printStackTrace(); } } return employeeList.iterator(); } }, Encoders.bean(Employee.class)); employeeDataset2.show();
输入后果跟map一样,这里就不贴出来了。
2.flatMap类
map和flatMap有什么区别?
map是一对一,flatMap是一对多。
当然在java stream中,flatMap叫法叫做扁平化。
这种思维,在java和spark,flink都是统一的。
2.1 java stream flatMap
以下代码将1条原始数据映射到2个对象上并返回。
ini复制代码List<Employee> employeeList2 = list.stream().flatMap(word -> {
List<String> words = Arrays.stream(word.split(",")).collect(Collectors.toList()); List<Employee> lists = new ArrayList<>(); Employee employee = new Employee(words.get(0), Integer.parseInt(words.get(1)), words.get(2), words.get(3)); lists.add(employee); Employee employee2 = new Employee(words.get(0)+"_2", Integer.parseInt(words.get(1)), words.get(2), words.get(3)); lists.add(employee2); return lists.stream(); }).collect(Collectors.toList()); employeeList2.forEach(System.out::println);
输入
ini复制代码JavaStreamDemo.Employee(name=张三, age=20, department=研发部, level=普通员工)
JavaStreamDemo.Employee(name=张三_2, age=20, department=研发部, level=普通员工)
JavaStreamDemo.Employee(name=李四, age=31, department=研发部, level=普通员工)
JavaStreamDemo.Employee(name=李四_2, age=31, department=研发部, level=普通员工)
JavaStreamDemo.Employee(name=李丽, age=36, department=财务部, level=普通员工)
JavaStreamDemo.Employee(name=李丽_2, age=36, department=财务部, level=普通员工)
JavaStreamDemo.Employee(name=张伟, age=38, department=研发部, level=经理)
JavaStreamDemo.Employee(name=张伟_2, age=38, department=研发部, level=经理)
JavaStreamDemo.Employee(name=杜航, age=25, department=人事部, level=普通员工)
JavaStreamDemo.Employee(name=杜航_2, age=25, department=人事部, level=普通员工)
JavaStreamDemo.Employee(name=周歌, age=28, department=研发部, level=普通员工)
JavaStreamDemo.Employee(name=周歌_2, age=28, department=研发部, level=普通员工)
2.2 spark flatMap
这里实现FlatMapFunction的call办法,一次拿到1条数据,而后返回值是Iterator,所以能够返回多条。
scss复制代码Dataset<Employee> employeeDatasetFlatmap = reader.flatMap(new FlatMapFunction<Row, Employee>() {
@Override public Iterator<Employee> call(Row row) throws Exception { List<Employee> employeeList = new ArrayList<>(); try { List<String> list = Arrays.stream(row.mkString().split(",")).collect(Collectors.toList()); Employee employee = new Employee(list.get(0), Integer.parseInt(list.get(1)), list.get(2), list.get(3)); employeeList.add(employee); Employee employee2 = new Employee(list.get(0)+"_2", Integer.parseInt(list.get(1)), list.get(2), list.get(3)); employeeList.add(employee2); } catch (Exception exception) { exception.printStackTrace(); } return employeeList.iterator(); } }, Encoders.bean(Employee.class)); employeeDatasetFlatmap.show();
输入
diff复制代码+---+----------+--------+------+
age | department | level | name |
---|---|---|---|
20 | 研发部 | 普通员工 | 张三 |
20 | 研发部 | 普通员工 | 张三_2 |
31 | 研发部 | 普通员工 | 李四 |
31 | 研发部 | 普通员工 | 李四_2 |
36 | 财务部 | 普通员工 | 李丽 |
36 | 财务部 | 普通员工 | 李丽_2 |
38 | 研发部 | 经理 | 张伟 |
38 | 研发部 | 经理 | 张伟_2 |
25 | 人事部 | 普通员工 | 杜航 |
25 | 人事部 | 普通员工 | 杜航_2 |
28 | 研发部 | 普通员工 | 周歌 |
28 | 研发部 | 普通员工 | 周歌_2 |
3 groupby类
与SQL相似,java stream流和spark一样,groupby对数据集进行分组并在此基础上能够进行聚合函数操作。也能够分组间接失去一组子数据集。
3.1 java stream groupBy
按部门分组统计部门人数:
ini复制代码Map<String, Long> map = employeeList.stream().collect(Collectors.groupingBy(Employee::getDepartment, Collectors.counting()));
System.out.println(map);
输入
复制代码{财务部=1, 人事部=1, 研发部=4}
3.2 spark groupBy
将映射为对象的数据集按部门分组,在此基础上统计部门员工数和平均年龄。
sql复制代码RelationalGroupedDataset datasetGroupBy = employeeDataset.groupBy("department");
// 统计每个部门有多少员工
datasetGroupBy.count().show();
/**
- 每个部门的平均年龄
*/
datasetGroupBy.avg("age").withColumnRenamed("avg(age)","avgAge").show();
输入别离为
diff复制代码+----------+-----+
department | count |
---|---|
财务部 | 1 |
人事部 | 1 |
研发部 | 4 |
diff复制代码+----------+------+
department | avgAge |
---|---|
财务部 | 36.0 |
人事部 | 25.0 |
研发部 | 29.25 |
3.3 spark groupByKey
spark的groupBy和groupByKey的区别,前者在此基础上应用聚合函数失去一个聚合值,后者只是进行分组,不进行任何计算。
相似于java stream的:
ini复制代码Map<String, List<Employee>> map2 = employeeList.stream().collect(Collectors.groupingBy(Employee::getDepartment));
System.out.println(map2);
输入
css复制代码{财务部=[JavaStreamDemo.Employee(name=李丽, age=36, department=财务部, level=普通员工)],
人事部=[JavaStreamDemo.Employee(name=杜航, age=25, department=人事部, level=普通员工)],
研发部=[JavaStreamDemo.Employee(name=张三, age=20, department=研发部, level=普通员工), JavaStreamDemo.Employee(name=李四, age=31, department=研发部, level=普通员工), JavaStreamDemo.Employee(name=张伟, age=38, department=研发部, level=经理), JavaStreamDemo.Employee(name=周歌, age=28, department=研发部, level=普通员工)]}
应用spark groupByKey。
先失去一个key-value的一对多的一个汇合数据集。
这里的call()办法返回的是key,即分组的key。
java复制代码KeyValueGroupedDataset keyValueGroupedDataset = employeeDataset.groupByKey(new MapFunction<Employee, String>() {
@Override public String call(Employee employee) throws Exception { // 返回分组的key,这里示意依据部门进行分组 return employee.getDepartment(); } }, Encoders.STRING());
再在keyValueGroupedDataset 的根底上进行mapGroups,在call()办法里就能够拿到每个key的所有原始数据。
vbnet复制代码keyValueGroupedDataset.mapGroups(new MapGroupsFunction() {
@Override public Object call(Object key, Iterator iterator) throws Exception { System.out.println("key = " + key); while (iterator.hasNext()){ System.out.println(iterator.next()); } return iterator; } }, Encoders.bean(Iterator.class)) .show(); // 这里的show()没有意义,只是触发计算而已
输入
ini复制代码key = 人事部
SparkDemo.Employee(name=杜航, age=25, department=人事部, level=普通员工)
key = 研发部
SparkDemo.Employee(name=张三, age=20, department=研发部, level=普通员工)
SparkDemo.Employee(name=李四, age=31, department=研发部, level=普通员工)
SparkDemo.Employee(name=张伟, age=38, department=研发部, level=经理)
SparkDemo.Employee(name=周歌, age=28, department=研发部, level=普通员工)
key = 财务部
SparkDemo.Employee(name=李丽, age=36, department=财务部, level=普通员工)
4 reduce类
reduce的字面意思是:缩小;减小;升高;放大。
又叫归约。
它将数据集进行循环,让以后对象和前一对象两两进行计算,每次计算失去的后果作为下一次计算的前一对象,并最终失去一个对象。
假如有5个数据【1,2,3,4,5】,应用reduce进行求和计算,别离是
比方下面的测试数据集,我要计算各部门年龄总数。应用聚合函数失去的是一个int类型的数字。
4.1 java stream reduce
scss复制代码int age = employeeList.stream().mapToInt(e -> e.age).sum();
System.out.println(age);//178
应用reduce也可进行下面的计算
scss复制代码int age1 = employeeList.stream().mapToInt(e -> e.getAge()).reduce(0,(a,b) -> a+b);
System.out.println(age1);// 178
然而我将年龄求和,同时失去一个残缺的对象呢?
ini复制代码JavaStreamDemo.Employee(name=周歌, age=178, department=研发部, level=普通员工)
能够应用reduce将数据集两两循环,将年龄相加,同时返回最初一个遍历的对象。
上面代码的pre 代表前一个对象,current 代表以后对象。
scss复制代码 /**
- pre 代表前一个对象
- current 代表以后对象
*/
Employee reduceEmployee = employeeList.stream().reduce(new Employee(), (pre,current) -> {
// 当第一次循环时前一个对象为nullif (pre.getAge() == null) { current.setAge(current.getAge());} else { current.setAge(pre.getAge() + current.getAge());}return current;
});
System.out.println(reduceEmployee);
4.2 spark reduce
spark reduce的根本思维跟java stream是一样的。
间接看代码:
ini复制代码 Employee datasetReduce = employeeDataset.reduce(new ReduceFunction<Employee>() {
@Override public Employee call(Employee t1, Employee t2) throws Exception { // 不同的版本看是否须要判断t1 == null t2.setAge(t1.getAge() + t2.getAge()); return t2; } }); System.out.println(datasetReduce);
输入
ini复制代码SparkDemo.Employee(name=周歌, age=178, department=研发部, level=普通员工)
其它常见操作类
ini复制代码Employee employee = employeeDataset.filter("age > 30").limit(3).sort("age").first();
System.out.println(employee);
// SparkDemo.Employee(name=李四, age=31, department=研发部, level=普通员工)
同时能够将dataset注册成table,应用更为弱小的SQL来进行各种弱小的运算。
当初SQL是flink的一等公民,spark也不遑多让。
这里举一个非常简单的例子。
css复制代码employeeDataset.registerTempTable("table");
session.sql("select * from table where age > 30 order by age desc limit 3").show();
输入
diff复制代码+---+----------+--------+----+
age | department | level | name |
---|---|---|---|
38 | 研发部 | 经理 | 张伟 |
36 | 财务部 | 普通员工 | 李丽 |
31 | 研发部 | 普通员工 | 李四 |
scss复制代码employeeDataset.registerTempTable("table");
session.sql("select
concat_ws(',',collect_set(name)) as names, // group_concat avg(age) as age, department from table where age > 30 group by department order by age desc limit 3").show();
输入
diff复制代码+---------+----+----------+
names | age | department |
---|---|---|
李丽 | 36.0 | 财务部 |
张伟,李四 | 34.5 | 研发部 |
小结
本文根据java stream的相似性,介绍了spark外面一些常见的算子操作。
本文只是做一个非常简单的入门介绍。
如果感兴趣的话,
后端的同学能够尝试着操作一下,非常简单,本地不须要搭建环境,只有引入spark 的 maven依赖即可。
我把本文的所有代码全副贴在最初面。
java stream 源码:
点击查看代码
ini复制代码import lombok.*;
import org.apache.commons.io.FileUtils;
import java.io.File;
import java.io.IOException;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
public class JavaStreamDemo {
public static void main(String[] args) throws IOException { /** * 张三,20,研发部,普通员工 * 李四,31,研发部,普通员工 * 李丽,36,财务部,普通员工 * 张伟,38,研发部,经理 * 杜航,25,人事部,普通员工 * 周歌,28,研发部,普通员工 */ List<String> list = FileUtils.readLines(new File("f:/test.txt"), "utf-8"); List<Employee> employeeList = list.stream().map(word -> { List<String> words = Arrays.stream(word.split(",")).collect(Collectors.toList()); Employee employee = new Employee(words.get(0), Integer.parseInt(words.get(1)), words.get(2), words.get(3)); return employee; }).collect(Collectors.toList()); // employeeList.forEach(System.out::println); List<Employee> employeeList2 = list.stream().flatMap(word -> { List<String> words = Arrays.stream(word.split(",")).collect(Collectors.toList()); List<Employee> lists = new ArrayList<>(); Employee employee = new Employee(words.get(0), Integer.parseInt(words.get(1)), words.get(2), words.get(3)); lists.add(employee); Employee employee2 = new Employee(words.get(0)+"_2", Integer.parseInt(words.get(1)), words.get(2), words.get(3)); lists.add(employee2); return lists.stream(); }).collect(Collectors.toList()); // employeeList2.forEach(System.out::println); Map<String, Long> map = employeeList.stream().collect(Collectors.groupingBy(Employee::getDepartment, Collectors.counting())); System.out.println(map); Map<String, List<Employee>> map2 = employeeList.stream().collect(Collectors.groupingBy(Employee::getDepartment)); System.out.println(map2); int age = employeeList.stream().mapToInt(e -> e.age).sum(); System.out.println(age);// 178 int age1 = employeeList.stream().mapToInt(e -> e.getAge()).reduce(0,(a,b) -> a+b); System.out.println(age1);// 178 /** * pre 代表前一个对象 * current 代表以后对象 */ Employee reduceEmployee = employeeList.stream().reduce(new Employee(), (pre,current) -> { if (pre.getAge() == null) { current.setAge(current.getAge()); } else { current.setAge(pre.getAge() + current.getAge()); } return current; }); System.out.println(reduceEmployee);}@Getter@Setter@AllArgsConstructor@NoArgsConstructor@ToStringstaticclass Employee implements Serializable { private String name; private Integer age; private String department; private String level;}
}
spark的源码:
点击查看代码
java复制代码import com.google.gson.Gson;
import lombok.*;
import org.apache.spark.api.java.function.*;
import org.apache.spark.sql.*;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.stream.Collectors;
public class SparkDemo {
public static void main(String[] args) { SparkSession session = SparkSession.builder().master("local[*]").getOrCreate(); Dataset<Row> reader = session.read().text("F:/test.txt"); // reader.show(); /** * +-----------------------+ * | value| * +-----------------------+ * |张三,20,研发部,普通员工| * |李四,31,研发部,普通员工| * |李丽,36,财务部,普通员工| * |张伟,38,研发部,经理| * |杜航,25,人事部,普通员工| * |周歌,28,研发部,普通员工| * +-----------------------+ */ // 本地演示而已,理论分布式环境,这里的gson波及到序列化问题 // 算子以外的代码都在driver端运行 // 任何算子以内的代码都在executor端运行,即会在不同的服务器节点上执行 Gson gson = new Gson(); // a 算子内部,driver端 Dataset<Employee> employeeDataset = reader.map(new MapFunction<Row, Employee>() { @Override public Employee call(Row row) throws Exception { // b 算子外部,executor端 Employee employee = null; try { // gson.fromJson(); 这里应用gson波及到序列化问题 List<String> list = Arrays.stream(row.mkString().split(",")).collect(Collectors.toList()); employee = new Employee(list.get(0), Integer.parseInt(list.get(1)), list.get(2), list.get(3)); } catch (Exception exception) { // 日志记录 // 流式计算中要做到7*24小时不间断,任意一条上流脏数据都可能导致失败,从而导致工作退出,所以这里要做好异样的抓取 exception.printStackTrace(); } return employee; } }, Encoders.bean(Employee.class)); // employeeDataset.show(); /** * +---+----------+--------+----+ * |age|department| level|name| * +---+----------+--------+----+ * | 20| 研发部|普通员工|张三| * | 31| 研发部|普通员工|李四| * | 36| 财务部|普通员工|李丽| * | 38| 研发部| 经理|张伟| * | 25| 人事部|普通员工|杜航| * | 28| 研发部|普通员工|周歌| */ Dataset<Employee> employeeDataset2 = reader.mapPartitions(new MapPartitionsFunction<Row, Employee>() { @Override public Iterator<Employee> call(Iterator<Row> iterator) throws Exception { List<Employee> employeeList = new ArrayList<>(); while (iterator.hasNext()){ Row row = iterator.next(); try { List<String> list = Arrays.stream(row.mkString().split(",")).collect(Collectors.toList()); Employee employee = new Employee(list.get(0), Integer.parseInt(list.get(1)), list.get(2), list.get(3)); employeeList.add(employee); } catch (Exception exception) { // 日志记录 // 流式计算中要做到7*24小时不间断,任意一条上流脏数据都可能导致失败,从而导致工作退出,所以这里要做好异样的抓取 exception.printStackTrace(); } } return employeeList.iterator(); } }, Encoders.bean(Employee.class)); // employeeDataset2.show(); /** * +---+----------+--------+----+ * |age|department| level|name| * +---+----------+--------+----+ * | 20| 研发部|普通员工|张三| * | 31| 研发部|普通员工|李四| * | 36| 财务部|普通员工|李丽| * | 38| 研发部| 经理|张伟| * | 25| 人事部|普通员工|杜航| * | 28| 研发部|普通员工|周歌| * +---+----------+--------+----+ */ Dataset<Employee> employeeDatasetFlatmap = reader.flatMap(new FlatMapFunction<Row, Employee>() { @Override public Iterator<Employee> call(Row row) throws Exception { List<Employee> employeeList = new ArrayList<>(); try { List<String> list = Arrays.stream(row.mkString().split(",")).collect(Collectors.toList()); Employee employee = new Employee(list.get(0), Integer.parseInt(list.get(1)), list.get(2), list.get(3)); employeeList.add(employee); Employee employee2 = new Employee(list.get(0)+"_2", Integer.parseInt(list.get(1)), list.get(2), list.get(3)); employeeList.add(employee2); } catch (Exception exception) { exception.printStackTrace(); } return employeeList.iterator(); } }, Encoders.bean(Employee.class));
// employeeDatasetFlatmap.show();
/** * +---+----------+--------+------+ * |age|department| level| name| * +---+----------+--------+------+ * | 20| 研发部|普通员工| 张三| * | 20| 研发部|普通员工|张三_2| * | 31| 研发部|普通员工| 李四| * | 31| 研发部|普通员工|李四_2| * | 36| 财务部|普通员工| 李丽| * | 36| 财务部|普通员工|李丽_2| * | 38| 研发部| 经理| 张伟| * | 38| 研发部| 经理|张伟_2| * | 25| 人事部|普通员工| 杜航| * | 25| 人事部|普通员工|杜航_2| * | 28| 研发部|普通员工| 周歌| * | 28| 研发部|普通员工|周歌_2| * +---+----------+--------+------+ */ RelationalGroupedDataset datasetGroupBy = employeeDataset.groupBy("department"); // 统计每个部门有多少员工 // datasetGroupBy.count().show(); /** * +----------+-----+ * |department|count| * +----------+-----+ * | 财务部| 1| * | 人事部| 1| * | 研发部| 4| * +----------+-----+ */ /** * 每个部门的平均年龄 */ // datasetGroupBy.avg("age").withColumnRenamed("avg(age)","avgAge").show(); /** * +----------+--------+ * |department|avg(age)| * +----------+--------+ * | 财务部| 36.0| * | 人事部| 25.0| * | 研发部| 29.25| * +----------+--------+ */ KeyValueGroupedDataset keyValueGroupedDataset = employeeDataset.groupByKey(new MapFunction<Employee, String>() { @Override public String call(Employee employee) throws Exception { // 返回分组的key,这里示意依据部门进行分组 return employee.getDepartment(); } }, Encoders.STRING()); keyValueGroupedDataset.mapGroups(new MapGroupsFunction() { @Override public Object call(Object key, Iterator iterator) throws Exception { System.out.println("key = " + key); while (iterator.hasNext()){ System.out.println(iterator.next()); } return iterator; /** * key = 人事部 * SparkDemo.Employee(name=杜航, age=25, department=人事部, level=普通员工) * key = 研发部 * SparkDemo.Employee(name=张三, age=20, department=研发部, level=普通员工) * SparkDemo.Employee(name=李四, age=31, department=研发部, level=普通员工) * SparkDemo.Employee(name=张伟, age=38, department=研发部, level=经理) * SparkDemo.Employee(name=周歌, age=28, department=研发部, level=普通员工) * key = 财务部 * SparkDemo.Employee(name=李丽, age=36, department=财务部, level=普通员工) */ } }, Encoders.bean(Iterator.class)) .show(); // 这里的show()没有意义,只是触发计算而已 Employee datasetReduce = employeeDataset.reduce(new ReduceFunction<Employee>() { @Override public Employee call(Employee t1, Employee t2) throws Exception { // 不同的版本看是否须要判断t1 == null t2.setAge(t1.getAge() + t2.getAge()); return t2; } }); System.out.println(datasetReduce); Employee employee = employeeDataset.filter("age > 30").limit(3).sort("age").first(); System.out.println(employee); // SparkDemo.Employee(name=李四, age=31, department=研发部, level=普通员工) employeeDataset.registerTempTable("table"); session.sql("select * from table where age > 30 order by age desc limit 3").show(); /** * +---+----------+--------+----+ * |age|department| level|name| * +---+----------+--------+----+ * | 38| 研发部| 经理|张伟| * | 36| 财务部|普通员工|李丽| * | 31| 研发部|普通员工|李四| * +---+----------+--------+----+ */}@Getter@Setter@AllArgsConstructor@NoArgsConstructor@ToStringpublic static class Employee implements Serializable { private String name; private Integer age; private String department; private String level;}
}
spark maven依赖,自行不须要的spark-streaming,kafka依赖去掉。