我曾經使用df.repartition(1200).write.parquet(...)which 創建了 repartion 引數中指定的 1200 個檔案。我現在正在使用paritionBy,即df.repartition(1200).write.partitionBy("mykey").parquet(...)。這作業正常,除了它現在為每個mykey. 我想總共有 1200 個檔案。
其他帖子建議跨某些鍵重新磁區。我的 spark 版本 ( )的相關檔案2.4.0似乎表明此功能是稍后添加的。有沒有其他方法可以實作它?我想我可以重新磁區到1200/len(unique("mykey"). 但這有點hacky。有沒有更好的方法來做到這一點?我還擔心減少磁區數量會導致記憶體不足錯誤。
uj5u.com熱心網友回復:
在撰寫器上呼叫 partitionBy 不會更改資料幀的磁區方案。相反,它用于指定資料寫入磁盤后的磁區方案
假設您有一個包含 200 個 parititons 的資料框,然后您呼叫
df.write.partitionBy("mykey").parquet(...)您的每個磁區都將按“mykey”的唯一值存盤其資料
每個磁區中的每個bucket對應一個寫入磁盤磁區的檔案
例如,假設資料幀具有欄位 mykey=KEY1 的 200 個值
假設這 200 個值中的每一個都均勻分布在 200 個磁區中,每個磁區 1 個
然后當我們打電話
df.write.partitionBy("mykey").parquet(...)我們將在磁盤磁區 mykey=KEY1 中獲得 200 個檔案。每個磁區一個
要回答您的問題,有幾種方法可以確保將 1200 個檔案寫入磁盤。所有方法都需要精確控制 parititons 中唯一值的數量
方法一
# requires mykey to have exactly 1200 unique values
df = df.repartition("mykey")
df.write.partitionBy("mykey").parquet(...)
- 重新磁區資料,以便資料幀磁區匹配磁盤磁區
- 重新磁區是一項昂貴的操作,因此應謹慎使用
方法二
# requires mykey to have exactly 1200 unique values
df = df.coalesce(1)
df.write.partitionBy("mykey").parquet(...)
- 這僅在您要寫入的最終資料集小到足以放入單個磁區時才有效。
方法三
# requires mykey to have exactly 1 unique value
df = df.repartition(1200)
df.write.partitionBy("mykey").parquet(...)
uj5u.com熱心網友回復:
我不太確定你為什么要同時做repartitionand partitionBy,但你可以做
df = df.repartition(1200)
df = your_processing(df)
df.coalesce(1).write.partitionBy("mykey").parquet(...)
coalesce(1)將磁區合并為一個磁區,然后由partitionBy.
uj5u.com熱心網友回復:
對我來說,處理它的最佳方法似乎是按mykey. 這樣,正確的資料已經在各自的磁區中,這樣partitionBy('mykey')就不會創建太多的磁區(但大致與 num 個磁區一樣多的檔案)。
轉載請註明出處,本文鏈接:https://www.uj5u.com/qita/374918.html
