我正在使用 Spark Structured Streaming 來計算用戶的每月數量。我正在使用以下代碼:
df = spark
.readStream
.format('kafka')
.option("startingOffsets", "earliest")
.option("failOnDataLoss", "false")
.load()
df1 = df.groupby('client_id', 'id', window(col('date'), "30 days"))
.agg(sum(col('amount')).alias('amount'), count(col('id')).alias('count'))
df1.selectExpr("CAST(client_id AS STRING) AS key", "to_json(struct(*)) AS value")
.writeStream
.format('kafka')
........
我觀察到輸出不正確。例如:
輸入
{"client_id":"1", "id":"1", "date":"2022-08-01", "amount": 10.0}
{"client_id":"1", "id":"1", "date":"2022-08-15", "amount": 10.0}
{"client_id":"1", "id":"1", "date":"2022-08-25", "amount": 10.0}
{"client_id":"1", "id":"1", "date":"2022-08-26", "amount": 10.0}
{"client_id":"1", "id":"1", "date":"2022-08-27", "amount": 10.0}
{"client_id":"1", "id":"1", "date":"2022-08-28", "amount": 10.0}
{"client_id":"1", "id":"1", "date":"2022-08-29", "amount": 10.0}
輸出
{"client_id":"1","id":"1","amount":10.0,"count":1}
{"client_id":"1","id":"1","amount":20.0,"count":2}
{"client_id":"1","id":"1","amount":30.0,"count":3}
{"client_id":"1","id":"1","amount":40.0,"count":4}
{"client_id":"1","id":"1","amount":50.0,"count":5}
{"client_id":"1","id":"1","amount":10.0,"count":1}
{"client_id":"1","id":"1","amount":20.0,"count":2}
第一個輸入記錄在“2022-08-01”,數量為 10。所以它應該總結接下來 30 天的數量。所以最終的總和應該是 70,但它是 50,然后是 20。它正在計算接下來 27 天的總和。可以看到“計數”和“金額”在“2022-08-28”更新。它不會匯總 30 天的記錄。
uj5u.com熱心網友回復:
根據檔案,有第四個論點......
startTime:str,可選
相對于 1970-01-01 00:00:00 UTC 的偏移量,用于啟動視窗間隔。例如,為了讓每小時翻滾視窗在整點后 15 分鐘開始,例如 12:15-13:15, 13:15-14:15... 將startTime 設定為15 分鐘。
這意味著,視窗是自 1970-01-01 00:00:00 UTC 以來的固定持續時間,除非您指定其他一些startTime必須是從該時間點開始的偏移間隔。
如果你使用window沒有startTime指定 and的函式windowDuration="30 days",你會得到從 1970 年 1 月 1 日 00:00:00 UTC 開始劃分為 30 天的時間間隔。在你的情況下,我真的不明白為什么兩個 30 天視窗之間的界線在 2022 年 8 月 28 日,因為對我來說它是在 2022 年 8 月 26 日:
{2022-07-27 00:00:00, 2022-08-26 00:00:00}
{2022-08-26 00:00:00, 2022-09-25 00:00:00}
盡管確切日期有所不同,但邏輯是相同的:30 天視窗是從某個特定時間點固定創建的。
如果我指定startTime="1 day",我會轉移 30 天的視窗:
{2022-07-28 00:00:00, 2022-08-27 00:00:00}
{2022-08-27 00:00:00, 2022-09-26 00:00:00}
只有仔細選擇,startTime我們才能讓時間視窗從我們想要的日期開始,但它會一直固定到startTime.
為了得到你想要的,你可能會使用slideDuration函式的引數window并根據聚合結果進行過濾:
df1 = (df
.groupby('client_id', 'id', F.window('date', "30 days", "1 day"))
.agg(F.sum('amount').alias('amount'), F.count('id').alias('count'))
.filter((F.date_add(F.sort_array(F.collect_set('date'), False)[0], 1) == F.col("window.end")))
).selectExpr("CAST(client_id AS STRING) AS key", "to_json(struct(*)) AS value")
轉載請註明出處,本文鏈接:https://www.uj5u.com/houduan/512107.html
