乐趣区

PY-PySparkSpark-CoreRDD

前言

第一篇传送门:https://segmentfault.com/a/1190000020841646

RDD 认知

RDD 是什么?

RDD: 弹性分布式数据集(Resiliennt Distributed Datasets)

转为格式 RDD 的几种方式:

1. parallelize:  
    rdd = sc.parallelize([1,2,3,4,5])   # 里面传的就是普通 python 类型
    
2. 读文件 / 读数据库 / 读 ES 等各种方式,此处以读文件为例:rdd = sc.textFile('file:///home/lin/data/hello.txt')

RDD 核心概念

Application:

application: 一个 app 就是一个自定义的 py 脚本(被 spark-submit 提交的)或一个 spark-shell
app = 1 个 driver + 多个 executors(相当于多个进程)

注意:数据在不同的 app 之间 不能被共享,若想要共享(需要考虑外部存储)

Driver:

每一个.py 脚本中都有一个 sparkcontext,它就是 driver

Worker Node:

相当于 standalone 的 slave 节点

Executor:

Executor(进程):每个 Driver 中都有多个 Executors

并且可以运行多个 Tasks

Job:

job:  对应下面即将要说的 action   : collect() 等

一个 task 对应 一个 job  (一个 transformation 对应 一个 action)
一个 job 对应 多个 task  (多个 transformations 链式调用之后,再调用一个 action)

Task:

task: 对应下面即将要说的 transformation   :map() 等
每个 task 可用一个线程执行。多个 task 可并行

Stage:

一个 job 被切分为多份

Cluster Manager:

管理 从 Standalone, YARN, Mesos 中获取的资源
就是 --master 指定的参数
其中 还包括 空间 内存等参数配置

Cache:

缓存:### persist & cache & unpersist 三种 API 可供选择

Lineage(依赖, 血缘关系):

依赖:父              子               孙
                    RDD1  -> map->  RDD2 -> filter-> RDD3
    服务器 1:part1 ->        part1->          part1
    服务器 2:part2 ->        part2->          part2
    服务器 3:part3 ->        part3->          part3

    如上图: 假如 RDD3 的 part2 挂了,那么就会退回到 RDD2 的 part2 再计算一遍。而不是回到 "最初" 的起点。窄依赖(Narrow, 依赖的很少,很窄):重点:  '子 part' 只依赖一个 '父 part'。map, filter 等:  元素被摊分在每一个 part 中,子 part 出错就找 "对应"(一个)父 part 即可。宽依赖(Wide, 依赖的很多,很宽):重点:  '子 part' 依赖多个 '父 part' 同时计算得到。shuffle 操作: xxBy, join 等:子 part 出错 找 "对应"(多个)父 part 重新共同计算。

stage:

遇到 1 个宽依赖,就会做 shuffle 操作。然后就会把 "之前" 的“所有窄依赖”划分为 "1 个 stage"。最后,整体全部,也当作 "1 个 stage"。

官档图

传送门:http://spark.apache.org/docs/latest/cluster-overview.html

RDD 两大算子

Transformation(Lazy)

主要机制:各种操作不会被立刻执行,但这些操作之间的关系会被记录下来,等待下面 action 调用。
直观理解举例:

1. 像 sqlalchemy 中的 filter(), groupby(), page()等操作
2. 像 tensorflow1.x 中的 sess.run() 之前的各种操作
3. 像 数据库的事务,在提交之前的各种操作

接下来介绍,Transformation 的各种操作。

map

同 python 的 map。你只需记住 RDD 类型里面包裹的就是我们熟悉的 python 类型
所以:python 的 map 怎么用,RDD 对象的 map 就怎么用,下面 filter 同理

只举一个语法格式例子:(下面同理)rdd.map(lambda x:x+1)

filter

同上,同 python

flatMap

和 map 几乎差不多。唯一有一点区别:map 每次基于单个元素,返回什么,那最终结果就是什么(最后拼成序列)。flatMap 每次基于单个元素,若返回的是序列(列表等),那么会自动被解包,并一字排开返回。

groupBy 和 groupByKey

说一下 没有 key, 和 带有 key 的区别(后面同理,就不啰嗦了):

没有 key:
    1. 一般必须需要一个 函数句柄 (lambda), 而这个句柄是针对(操作后新形成的 key)使用的
    2. 针对一层序列   [, , ...]
带有 key
    1. 一般无参    
    2. 针对双层序列   [(),(),...]

直接上例子了(对比着看):

rdd1 = sc.parallelize(['a','b','c','a'])                      # 一层序列
rdd2 = sc.parallelize([('a',1),('b',2), ('c',3), ('a',4)] )  # 双层序列

group1 = rdd1.groupBy(lambda x:x)   # 针对 一层序列,注意这里,必须写 函数句柄
group2 = rdd2.groupByKey()          # 针对 双层序列

print(group1.collect() )
print(group2.collect() )

# 可以这样告诉你,他们俩的最外层结果是一样的: [{key:value}, ...], 
结果如下 ~~~~
[('a', <pyspark.resultiterable.ResultIterable object at 0x7fb7e4384c88>),     
    ('b', <pyspark.resultiterable.ResultIterable object at 0x7fb7e43848d0>),
    ('c', <pyspark.resultiterable.ResultIterable object at 0x7fb7e4384940>)
]
# 如果加了 count(), 那么它们的结果就是一样的了,返回统计的个数,等到 action 再说。

reduceByKey

照应双层或多层序列,或者 承接 groupByKey()

rdd = sc.parallelize(['Tom', 'Jerry', 'Tom', 'Putch'])
rdd.map(lambda x:(x,1)).reduceByKey(lambda x,y:x+y).collect()

# 结果(可以忽略上面的 collect(), 它属于 action,放在这里方便贴个结果)>> [('Tom', 2), ('Jerry', 1), ('Putch', 1)]

sortBy 和 sortByKey

同上,这里只说下(这两种只能对 key 排序):
    默认升序
    降序可指定参数:ascending=False
    
另一种业务:(基于 value 的排序):可以尝试先用 map-lambda 交换元素次序,然后再排序,最后再用一次 map-lambda 交换回来

union

rdd1.union(rdd2)    # 相当于 python 的 "列表加法" 或者 python 的 "extend"

distinct

rdd.distinct()      # 去重

join

前提:(我的理解就是,能转化成 python 字典的列表格式即可)

eg:  [[1,2], [3,4], [5,6] ]  
两层列表
每层列表的每个元素中,只有 2 个元素

错误格式示例:[['a','b','c'], ['d','e','f']]

也不能说错误吧,不过若是这种 3 个 - 多个子元素的格式,join 时默认会取前 2 个元素。其余丢弃。

内连接(innerJoin):
左外连接(leftOuterJoin):
右外连接(rightOuterJoin):
全外连接(fullOuterJoin):

完整示例:

rdd1 = sc.parallelize([['a','b'], ['d','e']] )       # 左
rdd2 = sc.parallelize([['a','c'], ['e','f']] )       # 右
# 开头说过: 能转化成字典的列表格式即可,或者你可以写成这样(但是不能传原生字典进去):rdd1 = sc.parallelize(list({'a': 'b', 'd': 'e'}.items()) )
rdd2 = sc.parallelize(list({'a': 'c', 'e': 'f'}.items()) )
  

# 内连接(交集)print(rdd1.join(rdd2).collect())             # [('a', ('b', 'c'))]

# 左连接(左并集)print(rdd1.leftOuterJoin(rdd2).collect())    # [('d', ('e', None)), ('a', ('b', 'c'))]

# 右连接(右并集)print(rdd1.rightOuterJoin(rdd2).collect())   # [('a', ('b', 'c')), ('e', (None, 'f'))]

# 全连接(并集)print(rdd1.fullOuterJoin(rdd2).collect())    # [('d', ('e', None)), ('a', ('b', 'c')), ('e', (None, 'f'))]

persist & cache & unpersist

cache(): 缓存
persist(): 持久化
unpersist(): 清空缓存(他属于 action- 立即触发,为了方便对比,我就一起放到了这里)
官档:http://spark.apache.org/docs/…

Action (Commit)

主要机制:拿到 transformation 记录的关系,用 action 的各种操作来真正触发、执行、返回结果。
对应上面,继续直观举例:

1. 像 sqlalchemy 中的 commit()
2. 像 tensorflow1.x 中的 sess.run()
3. 像 数据库的事务的 "提交"

接下来介绍,Action 的各种操作。

collect

执行 transformation 记录的关系 并 返回结果,在 Pyspark 中就是 RDD 类型 转 Python 数据类型。

(中间你可以链式调用各种 transformation 方法,结尾调用一个 collect(), 就可以出结果了)
rdd1.xx().xx().collect()

count

统计元素项的个数,同上语法, 同上理念,触发返回结果

rdd2 = sc.parallelize([['a','c','d'], ['e','f','g']] )
rdd2.count()  # 无参
>> 2

reduce

rdd2 = sc.parallelize([['a','c','d'], ['e','f','g']] )
rdd2.reduce(lambda x,y:x+y)    # 参数为 2 个参数的函数句柄,做 "累" 的操作,(累加,累乘)等
>> ['a', 'c', 'd', 'e', 'f', 'g']

take

相当于 mysql 的 limit 操作,取前 n 个

rdd2 = sc.parallelize([['a','c','d'], ['e','f','g']] )
rdd2.take(0)  # []
rdd2.take(1)  # [['a', 'c', 'd']]
rdd2.take(2)  # [['a', 'c', 'd'], ['e', 'f', 'g']]

再次强调:take 的参数是,个数的意思,而不是索引,不要混淆额

top

返回最大的 n 个元素(会自动给你排序的)

rdd2 = sc.parallelize([1,2,3,8,5,3,6,8])  
rdd2.top(3)
>> [8, 8, 6]

foreach

遍历每个元素,对子元素做 - 对应函数句柄的操作,下面说这个 action 的两点注意事项:
注意 1:无返回值(返回 None)
注意 2:通常用作 print(), 但是它不会在 notebook 中打印,而是在你后台开启的 spark 中打印。

rdd2 = sc.parallelize([['a','c','d'], ['e','f','g']] )  
rdd2.foreach(lambda x:print(x))

>> ['a', 'c', 'd']
   ['e', 'f', 'g']

saveAsTextFile

rdd = sc.textFile('file:///home/lin/data')
rdd.saveAsTextFile('file:///home/lin/mydata')  

# 这里有个注意事项:saveAsTextFile 的参数路径不能在都进来的路径范围内。# 或者说,读是从这个文件夹 A(这是最后一级的目录)读进来的,写就不能写入文件夹 A 了
# 另外,mydata 是目录名,进去你会看见 part-00000  这样的文件名,这才是真数据文件。

Spark 优化相关

序列化:

好处 1:网络传输必备
好处 2:节省内存
两种方式序列化方式:1. Java 内部序列化(默认,较慢,但兼容性好)2. Kryo(较快,但兼容性不太好)

内存管理:

可分为 execution(进程执行)和 storage(存储)

execution 相关操作: shuffle, join, sort, aggregation
storage 相关操作  :   cache,

特点:

execution 和 storage 共享整体内存:execution 起到 "存霸" 的角色:
1. 若 execution 区域内存 不够用了,它会去抢夺 storage 区域的内存(不归还)2. 当然,可以为 storage 设置阈值(必须给 storage 留下多少)

具体分配多少:

总内存 = n
execution 内存 = (总内存 - 300M) * 50%
storage 内存   = (总内存 - 300M) * 50%

说白了,就是留给 JVM 300M,然后 execution 和 storage 各分一半。

查看内存占用情况

可通过 WebUI 查看(序列化后存储,通常会节省内存)

Broadcasting Variable

情景:正常来说,每个 task(map, filter 等)都会占用 1 份数据,100 个 task 就会拿 100 份数据。
这种情况造成了数据的冗余,BroadCasting Variable(广播变量)就是解决这一问题的。

退出移动版