关于springboot:揭开神秘面纱会stream流就会大数据

44次阅读

共计 19515 个字符,预计需要花费 49 分钟才能阅读完成。

如果你会任意一门语言的 stream 流,没道理不会大数据开发。
俗话说男追女隔座山,女追男隔层纱。
如果说零根底学大数据,感觉后面是一座山,那么只有你会 java 或者任意一门语言的 stream 流,那大数据就只隔了一层纱。
本文以 java stream 流计算为例,解说一些根底的 spark 操作。另一个风行的大数据框架 flink 同理。

筹备工作
测试数据, 以下列别离示意姓名,年龄,部门,职位。
复制代码张三,20, 研发部, 普通员工
李四,31, 研发部, 普通员工
李丽,36, 财务部, 普通员工
张伟,38, 研发部, 经理
杜航,25, 人事部, 普通员工
周歌,28, 研发部, 普通员工

创立一个 Employee 类。
less 复制代码 @Getter

@Setter
@AllArgsConstructor
@NoArgsConstructor
@ToString
static
class Employee implements Serializable {
    private String name;
    private Integer age;
    private String department;
    private String level;
}

}

版本:
jdk:1.8
spark:3.2.0
scala:2.12.15
下面的 scala 版本只是 spark 框架自身须要依赖到 scala。
因为 scala 的确是比拟小众的语言,本文还是应用 java 演示 spark 代码。
1.map 类
1.1 java stream map
map 示意一对一操作。将上游数据的一行数据进行任意操作,最终失去操作后的一条数据。
这种思维,在 java 和 spark,flink 都是统一的。
咱们先用 java stream 演示读取文件,再应用 map 操作将每行数据映射为 Employee 对象。
ini 复制代码 List<String> list = FileUtils.readLines(new File(“f:/test.txt”), “utf-8”);

    List<Employee> employeeList = list.stream().map(word -> {List<String> words = Arrays.stream(word.split(",")).collect(Collectors.toList());
        Employee employee = new Employee(words.get(0), Integer.parseInt(words.get(1)), words.get(2), words.get(3));
        return employee;
    }).collect(Collectors.toList());

    employeeList.forEach(System.out::println);

转换后的数据:
ini 复制代码 JavaStreamDemo.Employee(name= 张三, age=20, department= 研发部, level= 普通员工)
JavaStreamDemo.Employee(name= 李四, age=31, department= 研发部, level= 普通员工)
JavaStreamDemo.Employee(name= 李丽, age=36, department= 财务部, level= 普通员工)
JavaStreamDemo.Employee(name= 张伟, age=38, department= 研发部, level= 经理)
JavaStreamDemo.Employee(name= 杜航, age=25, department= 人事部, level= 普通员工)
JavaStreamDemo.Employee(name= 周歌, age=28, department= 研发部, level= 普通员工)

1.2 spark map
首先失去一个 SparkSession 对象,读取文件,失去一个 DataSet 弹性数据集对象。
ini 复制代码 SparkSession session = SparkSession.builder().master(“local[*]”).getOrCreate();
Dataset<Row> reader = session.read().text(“F:/test.txt”);
reader.show();

这里的 show()就是打印输出以后数据集,它是一个 action 类的算子。
失去后果:
diff 复制代码 +———————–+

value
张三,20, 研发部, 普通员工
李四,31, 研发部, 普通员工
李丽,36, 财务部, 普通员工
张伟,38, 研发部, 经理
杜航,25, 人事部, 普通员工
周歌,28, 研发部, 普通员工

当初咱们拿到了根底数据,咱们应用 map 一对一操作,将一行行数据转换为 Employee 对象。
咱们这里不应用 lamda 表达式,让大家看得更加清晰。
这里实现了 MapFunction 接口里的 call 办法,每次拿到一行数据,咱们这里进行切分,再转换为对象。

须要特地指出的一点是,与后端 WEB 利用有一个对立异样解决不同的是,大数据利用,特地是流式计算,要保障 7 *24 在线,须要对每个算子进行异样捕捉。
因为你不晓得上游数据荡涤到底怎么样,很可能拿到一条脏数据,解决的时候抛出异样,如果没有捕捉解决,那么整个利用就会挂掉。

spark 的算子分为 Transformation 和 Action 两种类型。Transformation 会开成一个 DAG 图,具备 lazy 提早性,它只会从一个 dataset(rdd/df)转换成另一个 dataset(rdd/df),只有当遇到 action 类的算子才会真正执行。
咱们明天会演示的算子都是 Transformation 类的算子。
典型的 Action 算子包含 show,collect,save 之类的。比方在本地进行 show 查看后果,或者实现运行后 save 到数据库,或者 HDFS。

spark 执行时分为 driver 和 executor。但不是本文的重点,不会开展讲。
只须要留神 driver 端会将代码散发到各个分布式系统的节点 executor 上,它自身不会参加计算。一般来说,算子内部,如以下示例代码的 a 处会在 driver 端执行,b 处算子外部会不同服务器上的 executor 端执行。
所以在算子内部定义的变量,在算子外部应用的时候要特地留神!!不要想当然地认为都是一个 main 办法里写的代码,就肯定会在同一个 JVM 里。
这里波及到序列化的问题,同时它们分处不同的 JVM,应用 ”==” 比拟的时候也可能会出问题!!
这是一个后端 WEB 开发转向大数据开发时,这个思维肯定要转变过去。
简言之,后端 WEB 服务的分布式是咱们本人实现的,大数据的分布式是框架天生帮咱们实现的。

1.2.1 MapFunction
scss 复制代码 // a 算子内部,driver 端
Dataset<Employee> employeeDataset = reader.map(new MapFunction<Row, Employee>() {

        @Override
        public Employee call(Row row) throws Exception {
            // b 算子外部,executor 端
            Employee employee = null;
            try {// gson.fromJson(); 这里应用 gson 波及到序列化问题
                List<String> list = Arrays.stream(row.mkString().split(",")).collect(Collectors.toList());
                employee = new Employee(list.get(0), Integer.parseInt(list.get(1)), list.get(2), list.get(3));
            } catch (Exception exception) {
                // 日志记录
                // 流式计算中要做到 7 *24 小时不间断,任意一条上流脏数据都可能导致失败,从而导致工作退出,所以这里要做好异样的抓取
                exception.printStackTrace();}
            return employee;
        }
    }, Encoders.bean(Employee.class));

    employeeDataset.show();

输入
diff 复制代码 +—+———-+——–+—-+

age department level name
20 研发部 普通员工 张三
31 研发部 普通员工 李四
36 财务部 普通员工 李丽
38 研发部 经理 张伟
25 人事部 普通员工 杜航
28 研发部 普通员工 周歌

1.2.2 MapPartitionsFunction
spark 中 map 和 mapPartitions 有啥区别?
map 是 1 条 1 条解决数据
mapPartitions 是一个分区一个分区解决数据

后者肯定比前者效率高吗?
不肯定,看具体情况。
这里应用后面 map 一样的逻辑解决。能够看到在 call 办法里失去的是一个 Iterator 迭代器,是一批数据。
失去一批数据,而后再一对一映射为对象,再以 Iterator 的模式返回这批数据。
vbnet 复制代码 Dataset<Employee> employeeDataset2 = reader.mapPartitions(new MapPartitionsFunction<Row, Employee>() {

        @Override
        public Iterator<Employee> call(Iterator<Row> iterator) throws Exception {List<Employee> employeeList = new ArrayList<>();
            while (iterator.hasNext()){Row row = iterator.next();
                try {List<String> list = Arrays.stream(row.mkString().split(",")).collect(Collectors.toList());
                    Employee employee = new Employee(list.get(0), Integer.parseInt(list.get(1)), list.get(2), list.get(3));
                    employeeList.add(employee);
                } catch (Exception exception) {
                    // 日志记录
                    // 流式计算中要做到 7 *24 小时不间断,任意一条上流脏数据都可能导致失败,从而导致工作退出,所以这里要做好异样的抓取
                    exception.printStackTrace();}
            }
            return employeeList.iterator();}
    }, Encoders.bean(Employee.class));

    employeeDataset2.show();

输入后果跟 map 一样,这里就不贴出来了。
2.flatMap 类
map 和 flatMap 有什么区别?
map 是一对一,flatMap 是一对多。
当然在 java stream 中,flatMap 叫法叫做扁平化。
这种思维,在 java 和 spark,flink 都是统一的。
2.1 java stream flatMap
以下代码将 1 条原始数据映射到 2 个对象上并返回。
ini 复制代码 List<Employee> employeeList2 = list.stream().flatMap(word -> {

        List<String> words = Arrays.stream(word.split(",")).collect(Collectors.toList());
        List<Employee> lists = new ArrayList<>();
        Employee employee = new Employee(words.get(0), Integer.parseInt(words.get(1)), words.get(2), words.get(3));
        lists.add(employee);
        Employee employee2 = new Employee(words.get(0)+"_2", Integer.parseInt(words.get(1)), words.get(2), words.get(3));
        lists.add(employee2);
        return lists.stream();}).collect(Collectors.toList());
    employeeList2.forEach(System.out::println);

输入
ini 复制代码 JavaStreamDemo.Employee(name= 张三, age=20, department= 研发部, level= 普通员工)
JavaStreamDemo.Employee(name= 张三_2, age=20, department= 研发部, level= 普通员工)
JavaStreamDemo.Employee(name= 李四, age=31, department= 研发部, level= 普通员工)
JavaStreamDemo.Employee(name= 李四_2, age=31, department= 研发部, level= 普通员工)
JavaStreamDemo.Employee(name= 李丽, age=36, department= 财务部, level= 普通员工)
JavaStreamDemo.Employee(name= 李丽_2, age=36, department= 财务部, level= 普通员工)
JavaStreamDemo.Employee(name= 张伟, age=38, department= 研发部, level= 经理)
JavaStreamDemo.Employee(name= 张伟_2, age=38, department= 研发部, level= 经理)
JavaStreamDemo.Employee(name= 杜航, age=25, department= 人事部, level= 普通员工)
JavaStreamDemo.Employee(name= 杜航_2, age=25, department= 人事部, level= 普通员工)
JavaStreamDemo.Employee(name= 周歌, age=28, department= 研发部, level= 普通员工)
JavaStreamDemo.Employee(name= 周歌_2, age=28, department= 研发部, level= 普通员工)

2.2 spark flatMap
这里实现 FlatMapFunction 的 call 办法,一次拿到 1 条数据,而后返回值是 Iterator,所以能够返回多条。
scss 复制代码 Dataset<Employee> employeeDatasetFlatmap = reader.flatMap(new FlatMapFunction<Row, Employee>() {

        @Override
        public Iterator<Employee> call(Row row) throws Exception {List<Employee> employeeList = new ArrayList<>();
            try {List<String> list = Arrays.stream(row.mkString().split(",")).collect(Collectors.toList());
                Employee employee = new Employee(list.get(0), Integer.parseInt(list.get(1)), list.get(2), list.get(3));
                employeeList.add(employee);

                Employee employee2 = new Employee(list.get(0)+"_2", Integer.parseInt(list.get(1)), list.get(2), list.get(3));
                employeeList.add(employee2);
            } catch (Exception exception) {exception.printStackTrace();
            }
            return employeeList.iterator();}
    }, Encoders.bean(Employee.class));
    employeeDatasetFlatmap.show();

输入
diff 复制代码 +—+———-+——–+——+

age department level name
20 研发部 普通员工 张三
20 研发部 普通员工 张三_2
31 研发部 普通员工 李四
31 研发部 普通员工 李四_2
36 财务部 普通员工 李丽
36 财务部 普通员工 李丽_2
38 研发部 经理 张伟
38 研发部 经理 张伟_2
25 人事部 普通员工 杜航
25 人事部 普通员工 杜航_2
28 研发部 普通员工 周歌
28 研发部 普通员工 周歌_2

3 groupby 类
与 SQL 相似,java stream 流和 spark 一样,groupby 对数据集进行分组并在此基础上能够进行聚合函数操作。也能够分组间接失去一组子数据集。
3.1 java stream groupBy
按部门分组统计部门人数:
ini 复制代码 Map<String, Long> map = employeeList.stream().collect(Collectors.groupingBy(Employee::getDepartment, Collectors.counting()));

    System.out.println(map);

输入
复制代码{财务部 =1, 人事部 =1, 研发部 =4}

3.2 spark groupBy
将映射为对象的数据集按部门分组,在此基础上统计部门员工数和平均年龄。
sql 复制代码 RelationalGroupedDataset datasetGroupBy = employeeDataset.groupBy(“department”);
// 统计每个部门有多少员工
datasetGroupBy.count().show();
/**

  • 每个部门的平均年龄
    */

datasetGroupBy.avg(“age”).withColumnRenamed(“avg(age)”,”avgAge”).show();

输入别离为
diff 复制代码 +———-+—–+

department count
财务部 1
人事部 1
研发部 4

diff 复制代码 +———-+——+

department avgAge
财务部 36.0
人事部 25.0
研发部 29.25

3.3 spark groupByKey
spark 的 groupBy 和 groupByKey 的区别, 前者在此基础上应用聚合函数失去一个聚合值,后者只是进行分组,不进行任何计算。
相似于 java stream 的:
ini 复制代码 Map<String, List<Employee>> map2 = employeeList.stream().collect(Collectors.groupingBy(Employee::getDepartment));
System.out.println(map2);

输入
css 复制代码{财务部 =[JavaStreamDemo.Employee(name= 李丽, age=36, department= 财务部, level= 普通员工)],
人事部 =[JavaStreamDemo.Employee(name= 杜航, age=25, department= 人事部, level= 普通员工)],
研发部 =[JavaStreamDemo.Employee(name= 张三, age=20, department= 研发部, level= 普通员工), JavaStreamDemo.Employee(name= 李四, age=31, department= 研发部, level= 普通员工), JavaStreamDemo.Employee(name= 张伟, age=38, department= 研发部, level= 经理), JavaStreamDemo.Employee(name= 周歌, age=28, department= 研发部, level= 普通员工)]}

应用 spark groupByKey。
先失去一个 key-value 的一对多的一个汇合数据集。
这里的 call()办法返回的是 key, 即分组的 key。
java 复制代码 KeyValueGroupedDataset keyValueGroupedDataset = employeeDataset.groupByKey(new MapFunction<Employee, String>() {

        @Override
        public String call(Employee employee) throws Exception {
            // 返回分组的 key, 这里示意依据部门进行分组
            return employee.getDepartment();}
    }, Encoders.STRING());

再在 keyValueGroupedDataset 的根底上进行 mapGroups,在 call()办法里就能够拿到每个 key 的所有原始数据。
vbnet 复制代码 keyValueGroupedDataset.mapGroups(new MapGroupsFunction() {

        @Override
        public Object call(Object key, Iterator iterator) throws Exception {System.out.println("key =" + key);
            while (iterator.hasNext()){System.out.println(iterator.next());
            }
            return iterator; 
        }
    }, Encoders.bean(Iterator.class))
            .show(); // 这里的 show()没有意义,只是触发计算而已

输入
ini 复制代码 key = 人事部
SparkDemo.Employee(name= 杜航, age=25, department= 人事部, level= 普通员工)
key = 研发部
SparkDemo.Employee(name= 张三, age=20, department= 研发部, level= 普通员工)
SparkDemo.Employee(name= 李四, age=31, department= 研发部, level= 普通员工)
SparkDemo.Employee(name= 张伟, age=38, department= 研发部, level= 经理)
SparkDemo.Employee(name= 周歌, age=28, department= 研发部, level= 普通员工)
key = 财务部
SparkDemo.Employee(name= 李丽, age=36, department= 财务部, level= 普通员工)

4 reduce 类
reduce 的字面意思是:缩小;减小;升高;放大。
又叫归约。
它将数据集进行循环,让以后对象和前一对象两两进行计算,每次计算失去的后果作为下一次计算的前一对象,并最终失去一个对象。
假如有 5 个数据【1,2,3,4,5】,应用 reduce 进行求和计算,别离是

比方下面的测试数据集,我要计算各部门年龄总数。应用聚合函数失去的是一个 int 类型的数字。
4.1 java stream reduce
scss 复制代码 int age = employeeList.stream().mapToInt(e -> e.age).sum();
System.out.println(age);//178

应用 reduce 也可进行下面的计算
scss 复制代码 int age1 = employeeList.stream().mapToInt(e -> e.getAge()).reduce(0,(a,b) -> a+b);
System.out.println(age1);// 178

然而我将年龄求和,同时失去一个残缺的对象呢?
ini 复制代码 JavaStreamDemo.Employee(name= 周歌, age=178, department= 研发部, level= 普通员工)

能够应用 reduce 将数据集两两循环,将年龄相加,同时返回最初一个遍历的对象。
上面代码的 pre 代表前一个对象,current 代表以后对象。
scss 复制代码 /**

  • pre 代表前一个对象
  • current 代表以后对象
    */

Employee reduceEmployee = employeeList.stream().reduce(new Employee(), (pre,current) -> {

 // 当第一次循环时前一个对象为 null
if (pre.getAge() == null) {current.setAge(current.getAge());
} else {current.setAge(pre.getAge() + current.getAge());
}
return current;

});
System.out.println(reduceEmployee);

4.2 spark reduce
spark reduce 的根本思维跟 java stream 是一样的。
间接看代码:
ini 复制代码 Employee datasetReduce = employeeDataset.reduce(new ReduceFunction<Employee>() {

        @Override
        public Employee call(Employee t1, Employee t2) throws Exception {
            // 不同的版本看是否须要判断 t1 == null
            t2.setAge(t1.getAge() + t2.getAge());
            return t2;
        }
    });

    System.out.println(datasetReduce);

输入
ini 复制代码 SparkDemo.Employee(name= 周歌, age=178, department= 研发部, level= 普通员工)

其它常见操作类
ini 复制代码 Employee employee = employeeDataset.filter(“age > 30”).limit(3).sort(“age”).first();
System.out.println(employee);
// SparkDemo.Employee(name= 李四, age=31, department= 研发部, level= 普通员工)

同时能够将 dataset 注册成 table,应用更为弱小的 SQL 来进行各种弱小的运算。
当初 SQL 是 flink 的一等公民,spark 也不遑多让。
这里举一个非常简单的例子。
css 复制代码 employeeDataset.registerTempTable(“table”);
session.sql(“select * from table where age > 30 order by age desc limit 3”).show();

输入
diff 复制代码 +—+———-+——–+—-+

age department level name
38 研发部 经理 张伟
36 财务部 普通员工 李丽
31 研发部 普通员工 李四

scss 复制代码 employeeDataset.registerTempTable(“table”);
session.sql(“select

        concat_ws(',',collect_set(name)) as names, // group_concat
        avg(age) as age,
        department from table 
        where age > 30  
        group by department 
        order by age desc 
        limit 3").show();

输入
diff 复制代码 +———+—-+———-+

names age department
李丽 36.0 财务部
张伟, 李四 34.5 研发部

小结
本文根据 java stream 的相似性,介绍了 spark 外面一些常见的算子操作。
本文只是做一个非常简单的入门介绍。
如果感兴趣的话,
后端的同学能够尝试着操作一下,非常简单,本地不须要搭建环境,只有引入 spark 的 maven 依赖即可。
我把本文的所有代码全副贴在最初面。
java stream 源码:

点击查看代码
ini 复制代码 import lombok.*;
import org.apache.commons.io.FileUtils;

import java.io.File;
import java.io.IOException;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

public class JavaStreamDemo {

public static void main(String[] args) throws IOException {
    /**
     * 张三,20, 研发部, 普通员工
     * 李四,31, 研发部, 普通员工
     * 李丽,36, 财务部, 普通员工
     * 张伟,38, 研发部, 经理
     * 杜航,25, 人事部, 普通员工
     * 周歌,28, 研发部, 普通员工
     */
    List<String> list = FileUtils.readLines(new File("f:/test.txt"), "utf-8");
    List<Employee> employeeList = list.stream().map(word -> {List<String> words = Arrays.stream(word.split(",")).collect(Collectors.toList());
        Employee employee = new Employee(words.get(0), Integer.parseInt(words.get(1)), words.get(2), words.get(3));
        return employee;
    }).collect(Collectors.toList());

    // employeeList.forEach(System.out::println);

    List<Employee> employeeList2 = list.stream().flatMap(word -> {List<String> words = Arrays.stream(word.split(",")).collect(Collectors.toList());
        List<Employee> lists = new ArrayList<>();
        Employee employee = new Employee(words.get(0), Integer.parseInt(words.get(1)), words.get(2), words.get(3));
        lists.add(employee);
        Employee employee2 = new Employee(words.get(0)+"_2", Integer.parseInt(words.get(1)), words.get(2), words.get(3));
        lists.add(employee2);
        return lists.stream();}).collect(Collectors.toList());
    // employeeList2.forEach(System.out::println);

    Map<String, Long> map = employeeList.stream().collect(Collectors.groupingBy(Employee::getDepartment, Collectors.counting()));
    System.out.println(map);
    Map<String, List<Employee>> map2 = employeeList.stream().collect(Collectors.groupingBy(Employee::getDepartment));
    System.out.println(map2);

    int age = employeeList.stream().mapToInt(e -> e.age).sum();
    System.out.println(age);// 178

    int age1 = employeeList.stream().mapToInt(e -> e.getAge()).reduce(0,(a,b) -> a+b);
    System.out.println(age1);// 178

    /**
     * pre 代表前一个对象
     * current 代表以后对象
     */
    Employee reduceEmployee = employeeList.stream().reduce(new Employee(), (pre,current) -> {if (pre.getAge() == null) {current.setAge(current.getAge());
        } else {current.setAge(pre.getAge() + current.getAge());
        }
        return current;
    });
    System.out.println(reduceEmployee);




}

@Getter
@Setter
@AllArgsConstructor
@NoArgsConstructor
@ToString
static
class Employee implements Serializable {
    private String name;
    private Integer age;
    private String department;
    private String level;
}

}

spark 的源码:

点击查看代码
java 复制代码 import com.google.gson.Gson;
import lombok.*;
import org.apache.spark.api.java.function.*;
import org.apache.spark.sql.*;

import java.io.Serializable;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.stream.Collectors;

public class SparkDemo {

public static void main(String[] args) {SparkSession session = SparkSession.builder().master("local[*]").getOrCreate();
    Dataset<Row> reader = session.read().text("F:/test.txt");
    // reader.show();
    /**
     * +-----------------------+
     * |                  value|
     * +-----------------------+
     * | 张三,20, 研发部, 普通员工 |
     * | 李四,31, 研发部, 普通员工 |
     * | 李丽,36, 财务部, 普通员工 |
     * | 张伟,38, 研发部, 经理 |
     * | 杜航,25, 人事部, 普通员工 |
     * | 周歌,28, 研发部, 普通员工 |
     * +-----------------------+
     */

    // 本地演示而已,理论分布式环境,这里的 gson 波及到序列化问题
    // 算子以外的代码都在 driver 端运行
    // 任何算子以内的代码都在 executor 端运行, 即会在不同的服务器节点上执行
    Gson gson = new Gson();
    // a 算子内部,driver 端
    Dataset<Employee> employeeDataset = reader.map(new MapFunction<Row, Employee>() {
        @Override
        public Employee call(Row row) throws Exception {
            // b 算子外部,executor 端
            Employee employee = null;
            try {// gson.fromJson(); 这里应用 gson 波及到序列化问题
                List<String> list = Arrays.stream(row.mkString().split(",")).collect(Collectors.toList());
                employee = new Employee(list.get(0), Integer.parseInt(list.get(1)), list.get(2), list.get(3));
            } catch (Exception exception) {
                // 日志记录
                // 流式计算中要做到 7 *24 小时不间断,任意一条上流脏数据都可能导致失败,从而导致工作退出,所以这里要做好异样的抓取
                exception.printStackTrace();}
            return employee;
        }
    }, Encoders.bean(Employee.class));

    // employeeDataset.show();
    /**
     * +---+----------+--------+----+
     * |age|department|   level|name|
     * +---+----------+--------+----+
     * | 20|    研发部 | 普通员工 | 张三 |
     * | 31|    研发部 | 普通员工 | 李四 |
     * | 36|    财务部 | 普通员工 | 李丽 |
     * | 38|    研发部 |    经理 | 张伟 |
     * | 25|    人事部 | 普通员工 | 杜航 |
     * | 28|    研发部 | 普通员工 | 周歌 |
     */

    Dataset<Employee> employeeDataset2 = reader.mapPartitions(new MapPartitionsFunction<Row, Employee>() {
        @Override
        public Iterator<Employee> call(Iterator<Row> iterator) throws Exception {List<Employee> employeeList = new ArrayList<>();
            while (iterator.hasNext()){Row row = iterator.next();
                try {List<String> list = Arrays.stream(row.mkString().split(",")).collect(Collectors.toList());
                    Employee employee = new Employee(list.get(0), Integer.parseInt(list.get(1)), list.get(2), list.get(3));
                    employeeList.add(employee);
                } catch (Exception exception) {
                    // 日志记录
                    // 流式计算中要做到 7 *24 小时不间断,任意一条上流脏数据都可能导致失败,从而导致工作退出,所以这里要做好异样的抓取
                    exception.printStackTrace();}
            }
            return employeeList.iterator();}
    }, Encoders.bean(Employee.class));

    // employeeDataset2.show();
    /**
     * +---+----------+--------+----+
     * |age|department|   level|name|
     * +---+----------+--------+----+
     * | 20|    研发部 | 普通员工 | 张三 |
     * | 31|    研发部 | 普通员工 | 李四 |
     * | 36|    财务部 | 普通员工 | 李丽 |
     * | 38|    研发部 |    经理 | 张伟 |
     * | 25|    人事部 | 普通员工 | 杜航 |
     * | 28|    研发部 | 普通员工 | 周歌 |
     * +---+----------+--------+----+
     */

    Dataset<Employee> employeeDatasetFlatmap = reader.flatMap(new FlatMapFunction<Row, Employee>() {
        @Override
        public Iterator<Employee> call(Row row) throws Exception {List<Employee> employeeList = new ArrayList<>();
            try {List<String> list = Arrays.stream(row.mkString().split(",")).collect(Collectors.toList());
                Employee employee = new Employee(list.get(0), Integer.parseInt(list.get(1)), list.get(2), list.get(3));
                employeeList.add(employee);

                Employee employee2 = new Employee(list.get(0)+"_2", Integer.parseInt(list.get(1)), list.get(2), list.get(3));
                employeeList.add(employee2);
            } catch (Exception exception) {exception.printStackTrace();
            }
            return employeeList.iterator();}
    }, Encoders.bean(Employee.class));

// employeeDatasetFlatmap.show();

    /**
     * +---+----------+--------+------+
     * |age|department|   level|  name|
     * +---+----------+--------+------+
     * | 20|    研发部 | 普通员工 |  张三 |
     * | 20|    研发部 | 普通员工 | 张三_2|
     * | 31|    研发部 | 普通员工 |  李四 |
     * | 31|    研发部 | 普通员工 | 李四_2|
     * | 36|    财务部 | 普通员工 |  李丽 |
     * | 36|    财务部 | 普通员工 | 李丽_2|
     * | 38|    研发部 |    经理 |  张伟 |
     * | 38|    研发部 |    经理 | 张伟_2|
     * | 25|    人事部 | 普通员工 |  杜航 |
     * | 25|    人事部 | 普通员工 | 杜航_2|
     * | 28|    研发部 | 普通员工 |  周歌 |
     * | 28|    研发部 | 普通员工 | 周歌_2|
     * +---+----------+--------+------+
     */

    RelationalGroupedDataset datasetGroupBy = employeeDataset.groupBy("department");
    // 统计每个部门有多少员工
    // datasetGroupBy.count().show();
    /**
     * +----------+-----+
     * |department|count|
     * +----------+-----+
     * |    财务部 |    1|
     * |    人事部 |    1|
     * |    研发部 |    4|
     * +----------+-----+
     */
    /**
     * 每个部门的平均年龄
     */
    // datasetGroupBy.avg("age").withColumnRenamed("avg(age)","avgAge").show();
    /**
     * +----------+--------+
     * |department|avg(age)|
     * +----------+--------+
     * |    财务部 |    36.0|
     * |    人事部 |    25.0|
     * |    研发部 |   29.25|
     * +----------+--------+
     */

    KeyValueGroupedDataset keyValueGroupedDataset = employeeDataset.groupByKey(new MapFunction<Employee, String>() {
        @Override
        public String call(Employee employee) throws Exception {
            // 返回分组的 key, 这里示意依据部门进行分组
            return employee.getDepartment();}
    }, Encoders.STRING());

    keyValueGroupedDataset.mapGroups(new MapGroupsFunction() {
        @Override
        public Object call(Object key, Iterator iterator) throws Exception {System.out.println("key =" + key);
            while (iterator.hasNext()){System.out.println(iterator.next());
            }
            return iterator;
            /**
             * key = 人事部
             * SparkDemo.Employee(name= 杜航, age=25, department= 人事部, level= 普通员工)
             * key = 研发部
             * SparkDemo.Employee(name= 张三, age=20, department= 研发部, level= 普通员工)
             * SparkDemo.Employee(name= 李四, age=31, department= 研发部, level= 普通员工)
             * SparkDemo.Employee(name= 张伟, age=38, department= 研发部, level= 经理)
             * SparkDemo.Employee(name= 周歌, age=28, department= 研发部, level= 普通员工)
             * key = 财务部
             * SparkDemo.Employee(name= 李丽, age=36, department= 财务部, level= 普通员工)
             */
        }
    }, Encoders.bean(Iterator.class))
            .show(); // 这里的 show()没有意义,只是触发计算而已


    Employee datasetReduce = employeeDataset.reduce(new ReduceFunction<Employee>() {
        @Override
        public Employee call(Employee t1, Employee t2) throws Exception {
            // 不同的版本看是否须要判断 t1 == null
            t2.setAge(t1.getAge() + t2.getAge());
            return t2;
        }
    });

    System.out.println(datasetReduce);


    Employee employee = employeeDataset.filter("age > 30").limit(3).sort("age").first();
    System.out.println(employee);
    // SparkDemo.Employee(name= 李四, age=31, department= 研发部, level= 普通员工)

    employeeDataset.registerTempTable("table");
    session.sql("select * from table where age > 30 order by age desc limit 3").show();

    /**
     * +---+----------+--------+----+
     * |age|department|   level|name|
     * +---+----------+--------+----+
     * | 38|    研发部 |    经理 | 张伟 |
     * | 36|    财务部 | 普通员工 | 李丽 |
     * | 31|    研发部 | 普通员工 | 李四 |
     * +---+----------+--------+----+
     */


}

@Getter
@Setter
@AllArgsConstructor
@NoArgsConstructor
@ToString
public static class Employee implements Serializable {
    private String name;
    private Integer age;
    private String department;
    private String level;
}

}

spark maven 依赖, 自行不须要的 spark-streaming,kafka 依赖去掉。

正文完
 0