关于大数据:图解大数据-综合案例使用spark分析新冠肺炎疫情数据

6次阅读

共计 12666 个字符,预计需要花费 32 分钟才能阅读完成。

作者:韩信子 @ShowMeAI
教程地址:http://www.showmeai.tech/tutorials/84
本文地址:http://www.showmeai.tech/article-detail/176
申明:版权所有,转载请分割平台与作者并注明出处

引言

2020 以来新冠疫情扭转了全世界,影响着大家的生存,本案例联合大数据分析技术,应用 pyspark 对 2020 年美国新冠肺炎疫情进行数据分析,并联合可视化办法进行后果出现。

1. 试验环境

  • (1)Linux:Ubuntu 16.04
  • (2)Hadoop3.1.3
  • (3)Python: 3.8
  • (4)Spark: 2.4.0
  • (5)Jupyter Notebook

2. 数据集

1)数据集下载

本案例应用的数据集来自 Kaggle 平台的美国新冠肺炎疫情数据集,数据名称 us-counties.csv,为 csv 文件,它蕴含了美国发现首例新冠肺炎确诊病例至 2020-05-19 的相干数据。

数据集下载(百度网盘)
链接:https://pan.baidu.com/s/1YNY2UREm5lXsNkHM3DZFmA
提取码:show

数据一览如下:

2)格局转换

原始数据为 csv 格式文件,咱们首先做一点数据格式转换,不便 spark 读取数据生成 RDD 或者 DataFrame,具体数据转换代码如下:

import pandas as pd
#.csv->.txt
data = pd.read_csv('/home/hadoop/us-counties.csv')
with open('/home/hadoop/us-counties.txt','a+',encoding='utf-8') as f:
    for line in data.values:
        f.write((str(line[0])+'\t'+str(line[1])+'\t'
                +str(line[2])+'\t'+str(line[3])+'\t'+str(line[4])+'\n'))

3)数据上传至 HDFS

而后上传“/home/hadoop/us-counties.txt”至 HDFS 文件系统中,具体门路为“/user/hadoop/us-counties.txt”。操作命令如下:

./bin/hdfs dfs -put /home/hadoop/us-counties.txt /user/hadoop

3. 应用 Spark 对数据进行剖析

这里采纳 Python 作为编程语言,联合 pyspark 进行数据分析。

1)数据读取与 DataFrame 构建

首先咱们读取数据文件,生成 Spark DataFrame。
本案例中应用的数据为结构化数据,因而能够应用 spark 读取源文件生成 DataFrame 以不便进行后续剖析实现。

from pyspark import SparkConf,SparkContext
from pyspark.sql import Row
from pyspark.sql.types import *
from pyspark.sql import SparkSession
from datetime import datetime
import pyspark.sql.functions as func

def toDate(inputStr):
    newStr = ""
    if len(inputStr) == 8:
        s1 = inputStr[0:4]
        s2 = inputStr[5:6]
        s3 = inputStr[7]
        newStr = s1+"-"+"0"+s2+"-"+"0"+s3
    else:
        s1 = inputStr[0:4]
        s2 = inputStr[5:6]
        s3 = inputStr[7:]
        newStr = s1+"-"+"0"+s2+"-"+s3
    date = datetime.strptime(newStr, "%Y-%m-%d")
    return date


#主程序:
spark = SparkSession.builder.config(conf = SparkConf()).getOrCreate()

fields = [StructField("date", DateType(),False),StructField("county", StringType(),False),StructField("state", StringType(),False),
                    StructField("cases", IntegerType(),False),StructField("deaths", IntegerType(),False),]
schema = StructType(fields)

rdd0 = spark.sparkContext.textFile("/user/hadoop/us-counties.txt")
rdd1 = rdd0.map(lambda x:x.split("\t")).map(lambda p: Row(toDate(p[0]),p[1],p[2],int(p[3]),int(p[4])))

shemaUsInfo = spark.createDataFrame(rdd1,schema)

shemaUsInfo.createOrReplaceTempView("usInfo")

2)数据分析

本案例次要进行了以下统计分析,剖析的指标和办法如下:

  • 获取数据集与代码 → ShowMeAI 的官网 GitHub https://github.com/ShowMeAI-Hub/awesome-AI-cheatsheets
  • 运行代码段与学习 → 在线编程环境 http://blog.showmeai.tech/python3-compiler

(1)统计美国截止每日的累计确诊人数和累计死亡人数。

  • 以 date 作为分组字段,对 cases 和 deaths 字段进行汇总统计。

(2)统计美国每日的新增确诊人数。

  • 因为「新增数 = 今日数 - 昨日数」,这里应用自连贯,连贯条件是 t1.date = t2.date + 1,而后应用 t1.totalCases – t2.totalCases 计算该日新增。

(3)统计美国每日的新增确诊人数新增死亡人数。

  • 因为「新增数 = 今日数 - 昨日数」,这里应用自连贯,连贯条件是 t1.date = t2.date + 1,而后应用 t1.totalCases – t2.totalCases 计算该日新增。

(4)统计截止 5.19 日,美国各州的累计确诊人数和死亡人数。

  • 首先筛选出 5.19 日的数据,而后以 state 作为分组字段,对 cases 和 deaths 字段进行汇总统计。

(5)统计截止 5.19 日,美国确诊人数最多的十个州。

  • 对 3) 的后果 DataFrame 注册长期表,而后按确诊人数降序排列,并取前 10 个州。

(6)统计截止 5.19 日,美国死亡人数最多的十个州。

  • 对 3) 的后果 DataFrame 注册长期表,而后按死亡人数降序排列,并取前 10 个州。

(7)统计截止 5.19 日,美国确诊人数起码的十个州。

  • 对 3) 的后果 DataFrame 注册长期表,而后按确诊人数升序排列,并取前 10 个州。

(8)统计截止 5.19 日,美国死亡人数起码的十个州。

  • 对 3) 的后果 DataFrame 注册长期表,而后按死亡人数升序排列,并取前 10 个州。

(9)统计截止 5.19 日,全美和各州的病死率。

  • 病死率 = 死亡数 / 确诊数,对 3) 的后果 DataFrame 注册长期表,而后按公式计算。

咱们上面基于 Spark DataFrame 和 Spark sql 进行统计分析。

# 1. 计算每日的累计确诊病例数和死亡数
df = shemaUsInfo.groupBy("date").agg(func.sum("cases"),func.sum("deaths")).sort(shemaUsInfo["date"].asc())

# 列重命名
df1 = df.withColumnRenamed("sum(cases)","cases").withColumnRenamed("sum(deaths)","deaths")
df1.repartition(1).write.json("result1.json")                               #写入 hdfs

# 注册为长期表供下一步应用
df1.createOrReplaceTempView("ustotal")

# 2. 计算每日较昨日的新增确诊病例数和死亡病例数
df2 = spark.sql("select t1.date,t1.cases-t2.cases as caseIncrease,t1.deaths-t2.deaths as deathIncrease from ustotal t1,ustotal t2 where t1.date = date_add(t2.date,1)")

df2.sort(df2["date"].asc()).repartition(1).write.json("result2.json")           #写入 hdfs

# 3. 统计截止 5.19 日 美国各州的累计确诊人数和死亡人数
df3 = spark.sql("select date,state,sum(cases) as totalCases,sum(deaths) as totalDeaths,round(sum(deaths)/sum(cases),4) as deathRate from usInfo  where date = to_date('2020-05-19','yyyy-MM-dd') group by date,state")

df3.sort(df3["totalCases"].desc()).repartition(1).write.json("result3.json") #写入 hdfs

df3.createOrReplaceTempView("eachStateInfo")

# 4. 找出美国确诊最多的 10 个州
df4 = spark.sql("select date,state,totalCases from eachStateInfo  order by totalCases desc limit 10")
df4.repartition(1).write.json("result4.json")

# 5. 找出美国死亡最多的 10 个州
df5 = spark.sql("select date,state,totalDeaths from eachStateInfo  order by totalDeaths desc limit 10")
df5.repartition(1).write.json("result5.json")

# 6. 找出美国确诊起码的 10 个州
df6 = spark.sql("select date,state,totalCases from eachStateInfo  order by totalCases asc limit 10")
df6.repartition(1).write.json("result6.json")

# 7. 找出美国死亡起码的 10 个州
df7 = spark.sql("select date,state,totalDeaths from eachStateInfo  order by totalDeaths asc limit 10")
df7.repartition(1).write.json("result7.json")

# 8. 统计截止 5.19 全美和各州的病死率
df8 = spark.sql("select 1 as sign,date,'USA'as state,round(sum(totalDeaths)/sum(totalCases),4) as deathRate from eachStateInfo group by date union select 2 as sign,date,state,deathRate from eachStateInfo").cache()
df8.sort(df8["sign"].asc(),df8["deathRate"].desc()).repartition(1).write.json("result8.json")

3)后果文件

上述 Spark 计算结果保留.json 文件,不便后续可视化解决。因为应用 Python 读取 HDFS 文件系统不太不便,故将 HDFS 上后果文件转储到本地文件系统中,应用以下命:

./bin/hdfs dfs -get /user/hadoop/result1.json/*.json /home/hadoop/result/result1
...

对于 result2 等后果文件,应用雷同命令,只须要改一下门路即可。下载过程如下图所示:

4. 数据可视化

1)可视化工具抉择与代码

抉择应用 python 第三方库 pyecharts 作为可视化工具。

  • 获取数据集与代码 → ShowMeAI 的官网 GitHub https://github.com/ShowMeAI-Hub/awesome-AI-cheatsheets
  • 运行代码段与学习 → 在线编程环境 http://blog.showmeai.tech/python3-compiler

在应用前,须要装置 pyecharts,装置代码如下:

pip install pyecharts

具体可视化实现代码如下:

from pyecharts import options as opts
from pyecharts.charts import Bar
from pyecharts.charts import Line
from pyecharts.components import Table
from pyecharts.charts import WordCloud
from pyecharts.charts import Pie
from pyecharts.charts import Funnel
from pyecharts.charts import Scatter
from pyecharts.charts import PictorialBar
from pyecharts.options import ComponentTitleOpts
from pyecharts.globals import SymbolType
import json

每日的累计确诊病例数和死亡数 → 双柱状图

#1. 画出每日的累计确诊病例数和死亡数 → 双柱状图

def drawChart_1(index):
    root = "/home/hadoop/result/result" + str(index) +"/part-00000.json"
    date = []
    cases = []
    deaths = []
    with open(root, 'r') as f:
        while True:
            line = f.readline()
            if not line:                            # 到 EOF,返回空字符串,则终止循环
                break
            js = json.loads(line)
            date.append(str(js['date']))
            cases.append(int(js['cases']))
            deaths.append(int(js['deaths']))

    d = (Bar()
    .add_xaxis(date)
    .add_yaxis("累计确诊人数", cases, stack="stack1")
    .add_yaxis("累计死亡人数", deaths, stack="stack1")
    .set_series_opts(label_opts=opts.LabelOpts(is_show=False))
    .set_global_opts(title_opts=opts.TitleOpts(title="美国每日累计确诊和死亡人数"))
    .render("/home/hadoop/result/result1/result1.html")
    )

每日的新增确诊病例数和死亡数 → 折线图

#2. 画出每日的新增确诊病例数和死亡数 → 折线图
def drawChart_2(index):
    root = "/home/hadoop/result/result" + str(index) +"/part-00000.json"
    date = []
    cases = []
    deaths = []
    with open(root, 'r') as f:
        while True:
            line = f.readline()
            if not line:                            # 到 EOF,返回空字符串,则终止循环
                break
            js = json.loads(line)
            date.append(str(js['date']))
            cases.append(int(js['caseIncrease']))
            deaths.append(int(js['deathIncrease']))

    (Line(init_opts=opts.InitOpts(width="1600px", height="800px"))
    .add_xaxis(xaxis_data=date)
    .add_yaxis(
        series_name="新增确诊",
        y_axis=cases,
        markpoint_opts=opts.MarkPointOpts(
            data=[opts.MarkPointItem(type_="max", name="最大值")

            ]
        ),
        markline_opts=opts.MarkLineOpts(data=[opts.MarkLineItem(type_="average", name="平均值")]
        ),
    )
    .set_global_opts(title_opts=opts.TitleOpts(title="美国每日新增确诊折线图", subtitle=""),
        tooltip_opts=opts.TooltipOpts(trigger="axis"),
        toolbox_opts=opts.ToolboxOpts(is_show=True),
        xaxis_opts=opts.AxisOpts(type_="category", boundary_gap=False),
    )
    .render("/home/hadoop/result/result2/result1.html")
    )
    (Line(init_opts=opts.InitOpts(width="1600px", height="800px"))
    .add_xaxis(xaxis_data=date)
    .add_yaxis(
        series_name="新增死亡",
        y_axis=deaths,
        markpoint_opts=opts.MarkPointOpts(data=[opts.MarkPointItem(type_="max", name="最大值")]
        ),
        markline_opts=opts.MarkLineOpts(
            data=[opts.MarkLineItem(type_="average", name="平均值"),
                opts.MarkLineItem(symbol="none", x="90%", y="max"),
                opts.MarkLineItem(symbol="circle", type_="max", name="最高点"),
            ]
        ),
    )
    .set_global_opts(title_opts=opts.TitleOpts(title="美国每日新增死亡折线图", subtitle=""),
        tooltip_opts=opts.TooltipOpts(trigger="axis"),
        toolbox_opts=opts.ToolboxOpts(is_show=True),
        xaxis_opts=opts.AxisOpts(type_="category", boundary_gap=False),
    )
    .render("/home/hadoop/result/result2/result2.html")
    )

截止 5.19,美国各州累计确诊、死亡人数和病死率—-> 表格

#3. 画出截止 5.19,美国各州累计确诊、死亡人数和病死率 ---> 表格
def drawChart_3(index):
    root = "/home/hadoop/result/result" + str(index) +"/part-00000.json"
    allState = []
    with open(root, 'r') as f:
        while True:
            line = f.readline()
            if not line:                            # 到 EOF,返回空字符串,则终止循环
                break
            js = json.loads(line)
            row = []
            row.append(str(js['state']))
            row.append(int(js['totalCases']))
            row.append(int(js['totalDeaths']))
            row.append(float(js['deathRate']))
            allState.append(row)

    table = Table()

    headers = ["State name", "Total cases", "Total deaths", "Death rate"]
    rows = allState
    table.add(headers, rows)
    table.set_global_opts(title_opts=ComponentTitleOpts(title="美国各州疫情一览", subtitle="")
    )
    table.render("/home/hadoop/result/result3/result1.html")

美国确诊最多的 10 个州 → 词云图

#4. 画出美国确诊最多的 10 个州 → 词云图
def drawChart_4(index):
    root = "/home/hadoop/result/result" + str(index) +"/part-00000.json"
    data = []
    with open(root, 'r') as f:
        while True:
            line = f.readline()
            if not line:                            # 到 EOF,返回空字符串,则终止循环
                break
            js = json.loads(line)
            row=(str(js['state']),int(js['totalCases']))
            data.append(row)

    c = (WordCloud()
    .add("", data, word_size_range=[20, 100], shape=SymbolType.DIAMOND)
    .set_global_opts(title_opts=opts.TitleOpts(title="美国各州确诊 Top10"))
    .render("/home/hadoop/result/result4/result1.html")
    )

美国死亡最多的 10 个州 → 柱状图

#5. 画出美国死亡最多的 10 个州 → 柱状图
def drawChart_5(index):
    root = "/home/hadoop/result/result" + str(index) +"/part-00000.json"
    state = []
    totalDeath = []
    with open(root, 'r') as f:
        while True:
            line = f.readline()
            if not line:                            # 到 EOF,返回空字符串,则终止循环
                break
            js = json.loads(line)
            state.insert(0,str(js['state']))
            totalDeath.insert(0,int(js['totalDeaths']))

    c = (PictorialBar()
    .add_xaxis(state)
    .add_yaxis(
        "",
        totalDeath,
        label_opts=opts.LabelOpts(is_show=False),
        symbol_size=18,
        symbol_repeat="fixed",
        symbol_offset=[0, 0],
        is_symbol_clip=True,
        symbol=SymbolType.ROUND_RECT,
    )
    .reversal_axis()
    .set_global_opts(title_opts=opts.TitleOpts(title="PictorialBar- 美国各州死亡人数 Top10"),
        xaxis_opts=opts.AxisOpts(is_show=False),
        yaxis_opts=opts.AxisOpts(axistick_opts=opts.AxisTickOpts(is_show=False),
            axisline_opts=opts.AxisLineOpts(linestyle_opts=opts.LineStyleOpts(opacity=0)
            ),
        ),
    )
    .render("/home/hadoop/result/result5/result1.html")
    )

找出美国确诊起码的 10 个州 → 词云图

#6. 找出美国确诊起码的 10 个州 → 词云图
def drawChart_6(index):
    root = "/home/hadoop/result/result" + str(index) +"/part-00000.json"
    data = []
    with open(root, 'r') as f:
        while True:
            line = f.readline()
            if not line:                            # 到 EOF,返回空字符串,则终止循环
                break
            js = json.loads(line)
            row=(str(js['state']),int(js['totalCases']))
            data.append(row)

    c = (WordCloud()
    .add("", data, word_size_range=[100, 20], shape=SymbolType.DIAMOND)
    .set_global_opts(title_opts=opts.TitleOpts(title="美国各州确诊起码的 10 个州"))
    .render("/home/hadoop/result/result6/result1.html")
    )

美国死亡起码的 10 个州 → 漏斗图

#7. 找出美国死亡起码的 10 个州 → 漏斗图
def drawChart_7(index):
    root = "/home/hadoop/result/result" + str(index) +"/part-00000.json"
    data = []
    with open(root, 'r') as f:
        while True:
            line = f.readline()
            if not line:                            # 到 EOF,返回空字符串,则终止循环
                break
            js = json.loads(line)
            data.insert(0,[str(js['state']),int(js['totalDeaths'])])

    c = (Funnel()
    .add(
        "State",
        data,
        sort_="ascending",
        label_opts=opts.LabelOpts(position="inside"),
    )
    .set_global_opts(title_opts=opts.TitleOpts(title=""))
    .render("/home/hadoop/result/result7/result1.html")
    )

美国的病死率—-> 饼状图

#8. 美国的病死率 ---> 饼状图
def drawChart_8(index):
    root = "/home/hadoop/result/result" + str(index) +"/part-00000.json"
    values = []
    with open(root, 'r') as f:
        while True:
            line = f.readline()
            if not line:                            # 到 EOF,返回空字符串,则终止循环
                break
            js = json.loads(line)
            if str(js['state'])=="USA":
                values.append(["Death(%)",round(float(js['deathRate'])*100,2)])
                values.append(["No-Death(%)",100-round(float(js['deathRate'])*100,2)])
    c = (Pie()
    .add("", values)
    .set_colors(["blcak","orange"])
    .set_global_opts(title_opts=opts.TitleOpts(title="全美的病死率"))
    .set_series_opts(label_opts=opts.LabelOpts(formatter="{b}: {c}"))
    .render("/home/hadoop/result/result8/result1.html")
    )

#可视化
index = 1
while index<9:
    funcStr = "drawChart_" + str(index)
    eval(funcStr)(index)
    index+=1

2)后果图标展现

可视化后果是.html 格局的,reslut1 的后果展现图保留门路为“/home/hadoop/result/result1/result1.html”,reslut2 的后果展现图保留门路为“/home/hadoop/result/result2/result1.html”,其余相似递推。具体截图如下:

(1)美国每日的累计确诊病例数和死亡数 → 双柱状图

(2)美国每日的新增确诊病例数 → 折线图

(3)美国每日的新增死亡病例数 → 折线图

(4)截止 5.19,美国各州累计确诊、死亡人数和病死率 → 表格

(5)截止 5.19,美国累计确诊人数前 10 的州 → 词云图

(6)截止 5.19,美国累计死亡人数前 10 的州 → 柱状图

(7)截止 5.19,美国累计确诊人数起码的 10 个州 → 词云图

(8)截止 5.19,美国累计死亡人数起码的 10 个州 → 漏斗图

(9)截止 5.19,美国的病死率 → 饼状图

5. 参考资料

  • 数据迷信工具速查 | Spark 使用指南 (RDD 版) http://www.showmeai.tech/article-detail/106
  • 数据迷信工具速查 | Spark 使用指南 (SQL 版) http://www.showmeai.tech/article-detail/107

ShowMeAI 相干文章举荐

  • 图解大数据 | 导论:大数据生态与利用
  • 图解大数据 | 分布式平台:Hadoop 与 Map-reduce 详解
  • 图解大数据 | 实操案例:Hadoop 零碎搭建与环境配置
  • 图解大数据 | 实操案例:利用 map-reduce 进行大数据统计
  • 图解大数据 | 实操案例:Hive 搭建与利用案例
  • 图解大数据 | 海量数据库与查问:Hive 与 HBase 详解
  • 图解大数据 | 大数据分析开掘框架:Spark 初步
  • 图解大数据 | Spark 操作:基于 RDD 的大数据处理剖析
  • 图解大数据 | Spark 操作:基于 Dataframe 与 SQL 的大数据处理剖析
  • 图解大数据 | 综合案例:应用 spark 剖析美国新冠肺炎疫情数据
  • 图解大数据 | 综合案例:应用 Spark 剖析开掘批发交易数据
  • 图解大数据 | 综合案例:应用 Spark 剖析开掘音乐专辑数据
  • 图解大数据 | 流式数据处理:Spark Streaming
  • 图解大数据 | Spark 机器学习 (上)- 工作流与特色工程
  • 图解大数据 | Spark 机器学习 (下)- 建模与超参调优
  • 图解大数据 | Spark GraphFrames:基于图的数据分析开掘

ShowMeAI 系列教程举荐

  • 图解 Python 编程:从入门到精通系列教程
  • 图解数据分析:从入门到精通系列教程
  • 图解 AI 数学根底:从入门到精通系列教程
  • 图解大数据技术:从入门到精通系列教程

正文完
 0