剛接觸spark,很多東西不太明白,現在的需求是這樣的,假設我在外部宣告了一個欄位,在map中對這個欄位進行了賦值,然后在reduce中對這個欄位進行取值操作。我以wordcount為例,
object WordCount {
var str:String=null
def main(args: Array[String]) {
if (args.length < 1) {
System.err.println("Usage: <file>")
System.exit(1)
}
val conf = new SparkConf()
val sc = new SparkContext(conf)
val line = sc.textFile(args(0))
val counts= line.flatMap(_.split(" ")).map(word=>{
str="welcome to spark"
(word, 1)
})
println(str)
val finalRdd= counts.reduceByKey((x,y)=>{
x+y
println(str)
}).collect().foreach(println)
sc.stop()
}
}
這是一個簡單的wordcount代碼,我只是宣告了一個全域變數str,并且在map中對str進行了賦值操作,我想知道為什么map和reduce之間的那個println陳述句列印出來的str是null,是不是因為我沒有進行action操作,map陳述句還沒有執行的原因么?我希望將map中的str的賦值的內容傳到reduce中去,在reduce中進行操作,請問有什么辦法嗎?
uj5u.com熱心網友回復:
你定義的str是本地的變數,不能夠被集群中其他節點共享使用,你可以通過broadcast(廣播)出去,使所有worker節點共享此變數使用。uj5u.com熱心網友回復:
據我所知廣播變數不能再map中定義,而我的值需要通過map生成,需要將值傳給廣播變數,在序列化的時候會出現問題。uj5u.com熱心網友回復:
你分析的是對的 因為沒有進行action的操作,所以RDD只是記憶了操作,而并沒有進行操作。import org.apache.spark.{SparkConf, SparkContext}
/**
* Created by mahuichao on 16/8/12.
*/
object Test04 {
var str: String = ""
def main(args: Array[String]): Unit = {
val conf = new SparkConf().setAppName("test04").setMaster("local[2]")
val sc = new SparkContext(conf)
sc.setLogLevel("FATAL")
val path = "/Users/mahuichao/Downloads/test.txt"
val file = sc.textFile(path)
file.flatMap(_.split(" ")).map { word =>
str = "hello the crude world"
(word, (1, str))
// (word,1)
}.reduceByKey {
case (x: (Int, String), y: (Int, String)) =>
println("I am the value of str:" + str)
(x._1 + y._1, str)
}.map { case (x, (y1, y2)) =>
(x, y1)
}.collect().foreach(println)
}
}
轉載請註明出處,本文鏈接:https://www.uj5u.com/qita/75852.html
標籤:Spark
下一篇:如何解決spark記憶體溢位問題
