1)coalesce
def coalesce(numPartitions: Int, shuffle: Boolean = false)(implicit ord: Ordering[T] = null): RDD[T]
該函式用于將RDD進行重磁區,使用HashPartitioner,
第一個引數為重磁區的數目,第二個為是否進行shuffle,默認為false;
作用
縮減磁區數,用于大資料集過濾后,提高小資料集的執行效率,
val ListRDD: RDD[Int] = context.makeRDD(Range(1, 15, 2), 4)
println("縮減磁區前: ")
ListRDD.glom.collect.foreach(data => println(data.mkString(",")))
val coalesceRDD: RDD[Int] = ListRDD.coalesce(3)
println("縮減磁區后: ")
coalesceRDD.glom.collect.foreach(data => println(data.mkString(",")))
輸出:
縮減磁區前:
1
3,5
7,9
11,13
縮減磁區后:
1
3,5
7,9,11,13
將coalesce引數設定為2:
縮減磁區前:
1
3,5
7,9
11,13
縮減磁區后:
1,3,5
7,9,11,13
coalesce進行收縮合并磁區,減少磁區的個數,并沒有shuffle操作,但這塊也有隱患,資料傾斜是一個問題.
- repartition
def repartition(numPartitions: Int)(implicit ord: Ordering[T] = null): RDD[T] = withScope {
coalesce(numPartitions, shuffle = true)
} 不同于coalesce算子的是,repartition算子必須進行shuffle操作
將上述coalesce改成repartition算子
縮減磁區前:
1
3,5
7,9
11,13
縮減磁區后:
5,13
1,7
3,9,11
引數3改成2
縮減磁區前:
1
3,5
7,9
11,13
縮減磁區后:
1,5,7,11
3,9,13
比之coalesce算子,shuffle操作導致效率更低了,但是資料傾斜好點,開發中避免shuffle操作更好,提升效率,資料傾斜有其他的處理方式 - union
1.作用: 對源RDD和引數RDD求并集后回傳一個新的RDD
2.需求: 創建兩個RDD,求并集
val ListRDD: RDD[Int] = context.makeRDD(1 to 10, 4)
val ListRDD2: RDD[Int] = context.makeRDD(5 to 12)
val unionRDD: RDD[Int] = ListRDD.union(ListRDD2)
unionRDD.collect.foreach(data => print(data + " "))
輸出:1 2 3 4 5 6 7 8 9 10 5 6 7
- subtract
1.作用: 計算差的一種函式,去除兩個RDD中相同的元素,不同的RDD將保留下來
2.需求: 創建兩個RDD,求第一個RDD與第二個RDD的差集
將上述函式union改成subtract
輸出:4 8 1 9 2 10 3
5).cartesian
1.作用: 笛卡爾積(盡量避免使用)
2.創建兩個RDD,計算兩個RDD的笛卡爾積
val ListRDD: RDD[Int] = context.makeRDD(1 to 3, 4)
val ListRDD2: RDD[Int] = context.makeRDD(5 to 7)
val cartesianRDD: RDD[(Int, Int)] = ListRDD.cartesian(ListRDD2)
cartesianRDD.collect.foreach(data => print(data + " "))
輸出:(1,5) (1,6) (1,7) (2,5) (2,6) (2,7) (3,5) (3,6) (3,7)
6) zip
1.作用將兩個RDD組合成Key/Value形式的RDD,這里默認兩個RDD的partition數量以及元素數量都相同,否則會拋出例外.
2.需求:創建兩個RDD,并將兩個RDD組合到一起形成一個(k,v)的RDD
val ListRDD: RDD[Int] = context.makeRDD(1 to 3,2)
val ListRDD2: RDD[Int] = context.makeRDD(5 to 7,2)
val cartesianRDD: RDD[(Int, Int)] = ListRDD.zip(ListRDD2)
cartesianRDD.collect.foreach(data => print(data + " "))
7)partitionBy案例
1.作用:對partitionRDD進行磁區操作,如果原有的partitionRDD和現有的partition是一致的話就不進行磁區,否則會生成ShuffleRDD,即會產生shuffle程序.
2.需求:創建一個4個磁區的RDD,對其重新磁區
val ListRDD: RDD[(Int, String)] = context.makeRDD(Array((1, "asp"), (2, "scala"), (3, "spark"), (4, "Python")), 4)
val HashRDD: RDD[(Int, String)] = ListRDD.partitionBy(new org.apache.spark.HashPartitioner(2))
HashRDD.mapPartitionsWithIndex {
case (nums,datas) => datas.map((_," 磁區:"+nums))
}.collect.foreach(println)
輸出:
((2,scala), 磁區:0)
((4,Python), 磁區:0)
((1,asp), 磁區:1)
((3,spark), 磁區:1)
底層實作:
class HashPartitioner(partitions: Int) extends Partitioner {
require(partitions >= 0, s"Number of partitions ($partitions) cannot be negative.")
def numPartitions: Int = partitions
def getPartition(key: Any): Int = key match {
case null => 0
case _ => Utils.nonNegativeMod(key.hashCode, numPartitions)
}
def nonNegativeMod(x: Int, mod: Int): Int = {
val rawMod = x % mod
rawMod + (if (rawMod < 0) mod else 0)
}
如果獲取到的key是null,則被分到0磁區
其他key呼叫nonNegativeMod函式(x:Int,mod:Int) x指的是當前key的hashCode值,用x 取模 磁區數,這個余數就是新的磁區號
利用這一點我們可以自定義磁區器
8)自定義磁區器
val ListRDD: RDD[(String, Any)] = context.makeRDD(Array((“Python”, “人生苦短”), (“Scala”, 3), (“Spark”, “記憶體計算”), (“Hadoop”, “大資料存盤”), ("", “沒有Key”)), 4)
val MypartitionRDD: RDD[(String, Any)] = ListRDD.partitionBy(new Mypartition(3))
MypartitionRDD.mapPartitionsWithIndex {
case (nums, datas) => datas.map((_, " 磁區:" + nums))
}.collect.foreach(println)
}
}
class Mypartition(partition: Int) extends Partitioner {
override def numPartitions: Int = {
partition
}
override def getPartition(key: Any): Int = key match {
case key if key.toString.contains("a") => 0 //key中包含字符a 的分到0磁區
case key if ("").equals(key.toString) => 1 //key中是null 的分到1磁區
case _ => 2 //其余分到2磁區
}
}
輸出:
((Scala,3), 磁區:0)
((Spark,記憶體計算), 磁區:0)
((Hadoop,大資料存盤), 磁區:0)
((,沒有Key), 磁區:1)
((Python,人生苦短), 磁區:2)
常用代碼以省略,
轉載請註明出處,本文鏈接:https://www.uj5u.com/yidong/1437.html
標籤:其他
