Spark中的transformation和action、element和partition
- transformation
- action
- element和partitions
Spark中transformation和action是RDD中兩個重要的組成部分,也可以理解為一個完整的RDD任務由這兩部分操作組成,在transformation和action操作下,也有兩個重要的概念element和partition下面是對它們的一些理解:
transformation
Transformation用于對RDD的創建,還可以把一個RDD轉換為另一個RDD,方式很多,比如從資料源生成一個新的RDD,從RDD生成一個新的RDD
發生此操作的轉換算子如下:
map(func):對呼叫map的RDD資料集中的每個element都使用func,然后回傳一個新的RDD
filter(func): 對呼叫filter的RDD資料集中的每個元素都使用func,然后回傳一個包含使func為true的元素構成的RDD
flatMap(func):和map差不多,但是flatMap生成的是多個RDD
mapPartitions(func):和map很像,但是map是每個element,而mapPartitions是每個partition
mapPartitionsWithSplit(func):和mapPartitions很像,但是func作用的是其中一個split上,所以func中應該有index
sample(withReplacement,faction,seed):抽樣
union(otherDataset):回傳一個新的dataset,包含源dataset和給定dataset的元素的集合
distinct([numTasks]):回傳一個新的dataset,這個dataset含有的是源dataset中的distinct的element
groupByKey(numTasks):回傳(K,Seq[V]),
reduceByKey(func,[numTasks]):就是用一個給定的reducefunc再作用在groupByKey產生的(K,Seq[V]),比如求和,求平均數
sortByKey([ascending],[numTasks]):按照key來進行排序,是升序還是降序,ascending是boolean型別
join(otherDataset,[numTasks]):當有兩個KV的dataset(K,V)和(K,W),回傳的是(K,(V,W))的dataset,numTasks為并發的任務數
cogroup(otherDataset,[numTasks]):當有兩個KV的dataset(K,V)和(K,W),回傳的是(K,Seq[V],Seq[W])的dataset,numTasks為并發的任務數
cartesian(otherDataset):笛卡爾積就是m*n
action
action是行動、計算的意思,會對前面的Transformation操作進行執行,得到結果
發生此操作的行動算子如下:
reduce(func):聚集,但是傳入的函式是兩個引數輸入回傳一個值,這個函式必須是滿足交換律和結合律的
collect():一般在filter或者足夠小的結果的時候,再用collect封裝回傳一個陣列
count():回傳的是dataset中的element的個數
first():回傳的是dataset中的第一個元素
take(n):回傳前n個elements
takeSample(withReplacement,num,seed):抽樣回傳一個dataset中的num個元素,隨機種子seed
saveAsTextFile(path):把dataset寫到一個textfile中,或者hdfs,或者hdfs支持的檔案系統中,spark把每條記錄都轉換為一行記錄,然后寫到file中
saveAsSequenceFile(path):只能用在key-value對上,然后生成SequenceFile寫到本地或者hadoop檔案系統
countByKey():回傳的是key對應的個數的一個map,作用于一個RDD
foreach(func):對dataset中的每個元素都使用func
foreachPartition(func):對dataset中的每個磁區使用func,之后可以再對磁區迭代
目前接觸的RDD操作還不是很多,以后有了更深的理解再進行補充,上訴算子操作參考來自此文:原文鏈接
著作權宣告:本文為CSDN博主「簡單點1024」的原創文章,遵循CC 4.0 BY-SA著作權協議,轉載請附上原文出處鏈接及本宣告,
原文鏈接:https://blog.csdn.net/zhangbaoanhadoop/article/details/82111029
element和partitions
在上訴的操作算子中,有map()、mapPartitions()或者foreach()、foreachPartition()這樣的操作,element是RDD中的元素,而partitsions是對若干個元素的分批,如果是普通的map或foreach操作,一次function的執行就處理一條資料;而partitons中,一個task僅僅會執行一次function,function一次接收所有的partition資料,比如有一個需求,將資料插入某個表,如下:
arrayRDD.mapPartitions(datas=>{
dbConnect = getDbConnect() //獲取資料庫連接
datas.foreach(data=>{
dbConnect.insert(data) //回圈插入資料
})
dbConnect.commit() //提交資料庫事務
dbConnect.close() //關閉資料庫連接
})
每批資料只需要開啟一次資料庫連接,大大減少了連接開支,
partitions的缺點
如果是普通的map操作,一次function的執行就處理一條資料;那么如果記憶體不夠用的情況下,比如處理了1千條資料了,那么這個時候記憶體不夠了,那么就可以將已經處理完的1千條資料從記憶體里面垃圾回收掉,所以說普通的map操作通常不會導致記憶體的OOM例外,
但是MapPartitions操作,對于大量資料來說,比如甚至一個partition,100萬資料,一次傳入一個function以后,那么可能一下子記憶體不夠,但是又沒有辦法去騰出記憶體空間來,可能就OOM,記憶體溢位,
轉載請註明出處,本文鏈接:https://www.uj5u.com/qita/384170.html
標籤:其他
