語境
我正在嘗試使用 Spark/Scala 來有效地“編輯”多個鑲木地板檔案(可能超過 50k)。唯一需要進行的編輯是根據給定的行 ID 集進行洗掉(即洗掉記錄/行)。
Parquet 檔案作為磁區 DataFrame 存盤在 s3 中,其中示例磁區如下所示:
s3://mybucket/transformed/year=2021/month=11/day=02/*.snappy.parquet
每個磁區可以有 100 個以上的鑲木地板檔案,每個檔案的大小在 50mb 到 500mb 之間。
輸入
我們得到了一個Dataset[MyClass]名為spark 的filesToModify有 2 列的資料:
s3path: String= s3 中需要編輯的鑲木地板檔案的完整 s3 路徑ids: Set[String]= 位于以下位置的鑲木地板檔案中需要洗掉的一組 ID(行)s3path
示例輸入資料集filesToModify:
| s3path | 身份證 |
|---|---|
| s3://mybucket/transformed/year=2021/month=11/day=02/part-1.snappy.parquet | 設定(“a”,“b”) |
| s3://mybucket/transformed/year=2021/month=11/day=02/part-2.snappy.parquet | 設定(“b”) |
預期行為
鑒于filesToModify我想利用 Spark 中的并行性,對每個執行以下操作row:
- 加載位于
row.s3path - 過濾,以便我們排除
id在集合中的任何行row.ids - 計算每個 id 中洗掉/排除的行數
row.ids(可選) - 將過濾后的資料保存回相同
row.s3path以覆寫檔案 - 回傳洗掉的行數(可選)
我試過的
我試過使用filesToModify.map(row => deleteIDs(row.s3path, row.ids))where deleteIDsis 看起來像這樣:
def deleteIDs(s3path: String, ids: Set[String]): Int = {
import spark.implicits._
val data = spark
.read
.parquet(s3path)
.as[DataModel]
val clean = data
.filter(not(col("id").isInCollection(ids)))
// write to a temp directory and then upload to s3 with same
// prefix as original file to overwrite it
writeToSingleFile(clean, s3path)
1 // dummy output for simplicity (otherwise it should correspond to the number of deleted rows)
}
然而這會導致NullPointerException在map操作中執行時。如果我在map塊外單獨執行它,那么它可以作業,但我不明白為什么它不在它內部(與懶惰評估有關?)。
uj5u.com熱心網友回復:
s3path和ids傳遞給的引數deleteIDs實際上分別不是字串和集合。它們是列。
為了對這些值進行操作,您可以創建一個接受列而不是內部型別的 UDF,或者您可以收集足夠小的資料集,以便您可以deleteIDs直接使用函式中的值。如果您想利用 Spark 的并行性,前者可能是您最好的選擇。
您可以在此處閱讀有關 UDF 的資訊
uj5u.com熱心網友回復:
你得到一個是NullPointerException因為你試圖從一個執行者那里檢索你的 spark 會話。
這不是明確的,但要執行 spark 操作,您的DeleteIDs函式需要檢索活動的 spark 會話。為此,它getActiveSession從SparkSession物件呼叫方法。但是當從執行程式呼叫時,此getActiveSession方法回傳None如SparkSession 的源代碼中所述:
回傳由構建器回傳的默認 SparkSession。
注意:在執行器上呼叫此函式時回傳 None
因此NullPointerException,當您的代碼開始使用此Nonespark 會話時會拋出。
更一般地,您不能重新創建資料集并在另一個資料集的轉換中使用 spark 轉換/操作。
所以我看到了您的問題的兩種解決方案:
- 要么在
DeleteIDs不使用 spark 的情況下重寫函式的代碼,要么使用parquet4s修改您的鑲木地板檔案。 - 或轉換
filesToModify為 Scala 集合并使用 Scala 的map而不是 Spark 的集合。
轉載請註明出處,本文鏈接:https://www.uj5u.com/qukuanlian/366265.html
