I have this DataFrame, which has a lot of missing dates in between.
import pandas as pd

df = pd.DataFrame({'date': ['2021-12-1', '2021-12-2', '2021-12-21', '2021-12-1', '2021-12-7', '2021-12-1', '2021-12-5', '2021-12-1', '2021-12-5'],
                   'id1': ['a1', 'a1', 'a1', 'a1', 'a1', 'a2', 'a2', 'a2', 'a2'],
                   'id2': ['b1', 'b1', 'b1', 'b2', 'b2', 'b3', 'b3', 'b4', 'b4'],
                   'value1': [1, 5, 7, 2, 9, 3, 0, 1, 7],
                   'value2': [6, 2, 8, 1, 9, 3, 0, 2, 6]})
It looks like this:
         date id1 id2  value1  value2
0   2021-12-1  a1  b1       1       6
1   2021-12-2  a1  b1       5       2
2  2021-12-21  a1  b1       7       8
3   2021-12-1  a1  b2       2       1
4   2021-12-7  a1  b2       9       9
5   2021-12-1  a2  b3       3       3
6   2021-12-5  a2  b3       0       0
7   2021-12-1  a2  b4       1       2
8   2021-12-5  a2  b4       7       6
I want my output to look like this, with the frequency changed from daily to weekly and each week starting on Monday.
  id1 id2        date  value1  value2
0  a1  b1  2021-12-06       6       8
1  a1  b1  2021-12-13       0       0
2  a1  b1  2021-12-20       0       0
3  a1  b1  2021-12-27       7       8
4  a1  b2  2021-12-06       2       1
5  a1  b2  2021-12-13       9       9
6  a2  b3  2021-12-06       3       3
7  a2  b4  2021-12-06       8       8
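I have already written this in pandas. First I fill the missing dates with zero values, and in the second step I use resample. I use W-Mon here, which means my weeks start on Monday.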
# Fill missing dates with zero values
df['date'] = pd.to_datetime(df['date'])
df = (df.set_index('date')
        # select several columns with a list; a bare tuple is rejected by recent pandas
        .groupby(['id1', 'id2'])[['value1', 'value2']]
        .apply(lambda x: x.asfreq('d', fill_value=0))
        .reset_index()
        [['date', 'id1', 'id2', 'value1', 'value2']])

# Convert to weekly data; each week starts on Monday and is labelled with the following Monday
df = (df.groupby(['id1', 'id2'])
        .resample('W-Mon', label='right', closed='left', on='date')
        .agg({'value1': 'sum', 'value2': 'sum'})
        .reset_index())
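I am trying to convert my code to Spark, and I have already gone through this. Is there an easier way?
Reply from a uj5u.com user:
Try this.
Create a tmp dataframe holding a sequence of dates at 7-day intervals, starting from the first Monday after the minimum of the date column. Then join it with the main dataframe and mask the values based on the difference between the week numbers: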
from pyspark.sql import functions as F

df = df.withColumn("date", F.to_date("date"))

# One row per (id1, id2) for every Monday from the group's first week to its last
tmp = (df.groupBy("id1", "id2")
         .agg(F.min("date").alias("Mindate"),
              F.max("date").alias("Maxdate"))
         .withColumn("MinMonday", F.next_day("Mindate", "Mon"))
         .withColumn("MaxMonday", F.next_day("Maxdate", "Mon"))
         .withColumn("Seq",
                     F.explode(F.expr("sequence(MinMonday, MaxMonday, interval 7 day)")))
         .drop("Mindate", "Maxdate", "MinMonday", "MaxMonday"))

def maskedvalue(col):
    # Keep the value only when the Monday is at most one week number ahead of the row's date
    return f"""CASE WHEN weekdiff <= 1 THEN {col} ELSE 0 END"""

# Pair every row with each Monday on or after its date, mask, then sum per Monday
out = (df.alias("left")
         .join(tmp.alias("right"),
               on=[df['id1'] == tmp['id1'], df['id2'] == tmp['id2'], df['date'] <= tmp['Seq']])
         .select("date", "left.id1", "left.id2", "Seq", "value1", "value2")
         .withColumn("weekdiff", F.weekofyear("Seq") - F.weekofyear("date"))
         .withColumn("value1", F.expr(maskedvalue("value1")))
         .withColumn("value2", F.expr(maskedvalue("value2")))
         .groupBy("id1", "id2", "Seq")
         .agg(F.sum("value1").alias("value1"),
              F.sum("value2").alias("value2"))
         .withColumnRenamed("Seq", "Date"))

out.orderBy("id1", "id2", "Date").show()
+---+---+----------+------+------+
|id1|id2|      Date|value1|value2|
+---+---+----------+------+------+
| a1| b1|2021-12-06|     6|     8|
| a1| b1|2021-12-13|     0|     0|
| a1| b1|2021-12-20|     0|     0|
| a1| b1|2021-12-27|     7|     8|
| a1| b2|2021-12-06|     2|     1|
| a1| b2|2021-12-13|     9|     9|
| a2| b3|2021-12-06|     3|     3|
| a2| b4|2021-12-06|     8|     8|
+---+---+----------+------+------+
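One thing that may be worth checking before using the week-number mask on a longer date range: weekofyear restarts at 1 every year, so the difference can turn negative across a year boundary and still pass the `<= 1` test. The plain-Python sketch below imitates the mask with ISO week numbers, which is the numbering weekofyear follows. The helper names and the day-difference alternative are my own assumptions, not part of the answer above.

```python
from datetime import date

def iso_week(d):
    # ISO 8601 week number (weeks start on Monday)
    return d.isocalendar()[1]

def keep_by_weekdiff(seq, d):
    # Mirrors the CASE WHEN weekdiff <= 1 test from the answer
    return iso_week(seq) - iso_week(d) <= 1

def keep_by_daydiff(seq, d):
    # Hypothetical alternative: keep rows dated 1 to 7 days before the Monday label
    return 1 <= (seq - d).days <= 7

seq = date(2022, 1, 3)   # a Monday in ISO week 1 of 2022
old = date(2021, 12, 1)  # 33 days earlier, ISO week 48 of 2021

print(keep_by_weekdiff(seq, old))  # True: 1 - 48 is negative, so the row is kept
print(keep_by_daydiff(seq, old))   # False: the row is more than 7 days old
```

If this matters for your data, the same idea could probably be written in the Spark mask as `datediff(Seq, date) BETWEEN 1 AND 7`, but I have not run that against this example.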
Note that the tmp dataframe looks like this:
+---+---+----------+
|id1|id2|       Seq|
+---+---+----------+
| a1| b1|2021-12-06|
| a1| b1|2021-12-13|
| a1| b1|2021-12-20|
| a1| b1|2021-12-27|
| a1| b2|2021-12-06|
| a1| b2|2021-12-13|
| a2| b3|2021-12-06|
| a2| b4|2021-12-06|
+---+---+----------+
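If you want to cross-check the Spark output on a small sample, a plain-Python version of the same logic can serve as a reference. This is only a sketch: `next_monday` and `weekly_totals` are hypothetical helpers, and it assumes each date belongs to the first Monday strictly after it, which is how the expected output in the question is labelled.

```python
from collections import defaultdict
from datetime import date, timedelta

def next_monday(d):
    # First Monday strictly after d
    return d + timedelta(days=7 - d.weekday())

def weekly_totals(rows):
    """rows: iterable of (date, id1, id2, value1, value2) tuples."""
    sums = defaultdict(lambda: [0, 0])
    for d, id1, id2, v1, v2 in rows:
        key = (id1, id2, next_monday(d))
        sums[key][0] += v1
        sums[key][1] += v2

    # Collect the Mondays seen for each (id1, id2) group
    spans = defaultdict(list)
    for id1, id2, monday in sums:
        spans[(id1, id2)].append(monday)

    # Walk week by week from the first to the last Monday, filling gaps with zeros
    out = []
    for (id1, id2), mondays in sorted(spans.items()):
        week = min(mondays)
        while week <= max(mondays):
            v1, v2 = sums.get((id1, id2, week), (0, 0))
            out.append((id1, id2, week, v1, v2))
            week += timedelta(days=7)
    return out

rows = [
    (date(2021, 12, 1), "a1", "b1", 1, 6), (date(2021, 12, 2), "a1", "b1", 5, 2),
    (date(2021, 12, 21), "a1", "b1", 7, 8), (date(2021, 12, 1), "a1", "b2", 2, 1),
    (date(2021, 12, 7), "a1", "b2", 9, 9), (date(2021, 12, 1), "a2", "b3", 3, 3),
    (date(2021, 12, 5), "a2", "b3", 0, 0), (date(2021, 12, 1), "a2", "b4", 1, 2),
    (date(2021, 12, 5), "a2", "b4", 7, 6),
]
result = weekly_totals(rows)
for row in result:
    print(row)
```

On the sample data from the question this gives the same eight rows as the table above, including the two zero-filled weeks for a1/b1.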
Reply from a uj5u.com user:
This will do it. The code is fairly simple, but if in doubt, look up the functions in the Spark documentation.
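from pyspark.sql import functions as F

# Label every row with the first Monday after its date, then sum per week
df = df.withColumn('Date', F.next_day('Date', 'Mon'))
df = df.groupby(['id1', 'id2', 'Date']).agg(*[F.sum(c).alias(c) for c in ['value1', 'value2']])

# Mondays missing from each group's weekly sequence between its first and last week
new_dts = df.groupby(['id1', 'id2']).agg(
    F.array_except(
        F.expr('sequence(min(Date), max(Date), interval 1 week)'),
        F.collect_set('Date'),
    ).name('Date')
)
new_dts = new_dts.withColumn('Date', F.explode('Date'))

# new_dts has no value columns, so union by name and fill the resulting nulls with 0
# (allowMissingColumns needs Spark 3.1 or later)
df = df.unionByName(new_dts, allowMissingColumns=True).na.fill(0)
df.show()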
+---+---+----------+------+------+
|id1|id2|      Date|value1|value2|
+---+---+----------+------+------+
| a1| b2|2021-12-06|     2|     1|
| a1| b1|2021-12-27|     7|     8|
| a1| b1|2021-12-06|     6|     8|
| a2| b4|2021-12-06|     8|     8|
| a2| b3|2021-12-06|     3|     3|
| a1| b2|2021-12-13|     9|     9|
| a1| b1|2021-12-13|     0|     0|
| a1| b1|2021-12-20|     0|     0|
+---+---+----------+------+------+
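Keep in mind that this currently aligns each date with the Monday of the following week. To align your dates with the Monday of the same week instead, do the following: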
F.date_sub(F.next_day('Date','Mon'), 7)
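To see what the two alignments do to individual dates, here is a small plain-Python imitation. It is a sketch, not Spark code: `next_monday` and `week_start` are hypothetical helpers that assume next_day returns the first Monday strictly later than the given date.

```python
from datetime import date, timedelta

def next_monday(d):
    # Imitates next_day(d, 'Mon'): first Monday strictly after d
    return d + timedelta(days=7 - d.weekday())

def week_start(d):
    # Imitates date_sub(next_day(d, 'Mon'), 7): Monday of the week containing d
    return next_monday(d) - timedelta(days=7)

for d in (date(2021, 12, 1), date(2021, 12, 5), date(2021, 12, 6)):
    print(d, next_monday(d), week_start(d))
# 2021-12-01 2021-12-06 2021-11-29
# 2021-12-05 2021-12-06 2021-11-29
# 2021-12-06 2021-12-13 2021-12-06
```

Note the last row: a date that already falls on a Monday moves a full week forward under the first alignment and stays put under the second.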
