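Is there any stable method on SparkSession/SparkContext/RDD that can be called to easily detect when eviction is happening?
For more background, see "Disable new Spark behaviour of evicting cached partitions when out of memory" or "When was automatic Spark RDD partition cache eviction implemented?"
Answer:
You can retrieve an array of RDDInfo from the SparkContext and query its elements for the partition counts of the RDD you are interested in. If some partitions were evicted, or did not fit into executor storage, numCachedPartitions will be smaller than the RDD's total partition count, numPartitions.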
scala> val rdd = sc.textFile("file:///etc/spark/conf/spark-defaults.conf").repartition(10)
rdd: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[9] at repartition at <console>:27
scala> rdd.persist().count()
res14: Long = 34
scala> val rddStorage = rdd.context.getRDDStorageInfo(0)
rddStorage: org.apache.spark.storage.RDDInfo = RDD "MapPartitionsRDD" (9) StorageLevel: StorageLevel(memory, deserialized, 1 replicas); CachedPartitions: 10; TotalPartitions: 10; MemorySize: 5.1 KB; DiskSize: 0.0 B
scala> val fullyCached = (rddStorage.numCachedPartitions == rddStorage.numPartitions)
fullyCached: Boolean = true
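The zero in ...getRDDStorageInfo(0) above is for illustration only. In practice, instead of simply using 0, you need to get the id of the RDD you are interested in via RDD.id and then search the RDDInfo[] array for the element with rddInfo.id == id. If you gave the RDD a name, you can probably use rddInfo.name to do the same thing.
Finally, you can detect whether any RDD is being evicted like this: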
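The lookup by id described above could be sketched roughly as follows. The helper names storageInfoFor and isFullyCached are hypothetical (they are not part of Spark); only getRDDStorageInfo, RDD.id, and the RDDInfo fields come from the answer. Note that the returned array may contain no entry at all for an RDD that is not persisted or currently has nothing cached, so this sketch treats a missing entry as "not fully cached".

```scala
import org.apache.spark.rdd.RDD
import org.apache.spark.storage.RDDInfo

// Hypothetical helper: find the storage info entry whose id matches this RDD.
// Returns None when getRDDStorageInfo has no entry for the RDD.
def storageInfoFor(rdd: RDD[_]): Option[RDDInfo] =
  rdd.context.getRDDStorageInfo.find(_.id == rdd.id)

// Hypothetical helper: true only when an entry exists and every partition is cached.
def isFullyCached(rdd: RDD[_]): Boolean =
  storageInfoFor(rdd).exists(info => info.numCachedPartitions == info.numPartitions)
```

In the shell session above, isFullyCached(rdd) would then replace the hard-coded index 0.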
sparkSession
  .sparkContext.getRDDStorageInfo.filter(_.isCached)
  .find(rdd => rdd.numCachedPartitions < rdd.numPartitions)
  .foreach(rdd =>
    throw new IllegalArgumentException(s"RDD is being evicted, please configure cluster with more memory. " +
      s"numCachedPartitions = ${rdd.numCachedPartitions}, " +
      s"numPartitions = ${rdd.numPartitions}, " +
      s"name = ${rdd.name}, " +
      s"id = ${rdd.id}, " +
      s"memSize = ${rdd.memSize}, " +
      s"diskSize = ${rdd.diskSize}, " +
      s"externalBlockStoreSize = ${rdd.externalBlockStoreSize}"
    ))
