乐趣区

关于python:PY-用python的map模拟Spark的mapPartitions

问题引出

python map 带 for 循环的状况 兴许 很 easy!但 spark 的 mapPartitions 晕了吗?其实这两种是一样的场景。。。

注释试验(可疏忽,直奔后果)

为了不便,用把组装的数据类型灌入 map 来模仿 mapPartitions
性能是模拟计算 tf-idf

class Article:
    '''文章类'''
    def __init__(self,id, indexex, tfidfs):
        self.id = id
        self.indexex = indexex   # 文章分词后的所有词索引列表
        self.tfidfs = tfidfs     # 每个词对应的 TF-IDF 值 列表

def f(partition):
    for row in partition:   # row 代表每个文章
        # row.indexex 代表 文章分词后的所有词索引列表
        # row.tfidfs  代表 每个词对应的 TF-IDF 值 列表
        word_list = list(zip(row.indexex, row.tfidfs))  
        
        for index, tfidf in word_list:    # 遍历 "每个" 词语 的 index 与 tfidf
            ########### 这里 yield 是重点 ###########
            yield f'文章 {row.id}', index, tfidf
            

c = map(f, 
    [   #  <- 为了模仿分区,这一层的列表代表 partition
        [   # <- 这一层模仿的是每个分区外面的文章列表
            Article(0, [1,2],[0.1,0.4] ),   # <- 文章 0
            Article(1, [3,4],[3.4,3.7] )    # <- 文章 1
        ] 
    ] 
)
######################## 执行 ########################
for x in c:              # 解 zip
    print(list(x))       # 解 yield

后果:

如果应用 return 关键词,得出的最终打印后果:(不满足)

['文章 0', 1, 0.1]          

如果应用 yield 关键词,得出的最终打印后果:(满足)

[('文章 0', 1, 0.1), ('文章 0', 2, 0.4), ('文章 1', 3, 3.4), ('文章 1', 4, 3.7)]

这里就呈现了一个问题:

失常用法都是用 return,时罕用 lambda(lambda 默认也是隐式 return。)
是何原因让咱们不得不用 yield?
一点一点往下推:

map: 外围是 "按单个数据映射"
mapPartition:外围是 "把数据分组,按组映射"
    按组映射是没错,但咱们的目标是想操作组内的每条数据。所以咱们必须须要每次对组内数据 for 循环遍历进去独自解决。而后 返回回去。

那咱们先用失常的 return 返回试试:

def(partation):
    for x in partition:
        return x.name, x.age

兴许看到这里你感觉没什么问题。。。
然而不要忘了最根底的内容,return 是间接跳出 for 循环和函数的。
再次强调,mapPartition 是按组映射,所以认真看下面代码:
最终的 mapPartition 是按组映射后果就是:

 每组的第一个元素的汇合(因为 for 被 return 了,每组的函数也被 return 了)

解决这种问题,有两种形式:

  1. 最简略将源代码 for 循环外部的 return 改为 yield
  2. 新建长期列表过渡,return 放在 for 里面,如下案例:
def f(partition):
    _ = []
    for x in partition:
            _.append(x)
        return _
b = [[1,2,3], [4,5,6]]
c = map(f,b)
print(list(c)) 

map + yield:

退出移动版