Suppose I have two different windows with the same partitioning:
from pyspark.sql import Window, functions as F
window1 = Window.partitionBy("id")
window2 = Window.partitionBy("id").orderBy("date")
Then I use them in several consecutive window function calls:
df.withColumn("col1", F.sum("x").over(window1))\
.withColumn("col2", F.first("x").over(window2))
并且假設df不被id.
計算col2會導致另一個 shuffle 還是會重用相同的磁區?
Would adding
df.repartition("id")
before the computation bring any performance improvement?
Answer:
TL;DR: only one shuffle happens, and repartition is useless here.
This is actually easy to verify.
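from pyspark.sql import SparkSession, Window
from pyspark.sql import functions as f
spark = SparkSession.builder.getOrCreate()
# sample data
df = spark.createDataFrame([
(1, 2, "2022-10-22"),
(1, 3, "2022-11-22"),
(2, 4, "2023-12-12"),
(2, 5, "2021-01-01")], ['id', 'x', 'date'])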
# let us now introduce 3 windows and see what happens:
window1 = Window.partitionBy("id")
window2 = Window.partitionBy("id").orderBy("date")
window3 = Window.partitionBy("x").orderBy("date")
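To see why a single shuffle can serve both window1 and window2, here is a plain-Python sketch (no Spark involved; the function name `apply_windows` is hypothetical) that emulates those two windows on the same sample rows. It groups the rows by id once, which plays the role of the single Exchange, and then computes both columns inside each group without regrouping. It assumes the dates are ISO strings, so sorting them as strings matches chronological order.

```python
from collections import defaultdict

# Sample rows from above: (id, x, date); ISO dates sort correctly as strings.
rows = [
    (1, 2, "2022-10-22"),
    (1, 3, "2022-11-22"),
    (2, 4, "2023-12-12"),
    (2, 5, "2021-01-01"),
]


def apply_windows(rows):
    """Emulate sum("x") over window1 and first("x") over window2.

    Both windows partition by id, so rows are grouped by id only once
    (the analogue of the single Exchange); both columns are then computed
    inside each group.
    """
    groups = defaultdict(list)
    for row in rows:  # the one "shuffle": route every row to its id group
        groups[row[0]].append(row)

    result = []
    for group in groups.values():
        col1 = sum(x for _, x, _ in group)  # window1: whole-partition sum
        ordered = sorted(group, key=lambda r: r[2])  # window2: order by date
        # With the default frame (unbounded preceding to current row),
        # first("x") is the x of the earliest date in the partition.
        first_x = ordered[0][1]
        for id_, x, date in ordered:
            result.append((id_, x, date, col1, first_x))
    return result


for r in apply_windows(rows):
    print(r)
```

Adding window3 to this sketch would require regrouping the rows by x, which is exactly the second shuffle that appears in the last plan below.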
Now let's call explain on your code:
df.withColumn("col1", f.sum("x").over(window1))\
.withColumn("col2", f.first("x").over(window2))\
.explain()
== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
- Window [first(x#169L, false) windowspecdefinition(id#168L, date#170 ASC NULLS FIRST, specifiedwindowframe(RangeFrame, unboundedpreceding$(), currentrow$())) AS col2#233L], [id#168L], [date#170 ASC NULLS FIRST]
- Sort [id#168L ASC NULLS FIRST, date#170 ASC NULLS FIRST], false, 0
- Window [sum(x#169L) windowspecdefinition(id#168L, specifiedwindowframe(RowFrame, unboundedpreceding$(), unboundedfollowing$())) AS col1#227L], [id#168L]
- Sort [id#168L ASC NULLS FIRST], false, 0
- Exchange hashpartitioning(id#168L, 200), ENSURE_REQUIREMENTS, [id=#164]
- Scan ExistingRDD[id#168L,x#169L,date#170]
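As you can see, there is only one Exchange (= shuffle). Adding repartition produces the same execution plan; the only difference is that the single Exchange is now labeled REPARTITION_BY_COL instead of ENSURE_REQUIREMENTS: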
df.repartition("id")\
.withColumn("col1", f.sum("x").over(window1))\
.withColumn("col2", f.first("x").over(window2))\
.explain()
== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
- Window [first(x#169L, false) windowspecdefinition(id#168L, date#170 ASC NULLS FIRST, specifiedwindowframe(RangeFrame, unboundedpreceding$(), currentrow$())) AS col2#246L], [id#168L], [date#170 ASC NULLS FIRST]
- Sort [id#168L ASC NULLS FIRST, date#170 ASC NULLS FIRST], false, 0
- Window [sum(x#169L) windowspecdefinition(id#168L, specifiedwindowframe(RowFrame, unboundedpreceding$(), unboundedfollowing$())) AS col1#240L], [id#168L]
- Sort [id#168L ASC NULLS FIRST], false, 0
- Exchange hashpartitioning(id#168L, 200), REPARTITION_BY_COL, [id=#182]
- Scan ExistingRDD[id#168L,x#169L,date#170]
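The point here is that, however the partitioning came about, Spark remembers that the data is already partitioned, and how, so it avoids doing it again.
Finally, note that window3 requires a different partitioning, so we get two shuffles: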
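The idea that Spark "remembers" the partitioning can be captured in a small plain-Python model. This is a simplified sketch, not Spark's planner code: it assumes that a window partitioned by keys R can reuse the current hash partitioning whenever that partitioning's keys are a non-empty subset of R, which is roughly how Spark decides whether an Exchange must be inserted. The function name `count_exchanges` and its parameters are hypothetical.

```python
def count_exchanges(window_keys, initial_partitioning=None):
    """Count the shuffles needed to evaluate a chain of window functions.

    Simplified model (an assumption, not Spark's actual rule): a window
    partitioned by keys R needs its input hash-partitioned on a non-empty
    subset of R; otherwise an Exchange on R is inserted and becomes the
    new current partitioning.

    window_keys: one tuple of partitionBy columns per window, in order.
    initial_partitioning: keys of an explicit repartition(...), if any.
    """
    exchanges = 0
    current = None
    if initial_partitioning is not None:
        exchanges += 1  # an explicit repartition is itself a shuffle
        current = set(initial_partitioning)
    for keys in window_keys:
        required = set(keys)
        if not current or not current <= required:
            exchanges += 1  # models "Exchange hashpartitioning(keys)"
            current = required
    return exchanges


print(count_exchanges([("id",), ("id",)]))  # window1 then window2
print(count_exchanges([("id",), ("id",)], initial_partitioning=("id",)))
print(count_exchanges([("id",), ("x",)]))  # window1 then window3
```

The three calls mirror the three plans in this answer and print 1, 1 and 2: the explicit repartition("id") simply takes the place of the Exchange that Spark would otherwise have added itself.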
df.withColumn("col1", f.sum("x").over(window1))\
.withColumn("col2", f.first("id").over(window3))\
.explain()
== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
- Window [first(id#168L, false) windowspecdefinition(x#169L, specifiedwindowframe(RowFrame, unboundedpreceding$(), unboundedfollowing$())) AS col2#259L], [x#169L]
- Sort [x#169L ASC NULLS FIRST], false, 0
- Exchange hashpartitioning(x#169L, 200), ENSURE_REQUIREMENTS, [id=#207]
- Window [sum(x#169L) windowspecdefinition(id#168L, specifiedwindowframe(RowFrame, unboundedpreceding$(), unboundedfollowing$())) AS col1#253L], [id#168L]
- Sort [id#168L ASC NULLS FIRST], false, 0
- Exchange hashpartitioning(id#168L, 200), ENSURE_REQUIREMENTS, [id=#203]
- Scan ExistingRDD[id#168L,x#169L,date#170]
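For longer plans, counting Exchange nodes by eye is error-prone. The sketch below (the name `count_shuffles` is hypothetical) counts them in the text that explain() prints. How you obtain that text is up to you: copying it works, and in classic (non-Connect) PySpark, capturing stdout with contextlib.redirect_stdout should also work, but treat that as an assumption. The regex deliberately ignores BroadcastExchange and ReusedExchange lines, since those are not new shuffles.

```python
import re

# Matches plan lines such as
# "- Exchange hashpartitioning(id#168L, 200), ENSURE_REQUIREMENTS, [id=#164]",
# skipping leading tree markers ("+-", ":-", "-") and indentation.
EXCHANGE_LINE = re.compile(r"^[ \t:+\-]*Exchange\b", re.MULTILINE)


def count_shuffles(plan_text: str) -> int:
    """Return the number of Exchange (shuffle) operators in an explain() dump."""
    return len(EXCHANGE_LINE.findall(plan_text))


# Excerpt of the last plan above (window1 followed by window3).
plan = """\
== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
- Sort [x#169L ASC NULLS FIRST], false, 0
- Exchange hashpartitioning(x#169L, 200), ENSURE_REQUIREMENTS, [id=#207]
- Sort [id#168L ASC NULLS FIRST], false, 0
- Exchange hashpartitioning(id#168L, 200), ENSURE_REQUIREMENTS, [id=#203]
- Scan ExistingRDD[id#168L,x#169L,date#170]
"""

print(count_shuffles(plan))  # 2
```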
