目錄
- 一、Spark SQL概述
- 二、SparkSQL版本
- 1)SparkSQL的演變之路
- 2)shark與SparkSQL對比
- 3)SparkSession
- 三、RDD、DataFrames和DataSet
- 1)三者關聯關系
- 1)RDD
- 1、核心概念
- 2、RDD簡單操作
- 3、RDD API
- 1)Transformation
- 2)Action
- 4、實戰操作
- 2)DataFrames
- 1、DSL風格語法操作
- 1)DataFrame創建
- 2、SQL風格語法操作
- 1、DSL風格語法操作
- 3)DataSet
- RDD,DataFrame,DataSet互相轉化
- 四、RDD、DataFrame和DataSet的共性與區別
- 1)共性
- 2)區別
- 五、spark-shell
- 1)local
- 2)on Yarn(推薦)
- 六、SparkSQL和Hive的集成(Spark on Hive)
- 1)創建軟鏈接
- 2)復制 hive lib目錄 下的mysql連接jar包到spark的jars下
- 3)配置
- 4)啟動 spark-shell操作Hive(local)
- 七、Spark beeline
- 1)Spark Thrift Server架構于HiveServer2架構對比
- 2)Spark Thrift Server和HiveServer2的區別
- 3)配置啟動Spark Thrift Server
- 八、Spark Streaming
一、Spark SQL概述
Spark SQL是Spark用來處理結構化資料的一個模塊,它提供了兩個編程抽象叫做DataFrame和DataSet并且作為分布式SQL查詢引擎的作用,其實也是對RDD的再封裝,大資料Hadoop之——計算引擎Spark,官方檔案:https://spark.apache.org/sql/
二、SparkSQL版本
1)SparkSQL的演變之路

-
1.0以前: Shark(入口:SQLContext和HiveContext)
- SQLContext:主要DataFrame的構建以及DataFrame的執行,SQLContext指的是spark中SQL模塊的程式入口,
- HiveContext:是SQLContext的子類,專門用于與Hive的集成,比如讀取Hive的元資料,資料存盤到Hive表、Hive的視窗分析函式等,
-
1.1.x開始:SparkSQL(只是測驗性的)
-
1.3.x: SparkSQL(正式版本)+Dataframe
-
1.5.x: SparkSQL 鎢絲計劃
-
1.6.x: SparkSQL+DataFrame+DataSet(測驗版本)
-
2.x:
- 入口:SparkSession(spark應用程式的一個整體入口),合并了SQLContext和HiveContext
- SparkSQL+DataFrame+DataSet(正式版本)
- Spark Streaming-》Structured Streaming(DataSet)
2)shark與SparkSQL對比
- shark
- 執行計劃優化完全依賴于Hive,不方便添加新的優化策略;
- Spark是執行緒級并行,而MapReduce是行程級并行,
- Spark在兼容Hive的實作上存在執行緒安全問題,導致Shark
不得不使用另外一套獨立維護的打了補丁的Hive原始碼分支;
- Spark SQL
- 作為Spark生態的一員繼續發展,而不再受限于Hive,
- 只是兼容Hive;Hive on Spark作為Hive的底層引擎之一
- Hive可以采用Map-Reduce、Tez、Spark等引擎
3)SparkSession
- SparkSession是Spark 2.0引如的新概念,SparkSession為用戶提供了統一的切入點,來讓用戶學習spark的各項功能,
- 在spark的早期版本中,SparkContext是spark的主要切入點,由于RDD是主要的API,我們通過sparkcontext來創建和操作RDD,對于每個其他的API,我們需要使用不同的context,
【例如】對于Streming,我們需要使用StreamingContext;對于sql,使用sqlContext;對于Hive,使用hiveContext,但是隨著DataSet和DataFrame的API逐漸成為標準的API,就需要為他們建立接入點,所以在spark2.0中,引入SparkSession作為DataSet和DataFrame API的切入點,SparkSession封裝了SparkConf、SparkContext和SQLContext,為了向后兼容,SQLContext和HiveContext也被保存下來,
- SparkSession實質上是SQLContext和HiveContext的組合(未來可能還會加上StreamingContext),所以在SQLContext和HiveContext上可用的API在SparkSession上同樣是可以使用的,SparkSession內部封裝了sparkContext,所以計算實際上是由sparkContext完成的,在spark 2.x中不推薦使用SparkContext物件讀取資料,而是推薦SparkSession,
三、RDD、DataFrames和DataSet
1)三者關聯關系
DataFrame 和 DataSet 是 Spark SQL 提供的基于 RDD 的結構化資料抽象,它既有 RDD 不可變、磁區、存盤依賴關系等特性,又擁有類似于關系型資料庫的結構化資訊,所以,基于 DataFrame 和 DataSet API 開發出的程式會被自動優化,使得開發人員不需要操作底層的 RDD API 來進行手動優化,大大提升開發效率,但是 RDD API 對于非結構化的資料處理有獨特的優勢,比如文本流資料,而且更方便我們做底層的操作,


1)RDD
RDD(Resilient Distributed Dataset)叫做彈性分布式資料集,是Spark中最基本的資料抽象,它代表一個不可變、可磁區、里面的元素可并行計算的集合,RDD具有資料流模型的特點:自動容錯、位置感知性調度和可伸縮性,RDD允許用戶在執行多個查詢時顯式地將作業集快取在記憶體中,后續的查詢能夠重用作業集,這極大地提升了查詢速度,
1、核心概念
-
一組分片(Partition):即資料集的基本組成單位,對于RDD來說,每個分片都會被一個計算任務處理,并決定并行計算的粒度,用戶可以在創建RDD時指定RDD的分片個數,如果沒有指定,那么就會采用默認值,默認值就是程式所分配到的CPU Core的數目,
-
一個計算每個磁區的函式,Spark中RDD的計算是以分片為單位的,每個RDD都會實作compute函式以達到這個目的,compute函式會對迭代器進行復合,不需要保存每次計算的結果,
-
RDD之間的依賴關系:RDD的每次轉換都會生成一個新的RDD,所以RDD之間就會形成類似于流水線一樣的前后依賴關系,在部分磁區資料丟失時,Spark可以通過這個依賴關系重新計算丟失的磁區資料,而不是對RDD的所有磁區進行重新計算,
-
一個Partitioner:即RDD的分片函式,當前Spark中實作了兩種型別的分片函式,一個是基于哈希的HashPartitioner,另外一個是基于范圍的RangePartitioner,只有對于于key-value的RDD,才會有Partitioner,非key-value的RDD的Parititioner的值是None,Partitioner函式不但決定了RDD本身的分片數量,也決定了parent RDD Shuffle輸出時的分片數量,
-
一個串列:存盤存取每個Partition的優先位置(preferred location),對于一個HDFS檔案來說,這個串列保存的就是每個Partition所在的塊的位置,按照“移動資料不如移動計算”的理念,Spark在進行任務調度的時候,會盡可能地將計算任務分配到其所要處理資料塊的存盤位置,
2、RDD簡單操作
啟動spark-shell,其實spark-shell低層也是呼叫spark-submit,首先需要配置好,當然也可以寫在命令列,但是不推薦,配置如下,僅供參考(這里使用yarn模式):
$ cat spark-defaults.conf

啟動spark-shell(下面會詳解講解)
$ spark-shell

【問題】發現有個WARN:WARN yarn.Client: Neither spark.yarn.jars nor spark.yarn.archive is set, falling back to uploading libraries under SPARK_HOME.
【原因】是因為Spark提交任務到yarn集群,需要上傳相關spark的jar包到HDFS,
【解決】 提前上傳到HDFS集群,并且在Spark組態檔指定檔案路徑,就可以避免每次提交任務到Yarn都需要重復上傳檔案,下面是解決的具體操作步驟:
### 打包jars,jar相關的引數說明
#-c 創建一個jar包
# -t 顯示jar中的內容串列
#-x 解壓jar包
#-u 添加檔案到jar包中
#-f 指定jar包的檔案名
#-v 生成詳細的報造,并輸出至標準設備
#-m 指定manifest.mf檔案.(manifest.mf檔案中可以對jar包及其中的內容作一些一設定)
#-0 產生jar包時不對其中的內容進行壓縮處理
#-M 不產生所有檔案的清單檔案(Manifest.mf),這個引數與忽略掉-m引數的設定
#-i 為指定的jar檔案創建索引檔案
#-C 表示轉到相應的目錄下執行jar命令,相當于cd到那個目錄,然后不帶-C執行jar命令
$ cd /opt/bigdata/hadoop/server/spark-3.2.0-bin-hadoop3.2
$ jar cv0f spark-libs.jar -C ./jars/ .
$ ll
### 在hdfs上創建存放jar包目錄
$ hdfs dfs -mkdir -p /spark/jars
## 上傳jars到HDFS
$ hdfs dfs -put spark-libs.jar /spark/jars/
## 增加配置spark-defaults.conf
spark.yarn.archive=hdfs:///spark/jars/spark-libs.jar
然后再啟動spark-shell
在Spark Shell中,有一個專有的SparkContext已經為您創建好了,變數名叫做sc,自己創建的SparkContext將無法作業,
$ spark-shell

### 由一個已經存在的Scala集合創建,
val array = Array(1,2,3,4,5)
# spark使用parallelize方法創建RDD
val rdd = sc.parallelize(array)

這里只是簡單的創建RDD操作,后面會有更多RDD相關的演示操作,
3、RDD API
Spark支持兩個型別(算子)操作:Transformation和Action
1)Transformation
主要做的是就是將一個已有的RDD生成另外一個RDD,Transformation具有lazy特性(延遲加載),Transformation算子的代碼不會真正被執行,只有當我們的程式里面遇到一個action算子的時候,代碼才會真正的被執行,這種設計讓Spark更加有效率地運行,
常用的Transformation:
| 轉換 | 含義 |
|---|---|
| map(func) | 回傳一個新的RDD,該RDD由每一個輸入元素經過func函式轉換后組成 |
| filter(func) | 回傳一個新的RDD,該RDD由經過func函式計算后回傳值為true的輸入元素組成 |
| flatMap(func) | 類似于map,但是每一個輸入元素可以被映射為0或多個輸出元素(所以func應該回傳一個序列,而不是單一元素) |
| mapPartitions(func) | 類似于map,但獨立地在RDD的每一個分片上運行,因此在型別為T的RDD上運行時,func的函式型別必須是Iterator[T] => Iterator[U] |
| mapPartitionsWithIndex(func) | 類似于mapPartitions,但func帶有一個整數引數表示分片的索引值,因此在型別為T的RDD上運行時,func的函式型別必須是(Int, Interator[T]) => Iterator[U] |
| sample(withReplacement, fraction, seed) | 根據fraction指定的比例對資料進行采樣,可以選擇是否使用亂數進行替換,seed用于指定亂數生成器種子 |
| union(otherDataset) | 對源RDD和引數RDD求并集后回傳一個新的RDD |
| intersection(otherDataset) | 對源RDD和引數RDD求交集后回傳一個新的RDD |
| distinct([numTasks])) | 對源RDD進行去重后回傳一個新的RDD |
| groupByKey([numTasks]) | 在一個(K,V)的RDD上呼叫,回傳一個(K, Iterator[V])的RDD |
| reduceByKey(func, [numTasks]) | 在一個(K,V)的RDD上呼叫,回傳一個(K,V)的RDD,使用指定的reduce函式,將相同key的值聚合到一起,與groupByKey類似,reduce任務的個數可以通過第二個可選的引數來設定 |
| aggregateByKey(zeroValue)(seqOp, combOp, [numTasks]) | 先按磁區聚合 再總的聚合 每次要跟初始值交流 例如:aggregateByKey(0)(+,+) 對k/y的RDD進行操作 |
| sortByKey([ascending], [numTasks]) | 在一個(K,V)的RDD上呼叫,K必須實作Ordered介面,回傳一個按照key進行排序的(K,V)的RDD |
| sortBy(func,[ascending], [numTasks]) | 與sortByKey類似,但是更靈活 第一個引數是根據什么排序 第二個是怎么排序 false倒序 第三個排序后磁區數 默認與原RDD一樣 |
| join(otherDataset, [numTasks]) | 在型別為(K,V)和(K,W)的RDD上呼叫,回傳一個相同key對應的所有元素對在一起的(K,(V,W))的RDD 相當于內連接(求交集) |
| cogroup(otherDataset, [numTasks]) | 在型別為(K,V)和(K,W)的RDD上呼叫,回傳一個(K,(Iterable |
| cartesian(otherDataset) | 兩個RDD的笛卡爾積 的成很多個K/V |
| pipe(command, [envVars]) | 呼叫外部程式 |
| coalesce(numPartitions) | 重新磁區 第一個引數是要分多少區,第二個引數是否shuffle 默認false 少磁區變多磁區 true 多磁區變少磁區 false |
| repartition(numPartitions) | |
| 重新磁區 必須shuffle 引數是要分多少區 少變多 | |
| repartitionAndSortWithinPartitions(partitioner) | 重新磁區+排序 比先磁區再排序效率高 對K/V的RDD進行操作 |
| foldByKey(zeroValue)(seqOp) | 該函式用于K/V做折疊,合并處理 ,與aggregate類似 第一個括號的引數應用于每個V值 第二括號函式是聚合例如:+ |
| combineByKey | 合并相同的key的值 rdd1.combineByKey(x => x, (a: Int, b: Int) => a + b, (m: Int, n: Int) => m + n) |
| partitionBy(partitioner) | 對RDD進行磁區 partitioner是磁區器 例如new HashPartition(2) |
| cache/persist | RDD快取,可以避免重復計算從而減少時間,區別:cache內部呼叫了persist算子,cache默認就一個快取級別MEMORY-ONLY ,而persist則可以選擇快取級別 |
| Subtract(rdd) | 回傳前rdd元素不在后rdd的rdd |
| leftOuterJoin | leftOuterJoin類似于SQL中的左外關聯left outer join,回傳結果以前面的RDD為主,關聯不上的記錄為空,只能用于兩個RDD之間的關聯,如果要多個RDD關聯,多關聯幾次即可, |
| rightOuterJoin | rightOuterJoin類似于SQL中的有外關聯right outer join,回傳結果以引數中的RDD為主,關聯不上的記錄為空,只能用于兩個RDD之間的關聯,如果要多個RDD關聯,多關聯幾次即可 |
| subtractByKey | substractByKey和基本轉換操作中的subtract類似只不過這里是針對K的,回傳在主RDD中出現,并且不在otherRDD中出現的元素 |
2)Action
觸發代碼的運行,我們一段spark代碼里面至少需要有一個action操作,
常用的Action:
| 動作 | 含義 |
|---|---|
| reduce(func) | 通過func函式聚集RDD中的所有元素,這個功能必須是課交換且可并聯的 |
| collect() | 在驅動程式中,以陣列的形式回傳資料集的所有元素 |
| count() | 回傳RDD的元素個數 |
| first() | 回傳RDD的第一個元素(類似于take(1)) |
| take(n) | 回傳一個由資料集的前n個元素組成的陣列 |
| takeSample(withReplacement,num, [seed]) | 回傳一個陣列,該陣列由從資料集中隨機采樣的num個元素組成,可以選擇是否用亂數替換不足的部分,seed用于指定亂數生成器種子 |
| takeOrdered(n, [ordering]) | 回傳原RDD排序(默認升序排)后,前n個元素組成的陣列 |
| saveAsTextFile(path) | 將資料集的元素以textfile的形式保存到HDFS檔案系統或者其他支持的檔案系統,對于每個元素,Spark將會呼叫toString方法,將它裝換為檔案中的文本 |
| saveAsSequenceFile(path) | 將資料集中的元素以Hadoop sequencefile的格式保存到指定的目錄下,可以使HDFS或者其他Hadoop支持的檔案系統, |
| saveAsObjectFile(path) | saveAsObjectFile用于將RDD中的元素序列化成物件,存盤到檔案中,使用方法和saveAsTextFile類似 |
| countByKey() | 針對(K,V)型別的RDD,回傳一個(K,Int)的map,表示每一個key對應的元素個數, |
| foreach(func) | 在資料集的每一個元素上,運行函式func進行更新, |
| aggregate | 先對磁區進行操作,在總體操作 |
| reduceByKeyLocally | 回傳一個 dict 物件,同樣是將同 key 的元素進行聚合 |
| lookup | lookup用于(K,V)型別的RDD,指定K值,回傳RDD中該K對應的所有V值, |
| top | top函式用于從RDD中,按照默認(降序)或者指定的排序規則,回傳前num個元素, |
| fold | fold是aggregate的簡化,將aggregate中的seqOp和combOp使用同一個函式op, |
| foreachPartition | 遍歷原RDD元素經過func函式運算過后的結果集,foreachPartition算子磁區操作 |
4、實戰操作
1、針對各個元素的轉化操作
我們最常用的轉化操作應該是map() 和filter(),轉化操作map() 接收一個函式,把這個函式用于RDD 中的每個元素,將函式的回傳結果作為結果RDD 中對應元素的值,而轉化操作filter() 則接收一個函式,并將RDD 中滿足該函式的元素放入新的RDD 中回傳,
讓我們看一個簡單的例子,用map() 對RDD 中的所有數求平方
# 通過parallelize創建RDD物件
val input = sc.parallelize(List(1, 2, 3, 4))
val result = input.map(x => x * x)
println(result.collect().mkString(","))

2、對一個資料為{1,2,3,3}的RDD進行基本的RDD轉化操作(去重)
var rdd = sc.parallelize(List(1,2,3,3))
rdd.distinct().collect().mkString(",")

3、對資料分別為{1,2,3}和{3,4,5}的RDD進行針對兩個RDD的轉化操作
var rdd = sc.parallelize(List(1,2,3))
var other = sc.parallelize(List(3,4,5))
# 生成一個包含兩個RDD中所有元素的RDD
rdd.union(other).collect().mkString(",")
# 求兩個RDD共同的元素RDD
rdd.intersection(other).collect().mkString(",")

4、行動操作
行動操作reduce(),它接收一個函式作為引數,這個函式要操作兩個RDD 的元素型別的資料并回傳一個同樣型別的新元素,一個簡單的例子就是函式+,可以用它來對我們的RDD 進行累加,使用reduce(),可以很方便地計算出RDD中所有元素的總和、元素的個數,以及其他型別的聚合操作,
var rdd = sc.parallelize(List(1,2,3,4,5,6,7))
# 求和
var sum = rdd.reduce((x, y) => x + y)
# 求元素個數
var sum = rdd.count()
# 聚合操作
var rdd = sc.parallelize(List(1,2,3,4,5,6,7))
var result = rdd.aggregate((0,0))((acc,value) => (acc._1 + value,acc._2 + 1),(acc1,acc2) => (acc1._1 + acc2._1 , acc1._2 + acc2._2))
var avg = result._1/result._2.toDouble

這里只是演示幾個簡單的示例,更多RDD的操作,可以參考官方檔案學習哦,
2)DataFrames
在Spark中,DataFrame提供了一個領域特定語言(DSL)和SQL來操作結構化資料,DataFrame是一種以RDD為基礎的分布式資料集,類似于傳統資料庫中的二維表格,

- RDD,由于無從得知所存資料元素的具體內部結構,Spark Core只能在stage層面進行簡單、通用的流水線優化,
- DataFrame底層是以RDD為基礎的分布式資料集,和RDD的主要區別的是:RDD中沒有schema資訊,而DataFrame中資料每一行都包含schema,DataFrame = RDD + shcema
1、DSL風格語法操作
1)DataFrame創建
創建DataFrame的兩種基本方式:
- 已存在的RDD呼叫toDF()方法轉換得到DataFrame,
- 通過Spark讀取資料源直接創建DataFrame,
直接創建DataFarme物件
若使用SparkSession方式創建DataFrame,可以使用spark.read從不同型別的檔案中加載資料創建DataFrame,spark.read的具體操作,如下所示,
| 方法名 | 描述 |
|---|---|
| spark.read.text(“people.txt”) | 讀取txt格式檔案,創建DataFrame |
| spark.read.csv (“people.csv”) | 讀取csv格式檔案,創建DataFrame |
| spark.read.text(“people.json”) | 讀取json格式檔案,創建DataFrame |
| spark.read.text(“people.parquet”) | 讀取parquet格式檔案,創建DataFrame |
1、在本地創建一個person.txt文本檔案,用于讀取:運行spark-shell:
# person.txt,Name,Age,Height
p1_name,18,165
p2_name,19,170
p3_name,20,188
p4_name,21,190
# 啟動spark shell,默認會創建一個spark名稱的spark session物件
$ spark-shell
# 定義變數,【注意】所有節點都得創建這個person檔案,要不然調度沒有這個檔案的機器會報錯
var inputFile = "file:///opt/bigdata/hadoop/server/spark-3.2.0-bin-hadoop3.2/test/person.txt"
# 讀取本地檔案
val personDF = spark.read.text("file:///opt/bigdata/hadoop/server/spark-3.2.0-bin-hadoop3.2/test/person.txt")
val personDF = spark.read.text(inputFile)
# 顯示
personDF.show()
# 將檔案put到hdfs上
# 讀取hdfs檔案(推薦)
val psersonDF = spark.read.text("hdfs:///person.txt")

2、有RDD轉換成DataFrame
| 動作 | 含義 |
|---|---|
| show() | 查看DataFrame中的具體內容資訊 |
| printSchema() | 查看DataFrame的Schema資訊 |
| select() | 查看DataFrame中選取部分列的資料及進行重命名 |
| filter() | 實作條件查詢,過濾出想要的結果 |
| groupBy() | 對記錄進行分組 |
| sort() | 對特定欄位進行排序操作 |
| toDF() | 把RDD資料型別轉成DataFarme |
# 讀取文本檔案,按逗號分割開來
val lineRDD = sc.textFile("hdfs:///person.txt").map(_.split(","))
case class Person(name:String, age:Int, height:Int)
# 按照樣式類對RDD資料進行分割成map
val personRDD = lineRDD.map(x => Person(x(0).toString, x(1).toInt, x(2).toInt))
# 把RDD資料型別轉成DataFarme
val personDF = personRDD.toDF()
# 查看這個表
personDF.show()
# 查看Schema資料
personDF.printSchema()
# 查看列
personDF.select(personDF.col("name")).show
# 過濾年齡小于25的
personDF.filter(col("age") >= 25).show


這里提供常用的spark dataframe方法:
| 方法名 | 含義 |
|---|---|
| collect() | 回傳值是一個陣列,回傳dataframe集合所有的行 |
| collectAsList() | 回傳值是一個java型別的陣列,回傳dataframe集合所有的行 |
| count() | 回傳一個number型別的,回傳dataframe集合的行數 |
| describe(cols: String*) | 回傳一個通過數學計算的類表值(count, mean, stddev, min, and max),這個可以傳多個引數,中間用逗號分隔,如果有欄位為空,那么不參與運算,只這對數值型別的欄位,例如df.describe("age", "height").show() |
| first() | 回傳第一行 ,型別是row型別 |
| head() | 回傳第一行 ,型別是row型別 |
| head(n:Int) | 回傳n行 ,型別是row 型別 |
| show() | 回傳dataframe集合的值 默認是20行,回傳型別是unit |
| show(n:Int) | 回傳n行,回傳值型別是unit |
| table(n:Int) | 回傳n行 ,型別是row 型別 |
| cache() | 同步資料的記憶體 |
| columns | 回傳一個string型別的陣列,回傳值是所有列的名字 |
| dtypes | 回傳一個string型別的二維陣列,回傳值是所有列的名字以及型別 |
| explan() | 列印執行計劃 物理的 |
| explain(n:Boolean) | 輸入值為 false 或者true ,回傳值是unit 默認是false ,如果輸入true 將會列印 邏輯的和物理的 |
| isLocal | 回傳值是Boolean型別,如果允許模式是local回傳true 否則回傳false |
| persist(newlevel:StorageLevel) | 回傳一個dataframe.this.type 輸入存盤模型型別 |
| printSchema() | 列印出欄位名稱和型別 按照樹狀結構來列印 |
| registerTempTable(tablename:String) | 回傳Unit ,將df的物件只放在一張表里面,這個表隨著物件的洗掉而洗掉了 |
| schema | 回傳structType 型別,將欄位名稱和型別按照結構體型別回傳 |
| toDF() | 回傳一個新的dataframe型別的 |
| toDF(colnames:String*) | 將引數中的幾個欄位回傳一個新的dataframe型別的 |
| unpersist() | 回傳dataframe.this.type 型別,去除模式中的資料 |
| unpersist(blocking:Boolean) | 回傳dataframe.this.type型別 true 和unpersist是一樣的作用false 是去除RDD |
| agg(expers:column*) | 回傳dataframe型別 ,同數學計算求值 |
| agg(exprs: Map[String, String]) | 回傳dataframe型別 ,同數學計算求值 map型別的 |
| agg(aggExpr: (String, String), aggExprs: (String, String)*) | 回傳dataframe型別 ,同數學計算求值 |
| apply(colName: String) | 回傳column型別,捕獲輸入進去列的物件 |
| as(alias: String) | 回傳一個新的dataframe型別,就是原來的一個別名 |
| col(colName: String) | 回傳column型別,捕獲輸入進去列的物件 |
| cube(col1: String, cols: String*) | 回傳一個GroupedData型別,根據某些欄位來匯總 |
| distinct | 去重 回傳一個dataframe型別 |
| drop(col: Column) | 洗掉某列 回傳dataframe型別 |
| dropDuplicates(colNames: Array[String]) | 洗掉相同的列 回傳一個dataframe |
| except(other: DataFrame) | 回傳一個dataframe,回傳在當前集合存在的在其他集合不存在的 |
| filter(conditionExpr: String) | 刷選部分資料,回傳dataframe型別 |
| groupBy(col1: String, cols: String*) | 根據某寫欄位來匯總回傳groupedate型別 |
| intersect(other: DataFrame) | 回傳一個dataframe,在2個dataframe都存在的元素 |
| join(right: DataFrame, joinExprs: Column, joinType: String) | 一個是關聯的dataframe,第二個關聯的條件,第三個關聯的型別:inner, outer, left_outer, right_outer, leftsemi |
| limit(n: Int) | 回傳dataframe型別 去n 條資料出來 |
| orderBy(sortExprs: Column*) | 做alise排序 |
| sort(sortExprs: Column*) | 排序 df.sort(df("age").desc).show(); 默認是asc |
| select(cols:string*) | dataframe 做欄位的刷選 df.select($"colA", $"colB" + 1) |
| withColumnRenamed(existingName: String, newName: String) | 修改串列 df.withColumnRenamed("name","names").show(); |
| withColumn(colName: String, col: Column) | 增加一列 df.withColumn("aa",df("name")).show(); |
這里已經列出了很多常用方法了,基本上涵蓋了大部分操作,當然也可以參考官方檔案
2、SQL風格語法操作
DataFrame的一個強大之處就是我們可以將它看作是一個關系型資料表,然后可以通過在程式中使用spark.sql() 來執行SQL查詢,結果將作為一個DataFrame回傳,因為spark session包含了Hive Context,所以spark.sql() 會自動啟動連接hive,默認模式就是hive里的local模式(內嵌derby)
啟動spark-shell
$ spark-shell
會在執行spark-shell當前目錄下生成兩個檔案:derby.log,metastore_db

接下來就可以happy的寫sql了,這里就演示幾個命令,跟之前的hive一樣,把sql陳述句放在spark.sql()方法里執行即可,不清楚hive sql的可以參考我之前的文章:大資料Hadoop之——資料倉庫Hive
# 有個默認default庫
$ spark.sql("show databases").show
# 默認當前庫是default
$ spark.sql("show tables").show

通過spark-sql啟動spark shell
操作就更像sql語法了,已經跟hive差不多了,接下來演示幾個命令,大家就很清楚了,
$ spark-sql
show databases;
create database test007
同樣也會在當前目錄下自動創建兩個檔案:derby.log,metastore_db

3)DataSet
DataSet是分布式的資料集合,Dataset提供了強型別支持,也是在RDD的每行資料加了型別約束,DataSet是在Spark1.6中添加的新的介面,它集中了RDD的優點(強型別和可以用強大lambda函式)以及使用了Spark SQL優化的執行引擎,DataSet可以通過JVM的物件進行構建,可以用函式式的轉換(map/flatmap/filter)進行多種操作,
1、通過spark.createDataset通過集合進行創建dataSet
val ds1 = spark.createDataset(1 to 10)
ds1.show

2、從已經存在的rdd當中構建dataSet
官方檔案
val ds2 = spark.createDataset(sc.textFile("hdfs:////person.txt"))

3、通過樣例類配合創建DataSet
case class Person(name:String,age:Int)
val personDataList = List(Person("zhangsan",18),Person("lisi",28))
val personDS = personDataList.toDS
personDS.show

4、通過DataFrame轉化生成
Music.json檔案內容如下:
{"name":"上海灘","singer":"葉麗儀","album":"香港電視劇主題歌","path":"mp3/shanghaitan.mp3"}
{"name":"一生何求","singer":"陳百強","album":"香港電視劇主題歌","path":"mp3/shanghaitan.mp3"}
{"name":"紅日","singer":"李克勤","album":"懷舊專輯","path":"mp3/shanghaitan.mp3"}
{"name":"愛如潮水","singer":"張信哲","album":"懷舊專輯","path":"mp3/airucaoshun.mp3"}
{"name":"紅茶館","singer":"陳惠嫻","album":"懷舊專輯","path":"mp3/redteabar.mp3"}
case class Music(name:String,singer:String,album:String,path:String)
# 注意把test.json傳到hdfs上
val jsonDF = spark.read.json("hdfs:///Music.json")
val jsonDS = jsonDF.as[Music]
jsonDS.show

RDD,DataFrame,DataSet互相轉化

四、RDD、DataFrame和DataSet的共性與區別

-
RDD[Person]:以Person為型別引數,但不了解 其內部結構,
-
DataFrame:提供了詳細的結構資訊schema(結構)列的名稱和型別,這樣看起來就像一張表了
-
DataSet[Person]:不光有schema(結構)資訊,還有型別資訊
1)共性
- 三者都是spark平臺下的分布式彈性資料集,為處理超大型資料提供便利
- 三者都有惰性機制,在創建時、轉換時(如map)不會立即執行,只有在遇到action算子的時候(比如foreach),才開始進行觸發計算,極端情況下,如果代碼中只有創建、轉換,但是沒有在后面的action中使用對應的結果,在執行時會被跳過,
- 三者都有partition的概念,都有快取(cache)的操作,還可以進行檢查點操作(checkpoint)
- 三者都有許多共同的函式(如map、filter,sorted等等),
在對DataFrame和DataSet操作的時候,大多數情況下需要引入隱式轉換(ssc.implicits._)
2)區別
- DataFrame:DataFrame是DataSet的特例,也就是說DataSet[Row]的別名;DataFrame = RDD + schema
- DataFrame的每一行的固定型別為Row,只有通過決議才能獲得各個欄位的值
- DataFrame與DataSet通常與spark ml同時使用
- DataFrame與DataSet均支持sparkSql操作,比如select,groupby等,也可以注冊成臨時表,進行sql陳述句操作
- DataFrame與DateSet支持一些方便的保存方式,比如csv,可以帶上表頭,這樣每一列的欄位名就可以一目了然
- DataSet:DataSet = RDD + case class
- DataSet與DataFrame擁有相同的成員函式,區別只是只是每一行的資料型別不同,
- DataSet的每一行都是case class,在自定義case class之后可以很方便的獲取每一行的資訊
五、spark-shell
Spark的shell作為一個強大的互動式資料分析工具,提供了一個簡單的方式學習API,它可以使用Scala(在Java虛擬機上運行現有的Java庫的一個很好方式)或Python,spark-shell的本質是在后臺呼叫了spark-submit腳本來啟動應用程式的,在spark-shell中會創建了一個名為sc的SparkContext物件,
【注】spark-shell只能以client方式啟動,
查看幫助
$ spark-shell --help

spark-shell常用選項
--master MASTER_URL 指定模式(spark://host:port, mesos://host:port, yarn,
k8s://https://host:port, or local (Default: local[*]))
--executor-memory MEM 指定每個Executor的記憶體,默認1GB
--total-executor-cores NUM 指定所有Executor所占的核數
--num-executors NUM 指定Executor的個數
--help, -h 顯示幫助資訊
--version 顯示版本號
從上面幫助看,spark有五種運行模式:spark、mesos、yarn、k8s、local,這里主要講local和yarn模式
| Master URL | 含義 |
|---|---|
| local | 在本地運行,只有一個作業行程,無并行計算能力 |
| local[K] | 在本地運行,有 K 個作業行程,通常設定 K 為機器的CPU 核心數量 |
| local[*] | 在本地運行,作業行程數量等于機器的 CPU 核心數量, |
| spark://HOST:PORT | 以 Standalone 模式運行,這是 Spark 自身提供的集群運行模式,默認埠號: 7077 |
| mesos://HOST:PORT | 在 Mesos 集群上運行,Driver 行程和 Worker 行程運行在 Mesos 集群上,部署模式必須使用固定值:--deploy-mode cluster |
| yarn | 在yarn集群上運行,依賴于hadoop集群,yarn資源調度框架,將應用提交給yarn,在ApplactionMaster(相當于Stand alone模式中的Master)中運行driver,在集群上調度資源,開啟excutor執行任務, |
| k8s | 在k8s集群上運行 |
1)local
在Spark Shell中,有一個專有的SparkContext已經為您創建好了,變數名叫做sc,自己創建的SparkContext將無法作業,可以用--master引數來設定SparkContext要連接的集群,用--jars來設定需要添加到CLASSPATH的jar包,如果有多個jar包,可以使用逗號分隔符連接它們,例如,在一個擁有2核的環境上運行spark-shell,使用:
#資源存盤的位置,默認為本地,以及使用什么調度框架 ,默認使用的是spark內置的資源管理和調度框架Standalone
# local單機版,只占用一個執行緒,local[*]占用當前所有執行緒,local[2]:2個CPU核運行
$ spark-shell --master local[2]
# --master 默認為 local[*]
#默認使用集群最大的記憶體大小
--executor-memorty
#默認使用最大核數
--total-executor-cores
$ spark-shell --master local[*] --executor-memory 1g --total-executor-cores 1

Web UI地址:http://hadoop-node1:4040

隨后,就可以使用spark-shell內使用Scala語言完成一定的操作,這里做幾個簡單的操作,有興趣的話,可以自行去了解scala
val textFile = sc.textFile("file:///opt/bigdata/hadoop/server/spark-3.2.0-bin-hadoop3.2/README.md")
textFile.count()
textFile.first()

其中,count代表RDD中的總資料條數;first代表RDD中的第一行資料,
2)on Yarn(推薦)
# on yarn,也可以在組態檔中修改這個欄位spark.master
$ spark-shell --master yarn
--master用來設定context將要連接并使用的資源主節點,master的值是standalone模式中spark的集群地址、yarn或mesos集群的URL,或是一個local地址,
六、SparkSQL和Hive的集成(Spark on Hive)
1)創建軟鏈接
$ ln -s /opt/bigdata/hadoop/server/apache-hive-3.1.2-bin/conf/hive-site.xml /opt/bigdata/hadoop/server/spark-3.2.0-bin-hadoop3.2/conf/hive-site.xml
2)復制 hive lib目錄 下的mysql連接jar包到spark的jars下
$ cp /opt/bigdata/hadoop/server/apache-hive-3.1.2-bin/lib/mysql-connector-java-5.1.49-bin.jar /opt/bigdata/hadoop/server/spark-3.2.0-bin-hadoop3.2/jars/
3)配置
# 創建spark日志在hdfs存盤目錄
$ hadoop fs -mkdir -p /tmp/spark
$ cd /opt/bigdata/hadoop/server/spark-3.2.0-bin-hadoop3.2/conf
$ cp spark-defaults.conf.template spark-defaults.conf
在spark-defaults.conf追加如下配置:
# 使用yarn模式
spark.master yarn
spark.eventLog.enabled true
spark.eventLog.dir hdfs://hadoop-node1:8082/tmp/spark
spark.serializer org.apache.spark.serializer.KryoSerializer
spark.driver.memory 512m
spark.executor.extraJavaOptions -XX:+PrintGCDetails -Dkey=value -Dnumbers="one two three"
4)啟動 spark-shell操作Hive(local)
支持多用戶得啟動metastore服務
$ nohup hive --service metastore &
$ ss -atnlp|grep 9083
在hive-site.xml加入如下配置:
<property>
<name>hive.metastore.uris</name>
<value>thrift://hadoop-node1:9083</value>
</property>
啟動spark-sql
# yarn模式,--master yarn可以不帶,因為上面在組態檔里已經配置了yarn模式了
$ spark-sql --master yarn
show databases;

從上圖就可發現,已經查到我之前創建的庫了,說明已經集成ok了,
七、Spark beeline
Spark Thrift Server 是 Spark 社區基于 HiveServer2 實作的一個 Thrift 服務,旨在無縫兼容
HiveServer2,因為 Spark Thrift Server 的介面和協議都和 HiveServer2 完全一致,因此我們部署好Spark Thrift Server后,可以直接使用hive的beeline訪問Spark Thrift Server執行相關陳述句,Spark Thrift Server 的目的也只是取代 HiveServer2,因此它依舊可以和 Hive Metastore進行互動,獲取到 hive 的元資料,
1)Spark Thrift Server架構于HiveServer2架構對比

2)Spark Thrift Server和HiveServer2的區別
| Hive on Spark | Spark Thrift Server | |
|---|---|---|
| 任務提交模式 | 每個session都會創建一個RemoteDriver,也就是對于一個Application,之后將sql決議成執行的物理計劃序列化后發到RemoteDriver執行 | 本身的Server服務就是一個Driver,直接接收sql執行,也就是所有的session都共享一個Application |
| 性能 | 性能一般 | 如果存盤格式是orc或者parquet,性能會比hive高幾倍,某些陳述句甚至會高幾十倍,其他格式的話,性能相差不是很大,有時hive性能會更好 |
| 并發 | 如果任務執行不是異步的,就是在thrift的worker執行緒中執行,受worker執行緒數量的限制,異步的話則放到執行緒池執行,并發度受異步執行緒池大小限制, | 處理任務的模式和Hive一樣, |
| sql兼容 | 主要支持ANSI SQL 2003,但并不完全遵守,只是大部分支持,并擴展了很多自己的語法 | Spark SQL也有自己的實作標準,因此和hive不會完全兼容,具體哪些陳述句會不兼容需要測驗才能知道 |
| HA | 可以通過zk實作HA | 沒有內置的HA實作,不過spark社區提了一個issue并帶上了patch,可以拿來用:https://issues.apache.org/jira/browse/SPARK-11100 |
【總結】Spark Thrift Server說白了就是小小的改動了下HiveServer2,代碼量也不多,雖然介面和HiveServer2完全一致,但是它以單個Application在集群運行的方式還是比較奇葩的,可能官方也是為了實作簡單而沒有再去做更多的優化,
3)配置啟動Spark Thrift Server
1、配置hive-site.xml
<!-- hs2埠 -->
<property>
<name>hive.server2.thrift.port</name>
<value>11000</value>
</property>
2、啟動spark thriftserver服務(不能起hs2,因為配置是一樣的,會有沖突)
$ cd /opt/bigdata/hadoop/server/spark-3.2.0-bin-hadoop3.2/sbin
$ ./start-thriftserver.sh
$ ss -tanlp|grep 11000

3、啟動beeline操作
# 為了和hive的區別,這里使用絕對路徑啟動
$ cd /opt/bigdata/hadoop/server/spark-3.2.0-bin-hadoop3.2/bin
# 操作跟hive操作一模一樣,只是計算引擎不一樣了,換成了spark了
$ ./beeline
!connect jdbc:hive2://hadoop-node1:11000
show databases;

訪問HDFS WEB UI:http://hadoop-node1:8088/cluster/apps



八、Spark Streaming
Spark Streaming與其他大資料框架Storm、Flink一樣,Spark Streaming是基于Spark Core基礎之上用于處理實時計算業務的框架,其實作就是把輸入的流資料進行按時間切分,切分的資料塊用離線批處理的方式進行并行計算處理,原理如下圖:

支持多種資料源獲取資料:

Spark處理的是批量的資料(離線資料),Spark Streaming實際上處理并不是像Strom一樣來一條處理一條資料,而是將接收到的實時流資料,按照一定時間間隔,對資料進行拆分,交給Spark Engine引擎,最終得到一批批的結果,

由于考慮到本篇文章篇幅太長,所以這里只是稍微提了一下,如果有時間會繼續補充Spark Streaming相關的知識點,請耐心等待……
官方檔案:https://spark.apache.org/docs/3.2.0/streaming-programming-guide.html

轉載請註明出處,本文鏈接:https://www.uj5u.com/shujuku/462967.html
標籤:其他
上一篇:Oracle備份與還原(實用版)
