主頁 >  其他 > Spark Core學習之常用算子(含經典面試題)

Spark Core學習之常用算子(含經典面試題)

2021-10-28 07:28:57 其他

目錄

前言

第一部分:Transformation算子

Value型別

map()映射

mapPartitions()以磁區單位執行Map

mapPartitionsWithIndex()帶磁區號

flatMap()壓平

glom()磁區轉換陣列

groupBy()分組

擴展:復雜版的wordcount

filter()過濾

sample()采樣

distinct()去重

coalesce()重新磁區

reparation()重新磁區(執行Shuffle)

sortBy()排序

pipe()呼叫腳本

雙Value型別互動

union()并集、subtract()差集、intersection()交集

zip()拉鏈

Key-Value型別

partitionBy()按照k重新磁區

reduceByKey()按照k進行聚合v

備注: 正則運算式過濾字串的方法

groupByKey()按照k重新分組

aggregateByKey()按照k進行磁區內和磁區間邏輯

foldByKey()磁區內核磁區間具有相同邏輯的aggregateByKey()

combineByKey()轉換結構后磁區內和磁區間操作

sortByKey()按照k進行排序

mapValues()只對v進行操作

join()將相同k對應的多個v關聯在一起

擴展:左外連接、右外連接和全外連接

cogroup()類似全連接,但是在同一個RDD中對k聚合

第二部分:Action行動算子

reduce()聚合

collect()以陣列的形式回傳資料集

count()回傳RDD中元素個數

first()回傳RDD中的第一個元素

take(n)回傳由RDD前n個元素組成的陣列

takeOrdered(n)回傳RDD排序后前n個元素組成的陣列

aggregate()案例

fold()案例

countByKey()統計每種key的個數

save相關算子

foreach()&foreachPartition遍歷RDD中每一個元素

總結


前言

在 Spark Core中,RDD(Resilient Distributed Dataset,彈性分布式資料集) 支持 2 種操作:

1、transformation
從一個已知的 RDD 中創建出來一個新的 RDD ,例如: map就是一個transformation,
2、action
在資料集上計算結束之后, 給驅動程式回傳一個值.,例如: reduce就是一個action,

注意:

本文只簡單講述Transformation算子和Action算子,目的是了解其相應的算子如何使用;并不打算深入理解每一個算子的執行程序和邏輯思想,


在 Spark 中幾乎所有的transformation操作都是懶執行的(lazy),也就是說transformation操作并不會立即計算他們的結果,而是記住了這個操作,只有當通過一個action來獲取結果回傳給驅動程式的時候這些轉換操作才開始計算,這種設計可以使 Spark 運行起來更加的高效


默認情況下,你每次在一個 RDD 上運行一個action的時候,前面的每個transformed RDD 都會被重新計算,但是我們可以通過persist (or cache)方法來持久化一個 RDD 在記憶體中,也可以持久化到磁盤上, 來加快訪問速度,

根據 RDD 中資料型別的不同,整體分為 2 種 RDD:

  • Value型別
  • Key-Value型別(其實就是存了一個二維的元組)

第一部分:Transformation算子

Value型別

map()映射

需求:

創建一個1~4陣列的RDD,兩個磁區,將所有元素*2形成新的RDD,

代碼實作:

object TransformationDemo {
  def main(args: Array[String]): Unit = {
    //1.創建SparkConf并設定App名稱
    val conf: SparkConf = new SparkConf().setMaster("local[*]")
      .setAppName("TransformationDemo_test")

    //2.創建SparkContext,該物件是提交Spark App的入口
    val sc: SparkContext = new SparkContext(conf)

    //3具體業務邏輯
    mapTest(sc)

    //4.關閉連接
    sc.stop()
  }

  def mapTest(sc: SparkContext): Unit = {
    // 3.1 創建一個RDD
    val rdd: RDD[Int] = sc.makeRDD(1 to 4,2)

    // 3.2 呼叫map方法,每個元素乘以2
    val mapRdd: RDD[Int] = rdd.map(_ * 2)

    // 3.3 列印修改后的RDD中資料
    mapRdd.collect().foreach(println)
  }
}

map()操作如圖所示:

map()函式結構
def map[U: ClassTag](f: T => U): RDD[U] = withScope {
val cleanF = sc.clean(f)
new MapPartitionsRDD[U, T](this, (context, pid, iter) => iter.map(cleanF))
}

功能描述

引數f是一個函式,它可以接收一個引數,當某個RDD執行map方法時,會遍歷該RDD中的每一個資料項,并依次應用f函式,從而產生一個新的RDD,即,這個新RDD中的每一個元素都是原來RDD中每一個元素依次應用f函式而得到的,

在本例中,f為:_*2

備注:rdd.map(_ * 2)是rdd.map((f: Int) => f * 2)的簡寫,

mapPartitions()以磁區單位執行Map

需求:

創建一個RDD,4個元素,2個磁區,使每個元素*2組成新的RDD

代碼實作:

object TransformationDemo {
  def main(args: Array[String]): Unit = {
    //1.創建SparkConf并設定App名稱
    val conf: SparkConf = new SparkConf().setMaster("local[*]")
      .setAppName("TransformationDemo_test")

    //2.創建SparkContext,該物件是提交Spark App的入口
    val sc: SparkContext = new SparkContext(conf)

    //3具體業務邏輯
    mapPartitionsTest(sc)

    //4.關閉連接
    sc.stop()
  }

  /**
   * 創建一個RDD,4個元素,2個磁區,使每個元素*2組成新的RDD
   * */
  def mapPartitionsTest(sc: SparkContext): Unit ={
    // 3.1 創建第一個RDD
    val rdd: RDD[Int] = sc.makeRDD(1 to 4,2)

    // 3.2 需求實作
    val value: RDD[Int] = rdd.mapPartitions((data: Iterator[Int]) => data.map((x: Int) => x * 2))

    // 3.3 列印
    value.foreach(println)
  }
}

mapPartitions()操作如圖所示:

mapPartitions()函式結構:

def mapPartitions[U: ClassTag](
f: Iterator[T] => Iterator[U],
preservesPartitioning: Boolean = false
): RDD[U] = withScope {
val cleanedF = sc.clean(f)
new MapPartitionsRDD(
this,
(context: TaskContext, index: Int, iter: Iterator[T]) => cleanedF(iter),
preservesPartitioning)
}

功能介紹:

  • f函式把每一個磁區的資料分別放入到迭代器中,批處理,
  • preservesPartitioning:是否保留上游RDD的磁區資訊,默認false
  • Map是一次處理一個元素,而mapPartitions一次處理一個磁區資料,

備注:map()和mapPartitions()的區別

  1. map():每次處理一條資料,
  2. mapPartitions():每次處理一個磁區的資料,這個磁區的資料處理完后,原 RDD 中該磁區的資料才能釋放,可能導致 OOM,
  3. 開發指導:當記憶體空間較大的時候建議使用mapPartitions(),以提高處理效率,

mapPartitionsWithIndex()帶磁區號

需求:

創建一個RDD,使每個元素跟所在磁區號形成一個元組,組成一個新的RDD,

代碼實作:

object TransformationDemo {
  def main(args: Array[String]): Unit = {
    //1.創建SparkConf并設定App名稱
    val conf: SparkConf = new SparkConf().setMaster("local[*]")
      .setAppName("TransformationDemo_test")

    //2.創建SparkContext,該物件是提交Spark App的入口
    val sc: SparkContext = new SparkContext(conf)

    //3具體業務邏輯
    mapPartitionsWithIndexTest(sc)
    //4.關閉連接
    sc.stop()
  }

  /**
   * 創建一個RDD,使每個元素跟所在磁區號形成一個元組,組成一個新的RDD
   * */
  def mapPartitionsWithIndexTest(sc: SparkContext): Unit ={
    // 3.1 創建第一個RDD
    val rdd: RDD[Int] = sc.makeRDD(1 to 4,2)

    // 3.2 需求實作
    val value: RDD[(Int, Int)] = rdd.mapPartitionsWithIndex((index: Int, item: Iterator[Int]) => {
      item.map((f: Int) => (index,f))
    })
    // 3.3 列印
    value.foreach(println)
  }
}

mapPartitionsWithIndex()操作如圖所示:

mapPartitionsWithIndex()函式結構:

def mapPartitionsWithIndex[U: ClassTag](
f: (Int, Iterator[T]) => Iterator[U],
preservesPartitioning: Boolean = false): RDD[U] = withScope {
val cleanedF = sc.clean(f)
new MapPartitionsRDD(
this,
(context: TaskContext, index: Int, iter: Iterator[T]) => cleanedF(index, iter),
preservesPartitioning)
}

功能介紹:

  1. f: (Int, Iterator[T]) => Iterator[U]中Int表示磁區編號
  2. 類似于mapPartitions,比mapPartitions多一個整數引數表示磁區號

flatMap()壓平

需求:

創建一個集合,集合里面存盤的還是子集合,把所有子集合中資料取出放入到一個大的集合中,

代碼實作:

object TransformationDemo {
  def main(args: Array[String]): Unit = {
    //1.創建SparkConf并設定App名稱
    val conf: SparkConf = new SparkConf().setMaster("local[*]")
      .setAppName("TransformationDemo_test")

    //2.創建SparkContext,該物件是提交Spark App的入口
    val sc: SparkContext = new SparkContext(conf)

    //3具體業務邏輯
    flatMapTest(sc)

    //4.關閉連接
    sc.stop()
  }
  /**
   * 創建一個集合,集合里面存盤的還是子集合,把所有子集合中資料取出放入到一個大的集合中
   * */
  def flatMapTest(sc: SparkContext): Unit ={
    // 3.1 創建第一個RDD
    val rdd: RDD[List[Int]] = sc.makeRDD(Array(List(1,2),List(3,4),List(5,6),List(7)),2)

    // 3.2 需求實作
    val value: RDD[Int] = rdd.flatMap((list: List[Int]) => list)

    // 3.3 列印
    value.foreach(println)
  }
}

flatMap操作如圖所示:

擴展: 可以通過任務背景關系獲取磁區號,

rdd.foreach((f: List[Int]) => {
// 通過任務背景關系獲取partitionID
println(
TaskContext.getPartitionId() + "---"+ f.mkString(","))

})

flatMap()函式結構:

def flatMap[U: ClassTag](f: T => TraversableOnce[U]): RDD[U] = withScope {
val cleanF = sc.clean(f)
new MapPartitionsRDD[U, T](this, (context, pid, iter) => iter.flatMap(cleanF))
}

功能介紹:

  1. 與map操作類似,將RDD中的每一個元素通過應用f函式依次轉換為新的元素,并封裝到RDD中,
  2. 區別:在flatMap操作中,f函式的回傳值是一個集合,并且會將每一個該集合中的元素拆分出來放到新的RDD中,并且新的RDD中繼承了原RDD中的磁區數

glom()磁區轉換陣列

需求:

創建一個2個磁區的RDD,并將每個磁區的資料放到一個陣列,求出每個磁區的最大值,

代碼實作:

object TransformationDemo {
  def main(args: Array[String]): Unit = {
    //1.創建SparkConf并設定App名稱
    val conf: SparkConf = new SparkConf().setMaster("local[*]")
      .setAppName("TransformationDemo_test")

    //2.創建SparkContext,該物件是提交Spark App的入口
    val sc: SparkContext = new SparkContext(conf)

    //3具體業務邏輯
    glomTest(sc)

    //4.關閉連接
    sc.stop()
  }
  /**
   * 創建一個2個磁區的RDD,并將每個磁區的資料放到一個陣列,求出每個磁區的最大值
   * */
  def glomTest(sc: SparkContext): Unit ={
    // 3.1 創建第一個RDD
    val rdd: RDD[Int] = sc.makeRDD(1 to 4,2)
    // 3.2 需求實作
    val value: RDD[Int] = rdd.glom().mapPartitions((x: Iterator[Array[Int]]) => x.map((f: Array[Int]) => f.max))

    // 3.3 列印
    value.foreach((f: Int) => {
      println(TaskContext.getPartitionId() + ":" + f)
    })
  }
}

glom()操作如圖所示:

glom()函式結構:

def glom(): RDD[Array[T]] = withScope {
new MapPartitionsRDD[Array[T], T](this, (context, pid, iter) => Iterator(iter.toArray))
}

功能介紹:

該操作將RDD中每一個磁區變成一個陣列,并放置在新的RDD中,陣列中元素的型別與原磁區中元素型別一致,

groupBy()分組

需求:

創建一個RDD,按照元素模以2的值進行分組,

代碼實作:

object TransformationDemo {
  def main(args: Array[String]): Unit = {
    //1.創建SparkConf并設定App名稱
    val conf: SparkConf = new SparkConf().setMaster("local[*]")
      .setAppName("TransformationDemo_test")

    //2.創建SparkContext,該物件是提交Spark App的入口
    val sc: SparkContext = new SparkContext(conf)

    //3具體業務邏輯
    groupByTest(sc)
    //4.關閉連接
    sc.stop()
  }

  /**
   * 創建一個RDD,按照元素模以2的值進行分組
   * */
  def groupByTest(sc: SparkContext): Unit ={
    // 3.1 創建第一個RDD
    val rdd: RDD[Int] = sc.makeRDD(1 to 10,3)
    // 3.2 需求實作
    val value: RDD[(Int, Iterable[Int])] = rdd.groupBy(_ % 2)
    // 3.3 列印
        value.foreach(println)

  }
}

groupBy()操作如圖所示:

groupBy()函式結構:

def groupBy[K](f: T => K)(implicit kt: ClassTag[K]): RDD[(K, Iterable[T])] = withScope {
groupBy[K](f, defaultPartitioner(this))
}

功能介紹:

分組,按照傳入函式的回傳值進行分組,將相同的key對應的值放入一個迭代器,

備注:

1、groupBy會存在shuffle程序
2、shuffle:將不同的磁區資料進行打亂重組的程序
3、shuffle一定會落盤,

擴展:復雜版的wordcount

需求:

有如下的資料,("Hello Scala", 2), ("Hello Spark", 3), ("Hello World", 2),("I Love You",5),("I Miss You",2),("Best wish",9),其中數字代表出現的個數,求每個單詞出現的次數,

代碼實作:

def worldCountTest(sc: SparkContext): Unit ={
    val rdd: RDD[(String, Int)] = sc.makeRDD(Array(("Hello Scala", 2), ("Hello Spark", 3), ("Hello World", 2), ("I Love You", 5), ("I Miss You", 2), ("Best wish", 9)))

    // 方式一:適合scala
    /*val value: String = rdd.map {
          // 模式匹配
      case (str, count) => {
        // scala對字串操作,("Hello Scala" + " ") * 2 = Hello Scala Hello Scala
        (str + " ") * count
      }
    }
      .flatMap(_.split(" "))
      .map((_, 1))
      .reduceByKey(_+_)
      .collect()
      .mkString(",")
*/

    // 方式二:比較通用
    val value: String = rdd.flatMap {
      case (str, i) => {
        str.split(" ").map((word: String) => (word, i))
      }
    }.reduceByKey(_ + _)
      .collect()
      .mkString(",")

    println(value)

  }

filter()過濾

需求:

創建一個1到10的RDD,過濾出偶數,

代碼實作:

object TransformationDemo {
  def main(args: Array[String]): Unit = {
    //1.創建SparkConf并設定App名稱
    val conf: SparkConf = new SparkConf().setMaster("local[*]")
      .setAppName("TransformationDemo_test")

    //2.創建SparkContext,該物件是提交Spark App的入口
    val sc: SparkContext = new SparkContext(conf)

    //3具體業務邏輯
    filterTest(sc)

    //4.關閉連接
    sc.stop()
  }
  /**
   * 過濾偶數
   * */
  def filterTest(sc: SparkContext): Unit = {
    // 3.1 創建第一個RDD
    val rdd: RDD[Int] = sc.makeRDD(1 to 10, 3)
    // 3.2 需求實作
    val value: RDD[Int] = rdd.filter(_ % 2 == 0)
    // 3.3 列印
    value.foreach(println)
  }
}

filter()操作程序如下:

filter()函式結構:

def filter(f: T => Boolean): RDD[T] = withScope {
val cleanF = sc.clean(f)
new MapPartitionsRDD[T, T](
this,
(context, pid, iter) => iter.filter(cleanF),
preservesPartitioning = true)
}

功能介紹:

  • 接收一個回傳值為布爾型別的函式作為引數,
  • 當某個RDD呼叫filter方法時,會對該RDD中每一個元素應用f函式,如果回傳值型別為true,則該元素會被添加到新的RDD中,

sample()采樣

需求:

創建一個RDD(1-10),從中選擇放回和不放回抽樣,

代碼實作:

object TransformationDemo {
  def main(args: Array[String]): Unit = {
    //1.創建SparkConf并設定App名稱
    val conf: SparkConf = new SparkConf().setMaster("local[*]")
      .setAppName("TransformationDemo_test")

    //2.創建SparkContext,該物件是提交Spark App的入口
    val sc: SparkContext = new SparkContext(conf)

    //3具體業務邏輯
    sampleTest(sc)

    //4.關閉連接
    sc.stop()
  }
  /**
   * 隨機采樣
   * */
  def sampleTest(sc: SparkContext): Unit ={
    // 3.1 創建第一個RDD
    val rdd: RDD[Int] = sc.makeRDD(1 to 20, 3)
    // 3.2 需求實作
    val value: String = rdd.sample(true,0.3,3).collect().mkString(",")
    val value2: String = rdd.sample(false,0.3,3).collect().mkString(",")
    // 3.3 列印
    
    println("不放回采樣結果:" + value2)
    println("放回采樣結果:" + value)
  }
}

sample()操作如圖所示:

sample()函式結構:

def sample(
withReplacement: Boolean,
fraction: Double,
seed: Long = Utils.random.nextLong
): RDD[T] = {
......
}

功能介紹:

  • 從大量的資料中采樣
  • withReplacement: Boolean: 抽出的資料是否放回
  • fraction: Double:
    • 當withReplacement=false時:選擇每個元素的概率;取值一定是[0,1] ;底層使用泊松分布,
    • 當withReplacement=true時:選擇每個元素的期望次數; 取值必須大于等于0,底層使用伯努利采樣,
  • seed: Long: 指定亂數生成器種子

備注:

1、 該函式隨機采樣是偽隨機,因為傳入的隨機種子是一樣的,計算結果當然一樣,

distinct()去重

需求:

對如下的資料去重:3,2,9,1,2,1,5,2,9,6,1

代碼實作:

object TransformationDemo {
  def main(args: Array[String]): Unit = {
    //1.創建SparkConf并設定App名稱
    val conf: SparkConf = new SparkConf().setMaster("local[*]")
      .setAppName("TransformationDemo_test")

    //2.創建SparkContext,該物件是提交Spark App的入口
    val sc: SparkContext = new SparkContext(conf)

    //3具體業務邏輯
    distinctTest(sc)

    //4.關閉連接
    sc.stop()
  }
  /**
   * 資料去重
   * */
  def distinctTest(sc: SparkContext): Unit = {
    // 3.1 創建第一個RDD
    val rdd: RDD[Int] = sc.makeRDD(List(3,2,9,1,2,1,5,2,9,6,1))
    // 3.2 需求實作
    val value: RDD[Int] = rdd.distinct()
    // 3.3 列印
    value.foreach(println)
  }
}

distinct()操作如圖所示:

distinct()函式結構:

def distinct(): RDD[T] = withScope {
distinct(partitions.length)
}

def distinct(numPartitions: Int)(implicit ord: Ordering[T] = null): RDD[T] = withScope {
map(x => (x, null)).reduceByKey((x, y) => x, numPartitions).map(_._1)
}

功能介紹:

對內部的元素去重,并將去重后的元素放到新的RDD中,

備注:

1、distinct()用分布式的方式去重比HashSet集合方式不容易OOM.

2、默認情況下,distinct會生成與原RDD磁區個數一致的磁區數,當然也可以指定磁區數,

coalesce()重新磁區

需求:

1、將4個磁區的RDD合并成兩個磁區的RDD

2、將三個磁區的RDD合并成兩個磁區的RDD

代碼實作:

object TransformationDemo {
  def main(args: Array[String]): Unit = {
    //1.創建SparkConf并設定App名稱
    val conf: SparkConf = new SparkConf().setMaster("local[*]")
      .setAppName("TransformationDemo_test")

    //2.創建SparkContext,該物件是提交Spark App的入口
    val sc: SparkContext = new SparkContext(conf)

    //3具體業務邏輯
    coalesceTest(sc)

    //4.關閉連接
    sc.stop()
  }
  /**
   * coalesce重磁區
   * */
  def coalesceTest(sc: SparkContext): Unit = {
    // 3.1 創建第一個RDD
    val rdd: RDD[Int] = sc.makeRDD(1 to 4,4)
    val rdd2: RDD[Int] = sc.makeRDD(1 to 4,3)
    // 3.2 需求實作
    val value1: Array[(Int, Int)] = rdd.coalesce(2).map((TaskContext.getPartitionId(),_)).collect()
    val value2: Array[(Int, Int)] = rdd2.coalesce(2).map((TaskContext.getPartitionId(),_)).collect()
    // 3.3 列印
    println("value1:" + value1.mkString(","))
    println("value2:" + value2.mkString(","))
  }
}

coalesce()操作如圖所示:

coalesce()函式結構:

def coalesce(numPartitions: Int, shuffle: Boolean = false,
partitionCoalescer: Option[PartitionCoalescer] = Option.empty)
(implicit ord: Ordering[T] = null)

: RDD[T] = withScope {

......

}

功能介紹:

  • 縮減磁區數,用于大資料集過濾后,提高小資料集的執行效率,
  • shuffle默認為false,則該操作會將磁區數較多的原始RDD向磁區數比較少的目標RDD,進行轉換,
  • shuffle:
    • true: 進行shuffle,此時目標磁區數可以大于可以小于原磁區數,即:可以縮小和擴大原磁區,
    • false:不進行shuffle,此時目標磁區數只能小于原磁區數;當大于時,磁區數不生效,即:只能縮小或等于原磁區,

備注:

1、shuffle原理: 將資料打散,然后重新組合,

2、具體shuffle程序,讀者可以參考“徹底搞懂spark的shuffle程序”,

reparation()重新磁區(執行Shuffle)

需求:

創建一個4個磁區的RDD,對其重新磁區

代碼實作:

object TransformationDemo {
  def main(args: Array[String]): Unit = {
    //1.創建SparkConf并設定App名稱
    val conf: SparkConf = new SparkConf().setMaster("local[*]")
      .setAppName("TransformationDemo_test")

    //2.創建SparkContext,該物件是提交Spark App的入口
    val sc: SparkContext = new SparkContext(conf)

    //3具體業務邏輯
    repartitionTest(sc)

    //4.關閉連接
    sc.stop()
  }
  /**
   * repartition重磁區
   * */
  def repartitionTest(sc: SparkContext): Unit = {
    // 3.1 創建第一個RDD
    val rdd: RDD[Int] = sc.makeRDD(1 to 10,4)

    // 3.2 需求實作
    val value1: Array[(Int, Int)] = rdd.repartition(2).map((TaskContext.getPartitionId(),_)).collect()
    val value2: Array[(Int, Int)] = rdd.repartition(5).map((TaskContext.getPartitionId(),_)).collect()

    // 3.3 列印
    println("value1:" + value1.mkString(","))
    println("value2:" + value2.mkString(","))
  }
}

reparation()操作如圖所示:

reparation()函式結構:

def repartition(numPartitions: Int)(implicit ord: Ordering[T] = null): RDD[T] = withScope {
coalesce(numPartitions, shuffle = true)
}

功能介紹:

  • 該操作內部其實執行的是coalesce操作,引數shuffle的默認值為true,
  • 無論是將磁區數多的RDD轉換為磁區數少的RDD,還是將磁區數少的RDD轉換為磁區數多的RDD,repartition操作都可以完成,因為無論如何都會經shuffle程序,

備注: coalesce與reparation的區別

  • coalesce重新磁區,可以選擇是否進行shuffle程序,由引數shuffle: Boolean = false/true決定,
  • repartition實際上是呼叫的coalesce,進行shuffle,
  • 如果是減少磁區, 盡量避免 shuffle,使用coalesce完成,
  • 絕大多數情況下,減少磁區使用coalesce,增加磁區使用reparation,

sortBy()排序

需求:

創建一個RDD,按照不同的規則進行排序,

  1. 按照數字大小升序排序
  2. 按照數字大小降序排序
  3. 按照模以5的余數降序排序
  4. 二元組,則先按照第一個元素升序,如果第一個元素相同,在按照第二個元素降序

代碼實作:

object TransformationDemo {
  def main(args: Array[String]): Unit = {
    //1.創建SparkConf并設定App名稱
    val conf: SparkConf = new SparkConf().setMaster("local[*]")
      .setAppName("TransformationDemo_test")

    //2.創建SparkContext,該物件是提交Spark App的入口
    val sc: SparkContext = new SparkContext(conf)

    //3具體業務邏輯
    sortByTest(sc)

    //4.關閉連接
    sc.stop()
  }
  /**
   * sortBy排序
   * */
  def sortByTest(sc: SparkContext): Unit = {
    // 3.1 創建第一個RDD
    val rdd: RDD[Int] = sc.makeRDD(List(3,16,5,8,2,10,9,1,7,6))

    // 3.2 需求實作
    // 升序
    val value1: String = rdd.sortBy((num: Int) => num).collect().mkString(",")
    // 降序
    val value2: String = rdd.sortBy((num: Int) => num,false).collect().mkString(",")
    // 按照%5排序
    val value3: String = rdd.sortBy((num: Int) => num % 5,false).collect().mkString(",")

    // 先按照第一個元素升序,如果第一個元素相同,在按照第二個元素降序
    val value4: String = rdd2.sortBy((x: (Int, Int)) => (x._1,-x._2)).collect().mkString(",")

    // 3.3 列印
    println("value1:" + value1)
    println("value2:" + value2)
    println("value3:" + value3)
    println("value4:" + value4)
  }
}

sortBy()操作如圖所示:

sortBy()函式結構:

def sortBy[K](
f: (T) => K,
ascending: Boolean = true,
numPartitions: Int = this.partitions.length
)
(implicit ord: Ordering[K], ctag: ClassTag[K]): RDD[T] = withScope {
this.keyBy[K](f)
.sortByKey(ascending, numPartitions)
.values
}

功能介紹:

  • 該操作用于排序資料,
  • 在排序之前,可以將資料通過f函式進行處理,之后按照f函式處理的結果進行排序,默認為正序排列,
  • 默認情況下,排序后新產生的RDD的磁區數與原RDD的磁區數一致,
  • 可以通過numPartitions引數調整目標磁區數,

pipe()呼叫腳本

需求:

撰寫一個腳本,使用管道將腳本作用于RDD上,

代碼實作:

# 撰寫一個腳本,并增加執行權限
[root@node001 spark]$ vim pipe.sh
#!/bin/sh
echo "Start"
while read LINE; do
   echo ">>>"${LINE}
done

[root@node001 spark]$ chmod 777 pipe.sh

# 創建一個只有一個磁區的RDD
scala> val rdd = sc.makeRDD (List("hi","Hello","how","are","you"),1)

# 將腳本作用該RDD并列印
scala> rdd.pipe("/opt/module/spark/pipe.sh").collect()
res18: Array[String] = Array(Start, >>>hi, >>>Hello, >>>how, >>>are, >>>you)

#創建一個有兩個磁區的RDD
scala> val rdd = sc.makeRDD(List("hi","Hello","how","are","you"),2)

# 將腳本作用該RDD并列印
scala> rdd.pipe("/opt/module/spark/pipe.sh").collect()
res19: Array[String] = Array(Start, >>>hi, >>>Hello, Start, >>>how, >>>are, >>>you)

pipe()操作如圖所示:

pipe()函式結構:

def pipe(command: String): RDD[String] = withScope {
// Similar to Runtime.exec(), if we are given a single string, split it into words
// using a standard StringTokenizer (i.e. by spaces)
pipe(PipedRDD.tokenize(command))
}

功能介紹:

  • 管道,針對每個磁區,都呼叫一次shell腳本,回傳輸出的RDD
  • 腳本要放在 worker 節點可以訪問到的位置

  • 每個磁區執行一次腳本, 但是每個元素算是標準輸入中的一行

雙Value型別互動

雙Value型別的互動常用的算子是數學中的并集、交集合差集,

union()并集、subtract()差集、intersection()交集

需求:

創建兩個RDD,求并集、交集、差集

代碼實作:

object TransformationDemo {
  def main(args: Array[String]): Unit = {
    //1.創建SparkConf并設定App名稱
    val conf: SparkConf = new SparkConf().setMaster("local[*]")
      .setAppName("TransformationDemo_test")

    //2.創建SparkContext,該物件是提交Spark App的入口
    val sc: SparkContext = new SparkContext(conf)

    //3具體業務邏輯
    aggTest(sc)

    //4.關閉連接
    sc.stop()
  }
  /**
   * 求并集、交集、差集
   * */
  def aggTest(sc: SparkContext): Unit ={
    // 3.1 創建第一個RDD
    val rdd1: RDD[Int] = sc.makeRDD (1 to 6)
    val rdd2: RDD[Int] = sc.makeRDD (4 to 10)

    // 3.2 需求實作
    // 并集
    val value1: Array[Int] = rdd1.union(rdd2).collect()
    // 差集
    val value2: Array[Int] = rdd1.subtract(rdd2).collect()
    // 交集
    val value3: Array[Int] = rdd1.intersection(rdd2).collect()

    // 3.3 列印
    println("并集:" + value1.mkString(","))
    println("差集:" + value2.mkString(","))
    println("交集:" + value3.mkString(","))
  }
}

union()并集操作如圖所示:

subtract()差集操作如圖所示:

intersection()交集操作如圖所示:

union()函式結構:

def union(other: RDD[T]): RDD[T] = withScope {
sc.union(this, other)
}

subtract()函式結構:

def subtract(other: RDD[T]): RDD[T] = withScope {
subtract(other, partitioner.getOrElse(new HashPartitioner(partitions.length)))
}

intersection()函式結構:

def intersection(other: RDD[T]): RDD[T] = withScope {
this.map(v => (v, null)).cogroup(other.map(v => (v, null)))
.filter { case (_, (leftGroup, rightGroup)) => leftGroup.nonEmpty && rightGroup.nonEmpty }
.keys
}

功能介紹:

  • 并集:對源RDD和引數RDD求并集后回傳一個新的RDD
  • 差集:計算差的一種函式,去除兩個RDD中相同元素,不同的RDD將保留下來
  • 交集:對源RDD和引數RDD求交集后回傳一個新的RDD

zip()拉鏈

需求:

創建兩個RDD,并將兩個RDD組合到一起形成一個(k,v)RDD

代碼實作:

object TransformationDemo {
  def main(args: Array[String]): Unit = {
    //1.創建SparkConf并設定App名稱
    val conf: SparkConf = new SparkConf().setMaster("local[*]")
      .setAppName("TransformationDemo_test")

    //2.創建SparkContext,該物件是提交Spark App的入口
    val sc: SparkContext = new SparkContext(conf)

    //3具體業務邏輯
    zipTest(sc)
    
    //4.關閉連接
    sc.stop()
  }
  /**
   * zip拉鏈
   * */
  def zipTest(sc: SparkContext): Unit ={
    // 3.1 創建第一個RDD
    val rdd1: RDD[Int] = sc.makeRDD (1 to 3,2)
    val rdd2: RDD[String] = sc.makeRDD (List("a","b","c"),2)

    // 3.2 需求實作
    val value: Array[(String, Int)] = rdd2.zip(rdd1).collect()

    // 3.3 列印
    println(value.mkString(","))
  }
}

zip()操作如圖所示:

zip()函式結構:

def zip[U: ClassTag](other: RDD[U]): RDD[(T, U)] = withScope {

......

}

功能介紹:

  • 該操作可以將兩個RDD中的元素,以鍵值對的形式進行合并,其中,鍵值對中的Key為第1個RDD中的元素,Value為第2個RDD中的元素,
  • 將兩個RDD組合成Key/Value形式的RDD,這里默認兩個RDD的partition數量以及元素數量都相同,否則會拋出例外,

Key-Value型別

partitionBy()按照k重新磁區

需求:

創建一個5個磁區的RDD,對其重新磁區,

代碼實作:

object TransformationDemo {
  def main(args: Array[String]): Unit = {
    //1.創建SparkConf并設定App名稱
    val conf: SparkConf = new SparkConf().setMaster("local[*]")
      .setAppName("TransformationDemo_test")

    //2.創建SparkContext,該物件是提交Spark App的入口
    val sc: SparkContext = new SparkContext(conf)

    //3具體業務邏輯
    partitionByKeyTest(sc)

    //4.關閉連接
    sc.stop()
  }
  /**
   * partitionByKey:根據key重新磁區
   * */
  def partitionByKeyTest(sc: SparkContext): Unit ={
    val tuples: List[(Int, String)] = List((1, "a"), (2, "b"), (3, "c"), (4, "d"), (1, "aa"), (1, "bb"), (3, "cc"), (4, "dd")
      ,(1,"aaa"),(2,"bbb"),(3,"ccc"),(4,"ddd"))
    // 3.1 創建第一個RDD
    val rdd: RDD[(Int, String)] = sc.makeRDD (tuples,5)
    rdd.cache()

    println("-------磁區前-------")
    val value1: String = rdd.mapPartitionsWithIndex((index: Int, data: Iterator[(Int, String)]) => {
      data.map((index,_))
    }).collect().mkString("\t")

    println(value1)
    // 3.2 需求實作
    val value: RDD[(Int, String)] = rdd.partitionBy(new HashPartitioner(3))

    // 3.3 列印
    println("-------磁區后-------")
    val value2: String = value.mapPartitionsWithIndex((index: Int, data: Iterator[(Int, String)]) => {
      data.map((index,_))
    }).collect().mkString("\t")
    println(value2)
  }
}

partitionBy()操作如圖所示:

partitionBy()函式結構:

def partitionBy(partitioner: Partitioner): RDD[(K, V)] = self.withScope {
if (keyClass.isArray && partitioner.isInstanceOf[HashPartitioner]) {
throw new SparkException("HashPartitioner cannot partition array keys.")
}
if (self.partitioner == Some(partitioner)) {
self
} else {
new ShuffledRDD[K, V, V](self, partitioner)
}
}

功能介紹:

  • 將RDD[K,V]中的K按照指定Partitioner重新進行磁區;會產生Shuffle程序,
  • 如果原有的partionRDD和現有的partionRDD是一致的話就不進行磁區,否則進行磁區,
  • 默認的磁區器是HashPartitioner,

備注:

1、HashPartitioner的原理是:

  • 依據RDD中key值的hashCode的值將資料取模后得到該key值對應的下一個RDD的磁區id值,
  • 支持key值為null的情況,當key為null的時候,回傳0;
  • 該磁區器基本上適合所有RDD資料型別的資料進行磁區操作,

2、詳細的磁區器原理,請參考:[Spark] - HashPartitioner & RangePartitioner 區別

注意:

1、由于JAVA中陣列的hashCode是基于陣列物件本身的,不是基于陣列內容的,所以如果RDD的key是陣列型別,那么可能導致資料內容一致的資料key沒法分配到同一個RDD磁區中,(未測)

2、在scala中,如果RDD的key是Array陣列型別,編譯不通過,Exception in thread "main" org.apache.spark.SparkException: HashPartitioner cannot partition array keys.

這個時候最好自定義資料磁區器,采用陣列內容進行磁區或者將陣列的內容轉換為集合,

自定義磁區

class MyPartitioner(num: Int) extends Partitioner {
// 設定的磁區數
override def numPartitions: Int = num

// 具體磁區邏輯
override def getPartition(key: Any): Int = {

if (key.isInstanceOf[Int]) {

val keyInt: Int = key.asInstanceOf[Int]
if (keyInt % 2 == 0)
0
else
1
}else{
0
}
}
}

reduceByKey()按照k進行聚合v

需求:

統計單詞出現次數(wordCount),

代碼實作:

為了增加難度,本次實驗給字串加了一些中英文的字符,

object TransformationDemo {
  def main(args: Array[String]): Unit = {
    //1.創建SparkConf并設定App名稱
    val conf: SparkConf = new SparkConf().setMaster("local[*]")
      .setAppName("TransformationDemo_test")

    //2.創建SparkContext,該物件是提交Spark App的入口
    val sc: SparkContext = new SparkContext(conf)

    //3具體業務邏輯
    reduceByKeyTest(sc)

    //4.關閉連接
    sc.stop()
  }


  def reduceByKeyTest(sc: SparkContext): Unit = {
    val tuple1: String = "Apache Spark? is a: multi-language engine for executing★ data engineering, "
    val tuple2: String = "$Apache Spark? is a multi-language ※engine for executing data ¥engineering, "
    val tuple3: String = "Apache Spark? is (a multi-language) engine for executing data engineering, "

    val tuples: String = tuple1 + tuple2 + tuple3
    // 忽略標點字符
    val str: String = tuples.replaceAll("\\pP|\\pS", "")
    // 3.1 創建第一個RDD
    val rdd: RDD[String] = sc.parallelize(Array(str))
    // 3.2 需求實作
    val value2: String = rdd.flatMap(f => f.split(" "))
      .map((_, 1))
      .reduceByKey(_ + _)
      .collect()
      .mkString("\n")

    // 3.3 列印

    println(value2)
  }
}

運行結果:

備注: 正則運算式過濾字串的方法

/pP :其中的小寫 p 是 property 的意思,表示 Unicode 屬性,用于 Unicode 正運算式的前綴,大寫 P 表示 Unicode 字符集七個字符屬性之一:標點字符,

其他六個是:

L:字母;
M:標記符號(一般不會單獨出現);
Z:分隔符(比如空格、換行等);
S:符號(比如數學符號、貨幣符號等);
N:數字(比如阿拉伯數字、羅馬數字等);
C:其他字符

reduceByKey()操作如圖:

reduceByKey()函式結構:

def reduceByKey(partitioner: Partitioner, func: (V, V) => V): RDD[(K, V)] = self.withScope {
combineByKeyWithClassTag[V]((v: V) => v, func, func, partitioner)
}

def reduceByKey(func: (V, V) => V, numPartitions: Int): RDD[(K, V)] = self.withScope {
reduceByKey(new HashPartitioner(numPartitions), func)
}

def reduceByKey(func: (V, V) => V): RDD[(K, V)] = self.withScope {
reduceByKey(defaultPartitioner(self), func)
}

功能介紹:

  • 該操作可以將RDD[K,V]中的元素按照相同的K對V進行聚合,
  • 其存在多種多載形式,還可以設定新RDD的磁區數,
  • 默認的磁區器為HashPartitioner

groupByKey()按照k重新分組

需求:

求List(("a",1),("b",5),("a",5),("b",2),("a",1),("c",5),("b",5),("b",2))中abc平均出現的次數,

函式實作:

object TransformationDemo {
  def main(args: Array[String]): Unit = {
    //1.創建SparkConf并設定App名稱
    val conf: SparkConf = new SparkConf().setMaster("local[*]")
      .setAppName("TransformationDemo_test")

    //2.創建SparkContext,該物件是提交Spark App的入口
    val sc: SparkContext = new SparkContext(conf)

    //3具體業務邏輯
    groupByKeyTest(sc)

    //4.關閉連接
    sc.stop()
  }

  /**
   * groupByKey():按照key分組
   * */
  def groupByKeyTest(sc: SparkContext): Unit = {
    // 3.1 創建第一個RDD

    val rdd: RDD[(String, Int)] = sc.makeRDD(List(("a",1),("c",15),("a",5),("b",2),("a",1),("c",9),("b",5),("b",2)), 2)

    // 3.2 需求實作
    val value: Array[(String, String)] = rdd.groupByKey().map(f => {
      val format: DecimalFormat = new DecimalFormat("#0.00")
      val sum: Int = f._2.sum
      val count: Int = f._2.size
      val nums: Double = sum.toDouble / count
      (f._1,format.format(nums))
    }).collect()

    // 3.3 列印
    println(value.mkString("\n"))
  }
}

備注: DecimalFormat 類,

該類主要靠 # 和 0 兩種占位符號來指定數字長度,0 表示如果位數不足則以 0 填充,# 表示只要有可能就把數字拉上這個位置,

如:#.00表示小數點后保留兩位,小數點前不確定,有幾位都行,

00.00表示整數部分兩位,不足用0補足;小數部分2位,不足用0補足,

groupByKey()操作如圖:

groupByKey()函式結構:

def groupByKey(): RDD[(K, Iterable[V])] = self.withScope {
groupByKey(defaultPartitioner(self))
}

def groupByKey(partitioner: Partitioner): RDD[(K, Iterable[V])] = self.withScope {

}

def groupByKey(numPartitions: Int): RDD[(K, Iterable[V])] = self.withScope {
groupByKey(new HashPartitioner(numPartitions))
}

功能介紹:

  • groupByKey對每個key進行操作,但只生成一個seq,并不進行聚合,
  • 該操作可以指定磁區器或者磁區數(默認使用HashPartitioner)

備注: reduceByKey和groupByKey區別

  1. reduceByKey:按照key進行聚合,在shuffle之前有combine(預聚合)操作,回傳結果是RDD[k,v],
  2. groupByKey:按照key進行分組,直接進行shuffle,
  3. 開發指導:
    1. 在不影響業務邏輯的前提下,優先選用reduceByKey,
    2. 求和操作不影響業務邏輯,求平均值影響業務邏輯,

aggregateByKey()按照k進行磁區內和磁區間邏輯

需求

創建一個RDD,取出RDD中每個磁區相同key對應值的最大值,然后相加,

代碼實作:

object TransformationDemo {
  def main(args: Array[String]): Unit = {
    //1.創建SparkConf并設定App名稱
    val conf: SparkConf = new SparkConf().setMaster("local[*]")
      .setAppName("TransformationDemo_test")

    //2.創建SparkContext,該物件是提交Spark App的入口
    val sc: SparkContext = new SparkContext(conf)

    //3具體業務邏輯
    aggregateByKeyTest(sc)

    //4.關閉連接
    sc.stop()
  }

  /**
   * 取出每個磁區相同key對應值的最大值,然后相加
   * */
  def aggregateByKeyTest(sc: SparkContext): Unit = {
    // 3.1 創建第一個RDD
    val array: Array[(String, Int)] = Array(("a", 3), ("a", 2), ("c", 4), ("b", 3), ("c", 6), ("c", 8))
    val rdd: RDD[(String, Int)] = sc.makeRDD(array, 2)

    // 3.2 取出每個磁區相同key對應值的最大值,然后相加
    //    val value: RDD[(String, Int)] = rdd.aggregateByKey(0)(math.max, _ + _)
    // 不使用math函式
    val value: RDD[(String, Int)] = rdd.aggregateByKey(0)((a: Int, b: Int) => {
      if (a > b) a else b
    }, _ + _)

    // 3.3 列印結果
    value.foreach(println)
  }

aggregateByKey()操作如圖所示:

函式結構 :

def aggregateByKey[U: ClassTag](zeroValue: U, partitioner: Partitioner)(

seqOp: (U, V) => U,
combOp: (U, U) => U): RDD[(K, U)] = self.withScope {
*********
}

功能介紹:

(1)zeroValue(初始值):給每一個磁區中的每一種key一個初始值;
(2)seqOp(磁區內):函式用于在每一個磁區中用初始值逐步迭代value;
(3)combOp(磁區間):函式用于合并每個磁區中的結果,

foldByKey()磁區內核磁區間具有相同邏輯的aggregateByKey()

需求:

將上例中的資料求一個worldcount,

代碼實作:

object TransformationDemo {
  def main(args: Array[String]): Unit = {
    //1.創建SparkConf并設定App名稱
    val conf: SparkConf = new SparkConf().setMaster("local[*]")
      .setAppName("TransformationDemo_test")

    //2.創建SparkContext,該物件是提交Spark App的入口
    val sc: SparkContext = new SparkContext(conf)

    //3具體業務邏輯
    foldByKeyTest(sc)

    //4.關閉連接
    sc.stop()
  }

  def foldByKeyTest(sc: SparkContext): Unit = {
    // 3.1 創建第一個RDD
    val array: Array[(String, Int)] = Array(("a", 3), ("a", 2), ("c", 4), ("b", 3), ("c", 6), ("c", 8))
    val rdd: RDD[(String, Int)] = sc.makeRDD(array)

    // 3.2 worldcount
    //    val value: RDD[(String, Int)] = rdd.aggregateByKey(0)(_+_, _ + _)
    val value: RDD[(String, Int)] = rdd.foldByKey(0)(_ + _)


    // 3.3 列印
    value.foreach(println)
  }

foldByKey()操作如圖所示:

foldByKey()函式結構:

def foldByKey(
zeroValue: V,
partitioner: Partitioner
)(func: (V, V) => V): RDD[(K, V)] = self.withScope {

........

}

功能介紹:

  1. aggregateByKey的簡化操作,seqop和combop相同,即,磁區內邏輯和磁區間邏輯相同,
  2. 引數zeroValue:是一個初始化值,它可以是任意型別

  3. 引數func:是一個函式,兩個輸入引數相同

combineByKey()轉換結構后磁區內和磁區間操作

需求:

創建一個pairRDD,根據key計算每種key的均值,

代碼實作:

object TransformationDemo {
  def main(args: Array[String]): Unit = {
    //1.創建SparkConf并設定App名稱
    val conf: SparkConf = new SparkConf().setMaster("local[*]")
      .setAppName("TransformationDemo_test")

    //2.創建SparkContext,該物件是提交Spark App的入口
    val sc: SparkContext = new SparkContext(conf)

    //3具體業務邏輯
    combineKeyTest(sc)

    //4.關閉連接
    sc.stop()
  }


  def combineKeyTest(sc: SparkContext): Unit = {
    // 3.1 創建第一個RDD
    val rdd: RDD[(String, Int)] = sc.makeRDD(List(("a", 88), ("b", 95), ("a", 91), ("b", 93), ("a", 95), ("b", 98)))

    // 3.2 需求實作: 將相同key對應的值相加,同時記錄該key出現的次數,放入一個二元組
    val value: Array[(String, String)] = rdd.combineByKey(
      (_, 1),
      (acc: (Int, Int), other: Int) => (acc._1 + other, acc._2 + 1), // 注意此處的型別必須手動添加,否則無法使用
      (acc1: (Int, Int), acc2: (Int, Int)) => (acc1._1 + acc2._1, acc1._2 + acc2._2)
    ).map(f => {
      val format: DecimalFormat = new DecimalFormat("#0.00")
      (f._1, format.format(f._2._1 / f._2._2.toDouble))
    }).collect()


    // 3.3 列印
    println(value.mkString("\n"))
  }
}

combineByKey()操作如圖所示:

combineByKey()函式功能:

def combineByKey[C](
createCombiner: V => C,
mergeValue: (C, V) => C,
mergeCombiners: (C, C) => C,
partitioner: Partitioner,
mapSideCombine: Boolean = true,
serializer: Serializer = null
): RDD[(K, C)] = self.withScope {
combineByKeyWithClassTag(createCombiner, mergeValue, mergeCombiners,
partitioner, mapSideCombine, serializer)(null)
}

def combineByKey[C](
createCombiner: V => C,
mergeValue: (C, V) => C,
mergeCombiners: (C, C) => C,
numPartitions: Int): RDD[(K, C)]
= self.withScope {
combineByKeyWithClassTag(createCombiner, mergeValue, mergeCombiners, numPartitions)(null)
}

功能介紹:

  • createCombiner:V=>C 分組內的創建組合的函式,通俗點將就是對讀進來的資料進行初始化,其把當前的值作為引數,可以對該值做一些轉換操作,轉換為我們想要的資料格式
  • mergeValue: (C, V) => C該函式主要是磁區內的合并函式,作用在每一個磁區內部,其功能主要是將V合并到之前(createCombiner)的元素C上,注意,這里的C指的是上一函式轉換之后的資料格式,而這里的V指的是原始資料格式(上一函式為轉換之前的)
  • mergeCombiners:(C,C)=>R 該函式主要是進行多磁區合并,此時是將兩個C合并為一個C,例如兩個C:(Int)進行相加之后得到一個R:(Int)

備注: reduceByKey、aggregateByKey、foldByKey、combineByKey 四種對key聚合的區別和聯系

  1. 四種底層都呼叫了combineByKeyWithClassTag
  2. reduceByKey 沒有初始值,磁區內和磁區間邏輯相同
  3. aggregateByKey 第一個初始值和磁區內處理規則一致,磁區內和磁區間邏輯可以不同
  4. foldByKey 有初始值,磁區內和磁區間邏輯一致

  5. combineByKey 初始值可以變化結構,磁區內和磁區間邏輯不同

sortByKey()按照k進行排序

需求:

創建一個pairRDD,按照key的正序和倒序進行排序

代碼實作:

object TransformationDemo {
  def main(args: Array[String]): Unit = {
    //1.創建SparkConf并設定App名稱
    val conf: SparkConf = new SparkConf().setMaster("local[*]")
      .setAppName("TransformationDemo_test")

    //2.創建SparkContext,該物件是提交Spark App的入口
    val sc: SparkContext = new SparkContext(conf)

    //3具體業務邏輯
    sortByKeyTest(sc)

    //4.關閉連接
    sc.stop()
  }
  /**
   * sortByKeyTest():在一個(K,V)的RDD上呼叫,K必須實作Ordered介面,回傳一個按照key進行排序的(K,V)的RDD
   * */
  def sortByKeyTest(sc: SparkContext): Unit = {
    // 3.1 創建第一個RDD
    val rdd: RDD[(String, Int)] = sc.makeRDD(List(("a", 88), ("b", 95), ("a", 91), ("b", 93), ("a", 95), ("b", 98)))

    // 3.2 需求實作
    val value1: Array[(String, Int)] = rdd.sortByKey().collect()
    val value2: Array[(String, Int)] = rdd.sortByKey(false).collect()


    // 3.3 列印
    println(value1.mkString("\n"))
    println("--------------------------")
    println(value2.mkString("\n"))
  }
}

sortByKey()操作如圖所示:

sortByKey()函式結構:

def sortByKey(ascending: Boolean = true, numPartitions: Int = self.partitions.length)
: RDD[(K, V)] = self.withScope
{
val part = new RangePartitioner(numPartitions, self, ascending)
new ShuffledRDD[K, V, V](self, part)
.setKeyOrdering(if (ascending) ordering else ordering.reverse)
}

功能介紹:

  • 在一個(K,V)RDD上呼叫,K必須實作Ordered介面,回傳一個按照key進行排序的(K,V)RDD
  • 默認升序

mapValues()只對v進行操作

需求:

創建一個pairRDD,并將value資料轉換成大寫,

代碼實作:

object TransformationDemo {
  def main(args: Array[String]): Unit = {
    //1.創建SparkConf并設定App名稱
    val conf: SparkConf = new SparkConf().setMaster("local[*]")
      .setAppName("TransformationDemo_test")

    //2.創建SparkContext,該物件是提交Spark App的入口
    val sc: SparkContext = new SparkContext(conf)

    //3具體業務邏輯
    mapValuesTest(sc)

    //4.關閉連接
    sc.stop()
  }
  /**
   * mapValuesTest():只對Values操作
   * */
  def mapValuesTest(sc: SparkContext): Unit = {
    // 3.1 創建第一個RDD
    val rdd: RDD[(Int, String)] = sc.makeRDD(List((1, "a"), (2, "b"), (3, "c"), (4, "d"), (1, "aa"), (1, "bb"), (3, "cc")))

    // 3.2 需求實作
    val value: Array[(Int, String)] = rdd.mapValues(_.toUpperCase).collect()

    // 3.3 列印
    println(value.mkString("\n"))
  }
}

mapValues()操作如圖所示:

mapValues()函式結構:

def mapValues[U](f: V => U): RDD[(K, U)] = self.withScope {
val cleanF = self.context.clean(f)
new MapPartitionsRDD[(K, U), (K, V)](self,
(context, pid, iter) => iter.map { case (k, v) => (k, cleanF(v)) },
preservesPartitioning = true)
}

功能介紹:

針對于(K,V)形式的型別只對V進行操作

join()將相同k對應的多個v關聯在一起

需求:

創建兩個pairRDD,并將key相同的資料聚合到一個元組,

代碼實作:

object TransformationDemo {
  def main(args: Array[String]): Unit = {
    //1.創建SparkConf并設定App名稱
    val conf: SparkConf = new SparkConf().setMaster("local[*]")
      .setAppName("TransformationDemo_test")

    //2.創建SparkContext,該物件是提交Spark App的入口
    val sc: SparkContext = new SparkContext(conf)

    //3具體業務邏輯
    joinTest(sc)

    //4.關閉連接
    sc.stop()
  }

  /**
   * joinTest():在型別為(K,V)和(K,W)的RDD上呼叫,回傳一個相同key對應的所有元素對在一起的(K,(V,W))的RDD
   * */
  def joinTest(sc: SparkContext): Unit = {
    //3.1 創建第兩個RDD
    val rdd: RDD[(Int, String)] = sc.makeRDD(Array((1, "a"), (2, "b"), (3, "c"),(0,"零")))
    val rdd1: RDD[(Int, Int)] = sc.makeRDD(Array((1, 4), (2, 5), (4, 6)))

    // 3.2 需求實作
    val value: Array[(Int, (String, Int))] = rdd.join(rdd1).collect()

    // 3.3 列印
    println(value.mkString("\n"))
  }
}

join()操作如圖所示:

join()函式結構:

def join[W](other: RDD[(K, W)], partitioner: Partitioner): RDD[(K, (V, W))] = self.withScope {
this.cogroup(other, partitioner).flatMapValues( pair =>
for (v <- pair._1.iterator; w <- pair._2.iterator) yield (v, w)
)
}

功能介紹:

在型別為(K,V)和(K,W)的RDD上呼叫,回傳一個相同key對應的所有元素對在一起的(K,(V,W))的RDD

擴展:左外連接、右外連接和全外連接

回傳型別為:(Int, (String, Option[Int])),

Option[A] (sealed trait) 有兩個取值:
1. Some[A] 有型別A的值
2. None 沒有值

一般通過f._2._2.getOrElse("默認值")獲取值,

左外連接(左連接):左邊都有,右邊關聯不上,用None表示,

def leftOuterJoin[W](other: RDD[(K, W)], partitioner: Partitioner)

右外連接(右連接):右邊都有,左邊關聯不上,用None表示

def rightOuterJoin[W](other: RDD[(K, W)], partitioner: Partitioner)

全外連接(全連接):左右都有,關聯不上,用None表示,

def fullOuterJoin[W](other: RDD[(K, W)], partitioner: Partitioner)

cogroup()類似全連接,但是在同一個RDD中對k聚合

需求:

創建兩個pairRDD,并將key相同的資料聚合到一個迭代器,

代碼實作:

object TransformationDemo {
  def main(args: Array[String]): Unit = {
    //1.創建SparkConf并設定App名稱
    val conf: SparkConf = new SparkConf().setMaster("local[*]")
      .setAppName("TransformationDemo_test")

    //2.創建SparkContext,該物件是提交Spark App的入口
    val sc: SparkContext = new SparkContext(conf)

    //3具體業務邏輯
    cogroupTest(sc)

    //4.關閉連接
    sc.stop()
  }
  /**
   * cogroupTest():在型別為(K,V)和(K,W)的RDD上呼叫,回傳一個(K,(Iterable<V>,Iterable<W>))型別的RDD
   * */
  def cogroupTest(sc: SparkContext): Unit = {
    //3.1 創建第兩個RDD
    val rdd: RDD[(Int, String)] = sc.makeRDD(Array((1, "a"), (3, "cC"), (2, "b"), (3, "c"),(0,"零"),(1, "aA")))
    val rdd1: RDD[(Int, Int)] = sc.makeRDD(Array((1, 4), (2, 5), (2, 55),(4, 6),(1, 44)))

    // 3.2 需求實作
    val value: Array[(Int, (Iterable[String], Iterable[Int]))] = rdd.cogroup(rdd1).collect()
    
    // 3.3 列印
    println(value.mkString("\n"))
  }
}

cogroup()操作如圖所示:

cogroup()函式結構:

def cogroup[W](other: RDD[(K, W)]): RDD[(K, (Iterable[V], Iterable[W]))] = self.withScope {
cogroup(other, defaultPartitioner(self, other))
}

功能介紹:

在型別為(K,V)和(K,W)的RDD上呼叫,回傳一個(K,(Iterable<V>,Iterable<W>))型別的RDD

第二部分:Action行動算子

行動算子是觸發了整個作業的執行,因為轉換算子都是懶加載,并不會立即執行,常用的行動算子有如下幾個,

說明:為了方便,行動算子的測驗也將在object TransformationDemo包下實作,不再新建一個包,

reduce()聚合

1)函式簽名:def reduce(f: (T, T) => T): T

2)功能說明:f函式聚集RDD中的所有元素,先聚合磁區內資料,再聚合磁區間資料

3)需求說明:創建一個RDD,將所有元素聚合得到結果

4)代碼如下:

object TransformationDemo {
  def main(args: Array[String]): Unit = {
    //1.創建SparkConf并設定App名稱
    val conf: SparkConf = new SparkConf().setMaster("local[*]")
      .setAppName("TransformationDemo_test")

    //2.創建SparkContext,該物件是提交Spark App的入口
    val sc: SparkContext = new SparkContext(conf)

    //3具體業務邏輯
    reduceTest(sc)

    //4.關閉連接
    sc.stop()
  }
  /**
   * 行動算子:
   * reduce:聚集RDD內所有元素
   * */
  def reduceTest(sc: SparkContext): Unit = {
    //3.1 創建第兩個RDD
    val rdd: RDD[(Int, String)] = sc.makeRDD(Array((1, "a"), (3, "cC"), (2, "b"), (3, "c"),(0,"零"),(1, "aA")))
    val rdd1: RDD[Int] = sc.makeRDD(1 to 10)

    // 3.2 需求實作
    val value: (Int, String) = rdd.reduce((k1: (Int, String), k2: (Int, String)) => (k1._1 + k2._1,k1._2 + k2._2))
    val value1: Int = rdd1.reduce(_+_)

    // 3.3 列印
    println(value)
    println(value1)
  }
}

collect()以陣列的形式回傳資料集

1)函式簽名:def collect(): Array[T]

2)功能說明:在驅動程式中,以陣列Array的形式回傳資料集的所有元素,

注意:所有的資料都會被拉取到Driver端,當資料量大時,會發生OOM,慎用,

3)需求說明:創建一個RDD,并將RDD內容收集到Driver端列印

4)代碼實作:

object TransformationDemo {
  def main(args: Array[String]): Unit = {
    //1.創建SparkConf并設定App名稱
    val conf: SparkConf = new SparkConf().setMaster("local[*]")
      .setAppName("TransformationDemo_test")

    //2.創建SparkContext,該物件是提交Spark App的入口
    val sc: SparkContext = new SparkContext(conf)

    //3具體業務邏輯
    collectTest(sc)

    //4.關閉連接
    sc.stop()
  }
  /**
   * 行動算子:
   * collect:收集資料到Driver端,以陣列的形式回傳,
   * */
  def collectTest(sc: SparkContext): Unit = {
    //3.1 創建第兩個RDD
    val rdd: RDD[(Int, String)] = sc.makeRDD(Array((1, "a"), (3, "cC"), (2, "b"), (3, "c"),(0,"零"),(1, "aA")))

    // 3.2 需求實作
    val value: Array[(Int, String)] = rdd.collect()

    // 3.3 列印
    value.foreach(println)

  }
}

count()回傳RDD中元素個數

1)函式簽名:def count(): Long

2)功能說明:回傳RDD中元素的個數,

3)需求說明:創建一個RDD,統計該RDD的條數,

4)代碼實作:

object TransformationDemo {
  def main(args: Array[String]): Unit = {
    //1.創建SparkConf并設定App名稱
    val conf: SparkConf = new SparkConf().setMaster("local[*]")
      .setAppName("TransformationDemo_test")

    //2.創建SparkContext,該物件是提交Spark App的入口
    val sc: SparkContext = new SparkContext(conf)

    //3具體業務邏輯
    countTest(sc)

    //4.關閉連接
    sc.stop()
  }
  /**
   * 行動算子:
   * count:回傳RDD的條數
   * */
  def countTest(sc: SparkContext): Unit = {
    //3.1 創建第兩個RDD
    val rdd: RDD[(Int, String)] = sc.makeRDD(Array((1, "a"), (3, "cC"), (2, "b"), (3, "c"),(0,"零"),(1, "aA")))

    // 3.2 需求實作
    val value: Long = rdd.count()

    // 3.3 列印
    println(value)
  }
}

first()回傳RDD中的第一個元素

1)函式簽名: def first(): T

2)功能說明:回傳RDD中的第一個元素,

3)需求說明:創建一個RDD,回傳該RDD中的第一個元素,

4)代碼實作:

object TransformationDemo {
  def main(args: Array[String]): Unit = {
    //1.創建SparkConf并設定App名稱
    val conf: SparkConf = new SparkConf().setMaster("local[*]")
      .setAppName("TransformationDemo_test")

    //2.創建SparkContext,該物件是提交Spark App的入口
    val sc: SparkContext = new SparkContext(conf)

    //3具體業務邏輯
    firstTest(sc)

    //4.關閉連接
    sc.stop()
  }
  /**
   * 行動算子:
   * first:回傳RDD中第一條資料
   * */
  def firstTest(sc: SparkContext): Unit = {
    //3.1 創建第兩個RDD
    val rdd: RDD[(Int, String)] = sc.makeRDD(Array((1, "a"), (3, "cC"), (2, "b"), (3, "c"),(0,"零"),(1, "aA")))

    // 3.2 需求實作
    val value: (Int, String) = rdd.first()

    // 3.3 列印
    println(value)
  }
}

take(n)回傳由RDD前n個元素組成的陣列

1)函式簽名: def take(num: Int): Array[T]

2)功能說明:回傳一個由RDD的前n個元素組成的陣列,

3)需求說明:回傳RDD中key值最大的前3個元素,

4)代碼實作:

object TransformationDemo {
  def main(args: Array[String]): Unit = {
    //1.創建SparkConf并設定App名稱
    val conf: SparkConf = new SparkConf().setMaster("local[*]")
      .setAppName("TransformationDemo_test")

    //2.創建SparkContext,該物件是提交Spark App的入口
    val sc: SparkContext = new SparkContext(conf)

    //3具體業務邏輯
    takeTest(sc)

    //4.關閉連接
    sc.stop()
  }
  /**
   * 行動算子:
   * take(n):回傳由RDD組成的前n個元素的陣列,
   * */
  def takeTest(sc: SparkContext): Unit = {
    //3.1 創建第兩個RDD
    val rdd: RDD[(Int, String)] = sc.makeRDD(Array((1, "a"),(4, "4c"), (5, "5c"), (3, "cC"), (2, "b"), (6, "c"),(0,"零"),(7, "aA")))

    // 3.2 需求實作
    val value: Array[(Int, String)] = rdd.sortByKey(false).take(3)

    // 3.3 列印
    println(value.mkString("\n"))
  }
}

takeOrdered(n)回傳RDD排序后前n個元素組成的陣列

1)函式簽名: def takeOrdered(num: Int)(implicit ord: Ordering[T]): Array[T]

2)功能說明:回傳該RDD排序后的前n個元素組成的陣列,(默認升序)

3)需求說明:創建一個RDD,獲取該RDD排序后的前2個元素,

4)代碼實作:

object TransformationDemo {
  def main(args: Array[String]): Unit = {
    //1.創建SparkConf并設定App名稱
    val conf: SparkConf = new SparkConf().setMaster("local[*]")
      .setAppName("TransformationDemo_test")

    //2.創建SparkContext,該物件是提交Spark App的入口
    val sc: SparkContext = new SparkContext(conf)

    //3具體業務邏輯
    takeTest(sc)

    //4.關閉連接
    sc.stop()
  }
  /**
   * 行動算子:
   * takeOrdered(n):回傳該RDD排序后的前n個元素組成的陣列.
   * */
  def takeOrderedTest(sc: SparkContext): Unit = {
    //3.1 創建第兩個RDD
    val rdd: RDD[(Int, String)] = sc.makeRDD(Array((1, "a"),(4, "4c"), (5, "5c"), (3, "cC"), (2, "b"), (6, "c"),(0,"零"),(7, "aA")))

     // 3.2 需求實作
    val value: Array[(Int, String)] = rdd.takeOrdered(5)  // 升序

    // 自定義排序(重寫compare方法):降序
    val value1: Array[(Int, String)] = rdd.takeOrdered(5)(new Ordering[(Int,String)](){
      override def compare(x: (Int, String), y: (Int, String)): Int = y._1-x._1
    })

    // 3.3 列印
    println(value.mkString("\n"))
    println(value1.mkString("\n"))
  }
}

aggregate()案例

1)函式簽名: def aggregate[U: ClassTag](zeroValue: U)(seqOp: (U, T) => U, combOp: (U, U) => U): U

2)功能說明:aggregate函式將每個磁區里面的元素通過磁區內邏輯和初始值進行聚合然后用磁區間邏輯和初始值(zeroValue)進行操作

注意:1、磁區間邏輯再次使用初始值和aggregateByKey是有區別的,

2、zeroValue 磁區內聚合和磁區間聚合的時候各會使用一次,

操作流圖:

3)需求說明:創建一個RDD,將所有元素相加得到結果,

4)代碼實作:

object TransformationDemo {
  def main(args: Array[String]): Unit = {
    //1.創建SparkConf并設定App名稱
    val conf: SparkConf = new SparkConf().setMaster("local[1]")
      .setAppName("TransformationDemo_test")

    //2.創建SparkContext,該物件是提交Spark App的入口
    val sc: SparkContext = new SparkContext(conf)

    //3具體業務邏輯
    aggregateTest(sc) 

    //4.關閉連接
    sc.stop()
  }

  /**
   * 行動算子:
   * aggregateTest(n):aggregate函式將每個磁區里面的元素通過磁區內邏輯和初始值進行聚合,然后用磁區間邏輯和初始值(zeroValue)進行操作
   * */
  def aggregateTest(sc: SparkContext): Unit = {
    //3.1 創建第兩個RDD
    val rdd: RDD[(Int, String)] = sc.makeRDD(Array((1, "a"),(2, "4c"), (3, "5c"), (4, "cC")),2)
    // 3.2 需求實作
    val value: Int = rdd.aggregate(0)(
      (a: Int, b: (Int, String)) => a + b._1,
      (x: Int, y: Int) => x + y
    )

    val value3: Int = rdd.aggregate(10)(
      (a: Int, b: (Int, String)) => a + b._1,
      (x: Int, y: Int) => x + y
    )

    val value2: String = rdd.aggregate("x")(
      (a: String, b: (Int, String)) => a + b._1,
      (x: String, y: String) => x + y
    )

    // 3.3 列印
    println(value)
    println(value2)
    println(value3)
  }

fold()案例

1)函式簽名: def fold(zeroValue: T)(op: (T, T) => T): T

2)功能說明:折疊操作,aggregate的簡化操作,即,磁區內邏輯和磁區間邏輯相同,

3)需求說明:創建一個RDD,將所有元素相加得到結果,

4)代碼實作:

object TransformationDemo {
  def main(args: Array[String]): Unit = {
    //1.創建SparkConf并設定App名稱
    val conf: SparkConf = new SparkConf().setMaster("local[1]")
      .setAppName("TransformationDemo_test")

    //2.創建SparkContext,該物件是提交Spark App的入口
    val sc: SparkContext = new SparkContext(conf)

    //3具體業務邏輯
    foldTest(sc)

    //4.關閉連接
    sc.stop()
  }
  /**
   * 行動算子:
   * fold:折疊操作,aggregate的簡化操作,即,磁區內邏輯和磁區間邏輯相同,
   * */
  def foldTest(sc: SparkContext): Unit = {
    //3.1 創建第兩個RDD
    val rdd: RDD[(Int, String)] = sc.makeRDD(Array((1, "a"),(2, "4c"), (3, "5c"), (4, "cC")),2)
    // 3.2 需求實作
    val value: (Int, String) = rdd.fold((0," "))((z: (Int, String), x: (Int, String)) => (z._1 + x._1,x._2 + z._2))
    
    // 3.3 列印
    println(value)
  }
}

countByKey()統計每種key的個數

1)函式簽名:def countByKey(): Map[K, Long]

2)功能說明:統計每種key的個數,

注:可以用來查看資料是否傾斜,

3)需求說明:創建一個PairRDD,統計每種key的個數,

4)代碼實作:

object TransformationDemo {
  def main(args: Array[String]): Unit = {
    //1.創建SparkConf并設定App名稱
    val conf: SparkConf = new SparkConf().setMaster("local[1]")
      .setAppName("TransformationDemo_test")

    //2.創建SparkContext,該物件是提交Spark App的入口
    val sc: SparkContext = new SparkContext(conf)

    //3具體業務邏輯
    countByKeyTest(sc)

    //4.關閉連接
    sc.stop()
  }

  /**
   * 行動算子:
   * countByKey:統計每種key的個數
   * */
  def countByKeyTest(sc: SparkContext): Unit = {
    //3.1 創建第兩個RDD
    val rdd: RDD[(Int, String)] = sc.makeRDD(List((1, "a"), (2, "b"), (3, "c"), (4, "d"), (1, "aa"), (1, "bb"), (3, "cc")))

    // 3.2 需求實作
    val value: collection.Map[Int, Long] = rdd.countByKey()
    // 3.3 列印
    println(value)
  }
}

save相關算子

1saveAsTextFile(path)保存成Text檔案

(1)函式簽名:def saveAsTextFile(path: String)

(2)功能說明:將資料集的元素以textfile的形式保存到HDFS檔案系統或者其他支持的檔案系統,對于每個元素,Spark將會呼叫toString方法,將它裝換為檔案中的文本,

2saveAsSequenceFile(path) 保存成Sequencefile檔案

(1)函式簽名:def saveAsSequenceFile(path: String)

(2)功能說明:將資料集中的元素以Hadoop Sequencefile的格式保存到指定的目錄下,可以使HDFS或者其他Hadoop支持的檔案系統,

注意:只有kv型別RDD有該操作,單值的沒有

3saveAsObjectFile(path) 序列化成物件保存到檔案

(1)函式簽名:def saveAsObjectFile(path: String)

(2)功能說明:用于將RDD中的元素序列化成物件,存盤到檔案中,

代碼演示:

object TransformationDemo {
  def main(args: Array[String]): Unit = {
    //1.創建SparkConf并設定App名稱
    val conf: SparkConf = new SparkConf().setMaster("local[1]")
      .setAppName("TransformationDemo_test")

    //2.創建SparkContext,該物件是提交Spark App的入口
    val sc: SparkContext = new SparkContext(conf)

    //3具體業務邏輯
    saveTest(sc)

    //4.關閉連接
    sc.stop()
  }
  /**
   * 行動算子:
   * savexxx:將資料持久化到本地(或HDFS)
   * */
  def saveTest(sc: SparkContext): Unit = {
    //3.1 創建RDD
    val rdd: RDD[(Int, String)] = sc.makeRDD(List((1, "a"), (2, "b"), (3, "c"), (4, "d"), (1, "aa"), (1, "bb"), (3, "cc")))

    // 3.2 需求實作
    rdd.repartition(1).saveAsTextFile("file:///C:/tmp/output/txt/")
    rdd.repartition(1).saveAsObjectFile("file:///C:/tmp/output/obj/")
    rdd.repartition(1).saveAsSequenceFile("file:///C:/tmp/output/seq/")

  }
}

foreach()&foreachPartition遍歷RDD中每一個元素

1)函式簽名:def foreach(f: T => Unit): Unit

2)功能說明:遍歷RDD中的每一個元素,并依次應用f函式,

兩種形式的foreach算子操作如圖所示:

備注:

1、呼叫collect算子時,資料在Drive端列印;未呼叫collect算子時,資料不會回傳Drive端,而是直接在Executor端列印,

2、foreach和foreachPartition的區別:

foreach 是一條一條的拿資料進行處理;
foreachPartition是一個磁區一個磁區的拿資料,一個磁區中有很多資料的資訊,

所以,在使用中,當我們要把處理結果保存到資料庫中的時候,我們要使用foreachPartition這個方法,這樣效率會更高,

3)需求說明:創建一個RDD,對每個元素進行列印,

4)代碼實作:

object TransformationDemo {
  def main(args: Array[String]): Unit = {
    //1.創建SparkConf并設定App名稱
    val conf: SparkConf = new SparkConf().setMaster("local[1]")
      .setAppName("TransformationDemo_test")

    //2.創建SparkContext,該物件是提交Spark App的入口
    val sc: SparkContext = new SparkContext(conf)

    //3具體業務邏輯
    foreachTest(sc)

    //4.關閉連接
    sc.stop()
  }
  /**
   * 行動算子:
   * foreach和foreachPartition:遍歷RDD中的元素
   * */
  def foreachTest(sc: SparkContext): Unit = {
    //3.1 創建RDD
    val rdd: RDD[Int] = sc.makeRDD(1 to 100,5)

    // 3.2 需求實作
    rdd.foreach((f: Int) => println(TaskContext.getPartitionId() + "--" + f))
    rdd.foreachPartition((f: Iterator[Int]) => println(TaskContext.getPartitionId() + "--" + f.mkString(",")))

  }
}

總結

至此,常用的RDD算子已全部匯總完畢,當然,還有許多遺漏的算子和方法的使用未涉及到,感興趣的讀者可以自行研究學習,

下面為讀者匯總關于RDD算子中面試常見問題:

  • map與mapPartitions的區別

map():每次處理一條資料,

mapPartitions():每次處理一個磁區的資料,這個磁區的資料處理完后,原 RDD 中該磁區的資料才能釋放,可能導致 OOM,

開發指導:當記憶體空間較大的時候建議使用mapPartitions(),以提高處理效率,

  • coalesce與repartition兩個算子的作用以及區別與聯系

1)關系:
兩者都是用來改變RDD的partition數量的,repartition底層呼叫的就是coalesce方法:coalesce(numPartitions, shuffle = true)

2)區別:
repartition一定會發生shuffle,coalesce根據傳入的引數來判斷是否發生shuffle,
一般情況下增大rdd的partition數量使用repartition,減少partition數量時使用coalesce,

注:

1、coalesce填寫的引數小于等于RDD的partition數量,大于不起作用
2、coalesce中的引數過小可能會導致OOM,這時,又想保證小檔案的數量少,可以使用reparation,

  • 使用zip算子時需要注意的是什么(即哪些情況不能使用)

在 Spark 中, 兩個 RDD 的元素的數量和磁區數都必須相同, 否則會拋出例外,

其實本質就是要求的每個磁區的元素的數量相同.

  • aggregateByKey與aggregate之間的區別與聯系,

aggregate和aggregateByKey的引數是一樣的,作用也一樣,只不過aggregateByKey多了key而已,

如果將RDD分別有設定成2個磁區和4個磁區,則

  • 不影響aggregateByKey()的輸出結果,因為其是按照K處理磁區內和磁區間邏輯,
  • 會影響aggregate的輸出結果,2個磁區中,初始值共使用了3次;4個磁區中,初始值共使用了5次,
  • reduceByKey跟groupByKey之間的區別,

reduceByKey:按照key進行聚合,在shuffle之前有combine(預聚合)操作,回傳結果是RDD[k,v],

groupByKey:按照key進行分組,直接進行shuffle,回傳結果是RDD[(K, Iterable[V])],

開發指導:reduceByKey比groupByKey性能更好,并且groupByKey會造成OOM,建議使用,但是需要注意是否會影響業務邏輯,

  • reduceByKey跟aggregateByKey之間的區別與聯系,

reduceByKey可以認為是aggregateByKey的簡化版,
aggregateByKey,分為三個引數(zeroValue、seqOp、combOp),可以自定義初始值、磁區內、磁區間的操作,

  • combineByKey的引數作用,說明其引數呼叫時機,

def combineByKey[C](
createCombiner: V => C,
mergeValue: (C, V) => C,
mergeCombiners: (C, C) => C): RDD[(K, C)]

(1)createCombiner:V=>C 分組內的創建組合的函式,會遍歷磁區中的每個key-value對. 如果第一次碰到這個key, 則呼叫createCombiner函式,傳入value, 得到一個C型別的值,(如果不是第一次碰到這個 key, 則不會呼叫這個方法)

(2)mergeValue:(C,V)=>C 該函式主要是磁區內的合并函式,作用在每一個磁區內部,如果不是第一個遇到這個key,則呼叫這個函式進行合并操作,

(3)mergeCombiners:(C,C)=>R 該函式主要是進行多磁區合并,此時是將兩個C合并為一個C,跨磁區合并相同的key的值,

路漫漫其修遠兮,吾將上下而求索, ---屈原

轉載請註明出處,本文鏈接:https://www.uj5u.com/qita/339076.html

標籤:其他

上一篇:MapReduce學習筆記,理解學習Hadoop的MapReduce計算系統

下一篇:Spring與Kafka整合

標籤雲
其他(157675) Python(38076) JavaScript(25376) Java(17977) C(15215) 區塊鏈(8255) C#(7972) AI(7469) 爪哇(7425) MySQL(7132) html(6777) 基礎類(6313) sql(6102) 熊猫(6058) PHP(5869) 数组(5741) R(5409) Linux(5327) 反应(5209) 腳本語言(PerlPython)(5129) 非技術區(4971) Android(4554) 数据框(4311) css(4259) 节点.js(4032) C語言(3288) json(3245) 列表(3129) 扑(3119) C++語言(3117) 安卓(2998) 打字稿(2995) VBA(2789) Java相關(2746) 疑難問題(2699) 细绳(2522) 單片機工控(2479) iOS(2429) ASP.NET(2402) MongoDB(2323) 麻木的(2285) 正则表达式(2254) 字典(2211) 循环(2198) 迅速(2185) 擅长(2169) 镖(2155) 功能(1967) .NET技术(1958) Web開發(1951) python-3.x(1918) HtmlCss(1915) 弹簧靴(1913) C++(1909) xml(1889) PostgreSQL(1872) .NETCore(1853) 谷歌表格(1846) Unity3D(1843) for循环(1842)

熱門瀏覽
  • 網閘典型架構簡述

    網閘架構一般分為兩種:三主機的三系統架構網閘和雙主機的2+1架構網閘。 三主機架構分別為內端機、外端機和仲裁機。三機無論從軟體和硬體上均各自獨立。首先從硬體上來看,三機都用各自獨立的主板、記憶體及存盤設備。從軟體上來看,三機有各自獨立的作業系統。這樣能達到完全的三機獨立。對于“2+1”系統,“2”分為 ......

    uj5u.com 2020-09-10 02:00:44 more
  • 如何從xshell上傳檔案到centos linux虛擬機里

    如何從xshell上傳檔案到centos linux虛擬機里及:虛擬機CentOs下執行 yum -y install lrzsz命令,出現錯誤:鏡像無法找到軟體包 前言 一、安裝lrzsz步驟 二、上傳檔案 三、遇到的問題及解決方案 總結 前言 提示:其實很簡單,往虛擬機上安裝一個上傳檔案的工具 ......

    uj5u.com 2020-09-10 02:00:47 more
  • 一、SQLMAP入門

    一、SQLMAP入門 1、判斷是否存在注入 sqlmap.py -u 網址/id=1 id=1不可缺少。當注入點后面的引數大于兩個時。需要加雙引號, sqlmap.py -u "網址/id=1&uid=1" 2、判斷文本中的請求是否存在注入 從文本中加載http請求,SQLMAP可以從一個文本檔案中 ......

    uj5u.com 2020-09-10 02:00:50 more
  • Metasploit 簡單使用教程

    metasploit 簡單使用教程 浩先生, 2020-08-28 16:18:25 分類專欄: kail 網路安全 linux 文章標簽: linux資訊安全 編輯 著作權 metasploit 使用教程 前言 一、Metasploit是什么? 二、準備作業 三、具體步驟 前言 Msfconsole ......

    uj5u.com 2020-09-10 02:00:53 more
  • 游戲逆向之驅動層與用戶層通訊

    驅動層代碼: #pragma once #include <ntifs.h> #define add_code CTL_CODE(FILE_DEVICE_UNKNOWN,0x800,METHOD_BUFFERED,FILE_ANY_ACCESS) /* 更多游戲逆向視頻www.yxfzedu.com ......

    uj5u.com 2020-09-10 02:00:56 more
  • 北斗電力時鐘(北斗授時服務器)讓網路資料更精準

    北斗電力時鐘(北斗授時服務器)讓網路資料更精準 北斗電力時鐘(北斗授時服務器)讓網路資料更精準 京準電子科技官微——ahjzsz 近幾年,資訊技術的得了快速發展,互聯網在逐漸普及,其在人們生活和生產中都得到了廣泛應用,并且取得了不錯的應用效果。計算機網路資訊在電力系統中的應用,一方面使電力系統的運行 ......

    uj5u.com 2020-09-10 02:01:03 more
  • 【CTF】CTFHub 技能樹 彩蛋 writeup

    ?碎碎念 CTFHub:https://www.ctfhub.com/ 筆者入門CTF時時剛開始刷的是bugku的舊平臺,后來才有了CTFHub。 感覺不論是網頁UI設計,還是題目質量,賽事跟蹤,工具軟體都做得很不錯。 而且因為獨到的金幣制度的確讓人有一種想去刷題賺金幣的感覺。 個人還是非常喜歡這個 ......

    uj5u.com 2020-09-10 02:04:05 more
  • 02windows基礎操作

    我學到了一下幾點 Windows系統目錄結構與滲透的作用 常見Windows的服務詳解 Windows埠詳解 常用的Windows注冊表詳解 hacker DOS命令詳解(net user / type /md /rd/ dir /cd /net use copy、批處理 等) 利用dos命令制作 ......

    uj5u.com 2020-09-10 02:04:18 more
  • 03.Linux基礎操作

    我學到了以下幾點 01Linux系統介紹02系統安裝,密碼啊破解03Linux常用命令04LAMP 01LINUX windows: win03 8 12 16 19 配置不繁瑣 Linux:redhat,centos(紅帽社區版),Ubuntu server,suse unix:金融機構,證券,銀 ......

    uj5u.com 2020-09-10 02:04:30 more
  • 05HTML

    01HTML介紹 02頭部標簽講解03基礎標簽講解04表單標簽講解 HTML前段語言 js1.了解代碼2.根據代碼 懂得挖掘漏洞 (POST注入/XSS漏洞上傳)3.黑帽seo 白帽seo 客戶網站被黑帽植入劫持代碼如何處理4.熟悉html表單 <html><head><title>TDK標題,描述 ......

    uj5u.com 2020-09-10 02:04:36 more
最新发布
  • 2023年最新微信小程式抓包教程

    01 開門見山 隔一個月發一篇文章,不過分。 首先回顧一下《微信系結手機號資料庫被脫庫事件》,我也是第一時間得知了這個訊息,然后跟蹤了整件事情的經過。下面是這起事件的相關截圖以及近日流出的一萬條資料樣本: 個人認為這件事也沒什么,還不如關注一下之前45億快遞資料查詢渠道疑似在近日復活的訊息。 訊息是 ......

    uj5u.com 2023-04-20 08:48:24 more
  • web3 產品介紹:metamask 錢包 使用最多的瀏覽器插件錢包

    Metamask錢包是一種基于區塊鏈技術的數字貨幣錢包,它允許用戶在安全、便捷的環境下管理自己的加密資產。Metamask錢包是以太坊生態系統中最流行的錢包之一,它具有易于使用、安全性高和功能強大等優點。 本文將詳細介紹Metamask錢包的功能和使用方法。 一、 Metamask錢包的功能 數字資 ......

    uj5u.com 2023-04-20 08:47:46 more
  • vulnhub_Earth

    前言 靶機地址->>>vulnhub_Earth 攻擊機ip:192.168.20.121 靶機ip:192.168.20.122 參考文章 https://www.cnblogs.com/Jing-X/archive/2022/04/03/16097695.html https://www.cnb ......

    uj5u.com 2023-04-20 07:46:20 more
  • 從4k到42k,軟體測驗工程師的漲薪史,給我看哭了

    清明節一過,盲猜大家已經無心上班,在數著日子準備過五一,但一想到銀行卡里的余額……瞬間心情就不美麗了。最近,2023年高校畢業生就業調查顯示,本科畢業月平均起薪為5825元。調查一出,便有很多同學表示自己又被平均了。看著這一資料,不免讓人想到前不久中國青年報的一項調查:近六成大學生認為畢業10年內會 ......

    uj5u.com 2023-04-20 07:44:00 more
  • 最新版本 Stable Diffusion 開源 AI 繪畫工具之中文自動提詞篇

    🎈 標簽生成器 由于輸入正向提示詞 prompt 和反向提示詞 negative prompt 都是使用英文,所以對學習母語的我們非常不友好 使用網址:https://tinygeeker.github.io/p/ai-prompt-generator 這個網址是為了讓大家在使用 AI 繪畫的時候 ......

    uj5u.com 2023-04-20 07:43:36 more
  • 漫談前端自動化測驗演進之路及測驗工具分析

    隨著前端技術的不斷發展和應用程式的日益復雜,前端自動化測驗也在不斷演進。隨著 Web 應用程式變得越來越復雜,自動化測驗的需求也越來越高。如今,自動化測驗已經成為 Web 應用程式開發程序中不可或缺的一部分,它們可以幫助開發人員更快地發現和修復錯誤,提高應用程式的性能和可靠性。 ......

    uj5u.com 2023-04-20 07:43:16 more
  • CANN開發實踐:4個DVPP記憶體問題的典型案例解讀

    摘要:由于DVPP媒體資料處理功能對存放輸入、輸出資料的記憶體有更高的要求(例如,記憶體首地址128位元組對齊),因此需呼叫專用的記憶體申請介面,那么本期就分享幾個關于DVPP記憶體問題的典型案例,并給出原因分析及解決方法。 本文分享自華為云社區《FAQ_DVPP記憶體問題案例》,作者:昇騰CANN。 DVPP ......

    uj5u.com 2023-04-20 07:43:03 more
  • msf學習

    msf學習 以kali自帶的msf為例 一、msf核心模塊與功能 msf模塊都放在/usr/share/metasploit-framework/modules目錄下 1、auxiliary 輔助模塊,輔助滲透(埠掃描、登錄密碼爆破、漏洞驗證等) 2、encoders 編碼器模塊,主要包含各種編碼 ......

    uj5u.com 2023-04-20 07:42:59 more
  • Halcon軟體安裝與界面簡介

    1. 下載Halcon17版本到到本地 2. 雙擊安裝包后 3. 步驟如下 1.2 Halcon軟體安裝 界面分為四大塊 1. Halcon的五個助手 1) 影像采集助手:與相機連接,設定相機引數,采集影像 2) 標定助手:九點標定或是其它的標定,生成標定檔案及內參外參,可以將像素單位轉換為長度單位 ......

    uj5u.com 2023-04-20 07:42:17 more
  • 在MacOS下使用Unity3D開發游戲

    第一次發博客,先發一下我的游戲開發環境吧。 去年2月份買了一臺MacBookPro2021 M1pro(以下簡稱mbp),這一年來一直在用mbp開發游戲。我大致分享一下我的開發工具以及使用體驗。 1、Unity 官網鏈接: https://unity.cn/releases 我一般使用的Apple ......

    uj5u.com 2023-04-20 07:40:19 more