RDD序列化
1. 閉包檢查
從計算的角度, 算子以外的代碼都是在Driver 端執行, 算子里面的代碼都是在 Executor端執行,那么在 scala 的函式式編程中,就會導致算子內經常會用到算子外的資料,這樣就形成了閉包的效果,如果使用的算子外的資料無法序列化,就意味著無法傳值給Executor端執行,就會發生錯誤,所以需要在執行任務計算前,檢測閉包內的物件是否可以進行序列化,這個操作我們稱之為閉包檢測,
注:Scala2.12 版本后閉包編譯方式發生了改變 ,
2. 序列化方法和屬性
從計算的角度, 算子以外的代碼都是在Driver 端執行, 算子里面的代碼都是在 Executor端執行,看如下代碼:
package com.atguigu.bigdata.spark.core.rdd.serial
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
object Spark01_RDD_Serial {
def main(args: Array[String]): Unit = {
//TODO 準備環境
val sparkConf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")
val sc = new SparkContext(sparkConf)
//3.創建一個 RDD
val rdd: RDD[String] = sc.makeRDD(Array("hello world", "hello spark", "hive", "atguigu"))
val search = new Search("h")
// search.getMatch1(rdd).collect().foreach(println)
search.getMatch2(rdd).collect().foreach(println)
//TODO 關倍訓境
sc.stop()
}
// class Search(query:String) extends Serializable {
class Search(query:String){
def isMatch(s: String): Boolean = {
s.contains(query)
}
// 函式序列化案例
def getMatch1 (rdd: RDD[String]): RDD[String] = {
rdd.filter(isMatch)
}
// 屬性序列化案例
def getMatch2(rdd: RDD[String]): RDD[String] = {
val s = query
rdd.filter(x => x.contains(s))// 對應第二個rdd
}
}
}
3. Kryo 序列化框架
Java 的序列化能夠序列化任何的類,但是比較重(位元組多),序列化后,物件的提交也比較大,Spark 出于性能的考慮,Spark2.0 開始支持另外一種Kryo 序列化機制,Kryo 速度是Serializable 的10 倍,當 RDD 在Shuffle 資料的時候,簡單資料型別、陣列和字串型別已經在Spark 內部使用 Kryo 來序列化,
注意:即使使用Kryo 序列化,也要繼承Serializable 介面,
package com.atguigu.bigdata.spark.core.rdd.serial
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
object serializable_Kryo {
def main(args: Array[String]): Unit = {
val conf: SparkConf = new SparkConf()
.setAppName("SerDemo")
.setMaster("local[*]")
// 替換默認的序列化機制
.set("spark.serializer",
"org.apache.spark.serializer.KryoSerializer")
// 注冊需要使用 kryo 序列化的自定義類
.registerKryoClasses(Array(classOf[Searcher]))
val sc = new SparkContext(conf)
val rdd: RDD[String] = sc.makeRDD(Array("hello world", "hello atguigu
"atguigu", "hahah"), 2)
val searcher = new Searcher("hello")
val result: RDD[String] = searcher.getMatchedRDD1(rdd)
result.collect.foreach(println)
}
}
case class Searcher(val query: String) {
def isMatch(s: String) = {
s.contains(query)
}
def getMatchedRDD1(rdd: RDD[String]) = {
rdd.filter(isMatch)
}
def getMatchedRDD2(rdd: RDD[String]) = {
val q = query
rdd.filter(_.contains(q))
}
}
轉載請註明出處,本文鏈接:https://www.uj5u.com/qita/295113.html
標籤:其他
上一篇:(2)大資料技術綜述總結
下一篇:Spark的RDD依賴關系
