最近在學習用spark 的GraphX框架實作近鄰傳播聚類演算法(AP)的并行化,但是代碼寫好后,迭代次數設定>30,運行中就會報錯java.lang.stackoverflowerror,我之前查了查,有可能是迭代次數過多導致lineage過長,但是checkpoint并沒有效果,該報錯還是報錯。在迭代次數設定不多就可以跑成功。本人是spark1.6.0環境,local模式。求有相關經驗的人看看,不勝感激!
演算法主體代碼:
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.spark.graphx.{Graph, TripletFields, VertexId}
/**
* Created by LCJ on 2017.3.19.
*/
class AP (
val graphInput: Graph[VertexData, EdgeData],
val lambda: Double, val iterations: Int, val threshold: Int,
val a: Double, val r: Double
)
extends Serializable {
private var graph = this.graphInput
private val lam = this.lambda
private val maxIterNum = this.iterations
private val thresholdNum = this.threshold
private val avaiInitial = this.a
private val respInitial = this.r
private def checkOutputPath(path: String): Unit = {
val fs = FileSystem.get(new Configuration())
if (fs.exists(new Path(path))) {
fs.delete(new Path(path), true)
}
}
private def balance(valuePrevious: Double, valueNow: Double): Double = {
lam * valuePrevious + (1 - lam) * valueNow
}
private def getExemplars(g: Graph[VertexData, EdgeData]): Set[VertexId] = {
g.aggregateMessages[(VertexId, Double)](
sendMsg = s => s.sendToSrc((s.dstId, s.attr.avai + s.attr.resp)),
mergeMsg = (a, b) => if (a._2 > b._2) a else b,
TripletFields.EdgeOnly
).map(v => v._2._1).collect().toSet
}
def run(): Unit = {
var prevG: Graph[VertexData, EdgeData] = null
var centers = Set[VertexId]()
var countForThreshold = 0 // 聚類中心不改變(演算法收斂)迭代次數計數
var flag = true
var iterCount = 0 // 總迭代次數計數
for (_ <- 1 to maxIterNum if flag) {
// 必須先用prevG保留住對原來圖的參考, 并在新圖產生后, 快速將舊圖徹底釋放掉.
// 否則, 十幾輪迭代后, 會有記憶體泄漏問題, 很快耗光作業快取空間
prevG = graph
// 更新r
val updating_r = graph.aggregateMessages[Seq[Double]](
sendMsg = s => s.sendToSrc(Seq(s.attr.similarity + s.attr.avai)),
mergeMsg = (a, b) => a ++ b,
TripletFields.EdgeOnly
)
val updated_r = Graph(updating_r, graph.edges)
.mapTriplets(t => {
val filtered = t.srcAttr.filter(_ != (t.attr.similarity + t.attr.avai))
val pool =
if (filtered.size < t.srcAttr.size - 1) filtered :+ (t.attr.similarity + t.attr.avai)
else filtered
val maxValue = if (pool.isEmpty) 0.0 else pool.max
EdgeData(t.attr.similarity, t.attr.avai, balance(t.attr.resp, t.attr.similarity - maxValue))
}, TripletFields.Src)
graph = Graph.fromEdges(updated_r.edges, VertexData(avaiInitial, respInitial))
// 更新a
val updating_a = graph.aggregateMessages[Double](
sendMsg = s => {
if (s.srcId != s.dstId) s.sendToDst(math.max(s.attr.resp, 0.0))
else s.sendToDst(s.attr.resp)
},
mergeMsg = (a, b) => a + b,
TripletFields.EdgeOnly
)
val updated_a = Graph(updating_a, graph.edges)
.mapTriplets(t => {
if (t.srcId != t.dstId) {
val a = balance(
t.attr.avai,
math.min(0.0, t.dstAttr - math.max(t.attr.resp, 0.0))
)
EdgeData(t.attr.similarity, a, t.attr.resp)
}
else {
val a = balance(
t.attr.avai,
t.dstAttr - t.attr.resp
)
EdgeData(t.attr.similarity, a, t.attr.resp)
}
}, TripletFields.Dst)
graph = Graph.fromEdges(updated_a.edges, VertexData(avaiInitial, respInitial)).persist()
iterCount += 1
// 每次更新r和a后判斷聚類中心有無變化
val centersTmp = getExemplars(graph)
if (centers == centersTmp) {
countForThreshold += 1
if (countForThreshold == thresholdNum) {
flag = false
println("Break!")
}
}
else {
centers = centersTmp
countForThreshold = 0
}
if (iterCount % 6 == 0) {
graph.cache()
graph.checkpoint()
println(graph.numVertices)
}
prevG.unpersistVertices()
prevG.edges.unpersist()
}
println("演算法總迭代次數: " + iterCount)
println("聚類中心不改變次數: " + countForThreshold)
println("Exemplars: " + centers)
// 確定每個點到聚類中心的分配情況
val clusterInfo = graph.aggregateMessages[(VertexId, Double, Double)](
sendMsg = s => s.sendToSrc((s.dstId, s.attr.similarity, s.attr.avai + s.attr.resp)),
mergeMsg = (a, b) => if (a._3 > b._3) a else b,
TripletFields.EdgeOnly
).persist()
// 將點的分配存入文本
checkOutputPath("member-exemplar")
clusterInfo.mapValues(s => s._1).saveAsTextFile("member-exemplar")
// 計算總誤差平方和
val WSSSE = clusterInfo.map(e => if (e._1 == e._2._1) 0.0 else math.pow(e._2._2, 2)).sum()
println("WSSSE: " + WSSSE)
clusterInfo.unpersist()
graph.unpersist()
}
}
報錯位置在我紅色的地方那個collect action操作。謝謝各位大神!
uj5u.com熱心網友回復:
頂一個,多來點大神指點uj5u.com熱心網友回復:
求大神
別沉!!
uj5u.com熱心網友回復:
Read this:GraphX StackOverflow錯誤診斷與修復程序
uj5u.com熱心網友回復:
你好,這個我看過,上面說修改的地方在spark1.2版本就已經包括進去了,我現在用的1.6,還是報錯是怎么回事?
uj5u.com熱心網友回復:
If you gave me a case can be reproduced locally, I will take a look. Are you running Spark 1.6.3?uj5u.com熱心網友回復:
I'm running Spark1.6.0-Hadoop2.6.0 with scala2.10.5.
If it's convenient for you, my QQ is 475168571 and I'm looking forward to asking you for advice.
uj5u.com熱心網友回復:
檢查下graph, 使用spark on yarn去執行, 讓yarn幫助資源調配然后有問題再進行除錯
uj5u.com熱心網友回復:
我去研究研究,yarn還沒試過,初學......
轉載請註明出處,本文鏈接:https://www.uj5u.com/qita/67628.html
標籤:Spark
