前言
本文隸屬于專欄《1000個問題搞定大資料技術體系》,該專欄為筆者原創,參考請注明來源,不足和錯誤之處請在評論區幫忙指出,謝謝!
本專欄目錄結構和參考文獻請見1000個問題搞定大資料技術體系
目錄
Spark RDD 論文詳解(一)摘要和介紹
Spark RDD 論文詳解(二)RDDs
Spark RDD 論文詳解(三)Spark 編程介面
Spark RDD 論文詳解(四)表達 RDDs
Spark RDD 論文詳解(五)實作
Spark RDD 論文詳解(六)評估
Spark RDD 論文詳解(七)討論
Spark RDD 論文詳解(八)相關作業和結尾
思維導圖
正文
3、Spark 編程介面
原文翻譯
Spark 使用 Scala 語言實作了抽象的 RDD,Scala 是建立在 java VM 上的靜態型別函式式編程語言,
我們選擇 Scala 是因為它結合了簡潔(很方便進行互動式使用)與高效(由于它的靜態型別),
然而,并不是說 RDD 的抽象需要函式式語言來實作,
開發員需要寫 Driver 程式來使用 Spark,就像上面圖二展示的,這個 Driver 程式連接了集群中的 Workers,
Driver 端程式定義了一系列的 RDDs 并且呼叫了 RDD 的 action 操作,
Driver 端的 Spark 程式同時也會跟蹤 RDDs 之間的的血緣關系,
Workers 是可以將 RDD 磁區資料存盤在記憶體中的長期存活的行程,
在 2.2.1 小節中的日志挖掘例子中,我們提到,用戶提供給 RDD 操作比如 map 以引數作為這個操作的閉包(說白了就是函式),
Scala 將這些函式看作一個 java 物件,這些物件是可以序列化的,并且可以通過網路傳輸傳輸到其他的機器節點上的,
Scala 將函式中的變數看作一個物件中的變數,
比如,我們可以寫一段這樣的代碼:
var x = 5; rdd.map(_ + x)
來達到給這個 RDD 每一個元素加上 5 的目的,
RDDs 是被一元素型別引數化的靜態型別物件,比如,RDD[Int] 表示一個型別為整數的 RDD,
然而,我們很多例子中的 RDD 都會省去這個型別,這個是因為 Scala 支持型別推斷,
雖然我們用 Scala 實作 RDD 的方法很簡單,但是我們需要處理用反射實作的閉包物件相關的作業,我們還需要做很多的作業使得 Spark 可以用 Scala 的解釋器,這個我們在 5.2 小節中會討論到,
盡管如此,我們是不需要修改 Scala 的編譯器的,
3.1 Spark 中 RDD 的操作
原文翻譯
表二中列舉了 Spark 中 RDD 常用的 transformations 和 actions 操作,且描述了每一個方法的簽名以及型別,
我們需要記住 transformations 是用來定義一個新的 RDD 的 lazy 操作,而 actions 是真正觸發一個能回傳結果或者將結果寫到檔案系統中的計算,
需要注意的是,一些操作比如 join 只適合用于 key-value 型別的 RDDs,
我們取的函式的名稱和 scala 或者其他函式式編程語言的函式名是一致的,
比如,map 是一個 one-to-one 的映射操作,而 flatMap 的每一個輸入值會對應一個或者更多的輸出值(有點像 MapReduce 中的 map)
除了這些操作,用戶可以通過 persist 操作來請求快取 RDD,
另外,用戶可以獲取 RDD 的磁區順序,這個命令由 Partitioner 類表示,并且可以根據它來對另一個資料集進行磁區,
像 groupByKey、reduceByKey 以及 sort 等操作都是經過了 hash 或者 range 磁區后的 RDD,

表二:Spark 中 RDD 常用的 transformations 和 actions 操作,Seq[T] 表示元素型別為 T 的一個序列,
3.2 舉例應用
我們用兩個迭代式的應用:線性回歸和 PageRank 來補充 2.2.1 提到的資料挖掘的例子,
稍后也會展示下如何控制 RDD 的磁區以達到提升性能的目的,
3.2.1 線性回歸
原文翻譯
很多的機器學習演算法一般都是迭代式的計算,因為它們需要跑迭代的優化程式(比如梯度下降)來達到最大化功能,
他們將資料存放在記憶體中以達到很快的速度,
作為一個例子,下面的程式實作了線性回歸,一個能找到最佳區分兩種點集(垃圾郵件以及非垃圾郵件)的超平面 w 的常用的分類演算法,
這個演算法用了梯度下降的方法:一個隨機的值作為 w 的初始值,每次迭代都會將含有 w 的方法應用到每一個資料點然后累加得到梯度值,然后將 w 往改善結果的方向移動,
val points = spark.textFile(...).map.(parsePoint).persist()
var w = // 隨機的初始向量
for (i <- 1 to ITERATIONS) {
val gradient = points.map{ p => p.x * (1/(1+exp (-p.y*(w dot p.x)))-1) * p.y }.reduce((a, b)=> a+b)
w -= gradient
}
一開始我們定義一個叫 points 的 RDD,這個 RDD 從一個文本檔案中經過 map 將每一行轉換為 Point 物件,
然后我們重復對 points 進行 map 和 reduce 操作計算出每一步的梯度值,
在迭代之間我們將 points 存放在記憶體中可以使得性能提高 20 倍,我們將會在 6.1 節中討論,
線性回歸簡介
線性回歸( Linear Regression )是利用線性回歸方程(函式)對一個或多個自變數和因變數之間關系進行建模的一種回歸分析方法,
線性回歸問題屬于監督學習范疇,要求訓練資料集中的資料具有明確的目標,
若回歸分析中只包含一個自變數和一個因變數,且二者的關系可用一條直線近似表示,那么這種回歸分析被稱為一元線性回歸分析,或一元線性擬合,
如果回歸分析中包含兩個或兩個以上的自變數,且因變數和自變數之間是線性關系,則稱為多元線性回歸分析,
通常初學者對于線性回歸較難理解,若換個角度來解釋,其實線性回歸問題就是線性擬合問題,
以一元線性回歸為例,就是研究如何確定一條直線來近似地表示描述訓練集中所有資料的空間分布,
類似地,對于二元回歸問題,采用一個平面來擬合,若多元回歸問題,采用超平面來擬合,
線性擬合中用于擬合的直線、平面、超平面都對應著線性方程,因此線性回歸問題轉換為求解相應線性方程(預測函式)的問題,
用戶可以通過求解出的線性方程對資料的目標值進行預測,
實踐
參考我的博客——使用 Spark MLlib 實作線性回歸
3.2.2 PageRank
原文翻譯
在 PageRank 中資料共享更加復雜,
如果一個檔案參考另一個檔案,那被參考的檔案的排名值(rank)需要加上參考的檔案發送過來的貢獻值,當然這個程序是個迭代的程序,
在每一次迭代中,每一個檔案都會發送 r/n 的貢獻值給它的鄰居,其中 r 表示這個檔案的排名值,n 表示這個檔案的鄰居數量,
然后更新檔案的排名值為,這個運算式值表示這個檔案收到的貢獻值,N 表示所有的檔案的數量,我們可以用如下的 Spark 代碼來表達 PageRank:
// 加載圖作為 (URL, outlinks) 對的RDD
val links = spark.textFile(...).map(...).persist()
var ranks = // (URL, rank) 對的RDD
for (i<- 1 to ITERATIONS) {
// 構建 (targetURL, float) 對的RDD
// 伴隨著每個頁面的貢獻
val contribs = links.join(ranks).flatMap {
(ur1, (links, rank))
=> links.map(dest => (dest, rank / links.size))
}
// 按 URL 匯總貢獻并獲得新排名
ranks = contribs.reduceByKey((x, y) => x + y).mapValues(sum => a / N + (1 - a) * sum)
}
其中 links 表示(URL , outlinks)鍵值對,
這個程式的 RDD 的血緣關系圖如圖三,
在每一次迭代中我們都是根據上一次迭代的 contribs 和 ranks 以及原始不變的 links 資料集來創建一個新的 ranks 資料集,
隨著迭代次數的變多這張圖會變的越長,這個是這個圖比較有意思的特點,
如果這個 job 的迭代次數很多的話,那么備份一些版本的 ranks 來達到減少從錯誤中恢復出來的時間是很有必要的,用戶可以呼叫標記為 RELIABLE 的 persist 函式來達到這個目的,
需要注意的是,links 是不需要備份的,因為它的磁區資料可以快速的從重新計算輸入檔案中對應的資料塊而得到,這個資料集一般會比 ranks 資料集大上很多倍,因為每一個檔案會有很多的連接但只會有一個排名值,所以利用 RDD 的血緣關系來恢復資料肯定比 checkpoint 記憶體中的資料快很多(因為資料量太大),
最后,我們可以控制 RDDs 的磁區方式來優化 PageRank 中的節點通訊,
如果我們事先為 links 指定一個磁區方式(比如,根據 link 的 url 來 hash 磁區,就是將相同的 url 發送到同一個節點中),然后我們對 ranks 進行相同的磁區方式,這樣就可以保證 links 和 ranks 之間的 join 不需要機器節點之間的通訊(因為相同的 url 都在同一個機器節點了,那么相對應的 rank 和 link 肯定也是在同一個機器節點了),
我們也可以自定義磁區器來實作將一組頁面 url 放到一起(比如按照 url 的 domain 進行磁區),
以上兩種優化方式都可以通過在定義 links 的時候呼叫 partitionBy 來實作:
links = spark.textFile(...).map(...).partitionBy(myPartFunc).persist()
在呼叫了 partitionBy 后,links 和 ranks 之間的 join 操作會自動的在 link 所在的機器進行每一個 URL 的貢獻值的聚合計算,然后在相同的機器計算新的排名值,然后計算出來的新的 ranks 在相同的機器和 links 進行 join,
這種在迭代之間進行資料一致磁區是像 Pregel 這種框架中的主要的優化計算方式,
RDDs 使得用戶可以直接自己來實作這種優化機制,
PageRank 演算法簡介
PageRank 演算法即網頁排名演算法,是 Google 創始人拉里佩奇和謝爾蓋?布林于 1997 年構建早期的搜索系統原型時提出的鏈接分析演算法,
自從 Google 在商業上獲得巨大成功后,該演算法引起了研究者們的廣泛關注,目前很多重要的鏈接分析演算法都是在 PageRank 演算法基礎上衍生出來的,
PageRank 演算法是 Google 用來標識網頁等級的重要依據,是 Google 衡量一個網站的好壞的唯一標準,
對網頁進行排名需要量化的依據,因此 PageRank 演算法對每一個網頁進行計算后會得到一個在 0 到 10 范圍內的值,即該網頁的 PageRank 值,簡稱 PR 值,
PR 值越高說明網頁越受歡迎,越重要,
PageRank演算法的核心步驟如下:
- 第 1 步,初始化
PageRank 演算法基于兩個假設:數量假設和質量假設,
首先通過鏈接關系構建 Web 圖(網路中每頁面對應 Web 圖中的一個頂點,若網頁 A 中包含一條指向 B 的鏈接,則 Web 圖中存在一條由頂點 A 指向頂點 B 的邊);
然后為 Web 圖中的每個頂點設定初始的 PR 值(通常設定每個頂點的初始 PR 值為 1 / N ,其中 N 為網路中網頁的個數),
- 第 2 步,迭代計算,
首先假設一個用戶在訪問某網頁時,其將跳轉到該網頁上各超鏈接頁面的概率相同,
例如網頁 A 鏈向網頁 B 、 C 、 D ,所以根據假設,用戶從 A 跳轉到 B 、 C 、 D 的概率各為 1 / 3 ,
PageRank 演算法在每一輪迭代計算的程序中,將每個網頁當前的 PR 值平均分配到該網頁指向的超鏈接頁面上,這樣每個網頁便獲得了相應的權值;
再將新權值求和,即得到該網頁的新 PR 值,
當每個網頁的 PR 值都獲得更新后,就完成了一輪迭代,
- 第 3 步,結束,
隨著每一輪的迭代計算,網頁的 PR 值會不斷得到更新,
當迭代達到一定次數,或者每個網頁的 PR 值固定不變,再或者 PR 值收斂至某一范圍內時,演算法停止,
演算法停止時每個網頁的 PR 值就是該網頁最終的 PR 值,
實踐
參考我的博客——使用 Spark GraphX 實作 PageRank 演算法
轉載請註明出處,本文鏈接:https://www.uj5u.com/qita/394200.html
標籤:其他
上一篇:spark復習資料
