從kafka流過來的資料有時間和設備的ID,設備的當前狀態,每10秒統計前1分鐘內接收到的資訊的設備數,
val df = kafkaDataFrame
.withWatermark("datatime", "1 minute")
.groupBy(
window($"datatime", "1 minute", "10 seconds"),
$"devId"
).agg(max($"datatime"))
df .writeStream
.outputMode(OutputMode.Append)
.foreachBatch((df: Dataset[Row], batch: Long) =>{
val fmt = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")
val dayStr = LocalDateTime.now().format(fmt)
df.foreach(row => println(s"${dayStr}:${row.toSeq}"))
})
看到輸出的資料不是最后一次視窗計算的,包含了前面好幾次視窗匯總資料,我只要最后一次怎么辦?
uj5u.com熱心網友回復:
這是structedStream的api。用stream(SparkStreamingContext)的API就可以了,缺點是無法使用event time轉載請註明出處,本文鏈接:https://www.uj5u.com/qita/10979.html
標籤:Spark
