前言:
配置JDK1.8
實驗環境IDEA
scala版本為2.11.12
本地Window偽分布運行非集群實驗
創建RDD
從記憶體中創建一個RDD有兩種常用的方法,一種是轉化Seq集合為RDD,另一種是從已有RDD轉化為新的RDD,
SparkContext類中有兩個方法:parallelize和makeRDD,
1.parallelize
parallelize有兩個引數可以輸入
(1)要轉化的集合,必須是Seq集合,
(2)磁區數,一般不設磁區數,則默認為該Application分配到的資源的CPU數,
val rdd1 = sc.parallelize(List(1,2,3,4))
2.makeRDD
makeRDD有兩種實作方法:一種跟parallelize完全一致;另一種接收的引數型別是Seq,生產的RDD中保存的是T的值(Seq[T,Seq[String])),
val seq = Seq((1,Seq(1,2)),(2,Seq(2,3,4))) val rdd =sc.makeRDD(seq) rdd.collect().foreach(println(_))
(1,List(1, 2))
(2,List(2, 3, 4))
從外部存盤創建RDD是指直接讀取一個存放在檔案系統的資料檔案創建RDD,第一種創建RDD的方式常用于測驗,這種方式才是用于實踐操作的常用方法,
(1)從HDFS檔案創建RDD
val test = sc.textFile("/user/root/test.txt")
(2)從Linux本地檔案創建
確實差不多,在路徑前面加上file://表示從本地Linux檔案系統讀取,
1.Map轉換資料
map是一種基礎的RDD轉換操作,用于將RDD中的每一個資料元素通過某種函式進行轉換并回傳新的RDD,
例:
val distData = List(1, 3, 45, 3, 76) val sq_dist = distData.map(x => x * x) print(sq_dist)
List(1, 9, 2025, 9, 5776)
2.SortBy()排序
sortBy()是對標準RDD進行排序的方法,在org.apache.spark.rdd.RDD類中實作:
/**
* Return this RDD sorted by the given key function.
*/
def sortBy[K](
f: (T) => K,
ascending: Boolean = true,
numPartitions: Int = this.partitions.size)
(implicit ord: Ordering[K], ctag: ClassTag[K]): RDD[T] =
this.keyBy[K](f)
.sortByKey(ascending, numPartitions)
.values
- 第一個引數是函式f(x)=>_._._,左邊是要被排序物件中的每一個元素,右邊回傳的值是元素中要進行排序的值,
- 第二個引數是ascending排序順序,決定排序后RDD中的元素是升序還是降序,默認是ture
- 第三個引數是numPartitions,該引數決定排序后的RDD磁區個數,默認排序后的磁區個數和排序之前的個數相等,
例:
val data = List((5,3),(888,666),(777,65)) val sort_data=data.sortBy(x=>x._1) print(sort_data)
List((5,3), (777,65), (888,666))
3.collect()查詢
collect函式是一個行動操作,把RDD所有元素轉換成陣列并回傳到Driver端,適用于小資料處理后的回傳,
sq_data.collect
Array[(Int,Int)] = Array((7,6),(45,3),(1,3))
4.flatMap轉換資料
faltMap的操作是將函式應用于RDD之中的每一個元素,將回傳的迭代器中的所有元素構成新的RDD,
簡單的來講,使用faltmap就是先map然后flat迭代輸出:
val test = List("How are you", "I am fine", "What about you") print(test.flatMap(x => x.split(" ")))
List(How, are, you, I, am, fine, What, about, you)
5.take()查詢指定數目的值
take(N)方法用于獲取RDD的前N個元素,回傳型別為陣列,take與collect的原理相似,collect用于獲取全部資料,take獲取指定個數的資料,
val data = sc.parallelize(1 to 10) data.take(5)
Array[Int] = Array(1,2,3,4,5)
6.union()合并多個RDD
union是一種轉換操作,用于將兩個RDD的元素合并成一個,不進行去重操作,而且兩個RDD中每個元素中的值的個數和型別需要保持一直,
val rdd1 = sc.parallelize(List(('a',1),('b',2),('c',3))) val rdd2 = sc.parallelize(List(('a',1),('d',4),('e',5))) rdd1.union(rdd2).collect
((a,1),(b,2),(c,3),(a,1),(d,4),(e,5))
7.filter()進行過濾
filter是一種轉換操作,用于過濾RDD中的元素,filter需要一個引數,引數是一個用于過濾的函式,該函式的回傳值為Boolean型別,回傳值為true的元素保留,回傳值為false的元素過濾,最后結果是回傳一個存盤符合過濾條件的所有元素的新RDD,
val rdd1 = sc.parallelize(List(('a',1),('b',2),('c',3))) rdd1.filter(_._2>1).collect.foreach(println(_))
(b,2)
(c,3)
8.distinct()進行去重
distinct()是一個轉換操作,用于RDD的資料去重,去除兩個完全相同的元素,沒有引數,
val rdd1 = sc.parallelize(List(('a',1),('b',2),('c',3),('a',1))) rdd1.distinct().collect().foreach(println(_))
(b,2)
(c,3)
(a,1)
9.intersection()求出兩個RDD的共同元素
intersection()方法用于求出兩個RDD的共同元素,也就是找出兩個RDD的交集,引數是另一個RDD,順序先后與結果無關,
val rdd1 = sc.parallelize(List(('a',1),('b',2),('c',3),('a',1))) val rdd2 = sc.parallelize(List(('a',1),('d',4),('e',5))) rdd1.intersection(rdd2).collect().foreach(println(_))
(a,1)
10.subtract()將相同元素去掉
subtract()的引數是一個RDD,用于將前一個RDD中在后一個RDD出現的元素洗掉,可以看作是求補集的操作,回傳值為前一個RDD去除與后一個RDD相同的元素后的剩余值所組成的新的RDD,所以RDD的順序會影響結果,
val rdd1 = sc.parallelize(List(('a',1),('b',2),('c',3))) val rdd2 = sc.parallelize(List(('a',1),('d',4),('e',5),('b',2))) rdd1.subtract(rdd2).collect().foreach(println(_)) rdd2.subtract(rdd1).collect().foreach(println(_))
(c,3)
(e,5)
(d,4)
11.cartesian()求兩個RDD的笛卡爾積
笛卡爾積就是將兩個集合的元素兩兩組合成一組,假設集合A有5個元素,集合B有10個元素,集合A的每個元素都會和集合B的每個元素組合成一組,結果會回傳50個元素組合,
val rdd1 = sc.parallelize(List(1,2,3,4)) val rdd2 = sc.parallelize(List(1,2,3)) rdd1.cartesian(rdd2).collect().foreach(println(_))
(1,1)
(1,2)
(1,3)
(2,1)
(2,2)
(2,3)
(3,1)
(3,2)
(3,3)
(4,1)
(4,2)
(4,3)
鍵值對RDD
鍵值對RDD由一組組的鍵值對組成,這些RDD被稱為PairRDD,PairRDD提供了并行操作各個鍵或跨節點重新進行資料分組的操作介面,
val rdd= sc.parallelize(List("this is a test","hellow world ","come on ")) val words = rdd.map(x=>(x.split(" ")(0),x)); words.collect().foreach(println(_))
(this,this is a test)
(hellow,hellow world )
(come,come on )
轉換操作Keys與Values
作為鍵值對型別的RDD,包含了鍵和值兩部分,Spark提供了兩種方法,分別獲取鍵值對RDD的鍵和值,keys回傳一個僅包含鍵的RDD,values回傳了一個僅包含值的RDD,
val rdd= sc.parallelize(List("this is a test","hellow world ","come on ")) val words = rdd.map(x=>(x.split(" ")(0),x)); val key = words.keys val value = words.values key.collect().foreach(println(_)) value.collect().foreach(println(_))
this
hellow
come
this is a test
hellow world
come on
1.轉換操作reduceByKey()
reduceByKey()的功能是合并具有相同鍵的值,作用域是Key/Value型別的鍵值對,并且是只對每個Key的Value進行處理,當RDD中有許多個鍵相同的鍵值對,那么就會對這個Key的Values進行處理,
val rdd1 = sc.parallelize(List(('a',1),('d',4),('e',5),('b',2),('a',1),('b',2),('c',3))) val r_rdd=rdd1.reduceByKey((a,b)=>a+b) r_rdd.collect().foreach(println(_))
(d,4)
(e,5)
(a,2)
(b,4)
(c,3)
2.轉換操作groupByKey()
groupByKey()是對具有相同鍵的值進行分組,對于一個由型別K的鍵和型別V的值組成的RDD,通過groupByKey()得到的RDD型別是[K,Iterable[V]],
val rdd1 = sc.parallelize(List(('a',1),('a',4),('b',5),('b',2),('a',1),('b',2),('c',3))) val r_rdd=rdd1.groupByKey() r_rdd.collect().foreach(println(_)) r_rdd.map(x=>(x._1,x._2.size)).collect().foreach(println(_)) //size()用于在指定的映射中查找鍵/值對的數量,
(a,CompactBuffer(1, 4, 1))
(b,CompactBuffer(5, 2, 2))
(c,CompactBuffer(3))
(a,3)
(b,3)
(c,1)
3.join()連接兩個RDD
連接方式(對于學過資料庫SQL的人來說比較容易理解):
| 連接型別 | 描述 |
| join | 對兩個RDD進行內連接 |
| rightOuterJoin | 對兩個RDD進行連接操作,確保第二個RDD的鍵必須存在(右外連接) |
| leftOuterJoin | 對兩個RDD進行連接操作,確保第一個RDD的鍵必須存在(左外連接) |
| fullOuterJoin | 對兩個RDD進行全外連接 |
(1)join
join是根據鍵對兩個RDD進行內連接,將兩個RDD中鍵相同的資料的值存在一個元組中,最后只回傳兩個RDD都存在的鍵的連接結果,
val rdd1 = sc.parallelize(List(('a',1),('b',2),('c',3))) val rdd2 = sc.parallelize(List(('a',1),('d',4),('e',5))) val j_rdd = rdd1.join(rdd2) j_rdd.collect().foreach(println(_))
(a,(1,1))
(2)rightOuterJoin
rightOuterJoin是根據鍵對兩個RDD進行右外連接,連接結果回傳第二個RDD的所有鍵的連接結果,不管在第一個RDD中是否存在,
val rdd1 = sc.parallelize(List(('a',1),('b',2),('c',3))) val rdd2 = sc.parallelize(List(('a',1),('d',4),('e',5))) val r_rdd = rdd1 rightOuterJoin rdd2 r_rdd.collect().foreach(println(_))
(d,(None,4))
(e,(None,5))
(a,(Some(1),1))
(3)leftOuterJoin
leftOuterJoin是對兩個RDD的鍵進行左外連接的方法,與rightOuterJoin相反,回傳結果保留第一個RDD的所有鍵,
val rdd1 = sc.parallelize(List(('a',1),('b',2),('c',3))) val rdd2 = sc.parallelize(List(('a',1),('d',4),('e',5))) val l_rdd = rdd1 leftOuterJoin rdd2 l_rdd.collect().foreach(println(_))
(a,(1,Some(1)))
(b,(2,None))
(c,(3,None))
(4)fullOuterJoin
fullOuterJoin是一種全外連接,會保留兩個連接的RDD中所有鍵的連接結果,
val rdd1 = sc.parallelize(List(('a',1),('b',2),('c',3))) val rdd2 = sc.parallelize(List(('a',1),('d',4),('e',5))) val f_rdd = rdd1 fullOuterJoin rdd2 f_rdd.collect().foreach(println(_))
(d,(None,Some(4)))
(e,(None,Some(5)))
(a,(Some(1),Some(1)))
(b,(Some(2),None))
(c,(Some(3),None))
4.zip組合兩個RDD
zip函式用于將兩個RDD組合成Key/Value形式的RDD,這里要求兩個RDD的partition數量以及元素數量都相同,否則會拋出例外,
val rdd1 = sc.parallelize(List(1,2,3,4,5)) val rdd2 = sc.parallelize(List('a','c','e','d','w')) rdd1.zip(rdd2).collect().foreach(println(_)) rdd2.zip(rdd1).collect().foreach(println(_))
(1,a)
(2,c)
(3,e)
(4,d)
(5,w)
(a,1)
(c,2)
(e,3)
(d,4)
(w,5)
5.combineByKey合并相同鍵的值
combineByKey是Spark中一個比較核心的高級函式,其他一些高階鍵值對函式底層都是用它來實作的,
combineByKey用于將相同鍵的資料聚合,并且允許回傳型別與輸入資料型別不同的回傳值,combineByKey函式的定義為:
def combineByKey[C](
createCombiner: V => C,
mergeValue: (C, V) => C,
mergeCombiners: (C, C) => C): RDD[(K, C)] = self.withScope {
/*content*/
}
以上三個重要的引數:
(1)createCombiner:V=>C,V是鍵值對RDD中的值部分,將該值轉換為另一種型別C,C會作為每一個鍵的累加器的初始值,
(2)mergeValue: (C, V) => C,該函式把元素V合并到之前的元素C(createCombiner)上(這個操作在每個磁區進行),
(3)mergeCombiners:(C, C)=>C,該函式把兩個元素C合并(這個操作在不同磁區間進行),
由于聚合操作會遍歷磁區中所有的元素,因此每個元素的鍵只有兩種情況:以前沒出現過或以前出現過,
(1)如果以前沒出現過,則執行的是createCombiner方法,createCombiner()會在新遇到的鍵對應的累加器中賦予初始值,否則執行mergeValue方法,
(2)對于已經出現過的鍵,呼叫mergeValue來進行聚合操作,對該鍵的累加器對應的當前值(C個數)與新值(V格式)進行合并,
(3)由于每個磁區都是獨立處理的,因此對于同一個鍵可以有多個累加器,如果有兩個或者更多的磁區都有對應同一個鍵的累加器,就需要使用用戶提供的mergeCombiners()方法將各個磁區的結果進行合并,
本文主要參考Spark大資料技術與運用一書,
轉載請註明出處,本文鏈接:https://www.uj5u.com/qita/274182.html
標籤:其他
