I'd like to know whether using a Window x times shuffles the data x times.
Example:
from pyspark.sql import functions as F
from pyspark.sql.window import Window
w = Window.partitionBy('col_a').orderBy('date')
df = df.withColumn('new_col_1', F.lag('col_b').over(w))
df = df.withColumn('new_col_2', F.row_number().over(w))
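Does this code shuffle the data once because there is one Window, or twice because the Window is used twice?
If the answer is 2 shuffles, would repartitioning on col_a first reduce the number of shuffles to 1, as in the code example below?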
from pyspark.sql import functions as F
from pyspark.sql.window import Window
w = Window.partitionBy('col_a').orderBy('date')
df = df.repartition('col_a')
df = df.withColumn('new_col_1', F.lag('col_b').over(w))
df = df.withColumn('new_col_2', F.row_number().over(w))
Answer:
If we use explain to show how Spark will compute this DataFrame, we get the following execution plan:
from pyspark.sql import functions as F
from pyspark.sql.window import Window
w = Window.partitionBy('col_a').orderBy('date')
df = df.withColumn('new_col_1', F.lag('col_b').over(w))
df = df.withColumn('new_col_2', F.row_number().over(w))
df.explain()
# == Physical Plan ==
# Window [lag(col_b#2, -1, null) windowspecdefinition(col_a#1L, date#0 ASC NULLS FIRST, specifiedwindowframe(RowFrame, -1, -1)) AS new_col_1#19, row_number() windowspecdefinition(col_a#1L, date#0 ASC NULLS FIRST, specifiedwindowframe(RowFrame, unboundedpreceding$(), currentrow$())) AS new_col_2#25], [col_a#1L], [date#0 ASC NULLS FIRST]
# +- *(2) Sort [col_a#1L ASC NULLS FIRST, date#0 ASC NULLS FIRST], false, 0
#    +- Exchange hashpartitioning(col_a#1L, 200), ENSURE_REQUIREMENTS, [id=#23]
#       +- *(1) Scan ExistingRDD[date#0,col_a#1L,col_b#2]
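As you can see, there is only one Exchange step, i.e. one shuffle. So if you reuse a window to compute several columns, you get a single shuffle as long as nothing else shuffles the data between those computations. Moreover, there is only one Window step, which means the two window columns are actually computed in the same step rather than one after the other.
Other cases
If we repartition by col_a before computing the window columns, the execution plan is the same as without repartitioning: there is still a single Exchange, now labelled REPARTITION (it comes from the explicit repartition call) instead of ENSURE_REQUIREMENTS. Repartitioning first therefore does not reduce the number of shuffles, which was already 1: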
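If you want to check the shuffle count programmatically instead of reading the plan by eye, a rough text-based sketch is to count the Exchange lines in the explain() output. The helper names (explain_text, count_exchanges, PLAN_SAME_WINDOW) are hypothetical, the plan string below is the one printed above with the long Window line shortened to [...], and it assumes PySpark's explain() writes the plan with Python's print() so that redirect_stdout can capture it.

```python
import contextlib
import io
import re

# Plan printed by df.explain() above, in Spark's "+-" tree layout.
# The long Window line is shortened with [...] for readability.
PLAN_SAME_WINDOW = """\
== Physical Plan ==
Window [lag(col_b#2, -1, null) [...] AS new_col_1#19, row_number() [...] AS new_col_2#25], [col_a#1L], [date#0 ASC NULLS FIRST]
+- *(2) Sort [col_a#1L ASC NULLS FIRST, date#0 ASC NULLS FIRST], false, 0
   +- Exchange hashpartitioning(col_a#1L, 200), ENSURE_REQUIREMENTS, [id=#23]
      +- *(1) Scan ExistingRDD[date#0,col_a#1L,col_b#2]
"""

# Tree-drawing prefix ("+- ", ":  ", indentation) plus an optional
# whole-stage-codegen marker such as "*(2) ".
NODE_PREFIX = re.compile(r"^[\s+:|-]*(\*\(\d+\)\s*)?")


def explain_text(df) -> str:
    """Capture what df.explain() prints (assumes explain() uses print())."""
    buf = io.StringIO()
    with contextlib.redirect_stdout(buf):
        df.explain()
    return buf.getvalue()


def count_exchanges(plan_text: str) -> int:
    """Count shuffle Exchange nodes; BroadcastExchange/ReusedExchange are not counted."""
    count = 0
    for line in plan_text.splitlines():
        node = NODE_PREFIX.sub("", line)
        if node.startswith("Exchange "):
            count += 1
    return count


print(count_exchanges(PLAN_SAME_WINDOW))  # 1
```

With a real DataFrame the call would be count_exchanges(explain_text(df)). This is a heuristic over printed text, not a Spark API: plan formats vary between Spark versions, and if the output contains more than one plan (for example initial and final plans under adaptive execution) it should be applied to a single plan section.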
w = Window.partitionBy('col_a').orderBy('date')
df = df.repartition('col_a')
df = df.withColumn('new_col_1', F.lag('col_b').over(w))
df = df.withColumn('new_col_2', F.row_number().over(w))
df.explain()
# == Physical Plan ==
# Window [lag(col_b#2, -1, null) windowspecdefinition(col_a#1L, date#0 ASC NULLS FIRST, specifiedwindowframe(RowFrame, -1, -1)) AS new_col_1#19, row_number() windowspecdefinition(col_a#1L, date#0 ASC NULLS FIRST, specifiedwindowframe(RowFrame, unboundedpreceding$(), currentrow$())) AS new_col_2#25], [col_a#1L], [date#0 ASC NULLS FIRST]
# +- *(2) Sort [col_a#1L ASC NULLS FIRST, date#0 ASC NULLS FIRST], false, 0
#    +- Exchange hashpartitioning(col_a#1L, 200), REPARTITION, [id=#26]
#       +- *(1) Scan ExistingRDD[date#0,col_a#1L,col_b#2]
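If we repartition by col_a between the two window column computations, the two columns are no longer computed in the same step (there are now two Window operators), but there is still only one Exchange: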
w = Window.partitionBy('col_a').orderBy('date')
df = df.withColumn('new_col_1', F.lag('col_b').over(w))
df = df.repartition('col_a')
df = df.withColumn('new_col_2', F.row_number().over(w))
df.explain()
# == Physical Plan ==
# Window [row_number() windowspecdefinition(col_a#1L, date#0 ASC NULLS FIRST, specifiedwindowframe(RowFrame, unboundedpreceding$(), currentrow$())) AS new_col_2#25], [col_a#1L], [date#0 ASC NULLS FIRST]
# +- Window [lag(col_b#2, -1, null) windowspecdefinition(col_a#1L, date#0 ASC NULLS FIRST, specifiedwindowframe(RowFrame, -1, -1)) AS new_col_1#19], [col_a#1L], [date#0 ASC NULLS FIRST]
#    +- *(2) Sort [col_a#1L ASC NULLS FIRST, date#0 ASC NULLS FIRST], false, 0
#       +- Exchange hashpartitioning(col_a#1L, 200), ENSURE_REQUIREMENTS, [id=#33]
#          +- *(1) Scan ExistingRDD[date#0,col_a#1L,col_b#2]
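To see "two Window steps but still one shuffle" as numbers, a sketch can tally the physical operators by name. operator_counts and PLAN_REPARTITION_BETWEEN are hypothetical names; the plan string is the one above with the Window expressions shortened to [...].

```python
import re
from collections import Counter

# Plan printed above for repartition('col_a') between the two window columns.
PLAN_REPARTITION_BETWEEN = """\
== Physical Plan ==
Window [row_number() [...] AS new_col_2#25], [col_a#1L], [date#0 ASC NULLS FIRST]
+- Window [lag(col_b#2, -1, null) [...] AS new_col_1#19], [col_a#1L], [date#0 ASC NULLS FIRST]
   +- *(2) Sort [col_a#1L ASC NULLS FIRST, date#0 ASC NULLS FIRST], false, 0
      +- Exchange hashpartitioning(col_a#1L, 200), ENSURE_REQUIREMENTS, [id=#33]
         +- *(1) Scan ExistingRDD[date#0,col_a#1L,col_b#2]
"""

# Tree-drawing prefix plus an optional whole-stage-codegen marker like "*(2) ".
NODE_PREFIX = re.compile(r"^[\s+:|-]*(\*\(\d+\)\s*)?")


def operator_counts(plan_text):
    """Count physical operators by name (the first word of each plan node)."""
    counts = Counter()
    for line in plan_text.splitlines():
        if line.startswith("=="):
            continue  # skip the "== Physical Plan ==" header
        node = NODE_PREFIX.sub("", line)
        if node:
            counts[node.split()[0]] += 1
    return counts


counts = operator_counts(PLAN_REPARTITION_BETWEEN)
print(counts["Window"], counts["Exchange"])  # 2 1
```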
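If we repartition by col_b between the two window column computations, we get 3 shuffles. So reusing the same window triggers only one shuffle as long as no repartition or shuffle on other columns happens between the window column computations: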
w = Window.partitionBy('col_a').orderBy('date')
df = df.withColumn('new_col_1', F.lag('col_b').over(w))
df = df.repartition('col_b')
df = df.withColumn('new_col_2', F.row_number().over(w))
df.explain()
# == Physical Plan ==
# Window [row_number() windowspecdefinition(col_a#1L, date#0 ASC NULLS FIRST, specifiedwindowframe(RowFrame, unboundedpreceding$(), currentrow$())) AS new_col_2#25], [col_a#1L], [date#0 ASC NULLS FIRST]
# +- *(3) Sort [col_a#1L ASC NULLS FIRST, date#0 ASC NULLS FIRST], false, 0
#    +- Exchange hashpartitioning(col_a#1L, 200), ENSURE_REQUIREMENTS, [id=#42]
#       +- Exchange hashpartitioning(col_b#2, 200), REPARTITION, [id=#41]
#          +- Window [lag(col_b#2, -1, null) windowspecdefinition(col_a#1L, date#0 ASC NULLS FIRST, specifiedwindowframe(RowFrame, -1, -1)) AS new_col_1#19], [col_a#1L], [date#0 ASC NULLS FIRST]
#             +- *(2) Sort [col_a#1L ASC NULLS FIRST, date#0 ASC NULLS FIRST], false, 0
#                +- Exchange hashpartitioning(col_a#1L, 200), ENSURE_REQUIREMENTS, [id=#36]
#                   +- *(1) Scan ExistingRDD[date#0,col_a#1L,col_b#2]
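Because explain() prints the root operator first, the shuffles in this plan actually run from the bottom line upward. A sketch that lists each hash Exchange in that execution order shows the col_a, col_b, col_a sequence behind the 3 shuffles. shuffle_sequence and PLAN_REPARTITION_OTHER_COLUMN are hypothetical names, the Window expressions are shortened to [...], and reversing the lines only gives execution order for a linear plan like this one (plans with joins branch).

```python
import re

# Plan printed above for repartition('col_b') between the two window columns.
PLAN_REPARTITION_OTHER_COLUMN = """\
== Physical Plan ==
Window [row_number() [...] AS new_col_2#25], [col_a#1L], [date#0 ASC NULLS FIRST]
+- *(3) Sort [col_a#1L ASC NULLS FIRST, date#0 ASC NULLS FIRST], false, 0
   +- Exchange hashpartitioning(col_a#1L, 200), ENSURE_REQUIREMENTS, [id=#42]
      +- Exchange hashpartitioning(col_b#2, 200), REPARTITION, [id=#41]
         +- Window [lag(col_b#2, -1, null) [...] AS new_col_1#19], [col_a#1L], [date#0 ASC NULLS FIRST]
            +- *(2) Sort [col_a#1L ASC NULLS FIRST, date#0 ASC NULLS FIRST], false, 0
               +- Exchange hashpartitioning(col_a#1L, 200), ENSURE_REQUIREMENTS, [id=#36]
                  +- *(1) Scan ExistingRDD[date#0,col_a#1L,col_b#2]
"""

# Groups: partition keys, number of partitions, origin.
HASH_EXCHANGE_RE = re.compile(r"\bExchange hashpartitioning\((.*?), (\d+)\), (\w+)")


def shuffle_sequence(plan_text):
    """Return (partition keys, origin) per hash Exchange, bottom line first."""
    matches = HASH_EXCHANGE_RE.findall(plan_text)  # top-to-bottom text order
    return [(keys, origin) for keys, _num_partitions, origin in reversed(matches)]


for keys, origin in shuffle_sequence(PLAN_REPARTITION_OTHER_COLUMN):
    print(keys, origin)
# col_a#1L ENSURE_REQUIREMENTS
# col_b#2 REPARTITION
# col_a#1L ENSURE_REQUIREMENTS
```

The repartition on col_b destroys the col_a partitioning that the first Window produced, so the second Window needs its own shuffle on col_a again.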
