我有一個帶薪水的樣本資料集。我想將該工資分配到 3 個桶中,然后在每個桶中找到較低的工資,然后將其轉換為一個陣列并將其附加到原始集合中。我正在嘗試使用視窗函式來做到這一點。它似乎以漸進的方式進行。
這是我寫的代碼
val spark = sparkSession
import spark.implicits._
val simpleData = Seq(("James", "Sales", 3000),
("Michael", "Sales", 3100),
("Robert", "Sales", 3200),
("Maria", "Finance", 3300),
("James", "Sales", 3400),
("Scott", "Finance", 3500),
("Jen", "Finance", 3600),
("Jeff", "Marketing", 3700),
("Kumar", "Marketing", 3800),
("Saif", "Sales", 3900)
)
val df = simpleData.toDF("employee_name", "department", "salary")
val windowSpec = Window.orderBy("salary")
val ntileFrame = df.withColumn("ntile", ntile(3).over(windowSpec))
val lowWindowSpec = Window.partitionBy("ntile")
val ntileMinDf = ntileFrame.withColumn("lower_bound", min("salary").over(lowWindowSpec))
var rangeDf = ntileMinDf.withColumn("range", collect_set("lower_bound").over(windowSpec))
rangeDf.show()
我得到這樣的資料集
------------- ---------- ------ ----- ----------- ------------------
|employee_name|department|salary|ntile|lower_bound| range|
------------- ---------- ------ ----- ----------- ------------------
| James| Sales| 3000| 1| 3000| [3000]|
| Michael| Sales| 3100| 1| 3000| [3000]|
| Robert| Sales| 3200| 1| 3000| [3000]|
| Maria| Finance| 3300| 1| 3000| [3000]|
| James| Sales| 3400| 2| 3400| [3000, 3400]|
| Scott| Finance| 3500| 2| 3400| [3000, 3400]|
| Jen| Finance| 3600| 2| 3400| [3000, 3400]|
| Jeff| Marketing| 3700| 3| 3700|[3000, 3700, 3400]|
| Kumar| Marketing| 3800| 3| 3700|[3000, 3700, 3400]|
| Saif| Sales| 3900| 3| 3700|[3000, 3700, 3400]|
------------- ---------- ------ ----- ----------- ------------------
我希望資料集看起來像這樣
------------- ---------- ------ ----- ----------- ------------------
|employee_name|department|salary|ntile|lower_bound| range|
------------- ---------- ------ ----- ----------- ------------------
| James| Sales| 3000| 1| 3000|[3000, 3700, 3400]|
| Michael| Sales| 3100| 1| 3000|[3000, 3700, 3400]|
| Robert| Sales| 3200| 1| 3000|[3000, 3700, 3400]|
| Maria| Finance| 3300| 1| 3000|[3000, 3700, 3400]|
| James| Sales| 3400| 2| 3400|[3000, 3700, 3400]|
| Scott| Finance| 3500| 2| 3400|[3000, 3700, 3400]|
| Jen| Finance| 3600| 2| 3400|[3000, 3700, 3400]|
| Jeff| Marketing| 3700| 3| 3700|[3000, 3700, 3400]|
| Kumar| Marketing| 3800| 3| 3700|[3000, 3700, 3400]|
| Saif| Sales| 3900| 3| 3700|[3000, 3700, 3400]|
------------- ---------- ------ ----- ----------- ------------------
uj5u.com熱心網友回復:
為確保您的視窗考慮所有行而不僅僅是當前行之前的行,您可以使用rowsBetweenwithWindow.unboundedPreceding和Window.unboundedFollowing作為引數的方法。您的最后一行因此變為:
var rangeDf = ntileMinDf.withColumn(
"range",
collect_set("lower_bound")
.over(Window.rowsBetween(Window.unboundedPreceding, Window.unboundedFollowing))
)
你會得到以下rangeDf資料框:
------------- ---------- ------ ----- ----------- ------------------
|employee_name|department|salary|ntile|lower_bound| range|
------------- ---------- ------ ----- ----------- ------------------
| James| Sales| 3000| 1| 3000|[3000, 3700, 3400]|
| Michael| Sales| 3100| 1| 3000|[3000, 3700, 3400]|
| Robert| Sales| 3200| 1| 3000|[3000, 3700, 3400]|
| Maria| Finance| 3300| 1| 3000|[3000, 3700, 3400]|
| James| Sales| 3400| 2| 3400|[3000, 3700, 3400]|
| Scott| Finance| 3500| 2| 3400|[3000, 3700, 3400]|
| Jen| Finance| 3600| 2| 3400|[3000, 3700, 3400]|
| Jeff| Marketing| 3700| 3| 3700|[3000, 3700, 3400]|
| Kumar| Marketing| 3800| 3| 3700|[3000, 3700, 3400]|
| Saif| Sales| 3900| 3| 3700|[3000, 3700, 3400]|
------------- ---------- ------ ----- ----------- ------------------
轉載請註明出處,本文鏈接:https://www.uj5u.com/yidong/366882.html
