一、实质

Spark是一个分布式的计算框架,是下一代的MapReduce,扩大了MR的数据处理流程

二、mapreduce有什么问题

1.调度慢,启动map、reduce太耗时

2.计算慢,每一步都要保留两头后果落磁盘

3.API形象简略,只有map和reduce两个原语

4.不足作业流形容,一项工作须要多轮mr

三、spark解决了什么问题

1.最大化利用内存cache

2.两头后果放内存,减速迭代

3.将后果集放内存,减速后续查问和解决,解决运行慢的问题

select * from table where col1 > 50

rdd.registerastable(cachetable)

SQL:

select col2, max (col3) from cachetable group by col2

select col3, max (col2) from cachetable group by col3

  1. 更丰盛的API(Transformation类和Actions类)
  2. 残缺作业形容,将用户的整个作业串起来

val file = sc.textFile(hdfs://input)

val counts = file.flatMap(

line => line.split(" "))

.map(word => (word, 1))

.reduceByKey(_ + _)

counts.saveAsTextFile(hdfs://output)

  1. 因为Excutor过程能够运行多个Task线程,因此实现了多线程的操作,放慢了处理速度

四、Spark外围—RDD( Resilient Distributed Dataset 弹性分布式数据集模型)

1.四个特色

– RDD使用户可能显式将计算结果保留在内存中,控制数据的划分

– 记录数据的变换和形容,而不是数据自身,以保障容错

– 懒操作,提早计算,action的时候才操作

– 瞬时性,用时才产生,用完就开释

2.四种构建办法

– 从共享文件系统中获取,如从HDFS中读数据构建RDD

• val a = sc.textFile(“/xxx/yyy/file”)

– 通过现有RDD转换失去

• val b = a.map(x => (x, 1))

– 定义一个scala数组

• val c = sc.parallelize(1 to 10, 1)

– 由一个曾经存在的RDD通过长久化操作生成

• val d = a.persist(), a. saveAsHadoopFile(“/xxx/yyy/zzz”)

3.partition和依赖

– 每个RDD蕴含了数据分块/分区(partition)的汇合,每个partition是不可分割的

– 每个partition的计算就是一个task,task是调度的根本单位

– 与父RDD的依赖关系(rddA=>rddB)

宽依赖: B的每个partition依赖于A的所有partition

• 比方groupByKey、reduceByKey、join……,由A产生B时会先对A做shuffle分桶

窄依赖: B的每个partition依赖于A的常数个partition

• 比方map、filter、union……

4.stage和依赖

– 从后往前,将宽依赖的边删掉,大数据培训连通重量及其在原图中所有依赖的RDD,形成一个stage

– 每个stage外部尽可能多地蕴含一组具备窄依赖关系的转换,并将它们流水线并行化

5.数据局部性准则

– 如果一个工作须要的数据在某个节点的内存中,这个工作就会被调配至那个节点

– 须要的数据在某个节点的文件系统中,就调配至那个节点

6.容错性准则

– 如果此task失败,AM会重新分配task

– 如果task依赖的下层partition数据曾经生效了,会先将其依赖的partition计算工作再重算一遍

• 宽依赖中被依赖partition,能够将数据保留HDFS,以便疾速重构(checkpoint)

• 窄依赖只依赖下层一个partition,复原代价较少

– 能够指定保留一个RDD的数据至节点的cache中,如果内存不够,会LRU开释一部分,仍有重构的可能

五、Spark零碎架构

1.Excutor的内存分为三块:

1)task执行代码所需的内存,占总内存的20%;

2)task通过shuffle过程拉取上一个stage的task的输入后,进行聚合操作时应用,占20%

3)让RDD长久化时应用,默认占executor总内存的60%

2.Excutor的cpu core:

每个core同一时间只能执行一个线程

六、Spark资源参数和开发调优

1.七个参数

• num-executors:该作业总共须要多少executor过程执行

倡议:每个作业运行个别设置5-~100个左右较适合

• executor-memory:设置每个executor过程的内存, num-executors* executor-memory代表作业申请的总内存量(尽量不要超过最大总内存的1/3~1/2)

倡议:设置4G~8G较适合

• executor-cores: 每个executor过程的CPU Core数量,该参数决定每个executor过程并行执行task线程的能力,num-executors * executor-cores代表作业申请总CPU core数(不要超过总CPU Core的1/3~1/2 )

倡议:设置2~4个较适合

• driver-memory: 设置Driver过程的内存

倡议:通常不必设置,个别1G就够了,若呈现应用collect算子将RDD数据全副拉取到Driver上解决,就必须确保该值足够大,否则OOM内存溢出

• spark.default.parallelism: 每个stage的默认task数量

倡议:设置500~1000较适合,默认一个HDFS的block对应一个task,Spark默认值偏少,这样导致不能充分利用资源

• spark.storage.memoryFraction: 设置RDD长久化数据在executor内存中能占的比例,默认0.6,即默认executor 60%的内存能够保留长久化RDD数据

倡议:若有较多的长久化操作,能够设置高些,超出内存的会频繁gc导致运行迟缓

• spark.shuffle.memoryFraction: 聚合操作占executor内存的比例,默认0.2

倡议:若长久化操作较少,但shuffle较多时,能够升高长久化内存占比,进步shuffle操作内存占比

spark-submit:

2.六个准则

• 防止创立反复的RDD

• 尽可能复用同一个RDD

 

• 对屡次应用的RDD进行长久化解决

 

• 防止应用shuffle类算子

如:groupByKey、reduceByKey、join等

 

• 应用map-side预聚合的shuffle操作

肯定要应用shuffle的,无奈用map类算子代替的,那么尽量应用map-site预聚合的算子,如可能的状况下应用reduceByKey或aggregateByKey算子代替groupByKey算子

• 应用Kryo优化序列化性能

Kryo是一个序列化类库,来优化序列化和反序列化性能, Spark反对应用Kryo序列化库,性能比Java序列化库高10倍左右

 

七、Spark技术栈

 • Spark Core: 基于RDD提供操作接口,利用DAG进行对立的工作布局

• Spark SQL: Hive的表 + Spark的里。通过把Hive的HQL转化为Spark DAG计算来实现

• Spark Streaming: Spark的流式计算框架,提早在1S左右,mini batch的解决办法

• MLIB: Spark的机器学习库,蕴含罕用的机器学习算法

• GraphX: Spark图并行操作库