廣播變數里面的值是從redis 里面讀取出來的,當redis 里面的資料變化之后,我也要更新spark 的廣播變數,不知道有沒有大佬們有沒有處理過類似的問題呢?能否提供一些相關的處理方式?
uj5u.com熱心網友回復:
我自己先頂一下,目前這個問題百度了很多 broadcast.doUnpersist(true);方法 這個方法是再driver 端執行的,也就是說整個spark 程式只會執行一邊,當所有任務執行器再計算rdd 的時候都不會取執行這個broadcast.doUnpersist(true); 這個方法;所以我在想spark 又上面機制能讓執行器強制中斷,然后回到driver 端 執行我想執行的操作,然后執行器再恢復執行任務! 這句話可能表述的會違背sprak 的某些原則,但是我想要的類似的效果,不知道各位大牛們有沒有相關的處理經驗分享一下!
uj5u.com熱心網友回復:
經過實驗 sparkstreaming 使用廣播變數 在yarn模式下是回報空例外的,所以還是使用別的方案uj5u.com熱心網友回復:
可以在spark的driver端創建一個執行緒,定時呼叫sc.broadcast,測驗過,可用uj5u.com熱心網友回復:
在yarn 模式上會報空例外uj5u.com熱心網友回復:
在yarn模式上可以操作嗎?我試過會報空例外的哦!可不可以貼你成功的代碼瞧瞧uj5u.com熱心網友回復:
樓主這個問題解決了嗎?我遇到同樣的問題,嘗試用物件的方式實體化廣播變數,并且序列化廣播出去,還是不行,
uj5u.com熱心網友回復:
這個問題用廣播變數還是不行,后面使用的是全域變數uj5u.com熱心網友回復:
在yarn模式上可以操作嗎?我試過會報空例外的哦!可不可以貼你成功的代碼瞧瞧 可以在spark的driver端創建一個執行緒,定時呼叫sc.broadcast,測驗過,可用
樓主這個問題解決了嗎?我遇到同樣的問題,嘗試用物件的方式實體化廣播變數,并且序列化廣播出去,還是不行,
uj5u.com熱心網友回復:
反正都是從Redis上讀取的,用的時候直接從Redis讀不就完了?為什么還要廣播呢?我們都是通過Redis代替廣播變數的。。。你這反過來沒有啥意義啊
uj5u.com熱心網友回復:
反正都是從Redis上讀取的,用的時候直接從Redis讀不就完了?為什么還要廣播呢?
我們都是通過Redis代替廣播變數的。。。你這反過來沒有啥意義啊
因為不想反復從redis取資料,而且還想定時更新,所以想用廣播變數并且能夠動態修改
uj5u.com熱心網友回復:
樓主有解決嗎?我嘗試在讀取本地檔案,序列化然后通過廣播變數廣播出去,并且定時更新。同樣是spark on yarn模式,現在是無法讀取更新的檔案值,更奇怪的是spark-submit再次提交作業,讀取出的檔案中的值還是更新前的值。不知道哪里寫的有問題,代碼如下:
object BroadcastWrapper {
@volatile private var broadcast:Broadcast[List[String]] = null
private var lastUpdatedTime:Date = Calendar.getInstance.getTime()
//決議組態檔中的Columns配置
def getProperties(filePath:String="/home/xxx/test.properties"):List[String]={
val fileStream = new FileInputStream(filePath)
val prop = new Properties()
prop.load(fileStream)
val value = prop.getProperty("columns").split(",").toList
println("value is *******************"+value)
value
}
def getInstance(sc:SparkContext,filePath:String="/home/z672898/lzw/test.properties"):Broadcast[List[String]] ={
if(broadcast==null){
synchronized{
if(broadcast==null)
broadcast = sc.broadcast(getProperties(filePath))
}
}
broadcast
}
def updateAndGet(sc:SparkContext,block:Boolean=false,filePath:String="/home/z672898/lzw/test.properties"): Broadcast[List[String]] ={
val currentTime = Calendar.getInstance().getTime
//1min = 60s = 60000ms
val date_diff = currentTime.getTime -lastUpdatedTime.getTime
//3min update
if(broadcast==null||date_diff>60000){
if(broadcast != null){
/**
* unpersist(blocking):把廣播變數從集群中所有保存該廣播變數的作業節點的記憶體中移除
* 布爾型別的blocking引數指定該操作是堵塞直至變數已經從所有節點洗掉,還是作為異步非堵塞操作執行.
* 如果希望立刻釋放記憶體,應該把這個引數設定為True
*/
broadcast.unpersist(block)
}
val columns = getProperties(filePath)
println("other broadcast:****************"+columns)
broadcast = sc.broadcast(columns)
//更新時間
lastUpdatedTime = Calendar.getInstance().getTime
}
broadcast
}
// 讀寫序列化
def writeObject(out:ObjectOutputStream): Unit ={
out.writeObject(broadcast)
}
def readObject(in:ObjectInputStream): Unit ={
in.readObject().asInstanceOf[Broadcast[List[String]]]
}
}
uj5u.com熱心網友回復:
使用廣播的話,這個是不可能實作的。可以說說人具體需求看看有沒有其他方案uj5u.com熱心網友回復:
可以使用廣播變數,但是好像只能在dstream.foreachRdd算子里面才能用,否則報空指標例外uj5u.com熱心網友回復:
可以實作的,從hdfs上更新檔案,每天定時更新廣播變數轉載請註明出處,本文鏈接:https://www.uj5u.com/qita/25803.html
標籤:Spark
