乐趣区

Spark如何与深度学习框架协作处理非结构化数据

随着大数据和 AI 业务的不断融合,大数据分析和处理过程中,通过深度学习技术对非结构化数据(如图片、音频、文本)进行大数据处理的业务场景越来越多。本文会介绍 Spark 如何与深度学习框架进行协同工作,在大数据的处理过程利用深度学习框架对非结构化数据进行处理。

Spark 介绍

Spark 是大规模数据处理的事实标准,包括机器学习的操作,希望把大数据处理和机器学习管道整合。

Spark 使用函数式编程范式扩展了 MapReduce 模型以支持更多计算类型,可以涵盖广泛的工作流。Spark 使用内存缓存来提升性能,因此进行交互式分析也足够快速(如同使用 Python 解释器,与集群进行交互一样)。 缓存同时提升了迭代算法的性能,这使得 Spark 非常适合机器学习。

由于 Spark 库提供了 Python、Scale、Java 编写的 API,以及内建的机器学习、流数据、图算法、类 SQL 查询等模块;Spark 迅速成为当今最重要的分布式计算框架之一。与 YARN 结合,Spark 提供了增量,而不是替代已存在的 Hadoop 集群。在最近的 Spark 版本中,Spark 加入了对于 K8s 的支持,为 Spark 与 AI 能力的融合提供了更好的支持。

深度学习框架介绍

TensorFlow

TensorFlow 最初是由 Google 机器智能研究部门的 Google Brain 团队开发,基于 Google 2011 年开发的深度学习基础架构 DistBelief 构建起来的。由于 Google 在深度学习领域的巨大影响力和强大的推广能力,TensorFlow 一经推出就获得了极大的关注,并迅速成为如今用户最多的深度学习框架。

TensorFlow 是一个非常基础的系统,因此也可以应用于众多领域。但由于过于复杂的系统设计,对读者来说,学习 TensorFlow 底层运行机制更是一个极其痛苦的过程。TensorFlow 的接口一直处于快速迭代之中,并且没有很好地考虑向后兼容性,这导致现在许多开源代码已经无法在新版的 TensorFlow 上运行,同时也间接导致了许多基于 TensorFlow 的第三方框架出现 BUG。

Keras

Keras 于 2015 年 3 月首次发布,拥有“为人类而不是机器设计的 API”,得到 Google 的支持。它是一个用于快速构建深度学习原型的高层神经网络库,由纯 Python 编写而成,以 TensorFlow、CNTK、Theano 和 MXNet 为底层引擎,提供简单易用的 API 接口,能够极大地减少一般应用下用户的工作量。

严格意义上讲,Keras 并不能称为一个深度学习框架,它更像一个深度学习接口,它构建于第三方框架之上。Keras 的缺点很明显:过度封装导致丧失灵活性。Keras 最初作为 Theano 的高级 API 而诞生,后来增加了 TensorFlow 和 CNTK 作为后端。学习 Keras 十分容易,但是很快就会遇到瓶颈,因为它缺少灵活性。另外,在使用 Keras 的大多数时间里,用户主要是在调用接口,很难真正学习到深度学习的内容。

PyTorch

PyTorch 于 2016 年 10 月发布,是一款专注于直接处理数组表达式的低级 API。前身是 Torch(一个基于 Lua 语言的深度学习库)。Facebook 人工智能研究院对 PyTorch 提供了强力支持。PyTorch 支持动态计算图,为更具数学倾向的用户提供了更低层次的方法和更多的灵活性,目前许多新发表的论文都采用 PyTorch 作为论文实现的工具,成为学术研究的首选解决方案。

Caffe/Caffe2.0

Caffe 的全称是 Convolutional Architecture for Fast Feature Embedding,它是一个清晰、高效的深度学习框架,于 2013 年底由加州大学伯克利分校开发,核心语言是 C ++。它支持命令行、Python 和 MATLAB 接口。Caffe 的一个重要特色是可以在不编写代码的情况下训练和部署模型。如果您是 C ++ 熟练使用者,并对 CUDA 计算游刃有余,你可以考虑选择 Caffe。

在 Spark 大数据处理中使用深度学习框架

在 Spark 程序中使用一个预训练过的模型,将其并行应用于大型数据集的数据处理。比如,给定一个可以识别图片的分类模型,其通过一个标准数据集(如 ImageNet)训练过。可以在一个 Spark 程序中调用一个框架(如 TensorFlow 或 Keras)进行分布式预测。通过在大数据处理过程中调用预训练模型可以直接对非结构化数据进行直接处理。

我们重点介绍在 Spark 程序中使用 Keras+TensorFlow 来进行模型推理。

使用深度学习处理图片的第一步,就是载入图片。Spark 2.3 中新增的 ImageSchema 包含了载入数百万张图像到 Spark DataFrame 的实用函数,并且以分布式方式自动解码,容许可扩展地操作。

使用 Spark’s ImageSchema:

from pyspark.ml.image import ImageSchema
image_df = ImageSchema.readImages("/data/myimages")
image_df.show()

也可以利用 Keras 的图片处理库:

from keras.preprocessing import image
img = image.load_img("/data/myimages/daisy.jpg", target_size=(299, 299))

可以通过图片路径来构造 Spark DataFrame:

def get_image_paths_df(sqlContext, dirpath, colName):
    files = [os.path.abspath(os.path.join(dirpath, f)) for f in os.listdir(dirpath) if f.endswith('.jpg')]
    return sqlContext.createDataFrame(files, StringType()).toDF(colName)

使用 Keras 接口加载预训练模型:

from keras.applications import InceptionV3
model = InceptionV3(weights="imagenet")
model.save('/tmp/model-full.h5')
model = load_model('/tmp/model-full.h5')

定义图片识别推理方法:

        def iv3_predict(fpath):
            model = load_model('/tmp/model-full.h5')
            img = image.load_img(fpath, target_size=(299, 299))
            x = image.img_to_array(img)
            x = np.expand_dims(x, axis=0)
            x = preprocess_input(x)
 
            preds = model.predict(x)
            preds_decode_list = decode_predictions(preds, top=3)
            tmp = preds_decode_list[0]
            res_list = []
            for x in tmp:
                res = [x[0], x[1], float(x[2])]
                res_list.append(res)
            return res_list

定义推理输入结果 Schema:

def get_labels_type():    
    ele_type = StructType()    
    ele_type.add("class", data_type=StringType())    
    ele_type.add("description", data_type=StringType())    
    ele_type.add("probability", data_type=FloatType())    
    return ArrayType(ele_type)

将推理方法定义成 Spark UDF:

spark.udf.register("iv3_predict", iv3_predict, returnType=get_labels_type())

载入图片定义为数据表:

df = get_image_paths_df(self.sql)
df.createOrReplaceTempView("_test_image_paths_df")

使用 SQL 语句对接图片进行处理:

df_images = spark.sql("select fpath, iv3_predict(fpath) as predicted_labels from _test_image_paths_df")
df_images.printSchema()
df_images.show(truncate=False)

结语

在大数据 Spark 引擎中使用深度学习框架加载预处理模型,来进行非结构数据处理有非常多的应用场景。但是由于深度学习框架目前比较多,模型与框架本身是深度耦合,在大数据环境中安装和部署深度学习框架软件及其依赖软件会非常复杂,同时不利于大数据集群的管理和维护,增加人力成本。

华为云 DLI 服务,采用大数据 Serverless 架构,用户不需要感知实际物理集群,同时 DLI 服务已经在大数据集群中内置了 AI 计算框架和底层依赖库(Keras/tensorflow/scikit-learn/pandas/numpy 等)。DLI 最新版本中已经支持 k8s+Docker 生态,并开放用户自定义 Docker 镜像能力,提供给用户来扩展自己的 AI 框架、模型、算法包。在 Serverless 基础上,为用户提供更加开放的自定义扩展能力。

点击关注,第一时间了解华为云新鲜技术~

退出移动版