我有兩個資料框,每個資料框都有一個日期列。
。
-----------
| DEADLINES|
-----------
| 2023-07-15|
| 2018-08-10|
| 2022-03-28|
| 2021-06-22|
| 2021-12-18|
| 2021-10-11|
| 2021-11-13|
-----------
----------
| DT_DATE|
----------
|2021-04-02|
|2021-04-21|
|2021-05-01|
|2021-06-03|
|2021-09-07|
|2021-10-12|
|2021-11-02||
----------
我需要計算在給定的參考日期和每個DEADLINES日期之間有多少個DT_DATE的日期。
例如:使用2021-03-31作為參考日期應該得到以下結果集。
----------- ------------
| DEADLINES| dt_count|
----------- ------------
| 2023-07-15| 7|
| 2018-08-10| 0|
| 2022-03-28| 7|
| 2021-06-22| 4|
| 2021-12-18| 7|
| 2021-10-11| 5|
| 2021-11-13| 7|
----------- ------------
我設法使其在死線資料框架的每一行中進行迭代,但對于較大的資料集,性能變得非常差。
有誰有更好的解決方案嗎?
編輯:這是我目前的解決方案:
def count_days(deadlines_df, dates_df, ref_date)。
for row in deadlines_df.collect():
qtt = dates_df.filter(dates_df.DT_DATE.between(ref_date, row.DEADLINES)).count()
yield row.deadlines, qtt
new_df = spark.createDataFrame(count_days(deadlines_df, dates_df, "2021-03-31"), ["DEADLINES", "dt_count"] )
uj5u.com熱心網友回復:
兩個資料框可以用不同的權重聯合起來,并使用Window函式,范圍從開始到當前行(Scala):
val deadlines = Seq(
("2023-07-15"),
("2018-08-10"),
("2022-03-28"),
("2021-06-22"),
("2021-12-18"),
("2021-10-11"),
("2021-11-13")
).toDF("DEADLINES")
val dates = Seq(
("2021-04-02"),
("2021-04-21"),
("2021-05-01"),
("2021-06-03"),
("2021-09-07"),
("2021-10-12"),
("2021-11-02")
).toDF("DT_DATE")
Val referenceDate = "2021-03-31""weight", lit(0)
.unionAll(
日期
.where($"DT_DATE"/span> >= referenceDate)
.withColumn("weight"/span>, lit(1)
)
val fromStartToCurrentRowWindow = Window.orderBy("DEADLINES").rangeBetween(Window.unboundedPreceding, Window.currentRow)
val result = united
.withColumn("dt_count", sum("weight") .over(fromStartToCurrentRowWindow)
.where($"weight" === lit(0)
.drop("weight")
輸出:
---------- --------
|DEADLINES |dt_count||
---------- --------
|2018-08-10|0 |
|2021-06-22|4 |
|2021-10-11|5 |
|2021-11-13|7 |
|2021-12-18|7 |
|2022-28|7 |
|2023-07-15|7 !
---------- --------
注意:計算將在一個磁區中執行,Spark顯示這樣的警告。 WARN Logging - No Partition Defined for Window operation! 將所有的資料移到一個磁區中,這可能會導致嚴重的性能下降。
還有其他可能的解決方案。
還有其他可能的解決方案,通過范圍連接兩個資料幀,這導致了cartesian join.
。uj5u.com熱心網友回復:
如果你有少量的截止日期,你可以:
如果你有少量的截止日期,你可以:
你可以
dates_df資料幀上按截止日期添加一列,當DT_DATE在ref_date和截止日期之間時,其值為1,否則為0讓我們來看看一步一步的步驟
按截止日期添加一列:from pyspark.sql import functions as F
deadline_rows = deadlines_df.collect()
dates_with_deadlines = dates_df
for row in deadline_rows。
dates_with_deadlines = dates_with_deadlines.withColumn(
str(row.DEADLINES),
F.when(
dates_df.DT_DATE.between(ref_date, row.DEADLINES), F.lit(1)
.否則(
F.lit(0)
)
)
通過你的例子,你可以得到以下dates_with_deadlines資料框架:
---------- ---------- ---------- ---------- ---------- ---------- ---------- ----------
|DT_DATE |2023-07-15|2018-08-10|2022-03-28|2021-06-22|2021-12-18|2021-10-11|2021-11-13|
---------- ---------- ---------- ---------- ---------- ---------- ---------- ----------
|2021-04-02|1|0|1 |1 |1 |1 |1 |1 !
|2021-04-21|1 |0 |1 |1 |1 |1 |1 |1 !
|2021-05-01|1|0|1 |1 |1 |1 |1 |1 !
|2021-06-03|1|0|1 |1 |1 |1 |1 |1 !
|2021-09-07|1 |0|1 |0 |1 |1 |1 |1 !
|2021-10-12|1 |0 |1 |0 |1 |0 |1 !
|2021-11-02|1 |0 |1 |0 |1 |0 |1 !
---------- ---------- ---------- ---------- ---------- ---------- ---------- ----------
最后期限
。aggregated_df = dates_with_deadlines.agg(*[F。 sum(str(x.DEADLINES)).alias(str(x.DEADLINES) for x in deadline_rows] )
在這一步之后,你會得到以下aggregated_df資料框架:
---------- ---------- ---------- ---------- ---------- ---------- ----------
|2023-07-15|2018-08-10|2022-03-28|2021-06-22|2021-12-18|2021-10-11|2021-11-13|
---------- ---------- ---------- ---------- ---------- ---------- ----------
|7 |0 |7 |4 |7 |5 |7 !
---------- ---------- ---------- ---------- ---------- ---------- ----------
Transpose dataframe
result_df = aggregated_df
.withColumn('merged', F.array(*[F.struct(F.lit(x.DEADLINES).alias('DEADLINES'), F. col(str(x.deadlines)).alias('dt_count') for x in deadline_rows])
.drop(*[str(x.DEADLINES) for x in deadline_rows] )
.withColumn('data'/span>, F.explode('merged'/span>)
.drop('merged')
.withColumn('DEADLINES', F.col('data.DEADLINES')
.withColumn('dt_count', F.col('data.dt_count')
.drop('data')
這樣你就有了預期的result_df資料框架:
---------- --------
|DEADLINES |dt_count|?
---------- --------
|2023-07-15|7 |
|2018-08-10|0 |
|2022-03-28|7 |
|2021-06-22|4 |
|2021-12-18|7 |
|2021-10-11|5 |
|2021-11-13|7 |
---------- --------
完整的代碼
。from pyspark.sql import functions as F
deadline_rows = deadlines_df.collect()
dates_with_deadlines = dates_df
for row in deadline_rows。
dates_with_deadlines = dates_with_deadlines.withColumn(
str(row.DEADLINES),
F.when(
dates_df.DT_DATE.between(ref_date, row.DEADLINES), F.lit(1)
.否則(
F.lit(0)
)
)
aggregated_df = dates_with_deadlines.agg(*[F.sum(str(x.DEADLINES)).alias(str(x.DEADLINES) for x in deadline_rows])
result_df = aggregated_df
.withColumn('merged', F.array(*[F.struct(F.lit(x.DEADLINES).alias('DEADLINES'), F. col(str(x.deadlines)).alias('dt_count') for x in deadline_rows])
.drop(*[str(x.DEADLINES) for x in deadline_rows] )
.withColumn('data'/span>, F.explode('merged'/span>)
.drop('merged')
.withColumn('DEADLINES', F.col('data.DEADLINES')
.withColumn('dt_count', F.col('data.dt_count')
.drop('data')
本解決方案的優點和局限性
在這個解決方案中,唯一不能使用分布式系統的步驟是轉置步驟。此外,與您當前的解決方案不同,我們以并行方式對每個截止日期列進行所有聚合,而不是按順序進行。
然而,該解決方案僅在截止日期較少(數百或數千個截止日期)的情況下才有效,首先是因為我們在 Spark 驅動程式中使用 .collect() 檢索了所有這些截止日期,其次是因為在第一步中我們為每個截止日期創建了一個列,創建了具有大量資料的行,最后是因為最后一步也僅在一個執行器上執行。
轉載請註明出處,本文鏈接:https://www.uj5u.com/gongcheng/328160.html
標籤:
上一篇:SSD-Mobilenetv2300x300-Tensorflow異議檢測API
下一篇:如何有條件地呼叫路由器級中間件?
