关于spark:第二篇Spark-core编程指南

50次阅读

共计 15531 个字符,预计需要花费 39 分钟才能阅读完成。

在《第一篇 |Spark 概览》一文中,对 Spark 的整体风貌进行了论述。本文将深刻探索 Spark 的外围组件 –Spark core,Spark Core 是 Spark 平台的根底通用执行引擎,所有其余性能均建设在该引擎之上。它不仅提供了内存计算性能来进步速度,而且还提供了通用的执行模型以反对各种应用程序,另外,用户能够应用 Java,Scala 和 Python API 开发应用程序。Spark core 是建设在对立的形象 RDD 之上的,这使得 Spark 的各个组件能够随便集成,能够在同一个应用程序中应用不同的组件以实现简单的大数据处理工作。本文次要探讨的内容有:

  • 什么是 RDD

    • RDD 的设计初衷
    • RDD 的基本概念与次要特点
    • 宽依赖与窄依赖
    • stage 划分与作业调度
  • RDD 操作算子

    • Transformations
    • Actions
  • 共享变量

    • 播送变量
    • 累加器
  • 长久化
  • 综合案例

什么是 RDD

设计初衷

RDD(Resilient Distributed Datasets)的设计之初是为了解决目前存在的一些计算框架对于两类利用场景的解决效率不高的问题,这两类利用场景是 迭代式算法 交互式数据挖掘 。在这两种利用场景中,通过将数据保留在内存中,能够将性能进步到几个数量级。对于 迭代式算法 而言,比方 PageRank、K-means 聚类、逻辑回归等,常常须要重用两头后果。另一种利用场景是 交互式数据挖掘,比方在同一份数据集上运行多个即席查问。大部分的计算框架(比方 Hadoop),应用两头计算结果的形式是将其写入到一个内部存储设备(比方 HDFS),这会减少额定的负载(数据复制、磁盘 IO 和序列化),由此会减少利用的执行工夫。

RDD 能够无效地反对少数利用中的数据重用,它是一种容错的、并行的数据结构,能够让用户显性地将两头后果长久化到内存中,并且能够通过分区来优化数据的寄存,另外,RDD 反对丰盛的算子操作,用户能够很容易地应用这些算子对 RDD 进行操作。

基本概念

一个 RDD 是一个分布式对象汇合,其本质是一个只读的、分区的记录汇合。每个 RDD 能够分成多个分区,不同的分区保留在不同的集群节点上 ( 具体如下图所示 )。RDD 是一种高度受限的共享内存模型,即 RDD 是只读的分区记录汇合,所以也就不能对其进行批改。只能通过两种形式创立 RDD,一种是基于物理存储的数据创立 RDD,另一种是通过在其余 RDD 上作用转换操作(transformation,比方 map、filter、join 等) 失去新的 RDD。

RDD 不须要被物化,它通过血缘关系 (lineage) 来确定其是从 RDD 计算得来的。另外,用户能够管制 RDD 的 长久化 分区 ,用户能够将须要被重用的 RDD 进行长久化操作(比方内存、或者磁盘) 以进步计算效率。也能够依照记录的 key 将 RDD 的元素散布在不同的机器上,比方在对两个数据集进行 JOIN 操作时,能够确保以雷同的形式进行 hash 分区。

次要特点

  • 基于内存

    RDD 是位于内存中的对象汇合。RDD 能够存储在内存、磁盘或者内存加磁盘中,然而,Spark 之所以速度快,是基于这样一个事实:数据存储在内存中,并且每个算子不会从磁盘上提取数据。

  • 分区

    分区是对逻辑数据集划分成不同的独立局部,分区是分布式系统性能优化的一种技术手段,能够缩小网络流量传输,将雷同的 key 的元素散布在雷同的分区中能够缩小 shuffle 带来的影响。RDD 被分成了多个分区,这些分区散布在集群中的不同节点。

  • 强类型

    RDD 中的数据是强类型的,当创立 RDD 的时候,所有的元素都是雷同的类型,该类型依赖于数据集的数据类型。

  • 懒加载

    Spark 的转换操作是懒加载模式,这就意味着只有在执行了 action(比方 count、collect 等)操作之后,才会去执行一些列的算子操作。

  • 不可批改

    RDD 一旦被创立,就不能被批改。只能从一个 RDD 转换成另外一个 RDD。

  • 并行化

    RDD 是能够被并行操作的,因为 RDD 是分区的,每个分区散布在不同的机器上,所以每个分区能够被并行操作。

  • 长久化

    因为 RDD 是懒加载的,只有 action 操作才会导致 RDD 的转换操作被执行,进而创立出绝对应的 RDD。对于一些被重复使用的 RDD,能够对其进行长久化操作(比方将其保留在内存或磁盘中,Spark 反对多种长久化策略),从而进步计算效率。

宽依赖和窄依赖

RDD 中不同的操作会使得不同 RDD 中的分区产不同的依赖,次要有两种依赖:宽依赖 窄依赖。宽依赖是指一个父 RDD 的一个分区对应一个子 RDD 的多个分区,窄依赖是指一个父 RDD 的分区对应与一个子 RDD 的分区,或者多个父 RDD 的分区对应一个子 RDD 分区。对于宽依赖与窄依赖,如下图所示:

Stage 划分

窄依赖会被划分到同一个 stage 中,这样能够以管道的模式迭代执行。宽依赖所依赖的分区个别有多个,所以须要跨节点传输数据。从容灾方面看,两种依赖的计算结果复原的形式是不同的,窄依赖只须要复原父 RDD 失落的分区即可,而宽依赖则须要思考复原所有父 RDD 失落的分区。

DAGScheduler 会将 Job 的 RDD 划分到不同的 stage 中,并构建一个 stage 的依赖关系,即 DAG。这样划分的目标是既能够保障没有依赖关系的 stage 能够并行执行,又能够保障存在依赖关系的 stage 程序执行。stage 次要分为两种类型,一种是ShuffleMapStage,另一种是ResultStage。其中 ShuffleMapStage 是属于上游的 stage,而 ResulStage 属于最上游的 stage,这意味着上游的 stage 先执行,最初执行 ResultStage。

  • ShuffleMapStage

ShuffleMapStage 是 DAG 调度流程的两头 stage,它能够蕴含一个或者多个 ShuffleMapTask,用与生成 Shuffle 的数据,ShuffleMapStage 能够是 ShuffleMapStage 的前置 stage,但肯定是 ResultStage 的前置 stage。局部源码如下:

private[spark] class ShuffleMapStage(
    id: Int,
    rdd: RDD[_],
    numTasks: Int,
    parents: List[Stage],
    firstJobId: Int,
    callSite: CallSite,
    val shuffleDep: ShuffleDependency[_, _, _],
    mapOutputTrackerMaster: MapOutputTrackerMaster)
  extends Stage(id, rdd, numTasks, parents, firstJobId, callSite) {// 省略代码}
}
  • ResultStage

ResultStage 能够应用指定的函数对 RDD 中的分区进行计算并失去最终后果,ResultStage 是最初执行的 stage,比方打印数据到控制台,或者将数据写入到内部存储设备等。局部源码如下:

private[spark] class ResultStage(
    id: Int,
    rdd: RDD[_],
    val func: (TaskContext, Iterator[_]) => _,
    val partitions: Array[Int],
    parents: List[Stage],
    firstJobId: Int,
    callSite: CallSite)
  extends Stage(id, rdd, partitions.length, parents, firstJobId, callSite) {// 省略代码}

下面提到 Spark 通过剖析各个 RDD 的依赖关系生成 DAG,通过各个 RDD 中的分区之间的依赖关系来决定如何划分 stage。具体的思路是:在 DAG 中进行反向解析,遇到宽依赖就断开、遇到窄依赖就把以后的 RDD 退出到以后的 stage 中。行将窄依赖划分到同一个 stage 中,从而造成一个 pipeline,晋升计算效率。所以一个 DAG 图能够划分为多个 stage,每个 stage 都代表了一组关联的,相互之间没有 shuffle 依赖关系的工作组成的 task 汇合,每个 task 汇合会被提交到 TaskScheduler 进行调度解决,最终将工作散发到 Executor 中进行执行。

Spark 作业调度流程

Spark 首先会对 Job 进行一系列的 RDD 转换操作,并通过 RDD 之间的依赖关系构建 DAG(Direct Acyclic Graph, 有向无环图)。而后依据 RDD 依赖关系将 RDD 划分到不同的 stage 中,每个 stage 依照 partition 的数量创立多个 Task,最初将这些 Task 提交到集群的 work 节点上执行。具体流程如下图所示:

  • 1. 构建 DAG,将 DAG 提交到调度零碎;
  • 2.DAGScheduler 负责接管 DAG,并将 DAG 划分成多个 stage,最初将每个 stage 中的 Task 以工作汇合 (TaskSet) 的模式提交个 TaskScheduler 做下一步解决;
  • 3. 应用集群管理器分配资源与任务调度,对于失败的工作会有相应的重试机制。TaskScheduler 负责从 DAGScheduler 接管 TaskSet,而后会创立 TaskSetManager 对 TaskSet 进行治理,最初由 SchedulerBackend 对 Task 进行调度;
  • 4. 执行具体的工作,并将工作的两头后果和最终后果存入存储体系。

RDD 操作算子

Spark 提供了丰盛的 RDD 操作算子,次要包含两大类:TransformationAction,上面会对一些常见的算子进行阐明。

Transformation

上面是一些常见的 transformation 操作,值得注意的是,对于一般的 RDD,反对 Scala、Java、Python 和 R 的 API,对于 pairRDD,仅反对 Scala 和 JavaAPI。上面将对一些常见的算子进行解释:

  • map(func)
  /**
   * 将每个元素传递到 func 函数中,并返回一个新的 RDD
   */
  def map[U: ClassTag](f: T => U): RDD[U] = withScope {val cleanF = sc.clean(f)
    new MapPartitionsRDD[U, T](this, (context, pid, iter) => iter.map(cleanF))
  }
  • filter(func)
/**
   * 筛选出满足 func 函数的元素,并返回一个新的 RDD
   */
  def filter(f: T => Boolean): RDD[T] = withScope {val cleanF = sc.clean(f)
    new MapPartitionsRDD[T, T](
      this,
      (context, pid, iter) => iter.filter(cleanF),
      preservesPartitioning = true)
  }
  • flatMap(func)
/**
   * 首先对该 RDD 所有元素利用 func 函数,而后将后果打平,一个元素会映射到 0 或者多个元素,返回一个新 RDD 
   */
  def flatMap[U: ClassTag](f: T => TraversableOnce[U]): RDD[U] = withScope {val cleanF = sc.clean(f)
    new MapPartitionsRDD[U, T](this, (context, pid, iter) => iter.flatMap(cleanF))
  }
  • mapPartitions(func)
/**
   * 将 func 作用于该 RDD 的每个分区,返回一个新的 RDD
   */
  def mapPartitions[U: ClassTag](f: Iterator[T] => Iterator[U],
      preservesPartitioning: Boolean = false): RDD[U] = withScope {val cleanedF = sc.clean(f)
    new MapPartitionsRDD(
      this,
      (context: TaskContext, index: Int, iter: Iterator[T]) => cleanedF(iter),
      preservesPartitioning)
  }
  • union(otherDataset)
/**
   * 返回一个新的 RDD,蕴含两个 RDD 的元素,相似于 SQL 的 UNION ALL
   */
  def union(other: RDD[T]): RDD[T] = withScope {sc.union(this, other)
  }
  • intersection(otherDataset)
/**
   * 返回一个新的 RDD,蕴含两个 RDD 的交加
   */
  def intersection(other: RDD[T]): RDD[T] = withScope {this.map(v => (v, null)).cogroup(other.map(v => (v, null)))
        .filter {case (_, (leftGroup, rightGroup)) => leftGroup.nonEmpty && rightGroup.nonEmpty }
        .keys
  }
  • distinct([numPartitions]))
 /**
   * 返回一个新的 RDD,对原 RDD 元素去重
   */
  def distinct(): RDD[T] = withScope {distinct(partitions.length)
  }
  • groupByKey([numPartitions])
/**
   * 将 pairRDD 依照 key 进行分组,该算子的性能开销较大,能够应用 PairRDDFunctions.aggregateByKey
   * 或者 PairRDDFunctions.reduceByKey 进行代替
   */
  def groupByKey(): RDD[(K, Iterable[V])] = self.withScope {groupByKey(defaultPartitioner(self))
  }
  • reduceByKey(func, [numPartitions])
/**
   * 应用 reduce 函数对每个 key 对应的值进行聚合,该算子会在本地先对每个 mapper 后果进行合并,而后再将后果发送到 reducer,相似于 MapReduce 的 combiner 性能
   */
  def reduceByKey(func: (V, V) => V): RDD[(K, V)] = self.withScope {reduceByKey(defaultPartitioner(self), func)
  }
  • aggregateByKey(zeroValue)(seqOp, combOp, [numPartitions])
/**
   * 应用给定的聚合函数和初始值对每个 key 对应的 value 值进行聚合
   */
  def aggregateByKey[U: ClassTag](zeroValue: U)(seqOp: (U, V) => U,
      combOp: (U, U) => U): RDD[(K, U)] = self.withScope {aggregateByKey(zeroValue, defaultPartitioner(self))(seqOp, combOp)
  }
  • sortByKey([ascending], [numPartitions])
/**
   * 依照 key 对 RDD 进行排序,所以每个分区的元素都是排序的
   */

  def sortByKey(ascending: Boolean = true, numPartitions: Int = self.partitions.length)
      : RDD[(K, V)] = self.withScope
  {val part = new RangePartitioner(numPartitions, self, ascending)
    new ShuffledRDD[K, V, V](self, part)
      .setKeyOrdering(if (ascending) ordering else ordering.reverse)
  }
  • join(otherDataset, [numPartitions])
/**
   * 将雷同 key 的 pairRDD JOIN 在一起,返回(k, (v1, v2))tuple 类型
   */
  def join[W](other: RDD[(K, W)]): RDD[(K, (V, W))] = self.withScope {join(other, defaultPartitioner(self, other))
  }
  • cogroup(otherDataset, [numPartitions])
/**
   * 将雷同 key 的元素放在一组,返回的 RDD 类型为(K, (Iterable[V], Iterable[W1], Iterable[W2])
   * 第一个 Iterable 外面蕴含以后 RDD 的 key 对应的 value 值,第二个 Iterable 外面蕴含 W1 RDD 的 key 对应的    * value 值,第三个 Iterable 外面蕴含 W2 RDD 的 key 对应的 value 值
   */
  def cogroup[W1, W2](other1: RDD[(K, W1)], other2: RDD[(K, W2)], numPartitions: Int)
      : RDD[(K, (Iterable[V], Iterable[W1], Iterable[W2]))] = self.withScope {cogroup(other1, other2, new HashPartitioner(numPartitions))
  }
  • coalesce(numPartitions)
/**
   * 该函数用于将 RDD 进行重分区,应用 HashPartitioner。第一个参数为重分区的数目,第二个为是否进行      * shuffle,默认为 false;
   */
def coalesce(numPartitions: Int, shuffle: Boolean = false,
               partitionCoalescer: Option[PartitionCoalescer] = Option.empty)
              (implicit ord: Ordering[T] = null)
      : RDD[T] = withScope {require(numPartitions > 0, s"Number of partitions ($numPartitions) must be positive.")
    if (shuffle) {
      /** Distributes elements evenly across output partitions, starting from a random partition. */
      val distributePartition = (index: Int, items: Iterator[T]) => {var position = new Random(hashing.byteswap32(index)).nextInt(numPartitions)
        items.map { t =>
          // Note that the hash code of the key will just be the key itself. The HashPartitioner
          // will mod it with the number of total partitions.
          position = position + 1
          (position, t)
        }
      } : Iterator[(Int, T)]

      // include a shuffle step so that our upstream tasks are still distributed
      new CoalescedRDD(new ShuffledRDD[Int, T, T](mapPartitionsWithIndex(distributePartition),
        new HashPartitioner(numPartitions)),
        numPartitions,
        partitionCoalescer).values
    } else {new CoalescedRDD(this, numPartitions, partitionCoalescer)
    }
  }
  • repartition(numPartitions)
/**
   * 能够减少或者缩小分区,底层调用的是 coalesce 办法。如果要缩小分区,倡议应用 coalesce,因为能够避    * 免 shuffle
   */
  def repartition(numPartitions: Int)(implicit ord: Ordering[T] = null): RDD[T] = withScope {coalesce(numPartitions, shuffle = true)
  }

Action

一些常见的 action 算子如下表所示

操作 含意
count() 返回数据集中的元素个数
collect() 以数组的模式返回数据集中的所有元素
first() 返回数据集中的第一个元素
take(n) 以数组的模式返回数据集中的前 n 个元素
reduce(func) 通过函数 func(输出两个参数并返回一个值)聚合数据集中的元素
foreach(func) 将数据集中的每个元素传递到函数 func 中运行

共享变量

Spark 提供了两种类型的共享变量:播送变量 累加器 。播送变量(Broadcast variables) 是一个只读的变量,并且在每个节点都保留一份正本,而不须要在集群中发送数据。累加器 (Accumulators) 能够将所有工作的数据累加到一个共享后果中。

播送变量

播送变量容许用户在集群中共享一个不可变的值,该共享的、不可变的值被持打算到集群的每台节点上。通常在须要将一份小数据集 (比方维表) 复制到集群中的每台节点时应用,比方日志剖析的利用,web 日志通常只蕴含 pageId,而每个 page 的题目保留在一张表中,如果要剖析日志(比方哪些 page 被拜访的最多),则须要将两者 join 在一起,这时就能够应用播送变量,将该表播送到集群的每个节点。具体如下图所示:

如上图,首先 Driver 将序列化对象宰割成小的数据库,而后将这些数据块存储在 Driver 节点的 BlockManager 上。当 ececutor 中执行具体的 task 时,每个 executor 首先尝试从本人所在节点的 BlockManager 提取数据,如果之前曾经提取的该播送变量的值,就间接应用它。如果没有找到,则会向近程的 Driver 或者其余的 Executor 中提取播送变量的值,一旦获取该值,就将其存储在本人节点的 BlockManager 中。这种机制能够防止 Driver 端向多个 executor 发送数据而造成的性能瓶颈。

根本应用形式如下:

// 模仿一个数据汇合
scala> val mockCollection = "Spark Flink Hadoop Hive".split(" ")
mockCollection: Array[String] = Array(Spark, Flink, Hadoop, Hive)
// 结构 RDD
scala> val words = sc.parallelize(mockCollection,2)
words: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[7] at parallelize at <console>:29
// 模仿播送变量数据
scala> val mapData = Map("Spark" -> 10, "Flink" -> 20,"Hadoop" -> 15, "Hive" -> 9)
mapData: scala.collection.immutable.Map[String,Int] = Map(Spark -> 10, Flink -> 20, Hadoop -> 15, Hive -> 9)
// 创立一个播送变量
scala> val broadCast = sc.broadcast(mapData)
broadCast: org.apache.spark.broadcast.Broadcast[scala.collection.immutable.Map[String,Int]] = Broadcast(4)
// 在算子外部应用播送变量, 依据 key 取出 value 值,按 value 升序排列
scala> words.map(word => (word,broadCast.value.getOrElse(word,0))).sortBy(wordPair => wordPair._2).collect
res5: Array[(String, Int)] = Array((Hive,9), (Spark,10), (Hadoop,15), (Flink,20))

累加器

累加器 (Accumulator) 是 Spark 提供的另外一个共享变量,与播送变量不同,累加器是能够被批改的,是可变的。每个 transformation 会将批改的累加器值传输到 Driver 节点,累加器能够实现一个累加的性能,相似于一个计数器。Spark 自身反对数字类型的累加器,用户也能够自定义累加器的类型。

根本应用

能够通过 sparkContext.longAccumulator() 或者 SparkContext.doubleAccumulator() 别离创立 Long 和 Double 类型的累加器。运行在集群中的 task 能够调用 add 办法对该累加器变量进行累加,然而不可能读取累加器的值,只有 Driver 程序能够通过调用 value 办法读取累加器的值。

object SparkAccumulator {def main(args: Array[String]): Unit = {val conf = new SparkConf().setMaster("local[2]").setAppName(SparkShareVariable.getClass.getSimpleName)
    val sc = new SparkContext(conf)
    Logger.getLogger("org.apache.spark").setLevel(Level.OFF)
    Logger.getLogger("org.apache.hadoop").setLevel(Level.OFF)
    val list = List(1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13)
    val listRDD = sc.parallelize(list)
    var counter = 0 // 内部变量
    // 初始化一个 accumulator,初始值默认为 0
    val countAcc = sc.longAccumulator("my accumulator")
    val mapRDD = listRDD.map(num => {
      counter += 1 // 在算子外部应用了内部变量,这样操作不会扭转内部变量的值
      if (num % 3 == 0) {
        // 遇到 3 的倍数,累加器 +1
        countAcc.add(1)
      }
      num * 2
    })
    mapRDD.foreach(println)
    println("counter =" + counter) // counter = 0
    println("countAcc =" + countAcc.value) // countAcc = 4
    sc.stop()}
}

尖叫提醒:

咱们在 dirver 中申明的一些局部变量或者成员变量,能够间接在 transformation 中应用,然而通过 transformation 操作之后,是不会将最终的后果从新赋值给 dirver 中的对应的变量。因为通过 action 触发 transformation 操作之后,transformation 的操作都是通过 DAGScheduler 将代码打包,而后序列化,最初交由 TaskScheduler 传送到各个 Worker 节点中的 Executor 去执行,在 transformation 中执行的这些变量,是本人节点上的变量,不是 dirver 上最后的变量,只不过是将 driver 上的对应的变量拷贝了一份而已。

自定义累加器

Spark 提供了一些默认类型的累加器,同时也反对自定义累加器。通过继承 AccumulatorV2 类即可实现自定义累加器,具体代码如下:

class customAccumulator extends AccumulatorV2[BigInt, BigInt]{
  private var num:BigInt = 0
  /**
    * 返回该 accumulator 是否为 0 值,比方一个计数器,0 代表 zero,如果是一个 list,Nil 代表 zero 
    */
  def isZero: Boolean = {this.num == 0}
  // 创立一个该 accumulator 正本
  def copy(): AccumulatorV2[BigInt, BigInt] = {new customAccumulator}
  /**
    * 重置 accumulator 的值, 该值为 0,调用 `isZero` 必须返回 true
    */
  def reset(): Unit = {this.num = 0}
  // 依据输出的值,进行累加,// 判断为偶数时,累加器加上该值
  def add(intVal: BigInt): Unit = {if(intVal % 2 == 0){this.num += intVal}
  }
  /**
    * 合并其余的同一类型的 accumulator,并更新该 accumulator 值
    */
  def merge(other: AccumulatorV2[BigInt, BigInt]): Unit = {this.num += other.value}
  /**
    * 定义以后 accumulator 的值
    */
  def value: BigInt = {this.num}
}

应用该自定义累加器

    val acc = new customAccumulator
    val newAcc = sc.register(acc, "evenAcc")
    println(acc.value)
    sc.parallelize(Array(1, 2, 3, 4)).foreach(x => acc.add(x))
    println(acc.value)

长久化

长久化办法

在 Spark 中,RDD 采纳惰性求值的机制,每次遇到 action 操作,都会从头开始执行计算。每次调用 action 操作,都会触发一次从头开始的计算。对于须要被重复使用的 RDD,spark 反对对其进行长久化,通过调用 persist()或者 cache()办法即可实现 RDD 的持打算。通过长久化机制能够防止反复计算带来的开销。值得注意的是,当调用长久化的办法时,只是对该 RDD 标记为了长久化,须要等到第一次执行 action 操作之后,才会把计算结果进行长久化。长久化后的 RDD 将会被保留在计算节点的内存中被前面的口头操作重复使用。

Spark 提供的两个长久化办法的次要区别是:cache()办法默认应用的是内存级别,其底层调用的是 persist()办法,具体源码片段如下:

def persist(newLevel: StorageLevel): this.type = {if (isLocallyCheckpointed) {persist(LocalRDDCheckpointData.transformStorageLevel(newLevel), allowOverride = true)
    } else {persist(newLevel, allowOverride = false)
    }
  }

  /**
   * 应用默认的存储级别长久化 RDD (`MEMORY_ONLY`).
   */
  def persist(): this.type = persist(StorageLevel.MEMORY_ONLY)

  /**
   * 应用默认的存储级别长久化 RDD (`MEMORY_ONLY`).
   */
  def cache(): this.type = persist()

/**
  * 手动地把长久化的 RDD 从缓存中移除
   */
  def unpersist(blocking: Boolean = true): this.type = {logInfo("Removing RDD" + id + "from persistence list")
    sc.unpersistRDD(id, blocking)
    storageLevel = StorageLevel.NONE
    this
  }

持打算存储级别

Spark 的提供了多种长久化级别,比方内存、磁盘、内存 + 磁盘等。具体如下表所示:

Storage Level Meaning
MEMORY_ONLY 默认,示意将 RDD 作为反序列化的 Java 对象存储于 JVM 中,如果内存不够用,则局部分区不会被长久化,等到应用到这些分区时,会从新计算。
MEMORY_AND_DISK 将 RDD 作为反序列化的 Java 对象存储在 JVM 中,如果内存不足,超出的分区将会被寄存在硬盘上.
MEMORY_ONLY_SER (Java and Scala) 将 RDD 序列化为 Java 对象进行长久化,每个分区对应一个字节数组。此形式比反序列化要节俭空间,然而会占用更多 cpu 资源
MEMORY_AND_DISK_SER (Java and Scala) 与 MEMORY_ONLY_SER, 如果内存放不下,则溢写到磁盘。
DISK_ONLY 将 RDD 的分区数据存储到磁盘
MEMORY_ONLY_2, MEMORY_AND_DISK_2, etc. 与下面的形式相似, 然而会将分区数据复制到两个集群
OFF_HEAP (experimental) 与 MEMORY_ONLY_SER 相似, 将数据存储到堆外内存 off-heap,须要将 off-heap 开启

长久化级别的抉择

Spark 提供的长久化存储级别是在 内存应用 CPU 效率 之间做衡量,通常举荐上面的抉择形式:

  • 如果内存能够包容 RDD,能够应用默认的长久化级别,即 MEMORY_ONLY。这是 CPU 最有效率的抉择,能够使作用在 RDD 上的算子尽可能第疾速执行。
  • 如果内存不够用,能够尝试应用 MEMORY_ONLY_SER,应用一个疾速的序列化库能够节俭很多空间,比方 Kryo。

    tips:在一些 shuffle 算子中,比方 reduceByKey,即使没有显性调用 persist 办法,Spark 也会主动将两头后果进行长久化,这样做的目标是防止在 shuffle 期间产生故障而造成从新计算整个输出。即便如此,还是举荐对须要被重复使用的 RDD 进行长久化解决。

综合案例

  • case 1
/**
  *  1. 数据集
  *          [orderId,userId,payment,productId]
  *          1,108,280,1002
  *          2,202,300,2004
  *          3,210,588,3241
  *          4,198,5000,3567
  *          5,200,590,2973
  *          6,678,8000,18378
  *          7,243,200,2819
  *          8,236,7890,2819
  *  2. 需要形容
  *           计算 Top3 订单金额
  *           
  *  3. 后果输入
  *             1    8000
  *             2    7890
  *          3    5000       
  */
object TopOrder {def main(args: Array[String]): Unit = {val conf = new SparkConf().setAppName("TopN").setMaster("local")
    val sc = new SparkContext(conf)
    sc.setLogLevel("ERROR")
    val lines = sc.textFile("E://order.txt")
    var num = 0;
    val result = lines.filter(line => (line.trim().length > 0) && (line.split(",").length == 4))
      .map(_.split(",")(2))     // 取出领取金额
      .map(x => (x.toInt,""))   
      .sortByKey(false)         // 依照领取金额降序排列    
      .map(x => x._1).take(3)   // 取出前 3 个
      .foreach(x => {
        num = num + 1
        println(num + "\t" + x)
      })
  } 
}
  • case 2
/**
  * 1. 数据集(movielensSet)
  *        用户电影评分数据[UserID::MovieID::Rating::Timestamp]
  *        电影名称数据[MovieId::MovieName::MovieType]
  * 2. 需要形容
  *        求均匀评分大于 5 的电影名称
  *
  */
object MovieRating {def main(args: Array[String]): Unit = {val conf = new SparkConf().setAppName("MovieRating").setMaster("local")
    val sc = new SparkContext(conf)
    sc.setLogLevel("ERROR")
    // 用户电影评分数据[UserID::MovieID::Rating::Timestamp]
    val userRating = sc.textFile("E://ml-1m/ratings.dat")
    // 电影名称数据[MovieId::MovieName::MovieType]
    val movies = sc.textFile("E://ml-1m/movies.dat")
    // 提取电影 id 和评分,(MovieID, Rating)
    val movieRating = userRating.map { line => {val rating = line.split("::")
      (rating(1).toInt, rating(2).toDouble)
    }
    }
    // 计算电影 id 及其均匀评分,(MovieId,AvgRating)
    val movieAvgRating = movieRating
      .groupByKey()
      .map { rating =>
          val avgRating = rating._2.sum / rating._2.size
          (rating._1, avgRating)
      }
    // 提取电影 id 和电影名称,(MovieId,MovieName)
   val movieName =  movies.map { movie =>
        val fields = movie.split("::")
        (fields(0).toInt, fields(1))

    }.keyBy(_._1)

    movieAvgRating
      .keyBy(_._1)
      .join(movieName) // Join 的后果(MovieID,((MovieID,AvgRating),(MovieID,MovieName)))
      .filter(joinData => joinData._2._1._2 > 5.0)
      .map(rs => (rs._1,rs._2._1._2,rs._2._2._2))
      .saveAsTextFile("E:/MovieRating/")
  }

}

总结

本文对 Spark Core 进行了具体解说,次要包含 RDD 的基本概念、RDD 的操作算子、共享变量以及持打算,最初给出两个残缺的 Spark Core 编程案例。下一篇将分享 Spark SQL 编程指南。

公众号『大数据技术与数仓』,回复『材料』支付大数据资料包

正文完
 0