我在筆記本的 Databricks CE 上運行了它,它產生了一個增量表的輸出。我正在使用
.format("rate")方法。val streamingQuery = aggregatesDF.writeStream .format("delta") .foreachBatch(upsertToDelta _) .outputMode("update") .start()但是,運行它不會產生任何輸出!它在一次呼叫后停止,但表仍然是空的。
原因是?
確定不是 CE 限制?錯誤?
- 這種處理模式不能在一個單元格中運行嗎?
- 音量問題?
- 這就引出了一個問題,Trigger Once 是否只能在 Databricks 環境中使用?我假設我可以在 Linux 下將其作為 jar 運行。
這可能是一個錯誤,這里是:
import org.apache.spark.sql.streaming.Trigger val streamingQuery = aggregatesDF.writeStream .trigger(Trigger.Once()) .format("delta") .foreachBatch(upsertToDelta _) .outputMode("update") .start()
.format("rate"),這可能是問題嗎?這對于原型設計很方便。
uj5u.com熱心網友回復:
Trigger.Once不僅限于 Databricks - 它是 Spark Structured Streaming 的標準功能。但問題是它需要一個有歷史的資料源,因為它觸發了自上次執行以來的資料處理,而rate源沒有歷史,總是從頭開始。很容易展示:
df = spark.readStream.format("rate").load()
df.writeStream.trigger(once=True).option("checkpointLocation", "1.cp") \
.outputMode("append").save("1.parquet")
spark.read.parquet("1.parquet").show()
--------- -----
|timestamp|value|
--------- -----
--------- -----
如果您想繼續rate用于實驗,最好創建一個額外的表作為rate您的代碼之間的緩沖區。像這樣的東西:
# Create a buffer table
df = spark.readStream.format("rate").load()
df.writeStream.trigger(once=True).option("checkpointLocation", "buffer.cp") \
.format("delta").outputMode("append").save("1.delta")
# Use buffer table
bufferDF = spark.read.stream.format("delta").load("1.delta")
aggregatesDF = bufferDF....
streamingQuery = aggregatesDF.writeStream
.trigger(once=True)
.foreachBatch(upsertToDelta _)
.outputMode("update")
.start()
.format("delta")PS與 - 一起使用沒有意義.foreachBatch,后者優先。
轉載請註明出處,本文鏈接:https://www.uj5u.com/ruanti/416378.html
標籤:
上一篇:PandasUDF:AttributeError:“NoneType”物件沒有屬性“_jvm”(在UDF之外編碼作業正常)
下一篇:將相關資料Spark磁區為行組
