共计 3082 个字符,预计需要花费 8 分钟才能阅读完成。
一、实质
Spark 是一个分布式的计算框架,是下一代的 MapReduce,扩大了 MR 的数据处理流程
二、mapreduce 有什么问题
1. 调度慢,启动 map、reduce 太耗时
2. 计算慢,每一步都要保留两头后果落磁盘
3.API 形象简略,只有 map 和 reduce 两个原语
4. 不足作业流形容,一项工作须要多轮 mr
三、spark 解决了什么问题
1. 最大化利用内存 cache
2. 两头后果放内存,减速迭代
3. 将后果集放内存,减速后续查问和解决,解决运行慢的问题
select * from table where col1 > 50
rdd.registerastable(cachetable)
SQL:
select col2, max (col3) from cachetable group by col2
select col3, max (col2) from cachetable group by col3
- 更丰盛的 API(Transformation 类和 Actions 类)
- 残缺作业形容,将用户的整个作业串起来
val file = sc.textFile(hdfs://input)
val counts = file.flatMap(
line => line.split(” “))
.map(word => (word, 1))
.reduceByKey(_ + _)
counts.saveAsTextFile(hdfs://output)
- 因为 Excutor 过程能够运行多个 Task 线程,因此实现了多线程的操作,放慢了处理速度
四、Spark 外围—RDD(Resilient Distributed Dataset 弹性分布式数据集模型)
1. 四个特色
– RDD 使用户可能显式将计算结果保留在内存中,控制数据的划分
– 记录数据的变换和形容,而不是数据自身,以保障容错
– 懒操作,提早计算,action 的时候才操作
– 瞬时性,用时才产生,用完就开释
2. 四种构建办法
– 从共享文件系统中获取,如从 HDFS 中读数据构建 RDD
• val a = sc.textFile(“/xxx/yyy/file”)
– 通过现有 RDD 转换失去
• val b = a.map(x => (x, 1))
– 定义一个 scala 数组
• val c = sc.parallelize(1 to 10, 1)
– 由一个曾经存在的 RDD 通过长久化操作生成
• val d = a.persist(), a. saveAsHadoopFile(“/xxx/yyy/zzz”)
3.partition 和依赖
– 每个 RDD 蕴含了数据分块 / 分区(partition)的汇合,每个 partition 是不可分割的
– 每个 partition 的计算就是一个 task,task 是调度的根本单位
– 与父 RDD 的依赖关系(rddA=>rddB)
宽依赖:B 的每个 partition 依赖于 A 的所有 partition
• 比方 groupByKey、reduceByKey、join……,由 A 产生 B 时会先对 A 做 shuffle 分桶
窄依赖:B 的每个 partition 依赖于 A 的常数个 partition
• 比方 map、filter、union……
4.stage 和依赖
– 从后往前,将宽依赖的边删掉,大数据培训连通重量及其在原图中所有依赖的 RDD,形成一个 stage
– 每个 stage 外部尽可能多地蕴含一组具备窄依赖关系的转换,并将它们流水线并行化
5. 数据局部性准则
– 如果一个工作须要的数据在某个节点的内存中,这个工作就会被调配至那个节点
– 须要的数据在某个节点的文件系统中,就调配至那个节点
6. 容错性准则
– 如果此 task 失败,AM 会重新分配 task
– 如果 task 依赖的下层 partition 数据曾经生效了,会先将其依赖的 partition 计算工作再重算一遍
• 宽依赖中被依赖 partition,能够将数据保留 HDFS,以便疾速重构(checkpoint)
• 窄依赖只依赖下层一个 partition,复原代价较少
– 能够指定保留一个 RDD 的数据至节点的 cache 中,如果内存不够,会 LRU 开释一部分,仍有重构的可能
五、Spark 零碎架构
1.Excutor 的内存分为三块:
1)task 执行代码所需的内存,占总内存的 20%;
2)task 通过 shuffle 过程拉取上一个 stage 的 task 的输入后,进行聚合操作时应用,占 20%
3) 让 RDD 长久化时应用,默认占 executor 总内存的 60%
2.Excutor 的 cpu core:
每个 core 同一时间只能执行一个线程
六、Spark 资源参数和开发调优
1. 七个参数
• num-executors:该作业总共须要多少 executor 过程执行
倡议:每个作业运行个别设置 5 -~100 个左右较适合
• executor-memory:设置每个 executor 过程的内存,num-executors* executor-memory 代表作业申请的总内存量(尽量不要超过最大总内存的 1 /3~1/2)
倡议:设置 4G~8G 较适合
• executor-cores:每个 executor 过程的 CPU Core 数量,该参数决定每个 executor 过程并行执行 task 线程的能力,num-executors * executor-cores 代表作业申请总 CPU core 数(不要超过总 CPU Core 的 1 /3~1/2)
倡议:设置 2~4 个较适合
• driver-memory:设置 Driver 过程的内存
倡议:通常不必设置,个别 1G 就够了,若呈现应用 collect 算子将 RDD 数据全副拉取到 Driver 上解决,就必须确保该值足够大,否则 OOM 内存溢出
• spark.default.parallelism:每个 stage 的默认 task 数量
倡议:设置 500~1000 较适合,默认一个 HDFS 的 block 对应一个 task,Spark 默认值偏少,这样导致不能充分利用资源
• spark.storage.memoryFraction:设置 RDD 长久化数据在 executor 内存中能占的比例,默认 0.6,即默认 executor 60% 的内存能够保留长久化 RDD 数据
倡议:若有较多的长久化操作,能够设置高些,超出内存的会频繁 gc 导致运行迟缓
• spark.shuffle.memoryFraction:聚合操作占 executor 内存的比例,默认 0.2
倡议:若长久化操作较少,但 shuffle 较多时,能够升高长久化内存占比,进步 shuffle 操作内存占比
spark-submit:
2. 六个准则
• 防止创立反复的 RDD
• 尽可能复用同一个 RDD
• 对屡次应用的 RDD 进行长久化解决
• 防止应用 shuffle 类算子
如:groupByKey、reduceByKey、join 等
• 应用 map-side 预聚合的 shuffle 操作
肯定要应用 shuffle 的,无奈用 map 类算子代替的,那么尽量应用 map-site 预聚合的算子,如可能的状况下应用 reduceByKey 或 aggregateByKey 算子代替 groupByKey 算子
• 应用 Kryo 优化序列化性能
Kryo 是一个序列化类库,来优化序列化和反序列化性能,Spark 反对应用 Kryo 序列化库,性能比 Java 序列化库高 10 倍左右
七、Spark 技术栈
• Spark Core:基于 RDD 提供操作接口,利用 DAG 进行对立的工作布局
• Spark SQL:Hive 的表 + Spark 的里。通过把 Hive 的 HQL 转化为 Spark DAG 计算来实现
• Spark Streaming:Spark 的流式计算框架,提早在 1S 左右,mini batch 的解决办法
• MLIB:Spark 的机器学习库,蕴含罕用的机器学习算法
• GraphX:Spark 图并行操作库