When using the RDD flatMap function, if the function returns a large array of objects per call (say, tens to hundreds), Spark runs very slowly. Sample code:
import scala.collection.mutable.ArrayBuffer

val rdd2 = sc.textFile(strInputFilePath).flatMap(line => {
  // Collects ((row, col), weight) for every grid cell this point contributes to.
  val grids = new ArrayBuffer[Tuple2[Tuple2[Int, Int], Double]]()
  val values = line.split(",")
  if (values.length > 3) {
    try {
      val br = broadcast.value
      val x = values(br._6).toDouble
      val y = values(br._7).toDouble
      val point = new SPoint2D(x, y)
      if (br._1.contains(point)) {
        // Bounding box of the search radius (br._2) around the point.
        val xmin = point.x - br._2
        val ymin = point.y - br._2
        val xmax = point.x + br._2
        val ymax = point.y + br._2
        val rcBounds = br._1
        val IndexCols = br._4
        val IndexRows = br._5
        // Convert the bounding box to grid column/row indices.
        var col1 = Math.floor((xmin - rcBounds.getLeft()) / resolution).toInt
        var col2 = Math.floor((xmax - rcBounds.getLeft()) / resolution).toInt
        var row1 = -(Math.floor((ymax - rcBounds.getTop()) / resolution).toInt)
        var row2 = -(Math.floor((ymin - rcBounds.getTop()) / resolution).toInt)
        // Clamp the indices to the grid extent.
        if (col1 < 0) {
          col1 = 0
        } else if (col1 >= IndexCols) {
          col1 = IndexCols - 1
        }
        if (col2 < 0) {
          col2 = 0
        } else if (col2 >= IndexCols) {
          col2 = IndexCols - 1
        }
        if (row1 < 0) {
          row1 = 0
        } else if (row1 >= IndexRows) {
          row1 = IndexRows - 1
        }
        if (row2 < 0) {
          row2 = 0
        } else if (row2 >= IndexRows) {
          row2 = IndexRows - 1
        }
        // Make sure each range runs from low to high.
        if (col1 > col2) {
          val temp = col2
          col2 = col1
          col1 = temp
        }
        if (row1 > row2) {
          val temp = row2
          row2 = row1
          row1 = temp
        }
        for (col <- col1.to(col2); row <- row1.to(row2)) {
          // Center of the cell at (row, col); br._3 is the cell size.
          val xtemp = rcBounds.getLeft() + col * br._3 + 0.5 * br._3
          val ytemp = rcBounds.getTop() - row * br._3 - 0.5 * br._3
          val distance = Math.sqrt((x - xtemp) * (x - xtemp) + (y - ytemp) * (y - ytemp))
          if (distance <= br._2) {
            // Weight the cell by its normalized distance from the point.
            val disPre = distance / br._2
            val valuePre = 1.0 * 3 * Math.pow(1 - disPre * disPre, 2) / br._8
            grids += new Tuple2((row, col), valuePre)
          }
        }
      }
    } catch {
      case ex: Exception =>
        ex.printStackTrace()
    }
  }
  grids.toArray[Tuple2[Tuple2[Int, Int], Double]]
})
In the code above, when the grids ArrayBuffer holds many objects, this code runs very slowly. Does anyone have a good approach or method for solving this?
Reply from a uj5u.com user:
Has your problem been solved?
Reply from a uj5u.com user:
I don't think Spark is slow; your code is slow. You should benchmark your method (the whole line => { your implementation here }) to see how long it runs. If it runs for 1 s per line and your text file has 1,000,000 lines, that is 1,000,000 seconds of sequential work. So even if Spark can run 1,000 of them concurrently, the whole job will still take 1,000 seconds.
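To make that estimate concrete, here is a minimal sketch in plain Scala (no Spark needed). The names TimingSketch, meanMillis and estimatedJobSeconds are made up for illustration; the idea is to time the body of the lambda on a few sample lines locally and then scale up by the line count and the number of tasks that really run at once.

```scala
object TimingSketch {
  // Runs `body` `runs` times and returns the mean wall-clock time per call in milliseconds.
  // Hypothetical helper: wrap the body of your flatMap lambda in it on a sample line.
  def meanMillis(runs: Int)(body: => Any): Double = {
    require(runs > 0, "runs must be positive")
    val start = System.nanoTime()
    var i = 0
    while (i < runs) {
      body
      i += 1
    }
    (System.nanoTime() - start) / 1e6 / runs
  }

  // Rough job time: total sequential work divided by the number of tasks running concurrently.
  // It ignores scheduling, shuffle and I/O overhead, so treat it as a lower bound.
  def estimatedJobSeconds(lines: Long, secondsPerLine: Double, parallelism: Int): Double =
    lines * secondsPerLine / parallelism
}
```

With the numbers from the reply, TimingSketch.estimatedJobSeconds(1000000L, 1.0, 1000) gives 1000.0 seconds. A single timing run on the JVM is noisy, so it is worth using a few hundred runs and discarding the first batch as warm-up.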
So my suggestions are:
1) Benchmark how long your method takes per line, then try to improve it. I didn't read it carefully, but it looks like it can be improved; the code smells.
2) If there is no way to improve the method, note that the only fields it needs from each line are:
val x = values(br._6).toDouble
val y = values(br._7).toDouble
So you can first take the distinct combinations of (values(br._6), values(br._7)), so that each unique combination goes through your method only once, instead of once per line as it does now.
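Suggestion 2 could look roughly like the sketch below. It uses plain Scala collections so it runs without a cluster; on an RDD the same shape would be a map to ((x, y), 1) followed by reduceByKey and then the flatMap. DedupSketch, contributions and cellsFor are hypothetical names, and cellsFor stands in for the expensive grid loop from the question. Scaling each value by the count is an assumption that the per-cell values get summed later; if you only need the distinct cells, drop the multiplication.

```scala
import scala.util.Try

object DedupSketch {
  type Cell = (Int, Int)

  // `cellsFor` is a hypothetical stand-in for the expensive per-point grid computation.
  def contributions(lines: Seq[String], xIdx: Int, yIdx: Int)(
      cellsFor: (Double, Double) => Seq[(Cell, Double)]
  ): Seq[(Cell, Double)] = {
    // Parse each line down to the two fields the computation needs; skip bad lines.
    val points: Seq[(Double, Double)] = lines
      .map(_.split(","))
      .filter(_.length > math.max(xIdx, yIdx))
      .flatMap(v => Try((v(xIdx).toDouble, v(yIdx).toDouble)).toOption)

    // Count how often each unique (x, y) occurs.
    val counts: Map[(Double, Double), Int] =
      points.groupBy(identity).map { case (p, same) => (p, same.size) }

    // Run the expensive step once per unique point, scaling each value by the count.
    counts.toSeq.flatMap { case ((x, y), n) =>
      cellsFor(x, y).map { case (cell, v) => (cell, v * n) }
    }
  }
}
```

How much this helps depends on how many lines share the same coordinates. If almost every (x, y) is unique, the extra grouping step only adds cost.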
When reposting, please credit the source. Link to this article: https://www.uj5u.com/qita/67622.html
