我正在嘗試將值陣列作為新列添加到 DataFrame。
例如:假設有一個 Array(4,5,10) 和一個資料框
---------- -----
| name | age |
---------- -----
| John | 32 |
| Elizabeth| 28 |
| Eric | 41 |
---------- -----
我的要求是將上述陣列作為新列添加到資料框中。我的預期輸出如下:
---------- ----- ------
| name | age | rank |
---------- ----- ------
| John | 32 | 4 |
| Elizabeth| 28 | 5 |
| Eric | 41 | 10 |
---------- ----- ------
我正在嘗試是否可以使用 rdd 和 zipWithIndex 來實作這一點。
df.rdd.zipWithIndex.map(_.swap).join(array_rdd.zipWithIndex.map(_.swap))
這導致了這種情況。
(0,([John, 32],4))
我想將上述 RDD 轉換回所需的資料幀。讓我知道如何實作這一目標。
除了使用 rdd 和 zipWithIndex 之外,是否有任何替代方法可用于實作所需的結果?最好的方法是什么?
PS:
更好理解的背景關系:
我正在使用 Xpress 優化套件來解決數學問題。Xpress 根據陣列接受輸入,并將結果輸出到陣列中。我將輸入作為 DataFrame 并將列提取為陣列(使用收集)并傳遞給 Xpress。Xpress 輸出 Array[Double] 作為解。我想將此解決方案作為一列添加回資料幀,并且解決方案陣列中的每個值都對應于資料幀在其索引處的行,即輸出陣列的索引“n”處的值對應于第 n 行資料框
uj5u.com熱心網友回復:
加入后,只需將結果映射到您要查找的內容。加入 RDD 后,您可以將其轉換回資料幀。
val originalDF = Seq(("John", 32), ("Elizabeth", 28), ("Eric", 41)).toDF("name", "age")
val rank = Array(4, 5, 10)
// convert to Seq first
val rankDF = rank.toSeq.toDF("rank")
val joined = originalDF.rdd.zipWithIndex.map(_.swap).join(rankDF.rdd.zipWithIndex.map(_.swap))
val finalRDD = joined.map{ case (k,v) => (k, v._1.getString(0), v._1.getInt(1), v._2.getInt(0)) }
val finalDF = finalRDD.toDF("id", "name", "age", "rank")
finalDF.show()
/*
--- --------- --- ----
| id| name|age|rank|
--- --------- --- ----
| 0| John| 32| 4|
| 1|Elizabeth| 28| 5|
| 2| Eric| 41| 10|
--- --------- --- ----
*/
我能想到的唯一替代方法是使用org.apache.spark.sql.functions.row_number()視窗函式。這本質上是通過向資料幀添加一個不斷增加的連續行號來實作相同的目標。
這樣做的缺點是大量資料混洗到一個磁區中,因為我們需要為資料幀中的所有行提供不重復的行號。如果您的資料非常大,這可能會導致記憶體不足問題。(注意:這可能不適用于您的情況,因為您提到您正在收集資料并且沒有提到任何記憶體問題)。
轉換為anrdd和使用的方法zipWithIndex是可以接受的解決方案,但一般不推薦從dataframe轉換為rdd,因為使用RDD而不是dataframe的性能差異。
轉載請註明出處,本文鏈接:https://www.uj5u.com/ruanti/415352.html
標籤:
上一篇:映射可能回傳多個值或單個值的集合
下一篇:如何使有狀態的API變得純粹
