我有一個如下所示的資料框,
----- ---------- --------- ------- -------------------
|jobid|fieldmname|new_value|coltype| createat|
----- ---------- --------- ------- -------------------
| 1| jobstage| sttaus1| null|2022-10-10 12:11:34|
| 1| jobstatus| sttaus2| status|2022-10-10 13:11:34|
| 1| jobstage| sttaus3| null|2022-10-10 14:11:34|
| 1| jobstatus| sttaus4| null|2022-10-10 15:11:34|
| 1| jobstatus| sttaus10| status|2022-10-10 16:11:34|
| 1| jobstatus| sttaus11| null|2022-10-10 17:11:34|
| 2| jobstage| sttaus1| null|2022-10-11 10:11:34|
| 2| jobstatus| sttaus2| status|2022-11-11 12:11:34|
----- ---------- --------- ------- -------------------
Seq(
(1, "jobstage", "sttaus1", "null", "2022-10-10 12:11:34"),
(1, "jobstatus", "sttaus2", "status", "2022-10-10 13:11:34"),
(1, "jobstage", "sttaus3", "null", "2022-10-10 14:11:34"),
(1, "jobstatus", "sttaus4", "null", "2022-10-10 15:11:34"),
(1, "jobstatus", "sttaus10", "status", "2022-10-10 16:11:34"),
(1, "jobstatus", "sttaus11", null, "2022-10-10 17:11:34"),
(2, "jobstage", "sttaus1", "null", "2022-10-11 10:11:34"),
(2, "jobstatus", "sttaus2", "status", "2022-11-10 12:11:34")
).toDF("jobid", "fieldmname", "new_value", "coltype", "createat")
只需為 fieldmname 為“jobstage”的行添加新列并添加值。并且該值應該是相應作業階段的最新狀態(檢查下一行)。在選擇 latest 時需要檢查 coltype 值是否為“狀態”。
預期的資料框:
----- ---------- --------- ------- ------------------- -------------
|jobid|fieldmname|new_value|coltype| createat|latest_status|
----- ---------- --------- ------- ------------------- -------------
| 1| jobstage| sttaus1| null|2022-10-10 12:11:34| sttaus2|
| 1| jobstatus| sttaus2| status|2022-10-10 13:11:34| |
| 1| jobstage| sttaus3| null|2022-10-10 14:11:34| sttaus10|
| 1| jobstatus| sttaus4| null|2022-10-10 15:11:34| |
| 1| jobstatus| sttaus10| status|2022-10-10 16:11:34| |
| 1| jobstatus| sttaus11| null|2022-10-10 17:11:34| |
| 2| jobstage| sttaus1| null|2022-10-11 10:11:34| sttaus2|
| 2| jobstatus| sttaus2| status|2022-11-11 12:11:34| |
----- ---------- --------- ------- ------------------- -------------
我嘗試了領先、滯后、行號,但沒有得到預期的結果。
uj5u.com熱心網友回復:
問題已標記pysparkfirst(),所以我正在撰寫一種使用視窗函式在 pyspark 中執行所需的方法。
data_sdf. \
withColumn('latest',
func.when(func.col('fieldmname') == 'jobstage',
func.first(func.when((func.col('coltype') == 'status') & (func.col('fieldmname') == 'jobstatus'), func.col('new_value')), ignorenulls=True).
over(wd.partitionBy('jobid').orderBy('createat').rowsBetween(0, sys.maxsize))
).
otherwise(func.lit(''))
). \
show()
# ----- ---------- --------- ------- ------------------- --------
# |jobid|fieldmname|new_value|coltype| createat| latest|
# ----- ---------- --------- ------- ------------------- --------
# | 1| jobstage| sttaus1| null|2022-10-10 12:11:34| sttaus2|
# | 1| jobstatus| sttaus2| status|2022-10-10 13:11:34| |
# | 1| jobstage| sttaus3| null|2022-10-10 14:11:34|sttaus10|
# | 1| jobstatus| sttaus4| null|2022-10-10 15:11:34| |
# | 1| jobstatus| sttaus10| status|2022-10-10 16:11:34| |
# | 1| jobstatus| sttaus11| null|2022-10-10 17:11:34| |
# | 2| jobstage| sttaus1| null|2022-10-11 10:11:34| sttaus2|
# | 2| jobstatus| sttaus2| status|2022-11-10 12:11:34| |
# ----- ---------- --------- ------- ------------------- --------
因此,它將考慮相應記錄中的第一條記錄,其中fieldmname是“jobstatus”和coltype“status”。
轉載請註明出處,本文鏈接:https://www.uj5u.com/caozuo/493052.html
