1. RDD
Resilient Distributed Dataset (RDD)
What does "resilient" mean? (a code sketch follows this list)
1. Storage resilience: automatic switching between memory and disk
2. Fault-tolerance resilience: lost data can be recovered automatically
3. Compute resilience: failed computations are retried automatically
4. Partitioning resilience: data can be repartitioned as needed
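A minimal, hedged sketch of points 1 and 4 (the class name ElasticityDemo and the 8-partition target are illustrative, not from the original; points 2 and 3 happen automatically via lineage-based recomputation and task retry):

```java
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.storage.StorageLevel;

public class ElasticityDemo {
    public static void main(String[] args) {
        SparkConf conf = new SparkConf().setAppName("ElasticityDemo").setMaster("local[*]");
        JavaSparkContext sc = new JavaSparkContext(conf);

        JavaRDD<String> lines = sc.textFile("datas/wc", 4);

        // Storage resilience: partitions that do not fit in memory spill to disk
        lines.persist(StorageLevel.MEMORY_AND_DISK());

        // Partitioning resilience: re-split the data into 8 partitions on demand
        JavaRDD<String> repartitioned = lines.repartition(8);
        System.out.println(repartitioned.getNumPartitions()); // 8

        sc.stop();
    }
}
```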
Distributed
The computation logic of an RDD is split into Tasks by partition and sent to Executors (on different nodes) for execution.
Dataset
An RDD is divided into partitions and holds references to the data; it does not store the data itself.
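A tiny sketch to make this concrete (reusing the sc and the datas/wc path from the sketch above): building the RDD only records partition metadata and a reference to the input, and nothing is read until an action runs.

```java
JavaRDD<String> lines = sc.textFile("datas/wc", 4); // lazy: records a reference, reads nothing
System.out.println(lines.getNumPartitions());       // partition count (>= 4 here), metadata only
long n = lines.count();                             // action: the file is actually read now
```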
Main properties (the five traits listed in the RDD source code comments; the last two are illustrated in the sketch after this list):
- A list of partitions
- A function for computing each split
- A list of dependencies on other RDDs
- Optionally, a Partitioner for key-value RDDs (e.g. to say that the RDD is hash-partitioned)
- Optionally, a list of preferred locations to compute each split on (e.g. block locations for an HDFS file)
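A hedged sketch of the last two properties (reusing sc from above; the sample pairs and the object addresses in the comments are illustrative): partitionBy attaches a HashPartitioner to a key/value RDD, and preferredLocations reports where a split's data lives, e.g. HDFS block hosts (empty for a local file).

```java
import org.apache.spark.HashPartitioner;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import scala.Tuple2;
import java.util.Arrays;

// Property 4: key/value RDDs can carry a Partitioner
JavaPairRDD<String, Integer> pairs = sc.parallelizePairs(
        Arrays.asList(Tuple2.apply("a", 1), Tuple2.apply("b", 2)));
JavaPairRDD<String, Integer> hashed = pairs.partitionBy(new HashPartitioner(4));
System.out.println(hashed.partitioner()); // Optional(org.apache.spark.HashPartitioner@...)

// Property 5: each split has preferred locations for computation
JavaRDD<String> lines = sc.textFile("datas/wc", 4);
System.out.println(lines.rdd().preferredLocations(lines.rdd().partitions()[0]));
```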
2. RDD Dependencies
Lineage
Each RDD remembers the chain of parent RDDs it was derived from; toDebugString prints this lineage, as the WordCount example below shows after each transformation:
```java
package com.journey.core.wc;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import scala.Tuple2;

import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

public class WordCount {

    public static void main(String[] args) {
        SparkConf conf = new SparkConf()
                .setAppName("WordCount")
                .setMaster("local[*]");
        JavaSparkContext sc = new JavaSparkContext(conf);

        // When running on a cluster, this would simply be an HDFS path
        JavaRDD<String> lineRDD = sc.textFile("datas/wc", 4);
        System.out.println(lineRDD.toDebugString());
        System.out.println("*************************************");

        // Split each line into words
        JavaRDD<String> wordsRDD = lineRDD.flatMap(new FlatMapFunction<String, String>() {
            @Override
            public Iterator<String> call(String line) throws Exception {
                return Arrays.asList(line.split(" ")).iterator();
            }
        });
        System.out.println(wordsRDD.toDebugString());
        System.out.println("*************************************");

        // Map each word to a (word, 1) pair
        JavaPairRDD<String, Integer> wordToPairRDD = wordsRDD.mapToPair(new PairFunction<String, String, Integer>() {
            @Override
            public Tuple2<String, Integer> call(String word) throws Exception {
                return Tuple2.apply(word, 1);
            }
        });
        System.out.println(wordToPairRDD.toDebugString());
        System.out.println("*************************************");

        // Sum the counts per word (introduces a shuffle)
        JavaPairRDD<String, Integer> word2CountRDD = wordToPairRDD.reduceByKey(new Function2<Integer, Integer, Integer>() {
            @Override
            public Integer call(Integer v1, Integer v2) throws Exception {
                return v1 + v2;
            }
        });
        System.out.println(word2CountRDD.toDebugString());
        System.out.println("*************************************");

        List<Tuple2<String, Integer>> result = word2CountRDD.collect();
        System.out.println(result);

        sc.stop();
    }
}
```
Output:
```
(4) datas/wc MapPartitionsRDD[1] at textFile at WordCount.java:25 []
 |  datas/wc HadoopRDD[0] at textFile at WordCount.java:25 []
*************************************
(4) MapPartitionsRDD[2] at flatMap at WordCount.java:29 []
 |  datas/wc MapPartitionsRDD[1] at textFile at WordCount.java:25 []
 |  datas/wc HadoopRDD[0] at textFile at WordCount.java:25 []
*************************************
(4) MapPartitionsRDD[3] at mapToPair at WordCount.java:38 []
 |  MapPartitionsRDD[2] at flatMap at WordCount.java:29 []
 |  datas/wc MapPartitionsRDD[1] at textFile at WordCount.java:25 []
 |  datas/wc HadoopRDD[0] at textFile at WordCount.java:25 []
*************************************
(4) ShuffledRDD[4] at reduceByKey at WordCount.java:47 []
 +-(4) MapPartitionsRDD[3] at mapToPair at WordCount.java:38 []
    |  MapPartitionsRDD[2] at flatMap at WordCount.java:29 []
    |  datas/wc MapPartitionsRDD[1] at textFile at WordCount.java:25 []
    |  datas/wc HadoopRDD[0] at textFile at WordCount.java:25 []
*************************************
[(Spark,2), (Hello,4), (World,1), (Mayun,1)]
```
The leading (4) is the number of partitions; the | lines are narrow, one-to-one dependencies, while the +- marks the shuffle (stage) boundary introduced by reduceByKey.
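The same structure can be inspected programmatically; a hedged snippet reusing the RDDs from the WordCount code above (the Dependency class names in the comments are what Spark typically reports):

```java
// reduceByKey introduces a wide (shuffle) dependency: the stage boundary above
System.out.println(word2CountRDD.rdd().dependencies());
// e.g. List(org.apache.spark.ShuffleDependency@...)

// flatMap/mapToPair create narrow, one-to-one dependencies (the | lines)
System.out.println(wordToPairRDD.rdd().dependencies());
// e.g. List(org.apache.spark.OneToOneDependency@...)
```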
To be continued...