作者:韩信子@ShowMeAI
教程地址:http://www.showmeai.tech/tutorials/84
本文地址:http://www.showmeai.tech/article-detail/176
申明:版权所有,转载请分割平台与作者并注明出处

引言

2020以来新冠疫情扭转了全世界,影响着大家的生存,本案例联合大数据分析技术,应用pyspark对2020年美国新冠肺炎疫情进行数据分析,并联合可视化办法进行后果出现。

1.试验环境

  • (1)Linux: Ubuntu 16.04
  • (2)Hadoop3.1.3
  • (3)Python: 3.8
  • (4)Spark: 2.4.0
  • (5)Jupyter Notebook

2.数据集

1)数据集下载

本案例应用的数据集来自Kaggle平台的美国新冠肺炎疫情数据集,数据名称us-counties.csv,为csv文件,它蕴含了美国发现首例新冠肺炎确诊病例至2020-05-19的相干数据。

数据集下载(百度网盘)
链接:https://pan.baidu.com/s/1YNY2UREm5lXsNkHM3DZFmA
提取码:show

数据一览如下:

2)格局转换

原始数据为csv格式文件,咱们首先做一点数据格式转换,不便spark读取数据生成RDD或者DataFrame,具体数据转换代码如下:

import pandas as pd#.csv->.txtdata = pd.read_csv('/home/hadoop/us-counties.csv')with open('/home/hadoop/us-counties.txt','a+',encoding='utf-8') as f:    for line in data.values:        f.write((str(line[0])+'\t'+str(line[1])+'\t'                +str(line[2])+'\t'+str(line[3])+'\t'+str(line[4])+'\n'))

3)数据上传至HDFS

而后上传“/home/hadoop/us-counties.txt”至HDFS文件系统中,具体门路为“/user/hadoop/us-counties.txt”。操作命令如下:

./bin/hdfs dfs -put /home/hadoop/us-counties.txt /user/hadoop

3.应用Spark对数据进行剖析

这里采纳Python作为编程语言,联合pyspark进行数据分析。

1)数据读取与DataFrame构建

首先咱们读取数据文件,生成Spark DataFrame。
本案例中应用的数据为结构化数据,因而能够应用spark读取源文件生成DataFrame以不便进行后续剖析实现。

from pyspark import SparkConf,SparkContextfrom pyspark.sql import Rowfrom pyspark.sql.types import *from pyspark.sql import SparkSessionfrom datetime import datetimeimport pyspark.sql.functions as funcdef toDate(inputStr):    newStr = ""    if len(inputStr) == 8:        s1 = inputStr[0:4]        s2 = inputStr[5:6]        s3 = inputStr[7]        newStr = s1+"-"+"0"+s2+"-"+"0"+s3    else:        s1 = inputStr[0:4]        s2 = inputStr[5:6]        s3 = inputStr[7:]        newStr = s1+"-"+"0"+s2+"-"+s3    date = datetime.strptime(newStr, "%Y-%m-%d")    return date#主程序:spark = SparkSession.builder.config(conf = SparkConf()).getOrCreate()fields = [StructField("date", DateType(),False),StructField("county", StringType(),False),StructField("state", StringType(),False),                    StructField("cases", IntegerType(),False),StructField("deaths", IntegerType(),False),]schema = StructType(fields)rdd0 = spark.sparkContext.textFile("/user/hadoop/us-counties.txt")rdd1 = rdd0.map(lambda x:x.split("\t")).map(lambda p: Row(toDate(p[0]),p[1],p[2],int(p[3]),int(p[4])))shemaUsInfo = spark.createDataFrame(rdd1,schema)shemaUsInfo.createOrReplaceTempView("usInfo")

2)数据分析

本案例次要进行了以下统计分析,剖析的指标和办法如下:

  • 获取数据集与代码 → ShowMeAI的官网GitHub https://github.com/ShowMeAI-Hub/awesome-AI-cheatsheets
  • 运行代码段与学习 → 在线编程环境 http://blog.showmeai.tech/python3-compiler

(1)统计美国截止每日的累计确诊人数和累计死亡人数。

  • 以date作为分组字段,对cases和deaths字段进行汇总统计。

(2)统计美国每日的新增确诊人数。

  • 因为「新增数=今日数-昨日数」,这里应用自连贯,连贯条件是t1.date = t2.date + 1,而后应用t1.totalCases – t2.totalCases计算该日新增。

(3)统计美国每日的新增确诊人数新增死亡人数。

  • 因为「新增数=今日数-昨日数」,这里应用自连贯,连贯条件是t1.date = t2.date + 1,而后应用t1.totalCases – t2.totalCases计算该日新增。

(4)统计截止5.19日,美国各州的累计确诊人数和死亡人数。

  • 首先筛选出5.19日的数据,而后以state作为分组字段,对cases和deaths字段进行汇总统计。

(5)统计截止5.19日,美国确诊人数最多的十个州。

  • 对3)的后果DataFrame注册长期表,而后按确诊人数降序排列,并取前10个州。

(6)统计截止5.19日,美国死亡人数最多的十个州。

  • 对3)的后果DataFrame注册长期表,而后按死亡人数降序排列,并取前10个州。

(7)统计截止5.19日,美国确诊人数起码的十个州。

  • 对3)的后果DataFrame注册长期表,而后按确诊人数升序排列,并取前10个州。

(8)统计截止5.19日,美国死亡人数起码的十个州。

  • 对3)的后果DataFrame注册长期表,而后按死亡人数升序排列,并取前10个州。

(9)统计截止5.19日,全美和各州的病死率。

  • 病死率 = 死亡数/确诊数,对3)的后果DataFrame注册长期表,而后按公式计算。

咱们上面基于Spark DataFrame和Spark sql进行统计分析。

# 1.计算每日的累计确诊病例数和死亡数df = shemaUsInfo.groupBy("date").agg(func.sum("cases"),func.sum("deaths")).sort(shemaUsInfo["date"].asc())# 列重命名df1 = df.withColumnRenamed("sum(cases)","cases").withColumnRenamed("sum(deaths)","deaths")df1.repartition(1).write.json("result1.json")                               #写入hdfs# 注册为长期表供下一步应用df1.createOrReplaceTempView("ustotal")# 2.计算每日较昨日的新增确诊病例数和死亡病例数df2 = spark.sql("select t1.date,t1.cases-t2.cases as caseIncrease,t1.deaths-t2.deaths as deathIncrease from ustotal t1,ustotal t2 where t1.date = date_add(t2.date,1)")df2.sort(df2["date"].asc()).repartition(1).write.json("result2.json")           #写入hdfs# 3.统计截止5.19日 美国各州的累计确诊人数和死亡人数df3 = spark.sql("select date,state,sum(cases) as totalCases,sum(deaths) as totalDeaths,round(sum(deaths)/sum(cases),4) as deathRate from usInfo  where date = to_date('2020-05-19','yyyy-MM-dd') group by date,state")df3.sort(df3["totalCases"].desc()).repartition(1).write.json("result3.json") #写入hdfsdf3.createOrReplaceTempView("eachStateInfo")# 4.找出美国确诊最多的10个州df4 = spark.sql("select date,state,totalCases from eachStateInfo  order by totalCases desc limit 10")df4.repartition(1).write.json("result4.json")# 5.找出美国死亡最多的10个州df5 = spark.sql("select date,state,totalDeaths from eachStateInfo  order by totalDeaths desc limit 10")df5.repartition(1).write.json("result5.json")# 6.找出美国确诊起码的10个州df6 = spark.sql("select date,state,totalCases from eachStateInfo  order by totalCases asc limit 10")df6.repartition(1).write.json("result6.json")# 7.找出美国死亡起码的10个州df7 = spark.sql("select date,state,totalDeaths from eachStateInfo  order by totalDeaths asc limit 10")df7.repartition(1).write.json("result7.json")# 8.统计截止5.19全美和各州的病死率df8 = spark.sql("select 1 as sign,date,'USA' as state,round(sum(totalDeaths)/sum(totalCases),4) as deathRate from eachStateInfo group by date union select 2 as sign,date,state,deathRate from eachStateInfo").cache()df8.sort(df8["sign"].asc(),df8["deathRate"].desc()).repartition(1).write.json("result8.json")

3)后果文件

上述Spark计算结果保留.json文件,不便后续可视化解决。因为应用Python读取HDFS文件系统不太不便,故将HDFS上后果文件转储到本地文件系统中,应用以下命:

./bin/hdfs dfs -get /user/hadoop/result1.json/*.json /home/hadoop/result/result1...

对于result2等后果文件,应用雷同命令,只须要改一下门路即可。下载过程如下图所示:

4.数据可视化

1)可视化工具抉择与代码

抉择应用python第三方库pyecharts作为可视化工具。

  • 获取数据集与代码 → ShowMeAI的官网GitHub https://github.com/ShowMeAI-Hub/awesome-AI-cheatsheets
  • 运行代码段与学习 → 在线编程环境 http://blog.showmeai.tech/python3-compiler

在应用前,须要装置pyecharts,装置代码如下:

pip install pyecharts

具体可视化实现代码如下:

from pyecharts import options as optsfrom pyecharts.charts import Barfrom pyecharts.charts import Linefrom pyecharts.components import Tablefrom pyecharts.charts import WordCloudfrom pyecharts.charts import Piefrom pyecharts.charts import Funnelfrom pyecharts.charts import Scatterfrom pyecharts.charts import PictorialBarfrom pyecharts.options import ComponentTitleOptsfrom pyecharts.globals import SymbolTypeimport json

每日的累计确诊病例数和死亡数 → 双柱状图

#1.画出每日的累计确诊病例数和死亡数 → 双柱状图def drawChart_1(index):    root = "/home/hadoop/result/result" + str(index) +"/part-00000.json"    date = []    cases = []    deaths = []    with open(root, 'r') as f:        while True:            line = f.readline()            if not line:                            # 到 EOF,返回空字符串,则终止循环                break            js = json.loads(line)            date.append(str(js['date']))            cases.append(int(js['cases']))            deaths.append(int(js['deaths']))    d = (    Bar()    .add_xaxis(date)    .add_yaxis("累计确诊人数", cases, stack="stack1")    .add_yaxis("累计死亡人数", deaths, stack="stack1")    .set_series_opts(label_opts=opts.LabelOpts(is_show=False))    .set_global_opts(title_opts=opts.TitleOpts(title="美国每日累计确诊和死亡人数"))    .render("/home/hadoop/result/result1/result1.html")    )

每日的新增确诊病例数和死亡数 → 折线图

#2.画出每日的新增确诊病例数和死亡数 → 折线图def drawChart_2(index):    root = "/home/hadoop/result/result" + str(index) +"/part-00000.json"    date = []    cases = []    deaths = []    with open(root, 'r') as f:        while True:            line = f.readline()            if not line:                            # 到 EOF,返回空字符串,则终止循环                break            js = json.loads(line)            date.append(str(js['date']))            cases.append(int(js['caseIncrease']))            deaths.append(int(js['deathIncrease']))    (    Line(init_opts=opts.InitOpts(width="1600px", height="800px"))    .add_xaxis(xaxis_data=date)    .add_yaxis(        series_name="新增确诊",        y_axis=cases,        markpoint_opts=opts.MarkPointOpts(            data=[                opts.MarkPointItem(type_="max", name="最大值")            ]        ),        markline_opts=opts.MarkLineOpts(            data=[opts.MarkLineItem(type_="average", name="平均值")]        ),    )    .set_global_opts(        title_opts=opts.TitleOpts(title="美国每日新增确诊折线图", subtitle=""),        tooltip_opts=opts.TooltipOpts(trigger="axis"),        toolbox_opts=opts.ToolboxOpts(is_show=True),        xaxis_opts=opts.AxisOpts(type_="category", boundary_gap=False),    )    .render("/home/hadoop/result/result2/result1.html")    )    (    Line(init_opts=opts.InitOpts(width="1600px", height="800px"))    .add_xaxis(xaxis_data=date)    .add_yaxis(        series_name="新增死亡",        y_axis=deaths,        markpoint_opts=opts.MarkPointOpts(            data=[opts.MarkPointItem(type_="max", name="最大值")]        ),        markline_opts=opts.MarkLineOpts(            data=[                opts.MarkLineItem(type_="average", name="平均值"),                opts.MarkLineItem(symbol="none", x="90%", y="max"),                opts.MarkLineItem(symbol="circle", type_="max", name="最高点"),            ]        ),    )    .set_global_opts(        title_opts=opts.TitleOpts(title="美国每日新增死亡折线图", subtitle=""),        tooltip_opts=opts.TooltipOpts(trigger="axis"),        toolbox_opts=opts.ToolboxOpts(is_show=True),        xaxis_opts=opts.AxisOpts(type_="category", boundary_gap=False),    )    .render("/home/hadoop/result/result2/result2.html")    )

截止5.19,美国各州累计确诊、死亡人数和病死率—->表格

#3.画出截止5.19,美国各州累计确诊、死亡人数和病死率--->表格def drawChart_3(index):    root = "/home/hadoop/result/result" + str(index) +"/part-00000.json"    allState = []    with open(root, 'r') as f:        while True:            line = f.readline()            if not line:                            # 到 EOF,返回空字符串,则终止循环                break            js = json.loads(line)            row = []            row.append(str(js['state']))            row.append(int(js['totalCases']))            row.append(int(js['totalDeaths']))            row.append(float(js['deathRate']))            allState.append(row)    table = Table()    headers = ["State name", "Total cases", "Total deaths", "Death rate"]    rows = allState    table.add(headers, rows)    table.set_global_opts(        title_opts=ComponentTitleOpts(title="美国各州疫情一览", subtitle="")    )    table.render("/home/hadoop/result/result3/result1.html")

美国确诊最多的10个州 → 词云图

#4.画出美国确诊最多的10个州 → 词云图def drawChart_4(index):    root = "/home/hadoop/result/result" + str(index) +"/part-00000.json"    data = []    with open(root, 'r') as f:        while True:            line = f.readline()            if not line:                            # 到 EOF,返回空字符串,则终止循环                break            js = json.loads(line)            row=(str(js['state']),int(js['totalCases']))            data.append(row)    c = (    WordCloud()    .add("", data, word_size_range=[20, 100], shape=SymbolType.DIAMOND)    .set_global_opts(title_opts=opts.TitleOpts(title="美国各州确诊Top10"))    .render("/home/hadoop/result/result4/result1.html")    )

美国死亡最多的10个州 → 柱状图

#5.画出美国死亡最多的10个州 → 柱状图def drawChart_5(index):    root = "/home/hadoop/result/result" + str(index) +"/part-00000.json"    state = []    totalDeath = []    with open(root, 'r') as f:        while True:            line = f.readline()            if not line:                            # 到 EOF,返回空字符串,则终止循环                break            js = json.loads(line)            state.insert(0,str(js['state']))            totalDeath.insert(0,int(js['totalDeaths']))    c = (    PictorialBar()    .add_xaxis(state)    .add_yaxis(        "",        totalDeath,        label_opts=opts.LabelOpts(is_show=False),        symbol_size=18,        symbol_repeat="fixed",        symbol_offset=[0, 0],        is_symbol_clip=True,        symbol=SymbolType.ROUND_RECT,    )    .reversal_axis()    .set_global_opts(        title_opts=opts.TitleOpts(title="PictorialBar-美国各州死亡人数Top10"),        xaxis_opts=opts.AxisOpts(is_show=False),        yaxis_opts=opts.AxisOpts(            axistick_opts=opts.AxisTickOpts(is_show=False),            axisline_opts=opts.AxisLineOpts(                linestyle_opts=opts.LineStyleOpts(opacity=0)            ),        ),    )    .render("/home/hadoop/result/result5/result1.html")    )

找出美国确诊起码的10个州 → 词云图

#6.找出美国确诊起码的10个州 → 词云图def drawChart_6(index):    root = "/home/hadoop/result/result" + str(index) +"/part-00000.json"    data = []    with open(root, 'r') as f:        while True:            line = f.readline()            if not line:                            # 到 EOF,返回空字符串,则终止循环                break            js = json.loads(line)            row=(str(js['state']),int(js['totalCases']))            data.append(row)    c = (    WordCloud()    .add("", data, word_size_range=[100, 20], shape=SymbolType.DIAMOND)    .set_global_opts(title_opts=opts.TitleOpts(title="美国各州确诊起码的10个州"))    .render("/home/hadoop/result/result6/result1.html")    )

美国死亡起码的10个州 → 漏斗图

#7.找出美国死亡起码的10个州 → 漏斗图def drawChart_7(index):    root = "/home/hadoop/result/result" + str(index) +"/part-00000.json"    data = []    with open(root, 'r') as f:        while True:            line = f.readline()            if not line:                            # 到 EOF,返回空字符串,则终止循环                break            js = json.loads(line)            data.insert(0,[str(js['state']),int(js['totalDeaths'])])    c = (    Funnel()    .add(        "State",        data,        sort_="ascending",        label_opts=opts.LabelOpts(position="inside"),    )    .set_global_opts(title_opts=opts.TitleOpts(title=""))    .render("/home/hadoop/result/result7/result1.html")    )

美国的病死率—->饼状图

#8.美国的病死率--->饼状图def drawChart_8(index):    root = "/home/hadoop/result/result" + str(index) +"/part-00000.json"    values = []    with open(root, 'r') as f:        while True:            line = f.readline()            if not line:                            # 到 EOF,返回空字符串,则终止循环                break            js = json.loads(line)            if str(js['state'])=="USA":                values.append(["Death(%)",round(float(js['deathRate'])*100,2)])                values.append(["No-Death(%)",100-round(float(js['deathRate'])*100,2)])    c = (    Pie()    .add("", values)    .set_colors(["blcak","orange"])    .set_global_opts(title_opts=opts.TitleOpts(title="全美的病死率"))    .set_series_opts(label_opts=opts.LabelOpts(formatter="{b}: {c}"))    .render("/home/hadoop/result/result8/result1.html")    )#可视化index = 1while index<9:    funcStr = "drawChart_" + str(index)    eval(funcStr)(index)    index+=1

2)后果图标展现

可视化后果是.html格局的,reslut1的后果展现图保留门路为“/home/hadoop/result/result1/result1.html”,reslut2的后果展现图保留门路为“/home/hadoop/result/result2/result1.html”,其余相似递推。具体截图如下:

(1)美国每日的累计确诊病例数和死亡数 → 双柱状图

(2)美国每日的新增确诊病例数 → 折线图

(3)美国每日的新增死亡病例数 → 折线图

(4)截止5.19,美国各州累计确诊、死亡人数和病死率 → 表格

(5)截止5.19,美国累计确诊人数前10的州 → 词云图

(6)截止5.19,美国累计死亡人数前10的州 → 柱状图

(7)截止5.19,美国累计确诊人数起码的10个州 → 词云图

(8)截止5.19,美国累计死亡人数起码的10个州 → 漏斗图

(9)截止5.19,美国的病死率 → 饼状图

5.参考资料

  • 数据迷信工具速查 | Spark使用指南(RDD版) http://www.showmeai.tech/article-detail/106
  • 数据迷信工具速查 | Spark使用指南(SQL版) http://www.showmeai.tech/article-detail/107

ShowMeAI相干文章举荐

  • 图解大数据 | 导论:大数据生态与利用
  • 图解大数据 | 分布式平台:Hadoop与Map-reduce详解
  • 图解大数据 | 实操案例:Hadoop零碎搭建与环境配置
  • 图解大数据 | 实操案例:利用map-reduce进行大数据统计
  • 图解大数据 | 实操案例:Hive搭建与利用案例
  • 图解大数据 | 海量数据库与查问:Hive与HBase详解
  • 图解大数据 | 大数据分析开掘框架:Spark初步
  • 图解大数据 | Spark操作:基于RDD的大数据处理剖析
  • 图解大数据 | Spark操作:基于Dataframe与SQL的大数据处理剖析
  • 图解大数据 | 综合案例:应用spark剖析美国新冠肺炎疫情数据
  • 图解大数据 | 综合案例:应用Spark剖析开掘批发交易数据
  • 图解大数据 | 综合案例:应用Spark剖析开掘音乐专辑数据
  • 图解大数据 | 流式数据处理:Spark Streaming
  • 图解大数据 | Spark机器学习(上)-工作流与特色工程
  • 图解大数据 | Spark机器学习(下)-建模与超参调优
  • 图解大数据 | Spark GraphFrames:基于图的数据分析开掘

ShowMeAI系列教程举荐

  • 图解Python编程:从入门到精通系列教程
  • 图解数据分析:从入门到精通系列教程
  • 图解AI数学根底:从入门到精通系列教程
  • 图解大数据技术:从入门到精通系列教程