我在 S3 中有 130 GB csv.gz 檔案,該檔案是使用從 redshift 到 S3 的并行卸載加載的。由于它包含多個檔案,我想減少檔案數量,以便我的 ML 模型(使用 sklearn)更容易閱讀。
我設法使用以下方法將多個從 S3 轉換為 spark 資料幀(稱為 spark_df):
spark_df1=spark.read.csv(path,header=False,schema=schema)
spark_df1 包含 100 列(特征),是我對數百萬客戶 ID 的時間序列推理資料。由于它是一個時間序列資料,我想確保“customerID”的資料點應該出現在同一個輸出檔案中,因為我會將每個磁區檔案作為一個塊讀取。我想將此資料卸載回 S3。我不介意較小的資料磁區,但每個磁區檔案應該具有單個客戶的整個時間序列資料。換句話說,一個客戶的資料不能在 2 個檔案中。
當前代碼:
datasink3=spark_df1.repartition(1).write.format("parquet").save(destination_path)
但是,這需要永遠運行,并且輸出是單個檔案,甚至沒有壓縮。我也嘗試使用“.coalesce(1)”而不是“.repartition(1)”,但在我的情況下它比較慢。
uj5u.com熱心網友回復:
您可以使用 customerID 對其進行磁區:
spark_df1.partitionBy("customerID") \
.write.format("parquet") \
.save(destination_path)
您可以在此處閱讀有關它的更多資訊:https ://sparkbyexamples.com/pyspark/pyspark-repartition-vs-partitionby/
uj5u.com熱心網友回復:
此代碼有效,運行時間減少到原始結果的 1/5。唯一需要注意的是,確保負載在節點之間平均分配(在我的情況下,我必須確保每個客戶 ID 具有相同的行數)
spark_df1.repartition("customerID").write.partitionBy("customerID").format("csv").option("compression","gzip").save(destination_path)
uj5u.com熱心網友回復:
添加到 manks answer后,您需要按 customerID 對 DataFrame 重新磁區,然后 write.partitionBy(customerID) 為每個客戶獲取一個檔案。你可以在這里看到一個類似的問題。
此外,關于您對 parquet 檔案未壓縮的評論,默認壓縮是snappy,與 gzip 壓縮相比有一些優點和缺點,但它仍然比未壓縮的要好得多。
轉載請註明出處,本文鏈接:https://www.uj5u.com/caozuo/476418.html
