Day1111
Spark任務調度
Spark幾個重要組件
Spark Core
RDD的概念和特性
生成RDD的兩種型別
RDD算子的兩種型別
算子練習
磁區
RDD的依賴關系
DAG:有向無環圖
任務提交
快取
checkPoint
自定義排序
自定義磁區器
自定義累加器
廣播變數
Spark Shuffle程序
Spark優化程序
SparkSQL
集成Hive
一.Spark Core
1 Spark任務調度:
|->:standalone
|->:local
|->:Yarn
|->:Mesos
2 Spark幾個重要的組件
|->:Master:管理Worker,負責接收Driver發送的注冊資訊(任務資訊)
|->:Worker:負責本節點資源和任務的管理,啟動Exector行程
|->:Exector:負責計算任務
|->:Driver:用來提交任務(SparkSubmit行程)
3 Spark Core: RDD的概念和特性
資料的描述
1):一組分片(Partition),即資料集的基本組成單位,對于RDD來說,每個分片都會被一個計算任務處理,并決定并行計算的粒度,用戶可以在創建RDD時指定RDD的分片個數,如果沒有指定,那么就會采用默認值,默認值就是程式所分配到的CPU Core的數目,
2):一個計算每個磁區的函式,Spark中RDD的計算是以分片為單位的,每個RDD都會實作compute函式以達到這個目的,compute函式會對迭代器進行復合,不需要保存每次計算的結果,
3):RDD之間的依賴關系,RDD的每次轉換都會生成一個新的RDD,所以RDD之間就會形成類似于流水線一樣的前后依賴關系,在部分磁區資料丟失時,Spark可以通過這個依賴關系重新計算丟失的磁區資料,而不是對RDD的所有磁區進行重新計算,
4):一個Partitioner,即RDD的分片函式,當前Spark中實作了兩種型別的分片函式,一個是基于哈希的HashPartitioner,另外一個是基于范圍的RangePartitioner,只有對于key-value的RDD,才會有Partitioner,非key-value的RDD的Parititioner的值是None,Partitioner函式不但決定了RDD本身的分片數量,也決定了parent RDD Shuffle輸出時的分片數量,
5):一個串列,存盤存取每個Partition的優先位置(preferred location),對于一個HDFS檔案來說,這個串列保存的就是每個Partition所在的塊的位置,按照“移動資料不如移動計算”的理念,Spark在進行任務調度的時候,會盡可能地將計算任務分配到其所要處理資料塊的存盤位置,
基本特性:可磁區,函式,依賴,磁區器,就近原則
RDD的彈性
1): 自動進行記憶體和磁盤資料存盤的切換
Spark優先把資料放到記憶體中,如果記憶體放不下,就會放到磁盤里面,程式進行自動的存盤切換
2): 基于血統的高效容錯機制
在RDD進行轉換和動作的時候,會形成RDD的Lineage依賴鏈,當某一個RDD失效的時候,可以通過重新計算上游的RDD來重新生成丟失的RDD資料,
3): Task如果失敗會自動進行特定次數的重試
RDD的計算任務如果運行失敗,會自動進行任務的重新計算,默認次數是4次,
4): Stage如果失敗會自動進行特定次數的重試
如果Job的某個Stage階段計算失敗,框架也會自動進行任務的重新計算,默認次數也是4次,
5): Checkpoint和Persist可主動或被動觸發
RDD可以通過Persist持久化將RDD快取到記憶體或者磁盤,當再次用到該RDD時直接讀取就行,也可以將RDD進行檢查點,檢查點會將資料存盤在HDFS中,該RDD的所有父RDD依賴都會被移除,
6): 資料調度彈性
Spark把這個JOB執行模型抽象為通用的有向無環圖DAG,可以將多Stage的任務串聯或并行執行,調度引擎自動處理Stage的失敗以及Task的失敗,
7): 資料分片的高度彈性
可以根據業務的特征,動態調整資料分片的個數,提升整體的應用執行效率,
RDD全稱叫做彈性分布式資料集(Resilient Distributed Datasets):它是一種分布式的記憶體抽象,表示一個只讀的記錄磁區的集合,它只能通過其他RDD轉換而創建,為此,RDD支持豐富的轉換操作(如map, join, filter, groupBy等),通過這種轉換操作,新的RDD則包含了如何從其他RDDs衍生所必需的資訊,所以說RDDs之間是有依賴關系的,基于RDDs之間的依賴,RDDs會形成一個有向無環圖DAG,該DAG描述了整個流式計算的流程,實際執行的時候,RDD是通過血緣關系(Lineage)一氣呵成的,即使出現資料磁區丟失,也可以通過血緣關系重建磁區,總結起來,基于RDD的流式計算任務可描述為:從穩定的物理存盤(如分布式檔案系統)中加載記錄,記錄被傳入由一組確定性操作構成的DAG,然后寫回穩定存盤,另外RDD還可以將資料集快取到記憶體中,使得在多個操作之間可以重用資料集,基于這個特點可以很方便地構建迭代型應用(圖計算、機器學習等)或者互動式資料分析應用,可以說Spark最初也就是實作RDD的一個分布式系統,后面通過不斷發展壯大成為現在較為完善的大資料生態系統,簡單來講,Spark-RDD的關系類似于Hadoop-MapReduce關系,
4 生成RDD的兩種型別
1:從集合中創建RDD
val conf = new SparkConf().setAppName("Test").setMaster("local")
val sc = new SparkContext(conf)
//這兩個方法都有第二引數是一個默認值2 分片數量(partition的數量)
//scala集合通過makeRDD創建RDD,底層實作也是parallelize
val rdd1 = sc.makeRDD(Array(1,2,3,4,5,6))
//scala集合通過parallelize創建RDD
val rdd2 = sc.parallelize(Array(1,2,3,4,5,6))
2:從外部存盤創建RDD
al rdd3 = sc.textFile("hdfs://hadoop01:8020/word.txt")
5 RDD算子的兩種型別
|->:transformation算子:轉化成新RDD
|->:Action算子:轉化成非RDD
6 算子練習
|->迭代型別算子:map,flatMap,mapPartitions,foreach,foreachPartition...
|->shuffle類算子:
|->byKey:groupBy,reduceByKey(不一定),groupByKey,sortBy,SortByKey...
|->重磁區算子:repartition(必然發生shuffle),colaesce(不一定,多磁區變少磁區不需要發生shuffle),partitionBy(發生shuffle),repartitionAndSortWithinPartitions
|->join類算子:join(不一定),fullOuterJoi,leftOuterJoin,rightOuterJoin
|->去重類算子:distinct,countApproxDistinct(回傳去重的個數)
|->聚合類算子:reduce,reduceByKey,aggregate,aggregateByKey,fold,foldByKey,combineByKey,combineByKey,countByKey,countByValue
|->排序類算子:sortBy,sortByKey
優化:
1.map,mapPartition優化:一定要分資料量和對應的物力資源來確定到底使用哪個算子
資料量 | map | mapPartition
| 每個元素 | 每個磁區
--------------------------------------
比較大 | | 優先選擇
海量資料 | 優先選擇 | 可能發生OOM
2.foreach,foreachPartition優化:需要考慮到持久化時能夠承受的連接數
場景 | foreach | foreachPartition
| 每個元素 | 每個磁區
---------------------------------------------------------
連接資料庫 | 每個元素對應一個連接 | 優先選擇(一個磁區對應一個連接)
海量資料 | 優先選擇 | 可能發生OOM
3.groupByKey,reduceByKey:如果能用reduceByKey解決的需求就用reduceByKey
場景 | groupByKey | reduceByKey(區域聚合)
---------------------------------------------------------
| | 優先選擇
4.join+filter(過濾):為了避免join程序產生很大的資料集的情況,可以先filter再join
filter:過濾后再計算可能發生嚴重的資料傾斜,可以在過濾后先調整
5.序列化調優:
:RDD在計算程序中,呼叫的算子和傳入算子的函式都是在Executor端執行,除此之外都是在Driver端執行的
class SearchFunction(val query: String) extends Serializable {
//第一個方法是判斷輸入的字串是否存在query 存在回傳true,不存在回傳false
def isMatch(s: String): Boolean = {
s.contains(query)
}
// 問題:"isMatch"表示"this.isMatch",因此我們要傳遞整個"this"
def getMatchFunctionReference(rdd: RDD[String]): RDD[String] = rdd.filter(x => this.isMatch(x))// 等價于:rdd.filter(isMatch)
// 問題:"query"表示"this.query",因此我們要傳遞整個"this"
def getMatchesFieldReference(rdd: RDD[String]): RDD[String] = rdd.filter(x => x.contains(this.query))
// 安全:只把我們需要的欄位拿出來放入區域變數中
def getMatchesNoReference(rdd: RDD[String]): RDD[String] = {
val _query = this.query
rdd.filter(x => x.contains(_query))
}
}
object SearchFunctions {
def main(args: Array[String]): Unit = {
val conf = new SparkConf().setAppName(SearchFunctions.getClass.getName).setMaster("local[2]")
val sc = new SparkContext(conf)
val rdd = sc.parallelize(List("hello java", "hello scala hello", "hello hello"))
val sf = new SearchFunction("hello")
sf.getMatchFunctionReference(rdd).foreach(println)
sf.getMatchesFieldReference(rdd).foreach(println)
sf.getMatchesNoReference(rdd).foreach(println)
sc.stop()
}
}
class Rules extends Serializable {
val rulesMap = Map("xiaoli" -> 23, "xiaoming" -> 26)
}
object ObjectRules extends Serializable {
val rulesMap = Map("jack" -> 27, "lucy" -> 22)
}
object SerializeTest_1 {
def main(args: Array[String]): Unit = {
val conf = SparkUtil.getSparkConf
val sc = new SparkContext(conf)
val lines = sc.parallelize(Array("xiaolv", "xiaohong", "xiaoming"))
//map方法中的函式是在Executor的某個Task中執行的
val res = lines.map(x => {
val rules = new Rules
val hostname = InetAddress.getLocalHost.getHostName
val threadName = Thread.currentThread().getName
(hostname, threadName, rules.rulesMap.getOrElse(x, 0), rules.toString)
})
println(res.collect.toBuffer)
/*
ArrayBuffer(
(localhost,Executor task launch worker for task 0,0,cn.qf.streaming.day01.test.Rules@5c3d762c),
(localhost,Executor task launch worker for task 1,0,cn.qf.streaming.day01.test.Rules@736d5f3b),
(localhost,Executor task launch worker for task 1,26,cn.qf.streaming.day01.test.Rules@374cd5ba))
*/
sc.stop()
}
}
object SerializeTest_2 {
def main(args: Array[String]): Unit = {
val conf = SparkUtil.getSparkConf
val sc = new SparkContext(conf)
val lines = sc.parallelize(Array("xiaolv", "xiaohong", "xiaoming"))
//該物件在Driver中創建
val rules = new Rules
//map方法中的函式是在Executor的某個Task中執行的
val res = lines.map(x => {
val hostname = InetAddress.getLocalHost.getHostName
val threadName = Thread.currentThread().getName
(hostname, threadName, rules.rulesMap.getOrElse(x, 0), rules.toString)
})
println(res.collect.toBuffer)
/*
ArrayBuffer(
(localhost,Executor task launch worker for task 0,0,cn.qf.streaming.day01.test.Rules@48158406),
(localhost,Executor task launch worker for task 1,0,cn.qf.streaming.day01.test.Rules@a287af2),
(localhost,Executor task launch worker for task 1,26,cn.qf.streaming.day01.test.Rules@a287af2))
*/
sc.stop()
}
}
object SerializeTest_3 {
def main(args: Array[String]): Unit = {
val conf = SparkUtil.getSparkConf
val sc = new SparkContext(conf)
val lines = sc.parallelize(Array("xiaolv", "xiaohong", "xiaoming"))
//該物件在Driver中創建單例物件
val rules = ObjectRules
//map方法中的函式是在Executor的某個Task中執行的
val res = lines.map(x => {
val hostname = InetAddress.getLocalHost.getHostName
val threadName = Thread.currentThread().getName
(hostname, threadName, rules.rulesMap.getOrElse(x, 0), rules.toString)
})
println(res.collect.toBuffer)
/*
ArrayBuffer(
(localhost,Executor task launch worker for task 0,0,cn.qf.streaming.day01.test.ObjectRules$@543e593),
(localhost,Executor task launch worker for task 1,0,cn.qf.streaming.day01.test.ObjectRules$@543e593),
(localhost,Executor task launch worker for task 1,0,cn.qf.streaming.day01.test.ObjectRules$@543e593))
*/
sc.stop()
}
}
object SerializeTest_4 {
def main(args: Array[String]): Unit = {
val conf = SparkUtil.getSparkConf
val sc = new SparkContext(conf)
val lines = sc.parallelize(Array("xiaolv", "xiaohong", "xiaoming"))
//該物件在Driver中創建單例物件
//map方法中的函式是在Executor的某個Task中執行的
val res = lines.map(x => {
val hostname = InetAddress.getLocalHost.getHostName
val threadName = Thread.currentThread().getName
/*
不用在Driver端去創建物件,Rules不用實作序列化
*/
(hostname, threadName, ObjectRules.rulesMap.getOrElse(x, 0), ObjectRules.toString)
})
println(res.collect.toBuffer)
/*
ArrayBuffer(
(localhost,Executor task launch worker for task 0,0,cn.qf.streaming.day01.test.ObjectRules$@2539fca6),
(localhost,Executor task launch worker for task 1,0,cn.qf.streaming.day01.test.ObjectRules$@2539fca6),
(localhost,Executor task launch worker for task 1,0,cn.qf.streaming.day01.test.ObjectRules$@2539fca6))
*/
sc.stop()
}
}
7 磁區
textFile分片程序:由指定的cpu核數+指定的磁區數+block塊的大小+檔案的個數,經過分片演算法得到最終的磁區數
8 RDD的依賴關系
|->寬依賴:一對多 一個父RDD磁區會被多個子RDD使用
|->窄依賴:一對一,多對一
|->為什么區分寬窄依賴:
|->1:有寬窄依賴就可以進行相應的容錯
|->2:寬依賴決定了stage的劃分的依據
9 DAG
為什么劃分stage:主要是為了生成task,stage劃分程序實際上就將rdd的依賴按照shuffle來分為一個到多個的范圍,task執行程序根本不會跨stage
task數量 = stage數量 * 磁區數(注:前提是沒有手動更改磁區數)
如果手動更改磁區數,該stage的task資料由最后的磁區數決定的
轉載請註明出處,本文鏈接:https://www.uj5u.com/houduan/185256.html
標籤:Scala
