我想根據鍵標記開始時間和結束時間重疊的行。例如,如果給定一個資料框,例如:
--- ------------------- -------------------
|key|start_date |end_date |
--- ------------------- -------------------
|A |2022-01-11 00:00:00|8888-12-31 00:00:00|
|B |2020-01-01 00:00:00|2022-02-10 00:00:00|
|B |2019-02-08 00:00:00|2020-02-15 00:00:00|
|B |2022-02-16 00:00:00|2022-12-15 00:00:00|
|C |2018-01-01 00:00:00|2122-02-10 00:00:00|
--- ------------------- -------------------
生成的資料幀將標記第一條和第二條 B 記錄,因為它們的開始時間和結束時間重疊。像這樣:
--- ------------------- ------------------- -----
|key|start_date |end_date |valid|
--- ------------------- ------------------- -----
|A |2022-01-11 00:00:00|8888-12-31 00:00:00|true |
|B |2020-01-01 00:00:00|2022-02-10 00:00:00|false|
|B |2019-02-08 00:00:00|2020-02-15 00:00:00|false|
|B |2022-02-16 00:00:00|2022-12-15 00:00:00|true |
|C |2018-01-01 00:00:00|2122-02-10 00:00:00|true |
--- ------------------- ------------------- -----
uj5u.com熱心網友回復:
在這里,我添加了腳本來組合重疊的日期范圍。在您的情況下,我稍微修改了最后一個腳本 - 而不是groupBy重疊范圍的最終腳本,我添加了一個視窗函式,它只是標記它們。
測驗輸入:
from pyspark.sql import functions as F, Window as W
df = spark.createDataFrame(
[('A', '2022-01-11 00:00:00', '8888-12-31 00:00:00'),
('B', '2020-01-01 00:00:00', '2022-02-10 00:00:00'),
('B', '2019-02-08 00:00:00', '2020-02-15 00:00:00'),
('B', '2022-02-16 00:00:00', '2022-12-15 00:00:00'),
('C', '2018-01-01 00:00:00', '2122-02-10 00:00:00')],
['key', 'start_date', 'end_date'])
腳本:
w1 = W.partitionBy("key").orderBy("start_date")
w2 = W.partitionBy("key", "contiguous_grp")
max_end = F.max("end_date").over(w1)
contiguous = F.when(F.datediff(F.lag(max_end).over(w1), "start_date") < 0, 1).otherwise(0)
df = (df
.withColumn("contiguous_grp", F.sum(contiguous).over(w1))
.withColumn("valid", (F.count(F.lit(1)).over(w2)) == 1)
.drop("contiguous_grp")
)
df.show()
# --- ------------------- ------------------- -----
# |key| start_date| end_date|valid|
# --- ------------------- ------------------- -----
# | A|2022-01-11 00:00:00|8888-12-31 00:00:00| true|
# | B|2019-02-08 00:00:00|2020-02-15 00:00:00|false|
# | B|2020-01-01 00:00:00|2022-02-10 00:00:00|false|
# | B|2022-02-16 00:00:00|2022-12-15 00:00:00| true|
# | C|2018-01-01 00:00:00|2122-02-10 00:00:00| true|
# --- ------------------- ------------------- -----
uj5u.com熱心網友回復:
df = (df.select('key',*[to_date(x).alias(x) for x in df.columns if x!='key'])#Coerce to dates
.withColumn('valid', collect_list(array(col('start_date'), col('end_date'))).over(Window.partitionBy('key')))#Create list of start_end dates intervals
.withColumn('valid',expr("array_contains(transform(valid,(x,i)->start_date<(x[0])),true)"))#check if the start date occurs before end date, if true flag
)
--- ---------- ---------- -----
|key|start_date|end_date |valid|
--- ---------- ---------- -----
|A |2022-01-11|8888-12-31|false|
|B |2020-01-01|2022-02-10|true |
|B |2019-02-08|2020-02-15|true |
|B |2022-02-16|2022-12-15|false|
|C |2018-01-01|2122-02-10|false|
--- ---------- ---------- -----
轉載請註明出處,本文鏈接:https://www.uj5u.com/ruanti/526116.html
上一篇:更改來自列的值的固定值會導致錯誤:多載方法值-使用替代方法
下一篇:ApacheSpark-執行緒“主”java.lang.NoClassDefFoundError中的例外:org/apache/hadoop/fs/FSDataInputStream
