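I have a Spark SQL DataFrame containing a unique code, a month date, and a turnover. I want to go through each month date and compute the trailing 12-month turnover sum for it. For example, if the month date is January 2022, the sum runs from January 2021 through January 2022. Say I have data for 2021 and 2022: turnover_per_year on January 1st 2022 = turnover Jan 2021 + turnover Feb 2021 + turnover Mar 2021 + ... + turnover Dec 2021 + turnover Jan 2022. But if I ask for the yearly turnover for January 2021, the result should be null, because I have no data for 2020.
Here is a sample of the DataFrame: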
+------+------------+----------+
| code | monthdate  | turnover |
+------+------------+----------+
| AA1  | 2021-01-01 | 10       |
| AA1  | 2021-02-01 | 20       |
| AA1  | 2021-03-01 | 30       |
| AA1  | 2021-04-01 | 40       |
| AA1  | 2021-05-01 | 50       |
| AA1  | 2021-06-01 | 60       |
| AA1  | 2021-07-01 | 70       |
| AA1  | 2021-08-01 | 80       |
| AA1  | 2021-09-01 | 90       |
| AA1  | 2021-10-01 | 100      |
| AA1  | 2021-11-01 | 101      |
| AA1  | 2021-12-01 | 102      |
| AA1  | 2022-01-01 | 103      |
| AA1  | 2022-02-01 | 104      |
| AA1  | 2022-03-01 | 105      |
| AA1  | 2022-04-01 | 106      |
| AA1  | 2022-05-01 | 107      |
| AA1  | 2022-06-01 | 108      |
| AA1  | 2022-07-01 | 109      |
| AA1  | 2022-08-01 | 110      |
| AA1  | 2022-09-01 | 111      |
| AA1  | 2022-10-01 | 112      |
| AA1  | 2022-11-01 | 113      |
| AA1  | 2022-12-01 | 114      |
+------+------------+----------+
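I am new to Spark and Scala, and solving this the Spark/Scala way confuses me. I have worked out the logic but am struggling to translate it into Spark Scala. I am running in cluster mode. Here is my logic: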
listkey = df.select("code").distinct.map(r => r(0)).collect()
listkey.foreach(key =>
  df.select(*).filter("code==${key}").orderBy("monthdate").foreach(
    row =>
      var monthdate = row.monthdate
      var turnover = row.turnover
      var sum = 0
      sum = sum + turnover
      var n = 1
      var i = 1
      while (n < 12) {
        var monthdate_temp = datetime - i
        var turnover_temp =
          df.select("turnover").filter("monthdate=${monthdate_temp} and code =${key}").collect()
        sum = sum + turnover_temp
        n = n + 1
        i = i + 1
      }
      row = row.withColumn("turnover_per_year", sum)
  )
)
Any help would be greatly appreciated. Thanks in advance.
Reply from a uj5u.com user:
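Each row of the original DataFrame can be expanded with the "explode" function into 13 look-back rows (the month itself plus the 12 months before it). The result is then joined back to the original DataFrame and grouped: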
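import org.apache.spark.sql.functions._
import spark.implicits._ // assumes an active SparkSession named `spark`, as in spark-shell

val df = Seq(
  ("AA1", "2021-01-01", 25),
  ("AA1", "2022-01-01", 103)
)
  .toDF("code", "monthdate", "turnover")
  .withColumn("monthdate", to_date($"monthdate", "yyyy-MM-dd"))

// Shifts 0, -1, ..., -12: the current month and the 12 months before it
val oneYearBackMonths = (0 to 12).map(n => lit(-n))

// One row per (original row, shift); rangeMonth is the month that row looks back to
val explodedWithBackMonths = df
  .withColumn("shift", explode(array(oneYearBackMonths: _*)))
  .withColumn("rangeMonth", expr("add_months(monthdate, shift)"))

// Match every look-back month with the original row for that code and month
val joinCondition = $"exploded.code" === $"original.code" &&
  $"exploded.rangeMonth" === $"original.monthdate"

explodedWithBackMonths.alias("exploded")
  .join(df.alias("original"), joinCondition)
  .groupBy($"exploded.code", $"exploded.monthdate")
  .agg(sum($"original.turnover").alias("oneYearTurnover"))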
Result:
+----+----------+---------------+
|code|monthdate |oneYearTurnover|
+----+----------+---------------+
|AA1 |2021-01-01|25             |
|AA1 |2022-01-01|128            |
+----+----------+---------------+
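Note that the result above returns 25 for 2021-01-01, whereas the question asks for null when the full look-back period is not available. One possible way to get that behaviour, sketched here under assumptions, is to also count how many look-back months were actually matched and keep the sum only when all of them are present. The names `windowMonths`, `rawSum` and `monthsFound` are introduced here for illustration, the local SparkSession is only for trying it out, and the sketch assumes at most one row per code and month.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._

// Assumption: a local session for experimenting; on a cluster, reuse the existing `spark`.
val spark = SparkSession.builder().master("local[*]").appName("rolling-turnover-null").getOrCreate()
import spark.implicits._

// The first 13 months of the sample data from the question
val df = Seq(
  ("AA1", "2021-01-01", 10), ("AA1", "2021-02-01", 20), ("AA1", "2021-03-01", 30),
  ("AA1", "2021-04-01", 40), ("AA1", "2021-05-01", 50), ("AA1", "2021-06-01", 60),
  ("AA1", "2021-07-01", 70), ("AA1", "2021-08-01", 80), ("AA1", "2021-09-01", 90),
  ("AA1", "2021-10-01", 100), ("AA1", "2021-11-01", 101), ("AA1", "2021-12-01", 102),
  ("AA1", "2022-01-01", 103)
).toDF("code", "monthdate", "turnover")
  .withColumn("monthdate", to_date($"monthdate", "yyyy-MM-dd"))

// Current month plus the 12 months before it (hypothetical name)
val windowMonths = 13

val exploded = df
  .withColumn("shift", explode(array((0 until windowMonths).map(n => lit(-n)): _*)))
  .withColumn("rangeMonth", expr("add_months(monthdate, shift)"))

val result = exploded.alias("exploded")
  .join(df.alias("original"),
    $"exploded.code" === $"original.code" && $"exploded.rangeMonth" === $"original.monthdate")
  .groupBy($"exploded.code", $"exploded.monthdate")
  .agg(
    sum($"original.turnover").alias("rawSum"),
    count($"original.turnover").alias("monthsFound"))
  // `when` without `otherwise` yields null when the condition is false
  .withColumn("oneYearTurnover", when($"monthsFound" === windowMonths, $"rawSum"))
  .select("code", "monthdate", "monthsFound", "oneYearTurnover")

result.orderBy("code", "monthdate").show(false)
```

With this sample, only 2022-01-01 has all 13 months available, so it is the only row with a non-null `oneYearTurnover` (856); every earlier month comes out as null.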
Reply from a uj5u.com user:
You can use Spark's window functions:
import org.apache.spark.sql.functions._
import org.apache.spark.sql.expressions.Window
import spark.implicits._ // assumes an active SparkSession named `spark`, as in spark-shell

val raw = Seq(
  ("AA1", "2019-01-01", 25),
  ("AA1", "2021-01-01", 25),
  ("AA1", "2021-08-01", 80),
  ("AA1", "2021-09-01", 90),
  ("AA1", "2022-01-01", 103),
  ("AA2", "2022-01-01", 10)
).toDF("code", "monthdate", "turnover")

// "MM" is month-of-year; lowercase "mm" would parse the month digits as minutes
val df = raw.withColumn("monthdate", to_timestamp($"monthdate", "yyyy-MM-dd"))

// Range frame over epoch seconds: the current row and everything up to 365 days before it.
// This is a fixed 365-day window, not 12 calendar months, so leap years shift the boundary by a day.
val pw = Window.partitionBy($"code")
  .orderBy($"monthdate".cast("long"))
  .rangeBetween(-(86400 * 365), 0)

df.withColumn("sum", sum($"turnover").over(pw)).show()
+----+-------------------+--------+---+
|code|          monthdate|turnover|sum|
+----+-------------------+--------+---+
| AA1|2019-01-01 00:00:00|      25| 25|
| AA1|2021-01-01 00:00:00|      25| 25|
| AA1|2021-08-01 00:00:00|      80|105|
| AA1|2021-09-01 00:00:00|      90|195|
| AA1|2022-01-01 00:00:00|     103|298|
| AA2|2022-01-01 00:00:00|      10| 10|
+----+-------------------+--------+---+
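Because a window measured in seconds only approximates a year, a variant that may be worth considering is to order by a month index instead and use a range of 12 months. The sketch below is an assumption-laden illustration rather than part of the answer above: `monthIdx` (year * 12 + month) is a helper column introduced here, and the local SparkSession is only for trying it out.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._

// Assumption: a local session for experimenting; on a cluster, reuse the existing `spark`.
val spark = SparkSession.builder().master("local[*]").appName("rolling-turnover-months").getOrCreate()
import spark.implicits._

val df = Seq(
  ("AA1", "2019-01-01", 25),
  ("AA1", "2021-01-01", 25),
  ("AA1", "2021-08-01", 80),
  ("AA1", "2021-09-01", 90),
  ("AA1", "2022-01-01", 103),
  ("AA2", "2022-01-01", 10)
).toDF("code", "monthdate", "turnover")
  .withColumn("monthdate", to_date($"monthdate", "yyyy-MM-dd"))
  // Hypothetical helper column: consecutive months differ by exactly 1
  .withColumn("monthIdx", year($"monthdate") * 12 + month($"monthdate"))

// Range frame in months: the current month and the 12 months before it
val monthWindow = Window.partitionBy($"code")
  .orderBy($"monthIdx")
  .rangeBetween(-12, 0)

val result = df.withColumn("sum", sum($"turnover").over(monthWindow))

result.orderBy("code", "monthdate").show()
```

Since the frame is a range over `monthIdx` rather than a count of rows, missing months simply contribute nothing to the sum, and leap years no longer matter.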
Reply from a uj5u.com user:
I created two window definitions for testing. Could you check them and comment on whether this works?
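import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._

// w: the current row plus the 11 rows before it, i.e. 12 rows per code
val w = Window.partitionBy($"code")
  .orderBy($"rownum")
  .rowsBetween(-11, Window.currentRow)
// w1: orders each code's rows by month so they can be numbered
val w1 = Window.partitionBy($"code")
  .orderBy($"monthdate")
// initDf is the input DataFrame with columns code, monthdate, turnover
val newDf = initDf.withColumn("rownum", row_number().over(w1))
  .withColumn("csum", sum("turnover").over(w))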
We may first need to group by year and month and sum the turnover within each month, and then order the rows by month for each code.
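The pre-aggregation step described above could look roughly like the following. This is only a sketch: the sample rows are made up for illustration (two entries fall in the same month), `month` and `csum` are column names chosen here, and the local SparkSession is just for trying it out.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._

// Assumption: a local session for experimenting; on a cluster, reuse the existing `spark`.
val spark = SparkSession.builder().master("local[*]").appName("rolling-turnover-rows").getOrCreate()
import spark.implicits._

// Hypothetical sample: January 2021 has two entries that must be merged first
val raw = Seq(
  ("AA1", "2021-01-05", 4),
  ("AA1", "2021-01-20", 6),
  ("AA1", "2021-02-01", 20),
  ("AA1", "2021-03-01", 30)
).toDF("code", "monthdate", "turnover")

// Step 1: one row per code and calendar month (trunc snaps each date to the first of its month)
val monthly = raw
  .withColumn("month", trunc(to_date($"monthdate", "yyyy-MM-dd"), "month"))
  .groupBy($"code", $"month")
  .agg(sum($"turnover").alias("turnover"))

// Step 2: sum over the current row and the 11 rows before it, ordered by month
val last12Rows = Window.partitionBy($"code")
  .orderBy($"month")
  .rowsBetween(-11, Window.currentRow)

val result = monthly.withColumn("csum", sum($"turnover").over(last12Rows))

result.orderBy("code", "month").show()
```

One caveat with this design: `rowsBetween` counts rows, not months, so if a code has no row for some month the frame silently reaches further back than 12 months. A range-based frame, or filling in the missing months first, avoids that.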
