Spark学习记录之SparkCore初步

概念

Spark是一种基于内存的疾速、通用、可扩大的大数据分析计算引擎。

蕴含的模块有,Spark Core,Spark SQL,Spark Streaming,Spark MLib,Spark GraphX

Spark Submit例子

  1. Standalone

    bin/spark-submit \--class org.apache.spark.examples.SparkPi \--master spark://spark111:7077 \./examples/jars/spark-examples_2.12-3.0.0.jar \1000
  2. Yarn

    bin/spark-submit \--class org.apache.spark.examples.SparkPi \--master yarn \--deploy-mode cluster \./examples/jars/spark-examples_2.12-3.0.0.jar \100bin/spark-submit \ --class org.apache.spark.examples.SparkPi \ --master yarn \ --deploy-mode client \ ./examples/jars/spark-examples_2.12-3.0.0.jar \ 100

Spark On Yarn

两种模式的区别在于:Driver程序运行的节点不同。Client模式将用于监控和调度的Driver模块启动在客户端,而Cluster则将Driver模块启动在集群中。因而Client模式个别用于测试,Cluster则用于生产部署。

  1. Yarn Client模式

    • Driver在提交工作的本地运行
    • Driver启动后,会和ReourceManager通信,申请启动ApplicationMaster
    • ResourceManager调配Container,在适合的NodeManager上启动ApplicationMaster,负责向ResourceManager申请Executor的资源
    • ResourceManager接到ApplicationMaster的申请后,调配Container,而后ApplicationMaster在资源分配指定的NodeManager上启动Executor过程。
    • Executor过程启动后,会反向注册到Driver,Executor全副启动实现后,Driver开始执行main函数
    • 执行到Action算子时,触发一个job,并依据宽依赖开始切分stage,每个stage生成对应的TaskSet,之后Driver会将task散发到各个Executor上执行
  2. Yarn Cluster模式

    • 在Yarn Cluster模式下,工作提交后会向ResourceManager通信申请启动ApplicationMaster
    • 随后ResourceManger调配Container,在适合的NodeManager上启动ApplicationMaster,此时ApplicationMaster就是Driver
    • Driver启动后,向ResourceManager申请Exexutor资源。
    • ResourceManager调配Contaioner,ApplicationMaster在对应的NodeManager上启动Excutor。Executor启动后会向Driver进行反向注册
    • 当Executor全副启动结束后,Driver开始执行main函数
    • 执行到Action算子时,触发一个job,并依据宽依赖切分stage,每个stage上生成对应的TaskSet,之后Driver回将task散发到各个Executor上执行

RDD

RDD(Resilient Distributed Dataset),弹性分布式数据集

  1. 分区,与Hadoop MapReduce的比照

    • MapReduce有切片和分区,这是两个不同的概念,切片次要作用于MapTask阶段,以TextInputFormat为例,切片依据文件块大小来决定,默认状况下,切片大小就等于块大小,当然切片大小可通过配置调节,个别状况下,块大小为128M或256M,以磁盘速度决定。一个文件会以切片大小来切分为多个逻辑上的小文件,因而切片个数就等于MapTask的个数,即MapTask的并行数
    • MapReduce的分区是要在代码中指定设置的,默认为一个分区,分区个数对应的是ReduceTask的个数。默认的分区器Partitioner是HashPartitioner,以(key & Long.MAX_VALUE)%numReduceTasks计算得来,即在HashPartitioner的计算逻辑中,设定多少个numReduceTasks就会有多少个分区。用户能够通过继承Partioner来自定义分区器,以实现指定的分区个数。
    • MapReduce的分区数也会作用在MapTask阶段,在数据处理map办法进入环形缓冲区前会给数据标记上分区,在环形缓冲区的溢写排序和多个溢写文件的排序合并中都会以分区为单位进行
    • RDD中,一个stage中的task数量,是以该stage中的最初一个算子的分区数决定的
  2. 5个外围属性

    • 分区列表

      protected def getPartitions: Array[Partition]
    • 分区计算函数

      def compute(split: Partition, context: TaskContext): Iterator[T]
    • RDD之间的依赖关系

      protected def getDependencies: Seq[Dependency[_]] = deps
    • 分区器(可选)

      @transient val partitioner: Option[Partitioner] = None
    • 首选地位(可选)

      protected def getPreferredLocations(split: Partition): Seq[String] = Nil
  3. 分区与并行度

    分区与并行度有关系,然而是不同的概念。多个分区会有多个task,然而只有一个Executor的话,也就只能是并发,而非并行。分区的意思是写代码的人心愿在足够资源的状况下能够达到分区数的并行度。

    分区内计算有序,分区间计算无序

    • markRDD的分区,从汇合中创立

      • numSlices = 循环i = 0~3,文件length,
      • start = ((i * length)/numSlices).toInt
      • end = (((i +1) * length)/numSlices).toInt

        [1,2,3,4,5] numSlices = 2length = 50    => [0,2)    =>    1,21    => [2,5)    =>    3,4,5
        def slice[T: ClassTag](seq: Seq[T], numSlices: Int): Seq[Seq[T]] = {    if (numSlices < 1) {      throw new IllegalArgumentException("Positive number of partitions required")    }    // Sequences need to be sliced at the same set of index positions for operations    // like RDD.zip() to behave as expected    def positions(length: Long, numSlices: Int): Iterator[(Int, Int)] = {      (0 until numSlices).iterator.map { i =>        val start = ((i * length) / numSlices).toInt        val end = (((i + 1) * length) / numSlices).toInt        (start, end)      }    }      //...省略多行代码  }
    • textFile的分区

      • 采纳Hadoop的读文件形式,TextInputformat
      • 以行为单位进行读取
      • 读取数据时以偏移量为单位
      • 偏移量不会反复读取

        1234567@@    => 01234567889@@         => 91011120            => 1314 / 2 = 7[0,7]        => 1234567@@[7,14]       => 89@@                0
        public InputSplit[] getSplits(JobConf job, int numSplits) throws IOException {    //...省略多行代码    long totalSize = 0L;    //...省略多行代码    totalSize += file.getLen();    //...省略多行代码    long goalSize = totalSize / (long)(numSplits == 0 ? 1 : numSplits);        long minSize = Math.max(job.getLong("mapreduce.input.fileinputformat.split.minsize", 1L), this.minSplitSize);    //...省略多行代码    //blockSize,本地运行环境,32M,生产128M或者256M    long blockSize = file.getBlockSize();    long splitSize = this.computeSplitSize(goalSize, minSize, blockSize);    //...省略多行代码    for(bytesRemaining = length; (double)bytesRemaining / (double)splitSize > 1.1D; bytesRemaining -= splitSize) {        splitHosts = this.getSplitHostsAndCachedHosts(blkLocations, length - bytesRemaining, splitSize, clusterMap);        splits.add(this.makeSplit(path, length - bytesRemaining, splitSize, splitHosts[0], splitHosts[1]));    }    //...省略多行代码}protected long computeSplitSize(long goalSize, long minSize, long blockSize) {    return Math.max(minSize, Math.min(goalSize, blockSize));}