Transform (transformation operators)
map
package com.journey.core.rdd.transform;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;

import java.util.ArrayList;
import java.util.List;

/**
 * Maps the input records one by one. The mapping can change the element type or just the value.
 */
public class MapRDD {

    public static void main(String[] args) {
        SparkConf conf = new SparkConf()
                .setAppName("MapRDD")
                .setMaster("local[*]");
        JavaSparkContext sc = new JavaSparkContext(conf);

        List<Integer> nums = new ArrayList<>();
        nums.add(1);
        nums.add(2);
        nums.add(3);
        nums.add(4);

        JavaRDD<Integer> numsRDD = sc.parallelize(nums);
        JavaRDD<Integer> mapRDD = numsRDD.map(new Function<Integer, Integer>() {
            @Override
            public Integer call(Integer value) throws Exception {
                return value * 2;
            }
        });
        mapRDD.collect().forEach(System.out::println);

        JavaRDD<String> fileRDD = sc.textFile("datas/apache.log");
        JavaRDD<String> urlRDD = fileRDD.map(new Function<String, String>() {
            @Override
            public String call(String line) throws Exception {
                return line.split(" ")[6];
            }
        });
        urlRDD.collect().forEach(System.out::println);

        sc.stop();
    }
}
mapPartitions
package com.journey.core.rdd.transform;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;

import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

/**
 * Sends the data to the compute nodes one partition at a time; within the function you can do
 * arbitrary processing, including filtering records out.
 *
 * What is the difference between map and mapPartitions?
 *
 * Data-processing view:
 * map executes record by record within a partition, similar to serial processing, while
 * mapPartitions processes a whole partition as one batch.
 *
 * Functionality view:
 * map transforms the records of the source but never increases or decreases their number.
 * mapPartitions receives an iterator and returns an iterator, with no requirement that the number
 * of elements stays the same, so it can add or drop records.
 *
 * Performance view:
 * map is essentially serial, so it is slower; mapPartitions works batch-wise and is faster,
 * but it keeps a whole partition in memory for a long time, which can lead to OutOfMemoryError.
 * With limited memory, map is the safer choice.
 */
public class MapPartitionsRDD {

    public static void main(String[] args) {
        SparkConf conf = new SparkConf()
                .setAppName("MapPartitionsRDD")
                .setMaster("local[*]");
        JavaSparkContext sc = new JavaSparkContext(conf);

        List<Integer> nums = new ArrayList<>();
        nums.add(1);
        nums.add(2);
        nums.add(3);
        nums.add(4);

        JavaRDD<Integer> numsRDD = sc.parallelize(nums, 2);
        JavaRDD<Integer> mapPartitionsRDD = numsRDD.mapPartitions(new FlatMapFunction<Iterator<Integer>, Integer>() {
            @Override
            public Iterator<Integer> call(Iterator<Integer> iterator) throws Exception {
                // Note: this is printed only twice, because there are two partitions and the
                // function is invoked once per partition
                System.out.println("xxxxxxxxxxx");
                List<Integer> result = new ArrayList<>();
                while (iterator.hasNext()) {
                    Integer num = iterator.next();
                    result.add(num * 2);
                }
                return result.iterator();
            }
        });
        mapPartitionsRDD.collect().forEach(System.out::println);

        // Compute the maximum value of each partition
        JavaRDD<Integer> maxPartitionValueRDD = mapPartitionsRDD.mapPartitions(new FlatMapFunction<Iterator<Integer>, Integer>() {
            @Override
            public Iterator<Integer> call(Iterator<Integer> iterator) throws Exception {
                List<Integer> result = new ArrayList<>();
                Integer maxValue = Integer.MIN_VALUE;
                while (iterator.hasNext()) {
                    Integer value = iterator.next();
                    if (value > maxValue) {
                        maxValue = value;
                    }
                }
                result.add(maxValue);
                return result.iterator();
            }
        });
        maxPartitionValueRDD.collect().forEach(System.out::println);

        sc.stop();
    }
}
mapPartitionsWithIndex
package com.journey.core.rdd.transform;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function2;

import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;

/**
 * Like mapPartitions, processes the data one partition at a time and allows arbitrary processing
 * (including filtering), but the function also receives the index of the current partition.
 */
public class MapPartitionsWithIndexRDD {

    public static void main(String[] args) {
        SparkConf conf = new SparkConf()
                .setAppName("MapPartitionsWithIndexRDD")
                .setMaster("local[*]");
        JavaSparkContext sc = new JavaSparkContext(conf);

        List<Integer> nums = new ArrayList<>();
        nums.add(1);
        nums.add(2);
        nums.add(3);
        nums.add(4);

        JavaRDD<Integer> numsRDD = sc.parallelize(nums, 2);

        // The function is declared separately, outside the operator call; it keeps only the data
        // of partition 0
        Function2<Integer, Iterator<Integer>, Iterator<Integer>> mpIndexFunction =
                new Function2<Integer, Iterator<Integer>, Iterator<Integer>>() {
            @Override
            public Iterator<Integer> call(Integer index, Iterator<Integer> iterator) throws Exception {
                if (index == 0) {
                    return iterator;
                }
                // Return an empty iterator for every other partition
                return Collections.emptyIterator();
            }
        };

        // Note the second argument of mapPartitionsWithIndex: preservesPartitioning controls
        // whether the partitioner of the parent RDD is kept
        JavaRDD<Integer> mpRDD = numsRDD.mapPartitionsWithIndex(mpIndexFunction, true);
        mpRDD.collect().forEach(System.out::println);

        sc.stop();
    }
}
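The example above uses the index to filter out whole partitions. As a complementary illustration, here is a minimal sketch (class name and data are assumptions) that instead tags every element with the index of the partition it belongs to:

package com.journey.core.rdd.transform;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function2;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

public class MapPartitionsWithIndexTagRDD {

    public static void main(String[] args) {
        SparkConf conf = new SparkConf()
                .setAppName("MapPartitionsWithIndexTagRDD")
                .setMaster("local[*]");
        JavaSparkContext sc = new JavaSparkContext(conf);

        JavaRDD<Integer> numsRDD = sc.parallelize(Arrays.asList(1, 2, 3, 4), 2);

        // Prefix every element with the index of the partition it lives in
        JavaRDD<String> taggedRDD = numsRDD.mapPartitionsWithIndex(
                new Function2<Integer, Iterator<Integer>, Iterator<String>>() {
                    @Override
                    public Iterator<String> call(Integer index, Iterator<Integer> iterator) throws Exception {
                        List<String> result = new ArrayList<>();
                        while (iterator.hasNext()) {
                            result.add("partition-" + index + " -> " + iterator.next());
                        }
                        return result.iterator();
                    }
                }, false);

        taggedRDD.collect().forEach(System.out::println);

        sc.stop();
    }
}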
flatMap
package com.journey.core.rdd.transform;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

/**
 * Flattens the data first and then maps it (hence "flat map"); in short, one input element can
 * produce multiple output elements.
 */
public class FlatMapRDD {

    public static void main(String[] args) {
        SparkConf conf = new SparkConf()
                .setAppName("FlatMapRDD")
                .setMaster("local[*]");
        JavaSparkContext sc = new JavaSparkContext(conf);

        JavaRDD<String> fileRDD = sc.textFile("datas/wc");
        JavaRDD<String> wordRDD = fileRDD.flatMap(new FlatMapFunction<String, String>() {
            @Override
            public Iterator<String> call(String line) throws Exception {
                return Arrays.stream(line.split(" ")).iterator();
            }
        });
        wordRDD.collect().forEach(System.out::println);

        List<ArrayList<Integer>> nums = new ArrayList<>();
        ArrayList<Integer> nums1 = new ArrayList<>();
        nums1.add(1);
        nums1.add(2);
        nums.add(nums1);
        ArrayList<Integer> nums2 = new ArrayList<>();
        nums2.add(3);
        nums2.add(4);
        nums.add(nums2);

        JavaRDD<ArrayList<Integer>> numsRDD = sc.parallelize(nums);
        JavaRDD<Integer> numsFlatMapRDD = numsRDD.flatMap(new FlatMapFunction<ArrayList<Integer>, Integer>() {
            @Override
            public Iterator<Integer> call(ArrayList<Integer> integers) throws Exception {
                return integers.iterator();
            }
        });
        numsFlatMapRDD.collect().forEach(System.out::println);

        sc.stop();
    }
}
mapValues
package com.journey.core.rdd.transform;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import scala.Tuple2;

import java.util.ArrayList;
import java.util.List;

/**
 * Operates on the values only, leaving the keys (and the partitioning) untouched.
 */
public class MapValuesRDD {

    public static void main(String[] args) {
        SparkConf conf = new SparkConf()
                .setAppName("MapValuesRDD")
                .setMaster("local[*]");
        JavaSparkContext sc = new JavaSparkContext(conf);

        List<Tuple2<String, Integer>> userInfos = new ArrayList<>();
        userInfos.add(Tuple2.apply("Alice", 300));
        userInfos.add(Tuple2.apply("zhangsan", 200));
        userInfos.add(Tuple2.apply("lisi", 309));
        userInfos.add(Tuple2.apply("wagnwu", 201));
        userInfos.add(Tuple2.apply("mayun", 234));
        userInfos.add(Tuple2.apply("haha", 223));

        JavaPairRDD<String, Integer> userInfosRDD = sc.parallelizePairs(userInfos, 2);

        // Give everyone a raise of 100
        JavaPairRDD<String, Integer> userInfosSalaryAdd100 = userInfosRDD.mapValues(new Function<Integer, Integer>() {
            @Override
            public Integer call(Integer v1) throws Exception {
                return v1 + 100;
            }
        });
        userInfosSalaryAdd100.collect().forEach(System.out::println);

        sc.stop();
    }
}
glom
package com.journey.core.rdd.transform;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

/**
 * Turns the data of each partition directly into an in-memory list of the same element type;
 * the partitioning stays unchanged.
 */
public class GlomRDD {

    public static void main(String[] args) {
        SparkConf conf = new SparkConf()
                .setAppName("GlomRDD")
                .setMaster("local[*]");
        JavaSparkContext sc = new JavaSparkContext(conf);

        List<Integer> nums = new ArrayList<>();
        nums.add(1);
        nums.add(2);
        nums.add(3);
        nums.add(4);

        JavaRDD<Integer> numsRDD = sc.parallelize(nums, 2);

        // Take the max of each partition, then sum the per-partition maxima on the driver
        JavaRDD<List<Integer>> glomRDD = numsRDD.glom();
        JavaRDD<Integer> mapRDD = glomRDD.map(new Function<List<Integer>, Integer>() {
            @Override
            public Integer call(List<Integer> nums) throws Exception {
                return Collections.max(nums);
            }
        });
        List<Integer> resultList = mapRDD.collect();
        Integer result = resultList.stream().reduce(Integer::sum).orElse(0);
        System.out.println(result);

        sc.stop();
    }
}
groupBy
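groupBy, unlike groupByKey further below, groups the elements of any RDD by the result of a function rather than by an existing key. A minimal sketch (class name and data are assumptions) that groups words by their first letter:

package com.journey.core.rdd.transform;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;

import java.util.Arrays;

public class GroupByRDD {

    public static void main(String[] args) {
        SparkConf conf = new SparkConf()
                .setAppName("GroupByRDD")
                .setMaster("local[*]");
        JavaSparkContext sc = new JavaSparkContext(conf);

        JavaRDD<String> wordsRDD = sc.parallelize(Arrays.asList("Hello", "Spark", "Scala", "World"), 2);

        // The grouping key is computed by the function: here, the first letter of each word
        JavaPairRDD<String, Iterable<String>> groupByRDD = wordsRDD.groupBy(new Function<String, String>() {
            @Override
            public String call(String word) throws Exception {
                return word.substring(0, 1);
            }
        });
        groupByRDD.collect().forEach(System.out::println);

        sc.stop();
    }
}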
filter
package com.journey.core.rdd.transform;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;

/**
 * Filters the data by the given predicate: records that match are kept, the rest are discarded.
 * After filtering the number of partitions is unchanged, but the data inside the partitions may
 * become unbalanced. In production this can lead to data skew, so it is common to repartition
 * after a filter.
 */
public class FilterRDD {

    public static void main(String[] args) {
        SparkConf conf = new SparkConf()
                .setAppName("FilterRDD")
                .setMaster("local[*]");
        JavaSparkContext sc = new JavaSparkContext(conf);

        JavaRDD<String> logFileRDD = sc.textFile("datas/apache.log");
        JavaRDD<String> filterRDD = logFileRDD.filter(new Function<String, Boolean>() {
            @Override
            public Boolean call(String value) throws Exception {
                return value.contains("7/05/2015");
            }
        });
        JavaRDD<String> mapRDD = filterRDD.map(new Function<String, String>() {
            @Override
            public String call(String value) throws Exception {
                String[] fields = value.split(" ");
                return fields[6];
            }
        });
        mapRDD.collect().forEach(System.out::println);

        sc.stop();
    }
}
sample
package com.journey.core.rdd.transform;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;

import java.util.ArrayList;
import java.util.List;

/**
 * Mainly used to take a quick look at the distribution of the data.
 */
public class SampleRDD {

    public static void main(String[] args) {
        SparkConf conf = new SparkConf()
                .setAppName("SampleRDD")
                .setMaster("local[*]");
        JavaSparkContext sc = new JavaSparkContext(conf);

        List<Integer> nums = new ArrayList<>();
        nums.add(1);
        nums.add(2);
        nums.add(3);
        nums.add(4);

        JavaRDD<Integer> numsRDD = sc.parallelize(nums);

        /**
         * First argument  : whether to sample with replacement (false: without, true: with)
         * Second argument : the fraction; without replacement it is the probability of selecting
         *                   each element, in [0, 1]; with replacement it is the expected number
         *                   of times each element is picked and may be greater than 1
         * Third argument  : the random seed (optional overload)
         */
        JavaRDD<Integer> sampleRDD1 = numsRDD.sample(false, 0.5);
        JavaRDD<Integer> sampleRDD2 = numsRDD.sample(true, 3);
        sampleRDD1.collect().forEach(System.out::println);
        System.out.println("**************************");
        sampleRDD2.collect().forEach(System.out::println);

        sc.stop();
    }
}
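The comment above also mentions a random-seed argument that the two calls leave out. A minimal sketch (class name and data are assumptions) of the three-argument overload, which makes the sample reproducible across runs:

package com.journey.core.rdd.transform;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;

import java.util.Arrays;

public class SampleWithSeedRDD {

    public static void main(String[] args) {
        SparkConf conf = new SparkConf()
                .setAppName("SampleWithSeedRDD")
                .setMaster("local[*]");
        JavaSparkContext sc = new JavaSparkContext(conf);

        JavaRDD<Integer> numsRDD = sc.parallelize(Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8));

        // With a fixed seed the same elements are sampled on every run
        JavaRDD<Integer> seededSampleRDD = numsRDD.sample(false, 0.5, 42L);
        seededSampleRDD.collect().forEach(System.out::println);

        sc.stop();
    }
}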
distinct
package com.journey.core.rdd.transform;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;

import java.util.ArrayList;
import java.util.List;

/**
 * Removes duplicate elements from the dataset.
 */
public class DistinctRDD {

    public static void main(String[] args) {
        SparkConf conf = new SparkConf()
                .setAppName("DistinctRDD")
                .setMaster("local[*]");
        JavaSparkContext sc = new JavaSparkContext(conf);

        List<Integer> nums = new ArrayList<>();
        nums.add(1);
        nums.add(1);
        nums.add(2);
        nums.add(3);
        nums.add(3);
        nums.add(1);

        JavaRDD<Integer> numsRDD = sc.parallelize(nums, 2);
        JavaRDD<Integer> distinctRDD = numsRDD.distinct(2);
        distinctRDD.collect().forEach(System.out::println);

        sc.stop();
    }
}
coalesce
package com.journey.core.rdd.transform;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;

import java.util.ArrayList;
import java.util.List;

/**
 * Shrinks the number of partitions according to the data volume; typically used after filtering a
 * large dataset so that the resulting small dataset executes more efficiently.
 * When a Spark job ends up with too many small tasks, coalesce can merge partitions, reducing the
 * partition count and therefore the task-scheduling overhead.
 */
public class CoalesceRDD {

    public static void main(String[] args) {
        SparkConf conf = new SparkConf()
                .setAppName("CoalesceRDD")
                .setMaster("local[*]");
        JavaSparkContext sc = new JavaSparkContext(conf);

        List<Integer> nums = new ArrayList<>();
        nums.add(1);
        nums.add(2);
        nums.add(3);
        nums.add(4);
        nums.add(5);
        nums.add(6);

        JavaRDD<Integer> numsRDD = sc.parallelize(nums, 6);

        /**
         * Note that coalesce defaults to shuffle = false, i.e. when shrinking it simply merges
         * existing partitions. Without a shuffle, coalesce cannot increase the number of partitions.
         */
        JavaRDD<Integer> coalesceRDD = numsRDD.coalesce(2);
        coalesceRDD.saveAsTextFile("datas/output");

        sc.stop();
    }
}
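To illustrate the note above, coalesce also has a two-argument overload, coalesce(numPartitions, shuffle); with shuffle = true it can increase the partition count as well. A minimal sketch (class name and data are assumptions):

package com.journey.core.rdd.transform;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;

import java.util.Arrays;

public class CoalesceShuffleRDD {

    public static void main(String[] args) {
        SparkConf conf = new SparkConf()
                .setAppName("CoalesceShuffleRDD")
                .setMaster("local[*]");
        JavaSparkContext sc = new JavaSparkContext(conf);

        JavaRDD<Integer> numsRDD = sc.parallelize(Arrays.asList(1, 2, 3, 4, 5, 6), 2);

        // coalesce(4) alone would keep 2 partitions; with shuffle = true the RDD can grow to 4
        JavaRDD<Integer> widenedRDD = numsRDD.coalesce(4, true);
        System.out.println(widenedRDD.getNumPartitions());

        sc.stop();
    }
}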
repartition
package com.journey.core.rdd.transform;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;

import java.util.ArrayList;
import java.util.List;

/**
 * Internally this simply calls coalesce with shuffle = true. repartition can turn an RDD with many
 * partitions into one with fewer partitions, and vice versa, because it always goes through a
 * shuffle.
 */
public class RepartitionRDD {

    public static void main(String[] args) {
        SparkConf conf = new SparkConf()
                .setAppName("RepartitionRDD")
                .setMaster("local[*]");
        JavaSparkContext sc = new JavaSparkContext(conf);

        List<Integer> nums = new ArrayList<>();
        nums.add(1);
        nums.add(2);
        nums.add(3);
        nums.add(4);
        nums.add(5);
        nums.add(6);

        JavaRDD<Integer> numsRDD = sc.parallelize(nums, 6);
        JavaRDD<Integer> repartitionRDD = numsRDD.repartition(10);
        repartitionRDD.saveAsTextFile("datas/output");

        sc.stop();
    }
}
intersection & union & subtract & zip
package com.journey.core.rdd.transform;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;

import java.util.ArrayList;
import java.util.List;

/**
 * intersection returns the elements that appear in both RDDs, union concatenates the two RDDs,
 * subtract removes from the first RDD the elements that also appear in the second, and zip pairs
 * up the elements of the two RDDs by position.
 * These operators require the two RDDs to have the same element type; zip additionally requires
 * the same number of partitions and the same number of elements per partition.
 */
public class IntersectionRDD {

    public static void main(String[] args) {
        SparkConf conf = new SparkConf()
                .setAppName("IntersectionRDD")
                .setMaster("local[*]");
        JavaSparkContext sc = new JavaSparkContext(conf);

        List<Integer> nums1 = new ArrayList<>();
        nums1.add(1);
        nums1.add(2);
        nums1.add(3);
        nums1.add(4);

        List<Integer> nums2 = new ArrayList<>();
        nums2.add(3);
        nums2.add(4);
        nums2.add(5);
        nums2.add(6);

        // A List<String> such as nums3 could not be intersected with an Integer RDD (type mismatch)
        List<String> nums3 = new ArrayList<>();
        nums3.add("3");

        JavaRDD<Integer> nums1RDD = sc.parallelize(nums1, 1);
        JavaRDD<Integer> nums2RDD = sc.parallelize(nums2, 1);

        // Both sides must have the same element type
        JavaRDD<Integer> intersectionRDD = nums1RDD.intersection(nums2RDD);
        JavaRDD<Integer> unionRDD = nums1RDD.union(nums2RDD);
        // Both sides must have the same element type
        JavaRDD<Integer> subtractRDD = nums1RDD.subtract(nums2RDD);
        // Same element type and the same number of partitions (and elements per partition)
        JavaPairRDD<Integer, Integer> zipRDD = nums1RDD.zip(nums2RDD);

        intersectionRDD.collect().forEach(System.out::println);
        System.out.println("******************************");
        unionRDD.collect().forEach(System.out::println);
        System.out.println("******************************");
        subtractRDD.collect().forEach(System.out::println);
        System.out.println("******************************");
        zipRDD.collect().forEach(System.out::println);

        sc.stop();
    }
}
partitionBy
package com.journey.core.rdd.transform;

import org.apache.spark.Partitioner;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaSparkContext;
import scala.Tuple2;

import java.util.ArrayList;
import java.util.List;

/**
 * Repartitions the data according to the given Partitioner. Spark's default partitioner is
 * HashPartitioner.
 */
public class PartitionerByRDD {

    public static void main(String[] args) {
        SparkConf conf = new SparkConf()
                .setAppName("PartitionerByRDD")
                .setMaster("local[*]");
        JavaSparkContext sc = new JavaSparkContext(conf);

        List<Tuple2<String, String>> infos = new ArrayList<>();
        infos.add(Tuple2.apply("1305261989234", "zhangsan"));
        infos.add(Tuple2.apply("1505261989234", "lisi"));
        infos.add(Tuple2.apply("1305261982343", "wagnwu"));
        infos.add(Tuple2.apply("1505261382343", "zhaoliu"));

        // Put keys starting with 130 into one partition and keys starting with 150 into another
        // TODO Note: for pair data, parallelizePairs must be used instead of parallelize
        JavaPairRDD<String, String> infosRDD = sc.parallelizePairs(infos, 2);
        JavaPairRDD<String, String> partitionByRDD = infosRDD.partitionBy(new Partitioner() {
            @Override
            public int numPartitions() {
                return 2;
            }

            @Override
            public int getPartition(Object key) {
                String item = key.toString();
                if (item.startsWith("130")) {
                    return 0;
                } else if (item.startsWith("150")) {
                    return 1;
                }
                return 0;
            }
        });
        partitionByRDD.collect().forEach(System.out::println);

        sc.stop();
    }
}
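Since HashPartitioner is named above as Spark's default partitioner, here is a minimal sketch (class name and data are assumptions) of passing it to partitionBy explicitly; keys are routed roughly by the hash of the key modulo the number of partitions:

package com.journey.core.rdd.transform;

import org.apache.spark.HashPartitioner;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaSparkContext;
import scala.Tuple2;

import java.util.Arrays;

public class HashPartitionerRDD {

    public static void main(String[] args) {
        SparkConf conf = new SparkConf()
                .setAppName("HashPartitionerRDD")
                .setMaster("local[*]");
        JavaSparkContext sc = new JavaSparkContext(conf);

        JavaPairRDD<String, Integer> pairsRDD = sc.parallelizePairs(Arrays.asList(
                Tuple2.apply("Hello", 1),
                Tuple2.apply("Spark", 1),
                Tuple2.apply("World", 1)), 3);

        // Redistribute the pairs into 2 partitions by the hash of the key
        JavaPairRDD<String, Integer> hashPartitionedRDD = pairsRDD.partitionBy(new HashPartitioner(2));
        System.out.println(hashPartitionedRDD.getNumPartitions());

        sc.stop();
    }
}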
reduceByKey
package com.journey.core.rdd.transform;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import scala.Tuple2;

import java.util.ArrayList;
import java.util.List;

/**
 * Aggregates the values that share the same key.
 */
public class ReduceByKeyRDD {

    public static void main(String[] args) {
        SparkConf conf = new SparkConf()
                .setAppName("ReduceByKeyRDD")
                .setMaster("local[*]");
        JavaSparkContext sc = new JavaSparkContext(conf);

        List<String> words = new ArrayList<>();
        words.add("Hello");
        words.add("Spark");
        words.add("Spark");
        words.add("World");

        JavaRDD<String> wordsRDD = sc.parallelize(words);
        JavaPairRDD<String, Integer> wordToPairRDD = wordsRDD.mapToPair(new PairFunction<String, String, Integer>() {
            @Override
            public Tuple2<String, Integer> call(String word) throws Exception {
                return Tuple2.apply(word, 1);
            }
        });

        JavaPairRDD<String, Integer> wordCountRDD = wordToPairRDD.reduceByKey(new Function2<Integer, Integer, Integer>() {
            @Override
            public Integer call(Integer v1, Integer v2) throws Exception {
                return v1 + v2;
            }
        });
        wordCountRDD.collect().forEach(System.out::println);

        sc.stop();
    }
}
groupByKey
package com.journey.core.rdd.transform;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.PairFunction;
import scala.Tuple2;

import java.util.ArrayList;
import java.util.Collection;
import java.util.List;

/**
 * What is the difference between reduceByKey and groupByKey?
 *
 * Shuffle perspective: both involve a shuffle, but reduceByKey can pre-aggregate (combine) the
 * records that share a key within each partition before the shuffle, which reduces the amount of
 * data written to disk, so reduceByKey performs better. groupByKey only groups the data and does
 * not reduce its volume.
 *
 * Functionality perspective: reduceByKey combines grouping with aggregation, while groupByKey can
 * only group, not aggregate. For group-and-aggregate scenarios, prefer reduceByKey; if you only
 * need grouping without aggregation, groupByKey is the one to use.
 */
public class GroupByKeyRDD {

    public static void main(String[] args) {
        SparkConf conf = new SparkConf()
                .setAppName("GroupByKeyRDD")
                .setMaster("local[*]");
        JavaSparkContext sc = new JavaSparkContext(conf);

        List<String> words = new ArrayList<>();
        words.add("Hello");
        words.add("Spark");
        words.add("Spark");
        words.add("World");

        JavaRDD<String> wordsRDD = sc.parallelize(words);
        JavaPairRDD<String, Integer> wordToPairRDD = wordsRDD.mapToPair(new PairFunction<String, String, Integer>() {
            @Override
            public Tuple2<String, Integer> call(String word) throws Exception {
                return Tuple2.apply(word, 1);
            }
        });

        JavaPairRDD<String, Iterable<Integer>> wordGroupByRDD = wordToPairRDD.groupByKey();
        JavaPairRDD<String, Integer> wordCountRDD = wordGroupByRDD.mapValues(new Function<Iterable<Integer>, Integer>() {
            @Override
            public Integer call(Iterable<Integer> iterable) throws Exception {
                return ((Collection<?>) iterable).size();
            }
        });
        wordCountRDD.collect().forEach(System.out::println);

        sc.stop();
    }
}
aggregateByKey
package com.journey.core.rdd.transform;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function2;
import scala.Tuple2;

import java.util.ArrayList;
import java.util.List;

/**
 * First argument  : the initial (zero) value
 * Second argument : the combine function used within each partition
 * Third argument  : the combine function used across partitions
 */
public class AggregateByKeyRDD {

    public static void main(String[] args) {
        SparkConf conf = new SparkConf()
                .setAppName("AggregateByKeyRDD")
                .setMaster("local[*]");
        JavaSparkContext sc = new JavaSparkContext(conf);

        List<Tuple2<String, Integer>> words = new ArrayList<>();
        words.add(Tuple2.apply("Hello", 3));
        words.add(Tuple2.apply("Spark", 2));
        words.add(Tuple2.apply("Hello", 10));
        words.add(Tuple2.apply("Spark", 17));

        JavaPairRDD<String, Integer> wordsRDD = sc.parallelizePairs(words, 2);

        // The initial value of aggregateByKey only takes part in the intra-partition computation
        JavaPairRDD<String, Integer> aggregateByKeyRDD = wordsRDD.aggregateByKey(10, new Function2<Integer, Integer, Integer>() {
            @Override
            public Integer call(Integer v1, Integer v2) throws Exception {
                return v1 + v2;
            }
        }, new Function2<Integer, Integer, Integer>() {
            @Override
            public Integer call(Integer v1, Integer v2) throws Exception {
                return v1 + v2;
            }
        });
        aggregateByKeyRDD.collect().forEach(System.out::println);

        // Different rules within and across partitions: max within a partition, sum across partitions
        JavaPairRDD<String, Integer> aggregateByKeyRDD2 = wordsRDD.aggregateByKey(10, new Function2<Integer, Integer, Integer>() {
            @Override
            public Integer call(Integer v1, Integer v2) throws Exception {
                // Maximum within each partition
                return Math.max(v1, v2);
            }
        }, new Function2<Integer, Integer, Integer>() {
            @Override
            public Integer call(Integer v1, Integer v2) throws Exception {
                return v1 + v2;
            }
        });
        aggregateByKeyRDD2.collect().forEach(System.out::println);

        sc.stop();
    }
}
foldByKey
package com.journey.core.rdd.transform;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function2;
import scala.Tuple2;

import java.util.ArrayList;
import java.util.List;

/**
 * First argument  : the initial (zero) value
 * Second argument : the combine function, used both within and across partitions
 */
public class FoldByKeyRDD {

    public static void main(String[] args) {
        SparkConf conf = new SparkConf()
                .setAppName("FoldByKeyRDD")
                .setMaster("local[*]");
        JavaSparkContext sc = new JavaSparkContext(conf);

        List<Tuple2<String, Integer>> words = new ArrayList<>();
        words.add(Tuple2.apply("Hello", 3));
        words.add(Tuple2.apply("Spark", 2));
        words.add(Tuple2.apply("Hello", 10));
        words.add(Tuple2.apply("Spark", 17));

        JavaPairRDD<String, Integer> wordsRDD = sc.parallelizePairs(words, 2);
        JavaPairRDD<String, Integer> foldByKeyRDD = wordsRDD.foldByKey(10, new Function2<Integer, Integer, Integer>() {
            @Override
            public Integer call(Integer v1, Integer v2) throws Exception {
                return v1 + v2;
            }
        });
        foldByKeyRDD.collect().forEach(System.out::println);

        sc.stop();
    }
}
combineByKey
package com.journey.core.rdd.transform;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.Function2;
import scala.Tuple2;

import java.util.ArrayList;
import java.util.List;

/**
 * Computes the average value per key.
 * First argument  : converts the first value of each key into the accumulator structure
 * Second argument : the combine function used within each partition
 * Third argument  : the combine function used across partitions
 *
 * reduceByKey    : the first value of each key is taken as-is, without any computation; the
 *                  intra- and inter-partition rules are the same
 * foldByKey      : the first value of each key is combined with the initial value inside the
 *                  partition; the intra- and inter-partition rules are the same
 * aggregateByKey : the first value of each key is combined with the initial value inside the
 *                  partition; the intra- and inter-partition rules may differ
 * combineByKey   : when the value type does not fit the computation, the first value can be
 *                  converted into a different structure; the intra- and inter-partition rules
 *                  may differ
 */
public class CombineByKeyRDD {

    public static void main(String[] args) {
        SparkConf conf = new SparkConf()
                .setAppName("CombineByKeyRDD")
                .setMaster("local[*]");
        JavaSparkContext sc = new JavaSparkContext(conf);

        List<Tuple2<String, Integer>> words = new ArrayList<>();
        words.add(Tuple2.apply("Hello", 3));
        words.add(Tuple2.apply("Spark", 2));
        words.add(Tuple2.apply("Hello", 3));
        words.add(Tuple2.apply("Spark", 2));
        words.add(Tuple2.apply("Spark", 2));
        words.add(Tuple2.apply("Spark", 2));

        JavaPairRDD<String, Integer> wordsRDD = sc.parallelizePairs(words, 2);

        // The accumulator is (sum, count); the average per key is sum / count
        JavaPairRDD<String, Tuple2<Integer, Integer>> combineByKeyRDD = wordsRDD.combineByKey(new Function<Integer, Tuple2<Integer, Integer>>() {
            @Override
            public Tuple2<Integer, Integer> call(Integer v1) throws Exception {
                return Tuple2.apply(v1, 1);
            }
        }, new Function2<Tuple2<Integer, Integer>, Integer, Tuple2<Integer, Integer>>() {
            @Override
            public Tuple2<Integer, Integer> call(Tuple2<Integer, Integer> v1, Integer v2) throws Exception {
                return Tuple2.apply(v1._1 + v2, v1._2 + 1);
            }
        }, new Function2<Tuple2<Integer, Integer>, Tuple2<Integer, Integer>, Tuple2<Integer, Integer>>() {
            @Override
            public Tuple2<Integer, Integer> call(Tuple2<Integer, Integer> v1, Tuple2<Integer, Integer> v2) throws Exception {
                return Tuple2.apply(v1._1 + v2._1, v1._2 + v2._2);
            }
        });

        combineByKeyRDD.collect().forEach(t -> {
            String key = t._1;
            Tuple2<Integer, Integer> tuple = t._2;
            System.out.println(key + ":" + tuple._1 / tuple._2);
        });

        // combineByKey can also implement a plain word count
        JavaPairRDD<String, Integer> wordCountRDD = wordsRDD.combineByKey(new Function<Integer, Integer>() {
            @Override
            public Integer call(Integer v1) throws Exception {
                return v1;
            }
        }, new Function2<Integer, Integer, Integer>() {
            @Override
            public Integer call(Integer v1, Integer v2) throws Exception {
                return v1 + v2;
            }
        }, new Function2<Integer, Integer, Integer>() {
            @Override
            public Integer call(Integer v1, Integer v2) throws Exception {
                return v1 + v2;
            }
        });
        wordCountRDD.collect().forEach(System.out::println);

        sc.stop();
    }
}
sortByKey
package com.journey.core.rdd.transform;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaSparkContext;
import scala.Tuple2;

import java.util.ArrayList;
import java.util.List;

/**
 * Sorts the RDD by key.
 */
public class SortByKeyRDD {

    public static void main(String[] args) {
        SparkConf conf = new SparkConf()
                .setAppName("SortByKeyRDD")
                .setMaster("local[*]");
        JavaSparkContext sc = new JavaSparkContext(conf);

        List<Tuple2<String, Integer>> words = new ArrayList<>();
        words.add(Tuple2.apply("Alice", 3));
        words.add(Tuple2.apply("zhangsan", 2));
        words.add(Tuple2.apply("lisi", 3));
        words.add(Tuple2.apply("wagnwu", 2));
        words.add(Tuple2.apply("mayun", 2));
        words.add(Tuple2.apply("haha", 2));

        JavaPairRDD<String, Integer> wordsRDD = sc.parallelizePairs(words, 2);

        // Ascending by default; descending order or a custom ordering can also be specified
        JavaPairRDD<String, Integer> sortWordsRDD = wordsRDD.sortByKey(true);
        sortWordsRDD.collect().forEach(System.out::println);

        sc.stop();
    }
}
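The comment above hints at the other overloads: sortByKey(false) sorts in descending order, and sortByKey(comparator) applies a custom ordering. A minimal sketch (class name, comparator and data are assumptions); note the comparator must be Serializable because it is shipped to the executors:

package com.journey.core.rdd.transform;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaSparkContext;
import scala.Tuple2;

import java.io.Serializable;
import java.util.Arrays;
import java.util.Comparator;

public class SortByKeyCustomRDD {

    // Orders keys by their length; must be Serializable so it can be sent to the executors
    public static class LengthComparator implements Comparator<String>, Serializable {
        @Override
        public int compare(String o1, String o2) {
            return Integer.compare(o1.length(), o2.length());
        }
    }

    public static void main(String[] args) {
        SparkConf conf = new SparkConf()
                .setAppName("SortByKeyCustomRDD")
                .setMaster("local[*]");
        JavaSparkContext sc = new JavaSparkContext(conf);

        JavaPairRDD<String, Integer> wordsRDD = sc.parallelizePairs(Arrays.asList(
                Tuple2.apply("Alice", 3),
                Tuple2.apply("zhangsan", 2),
                Tuple2.apply("lisi", 3)), 2);

        // Descending order on the natural String ordering
        JavaPairRDD<String, Integer> descRDD = wordsRDD.sortByKey(false);
        descRDD.collect().forEach(System.out::println);

        // Custom ordering: shortest key first
        JavaPairRDD<String, Integer> byLengthRDD = wordsRDD.sortByKey(new LengthComparator());
        byLengthRDD.collect().forEach(System.out::println);

        sc.stop();
    }
}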
join & leftOuterJoin
package com.journey.core.rdd.transform;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.Optional;
import scala.Tuple2;

import java.util.ArrayList;
import java.util.List;

/**
 * Called on RDDs of type (K, V) and (K, W); returns an RDD of (K, (V, W)) in which every pair of
 * elements that share a key is joined together.
 */
public class JoinRDD {

    public static void main(String[] args) {
        SparkConf conf = new SparkConf()
                .setAppName("JoinRDD")
                .setMaster("local[*]");
        JavaSparkContext sc = new JavaSparkContext(conf);

        List<Tuple2<Integer, String>> userInfos = new ArrayList<>();
        userInfos.add(Tuple2.apply(1, "zhagnsan"));
        userInfos.add(Tuple2.apply(2, "lisi"));
        userInfos.add(Tuple2.apply(3, "lisi"));

        List<Tuple2<Integer, String>> orders = new ArrayList<>();
        orders.add(Tuple2.apply(1, "iphone pad"));
        orders.add(Tuple2.apply(1, "mac pad"));
        orders.add(Tuple2.apply(2, "java book"));

        JavaPairRDD<Integer, String> userInfosRDD = sc.parallelizePairs(userInfos, 2);
        JavaPairRDD<Integer, String> ordersRDD = sc.parallelizePairs(orders, 2);

        JavaPairRDD<Integer, Tuple2<String, String>> joinRDD = userInfosRDD.join(ordersRDD);
        joinRDD.collect().forEach(System.out::println);

        // Left outer join: every record of the left RDD is kept; when there is no match on the
        // right, the value is Optional.empty
        JavaPairRDD<Integer, Tuple2<String, Optional<String>>> leftOuterJoinRDD = userInfosRDD.leftOuterJoin(ordersRDD);
        leftOuterJoinRDD.collect().forEach(System.out::println);

        sc.stop();
    }
}
cogroup
package com.journey.core.rdd.transform;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaSparkContext;
import scala.Tuple2;

import java.util.ArrayList;
import java.util.List;

/**
 * Gathers the records with the same key together; the value is one collection (Iterable) per
 * source RDD.
 */
public class CogroupRDD {

    public static void main(String[] args) {
        SparkConf conf = new SparkConf()
                .setAppName("CogroupRDD")
                .setMaster("local[*]");
        JavaSparkContext sc = new JavaSparkContext(conf);

        List<Tuple2<Integer, String>> userInfos = new ArrayList<>();
        userInfos.add(Tuple2.apply(1, "zhagnsan"));
        userInfos.add(Tuple2.apply(2, "lisi"));
        userInfos.add(Tuple2.apply(3, "lisi"));

        List<Tuple2<Integer, String>> orders = new ArrayList<>();
        orders.add(Tuple2.apply(1, "iphone pad"));
        orders.add(Tuple2.apply(1, "mac pad"));
        orders.add(Tuple2.apply(2, "java book"));

        JavaPairRDD<Integer, String> userInfosRDD = sc.parallelizePairs(userInfos, 2);
        JavaPairRDD<Integer, String> ordersRDD = sc.parallelizePairs(orders, 2);

        JavaPairRDD<Integer, Tuple2<Iterable<String>, Iterable<String>>> cogroupRDD = userInfosRDD.cogroup(ordersRDD);
        cogroupRDD.collect().forEach(System.out::println);

        sc.stop();
    }
}
Top N example
package com.journey.core.rdd.transform;

import org.apache.commons.collections.IteratorUtils;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import scala.Tuple2;

import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;

/**
 * Without wrapping the sub-list (see the comment inside the code), the job fails with:
 *
 * Serialization stack:
 *     - object not serializable (class: java.util.ArrayList$SubList, value: [(16,26), (26,25), (1,23)])
 *     - field (class: scala.Tuple2, name: _2, type: class java.lang.Object)
 *     - object (class scala.Tuple2, (7,[(16,26), (26,25), (1,23)]))
 *     - element of array (index: 0)
 *     - array (class [Lscala.Tuple2;, size 5)
 *     at org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:41)
 *     at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:47)
 *     at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:101)
 *     at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:489)
 *     at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
 *     at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
 *     at java.base/java.lang.Thread.run(Thread.java:835)
 * 23/05/09 20:29:01 ERROR Executor: Exception in task 0.0 in stage 2.0 (TID 4)
 * java.io.NotSerializableException: java.util.ArrayList$SubList
 *
 * Fix:
 * It's because the List returned by subList() is an instance of 'RandomAccessSubList', which is
 * not serializable. Therefore you need to create a new ArrayList object from the list returned by
 * subList().
 */
public class Demo {

    public static void main(String[] args) {
        SparkConf conf = new SparkConf()
                .setAppName("Demo")
                .setMaster("local[*]");
        JavaSparkContext sc = new JavaSparkContext(conf);

        JavaRDD<String> logRDD = sc.textFile("datas/agent.log");

        // ((province, ad), 1)
        JavaPairRDD<Tuple2<String, String>, Integer> proviceAdRDD = logRDD.mapToPair(new PairFunction<String, Tuple2<String, String>, Integer>() {
            @Override
            public Tuple2<Tuple2<String, String>, Integer> call(String line) throws Exception {
                String[] fields = line.split(" ");
                String provice = fields[1];
                String ad = fields[4];
                return Tuple2.apply(Tuple2.apply(provice, ad), 1);
            }
        });

        // ((province, ad), count)
        JavaPairRDD<Tuple2<String, String>, Integer> proviceAdToCountRDD = proviceAdRDD.reduceByKey(new Function2<Integer, Integer, Integer>() {
            @Override
            public Integer call(Integer v1, Integer v2) throws Exception {
                return v1 + v2;
            }
        });

        // (province, (ad, count))
        JavaPairRDD<String, Tuple2<String, Integer>> proviceToAdCountRDD = proviceAdToCountRDD.mapToPair(new PairFunction<Tuple2<Tuple2<String, String>, Integer>, String, Tuple2<String, Integer>>() {
            @Override
            public Tuple2<String, Tuple2<String, Integer>> call(Tuple2<Tuple2<String, String>, Integer> value) throws Exception {
                return Tuple2.apply(value._1._1, Tuple2.apply(value._1._2, value._2));
            }
        });

        JavaPairRDD<String, Iterable<Tuple2<String, Integer>>> proviceToAdGroupRDD = proviceToAdCountRDD.groupByKey();

        // Sort inside each group and take the top N of the group
        JavaPairRDD<String, Iterable<Tuple2<String, Integer>>> proviceToAdTop3RDD = proviceToAdGroupRDD.mapToPair(new PairFunction<Tuple2<String, Iterable<Tuple2<String, Integer>>>, String, Iterable<Tuple2<String, Integer>>>() {
            @Override
            public Tuple2<String, Iterable<Tuple2<String, Integer>>> call(Tuple2<String, Iterable<Tuple2<String, Integer>>> iterable) throws Exception {
                List<Tuple2<String, Integer>> result = IteratorUtils.toList(iterable._2.iterator());
                Collections.sort(result, new Comparator<Tuple2<String, Integer>>() {
                    @Override
                    public int compare(Tuple2<String, Integer> o1, Tuple2<String, Integer> o2) {
                        return o2._2 - o1._2;
                    }
                });
                // Important: wrap the result in new ArrayList<>(...), because the view returned by
                // subList() is not serializable; also guard against groups with fewer than 3 ads
                int topN = Math.min(3, result.size());
                return Tuple2.apply(iterable._1, new ArrayList<>(result.subList(0, topN)));
            }
        });

//        proviceToAdTop3RDD.foreach(new VoidFunction<Tuple2<String, Iterable<Tuple2<String, Integer>>>>() {
//            @Override
//            public void call(Tuple2<String, Iterable<Tuple2<String, Integer>>> stringIterableTuple2) throws Exception {
//                System.out.println(stringIterableTuple2);
//            }
//        });
        proviceToAdTop3RDD.collect().forEach(System.out::println);

        sc.stop();
    }
}