在《第一篇|Spark概览》一文中,对Spark的整体风貌进行了论述。本文将深刻探索Spark的外围组件–Spark core,Spark Core是Spark平台的根底通用执行引擎,所有其余性能均建设在该引擎之上。它不仅提供了内存计算性能来进步速度,而且还提供了通用的执行模型以反对各种应用程序,另外,用户能够应用Java,Scala和Python API开发应用程序。Spark core是建设在对立的形象RDD之上的,这使得Spark的各个组件能够随便集成,能够在同一个应用程序中应用不同的组件以实现简单的大数据处理工作。本文次要探讨的内容有:
-
什么是RDD
- RDD的设计初衷
- RDD的基本概念与次要特点
- 宽依赖与窄依赖
- stage划分与作业调度
-
RDD操作算子
- Transformations
- Actions
-
共享变量
- 播送变量
- 累加器
- 长久化
- 综合案例
什么是RDD
设计初衷
RDD(Resilient Distributed Datasets)的设计之初是为了解决目前存在的一些计算框架对于两类利用场景的解决效率不高的问题,这两类利用场景是迭代式算法和交互式数据挖掘。在这两种利用场景中,通过将数据保留在内存中,能够将性能进步到几个数量级。对于迭代式算法而言,比方PageRank、K-means聚类、逻辑回归等,常常须要重用两头后果。另一种利用场景是交互式数据挖掘,比方在同一份数据集上运行多个即席查问。大部分的计算框架(比方Hadoop),应用两头计算结果的形式是将其写入到一个内部存储设备(比方HDFS),这会减少额定的负载(数据复制、磁盘IO和序列化),由此会减少利用的执行工夫。
RDD能够无效地反对少数利用中的数据重用,它是一种容错的、并行的数据结构,能够让用户显性地将两头后果长久化到内存中,并且能够通过分区来优化数据的寄存,另外,RDD反对丰盛的算子操作,用户能够很容易地应用这些算子对RDD进行操作。
基本概念
一个RDD是一个分布式对象汇合,其本质是一个只读的、分区的记录汇合。每个RDD能够分成多个分区,不同的分区保留在不同的集群节点上(具体如下图所示)。RDD是一种高度受限的共享内存模型,即RDD是只读的分区记录汇合,所以也就不能对其进行批改。只能通过两种形式创立RDD,一种是基于物理存储的数据创立RDD,另一种是通过在其余RDD上作用转换操作(transformation,比方map、filter、join等)失去新的RDD。
RDD不须要被物化,它通过血缘关系(lineage)来确定其是从RDD计算得来的。另外,用户能够管制RDD的长久化和分区,用户能够将须要被重用的RDD进行长久化操作(比方内存、或者磁盘)以进步计算效率。也能够依照记录的key将RDD的元素散布在不同的机器上,比方在对两个数据集进行JOIN操作时,能够确保以雷同的形式进行hash分区。
次要特点
- 基于内存
RDD是位于内存中的对象汇合。RDD能够存储在内存、磁盘或者内存加磁盘中,然而,Spark之所以速度快,是基于这样一个事实:数据存储在内存中,并且每个算子不会从磁盘上提取数据。
- 分区
分区是对逻辑数据集划分成不同的独立局部,分区是分布式系统性能优化的一种技术手段,能够缩小网络流量传输,将雷同的key的元素散布在雷同的分区中能够缩小shuffle带来的影响。RDD被分成了多个分区,这些分区散布在集群中的不同节点。
- 强类型
RDD中的数据是强类型的,当创立RDD的时候,所有的元素都是雷同的类型,该类型依赖于数据集的数据类型。
- 懒加载
Spark的转换操作是懒加载模式,这就意味着只有在执行了action(比方count、collect等)操作之后,才会去执行一些列的算子操作。
- 不可批改
RDD一旦被创立,就不能被批改。只能从一个RDD转换成另外一个RDD。
- 并行化
RDD是能够被并行操作的,因为RDD是分区的,每个分区散布在不同的机器上,所以每个分区能够被并行操作。
- 长久化
因为RDD是懒加载的,只有action操作才会导致RDD的转换操作被执行,进而创立出绝对应的RDD。对于一些被重复使用的RDD,能够对其进行长久化操作(比方将其保留在内存或磁盘中,Spark反对多种长久化策略),从而进步计算效率。
宽依赖和窄依赖
RDD中不同的操作会使得不同RDD中的分区产不同的依赖,次要有两种依赖:宽依赖和窄依赖。宽依赖是指一个父RDD的一个分区对应一个子RDD的多个分区,窄依赖是指一个父RDD的分区对应与一个子RDD的分区,或者多个父RDD的分区对应一个子RDD分区。对于宽依赖与窄依赖,如下图所示:
Stage划分
窄依赖会被划分到同一个stage中,这样能够以管道的模式迭代执行。宽依赖所依赖的分区个别有多个,所以须要跨节点传输数据。从容灾方面看,两种依赖的计算结果复原的形式是不同的,窄依赖只须要复原父RDD失落的分区即可,而宽依赖则须要思考复原所有父RDD失落的分区。
DAGScheduler会将Job的RDD划分到不同的stage中,并构建一个stage的依赖关系,即DAG。这样划分的目标是既能够保障没有依赖关系的stage能够并行执行,又能够保障存在依赖关系的stage程序执行。stage次要分为两种类型,一种是ShuffleMapStage,另一种是ResultStage。其中ShuffleMapStage是属于上游的stage,而ResulStage属于最上游的stage,这意味着上游的stage先执行,最初执行ResultStage。
- ShuffleMapStage
ShuffleMapStage是DAG调度流程的两头stage,它能够蕴含一个或者多个ShuffleMapTask,用与生成Shuffle的数据,ShuffleMapStage能够是ShuffleMapStage的前置stage,但肯定是ResultStage的前置stage。局部源码如下:
private[spark] class ShuffleMapStage(
id: Int,
rdd: RDD[_],
numTasks: Int,
parents: List[Stage],
firstJobId: Int,
callSite: CallSite,
val shuffleDep: ShuffleDependency[_, _, _],
mapOutputTrackerMaster: MapOutputTrackerMaster)
extends Stage(id, rdd, numTasks, parents, firstJobId, callSite) {
// 省略代码
}
}
- ResultStage
ResultStage能够应用指定的函数对RDD中的分区进行计算并失去最终后果,ResultStage是最初执行的stage,比方打印数据到控制台,或者将数据写入到内部存储设备等。局部源码如下:
private[spark] class ResultStage(
id: Int,
rdd: RDD[_],
val func: (TaskContext, Iterator[_]) => _,
val partitions: Array[Int],
parents: List[Stage],
firstJobId: Int,
callSite: CallSite)
extends Stage(id, rdd, partitions.length, parents, firstJobId, callSite) {
// 省略代码
}
下面提到Spark通过剖析各个RDD的依赖关系生成DAG,通过各个RDD中的分区之间的依赖关系来决定如何划分stage。具体的思路是:在DAG中进行反向解析,遇到宽依赖就断开、遇到窄依赖就把以后的RDD退出到以后的stage中。行将窄依赖划分到同一个stage中,从而造成一个pipeline,晋升计算效率。所以一个DAG图能够划分为多个stage,每个stage都代表了一组关联的,相互之间没有shuffle依赖关系的工作组成的task汇合,每个task汇合会被提交到TaskScheduler进行调度解决,最终将工作散发到Executor中进行执行。
Spark作业调度流程
Spark首先会对Job进行一系列的RDD转换操作,并通过RDD之间的依赖关系构建DAG(Direct Acyclic Graph,有向无环图)。而后依据RDD依赖关系将RDD划分到不同的stage中,每个stage依照partition的数量创立多个Task,最初将这些Task提交到集群的work节点上执行。具体流程如下图所示:
- 1.构建DAG,将DAG提交到调度零碎;
- 2.DAGScheduler负责接管DAG,并将DAG划分成多个stage,最初将每个stage中的Task以工作汇合(TaskSet)的模式提交个TaskScheduler做下一步解决;
- 3.应用集群管理器分配资源与任务调度,对于失败的工作会有相应的重试机制。TaskScheduler负责从DAGScheduler接管TaskSet,而后会创立TaskSetManager对TaskSet进行治理,最初由SchedulerBackend对Task进行调度;
- 4.执行具体的工作,并将工作的两头后果和最终后果存入存储体系。
RDD操作算子
Spark提供了丰盛的RDD操作算子,次要包含两大类:Transformation与Action,上面会对一些常见的算子进行阐明。
Transformation
上面是一些常见的transformation操作,值得注意的是,对于一般的RDD,反对Scala、Java、Python和R的API,对于pairRDD,仅反对Scala和JavaAPI。上面将对一些常见的算子进行解释:
- map(func)
/**
* 将每个元素传递到func函数中,并返回一个新的RDD
*/
def map[U: ClassTag](f: T => U): RDD[U] = withScope {
val cleanF = sc.clean(f)
new MapPartitionsRDD[U, T](this, (context, pid, iter) => iter.map(cleanF))
}
- filter(func)
/**
* 筛选出满足func函数的元素,并返回一个新的RDD
*/
def filter(f: T => Boolean): RDD[T] = withScope {
val cleanF = sc.clean(f)
new MapPartitionsRDD[T, T](
this,
(context, pid, iter) => iter.filter(cleanF),
preservesPartitioning = true)
}
- flatMap(func)
/**
* 首先对该RDD所有元素利用func函数,而后将后果打平,一个元素会映射到0或者多个元素,返回一个新RDD
*/
def flatMap[U: ClassTag](f: T => TraversableOnce[U]): RDD[U] = withScope {
val cleanF = sc.clean(f)
new MapPartitionsRDD[U, T](this, (context, pid, iter) => iter.flatMap(cleanF))
}
- mapPartitions(func)
/**
* 将func作用于该RDD的每个分区,返回一个新的RDD
*/
def mapPartitions[U: ClassTag](
f: Iterator[T] => Iterator[U],
preservesPartitioning: Boolean = false): RDD[U] = withScope {
val cleanedF = sc.clean(f)
new MapPartitionsRDD(
this,
(context: TaskContext, index: Int, iter: Iterator[T]) => cleanedF(iter),
preservesPartitioning)
}
- union(otherDataset)
/**
* 返回一个新的RDD,蕴含两个RDD的元素,相似于SQL的UNION ALL
*/
def union(other: RDD[T]): RDD[T] = withScope {
sc.union(this, other)
}
- intersection(otherDataset)
/**
* 返回一个新的RDD,蕴含两个RDD的交加
*/
def intersection(other: RDD[T]): RDD[T] = withScope {
this.map(v => (v, null)).cogroup(other.map(v => (v, null)))
.filter { case (_, (leftGroup, rightGroup)) => leftGroup.nonEmpty && rightGroup.nonEmpty }
.keys
}
- distinct([numPartitions]))
/**
* 返回一个新的RDD,对原RDD元素去重
*/
def distinct(): RDD[T] = withScope {
distinct(partitions.length)
}
- groupByKey([numPartitions])
/**
* 将pairRDD依照key进行分组,该算子的性能开销较大,能够应用PairRDDFunctions.aggregateByKey
*或者PairRDDFunctions.reduceByKey进行代替
*/
def groupByKey(): RDD[(K, Iterable[V])] = self.withScope {
groupByKey(defaultPartitioner(self))
}
- reduceByKey(func, [numPartitions])
/**
* 应用reduce函数对每个key对应的值进行聚合,该算子会在本地先对每个mapper后果进行合并,而后再将后果发送到reducer,相似于MapReduce的combiner性能
*/
def reduceByKey(func: (V, V) => V): RDD[(K, V)] = self.withScope {
reduceByKey(defaultPartitioner(self), func)
}
- aggregateByKey(zeroValue)(seqOp, combOp, [numPartitions])
/**
* 应用给定的聚合函数和初始值对每个key对应的value值进行聚合
*/
def aggregateByKey[U: ClassTag](zeroValue: U)(seqOp: (U, V) => U,
combOp: (U, U) => U): RDD[(K, U)] = self.withScope {
aggregateByKey(zeroValue, defaultPartitioner(self))(seqOp, combOp)
}
- sortByKey([ascending], [numPartitions])
/**
* 依照key对RDD进行排序,所以每个分区的元素都是排序的
*/
def sortByKey(ascending: Boolean = true, numPartitions: Int = self.partitions.length)
: RDD[(K, V)] = self.withScope
{
val part = new RangePartitioner(numPartitions, self, ascending)
new ShuffledRDD[K, V, V](self, part)
.setKeyOrdering(if (ascending) ordering else ordering.reverse)
}
- join(otherDataset, [numPartitions])
/**
* 将雷同key的pairRDD JOIN在一起,返回(k, (v1, v2))tuple类型
*/
def join[W](other: RDD[(K, W)]): RDD[(K, (V, W))] = self.withScope {
join(other, defaultPartitioner(self, other))
}
- cogroup(otherDataset, [numPartitions])
/**
* 将雷同key的元素放在一组,返回的RDD类型为(K, (Iterable[V], Iterable[W1], Iterable[W2])
* 第一个Iterable外面蕴含以后RDD的key对应的value值,第二个Iterable外面蕴含W1 RDD的key对应的 * value值,第三个Iterable外面蕴含W2 RDD的key对应的value值
*/
def cogroup[W1, W2](other1: RDD[(K, W1)], other2: RDD[(K, W2)], numPartitions: Int)
: RDD[(K, (Iterable[V], Iterable[W1], Iterable[W2]))] = self.withScope {
cogroup(other1, other2, new HashPartitioner(numPartitions))
}
- coalesce(numPartitions)
/**
* 该函数用于将RDD进行重分区,应用HashPartitioner。第一个参数为重分区的数目,第二个为是否进行 * shuffle,默认为false;
*/
def coalesce(numPartitions: Int, shuffle: Boolean = false,
partitionCoalescer: Option[PartitionCoalescer] = Option.empty)
(implicit ord: Ordering[T] = null)
: RDD[T] = withScope {
require(numPartitions > 0, s"Number of partitions ($numPartitions) must be positive.")
if (shuffle) {
/** Distributes elements evenly across output partitions, starting from a random partition. */
val distributePartition = (index: Int, items: Iterator[T]) => {
var position = new Random(hashing.byteswap32(index)).nextInt(numPartitions)
items.map { t =>
// Note that the hash code of the key will just be the key itself. The HashPartitioner
// will mod it with the number of total partitions.
position = position + 1
(position, t)
}
} : Iterator[(Int, T)]
// include a shuffle step so that our upstream tasks are still distributed
new CoalescedRDD(
new ShuffledRDD[Int, T, T](mapPartitionsWithIndex(distributePartition),
new HashPartitioner(numPartitions)),
numPartitions,
partitionCoalescer).values
} else {
new CoalescedRDD(this, numPartitions, partitionCoalescer)
}
}
- repartition(numPartitions)
/**
* 能够减少或者缩小分区,底层调用的是coalesce办法。如果要缩小分区,倡议应用coalesce,因为能够避 * 免shuffle
*/
def repartition(numPartitions: Int)(implicit ord: Ordering[T] = null): RDD[T] = withScope {
coalesce(numPartitions, shuffle = true)
}
Action
一些常见的action算子如下表所示
操作 | 含意 |
---|---|
count() | 返回数据集中的元素个数 |
collect() | 以数组的模式返回数据集中的所有元素 |
first() | 返回数据集中的第一个元素 |
take(n) | 以数组的模式返回数据集中的前n个元素 |
reduce(func) | 通过函数func(输出两个参数并返回一个值)聚合数据集中的元素 |
foreach(func) | 将数据集中的每个元素传递到函数func中运行 |
共享变量
Spark提供了两种类型的共享变量:播送变量和累加器。播送变量(Broadcast variables)是一个只读的变量,并且在每个节点都保留一份正本,而不须要在集群中发送数据。累加器(Accumulators)能够将所有工作的数据累加到一个共享后果中。
播送变量
播送变量容许用户在集群中共享一个不可变的值,该共享的、不可变的值被持打算到集群的每台节点上。通常在须要将一份小数据集(比方维表)复制到集群中的每台节点时应用,比方日志剖析的利用,web日志通常只蕴含pageId,而每个page的题目保留在一张表中,如果要剖析日志(比方哪些page被拜访的最多),则须要将两者join在一起,这时就能够应用播送变量,将该表播送到集群的每个节点。具体如下图所示:
如上图,首先Driver将序列化对象宰割成小的数据库,而后将这些数据块存储在Driver节点的BlockManager上。当ececutor中执行具体的task时,每个executor首先尝试从本人所在节点的BlockManager提取数据,如果之前曾经提取的该播送变量的值,就间接应用它。如果没有找到,则会向近程的Driver或者其余的Executor中提取播送变量的值,一旦获取该值,就将其存储在本人节点的BlockManager中。这种机制能够防止Driver端向多个executor发送数据而造成的性能瓶颈。
根本应用形式如下:
// 模仿一个数据汇合
scala> val mockCollection = "Spark Flink Hadoop Hive".split(" ")
mockCollection: Array[String] = Array(Spark, Flink, Hadoop, Hive)
// 结构RDD
scala> val words = sc.parallelize(mockCollection,2)
words: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[7] at parallelize at <console>:29
// 模仿播送变量数据
scala> val mapData = Map("Spark" -> 10, "Flink" -> 20,"Hadoop" -> 15, "Hive" -> 9)
mapData: scala.collection.immutable.Map[String,Int] = Map(Spark -> 10, Flink -> 20, Hadoop -> 15, Hive -> 9)
// 创立一个播送变量
scala> val broadCast = sc.broadcast(mapData)
broadCast: org.apache.spark.broadcast.Broadcast[scala.collection.immutable.Map[String,Int]] = Broadcast(4)
// 在算子外部应用播送变量,依据key取出value值,按value升序排列
scala> words.map(word => (word,broadCast.value.getOrElse(word,0))).sortBy(wordPair => wordPair._2).collect
res5: Array[(String, Int)] = Array((Hive,9), (Spark,10), (Hadoop,15), (Flink,20))
累加器
累加器(Accumulator)是Spark提供的另外一个共享变量,与播送变量不同,累加器是能够被批改的,是可变的。每个transformation会将批改的累加器值传输到Driver节点,累加器能够实现一个累加的性能,相似于一个计数器。Spark自身反对数字类型的累加器,用户也能够自定义累加器的类型。
根本应用
能够通过sparkContext.longAccumulator()
或者SparkContext.doubleAccumulator()
别离创立Long和Double类型的累加器。运行在集群中的task能够调用add办法对该累加器变量进行累加,然而不可能读取累加器的值,只有Driver程序能够通过调用value办法读取累加器的值。
object SparkAccumulator {
def main(args: Array[String]): Unit = {
val conf = new SparkConf().setMaster("local[2]").setAppName(SparkShareVariable.getClass.getSimpleName)
val sc = new SparkContext(conf)
Logger.getLogger("org.apache.spark").setLevel(Level.OFF)
Logger.getLogger("org.apache.hadoop").setLevel(Level.OFF)
val list = List(1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13)
val listRDD = sc.parallelize(list)
var counter = 0 //内部变量
//初始化一个accumulator,初始值默认为0
val countAcc = sc.longAccumulator("my accumulator")
val mapRDD = listRDD.map(num => {
counter += 1 //在算子外部应用了内部变量,这样操作不会扭转内部变量的值
if (num % 3 == 0) {
//遇到3的倍数,累加器+1
countAcc.add(1)
}
num * 2
})
mapRDD.foreach(println)
println("counter = " + counter) // counter = 0
println("countAcc = " + countAcc.value) // countAcc = 4
sc.stop()
}
}
尖叫提醒:
咱们在dirver中申明的一些局部变量或者成员变量,能够间接在transformation中应用,然而通过transformation操作之后,是不会将最终的后果从新赋值给dirver中的对应的变量。因为通过action触发transformation操作之后,transformation的操作都是通过DAGScheduler将代码打包,而后序列化,最初交由TaskScheduler传送到各个Worker节点中的Executor去执行,在transformation中执行的这些变量,是本人节点上的变量,不是dirver上最后的变量,只不过是将driver上的对应的变量拷贝了一份而已。
自定义累加器
Spark提供了一些默认类型的累加器,同时也反对自定义累加器。通过继承AccumulatorV2类即可实现自定义累加器,具体代码如下:
class customAccumulator extends AccumulatorV2[BigInt, BigInt]{
private var num:BigInt = 0
/**
* 返回该accumulator是否为0值,比方一个计数器,0代表zero,如果是一个list,Nil代表zero
*/
def isZero: Boolean = {
this.num == 0
}
// 创立一个该accumulator正本
def copy(): AccumulatorV2[BigInt, BigInt] = {
new customAccumulator
}
/**
* 重置accumulator的值, 该值为0,调用 `isZero` 必须返回true
*/
def reset(): Unit = {
this.num = 0
}
// 依据输出的值,进行累加,
// 判断为偶数时,累加器加上该值
def add(intVal: BigInt): Unit = {
if(intVal % 2 == 0){
this.num += intVal
}
}
/**
* 合并其余的同一类型的accumulator,并更新该accumulator值
*/
def merge(other: AccumulatorV2[BigInt, BigInt]): Unit = {
this.num += other.value
}
/**
* 定义以后accumulator的值
*/
def value: BigInt = {
this.num
}
}
应用该自定义累加器
val acc = new customAccumulator
val newAcc = sc.register(acc, "evenAcc")
println(acc.value)
sc.parallelize(Array(1, 2, 3, 4)).foreach(x => acc.add(x))
println(acc.value)
长久化
长久化办法
在Spark中,RDD采纳惰性求值的机制,每次遇到action操作,都会从头开始执行计算。每次调用action操作,都会触发一次从头开始的计算。对于须要被重复使用的RDD,spark反对对其进行长久化,通过调用persist()或者cache()办法即可实现RDD的持打算。通过长久化机制能够防止反复计算带来的开销。值得注意的是,当调用长久化的办法时,只是对该RDD标记为了长久化,须要等到第一次执行action操作之后,才会把计算结果进行长久化。长久化后的RDD将会被保留在计算节点的内存中被前面的口头操作重复使用。
Spark提供的两个长久化办法的次要区别是:cache()办法默认应用的是内存级别,其底层调用的是persist()办法,具体源码片段如下:
def persist(newLevel: StorageLevel): this.type = {
if (isLocallyCheckpointed) {
persist(LocalRDDCheckpointData.transformStorageLevel(newLevel), allowOverride = true)
} else {
persist(newLevel, allowOverride = false)
}
}
/**
* 应用默认的存储级别长久化RDD (`MEMORY_ONLY`).
*/
def persist(): this.type = persist(StorageLevel.MEMORY_ONLY)
/**
* 应用默认的存储级别长久化RDD (`MEMORY_ONLY`).
*/
def cache(): this.type = persist()
/**
* 手动地把长久化的RDD从缓存中移除
*/
def unpersist(blocking: Boolean = true): this.type = {
logInfo("Removing RDD " + id + " from persistence list")
sc.unpersistRDD(id, blocking)
storageLevel = StorageLevel.NONE
this
}
持打算存储级别
Spark的提供了多种长久化级别,比方内存、磁盘、内存+磁盘等。具体如下表所示:
Storage Level | Meaning |
---|---|
MEMORY_ONLY | 默认,示意将RDD作为反序列化的Java对象存储于JVM中,如果内存不够用,则局部分区不会被长久化,等到应用到这些分区时,会从新计算。 |
MEMORY_AND_DISK | 将RDD作为反序列化的Java对象存储在JVM中,如果内存不足,超出的分区将会被寄存在硬盘上. |
MEMORY_ONLY_SER (Java and Scala) | 将RDD序列化为Java对象进行长久化,每个分区对应一个字节数组。此形式比反序列化要节俭空间,然而会占用更多cpu资源 |
MEMORY_AND_DISK_SER (Java and Scala) | 与 MEMORY_ONLY_SER, 如果内存放不下,则溢写到磁盘。 |
DISK_ONLY | 将RDD的分区数据存储到磁盘 |
MEMORY_ONLY_2, MEMORY_AND_DISK_2, etc. | 与下面的形式相似,然而会将分区数据复制到两个集群 |
OFF_HEAP (experimental) | 与MEMORY_ONLY_SER相似,将数据存储到堆外内存 off-heap,须要将off-heap 开启 |
长久化级别的抉择
Spark提供的长久化存储级别是在内存应用与CPU效率之间做衡量,通常举荐上面的抉择形式:
- 如果内存能够包容RDD,能够应用默认的长久化级别,即MEMORY_ONLY。这是CPU最有效率的抉择,能够使作用在RDD上的算子尽可能第疾速执行。
-
如果内存不够用,能够尝试应用MEMORY_ONLY_SER,应用一个疾速的序列化库能够节俭很多空间,比方 Kryo 。
tips:在一些shuffle算子中,比方reduceByKey,即使没有显性调用persist办法,Spark也会主动将两头后果进行长久化,这样做的目标是防止在shuffle期间产生故障而造成从新计算整个输出。即便如此,还是举荐对须要被重复使用的RDD进行长久化解决。
综合案例
- case 1
/**
* 1.数据集
* [orderId,userId,payment,productId]
* 1,108,280,1002
* 2,202,300,2004
* 3,210,588,3241
* 4,198,5000,3567
* 5,200,590,2973
* 6,678,8000,18378
* 7,243,200,2819
* 8,236,7890,2819
* 2.需要形容
* 计算Top3订单金额
*
* 3.后果输入
* 1 8000
* 2 7890
* 3 5000
*/
object TopOrder {
def main(args: Array[String]): Unit = {
val conf = new SparkConf().setAppName("TopN").setMaster("local")
val sc = new SparkContext(conf)
sc.setLogLevel("ERROR")
val lines = sc.textFile("E://order.txt")
var num = 0;
val result = lines.filter(line => (line.trim().length > 0) && (line.split(",").length == 4))
.map(_.split(",")(2)) // 取出领取金额
.map(x => (x.toInt,""))
.sortByKey(false) // 依照领取金额降序排列
.map(x => x._1).take(3) // 取出前3个
.foreach(x => {
num = num + 1
println(num + "\t" + x)
})
}
}
- case 2
/**
* 1.数据集(movielensSet)
* 用户电影评分数据[UserID::MovieID::Rating::Timestamp]
* 电影名称数据[MovieId::MovieName::MovieType]
* 2.需要形容
* 求均匀评分大于5的电影名称
*
*/
object MovieRating {
def main(args: Array[String]): Unit = {
val conf = new SparkConf().setAppName("MovieRating").setMaster("local")
val sc = new SparkContext(conf)
sc.setLogLevel("ERROR")
// 用户电影评分数据[UserID::MovieID::Rating::Timestamp]
val userRating = sc.textFile("E://ml-1m/ratings.dat")
// 电影名称数据[MovieId::MovieName::MovieType]
val movies = sc.textFile("E://ml-1m/movies.dat")
//提取电影id和评分,(MovieID, Rating)
val movieRating = userRating.map { line => {
val rating = line.split("::")
(rating(1).toInt, rating(2).toDouble)
}
}
// 计算电影id及其均匀评分,(MovieId,AvgRating)
val movieAvgRating = movieRating
.groupByKey()
.map { rating =>
val avgRating = rating._2.sum / rating._2.size
(rating._1, avgRating)
}
//提取电影id和电影名称,(MovieId,MovieName)
val movieName = movies.map { movie =>
val fields = movie.split("::")
(fields(0).toInt, fields(1))
}.keyBy(_._1)
movieAvgRating
.keyBy(_._1)
.join(movieName) // Join的后果(MovieID,((MovieID,AvgRating),(MovieID,MovieName)))
.filter(joinData => joinData._2._1._2 > 5.0)
.map(rs => (rs._1,rs._2._1._2,rs._2._2._2))
.saveAsTextFile("E:/MovieRating/")
}
}
总结
本文对Spark Core进行了具体解说,次要包含RDD的基本概念、RDD的操作算子、共享变量以及持打算,最初给出两个残缺的Spark Core编程案例。下一篇将分享Spark SQL编程指南。
公众号『大数据技术与数仓』,回复『材料』支付大数据资料包