关于人工智能:大数据开发Pandas转spark无痛指南⛵

47次阅读

共计 6816 个字符,预计需要花费 18 分钟才能阅读完成。

Pandas 灵便弱小,是数据分析必备工具库!但解决大型数据集时,需过渡到 PySpark 才能够施展并行计算的劣势。本文总结了 Pandas 与 PySpark 的外围性能代码段,把握即可丝滑切换。


💡 作者:韩信子 @ShowMeAI
📘 大数据技术◉技能晋升系列:https://www.showmeai.tech/tutorials/84
📘 数据分析实战系列:https://www.showmeai.tech/tutorials/40
📘 本文地址:https://www.showmeai.tech/article-detail/338
📢 申明:版权所有,转载请分割平台与作者并注明出处
📢 珍藏 ShowMeAI 查看更多精彩内容

Pandas 是每位数据科学家和 Python 数据分析师都相熟的工具库,它灵便且弱小具备丰盛的性能,但在解决大型数据集时,它是十分受限的。

这种状况下,咱们会过渡到 PySpark,联合 Spark 生态弱小的大数据处理能力,充分利用多机器并行的计算能力,能够减速计算。不过 PySpark 的语法和 Pandas 差别也比拟大,很多开发人员会感觉这很让人头大。

在本篇内容中,ShowMeAI 将对最外围的数据处理和剖析性能,梳理 PySpark 和 Pandas 绝对应的代码片段,以便大家能够无痛地实现 Pandas 到大数据 PySpark 的转换😉

大数据处理剖析及机器学习建模相干常识,ShowMeAI 制作了具体的教程与工具速查手册,大家能够通过如下内容开展学习或者回顾相干常识。

📘图解数据分析:从入门到精通系列教程

📘图解大数据技术:从入门到精通系列教程

📘图解机器学习算法:从入门到精通系列教程

📘数据迷信工具库速查表 | Spark RDD 速查表

📘数据迷信工具库速查表 | Spark SQL 速查表

💡 导入工具库

在应用具体性能之前,咱们须要先导入所需的库:

# pandas vs pyspark,工具库导入
import pandas as pd
import pyspark.sql.functions as F

PySpark 所有性能的入口点是 SparkSession 类。通过 SparkSession 实例,您能够创立 spark dataframe、利用各种转换、读取和写入文件等,上面是定义 SparkSession 的代码模板:

from pyspark.sql import SparkSession
spark = SparkSession\
.builder\
.appName('SparkByExamples.com')\
.getOrCreate()

💡 创立 dataframe

在 Pandas 和 PySpark 中,咱们最不便的数据承载数据结构都是 dataframe,它们的定义有一些不同,咱们来比照一下看看:

💦 Pandas

columns = ["employee","department","state","salary","age"]
data = [("Alain","Sales","Paris",60000,34),
        ("Ahmed","Sales","Lyon",80000,45),
        ("Ines","Sales","Nice",55000,30),
        ("Fatima","Finance","Paris",90000,28),
        ("Marie","Finance","Nantes",100000,40)]

创立 DataFrame 的 Pandas 语法如下:

df = pd.DataFrame(data=data, columns=columns)
# 查看头 2 行
df.head(2)

💦 PySpark

创立 DataFrame 的 PySpark 语法如下:

df = spark.createDataFrame(data).toDF(*columns)
# 查看头 2 行
df.limit(2).show()

💡 指定列类型

💦 Pandas

Pandas 指定字段数据类型的办法如下:

types_dict = {"employee": pd.Series([r[0] for r in data], dtype='str'),
    "department": pd.Series([r[1] for r in data], dtype='str'),
    "state": pd.Series([r[2] for r in data], dtype='str'),
    "salary": pd.Series([r[3] for r in data], dtype='int'),
    "age": pd.Series([r[4] for r in data], dtype='int')
}

df = pd.DataFrame(types_dict)

Pandas 能够通过如下代码来查看数据类型:

df.dtypes

💦 PySpark

PySpark 指定字段数据类型的办法如下:

from pyspark.sql.types import StructType,StructField, StringType, IntegerType

schema = StructType([ \
    StructField("employee",StringType(),True), \
    StructField("department",StringType(),True), \
    StructField("state",StringType(),True), \
    StructField("salary", IntegerType(), True), \
    StructField("age", IntegerType(), True) \
  ])

df = spark.createDataFrame(data=data,schema=schema)

PySpark 能够通过如下代码来查看数据类型:

df.dtypes
# 查看数据类型 
df.printSchema() 

💡 读写文件

Pandas 和 PySpark 中的读写文件形式十分类似。具体语法比照如下:

💦 Pandas

df = pd.read_csv(path, sep=';', header=True)
df.to_csv(path, ';', index=False)

💦 PySpark

df = spark.read.csv(path, sep=';')
df.coalesce(n).write.mode('overwrite').csv(path, sep=';')

留神 ①

PySpark 中能够指定要分区的列:

df.partitionBy("department","state").write.mode('overwrite').csv(path, sep=';')

留神 ②

能够通过下面所有代码行中的 parquet 更改 CSV 来读取和写入不同的格局,例如 parquet 格局

💡 数据抉择 – 列

💦 Pandas

在 Pandas 中抉择某些列是这样实现的:

columns_subset = ['employee', 'salary']

df[columns_subset].head()

df.loc[:, columns_subset].head()

💦 PySpark

在 PySpark 中,咱们须要应用带有列名列表的 select 办法来进行字段抉择:

columns_subset = ['employee', 'salary']

df.select(columns_subset).show(5)

💡 数据抉择 – 行

💦 Pandas

Pandas 能够应用 iloc对行进行筛选:

# 头 2 行
df.iloc[:2].head()

💦 PySpark

在 Spark 中,能够像这样抉择前 n 行:

df.take(2).head()
# 或者
df.limit(2).head()

留神:应用 spark 时,数据可能散布在不同的计算节点上,因而“第一行”可能会随着运行而变动。

💡 条件抉择

💦 Pandas

Pandas 中依据特定条件过滤数据 / 抉择数据的语法如下:

# First method
flt = (df['salary'] >= 90_000) & (df['state'] == 'Paris')
filtered_df = df[flt]

# Second Method: Using query which is generally faster
filtered_df = df.query('(salary >= 90_000) and (state =="Paris")')
# Or
target_state = "Paris"
filtered_df = df.query('(salary >= 90_000) and (state == @target_state)')

💦 PySpark

在 Spark 中,应用 filter办法或执行 SQL 进行数据抉择。语法如下:

# 办法 1:基于 filter 进行数据抉择
filtered_df = df.filter((F.col('salary') >= 90_000) & (F.col('state') == 'Paris'))

# 或者
filtered_df = df.filter(F.expr('(salary >= 90000) and (state =="Paris")'))

# 办法 2:基于 SQL 进行数据抉择
df.createOrReplaceTempView("people")

filtered_df = spark.sql("""
SELECT * FROM people
WHERE (salary >= 90000) and (state == "Paris")
""") 

💡 增加字段

💦 Pandas

在 Pandas 中,有几种增加列的办法:

seniority = [3, 5, 2, 4, 10]
# 办法 1
df['seniority'] = seniority

# 办法 2
df.insert(2, "seniority", seniority, True)

💦 PySpark

在 PySpark 中有一个特定的办法 withColumn 可用于增加列:

seniority = [3, 5, 2, 4, 10]
df = df.withColumn('seniority', seniority)

💡 dataframe 拼接

💦 2 个 dataframe – pandas

# pandas 拼接 2 个 dataframe
df_to_add = pd.DataFrame(data=[("Robert","Advertisement","Paris",55000,27)], columns=columns)
df = pd.concat([df, df_to_add], ignore_index = True)

💦 2 个 dataframe – PySpark

# PySpark 拼接 2 个 dataframe
df_to_add = spark.createDataFrame([("Robert","Advertisement","Paris",55000,27)]).toDF(*columns)
df = df.union(df_to_add)

💦 多个 dataframe – pandas

# pandas 拼接多个 dataframe
dfs = [df, df1, df2,...,dfn]
df = pd.concat(dfs, ignore_index = True)

💦 多个 dataframe – PySpark

PySpark 中 unionAll 办法只能用来连贯两个 dataframe。咱们应用 reduce 办法配合 unionAll 来实现多个 dataframe 拼接:

# pyspark 拼接多个 dataframe
from functools import reduce
from pyspark.sql import DataFrame

def unionAll(*dfs):
    return reduce(DataFrame.unionAll, dfs)

dfs = [df, df1, df2,...,dfn]
df = unionAll(*dfs)

💡 简略统计

Pandas 和 PySpark 都提供了为 dataframe 中的每一列进行统计计算的办法,能够轻松对下列统计值进行统计计算:

  • 列元素的计数
  • 列元素的平均值
  • 最大值
  • 最小值
  • 标准差
  • 三个分位数:25%、50% 和 75%

Pandas 和 PySpark 计算这些统计值的办法很相似,如下:

💦 Pandas & PySpark

df.summary()
#或者
df.describe()

💡 数据分组聚合统计

Pandas 和 PySpark 分组聚合的操作也是十分相似的:

💦 Pandas

df.groupby('department').agg({'employee': 'count', 'salary':'max', 'age':'mean'})

💦 PySpark

df.groupBy('department').agg({'employee': 'count', 'salary':'max', 'age':'mean'})

然而,最终显示的后果须要一些调整能力统一。

在 Pandas 中,要分组的列会主动成为索引,如下所示:

要将其作为列复原,咱们须要利用 reset_index办法:

df.groupby('department').agg({'employee': 'count', 'salary':'max', 'age':'mean'}).reset_index()

在 PySpark 中,列名会在后果 dataframe 中被重命名,如下所示:

要复原列名,能够像上面这样应用别名办法:

df.groupBy('department').agg(F.count('employee').alias('employee'), F.max('salary').alias('salary'), F.mean('age').alias('age'))

💡 数据转换

在数据处理中,咱们常常要进行数据变换,最常见的是要对「字段 / 列」利用特定转换,在 Pandas 中咱们能够轻松基于 apply 函数实现,但在 PySpark 中咱们能够应用udf(用户定义的函数)封装咱们须要实现的变换的 Python 函数。

例如,咱们对 salary 字段进行解决,如果工资低于 60000,咱们须要减少工资 15%,如果超过 60000,咱们须要减少 5%。

💦 Pandas

Pandas 中的语法如下:

df['new_salary'] = df['salary'].apply(lambda x: x*1.15 if x<= 60000 else x*1.05)

💦 Pyspark

PySpark 中的等价操作下:

from pyspark.sql.types import FloatType

df.withColumn('new_salary', F.udf(lambda x: x*1.15 if x<= 60000 else x*1.05, FloatType())('salary'))

⚠️ 请留神,udf办法须要明确指定数据类型(在咱们的例子中为 FloatType)

💡 总结

本篇内容中,ShowMeAI 给大家总结了 Pandas 和 PySpark 对应的性能操作细节,咱们能够看到 Pandas 和 PySpark 的语法有很多相似之处,然而要留神一些细节差别。

另外,大家还是要基于场景进行适合的工具抉择:

  • 在解决大型数据集时,应用 PySpark 能够为您提供很大的劣势,因为它容许并行计算。
  • 如果您正在应用的数据集很小,那么应用 Pandas 会很快和灵便。

参考资料

  • 📘 图解数据分析:从入门到精通系列教程:https://www.showmeai.tech/tutorials/33
  • 📘 图解大数据技术:从入门到精通系列教程:https://www.showmeai.tech/tutorials/84
  • 📘 图解机器学习算法:从入门到精通系列教程:https://www.showmeai.tech/tutorials/34
  • 📘 数据迷信工具库速查表 | Spark RDD 速查表:https://www.showmeai.tech/article-detail/106
  • 📘 数据迷信工具库速查表 | Spark SQL 速查表:https://www.showmeai.tech/article-detail/107

举荐浏览

  • 🌍 数据分析实战系列:https://www.showmeai.tech/tutorials/40
  • 🌍 机器学习数据分析实战系列:https://www.showmeai.tech/tutorials/41
  • 🌍 深度学习数据分析实战系列:https://www.showmeai.tech/tutorials/42
  • 🌍 TensorFlow 数据分析实战系列:https://www.showmeai.tech/tutorials/43
  • 🌍 PyTorch 数据分析实战系列:https://www.showmeai.tech/tutorials/44
  • 🌍 NLP 实战数据分析实战系列:https://www.showmeai.tech/tutorials/45
  • 🌍 CV 实战数据分析实战系列:https://www.showmeai.tech/tutorials/46
  • 🌍 AI 面试题库系列:https://www.showmeai.tech/tutorials/48

正文完
 0