每天天都在努力學習的我們
前言
本篇博客講解的內容依舊是使用Spark進行相關的資料分析,按理來說資料分析完之后應該搞一搞可視化的,由于目前時間緊張,顧不得學習可視化了,先來看一下此次的內容把,
在Kaggle資料平臺下載了資料集albunms.csv,里面包含了的主要欄位如下,先來看一下,

使用Spark讀取csv
spark讀取csv的方式有兩種,一種是使用rdd進行讀取csv,然后創建RDD物件,另一種是使用spark SQL進行讀取,創建DataFrame物件,本篇博客使用Spark SQL進行讀取檔案,RDD和Data Frame處理資料,
csv檔案我們都知道,","分隔符,但是讀取csv檔案的同時也要注意是否有無表頭,表頭欄位型別,下面來看一下Spark SQL讀取csv,
private
val sparkConf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("Spark SQL") private
val sparkContext = new SparkContext(sparkConf)
private val sparkSession: SparkSession =SparkSession.builder().config(sparkConf).getOrCreate()
import sparkSession.implicits._ def
Transform_demo()={
//使用sparksession 讀取csv檔案,此csv檔案有表頭
val dataFrame = sparkSession.read.format("com.databricks.spark.csv")
//有無表頭
.option("header", true)
//是否自動推斷表頭型別
.option("inferSchema", false)
//分隔符
.option("delimiter", ",")
//csv檔案的地址
.csv("date/albums.csv")
dataFrame
}
統計各型別專輯的數量
思路:
各型別專輯的數量 ==》根據專輯型別(genre)分組,求出專輯型別的總和
使用spark SQL進行處理
def genre_demo() = {
//統計各個型別專輯的數量
val unit = Transform_demo()
.select($"genre")
.groupBy($"genre")
.count()
.sort($"count".desc)
.show() }
在Spark SQL中使用group by的時候,提供了group by之后的操作,比如
max(colNames:String*):獲取分組中指定欄位或者所有的數字型別欄位的最大值,只能作用于數字型欄位,
min(colNames:String*):獲取分組欄位或者所有的數字型別欄位的最小值,只能作用于數字型別的欄位,
mean(colName:String*):獲取分組中指定欄位或者所有數字型別欄位的平均值,只能作用于資料型別的欄位
sum(colNames:String*):獲取分組中指定欄位或者所有數字型別欄位的累加值,只能作用于數字型別的欄位
count():獲取分組中的元素個數
根據genre欄位分組后,求出專輯型別的個數,對專輯型別的個數進行排序,如果需要匯入到檔案里面,那么需要.write,
使用Rdd進行處理
def genre_demo() = {
val value = Transform_demo()
.select($"genre")
.rdd
.map(v => (v(0), 1))
.reduceByKey(_ + _)
.sortBy(_._2,false)
展示一下上面的流程圖把

統計各型別專輯的銷量總數
思路:
根據專輯型別分組,分組之后,計算num_of_sales專輯銷量總和,
如果是spark SQL的話,流程應該是這樣的,
dataFrame => select => group by => sum()
來看一下代碼
def countByNum_sales()={
Transform_demo()
.select($"genre",$"num_of_sales")
.withColumn("num_of_sales",col("num_of_sales")
.cast("Integer"))
.groupBy("genre")
.sum("num_of_sales")
.orderBy($"sum(num_of_sales)".desc)
.show() }
如果在group by后面直接sum求和,那么是會報錯的,因為在最開始的時候,我們并沒有讓系統自動推斷表頭的資料型別,默認為String型別,因此需要先轉換為整型,然后對其進行操作,
來看一下RDD的代碼
def countByNum_sales()={
val value = Transform_demo()
.select($"genre", $"num_of_sales")
.rdd
.map(v => (v(0).toString, v(1).toString.toInt))
.reduceByKey(_ + _) .sortBy(_._2, false)
value }

統計近20年每年發行的專輯數量和單曲數量
思路:
根據年份分組(year_of_pub),求每年發行的專輯數量和單曲數量,
單曲數量很簡單(num_of_tracks)累加,但是專輯數量怎么表示呢?
注意一下,num_of_tracks欄位:每張專輯中的單曲數量,什么意思呢?來看一下這樣的表示
(num_of_tracks,1) =》 (每張專輯的單曲數量,專輯數量)
這個1就代表著專輯的數量,
用RDD來理一下思路,(年份,(每張專輯的單曲數量,1)) ==>經過reduceByKey (年份,(每年發行的單曲數量,每年發行的專輯數量))
def Countbytracks() ={
//統計近20年每年發行的專輯數量和單曲數量;
//每年發行的單曲數量 num_of_tracks 專輯數量怎么算呢?
//num_of_tracks: 每張專輯中單曲數量
//(1張專輯,專輯發行的單曲數量)
val array = Transform_demo()
.select($"year_of_pub", $"num_of_tracks")
.rdd
.map(v => (v(0).toString.toInt, (v(1).toString.toInt, 1)))
.reduceByKey((x, y) => (x._1 + y._1, (x._2 + y._2)))
.sortByKey()
array }
來看一下流程圖

分析總銷量前五的專輯型別的各年份銷量
思路:
首先求出來總銷售量前五的專輯型別,然后取前五個數量最多的,之后在這個五個數量最多的專輯型別里面計算每年的銷量,
首先獲取總銷售量前五的專輯型別
def get_genre() = {
//先獲取總銷售量前5的專輯型別
val array = Transform_demo()
.select($"genre", $"num_of_sales")
.withColumn("num_of_sales", col("num_of_sales").cast("Integer"))
.groupBy($"genre")
.sum("num_of_sales")
.orderBy($"sum(num_of_sales)".desc)
.rdd
.map(v => v(0).toString)
.take(5)
array }
首先更改欄位型別,根據銷售型別(genre)分組,找到銷量最多的銷售型別.經過map轉換之后取前五個銷售型別,
獲取總銷量前5的專輯型別的各年份銷量
def per_year_sales()={
val genre_list = get_genre()
val value = Transform_demo()
.select($"genre", $"year_of_pub", $"num_of_sales")
.rdd
.filter(v => genre_list.contains(v(0)) )
.map(v => ((v(0).toString, v(1).toString), v(2).toString.toInt))
.reduceByKey(_ + _)
value }
如果單獨看上面的代碼看不懂,那么來看一下下面的流程圖把,一定明明白白,

總結
關于這篇音樂分析專案就到此為止了,
關于對資料的基本處理已經有了些眉目,接下來就是不斷的練習練習,
之后還有會其它的專案,我們最終的Boss專案就是離線數倉專案的建設,
最后,希望我的她可以越來越好,天天開心
轉載請註明出處,本文鏈接:https://www.uj5u.com/qita/437980.html
標籤:其他
上一篇:第二屆同花順演算法大賽 | 2022 | AI演算法
下一篇:離職在家背誦八股文的第一天
