共计 4211 个字符,预计需要花费 11 分钟才能阅读完成。
Spark 学习记录之 SparkCore 初步
概念
Spark 是一种基于内存的疾速、通用、可扩大的大数据分析计算引擎。
蕴含的模块有,Spark Core,Spark SQL,Spark Streaming,Spark MLib,Spark GraphX
Spark Submit 例子
-
Standalone
bin/spark-submit \ --class org.apache.spark.examples.SparkPi \ --master spark://spark111:7077 \ ./examples/jars/spark-examples_2.12-3.0.0.jar \ 1000
-
Yarn
bin/spark-submit \ --class org.apache.spark.examples.SparkPi \ --master yarn \ --deploy-mode cluster \ ./examples/jars/spark-examples_2.12-3.0.0.jar \ 100 bin/spark-submit \ --class org.apache.spark.examples.SparkPi \ --master yarn \ --deploy-mode client \ ./examples/jars/spark-examples_2.12-3.0.0.jar \ 100
Spark On Yarn
两种模式的区别在于:Driver 程序运行的节点不同。Client 模式将用于监控和调度的 Driver 模块启动在客户端,而 Cluster 则将 Driver 模块启动在集群中。因而 Client 模式个别用于测试,Cluster 则用于生产部署。
-
Yarn Client 模式
- Driver 在提交工作的本地运行
- Driver 启动后,会和 ReourceManager 通信,申请启动 ApplicationMaster
- ResourceManager 调配 Container,在适合的 NodeManager 上启动 ApplicationMaster,负责向 ResourceManager 申请 Executor 的资源
- ResourceManager 接到 ApplicationMaster 的申请后,调配 Container,而后 ApplicationMaster 在资源分配指定的 NodeManager 上启动 Executor 过程。
- Executor 过程启动后,会反向注册到 Driver,Executor 全副启动实现后,Driver 开始执行 main 函数
- 执行到 Action 算子时,触发一个 job,并依据宽依赖开始切分 stage,每个 stage 生成对应的 TaskSet,之后 Driver 会将 task 散发到各个 Executor 上执行
-
Yarn Cluster 模式
- 在 Yarn Cluster 模式下,工作提交后会向 ResourceManager 通信申请启动 ApplicationMaster
- 随后 ResourceManger 调配 Container,在适合的 NodeManager 上启动 ApplicationMaster,此时 ApplicationMaster 就是 Driver
- Driver 启动后,向 ResourceManager 申请 Exexutor 资源。
- ResourceManager 调配 Contaioner,ApplicationMaster 在对应的 NodeManager 上启动 Excutor。Executor 启动后会向 Driver 进行反向注册
- 当 Executor 全副启动结束后,Driver 开始执行 main 函数
- 执行到 Action 算子时,触发一个 job,并依据宽依赖切分 stage,每个 stage 上生成对应的 TaskSet,之后 Driver 回将 task 散发到各个 Executor 上执行
RDD
RDD(Resilient Distributed Dataset),弹性分布式数据集
-
分区,与 Hadoop MapReduce 的比照
- MapReduce 有切片和分区,这是两个不同的概念,切片次要作用于 MapTask 阶段,以 TextInputFormat 为例,切片依据文件块大小来决定,默认状况下,切片大小就等于块大小,当然切片大小可通过配置调节,个别状况下,块大小为 128M 或 256M,以磁盘速度决定。一个文件会以切片大小来切分为多个逻辑上的小文件,因而切片个数就等于 MapTask 的个数,即 MapTask 的并行数
- MapReduce 的分区是要在代码中指定设置的,默认为一个分区,分区个数对应的是 ReduceTask 的个数。默认的分区器 Partitioner 是 HashPartitioner,以 (key & Long.MAX_VALUE)%numReduceTasks 计算得来,即在 HashPartitioner 的计算逻辑中,设定多少个 numReduceTasks 就会有多少个分区。用户能够通过继承 Partioner 来自定义分区器,以实现指定的分区个数。
- MapReduce 的分区数也会作用在 MapTask 阶段,在数据处理 map 办法进入环形缓冲区前会给数据标记上分区,在环形缓冲区的溢写排序和多个溢写文件的排序合并中都会以分区为单位进行
- RDD 中,一个 stage 中的 task 数量,是以该 stage 中的最初一个算子的分区数决定的
-
5 个外围属性
-
分区列表
protected def getPartitions: Array[Partition]
-
分区计算函数
def compute(split: Partition, context: TaskContext): Iterator[T]
-
RDD 之间的依赖关系
protected def getDependencies: Seq[Dependency[_]] = deps
-
分区器(可选)
@transient val partitioner: Option[Partitioner] = None
-
首选地位(可选)
protected def getPreferredLocations(split: Partition): Seq[String] = Nil
-
-
分区与并行度
分区与并行度有关系,然而是不同的概念。多个分区会有多个 task,然而只有一个 Executor 的话,也就只能是并发,而非并行。分区的意思是写代码的人心愿在足够资源的状况下能够达到分区数的并行度。
分区内计算有序,分区间计算无序
-
markRDD 的分区,从汇合中创立
- numSlices = 循环 i = 0~3,文件 length,
- start = ((i * length)/numSlices).toInt
-
end = (((i +1) * length)/numSlices).toInt
[1,2,3,4,5] numSlices = 2 length = 5 0 => [0,2) => 1,2 1 => [2,5) => 3,4,5
def slice[T: ClassTag](seq: Seq[T], numSlices: Int): Seq[Seq[T]] = {if (numSlices < 1) {throw new IllegalArgumentException("Positive number of partitions required") } // Sequences need to be sliced at the same set of index positions for operations // like RDD.zip() to behave as expected def positions(length: Long, numSlices: Int): Iterator[(Int, Int)] = {(0 until numSlices).iterator.map { i => val start = ((i * length) / numSlices).toInt val end = (((i + 1) * length) / numSlices).toInt (start, end) } } //... 省略多行代码 }
-
textFile 的分区
- 采纳 Hadoop 的读文件形式,TextInputformat
- 以行为单位进行读取
- 读取数据时以偏移量为单位
-
偏移量不会反复读取
1234567@@ => 012345678 89@@ => 9101112 0 => 13 14 / 2 = 7 [0,7] => 1234567@@ [7,14] => 89@@ 0
public InputSplit[] getSplits(JobConf job, int numSplits) throws IOException { //... 省略多行代码 long totalSize = 0L; //... 省略多行代码 totalSize += file.getLen(); //... 省略多行代码 long goalSize = totalSize / (long)(numSplits == 0 ? 1 : numSplits); long minSize = Math.max(job.getLong("mapreduce.input.fileinputformat.split.minsize", 1L), this.minSplitSize); //... 省略多行代码 //blockSize,本地运行环境,32M,生产 128M 或者 256M long blockSize = file.getBlockSize(); long splitSize = this.computeSplitSize(goalSize, minSize, blockSize); //... 省略多行代码 for(bytesRemaining = length; (double)bytesRemaining / (double)splitSize > 1.1D; bytesRemaining -= splitSize) {splitHosts = this.getSplitHostsAndCachedHosts(blkLocations, length - bytesRemaining, splitSize, clusterMap); splits.add(this.makeSplit(path, length - bytesRemaining, splitSize, splitHosts[0], splitHosts[1])); } //... 省略多行代码 } protected long computeSplitSize(long goalSize, long minSize, long blockSize) {return Math.max(minSize, Math.min(goalSize, blockSize)); }
-
正文完