我有一個資料幀在標簽欄包含不同的key->values.我試圖篩選出values的資訊,其中key=name。過濾掉的資訊應該放在一個新的資料框中。
初始df具有以下架構:
root
|-- id: long (nullable = true)
|-- type: string (nullable = true)
|-- tags: map (nullable = true)
| |-- key: string
| |-- value: string (valueContainsNull = true)
|-- nds: array (nullable = true)
| |-- element: struct (containsNull = true)
| | |-- ref: long (nullable = true)
|-- members: array (nullable = true)
| |-- element: struct (containsNull = true)
| | |-- type: string (nullable = true)
| | |-- ref: long (nullable = true)
| | |-- role: string (nullable = true)
|-- visible: boolean (nullable = true)
我想要一個newdf架構:
root
|-- place: string (nullable = true)
|-- num_evacuees string (nullable = true)
我應該怎么做過濾器?我嘗試了很多方法,至少我嘗試過使用普通過濾器。但是每次過濾器的結果都是一個空的資料幀。例如:
val newdf = df.filter($"tags"("key") contains "name")
val newdf = df.where(places("tags")("key") === "name")
我嘗試了很多方法,但都沒有奏效 我應該如何進行適當的過濾
uj5u.com熱心網友回復:
您可以通過以下方式實作您想要的結果:
val df = Seq(
(1L, Map("sf" -> "100")),
(2L, Map("ny" -> "200"))
).toDF("id", "tags")
val resultDf = df
.select(explode(map_filter(col("tags"), (k, _) => k === "ny")))
.withColumnRenamed("key", "place")
.withColumnRenamed("value", "num_evacuees")
resultDf.printSchema
resultDf.show
這將顯示:
root
|-- place: string (nullable = false)
|-- num_evacuees: string (nullable = true)
----- ------------
|place|num_evacuees|
----- ------------
| ny| 200|
----- ------------
關鍵思想是使用map_filter從您想要的地圖中選擇欄位,然后explode將地圖轉換為兩列(key和value),然后您可以重命名以DataFrame匹配您的規范。
上面的例子假設你想得到一個值來演示這個想法。by 使用的 lambda 函式map_filter可以根據需要盡可能復雜。它的簽名map_filter(expr: Column, f: (Column, Column) => Column): Column表明,只要你回傳一個Column它就會很高興。
如果您想過濾大量條目,您可以執行以下操作:
val resultDf = df
.withColumn("filterList", array("sf", "place_n"))
.select(explode(map_filter(col("tags"), (k, _) => array_contains(col("filterList"), k))))
uj5u.com熱心網友回復:
這個想法是提取映射列的鍵(標簽),然后使用 array_contains 來檢查名為“name”的鍵。
import org.apache.spark.sql.functions._
val newdf = df.filter(array_contains(map_keys($"tags), "name"))
轉載請註明出處,本文鏈接:https://www.uj5u.com/houduan/322621.html
標籤:爪哇 斯卡拉 阿帕奇火花 apache-spark-sql 可扩展性
