五, Spark 核心編程
5.0 一個簡單的分布式計算程式
- 先通過一個簡單🌰了解一下什么是分布式計算
在了解了分布式計算大概是個什么情況后, 我們來學習Spark
Spark計算框架為了能夠進行高并發和高吞吐的資料處理, 封裝了三大資料結構, 用于處理不同的應用場景,分別是:
- RDD: 彈性分布式資料集
- 累加器: 分布式共享只寫變數
- 廣播變數: 分布式共享只讀變數
資料結構: 計算機存盤和組織資料的方式
5.1 RDD (資料和邏輯, 最小的計算單元)
RDD: 代表著不可變的, 可以磁區和并行計算的元素集合;

拿前篇文章中的wordcount為例, Spark中對資料的不同處理(在這里資料被指代為一個一個的RDD)是通過一層又一層的包裝來實作的, 如何包裝? 就是層層遞進的進行傳參;
比如, 我要在IO流中使用包裝類去讀取位元組流, 位元組流也需要檔案物件file的傳參, 所以可以寫成下面的這種形式: (不理解的話可以先看看IO流那篇文章:點我)
//new BufferedInputStream(new FileInputStream(new File(path, 是否是追加操作), 字符集))
File file = new File(path);
FileInputStream fis = new FileInputStream(file);
BufferedInputStream bis = new BufferedStream(fis);
5.1.1 什么是RDD
- RDD(Resilient Distributed DataSet) 叫做彈性分布式資料集, 是Spark底層的分布式存盤的資料結構, 可以說;
- 在代碼中它代表著
彈性的,不可變的,可分布,里面的元素可并行計算的集合;
| 特性 | 解釋 |
|---|---|
| 彈性 | 存盤的彈性(記憶體與磁盤自動切換); 容錯的彈性(數據丟失自動恢復); 計算的彈性(計算出錯重試機制); 分片的彈性(根據需要重新分片) |
| 分布式 | 資料存盤在大資料集群不同節點上 |
| 資料集 | RDD封裝了計算邏輯, 不保存資料 |
| 資料抽象 | RDD是一個抽象類, 需要子類具體實作 |
| 不可變 | RDD 封裝了計算邏輯,是不可以改變的,想要改變,只能產生新的 RDD,在新的 RDD 里面封裝計算邏輯 |
| 可磁區, 并行計算 | RDD 內部的資料集合在邏輯上和物理上被劃分成多個小子集合,這些集合就是磁區, 是并行計算的一個計算單元 |
5.1.2 RDD的五大核心屬性
* Internally, each RDD is characterized by five main properties:
*
* - A list of partitions
* - A function for computing each split
* - A list of dependencies on other RDDs
* - Optionally, a Partitioner for key-value RDDs (e.g. to say that the RDD is hash-partitioned)
* - Optionally, a list of preferred locations to compute each split on (e.g. block locations for
* an HDFS file)
*
* All of the scheduling and execution in Spark is done based on these methods, allowing each RDD
1. A list of partitions (一個磁區串列)
一個磁區串列, 這里表示
一個RDD很多磁區, 每一個磁區內部是包含了該RDD的部分資料, Spark中任務是以Task執行緒的方式運行, 一個磁區就對應一個Task執行緒, 磁區串列是實作分布式并行計算的重要屬性;

用戶可以在創建RDD時指定RDD的磁區個數, 如果沒有指定, 那么就會采用默認值.
- rdd= sparkContext.textFile("/words.txt", 指定partitions)

磁區數的默認值的計算公式如下:
- RDD的磁區數 = max(檔案的block個數, defaultMinPartitions)
- 通過Spark Context讀取HDFS上的檔案來計算磁區數
2. A function for computing each split (作用磁區中的函式)
一個計算每個磁區的函式,這里表示Spark中RDD的計算是以磁區為單位的,每個RDD都會實作compute計算函式以達到這個目的.
3. A list of dependencies on other RDDs (對其他RDD的依賴關系)
一個RDD會依賴于其他多個RDD, 這里涉及到RDD與RDD之間的依賴關系,
Spark 任務的容錯機制就是根據這個特性(血統)而來;
- RDD 是計算模型的封裝,當需求中需要將多個計算模型進行組合時,就需要將多個 RDD 建立依賴關系;
//rdd2依賴于rdd1,而rdd3依賴于rdd2
//rdd6依賴于rdd4、rdd5
val rdd1:RDD[String]=sc.textFile("/words.txt")
val rdd2:RDD[String]=rdd1.flatMap(x=>x.split(" "))
val rdd3:RDD[(String,Int)]=rdd2.Map(x=>(x,1))
val rdd6=rdd4.join(rdd5)
4. Optionally, a Partioner for key-value RDDs (針對k-v的磁區器)
當資料為 KV 型別資料時,可以通過設定磁區器(可選)自定義資料的磁區

5. Optionally, a list of preferred locations to compute each split on (資料本地性)
一個串列,存盤每個Partition的優先位置(可選項),這里涉及到資料的本地性,資料塊位置最優,
- spark任務在調度的時候會優先考慮存有資料的節點開啟計算任務,減少資料的網路傳輸,提升計算效率
5.1.3 通過WordCount示例理解五大屬性
[需求]
HDFS上有一個大小為300M的檔案,通過spark實作檔案單詞統計,最后把結果資料保存到HDFS上
[凝練后的代碼]
sc.textFile("/words.txt").flatMap(_.split(" ")).map((_,1)).reduceByKey(_+_).saveAsTextFile("/out")
[流程]

5.1.4 執行原理
- 從計算的角度來講,資料處理程序中需要計算資源(記憶體 & CPU)和計算模型(邏輯),執行時,需要將計算資源和計算模型進行協調和整合,
- Spark 框架在執行時,先申請資源,然后將應用程式的資料處理邏輯分解成一個一個的計算任務,然后將任務發到已經分配資源的計算節點上, 按照指定的計算模型進行資料計算,最后得到計算結果,
- RDD 是 Spark 框架中用于資料處理的核心模型,接下來我們看看,在 Yarn 環境中,RDD的作業原理:
- 啟動 Yarn 集群環境

- Spark 通過申請資源創建
調度節點和計算節點

- Spark 框架根據需求將計算邏輯按照磁區劃分成不同的任務

- 調度節點將任務根據計算節點狀態發送到對應的計算節點進行計算

從以上流程可以看出 RDD 在整個流程中主要用于
將邏輯進行封裝,并生成 Task 發送給 Executor 節點執行計算
- 參考文章: https://www.cnblogs.com/jimmy888/p/13551699.html
轉載請註明出處,本文鏈接:https://www.uj5u.com/qita/438083.html
標籤:其他
