我對 PySpark 還很陌生,但我正在嘗試在我的代碼中使用最佳實踐。我有一個 PySpark 資料框,我想滯后多列,用滯后值替換原始值。例子:
ID date value1 value2 value3
1 2021-12-23 1.1 4.0 2.2
2 2021-12-21 2.4 1.6 11.9
1 2021-12-24 5.4 3.2 7.8
2 2021-12-22 4.2 1.4 9.0
1 2021-12-26 2.3 5.2 7.6
.
.
.
我想根據 獲取所有值,按ID排序date,然后將值滯后一些。我到目前為止的代碼:
from pyspark.sql import functions as F, Window
window = Window.partitionBy(F.col("ID")).orderBy(F.col("date"))
valueColumns = ['value1', 'value2', 'value3']
df = F.lag(valueColumns, offset=shiftAmount).over(window)
我想要的輸出是:
ID date value1 value2 value3
1 2021-12-23 Null Null Null
2 2021-12-21 Null Null Null
1 2021-12-24 1.1 4.0 2.2
2 2021-12-22 2.4 1.6 11.9
1 2021-12-26 5.4 3.2 7.86
.
.
.
我遇到的問題是,據我所知,F.lag只接受一列。我正在尋找有關如何最好地完成此任務的建議。我想我可以使用 for 回圈來附加移位的列或其他東西,但這似乎很不雅。謝謝!
uj5u.com熱心網友回復:
對列名的簡單串列理解應該可以完成這項作業:
df = df.select(
"ID", "date",
*[F.lag(c, offset=shiftAmount).over(window).alias(c) for c in valueColumns]
)
轉載請註明出處,本文鏈接:https://www.uj5u.com/qiye/405143.html
標籤:
