我有以下資料框
date | value | ID
--------------------------------------
2021-12-06 15:00:00 25 1
2021-12-06 15:15:00 35 1
2021-11-30 00:00:00 20 2
2021-11-25 00:00:00 10 2
我想和另一個這樣的 DF 一起加入這個 DF:
idUser | Name | Gender
-------------------
1 John M
2 Anne F
我的預期輸出是:
ID | Name | Gender | Value
---------------------------
1 John M 35
2 Anne F 20
我需要的是:僅獲取第一個資料幀的最新值,并僅將此值與我的第二個資料幀連接。雖然,我的 spark 腳本加入了這兩個值:
我的代碼:
df = df1.select(
col("date"),
col("value"),
col("ID"),
).OrderBy(
col("ID").asc(),
col("date").desc(),
).groupBy(
col("ID"), col("date").cast(StringType()).substr(0,10).alias("date")
).agg (
max(col("value")).alias("value")
)
final_df = df2.join(
df,
(col("idUser") == col("ID")),
how="left"
)
當我執行這個連接(格式化列在這篇文章中被抽象出來)時,我有以下輸出:
ID | Name | Gender | Value
---------------------------
1 John M 35
2 Anne F 20
2 Anne F 10
我substr用來洗掉小時和分鐘以僅按日期過濾。但是當我在不同的日子有相同的 ID 時,我的輸出 df 有 2 個值而不是最近的值。我怎樣才能解決這個問題?
注意:我僅使用 pyspark 函式來執行此操作(我現在想使用spark.sql(...))。
uj5u.com熱心網友回復:
您可以在 pysaprk 中使用window和row_number運行
from pyspark.sql.window import Window
from pyspark.sql.functions import row_number
windowSpec = Window.partitionBy("ID").orderBy("date").desc()
df1_latest_val = df1.withColumn("row_number", row_number().over(windowSpec)).filter(
f.col("row_number") == 1
)
表的輸出df1_latest_val看起來像這樣
date | value | ID | row_number |
-----------------------------------------------------
2021-12-06 15:15:00 35 1 1
2021-11-30 00:00:00 20 2 1
現在您將擁有帶有最新 val 的 df,您可以直接將其與另一個表連接。
轉載請註明出處,本文鏈接:https://www.uj5u.com/gongcheng/378887.html
