Spark介紹
什么是Spark
- 專為大規模資料處理而設計的快速通用的計算引擎
- 類 Hadoop MapReduce 的通用并行計算框架
- 擁有 Hadoop MapReduce 所具有的優點
- 但不同于 MapReduce 的是 Job 中間輸出結果可以快取在記憶體中,從而不再需要讀寫 HDFS ,減少磁盤資料互動
- 因此 Spark 能更好地適用于資料挖掘與機器學習等需要迭代的演算法
- Spark 是 Scala 撰寫,方便快速編程
Spark與MR的區別
- 都是分布式計算框架, Spark 計算中間結果基于記憶體快取, MapReduce 基于 HDFS 存盤
- 也正因此,Spark 處理資料的能力一般是 MR 的三到五倍以上
- Spark 中除了基于記憶體計算這一個計算快的原因,還有 DAG(DAGShecdule) 有向無環圖來切分任務的執行先后順序
Spark API
- 多種編程語言的支持: Scala,Java,Python,R,SQL
Spark運行模式
-
Local
- 多用于本地測驗,如在 eclipse , idea 中寫程式測驗等,
-
Standalone
- Standalone 是 Spark 自帶的一個資源調度框架,它支持完全分布式,
-
Yarn
- Hadoop 生態圈里面的一個資源調度框架, Spark 也是可以基于 Yarn 來計算的,
-
Mesos
- 資源調度框架,
SparkCore
Partition
-
概念
- Spark RDD 是一種分布式的資料集,由于資料量很大,因此要它被切分并存盤在各個結點的磁區當中
- RDD(Resilient Distributed Dataset)是其最基本的抽象資料集,其中每個RDD是由若干個Partition組成
-
磁區方式
-
HashPartitioner(哈希磁區)
-
Hash磁區
-
HashPartitioner采用哈希的方式對<Key,Value>鍵值對資料進行磁區,
-
其資料磁區規則為 partitionId = Key.hashCode % numPartitions
- partitionId代表該Key對應的鍵值對資料應當分配到的Partition標識
- Key.hashCode表示該Key的哈希值
- numPartitions表示包含的Partition個數,
-
-
RangePartitioner(范圍磁區)
- 范圍磁區
- Spark引入RangePartitioner的目的是為了解決HashPartitioner所帶來的磁區傾斜問題,也即磁區中包含的資料量不均衡問題,
- HashPartitioner采用哈希的方式將同一型別的Key分配到同一個Partition中,當某幾種型別資料量較多時,就會造成若干Partition中包含的資料過大
- 在Job執行程序中,一個Partition對應一個Task,此時就會使得某幾個Task運行過慢,
- RangePartitioner基于抽樣的思想來對資料進行磁區
-
-
HDFS與Partition
-
block
- hdfs中的block是分布式存盤的最小單元
- 類似于盛放檔案的盒子,一個檔案可能要占多個盒子,但一個盒子里的內容只可能來自同一份檔案
-
partition
- spark中的partition 是彈性分布式資料集RDD的最小單元
- RDD是由分布在各個節點上的partition組成的
- spark在計算程序中,生成的資料在計算空間內最小單元
-
區別
- block位于存盤空間、partition 位于計算空間,
- block的大小是固定的、partition 大小是不固定的,
- block是有冗余的、不會輕易丟失,partition(RDD)沒有冗余設計、丟失之后重新計算得到.
-
Spark從HDFS讀入檔案的磁區數默認等于HDFS檔案的塊數(blocks),HDFS中的block是分布式存盤的最小單元
-
RDD
-
名詞解釋
- RDD(Resilient Distributed Dataset) 彈性分布式資料集,
-
RDD五大屬性
- 上圖
-
RDD流程圖
-
注意
-
textFile 方法底層封裝的是 MR 讀取檔案的方式,讀取檔案之前先進行 split 切片,默認 split大小是一個 block 大小,
-
RDD 實際上不存盤資料,這里方便理解,暫時理解為存盤資料,
-
什么是 K,V格式的RDD ?
- 如果 RDD 里面存盤的資料都是二元組物件,那么這個 RDD 我們就叫做 K,V格式的RDD
-
哪里體現 RDD 的彈性(容錯)?
- partition 數量,大小沒有限制,體現了 RDD 的彈性,
- RDD 之間依賴關系,可以基于上一個 RDD 重新計算出 RDD
-
哪里體現 RDD 的分布式?
- RDD 是由 Partition 組成, partition 是分布在不同節點上的,
- RDD 提供計算最佳位置,體現了資料本地化,體現了大資料中“計算移動資料不移動”的理念,
-
-
Lineage血統
- RDD 的最重要的特性之一就是血緣關系(Lineage ),它描述了一個 RDD 是如何從父 RDD 計算得來的
- 如果某個 RDD 丟失了,則可以根據血緣關系,從父 RDD 計算得來
系統架構
- Master ( standalone 模式):資源管理的主節點(行程),
- Cluster Manager :在集群上獲取資源的外部服務(例如: standalone ; yarn ; mesos ),
- Worker ( standalone 模式):資源管理的從節點(行程)或者說是是管理本機資源的行程,
- Application :基于 Spark 的用戶程式,包含 driver 程式和運行在集群上的 executor 程式,即一個完整的 spark 應用 ,
- Dirver ( program ):用來連接作業行程( worker )的程式 ,
- Executor :是在一個 worker 行程所管理的節點上為某 Application 啟動的一個個行程,這個行程負責運行任務,并且負責將資料存在記憶體或者磁盤上,每個應用之間都有各自獨立的executors ,
- Task :被發送到 executor 上的作業單元,
- Job :包含很多任務( Task )的并行計算,和 action 算子對應,
- Stage :一個 job 會被拆分成很多組任務,每組任務被稱為 Stage (就像 MapReduce 分為MapTask 和 ReduceTask 一樣),
算子(單檔案)
轉換算子
-
概念
- Transformations 類算子叫做轉換算子(本質就是函式), Transformations 算子是延遲執行,也叫懶加載執行,
-
常見 Transformation 類算子
-
filter :過濾符合條件的記錄數, true 保留, false 過濾掉,
-
map :將一個 RDD 中的每個資料項,通過 map 中的函式映射變為一個新的元素,特點:輸入一條,輸出一條資料,
-
flatMap :先 map 后 flat ,與 map 類似,每個輸入項可以映射為0到多個輸出項,
-
sample 隨機抽樣算子,根據傳進去的小數按比例進行有放回或者無放回的抽樣,
-
reduceByKey 將相同的 Key 根據相應的邏輯進行處理,
-
reduceByKey有一個操作combiner合并
當有多個磁區的時候會先將每個磁區先進行reduceByKey再將結果傳出
所以在TOPN練習中reduceByKey((x,y))按理說x為sum總數,y為1但當有多個磁區的時候y就會等于該磁區的累加和
-
-
sortByKey / sortBy 作用在 K,V格式的RDD 上,對 key 進行升序或者降序排序,
-
行動算子
-
概念
- Action 類算子叫做行動算子, Action 類算子是觸發執行,
- 一個 application 應用程式中有幾個 Action 類算子執行,就有幾個 job 運行,
-
常見 Action 類算子
- count :回傳資料集中的元素數,會在結果計算完成后回收到 Driver 端,
- take(n) :回傳一個包含資料集前 n 個元素的集合,
- first :效果等同于 take(1) ,回傳資料集中的第一個元素,
- foreach :回圈遍歷資料集中的每個元素,運行相應的邏輯,
- collect :將計算結果回收到 Driver 端,
控制算子
-
概念
- 將 RDD 持久化,持久化的單位是 partition ,
- 控制算子有三種, cache , persist , checkpoint , cache 和 persist 都是懶執行的,必須有一個 action 類算子觸發執行,
- checkpoint 算子不僅能將 RDD 持久化到磁盤,還能切斷 RDD 之間的依賴關系,
-
cache
- 默認將 RDD 的資料持久化到記憶體中, cache 是懶執行,
- cache() = persist() = persist(StorageLevel.Memory_Only)
- rdd.cache().count() 回傳的不是持久化的RDD,而是一個count的數值
-
persist
-
可以指定持久化的級別,最常用的是 MEMORY_ONLY 和 MEMORY_AND_DISK ,
-
持久化級別如下
-
cache是persist的特例,MEMORY_ONLY就是cache
-
-
checkpoint
-
checkpoint 將 RDD 持久化到磁盤,還可以切斷 RDD 之間的依賴關系,也是懶執行,
-
執行原理
- 當 RDD 的 job 執行完畢后,會從 finalRDD 從后往前回溯,
- 當回溯到某一個 RDD 呼叫了 checkpoint 方法,會對當前的 RDD 做一個標記,
- Spark 框架會自動啟動一個新的 job ,重新計算這個 RDD 的資料,將資料持久化到Checkpint目錄中,
-
使用 checkpoint 時常用優化手段
- 對 RDD 執行 checkpoint 之前,最好對這個 RDD 先執行 cache
- 這樣新啟動的 job 只需要將記憶體中的資料拷貝到Checkpint目錄中就可以,省去了重新計算這一步,
- 但是缺點是:多花費一倍的記憶體空間
-
任務提交方式
Standalone-client
-
提交命令
-
執行流程
- client 模式提交任務后,會在客戶端啟動 Driver 行程,
- Driver 會向 Master 申請啟動 Application 啟動的資源,資源申請成功,
- Driver 端將 task 分發到 worker 端執行,啟動 executor 行程(任務的分發),
- Worker 端( exectuor 行程)將 task 執行結果回傳到 Driver 端(任務結果的回收)
-
總結
- client 模式適用于測驗除錯程式, Driver 行程是在客戶端啟動的,這里的客戶端就是指提交應用程式的當前節點,在 Driver 端可以看到 task 執行的情況,
- 生產環境下不能使用 client 模式,是因為:假設要提交100個 application 到集群運行,Driver 每次都會在 client 端啟動,那么就會導致客戶端100次網卡流量暴增的問題,
Standalone-cluster
-
提交命令
-
執行流程
- cluster 模式提交應用程式后,會向 Master 請求啟動 Driver ,
- Master 接受請求,隨機在集群一臺節點啟動 Driver 行程,
- Driver 啟動后為當前的應用程式申請資源,
- Driver 端發送 task 到 worker 節點上執行(任務的分發),
- worker 上的 executor 行程將執行情況和執行結果回傳給 Driver 端(任務結果的回收),
-
總結
-
Standalone-cluster 提交方式,應用程式使用的所有 jar 包和檔案,必須保證所有的
worker 節點都要有,因為此種方式, spark 不會自動上傳包 -
解決方案
- 將所有的依賴包和檔案打到同一個包中,然后放在 hdfs 上,
- 將所有的依賴包和檔案各放一份在 worker 節點上,
-
yarn-client
-
提交命令
-
執行流程
- 客戶端提交一個 Application ,在客戶端啟動一個 Driver 行程,
- 應用程式啟動后會向 RS ( ResourceManager )(相當于 standalone 模式下的 master 行程)發送請求,啟動 AM ( ApplicationMaster ),
- RS 收到請求,隨機選擇一臺 NM ( NodeManager )啟動 AM ,這里的 NM 相當于 Standalone 中的 Worker 行程,
- AM 啟動后,會向 RS 請求一批 container 資源,用于啟動 Executor ,
- RS 會找到一批 NM (包含 container )回傳給 AM ,用于啟動 Executor ,
- AM 會向 NM 發送命令啟動 Executor ,
- Executor 啟動后,會反向注冊給 Driver , Driver 發送 task 到 Executor ,執行情況和結果回傳給 Driver 端,
-
總結
-
Yarn-client 模式同樣是適用于測驗,因為 Driver 運行在本地, Driver 會與 yarn 集群中的 Executor 進行大量的通信
-
ApplicationMaster (executorLauncher)的在此模式中的作用:
- 為當前的 Application 申請資源
- 給 NodeManager 發送訊息啟動 Executor ,
- 注意: ApplicationMaster 在此種模式下沒有作業調度的功能,
-
yarn-cluster
-
提交命令
-
執行流程
- 客戶機提交 Application 應用程式,發送請求到 RS ( ResourceManager ),請求啟動AM ( ApplicationMaster ),
- RS 收到請求后隨機在一臺 NM ( NodeManager )上啟動 AM (相當于 Driver 端),
- AM 啟動, AM 發送請求到 RS ,請求一批 container 用于啟動 Excutor ,
- RS 回傳一批 NM 節點給 AM ,
- AM 連接到 NM ,發送請求到 NM 啟動 Excutor ,
- Excutor 反向注冊到 AM 所在的節點的 Driver , Driver 發送 task 到 Excutor ,
-
總結
-
Yarn-Cluster 主要用于生產環境中因為 Driver 運行在 Yarn 集群中某一臺 nodeManager中,每次提交任務的 Driver 所在的機器都是不再是提交任務的客戶端機器,而是多個 NM 節點中的一臺不會產生某一臺機器網卡流量激增的現象但同樣也有缺點,任務提交后不能看到日志,只能通過 yarn 查看日志
-
ApplicationMaster 在此模式中的的作用
- 為當前的 Application 申請資源
- 給 NodeManger 發送訊息啟動 Executor ,
- 任務調度,
-
算子(多檔案)
轉換算子
-
轉換算子join
- leftOuterJoin
- rightOuterJoin
- fullOuterJoin
- 這些 join 都是作用在 K,V 格式的 RDD 上,根據 key 值進行連接,例如: (K,V)join(K,W)回傳(K,(V,W))
- 注意: join 后的磁區數與父RDD磁區數多的那一個相同,
-
union
- 合并兩個資料集,兩個資料集的型別要一致,
- 回傳新的 RDD 的磁區數是合并 RDD 磁區數的總和,
-
intersection
- 取兩個資料集的交集,
- 注意: join 后的磁區數與父RDD磁區數多的那一個相同,
-
subtract
- 取兩個資料集的差集,
- 新磁區數等于前面那個RDD的磁區數
-
mapPartitions
- mapPartition與 map 類似,單位是每個 partition 上的資料,
- Map遍歷的是每個元素
mapPartition是按磁區遍歷,進迭代器回傳迭代器 - 磁區數不變
-
distinct(map+reduceByKey+map)
- 對 RDD 內資料去重,
- 會對整個物件匹配,K,V都相同才去重
- 磁區數不變
-
cogroup
- 當呼叫型別 (K,V) 和 (K,W) 的資料上時,回傳一個資料集 (K,(Iterable,Iterable))
- 全外連接,相同K會放在一起,若沒有迭代器回傳NIL
- 新RDD磁區數為父RDD磁區數多的那一個
行動算子
-
foreachPartition
- 遍歷的資料是每個 partition 的資料,
- 引數為迭代器
窄依賴和寬依賴
窄依賴
- 父 RDD 和子 RDD 的 partition 之間的關系是一對一的,或者父 RDD 和子 RDD 的 partition 關系是多對一的,不會有 shuffle 的產生
寬依賴
- 父 RDD 與子 RDD 的 partition 之間的關系是一對多,會有 shuffle 的產生,
寬窄依賴圖理解
Stage(階段)
圖解
簡介
-
stage
- 根據RDD依賴關系形成一個DAG有向無環圖
- 若父RDD和子RDD不需要shuffle(窄依賴)我們可以將它們連接在一起,減少資料的網路傳輸
-
pipeline
- 將窄依賴的RDD連接到一起,當前RDD鏈和其他RDD鏈不相關
- 子RDD鏈不必等父RDD全部執行完才開始執行
- 只需要等當前鏈的上一個task計算結果,當前task就可以執行
stage切割規則
-
從后往前,遇到寬依賴就切割 stage
- 1.從后向前推理,遇到寬依賴就斷開,遇到窄依賴就把當前的RDD加入到Stage中;
- 2.每個Stage里面的Task的數量是由該Stage中最后一個RDD的Partition數量決定的;
- 3.最后一個Stage里面的任務的型別是ResultTask,前面所有其他Stage里面的任務型別都是ShuffleMapTask;
- 4.代表當前Stage的算子一定是該Stage的最后一個計算步驟;
-
總結
- 由于spark中stage的劃分是根據shuffle來劃分的,而寬依賴必然有shuffle程序,因此可以說spark是根據寬窄依賴來劃分stage的,
stage計算模式
-
pipeline
-
管道計算模式, pipeline 只是一種計算思想、模式
-
在spark中pipeline是一個partition對應一個partition,所以在stage內部只有窄依賴
-
資料一直在管道里面什么時候資料會落地
- 對 RDD 進行持久化( cache , persist ),
- shuffle write 的時候,
-
-
Stage 的 task 并行度是由 stage 的最后一個 RDD 的磁區數來決定的
-
如何改變 RDD 的磁區數
- reduceByKey(XXX,3)
- GroupByKey(4)
- sc.textFile(path,numpartition)
-
-
使用算子時傳遞 磁區num引數 就是磁區 partition 的數量
Spark資源調度和任務調度
調度流程
-
啟動集群后, Worker 節點會向 Master 節點匯報資源情況, Master 掌握了集群資源情況,
-
當 Spark 提交一個 Application 后,根據 RDD 之間的依賴關系將 Application 形成一個 DAG 有向無環圖,
-
任務提交后, Spark 會在 Driver 端創建兩個物件: DAGScheduler 和 TaskScheduler ,DAGScheduler 是任務調度的高層調度器,是一個物件
-
DAGScheduler 的主要作用就是將 DAG 根據 RDD 之間的寬窄依賴關系劃分為一個個的 Stage ,然后將這些 Stage 以 TaskSet 的形式提交給 TaskScheduler ( TaskScheduler 是任務調度的低層調度器,這里 TaskSet 其實就是一個集合,里面封裝的就是一個個的 task 任務,也就是 stage 中的并行的 task 任務)
-
TaskSchedule 會遍歷 TaskSet 集合,拿到每個 task 后會將 task 發送到 Executor 中去執行(其實就是發送到 Executor 中的執行緒池 ThreadPool 去執行),
-
task 在 Executor 執行緒池中的運行情況會向 TaskScheduler 反饋,當 task 執行失敗時,則由TaskScheduler 負責重試,將 task 重新發送給 Executor 去執行,默認重試3次,如果重試3次依然失敗,那么這個 task 所在的 stage 就失敗了,
-
stage 失敗了則由 DAGScheduler 來負責重試,重新發送 TaskSet 到 TaskScheduler , Stage 默認重試4次,如果重試4次以后依然失敗,那么這個 job 就失敗了, job 失敗了, Application 就失敗了
-
TaskScheduler 不僅能重試失敗的 task ,還會重試 straggling (落后,緩慢) task ( 也就是執行速度比其他task慢太多的task ),如果有運行緩慢的 task 那么 TaskScheduler 會啟動一個新的task 來與這個運行緩慢的 task 執行相同的處理邏輯,兩個 task 哪個先執行完,就以哪個 task的執行結果為準,這就是 Spark 的推測執行機制,在 Spark 中推測執行默認是關閉的,推測執行可以通過 spark.speculation 屬性來配置
-
注意
- 對于 ETL 型別要入資料庫的業務要關閉推測執行機制,這樣就不會有重復的資料入庫,
- 如果遇到資料傾斜的情況,開啟推測執行則有可能導致一直會有 task 重新啟動處理相同的邏輯,任務可能一直處于處理不完的狀態,
-
流程圖解
粗細粒度資源申請
-
粗粒度資源申請(Spark)
- 在 Application 執行之前,將所有的資源申請完畢,當資源申請成功后,才會進行任務的調度,當所有的 task 執行完成后,才會釋放這部分資源,
- 優點:在 Application 執行之前,所有的資源都申請完畢,每一個 task 直接使用資源就可以了,不需要 task 在執行前自己去申請資源, task 啟動就快了, task 執行快了, stage 執行就快了,job 就快了, application 執行就快了,
- 缺點:直到最后一個 task 執行完成才會釋放資源,集群的資源無法充分利用,
-
細粒度資源申請(MR)
- Application 執行之前不需要先去申請資源,而是直接執行,讓 job 中的每一個 task 在執行前自己去申請資源, task 執行完成就釋放資源,
- 優點:集群的資源可以充分利用,
- 缺點: task 自己去申請資源, task 啟動變慢, Application 的運行就回應的變慢了,
算子(磁區)
轉換算子
-
mapPartitionsWithIndex
- 類似于 mapPartitions ,除此之外還會攜帶磁區的索引值,
- 引數為磁區索引和每個磁區的迭代器
-
repartition (重新磁區)
- 增加或減少磁區,此算子會產生 shuffle ,
-
coalesce
- 常用來減少磁區,算子中第二個引數是減少磁區的程序中是否產生 shuffle
- true 為產生 shuffle , false 不產生 shuffle ,默認是 false ,
- 如果 coalesce 設定的磁區數比原來的 RDD 的磁區數還多的話,第二個引數設定為 false 不會起作用(轉換之后磁區數大于之前),如果設定成 true ,效果和 repartition 一樣
- repartition(numPartitions) = coalesce(numPartitions,true)
-
groupByKey
- 作用在 K,V 格式的 RDD 上,根據 Key 進行分組,作用在 (K,V) ,回傳 (K,Iterable)
-
zip
- 合并RDD,將兩個 RDD 中的元素( KV格式/非KV格式 )變成一個 KV 格式的 RDD ,兩個 RDD 的個數必須相同,其中一個RDD的元素為K一個為V
-
zipWithIndex
- 該函式將 RDD 中的元素和這個元素在 RDD 中的索引號(從0開始)組合成 (K,V) 對
行動算子
-
countByKey
- 作用到 K,V 格式的 RDD 上,根據 Key 計數相同 Key 的資料集元素個數,
-
countByValue
- 根據資料集每個元素相同的內容來計數,回傳相同內容的元素對應的條數,
-
reduce
- 根據聚合邏輯聚合資料集中的每個元素,迭代
廣播變數和累加器
廣播變數
-
廣播變數理解圖
- 若每個Excutor中Task都需要用到某個變數,就需將變數傳到每一個Excutor中,則可使用廣播變數
-
廣播變數使用
-
注意事項
- 廣播變數只能在 Driver 端定義,不能在 Executor 端定義,
- 在 Driver 端可以修改廣播變數的值,在 Executor 端無法修改廣播變數的值,
累加器
-
累加器理解圖
- Driver端定義變數,將變數封裝到Task中發送到Executor,在Excutor端對變數進行累加,但dirver端變數的值并不會發生改變
- 可以用到累加器,Excutor端累加器加1,然后更新Driver端累加器
-
累加器的使用
-
注意事項
- 累加器在 Driver 端定義賦初始值,累加器只能在 Driver 端讀取,在 Excutor 端更新,
SparkShuffle
SparkShuffle概念
-
reduceByKey會將上一個RDD中的每一個key對應的所有value聚合成一個value,然后生成一個新的RDD,元素型別是<key,value>對的形式,這樣每一個key對應一個聚合起來的value
-
如何聚合
- Shuffle Write:上一個stage的每個map task就必須保證將自己處理的當前磁區的資料相同的key寫入一個磁區檔案中,可能會寫入多個不同的磁區檔案中,
- Shuffle Read:reduce task就會從上一個stage的所有task所在的機器上尋找屬于自己的那些磁區檔案,這樣就可以保證每一個key所對應的value都會匯聚到同一個節點上去處理和聚合,
-
Spark中有兩種Shuffle型別,HashShuffle和SortShuffle
- Spark1.2之前是HashShuffle
- Spark1.2引入SortShuffle
- spark2.0就只有sortshuffle,
HashShuffle
-
普通機制
-
執行流程
- 每一個map task將不同結果寫到不同的buffer中,每個buffer的大小為32K,buffer起到資料快取的作用,
- 每個buffer檔案最后對應一個磁盤小檔案,
- reduce task來拉取對應的磁盤小檔案,
-
總結
-
map task的計算結果會根據磁區器(默認是hashPartitioner)來決定寫入到哪一個磁盤小檔案中去,ReduceTask會去Map端拉取相應的磁盤小檔案,
-
產生的磁盤小檔案的個數:
- M(map task的個數)*R(reduce task的個數)
-
-
產生的磁盤小檔案過多,會導致以下問題
- 在Shuffle Write程序中會產生很多寫磁盤小檔案的物件,
- 在Shuffle Read程序中會產生很多讀取磁盤小檔案的物件,
- 在JVM堆記憶體中物件過多會造成頻繁的gc,gc還無法解決運行所需要的記憶體的話,就會OOM,
- 在資料傳輸程序中會有頻繁的網路通信,頻繁的網路通信出現通信故障的可能性大大增加,一旦網路通信出現了故障會導致shuffle file cannot find 由于這個錯誤導致的task失敗,TaskScheduler不負責重試,由DAGScheduler負責重試Stage,
-
-
合并機制
-
執行流程
- 合并機制就是復用buffer,開啟合并機制的配置是spark.shuffle.consolidateFiles,該引數默認值為false,將其設定為true即可開啟優化機制
- 在shuffle write程序中,task就不是為下游stage的每個task創建一個磁盤檔案了,此時會出現shuffleFileGroup的概念,每個shuffleFileGroup會對應一批磁盤檔案,磁盤檔案的數量與下游stage的task數量是相同的,一個Executor上有多少個CPU core,就可以并行執行多少個task,而第一批并行執行的每個task都會創建一個shuffleFileGroup,并將資料寫入對應的磁盤檔案內,
- 假設第一個stage有50個task,第二個stage有100個task,總共還是有10個Executor,每個Executor執行5個task,那么原本使用未經優化的HashShuffleManager時,每個Executor會產生500個磁盤檔案,所有Executor會產生5000個磁盤檔案的,但是此時經過優化之后,每個Executor創建的磁盤檔案的數量的計算公式為:CPU core的數量 * 下一個stage的task數量,也就是說,每個Executor此時只會創建100個磁盤檔案,所有Executor只會創建1000個磁盤檔案,
-
總結
- 產生磁盤小檔案的個數: C(core的個數)*R(reduce的個數)
-
SortShuffle
(類似MapReduce的shuffle)
-
普通機制
-
執行流程
- map task 的計算結果會寫入到一個記憶體資料結構里面,記憶體資料結構默認是 5M ,
- 在 shuffle 的時候會有一個定時器,不定期的去估算這個記憶體結構的大小,當記憶體結構中的資料超過 5M 時,比如現在記憶體結構中的資料為 5.01M ,那么他會申請 5.01*2-5=5.02M 記憶體給記憶體資料結構
- 如果申請成功不會進行溢寫,如果申請不成功,這時候會發生溢寫磁盤,
- 在溢寫之前記憶體結構中的資料會進行排序磁區
- 然后開始溢寫磁盤,寫磁盤是以 batch 的形式去寫(批量),一個 batch 是1萬條資料,
- map task 執行完成后,會將這些 磁盤小檔案 合并成一個大的磁盤檔案,同時生成一個 索引檔案 ,
- reduce task 去 map 端拉取資料的時候,首先決議索引檔案,根據索引檔案再去拉取對應的資料,
-
總結
- 產生磁盤小檔案的個數: 2*M(map task的個數)
-
-
bypass機制
-
觸發條件
- shuffle reduce task 的數量小于 spark.shuffle.sort.bypassMergeThreshold 的引數值,這個值默認是 200 ,
- 不需要進行 map 端的預聚合,比如 groupBykey , join ,
- 產生的磁盤小檔案為: 2*M(map task的個數) ,
-
轉載請註明出處,本文鏈接:https://www.uj5u.com/qita/379431.html
標籤:其他
上一篇:大資料運維---Linux安裝hadoop Hadoop HA集群部署
下一篇:大資料之Kafka看這一篇就夠了
