1.RDD快取機制 cache, persist
Spark 速度非常快的一個原因是 RDD 支持快取,成功快取后,如果之后的操作使用到了該資料集,則直接從快取中獲取,雖然快取也有丟失的風險,但是由于 RDD 之間的依賴關系,如果某個磁區的快取資料丟失,只需要重新計算該磁區即可,
涉及到的算子:persist、cache、unpersist;都是 Transformation
快取是將計算結果寫入不同的介質,用戶定義可定義存盤級別(存盤級別定義了快取存盤的介質,目前支持記憶體、堆
外記憶體、磁盤);
通過快取,Spark避免了RDD上的重復計算,能夠極大地提升計算速度;
RDD持久化或快取,是Spark最重要的特征之一,可以說,快取是Spark構建迭代式演算法和快速互動式查詢的關鍵因
素;
Spark速度非常快的原因之一,就是在記憶體中持久化(或快取)一個資料集,當持久化一個RDD后,每一個節點都將
把計算的分片結果保存在記憶體中,并在對此資料集(或者衍生出的資料集)進行的其他動作(Action)中重用,這使
得后續的動作變得更加迅速;使用persist()方法對一個RDD標記為持久化,之所以說“標記為持久化”,是因為出現persist()陳述句的地方,并不會馬
上計算生成RDD并把它持久化,而是要等到遇到第一個行動操作觸發真正計算以后,才會把計算結果進行持久化;通過persist()或cache()方法可以標記一個要被持久化的RDD,持久化被觸發,RDD將會被保留在計算節點的記憶體中
并重用;
什么時候快取資料,需要對空間和速度進行權衡,一般情況下,如果多個動作需要用到某個 RDD,而它的計算代價
又很高,那么就應該把這個 RDD 快取起來;
快取有可能丟失,或者存盤于記憶體的資料由于記憶體不足而被洗掉,RDD的快取的容錯機制保證了即使快取丟失也能保
證計算的正確執行,通過基于RDD的一系列的轉換,丟失的資料會被重算,RDD的各個Partition是相對獨立的,因此
只需要計算丟失的部分即可,并不需要重算全部Partition,
啟動堆外記憶體需要配置兩個引數:
- spark.memory.offHeap.enabled :是否開啟堆外記憶體,默認值為 false,需要設定為 true;
- spark.memory.offHeap.size : 堆外記憶體空間的大小,默認值為 0,需要設定為正值,
1.1 快取級別
Spark 速度非常快的一個原因是 RDD 支持快取,成功快取后,如果之后的操作使用到了該資料集,則直接從快取中獲取,雖然快取也有丟失的風險,但是由于 RDD 之間的依賴關系,如果某個磁區的快取資料丟失,只需要重新計算該磁區即可,


Spark 支持多種快取級別 :
| Storage Level(存盤級別) | Meaning(含義) |
|---|---|
MEMORY_ONLY |
默認的快取級別,將 RDD 以反序列化的 Java 物件的形式存盤在 JVM 中,如果記憶體空間不夠,則部分磁區資料將不再快取, |
MEMORY_AND_DISK |
將 RDD 以反序列化的 Java 物件的形式存盤 JVM 中,如果記憶體空間不夠,將未快取的磁區資料存盤到磁盤,在需要使用這些磁區時從磁盤讀取, |
MEMORY_ONLY_SER |
將 RDD 以序列化的 Java 物件的形式進行存盤(每個磁區為一個 byte 陣列),這種方式比反序列化物件節省存盤空間,但在讀取時會增加 CPU 的計算負擔,僅支持 Java 和 Scala , |
MEMORY_AND_DISK_SER |
類似于 MEMORY_ONLY_SER,但是溢位的磁區資料會存盤到磁盤,而不是在用到它們時重新計算,僅支持 Java 和 Scala, |
DISK_ONLY |
只在磁盤上快取 RDD |
MEMORY_ONLY_2, MEMORY_AND_DISK_2 |
與上面的對應級別功能相同,但是會為每個磁區在集群中的兩個節點上建立副本, |
OFF_HEAP |
與 MEMORY_ONLY_SER 類似,但將資料存盤在堆外記憶體中,這需要啟用堆外記憶體, |
啟動堆外記憶體需要配置兩個引數:
- spark.memory.offHeap.enabled :是否開啟堆外記憶體,默認值為 false,需要設定為 true;
- spark.memory.offHeap.size : 堆外記憶體空間的大小,默認值為 0,需要設定為正值,
1.2 使用快取
快取資料的方法有兩個:persist 和 cache ,cache 內部呼叫的也是 persist,它是 persist 的特殊化形式,等價于 persist(StorageLevel.MEMORY_ONLY),示例如下:
// 所有存盤級別均定義在 StorageLevel 物件中
fileRDD.persist(StorageLevel.MEMORY_AND_DISK)
fileRDD.cache()
被快取的RDD在DAG圖中有一個綠色的圓點,

1.3 移除快取
Spark 會自動監視每個節點上的快取使用情況,并按照最近最少使用(LRU)的規則洗掉舊資料磁區,當然,你也可以使用 RDD.unpersist() 方法進行手動洗掉,
2.RDD容錯機制Checkpoint
2.1 涉及到的算子:checkpoint;也是 Transformation
Spark中對于資料的保存除了持久化操作之外,還提供了檢查點的機制;檢查點本質是通過將RDD寫入高可靠的磁盤,主要目的是為了容錯,檢查點通過將資料寫入到HDFS檔案系統實作了
RDD的檢查點功能,Lineage過長會造成容錯成本過高,這樣就不如在中間階段做檢查點容錯,如果之后有節點出現問題而丟失磁區,從
做檢查點的RDD開始重做Lineage,就會減少開銷,
2.2 cache 和 checkpoint 區別
cache 和 checkpoint 是有顯著區別的,快取把 RDD 計算出來然后放在記憶體中,但是 RDD 的依賴鏈不能丟掉, 當某個點某個 executor 宕了,上面 cache 的RDD就會丟掉, 需要通過依賴鏈重放計算,不同的是,checkpoint 是把
RDD 保存在 HDFS中,是多副本可靠存盤,此時依賴鏈可以丟掉,所以斬斷了依賴鏈,
2.3 checkpoint適合場景
以下場景適合使用檢查點機制:
-
DAG中的Lineage過長,如果重算,則開銷太大
-
在寬依賴上做 Checkpoint 獲得的收益更大
與cache類似 checkpoint 也是 lazy 的,
val rdd1 = sc.parallelize(1 to 100000)
// 設定檢查點目錄
sc.setCheckpointDir("/tmp/checkpoint")
val rdd2 = rdd1.map(_*2)
rdd2.checkpoint
// checkpoint是lazy操作
rdd2.isCheckpointed
// checkpoint之前的rdd依賴關系
rdd2.dependencies(0).rdd
rdd2.dependencies(0).rdd.collect
// 執行一次action,觸發checkpoint的執行
rdd2.count
rdd2.isCheckpointed
// 再次查看RDD的依賴關系,可以看到checkpoint后,RDD的lineage被截斷,變成從checkpointRDD開始
rdd2.dependencies(0).rdd
rdd2.dependencies(0).rdd.collect
//查看RDD所依賴的checkpoint檔案
rdd2.getCheckpointFile
備注:checkpoint的檔案作業執行完畢后不會被洗掉
吳邪,小三爺,混跡于后臺,大資料,人工智能領域的小菜鳥,
更多請關注

轉載請註明出處,本文鏈接:https://www.uj5u.com/qita/250085.html
標籤:其他
