大資料美國疫情分析2020年
本案例以2020年美國新冠肺炎疫情資料作為資料集,以Python為編程語言,使用Spark對資料進行分析,并對分析結果進行可視化,
0x00 實驗環境
(1)Linux:Ubuntu 16.04
(2)Hadoop3.1.3
(3)Python: 3.7
(4)Spark: 2.4.0
0x01 資料集
- 資料集下載
本次實驗使用的資料集來自資料網站Kaggle的美國新冠肺炎疫情資料集,該資料集以資料表us-counties.csv,其中包含了美國發現首例新冠肺炎確診病例至今(2020-05-19)的相關資料,資料包含以下欄位:
欄位名稱 欄位含義 例子
date 日期 2020/1/21;2020/1/22;etc
county 區縣(州的下一級單位) Snohomish;
state 州 Washington
cases 截止該日期該區縣的累計確診人數 1,2,3…
deaths 截止該日期該區縣的累計確診人數 1,2,3…

- 格式轉換
原始資料集是以.csv檔案組織的,為了方便spark讀取生成RDD或者DataFrame,首先將us-counties.csv轉換為.txt格式檔案us-counties.txt,轉換操作使用python實作,代碼組織在TranstionTxt.py中,具體代碼如下:
# -*- coding:utf-8 -*-
# @Time : 2021/1/21 10:26
# @Author: qiuqiuqiu
# @File : TranstionTxt.py
import pandas as pd
#.csv->.txt
data = pd.read_csv('us-counties.csv')
with open('us-counties.txt','a+',encoding='utf-8') as f:
for line in data.values:
f.write((str(line[0])+'\t'+str(line[1])+'\t')+str(line[2])+'\t'+str(line[3])+'\t'+str(line[4])+'\n')
print('success')
0x02 使用Spark對資料分析
這里采用Python作為編程語言,操作的完整實驗代碼存放在了analyst.py中,具體如下:
# -*- coding:utf-8 -*-
# @Time : 2021/1/21 14:30
# @Author: qiuqiuqiu
# @File : analyst.py
# 1) 統計美國截止每日的累計確診人數和累計死亡人數,做法是以date作為分組欄位,對cases和deaths欄位進行匯總統計,
# 2) 統計美國每日的新增確診人數和新增死亡人數,因為新增數=今日數-昨日數,所以考慮使用自連接,連接條件是t1.date = t2.date + 1,然后使用t1.totalCases – t2.totalCases計算該日新增,
# 3) 統計截止5.19日,美國各州的累計確診人數和死亡人數,首先篩選出5.19日的資料,然后以state作為分組欄位,對cases和deaths欄位進行匯總統計,
# 4) 統計截止5.19日,美國確診人數最多的十個州,對3)的結果DataFrame注冊臨時表,然后按確診人數降序排列,并取前10個州,
# 5) 統計截止5.19日,美國死亡人數最多的十個州,對3)的結果DataFrame注冊臨時表,然后按死亡人數降序排列,并取前10個州,
# 6) 統計截止5.19日,美國確診人數最少的十個州,對3)的結果DataFrame注冊臨時表,然后按確診人數升序排列,并取前10個州,
# 7) 統計截止5.19日,美國死亡人數最少的十個州,對3)的結果DataFrame注冊臨時表,然后按死亡人數升序排列,并取前10個州
# 8) 統計截止5.19日,全美和各州的病死率,病死率 = 死亡數/確診數,對3)的結果DataFrame注冊臨時表,然后按公式計算,
from pyspark import SparkConf, SparkContext
from pyspark.sql import Row
from pyspark.sql.types import *
from pyspark.sql import SparkSession
from datetime import datetime
import pyspark.sql.functions as func
def toDate(inputStr):
newStr = ""
if len(inputStr) == 8:
s1 = inputStr[0:4]
s2 = inputStr[5:6]
s3 = inputStr[7]
newStr = s1 + "-" + "0" + s2 + "-" + "0" + s3
else:
s1 = inputStr[0:4]
s2 = inputStr[5:6]
s3 = inputStr[7:]
newStr = s1 + "-" + "0" + s2 + "-" + s3
date = datetime.strptime(newStr, "%Y-%m-%d")
return date
# 主程式:
spark = SparkSession.builder.appName('PythonSparkSQLexample').getOrCreate()
fields = [StructField("date", DateType(), False), StructField("county", StringType(), False),
StructField("state", StringType(), False),
StructField("cases", IntegerType(), False), StructField("deaths", IntegerType(), False), ]
schema = StructType(fields)
rdd0 = spark.sparkContext.textFile("us-counties.txt")
rdd1 = rdd0.map(lambda x: x.split("\t")).map(lambda p: Row(toDate(p[0]), p[1], p[2], int(p[3]), int(p[4])))
shemaUsInfo = spark.createDataFrame(rdd1, schema)
shemaUsInfo.createOrReplaceTempView("usInfo")
# 1.計算每日的累計確診病例數和死亡數
df = shemaUsInfo.groupBy("date").agg(func.sum("cases"), func.sum("deaths")).sort(shemaUsInfo["date"].asc())
# 列重命名
df1 = df.withColumnRenamed("sum(cases)", "cases").withColumnRenamed("sum(deaths)", "deaths")
# df1.repartition(1).write.json("result1.json") # 寫入hdfs
df1.repartition(1).write.format('json').mode('overwrite').save('result1.json')
print("success result1")
# 注冊為臨時表供下一步使用
df1.createOrReplaceTempView("ustotal")
# 2.計算每日較昨日的新增確診病例數和死亡病例數
df2 = spark.sql(
"select t1.date,t1.cases-t2.cases as caseIncrease,t1.deaths-t2.deaths as deathIncrease from ustotal t1,ustotal t2 where t1.date = date_add(t2.date,1)")
df2.sort(df2["date"].asc()).repartition(1).write.json("result2.json") # 寫入hdfs
print("success result2")
# 3.統計截止5.19日 美國各州的累計確診人數和死亡人數
df3 = spark.sql(
"select date,state,sum(cases) as totalCases,sum(deaths) as totalDeaths,round(sum(deaths)/sum(cases),4) as deathRate from usInfo where date = to_date('2020-05-19','yyyy-MM-dd') group by date,state")
df3.sort(df3["totalCases"].desc()).repartition(1).write.json("result3.json") # 寫入hdfs
print("success result3")
df3.createOrReplaceTempView("eachStateInfo")
# 4.找出美國確診最多的10個州
df4 = spark.sql("select date,state,totalCases from eachStateInfo order by totalCases desc limit 10")
df4.repartition(1).write.json("result4.json")
print("success result4")
# 5.找出美國死亡最多的10個州
df5 = spark.sql("select date,state,totalDeaths from eachStateInfo order by totalDeaths desc limit 10")
df5.repartition(1).write.json("result5.json")
print("success result5")
# 6.找出美國確診最少的10個州
df6 = spark.sql("select date,state,totalCases from eachStateInfo order by totalCases asc limit 10")
df6.repartition(1).write.json("result6.json")
print("success result6")
# 7.找出美國死亡最少的10個州
df7 = spark.sql("select date,state,totalDeaths from eachStateInfo order by totalDeaths asc limit 10")
df7.repartition(1).write.json("result7.json")
print("success result7")
# 8.統計截止5.19全美和各州的病死率
df8 = spark.sql(
"select 1 as sign,date,'USA' as state,round(sum(totalDeaths)/sum(totalCases),4) as deathRate from eachStateInfo group by date union select 2 as sign,date,state,deathRate from eachStateInfo").cache()
df8.sort(df8["sign"].asc(), df8["deathRate"].desc()).repartition(1).write.json("result8.json")
print("success result8")
- 讀取檔案生成DataFrame
由于本實驗中使用的資料為結構化資料,因此可以使用spark讀取源檔案生成DataFrame以方便進行后續分析實作,
本部分代碼組織在analyst.py中,讀取us-counties.txt生成DataFrame的代碼如下:
# 主程式:
spark = SparkSession.builder.appName('PythonSparkSQLexample').getOrCreate()
fields = [StructField("date", DateType(), False), StructField("county", StringType(), False),
StructField("state", StringType(), False),
StructField("cases", IntegerType(), False), StructField("deaths", IntegerType(), False), ]
schema = StructType(fields)
rdd0 = spark.sparkContext.textFile("us-counties.txt")
rdd1 = rdd0.map(lambda x: x.split("\t")).map(lambda p: Row(toDate(p[0]), p[1], p[2], int(p[3]), int(p[4])))
shemaUsInfo = spark.createDataFrame(rdd1, schema)
shemaUsInfo.createOrReplaceTempView("usInfo")
- 進行資料分析
本實驗主要統計以下8個指標,分別是:
- 統計美國截止每日的累計確診人數和累計死亡人數,做法是以date作為分組欄位,對cases和deaths欄位進行匯總統計,
- 統計美國每日的新增確診人數和新增死亡人數,因為新增數=今日數-昨日數,所以考慮使用自連接,連接條件是t1.date = t2.date + 1,然后使用t1.totalCases – t2.totalCases計算該日新增,
- 統計截止5.19日,美國各州的累計確診人數和死亡人數,首先篩選出5.19日的資料,然后以state作為分組欄位,對cases和deaths欄位進行匯總統計,
- 統計截止5.19日,美國確診人數最多的十個州,對3)的結果DataFrame注冊臨時表,然后按確診人數降序排列,并取前10個州,
- 統計截止5.19日,美國死亡人數最多的十個州,對3)的結果DataFrame注冊臨時表,然后按死亡人數降序排列,并取前10個州,
- 統計截止5.19日,美國確診人數最少的十個州,對3)的結果DataFrame注冊臨時表,然后按確診人數升序排列,并取前10個州,
- 統計截止5.19日,美國死亡人數最少的十個州,對3)的結果DataFrame注冊臨時表,然后按死亡人數升序排列,并取前10個州
- 統計截止5.19日,全美和各州的病死率,病死率 = 死亡數/確診數,對3)的結果DataFrame注冊臨時表,然后按公式計算,
在計算以上幾個指標程序中,根據實作的簡易程度,既采用了DataFrame自帶的操作函式,又采用了spark sql進行操作,
- 結果檔案
上述Spark計算結果保存.json檔案,方便后續可視化處理,由于使用Python讀取HDFS檔案系統不太方便,故將HDFS上結果檔案轉儲到本地檔案系統中,使用以下命令:
對于result2等結果檔案,使用相同命令,只需要改一下路徑即可,
df2.sort(df2["date"].asc()).repartition(1).write.json("result2.json") # 寫入hdfs
資料可視化
- 可視化工具與選擇
選擇使用python第三方庫pyecharts作為可視化工具,
在使用前,需要安裝pyecharts,安裝代碼如下:
pip install pyecharts
具體可視化實作代碼組織與showdata.py檔案中,具體代碼如下:
# -*- coding:utf-8 -*-
# @Time : 2021/1/21 16:07
# @Author: qiuqiuqiu
# @File : showdata.py
from pyecharts import options as opts
from pyecharts.charts import Bar
from pyecharts.charts import Line
from pyecharts.components import Table
from pyecharts.charts import WordCloud
from pyecharts.charts import Pie
from pyecharts.charts import Funnel
from pyecharts.charts import Scatter
from pyecharts.charts import PictorialBar
from pyecharts.options import ComponentTitleOpts
from pyecharts.globals import SymbolType
import json
# 1.畫出每日的累計確診病例數和死亡數——>雙柱狀圖
def drawChart_1(index):
root = "p"+str(index)+".json"
print(root)
date = []
cases = []
deaths = []
with open(root, 'r') as f:
while True:
line = f.readline()
if not line: # 到 EOF,回傳空字串,則終止回圈
break
js = json.loads(line)
date.append(str(js['date']))
cases.append(int(js['cases']))
deaths.append(int(js['deaths']))
d = (
Bar()
.add_xaxis(date)
.add_yaxis("累計確診人數", cases, stack="stack1")
.add_yaxis("累計死亡人數", deaths, stack="stack1")
.set_series_opts(label_opts=opts.LabelOpts(is_show=False))
.set_global_opts(title_opts=opts.TitleOpts(title="美國每日累計確診和死亡人數"))
.render("pp"+str(index)+".html")
)
# 2.畫出每日的新增確診病例數和死亡數——>折線圖
def drawChart_2(index):
root = "p"+str(index)+".json"
date = []
cases = []
deaths = []
with open(root, 'r') as f:
while True:
line = f.readline()
if not line: # 到 EOF,回傳空字串,則終止回圈
break
js = json.loads(line)
date.append(str(js['date']))
cases.append(int(js['caseIncrease']))
deaths.append(int(js['deathIncrease']))
(
Line(init_opts=opts.InitOpts(width="1600px", height="800px"))
.add_xaxis(xaxis_data=date)
.add_yaxis(
series_name="新增確診",
y_axis=cases,
markpoint_opts=opts.MarkPointOpts(
data=[
opts.MarkPointItem(type_="max", name="最大值")
]
),
markline_opts=opts.MarkLineOpts(
data=[opts.MarkLineItem(type_="average", name="平均值")]
),
)
.set_global_opts(
title_opts=opts.TitleOpts(title="美國每日新增確診折線圖", subtitle=""),
tooltip_opts=opts.TooltipOpts(trigger="axis"),
toolbox_opts=opts.ToolboxOpts(is_show=True),
xaxis_opts=opts.AxisOpts(type_="category", boundary_gap=False),
)
.render("pp"+str(index)+".html")
)
(
Line(init_opts=opts.InitOpts(width="1600px", height="800px"))
.add_xaxis(xaxis_data=date)
.add_yaxis(
series_name="新增死亡",
y_axis=deaths,
markpoint_opts=opts.MarkPointOpts(
data=[opts.MarkPointItem(type_="max", name="最大值")]
),
markline_opts=opts.MarkLineOpts(
data=[
opts.MarkLineItem(type_="average", name="平均值"),
opts.MarkLineItem(symbol="none", x="90%", y="max"),
opts.MarkLineItem(symbol="circle", type_="max", name="最高點"),
]
),
)
.set_global_opts(
title_opts=opts.TitleOpts(title="美國每日新增死亡折線圖", subtitle=""),
tooltip_opts=opts.TooltipOpts(trigger="axis"),
toolbox_opts=opts.ToolboxOpts(is_show=True),
xaxis_opts=opts.AxisOpts(type_="category", boundary_gap=False),
)
.render("pp"+str(index)+".html")
)
# 3.畫出截止5.19,美國各州累計確診、死亡人數和病死率--->表格
def drawChart_3(index):
root = "p"+str(index)+".json"
allState = []
with open(root, 'r') as f:
while True:
line = f.readline()
if not line: # 到 EOF,回傳空字串,則終止回圈
break
js = json.loads(line)
row = []
row.append(str(js['state']))
row.append(int(js['totalCases']))
row.append(int(js['totalDeaths']))
row.append(float(js['deathRate']))
allState.append(row)
table = Table()
headers = ["State name", "Total cases", "Total deaths", "Death rate"]
rows = allState
table.add(headers, rows)
table.set_global_opts(
title_opts=ComponentTitleOpts(title="美國各州疫情一覽", subtitle="")
)
table.render("pp"+str(index)+".html")
# 4.畫出美國確診最多的10個州——>詞云圖
def drawChart_4(index):
root ="p"+str(index)+".json"
data = []
with open(root, 'r') as f:
while True:
line = f.readline()
if not line: # 到 EOF,回傳空字串,則終止回圈
break
js = json.loads(line)
row = (str(js['state']), int(js['totalCases']))
data.append(row)
c = (
WordCloud()
.add("", data, word_size_range=[20, 100], shape=SymbolType.DIAMOND)
.set_global_opts(title_opts=opts.TitleOpts(title="美國各州確診Top10"))
.render("pp"+str(index)+".html")
)
# 5.畫出美國死亡最多的10個州——>象柱狀圖
def drawChart_5(index):
root ="p"+str(index)+".json"
state = []
totalDeath = []
with open(root, 'r') as f:
while True:
line = f.readline()
if not line: # 到 EOF,回傳空字串,則終止回圈
break
js = json.loads(line)
state.insert(0, str(js['state']))
totalDeath.insert(0, int(js['totalDeaths']))
c = (
PictorialBar()
.add_xaxis(state)
.add_yaxis(
"",
totalDeath,
label_opts=opts.LabelOpts(is_show=False),
symbol_size=18,
symbol_repeat="fixed",
symbol_offset=[0, 0],
is_symbol_clip=True,
symbol=SymbolType.ROUND_RECT,
)
.reversal_axis()
.set_global_opts(
title_opts=opts.TitleOpts(title="PictorialBar-美國各州死亡人數Top10"),
xaxis_opts=opts.AxisOpts(is_show=False),
yaxis_opts=opts.AxisOpts(
axistick_opts=opts.AxisTickOpts(is_show=False),
axisline_opts=opts.AxisLineOpts(
linestyle_opts=opts.LineStyleOpts(opacity=0)
),
),
)
.render("pp"+str(index)+".html")
)
# 6.找出美國確診最少的10個州——>詞云圖
def drawChart_6(index):
root = "p"+str(index)+".json"
data = []
with open(root, 'r') as f:
while True:
line = f.readline()
if not line: # 到 EOF,回傳空字串,則終止回圈
break
js = json.loads(line)
row = (str(js['state']), int(js['totalCases']))
data.append(row)
c = (
WordCloud()
.add("", data, word_size_range=[100, 20], shape=SymbolType.DIAMOND)
.set_global_opts(title_opts=opts.TitleOpts(title="美國各州確診最少的10個州"))
.render("pp"+str(index)+".html")
)
# 7.找出美國死亡最少的10個州——>漏斗圖
def drawChart_7(index):
root = "p"+str(index)+".json"
data = []
with open(root, 'r') as f:
while True:
line = f.readline()
if not line: # 到 EOF,回傳空字串,則終止回圈
break
js = json.loads(line)
data.insert(0, [str(js['state']), int(js['totalDeaths'])])
c = (
Funnel()
.add(
"State",
data,
sort_="ascending",
label_opts=opts.LabelOpts(position="inside"),
)
.set_global_opts(title_opts=opts.TitleOpts(title=""))
.render("pp"+str(index)+".html")
)
# 8.美國的病死率--->餅狀圖
def drawChart_8(index):
root ="p"+str(index)+".json"
values = []
with open(root, 'r') as f:
while True:
line = f.readline()
if not line: # 到 EOF,回傳空字串,則終止回圈
break
js = json.loads(line)
if str(js['state']) == "USA":
values.append(["Death(%)", round(float(js['deathRate']) * 100, 2)])
values.append(["No-Death(%)", 100 - round(float(js['deathRate']) * 100, 2)])
c = (
Pie()
.add("", values)
.set_colors(["blcak", "orange"])
.set_global_opts(title_opts=opts.TitleOpts(title="全美的病死率"))
.set_series_opts(label_opts=opts.LabelOpts(formatter="{b}: {c}"))
.render("pp"+str(index)+".html")
)
# 可視化主程式:
index = 1
while index < 9:
funcStr = "drawChart_" + str(index)
eval(funcStr)(index)
index += 1
- 結果圖表展示
可視化結果是.html格式的,reslut1的結果展示圖保存路徑為“/home/hadoop/result/result1/result1.html”,reslut2的結果展示圖保存路徑為“/home/hadoop/result/result2/result1.html”,其余類似遞推,
-
美國每日的累計確診病例數和死亡數——>雙柱狀圖

-
美國每日的新增確診病例數——>折線圖

-
美國每日的新增死亡病例數——>折線圖

-
截止5.19,美國各州累計確診、死亡人數和病死率—>表格

-
截止5.19,美國累計確診人數前10的州—>詞云圖

-
截止5.19,美國累計死亡人數前10的州—>象柱狀圖

-
截止5.19,美國累計確診人數最少的10個州—>詞云圖

-
截止5.19,美國累計死亡人數最少的10個州—>漏斗圖

-
截止5.19,美國的病死率—>餅狀圖

轉載請註明出處,本文鏈接:https://www.uj5u.com/qita/252651.html
標籤:其他
上一篇:Scrapy爬取前程無憂
