Spark RDD編程初級實踐
- 湖工大永遠滴神 茂林!!!
- 提交例外問題解決
- 第一關 資料去重
- 第二個 整合排序
- 第三關 求平均值
湖工大永遠滴神 茂林!!!
RDD(Resilient Distributed Datasets, 彈性分布式資料集)是Spark最為核心的概念,它是一個只讀的、可磁區的分布式資料集,這個資料集的全部或部分可以快取在記憶體中,可在多次計算間重用,Spark用Scala語言實作了RDD的API,程式員可以通過呼叫API實作對RDD的各種操作,從而實作各種復雜的應用,
RDD編程都是從創建RDD開始的,可以通過多種方式創建得到RDD,例如,從本地檔案或者分布式檔案系統HDFS中讀取資料創建RDD,或者使用parallelize()方法從一個集合中創建得到RDD,
創建得到RDD以后,就可以對RDD執行各種操作,包括轉換操作和行動操作,RDD編程主要是對RDD各種操作API的使用,無論多復雜的Spark應用程式,最終都是借助于這些RDD操作來實作的,另外,通過持久化,可以把RDD保存在記憶體或者磁盤中,避免多次重復計算,通過對RDD進行磁區,不僅可以增加程式并行度,而且在一些應用場景中可以降低網路通信開銷,
鍵值對RDD(Pair RDD)是指每個RDD元素都是(key, value)鍵值對型別,是一種常見的RDD型別,在Spark編程中經常被使用,常用的鍵值對轉換操作包括reduceByKey(func)、groupByKey()、sortByKey()、sortBy()、mapValues(func)、join()、combineByKey()等,
提交例外問題解決
點擊右上角的重置代碼倉庫圖示,再重新提交
第一關 資料去重
import org.apache.spark.SparkContext
import org.apache.spark.SparkConf
import org.apache.spark.HashPartitioner
object RemDup {
def main(args: Array[String]): Unit = {
val conf = new SparkConf().setAppName("RemDup").setMaster("local")
val sc = new SparkContext(conf)
//輸入檔案fileA.txt和fileB.txt已保存在本地檔案系統/root/step1_files目錄中
val dataFile = "file:///root/step1_files"
val data = sc.textFile(dataFile, 2)
/********** Begin **********/
//第一步:執行過濾操作,把空行丟棄,
val rdd1 = data.filter(_.trim().length > 0)
//第二步:執行map操作,取出RDD中每個元素,去除尾部空格并生成一個(key, value)鍵值對,
val rdd2 = rdd1.map(line => (line.trim, ""))
//第三步:執行groupByKey操作,把所有key相同的value都組織成一個value-list,
val rdd3 = rdd2.groupByKey()
//第四步:對RDD進行重新磁區,變成一個磁區,
//在分布式環境下只有把所有磁區合并成一個磁區,才能讓所有元素排序后總體有序,
val rdd4 = rdd3.partitionBy(new HashPartitioner(1))
//第五步:執行sortByKey操作,對RDD中所有元素都按照key的升序排序,
val rdd5 = rdd4.sortByKey()
//第六步:執行keys操作,將鍵值對RDD中所有元素的key回傳,形成一個新的RDD,
val rdd6 = rdd5.keys
//第七步:執行collect操作,以陣列的形式回傳RDD中所有元素,
val rdd7 = rdd6.collect()
//第八步:執行foreach操作,并使用println列印出陣列中每個元素的值,
println("") //注意:此行不要修改,否則會影響測驗結果,在此行之后繼續完成第八步的代碼,
rdd7.foreach(println)
/********** End **********/
}
}
第二個 整合排序
import org.apache.spark.SparkContext
import org.apache.spark.SparkConf
import org.apache.spark.HashPartitioner
object FileSort {
def main(args: Array[String]): Unit = {
val conf = new SparkConf().setAppName("FileSort").setMaster("local")
val sc = new SparkContext(conf)
//輸入檔案file1.txt、file2.txt和file3.txt已保存在本地檔案系統/root/step2_files目錄中
val dataFile = "file:///root/step2_files"
val data = sc.textFile(dataFile, 3)
/********** Begin **********/
//第一步:執行過濾操作,把空行丟棄,
val rdd1 = data.filter(_.trim().length > 0)
//第二步:執行map操作,取出RDD中每個元素,去除尾部空格并轉換成整數,生成一個(key, value)鍵值對,
val rdd2 = rdd1.map(line => (line.trim.toInt, ""))
//第三步:對RDD進行重新磁區,變成一個磁區,
//在分布式環境下只有把所有磁區合并成一個磁區,才能讓所有元素排序后總體有序,
val rdd3 = rdd2.partitionBy(new HashPartitioner(1))
//第四步:執行sortByKey操作,對RDD中所有元素都按照key的升序排序,
val rdd4 = rdd3.sortByKey()
//第五步:執行keys操作,將鍵值對RDD中所有元素的key回傳,形成一個新的RDD,
val rdd5 = rdd4.keys
//第六步:執行map操作,取出RDD中每個元素,生成一個(key, value)鍵值對,
//其中key是整數的排序位次,value是原待排序的整數,
var index = 0
val rdd6 = rdd5.map(t => {
index = index + 1
(index, t)
})
//第七步:執行collect操作,以陣列的形式回傳RDD中所有元素,
val rdd7 = rdd6.collect()
//第八步:執行foreach操作,依次遍歷陣列中每個元素,分別取出(key, value)鍵值對中key和value,
//按如下格式輸出:key value
println("") //注意:此行不要修改,否則會影響測驗結果,在此行之后繼續完成第八步的代碼,
rdd7.foreach(t => println(t._1 + " " + t._2))
/********** End **********/
}
}
第三關 求平均值
import org.apache.spark.SparkContext
import org.apache.spark.SparkConf
object AvgScore {
def main(args: Array[String]): Unit = {
val conf = new SparkConf().setAppName("FileSort").setMaster("local")
val sc = new SparkContext(conf)
//輸入檔案AlgorithmScore.txt、DataBaseScore.txt和PythonScore.txt已保存在本地檔案系統/root/step3_files目錄中
val dataFile = "file:///root/step3_files"
val data = sc.textFile(dataFile)
/********** Begin **********/
//第一步:執行過濾操作,把空行丟棄,
val rdd1 = data.filter(_.trim().length > 0)
//第二步:執行map操作,取出RDD中每個元素(即一行文本),以空格作為分隔符將一行文本拆分成兩個字串,
//拆分后得到的字串封裝在一個陣列物件中,成為新的RDD中一個元素,
var rdd2 = rdd1.map(line => line.split(" "))
//第三步:執行map操作,取出RDD中每個元素(即字串陣列),取字串陣列中第一個元素去除尾部空格,
//取字串陣列中第二個元素去除尾部空格并轉換成整數,并由這兩部分構建一個(key, value)鍵值對,
val rdd3 = rdd2.map(t => (t(0).trim, t(1).trim.toInt))
//第四步:執行mapValues操作,取出鍵值對RDD中每個元素的value,使用x=>(x,1)這個匿名函式進行轉換,
val rdd4 = rdd3.mapValues(x => (x, 1))
//第五步:執行reduceByKey操作,計算出每個學生所有課程的總分數和總課程門數,
val rdd5 = rdd4.reduceByKey((x, y) => (x._1 + y._1, x._2 + y._2))
//第六步:執行mapValues操作,計算出每個學生的平均成績,
val rdd6 = rdd5.mapValues(x => (x._1.toDouble / x._2))
//第七步:執行collect操作,以陣列的形式回傳RDD中所有元素,
val rdd7 = rdd6.collect()
//第八步:執行foreach操作,按如下格式列印出每個學生的平均成績:姓名 成績,其中成績要求保留兩位小數,
println("") //注意:此行不要修改,否則會影響測驗結果,在此行之后繼續完成第八步的代碼,
rdd7.foreach(t => {
val x = t._2
println(t._1 + " " + f"$x%1.2f")
})
/********** End **********/
}
}
轉載請註明出處,本文鏈接:https://www.uj5u.com/qita/277488.html
標籤:其他
