Spark Core Basics

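1. RDD

RDD stands for Resilient Distributed Dataset.

What does "resilient" mean?

1. Storage resilience: data switches automatically between memory and disk
2. Fault-tolerance resilience: lost data can be recovered automatically
3. Computation resilience: a failed computation is retried
4. Partitioning resilience: data can be repartitioned as needed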

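Distributed

The computation logic of an RDD is split into Tasks, one per partition, and the Tasks are sent to Executors (on different nodes) to run.

Dataset

An RDD is divided into partitions and holds references to the data; it does not store the data itself.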
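
The idea that a dataset holds a reference to its data rather than the data itself can be illustrated with a small toy model in plain Java. This is only a sketch under stated assumptions: LazyDataset and its methods are hypothetical names made up for illustration, not Spark classes, and the model ignores partitions and distribution entirely. It shows that a transformation only records what to do, and the source is read when an action such as collect is called.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;
import java.util.function.Supplier;

// Toy model only: NOT Spark's RDD class. It stores a recipe for producing
// data (a supplier plus chained functions), never the data itself.
class LazyDataset<T> {
    private final Supplier<List<T>> recipe;

    LazyDataset(Supplier<List<T>> recipe) {
        this.recipe = recipe;
    }

    // Transformation: returns a new dataset that wraps the parent's recipe.
    // No element is touched here.
    <R> LazyDataset<R> map(Function<T, R> f) {
        return new LazyDataset<>(() -> {
            List<R> out = new ArrayList<>();
            for (T item : recipe.get()) {
                out.add(f.apply(item));
            }
            return out;
        });
    }

    // Action: only now is the whole chain evaluated.
    List<T> collect() {
        return recipe.get();
    }

    public static void main(String[] args) {
        AtomicInteger reads = new AtomicInteger();
        LazyDataset<String> lines = new LazyDataset<>(() -> {
            reads.incrementAndGet(); // counts how often the source is read
            return Arrays.asList("Hello Spark", "Hello World");
        });
        LazyDataset<Integer> lengths = lines.map(String::length);
        System.out.println(reads.get());       // prints 0: source not read yet
        System.out.println(lengths.collect()); // prints [11, 11]
        System.out.println(reads.get());       // prints 1
    }
}
```

Because the toy keeps only the recipe, calling collect a second time reads the source again. Recomputing from the recipe is also the intuition behind recovering lost data from lineage.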

Main properties

  • A list of partitions: an RDD is made up of partitions
  • A function for computing each split: every split (partition) has a compute function
  • A list of dependencies on other RDDs: RDDs depend on one another
  • Optionally, a Partitioner for key-value RDDs (e.g. to say that the RDD is hash-partitioned): key-value RDDs can have a partitioner
  • Optionally, a list of preferred locations to compute each split on (e.g. block locations for an HDFS file): each split can have preferred locations for its computation
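
To make the "hash-partitioned" property more concrete, here is a minimal sketch of how a key can be mapped to a partition index. The class and method names are hypothetical, and sending null keys to partition 0 is an assumption of this sketch. As far as I know, Spark's HashPartitioner follows the same basic idea (the key's hashCode modulo the number of partitions, kept non-negative), but treat the code below as an illustration rather than Spark's implementation.

```java
// Toy sketch of hash partitioning; class and method names are hypothetical.
class HashPartitionSketch {
    // Maps a key to a partition index in [0, numPartitions).
    static int partitionFor(Object key, int numPartitions) {
        if (key == null) {
            return 0; // assumption of this sketch: null keys go to partition 0
        }
        int mod = key.hashCode() % numPartitions;
        // Java's % can return a negative value, so shift it into range.
        return mod < 0 ? mod + numPartitions : mod;
    }

    public static void main(String[] args) {
        for (String word : new String[] {"Spark", "Hello", "World", "Mayun"}) {
            System.out.println(word + " -> partition " + partitionFor(word, 4));
        }
    }
}
```

The property that matters is that equal keys always map to the same partition, which is what lets a per-key aggregation see all values for a key in one place.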

2. RDD dependencies

Lineage

package com.journey.core.wc;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import scala.Tuple2;

import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

public class WordCount {
    public static void main(String[] args) {
        SparkConf conf = new SparkConf()
                .setAppName("WordCount")
                .setMaster("local[*]");
        JavaSparkContext sc = new JavaSparkContext(conf);

        // When running on a cluster, this would simply be an HDFS path
        JavaRDD<String> lineRDD = sc.textFile("datas/wc", 4);
        System.out.println(lineRDD.toDebugString());
        System.out.println("*************************************");

        JavaRDD<String> wordsRDD = lineRDD.flatMap(new FlatMapFunction<String, String>() {
            @Override
            public Iterator<String> call(String line) throws Exception {
                return Arrays.asList(line.split(" ")).iterator();
            }
        });
        System.out.println(wordsRDD.toDebugString());
        System.out.println("*************************************");

        JavaPairRDD<String, Integer> wordToPairRDD = wordsRDD.mapToPair(new PairFunction<String, String, Integer>() {
            @Override
            public Tuple2<String, Integer> call(String word) throws Exception {
                return Tuple2.apply(word, 1);
            }
        });
        System.out.println(wordToPairRDD.toDebugString());
        System.out.println("*************************************");

        JavaPairRDD<String, Integer> word2CountRDD = wordToPairRDD.reduceByKey(new Function2<Integer, Integer, Integer>() {
            @Override
            public Integer call(Integer v1, Integer v2) throws Exception {
                return v1 + v2;
            }
        });
        System.out.println(word2CountRDD.toDebugString());
        System.out.println("*************************************");

        List<Tuple2<String, Integer>> result = word2CountRDD.collect();
        System.out.println(result);

        sc.stop();
    }
}

Output:

(4) datas/wc MapPartitionsRDD[1] at textFile at WordCount.java:25 []
 |  datas/wc HadoopRDD[0] at textFile at WordCount.java:25 []
*************************************
(4) MapPartitionsRDD[2] at flatMap at WordCount.java:29 []
 |  datas/wc MapPartitionsRDD[1] at textFile at WordCount.java:25 []
 |  datas/wc HadoopRDD[0] at textFile at WordCount.java:25 []
*************************************
(4) MapPartitionsRDD[3] at mapToPair at WordCount.java:38 []
 |  MapPartitionsRDD[2] at flatMap at WordCount.java:29 []
 |  datas/wc MapPartitionsRDD[1] at textFile at WordCount.java:25 []
 |  datas/wc HadoopRDD[0] at textFile at WordCount.java:25 []
*************************************
(4) ShuffledRDD[4] at reduceByKey at WordCount.java:47 []
 +-(4) MapPartitionsRDD[3] at mapToPair at WordCount.java:38 []
    |  MapPartitionsRDD[2] at flatMap at WordCount.java:29 []
    |  datas/wc MapPartitionsRDD[1] at textFile at WordCount.java:25 []
    |  datas/wc HadoopRDD[0] at textFile at WordCount.java:25 []
*************************************
[(Spark,2), (Hello,4), (World,1), (Mayun,1)]

The (4) at the start of each lineage block is the number of partitions of that RDD.
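
In the lineage output above, the indented "+-" under ShuffledRDD[4] marks a shuffle boundary: flatMap and mapToPair work on each partition independently, while reduceByKey has to bring equal keys together first. The following single-process sketch imitates those two stages in plain Java. It is not Spark code. The class name, the method name, and the sample input lines are assumptions chosen so that the counts match the result shown above.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Toy, single-process imitation of the two stages in the lineage above.
// Not Spark code: all names here are hypothetical.
class ShuffleSketch {
    static Map<String, Integer> wordCount(List<List<String>> inputPartitions, int numReducers) {
        // Shuffle buckets: one list of words per reduce-side partition.
        List<List<String>> buckets = new ArrayList<>();
        for (int i = 0; i < numReducers; i++) {
            buckets.add(new ArrayList<>());
        }
        // Stage 1: each input partition is processed on its own (like flatMap
        // and mapToPair), and the key's hash picks the target bucket, so equal
        // words from any input partition end up in the same bucket.
        for (List<String> partition : inputPartitions) {
            for (String line : partition) {
                for (String word : line.split(" ")) {
                    buckets.get(Math.floorMod(word.hashCode(), numReducers)).add(word);
                }
            }
        }
        // Stage 2: each bucket is reduced independently (like reduceByKey).
        // A word lives in exactly one bucket, so a shared result map is safe.
        Map<String, Integer> result = new TreeMap<>();
        for (List<String> bucket : buckets) {
            for (String word : bucket) {
                result.merge(word, 1, Integer::sum);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        // Assumed sample input, split into two input partitions.
        List<List<String>> partitions = Arrays.asList(
                Arrays.asList("Hello Spark", "Hello World"),
                Arrays.asList("Hello Mayun", "Hello Spark"));
        System.out.println(wordCount(partitions, 4)); // prints {Hello=4, Mayun=1, Spark=2, World=1}
    }
}
```

The sketch uses a TreeMap only to make the printed order deterministic. The order of the real collect result depends on how keys are distributed across partitions.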

To be continued...
