关于人工智能:GPU上的随机森林比Apache-Spark快2000倍

40次阅读

共计 5621 个字符,预计需要花费 15 分钟才能阅读完成。

作者 |Aaron Richter
编译 |VK
起源 |Towards Data Science

随机森林是一种机器学习算法,以其鲁棒性、准确性和可扩展性而受到许多数据科学家的信赖。

该算法通过 bootstrap 聚合训练出多棵决策树,而后通过集成对输入进行预测。因为其集成特色的特点,随机森林是一种能够在分布式计算环境中实现的算法。树能够在集群中跨过程和机器并行训练,后果比应用单个过程的训练工夫快得多。

在本文中,咱们摸索了应用 Apache Spark 在 CPU 机器集群上实现分布式随机森林训练,并将其与应用 NVIDIA RAPIDS 和 Dask 的 GPU 机器集群上的训练性能进行了比拟。

尽管 GPU 计算传统上是为深度学习利用而保留的,但 RAPIDS 是一个在 GPU 上执行数据处理和非深度学习 ML 工作的库,与在 cpu 上执行相比,它能够大大提高性能。

咱们应用 3 亿个实例训练了一个随机森林模型:Spark 在 20 个节点 CPU 集群上耗时 37 分钟,而 RAPIDS 在 20 个节点 GPU 集群上耗时 1 秒。GPU 的速度进步了 2000 倍以上!

试验概述

咱们应用公共可用的纽约出租车数据集,并训练一个随机森林回归器,该回归器能够应用与乘客接送相干的属性来预测出租车的票价金额。以 2017 年、2018 年和 2019 年的出租车出行量为训练集,共计 300700143 个实例。

数据集链接:https://www1.nyc.gov/site/tlc…

Spark 和 RAPIDS 代码能够在 Jupyter Notebook 中找到。

硬件

Spark 集群应用 Amazon EMR 进行治理,而 Dask/RAPIDS 集群则应用 Saturn Cloud 进行治理。

两个集群都有 20 个工作节点,具备以下 AWS 实例类型:

Spark:r5.2xlarge

  • 8 个 CPU,64 GB RAM
  • 按需价格:0.504 美元 / 小时

RAPIDS:g4dn.xlarge

  • 4 个 CPU,16 GB RAM
  • 1 个 GPU,16 GB GPU RAM(NVIDIA T4)
  • 按需价格:0.526 美元 / 小时

Saturn Cloud 也能够用 NVIDIA 特斯拉 V100 GPU 来启动 Dask 集群,但咱们在这个练习中抉择了 g4dn.xlarge,放弃与 Spark 集群类似的小时老本详情。

Spark

Apache Spark 是一个在 Scala 中构建的开源大数据处理引擎,它有一个 Python 接口,能够调用 Scala/JVM 代码。

它是 Hadoop 解决生态系统中的一个重要组成部分,围绕 MapReduce 范例构建,并且具备用于数据帧和机器学习的接口。

设置 Spark 集群不在本文的探讨范畴之内,然而一旦筹备好集群,就能够在 Jupyter Notebook 中运行以下命令来初始化 Spark:

import findspark
findspark.init()

from pyspark.sql import SparkSession

spark = (SparkSession
        .builder
        .config('spark.executor.memory', '36g')
        .getOrCreate())

findspark 包检测零碎上的 Spark 装置地位;如果能够晓得 Spark 包的装置地位,则可能不须要这样做。

要取得有性能的 Spark 代码,须要设置几个配置设置,这取决于集群设置和工作流。在这种状况下,咱们设置 spark.executor.memory 以确保咱们不会遇到任何内存溢出或 Java 堆谬误。

RAPIDS

NVIDIA RAPIDS 是一个开源的 Python 框架,它在 gpu 而不是 cpu 上执行数据迷信代码。相似于在训练深度学习模型时所看到的,这将为数据迷信工作带来微小的性能晋升。

RAPIDS 有数据帧、ML、图形剖析等接口。RAPIDS 应用 Dask 来解决与具备多个 gpu 的机器的并行化,以及每个具备一个或多个 gpu 的机器集群。

设置 GPU 机器可能有点辣手,然而 Saturn Cloud 曾经为启动 GPU 集群预构建了映像,所以你只需几分钟就能够启动并运行了!要初始化指向群集的 Dask 客户端,能够运行以下命令:

from dask.distributed import Client
from dask_saturn import SaturnCluster

cluster = SaturnCluster()
client = Client(cluster)

要本人设置 Dask 集群,请参阅此 docs 页面:https://docs.dask.org/en/late…

数据加载

数据文件托管在一个公共的 S3 bucket 上,因而咱们能够间接从那里读取 csv。S3 bucket 的所有文件都在同一个目录中,所以咱们应用 s3fs 来抉择咱们想要的文件:

import s3fs
fs = s3fs.S3FileSystem(anon=True)
files = [f"s3://{x}" for x in fs.ls('s3://nyc-tlc/trip data/')
         if 'yellow' in x and ('2019' in x or '2018' in x or '2017' in x)]
         
cols = ['VendorID', 'tpep_pickup_datetime', 'tpep_dropoff_datetime', 'passenger_count', 'trip_distance',
      'RatecodeID', 'store_and_fwd_flag', 'PULocationID', 'DOLocationID', 'payment_type', 'fare_amount',
      'extra', 'mta_tax', 'tip_amount', 'tolls_amount', 'improvement_surcharge', 'total_amount']

应用 Spark,咱们须要独自读取每个 CSV 文件,而后将它们组合在一起:

import functools
from pyspark.sql.types import *
import pyspark.sql.functions as F
from pyspark.sql import DataFrame

# 手动指定模式,因为 read.csv 中的 inferSchema 十分慢
schema = StructType([StructField('VendorID', DoubleType()),
    StructField('tpep_pickup_datetime', TimestampType()),
    ...
    # 参考 notebook 取得残缺对象模式
]) 

def read_csv(path):
    df = spark.read.csv(path,
                        header=True,
                        schema=schema,
                        timestampFormat='yyyy-MM-dd HH:mm:ss',
                       )
    df = df.select(cols)
    return df

dfs = []
for tf in files:
    df = read_csv(tf)
    dfs.append(df)

taxi = functools.reduce(DataFrame.unionAll, dfs)
taxi.count()

应用 Dask+RAPIDS,咱们能够一次性读取所有 CSV 文件:

import dask_cudf

taxi = dask_cudf.read_csv(files, 
                          assume_missing=True,
                          parse_dates=[1,2], 
                          usecols=cols, 
                          storage_options={'anon': True})
len(taxi)

特色工程

咱们将依据工夫生成一些特色,而后保留数据帧。在这两个框架中,这将执行所有 CSV 加载和预处理,并将后果存储在 RAM 中(在 RAPIDS 的状况下是 GPU RAM)。咱们将用于训练的特色包含:

features = ['pickup_weekday', 'pickup_hour', 'pickup_minute',
            'pickup_week_hour', 'passenger_count', 'VendorID', 
            'RatecodeID', 'store_and_fwd_flag', 'PULocationID', 
            'DOLocationID']

对于 Spark,咱们须要将特色收集到向量类中:

from pyspark.ml.feature import VectorAssembler
from pyspark.ml.pipeline import Pipeline

taxi = taxi.withColumn('pickup_weekday', F.dayofweek(taxi.tpep_pickup_datetime).cast(DoubleType()))

taxi = taxi.withColumn('pickup_hour', F.hour(taxi.tpep_pickup_datetime).cast(DoubleType()))

taxi = taxi.withColumn('pickup_minute', F.minute(taxi.tpep_pickup_datetime).cast(DoubleType()))

taxi = taxi.withColumn('pickup_week_hour', ((taxi.pickup_weekday * 24) + taxi.pickup_hour).cast(DoubleType()))

taxi = taxi.withColumn('store_and_fwd_flag', F.when(taxi.store_and_fwd_flag == 'Y', 1).otherwise(0))

taxi = taxi.withColumn('label', taxi.total_amount)  
taxi = taxi.fillna(-1)

assembler = VectorAssembler(
    inputCols=features,
    outputCol='features',
)

pipeline = Pipeline(stages=[assembler])
assembler_fitted = pipeline.fit(taxi)
X = assembler_fitted.transform(taxi)
X.cache()
X.count()

对于 RAPIDS,咱们将所有浮点值转换为 float32,以便进行 GPU 计算:

from dask import persist
from dask.distributed import wait

taxi['pickup_weekday'] = taxi.tpep_pickup_datetime.dt.weekday
taxi['pickup_hour'] = taxi.tpep_pickup_datetime.dt.hour
taxi['pickup_minute'] = taxi.tpep_pickup_datetime.dt.minute
taxi['pickup_week_hour'] = (taxi.pickup_weekday * 24) + taxi.pickup_hour
taxi['store_and_fwd_flag'] = (taxi.store_and_fwd_flag == 'Y').astype(float)
taxi = taxi.fillna(-1)

X = taxi[features].astype('float32')
y = taxi['total_amount']
X, y = persist(X, y)
_ = wait([X, y])
len(X)

训练随机森林

咱们只须要几行代码就能够训练随机森林。

Spark:

from pyspark.ml.regression import RandomForestRegressor
rf = RandomForestRegressor(numTrees=100, maxDepth=10, seed=42)
fitted = rf.fit(X)

RAPIDS:

from cuml.dask.ensemble import RandomForestRegressor
rf = RandomForestRegressor(n_estimators=100, max_depth=10, seed=42)
_ = rf.fit(X, y)

后果

咱们对 Spark(CPU)和 RAPIDS(GPU)集群上的 300700143 个纽约出租车数据实例训练了一个随机森林模型。两个集群都有 20 个工作节点,每小时价格大致相同。以下是工作流每个局部的后果:

Task Spark RAPIDS
Load/rowcount 20.6 seconds 25.5 seconds
Feature engineering 54.3 seconds 23.1 seconds
Random forest 36.9 minutes 1.02 seconds

37 分钟的 Spark 与 1 秒的 RAPIDS

GPU 胜利!想一想,一次拟合你不须要期待 37 分钟了,这将放慢之后迭代和改良模型的速度。而在 CPU 上,一旦增加了超参数调优或测试不同的模型,迭代都很容易累积到数小时或数天。

你须要看到能力置信吗?你能够在这里找到 Notebook,而后本人运行测试:https://github.com/saturnclou…

你须要更快的随机森林吗

对!你能够在几秒钟内用 Saturn Cloud 进入 Dask/RAPIDS。Saturn 解决所有工具基础设施、安全性和部署方面的难题,让你立刻启动并运行 RAPIDS。点击这里在你的 AWS 帐户收费试用 Saturn:https://manager.aws.saturnent…

原文链接:https://towardsdatascience.co…

欢送关注磐创 AI 博客站:
http://panchuang.net/

sklearn 机器学习中文官网文档:
http://sklearn123.com/

欢送关注磐创博客资源汇总站:
http://docs.panchuang.net/

正文完
 0