关于大数据:图解大数据-Spark机器学习上工作流与特征工程

48次阅读

共计 6862 个字符,预计需要花费 18 分钟才能阅读完成。

作者:韩信子 @ShowMeAI
教程地址:http://www.showmeai.tech/tutorials/84
本文地址:http://www.showmeai.tech/article-detail/180
申明:版权所有,转载请分割平台与作者并注明出处

1.Spark 机器学习工作流

1)Spark mllib 与 ml

Spark 中同样有用于大数据机器学习的板块 MLlib/ML,能够反对对海量数据进行建模与利用。

2)机器学习工作流(Pipeline)

一个典型的机器学习过程,从数据收集开始,要经验多个步骤,能力失去须要的输入。是一个蕴含多个步骤的流水线式工作:

  • 源数据 ETL(抽取、转化、加载)
  • 数据预处理
  • 指标提取
  • 模型训练与穿插验证
  • 新数据预测

MLlib 已足够简略易用,但在一些状况下应用 MLlib 将会让程序结构简单,难以了解和实现。

  • 指标数据集结构复杂须要屡次解决。
  • 对新数据进行预测的时候,须要联合多个曾经训练好的单个模型进行综合预测 Spark 1.2 版本之后引入的 ML Pipeline,能够用于构建简单机器学习工作流利用。

以下是几个重要概念的解释:

(1)DataFrame

应用 Spark SQL 中的 DataFrame 作为数据集,能够包容各种数据类型。较之 RDD,DataFrame 蕴含了 schema 信息,更相似传统数据库中的二维表格。

它被 ML Pipeline 用来存储源数据,例如 DataFrame 中的列能够是存储的文本、特征向量、实在标签和预测的标签等。

(2)Transformer(转换器)

是一种能够将一个 DataFrame 转换为另一个 DataFrame 的算法。比方,一个模型就是一个 Transformer,它能够把一个不蕴含预测标签的测试数据集 DataFrame 打上标签,转化成另一个蕴含预测标签的 DataFrame。

技术上,Transformer 实现了一个办法 transform(),通过附加一个或多个列将一个 DataFrame 转换为另一个 DataFrame。

(3)Estimator(预计器 / 评估器)

是学习算法或在训练数据上的训练方法的概念形象。在 Pipeline 里通常是被用来操作 DataFrame 数据,并生产一个 Transformer。从技术上讲,Estimator 实现了一个办法 fit(),它承受一个 DataFrame 并产生一个 Transformer 转换器。

(4)Parameter

Parameter 被用来设置 Transformer 或者 Estimator 的参数。当初,所有 Transformer(转换器)和 Estimator(预计器)可共享用于指定参数的公共 API。ParamMap 是一组(参数,值)对。

(5)PipeLine(工作流 / 管道)

工作流将多个工作流阶段 (Transformer 转换器和 Estimator 预计器) 连贯在一起,造成机器学习的工作流,并取得后果输入。

3)构建一个 Pipeline 工作流

val pipeline = new Pipeline().setStages(Array(stage1,stage2,stage3,…))

① 首先须要定义 Pipeline 中的各个 PipelineStage(工作流阶段)

  • 包含 Transformer 转换器 和 Estimator 评估器。
  • 比方指标提取 和 转换模型训练。
  • 有了这些解决特定问题的 Transformer 转换器和 Estimator 评估器,就能够依照具体的解决逻辑,有序地组织 PipelineStages,并创立一个 Pipeline。

② 而后,能够把训练数据集作为入参,并调用 Pipelin 实例的 fit 办法,开始以流的形式来解决源训练数据

  • 这个调用会返回一个 PipelineModel 类实例,进而被用来预测测试数据的标签

③ 工作流的各个阶段按程序运行,输出的 DataFrame 在它通过每个阶段时被转换

  • 对于 Transformer 转换器阶段,在 DataFrame 上调用 transform() 办法。
  • 对于 Estimator 预计器阶段,调用 fit()办法来生成一个转换器 (它成为 PipelineModel 的一部分或拟合的 Pipeline),并且在 DataFrame 上调用该转换器的 transform() 办法。

4)构建 Pipeline 示例

  • 获取数据集与代码 → ShowMeAI 的官网 GitHub https://github.com/ShowMeAI-Hub/awesome-AI-cheatsheets
  • 运行代码段与学习 → 在线编程环境 http://blog.showmeai.tech/python3-compiler
from pyspark.ml import Pipeline
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.feature import HashingTF, Tokenizer

# Prepare training documents from a list of (id, text, label) tuples.
training = spark.createDataFrame([(0, "a b c d e spark", 1.0),
    (1, "b d", 0.0),
    (2, "spark f g h", 1.0),
    (3, "hadoop mapreduce", 0.0)
], ["id", "text", "label"])

# Configure an ML pipeline, which consists of three stages: tokenizer, hashingTF, and lr.
tokenizer = Tokenizer(inputCol="text", outputCol="words")
hashingTF = HashingTF(inputCol=tokenizer.getOutputCol(), outputCol="features")
lr = LogisticRegression(maxIter=10, regParam=0.001)
pipeline = Pipeline(stages=[tokenizer, hashingTF, lr])

# Fit the pipeline to training documents.
model = pipeline.fit(training)

# Prepare test documents, which are unlabeled (id, text) tuples.
test = spark.createDataFrame([(4, "spark i j k"),
    (5, "l m n"),
    (6, "spark hadoop spark"),
    (7, "apache hadoop")
], ["id", "text"])

# Make predictions on test documents and print columns of interest.
prediction = model.transform(test)
selected = prediction.select("id", "text", "probability", "prediction")
for row in selected.collect():
    rid, text, prob, prediction = row  # type: ignore
    print("(%d, %s) --> prob=%s, prediction=%f" % (rid, text, str(prob), prediction   # type: ignore
        )
    )

2. 基于 DataFrame 的 Spark ML 特色工程

  • 获取数据集与代码 → ShowMeAI 的官网 GitHub https://github.com/ShowMeAI-Hub/awesome-AI-cheatsheets
  • 运行代码段与学习 → 在线编程环境 http://blog.showmeai.tech/python3-compiler

1)特色工程

2)二值化

continuousDataFrame = spark.createDataFrame([(0, 1.1),(1, 8.5),(2, 5.2)], ["id", "feature"])
binarizer = Binarizer(threshold=5.1, inputCol="feature", outputCol="binarized_feature")
binarizedDataFrame = binarizer.transform(continuousDataFrame)

3)定边界离散化

splits = [-float("inf"), -0.5, 0.0, 0.5, float("inf")] 
data = [(-999.9,),(-0.5,),(-0.3,),(0.0,),(0.2,),(999.9,)] 
dataFrame = spark.createDataFrame(data, ["features"]) 
bucketizer = Bucketizer(splits=splits, inputCol="features", outputCol="bucketedFeatures") 

# 依照给定的边界进行分桶 
bucketedData = bucketizer.transform(dataFrame)

4)依照分位数离散化

data = [(0, 18.0), (1, 19.0), (2, 8.0), (3, 5.0), (4, 2.2), (5, 9.2), (6, 14.4)]
df = spark.createDataFrame(data, ["id", "hour"])
df = df.repartition(1)

# 分成 3 个桶进行离散化
discretizer = QuantileDiscretizer(numBuckets=3, inputCol="hour", outputCol="result")
result = discretizer.fit(df).transform(df)

5)间断值幅度缩放

dataFrame = spark.createDataFrame([(0, Vectors.dense([1.0, 0.1, -8.0]),), 
(1, Vectors.dense([2.0, 1.0, -4.0]),), 
(2, Vectors.dense([4.0, 10.0, 8.0]),) 
], ["id", "features"]) 
scaler = MaxAbsScaler(inputCol="features", outputCol="scaledFeatures") 

# 计算最大绝对值用于缩放 
scalerModel = scaler.fit(dataFrame) 

# 缩放幅度到 [-1, 1] 之间 
scaledData = scalerModel.transform(dataFrame)

6)标准化

dataFrame = spark.read.format("libsvm").load("data/mllib/sample_libsvm_data.txt") 
scaler = StandardScaler(inputCol="features", outputCol="scaledFeatures", withStd=True, withMean=False) 

# 计算均值方差等参数 
scalerModel = scaler.fit(dataFrame) 

# 标准化 
scaledData = scalerModel.transform(dataFrame)

7)增加多项式特色

df = spark.createDataFrame([(Vectors.dense([2.0, 1.0]),), (Vectors.dense([0.0, 0.0]),), (Vectors.dense([3.0, -1.0]),)], ["features"]) 
polyExpansion = PolynomialExpansion(degree=3, inputCol="features", outputCol="polyFeatures") 
polyDF = polyExpansion.transform(df)

8)类别型独热向量编码

df = spark.createDataFrame([(0,"a"), (1,"b"), (2,"c"), (3,"a"), (4,"a"), (5,"c")], ["id","category"]) 
stringIndexer = StringIndexer(inputCol="category", outputCol="categoryIndex") 
model = stringIndexer.fit(df) 
indexed = model.transform(df) 

encoder = OneHotEncoder(inputCol="categoryIndex", outputCol="categoryVec") 
encoded = encoder.transform(indexed)

9)文本型特色抽取

df = spark.createDataFrame([(0, "a b c".split("")), (1,"a b b c a".split(" "))], ["id","words"]) 
cv = CountVectorizer(inputCol="words", outputCol="features", vocabSize=3, minDF=2.0) 
model = cv.fit(df) 
result = model.transform(df)

10)文本型特色抽取

sentenceData = spark.createDataFrame([(0.0, "Hi I heard about Spark"), 
(0.0, "I wish Java could use case classes"), 
(1.0, "Logistic regression models are neat") 
], ["label", "sentence"]) 

tokenizer = Tokenizer(inputCol="sentence", outputCol="words") 
wordsData = tokenizer.transform(sentenceData) 
hashingTF = HashingTF(inputCol="words", outputCol="rawFeatures", numFeatures=20) 
featurizedData = hashingTF.transform(wordsData) 
idf = IDF(inputCol="rawFeatures", outputCol="features") 
idfModel = idf.fit(featurizedData) 
rescaledData = idfModel.transform(featurizedData)

3. 参考资料

  • 数据迷信工具速查 | Spark 使用指南(RDD 版) http://www.showmeai.tech/article-detail/106
  • 数据迷信工具速查 | Spark 使用指南(SQL 版) http://www.showmeai.tech/article-detail/107
  • 黄美灵,Spark MLlib 机器学习:算法、源码及实战详解,电子工业出版社,2016
  • 应用 ML Pipeline 构建机器学习工作流 https://www.ibm.com/developerworks/cn/opensource/os-cn-spark-practice5/index.html
  • Spark 官网文档:机器学习库 (MLlib) 指南,http://spark.apachecn.org/docs/cn/2.2.0/ml-guide.html

ShowMeAI 相干文章举荐

  • 图解大数据 | 导论:大数据生态与利用
  • 图解大数据 | 分布式平台:Hadoop 与 Map-reduce 详解
  • 图解大数据 | 实操案例:Hadoop 零碎搭建与环境配置
  • 图解大数据 | 实操案例:利用 map-reduce 进行大数据统计
  • 图解大数据 | 实操案例:Hive 搭建与利用案例
  • 图解大数据 | 海量数据库与查问:Hive 与 HBase 详解
  • 图解大数据 | 大数据分析开掘框架:Spark 初步
  • 图解大数据 | Spark 操作:基于 RDD 的大数据处理剖析
  • 图解大数据 | Spark 操作:基于 Dataframe 与 SQL 的大数据处理剖析
  • 图解大数据 | 综合案例:应用 spark 剖析美国新冠肺炎疫情数据
  • 图解大数据 | 综合案例:应用 Spark 剖析开掘批发交易数据
  • 图解大数据 | 综合案例:应用 Spark 剖析开掘音乐专辑数据
  • 图解大数据 | 流式数据处理:Spark Streaming
  • 图解大数据 | Spark 机器学习(上)- 工作流与特色工程
  • 图解大数据 | Spark 机器学习(下)- 建模与超参调优
  • 图解大数据 | Spark GraphFrames:基于图的数据分析开掘

ShowMeAI 系列教程举荐

  • 图解 Python 编程:从入门到精通系列教程
  • 图解数据分析:从入门到精通系列教程
  • 图解 AI 数学根底:从入门到精通系列教程
  • 图解大数据技术:从入门到精通系列教程
  • 图解机器学习算法:从入门到精通系列教程

正文完
 0