我有以下 PySpark 資料:
--------- --------- --------- -------------------
|event_id |user_id | status| created_at|
--------- --------- --------- -------------------
| 1| 2| a|2017-05-26 15:12:54|
| 1| 2| b|2017-05-26 15:12:53|
| 2| 1| a|2017-05-26 15:12:56|
| 1| 2| b|2017-05-26 16:12:57|
| 2| 1| c|2017-05-26 16:12:58|
| 2| 1| b|2017-05-26 16:12:58|
| 3| 1| b|2017-05-26 14:17:58|
--------- --------- --------- -------------------
對于每一對(event_id, user_id)(這是主鍵,資料是從資料庫中提取的)我想根據created_at每個的最高值創建新列status,并null為沒有資料的對創建新列。對于以上資料:
--------- --------- ------------------- ------------------- -------------------
|event_id |user_id | a| b| c|
--------- --------- ------------------- ------------------- -------------------
| 1| 2|2017-05-26 15:12:54|2017-05-26 16:12:57| null|
| 2| 1|2017-05-26 15:12:56| null|2017-05-26 16:12:58|
| 3| 1| null|2017-05-26 14:17:58| null|
--------- --------- ------------------- ------------------- -------------------
我的解決方案非常復雜、緩慢,而且我很確定它可以優化:
for status in ["a", "b", "c"]:
df2 = df.filter(F.col("status") == status).groupBy(["event_id", "user_id"]).agg(F.max("created_at").alias(status))
df = (
df
.join(
df2,
on=(
(df["event_id"] == df2["event_id"]) &
(df["user_id"] == df2["user_id"]) &
(df["status"] == status)
),
how="left_outer"
)
.select(df["*"], status)
)
df2 = (
df
.drop("status", "created_at")
.groupBy(["event_id", "user_id"])
.agg(F.max("a").alias("a"), F.max("b").alias("b"), F.max("c").alias("c"))
)
# df2 has the result
我可以在這里避免回圈中的 JOIN,或者至少將 JOIN groupBy 和 max 減少到一步嗎?就像現在一樣,我只是按順序處理狀態,這根本無法擴展。
uj5u.com熱心網友回復:
嘗試這個,
df.groupBy("event_id","user_id").pivot("status").agg(first("created_at")).show
轉載請註明出處,本文鏈接:https://www.uj5u.com/gongcheng/370293.html
標籤:Python 阿帕奇火花 火花 apache-spark-sql
上一篇:是否可以使用pyspark在rdd中將值拆分為2個分隔符?
下一篇:Spark-分解和合并列
