我有一個 Spark 資料框,我想根據前一行中的 2 列計算下一行的值。我知道如何僅對 1 行執行此操作(使用該lag()函式),但我不知道如何將前幾行中的這些值傳遞給接下來的幾行。
id | month | value | monthly_increment
1 | 01 | 100 | 2
1 | 02 | 200 | 3
1 | 03 | 600 | 4
1 | 04 | 2400 | 2
如您所見,“value”列的值乘以“monthly_increment”,并且它不斷影響該特定“id”的所有以下值。
如何使用 PySpark 做到這一點?
uj5u.com熱心網友回復:
在詢問 Spark 問題時,提供示例輸入資料框非常重要。你沒有,所以我假設你的輸入資料框看起來像這樣:
from pyspark.sql import functions as F, Window as W
df = spark.createDataFrame(
[('1', '01', 100, 2),
('1', '02', None, 3),
('1', '03', None, 4),
('1', '04', None, 2)],
['id', 'month', 'value', 'monthly_increment'])
火花 3.2
product您可以使用,lag和first視窗函式的組合填充缺失的列“值”值:
w = W.partitionBy('id').orderBy('month')
factor = F.product(F.lag('monthly_increment').over(w)).over(w)
df = df.withColumn('value', F.coalesce(F.first('value').over(w) * factor, 'value'))
df.show()
# --- ----- ------ -----------------
# | id|month| value|monthly_increment|
# --- ----- ------ -----------------
# | 1| 01| 100.0| 2|
# | 1| 02| 200.0| 3|
# | 1| 03| 600.0| 4|
# | 1| 04|2400.0| 2|
# --- ----- ------ -----------------
轉載請註明出處,本文鏈接:https://www.uj5u.com/shujuku/518676.html
