我有一個通用的 SQL 查詢問題,我認為可以用各種 SQL 風格來回答,盡管我下面的示例使用的是 spark sql。
我試圖將表 1 ( t1) 連接到表 2 ( t2),目標如下:
- 保留所有值
t1(因此左連接) t2根據聚合函式選擇一列t2
下面是一些測驗資料:
t1
--- --------
|pk1|constant|
--- --------
| a|constant|
| b|constant|
| c|constant|
| d|constant|
--- --------
t2
--- --------- ------
|fk1|condition|target|
--- --------- ------
| a| 1|check1|
| a| 2|check2|
| b| 1|check1|
| b| 2|check2|
--- --------- ------
以下是幾個(失敗的)示例查詢:
spark.sql("""
select
pk1,
constant,
target
from
t1
left join
t2
on
pk1 = fk1
group by
pk1, constant, target
having
min(condition)
""").show
--- -------- ------
|pk1|constant|target|
--- -------- ------
| b|constant|check1|
| a|constant|check2|
| a|constant|check1|
| b|constant|check2|
--- -------- ------
查詢 1 的問題:我丟失了in ,的t1行。它看起來像我的內部連接而不是左連接。pk1'c''d'
spark.sql("""
select
pk1,
constant,
min(condition),
target
from
t1
left join
t2
on
pk1 = fk1
group by
pk1, constant, target
""").show
--- -------- -------------- ------
|pk1|constant|min(condition)|target|
--- -------- -------------- ------
| a|constant| 1|check1|
| a|constant| 2|check2|
| b|constant| 2|check2|
| b|constant| 1|check1|
| c|constant| null| null|
| d|constant| null| null|
--- -------- -------------- ------
查詢 2 的問題:我不再有條件最小值的過濾器。例如,對于pk1 = a,我采用了condition = 1和condition = 2。min 函式似乎沒有被應用。
期望輸出
--- -------- -------------- ------
|pk1|constant|min(condition)|target|
--- -------- -------------- ------
| a|constant| 1|check1|
| b|constant| 1|check1|
| c|constant| null| null|
| d|constant| null| null|
--- -------- -------------- ------
該min(condition)列是可選的。反正我稍后會過濾掉它。
我可以通過將查詢分成兩個陳述句來創建所需的輸出,但我覺得這里必須有一個優雅的單查詢解決方案。有人會知道如何做到這一點嗎?謝謝!
附錄
以下是構建測驗表的命令,以防有人想復制測驗:
val columns1 = Seq("pk1", "constant")
val columns2 = Seq("fk1","condition","target")
val data1 = Seq( ("a","constant"), ("b","constant"), ("c","constant"), ("d","constant") )
val data2 = Seq( ("a",1,"check1"), ("a",2,"check2"), ("b",1,"check1"), ("b",2,"check2") )
val t1 = spark.createDataFrame(data1).toDF(columns1:_*)
val t2 = spark.createDataFrame(data2).toDF(columns2:_*)
uj5u.com熱心網友回復:
首先 group by t2onfk1并使用 min onstruct(condition, target)得到 min 條件對應的目標值,然后將分組結果與 連接起來t1:
spark.sql("""
WITH t3 AS (
SELECT fk1,
MIN(struct(condition, target))['target'] AS target
FROM t2
GROUP BY fk1
)
SELECT pk1,
constant,
target
FROM t1
LEFT JOIN t3
ON pk1 = fk1
""").show
// --- -------- ------
//|pk1|constant|target|
// --- -------- ------
//| a|constant|check1|
//| b|constant|check1|
//| c|constant| null|
//| d|constant| null|
// --- -------- ------
使用row_number()視窗函式的另一種方法:
spark.sql("""
WITH t3 AS (
SELECT *,
ROW_NUMBER() OVER (PARTITION BY fk1 ORDER BY condition) AS rn
FROM t2
)
SELECT pk1,
constant,
target
FROM t1
LEFT JOIN t3
ON pk1 = fk1
AND rn = 1
""").show
如果你想離開加入然后做聚合:
spark.sql("""
SELECT pk1,
MAX(constant) AS constant,
MIN(struct(condition, target))['target'] AS target
FROM t1
LEFT JOIN t2
ON pk1 = fk1
GROUP BY pk1
""").show
轉載請註明出處,本文鏈接:https://www.uj5u.com/caozuo/372903.html
標籤:sql 阿帕奇火花 apache-spark-sql 左连接
