是否有辦法在spark流作業中的upsert using merge獲得更新/插入的行?
val df = spark.readStream(...)
val deltaTable = DeltaTable.forName("...")
def upsertToDelta(events: DataFrame, batchId: Long) {
deltaTable.as("table")
.merge()
events.as("event"),
"event.entityId == table.entityId")
.whenMatched()
.updateExpr(...))
.whenNotMatched()
.insertAll()
.execute()
}
df
.writeStream
.format("delta")
.foreachBatch(upsertToDelta _)
.outputMode("update")
.start()
我知道我可以創建另一個作業來從delta表中讀取更新。但是否有可能做同樣的作業?從我所看到的,execute() 回傳 Unit.
uj5u.com熱心網友回復:
你可以在表上啟用Change Data Feed,然后用另一個流或批處理作業來獲取變化,這樣你就能收到關于哪些行被改變/洗掉/插入的資訊。它可以通過以下方式啟用:
ALTER TABLE table_name SET TBLPROPERTIES (delta. enableChangeDataFeed = true)
如果thable沒有注冊,你可以用路徑代替表名:
ALTER TABLE delta. `path` SET TBLPROPERTIES (delta.enableChangeDataFeed = true)
如果你在從表中讀取資料流時加入.option("readChangeFeed", "true")選項,將可以獲得這些變化:
spark.readStream.format("delta")
.option("readChangeFeed", "true")
.table("table_name")
并且它將向表添加三列描述變化--最重要的是_change_type(請注意更新操作有兩種不同型別)。
如果你擔心有另一個流 - 這不是一個問題,因為你可以在同一個作業中運行多個流 - 你只是不需要使用.awaitTermination,而是像spark.streams.awaitAnyTermination()來等待多個流。
P.S. 但是,如果你解釋一下為什么你需要在同一個作業中獲得變化,也許這個答案會改變?
轉載請註明出處,本文鏈接:https://www.uj5u.com/qita/328159.html
標籤:
