I have two DataFrames:
RegionValues:
+-----------+----------+----------------------+
|marketplace|primary_id|values                |
+-----------+----------+----------------------+
|xyz        |0000000001|[cat, dog, cow]       |
|reg        |PRT0000001|[hippo, dragon, moose]|
|asz        |0000001333|[mouse, rhino, lion]  |
+-----------+----------+----------------------+
Marketplace:
+----------+-----------+----------+
|primary_id|marketplace|parent_id |
+----------+-----------+----------+
|0000000001|xyz        |PRT0000001|
|0000000002|wrt        |PRT0000001|
|PRT0000001|reg        |PRT0000001|
|PRT00MISS0|asz        |PRT00MISS0|
|000000000B|823        |PRT0000002|
+----------+-----------+----------+
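When I join the DataFrames, I want to join them on the primary_id value, but if that primary_id does not exist in the RegionValues DataFrame, I want to fall back to joining on parent_id === primary_id. So my desired output is: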
+----------+--------------+-----------+-------------------------------------+
|primary_id|marketplace   |parent_id  |values                               |
+----------+--------------+-----------+-------------------------------------+
|0000000001|...           |PRT0000001 |[cat, dog, cow]                      |
|0000000002|...           |PRT0000001 |[hippo, dragon, moose]               |
|PRT0000001|...           |PRT0000001 |[hippo, dragon, moose]               |
|PRT00MISS0|              |PRT00MISS0 |null                                 |
|0000001333|              |0000001333 |[mouse, rhino, lion]                 |
|000000000B|              |PRT0000002 |null                                 |
+----------+--------------+-----------+-------------------------------------+
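Note that 0000000001 keeps its own values, whereas 0000000002 takes the values of its parent_id because it does not exist in RegionValues. Is it possible to express this logic inside a single join statement? I am using Scala and Spark.
I tried a join statement like the one below, but it leaves the values for 0000000002 null: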
import org.apache.spark.sql.functions.when

val parentIdJoinCondition = when(
  (regionValuesDf.col("primary_id") === marketplaceDf.col("primary_id")).isNull,
  marketplaceDf.col("parent_id") === regionValuesDf.col("primary_id")
).otherwise(regionValuesDf.col("primary_id") === marketplaceDf.col("primary_id"))

val joinedDf = regionValuesDf.join(
  marketplaceDf,
  parentIdJoinCondition,
  "outer"
)
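I think I could get the result I want by using 3 different joins, but that seems unnecessary and harder to read.
uj5u.com user reply:
Creating a custom condition causes Spark to perform a cross join, which is a very inefficient way to join. Moreover, Spark cannot know that a row has no match before it performs the actual join, so your condition (regionValuesDf.col("primary_id") === marketplaceDf.col("primary_id")).isNull will always return false: the comparison is null only when one of the primary_id values is itself null, not when no matching row exists.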
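To illustrate the point about isNull, here is a minimal sketch that models SQL's three-valued equality in plain Scala, with None standing in for NULL. The names sqlEq and isNull are hypothetical helpers written for this illustration, not Spark APIs; the only assumption is that comparing two non-null keys yields true or false, never NULL.

```scala
// Plain-Scala model of SQL equality; None plays the role of SQL NULL.
// sqlEq and isNull are hypothetical helpers for illustration, not Spark functions.
object NullSemanticsSketch {
  // Equality is NULL (None) only when at least one operand is NULL.
  def sqlEq(a: Option[String], b: Option[String]): Option[Boolean] =
    for { x <- a; y <- b } yield x == y

  // Mirrors Column.isNull applied to the result of the comparison.
  def isNull(v: Option[Boolean]): Boolean = v.isEmpty

  def main(args: Array[String]): Unit = {
    // Two non-null keys that differ: the comparison is false, so isNull is false.
    println(isNull(sqlEq(Some("0000000002"), Some("PRT0000001")))) // prints false
    // Only a NULL operand makes the comparison NULL.
    println(isNull(sqlEq(None, Some("PRT0000001")))) // prints true
  }
}
```

Since every primary_id in the sample data is non-null, the first branch of the when(...) in the question is never taken, and the condition reduces to a plain equality on primary_id.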
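So, as you correctly guessed, the best solution is to perform multiple joins. You can get by with two: a first join that determines whether primary_id or parent_id should be used as the key for the outer join, and then the actual outer join. After that, you can coalesce primary_id, marketplace and parent_id, and drop the helper columns.
So the code would be: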
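import org.apache.spark.sql.functions.{coalesce, col, when}

// regionDf is the RegionValues DataFrame (regionValuesDf in the question).
// First join: find out whether each marketplace row matches on primary_id.
val joinedDf = marketplaceDf.join(regionDf.drop("marketplace"), Seq("primary_id"), "left")
  // Use primary_id as the key when it has values, otherwise fall back to parent_id.
  .withColumn("join_key", when(col("values").isNotNull, col("primary_id")).otherwise(col("parent_id")))
  .drop("values")
  // Second join: the actual outer join on the chosen key.
  .join(
    regionDf
      .withColumnRenamed("primary_id", "join_key")
      .withColumnRenamed("marketplace", "region_marketplace"),
    Seq("join_key"),
    "outer"
  )
  // Fill in rows that exist only in regionDf, then drop the helper columns.
  .withColumn("primary_id", coalesce(col("primary_id"), col("join_key")))
  .withColumn("parent_id", coalesce(col("parent_id"), col("join_key")))
  .withColumn("marketplace", coalesce(col("marketplace"), col("region_marketplace")))
  .drop("join_key", "region_marketplace")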
This gives you the following joinedDf DataFrame:
+----------+-----------+----------+----------------------+
|primary_id|marketplace|parent_id |values                |
+----------+-----------+----------+----------------------+
|0000000001|xyz        |PRT0000001|[cat, dog, cow]       |
|0000001333|asz        |0000001333|[mouse, rhino, lion]  |
|0000000002|wrt        |PRT0000001|[hippo, dragon, moose]|
|PRT0000001|reg        |PRT0000001|[hippo, dragon, moose]|
|000000000B|823        |PRT0000002|null                  |
|PRT00MISS0|asz        |PRT00MISS0|null                  |
+----------+-----------+----------+----------------------+
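As a sanity check of the fallback rule independent of Spark, the same two-step logic can be modelled with plain Scala collections. This is only a minimal sketch, not the Spark code itself: the case classes, fallbackJoin and the sample* values are illustrative names, and it assumes primary_id is unique in RegionValues and that values is never null for a row that exists.

```scala
// Plain-Scala model of the two-step fallback join (no Spark required).
// All names here are hypothetical and exist only for this illustration.
object FallbackJoinSketch {
  final case class Market(primaryId: String, marketplace: String, parentId: String)
  final case class Joined(primaryId: String, marketplace: String, parentId: String,
                          values: Option[Seq[String]])

  // regions maps primary_id -> (marketplace, values); assumes unique primary_id.
  def fallbackJoin(markets: Seq[Market],
                   regions: Map[String, (String, Seq[String])]): Seq[Joined] = {
    // Step 1: pick the join key, primary_id if it has region values, else parent_id.
    val keyed = markets.map { m =>
      val key = if (regions.contains(m.primaryId)) m.primaryId else m.parentId
      (key, m)
    }
    // Step 2a: marketplace side of the outer join, looked up by the chosen key.
    val matched = keyed.map { case (key, m) =>
      Joined(m.primaryId, m.marketplace, m.parentId, regions.get(key).map(_._2))
    }
    // Step 2b: region rows that no marketplace row used; ids are filled from the key.
    val usedKeys = keyed.map(_._1).toSet
    val regionOnly = regions.toSeq.collect {
      case (id, (mp, vs)) if !usedKeys.contains(id) => Joined(id, mp, id, Some(vs))
    }
    matched ++ regionOnly
  }

  val sampleMarkets: Seq[Market] = Seq(
    Market("0000000001", "xyz", "PRT0000001"),
    Market("0000000002", "wrt", "PRT0000001"),
    Market("PRT0000001", "reg", "PRT0000001"),
    Market("PRT00MISS0", "asz", "PRT00MISS0"),
    Market("000000000B", "823", "PRT0000002")
  )

  val sampleRegions: Map[String, (String, Seq[String])] = Map(
    "0000000001" -> ("xyz", Seq("cat", "dog", "cow")),
    "PRT0000001" -> ("reg", Seq("hippo", "dragon", "moose")),
    "0000001333" -> ("asz", Seq("mouse", "rhino", "lion"))
  )

  def main(args: Array[String]): Unit =
    fallbackJoin(sampleMarkets, sampleRegions).foreach(println)
}
```

Run against the sample data from the question, this yields the same six rows as the table above, with 0000000002 picking up the values of PRT0000001 and 0000001333 appearing as a region-only row.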
uj5u.com user reply:
Wouldn't it help to use regionValuesDf.col("primary_id") =!= marketplaceDf.col("primary_id") in your join statement instead of (regionValuesDf.col("primary_id") === marketplaceDf.col("primary_id")).isNull?
