1.從Scala中理解閉包
閉包是一個函式,回傳值依賴于宣告在函式外部的一個或多個變數,閉包通常來講可以簡單的認為是可以訪問一個函式里面區域變數的另外一個函式,
如下面這段匿名的函式:
val multiplier = (i:Int) => i * 10
函式體內有一個變數 i,它作為函式的一個引數,如下面的另一段代碼:
val multiplier = (i:Int) => i * factor
在 multiplier 中有兩個變數:i 和 factor,其中的一個 i 是函式的形式引數,在 multiplier 函式被呼叫時,i 被賦予一個新的值,然而,factor不是形式引數,而是自由變數,考慮下面代碼:
var factor = 3 val multiplier = (i:Int) => i * factor
這里我們引入一個自由變數 factor,這個變數定義在函式外面,
這樣定義的函式變數 multiplier 成為一個"閉包",因為它參考到函式外面定義的變數,定義這個函式的程序是將這個自由變數捕獲而構成一個封閉的函式
完整的例子:
object Test {
def main(args: Array[String]) {
println( "muliplier(1) value = "https://www.cnblogs.com/hulichao/p/+ multiplier(1) )
println("muliplier(2) value = "https://www.cnblogs.com/hulichao/p/+ multiplier(2) )
}
var factor = 3
val multiplier = (i:Int) => i * factor
}
2.Spark中的閉包理解
先來看下面一段代碼:
val data=https://www.cnblogs.com/hulichao/p/Array(1, 2, 3, 4, 5)
var counter = 0
var rdd = sc.parallelize(data)
// ???? 這樣做會怎么樣
rdd.foreach(x => counter += x)
println("Counter value: " + counter)
首先肯定的是上面輸出的結果是0,park將RDD操作的處理分解為tasks,每個task由Executor執行,在執行之前,Spark會計算task的閉包,閉包是Executor在RDD上進行計算的時候必須可見的那些變數和方法(在這種情況下是foreach()),閉包會被序列化并發送給每個Executor,但是發送給Executor的是副本,所以在Driver上輸出的依然是counter本身,如果想對全域的進行更新,用累加器,在spark-streaming里面使用updateStateByKey來更新公共的狀態,
另外在Spark中的閉包還有別的作用,
1.清除Driver發送到Executor上的無用的全域變數等,只復制有用的變數資訊給Executor
2.保證發送到Executor上的是序列化以后的資料
比如在使用DataSet時候 case class的定義必須在類下,而不能是方法內,即使語法上沒問題,如果使用過json4s來序列化,implicit val formats = DefaultFormats 的引入最好放在類下,否則要單獨將這個format序列化,即使你沒有使用到它別的東西,
3.總結
閉包在Spark的整個生命周期中處處可見,就比如從Driver上拷貝的所有資料都需要序列化 + 閉包的方式到Executor上的,
吳邪,小三爺,混跡于后臺,大資料,人工智能領域的小菜鳥,
更多請關注

轉載請註明出處,本文鏈接:https://www.uj5u.com/qita/250087.html
標籤:其他
