我正在使用子查詢運行下面的 spark SQL。
val df = spark.sql("""select * from employeesTableTempview where dep_id in (select dep_id from departmentTableTempview)""")
df.count()
我也在資料框功能方式的幫助下運行相同的方法,如下所示,假設我們將員工表和部門表作為資料框讀取,它們的名稱應分別為 empDF 和 DepDF,
val depidList = DepDF.map(x=>x(0).string).collect().toList()
val empdf2 = empDF.filter(col("dep_id").isin(depidList:_*))
empdf2.count
在以上兩種情況下,哪一種表現更好,為什么?請幫助我理解 spark scala 中的這種情況。
uj5u.com熱心網友回復:
我可以給你經典的答案:這取決于 :D
讓我們來看看第一個案例。我準備了類似的例子:
import org.apache.spark.sql.functions._
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", -1)
val data = Seq(("test", "3"),("test", "3"), ("test2", "5"), ("test3", "7"), ("test55", "86"))
val data2 = Seq(("test", "3"),("test", "3"), ("test2", "5"), ("test3", "6"), ("test33", "76"))
val df1 = data.toDF("name", "dep_id")
val df2 = data2.toDF("name", "dep_id")
df1.createOrReplaceTempView("employeesTableTempview")
df2.createOrReplaceTempView("departmentTableTempview")
val result = spark.sql("select * from employeesTableTempview where dep_id in (select dep_id from departmentTableTempview)")
result.count
我將 autoBroadcastJoinThreshold 設定為 -1,因為我假設您的資料集將大于此引數的默認 10mb
此 Sql 查詢生成此計劃:

正如您所看到的,spark 正在執行 SMJ,對于大于 10mb 的資料集,大多數情況下都是這種情況。這需要對資料進行洗牌然后排序,以便其安靜的繁重操作
現在讓我們檢查選項 2(第一行代碼與之前相同):
val depidList = df1.map(x=>x.getString(1)).collect().toList
val empdf2 = df2.filter(col("dep_id").isin(depidList:_*))
empdf2.count
對于這個選項計劃是不同的。您顯然沒有連接,但有兩個單獨的 sql。首先是讀取 DepDF 資料集,然后收集一列作為串列。在第二個 sql 中,此串列用于過濾 empDF 資料集中的資料。
當 DepDF 相對較小時應該沒問題,但如果您需要更通用的解決方案,您可能會堅持使用將決議加入的子查詢。您還可以使用 Spark df api 直接在資料幀上使用 join
轉載請註明出處,本文鏈接:https://www.uj5u.com/shujuku/535800.html
上一篇:scala-akka-streamsrun()和runWIth()之間的區別
下一篇:將正斜杠替換為反斜杠然后正斜杠
