问题引出
python map 带 for 循环的状况 兴许 很 easy!但 spark 的 mapPartitions 晕了吗?其实这两种是一样的场景。。。
注释试验(可疏忽,直奔后果)
为了不便,用把组装的数据类型灌入 map 来模仿 mapPartitions
性能是模拟计算 tf-idf
class Article:
'''文章类'''
def __init__(self,id, indexex, tfidfs):
self.id = id
self.indexex = indexex # 文章分词后的所有词索引列表
self.tfidfs = tfidfs # 每个词对应的 TF-IDF 值 列表
def f(partition):
for row in partition: # row 代表每个文章
# row.indexex 代表 文章分词后的所有词索引列表
# row.tfidfs 代表 每个词对应的 TF-IDF 值 列表
word_list = list(zip(row.indexex, row.tfidfs))
for index, tfidf in word_list: # 遍历 "每个" 词语 的 index 与 tfidf
########### 这里 yield 是重点 ###########
yield f'文章 {row.id}', index, tfidf
c = map(f,
[ # <- 为了模仿分区,这一层的列表代表 partition
[ # <- 这一层模仿的是每个分区外面的文章列表
Article(0, [1,2],[0.1,0.4] ), # <- 文章 0
Article(1, [3,4],[3.4,3.7] ) # <- 文章 1
]
]
)
######################## 执行 ########################
for x in c: # 解 zip
print(list(x)) # 解 yield
后果:
如果应用 return 关键词,得出的最终打印后果:(不满足)
['文章 0', 1, 0.1]
如果应用 yield 关键词,得出的最终打印后果:(满足)
[('文章 0', 1, 0.1), ('文章 0', 2, 0.4), ('文章 1', 3, 3.4), ('文章 1', 4, 3.7)]
这里就呈现了一个问题:
失常用法都是用 return,时罕用 lambda(lambda 默认也是隐式 return。)
是何原因让咱们不得不用 yield?
一点一点往下推:
map: 外围是 "按单个数据映射"
mapPartition:外围是 "把数据分组,按组映射"
按组映射是没错,但咱们的目标是想操作组内的每条数据。所以咱们必须须要每次对组内数据 for 循环遍历进去独自解决。而后 返回回去。
那咱们先用失常的 return 返回试试:
def(partation):
for x in partition:
return x.name, x.age
兴许看到这里你感觉没什么问题。。。
然而不要忘了最根底的内容,return 是间接跳出 for 循环和函数的。
再次强调,mapPartition 是按组映射,所以认真看下面代码:
最终的 mapPartition 是按组映射后果就是:
每组的第一个元素的汇合(因为 for 被 return 了,每组的函数也被 return 了)
解决这种问题,有两种形式:
- 最简略将源代码 for 循环外部的 return 改为 yield
- 新建长期列表过渡,return 放在 for 里面,如下案例:
def f(partition):
_ = []
for x in partition:
_.append(x)
return _
b = [[1,2,3], [4,5,6]]
c = map(f,b)
print(list(c))
map + yield: