目錄
- 基本概念
- 算子介紹
- 1. reduce
- 2. collect
- 3. count
- 4. first
- 5. take
- 6. takeOrdered
- 案例實操1-6
- 7. aggregate
- 8. fold
- 案例實操7-8
- 9. countByKey
- 案例實操
- 10. save相關算子
- 案例實操
- 11. foreach
- 案例實操
基本概念
行動算子主要是將在資料集上運行計算后的數值回傳到驅動程式,從而觸發觸發作業(Job)的執行,其底層代碼呼叫的就是runJob的方法,底層會創建ActiveJob,并提交執行,
算子介紹
1. reduce
函式定義
def reduce(f: (T, T) => T): T
說明
聚集RDD 中的所有元素,先聚合磁區內資料,再聚合磁區間資料,
2. collect
函式定義
def collect(): Array[T]
說明
在驅動程式中,以陣列Array 的形式回傳資料集的所有元素 ,
3. count
函式定義
def count(): Long
說明
回傳RDD 中元素的個數,
4. first
函式定義
def first(): T
說明
回傳RDD 中的第一個元素 ,
5. take
函式定義
def take(num: Int): Array[T]
說明
回傳一個由RDD 的前 n 個元素組成的陣列,
6. takeOrdered
函式定義
def takeOrdered(num: Int)(implicit ord: Ordering[T]): Array[T]
說明
回傳該RDD 排序后的前 n 個元素組成的陣列,
案例實操1-6
package com.atguigu.bigdata.spark.core.rdd.action
import org.apache.spark.{SparkConf, SparkContext}
object Spark02_RDD_Operator_Action {
def main(args: Array[String]): Unit = {
//TODO 準備環境
val sparkConf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")
val sc = new SparkContext(sparkConf)
val rdd = sc.makeRDD(List(1,2,3,4))
//TODO - 行動算子
//reduce
// val i = rdd.reduce(_ + _)
//
// println(i)
//collect:方法會講不同磁區的資料按照磁區順序采集到Driver端記憶體中,形成陣列
// val ints: Array[Int] = rdd.collect()
//
// println(ints.mkString(","))
//統計資料源中的資料的個數
val cnt = rdd.count()
println(cnt)
//first:獲取資料源中第一個資料
val first = rdd.first()
println(first)
//take:獲取N個資料
val ints: Array[Int] = rdd.take(3)
println(ints.mkString(","))
val rdd1 = sc.makeRDD(List(4,2,3,1))
//takeOrdered:排序后,獲取N個資料
val ints1: Array[Int] = rdd1.takeOrdered(3)
println(ints1.mkString(","))
sc.stop()
}
}
7. aggregate
函式定義
def aggregate[U: ClassTag](zeroValue: U)(seqOp: (U, T) => U, combOp: (U, U) => U): U
說明
磁區的資料通過初始值和磁區內的資料進行聚合,然后再和初始值進行磁區間的資料聚合,
8. fold
函式定義
def fold(zeroValue: T)(op: (T, T) => T): T
說明
折疊操作,aggregate 的簡化版操作,
案例實操7-8
package com.atguigu.bigdata.spark.core.rdd.action
import org.apache.spark.{SparkConf, SparkContext}
object Spark03_RDD_Operator_Action {
def main(args: Array[String]): Unit = {
//TODO 準備環境
val sparkConf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")
val sc = new SparkContext(sparkConf)
val rdd = sc.makeRDD(List(1,2,3,4),2)
//TODO - 行動算子
//aggregateByKey:初始值只會參與磁區內的計算
//aggregate:初始值會參與磁區內的計算,并且參與磁區間的計算
// val result: Int = rdd.aggregate(10)(_ + _, _ + _)
val result: Int = rdd.fold(10)(_ + _)
println(result)
sc.stop()
}
}
9. countByKey
函式定義
def countByKey(): Map[K, Long]
說明
統計每種key 的個數,
package com.atguigu.bigdata.spark.core.rdd.action
import org.apache.spark.{SparkConf, SparkContext}
object Spark04_RDD_Operator_Action {
def main(args: Array[String]): Unit = {
//TODO 準備環境
val sparkConf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")
val sc = new SparkContext(sparkConf)
val rdd = sc.makeRDD(List(1,1,3,4),2)
val rdd1 = sc.makeRDD(List(("a",1),("a",2),("a",3)))
//TODO - 行動算子
val intToLong: collection.Map[Int, Long] = rdd.countByValue()
println(intToLong)
val stringToLong: collection.Map[String, Long] = rdd1.countByKey()
println(stringToLong)
sc.stop()
}
}
案例實操
10. save相關算子
函式定義
def saveAsTextFile(path: String): Unit
def saveAsObjectFile(path: String): Unit
def saveAsSequenceFile(
path: String,
codec: Option[Class[_ <: CompressionCodec]] = None): Unit
說明
將資料保存到不同格式的檔案中,
案例實操
package com.atguigu.bigdata.spark.core.rdd.action
import org.apache.spark.{SparkConf, SparkContext}
object Spark05_RDD_Operator_Action {
def main(args: Array[String]): Unit = {
//TODO 準備環境
val sparkConf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")
val sc = new SparkContext(sparkConf)
val rdd = sc.makeRDD(List(("a",1),("a",2),("a",3)))
//TODO - 行動算子
rdd.saveAsTextFile("output")
rdd.saveAsObjectFile("output1")
//saveAsSequenceFile方法要求資料的格式必須為K-V型別
rdd.saveAsSequenceFile("output2")
sc.stop()
}
}
11. foreach
函式定義
def foreach(f: T => Unit): Unit = withScope {
val cleanF = sc.clean(f)
sc.runJob(this, (iter: Iterator[T]) => iter.foreach(cleanF))
}
說明
分布式遍歷RDD 中的每一個元素,呼叫指定函式,
案例實操
package com.atguigu.bigdata.spark.core.rdd.action
import org.apache.spark.{SparkConf, SparkContext}
object Spark06_RDD_Operator_Action {
def main(args: Array[String]): Unit = {
//TODO 準備環境
val sparkConf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")
val sc = new SparkContext(sparkConf)
val rdd = sc.makeRDD(List(1,2,3,4) )
//foreach其實Driver端記憶體集合回圈遍歷的方法
rdd.collect().foreach(println)
println("*************")
//foreach其實是Executor端記憶體資料列印
rdd.foreach(println)
//算子:Operator(算子)
//RDD的方法和Scala集合物件的方法不一樣
//集合物件的方法都是在同一個節點的記憶體中完成的
//RDD的方法可以將計算邏輯發送到Executor端(分布式節點)執行
//為了區分不同的處理效果,所以將RDD的方法稱之為算子
//RDD的方法外部的操作都是在Driver端執行的,而方法內部的邏輯代碼是在EXecutor端執行
sc.stop()
}
}
轉載請註明出處,本文鏈接:https://www.uj5u.com/qita/294658.html
標籤:其他
