RDD、DataFrame、DataSet 三者的關系
? Spark1.0 => RDD
? Spark1.3 => DataFrame
? Spark1.6 => Dataset
如果同樣的資料都給到這三個資料結構,他們分別計算之后,都會給出相同的結果,不同是的他們的執行效率和執行方式,在后期的 Spark版本DataSet 有可能會逐步取代RDD和DataFrame 成為唯一的API 介面,
三者的共性
? RDD、DataFrame、DataSet 全都是 spark 平臺下的分布式彈性資料集,為處理超大型資料提供便利;
? 三者都有惰性機制,在進行創建、轉換,如map 方法時,不會立即執行,只有在遇到Action 如 foreach 時,三者才會開始遍歷運算;
? 三者有許多共同的函式,如 filter,排序等;
? 在對DataFrame 和Dataset 進行操作許多操作都需要這個包:import spark.implicits._(在創建好 SparkSession 物件后盡量直接匯入)
? 三者都會根據 Spark 的記憶體情況自動快取運算,這樣即使資料量很大,也不用擔心會記憶體溢位
? 三者都有 partition 的概念
? DataFrame 和DataSet 均可使用模式匹配獲取各個欄位的值和型別
三者的區別
RDD
? RDD一般和 spark mllib 同時使用
? RDD不支持 sparksql 操作
DataFrame
? 與RDD和Dataset 不同,DataFrame 每一行的型別固定為Row,每一列的值沒法直接訪問,只有通過決議才能獲取各個欄位的值
? DataFrame 與DataSet 一般不與 spark mllib 同時使用
? DataFrame 與DataSet 均支持 SparkSQL 的操作,比如 select,groupby 之類,還能注冊臨時表/視窗,進行 sql 陳述句操作
? DataFrame 與DataSet 支持一些特別方便的保存方式,比如保存成 csv,可以帶上表頭,這樣每一列的欄位名一目了然
DataSet
? Dataset 和DataFrame 擁有完全相同的成員函式,區別只是每一行的資料型別不同,DataFrame 其實就是DataSet 的一個特例 typeDataFrame = Dataset[Row]
? DataFrame 也可以叫Dataset[Row],每一行的型別是Row,不決議,每一行究竟有哪些欄位,各個欄位又是什么型別都無從得知,只能用上面提到的 getAS 方法或者共性中的第七條提到的模式匹配拿出特定欄位,而Dataset 中,每一行是什么型別是不一定的,在自定義了case class 之后可以很自由的獲得每一行的資訊
RDD、DataFrame、DataSet 三者的轉換
DataFrame與DataSet轉換
package sparkSQL.study
import org.apache.spark.SparkConf
import org.apache.spark.sql.{Dataset, SparkSession}
object DF_DS {
def main(args: Array[String]): Unit = {
val conf = new SparkConf().setAppName("DataFrame_DateSet...").setMaster("local[*]")
val sparkSession = SparkSession.builder().config(conf).getOrCreate()
import sparkSession.implicits._
// TODO DataFrame <==> DataSet
val rdd = sparkSession.sparkContext.makeRDD(List(("zahngsan", 18), ("lisi", 20)))
val dataFrame = rdd.toDF("name", "age")
dataFrame.show()
val ds = dataFrame.as[UserDF_DS]
ds.show()
val dataFrame1 = ds.toDF()
dataFrame1.show()
sparkSession.stop()
}
case class UserDF_DS( name:String, age:Int)
}
DataFrame與RDD轉換
package sparkSQL.study
import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession
object DF_RDD {
def main(args: Array[String]): Unit = {
val conf = new SparkConf().setAppName("DataFrame_RDD...").setMaster("local[*]")
val sparkSession = SparkSession.builder().config(conf).getOrCreate()
import sparkSession.implicits._
// TODO DataFrame <==> RDD
val rdd = sparkSession.sparkContext.makeRDD(List(("zahngsan", 18), ("lisi", 20)))
val dataFrame = rdd.toDF("name", "age")
dataFrame.show()
val rdd1 = dataFrame.rdd
rdd1.collect().foreach(println)
sparkSession.stop()
}
}
RDD與DataSet轉換
package sparkSQL.study
import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession
object RDD_DS {
def main(args: Array[String]): Unit = {
val conf = new SparkConf().setAppName("DataSet_RDD...").setMaster("local[*]")
val sparkSession = SparkSession.builder().config(conf).getOrCreate()
import sparkSession.implicits._
// TODO RDD <==> DataSet
val rdd = sparkSession.sparkContext.makeRDD(List(("zahngsan", 18), ("lisi", 20)))
val ds = rdd.map {
case (name, age) => {
UserRDD_DS(name, age)
}
}.toDS()
ds.show()
val rdd1 = ds.rdd
rdd1.collect().foreach(println)
sparkSession.stop()
}
case class UserRDD_DS(name:String, age:Int)
}
所需依賴:
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_2.12</artifactId>
<version>3.0.1</version>
</dependency>
轉載請註明出處,本文鏈接:https://www.uj5u.com/qita/251721.html
標籤:其他
上一篇:Jmeter中添加JSR223 Sampler組件實作jmeter和python之間引數互相傳遞
下一篇:spring boot整合spark,集群模式或local模式運行,http請求呼叫spark API,啟動job任務配置、優化spark配置等
