假設我有一個將磁區加載到的 spark 應用程式dataframe:
users/sales/2021/10/20
我的集群資源配置為能夠讀取此磁區中的 X 條記錄。但我不控制記錄的數量。
有沒有辦法對磁區內的記錄進行限制/分頁?
例如,如果突然我在磁區中有 2X 資料而我的應用程式預計一次處理 1X,我將讀取 1X 然后再讀取一個 1X
uj5u.com熱心網友回復:
這個問題實際上就是為什么性能調優是一門藝術而不是一門科學。藝術是指被創造出來的杰作。這是一個時間點的藝術作品,對于藝術家當前的背景來說是獨一無二的。
您可以請求資料提供磁區/檔案大小,以便希望它增加磁區數量而不是磁區本身。這并不總是可能的,但你可以問。(如果您使用的是 HDFS,那么磁區大小與 HDFS 塊大小匹配是理想的。)這肯定會有所幫助,但最好解決問題,然后進行藝術/性能調整。我不會為尚未發生的問題預先調整。(我不是說你在做這件事,而是鼓勵你越過那座橋,如果它之前沒有發生過。)
uj5u.com熱心網友回復:
如果你玩maxPartitionBytesSpark的引數會更好。它的默認值是 128 MB,即 spark 將嘗試創建每個 128 MB 的磁區。如果您開始在源磁區中獲取 2X 資料,您應該調整此引數以創建更多磁區,并且基于可用內核 spark 將在后續迭代中處理這些磁區。
下面的代碼可以幫助自動調整這個引數。它將嘗試在磁區和核心方面優化利用集群資源。
def autoTuneMaxPartitionBytes(format:String, path:String, schema:String, maxSteps:Int, startingBytes:Long=134217728):Long = {
var cores = sc.defaultParallelism
var maxPartitionBytes:Long = startingBytes
val originalMaxPartitionBytes = spark.conf.get("spark.sql.files.maxPartitionBytes")
for (step <- 0 to maxSteps) {
maxPartitionBytes = maxPartitionBytes (step * 1024 * 1024)
val maxPartitionMB = maxPartitionBytes / 1024 / 1024
spark.conf.set("spark.sql.files.maxPartitionBytes", f"${maxPartitionBytes}b")
val partitions = spark.read.format(format).schema(schema).load(path).rdd.getNumPartitions
if (partitions % cores == 0) {
println("*** Found it! ***")
println(f"$maxPartitionMB%,d MB with $partitions%,d partitions, iterations: ${partitions/cores.toDouble}")
return maxPartitionBytes
} else {
println(f"$maxPartitionMB%,d MB with $partitions%,d partitions, iterations: ${partitions/cores.toDouble}")
}
}
spark.conf.set("spark.sql.files.maxPartitionBytes", originalMaxPartitionBytes)
throw new IllegalArgumentException("An appropriate maxPartitionBytes was not found")
}
轉載請註明出處,本文鏈接:https://www.uj5u.com/qiye/334504.html
