我有一個大資料集,我想找到有n個最高值的行。
id, count
id1, 10
id2, 15
id3,5
...
我唯一能想到的方法是使用row_number而不使用磁區,比如
val window = Window.orderBy(desc("count")
df.withColumn("row_number", row_number over window).filter(col("row_number") <= n)
但是當資料包含數百萬或數十億行時,這根本無法執行,因為它將資料推送到一個磁區,我得到了OOM。
有誰想出了一個高性能的解決方案嗎?
uj5u.com熱心網友回復:
<醇>uj5u.com熱心網友回復:
我認為有兩種方法可以提高你的演算法性能。首先是使用sort和limit來檢索最上面的n行。第二種是開發你的自定義Aggregator。
排序和限制方法
你對你的資料框架進行排序,然后你取前n行:
val n。Int = ??
import org.apache.spark.function.sql.desc
df.orderBy(desc("count")).limit(n)
Spark優化了這種轉換序列,首先在每個磁區上執行排序,在每個磁區上取前n行,在最后一個磁區上檢索,重新執行排序并取最后的前n行。你可以通過對轉換執行explain()來檢查這一點。你會得到以下的執行計劃:
== Physical Plan ==
TakeOrderedAndProject(limit=3, orderBy=[count#8 DESC NULLS LAST] 。output=[id#7, count#8])
- LocalTableScan [id#7, count#8]
而通過查看Spark的源代碼中limit.scala中如何執行TakeOrderedAndProjectExec步驟(案例類TakeOrderedAndProjectExec,方法doExecute)。
自定義聚合器方法
對于自定義聚合器,你創建了一個Aggregator,它將填充和更新一個頂部n行的有序陣列。
import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
import org.apache.spark.sql.expressions.Aggregator。
import org.apache.spark.sql.Encoder。
import scala.collection.mutable.ArrayBuffer
case class Record(id。String, count: Int)。
case class TopRecords(limit。Int) extends Aggregator[Record, ArrayBuffer[Record], Seq[Record] ] {
def zero。ArrayBuffer[Record] = ArrayBuffer.empty[Record] 。
def reduce(topRecords。ArrayBuffer[Record], currentRecord: Record)。) ArrayBuffer[Record] = {
val insertIndex = topRecords.lastIndexWhere(p => p.count > currentRecord.count)
if (topRecords.length < limit) {
topRecords.insert(insertIndex 1, currentRecord)
} else if (insertIndex < limit - 1) {
topRecords.insert(insertIndex 1, currentRecord)
topRecords.remove(topRecords.length - 1)
}
topRecords
}
def merge( topRecords1: ArrayBuffer[Record], topRecords2: ArrayBuffer[Record])。) ArrayBuffer[Record] = {
val merged = ArrayBuffer.empty[Record]
while (merged.length < limit && (topRecords1.nonEmpty || topRecords2.nonEmpty) ) {
if (topRecords1.isEmpty) {
merged.append(topRecords2.remove(0)
} else if (topRecords2.isEmpty) {
merged.append(topRecords1.remove(0)
} else if (topRecords2.head.count < topRecords1.head.count) {
merged.append(topRecords1.remove(0)
} else {
merged.append(topRecords2.remove(0)
}
}
合并的
}
def finish(reduction: ArrayBuffer[Record])。) Seq[Record] = reduction
def bufferEncoder。Encoder[ArrayBuffer[Record]] = ExpressionEncoder[ArrayBuffer[Record]] ]
def outputEncoder。Encoder[Seq[Record] ] = ExpressionEncoder[Seq[Record]] ]
然后你在你的資料框架上應用這個聚合器,并對聚合結果進行扁平化處理:
val n。Int = ??
import sparkSession.implicits._
df.as[Record].select(TopRecords(n).toColumn).flatMap(record => record
方法比較
為了比較這兩種方法,假設我們想從一個分布在p磁區上的資料框架中獲取最上面的n行,每個磁區有大約k條記錄。所以資料框架的大小為p-k。這就得到了以下的復雜性(可能會有錯誤):
方法
總的運算元 記憶體消耗
(在執行器上)
記憶體消耗 。
(在最終執行者上)
O(p-k-log(p-k))O(p-k)O(p-k)O(p-k-log(k) p-n-log(p-n))O(k)O(p-n)O(p-n)
自定義聚合器 自定義聚合器
O(p-k) O(p-k)
O(k) O(n)
O(p-n) O(p-n)
因此關于操作的數量,自定義聚合器是性能最好的。然而,這個方法是迄今為止最復雜的,并且意味著大量的序列化/反序列化,所以在某些情況下,它的性能可能不如排序和限制。
結論
你有兩種方法可以有效地獲取頂部n行,排序和限制和自定義聚合器。要選擇使用哪一個,你應該用你的真實資料框架對這兩種方法進行基準測驗。如果經過基準測驗,Sort and Limit比Custom Aggregator慢一點,我會選擇Sort and Limit,因為其代碼更容易維護。
轉載請註明出處,本文鏈接:https://www.uj5u.com/shujuku/326143.html
標籤:
上一篇:減去兩個選項[Double]
下一篇:下面的鑿子宣告是如何解碼的?
