作者|Aaron Richter
编译|VK
起源|Towards Data Science

随机森林是一种机器学习算法,以其鲁棒性、准确性和可扩展性而受到许多数据科学家的信赖。

该算法通过bootstrap聚合训练出多棵决策树,而后通过集成对输入进行预测。因为其集成特色的特点,随机森林是一种能够在分布式计算环境中实现的算法。树能够在集群中跨过程和机器并行训练,后果比应用单个过程的训练工夫快得多。

在本文中,咱们摸索了应用Apache Spark在CPU机器集群上实现分布式随机森林训练,并将其与应用NVIDIA RAPIDS和Dask的GPU机器集群上的训练性能进行了比拟。

尽管GPU计算传统上是为深度学习利用而保留的,但RAPIDS是一个在GPU上执行数据处理和非深度学习ML工作的库,与在cpu上执行相比,它能够大大提高性能。

咱们应用3亿个实例训练了一个随机森林模型:Spark在20个节点CPU集群上耗时37分钟,而RAPIDS在20个节点GPU集群上耗时1秒。GPU的速度进步了2000倍以上!

试验概述

咱们应用公共可用的纽约出租车数据集,并训练一个随机森林回归器,该回归器能够应用与乘客接送相干的属性来预测出租车的票价金额。以2017年、2018年和2019年的出租车出行量为训练集,共计300700143个实例。

数据集链接:https://www1.nyc.gov/site/tlc...

Spark和RAPIDS代码能够在Jupyter Notebook中找到。

硬件

Spark集群应用Amazon EMR进行治理,而Dask/RAPIDS集群则应用Saturn Cloud进行治理。

两个集群都有20个工作节点,具备以下AWS实例类型:

Spark:r5.2xlarge

  • 8个CPU,64 GB RAM
  • 按需价格:0.504美元/小时

RAPIDS:g4dn.xlarge

  • 4个CPU,16 GB RAM
  • 1个GPU,16 GB GPU RAM(NVIDIA T4)
  • 按需价格:0.526美元/小时

Saturn Cloud也能够用NVIDIA特斯拉V100 GPU来启动Dask集群,但咱们在这个练习中抉择了g4dn.xlarge,放弃与Spark集群类似的小时老本详情。

Spark

Apache Spark是一个在Scala中构建的开源大数据处理引擎,它有一个Python接口,能够调用Scala/JVM代码。

它是Hadoop解决生态系统中的一个重要组成部分,围绕MapReduce范例构建,并且具备用于数据帧和机器学习的接口。

设置Spark集群不在本文的探讨范畴之内,然而一旦筹备好集群,就能够在Jupyter Notebook中运行以下命令来初始化Spark:

import findsparkfindspark.init()from pyspark.sql import SparkSessionspark = (SparkSession        .builder        .config('spark.executor.memory', '36g')        .getOrCreate())

findspark包检测零碎上的Spark装置地位;如果能够晓得Spark包的装置地位,则可能不须要这样做。

要取得有性能的Spark代码,须要设置几个配置设置,这取决于集群设置和工作流。在这种状况下,咱们设置spark.executor.memory以确保咱们不会遇到任何内存溢出或Java堆谬误。

RAPIDS

NVIDIA RAPIDS是一个开源的Python框架,它在gpu而不是cpu上执行数据迷信代码。相似于在训练深度学习模型时所看到的,这将为数据迷信工作带来微小的性能晋升。

RAPIDS有数据帧、ML、图形剖析等接口。RAPIDS应用Dask来解决与具备多个gpu的机器的并行化,以及每个具备一个或多个gpu的机器集群。

设置GPU机器可能有点辣手,然而Saturn Cloud曾经为启动GPU集群预构建了映像,所以你只需几分钟就能够启动并运行了!要初始化指向群集的Dask客户端,能够运行以下命令:

from dask.distributed import Clientfrom dask_saturn import SaturnClustercluster = SaturnCluster()client = Client(cluster)

要本人设置Dask集群,请参阅此docs页面:https://docs.dask.org/en/late...

数据加载

数据文件托管在一个公共的S3 bucket上,因而咱们能够间接从那里读取csv。S3 bucket的所有文件都在同一个目录中,所以咱们应用s3fs来抉择咱们想要的文件:

import s3fsfs = s3fs.S3FileSystem(anon=True)files = [f"s3://{x}" for x in fs.ls('s3://nyc-tlc/trip data/')         if 'yellow' in x and ('2019' in x or '2018' in x or '2017' in x)]         cols = ['VendorID', 'tpep_pickup_datetime', 'tpep_dropoff_datetime', 'passenger_count', 'trip_distance',      'RatecodeID', 'store_and_fwd_flag', 'PULocationID', 'DOLocationID', 'payment_type', 'fare_amount',      'extra', 'mta_tax', 'tip_amount', 'tolls_amount', 'improvement_surcharge', 'total_amount']

应用Spark,咱们须要独自读取每个CSV文件,而后将它们组合在一起:

import functoolsfrom pyspark.sql.types import *import pyspark.sql.functions as Ffrom pyspark.sql import DataFrame# 手动指定模式,因为read.csv中的inferSchema十分慢schema = StructType([    StructField('VendorID', DoubleType()),    StructField('tpep_pickup_datetime', TimestampType()),    ...    # 参考notebook取得残缺对象模式]) def read_csv(path):    df = spark.read.csv(path,                        header=True,                        schema=schema,                        timestampFormat='yyyy-MM-dd HH:mm:ss',                       )    df = df.select(cols)    return dfdfs = []for tf in files:    df = read_csv(tf)    dfs.append(df)taxi = functools.reduce(DataFrame.unionAll, dfs)taxi.count()

应用Dask+RAPIDS,咱们能够一次性读取所有CSV文件:

import dask_cudftaxi = dask_cudf.read_csv(files,                           assume_missing=True,                          parse_dates=[1,2],                           usecols=cols,                           storage_options={'anon': True})len(taxi)

特色工程

咱们将依据工夫生成一些特色,而后保留数据帧。在这两个框架中,这将执行所有CSV加载和预处理,并将后果存储在RAM中(在RAPIDS的状况下是GPU RAM)。咱们将用于训练的特色包含:

features = ['pickup_weekday', 'pickup_hour', 'pickup_minute',            'pickup_week_hour', 'passenger_count', 'VendorID',             'RatecodeID', 'store_and_fwd_flag', 'PULocationID',             'DOLocationID']

对于Spark,咱们须要将特色收集到向量类中:

from pyspark.ml.feature import VectorAssemblerfrom pyspark.ml.pipeline import Pipelinetaxi = taxi.withColumn('pickup_weekday', F.dayofweek(taxi.tpep_pickup_datetime).cast(DoubleType()))taxi = taxi.withColumn('pickup_hour', F.hour(taxi.tpep_pickup_datetime).cast(DoubleType()))taxi = taxi.withColumn('pickup_minute', F.minute(taxi.tpep_pickup_datetime).cast(DoubleType()))taxi = taxi.withColumn('pickup_week_hour', ((taxi.pickup_weekday * 24) + taxi.pickup_hour).cast(DoubleType()))taxi = taxi.withColumn('store_and_fwd_flag', F.when(taxi.store_and_fwd_flag == 'Y', 1).otherwise(0))taxi = taxi.withColumn('label', taxi.total_amount)  taxi = taxi.fillna(-1)assembler = VectorAssembler(    inputCols=features,    outputCol='features',)pipeline = Pipeline(stages=[assembler])assembler_fitted = pipeline.fit(taxi)X = assembler_fitted.transform(taxi)X.cache()X.count()

对于RAPIDS,咱们将所有浮点值转换为float32,以便进行GPU计算:

from dask import persistfrom dask.distributed import waittaxi['pickup_weekday'] = taxi.tpep_pickup_datetime.dt.weekdaytaxi['pickup_hour'] = taxi.tpep_pickup_datetime.dt.hourtaxi['pickup_minute'] = taxi.tpep_pickup_datetime.dt.minutetaxi['pickup_week_hour'] = (taxi.pickup_weekday * 24) + taxi.pickup_hourtaxi['store_and_fwd_flag'] = (taxi.store_and_fwd_flag == 'Y').astype(float)taxi = taxi.fillna(-1)X = taxi[features].astype('float32')y = taxi['total_amount']X, y = persist(X, y)_ = wait([X, y])len(X)

训练随机森林

咱们只须要几行代码就能够训练随机森林。

Spark:

from pyspark.ml.regression import RandomForestRegressorrf = RandomForestRegressor(numTrees=100, maxDepth=10, seed=42)fitted = rf.fit(X)

RAPIDS:

from cuml.dask.ensemble import RandomForestRegressorrf = RandomForestRegressor(n_estimators=100, max_depth=10, seed=42)_ = rf.fit(X, y)

后果

咱们对Spark(CPU)和RAPIDS(GPU)集群上的300700143个纽约出租车数据实例训练了一个随机森林模型。两个集群都有20个工作节点,每小时价格大致相同。以下是工作流每个局部的后果:

TaskSparkRAPIDS
Load/rowcount20.6 seconds25.5 seconds
Feature engineering54.3 seconds23.1 seconds
Random forest36.9 minutes1.02 seconds

37分钟的Spark 与1秒的RAPIDS

GPU胜利!想一想,一次拟合你不须要期待37分钟了,这将放慢之后迭代和改良模型的速度。而在CPU上,一旦增加了超参数调优或测试不同的模型,迭代都很容易累积到数小时或数天。

你须要看到能力置信吗?你能够在这里找到Notebook,而后本人运行测试:https://github.com/saturnclou...

你须要更快的随机森林吗

对!你能够在几秒钟内用Saturn Cloud进入Dask/RAPIDS。Saturn解决所有工具基础设施、安全性和部署方面的难题,让你立刻启动并运行RAPIDS。点击这里在你的AWS帐户收费试用Saturn:https://manager.aws.saturnent...

原文链接:https://towardsdatascience.co...

欢送关注磐创AI博客站:
http://panchuang.net/

sklearn机器学习中文官网文档:
http://sklearn123.com/

欢送关注磐创博客资源汇总站:
http://docs.panchuang.net/