1.五個基本Properties
-
A list of partitions
-
A function for computing each split
-
A list of dependencies on other RDDs
-
Optionally, a Partitioner for key-value RDDs (e.g. to say that the RDD is hash-partitioned)
-
Optionally, a list of preferred locations to compute each split on (e.g. block locations for an HDFS file)
這是RDD的原始碼中注釋中寫到的,下面介紹這五種特征屬性
1.1 磁區
一組分片(Partition),即資料集的基本組成單位,對于RDD來說,每個分片都會被一個計算任務處理,并決
定并行計算的粒度,用戶可以在創建RDD時指定RDD的分片個數,如果沒有指定,那么就會采用默認值
1.2 計算的函式
一個對磁區資料進行計算的函式,Spark中RDD的計算是以分片為單位的,每個RDD都會實作 compute 函式以
達到該目的,compute函式會對迭代器進行組合,不需要保存每次計算的結果
1.3 依賴關系
RDD之間的存在依賴關系,RDD的每次轉換都會生成一個新的RDD,RDD之間形成類似于流水線一樣的前后依
賴關系(lineage),在部分磁區資料丟失時,Spark可以通過這個依賴關系重新計算丟失的磁區資料,而不是
對RDD的所有磁區進行重新計算
1.4 磁區器
對于 key-value 的RDD而言,可能存在磁區器(Partitioner),Spark 實作了兩種型別的分片函式,一個是基于
哈希的HashPartitioner,另外一個是基于范圍的RangePartitioner,只有 key-value 的RDD,才可能有
Partitioner,非key-value的RDD的Parititioner的值是None,Partitioner函式決定了RDD本身的分片數量,也
決定了parent RDD Shuffle輸出時的分片數量
1.5 優先存盤位置
一個串列,存盤存盤每個Partition的優先位置(preferred location),對于一個HDFS檔案來說,這個串列保
存的就是每個Partition所在的塊的位置,按照“移動資料不移動計算”的理念,Spark在任務調度的時候,會盡可
能地將計算任務分配到其所要處理資料塊的存盤位置
2. RDD轉換之間的常見算子
從前面的RDD的基本特征入手,在作業中常撰寫的程式是,創建RDD,RDD的轉換,RDD的算子的執行,創建對應著外部系統的資料流入Spark集群的必選步驟,至于之間從集合創建的資料,一般在測驗時候使用,所以不細述,RDD的轉換對應一個專門的算子叫Transformation其是惰性加載使用的, 而行動對應著觸發Transformation執行的操作,一般是輸出到集合,或者列印出來,或者回傳一個值,另外就是從集群輸出到別的系統,這有一個專業詞叫Action.
2.1 常見轉換算子
轉換算子,即從一個RDD到另外一個RDD的轉換操作,對應一些內置的Compute函式,但是這些函式被有沒有shuffle來分為寬依賴算子和窄依賴算子
2.1.1 寬依賴和窄依賴的區別
一般網上文章有兩種,一種是搬運定義的,即是否一個父RDD磁區會被多個子磁區依賴,另外一種是看有沒有Shuffle,有Shuffle就是寬依賴,沒有則是窄依賴,第一種還靠譜點,第二種就是拿本身來說本身,所以沒有參考價值,2.1.3 如何區別寬依賴和窄依賴,可以之間看這個
2.1.2 寬依賴和窄依賴的常見算子
窄依賴常見算子
map(func):對資料集中的每個元素都使用func,然后回傳一個新的RDD
filter(func):對資料集中的每個元素都使用func,然后回傳一個包含使func為true的元素構成的RDD
flatMap(func):與 map 類似,每個輸入元素被映射為0或多個輸出元素
mapPartitions(func):和map很像,但是map是將func作用在每個元素上,而mapPartitions是func作用在整個分
區上,假設一個RDD有N個元素,M個磁區(N >> M),那么map的函式將被呼叫N次,而mapPartitions中的函式
僅被呼叫M次,一次處理一個磁區中的所有元素
mapPartitionsWithIndex(func):與 mapPartitions 類似,多了磁區的索引值的資訊
glom():將每一個磁區形成一個陣列,形成新的RDD型別 RDD[Array[T]]
sample(withReplacement, fraction, seed):采樣算子,以指定的隨機種子(seed)隨機抽樣出數量為fraction的數
據,withReplacement表示是抽出的資料是否放回,true為有放回的抽樣,false為無放回的抽樣
coalesce(numPartitions,false):無shuffle,一般用來減少磁區
union(otherRDD) : 求兩個RDD的并集
cartesian(otherRDD):笛卡爾積
zip(otherRDD):將兩個RDD組合成 key-value 形式的RDD,默認兩個RDD的partition數量以及元素數量都相同,否
則會拋出例外,
map 與 mapPartitions 的區別
map:每次處理一條資料
mapPartitions:每次處理一個磁區的資料,磁區的資料處理完成后,資料才能釋放,資源不足時容易導致
OOM
最佳實踐:當記憶體資源充足時,建議使用mapPartitions,以提高處理效率
寬依賴常見算子
groupBy(func):按照傳入函式的回傳值進行分組,將key相同的值放入一個迭代器
distinct([numTasks])):對RDD元素去重后,回傳一個新的RDD,可傳入numTasks引數改變RDD磁區數
coalesce(numPartitions, true):有shuffle,無論增加磁區還是減少磁區,一般用repartition來代替
repartition(numPartitions):增加或減少磁區數,有shuffle
sortBy(func, [ascending], [numTasks]):使用 func 對資料進行處理,對處理后的結果進行排序
intersection(otherRDD) : 求兩個RDD的交集
subtract (otherRDD) : 求兩個RDD的差集
2.1.3 如何區別寬依賴和窄依賴
這里我建議理解不了的算子,直接從Spark的history的依賴圖來看,有沒有劃分Stage,如果劃分了就是寬依賴,沒有劃分就是窄依賴,當然這是實戰派的做法,可以在同事或者同學說明問題的時候,show your code 給他,然后把依賴圖拿給他 ,當然作為理論加實踐的并行者,我這里再拿一種來判別,是從理解定義開始的,定義說是父RDD磁區有沒有被多個子磁區依賴,那可以從這個角度想一下,父磁區單個磁區資料,有沒有可能流向不同的子RDD的磁區,比如想一想distinct算子,或者sortBy算子,全域去重和全域排序,假設剛開始1,2,3在一個磁區,經過map(x => (x, null)).reduceByKey((x, y) => x).map(_._1) 去重后,雖然磁區數量沒有變,但是每個磁區資料必然要看別的磁區的資料,才能知道最后自己要不要保留,從輸入磁區,到輸出磁區,必然經過匯合重組,所以必然有shuffle的,sortBy同理,
2.2 常見行動算子
Action觸發Job,一個Spark程式(Driver程式)包含了多少 Action 算子,那么就有多少Job;
典型的Action算子: collect / count
collect() => sc.runJob() => ... => dagScheduler.runJob() => 觸發了Job
collect() / collectAsMap() stats / count / mean / stdev / max / min reduce(func) / fold(func) / aggregate(func)
first():Return the first element in this RDD
take(n):Take the first num elements of the RDD
top(n):按照默認(降序)或者指定的排序規則,回傳前num個元素,
takeSample(withReplacement, num, [seed]):回傳采樣的資料
foreach(func) / foreachPartition(func):與map、mapPartitions類似,區別是 foreach 是 Action
saveAsTextFile(path) / saveAsSequenceFile(path) / saveAsObjectFile(path)
3. PairRDD常見操作
RDD整體上分為 Value 型別和 Key-Value 型別,
前面介紹的是 Value 型別的RDD的操作,實際使用更多的是 key-value 型別的RDD,也稱為 PairRDD,
Value 型別RDD的操作基本集中在 RDD.scala 中;
key-value 型別的RDD操作集中在 PairRDDFunctions.scala 中;
前面介紹的大多數算子對 Pair RDD 都是有效的,RDD的值為key-value的時候即可隱式轉換為PairRDD, Pair RDD還有屬于自己的 Transformation、Action 算子;

3.1 常見PairRDD的Transformation操作
3.1.1 類似 map 操作
mapValues / flatMapValues / keys / values,這些操作都可以使用 map 操作實作,是簡化操作,
3.1.2 聚合操作【重要、難點】
PariRDD(k, v)使用范圍廣,聚合
groupByKey / reduceByKey / foldByKey / aggregateByKey
combineByKey(OLD) / combineByKeyWithClassTag (NEW) => 底層實作
subtractByKey:類似于subtract,刪掉 RDD 中鍵與 other RDD 中的鍵相同的元素
結論:效率相等用最熟悉的方法;groupByKey在一般情況下效率低,盡量少用
3.1.3 排序操作
sortByKey:sortByKey函式作用于PairRDD,對Key進行排序
3.1.4 join操作
cogroup / join / leftOuterJoin / rightOuterJoin / fullOuterJoin

val rdd1 = sc.makeRDD(Array((1,"Spark"), (2,"Hadoop"), (3,"Kylin"), (4,"Flink")))
val rdd2 = sc.makeRDD(Array((3,"李四"), (4,"王五"), (5,"趙六"), (6,"馮七")))
val rdd3 = rdd1.cogroup(rdd2)
rdd3.collect.foreach(println)
rdd3.filter{case (_, (v1, v2)) => v1.nonEmpty & v2.nonEmpty}.collect
// 仿照原始碼實作join操作
rdd3.flatMapValues( pair =>
for (v <- pair._1.iterator; w <- pair._2.iterator) yield (v, w)
)
val rdd1 = sc.makeRDD(Array(("1","Spark"),("2","Hadoop"),("3","Scala"),("4","Java")))
val rdd2 = sc.makeRDD(Array(("3","20K"),("4","18K"),("5","25K"),("6","10K")))
rdd1.join(rdd2).collect
rdd1.leftOuterJoin(rdd2).collect
rdd1.rightOuterJoin(rdd2).collect
rdd1.fullOuterJoin(rdd2).collect
3.1.5 Action操作
collectAsMap / countByKey / lookup(key)

lookup(key):高效的查找方法,只查找對應磁區的資料(如果RDD有磁區器的話
4.寄語
實戰出真知,想要某種實作的時候,假設恰好你想到某個算子,那么去使用它,不懂的地方看原始碼,大業可成!
吳邪,小三爺,混跡于后臺,大資料,人工智能領域的小菜鳥,
更多請關注

轉載請註明出處,本文鏈接:https://www.uj5u.com/qita/247526.html
標籤:其他
上一篇:大資料開發-解決Windows下,開發環境常遇的幾個問題
下一篇:演算法-回溯問題解決框架
