前言
第一篇传送门:https://segmentfault.com/a/1190000020841646
RDD 认知
RDD 是什么?
RDD: 弹性分布式数据集(Resiliennt Distributed Datasets)
转为格式 RDD 的几种方式:
1. parallelize:
rdd = sc.parallelize([1,2,3,4,5]) # 里面传的就是普通 python 类型
2. 读文件 / 读数据库 / 读 ES 等各种方式,此处以读文件为例:rdd = sc.textFile('file:///home/lin/data/hello.txt')
RDD 核心概念
Application:
application: 一个 app 就是一个自定义的 py 脚本(被 spark-submit 提交的)或一个 spark-shell
app = 1 个 driver + 多个 executors(相当于多个进程)
注意:数据在不同的 app 之间 不能被共享,若想要共享(需要考虑外部存储)
Driver:
每一个.py 脚本中都有一个 sparkcontext,它就是 driver
Worker Node:
相当于 standalone 的 slave 节点
Executor:
Executor(进程):每个 Driver 中都有多个 Executors
并且可以运行多个 Tasks
Job:
job: 对应下面即将要说的 action : collect() 等
一个 task 对应 一个 job (一个 transformation 对应 一个 action)
一个 job 对应 多个 task (多个 transformations 链式调用之后,再调用一个 action)
Task:
task: 对应下面即将要说的 transformation :map() 等
每个 task 可用一个线程执行。多个 task 可并行
Stage:
一个 job 被切分为多份
Cluster Manager:
管理 从 Standalone, YARN, Mesos 中获取的资源
就是 --master 指定的参数
其中 还包括 空间 内存等参数配置
Cache:
缓存:### persist & cache & unpersist 三种 API 可供选择
Lineage(依赖, 血缘关系):
依赖:父 子 孙
RDD1 -> map-> RDD2 -> filter-> RDD3
服务器 1:part1 -> part1-> part1
服务器 2:part2 -> part2-> part2
服务器 3:part3 -> part3-> part3
如上图: 假如 RDD3 的 part2 挂了,那么就会退回到 RDD2 的 part2 再计算一遍。而不是回到 "最初" 的起点。窄依赖(Narrow, 依赖的很少,很窄):重点: '子 part' 只依赖一个 '父 part'。map, filter 等: 元素被摊分在每一个 part 中,子 part 出错就找 "对应"(一个)父 part 即可。宽依赖(Wide, 依赖的很多,很宽):重点: '子 part' 依赖多个 '父 part' 同时计算得到。shuffle 操作: xxBy, join 等:子 part 出错 找 "对应"(多个)父 part 重新共同计算。
stage:
遇到 1 个宽依赖,就会做 shuffle 操作。然后就会把 "之前" 的“所有窄依赖”划分为 "1 个 stage"。最后,整体全部,也当作 "1 个 stage"。
官档图
传送门:http://spark.apache.org/docs/latest/cluster-overview.html
RDD 两大算子
Transformation(Lazy)
主要机制:各种操作不会被立刻执行,但这些操作之间的关系会被记录下来,等待下面 action 调用。
直观理解举例:
1. 像 sqlalchemy 中的 filter(), groupby(), page()等操作
2. 像 tensorflow1.x 中的 sess.run() 之前的各种操作
3. 像 数据库的事务,在提交之前的各种操作
接下来介绍,Transformation 的各种操作。
map
同 python 的 map。你只需记住 RDD 类型里面包裹的就是我们熟悉的 python 类型
所以:python 的 map 怎么用,RDD 对象的 map 就怎么用,下面 filter 同理
只举一个语法格式例子:(下面同理)rdd.map(lambda x:x+1)
filter
同上,同 python
flatMap
和 map 几乎差不多。唯一有一点区别:map 每次基于单个元素,返回什么,那最终结果就是什么(最后拼成序列)。flatMap 每次基于单个元素,若返回的是序列(列表等),那么会自动被解包,并一字排开返回。
groupBy 和 groupByKey
说一下 没有 key, 和 带有 key 的区别(后面同理,就不啰嗦了):
没有 key:
1. 一般必须需要一个 函数句柄 (lambda), 而这个句柄是针对(操作后新形成的 key)使用的
2. 针对一层序列 [, , ...]
带有 key
1. 一般无参
2. 针对双层序列 [(),(),...]
直接上例子了(对比着看):
rdd1 = sc.parallelize(['a','b','c','a']) # 一层序列
rdd2 = sc.parallelize([('a',1),('b',2), ('c',3), ('a',4)] ) # 双层序列
group1 = rdd1.groupBy(lambda x:x) # 针对 一层序列,注意这里,必须写 函数句柄
group2 = rdd2.groupByKey() # 针对 双层序列
print(group1.collect() )
print(group2.collect() )
# 可以这样告诉你,他们俩的最外层结果是一样的: [{key:value}, ...],
结果如下 ~~~~
[('a', <pyspark.resultiterable.ResultIterable object at 0x7fb7e4384c88>),
('b', <pyspark.resultiterable.ResultIterable object at 0x7fb7e43848d0>),
('c', <pyspark.resultiterable.ResultIterable object at 0x7fb7e4384940>)
]
# 如果加了 count(), 那么它们的结果就是一样的了,返回统计的个数,等到 action 再说。
reduceByKey
照应双层或多层序列,或者 承接 groupByKey()
rdd = sc.parallelize(['Tom', 'Jerry', 'Tom', 'Putch'])
rdd.map(lambda x:(x,1)).reduceByKey(lambda x,y:x+y).collect()
# 结果(可以忽略上面的 collect(), 它属于 action,放在这里方便贴个结果)>> [('Tom', 2), ('Jerry', 1), ('Putch', 1)]
sortBy 和 sortByKey
同上,这里只说下(这两种只能对 key 排序):
默认升序
降序可指定参数:ascending=False
另一种业务:(基于 value 的排序):可以尝试先用 map-lambda 交换元素次序,然后再排序,最后再用一次 map-lambda 交换回来
union
rdd1.union(rdd2) # 相当于 python 的 "列表加法" 或者 python 的 "extend"
distinct
rdd.distinct() # 去重
join
前提:(我的理解就是,能转化成 python 字典的列表格式即可)
eg: [[1,2], [3,4], [5,6] ]
两层列表
每层列表的每个元素中,只有 2 个元素
错误格式示例:[['a','b','c'], ['d','e','f']]
也不能说错误吧,不过若是这种 3 个 - 多个子元素的格式,join 时默认会取前 2 个元素。其余丢弃。
内连接(innerJoin):
左外连接(leftOuterJoin):
右外连接(rightOuterJoin):
全外连接(fullOuterJoin):
完整示例:
rdd1 = sc.parallelize([['a','b'], ['d','e']] ) # 左
rdd2 = sc.parallelize([['a','c'], ['e','f']] ) # 右
# 开头说过: 能转化成字典的列表格式即可,或者你可以写成这样(但是不能传原生字典进去):rdd1 = sc.parallelize(list({'a': 'b', 'd': 'e'}.items()) )
rdd2 = sc.parallelize(list({'a': 'c', 'e': 'f'}.items()) )
# 内连接(交集)print(rdd1.join(rdd2).collect()) # [('a', ('b', 'c'))]
# 左连接(左并集)print(rdd1.leftOuterJoin(rdd2).collect()) # [('d', ('e', None)), ('a', ('b', 'c'))]
# 右连接(右并集)print(rdd1.rightOuterJoin(rdd2).collect()) # [('a', ('b', 'c')), ('e', (None, 'f'))]
# 全连接(并集)print(rdd1.fullOuterJoin(rdd2).collect()) # [('d', ('e', None)), ('a', ('b', 'c')), ('e', (None, 'f'))]
persist & cache & unpersist
cache(): 缓存
persist(): 持久化
unpersist(): 清空缓存(他属于 action- 立即触发,为了方便对比,我就一起放到了这里)
官档:http://spark.apache.org/docs/…
Action (Commit)
主要机制:拿到 transformation 记录的关系,用 action 的各种操作来真正触发、执行、返回结果。
对应上面,继续直观举例:
1. 像 sqlalchemy 中的 commit()
2. 像 tensorflow1.x 中的 sess.run()
3. 像 数据库的事务的 "提交"
接下来介绍,Action 的各种操作。
collect
执行 transformation 记录的关系 并 返回结果,在 Pyspark 中就是 RDD 类型 转 Python 数据类型。
(中间你可以链式调用各种 transformation 方法,结尾调用一个 collect(), 就可以出结果了)
rdd1.xx().xx().collect()
count
统计元素项的个数,同上语法, 同上理念,触发返回结果
rdd2 = sc.parallelize([['a','c','d'], ['e','f','g']] )
rdd2.count() # 无参
>> 2
reduce
rdd2 = sc.parallelize([['a','c','d'], ['e','f','g']] )
rdd2.reduce(lambda x,y:x+y) # 参数为 2 个参数的函数句柄,做 "累" 的操作,(累加,累乘)等
>> ['a', 'c', 'd', 'e', 'f', 'g']
take
相当于 mysql 的 limit 操作,取前 n 个
rdd2 = sc.parallelize([['a','c','d'], ['e','f','g']] )
rdd2.take(0) # []
rdd2.take(1) # [['a', 'c', 'd']]
rdd2.take(2) # [['a', 'c', 'd'], ['e', 'f', 'g']]
再次强调:take 的参数是,个数的意思,而不是索引,不要混淆额
top
返回最大的 n 个元素(会自动给你排序的)
rdd2 = sc.parallelize([1,2,3,8,5,3,6,8])
rdd2.top(3)
>> [8, 8, 6]
foreach
遍历每个元素,对子元素做 - 对应函数句柄的操作,下面说这个 action 的两点注意事项:
注意 1:无返回值(返回 None)
注意 2:通常用作 print(), 但是它不会在 notebook 中打印,而是在你后台开启的 spark 中打印。
rdd2 = sc.parallelize([['a','c','d'], ['e','f','g']] )
rdd2.foreach(lambda x:print(x))
>> ['a', 'c', 'd']
['e', 'f', 'g']
saveAsTextFile
rdd = sc.textFile('file:///home/lin/data')
rdd.saveAsTextFile('file:///home/lin/mydata')
# 这里有个注意事项:saveAsTextFile 的参数路径不能在都进来的路径范围内。# 或者说,读是从这个文件夹 A(这是最后一级的目录)读进来的,写就不能写入文件夹 A 了
# 另外,mydata 是目录名,进去你会看见 part-00000 这样的文件名,这才是真数据文件。
Spark 优化相关
序列化:
好处 1:网络传输必备
好处 2:节省内存
两种方式序列化方式:1. Java 内部序列化(默认,较慢,但兼容性好)2. Kryo(较快,但兼容性不太好)
内存管理:
可分为 execution(进程执行)和 storage(存储)
execution 相关操作: shuffle, join, sort, aggregation
storage 相关操作 : cache,
特点:
execution 和 storage 共享整体内存:execution 起到 "存霸" 的角色:
1. 若 execution 区域内存 不够用了,它会去抢夺 storage 区域的内存(不归还)2. 当然,可以为 storage 设置阈值(必须给 storage 留下多少)
具体分配多少:
总内存 = n
execution 内存 = (总内存 - 300M) * 50%
storage 内存 = (总内存 - 300M) * 50%
说白了,就是留给 JVM 300M,然后 execution 和 storage 各分一半。
查看内存占用情况
可通过 WebUI 查看(序列化后存储,通常会节省内存)
Broadcasting Variable
情景:正常来说,每个 task(map, filter 等)都会占用 1 份数据,100 个 task 就会拿 100 份数据。
这种情况造成了数据的冗余,BroadCasting Variable(广播变量)就是解决这一问题的。