我們有一個啟用了標頭的 Kafka 流
.option("includeHeaders", true)
從而使它們存盤為高級資料集的列,承載帶有鍵和值的內部結構陣列:
root
|-- topic: string (nullable = true)
|-- key: string (nullable = true)
|-- value: string (nullable = true)
|-- timestamp: string (nullable = true)
|-- headers: array (nullable = true)
| |-- element: struct (containsNull = true)
| | |-- key: string (nullable = true)
| | |-- value: binary (nullable = true)
我可以使用陣列中的順序訪問所需的標頭,如下所示:
val controlDataFrame = spark
.readStream
.format("kafka")
.option("kafka.bootstrap.servers", kafkaLocation)
.option("includeHeaders", true)
.option("failOnDataLoss", value = false)
.option("subscribe", "mytopic")
.load()
.withColumn("acceptTimestamp", element_at(col("headers"),1))
.withColumn("acceptTimestamp2", col("acceptTimestamp.value").cast("STRING"))
但是這個解決方案看起來很脆弱,因為在另一端產生的標題的順序總是可以隨著更新而改變,而只有鍵名在那里看起來很穩定。如何通過結構鍵查找并提取所需的結構而不是指向陣列索引?
UPD。
感謝 Alex Ott 的 davice,我找到了將我想要的內容放入以下列的解決方案:
.withColumn("headers1", map_from_entries(col("headers")))
.withColumn("acceptTimestamp2", col("headers1.acceptTimestamp").cast("STRING"))
uj5u.com熱心網友回復:
您可以使用map_from_entries函式將結構陣列轉換為可以按名稱訪問條目的映射。
import org.apache.spark.sql.functions.map_from_entries
....
select(map_from_entries("headers").alias("headers"), ...)
但我記得,標頭名稱可能不是唯一的,這是將它們作為鍵/值對陣列發送的主要原因。
另一種方法是使用過濾器功能按名稱查找標題 - 這將允許處理非唯一標題。
PS 我使用了 Python 檔案,因為我可以鏈接各個函式——在 Scala 檔案中做到這一點并不容易。
轉載請註明出處,本文鏈接:https://www.uj5u.com/gongcheng/416624.html
標籤:
上一篇:如何使用組合框架獲取展開值
