我有一個資料框,其中包含某個人的 id 以及他執行某個操作的日期:
---- ----------
| id| date|
---- ----------
| 1|2022-09-01|
| 1|2022-10-01|
| 1|2022-11-01|
| 2|2022-07-01|
| 2|2022-10-01|
| 2|2022-11-01|
| 3|2022-09-01|
| 3|2022-10-01|
| 3|2022-11-01|
---- ----------
我需要確定此人在一段時間內(假設過去 3 個月)執行了某些操作。在一個具體的例子中,第 2 個人分別錯過了 08 和 09 月,條件不滿足。所以我希望得到以下結果:
---- ------------------------------------ ------
| id| dates|3month|
---- ------------------------------------ ------
| 1|[2022-09-01, 2022-10-01, 2022-11-01]| true|
| 2|[2022-07-01, 2022-10-01, 2022-11-01]| false|
| 3|[2022-09-01, 2022-10-01, 2022-11-01]| true|
---- ------------------------------------ ------
我知道我應該按人員 ID 分組并獲取與其對應的日期陣列。
data.groupBy(col("id")).agg(collect_list("date") as "dates").withColumn("3month", ???)
但是,我無法撰寫一個檢查是否符合要求的函式。我有一個使用遞回的選項,但由于性能低,它不適合我(可能有超過一千個日期)。如果有人可以幫助我解決我的問題,我將不勝感激。
uj5u.com熱心網友回復:
一個簡單的技巧是在聚合中使用 aset而不是 a list,以便具有不同的值,然后檢查該集合的大小。以下是一些可能的解決方案:
解決方案 1
假設您有一個要檢查的感興趣月份的串列,您可以對所需月份執行初步篩選,然后進行匯總和驗證。
import org.apache.spark.sql.{functions => F}
import java.time.{LocalDate, Duration}
val requiredMonths = Seq(
LocalDate.parse("2022-09-01"),
LocalDate.parse("2022-10-01"),
LocalDate.parse("2022-11-01")
);
df
.filter(F.date_trunc("month", $"date").isInCollection(requiredMonths))
.groupBy($"id")
.agg(F.collect_set(F.date_trunc("month", $"date")) as "months")
.withColumn("is_valid", F.size($"months") === requiredMonths.size)
date_trunc用于將date列截斷為月份。
解決方案 2
與前一個類似,帶有初步過濾器,但這里假設您有幾個月的范圍
import java.time.temporal.ChronoUnit
val firstMonth = LocalDate.parse("2022-09-01");
val lastMonth = LocalDate.parse("2022-11-01");
val requiredNumberOfMonths = ChronoUnit.MONTHS.between(firstMonth, lastMonth) 1;
df
.withColumn("month", F.date_trunc("month", $"date"))
.filter($"month" >= firstMonth && $"month" <= lastMonth)
.groupBy($"id")
.agg(F.collect_set($"month") as "months")
.withColumn("is_valid", F.size($"months") === requiredNumberOfMonths)
解決方案 3
解決方案 1和2都有一個問題,導致從最終結果中完全排除id與感興趣的日期沒有交集的 s。這是由分組前應用的過濾器引起的。
這是基于解決方案2的解決方案,它不過濾并解決了這個問題。
df
.withColumn("month", F.date_trunc("month", $"date"))
.groupBy($"id")
.agg(F.collect_set(F.when($"month" >= firstMonth && $"month" <= lastMonth, $"month")) as "months")
.withColumn("is_valid", F.size($"months") === requiredNumberOfMonths)
現在過濾器使用條件執行collect_set。
考慮解決方案 1和2也是正確的,因為初步過濾器可能具有優勢,并且在某些情況下可能是預期的結果。
轉載請註明出處,本文鏈接:https://www.uj5u.com/qianduan/530962.html
