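1. RDD
RDD stands for Resilient Distributed Dataset.
What does "resilient" mean?
1. Resilient storage: data moves between memory and disk automatically.
2. Resilient fault tolerance: lost data can be recovered automatically.
3. Resilient computation: a failed computation is retried.
4. Resilient partitioning: data can be repartitioned as needed.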
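The third point, retrying a failed computation, can be illustrated with a small plain-Java sketch. This is not Spark's scheduler code: RetrySketch and runWithRetry are hypothetical names made up for this example. In Spark itself the number of attempts per task is governed by the spark.task.maxFailures setting.

```java
import java.util.function.Supplier;

// Hypothetical helper for illustration only; not part of Spark.
class RetrySketch {
    // Runs the task and retries after a failure, up to maxAttempts attempts in total.
    static <T> T runWithRetry(Supplier<T> task, int maxAttempts) {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be >= 1");
        }
        RuntimeException last = null;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                return task.get();      // success: return immediately
            } catch (RuntimeException e) {
                last = e;               // remember the failure and try again
            }
        }
        throw last;                     // every attempt failed
    }
}
```

A task that fails twice and then succeeds returns normally as long as maxAttempts is at least 3; only when all attempts fail does the last exception reach the caller.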
Distributed
The computation logic of an RDD is split into Tasks, one per partition, and the Tasks are sent to Executors (on different nodes) to run.
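As a rough mental model of "one Task per partition", the sketch below applies the same logic to every partition on a thread pool. The thread pool only stands in for Executors on different nodes, and TaskPerPartitionSketch and runTasks are hypothetical names for this example, not Spark APIs.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.function.Function;

// Hypothetical model for illustration only; not part of Spark.
class TaskPerPartitionSketch {
    // Applies the same logic to every partition, one task per partition,
    // and returns the per-partition results in partition order.
    static <T, R> List<R> runTasks(List<List<T>> partitions,
                                   Function<List<T>, R> logic,
                                   int workers) {
        ExecutorService pool = Executors.newFixedThreadPool(workers); // stand-in for Executors
        try {
            List<Future<R>> futures = new ArrayList<>();
            for (List<T> partition : partitions) {
                Callable<R> task = () -> logic.apply(partition);      // one task per partition
                futures.add(pool.submit(task));
            }
            List<R> results = new ArrayList<>();
            for (Future<R> future : futures) {
                results.add(future.get());                            // wait for each task
            }
            return results;
        } catch (Exception e) {
            throw new RuntimeException(e);
        } finally {
            pool.shutdown();
        }
    }
}
```

With three partitions and two workers, three tasks are submitted and at most two run at the same time, which loosely mirrors having more partitions than available cores.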
Dataset
An RDD is divided into partitions and holds references to the data; it does not store the data itself.
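The idea of holding a reference to data rather than the data can be sketched in plain Java. LazyDataset below is a hypothetical class written for this example, much simpler than a real RDD (no partitions, no caching): it stores only how to obtain its elements, and nothing is read until collect() is called.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;
import java.util.function.Supplier;

// Hypothetical class for illustration only; not part of Spark.
// It stores how to obtain its elements, not the elements themselves.
class LazyDataset<T> {
    private final Supplier<List<T>> source;

    LazyDataset(Supplier<List<T>> source) {
        this.source = source;
    }

    // Records the transformation; no element is read or transformed here.
    <R> LazyDataset<R> map(Function<T, R> f) {
        return new LazyDataset<>(() -> {
            List<R> out = new ArrayList<>();
            for (T item : source.get()) {
                out.add(f.apply(item));
            }
            return out;
        });
    }

    // The action: only now is the source read and the recorded chain applied.
    List<T> collect() {
        return source.get();
    }
}
```

Calling map returns a new LazyDataset immediately; the source supplier runs only when collect() is invoked on the end of the chain.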
Main properties
- A list of partitions
- A function for computing each split (every split has its own compute step)
- A list of dependencies on other RDDs (RDDs depend on one another)
- Optionally, a Partitioner for key-value RDDs (e.g. to say that the RDD is hash-partitioned)
- Optionally, a list of preferred locations to compute each split on (e.g. block locations for an HDFS file)
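The five properties can be written down as one method each, which makes the shape of an RDD easier to remember. The types below (DatasetDescriptor, KeyPartitioner, HashKeyPartitioner) are hypothetical and exist only in this sketch; in Spark the corresponding members of the RDD class are getPartitions, compute, getDependencies, partitioner and getPreferredLocations, and the real hash partitioner is org.apache.spark.HashPartitioner.

```java
import java.util.Iterator;
import java.util.List;
import java.util.Optional;

// Hypothetical types for illustration only; not part of Spark.
interface KeyPartitioner {
    int numPartitions();
    int getPartition(Object key);
}

// Hash partitioning: a key's partition is its hash code modulo the partition count.
class HashKeyPartitioner implements KeyPartitioner {
    private final int numPartitions;

    HashKeyPartitioner(int numPartitions) {
        if (numPartitions < 1) {
            throw new IllegalArgumentException("numPartitions must be >= 1");
        }
        this.numPartitions = numPartitions;
    }

    public int numPartitions() {
        return numPartitions;
    }

    public int getPartition(Object key) {
        // floorMod keeps the result in [0, numPartitions) even for negative hash codes
        return key == null ? 0 : Math.floorMod(key.hashCode(), numPartitions);
    }
}

// The five properties from the list above, one method each.
interface DatasetDescriptor<T> {
    List<Integer> partitions();                      // 1. a list of partitions (indices here)
    Iterator<T> compute(int partition);              // 2. a function for computing each split
    List<DatasetDescriptor<?>> dependencies();       // 3. dependencies on other datasets
    Optional<KeyPartitioner> partitioner();          // 4. optional partitioner for key-value data
    List<String> preferredLocations(int partition);  // 5. optional preferred locations (host names)
}
```

The same key always lands in the same partition, which is what lets a key-based operation gather all values of one key in one place.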
2. RDD dependencies
Lineage
package com.journey.core.wc;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import scala.Tuple2;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
public class WordCount {
    public static void main(String[] args) {
        SparkConf conf = new SparkConf()
                .setAppName("WordCount")
                .setMaster("local[*]");
        JavaSparkContext sc = new JavaSparkContext(conf);

        // When running on a cluster, this would be an HDFS path instead
        JavaRDD<String> lineRDD = sc.textFile("datas/wc", 4);
        // Print the lineage after each transformation
        System.out.println(lineRDD.toDebugString());
        System.out.println("*************************************");

        // Split each line into words
        JavaRDD<String> wordsRDD = lineRDD.flatMap(new FlatMapFunction<String, String>() {
            @Override
            public Iterator<String> call(String line) throws Exception {
                return Arrays.asList(line.split(" ")).iterator();
            }
        });
        System.out.println(wordsRDD.toDebugString());
        System.out.println("*************************************");

        // Map each word to a (word, 1) pair
        JavaPairRDD<String, Integer> wordToPairRDD = wordsRDD.mapToPair(new PairFunction<String, String, Integer>() {
            @Override
            public Tuple2<String, Integer> call(String word) throws Exception {
                return Tuple2.apply(word, 1);
            }
        });
        System.out.println(wordToPairRDD.toDebugString());
        System.out.println("*************************************");

        // Sum the counts per word; this step introduces a shuffle
        JavaPairRDD<String, Integer> word2CountRDD = wordToPairRDD.reduceByKey(new Function2<Integer, Integer, Integer>() {
            @Override
            public Integer call(Integer v1, Integer v2) throws Exception {
                return v1 + v2;
            }
        });
        System.out.println(word2CountRDD.toDebugString());
        System.out.println("*************************************");

        // collect() is the action that triggers the job
        List<Tuple2<String, Integer>> result = word2CountRDD.collect();
        System.out.println(result);
        sc.stop();
    }
}
Output:
(4) datas/wc MapPartitionsRDD[1] at textFile at WordCount.java:25 []
 |  datas/wc HadoopRDD[0] at textFile at WordCount.java:25 []
*************************************
(4) MapPartitionsRDD[2] at flatMap at WordCount.java:29 []
 |  datas/wc MapPartitionsRDD[1] at textFile at WordCount.java:25 []
 |  datas/wc HadoopRDD[0] at textFile at WordCount.java:25 []
*************************************
(4) MapPartitionsRDD[3] at mapToPair at WordCount.java:38 []
 |  MapPartitionsRDD[2] at flatMap at WordCount.java:29 []
 |  datas/wc MapPartitionsRDD[1] at textFile at WordCount.java:25 []
 |  datas/wc HadoopRDD[0] at textFile at WordCount.java:25 []
*************************************
(4) ShuffledRDD[4] at reduceByKey at WordCount.java:47 []
 +-(4) MapPartitionsRDD[3] at mapToPair at WordCount.java:38 []
    |  MapPartitionsRDD[2] at flatMap at WordCount.java:29 []
    |  datas/wc MapPartitionsRDD[1] at textFile at WordCount.java:25 []
    |  datas/wc HadoopRDD[0] at textFile at WordCount.java:25 []
*************************************
[(Spark,2), (Hello,4), (World,1), (Mayun,1)]
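The (4) at the start of a lineage line is the number of partitions of that RDD. The "+-" marker in front of MapPartitionsRDD[3] in the last lineage marks the shuffle boundary introduced by reduceByKey.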
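To see why the last lineage shows a partition count on both sides of the shuffle, here is a plain-Java model of the word count. It is only a sketch: ShuffleSketch and countByKey are hypothetical names, the loops run sequentially instead of as distributed tasks, and routing by hash code modulo the partition count is an assumption that mirrors hash partitioning rather than Spark's actual shuffle code.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical model for illustration only; not part of Spark.
class ShuffleSketch {
    // Counts words across input partitions, routing every word to a reduce
    // partition chosen by its hash code, then summing within that partition.
    static List<Map<String, Integer>> countByKey(List<List<String>> linePartitions,
                                                 int numReducePartitions) {
        List<Map<String, Integer>> reducePartitions = new ArrayList<>();
        for (int i = 0; i < numReducePartitions; i++) {
            reducePartitions.add(new TreeMap<>());
        }
        for (List<String> lines : linePartitions) {      // stands in for one map-side task per partition
            for (String line : lines) {
                for (String word : line.split(" ")) {    // flatMap step
                    // shuffle routing: the same word always goes to the same reduce partition
                    int target = Math.floorMod(word.hashCode(), numReducePartitions);
                    // (word, 1) merged into the running sum, as in mapToPair + reduceByKey
                    reducePartitions.get(target).merge(word, 1, Integer::sum);
                }
            }
        }
        return reducePartitions;
    }
}
```

Feeding it four input partitions and asking for four reduce partitions gives four result maps, and each word appears in exactly one of them. The partition count of the result is chosen at the shuffle, independently of how the input was split.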
To be continued.