最近研究用Spark做資料挖掘演算法實作,需要對RDD中的記錄抽樣,通過RDD的幾個方法 first(),sample(withReplacement: Boolean, fraction: Double, seed: Long = Utils.random.nextLong),take(num: Int),takeSample(withReplacement: Boolean, num: Int, seed: Long = Utils.random.nextLong)都能夠獲得指定的記錄,觀察它們的原始碼,發現最關鍵的部分都呼叫了SparkContext的方法runJob(......),于是有了下面的實驗,
進入spark-shell命令列環境
lj@ubuntu-1:~$ $SPARK_HOME/bin/spark-shell --master local
輸入以下代碼:
val conf = new SparkConf().setAppname("test").setMaster("local")
val sc = new SparkContext(conf)
val rdd = sc.parallelize(1 to 100)
def func(sc: SparkContext, rdd: RDD[Int]) {
var id1 = 0; var id2 = 0
for (i <- 1 to 2) {
id1 = 22; id2 = 79
var temp = sc.runJob(rdd, (ite: Iterator[Int]) => ite.toSeq.apply(id2-id1), Seq(0), true).apply(0)
print(temp+"\n")
}
}
func(sc, rdd)
將Spark的運行模式設定為"local",因此只有上述rdd只有一個磁區,代碼的目的是想將58(在所有元素中的序號為57=id2-id1)
顯示兩次,但是一運行就報錯"org.apache.spark.SparkException: Task not serializable",經過多次嘗試之后,將代碼片段
var temp = sc.runJob(rdd, (ite: Iterator[Int]) => ite.toSeq.apply(id2-id1), Seq(0), true).apply(0)
更改為
var id3 = id2 - id1
var temp = sc.runJob(rdd, (ite: Iterator[Int]) => ite.toSeq.apply(id3), Seq(0), true).apply(0)
之后,代碼就沒問題了,運行成功,實在想不通為什么,請高手賜教,不勝感激!
轉載請註明出處,本文鏈接:https://www.uj5u.com/qita/98757.html
標籤:Spark
