我想在多列上對 Pyspark 進行前向填充。如果列的起始值為“NaN”,則將其替換為 0。下面是我的 DF 的樣子。
| 開始時間戳 | 第一列 | 第 2 列 | 第 3 列 | 第 4 列 |
|---|---|---|---|---|
| 2020-11-02 08:51:50 | 2 | 空值 | 空值 | 空值 |
| 2020-11-02 09:14:29 | 空值 | 空值 | 空值 | 40 |
| 2020-11-02 09:18:32 | 空值 | 4 | 2 | 空值 |
| 2020-11-02 09:32:42 | 4 | 空值 | 空值 | 空值 |
| 2020-11-03 13:06:03 | 空值 | 空值 | 空值 | 20 |
| 2020-11-03 13:10:01 | 6 | 空值 | 4 | 空值 |
| 2020-11-03 13:54:38 | 空值 | 5 | 空值 | 空值 |
| 2020-11-03 14:46:25 | 空值 | 空值 | 空值 | 空值 |
| 2020-11-03 14:57:31 | 7 | 空值 | 空值 | 10 |
| 2020-11-03 15:07:07 | 8 | 7 | 空值 | 空值 |
預期 DF 將是:
| 開始時間戳 | 第一列 | 第 2 列 | 第 3 列 | 第 4 列 |
|---|---|---|---|---|
| 2020-11-02 08:51:50 | 2 | 0 | 0 | 0 |
| 2020-11-02 09:14:29 | 2 | 0 | 0 | 40 |
| 2020-11-02 09:18:32 | 2 | 4 | 2 | 40 |
| 2020-11-02 09:32:42 | 4 | 4 | 2 | 40 |
| 2020-11-03 13:06:03 | 4 | 4 | 2 | 20 |
| 2020-11-03 13:10:01 | 6 | 4 | 4 | 20 |
| 2020-11-03 13:54:38 | 6 | 5 | 4 | 20 |
| 2020-11-03 14:46:25 | 6 | 5 | 4 | 20 |
| 2020-11-03 14:57:31 | 7 | 5 | 4 | 10 |
| 2020-11-03 15:07:07 | 8 | 7 | 4 | 10 |
下面是我在stackoverflow上試過的代碼:
from pyspark.sql import Window
from pyspark.sql.functions import last,first
from pyspark.sql.functions import col, max as max_, min as min_
import sys
def stringReplaceFunc(x, y):
return F.when(x != y, x).otherwise(F.lit(None)) # replace with NULL
def forwardFillImputer(df, cols=[], partitioner="start_timestamp", value="null"):
for i in cols:
window = Window\
.partitionBy(F.month(partitioner))\
.orderBy(partitioner)\
.rowsBetween(-sys.maxsize, 0)
df= df\
.withColumn(i, stringReplaceFunc(F.col(i), value))
fill = F.last(df[i], ignorenulls=True).over(window)
df= df.withColumn(i, fill)
return df
df= forwardFillImputer(df, cols=[i for i in df.columns])
代碼不起作用,請讓我知道我在做什么錯誤。請讓我知道是否有任何替代解決方案。謝謝。
uj5u.com熱心網友回復:
在您當前的代碼中,您不應該按月對視窗進行磁區,使用rowsBetween是無用的。你應該只有一個有序的視窗start_timestamp
此外,當沒有最后一個值時,您沒有管理案例。您可以使用coalesce文字值來管理它'0'
因此,您的代碼可以重寫如下:
from pyspark.sql import functions as F
from pyspark.sql import Window
def forwardFillImputer(df, cols=[], partitioner='start_timestamp', value='null'):
for c in cols:
df = df.withColumn(c, F.when(F.col(c) != value, F.col(c)))
df = df.withColumn(c, F.coalesce(F.col(c), F.last(c, True).over(Window.orderBy(partitioner)), F.lit('0')))
return df
df = forwardFillImputer(df, df.columns)
使用以下資料框df:
------------------- ------- ------- ------- -------
|start_timestamp |Column1|Column2|Column3|Column4|
------------------- ------- ------- ------- -------
|2020-11-02 08:51:50|2 |null |null |null |
|2020-11-02 09:14:29|null |null |null |40 |
|2020-11-02 09:18:32|null |4 |2 |null |
|2020-11-02 09:32:42|4 |null |null |null |
|2020-11-03 13:06:03|null |null |null |20 |
|2020-11-03 13:10:01|6 |null |4 |null |
|2020-11-03 13:54:38|null |5 |null |null |
|2020-11-03 14:46:25|null |null |null |null |
|2020-11-03 14:57:31|7 |null |null |10 |
|2020-11-03 15:07:07|8 |7 |null |null |
------------------- ------- ------- ------- -------
您將獲得以下輸出:
------------------- ------- ------- ------- -------
|start_timestamp |Column1|Column2|Column3|Column4|
------------------- ------- ------- ------- -------
|2020-11-02 08:51:50|2 |0 |0 |0 |
|2020-11-02 09:14:29|2 |0 |0 |40 |
|2020-11-02 09:18:32|2 |4 |2 |40 |
|2020-11-02 09:32:42|4 |4 |2 |40 |
|2020-11-03 13:06:03|4 |4 |2 |20 |
|2020-11-03 13:10:01|6 |4 |4 |20 |
|2020-11-03 13:54:38|6 |5 |4 |20 |
|2020-11-03 14:46:25|6 |5 |4 |20 |
|2020-11-03 14:57:31|7 |5 |4 |10 |
|2020-11-03 15:07:07|8 |7 |4 |10 |
------------------- ------- ------- ------- -------
轉載請註明出處,本文鏈接:https://www.uj5u.com/gongcheng/378891.html
