我必須根據確定的條件添加具有開始日期的列:
STATUS.isin('CREATED', 'CREATED AGAIN')
對于每個 ID,我想從狀態為的日期開始,CREATION或者CREATE AGAIN我想重現資料集,為TIME視窗添加一個新的列磁區。
例如:
輸入:
| ID | $ | 地位 | 時間 |
|---|---|---|---|
| 1 | 050 | 已創建 | 2021-11-01 |
| 1 | 120 | 嘗試 | 2021-11-01 |
| 1 | 130 | 有效的 | 2021-11-01 |
| 1 | 200 | 行 | 2021-11-02 |
| 1 | 100 | 再次創建 | 2021-11-03 |
| 1 | 160 | 行 | 2021-11-03 |
| 1 | 200 | 進行中 | 2021-11-04 |
| 1 | 300 | 行 | 2021-11-05 |
| 1 | 1000 | 最后 | 2021-11-06 |
| 2 | ... | ... | ... |
輸出:
| ID | $ | 地位 | 時間 | 開始日期 |
|---|---|---|---|---|
| 1 | 050 | 已創建 | 2021-11-01 | 2021-11-01 |
| 1 | 120 | 嘗試 | 2021-11-01 | 2021-11-01 |
| 1 | 130 | 有效的 | 2021-11-01 | 2021-11-01 |
| 1 | 200 | 行 | 2021-11-02 | 2021-11-01 |
| 1 | 100 | 再次創建 | 2021-11-03 | 2021-11-03 |
| 1 | 160 | 行 | 2021-11-03 | 2021-11-03 |
| 1 | 200 | 進行中 | 2021-11-04 | 2021-11-03 |
| 1 | 300 | 行 | 2021-11-05 | 2021-11-03 |
| 1 | 1000 | 最后 | 2021-11-06 | 2021-11-03 |
| 2 | ... | ... | ... | ... |
uj5u.com熱心網友回復:
您可以group使用累積條件總和添加列,然后在按 磁區START_DATE的first("TIME")Window 上創建列:IDgroup
from pyspark.sql import functions as F, Window
w1 = Window.partitionBy("ID").orderBy("TIME")
w2 = Window.partitionBy("ID", "group").orderBy("TIME")
df1 = df.withColumn(
"group",
F.sum(F.when(F.col("STATUS").isin(['CREATED', 'CREATED AGAIN']), 1)).over(w1)
).withColumn(
"START_DATE",
F.first("TIME").over(w2)
).drop("group")
df1.show()
# --- ---- ------------- ---------- ----------
#| ID| $| STATUS| TIME|START_DATE|
# --- ---- ------------- ---------- ----------
#| 1| 050| CREATED|2021-11-01|2021-11-01|
#| 1| 120| ATTEMPTING|2021-11-01|2021-11-01|
#| 1| 130| VALID|2021-11-01|2021-11-01|
#| 1| 200| OK|2021-11-02|2021-11-01|
#| 1| 100|CREATED AGAIN|2021-11-03|2021-11-03|
#| 1| 160| OK|2021-11-03|2021-11-03|
#| 1| 200| ONGOING|2021-11-04|2021-11-03|
#| 1| 300| OK|2021-11-05|2021-11-03|
#| 1|1000| FINAL|2021-11-06|2021-11-03|
# --- ---- ------------- ---------- ----------
轉載請註明出處,本文鏈接:https://www.uj5u.com/qiye/419846.html
標籤:
