关于人工智能:利用机器学习模型对PySpark流数据进行预测

42次阅读

共计 6056 个字符,预计需要花费 16 分钟才能阅读完成。

作者 |LAKSHAY ARORA
编译 |VK
起源 |Analytics Vidhya

概述

  • 流数据是机器学习畛域的一个新兴概念
  • 学习如何应用机器学习模型(如 logistic 回归)应用 PySpark 对流数据进行预测
  • 咱们将介绍流数据和 Spark 流的基础知识,而后深刻到实现局部

介绍

设想一下,每秒有超过 8500 条微博被发送,900 多张照片被上传到 Instagram 上,超过 4200 个 Skype 电话被打,超过 78000 个谷歌搜寻产生,超过 200 万封电子邮件被发送(依据互联网实时统计)。

咱们正在以前所未有的速度和规模生成数据。在数据迷信畛域工作真是太好了!然而,随着大量数据的呈现,同样面临着简单的挑战。

次要是,咱们如何收集这种规模的数据?咱们如何确保咱们的机器学习管道在数据生成和收集后持续产生后果?这些都是业界面临的重大挑战,也是为什么流式数据的概念在各组织中越来越受到重视的起因。

减少解决流式数据的能力将大大提高你以后的数据迷信能力。这是业界急需的技能,如果你能把握它,它将帮忙你取得下一个数据迷信的角色。

因而,在本文中,咱们将理解什么是流数据,理解 Spark 流的基本原理,而后钻研一个与行业相干的数据集,以应用 Spark 实现流数据。

目录

  1. 什么是流数据?
  2. Spark 流根底

    1. 离散流
    2. 缓存
    3. 检查点
  3. 流数据中的共享变量

    1. 累加器变量
    2. 播送变量
  4. 利用 PySpark 对流数据进行情感剖析

什么是流数据?

咱们看到了下面的社交媒体数据——咱们正在解决的数据令人难以置信。你能设想存储所有这些数据须要什么吗?这是一个简单的过程!因而,在咱们深刻探讨本文的 Spark 方面之前,让咱们花点工夫理解流式数据到底是什么。

流数据没有离散的开始或完结。这些数据是每秒从数千个数据源生成的,须要尽快进行解决和剖析。相当多的流数据须要实时处理,比方 Google 搜寻后果。

咱们晓得,一些论断在事件产生后更具价值,它们往往会随着工夫而失去价值。举个体育赛事的例子——咱们心愿看到即时剖析、即时统计得出的论断,以便在那一刻真正享受较量,对吧?

Spark 流根底

Spark 流是 Spark API 的扩大,它反对对实时数据流进行可伸缩和容错的流解决。

在跳到实现局部之前,让咱们先理解 Spark 流的不同组件。

离散流

离散流或数据流代表一个间断的数据流。这里,数据流要么间接从任何源接管,要么在咱们对原始数据做了一些解决之后接管。

构建流应用程序的第一步是定义咱们从数据源收集数据的批处理工夫。如果批处理工夫为 2 秒,则数据将每 2 秒收集一次并存储在 RDD 中。而这些 RDD 的间断序列链是一个不可变的离散流,Spark 能够将其作为一个分布式数据集应用。

想想一个典型的数据迷信我的项目。在数据预处理阶段,咱们须要对变量进行转换,包含将分类变量转换为数值变量、删除异常值等。Spark 保护咱们在任何数据上定义的所有转换的历史。因而,无论何时产生任何谬误,它都能够追溯转换的门路并从新生成计算结果。

咱们心愿 Spark 利用程序运行 24 小时 x 7,并且无论何时呈现任何故障,咱们都心愿它尽快恢复。然而,Spark 在解决大规模数据时,呈现任何谬误时须要从新计算所有转换。你能够设想,这十分低廉。

缓存

以下是应答这一挑战的一种办法。咱们能够长期存储计算(缓存)的后果,以保护在数据上定义的转换的后果。这样,当呈现任何谬误时,咱们不用一次又一次地从新计算这些转换。

数据流容许咱们将流数据保留在内存中。当咱们要计算同一数据上的多个操作时,这很有帮忙。

检查点 (Checkpointing)

当咱们正确应用缓存时,它十分有用,但它须要大量内存。并不是每个人都有数百台领有 128GB 内存的机器来缓存所有货色。

这就引入了检查点的概念。

检查点是保留转换数据帧后果的另一种技术。它将运行中的应用程序的状态不断地保留在任何牢靠的存储器(如 HDFS)上。然而,它比缓存速度慢,灵活性低。

当咱们有流数据时,咱们能够应用检查点。转换后果取决于以前的转换后果,须要保留能力应用它。咱们还查看元数据信息,比方用于创立流数据的配置和一组 DStream(离散流) 操作的后果等等。

流数据中的共享变量

有时咱们须要为 Spark 利用程序定义 map、reduce 或 filter 等函数,这些函数必须在多个集群上执行。此函数中应用的变量将复制到每个计算机(集群)。

在这里,每个集群有一个不同的执行器,咱们须要一些货色,能够给咱们这些变量之间的关系。

例如,假如咱们的 Spark 利用程序运行在 100 个不同的集群上,捕捉来自不同国家的人公布的 Instagram 图片。咱们须要一个在他们的帖子中提到的特定标签的计数。

当初,每个集群的执行器将计算该集群上存在的数据的后果。然而咱们须要一些货色来帮忙这些集群进行通信,这样咱们就能够失去聚合的后果。在 Spark 中,咱们有一些共享变量能够帮忙咱们克服这个问题

累加器变量

用例,比方谬误产生的次数、空白日志的次数、咱们从某个特定国家收到申请的次数,所有这些都能够应用累加器来解决。

每个集群上的执行器将数据发送回驱动程序过程,以更新累加器变量的值。累加器仅实用于关联和替换的操作。例如,sum 和 maximum 无效,而 mean 有效。

播送变量

当咱们解决地位数据时,比方城市名称和邮政编码的映射,这些都是固定变量。当初,如果任何集群上的特定转换每次都须要此类数据,咱们不须要向驱动程序发送申请,因为这太低廉了。

相同,咱们能够在每个集群上存储此数据的正本。这些类型的变量称为播送变量。

播送变量容许程序员在每台机器上缓存一个只读变量。通常,Spark 会应用无效的播送算法主动调配播送变量,但如果咱们有多个阶段须要雷同数据的工作,咱们也能够定义它们。

利用 PySpark 对流数据进行情感剖析

是时候启动你最喜爱的 IDE 了!让咱们在本节中进行写代码,并以理论的形式了解流数据。

在本节中,咱们将应用实在的数据集。咱们的指标是在推特上发现怨恨舆论。为了简略起见,如果推特带有种族主义或性别歧视情绪,咱们说它蕴含怨恨舆论。

因而,工作是将种族主义或性别歧视的推文与其余推文进行分类。咱们将应用 Tweets 和 label 的训练样本,其中 label’1’ 示意 Tweet 是种族主义 / 性别歧视,label’0’ 示意其余。

为什么这个我的项目与流解决相干?因为社交媒体平台以评论和状态更新的模式接管海量流媒体数据。这个我的项目将帮忙咱们限度公开公布的内容。

你能够在这里更具体地查看问题陈说 - 练习问题:Twitter 情感剖析 (https://datahack.analyticsvid…。咱们开始吧!

设置我的项目工作流

  1. 模型构建 :咱们将建设一个逻辑回归模型管道来分类 tweet 是否蕴含怨恨舆论。在这里,咱们的重点不是建设一个十分准确的分类模型,而是查看如何应用任何模型并返回流数据的后果
  2. 初始化 Spark 流上下文 :一旦构建了模型,咱们就须要定义从中获取流数据的主机名和端口号
  3. 流数据 :接下来,咱们将从定义的端口增加 netcat 服务器的 tweets,Spark API 将在指定的持续时间后接收数据
  4. 预测并返回后果 :一旦咱们收到 tweet 文本,咱们将数据传递到咱们创立的机器学习管道中,并从模型返回预测的情绪

上面是咱们工作流程的一个简洁阐明:

建设 Logistic 回归模型的数据训练

咱们在映射到标签的 CSV 文件中有对于 Tweets 的数据。咱们将应用 logistic 回归模型来预测 tweet 是否蕴含怨恨舆论。如果是,那么咱们的模型将预测标签为 1(否则为 0)。

你能够在这里下载数据集和代码(https://github.com/lakshay-ar…)。

首先,咱们须要定义 CSV 文件的模式,否则,Spark 将把每列的数据类型视为字符串。咱们读取数据并查看:

# 导入所需库
from pyspark import SparkContext
from pyspark.sql.session import SparkSession
from pyspark.streaming import StreamingContext
import pyspark.sql.types as tp
from pyspark.ml import Pipeline
from pyspark.ml.feature import StringIndexer, OneHotEncoderEstimator, VectorAssembler
from pyspark.ml.feature import StopWordsRemover, Word2Vec, RegexTokenizer
from pyspark.ml.classification import LogisticRegression
from pyspark.sql import Row

# 初始化 spark session
sc = SparkContext(appName="PySparkShell")
spark = SparkSession(sc)
    
# 定义计划
my_schema = tp.StructType([tp.StructField(name= 'id',          dataType= tp.IntegerType(),  nullable= True),
  tp.StructField(name= 'label',       dataType= tp.IntegerType(),  nullable= True),
  tp.StructField(name= 'tweet',       dataType= tp.StringType(),   nullable= True)
])
    
  
# 读取数据集
my_data = spark.read.csv('twitter_sentiments.csv',
                         schema=my_schema,
                         header=True)

# 查看数据
my_data.show(5)

# 输入计划
my_data.printSchema()

定义机器学习管道

当初咱们曾经在 Spark 数据帧中有了数据,咱们须要定义转换数据的不同阶段,而后应用它从咱们的模型中获取预测的标签。

在第一阶段中,咱们将应用 RegexTokenizer 将 Tweet 文本转换为单词列表。而后,咱们将从单词列表中删除停用词并创立单词向量。在最初阶段,咱们将应用这些词向量建设一个逻辑回归模型,并失去预测情绪。

请记住,咱们的重点不是建设一个十分准确的分类模型,而是看看如何在预测模型中取得流数据的后果。

# 定义阶段 1:标记 tweet 文本 
stage_1 = RegexTokenizer(inputCol= 'tweet' , outputCol= 'tokens', pattern= '\\W')
# 定义阶段 2:删除停用字
stage_2 = StopWordsRemover(inputCol= 'tokens', outputCol= 'filtered_words')
# 定义阶段 3:创立大小为 100 的词向量
stage_3 = Word2Vec(inputCol= 'filtered_words', outputCol= 'vector', vectorSize= 100)
# 定义阶段 4:逻辑回归模型
model = LogisticRegression(featuresCol= 'vector', labelCol= 'label')

设置咱们的机器学习管道

让咱们在 Pipeline 对象中增加 stages 变量,而后按程序执行这些转换。将管道与训练数据集匹配,当初,每当咱们有新的 Tweet 时,咱们只须要将其传递到管道对象并转换数据以取得预测:

# 设置管道
pipeline = Pipeline(stages= [stage_1, stage_2, stage_3, model])

#拟合模型
pipelineFit = pipeline.fit(my_data)

流数据和返回的后果

假如咱们每秒收到数百条评论,咱们心愿通过阻止公布蕴含怨恨舆论的评论的用户来放弃平台的洁净。所以,每当咱们收到新的文本,咱们就会把它传递到管道中,失去预测的情绪。

咱们将定义一个函数 get_prediction,它将删除空白语句并创立一个数据框,其中每行蕴含一条推特。

因而,初始化 Spark 流上下文并定义 3 秒的批处理持续时间。这意味着咱们将对每 3 秒收到的数据进行预测:

# 定义一个函数来计算情感
def get_prediction(tweet_text):
    try:
    # 过滤失去长度大于 0 的 tweets
        tweet_text = tweet_text.filter(lambda x: len(x) > 0)
    # 创立一个列名为“tweet”的数据框,每行将蕴含一条 tweet
        rowRdd = tweet_text.map(lambda w: Row(tweet=w))
    # 创立 spark 数据框
        wordsDataFrame = spark.createDataFrame(rowRdd)
    # 利用管道对数据进行转换,失去预测的情绪
        pipelineFit.transform(wordsDataFrame).select('tweet','prediction').show()
    except : 
        print('No data')
    
# 初始化流上下文
ssc = StreamingContext(sc, batchDuration= 3)

# 创立一个将连贯到 hostname:port 的数据流,如 localhost:9991
lines = ssc.socketTextStream(sys.argv[1], int(sys.argv[2]))

# 用一个关键字“tweet_APP”宰割 tweet 文本,这样咱们就能够从一条 tweet 中辨认出一组单词
words = lines.flatMap(lambda line : line.split('TWEET_APP'))

# 获取收到的推文的预期情绪
words.foreachRDD(get_prediction)

#开始计算
ssc.start()             

# 期待完结
ssc.awaitTermination()  

在一个终端上运行程序并应用 Netcat(一个实用工具,可用于将数据发送到定义的主机名和端口号)。能够应用以下命令启动 TCP 连贯:

nc -lk port_number

最初,在第二个终端中键入文本,你将在另一个终端中实时取得预测:

视频演示地址:https://cdn.analyticsvidhya.c…

结尾

流数据在将来几年会减少的越来越多,所以你应该开始相熟这个话题。记住,数据迷信不仅仅是建设模型,还有一个残缺的管道须要解决。

本文介绍了 Spark 流的基本原理以及如何在实在数据集上实现它。我激励你应用另一个数据集或收集实时数据并实现咱们刚刚介绍的内容(你也能够尝试其余模型)。

原文链接:https://www.analyticsvidhya.c…

欢送关注磐创 AI 博客站:
http://panchuang.net/

sklearn 机器学习中文官网文档:
http://sklearn123.com/

欢送关注磐创博客资源汇总站:
http://docs.panchuang.net/

正文完
 0