我有一個 ~10GB 的資料幀,應該寫成一堆 CSV 檔案,每個磁區一個。
CSV 應按 3 個欄位進行磁區:“system”、“date_month”和“customer”。
在每個檔案夾中,應寫入一個 CSV 檔案,并且 CSV 檔案中的資料應按另外兩個欄位排序:“date_day”和“date_hour”。
檔案系統(一個 S3 存盤桶)應如下所示:
/system=foo/date_month=2022-04/customer=CU000001/part-00000-x.c000.csv
/system=foo/date_month=2022-04/customer=CU000002/part-00000-x.c000.csv
/system=foo/date_month=2022-04/customer=CU000003/part-00000-x.c000.csv
/system=foo/date_month=2022-04/customer=CU000004/part-00000-x.c000.csv
/system=foo/date_month=2022-05/customer=CU000001/part-00000-x.c000.csv
/system=foo/date_month=2022-05/customer=CU000002/part-00000-x.c000.csv
/system=foo/date_month=2022-05/customer=CU000003/part-00000-x.c000.csv
/system=foo/date_month=2022-05/customer=CU000004/part-00000-x.c000.csv
我知道我可以很容易地實作這一點,coalesce(1)但這只會使用一名工人,我想避免這種情況。
我試過這個策略
mydataframe.
repartition($"system", $"date_month", $"customer").
sort("date_day", "date_hour").
write.
partitionBy("system", "date_month", "customer").
option("header", "false").
option("sep", "\t").
format("csv").
save(s"s3://bucket/spool/")
我的想法是每個作業人員都會得到一個不同的磁區,因此它可以輕松地對資料進行排序并在磁區路徑中寫入一個檔案。運行代碼后,我注意到每個磁區都有很多 CSV,如下所示:
/system=foo/date_month=2022-05/customer=CU000001/part-00000-df027d9e-3d57-492b-b97a-daa5e80fdc93.c000.csv
/system=foo/date_month=2022-05/customer=CU000001/part-00001-df027d9e-3d57-492b-b97a-daa5e80fdc93.c000.csv
/system=foo/date_month=2022-05/customer=CU000001/part-00002-df027d9e-3d57-492b-b97a-daa5e80fdc93.c000.csv
/system=foo/date_month=2022-05/customer=CU000001/part-00003-df027d9e-3d57-492b-b97a-daa5e80fdc93.c000.csv
/system=foo/date_month=2022-05/customer=CU000001/part-00004-df027d9e-3d57-492b-b97a-daa5e80fdc93.c000.csv
/system=foo/date_month=2022-05/customer=CU000001/part-00005-df027d9e-3d57-492b-b97a-daa5e80fdc93.c000.csv
/system=foo/date_month=2022-05/customer=CU000001/part-00006-df027d9e-3d57-492b-b97a-daa5e80fdc93.c000.csv
/system=foo/date_month=2022-05/customer=CU000001/part-00007-df027d9e-3d57-492b-b97a-daa5e80fdc93.c000.csv
[...]
每個檔案中的資料按預期排序,所有檔案的串聯將創建正確的檔案,但這需要太多時間,我更愿意依賴 Spark。
有沒有辦法為每個磁區創建一個有序的 CSV 檔案,而無需將所有資料移動到單個作業人員coalesce(1)?
我正在使用 scala,如果這很重要。
uj5u.com熱心網友回復:
sort()(并且orderBy())觸發洗牌,因為它對整個資料幀進行排序,要在磁區內排序,您應該使用恰當命名的sortWithinPartitions.
mydataframe.
repartition($"system", $"date_month", $"customer").
sortWithinPartitions("date_day", "date_hour").
write.
partitionBy("system", "date_month", "customer").
option("header", "false").
option("sep", "\t").
format("csv").
save(s"s3://bucket/spool/")
轉載請註明出處,本文鏈接:https://www.uj5u.com/houduan/483635.html
標籤:阿帕奇火花
