After Pandas 2.0 was released, we published a few benchmark articles. This time, let's go beyond Pandas and compare two commonly used parallel dataframe frameworks built for large-scale data processing: Polars and Dask.
In this article we use two similar scripts to run an extract, transform, load (ETL) process.
What we test
Both scripts do the following:
Extract data from two parquet files. For the small dataset, the variable path1 is "yellow_tripdata/yellow_tripdata_2014-01"; for the medium dataset, path1 is "yellow_tripdata/yellow_tripdata"; for the large dataset, path1 is "yellow_tripdata/yellow_tripdata*.parquet".
Transform the data: a) join the two dataframes, b) compute the mean trip distance per PULocationID, c) keep only the rows that match certain conditions, d) round the values from step b to 2 decimal places, e) rename the column "trip_distance" to "mean_trip_distance", f) sort by the column "mean_trip_distance".
Save the final result to a new file.
Scripts
1. Polars
Data loading and reading
def extraction():
    """Extract two datasets from parquet files"""
    path1 = "yellow_tripdata/yellow_tripdata_2014-01.parquet"
    df_trips = pl_read_parquet(path1)
    path2 = "taxi+_zone_lookup.parquet"
    df_zone = pl_read_parquet(path2)
    return df_trips, df_zone

def pl_read_parquet(path):
    """Converting parquet file into a Polars (lazy) dataframe"""
    df = pl.scan_parquet(path)
    return df
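One thing worth pointing out: pl.scan_parquet does not read anything from disk at this point. It returns a LazyFrame that only records the query plan, and the actual I/O happens later when the plan is executed. A minimal sketch of the difference, using the same small-dataset file as above:

import polars as pl

# Lazy: returns a LazyFrame immediately; no data is read from disk yet.
lazy_df = pl.scan_parquet("yellow_tripdata/yellow_tripdata_2014-01.parquet")

# Eager alternative: pl.read_parquet would load the whole file into memory right away.
# eager_df = pl.read_parquet("yellow_tripdata/yellow_tripdata_2014-01.parquet")

# Work only happens when the plan is executed:
result = lazy_df.collect()
print(result.shape)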
Transformation functions
def transformation(df_trips, df_zone):
    """Proceed to several transformations"""
    df_trips = mean_test_speed_pl(df_trips)
    df = df_trips.join(df_zone, how="inner", left_on="PULocationID", right_on="LocationID")
    df = df.select(["Borough", "Zone", "trip_distance"])
    df = get_Queens_test_speed_pd(df)
    df = round_column(df, "trip_distance", 2)
    df = rename_column(df, "trip_distance", "mean_trip_distance")
    df = sort_by_columns_desc(df, "mean_trip_distance")
    return df

def mean_test_speed_pl(df_pl):
    """Getting mean trip_distance per PULocationID"""
    # note: newer Polars releases rename groupby to group_by
    df_pl = df_pl.groupby("PULocationID").agg(pl.col("trip_distance").mean())
    return df_pl

def get_Queens_test_speed_pd(df_pl):
    """Only keeping rows whose Borough is Queens"""
    df_pl = df_pl.filter(pl.col("Borough") == "Queens")
    return df_pl

def round_column(df, column, to_round):
    """Round numbers in a column"""
    df = df.with_columns(pl.col(column).round(to_round))
    return df

def rename_column(df, column_old, column_new):
    """Renaming columns"""
    df = df.rename({column_old: column_new})
    return df

def sort_by_columns_desc(df, column):
    """Sort by column in descending order"""
    df = df.sort(column, descending=True)
    return df
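For reference, the helper functions above can also be expressed as a single lazy chain. The sketch below is not the author's code, just an equivalent way to build the same query plan (it assumes the same column names and the groupby spelling used in this article's Polars version):

def transformation_chained(df_trips, df_zone):
    """Same transformations as above, written as one lazy expression chain"""
    return (
        df_trips.groupby("PULocationID")
        .agg(pl.col("trip_distance").mean())
        .join(df_zone, how="inner", left_on="PULocationID", right_on="LocationID")
        .select(["Borough", "Zone", "trip_distance"])
        .filter(pl.col("Borough") == "Queens")
        .with_columns(pl.col("trip_distance").round(2))
        .rename({"trip_distance": "mean_trip_distance"})
        .sort("mean_trip_distance", descending=True)
    )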
Saving the result
def loading_into_parquet(df_pl):
    """Save dataframe in parquet"""
    df_pl.collect(streaming=True).write_parquet("yellow_tripdata_pl.parquet")
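Here collect(streaming=True) executes the lazy plan with Polars' streaming engine and then writes the materialized result. In newer Polars versions the result can also be streamed straight to disk with LazyFrame.sink_parquet, which avoids holding the full output in memory. A hedged alternative sketch, using the same output file name:

def loading_into_parquet_streaming(df_pl):
    """Stream the lazy query result directly into a parquet file"""
    df_pl.sink_parquet("yellow_tripdata_pl.parquet")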
Remaining code
import polars as pl
import time
def main():
    print("Starting ETL for Polars")
    start_time = time.perf_counter()
    print("Extracting...")
    df_trips, df_zone = extraction()
    end_extract = time.perf_counter()
    time_extract = end_extract - start_time
    print(f"Extraction Parquet end in {round(time_extract, 5)} seconds")
    print("Transforming...")
    df = transformation(df_trips, df_zone)
    end_transform = time.perf_counter()
    time_transformation = time.perf_counter() - end_extract
    print(f"Transformation end in {round(time_transformation, 5)} seconds")
    print("Loading...")
    loading_into_parquet(df)
    load_transformation = time.perf_counter() - end_transform
    print(f"Loading end in {round(load_transformation, 5)} seconds")
    print(f"End ETL for Polars in {time.perf_counter() - start_time}")

if __name__ == "__main__":
    main()
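To run the medium or large test described in the setup, only path1 changes. scan_parquet accepts glob patterns, so the large-dataset variant of extraction() could look like the sketch below (the glob follows the path given earlier; everything else stays the same):

def extraction_large():
    """Extract the large dataset by globbing over all yellow_tripdata parquet files"""
    path1 = "yellow_tripdata/yellow_tripdata*.parquet"
    df_trips = pl_read_parquet(path1)
    df_zone = pl_read_parquet("taxi+_zone_lookup.parquet")
    return df_trips, df_zone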
2. Dask
The functions do the same thing as above, so here is all the code in one piece:
import dask.dataframe as dd
from dask.distributed import Client
import time

def extraction():
    """Extract two datasets from parquet files"""
    path1 = "yellow_tripdata/yellow_tripdata_2014-01.parquet"
    df_trips = dd.read_parquet(path1)
    path2 = "taxi+_zone_lookup.parquet"
    df_zone = dd.read_parquet(path2)
    return df_trips, df_zone

def transformation(df_trips, df_zone):
    """Proceed to several transformations"""
    df_trips = mean_test_speed_dask(df_trips)
    df = df_trips.merge(df_zone, how="inner", left_on="PULocationID", right_on="LocationID")
    df = df[["Borough", "Zone", "trip_distance"]]
    df = get_Queens_test_speed_dask(df)
    df = round_column(df, "trip_distance", 2)
    df = rename_column(df, "trip_distance", "mean_trip_distance")
    df = sort_by_columns_desc(df, "mean_trip_distance")
    return df

def loading_into_parquet(df_dask):
    """Save dataframe in parquet"""
    df_dask.to_parquet("yellow_tripdata_dask.parquet", engine="fastparquet")

def mean_test_speed_dask(df_dask):
    """Getting mean trip_distance per PULocationID"""
    df_dask = df_dask.groupby("PULocationID").agg({"trip_distance": "mean"})
    return df_dask

def get_Queens_test_speed_dask(df_dask):
    """Only keeping rows whose Borough is Queens"""
    df_dask = df_dask[df_dask["Borough"] == "Queens"]
    return df_dask

def round_column(df, column, to_round):
    """Round numbers in a column"""
    df[column] = df[column].round(to_round)
    return df

def rename_column(df, column_old, column_new):
    """Renaming columns"""
    df = df.rename(columns={column_old: column_new})
    return df

def sort_by_columns_desc(df, column):
    """Sort by column in descending order"""
    df = df.sort_values(column, ascending=False)
    return df

def main():
    print("Starting ETL for Dask")
    start_time = time.perf_counter()
    client = Client()  # start a local Dask client/cluster
    df_trips, df_zone = extraction()
    end_extract = time.perf_counter()
    time_extract = end_extract - start_time
    print(f"Extraction Parquet end in {round(time_extract, 5)} seconds")
    print("Transforming...")
    df = transformation(df_trips, df_zone)
    end_transform = time.perf_counter()
    time_transformation = time.perf_counter() - end_extract
    print(f"Transformation end in {round(time_transformation, 5)} seconds")
    print("Loading...")
    loading_into_parquet(df)
    load_transformation = time.perf_counter() - end_transform
    print(f"Loading end in {round(load_transformation, 5)} seconds")
    print(f"End ETL for Dask in {time.perf_counter() - start_time}")
    client.close()  # close the Dask client
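A note on the Dask side: Client() with no arguments starts a local cluster sized from the machine's cores and memory. If you want to pin down the parallelism (for example to make the CPU-usage comparison more controlled), the cluster can be sized explicitly. The numbers below are only an illustration, not the configuration used in these benchmarks:

from dask.distributed import Client

# Hypothetical local-cluster sizing; adjust to your own machine.
client = Client(n_workers=4, threads_per_worker=2, memory_limit="4GB")
print(client.dashboard_link)  # diagnostics dashboard for watching the ETL run
# ... run the ETL ...
client.close()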
Test results comparison
1. Small dataset
We use a 164 MB dataset. A dataset of this size is relatively small for our purposes and is also very common in day-to-day work.
Below are the results of running each library five times:
Polars
Dask
2. Medium dataset
We use a 1.1 GB dataset. This is a GB-scale dataset: it can still be loaded into memory in full, but it is considerably larger than the small dataset.
Polars
Dask
3. Large dataset
We use an 8 GB dataset. A dataset this large may not fit into memory all at once and needs the framework to handle it.
Polars
Dask
Summary
From the results we can see that both Polars and Dask use lazy evaluation, so reading and transforming are very fast, and the time they take barely changes with the size of the dataset.
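To make the lazy-evaluation point concrete: in both libraries, the extract and transform steps only build a query graph, and the real work happens when the result is materialized (collect() in Polars, compute() or a write in Dask). A minimal sketch using the small-dataset file:

import time
import polars as pl

t0 = time.perf_counter()
plan = (
    pl.scan_parquet("yellow_tripdata/yellow_tripdata_2014-01.parquet")
    .groupby("PULocationID")
    .agg(pl.col("trip_distance").mean())
)
print(f"Building the plan took {time.perf_counter() - t0:.4f} s")  # close to zero

t1 = time.perf_counter()
df = plan.collect()  # the parquet file is only read here
print(f"Executing the plan took {time.perf_counter() - t1:.4f} s")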
Both libraries are clearly very good at handling medium-sized datasets.
Because Polars and Dask both execute lazily, the comparison is based on the complete ETL run (averaged over 5 runs).
Polars came out ahead on both the small and the medium dataset. On the large dataset, however, Dask averaged 26 seconds.
This is probably related to Dask's parallel-computing optimizations, since the official documentation states that "Dask workloads run three times faster than Spark ETL queries and use less CPU resources."
The machine configuration used for the tests is shown above; Dask used more of the CPU during computation, so its parallelism can be considered better.
https://avoid.overfit.cn/post/74128cd8803b43f2a51ca4ff4fed4a95
Author: Luís Oliveira