我有一個包含以下資料的 Spark 資料框:
val df = sc.parallelize(Seq(
(1, "A", "2022-01-01", 30, 0),
(1, "A", "2022-01-02", 20, 30),
(1, "B", "2022-01-03", 50, 20),
(1, "A", "2022-01-04", 10, 70),
(1, "B", "2022-01-05", 30, 60),
(1, "A", "2022-01-06", 0, 40),
(1, "C", "2022-01-07", 100,30),
(2, "D", "2022-01-08", 5, 0)
)).toDF("id", "event", "eventTimestamp", "amount", "expected")
display(df)
| ID | 事件 | 事件時間戳 | 數量 | 預期的 |
|---|---|---|---|---|
| 1 | “一個” | “2022-01-01” | 30 | 0 |
| 1 | “一個” | “2022-01-02” | 20 | 30 |
| 1 | “乙” | “2022-01-03” | 50 | 20 |
| 1 | “一個” | “2022-01-04” | 10 | 70 |
| 1 | “乙” | “2022-01-05” | 30 | 60 |
| 1 | “一個” | “2022-01-06” | 0 | 40 |
| 1 | “C” | “2022-01-07” | 100 | 30 |
| 2 | “D” | “2022-01-08” | 5 | 0 |
我想為每一行找到以下內容:每個 id 和每個唯一事件的所有最后條目(當前行上方)的總和。期望的結果在“預期”列中。
例如,對于訂單“C”,我想獲得“A”和“B”的最新金額:30 0 = 30
我嘗試了以下查詢,但是它會總結所有先前訂單的數量,包括重復,(我不確定,是否可以對總和應用過濾器以僅采用不同的值):
val days = (x:Int) => x * 86400
val idWindow = Window.partitionBy("id").orderBy(col("eventTimestamp")
.cast("timestamp").cast("long"))
.rangeBetween(Window.unboundedPreceding, -days(1))
val res = df.withColumn("totalAmount", sum($"amount").over(idWindow))
請注意 rangeBetween 功能對我的用例很重要,應該保留。
uj5u.com熱心網友回復:
訣竅是將數量轉換為 (id, event) 對中的差異,這樣您就可以在下一步計算移動總和。該移動總和保持每個獨特事件的最新數量。
df
.withColumn("diff", coalesce($"amount" - lag($"amount", 1).over(wIdEvent), $"amount")).
.withColumn("sum", sum($"diff").over(wId)).
.withColumn("final", coalesce(lag($"sum", 1).over(wId), lit(0))).
.orderBy($"eventTimestamp").show
--- ----- -------------- ------ -------- ---- --- -----
| id|event|eventTimestamp|amount|expected|diff|sum|final|
--- ----- -------------- ------ -------- ---- --- -----
| 1| A| 2022-01-01| 30| 0| 30| 30| 0|
| 1| A| 2022-01-02| 20| 30| -10| 20| 30|
| 1| B| 2022-01-03| 50| 20| 50| 70| 20|
| 1| A| 2022-01-04| 10| 70| -10| 60| 70|
| 1| B| 2022-01-05| 30| 60| -20| 40| 60|
| 1| A| 2022-01-06| 0| 40| -10| 30| 40|
| 1| C| 2022-01-07| 100| 30| 100|130| 30|
| 2| D| 2022-01-08| 5| 0| 5| 5| 0|
--- ----- -------------- ------ -------- ---- --- -----
轉載請註明出處,本文鏈接:https://www.uj5u.com/gongcheng/424391.html
上一篇:函式定義中的型別邊界錯誤
