我有一個名為 的 spark 資料框df,它在列上進行了磁區date。我需要使用 CSV 格式將此資料幀保存在 S3 上。當我撰寫資料幀時,我需要洗掉 S3 上資料幀有資料要寫入的磁區(即日期)。所有其他磁區都需要保持完整。
我在這里看到這正是選項spark.sql.sources.partitionoverwritemode設定為dynamic.
但是,它似乎不適用于 CSV 檔案。
如果我使用以下命令在鑲木地板上使用它,它會完美運行:
df.write
.option("partitionOverwriteMode", "dynamic")
.partitionBy("date")
.format("parquet")
.mode("overwrite")
.save(output_dir)
但是,如果我使用以下命令在 CSV 上使用它,它就不起作用:
df.write
.option("partitionOverwriteMode", "dynamic")
.partitionBy("date")
.format("csv")
.mode("overwrite")
.save(output_dir)
為什么會這樣?知道如何使用 CSV 輸出實作這種行為嗎?
uj5u.com熱心網友回復:
我需要洗掉 S3 上資料框有資料要寫入的磁區(即日期)
假設您有一個方便的日期串列,您可以使用該replaceWhere選項來確定要覆寫的磁區(洗掉和替換)。
例如:
df.write
.partitionBy("date")
.option("replaceWhere", "date >= '2020-12-14' AND date <= '2020-12-15'")
.format("csv")
.mode("overwrite")
.save(output_dir)
一種更動態的方法是,如果您將start_dateandend_date存盤在變數中:
start_date = "2022-01-01"
end_date = "2022-01-14"
condition = f"date >= '{start_date}' AND date <= '{end_date}'"
df.write
.partitionBy("date")
.option("replaceWhere", condition)
.format("csv")
.mode("overwrite")
.save(output_dir)
uj5u.com熱心網友回復:
你使用什么 Spark 版本?對于 Spark <2.0.0,似乎不可能將磁區與 csv 格式一起使用
uj5u.com熱心網友回復:
如果您不在 EMR 上,并且正在使用 s3a 提交者將作業安全地提交到 s3,則可以將磁區提交者設定為在提交新作業之前洗掉目標磁區中的所有資料,而保留所有其他磁區。
轉載請註明出處,本文鏈接:https://www.uj5u.com/net/461482.html
上一篇:SQLserver:使用LIKE關鍵字在CSV欄位中查找完全匹配
下一篇:用python清理csv檔案
