1.spark中的RDD
RDD(Resilient Distributed Dataset)叫做分布式資料集,是spark中最基本的資料抽象,它代表一個不可變,可磁區,里面的元素可以并行計算的集合
RDD被表示為物件,通過物件上的方法呼叫來對RDD進行轉換,經過一系列的transformations定義RDD之后,就可以呼叫actions觸發RDD的計算,action可以是向應用程式回傳結果(count, collect等),或者是向存盤系統保存資料(saveAsTextFile等),在Spark中,只有遇到action,才會執行RDD的計算(即延遲計算),這樣在運行時可以通過管道的方式傳輸多個轉換,
2.coalesce和repartition的區別
coalesce重新磁區,可以選擇是否進行shuffle程序,由引數shuffle: Boolean = false/true決定,
repartition實際上是呼叫的coalesce,默認是進行shuffle的,
減少磁區允許不進行shuffle程序,但是增大磁區需要,
所以coalesce可以在不進行shuffle的情況下減少磁區,增大磁區需要指定第二個引數為true
減少磁區的應用場景:例如通過filter之后,有些磁區資料量比較少,通過減少磁區,防止資料傾斜
增大磁區的應用場景:磁區內資料量太大,通過增加磁區提高并行度
3.spark中的寬窄依賴
窄依賴
父RDD和子RDD partition之間的關系是一對一的,或者父RDD一個partition只對應一個子RDD的partition情況下的父RDD和子RDD partition關系是多對一的,不會有shuffle的產生,父RDD的一個磁區去到子RDD的一個磁區,
寬依賴
RDD與子RDD partition之間的關系是一對多,會有shuffle的產生,父RDD的一個磁區的資料去到子RDD的不同磁區里面,
4.spark中如何劃分stage
Spark任務會根據RDD之間的依賴關系,形成一個DAG有向無環圖,DAG會提交給DAGScheduler,DAGScheduler會把DAG劃分相互依賴的多個stage,劃分依據就是寬窄依賴,遇到寬依賴就劃分stage,每個stage包含一個或多個task,然后將這些task以taskSet的形式提交給TaskScheduler運行,stage是由一組并行的task組成
注意:
park程式中可以因為不同的action觸發眾多的job,一個程式中可以有很多的job,每一個job是由一個或者多個stage構成的,后面的stage依賴于前面的stage,也就是說只有前面依賴的stage計算完畢后,后面的stage才會運行;
stage 的劃分標準就是寬依賴:何時產生寬依賴就會產生一個新的stage,例如reduceByKey,groupByKey,join的算子,會導致寬依賴的產生;
5.DAG
DAG,有向無環圖,說白了,就是一個由頂點和有方向性的邊構成的圖中,從任意一個頂點出發,并且不會回到原點,它為每個spark job計算出有多少個stage任務階段,通常根據shuffle來劃分stage,如reduceByKey,groupByKey等涉及到shuffle的transformation就會產生新的stage ,然后將每個stage劃分為具體的一組任務,以TaskSets的形式提交給底層的任務調度模塊來執行,其中不同stage之前的RDD為寬依賴關系,TaskScheduler任務調度模塊負責具體啟動任務,監控和匯報任務運行情況,
6.Job的生成
在撰寫的一個程式中只要有一個行動算子的出現例如:collect ,就會生成一個job,然后向DAGScheduler提交job,如果driver程式后面還有別的action,那么其他action也會對應生成相應的job,所以,driver端有多少action就會提交多少job,這可能就是為什么spark將driver程式稱為application而不是job 的原因,每一個job可能會包含一個或者多個stage,最后一個stage生成result,在提交job 的程序中,DAGScheduler會首先從后往前劃分stage,劃分的標準就是寬依賴,一旦遇到寬依賴就劃分,然后先提交沒有父階段的stage,并在提交程序中,計算該stage的task數目以及型別,并提交具體的task,在這些無父階段的stage提交完之后,依賴該stage 的stage才會提交
7.RDD快取
Spark可以使用 persist 和 cache 方法將任意 RDD 快取到記憶體、磁盤檔案系統中,快取是容錯的,如果一個 RDD 分片丟失,可以通過構建它的 transformation自動重構,被快取的 RDD 被使用的時,存取速度會被大大加速,一般的executor記憶體60%做 cache, 剩下的40%做task,
Spark中,RDD類可以使用cache() 和 persist() 方法來快取,cache()是persist()的特例,將該RDD快取到記憶體中,而persist可以指定一個StorageLevel,StorageLevel的串列可以在StorageLevel 伴生單例物件中找到,
8.spark的有幾種部署模式
8.1.本地模式
Spark不一定非要跑在hadoop集群,可以在本地,起多個執行緒的方式來指定,將Spark應用以多執行緒的方式直接運行在本地,一般都是為了方便除錯,本地模式分三類
local:只啟動一個executor
local[k]:啟動k個executor
local:啟動跟cpu數目相同的 executor
8.2standalone模式
分布式部署集群, 自帶完整的服務,資源管理和任務監控是Spark自己監控,這個模式也是其他模式的基礎
8.3Spark on yarn模式
分布式部署集群,資源和任務監控交給yarn管理,但是目前僅支持粗粒度資源分配方式,包含cluster和client運行模式,cluster適合生產,driver運行在集群子節點,具有容錯功能,client適合除錯,dirver運行在客戶端
9.spark中worker 的主要作業
管理當前節點記憶體,CPU的使用情況,接受master發送過來的資源指令,通過executorRunner啟動程式分配任務,worker就類似于包工頭,管理分配新行程,做計算的服務,相當于process服務,需要注意的是
-
worker會不會匯報當前資訊給master?worker心跳給master主要只有workid,不會以心跳的方式發送資源資訊給master,這樣master就知道worker是否存活,只有故障的時候才會發送資源資訊;
-
worker不會運行代碼,具體運行的是executor,可以運行具體application斜的業務邏輯代碼,操作代碼的節點,不會去運行代碼,
10.hadoop和spark的shuffle相同和差異
10.1 同
都是將 mapper(Spark 里是 ShuffleMapTask)的輸出進行 partition,不同的 partition 送到不同的 reducer(Spark 里 reducer 可能是下一個 stage 里的 ShuffleMapTask,也可能是 ResultTask),Reducer 以記憶體作緩沖區,邊 shuffle 邊 aggregate 資料,等到資料 aggregate 好以后進行 reduce() (Spark 里可能是后續的一系列操作),
10.2 異
-
Hadoop的有一個Map完成,Reduce便可以去拉取資料了,不必等到所有Map任務完成,而Spark的必須等到父stage完成,也就是父stage的map操作全部完成才能去拉取資料,
-
Hadoop的Reduce要等到拉取完全部資料,才將資料傳入reduce函式進行聚合,而spark是一邊拉取一邊聚合,
11.spark組件
-
master:管理集群和節點,不參與計算,
-
worker:計算節點,行程本身不參與計算,和master匯報,
-
Driver:運行程式的main方法,創建spark context物件,
-
spark context:控制整個application的生命周期,包括dagsheduler和task scheduler等組件,
-
client:用戶提交程式的入口,
12.spark作業機制
spark-submit提交代碼,執行new SparkContext(),在 SparkContext 里構造DAGScheduler和TaskScheduler,- TaskScheduler 會通過后臺的一個行程,連接 Master,向 Master 注冊 Application,
- Master 接收到 Application 請求后,會使用相應的資源調度演算法,在 Worker 上為這個 Application 啟動多個 Executer,
- Executor 啟動后,會自己反向注冊到 TaskScheduler 中, 所有 Executor 都注冊到 Driver 上之后,SparkContext 結束初始化,接下來往下執行我們自己的代碼,
- 每執行到一個 Action,就會創建一個 Job,Job 會提交給 DAGScheduler,
- DAGScheduler 會將 Job劃分為多個 stage,然后每個 stage 創建一個 TaskSet,
- TaskScheduler 會把每一個 TaskSet 里的 Task,提交到 Executor 上執行,
- Executor 上有執行緒池,每接收到一個 Task,就用 TaskRunner 封裝,然后從執行緒池里取出一個執行緒執行這個 task,(TaskRunner 將我們撰寫的代碼,拷貝,反序列化,執行 Task,每個 Task 執行 RDD 里的一個 partition)
13.collect是什么
driver通過collect把集群中各個節點的內容收集過來匯總成結果,collect回傳結果是Array型別的,collect把各個節點上的資料抓過來,抓過來資料是Array型,collect對Array抓過來的結果進行合并,合并后Array中只有一個元素,是tuple型別(KV型別的)的,
14.map與flatMap的區別
map:對RDD每個元素轉換,檔案中的每一行資料回傳一個陣列物件
flatMap:對RDD每個元素轉換,然后再扁平化將所有的物件合并為一個物件,檔案中的所有行資料僅回傳一個陣列物件,會拋棄值為null的值
15.Spark 中算子的使用
在我們的開發程序中,能避免則盡可能避免使用 reduceByKey、join、distinct、repartition 等會進行 shuffle 的算子,盡量使用 map 類的非 shuffle 算子,這樣的話,沒有 shuffle 操作或者僅有較少 shuffle 操作的 Spark 作業,可以大大減少性能開銷,
16.Spark 優點
-
更高的性能,因為資料被加載到集群主機的分布式記憶體中,資料可以被快速的轉換迭代,并快取用以后續的頻繁訪問需求,在資料全部加載到記憶體的情況下,Spark可以比Hadoop快100倍,在記憶體不夠存放所有資料的情況下快hadoop10倍,
-
通過建立在Java,Scala,Python,SQL(應對互動式查詢)的標準API以方便各行各業使用,同時還含有大量開箱即用的機器學習庫,
-
與現有Hadoop 1和2.x(YARN)生態兼容,因此機構可以無縫遷移,
-
方便下載和安裝,方便的shell(REPL: Read-Eval-Print-Loop)可以對API進行互動式的學習,
-
借助高等級的架構提高生產力,從而可以講精力放到計算上,
17.MapReduce和Spark的差異
原理上:
-
MapReduce:基于磁盤的大資料批量處理系統
-
Spark:基于RDD(彈性分布式資料集)資料處理,顯示將RDD資料存盤到磁盤和記憶體中,
模型上:
1.MapReduce可以處理超大規模的資料,適合日志分析挖掘等較少的迭代的長任務需求,結合了資料的分布式的計算,
- Spark:適合資料的挖掘,機器學習等多輪迭代式計算任務,
總結:
1)基于記憶體計算,減少低效的磁盤互動;
2)高效的調度演算法,基于DAG;
3)容錯機制Linage,精華部分就是DAG和Lingae
18.spark中的資料傾斜
1.什么是資料傾斜:
因為task是 并發執行的,所以有的task執行快,有的執行慢,或者等很長時間給你個提示說沒記憶體了,導致的執行失敗
2.原因:
資料問題
-
1、key本身分布不均衡(包括大量的key為空)
-
2、key的設定不合理
spark使用的問題
-
1、shuffle時的并發度不夠
-
2、計算方式有誤
3.造成的后果
-
1、spark中的stage的執行時間受限于最后那個執行完成的task,因此運行緩慢的任務會拖垮整個程式的運行速度(分布式程式運行的速度是由最慢的那個task決定的),
-
2、過多的資料在同一個task中運行,將會把executor撐爆,
4.如何避免
發現資料傾斜的時候,不要急于提高executor的資源,修改引數或是修改程式,首先要檢查資料本身,是否存在例外資料,找出例外的key
資料問題
如果任務長時間卡在最后最后1個(幾個)任務,首先要對key進行抽樣分析,判斷是哪些key造成的, 選取key,對資料進行抽樣,統計出現的次數,根據出現次數大小排序取出前幾個,經過分析,傾斜的資料主要有以下三種情況:
-
1、null(空值)或是一些無意義的資訊()之類的,大多是這個原因引起,
-
2、無效資料,大量重復的測驗資料或是對結果影響不大的有效資料,
-
3、有效資料,業務導致的正常資料分布,
-
第1,2種情況,直接對資料進行過濾即可(因為該資料對當前業務不會產生影響),
-
第3種情況則需要進行一些特殊操作,常見的有以下幾種做法
-
(1) 隔離執行,將例外的key過濾出來單獨處理,最后與正常資料的處理結果進行union操作,
-
(2) 對key先添加隨機值,進行操作后,去掉隨機值,再進行一次操作,
-
(3) 使用reduceByKey 代替 groupByKey(reduceByKey用于對每個key對應的多個value進行merge操作,最重要的是它能夠在本地先進行merge操作,并且merge操作可以通過函式自定義.)
-
(4) 使用map join,
Spark操作問題
-
- 提高shuffle并行度
-
-
- dataFrame和sparkSql可以設定spark.sql.shuffle.partitions引數控制shuffle的并發度,默認為200,
-
rdd操作可以設定spark.default.parallelism控制并發度,默認引數由不同的Cluster Manager控制,
-
局限性: 只是讓每個task執行更少的不同的key,無法解決個別key特別大的情況造成的傾斜,如果某些key的大小非常大,即使一個task單獨執行它,也會受到資料傾斜的困擾,
-
使用map join 代替reduce join
-
19.Spark streamning特點
Spark Streaming 是基于 RDD 的,因此需要將一小段時間內的,比如1秒內的資料,收集起來,作為一個 RDD,然后再針對這個 batch 的資料進行處理,Spark Streaming 是無法動態調整并行度,流式處理完的資料,可以立即進行各種map、reduce轉換操作,可以立即使用sql進行查詢,甚至可以立即使用machine learning或者圖計算演算法進行處理,
注意:
-
只要一個StreamingContext啟動之后,就不能再往其中添加任何計算邏輯了,
-
一個StreamingContext停止之后,是肯定不能重啟的,呼叫
stop()之后,不能再呼叫start() -
一個JVM同時只能有一個StreamingContext啟動(和SparkContext一樣),在應用程式中,不能創建兩個StreamingContext,
-
呼叫
stop()方法時,會同時停止內部的SparkContext,如果希望后面繼續使用SparkContext創建其他型別的Context,比如SQLContext,可以用stop(false), -
一個SparkContext可以創建多個StreamingContext,只要上一個先用
stop(false)停止,再創建下一個即可,
20.Spark中的血統
RDD是彈性分布式資料集,是Spark中最基本的資料抽象,代表一個不可變、可磁區、里面的元素可并行計算 的集合,
它提供了一個抽象的資料模型,將具體的應用邏輯表達為一系列轉換操作(函式),另外不同RDD之間的轉換操作之間還可以形成依賴關系,進而實作管道化,從而避免了中間結果的存盤,大大降低了資料復制、磁盤IO和序列化開銷,并且還提供了更多的API(map/reduec/filter/groupBy…)
RDD 的 lineage 記錄的是粗顆粒度的特定資料轉換(transformation)操作(filter, map, join etc.)行為,當這個 RDD 的部分磁區資料丟失時,它可以通過 lineage 獲取足夠的資訊來重新運算和恢復丟失的資料磁區,這種粗顆粒的資料模型,限制了 Spark 的運用場合,但同時相比細顆粒度的資料模型,也帶來了性能的提升,
21.RDD共享變數
在應用開發中,一個函式被傳遞給Spark操作(例如map和reduce),在一個遠程集群上運行,它實際上操作的是這個函式用到的所有變數的獨立拷貝,這些變數會被拷貝到每一臺機器,通常看來,在任務之間中,讀寫共享變數顯然不夠高效,然而,Spark還是為兩種常見的使用模式,提供了兩種有限的共享變數:廣播變數和累加器
21.1廣播變數(Broadcast Variables)
-
廣播變數快取到各個節點的記憶體中,而不是每個 Task
-
廣播變數被創建后,能在集群中運行的任何函式呼叫
-
廣播變數是只讀的,不能在被廣播后修改
-
對于大資料集的廣播, Spark 嘗試使用高效的廣播演算法來降低通信成本
-
val broadcastVar = sc.broadcast(Array(1, 2, 3))方法引數中是要廣播的變數
21.2 累加器
累加器只支持加法操作,可以高效地并行,用于實作計數器和變數求和,Spark 原生支持數值型別和標準可變集合的計數器,但用戶可以添加新的型別,只有驅動程式才能獲取累加器的值
22.Spark 調優
引數調優
-
num-executors:設定Spark作業總共要用多少個Executor行程來執行
-
executor-memory:設定每個Executor行程的記憶體
-
executor-cores:設定每個Executor行程的CPU core數量
-
driver-memory:設定Driver行程的記憶體
-
spark.default.parallelism:設定每個stage的默認task數量
開發調優
-
避免創建重復的RDD
-
盡可能復用同一個RDD
-
對多次使用的RDD進行持久化
-
盡量避免使用shuffle類算子
-
使用map-side預聚合的shuffle操作
-
使用高性能的算子
23.Spark實作TopN
-
方法1:
(1)按照key對資料進行聚合(groupByKey)
(2)將value轉換為陣列,利用scala的sortBy或者sortWith進行排序(mapValues)
注意:當資料量太大時,會導致OOM
-
方法2:
(1)取出所有的key
(2)對key進行迭代,每次取出一個key利用spark的排序算子進行排序
-
方法3:
(1)自定義磁區器,按照key進行磁區,使不同的key進到不同的磁區
(2)對每個磁區運用spark的排序算子進行排序
24.RDD有幾種操作型別
-
transformation,rdd由一種轉為另一種rdd
-
action
-
cronroller,crontroller是控制算子,cache,persist,對性能和效率的有很好的支持三種型別
25.RDD的checkpoint
RDD的快取能夠在第一次計算完成后,將計算結果保存到記憶體、本地檔案系統中,通過快取,Spark避免了RDD上的重復計算,能夠極大地提升計算速度,但是,如果快取丟失了,則需要重新計算,如果計算特別復雜或者計算耗時特別多,那么快取丟失對于整個Job的效率有很大影響,為了避免快取丟失重新計算帶來的開銷,Spark又引入了檢查點(checkpoint)機制,
快取是在計算結束后,直接將計算結果通過用戶定義的存盤級別(存盤級別定義了快取存盤的介質,現在支持記憶體、本地檔案系統和Tachyon)寫入不同的介質,而檢查點不同,它是在計算完成后,重新建立一個Job來計算,為了避免重復計算,推薦 先將RDD快取,這樣就能保證檢查點的操作可以快速完成,
26.RDD的彈性表現在哪?
-
自動的進行記憶體和磁盤的存盤切換;
-
基于Lingage的高效容錯;
-
task如果失敗會自動進行特定次數的重試;
-
stage如果失敗會自動進行特定次數的重試,而且只會計算失敗的分片;
-
checkpoint和persist,資料計算之后持久化快取
-
資料調度彈性,DAG TASK調度和資源無關
-
資料分片的高度彈性,a.分片很多碎片可以合并成大的,b.par
轉載請註明出處,本文鏈接:https://www.uj5u.com/qita/293549.html
標籤:其他
上一篇:阿里云服務器持久記憶體型re6p實體采用Intel傲騰持久記憶體
下一篇:Spark之RDD算子
