假設我有以下兩個資料框:
DF1:
---------- ---------- ----------
| Place|Population| IndexA|
---------- ---------- ----------
| A| Int| X_A|
| B| Int| X_B|
| C| Int| X_C|
---------- ---------- ----------
DF2:
---------- ----------
| City| IndexB|
---------- ----------
| D| X_D|
| E| X_E|
| F| X_F|
| ....| ....|
| ZZ| X_ZZ|
---------- ----------
上面的資料幀通常要大得多。
我想確定從每個from到哪個City( DF2) 的最短距離。距離可以根據指數計算。因此,對于 中的每一行,我必須遍歷中的每一行并根據索引的計算尋找最短距離。對于距離計算,定義了一個函式:PlaceDF1DF1DF2
val distance = udf(
(indexA: Long, indexB: Long) => {
h3.instance.h3Distance(indexA, indexB)
})
我嘗試了以下方法:
val output = DF1.agg(functions.min(distance(col("IndexA"), DF2.col("IndexB"))))
但這,代碼編譯但我收到以下錯誤:
執行緒“main” org.apache.spark.sql.AnalysisException 中的例外:已決議的屬性
H3Index#220L 從 Places#316,Population#330,IndexAx#338L 中丟失!Aggregate
[min(if ((isnull(IndexA) #338L) OR isnull(IndexB#220L))) null else UDF(knownnotnull(IndexA#338L), knownnotnull(IndexB#220L))) AS min(UDF(IndexA, IndexB))#346]。
所以我想我在DF2從其中取出一行時迭代每一行做錯了,DF1但我找不到解決方案。
我究竟做錯了什么?我是在正確的方向嗎?
uj5u.com熱心網友回復:
您收到此錯誤是因為您使用的索引列僅存在于您嘗試執行聚合的位置DF2,而不存在于DF1其中。
為了使該欄位可訪問并確定與所有點的距離,您需要
- 交叉連接
DF1并Df2讓每個索引Df1匹配的每個索引DF2 - 使用 udf 確定距離
- 在這個新的十字架上找到與距離連接的 udf 的最小值
這可能是這樣的:
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{col, min, udf}
val distance = udf(
(indexA: Long, indexB: Long) => {
h3.instance.h3Distance(indexA, indexB)
})
val resultDF = DF1.crossJoin(DF2)
.withColumn("distance", distance(col("IndexA"), col("IndexB")))
//instead of using a groupby then matching the min distance of the aggregation with the initial df. I've chosen to use a window function min to determine the min_distance of each group (determined by Place) and filter by the city with the min distance to each place
.withColumn("min_distance", min("distance").over(Window.partitionBy("Place")))
.where(col("distance") === col("min_distance"))
.drop("min_distance")
這將導致資料框包含來自資料框和附加列的列distance。
注意。您當前將一個 df 中的每個專案與另一個 df 中的每個專案進行比較的方法是一項昂貴的操作。如果您有機會盡早進行過濾(例如加入啟發式列,即可能表明某個地點可能更接近城市的其他列),則建議這樣做。
讓我知道這是否適合您。
uj5u.com熱心網友回復:
如果你只有幾個城市(少于或大約 1000),你可以通過將城市收集在一個陣列中來避免crossJoin和Window洗牌,然后使用這個收集的陣列對每個地方執行距離計算:
import org.apache.spark.sql.functions.{array_min, col, struct, transform, typedLit, udf}
val citiesIndexes = df2.select("City", "IndexB")
.collect()
.map(row => (row.getString(0), row.getLong(1)))
val result = df1.withColumn(
"City",
array_min(
transform(
typedLit(citiesIndexes),
x => struct(distance(col("IndexA"), x.getItem("_2")), x.getItem("_1"))
)
).getItem("col2")
)
這段代碼適用于 Spark 3 及更高版本。如果您使用的是小于 3.0 的 Spark 版本,則應將array_min(...).getItem("col2")部分替換為用戶定義的函式。
轉載請註明出處,本文鏈接:https://www.uj5u.com/yidong/344796.html
標籤:斯卡拉 数据框 阿帕奇火花 apache-spark-sql 超级api
上一篇:玩!FrameWork:我在將影像上傳到我的服務器時遇到問題
下一篇:我如何在golang中執行此遞回
