I have the following Spark SQL query:
val subquery =
  "( select garment_group_name, prod_name, " +
  "row_number() over (partition by garment_group_name order by count(prod_name) desc) as seqnum " +
  "from articles a1 " +
  "group by garment_group_name, prod_name )"
val query = "SELECT garment_group_name, prod_name " +
  "FROM " + subquery +
  " WHERE seqnum = 1 "
val query3 = spark.sql(query)
I am trying to do exactly the same thing with the DataFrame API. I wanted to focus on the subquery part first, so I tried something like this:
import org.apache.spark.sql.expressions.Window // imports the needed Window object
import org.apache.spark.sql.functions.row_number
val windowSpec = Window.partitionBy("garment_group_name")
articlesDF.withColumn("row_number", row_number.over(windowSpec))
.show()
But I get the following error:
org.apache.spark.sql.AnalysisException: Window function row_number() requires window to be ordered, please add ORDER BY clause. For example SELECT row_number()(value_expr) OVER (PARTITION BY window_partition ORDER BY window_ordering) from table;
at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$class.failAnalysis(CheckAnalysis.scala:42)
at org.apache.spark.sql.catalyst.analysis.Analyzer.failAnalysis(Analyzer.scala:95)
at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveWindowOrder$$anonfun$apply$33.applyOrElse(Analyzer.scala:2207)......... and so on.
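I see that I need to include an orderBy clause, but how do I do that when I first have to count over a group by on two columns and only then order by that count?
The error message gives the example SELECT row_number()(value_expr) OVER (PARTITION BY window_partition ORDER BY window_ordering) from table, but I don't know how to express this with the DataFrame API, and I haven't found it anywhere online.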
Answer:
The solution is to first compute count("prod_name") over a Window partitioned by both "garment_group_name" and "prod_name", and then order by that count in windowSpec.
Starting with some sample data:
// toDF needs `import spark.implicits._` (already in scope in spark-shell)
val df = List(
  ("a", "aa1"), ("a", "aa2"), ("a", "aa3"), ("b", "bb")
)
  .toDF("garment_group_name", "prod_name")
df.show(false)
which gives:
+------------------+---------+
|garment_group_name|prod_name|
+------------------+---------+
|a                 |aa1      |
|a                 |aa2      |
|a                 |aa3      |
|b                 |bb       |
+------------------+---------+
And the two window specifications we need:
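import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{col, count, row_number}
// counts the rows for each (garment_group_name, prod_name) pair
val countWindowSpec = Window.partitionBy("garment_group_name", "prod_name")
// ranks rows within each garment group, highest count first
val windowSpec = Window.partitionBy(col("garment_group_name")).orderBy(col("count").desc)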
Then we can use them:
df
  // create the `count` column to be used by `windowSpec`
  .withColumn("count", count(col("prod_name")).over(countWindowSpec))
  .withColumn("seqnum", row_number().over(windowSpec))
  // take only the first row of each partition
  .filter(col("seqnum") === 1)
  // select only the columns we care about
  .select("garment_group_name", "prod_name")
  .show(false)
This gives:
+------------------+---------+
|garment_group_name|prod_name|
+------------------+---------+
|a                 |aa1      |
|b                 |bb       |
+------------------+---------+
Compare this with your SQL implementation, using the same df:
df.createOrReplaceTempView("a1")
val subquery =
  "( select garment_group_name, prod_name, " +
  "row_number() over (partition by garment_group_name order by count(prod_name) desc) as seqnum " +
  "from a1 " +
  "group by garment_group_name, prod_name )"
val query = "SELECT garment_group_name, prod_name " +
  "FROM " + subquery +
  " WHERE seqnum = 1 "
spark.sql(query).show(false)
We get the same result:
+------------------+---------+
|garment_group_name|prod_name|
+------------------+---------+
|a                 |aa1      |
|b                 |bb       |
+------------------+---------+
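As a possible alternative that mirrors the SQL more literally, the GROUP BY can be written as groupBy/agg before the ranking window is applied. The following is only a sketch under a few assumptions: the local SparkSession, the column name cnt, and the name rankWindow are illustrative and do not come from the question, and the secondary sort on prod_name is an addition that makes the pick among tied counts deterministic (neither the original SQL nor the window-only version above defines a tie-breaker).

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{col, count, row_number}

// Assumption: a local session for illustration; in spark-shell `spark` already exists.
val spark = SparkSession.builder()
  .master("local[*]")
  .appName("top-product-per-group")
  .getOrCreate()
import spark.implicits._

val df = List(("a", "aa1"), ("a", "aa2"), ("a", "aa3"), ("b", "bb"))
  .toDF("garment_group_name", "prod_name")

// Rank products within each garment group, most frequent first.
// The secondary sort on prod_name is an addition (not in the original SQL)
// so that ties on the count are resolved the same way on every run.
val rankWindow = Window
  .partitionBy("garment_group_name")
  .orderBy(col("cnt").desc, col("prod_name"))

val topPerGroup = df
  // equivalent of GROUP BY garment_group_name, prod_name in the subquery
  .groupBy("garment_group_name", "prod_name")
  .agg(count(col("prod_name")).as("cnt"))
  // equivalent of row_number() over (partition by ... order by count(...) desc)
  .withColumn("seqnum", row_number().over(rankWindow))
  // equivalent of WHERE seqnum = 1
  .filter(col("seqnum") === 1)
  .select("garment_group_name", "prod_name")

topPerGroup.show(false)
```

With the sample data every product in group a has a count of 1, so the tie-breaker decides: the result is aa1 for a and bb for b. Aggregating first also collapses each (garment_group_name, prod_name) pair to a single row before ranking, which is what the SQL subquery does.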
