主頁 >  其他 > 《初識Spark之RDD算子長文篇》一篇帶你開啟大資料計算之門!

《初識Spark之RDD算子長文篇》一篇帶你開啟大資料計算之門!

2021-04-02 11:03:31 其他

目錄

RDD概念

RDD簡介

什么是RDD?

RDD的五個主要屬性

分片/區(Partition)

磁區計算函式

RDD的分片函式(Partitioner)

RDD相互依賴

優先位置串列

創建RDD

基于資料集合創建RDD

基于外部資料源創建RDD

Python統計檔案總字數

RDD算子-轉換和動作

RDD算子(主要操作)

轉換操作 map

轉換操作 flatMap

轉換操作 filter

轉換操作 union

轉換操作 intersection

轉換操作 distinct

轉換操作 sortBy

轉換操作 glom

轉換操作 mapPartitions

轉換操作 mapPartitionsWithIndex

動作操作

動作操作 collect

動作操作 reduce

動作操作 count

動作操作 take

動作操作 first

動作操作 max

動作操作 top

動作操作 saveAsTextFile

動作操作 foreach

動作操作 foreachPartition

PairRDD

轉換操作 mapValues

轉換操作 flatMapValues

轉換操作 groupByKey

?轉換操作 reduceByKey

轉換操作 sortByKey

轉換操作 partitionBy

轉換操作 keys

轉換操作 values

轉換操作 join

轉換操作 leftOuterJoin

轉換操作 rightOuterJoin

動作操作 collectAsMap

動作操作 countByKey

共享變數

閉包問題

累加器 Accumulator

廣播變數 Broadcast

廣播變數示例

RDD依賴關系

依賴關系

RDD的持久化

每文一語


RDD概念

RDD簡介

什么是RDD?

Spark 的核心是建立在統一的抽象彈性分布式資料集(Resiliennt Distributed Datasets,RDD)之上的,這使得 Spark 的各個組件可以無縫地進行集成,能夠在同一個應用程式中完成大資料處理,

RDD是彈性分布式資料集:它是一種容錯的并行資料機構

RDD數只讀的磁區記錄集合:在這個基礎上提供比較豐富的操作方法

RDD是spark的基石也是spark的靈魂:在spark里面將資料抽象為RDD,這就使得我們在處理大資料集可以得心應手

RDD的五個主要屬性

分片/區(Partition)

Partition(磁區或分片)是 Spark 資料集的基本組成單位,Spark 集群讀取一個檔案會根據具體的配置將檔案加載在不同的節點的記憶體中,每個節點中的資料就是一個分片,對于 RDD 來說,每個分片都會被一個計算任務處理,并決定并行計算的粒度,用戶可以在創建 RDD 時指定 RDD的分片個數,默認分片個數CPU 核心個數相同

磁區計算函式

Spark RDD 的計算是以分片為單位的,每個 RDD 都會實作計算函式以達到分片計算,計算函式會對迭代器進行復合,不需要保存每次計算的結果,

RDD分片函式(Partitioner)

當前 Spark 中實作了兩種型別的分片函式,一個是基于哈希HashPartitioner,另外一個是基于范圍的 RangePartitioner,只有對于key-valueRDD,才會Partitionerkey-valueRDDParititioner的值是NonePartitioner函式不但決定了RDD本身的分片數量,也決定了parent RDD Shuffle輸出時的分片數量,

RDD相互依賴

RDD 的每次轉換都會生成一個新的 RDD,因此 RDD 之間就會形成類似于流水線一樣的前后依賴關系,在部分磁區資料丟失時Spark 可以通過這個依賴關系重新計算丟失的磁區資料,而不是RDD 的所有磁區進行重新計算,

優先位置串列

就是存盤讀取每個Partition優先位置串列,對于一個 HDFS 檔案來說,這個串列保存的就是每個 Partition 所在的塊的位置,按照“移動資料不如移動計算”的理念,Spark 在進行任務調度的時候,會盡可能地將計算任務分配到其所要處理資料塊的存盤位置,

創建RDD

基于資料集合創建RDD

parallelize 函式可以將一個可迭代物件轉換為RDD物件,需要傳入一個可迭代物件,還可以指定磁區數,如果不指定則采用默認值defaultParallelism count函式回傳RDD中元素的個數,

這個代碼是必須要寫入到我們的代碼塊里面的,它的作用是為了找到本地的spark的環境進行相關的操作,spark背景關系的呼叫:

import findspark

findspark.init()

from pyspark import SparkContext

sc = SparkContext()  # 加載檔案
data = [1, 2, 3, 4, 5]
distData_1 = sc.parallelize(data)   # parallelize函式創建RDD
distData_2 = sc.parallelize(data, 2)  # 指定磁區數創建RDD,指定2個磁區
a=distData_2.count() # count函式回傳RDD中元素的個數,
b=distData_1.count()
print(a)
print(b)

紅色警告是正常的!如果你看起來不舒服可以去花一定的時間去解決但是我感覺得沒有必要

基于外部資料源創建RDD

textFile 函式可以將一個外部資料源轉換為RDD物件,檔案的一行資料就是RDD的一個元素,需要傳入該資料源的url,可以是本地檔案,也可以是hdfs中的檔案,如果是本地檔案,則在url前面加file:///,如果是hdfs上的檔案,則在url前面加hdfs:/// count函式回傳RDD中元素的個數,

distFile1 = sc.textFile(“file:///home/camel/Repos/spark/README.md”) # 本地檔案
distFile2 = sc.textFile(“hdfs:///test/output”) # hdfs上的檔案
distFile1.count() # 此時的count函式回傳的是文本的行數

Python統計檔案總字數

#1.常規思路,代碼如下
file=open("/home/camel/Repos/spark/README.md")
lines=file.readlines()
count=0
for line in lines:
    count+=len(line.split(' '))
#2.使用map函式和reduce函式統計,代碼如下
from functools import reduce
from operator import add
file=open("/home/camel/Repos/spark/README.md")
lines=file.readlines()
count =reduce(add,map(lambda line:len(line.split(' ')),lines))

Spark統計一個文本單詞的思路跟Python中使用map函式和reduce函式的思路是一致的,具體代碼如下:

logFile='/home/hadoop/data/README.md'

logData = sc.textFile(logFile).cache()  # 資料快取
numAs = logData.filter(lambda s: 'a' in s).count()  # 元素過濾
numBs = logData.filter(lambda s: 'b' in s).count()
print("Lines with a:%i,lines with b: %i" % (numAs, numBs))

RDD算子-轉換和動作

作用于RDD上的Operation分為轉換(transformantion)動作(action), Spark中的所有“轉換”都是惰性的,在執行“轉換”操作,并不會提交Job,只有在執行“動作”操作,所有operation才會被提交到cluster中真正的被執行,這樣可以大大提升系統的性能,

RDD算子主要操作)

RDD擁有的操作比MR豐富的多,不僅僅包括MapReduce操作,還包括filtersortjoinsavecount等操作,所以SparkMR更容易方便完成更復雜的任務,

轉換操作 map

map 轉換算子與Python中的map函式類似,該方法是將指定的函式作用在RDD的每個磁區的每個元素上,其實就是我們Python語法,只是在Python里面可能對這一方面的語法沒有過于的要求,用法如下:

rdd = sc.parallelize(["b", "a", "c"])
a=rdd.map(lambda x: (x, 1)).collect()
print(a)

轉換操作 flatMap

flatMap 操作和map操作有點類似,flatMap轉換首先將map函式應用于該RDD的所有元素,然后將結果平坦化(可以理解為將型別為元組的元素逐個放出來),從而回傳新的RDD,用法如下:

rdd = sc.parallelize([2, 3, 4])
a=rdd.map(lambda x: [(x, x), (x, x)]).collect()
b=rdd.flatMap(lambda x: [(x, x), (x, x)]).collect()
print(a)
print(b)

我們發現有點不一樣,一個回傳的是串列嵌套串列,一個是串列里面嵌套元組

轉換操作 filter

filter 轉換算子與Python中的filter函式類似,該方法是用指定的函式作為過濾條件,在RDD的所有磁區中篩選出符合條件的元素,用法如下:

rdd = sc.parallelize([1, 2, 3, 4, 5])
a=rdd.filter(lambda x: x % 2 == 0).collect()
print(a)

這里按照演算法進行篩選出偶數,并回傳一個串列集

轉換操作 union

union 轉換算子與Python中的集合中的union函式類似,該方法是對一個RDD和引數RDD求并集后,回傳一個新的RDD,只不過這里的union不會去重,用法如下:

rdd1 = sc.parallelize([1, 2, 3, 4])
rdd2 = sc.parallelize([3, 4, 5, 6])
print(rdd1.union(rdd2).collect())

轉換操作 intersection

intersection 轉換算子與Python中的集合中的intersection函式類似,該方法是對一個RDD和引數RDD求交集后,回傳一個新的RDD,用法如下:

a=rdd1 = sc.parallelize([1, 2, 3, 4])
b=rdd2 = sc.parallelize([3, 4, 5, 6])
c=rdd1.intersection(rdd2).collect()
print(c)

轉換操作 distinct

distinct 轉換算子有點類似于SQL陳述句中的distinct,或者可以理解為將一個Python串列轉成集合(set(a_list)), distinctRDD中回傳包含不同元素的新RDD,用法如下:

rdd2 = sc.parallelize([3, 4, 5, 6, 3, 4])
a=rdd2.distinct().collect()
print(a)

轉換操作 sortBy

sortBy 轉換算子與Pythonsort函式類似,該方法是對一個RDD和根據指定的key進行排序,回傳一個新的RDD,默認是升序排列,如果要降序排列,則添加引數ascending=False,用法如下:

tmp = [("a", 1), ("b", 2), ("1", 3), ("d", 4), ("2", 5)]
a=sc.parallelize(tmp).sortBy(lambda x: x[0]).collect()
b=sc.parallelize(tmp).sortBy(lambda x: x[1]).collect()
c=sc.parallelize(tmp).sortBy(lambda x: x[1], ascending=False).collect()
print(a)
print(b)
print(c)

注意我們在進行排序的時候,針對一個鍵值對的型別進行排序,我我們的思想就是就是將一個串列里面的元組進行RDD轉換,最終利用lambda函式進行排序,得到我們需要的結果,默認是TRUE:從小到大

這里給出一個問題?如果是一個單純的一維陣列,我們需要去排序又該如何去做,答案是sort即可!語法參考Python即可

轉換操作 glom

glom 轉換算子回傳通過將每個磁區中的所有元素合并到串列中得到新的RDD,用法如下:

data = [1, 2, 3, 4, 5, 6, 7, 8, 9]
rdd = sc.parallelize(data, 4)
rdd = rdd.glom()
a=rdd.collect()
print(a)

這里我們來看,首先分了四個區,那么一個區又該容納多少個元素,這個是需要我們考慮的嗎?顯然不用嗎,系統會根據具體的元素數量,根據特定的演算法組合,結合出最佳的組合串列,

比如這種就很明顯了,10個元素,分成5個區,自動給每個區添加2個元素

data = [1, 2, 3, 4, 5, 6, 7, 8, 9,10]
rdd = sc.parallelize(data, 5)
rdd = rdd.glom()
a=rdd.collect()
print(a)

轉換操作 mapPartitions

mapPartitions 操作類似于map操作,只不過map是作用于每一個元素,mapPartitions是作用于每一個磁區,用法如下:

rdd = sc.parallelize([1, 2, 3, 4], 2)
a=rdd.glom().collect()
def f(iterator): yield sum(iterator)
b=rdd.mapPartitions(f).collect()
print(a)
print(b)

mapPartitions中,f函式接收到的引數為每個磁區的迭代器,回傳值為求和操作,故回傳值37分別為每個磁區的和,

轉換操作 mapPartitionsWithIndex

mapPartitionsWithIndex 類似于mapPartitions算子,區別在于mapPartitionsWithIndex中傳入的函式可接收兩個引數,第一個引數為磁區編號,第二個為對應磁區的元素組成的迭代器,回傳值型別為迭代器(生成器),這個算子可以回傳每個磁區的磁區編號和元素,用法如下:

rdd = sc.parallelize([1, 2, 3, 4], 2)
a=rdd.glom().collect()
def f(splitIndex, iterator): yield (splitIndex, list(iterator))
b=rdd.mapPartitionsWithIndex(f).collect()
print(b)

動作操作

動作操作 collect

collect 回傳一個包含RDD的所有元素的list,是在測驗代碼時最常用的動作算子,類似于Pythonprint函式的作用,寫法如下:

sc.parallelize([1, 2, 3, 4, 5]).collect()

需要注意的是,在使用這個算子時需要保證這個回傳結果比較小,因為這個算子相當于將 RDD 的所有元素收集到 driver 的客戶端的記憶體中,如果回傳結果比較大,可將結果保存在磁盤中的方式,

動作操作 reduce

reduce 有點類似于Python中的reduce函式,可以通過指定的聚合方法來對RDD中元素進行聚合,寫法如下:

from operator import add
a=sc.parallelize([1, 2, 3, 4, 5]).reduce(add)
b=sc.parallelize((2 for _ in range(10))).map(lambda x: 1).cache().reduce(add)
print(a)
print(b)

這個語法如果是對Python不是特別熟悉的可能看不懂,首先那一個回圈就是產生10個序列,這個序列被后面的map(lambda 賦值了1)一次疊加,計算出總和:10

動作操作 count

count 有點類似于Python中的len函式,count回傳RDD中的元素個數,寫法如下:

sc.parallelize([1, 2, 5, 3]).count()

動作操作 take

take(n) 回傳RDD的前n個元素,用法如下:

sc.parallelize([1,2,5,3,6,7,8,9,10],3).take(4)

動作操作 first

first 回傳RDD的第一個值,用法如下:

sc.parallelize([1,2,5,3,6,7,8,9,10],3).first()

動作操作 max

max 回傳RDD的最大值,用法如下:

sc.parallelize([1,2,5,3,6,7,8,9,10],3).max()

動作操作 top

top(num) 回傳RDD中元素的前n個最大值,用法如下

sc.parallelize([1,2,3,4,6,7,8]).top(3)

動作操作 saveAsTextFile

saveAsTextFile RDD中的元素以字串的格式存盤在檔案系統中,寫法如下:

rdd = sc.parallelize((1 for _ in range(50)))
rdd.saveAsTextFile("file:///home/hadoop/data/test01")
print("ok!")

這里隨機產生的50個元素,被轉換為RDD,但是我們保存為檔案的時候,它自動就磁區了,變成了四個檔案,每個檔案12個元素,

動作操作 foreach

foreach( f)遍歷RDD的每個元素,并執行f函式操作,與map算子十分類似,區別在于foreach算子無回傳值,而map算子有回傳值,由于無回傳值,特別適合用于將資料寫入資料庫,存盤在檔案中的操作,用法如下:

rdd = sc.parallelize([3, 4, 5])
a=rdd.foreach(lambda x: print(x))
print(a)

動作操作 foreachPartition

foreachPartition ( f)遍歷RDD的每個磁區,針對每個磁區執行f函式操作,可類比mapPartitions(),用法如下:

rdd = sc.parallelize([3, 4, 5, 6, 7], 3)  # 指定3個磁區
a=rdd.foreachPartition(lambda x: print(list(x))) # 將磁區中的元素轉成串列列印出來
print(a)

PairRDD

?Spark為包含鍵值對(key-value)型別的RDD提供了一些專有的操作,這些RDD被稱為PairRDD,

?鍵值對RDD是一種常見的資料型別,具有廣泛的應用,像聚合計算等,

?PairRDD提供了并行操作各個鍵或跨節點重新進行資料分組的操作介面,

轉換操作 mapValues

mapValues 在不改變原有Key鍵的基礎上,對Key-Value結構RDDVaule值進行一個map操作,磁區保持不變,示例代碼如下:

a = [("a", 1), ("b", 2),("c", 3)]
a_rdd = sc.parallelize(a)
b=a_rdd.mapValues(lambda x: x * 10).collect()
c=a_rdd.mapValues(lambda x: (x, 1)).collect()
print(b)
print(c)

轉換操作 flatMapValues

flatMapValuesKey-Value結構的RDD先執行mapValue操作,再執行壓平的操作,類似mapflatMap的區別,示例代碼如下:

a = [("a", 1), ("b", 2),("c", 3)]
a_rdd = sc.parallelize(a)
b=a_rdd.mapValues(lambda x: (x, 1)).collect()
c=a_rdd.flatMapValues(lambda x: (x, 1)).collect()
print(b)
print(c)

一個是未壓平,一個是壓平后的

轉換操作 groupByKey

groupByKeyPair RDD中的相同Key的值放一個序列中,即根據key值進行分組,示例代碼:

rdd = sc.parallelize([("a", 1), ("b", 1), ("a", 1)])
a=rdd.groupByKey().collectAsMap()
print(a)

我們發現是一個序列,并沒有顯示它的值 ,怎么辦?看下面操作:

上述結果回傳的是一個字典,字典的value是一個可迭代物件,這樣看不好理解,可以先使用mapValues將其元素轉換為list,再用collectAsMap看結果:

rdd = sc.parallelize([("a", 1), ("b", 1), ("a", 1)])
a=rdd.groupByKey().collectAsMap()
print(a)
b=rdd.groupByKey().mapValues(list).collectAsMap()
print(b)

轉換操作 reduceByKey

reduceByKey 根據Key進行分組,對分組內的元素Value進行reduce操作 ,示例代碼如下:

rdd = sc.parallelize([("a", 1), ("b", 3), ("a", 1)])
a=rdd.reduceByKey(lambda x, y:x+y).collectAsMap()
print(a)

這個在我們進行成績統計的時候,可以做該相關操作,比如同一個學科的平均分我們就可以,按照這個分組來顯示不同的學科的成績,同時我們還可以用其他的操作來解決我們日常的問題

轉換操作 sortByKey

sortByKey 對原RDD根據Key進行排序,回傳新的RDD示例代碼如下:

rdd = sc.parallelize([("a", 1), ("b", 3), ("a", 1), ("c", 2) ])
a=rdd.sortByKey().collectAsMap()
print(a)

轉換操作 partitionBy

partitionBy 可以針對Key-Value結構的RDD重新磁區,采用默認的Hash磁區,用法如下:

>>> a = [("a", 1), ("b", 2), ("c", 3), ("d", 4)]

>>> a_rdd = sc.parallelize(a, 2) # 創建RDD的時候指定2個磁區

>>> a_rdd.glom().collect()

[[('a', 1), ('b', 2)], [('c', 3), ('d', 4)]]

>>> a_rdd = a_rdd.partitionBy(3) # 重新磁區,分為3個區

>>> a_rdd.glom().collect()

[[('d', 4)], [('b', 2)], [('a', 1), ('c', 3)]]

轉換操作 keys

keys 回傳一個新的RDD,元素為每個原RDD的所有key,示例代碼:

>>> rdd = sc.parallelize([("a", 1), ("b", 3), ("a", 1), (“c", 2) ])

>>> rdd.keys().collect()

['a', 'b', 'a', 'c']

轉換操作 values

values 回傳一個新的RDD,元素為每個原RDD的所有value,示例代碼:

>>> rdd = sc.parallelize([("a", 1), ("b", 3), ("a", 1), (“c", 2) ])

>>> rdd.keys().collect()

[1, 3, 1, 2]

轉換操作 join

join SQL中的join含義一致,根據資料的Key鍵進行連接,示例代碼:

a = sc.parallelize([("a", 1), ("b", 2)])
b = sc.parallelize([("b", 3), ("c", 4)])
c=a.join(b).collect()
print(c)

轉換操作 leftOuterJoin

leftOuterJoin 左外連接SQL中的leftJoin含義一致,示例代碼:

>>> a = sc.parallelize([("a", 1), ("b", 2)])

>>> b = sc.parallelize([("b", 3), ("c", 4)])

>>> a.leftOuterJoin(b).collect()

[('b', (2, 3)), ('a', (1, None))]

轉換操作 rightOuterJoin

rightOuterJoin 右外連接SQL中的rightOuterJoin含義一致,示例代碼:

>>> a = sc.parallelize([("a", 1), ("b", 2)])

>>> b = sc.parallelize([("b", 3), ("c", 4)])

>>> a.rightOuterJoin(b).collect()

[('b', (2, 3)), ('c', (None, 4))]

動作操作 collectAsMap

collectAsMap collect算子相似,collectAsMap算子是將Kev-Value結構的RDD收集到driver端,并回傳成一個字典,有相同鍵會被覆寫,示例代碼:

rdd = sc.parallelize([("a", 1), ("b", 1), ("a", 1)])
a=rdd.collectAsMap()
print(a)

動作操作 countByKey

countByKey count函式類似,countByKey的作用是統計每個Key鍵的元素數,示例代碼:

>>> rdd = sc.parallelize([("a", 1), ("b", 1), ("a", 1)])

>>> rdd.countByKey()

defaultdict(<class 'int'>, {'a': 2, 'b': 1})

>>> rdd.countByKey().items()

[('a', 2), ('b', 1)]

共享變數

閉包問題

累加器是一個類似于rdd的變數,只不過累加器是一個全域的共享變數,累加器可以完成對資訊的聚合操作,在學習累加器之前,我們先來看一個使用Spark做累加的例子,代碼如下:

sum = 0
def fn1(x):
    global sum
    sum = sum + x
a_rdd = sc.parallelize([1, 2, 3, 4, 5])
a_rdd.foreach(fn1)
print(sum)  # 得到的sum是0,而不是元素的累加和

?上述程式得到的結果并不是我們所期望的,sum是在foreach函式外部定義的,也就是在driver程式中定義,而foreach函式是屬于rdd物件的,rdd函式的執行位置是各個worker節點(或者說worker行程),該代碼段是在driver節點上(或者說driver行程上)執行的,所以當counter變數在driver中定義,被在rdd中使用的時候,出現了變數的“跨域”問題,也就是閉包問題,

?對于上面程式中的sum變數,由于在代碼塊和在rdd物件的foreach函式是屬于不同“閉包”的,所以,傳進foreach中的sum是一個副本,初始值都為0foreach中疊加的是sum的副本,不管副本如何變化,都不會影響到代碼塊中的sum,所以最終列印出來的sum0

累加器 Accumulator

累加器是一個全域的共享變數,累加器可以很好地解決上述程式的閉包問題,使用累加器完成相同的功能,代碼如下

sum = sc.accumulator(0)  # 創建一個累加器,初值為0
def fn1(x):
    global sum
    sum += x  # 注意這里不能是 sum=sum+x,因為+=是原地操作,+是需要兩個變數型別一致
a_rdd = sc.parallelize([1, 2, 3, 4, 5])
a_rdd.foreach(fn1)
print(sum.value)  # sum.value可以獲取累加器的值,此時列印輸出的是15

?累加器是一個write-only的變數,作業節點worker中的task無法讀取這個值,只能在驅動程式中使用value方法來讀取累加器的值,

廣播變數 Broadcast

廣播變數和累加器類似,也是一個共享變數,廣播變數能夠以一種更有效率的方式將一個大資料量輸入集合的副本分配給每個節點,

廣播變數示例

SparkContext物件的broadcast方法可以創建廣播變數,廣播變數的value屬性可以獲取該廣播變數的值,unpersist方法可以在執行程式上洗掉此廣播的快取副本,destroy方法可以銷毀廣播變數,一旦廣播變數被銷毀,就不能再使用了,示例代碼如下:

>>> b = sc.broadcast(10)  # 創建一個廣播物件
>>> b.value  # 獲取廣播物件的值
10
>>> sc.parallelize([1, 2, 3, 4, 5]).map(lambda x: x * b.value).collect()
[10, 20, 30, 40, 50] 
>>> b.destroy()  # 銷毀廣播變數,銷毀后就不能訪問它的value了
>>> b.value  #  但是pyspark中還是能訪問到這個值,這是pyspark的問題,如果是scala確實是無法訪問它的值了
10
>>> sc.parallelize([1, 2, 3, 4, 5]).map(lambda x: x * b.value).collect()  # task中確實無法訪問該廣播變數的值了

廣播變數通過兩個方面提高資料共享效率:

1.集群中每個節點(物理機器)只有一個副本,默認的閉包是每個任務一個副本;

2.廣播傳輸是通過BT下載模式實作的,也就是P2P下載,在集群多的情況下,可以極大的提高資料傳輸速率,廣播變數修改后,不會反饋到其他節點,

RDD依賴關系

依賴關系

Spark每一個轉換操作都會生成一個新的RDDRDD之間連一條邊,最后這些RDD和他們之間的邊組成一個有向無環圖DAG(Directed Acyclic Graph)

有了計算的DAG圖,Spark內核下一步的任務就是根據DAG圖將計算劃分成任務集,也就是Stage,這樣可以將任務提交到計算節點進行真正的計算,

RDD的持久化

Spark可以對RDD進行持久化操作(即將RDD快取到記憶體中或者磁盤上),當持久化一個 RDD 時,每個節點的其它磁區都可以使用該RDD在記憶體中進行計算,在該資料上的其他 action 操作將直接使用記憶體中的資料,這樣會讓以后的 action 操作計算速度加快(通常運行速度會加速 10 倍),

我們可以使用 persist方法或 cache方法進行持久化persist方法可以持久化一個RDD在記憶體或磁盤中,支持設定持久化級別StorageLevelcache方法僅快取到記憶體,本質上是persist(MEMORY_ONLY)的別名,

持久化根據是否使用記憶體,是否在記憶體不足時將RDD丟棄到磁盤,是否以特定于JAVA的序列化格式將資料保留在記憶體中,以及是否在多個節點上復制RDD磁區等情況,又可以劃分為11個持久化存盤等級,這11等級在pyspark.storagelevel.StorageLevel類中定義,

StorageLevel型別

型別描述

對應的useDisk, useMemory, deserialized, off_heap, replication

MEMORY_ONLY

(默認級別)RDDJAVA物件的形式保存到JVM記憶體,如果分片太大,記憶體快取不下,就不快取

StorageLevel(False, True, False, False, 1)

MEMORY_ONLY_2

(默認級別)RDDJAVA物件的形式保存到JVM記憶體,如果分片太大,記憶體快取不下,就不快取,將磁區復制到兩個集群節點上

StorageLevel(False, True, False, False, 2)

MEMORY_ONLY_SER

RDD以序列化的JAVA物件形式保存到記憶體

StorageLevel(False, True, False, False, 1)

MEMORY_ONLY_SER_2

RDD以序列化的JAVA物件形式保存到記憶體,將磁區復制到兩個集群節點上

StorageLevel(False, True, False, False, 2)

DISK_ONLY

RDD持久化到硬碟

StorageLevel(True, False, False, False, 1)

DISK_ONLY_2

RDD持久化到硬碟,將磁區復制到兩個集群節點上

StorageLevel(True, False, False, False, 2)

MEMORY_AND_DISK

RDD資料集以JAVA物件的形式保存到JVM記憶體中,如果分片太大不能保存到記憶體中,則保存到磁盤上,下次用時重新從磁盤讀取

StorageLevel(True, True, False, False, 1)

MEMORY_AND_DISK_2

RDD資料集以JAVA物件的形式保存到JVM記憶體中,如果分片太大不能保存到記憶體中,則保存到磁盤上,下次用時重新從磁盤讀取,并將磁區復制到兩個集群節點上

StorageLevel(True, True, False, False, 2)

MEMORY_AND_DISK_SER

MEMORY_ONLY_SER類似,但當分片太大,不能保存到記憶體中,會將其保存到磁盤中

StorageLevel(True, True, False, False, 1)

MEMORY_AND_DISK_SER_2

MEMORY_ONLY_SER類似,但當分片太大,不能保存到記憶體中,會將其保存到磁盤中,將磁區復制到兩個集群節點上

StorageLevel(True, True, False, False, 2)

OFF_HEAP

是否利用java unsafe API實作的記憶體管理,RDD實際被保存到Tachyon

StorageLevel(True, True, True, False, 1)

注意:表中useDisk使用磁盤, useMemory使用記憶體, deserialized是否以特定于JAVA的序列化格式將資料保留在記憶體中,off_heap是否利用java unsafe API實作的記憶體管理replication備份的節點數,

RDD的持久化可以使用persist方法和cache方法,cache方法只能快取在記憶體中, persist方法可以快取在磁盤上或者記憶體中,is_cached屬性可以查看當前RDD的持久化狀態,或者使用getStorageLevel方法獲取當前RDD的持久化狀態, unpersist方法可以解除RDD的持久化,示例代碼如下,

>>> from pyspark.storagelevel import StorageLevel  # 必須先引入StorageLevel這個類
>>> rdd = sc.parallelize(["b", "a", "c"])
>>> rdd.persist(StorageLevel.MEMORY_ONLY)  # 使用persist方法將RDD持久化到記憶體中
ParallelCollectionRDD[27] at parallelize at PythonRDD.scala:194 
>>> rdd.is_cached  # 查看RDD的持久化狀態
True
>>> rdd.unpersist() # 解除RDD的持久化
ParallelCollectionRDD[27] at parallelize at PythonRDD.scala:194
>>> rdd.is_cached  # 查看RDD的持久化狀態
False
>>> rdd.persist(StorageLevel.DISK_ONLY) )   # 使用persist方法將RDD持久化到磁盤上
ParallelCollectionRDD[27] at parallelize at PythonRDD.scala:194

本期的RDD算子就介紹完了,有的同學肯定會有所疑問,如何操作在日常的開發中,其實這個就我們學Python的字串、元組、串列、集合、函式、還有一系列的語法差不多

打好基礎才可以做好后面的作業

每文一語

在對的時間遇見對的你,所言之語皆是春風化雨...........

轉載請註明出處,本文鏈接:https://www.uj5u.com/qita/271551.html

標籤:其他

上一篇:[STL]容器小結+函式物件+謂詞+內建函式物件+函式物件配接器+演算法(匯總)

下一篇:從零開始的編程學習

標籤雲
其他(157675) Python(38076) JavaScript(25376) Java(17977) C(15215) 區塊鏈(8255) C#(7972) AI(7469) 爪哇(7425) MySQL(7132) html(6777) 基礎類(6313) sql(6102) 熊猫(6058) PHP(5869) 数组(5741) R(5409) Linux(5327) 反应(5209) 腳本語言(PerlPython)(5129) 非技術區(4971) Android(4554) 数据框(4311) css(4259) 节点.js(4032) C語言(3288) json(3245) 列表(3129) 扑(3119) C++語言(3117) 安卓(2998) 打字稿(2995) VBA(2789) Java相關(2746) 疑難問題(2699) 细绳(2522) 單片機工控(2479) iOS(2429) ASP.NET(2402) MongoDB(2323) 麻木的(2285) 正则表达式(2254) 字典(2211) 循环(2198) 迅速(2185) 擅长(2169) 镖(2155) 功能(1967) .NET技术(1958) Web開發(1951) python-3.x(1918) HtmlCss(1915) 弹簧靴(1913) C++(1909) xml(1889) PostgreSQL(1872) .NETCore(1853) 谷歌表格(1846) Unity3D(1843) for循环(1842)

熱門瀏覽
  • 網閘典型架構簡述

    網閘架構一般分為兩種:三主機的三系統架構網閘和雙主機的2+1架構網閘。 三主機架構分別為內端機、外端機和仲裁機。三機無論從軟體和硬體上均各自獨立。首先從硬體上來看,三機都用各自獨立的主板、記憶體及存盤設備。從軟體上來看,三機有各自獨立的作業系統。這樣能達到完全的三機獨立。對于“2+1”系統,“2”分為 ......

    uj5u.com 2020-09-10 02:00:44 more
  • 如何從xshell上傳檔案到centos linux虛擬機里

    如何從xshell上傳檔案到centos linux虛擬機里及:虛擬機CentOs下執行 yum -y install lrzsz命令,出現錯誤:鏡像無法找到軟體包 前言 一、安裝lrzsz步驟 二、上傳檔案 三、遇到的問題及解決方案 總結 前言 提示:其實很簡單,往虛擬機上安裝一個上傳檔案的工具 ......

    uj5u.com 2020-09-10 02:00:47 more
  • 一、SQLMAP入門

    一、SQLMAP入門 1、判斷是否存在注入 sqlmap.py -u 網址/id=1 id=1不可缺少。當注入點后面的引數大于兩個時。需要加雙引號, sqlmap.py -u "網址/id=1&uid=1" 2、判斷文本中的請求是否存在注入 從文本中加載http請求,SQLMAP可以從一個文本檔案中 ......

    uj5u.com 2020-09-10 02:00:50 more
  • Metasploit 簡單使用教程

    metasploit 簡單使用教程 浩先生, 2020-08-28 16:18:25 分類專欄: kail 網路安全 linux 文章標簽: linux資訊安全 編輯 著作權 metasploit 使用教程 前言 一、Metasploit是什么? 二、準備作業 三、具體步驟 前言 Msfconsole ......

    uj5u.com 2020-09-10 02:00:53 more
  • 游戲逆向之驅動層與用戶層通訊

    驅動層代碼: #pragma once #include <ntifs.h> #define add_code CTL_CODE(FILE_DEVICE_UNKNOWN,0x800,METHOD_BUFFERED,FILE_ANY_ACCESS) /* 更多游戲逆向視頻www.yxfzedu.com ......

    uj5u.com 2020-09-10 02:00:56 more
  • 北斗電力時鐘(北斗授時服務器)讓網路資料更精準

    北斗電力時鐘(北斗授時服務器)讓網路資料更精準 北斗電力時鐘(北斗授時服務器)讓網路資料更精準 京準電子科技官微——ahjzsz 近幾年,資訊技術的得了快速發展,互聯網在逐漸普及,其在人們生活和生產中都得到了廣泛應用,并且取得了不錯的應用效果。計算機網路資訊在電力系統中的應用,一方面使電力系統的運行 ......

    uj5u.com 2020-09-10 02:01:03 more
  • 【CTF】CTFHub 技能樹 彩蛋 writeup

    ?碎碎念 CTFHub:https://www.ctfhub.com/ 筆者入門CTF時時剛開始刷的是bugku的舊平臺,后來才有了CTFHub。 感覺不論是網頁UI設計,還是題目質量,賽事跟蹤,工具軟體都做得很不錯。 而且因為獨到的金幣制度的確讓人有一種想去刷題賺金幣的感覺。 個人還是非常喜歡這個 ......

    uj5u.com 2020-09-10 02:04:05 more
  • 02windows基礎操作

    我學到了一下幾點 Windows系統目錄結構與滲透的作用 常見Windows的服務詳解 Windows埠詳解 常用的Windows注冊表詳解 hacker DOS命令詳解(net user / type /md /rd/ dir /cd /net use copy、批處理 等) 利用dos命令制作 ......

    uj5u.com 2020-09-10 02:04:18 more
  • 03.Linux基礎操作

    我學到了以下幾點 01Linux系統介紹02系統安裝,密碼啊破解03Linux常用命令04LAMP 01LINUX windows: win03 8 12 16 19 配置不繁瑣 Linux:redhat,centos(紅帽社區版),Ubuntu server,suse unix:金融機構,證券,銀 ......

    uj5u.com 2020-09-10 02:04:30 more
  • 05HTML

    01HTML介紹 02頭部標簽講解03基礎標簽講解04表單標簽講解 HTML前段語言 js1.了解代碼2.根據代碼 懂得挖掘漏洞 (POST注入/XSS漏洞上傳)3.黑帽seo 白帽seo 客戶網站被黑帽植入劫持代碼如何處理4.熟悉html表單 <html><head><title>TDK標題,描述 ......

    uj5u.com 2020-09-10 02:04:36 more
最新发布
  • 2023年最新微信小程式抓包教程

    01 開門見山 隔一個月發一篇文章,不過分。 首先回顧一下《微信系結手機號資料庫被脫庫事件》,我也是第一時間得知了這個訊息,然后跟蹤了整件事情的經過。下面是這起事件的相關截圖以及近日流出的一萬條資料樣本: 個人認為這件事也沒什么,還不如關注一下之前45億快遞資料查詢渠道疑似在近日復活的訊息。 訊息是 ......

    uj5u.com 2023-04-20 08:48:24 more
  • web3 產品介紹:metamask 錢包 使用最多的瀏覽器插件錢包

    Metamask錢包是一種基于區塊鏈技術的數字貨幣錢包,它允許用戶在安全、便捷的環境下管理自己的加密資產。Metamask錢包是以太坊生態系統中最流行的錢包之一,它具有易于使用、安全性高和功能強大等優點。 本文將詳細介紹Metamask錢包的功能和使用方法。 一、 Metamask錢包的功能 數字資 ......

    uj5u.com 2023-04-20 08:47:46 more
  • vulnhub_Earth

    前言 靶機地址->>>vulnhub_Earth 攻擊機ip:192.168.20.121 靶機ip:192.168.20.122 參考文章 https://www.cnblogs.com/Jing-X/archive/2022/04/03/16097695.html https://www.cnb ......

    uj5u.com 2023-04-20 07:46:20 more
  • 從4k到42k,軟體測驗工程師的漲薪史,給我看哭了

    清明節一過,盲猜大家已經無心上班,在數著日子準備過五一,但一想到銀行卡里的余額……瞬間心情就不美麗了。最近,2023年高校畢業生就業調查顯示,本科畢業月平均起薪為5825元。調查一出,便有很多同學表示自己又被平均了。看著這一資料,不免讓人想到前不久中國青年報的一項調查:近六成大學生認為畢業10年內會 ......

    uj5u.com 2023-04-20 07:44:00 more
  • 最新版本 Stable Diffusion 開源 AI 繪畫工具之中文自動提詞篇

    🎈 標簽生成器 由于輸入正向提示詞 prompt 和反向提示詞 negative prompt 都是使用英文,所以對學習母語的我們非常不友好 使用網址:https://tinygeeker.github.io/p/ai-prompt-generator 這個網址是為了讓大家在使用 AI 繪畫的時候 ......

    uj5u.com 2023-04-20 07:43:36 more
  • 漫談前端自動化測驗演進之路及測驗工具分析

    隨著前端技術的不斷發展和應用程式的日益復雜,前端自動化測驗也在不斷演進。隨著 Web 應用程式變得越來越復雜,自動化測驗的需求也越來越高。如今,自動化測驗已經成為 Web 應用程式開發程序中不可或缺的一部分,它們可以幫助開發人員更快地發現和修復錯誤,提高應用程式的性能和可靠性。 ......

    uj5u.com 2023-04-20 07:43:16 more
  • CANN開發實踐:4個DVPP記憶體問題的典型案例解讀

    摘要:由于DVPP媒體資料處理功能對存放輸入、輸出資料的記憶體有更高的要求(例如,記憶體首地址128位元組對齊),因此需呼叫專用的記憶體申請介面,那么本期就分享幾個關于DVPP記憶體問題的典型案例,并給出原因分析及解決方法。 本文分享自華為云社區《FAQ_DVPP記憶體問題案例》,作者:昇騰CANN。 DVPP ......

    uj5u.com 2023-04-20 07:43:03 more
  • msf學習

    msf學習 以kali自帶的msf為例 一、msf核心模塊與功能 msf模塊都放在/usr/share/metasploit-framework/modules目錄下 1、auxiliary 輔助模塊,輔助滲透(埠掃描、登錄密碼爆破、漏洞驗證等) 2、encoders 編碼器模塊,主要包含各種編碼 ......

    uj5u.com 2023-04-20 07:42:59 more
  • Halcon軟體安裝與界面簡介

    1. 下載Halcon17版本到到本地 2. 雙擊安裝包后 3. 步驟如下 1.2 Halcon軟體安裝 界面分為四大塊 1. Halcon的五個助手 1) 影像采集助手:與相機連接,設定相機引數,采集影像 2) 標定助手:九點標定或是其它的標定,生成標定檔案及內參外參,可以將像素單位轉換為長度單位 ......

    uj5u.com 2023-04-20 07:42:17 more
  • 在MacOS下使用Unity3D開發游戲

    第一次發博客,先發一下我的游戲開發環境吧。 去年2月份買了一臺MacBookPro2021 M1pro(以下簡稱mbp),這一年來一直在用mbp開發游戲。我大致分享一下我的開發工具以及使用體驗。 1、Unity 官網鏈接: https://unity.cn/releases 我一般使用的Apple ......

    uj5u.com 2023-04-20 07:40:19 more