一:依賴關系
1:依賴和血緣關系介紹
rdd.todebugstring:列印血緣關系
rdd.dependencies:列印依賴關系

2:保存血緣關系

3:OneToOne依賴---窄依賴

4:shuffle依賴--寬依賴
新的RDD的一個磁區的資料依賴于舊的RDD多個磁區的資料,這個依賴稱之為shuffle依賴,

5:窄依賴的任務

6:寬依賴的任務

7:任務分類

1: 一個main方法里面可能有多個行動算子,比如collect,所以會有多個job
2:一個job可能會有多個階段,比如上圖寬依賴
3:一個階段可能會有多個task,比如上圖一個階段中的多個磁區
二:持久化
1:RDD自身并不會保存資料,重復讀取物件

2:引入持久化進行優化(檔案、記憶體均可)

3:持久化操作必須在行動算子執行時完成的,不然沒有資料,沒辦法進行持久化,
4:RDD物件的持久化操作并不一定是為了重用,在資料執行較長,或資料比較重要的場合也可以采用持久化操作,
5:CheckPoint檢查點
所謂的檢查點,就是通過將RDD中間結果寫入磁盤,
由于血緣依賴過長會造成容錯成本過高,這樣就不如在中間階段做檢查點容錯,如果檢查點之后出現問題,可以從檢查點開始重做血緣,減少了開銷,
對RDD進行checkpoint操作并不會馬上被執行,必須執行action操作才能觸發,
6: 快取和檢查點的區別
1:cache快取只是將資料保存起來,不切斷血緣依賴,checkpoint檢查點切斷血緣依賴,
2:cache快取的資料通常存盤在磁盤、記憶體等地方,可靠性低,checkpoint的資料通常存盤在hdfs等容錯、高可用的檔案系統,可靠性高,
3:建議對checkpoin的rdd使用cache快取,這樣checkpoint的job只需從cache快取中讀取資料即可,否則需要再從頭計算一次rdd
cache:將資料臨時存盤在記憶體中進行資料重用
會在血緣關系中添加新的依賴,一旦出現問題,可以重頭讀取資料,
persist:將資料臨時存盤在磁盤檔案中進行資料重用
涉及到磁盤IO,性能較低,但是資料安全
如果作業執行完畢,臨時保存的資料檔案就會丟失
checkpoint:將資料長久的保存在磁盤檔案中進行資料重用
涉及到磁盤IO,性能較低,但是資料安全
為了保證資料安全,所以一般情況下,會獨立執行作業
為了能夠提高效率,一般情況下,是需要和cache聯合使用
執行程序中,會切斷血緣關系,重新建立新的血緣關系,因為保存的資料比較安全,所以就是資料源的保存地址發生了改變,導致血緣關系發生改變,
三:磁區器
1:自定義磁區器:根據設定的規則,將同一規則的資料放在同一磁區內
package com.atguigu.bigdata.spark.rdd.part
import org.apache.spark.{Partitioner, SparkConf, SparkContext}
object Spark01_RDD_Part {
def main(args: Array[String]): Unit = {
val sparkConf = new SparkConf().setMaster("local[*]").setAppName("RDD")
val sc = new SparkContext(sparkConf)
val rdd = sc.makeRDD(List(
("nba","************"),
("cba","************"),
("wnba","************"),
("nba","************")
),3)
val value = rdd.partitionBy(new MyPartitioner)
value.saveAsTextFile("output")
sc.stop()
}
class MyPartitioner extends Partitioner{
//磁區數量
override def numPartitions: Int = 3
//根據資料的key值,回傳資料的磁區索引(從0開始)
override def getPartition(key: Any): Int = {
key match {
case "nba" => 0
case "wnba" => 1
case _ => 2
}
}
}
}
四:檔案的讀取與保存
1:保存
package com.atguigu.bigdata.spark.rdd.IO
import org.apache.spark.{SparkConf, SparkContext}
object Spark01_RDD_IO_Save {
def main(args: Array[String]): Unit = {
val sparkConf = new SparkConf().setMaster("local[*]").setAppName("RDD")
val sc = new SparkContext(sparkConf)
val rdd = sc.makeRDD(
List(
("a",1),
("b",2),
("c",3)
)
)
rdd.saveAsTextFile("output1")
rdd.saveAsObjectFile("output2")
rdd.saveAsSequenceFile("output3")
sc.stop()
}
}
2:讀取
package com.atguigu.bigdata.spark.rdd.IO
import org.apache.spark.{SparkConf, SparkContext}
object Spark02_RDD_IO_Load {
def main(args: Array[String]): Unit = {
val sparkConf = new SparkConf().setMaster("local[*]").setAppName("RDD")
val sc = new SparkContext(sparkConf)
val rdd = sc.textFile("output1")
println(rdd.collect().mkString(","))
val rdd1 = sc.objectFile[(String,Int)]("output2")
println(rdd1.collect().mkString(","))
val rdd2 = sc.sequenceFile[String,Int]("output3")
println(rdd2.collect().mkString(","))
sc.stop()
}
}
五:資料結構--累加器(分布式的共享只寫變數)
1:概念
累加器用來將executor端變數資訊聚合到driver端,在driver程式中定義的變數,在executor端的每個task都會得到這個變數的一份新的副本,每個task更新這些副本的值后,傳回driver端進行merge

package com.atguigu.bigdata.spark.acc
import org.apache.spark.{SparkConf, SparkContext}
object Spark01_Acc {
def main(args: Array[String]): Unit = {
val sparkConf = new SparkConf().setMaster("local[*]").setAppName("RDD")
val sc = new SparkContext(sparkConf)
val rdd = sc.makeRDD(List(1,2,3,4))
//獲取系統累加器,spark默認提供了簡單資料聚合的累加器
val sumAcc = sc.longAccumulator("sum")
rdd.foreach(
num => {
sumAcc.add(num)
}
)
println(sumAcc.value)
sc.stop()
}
}
2:累加器的少加和多加
package com.atguigu.bigdata.spark.acc
import org.apache.spark.{SparkConf, SparkContext}
object Spark02_Acc {
def main(args: Array[String]): Unit = {
val sparkConf = new SparkConf().setMaster("local[*]").setAppName("RDD")
val sc = new SparkContext(sparkConf)
val rdd = sc.makeRDD(List(1,2,3,4))
//獲取系統累加器,spark默認提供了簡單資料聚合的累加器
val sumAcc = sc.longAccumulator("sum")
val mapRDD = rdd.map(
num => {
sumAcc.add(num)
num
}
)
//少加:轉換算子中呼叫累加器,如果沒有行動算子的話,那么不會執行
mapRDD.collect()
mapRDD.collect()
//多加:多次執行
println(sumAcc.value)
sc.stop()
}
}
3:自定義累加器
package com.atguigu.bigdata.spark.acc
import org.apache.spark.util.AccumulatorV2
import org.apache.spark.{SparkConf, SparkContext}
object Spark03_Acc {
def main(args: Array[String]): Unit = {
val sparkConf = new SparkConf().setMaster("local[*]").setAppName("RDD")
val sc = new SparkContext(sparkConf)
val rdd = sc.makeRDD(List("hello","spark","hello"))
//累加器:word count
//創建累加器物件
val wcAcc = new MyAccumulator()
//向spark進行注冊
sc.register(wcAcc,"wordCountAcc")
rdd.foreach(
word => {
//資料的累加(使用累加器)
wcAcc.add(word)
}
)
println(wcAcc.value)
sc.stop()
}
/*
自定義累加器
1.繼承:AccumulatorV2 定義泛型
IN:累加器輸入的資料型別
OUT:累加器回傳的資料型別
2.重寫方法
*/
class MyAccumulator extends AccumulatorV2[String,Map[String,Long]] {
private var wcMap = Map[String,Long]()
//判斷是否初始狀態
override def isZero: Boolean = {
wcMap.isEmpty
}
override def copy(): AccumulatorV2[String, Map[String, Long]] = {
new MyAccumulator()
}
override def reset(): Unit = {
wcMap.clear()
}
//獲取累加器需要計算的值
override def add(word: String): Unit = {
val newCnt = wcMap.getOrElse(word,0L) + 1
wcMap.updated(word,newCnt)
}
//driver合并多個累加器
override def merge(other: AccumulatorV2[String, Map[String, Long]]): Unit = {
val map1 = this.wcMap
val map2 = other.value
map2.foreach{
case (word,count) => {
val newCount = map1.getOrElse(word,0L) + count
map1.updated(word,newCount)
}
}
}
//累加器結果
override def value: Map[String, Long] = {
wcMap
}
}
}
六:廣播變數
Task的量,是由driver的磁區數決定的,和executor的個數無關

轉換為

只能訪問不能修改
spark中的廣播變數就可以將閉包的資料保存到executor的記憶體中,不能進行更改,
package com.atguigu.bigdata.spark.acc
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.util.AccumulatorV2
import scala.collection.mutable
object Spark04_Bc {
def main(args: Array[String]): Unit = {
val sparkConf = new SparkConf().setMaster("local[*]").setAppName("RDD")
val sc = new SparkContext(sparkConf)
val rdd1 = sc.makeRDD(List(
("a",1),("b",2),("c",3)
))
/*val rdd2 = sc.makeRDD(List(
("a",4),("b",5),("c",6)
))
//join會導致資料量幾何增長,并且會影響shuffle大的性能,不推薦使用
val value:RDD[(String,(Int,Int))] = rdd1.join(rdd2)
value.collect().foreach(println)*/
/*val map = mutable.Map(("a",4),("b",5),("c",6))
rdd1.map{
case (w,c) => {
val l:Int = map.getOrElse(w,0)
(w,(c,l))
}
}.collect().foreach(println)*/
val map = mutable.Map(("a",4),("b",5),("c",6))
//封裝廣播變數
val bc:Broadcast[mutable.Map[String,Int]] = sc.broadcast(map)
rdd1.map{
case (w,c) => {
//訪問廣播變數
val l:Int = bc.value.getOrElse(w,0)
(w,(c,l))
}
}.collect().foreach(println)
sc.stop()
}
}
轉載請註明出處,本文鏈接:https://www.uj5u.com/qita/425056.html
標籤:其他
上一篇:云計算——讓學習更輕松
下一篇:泉州買房小指南
