主頁 >  其他 > Spark RDD資料操作函式以及轉換函式一文詳解運用與方法

Spark RDD資料操作函式以及轉換函式一文詳解運用與方法

2021-04-09 11:58:23 其他

前言:

配置JDK1.8

實驗環境IDEA

scala版本為2.11.12

本地Window偽分布運行非集群實驗

創建RDD

從記憶體中創建一個RDD有兩種常用的方法,一種是轉化Seq集合為RDD,另一種是從已有RDD轉化為新的RDD,

SparkContext類中有兩個方法:parallelize和makeRDD,

1.parallelize

parallelize有兩個引數可以輸入

(1)要轉化的集合,必須是Seq集合,

(2)磁區數,一般不設磁區數,則默認為該Application分配到的資源的CPU數,

val rdd1 = sc.parallelize(List(1,2,3,4))

2.makeRDD

makeRDD有兩種實作方法:一種跟parallelize完全一致;另一種接收的引數型別是Seq,生產的RDD中保存的是T的值(Seq[T,Seq[String])),

val seq = Seq((1,Seq(1,2)),(2,Seq(2,3,4)))
val rdd =sc.makeRDD(seq)
rdd.collect().foreach(println(_))                     

(1,List(1, 2))
(2,List(2, 3, 4))

從外部存盤創建RDD是指直接讀取一個存放在檔案系統的資料檔案創建RDD,第一種創建RDD的方式常用于測驗,這種方式才是用于實踐操作的常用方法,

(1)從HDFS檔案創建RDD

val test = sc.textFile("/user/root/test.txt")

(2)從Linux本地檔案創建

確實差不多,在路徑前面加上file://表示從本地Linux檔案系統讀取,

1.Map轉換資料

map是一種基礎的RDD轉換操作,用于將RDD中的每一個資料元素通過某種函式進行轉換并回傳新的RDD,

例:

val distData = List(1, 3, 45, 3, 76)
val sq_dist = distData.map(x => x * x)
print(sq_dist)

List(1, 9, 2025, 9, 5776)

2.SortBy()排序

sortBy()是對標準RDD進行排序的方法,在org.apache.spark.rdd.RDD類中實作:

/**

* Return this RDD sorted by the given key function.

*/

def sortBy[K](

f: (T) => K,

ascending: Boolean = true,

numPartitions: Int = this.partitions.size)

(implicit ord: Ordering[K], ctag: ClassTag[K]): RDD[T] =

this.keyBy[K](f)

.sortByKey(ascending, numPartitions)

.values

  • 第一個引數是函式f(x)=>_._._,左邊是要被排序物件中的每一個元素,右邊回傳的值是元素中要進行排序的值,
  • 第二個引數是ascending排序順序,決定排序后RDD中的元素是升序還是降序,默認是ture
  • 第三個引數是numPartitions,該引數決定排序后的RDD磁區個數,默認排序后的磁區個數和排序之前的個數相等,

例:

val data = List((5,3),(888,666),(777,65))
val sort_data=data.sortBy(x=>x._1)
print(sort_data)

List((5,3), (777,65), (888,666))

3.collect()查詢

collect函式是一個行動操作,把RDD所有元素轉換成陣列并回傳到Driver端,適用于小資料處理后的回傳,

sq_data.collect

Array[(Int,Int)] = Array((7,6),(45,3),(1,3))

4.flatMap轉換資料

faltMap的操作是將函式應用于RDD之中的每一個元素,將回傳的迭代器中的所有元素構成新的RDD,

簡單的來講,使用faltmap就是先map然后flat迭代輸出:

val test = List("How are you", "I am fine", "What about you")
print(test.flatMap(x => x.split(" ")))

List(How, are, you, I, am, fine, What, about, you)

5.take()查詢指定數目的值

take(N)方法用于獲取RDD的前N個元素,回傳型別為陣列,take與collect的原理相似,collect用于獲取全部資料,take獲取指定個數的資料,

val data = sc.parallelize(1 to 10)
data.take(5)

Array[Int] = Array(1,2,3,4,5)

6.union()合并多個RDD

union是一種轉換操作,用于將兩個RDD的元素合并成一個,不進行去重操作,而且兩個RDD中每個元素中的值的個數和型別需要保持一直,

val rdd1 = sc.parallelize(List(('a',1),('b',2),('c',3)))
val rdd2 = sc.parallelize(List(('a',1),('d',4),('e',5)))
rdd1.union(rdd2).collect

((a,1),(b,2),(c,3),(a,1),(d,4),(e,5))

7.filter()進行過濾

filter是一種轉換操作,用于過濾RDD中的元素,filter需要一個引數,引數是一個用于過濾的函式,該函式的回傳值為Boolean型別,回傳值為true的元素保留,回傳值為false的元素過濾,最后結果是回傳一個存盤符合過濾條件的所有元素的新RDD,

val rdd1 = sc.parallelize(List(('a',1),('b',2),('c',3)))
rdd1.filter(_._2>1).collect.foreach(println(_))

(b,2)
(c,3)

8.distinct()進行去重

distinct()是一個轉換操作,用于RDD的資料去重,去除兩個完全相同的元素,沒有引數,

val rdd1 = sc.parallelize(List(('a',1),('b',2),('c',3),('a',1)))
rdd1.distinct().collect().foreach(println(_))

(b,2)
(c,3)
(a,1)

9.intersection()求出兩個RDD的共同元素

intersection()方法用于求出兩個RDD的共同元素,也就是找出兩個RDD的交集,引數是另一個RDD,順序先后與結果無關,

val rdd1 = sc.parallelize(List(('a',1),('b',2),('c',3),('a',1)))
val rdd2 = sc.parallelize(List(('a',1),('d',4),('e',5)))
rdd1.intersection(rdd2).collect().foreach(println(_))

(a,1)

10.subtract()將相同元素去掉

subtract()的引數是一個RDD,用于將前一個RDD中在后一個RDD出現的元素洗掉,可以看作是求補集的操作,回傳值為前一個RDD去除與后一個RDD相同的元素后的剩余值所組成的新的RDD,所以RDD的順序會影響結果,

val rdd1 = sc.parallelize(List(('a',1),('b',2),('c',3)))
val rdd2 = sc.parallelize(List(('a',1),('d',4),('e',5),('b',2)))
rdd1.subtract(rdd2).collect().foreach(println(_))
rdd2.subtract(rdd1).collect().foreach(println(_))

(c,3)
(e,5)
(d,4)

11.cartesian()求兩個RDD的笛卡爾積

笛卡爾積就是將兩個集合的元素兩兩組合成一組,假設集合A有5個元素,集合B有10個元素,集合A的每個元素都會和集合B的每個元素組合成一組,結果會回傳50個元素組合,

val rdd1 = sc.parallelize(List(1,2,3,4))
val rdd2 = sc.parallelize(List(1,2,3))
rdd1.cartesian(rdd2).collect().foreach(println(_))

(1,1)
(1,2)
(1,3)
(2,1)
(2,2)
(2,3)
(3,1)
(3,2)
(3,3)
(4,1)
(4,2)
(4,3)

鍵值對RDD

鍵值對RDD由一組組的鍵值對組成,這些RDD被稱為PairRDD,PairRDD提供了并行操作各個鍵或跨節點重新進行資料分組的操作介面,

val rdd= sc.parallelize(List("this is a test","hellow world ","come on "))
val words = rdd.map(x=>(x.split(" ")(0),x));
words.collect().foreach(println(_))

(this,this is a test)
(hellow,hellow world )
(come,come on )

轉換操作Keys與Values

作為鍵值對型別的RDD,包含了鍵和值兩部分,Spark提供了兩種方法,分別獲取鍵值對RDD的鍵和值,keys回傳一個僅包含鍵的RDD,values回傳了一個僅包含值的RDD,

val rdd= sc.parallelize(List("this is a test","hellow world ","come on "))
val words = rdd.map(x=>(x.split(" ")(0),x));
val key = words.keys
val value = words.values
key.collect().foreach(println(_))
value.collect().foreach(println(_))

this
hellow
come
this is a test
hellow world
come on

1.轉換操作reduceByKey()

reduceByKey()的功能是合并具有相同鍵的值,作用域是Key/Value型別的鍵值對,并且是只對每個Key的Value進行處理,當RDD中有許多個鍵相同的鍵值對,那么就會對這個Key的Values進行處理,

val rdd1 = sc.parallelize(List(('a',1),('d',4),('e',5),('b',2),('a',1),('b',2),('c',3)))
val r_rdd=rdd1.reduceByKey((a,b)=>a+b)
r_rdd.collect().foreach(println(_))

(d,4)
(e,5)
(a,2)
(b,4)
(c,3)

2.轉換操作groupByKey()

groupByKey()是對具有相同鍵的值進行分組,對于一個由型別K的鍵和型別V的值組成的RDD,通過groupByKey()得到的RDD型別是[K,Iterable[V]],

val rdd1 = sc.parallelize(List(('a',1),('a',4),('b',5),('b',2),('a',1),('b',2),('c',3)))
val r_rdd=rdd1.groupByKey()
r_rdd.collect().foreach(println(_))
r_rdd.map(x=>(x._1,x._2.size)).collect().foreach(println(_))
//size()用于在指定的映射中查找鍵/值對的數量,

(a,CompactBuffer(1, 4, 1))
(b,CompactBuffer(5, 2, 2))
(c,CompactBuffer(3))
(a,3)
(b,3)
(c,1)

3.join()連接兩個RDD

連接方式(對于學過資料庫SQL的人來說比較容易理解):

連接型別描述
join對兩個RDD進行內連接
rightOuterJoin對兩個RDD進行連接操作,確保第二個RDD的鍵必須存在(右外連接)
leftOuterJoin對兩個RDD進行連接操作,確保第一個RDD的鍵必須存在(左外連接)
fullOuterJoin對兩個RDD進行全外連接

(1)join

join是根據鍵對兩個RDD進行內連接,將兩個RDD中鍵相同的資料的值存在一個元組中,最后只回傳兩個RDD都存在的鍵的連接結果,

val rdd1 = sc.parallelize(List(('a',1),('b',2),('c',3)))
val rdd2 = sc.parallelize(List(('a',1),('d',4),('e',5)))
val j_rdd = rdd1.join(rdd2)
j_rdd.collect().foreach(println(_))

(a,(1,1))

(2)rightOuterJoin

rightOuterJoin是根據鍵對兩個RDD進行右外連接,連接結果回傳第二個RDD的所有鍵的連接結果,不管在第一個RDD中是否存在,

val rdd1 = sc.parallelize(List(('a',1),('b',2),('c',3)))
val rdd2 = sc.parallelize(List(('a',1),('d',4),('e',5)))
val r_rdd = rdd1 rightOuterJoin rdd2
r_rdd.collect().foreach(println(_))

(d,(None,4))
(e,(None,5))
(a,(Some(1),1))

(3)leftOuterJoin

leftOuterJoin是對兩個RDD的鍵進行左外連接的方法,與rightOuterJoin相反,回傳結果保留第一個RDD的所有鍵,

val rdd1 = sc.parallelize(List(('a',1),('b',2),('c',3)))
val rdd2 = sc.parallelize(List(('a',1),('d',4),('e',5)))
val l_rdd = rdd1 leftOuterJoin rdd2
l_rdd.collect().foreach(println(_))

(a,(1,Some(1)))
(b,(2,None))
(c,(3,None))

(4)fullOuterJoin

fullOuterJoin是一種全外連接,會保留兩個連接的RDD中所有鍵的連接結果,

val rdd1 = sc.parallelize(List(('a',1),('b',2),('c',3)))
val rdd2 = sc.parallelize(List(('a',1),('d',4),('e',5)))
val f_rdd = rdd1 fullOuterJoin rdd2
f_rdd.collect().foreach(println(_))

(d,(None,Some(4)))
(e,(None,Some(5)))
(a,(Some(1),Some(1)))
(b,(Some(2),None))
(c,(Some(3),None))

4.zip組合兩個RDD

zip函式用于將兩個RDD組合成Key/Value形式的RDD,這里要求兩個RDD的partition數量以及元素數量都相同,否則會拋出例外,

val rdd1 = sc.parallelize(List(1,2,3,4,5))
val rdd2 = sc.parallelize(List('a','c','e','d','w'))
rdd1.zip(rdd2).collect().foreach(println(_))
rdd2.zip(rdd1).collect().foreach(println(_))

(1,a)
(2,c)
(3,e)
(4,d)
(5,w)
(a,1)
(c,2)
(e,3)
(d,4)
(w,5)

5.combineByKey合并相同鍵的值

combineByKey是Spark中一個比較核心的高級函式,其他一些高階鍵值對函式底層都是用它來實作的,

combineByKey用于將相同鍵的資料聚合,并且允許回傳型別與輸入資料型別不同的回傳值,combineByKey函式的定義為:

def combineByKey[C](
createCombiner: V => C,
mergeValue: (C, V) => C,
mergeCombiners: (C, C) => C): RDD[(K, C)] = self.withScope {
/*content*/
}

以上三個重要的引數:

(1)createCombiner:V=>C,V是鍵值對RDD中的值部分,將該值轉換為另一種型別C,C會作為每一個鍵的累加器的初始值,

(2)mergeValue: (C, V) => C,該函式把元素V合并到之前的元素C(createCombiner)上(這個操作在每個磁區進行),

(3)mergeCombiners:(C, C)=>C,該函式把兩個元素C合并(這個操作在不同磁區間進行),

由于聚合操作會遍歷磁區中所有的元素,因此每個元素的鍵只有兩種情況:以前沒出現過或以前出現過,

(1)如果以前沒出現過,則執行的是createCombiner方法,createCombiner()會在新遇到的鍵對應的累加器中賦予初始值,否則執行mergeValue方法,

(2)對于已經出現過的鍵,呼叫mergeValue來進行聚合操作,對該鍵的累加器對應的當前值(C個數)與新值(V格式)進行合并,

(3)由于每個磁區都是獨立處理的,因此對于同一個鍵可以有多個累加器,如果有兩個或者更多的磁區都有對應同一個鍵的累加器,就需要使用用戶提供的mergeCombiners()方法將各個磁區的結果進行合并,


本文主要參考Spark大資料技術與運用一書,

轉載請註明出處,本文鏈接:https://www.uj5u.com/qita/274182.html

標籤:其他

上一篇:每天自學Python兩小時,三個月后月入25K

下一篇:我寫了一個腳本,可在“任意”服務器上執行命令!

標籤雲
其他(157675) Python(38076) JavaScript(25376) Java(17977) C(15215) 區塊鏈(8255) C#(7972) AI(7469) 爪哇(7425) MySQL(7132) html(6777) 基礎類(6313) sql(6102) 熊猫(6058) PHP(5869) 数组(5741) R(5409) Linux(5327) 反应(5209) 腳本語言(PerlPython)(5129) 非技術區(4971) Android(4554) 数据框(4311) css(4259) 节点.js(4032) C語言(3288) json(3245) 列表(3129) 扑(3119) C++語言(3117) 安卓(2998) 打字稿(2995) VBA(2789) Java相關(2746) 疑難問題(2699) 细绳(2522) 單片機工控(2479) iOS(2429) ASP.NET(2402) MongoDB(2323) 麻木的(2285) 正则表达式(2254) 字典(2211) 循环(2198) 迅速(2185) 擅长(2169) 镖(2155) 功能(1967) .NET技术(1958) Web開發(1951) python-3.x(1918) HtmlCss(1915) 弹簧靴(1913) C++(1909) xml(1889) PostgreSQL(1872) .NETCore(1853) 谷歌表格(1846) Unity3D(1843) for循环(1842)

熱門瀏覽
  • 網閘典型架構簡述

    網閘架構一般分為兩種:三主機的三系統架構網閘和雙主機的2+1架構網閘。 三主機架構分別為內端機、外端機和仲裁機。三機無論從軟體和硬體上均各自獨立。首先從硬體上來看,三機都用各自獨立的主板、記憶體及存盤設備。從軟體上來看,三機有各自獨立的作業系統。這樣能達到完全的三機獨立。對于“2+1”系統,“2”分為 ......

    uj5u.com 2020-09-10 02:00:44 more
  • 如何從xshell上傳檔案到centos linux虛擬機里

    如何從xshell上傳檔案到centos linux虛擬機里及:虛擬機CentOs下執行 yum -y install lrzsz命令,出現錯誤:鏡像無法找到軟體包 前言 一、安裝lrzsz步驟 二、上傳檔案 三、遇到的問題及解決方案 總結 前言 提示:其實很簡單,往虛擬機上安裝一個上傳檔案的工具 ......

    uj5u.com 2020-09-10 02:00:47 more
  • 一、SQLMAP入門

    一、SQLMAP入門 1、判斷是否存在注入 sqlmap.py -u 網址/id=1 id=1不可缺少。當注入點后面的引數大于兩個時。需要加雙引號, sqlmap.py -u "網址/id=1&uid=1" 2、判斷文本中的請求是否存在注入 從文本中加載http請求,SQLMAP可以從一個文本檔案中 ......

    uj5u.com 2020-09-10 02:00:50 more
  • Metasploit 簡單使用教程

    metasploit 簡單使用教程 浩先生, 2020-08-28 16:18:25 分類專欄: kail 網路安全 linux 文章標簽: linux資訊安全 編輯 著作權 metasploit 使用教程 前言 一、Metasploit是什么? 二、準備作業 三、具體步驟 前言 Msfconsole ......

    uj5u.com 2020-09-10 02:00:53 more
  • 游戲逆向之驅動層與用戶層通訊

    驅動層代碼: #pragma once #include <ntifs.h> #define add_code CTL_CODE(FILE_DEVICE_UNKNOWN,0x800,METHOD_BUFFERED,FILE_ANY_ACCESS) /* 更多游戲逆向視頻www.yxfzedu.com ......

    uj5u.com 2020-09-10 02:00:56 more
  • 北斗電力時鐘(北斗授時服務器)讓網路資料更精準

    北斗電力時鐘(北斗授時服務器)讓網路資料更精準 北斗電力時鐘(北斗授時服務器)讓網路資料更精準 京準電子科技官微——ahjzsz 近幾年,資訊技術的得了快速發展,互聯網在逐漸普及,其在人們生活和生產中都得到了廣泛應用,并且取得了不錯的應用效果。計算機網路資訊在電力系統中的應用,一方面使電力系統的運行 ......

    uj5u.com 2020-09-10 02:01:03 more
  • 【CTF】CTFHub 技能樹 彩蛋 writeup

    ?碎碎念 CTFHub:https://www.ctfhub.com/ 筆者入門CTF時時剛開始刷的是bugku的舊平臺,后來才有了CTFHub。 感覺不論是網頁UI設計,還是題目質量,賽事跟蹤,工具軟體都做得很不錯。 而且因為獨到的金幣制度的確讓人有一種想去刷題賺金幣的感覺。 個人還是非常喜歡這個 ......

    uj5u.com 2020-09-10 02:04:05 more
  • 02windows基礎操作

    我學到了一下幾點 Windows系統目錄結構與滲透的作用 常見Windows的服務詳解 Windows埠詳解 常用的Windows注冊表詳解 hacker DOS命令詳解(net user / type /md /rd/ dir /cd /net use copy、批處理 等) 利用dos命令制作 ......

    uj5u.com 2020-09-10 02:04:18 more
  • 03.Linux基礎操作

    我學到了以下幾點 01Linux系統介紹02系統安裝,密碼啊破解03Linux常用命令04LAMP 01LINUX windows: win03 8 12 16 19 配置不繁瑣 Linux:redhat,centos(紅帽社區版),Ubuntu server,suse unix:金融機構,證券,銀 ......

    uj5u.com 2020-09-10 02:04:30 more
  • 05HTML

    01HTML介紹 02頭部標簽講解03基礎標簽講解04表單標簽講解 HTML前段語言 js1.了解代碼2.根據代碼 懂得挖掘漏洞 (POST注入/XSS漏洞上傳)3.黑帽seo 白帽seo 客戶網站被黑帽植入劫持代碼如何處理4.熟悉html表單 <html><head><title>TDK標題,描述 ......

    uj5u.com 2020-09-10 02:04:36 more
最新发布
  • 2023年最新微信小程式抓包教程

    01 開門見山 隔一個月發一篇文章,不過分。 首先回顧一下《微信系結手機號資料庫被脫庫事件》,我也是第一時間得知了這個訊息,然后跟蹤了整件事情的經過。下面是這起事件的相關截圖以及近日流出的一萬條資料樣本: 個人認為這件事也沒什么,還不如關注一下之前45億快遞資料查詢渠道疑似在近日復活的訊息。 訊息是 ......

    uj5u.com 2023-04-20 08:48:24 more
  • web3 產品介紹:metamask 錢包 使用最多的瀏覽器插件錢包

    Metamask錢包是一種基于區塊鏈技術的數字貨幣錢包,它允許用戶在安全、便捷的環境下管理自己的加密資產。Metamask錢包是以太坊生態系統中最流行的錢包之一,它具有易于使用、安全性高和功能強大等優點。 本文將詳細介紹Metamask錢包的功能和使用方法。 一、 Metamask錢包的功能 數字資 ......

    uj5u.com 2023-04-20 08:47:46 more
  • vulnhub_Earth

    前言 靶機地址->>>vulnhub_Earth 攻擊機ip:192.168.20.121 靶機ip:192.168.20.122 參考文章 https://www.cnblogs.com/Jing-X/archive/2022/04/03/16097695.html https://www.cnb ......

    uj5u.com 2023-04-20 07:46:20 more
  • 從4k到42k,軟體測驗工程師的漲薪史,給我看哭了

    清明節一過,盲猜大家已經無心上班,在數著日子準備過五一,但一想到銀行卡里的余額……瞬間心情就不美麗了。最近,2023年高校畢業生就業調查顯示,本科畢業月平均起薪為5825元。調查一出,便有很多同學表示自己又被平均了。看著這一資料,不免讓人想到前不久中國青年報的一項調查:近六成大學生認為畢業10年內會 ......

    uj5u.com 2023-04-20 07:44:00 more
  • 最新版本 Stable Diffusion 開源 AI 繪畫工具之中文自動提詞篇

    🎈 標簽生成器 由于輸入正向提示詞 prompt 和反向提示詞 negative prompt 都是使用英文,所以對學習母語的我們非常不友好 使用網址:https://tinygeeker.github.io/p/ai-prompt-generator 這個網址是為了讓大家在使用 AI 繪畫的時候 ......

    uj5u.com 2023-04-20 07:43:36 more
  • 漫談前端自動化測驗演進之路及測驗工具分析

    隨著前端技術的不斷發展和應用程式的日益復雜,前端自動化測驗也在不斷演進。隨著 Web 應用程式變得越來越復雜,自動化測驗的需求也越來越高。如今,自動化測驗已經成為 Web 應用程式開發程序中不可或缺的一部分,它們可以幫助開發人員更快地發現和修復錯誤,提高應用程式的性能和可靠性。 ......

    uj5u.com 2023-04-20 07:43:16 more
  • CANN開發實踐:4個DVPP記憶體問題的典型案例解讀

    摘要:由于DVPP媒體資料處理功能對存放輸入、輸出資料的記憶體有更高的要求(例如,記憶體首地址128位元組對齊),因此需呼叫專用的記憶體申請介面,那么本期就分享幾個關于DVPP記憶體問題的典型案例,并給出原因分析及解決方法。 本文分享自華為云社區《FAQ_DVPP記憶體問題案例》,作者:昇騰CANN。 DVPP ......

    uj5u.com 2023-04-20 07:43:03 more
  • msf學習

    msf學習 以kali自帶的msf為例 一、msf核心模塊與功能 msf模塊都放在/usr/share/metasploit-framework/modules目錄下 1、auxiliary 輔助模塊,輔助滲透(埠掃描、登錄密碼爆破、漏洞驗證等) 2、encoders 編碼器模塊,主要包含各種編碼 ......

    uj5u.com 2023-04-20 07:42:59 more
  • Halcon軟體安裝與界面簡介

    1. 下載Halcon17版本到到本地 2. 雙擊安裝包后 3. 步驟如下 1.2 Halcon軟體安裝 界面分為四大塊 1. Halcon的五個助手 1) 影像采集助手:與相機連接,設定相機引數,采集影像 2) 標定助手:九點標定或是其它的標定,生成標定檔案及內參外參,可以將像素單位轉換為長度單位 ......

    uj5u.com 2023-04-20 07:42:17 more
  • 在MacOS下使用Unity3D開發游戲

    第一次發博客,先發一下我的游戲開發環境吧。 去年2月份買了一臺MacBookPro2021 M1pro(以下簡稱mbp),這一年來一直在用mbp開發游戲。我大致分享一下我的開發工具以及使用體驗。 1、Unity 官網鏈接: https://unity.cn/releases 我一般使用的Apple ......

    uj5u.com 2023-04-20 07:40:19 more