作者|LAKSHAY ARORA
编译|VK
起源|Analytics Vidhya

概述

  • 流数据是机器学习畛域的一个新兴概念
  • 学习如何应用机器学习模型(如logistic回归)应用PySpark对流数据进行预测
  • 咱们将介绍流数据和Spark流的基础知识,而后深刻到实现局部

介绍

设想一下,每秒有超过8500条微博被发送,900多张照片被上传到Instagram上,超过4200个Skype电话被打,超过78000个谷歌搜寻产生,超过200万封电子邮件被发送(依据互联网实时统计)。

咱们正在以前所未有的速度和规模生成数据。在数据迷信畛域工作真是太好了!然而,随着大量数据的呈现,同样面临着简单的挑战。

次要是,咱们如何收集这种规模的数据?咱们如何确保咱们的机器学习管道在数据生成和收集后持续产生后果?这些都是业界面临的重大挑战,也是为什么流式数据的概念在各组织中越来越受到重视的起因。

减少解决流式数据的能力将大大提高你以后的数据迷信能力。这是业界急需的技能,如果你能把握它,它将帮忙你取得下一个数据迷信的角色。

因而,在本文中,咱们将理解什么是流数据,理解Spark流的基本原理,而后钻研一个与行业相干的数据集,以应用Spark实现流数据。

目录

  1. 什么是流数据?
  2. Spark流根底

    1. 离散流
    2. 缓存
    3. 检查点
  3. 流数据中的共享变量

    1. 累加器变量
    2. 播送变量
  4. 利用PySpark对流数据进行情感剖析

什么是流数据?

咱们看到了下面的社交媒体数据——咱们正在解决的数据令人难以置信。你能设想存储所有这些数据须要什么吗?这是一个简单的过程!因而,在咱们深刻探讨本文的Spark方面之前,让咱们花点工夫理解流式数据到底是什么。

流数据没有离散的开始或完结。这些数据是每秒从数千个数据源生成的,须要尽快进行解决和剖析。相当多的流数据须要实时处理,比方Google搜寻后果。

咱们晓得,一些论断在事件产生后更具价值,它们往往会随着工夫而失去价值。举个体育赛事的例子——咱们心愿看到即时剖析、即时统计得出的论断,以便在那一刻真正享受较量,对吧?

Spark流根底

Spark流是Spark API的扩大,它反对对实时数据流进行可伸缩和容错的流解决。

在跳到实现局部之前,让咱们先理解Spark流的不同组件。

离散流

离散流或数据流代表一个间断的数据流。这里,数据流要么间接从任何源接管,要么在咱们对原始数据做了一些解决之后接管。

构建流应用程序的第一步是定义咱们从数据源收集数据的批处理工夫。如果批处理工夫为2秒,则数据将每2秒收集一次并存储在RDD中。而这些RDD的间断序列链是一个不可变的离散流,Spark能够将其作为一个分布式数据集应用。

想想一个典型的数据迷信我的项目。在数据预处理阶段,咱们须要对变量进行转换,包含将分类变量转换为数值变量、删除异常值等。Spark保护咱们在任何数据上定义的所有转换的历史。因而,无论何时产生任何谬误,它都能够追溯转换的门路并从新生成计算结果。

咱们心愿Spark利用程序运行24小时 x 7,并且无论何时呈现任何故障,咱们都心愿它尽快恢复。然而,Spark在解决大规模数据时,呈现任何谬误时须要从新计算所有转换。你能够设想,这十分低廉。

缓存

以下是应答这一挑战的一种办法。咱们能够长期存储计算(缓存)的后果,以保护在数据上定义的转换的后果。这样,当呈现任何谬误时,咱们不用一次又一次地从新计算这些转换。

数据流容许咱们将流数据保留在内存中。当咱们要计算同一数据上的多个操作时,这很有帮忙。

检查点(Checkpointing)

当咱们正确应用缓存时,它十分有用,但它须要大量内存。并不是每个人都有数百台领有128GB内存的机器来缓存所有货色。

这就引入了检查点的概念。

检查点是保留转换数据帧后果的另一种技术。它将运行中的应用程序的状态不断地保留在任何牢靠的存储器(如HDFS)上。然而,它比缓存速度慢,灵活性低。

当咱们有流数据时,咱们能够应用检查点。转换后果取决于以前的转换后果,须要保留能力应用它。咱们还查看元数据信息,比方用于创立流数据的配置和一组DStream(离散流)操作的后果等等。

流数据中的共享变量

有时咱们须要为Spark利用程序定义map、reduce或filter等函数,这些函数必须在多个集群上执行。此函数中应用的变量将复制到每个计算机(集群)。

在这里,每个集群有一个不同的执行器,咱们须要一些货色,能够给咱们这些变量之间的关系。

例如,假如咱们的Spark利用程序运行在100个不同的集群上,捕捉来自不同国家的人公布的Instagram图片。咱们须要一个在他们的帖子中提到的特定标签的计数。

当初,每个集群的执行器将计算该集群上存在的数据的后果。然而咱们须要一些货色来帮忙这些集群进行通信,这样咱们就能够失去聚合的后果。在Spark中,咱们有一些共享变量能够帮忙咱们克服这个问题

累加器变量

用例,比方谬误产生的次数、空白日志的次数、咱们从某个特定国家收到申请的次数,所有这些都能够应用累加器来解决。

每个集群上的执行器将数据发送回驱动程序过程,以更新累加器变量的值。累加器仅实用于关联和替换的操作。例如,sum和maximum无效,而mean有效。

播送变量

当咱们解决地位数据时,比方城市名称和邮政编码的映射,这些都是固定变量。当初,如果任何集群上的特定转换每次都须要此类数据,咱们不须要向驱动程序发送申请,因为这太低廉了。

相同,咱们能够在每个集群上存储此数据的正本。这些类型的变量称为播送变量。

播送变量容许程序员在每台机器上缓存一个只读变量。通常,Spark会应用无效的播送算法主动调配播送变量,但如果咱们有多个阶段须要雷同数据的工作,咱们也能够定义它们。

利用PySpark对流数据进行情感剖析

是时候启动你最喜爱的IDE了!让咱们在本节中进行写代码,并以理论的形式了解流数据。

在本节中,咱们将应用实在的数据集。咱们的指标是在推特上发现怨恨舆论。为了简略起见,如果推特带有种族主义或性别歧视情绪,咱们说它蕴含怨恨舆论。

因而,工作是将种族主义或性别歧视的推文与其余推文进行分类。咱们将应用Tweets和label的训练样本,其中label'1'示意Tweet是种族主义/性别歧视,label'0'示意其余。

为什么这个我的项目与流解决相干?因为社交媒体平台以评论和状态更新的模式接管海量流媒体数据。这个我的项目将帮忙咱们限度公开公布的内容。

你能够在这里更具体地查看问题陈说-练习问题:Twitter情感剖析(https://datahack.analyticsvid...。咱们开始吧!

设置我的项目工作流

  1. 模型构建:咱们将建设一个逻辑回归模型管道来分类tweet是否蕴含怨恨舆论。在这里,咱们的重点不是建设一个十分准确的分类模型,而是查看如何应用任何模型并返回流数据的后果
  2. 初始化Spark流上下文:一旦构建了模型,咱们就须要定义从中获取流数据的主机名和端口号
  3. 流数据:接下来,咱们将从定义的端口增加netcat服务器的tweets,Spark API将在指定的持续时间后接收数据
  4. 预测并返回后果:一旦咱们收到tweet文本,咱们将数据传递到咱们创立的机器学习管道中,并从模型返回预测的情绪

上面是咱们工作流程的一个简洁阐明:

建设Logistic回归模型的数据训练

咱们在映射到标签的CSV文件中有对于Tweets的数据。咱们将应用logistic回归模型来预测tweet是否蕴含怨恨舆论。如果是,那么咱们的模型将预测标签为1(否则为0)。

你能够在这里下载数据集和代码(https://github.com/lakshay-ar...)。

首先,咱们须要定义CSV文件的模式,否则,Spark将把每列的数据类型视为字符串。咱们读取数据并查看:

# 导入所需库from pyspark import SparkContextfrom pyspark.sql.session import SparkSessionfrom pyspark.streaming import StreamingContextimport pyspark.sql.types as tpfrom pyspark.ml import Pipelinefrom pyspark.ml.feature import StringIndexer, OneHotEncoderEstimator, VectorAssemblerfrom pyspark.ml.feature import StopWordsRemover, Word2Vec, RegexTokenizerfrom pyspark.ml.classification import LogisticRegressionfrom pyspark.sql import Row# 初始化spark sessionsc = SparkContext(appName="PySparkShell")spark = SparkSession(sc)    # 定义计划my_schema = tp.StructType([  tp.StructField(name= 'id',          dataType= tp.IntegerType(),  nullable= True),  tp.StructField(name= 'label',       dataType= tp.IntegerType(),  nullable= True),  tp.StructField(name= 'tweet',       dataType= tp.StringType(),   nullable= True)])      # 读取数据集my_data = spark.read.csv('twitter_sentiments.csv',                         schema=my_schema,                         header=True)# 查看数据my_data.show(5)# 输入计划my_data.printSchema()

定义机器学习管道

当初咱们曾经在Spark数据帧中有了数据,咱们须要定义转换数据的不同阶段,而后应用它从咱们的模型中获取预测的标签。

在第一阶段中,咱们将应用RegexTokenizer 将Tweet文本转换为单词列表。而后,咱们将从单词列表中删除停用词并创立单词向量。在最初阶段,咱们将应用这些词向量建设一个逻辑回归模型,并失去预测情绪。

请记住,咱们的重点不是建设一个十分准确的分类模型,而是看看如何在预测模型中取得流数据的后果。

# 定义阶段1:标记tweet文本 stage_1 = RegexTokenizer(inputCol= 'tweet' , outputCol= 'tokens', pattern= '\\W')# 定义阶段2:删除停用字stage_2 = StopWordsRemover(inputCol= 'tokens', outputCol= 'filtered_words')# 定义阶段3:创立大小为100的词向量stage_3 = Word2Vec(inputCol= 'filtered_words', outputCol= 'vector', vectorSize= 100)# 定义阶段4:逻辑回归模型model = LogisticRegression(featuresCol= 'vector', labelCol= 'label')

设置咱们的机器学习管道

让咱们在Pipeline对象中增加stages变量,而后按程序执行这些转换。将管道与训练数据集匹配,当初,每当咱们有新的Tweet时,咱们只须要将其传递到管道对象并转换数据以取得预测:

# 设置管道pipeline = Pipeline(stages= [stage_1, stage_2, stage_3, model])#拟合模型pipelineFit = pipeline.fit(my_data)

流数据和返回的后果

假如咱们每秒收到数百条评论,咱们心愿通过阻止公布蕴含怨恨舆论的评论的用户来放弃平台的洁净。所以,每当咱们收到新的文本,咱们就会把它传递到管道中,失去预测的情绪。

咱们将定义一个函数 get_prediction,它将删除空白语句并创立一个数据框,其中每行蕴含一条推特。

因而,初始化Spark流上下文并定义3秒的批处理持续时间。这意味着咱们将对每3秒收到的数据进行预测:

#定义一个函数来计算情感def get_prediction(tweet_text):    try:    # 过滤失去长度大于0的tweets        tweet_text = tweet_text.filter(lambda x: len(x) > 0)    # 创立一个列名为“tweet”的数据框,每行将蕴含一条tweet        rowRdd = tweet_text.map(lambda w: Row(tweet=w))    # 创立spark数据框        wordsDataFrame = spark.createDataFrame(rowRdd)    # 利用管道对数据进行转换,失去预测的情绪        pipelineFit.transform(wordsDataFrame).select('tweet','prediction').show()    except :         print('No data')    # 初始化流上下文ssc = StreamingContext(sc, batchDuration= 3)# 创立一个将连贯到hostname:port的数据流,如localhost:9991lines = ssc.socketTextStream(sys.argv[1], int(sys.argv[2]))# 用一个关键字“tweet_APP”宰割tweet文本,这样咱们就能够从一条tweet中辨认出一组单词words = lines.flatMap(lambda line : line.split('TWEET_APP'))# 获取收到的推文的预期情绪words.foreachRDD(get_prediction)#开始计算ssc.start()             # 期待完结ssc.awaitTermination()  

在一个终端上运行程序并应用Netcat(一个实用工具,可用于将数据发送到定义的主机名和端口号)。能够应用以下命令启动TCP连贯:

nc -lk port_number

最初,在第二个终端中键入文本,你将在另一个终端中实时取得预测:

视频演示地址:https://cdn.analyticsvidhya.c...

结尾

流数据在将来几年会减少的越来越多,所以你应该开始相熟这个话题。记住,数据迷信不仅仅是建设模型,还有一个残缺的管道须要解决。

本文介绍了Spark流的基本原理以及如何在实在数据集上实现它。我激励你应用另一个数据集或收集实时数据并实现咱们刚刚介绍的内容(你也能够尝试其余模型)。

原文链接:https://www.analyticsvidhya.c...

欢送关注磐创AI博客站:
http://panchuang.net/

sklearn机器学习中文官网文档:
http://sklearn123.com/

欢送关注磐创博客资源汇总站:
http://docs.panchuang.net/