我正在嘗試建立一個時間線,我希望能夠檢測到時間線中斷。我有這個測驗 df:
| ID | 日期 |
|---|---|
| 1 | 2012-12-01 |
| 1 | 2012-12-02 |
| 1 | 2012-12-03 |
| 1 | 2012-12-05 |
| 1 | 2012-12-06 |
| 1 | 2012-12-07 |
| 1 | 2012-12-10 |
| 1 | 2012-12-11 |
我想得到一個像這樣開始結束日期的時間表:
| ID | 日期 | 結尾 |
|---|---|---|
| 1 | 2012-12-01 | 2012-12-03 |
| 1 | 2012-12-05 | 2012-12-07 |
| 1 | 2012-12-10 | 2012-12-11 |
我一直在嘗試:
columns = ['id','snapshot_date']
data = [
('1','2012-12-01'),
('1','2012-12-02'),
('1','2012-12-03'),
('1','2012-12-05'),
('1','2012-12-06'),
('1','2012-12-07'),
('1','2012-12-10'),
('1','2012-12-11')]
dftest = spark.createDataFrame(data).toDF(*columns)
w1 = Window.partitionBy('id').orderBy(F.col('date'))
df2 = (df1.withColumn("group_date", F.when( ~(F.date_add(F.col('snapshot_date'), -1) == F.lag(F.col("snapshot_date"), 1, 0).over(w1)), F.lit(1)).otherwise(F.lit(0))).filter(F.col('group_date')>1)
但不確定如何獲得正確的結束日期
uj5u.com熱心網友回復:
這是一個會話化的案例,你可以通過這篇文章了解更多關于使用 spark 進行會話化的資訊。
如果我們將上面參考的文章中的 window 解決方案適應您的具體情況,我們會得到以下代碼:
from pyspark.sql import functions as F
from pyspark.sql import Window
columns = ['id','snapshot_date']
data = [
('1','2012-12-01'),
('1','2012-12-02'),
('1','2012-12-03'),
('1','2012-12-05'),
('1','2012-12-06'),
('1','2012-12-07'),
('1','2012-12-10'),
('1','2012-12-11')]
dftest = spark.createDataFrame(data).toDF(*columns)
w1 = Window.partitionBy('id').orderBy('snapshot_date')
df2 = dftest \
.withColumn('session_change', F.when(F.datediff(F.col('snapshot_date'), F.lag('snapshot_date').over(w1)) > 1, F.lit(1)).otherwise(F.lit(0))) \
.withColumn('session_id', F.sum('session_change').over(w1)) \
.groupBy('ID', 'session_id') \
.agg(F.min('snapshot_date').alias('date'), F.max('snapshot_date').alias('end')) \
.drop('session_id')
這將為我們提供以下內容df2:
--- ---------- ----------
|ID |date |end |
--- ---------- ----------
|1 |2012-12-01|2012-12-03|
|1 |2012-12-05|2012-12-07|
|1 |2012-12-10|2012-12-11|
--- ---------- ----------
轉載請註明出處,本文鏈接:https://www.uj5u.com/gongcheng/378893.html
