共计 32733 个字符,预计需要花费 82 分钟才能阅读完成。
Transform(转换算子)
map
package com.journey.core.rdd.transform;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import java.util.ArrayList;
import java.util.List;
/**
 * Demonstrates the {@code map} transformation: every element is converted one at a time.
 * The conversion may change the element's type or only its value.
 */
public class MapRDD {

    public static void main(String[] args) {
        SparkConf conf = new SparkConf();
        conf.setAppName("MapRDD");
        conf.setMaster("local[*]");
        JavaSparkContext sc = new JavaSparkContext(conf);

        // Input: the integers 1 through 4.
        List<Integer> nums = new ArrayList<>();
        for (int n = 1; n <= 4; n++) {
            nums.add(n);
        }

        // Value conversion: double every number.
        JavaRDD<Integer> doubledRDD = sc.parallelize(nums).map(value -> value * 2);
        for (Integer doubled : doubledRDD.collect()) {
            System.out.println(doubled);
        }

        // Keep only the field at index 6 of each space-separated log line
        // (presumably the request URL, going by the RDD's name).
        JavaRDD<String> urlRDD = sc.textFile("datas/apache.log").map(line -> {
            String[] fields = line.split(" ");
            return fields[6];
        });
        for (String url : urlRDD.collect()) {
            System.out.println(url);
        }

        sc.stop();
    }
}
mapPartitions
package com.journey.core.rdd.transform;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
/**
 * Demonstrates {@code mapPartitions}: data is handed to the function one whole partition at a
 * time, and the function may do anything with it, including dropping elements.
 *
 * <p>How {@code map} and {@code mapPartitions} differ:
 * <ul>
 *   <li>Data handling: {@code map} runs element by element inside a partition, much like a
 *       serial operation, while {@code mapPartitions} works on a partition as one batch.</li>
 *   <li>Functionality: {@code map} converts each element and never changes how many elements
 *       there are. {@code mapPartitions} takes an iterator and returns an iterator with no
 *       requirement that the element count stay the same, so it can add or remove data.</li>
 *   <li>Performance: being serial-like, {@code map} is the slower of the two and the
 *       batch-like {@code mapPartitions} is faster. However {@code mapPartitions} holds on to
 *       memory for longer, which can end in an out-of-memory error; when memory is tight,
 *       prefer {@code map}.</li>
 * </ul>
 */
public class MapPartitionsRDD {

    public static void main(String[] args) {
        SparkConf conf = new SparkConf();
        conf.setAppName("MapPartitionsRDD");
        conf.setMaster("local[*]");
        JavaSparkContext sc = new JavaSparkContext(conf);

        List<Integer> nums = new ArrayList<>();
        for (int n = 1; n <= 4; n++) {
            nums.add(n);
        }
        JavaRDD<Integer> numsRDD = sc.parallelize(nums, 2);

        JavaRDD<Integer> mapPartitionsRDD = numsRDD.mapPartitions(partition -> {
            // Runs once per partition rather than once per element: with the two partitions
            // requested above this line prints twice each time the RDD is evaluated.
            System.out.println("xxxxxxxxxxx");
            List<Integer> doubled = new ArrayList<>();
            partition.forEachRemaining(num -> doubled.add(num * 2));
            return doubled.iterator();
        });
        for (Integer doubled : mapPartitionsRDD.collect()) {
            System.out.println(doubled);
        }

        // Largest value of each partition; one output element per partition.
        JavaRDD<Integer> maxPartitionValueRDD = mapPartitionsRDD.mapPartitions(partition -> {
            Integer largest = Integer.MIN_VALUE;
            while (partition.hasNext()) {
                largest = Math.max(largest, partition.next());
            }
            List<Integer> single = new ArrayList<>();
            single.add(largest);
            return single.iterator();
        });
        for (Integer max : maxPartitionValueRDD.collect()) {
            System.out.println(max);
        }

        sc.stop();
    }
}
mapPartitionsWithIndex
package com.journey.core.rdd.transform;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.Function2;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
/**
 * Demonstrates {@code mapPartitionsWithIndex}: like {@code mapPartitions}, data is processed one
 * partition at a time and any processing (including filtering) is allowed, but the function also
 * receives the index of the partition it is working on.
 */
public class MapPartitionsWithIndexRDD {

    public static void main(String[] args) {
        SparkConf conf = new SparkConf()
                .setAppName("MapPartitionsWithIndexRDD")
                .setMaster("local[*]");
        JavaSparkContext sc = new JavaSparkContext(conf);

        List<Integer> nums = new ArrayList<>();
        nums.add(1);
        nums.add(2);
        nums.add(3);
        nums.add(4);
        JavaRDD<Integer> numsRDD = sc.parallelize(nums, 2);

        // Declared outside the call and fully parameterized, so the compiler checks both the
        // function and the resulting RDD instead of falling back to raw types.
        Function2<Integer, Iterator<Integer>, Iterator<Integer>> keepFirstPartition =
                (index, iterator) -> {
                    if (index == 0) {
                        return iterator;
                    }
                    // Every other partition contributes nothing.
                    return Collections.<Integer>emptyIterator();
                };

        // The second argument is preservesPartitioning: whether the source RDD's partitioner
        // is kept on the result.
        JavaRDD<Integer> firstPartitionRDD = numsRDD.mapPartitionsWithIndex(keepFirstPartition, true);
        firstPartitionRDD.collect().forEach(System.out::println);

        sc.stop();
    }
}
flatMap
package com.journey.core.rdd.transform;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
/**
 * Demonstrates {@code flatMap}: each input element is mapped to a sequence and the sequences are
 * flattened into one RDD, so a single input can produce many outputs.
 */
public class FlatMapRDD {

    public static void main(String[] args) {
        SparkConf conf = new SparkConf();
        conf.setAppName("FlatMapRDD");
        conf.setMaster("local[*]");
        JavaSparkContext sc = new JavaSparkContext(conf);

        // One line in, many words out.
        JavaRDD<String> wordRDD = sc.textFile("datas/wc")
                .flatMap(line -> Arrays.asList(line.split(" ")).iterator());
        for (String word : wordRDD.collect()) {
            System.out.println(word);
        }

        // A list of lists, [[1, 2], [3, 4]], flattened to its individual numbers.
        List<ArrayList<Integer>> nested = new ArrayList<>();
        for (int[] group : new int[][] {{1, 2}, {3, 4}}) {
            ArrayList<Integer> boxed = new ArrayList<>();
            for (int value : group) {
                boxed.add(value);
            }
            nested.add(boxed);
        }
        JavaRDD<ArrayList<Integer>> nestedRDD = sc.parallelize(nested);
        JavaRDD<Integer> flattenedRDD = nestedRDD.flatMap(group -> group.iterator());
        for (Integer value : flattenedRDD.collect()) {
            System.out.println(value);
        }

        sc.stop();
    }
}
mapValues
package com.journey.core.rdd.transform;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import scala.Tuple2;
import java.util.ArrayList;
import java.util.List;
/**
 * Demonstrates {@code mapValues}: the function is applied to the value of each pair only; the
 * key is left as it is.
 */
public class MapValuesRDD {

    public static void main(String[] args) {
        SparkConf conf = new SparkConf();
        conf.setAppName("MapValuesRDD");
        conf.setMaster("local[*]");
        JavaSparkContext sc = new JavaSparkContext(conf);

        // (name, salary) pairs.
        List<Tuple2<String, Integer>> userInfos = new ArrayList<>();
        userInfos.add(new Tuple2<>("Alice", 300));
        userInfos.add(new Tuple2<>("zhangsan", 200));
        userInfos.add(new Tuple2<>("lisi", 309));
        userInfos.add(new Tuple2<>("wagnwu", 201));
        userInfos.add(new Tuple2<>("mayun", 234));
        userInfos.add(new Tuple2<>("haha", 223));

        // Everybody gets a raise of 100.
        JavaPairRDD<String, Integer> raisedRDD = sc.parallelizePairs(userInfos, 2)
                .mapValues(salary -> salary + 100);
        for (Tuple2<String, Integer> user : raisedRDD.collect()) {
            System.out.println(user);
        }

        sc.stop();
    }
}
glom
package com.journey.core.rdd.transform;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
/**
 * Demonstrates {@code glom}: the elements of each partition are gathered into one in-memory list
 * of the same element type; the partitioning itself does not change.
 */
public class GlomRDD {

    public static void main(String[] args) {
        SparkConf conf = new SparkConf();
        conf.setAppName("GlomRDD");
        conf.setMaster("local[*]");
        JavaSparkContext sc = new JavaSparkContext(conf);

        List<Integer> nums = new ArrayList<>();
        for (int n = 1; n <= 4; n++) {
            nums.add(n);
        }

        // One list per partition, then the largest element of each list.
        JavaRDD<List<Integer>> partitionsRDD = sc.parallelize(nums, 2).glom();
        JavaRDD<Integer> partitionMaxRDD = partitionsRDD.map(partition -> Collections.max(partition));

        // Sum of the per-partition maxima, computed on the driver (0 when there are none).
        int total = 0;
        for (Integer max : partitionMaxRDD.collect()) {
            total += max;
        }
        System.out.println(total);

        sc.stop();
    }
}
groupBy
package com.journey.core.rdd.transform;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.PairFunction;
import scala.Tuple2;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
/**
 * Word count built on {@code groupByKey}.
 *
 * <p>{@code reduceByKey} versus {@code groupByKey}:
 * <ul>
 *   <li>Shuffle: both shuffle, but {@code reduceByKey} can pre-aggregate (combine) the values of
 *       equal keys inside each partition before the shuffle, which reduces the amount of data
 *       written out. {@code groupByKey} only groups and does not reduce the data volume, so
 *       {@code reduceByKey} performs better.</li>
 *   <li>Functionality: {@code reduceByKey} both groups and aggregates, while {@code groupByKey}
 *       can only group. Prefer {@code reduceByKey} when grouping is followed by aggregation;
 *       when only grouping is needed and nothing is aggregated, {@code groupByKey} is the one
 *       to use.</li>
 * </ul>
 */
public class GroupByKeyRDD {

    public static void main(String[] args) {
        SparkConf conf = new SparkConf()
                .setAppName("GroupByKeyRDD")
                .setMaster("local[*]");
        JavaSparkContext sc = new JavaSparkContext(conf);

        List<String> words = new ArrayList<>();
        words.add("Hello");
        words.add("Spark");
        words.add("Spark");
        words.add("World");
        JavaRDD<String> wordsRDD = sc.parallelize(words);

        // (word, 1) for every word.
        JavaPairRDD<String, Integer> wordToPairRDD = wordsRDD.mapToPair(word -> new Tuple2<>(word, 1));

        JavaPairRDD<String, Iterable<Integer>> wordGroupByRDD = wordToPairRDD.groupByKey();

        // The grouped values are only declared as Iterable, so count them by walking the
        // iterable instead of casting to Collection and trusting the runtime type.
        JavaPairRDD<String, Integer> wordCountRDD = wordGroupByRDD.mapValues(ones -> {
            int count = 0;
            for (Integer ignored : ones) {
                count++;
            }
            return count;
        });
        wordCountRDD.collect().forEach(System.out::println);

        sc.stop();
    }
}
filter
package com.journey.core.rdd.transform;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.PairFunction;
import scala.Tuple2;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Date;
import java.util.List;
/**
 * Demonstrates {@code filter}: elements that satisfy the rule are kept and the rest are dropped.
 * Filtering leaves the partitions themselves unchanged, but the amount of data inside each one
 * can become uneven; in production this may show up as data skew, so a {@code repartition}
 * after a filter is common.
 */
public class FilterRDD {

    public static void main(String[] args) {
        SparkConf conf = new SparkConf();
        conf.setAppName("FilterRDD");
        conf.setMaster("local[*]");
        JavaSparkContext sc = new JavaSparkContext(conf);

        JavaRDD<String> logFileRDD = sc.textFile("datas/apache.log");

        // Keep the lines containing the date fragment, then take the field at index 6 of each.
        JavaRDD<String> fieldRDD = logFileRDD
                .filter(line -> line.contains("7/05/2015"))
                .map(line -> line.split(" ")[6]);

        for (String field : fieldRDD.collect()) {
            System.out.println(field);
        }

        sc.stop();
    }
}
sample
package com.journey.core.rdd.transform;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import java.util.ArrayList;
import java.util.List;
/**
 * Demonstrates {@code sample}, used here mainly to get a look at how the data is distributed.
 */
public class SampleRDD {

    public static void main(String[] args) {
        SparkConf conf = new SparkConf();
        conf.setAppName("SampleRDD");
        conf.setMaster("local[*]");
        JavaSparkContext sc = new JavaSparkContext(conf);

        List<Integer> nums = new ArrayList<>();
        for (int n = 1; n <= 4; n++) {
            nums.add(n);
        }
        JavaRDD<Integer> numsRDD = sc.parallelize(nums);

        /*
         * sample arguments:
         *   1st - whether a drawn element is put back: false = without replacement,
         *         true = with replacement
         *   2nd - without replacement, the chance in [0, 1] that an element is drawn; with
         *         replacement, a value above 1 is how often an element is expected to repeat
         *   3rd - the random seed (not passed by either call below)
         */
        JavaRDD<Integer> withoutReplacementRDD = numsRDD.sample(false, 0.5);
        JavaRDD<Integer> withReplacementRDD = numsRDD.sample(true, 3);

        for (Integer picked : withoutReplacementRDD.collect()) {
            System.out.println(picked);
        }
        System.out.println("**************************");
        for (Integer picked : withReplacementRDD.collect()) {
            System.out.println(picked);
        }

        sc.stop();
    }
}
distinct
package com.journey.core.rdd.transform;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import java.util.ArrayList;
import java.util.List;
/**
 * Demonstrates {@code distinct}: duplicate elements of the data set are removed.
 */
public class DistinctRDD {

    public static void main(String[] args) {
        SparkConf conf = new SparkConf();
        conf.setAppName("DistinctRDD");
        conf.setMaster("local[*]");
        JavaSparkContext sc = new JavaSparkContext(conf);

        // Input with repeats: 1, 1, 2, 3, 3, 1.
        List<Integer> nums = new ArrayList<>();
        for (int value : new int[] {1, 1, 2, 3, 3, 1}) {
            nums.add(value);
        }

        // Two input partitions, de-duplicated into two output partitions.
        JavaRDD<Integer> uniqueRDD = sc.parallelize(nums, 2).distinct(2);
        for (Integer value : uniqueRDD.collect()) {
            System.out.println(value);
        }

        sc.stop();
    }
}
coalesce
package com.journey.core.rdd.transform;
import com.clearspring.analytics.util.Lists;
import org.apache.commons.collections.IteratorUtils;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
/**
 * Demonstrates {@code coalesce}: shrinks the number of partitions according to the data volume,
 * which makes the small data set left after filtering a large one cheaper to process. When a
 * Spark job has too many small tasks, {@code coalesce} merges partitions so there are fewer of
 * them and less task-scheduling overhead.
 */
public class CoalesceRDD {

    public static void main(String[] args) {
        SparkConf conf = new SparkConf();
        conf.setAppName("CoalesceRDD");
        conf.setMaster("local[*]");
        JavaSparkContext sc = new JavaSparkContext(conf);

        List<Integer> nums = new ArrayList<>();
        for (int n = 1; n <= 6; n++) {
            nums.add(n);
        }
        JavaRDD<Integer> sixPartitionsRDD = sc.parallelize(nums, 6);

        /*
         * Note that shuffle defaults to false for coalesce, so shrinking works by merging
         * existing partitions. Without a shuffle, coalesce cannot increase the partition count.
         */
        JavaRDD<Integer> twoPartitionsRDD = sixPartitionsRDD.coalesce(2);
        twoPartitionsRDD.saveAsTextFile("datas/output");

        sc.stop();
    }
}
repartition
package com.journey.core.rdd.transform;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import java.util.ArrayList;
import java.util.List;
/**
 * Demonstrates {@code repartition}. Internally it performs a {@code coalesce} with the shuffle
 * argument set to true. Because a shuffle always happens, it works both for going from many
 * partitions to few and for going from few partitions to many.
 */
public class RepartitionRDD {

    public static void main(String[] args) {
        SparkConf conf = new SparkConf();
        conf.setAppName("RepartitionRDD");
        conf.setMaster("local[*]");
        JavaSparkContext sc = new JavaSparkContext(conf);

        List<Integer> nums = new ArrayList<>();
        for (int n = 1; n <= 6; n++) {
            nums.add(n);
        }

        // Grow from 6 partitions to 10 and write the result out.
        JavaRDD<Integer> sixPartitionsRDD = sc.parallelize(nums, 6);
        JavaRDD<Integer> tenPartitionsRDD = sixPartitionsRDD.repartition(10);
        tenPartitionsRDD.saveAsTextFile("datas/output");

        sc.stop();
    }
}
intersection & union & subtract & zip
package com.journey.core.rdd.transform;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import java.util.ArrayList;
import java.util.List;
/**
 * Demonstrates the two-RDD operations {@code intersection}, {@code union}, {@code subtract} and
 * {@code zip} on the data sets [1, 2, 3, 4] and [3, 4, 5, 6], each held in a single partition.
 */
public class IntersectionRDD {

    public static void main(String[] args) {
        // The application name now matches this class; it used to carry a name copied from a
        // different example.
        SparkConf conf = new SparkConf()
                .setAppName("IntersectionRDD")
                .setMaster("local[*]");
        JavaSparkContext sc = new JavaSparkContext(conf);

        List<Integer> nums1 = new ArrayList<>();
        nums1.add(1);
        nums1.add(2);
        nums1.add(3);
        nums1.add(4);

        List<Integer> nums2 = new ArrayList<>();
        nums2.add(3);
        nums2.add(4);
        nums2.add(5);
        nums2.add(6);

        JavaRDD<Integer> nums1RDD = sc.parallelize(nums1, 1);
        JavaRDD<Integer> nums2RDD = sc.parallelize(nums2, 1);

        // Both RDDs must have the same element type.
        JavaRDD<Integer> intersectionRDD = nums1RDD.intersection(nums2RDD);

        JavaRDD<Integer> unionRDD = nums1RDD.union(nums2RDD);

        // Both RDDs must have the same element type.
        JavaRDD<Integer> subtractRDD = nums1RDD.subtract(nums2RDD);

        // zip pairs elements by position. The element types may differ, but both RDDs need
        // the same number of partitions and the same number of elements in each partition.
        JavaPairRDD<Integer, Integer> zipRDD = nums1RDD.zip(nums2RDD);

        intersectionRDD.collect().forEach(System.out::println);
        System.out.println("******************************");
        unionRDD.collect().forEach(System.out::println);
        System.out.println("******************************");
        subtractRDD.collect().forEach(System.out::println);
        System.out.println("******************************");
        zipRDD.collect().forEach(System.out::println);

        sc.stop();
    }
}
partitionBy
package com.journey.core.rdd.transform;
import org.apache.spark.Partitioner;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import scala.Tuple2;
import java.util.ArrayList;
import java.util.List;
/**
 * Demonstrates {@code partitionBy}: the pairs are redistributed according to the supplied
 * {@code Partitioner}. Spark's default partitioner is {@code HashPartitioner}.
 */
public class PartitionerByRDD {

    public static void main(String[] args) {
        SparkConf conf = new SparkConf();
        conf.setAppName("PartitionerByRDD");
        conf.setMaster("local[*]");
        JavaSparkContext sc = new JavaSparkContext(conf);

        List<Tuple2<String, String>> infos = new ArrayList<>();
        infos.add(new Tuple2<>("1305261989234", "zhangsan"));
        infos.add(new Tuple2<>("1505261989234", "lisi"));
        infos.add(new Tuple2<>("1305261982343", "wagnwu"));
        infos.add(new Tuple2<>("1505261382343", "zhaoliu"));

        // Note: a list of pairs has to be turned into an RDD with parallelizePairs.
        JavaPairRDD<String, String> infosRDD = sc.parallelizePairs(infos, 2);

        // Keys starting with "130" go to one partition and keys starting with "150" to the other.
        JavaPairRDD<String, String> partitionByRDD = infosRDD.partitionBy(new Partitioner() {
            @Override
            public int getPartition(Object key) {
                // Only "150" keys land in partition 1; "130" keys and anything else use 0.
                return key.toString().startsWith("150") ? 1 : 0;
            }

            @Override
            public int numPartitions() {
                return 2;
            }
        });

        for (Tuple2<String, String> info : partitionByRDD.collect()) {
            System.out.println(info);
        }

        sc.stop();
    }
}
reduceByKey
package com.journey.core.rdd.transform;
import org.apache.spark.Partitioner;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import scala.Tuple2;
import java.util.ArrayList;
import java.util.List;
/**
 * Demonstrates {@code reduceByKey}: the values that share a key are aggregated into one value.
 */
public class ReduceByKeyRDD {

    public static void main(String[] args) {
        SparkConf conf = new SparkConf();
        conf.setAppName("ReduceByKeyRDD");
        conf.setMaster("local[*]");
        JavaSparkContext sc = new JavaSparkContext(conf);

        List<String> words = new ArrayList<>();
        for (String word : new String[] {"Hello", "Spark", "Spark", "World"}) {
            words.add(word);
        }

        // (word, 1) for every word, then the ones are summed per word.
        JavaPairRDD<String, Integer> onesRDD = sc.parallelize(words)
                .mapToPair(word -> new Tuple2<>(word, 1));
        JavaPairRDD<String, Integer> wordCountRDD = onesRDD.reduceByKey((left, right) -> left + right);

        for (Tuple2<String, Integer> wordCount : wordCountRDD.collect()) {
            System.out.println(wordCount);
        }

        sc.stop();
    }
}
groupByKey
package com.journey.core.rdd.transform;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.PairFunction;
import scala.Tuple2;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
/**
 * Counts words by grouping {@code (word, 1)} pairs with {@code groupByKey}.
 *
 * <p>Comparison of {@code reduceByKey} and {@code groupByKey}:
 * <ul>
 *   <li>From the shuffle point of view, both operations shuffle. {@code reduceByKey} can first
 *       pre-aggregate (combine) values with the same key inside a partition, so less data is
 *       written out for the shuffle; {@code groupByKey} merely groups and the data volume is
 *       not reduced. {@code reduceByKey} is therefore the faster of the two.</li>
 *   <li>From the functional point of view, {@code reduceByKey} covers grouping plus
 *       aggregation, whereas {@code groupByKey} can group but not aggregate. For
 *       group-then-aggregate use {@code reduceByKey}; if the data only has to be grouped and
 *       no aggregation is wanted, {@code groupByKey} has to be used.</li>
 * </ul>
 */
public class GroupByKeyRDD {

    public static void main(String[] args) {
        SparkConf conf = new SparkConf()
                .setAppName("GroupByKeyRDD")
                .setMaster("local[*]");
        JavaSparkContext sc = new JavaSparkContext(conf);

        List<String> words = new ArrayList<>();
        words.add("Hello");
        words.add("Spark");
        words.add("Spark");
        words.add("World");
        JavaRDD<String> wordsRDD = sc.parallelize(words);

        JavaPairRDD<String, Integer> wordToPairRDD = wordsRDD.mapToPair(new PairFunction<String, String, Integer>() {
            @Override
            public Tuple2<String, Integer> call(String word) throws Exception {
                return new Tuple2<>(word, 1);
            }
        });

        JavaPairRDD<String, Iterable<Integer>> wordGroupByRDD = wordToPairRDD.groupByKey();

        JavaPairRDD<String, Integer> wordCountRDD = wordGroupByRDD.mapValues(new Function<Iterable<Integer>, Integer>() {
            @Override
            public Integer call(Iterable<Integer> occurrences) throws Exception {
                // groupByKey only promises an Iterable, so the group is counted by iterating
                // rather than by casting it to Collection and calling size().
                int count = 0;
                for (Integer ignored : occurrences) {
                    count++;
                }
                return count;
            }
        });
        wordCountRDD.collect().forEach(System.out::println);

        sc.stop();
    }
}
aggregateByKey
package com.journey.core.rdd.transform;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import scala.Tuple2;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
/**
 * Demonstrates {@code aggregateByKey}, whose arguments are:
 * <ol>
 *   <li>the initial value,</li>
 *   <li>the rule applied inside a partition,</li>
 *   <li>the rule applied across partitions.</li>
 * </ol>
 */
public class AggregateByKeyRDD {

    public static void main(String[] args) {
        SparkConf conf = new SparkConf();
        conf.setAppName("AggregateByKeyRDD");
        conf.setMaster("local[*]");
        JavaSparkContext sc = new JavaSparkContext(conf);

        List<Tuple2<String, Integer>> words = new ArrayList<>();
        words.add(new Tuple2<>("Hello", 3));
        words.add(new Tuple2<>("Spark", 2));
        words.add(new Tuple2<>("Hello", 10));
        words.add(new Tuple2<>("Spark", 17));
        JavaPairRDD<String, Integer> wordsRDD = sc.parallelizePairs(words, 2);

        Function2<Integer, Integer, Integer> add = (left, right) -> left + right;
        // Largest value seen so far inside a partition.
        Function2<Integer, Integer, Integer> larger = (left, right) -> Math.max(left, right);

        // The initial value of aggregateByKey takes part in the in-partition step only.
        // Here: sum inside each partition, then sum across partitions.
        JavaPairRDD<String, Integer> sumOfSumsRDD = wordsRDD.aggregateByKey(10, add, add);
        for (Tuple2<String, Integer> entry : sumOfSumsRDD.collect()) {
            System.out.println(entry);
        }

        // Same initial-value rule. Here: max inside each partition, then sum across partitions.
        JavaPairRDD<String, Integer> sumOfMaximaRDD = wordsRDD.aggregateByKey(10, larger, add);
        for (Tuple2<String, Integer> entry : sumOfMaximaRDD.collect()) {
            System.out.println(entry);
        }

        sc.stop();
    }
}
foldByKey
package com.journey.core.rdd.transform;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function2;
import scala.Tuple2;
import java.util.ArrayList;
import java.util.List;
/**
 * Demonstrates {@code foldByKey}. The first argument is the initial value; the second is the
 * single rule that is used both inside a partition and across partitions.
 */
public class FoldByKeyRDD {

    public static void main(String[] args) {
        SparkConf conf = new SparkConf();
        conf.setAppName("FoldByKeyRDD");
        conf.setMaster("local[*]");
        JavaSparkContext sc = new JavaSparkContext(conf);

        List<Tuple2<String, Integer>> words = new ArrayList<>();
        words.add(new Tuple2<>("Hello", 3));
        words.add(new Tuple2<>("Spark", 2));
        words.add(new Tuple2<>("Hello", 10));
        words.add(new Tuple2<>("Spark", 17));

        // Sum the values per key, starting from 10.
        JavaPairRDD<String, Integer> foldedRDD = sc.parallelizePairs(words, 2)
                .foldByKey(10, (left, right) -> left + right);

        for (Tuple2<String, Integer> entry : foldedRDD.collect()) {
            System.out.println(entry);
        }

        sc.stop();
    }
}
combineByKey
package com.journey.core.rdd.transform;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.Function2;
import scala.Tuple2;
import java.util.ArrayList;
import java.util.List;
/**
 * Computes the average value per key with {@code combineByKey}, whose arguments are:
 * <ol>
 *   <li>a function that only converts the first value of a key into the combiner type,</li>
 *   <li>the rule applied inside a partition,</li>
 *   <li>the rule applied across partitions.</li>
 * </ol>
 *
 * <p>How the by-key aggregations compare:
 * <ul>
 *   <li>{@code reduceByKey}: the first value of a key takes part in no computation; the same
 *       rule is used inside and across partitions.</li>
 *   <li>{@code foldByKey}: the first value of a key is combined with the initial value inside
 *       the partition; the same rule is used inside and across partitions.</li>
 *   <li>{@code aggregateByKey}: the first value of a key is combined with the initial value
 *       inside the partition; the rules inside and across partitions may differ.</li>
 *   <li>{@code combineByKey}: when the data structure does not fit the computation, the first
 *       value can be converted to another structure; the rules inside and across partitions
 *       may differ.</li>
 * </ul>
 */
public class CombineByKeyRDD {

    public static void main(String[] args) {
        SparkConf conf = new SparkConf()
                .setAppName("CombineByKeyRDD")
                .setMaster("local[*]");
        JavaSparkContext sc = new JavaSparkContext(conf);

        List<Tuple2<String, Integer>> words = new ArrayList<>();
        words.add(new Tuple2<>("Hello", 3));
        words.add(new Tuple2<>("Spark", 2));
        words.add(new Tuple2<>("Hello", 3));
        words.add(new Tuple2<>("Spark", 2));
        words.add(new Tuple2<>("Spark", 2));
        words.add(new Tuple2<>("Spark", 2));
        JavaPairRDD<String, Integer> wordsRDD = sc.parallelizePairs(words, 2);

        // The combiner is a (sum, count) pair.
        Function<Integer, Tuple2<Integer, Integer>> startSumAndCount =
                value -> new Tuple2<>(value, 1);
        Function2<Tuple2<Integer, Integer>, Integer, Tuple2<Integer, Integer>> addValue =
                (acc, value) -> new Tuple2<>(acc._1() + value, acc._2() + 1);
        Function2<Tuple2<Integer, Integer>, Tuple2<Integer, Integer>, Tuple2<Integer, Integer>> mergePartials =
                (left, right) -> new Tuple2<>(left._1() + right._1(), left._2() + right._2());

        JavaPairRDD<String, Tuple2<Integer, Integer>> combineByKeyRDD =
                wordsRDD.combineByKey(startSumAndCount, addValue, mergePartials);

        combineByKeyRDD.collect().forEach(entry -> {
            int sum = entry._2()._1();
            int count = entry._2()._2();
            // Divide as double: integer division would silently drop the fractional part of
            // the average.
            double average = (double) sum / count;
            System.out.println(entry._1() + ":" + average);
        });

        // The same operator used as a plain per-key sum: the combiner is the value itself.
        Function<Integer, Integer> keepValue = value -> value;
        Function2<Integer, Integer, Integer> add = (left, right) -> left + right;
        JavaPairRDD<String, Integer> wordCountRDD = wordsRDD.combineByKey(keepValue, add, add);
        wordCountRDD.collect().forEach(System.out::println);

        sc.stop();
    }
}
sortByKey
package com.journey.core.rdd.transform;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.Function2;
import scala.Tuple2;
import java.util.ArrayList;
import java.util.List;
/**
 * Demonstrates {@code sortByKey}: the pairs are ordered by their key.
 */
public class SortByKeyRDD {

    public static void main(String[] args) {
        // The application name now matches this class; it used to carry a name copied from a
        // different example.
        SparkConf conf = new SparkConf()
                .setAppName("SortByKeyRDD")
                .setMaster("local[*]");
        JavaSparkContext sc = new JavaSparkContext(conf);

        List<Tuple2<String, Integer>> words = new ArrayList<>();
        words.add(new Tuple2<>("Alice", 3));
        words.add(new Tuple2<>("zhangsan", 2));
        words.add(new Tuple2<>("lisi", 3));
        words.add(new Tuple2<>("wagnwu", 2));
        words.add(new Tuple2<>("mayun", 2));
        words.add(new Tuple2<>("haha", 2));
        JavaPairRDD<String, Integer> wordsRDD = sc.parallelizePairs(words, 2);

        // Ascending is the default; descending order or a custom ordering can be requested too.
        JavaPairRDD<String, Integer> sortWordsRDD = wordsRDD.sortByKey(true);
        sortWordsRDD.collect().forEach(System.out::println);

        sc.stop();
    }
}
join & leftOuterJoin
package com.journey.core.rdd.transform;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.Optional;
import scala.Tuple2;
import java.util.ArrayList;
import java.util.List;
/**
 * Demonstrates {@code join} and {@code leftOuterJoin}. Called on RDDs of type (K, V) and (K, W),
 * {@code join} returns an RDD of (K, (V, W)) in which all elements with the same key are
 * connected to each other.
 */
public class JoinRDD {

    public static void main(String[] args) {
        SparkConf conf = new SparkConf();
        conf.setAppName("JoinRDD");
        conf.setMaster("local[*]");
        JavaSparkContext sc = new JavaSparkContext(conf);

        // (user id, user name)
        List<Tuple2<Integer, String>> userInfos = new ArrayList<>();
        userInfos.add(new Tuple2<>(1, "zhagnsan"));
        userInfos.add(new Tuple2<>(2, "lisi"));
        userInfos.add(new Tuple2<>(3, "lisi"));

        // (user id, ordered item)
        List<Tuple2<Integer, String>> orders = new ArrayList<>();
        orders.add(new Tuple2<>(1, "iphone pad"));
        orders.add(new Tuple2<>(1, "mac pad"));
        orders.add(new Tuple2<>(2, "java book"));

        JavaPairRDD<Integer, String> userInfosRDD = sc.parallelizePairs(userInfos, 2);
        JavaPairRDD<Integer, String> ordersRDD = sc.parallelizePairs(orders, 2);

        JavaPairRDD<Integer, Tuple2<String, String>> joinRDD = userInfosRDD.join(ordersRDD);
        for (Tuple2<Integer, Tuple2<String, String>> row : joinRDD.collect()) {
            System.out.println(row);
        }

        // Left join: every left-hand row is shown; where the right side has no match the
        // Optional is empty.
        JavaPairRDD<Integer, Tuple2<String, Optional<String>>> leftOuterJoinRDD =
                userInfosRDD.leftOuterJoin(ordersRDD);
        for (Tuple2<Integer, Tuple2<String, Optional<String>>> row : leftOuterJoinRDD.collect()) {
            System.out.println(row);
        }

        sc.stop();
    }
}
cogroup
package com.journey.core.rdd.transform;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.Optional;
import scala.Tuple2;
import java.util.ArrayList;
import java.util.List;
/**
 * Demonstrates {@code cogroup}: entries with the same key are brought together, and the value
 * for each key is a pair of collections, one per input RDD.
 */
public class CogroupRDD {

    public static void main(String[] args) {
        SparkConf conf = new SparkConf();
        conf.setAppName("CogroupRDD");
        conf.setMaster("local[*]");
        JavaSparkContext sc = new JavaSparkContext(conf);

        // (user id, user name)
        List<Tuple2<Integer, String>> userInfos = new ArrayList<>();
        userInfos.add(new Tuple2<>(1, "zhagnsan"));
        userInfos.add(new Tuple2<>(2, "lisi"));
        userInfos.add(new Tuple2<>(3, "lisi"));

        // (user id, ordered item)
        List<Tuple2<Integer, String>> orders = new ArrayList<>();
        orders.add(new Tuple2<>(1, "iphone pad"));
        orders.add(new Tuple2<>(1, "mac pad"));
        orders.add(new Tuple2<>(2, "java book"));

        JavaPairRDD<Integer, String> userInfosRDD = sc.parallelizePairs(userInfos, 2);
        JavaPairRDD<Integer, String> ordersRDD = sc.parallelizePairs(orders, 2);

        JavaPairRDD<Integer, Tuple2<Iterable<String>, Iterable<String>>> cogroupRDD =
                userInfosRDD.cogroup(ordersRDD);
        for (Tuple2<Integer, Tuple2<Iterable<String>, Iterable<String>>> group : cogroupRDD.collect()) {
            System.out.println(group);
        }

        sc.stop();
    }
}
Top N 案例
package com.journey.core.rdd.transform;
import org.apache.commons.collections.IteratorUtils;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.Optional;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.api.java.function.VoidFunction;
import scala.Tuple2;
import scala.Tuple3;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.Iterator;
import java.util.List;
/**
 * Top-N example: for every province found in {@code datas/agent.log}, prints the ads that occur
 * most often, keeping at most three per province.
 *
 * <p>Pitfall hit while writing this example - returning the result of {@code subList} directly
 * failed with:
 * <pre>
 * Serialization stack:
 * - object not serializable (class: java.util.ArrayList$SubList, value: [(16,26), (26,25), (1,23)])
 * - field (class: scala.Tuple2, name: _2, type: class java.lang.Object)
 * - object (class scala.Tuple2, (7,[(16,26), (26,25), (1,23)]))
 * - element of array (index: 0)
 * - array (class [Lscala.Tuple2;, size 5)
 * at org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:41)
 * at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:47)
 * at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:101)
 * at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:489)
 * at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
 * at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
 * at java.base/java.lang.Thread.run(Thread.java:835)
 * 23/05/09 20:29:01 ERROR Executor: Exception in task 0.0 in stage 2.0 (TID 4)
 * java.io.NotSerializableException: java.util.ArrayList$SubList
 * </pre>
 *
 * <p>Fix: the list returned by {@code subList()} is a view that is not serializable, so it has
 * to be copied into a new {@code ArrayList} before it is returned from the function.
 */
public class Demo {

    /** Maximum number of ads kept for each province. */
    private static final int TOP_N = 3;

    public static void main(String[] args) {
        SparkConf conf = new SparkConf()
                .setAppName("Demo")
                .setMaster("local[*]");
        JavaSparkContext sc = new JavaSparkContext(conf);

        JavaRDD<String> logRDD = sc.textFile("datas/agent.log");

        // ((province, ad), 1) per log line; the province is field 1 and the ad is field 4 of
        // the space-separated line.
        PairFunction<String, Tuple2<String, String>, Integer> toProvinceAdOne = line -> {
            String[] fields = line.split(" ");
            Tuple2<String, String> provinceAd = new Tuple2<>(fields[1], fields[4]);
            return new Tuple2<>(provinceAd, 1);
        };
        JavaPairRDD<Tuple2<String, String>, Integer> provinceAdRDD = logRDD.mapToPair(toProvinceAdOne);

        // ((province, ad), count)
        Function2<Integer, Integer, Integer> add = (left, right) -> left + right;
        JavaPairRDD<Tuple2<String, String>, Integer> provinceAdToCountRDD = provinceAdRDD.reduceByKey(add);

        // (province, (ad, count))
        PairFunction<Tuple2<Tuple2<String, String>, Integer>, String, Tuple2<String, Integer>> keyByProvince =
                entry -> {
                    Tuple2<String, Integer> adCount = new Tuple2<>(entry._1()._2(), entry._2());
                    return new Tuple2<>(entry._1()._1(), adCount);
                };
        JavaPairRDD<String, Tuple2<String, Integer>> provinceToAdCountRDD =
                provinceAdToCountRDD.mapToPair(keyByProvince);

        JavaPairRDD<String, Iterable<Tuple2<String, Integer>>> provinceToAdGroupRDD =
                provinceToAdCountRDD.groupByKey();

        // Sort inside each group and keep the group's top N.
        Function<Iterable<Tuple2<String, Integer>>, Iterable<Tuple2<String, Integer>>> keepTopAds =
                ads -> topAds(ads, TOP_N);
        JavaPairRDD<String, Iterable<Tuple2<String, Integer>>> provinceToAdTop3RDD =
                provinceToAdGroupRDD.mapValues(keepTopAds);

        provinceToAdTop3RDD.collect().forEach(System.out::println);

        sc.stop();
    }

    /**
     * Returns the {@code limit} entries with the highest count, highest first.
     *
     * @param ads (ad, count) pairs of one province
     * @param limit maximum number of entries to return
     * @return a new, serializable list holding at most {@code limit} entries; fewer when the
     *     province has fewer ads
     */
    private static List<Tuple2<String, Integer>> topAds(Iterable<Tuple2<String, Integer>> ads, int limit) {
        List<Tuple2<String, Integer>> ranked = new ArrayList<>();
        for (Tuple2<String, Integer> ad : ads) {
            ranked.add(ad);
        }
        // Integer.compare instead of subtraction, which can overflow for extreme counts.
        ranked.sort((left, right) -> Integer.compare(right._2(), left._2()));
        // Clamp the bound so a province with fewer than `limit` ads does not throw, and copy
        // the subList view because the view itself cannot be serialized.
        return new ArrayList<>(ranked.subList(0, Math.min(limit, ranked.size())));
    }
}
正文完