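1. RDD

RDD stands for Resilient Distributed Dataset.

What does "resilient" mean?

1. Storage resilience: data switches automatically between memory and disk
2. Fault-tolerance resilience: lost data can be recovered automatically
3. Computation resilience: failed computations are retried
4. Partitioning resilience: data can be repartitioned as needed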
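The second point, recovering lost data, can be sketched in plain Java. This is a toy model only, not Spark's implementation: the class `LineageSketch` and its members are hypothetical names made up for this illustration. The idea it shows is that if the dataset remembers how each partition was computed, a lost cached partition can simply be computed again from the source.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

// Toy model (hypothetical, not Spark code): a dataset that can rebuild a lost partition.
final class LineageSketch {
    // Stands in for stable input, such as blocks of a file.
    private final List<List<Integer>> sourcePartitions;
    // The recorded computation: how a partition is derived from its source.
    private final Function<Integer, Integer> transform;
    // Computed partitions kept in memory; entries may be lost.
    private final Map<Integer, List<Integer>> cache = new HashMap<>();
    // Number of times any partition has been computed or recomputed.
    int computeCount = 0;

    LineageSketch(List<List<Integer>> sourcePartitions, Function<Integer, Integer> transform) {
        this.sourcePartitions = sourcePartitions;
        this.transform = transform;
    }

    // Return the cached partition, recomputing it from the source if it is missing.
    List<Integer> getPartition(int index) {
        List<Integer> cached = cache.get(index);
        if (cached != null) {
            return cached;
        }
        computeCount++;
        List<Integer> result = new ArrayList<>();
        for (Integer value : sourcePartitions.get(index)) {
            result.add(transform.apply(value));
        }
        cache.put(index, result);
        return result;
    }

    // Simulate losing a cached partition, for example because a worker died.
    void losePartition(int index) {
        cache.remove(index);
    }

    public static void main(String[] args) {
        LineageSketch data = new LineageSketch(
                Arrays.asList(Arrays.asList(1, 2), Arrays.asList(3, 4)),
                x -> x * 10);
        System.out.println(data.getPartition(1));
        data.losePartition(1);
        // The same values come back because the computation is replayed.
        System.out.println(data.getPartition(1));
    }
}
```

Only the lost partition is rebuilt here; the other partition is never touched, which is the reason recovery by recomputation can be cheap.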

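Distributed

The computation logic of an RDD is split into Tasks, one per partition, and the Tasks are sent to Executors (on different nodes) to run.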
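As a rough analogy for "one Task per partition", the sketch below runs the same logic once per partition on a thread pool. It is a single-JVM toy, not Spark: threads stand in for Executors, and `TaskPerPartitionSketch` and `sumEachPartition` are hypothetical names chosen for this example.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Toy model (hypothetical, not Spark code): one task per partition, run on worker threads.
final class TaskPerPartitionSketch {

    // Apply the same logic (a sum) to every partition, each as its own task.
    // Results are returned in partition order.
    static List<Integer> sumEachPartition(List<List<Integer>> partitions, int workerThreads) {
        ExecutorService pool = Executors.newFixedThreadPool(workerThreads);
        try {
            List<Future<Integer>> futures = new ArrayList<>();
            for (List<Integer> partition : partitions) {
                Callable<Integer> task = () -> {
                    int sum = 0;
                    for (int value : partition) {
                        sum += value;
                    }
                    return sum;
                };
                futures.add(pool.submit(task));
            }
            List<Integer> results = new ArrayList<>();
            for (Future<Integer> future : futures) {
                results.add(future.get());
            }
            return results;
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException("task failed", e);
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        List<List<Integer>> partitions = Arrays.asList(
                Arrays.asList(1, 2),
                Arrays.asList(3, 4, 5));
        System.out.println(sumEachPartition(partitions, 2));
    }
}
```

The number of tasks follows the number of partitions, not the number of workers: with more partitions than threads, tasks simply queue until a worker is free.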

Dataset

An RDD is divided into partitions and holds references to the data; it does not store the data itself.
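One way to picture "a reference to the data, not the data" is a dataset object that stores only a recipe for obtaining its elements. The sketch below is a plain-Java toy under that assumption, not Spark code; `LazyDatasetSketch` and its methods are hypothetical names.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Function;
import java.util.function.Supplier;

// Toy model (hypothetical, not Spark code): the dataset keeps a recipe, not the elements.
final class LazyDatasetSketch<T> {
    // How to obtain the data; nothing is loaded until collect() is called.
    private final Supplier<List<T>> source;

    LazyDatasetSketch(Supplier<List<T>> source) {
        this.source = source;
    }

    // Describe a transformation. No element is read or computed here.
    <R> LazyDatasetSketch<R> map(Function<T, R> f) {
        return new LazyDatasetSketch<>(() -> {
            List<R> out = new ArrayList<>();
            for (T item : source.get()) {
                out.add(f.apply(item));
            }
            return out;
        });
    }

    // Materialize the elements; only now is the source actually read.
    List<T> collect() {
        return source.get();
    }

    public static void main(String[] args) {
        LazyDatasetSketch<String> lines =
                new LazyDatasetSketch<>(() -> Arrays.asList("Hello Spark", "Hello World"));
        LazyDatasetSketch<Integer> lengths = lines.map(String::length);
        System.out.println(lengths.collect());
    }
}
```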

Main properties

  • A list of partitions: an RDD is made up of partitions
  • A function for computing each split: every split has its own compute function
  • A list of dependencies on other RDDs: RDDs depend on one another
  • Optionally, a Partitioner for key-value RDDs (e.g. to say that the RDD is hash-partitioned): key/value RDDs can have a partitioner
  • Optionally, a list of preferred locations to compute each split on (e.g. block locations for an HDFS file): each split can be computed at a preferred location
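To make "hash-partitioned" concrete, here is a minimal sketch of how a key can be mapped to a partition index: take the key's hash code modulo the number of partitions and keep the result non-negative. To my knowledge this mirrors what Spark's `HashPartitioner` does, but the class `HashPartitionSketch` and the method `partitionFor` are hypothetical names for this illustration, not Spark API.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Sketch (hypothetical names): assign keys to partitions by hash code.
final class HashPartitionSketch {

    // Map a key to a partition index in [0, numPartitions).
    static int partitionFor(Object key, int numPartitions) {
        if (numPartitions <= 0) {
            throw new IllegalArgumentException("numPartitions must be positive");
        }
        if (key == null) {
            return 0; // assumption: null keys all go to partition 0
        }
        // floorMod keeps the index non-negative even when hashCode() is negative.
        return Math.floorMod(key.hashCode(), numPartitions);
    }

    public static void main(String[] args) {
        Map<Integer, List<String>> byPartition = new TreeMap<>();
        for (String word : Arrays.asList("Spark", "Hello", "World", "Mayun")) {
            byPartition
                    .computeIfAbsent(partitionFor(word, 4), k -> new ArrayList<>())
                    .add(word);
        }
        // Equal keys always land in the same partition, so each key can be reduced locally.
        System.out.println(byPartition);
    }
}
```

Because the index depends only on the key, all records with the same key end up in the same partition, which is what an operation such as `reduceByKey` relies on.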

2. RDD Dependencies

Lineage

package com.journey.core.wc;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import scala.Tuple2;

import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

public class WordCount {

    public static void main(String[] args) {
        SparkConf conf = new SparkConf()
                .setAppName("WordCount")
                .setMaster("local[*]");
        JavaSparkContext sc = new JavaSparkContext(conf);

        // When running on a cluster, this would be an HDFS path instead
        JavaRDD<String> lineRDD = sc.textFile("datas/wc", 4);
        System.out.println(lineRDD.toDebugString());
        System.out.println("*************************************");

        JavaRDD<String> wordsRDD = lineRDD.flatMap(new FlatMapFunction<String, String>() {
            @Override
            public Iterator<String> call(String line) throws Exception {
                return Arrays.asList(line.split(" ")).iterator();
            }
        });
        System.out.println(wordsRDD.toDebugString());
        System.out.println("*************************************");

        JavaPairRDD<String, Integer> wordToPairRDD = wordsRDD.mapToPair(new PairFunction<String, String, Integer>() {
            @Override
            public Tuple2<String, Integer> call(String word) throws Exception {
                return Tuple2.apply(word, 1);
            }
        });
        System.out.println(wordToPairRDD.toDebugString());
        System.out.println("*************************************");

        JavaPairRDD<String, Integer> word2CountRDD = wordToPairRDD.reduceByKey(new Function2<Integer, Integer, Integer>() {
            @Override
            public Integer call(Integer v1, Integer v2) throws Exception {
                return v1 + v2;
            }
        });
        System.out.println(word2CountRDD.toDebugString());
        System.out.println("*************************************");

        List<Tuple2<String, Integer>> result = word2CountRDD.collect();
        System.out.println(result);

        sc.stop();
    }
}

Output:

(4) datas/wc MapPartitionsRDD[1] at textFile at WordCount.java:25 []
 |  datas/wc HadoopRDD[0] at textFile at WordCount.java:25 []
*************************************
(4) MapPartitionsRDD[2] at flatMap at WordCount.java:29 []
 |  datas/wc MapPartitionsRDD[1] at textFile at WordCount.java:25 []
 |  datas/wc HadoopRDD[0] at textFile at WordCount.java:25 []
*************************************
(4) MapPartitionsRDD[3] at mapToPair at WordCount.java:38 []
 |  MapPartitionsRDD[2] at flatMap at WordCount.java:29 []
 |  datas/wc MapPartitionsRDD[1] at textFile at WordCount.java:25 []
 |  datas/wc HadoopRDD[0] at textFile at WordCount.java:25 []
*************************************
(4) ShuffledRDD[4] at reduceByKey at WordCount.java:47 []
 +-(4) MapPartitionsRDD[3] at mapToPair at WordCount.java:38 []
    |  MapPartitionsRDD[2] at flatMap at WordCount.java:29 []
    |  datas/wc MapPartitionsRDD[1] at textFile at WordCount.java:25 []
    |  datas/wc HadoopRDD[0] at textFile at WordCount.java:25 []
*************************************
[(Spark,2), (Hello,4), (World,1), (Mayun,1)]

The (4) at the start of each lineage listing is the number of partitions.

To be continued...