RDD介紹
1.RDD概念以及特性
RDD(Resilient Distributed Dataset)叫做彈性分布式資料集,是Spark中最基本的資料抽象,它代表一個不可變、可磁區、里面的元素可并行計算的集合,RDD具有資料流模型的特點:自動容錯、位置感知性調度和可伸縮性,RDD允許用戶在執行多個查詢時顯式地將資料快取在記憶體中,后續的查詢能夠重用這些資料,這極大地提升了查詢速度,(A Resilient Distributed Dataset)彈性分布式資料集合,并且是spark最基本的編程抽象,而且RDD是只讀、可磁區的、可以進行并行計算的一個物件,
-
資料集:一個資料集合,用于存放資料的,RDD是一個資料容器,用來組織管理資料的,跟Array和List類似,并且都能夠進行map、flatMap、filter等等
-
分布式:RDD中的資料是分布式存盤的,可用于分布式計算,RDD的資料是分布存盤的,也就是Spark集群中每個節點上只存盤了RDD的部分資料,計算同樣也是分布式并行計算的
-
彈性:
-
存盤的彈性:RDD的資料可以在記憶體和磁盤之間進行自由切換
-
可靠性的彈性:RDD的在丟失資料的時候能夠自動恢復,RDD在計算程序中會出現失敗的情況,失敗以后會進行一定次數的重試(4次)
-
并行度的彈性:RDD的資料磁區可以改變,進而增加并行計算的粒度
-
-
RDD其他特點:
-
RDD的資料是只讀,每次操作都會產生新的RDD,安全,
-
RDD中資料可以快取在記憶體、磁盤、HDFS之上
-
1.1RDD彈性
1) 自動進行記憶體和磁盤資料存盤的切換
? Spark優先把資料放到記憶體中,如果記憶體放不下,就會放到磁盤里面,程式進行自動的存盤切換
2) 基于血統的高效容錯機制
? 在RDD進行轉換和動作的時候,會形成RDD的Lineage依賴鏈,當某一個RDD失效的時候,可以通過重新計算上游的RDD來重新生成丟失的RDD資料,
3) Task如果失敗會自動進行特定次數的重試
? RDD的計算任務如果運行失敗,會自動進行任務的重新計算,默認次數是4次,
4) Stage如果失敗會自動進行特定次數的重試
? 如果Job的某個Stage階段計算失敗,框架也會自動進行任務的重新計算,默認次數也是4次,
5) Checkpoint和Persist可主動或被動觸發
? RDD可以通過Persist持久化將RDD快取到記憶體或者磁盤,當再次用到該RDD時直接讀取就行,也可以將RDD進行檢查點,檢查點會將資料存盤在HDFS中,該RDD的所有父RDD依賴都會被移除,
6) 資料調度彈性
? Spark把這個JOB執行模型抽象為通用的有向無環圖DAG,可以將多Stage的任務串聯或并行執行,調度引擎自動處理Stage的失敗以及Task的失敗,
7) 資料分片的高度彈性
? 可以根據業務的特征,動態調整資料分片的個數,提升整體的應用執行效率,
? RDD是一種分布式的記憶體抽象,表示一個只讀的記錄磁區的集合,它只能通過其他RDD轉換而創建,為此,RDD支持豐富的轉換操作(如map, join, filter, groupBy等),通過這種轉換操作,新的RDD則包含了如何從其他RDDs衍生所必需的資訊,所以說RDDs之間是有依賴關系的,基于RDDs之間的依賴,RDDs會形成一個有向無環圖DAG,該DAG描述了整個流式計算的流程,實際執行的時候,RDD是通過血緣關系(Lineage)一氣呵成的,即使出現資料磁區丟失,也可以通過血緣關系重建磁區,總結起來,基于RDD的流式計算任務可描述為:從穩定的物理存盤(如分布式檔案系統)中加載記錄,記錄被傳入由一組確定性操作構成的DAG,然后寫回穩定存盤(HDFS或磁盤),另外RDD還可以將資料集快取到記憶體中,使得在多個操作之間可以重用資料集,基于這個特點可以很方便地構建迭代型應用(圖計算、機器學習等)或者互動式資料分析應用,可以說Spark最初也就是實作RDD的一個分布式系統,后面通過不斷發展壯大成為現在較為完善的大資料生態系統,簡單來講,Spark-RDD的關系類似于Hadoop-MapReduce關系,
1.2RDD的五大屬性
1) 一組分片(Partition),即資料集的基本組成單位,對于RDD來說,每個分片都會被一個計算任務處理,并決定并行計算的粒度,
如果檔案的block個數 <=2 那么 sc.textFile(“file:///wordcount.txt”)磁區個數為2
如果檔案的block塊個數 >2 那么 sc.textFile(“file:///wordcount.txt”)磁區的個數等于block塊的個數
2) 一個計算每個磁區的函式,Spark中RDD的計算是以分片為單位的,每個RDD都會實作compute函式以達到這個目的,compute函式會對迭代器進行復合,不需要保存每次計算的結果,RDD的每一個算子操作比如map 都會通過compute方法作用在每個磁區之上
3) RDD之間的依賴關系,RDD的每次轉換都會生成一個新的RDD,所以RDD之間就會形成類似于流水線一樣的前后依賴關系,在部分磁區資料丟失時,Spark可以通過這個依賴關系重新計算丟失的磁區資料,而不是對RDD的所有磁區進行重新計算,每一個RDD都有其依賴串列RDD的依賴關系 都是存在一個序列集合中,作用:容錯 以及構建起血統機制
4) 一個Partitioner,即RDD的分片函式,當前Spark中實作了兩種型別的分片函式,一個是基于哈希的HashPartitioner,另外一個是基于范圍的RangePartitioner,只有對于于key-value的RDD,才會有Partitioner,非key-value的RDD的Parititioner的值是None,Partitioner函式不但決定了RDD本身的分片數量,也決定了parent RDD Shuffle輸出時的分片數量,
5) 一個串列,存取每個Partition的優先位置(preferred location),對于一個HDFS檔案來說,這個串列保存的就是每個Partition所在的塊的位置,按照“移動資料不如移動計算”的理念,Spark在進行任務調度的時候,會盡可能地將計算任務分配到其所要處理資料塊的存盤位置,
a list of preferred locations to compute each split on (e.g. block locations for * an HDFS file) Spark在讀取hdfs檔案的是,hdfs檔案每一個block默認有多個備份,spark會獲取每一個block塊以及其備份的位置資訊構建成串列,在進行計算的時候,spark會在位置串列中選取一個最佳位置進行任務分配, 移動資料不如移動計算的原則, 移動資料不如移動計算的原則最高境界:資料在當前運行程式的行程之中 RDD是如何確定優先位置? getPreferredLocations(split: Partition): Seq[String] 通過以上方法確定計算的最佳位置, RDD的資料本地化: 5種
2.RDD的構建方式
3種構建方式
-
根據以后資料集合構建RDD
-
val rdd1 = sc.parallelize(Array(1,2,3,4,5,6,7,8))
-
val rdd1 = sc.parallelize(List(1,2,3,4,5,6,7,8))
-
-
根據外部檔案 可以是本地檔案也可是HDFS上檔案
-
sc.textFile(filePath)
-
-
根據以后RDD創建新的RDD 需要經過算子操作
-
val newRDD=lineRDD.flatMap(function)
-
3.RDD的算子操作
RDD的算子分為兩類
-
轉換算子(Transform算子)
-
將一個RDD通過轉換算子操作以后會構建新的RDD,比如map 、flatMap、reduceByKey
-
轉換算子操作都是直接new新的RDD,此時RDD并沒有進行真正的計算,轉換算子只是對資料如何計算做了標記,轉換算子都是懶加載,
-
-
重要算子操作
-
mapPartitions :作用于每個磁區之上的
-
mapPartitions 和map區別:
-
mapPartitions 相當于partition批量操作
-
map作用于每一條資料
-
重要區別:mapPartitions 這個在大量task運行的時候可能會出現記憶體溢位的情況,小資料量的操作 mapPartitions 要優于map操作
-
-
-
groupByKey算子和ReduceByKey算子的區別
-
1.groupByKey 回傳值:key->集合 ReduceByKey回傳值: key-》值
-
2.ReduceByKey操作會在本地進行初步merge操作,能夠減少網路資料的傳輸
-
-
coalesce 減少磁區資料的算子
-
該算子可以進行shuffle也可以不進shuffle操作, coalesce(numPartitions: Int, shuffle: Boolean = false)
-
-
repartition 實際上是呼叫了 coalesce 算子 ,而且 repartition一定會進行shuffle操作,既可以增加也可以減少磁區
-
-
Action算子
action算子內部都會有一個runJob方法進行提交一個Job任務
廣播變數:
-
廣播變數需要資料傳遞
-
HTTP協議:基于HTTP協議將資料傳遞到Executor,Executor會Driver端申請下載(已經被廢棄)
-
torrent協議:默認的方式, Driver下載到Executor上,然后Executor會再次資料源,將資料傳遞到下一個需要資料Executor之上,參考 (TorrentBroadcast類)
-
4.RDD的依賴關系
RDD和它依賴的父RDD的關系有兩種不同的型別,即窄依賴(narrow dependency)和寬依賴(wide dependency),
-
RDD的依賴型別
-
窄依賴:父RDD中一個partition最多被子RDD中的一個partition所依賴,這種依賴關系就是窄依賴
-
窄依賴算子:map 、filter 、union 、flatMap等
-
寬依賴:父RDD中一個partition被子RDD中的多個partition所依賴,這種依賴關系就是寬依賴
-
寬依賴算子:groupByKey、reduceByKey,凡是By基本上都是寬依賴
一對一或者多對==一:窄依賴==
一對多或者多對==多:寬依賴==
-
-
寬窄依賴算子的判斷依據是轉換算子是否會產生shuffle操作,如果有shuffle操作則是寬依賴,否則是窄依賴
-
join既是寬依賴算子也是窄依賴算子 (在一個shuffle操作之后,在使用Join的時候,此時join就是窄依賴)
轉載請註明出處,本文鏈接:https://www.uj5u.com/shujuku/22799.html
標籤:大數據
