目錄
前言
第一部分:Transformation算子
Value型別
map()映射
mapPartitions()以磁區單位執行Map
mapPartitionsWithIndex()帶磁區號
flatMap()壓平
glom()磁區轉換陣列
groupBy()分組
擴展:復雜版的wordcount
filter()過濾
sample()采樣
distinct()去重
coalesce()重新磁區
reparation()重新磁區(執行Shuffle)
sortBy()排序
pipe()呼叫腳本
雙Value型別互動
union()并集、subtract()差集、intersection()交集
zip()拉鏈
Key-Value型別
partitionBy()按照k重新磁區
reduceByKey()按照k進行聚合v
備注: 正則運算式過濾字串的方法
groupByKey()按照k重新分組
aggregateByKey()按照k進行磁區內和磁區間邏輯
foldByKey()磁區內核磁區間具有相同邏輯的aggregateByKey()
combineByKey()轉換結構后磁區內和磁區間操作
sortByKey()按照k進行排序
mapValues()只對v進行操作
join()將相同k對應的多個v關聯在一起
擴展:左外連接、右外連接和全外連接
cogroup()類似全連接,但是在同一個RDD中對k聚合
第二部分:Action行動算子
reduce()聚合
collect()以陣列的形式回傳資料集
count()回傳RDD中元素個數
first()回傳RDD中的第一個元素
take(n)回傳由RDD前n個元素組成的陣列
takeOrdered(n)回傳RDD排序后前n個元素組成的陣列
aggregate()案例
fold()案例
countByKey()統計每種key的個數
save相關算子
foreach()&foreachPartition遍歷RDD中每一個元素
總結
前言
在 Spark Core中,RDD(Resilient Distributed Dataset,彈性分布式資料集) 支持 2 種操作:
1、transformation
從一個已知的 RDD 中創建出來一個新的 RDD ,例如: map就是一個transformation,
2、action
在資料集上計算結束之后, 給驅動程式回傳一個值.,例如: reduce就是一個action,
注意:
本文只簡單講述Transformation算子和Action算子,目的是了解其相應的算子如何使用;并不打算深入理解每一個算子的執行程序和邏輯思想,
在 Spark 中幾乎所有的transformation操作都是懶執行的(lazy),也就是說transformation操作并不會立即計算他們的結果,而是記住了這個操作,只有當通過一個action來獲取結果回傳給驅動程式的時候這些轉換操作才開始計算,這種設計可以使 Spark 運行起來更加的高效,
默認情況下,你每次在一個 RDD 上運行一個action的時候,前面的每個transformed RDD 都會被重新計算,但是我們可以通過persist (or cache)方法來持久化一個 RDD 在記憶體中,也可以持久化到磁盤上, 來加快訪問速度,
根據 RDD 中資料型別的不同,整體分為 2 種 RDD:
- Value型別
- Key-Value型別(其實就是存了一個二維的元組)
第一部分:Transformation算子
Value型別
map()映射
需求:
創建一個1~4陣列的RDD,兩個磁區,將所有元素*2形成新的RDD,
代碼實作:
object TransformationDemo {
def main(args: Array[String]): Unit = {
//1.創建SparkConf并設定App名稱
val conf: SparkConf = new SparkConf().setMaster("local[*]")
.setAppName("TransformationDemo_test")
//2.創建SparkContext,該物件是提交Spark App的入口
val sc: SparkContext = new SparkContext(conf)
//3具體業務邏輯
mapTest(sc)
//4.關閉連接
sc.stop()
}
def mapTest(sc: SparkContext): Unit = {
// 3.1 創建一個RDD
val rdd: RDD[Int] = sc.makeRDD(1 to 4,2)
// 3.2 呼叫map方法,每個元素乘以2
val mapRdd: RDD[Int] = rdd.map(_ * 2)
// 3.3 列印修改后的RDD中資料
mapRdd.collect().foreach(println)
}
}
map()操作如圖所示:

map()函式結構
def map[U: ClassTag](f: T => U): RDD[U] = withScope {
val cleanF = sc.clean(f)
new MapPartitionsRDD[U, T](this, (context, pid, iter) => iter.map(cleanF))
}功能描述
引數f是一個函式,它可以接收一個引數,當某個RDD執行map方法時,會遍歷該RDD中的每一個資料項,并依次應用f函式,從而產生一個新的RDD,即,這個新RDD中的每一個元素都是原來RDD中每一個元素依次應用f函式而得到的,
在本例中,f為:_*2
備注:rdd.map(_ * 2)是rdd.map((f: Int) => f * 2)的簡寫,
mapPartitions()以磁區單位執行Map
需求:
創建一個RDD,4個元素,2個磁區,使每個元素*2組成新的RDD
代碼實作:
object TransformationDemo {
def main(args: Array[String]): Unit = {
//1.創建SparkConf并設定App名稱
val conf: SparkConf = new SparkConf().setMaster("local[*]")
.setAppName("TransformationDemo_test")
//2.創建SparkContext,該物件是提交Spark App的入口
val sc: SparkContext = new SparkContext(conf)
//3具體業務邏輯
mapPartitionsTest(sc)
//4.關閉連接
sc.stop()
}
/**
* 創建一個RDD,4個元素,2個磁區,使每個元素*2組成新的RDD
* */
def mapPartitionsTest(sc: SparkContext): Unit ={
// 3.1 創建第一個RDD
val rdd: RDD[Int] = sc.makeRDD(1 to 4,2)
// 3.2 需求實作
val value: RDD[Int] = rdd.mapPartitions((data: Iterator[Int]) => data.map((x: Int) => x * 2))
// 3.3 列印
value.foreach(println)
}
}
mapPartitions()操作如圖所示:

mapPartitions()函式結構:
def mapPartitions[U: ClassTag](
f: Iterator[T] => Iterator[U],
preservesPartitioning: Boolean = false): RDD[U] = withScope {
val cleanedF = sc.clean(f)
new MapPartitionsRDD(
this,
(context: TaskContext, index: Int, iter: Iterator[T]) => cleanedF(iter),
preservesPartitioning)
}功能介紹:
- f函式把每一個磁區的資料分別放入到迭代器中,批處理,
- preservesPartitioning:是否保留上游RDD的磁區資訊,默認false
- Map是一次處理一個元素,而mapPartitions一次處理一個磁區資料,
備注:map()和mapPartitions()的區別
- map():每次處理一條資料,
- mapPartitions():每次處理一個磁區的資料,這個磁區的資料處理完后,原 RDD 中該磁區的資料才能釋放,可能導致 OOM,
- 開發指導:當記憶體空間較大的時候建議使用mapPartitions(),以提高處理效率,
mapPartitionsWithIndex()帶磁區號
需求:
創建一個RDD,使每個元素跟所在磁區號形成一個元組,組成一個新的RDD,
代碼實作:
object TransformationDemo {
def main(args: Array[String]): Unit = {
//1.創建SparkConf并設定App名稱
val conf: SparkConf = new SparkConf().setMaster("local[*]")
.setAppName("TransformationDemo_test")
//2.創建SparkContext,該物件是提交Spark App的入口
val sc: SparkContext = new SparkContext(conf)
//3具體業務邏輯
mapPartitionsWithIndexTest(sc)
//4.關閉連接
sc.stop()
}
/**
* 創建一個RDD,使每個元素跟所在磁區號形成一個元組,組成一個新的RDD
* */
def mapPartitionsWithIndexTest(sc: SparkContext): Unit ={
// 3.1 創建第一個RDD
val rdd: RDD[Int] = sc.makeRDD(1 to 4,2)
// 3.2 需求實作
val value: RDD[(Int, Int)] = rdd.mapPartitionsWithIndex((index: Int, item: Iterator[Int]) => {
item.map((f: Int) => (index,f))
})
// 3.3 列印
value.foreach(println)
}
}
mapPartitionsWithIndex()操作如圖所示:

mapPartitionsWithIndex()函式結構:
def mapPartitionsWithIndex[U: ClassTag](
f: (Int, Iterator[T]) => Iterator[U],
preservesPartitioning: Boolean = false): RDD[U] = withScope {
val cleanedF = sc.clean(f)
new MapPartitionsRDD(
this,
(context: TaskContext, index: Int, iter: Iterator[T]) => cleanedF(index, iter),
preservesPartitioning)
}功能介紹:
- f: (Int, Iterator[T]) => Iterator[U]中Int表示磁區編號
- 類似于mapPartitions,比mapPartitions多一個整數引數表示磁區號
flatMap()壓平
需求:
創建一個集合,集合里面存盤的還是子集合,把所有子集合中資料取出放入到一個大的集合中,
代碼實作:
object TransformationDemo {
def main(args: Array[String]): Unit = {
//1.創建SparkConf并設定App名稱
val conf: SparkConf = new SparkConf().setMaster("local[*]")
.setAppName("TransformationDemo_test")
//2.創建SparkContext,該物件是提交Spark App的入口
val sc: SparkContext = new SparkContext(conf)
//3具體業務邏輯
flatMapTest(sc)
//4.關閉連接
sc.stop()
}
/**
* 創建一個集合,集合里面存盤的還是子集合,把所有子集合中資料取出放入到一個大的集合中
* */
def flatMapTest(sc: SparkContext): Unit ={
// 3.1 創建第一個RDD
val rdd: RDD[List[Int]] = sc.makeRDD(Array(List(1,2),List(3,4),List(5,6),List(7)),2)
// 3.2 需求實作
val value: RDD[Int] = rdd.flatMap((list: List[Int]) => list)
// 3.3 列印
value.foreach(println)
}
}
flatMap操作如圖所示:

擴展: 可以通過任務背景關系獲取磁區號,
rdd.foreach((f: List[Int]) => {
// 通過任務背景關系獲取partitionID
println(TaskContext.getPartitionId() + "---"+ f.mkString(","))
})
flatMap()函式結構:
def flatMap[U: ClassTag](f: T => TraversableOnce[U]): RDD[U] = withScope {
val cleanF = sc.clean(f)
new MapPartitionsRDD[U, T](this, (context, pid, iter) => iter.flatMap(cleanF))
}功能介紹:
- 與map操作類似,將RDD中的每一個元素通過應用f函式依次轉換為新的元素,并封裝到RDD中,
- 區別:在flatMap操作中,f函式的回傳值是一個集合,并且會將每一個該集合中的元素拆分出來放到新的RDD中,并且新的RDD中繼承了原RDD中的磁區數,
glom()磁區轉換陣列
需求:
創建一個2個磁區的RDD,并將每個磁區的資料放到一個陣列,求出每個磁區的最大值,
代碼實作:
object TransformationDemo {
def main(args: Array[String]): Unit = {
//1.創建SparkConf并設定App名稱
val conf: SparkConf = new SparkConf().setMaster("local[*]")
.setAppName("TransformationDemo_test")
//2.創建SparkContext,該物件是提交Spark App的入口
val sc: SparkContext = new SparkContext(conf)
//3具體業務邏輯
glomTest(sc)
//4.關閉連接
sc.stop()
}
/**
* 創建一個2個磁區的RDD,并將每個磁區的資料放到一個陣列,求出每個磁區的最大值
* */
def glomTest(sc: SparkContext): Unit ={
// 3.1 創建第一個RDD
val rdd: RDD[Int] = sc.makeRDD(1 to 4,2)
// 3.2 需求實作
val value: RDD[Int] = rdd.glom().mapPartitions((x: Iterator[Array[Int]]) => x.map((f: Array[Int]) => f.max))
// 3.3 列印
value.foreach((f: Int) => {
println(TaskContext.getPartitionId() + ":" + f)
})
}
}
glom()操作如圖所示:

glom()函式結構:
def glom(): RDD[Array[T]] = withScope {
new MapPartitionsRDD[Array[T], T](this, (context, pid, iter) => Iterator(iter.toArray))
}功能介紹:
該操作將RDD中每一個磁區變成一個陣列,并放置在新的RDD中,陣列中元素的型別與原磁區中元素型別一致,
groupBy()分組
需求:
創建一個RDD,按照元素模以2的值進行分組,
代碼實作:
object TransformationDemo {
def main(args: Array[String]): Unit = {
//1.創建SparkConf并設定App名稱
val conf: SparkConf = new SparkConf().setMaster("local[*]")
.setAppName("TransformationDemo_test")
//2.創建SparkContext,該物件是提交Spark App的入口
val sc: SparkContext = new SparkContext(conf)
//3具體業務邏輯
groupByTest(sc)
//4.關閉連接
sc.stop()
}
/**
* 創建一個RDD,按照元素模以2的值進行分組
* */
def groupByTest(sc: SparkContext): Unit ={
// 3.1 創建第一個RDD
val rdd: RDD[Int] = sc.makeRDD(1 to 10,3)
// 3.2 需求實作
val value: RDD[(Int, Iterable[Int])] = rdd.groupBy(_ % 2)
// 3.3 列印
value.foreach(println)
}
}
groupBy()操作如圖所示:

groupBy()函式結構:
def groupBy[K](f: T => K)(implicit kt: ClassTag[K]): RDD[(K, Iterable[T])] = withScope {
groupBy[K](f, defaultPartitioner(this))
}功能介紹:
分組,按照傳入函式的回傳值進行分組,將相同的key對應的值放入一個迭代器,
備注:
1、groupBy會存在shuffle程序
2、shuffle:將不同的磁區資料進行打亂重組的程序
3、shuffle一定會落盤,
擴展:復雜版的wordcount
需求:
有如下的資料,("Hello Scala", 2), ("Hello Spark", 3), ("Hello World", 2),("I Love You",5),("I Miss You",2),("Best wish",9),其中數字代表出現的個數,求每個單詞出現的次數,
代碼實作:
def worldCountTest(sc: SparkContext): Unit ={
val rdd: RDD[(String, Int)] = sc.makeRDD(Array(("Hello Scala", 2), ("Hello Spark", 3), ("Hello World", 2), ("I Love You", 5), ("I Miss You", 2), ("Best wish", 9)))
// 方式一:適合scala
/*val value: String = rdd.map {
// 模式匹配
case (str, count) => {
// scala對字串操作,("Hello Scala" + " ") * 2 = Hello Scala Hello Scala
(str + " ") * count
}
}
.flatMap(_.split(" "))
.map((_, 1))
.reduceByKey(_+_)
.collect()
.mkString(",")
*/
// 方式二:比較通用
val value: String = rdd.flatMap {
case (str, i) => {
str.split(" ").map((word: String) => (word, i))
}
}.reduceByKey(_ + _)
.collect()
.mkString(",")
println(value)
}
filter()過濾
需求:
創建一個1到10的RDD,過濾出偶數,
代碼實作:
object TransformationDemo {
def main(args: Array[String]): Unit = {
//1.創建SparkConf并設定App名稱
val conf: SparkConf = new SparkConf().setMaster("local[*]")
.setAppName("TransformationDemo_test")
//2.創建SparkContext,該物件是提交Spark App的入口
val sc: SparkContext = new SparkContext(conf)
//3具體業務邏輯
filterTest(sc)
//4.關閉連接
sc.stop()
}
/**
* 過濾偶數
* */
def filterTest(sc: SparkContext): Unit = {
// 3.1 創建第一個RDD
val rdd: RDD[Int] = sc.makeRDD(1 to 10, 3)
// 3.2 需求實作
val value: RDD[Int] = rdd.filter(_ % 2 == 0)
// 3.3 列印
value.foreach(println)
}
}
filter()操作程序如下:

filter()函式結構:
def filter(f: T => Boolean): RDD[T] = withScope {
val cleanF = sc.clean(f)
new MapPartitionsRDD[T, T](
this,
(context, pid, iter) => iter.filter(cleanF),
preservesPartitioning = true)
}功能介紹:
- 接收一個回傳值為布爾型別的函式作為引數,
- 當某個RDD呼叫filter方法時,會對該RDD中每一個元素應用f函式,如果回傳值型別為true,則該元素會被添加到新的RDD中,
sample()采樣
需求:
創建一個RDD(1-10),從中選擇放回和不放回抽樣,
代碼實作:
object TransformationDemo {
def main(args: Array[String]): Unit = {
//1.創建SparkConf并設定App名稱
val conf: SparkConf = new SparkConf().setMaster("local[*]")
.setAppName("TransformationDemo_test")
//2.創建SparkContext,該物件是提交Spark App的入口
val sc: SparkContext = new SparkContext(conf)
//3具體業務邏輯
sampleTest(sc)
//4.關閉連接
sc.stop()
}
/**
* 隨機采樣
* */
def sampleTest(sc: SparkContext): Unit ={
// 3.1 創建第一個RDD
val rdd: RDD[Int] = sc.makeRDD(1 to 20, 3)
// 3.2 需求實作
val value: String = rdd.sample(true,0.3,3).collect().mkString(",")
val value2: String = rdd.sample(false,0.3,3).collect().mkString(",")
// 3.3 列印
println("不放回采樣結果:" + value2)
println("放回采樣結果:" + value)
}
}
sample()操作如圖所示:

sample()函式結構:
def sample(
withReplacement: Boolean,
fraction: Double,
seed: Long = Utils.random.nextLong): RDD[T] = {
......
}功能介紹:
- 從大量的資料中采樣
- withReplacement: Boolean: 抽出的資料是否放回
- fraction: Double:
- 當withReplacement=false時:選擇每個元素的概率;取值一定是[0,1] ;底層使用泊松分布,
- 當withReplacement=true時:選擇每個元素的期望次數; 取值必須大于等于0,底層使用伯努利采樣,
- seed: Long: 指定亂數生成器種子
備注:
1、 該函式隨機采樣是偽隨機,因為傳入的隨機種子是一樣的,計算結果當然一樣,
distinct()去重
需求:
對如下的資料去重:3,2,9,1,2,1,5,2,9,6,1
代碼實作:
object TransformationDemo {
def main(args: Array[String]): Unit = {
//1.創建SparkConf并設定App名稱
val conf: SparkConf = new SparkConf().setMaster("local[*]")
.setAppName("TransformationDemo_test")
//2.創建SparkContext,該物件是提交Spark App的入口
val sc: SparkContext = new SparkContext(conf)
//3具體業務邏輯
distinctTest(sc)
//4.關閉連接
sc.stop()
}
/**
* 資料去重
* */
def distinctTest(sc: SparkContext): Unit = {
// 3.1 創建第一個RDD
val rdd: RDD[Int] = sc.makeRDD(List(3,2,9,1,2,1,5,2,9,6,1))
// 3.2 需求實作
val value: RDD[Int] = rdd.distinct()
// 3.3 列印
value.foreach(println)
}
}
distinct()操作如圖所示:

distinct()函式結構:
def distinct(): RDD[T] = withScope {
distinct(partitions.length)
}def distinct(numPartitions: Int)(implicit ord: Ordering[T] = null): RDD[T] = withScope {
map(x => (x, null)).reduceByKey((x, y) => x, numPartitions).map(_._1)
}功能介紹:
對內部的元素去重,并將去重后的元素放到新的RDD中,
備注:
1、distinct()用分布式的方式去重比HashSet集合方式不容易OOM.
2、默認情況下,distinct會生成與原RDD磁區個數一致的磁區數,當然也可以指定磁區數,
coalesce()重新磁區
需求:
1、將4個磁區的RDD合并成兩個磁區的RDD
2、將三個磁區的RDD合并成兩個磁區的RDD
代碼實作:
object TransformationDemo {
def main(args: Array[String]): Unit = {
//1.創建SparkConf并設定App名稱
val conf: SparkConf = new SparkConf().setMaster("local[*]")
.setAppName("TransformationDemo_test")
//2.創建SparkContext,該物件是提交Spark App的入口
val sc: SparkContext = new SparkContext(conf)
//3具體業務邏輯
coalesceTest(sc)
//4.關閉連接
sc.stop()
}
/**
* coalesce重磁區
* */
def coalesceTest(sc: SparkContext): Unit = {
// 3.1 創建第一個RDD
val rdd: RDD[Int] = sc.makeRDD(1 to 4,4)
val rdd2: RDD[Int] = sc.makeRDD(1 to 4,3)
// 3.2 需求實作
val value1: Array[(Int, Int)] = rdd.coalesce(2).map((TaskContext.getPartitionId(),_)).collect()
val value2: Array[(Int, Int)] = rdd2.coalesce(2).map((TaskContext.getPartitionId(),_)).collect()
// 3.3 列印
println("value1:" + value1.mkString(","))
println("value2:" + value2.mkString(","))
}
}
coalesce()操作如圖所示:

coalesce()函式結構:
def coalesce(numPartitions: Int, shuffle: Boolean = false,
partitionCoalescer: Option[PartitionCoalescer] = Option.empty)
(implicit ord: Ordering[T] = null)
: RDD[T] = withScope {......
}
功能介紹:
- 縮減磁區數,用于大資料集過濾后,提高小資料集的執行效率,
- shuffle默認為false,則該操作會將磁區數較多的原始RDD向磁區數比較少的目標RDD,進行轉換,
- shuffle:
- true: 進行shuffle,此時目標磁區數可以大于可以小于原磁區數,即:可以縮小和擴大原磁區,
- false:不進行shuffle,此時目標磁區數只能小于原磁區數;當大于時,磁區數不生效,即:只能縮小或等于原磁區,
備注:
1、shuffle原理: 將資料打散,然后重新組合,

2、具體shuffle程序,讀者可以參考“徹底搞懂spark的shuffle程序”,
reparation()重新磁區(執行Shuffle)
需求:
創建一個4個磁區的RDD,對其重新磁區
代碼實作:
object TransformationDemo {
def main(args: Array[String]): Unit = {
//1.創建SparkConf并設定App名稱
val conf: SparkConf = new SparkConf().setMaster("local[*]")
.setAppName("TransformationDemo_test")
//2.創建SparkContext,該物件是提交Spark App的入口
val sc: SparkContext = new SparkContext(conf)
//3具體業務邏輯
repartitionTest(sc)
//4.關閉連接
sc.stop()
}
/**
* repartition重磁區
* */
def repartitionTest(sc: SparkContext): Unit = {
// 3.1 創建第一個RDD
val rdd: RDD[Int] = sc.makeRDD(1 to 10,4)
// 3.2 需求實作
val value1: Array[(Int, Int)] = rdd.repartition(2).map((TaskContext.getPartitionId(),_)).collect()
val value2: Array[(Int, Int)] = rdd.repartition(5).map((TaskContext.getPartitionId(),_)).collect()
// 3.3 列印
println("value1:" + value1.mkString(","))
println("value2:" + value2.mkString(","))
}
}
reparation()操作如圖所示:

reparation()函式結構:
def repartition(numPartitions: Int)(implicit ord: Ordering[T] = null): RDD[T] = withScope {
coalesce(numPartitions, shuffle = true)
}功能介紹:
- 該操作內部其實執行的是coalesce操作,引數shuffle的默認值為true,
- 無論是將磁區數多的RDD轉換為磁區數少的RDD,還是將磁區數少的RDD轉換為磁區數多的RDD,repartition操作都可以完成,因為無論如何都會經shuffle程序,
備注: coalesce與reparation的區別
- coalesce重新磁區,可以選擇是否進行shuffle程序,由引數shuffle: Boolean = false/true決定,
- repartition實際上是呼叫的coalesce,進行shuffle,
- 如果是減少磁區, 盡量避免 shuffle,使用coalesce完成,
- 絕大多數情況下,減少磁區使用coalesce,增加磁區使用reparation,
sortBy()排序
需求:
創建一個RDD,按照不同的規則進行排序,
- 按照數字大小升序排序
- 按照數字大小降序排序
- 按照模以5的余數降序排序
- 二元組,則先按照第一個元素升序,如果第一個元素相同,在按照第二個元素降序
代碼實作:
object TransformationDemo {
def main(args: Array[String]): Unit = {
//1.創建SparkConf并設定App名稱
val conf: SparkConf = new SparkConf().setMaster("local[*]")
.setAppName("TransformationDemo_test")
//2.創建SparkContext,該物件是提交Spark App的入口
val sc: SparkContext = new SparkContext(conf)
//3具體業務邏輯
sortByTest(sc)
//4.關閉連接
sc.stop()
}
/**
* sortBy排序
* */
def sortByTest(sc: SparkContext): Unit = {
// 3.1 創建第一個RDD
val rdd: RDD[Int] = sc.makeRDD(List(3,16,5,8,2,10,9,1,7,6))
// 3.2 需求實作
// 升序
val value1: String = rdd.sortBy((num: Int) => num).collect().mkString(",")
// 降序
val value2: String = rdd.sortBy((num: Int) => num,false).collect().mkString(",")
// 按照%5排序
val value3: String = rdd.sortBy((num: Int) => num % 5,false).collect().mkString(",")
// 先按照第一個元素升序,如果第一個元素相同,在按照第二個元素降序
val value4: String = rdd2.sortBy((x: (Int, Int)) => (x._1,-x._2)).collect().mkString(",")
// 3.3 列印
println("value1:" + value1)
println("value2:" + value2)
println("value3:" + value3)
println("value4:" + value4)
}
}
sortBy()操作如圖所示:

sortBy()函式結構:
def sortBy[K](
f: (T) => K,
ascending: Boolean = true,
numPartitions: Int = this.partitions.length)
(implicit ord: Ordering[K], ctag: ClassTag[K]): RDD[T] = withScope {
this.keyBy[K](f)
.sortByKey(ascending, numPartitions)
.values
}功能介紹:
- 該操作用于排序資料,
- 在排序之前,可以將資料通過f函式進行處理,之后按照f函式處理的結果進行排序,默認為正序排列,
- 默認情況下,排序后新產生的RDD的磁區數與原RDD的磁區數一致,
- 可以通過numPartitions引數調整目標磁區數,
pipe()呼叫腳本
需求:
撰寫一個腳本,使用管道將腳本作用于RDD上,
代碼實作:
# 撰寫一個腳本,并增加執行權限
[root@node001 spark]$ vim pipe.sh
#!/bin/sh
echo "Start"
while read LINE; do
echo ">>>"${LINE}
done
[root@node001 spark]$ chmod 777 pipe.sh
# 創建一個只有一個磁區的RDD
scala> val rdd = sc.makeRDD (List("hi","Hello","how","are","you"),1)
# 將腳本作用該RDD并列印
scala> rdd.pipe("/opt/module/spark/pipe.sh").collect()
res18: Array[String] = Array(Start, >>>hi, >>>Hello, >>>how, >>>are, >>>you)
#創建一個有兩個磁區的RDD
scala> val rdd = sc.makeRDD(List("hi","Hello","how","are","you"),2)
# 將腳本作用該RDD并列印
scala> rdd.pipe("/opt/module/spark/pipe.sh").collect()
res19: Array[String] = Array(Start, >>>hi, >>>Hello, Start, >>>how, >>>are, >>>you)
pipe()操作如圖所示:

pipe()函式結構:
def pipe(command: String): RDD[String] = withScope {
// Similar to Runtime.exec(), if we are given a single string, split it into words
// using a standard StringTokenizer (i.e. by spaces)
pipe(PipedRDD.tokenize(command))
}功能介紹:
- 管道,針對每個磁區,都呼叫一次shell腳本,回傳輸出的RDD
腳本要放在 worker 節點可以訪問到的位置
每個磁區執行一次腳本, 但是每個元素算是標準輸入中的一行
雙Value型別互動
雙Value型別的互動常用的算子是數學中的并集、交集合差集,
union()并集、subtract()差集、intersection()交集
需求:
創建兩個RDD,求并集、交集、差集
代碼實作:
object TransformationDemo {
def main(args: Array[String]): Unit = {
//1.創建SparkConf并設定App名稱
val conf: SparkConf = new SparkConf().setMaster("local[*]")
.setAppName("TransformationDemo_test")
//2.創建SparkContext,該物件是提交Spark App的入口
val sc: SparkContext = new SparkContext(conf)
//3具體業務邏輯
aggTest(sc)
//4.關閉連接
sc.stop()
}
/**
* 求并集、交集、差集
* */
def aggTest(sc: SparkContext): Unit ={
// 3.1 創建第一個RDD
val rdd1: RDD[Int] = sc.makeRDD (1 to 6)
val rdd2: RDD[Int] = sc.makeRDD (4 to 10)
// 3.2 需求實作
// 并集
val value1: Array[Int] = rdd1.union(rdd2).collect()
// 差集
val value2: Array[Int] = rdd1.subtract(rdd2).collect()
// 交集
val value3: Array[Int] = rdd1.intersection(rdd2).collect()
// 3.3 列印
println("并集:" + value1.mkString(","))
println("差集:" + value2.mkString(","))
println("交集:" + value3.mkString(","))
}
}
union()并集操作如圖所示:

subtract()差集操作如圖所示:

intersection()交集操作如圖所示:

union()函式結構:
def union(other: RDD[T]): RDD[T] = withScope {
sc.union(this, other)
}subtract()函式結構:
def subtract(other: RDD[T]): RDD[T] = withScope {
subtract(other, partitioner.getOrElse(new HashPartitioner(partitions.length)))
}intersection()函式結構:
def intersection(other: RDD[T]): RDD[T] = withScope {
this.map(v => (v, null)).cogroup(other.map(v => (v, null)))
.filter { case (_, (leftGroup, rightGroup)) => leftGroup.nonEmpty && rightGroup.nonEmpty }
.keys
}功能介紹:
- 并集:對源RDD和引數RDD求并集后回傳一個新的RDD
- 差集:計算差的一種函式,去除兩個RDD中相同元素,不同的RDD將保留下來
交集:對源RDD和引數RDD求交集后回傳一個新的RDD
zip()拉鏈
需求:
創建兩個RDD,并將兩個RDD組合到一起形成一個(k,v)RDD
代碼實作:
object TransformationDemo {
def main(args: Array[String]): Unit = {
//1.創建SparkConf并設定App名稱
val conf: SparkConf = new SparkConf().setMaster("local[*]")
.setAppName("TransformationDemo_test")
//2.創建SparkContext,該物件是提交Spark App的入口
val sc: SparkContext = new SparkContext(conf)
//3具體業務邏輯
zipTest(sc)
//4.關閉連接
sc.stop()
}
/**
* zip拉鏈
* */
def zipTest(sc: SparkContext): Unit ={
// 3.1 創建第一個RDD
val rdd1: RDD[Int] = sc.makeRDD (1 to 3,2)
val rdd2: RDD[String] = sc.makeRDD (List("a","b","c"),2)
// 3.2 需求實作
val value: Array[(String, Int)] = rdd2.zip(rdd1).collect()
// 3.3 列印
println(value.mkString(","))
}
}
zip()操作如圖所示:

zip()函式結構:
def zip[U: ClassTag](other: RDD[U]): RDD[(T, U)] = withScope {
......
}
功能介紹:
- 該操作可以將兩個RDD中的元素,以鍵值對的形式進行合并,其中,鍵值對中的Key為第1個RDD中的元素,Value為第2個RDD中的元素,
- 將兩個RDD組合成Key/Value形式的RDD,這里默認兩個RDD的partition數量以及元素數量都相同,否則會拋出例外,
Key-Value型別
partitionBy()按照k重新磁區
需求:
創建一個5個磁區的RDD,對其重新磁區,
代碼實作:
object TransformationDemo {
def main(args: Array[String]): Unit = {
//1.創建SparkConf并設定App名稱
val conf: SparkConf = new SparkConf().setMaster("local[*]")
.setAppName("TransformationDemo_test")
//2.創建SparkContext,該物件是提交Spark App的入口
val sc: SparkContext = new SparkContext(conf)
//3具體業務邏輯
partitionByKeyTest(sc)
//4.關閉連接
sc.stop()
}
/**
* partitionByKey:根據key重新磁區
* */
def partitionByKeyTest(sc: SparkContext): Unit ={
val tuples: List[(Int, String)] = List((1, "a"), (2, "b"), (3, "c"), (4, "d"), (1, "aa"), (1, "bb"), (3, "cc"), (4, "dd")
,(1,"aaa"),(2,"bbb"),(3,"ccc"),(4,"ddd"))
// 3.1 創建第一個RDD
val rdd: RDD[(Int, String)] = sc.makeRDD (tuples,5)
rdd.cache()
println("-------磁區前-------")
val value1: String = rdd.mapPartitionsWithIndex((index: Int, data: Iterator[(Int, String)]) => {
data.map((index,_))
}).collect().mkString("\t")
println(value1)
// 3.2 需求實作
val value: RDD[(Int, String)] = rdd.partitionBy(new HashPartitioner(3))
// 3.3 列印
println("-------磁區后-------")
val value2: String = value.mapPartitionsWithIndex((index: Int, data: Iterator[(Int, String)]) => {
data.map((index,_))
}).collect().mkString("\t")
println(value2)
}
}
partitionBy()操作如圖所示:

partitionBy()函式結構:
def partitionBy(partitioner: Partitioner): RDD[(K, V)] = self.withScope {
if (keyClass.isArray && partitioner.isInstanceOf[HashPartitioner]) {
throw new SparkException("HashPartitioner cannot partition array keys.")
}
if (self.partitioner == Some(partitioner)) {
self
} else {
new ShuffledRDD[K, V, V](self, partitioner)
}
}功能介紹:
- 將RDD[K,V]中的K按照指定Partitioner重新進行磁區;會產生Shuffle程序,
- 如果原有的partionRDD和現有的partionRDD是一致的話就不進行磁區,否則進行磁區,
- 默認的磁區器是HashPartitioner,
備注:
1、HashPartitioner的原理是:
- 依據RDD中key值的hashCode的值將資料取模后得到該key值對應的下一個RDD的磁區id值,
- 支持key值為null的情況,當key為null的時候,回傳0;
- 該磁區器基本上適合所有RDD資料型別的資料進行磁區操作,
2、詳細的磁區器原理,請參考:[Spark] - HashPartitioner & RangePartitioner 區別
注意:
1、由于JAVA中陣列的hashCode是基于陣列物件本身的,不是基于陣列內容的,所以如果RDD的key是陣列型別,那么可能導致資料內容一致的資料key沒法分配到同一個RDD磁區中,(未測)
2、在scala中,如果RDD的key是Array陣列型別,編譯不通過,Exception in thread "main" org.apache.spark.SparkException: HashPartitioner cannot partition array keys.
這個時候最好自定義資料磁區器,采用陣列內容進行磁區或者將陣列的內容轉換為集合,
自定義磁區
class MyPartitioner(num: Int) extends Partitioner {
// 設定的磁區數
override def numPartitions: Int = num// 具體磁區邏輯
override def getPartition(key: Any): Int = {if (key.isInstanceOf[Int]) {
val keyInt: Int = key.asInstanceOf[Int]
if (keyInt % 2 == 0)
0
else
1
}else{
0
}
}
}
reduceByKey()按照k進行聚合v
需求:
統計單詞出現次數(wordCount),
代碼實作:
為了增加難度,本次實驗給字串加了一些中英文的字符,
object TransformationDemo {
def main(args: Array[String]): Unit = {
//1.創建SparkConf并設定App名稱
val conf: SparkConf = new SparkConf().setMaster("local[*]")
.setAppName("TransformationDemo_test")
//2.創建SparkContext,該物件是提交Spark App的入口
val sc: SparkContext = new SparkContext(conf)
//3具體業務邏輯
reduceByKeyTest(sc)
//4.關閉連接
sc.stop()
}
def reduceByKeyTest(sc: SparkContext): Unit = {
val tuple1: String = "Apache Spark? is a: multi-language engine for executing★ data engineering, "
val tuple2: String = "$Apache Spark? is a multi-language ※engine for executing data ¥engineering, "
val tuple3: String = "Apache Spark? is (a multi-language) engine for executing data engineering, "
val tuples: String = tuple1 + tuple2 + tuple3
// 忽略標點字符
val str: String = tuples.replaceAll("\\pP|\\pS", "")
// 3.1 創建第一個RDD
val rdd: RDD[String] = sc.parallelize(Array(str))
// 3.2 需求實作
val value2: String = rdd.flatMap(f => f.split(" "))
.map((_, 1))
.reduceByKey(_ + _)
.collect()
.mkString("\n")
// 3.3 列印
println(value2)
}
}
運行結果:

備注: 正則運算式過濾字串的方法
/pP :其中的小寫 p 是 property 的意思,表示 Unicode 屬性,用于 Unicode 正運算式的前綴,大寫 P 表示 Unicode 字符集七個字符屬性之一:標點字符,
其他六個是:
L:字母;
M:標記符號(一般不會單獨出現);
Z:分隔符(比如空格、換行等);
S:符號(比如數學符號、貨幣符號等);
N:數字(比如阿拉伯數字、羅馬數字等);
C:其他字符
reduceByKey()操作如圖:

reduceByKey()函式結構:
def reduceByKey(partitioner: Partitioner, func: (V, V) => V): RDD[(K, V)] = self.withScope {
combineByKeyWithClassTag[V]((v: V) => v, func, func, partitioner)
}def reduceByKey(func: (V, V) => V, numPartitions: Int): RDD[(K, V)] = self.withScope {
reduceByKey(new HashPartitioner(numPartitions), func)
}def reduceByKey(func: (V, V) => V): RDD[(K, V)] = self.withScope {
reduceByKey(defaultPartitioner(self), func)
}功能介紹:
- 該操作可以將RDD[K,V]中的元素按照相同的K對V進行聚合,
- 其存在多種多載形式,還可以設定新RDD的磁區數,
- 默認的磁區器為HashPartitioner
groupByKey()按照k重新分組
需求:
求List(("a",1),("b",5),("a",5),("b",2),("a",1),("c",5),("b",5),("b",2))中abc平均出現的次數,
函式實作:
object TransformationDemo {
def main(args: Array[String]): Unit = {
//1.創建SparkConf并設定App名稱
val conf: SparkConf = new SparkConf().setMaster("local[*]")
.setAppName("TransformationDemo_test")
//2.創建SparkContext,該物件是提交Spark App的入口
val sc: SparkContext = new SparkContext(conf)
//3具體業務邏輯
groupByKeyTest(sc)
//4.關閉連接
sc.stop()
}
/**
* groupByKey():按照key分組
* */
def groupByKeyTest(sc: SparkContext): Unit = {
// 3.1 創建第一個RDD
val rdd: RDD[(String, Int)] = sc.makeRDD(List(("a",1),("c",15),("a",5),("b",2),("a",1),("c",9),("b",5),("b",2)), 2)
// 3.2 需求實作
val value: Array[(String, String)] = rdd.groupByKey().map(f => {
val format: DecimalFormat = new DecimalFormat("#0.00")
val sum: Int = f._2.sum
val count: Int = f._2.size
val nums: Double = sum.toDouble / count
(f._1,format.format(nums))
}).collect()
// 3.3 列印
println(value.mkString("\n"))
}
}
備注: DecimalFormat 類,
該類主要靠 # 和 0 兩種占位符號來指定數字長度,0 表示如果位數不足則以 0 填充,# 表示只要有可能就把數字拉上這個位置,
如:#.00表示小數點后保留兩位,小數點前不確定,有幾位都行,
00.00表示整數部分兩位,不足用0補足;小數部分2位,不足用0補足,
groupByKey()操作如圖:

groupByKey()函式結構:
def groupByKey(): RDD[(K, Iterable[V])] = self.withScope {
groupByKey(defaultPartitioner(self))
}def groupByKey(partitioner: Partitioner): RDD[(K, Iterable[V])] = self.withScope {
}
def groupByKey(numPartitions: Int): RDD[(K, Iterable[V])] = self.withScope {
groupByKey(new HashPartitioner(numPartitions))
}功能介紹:
- groupByKey對每個key進行操作,但只生成一個seq,并不進行聚合,
- 該操作可以指定磁區器或者磁區數(默認使用HashPartitioner)
備注: reduceByKey和groupByKey區別
- reduceByKey:按照key進行聚合,在shuffle之前有combine(預聚合)操作,回傳結果是RDD[k,v],
- groupByKey:按照key進行分組,直接進行shuffle,
- 開發指導:
- 在不影響業務邏輯的前提下,優先選用reduceByKey,
- 求和操作不影響業務邏輯,求平均值影響業務邏輯,
aggregateByKey()按照k進行磁區內和磁區間邏輯
需求
創建一個RDD,取出RDD中每個磁區相同key對應值的最大值,然后相加,
代碼實作:
object TransformationDemo {
def main(args: Array[String]): Unit = {
//1.創建SparkConf并設定App名稱
val conf: SparkConf = new SparkConf().setMaster("local[*]")
.setAppName("TransformationDemo_test")
//2.創建SparkContext,該物件是提交Spark App的入口
val sc: SparkContext = new SparkContext(conf)
//3具體業務邏輯
aggregateByKeyTest(sc)
//4.關閉連接
sc.stop()
}
/**
* 取出每個磁區相同key對應值的最大值,然后相加
* */
def aggregateByKeyTest(sc: SparkContext): Unit = {
// 3.1 創建第一個RDD
val array: Array[(String, Int)] = Array(("a", 3), ("a", 2), ("c", 4), ("b", 3), ("c", 6), ("c", 8))
val rdd: RDD[(String, Int)] = sc.makeRDD(array, 2)
// 3.2 取出每個磁區相同key對應值的最大值,然后相加
// val value: RDD[(String, Int)] = rdd.aggregateByKey(0)(math.max, _ + _)
// 不使用math函式
val value: RDD[(String, Int)] = rdd.aggregateByKey(0)((a: Int, b: Int) => {
if (a > b) a else b
}, _ + _)
// 3.3 列印結果
value.foreach(println)
}
aggregateByKey()操作如圖所示:

函式結構 :
def aggregateByKey[U: ClassTag](zeroValue: U, partitioner: Partitioner)(
seqOp: (U, V) => U,
combOp: (U, U) => U): RDD[(K, U)] = self.withScope {
*********
}功能介紹:
(1)zeroValue(初始值):給每一個磁區中的每一種key一個初始值;
(2)seqOp(磁區內):函式用于在每一個磁區中用初始值逐步迭代value;
(3)combOp(磁區間):函式用于合并每個磁區中的結果,
foldByKey()磁區內核磁區間具有相同邏輯的aggregateByKey()
需求:
將上例中的資料求一個worldcount,
代碼實作:
object TransformationDemo {
def main(args: Array[String]): Unit = {
//1.創建SparkConf并設定App名稱
val conf: SparkConf = new SparkConf().setMaster("local[*]")
.setAppName("TransformationDemo_test")
//2.創建SparkContext,該物件是提交Spark App的入口
val sc: SparkContext = new SparkContext(conf)
//3具體業務邏輯
foldByKeyTest(sc)
//4.關閉連接
sc.stop()
}
def foldByKeyTest(sc: SparkContext): Unit = {
// 3.1 創建第一個RDD
val array: Array[(String, Int)] = Array(("a", 3), ("a", 2), ("c", 4), ("b", 3), ("c", 6), ("c", 8))
val rdd: RDD[(String, Int)] = sc.makeRDD(array)
// 3.2 worldcount
// val value: RDD[(String, Int)] = rdd.aggregateByKey(0)(_+_, _ + _)
val value: RDD[(String, Int)] = rdd.foldByKey(0)(_ + _)
// 3.3 列印
value.foreach(println)
}
foldByKey()操作如圖所示:

foldByKey()函式結構:
def foldByKey(
zeroValue: V,
partitioner: Partitioner)(func: (V, V) => V): RDD[(K, V)] = self.withScope {........
}
功能介紹:
- aggregateByKey的簡化操作,seqop和combop相同,即,磁區內邏輯和磁區間邏輯相同,
引數zeroValue:是一個初始化值,它可以是任意型別
- 引數func:是一個函式,兩個輸入引數相同
combineByKey()轉換結構后磁區內和磁區間操作
需求:
創建一個pairRDD,根據key計算每種key的均值,
代碼實作:
object TransformationDemo {
def main(args: Array[String]): Unit = {
//1.創建SparkConf并設定App名稱
val conf: SparkConf = new SparkConf().setMaster("local[*]")
.setAppName("TransformationDemo_test")
//2.創建SparkContext,該物件是提交Spark App的入口
val sc: SparkContext = new SparkContext(conf)
//3具體業務邏輯
combineKeyTest(sc)
//4.關閉連接
sc.stop()
}
def combineKeyTest(sc: SparkContext): Unit = {
// 3.1 創建第一個RDD
val rdd: RDD[(String, Int)] = sc.makeRDD(List(("a", 88), ("b", 95), ("a", 91), ("b", 93), ("a", 95), ("b", 98)))
// 3.2 需求實作: 將相同key對應的值相加,同時記錄該key出現的次數,放入一個二元組
val value: Array[(String, String)] = rdd.combineByKey(
(_, 1),
(acc: (Int, Int), other: Int) => (acc._1 + other, acc._2 + 1), // 注意此處的型別必須手動添加,否則無法使用
(acc1: (Int, Int), acc2: (Int, Int)) => (acc1._1 + acc2._1, acc1._2 + acc2._2)
).map(f => {
val format: DecimalFormat = new DecimalFormat("#0.00")
(f._1, format.format(f._2._1 / f._2._2.toDouble))
}).collect()
// 3.3 列印
println(value.mkString("\n"))
}
}
combineByKey()操作如圖所示:

combineByKey()函式功能:
def combineByKey[C](
createCombiner: V => C,
mergeValue: (C, V) => C,
mergeCombiners: (C, C) => C,
partitioner: Partitioner,
mapSideCombine: Boolean = true,
serializer: Serializer = null): RDD[(K, C)] = self.withScope {
combineByKeyWithClassTag(createCombiner, mergeValue, mergeCombiners,
partitioner, mapSideCombine, serializer)(null)
}def combineByKey[C](
createCombiner: V => C,
mergeValue: (C, V) => C,
mergeCombiners: (C, C) => C,
numPartitions: Int): RDD[(K, C)] = self.withScope {
combineByKeyWithClassTag(createCombiner, mergeValue, mergeCombiners, numPartitions)(null)
}功能介紹:
- createCombiner:V=>C 分組內的創建組合的函式,通俗點將就是對讀進來的資料進行初始化,其把當前的值作為引數,可以對該值做一些轉換操作,轉換為我們想要的資料格式
- mergeValue: (C, V) => C該函式主要是磁區內的合并函式,作用在每一個磁區內部,其功能主要是將V合并到之前(createCombiner)的元素C上,注意,這里的C指的是上一函式轉換之后的資料格式,而這里的V指的是原始資料格式(上一函式為轉換之前的)
- mergeCombiners:(C,C)=>R 該函式主要是進行多磁區合并,此時是將兩個C合并為一個C,例如兩個C:(Int)進行相加之后得到一個R:(Int)
備注: reduceByKey、aggregateByKey、foldByKey、combineByKey 四種對key聚合的區別和聯系
- 四種底層都呼叫了combineByKeyWithClassTag
- reduceByKey 沒有初始值,磁區內和磁區間邏輯相同
- aggregateByKey 第一個初始值和磁區內處理規則一致,磁區內和磁區間邏輯可以不同
foldByKey 有初始值,磁區內和磁區間邏輯一致
combineByKey 初始值可以變化結構,磁區內和磁區間邏輯不同
sortByKey()按照k進行排序
需求:
創建一個pairRDD,按照key的正序和倒序進行排序
代碼實作:
object TransformationDemo {
def main(args: Array[String]): Unit = {
//1.創建SparkConf并設定App名稱
val conf: SparkConf = new SparkConf().setMaster("local[*]")
.setAppName("TransformationDemo_test")
//2.創建SparkContext,該物件是提交Spark App的入口
val sc: SparkContext = new SparkContext(conf)
//3具體業務邏輯
sortByKeyTest(sc)
//4.關閉連接
sc.stop()
}
/**
* sortByKeyTest():在一個(K,V)的RDD上呼叫,K必須實作Ordered介面,回傳一個按照key進行排序的(K,V)的RDD
* */
def sortByKeyTest(sc: SparkContext): Unit = {
// 3.1 創建第一個RDD
val rdd: RDD[(String, Int)] = sc.makeRDD(List(("a", 88), ("b", 95), ("a", 91), ("b", 93), ("a", 95), ("b", 98)))
// 3.2 需求實作
val value1: Array[(String, Int)] = rdd.sortByKey().collect()
val value2: Array[(String, Int)] = rdd.sortByKey(false).collect()
// 3.3 列印
println(value1.mkString("\n"))
println("--------------------------")
println(value2.mkString("\n"))
}
}
sortByKey()操作如圖所示:

sortByKey()函式結構:
def sortByKey(ascending: Boolean = true, numPartitions: Int = self.partitions.length)
: RDD[(K, V)] = self.withScope
{
val part = new RangePartitioner(numPartitions, self, ascending)
new ShuffledRDD[K, V, V](self, part)
.setKeyOrdering(if (ascending) ordering else ordering.reverse)
}功能介紹:
- 在一個(K,V)的RDD上呼叫,K必須實作Ordered介面,回傳一個按照key進行排序的(K,V)的RDD
- 默認升序
mapValues()只對v進行操作
需求:
創建一個pairRDD,并將value資料轉換成大寫,
代碼實作:
object TransformationDemo {
def main(args: Array[String]): Unit = {
//1.創建SparkConf并設定App名稱
val conf: SparkConf = new SparkConf().setMaster("local[*]")
.setAppName("TransformationDemo_test")
//2.創建SparkContext,該物件是提交Spark App的入口
val sc: SparkContext = new SparkContext(conf)
//3具體業務邏輯
mapValuesTest(sc)
//4.關閉連接
sc.stop()
}
/**
* mapValuesTest():只對Values操作
* */
def mapValuesTest(sc: SparkContext): Unit = {
// 3.1 創建第一個RDD
val rdd: RDD[(Int, String)] = sc.makeRDD(List((1, "a"), (2, "b"), (3, "c"), (4, "d"), (1, "aa"), (1, "bb"), (3, "cc")))
// 3.2 需求實作
val value: Array[(Int, String)] = rdd.mapValues(_.toUpperCase).collect()
// 3.3 列印
println(value.mkString("\n"))
}
}
mapValues()操作如圖所示:

mapValues()函式結構:
def mapValues[U](f: V => U): RDD[(K, U)] = self.withScope {
val cleanF = self.context.clean(f)
new MapPartitionsRDD[(K, U), (K, V)](self,
(context, pid, iter) => iter.map { case (k, v) => (k, cleanF(v)) },
preservesPartitioning = true)
}功能介紹:
針對于(K,V)形式的型別只對V進行操作
join()將相同k對應的多個v關聯在一起
需求:
創建兩個pairRDD,并將key相同的資料聚合到一個元組,
代碼實作:
object TransformationDemo {
def main(args: Array[String]): Unit = {
//1.創建SparkConf并設定App名稱
val conf: SparkConf = new SparkConf().setMaster("local[*]")
.setAppName("TransformationDemo_test")
//2.創建SparkContext,該物件是提交Spark App的入口
val sc: SparkContext = new SparkContext(conf)
//3具體業務邏輯
joinTest(sc)
//4.關閉連接
sc.stop()
}
/**
* joinTest():在型別為(K,V)和(K,W)的RDD上呼叫,回傳一個相同key對應的所有元素對在一起的(K,(V,W))的RDD
* */
def joinTest(sc: SparkContext): Unit = {
//3.1 創建第兩個RDD
val rdd: RDD[(Int, String)] = sc.makeRDD(Array((1, "a"), (2, "b"), (3, "c"),(0,"零")))
val rdd1: RDD[(Int, Int)] = sc.makeRDD(Array((1, 4), (2, 5), (4, 6)))
// 3.2 需求實作
val value: Array[(Int, (String, Int))] = rdd.join(rdd1).collect()
// 3.3 列印
println(value.mkString("\n"))
}
}
join()操作如圖所示:

join()函式結構:
def join[W](other: RDD[(K, W)], partitioner: Partitioner): RDD[(K, (V, W))] = self.withScope {
this.cogroup(other, partitioner).flatMapValues( pair =>
for (v <- pair._1.iterator; w <- pair._2.iterator) yield (v, w)
)
}功能介紹:
在型別為(K,V)和(K,W)的RDD上呼叫,回傳一個相同key對應的所有元素對在一起的(K,(V,W))的RDD
擴展:左外連接、右外連接和全外連接
回傳型別為:(Int, (String, Option[Int])),
Option[A] (sealed trait) 有兩個取值:
1. Some[A] 有型別A的值
2. None 沒有值
一般通過f._2._2.getOrElse("默認值")獲取值,
左外連接(左連接):左邊都有,右邊關聯不上,用None表示,
def leftOuterJoin[W](other: RDD[(K, W)], partitioner: Partitioner)
右外連接(右連接):右邊都有,左邊關聯不上,用None表示
def rightOuterJoin[W](other: RDD[(K, W)], partitioner: Partitioner)
全外連接(全連接):左右都有,關聯不上,用None表示,
def fullOuterJoin[W](other: RDD[(K, W)], partitioner: Partitioner)
cogroup()類似全連接,但是在同一個RDD中對k聚合
需求:
創建兩個pairRDD,并將key相同的資料聚合到一個迭代器,
代碼實作:
object TransformationDemo {
def main(args: Array[String]): Unit = {
//1.創建SparkConf并設定App名稱
val conf: SparkConf = new SparkConf().setMaster("local[*]")
.setAppName("TransformationDemo_test")
//2.創建SparkContext,該物件是提交Spark App的入口
val sc: SparkContext = new SparkContext(conf)
//3具體業務邏輯
cogroupTest(sc)
//4.關閉連接
sc.stop()
}
/**
* cogroupTest():在型別為(K,V)和(K,W)的RDD上呼叫,回傳一個(K,(Iterable<V>,Iterable<W>))型別的RDD
* */
def cogroupTest(sc: SparkContext): Unit = {
//3.1 創建第兩個RDD
val rdd: RDD[(Int, String)] = sc.makeRDD(Array((1, "a"), (3, "cC"), (2, "b"), (3, "c"),(0,"零"),(1, "aA")))
val rdd1: RDD[(Int, Int)] = sc.makeRDD(Array((1, 4), (2, 5), (2, 55),(4, 6),(1, 44)))
// 3.2 需求實作
val value: Array[(Int, (Iterable[String], Iterable[Int]))] = rdd.cogroup(rdd1).collect()
// 3.3 列印
println(value.mkString("\n"))
}
}
cogroup()操作如圖所示:

cogroup()函式結構:
def cogroup[W](other: RDD[(K, W)]): RDD[(K, (Iterable[V], Iterable[W]))] = self.withScope {
cogroup(other, defaultPartitioner(self, other))
}功能介紹:
在型別為(K,V)和(K,W)的RDD上呼叫,回傳一個(K,(Iterable<V>,Iterable<W>))型別的RDD
第二部分:Action行動算子
行動算子是觸發了整個作業的執行,因為轉換算子都是懶加載,并不會立即執行,常用的行動算子有如下幾個,
說明:為了方便,行動算子的測驗也將在object TransformationDemo包下實作,不再新建一個包,
reduce()聚合
1)函式簽名:def reduce(f: (T, T) => T): T
2)功能說明:f函式聚集RDD中的所有元素,先聚合磁區內資料,再聚合磁區間資料,
3)需求說明:創建一個RDD,將所有元素聚合得到結果
4)代碼如下:
object TransformationDemo {
def main(args: Array[String]): Unit = {
//1.創建SparkConf并設定App名稱
val conf: SparkConf = new SparkConf().setMaster("local[*]")
.setAppName("TransformationDemo_test")
//2.創建SparkContext,該物件是提交Spark App的入口
val sc: SparkContext = new SparkContext(conf)
//3具體業務邏輯
reduceTest(sc)
//4.關閉連接
sc.stop()
}
/**
* 行動算子:
* reduce:聚集RDD內所有元素
* */
def reduceTest(sc: SparkContext): Unit = {
//3.1 創建第兩個RDD
val rdd: RDD[(Int, String)] = sc.makeRDD(Array((1, "a"), (3, "cC"), (2, "b"), (3, "c"),(0,"零"),(1, "aA")))
val rdd1: RDD[Int] = sc.makeRDD(1 to 10)
// 3.2 需求實作
val value: (Int, String) = rdd.reduce((k1: (Int, String), k2: (Int, String)) => (k1._1 + k2._1,k1._2 + k2._2))
val value1: Int = rdd1.reduce(_+_)
// 3.3 列印
println(value)
println(value1)
}
}
collect()以陣列的形式回傳資料集
1)函式簽名:def collect(): Array[T]
2)功能說明:在驅動程式中,以陣列Array的形式回傳資料集的所有元素,
注意:所有的資料都會被拉取到Driver端,當資料量大時,會發生OOM,慎用,
3)需求說明:創建一個RDD,并將RDD內容收集到Driver端列印
4)代碼實作:
object TransformationDemo {
def main(args: Array[String]): Unit = {
//1.創建SparkConf并設定App名稱
val conf: SparkConf = new SparkConf().setMaster("local[*]")
.setAppName("TransformationDemo_test")
//2.創建SparkContext,該物件是提交Spark App的入口
val sc: SparkContext = new SparkContext(conf)
//3具體業務邏輯
collectTest(sc)
//4.關閉連接
sc.stop()
}
/**
* 行動算子:
* collect:收集資料到Driver端,以陣列的形式回傳,
* */
def collectTest(sc: SparkContext): Unit = {
//3.1 創建第兩個RDD
val rdd: RDD[(Int, String)] = sc.makeRDD(Array((1, "a"), (3, "cC"), (2, "b"), (3, "c"),(0,"零"),(1, "aA")))
// 3.2 需求實作
val value: Array[(Int, String)] = rdd.collect()
// 3.3 列印
value.foreach(println)
}
}
count()回傳RDD中元素個數
1)函式簽名:def count(): Long
2)功能說明:回傳RDD中元素的個數,
3)需求說明:創建一個RDD,統計該RDD的條數,
4)代碼實作:
object TransformationDemo {
def main(args: Array[String]): Unit = {
//1.創建SparkConf并設定App名稱
val conf: SparkConf = new SparkConf().setMaster("local[*]")
.setAppName("TransformationDemo_test")
//2.創建SparkContext,該物件是提交Spark App的入口
val sc: SparkContext = new SparkContext(conf)
//3具體業務邏輯
countTest(sc)
//4.關閉連接
sc.stop()
}
/**
* 行動算子:
* count:回傳RDD的條數
* */
def countTest(sc: SparkContext): Unit = {
//3.1 創建第兩個RDD
val rdd: RDD[(Int, String)] = sc.makeRDD(Array((1, "a"), (3, "cC"), (2, "b"), (3, "c"),(0,"零"),(1, "aA")))
// 3.2 需求實作
val value: Long = rdd.count()
// 3.3 列印
println(value)
}
}
first()回傳RDD中的第一個元素
1)函式簽名: def first(): T
2)功能說明:回傳RDD中的第一個元素,
3)需求說明:創建一個RDD,回傳該RDD中的第一個元素,
4)代碼實作:
object TransformationDemo {
def main(args: Array[String]): Unit = {
//1.創建SparkConf并設定App名稱
val conf: SparkConf = new SparkConf().setMaster("local[*]")
.setAppName("TransformationDemo_test")
//2.創建SparkContext,該物件是提交Spark App的入口
val sc: SparkContext = new SparkContext(conf)
//3具體業務邏輯
firstTest(sc)
//4.關閉連接
sc.stop()
}
/**
* 行動算子:
* first:回傳RDD中第一條資料
* */
def firstTest(sc: SparkContext): Unit = {
//3.1 創建第兩個RDD
val rdd: RDD[(Int, String)] = sc.makeRDD(Array((1, "a"), (3, "cC"), (2, "b"), (3, "c"),(0,"零"),(1, "aA")))
// 3.2 需求實作
val value: (Int, String) = rdd.first()
// 3.3 列印
println(value)
}
}
take(n)回傳由RDD前n個元素組成的陣列
1)函式簽名: def take(num: Int): Array[T]
2)功能說明:回傳一個由RDD的前n個元素組成的陣列,
3)需求說明:回傳RDD中key值最大的前3個元素,
4)代碼實作:
object TransformationDemo {
def main(args: Array[String]): Unit = {
//1.創建SparkConf并設定App名稱
val conf: SparkConf = new SparkConf().setMaster("local[*]")
.setAppName("TransformationDemo_test")
//2.創建SparkContext,該物件是提交Spark App的入口
val sc: SparkContext = new SparkContext(conf)
//3具體業務邏輯
takeTest(sc)
//4.關閉連接
sc.stop()
}
/**
* 行動算子:
* take(n):回傳由RDD組成的前n個元素的陣列,
* */
def takeTest(sc: SparkContext): Unit = {
//3.1 創建第兩個RDD
val rdd: RDD[(Int, String)] = sc.makeRDD(Array((1, "a"),(4, "4c"), (5, "5c"), (3, "cC"), (2, "b"), (6, "c"),(0,"零"),(7, "aA")))
// 3.2 需求實作
val value: Array[(Int, String)] = rdd.sortByKey(false).take(3)
// 3.3 列印
println(value.mkString("\n"))
}
}
takeOrdered(n)回傳RDD排序后前n個元素組成的陣列
1)函式簽名: def takeOrdered(num: Int)(implicit ord: Ordering[T]): Array[T]
2)功能說明:回傳該RDD排序后的前n個元素組成的陣列,(默認升序)
3)需求說明:創建一個RDD,獲取該RDD排序后的前2個元素,
4)代碼實作:
object TransformationDemo {
def main(args: Array[String]): Unit = {
//1.創建SparkConf并設定App名稱
val conf: SparkConf = new SparkConf().setMaster("local[*]")
.setAppName("TransformationDemo_test")
//2.創建SparkContext,該物件是提交Spark App的入口
val sc: SparkContext = new SparkContext(conf)
//3具體業務邏輯
takeTest(sc)
//4.關閉連接
sc.stop()
}
/**
* 行動算子:
* takeOrdered(n):回傳該RDD排序后的前n個元素組成的陣列.
* */
def takeOrderedTest(sc: SparkContext): Unit = {
//3.1 創建第兩個RDD
val rdd: RDD[(Int, String)] = sc.makeRDD(Array((1, "a"),(4, "4c"), (5, "5c"), (3, "cC"), (2, "b"), (6, "c"),(0,"零"),(7, "aA")))
// 3.2 需求實作
val value: Array[(Int, String)] = rdd.takeOrdered(5) // 升序
// 自定義排序(重寫compare方法):降序
val value1: Array[(Int, String)] = rdd.takeOrdered(5)(new Ordering[(Int,String)](){
override def compare(x: (Int, String), y: (Int, String)): Int = y._1-x._1
})
// 3.3 列印
println(value.mkString("\n"))
println(value1.mkString("\n"))
}
}
aggregate()案例
1)函式簽名: def aggregate[U: ClassTag](zeroValue: U)(seqOp: (U, T) => U, combOp: (U, U) => U): U
2)功能說明:aggregate函式將每個磁區里面的元素通過磁區內邏輯和初始值進行聚合,然后用磁區間邏輯和初始值(zeroValue)進行操作,
注意:1、磁區間邏輯再次使用初始值和aggregateByKey是有區別的,
2、zeroValue 磁區內聚合和磁區間聚合的時候各會使用一次,
操作流圖:

3)需求說明:創建一個RDD,將所有元素相加得到結果,
4)代碼實作:
object TransformationDemo {
def main(args: Array[String]): Unit = {
//1.創建SparkConf并設定App名稱
val conf: SparkConf = new SparkConf().setMaster("local[1]")
.setAppName("TransformationDemo_test")
//2.創建SparkContext,該物件是提交Spark App的入口
val sc: SparkContext = new SparkContext(conf)
//3具體業務邏輯
aggregateTest(sc)
//4.關閉連接
sc.stop()
}
/**
* 行動算子:
* aggregateTest(n):aggregate函式將每個磁區里面的元素通過磁區內邏輯和初始值進行聚合,然后用磁區間邏輯和初始值(zeroValue)進行操作
* */
def aggregateTest(sc: SparkContext): Unit = {
//3.1 創建第兩個RDD
val rdd: RDD[(Int, String)] = sc.makeRDD(Array((1, "a"),(2, "4c"), (3, "5c"), (4, "cC")),2)
// 3.2 需求實作
val value: Int = rdd.aggregate(0)(
(a: Int, b: (Int, String)) => a + b._1,
(x: Int, y: Int) => x + y
)
val value3: Int = rdd.aggregate(10)(
(a: Int, b: (Int, String)) => a + b._1,
(x: Int, y: Int) => x + y
)
val value2: String = rdd.aggregate("x")(
(a: String, b: (Int, String)) => a + b._1,
(x: String, y: String) => x + y
)
// 3.3 列印
println(value)
println(value2)
println(value3)
}
fold()案例
1)函式簽名: def fold(zeroValue: T)(op: (T, T) => T): T
2)功能說明:折疊操作,aggregate的簡化操作,即,磁區內邏輯和磁區間邏輯相同,
3)需求說明:創建一個RDD,將所有元素相加得到結果,
4)代碼實作:
object TransformationDemo {
def main(args: Array[String]): Unit = {
//1.創建SparkConf并設定App名稱
val conf: SparkConf = new SparkConf().setMaster("local[1]")
.setAppName("TransformationDemo_test")
//2.創建SparkContext,該物件是提交Spark App的入口
val sc: SparkContext = new SparkContext(conf)
//3具體業務邏輯
foldTest(sc)
//4.關閉連接
sc.stop()
}
/**
* 行動算子:
* fold:折疊操作,aggregate的簡化操作,即,磁區內邏輯和磁區間邏輯相同,
* */
def foldTest(sc: SparkContext): Unit = {
//3.1 創建第兩個RDD
val rdd: RDD[(Int, String)] = sc.makeRDD(Array((1, "a"),(2, "4c"), (3, "5c"), (4, "cC")),2)
// 3.2 需求實作
val value: (Int, String) = rdd.fold((0," "))((z: (Int, String), x: (Int, String)) => (z._1 + x._1,x._2 + z._2))
// 3.3 列印
println(value)
}
}
countByKey()統計每種key的個數
1)函式簽名:def countByKey(): Map[K, Long]
2)功能說明:統計每種key的個數,
注:可以用來查看資料是否傾斜,
3)需求說明:創建一個PairRDD,統計每種key的個數,
4)代碼實作:
object TransformationDemo {
def main(args: Array[String]): Unit = {
//1.創建SparkConf并設定App名稱
val conf: SparkConf = new SparkConf().setMaster("local[1]")
.setAppName("TransformationDemo_test")
//2.創建SparkContext,該物件是提交Spark App的入口
val sc: SparkContext = new SparkContext(conf)
//3具體業務邏輯
countByKeyTest(sc)
//4.關閉連接
sc.stop()
}
/**
* 行動算子:
* countByKey:統計每種key的個數
* */
def countByKeyTest(sc: SparkContext): Unit = {
//3.1 創建第兩個RDD
val rdd: RDD[(Int, String)] = sc.makeRDD(List((1, "a"), (2, "b"), (3, "c"), (4, "d"), (1, "aa"), (1, "bb"), (3, "cc")))
// 3.2 需求實作
val value: collection.Map[Int, Long] = rdd.countByKey()
// 3.3 列印
println(value)
}
}
save相關算子
1)saveAsTextFile(path)保存成Text檔案
(1)函式簽名:def saveAsTextFile(path: String)
(2)功能說明:將資料集的元素以textfile的形式保存到HDFS檔案系統或者其他支持的檔案系統,對于每個元素,Spark將會呼叫toString方法,將它裝換為檔案中的文本,
2)saveAsSequenceFile(path) 保存成Sequencefile檔案
(1)函式簽名:def saveAsSequenceFile(path: String)
(2)功能說明:將資料集中的元素以Hadoop Sequencefile的格式保存到指定的目錄下,可以使HDFS或者其他Hadoop支持的檔案系統,
注意:只有kv型別RDD有該操作,單值的沒有
3)saveAsObjectFile(path) 序列化成物件保存到檔案
(1)函式簽名:def saveAsObjectFile(path: String)
(2)功能說明:用于將RDD中的元素序列化成物件,存盤到檔案中,
代碼演示:
object TransformationDemo {
def main(args: Array[String]): Unit = {
//1.創建SparkConf并設定App名稱
val conf: SparkConf = new SparkConf().setMaster("local[1]")
.setAppName("TransformationDemo_test")
//2.創建SparkContext,該物件是提交Spark App的入口
val sc: SparkContext = new SparkContext(conf)
//3具體業務邏輯
saveTest(sc)
//4.關閉連接
sc.stop()
}
/**
* 行動算子:
* savexxx:將資料持久化到本地(或HDFS)
* */
def saveTest(sc: SparkContext): Unit = {
//3.1 創建RDD
val rdd: RDD[(Int, String)] = sc.makeRDD(List((1, "a"), (2, "b"), (3, "c"), (4, "d"), (1, "aa"), (1, "bb"), (3, "cc")))
// 3.2 需求實作
rdd.repartition(1).saveAsTextFile("file:///C:/tmp/output/txt/")
rdd.repartition(1).saveAsObjectFile("file:///C:/tmp/output/obj/")
rdd.repartition(1).saveAsSequenceFile("file:///C:/tmp/output/seq/")
}
}
foreach()&foreachPartition遍歷RDD中每一個元素
1)函式簽名:def foreach(f: T => Unit): Unit
2)功能說明:遍歷RDD中的每一個元素,并依次應用f函式,
兩種形式的foreach算子操作如圖所示:

備注:
1、呼叫collect算子時,資料在Drive端列印;未呼叫collect算子時,資料不會回傳Drive端,而是直接在Executor端列印,
2、foreach和foreachPartition的區別:
foreach 是一條一條的拿資料進行處理;
foreachPartition是一個磁區一個磁區的拿資料,一個磁區中有很多資料的資訊,
所以,在使用中,當我們要把處理結果保存到資料庫中的時候,我們要使用foreachPartition這個方法,這樣效率會更高,
3)需求說明:創建一個RDD,對每個元素進行列印,
4)代碼實作:
object TransformationDemo {
def main(args: Array[String]): Unit = {
//1.創建SparkConf并設定App名稱
val conf: SparkConf = new SparkConf().setMaster("local[1]")
.setAppName("TransformationDemo_test")
//2.創建SparkContext,該物件是提交Spark App的入口
val sc: SparkContext = new SparkContext(conf)
//3具體業務邏輯
foreachTest(sc)
//4.關閉連接
sc.stop()
}
/**
* 行動算子:
* foreach和foreachPartition:遍歷RDD中的元素
* */
def foreachTest(sc: SparkContext): Unit = {
//3.1 創建RDD
val rdd: RDD[Int] = sc.makeRDD(1 to 100,5)
// 3.2 需求實作
rdd.foreach((f: Int) => println(TaskContext.getPartitionId() + "--" + f))
rdd.foreachPartition((f: Iterator[Int]) => println(TaskContext.getPartitionId() + "--" + f.mkString(",")))
}
}
總結
至此,常用的RDD算子已全部匯總完畢,當然,還有許多遺漏的算子和方法的使用未涉及到,感興趣的讀者可以自行研究學習,
下面為讀者匯總關于RDD算子中面試常見問題:
- map與mapPartitions的區別
map():每次處理一條資料,
mapPartitions():每次處理一個磁區的資料,這個磁區的資料處理完后,原 RDD 中該磁區的資料才能釋放,可能導致 OOM,
開發指導:當記憶體空間較大的時候建議使用mapPartitions(),以提高處理效率,
- coalesce與repartition兩個算子的作用以及區別與聯系
1)關系:
兩者都是用來改變RDD的partition數量的,repartition底層呼叫的就是coalesce方法:coalesce(numPartitions, shuffle = true)2)區別:
repartition一定會發生shuffle,coalesce根據傳入的引數來判斷是否發生shuffle,
一般情況下增大rdd的partition數量使用repartition,減少partition數量時使用coalesce,注:
1、coalesce填寫的引數小于等于RDD的partition數量,大于不起作用
2、coalesce中的引數過小可能會導致OOM,這時,又想保證小檔案的數量少,可以使用reparation,
- 使用zip算子時需要注意的是什么(即哪些情況不能使用)
在 Spark 中, 兩個 RDD 的元素的數量和磁區數都必須相同, 否則會拋出例外,
其實本質就是要求的每個磁區的元素的數量相同.
- aggregateByKey與aggregate之間的區別與聯系,
aggregate和aggregateByKey的引數是一樣的,作用也一樣,只不過aggregateByKey多了key而已,
如果將RDD分別有設定成2個磁區和4個磁區,則
- 不影響aggregateByKey()的輸出結果,因為其是按照K處理磁區內和磁區間邏輯,
- 會影響aggregate的輸出結果,2個磁區中,初始值共使用了3次;4個磁區中,初始值共使用了5次,
- reduceByKey跟groupByKey之間的區別,
reduceByKey:按照key進行聚合,在shuffle之前有combine(預聚合)操作,回傳結果是RDD[k,v],
groupByKey:按照key進行分組,直接進行shuffle,回傳結果是RDD[(K, Iterable[V])],
開發指導:reduceByKey比groupByKey性能更好,并且groupByKey會造成OOM,建議使用,但是需要注意是否會影響業務邏輯,
- reduceByKey跟aggregateByKey之間的區別與聯系,
reduceByKey可以認為是aggregateByKey的簡化版,
aggregateByKey,分為三個引數(zeroValue、seqOp、combOp),可以自定義初始值、磁區內、磁區間的操作,
- combineByKey的引數作用,說明其引數呼叫時機,
def combineByKey[C](
createCombiner: V => C,
mergeValue: (C, V) => C,
mergeCombiners: (C, C) => C): RDD[(K, C)](1)createCombiner:V=>C 分組內的創建組合的函式,會遍歷磁區中的每個key-value對. 如果第一次碰到這個key, 則呼叫createCombiner函式,傳入value, 得到一個C型別的值,(如果不是第一次碰到這個 key, 則不會呼叫這個方法)
(2)mergeValue:(C,V)=>C 該函式主要是磁區內的合并函式,作用在每一個磁區內部,如果不是第一個遇到這個key,則呼叫這個函式進行合并操作,
(3)mergeCombiners:(C,C)=>R 該函式主要是進行多磁區合并,此時是將兩個C合并為一個C,跨磁區合并相同的key的值,
路漫漫其修遠兮,吾將上下而求索, ---屈原
轉載請註明出處,本文鏈接:https://www.uj5u.com/qita/339076.html
標籤:其他
上一篇:MapReduce學習筆記,理解學習Hadoop的MapReduce計算系統
下一篇:Spring與Kafka整合
