1.Spark計算依賴記憶體,如果目前只有10g記憶體,但是需要將500G的檔案排序并輸出,需要如何操作?
①、把磁盤上的500G資料分割為100塊(chunks),每份5GB,(注意,要留一些系統空間!)
②、順序將每份5GB資料讀入記憶體,使用quick sort演算法排序,
③、把排序好的資料(也是5GB)存放回磁盤,
④、回圈100次,現在,所有的100個塊都已經各自排序了,(剩下的作業就是如何把它們合并排序!)
⑤、從100個塊中分別讀取5G/100=0.05 G入記憶體(100input buffers),
⑥、執行100路合并,并將合并結果臨時存盤于5g基于記憶體的輸出緩沖區中,當緩沖區寫滿5GB時,寫入硬碟上最終檔案,并清空輸出緩沖區;當100個輸入緩沖區中任何一個處理完畢時,寫入該緩沖區所對應的塊中的下一個0.05 GB,直到全部處理完成,
2.countByValue和countByKey的區別
首先從原始碼角度來看:
// PairRDDFunctions.scala
def countByKey(): Map[K, Long] = self.withScope {
self.mapValues(_ => 1L).reduceByKey(_ + _).collect().toMap
}
// RDD.scala
def countByValue()(implicit ord: Ordering[T] = null): Map[T, Long] = withScope {
map(value =https://www.cnblogs.com/hulichao/p/> (value, null)).countByKey()
}
countByValue(RDD.scala)
-
作用在普通的
RDD上 -
其實作程序呼叫了
countByKey
countByKey(PairRDDFunctions.scala)
-
作用在 PairRDD 上
-
對 key 進行計數
-
資料要收到Driver端,結果集大時,不適用
問題:
-
countByKey可以作用在 普通的RDD上嗎 -
countByValue可以作用在PairRDD上嗎
val rdd1: RDD[Int] = sc.makeRDD(1 to 10)
val rdd2: RDD[(Int, Int)] = sc.makeRDD((1 to 10).toList.zipWithIndex)
val result1 = rdd1.countByValue() //可以
val result2 = rdd1.countByKey() //語法錯誤
val result3 = rdd2.countByValue() //可以
val result4 = rdd2.countByKey() //可以
3.兩個rdd join 什么時候有shuffle什么時候沒有shuffle
其中join操作是考驗所有資料庫性能的一項重要指標,對于Spark來說,考驗join的性能就是Shuffle,Shuffle 需要經過磁盤和網路傳輸,Shuffle資料越少性能越好,有時候可以盡量避免程式進行Shuffle ,那么什么情況下有Shuffle ,什么情況下沒有Shuffle 呢
3.1 Broadcast join
broadcast join 比較好理解,除了自己實作外,Spark SQL 已經幫我們默認來實作了,其實就是小表分發到所有Executors,控制引數是:spark.sql.autoBroadcastJoinThreshold 默認大小是10m, 即小于這個閾值即自動使用broadcast join.
3.2 Bucket join
其實rdd方式和table類似,不同的是后者要寫入Bucket表,這里主要講rdd的方式,原理就是,當兩個rdd根據相同磁區方式,預先做好磁區,磁區結果是一致的,這樣就可以進行Bucket join, 另外這種join沒有預先的算子,需要在寫程式時候自己來開發,對于表的這種join可以看一下 位元組跳動在Spark SQL上的核心優化實踐 ,可以看下下面的例子
rdd1、rdd2都是Pair RDD
rdd1、rdd2的資料完全相同
一定有shuffle
rdd1 => 5個磁區
rdd2 => 6個磁區
rdd1 => 5個磁區 => (1, 0), (2,0), || (1, 0), (2,0), || (1, 0), (2,0), || (1, 0), (2,0),(1, 0), || (2,0),(1, 0), (2,0)
rdd2 => 5個磁區 => (1, 0), (2,0), || (1, 0), (2,0), || (1, 0), (2,0), || (1, 0), (2,0),(1, 0), || (2,0),(1, 0), (2,0)
一定沒有shuffle
rdd1 => 5個磁區 => (1,0), (1,0), (1,0), (1,0), (1,0), || (2,0), (2,0), (2,0), (2,0), (2,0), (2,0), (2,0) || 空 || 空 || 空
rdd2 => 5個磁區 => (1,0), (1,0), (1,0), (1,0), (1,0), || (2,0), (2,0), (2,0), (2,0), (2,0), (2,0), (2,0) || 空 || 空 || 空
這樣所有Shuffle的算子,如果資料提前做好了磁區(partitionBy),很多情況下沒有Shuffle.
除上面兩種方式外,一般就是有Shuffle的join, 關于spark的join原理可以查看:大資料開發-Spark Join原理詳解
4..transform 是不是一定不觸發action
有個算子例外,那就是sortByKey,其底層有個抽樣演算法,水塘抽樣,最后需要根據抽樣的結果,進行RangePartition的,所以從job角度來說會看到兩個job,除了觸發action的本身算子之外,記住下面的
sortByKey → 水塘抽樣→ collect
5.廣播變數是怎么設計的
我們都知道,廣播變數是把資料放到每個excutor上,也都知道廣播變數的資料一定是從driver開始出去的,什么意思呢,如果廣播表放在hive表中,那么它的存盤就是在各個block塊上,也對應多個excutor (不一樣的叫法),首先將資料拉到driver上,然后再進行廣播,廣播時候不是全部廣播,是根據excutor預先用到資料的,首先拿資料,然后通過bt協議進行傳輸,什么是bt協議呢,就是資料在分布式點對點網路上,根據網路距離來去拉對應的資料,下載者也是上傳者,這樣就不同每個task (excutor)都從driver上來拉資料,這樣就減少了壓力,另外在spark1.幾的時候還是task級別,現在是共同的一個鎖,整個excutor上的task共享這份資料,
參考
https://juejin.cn/post/6844903989557854216
https://www.jianshu.com/p/6bf887bf52b2
吳邪,小三爺,混跡于后臺,大資料,人工智能領域的小菜鳥,
更多請關注

轉載請註明出處,本文鏈接:https://www.uj5u.com/qita/255092.html
標籤:其他
上一篇:大資料開發-Spark-RDD實操案例-http日志分析
下一篇:mysql5.7安裝教程【轉載】
