我想每 5 秒閱讀一個主題;對于舊版本,pyspark我可以使用 kafka-utils 和 window 方法,但目前,我不能使用它。
現在我正在kafka使用spark以下代碼加載資料
spark.readStream \
.format("kafka") \
.option("kafka.bootstrap.servers", 'localhost:9092') \
.option("subscribe", 'data') \
.load()
但是,我正在閱讀所有資料。
所以我想知道如果可能的話,我如何以每 5 秒 1 秒的批次大小讀取資料。
謝謝
uj5u.com熱心網友回復:
假設您希望每 5 秒間隔一次聚合和分組,請參閱有關視窗的檔案
這應該定義一個翻滾視窗
kafka_df \
.withWatermark("timestamp", "5 seconds") \
.groupBy(
window(kafka_df.timestamp, "5 seconds", "1 second"),
kafka_df.value)
轉載請註明出處,本文鏈接:https://www.uj5u.com/ruanti/416374.html
標籤:
