當我們從 eventhub 讀取事件時,我們試圖實作 badRecordsPath,作為嘗試讓它作業的例子,我已經放入了應該使事件失敗的模式:
eventStreamDF = (spark.readStream
.format("eventhubs")
.options(**eventHubsConf)
.option("badRecordsPath", "/tmp/badRecordsPath/test1")
.schema(badSchema)
.load()
)
然而,這永遠不會失敗并且總是讀取事件,這是資料塊的 eventhub 的讀取流的行為嗎?目前的解決方法是根據我們自己的模式檢查 inferSchema。
uj5u.com熱心網友回復:
EventHubs 中的資料架構是固定的(參見檔案)(Kafka 也是如此)——實際的有效載荷總是編碼為帶有 name 的二進制欄位,body開發人員根據生產者之間的“聯系”來解碼這個二進制有效載荷(s) 資料和該資料的消費者。因此,即使您指定架構和badRecordsPath選項,也不會使用它們。
您將需要實作一些函式來解碼來自 JSON 的資料或其他東西,例如,如果資料損壞,則回傳 null,然后您將有一個空值過濾器,將流拆分為兩個子流 - 好壞資料。
轉載請註明出處,本文鏈接:https://www.uj5u.com/houduan/325188.html
標籤:天蓝色 数据块 azure-databricks azure-eventhub
上一篇:誰以及何時洗掉了azure資源組
