import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.broadcast._
import scala.collection.mutable.ArrayBuffer
import java.io._

object SimpleApp {
  // Object-level field: it is assigned inside main(), which runs on the driver only.
  var broadcast1: Broadcast[ArrayBuffer[String]] = _
  val ip_grp_start = ArrayBuffer[String]()

  def matchword(s: (String, Int)): List[(String, Int)] = {
    // Append a debug line to a local file on whichever machine runs the task.
    val fw = new FileWriter("/home/hadoop/a.txt", true)
    val out = new PrintWriter(fw)
    // Reads the object-level broadcast1, which the question reports as null on the executors.
    out.println("11111111111111111" + broadcast1.value.length)
    out.close()
    List(s)
  }

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("Simple Application")
    val sc = new SparkContext(conf)
    val line = sc.textFile("wordcount.txt")
    ip_grp_start += "fsda"
    ip_grp_start += "dsfsf"
    broadcast1 = line.sparkContext.broadcast(ip_grp_start)
    val words = line.flatMap(line => line.split(" "))
    val wordpair = words.map(word => (word, 1))
    val word = wordpair.flatMap(x => matchword(x))
    val pair = word.reduceByKey(_ + _)
    pair.collect().foreach(println)
    sc.stop()
  }
}
Reply from a uj5u.com user:
17/02/19 15:07:46 INFO storage.BlockManagerInfo: Added broadcast_2_piece0 in memory on 192.168.1.2:56023 (size: 2.8 KB, free: 366.3 MB)
17/02/19 15:07:47 INFO storage.BlockManagerInfo: Added broadcast_0_piece0 in memory on 192.168.1.2:56023 (size: 20.4 KB, free: 366.3 MB)
17/02/19 15:07:48 WARN scheduler.TaskSetManager: Lost task 0.0 in stage 0.0 (TID 0, 192.168.1.2, executor 0): java.lang.NullPointerException
at SimpleApp$.matchword(SimpleApp.scala:20)
at SimpleApp$$anonfun$4.apply(SimpleApp.scala:38)
at SimpleApp$$anonfun$4.apply(SimpleApp.scala:38)
at scala.collection.Iterator$$anon$12.nextCur(Iterator.scala:434)
at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:440)
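Above is the null pointer error that gets reported. In effect, once the broadcast variable is passed into the closure function, its value is null.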
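Reply from a uj5u.com user:
This is a test example I wrote for a function that uses a broadcast variable. I don't know why the variable is null once it is passed in. If I drop the --master argument when submitting, the job runs in single-machine mode and no error is reported at all. Could an expert please explain?
Reply from a uj5u.com user: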
The reason is that your broadcast variable is defined at the class/object level. The object's fields are not shipped with the closure; each worker JVM initializes its own copy of the object, and main never runs there, so the worker sees only null instead of the value you assigned in the main method. Move the broadcast into the scope of the main method. You can then pass it to your method as an argument.
See the detailed example here:
https://github.com/databricks/learning-spark/blob/master/src/main/scala/com/oreilly/learningsparkexamples/scala/ChapterSixExample.scala#L75
val signPrefixes = sc.broadcast(loadCallSignTable())
val countryContactCounts = contactCounts.map { case (sign, count) =>
  val country = lookupInArray(sign, signPrefixes.value)
  (country, count)
}.reduceByKey((x, y) => x + y)
Note that the signPrefixes broadcast val is defined inside the main method, and that its value is passed to the lookupInArray method as an argument.
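Reply from a uj5u.com user:
I ran into this problem too and have not found a proper solution yet. My guess is that it is caused by the point in time at which the broadcast variable is sent to the executors: that is, broadcast1 has already been sent to the executors before the statement broadcast1 = line.sparkContext.broadcast(ip_grp_start) has run.
My workaround for now is to not define var broadcast1 at all and to write val broadcast1 = line.sparkContext.broadcast(ip_grp_start) instead,
then pass broadcast1 to the matchword function as a parameter: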
def matchword(s: (String, Int), broadcast1: Broadcast[ArrayBuffer[String]]): List[(String ,Int)]
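For reference, the workaround described above might look like the following complete program. This is only a minimal sketch, not the original poster's final code: the object name SimpleAppFixed, the helper keepWord, and the filtering rule (drop words that appear in the broadcast list) are hypothetical and only there to give the broadcast value something to do. The input file wordcount.txt and the two strings are taken from the question. An immutable Seq is used instead of the ArrayBuffer as a matter of preference.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.broadcast.Broadcast

object SimpleAppFixed {
  // Hypothetical pure helper: drops a pair whose word is in the exclusion list.
  def keepWord(s: (String, Int), excluded: Seq[String]): List[(String, Int)] =
    if (excluded.contains(s._1)) Nil else List(s)

  // The broadcast handle arrives as a parameter, so the closure that calls
  // matchword captures the handle itself instead of reading an object field.
  def matchword(s: (String, Int), broadcast1: Broadcast[Seq[String]]): List[(String, Int)] =
    keepWord(s, broadcast1.value)

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("Simple Application")
    val sc = new SparkContext(conf)

    // Local val inside main: it is captured by the closure below and
    // serialized to the executors together with the task.
    val broadcast1 = sc.broadcast(Seq("fsda", "dsfsf"))

    val pairs = sc.textFile("wordcount.txt")
      .flatMap(_.split(" "))
      .map(word => (word, 1))
      .flatMap(x => matchword(x, broadcast1))
      .reduceByKey(_ + _)

    pairs.collect().foreach(println)
    sc.stop()
  }
}
```

Because broadcast1 is now a local val, the lambda passed to flatMap holds a reference to the Broadcast handle, and that reference travels with the task. That would also explain why the original version works without --master: in local mode the tasks run in the same JVM as main, so the object-level var has already been assigned.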
