作者|GUEST
编译|VK
起源|Analytics Vidhya

概述

  • 在AWS电子病历上建设John Snow实验室的Spark NLP,并应用该库对BBC文章进行简略的文本分类。

介绍

自然语言解决是寰球数据迷信团队的重要过程之一。随着数据的一直增长,大多数组织曾经转移到大数据平台,如apachehadoop和AWS、Azure和GCP等云产品。

这些平台不仅可能解决大数据,使组织可能对非结构化数据(如文本分类)进行大规模剖析。但在机器学习方面,大数据系统和机器学习工具之间依然存在差距。

风行的机器学习python库,如scikit-learn和Gensim,通过高度优化,能够在单节点计算机上执行,而不是为分布式环境设计的。

Apache Spark MLlib是许多帮忙弥合这一差距的工具之一,它提供了大多数机器学习模型,如线性回归、Logistic回归、反对向量机、随机森林、K-means、LDA等,以执行最常见的机器学习工作。

除了机器学习算法,Spark MLlib还提供了大量的特色变换器,如Tokenizer、StopWordRemover、n-grams和countvector、TF-IDF和Word2Vec等。

尽管这些转换器和提取器足以构建根本的NLP管道,然而要构建一个更全面和生产级的管道,咱们须要更先进的技术,如词干剖析、词法化、词性标记和命名实体辨认。

Spark NLP提供了各种正文器来执行高级NLP工作。无关更多信息,请在网站上查看正文器列表及其用法

https://nlp.johnsnowlabs.com/...。

设置环境

让咱们持续看看如何在AWS EMR上设置Spark NLP。

1.在启动EMR集群之前,咱们须要创立一个疏导操作。疏导操作用于设置其他软件或自定义群集节点的配置。以下是可用于在EMR集群上设置Spark NLP的疏导操作,

#!/bin/bashsudo yum install -y python36-devel python36-pip python36-setuptools python36-virtualenvsudo python36 -m pip install --upgrade pip#sudo python36 -m pip install pandas#sudo python36 -m pip install boto3#sudo python36 -m pip install re#sudo python36 -m pip install spark-nlp==2.4.5

创立shell脚本之后,将该脚本复制到AWS S3中的一个地位。你还能够依据须要装置其余python包。

2.咱们能够应用AWS控制台、API或python中的boto3库来启动EMR集群。应用Python的益处是,无论何时须要实例化集群或将其增加到工作流中,都能够重用代码。

上面是实例化EMR集群的python代码。

import boto3region_name='region_name'def get_security_group_id(group_name, region_name):    ec2 = boto3.client('ec2', region_name=region_name)    response = ec2.describe_security_groups(GroupNames=[group_name])    return response['SecurityGroups'][0]['GroupId']emr = boto3.client('emr', region_name=region_name)cluster_response = emr.run_job_flow(        Name='cluster_name', # 更新值        ReleaseLabel='emr-5.27.0',        LogUri='s3_path_for_logs', # 更新值        Instances={            'InstanceGroups': [                {                    'Name': "Master nodes",                    'Market': 'ON_DEMAND',                    'InstanceRole': 'MASTER',                    'InstanceType': 'm5.2xlarge', # 依据要求进行变更                    'InstanceCount': 1 #对于主节点高可用性,设置计数大于1                },                {                    'Name': "Slave nodes",                    'Market': 'ON_DEMAND',                    'InstanceRole': 'CORE',                    'InstanceType': 'm5.2xlarge', # 依据要求进行变更                    'InstanceCount': 2                }            ],            'KeepJobFlowAliveWhenNoSteps': True,            'Ec2KeyName' : 'key_pair_name', # 更新值            'EmrManagedMasterSecurityGroup': get_security_group_id('ElasticMapReduce-master', region_name=region_name)            'EmrManagedSlaveSecurityGroup': get_security_group_id('ElasticMapReduce-master', region_name=region_name)        },        BootstrapActions=[    {                    'Name':'install_dependencies',                    'ScriptBootstrapAction':{                            'Args':[],                            'Path':'path_to_bootstrapaction_on_s3' # 更新值                            }                }],        Steps = [],        VisibleToAllUsers=True,        JobFlowRole='EMR_EC2_DefaultRole',        ServiceRole='EMR_DefaultRole',        Applications=[            { 'Name': 'hadoop' },            { 'Name': 'spark' },            { 'Name': 'hive' },            { 'Name': 'zeppelin' },            { 'Name': 'presto' }        ],        Configurations=[            # YARN            {                "Classification": "yarn-site",                 "Properties": {"yarn.nodemanager.vmem-pmem-ratio": "4",                               "yarn.nodemanager.pmem-check-enabled": "false",                               "yarn.nodemanager.vmem-check-enabled": "false"}            },                        # HADOOP            {                "Classification": "hadoop-env",                 "Configurations": [                        {                            "Classification": "export",                             "Configurations": [],                             "Properties": {"JAVA_HOME": "/usr/lib/jvm/java-1.8.0"}                        }                    ],                 "Properties": {}            },                        # SPARK            {                "Classification": "spark-env",                 "Configurations": [                        {                            "Classification": "export",                             "Configurations": [],                             "Properties": {"PYSPARK_PYTHON":"/usr/bin/python3",                                           "JAVA_HOME": "/usr/lib/jvm/java-1.8.0"}                        }                    ],                 "Properties": {}            },            {                "Classification": "spark",                "Properties": {"maximizeResourceAllocation": "true"},                "Configurations": []             },            {                "Classification": "spark-defaults",                "Properties": {                    "spark.dynamicAllocation.enabled": "true" #default is also true                }            }        ]    )

留神:请确保你对用于日志记录和存储疏导操作脚本的S3 bucket具备正确的拜访权限。

基于Spark-NLP的BBC文章文本分类

当初咱们曾经筹备好集群了,让咱们应用Spark NLP和Spark MLlib在BBC数据上构建一个简略的文本分类示例。

1.初始化Spark

咱们将导入所需的库并应用不同的配置参数初始化spark会话。配置值取决于我的本地环境。相应地调整参数。

# 导入Spark NLPfrom sparknlp.base import *from sparknlp.annotator import *from sparknlp.pretrained import PretrainedPipelineimport sparknlpfrom pyspark.sql import SparkSessionfrom pyspark.ml import Pipeline# 应用Spark NLP启动Spark会话#spark = sparknlp.start()spark = SparkSession.builder \    .appName("BBC Text Categorization")\    .config("spark.driver.memory","8G")\ change accordingly    .config("spark.memory.offHeap.enabled",True)\    .config("spark.memory.offHeap.size","8G") \    .config("spark.driver.maxResultSize", "2G") \    .config("spark.jars.packages", "com.johnsnowlabs.nlp:spark-nlp_2.11:2.4.5")\    .config("spark.kryoserializer.buffer.max", "1000M")\    .config("spark.network.timeout","3600s")\    .getOrCreate()

2.加载文本数据

咱们将应用BBC的数据。你能够从这个链接下载数据。下载以下数据后,应用spark代码加载;

https://www.kaggle.com/yufeng...

# 文件地位和类型file_location = r'path\to\bbc-text.csv'file_type = "csv"# CSVinfer_schema = "true"first_row_is_header = "true"delimiter = ","df = spark.read.format(file_type) \  .option("inferSchema", infer_schema) \  .option("header", first_row_is_header) \  .option("sep", delimiter) \  .load(file_location)df.count()

3.将数据集拆分为训练集和测试集

与python应用scikit learn宰割数据不同,Spark Dataframe有一个内置函数randomSplit()来执行雷同的操作。

(trainingData, testData) = df.randomSplit([0.7, 0.3], seed = 100)

randomSplit()函数须要两个参数viz。权重数组和seed。在咱们的例子中,咱们将应用70/30宰割,其中70%是训练数据,30%是测试数据。

4.应用Spark NLP的NLP管道

让咱们持续应用Spark NLP构建NLP管道。Spark NLP最大的长处之一是它与Spark MLLib模块本机集成,有助于构建由transformers和estimators组成的综合ML管道。

这个管道能够包含诸如CountVectorizer或HashingTF和IDF之类的特征提取模块。咱们还能够在这个管道中蕴含一个机器学习模型。

上面是由具备特征提取和机器学习模型的NLP管道组成的示例;

from pyspark.ml.feature import HashingTF, IDF, StringIndexer, SQLTransformer,IndexToStringfrom pyspark.ml.classification import LogisticRegressionfrom pyspark.ml.evaluation import MulticlassClassificationEvaluator# 转换text列为nlp文件document_assembler = DocumentAssembler() \    .setInputCol("text") \    .setOutputCol("document")#将文档转换为标识数组tokenizer = Tokenizer() \  .setInputCols(["document"]) \  .setOutputCol("token") # 清理标识normalizer = Normalizer() \    .setInputCols(["token"]) \    .setOutputCol("normalized")# 删除停用词stopwords_cleaner = StopWordsCleaner()\      .setInputCols("normalized")\      .setOutputCol("cleanTokens")\      .setCaseSensitive(False)stemmer = Stemmer() \    .setInputCols(["cleanTokens"]) \    .setOutputCol("stem")# 将自定义文档构造转换为标识数组。finisher = Finisher() \    .setInputCols(["stem"]) \    .setOutputCols(["token_features"]) \    .setOutputAsArray(True) \    .setCleanAnnotations(False)# 生成频率hashingTF = HashingTF(inputCol="token_features", outputCol="rawFeatures", numFeatures=1000)# 生成逆文档频率idf = IDF(inputCol="rawFeatures", outputCol="features", minDocFreq=5)# 将标签(字符串)转换为整数。label_stringIdx = StringIndexer(inputCol = "category", outputCol = "label")# 定义一个简略的多项式逻辑回归模型。尝试不同的超参数组合,看看哪个更适宜你的数据。你也能够尝试不同的算法来比拟分数。lr = LogisticRegression(maxIter=10, regParam=0.3, elasticNetParam=0.0)# 将索引(整数)转换为相应的类标签label_to_stringIdx = IndexToString(inputCol="label", outputCol="article_class")# 定义nlp管道nlp_pipeline = Pipeline(    stages=[document_assembler,             tokenizer,            normalizer,            stopwords_cleaner,             stemmer,             finisher,            hashingTF,            idf,            label_stringIdx,            lr,            label_to_stringIdx])

5.训练模型

当初咱们的NLP管道曾经筹备好了,让咱们依据训练数据训练咱们的模型。

# 在训练数据上拟合管道pipeline_model = nlp_pipeline.fit(trainingData)

6.执行预测

一旦训练实现,咱们就能够预测测试数据上的类标签。

# 对测试数据进行预测predictions =  pipeline_model.transform(testData)

7. 评估模型

对训练后的模型进行评估对于了解模型如何在看不见的数据上运行是十分重要的。咱们将看到3个风行的评估指标,准确度、精确度和召回率。

  1. 准确度
# 导入evaluatorfrom pyspark.ml.evaluation import MulticlassClassificationEvaluatorevaluator = MulticlassClassificationEvaluator(    labelCol="label", predictionCol="prediction", metricName="accuracy")accuracy = evaluator.evaluate(predictions)print("Accuracy = %g" % (accuracy))print("Test Error = %g " % (1.0 - accuracy))

  1. 精确度
evaluator = MulticlassClassificationEvaluator(    labelCol="label", predictionCol="prediction", metricName="weightedPrecision")accuracy = evaluator.evaluate(predictions)print("Accuracy = %g" % (accuracy))print("Test Error = %g " % (1.0 - accuracy))

  1. 召回率
evaluator = MulticlassClassificationEvaluator(    labelCol="label", predictionCol="prediction", metricName="weightedRecall")accuracy = evaluator.evaluate(predictions)print("Accuracy = %g" % (accuracy))print("Test Error = %g " % (1.0 - accuracy))

依据业务用例,你能够决定应用哪个度量来评估模型。

例如.如果一个机器学习模型被设计用来依据某些参数来检测癌症,那么最好应用召回率,因为公司无奈接受假负例(一个患有癌症但模型没有检测到癌症的人),而如果机器学习模型旨在生成用户举荐,公司能够负担得起误报(10条倡议中有8条合乎用户配置文件),因而能够应用精确度作为评估指标。

8. 保留管道模型

在胜利地训练、测试和评估模型之后,你能够将模型保留到磁盘,并在不同的Spark应用程序中应用它。要将模型保留到光盘,请应用以下代码;

pipeline_model.save('/path/to/storage_location')

论断

Spark NLP提供了大量的正文器和转换器来构建数据预处理管道。Sparl NLP与Spark MLLib无缝集成,使咱们可能在分布式环境中构建端到端的自然语言解决我的项目。

在本文中,咱们钻研了如何在AWS EMR上装置Spark NLP并实现了BBC数据的文本分类。咱们还钻研了Spark MLlib中的不同评估指标,并理解了如何存储模型以供进一步应用。

心愿你喜爱这篇文章。

原文链接:https://www.analyticsvidhya.c...

欢送关注磐创AI博客站:
http://panchuang.net/

sklearn机器学习中文官网文档:
http://sklearn123.com/

欢送关注磐创博客资源汇总站:
http://docs.panchuang.net/