Spark調優
spark調優常見手段,在生產中常常會遇到各種各樣的問題,有事前原因,有事中原因,也有不規范原因,spark調優總結下來可以從下面幾個點來調優,
1. 分配更多的資源
分配更多的資源:
它是性能優化調優的王道,就是增加和分配更多的資源,這對于性能和速度上的提升是顯而易見的,
基本上,在一定范圍之內,增加資源與性能的提升,是成正比的;寫完了一個復雜的spark作業之后,進行性能調優的時候,首先第一步,就是要來調節最優的資源配置;
在這個基礎之上,如果說你的spark作業,能夠分配的資源達到了你的能力范圍的頂端之后,無法再分配更多的資源了,公司資源有限;那么才是考慮去做后面的這些性能調優的點,
相關問題:
(1)分配哪些資源?
(2)在哪里可以設定這些資源?
(3)剖析為什么分配這些資源之后,性能可以得到提升?
1.1 分配哪些資源
executor-memory、executor-cores、driver-memory
1.2 在哪里可以設定這些資源
在實際的生產環境中,提交spark任務時,使用spark-submit shell腳本,在里面調整對應的引數,
提交任務的腳本:
spark-submit \
--master spark://node1:7077 \
--class com.hoult.WordCount \
--num-executors 3 \ 配置executor的數量
--driver-memory 1g \ 配置driver的記憶體(影響不大)
--executor-memory 1g \ 配置每一個executor的記憶體大小
--executor-cores 3 \ 配置每一個executor的cpu個數
/export/servers/wordcount.jar
1.2 引數調節到多大,算是最大
- Standalone模式
先計算出公司spark集群上的所有資源 每臺節點的記憶體大小和cpu核數,
比如:一共有20臺worker節點,每臺節點8g記憶體,10個cpu,
實際任務在給定資源的時候,可以給20個executor、每個executor的記憶體8g、每個executor的使用的cpu個數10,
- Yarn模式
先計算出yarn集群的所有大小,比如一共500g記憶體,100個cpu;
這個時候可以分配的最大資源,比如給定50個executor、每個executor的記憶體大小10g,每個executor使用的cpu個數為2,
- 使用原則
在資源比較充足的情況下,盡可能的使用更多的計算資源,盡量去調節到最大的大小
1.3 為什么調大資源以后性能可以提升
--executor-memory
--total-executor-cores
2. 提高并行度
2.1 Spark的并行度指的是什么
spark作業中,各個stage的task的數量,也就代表了spark作業在各個階段stage的并行度!
當分配完所能分配的最大資源了,然后對應資源去調節程式的并行度,如果并行度沒有與資源相匹配,那么導致你分配下去的資源都浪費掉了,同時并行運行,還可以讓每個task要處理的數量變少(很簡單的原理,合理設定并行度,可以充分利用集群資源,減少每個task處理資料量,而增加性能加快運行速度,)
2.2 如何提高并行度
2.2.1 可以設定task的數量
至少設定成與spark Application 的總cpu core 數量相同,
最理想情況,150個core,分配150task,一起運行,差不多同一時間運行完畢
官方推薦,task數量,設定成spark Application 總cpu core數量的2~3倍 ,
比如150個cpu core ,基本設定task數量為300~500. 與理想情況不同的,有些task會運行快一點,比如50s就完了,有些task 可能會慢一點,要一分半才運行完,所以如果你的task數量,剛好設定的跟cpu core 數量相同,可能會導致資源的浪費,
因為比如150個task中10個先運行完了,剩余140個還在運行,但是這個時候,就有10個cpu core空閑出來了,導致浪費,如果設定2~3倍,那么一個task運行完以后,另外一個task馬上補上來,盡量讓cpu core不要空閑,同時盡量提升spark運行效率和速度,提升性能,
2.2.2 如何設定task數量來提高并行度
設定引數spark.default.parallelism
默認是沒有值的,如果設定了值為10,它會在shuffle的程序才會起作用,
比如 val rdd2 = rdd1.reduceByKey(_+_)
此時rdd2的磁區數就是10
可以通過在構建SparkConf物件的時候設定,例如:
new SparkConf().set("spark.defalut.parallelism","500")
2.2.3 給RDD重新設定partition的數量
使用rdd.repartition 來重新磁區,該方法會生成一個新的rdd,使其磁區數變大,
此時由于一個partition對應一個task,那么對應的task個數越多,通過這種方式也可以提高并行度,
2.2.4 提高sparksql運行的task數量
http://spark.apache.org/docs/2.3.3/sql-programming-guide.html
通過設定引數 spark.sql.shuffle.partitions=500 默認為200;
可以適當增大,來提高并行度, 比如設定為 spark.sql.shuffle.partitions=500
專門針對sparkSQL來設定的
3. RDD的重用和持久化
3.1 實際開發遇到的情況說明
如上圖所示的計算邏輯:
(1)當第一次使用rdd2做相應的算子操作得到rdd3的時候,就會從rdd1開始計算,先讀取HDFS上的檔案,然后對rdd1做對應的算子操作得到rdd2,再由rdd2計算之后得到rdd3,同樣為了計算得到rdd4,前面的邏輯會被重新計算,
(3)默認情況下多次對一個rdd執行算子操作,去獲取不同的rdd,都會對這個rdd及之前的父rdd全部重新計算一次,
這種情況在實際開發代碼的時候會經常遇到,但是我們一定要避免一個rdd重復計算多次,否則會導致性能急劇降低,
總結:可以把多次使用到的rdd,也就是公共rdd進行持久化,避免后續需要,再次重新計算,提升效率,
3.2 如何對rdd進行持久化
- 可以呼叫rdd的cache或者persist方法,
(1)cache方法默認是把資料持久化到記憶體中 ,例如:rdd.cache ,其本質還是呼叫了persist方法
(2)persist方法中有豐富的快取級別,這些快取級別都定義在StorageLevel這個object中,可以結合實際的應用場景合理的設定快取級別,例如: rdd.persist(StorageLevel.MEMORY_ONLY),這是cache方法的實作,
3.3 rdd持久化的時可以采用序列化
(1)如果正常將資料持久化在記憶體中,那么可能會導致記憶體的占用過大,這樣的話,也許會導致OOM記憶體溢位,
(2)當純記憶體無法支撐公共RDD資料完全存放的時候,就優先考慮使用序列化的方式在純記憶體中存盤,將RDD的每個partition的資料,序列化成一個位元組陣列;序列化后,大大減少記憶體的空間占用,
(3)序列化的方式,唯一的缺點就是,在獲取資料的時候,需要反序列化,但是可以減少占用的空間和便于網路傳輸
(4)如果序列化純記憶體方式,還是導致OOM,記憶體溢位;就只能考慮磁盤的方式,記憶體+磁盤的普通方式(無序列化),
(5)為了資料的高可靠性,而且記憶體充足,可以使用雙副本機制,進行持久化
持久化的雙副本機制,持久化后的一個副本,因為機器宕機了,副本丟了,就還是得重新計算一次;
持久化的每個資料單元,存盤一份副本,放在其他節點上面,從而進行容錯;
一個副本丟了,不用重新計算,還可以使用另外一份副本,這種方式,僅僅針對你的記憶體資源極度充足,
比如: StorageLevel.MEMORY_ONLY_2
4. 廣播變數的使用
4.1 場景描述
在實際作業中可能會遇到這樣的情況,由于要處理的資料量非常大,這個時候可能會在一個stage中出現大量的task,比如有1000個task,這些task都需要一份相同的資料來處理業務,這份資料的大小為100M,該資料會拷貝1000份副本,通過網路傳輸到各個task中去,給task使用,這里會涉及大量的網路傳輸開銷,同時至少需要的記憶體為1000*100M=100G,這個記憶體開銷是非常大的,不必要的記憶體的消耗和占用,就導致了你在進行RDD持久化到記憶體,也許就沒法完全在記憶體中放下;就只能寫入磁盤,最后導致后續的操作在磁盤IO上消耗性能;這對于spark任務處理來說就是一場災難,
由于記憶體開銷比較大,task在創建物件的時候,可能會出現堆記憶體放不下所有物件,就會導致頻繁的垃圾回收器的回收GC,GC的時候一定是會導致作業執行緒停止,也就是導致Spark暫停作業那么一點時間,頻繁GC的話,對Spark作業的運行的速度會有相當可觀的影響,
4.2 廣播變數引入
Spark中分布式執行的代碼需要傳遞到各個executor的task上運行,對于一些只讀、固定的資料,每次都需要Driver廣播到各個Task上,這樣效率低下,廣播變數允許將變數只廣播給各個executor,該executor上的各個task再從所在節點的BlockManager(負責管理某個executor對應的記憶體和磁盤上的資料)獲取變數,而不是從Driver獲取變數,從而提升了效率,
廣播變數,初始的時候,就在Drvier上有一份副本,通過在Driver把共享資料轉換成廣播變數,
task在運行的時候,想要使用廣播變數中的資料,此時首先會在自己本地的Executor對應的BlockManager中,嘗試獲取變數副本;如果本地沒有,那么就從Driver遠程拉取廣播變數副本,并保存在本地的BlockManager中;
此后這個executor上的task,都會直接使用本地的BlockManager中的副本,那么這個時候所有該executor中的task都會使用這個廣播變數的副本,也就是說一個executor只需要在第一個task啟動時,獲得一份廣播變數資料,之后的task都從本節點的BlockManager中獲取相關資料,
executor的BlockManager除了從driver上拉取,也可能從其他節點的BlockManager上拉取變數副本,網路距離越近越好,
4.3 使用廣播變數后的性能分析
比如一個任務需要50個executor,1000個task,共享資料為100M,
(1)在不使用廣播變數的情況下,1000個task,就需要該共享資料的1000個副本,也就是說有1000份數需要大量的網路傳輸和記憶體開銷存盤,耗費的記憶體大小1000*100=100G.
(2)使用了廣播變數后,50個executor就只需要50個副本資料,而且不一定都是從Driver傳輸到每個節點,還可能是就近從最近的節點的executor的blockmanager上拉取廣播變數副本,網路傳輸速度大大增加;記憶體開銷 50*100M=5G
總結:
不使用廣播變數的記憶體開銷為100G,使用后的記憶體開銷5G,這里就相差了20倍左右的網路傳輸性能損耗和記憶體開銷,使用廣播變數后對于性能的提升和影響,還是很可觀的,
廣播變數的使用不一定會對性能產生決定性的作用,比如運行30分鐘的spark作業,可能做了廣播變數以后,速度快了2分鐘,或者5分鐘,但是一點一滴的調優,積少成多,最后還是會有效果的,
4.4 廣播變數使用注意事項
(1)能不能將一個RDD使用廣播變數廣播出去?
不能,因為RDD是不存盤資料的,可以將RDD的結果廣播出去,
(2)廣播變數只能在Driver端定義,不能在Executor端定義,
(3)在Driver端可以修改廣播變數的值,在Executor端無法修改廣播變數的值,
(4)如果executor端用到了Driver的變數,如果不使用廣播變數在Executor有多少task就有多少Driver端的變數副本,
(5)如果Executor端用到了Driver的變數,如果使用廣播變數在每個Executor中只有一份Driver端的變數副本,
4.5 如何使用廣播變數
- 例如
(1) 通過sparkContext的broadcast方法把資料轉換成廣播變數,型別為Broadcast,
val broadcastArray: Broadcast[Array[Int]] = sc.broadcast(Array(1,2,3,4,5,6))
(2) 然后executor上的BlockManager就可以拉取該廣播變數的副本獲取具體的資料,
獲取廣播變數中的值可以通過呼叫其value方法
val array: Array[Int] = broadcastArray.value
5. 盡量避免使用shuffle類算子
5.1 shuffle描述
spark中的shuffle涉及到資料要進行大量的網路傳輸,下游階段的task任務需要通過網路拉取上階段task的輸出資料,shuffle程序,簡單來說,就是將分布在集群中多個節點上的同一個key,拉取到同一個節點上,進行聚合或join等操作,比如reduceByKey、join等算子,都會觸發shuffle操作,
如果有可能的話,要盡量避免使用shuffle類算子,
因為Spark作業運行程序中,最消耗性能的地方就是shuffle程序,
5.2 哪些算子操作會產生shuffle
spark程式在開發的程序中使用reduceByKey、join、distinct、repartition等算子操作,這里都會產生shuffle,由于shuffle這一塊是非常耗費性能的,實際開發中盡量使用map類的非shuffle算子,這樣的話,沒有shuffle操作或者僅有較少shuffle操作的Spark作業,可以大大減少性能開銷,
5.3 如何避免產生shuffle
- 小案例
//錯誤的做法:
// 傳統的join操作會導致shuffle操作,
// 因為兩個RDD中,相同的key都需要通過網路拉取到一個節點上,由一個task進行join操作,
val rdd3 = rdd1.join(rdd2)
//正確的做法:
// Broadcast+map的join操作,不會導致shuffle操作,
// 使用Broadcast將一個資料量較小的RDD作為廣播變數,
val rdd2Data = https://www.cnblogs.com/hulichao/archive/2020/12/20/rdd2.collect()
val rdd2DataBroadcast = sc.broadcast(rdd2Data)
// 在rdd1.map算子中,可以從rdd2DataBroadcast中,獲取rdd2的所有資料,
// 然后進行遍歷,如果發現rdd2中某條資料的key與rdd1的當前資料的key是相同的,那么就判定可以進行join,
// 此時就可以根據自己需要的方式,將rdd1當前資料與rdd2中可以連接的資料,拼接在一起(String或Tuple),
val rdd3 = rdd1.map(rdd2DataBroadcast...)
// 注意,以上操作,建議僅僅在rdd2的資料量比較少(比如幾百M,或者一兩G)的情況下使用,
// 因為每個Executor的記憶體中,都會駐留一份rdd2的全量資料,
5.4 使用map-side預聚合的shuffle操作
- map-side預聚合
如果因為業務需要,一定要使用shuffle操作,無法用map類的算子來替代,那么盡量使用可以map-side預聚合的算子,
所謂的map-side預聚合,說的是在每個節點本地對相同的key進行一次聚合操作,類似于MapReduce中的本地combiner,
map-side預聚合之后,每個節點本地就只會有一條相同的key,因為多條相同的key都被聚合起來了,其他節點在拉取所有節點上的相同key時,就會大大減少需要拉取的資料數量,從而也就減少了磁盤IO以及網路傳輸開銷,
通常來說,在可能的情況下,建議使用reduceByKey或者aggregateByKey算子來替代掉groupByKey算子,因為reduceByKey和aggregateByKey算子都會使用用戶自定義的函式對每個節點本地的相同key進行預聚合,
而groupByKey算子是不會進行預聚合的,全量的資料會在集群的各個節點之間分發和傳輸,性能相對來說比較差,
比如如下兩幅圖,就是典型的例子,分別基于reduceByKey和groupByKey進行單詞計數,其中第一張圖是groupByKey的原理圖,可以看到,沒有進行任何本地聚合時,所有資料都會在集群節點之間傳輸;第二張圖是reduceByKey的原理圖,可以看到,每個節點本地的相同key資料,都進行了預聚合,然后才傳輸到其他節點上進行全域聚合,
-
groupByKey進行單詞計數原理
-
reduceByKey單詞計數原理
6. 使用高性能的算子
6.1 使用reduceByKey/aggregateByKey替代groupByKey
-
reduceByKey/aggregateByKey 可以進行預聚合操作,減少資料的傳輸量,提升性能
-
groupByKey 不會進行預聚合操作,進行資料的全量拉取,性能比較低
6.2 使用mapPartitions替代普通map
mapPartitions類的算子,一次函式呼叫會處理一個partition所有的資料,而不是一次函式呼叫處理一條,性能相對來說會高一些,
但是有的時候,使用mapPartitions會出現OOM(記憶體溢位)的問題,因為單次函式呼叫就要處理掉一個partition所有的資料,如果記憶體不夠,垃圾回收時是無法回收掉太多物件的,很可能出現OOM例外,所以使用這類操作時要慎重!
6.3 使用foreachPartition替代foreach
原理類似于“使用mapPartitions替代map”,也是一次函式呼叫處理一個partition的所有資料,而不是一次函式呼叫處理一條資料,
在實踐中發現,foreachPartitions類的算子,對性能的提升還是很有幫助的,比如在foreach函式中,將RDD中所有資料寫MySQL,那么如果是普通的foreach算子,就會一條資料一條資料地寫,每次函式呼叫可能就會創建一個資料庫連接,此時就勢必會頻繁地創建和銷毀資料庫連接,性能是非常低下; 但是如果用foreachPartitions算子一次性處理一個partition的資料,那么對于每個partition,只要創建一個資料庫連接即可,然后執行批量插入操作,此時性能是比較高的,實踐中發現,對于1萬條左右的資料量寫MySQL,性能可以提升30%以上,
6.4 使用filter之后進行coalesce操作
通常對一個RDD執行filter算子過濾掉RDD中較多資料后(比如30%以上的資料),建議使用coalesce算子,手動減少RDD的partition數量,將RDD中的資料壓縮到更少的partition中去,
因為filter之后,RDD的每個partition中都會有很多資料被過濾掉,此時如果照常進行后續的計算,其實每個task處理的partition中的資料量并不是很多,有一點資源浪費,而且此時處理的task越多,可能速度反而越慢,
因此用coalesce減少partition數量,將RDD中的資料壓縮到更少的partition之后,只要使用更少的task即可處理完所有的partition,在某些場景下,對于性能的提升會有一定的幫助,
6.5 使用repartitionAndSortWithinPartitions替代repartition與sort類操作
repartitionAndSortWithinPartitions是Spark官網推薦的一個算子,官方建議,如果需要在repartition重磁區之后,還要進行排序,建議直接使用repartitionAndSortWithinPartitions算子,
因為該算子可以一邊進行重磁區的shuffle操作,一邊進行排序,shuffle與sort兩個操作同時進行,比先shuffle再sort來說,性能可能是要高的,
7. 使用Kryo優化序列化性能
7.1 spark序列化介紹
Spark在進行任務計算的時候,會涉及到資料跨行程的網路傳輸、資料的持久化,這個時候就需要對資料進行序列化,Spark默認采用Java的序列化器,默認java序列化的優缺點如下:
其好處:
處理起來方便,不需要我們手動做其他操作,只是在使用一個物件和變數的時候,需要實作Serializble介面,
其缺點:
默認的序列化機制的效率不高,序列化的速度比較慢;序列化以后的資料,占用的記憶體空間相對還是比較大,
Spark支持使用Kryo序列化機制,Kryo序列化機制,比默認的Java序列化機制,速度要快,序列化后的資料要更小,大概是Java序列化機制的1/10,所以Kryo序列化優化以后,可以讓網路傳輸的資料變少;在集群中耗費的記憶體資源大大減少,
7.2 Kryo序列化啟用后生效的地方
Kryo序列化機制,一旦啟用以后,會生效的幾個地方:
(1)算子函式中使用到的外部變數
算子中的外部變數可能來著與driver需要涉及到網路傳輸,就需要用到序列化,
最終可以優化網路傳輸的性能,優化集群中記憶體的占用和消耗
(2)持久化RDD時進行序列化,StorageLevel.MEMORY_ONLY_SER
將rdd持久化時,對應的存盤級別里,需要用到序列化,
最終可以優化記憶體的占用和消耗;持久化RDD占用的記憶體越少,task執行的時候,創建的物件,就不至于頻繁的占滿記憶體,頻繁發生GC,
(3) 產生shuffle的地方,也就是寬依賴
下游的stage中的task,拉取上游stage中的task產生的結果資料,跨網路傳輸,需要用到序列化,最終可以優化網路傳輸的性能
7.3 如何開啟Kryo序列化機制
// 創建SparkConf物件,
val conf = new SparkConf().setMaster(...).setAppName(...)
// 設定序列化器為KryoSerializer,
conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
// 注冊要序列化的自定義型別,
conf.registerKryoClasses(Array(classOf[MyClass1], classOf[MyClass2]))
8. 使用fastutil優化資料格式
8.1 fastutil介紹
fastutil是擴展了Java標準集合框架(Map、List、Set;HashMap、ArrayList、HashSet)的類別庫,提供了特殊型別的map、set、list和queue;
fastutil能夠提供更小的記憶體占用,更快的存取速度;我們使用fastutil提供的集合類,來替代自己平時使用的JDK的原生的Map、List、Set.
8.2 fastutil好處
fastutil集合類,可以減小記憶體的占用,并且在進行集合的遍歷、根據索引(或者key)獲取元素的值和設定元素的值的時候,提供更快的存取速度
8.3 Spark中應用fastutil的場景和使用
8.3.1 算子函式使用了外部變數
(1)你可以使用Broadcast廣播變數優化;
(2)可以使用Kryo序列化類別庫,提升序列化性能和效率;
(3)如果外部變數是某種比較大的集合,那么可以考慮使用fastutil改寫外部變數;
首先從源頭上就減少記憶體的占用(fastutil),通過廣播變數進一步減少記憶體占用,再通過Kryo序列化類別庫進一步減少記憶體占用,
8.3.2 算子函式里使用了比較大的集合Map/List
在你的算子函式里,也就是task要執行的計算邏輯里面,如果有邏輯中,出現,要創建比較大的Map、List等集合,
可能會占用較大的記憶體空間,而且可能涉及到消耗性能的遍歷、存取等集合操作;
那么此時,可以考慮將這些集合型別使用fastutil類別庫重寫,
使用了fastutil集合類以后,就可以在一定程度上,減少task創建出來的集合型別的記憶體占用,
避免executor記憶體頻繁占滿,頻繁喚起GC,導致性能下降,
8.3.3 fastutil的使用
第一步:在pom.xml中參考fastutil的包
<dependency>
<groupId>fastutil</groupId>
<artifactId>fastutil</artifactId>
<version>5.0.9</version>
</dependency>
第二步:平時使用List (Integer)的替換成IntList即可,
List<Integer>的list對應的到fastutil就是IntList型別
使用說明:
基本都是類似于IntList的格式,前綴就是集合的元素型別;
特殊的就是Map,Int2IntMap,代表了key-value映射的元素型別,
9. 調節資料本地化等待時長
Spark在Driver上對Application的每一個stage的task進行分配之前,都會計算出每個task要計算的是哪個分片資料,RDD的某個partition;Spark的task分配演算法,優先會希望每個task正好分配到它要計算的資料所在的節點,這樣的話就不用在網路間傳輸資料;
但是通常來說,有時事與愿違,可能task沒有機會分配到它的資料所在的節點,為什么呢,可能那個節點的計算資源和計算能力都滿了;所以這種時候,通常來說,Spark會等待一段時間,默認情況下是3秒(不是絕對的,還有很多種情況,對不同的本地化級別,都會去等待),到最后實在是等待不了了,就會選擇一個比較差的本地化級別,比如說將task分配到距離要計算的資料所在節點比較近的一個節點,然后進行計算,
9.1 本地化級別
(1)PROCESS_LOCAL:行程本地化
代碼和資料在同一個行程中,也就是在同一個executor中;計算資料的task由executor執行,資料在executor的BlockManager中;性能最好
(2)NODE_LOCAL:節點本地化
代碼和資料在同一個節點中;比如說資料作為一個HDFS block塊,就在節點上,而task在節點上某個executor中運行;或者是資料和task在一個節點上的不同executor中;資料需要在行程間進行傳輸;性能其次
(3)RACK_LOCAL:機架本地化
資料和task在一個機架的兩個節點上;資料需要通過網路在節點之間進行傳輸; 性能比較差
(4) ANY:無限制
資料和task可能在集群中的任何地方,而且不在一個機架中;性能最差
9.2 資料本地化等待時長
spark.locality.wait,默認是3s
首先采用最佳的方式,等待3s后降級,還是不行,繼續降級...,最后還是不行,只能夠采用最差的,
9.3 如何調節引數并且測驗
修改spark.locality.wait引數,默認是3s,可以增加
下面是每個資料本地化級別的等待時間,默認都是跟spark.locality.wait時間相同,
默認都是3s(可查看spark官網對應引數說明,如下圖所示)
spark.locality.wait.node
spark.locality.wait.process
spark.locality.wait.rack
在代碼中設定:
new SparkConf().set("spark.locality.wait","10")
然后把程式提交到spark集群中運行,注意觀察日志,spark作業的運行日志,推薦大家在測驗的時候,先用client模式,在本地就直接可以看到比較全的日志,
日志里面會顯示,starting task .... PROCESS LOCAL、NODE LOCAL.....
例如:
Starting task 0.0 in stage 1.0 (TID 2, 192.168.200.102, partition 0, NODE_LOCAL, 5254 bytes)
觀察大部分task的資料本地化級別
如果大多都是PROCESS_LOCAL,那就不用調節了,如果是發現,好多的級別都是NODE_LOCAL、ANY,那么最好就去調節一下資料本地化的等待時長,應該是要反復調節,每次調節完以后,再來運行,觀察日志
看看大部分的task的本地化級別有沒有提升;看看整個spark作業的運行時間有沒有縮短,
注意注意:
在調節引數、運行任務的時候,別本末倒置,本地化級別倒是提升了, 但是因為大量的等待時長,spark作業的運行時間反而增加了,那就還是不要調節了,
10. 基于Spark記憶體模型調優
10.1 spark中executor記憶體劃分
-
Executor的記憶體主要分為三塊
-
第一塊是讓task執行我們自己撰寫的代碼時使用;
-
第二塊是讓task通過shuffle程序拉取了上一個stage的task的輸出后,進行聚合等操作時使用
-
第三塊是讓RDD快取時使用
-
10.2 spark的記憶體模型
在spark1.6版本以前 spark的executor使用的靜態記憶體模型,但是在spark1.6開始,多增加了一個統一記憶體模型,
通過spark.memory.useLegacyMode 這個引數去配置
默認這個值是false,代表用的是新的動態記憶體模型;
如果想用以前的靜態記憶體模型,那么就要把這個值改為true,
10.2.1 靜態記憶體模型
實際上就是把我們的一個executor分成了三部分,
一部分是Storage記憶體區域,
一部分是execution區域,
還有一部分是其他區域,如果使用的靜態記憶體模型,那么用這幾個引數去控制:
spark.storage.memoryFraction:默認0.6
spark.shuffle.memoryFraction:默認0.2
所以第三部分就是0.2
如果我們cache資料量比較大,或者是我們的廣播變數比較大,
那我們就把spark.storage.memoryFraction這個值調大一點,
但是如果我們代碼里面沒有廣播變數,也沒有cache,shuffle又比較多,那我們要把spark.shuffle.memoryFraction 這值調大,
- 靜態記憶體模型的缺點
我們配置好了Storage記憶體區域和execution區域后,我們的一個任務假設execution記憶體不夠用了,但是它的Storage記憶體區域是空閑的,兩個之間不能互相借用,不夠靈活,所以才出來我們新的統一記憶體模型,
10.2.2 統一記憶體模型
動態記憶體模型先是預留了300m記憶體,防止記憶體溢位,動態記憶體模型把整體記憶體分成了兩部分,
由這個引數表示spark.memory.fraction 這個指的默認值是0.6 代表另外的一部分是0.4,
然后spark.memory.fraction 這部分又劃分成為兩個小部分,這兩小部分共占整體記憶體的0.6 .這兩部分其實就是:Storage記憶體和execution記憶體,由spark.memory.storageFraction 這個引數去調配,因為兩個共占0.6,如果spark.memory.storageFraction這個值配的是0.5,那說明這0.6里面 storage占了0.5,也就是executor占了0.3 ,
- 統一記憶體模型有什么特點呢?
Storage記憶體和execution記憶體 可以相互借用,不用像靜態記憶體模型那樣死板,但是是有規則的
為什么受傷的都是storage呢?
是因為execution里面的資料是馬上就要用的,而storage里的資料不一定馬上就要用,
10.2.3 任務提交腳本參考
- 以下是一份spark-submit命令的示例,大家可以參考一下,并根據自己的實際情況進行調節
bin/spark-submit \
--master yarn-cluster \
--num-executors 100 \
--executor-memory 6G \
--executor-cores 4 \
--driver-memory 1G \
--conf spark.default.parallelism=1000 \
--conf spark.storage.memoryFraction=0.5 \
--conf spark.shuffle.memoryFraction=0.3 \
10.2.4 個人經驗
java.lang.OutOfMemoryError
ExecutorLostFailure
Executor exit code 為143
executor lost
hearbeat time out
shuffle file lost
如果遇到以上問題,很有可能就是記憶體除了問題,可以先嘗試增加記憶體,如果還是解決不了,那么請聽下一次資料傾斜調優的課,
吳邪,小三爺,混跡于后臺,大資料,人工智能領域的小菜鳥,
更多請關注

轉載請註明出處,本文鏈接:https://www.uj5u.com/qita/237943.html
標籤:其他
上一篇:VMware 安裝Linux系統
